Whamcloud - gitweb
b778f1a64f96fd80b36cf8fa93cbe4082691aac2
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author Peter Braam <braam@clusterfs.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  *   You may have signed or agreed to another license before downloading
11  *   this software.  If so, you are bound by the terms and conditions
12  *   of that agreement, and the following does not apply to you.  See the
13  *   LICENSE file included with this distribution for more information.
14  *
15  *   If you did not agree to a different license, then this copy of Lustre
16  *   is open source software; you can redistribute it and/or modify it
17  *   under the terms of version 2 of the GNU General Public License as
18  *   published by the Free Software Foundation.
19  *
20  *   In either case, Lustre is distributed in the hope that it will be
21  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23  *   license text for more details.
24  *
25  *  For testing and management it is treated as an obd_device,
26  *  although * it does not export a full OBD method table (the
27  *  requests are coming * in over the wire, so object target modules
28  *  do not have a full * method table.)
29  *
30  */
31
32 #ifndef EXPORT_SYMTAB
33 # define EXPORT_SYMTAB
34 #endif
35 #define DEBUG_SUBSYSTEM S_OSC
36
37 #ifdef __KERNEL__
38 # include <libcfs/libcfs.h>
39 #else /* __KERNEL__ */
40 # include <liblustre.h>
41 #endif
42
43 #include <lustre_dlm.h>
44 #include <libcfs/kp30.h>
45 #include <lustre_net.h>
46 #include <lustre/lustre_user.h>
47 #include <obd_ost.h>
48 #include <obd_lov.h>
49
50 #ifdef  __CYGWIN__
51 # include <ctype.h>
52 #endif
53
54 #include <lustre_ha.h>
55 #include <lprocfs_status.h>
56 #include <lustre_log.h>
57 #include <lustre_debug.h>
58 #include <lustre_param.h>
59 #include "osc_internal.h"
60
61 static quota_interface_t *quota_interface = NULL;
62 extern quota_interface_t osc_quota_interface;
63
64 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
65
66 /* by default 10s */
67 atomic_t osc_resend_time; 
68
/* Pack OSC object metadata for disk storage (LE byte order).
 *
 * Three calling modes, selected by the pointer arguments:
 *   - lmmp == NULL:             return the size needed for the packed lmm;
 *   - *lmmp != NULL, lsm NULL:  free the previously packed buffer;
 *   - otherwise:                allocate *lmmp if needed and pack lsm into
 *                               it in little-endian byte order.
 * Returns the lmm size on success, 0 after a free, -ENOMEM on failure. */
static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
                      struct lov_stripe_md *lsm)
{
        int lmm_size;
        ENTRY;

        /* sizeof(**lmmp) is computed at compile time, so this is safe
         * even when lmmp is NULL (the operand is not evaluated) */
        lmm_size = sizeof(**lmmp);
        if (!lmmp)
                RETURN(lmm_size);

        /* free mode: caller passed an existing lmm but no lsm to pack */
        if (*lmmp && !lsm) {
                OBD_FREE(*lmmp, lmm_size);
                *lmmp = NULL;
                RETURN(0);
        }

        if (!*lmmp) {
                OBD_ALLOC(*lmmp, lmm_size);
                if (!*lmmp)
                        RETURN(-ENOMEM);
        }

        if (lsm) {
                LASSERT(lsm->lsm_object_id);
                LASSERT(lsm->lsm_object_gr);
                (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
                (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
        }

        RETURN(lmm_size);
}
101
102 /* Unpack OSC object metadata from disk storage (LE byte order). */
103 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
104                         struct lov_mds_md *lmm, int lmm_bytes)
105 {
106         int lsm_size;
107         ENTRY;
108
109         if (lmm != NULL) {
110                 if (lmm_bytes < sizeof (*lmm)) {
111                         CERROR("lov_mds_md too small: %d, need %d\n",
112                                lmm_bytes, (int)sizeof(*lmm));
113                         RETURN(-EINVAL);
114                 }
115                 /* XXX LOV_MAGIC etc check? */
116
117                 if (lmm->lmm_object_id == 0) {
118                         CERROR("lov_mds_md: zero lmm_object_id\n");
119                         RETURN(-EINVAL);
120                 }
121         }
122
123         lsm_size = lov_stripe_md_size(1);
124         if (lsmp == NULL)
125                 RETURN(lsm_size);
126
127         if (*lsmp != NULL && lmm == NULL) {
128                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
129                 OBD_FREE(*lsmp, lsm_size);
130                 *lsmp = NULL;
131                 RETURN(0);
132         }
133
134         if (*lsmp == NULL) {
135                 OBD_ALLOC(*lsmp, lsm_size);
136                 if (*lsmp == NULL)
137                         RETURN(-ENOMEM);
138                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
139                 if ((*lsmp)->lsm_oinfo[0] == NULL) {
140                         OBD_FREE(*lsmp, lsm_size);
141                         RETURN(-ENOMEM);
142                 }
143                 loi_init((*lsmp)->lsm_oinfo[0]);
144         }
145
146         if (lmm != NULL) {
147                 /* XXX zero *lsmp? */
148                 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
149                 (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
150                 LASSERT((*lsmp)->lsm_object_id);
151                 LASSERT((*lsmp)->lsm_object_gr);
152         }
153
154         (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
155
156         RETURN(lsm_size);
157 }
158
159 static inline void osc_pack_capa(struct ptlrpc_request *req, int offset,
160                                  struct ost_body *body, void *capa)
161 {
162         struct obd_capa *oc = (struct obd_capa *)capa;
163         struct lustre_capa *c;
164
165         if (!capa)
166                 return;
167
168         c = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*c));
169         LASSERT(c);
170         capa_cpy(c, oc);
171         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
172         DEBUG_CAPA(D_SEC, c, "pack");
173 }
174
/* Fill the ost_body request buffer at @offset from @oinfo's obdo, then
 * append the capability (if any) in the buffer that follows it. */
static inline void osc_pack_req_body(struct ptlrpc_request *req, int offset,
                                     struct obd_info *oinfo)
{
        struct ost_body *body;

        /* NOTE(review): lustre_msg_buf() result is not NULL-checked here;
         * presumably callers always size the request buffers correctly —
         * confirm against the prep_req call sites. */
        body = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*body));
        body->oa = *oinfo->oi_oa;
        /* capability, if present, lives in the next request buffer */
        osc_pack_capa(req, offset + 1, body, oinfo->oi_capa);
}
184
/* Reply callback for asynchronous OST_GETATTR: copy the returned obdo
 * into the caller's obd_info and invoke its completion callback.
 * Returns whatever the oi_cb_up callback returns. */
static int osc_getattr_interpret(struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body) {
                CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
                memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));

                /* This should really be sent by the OST */
                aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
                aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
        } else {
                CERROR("can't unpack ost_body\n");
                rc = -EPROTO;
                /* clear o_valid so the caller can't trust stale attrs */
                aa->aa_oi->oi_oa->o_valid = 0;
        }
out:
        /* always notify the upper layer, success or failure */
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}
212
/* Issue OST_GETATTR asynchronously: build the request, attach
 * osc_getattr_interpret() as the reply handler, and queue it on @set.
 * Returns 0 or -ENOMEM. */
static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        /* buffers: ptlrpc_body, ost_body, optional capa (sized below) */
        int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        struct osc_async_args *aa;
        ENTRY;

        size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_GETATTR, 3, size,NULL);
        if (!req)
                RETURN(-ENOMEM);

        osc_pack_req_body(req, REQ_REC_OFF, oinfo);

        ptlrpc_req_set_repsize(req, 2, size);
        req->rq_interpret_reply = osc_getattr_interpret;

        /* stash the obd_info in the request's embedded async-args area */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_async_args *)&req->rq_async_args;
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(set, req);
        RETURN (0);
}
240
/* Synchronous OST_GETATTR: send the request, wait for the reply, and
 * copy the returned attributes into oinfo->oi_oa.
 * Returns 0 on success or a negative errno. */
static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        /* buffers: ptlrpc_body, ost_body, optional capa (sized below) */
        int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        ENTRY;

        size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_GETATTR, 3, size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        osc_pack_req_body(req, REQ_REC_OFF, oinfo);

        ptlrpc_req_set_repsize(req, 2, size);

        rc = ptlrpc_queue_wait(req);
        if (rc) {
                CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
                GOTO(out, rc);
        }

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR ("can't unpack ost_body\n");
                GOTO (out, rc = -EPROTO);
        }

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        *oinfo->oi_oa = body->oa;

        /* This should really be sent by the OST */
        oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
283
/* Synchronous OST_SETATTR: push the attributes in oinfo->oi_oa to the
 * OST and copy the obdo from the reply back into oinfo->oi_oa.
 * Returns 0 on success or a negative errno. */
static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
                       struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        ENTRY;

        /* if the group bit is claimed valid, it must be a real group */
        LASSERT(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
                                        oinfo->oi_oa->o_gr > 0);
        size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_SETATTR, 3, size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        osc_pack_req_body(req, REQ_REC_OFF, oinfo);

        ptlrpc_req_set_repsize(req, 2, size);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *oinfo->oi_oa = body->oa;

        EXIT;
out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
320
/* Reply callback for asynchronous OST_SETATTR: copy the returned obdo
 * back into the caller's obd_info and run its completion callback. */
static int osc_setattr_interpret(struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *aa->aa_oi->oi_oa = body->oa;
out:
        /* always notify the upper layer, success or failure */
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}
342
/* Asynchronous OST_SETATTR.  With @rqset == NULL the request is handed
 * to ptlrpcd fire-and-forget (no reply processing); otherwise it is
 * added to @rqset with osc_setattr_interpret() as the reply handler.
 * Returns 0 or -ENOMEM. */
static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct obd_trans_info *oti,
                             struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        int size[3] = { sizeof(struct ptlrpc_body), sizeof(struct ost_body) };
        struct osc_async_args *aa;
        ENTRY;

        size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_SETATTR, 3, size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        osc_pack_req_body(req, REQ_REC_OFF, oinfo);
        /* carry the MDS llog cancellation cookie inside the obdo */
        if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
                LASSERT(oti);
                *obdo_logcookie(oinfo->oi_oa) = *oti->oti_logcookies;
        }

        ptlrpc_req_set_repsize(req, 2, size);
        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
                req->rq_interpret_reply = osc_setattr_interpret;

                CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = (struct osc_async_args *)&req->rq_async_args;
                aa->aa_oi = oinfo;

                ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}
381
/* Create an object on the OST (synchronous OST_CREATE).
 * If the caller didn't supply a stripe-md in *ea, one is allocated here;
 * on success it is filled with the object id/group the OST assigned and
 * returned through *ea.  When @oti is given, the reply's transno and the
 * llog cookie (if any) are recorded in it.
 * Returns 0 on success or a negative errno; on failure a locally
 * allocated lsm is freed again. */
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        struct lov_stripe_md *lsm;
        int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_CREATE, 2, size, NULL);
        if (!req)
                GOTO(out, rc = -ENOMEM);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        body->oa = *oa;

        ptlrpc_req_set_repsize(req, 2, size);
        if (oa->o_valid & OBD_MD_FLINLINE) {
                LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
                        oa->o_flags == OBD_FL_DELORPHAN);
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR ("can't unpack ost_body\n");
                GOTO (out_req, rc = -EPROTO);
        }

        *oa = body->oa;

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_object_id = oa->o_id;
        lsm->lsm_object_gr = oa->o_gr;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        *oti->oti_logcookies = *obdo_logcookie(oa);
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
        EXIT;
out_req:
        ptlrpc_req_finished(req);
out:
        /* *ea is only set on success, so failing with !*ea means the lsm
         * was allocated locally above and must be freed here */
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        return rc;
}
464
/* Reply callback for asynchronous OST_PUNCH (truncate): copy the
 * returned obdo back and run the caller's completion callback. */
static int osc_punch_interpret(struct ptlrpc_request *req,
                               struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof (*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR ("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *aa->aa_oi->oi_oa = body->oa;
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}
486
/* Asynchronous OST_PUNCH: truncate/punch the extent in oinfo->oi_policy
 * on the OST.  The extent start/end are overloaded onto the obdo's
 * size/blocks fields.  The request is queued on @rqset and completed by
 * osc_punch_interpret().  Returns 0 or a negative errno. */
static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
                     struct obd_trans_info *oti,
                     struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        struct ost_body *body;
        int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        ENTRY;

        if (!oinfo->oi_oa) {
                CERROR("oa NULL\n");
                RETURN(-EINVAL);
        }

        size[REQ_REC_OFF + 1] = oinfo->oi_capa? sizeof(struct lustre_capa) : 0;
        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_PUNCH, 3, size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */

        osc_pack_req_body(req, REQ_REC_OFF, oinfo);
        /* overload the size and blocks fields in the oa with start/end */
        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        body->oa.o_size = oinfo->oi_policy.l_extent.start;
        body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);

        ptlrpc_req_set_repsize(req, 2, size);

        req->rq_interpret_reply = osc_punch_interpret;
        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_async_args *)&req->rq_async_args;
        aa->aa_oi = oinfo;
        ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}
527
/* Synchronous OST_SYNC: ask the OST to flush byte range [start, end] of
 * the object described by @oa to stable storage.  The range is
 * overloaded onto the obdo's size/blocks fields.
 * Returns 0 on success or a negative errno. */
static int osc_sync(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md *md, obd_size start, obd_size end,
                    void *capa)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        ENTRY;

        if (!oa) {
                CERROR("oa NULL\n");
                RETURN(-EINVAL);
        }

        size[REQ_REC_OFF + 1] = capa ? sizeof(struct lustre_capa) : 0;

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_SYNC, 3, size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        /* overload the size and blocks fields in the oa with start/end */
        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        body->oa = *oa;
        body->oa.o_size = start;
        body->oa.o_blocks = end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);

        osc_pack_capa(req, REQ_REC_OFF + 1, body, capa);

        ptlrpc_req_set_repsize(req, 2, size);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR ("can't unpack ost_body\n");
                GOTO (out, rc = -EPROTO);
        }

        *oa = body->oa;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
578
/* Find and cancel locally locks matched by @mode in the resource found by
 * the object id/group in @oa.  Found locks are added into the @cancels
 * list.  Returns the number of locks added to @cancels. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels, ldlm_mode_t mode,
                                   int lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        /* the DLM resource is named by the obdo's object id and group */
        struct ldlm_res_id res_id = { .name = { oa->o_id, 0, oa->o_gr, 0 } };
        struct ldlm_resource *res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        int count;
        ENTRY;

        /* no resource means no locks to cancel */
        if (res == NULL)
                RETURN(0);

        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        ldlm_resource_putref(res);
        RETURN(count);
}
600
/* Destroy requests can be async always on the client, and we don't even really
 * care about the return code since the client cannot do anything at all about
 * a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST reconnects to the MDS next,
 * it will retrieve the llog unlink logs and then sends the log cancellation
 * cookies to the MDS after committing destroy transactions. */
