Whamcloud - gitweb
b=11270
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author Peter Braam <braam@clusterfs.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  *   You may have signed or agreed to another license before downloading
11  *   this software.  If so, you are bound by the terms and conditions
12  *   of that agreement, and the following does not apply to you.  See the
13  *   LICENSE file included with this distribution for more information.
14  *
15  *   If you did not agree to a different license, then this copy of Lustre
16  *   is open source software; you can redistribute it and/or modify it
17  *   under the terms of version 2 of the GNU General Public License as
18  *   published by the Free Software Foundation.
19  *
20  *   In either case, Lustre is distributed in the hope that it will be
21  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23  *   license text for more details.
24  *
25  *  For testing and management it is treated as an obd_device,
26  *  although * it does not export a full OBD method table (the
27  *  requests are coming * in over the wire, so object target modules
28  *  do not have a full * method table.)
29  *
30  */
31
32 #ifndef EXPORT_SYMTAB
33 # define EXPORT_SYMTAB
34 #endif
35 #define DEBUG_SUBSYSTEM S_OSC
36
37 #ifdef __KERNEL__
38 # include <libcfs/libcfs.h>
39 #else /* __KERNEL__ */
40 # include <liblustre.h>
41 #endif
42
43 #include <lustre_dlm.h>
44 #include <libcfs/kp30.h>
45 #include <lustre_net.h>
46 #include <lustre/lustre_user.h>
47 #include <obd_cksum.h>
48 #include <obd_ost.h>
49 #include <obd_lov.h>
50
51 #ifdef  __CYGWIN__
52 # include <ctype.h>
53 #endif
54
55 #include <lustre_ha.h>
56 #include <lprocfs_status.h>
57 #include <lustre_log.h>
58 #include <lustre_debug.h>
59 #include <lustre_param.h>
60 #include "osc_internal.h"
61
62 static quota_interface_t *quota_interface = NULL;
63 extern quota_interface_t osc_quota_interface;
64
65 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
66
67 /* Pack OSC object metadata for disk storage (LE byte order). */
68 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
69                       struct lov_stripe_md *lsm)
70 {
71         int lmm_size;
72         ENTRY;
73
74         lmm_size = sizeof(**lmmp);
75         if (!lmmp)
76                 RETURN(lmm_size);
77
78         if (*lmmp && !lsm) {
79                 OBD_FREE(*lmmp, lmm_size);
80                 *lmmp = NULL;
81                 RETURN(0);
82         }
83
84         if (!*lmmp) {
85                 OBD_ALLOC(*lmmp, lmm_size);
86                 if (!*lmmp)
87                         RETURN(-ENOMEM);
88         }
89
90         if (lsm) {
91                 LASSERT(lsm->lsm_object_id);
92                 LASSERT(lsm->lsm_object_gr);
93                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
94                 (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
95         }
96
97         RETURN(lmm_size);
98 }
99
/* Unpack OSC object metadata from disk storage (LE byte order).
 *
 * Calling convention (mirrors osc_packmd):
 *  - lsmp == NULL: only report the required lov_stripe_md size;
 *  - lmm == NULL with *lsmp set: free *lsmp and return 0;
 *  - otherwise: allocate *lsmp if needed and fill it from @lmm.
 * Returns the lsm size on success or a negative errno. */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
{
        int lsm_size;
        ENTRY;

        if (lmm != NULL) {
                /* Validate the on-disk buffer before touching it. */
                if (lmm_bytes < sizeof (*lmm)) {
                        CERROR("lov_mds_md too small: %d, need %d\n",
                               lmm_bytes, (int)sizeof(*lmm));
                        RETURN(-EINVAL);
                }
                /* XXX LOV_MAGIC etc check? */

                if (lmm->lmm_object_id == 0) {
                        CERROR("lov_mds_md: zero lmm_object_id\n");
                        RETURN(-EINVAL);
                }
        }

        /* An OSC always describes exactly one stripe. */
        lsm_size = lov_stripe_md_size(1);
        if (lsmp == NULL)
                RETURN(lsm_size);

        if (*lsmp != NULL && lmm == NULL) {
                /* Free request: release the oinfo first, then the lsm. */
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);
                *lsmp = NULL;
                RETURN(0);
        }

        if (*lsmp == NULL) {
                OBD_ALLOC(*lsmp, lsm_size);
                if (*lsmp == NULL)
                        RETURN(-ENOMEM);
                OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                if ((*lsmp)->lsm_oinfo[0] == NULL) {
                        /* Unwind the lsm allocation on oinfo failure. */
                        OBD_FREE(*lsmp, lsm_size);
                        RETURN(-ENOMEM);
                }
                loi_init((*lsmp)->lsm_oinfo[0]);
        }

        if (lmm != NULL) {
                /* XXX zero *lsmp? */
                (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
                (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
                LASSERT((*lsmp)->lsm_object_id);
                LASSERT((*lsmp)->lsm_object_gr);
        }

        (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

        RETURN(lsm_size);
}
156
157 static inline void osc_pack_capa(struct ptlrpc_request *req,
158                                  struct ost_body *body, void *capa)
159 {
160         struct obd_capa *oc = (struct obd_capa *)capa;
161         struct lustre_capa *c;
162
163         if (!capa)
164                 return;
165
166         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
167         LASSERT(c);
168         capa_cpy(c, oc);
169         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
170         DEBUG_CAPA(D_SEC, c, "pack");
171 }
172
173 static inline void osc_pack_req_body(struct ptlrpc_request *req,
174                                      struct obd_info *oinfo)
175 {
176         struct ost_body *body;
177
178         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
179         LASSERT(body);
180
181         body->oa = *oinfo->oi_oa;
182         osc_pack_capa(req, body, oinfo->oi_capa);
183 }
184
185 static inline void osc_set_capa_size(struct ptlrpc_request *req,
186                                      const struct req_msg_field *field,
187                                      struct obd_capa *oc)
188 {
189         if (oc == NULL)
190                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
191         else
192                 /* it is already calculated as sizeof struct obd_capa */
193                 ;
194 }
195
/* Reply handler for osc_getattr_async(): unpack the ost_body from the
 * reply, copy the attributes into the caller's obdo, then invoke the
 * caller's completion callback oi_cb_up() with the final status.
 * NOTE(review): this unpacks via lustre_swab_repbuf() while the other
 * interpret handlers in this file use req_capsule_server_get();
 * presumably equivalent for this reply format -- confirm before
 * unifying. */
static int osc_getattr_interpret(struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body) {
                CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
                memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));

                /* This should really be sent by the OST */
                aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
                aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
        } else {
                /* Reply could not be unpacked: mark the obdo invalid. */
                CDEBUG(D_INFO, "can't unpack ost_body\n");
                rc = -EPROTO;
                aa->aa_oi->oi_oa->o_valid = 0;
        }
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}
223
/* Queue an asynchronous OST_GETATTR for the object in @oinfo on @set.
 * The reply is handled by osc_getattr_interpret(), which invokes
 * oinfo->oi_cb_up() with the result.  Returns 0 or a negative errno. */
static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        /* Shrink the capa field to zero when no capability is attached. */
        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_getattr_interpret;

        /* Stash the obd_info in the request's async-args scratch space. */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_async_args *)&req->rq_async_args;
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(set, req);
        RETURN(0);
}
255
/* Synchronous OST_GETATTR: fetch the object's attributes into
 * oinfo->oi_oa.  Returns 0 on success or a negative errno. */
static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        *oinfo->oi_oa = body->oa;

        /* This should really be sent by the OST */
        oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
298
299 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
300                        struct obd_trans_info *oti)
301 {
302         struct ptlrpc_request *req;
303         struct ost_body       *body;
304         int                    rc;
305         ENTRY;
306
307         LASSERT(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
308                                         oinfo->oi_oa->o_gr > 0);
309
310         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
311         if (req == NULL)
312                 RETURN(-ENOMEM);
313
314         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
315         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
316         if (rc) {
317                 ptlrpc_request_free(req);
318                 RETURN(rc);
319         }
320
321         osc_pack_req_body(req, oinfo);
322
323         ptlrpc_request_set_replen(req);
324  
325
326         rc = ptlrpc_queue_wait(req);
327         if (rc)
328                 GOTO(out, rc);
329
330         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
331         if (body == NULL)
332                 GOTO(out, rc = -EPROTO);
333
334         *oinfo->oi_oa = body->oa;
335
336         EXIT;
337 out:
338         ptlrpc_req_finished(req);
339         RETURN(rc);
340 }
341
/* Reply handler for osc_setattr_async(): copy the server's view of the
 * attributes back into the caller's obdo, then invoke the caller's
 * completion callback oi_cb_up() with the final status. */
static int osc_setattr_interpret(struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *aa->aa_oi->oi_oa = body->oa;
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}
360
/* Asynchronous OST_SETATTR.  With @rqset the request joins the set and
 * osc_setattr_interpret() delivers the result via oi_cb_up(); without a
 * set the request is handed to ptlrpcd fire-and-forget.  Returns 0 or a
 * negative errno from request setup. */
static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct obd_trans_info *oti,
                             struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        /* Carry the llog cookie along so the OST can cancel the record. */
        if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
                LASSERT(oti);
                *obdo_logcookie(oinfo->oi_oa) = *oti->oti_logcookies;
        }

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
                req->rq_interpret_reply = osc_setattr_interpret;

                CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = (struct osc_async_args *)&req->rq_async_args;
                aa->aa_oi = oinfo;

                ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}
406
/* Synchronously create one object on the OST.  If *ea is NULL a
 * single-stripe lsm is allocated first; on success the new object id
 * and group are recorded in it and *ea is set.  With @oti supplied the
 * reply transno and any unlink-log cookie are also captured.  Returns 0
 * on success or a negative errno; an lsm allocated here is freed again
 * on failure. */
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct lov_stripe_md  *lsm;
        int                    rc;
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        body->oa = *oa;

        ptlrpc_request_set_replen(req);

        if (oa->o_valid & OBD_MD_FLINLINE) {
                LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
                        oa->o_flags == OBD_FL_DELORPHAN);
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        *oa = body->oa;

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_object_id = oa->o_id;
        lsm->lsm_object_gr = oa->o_gr;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        *oti->oti_logcookies = *obdo_logcookie(oa);
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        /* Only free the lsm if we allocated it (i.e. *ea is still NULL). */
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        RETURN(rc);
}
492
493 static int osc_punch_interpret(struct ptlrpc_request *req,
494                                struct osc_async_args *aa, int rc)
495 {
496         struct ost_body *body;
497         ENTRY;
498
499         if (rc != 0)
500                 GOTO(out, rc);
501
502         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
503         if (body == NULL)
504                 GOTO(out, rc = -EPROTO);
505
506         *aa->aa_oi->oi_oa = body->oa;
507 out:
508         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
509         RETURN(rc);
510 }
511
/* Asynchronously punch/truncate the extent oinfo->oi_policy.l_extent of
 * an OST object.  The extent start/end travel in the oa size/blocks
 * fields (overloaded for the wire format).  The request is added to
 * @rqset and the reply handled by osc_punch_interpret().  Returns 0 or
 * a negative errno from request setup. */
static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
                     struct obd_trans_info *oti,
                     struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        if (!oinfo->oi_oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        osc_pack_req_body(req, oinfo);

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        body->oa.o_size = oinfo->oi_policy.l_extent.start;
        body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
        ptlrpc_request_set_replen(req);


        req->rq_interpret_reply = osc_punch_interpret;
        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_async_args *)&req->rq_async_args;
        aa->aa_oi = oinfo;
        ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}
557
/* Synchronously ask the OST to flush the byte range [start, end] of an
 * object to disk (OST_SYNC).  The range travels in the oa size/blocks
 * fields; on success @oa is refreshed from the reply.  Returns 0 or a
 * negative errno. */
static int osc_sync(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md *md, obd_size start, obd_size end,
                    void *capa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        body->oa = *oa;
        body->oa.o_size = start;
        body->oa.o_blocks = end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
        osc_pack_capa(req, body, capa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *oa = body->oa;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
609
610 /* Find and cancel locally locks matched by @mode in the resource found by
611  * @objid. Found locks are added into @cancel list. Returns the amount of
612  * locks added to @cancels list. */
613 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
614                                    struct list_head *cancels, ldlm_mode_t mode,
615                                    int lock_flags)
616 {
617         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
618         struct ldlm_res_id res_id = { .name = { oa->o_id, 0, oa->o_gr, 0 } };
619         struct ldlm_resource *res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
620         int count;
621         ENTRY;
622
623         if (res == NULL)
624                 RETURN(0);
625
626         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
627                                            lock_flags, 0, NULL);
628         ldlm_resource_putref(res);
629         RETURN(count);
630 }
631
632 static int osc_destroy_interpret(struct ptlrpc_request *req, void *data,
633                                  int rc)
634 {
635         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
636
637         atomic_dec(&cli->cl_destroy_in_flight);
638         cfs_waitq_signal(&cli->cl_destroy_waitq);
639         return 0;
640 }
641
/* Try to reserve a slot for one destroy RPC.  Returns 1 when the caller
 * may send (the in-flight counter was raised without exceeding
 * cl_max_rpcs_in_flight), 0 when it must wait on cl_destroy_waitq.  The
 * inc/dec pair is racy by design: if another thread changed the counter
 * between the two atomics, a waiter is woken to re-check. */
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                cfs_waitq_signal(&cli->cl_destroy_waitq);
        }
        return 0;
}
659
/* Destroy requests can be async always on the client, and we don't even really
 * care about the return code since the client cannot do anything at all about
 * a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST reconnects to the MDS next,
 * it will retrieve the llog unlink logs and then sends the log cancellation
 * cookies to the MDS after committing destroy transactions. */
