Whamcloud - gitweb
improve handling of recoverable errors
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author Peter Braam <braam@clusterfs.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  *   You may have signed or agreed to another license before downloading
11  *   this software.  If so, you are bound by the terms and conditions
12  *   of that agreement, and the following does not apply to you.  See the
13  *   LICENSE file included with this distribution for more information.
14  *
15  *   If you did not agree to a different license, then this copy of Lustre
16  *   is open source software; you can redistribute it and/or modify it
17  *   under the terms of version 2 of the GNU General Public License as
18  *   published by the Free Software Foundation.
19  *
20  *   In either case, Lustre is distributed in the hope that it will be
21  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23  *   license text for more details.
24  *
25  *  For testing and management it is treated as an obd_device,
26  *  although it does not export a full OBD method table (the
27  *  requests are coming in over the wire, so object target modules
28  *  do not have a full method table.)
29  *
30  */
31
32 #ifndef EXPORT_SYMTAB
33 # define EXPORT_SYMTAB
34 #endif
35 #define DEBUG_SUBSYSTEM S_OSC
36
37 #ifdef __KERNEL__
38 # include <libcfs/libcfs.h>
39 #else /* __KERNEL__ */
40 # include <liblustre.h>
41 #endif
42
43 #include <lustre_dlm.h>
44 #include <libcfs/kp30.h>
45 #include <lustre_net.h>
46 #include <lustre/lustre_user.h>
47 #include <obd_ost.h>
48 #include <obd_lov.h>
49
50 #ifdef  __CYGWIN__
51 # include <ctype.h>
52 #endif
53
54 #include <lustre_ha.h>
55 #include <lprocfs_status.h>
56 #include <lustre_log.h>
57 #include <lustre_debug.h>
58 #include <lustre_param.h>
59 #include "osc_internal.h"
60
/* Quota interface hook; filled in at module init from osc_quota_interface. */
static quota_interface_t *quota_interface = NULL;
extern quota_interface_t osc_quota_interface;

/* Forward declaration: releases a brw_page array built for a BRW RPC. */
static void osc_release_ppga(struct brw_page **ppga, obd_count count);

/* Delay before resending a BRW that hit a recoverable error; 10 seconds
 * by default (presumably set at module init — confirm in module setup). */
atomic_t osc_resend_time;
68
69 /* Pack OSC object metadata for disk storage (LE byte order). */
70 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
71                       struct lov_stripe_md *lsm)
72 {
73         int lmm_size;
74         ENTRY;
75
76         lmm_size = sizeof(**lmmp);
77         if (!lmmp)
78                 RETURN(lmm_size);
79
80         if (*lmmp && !lsm) {
81                 OBD_FREE(*lmmp, lmm_size);
82                 *lmmp = NULL;
83                 RETURN(0);
84         }
85
86         if (!*lmmp) {
87                 OBD_ALLOC(*lmmp, lmm_size);
88                 if (!*lmmp)
89                         RETURN(-ENOMEM);
90         }
91
92         if (lsm) {
93                 LASSERT(lsm->lsm_object_id);
94                 LASSERT(lsm->lsm_object_gr);
95                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
96                 (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
97         }
98
99         RETURN(lmm_size);
100 }
101
/* Unpack OSC object metadata from disk storage (LE byte order).
 * Mirrors osc_packmd: lsmp == NULL reports the in-memory size only;
 * lmm == NULL with an existing *lsmp frees the memmd instead. */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
{
        int lsm_size;
        ENTRY;

        /* Validate the on-disk metadata before touching *lsmp. */
        if (lmm != NULL) {
                if (lmm_bytes < sizeof (*lmm)) {
                        CERROR("lov_mds_md too small: %d, need %d\n",
                               lmm_bytes, (int)sizeof(*lmm));
                        RETURN(-EINVAL);
                }
                /* XXX LOV_MAGIC etc check? */

                if (lmm->lmm_object_id == 0) {
                        CERROR("lov_mds_md: zero lmm_object_id\n");
                        RETURN(-EINVAL);
                }
        }

        /* An OSC object always has exactly one stripe. */
        lsm_size = lov_stripe_md_size(1);
        if (lsmp == NULL)
                RETURN(lsm_size);

        /* lmm == NULL means "free the existing memmd". */
        if (*lsmp != NULL && lmm == NULL) {
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);
                *lsmp = NULL;
                RETURN(0);
        }

        if (*lsmp == NULL) {
                OBD_ALLOC(*lsmp, lsm_size);
                if (*lsmp == NULL)
                        RETURN(-ENOMEM);
                OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                if ((*lsmp)->lsm_oinfo[0] == NULL) {
                        /* undo the partial allocation above */
                        OBD_FREE(*lsmp, lsm_size);
                        RETURN(-ENOMEM);
                }
                loi_init((*lsmp)->lsm_oinfo[0]);
        }

        if (lmm != NULL) {
                /* XXX zero *lsmp? */
                (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
                (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
                LASSERT((*lsmp)->lsm_object_id);
                LASSERT((*lsmp)->lsm_object_gr);
        }

        (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

        RETURN(lsm_size);
}
158
159 static inline void osc_pack_capa(struct ptlrpc_request *req, int offset,
160                                  struct ost_body *body, void *capa)
161 {
162         struct obd_capa *oc = (struct obd_capa *)capa;
163         struct lustre_capa *c;
164
165         if (!capa)
166                 return;
167
168         c = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*c));
169         LASSERT(c);
170         capa_cpy(c, oc);
171         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
172         DEBUG_CAPA(D_SEC, c, "pack");
173 }
174
175 static inline void osc_pack_req_body(struct ptlrpc_request *req, int offset,
176                                      struct obd_info *oinfo)
177 {
178         struct ost_body *body;
179
180         body = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*body));
181         body->oa = *oinfo->oi_oa;
182         osc_pack_capa(req, offset + 1, body, oinfo->oi_capa);
183 }
184
185 static int osc_getattr_interpret(struct ptlrpc_request *req,
186                                  struct osc_async_args *aa, int rc)
187 {
188         struct ost_body *body;
189         ENTRY;
190
191         if (rc != 0)
192                 GOTO(out, rc);
193
194         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
195                                   lustre_swab_ost_body);
196         if (body) {
197                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
198                 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
199
200                 /* This should really be sent by the OST */
201                 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
202                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
203         } else {
204                 CERROR("can't unpack ost_body\n");
205                 rc = -EPROTO;
206                 aa->aa_oi->oi_oa->o_valid = 0;
207         }
208 out:
209         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
210         RETURN(rc);
211 }
212
213 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
214                              struct ptlrpc_request_set *set)
215 {
216         struct ptlrpc_request *req;
217         struct ost_body *body;
218         int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
219         struct osc_async_args *aa;
220         ENTRY;
221
222         size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
223         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
224                               OST_GETATTR, 3, size,NULL);
225         if (!req)
226                 RETURN(-ENOMEM);
227
228         osc_pack_req_body(req, REQ_REC_OFF, oinfo);
229
230         ptlrpc_req_set_repsize(req, 2, size);
231         req->rq_interpret_reply = osc_getattr_interpret;
232
233         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
234         aa = (struct osc_async_args *)&req->rq_async_args;
235         aa->aa_oi = oinfo;
236
237         ptlrpc_set_add_req(set, req);
238         RETURN (0);
239 }
240
241 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
242 {
243         struct ptlrpc_request *req;
244         struct ost_body *body;
245         int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
246         ENTRY;
247
248         size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
249         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
250                               OST_GETATTR, 3, size, NULL);
251         if (!req)
252                 RETURN(-ENOMEM);
253
254         osc_pack_req_body(req, REQ_REC_OFF, oinfo);
255
256         ptlrpc_req_set_repsize(req, 2, size);
257
258         rc = ptlrpc_queue_wait(req);
259         if (rc) {
260                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
261                 GOTO(out, rc);
262         }
263
264         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
265                                   lustre_swab_ost_body);
266         if (body == NULL) {
267                 CERROR ("can't unpack ost_body\n");
268                 GOTO (out, rc = -EPROTO);
269         }
270
271         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
272         *oinfo->oi_oa = body->oa;
273
274         /* This should really be sent by the OST */
275         oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
276         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
277
278         EXIT;
279  out:
280         ptlrpc_req_finished(req);
281         return rc;
282 }
283
284 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
285                        struct obd_trans_info *oti)
286 {
287         struct ptlrpc_request *req;
288         struct ost_body *body;
289         int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
290         ENTRY;
291
292         LASSERT(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
293                                         oinfo->oi_oa->o_gr > 0);
294         size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
295         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
296                               OST_SETATTR, 3, size, NULL);
297         if (!req)
298                 RETURN(-ENOMEM);
299
300         osc_pack_req_body(req, REQ_REC_OFF, oinfo);
301
302         ptlrpc_req_set_repsize(req, 2, size);
303
304         rc = ptlrpc_queue_wait(req);
305         if (rc)
306                 GOTO(out, rc);
307
308         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
309                                   lustre_swab_ost_body);
310         if (body == NULL)
311                 GOTO(out, rc = -EPROTO);
312
313         *oinfo->oi_oa = body->oa;
314
315         EXIT;
316 out:
317         ptlrpc_req_finished(req);
318         RETURN(rc);
319 }
320
321 static int osc_setattr_interpret(struct ptlrpc_request *req,
322                                  struct osc_async_args *aa, int rc)
323 {
324         struct ost_body *body;
325         ENTRY;
326
327         if (rc != 0)
328                 GOTO(out, rc);
329
330         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
331                                   lustre_swab_ost_body);
332         if (body == NULL) {
333                 CERROR("can't unpack ost_body\n");
334                 GOTO(out, rc = -EPROTO);
335         }
336
337         *aa->aa_oi->oi_oa = body->oa;
338 out:
339         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
340         RETURN(rc);
341 }
342
343 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
344                              struct obd_trans_info *oti,
345                              struct ptlrpc_request_set *rqset)
346 {
347         struct ptlrpc_request *req;
348         int size[3] = { sizeof(struct ptlrpc_body), sizeof(struct ost_body) };
349         struct osc_async_args *aa;
350         ENTRY;
351
352         size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
353         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
354                               OST_SETATTR, 3, size, NULL);
355         if (!req)
356                 RETURN(-ENOMEM);
357
358         osc_pack_req_body(req, REQ_REC_OFF, oinfo);
359         if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
360                 LASSERT(oti);
361                 *obdo_logcookie(oinfo->oi_oa) = *oti->oti_logcookies;
362         }
363
364         ptlrpc_req_set_repsize(req, 2, size);
365         /* do mds to ost setattr asynchronouly */
366         if (!rqset) {
367                 /* Do not wait for response. */
368                 ptlrpcd_add_req(req);
369         } else {
370                 req->rq_interpret_reply = osc_setattr_interpret;
371
372                 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
373                 aa = (struct osc_async_args *)&req->rq_async_args;
374                 aa->aa_oi = oinfo;
375
376                 ptlrpc_set_add_req(rqset, req);
377         }
378
379         RETURN(0);
380 }
381
/* Create one object on the OST (OST_CREATE RPC).  If the caller did not
 * supply a stripe md in *ea a single-stripe memmd is allocated here; on
 * success the new object id/group are stored in it and ownership passes
 * to the caller via *ea, on failure a locally-allocated lsm is freed. */
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        struct lov_stripe_md *lsm;
        int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                /* caller gave no memmd: allocate one for the new object */
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_CREATE, 2, size, NULL);
        if (!req)
                GOTO(out, rc = -ENOMEM);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        body->oa = *oa;

        ptlrpc_req_set_repsize(req, 2, size);
        if (oa->o_valid & OBD_MD_FLINLINE) {
                /* FLINLINE here marks an orphan-deletion request from
                 * MDS-OST integration, not inline data */
                LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
                        oa->o_flags == OBD_FL_DELORPHAN);
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR ("can't unpack ost_body\n");
                GOTO (out_req, rc = -EPROTO);
        }

        *oa = body->oa;

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_object_id = oa->o_id;
        lsm->lsm_object_gr = oa->o_gr;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                /* record the llog cancel cookie for unlink recovery */
                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        *oti->oti_logcookies = *obdo_logcookie(oa);
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
        EXIT;
out_req:
        ptlrpc_req_finished(req);
out:
        /* only free the lsm if we allocated it above (caller's *ea unset) */
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        return rc;
}
464
465 static int osc_punch_interpret(struct ptlrpc_request *req,
466                                struct osc_async_args *aa, int rc)
467 {
468         struct ost_body *body;
469         ENTRY;
470
471         if (rc != 0)
472                 GOTO(out, rc);
473
474         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof (*body),
475                                   lustre_swab_ost_body);
476         if (body == NULL) {
477                 CERROR ("can't unpack ost_body\n");
478                 GOTO(out, rc = -EPROTO);
479         }
480
481         *aa->aa_oi->oi_oa = body->oa;
482 out:
483         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
484         RETURN(rc);
485 }
486
487 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
488                      struct obd_trans_info *oti,
489                      struct ptlrpc_request_set *rqset)
490 {
491         struct ptlrpc_request *req;
492         struct osc_async_args *aa;
493         struct ost_body *body;
494         int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
495         ENTRY;
496
497         if (!oinfo->oi_oa) {
498                 CERROR("oa NULL\n");
499                 RETURN(-EINVAL);
500         }
501
502         size[REQ_REC_OFF + 1] = oinfo->oi_capa? sizeof(struct lustre_capa) : 0;
503         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
504                               OST_PUNCH, 3, size, NULL);
505         if (!req)
506                 RETURN(-ENOMEM);
507
508         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
509
510         osc_pack_req_body(req, REQ_REC_OFF, oinfo);
511         /* overload the size and blocks fields in the oa with start/end */
512         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
513         body->oa.o_size = oinfo->oi_policy.l_extent.start;
514         body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
515         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
516
517         ptlrpc_req_set_repsize(req, 2, size);
518
519         req->rq_interpret_reply = osc_punch_interpret;
520         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
521         aa = (struct osc_async_args *)&req->rq_async_args;
522         aa->aa_oi = oinfo;
523         ptlrpc_set_add_req(rqset, req);
524
525         RETURN(0);
526 }
527
/* Synchronously flush the byte range [start, end] of the object to
 * stable storage on the OST (OST_SYNC RPC).  On success *oa is updated
 * from the server's reply. */
static int osc_sync(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md *md, obd_size start, obd_size end,
                    void *capa)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        ENTRY;

        if (!oa) {
                CERROR("oa NULL\n");
                RETURN(-EINVAL);
        }

        /* third buffer only when a capability is supplied */
        size[REQ_REC_OFF + 1] = capa ? sizeof(struct lustre_capa) : 0;

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_SYNC, 3, size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        /* overload the size and blocks fields in the oa with start/end */
        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        body->oa = *oa;
        body->oa.o_size = start;
        body->oa.o_blocks = end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);

        osc_pack_capa(req, REQ_REC_OFF + 1, body, capa);

        ptlrpc_req_set_repsize(req, 2, size);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR ("can't unpack ost_body\n");
                GOTO (out, rc = -EPROTO);
        }

        /* return the server's view of the object's attributes */
        *oa = body->oa;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
578
579 /* Find and cancel locally locks matched by @mode in the resource found by
580  * @objid. Found locks are added into @cancel list. Returns the amount of
581  * locks added to @cancels list. */
582 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
583                                    struct list_head *cancels, ldlm_mode_t mode,
584                                    int lock_flags)
585 {
586         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
587         struct ldlm_res_id res_id = { .name = { oa->o_id, 0, oa->o_gr, 0 } };
588         struct ldlm_resource *res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
589         int count;
590         ENTRY;
591
592         if (res == NULL)
593                 RETURN(0);
594
595         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
596                                            lock_flags, 0, NULL);
597         ldlm_resource_putref(res);
598         RETURN(count);
599 }
600
/* Destroy requests can be async always on the client, and we don't even really
 * care about the return code since the client cannot do anything at all about
 * a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST reconnects to the MDS next,
 * it will retrieve the llog unlink logs and then sends the log cancellation
 * cookies to the MDS after committing destroy transactions. */