static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                       struct lov_stripe_md *ea, struct obd_trans_info *oti,
                       struct obd_export *md_export)
{
        CFS_LIST_HEAD(cancels);
        struct ptlrpc_request *req;
        struct ost_body *body;
        int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body), 0 };
        int count, bufcount = 2;
        ENTRY;

        if (!oa) {
                CERROR("oa NULL\n");
                RETURN(-EINVAL);
        }

        /* cancel our own PW locks on the object (discarding cached data)
         * and, if the server supports it, piggyback the cancels on the
         * destroy request itself */
        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);
        if (exp_connect_cancelset(exp) && count) {
                bufcount = 3;
                size[REQ_REC_OFF + 1] = ldlm_request_bufsize(count,OST_DESTROY);
        }
        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_DESTROY, bufcount, size, NULL);
        /* the cancel list must be consumed either way: packed into the
         * request, or released if the request could not be built */
        if (exp_connect_cancelset(exp) && req)
                ldlm_cli_cancel_list(&cancels, count, req, REQ_REC_OFF + 1, 0);
        else
                ldlm_lock_list_put(&cancels, l_bl_ast, count);

        if (!req)
                RETURN(-ENOMEM);

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        /* stash the llog cancellation cookie in the obdo before copying */
        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                memcpy(obdo_logcookie(oa), oti->oti_logcookies,
                       sizeof(*oti->oti_logcookies));
        body->oa = *oa;

        ptlrpc_req_set_repsize(req, 2, size);

        /* fire and forget: ptlrpcd owns the request from here on */
        ptlrpcd_add_req(req);
        RETURN(0);
}
656
/* Fill the dirty/undirty/grant accounting fields of @oa so the OST can
 * track this client's cache usage and grant it more space.
 * Must be called with those o_valid bits not yet set. */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (cli->cl_dirty > cli->cl_dirty_max) {
                CERROR("dirty %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
                CERROR("dirty %d > system dirty_max %d\n",
                       atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                /* how much more this client could still dirty, bounded by
                 * what it can keep in flight at once */
                long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
                                (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant;
        oa->o_dropped = cli->cl_lost_grant;
        /* lost grant is reported once, then reset */
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}
691
/* Charge one page of dirty data against the global and per-client
 * accounting and consume one page worth of write grant.
 * caller must hold loi_list_lock */
static void osc_consume_write_grant(struct client_obd *cli,
                                    struct brw_page *pga)
{
        atomic_inc(&obd_dirty_pages);
        cli->cl_dirty += CFS_PAGE_SIZE;
        cli->cl_avail_grant -= CFS_PAGE_SIZE;
        /* mark the page so osc_release_write_grant() knows to undo this */
        pga->flag |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
               CFS_PAGE_SIZE, pga, pga->pg);
        LASSERT(cli->cl_avail_grant >= 0);
}
704
/* the companion to osc_consume_write_grant, called when a brw has completed.
 * Undoes the dirty-page accounting and tracks grant that was "lost":
 * either because the page was never sent (@sent == 0), or because a
 * short write consumed less than a full page's worth of OST blocks.
 * must be called with the loi lock held. */
static void osc_release_write_grant(struct client_obd *cli,
                                    struct brw_page *pga, int sent)
{
        /* fall back to 4k when the server hasn't reported a block size */
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
        ENTRY;

        /* nothing to release for pages that never consumed grant */
        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
                EXIT;
                return;
        }

        pga->flag &= ~OBD_BRW_FROM_GRANT;
        atomic_dec(&obd_dirty_pages);
        cli->cl_dirty -= CFS_PAGE_SIZE;
        if (!sent) {
                cli->cl_lost_grant += CFS_PAGE_SIZE;
                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
        } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole block on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
                int offset = pga->off & ~CFS_PAGE_MASK;
                int count = pga->count + (offset & (blocksize - 1));
                int end = (offset + pga->count) & (blocksize - 1);
                if (end)
                        count += blocksize - end;

                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
                       cli->cl_avail_grant, cli->cl_dirty);
        }

        EXIT;
}
743
744 static unsigned long rpcs_in_flight(struct client_obd *cli)
745 {
746         return cli->cl_r_in_flight + cli->cl_w_in_flight;
747 }
748
/* Wake tasks blocked waiting for cache/grant space, for as long as there
 * is room for them to proceed.  Waiters that cannot get grant (and have
 * no write RPCs in flight that might return some) are woken with -EDQUOT
 * to fall back to sync IO.  caller must hold loi_list_lock */
void osc_wake_cache_waiters(struct client_obd *cli)
{
        struct list_head *l, *tmp;
        struct osc_cache_waiter *ocw;

        ENTRY;
        list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                /* if we can't dirty more, we must wait until some is written */
                if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
                   (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);
                        return;
                }

                /* if still dirty cache but no grant wait for pending RPCs that
                 * may yet return us some grant before doing sync writes */
                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
                               cli->cl_w_in_flight);
                        return;
                }

                ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
                list_del_init(&ocw->ocw_entry);
                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        /* no more RPCs in flight to return grant, do sync IO */
                        ocw->ocw_rc = -EDQUOT;
                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
                } else {
                        osc_consume_write_grant(cli,
                                                &ocw->ocw_oap->oap_brw_page);
                }

                cfs_waitq_signal(&ocw->ocw_waitq);
        }

        EXIT;
}
790
/* Record the initial write grant the server handed us in the connect
 * reply's obd_connect_data. */
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
               cli->cl_avail_grant, cli->cl_lost_grant);
        LASSERT(cli->cl_avail_grant >= 0);
}
801
/* Add extra grant returned by the server in an OST reply body to our
 * available grant pool. */
static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
        cli->cl_avail_grant += body->oa.o_grant;
        /* waiters are woken in brw_interpret_oap */
        client_obd_list_unlock(&cli->cl_loi_list_lock);
}
810
/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet.
 *
 * Walk the page array: leave the first @nob_read bytes alone, zero the
 * tail of the page the read ended in, then zero all remaining pages. */
static void handle_short_read(int nob_read, obd_count page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT (page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page: zero from the read boundary
                         * to the end of this page's region */
                        ptr = cfs_kmap(pga[i]->pg) +
                                (pga[i]->off & ~CFS_PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        cfs_kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                cfs_kunmap(pga[i]->pg);
                i++;
        }
}
849
850 static int check_write_rcs(struct ptlrpc_request *req,
851                            int requested_nob, int niocount,
852                            obd_count page_count, struct brw_page **pga)
853 {
854         int    *remote_rcs, i;
855
856         /* return error if any niobuf was in error */
857         remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
858                                         sizeof(*remote_rcs) * niocount, NULL);
859         if (remote_rcs == NULL) {
860                 CERROR("Missing/short RC vector on BRW_WRITE reply\n");
861                 return(-EPROTO);
862         }
863         if (lustre_msg_swabbed(req->rq_repmsg))
864                 for (i = 0; i < niocount; i++)
865                         __swab32s(&remote_rcs[i]);
866
867         for (i = 0; i < niocount; i++) {
868                 if (remote_rcs[i] < 0)
869                         return(remote_rcs[i]);
870
871                 if (remote_rcs[i] != 0) {
872                         CERROR("rc[%d] invalid (%d) req %p\n",
873                                 i, remote_rcs[i], req);
874                         return(-EPROTO);
875                 }
876         }
877
878         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
879                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
880                        requested_nob, req->rq_bulk->bd_nob_transferred);
881                 return(-EPROTO);
882         }
883
884         return (0);
885 }
886
887 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
888 {
889         if (p1->flag != p2->flag) {
890                 unsigned mask = ~OBD_BRW_FROM_GRANT;
891
892                 /* warn if we try to combine flags that we don't know to be
893                  * safe to combine */
894                 if ((p1->flag & mask) != (p2->flag & mask))
895                         CERROR("is it ok to have flags 0x%x and 0x%x in the "
896                                "same brw?\n", p1->flag, p2->flag);
897                 return 0;
898         }
899
900         return (p1->off + p1->count == p2->off);
901 }
902
903 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
904                                    struct brw_page **pga)
905 {
906         __u32 cksum = ~0;
907         int i = 0;
908
909         LASSERT (pg_count > 0);
910         while (nob > 0 && pg_count > 0) {
911                 char *ptr = cfs_kmap(pga[i]->pg);
912                 int off = pga[i]->off & ~CFS_PAGE_MASK;
913                 int count = pga[i]->count > nob ? nob : pga[i]->count;
914
915                 /* corrupt the data before we compute the checksum, to
916                  * simulate an OST->client data error */
917                 if (i == 0 &&
918                     OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
919                         memcpy(ptr + off, "bad1", min(4, nob));
920                 cksum = crc32_le(cksum, ptr + off, count);
921                 cfs_kunmap(pga[i]->pg);
922                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
923                                off, cksum);
924
925                 nob -= pga[i]->count;
926                 pg_count--;
927                 i++;
928         }
929         /* For sending we only compute the wrong checksum instead
930          * of corrupting the data so it is still correct on a redo */
931         if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_SEND))
932                 cksum++;
933
934         return cksum;
935 }
936
/* Build a bulk read/write RPC for @page_count pages described by @pga.
 *
 * Counts how many remote niobufs are needed (contiguous, flag-compatible
 * pages share one), allocates the request (from the preallocated pool for
 * writes so writeback can proceed under memory pressure), attaches a bulk
 * descriptor, fills in ost_body/ioobj/niobuf buffers and the optional
 * capability, and computes a client-side checksum for writes when
 * cl_checksum is set.  On success, *reqp holds the prepared request
 * (owned by the caller) with its osc_brw_async_args initialized.
 * Returns 0 or a negative errno. */
static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int size[5] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        int niocount, i, requested_nob, opc, rc;
        struct ptlrpc_request_pool *pool;
        struct lustre_capa      *capa;
        struct osc_brw_async_args *aa;

        ENTRY;
        OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM); /* Recoverable */
        OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ2, -EINVAL); /* Fatal */

        /* writes draw from the import's preallocated request pool so dirty
         * data can always be flushed even when allocation would block */
        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                pool = cli->cl_import->imp_rq_pool;
        } else {
                opc = OST_READ;
                pool = NULL;
        }

        /* count remote niobufs: a run of mergeable pages needs only one */
        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        size[REQ_REC_OFF + 1] = sizeof(*ioobj);
        size[REQ_REC_OFF + 2] = niocount * sizeof(*niobuf);
        if (ocapa)
                size[REQ_REC_OFF + 3] = sizeof(*capa);

        /* the capability buffer stays zero-sized when no capa is given */
        req = ptlrpc_prep_req_pool(cli->cl_import, LUSTRE_OST_VERSION, opc, 5,
                                   size, NULL, pool, NULL);
        if (req == NULL)
                RETURN (-ENOMEM);

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */

        if (opc == OST_WRITE)
                desc = ptlrpc_prep_bulk_imp (req, page_count,
                                             BULK_GET_SOURCE, OST_BULK_PORTAL);
        else
                desc = ptlrpc_prep_bulk_imp (req, page_count,
                                             BULK_PUT_SINK, OST_BULK_PORTAL);
        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        ioobj = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioobj));
        niobuf = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                                niocount * sizeof(*niobuf));

        body->oa = *oa;

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        if (ocapa) {
                capa = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 3,
                                      sizeof(*capa));
                capa_cpy(capa, ocapa);
                body->oa.o_valid |= OBD_MD_FLOSSCAPA;
        }

        /* attach every page to the bulk descriptor and fill the niobufs,
         * merging into the previous niobuf where pages are contiguous;
         * pages must be sorted by strictly increasing offset */
        LASSERT (page_count > 0);
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                /* only dereferenced when i > 0, see the LASSERTFs below */
                struct brw_page *pg_prev = pga[i - 1];

                LASSERT(pg->count > 0);
                LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
                         "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
                         pg->off, pg->count);
#ifdef __LINUX__
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
#else
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u\n", i, page_count);
#endif
                /* server-side locking must be all-or-nothing for one RPC */
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
                                      pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len    = pg->count;
                        niobuf->flags  = pg->flag;
                }
        }

        /* exactly niocount niobufs must have been filled */
        LASSERT((void *)(niobuf - niocount) ==
                lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                               niocount * sizeof(*niobuf)));
        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (unlikely(cli->cl_checksum)) {
                        body->oa.o_valid |= OBD_MD_FLCKSUM;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM;
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                size[REPLY_REC_OFF + 1] = sizeof(__u32) * niocount;
                ptlrpc_req_set_repsize(req, 3, size);
        } else {
                if (unlikely(cli->cl_checksum))
                        body->oa.o_valid |= OBD_MD_FLCKSUM;
                /* 1 RC for the whole I/O */
                ptlrpc_req_set_repsize(req, 2, size);
        }

        /* stash everything the reply interpreter will need in the
         * request's embedded async-args area */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_brw_async_args *)&req->rq_async_args;
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        INIT_LIST_HEAD(&aa->aa_oaps);

        *reqp = req;
        RETURN (0);

 out:
        ptlrpc_req_finished (req);
        RETURN (rc);
}
1096
1097 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1098                                 __u32 client_cksum, __u32 server_cksum,
1099                                 int nob, obd_count page_count,
1100                                 struct brw_page **pga)
1101 {
1102         __u32 new_cksum;
1103         char *msg;
1104
1105         if (server_cksum == client_cksum) {
1106                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1107                 return 0;
1108         }
1109
1110         new_cksum = osc_checksum_bulk(nob, page_count, pga);
1111
1112         if (new_cksum == server_cksum)
1113                 msg = "changed on the client after we checksummed it - "
1114                       "likely false positive due to mmap IO (bug 11742)";
1115         else if (new_cksum == client_cksum)
1116                 msg = "changed in transit before arrival at OST";
1117         else
1118                 msg = "changed in transit AND doesn't match the original - "
1119                       "likely false positive due to mmap IO (bug 11742)";
1120
1121         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1122                            LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1123                            "["LPU64"-"LPU64"]\n",
1124                            msg, libcfs_nid2str(peer->nid),
1125                            oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1126                            oa->o_valid & OBD_MD_FLFID ? oa->o_generation : 
1127                                                         (__u64)0,
1128                            oa->o_id,
1129                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1130                            pga[0]->off,
1131                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1132         CERROR("original client csum %x, server csum %x, client csum now %x\n",
1133                client_cksum, server_cksum, new_cksum);
1134         return 1;        
1135 }
1136
/* Note rc enters this function as number of bytes transferred */
/* Finish a BRW RPC: unpack and validate the reply, update quota flags and
 * grant, and verify checksums.  For writes this checks the per-niobuf RC
 * vector; for reads it handles short reads and verifies the server
 * checksum against a locally computed one.  Returns 0/negative errno;
 * -EAGAIN requests a resend (e.g. checksum mismatch). */
