Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #ifndef EXPORT_SYMTAB
38 # define EXPORT_SYMTAB
39 #endif
40 #define DEBUG_SUBSYSTEM S_OSC
41
42 #include <libcfs/libcfs.h>
43
44 #ifndef __KERNEL__
45 # include <liblustre.h>
46 #endif
47
48 #include <lustre_dlm.h>
49 #include <lustre_net.h>
50 #include <lustre/lustre_user.h>
51 #include <obd_cksum.h>
52 #include <obd_ost.h>
53 #include <obd_lov.h>
54
55 #ifdef  __CYGWIN__
56 # include <ctype.h>
57 #endif
58
59 #include <lustre_ha.h>
60 #include <lprocfs_status.h>
61 #include <lustre_log.h>
62 #include <lustre_debug.h>
63 #include <lustre_param.h>
64 #include <lustre_cache.h>
65 #include "osc_internal.h"
66
67 static quota_interface_t *quota_interface = NULL;
68 extern quota_interface_t osc_quota_interface;
69
70 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
71 static int brw_interpret(const struct lu_env *env,
72                          struct ptlrpc_request *req, void *data, int rc);
73 int osc_cleanup(struct obd_device *obd);
74
75 /* Pack OSC object metadata for disk storage (LE byte order). */
76 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
77                       struct lov_stripe_md *lsm)
78 {
79         int lmm_size;
80         ENTRY;
81
82         lmm_size = sizeof(**lmmp);
83         if (!lmmp)
84                 RETURN(lmm_size);
85
86         if (*lmmp && !lsm) {
87                 OBD_FREE(*lmmp, lmm_size);
88                 *lmmp = NULL;
89                 RETURN(0);
90         }
91
92         if (!*lmmp) {
93                 OBD_ALLOC(*lmmp, lmm_size);
94                 if (!*lmmp)
95                         RETURN(-ENOMEM);
96         }
97
98         if (lsm) {
99                 LASSERT(lsm->lsm_object_id);
100                 LASSERT(lsm->lsm_object_gr);
101                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
102                 (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
103         }
104
105         RETURN(lmm_size);
106 }
107
108 /* Unpack OSC object metadata from disk storage (LE byte order). */
109 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
110                         struct lov_mds_md *lmm, int lmm_bytes)
111 {
112         int lsm_size;
113         ENTRY;
114
115         if (lmm != NULL) {
116                 if (lmm_bytes < sizeof (*lmm)) {
117                         CERROR("lov_mds_md too small: %d, need %d\n",
118                                lmm_bytes, (int)sizeof(*lmm));
119                         RETURN(-EINVAL);
120                 }
121                 /* XXX LOV_MAGIC etc check? */
122
123                 if (lmm->lmm_object_id == 0) {
124                         CERROR("lov_mds_md: zero lmm_object_id\n");
125                         RETURN(-EINVAL);
126                 }
127         }
128
129         lsm_size = lov_stripe_md_size(1);
130         if (lsmp == NULL)
131                 RETURN(lsm_size);
132
133         if (*lsmp != NULL && lmm == NULL) {
134                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
135                 OBD_FREE(*lsmp, lsm_size);
136                 *lsmp = NULL;
137                 RETURN(0);
138         }
139
140         if (*lsmp == NULL) {
141                 OBD_ALLOC(*lsmp, lsm_size);
142                 if (*lsmp == NULL)
143                         RETURN(-ENOMEM);
144                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
145                 if ((*lsmp)->lsm_oinfo[0] == NULL) {
146                         OBD_FREE(*lsmp, lsm_size);
147                         RETURN(-ENOMEM);
148                 }
149                 loi_init((*lsmp)->lsm_oinfo[0]);
150         }
151
152         if (lmm != NULL) {
153                 /* XXX zero *lsmp? */
154                 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
155                 (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
156                 LASSERT((*lsmp)->lsm_object_id);
157                 LASSERT((*lsmp)->lsm_object_gr);
158         }
159
160         (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
161
162         RETURN(lsm_size);
163 }
164
165 static inline void osc_pack_capa(struct ptlrpc_request *req,
166                                  struct ost_body *body, void *capa)
167 {
168         struct obd_capa *oc = (struct obd_capa *)capa;
169         struct lustre_capa *c;
170
171         if (!capa)
172                 return;
173
174         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
175         LASSERT(c);
176         capa_cpy(c, oc);
177         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
178         DEBUG_CAPA(D_SEC, c, "pack");
179 }
180
181 static inline void osc_pack_req_body(struct ptlrpc_request *req,
182                                      struct obd_info *oinfo)
183 {
184         struct ost_body *body;
185
186         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
187         LASSERT(body);
188
189         body->oa = *oinfo->oi_oa;
190         osc_pack_capa(req, body, oinfo->oi_capa);
191 }
192
193 static inline void osc_set_capa_size(struct ptlrpc_request *req,
194                                      const struct req_msg_field *field,
195                                      struct obd_capa *oc)
196 {
197         if (oc == NULL)
198                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
199         else
200                 /* it is already calculated as sizeof struct obd_capa */
201                 ;
202 }
203
204 static int osc_getattr_interpret(const struct lu_env *env,
205                                  struct ptlrpc_request *req,
206                                  struct osc_async_args *aa, int rc)
207 {
208         struct ost_body *body;
209         ENTRY;
210
211         if (rc != 0)
212                 GOTO(out, rc);
213
214         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
215                                   lustre_swab_ost_body);
216         if (body) {
217                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
218                 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
219
220                 /* This should really be sent by the OST */
221                 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
222                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
223         } else {
224                 CDEBUG(D_INFO, "can't unpack ost_body\n");
225                 rc = -EPROTO;
226                 aa->aa_oi->oi_oa->o_valid = 0;
227         }
228 out:
229         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
230         RETURN(rc);
231 }
232
233 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
234                              struct ptlrpc_request_set *set)
235 {
236         struct ptlrpc_request *req;
237         struct osc_async_args *aa;
238         int                    rc;
239         ENTRY;
240
241         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
242         if (req == NULL)
243                 RETURN(-ENOMEM);
244
245         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
246         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
247         if (rc) {
248                 ptlrpc_request_free(req);
249                 RETURN(rc);
250         }
251
252         osc_pack_req_body(req, oinfo);
253
254         ptlrpc_request_set_replen(req);
255         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
256
257         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
258         aa = ptlrpc_req_async_args(req);
259         aa->aa_oi = oinfo;
260
261         ptlrpc_set_add_req(set, req);
262         RETURN(0);
263 }
264
265 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
266 {
267         struct ptlrpc_request *req;
268         struct ost_body       *body;
269         int                    rc;
270         ENTRY;
271
272         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
273         if (req == NULL)
274                 RETURN(-ENOMEM);
275
276         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
277         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
278         if (rc) {
279                 ptlrpc_request_free(req);
280                 RETURN(rc);
281         }
282
283         osc_pack_req_body(req, oinfo);
284
285         ptlrpc_request_set_replen(req);
286
287         rc = ptlrpc_queue_wait(req);
288         if (rc)
289                 GOTO(out, rc);
290
291         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
292         if (body == NULL)
293                 GOTO(out, rc = -EPROTO);
294
295         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
296         *oinfo->oi_oa = body->oa;
297
298         /* This should really be sent by the OST */
299         oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
300         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
301
302         EXIT;
303  out:
304         ptlrpc_req_finished(req);
305         return rc;
306 }
307
308 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
309                        struct obd_trans_info *oti)
310 {
311         struct ptlrpc_request *req;
312         struct ost_body       *body;
313         int                    rc;
314         ENTRY;
315
316         LASSERT(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
317                                         oinfo->oi_oa->o_gr > 0);
318
319         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
320         if (req == NULL)
321                 RETURN(-ENOMEM);
322
323         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
324         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
325         if (rc) {
326                 ptlrpc_request_free(req);
327                 RETURN(rc);
328         }
329
330         osc_pack_req_body(req, oinfo);
331
332         ptlrpc_request_set_replen(req);
333
334         rc = ptlrpc_queue_wait(req);
335         if (rc)
336                 GOTO(out, rc);
337
338         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
339         if (body == NULL)
340                 GOTO(out, rc = -EPROTO);
341
342         *oinfo->oi_oa = body->oa;
343
344         EXIT;
345 out:
346         ptlrpc_req_finished(req);
347         RETURN(rc);
348 }
349
350 static int osc_setattr_interpret(const struct lu_env *env,
351                                  struct ptlrpc_request *req,
352                                  struct osc_async_args *aa, int rc)
353 {
354         struct ost_body *body;
355         ENTRY;
356
357         if (rc != 0)
358                 GOTO(out, rc);
359
360         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
361         if (body == NULL)
362                 GOTO(out, rc = -EPROTO);
363
364         *aa->aa_oi->oi_oa = body->oa;
365 out:
366         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
367         RETURN(rc);
368 }
369
370 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
371                              struct obd_trans_info *oti,
372                              struct ptlrpc_request_set *rqset)
373 {
374         struct ptlrpc_request *req;
375         struct osc_async_args *aa;
376         int                    rc;
377         ENTRY;
378
379         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
380         if (req == NULL)
381                 RETURN(-ENOMEM);
382
383         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
384         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
385         if (rc) {
386                 ptlrpc_request_free(req);
387                 RETURN(rc);
388         }
389
390         osc_pack_req_body(req, oinfo);
391
392         ptlrpc_request_set_replen(req);
393
394         if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
395                 LASSERT(oti);
396                 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
397         }
398
399         /* do mds to ost setattr asynchronously */
400         if (!rqset) {
401                 /* Do not wait for response. */
402                 ptlrpcd_add_req(req);
403         } else {
404                 req->rq_interpret_reply =
405                         (ptlrpc_interpterer_t)osc_setattr_interpret;
406
407                 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
408                 aa = ptlrpc_req_async_args(req);
409                 aa->aa_oi = oinfo;
410
411                 ptlrpc_set_add_req(rqset, req);
412         }
413
414         RETURN(0);
415 }
416
417 int osc_real_create(struct obd_export *exp, struct obdo *oa,
418                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
419 {
420         struct ptlrpc_request *req;
421         struct ost_body       *body;
422         struct lov_stripe_md  *lsm;
423         int                    rc;
424         ENTRY;
425
426         LASSERT(oa);
427         LASSERT(ea);
428
429         lsm = *ea;
430         if (!lsm) {
431                 rc = obd_alloc_memmd(exp, &lsm);
432                 if (rc < 0)
433                         RETURN(rc);
434         }
435
436         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
437         if (req == NULL)
438                 GOTO(out, rc = -ENOMEM);
439
440         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
441         if (rc) {
442                 ptlrpc_request_free(req);
443                 GOTO(out, rc);
444         }
445
446         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
447         LASSERT(body);
448         body->oa = *oa;
449
450         ptlrpc_request_set_replen(req);
451
452         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
453             oa->o_flags == OBD_FL_DELORPHAN) {
454                 DEBUG_REQ(D_HA, req,
455                           "delorphan from OST integration");
456                 /* Don't resend the delorphan req */
457                 req->rq_no_resend = req->rq_no_delay = 1;
458         }
459
460         rc = ptlrpc_queue_wait(req);
461         if (rc)
462                 GOTO(out_req, rc);
463
464         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
465         if (body == NULL)
466                 GOTO(out_req, rc = -EPROTO);
467
468         *oa = body->oa;
469
470         /* This should really be sent by the OST */
471         oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
472         oa->o_valid |= OBD_MD_FLBLKSZ;
473
474         /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
475          * have valid lsm_oinfo data structs, so don't go touching that.
476          * This needs to be fixed in a big way.
477          */
478         lsm->lsm_object_id = oa->o_id;
479         lsm->lsm_object_gr = oa->o_gr;
480         *ea = lsm;
481
482         if (oti != NULL) {
483                 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
484
485                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
486                         if (!oti->oti_logcookies)
487                                 oti_alloc_cookies(oti, 1);
488                         *oti->oti_logcookies = oa->o_lcookie;
489                 }
490         }
491
492         CDEBUG(D_HA, "transno: "LPD64"\n",
493                lustre_msg_get_transno(req->rq_repmsg));
494 out_req:
495         ptlrpc_req_finished(req);
496 out:
497         if (rc && !*ea)
498                 obd_free_memmd(exp, &lsm);
499         RETURN(rc);
500 }
501
502 static int osc_punch_interpret(const struct lu_env *env,
503                                struct ptlrpc_request *req,
504                                struct osc_async_args *aa, int rc)
505 {
506         struct ost_body *body;
507         ENTRY;
508
509         if (rc != 0)
510                 GOTO(out, rc);
511
512         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
513         if (body == NULL)
514                 GOTO(out, rc = -EPROTO);
515
516         *aa->aa_oi->oi_oa = body->oa;
517 out:
518         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
519         RETURN(rc);
520 }
521
522 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
523                      struct obd_trans_info *oti,
524                      struct ptlrpc_request_set *rqset)
525 {
526         struct ptlrpc_request *req;
527         struct osc_async_args *aa;
528         struct ost_body       *body;
529         int                    rc;
530         ENTRY;
531
532         if (!oinfo->oi_oa) {
533                 CDEBUG(D_INFO, "oa NULL\n");
534                 RETURN(-EINVAL);
535         }
536
537         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
538         if (req == NULL)
539                 RETURN(-ENOMEM);
540
541         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
542         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
543         if (rc) {
544                 ptlrpc_request_free(req);
545                 RETURN(rc);
546         }
547         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
548         ptlrpc_at_set_req_timeout(req);
549         osc_pack_req_body(req, oinfo);
550
551         /* overload the size and blocks fields in the oa with start/end */
552         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
553         LASSERT(body);
554         body->oa.o_size = oinfo->oi_policy.l_extent.start;
555         body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
556         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
557         ptlrpc_request_set_replen(req);
558
559
560         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_punch_interpret;
561         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
562         aa = ptlrpc_req_async_args(req);
563         aa->aa_oi = oinfo;
564         ptlrpc_set_add_req(rqset, req);
565
566         RETURN(0);
567 }
568
569 static int osc_sync(struct obd_export *exp, struct obdo *oa,
570                     struct lov_stripe_md *md, obd_size start, obd_size end,
571                     void *capa)
572 {
573         struct ptlrpc_request *req;
574         struct ost_body       *body;
575         int                    rc;
576         ENTRY;
577
578         if (!oa) {
579                 CDEBUG(D_INFO, "oa NULL\n");
580                 RETURN(-EINVAL);
581         }
582
583         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
584         if (req == NULL)
585                 RETURN(-ENOMEM);
586
587         osc_set_capa_size(req, &RMF_CAPA1, capa);
588         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
589         if (rc) {
590                 ptlrpc_request_free(req);
591                 RETURN(rc);
592         }
593
594         /* overload the size and blocks fields in the oa with start/end */
595         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
596         LASSERT(body);
597         body->oa = *oa;
598         body->oa.o_size = start;
599         body->oa.o_blocks = end;
600         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
601         osc_pack_capa(req, body, capa);
602
603         ptlrpc_request_set_replen(req);
604
605         rc = ptlrpc_queue_wait(req);
606         if (rc)
607                 GOTO(out, rc);
608
609         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
610         if (body == NULL)
611                 GOTO(out, rc = -EPROTO);
612
613         *oa = body->oa;
614
615         EXIT;
616  out:
617         ptlrpc_req_finished(req);
618         return rc;
619 }
620
621 /* Find and cancel locally locks matched by @mode in the resource found by
622  * @objid. Found locks are added into @cancel list. Returns the amount of
623  * locks added to @cancels list. */
624 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
625                                    struct list_head *cancels, ldlm_mode_t mode,
626                                    int lock_flags)
627 {
628         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
629         struct ldlm_res_id res_id;
630         struct ldlm_resource *res;
631         int count;
632         ENTRY;
633
634         osc_build_res_name(oa->o_id, oa->o_gr, &res_id);
635         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
636         if (res == NULL)
637                 RETURN(0);
638
639         LDLM_RESOURCE_ADDREF(res);
640         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
641                                            lock_flags, 0, NULL);
642         LDLM_RESOURCE_DELREF(res);
643         ldlm_resource_putref(res);
644         RETURN(count);
645 }
646
647 static int osc_destroy_interpret(const struct lu_env *env,
648                                  struct ptlrpc_request *req, void *data,
649                                  int rc)
650 {
651         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
652
653         atomic_dec(&cli->cl_destroy_in_flight);
654         cfs_waitq_signal(&cli->cl_destroy_waitq);
655         return 0;
656 }
657
658 static int osc_can_send_destroy(struct client_obd *cli)
659 {
660         if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
661             cli->cl_max_rpcs_in_flight) {
662                 /* The destroy request can be sent */
663                 return 1;
664         }
665         if (atomic_dec_return(&cli->cl_destroy_in_flight) <
666             cli->cl_max_rpcs_in_flight) {
667                 /*
668                  * The counter has been modified between the two atomic
669                  * operations.
670                  */
671                 cfs_waitq_signal(&cli->cl_destroy_waitq);
672         }
673         return 0;
674 }
675
676 /* Destroy requests can be async always on the client, and we don't even really
677  * care about the return code since the client cannot do anything at all about
678  * a destroy failure.
679  * When the MDS is unlinking a filename, it saves the file objects into a
680  * recovery llog, and these object records are cancelled when the OST reports
681  * they were destroyed and sync'd to disk (i.e. transaction committed).
682  * If the client dies, or the OST is down when the object should be destroyed,
683  * the records are not cancelled, and when the OST reconnects to the MDS next,
684  * it will retrieve the llog unlink logs and then sends the log cancellation
685  * cookies to the MDS after committing destroy transactions. */
686 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
687                        struct lov_stripe_md *ea, struct obd_trans_info *oti,
688                        struct obd_export *md_export)
689 {
690         struct client_obd     *cli = &exp->exp_obd->u.cli;
691         struct ptlrpc_request *req;
692         struct ost_body       *body;
693         CFS_LIST_HEAD(cancels);
694         int rc, count;
695         ENTRY;
696
697         if (!oa) {
698                 CDEBUG(D_INFO, "oa NULL\n");
699                 RETURN(-EINVAL);
700         }
701
702         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
703                                         LDLM_FL_DISCARD_DATA);
704
705         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
706         if (req == NULL) {
707                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
708                 RETURN(-ENOMEM);
709         }
710
711         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
712                                0, &cancels, count);
713         if (rc) {
714                 ptlrpc_request_free(req);
715                 RETURN(rc);
716         }
717
718         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
719         req->rq_interpret_reply = osc_destroy_interpret;
720         ptlrpc_at_set_req_timeout(req);
721
722         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
723                 oa->o_lcookie = *oti->oti_logcookies;
724         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
725         LASSERT(body);
726         body->oa = *oa;
727
728         ptlrpc_request_set_replen(req);
729
730         if (!osc_can_send_destroy(cli)) {
731                 struct l_wait_info lwi = { 0 };
732
733                 /*
734                  * Wait until the number of on-going destroy RPCs drops
735                  * under max_rpc_in_flight
736                  */
737                 l_wait_event_exclusive(cli->cl_destroy_waitq,
738                                        osc_can_send_destroy(cli), &lwi);
739         }
740
741         /* Do not wait for response */
742         ptlrpcd_add_req(req);
743         RETURN(0);
744 }
745
746 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
747                                 long writing_bytes)
748 {
749         obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
750
751         LASSERT(!(oa->o_valid & bits));
752
753         oa->o_valid |= bits;
754         client_obd_list_lock(&cli->cl_loi_list_lock);
755         oa->o_dirty = cli->cl_dirty;
756         if (cli->cl_dirty > cli->cl_dirty_max) {
757                 CERROR("dirty %lu > dirty_max %lu\n",
758                        cli->cl_dirty, cli->cl_dirty_max);
759                 oa->o_undirty = 0;
760         } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
761                 CERROR("dirty %d > system dirty_max %d\n",
762                        atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
763                 oa->o_undirty = 0;
764         } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
765                 CERROR("dirty %lu - dirty_max %lu too big???\n",
766                        cli->cl_dirty, cli->cl_dirty_max);
767                 oa->o_undirty = 0;
768         } else {
769                 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
770                                 (cli->cl_max_rpcs_in_flight + 1);
771                 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
772         }
773         oa->o_grant = cli->cl_avail_grant;
774         oa->o_dropped = cli->cl_lost_grant;
775         cli->cl_lost_grant = 0;
776         client_obd_list_unlock(&cli->cl_loi_list_lock);
777         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
778                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
779 }
780
781 /* caller must hold loi_list_lock */
782 static void osc_consume_write_grant(struct client_obd *cli,
783                                     struct brw_page *pga)
784 {
785         atomic_inc(&obd_dirty_pages);
786         cli->cl_dirty += CFS_PAGE_SIZE;
787         cli->cl_avail_grant -= CFS_PAGE_SIZE;
788         pga->flag |= OBD_BRW_FROM_GRANT;
789         CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
790                CFS_PAGE_SIZE, pga, pga->pg);
791         LASSERT(cli->cl_avail_grant >= 0);
792 }
793
794 /* the companion to osc_consume_write_grant, called when a brw has completed.
795  * must be called with the loi lock held. */
796 static void osc_release_write_grant(struct client_obd *cli,
797                                     struct brw_page *pga, int sent)
798 {
799         int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
800         ENTRY;
801
802         if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
803                 EXIT;
804                 return;
805         }
806
807         pga->flag &= ~OBD_BRW_FROM_GRANT;
808         atomic_dec(&obd_dirty_pages);
809         cli->cl_dirty -= CFS_PAGE_SIZE;
810         if (!sent) {
811                 cli->cl_lost_grant += CFS_PAGE_SIZE;
812                 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
813                        cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
814         } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
815                 /* For short writes we shouldn't count parts of pages that
816                  * span a whole block on the OST side, or our accounting goes
817                  * wrong.  Should match the code in filter_grant_check. */
818                 int offset = pga->off & ~CFS_PAGE_MASK;
819                 int count = pga->count + (offset & (blocksize - 1));
820                 int end = (offset + pga->count) & (blocksize - 1);
821                 if (end)
822                         count += blocksize - end;
823
824                 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
825                 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
826                        CFS_PAGE_SIZE - count, cli->cl_lost_grant,
827                        cli->cl_avail_grant, cli->cl_dirty);
828         }
829
830         EXIT;
831 }
832
833 static unsigned long rpcs_in_flight(struct client_obd *cli)
834 {
835         return cli->cl_r_in_flight + cli->cl_w_in_flight;
836 }
837
838 /* caller must hold loi_list_lock */
839 void osc_wake_cache_waiters(struct client_obd *cli)
840 {
841         struct list_head *l, *tmp;
842         struct osc_cache_waiter *ocw;
843
844         ENTRY;
845         list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
846                 /* if we can't dirty more, we must wait until some is written */
847                 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
848                    (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
849                         CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
850                                "osc max %ld, sys max %d\n", cli->cl_dirty,
851                                cli->cl_dirty_max, obd_max_dirty_pages);
852                         return;
853                 }
854
855                 /* if still dirty cache but no grant wait for pending RPCs that
856                  * may yet return us some grant before doing sync writes */
857                 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
858                         CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
859                                cli->cl_w_in_flight);
860                         return;
861                 }
862
863                 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
864                 list_del_init(&ocw->ocw_entry);
865                 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
866                         /* no more RPCs in flight to return grant, do sync IO */
867                         ocw->ocw_rc = -EDQUOT;
868                         CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
869                 } else {
870                         osc_consume_write_grant(cli,
871                                                 &ocw->ocw_oap->oap_brw_page);
872                 }
873
874                 cfs_waitq_signal(&ocw->ocw_waitq);
875         }
876
877         EXIT;
878 }
879
880 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
881 {
882         client_obd_list_lock(&cli->cl_loi_list_lock);
883         cli->cl_avail_grant = ocd->ocd_grant;
884         client_obd_list_unlock(&cli->cl_loi_list_lock);
885
886         CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
887                cli->cl_avail_grant, cli->cl_lost_grant);
888         LASSERT(cli->cl_avail_grant >= 0);
889 }
890
891 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
892 {
893         client_obd_list_lock(&cli->cl_loi_list_lock);
894         CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
895         if (body->oa.o_valid & OBD_MD_FLGRANT)
896                 cli->cl_avail_grant += body->oa.o_grant;
897         /* waiters are woken in brw_interpret */
898         client_obd_list_unlock(&cli->cl_loi_list_lock);
899 }
900
901 /* We assume that the reason this OSC got a short read is because it read
902  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
903  * via the LOV, and it _knows_ it's reading inside the file, it's just that
904  * this stripe never got written at or beyond this stripe offset yet. */
905 static void handle_short_read(int nob_read, obd_count page_count,
906                               struct brw_page **pga)
907 {
908         char *ptr;
909         int i = 0;
910
911         /* skip bytes read OK */
912         while (nob_read > 0) {
913                 LASSERT (page_count > 0);
914
915                 if (pga[i]->count > nob_read) {
916                         /* EOF inside this page */
917                         ptr = cfs_kmap(pga[i]->pg) +
918                                 (pga[i]->off & ~CFS_PAGE_MASK);
919                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
920                         cfs_kunmap(pga[i]->pg);
921                         page_count--;
922                         i++;
923                         break;
924                 }
925
926                 nob_read -= pga[i]->count;
927                 page_count--;
928                 i++;
929         }
930
931         /* zero remaining pages */
932         while (page_count-- > 0) {
933                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
934                 memset(ptr, 0, pga[i]->count);
935                 cfs_kunmap(pga[i]->pg);
936                 i++;
937         }
938 }
939
940 static int check_write_rcs(struct ptlrpc_request *req,
941                            int requested_nob, int niocount,
942                            obd_count page_count, struct brw_page **pga)
943 {
944         int    *remote_rcs, i;
945
946         /* return error if any niobuf was in error */
947         remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
948                                         sizeof(*remote_rcs) * niocount, NULL);
949         if (remote_rcs == NULL) {
950                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
951                 return(-EPROTO);
952         }
953         if (lustre_msg_swabbed(req->rq_repmsg))
954                 for (i = 0; i < niocount; i++)
955                         __swab32s(&remote_rcs[i]);
956
957         for (i = 0; i < niocount; i++) {
958                 if (remote_rcs[i] < 0)
959                         return(remote_rcs[i]);
960
961                 if (remote_rcs[i] != 0) {
962                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
963                                 i, remote_rcs[i], req);
964                         return(-EPROTO);
965                 }
966         }
967
968         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
969                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
970                        req->rq_bulk->bd_nob_transferred, requested_nob);
971                 return(-EPROTO);
972         }
973
974         return (0);
975 }
976
977 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
978 {
979         if (p1->flag != p2->flag) {
980                 unsigned mask = ~OBD_BRW_FROM_GRANT;
981
982                 /* warn if we try to combine flags that we don't know to be
983                  * safe to combine */
984                 if ((p1->flag & mask) != (p2->flag & mask))
985                         CERROR("is it ok to have flags 0x%x and 0x%x in the "
986                                "same brw?\n", p1->flag, p2->flag);
987                 return 0;
988         }
989
990         return (p1->off + p1->count == p2->off);
991 }
992
993 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
994                                    struct brw_page **pga, int opc,
995                                    cksum_type_t cksum_type)
996 {
997         __u32 cksum;
998         int i = 0;
999
1000         LASSERT (pg_count > 0);
1001         cksum = init_checksum(cksum_type);
1002         while (nob > 0 && pg_count > 0) {
1003                 unsigned char *ptr = cfs_kmap(pga[i]->pg);
1004                 int off = pga[i]->off & ~CFS_PAGE_MASK;
1005                 int count = pga[i]->count > nob ? nob : pga[i]->count;
1006
1007                 /* corrupt the data before we compute the checksum, to
1008                  * simulate an OST->client data error */
1009                 if (i == 0 && opc == OST_READ &&
1010                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
1011                         memcpy(ptr + off, "bad1", min(4, nob));
1012                 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
1013                 cfs_kunmap(pga[i]->pg);
1014                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1015                                off, cksum);
1016
1017                 nob -= pga[i]->count;
1018                 pg_count--;
1019                 i++;
1020         }
1021         /* For sending we only compute the wrong checksum instead
1022          * of corrupting the data so it is still correct on a redo */
1023         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1024                 cksum++;
1025
1026         return cksum;
1027 }
1028
1029 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1030                                 struct lov_stripe_md *lsm, obd_count page_count,
1031                                 struct brw_page **pga,
1032                                 struct ptlrpc_request **reqp,
1033                                 struct obd_capa *ocapa)
1034 {
1035         struct ptlrpc_request   *req;
1036         struct ptlrpc_bulk_desc *desc;
1037         struct ost_body         *body;
1038         struct obd_ioobj        *ioobj;
1039         struct niobuf_remote    *niobuf;
1040         int niocount, i, requested_nob, opc, rc;
1041         struct osc_brw_async_args *aa;
1042         struct req_capsule      *pill;
1043         struct brw_page *pg_prev;
1044
1045         ENTRY;
1046         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1047                 RETURN(-ENOMEM); /* Recoverable */
1048         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1049                 RETURN(-EINVAL); /* Fatal */
1050
1051         if ((cmd & OBD_BRW_WRITE) != 0) {
1052                 opc = OST_WRITE;
1053                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1054                                                 cli->cl_import->imp_rq_pool,
1055                                                 &RQF_OST_BRW);
1056         } else {
1057                 opc = OST_READ;
1058                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW);
1059         }
1060
1061         if (req == NULL)
1062                 RETURN(-ENOMEM);
1063
1064         for (niocount = i = 1; i < page_count; i++) {
1065                 if (!can_merge_pages(pga[i - 1], pga[i]))
1066                         niocount++;
1067         }
1068
1069         pill = &req->rq_pill;
1070         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1071                              niocount * sizeof(*niobuf));
1072         osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1073
1074         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1075         if (rc) {
1076                 ptlrpc_request_free(req);
1077                 RETURN(rc);
1078         }
1079         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1080         ptlrpc_at_set_req_timeout(req);
1081
1082         if (opc == OST_WRITE)
1083                 desc = ptlrpc_prep_bulk_imp(req, page_count,
1084                                             BULK_GET_SOURCE, OST_BULK_PORTAL);
1085         else
1086                 desc = ptlrpc_prep_bulk_imp(req, page_count,
1087                                             BULK_PUT_SINK, OST_BULK_PORTAL);
1088
1089         if (desc == NULL)
1090                 GOTO(out, rc = -ENOMEM);
1091         /* NB request now owns desc and will free it when it gets freed */
1092
1093         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1094         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1095         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1096         LASSERT(body && ioobj && niobuf);
1097
1098         body->oa = *oa;
1099
1100         obdo_to_ioobj(oa, ioobj);
1101         ioobj->ioo_bufcnt = niocount;
1102         osc_pack_capa(req, body, ocapa);
1103         LASSERT (page_count > 0);
1104         pg_prev = pga[0];
1105         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1106                 struct brw_page *pg = pga[i];
1107
1108                 LASSERT(pg->count > 0);
1109                 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
1110                          "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
1111                          pg->off, pg->count);
1112 #ifdef __linux__
1113                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1114                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1115                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1116                          i, page_count,
1117                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1118                          pg_prev->pg, page_private(pg_prev->pg),
1119                          pg_prev->pg->index, pg_prev->off);
1120 #else
1121                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1122                          "i %d p_c %u\n", i, page_count);
1123 #endif
1124                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1125                         (pg->flag & OBD_BRW_SRVLOCK));
1126
1127                 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
1128                                       pg->count);
1129                 requested_nob += pg->count;
1130
1131                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1132                         niobuf--;
1133                         niobuf->len += pg->count;
1134                 } else {
1135                         niobuf->offset = pg->off;
1136                         niobuf->len    = pg->count;
1137                         niobuf->flags  = pg->flag;
1138                 }
1139                 pg_prev = pg;
1140         }
1141
1142         LASSERTF((void *)(niobuf - niocount) ==
1143                 lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1144                                niocount * sizeof(*niobuf)),
1145                 "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg,
1146                 REQ_REC_OFF + 2, niocount * sizeof(*niobuf)),
1147                 (void *)(niobuf - niocount));
1148
1149         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1150
1151         /* size[REQ_REC_OFF] still sizeof (*body) */
1152         if (opc == OST_WRITE) {
1153                 if (unlikely(cli->cl_checksum) &&
1154                     req->rq_flvr.sf_bulk_hash == BULK_HASH_ALG_NULL) {
1155                         /* store cl_cksum_type in a local variable since
1156                          * it can be changed via lprocfs */
1157                         cksum_type_t cksum_type = cli->cl_cksum_type;
1158
1159                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1160                                 oa->o_flags = body->oa.o_flags = 0;
1161                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1162                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1163                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1164                                                              page_count, pga,
1165                                                              OST_WRITE,
1166                                                              cksum_type);
1167                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1168                                body->oa.o_cksum);
1169                         /* save this in 'oa', too, for later checking */
1170                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1171                         oa->o_flags |= cksum_type_pack(cksum_type);
1172                 } else {
1173                         /* clear out the checksum flag, in case this is a
1174                          * resend but cl_checksum is no longer set. b=11238 */
1175                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1176                 }
1177                 oa->o_cksum = body->oa.o_cksum;
1178                 /* 1 RC per niobuf */
1179                 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER,
1180                                      sizeof(__u32) * niocount);
1181         } else {
1182                 if (unlikely(cli->cl_checksum) &&
1183                     req->rq_flvr.sf_bulk_hash == BULK_HASH_ALG_NULL) {
1184                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1185                                 body->oa.o_flags = 0;
1186                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1187                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1188                 }
1189                 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER, 0);
1190                 /* 1 RC for the whole I/O */
1191         }
1192         ptlrpc_request_set_replen(req);
1193
1194         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1195         aa = ptlrpc_req_async_args(req);
1196         aa->aa_oa = oa;
1197         aa->aa_requested_nob = requested_nob;
1198         aa->aa_nio_count = niocount;
1199         aa->aa_page_count = page_count;
1200         aa->aa_resends = 0;
1201         aa->aa_ppga = pga;
1202         aa->aa_cli = cli;
1203         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1204
1205         *reqp = req;
1206         RETURN(0);
1207
1208  out:
1209         ptlrpc_req_finished(req);
1210         RETURN(rc);
1211 }
1212
1213 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1214                                 __u32 client_cksum, __u32 server_cksum, int nob,
1215                                 obd_count page_count, struct brw_page **pga,
1216                                 cksum_type_t client_cksum_type)
1217 {
1218         __u32 new_cksum;
1219         char *msg;
1220         cksum_type_t cksum_type;
1221
1222         if (server_cksum == client_cksum) {
1223                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1224                 return 0;
1225         }
1226
1227         if (oa->o_valid & OBD_MD_FLFLAGS)
1228                 cksum_type = cksum_type_unpack(oa->o_flags);
1229         else
1230                 cksum_type = OBD_CKSUM_CRC32;
1231
1232         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1233                                       cksum_type);
1234
1235         if (cksum_type != client_cksum_type)
1236                 msg = "the server did not use the checksum type specified in "
1237                       "the original request - likely a protocol problem";
1238         else if (new_cksum == server_cksum)
1239                 msg = "changed on the client after we checksummed it - "
1240                       "likely false positive due to mmap IO (bug 11742)";
1241         else if (new_cksum == client_cksum)
1242                 msg = "changed in transit before arrival at OST";
1243         else
1244                 msg = "changed in transit AND doesn't match the original - "
1245                       "likely false positive due to mmap IO (bug 11742)";
1246
1247         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1248                            LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1249                            "["LPU64"-"LPU64"]\n",
1250                            msg, libcfs_nid2str(peer->nid),
1251                            oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1252                            oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
1253                                                         (__u64)0,
1254                            oa->o_id,
1255                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1256                            pga[0]->off,
1257                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1258         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1259                "client csum now %x\n", client_cksum, client_cksum_type,
1260                server_cksum, cksum_type, new_cksum);
1261         return 1;
1262 }
1263
1264 /* Note rc enters this function as number of bytes transferred */
1265 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1266 {
1267         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1268         const lnet_process_id_t *peer =
1269                         &req->rq_import->imp_connection->c_peer;
1270         struct client_obd *cli = aa->aa_cli;
1271         struct ost_body *body;
1272         __u32 client_cksum = 0;
1273         ENTRY;
1274
1275         if (rc < 0 && rc != -EDQUOT)
1276                 RETURN(rc);
1277
1278         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1279         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1280                                   lustre_swab_ost_body);
1281         if (body == NULL) {
1282                 CDEBUG(D_INFO, "Can't unpack body\n");
1283                 RETURN(-EPROTO);
1284         }
1285
1286         /* set/clear over quota flag for a uid/gid */
1287         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1288             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1289                 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1290                              body->oa.o_gid, body->oa.o_valid,
1291                              body->oa.o_flags);
1292
1293         if (rc < 0)
1294                 RETURN(rc);
1295
1296         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1297                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1298
1299         osc_update_grant(cli, body);
1300
1301         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1302                 if (rc > 0) {
1303                         CERROR("Unexpected +ve rc %d\n", rc);
1304                         RETURN(-EPROTO);
1305                 }
1306                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1307
1308                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1309                     check_write_checksum(&body->oa, peer, client_cksum,
1310                                          body->oa.o_cksum, aa->aa_requested_nob,
1311                                          aa->aa_page_count, aa->aa_ppga,
1312                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1313                         RETURN(-EAGAIN);
1314
1315                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1316                         RETURN(-EAGAIN);
1317
1318                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1319                                      aa->aa_page_count, aa->aa_ppga);
1320                 GOTO(out, rc);
1321         }
1322
1323         /* The rest of this function executes only for OST_READs */
1324         if (rc > aa->aa_requested_nob) {
1325                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1326                        aa->aa_requested_nob);
1327                 RETURN(-EPROTO);
1328         }
1329
1330         if (rc != req->rq_bulk->bd_nob_transferred) {
1331                 CERROR ("Unexpected rc %d (%d transferred)\n",
1332                         rc, req->rq_bulk->bd_nob_transferred);
1333                 return (-EPROTO);
1334         }
1335
1336         if (rc < aa->aa_requested_nob)
1337                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1338
1339         if (sptlrpc_cli_unwrap_bulk_read(req, rc, aa->aa_page_count,
1340                                          aa->aa_ppga))
1341                 GOTO(out, rc = -EAGAIN);
1342
1343         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1344                 static int cksum_counter;
1345                 __u32      server_cksum = body->oa.o_cksum;
1346                 char      *via;
1347                 char      *router;
1348                 cksum_type_t cksum_type;
1349
1350                 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1351                         cksum_type = cksum_type_unpack(body->oa.o_flags);
1352                 else
1353                         cksum_type = OBD_CKSUM_CRC32;
1354                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1355                                                  aa->aa_ppga, OST_READ,
1356                                                  cksum_type);
1357
1358                 if (peer->nid == req->rq_bulk->bd_sender) {
1359                         via = router = "";
1360                 } else {
1361                         via = " via ";
1362                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1363                 }
1364
1365                 if (server_cksum == ~0 && rc > 0) {
1366                         CERROR("Protocol error: server %s set the 'checksum' "
1367                                "bit, but didn't send a checksum.  Not fatal, "
1368                                "but please notify on http://bugzilla.lustre.org/\n",
1369                                libcfs_nid2str(peer->nid));
1370                 } else if (server_cksum != client_cksum) {
1371                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1372                                            "%s%s%s inum "LPU64"/"LPU64" object "
1373                                            LPU64"/"LPU64" extent "
1374                                            "["LPU64"-"LPU64"]\n",
1375                                            req->rq_import->imp_obd->obd_name,
1376                                            libcfs_nid2str(peer->nid),
1377                                            via, router,
1378                                            body->oa.o_valid & OBD_MD_FLFID ?
1379                                                 body->oa.o_fid : (__u64)0,
1380                                            body->oa.o_valid & OBD_MD_FLFID ?
1381                                                 body->oa.o_generation :(__u64)0,
1382                                            body->oa.o_id,
1383                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1384                                                 body->oa.o_gr : (__u64)0,
1385                                            aa->aa_ppga[0]->off,
1386                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1387                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1388                                                                         1);
1389                         CERROR("client %x, server %x, cksum_type %x\n",
1390                                client_cksum, server_cksum, cksum_type);
1391                         cksum_counter = 0;
1392                         aa->aa_oa->o_cksum = client_cksum;
1393                         rc = -EAGAIN;
1394                 } else {
1395                         cksum_counter++;
1396                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1397                         rc = 0;
1398                 }
1399         } else if (unlikely(client_cksum)) {
1400                 static int cksum_missed;
1401
1402                 cksum_missed++;
1403                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1404                         CERROR("Checksum %u requested from %s but not sent\n",
1405                                cksum_missed, libcfs_nid2str(peer->nid));
1406         } else {
1407                 rc = 0;
1408         }
1409 out:
1410         if (rc >= 0)
1411                 *aa->aa_oa = body->oa;
1412
1413         RETURN(rc);
1414 }
1415
1416 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1417                             struct lov_stripe_md *lsm,
1418                             obd_count page_count, struct brw_page **pga,
1419                             struct obd_capa *ocapa)
1420 {
1421         struct ptlrpc_request *req;
1422         int                    rc;
1423         cfs_waitq_t            waitq;
1424         int                    resends = 0;
1425         struct l_wait_info     lwi;
1426
1427         ENTRY;
1428
1429         cfs_waitq_init(&waitq);
1430
1431 restart_bulk:
1432         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1433                                   page_count, pga, &req, ocapa);
1434         if (rc != 0)
1435                 return (rc);
1436
1437         rc = ptlrpc_queue_wait(req);
1438
1439         if (rc == -ETIMEDOUT && req->rq_resend) {
1440                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1441                 ptlrpc_req_finished(req);
1442                 goto restart_bulk;
1443         }
1444
1445         rc = osc_brw_fini_request(req, rc);
1446
1447         ptlrpc_req_finished(req);
1448         if (osc_recoverable_error(rc)) {
1449                 resends++;
1450                 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1451                         CERROR("too many resend retries, returning error\n");
1452                         RETURN(-EIO);
1453                 }
1454
1455                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1456                 l_wait_event(waitq, 0, &lwi);
1457
1458                 goto restart_bulk;
1459         }
1460
1461         RETURN (rc);
1462 }
1463
1464 int osc_brw_redo_request(struct ptlrpc_request *request,
1465                          struct osc_brw_async_args *aa)
1466 {
1467         struct ptlrpc_request *new_req;
1468         struct ptlrpc_request_set *set = request->rq_set;
1469         struct osc_brw_async_args *new_aa;
1470         struct osc_async_page *oap;
1471         int rc = 0;
1472         ENTRY;
1473
1474         if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1475                 CERROR("too many resend retries, returning error\n");
1476                 RETURN(-EIO);
1477         }
1478
1479         DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1480 /*
1481         body = lustre_msg_buf(request->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
1482         if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
1483                 ocapa = lustre_unpack_capa(request->rq_reqmsg,
1484                                            REQ_REC_OFF + 3);
1485 */
1486         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1487                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1488                                   aa->aa_cli, aa->aa_oa,
1489                                   NULL /* lsm unused by osc currently */,
1490                                   aa->aa_page_count, aa->aa_ppga,
1491                                   &new_req, NULL /* ocapa */);
1492         if (rc)
1493                 RETURN(rc);
1494
1495         client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1496
1497         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1498                 if (oap->oap_request != NULL) {
1499                         LASSERTF(request == oap->oap_request,
1500                                  "request %p != oap_request %p\n",
1501                                  request, oap->oap_request);
1502                         if (oap->oap_interrupted) {
1503                                 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1504                                 ptlrpc_req_finished(new_req);
1505                                 RETURN(-EINTR);
1506                         }
1507                 }
1508         }
1509         /* New request takes over pga and oaps from old request.
1510          * Note that copying a list_head doesn't work, need to move it... */
1511         aa->aa_resends++;
1512         new_req->rq_interpret_reply = request->rq_interpret_reply;
1513         new_req->rq_async_args = request->rq_async_args;
1514         new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1515
1516         new_aa = ptlrpc_req_async_args(new_req);
1517
1518         CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1519         list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1520         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1521
1522         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1523                 if (oap->oap_request) {
1524                         ptlrpc_req_finished(oap->oap_request);
1525                         oap->oap_request = ptlrpc_request_addref(new_req);
1526                 }
1527         }
1528
1529         /* use ptlrpc_set_add_req is safe because interpret functions work
1530          * in check_set context. only one way exist with access to request
1531          * from different thread got -EINTR - this way protected with
1532          * cl_loi_list_lock */
1533         ptlrpc_set_add_req(set, new_req);
1534
1535         client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1536
1537         DEBUG_REQ(D_INFO, new_req, "new request");
1538         RETURN(0);
1539 }
1540
1541 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1542                           struct lov_stripe_md *lsm, obd_count page_count,
1543                           struct brw_page **pga, struct ptlrpc_request_set *set,
1544                           struct obd_capa *ocapa)
1545 {
1546         struct ptlrpc_request     *req;
1547         struct client_obd         *cli = &exp->exp_obd->u.cli;
1548         int                        rc, i;
1549         struct osc_brw_async_args *aa;
1550         ENTRY;
1551
1552         /* Consume write credits even if doing a sync write -
1553          * otherwise we may run out of space on OST due to grant. */
1554         if (cmd == OBD_BRW_WRITE) {
1555                 spin_lock(&cli->cl_loi_list_lock);
1556                 for (i = 0; i < page_count; i++) {
1557                         if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
1558                                 osc_consume_write_grant(cli, pga[i]);
1559                 }
1560                 spin_unlock(&cli->cl_loi_list_lock);
1561         }
1562
1563         rc = osc_brw_prep_request(cmd, cli, oa, lsm, page_count, pga,
1564                                   &req, ocapa);
1565
1566         aa = ptlrpc_req_async_args(req);
1567         if (cmd == OBD_BRW_READ) {
1568                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1569                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1570         } else {
1571                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1572                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
1573                                  cli->cl_w_in_flight);
1574         }
1575         ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
1576
1577         LASSERT(list_empty(&aa->aa_oaps));
1578         if (rc == 0) {
1579                 req->rq_interpret_reply = brw_interpret;
1580                 ptlrpc_set_add_req(set, req);
1581                 client_obd_list_lock(&cli->cl_loi_list_lock);
1582                 if (cmd == OBD_BRW_READ)
1583                         cli->cl_r_in_flight++;
1584                 else
1585                         cli->cl_w_in_flight++;
1586                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1587                 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DIO_PAUSE, 3);
1588         } else if (cmd == OBD_BRW_WRITE) {
1589                 client_obd_list_lock(&cli->cl_loi_list_lock);
1590                 for (i = 0; i < page_count; i++)
1591                         osc_release_write_grant(cli, pga[i], 0);
1592                 osc_wake_cache_waiters(cli);
1593                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1594         }
1595         RETURN (rc);
1596 }
1597
1598 /*
1599  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1600  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1601  * fine for our small page arrays and doesn't require allocation.  its an
1602  * insertion sort that swaps elements that are strides apart, shrinking the
1603  * stride down until its '1' and the array is sorted.
1604  */
1605 static void sort_brw_pages(struct brw_page **array, int num)
1606 {
1607         int stride, i, j;
1608         struct brw_page *tmp;
1609
1610         if (num == 1)
1611                 return;
1612         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1613                 ;
1614
1615         do {
1616                 stride /= 3;
1617                 for (i = stride ; i < num ; i++) {
1618                         tmp = array[i];
1619                         j = i;
1620                         while (j >= stride && array[j - stride]->off > tmp->off) {
1621                                 array[j] = array[j - stride];
1622                                 j -= stride;
1623                         }
1624                         array[j] = tmp;
1625                 }
1626         } while (stride > 1);
1627 }
1628
1629 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1630 {
1631         int count = 1;
1632         int offset;
1633         int i = 0;
1634
1635         LASSERT (pages > 0);
1636         offset = pg[i]->off & ~CFS_PAGE_MASK;
1637
1638         for (;;) {
1639                 pages--;
1640                 if (pages == 0)         /* that's all */
1641                         return count;
1642
1643                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1644                         return count;   /* doesn't end on page boundary */
1645
1646                 i++;
1647                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1648                 if (offset != 0)        /* doesn't start on page boundary */
1649                         return count;
1650
1651                 count++;
1652         }
1653 }
1654
1655 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1656 {
1657         struct brw_page **ppga;
1658         int i;
1659
1660         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1661         if (ppga == NULL)
1662                 return NULL;
1663
1664         for (i = 0; i < count; i++)
1665                 ppga[i] = pga + i;
1666         return ppga;
1667 }
1668
1669 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1670 {
1671         LASSERT(ppga != NULL);
1672         OBD_FREE(ppga, sizeof(*ppga) * count);
1673 }
1674
1675 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1676                    obd_count page_count, struct brw_page *pga,
1677                    struct obd_trans_info *oti)
1678 {
1679         struct obdo *saved_oa = NULL;
1680         struct brw_page **ppga, **orig;
1681         struct obd_import *imp = class_exp2cliimp(exp);
1682         struct client_obd *cli = &imp->imp_obd->u.cli;
1683         int rc, page_count_orig;
1684         ENTRY;
1685
1686         if (cmd & OBD_BRW_CHECK) {
1687                 /* The caller just wants to know if there's a chance that this
1688                  * I/O can succeed */
1689
1690                 if (imp == NULL || imp->imp_invalid)
1691                         RETURN(-EIO);
1692                 RETURN(0);
1693         }
1694
1695         /* test_brw with a failed create can trip this, maybe others. */
1696         LASSERT(cli->cl_max_pages_per_rpc);
1697
1698         rc = 0;
1699
1700         orig = ppga = osc_build_ppga(pga, page_count);
1701         if (ppga == NULL)
1702                 RETURN(-ENOMEM);
1703         page_count_orig = page_count;
1704
1705         sort_brw_pages(ppga, page_count);
1706         while (page_count) {
1707                 obd_count pages_per_brw;
1708
1709                 if (page_count > cli->cl_max_pages_per_rpc)
1710                         pages_per_brw = cli->cl_max_pages_per_rpc;
1711                 else
1712                         pages_per_brw = page_count;
1713
1714                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1715
1716                 if (saved_oa != NULL) {
1717                         /* restore previously saved oa */
1718                         *oinfo->oi_oa = *saved_oa;
1719                 } else if (page_count > pages_per_brw) {
1720                         /* save a copy of oa (brw will clobber it) */
1721                         OBDO_ALLOC(saved_oa);
1722                         if (saved_oa == NULL)
1723                                 GOTO(out, rc = -ENOMEM);
1724                         *saved_oa = *oinfo->oi_oa;
1725                 }
1726
1727                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1728                                       pages_per_brw, ppga, oinfo->oi_capa);
1729
1730                 if (rc != 0)
1731                         break;
1732
1733                 page_count -= pages_per_brw;
1734                 ppga += pages_per_brw;
1735         }
1736
1737 out:
1738         osc_release_ppga(orig, page_count_orig);
1739
1740         if (saved_oa != NULL)
1741                 OBDO_FREE(saved_oa);
1742
1743         RETURN(rc);
1744 }
1745
1746 static int osc_brw_async(int cmd, struct obd_export *exp,
1747                          struct obd_info *oinfo, obd_count page_count,
1748                          struct brw_page *pga, struct obd_trans_info *oti,
1749                          struct ptlrpc_request_set *set)
1750 {
1751         struct brw_page **ppga, **orig;
1752         struct client_obd *cli = &exp->exp_obd->u.cli;
1753         int page_count_orig;
1754         int rc = 0;
1755         ENTRY;
1756
1757         if (cmd & OBD_BRW_CHECK) {
1758                 struct obd_import *imp = class_exp2cliimp(exp);
1759                 /* The caller just wants to know if there's a chance that this
1760                  * I/O can succeed */
1761
1762                 if (imp == NULL || imp->imp_invalid)
1763                         RETURN(-EIO);
1764                 RETURN(0);
1765         }
1766
1767         orig = ppga = osc_build_ppga(pga, page_count);
1768         if (ppga == NULL)
1769                 RETURN(-ENOMEM);
1770         page_count_orig = page_count;
1771
1772         sort_brw_pages(ppga, page_count);
1773         while (page_count) {
1774                 struct brw_page **copy;
1775                 obd_count pages_per_brw;
1776
1777                 pages_per_brw = min_t(obd_count, page_count,
1778                                       cli->cl_max_pages_per_rpc);
1779
1780                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1781
1782                 /* use ppga only if single RPC is going to fly */
1783                 if (pages_per_brw != page_count_orig || ppga != orig) {
1784                         OBD_ALLOC(copy, sizeof(*copy) * pages_per_brw);
1785                         if (copy == NULL)
1786                                 GOTO(out, rc = -ENOMEM);
1787                         memcpy(copy, ppga, sizeof(*copy) * pages_per_brw);
1788                 } else
1789                         copy = ppga;
1790
1791                 rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1792                                     pages_per_brw, copy, set, oinfo->oi_capa);
1793
1794                 if (rc != 0) {
1795                         if (copy != ppga)
1796                                 OBD_FREE(copy, sizeof(*copy) * pages_per_brw);
1797                         break;
1798                 }
1799                 if (copy == orig) {
1800                         /* we passed it to async_internal() which is
1801                          * now responsible for releasing memory */
1802                         orig = NULL;
1803                 }
1804
1805                 page_count -= pages_per_brw;
1806                 ppga += pages_per_brw;
1807         }
1808 out:
1809         if (orig)
1810                 osc_release_ppga(orig, page_count_orig);
1811         RETURN(rc);
1812 }
1813
1814 static void osc_check_rpcs(struct client_obd *cli);
1815
1816 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1817  * the dirty accounting.  Writeback completes or truncate happens before
1818  * writing starts.  Must be called with the loi lock held. */
1819 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1820                            int sent)
1821 {
1822         osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1823 }
1824
1825
1826 /* This maintains the lists of pending pages to read/write for a given object
1827  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1828  * to quickly find objects that are ready to send an RPC. */
1829 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1830                          int cmd)
1831 {
1832         int optimal;
1833         ENTRY;
1834
1835         if (lop->lop_num_pending == 0)
1836                 RETURN(0);
1837
1838         /* if we have an invalid import we want to drain the queued pages
1839          * by forcing them through rpcs that immediately fail and complete
1840          * the pages.  recovery relies on this to empty the queued pages
1841          * before canceling the locks and evicting down the llite pages */
1842         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1843                 RETURN(1);
1844
1845         /* stream rpcs in queue order as long as as there is an urgent page
1846          * queued.  this is our cheap solution for good batching in the case
1847          * where writepage marks some random page in the middle of the file
1848          * as urgent because of, say, memory pressure */
1849         if (!list_empty(&lop->lop_urgent)) {
1850                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1851                 RETURN(1);
1852         }
1853         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1854         optimal = cli->cl_max_pages_per_rpc;
1855         if (cmd & OBD_BRW_WRITE) {
1856                 /* trigger a write rpc stream as long as there are dirtiers
1857                  * waiting for space.  as they're waiting, they're not going to
1858                  * create more pages to coallesce with what's waiting.. */
1859                 if (!list_empty(&cli->cl_cache_waiters)) {
1860                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1861                         RETURN(1);
1862                 }
1863                 /* +16 to avoid triggering rpcs that would want to include pages
1864                  * that are being queued but which can't be made ready until
1865                  * the queuer finishes with the page. this is a wart for
1866                  * llite::commit_write() */
1867                 optimal += 16;
1868         }
1869         if (lop->lop_num_pending >= optimal)
1870                 RETURN(1);
1871
1872         RETURN(0);
1873 }
1874
/* Make the list membership of @item match @should_be_on: append it to @list
 * if it is currently unlinked and should be on, unlink it if it is currently
 * linked and should not be.  Nothing happens when the state already matches. */