static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                       struct lov_stripe_md *ea, struct obd_trans_info *oti,
                       struct obd_export *md_export)
{
        CFS_LIST_HEAD(cancels);
        struct ptlrpc_request *req;
        struct ost_body *body;
        int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body), 0 };
        int count, bufcount = 2;
        ENTRY;

        if (!oa) {
                CERROR("oa NULL\n");
                RETURN(-EINVAL);
        }

        /* cancel our local PW locks on the doomed object up front,
         * discarding any dirty data under them */
        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);
        if (exp_connect_cancelset(exp) && count) {
                /* piggy-back the cancels on the destroy request */
                bufcount = 3;
                size[REQ_REC_OFF + 1] = ldlm_request_bufsize(count,OST_DESTROY);
        }
        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_DESTROY, bufcount, size, NULL);
        /* the cancel list must be consumed either way: packed into the
         * request, or released if the request could not be allocated */
        if (exp_connect_cancelset(exp) && req)
                ldlm_cli_cancel_list(&cancels, count, req, REQ_REC_OFF + 1, 0);
        else
                ldlm_lock_list_put(&cancels, l_bl_ast, count);

        if (!req)
                RETURN(-ENOMEM);

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        /* attach the llog cancel cookie (must go in before the oa copy) */
        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                memcpy(obdo_logcookie(oa), oti->oti_logcookies,
                       sizeof(*oti->oti_logcookies));
        body->oa = *oa;

        ptlrpc_req_set_repsize(req, 2, size);

        /* hand off to ptlrpcd; we do not wait for or check the reply */
        ptlrpcd_add_req(req);
        RETURN(0);
}
656
/* Fill the dirty/undirty/grant accounting fields of @oa so the OST can
 * see our cache state with each request.  o_undirty advertises how much
 * more dirty data we could accept; it is forced to 0 in any inconsistent
 * or over-limit situation so the server grants us nothing extra. */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        /* caller must not have filled these fields already */
        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (cli->cl_dirty > cli->cl_dirty_max) {
                CERROR("dirty %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
                CERROR("dirty %d > system dirty_max %d\n",
                       atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                /* difference would overflow the server's 32-bit field */
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
                                (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        /* report grant consumed and grant lost (e.g. truncated pages) */
        oa->o_grant = cli->cl_avail_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}
691
/* caller must hold loi_list_lock */
/* Account one page of dirty cache against the client's write grant:
 * bumps the global and per-client dirty counters, consumes one page of
 * avail_grant, and marks the brw_page as grant-backed so the companion
 * osc_release_write_grant() knows to credit it back. */
static void osc_consume_write_grant(struct client_obd *cli,
                                    struct brw_page *pga)
{
        atomic_inc(&obd_dirty_pages);
        cli->cl_dirty += CFS_PAGE_SIZE;
        cli->cl_avail_grant -= CFS_PAGE_SIZE;
        pga->flag |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
               CFS_PAGE_SIZE, pga, pga->pg);
        /* callers must not consume grant they do not have */
        LASSERT(cli->cl_avail_grant >= 0);
}
704
/* the companion to osc_consume_write_grant, called when a brw has completed.
 * must be called with the loi lock held.
 * Undoes the dirty accounting for one page.  If the page was never sent
 * (@sent == 0) its whole page of grant is recorded as lost; if it was a
 * short write on a filesystem whose block size differs from the page
 * size, the part of the page the OST rounded up to whole blocks is
 * recorded as lost so client and server grant accounting stay in step. */
static void osc_release_write_grant(struct client_obd *cli,
                                    struct brw_page *pga, int sent)
{
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
        ENTRY;

        /* pages never charged against grant need no release */
        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
                EXIT;
                return;
        }

        pga->flag &= ~OBD_BRW_FROM_GRANT;
        atomic_dec(&obd_dirty_pages);
        cli->cl_dirty -= CFS_PAGE_SIZE;
        if (!sent) {
                cli->cl_lost_grant += CFS_PAGE_SIZE;
                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
        } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole block on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
                int offset = pga->off & ~CFS_PAGE_MASK;
                int count = pga->count + (offset & (blocksize - 1));
                int end = (offset + pga->count) & (blocksize - 1);
                if (end)
                        count += blocksize - end;

                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
                       cli->cl_avail_grant, cli->cl_dirty);
        }

        EXIT;
}
743
744 static unsigned long rpcs_in_flight(struct client_obd *cli)
745 {
746         return cli->cl_r_in_flight + cli->cl_w_in_flight;
747 }
748
/* caller must hold loi_list_lock */
/* Wake processes blocked in the cache-waiter queue.  Each waiter either
 * receives a page of write grant (and dirty-cache headroom) or, when no
 * grant can possibly arrive, is woken with -EDQUOT to fall back to
 * synchronous IO.  Stops early while more dirty room or grant may still
 * show up from in-flight writes. */
void osc_wake_cache_waiters(struct client_obd *cli)
{
        struct list_head *l, *tmp;
        struct osc_cache_waiter *ocw;

        ENTRY;
        list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                /* if we can't dirty more, we must wait until some is written */
                if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
                   (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);
                        return;
                }

                /* if still dirty cache but no grant wait for pending RPCs that
                 * may yet return us some grant before doing sync writes */
                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
                               cli->cl_w_in_flight);
                        return;
                }

                ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
                list_del_init(&ocw->ocw_entry);
                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        /* no more RPCs in flight to return grant, do sync IO */
                        ocw->ocw_rc = -EDQUOT;
                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
                } else {
                        /* hand the waiter one page of grant before waking */
                        osc_consume_write_grant(cli,
                                                &ocw->ocw_oap->oap_brw_page);
                }

                cfs_waitq_signal(&ocw->ocw_waitq);
        }

        EXIT;
}
790
/* Install the initial write grant the OST handed us at connect time
 * (from the connect reply's obd_connect_data). */
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
               cli->cl_avail_grant, cli->cl_lost_grant);
        /* a negative grant from the server would corrupt all accounting */
        LASSERT(cli->cl_avail_grant >= 0);
}
801
/* Add the extra write grant the OST returned in a BRW reply to our
 * available-grant pool. */
static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
        cli->cl_avail_grant += body->oa.o_grant;
        /* waiters are woken in brw_interpret_oap */
        client_obd_list_unlock(&cli->cl_loi_list_lock);
}
810
811 /* We assume that the reason this OSC got a short read is because it read
812  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
813  * via the LOV, and it _knows_ it's reading inside the file, it's just that
814  * this stripe never got written at or beyond this stripe offset yet. */
815 static void handle_short_read(int nob_read, obd_count page_count,
816                               struct brw_page **pga)
817 {
818         char *ptr;
819         int i = 0;
820
821         /* skip bytes read OK */
822         while (nob_read > 0) {
823                 LASSERT (page_count > 0);
824
825                 if (pga[i]->count > nob_read) {
826                         /* EOF inside this page */
827                         ptr = cfs_kmap(pga[i]->pg) +
828                                 (pga[i]->off & ~CFS_PAGE_MASK);
829                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
830                         cfs_kunmap(pga[i]->pg);
831                         page_count--;
832                         i++;
833                         break;
834                 }
835
836                 nob_read -= pga[i]->count;
837                 page_count--;
838                 i++;
839         }
840
841         /* zero remaining pages */
842         while (page_count-- > 0) {
843                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
844                 memset(ptr, 0, pga[i]->count);
845                 cfs_kunmap(pga[i]->pg);
846                 i++;
847         }
848 }
849
850 static int check_write_rcs(struct ptlrpc_request *req,
851                            int requested_nob, int niocount,
852                            obd_count page_count, struct brw_page **pga)
853 {
854         int    *remote_rcs, i;
855
856         /* return error if any niobuf was in error */
857         remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
858                                         sizeof(*remote_rcs) * niocount, NULL);
859         if (remote_rcs == NULL) {
860                 CERROR("Missing/short RC vector on BRW_WRITE reply\n");
861                 return(-EPROTO);
862         }
863         if (lustre_msg_swabbed(req->rq_repmsg))
864                 for (i = 0; i < niocount; i++)
865                         __swab32s(&remote_rcs[i]);
866
867         for (i = 0; i < niocount; i++) {
868                 if (remote_rcs[i] < 0)
869                         return(remote_rcs[i]);
870
871                 if (remote_rcs[i] != 0) {
872                         CERROR("rc[%d] invalid (%d) req %p\n",
873                                 i, remote_rcs[i], req);
874                         return(-EPROTO);
875                 }
876         }
877
878         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
879                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
880                        requested_nob, req->rq_bulk->bd_nob_transferred);
881                 return(-EPROTO);
882         }
883
884         return (0);
885 }
886
887 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
888 {
889         if (p1->flag != p2->flag) {
890                 unsigned mask = ~OBD_BRW_FROM_GRANT;
891
892                 /* warn if we try to combine flags that we don't know to be
893                  * safe to combine */
894                 if ((p1->flag & mask) != (p2->flag & mask))
895                         CERROR("is it ok to have flags 0x%x and 0x%x in the "
896                                "same brw?\n", p1->flag, p2->flag);
897                 return 0;
898         }
899
900         return (p1->off + p1->count == p2->off);
901 }
902
/* Compute a crc32 checksum over the first 'nob' bytes of the pages in
 * 'pga'.  Two fault-injection hooks exist for testing: on receive the
 * data itself is corrupted before checksumming (simulating an
 * OST->client wire error); on send only the checksum is perturbed so
 * the data stays correct if the request is redone. */
static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga)
{
        __u32 cksum = ~0;
        int i = 0;

        LASSERT (pg_count > 0);
        while (nob > 0 && pg_count > 0) {
                char *ptr = cfs_kmap(pga[i]->pg);
                int off = pga[i]->off & ~CFS_PAGE_MASK;
                /* only bytes actually transferred count toward the sum */
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 &&
                    OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
                        memcpy(ptr + off, "bad1", min(4, nob));
                cksum = crc32_le(cksum, ptr + off, count);
                cfs_kunmap(pga[i]->pg);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
                               off, cksum);

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}
936
/* Build a BRW (bulk read/write) RPC for 'page_count' pages in 'pga':
 * allocate the request (from the pre-allocated pool for writes, so
 * writeback can make progress under memory pressure), attach a bulk
 * descriptor, fill in the ost_body / ioobj / niobuf_remote buffers,
 * merging contiguous pages into single niobufs, and stash the async
 * args used later by osc_brw_fini_request / brw_interpret.
 *
 * On success *reqp is set and 0 is returned; the caller owns the
 * request.  Returns -ENOMEM on allocation failure. */
static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga, 
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int size[5] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        int niocount, i, requested_nob, opc, rc;
        struct ptlrpc_request_pool *pool;
        struct lustre_capa      *capa;
        struct osc_brw_async_args *aa;

        ENTRY;
        OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM); /* Recoverable */
        OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ2, -EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                pool = cli->cl_import->imp_rq_pool;
        } else {
                opc = OST_READ;
                pool = NULL;
        }

        /* count the niobufs needed: adjacent mergeable pages share one */
        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        size[REQ_REC_OFF + 1] = sizeof(*ioobj);
        size[REQ_REC_OFF + 2] = niocount * sizeof(*niobuf);
        if (ocapa)
                size[REQ_REC_OFF + 3] = sizeof(*capa);

        req = ptlrpc_prep_req_pool(cli->cl_import, LUSTRE_OST_VERSION, opc, 5,
                                   size, NULL, pool, NULL);
        if (req == NULL)
                RETURN (-ENOMEM);

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */

        /* for a write the server GETs the data from us; for a read it
         * PUTs the data into our pages */
        if (opc == OST_WRITE)
                desc = ptlrpc_prep_bulk_imp (req, page_count,
                                             BULK_GET_SOURCE, OST_BULK_PORTAL);
        else
                desc = ptlrpc_prep_bulk_imp (req, page_count,
                                             BULK_PUT_SINK, OST_BULK_PORTAL);
        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        ioobj = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioobj));
        niobuf = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                                niocount * sizeof(*niobuf));

        body->oa = *oa;

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        if (ocapa) {
                capa = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 3,
                                      sizeof(*capa));
                capa_cpy(capa, ocapa);
                body->oa.o_valid |= OBD_MD_FLOSSCAPA;
        }

        /* register every page with the bulk descriptor and build the
         * niobuf array, merging contiguous pages as we go */
        LASSERT (page_count > 0);
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                /* NOTE(review): pga[i - 1] is read before the i == 0
                 * guard in the LASSERTFs below; only dereferenced when
                 * i > 0, so this is safe but subtle */
                struct brw_page *pg_prev = pga[i - 1];

                LASSERT(pg->count > 0);
                /* a page fragment must not cross a page boundary */
                LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
                         "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
                         pg->off, pg->count);
#ifdef __LINUX__
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
#else
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u\n", i, page_count);
#endif
                /* all pages in one brw must agree on server-side locking */
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
                                      pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        /* extend the previous niobuf instead of a new one */
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len    = pg->count;
                        niobuf->flags  = pg->flag;
                }
        }

        /* we must have filled exactly the niocount niobufs we sized for */
        LASSERT((void *)(niobuf - niocount) ==
                lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                               niocount * sizeof(*niobuf)));
        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (unlikely(cli->cl_checksum)) {
                        body->oa.o_valid |= OBD_MD_FLCKSUM;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM;
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                size[REPLY_REC_OFF + 1] = sizeof(__u32) * niocount;
                ptlrpc_req_set_repsize(req, 3, size);
        } else {
                if (unlikely(cli->cl_checksum))
                        body->oa.o_valid |= OBD_MD_FLCKSUM;
                /* 1 RC for the whole I/O */
                ptlrpc_req_set_repsize(req, 2, size);
        }

        /* async args live inside the request; used by the interpret
         * callback and by osc_brw_fini_request */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_brw_async_args *)&req->rq_async_args;
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        INIT_LIST_HEAD(&aa->aa_oaps);

        *reqp = req;
        RETURN (0);

 out:
        ptlrpc_req_finished (req);
        RETURN (rc);
}
1096
/* Compare the client's write checksum with the one the server computed.
 * Returns 0 if they match.  On mismatch, re-checksum the local pages to
 * diagnose where the corruption happened (client memory after the
 * original checksum, in transit, or both), log it, and return 1 so the
 * caller can retry the write. */