static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                       struct lov_stripe_md *ea, struct obd_trans_info *oti,
                       struct obd_export *md_export)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        CFS_LIST_HEAD(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        /* Cancel our local PW locks on the doomed object up front and
         * piggyback the cancels on the destroy (early lock cancel). */
        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY, 
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        req->rq_interpret_reply = osc_destroy_interpret;

        /* Carry the llog cookie so the OST can cancel the unlink record. */
        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                memcpy(obdo_logcookie(oa), oti->oti_logcookies,
                       sizeof(*oti->oti_logcookies));
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        body->oa = *oa;

        ptlrpc_request_set_replen(req);

        /* Throttle: don't let destroys consume every RPC slot. */
        if (!osc_can_send_destroy(cli)) {
                struct l_wait_info lwi = { 0 };

                /*
                 * Wait until the number of on-going destroy RPCs drops
                 * under max_rpc_in_flight
                 */
                l_wait_event_exclusive(cli->cl_destroy_waitq,
                                       osc_can_send_destroy(cli), &lwi);
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req);
        RETURN(0);
}
729
/* Fill the dirty/grant accounting fields of @oa so the OST learns how
 * much cache this client holds.  Takes cl_loi_list_lock itself.
 * o_undirty advertises remaining dirtying headroom (0 = none);
 * o_dropped reports grant we lost and resets cl_lost_grant.
 * @writing_bytes is currently unused in this body. */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (cli->cl_dirty > cli->cl_dirty_max) {
                /* Over the per-OSC limit: report no headroom. */
                CERROR("dirty %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
                /* Over the system-wide limit: likewise no headroom. */
                CERROR("dirty %d > system dirty_max %d\n",
                       atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                /* Implausibly large headroom: treat as accounting error. */
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
                                (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}
764
765 /* caller must hold loi_list_lock */
766 static void osc_consume_write_grant(struct client_obd *cli,
767                                     struct brw_page *pga)
768 {
769         atomic_inc(&obd_dirty_pages);
770         cli->cl_dirty += CFS_PAGE_SIZE;
771         cli->cl_avail_grant -= CFS_PAGE_SIZE;
772         pga->flag |= OBD_BRW_FROM_GRANT;
773         CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
774                CFS_PAGE_SIZE, pga, pga->pg);
775         LASSERT(cli->cl_avail_grant >= 0);
776 }
777
/* the companion to osc_consume_write_grant, called when a brw has completed.
 * must be called with the loi lock held.  Undoes the dirty-page
 * accounting; pages never sent (@sent == 0), and short writes spanning
 * partial OST blocks, add to cl_lost_grant so the server can be told
 * about grant consumed but not used for data. */
static void osc_release_write_grant(struct client_obd *cli,
                                    struct brw_page *pga, int sent)
{
        /* OST block size, defaulting to 4096 when statfs hasn't set it. */
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
        ENTRY;

        /* Nothing to undo if this page never consumed grant. */
        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
                EXIT;
                return;
        }

        pga->flag &= ~OBD_BRW_FROM_GRANT;
        atomic_dec(&obd_dirty_pages);
        cli->cl_dirty -= CFS_PAGE_SIZE;
        if (!sent) {
                cli->cl_lost_grant += CFS_PAGE_SIZE;
                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
        } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole block on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
                int offset = pga->off & ~CFS_PAGE_MASK;
                int count = pga->count + (offset & (blocksize - 1));
                int end = (offset + pga->count) & (blocksize - 1);
                if (end)
                        count += blocksize - end;

                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
                       cli->cl_avail_grant, cli->cl_dirty);
        }

        EXIT;
}
816
817 static unsigned long rpcs_in_flight(struct client_obd *cli)
818 {
819         return cli->cl_r_in_flight + cli->cl_w_in_flight;
820 }
821
/* caller must hold loi_list_lock.  Walk cl_cache_waiters waking waiters
 * while dirty-cache room and grant allow; a waiter woken without grant
 * gets ocw_rc = -EDQUOT and must fall back to sync IO.  Stops early when
 * the dirty limits are hit or when in-flight writes may still return
 * grant. */
void osc_wake_cache_waiters(struct client_obd *cli)
{
        struct list_head *l, *tmp;
        struct osc_cache_waiter *ocw;

        ENTRY;
        list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                /* if we can't dirty more, we must wait until some is written */
                if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
                   (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);
                        return;
                }

                /* if still dirty cache but no grant wait for pending RPCs that
                 * may yet return us some grant before doing sync writes */
                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
                               cli->cl_w_in_flight);
                        return;
                }

                ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
                list_del_init(&ocw->ocw_entry);
                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        /* no more RPCs in flight to return grant, do sync IO */
                        ocw->ocw_rc = -EDQUOT;
                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
                } else {
                        osc_consume_write_grant(cli,
                                                &ocw->ocw_oap->oap_brw_page);
                }

                cfs_waitq_signal(&ocw->ocw_waitq);
        }

        EXIT;
}
863
864 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
865 {
866         client_obd_list_lock(&cli->cl_loi_list_lock);
867         cli->cl_avail_grant = ocd->ocd_grant;
868         client_obd_list_unlock(&cli->cl_loi_list_lock);
869
870         CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
871                cli->cl_avail_grant, cli->cl_lost_grant);
872         LASSERT(cli->cl_avail_grant >= 0);
873 }
874
875 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
876 {
877         client_obd_list_lock(&cli->cl_loi_list_lock);
878         CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
879         if (body->oa.o_valid & OBD_MD_FLGRANT)
880                 cli->cl_avail_grant += body->oa.o_grant;
881         /* waiters are woken in brw_interpret_oap */
882         client_obd_list_unlock(&cli->cl_loi_list_lock);
883 }
884
885 /* We assume that the reason this OSC got a short read is because it read
886  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
887  * via the LOV, and it _knows_ it's reading inside the file, it's just that
888  * this stripe never got written at or beyond this stripe offset yet. */
889 static void handle_short_read(int nob_read, obd_count page_count,
890                               struct brw_page **pga)
891 {
892         char *ptr;
893         int i = 0;
894
895         /* skip bytes read OK */
896         while (nob_read > 0) {
897                 LASSERT (page_count > 0);
898
899                 if (pga[i]->count > nob_read) {
900                         /* EOF inside this page */
901                         ptr = cfs_kmap(pga[i]->pg) +
902                                 (pga[i]->off & ~CFS_PAGE_MASK);
903                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
904                         cfs_kunmap(pga[i]->pg);
905                         page_count--;
906                         i++;
907                         break;
908                 }
909
910                 nob_read -= pga[i]->count;
911                 page_count--;
912                 i++;
913         }
914
915         /* zero remaining pages */
916         while (page_count-- > 0) {
917                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
918                 memset(ptr, 0, pga[i]->count);
919                 cfs_kunmap(pga[i]->pg);
920                 i++;
921         }
922 }
923
924 static int check_write_rcs(struct ptlrpc_request *req,
925                            int requested_nob, int niocount,
926                            obd_count page_count, struct brw_page **pga)
927 {
928         int    *remote_rcs, i;
929
930         /* return error if any niobuf was in error */
931         remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
932                                         sizeof(*remote_rcs) * niocount, NULL);
933         if (remote_rcs == NULL) {
934                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
935                 return(-EPROTO);
936         }
937         if (lustre_msg_swabbed(req->rq_repmsg))
938                 for (i = 0; i < niocount; i++)
939                         __swab32s(&remote_rcs[i]);
940
941         for (i = 0; i < niocount; i++) {
942                 if (remote_rcs[i] < 0)
943                         return(remote_rcs[i]);
944
945                 if (remote_rcs[i] != 0) {
946                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
947                                 i, remote_rcs[i], req);
948                         return(-EPROTO);
949                 }
950         }
951
952         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
953                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
954                        requested_nob, req->rq_bulk->bd_nob_transferred);
955                 return(-EPROTO);
956         }
957
958         return (0);
959 }
960
961 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
962 {
963         if (p1->flag != p2->flag) {
964                 unsigned mask = ~OBD_BRW_FROM_GRANT;
965
966                 /* warn if we try to combine flags that we don't know to be
967                  * safe to combine */
968                 if ((p1->flag & mask) != (p2->flag & mask))
969                         CERROR("is it ok to have flags 0x%x and 0x%x in the "
970                                "same brw?\n", p1->flag, p2->flag);
971                 return 0;
972         }
973
974         return (p1->off + p1->count == p2->off);
975 }
976
/* Checksum the first @nob bytes of the bulk described by @pga[0..pg_count-1].
 * @opc (OST_READ/OST_WRITE) only selects which OBD_FAIL fault-injection
 * hook applies; the checksum computation itself is direction-independent. */
static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga, int opc,
                                   cksum_type_t cksum_type)
{
        __u32 cksum;
        int i = 0;

        LASSERT (pg_count > 0);
        cksum = init_checksum(cksum_type);
        while (nob > 0 && pg_count > 0) {
                unsigned char *ptr = cfs_kmap(pga[i]->pg);
                int off = pga[i]->off & ~CFS_PAGE_MASK;
                /* only checksum up to nob on the last (partial) page */
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
                        memcpy(ptr + off, "bad1", min(4, nob));
                cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
                cfs_kunmap(pga[i]->pg);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
                               off, cksum);

                /* NB nob drops by the full page count even when only part
                 * of the page was checksummed; the loop then terminates */
                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}
1012
/* Build (but do not send) a BRW read or write RPC covering @page_count
 * pages.  Contiguous pages with identical flags are coalesced into a
 * single remote niobuf.  On success *reqp holds the prepared request
 * (with async args initialized) and the caller owns it; on failure the
 * request is freed here and an errno is returned. */
static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga, 
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        /* writes draw their request from the import's emergency pool so
         * dirty data can still be flushed under memory pressure */
        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import, 
                                                cli->cl_import->imp_rq_pool,
                                                &RQF_OST_BRW);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW);
        }

        if (req == NULL)
                RETURN(-ENOMEM);

        /* count how many niobufs we need: one per run of mergeable pages */
        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));
        osc_set_capa_size(req, &RMF_CAPA1, ocapa);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */

        /* for writes the server GETs our pages; for reads it PUTs them */
        if (opc == OST_WRITE)
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_GET_SOURCE, OST_BULK_PORTAL);
        else
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_PUT_SINK, OST_BULK_PORTAL);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body && ioobj && niobuf);

        body->oa = *oa;

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        osc_pack_capa(req, body, ocapa);
        LASSERT (page_count > 0);
        /* attach every page to the bulk and fill the niobufs, merging a
         * page into the previous niobuf when can_merge_pages() allows */
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                struct brw_page *pg_prev = pga[i - 1];

                LASSERT(pg->count > 0);
                LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
                         "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
                         pg->off, pg->count);
#ifdef __linux__
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
#else
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u\n", i, page_count);
#endif
                /* all pages in one BRW must agree on server-side locking */
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
                                      pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len    = pg->count;
                        niobuf->flags  = pg->flag;
                }
        }

        /* sanity: we must have filled exactly the niobufs we sized above */
        LASSERT((void *)(niobuf - niocount) ==
                lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                               niocount * sizeof(*niobuf)));
        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                /* only add our own checksum when the bulk transport itself
                 * isn't already hashing the data */
                if (unlikely(cli->cl_checksum) &&
                    req->rq_flvr.sf_bulk_hash == BULK_HASH_ALG_NULL) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                oa->o_flags = body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                /* for reads, ask the server to checksum what it sends */
                if (unlikely(cli->cl_checksum) &&
                    req->rq_flvr.sf_bulk_hash == BULK_HASH_ALG_NULL) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
                req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER, 0);
                /* 1 RC for the whole I/O */
        }
        ptlrpc_request_set_replen(req);

        /* stash the per-request state needed by the completion handlers */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_brw_async_args *)&req->rq_async_args;
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);

        *reqp = req;
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
1189
/* Compare the checksum the server computed for a bulk write against the
 * one we sent.  On mismatch, recompute the checksum locally to diagnose
 * where the corruption likely happened and log it loudly.
 *
 * Returns 0 if the checksums agree, 1 on a mismatch (caller resends). */
static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum, int nob,
                                obd_count page_count, struct brw_page **pga,
                                cksum_type_t client_cksum_type)
{
        __u32 new_cksum;
        char *msg;
        cksum_type_t cksum_type;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                return 0;
        }

        /* use the checksum type the server actually used, falling back to
         * CRC32 when the reply carries no flags */
        if (oa->o_valid & OBD_MD_FLFLAGS)
                cksum_type = cksum_type_unpack(oa->o_flags);
        else
                cksum_type = OBD_CKSUM_CRC32;

        new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
                                      cksum_type);

        /* narrow down where the corruption happened by comparing the
         * freshly recomputed checksum against both originals */
        if (cksum_type != client_cksum_type)
                msg = "the server did not use the checksum type specified in "
                      "the original request - likely a protocol problem";
        else if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it - "
                      "likely false positive due to mmap IO (bug 11742)";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original - "
                      "likely false positive due to mmap IO (bug 11742)";

        LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
                           LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
                           "["LPU64"-"LPU64"]\n",
                           msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_generation : 
                                                        (__u64)0,
                           oa->o_id,
                           oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
                           pga[0]->off,
                           pga[page_count-1]->off + pga[page_count-1]->count - 1);
        CERROR("original client csum %x (type %x), server csum %x (type %x), "
               "client csum now %x\n", client_cksum, client_cksum_type,
               server_cksum, cksum_type, new_cksum);
        return 1;        
}
1240
/* Note rc enters this function as number of bytes transferred */
/* Finish processing a completed BRW RPC: unpack and validate the reply,
 * update quota flags and grant, verify checksums, and for short reads
 * zero the unread tail of the page array.
 *
 * Returns 0 on success, -EAGAIN when a checksum/bulk problem makes a
 * resend worthwhile, or another negative errno on failure. */
static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
{
        struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
        const lnet_process_id_t *peer =
                        &req->rq_import->imp_connection->c_peer;
        struct client_obd *cli = aa->aa_cli;
        struct ost_body *body;
        __u32 client_cksum = 0;
        ENTRY;

        /* -EDQUOT replies still carry a body we must process below */
        if (rc < 0 && rc != -EDQUOT)
                RETURN(rc);

        LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CDEBUG(D_INFO, "Can't unpack body\n");
                RETURN(-EPROTO);
        }

        /* set/clear over quota flag for a uid/gid */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
            body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
                lquota_setdq(quota_interface, cli, body->oa.o_uid,
                             body->oa.o_gid, body->oa.o_valid,
                             body->oa.o_flags);

        /* only -EDQUOT reaches here with rc < 0; propagate it now that
         * the quota flags above have been updated */
        if (rc < 0)
                RETURN(rc);

        if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
                client_cksum = aa->aa_oa->o_cksum; /* save for later */

        osc_update_grant(cli, body);

        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
                /* a write reply reports status only, never a byte count */
                if (rc > 0) {
                        CERROR("Unexpected +ve rc %d\n", rc);
                        RETURN(-EPROTO);
                }
                LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);

                if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
                    check_write_checksum(&body->oa, peer, client_cksum,
                                         body->oa.o_cksum, aa->aa_requested_nob,
                                         aa->aa_page_count, aa->aa_ppga,
                                         cksum_type_unpack(aa->aa_oa->o_flags)))
                        RETURN(-EAGAIN);

                if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
                        RETURN(-EAGAIN);

                rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
                                     aa->aa_page_count, aa->aa_ppga);
                GOTO(out, rc);
        }

        /* The rest of this function executes only for OST_READs */
        if (rc > aa->aa_requested_nob) {
                CERROR("Unexpected rc %d (%d requested)\n", rc,
                       aa->aa_requested_nob);
                RETURN(-EPROTO);
        }

        if (rc != req->rq_bulk->bd_nob_transferred) {
                CERROR ("Unexpected rc %d (%d transferred)\n",
                        rc, req->rq_bulk->bd_nob_transferred);
                return (-EPROTO);
        }

        /* short read: server hit end of stripe; zero out the remainder */
        if (rc < aa->aa_requested_nob)
                handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);

        if (sptlrpc_cli_unwrap_bulk_read(req, rc, aa->aa_page_count,
                                         aa->aa_ppga))
                GOTO(out, rc = -EAGAIN);

        if (body->oa.o_valid & OBD_MD_FLCKSUM) {
                static int cksum_counter;
                __u32      server_cksum = body->oa.o_cksum;
                char      *via;
                char      *router;
                cksum_type_t cksum_type;

                if (body->oa.o_valid & OBD_MD_FLFLAGS)
                        cksum_type = cksum_type_unpack(body->oa.o_flags);
                else
                        cksum_type = OBD_CKSUM_CRC32;
                client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
                                                 aa->aa_ppga, OST_READ,
                                                 cksum_type);

                /* name the LNET router in the log when the bulk was relayed */
                if (peer->nid == req->rq_bulk->bd_sender) {
                        via = router = "";
                } else {
                        via = " via ";
                        router = libcfs_nid2str(req->rq_bulk->bd_sender);
                }

                if (server_cksum == ~0 && rc > 0) {
                        CERROR("Protocol error: server %s set the 'checksum' "
                               "bit, but didn't send a checksum.  Not fatal, "
                               "but please tell CFS.\n",
                               libcfs_nid2str(peer->nid));
                } else if (server_cksum != client_cksum) {
                        LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
                                           "%s%s%s inum "LPU64"/"LPU64" object "
                                           LPU64"/"LPU64" extent "
                                           "["LPU64"-"LPU64"]\n",
                                           req->rq_import->imp_obd->obd_name,
                                           libcfs_nid2str(peer->nid),
                                           via, router,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                                body->oa.o_fid : (__u64)0,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                                body->oa.o_generation :(__u64)0,
                                           body->oa.o_id,
                                           body->oa.o_valid & OBD_MD_FLGROUP ?
                                                body->oa.o_gr : (__u64)0,
                                           aa->aa_ppga[0]->off,
                                           aa->aa_ppga[aa->aa_page_count-1]->off +
                                           aa->aa_ppga[aa->aa_page_count-1]->count -
                                                                        1);
                        CERROR("client %x, server %x, cksum_type %x\n",
                               client_cksum, server_cksum, cksum_type);
                        cksum_counter = 0;
                        aa->aa_oa->o_cksum = client_cksum;
                        rc = -EAGAIN;
                } else {
                        cksum_counter++;
                        CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                        rc = 0;
                }
        } else if (unlikely(client_cksum)) {
                static int cksum_missed;

                /* rate-limit: log only when cksum_missed is a power of two */
                cksum_missed++;
                if ((cksum_missed & (-cksum_missed)) == cksum_missed)
                        CERROR("Checksum %u requested from %s but not sent\n",
                               cksum_missed, libcfs_nid2str(peer->nid));
        } else {
                rc = 0;
        }