static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
{
        struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
        const lnet_process_id_t *peer =
                        &req->rq_import->imp_connection->c_peer;
        struct client_obd *cli = aa->aa_cli;
        struct ost_body *body;
        __u32 client_cksum = 0;
        ENTRY;

        /* -EDQUOT replies still carry a body we must process (quota flags) */
        if (rc < 0 && rc != -EDQUOT)
                RETURN(rc);

        LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR ("Can't unpack body\n");
                RETURN(-EPROTO);
        }

        /* set/clear over quota flag for a uid/gid */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
            body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
                lquota_setdq(quota_interface, cli, body->oa.o_uid,
                             body->oa.o_gid, body->oa.o_valid,
                             body->oa.o_flags);

        /* now that quota state is updated, propagate the -EDQUOT error */
        if (rc < 0)
                RETURN(rc);

        if (unlikely(aa->aa_oa->o_valid & OBD_MD_FLCKSUM))
                client_cksum = aa->aa_oa->o_cksum; /* save for later */

        osc_update_grant(cli, body);

        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
                /* a write reply carries no payload, so rc must be <= 0 here */
                if (rc > 0) {
                        CERROR ("Unexpected +ve rc %d\n", rc);
                        RETURN(-EPROTO);
                }
                LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);

                /* checksum mismatch on write: ask the caller to resend */
                if (unlikely((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) &&
                             client_cksum &&
                             check_write_checksum(&body->oa, peer, client_cksum,
                                                  body->oa.o_cksum,
                                                  aa->aa_requested_nob,
                                                  aa->aa_page_count,
                                                  aa->aa_ppga)))
                        RETURN(-EAGAIN);

                if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
                        RETURN(-EAGAIN);

                rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
                                     aa->aa_page_count, aa->aa_ppga);
                GOTO(out, rc);
        }

        /* The rest of this function executes only for OST_READs */
        if (rc > aa->aa_requested_nob) {
                CERROR("Unexpected rc %d (%d requested)\n", rc,
                       aa->aa_requested_nob);
                RETURN(-EPROTO);
        }

        if (rc != req->rq_bulk->bd_nob_transferred) {
                CERROR ("Unexpected rc %d (%d transferred)\n",
                        rc, req->rq_bulk->bd_nob_transferred);
                return (-EPROTO);
        }

        /* short read: zero-fill the pages beyond what was returned */
        if (rc < aa->aa_requested_nob)
                handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);

        if (sptlrpc_cli_unwrap_bulk_read(req, rc, aa->aa_page_count,
                                         aa->aa_ppga))
                GOTO(out, rc = -EAGAIN);

        if (unlikely(body->oa.o_valid & OBD_MD_FLCKSUM)) {
                static int cksum_counter;
                __u32      server_cksum = body->oa.o_cksum;
                char      *via;
                char      *router;

                client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
                                                 aa->aa_ppga);

                /* identify an intermediate router, if the bulk came via one */
                if (peer->nid == req->rq_bulk->bd_sender) {
                        via = router = "";
                } else {
                        via = " via ";
                        router = libcfs_nid2str(req->rq_bulk->bd_sender);
                }

                if (server_cksum == ~0 && rc > 0) {
                        CERROR("Protocol error: server %s set the 'checksum' "
                               "bit, but didn't send a checksum.  Not fatal, "
                               "but please tell CFS.\n",
                               libcfs_nid2str(peer->nid));
                } else if (server_cksum != client_cksum) {
                        LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
                                           "%s%s%s inum "LPU64"/"LPU64" object "
                                           LPU64"/"LPU64" extent "
                                           "["LPU64"-"LPU64"]\n",
                                           req->rq_import->imp_obd->obd_name,
                                           libcfs_nid2str(peer->nid),
                                           via, router,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                                body->oa.o_fid : (__u64)0,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                                body->oa.o_generation :(__u64)0,
                                           body->oa.o_id,
                                           body->oa.o_valid & OBD_MD_FLGROUP ?
                                                body->oa.o_gr : (__u64)0,
                                           aa->aa_ppga[0]->off,
                                           aa->aa_ppga[aa->aa_page_count-1]->off +
                                           aa->aa_ppga[aa->aa_page_count-1]->count -
                                                                        1);
                        CERROR("client %x, server %x\n",
                               client_cksum, server_cksum);
                        cksum_counter = 0;
                        aa->aa_oa->o_cksum = client_cksum;
                        /* -EAGAIN triggers a resend of the read */
                        rc = -EAGAIN;
                } else {
                        cksum_counter++;
                        CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                        rc = 0;
                }
        } else if (unlikely(client_cksum)) {
                static int cksum_missed;

                /* we asked for a checksum but the server didn't send one;
                 * log with exponential backoff (only when cksum_missed is a
                 * power of two) */
                cksum_missed++;
                if ((cksum_missed & (-cksum_missed)) == cksum_missed)
                        CERROR("Checksum %u requested from %s but not sent\n",
                               cksum_missed, libcfs_nid2str(peer->nid));
        } else {
                rc = 0;
        }
out:
        /* on success, copy the server-updated attributes back to the caller */
        if (rc >= 0)
                *aa->aa_oa = body->oa;

        RETURN(rc);
}
1284
/* Synchronous bulk I/O: build the BRW request, queue it and wait for the
 * reply.  Bulk timeouts are retried indefinitely (the request layer has
 * marked them resendable); other recoverable errors are retried with a
 * growing delay, bounded by osc_should_resend().  Returns 0 or -errno. */
static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
                            struct lov_stripe_md *lsm,
                            obd_count page_count, struct brw_page **pga,
                            struct obd_capa *ocapa)
{
        struct ptlrpc_request *req;
        int                    rc;
        cfs_waitq_t            waitq;
        int                    resends = 0;
        struct l_wait_info     lwi;

        ENTRY;

        /* private waitqueue used purely to sleep between resends below */
        cfs_waitq_init(&waitq);

restart_bulk:
        rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
                                  page_count, pga, &req, ocapa);
        if (rc != 0)
                return (rc);

        rc = ptlrpc_queue_wait(req);

        if (rc == -ETIMEDOUT && req->rq_resend) {
                DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
                ptlrpc_req_finished(req);
                goto restart_bulk;
        }

        rc = osc_brw_fini_request(req, rc);

        ptlrpc_req_finished(req);
        if (osc_recoverable_error(rc)) {
                resends++;
                if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
                        CERROR("too many resend retries, returning error\n");
                        RETURN(-EIO);
                }

                /* back off 'resends' seconds before trying again */
                lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
                l_wait_event(waitq, 0, &lwi);

                goto restart_bulk;
        }

        RETURN (rc);
}
1332
/* Rebuild and requeue a BRW request after a recoverable error.
 *
 * Builds a fresh request from the async args of the failed one, moves the
 * pga array and the osc_async_page list over to the new request, retargets
 * each oap's request reference, and adds the new request to the original
 * request set.  Returns 0 on success, -EIO when the resend budget is
 * exhausted, -EINTR if any oap was interrupted. */
int osc_brw_redo_request(struct ptlrpc_request *request,
                         struct osc_brw_async_args *aa)
{
        struct ptlrpc_request *new_req;
        struct ptlrpc_request_set *set = request->rq_set;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
                CERROR("too many resend retries, returning error\n");
                RETURN(-EIO);
        }

        DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
/*
        body = lustre_msg_buf(request->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
                ocapa = lustre_unpack_capa(request->rq_reqmsg,
                                           REQ_REC_OFF + 3);
*/
        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                        OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa,
                                  NULL /* lsm unused by osc currently */,
                                  aa->aa_page_count, aa->aa_ppga,
                                  &new_req, NULL /* ocapa */);
        if (rc)
                RETURN(rc);

        client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);

        /* bail out (dropping the new request) if any page of the old
         * request was interrupted while we weren't looking */
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(request == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 request, oap->oap_request);
                        if (oap->oap_interrupted) {
                                client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
                                ptlrpc_req_finished(new_req);
                                RETURN(-EINTR);
                        }
                }
        }
        /* New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it... */
        aa->aa_resends++;
        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
        /* delay the resend to spread load: one extra second per attempt */
        new_req->rq_sent = CURRENT_SECONDS + aa->aa_resends;

        new_aa = (struct osc_brw_async_args *)&new_req->rq_async_args;

        /* move the oap list; rq_async_args was copied wholesale above, so
         * new_aa->aa_oaps must be re-initialized before the splice */
        INIT_LIST_HEAD(&new_aa->aa_oaps);
        list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
        INIT_LIST_HEAD(&aa->aa_oaps);

        /* retarget each oap's request reference at the new request */
        list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);
                }
        }
        client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);

        DEBUG_REQ(D_INFO, new_req, "new request");

        ptlrpc_set_add_req(set, new_req);

        RETURN(0);
}
1405
1406 static int brw_interpret(struct ptlrpc_request *req, void *data, int rc)
1407 {
1408         struct osc_brw_async_args *aa = data;
1409         int                        i;
1410         int                        nob = rc;
1411         ENTRY;
1412
1413         rc = osc_brw_fini_request(req, rc);
1414         if (osc_recoverable_error(rc)) {
1415                 rc = osc_brw_redo_request(req, aa);
1416                 if (rc == 0)
1417                         RETURN(0);
1418         }
1419         if ((rc >= 0) && req->rq_set && req->rq_set->set_countp)
1420                 atomic_add(nob, (atomic_t *)req->rq_set->set_countp);
1421
1422         client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1423         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1424                 aa->aa_cli->cl_w_in_flight--;
1425         else
1426                 aa->aa_cli->cl_r_in_flight--;
1427         for (i = 0; i < aa->aa_page_count; i++)
1428                 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
1429         client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1430
1431         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1432
1433         RETURN(rc);
1434 }
1435
1436 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1437                           struct lov_stripe_md *lsm, obd_count page_count,
1438                           struct brw_page **pga, struct ptlrpc_request_set *set,
1439                           struct obd_capa *ocapa)
1440 {
1441         struct ptlrpc_request     *req;
1442         struct client_obd         *cli = &exp->exp_obd->u.cli;
1443         int                        rc, i;
1444         struct osc_brw_async_args *aa;
1445         ENTRY;
1446
1447         /* Consume write credits even if doing a sync write -
1448          * otherwise we may run out of space on OST due to grant. */
1449         if (cmd == OBD_BRW_WRITE) {
1450                 spin_lock(&cli->cl_loi_list_lock);
1451                 for (i = 0; i < page_count; i++) {
1452                         if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
1453                                 osc_consume_write_grant(cli, pga[i]);
1454                 }
1455                 spin_unlock(&cli->cl_loi_list_lock);
1456         }
1457
1458         rc = osc_brw_prep_request(cmd, cli, oa, lsm, page_count, pga,
1459                                   &req, ocapa);
1460
1461         aa = (struct osc_brw_async_args *)&req->rq_async_args;
1462         if (cmd == OBD_BRW_READ) {
1463                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1464                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1465                 ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
1466         } else {
1467                  lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1468                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
1469                                  cli->cl_w_in_flight);
1470                 ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
1471         }
1472
1473         if (rc == 0) {
1474                 req->rq_interpret_reply = brw_interpret;
1475                 ptlrpc_set_add_req(set, req);
1476                 client_obd_list_lock(&cli->cl_loi_list_lock);
1477                 if (cmd == OBD_BRW_READ)
1478                         cli->cl_r_in_flight++;
1479                 else
1480                         cli->cl_w_in_flight++;
1481                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1482         } else if (cmd == OBD_BRW_WRITE) {
1483                 client_obd_list_lock(&cli->cl_loi_list_lock);
1484                 for (i = 0; i < page_count; i++)
1485                         osc_release_write_grant(cli, pga[i], 0);
1486                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1487         }
1488         RETURN (rc);
1489 }
1490
1491 /*
1492  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1493  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1494  * fine for our small page arrays and doesn't require allocation.  its an
1495  * insertion sort that swaps elements that are strides apart, shrinking the
1496  * stride down until its '1' and the array is sorted.
1497  */
1498 static void sort_brw_pages(struct brw_page **array, int num)
1499 {
1500         int stride, i, j;
1501         struct brw_page *tmp;
1502
1503         if (num == 1)
1504                 return;
1505         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1506                 ;
1507
1508         do {
1509                 stride /= 3;
1510                 for (i = stride ; i < num ; i++) {
1511                         tmp = array[i];
1512                         j = i;
1513                         while (j >= stride && array[j - stride]->off > tmp->off) {
1514                                 array[j] = array[j - stride];
1515                                 j -= stride;
1516                         }
1517                         array[j] = tmp;
1518                 }
1519         } while (stride > 1);
1520 }
1521
1522 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1523 {
1524         int count = 1;
1525         int offset;
1526         int i = 0;
1527
1528         LASSERT (pages > 0);
1529         offset = pg[i]->off & ~CFS_PAGE_MASK;
1530
1531         for (;;) {
1532                 pages--;
1533                 if (pages == 0)         /* that's all */
1534                         return count;
1535
1536                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1537                         return count;   /* doesn't end on page boundary */
1538
1539                 i++;
1540                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1541                 if (offset != 0)        /* doesn't start on page boundary */
1542                         return count;
1543
1544                 count++;
1545         }
1546 }
1547
1548 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1549 {
1550         struct brw_page **ppga;
1551         int i;
1552
1553         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1554         if (ppga == NULL)
1555                 return NULL;
1556
1557         for (i = 0; i < count; i++)
1558                 ppga[i] = pga + i;
1559         return ppga;
1560 }
1561
/*
 * Free a pointer array built by osc_build_ppga().  @count must be the
 * element count originally passed to osc_build_ppga(), not a partially
 * consumed remainder, since OBD_FREE needs the original allocation size.
 */
static void osc_release_ppga(struct brw_page **ppga, obd_count count)
{
        LASSERT(ppga != NULL);
        OBD_FREE(ppga, sizeof(*ppga) * count);
}
1567
1568 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1569                    obd_count page_count, struct brw_page *pga,
1570                    struct obd_trans_info *oti)
1571 {
1572         struct obdo *saved_oa = NULL;
1573         struct brw_page **ppga, **orig;
1574         struct obd_import *imp = class_exp2cliimp(exp);
1575         struct client_obd *cli = &imp->imp_obd->u.cli;
1576         int rc, page_count_orig;
1577         ENTRY;
1578
1579         if (cmd & OBD_BRW_CHECK) {
1580                 /* The caller just wants to know if there's a chance that this
1581                  * I/O can succeed */
1582
1583                 if (imp == NULL || imp->imp_invalid)
1584                         RETURN(-EIO);
1585                 RETURN(0);
1586         }
1587
1588         /* test_brw with a failed create can trip this, maybe others. */
1589         LASSERT(cli->cl_max_pages_per_rpc);
1590
1591         rc = 0;
1592
1593         orig = ppga = osc_build_ppga(pga, page_count);
1594         if (ppga == NULL)
1595                 RETURN(-ENOMEM);
1596         page_count_orig = page_count;
1597
1598         sort_brw_pages(ppga, page_count);
1599         while (page_count) {
1600                 obd_count pages_per_brw;
1601
1602                 if (page_count > cli->cl_max_pages_per_rpc)
1603                         pages_per_brw = cli->cl_max_pages_per_rpc;
1604                 else
1605                         pages_per_brw = page_count;
1606
1607                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1608
1609                 if (saved_oa != NULL) {
1610                         /* restore previously saved oa */
1611                         *oinfo->oi_oa = *saved_oa;
1612                 } else if (page_count > pages_per_brw) {
1613                         /* save a copy of oa (brw will clobber it) */
1614                         OBDO_ALLOC(saved_oa);
1615                         if (saved_oa == NULL)
1616                                 GOTO(out, rc = -ENOMEM);
1617                         *saved_oa = *oinfo->oi_oa;
1618                 }
1619
1620                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1621                                       pages_per_brw, ppga, oinfo->oi_capa);
1622
1623                 if (rc != 0)
1624                         break;
1625
1626                 page_count -= pages_per_brw;
1627                 ppga += pages_per_brw;
1628         }
1629
1630 out:
1631         osc_release_ppga(orig, page_count_orig);
1632
1633         if (saved_oa != NULL)
1634                 OBDO_FREE(saved_oa);
1635
1636         RETURN(rc);
1637 }
1638
/*
 * Asynchronous counterpart of osc_brw(): splits @pga into per-RPC chunks and
 * queues each one on @set via async_internal().
 *
 * Ownership is subtle: if a single RPC covers the whole request, the original
 * ppga array itself is handed to async_internal(), which then owns it (orig
 * is NULLed so we don't double-free); otherwise each chunk gets a freshly
 * allocated copy and the original array is released here at out:.
 *
 * Returns 0 on success (all chunks queued) or a negative errno.
 * @oti is currently unused here.
 */