static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum,
                                int nob, obd_count page_count,
                                struct brw_page **pga)
{
        __u32 new_cksum;
        char *msg;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                return 0;
        }

        /* recompute over the same pages to localize the corruption */
        new_cksum = osc_checksum_bulk(nob, page_count, pga);

        if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it - "
                      "likely false positive due to mmap IO (bug 11742)";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original - "
                      "likely false positive due to mmap IO (bug 11742)";

        LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
                           LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
                           "["LPU64"-"LPU64"]\n",
                           msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_generation : 
                                                        (__u64)0,
                           oa->o_id,
                           oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
                           pga[0]->off,
                           pga[page_count-1]->off + pga[page_count-1]->count - 1);
        CERROR("original client csum %x, server csum %x, client csum now %x\n",
               client_cksum, server_cksum, new_cksum);
        return 1;        
}
1136
1137 /* Note rc enters this function as number of bytes transferred */
1138 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1139 {
1140         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1141         const lnet_process_id_t *peer =
1142                         &req->rq_import->imp_connection->c_peer;
1143         struct client_obd *cli = aa->aa_cli;
1144         struct ost_body *body;
1145         __u32 client_cksum = 0;
1146         ENTRY;
1147
1148         if (rc < 0 && rc != -EDQUOT)
1149                 RETURN(rc);
1150
1151         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1152         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1153                                   lustre_swab_ost_body);
1154         if (body == NULL) {
1155                 CERROR ("Can't unpack body\n");
1156                 RETURN(-EPROTO);
1157         }
1158
1159         /* set/clear over quota flag for a uid/gid */
1160         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1161             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1162                 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1163                              body->oa.o_gid, body->oa.o_valid,
1164                              body->oa.o_flags);
1165
1166         if (rc < 0)
1167                 RETURN(rc);
1168
1169         if (unlikely(aa->aa_oa->o_valid & OBD_MD_FLCKSUM))
1170                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1171
1172         osc_update_grant(cli, body);
1173
1174         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1175                 if (rc > 0) {
1176                         CERROR ("Unexpected +ve rc %d\n", rc);
1177                         RETURN(-EPROTO);
1178                 }
1179                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1180
1181                 if (unlikely((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) &&
1182                              client_cksum &&
1183                              check_write_checksum(&body->oa, peer, client_cksum,
1184                                                   body->oa.o_cksum,
1185                                                   aa->aa_requested_nob,
1186                                                   aa->aa_page_count,
1187                                                   aa->aa_ppga)))
1188                         RETURN(-EAGAIN);
1189
1190                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1191                         RETURN(-EAGAIN);
1192
1193                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1194                                      aa->aa_page_count, aa->aa_ppga);
1195                 GOTO(out, rc);
1196         }
1197
1198         /* The rest of this function executes only for OST_READs */
1199         if (rc > aa->aa_requested_nob) {
1200                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1201                        aa->aa_requested_nob);
1202                 RETURN(-EPROTO);
1203         }
1204
1205         if (rc != req->rq_bulk->bd_nob_transferred) {
1206                 CERROR ("Unexpected rc %d (%d transferred)\n",
1207                         rc, req->rq_bulk->bd_nob_transferred);
1208                 return (-EPROTO);
1209         }
1210
1211         if (rc < aa->aa_requested_nob)
1212                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1213
1214         if (sptlrpc_cli_unwrap_bulk_read(req, rc, aa->aa_page_count,
1215                                          aa->aa_ppga))
1216                 GOTO(out, rc = -EAGAIN);
1217
1218         if (unlikely(body->oa.o_valid & OBD_MD_FLCKSUM)) {
1219                 static int cksum_counter;
1220                 __u32      server_cksum = body->oa.o_cksum;
1221                 char      *via;
1222                 char      *router;
1223
1224                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1225                                                  aa->aa_ppga);
1226
1227                 if (peer->nid == req->rq_bulk->bd_sender) {
1228                         via = router = "";
1229                 } else {
1230                         via = " via ";
1231                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1232                 }
1233
1234                 if (server_cksum == ~0 && rc > 0) {
1235                         CERROR("Protocol error: server %s set the 'checksum' "
1236                                "bit, but didn't send a checksum.  Not fatal, "
1237                                "but please tell CFS.\n",
1238                                libcfs_nid2str(peer->nid));
1239                 } else if (server_cksum != client_cksum) {
1240                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1241                                            "%s%s%s inum "LPU64"/"LPU64" object "
1242                                            LPU64"/"LPU64" extent "
1243                                            "["LPU64"-"LPU64"]\n",
1244                                            req->rq_import->imp_obd->obd_name,
1245                                            libcfs_nid2str(peer->nid),
1246                                            via, router,
1247                                            body->oa.o_valid & OBD_MD_FLFID ?
1248                                                 body->oa.o_fid : (__u64)0,
1249                                            body->oa.o_valid & OBD_MD_FLFID ?
1250                                                 body->oa.o_generation :(__u64)0,
1251                                            body->oa.o_id,
1252                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1253                                                 body->oa.o_gr : (__u64)0,
1254                                            aa->aa_ppga[0]->off,
1255                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1256                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1257                                                                         1);
1258                         CERROR("client %x, server %x\n",
1259                                client_cksum, server_cksum);
1260                         cksum_counter = 0;
1261                         aa->aa_oa->o_cksum = client_cksum;
1262                         rc = -EAGAIN;
1263                 } else {
1264                         cksum_counter++;
1265                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1266                         rc = 0;
1267                 }
1268         } else if (unlikely(client_cksum)) {
1269                 static int cksum_missed;
1270
1271                 cksum_missed++;
1272                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1273                         CERROR("Checksum %u requested from %s but not sent\n",
1274                                cksum_missed, libcfs_nid2str(peer->nid));
1275         } else {
1276                 rc = 0;
1277         }
1278 out:
1279         if (rc >= 0)
1280                 *aa->aa_oa = body->oa;
1281
1282         RETURN(rc);
1283 }
1284
/* Issue one synchronous BRW RPC and wait for it, retrying from scratch
 * on bulk timeouts and on recoverable completion errors (e.g. checksum
 * mismatch => -EAGAIN).  Each recoverable retry sleeps an increasing
 * number of seconds; after too many resends, give up with -EIO. */
static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
                            struct lov_stripe_md *lsm,
                            obd_count page_count, struct brw_page **pga,
                            struct obd_capa *ocapa)
{
        struct ptlrpc_request *req;
        int                    rc;
        cfs_waitq_t            waitq;
        int                    resends = 0;
        struct l_wait_info     lwi;

        ENTRY;

        /* private waitqueue: nothing signals it, it only provides the
         * timeout sleep between resend attempts below */
        cfs_waitq_init(&waitq);

restart_bulk:
        rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
                                  page_count, pga, &req, ocapa);
        if (rc != 0)
                return (rc);

        rc = ptlrpc_queue_wait(req);

        /* a bulk timeout with resend permission restarts immediately */
        if (rc == -ETIMEDOUT && req->rq_resend) {
                DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
                ptlrpc_req_finished(req);
                goto restart_bulk;
        }

        rc = osc_brw_fini_request(req, rc);

        ptlrpc_req_finished(req);
        if (osc_recoverable_error(rc)) {
                resends++;
                if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
                        CERROR("too many resend retries, returning error\n");
                        RETURN(-EIO);
                }

                /* back off: sleep 'resends' seconds before retrying */
                lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
                l_wait_event(waitq, 0, &lwi);

                goto restart_bulk;
        }
        
        RETURN (rc);
}
1332
/* Rebuild and requeue an async BRW request that failed with a
 * recoverable error.  A brand-new request is prepared from the saved
 * async args; the oap list and page array migrate from the old request
 * to the new one, and each oap's request reference is switched over
 * under cl_loi_list_lock.  Returns 0 once the new request is added to
 * the original request set, -EINTR if any oap was interrupted, or -EIO
 * after too many resends. */
int osc_brw_redo_request(struct ptlrpc_request *request,
                         struct osc_brw_async_args *aa)
{
        struct ptlrpc_request *new_req;
        struct ptlrpc_request_set *set = request->rq_set;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
                CERROR("too many resend retries, returning error\n");
                RETURN(-EIO);
        }
        
        DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
/*
        body = lustre_msg_buf(request->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
                ocapa = lustre_unpack_capa(request->rq_reqmsg,
                                           REQ_REC_OFF + 3);
*/
        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                        OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa,
                                  NULL /* lsm unused by osc currently */,
                                  aa->aa_page_count, aa->aa_ppga, 
                                  &new_req, NULL /* ocapa */);
        if (rc)
                RETURN(rc);

        client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
   
        /* abort the redo if any page's I/O was interrupted meanwhile */
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(request == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 request, oap->oap_request);
                        if (oap->oap_interrupted) {
                                client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
                                ptlrpc_req_finished(new_req);                        
                                RETURN(-EINTR);
                        }
                }
        }
        /* New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it... */
        aa->aa_resends++;
        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
        /* delay the resend by aa_resends seconds */
        new_req->rq_sent = CURRENT_SECONDS + aa->aa_resends;

        new_aa = (struct osc_brw_async_args *)&new_req->rq_async_args;

        /* splice the oaps onto the new request's list (a plain struct
         * copy of a list_head would leave dangling pointers) */
        INIT_LIST_HEAD(&new_aa->aa_oaps);
        list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
        INIT_LIST_HEAD(&aa->aa_oaps);

        /* repoint each oap at the new request, dropping the old ref */
        list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);
                }
        }
        client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);

        DEBUG_REQ(D_INFO, new_req, "new request");

        ptlrpc_set_add_req(set, new_req);

        RETURN(0);
}
1405
/* Interpret callback for async BRW requests queued by async_internal().
 * On a recoverable error the request is redone (and this completion
 * swallowed); otherwise the byte count is accumulated into the set's
 * counter, the write grant consumed up-front is released, and the ppga
 * index array is freed. */
static int brw_interpret(struct ptlrpc_request *req, void *data, int rc)
{
        struct osc_brw_async_args *aa = data;
        int                        i;
        /* rc arrives as bytes transferred; save it before fini clobbers it */
        int                        nob = rc;
        ENTRY;

        rc = osc_brw_fini_request(req, rc);
        if (osc_recoverable_error(rc)) {
                rc = osc_brw_redo_request(req, aa);
                /* redo succeeded: the new request owns pga/oaps now */
                if (rc == 0)
                        RETURN(0);
        }
        if ((rc >= 0) && req->rq_set && req->rq_set->set_countp)
                atomic_add(nob, (atomic_t *)req->rq_set->set_countp);

        /* release the grant consumed for these pages in async_internal */
        spin_lock(&aa->aa_cli->cl_loi_list_lock);
        for (i = 0; i < aa->aa_page_count; i++)
                osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
        spin_unlock(&aa->aa_cli->cl_loi_list_lock);

        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);

        RETURN(rc);
}
1431
/* Queue an asynchronous BRW on 'set'.  Write grant is consumed for each
 * page before the request is built and released again on failure here
 * (or in brw_interpret on completion). */
static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
                          struct lov_stripe_md *lsm, obd_count page_count,
                          struct brw_page **pga, struct ptlrpc_request_set *set,
                          struct obd_capa *ocapa)
{
        struct ptlrpc_request     *req;
        struct client_obd         *cli = &exp->exp_obd->u.cli;
        int                        rc, i;
        ENTRY;

        /* Consume write credits even if doing a sync write -
         * otherwise we may run out of space on OST due to grant. */
        if (cmd == OBD_BRW_WRITE) {
                spin_lock(&cli->cl_loi_list_lock);
                for (i = 0; i < page_count; i++) {
                        /* only consume grant if a full page's worth remains */
                        if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
                                osc_consume_write_grant(cli, pga[i]);
                }
                spin_unlock(&cli->cl_loi_list_lock);
        }

        rc = osc_brw_prep_request(cmd, cli, oa, lsm, page_count, pga,
                                  &req, ocapa);
        if (rc == 0) {
                req->rq_interpret_reply = brw_interpret;
                ptlrpc_set_add_req(set, req);
        } else if (cmd == OBD_BRW_WRITE) {
                /* prep failed: give back the grant consumed above */
                spin_lock(&cli->cl_loi_list_lock);
                for (i = 0; i < page_count; i++)
                        osc_release_write_grant(cli, pga[i], 0);
                spin_unlock(&cli->cl_loi_list_lock);
        }
        RETURN (rc);
}
1466
1467 /*
1468  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1469  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1470  * fine for our small page arrays and doesn't require allocation.  its an
1471  * insertion sort that swaps elements that are strides apart, shrinking the
1472  * stride down until its '1' and the array is sorted.
1473  */
1474 static void sort_brw_pages(struct brw_page **array, int num)
1475 {
1476         int stride, i, j;
1477         struct brw_page *tmp;
1478
1479         if (num == 1)
1480                 return;
1481         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1482                 ;
1483
1484         do {
1485                 stride /= 3;
1486                 for (i = stride ; i < num ; i++) {
1487                         tmp = array[i];
1488                         j = i;
1489                         while (j >= stride && array[j - stride]->off > tmp->off) {
1490                                 array[j] = array[j - stride];
1491                                 j -= stride;
1492                         }
1493                         array[j] = tmp;
1494                 }
1495         } while (stride > 1);
1496 }
1497
1498 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1499 {
1500         int count = 1;
1501         int offset;
1502         int i = 0;
1503
1504         LASSERT (pages > 0);
1505         offset = pg[i]->off & ~CFS_PAGE_MASK;
1506
1507         for (;;) {
1508                 pages--;
1509                 if (pages == 0)         /* that's all */
1510                         return count;
1511
1512                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1513                         return count;   /* doesn't end on page boundary */
1514
1515                 i++;
1516                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1517                 if (offset != 0)        /* doesn't start on page boundary */
1518                         return count;
1519
1520                 count++;
1521         }
1522 }
1523
1524 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1525 {
1526         struct brw_page **ppga;
1527         int i;
1528
1529         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1530         if (ppga == NULL)
1531                 return NULL;
1532
1533         for (i = 0; i < count; i++)
1534                 ppga[i] = pga + i;
1535         return ppga;
1536 }
1537
/* Free a pointer array allocated by osc_build_ppga(); 'count' must be
 * the original allocation count, not the possibly-advanced remainder. */
static void osc_release_ppga(struct brw_page **ppga, obd_count count)
{
        LASSERT(ppga != NULL);
        OBD_FREE(ppga, sizeof(*ppga) * count);
}
1543
1544 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1545                    obd_count page_count, struct brw_page *pga,
1546                    struct obd_trans_info *oti)
1547 {
1548         struct obdo *saved_oa = NULL;
1549         struct brw_page **ppga, **orig;
1550         struct obd_import *imp = class_exp2cliimp(exp);
1551         struct client_obd *cli = &imp->imp_obd->u.cli;
1552         int rc, page_count_orig;
1553         ENTRY;
1554
1555         if (cmd & OBD_BRW_CHECK) {
1556                 /* The caller just wants to know if there's a chance that this
1557                  * I/O can succeed */
1558
1559                 if (imp == NULL || imp->imp_invalid)
1560                         RETURN(-EIO);
1561                 RETURN(0);
1562         }
1563
1564         /* test_brw with a failed create can trip this, maybe others. */
1565         LASSERT(cli->cl_max_pages_per_rpc);
1566
1567         rc = 0;
1568
1569         orig = ppga = osc_build_ppga(pga, page_count);
1570         if (ppga == NULL)
1571                 RETURN(-ENOMEM);
1572         page_count_orig = page_count;
1573
1574         sort_brw_pages(ppga, page_count);
1575         while (page_count) {
1576                 obd_count pages_per_brw;
1577
1578                 if (page_count > cli->cl_max_pages_per_rpc)
1579                         pages_per_brw = cli->cl_max_pages_per_rpc;
1580                 else
1581                         pages_per_brw = page_count;
1582
1583                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1584
1585                 if (saved_oa != NULL) {
1586                         /* restore previously saved oa */
1587                         *oinfo->oi_oa = *saved_oa;
1588                 } else if (page_count > pages_per_brw) {
1589                         /* save a copy of oa (brw will clobber it) */
1590                         OBDO_ALLOC(saved_oa);
1591                         if (saved_oa == NULL)
1592                                 GOTO(out, rc = -ENOMEM);
1593                         *saved_oa = *oinfo->oi_oa;
1594                 }
1595
1596                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1597                                       pages_per_brw, ppga, oinfo->oi_capa);
1598
1599                 if (rc != 0)
1600                         break;
1601
1602                 page_count -= pages_per_brw;
1603                 ppga += pages_per_brw;
1604         }
1605
1606 out:
1607         osc_release_ppga(orig, page_count_orig);
1608
1609         if (saved_oa != NULL)
1610                 OBDO_FREE(saved_oa);
1611
1612         RETURN(rc);
1613 }
1614
/* Asynchronous counterpart of osc_brw(): instead of blocking, queue one or
 * more brw RPCs on @set and return.  Ownership of the page-pointer arrays
 * is subtle: per-chunk copies (and, when a single RPC covers everything,
 * the original ppga itself) are handed to async_internal(), which becomes
 * responsible for freeing them; anything not handed off is freed here. */