out:
        /* copy the server's view of the obdo back to the caller's */
        if (rc >= 0)
                *aa->aa_oa = body->oa;

        RETURN(rc);
}
1392
1393 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1394                             struct lov_stripe_md *lsm,
1395                             obd_count page_count, struct brw_page **pga,
1396                             struct obd_capa *ocapa)
1397 {
1398         struct ptlrpc_request *req;
1399         int                    rc;
1400         cfs_waitq_t            waitq;
1401         int                    resends = 0;
1402         struct l_wait_info     lwi;
1403
1404         ENTRY;
1405
1406         cfs_waitq_init(&waitq);
1407
1408 restart_bulk:
1409         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1410                                   page_count, pga, &req, ocapa);
1411         if (rc != 0)
1412                 return (rc);
1413
1414         rc = ptlrpc_queue_wait(req);
1415
1416         if (rc == -ETIMEDOUT && req->rq_resend) {
1417                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1418                 ptlrpc_req_finished(req);
1419                 goto restart_bulk;
1420         }
1421
1422         rc = osc_brw_fini_request(req, rc);
1423
1424         ptlrpc_req_finished(req);
1425         if (osc_recoverable_error(rc)) {
1426                 resends++;
1427                 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1428                         CERROR("too many resend retries, returning error\n");
1429                         RETURN(-EIO);
1430                 }
1431
1432                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1433                 l_wait_event(waitq, 0, &lwi);
1434
1435                 goto restart_bulk;
1436         }
1437         
1438         RETURN (rc);
1439 }
1440
/* Rebuild @request as a fresh RPC after a recoverable error and add it to
 * the original request's set.  The new request takes over the page array
 * and the oap list from the old one under cl_loi_list_lock.
 *
 * Returns 0 on success, -EIO when retries are exhausted, -EINTR when a
 * queued page was interrupted, or the prep-request errno. */
int osc_brw_redo_request(struct ptlrpc_request *request,
                         struct osc_brw_async_args *aa)
{
        struct ptlrpc_request *new_req;
        struct ptlrpc_request_set *set = request->rq_set;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
                CERROR("too many resend retries, returning error\n");
                RETURN(-EIO);
        }

        DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
/*
        body = lustre_msg_buf(request->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
                ocapa = lustre_unpack_capa(request->rq_reqmsg,
                                           REQ_REC_OFF + 3);
*/
        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                        OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa,
                                  NULL /* lsm unused by osc currently */,
                                  aa->aa_page_count, aa->aa_ppga, 
                                  &new_req, NULL /* ocapa */);
        if (rc)
                RETURN(rc);

        client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);

        /* bail out before the handover if any queued page was interrupted */
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(request == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 request, oap->oap_request);
                        if (oap->oap_interrupted) {
                                client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
                                ptlrpc_req_finished(new_req);
                                RETURN(-EINTR);
                        }
                }
        }
        /* New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it... */
        aa->aa_resends++;
        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
        /* delay the resend by aa_resends seconds */
        new_req->rq_sent = CURRENT_SECONDS + aa->aa_resends;

        new_aa = (struct osc_brw_async_args *)&new_req->rq_async_args;

        CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
        list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);

        /* repoint each page's request reference at the replacement RPC */
        list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);
                }
        }

        /* use ptlrpc_set_add_req is safe because interpret functions work 
         * in check_set context. only one way exist with access to request 
         * from different thread got -EINTR - this way protected with 
         * cl_loi_list_lock */
        ptlrpc_set_add_req(set, new_req);

        client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);

        DEBUG_REQ(D_INFO, new_req, "new request");
        RETURN(0);
}
1517
/* Completion callback for async BRW RPCs (set by async_internal).
 * Finishes the reply; on a recoverable error a successful redo returns
 * early so the new request keeps the grant, in-flight count and page
 * array.  Otherwise accounting is unwound and the pga is released. */
static int brw_interpret(struct ptlrpc_request *req, void *data, int rc)
{
        struct osc_brw_async_args *aa = data;
        int                        i;
        ENTRY;

        rc = osc_brw_fini_request(req, rc);
        if (osc_recoverable_error(rc)) {
                rc = osc_brw_redo_request(req, aa);
                if (rc == 0)
                        RETURN(0);
        }

        client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
                aa->aa_cli->cl_w_in_flight--;
        else
                aa->aa_cli->cl_r_in_flight--;
        /* return the write grant consumed when the RPC was queued */
        for (i = 0; i < aa->aa_page_count; i++)
                osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
        client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);

        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);

        RETURN(rc);
}
1544
1545 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1546                           struct lov_stripe_md *lsm, obd_count page_count,
1547                           struct brw_page **pga, struct ptlrpc_request_set *set,
1548                           struct obd_capa *ocapa)
1549 {
1550         struct ptlrpc_request     *req;
1551         struct client_obd         *cli = &exp->exp_obd->u.cli;
1552         int                        rc, i;
1553         struct osc_brw_async_args *aa;
1554         ENTRY;
1555
1556         /* Consume write credits even if doing a sync write -
1557          * otherwise we may run out of space on OST due to grant. */
1558         if (cmd == OBD_BRW_WRITE) {
1559                 spin_lock(&cli->cl_loi_list_lock);
1560                 for (i = 0; i < page_count; i++) {
1561                         if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
1562                                 osc_consume_write_grant(cli, pga[i]);
1563                 }
1564                 spin_unlock(&cli->cl_loi_list_lock);
1565         }
1566
1567         rc = osc_brw_prep_request(cmd, cli, oa, lsm, page_count, pga,
1568                                   &req, ocapa);
1569
1570         aa = (struct osc_brw_async_args *)&req->rq_async_args;
1571         if (cmd == OBD_BRW_READ) {
1572                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1573                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1574                 ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
1575         } else {
1576                  lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1577                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
1578                                  cli->cl_w_in_flight);
1579                 ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
1580         }
1581
1582         if (rc == 0) {
1583                 req->rq_interpret_reply = brw_interpret;
1584                 ptlrpc_set_add_req(set, req);
1585                 client_obd_list_lock(&cli->cl_loi_list_lock);
1586                 if (cmd == OBD_BRW_READ)
1587                         cli->cl_r_in_flight++;
1588                 else
1589                         cli->cl_w_in_flight++;
1590                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1591         } else if (cmd == OBD_BRW_WRITE) {
1592                 client_obd_list_lock(&cli->cl_loi_list_lock);
1593                 for (i = 0; i < page_count; i++)
1594                         osc_release_write_grant(cli, pga[i], 0);
1595                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1596         }
1597         RETURN (rc);
1598 }
1599
1600 /*
1601  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1602  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1603  * fine for our small page arrays and doesn't require allocation.  its an
1604  * insertion sort that swaps elements that are strides apart, shrinking the
1605  * stride down until its '1' and the array is sorted.
1606  */
1607 static void sort_brw_pages(struct brw_page **array, int num)
1608 {
1609         int stride, i, j;
1610         struct brw_page *tmp;
1611
1612         if (num == 1)
1613                 return;
1614         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1615                 ;
1616
1617         do {
1618                 stride /= 3;
1619                 for (i = stride ; i < num ; i++) {
1620                         tmp = array[i];
1621                         j = i;
1622                         while (j >= stride && array[j - stride]->off > tmp->off) {
1623                                 array[j] = array[j - stride];
1624                                 j -= stride;
1625                         }
1626                         array[j] = tmp;
1627                 }
1628         } while (stride > 1);
1629 }
1630
1631 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1632 {
1633         int count = 1;
1634         int offset;
1635         int i = 0;
1636
1637         LASSERT (pages > 0);
1638         offset = pg[i]->off & ~CFS_PAGE_MASK;
1639
1640         for (;;) {
1641                 pages--;
1642                 if (pages == 0)         /* that's all */
1643                         return count;
1644
1645                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1646                         return count;   /* doesn't end on page boundary */
1647
1648                 i++;
1649                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1650                 if (offset != 0)        /* doesn't start on page boundary */
1651                         return count;
1652
1653                 count++;
1654         }
1655 }
1656
1657 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1658 {
1659         struct brw_page **ppga;
1660         int i;
1661
1662         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1663         if (ppga == NULL)
1664                 return NULL;
1665
1666         for (i = 0; i < count; i++)
1667                 ppga[i] = pga + i;
1668         return ppga;
1669 }
1670
/* Free a pointer array obtained from osc_build_ppga() (or an
 * OBD_ALLOC'd copy of the same element count).  Only the array of
 * pointers is freed, never the brw_page entries they point at. */
static void osc_release_ppga(struct brw_page **ppga, obd_count count)
{
        LASSERT(ppga != NULL);
        OBD_FREE(ppga, sizeof(*ppga) * count);
}
1676
1677 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1678                    obd_count page_count, struct brw_page *pga,
1679                    struct obd_trans_info *oti)
1680 {
1681         struct obdo *saved_oa = NULL;
1682         struct brw_page **ppga, **orig;
1683         struct obd_import *imp = class_exp2cliimp(exp);
1684         struct client_obd *cli = &imp->imp_obd->u.cli;
1685         int rc, page_count_orig;
1686         ENTRY;
1687
1688         if (cmd & OBD_BRW_CHECK) {
1689                 /* The caller just wants to know if there's a chance that this
1690                  * I/O can succeed */
1691
1692                 if (imp == NULL || imp->imp_invalid)
1693                         RETURN(-EIO);
1694                 RETURN(0);
1695         }
1696
1697         /* test_brw with a failed create can trip this, maybe others. */
1698         LASSERT(cli->cl_max_pages_per_rpc);
1699
1700         rc = 0;
1701
1702         orig = ppga = osc_build_ppga(pga, page_count);
1703         if (ppga == NULL)
1704                 RETURN(-ENOMEM);
1705         page_count_orig = page_count;
1706
1707         sort_brw_pages(ppga, page_count);
1708         while (page_count) {
1709                 obd_count pages_per_brw;
1710
1711                 if (page_count > cli->cl_max_pages_per_rpc)
1712                         pages_per_brw = cli->cl_max_pages_per_rpc;
1713                 else
1714                         pages_per_brw = page_count;
1715
1716                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1717
1718                 if (saved_oa != NULL) {
1719                         /* restore previously saved oa */
1720                         *oinfo->oi_oa = *saved_oa;
1721                 } else if (page_count > pages_per_brw) {
1722                         /* save a copy of oa (brw will clobber it) */
1723                         OBDO_ALLOC(saved_oa);
1724                         if (saved_oa == NULL)
1725                                 GOTO(out, rc = -ENOMEM);
1726                         *saved_oa = *oinfo->oi_oa;
1727                 }
1728
1729                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1730                                       pages_per_brw, ppga, oinfo->oi_capa);
1731
1732                 if (rc != 0)
1733                         break;
1734
1735                 page_count -= pages_per_brw;
1736                 ppga += pages_per_brw;
1737         }
1738
1739 out:
1740         osc_release_ppga(orig, page_count_orig);
1741
1742         if (saved_oa != NULL)
1743                 OBDO_FREE(saved_oa);
1744
1745         RETURN(rc);
1746 }
1747
/* Asynchronous bulk read/write entry point.  Splits the request into
 * RPC-sized, unfragmented chunks and queues each one on @set via
 * async_internal().
 *
 * Ownership of the page-pointer arrays is subtle: async_internal()'s
 * completion handler frees whatever array it is given.  If one RPC
 * covers the whole request, the original ppga built here is handed over
 * directly (and @orig is cleared so it is not freed again at "out");
 * otherwise each chunk gets its own OBD_ALLOC'd copy and the original
 * is always freed at "out".  Returns 0 or a negative errno. */
static int osc_brw_async(int cmd, struct obd_export *exp,
                         struct obd_info *oinfo, obd_count page_count,
                         struct brw_page *pga, struct obd_trans_info *oti,
                         struct ptlrpc_request_set *set)
{
        struct brw_page **ppga, **orig;
        struct client_obd *cli = &exp->exp_obd->u.cli;
        int page_count_orig;
        int rc = 0;
        ENTRY;

        if (cmd & OBD_BRW_CHECK) {
                struct obd_import *imp = class_exp2cliimp(exp);
                /* The caller just wants to know if there's a chance that this
                 * I/O can succeed */

                if (imp == NULL || imp->imp_invalid)
                        RETURN(-EIO);
                RETURN(0);
        }

        orig = ppga = osc_build_ppga(pga, page_count);
        if (ppga == NULL)
                RETURN(-ENOMEM);
        page_count_orig = page_count;

        sort_brw_pages(ppga, page_count);
        while (page_count) {
                struct brw_page **copy;
                obd_count pages_per_brw;

                pages_per_brw = min_t(obd_count, page_count,
                                      cli->cl_max_pages_per_rpc);

                /* shrink further so one RPC never carries a fragmented
                 * (non page-aligned interior) transfer */
                pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);