static void on_list(struct list_head *item, struct list_head *list,
                    int should_be_on)
{
        int linked = !list_empty(item);

        if (should_be_on && !linked)
                list_add_tail(item, list);
        else if (!should_be_on && linked)
                list_del_init(item);
}
1883
1884 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1885  * can find pages to build into rpcs quickly */
1886 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1887 {
1888         on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
1889                 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
1890                 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1891
1892         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1893                 loi->loi_write_lop.lop_num_pending);
1894
1895         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1896                 loi->loi_read_lop.lop_num_pending);
1897 }
1898
1899 static void lop_update_pending(struct client_obd *cli,
1900                                struct loi_oap_pages *lop, int cmd, int delta)
1901 {
1902         lop->lop_num_pending += delta;
1903         if (cmd & OBD_BRW_WRITE)
1904                 cli->cl_pending_w_pages += delta;
1905         else
1906                 cli->cl_pending_r_pages += delta;
1907 }
1908
1909 /* this is called when a sync waiter receives an interruption.  Its job is to
1910  * get the caller woken as soon as possible.  If its page hasn't been put in an
1911  * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
1912  * desiring interruption which will forcefully complete the rpc once the rpc
1913  * has timed out */
1914 static void osc_occ_interrupted(struct oig_callback_context *occ)
1915 {
1916         struct osc_async_page *oap;
1917         struct loi_oap_pages *lop;
1918         struct lov_oinfo *loi;
1919         ENTRY;
1920
1921         /* XXX member_of() */
1922         oap = list_entry(occ, struct osc_async_page, oap_occ);
1923
1924         client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);
1925
1926         oap->oap_interrupted = 1;
1927
1928         /* ok, it's been put in an rpc. only one oap gets a request reference */
1929         if (oap->oap_request != NULL) {
1930                 ptlrpc_mark_interrupted(oap->oap_request);
1931                 ptlrpcd_wake(oap->oap_request);
1932                 GOTO(unlock, 0);
1933         }
1934
1935         /* we don't get interruption callbacks until osc_trigger_group_io()
1936          * has been called and put the sync oaps in the pending/urgent lists.*/
1937         if (!list_empty(&oap->oap_pending_item)) {
1938                 list_del_init(&oap->oap_pending_item);
1939                 list_del_init(&oap->oap_urgent_item);
1940
1941                 loi = oap->oap_loi;
1942                 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
1943                         &loi->loi_write_lop : &loi->loi_read_lop;
1944                 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
1945                 loi_list_maint(oap->oap_cli, oap->oap_loi);
1946
1947                 oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
1948                 oap->oap_oig = NULL;
1949         }
1950
1951 unlock:
1952         client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
1953 }
1954
1955 /* this is trying to propogate async writeback errors back up to the
1956  * application.  As an async write fails we record the error code for later if
1957  * the app does an fsync.  As long as errors persist we force future rpcs to be
1958  * sync so that the app can get a sync error and break the cycle of queueing
1959  * pages for which writeback will fail. */
1960 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
1961                            int rc)
1962 {
1963         if (rc) {
1964                 if (!ar->ar_rc)
1965                         ar->ar_rc = rc;
1966
1967                 ar->ar_force_sync = 1;
1968                 ar->ar_min_xid = ptlrpc_sample_next_xid();
1969                 return;
1970
1971         }
1972
1973         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
1974                 ar->ar_force_sync = 0;
1975 }
1976
1977 static void osc_oap_to_pending(struct osc_async_page *oap)
1978 {
1979         struct loi_oap_pages *lop;
1980
1981         if (oap->oap_cmd & OBD_BRW_WRITE)
1982                 lop = &oap->oap_loi->loi_write_lop;
1983         else
1984                 lop = &oap->oap_loi->loi_read_lop;
1985
1986         if (oap->oap_async_flags & ASYNC_URGENT)
1987                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1988         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1989         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
1990 }
1991
1992 /* this must be called holding the loi list lock to give coverage to exit_cache,
1993  * async_flag maintenance, and oap_request */
1994 static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
1995                               struct osc_async_page *oap, int sent, int rc)
1996 {
1997         __u64 xid = 0;
1998
1999         ENTRY;
2000         if (oap->oap_request != NULL) {
2001                 xid = ptlrpc_req_xid(oap->oap_request);
2002                 ptlrpc_req_finished(oap->oap_request);
2003                 oap->oap_request = NULL;
2004         }
2005
2006         oap->oap_async_flags = 0;
2007         oap->oap_interrupted = 0;
2008
2009         if (oap->oap_cmd & OBD_BRW_WRITE) {
2010                 osc_process_ar(&cli->cl_ar, xid, rc);
2011                 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
2012         }
2013
2014         if (rc == 0 && oa != NULL) {
2015                 if (oa->o_valid & OBD_MD_FLBLOCKS)
2016                         oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2017                 if (oa->o_valid & OBD_MD_FLMTIME)
2018                         oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2019                 if (oa->o_valid & OBD_MD_FLATIME)
2020                         oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2021                 if (oa->o_valid & OBD_MD_FLCTIME)
2022                         oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2023         }
2024
2025         if (oap->oap_oig) {
2026                 osc_exit_cache(cli, oap, sent);
2027                 oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
2028                 oap->oap_oig = NULL;
2029                 EXIT;
2030                 return;
2031         }
2032
2033         rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
2034                                                 oap->oap_cmd, oa, rc);
2035
2036         /* ll_ap_completion (from llite) drops PG_locked. so, a new
2037          * I/O on the page could start, but OSC calls it under lock
2038          * and thus we can add oap back to pending safely */
2039         if (rc)
2040                 /* upper layer wants to leave the page on pending queue */
2041                 osc_oap_to_pending(oap);
2042         else
2043                 osc_exit_cache(cli, oap, sent);
2044         EXIT;
2045 }
2046
2047 static int brw_interpret(const struct lu_env *env,
2048                          struct ptlrpc_request *req, void *data, int rc)
2049 {
2050         struct osc_brw_async_args *aa = data;
2051         struct client_obd *cli;
2052         ENTRY;
2053
2054         rc = osc_brw_fini_request(req, rc);
2055         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2056         if (osc_recoverable_error(rc)) {
2057                 rc = osc_brw_redo_request(req, aa);
2058                 if (rc == 0)
2059                         RETURN(0);
2060         }
2061
2062         cli = aa->aa_cli;
2063
2064         client_obd_list_lock(&cli->cl_loi_list_lock);
2065
2066         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2067          * is called so we know whether to go to sync BRWs or wait for more
2068          * RPCs to complete */
2069         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2070                 cli->cl_w_in_flight--;
2071         else
2072                 cli->cl_r_in_flight--;
2073
2074         if (!list_empty(&aa->aa_oaps)) { /* from osc_send_oap_rpc() */
2075                 struct osc_async_page *oap, *tmp;
2076                 /* the caller may re-use the oap after the completion call so
2077                  * we need to clean it up a little */
2078                 list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
2079                         list_del_init(&oap->oap_rpc_item);
2080                         osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
2081                 }
2082                 OBDO_FREE(aa->aa_oa);
2083         } else { /* from async_internal() */
2084                 int i;
2085                 for (i = 0; i < aa->aa_page_count; i++)
2086                         osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
2087         }
2088         osc_wake_cache_waiters(cli);
2089         osc_check_rpcs(cli);
2090         client_obd_list_unlock(&cli->cl_loi_list_lock);
2091
2092         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2093         RETURN(rc);
2094 }
2095
2096 static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
2097                                             struct list_head *rpc_list,
2098                                             int page_count, int cmd)
2099 {
2100         struct ptlrpc_request *req;
2101         struct brw_page **pga = NULL;
2102         struct osc_brw_async_args *aa;
2103         struct obdo *oa = NULL;
2104         struct obd_async_page_ops *ops = NULL;
2105         void *caller_data = NULL;
2106         struct obd_capa *ocapa;
2107         struct osc_async_page *oap;
2108         struct ldlm_lock *lock = NULL;
2109         int i, rc;
2110
2111         ENTRY;
2112         LASSERT(!list_empty(rpc_list));
2113
2114         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2115         if (pga == NULL)
2116                 RETURN(ERR_PTR(-ENOMEM));
2117
2118         OBDO_ALLOC(oa);
2119         if (oa == NULL)
2120                 GOTO(out, req = ERR_PTR(-ENOMEM));
2121
2122         i = 0;
2123         list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2124                 if (ops == NULL) {
2125                         ops = oap->oap_caller_ops;
2126                         caller_data = oap->oap_caller_data;
2127                         lock = oap->oap_ldlm_lock;
2128                 }
2129                 pga[i] = &oap->oap_brw_page;
2130                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2131                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2132                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2133                 i++;
2134         }
2135
2136         /* always get the data for the obdo for the rpc */
2137         LASSERT(ops != NULL);
2138         ops->ap_fill_obdo(caller_data, cmd, oa);
2139         ocapa = ops->ap_lookup_capa(caller_data, cmd);
2140         if (lock) {
2141                 oa->o_handle = lock->l_remote_handle;
2142                 oa->o_valid |= OBD_MD_FLHANDLE;
2143         }
2144
2145         sort_brw_pages(pga, page_count);
2146         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2147                                   pga, &req, ocapa);
2148         capa_put(ocapa);
2149         if (rc != 0) {
2150                 CERROR("prep_req failed: %d\n", rc);
2151                 GOTO(out, req = ERR_PTR(rc));
2152         }
2153
2154         /* Need to update the timestamps after the request is built in case
2155          * we race with setattr (locally or in queue at OST).  If OST gets
2156          * later setattr before earlier BRW (as determined by the request xid),
2157          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2158          * way to do this in a single call.  bug 10150 */
2159         ops->ap_update_obdo(caller_data, cmd, oa,
2160                             OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);
2161
2162         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2163         aa = ptlrpc_req_async_args(req);
2164         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2165         list_splice(rpc_list, &aa->aa_oaps);
2166         CFS_INIT_LIST_HEAD(rpc_list);
2167
2168 out:
2169         if (IS_ERR(req)) {
2170                 if (oa)
2171                         OBDO_FREE(oa);
2172                 if (pga)
2173                         OBD_FREE(pga, sizeof(*pga) * page_count);
2174         }
2175         RETURN(req);
2176 }
2177
2178 /* the loi lock is held across this function but it's allowed to release
2179  * and reacquire it during its work */
2180 /**
2181  * prepare pages for ASYNC io and put pages in send queue.
2182  *
2183  * \param cli -
2184  * \param loi -
2185  * \param cmd - OBD_BRW_* macros
2186  * \param lop - pending pages
2187  *
2188  * \return zero if the pages were successfully added to the send queue.
2189  * \return non-zero if an error occurred.
2190  */
2191 static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
2192                             int cmd, struct loi_oap_pages *lop)
2193 {
2194         struct ptlrpc_request *req;
2195         obd_count page_count = 0;
2196         struct osc_async_page *oap = NULL, *tmp;
2197         struct osc_brw_async_args *aa;
2198         struct obd_async_page_ops *ops;
2199         CFS_LIST_HEAD(rpc_list);
2200         unsigned int ending_offset;
2201         unsigned  starting_offset = 0;
2202         int srvlock = 0;
2203         ENTRY;
2204
2205         /* first we find the pages we're allowed to work with */
2206         list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2207                                  oap_pending_item) {
2208                 ops = oap->oap_caller_ops;
2209
2210                 LASSERT(oap->oap_magic == OAP_MAGIC);
2211
2212                 if (page_count != 0 &&
2213                     srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2214                         CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2215                                " oap %p, page %p, srvlock %u\n",
2216                                oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2217                         break;
2218                 }
2219                 /* in llite being 'ready' equates to the page being locked
2220                  * until completion unlocks it.  commit_write submits a page
2221                  * as not ready because its unlock will happen unconditionally
2222                  * as the call returns.  if we race with commit_write giving
2223                  * us that page we dont' want to create a hole in the page
2224                  * stream, so we stop and leave the rpc to be fired by
2225                  * another dirtier or kupdated interval (the not ready page
2226                  * will still be on the dirty list).  we could call in
2227                  * at the end of ll_file_write to process the queue again. */
2228                 if (!(oap->oap_async_flags & ASYNC_READY)) {
2229                         int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
2230                         if (rc < 0)
2231                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
2232                                                 "instead of ready\n", oap,
2233                                                 oap->oap_page, rc);
2234                         switch (rc) {
2235                         case -EAGAIN:
2236                                 /* llite is telling us that the page is still
2237                                  * in commit_write and that we should try
2238                                  * and put it in an rpc again later.  we
2239                                  * break out of the loop so we don't create
2240                                  * a hole in the sequence of pages in the rpc
2241                                  * stream.*/
2242                                 oap = NULL;
2243                                 break;
2244                         case -EINTR:
2245                                 /* the io isn't needed.. tell the checks
2246                                  * below to complete the rpc with EINTR */
2247                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2248                                 oap->oap_count = -EINTR;
2249                                 break;
2250                         case 0:
2251                                 oap->oap_async_flags |= ASYNC_READY;
2252                                 break;
2253                         default:
2254                                 LASSERTF(0, "oap %p page %p returned %d "
2255                                             "from make_ready\n", oap,
2256                                             oap->oap_page, rc);
2257                                 break;
2258                         }
2259                 }
2260                 if (oap == NULL)
2261                         break;
2262                 /*
2263                  * Page submitted for IO has to be locked. Either by
2264                  * ->ap_make_ready() or by higher layers.
2265                  */
2266 #if defined(__KERNEL__) && defined(__linux__)
2267                  if(!(PageLocked(oap->oap_page) &&
2268                      (CheckWriteback(oap->oap_page, cmd) || oap->oap_oig !=NULL))) {
2269                         CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2270                                oap->oap_page, (long)oap->oap_page->flags, oap->oap_async_flags);
2271                         LBUG();
2272                 }
2273 #endif
2274                 /* If there is a gap at the start of this page, it can't merge
2275                  * with any previous page, so we'll hand the network a
2276                  * "fragmented" page array that it can't transfer in 1 RDMA */
2277                 if (page_count != 0 && oap->oap_page_off != 0)
2278                         break;
2279
2280                 /* take the page out of our book-keeping */
2281                 list_del_init(&oap->oap_pending_item);
2282                 lop_update_pending(cli, lop, cmd, -1);
2283                 list_del_init(&oap->oap_urgent_item);
2284
2285                 if (page_count == 0)
2286                         starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2287                                           (PTLRPC_MAX_BRW_SIZE - 1);
2288
2289                 /* ask the caller for the size of the io as the rpc leaves. */
2290                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
2291                         oap->oap_count =
2292                                 ops->ap_refresh_count(oap->oap_caller_data,cmd);
2293                 if (oap->oap_count <= 0) {
2294                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2295                                oap->oap_count);
2296                         osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
2297                         continue;
2298                 }
2299
2300                 /* now put the page back in our accounting */
2301                 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2302                 if (page_count == 0)
2303                         srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2304                 if (++page_count >= cli->cl_max_pages_per_rpc)
2305                         break;
2306
2307                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
2308                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2309                  * have the same alignment as the initial writes that allocated
2310                  * extents on the server. */
2311                 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2312                                  oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2313                 if (ending_offset == 0)
2314                         break;
2315
2316                 /* If there is a gap at the end of this page, it can't merge
2317                  * with any subsequent pages, so we'll hand the network a
2318                  * "fragmented" page array that it can't transfer in 1 RDMA */
2319                 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2320                         break;
2321         }
2322
2323         osc_wake_cache_waiters(cli);
2324
2325         if (page_count == 0)
2326                 RETURN(0);
2327
2328         loi_list_maint(cli, loi);
2329
2330         client_obd_list_unlock(&cli->cl_loi_list_lock);
2331
2332         req = osc_build_req(cli, &rpc_list, page_count, cmd);
2333         if (IS_ERR(req)) {
2334                 /* this should happen rarely and is pretty bad, it makes the
2335                  * pending list not follow the dirty order */
2336                 client_obd_list_lock(&cli->cl_loi_list_lock);
2337                 list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
2338                         list_del_init(&oap->oap_rpc_item);
2339
2340                         /* queued sync pages can be torn down while the pages
2341                          * were between the pending list and the rpc */
2342                         if (oap->oap_interrupted) {
2343                                 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2344                                 osc_ap_completion(cli, NULL, oap, 0,
2345                                                   oap->oap_count);
2346                                 continue;
2347                         }
2348                         osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
2349                 }
2350                 loi_list_maint(cli, loi);
2351                 RETURN(PTR_ERR(req));
2352         }
2353
2354         aa = ptlrpc_req_async_args(req);
2355
2356         if (cmd == OBD_BRW_READ) {
2357                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2358                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2359                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2360                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2361         } else {
2362                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2363                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2364                                  cli->cl_w_in_flight);
2365                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2366                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2367         }
2368         ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2369
2370         client_obd_list_lock(&cli->cl_loi_list_lock);
2371
2372         if (cmd == OBD_BRW_READ)
2373                 cli->cl_r_in_flight++;
2374         else
2375                 cli->cl_w_in_flight++;
2376
2377         /* queued sync pages can be torn down while the pages
2378          * were between the pending list and the rpc */
2379         tmp = NULL;
2380         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2381                 /* only one oap gets a request reference */
2382                 if (tmp == NULL)
2383                         tmp = oap;
2384                 if (oap->oap_interrupted && !req->rq_intr) {
2385                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2386                                oap, req);
2387                         ptlrpc_mark_interrupted(req);
2388                 }
2389         }
2390         if (tmp != NULL)
2391                 tmp->oap_request = ptlrpc_request_addref(req);
2392
2393         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2394                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2395
2396         req->rq_interpret_reply = brw_interpret;
2397         ptlrpcd_add_req(req);
2398         RETURN(1);
2399 }
2400
/* Trace the queueing state of an object.  The fixed prefix reports whether
 * the object is linked through loi_cli_item (the ready-list linkage), then
 * for writes and for reads the pending page count and whether the urgent
 * list is non-empty.  STR and any further arguments are appended to it.
 *
 * "## args" swallows the preceding comma when no extra arguments are given,
 * and the definition ends on its last line instead of continuing onto the
 * following blank line. */