static int osc_brw_async(int cmd, struct obd_export *exp,
                         struct obd_info *oinfo, obd_count page_count,
                         struct brw_page *pga, struct obd_trans_info *oti,
                         struct ptlrpc_request_set *set)
{
        struct brw_page **ppga, **orig;
        struct client_obd *cli = &exp->exp_obd->u.cli;
        int page_count_orig;
        int rc = 0;
        ENTRY;

        if (cmd & OBD_BRW_CHECK) {
                struct obd_import *imp = class_exp2cliimp(exp);
                /* The caller just wants to know if there's a chance that this
                 * I/O can succeed */

                if (imp == NULL || imp->imp_invalid)
                        RETURN(-EIO);
                RETURN(0);
        }

        orig = ppga = osc_build_ppga(pga, page_count);
        if (ppga == NULL)
                RETURN(-ENOMEM);
        page_count_orig = page_count;

        sort_brw_pages(ppga, page_count);
        while (page_count) {
                struct brw_page **copy;
                obd_count pages_per_brw;

                pages_per_brw = min_t(obd_count, page_count,
                                      cli->cl_max_pages_per_rpc);

                pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);

                /* use ppga only if single RPC is going to fly */
                if (pages_per_brw != page_count_orig || ppga != orig) {
                        /* multiple RPCs (or an advanced cursor): give this
                         * chunk its own array so each RPC owns its pages */
                        OBD_ALLOC(copy, sizeof(*copy) * pages_per_brw);
                        if (copy == NULL)
                                GOTO(out, rc = -ENOMEM);
                        memcpy(copy, ppga, sizeof(*copy) * pages_per_brw);
                } else
                        copy = ppga;

                rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
                                    pages_per_brw, copy, set, oinfo->oi_capa);

                if (rc != 0) {
                        /* chunk was never queued: free our copy; the
                         * original array is released at out: below */
                        if (copy != ppga)
                                OBD_FREE(copy, sizeof(*copy) * pages_per_brw);
                        break;
                }
                if (copy == orig) {
                        /* we passed it to async_internal() which is
                         * now responsible for releasing memory */
                        orig = NULL;
                }

                page_count -= pages_per_brw;
                ppga += pages_per_brw;
        }
out:
        if (orig)
                osc_release_ppga(orig, page_count_orig);
        RETURN(rc);
}
1682
1683 static void osc_check_rpcs(struct client_obd *cli);
1684
/* The companion to osc_enter_cache(), called when @oap is no longer part of
 * the dirty accounting.  Writeback completes or truncate happens before
 * writing starts.  Must be called with the loi lock held. */
static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
                           int sent)
{
        /* hand the page's write grant back to the client-wide pool */
        osc_release_write_grant(cli, &oap->oap_brw_page, sent);
}
1693
1694
1695 /* This maintains the lists of pending pages to read/write for a given object
1696  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1697  * to quickly find objects that are ready to send an RPC. */
1698 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1699                          int cmd)
1700 {
1701         int optimal;
1702         ENTRY;
1703
1704         if (lop->lop_num_pending == 0)
1705                 RETURN(0);
1706
1707         /* if we have an invalid import we want to drain the queued pages
1708          * by forcing them through rpcs that immediately fail and complete
1709          * the pages.  recovery relies on this to empty the queued pages
1710          * before canceling the locks and evicting down the llite pages */
1711         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1712                 RETURN(1);
1713
1714         /* stream rpcs in queue order as long as as there is an urgent page
1715          * queued.  this is our cheap solution for good batching in the case
1716          * where writepage marks some random page in the middle of the file
1717          * as urgent because of, say, memory pressure */
1718         if (!list_empty(&lop->lop_urgent)) {
1719                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1720                 RETURN(1);
1721         }
1722         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1723         optimal = cli->cl_max_pages_per_rpc;
1724         if (cmd & OBD_BRW_WRITE) {
1725                 /* trigger a write rpc stream as long as there are dirtiers
1726                  * waiting for space.  as they're waiting, they're not going to
1727                  * create more pages to coallesce with what's waiting.. */
1728                 if (!list_empty(&cli->cl_cache_waiters)) {
1729                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1730                         RETURN(1);
1731                 }
1732                 /* +16 to avoid triggering rpcs that would want to include pages
1733                  * that are being queued but which can't be made ready until
1734                  * the queuer finishes with the page. this is a wart for
1735                  * llite::commit_write() */
1736                 optimal += 16;
1737         }
1738         if (lop->lop_num_pending >= optimal)
1739                 RETURN(1);
1740
1741         RETURN(0);
1742 }
1743
/* Reconcile @item's membership in @list with the desired state: add it when
 * it should be on the list but isn't, remove it when the reverse holds. */
static void on_list(struct list_head *item, struct list_head *list,
                    int should_be_on)
{
        int currently_on = !list_empty(item);

        if (should_be_on && !currently_on)
                list_add_tail(item, list);
        else if (currently_on && !should_be_on)
                list_del_init(item);
}
1752
/* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
 * can find pages to build into rpcs quickly */
static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
{
        /* ready list: object has enough (or urgent enough) pages queued in
         * either direction to justify firing an RPC right now */
        on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
                lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
                lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));

        /* write list: any pending write pages at all */
        on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
                loi->loi_write_lop.lop_num_pending);

        /* read list: any pending read pages at all */
        on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
                loi->loi_read_lop.lop_num_pending);
}
1767
1768 static void lop_update_pending(struct client_obd *cli,
1769                                struct loi_oap_pages *lop, int cmd, int delta)
1770 {
1771         lop->lop_num_pending += delta;
1772         if (cmd & OBD_BRW_WRITE)
1773                 cli->cl_pending_w_pages += delta;
1774         else
1775                 cli->cl_pending_r_pages += delta;
1776 }
1777
/* this is called when a sync waiter receives an interruption.  Its job is to
 * get the caller woken as soon as possible.  If its page hasn't been put in an
 * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
 * desiring interruption which will forcefully complete the rpc once the rpc
 * has timed out */
static void osc_occ_interrupted(struct oig_callback_context *occ)
{
        struct osc_async_page *oap;
        struct loi_oap_pages *lop;
        struct lov_oinfo *loi;
        ENTRY;

        /* XXX member_of() */
        oap = list_entry(occ, struct osc_async_page, oap_occ);

        /* all of the state below is covered by the loi list lock */
        client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);

        oap->oap_interrupted = 1;

        /* ok, it's been put in an rpc. only one oap gets a request reference */
        if (oap->oap_request != NULL) {
                ptlrpc_mark_interrupted(oap->oap_request);
                ptlrpcd_wake(oap->oap_request);
                GOTO(unlock, 0);
        }

        /* we don't get interruption callbacks until osc_trigger_group_io()
         * has been called and put the sync oaps in the pending/urgent lists.*/
        if (!list_empty(&oap->oap_pending_item)) {
                /* not in an rpc yet: dequeue immediately and complete the
                 * group member with -EINTR so the waiter can wake */
                list_del_init(&oap->oap_pending_item);
                list_del_init(&oap->oap_urgent_item);

                loi = oap->oap_loi;
                lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
                        &loi->loi_write_lop : &loi->loi_read_lop;
                lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
                loi_list_maint(oap->oap_cli, oap->oap_loi);

                oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
                oap->oap_oig = NULL;
        }

unlock:
        client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
}
1823
1824 /* this is trying to propogate async writeback errors back up to the
1825  * application.  As an async write fails we record the error code for later if
1826  * the app does an fsync.  As long as errors persist we force future rpcs to be
1827  * sync so that the app can get a sync error and break the cycle of queueing
1828  * pages for which writeback will fail. */
1829 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
1830                            int rc)
1831 {
1832         if (rc) {
1833                 if (!ar->ar_rc)
1834                         ar->ar_rc = rc;
1835
1836                 ar->ar_force_sync = 1;
1837                 ar->ar_min_xid = ptlrpc_sample_next_xid();
1838                 return;
1839
1840         }
1841
1842         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
1843                 ar->ar_force_sync = 0;
1844 }
1845
1846 static void osc_oap_to_pending(struct osc_async_page *oap)
1847 {
1848         struct loi_oap_pages *lop;
1849
1850         if (oap->oap_cmd & OBD_BRW_WRITE)
1851                 lop = &oap->oap_loi->loi_write_lop;
1852         else
1853                 lop = &oap->oap_loi->loi_read_lop;
1854
1855         if (oap->oap_async_flags & ASYNC_URGENT)
1856                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1857         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1858         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
1859 }
1860
/* this must be called holding the loi list lock to give coverage to exit_cache,
 * async_flag maintenance, and oap_request.  Finishes one async page: drops
 * the request reference, updates async-error state for writes, refreshes the
 * cached lvb attributes from @oa on success, and either completes the group
 * I/O member or calls back into the upper layer (llite). */
static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
                              struct osc_async_page *oap, int sent, int rc)
{
        __u64 xid = 0;

        ENTRY;
        if (oap->oap_request != NULL) {
                /* capture the xid before dropping our request reference;
                 * osc_process_ar() needs it below */
                xid = ptlrpc_req_xid(oap->oap_request);
                ptlrpc_req_finished(oap->oap_request);
                oap->oap_request = NULL;
        }

        oap->oap_async_flags = 0;
        oap->oap_interrupted = 0;

        if (oap->oap_cmd & OBD_BRW_WRITE) {
                /* record/clear async write errors per-client and per-object */
                osc_process_ar(&cli->cl_ar, xid, rc);
                osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
        }

        if (rc == 0 && oa != NULL) {
                /* cache whatever attributes the reply carried in the lvb */
                if (oa->o_valid & OBD_MD_FLBLOCKS)
                        oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
                if (oa->o_valid & OBD_MD_FLMTIME)
                        oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
                if (oa->o_valid & OBD_MD_FLATIME)
                        oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
                if (oa->o_valid & OBD_MD_FLCTIME)
                        oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
        }

        if (oap->oap_oig) {
                /* group (sync) I/O: complete this member and we're done */
                osc_exit_cache(cli, oap, sent);
                oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
                oap->oap_oig = NULL;
                EXIT;
                return;
        }

        rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
                                                oap->oap_cmd, oa, rc);

        /* ll_ap_completion (from llite) drops PG_locked. so, a new
         * I/O on the page could start, but OSC calls it under lock
         * and thus we can add oap back to pending safely */
        if (rc)
                /* upper layer wants to leave the page on pending queue */
                osc_oap_to_pending(oap);
        else
                osc_exit_cache(cli, oap, sent);
        EXIT;
}
1915
/* ptlrpc interpret callback for async brw RPCs built from oap lists.
 * Retries recoverable errors, then (under the loi list lock) drops the
 * in-flight count, completes every oap in the request, wakes cache waiters
 * and kicks off any newly-possible RPCs before freeing the request's
 * obdo and page array. */
static int brw_interpret_oap(struct ptlrpc_request *req, void *data, int rc)
{
        struct osc_async_page *oap, *tmp;
        struct osc_brw_async_args *aa = data;
        struct client_obd *cli;
        ENTRY;

        rc = osc_brw_fini_request(req, rc);
        CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
        if (osc_recoverable_error(rc)) {
                /* resend succeeded: the new request now owns aa, so do not
                 * complete or free anything here */
                rc = osc_brw_redo_request(req, aa);
                if (rc == 0)
                        RETURN(0);
        }

        cli = aa->aa_cli;

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
                cli->cl_w_in_flight--;
        else
                cli->cl_r_in_flight--;

        /* the caller may re-use the oap after the completion call so
         * we need to clean it up a little */
        list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
                list_del_init(&oap->oap_rpc_item);
                osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
        }

        osc_wake_cache_waiters(cli);
        osc_check_rpcs(cli);

        client_obd_list_unlock(&cli->cl_loi_list_lock);

        OBDO_FREE(aa->aa_oa);

        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
        RETURN(rc);
}
1960
/* Build one brw ptlrpc request from the oaps on @rpc_list.  On success the
 * oaps are spliced onto the request's async args (aa_oaps) and @rpc_list is
 * left empty; on failure an ERR_PTR is returned and the list is untouched.
 * The obdo and pga array become part of the request's bookkeeping and are
 * released by the interpret callback. */
static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
                                            struct list_head *rpc_list,
                                            int page_count, int cmd)
{
        struct ptlrpc_request *req;
        struct brw_page **pga = NULL;
        struct osc_brw_async_args *aa;
        struct obdo *oa = NULL;
        struct obd_async_page_ops *ops = NULL;
        void *caller_data = NULL;
        struct obd_capa *ocapa;
        struct osc_async_page *oap;
        int i, rc;

        ENTRY;
        LASSERT(!list_empty(rpc_list));

        OBD_ALLOC(pga, sizeof(*pga) * page_count);
        if (pga == NULL)
                RETURN(ERR_PTR(-ENOMEM));

        OBDO_ALLOC(oa);
        if (oa == NULL)
                GOTO(out, req = ERR_PTR(-ENOMEM));

        /* gather the brw_pages and remember the first oap's ops/data; all
         * oaps in one rpc share the same caller (llite) context */
        i = 0;
        list_for_each_entry(oap, rpc_list, oap_rpc_item) {
                if (ops == NULL) {
                        ops = oap->oap_caller_ops;
                        caller_data = oap->oap_caller_data;
                }
                pga[i] = &oap->oap_brw_page;
                pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
                CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
                       pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
                i++;
        }

        /* always get the data for the obdo for the rpc */
        LASSERT(ops != NULL);
        ops->ap_fill_obdo(caller_data, cmd, oa);
        ocapa = ops->ap_lookup_capa(caller_data, cmd);

        sort_brw_pages(pga, page_count);
        rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
                                  pga, &req, ocapa);
        capa_put(ocapa);
        if (rc != 0) {
                CERROR("prep_req failed: %d\n", rc);
                GOTO(out, req = ERR_PTR(rc));
        }

        /* Need to update the timestamps after the request is built in case
         * we race with setattr (locally or in queue at OST).  If OST gets
         * later setattr before earlier BRW (as determined by the request xid),
         * the OST will not use BRW timestamps.  Sadly, there is no obvious
         * way to do this in a single call.  bug 10150 */
        ops->ap_update_obdo(caller_data, cmd, oa,
                            OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);

        /* move the oaps onto the request so the interpret callback can
         * complete them */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_brw_async_args *)&req->rq_async_args;
        INIT_LIST_HEAD(&aa->aa_oaps);
        list_splice(rpc_list, &aa->aa_oaps);
        INIT_LIST_HEAD(rpc_list);

out:
        /* on failure we own oa and pga; on success the request does */
        if (IS_ERR(req)) {
                if (oa)
                        OBDO_FREE(oa);
                if (pga)
                        OBD_FREE(pga, sizeof(*pga) * page_count);
        }
        RETURN(req);
}
2036
/* the loi lock is held across this function but it's allowed to release
 * and reacquire it during its work.
 *
 * Pull ready pages off @lop's pending list, build one brw RPC, hand it to
 * ptlrpcd and return 1; return 0 if no pages could be gathered, or a
 * negative errno if building the request failed.  Called (and returns)
 * with cl_loi_list_lock held. */