static int osc_brw_async(int cmd, struct obd_export *exp,
                         struct obd_info *oinfo, obd_count page_count,
                         struct brw_page *pga, struct obd_trans_info *oti,
                         struct ptlrpc_request_set *set)
{
        struct brw_page **ppga, **orig;
        struct client_obd *cli = &exp->exp_obd->u.cli;
        int page_count_orig;
        int rc = 0;
        ENTRY;

        if (cmd & OBD_BRW_CHECK) {
                struct obd_import *imp = class_exp2cliimp(exp);
                /* The caller just wants to know if there's a chance that this
                 * I/O can succeed */

                if (imp == NULL || imp->imp_invalid)
                        RETURN(-EIO);
                RETURN(0);
        }

        orig = ppga = osc_build_ppga(pga, page_count);
        if (ppga == NULL)
                RETURN(-ENOMEM);
        page_count_orig = page_count;

        sort_brw_pages(ppga, page_count);
        while (page_count) {
                struct brw_page **copy;
                obd_count pages_per_brw;

                pages_per_brw = min_t(obd_count, page_count,
                                      cli->cl_max_pages_per_rpc);

                /* trim to an unfragmented run (see max_unfragmented_pages) */
                pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);

                /* use ppga only if single RPC is going to fly */
                if (pages_per_brw != page_count_orig || ppga != orig) {
                        OBD_ALLOC(copy, sizeof(*copy) * pages_per_brw);
                        if (copy == NULL)
                                GOTO(out, rc = -ENOMEM);
                        memcpy(copy, ppga, sizeof(*copy) * pages_per_brw);
                } else
                        copy = ppga;

                rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
                                    pages_per_brw, copy, set, oinfo->oi_capa);

                if (rc != 0) {
                        /* async_internal() did not take ownership on failure;
                         * free a per-chunk copy, but not ppga (freed via orig
                         * at out:) */
                        if (copy != ppga)
                                OBD_FREE(copy, sizeof(*copy) * pages_per_brw);
                        break;
                }
                if (copy == orig) {
                        /* we passed it to async_internal() which is
                         * now responsible for releasing memory */
                        orig = NULL;
                }

                page_count -= pages_per_brw;
                ppga += pages_per_brw;
        }
out:
        if (orig)
                osc_release_ppga(orig, page_count_orig);
        RETURN(rc);
}
1706
1707 static void osc_check_rpcs(struct client_obd *cli);
1708
/* The companion to osc_enter_cache(), called when @oap is no longer part of
 * the dirty accounting.  Writeback completes or truncate happens before
 * writing starts.  Must be called with the loi lock held.
 * @sent is forwarded to osc_release_write_grant(); presumably it indicates
 * whether the page actually went over the wire -- TODO confirm against the
 * grant accounting code. */
static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
                           int sent)
{
        osc_release_write_grant(cli, &oap->oap_brw_page, sent);
}
1717
1718
/* This maintains the lists of pending pages to read/write for a given object
 * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
 * to quickly find objects that are ready to send an RPC.
 *
 * Returns 1 when an RPC should be built from @lop's pending pages, 0
 * otherwise.  The checks are ordered: nothing pending short-circuits first,
 * then the forced-drain cases, then the "enough pages batched" heuristic. */
static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
                         int cmd)
{
        int optimal;
        ENTRY;

        if (lop->lop_num_pending == 0)
                RETURN(0);

        /* if we have an invalid import we want to drain the queued pages
         * by forcing them through rpcs that immediately fail and complete
         * the pages.  recovery relies on this to empty the queued pages
         * before canceling the locks and evicting down the llite pages */
        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(1);

        /* stream rpcs in queue order as long as as there is an urgent page
         * queued.  this is our cheap solution for good batching in the case
         * where writepage marks some random page in the middle of the file
         * as urgent because of, say, memory pressure */
        if (!list_empty(&lop->lop_urgent)) {
                CDEBUG(D_CACHE, "urgent request forcing RPC\n");
                RETURN(1);
        }
        /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
        optimal = cli->cl_max_pages_per_rpc;
        if (cmd & OBD_BRW_WRITE) {
                /* trigger a write rpc stream as long as there are dirtiers
                 * waiting for space.  as they're waiting, they're not going to
                 * create more pages to coallesce with what's waiting.. */
                if (!list_empty(&cli->cl_cache_waiters)) {
                        CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
                        RETURN(1);
                }
                /* +16 to avoid triggering rpcs that would want to include pages
                 * that are being queued but which can't be made ready until
                 * the queuer finishes with the page. this is a wart for
                 * llite::commit_write() */
                optimal += 16;
        }
        if (lop->lop_num_pending >= optimal)
                RETURN(1);

        RETURN(0);
}
1767
/*
 * Reconcile @item's membership on @list with the boolean @should_be_on:
 * add it (at the tail) if it should be listed but isn't, remove it if it
 * is listed but shouldn't be, and do nothing when the state already agrees.
 */
static void on_list(struct list_head *item, struct list_head *list,
                    int should_be_on)
{
        int on = !list_empty(item);

        if (should_be_on) {
                if (!on)
                        list_add_tail(item, list);
        } else if (on) {
                list_del_init(item);
        }
}
1776
/* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
 * can find pages to build into rpcs quickly
 *
 * Three invariants are kept in sync here:
 *  - loi is on cl_loi_ready_list iff either its read or write lop is ready
 *    to make an RPC per lop_makes_rpc(),
 *  - loi is on cl_loi_write_list iff it has pending write pages,
 *  - loi is on cl_loi_read_list iff it has pending read pages. */
static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
{
        on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
                lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
                lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));

        on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
                loi->loi_write_lop.lop_num_pending);

        on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
                loi->loi_read_lop.lop_num_pending);
}
1791
/* Adjust the pending-page accounting by @delta (may be negative): the
 * per-object count in @lop plus the client-wide read or write counter,
 * chosen by the OBD_BRW_WRITE bit in @cmd.  Callers hold the loi list
 * lock -- TODO confirm; there is no locking here. */
static void lop_update_pending(struct client_obd *cli,
                               struct loi_oap_pages *lop, int cmd, int delta)
{
        lop->lop_num_pending += delta;
        if (cmd & OBD_BRW_WRITE)
                cli->cl_pending_w_pages += delta;
        else
                cli->cl_pending_r_pages += delta;
}
1801
/* this is called when a sync waiter receives an interruption.  Its job is to
 * get the caller woken as soon as possible.  If its page hasn't been put in an
 * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
 * desiring interruption which will forcefully complete the rpc once the rpc
 * has timed out */
static void osc_occ_interrupted(struct oig_callback_context *occ)
{
        struct osc_async_page *oap;
        struct loi_oap_pages *lop;
        struct lov_oinfo *loi;
        ENTRY;

        /* XXX member_of() */
        oap = list_entry(occ, struct osc_async_page, oap_occ);

        /* loi list lock covers oap_interrupted, oap_request and the
         * pending/urgent list manipulation below */
        client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);

        oap->oap_interrupted = 1;

        /* ok, it's been put in an rpc. only one oap gets a request reference */
        if (oap->oap_request != NULL) {
                ptlrpc_mark_interrupted(oap->oap_request);
                ptlrpcd_wake(oap->oap_request);
                GOTO(unlock, 0);
        }

        /* we don't get interruption callbacks until osc_trigger_group_io()
         * has been called and put the sync oaps in the pending/urgent lists.*/
        if (!list_empty(&oap->oap_pending_item)) {
                list_del_init(&oap->oap_pending_item);
                list_del_init(&oap->oap_urgent_item);

                loi = oap->oap_loi;
                lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
                        &loi->loi_write_lop : &loi->loi_read_lop;
                lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
                loi_list_maint(oap->oap_cli, oap->oap_loi);

                /* complete the group-io slot with -EINTR so the waiter wakes */
                oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
                oap->oap_oig = NULL;
        }

unlock:
        client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
}
1847
1848 /* this is trying to propogate async writeback errors back up to the
1849  * application.  As an async write fails we record the error code for later if
1850  * the app does an fsync.  As long as errors persist we force future rpcs to be
1851  * sync so that the app can get a sync error and break the cycle of queueing
1852  * pages for which writeback will fail. */
1853 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
1854                            int rc)
1855 {
1856         if (rc) {
1857                 if (!ar->ar_rc)
1858                         ar->ar_rc = rc;
1859
1860                 ar->ar_force_sync = 1;
1861                 ar->ar_min_xid = ptlrpc_sample_next_xid();
1862                 return;
1863
1864         }
1865
1866         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
1867                 ar->ar_force_sync = 0;
1868 }
1869
1870 static void osc_oap_to_pending(struct osc_async_page *oap)
1871 {
1872         struct loi_oap_pages *lop;
1873
1874         if (oap->oap_cmd & OBD_BRW_WRITE)
1875                 lop = &oap->oap_loi->loi_write_lop;
1876         else
1877                 lop = &oap->oap_loi->loi_read_lop;
1878
1879         if (oap->oap_async_flags & ASYNC_URGENT)
1880                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1881         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1882         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
1883 }
1884
/* this must be called holding the loi list lock to give coverage to exit_cache,
 * async_flag maintenance, and oap_request
 *
 * Completes one async page: drops the request reference, updates the
 * async-error state for writes, copies fresh attributes from @oa into the
 * loi lvb on success, and either finishes the group-io slot (sync path) or
 * invokes the caller's ap_completion callback (async path). */
static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
                              struct osc_async_page *oap, int sent, int rc)
{
        __u64 xid = 0;

        ENTRY;
        if (oap->oap_request != NULL) {
                /* sample the xid before dropping the ref; osc_process_ar
                 * compares it against ar_min_xid below */
                xid = ptlrpc_req_xid(oap->oap_request);
                ptlrpc_req_finished(oap->oap_request);
                oap->oap_request = NULL;
        }

        oap->oap_async_flags = 0;
        oap->oap_interrupted = 0;

        if (oap->oap_cmd & OBD_BRW_WRITE) {
                /* record/clear forced-sync state both client-wide and
                 * per-object */
                osc_process_ar(&cli->cl_ar, xid, rc);
                osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
        }

        if (rc == 0 && oa != NULL) {
                if (oa->o_valid & OBD_MD_FLBLOCKS)
                        oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
                if (oa->o_valid & OBD_MD_FLMTIME)
                        oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
                if (oa->o_valid & OBD_MD_FLATIME)
                        oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
                if (oa->o_valid & OBD_MD_FLCTIME)
                        oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
        }

        if (oap->oap_oig) {
                /* sync group-io page: release its grant and wake the waiter */
                osc_exit_cache(cli, oap, sent);
                oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
                oap->oap_oig = NULL;
                EXIT;
                return;
        }

        rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
                                                oap->oap_cmd, oa, rc);

        /* ll_ap_completion (from llite) drops PG_locked. so, a new
         * I/O on the page could start, but OSC calls it under lock
         * and thus we can add oap back to pending safely */
        if (rc)
                /* upper layer wants to leave the page on pending queue */
                osc_oap_to_pending(oap);
        else
                osc_exit_cache(cli, oap, sent);
        EXIT;
}
1939
/* Reply interpreter for the async brw RPCs built by osc_send_oap_rpc().
 * Retries recoverable errors, then (under the loi list lock) decrements the
 * in-flight counter, completes every oap carried by the request, wakes cache
 * waiters and kicks off any newly-possible RPCs.  Finally frees the obdo and
 * page-pointer array owned by the request's async args. */
static int brw_interpret_oap(struct ptlrpc_request *req, void *data, int rc)
{
        struct osc_async_page *oap, *tmp;
        struct osc_brw_async_args *aa = data;
        struct client_obd *cli;
        ENTRY;

        rc = osc_brw_fini_request(req, rc);
        CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
        if (osc_recoverable_error(rc)) {
                /* resend; on success the redo request owns the cleanup and
                 * we must not complete the oaps here */
                rc = osc_brw_redo_request(req, aa);
                if (rc == 0)
                        RETURN(0);
        }

        cli = aa->aa_cli;

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
                cli->cl_w_in_flight--;
        else
                cli->cl_r_in_flight--;

        /* the caller may re-use the oap after the completion call so
         * we need to clean it up a little */
        list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
                list_del_init(&oap->oap_rpc_item);
                osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
        }

        osc_wake_cache_waiters(cli);
        osc_check_rpcs(cli);

        client_obd_list_unlock(&cli->cl_loi_list_lock);

        OBDO_FREE(aa->aa_oa);

        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
        RETURN(rc);
}
1984
/* Build a brw ptlrpc request from the oaps on @rpc_list.  Fills a pga array
 * pointing at each oap's brw_page, lets the caller-supplied ops fill the
 * obdo and capability, sorts the pages and hands everything to
 * osc_brw_prep_request().  On success the oaps are spliced onto the request's
 * async args (aa_oaps) and @rpc_list is left empty; the pga and oa are then
 * owned by the request path and freed in brw_interpret_oap().
 *
 * Returns the request or an ERR_PTR(); on error the oaps stay on @rpc_list
 * and the locally allocated oa/pga are freed at out:. */
static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
                                            struct list_head *rpc_list,
                                            int page_count, int cmd)
{
        struct ptlrpc_request *req;
        struct brw_page **pga = NULL;
        struct osc_brw_async_args *aa;
        struct obdo *oa = NULL;
        struct obd_async_page_ops *ops = NULL;
        void *caller_data = NULL;
        struct obd_capa *ocapa;
        struct osc_async_page *oap;
        int i, rc;

        ENTRY;
        LASSERT(!list_empty(rpc_list));

        OBD_ALLOC(pga, sizeof(*pga) * page_count);
        if (pga == NULL)
                RETURN(ERR_PTR(-ENOMEM));

        OBDO_ALLOC(oa);
        if (oa == NULL)
                GOTO(out, req = ERR_PTR(-ENOMEM));

        i = 0;
        list_for_each_entry(oap, rpc_list, oap_rpc_item) {
                /* all oaps in one rpc share ops/caller_data; take the first */
                if (ops == NULL) {
                        ops = oap->oap_caller_ops;
                        caller_data = oap->oap_caller_data;
                }
                pga[i] = &oap->oap_brw_page;
                pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
                CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
                       pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
                i++;
        }

        /* always get the data for the obdo for the rpc */
        LASSERT(ops != NULL);
        ops->ap_fill_obdo(caller_data, cmd, oa);
        ocapa = ops->ap_lookup_capa(caller_data, cmd);

        sort_brw_pages(pga, page_count);
        rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
                                  pga, &req, ocapa);
        capa_put(ocapa);
        if (rc != 0) {
                CERROR("prep_req failed: %d\n", rc);
                GOTO(out, req = ERR_PTR(rc));
        }

        /* Need to update the timestamps after the request is built in case
         * we race with setattr (locally or in queue at OST).  If OST gets
         * later setattr before earlier BRW (as determined by the request xid),
         * the OST will not use BRW timestamps.  Sadly, there is no obvious
         * way to do this in a single call.  bug 10150 */
        ops->ap_update_obdo(caller_data, cmd, oa,
                            OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_brw_async_args *)&req->rq_async_args;
        INIT_LIST_HEAD(&aa->aa_oaps);
        list_splice(rpc_list, &aa->aa_oaps);
        INIT_LIST_HEAD(rpc_list);