                /* use ppga only if single RPC is going to fly */
                if (pages_per_brw != page_count_orig || ppga != orig) {
                        OBD_ALLOC(copy, sizeof(*copy) * pages_per_brw);
                        if (copy == NULL)
                                GOTO(out, rc = -ENOMEM);
                        memcpy(copy, ppga, sizeof(*copy) * pages_per_brw);
                } else
                        copy = ppga;

                rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
                                    pages_per_brw, copy, set, oinfo->oi_capa);

                if (rc != 0) {
                        /* copies we made are still ours to free; the
                         * original is released below at "out" */
                        if (copy != ppga)
                                OBD_FREE(copy, sizeof(*copy) * pages_per_brw);
                        break;
                }
                if (copy == orig) {
                        /* we passed it to async_internal() which is
                         * now responsible for releasing memory */
                        orig = NULL;
                }

                page_count -= pages_per_brw;
                ppga += pages_per_brw;
        }
out:
        if (orig)
                osc_release_ppga(orig, page_count_orig);
        RETURN(rc);
}
1815
1816 static void osc_check_rpcs(struct client_obd *cli);
1817
/* The companion to osc_enter_cache(), called when @oap is no longer part of
 * the dirty accounting.  Writeback completes or truncate happens before
 * writing starts.  Must be called with the loi lock held.
 * @sent is passed through to osc_release_write_grant() to indicate
 * whether the page was actually written out. */
static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
                           int sent)
{
        osc_release_write_grant(cli, &oap->oap_brw_page, sent);
}
1826
1827
/* This maintains the lists of pending pages to read/write for a given object
 * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
 * to quickly find objects that are ready to send an RPC.
 *
 * Returns 1 when the pages queued on @lop for the direction in @cmd
 * justify firing an RPC now, 0 otherwise.  The checks are ordered from
 * most to least urgent. */
static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
                         int cmd)
{
        int optimal;
        ENTRY;

        /* nothing queued, nothing to send */
        if (lop->lop_num_pending == 0)
                RETURN(0);

        /* if we have an invalid import we want to drain the queued pages
         * by forcing them through rpcs that immediately fail and complete
         * the pages.  recovery relies on this to empty the queued pages
         * before canceling the locks and evicting down the llite pages */
        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(1);

        /* stream rpcs in queue order as long as there is an urgent page
         * queued.  this is our cheap solution for good batching in the case
         * where writepage marks some random page in the middle of the file
         * as urgent because of, say, memory pressure */
        if (!list_empty(&lop->lop_urgent)) {
                CDEBUG(D_CACHE, "urgent request forcing RPC\n");
                RETURN(1);
        }
        /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
        optimal = cli->cl_max_pages_per_rpc;
        if (cmd & OBD_BRW_WRITE) {
                /* trigger a write rpc stream as long as there are dirtiers
                 * waiting for space.  as they're waiting, they're not going to
                 * create more pages to coalesce with what's waiting.. */
                if (!list_empty(&cli->cl_cache_waiters)) {
                        CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
                        RETURN(1);
                }
                /* +16 to avoid triggering rpcs that would want to include pages
                 * that are being queued but which can't be made ready until
                 * the queuer finishes with the page. this is a wart for
                 * llite::commit_write() */
                optimal += 16;
        }
        if (lop->lop_num_pending >= optimal)
                RETURN(1);

        RETURN(0);
}
1876
/* Reconcile @item's membership of @list with the boolean
 * @should_be_on: add it to the tail when it should be listed but isn't,
 * remove it when it is listed but shouldn't be, otherwise do nothing. */
static void on_list(struct list_head *item, struct list_head *list,
                    int should_be_on)
{
        int currently_on = !list_empty(item);

        if (should_be_on && !currently_on)
                list_add_tail(item, list);
        else if (!should_be_on && currently_on)
                list_del_init(item);
}
1885
/* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
 * can find pages to build into rpcs quickly.
 *
 * Invariants (caller holds the loi list lock):
 *  - loi_cli_item is on cl_loi_ready_list iff either direction is ready
 *    to make an RPC per lop_makes_rpc();
 *  - loi_write_item / loi_read_item mirror whether any pages are pending
 *    in the corresponding direction. */
static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
{
        on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
                lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
                lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));

        on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
                loi->loi_write_lop.lop_num_pending);

        on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
                loi->loi_read_lop.lop_num_pending);
}
1900
1901 static void lop_update_pending(struct client_obd *cli,
1902                                struct loi_oap_pages *lop, int cmd, int delta)
1903 {
1904         lop->lop_num_pending += delta;
1905         if (cmd & OBD_BRW_WRITE)
1906                 cli->cl_pending_w_pages += delta;
1907         else
1908                 cli->cl_pending_r_pages += delta;
1909 }
1910
/* this is called when a sync waiter receives an interruption.  Its job is to
 * get the caller woken as soon as possible.  If its page hasn't been put in an
 * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
 * desiring interruption which will forcefully complete the rpc once the rpc
 * has timed out */
static void osc_occ_interrupted(struct oig_callback_context *occ)
{
        struct osc_async_page *oap;
        struct loi_oap_pages *lop;
        struct lov_oinfo *loi;
        ENTRY;

        /* XXX member_of() */
        oap = list_entry(occ, struct osc_async_page, oap_occ);

        client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);

        oap->oap_interrupted = 1;

        /* ok, it's been put in an rpc. only one oap gets a request reference */
        if (oap->oap_request != NULL) {
                ptlrpc_mark_interrupted(oap->oap_request);
                ptlrpcd_wake(oap->oap_request);
                GOTO(unlock, 0);
        }

        /* we don't get interruption callbacks until osc_trigger_group_io()
         * has been called and put the sync oaps in the pending/urgent lists.*/
        if (!list_empty(&oap->oap_pending_item)) {
                /* not in an rpc yet - pull the page out of the queues and
                 * fix up the pending accounting and list invariants */
                list_del_init(&oap->oap_pending_item);
                list_del_init(&oap->oap_urgent_item);

                loi = oap->oap_loi;
                lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
                        &loi->loi_write_lop : &loi->loi_read_lop;
                lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
                loi_list_maint(oap->oap_cli, oap->oap_loi);

                /* complete the group waiter with -EINTR so the caller wakes */
                oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
                oap->oap_oig = NULL;
        }

unlock:
        client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
}
1956
1957 /* this is trying to propogate async writeback errors back up to the
1958  * application.  As an async write fails we record the error code for later if
1959  * the app does an fsync.  As long as errors persist we force future rpcs to be
1960  * sync so that the app can get a sync error and break the cycle of queueing
1961  * pages for which writeback will fail. */
1962 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
1963                            int rc)
1964 {
1965         if (rc) {
1966                 if (!ar->ar_rc)
1967                         ar->ar_rc = rc;
1968
1969                 ar->ar_force_sync = 1;
1970                 ar->ar_min_xid = ptlrpc_sample_next_xid();
1971                 return;
1972
1973         }
1974
1975         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
1976                 ar->ar_force_sync = 0;
1977 }
1978
1979 static void osc_oap_to_pending(struct osc_async_page *oap)
1980 {
1981         struct loi_oap_pages *lop;
1982
1983         if (oap->oap_cmd & OBD_BRW_WRITE)
1984                 lop = &oap->oap_loi->loi_write_lop;
1985         else
1986                 lop = &oap->oap_loi->loi_read_lop;
1987
1988         if (oap->oap_async_flags & ASYNC_URGENT)
1989                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1990         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1991         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
1992 }
1993
/* this must be called holding the loi list lock to give coverage to exit_cache,
 * async_flag maintenance, and oap_request.
 *
 * Completes one async page after its RPC (or its abort) with result @rc:
 * drops the request reference, records write errors for fsync propagation,
 * refreshes cached lvb attributes from @oa on success, and either signals
 * the group-io waiter or calls the caller's ap_completion hook.  @sent
 * indicates whether the page actually went over the wire (affects grant
 * release in osc_exit_cache()). */
static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
                              struct osc_async_page *oap, int sent, int rc)
{
        __u64 xid = 0;

        ENTRY;
        if (oap->oap_request != NULL) {
                /* remember the xid for the error bookkeeping below, then
                 * drop our reference on the request */
                xid = ptlrpc_req_xid(oap->oap_request);
                ptlrpc_req_finished(oap->oap_request);
                oap->oap_request = NULL;
        }

        oap->oap_async_flags = 0;
        oap->oap_interrupted = 0;

        if (oap->oap_cmd & OBD_BRW_WRITE) {
                /* record/clear async write errors at both the client and
                 * the per-object level */
                osc_process_ar(&cli->cl_ar, xid, rc);
                osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
        }

        if (rc == 0 && oa != NULL) {
                /* refresh the cached attributes the server returned */
                if (oa->o_valid & OBD_MD_FLBLOCKS)
                        oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
                if (oa->o_valid & OBD_MD_FLMTIME)
                        oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
                if (oa->o_valid & OBD_MD_FLATIME)
                        oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
                if (oa->o_valid & OBD_MD_FLCTIME)
                        oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
        }

        if (oap->oap_oig) {
                /* sync group io: wake the waiter instead of calling the
                 * async completion hook */
                osc_exit_cache(cli, oap, sent);
                oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
                oap->oap_oig = NULL;
                EXIT;
                return;
        }

        rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
                                                oap->oap_cmd, oa, rc);

        /* ll_ap_completion (from llite) drops PG_locked. so, a new
         * I/O on the page could start, but OSC calls it under lock
         * and thus we can add oap back to pending safely */
        if (rc)
                /* upper layer wants to leave the page on pending queue */
                osc_oap_to_pending(oap);
        else
                osc_exit_cache(cli, oap, sent);
        EXIT;
}
2048
/* Interpret callback for an oap-based (cached async page) BRW RPC:
 * retries recoverable errors, then completes each attached oap under
 * the loi list lock and releases the request's obdo and page array. */
static int brw_interpret_oap(struct ptlrpc_request *req, void *data, int rc)
{
        struct osc_async_page *oap, *tmp;
        struct osc_brw_async_args *aa = data;
        struct client_obd *cli;
        ENTRY;

        rc = osc_brw_fini_request(req, rc);
        CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
        /* a recoverable error is handled by resending the whole request;
         * the oaps stay attached to it in that case */
        if (osc_recoverable_error(rc)) {
                rc = osc_brw_redo_request(req, aa);
                if (rc == 0)
                        RETURN(0);
        }

        cli = aa->aa_cli;

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
                cli->cl_w_in_flight--;
        else
                cli->cl_r_in_flight--;

        /* the caller may re-use the oap after the completion call so
         * we need to clean it up a little */
        list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
                list_del_init(&oap->oap_rpc_item);
                osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
        }

        /* with the in-flight count dropped, wake waiters and see whether
         * more rpcs can be sent */
        osc_wake_cache_waiters(cli);
        osc_check_rpcs(cli);

        client_obd_list_unlock(&cli->cl_loi_list_lock);

        OBDO_FREE(aa->aa_oa);

        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
        RETURN(rc);
}
2093
/* Build one BRW ptlrpc request from the oaps on @rpc_list (all for the
 * same object, @page_count entries, direction @cmd).
 *
 * On success the oaps are moved from @rpc_list onto the request's async
 * args (aa_oaps) and @rpc_list is left empty.  On failure an ERR_PTR is
 * returned and the locally allocated oa/pga are freed; the oaps remain
 * on @rpc_list for the caller to clean up. */
static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
                                            struct list_head *rpc_list,
                                            int page_count, int cmd)
{
        struct ptlrpc_request *req;
        struct brw_page **pga = NULL;
        struct osc_brw_async_args *aa;
        struct obdo *oa = NULL;
        struct obd_async_page_ops *ops = NULL;
        void *caller_data = NULL;
        struct obd_capa *ocapa;
        struct osc_async_page *oap;
        int i, rc;

        ENTRY;
        LASSERT(!list_empty(rpc_list));

        OBD_ALLOC(pga, sizeof(*pga) * page_count);
        if (pga == NULL)
                RETURN(ERR_PTR(-ENOMEM));

        OBDO_ALLOC(oa);
        if (oa == NULL)
                GOTO(out, req = ERR_PTR(-ENOMEM));

        /* gather the pages into a pointer array; the caller ops/data are
         * taken from the first oap (all oaps share the same caller) */
        i = 0;
        list_for_each_entry(oap, rpc_list, oap_rpc_item) {
                if (ops == NULL) {
                        ops = oap->oap_caller_ops;
                        caller_data = oap->oap_caller_data;
                }
                pga[i] = &oap->oap_brw_page;
                pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
                CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
                       pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
                i++;
        }

        /* always get the data for the obdo for the rpc */
        LASSERT(ops != NULL);
        ops->ap_fill_obdo(caller_data, cmd, oa);
        ocapa = ops->ap_lookup_capa(caller_data, cmd);

        sort_brw_pages(pga, page_count);
        rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
                                  pga, &req, ocapa);
        capa_put(ocapa);
        if (rc != 0) {
                CERROR("prep_req failed: %d\n", rc);
                GOTO(out, req = ERR_PTR(rc));
        }

        /* Need to update the timestamps after the request is built in case
         * we race with setattr (locally or in queue at OST).  If OST gets
         * later setattr before earlier BRW (as determined by the request xid),
         * the OST will not use BRW timestamps.  Sadly, there is no obvious
         * way to do this in a single call.  bug 10150 */
        ops->ap_update_obdo(caller_data, cmd, oa,
                            OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);

        /* hand the oaps over to the request's async args for the
         * interpret callback */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_brw_async_args *)&req->rq_async_args;
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);
        list_splice(rpc_list, &aa->aa_oaps);
        CFS_INIT_LIST_HEAD(rpc_list);