static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
                            int cmd, struct loi_oap_pages *lop)
{
        struct ptlrpc_request *req;
        obd_count page_count = 0;
        struct osc_async_page *oap = NULL, *tmp;
        struct osc_brw_async_args *aa;
        struct obd_async_page_ops *ops;
        CFS_LIST_HEAD(rpc_list);
        unsigned int ending_offset;
        unsigned  starting_offset = 0;
        ENTRY;

        /* first we find the pages we're allowed to work with */
        list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
                                 oap_pending_item) {
                ops = oap->oap_caller_ops;

                LASSERT(oap->oap_magic == OAP_MAGIC);

                /* in llite being 'ready' equates to the page being locked
                 * until completion unlocks it.  commit_write submits a page
                 * as not ready because its unlock will happen unconditionally
                 * as the call returns.  if we race with commit_write giving
                 * us that page we dont' want to create a hole in the page
                 * stream, so we stop and leave the rpc to be fired by
                 * another dirtier or kupdated interval (the not ready page
                 * will still be on the dirty list).  we could call in
                 * at the end of ll_file_write to process the queue again. */
                if (!(oap->oap_async_flags & ASYNC_READY)) {
                        int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
                        if (rc < 0)
                                CDEBUG(D_INODE, "oap %p page %p returned %d "
                                                "instead of ready\n", oap,
                                                oap->oap_page, rc);
                        switch (rc) {
                        case -EAGAIN:
                                /* llite is telling us that the page is still
                                 * in commit_write and that we should try
                                 * and put it in an rpc again later.  we
                                 * break out of the loop so we don't create
                                 * a hole in the sequence of pages in the rpc
                                 * stream.*/
                                oap = NULL;
                                break;
                        case -EINTR:
                                /* the io isn't needed.. tell the checks
                                 * below to complete the rpc with EINTR */
                                oap->oap_async_flags |= ASYNC_COUNT_STABLE;
                                oap->oap_count = -EINTR;
                                break;
                        case 0:
                                oap->oap_async_flags |= ASYNC_READY;
                                break;
                        default:
                                LASSERTF(0, "oap %p page %p returned %d "
                                            "from make_ready\n", oap,
                                            oap->oap_page, rc);
                                break;
                        }
                }
                if (oap == NULL)
                        break;
                /*
                 * Page submitted for IO has to be locked. Either by
                 * ->ap_make_ready() or by higher layers.
                 *
                 * XXX nikita: this assertion should be adjusted when lustre
                 * starts using PG_writeback for pages being written out.
                 */
#if defined(__KERNEL__) && defined(__LINUX__)
                LASSERT(PageLocked(oap->oap_page));
#endif
                /* If there is a gap at the start of this page, it can't merge
                 * with any previous page, so we'll hand the network a
                 * "fragmented" page array that it can't transfer in 1 RDMA */
                if (page_count != 0 && oap->oap_page_off != 0)
                        break;

                /* take the page out of our book-keeping */
                list_del_init(&oap->oap_pending_item);
                lop_update_pending(cli, lop, cmd, -1);
                list_del_init(&oap->oap_urgent_item);

                /* remember where the first page lands within the max brw
                 * window, for the offset histograms below */
                if (page_count == 0)
                        starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
                                          (PTLRPC_MAX_BRW_SIZE - 1);

                /* ask the caller for the size of the io as the rpc leaves. */
                if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
                        oap->oap_count =
                                ops->ap_refresh_count(oap->oap_caller_data,cmd);
                if (oap->oap_count <= 0) {
                        /* nothing to transfer (or -EINTR from above):
                         * complete the page immediately, outside the rpc */
                        CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
                               oap->oap_count);
                        osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
                        continue;
                }

                /* now put the page back in our accounting */
                list_add_tail(&oap->oap_rpc_item, &rpc_list);
                if (++page_count >= cli->cl_max_pages_per_rpc)
                        break;

                /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
                 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
                 * have the same alignment as the initial writes that allocated
                 * extents on the server. */
                ending_offset = (oap->oap_obj_off + oap->oap_page_off +
                                 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
                if (ending_offset == 0)
                        break;

                /* If there is a gap at the end of this page, it can't merge
                 * with any subsequent pages, so we'll hand the network a
                 * "fragmented" page array that it can't transfer in 1 RDMA */
                if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
                        break;
        }

        osc_wake_cache_waiters(cli);

        if (page_count == 0)
                RETURN(0);

        loi_list_maint(cli, loi);

        /* drop the list lock while building/accounting the request; the
         * rpc_list entries are ours alone until re-queued below */
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        req = osc_build_req(cli, &rpc_list, page_count, cmd);
        if (IS_ERR(req)) {
                /* this should happen rarely and is pretty bad, it makes the
                 * pending list not follow the dirty order */
                client_obd_list_lock(&cli->cl_loi_list_lock);
                list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
                        list_del_init(&oap->oap_rpc_item);

                        /* queued sync pages can be torn down while the pages
                         * were between the pending list and the rpc */
                        if (oap->oap_interrupted) {
                                CDEBUG(D_INODE, "oap %p interrupted\n", oap);
                                osc_ap_completion(cli, NULL, oap, 0,
                                                  oap->oap_count);
                                continue;
                        }
                        osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
                }
                loi_list_maint(cli, loi);
                RETURN(PTR_ERR(req));
        }

        aa = (struct osc_brw_async_args *)&req->rq_async_args;

        /* feed the per-client read/write histograms */
        if (cmd == OBD_BRW_READ) {
                lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
                                      (starting_offset >> CFS_PAGE_SHIFT) + 1);
                ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
        } else {
                lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_write_rpc_hist,
                                 cli->cl_w_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
                                      (starting_offset >> CFS_PAGE_SHIFT) + 1);
                ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);

        if (cmd == OBD_BRW_READ)
                cli->cl_r_in_flight++;
        else
                cli->cl_w_in_flight++;

        /* queued sync pages can be torn down while the pages
         * were between the pending list and the rpc */
        tmp = NULL;
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                /* only one oap gets a request reference */
                if (tmp == NULL)
                        tmp = oap;
                if (oap->oap_interrupted && !req->rq_intr) {
                        CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
                               oap, req);
                        ptlrpc_mark_interrupted(req);
                }
        }
        if (tmp != NULL)
                tmp->oap_request = ptlrpc_request_addref(req);

        DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
                  page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);

        req->rq_interpret_reply = brw_interpret_oap;
        ptlrpcd_add_req(req);
        RETURN(1);
}
2237
/* Dump an object's rpc-readiness state: whether it is on the ready list,
 * plus pending/urgent indicators for both the write and read queues. */
#define LOI_DEBUG(LOI, STR, args...)                                     \
        CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
               !list_empty(&(LOI)->loi_cli_item),                        \
               (LOI)->loi_write_lop.lop_num_pending,                     \
               !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
               (LOI)->loi_read_lop.lop_num_pending,                      \
               !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
               args)                                                     \
2246
2247 /* This is called by osc_check_rpcs() to find which objects have pages that
2248  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2249 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2250 {
2251         ENTRY;
2252         /* first return all objects which we already know to have
2253          * pages ready to be stuffed into rpcs */
2254         if (!list_empty(&cli->cl_loi_ready_list))
2255                 RETURN(list_entry(cli->cl_loi_ready_list.next,
2256                                   struct lov_oinfo, loi_cli_item));
2257
2258         /* then if we have cache waiters, return all objects with queued
2259          * writes.  This is especially important when many small files
2260          * have filled up the cache and not been fired into rpcs because
2261          * they don't pass the nr_pending/object threshhold */
2262         if (!list_empty(&cli->cl_cache_waiters) &&
2263             !list_empty(&cli->cl_loi_write_list))
2264                 RETURN(list_entry(cli->cl_loi_write_list.next,
2265                                   struct lov_oinfo, loi_write_item));
2266
2267         /* then return all queued objects when we have an invalid import
2268          * so that they get flushed */
2269         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2270                 if (!list_empty(&cli->cl_loi_write_list))
2271                         RETURN(list_entry(cli->cl_loi_write_list.next,
2272                                           struct lov_oinfo, loi_write_item));
2273                 if (!list_empty(&cli->cl_loi_read_list))
2274                         RETURN(list_entry(cli->cl_loi_read_list.next,
2275                                           struct lov_oinfo, loi_read_item));
2276         }
2277         RETURN(NULL);
2278 }
2279
/* Pump loop of the osc rpc engine: repeatedly pick an object from
 * osc_next_loi() and try to send read/write rpcs for it until we hit the
 * rpcs-in-flight cap, run out of candidate objects, get a hard error from
 * osc_send_oap_rpc(), or back off after too many zero-progress attempts.
 *
 * called with the loi list lock held */
static void osc_check_rpcs(struct client_obd *cli)
{
        struct lov_oinfo *loi;
        int rc = 0, race_counter = 0;
        ENTRY;

        while ((loi = osc_next_loi(cli)) != NULL) {
                LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));

                /* stop once this client is at its in-flight rpc limit */
                if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
                        break;

                /* attempt some read/write balancing by alternating between
                 * reads and writes in an object.  The makes_rpc checks here
                 * would be redundant if we were getting read/write work items
                 * instead of objects.  we don't want send_oap_rpc to drain a
                 * partial read pending queue when we're given this object to
                 * do io on writes while there are cache waiters */
                if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
                        rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
                                              &loi->loi_write_lop);
                        /* rc < 0: hard error, give up; rc > 0: an rpc went
                         * out; rc == 0: make_ready asked us to back off */
                        if (rc < 0)
                                break;
                        if (rc > 0)
                                race_counter = 0;
                        else
                                race_counter++;
                }
                if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
                        rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
                                              &loi->loi_read_lop);
                        if (rc < 0)
                                break;
                        if (rc > 0)
                                race_counter = 0;
                        else
                                race_counter++;
                }

                /* attempt some inter-object balancing by issueing rpcs
                 * for each object in turn: drop the object from whichever
                 * work lists it sits on so loi_list_maint() re-queues it
                 * behind the others */
                if (!list_empty(&loi->loi_cli_item))
                        list_del_init(&loi->loi_cli_item);
                if (!list_empty(&loi->loi_write_item))
                        list_del_init(&loi->loi_write_item);
                if (!list_empty(&loi->loi_read_item))
                        list_del_init(&loi->loi_read_item);

                loi_list_maint(cli, loi);

                /* send_oap_rpc fails with 0 when make_ready tells it to
                 * back off.  llite's make_ready does this when it tries
                 * to lock a page queued for write that is already locked.
                 * we want to try sending rpcs from many objects, but we
                 * don't want to spin failing with 0.  */
                if (race_counter == 10)
                        break;
        }
        EXIT;
}
2341
2342 /* we're trying to queue a page in the osc so we're subject to the
2343  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2344  * If the osc's queued pages are already at that limit, then we want to sleep
2345  * until there is space in the osc's queue for us.  We also may be waiting for
2346  * write credits from the OST if there are RPCs in flight that may return some
2347  * before we fall back to sync writes.
2348  *
2349  * We need this know our allocation was granted in the presence of signals */
2350 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2351 {
2352         int rc;
2353         ENTRY;
2354         client_obd_list_lock(&cli->cl_loi_list_lock);
2355         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2356         client_obd_list_unlock(&cli->cl_loi_list_lock);
2357         RETURN(rc);
2358 };
2359
/* Reserve cache space and write grant for one page before it may be
 * queued for async write.  Returns 0 with the grant consumed, -EDQUOT to
 * force the caller into sync i/o, or -EINTR if the wait was interrupted.
 *
 * Caller must hold loi_list_lock - we drop/regain it if we need to wait for
 * grant or cache space. */
static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
                           struct osc_async_page *oap)
{
        struct osc_cache_waiter ocw;
        struct l_wait_info lwi = { 0 };

        ENTRY;

        CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
               "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
               cli->cl_dirty_max, obd_max_dirty_pages,
               cli->cl_lost_grant, cli->cl_avail_grant);

        /* force the caller to try sync io.  this can jump the list
         * of queued writes and create a discontiguous rpc stream */
        if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
            loi->loi_ar.ar_force_sync)
                RETURN(-EDQUOT);

        /* Hopefully normal case - cache space and write credits available */
        if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
            (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
            (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
                /* account for ourselves */
                osc_consume_write_grant(cli, &oap->oap_brw_page);
                RETURN(0);
        }

        /* Make sure that there are write rpcs in flight to wait for.  This
         * is a little silly as this object may not have any pending but
         * other objects sure might. */
        if (cli->cl_w_in_flight) {
                list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
                cfs_waitq_init(&ocw.ocw_waitq);
                ocw.ocw_oap = oap;
                ocw.ocw_rc = 0;

                /* kick the rpc engine so in-flight writes can complete and
                 * return credits, then drop the list lock for the sleep */
                loi_list_maint(cli, loi);
                osc_check_rpcs(cli);
                client_obd_list_unlock(&cli->cl_loi_list_lock);

                CDEBUG(D_CACHE, "sleeping for cache space\n");
                l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);

                client_obd_list_lock(&cli->cl_loi_list_lock);
                /* still on the waiter list means nobody granted us space;
                 * we were woken for some other reason, so give up */
                if (!list_empty(&ocw.ocw_entry)) {
                        list_del(&ocw.ocw_entry);
                        RETURN(-EINTR);
                }
                /* ocw_rc was filled in by whoever granted (or refused) the
                 * request and removed us from the list */
                RETURN(ocw.ocw_rc);
        }

        RETURN(-EDQUOT);
}
2416
2417 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2418                         struct lov_oinfo *loi, cfs_page_t *page,
2419                         obd_off offset, struct obd_async_page_ops *ops,
2420                         void *data, void **res)
2421 {
2422         struct osc_async_page *oap;
2423         ENTRY;
2424
2425         if (!page)
2426                 return size_round(sizeof(*oap));
2427
2428         oap = *res;
2429         oap->oap_magic = OAP_MAGIC;
2430         oap->oap_cli = &exp->exp_obd->u.cli;
2431         oap->oap_loi = loi;
2432
2433         oap->oap_caller_ops = ops;
2434         oap->oap_caller_data = data;
2435
2436         oap->oap_page = page;
2437         oap->oap_obj_off = offset;
2438
2439         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2440         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2441         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2442
2443         oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2444
2445         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2446         RETURN(0);
2447 }
2448
2449 struct osc_async_page *oap_from_cookie(void *cookie)
2450 {
2451         struct osc_async_page *oap = cookie;
2452         if (oap->oap_magic != OAP_MAGIC)
2453                 return ERR_PTR(-EINVAL);
2454         return oap;
2455 };
2456
/* Queue a prepped page for asynchronous i/o on its object.  For writes,
 * cache space and grant are reserved first via osc_enter_cache(), which
 * may sleep.  On success the oap is on its object's pending list and the
 * rpc engine has been poked.  Returns -EBUSY if the page is already
 * queued somewhere, -EIO if the import is invalid, -EDQUOT over quota. */
static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
                              struct lov_oinfo *loi, void *cookie,
                              int cmd, obd_off off, int count,
                              obd_flag brw_flags, enum async_flags async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        /* a page may only be on one of the pending/urgent/rpc lists at a
         * time; being on any of them means it is already queued */
        if (!list_empty(&oap->oap_pending_item) ||
            !list_empty(&oap->oap_urgent_item) ||
            !list_empty(&oap->oap_rpc_item))
                RETURN(-EBUSY);

        /* check if the file's owner/group is over quota */
#ifdef HAVE_QUOTA_SUPPORT
        if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
                struct obd_async_page_ops *ops;
                struct obdo *oa;

                OBDO_ALLOC(oa);
                if (oa == NULL)
                        RETURN(-ENOMEM);

                /* the caller's fill_obdo supplies the uid/gid to check */
                ops = oap->oap_caller_ops;
                ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
                if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
                    NO_QUOTA)
                        rc = -EDQUOT;

                OBDO_FREE(oa);
                if (rc)
                        RETURN(rc);
        }