#define LOI_DEBUG(LOI, STR, args...)                                     \
        CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
               !list_empty(&(LOI)->loi_cli_item),                        \
               (LOI)->loi_write_lop.lop_num_pending,                     \
               !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
               (LOI)->loi_read_lop.lop_num_pending,                      \
               !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
               ## args)
2409
2410 /* This is called by osc_check_rpcs() to find which objects have pages that
2411  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2412 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2413 {
2414         ENTRY;
2415         /* first return all objects which we already know to have
2416          * pages ready to be stuffed into rpcs */
2417         if (!list_empty(&cli->cl_loi_ready_list))
2418                 RETURN(list_entry(cli->cl_loi_ready_list.next,
2419                                   struct lov_oinfo, loi_cli_item));
2420
2421         /* then if we have cache waiters, return all objects with queued
2422          * writes.  This is especially important when many small files
2423          * have filled up the cache and not been fired into rpcs because
2424          * they don't pass the nr_pending/object threshhold */
2425         if (!list_empty(&cli->cl_cache_waiters) &&
2426             !list_empty(&cli->cl_loi_write_list))
2427                 RETURN(list_entry(cli->cl_loi_write_list.next,
2428                                   struct lov_oinfo, loi_write_item));
2429
2430         /* then return all queued objects when we have an invalid import
2431          * so that they get flushed */
2432         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2433                 if (!list_empty(&cli->cl_loi_write_list))
2434                         RETURN(list_entry(cli->cl_loi_write_list.next,
2435                                           struct lov_oinfo, loi_write_item));
2436                 if (!list_empty(&cli->cl_loi_read_list))
2437                         RETURN(list_entry(cli->cl_loi_read_list.next,
2438                                           struct lov_oinfo, loi_read_item));
2439         }
2440         RETURN(NULL);
2441 }
2442
2443 /* called with the loi list lock held */
2444 static void osc_check_rpcs(struct client_obd *cli)
2445 {
2446         struct lov_oinfo *loi;
2447         int rc = 0, race_counter = 0;
2448         ENTRY;
2449
2450         while ((loi = osc_next_loi(cli)) != NULL) {
2451                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2452
2453                 if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
2454                         break;
2455
2456                 /* attempt some read/write balancing by alternating between
2457                  * reads and writes in an object.  The makes_rpc checks here
2458                  * would be redundant if we were getting read/write work items
2459                  * instead of objects.  we don't want send_oap_rpc to drain a
2460                  * partial read pending queue when we're given this object to
2461                  * do io on writes while there are cache waiters */
2462                 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2463                         rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
2464                                               &loi->loi_write_lop);
2465                         if (rc < 0)
2466                                 break;
2467                         if (rc > 0)
2468                                 race_counter = 0;
2469                         else
2470                                 race_counter++;
2471                 }
2472                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2473                         rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
2474                                               &loi->loi_read_lop);
2475                         if (rc < 0)
2476                                 break;
2477                         if (rc > 0)
2478                                 race_counter = 0;
2479                         else
2480                                 race_counter++;
2481                 }
2482
2483                 /* attempt some inter-object balancing by issueing rpcs
2484                  * for each object in turn */
2485                 if (!list_empty(&loi->loi_cli_item))
2486                         list_del_init(&loi->loi_cli_item);
2487                 if (!list_empty(&loi->loi_write_item))
2488                         list_del_init(&loi->loi_write_item);
2489                 if (!list_empty(&loi->loi_read_item))
2490                         list_del_init(&loi->loi_read_item);
2491
2492                 loi_list_maint(cli, loi);
2493
2494                 /* send_oap_rpc fails with 0 when make_ready tells it to
2495                  * back off.  llite's make_ready does this when it tries
2496                  * to lock a page queued for write that is already locked.
2497                  * we want to try sending rpcs from many objects, but we
2498                  * don't want to spin failing with 0.  */
2499                 if (race_counter == 10)
2500                         break;
2501         }
2502         EXIT;
2503 }
2504
2505 /* we're trying to queue a page in the osc so we're subject to the
2506  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2507  * If the osc's queued pages are already at that limit, then we want to sleep
2508  * until there is space in the osc's queue for us.  We also may be waiting for
2509  * write credits from the OST if there are RPCs in flight that may return some
2510  * before we fall back to sync writes.
2511  *
2512  * We need this know our allocation was granted in the presence of signals */
2513 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2514 {
2515         int rc;
2516         ENTRY;
2517         client_obd_list_lock(&cli->cl_loi_list_lock);
2518         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2519         client_obd_list_unlock(&cli->cl_loi_list_lock);
2520         RETURN(rc);
2521 };
2522
2523 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2524  * grant or cache space. */
2525 static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
2526                            struct osc_async_page *oap)
2527 {
2528         struct osc_cache_waiter ocw;
2529         struct l_wait_info lwi = { 0 };
2530
2531         ENTRY;
2532
2533         CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2534                "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2535                cli->cl_dirty_max, obd_max_dirty_pages,
2536                cli->cl_lost_grant, cli->cl_avail_grant);
2537
2538         /* force the caller to try sync io.  this can jump the list
2539          * of queued writes and create a discontiguous rpc stream */
2540         if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2541             loi->loi_ar.ar_force_sync)
2542                 RETURN(-EDQUOT);
2543
2544         /* Hopefully normal case - cache space and write credits available */
2545         if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
2546             (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
2547             (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
2548                 /* account for ourselves */
2549                 osc_consume_write_grant(cli, &oap->oap_brw_page);
2550                 RETURN(0);
2551         }
2552
2553         /* Make sure that there are write rpcs in flight to wait for.  This
2554          * is a little silly as this object may not have any pending but
2555          * other objects sure might. */
2556         if (cli->cl_w_in_flight) {
2557                 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2558                 cfs_waitq_init(&ocw.ocw_waitq);
2559                 ocw.ocw_oap = oap;
2560                 ocw.ocw_rc = 0;
2561
2562                 loi_list_maint(cli, loi);
2563                 osc_check_rpcs(cli);
2564                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2565
2566                 CDEBUG(D_CACHE, "sleeping for cache space\n");
2567                 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2568
2569                 client_obd_list_lock(&cli->cl_loi_list_lock);
2570                 if (!list_empty(&ocw.ocw_entry)) {
2571                         list_del(&ocw.ocw_entry);
2572                         RETURN(-EINTR);
2573                 }
2574                 RETURN(ocw.ocw_rc);
2575         }
2576
2577         RETURN(-EDQUOT);
2578 }
2579
2580 /**
2581  * Checks if requested extent lock is compatible with a lock under the page.
2582  *
2583  * Checks if the lock under \a page is compatible with a read or write lock
2584  * (specified by \a rw) for an extent [\a start , \a end].
2585  *
2586  * \param exp osc export
2587  * \param lsm striping information for the file
2588  * \param res osc_async_page placeholder
2589  * \param rw OBD_BRW_READ if requested for reading,
2590  *           OBD_BRW_WRITE if requested for writing
2591  * \param start start of the requested extent
2592  * \param end end of the requested extent
2593  * \param cookie transparent parameter for passing locking context
2594  *
2595  * \post result == 1, *cookie == context, appropriate lock is referenced or
2596  * \post result == 0
2597  *
2598  * \retval 1 owned lock is reused for the request
2599  * \retval 0 no lock reused for the request
2600  *
2601  * \see osc_release_short_lock
2602  */
2603 static int osc_reget_short_lock(struct obd_export *exp,
2604                                 struct lov_stripe_md *lsm,
2605                                 void **res, int rw,
2606                                 obd_off start, obd_off end,
2607                                 void **cookie)
2608 {
2609         struct osc_async_page *oap = *res;
2610         int rc;
2611
2612         ENTRY;
2613
2614         spin_lock(&oap->oap_lock);
2615         rc = ldlm_lock_fast_match(oap->oap_ldlm_lock, rw,
2616                                   start, end, cookie);
2617         spin_unlock(&oap->oap_lock);
2618
2619         RETURN(rc);
2620 }
2621
2622 /**
2623  * Releases a reference to a lock taken in a "fast" way.
2624  *
2625  * Releases a read or a write (specified by \a rw) lock
2626  * referenced by \a cookie.
2627  *
2628  * \param exp osc export
2629  * \param lsm striping information for the file
2630  * \param end end of the locked extent
2631  * \param rw OBD_BRW_READ if requested for reading,
2632  *           OBD_BRW_WRITE if requested for writing
2633  * \param cookie transparent parameter for passing locking context
2634  *
2635  * \post appropriate lock is dereferenced
2636  *
2637  * \see osc_reget_short_lock
2638  */
2639 static int osc_release_short_lock(struct obd_export *exp,
2640                                   struct lov_stripe_md *lsm, obd_off end,
2641                                   void *cookie, int rw)
2642 {
2643         ENTRY;
2644         ldlm_lock_fast_release(cookie, rw);
2645         /* no error could have happened at this layer */
2646         RETURN(0);
2647 }
2648
2649 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2650                         struct lov_oinfo *loi, cfs_page_t *page,
2651                         obd_off offset, struct obd_async_page_ops *ops,
2652                         void *data, void **res, int nocache,
2653                         struct lustre_handle *lockh)
2654 {
2655         struct osc_async_page *oap;
2656         struct ldlm_res_id oid;
2657         int rc = 0;
2658         ENTRY;
2659
2660         if (!page)
2661                 return size_round(sizeof(*oap));
2662
2663         oap = *res;
2664         oap->oap_magic = OAP_MAGIC;
2665         oap->oap_cli = &exp->exp_obd->u.cli;
2666         oap->oap_loi = loi;
2667
2668         oap->oap_caller_ops = ops;
2669         oap->oap_caller_data = data;
2670
2671         oap->oap_page = page;
2672         oap->oap_obj_off = offset;
2673
2674         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2675         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2676         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2677         CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2678
2679         oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2680
2681         spin_lock_init(&oap->oap_lock);
2682
2683         /* If the page was marked as notcacheable - don't add to any locks */
2684         if (!nocache) {
2685                 osc_build_res_name(loi->loi_id, loi->loi_gr, &oid);
2686                 /* This is the only place where we can call cache_add_extent
2687                    without oap_lock, because this page is locked now, and
2688                    the lock we are adding it to is referenced, so cannot lose
2689                    any pages either. */
2690                 rc = cache_add_extent(oap->oap_cli->cl_cache, &oid, oap, lockh);
2691                 if (rc)
2692                         RETURN(rc);
2693         }
2694
2695         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2696         RETURN(0);
2697 }
2698
2699 struct osc_async_page *oap_from_cookie(void *cookie)
2700 {
2701         struct osc_async_page *oap = cookie;
2702         if (oap->oap_magic != OAP_MAGIC)
2703                 return ERR_PTR(-EINVAL);
2704         return oap;
2705 };
2706
2707 static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2708                               struct lov_oinfo *loi, void *cookie,
2709                               int cmd, obd_off off, int count,
2710                               obd_flag brw_flags, enum async_flags async_flags)
2711 {
2712         struct client_obd *cli = &exp->exp_obd->u.cli;
2713         struct osc_async_page *oap;
2714         int rc = 0;
2715         ENTRY;
2716
2717         oap = oap_from_cookie(cookie);
2718         if (IS_ERR(oap))
2719                 RETURN(PTR_ERR(oap));
2720
2721         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2722                 RETURN(-EIO);
2723
2724         if (!list_empty(&oap->oap_pending_item) ||
2725             !list_empty(&oap->oap_urgent_item) ||
2726             !list_empty(&oap->oap_rpc_item))
2727                 RETURN(-EBUSY);
2728
2729         /* check if the file's owner/group is over quota */
2730 #ifdef HAVE_QUOTA_SUPPORT
2731         if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
2732                 struct obd_async_page_ops *ops;
2733                 struct obdo *oa;
2734
2735                 OBDO_ALLOC(oa);
2736                 if (oa == NULL)
2737                         RETURN(-ENOMEM);
2738
2739                 ops = oap->oap_caller_ops;
2740                 ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
2741                 if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
2742                     NO_QUOTA)
2743                         rc = -EDQUOT;
2744
2745                 OBDO_FREE(oa);
2746                 if (rc)
2747                         RETURN(rc);
2748         }
2749 #endif
2750
2751         if (loi == NULL)
2752                 loi = lsm->lsm_oinfo[0];
2753
2754         client_obd_list_lock(&cli->cl_loi_list_lock);
2755
2756         oap->oap_cmd = cmd;
2757         oap->oap_page_off = off;
2758         oap->oap_count = count;
2759         oap->oap_brw_flags = brw_flags;
2760         oap->oap_async_flags = async_flags;
2761
2762         if (cmd & OBD_BRW_WRITE) {
2763                 rc = osc_enter_cache(cli, loi, oap);
2764                 if (rc) {
2765                         client_obd_list_unlock(&cli->cl_loi_list_lock);
2766                         RETURN(rc);
2767                 }
2768         }
2769
2770         osc_oap_to_pending(oap);
2771         loi_list_maint(cli, loi);
2772
2773         LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2774                   cmd);
2775
2776         osc_check_rpcs(cli);
2777         client_obd_list_unlock(&cli->cl_loi_list_lock);
2778
2779         RETURN(0);
2780 }
2781
/* True when 'flag' is clear in 'was' and set in 'now', i.e. the flag is
 * being newly requested; aka (~was & now & flag), but this is more clear :)
 * Every argument is parenthesised so that compound expressions such as
 * "A | B" keep their meaning once expanded. */