out:
        if (IS_ERR(req)) {
                if (oa)
                        OBDO_FREE(oa);
                if (pga)
                        OBD_FREE(pga, sizeof(*pga) * page_count);
        }
        RETURN(req);
}
2169
2170 /* the loi lock is held across this function but it's allowed to release
2171  * and reacquire it during its work */
2172 static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
2173                             int cmd, struct loi_oap_pages *lop)
2174 {
2175         struct ptlrpc_request *req;
2176         obd_count page_count = 0;
2177         struct osc_async_page *oap = NULL, *tmp;
2178         struct osc_brw_async_args *aa;
2179         struct obd_async_page_ops *ops;
2180         CFS_LIST_HEAD(rpc_list);
2181         unsigned int ending_offset;
2182         unsigned  starting_offset = 0;
2183         int srvlock = 0;
2184         ENTRY;
2185
2186         /* first we find the pages we're allowed to work with */
2187         list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2188                                  oap_pending_item) {
2189                 ops = oap->oap_caller_ops;
2190
2191                 LASSERT(oap->oap_magic == OAP_MAGIC);
2192
2193                 if (page_count != 0 &&
2194                     srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2195                         CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2196                                " oap %p, page %p, srvlock %u\n",
2197                                oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2198                         break;
2199                 }
2200                 /* in llite being 'ready' equates to the page being locked
2201                  * until completion unlocks it.  commit_write submits a page
2202                  * as not ready because its unlock will happen unconditionally
2203                  * as the call returns.  if we race with commit_write giving
2204                  * us that page we dont' want to create a hole in the page
2205                  * stream, so we stop and leave the rpc to be fired by
2206                  * another dirtier or kupdated interval (the not ready page
2207                  * will still be on the dirty list).  we could call in
2208                  * at the end of ll_file_write to process the queue again. */
2209                 if (!(oap->oap_async_flags & ASYNC_READY)) {
2210                         int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
2211                         if (rc < 0)
2212                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
2213                                                 "instead of ready\n", oap,
2214                                                 oap->oap_page, rc);
2215                         switch (rc) {
2216                         case -EAGAIN:
2217                                 /* llite is telling us that the page is still
2218                                  * in commit_write and that we should try
2219                                  * and put it in an rpc again later.  we
2220                                  * break out of the loop so we don't create
2221                                  * a hole in the sequence of pages in the rpc
2222                                  * stream.*/
2223                                 oap = NULL;
2224                                 break;
2225                         case -EINTR:
2226                                 /* the io isn't needed.. tell the checks
2227                                  * below to complete the rpc with EINTR */
2228                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2229                                 oap->oap_count = -EINTR;
2230                                 break;
2231                         case 0:
2232                                 oap->oap_async_flags |= ASYNC_READY;
2233                                 break;
2234                         default:
2235                                 LASSERTF(0, "oap %p page %p returned %d "
2236                                             "from make_ready\n", oap,
2237                                             oap->oap_page, rc);
2238                                 break;
2239                         }
2240                 }
2241                 if (oap == NULL)
2242                         break;
2243                 /*
2244                  * Page submitted for IO has to be locked. Either by
2245                  * ->ap_make_ready() or by higher layers.
2246                  *
2247                  * XXX nikita: this assertion should be adjusted when lustre
2248                  * starts using PG_writeback for pages being written out.
2249                  */
2250 #if defined(__KERNEL__) && defined(__linux__)
2251                 LASSERT(PageLocked(oap->oap_page));
2252 #endif
2253                 /* If there is a gap at the start of this page, it can't merge
2254                  * with any previous page, so we'll hand the network a
2255                  * "fragmented" page array that it can't transfer in 1 RDMA */
2256                 if (page_count != 0 && oap->oap_page_off != 0)
2257                         break;
2258
2259                 /* take the page out of our book-keeping */
2260                 list_del_init(&oap->oap_pending_item);
2261                 lop_update_pending(cli, lop, cmd, -1);
2262                 list_del_init(&oap->oap_urgent_item);
2263
2264                 if (page_count == 0)
2265                         starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2266                                           (PTLRPC_MAX_BRW_SIZE - 1);
2267
2268                 /* ask the caller for the size of the io as the rpc leaves. */
2269                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
2270                         oap->oap_count =
2271                                 ops->ap_refresh_count(oap->oap_caller_data,cmd);
2272                 if (oap->oap_count <= 0) {
2273                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2274                                oap->oap_count);
2275                         osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
2276                         continue;
2277                 }
2278
2279                 /* now put the page back in our accounting */
2280                 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2281                 if (page_count == 0)
2282                         srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2283                 if (++page_count >= cli->cl_max_pages_per_rpc)
2284                         break;
2285
2286                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
2287                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2288                  * have the same alignment as the initial writes that allocated
2289                  * extents on the server. */
2290                 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2291                                  oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2292                 if (ending_offset == 0)
2293                         break;
2294
2295                 /* If there is a gap at the end of this page, it can't merge
2296                  * with any subsequent pages, so we'll hand the network a
2297                  * "fragmented" page array that it can't transfer in 1 RDMA */
2298                 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2299                         break;
2300         }
2301
2302         osc_wake_cache_waiters(cli);
2303
2304         if (page_count == 0)
2305                 RETURN(0);
2306
2307         loi_list_maint(cli, loi);
2308
2309         client_obd_list_unlock(&cli->cl_loi_list_lock);
2310
2311         req = osc_build_req(cli, &rpc_list, page_count, cmd);
2312         if (IS_ERR(req)) {
2313                 /* this should happen rarely and is pretty bad, it makes the
2314                  * pending list not follow the dirty order */
2315                 client_obd_list_lock(&cli->cl_loi_list_lock);
2316                 list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
2317                         list_del_init(&oap->oap_rpc_item);
2318
2319                         /* queued sync pages can be torn down while the pages
2320                          * were between the pending list and the rpc */
2321                         if (oap->oap_interrupted) {
2322                                 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2323                                 osc_ap_completion(cli, NULL, oap, 0,
2324                                                   oap->oap_count);
2325                                 continue;
2326                         }
2327                         osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
2328                 }
2329                 loi_list_maint(cli, loi);
2330                 RETURN(PTR_ERR(req));
2331         }
2332
2333         aa = (struct osc_brw_async_args *)&req->rq_async_args;
2334
2335         if (cmd == OBD_BRW_READ) {
2336                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2337                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2338                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2339                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2340                 ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
2341         } else {
2342                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2343                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2344                                  cli->cl_w_in_flight);
2345                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2346                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2347                 ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
2348         }
2349
2350         client_obd_list_lock(&cli->cl_loi_list_lock);
2351
2352         if (cmd == OBD_BRW_READ)
2353                 cli->cl_r_in_flight++;
2354         else
2355                 cli->cl_w_in_flight++;
2356
2357         /* queued sync pages can be torn down while the pages
2358          * were between the pending list and the rpc */
2359         tmp = NULL;
2360         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2361                 /* only one oap gets a request reference */
2362                 if (tmp == NULL)
2363                         tmp = oap;
2364                 if (oap->oap_interrupted && !req->rq_intr) {
2365                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2366                                oap, req);
2367                         ptlrpc_mark_interrupted(req);
2368                 }
2369         }
2370         if (tmp != NULL)
2371                 tmp->oap_request = ptlrpc_request_addref(req);
2372
2373         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2374                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2375
2376         req->rq_interpret_reply = brw_interpret_oap;
2377         ptlrpcd_add_req(req);
2378         RETURN(1);
2379 }
2380
/* Debugging helper: dump an loi's rpc-readiness state - whether it sits on
 * the client's ready list, plus the pending page count and urgent-list
 * status of both its write and read queues - followed by a caller-supplied
 * format string and arguments. */
#define LOI_DEBUG(LOI, STR, args...)                                     \
        CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
               !list_empty(&(LOI)->loi_cli_item),                        \
               (LOI)->loi_write_lop.lop_num_pending,                     \
               !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
               (LOI)->loi_read_lop.lop_num_pending,                      \
               !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
               args)                                                     \

2390 /* This is called by osc_check_rpcs() to find which objects have pages that
2391  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2392 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2393 {
2394         ENTRY;
2395         /* first return all objects which we already know to have
2396          * pages ready to be stuffed into rpcs */
2397         if (!list_empty(&cli->cl_loi_ready_list))
2398                 RETURN(list_entry(cli->cl_loi_ready_list.next,
2399                                   struct lov_oinfo, loi_cli_item));
2400
2401         /* then if we have cache waiters, return all objects with queued
2402          * writes.  This is especially important when many small files
2403          * have filled up the cache and not been fired into rpcs because
2404          * they don't pass the nr_pending/object threshhold */
2405         if (!list_empty(&cli->cl_cache_waiters) &&
2406             !list_empty(&cli->cl_loi_write_list))
2407                 RETURN(list_entry(cli->cl_loi_write_list.next,
2408                                   struct lov_oinfo, loi_write_item));
2409
2410         /* then return all queued objects when we have an invalid import
2411          * so that they get flushed */
2412         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2413                 if (!list_empty(&cli->cl_loi_write_list))
2414                         RETURN(list_entry(cli->cl_loi_write_list.next,
2415                                           struct lov_oinfo, loi_write_item));
2416                 if (!list_empty(&cli->cl_loi_read_list))
2417                         RETURN(list_entry(cli->cl_loi_read_list.next,
2418                                           struct lov_oinfo, loi_read_item));
2419         }
2420         RETURN(NULL);
2421 }
2422
/* called with the loi list lock held */
/* Drive rpc generation for every object osc_next_loi() reports as having
 * work.  Caller holds cli->cl_loi_list_lock; note osc_send_oap_rpc() drops
 * and retakes that lock while building and queueing a request. */
static void osc_check_rpcs(struct client_obd *cli)
{
        struct lov_oinfo *loi;
        int rc = 0, race_counter = 0;
        ENTRY;

        while ((loi = osc_next_loi(cli)) != NULL) {
                LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));

                /* Stop once the client's concurrent-rpc cap is reached. */
                if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
                        break;

                /* attempt some read/write balancing by alternating between
                 * reads and writes in an object.  The makes_rpc checks here
                 * would be redundant if we were getting read/write work items
                 * instead of objects.  we don't want send_oap_rpc to drain a
                 * partial read pending queue when we're given this object to
                 * do io on writes while there are cache waiters */
                if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
                        rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
                                              &loi->loi_write_lop);
                        if (rc < 0)
                                break;
                        if (rc > 0)
                                race_counter = 0;
                        else
                                race_counter++;
                }
                if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
                        rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
                                              &loi->loi_read_lop);
                        if (rc < 0)
                                break;
                        if (rc > 0)
                                race_counter = 0;
                        else
                                race_counter++;
                }

                /* attempt some inter-object balancing by issueing rpcs
                 * for each object in turn */
                if (!list_empty(&loi->loi_cli_item))
                        list_del_init(&loi->loi_cli_item);
                if (!list_empty(&loi->loi_write_item))
                        list_del_init(&loi->loi_write_item);
                if (!list_empty(&loi->loi_read_item))
                        list_del_init(&loi->loi_read_item);

                loi_list_maint(cli, loi);

                /* send_oap_rpc fails with 0 when make_ready tells it to
                 * back off.  llite's make_ready does this when it tries
                 * to lock a page queued for write that is already locked.
                 * we want to try sending rpcs from many objects, but we
                 * don't want to spin failing with 0.  */
                if (race_counter == 10)
                        break;
        }
        EXIT;
}
2484
2485 /* we're trying to queue a page in the osc so we're subject to the
2486  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2487  * If the osc's queued pages are already at that limit, then we want to sleep
2488  * until there is space in the osc's queue for us.  We also may be waiting for
2489  * write credits from the OST if there are RPCs in flight that may return some
2490  * before we fall back to sync writes.
2491  *
2492  * We need this know our allocation was granted in the presence of signals */
2493 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2494 {
2495         int rc;
2496         ENTRY;
2497         client_obd_list_lock(&cli->cl_loi_list_lock);
2498         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2499         client_obd_list_unlock(&cli->cl_loi_list_lock);
2500         RETURN(rc);
2501 };
2502
2503 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2504  * grant or cache space. */
2505 static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
2506                            struct osc_async_page *oap)
2507 {
2508         struct osc_cache_waiter ocw;
2509         struct l_wait_info lwi = { 0 };
2510
2511         ENTRY;
2512
2513         CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2514                "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2515                cli->cl_dirty_max, obd_max_dirty_pages,
2516                cli->cl_lost_grant, cli->cl_avail_grant);
2517
2518         /* force the caller to try sync io.  this can jump the list
2519          * of queued writes and create a discontiguous rpc stream */
2520         if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2521             loi->loi_ar.ar_force_sync)
2522                 RETURN(-EDQUOT);
2523
2524         /* Hopefully normal case - cache space and write credits available */
2525         if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
2526             (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
2527             (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
2528                 /* account for ourselves */
2529                 osc_consume_write_grant(cli, &oap->oap_brw_page);
2530                 RETURN(0);
2531         }
2532
2533         /* Make sure that there are write rpcs in flight to wait for.  This
2534          * is a little silly as this object may not have any pending but
2535          * other objects sure might. */
2536         if (cli->cl_w_in_flight) {
2537                 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2538                 cfs_waitq_init(&ocw.ocw_waitq);
2539                 ocw.ocw_oap = oap;
2540                 ocw.ocw_rc = 0;
2541
2542                 loi_list_maint(cli, loi);
2543                 osc_check_rpcs(cli);
2544                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2545
2546                 CDEBUG(D_CACHE, "sleeping for cache space\n");
2547                 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2548
2549                 client_obd_list_lock(&cli->cl_loi_list_lock);
2550                 if (!list_empty(&ocw.ocw_entry)) {
2551                         list_del(&ocw.ocw_entry);
2552                         RETURN(-EINTR);
2553                 }
2554                 RETURN(ocw.ocw_rc);
2555         }
2556
2557         RETURN(-EDQUOT);
2558 }
2559
2560 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2561                         struct lov_oinfo *loi, cfs_page_t *page,
2562                         obd_off offset, struct obd_async_page_ops *ops,
2563                         void *data, void **res)
2564 {
2565         struct osc_async_page *oap;
2566         ENTRY;
2567
2568         if (!page)
2569                 return size_round(sizeof(*oap));
2570
2571         oap = *res;
2572         oap->oap_magic = OAP_MAGIC;
2573         oap->oap_cli = &exp->exp_obd->u.cli;
2574         oap->oap_loi = loi;
2575
2576         oap->oap_caller_ops = ops;
2577         oap->oap_caller_data = data;
2578
2579         oap->oap_page = page;
2580         oap->oap_obj_off = offset;
2581
2582         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2583         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2584         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2585
2586         oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2587
2588         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2589         RETURN(0);
2590 }
2591
2592 struct osc_async_page *oap_from_cookie(void *cookie)
2593 {
2594         struct osc_async_page *oap = cookie;
2595         if (oap->oap_magic != OAP_MAGIC)
2596                 return ERR_PTR(-EINVAL);
2597         return oap;
2598 };
2599
/* Queue one async page for regular (non-group) io.  On success the page
 * lands on its object's pending list and rpc generation is kicked via
 * osc_check_rpcs().  Returns 0, or a negative errno (-EIO invalid import,
 * -EBUSY already queued, -EDQUOT/-ENOMEM quota, or osc_enter_cache()'s rc). */
static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
                              struct lov_oinfo *loi, void *cookie,
                              int cmd, obd_off off, int count,
                              obd_flag brw_flags, enum async_flags async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        /* No new io once the import is gone or invalidated. */
        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        /* The page must not already be queued anywhere. */
        if (!list_empty(&oap->oap_pending_item) ||
            !list_empty(&oap->oap_urgent_item) ||
            !list_empty(&oap->oap_rpc_item))
                RETURN(-EBUSY);

        /* check if the file's owner/group is over quota */
#ifdef HAVE_QUOTA_SUPPORT
        if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
                struct obd_async_page_ops *ops;
                struct obdo *oa;

                OBDO_ALLOC(oa);
                if (oa == NULL)
                        RETURN(-ENOMEM);

                /* Ask the page's owner for the obdo so we know whose quota
                 * (uid/gid) to check. */
                ops = oap->oap_caller_ops;
                ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
                if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
                    NO_QUOTA)
                        rc = -EDQUOT;

                OBDO_FREE(oa);
                if (rc)
                        RETURN(rc);
        }