#endif

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        client_obd_list_lock(&cli->cl_loi_list_lock);

        oap->oap_cmd = cmd;
        oap->oap_page_off = off;
        oap->oap_count = count;
        oap->oap_brw_flags = brw_flags;
        oap->oap_async_flags = async_flags;

        if (cmd & OBD_BRW_WRITE) {
                /* may drop and retake the list lock while sleeping */
                rc = osc_enter_cache(cli, loi, oap);
                if (rc) {
                        client_obd_list_unlock(&cli->cl_loi_list_lock);
                        RETURN(rc);
                }
        }

        osc_oap_to_pending(oap);
        loi_list_maint(cli, loi);

        LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
                  cmd);

        /* see if the new page lets us fire an rpc right away */
        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        RETURN(0);
}
2531
/* True iff 'flag' is being newly set: clear in 'was' and set in 'now'.
 * aka (~was & now & flag), but this is more clear :)
 * Fix: parenthesized the macro arguments so expressions with operators of
 * lower precedence than '&' (e.g. ==, ^, |) expand correctly. */
#define SETTING(was, now, flag) (!((was) & (flag)) && ((now) & (flag)))
2534
/* Raise async flags (ASYNC_READY / ASYNC_URGENT) on an already-queued
 * page; flags are only ever added here, never cleared.  A page going
 * urgent is added to its lop's urgent list unless it is already part of
 * an rpc.  Returns -EINVAL if the page is not on a pending list. */
static int osc_set_async_flags(struct obd_export *exp,
                               struct lov_stripe_md *lsm,
                               struct lov_oinfo *loi, void *cookie,
                               obd_flag async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct loi_oap_pages *lop;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        /*
         * bug 7311: OST-side locking is only supported for liblustre for now
         * (and liblustre never calls obd_set_async_flags(). I hope.), generic
         * implementation has to handle case where OST-locked page was picked
         * up by, e.g., ->writepage().
         */
        LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
        LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
                                     * tread here. */

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        /* pick the read or write queue the page belongs to */
        if (oap->oap_cmd & OBD_BRW_WRITE) {
                lop = &loi->loi_write_lop;
        } else {
                lop = &loi->loi_read_lop;
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);

        if (list_empty(&oap->oap_pending_item))
                GOTO(out, rc = -EINVAL);

        /* nothing new being set: nothing to do */
        if ((oap->oap_async_flags & async_flags) == async_flags)
                GOTO(out, rc = 0);

        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
                oap->oap_async_flags |= ASYNC_READY;

        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
                /* don't touch pages already claimed by an rpc */
                if (list_empty(&oap->oap_rpc_item)) {
                        list_add(&oap->oap_urgent_item, &lop->lop_urgent);
                        loi_list_maint(cli, loi);
                }
        }

        LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
                        oap->oap_async_flags);
out:
        /* an urgent page may allow an rpc to be formed now */
        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        RETURN(rc);
}
2597
/* Queue a page on its object's group-pending list for group i/o.  Unlike
 * osc_queue_async_io() nothing can be sent from here: pages sit on
 * lop_pending_group until osc_trigger_group_io() moves them onto the
 * regular pending lists.  Returns -EBUSY if the page is already queued,
 * -EIO if the import is invalid. */
static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
                             struct lov_oinfo *loi,
                             struct obd_io_group *oig, void *cookie,
                             int cmd, obd_off off, int count,
                             obd_flag brw_flags,
                             obd_flag async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct osc_async_page *oap;
        struct loi_oap_pages *lop;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        /* on any list already means the page is queued elsewhere */
        if (!list_empty(&oap->oap_pending_item) ||
            !list_empty(&oap->oap_urgent_item) ||
            !list_empty(&oap->oap_rpc_item))
                RETURN(-EBUSY);

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        client_obd_list_lock(&cli->cl_loi_list_lock);

        oap->oap_cmd = cmd;
        oap->oap_page_off = off;
        oap->oap_count = count;
        oap->oap_brw_flags = brw_flags;
        oap->oap_async_flags = async_flags;

        if (cmd & OBD_BRW_WRITE)
                lop = &loi->loi_write_lop;
        else
                lop = &loi->loi_read_lop;

        list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
        if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
                /* register with the io group so its completion is tracked */
                oap->oap_oig = oig;
                rc = oig_add_one(oig, &oap->oap_occ);
        }

        LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
                  oap, oap->oap_page, rc);

        client_obd_list_unlock(&cli->cl_loi_list_lock);

        RETURN(rc);
}
2652
/* Move every oap on this lop's group-pending list onto the regular
 * pending queue via osc_oap_to_pending(), then refresh the object's
 * placement on the client work lists.  'cmd' is currently unused in the
 * body.  Safe iteration: each entry is unlinked inside the loop. */
static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
                                 struct loi_oap_pages *lop, int cmd)
{
        struct list_head *pos, *tmp;
        struct osc_async_page *oap;

        list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
                oap = list_entry(pos, struct osc_async_page, oap_pending_item);
                list_del(&oap->oap_pending_item);
                osc_oap_to_pending(oap);
        }
        loi_list_maint(cli, loi);
}
2666
2667 static int osc_trigger_group_io(struct obd_export *exp,
2668                                 struct lov_stripe_md *lsm,
2669                                 struct lov_oinfo *loi,
2670                                 struct obd_io_group *oig)
2671 {
2672         struct client_obd *cli = &exp->exp_obd->u.cli;
2673         ENTRY;
2674
2675         if (loi == NULL)
2676                 loi = lsm->lsm_oinfo[0];
2677
2678         client_obd_list_lock(&cli->cl_loi_list_lock);
2679
2680         osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
2681         osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
2682
2683         osc_check_rpcs(cli);
2684         client_obd_list_unlock(&cli->cl_loi_list_lock);
2685
2686         RETURN(0);
2687 }
2688
/* Remove a queued-but-not-in-flight page from all osc bookkeeping: give
 * back its cache/grant accounting, wake any cache waiters the freed
 * space can satisfy, and drop it from the urgent and pending lists
 * (adjusting the pending count).  Returns -EBUSY if the page already
 * belongs to an rpc in flight. */
static int osc_teardown_async_page(struct obd_export *exp,
                                   struct lov_stripe_md *lsm,
                                   struct lov_oinfo *loi, void *cookie)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct loi_oap_pages *lop;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        /* pick the read or write queue the page belongs to */
        if (oap->oap_cmd & OBD_BRW_WRITE) {
                lop = &loi->loi_write_lop;
        } else {
                lop = &loi->loi_read_lop;
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* can't tear down a page an rpc has claimed */
        if (!list_empty(&oap->oap_rpc_item))
                GOTO(out, rc = -EBUSY);

        osc_exit_cache(cli, oap, 0);
        osc_wake_cache_waiters(cli);

        if (!list_empty(&oap->oap_urgent_item)) {
                list_del_init(&oap->oap_urgent_item);
                oap->oap_async_flags &= ~ASYNC_URGENT;
        }
        if (!list_empty(&oap->oap_pending_item)) {
                list_del_init(&oap->oap_pending_item);
                lop_update_pending(cli, lop, oap->oap_cmd, -1);
        }
        loi_list_maint(cli, loi);

        LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
out:
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        RETURN(rc);
}
2735
/* Attach caller data to a lock's l_ast_data and transfer the NO_LRU flag
 * onto the lock.  On Linux kernels the data is an inode; we assert that
 * any previously attached, different inode is being freed (I_FREEING)
 * before overwriting it, since two live inodes sharing a lock would mean
 * inconsistent l_ast_data. */
static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
                                    int flags)
{
        struct ldlm_lock *lock = ldlm_handle2lock(lockh);

        /* handle no longer resolves to a lock -- nothing to update */
        if (lock == NULL) {
                CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
                return;
        }
        lock_res_and_lock(lock);
#ifdef __KERNEL__
#ifdef __LINUX__
        /* Liang XXX: Darwin and Winnt checking should be added */
        if (lock->l_ast_data && lock->l_ast_data != data) {
                struct inode *new_inode = data;
                struct inode *old_inode = lock->l_ast_data;
                if (!(old_inode->i_state & I_FREEING))
                        LDLM_ERROR(lock, "inconsistent l_ast_data found");
                LASSERTF(old_inode->i_state & I_FREEING,
                         "Found existing inode %p/%lu/%u state %lu in lock: "
                         "setting data to %p/%lu/%u\n", old_inode,
                         old_inode->i_ino, old_inode->i_generation,
                         old_inode->i_state,
                         new_inode, new_inode->i_ino, new_inode->i_generation);
        }
#endif
#endif
        lock->l_ast_data = data;
        /* only NO_LRU is propagated from the caller's flags */
        lock->l_flags |= (flags & LDLM_FL_NO_LRU);
        unlock_res_and_lock(lock);
        LDLM_LOCK_PUT(lock);
}
2768
2769 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2770                              ldlm_iterator_t replace, void *data)
2771 {
2772         struct ldlm_res_id res_id = { .name = {0} };
2773         struct obd_device *obd = class_exp2obd(exp);
2774
2775         res_id.name[0] = lsm->lsm_object_id;
2776         res_id.name[2] = lsm->lsm_object_gr;
2777
2778         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2779         return 0;
2780 }
2781
/* Post-enqueue processing shared by the sync and async enqueue paths:
 * for an intent enqueue that the server aborted, pull the real status
 * out of the ldlm_reply; log the lvb attributes we received; then hand
 * the final result to the caller's oi_cb_up() update callback and return
 * whatever it says. */
static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo,
                            int intent, int rc)
{
        ENTRY;

        if (intent) {
                /* The request was created before ldlm_cli_enqueue call. */
                if (rc == ELDLM_LOCK_ABORTED) {
                        struct ldlm_reply *rep;

                        /* swabbed by ldlm_cli_enqueue() */
                        LASSERT_REPSWABBED(req, DLM_LOCKREPLY_OFF);
                        rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
                                             sizeof(*rep));
                        LASSERT(rep != NULL);
                        /* the server's intent disposition overrides the
                         * generic ABORTED status */
                        if (rep->lock_policy_res1)
                                rc = rep->lock_policy_res1;
                }
        }

        if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
                CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
                       oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
                       oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
                       oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
        }

        /* Call the update callback. */
        rc = oinfo->oi_cb_up(oinfo, rc);
        RETURN(rc);
}
2813
/* ptlrpcd interpret callback for an asynchronous lock enqueue: complete
 * the ldlm side of the enqueue, run the common osc completion, then
 * immediately drop the lock reference on success (async enqueues release
 * locks as soon as they are obtained -- see the comment above
 * osc_enqueue()). */
static int osc_enqueue_interpret(struct ptlrpc_request *req,
                                 struct osc_enqueue_args *aa, int rc)
{
        int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
        struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
        struct ldlm_lock *lock;

        /* ldlm_cli_enqueue is holding a reference on the lock, so it must
         * be valid. */
        lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);

        /* Complete obtaining the lock procedure. */
        rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
                                   aa->oa_ei->ei_mode,
                                   &aa->oa_oi->oi_flags,
                                   &lsm->lsm_oinfo[0]->loi_lvb,
                                   sizeof(lsm->lsm_oinfo[0]->loi_lvb),
                                   lustre_swab_ost_lvb,
                                   aa->oa_oi->oi_lockh, rc);

        /* Complete osc stuff. */
        rc = osc_enqueue_fini(req, aa->oa_oi, intent, rc);

        /* Release the lock for async request. */
        if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
                ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);

        LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
                 aa->oa_oi->oi_lockh, req, aa);
        /* drop the reference taken by ldlm_handle2lock() above */
        LDLM_LOCK_PUT(lock);
        return rc;
}
2846
/* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
 * other synchronous requests, however keeping some locks and trying to obtain
 * others may take a considerable amount of time in a case of ost failure; and
 * when other sync requests do not get released lock from a client, the client
 * is excluded from the cluster -- such scenarious make the life difficult, so
 * release locks just after they are obtained. */
/* Obtain an extent lock for oinfo's object/extent: first by matching an
 * already-cached lock (same mode, or a cached PW lock when PR was asked
 * for), and only then by enqueueing over the wire -- synchronously, or
 * via rqset with osc_enqueue_interpret() as completion.  The result is
 * delivered through oinfo->oi_cb_up(). */
static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
                       struct ldlm_enqueue_info *einfo,
                       struct ptlrpc_request_set *rqset)
{
        struct ldlm_res_id res_id = { .name = {0} };
        struct obd_device *obd = exp->exp_obd;
        struct ldlm_reply *rep;
        struct ptlrpc_request *req = NULL;
        int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
        int rc;
        ENTRY;

        res_id.name[0] = oinfo->oi_md->lsm_object_id;
        res_id.name[2] = oinfo->oi_md->lsm_object_gr;

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother.  */
        oinfo->oi_policy.l_extent.start -=
                oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
        oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;

        /* without a valid kms, skip local matching and enqueue directly --
         * NOTE(review): presumably cached locks can't be trusted until kms
         * is known; confirm against lov/llite callers */
        if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
                goto no_match;

        /* Next, search for already existing extent locks that will cover us */
        rc = ldlm_lock_match(obd->obd_namespace,
                             oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
                             einfo->ei_type, &oinfo->oi_policy, einfo->ei_mode,
                             oinfo->oi_lockh);
        if (rc == 1) {
                osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
                                        oinfo->oi_flags);
                if (intent) {
                        /* I would like to be able to ASSERT here that rss <=
                         * kms, but I can't, for reasons which are explained in
                         * lov_enqueue() */
                }

                /* We already have a lock, and it's referenced */
                oinfo->oi_cb_up(oinfo, ELDLM_OK);

                /* For async requests, decref the lock. */
                if (rqset)
                        ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);

                RETURN(ELDLM_OK);
        }

        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock.
         *
         * There are problems with conversion deadlocks, so instead of
         * converting a read lock to a write lock, we'll just enqueue a new
         * one.
         *
         * At some point we should cancel the read lock instead of making them
         * send us a blocking callback, but there are problems with canceling
         * locks out from other users right now, too. */

        if (einfo->ei_mode == LCK_PR) {
                rc = ldlm_lock_match(obd->obd_namespace,
                                     oinfo->oi_flags | LDLM_FL_LVB_READY,
                                     &res_id, einfo->ei_type, &oinfo->oi_policy,
                                     LCK_PW, oinfo->oi_lockh);
                if (rc == 1) {
                        /* FIXME: This is not incredibly elegant, but it might
                         * be more elegant than adding another parameter to
                         * lock_match.  I want a second opinion. */
                        /* addref the lock only if not async requests. */
                        if (!rqset)
                                ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
                        osc_set_data_with_check(oinfo->oi_lockh,
                                                einfo->ei_cbdata,
                                                oinfo->oi_flags);
                        oinfo->oi_cb_up(oinfo, ELDLM_OK);
                        /* drop the PW reference taken by the match; the PR
                         * addref above (sync case) keeps the lock held */
                        ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
                        RETURN(ELDLM_OK);
                }
        }

 no_match:
        /* intent enqueues carry the request; build it with room for the
         * lock reply and the lvb before calling ldlm_cli_enqueue */
        if (intent) {
                int size[3] = {
                        [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                        [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request),
                        [DLM_LOCKREQ_OFF + 1] = 0 };

                req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
                if (req == NULL)
                        RETURN(-ENOMEM);

                size[DLM_LOCKREPLY_OFF] = sizeof(*rep);
                size[DLM_REPLY_REC_OFF] =
                        sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb);
                ptlrpc_req_set_repsize(req, 3, size);
        }

        /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
        oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;

        rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id,
                              &oinfo->oi_policy, &oinfo->oi_flags,
                              &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
                              sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
                              lustre_swab_ost_lvb, oinfo->oi_lockh,
                              rqset ? 1 : 0);
        if (rqset) {
                if (!rc) {
                        struct osc_enqueue_args *aa;
                        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
                        aa = (struct osc_enqueue_args *)&req->rq_async_args;
                        aa->oa_oi = oinfo;
                        aa->oa_ei = einfo;
                        aa->oa_exp = exp;

                        /* completion runs in osc_enqueue_interpret() */
                        req->rq_interpret_reply = osc_enqueue_interpret;
                        ptlrpc_set_add_req(rqset, req);
                } else if (intent) {
                        ptlrpc_req_finished(req);
                }
                RETURN(rc);
        }

        rc = osc_enqueue_fini(req, oinfo, intent, rc);
        if (intent)
                ptlrpc_req_finished(req);

        RETURN(rc);
}
2984
/* Match an already-granted cached lock for this object/extent without
 * ever sending an rpc.  A PR request may also be satisfied by a cached
 * PW lock, in which case the reference is converted PW -> PR.  Returns
 * the ldlm_lock_match() result (nonzero on a match, with *lockh filled
 * in). */