out:
        /* on the error paths req was set to an ERR_PTR before jumping here */
        if (IS_ERR(req)) {
                if (oa)
                        OBDO_FREE(oa);
                if (pga)
                        OBD_FREE(pga, sizeof(*pga) * page_count);
        }
        RETURN(req);
}
2060
/* the loi lock is held across this function but it's allowed to release
 * and reacquire it during its work
 *
 * Pulls ready pages off @lop's pending list (stopping at fragmentation
 * boundaries, PTLRPC_MAX_BRW_SIZE boundaries or cl_max_pages_per_rpc),
 * builds one brw request from them and queues it on ptlrpcd.
 *
 * Returns 1 if an RPC was sent, 0 if no pages were ready, or a negative
 * errno if building the request failed (the pages are then completed with
 * that error). */
static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
                            int cmd, struct loi_oap_pages *lop)
{
        struct ptlrpc_request *req;
        obd_count page_count = 0;
        struct osc_async_page *oap = NULL, *tmp;
        struct osc_brw_async_args *aa;
        struct obd_async_page_ops *ops;
        CFS_LIST_HEAD(rpc_list);
        unsigned int ending_offset;
        unsigned  starting_offset = 0;
        ENTRY;

        /* first we find the pages we're allowed to work with */
        list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
                                 oap_pending_item) {
                ops = oap->oap_caller_ops;

                LASSERT(oap->oap_magic == OAP_MAGIC);

                /* in llite being 'ready' equates to the page being locked
                 * until completion unlocks it.  commit_write submits a page
                 * as not ready because its unlock will happen unconditionally
                 * as the call returns.  if we race with commit_write giving
                 * us that page we dont' want to create a hole in the page
                 * stream, so we stop and leave the rpc to be fired by
                 * another dirtier or kupdated interval (the not ready page
                 * will still be on the dirty list).  we could call in
                 * at the end of ll_file_write to process the queue again. */
                if (!(oap->oap_async_flags & ASYNC_READY)) {
                        int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
                        if (rc < 0)
                                CDEBUG(D_INODE, "oap %p page %p returned %d "
                                                "instead of ready\n", oap,
                                                oap->oap_page, rc);
                        switch (rc) {
                        case -EAGAIN:
                                /* llite is telling us that the page is still
                                 * in commit_write and that we should try
                                 * and put it in an rpc again later.  we
                                 * break out of the loop so we don't create
                                 * a hole in the sequence of pages in the rpc
                                 * stream.*/
                                oap = NULL;
                                break;
                        case -EINTR:
                                /* the io isn't needed.. tell the checks
                                 * below to complete the rpc with EINTR */
                                oap->oap_async_flags |= ASYNC_COUNT_STABLE;
                                oap->oap_count = -EINTR;
                                break;
                        case 0:
                                oap->oap_async_flags |= ASYNC_READY;
                                break;
                        default:
                                LASSERTF(0, "oap %p page %p returned %d "
                                            "from make_ready\n", oap,
                                            oap->oap_page, rc);
                                break;
                        }
                }
                if (oap == NULL)
                        break;
                /*
                 * Page submitted for IO has to be locked. Either by
                 * ->ap_make_ready() or by higher layers.
                 *
                 * XXX nikita: this assertion should be adjusted when lustre
                 * starts using PG_writeback for pages being written out.
                 */
#if defined(__KERNEL__) && defined(__LINUX__)
                LASSERT(PageLocked(oap->oap_page));
#endif
                /* If there is a gap at the start of this page, it can't merge
                 * with any previous page, so we'll hand the network a
                 * "fragmented" page array that it can't transfer in 1 RDMA */
                if (page_count != 0 && oap->oap_page_off != 0)
                        break;

                /* take the page out of our book-keeping */
                list_del_init(&oap->oap_pending_item);
                lop_update_pending(cli, lop, cmd, -1);
                list_del_init(&oap->oap_urgent_item);

                if (page_count == 0)
                        starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
                                          (PTLRPC_MAX_BRW_SIZE - 1);

                /* ask the caller for the size of the io as the rpc leaves. */
                if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
                        oap->oap_count =
                                ops->ap_refresh_count(oap->oap_caller_data,cmd);
                if (oap->oap_count <= 0) {
                        /* nothing (left) to transfer for this page; complete
                         * it immediately instead of putting it in the rpc */
                        CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
                               oap->oap_count);
                        osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
                        continue;
                }

                /* now put the page back in our accounting */
                list_add_tail(&oap->oap_rpc_item, &rpc_list);
                if (++page_count >= cli->cl_max_pages_per_rpc)
                        break;

                /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
                 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
                 * have the same alignment as the initial writes that allocated
                 * extents on the server. */
                ending_offset = (oap->oap_obj_off + oap->oap_page_off +
                                 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
                if (ending_offset == 0)
                        break;

                /* If there is a gap at the end of this page, it can't merge
                 * with any subsequent pages, so we'll hand the network a
                 * "fragmented" page array that it can't transfer in 1 RDMA */
                if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
                        break;
        }

        osc_wake_cache_waiters(cli);

        if (page_count == 0)
                RETURN(0);

        loi_list_maint(cli, loi);

        /* drop the loi list lock while the request is built; the pages
         * are only on our private rpc_list during this window */
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        req = osc_build_req(cli, &rpc_list, page_count, cmd);
        if (IS_ERR(req)) {
                /* this should happen rarely and is pretty bad, it makes the
                 * pending list not follow the dirty order */
                client_obd_list_lock(&cli->cl_loi_list_lock);
                list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
                        list_del_init(&oap->oap_rpc_item);

                        /* queued sync pages can be torn down while the pages
                         * were between the pending list and the rpc */
                        if (oap->oap_interrupted) {
                                CDEBUG(D_INODE, "oap %p interrupted\n", oap);
                                osc_ap_completion(cli, NULL, oap, 0,
                                                  oap->oap_count);
                                continue;
                        }
                        osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
                }
                loi_list_maint(cli, loi);
                RETURN(PTR_ERR(req));
        }

        aa = (struct osc_brw_async_args *)&req->rq_async_args;

        /* procfs stats: page/rpc/offset histograms for this direction */
        if (cmd == OBD_BRW_READ) {
                lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
                                      (starting_offset >> CFS_PAGE_SHIFT) + 1);
                ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
        } else {
                lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_write_rpc_hist,
                                 cli->cl_w_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
                                      (starting_offset >> CFS_PAGE_SHIFT) + 1);
                ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);

        if (cmd == OBD_BRW_READ)
                cli->cl_r_in_flight++;
        else
                cli->cl_w_in_flight++;

        /* queued sync pages can be torn down while the pages
         * were between the pending list and the rpc */
        tmp = NULL;
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                /* only one oap gets a request reference */
                if (tmp == NULL)
                        tmp = oap;
                if (oap->oap_interrupted && !req->rq_intr) {
                        CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
                               oap, req);
                        ptlrpc_mark_interrupted(req);
                }
        }
        if (tmp != NULL)
                tmp->oap_request = ptlrpc_request_addref(req);

        DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
                  page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);

        req->rq_interpret_reply = brw_interpret_oap;
        ptlrpcd_add_req(req);
        RETURN(1);
}
2261
/* Debug snapshot of a lov_oinfo's rpc-readiness state: ready-list membership
 * plus pending/urgent counts for the write and read lops.
 * Fix: the original macro ended with a stray line-continuation backslash
 * after "args)", silently absorbing the following (blank) line into the
 * macro body; any code later inserted there would vanish into the macro. */
#define LOI_DEBUG(LOI, STR, args...)                                     \
        CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
               !list_empty(&(LOI)->loi_cli_item),                        \
               (LOI)->loi_write_lop.lop_num_pending,                     \
               !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
               (LOI)->loi_read_lop.lop_num_pending,                      \
               !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
               args)

2271 /* This is called by osc_check_rpcs() to find which objects have pages that
2272  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2273 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2274 {
2275         ENTRY;
2276         /* first return all objects which we already know to have
2277          * pages ready to be stuffed into rpcs */
2278         if (!list_empty(&cli->cl_loi_ready_list))
2279                 RETURN(list_entry(cli->cl_loi_ready_list.next,
2280                                   struct lov_oinfo, loi_cli_item));
2281
2282         /* then if we have cache waiters, return all objects with queued
2283          * writes.  This is especially important when many small files
2284          * have filled up the cache and not been fired into rpcs because
2285          * they don't pass the nr_pending/object threshhold */
2286         if (!list_empty(&cli->cl_cache_waiters) &&
2287             !list_empty(&cli->cl_loi_write_list))
2288                 RETURN(list_entry(cli->cl_loi_write_list.next,
2289                                   struct lov_oinfo, loi_write_item));
2290
2291         /* then return all queued objects when we have an invalid import
2292          * so that they get flushed */
2293         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2294                 if (!list_empty(&cli->cl_loi_write_list))
2295                         RETURN(list_entry(cli->cl_loi_write_list.next,
2296                                           struct lov_oinfo, loi_write_item));
2297                 if (!list_empty(&cli->cl_loi_read_list))
2298                         RETURN(list_entry(cli->cl_loi_read_list.next,
2299                                           struct lov_oinfo, loi_read_item));
2300         }
2301         RETURN(NULL);
2302 }
2303
/* Walk the objects that have queued pages and fire off read/write rpcs
 * until the rpcs-in-flight cap is reached, no object is ready, or we
 * back off after repeated make_ready failures.
 *
 * called with the loi list lock held */
static void osc_check_rpcs(struct client_obd *cli)
{
        struct lov_oinfo *loi;
        int rc = 0, race_counter = 0;
        ENTRY;

        while ((loi = osc_next_loi(cli)) != NULL) {
                LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));

                /* respect the client's concurrent-rpc cap */
                if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
                        break;

                /* attempt some read/write balancing by alternating between
                 * reads and writes in an object.  The makes_rpc checks here
                 * would be redundant if we were getting read/write work items
                 * instead of objects.  we don't want send_oap_rpc to drain a
                 * partial read pending queue when we're given this object to
                 * do io on writes while there are cache waiters */
                if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
                        rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
                                              &loi->loi_write_lop);
                        if (rc < 0)
                                break;
                        if (rc > 0)
                                race_counter = 0;
                        else
                                race_counter++;
                }
                if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
                        rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
                                              &loi->loi_read_lop);
                        if (rc < 0)
                                break;
                        if (rc > 0)
                                race_counter = 0;
                        else
                                race_counter++;
                }

                /* attempt some inter-object balancing by issuing rpcs
                 * for each object in turn */
                if (!list_empty(&loi->loi_cli_item))
                        list_del_init(&loi->loi_cli_item);
                if (!list_empty(&loi->loi_write_item))
                        list_del_init(&loi->loi_write_item);
                if (!list_empty(&loi->loi_read_item))
                        list_del_init(&loi->loi_read_item);

                /* requeue the object on whichever lists it still belongs on */
                loi_list_maint(cli, loi);

                /* send_oap_rpc fails with 0 when make_ready tells it to
                 * back off.  llite's make_ready does this when it tries
                 * to lock a page queued for write that is already locked.
                 * we want to try sending rpcs from many objects, but we
                 * don't want to spin failing with 0.  */
                if (race_counter == 10)
                        break;
        }
        EXIT;
}
2365
2366 /* we're trying to queue a page in the osc so we're subject to the
2367  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2368  * If the osc's queued pages are already at that limit, then we want to sleep
2369  * until there is space in the osc's queue for us.  We also may be waiting for
2370  * write credits from the OST if there are RPCs in flight that may return some
2371  * before we fall back to sync writes.
2372  *
 * We need this to know that our allocation was granted in the presence of
 * signals. */
2374 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2375 {
2376         int rc;
2377         ENTRY;
2378         client_obd_list_lock(&cli->cl_loi_list_lock);
2379         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2380         client_obd_list_unlock(&cli->cl_loi_list_lock);
2381         RETURN(rc);
2382 };
2383
/* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
 * grant or cache space.
 *
 * Returns 0 with a write grant consumed for this page, -EDQUOT when the
 * caller must fall back to sync i/o, or -EINTR / the waiter's result
 * after sleeping for cache space. */
static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
                           struct osc_async_page *oap)
{
        struct osc_cache_waiter ocw;
        struct l_wait_info lwi = { 0 };

        ENTRY;

        CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
               "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
               cli->cl_dirty_max, obd_max_dirty_pages,
               cli->cl_lost_grant, cli->cl_avail_grant);

        /* force the caller to try sync io.  this can jump the list
         * of queued writes and create a discontiguous rpc stream */
        if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
            loi->loi_ar.ar_force_sync)
                RETURN(-EDQUOT);

        /* Hopefully normal case - cache space and write credits available */
        if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
            (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
            (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
                /* account for ourselves */
                osc_consume_write_grant(cli, &oap->oap_brw_page);
                RETURN(0);
        }

        /* Make sure that there are write rpcs in flight to wait for.  This
         * is a little silly as this object may not have any pending but
         * other objects sure might. */
        if (cli->cl_w_in_flight) {
                list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
                cfs_waitq_init(&ocw.ocw_waitq);
                ocw.ocw_oap = oap;
                ocw.ocw_rc = 0;

                /* kick rpc generation (completions may return grant),
                 * then drop the lock while we sleep */
                loi_list_maint(cli, loi);
                osc_check_rpcs(cli);
                client_obd_list_unlock(&cli->cl_loi_list_lock);

                CDEBUG(D_CACHE, "sleeping for cache space\n");
                l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);

                client_obd_list_lock(&cli->cl_loi_list_lock);
                /* still linked on the waiter list means we were woken
                 * without being granted anything — treat as interrupted */
                if (!list_empty(&ocw.ocw_entry)) {
                        list_del(&ocw.ocw_entry);
                        RETURN(-EINTR);
                }
                RETURN(ocw.ocw_rc);
        }

        RETURN(-EDQUOT);
}
2440
2441 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2442                         struct lov_oinfo *loi, cfs_page_t *page,
2443                         obd_off offset, struct obd_async_page_ops *ops,
2444                         void *data, void **res)
2445 {
2446         struct osc_async_page *oap;
2447         ENTRY;
2448
2449         if (!page)
2450                 return size_round(sizeof(*oap));
2451
2452         oap = *res;
2453         oap->oap_magic = OAP_MAGIC;
2454         oap->oap_cli = &exp->exp_obd->u.cli;
2455         oap->oap_loi = loi;
2456
2457         oap->oap_caller_ops = ops;
2458         oap->oap_caller_data = data;
2459
2460         oap->oap_page = page;
2461         oap->oap_obj_off = offset;
2462
2463         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2464         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2465         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2466
2467         oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2468
2469         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2470         RETURN(0);
2471 }
2472
2473 struct osc_async_page *oap_from_cookie(void *cookie)
2474 {
2475         struct osc_async_page *oap = cookie;
2476         if (oap->oap_magic != OAP_MAGIC)
2477                 return ERR_PTR(-EINVAL);
2478         return oap;
2479 };
2480
/* Queue one page (identified by the @cookie from osc_prep_async_page())
 * for async i/o under @cmd.  On success the page lands on the object's
 * pending list and rpc generation is attempted.  Returns 0, -EIO for a
 * missing/invalid import, -EBUSY if the page is already queued, or an
 * error from the quota or cache-entry checks. */
static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
                              struct lov_oinfo *loi, void *cookie,
                              int cmd, obd_off off, int count,
                              obd_flag brw_flags, enum async_flags async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        /* a page still linked on any list belongs to an earlier i/o */
        if (!list_empty(&oap->oap_pending_item) ||
            !list_empty(&oap->oap_urgent_item) ||
            !list_empty(&oap->oap_rpc_item))
                RETURN(-EBUSY);

        /* check if the file's owner/group is over quota */