#define SETTING(was, now, flag) (!((was) & (flag)) && ((now) & (flag)))
2784
2785 static int osc_set_async_flags(struct obd_export *exp,
2786                                struct lov_stripe_md *lsm,
2787                                struct lov_oinfo *loi, void *cookie,
2788                                obd_flag async_flags)
2789 {
2790         struct client_obd *cli = &exp->exp_obd->u.cli;
2791         struct loi_oap_pages *lop;
2792         struct osc_async_page *oap;
2793         int rc = 0;
2794         ENTRY;
2795
2796         oap = oap_from_cookie(cookie);
2797         if (IS_ERR(oap))
2798                 RETURN(PTR_ERR(oap));
2799
2800         /*
2801          * bug 7311: OST-side locking is only supported for liblustre for now
2802          * (and liblustre never calls obd_set_async_flags(). I hope.), generic
2803          * implementation has to handle case where OST-locked page was picked
2804          * up by, e.g., ->writepage().
2805          */
2806         LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
2807         LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
2808                                      * tread here. */
2809
2810         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2811                 RETURN(-EIO);
2812
2813         if (loi == NULL)
2814                 loi = lsm->lsm_oinfo[0];
2815
2816         if (oap->oap_cmd & OBD_BRW_WRITE) {
2817                 lop = &loi->loi_write_lop;
2818         } else {
2819                 lop = &loi->loi_read_lop;
2820         }
2821
2822         client_obd_list_lock(&cli->cl_loi_list_lock);
2823
2824         if (list_empty(&oap->oap_pending_item))
2825                 GOTO(out, rc = -EINVAL);
2826
2827         if ((oap->oap_async_flags & async_flags) == async_flags)
2828                 GOTO(out, rc = 0);
2829
2830         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2831                 oap->oap_async_flags |= ASYNC_READY;
2832
2833         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
2834                 if (list_empty(&oap->oap_rpc_item)) {
2835                         list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2836                         loi_list_maint(cli, loi);
2837                 }
2838         }
2839
2840         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2841                         oap->oap_async_flags);
2842 out:
2843         osc_check_rpcs(cli);
2844         client_obd_list_unlock(&cli->cl_loi_list_lock);
2845         RETURN(rc);
2846 }
2847
2848 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2849                              struct lov_oinfo *loi,
2850                              struct obd_io_group *oig, void *cookie,
2851                              int cmd, obd_off off, int count,
2852                              obd_flag brw_flags,
2853                              obd_flag async_flags)
2854 {
2855         struct client_obd *cli = &exp->exp_obd->u.cli;
2856         struct osc_async_page *oap;
2857         struct loi_oap_pages *lop;
2858         int rc = 0;
2859         ENTRY;
2860
2861         oap = oap_from_cookie(cookie);
2862         if (IS_ERR(oap))
2863                 RETURN(PTR_ERR(oap));
2864
2865         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2866                 RETURN(-EIO);
2867
2868         if (!list_empty(&oap->oap_pending_item) ||
2869             !list_empty(&oap->oap_urgent_item) ||
2870             !list_empty(&oap->oap_rpc_item))
2871                 RETURN(-EBUSY);
2872
2873         if (loi == NULL)
2874                 loi = lsm->lsm_oinfo[0];
2875
2876         client_obd_list_lock(&cli->cl_loi_list_lock);
2877
2878         oap->oap_cmd = cmd;
2879         oap->oap_page_off = off;
2880         oap->oap_count = count;
2881         oap->oap_brw_flags = brw_flags;
2882         oap->oap_async_flags = async_flags;
2883
2884         if (cmd & OBD_BRW_WRITE)
2885                 lop = &loi->loi_write_lop;
2886         else
2887                 lop = &loi->loi_read_lop;
2888
2889         list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
2890         if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
2891                 oap->oap_oig = oig;
2892                 rc = oig_add_one(oig, &oap->oap_occ);
2893         }
2894
2895         LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
2896                   oap, oap->oap_page, rc);
2897
2898         client_obd_list_unlock(&cli->cl_loi_list_lock);
2899
2900         RETURN(rc);
2901 }
2902
2903 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2904                                  struct loi_oap_pages *lop, int cmd)
2905 {
2906         struct list_head *pos, *tmp;
2907         struct osc_async_page *oap;
2908
2909         list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2910                 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2911                 list_del(&oap->oap_pending_item);
2912                 osc_oap_to_pending(oap);
2913         }
2914         loi_list_maint(cli, loi);
2915 }
2916
2917 static int osc_trigger_group_io(struct obd_export *exp,
2918                                 struct lov_stripe_md *lsm,
2919                                 struct lov_oinfo *loi,
2920                                 struct obd_io_group *oig)
2921 {
2922         struct client_obd *cli = &exp->exp_obd->u.cli;
2923         ENTRY;
2924
2925         if (loi == NULL)
2926                 loi = lsm->lsm_oinfo[0];
2927
2928         client_obd_list_lock(&cli->cl_loi_list_lock);
2929
2930         osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
2931         osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
2932
2933         osc_check_rpcs(cli);
2934         client_obd_list_unlock(&cli->cl_loi_list_lock);
2935
2936         RETURN(0);
2937 }
2938
2939 static int osc_teardown_async_page(struct obd_export *exp,
2940                                    struct lov_stripe_md *lsm,
2941                                    struct lov_oinfo *loi, void *cookie)
2942 {
2943         struct client_obd *cli = &exp->exp_obd->u.cli;
2944         struct loi_oap_pages *lop;
2945         struct osc_async_page *oap;
2946         int rc = 0;
2947         ENTRY;
2948
2949         oap = oap_from_cookie(cookie);
2950         if (IS_ERR(oap))
2951                 RETURN(PTR_ERR(oap));
2952
2953         if (loi == NULL)
2954                 loi = lsm->lsm_oinfo[0];
2955
2956         if (oap->oap_cmd & OBD_BRW_WRITE) {
2957                 lop = &loi->loi_write_lop;
2958         } else {
2959                 lop = &loi->loi_read_lop;
2960         }
2961
2962         client_obd_list_lock(&cli->cl_loi_list_lock);
2963
2964         if (!list_empty(&oap->oap_rpc_item))
2965                 GOTO(out, rc = -EBUSY);
2966
2967         osc_exit_cache(cli, oap, 0);
2968         osc_wake_cache_waiters(cli);
2969
2970         if (!list_empty(&oap->oap_urgent_item)) {
2971                 list_del_init(&oap->oap_urgent_item);
2972                 oap->oap_async_flags &= ~ASYNC_URGENT;
2973         }
2974         if (!list_empty(&oap->oap_pending_item)) {
2975                 list_del_init(&oap->oap_pending_item);
2976                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2977         }
2978         loi_list_maint(cli, loi);
2979         cache_remove_extent(cli->cl_cache, oap);
2980
2981         LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
2982 out:
2983         client_obd_list_unlock(&cli->cl_loi_list_lock);
2984         RETURN(rc);
2985 }
2986
2987 int osc_extent_blocking_cb(struct ldlm_lock *lock,
2988                            struct ldlm_lock_desc *new, void *data,
2989                            int flag)
2990 {
2991         struct lustre_handle lockh = { 0 };
2992         int rc;
2993         ENTRY;
2994
2995         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
2996                 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
2997                 LBUG();
2998         }
2999
3000         switch (flag) {
3001         case LDLM_CB_BLOCKING:
3002                 ldlm_lock2handle(lock, &lockh);
3003                 rc = ldlm_cli_cancel(&lockh);
3004                 if (rc != ELDLM_OK)
3005                         CERROR("ldlm_cli_cancel failed: %d\n", rc);
3006                 break;
3007         case LDLM_CB_CANCELING: {
3008
3009                 ldlm_lock2handle(lock, &lockh);
3010                 /* This lock wasn't granted, don't try to do anything */
3011                 if (lock->l_req_mode != lock->l_granted_mode)
3012                         RETURN(0);
3013
3014                 cache_remove_lock(lock->l_conn_export->exp_obd->u.cli.cl_cache,
3015                                   &lockh);
3016
3017                 if (lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb)
3018                         lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb(
3019                                                           lock, new, data,flag);
3020                 break;
3021         }
3022         default:
3023                 LBUG();
3024         }
3025
3026         RETURN(0);
3027 }
3028 EXPORT_SYMBOL(osc_extent_blocking_cb);
3029
3030 static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
3031                                     int flags)
3032 {
3033         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
3034
3035         if (lock == NULL) {
3036                 CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
3037                 return;
3038         }
3039         lock_res_and_lock(lock);
3040 #if defined (__KERNEL__) && defined (__linux__)
3041         /* Liang XXX: Darwin and Winnt checking should be added */
3042         if (lock->l_ast_data && lock->l_ast_data != data) {
3043                 struct inode *new_inode = data;
3044                 struct inode *old_inode = lock->l_ast_data;
3045                 if (!(old_inode->i_state & I_FREEING))
3046                         LDLM_ERROR(lock, "inconsistent l_ast_data found");
3047                 LASSERTF(old_inode->i_state & I_FREEING,
3048                          "Found existing inode %p/%lu/%u state %lu in lock: "
3049                          "setting data to %p/%lu/%u\n", old_inode,
3050                          old_inode->i_ino, old_inode->i_generation,
3051                          old_inode->i_state,
3052                          new_inode, new_inode->i_ino, new_inode->i_generation);
3053         }
3054 #endif
3055         lock->l_ast_data = data;
3056         unlock_res_and_lock(lock);
3057         LDLM_LOCK_PUT(lock);
3058 }
3059
3060 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3061                              ldlm_iterator_t replace, void *data)
3062 {
3063         struct ldlm_res_id res_id;
3064         struct obd_device *obd = class_exp2obd(exp);
3065
3066         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
3067         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3068         return 0;
3069 }
3070
3071 static int osc_enqueue_fini(struct obd_device *obd, struct ptlrpc_request *req,
3072                             struct obd_info *oinfo, int intent, int rc)
3073 {
3074         ENTRY;
3075
3076         if (intent) {
3077                 /* The request was created before ldlm_cli_enqueue call. */
3078                 if (rc == ELDLM_LOCK_ABORTED) {
3079                         struct ldlm_reply *rep;
3080                         rep = req_capsule_server_get(&req->rq_pill,
3081                                                      &RMF_DLM_REP);
3082
3083                         LASSERT(rep != NULL);
3084                         if (rep->lock_policy_res1)
3085                                 rc = rep->lock_policy_res1;
3086                 }
3087         }
3088
3089         if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
3090                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
3091                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
3092                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
3093                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
3094         }
3095
3096         if (!rc)
3097                 cache_add_lock(obd->u.cli.cl_cache, oinfo->oi_lockh);
3098
3099         /* Call the update callback. */
3100         rc = oinfo->oi_cb_up(oinfo, rc);
3101         RETURN(rc);
3102 }
3103
3104 static int osc_enqueue_interpret(const struct lu_env *env,
3105                                  struct ptlrpc_request *req,
3106                                  struct osc_enqueue_args *aa, int rc)
3107 {
3108         int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
3109         struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
3110         struct ldlm_lock *lock;
3111
3112         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
3113          * be valid. */
3114         lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);
3115
3116         /* Complete obtaining the lock procedure. */
3117         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
3118                                    aa->oa_ei->ei_mode,
3119                                    &aa->oa_oi->oi_flags,
3120                                    &lsm->lsm_oinfo[0]->loi_lvb,
3121                                    sizeof(lsm->lsm_oinfo[0]->loi_lvb),
3122                                    lustre_swab_ost_lvb,
3123                                    aa->oa_oi->oi_lockh, rc);
3124
3125         /* Complete osc stuff. */
3126         rc = osc_enqueue_fini(aa->oa_exp->exp_obd, req, aa->oa_oi, intent, rc);
3127
3128         /* Release the lock for async request. */
3129         if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
3130                 ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);
3131
3132         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
3133                  aa->oa_oi->oi_lockh, req, aa);
3134         LDLM_LOCK_PUT(lock);
3135         return rc;
3136 }
3137
3138 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
3139  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
3140  * other synchronous requests, however keeping some locks and trying to obtain
3141  * others may take a considerable amount of time in a case of ost failure; and
3142  * when other sync requests do not get released lock from a client, the client
3143  * is excluded from the cluster -- such scenarious make the life difficult, so
3144  * release locks just after they are obtained. */
3145 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3146                        struct ldlm_enqueue_info *einfo,
3147                        struct ptlrpc_request_set *rqset)
3148 {
3149         struct ldlm_res_id res_id;
3150         struct obd_device *obd = exp->exp_obd;
3151         struct ptlrpc_request *req = NULL;
3152         int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
3153         ldlm_mode_t mode;
3154         int rc;
3155         ENTRY;
3156
3157
3158         osc_build_res_name(oinfo->oi_md->lsm_object_id,
3159                            oinfo->oi_md->lsm_object_gr, &res_id);
3160         /* Filesystem lock extents are extended to page boundaries so that
3161          * dealing with the page cache is a little smoother.  */
3162         oinfo->oi_policy.l_extent.start -=
3163                 oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
3164         oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;
3165
3166         if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
3167                 goto no_match;
3168
3169         /* Next, search for already existing extent locks that will cover us */
3170         /* If we're trying to read, we also search for an existing PW lock.  The
3171          * VFS and page cache already protect us locally, so lots of readers/
3172          * writers can share a single PW lock.
3173          *
3174          * There are problems with conversion deadlocks, so instead of
3175          * converting a read lock to a write lock, we'll just enqueue a new
3176          * one.
3177          *
3178          * At some point we should cancel the read lock instead of making them
3179          * send us a blocking callback, but there are problems with canceling
3180          * locks out from other users right now, too. */
3181         mode = einfo->ei_mode;
3182         if (einfo->ei_mode == LCK_PR)
3183                 mode |= LCK_PW;
3184         mode = ldlm_lock_match(obd->obd_namespace,
3185                                oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
3186                                einfo->ei_type, &oinfo->oi_policy, mode,
3187                                oinfo->oi_lockh);
3188         if (mode) {
3189                 /* addref the lock only if not async requests and PW lock is
3190                  * matched whereas we asked for PR. */
3191                 if (!rqset && einfo->ei_mode != mode)
3192                         ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
3193                 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
3194                                         oinfo->oi_flags);
3195                 if (intent) {
3196                         /* I would like to be able to ASSERT here that rss <=
3197                          * kms, but I can't, for reasons which are explained in
3198                          * lov_enqueue() */
3199                 }
3200
3201                 /* We already have a lock, and it's referenced */
3202                 oinfo->oi_cb_up(oinfo, ELDLM_OK);
3203
3204                 /* For async requests, decref the lock. */
3205                 if (einfo->ei_mode != mode)
3206                         ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
3207                 else if (rqset)
3208                         ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
3209
3210                 RETURN(ELDLM_OK);
3211         }
3212
3213  no_match:
3214         if (intent) {
3215                 CFS_LIST_HEAD(cancels);
3216                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3217                                            &RQF_LDLM_ENQUEUE_LVB);
3218                 if (req == NULL)
3219                         RETURN(-ENOMEM);
3220
3221                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3222                 if (rc)
3223                         RETURN(rc);
3224
3225                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3226                                      sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb));
3227                 ptlrpc_request_set_replen(req);
3228         }
3229
3230         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3231         oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;
3232
3233         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id,
3234                               &oinfo->oi_policy, &oinfo->oi_flags,
3235                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3236                               sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
3237                               lustre_swab_ost_lvb, oinfo->oi_lockh,
3238                               rqset ? 1 : 0);
3239         if (rqset) {
3240                 if (!rc) {
3241                         struct osc_enqueue_args *aa;
3242                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3243                         aa = ptlrpc_req_async_args(req);
3244                         aa->oa_oi = oinfo;
3245                         aa->oa_ei = einfo;
3246                         aa->oa_exp = exp;
3247
3248                         req->rq_interpret_reply =
3249                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
3250                         ptlrpc_set_add_req(rqset, req);
3251                 } else if (intent) {
3252                         ptlrpc_req_finished(req);
3253                 }
3254                 RETURN(rc);
3255         }
3256
3257         rc = osc_enqueue_fini(obd, req, oinfo, intent, rc);
3258         if (intent)
3259                 ptlrpc_req_finished(req);
3260
3261         RETURN(rc);
3262 }
3263
3264 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
3265                      __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3266                      int *flags, void *data, struct lustre_handle *lockh)
3267 {
3268         struct ldlm_res_id res_id;
3269         struct obd_device *obd = exp->exp_obd;
3270         int lflags = *flags;
3271         ldlm_mode_t rc;
3272         ENTRY;
3273
3274         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
3275
3276         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3277                 RETURN(-EIO);
3278
3279         /* Filesystem lock extents are extended to page boundaries so that
3280          * dealing with the page cache is a little smoother */
3281         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3282         policy->l_extent.end |= ~CFS_PAGE_MASK;
3283
3284         /* Next, search for already existing extent locks that will cover us */
3285         /* If we're trying to read, we also search for an existing PW lock.  The
3286          * VFS and page cache already protect us locally, so lots of readers/
3287          * writers can share a single PW lock. */
3288         rc = mode;
3289         if (mode == LCK_PR)
3290                 rc |= LCK_PW;
3291         rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3292                              &res_id, type, policy, rc, lockh);
3293         if (rc) {
3294                 osc_set_data_with_check(lockh, data, lflags);
3295                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3296                         ldlm_lock_addref(lockh, LCK_PR);
3297                         ldlm_lock_decref(lockh, LCK_PW);
3298                 }
3299                 RETURN(rc);
3300         }
3301         RETURN(rc);
3302 }
3303
3304 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3305                       __u32 mode, struct lustre_handle *lockh)
3306 {
3307         ENTRY;
3308
3309         if (unlikely(mode == LCK_GROUP))
3310                 ldlm_lock_decref_and_cancel(lockh, mode);
3311         else
3312                 ldlm_lock_decref(lockh, mode);
3313
3314         RETURN(0);
3315 }
3316
3317 static int osc_cancel_unused(struct obd_export *exp,
3318                              struct lov_stripe_md *lsm, int flags,
3319                              void *opaque)
3320 {
3321         struct obd_device *obd = class_exp2obd(exp);
3322         struct ldlm_res_id res_id, *resp = NULL;
3323
3324         if (lsm != NULL) {
3325                 resp = osc_build_res_name(lsm->lsm_object_id,
3326                                           lsm->lsm_object_gr, &res_id);
3327         }
3328
3329         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3330 }
3331
3332 static int osc_statfs_interpret(const struct lu_env *env,
3333                                 struct ptlrpc_request *req,
3334                                 struct osc_async_args *aa, int rc)
3335 {
3336         struct obd_statfs *msfs;
3337         ENTRY;
3338
3339         if (rc != 0)
3340                 GOTO(out, rc);
3341
3342         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3343         if (msfs == NULL) {
3344                 GOTO(out, rc = -EPROTO);
3345         }
3346
3347         *aa->aa_oi->oi_osfs = *msfs;
3348 out:
3349         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3350         RETURN(rc);
3351 }
3352
3353 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3354                             __u64 max_age, struct ptlrpc_request_set *rqset)
3355 {
3356         struct ptlrpc_request *req;
3357         struct osc_async_args *aa;
3358         int                    rc;
3359         ENTRY;
3360
3361         /* We could possibly pass max_age in the request (as an absolute
3362          * timestamp or a "seconds.usec ago") so the target can avoid doing
3363          * extra calls into the filesystem if that isn't necessary (e.g.
3364          * during mount that would help a bit).  Having relative timestamps
3365          * is not so great if request processing is slow, while absolute
3366          * timestamps are not ideal because they need time synchronization. */
3367         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3368         if (req == NULL)
3369                 RETURN(-ENOMEM);
3370
3371         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3372         if (rc) {
3373                 ptlrpc_request_free(req);
3374                 RETURN(rc);
3375         }
3376         ptlrpc_request_set_replen(req);
3377         req->rq_request_portal = OST_CREATE_PORTAL;
3378         ptlrpc_at_set_req_timeout(req);
3379
3380         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3381                 /* procfs requests not want stat in wait for avoid deadlock */
3382                 req->rq_no_resend = 1;
3383                 req->rq_no_delay = 1;
3384         }
3385
3386         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
3387         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3388         aa = ptlrpc_req_async_args(req);
3389         aa->aa_oi = oinfo;
3390
3391         ptlrpc_set_add_req(rqset, req);
3392         RETURN(0);
3393 }
3394
3395 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3396                       __u64 max_age, __u32 flags)
3397 {
3398         struct obd_statfs     *msfs;
3399         struct ptlrpc_request *req;
3400         struct obd_import     *imp = NULL;
3401         int rc;
3402         ENTRY;
3403
3404         /*Since the request might also come from lprocfs, so we need
3405          *sync this with client_disconnect_export Bug15684*/
3406         down_read(&obd->u.cli.cl_sem);
3407         if (obd->u.cli.cl_import)
3408                 imp = class_import_get(obd->u.cli.cl_import);
3409         up_read(&obd->u.cli.cl_sem);
3410         if (!imp)
3411                 RETURN(-ENODEV);
3412
3413         /* We could possibly pass max_age in the request (as an absolute
3414          * timestamp or a "seconds.usec ago") so the target can avoid doing
3415          * extra calls into the filesystem if that isn't necessary (e.g.
3416          * during mount that would help a bit).  Having relative timestamps
3417          * is not so great if request processing is slow, while absolute
3418          * timestamps are not ideal because they need time synchronization. */
3419         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
3420
3421         class_import_put(imp);
3422
3423         if (req == NULL)
3424                 RETURN(-ENOMEM);
3425
3426         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3427         if (rc) {
3428                 ptlrpc_request_free(req);
3429                 RETURN(rc);
3430         }
3431         ptlrpc_request_set_replen(req);
3432         req->rq_request_portal = OST_CREATE_PORTAL;
3433         ptlrpc_at_set_req_timeout(req);
3434
3435         if (flags & OBD_STATFS_NODELAY) {
3436                 /* procfs requests not want stat in wait for avoid deadlock */
3437                 req->rq_no_resend = 1;
3438                 req->rq_no_delay = 1;
3439         }
3440
3441         rc = ptlrpc_queue_wait(req);
3442         if (rc)
3443                 GOTO(out, rc);
3444
3445         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3446         if (msfs == NULL) {
3447                 GOTO(out, rc = -EPROTO);
3448         }
3449
3450         *osfs = *msfs;
3451
3452         EXIT;
3453  out:
3454         ptlrpc_req_finished(req);
3455         return rc;
3456 }
3457
3458 /* Retrieve object striping information.
3459  *
3460  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
3461  * the maximum number of OST indices which will fit in the user buffer.
3462  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
3463  */
3464 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3465 {
3466         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
3467         struct lov_user_md_v3 lum, *lumk;
3468         struct lov_user_ost_data_v1 *lmm_objects;
3469         int rc = 0, lum_size;
3470         ENTRY;
3471
3472         if (!lsm)
3473                 RETURN(-ENODATA);
3474
3475         /* we only need the header part from user space to get lmm_magic and
3476          * lmm_stripe_count, (the header part is common to v1 and v3) */
3477         lum_size = sizeof(struct lov_user_md_v1);
3478         if (copy_from_user(&lum, lump, lum_size))
3479                 RETURN(-EFAULT);
3480
3481         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
3482             (lum.lmm_magic != LOV_USER_MAGIC_V3))
3483                 RETURN(-EINVAL);
3484
3485         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
3486         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
3487         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
3488         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
3489
3490         /* we can use lov_mds_md_size() to compute lum_size
3491          * because lov_user_md_vX and lov_mds_md_vX have the same size */
3492         if (lum.lmm_stripe_count > 0) {
3493                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
3494                 OBD_ALLOC(lumk, lum_size);
3495                 if (!lumk)
3496                         RETURN(-ENOMEM);
3497
3498                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
3499                         lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
3500                 else
3501                         lmm_objects = &(lumk->lmm_objects[0]);
3502                 lmm_objects->l_object_id = lsm->lsm_object_id;
3503         } else {
3504                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
3505                 lumk = &lum;
3506         }
3507
3508         lumk->lmm_object_id = lsm->lsm_object_id;
3509         lumk->lmm_object_gr = lsm->lsm_object_gr;
3510         lumk->lmm_stripe_count = 1;
3511
3512         if (copy_to_user(lump, lumk, lum_size))
3513                 rc = -EFAULT;
3514
3515         if (lumk != &lum)
3516                 OBD_FREE(lumk, lum_size);
3517
3518         RETURN(rc);
3519 }
3520
3521
3522 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3523                          void *karg, void *uarg)
3524 {
3525         struct obd_device *obd = exp->exp_obd;
3526         struct obd_ioctl_data *data = karg;
3527         int err = 0;
3528         ENTRY;
3529
3530         if (!try_module_get(THIS_MODULE)) {
3531                 CERROR("Can't get module. Is it alive?");
3532                 return -EINVAL;
3533         }
3534         switch (cmd) {
3535         case OBD_IOC_LOV_GET_CONFIG: {
3536                 char *buf;
3537                 struct lov_desc *desc;
3538                 struct obd_uuid uuid;
3539
3540                 buf = NULL;
3541                 len = 0;
3542                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3543                         GOTO(out, err = -EINVAL);
3544
3545                 data = (struct obd_ioctl_data *)buf;
3546
3547                 if (sizeof(*desc) > data->ioc_inllen1) {
3548                         obd_ioctl_freedata(buf, len);
3549                         GOTO(out, err = -EINVAL);
3550                 }
3551
3552                 if (data->ioc_inllen2 < sizeof(uuid)) {
3553                         obd_ioctl_freedata(buf, len);
3554                         GOTO(out, err = -EINVAL);
3555                 }
3556
3557                 desc = (struct lov_desc *)data->ioc_inlbuf1;
3558                 desc->ld_tgt_count = 1;
3559                 desc->ld_active_tgt_count = 1;
3560                 desc->ld_default_stripe_count = 1;
3561                 desc->ld_default_stripe_size = 0;
3562                 desc->ld_default_stripe_offset = 0;
3563                 desc->ld_pattern = 0;
3564                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3565
3566                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3567
3568                 err = copy_to_user((void *)uarg, buf, len);
3569                 if (err)
3570                         err = -EFAULT;
3571                 obd_ioctl_freedata(buf, len);
3572                 GOTO(out, err);
3573         }
3574         case LL_IOC_LOV_SETSTRIPE:
3575                 err = obd_alloc_memmd(exp, karg);
3576                 if (err > 0)
3577                         err = 0;
3578                 GOTO(out, err);
3579         case LL_IOC_LOV_GETSTRIPE:
3580                 err = osc_getstripe(karg, uarg);
3581                 GOTO(out, err);
3582         case OBD_IOC_CLIENT_RECOVER:
3583                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3584                                             data->ioc_inlbuf1);
3585                 if (err > 0)
3586                         err = 0;
3587                 GOTO(out, err);
3588         case IOC_OSC_SET_ACTIVE:
3589                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3590                                                data->ioc_offset);
3591                 GOTO(out, err);
3592         case OBD_IOC_POLL_QUOTACHECK:
3593                 err = lquota_poll_check(quota_interface, exp,
3594                                         (struct if_quotacheck *)karg);
3595                 GOTO(out, err);
3596         default:
3597                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3598                        cmd, cfs_curproc_comm());
3599                 GOTO(out, err = -ENOTTY);
3600         }
3601 out:
3602         module_put(THIS_MODULE);
3603         return err;
3604 }
3605
3606 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3607                         void *key, __u32 *vallen, void *val,
3608                         struct lov_stripe_md *lsm)
3609 {
3610         ENTRY;
3611         if (!vallen || !val)
3612                 RETURN(-EFAULT);
3613
3614         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3615                 __u32 *stripe = val;
3616                 *vallen = sizeof(*stripe);
3617                 *stripe = 0;
3618                 RETURN(0);
3619         } else if (KEY_IS(KEY_LAST_ID)) {
3620                 struct ptlrpc_request *req;
3621                 obd_id                *reply;
3622                 char                  *tmp;
3623                 int                    rc;
3624
3625                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3626                                            &RQF_OST_GET_INFO_LAST_ID);
3627                 if (req == NULL)
3628                         RETURN(-ENOMEM);
3629
3630                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3631                                      RCL_CLIENT, keylen);
3632                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3633                 if (rc) {
3634                         ptlrpc_request_free(req);
3635                         RETURN(rc);
3636                 }
3637
3638                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3639                 memcpy(tmp, key, keylen);
3640
3641                 ptlrpc_request_set_replen(req);
3642                 rc = ptlrpc_queue_wait(req);
3643                 if (rc)
3644                         GOTO(out, rc);
3645
3646                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3647                 if (reply == NULL)
3648                         GOTO(out, rc = -EPROTO);
3649
3650                 *((obd_id *)val) = *reply;
3651         out:
3652                 ptlrpc_req_finished(req);
3653                 RETURN(rc);
3654         } else if (KEY_IS(KEY_FIEMAP)) {
3655                 struct ptlrpc_request *req;
3656                 struct ll_user_fiemap *reply;
3657                 char *tmp;
3658                 int rc;
3659
3660                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3661                                            &RQF_OST_GET_INFO_FIEMAP);
3662                 if (req == NULL)
3663                         RETURN(-ENOMEM);
3664
3665                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3666                                      RCL_CLIENT, keylen);
3667                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3668                                      RCL_CLIENT, *vallen);
3669                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3670                                      RCL_SERVER, *vallen);
3671
3672                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3673                 if (rc) {
3674                         ptlrpc_request_free(req);
3675                         RETURN(rc);
3676                 }
3677
3678                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3679                 memcpy(tmp, key, keylen);
3680                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3681                 memcpy(tmp, val, *vallen);
3682
3683                 ptlrpc_request_set_replen(req);
3684                 rc = ptlrpc_queue_wait(req);
3685                 if (rc)
3686                         GOTO(out1, rc);
3687
3688                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3689                 if (reply == NULL)
3690                         GOTO(out1, rc = -EPROTO);
3691
3692                 memcpy(val, reply, *vallen);
3693         out1:
3694                 ptlrpc_req_finished(req);
3695
3696                 RETURN(rc);
3697         }
3698
3699         RETURN(-EINVAL);
3700 }
3701
3702 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
3703                                           struct ptlrpc_request *req,
3704                                           void *aa, int rc)
3705 {
3706         struct llog_ctxt *ctxt;
3707         struct obd_import *imp = req->rq_import;
3708         ENTRY;
3709
3710         if (rc != 0)
3711                 RETURN(rc);
3712
3713         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3714         if (ctxt) {
3715                 if (rc == 0)
3716                         rc = llog_initiator_connect(ctxt);
3717                 else
3718                         CERROR("cannot establish connection for "
3719                                "ctxt %p: %d\n", ctxt, rc);
3720         }
3721
3722         llog_ctxt_put(ctxt);
3723         spin_lock(&imp->imp_lock);
3724         imp->imp_server_timeout = 1;
3725         imp->imp_pingable = 1;
3726         spin_unlock(&imp->imp_lock);
3727         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3728
3729         RETURN(rc);
3730 }
3731
3732 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3733                               void *key, obd_count vallen, void *val,
3734                               struct ptlrpc_request_set *set)
3735 {
3736         struct ptlrpc_request *req;
3737         struct obd_device     *obd = exp->exp_obd;
3738         struct obd_import     *imp = class_exp2cliimp(exp);
3739         char                  *tmp;
3740         int                    rc;
3741         ENTRY;
3742
3743         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3744
3745         if (KEY_IS(KEY_NEXT_ID)) {
3746                 if (vallen != sizeof(obd_id))
3747                         RETURN(-ERANGE);
3748                 if (val == NULL)
3749                         RETURN(-EINVAL);
3750                 obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
3751                 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3752                        exp->exp_obd->obd_name,
3753                        obd->u.cli.cl_oscc.oscc_next_id);
3754
3755                 RETURN(0);
3756         }
3757
3758         if (KEY_IS(KEY_UNLINKED)) {
3759                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3760                 spin_lock(&oscc->oscc_lock);
3761                 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3762                 spin_unlock(&oscc->oscc_lock);
3763                 RETURN(0);
3764         }
3765
3766         if (KEY_IS(KEY_INIT_RECOV)) {
3767                 if (vallen != sizeof(int))
3768                         RETURN(-EINVAL);
3769                 spin_lock(&imp->imp_lock);
3770                 imp->imp_initial_recov = *(int *)val;
3771                 spin_unlock(&imp->imp_lock);
3772                 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3773                        exp->exp_obd->obd_name,
3774                        imp->imp_initial_recov);
3775                 RETURN(0);
3776         }
3777
3778         if (KEY_IS(KEY_CHECKSUM)) {
3779                 if (vallen != sizeof(int))
3780                         RETURN(-EINVAL);
3781                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3782                 RETURN(0);
3783         }
3784
3785         if (KEY_IS(KEY_FLUSH_CTX)) {
3786                 sptlrpc_import_flush_my_ctx(imp);
3787                 RETURN(0);
3788         }
3789
3790         if (!set)
3791                 RETURN(-EINVAL);
3792
3793         /* We pass all other commands directly to OST. Since nobody calls osc
3794            methods directly and everybody is supposed to go through LOV, we
3795            assume lov checked invalid values for us.
3796            The only recognised values so far are evict_by_nid and mds_conn.
3797            Even if something bad goes through, we'd get a -EINVAL from OST
3798            anyway. */
3799
3800
3801         req = ptlrpc_request_alloc(imp, &RQF_OST_SET_INFO);
3802         if (req == NULL)
3803                 RETURN(-ENOMEM);
3804
3805         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3806                              RCL_CLIENT, keylen);
3807         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3808                              RCL_CLIENT, vallen);
3809         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3810         if (rc) {
3811                 ptlrpc_request_free(req);
3812                 RETURN(rc);
3813         }
3814
3815         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3816         memcpy(tmp, key, keylen);
3817         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
3818         memcpy(tmp, val, vallen);
3819
3820         if (KEY_IS(KEY_MDS_CONN)) {
3821                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3822
3823                 oscc->oscc_oa.o_gr = (*(__u32 *)val);
3824                 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
3825                 LASSERT(oscc->oscc_oa.o_gr > 0);
3826                 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3827         }
3828
3829         ptlrpc_request_set_replen(req);
3830         ptlrpc_set_add_req(set, req);
3831         ptlrpc_check_set(NULL, set);
3832
3833         RETURN(0);
3834 }
3835
3836
3837 static struct llog_operations osc_size_repl_logops = {
3838         lop_cancel: llog_obd_repl_cancel
3839 };
3840
3841 static struct llog_operations osc_mds_ost_orig_logops;
3842 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
3843                          struct obd_device *tgt, int count,
3844                          struct llog_catid *catid, struct obd_uuid *uuid)
3845 {
3846         int rc;
3847         ENTRY;
3848
3849         LASSERT(olg == &obd->obd_olg);
3850         spin_lock(&obd->obd_dev_lock);
3851         if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
3852                 osc_mds_ost_orig_logops = llog_lvfs_ops;
3853                 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3854                 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3855                 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3856                 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
3857         }
3858         spin_unlock(&obd->obd_dev_lock);
3859
3860         rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
3861                         &catid->lci_logid, &osc_mds_ost_orig_logops);
3862         if (rc) {
3863                 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
3864                 GOTO (out, rc);
3865         }
3866
3867         rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, count,
3868                         NULL, &osc_size_repl_logops);
3869         if (rc) {
3870                 struct llog_ctxt *ctxt =
3871                         llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3872                 if (ctxt)
3873                         llog_cleanup(ctxt);
3874                 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
3875         }
3876         GOTO(out, rc);
3877 out:
3878         if (rc) {
3879                 CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
3880                        obd->obd_name, tgt->obd_name, count, catid, rc);
3881                 CERROR("logid "LPX64":0x%x\n",
3882                        catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
3883         }
3884         return rc;
3885 }
3886
3887 static int osc_llog_finish(struct obd_device *obd, int count)
3888 {
3889         struct llog_ctxt *ctxt;
3890         int rc = 0, rc2 = 0;
3891         ENTRY;
3892
3893         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3894         if (ctxt)
3895                 rc = llog_cleanup(ctxt);
3896
3897         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3898         if (ctxt)
3899                 rc2 = llog_cleanup(ctxt);
3900         if (!rc)
3901                 rc = rc2;
3902
3903         RETURN(rc);
3904 }
3905
3906 static int osc_reconnect(const struct lu_env *env,
3907                          struct obd_export *exp, struct obd_device *obd,
3908                          struct obd_uuid *cluuid,
3909                          struct obd_connect_data *data,
3910                          void *localdata)
3911 {
3912         struct client_obd *cli = &obd->u.cli;
3913
3914         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3915                 long lost_grant;
3916
3917                 client_obd_list_lock(&cli->cl_loi_list_lock);
3918                 data->ocd_grant = cli->cl_avail_grant ?:
3919                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3920                 lost_grant = cli->cl_lost_grant;
3921                 cli->cl_lost_grant = 0;
3922                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3923
3924                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3925                        "cl_lost_grant: %ld\n", data->ocd_grant,
3926                        cli->cl_avail_grant, lost_grant);
3927                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3928                        " ocd_grant: %d\n", data->ocd_connect_flags,
3929                        data->ocd_version, data->ocd_grant);
3930         }
3931
3932         RETURN(0);
3933 }
3934
3935 static int osc_disconnect(struct obd_export *exp)
3936 {
3937         struct obd_device *obd = class_exp2obd(exp);
3938         struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3939         int rc;
3940
3941         if (obd->u.cli.cl_conn_count == 1)
3942                 /* flush any remaining cancel messages out to the target */
3943                 llog_sync(ctxt, exp);
3944
3945         llog_ctxt_put(ctxt);
3946
3947         rc = client_disconnect_export(exp);
3948         return rc;
3949 }
3950
3951 static int osc_import_event(struct obd_device *obd,
3952                             struct obd_import *imp,
3953                             enum obd_import_event event)
3954 {
3955         struct client_obd *cli;
3956         int rc = 0;
3957
3958         ENTRY;
3959         LASSERT(imp->imp_obd == obd);
3960
3961         switch (event) {
3962         case IMP_EVENT_DISCON: {
3963                 /* Only do this on the MDS OSC's */
3964                 if (imp->imp_server_timeout) {
3965                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3966
3967                         spin_lock(&oscc->oscc_lock);
3968                         oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
3969                         spin_unlock(&oscc->oscc_lock);
3970                 }
3971                 cli = &obd->u.cli;
3972                 client_obd_list_lock(&cli->cl_loi_list_lock);
3973                 cli->cl_avail_grant = 0;
3974                 cli->cl_lost_grant = 0;
3975                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3976                 break;
3977         }
3978         case IMP_EVENT_INACTIVE: {
3979                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3980                 break;
3981         }
3982         case IMP_EVENT_INVALIDATE: {
3983                 struct ldlm_namespace *ns = obd->obd_namespace;
3984
3985                 /* Reset grants */
3986                 cli = &obd->u.cli;
3987                 client_obd_list_lock(&cli->cl_loi_list_lock);
3988                 /* all pages go to failing rpcs due to the invalid import */
3989                 osc_check_rpcs(cli);
3990                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3991
3992                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3993
3994                 break;
3995         }
3996         case IMP_EVENT_ACTIVE: {
3997                 /* Only do this on the MDS OSC's */
3998                 if (imp->imp_server_timeout) {
3999                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4000
4001                         spin_lock(&oscc->oscc_lock);
4002                         oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
4003                         spin_unlock(&oscc->oscc_lock);
4004                 }
4005                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
4006                 break;
4007         }
4008         case IMP_EVENT_OCD: {
4009                 struct obd_connect_data *ocd = &imp->imp_connect_data;
4010
4011                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
4012                         osc_init_grant(&obd->u.cli, ocd);
4013
4014                 /* See bug 7198 */
4015                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
4016                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
4017
4018                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
4019                 break;
4020         }
4021         default:
4022                 CERROR("Unknown import event %d\n", event);
4023                 LBUG();
4024         }
4025         RETURN(rc);
4026 }
4027
4028 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
4029 {
4030         int rc;
4031         ENTRY;
4032
4033         ENTRY;
4034         rc = ptlrpcd_addref();
4035         if (rc)
4036                 RETURN(rc);
4037
4038         rc = client_obd_setup(obd, lcfg);
4039         if (rc) {
4040                 ptlrpcd_decref();
4041         } else {
4042                 struct lprocfs_static_vars lvars = { 0 };
4043                 struct client_obd *cli = &obd->u.cli;
4044
4045                 lprocfs_osc_init_vars(&lvars);
4046                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
4047                         lproc_osc_attach_seqstat(obd);
4048                         sptlrpc_lprocfs_cliobd_attach(obd);
4049                         ptlrpc_lprocfs_register_obd(obd);
4050                 }
4051
4052                 oscc_init(obd);
4053                 /* We need to allocate a few requests more, because
4054                    brw_interpret tries to create new requests before freeing
4055                    previous ones. Ideally we want to have 2x max_rpcs_in_flight
4056                    reserved, but I afraid that might be too much wasted RAM
4057                    in fact, so 2 is just my guess and still should work. */
4058                 cli->cl_import->imp_rq_pool =
4059                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4060                                             OST_MAXREQSIZE,
4061                                             ptlrpc_add_rqs_to_pool);
4062                 cli->cl_cache = cache_create(obd);
4063                 if (!cli->cl_cache) {
4064                         osc_cleanup(obd);
4065                         rc = -ENOMEM;
4066                 }
4067         }
4068
4069         RETURN(rc);
4070 }
4071
4072 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
4073 {
4074         int rc = 0;
4075         ENTRY;
4076
4077         switch (stage) {
4078         case OBD_CLEANUP_EARLY: {
4079                 struct obd_import *imp;
4080                 imp = obd->u.cli.cl_import;
4081                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
4082                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
4083                 ptlrpc_deactivate_import(imp);
4084                 spin_lock(&imp->imp_lock);
4085                 imp->imp_pingable = 0;
4086                 spin_unlock(&imp->imp_lock);
4087                 break;
4088         }
4089         case OBD_CLEANUP_EXPORTS: {
4090                 /* If we set up but never connected, the
4091                    client import will not have been cleaned. */
4092                 if (obd->u.cli.cl_import) {
4093                         struct obd_import *imp;
4094                         imp = obd->u.cli.cl_import;
4095                         CDEBUG(D_CONFIG, "%s: client import never connected\n",
4096                                obd->obd_name);
4097                         ptlrpc_invalidate_import(imp);
4098                         ptlrpc_free_rq_pool(imp->imp_rq_pool);
4099                         class_destroy_import(imp);
4100                         obd->u.cli.cl_import = NULL;
4101                 }
4102                 rc = obd_llog_finish(obd, 0);
4103                 if (rc != 0)
4104                         CERROR("failed to cleanup llogging subsystems\n");
4105                 break;
4106                 }
4107         }
4108         RETURN(rc);
4109 }
4110
4111 int osc_cleanup(struct obd_device *obd)
4112 {
4113         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4114         int rc;
4115
4116         ENTRY;
4117         ptlrpc_lprocfs_unregister_obd(obd);
4118         lprocfs_obd_cleanup(obd);
4119
4120         spin_lock(&oscc->oscc_lock);
4121         oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
4122         oscc->oscc_flags |= OSCC_FLAG_EXITING;
4123         spin_unlock(&oscc->oscc_lock);
4124
4125         /* free memory of osc quota cache */
4126         lquota_cleanup(quota_interface, obd);
4127
4128         cache_destroy(obd->u.cli.cl_cache);
4129         rc = client_obd_cleanup(obd);
4130
4131         ptlrpcd_decref();
4132         RETURN(rc);
4133 }
4134
4135 static int osc_register_page_removal_cb(struct obd_export *exp,
4136                                         obd_page_removal_cb_t func,
4137                                         obd_pin_extent_cb pin_cb)
4138 {
4139         return cache_add_extent_removal_cb(exp->exp_obd->u.cli.cl_cache, func,
4140                                            pin_cb);
4141 }
4142
4143 static int osc_unregister_page_removal_cb(struct obd_export *exp,
4144                                           obd_page_removal_cb_t func)
4145 {
4146         return cache_del_extent_removal_cb(exp->exp_obd->u.cli.cl_cache, func);
4147 }
4148
4149 static int osc_register_lock_cancel_cb(struct obd_export *exp,
4150                                        obd_lock_cancel_cb cb)
4151 {
4152         LASSERT(exp->exp_obd->u.cli.cl_ext_lock_cancel_cb == NULL);
4153
4154         exp->exp_obd->u.cli.cl_ext_lock_cancel_cb = cb;
4155         return 0;
4156 }
4157
4158 static int osc_unregister_lock_cancel_cb(struct obd_export *exp,
4159                                          obd_lock_cancel_cb cb)
4160 {
4161         if (exp->exp_obd->u.cli.cl_ext_lock_cancel_cb != cb) {
4162                 CERROR("Unregistering cancel cb %p, while only %p was "
4163                        "registered\n", cb,
4164                        exp->exp_obd->u.cli.cl_ext_lock_cancel_cb);
4165                 RETURN(-EINVAL);
4166         }
4167
4168         exp->exp_obd->u.cli.cl_ext_lock_cancel_cb = NULL;
4169         return 0;
4170 }
4171
4172 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4173 {
4174         struct lustre_cfg *lcfg = buf;
4175         struct lprocfs_static_vars lvars = { 0 };
4176         int rc = 0;
4177
4178         lprocfs_osc_init_vars(&lvars);
4179
4180         switch (lcfg->lcfg_command) {
4181         case LCFG_SPTLRPC_CONF:
4182                 rc = sptlrpc_cliobd_process_config(obd, lcfg);
4183                 break;
4184         default:
4185                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
4186                                               lcfg, obd);
4187                 break;
4188         }
4189
4190         return(rc);
4191 }
4192
4193 struct obd_ops osc_obd_ops = {
4194         .o_owner                = THIS_MODULE,
4195         .o_setup                = osc_setup,
4196         .o_precleanup           = osc_precleanup,
4197         .o_cleanup              = osc_cleanup,
4198         .o_add_conn             = client_import_add_conn,
4199         .o_del_conn             = client_import_del_conn,
4200         .o_connect              = client_connect_import,
4201         .o_reconnect            = osc_reconnect,
4202         .o_disconnect           = osc_disconnect,
4203         .o_statfs               = osc_statfs,
4204         .o_statfs_async         = osc_statfs_async,
4205         .o_packmd               = osc_packmd,
4206         .o_unpackmd             = osc_unpackmd,
4207         .o_precreate            = osc_precreate,
4208         .o_create               = osc_create,
4209         .o_destroy              = osc_destroy,
4210         .o_getattr              = osc_getattr,
4211         .o_getattr_async        = osc_getattr_async,
4212         .o_setattr              = osc_setattr,
4213         .o_setattr_async        = osc_setattr_async,
4214         .o_brw                  = osc_brw,
4215         .o_brw_async            = osc_brw_async,
4216         .o_prep_async_page      = osc_prep_async_page,
4217         .o_reget_short_lock     = osc_reget_short_lock,
4218         .o_release_short_lock   = osc_release_short_lock,
4219         .o_queue_async_io       = osc_queue_async_io,
4220         .o_set_async_flags      = osc_set_async_flags,
4221         .o_queue_group_io       = osc_queue_group_io,
4222         .o_trigger_group_io     = osc_trigger_group_io,
4223         .o_teardown_async_page  = osc_teardown_async_page,
4224         .o_punch                = osc_punch,
4225         .o_sync                 = osc_sync,
4226         .o_enqueue              = osc_enqueue,
4227         .o_match                = osc_match,
4228         .o_change_cbdata        = osc_change_cbdata,
4229         .o_cancel               = osc_cancel,
4230         .o_cancel_unused        = osc_cancel_unused,
4231         .o_iocontrol            = osc_iocontrol,
4232         .o_get_info             = osc_get_info,
4233         .o_set_info_async       = osc_set_info_async,
4234         .o_import_event         = osc_import_event,
4235         .o_llog_init            = osc_llog_init,
4236         .o_llog_finish          = osc_llog_finish,
4237         .o_process_config       = osc_process_config,
4238         .o_register_page_removal_cb = osc_register_page_removal_cb,
4239         .o_unregister_page_removal_cb = osc_unregister_page_removal_cb,
4240         .o_register_lock_cancel_cb = osc_register_lock_cancel_cb,
4241         .o_unregister_lock_cancel_cb = osc_unregister_lock_cancel_cb,
4242 };
4243
4244 int __init osc_init(void)
4245 {
4246         struct lprocfs_static_vars lvars = { 0 };
4247         int rc;
4248         ENTRY;
4249
4250         lprocfs_osc_init_vars(&lvars);
4251
4252         request_module("lquota");
4253         quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
4254         lquota_init(quota_interface);
4255         init_obd_quota_ops(quota_interface, &osc_obd_ops);
4256
4257         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
4258                                  LUSTRE_OSC_NAME, NULL);
4259         if (rc) {
4260                 if (quota_interface)
4261                         PORTAL_SYMBOL_PUT(osc_quota_interface);
4262                 RETURN(rc);
4263         }
4264
4265         RETURN(rc);
4266 }
4267
4268 #ifdef __KERNEL__
4269 static void /*__exit*/ osc_exit(void)
4270 {
4271         lquota_exit(quota_interface);
4272         if (quota_interface)
4273                 PORTAL_SYMBOL_PUT(osc_quota_interface);
4274
4275         class_unregister_type(LUSTRE_OSC_NAME);
4276 }
4277
4278 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4279 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4280 MODULE_LICENSE("GPL");
4281
4282 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
4283 #endif