#endif

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        client_obd_list_lock(&cli->cl_loi_list_lock);

        oap->oap_cmd = cmd;
        oap->oap_page_off = off;
        oap->oap_count = count;
        oap->oap_brw_flags = brw_flags;
        oap->oap_async_flags = async_flags;

        if (cmd & OBD_BRW_WRITE) {
                /* osc_enter_cache() may drop and retake cl_loi_list_lock
                 * while waiting for cache space or grant. */
                rc = osc_enter_cache(cli, loi, oap);
                if (rc) {
                        client_obd_list_unlock(&cli->cl_loi_list_lock);
                        RETURN(rc);
                }
        }

        osc_oap_to_pending(oap);
        loi_list_maint(cli, loi);

        LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
                  cmd);

        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        RETURN(0);
}
2674
/* True iff "flag" is being newly set: clear in "was" but set in "now".
 * (aka (~was & now & flag), but this is more clear :)
 * Arguments are fully parenthesized so callers may pass compound
 * expressions without operator-precedence surprises. */
#define SETTING(was, now, flag) (!((was) & (flag)) && ((now) & (flag)))
2677
/* Raise async flags on an already-queued page.  Flags are only ever turned
 * on here, never cleared: a newly-set ASYNC_READY is latched into the oap,
 * and a newly-set ASYNC_URGENT moves the page onto its lop's urgent list
 * (unless it is already part of an rpc).  Returns 0 on success, -EINVAL if
 * the page is not on a pending list, -EIO if the import is invalid. */
static int osc_set_async_flags(struct obd_export *exp,
                               struct lov_stripe_md *lsm,
                               struct lov_oinfo *loi, void *cookie,
                               obd_flag async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct loi_oap_pages *lop;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        /*
         * bug 7311: OST-side locking is only supported for liblustre for now
         * (and liblustre never calls obd_set_async_flags(). I hope.), generic
         * implementation has to handle case where OST-locked page was picked
         * up by, e.g., ->writepage().
         */
        LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
        LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
                                     * tread here. */

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        /* Pick the queue matching the direction the page was queued with. */
        if (oap->oap_cmd & OBD_BRW_WRITE) {
                lop = &loi->loi_write_lop;
        } else {
                lop = &loi->loi_read_lop;
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);

        if (list_empty(&oap->oap_pending_item))
                GOTO(out, rc = -EINVAL);

        /* Nothing new to set - all requested flags already present. */
        if ((oap->oap_async_flags & async_flags) == async_flags)
                GOTO(out, rc = 0);

        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
                oap->oap_async_flags |= ASYNC_READY;

        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
                /* Only pages not yet committed to an rpc go urgent. */
                if (list_empty(&oap->oap_rpc_item)) {
                        list_add(&oap->oap_urgent_item, &lop->lop_urgent);
                        loi_list_maint(cli, loi);
                }
        }

        LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
                        oap->oap_async_flags);
out:
        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        RETURN(rc);
}
2740
2741 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2742                              struct lov_oinfo *loi,
2743                              struct obd_io_group *oig, void *cookie,
2744                              int cmd, obd_off off, int count,
2745                              obd_flag brw_flags,
2746                              obd_flag async_flags)
2747 {
2748         struct client_obd *cli = &exp->exp_obd->u.cli;
2749         struct osc_async_page *oap;
2750         struct loi_oap_pages *lop;
2751         int rc = 0;
2752         ENTRY;
2753
2754         oap = oap_from_cookie(cookie);
2755         if (IS_ERR(oap))
2756                 RETURN(PTR_ERR(oap));
2757
2758         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2759                 RETURN(-EIO);
2760
2761         if (!list_empty(&oap->oap_pending_item) ||
2762             !list_empty(&oap->oap_urgent_item) ||
2763             !list_empty(&oap->oap_rpc_item))
2764                 RETURN(-EBUSY);
2765
2766         if (loi == NULL)
2767                 loi = lsm->lsm_oinfo[0];
2768
2769         client_obd_list_lock(&cli->cl_loi_list_lock);
2770
2771         oap->oap_cmd = cmd;
2772         oap->oap_page_off = off;
2773         oap->oap_count = count;
2774         oap->oap_brw_flags = brw_flags;
2775         oap->oap_async_flags = async_flags;
2776
2777         if (cmd & OBD_BRW_WRITE)
2778                 lop = &loi->loi_write_lop;
2779         else
2780                 lop = &loi->loi_read_lop;
2781
2782         list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
2783         if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
2784                 oap->oap_oig = oig;
2785                 rc = oig_add_one(oig, &oap->oap_occ);
2786         }
2787
2788         LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
2789                   oap, oap->oap_page, rc);
2790
2791         client_obd_list_unlock(&cli->cl_loi_list_lock);
2792
2793         RETURN(rc);
2794 }
2795
2796 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2797                                  struct loi_oap_pages *lop, int cmd)
2798 {
2799         struct list_head *pos, *tmp;
2800         struct osc_async_page *oap;
2801
2802         list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2803                 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2804                 list_del(&oap->oap_pending_item);
2805                 osc_oap_to_pending(oap);
2806         }
2807         loi_list_maint(cli, loi);
2808 }
2809
/* Release an object's grouped pages for io: move both its write and read
 * group-pending queues onto the regular pending lists under the list lock,
 * then kick rpc generation for the client.  Always returns 0. */
static int osc_trigger_group_io(struct obd_export *exp,
                                struct lov_stripe_md *lsm,
                                struct lov_oinfo *loi,
                                struct obd_io_group *oig)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        ENTRY;

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        client_obd_list_lock(&cli->cl_loi_list_lock);

        osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
        osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);

        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        RETURN(0);
}
2831
2832 static int osc_teardown_async_page(struct obd_export *exp,
2833                                    struct lov_stripe_md *lsm,
2834                                    struct lov_oinfo *loi, void *cookie)
2835 {
2836         struct client_obd *cli = &exp->exp_obd->u.cli;
2837         struct loi_oap_pages *lop;
2838         struct osc_async_page *oap;
2839         int rc = 0;
2840         ENTRY;
2841
2842         oap = oap_from_cookie(cookie);
2843         if (IS_ERR(oap))
2844                 RETURN(PTR_ERR(oap));
2845
2846         if (loi == NULL)
2847                 loi = lsm->lsm_oinfo[0];
2848
2849         if (oap->oap_cmd & OBD_BRW_WRITE) {
2850                 lop = &loi->loi_write_lop;
2851         } else {
2852                 lop = &loi->loi_read_lop;
2853         }
2854
2855         client_obd_list_lock(&cli->cl_loi_list_lock);
2856
2857         if (!list_empty(&oap->oap_rpc_item))
2858                 GOTO(out, rc = -EBUSY);
2859
2860         osc_exit_cache(cli, oap, 0);
2861         osc_wake_cache_waiters(cli);
2862
2863         if (!list_empty(&oap->oap_urgent_item)) {
2864                 list_del_init(&oap->oap_urgent_item);
2865                 oap->oap_async_flags &= ~ASYNC_URGENT;
2866         }
2867         if (!list_empty(&oap->oap_pending_item)) {
2868                 list_del_init(&oap->oap_pending_item);
2869                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2870         }
2871         loi_list_maint(cli, loi);
2872
2873         LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
2874 out:
2875         client_obd_list_unlock(&cli->cl_loi_list_lock);
2876         RETURN(rc);
2877 }
2878
/* Attach caller data (on Linux, an inode) to the lock behind lockh as
 * l_ast_data, and propagate LDLM_FL_NO_LRU from the caller's flags.  On
 * linux kernels it sanity-checks that any different inode already attached
 * is being freed (I_FREEING) before it is replaced. */
static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
                                    int flags)
{
        struct ldlm_lock *lock = ldlm_handle2lock(lockh);

        if (lock == NULL) {
                CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
                return;
        }
        lock_res_and_lock(lock);
#if defined (__KERNEL__) && defined (__linux__)
        /* Liang XXX: Darwin and Winnt checking should be added */
        if (lock->l_ast_data && lock->l_ast_data != data) {
                struct inode *new_inode = data;
                struct inode *old_inode = lock->l_ast_data;
                /* Replacing the data of a live inode would mean two inodes
                 * share one lock - only acceptable if the old inode is on
                 * its way out. */
                if (!(old_inode->i_state & I_FREEING))
                        LDLM_ERROR(lock, "inconsistent l_ast_data found");
                LASSERTF(old_inode->i_state & I_FREEING,
                         "Found existing inode %p/%lu/%u state %lu in lock: "
                         "setting data to %p/%lu/%u\n", old_inode,
                         old_inode->i_ino, old_inode->i_generation,
                         old_inode->i_state,
                         new_inode, new_inode->i_ino, new_inode->i_generation);
        }
#endif
        lock->l_ast_data = data;
        lock->l_flags |= (flags & LDLM_FL_NO_LRU);
        unlock_res_and_lock(lock);
        LDLM_LOCK_PUT(lock);
}
2909
2910 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2911                              ldlm_iterator_t replace, void *data)
2912 {
2913         struct ldlm_res_id res_id = { .name = {0} };
2914         struct obd_device *obd = class_exp2obd(exp);
2915
2916         res_id.name[0] = lsm->lsm_object_id;
2917         res_id.name[2] = lsm->lsm_object_gr;
2918
2919         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2920         return 0;
2921 }
2922
/* Finish an osc-level enqueue: for an intent enqueue whose reply says
 * ELDLM_LOCK_ABORTED, the server's real disposition in lock_policy_res1
 * (when non-zero) replaces rc.  The caller's update callback (oi_cb_up)
 * is then invoked with the resulting status and its rc is returned. */
static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo,
                            int intent, int rc)
{
        ENTRY;

        if (intent) {
                /* The request was created before ldlm_cli_enqueue call. */
                if (rc == ELDLM_LOCK_ABORTED) {
                        struct ldlm_reply *rep;
                        rep = req_capsule_server_get(&req->rq_pill,
                                                     &RMF_DLM_REP);

                        LASSERT(rep != NULL);
                        if (rep->lock_policy_res1)
                                rc = rep->lock_policy_res1;
                }
        }

        /* On success (or an aborted intent) the lvb in stripe 0 has been
         * refreshed - log the new size/blocks/mtime. */
        if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
                CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
                       oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
                       oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
                       oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
        }

        /* Call the update callback. */
        rc = oinfo->oi_cb_up(oinfo, rc);
        RETURN(rc);
}
2952
/* Completion callback for an asynchronous lock enqueue: finish the
 * ldlm-level enqueue (swabbing the lvb from stripe 0), run the osc-level
 * fini (which invokes the caller's update callback), and drop the lock
 * reference taken for the async request. */
static int osc_enqueue_interpret(struct ptlrpc_request *req,
                                 struct osc_enqueue_args *aa, int rc)
{
        int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
        struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
        struct ldlm_lock *lock;

        /* ldlm_cli_enqueue is holding a reference on the lock, so it must
         * be valid. */
        lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);

        /* Complete obtaining the lock procedure. */
        rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
                                   aa->oa_ei->ei_mode,
                                   &aa->oa_oi->oi_flags,
                                   &lsm->lsm_oinfo[0]->loi_lvb,
                                   sizeof(lsm->lsm_oinfo[0]->loi_lvb),
                                   lustre_swab_ost_lvb,
                                   aa->oa_oi->oi_lockh, rc);

        /* Complete osc stuff. */
        rc = osc_enqueue_fini(req, aa->oa_oi, intent, rc);

        /* Release the lock for async request. */
        if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
                ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);

        LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
                 aa->oa_oi->oi_lockh, req, aa);
        LDLM_LOCK_PUT(lock);
        return rc;
}
2985
/* When enqueuing asynchronously, locks are not ordered, so we can obtain a
 * lock from the 2nd OSC before a lock from the 1st one. This does not deadlock
 * with other synchronous requests; however, keeping some locks while trying to
 * obtain others may take a considerable amount of time in the case of OST
 * failure, and when other sync requests do not get a lock released by a
 * client, that client is excluded from the cluster -- such scenarios make
 * life difficult, so release locks just after they are obtained. */