#ifdef HAVE_QUOTA_SUPPORT
        if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
                struct obd_async_page_ops *ops;
                struct obdo *oa;

                OBDO_ALLOC(oa);
                if (oa == NULL)
                        RETURN(-ENOMEM);

                /* have the caller fill in uid/gid so quota can be checked */
                ops = oap->oap_caller_ops;
                ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
                if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
                    NO_QUOTA)
                        rc = -EDQUOT;

                OBDO_FREE(oa);
                if (rc)
                        RETURN(rc);
        }
#endif

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        client_obd_list_lock(&cli->cl_loi_list_lock);

        oap->oap_cmd = cmd;
        oap->oap_page_off = off;
        oap->oap_count = count;
        oap->oap_brw_flags = brw_flags;
        oap->oap_async_flags = async_flags;

        if (cmd & OBD_BRW_WRITE) {
                /* writes consume cache space/grant; may drop the lock
                 * and sleep waiting for it */
                rc = osc_enter_cache(cli, loi, oap);
                if (rc) {
                        client_obd_list_unlock(&cli->cl_loi_list_lock);
                        RETURN(rc);
                }
        }

        osc_oap_to_pending(oap);
        loi_list_maint(cli, loi);

        LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
                  cmd);

        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        RETURN(0);
}
2555
/* aka (~was & now & flag), but this is more clear :)
 * Arguments are parenthesized so compound expressions such as a
 * multi-bit flag mask (f1 | f2) parse correctly: '&' binds tighter
 * than '|', so the unparenthesized form misgrouped them. */
#define SETTING(was, now, flag) (!((was) & (flag)) && ((now) & (flag)))
2558
/* Turn on async flags (ASYNC_READY / ASYNC_URGENT) for a page already on
 * a pending list.  Newly-urgent pages that are not yet part of an rpc are
 * moved onto the urgent queue, and rpc generation is retried on the way
 * out (even on error, via the 'out' label).  Returns 0, -EINVAL if the
 * page is not queued, or -EIO for an invalid import. */
static int osc_set_async_flags(struct obd_export *exp,
                               struct lov_stripe_md *lsm,
                               struct lov_oinfo *loi, void *cookie,
                               obd_flag async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct loi_oap_pages *lop;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        /*
         * bug 7311: OST-side locking is only supported for liblustre for now
         * (and liblustre never calls obd_set_async_flags(). I hope.), generic
         * implementation has to handle case where OST-locked page was picked
         * up by, e.g., ->writepage().
         */
        LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
        LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
                                     * tread here. */

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        /* pick the queue set matching the page's original command */
        if (oap->oap_cmd & OBD_BRW_WRITE) {
                lop = &loi->loi_write_lop;
        } else {
                lop = &loi->loi_read_lop;
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* flags only make sense for a page that is actually queued */
        if (list_empty(&oap->oap_pending_item))
                GOTO(out, rc = -EINVAL);

        /* nothing to do if every requested flag is already set */
        if ((oap->oap_async_flags & async_flags) == async_flags)
                GOTO(out, rc = 0);

        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
                oap->oap_async_flags |= ASYNC_READY;

        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
                /* pages already placed in an rpc are left alone */
                if (list_empty(&oap->oap_rpc_item)) {
                        list_add(&oap->oap_urgent_item, &lop->lop_urgent);
                        loi_list_maint(cli, loi);
                }
        }

        LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
                        oap->oap_async_flags);
out:
        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        RETURN(rc);
}
2621
2622 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2623                              struct lov_oinfo *loi,
2624                              struct obd_io_group *oig, void *cookie,
2625                              int cmd, obd_off off, int count,
2626                              obd_flag brw_flags,
2627                              obd_flag async_flags)
2628 {
2629         struct client_obd *cli = &exp->exp_obd->u.cli;
2630         struct osc_async_page *oap;
2631         struct loi_oap_pages *lop;
2632         int rc = 0;
2633         ENTRY;
2634
2635         oap = oap_from_cookie(cookie);
2636         if (IS_ERR(oap))
2637                 RETURN(PTR_ERR(oap));
2638
2639         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2640                 RETURN(-EIO);
2641
2642         if (!list_empty(&oap->oap_pending_item) ||
2643             !list_empty(&oap->oap_urgent_item) ||
2644             !list_empty(&oap->oap_rpc_item))
2645                 RETURN(-EBUSY);
2646
2647         if (loi == NULL)
2648                 loi = lsm->lsm_oinfo[0];
2649
2650         client_obd_list_lock(&cli->cl_loi_list_lock);
2651
2652         oap->oap_cmd = cmd;
2653         oap->oap_page_off = off;
2654         oap->oap_count = count;
2655         oap->oap_brw_flags = brw_flags;
2656         oap->oap_async_flags = async_flags;
2657
2658         if (cmd & OBD_BRW_WRITE)
2659                 lop = &loi->loi_write_lop;
2660         else
2661                 lop = &loi->loi_read_lop;
2662
2663         list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
2664         if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
2665                 oap->oap_oig = oig;
2666                 rc = oig_add_one(oig, &oap->oap_occ);
2667         }
2668
2669         LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
2670                   oap, oap->oap_page, rc);
2671
2672         client_obd_list_unlock(&cli->cl_loi_list_lock);
2673
2674         RETURN(rc);
2675 }
2676
2677 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2678                                  struct loi_oap_pages *lop, int cmd)
2679 {
2680         struct list_head *pos, *tmp;
2681         struct osc_async_page *oap;
2682
2683         list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2684                 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2685                 list_del(&oap->oap_pending_item);
2686                 osc_oap_to_pending(oap);
2687         }
2688         loi_list_maint(cli, loi);
2689 }
2690
/* Start the i/o for pages previously queued by osc_queue_group_io():
 * drain both the write and read group-pending lists onto the normal
 * pending lists, then attempt rpc generation.  Always returns 0. */
static int osc_trigger_group_io(struct obd_export *exp,
                                struct lov_stripe_md *lsm,
                                struct lov_oinfo *loi,
                                struct obd_io_group *oig)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        ENTRY;

        /* default to the first stripe's object */
        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        client_obd_list_lock(&cli->cl_loi_list_lock);

        osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
        osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);

        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        RETURN(0);
}
2712
/* Remove a queued page (by @cookie) from the osc before it is sent:
 * undo its cache accounting, wake cache waiters, and unlink it from the
 * urgent and pending lists.  Fails with -EBUSY once the page has been
 * placed in an rpc (it then belongs to the request until completion). */
static int osc_teardown_async_page(struct obd_export *exp,
                                   struct lov_stripe_md *lsm,
                                   struct lov_oinfo *loi, void *cookie)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct loi_oap_pages *lop;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        /* pick the queue set the page was accounted against */
        if (oap->oap_cmd & OBD_BRW_WRITE) {
                lop = &loi->loi_write_lop;
        } else {
                lop = &loi->loi_read_lop;
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);

        if (!list_empty(&oap->oap_rpc_item))
                GOTO(out, rc = -EBUSY);

        /* release this page's cache accounting and let waiters retry */
        osc_exit_cache(cli, oap, 0);
        osc_wake_cache_waiters(cli);

        if (!list_empty(&oap->oap_urgent_item)) {
                list_del_init(&oap->oap_urgent_item);
                oap->oap_async_flags &= ~ASYNC_URGENT;
        }
        if (!list_empty(&oap->oap_pending_item)) {
                list_del_init(&oap->oap_pending_item);
                /* keep the per-lop pending count in sync */
                lop_update_pending(cli, lop, oap->oap_cmd, -1);
        }
        loi_list_maint(cli, loi);

        LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
out:
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        RETURN(rc);
}
2759
/* Attach @data (an inode pointer on Linux) as the l_ast_data of the lock
 * named by @lockh, asserting that any different inode already attached is
 * on its way out (I_FREEING).  Also copies the LDLM_FL_NO_LRU bit from
 * @flags onto the lock.  Silently logs and returns if the handle no
 * longer resolves to a lock (e.g. after eviction). */
static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
                                    int flags)
{
        struct ldlm_lock *lock = ldlm_handle2lock(lockh);

        if (lock == NULL) {
                CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
                return;
        }
        lock_res_and_lock(lock);
#ifdef __KERNEL__
#ifdef __LINUX__
        /* Liang XXX: Darwin and Winnt checking should be added */
        if (lock->l_ast_data && lock->l_ast_data != data) {
                struct inode *new_inode = data;
                struct inode *old_inode = lock->l_ast_data;
                /* a stale inode is tolerated only while it is being freed */
                if (!(old_inode->i_state & I_FREEING))
                        LDLM_ERROR(lock, "inconsistent l_ast_data found");
                LASSERTF(old_inode->i_state & I_FREEING,
                         "Found existing inode %p/%lu/%u state %lu in lock: "
                         "setting data to %p/%lu/%u\n", old_inode,
                         old_inode->i_ino, old_inode->i_generation,
                         old_inode->i_state,
                         new_inode, new_inode->i_ino, new_inode->i_generation);
        }
#endif
#endif
        lock->l_ast_data = data;
        lock->l_flags |= (flags & LDLM_FL_NO_LRU);
        unlock_res_and_lock(lock);
        LDLM_LOCK_PUT(lock);
}
2792
2793 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2794                              ldlm_iterator_t replace, void *data)
2795 {
2796         struct ldlm_res_id res_id = { .name = {0} };
2797         struct obd_device *obd = class_exp2obd(exp);
2798
2799         res_id.name[0] = lsm->lsm_object_id;
2800         res_id.name[2] = lsm->lsm_object_gr;
2801
2802         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2803         return 0;
2804 }
2805
/* Common completion path for osc_enqueue(): for aborted intent enqueues,
 * recover the real status from the ldlm reply; log the lvb attributes on
 * success or abort; finally run the caller's update callback and return
 * its result. */
static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo,
                            int intent, int rc)
{
        ENTRY;

        if (intent) {
                /* The request was created before ldlm_cli_enqueue call. */
                if (rc == ELDLM_LOCK_ABORTED) {
                        struct ldlm_reply *rep;

                        /* swabbed by ldlm_cli_enqueue() */
                        LASSERT_REPSWABBED(req, DLM_LOCKREPLY_OFF);
                        rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
                                             sizeof(*rep));
                        LASSERT(rep != NULL);
                        /* the intent handler's status overrides the abort */
                        if (rep->lock_policy_res1)
                                rc = rep->lock_policy_res1;
                }
        }

        if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
                CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
                       oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
                       oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
                       oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
        }

        /* Call the update callback. */
        rc = oinfo->oi_cb_up(oinfo, rc);
        RETURN(rc);
}
2837
/* Reply-interpret callback for asynchronous osc_enqueue() requests:
 * finish the ldlm enqueue (unpacking the lvb), run the osc completion
 * via osc_enqueue_fini(), drop the lock reference the async path holds,
 * and release our own handle2lock reference. */
static int osc_enqueue_interpret(struct ptlrpc_request *req,
                                 struct osc_enqueue_args *aa, int rc)
{
        int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
        struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
        struct ldlm_lock *lock;

        /* ldlm_cli_enqueue is holding a reference on the lock, so it must
         * be valid. */
        lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);

        /* Complete obtaining the lock procedure. */
        rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
                                   aa->oa_ei->ei_mode,
                                   &aa->oa_oi->oi_flags,
                                   &lsm->lsm_oinfo[0]->loi_lvb,
                                   sizeof(lsm->lsm_oinfo[0]->loi_lvb),
                                   lustre_swab_ost_lvb,
                                   aa->oa_oi->oi_lockh, rc);

        /* Complete osc stuff. */
        rc = osc_enqueue_fini(req, aa->oa_oi, intent, rc);

        /* Release the lock for async request. */
        if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
                ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);

        LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
                 aa->oa_oi->oi_lockh, req, aa);
        LDLM_LOCK_PUT(lock);
        return rc;
}
2870
2871 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2872  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2873  * other synchronous requests, however keeping some locks and trying to obtain
2874  * others may take a considerable amount of time in a case of ost failure; and
2875  * when other sync requests do not get released lock from a client, the client
2876  * is excluded from the cluster -- such scenarious make the life difficult, so
2877  * release locks just after they are obtained. */
/* Obtain an extent lock for the region described by oinfo->oi_policy,
 * preferring to match an already-granted local lock (including an
 * existing PW lock when only PR is requested) before sending an enqueue
 * to the server.  With a non-NULL @rqset the enqueue is asynchronous and
 * completed by osc_enqueue_interpret(); otherwise the result is fed to
 * osc_enqueue_fini() before returning. */
static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
                       struct ldlm_enqueue_info *einfo,
                       struct ptlrpc_request_set *rqset)
{
        struct ldlm_res_id res_id = { .name = {0} };
        struct obd_device *obd = exp->exp_obd;
        struct ldlm_reply *rep;
        struct ptlrpc_request *req = NULL;
        int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
        int rc;
        ENTRY;

        res_id.name[0] = oinfo->oi_md->lsm_object_id;
        res_id.name[2] = oinfo->oi_md->lsm_object_gr;

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother.  */
        oinfo->oi_policy.l_extent.start -=
                oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
        oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;

        /* without a valid kms there is nothing cached worth matching */
        if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
                goto no_match;

        /* Next, search for already existing extent locks that will cover us */
        rc = ldlm_lock_match(obd->obd_namespace,
                             oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
                             einfo->ei_type, &oinfo->oi_policy, einfo->ei_mode,
                             oinfo->oi_lockh);
        if (rc == 1) {
                osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
                                        oinfo->oi_flags);
                if (intent) {
                        /* I would like to be able to ASSERT here that rss <=
                         * kms, but I can't, for reasons which are explained in
                         * lov_enqueue() */
                }

                /* We already have a lock, and it's referenced */
                oinfo->oi_cb_up(oinfo, ELDLM_OK);