static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
                     __u32 type, ldlm_policy_data_t *policy, __u32 mode,
                     int *flags, void *data, struct lustre_handle *lockh)
{
        struct ldlm_res_id res_id = { .name = {0} };
        struct obd_device *obd = exp->exp_obd;
        int rc;
        int lflags = *flags;
        ENTRY;

        res_id.name[0] = lsm->lsm_object_id;
        res_id.name[2] = lsm->lsm_object_gr;

        OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother */
        policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
        policy->l_extent.end |= ~CFS_PAGE_MASK;

        /* Next, search for already existing extent locks that will cover us */
        rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
                             &res_id, type, policy, mode, lockh);
        if (rc) {
                /* NOTE(review): the TEST_LOCK guard below is commented out,
                 * so l_ast_data is updated even for test matches -- confirm
                 * this is intentional */
                //if (!(*flags & LDLM_FL_TEST_LOCK))
                        osc_set_data_with_check(lockh, data, lflags);
                RETURN(rc);
        }
        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock. */
        if (mode == LCK_PR) {
                rc = ldlm_lock_match(obd->obd_namespace,
                                     lflags | LDLM_FL_LVB_READY, &res_id,
                                     type, policy, LCK_PW, lockh);
                if (rc == 1 && !(lflags & LDLM_FL_TEST_LOCK)) {
                        /* FIXME: This is not incredibly elegant, but it might
                         * be more elegant than adding another parameter to
                         * lock_match.  I want a second opinion. */
                        osc_set_data_with_check(lockh, data, lflags);
                        /* convert the reference from PW to PR */
                        ldlm_lock_addref(lockh, LCK_PR);
                        ldlm_lock_decref(lockh, LCK_PW);
                }
        }
        RETURN(rc);
}
3031
3032 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3033                       __u32 mode, struct lustre_handle *lockh)
3034 {
3035         ENTRY;
3036
3037         if (unlikely(mode == LCK_GROUP))
3038                 ldlm_lock_decref_and_cancel(lockh, mode);
3039         else
3040                 ldlm_lock_decref(lockh, mode);
3041
3042         RETURN(0);
3043 }
3044
3045 static int osc_cancel_unused(struct obd_export *exp,
3046                              struct lov_stripe_md *lsm, int flags,
3047                              void *opaque)
3048 {
3049         struct obd_device *obd = class_exp2obd(exp);
3050         struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3051
3052         if (lsm != NULL) {
3053                 res_id.name[0] = lsm->lsm_object_id;
3054                 res_id.name[2] = lsm->lsm_object_gr;
3055                 resp = &res_id;
3056         }
3057
3058         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3059 }
3060
3061 static int osc_join_lru(struct obd_export *exp,
3062                         struct lov_stripe_md *lsm, int join)
3063 {
3064         struct obd_device *obd = class_exp2obd(exp);
3065         struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3066
3067         if (lsm != NULL) {
3068                 res_id.name[0] = lsm->lsm_object_id;
3069                 res_id.name[2] = lsm->lsm_object_gr;
3070                 resp = &res_id;
3071         }
3072
3073         return ldlm_cli_join_lru(obd->obd_namespace, resp, join);
3074 }
3075
3076 static int osc_statfs_interpret(struct ptlrpc_request *req,
3077                                 struct osc_async_args *aa, int rc)
3078 {
3079         struct obd_statfs *msfs;
3080         ENTRY;
3081
3082         if (rc != 0)
3083                 GOTO(out, rc);
3084
3085         msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3086                                   lustre_swab_obd_statfs);
3087         if (msfs == NULL) {
3088                 CERROR("Can't unpack obd_statfs\n");
3089                 GOTO(out, rc = -EPROTO);
3090         }
3091
3092         memcpy(aa->aa_oi->oi_osfs, msfs, sizeof(*msfs));
3093 out:
3094         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3095         RETURN(rc);
3096 }
3097
3098 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3099                             __u64 max_age, struct ptlrpc_request_set *rqset)
3100 {
3101         struct ptlrpc_request *req;
3102         struct osc_async_args *aa;
3103         int size[2] = { sizeof(struct ptlrpc_body), sizeof(*oinfo->oi_osfs) };
3104         ENTRY;
3105
3106         /* We could possibly pass max_age in the request (as an absolute
3107          * timestamp or a "seconds.usec ago") so the target can avoid doing
3108          * extra calls into the filesystem if that isn't necessary (e.g.
3109          * during mount that would help a bit).  Having relative timestamps
3110          * is not so great if request processing is slow, while absolute
3111          * timestamps are not ideal because they need time synchronization. */
3112         req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
3113                               OST_STATFS, 1, NULL, NULL);
3114         if (!req)
3115                 RETURN(-ENOMEM);
3116
3117         ptlrpc_req_set_repsize(req, 2, size);
3118         req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3119
3120         req->rq_interpret_reply = osc_statfs_interpret;
3121         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3122         aa = (struct osc_async_args *)&req->rq_async_args;
3123         aa->aa_oi = oinfo;
3124
3125         ptlrpc_set_add_req(rqset, req);
3126         RETURN(0);
3127 }
3128
/* Synchronous OST_STATFS: send the request, wait for the reply, and copy
 * the unpacked obd_statfs into @osfs.  Returns 0 on success, -ENOMEM if
 * the request cannot be allocated, -EPROTO on an unparsable reply, or the
 * RPC error from ptlrpc_queue_wait().  @max_age is currently unused here
 * (see the comment below). */
static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
                      __u64 max_age)
{
        struct obd_statfs *msfs;
        struct ptlrpc_request *req;
        int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
        ENTRY;

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
                              OST_STATFS, 1, NULL, NULL);
        if (!req)
                RETURN(-ENOMEM);

        ptlrpc_req_set_repsize(req, 2, size);
        req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        /* Unpack (and byte-swap if needed) the statfs record from the reply. */
        msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
                                  lustre_swab_obd_statfs);
        if (msfs == NULL) {
                CERROR("Can't unpack obd_statfs\n");
                GOTO(out, rc = -EPROTO);
        }

        memcpy(osfs, msfs, sizeof(*osfs));

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
3169
/* Retrieve object striping information.
 *
 * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
 * the maximum number of OST indices which will fit in the user buffer.
 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
 */
static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
{
        struct lov_user_md lum, *lumk;
        int rc = 0, lum_size;
        ENTRY;

        if (!lsm)
                RETURN(-ENODATA);

        /* Read the caller's request header so we know how much room the
         * user buffer advertises (lmm_stripe_count). */
        if (copy_from_user(&lum, lump, sizeof(lum)))
                RETURN(-EFAULT);

        if (lum.lmm_magic != LOV_USER_MAGIC)
                RETURN(-EINVAL);

        if (lum.lmm_stripe_count > 0) {
                /* An OSC object is always exactly one stripe, so allocate
                 * the header plus a single object entry. */
                lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
                OBD_ALLOC(lumk, lum_size);
                if (!lumk)
                        RETURN(-ENOMEM);

                lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
                lumk->lmm_objects[0].l_object_gr = lsm->lsm_object_gr;
        } else {
                /* No room for object entries: copy back the header only,
                 * reusing the on-stack struct. */
                lum_size = sizeof(lum);
                lumk = &lum;
        }

        lumk->lmm_object_id = lsm->lsm_object_id;
        lumk->lmm_object_gr = lsm->lsm_object_gr;
        lumk->lmm_stripe_count = 1;

        if (copy_to_user(lump, lumk, lum_size))
                rc = -EFAULT;

        if (lumk != &lum)
                OBD_FREE(lumk, lum_size);

        RETURN(rc);
}
3216
3217
/* OSC ioctl dispatcher.
 *
 * Pins the module for the duration of the call (so the handler cannot run
 * while the module is being unloaded), then dispatches on @cmd.  @karg is
 * the kernel-side argument struct, @uarg the original userspace pointer
 * (needed by handlers that copy data back themselves).  Returns 0 or a
 * negative errno; unrecognised commands yield -ENOTTY. */
static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         void *karg, void *uarg)
{
        struct obd_device *obd = exp->exp_obd;
        struct obd_ioctl_data *data = karg;
        int err = 0;
        ENTRY;

#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
        MOD_INC_USE_COUNT;
#else
        if (!try_module_get(THIS_MODULE)) {
                CERROR("Can't get module. Is it alive?");
                return -EINVAL;
        }
#endif
        switch (cmd) {
        case OBD_IOC_LOV_GET_CONFIG: {
                char *buf;
                struct lov_desc *desc;
                struct obd_uuid uuid;

                buf = NULL;
                len = 0;
                /* Pull the variable-length ioctl payload in from userspace;
                 * obd_ioctl_getdata() allocates buf/len for us. */
                if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
                        GOTO(out, err = -EINVAL);

                data = (struct obd_ioctl_data *)buf;

                /* inlbuf1 must hold a lov_desc, inlbuf2 an obd_uuid. */
                if (sizeof(*desc) > data->ioc_inllen1) {
                        obd_ioctl_freedata(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                if (data->ioc_inllen2 < sizeof(uuid)) {
                        obd_ioctl_freedata(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                /* An OSC presents itself as a trivial one-target "LOV":
                 * a single active target, one stripe, default layout. */
                desc = (struct lov_desc *)data->ioc_inlbuf1;
                desc->ld_tgt_count = 1;
                desc->ld_active_tgt_count = 1;
                desc->ld_default_stripe_count = 1;
                desc->ld_default_stripe_size = 0;
                desc->ld_default_stripe_offset = 0;
                desc->ld_pattern = 0;
                memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));

                memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));

                err = copy_to_user((void *)uarg, buf, len);
                if (err)
                        err = -EFAULT;
                obd_ioctl_freedata(buf, len);
                GOTO(out, err);
        }
        case LL_IOC_LOV_SETSTRIPE:
                /* obd_alloc_memmd() returns the md size on success;
                 * callers expect 0, so squash positive values. */
                err = obd_alloc_memmd(exp, karg);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case LL_IOC_LOV_GETSTRIPE:
                err = osc_getstripe(karg, uarg);
                GOTO(out, err);
        case OBD_IOC_CLIENT_RECOVER:
                err = ptlrpc_recover_import(obd->u.cli.cl_import,
                                            data->ioc_inlbuf1);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case IOC_OSC_SET_ACTIVE:
                err = ptlrpc_set_import_active(obd->u.cli.cl_import,
                                               data->ioc_offset);
                GOTO(out, err);
        case OBD_IOC_POLL_QUOTACHECK:
                err = lquota_poll_check(quota_interface, exp,
                                        (struct if_quotacheck *)karg);
                GOTO(out, err);
        default:
                CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
                       cmd, cfs_curproc_comm());
                GOTO(out, err = -ENOTTY);
        }
out:
#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
        MOD_DEC_USE_COUNT;
#else
        module_put(THIS_MODULE);
#endif
        return err;
}
3309
/* obd_get_info handler for the OSC.
 *
 * Supported keys:
 *   "lock_to_stripe" - on a single OSC the answer is always stripe 0;
 *                      *vallen is set to sizeof(__u32).
 *   "last_id"        - queries the OST synchronously (OST_GET_INFO RPC)
 *                      for the last allocated object id; *val receives
 *                      an obd_id.
 * Returns 0 on success, -EFAULT for NULL out-params, -EINVAL for unknown
 * keys, or the RPC/unpack error for "last_id".
 *
 * NOTE(review): the key comparisons use strcmp() with differing length
 * guards ('>' for lock_to_stripe vs '>=' for last_id) — presumably both
 * expect keylen to include the NUL; confirm against callers. */
static int osc_get_info(struct obd_export *exp, obd_count keylen,
                        void *key, __u32 *vallen, void *val)
{
        ENTRY;
        if (!vallen || !val)
                RETURN(-EFAULT);

        if (keylen > strlen("lock_to_stripe") &&
            strcmp(key, "lock_to_stripe") == 0) {
                __u32 *stripe = val;
                *vallen = sizeof(*stripe);
                *stripe = 0;
                RETURN(0);
        } else if (keylen >= strlen("last_id") && strcmp(key, "last_id") == 0) {
                struct ptlrpc_request *req;
                obd_id *reply;
                char *bufs[2] = { NULL, key };
                int rc, size[2] = { sizeof(struct ptlrpc_body), keylen };

                req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                                      OST_GET_INFO, 2, size, bufs);
                if (req == NULL)
                        RETURN(-ENOMEM);

                /* Reply buffer is sized by the caller via *vallen. */
                size[REPLY_REC_OFF] = *vallen;
                ptlrpc_req_set_repsize(req, 2, size);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(out, rc);

                reply = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*reply),
                                           lustre_swab_ost_last_id);
                if (reply == NULL) {
                        CERROR("Can't unpack OST last ID\n");
                        GOTO(out, rc = -EPROTO);
                }
                *((obd_id *)val) = *reply;
        out:
                ptlrpc_req_finished(req);
                RETURN(rc);
        }
        RETURN(-EINVAL);
}
3353
3354 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3355                                           void *aa, int rc)
3356 {
3357         struct llog_ctxt *ctxt;
3358         struct obd_import *imp = req->rq_import;
3359         ENTRY;
3360
3361         if (rc != 0)
3362                 RETURN(rc);
3363
3364         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3365         if (ctxt) {
3366                 if (rc == 0)
3367                         rc = llog_initiator_connect(ctxt);
3368                 else
3369                         CERROR("cannot establish connection for "
3370                                "ctxt %p: %d\n", ctxt, rc);
3371         }
3372
3373         spin_lock(&imp->imp_lock);
3374         imp->imp_server_timeout = 1;
3375         imp->imp_pingable = 1;
3376         spin_unlock(&imp->imp_lock);
3377         CDEBUG(D_HA, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3378
3379         RETURN(rc);
3380 }
3381
/* obd_set_info_async handler for the OSC.
 *
 * A handful of keys are handled locally without any RPC (KEY_NEXT_ID,
 * "unlinked", KEY_INIT_RECOV, "checksum", KEY_FLUSH_CTX); everything else
 * is forwarded to the OST as an OST_SET_INFO request added to @set.
 * Returns 0 on success or a negative errno. */
static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
                              void *key, obd_count vallen, void *val,
                              struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct obd_device  *obd = exp->exp_obd;
        struct obd_import *imp = class_exp2cliimp(exp);
        int size[3] = { sizeof(struct ptlrpc_body), keylen, vallen };
        char *bufs[3] = { NULL, key, val };
        ENTRY;

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

        /* MDS tells us the id to resume object precreation from. */
        if (KEY_IS(KEY_NEXT_ID)) {
                if (vallen != sizeof(obd_id))
                        RETURN(-EINVAL);
                obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
                CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
                       exp->exp_obd->obd_name,
                       obd->u.cli.cl_oscc.oscc_next_id);

                RETURN(0);
        }

        /* An object was unlinked: space may be available again, so clear
         * the creator's no-space flag. */
        if (KEY_IS("unlinked")) {
                struct osc_creator *oscc = &obd->u.cli.cl_oscc;
                spin_lock(&oscc->oscc_lock);
                oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
                spin_unlock(&oscc->oscc_lock);
                RETURN(0);
        }

        /* Toggle initial-recovery behavior on the import. */
        if (KEY_IS(KEY_INIT_RECOV)) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                spin_lock(&imp->imp_lock);
                imp->imp_initial_recov = *(int *)val;
                spin_unlock(&imp->imp_lock);
                CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
                       exp->exp_obd->obd_name,
                       imp->imp_initial_recov);
                RETURN(0);
        }

        /* Enable/disable bulk checksums for this client. */
        if (KEY_IS("checksum")) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
                RETURN(0);
        }

        if (KEY_IS(KEY_FLUSH_CTX)) {
                sptlrpc_import_flush_my_ctx(imp);
                RETURN(0);
        }

        /* Everything past this point needs a request set to queue on. */
        if (!set)
                RETURN(-EINVAL);

        /* We pass all other commands directly to OST. Since nobody calls osc
           methods directly and everybody is supposed to go through LOV, we
           assume lov checked invalid values for us.
           The only recognised values so far are evict_by_nid and mds_conn.
           Even if something bad goes through, we'd get a -EINVAL from OST
           anyway. */

        req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_SET_INFO, 3, size,
                              bufs);
        if (req == NULL)
                RETURN(-ENOMEM);

        /* "mds_conn" carries the object group this MDS creates in; record
         * it for the creator and finish setup in the interpret callback. */
        if (KEY_IS("mds_conn")) {
                struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                oscc->oscc_oa.o_gr = (*(__u32 *)val);
                oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
                LASSERT(oscc->oscc_oa.o_gr > 0);
                req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
        }

        ptlrpc_req_set_repsize(req, 1, NULL);
        ptlrpc_set_add_req(set, req);
        ptlrpc_check_set(set);

        RETURN(0);
}
3468
3469
/* Log operations for the size-changes (SIZE_REPL) context: the OSC only
 * needs to cancel records here, so only lop_cancel is provided. */