2993 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2994                        struct ldlm_enqueue_info *einfo,
2995                        struct ptlrpc_request_set *rqset)
2996 {
2997         struct ldlm_res_id res_id = { .name = {0} };
2998         struct obd_device *obd = exp->exp_obd;
2999         struct ptlrpc_request *req = NULL;
3000         int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
3001         ldlm_mode_t mode;
3002         int rc;
3003         ENTRY;
3004
3005         res_id.name[0] = oinfo->oi_md->lsm_object_id;
3006         res_id.name[2] = oinfo->oi_md->lsm_object_gr;
3007
3008         /* Filesystem lock extents are extended to page boundaries so that
3009          * dealing with the page cache is a little smoother.  */
3010         oinfo->oi_policy.l_extent.start -=
3011                 oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
3012         oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;
3013
3014         if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
3015                 goto no_match;
3016
3017         /* Next, search for already existing extent locks that will cover us */
3018         /* If we're trying to read, we also search for an existing PW lock.  The
3019          * VFS and page cache already protect us locally, so lots of readers/
3020          * writers can share a single PW lock.
3021          *
3022          * There are problems with conversion deadlocks, so instead of
3023          * converting a read lock to a write lock, we'll just enqueue a new
3024          * one.
3025          *
3026          * At some point we should cancel the read lock instead of making them
3027          * send us a blocking callback, but there are problems with canceling
3028          * locks out from other users right now, too. */
3029         mode = einfo->ei_mode;
3030         if (einfo->ei_mode == LCK_PR)
3031                 mode |= LCK_PW;
3032         mode = ldlm_lock_match(obd->obd_namespace,
3033                                oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
3034                                einfo->ei_type, &oinfo->oi_policy, mode,
3035                                oinfo->oi_lockh);
3036         if (mode) {
3037                 /* addref the lock only if not async requests and PW lock is
3038                  * matched whereas we asked for PR. */
3039                 if (!rqset && einfo->ei_mode != mode)
3040                         ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
3041                 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
3042                                         oinfo->oi_flags);
3043                 if (intent) {
3044                         /* I would like to be able to ASSERT here that rss <=
3045                          * kms, but I can't, for reasons which are explained in
3046                          * lov_enqueue() */
3047                 }
3048
3049                 /* We already have a lock, and it's referenced */
3050                 oinfo->oi_cb_up(oinfo, ELDLM_OK);
3051
3052                 /* For async requests, decref the lock. */
3053                 if (einfo->ei_mode != mode)
3054                         ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
3055                 else if (rqset)
3056                         ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
3057
3058                 RETURN(ELDLM_OK);
3059         }
3060
3061  no_match:
3062         if (intent) {
3063                 CFS_LIST_HEAD(cancels);
3064                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3065                                            &RQF_LDLM_ENQUEUE_LVB);
3066                 if (req == NULL)
3067                         RETURN(-ENOMEM);
3068
3069                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3070                 if (rc)
3071                         RETURN(rc);
3072
3073                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3074                                      sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb));
3075                 ptlrpc_request_set_replen(req);
3076         }
3077
3078         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3079         oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;
3080
3081         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id,
3082                               &oinfo->oi_policy, &oinfo->oi_flags,
3083                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3084                               sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
3085                               lustre_swab_ost_lvb, oinfo->oi_lockh,
3086                               rqset ? 1 : 0);
3087         if (rqset) {
3088                 if (!rc) {
3089                         struct osc_enqueue_args *aa;
3090                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3091                         aa = (struct osc_enqueue_args *)&req->rq_async_args;
3092                         aa->oa_oi = oinfo;
3093                         aa->oa_ei = einfo;
3094                         aa->oa_exp = exp;
3095
3096                         req->rq_interpret_reply = osc_enqueue_interpret;
3097                         ptlrpc_set_add_req(rqset, req);
3098                 } else if (intent) {
3099                         ptlrpc_req_finished(req);
3100                 }
3101                 RETURN(rc);
3102         }
3103
3104         rc = osc_enqueue_fini(req, oinfo, intent, rc);
3105         if (intent)
3106                 ptlrpc_req_finished(req);
3107
3108         RETURN(rc);
3109 }
3110
3111 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
3112                      __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3113                      int *flags, void *data, struct lustre_handle *lockh)
3114 {
3115         struct ldlm_res_id res_id = { .name = {0} };
3116         struct obd_device *obd = exp->exp_obd;
3117         int lflags = *flags;
3118         ldlm_mode_t rc;
3119         ENTRY;
3120
3121         res_id.name[0] = lsm->lsm_object_id;
3122         res_id.name[2] = lsm->lsm_object_gr;
3123
3124         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3125                 RETURN(-EIO);
3126
3127         /* Filesystem lock extents are extended to page boundaries so that
3128          * dealing with the page cache is a little smoother */
3129         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3130         policy->l_extent.end |= ~CFS_PAGE_MASK;
3131
3132         /* Next, search for already existing extent locks that will cover us */
3133         /* If we're trying to read, we also search for an existing PW lock.  The
3134          * VFS and page cache already protect us locally, so lots of readers/
3135          * writers can share a single PW lock. */
3136         rc = mode;
3137         if (mode == LCK_PR)
3138                 rc |= LCK_PW;
3139         rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3140                              &res_id, type, policy, rc, lockh);
3141         if (rc) {
3142                 osc_set_data_with_check(lockh, data, lflags);
3143                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3144                         ldlm_lock_addref(lockh, LCK_PR);
3145                         ldlm_lock_decref(lockh, LCK_PW);
3146                 }
3147                 RETURN(rc);
3148         }
3149         RETURN(rc);
3150 }
3151
3152 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3153                       __u32 mode, struct lustre_handle *lockh)
3154 {
3155         ENTRY;
3156
3157         if (unlikely(mode == LCK_GROUP))
3158                 ldlm_lock_decref_and_cancel(lockh, mode);
3159         else
3160                 ldlm_lock_decref(lockh, mode);
3161
3162         RETURN(0);
3163 }
3164
3165 static int osc_cancel_unused(struct obd_export *exp,
3166                              struct lov_stripe_md *lsm, int flags,
3167                              void *opaque)
3168 {
3169         struct obd_device *obd = class_exp2obd(exp);
3170         struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3171
3172         if (lsm != NULL) {
3173                 res_id.name[0] = lsm->lsm_object_id;
3174                 res_id.name[2] = lsm->lsm_object_gr;
3175                 resp = &res_id;
3176         }
3177
3178         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3179 }
3180
3181 static int osc_join_lru(struct obd_export *exp,
3182                         struct lov_stripe_md *lsm, int join)
3183 {
3184         struct obd_device *obd = class_exp2obd(exp);
3185         struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3186
3187         if (lsm != NULL) {
3188                 res_id.name[0] = lsm->lsm_object_id;
3189                 res_id.name[2] = lsm->lsm_object_gr;
3190                 resp = &res_id;
3191         }
3192
3193         return ldlm_cli_join_lru(obd->obd_namespace, resp, join);
3194 }
3195
3196 static int osc_statfs_interpret(struct ptlrpc_request *req,
3197                                 struct osc_async_args *aa, int rc)
3198 {
3199         struct obd_statfs *msfs;
3200         ENTRY;
3201
3202         if (rc != 0)
3203                 GOTO(out, rc);
3204
3205         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3206         if (msfs == NULL) {
3207                 GOTO(out, rc = -EPROTO);
3208         }
3209
3210         *aa->aa_oi->oi_osfs = *msfs;
3211 out:
3212         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3213         RETURN(rc);
3214 }
3215
3216 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3217                             __u64 max_age, struct ptlrpc_request_set *rqset)
3218 {
3219         struct ptlrpc_request *req;
3220         struct osc_async_args *aa;
3221         int                    rc;
3222         ENTRY;
3223
3224         /* We could possibly pass max_age in the request (as an absolute
3225          * timestamp or a "seconds.usec ago") so the target can avoid doing
3226          * extra calls into the filesystem if that isn't necessary (e.g.
3227          * during mount that would help a bit).  Having relative timestamps
3228          * is not so great if request processing is slow, while absolute
3229          * timestamps are not ideal because they need time synchronization. */
3230         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3231         if (req == NULL)
3232                 RETURN(-ENOMEM);
3233
3234         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3235         if (rc) {
3236                 ptlrpc_request_free(req);
3237                 RETURN(rc);
3238         }
3239         ptlrpc_request_set_replen(req);
3240         req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3241         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3242                 /* procfs requests not want stat in wait for avoid deadlock */
3243                 req->rq_no_resend = 1;
3244                 req->rq_no_delay = 1;
3245         }
3246
3247         req->rq_interpret_reply = osc_statfs_interpret;
3248         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3249         aa = (struct osc_async_args *)&req->rq_async_args;
3250         aa->aa_oi = oinfo;
3251
3252         ptlrpc_set_add_req(rqset, req);
3253         RETURN(0);
3254 }
3255
3256 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3257                       __u64 max_age, __u32 flags)
3258 {
3259         struct obd_statfs     *msfs;
3260         struct ptlrpc_request *req;
3261         int rc;
3262         ENTRY;
3263
3264         /* We could possibly pass max_age in the request (as an absolute
3265          * timestamp or a "seconds.usec ago") so the target can avoid doing
3266          * extra calls into the filesystem if that isn't necessary (e.g.
3267          * during mount that would help a bit).  Having relative timestamps
3268          * is not so great if request processing is slow, while absolute
3269          * timestamps are not ideal because they need time synchronization. */
3270         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3271         if (req == NULL)
3272                 RETURN(-ENOMEM);
3273
3274         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3275         if (rc) {
3276                 ptlrpc_request_free(req);
3277                 RETURN(rc);
3278         }
3279         ptlrpc_request_set_replen(req);
3280         req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3281
3282         if (flags & OBD_STATFS_NODELAY) {
3283                 /* procfs requests not want stat in wait for avoid deadlock */
3284                 req->rq_no_resend = 1;
3285                 req->rq_no_delay = 1;
3286         }
3287
3288         rc = ptlrpc_queue_wait(req);
3289         if (rc)
3290                 GOTO(out, rc);
3291
3292         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3293         if (msfs == NULL) {
3294                 GOTO(out, rc = -EPROTO);
3295         }
3296
3297         *osfs = *msfs;
3298
3299         EXIT;
3300  out:
3301         ptlrpc_req_finished(req);
3302         return rc;
3303 }
3304
/* Retrieve object striping information.
 *
 * @lump is a pointer to an in-core struct with lmm_stripe_count indicating
 * the maximum number of OST entries which will fit in the user buffer.
 * lmm_magic must be LOV_USER_MAGIC (we only use 1 slot here).
 */
static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
{
        struct lov_user_md lum, *lumk;
        int rc = 0, lum_size;
        ENTRY;

        /* No striping metadata: nothing to report. */
        if (!lsm)
                RETURN(-ENODATA);

        /* Read the caller's header to learn how much room it left for
         * per-OST entries and to validate the magic. */
        if (copy_from_user(&lum, lump, sizeof(lum)))
                RETURN(-EFAULT);

        if (lum.lmm_magic != LOV_USER_MAGIC)
                RETURN(-EINVAL);

        /* An OSC object has exactly one stripe.  If the caller has room for
         * at least one object entry, build a kernel buffer holding the
         * header plus that single entry; otherwise reuse the on-stack header
         * copy and return the header alone. */
        if (lum.lmm_stripe_count > 0) {
                lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
                OBD_ALLOC(lumk, lum_size);
                if (!lumk)
                        RETURN(-ENOMEM);

                lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
                lumk->lmm_objects[0].l_object_gr = lsm->lsm_object_gr;
        } else {
                lum_size = sizeof(lum);
                lumk = &lum;
        }

        /* Fill the header fields common to both paths. */
        lumk->lmm_object_id = lsm->lsm_object_id;
        lumk->lmm_object_gr = lsm->lsm_object_gr;
        lumk->lmm_stripe_count = 1;

        if (copy_to_user(lump, lumk, lum_size))
                rc = -EFAULT;

        /* Only free if we took the allocation path above. */
        if (lumk != &lum)
                OBD_FREE(lumk, lum_size);

        RETURN(rc);
}
3351
3352
/* ioctl dispatcher for the OSC device.  Takes a module reference for the
 * duration of the call so the module cannot be unloaded mid-ioctl; every
 * exit path goes through "out" to drop it. */
static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         void *karg, void *uarg)
{
        struct obd_device *obd = exp->exp_obd;
        struct obd_ioctl_data *data = karg;
        int err = 0;
        ENTRY;

        if (!try_module_get(THIS_MODULE)) {
                CERROR("Can't get module. Is it alive?");
                return -EINVAL;
        }
        switch (cmd) {
        case OBD_IOC_LOV_GET_CONFIG: {
                char *buf;
                struct lov_desc *desc;
                struct obd_uuid uuid;

                /* Pull the ioctl payload from userspace; obd_ioctl_getdata()
                 * allocates buf and sets len. */
                buf = NULL;
                len = 0;
                if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
                        GOTO(out, err = -EINVAL);

                data = (struct obd_ioctl_data *)buf;

                /* Validate that the caller's inline buffers are big enough
                 * for the descriptor and the uuid we are about to write. */
                if (sizeof(*desc) > data->ioc_inllen1) {
                        obd_ioctl_freedata(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                if (data->ioc_inllen2 < sizeof(uuid)) {
                        obd_ioctl_freedata(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                /* An OSC presents itself as a trivial one-target LOV. */
                desc = (struct lov_desc *)data->ioc_inlbuf1;
                desc->ld_tgt_count = 1;
                desc->ld_active_tgt_count = 1;
                desc->ld_default_stripe_count = 1;
                desc->ld_default_stripe_size = 0;
                desc->ld_default_stripe_offset = 0;
                desc->ld_pattern = 0;
                memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));

                memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));

                err = copy_to_user((void *)uarg, buf, len);
                if (err)
                        err = -EFAULT;
                obd_ioctl_freedata(buf, len);
                GOTO(out, err);
        }
        case LL_IOC_LOV_SETSTRIPE:
                /* obd_alloc_memmd() returns the md size on success;
                 * normalize positive values to 0 for the caller. */
                err = obd_alloc_memmd(exp, karg);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case LL_IOC_LOV_GETSTRIPE:
                err = osc_getstripe(karg, uarg);
                GOTO(out, err);
        case OBD_IOC_CLIENT_RECOVER:
                err = ptlrpc_recover_import(obd->u.cli.cl_import,
                                            data->ioc_inlbuf1);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case IOC_OSC_SET_ACTIVE:
                err = ptlrpc_set_import_active(obd->u.cli.cl_import,
                                               data->ioc_offset);
                GOTO(out, err);
        case OBD_IOC_POLL_QUOTACHECK:
                err = lquota_poll_check(quota_interface, exp,
                                        (struct if_quotacheck *)karg);
                GOTO(out, err);
        default:
                CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
                       cmd, cfs_curproc_comm());
                GOTO(out, err = -ENOTTY);
        }
out:
        /* Drop the module reference taken on entry. */
        module_put(THIS_MODULE);
        return err;
}
3436
/* Answer obd_get_info() queries the OSC understands.
 * KEY_LOCK_TO_STRIPE: an OSC object is a single stripe, so the answer is
 * always stripe 0.
 * KEY_LAST_ID: fetch the last allocated object id from the OST via an
 * OST_GET_INFO RPC. */
static int osc_get_info(struct obd_export *exp, obd_count keylen,
                        void *key, __u32 *vallen, void *val)
{
        ENTRY;
        if (!vallen || !val)
                RETURN(-EFAULT);

        if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
                __u32 *stripe = val;
                *vallen = sizeof(*stripe);
                /* single-stripe object: the lock always maps to stripe 0 */
                *stripe = 0;
                RETURN(0);
        } else if (KEY_IS(KEY_LAST_ID)) {
                struct ptlrpc_request *req;
                obd_id                *reply;
                char                  *tmp;
                int                    rc;

                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_OST_GET_INFO_LAST_ID);
                if (req == NULL)
                        RETURN(-ENOMEM);

                /* size the key buffer before packing the request */
                req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                                     RCL_CLIENT, keylen);
                rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
                if (rc) {
                        ptlrpc_request_free(req);
                        RETURN(rc);
                }

                tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
                memcpy(tmp, key, keylen);

                ptlrpc_request_set_replen(req);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(out, rc);

                reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
                if (reply == NULL)
                        GOTO(out, rc = -EPROTO);

                *((obd_id *)val) = *reply;
        out:
                ptlrpc_req_finished(req);
                RETURN(rc);
        }
        RETURN(-EINVAL);
}
3487
3488 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3489                                           void *aa, int rc)
3490 {
3491         struct llog_ctxt *ctxt;
3492         struct obd_import *imp = req->rq_import;
3493         ENTRY;
3494
3495         if (rc != 0)
3496                 RETURN(rc);
3497
3498         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3499         if (ctxt) {
3500                 if (rc == 0)
3501                         rc = llog_initiator_connect(ctxt);
3502                 else
3503                         CERROR("cannot establish connection for "
3504                                "ctxt %p: %d\n", ctxt, rc);
3505         }
3506
3507         llog_ctxt_put(ctxt);
3508         spin_lock(&imp->imp_lock);
3509         imp->imp_server_timeout = 1;
3510         imp->imp_pingable = 1;
3511         spin_unlock(&imp->imp_lock);
3512         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3513
3514         RETURN(rc);
3515 }
3516
/* Set a named parameter on this OSC.  A handful of keys are handled
 * locally; everything else is packed into an OST_SET_INFO RPC and queued
 * on @set for asynchronous delivery to the OST. */