                /* For async requests, decref the lock. */
                if (rqset)
                        ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);

                RETURN(ELDLM_OK);
        }

        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock.
         *
         * There are problems with conversion deadlocks, so instead of
         * converting a read lock to a write lock, we'll just enqueue a new
         * one.
         *
         * At some point we should cancel the read lock instead of making them
         * send us a blocking callback, but there are problems with canceling
         * locks out from other users right now, too. */

        if (einfo->ei_mode == LCK_PR) {
                rc = ldlm_lock_match(obd->obd_namespace,
                                     oinfo->oi_flags | LDLM_FL_LVB_READY,
                                     &res_id, einfo->ei_type, &oinfo->oi_policy,
                                     LCK_PW, oinfo->oi_lockh);
                if (rc == 1) {
                        /* FIXME: This is not incredibly elegant, but it might
                         * be more elegant than adding another parameter to
                         * lock_match.  I want a second opinion. */
                        /* addref the lock only if not async requests. */
                        if (!rqset)
                                ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
                        osc_set_data_with_check(oinfo->oi_lockh,
                                                einfo->ei_cbdata,
                                                oinfo->oi_flags);
                        oinfo->oi_cb_up(oinfo, ELDLM_OK);
                        /* drop the PW reference taken by lock_match */
                        ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
                        RETURN(ELDLM_OK);
                }
        }

 no_match:
        /* for intent enqueues, build the request with room for the lock
         * reply and lvb in the reply message */
        if (intent) {
                int size[3] = {
                        [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                        [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request),
                        [DLM_LOCKREQ_OFF + 1] = 0 };

                req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
                if (req == NULL)
                        RETURN(-ENOMEM);

                size[DLM_LOCKREPLY_OFF] = sizeof(*rep);
                size[DLM_REPLY_REC_OFF] =
                        sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb);
                ptlrpc_req_set_repsize(req, 3, size);
        }

        /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
        oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;

        rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id,
                              &oinfo->oi_policy, &oinfo->oi_flags,
                              &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
                              sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
                              lustre_swab_ost_lvb, oinfo->oi_lockh,
                              rqset ? 1 : 0);
        if (rqset) {
                if (!rc) {
                        /* async: stash completion state in the request and
                         * let osc_enqueue_interpret() finish the job */
                        struct osc_enqueue_args *aa;
                        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
                        aa = (struct osc_enqueue_args *)&req->rq_async_args;
                        aa->oa_oi = oinfo;
                        aa->oa_ei = einfo;
                        aa->oa_exp = exp;

                        req->rq_interpret_reply = osc_enqueue_interpret;
                        ptlrpc_set_add_req(rqset, req);
                } else if (intent) {
                        ptlrpc_req_finished(req);
                }
                RETURN(rc);
        }

        rc = osc_enqueue_fini(req, oinfo, intent, rc);
        if (intent)
                ptlrpc_req_finished(req);

        RETURN(rc);
}
3008
3009 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
3010                      __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3011                      int *flags, void *data, struct lustre_handle *lockh)
3012 {
3013         struct ldlm_res_id res_id = { .name = {0} };
3014         struct obd_device *obd = exp->exp_obd;
3015         int rc;
3016         int lflags = *flags;
3017         ENTRY;
3018
3019         res_id.name[0] = lsm->lsm_object_id;
3020         res_id.name[2] = lsm->lsm_object_gr;
3021
3022         OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);
3023
3024         /* Filesystem lock extents are extended to page boundaries so that
3025          * dealing with the page cache is a little smoother */
3026         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3027         policy->l_extent.end |= ~CFS_PAGE_MASK;
3028
3029         /* Next, search for already existing extent locks that will cover us */
3030         rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3031                              &res_id, type, policy, mode, lockh);
3032         if (rc) {
3033                 //if (!(*flags & LDLM_FL_TEST_LOCK))
3034                         osc_set_data_with_check(lockh, data, lflags);
3035                 RETURN(rc);
3036         }
3037         /* If we're trying to read, we also search for an existing PW lock.  The
3038          * VFS and page cache already protect us locally, so lots of readers/
3039          * writers can share a single PW lock. */
3040         if (mode == LCK_PR) {
3041                 rc = ldlm_lock_match(obd->obd_namespace,
3042                                      lflags | LDLM_FL_LVB_READY, &res_id,
3043                                      type, policy, LCK_PW, lockh);
3044                 if (rc == 1 && !(lflags & LDLM_FL_TEST_LOCK)) {
3045                         /* FIXME: This is not incredibly elegant, but it might
3046                          * be more elegant than adding another parameter to
3047                          * lock_match.  I want a second opinion. */
3048                         osc_set_data_with_check(lockh, data, lflags);
3049                         ldlm_lock_addref(lockh, LCK_PR);
3050                         ldlm_lock_decref(lockh, LCK_PW);
3051                 }
3052         }
3053         RETURN(rc);
3054 }
3055
3056 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3057                       __u32 mode, struct lustre_handle *lockh)
3058 {
3059         ENTRY;
3060
3061         if (unlikely(mode == LCK_GROUP))
3062                 ldlm_lock_decref_and_cancel(lockh, mode);
3063         else
3064                 ldlm_lock_decref(lockh, mode);
3065
3066         RETURN(0);
3067 }
3068
3069 static int osc_cancel_unused(struct obd_export *exp,
3070                              struct lov_stripe_md *lsm, int flags,
3071                              void *opaque)
3072 {
3073         struct obd_device *obd = class_exp2obd(exp);
3074         struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3075
3076         if (lsm != NULL) {
3077                 res_id.name[0] = lsm->lsm_object_id;
3078                 res_id.name[2] = lsm->lsm_object_gr;
3079                 resp = &res_id;
3080         }
3081
3082         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3083 }
3084
3085 static int osc_join_lru(struct obd_export *exp,
3086                         struct lov_stripe_md *lsm, int join)
3087 {
3088         struct obd_device *obd = class_exp2obd(exp);
3089         struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3090
3091         if (lsm != NULL) {
3092                 res_id.name[0] = lsm->lsm_object_id;
3093                 res_id.name[2] = lsm->lsm_object_gr;
3094                 resp = &res_id;
3095         }
3096
3097         return ldlm_cli_join_lru(obd->obd_namespace, resp, join);
3098 }
3099
3100 static int osc_statfs_interpret(struct ptlrpc_request *req,
3101                                 struct osc_async_args *aa, int rc)
3102 {
3103         struct obd_statfs *msfs;
3104         ENTRY;
3105
3106         if (rc != 0)
3107                 GOTO(out, rc);
3108
3109         msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3110                                   lustre_swab_obd_statfs);
3111         if (msfs == NULL) {
3112                 CERROR("Can't unpack obd_statfs\n");
3113                 GOTO(out, rc = -EPROTO);
3114         }
3115
3116         memcpy(aa->aa_oi->oi_osfs, msfs, sizeof(*msfs));
3117 out:
3118         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3119         RETURN(rc);
3120 }
3121
3122 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3123                             __u64 max_age, struct ptlrpc_request_set *rqset)
3124 {
3125         struct ptlrpc_request *req;
3126         struct osc_async_args *aa;
3127         int size[2] = { sizeof(struct ptlrpc_body), sizeof(*oinfo->oi_osfs) };
3128         ENTRY;
3129
3130         /* We could possibly pass max_age in the request (as an absolute
3131          * timestamp or a "seconds.usec ago") so the target can avoid doing
3132          * extra calls into the filesystem if that isn't necessary (e.g.
3133          * during mount that would help a bit).  Having relative timestamps
3134          * is not so great if request processing is slow, while absolute
3135          * timestamps are not ideal because they need time synchronization. */
3136         req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
3137                               OST_STATFS, 1, NULL, NULL);
3138         if (!req)
3139                 RETURN(-ENOMEM);
3140
3141         ptlrpc_req_set_repsize(req, 2, size);
3142         req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3143
3144         req->rq_interpret_reply = osc_statfs_interpret;
3145         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3146         aa = (struct osc_async_args *)&req->rq_async_args;
3147         aa->aa_oi = oinfo;
3148
3149         ptlrpc_set_add_req(rqset, req);
3150         RETURN(0);
3151 }
3152
3153 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3154                       __u64 max_age)
3155 {
3156         struct obd_statfs *msfs;
3157         struct ptlrpc_request *req;
3158         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
3159         ENTRY;
3160
3161         /* We could possibly pass max_age in the request (as an absolute
3162          * timestamp or a "seconds.usec ago") so the target can avoid doing
3163          * extra calls into the filesystem if that isn't necessary (e.g.
3164          * during mount that would help a bit).  Having relative timestamps
3165          * is not so great if request processing is slow, while absolute
3166          * timestamps are not ideal because they need time synchronization. */
3167         req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
3168                               OST_STATFS, 1, NULL, NULL);
3169         if (!req)
3170                 RETURN(-ENOMEM);
3171
3172         ptlrpc_req_set_repsize(req, 2, size);
3173         req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3174
3175         rc = ptlrpc_queue_wait(req);
3176         if (rc)
3177                 GOTO(out, rc);
3178
3179         msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3180                                   lustre_swab_obd_statfs);
3181         if (msfs == NULL) {
3182                 CERROR("Can't unpack obd_statfs\n");
3183                 GOTO(out, rc = -EPROTO);
3184         }
3185
3186         memcpy(osfs, msfs, sizeof(*osfs));
3187
3188         EXIT;
3189  out:
3190         ptlrpc_req_finished(req);
3191         return rc;
3192 }
3193
/* Retrieve object striping information.
 *
 * @lump is a user-space pointer to an in-core struct with lmm_ost_count
 * indicating the maximum number of OST indices which will fit in the user
 * buffer.  lmm_magic must be LOV_USER_MAGIC.
 *
 * An OSC object has exactly one stripe, so at most one objects[] entry is
 * ever returned; lmm_stripe_count is reported as 1 either way.
 */
static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
{
        struct lov_user_md lum, *lumk;
        int rc = 0, lum_size;
        ENTRY;

        /* No striping metadata: nothing to report. */
        if (!lsm)
                RETURN(-ENODATA);

        if (copy_from_user(&lum, lump, sizeof(lum)))
                RETURN(-EFAULT);

        if (lum.lmm_magic != LOV_USER_MAGIC)
                RETURN(-EINVAL);

        /* If the caller left room for objects, build a kernel copy large
         * enough for the header plus our single object entry; otherwise the
         * on-stack header alone suffices and no objects are copied out. */
        if (lum.lmm_stripe_count > 0) {
                lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
                OBD_ALLOC(lumk, lum_size);
                if (!lumk)
                        RETURN(-ENOMEM);

                lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
                lumk->lmm_objects[0].l_object_gr = lsm->lsm_object_gr;
        } else {
                lum_size = sizeof(lum);
                lumk = &lum;
        }

        lumk->lmm_object_id = lsm->lsm_object_id;
        lumk->lmm_object_gr = lsm->lsm_object_gr;
        lumk->lmm_stripe_count = 1;

        if (copy_to_user(lump, lumk, lum_size))
                rc = -EFAULT;

        /* Free only the heap copy, never the on-stack header. */
        if (lumk != &lum)
                OBD_FREE(lumk, lum_size);

        RETURN(rc);
}
3240
3241
3242 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3243                          void *karg, void *uarg)
3244 {
3245         struct obd_device *obd = exp->exp_obd;
3246         struct obd_ioctl_data *data = karg;
3247         int err = 0;
3248         ENTRY;
3249
3250 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3251         MOD_INC_USE_COUNT;
3252 #else
3253         if (!try_module_get(THIS_MODULE)) {
3254                 CERROR("Can't get module. Is it alive?");
3255                 return -EINVAL;
3256         }
3257 #endif
3258         switch (cmd) {
3259         case OBD_IOC_LOV_GET_CONFIG: {
3260                 char *buf;
3261                 struct lov_desc *desc;
3262                 struct obd_uuid uuid;
3263
3264                 buf = NULL;
3265                 len = 0;
3266                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3267                         GOTO(out, err = -EINVAL);
3268
3269                 data = (struct obd_ioctl_data *)buf;
3270
3271                 if (sizeof(*desc) > data->ioc_inllen1) {
3272                         obd_ioctl_freedata(buf, len);
3273                         GOTO(out, err = -EINVAL);
3274                 }
3275
3276                 if (data->ioc_inllen2 < sizeof(uuid)) {
3277                         obd_ioctl_freedata(buf, len);
3278                         GOTO(out, err = -EINVAL);
3279                 }
3280
3281                 desc = (struct lov_desc *)data->ioc_inlbuf1;
3282                 desc->ld_tgt_count = 1;
3283                 desc->ld_active_tgt_count = 1;
3284                 desc->ld_default_stripe_count = 1;
3285                 desc->ld_default_stripe_size = 0;
3286                 desc->ld_default_stripe_offset = 0;
3287                 desc->ld_pattern = 0;
3288                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3289
3290                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3291
3292                 err = copy_to_user((void *)uarg, buf, len);
3293                 if (err)
3294                         err = -EFAULT;
3295                 obd_ioctl_freedata(buf, len);
3296                 GOTO(out, err);
3297         }
3298         case LL_IOC_LOV_SETSTRIPE:
3299                 err = obd_alloc_memmd(exp, karg);
3300                 if (err > 0)
3301                         err = 0;
3302                 GOTO(out, err);
3303         case LL_IOC_LOV_GETSTRIPE:
3304                 err = osc_getstripe(karg, uarg);
3305                 GOTO(out, err);
3306         case OBD_IOC_CLIENT_RECOVER:
3307                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3308                                             data->ioc_inlbuf1);
3309                 if (err > 0)
3310                         err = 0;
3311                 GOTO(out, err);
3312         case IOC_OSC_SET_ACTIVE:
3313                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3314                                                data->ioc_offset);
3315                 GOTO(out, err);
3316         case OBD_IOC_POLL_QUOTACHECK:
3317                 err = lquota_poll_check(quota_interface, exp,
3318                                         (struct if_quotacheck *)karg);
3319                 GOTO(out, err);
3320         default:
3321                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3322                        cmd, cfs_curproc_comm());
3323                 GOTO(out, err = -ENOTTY);
3324         }
3325 out:
3326 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3327         MOD_DEC_USE_COUNT;
3328 #else
3329         module_put(THIS_MODULE);
3330 #endif
3331         return err;
3332 }
3333
/* Answer simple informational queries about this OST.  "lock_to_stripe"
 * is resolved locally (an OSC always has exactly one stripe, index 0);
 * "last_id" is fetched from the server via an OST_GET_INFO RPC. */