static struct llog_operations osc_size_repl_logops = {
        lop_cancel: llog_obd_repl_cancel
};

/* Populated lazily in osc_llog_init() from llog_lvfs_ops. */
static struct llog_operations osc_mds_ost_orig_logops;
/* Set up the two llog contexts this OSC uses on an MDS:
 * LLOG_MDS_OST_ORIG_CTXT (origin of MDS->OST records) and
 * LLOG_SIZE_REPL_CTXT (size-change replication).  Returns 0 on success
 * or the first llog_setup() error. */
static int osc_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
                         struct obd_device *tgt, int count,
                         struct llog_catid *catid, struct obd_uuid *uuid)
{
        int rc;
        ENTRY;

        /* One-time initialization of the shared origin ops table; the
         * lop_setup check doubles as the "already done" flag, guarded by
         * obd_dev_lock against concurrent initializers. */
        spin_lock(&obd->obd_dev_lock);
        if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
                osc_mds_ost_orig_logops = llog_lvfs_ops;
                osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
                osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
                osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
                osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
        }
        spin_unlock(&obd->obd_dev_lock);

        rc = llog_setup(obd, llogs, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
                        &catid->lci_logid, &osc_mds_ost_orig_logops);
        if (rc) {
                CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
                GOTO (out, rc);
        }

        /* NOTE(review): if this second setup fails, the ORIG context set up
         * above is not cleaned up here — presumably osc_llog_finish() handles
         * it on teardown; verify against callers. */
        rc = llog_setup(obd, llogs, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
                        &osc_size_repl_logops);
        if (rc)
                CERROR("failed LLOG_SIZE_REPL_CTXT\n");
out:
        if (rc) {
                CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
                       obd->obd_name, tgt->obd_name, count, catid, rc);
                CERROR("logid "LPX64":0x%x\n",
                       catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
        }
        RETURN(rc);
}
3512
3513 static int osc_llog_finish(struct obd_device *obd, int count)
3514 {
3515         struct llog_ctxt *ctxt;
3516         int rc = 0, rc2 = 0;
3517         ENTRY;
3518
3519         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3520         if (ctxt)
3521                 rc = llog_cleanup(ctxt);
3522
3523         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3524         if (ctxt)
3525                 rc2 = llog_cleanup(ctxt);
3526         if (!rc)
3527                 rc = rc2;
3528
3529         RETURN(rc);
3530 }
3531
/* Called when the client re-establishes its connection to the OST.
 *
 * If the grant feature is negotiated, request a grant from the server:
 * either the grant we still hold, or (if none) twice the max RPC size as
 * a starting allowance.  Any grant lost across the disconnect is counted
 * and reset under cl_loi_list_lock.
 *
 * NOTE(review): uses RETURN() without a matching ENTRY — TODO confirm
 * that is intended for this function's debug tracing. */
static int osc_reconnect(struct obd_export *exp, struct obd_device *obd,
                         struct obd_uuid *cluuid,
                         struct obd_connect_data *data)
{
        struct client_obd *cli = &obd->u.cli;

        if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
                long lost_grant;

                client_obd_list_lock(&cli->cl_loi_list_lock);
                /* GNU "?:": use the remaining grant if non-zero, else ask
                 * for two full-sized RPCs worth of space. */
                data->ocd_grant = cli->cl_avail_grant ?:
                                2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
                lost_grant = cli->cl_lost_grant;
                cli->cl_lost_grant = 0;
                client_obd_list_unlock(&cli->cl_loi_list_lock);

                CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
                       "cl_lost_grant: %ld\n", data->ocd_grant,
                       cli->cl_avail_grant, lost_grant);
                CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
                       " ocd_grant: %d\n", data->ocd_connect_flags,
                       data->ocd_version, data->ocd_grant);
        }

        RETURN(0);
}
3558
3559 static int osc_disconnect(struct obd_export *exp)
3560 {
3561         struct obd_device *obd = class_exp2obd(exp);
3562         struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3563         int rc;
3564
3565         if (obd->u.cli.cl_conn_count == 1)
3566                 /* flush any remaining cancel messages out to the target */
3567                 llog_sync(ctxt, exp);
3568
3569         rc = client_disconnect_export(exp);
3570         return rc;
3571 }
3572
/* React to state changes of the underlying ptlrpc import.
 *
 * DISCON:     mark the object creator as recovering (MDS OSCs only, i.e.
 *             imports with server timeouts) and zero the grant counters.
 * INACTIVE:   notify the observer (e.g. LOV) that the target went inactive.
 * INVALIDATE: fail all cached pages against the dead import and flush the
 *             local lock namespace.
 * ACTIVE:     clear the creator's no-space flag (MDS OSCs) and notify the
 *             observer.
 * OCD:        re-read connect data: grant initialization and the
 *             request-portal workaround for bug 7198.
 * Unknown events are a bug (LBUG). */
static int osc_import_event(struct obd_device *obd,
                            struct obd_import *imp,
                            enum obd_import_event event)
{
        struct client_obd *cli;
        int rc = 0;

        ENTRY;
        LASSERT(imp->imp_obd == obd);

        switch (event) {
        case IMP_EVENT_DISCON: {
                /* Only do this on the MDS OSC's */
                if (imp->imp_server_timeout) {
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                        spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
                        spin_unlock(&oscc->oscc_lock);
                }
                /* Grants are server-granted state; forget them while
                 * disconnected. */
                cli = &obd->u.cli;
                client_obd_list_lock(&cli->cl_loi_list_lock);
                cli->cl_avail_grant = 0;
                cli->cl_lost_grant = 0;
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                break;
        }
        case IMP_EVENT_INACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
                break;
        }
        case IMP_EVENT_INVALIDATE: {
                struct ldlm_namespace *ns = obd->obd_namespace;

                /* Reset grants */
                cli = &obd->u.cli;
                client_obd_list_lock(&cli->cl_loi_list_lock);
                /* all pages go to failing rpcs due to the invalid import */
                osc_check_rpcs(cli);
                client_obd_list_unlock(&cli->cl_loi_list_lock);

                ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);

                break;
        }
        case IMP_EVENT_ACTIVE: {
                /* Only do this on the MDS OSC's */
                if (imp->imp_server_timeout) {
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                        spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
                        spin_unlock(&oscc->oscc_lock);
                }
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
                break;
        }
        case IMP_EVENT_OCD: {
                struct obd_connect_data *ocd = &imp->imp_connect_data;

                if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
                        osc_init_grant(&obd->u.cli, ocd);

                /* See bug 7198 */
                if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;

                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
                break;
        }
        default:
                CERROR("Unknown import event %d\n", event);
                LBUG();
        }
        RETURN(rc);
}
3649
3650 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3651 {
3652         int rc;
3653         ENTRY;
3654
3655         ENTRY;
3656         rc = ptlrpcd_addref();
3657         if (rc)
3658                 RETURN(rc);
3659
3660         rc = client_obd_setup(obd, lcfg);
3661         if (rc) {
3662                 ptlrpcd_decref();
3663         } else {
3664                 struct lprocfs_static_vars lvars;
3665                 struct client_obd *cli = &obd->u.cli;
3666
3667                 lprocfs_init_vars(osc, &lvars);
3668                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3669                         lproc_osc_attach_seqstat(obd);
3670                         ptlrpc_lprocfs_register_obd(obd);
3671                 }
3672
3673                 oscc_init(obd);
3674                 /* We need to allocate a few requests more, because
3675                    brw_interpret_oap tries to create new requests before freeing
3676                    previous ones. Ideally we want to have 2x max_rpcs_in_flight
3677                    reserved, but I afraid that might be too much wasted RAM
3678                    in fact, so 2 is just my guess and still should work. */
3679                 cli->cl_import->imp_rq_pool =
3680                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3681                                             OST_MAXREQSIZE,
3682                                             ptlrpc_add_rqs_to_pool);
3683         }
3684
3685         RETURN(rc);
3686 }
3687
/* Staged pre-cleanup of the OSC obd.
 *
 * EARLY:       deactivate the import and stop pinging so no new RPCs
 *              (e.g. mds_lov_synchronize) are started.
 * EXPORTS:     destroy the client import if it still exists (it survives
 *              when we set up but never connected), freeing its request
 *              pool first.
 * SELF_EXP:    shut down the llog subsystems.
 * OBD:         nothing to do here.
 * Returns 0 or the llog cleanup error. */
static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
{
        int rc = 0;
        ENTRY;

        switch (stage) {
        case OBD_CLEANUP_EARLY: {
                struct obd_import *imp;
                imp = obd->u.cli.cl_import;
                CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
                /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
                ptlrpc_deactivate_import(imp);
                spin_lock(&imp->imp_lock);
                imp->imp_pingable = 0;
                spin_unlock(&imp->imp_lock);
                break;
        }
        case OBD_CLEANUP_EXPORTS: {
                /* If we set up but never connected, the
                   client import will not have been cleaned. */
                if (obd->u.cli.cl_import) {
                        struct obd_import *imp;
                        imp = obd->u.cli.cl_import;
                        CDEBUG(D_CONFIG, "%s: client import never connected\n",
                               obd->obd_name);
                        ptlrpc_invalidate_import(imp);
                        ptlrpc_free_rq_pool(imp->imp_rq_pool);
                        class_destroy_import(imp);
                        obd->u.cli.cl_import = NULL;
                }
                break;
        }
        case OBD_CLEANUP_SELF_EXP:
                rc = obd_llog_finish(obd, 0);
                if (rc != 0)
                        CERROR("failed to cleanup llogging subsystems\n");
                break;
        case OBD_CLEANUP_OBD:
                break;
        }
        RETURN(rc);
}
3730
/* Final teardown of the OSC obd: unregister /proc entries, mark the object
 * creator as exiting (so no further precreates are attempted), release the
 * quota cache, run the generic client cleanup, and drop the ptlrpcd
 * reference taken in osc_setup().  Returns the client cleanup status. */
int osc_cleanup(struct obd_device *obd)
{
        struct osc_creator *oscc = &obd->u.cli.cl_oscc;
        int rc;

        ENTRY;
        ptlrpc_lprocfs_unregister_obd(obd);
        lprocfs_obd_cleanup(obd);

        /* Flag transition under oscc_lock: leaving recovery, entering exit. */
        spin_lock(&oscc->oscc_lock);
        oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
        oscc->oscc_flags |= OSCC_FLAG_EXITING;
        spin_unlock(&oscc->oscc_lock);

        /* free memory of osc quota cache */
        lquota_cleanup(quota_interface, obd);

        rc = client_obd_cleanup(obd);

        ptlrpcd_decref();
        RETURN(rc);
}
3753
3754 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3755 {
3756         struct lustre_cfg *lcfg = buf;
3757         struct lprocfs_static_vars lvars;
3758         int rc = 0;
3759
3760         lprocfs_init_vars(osc, &lvars);
3761
3762         rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars, lcfg, obd);
3763         return(rc);
3764 }
3765
/* Method table for the OSC obd type: connection management is delegated to
 * the generic client_* helpers; object, I/O, lock, and llog operations use
 * the osc_* implementations in this file. */
struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup,
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        .o_statfs               = osc_statfs,
        .o_statfs_async         = osc_statfs_async,
        .o_packmd               = osc_packmd,
        .o_unpackmd             = osc_unpackmd,
        .o_precreate            = osc_precreate,
        .o_create               = osc_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_getattr_async        = osc_getattr_async,
        .o_setattr              = osc_setattr,
        .o_setattr_async        = osc_setattr_async,
        .o_brw                  = osc_brw,
        .o_brw_async            = osc_brw_async,
        .o_prep_async_page      = osc_prep_async_page,
        .o_queue_async_io       = osc_queue_async_io,
        .o_set_async_flags      = osc_set_async_flags,
        .o_queue_group_io       = osc_queue_group_io,
        .o_trigger_group_io     = osc_trigger_group_io,
        .o_teardown_async_page  = osc_teardown_async_page,
        .o_punch                = osc_punch,
        .o_sync                 = osc_sync,
        .o_enqueue              = osc_enqueue,
        .o_match                = osc_match,
        .o_change_cbdata        = osc_change_cbdata,
        .o_cancel               = osc_cancel,
        .o_cancel_unused        = osc_cancel_unused,
        .o_join_lru             = osc_join_lru,
        .o_iocontrol            = osc_iocontrol,
        .o_get_info             = osc_get_info,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_llog_init            = osc_llog_init,
        .o_llog_finish          = osc_llog_finish,
        .o_process_config       = osc_process_config,
};
3811 int __init osc_init(void)
3812 {
3813         struct lprocfs_static_vars lvars;
3814         int rc;
3815         ENTRY;
3816
3817         lprocfs_init_vars(osc, &lvars);
3818
3819         request_module("lquota");
3820         quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
3821         lquota_init(quota_interface);
3822         init_obd_quota_ops(quota_interface, &osc_obd_ops);
3823
3824         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3825                                  LUSTRE_OSC_NAME, NULL);
3826         if (rc) {
3827                 if (quota_interface)
3828                         PORTAL_SYMBOL_PUT(osc_quota_interface);
3829                 RETURN(rc);
3830         }
3831
3832         RETURN(rc);
3833 }
3834
#ifdef __KERNEL__
/* Module teardown: mirror of osc_init().  Shut down the quota interface
 * and drop the lquota symbol reference before unregistering the device
 * type, so no OSC method can be invoked with a half-torn-down quota
 * layer.  (Marked __exit in spirit; the annotation is commented out,
 * presumably because the symbol is referenced from a context that
 * forbids the .exit section -- TODO confirm before restoring it.) */
static void /*__exit*/ osc_exit(void)
{
        /* lquota_exit() is called unguarded, matching lquota_init() in
         * osc_init(); it is expected to tolerate a NULL interface. */
        lquota_exit(quota_interface);
        if (quota_interface)
                PORTAL_SYMBOL_PUT(osc_quota_interface);

        class_unregister_type(LUSTRE_OSC_NAME);
}

MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");

/* Register osc_init/osc_exit as this module's entry/exit points. */
cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
#endif