static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
                              void *key, obd_count vallen, void *val,
                              struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct obd_device     *obd = exp->exp_obd;
        struct obd_import     *imp = class_exp2cliimp(exp);
        char                  *tmp;
        int                    rc;
        ENTRY;

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

        /* KEY_NEXT_ID: locally record the next object id to precreate. */
        if (KEY_IS(KEY_NEXT_ID)) {
                if (vallen != sizeof(obd_id))
                        RETURN(-ERANGE);
                if (val == NULL)
                        RETURN(-EINVAL);
                obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
                CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
                       exp->exp_obd->obd_name,
                       obd->u.cli.cl_oscc.oscc_next_id);

                RETURN(0);
        }

        /* "unlinked": objects were freed, so clear the no-space flag. */
        if (KEY_IS("unlinked")) {
                struct osc_creator *oscc = &obd->u.cli.cl_oscc;
                spin_lock(&oscc->oscc_lock);
                oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
                spin_unlock(&oscc->oscc_lock);
                RETURN(0);
        }

        /* KEY_INIT_RECOV: toggle initial-recovery behaviour on the import. */
        if (KEY_IS(KEY_INIT_RECOV)) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                spin_lock(&imp->imp_lock);
                imp->imp_initial_recov = *(int *)val;
                spin_unlock(&imp->imp_lock);
                CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
                       exp->exp_obd->obd_name,
                       imp->imp_initial_recov);
                RETURN(0);
        }

        /* "checksum": enable/disable bulk data checksumming. */
        if (KEY_IS("checksum")) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
                RETURN(0);
        }

        /* KEY_FLUSH_CTX: drop this client's security contexts. */
        if (KEY_IS(KEY_FLUSH_CTX)) {
                sptlrpc_import_flush_my_ctx(imp);
                RETURN(0);
        }

        /* Remaining keys require an RPC, which needs a request set. */
        if (!set)
                RETURN(-EINVAL);

        /* We pass all other commands directly to OST. Since nobody calls osc
           methods directly and everybody is supposed to go through LOV, we
           assume lov checked invalid values for us.
           The only recognised values so far are evict_by_nid and mds_conn.
           Even if something bad goes through, we'd get a -EINVAL from OST
           anyway. */


        req = ptlrpc_request_alloc(imp, &RQF_OST_SET_INFO);
        if (req == NULL)
                RETURN(-ENOMEM);

        /* Buffer sizes must be declared before packing the request. */
        req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                             RCL_CLIENT, keylen);
        req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
                             RCL_CLIENT, vallen);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
        memcpy(tmp, key, keylen);
        tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
        memcpy(tmp, val, vallen);

        /* KEY_MDS_CONN: record the MDS object group and hook up the
         * interpret callback that finishes MDS-side initialization. */
        if (KEY_IS(KEY_MDS_CONN)) {
                struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                oscc->oscc_oa.o_gr = (*(__u32 *)val);
                oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
                LASSERT(oscc->oscc_oa.o_gr > 0);
                req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
        }

        ptlrpc_request_set_replen(req);
        ptlrpc_set_add_req(set, req);
        ptlrpc_check_set(set);

        RETURN(0);
}
3620
3621
/* llog operations for the size-replication context: the OSC side only
 * needs record cancellation. */
static struct llog_operations osc_size_repl_logops = {
        lop_cancel: llog_obd_repl_cancel
};
3625
/* Originator-side llog ops; populated lazily from llog_lvfs_ops the first
 * time osc_llog_init() runs. */
static struct llog_operations osc_mds_ost_orig_logops;
/* Set up the two llog contexts used by an MDS-side OSC: the MDS->OST
 * originator context and the size-replication context.
 * @uuid is currently unused here. */
static int osc_llog_init(struct obd_device *obd, int group,
                         struct obd_device *tgt, int count,
                         struct llog_catid *catid, struct obd_uuid *uuid)
{
        int rc;
        ENTRY;
        LASSERT(group == OBD_LLOG_GROUP);
        /* Lazily fill the originator ops table from llog_lvfs_ops on first
         * use; the lop_setup pointer doubles as the "already initialized"
         * flag, guarded by obd_dev_lock against concurrent setup. */
        spin_lock(&obd->obd_dev_lock);
        if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
                osc_mds_ost_orig_logops = llog_lvfs_ops;
                osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
                osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
                osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
                osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
        }
        spin_unlock(&obd->obd_dev_lock);

        rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
                        &catid->lci_logid, &osc_mds_ost_orig_logops);
        if (rc) {
                CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
                GOTO (out, rc);
        }

        rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, count,
                        NULL, &osc_size_repl_logops);
        if (rc)
                CERROR("failed LLOG_SIZE_REPL_CTXT\n");
out:
        /* On any failure, log enough detail to identify the target. */
        if (rc) {
                CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
                       obd->obd_name, tgt->obd_name, count, catid, rc);
                CERROR("logid "LPX64":0x%x\n",
                       catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
        }
        RETURN(rc);
}
3664
3665 static int osc_llog_finish(struct obd_device *obd, int count)
3666 {
3667         struct llog_ctxt *ctxt;
3668         int rc = 0, rc2 = 0;
3669         ENTRY;
3670
3671         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3672         if (ctxt)
3673                 rc = llog_cleanup(ctxt);
3674
3675         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3676         if (ctxt)
3677                 rc2 = llog_cleanup(ctxt);
3678         if (!rc)
3679                 rc = rc2;
3680
3681         RETURN(rc);
3682 }
3683
3684 static int osc_reconnect(const struct lu_env *env,
3685                          struct obd_export *exp, struct obd_device *obd,
3686                          struct obd_uuid *cluuid,
3687                          struct obd_connect_data *data)
3688 {
3689         struct client_obd *cli = &obd->u.cli;
3690
3691         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3692                 long lost_grant;
3693
3694                 client_obd_list_lock(&cli->cl_loi_list_lock);
3695                 data->ocd_grant = cli->cl_avail_grant ?:
3696                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3697                 lost_grant = cli->cl_lost_grant;
3698                 cli->cl_lost_grant = 0;
3699                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3700
3701                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3702                        "cl_lost_grant: %ld\n", data->ocd_grant,
3703                        cli->cl_avail_grant, lost_grant);
3704                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3705                        " ocd_grant: %d\n", data->ocd_connect_flags,
3706                        data->ocd_version, data->ocd_grant);
3707         }
3708
3709         RETURN(0);
3710 }
3711
/* Disconnect this OSC's export.  On the last connection reference, flush
 * any queued llog cancel records out to the target first. */
static int osc_disconnect(struct obd_export *exp)
{
        struct obd_device *obd = class_exp2obd(exp);
        struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
        int rc;

        if (obd->u.cli.cl_conn_count == 1)
                /* flush any remaining cancel messages out to the target */
                llog_sync(ctxt, exp);

        /* NOTE(review): ctxt may be NULL if the context was never set up;
         * this relies on llog_sync()/llog_ctxt_put() tolerating NULL --
         * confirm against their implementations. */
        llog_ctxt_put(ctxt);

        rc = client_disconnect_export(exp);
        return rc;
}
3727
/* React to import state-machine events for this OSC's connection to its
 * OST; @imp must belong to @obd (asserted below). */
static int osc_import_event(struct obd_device *obd,
                            struct obd_import *imp,
                            enum obd_import_event event)
{
        struct client_obd *cli;
        int rc = 0;

        ENTRY;
        LASSERT(imp->imp_obd == obd);

        switch (event) {
        case IMP_EVENT_DISCON: {
                /* Only do this on the MDS OSC's */
                if (imp->imp_server_timeout) {
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                        spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
                        spin_unlock(&oscc->oscc_lock);
                }
                /* The grant will be renegotiated on reconnect; forget what
                 * we had. */
                cli = &obd->u.cli;
                client_obd_list_lock(&cli->cl_loi_list_lock);
                cli->cl_avail_grant = 0;
                cli->cl_lost_grant = 0;
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                break;
        }
        case IMP_EVENT_INACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
                break;
        }
        case IMP_EVENT_INVALIDATE: {
                struct ldlm_namespace *ns = obd->obd_namespace;

                /* Reset grants */
                cli = &obd->u.cli;
                client_obd_list_lock(&cli->cl_loi_list_lock);
                /* all pages go to failing rpcs due to the invalid import */
                osc_check_rpcs(cli);
                client_obd_list_unlock(&cli->cl_loi_list_lock);

                /* Drop all cached locks locally; no RPCs to the server. */
                ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);

                break;
        }
        case IMP_EVENT_ACTIVE: {
                /* Only do this on the MDS OSC's */
                if (imp->imp_server_timeout) {
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                        spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
                        spin_unlock(&oscc->oscc_lock);
                }
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
                break;
        }
        case IMP_EVENT_OCD: {
                struct obd_connect_data *ocd = &imp->imp_connect_data;

                if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
                        osc_init_grant(&obd->u.cli, ocd);

                /* See bug 7198 */
                if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;

                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
                break;
        }
        default:
                CERROR("Unknown import event %d\n", event);
                LBUG();
        }
        RETURN(rc);
}
3804
3805 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3806 {
3807         int rc;
3808         ENTRY;
3809
3810         ENTRY;
3811         rc = ptlrpcd_addref();
3812         if (rc)
3813                 RETURN(rc);
3814
3815         rc = client_obd_setup(obd, lcfg);
3816         if (rc) {
3817                 ptlrpcd_decref();
3818         } else {
3819                 struct lprocfs_static_vars lvars = { 0 };
3820                 struct client_obd *cli = &obd->u.cli;
3821
3822                 lprocfs_osc_init_vars(&lvars);
3823                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3824                         lproc_osc_attach_seqstat(obd);
3825                         sptlrpc_lprocfs_cliobd_attach(obd);
3826                         ptlrpc_lprocfs_register_obd(obd);
3827                 }
3828
3829                 oscc_init(obd);
3830                 /* We need to allocate a few requests more, because
3831                    brw_interpret_oap tries to create new requests before freeing
3832                    previous ones. Ideally we want to have 2x max_rpcs_in_flight
3833                    reserved, but I afraid that might be too much wasted RAM
3834                    in fact, so 2 is just my guess and still should work. */
3835                 cli->cl_import->imp_rq_pool =
3836                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3837                                             OST_MAXREQSIZE,
3838                                             ptlrpc_add_rqs_to_pool);
3839         }
3840
3841         RETURN(rc);
3842 }
3843
3844 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3845 {
3846         int rc = 0;
3847         ENTRY;
3848
3849         switch (stage) {
3850         case OBD_CLEANUP_EARLY: {
3851                 struct obd_import *imp;
3852                 imp = obd->u.cli.cl_import;
3853                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3854                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3855                 ptlrpc_deactivate_import(imp);
3856                 spin_lock(&imp->imp_lock);
3857                 imp->imp_pingable = 0;
3858                 spin_unlock(&imp->imp_lock);
3859                 break;
3860         }
3861         case OBD_CLEANUP_EXPORTS: {
3862                 /* If we set up but never connected, the
3863                    client import will not have been cleaned. */
3864                 if (obd->u.cli.cl_import) {
3865                         struct obd_import *imp;
3866                         imp = obd->u.cli.cl_import;
3867                         CDEBUG(D_CONFIG, "%s: client import never connected\n",
3868                                obd->obd_name);
3869                         ptlrpc_invalidate_import(imp);
3870                         ptlrpc_free_rq_pool(imp->imp_rq_pool);
3871                         class_destroy_import(imp);
3872                         obd->u.cli.cl_import = NULL;
3873                 }
3874                 break;
3875         }
3876         case OBD_CLEANUP_SELF_EXP:
3877                 rc = obd_llog_finish(obd, 0);
3878                 if (rc != 0)
3879                         CERROR("failed to cleanup llogging subsystems\n");
3880                 break;
3881         case OBD_CLEANUP_OBD:
3882                 break;
3883         }
3884         RETURN(rc);
3885 }
3886
3887 int osc_cleanup(struct obd_device *obd)
3888 {
3889         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3890         int rc;
3891
3892         ENTRY;
3893         ptlrpc_lprocfs_unregister_obd(obd);
3894         lprocfs_obd_cleanup(obd);
3895
3896         spin_lock(&oscc->oscc_lock);
3897         oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
3898         oscc->oscc_flags |= OSCC_FLAG_EXITING;
3899         spin_unlock(&oscc->oscc_lock);
3900
3901         /* free memory of osc quota cache */
3902         lquota_cleanup(quota_interface, obd);
3903
3904         rc = client_obd_cleanup(obd);
3905
3906         ptlrpcd_decref();
3907         RETURN(rc);
3908 }
3909
3910 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3911 {
3912         struct lustre_cfg *lcfg = buf;
3913         struct lprocfs_static_vars lvars = { 0 };
3914         int rc = 0;
3915
3916         lprocfs_osc_init_vars(&lvars);
3917
3918         switch (lcfg->lcfg_command) {
3919         case LCFG_SPTLRPC_CONF:
3920                 rc = sptlrpc_cliobd_process_config(obd, lcfg);
3921                 break;
3922         default:
3923                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
3924                                               lcfg, obd);
3925                 break;
3926         }
3927
3928         return(rc);
3929 }
3930
/* OBD method table for the OSC: the client-side interface the rest of
 * the stack (LOV, llite, MDS) drives.  Connection management reuses the
 * generic client_* helpers; everything else is OSC-specific. */
struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        /* device lifecycle */
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup,
        /* connection management (generic client import helpers) */
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        /* statfs and striping metadata */
        .o_statfs               = osc_statfs,
        .o_statfs_async         = osc_statfs_async,
        .o_packmd               = osc_packmd,
        .o_unpackmd             = osc_unpackmd,
        /* object lifecycle and attributes */
        .o_precreate            = osc_precreate,
        .o_create               = osc_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_getattr_async        = osc_getattr_async,
        .o_setattr              = osc_setattr,
        .o_setattr_async        = osc_setattr_async,
        /* bulk I/O and async page handling */
        .o_brw                  = osc_brw,
        .o_brw_async            = osc_brw_async,
        .o_prep_async_page      = osc_prep_async_page,
        .o_queue_async_io       = osc_queue_async_io,
        .o_set_async_flags      = osc_set_async_flags,
        .o_queue_group_io       = osc_queue_group_io,
        .o_trigger_group_io     = osc_trigger_group_io,
        .o_teardown_async_page  = osc_teardown_async_page,
        .o_punch                = osc_punch,
        .o_sync                 = osc_sync,
        /* DLM lock handling */
        .o_enqueue              = osc_enqueue,
        .o_match                = osc_match,
        .o_change_cbdata        = osc_change_cbdata,
        .o_cancel               = osc_cancel,
        .o_cancel_unused        = osc_cancel_unused,
        .o_join_lru             = osc_join_lru,
        /* control and configuration */
        .o_iocontrol            = osc_iocontrol,
        .o_get_info             = osc_get_info,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_llog_init            = osc_llog_init,
        .o_llog_finish          = osc_llog_finish,
        .o_process_config       = osc_process_config,
};
3976 int __init osc_init(void)
3977 {
3978         struct lprocfs_static_vars lvars = { 0 };
3979         int rc;
3980         ENTRY;
3981
3982         lprocfs_osc_init_vars(&lvars);
3983
3984         request_module("lquota");
3985         quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
3986         lquota_init(quota_interface);
3987         init_obd_quota_ops(quota_interface, &osc_obd_ops);
3988
3989         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3990                                  LUSTRE_OSC_NAME, NULL);
3991         if (rc) {
3992                 if (quota_interface)
3993                         PORTAL_SYMBOL_PUT(osc_quota_interface);
3994                 RETURN(rc);
3995         }
3996
3997         RETURN(rc);
3998 }
3999
4000 #ifdef __KERNEL__
4001 static void /*__exit*/ osc_exit(void)
4002 {
4003         lquota_exit(quota_interface);
4004         if (quota_interface)
4005                 PORTAL_SYMBOL_PUT(osc_quota_interface);
4006
4007         class_unregister_type(LUSTRE_OSC_NAME);
4008 }
4009
/* Kernel module metadata and registration; cfs_module() hooks up
 * osc_init/osc_exit as the module entry and exit points. */
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");

cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
4015 #endif