static int osc_get_info(struct obd_export *exp, obd_count keylen,
                        void *key, __u32 *vallen, void *val)
{
        ENTRY;
        if (!vallen || !val)
                RETURN(-EFAULT);

        /* NOTE(review): the first key check uses '>' while the second uses
         * '>='; both succeed when keylen includes the terminating NUL, but
         * the inconsistency looks accidental -- confirm what callers pass. */
        if (keylen > strlen("lock_to_stripe") &&
            strcmp(key, "lock_to_stripe") == 0) {
                __u32 *stripe = val;
                *vallen = sizeof(*stripe);
                *stripe = 0;
                RETURN(0);
        } else if (keylen >= strlen("last_id") && strcmp(key, "last_id") == 0) {
                struct ptlrpc_request *req;
                obd_id *reply;
                char *bufs[2] = { NULL, key };
                int rc, size[2] = { sizeof(struct ptlrpc_body), keylen };

                req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                                      OST_GET_INFO, 2, size, bufs);
                if (req == NULL)
                        RETURN(-ENOMEM);

                /* Size the reply buffer to what the caller can hold. */
                size[REPLY_REC_OFF] = *vallen;
                ptlrpc_req_set_repsize(req, 2, size);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(out, rc);

                reply = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*reply),
                                           lustre_swab_ost_last_id);
                if (reply == NULL) {
                        CERROR("Can't unpack OST last ID\n");
                        GOTO(out, rc = -EPROTO);
                }
                *((obd_id *)val) = *reply;
        out:
                ptlrpc_req_finished(req);
                RETURN(rc);
        }
        RETURN(-EINVAL);
}
3377
3378 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3379                                           void *aa, int rc)
3380 {
3381         struct llog_ctxt *ctxt;
3382         struct obd_import *imp = req->rq_import;
3383         ENTRY;
3384
3385         if (rc != 0)
3386                 RETURN(rc);
3387
3388         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3389         if (ctxt) {
3390                 if (rc == 0)
3391                         rc = llog_initiator_connect(ctxt);
3392                 else
3393                         CERROR("cannot establish connection for "
3394                                "ctxt %p: %d\n", ctxt, rc);
3395         }
3396
3397         spin_lock(&imp->imp_lock);
3398         imp->imp_server_timeout = 1;
3399         imp->imp_pingable = 1;
3400         spin_unlock(&imp->imp_lock);
3401         CDEBUG(D_HA, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3402
3403         RETURN(rc);
3404 }
3405
/* Set a named parameter on this OSC.  Several keys are handled entirely
 * on the client (next object id, "unlinked", initial recovery, checksum,
 * security-context flush); anything else is forwarded to the OST as an
 * OST_SET_INFO RPC queued on @set. */
static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
                              void *key, obd_count vallen, void *val,
                              struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct obd_device  *obd = exp->exp_obd;
        struct obd_import *imp = class_exp2cliimp(exp);
        int size[3] = { sizeof(struct ptlrpc_body), keylen, vallen };
        char *bufs[3] = { NULL, key, val };
        ENTRY;

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

        /* Record the next object id to precreate; +1 because the value
         * passed in is the last id already in use. */
        if (KEY_IS(KEY_NEXT_ID)) {
                if (vallen != sizeof(obd_id))
                        RETURN(-EINVAL);
                obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
                CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
                       exp->exp_obd->obd_name,
                       obd->u.cli.cl_oscc.oscc_next_id);

                RETURN(0);
        }

        /* An unlink freed space: clear the no-space flag so object
         * creation may proceed again. */
        if (KEY_IS("unlinked")) {
                struct osc_creator *oscc = &obd->u.cli.cl_oscc;
                spin_lock(&oscc->oscc_lock);
                oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
                spin_unlock(&oscc->oscc_lock);
                RETURN(0);
        }

        if (KEY_IS(KEY_INIT_RECOV)) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                spin_lock(&imp->imp_lock);
                imp->imp_initial_recov = *(int *)val;
                spin_unlock(&imp->imp_lock);
                CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
                       exp->exp_obd->obd_name,
                       imp->imp_initial_recov);
                RETURN(0);
        }

        /* Toggle wire checksumming of bulk data. */
        if (KEY_IS("checksum")) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
                RETURN(0);
        }

        if (KEY_IS(KEY_FLUSH_CTX)) {
                sptlrpc_import_flush_my_ctx(imp);
                RETURN(0);
        }

        /* Everything below requires an RPC, hence a request set. */
        if (!set)
                RETURN(-EINVAL);

        /* We pass all other commands directly to OST. Since nobody calls osc
           methods directly and everybody is supposed to go through LOV, we
           assume lov checked invalid values for us.
           The only recognised values so far are evict_by_nid and mds_conn.
           Even if something bad goes through, we'd get a -EINVAL from OST
           anyway. */

        req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_SET_INFO, 3, size,
                              bufs);
        if (req == NULL)
                RETURN(-ENOMEM);

        /* "mds_conn": this OSC sits on an MDS -- remember the object group
         * and finish llog wiring when the reply arrives. */
        if (KEY_IS("mds_conn")) {
                struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                oscc->oscc_oa.o_gr = (*(__u32 *)val);
                oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
                LASSERT(oscc->oscc_oa.o_gr > 0);
                req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
        }

        /* No reply payload is expected beyond the ptlrpc body. */
        ptlrpc_req_set_repsize(req, 1, NULL);
        ptlrpc_set_add_req(set, req);
        ptlrpc_check_set(set);

        RETURN(0);
}
3492
3493
3494 static struct llog_operations osc_size_repl_logops = {
3495         lop_cancel: llog_obd_repl_cancel
3496 };
3497
/* Lazily-patched copy of the lvfs llog ops, specialized to act as the
 * MDS->OST "origin" side; file-scope, shared by all OSC devices. */
static struct llog_operations osc_mds_ost_orig_logops;

/* Set up the two llog contexts an (MDS-side) OSC uses: the origin
 * context for replaying object operations to the OST, and the
 * size-changes replication context. */
static int osc_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
                         struct obd_device *tgt, int count,
                         struct llog_catid *catid, struct obd_uuid *uuid)
{
        int rc;
        ENTRY;

        /* One-time initialization of the shared ops table.
         * NOTE(review): the guard is the per-device obd_dev_lock but the
         * table is file-scope, so two different OSC devices could race
         * here -- confirm llog_init is serialized at a higher level. */
        spin_lock(&obd->obd_dev_lock);
        if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
                osc_mds_ost_orig_logops = llog_lvfs_ops;
                osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
                osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
                osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
                osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
        }
        spin_unlock(&obd->obd_dev_lock);

        rc = llog_setup(obd, llogs, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
                        &catid->lci_logid, &osc_mds_ost_orig_logops);
        if (rc) {
                CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
                GOTO (out, rc);
        }

        rc = llog_setup(obd, llogs, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
                        &osc_size_repl_logops);
        if (rc)
                CERROR("failed LLOG_SIZE_REPL_CTXT\n");
out:
        if (rc) {
                CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
                       obd->obd_name, tgt->obd_name, count, catid, rc);
                CERROR("logid "LPX64":0x%x\n",
                       catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
        }
        RETURN(rc);
}
3536
3537 static int osc_llog_finish(struct obd_device *obd, int count)
3538 {
3539         struct llog_ctxt *ctxt;
3540         int rc = 0, rc2 = 0;
3541         ENTRY;
3542
3543         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3544         if (ctxt)
3545                 rc = llog_cleanup(ctxt);
3546
3547         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3548         if (ctxt)
3549                 rc2 = llog_cleanup(ctxt);
3550         if (!rc)
3551                 rc = rc2;
3552
3553         RETURN(rc);
3554 }
3555
3556 static int osc_reconnect(struct obd_export *exp, struct obd_device *obd,
3557                          struct obd_uuid *cluuid,
3558                          struct obd_connect_data *data)
3559 {
3560         struct client_obd *cli = &obd->u.cli;
3561
3562         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3563                 long lost_grant;
3564
3565                 client_obd_list_lock(&cli->cl_loi_list_lock);
3566                 data->ocd_grant = cli->cl_avail_grant ?:
3567                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3568                 lost_grant = cli->cl_lost_grant;
3569                 cli->cl_lost_grant = 0;
3570                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3571
3572                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3573                        "cl_lost_grant: %ld\n", data->ocd_grant,
3574                        cli->cl_avail_grant, lost_grant);
3575                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3576                        " ocd_grant: %d\n", data->ocd_connect_flags,
3577                        data->ocd_version, data->ocd_grant);
3578         }
3579
3580         RETURN(0);
3581 }
3582
3583 static int osc_disconnect(struct obd_export *exp)
3584 {
3585         struct obd_device *obd = class_exp2obd(exp);
3586         struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3587         int rc;
3588
3589         if (obd->u.cli.cl_conn_count == 1)
3590                 /* flush any remaining cancel messages out to the target */
3591                 llog_sync(ctxt, exp);
3592
3593         rc = client_disconnect_export(exp);
3594         return rc;
3595 }
3596
/* React to import (connection) state transitions: drop grants on
 * disconnect/invalidate, gate the object-precreate machinery on MDS
 * OSCs, and propagate activity changes to the observer (e.g. the LOV). */
static int osc_import_event(struct obd_device *obd,
                            struct obd_import *imp,
                            enum obd_import_event event)
{
        struct client_obd *cli;
        int rc = 0;

        ENTRY;
        LASSERT(imp->imp_obd == obd);

        switch (event) {
        case IMP_EVENT_DISCON: {
                /* Only do this on the MDS OSC's */
                if (imp->imp_server_timeout) {
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                        /* Hold off precreation until recovery completes. */
                        spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
                        spin_unlock(&oscc->oscc_lock);
                }
                /* Grant is per-connection state and will be re-negotiated
                 * at reconnect; forget what we were holding. */
                cli = &obd->u.cli;
                client_obd_list_lock(&cli->cl_loi_list_lock);
                cli->cl_avail_grant = 0;
                cli->cl_lost_grant = 0;
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                break;
        }
        case IMP_EVENT_INACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
                break;
        }
        case IMP_EVENT_INVALIDATE: {
                struct ldlm_namespace *ns = obd->obd_namespace;

                /* Reset grants */
                cli = &obd->u.cli;
                client_obd_list_lock(&cli->cl_loi_list_lock);
                /* all pages go to failing rpcs due to the invalid import */
                osc_check_rpcs(cli);
                client_obd_list_unlock(&cli->cl_loi_list_lock);

                /* Drop all local locks; server lock state is gone. */
                ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);

                break;
        }
        case IMP_EVENT_ACTIVE: {
                /* Only do this on the MDS OSC's */
                if (imp->imp_server_timeout) {
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                        /* Reconnected: allow precreation again. */
                        spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
                        spin_unlock(&oscc->oscc_lock);
                }
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
                break;
        }
        case IMP_EVENT_OCD: {
                struct obd_connect_data *ocd = &imp->imp_connect_data;

                if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
                        osc_init_grant(&obd->u.cli, ocd);

                /* See bug 7198 */
                if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;

                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
                break;
        }
        default:
                CERROR("Unknown import event %d\n", event);
                LBUG();
        }
        RETURN(rc);
}
3673
3674 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3675 {
3676         int rc;
3677         ENTRY;
3678
3679         ENTRY;
3680         rc = ptlrpcd_addref();
3681         if (rc)
3682                 RETURN(rc);
3683
3684         rc = client_obd_setup(obd, lcfg);
3685         if (rc) {
3686                 ptlrpcd_decref();
3687         } else {
3688                 struct lprocfs_static_vars lvars;
3689                 struct client_obd *cli = &obd->u.cli;
3690
3691                 lprocfs_init_vars(osc, &lvars);
3692                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3693                         lproc_osc_attach_seqstat(obd);
3694                         ptlrpc_lprocfs_register_obd(obd);
3695                 }
3696
3697                 oscc_init(obd);
3698                 /* We need to allocate a few requests more, because
3699                    brw_interpret_oap tries to create new requests before freeing
3700                    previous ones. Ideally we want to have 2x max_rpcs_in_flight
3701                    reserved, but I afraid that might be too much wasted RAM
3702                    in fact, so 2 is just my guess and still should work. */
3703                 cli->cl_import->imp_rq_pool =
3704                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3705                                             OST_MAXREQSIZE,
3706                                             ptlrpc_add_rqs_to_pool);
3707         }
3708
3709         RETURN(rc);
3710 }
3711
/* Staged teardown, run before osc_cleanup(): deactivate the import as
 * early as possible, free it once exports are gone, then shut down the
 * llog contexts. */
static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
{
        int rc = 0;
        ENTRY;

        switch (stage) {
        case OBD_CLEANUP_EARLY: {
                struct obd_import *imp;
                imp = obd->u.cli.cl_import;
                CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
                /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
                ptlrpc_deactivate_import(imp);
                /* Keep the pinger off the dying connection. */
                spin_lock(&imp->imp_lock);
                imp->imp_pingable = 0;
                spin_unlock(&imp->imp_lock);
                break;
        }
        case OBD_CLEANUP_EXPORTS: {
                /* If we set up but never connected, the
                   client import will not have been cleaned. */
                if (obd->u.cli.cl_import) {
                        struct obd_import *imp;
                        imp = obd->u.cli.cl_import;
                        /* NOTE(review): this message prints whenever the
                         * import still exists, not only when it never
                         * connected -- confirm the intended condition. */
                        CDEBUG(D_CONFIG, "%s: client import never connected\n",
                               obd->obd_name);
                        ptlrpc_invalidate_import(imp);
                        ptlrpc_free_rq_pool(imp->imp_rq_pool);
                        class_destroy_import(imp);
                        obd->u.cli.cl_import = NULL;
                }
                break;
        }
        case OBD_CLEANUP_SELF_EXP:
                rc = obd_llog_finish(obd, 0);
                if (rc != 0)
                        CERROR("failed to cleanup llogging subsystems\n");
                break;
        case OBD_CLEANUP_OBD:
                break;
        }
        RETURN(rc);
}
3754
3755 int osc_cleanup(struct obd_device *obd)
3756 {
3757         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3758         int rc;
3759
3760         ENTRY;
3761         ptlrpc_lprocfs_unregister_obd(obd);
3762         lprocfs_obd_cleanup(obd);
3763
3764         spin_lock(&oscc->oscc_lock);
3765         oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
3766         oscc->oscc_flags |= OSCC_FLAG_EXITING;
3767         spin_unlock(&oscc->oscc_lock);
3768
3769         /* free memory of osc quota cache */
3770         lquota_cleanup(quota_interface, obd);
3771
3772         rc = client_obd_cleanup(obd);
3773
3774         ptlrpcd_decref();
3775         RETURN(rc);
3776 }
3777
3778 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3779 {
3780         struct lustre_cfg *lcfg = buf;
3781         struct lprocfs_static_vars lvars;
3782         int rc = 0;
3783
3784         lprocfs_init_vars(osc, &lvars);
3785
3786         rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars, lcfg, obd);
3787         return(rc);
3788 }
3789
/* Method table exported by the OSC module; registered with the class
 * driver in osc_init().  Connection management is delegated to the
 * generic client_* helpers; everything else is OSC-specific. */
struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        /* device lifecycle */
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup,
        /* connection/import management (generic client helpers) */
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        /* statfs */
        .o_statfs               = osc_statfs,
        .o_statfs_async         = osc_statfs_async,
        /* striping metadata pack/unpack */
        .o_packmd               = osc_packmd,
        .o_unpackmd             = osc_unpackmd,
        /* object lifecycle and attributes */
        .o_precreate            = osc_precreate,
        .o_create               = osc_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_getattr_async        = osc_getattr_async,
        .o_setattr              = osc_setattr,
        .o_setattr_async        = osc_setattr_async,
        /* bulk read/write paths */
        .o_brw                  = osc_brw,
        .o_brw_async            = osc_brw_async,
        .o_prep_async_page      = osc_prep_async_page,
        .o_queue_async_io       = osc_queue_async_io,
        .o_set_async_flags      = osc_set_async_flags,
        .o_queue_group_io       = osc_queue_group_io,
        .o_trigger_group_io     = osc_trigger_group_io,
        .o_teardown_async_page  = osc_teardown_async_page,
        .o_punch                = osc_punch,
        .o_sync                 = osc_sync,
        /* DLM locking */
        .o_enqueue              = osc_enqueue,
        .o_match                = osc_match,
        .o_change_cbdata        = osc_change_cbdata,
        .o_cancel               = osc_cancel,
        .o_cancel_unused        = osc_cancel_unused,
        .o_join_lru             = osc_join_lru,
        /* misc control and information */
        .o_iocontrol            = osc_iocontrol,
        .o_get_info             = osc_get_info,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_llog_init            = osc_llog_init,
        .o_llog_finish          = osc_llog_finish,
        .o_process_config       = osc_process_config,
};
3835 int __init osc_init(void)
3836 {
3837         struct lprocfs_static_vars lvars;
3838         int rc;
3839         ENTRY;
3840
3841         lprocfs_init_vars(osc, &lvars);
3842
3843         request_module("lquota");
3844         quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
3845         lquota_init(quota_interface);
3846         init_obd_quota_ops(quota_interface, &osc_obd_ops);
3847
3848         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3849                                  LUSTRE_OSC_NAME, NULL);
3850         if (rc) {
3851                 if (quota_interface)
3852                         PORTAL_SYMBOL_PUT(osc_quota_interface);
3853                 RETURN(rc);
3854         }
3855
3856         RETURN(rc);
3857 }
3858
#ifdef __KERNEL__
/* Module unload: tear down in the reverse order of osc_init() —
 * quota interface first, then the obd type registration.
 * (Not marked __exit in the original; presumably so it can also be
 * called from a non-unload path — TODO confirm.) */
static void /*__exit*/ osc_exit(void)
{
        /* Shut down quota state before releasing the symbol reference
         * that keeps the lquota module pinned. */
        lquota_exit(quota_interface);
        if (quota_interface)
                PORTAL_SYMBOL_PUT(osc_quota_interface);

        class_unregister_type(LUSTRE_OSC_NAME);
}

MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");

/* Register osc_init/osc_exit as this module's init/cleanup hooks. */
cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
#endif