Whamcloud - gitweb
d7bd9ded9d6b763f1e380adb48e871a9b3b6b5a9
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author Peter Braam <braam@clusterfs.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  *   You may have signed or agreed to another license before downloading
11  *   this software.  If so, you are bound by the terms and conditions
12  *   of that agreement, and the following does not apply to you.  See the
13  *   LICENSE file included with this distribution for more information.
14  *
15  *   If you did not agree to a different license, then this copy of Lustre
16  *   is open source software; you can redistribute it and/or modify it
17  *   under the terms of version 2 of the GNU General Public License as
18  *   published by the Free Software Foundation.
19  *
20  *   In either case, Lustre is distributed in the hope that it will be
21  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23  *   license text for more details.
24  *
 *  For testing and management it is treated as an obd_device,
 *  although it does not export a full OBD method table (the
 *  requests are coming in over the wire, so object target modules
 *  do not have a full method table.)
29  *
30  */
31
32 #ifndef EXPORT_SYMTAB
33 # define EXPORT_SYMTAB
34 #endif
35 #define DEBUG_SUBSYSTEM S_OSC
36
37 #ifdef __KERNEL__
38 # include <libcfs/libcfs.h>
39 #else /* __KERNEL__ */
40 # include <liblustre.h>
41 #endif
42
43 #include <lustre_dlm.h>
44 #include <libcfs/kp30.h>
45 #include <lustre_net.h>
46 #include <lustre/lustre_user.h>
47 #include <obd_ost.h>
48 #include <obd_lov.h>
49
50 #ifdef  __CYGWIN__
51 # include <ctype.h>
52 #endif
53
54 #include <lustre_ha.h>
55 #include <lprocfs_status.h>
56 #include <lustre_log.h>
57 #include <lustre_debug.h>
58 #include <lustre_param.h>
59 #include "osc_internal.h"
60
61 static quota_interface_t *quota_interface = NULL;
62 extern quota_interface_t osc_quota_interface;
63
64 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
65
66 /* Pack OSC object metadata for disk storage (LE byte order). */
67 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
68                       struct lov_stripe_md *lsm)
69 {
70         int lmm_size;
71         ENTRY;
72
73         lmm_size = sizeof(**lmmp);
74         if (!lmmp)
75                 RETURN(lmm_size);
76
77         if (*lmmp && !lsm) {
78                 OBD_FREE(*lmmp, lmm_size);
79                 *lmmp = NULL;
80                 RETURN(0);
81         }
82
83         if (!*lmmp) {
84                 OBD_ALLOC(*lmmp, lmm_size);
85                 if (!*lmmp)
86                         RETURN(-ENOMEM);
87         }
88
89         if (lsm) {
90                 LASSERT(lsm->lsm_object_id);
91                 LASSERT(lsm->lsm_object_gr);
92                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
93                 (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
94         }
95
96         RETURN(lmm_size);
97 }
98
99 /* Unpack OSC object metadata from disk storage (LE byte order). */
100 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
101                         struct lov_mds_md *lmm, int lmm_bytes)
102 {
103         int lsm_size;
104         ENTRY;
105
106         if (lmm != NULL) {
107                 if (lmm_bytes < sizeof (*lmm)) {
108                         CERROR("lov_mds_md too small: %d, need %d\n",
109                                lmm_bytes, (int)sizeof(*lmm));
110                         RETURN(-EINVAL);
111                 }
112                 /* XXX LOV_MAGIC etc check? */
113
114                 if (lmm->lmm_object_id == 0) {
115                         CERROR("lov_mds_md: zero lmm_object_id\n");
116                         RETURN(-EINVAL);
117                 }
118         }
119
120         lsm_size = lov_stripe_md_size(1);
121         if (lsmp == NULL)
122                 RETURN(lsm_size);
123
124         if (*lsmp != NULL && lmm == NULL) {
125                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
126                 OBD_FREE(*lsmp, lsm_size);
127                 *lsmp = NULL;
128                 RETURN(0);
129         }
130
131         if (*lsmp == NULL) {
132                 OBD_ALLOC(*lsmp, lsm_size);
133                 if (*lsmp == NULL)
134                         RETURN(-ENOMEM);
135                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
136                 if ((*lsmp)->lsm_oinfo[0] == NULL) {
137                         OBD_FREE(*lsmp, lsm_size);
138                         RETURN(-ENOMEM);
139                 }
140                 loi_init((*lsmp)->lsm_oinfo[0]);
141         }
142
143         if (lmm != NULL) {
144                 /* XXX zero *lsmp? */
145                 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
146                 (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
147                 LASSERT((*lsmp)->lsm_object_id);
148                 LASSERT((*lsmp)->lsm_object_gr);
149         }
150
151         (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
152
153         RETURN(lsm_size);
154 }
155
156 static inline void osc_pack_capa(struct ptlrpc_request *req, int offset,
157                                  struct ost_body *body, void *capa)
158 {
159         struct obd_capa *oc = (struct obd_capa *)capa;
160         struct lustre_capa *c;
161
162         if (!capa)
163                 return;
164
165         c = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*c));
166         LASSERT(c);
167         capa_cpy(c, oc);
168         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
169         DEBUG_CAPA(D_SEC, c, "pack");
170 }
171
172 static inline void osc_pack_req_body(struct ptlrpc_request *req, int offset,
173                                      struct obd_info *oinfo)
174 {
175         struct ost_body *body;
176
177         body = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*body));
178         body->oa = *oinfo->oi_oa;
179         osc_pack_capa(req, offset + 1, body, oinfo->oi_capa);
180 }
181
182 static int osc_getattr_interpret(struct ptlrpc_request *req,
183                                  struct osc_async_args *aa, int rc)
184 {
185         struct ost_body *body;
186         ENTRY;
187
188         if (rc != 0)
189                 GOTO(out, rc);
190
191         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
192                                   lustre_swab_ost_body);
193         if (body) {
194                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
195                 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
196
197                 /* This should really be sent by the OST */
198                 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
199                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
200         } else {
201                 CERROR("can't unpack ost_body\n");
202                 rc = -EPROTO;
203                 aa->aa_oi->oi_oa->o_valid = 0;
204         }
205 out:
206         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
207         RETURN(rc);
208 }
209
210 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
211                              struct ptlrpc_request_set *set)
212 {
213         struct ptlrpc_request *req;
214         struct ost_body *body;
215         int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
216         struct osc_async_args *aa;
217         ENTRY;
218
219         size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
220         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
221                               OST_GETATTR, 3, size,NULL);
222         if (!req)
223                 RETURN(-ENOMEM);
224
225         osc_pack_req_body(req, REQ_REC_OFF, oinfo);
226
227         ptlrpc_req_set_repsize(req, 2, size);
228         req->rq_interpret_reply = osc_getattr_interpret;
229
230         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
231         aa = (struct osc_async_args *)&req->rq_async_args;
232         aa->aa_oi = oinfo;
233
234         ptlrpc_set_add_req(set, req);
235         RETURN (0);
236 }
237
238 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
239 {
240         struct ptlrpc_request *req;
241         struct ost_body *body;
242         int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
243         ENTRY;
244
245         size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
246         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
247                               OST_GETATTR, 3, size, NULL);
248         if (!req)
249                 RETURN(-ENOMEM);
250
251         osc_pack_req_body(req, REQ_REC_OFF, oinfo);
252
253         ptlrpc_req_set_repsize(req, 2, size);
254
255         rc = ptlrpc_queue_wait(req);
256         if (rc) {
257                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
258                 GOTO(out, rc);
259         }
260
261         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
262                                   lustre_swab_ost_body);
263         if (body == NULL) {
264                 CERROR ("can't unpack ost_body\n");
265                 GOTO (out, rc = -EPROTO);
266         }
267
268         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
269         *oinfo->oi_oa = body->oa;
270
271         /* This should really be sent by the OST */
272         oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
273         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
274
275         EXIT;
276  out:
277         ptlrpc_req_finished(req);
278         return rc;
279 }
280
281 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
282                        struct obd_trans_info *oti)
283 {
284         struct ptlrpc_request *req;
285         struct ost_body *body;
286         int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
287         ENTRY;
288
289         LASSERT(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
290                                         oinfo->oi_oa->o_gr > 0);
291         size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
292         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
293                               OST_SETATTR, 3, size, NULL);
294         if (!req)
295                 RETURN(-ENOMEM);
296
297         osc_pack_req_body(req, REQ_REC_OFF, oinfo);
298
299         ptlrpc_req_set_repsize(req, 2, size);
300
301         rc = ptlrpc_queue_wait(req);
302         if (rc)
303                 GOTO(out, rc);
304
305         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
306                                   lustre_swab_ost_body);
307         if (body == NULL)
308                 GOTO(out, rc = -EPROTO);
309
310         *oinfo->oi_oa = body->oa;
311
312         EXIT;
313 out:
314         ptlrpc_req_finished(req);
315         RETURN(rc);
316 }
317
318 static int osc_setattr_interpret(struct ptlrpc_request *req,
319                                  struct osc_async_args *aa, int rc)
320 {
321         struct ost_body *body;
322         ENTRY;
323
324         if (rc != 0)
325                 GOTO(out, rc);
326
327         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
328                                   lustre_swab_ost_body);
329         if (body == NULL) {
330                 CERROR("can't unpack ost_body\n");
331                 GOTO(out, rc = -EPROTO);
332         }
333
334         *aa->aa_oi->oi_oa = body->oa;
335 out:
336         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
337         RETURN(rc);
338 }
339
340 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
341                              struct obd_trans_info *oti,
342                              struct ptlrpc_request_set *rqset)
343 {
344         struct ptlrpc_request *req;
345         int size[3] = { sizeof(struct ptlrpc_body), sizeof(struct ost_body) };
346         struct osc_async_args *aa;
347         ENTRY;
348
349         size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
350         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
351                               OST_SETATTR, 3, size, NULL);
352         if (!req)
353                 RETURN(-ENOMEM);
354
355         osc_pack_req_body(req, REQ_REC_OFF, oinfo);
356         if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
357                 LASSERT(oti);
358                 *obdo_logcookie(oinfo->oi_oa) = *oti->oti_logcookies;
359         }
360
361         ptlrpc_req_set_repsize(req, 2, size);
362         /* do mds to ost setattr asynchronouly */
363         if (!rqset) {
364                 /* Do not wait for response. */
365                 ptlrpcd_add_req(req);
366         } else {
367                 req->rq_interpret_reply = osc_setattr_interpret;
368
369                 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
370                 aa = (struct osc_async_args *)&req->rq_async_args;
371                 aa->aa_oi = oinfo;
372
373                 ptlrpc_set_add_req(rqset, req);
374         }
375
376         RETURN(0);
377 }
378
379 int osc_real_create(struct obd_export *exp, struct obdo *oa,
380                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
381 {
382         struct ptlrpc_request *req;
383         struct ost_body *body;
384         struct lov_stripe_md *lsm;
385         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
386         ENTRY;
387
388         LASSERT(oa);
389         LASSERT(ea);
390
391         lsm = *ea;
392         if (!lsm) {
393                 rc = obd_alloc_memmd(exp, &lsm);
394                 if (rc < 0)
395                         RETURN(rc);
396         }
397
398         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
399                               OST_CREATE, 2, size, NULL);
400         if (!req)
401                 GOTO(out, rc = -ENOMEM);
402
403         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
404         body->oa = *oa;
405
406         ptlrpc_req_set_repsize(req, 2, size);
407         if (oa->o_valid & OBD_MD_FLINLINE) {
408                 LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
409                         oa->o_flags == OBD_FL_DELORPHAN);
410                 DEBUG_REQ(D_HA, req,
411                           "delorphan from OST integration");
412                 /* Don't resend the delorphan req */
413                 req->rq_no_resend = req->rq_no_delay = 1;
414         }
415
416         rc = ptlrpc_queue_wait(req);
417         if (rc)
418                 GOTO(out_req, rc);
419
420         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
421                                   lustre_swab_ost_body);
422         if (body == NULL) {
423                 CERROR ("can't unpack ost_body\n");
424                 GOTO (out_req, rc = -EPROTO);
425         }
426
427         *oa = body->oa;
428
429         /* This should really be sent by the OST */
430         oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
431         oa->o_valid |= OBD_MD_FLBLKSZ;
432
433         /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
434          * have valid lsm_oinfo data structs, so don't go touching that.
435          * This needs to be fixed in a big way.
436          */
437         lsm->lsm_object_id = oa->o_id;
438         lsm->lsm_object_gr = oa->o_gr;
439         *ea = lsm;
440
441         if (oti != NULL) {
442                 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
443
444                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
445                         if (!oti->oti_logcookies)
446                                 oti_alloc_cookies(oti, 1);
447                         *oti->oti_logcookies = *obdo_logcookie(oa);
448                 }
449         }
450
451         CDEBUG(D_HA, "transno: "LPD64"\n",
452                lustre_msg_get_transno(req->rq_repmsg));
453 out_req:
454         ptlrpc_req_finished(req);
455 out:
456         if (rc && !*ea)
457                 obd_free_memmd(exp, &lsm);
458         RETURN(rc);
459 }
460
461 static int osc_punch_interpret(struct ptlrpc_request *req,
462                                struct osc_async_args *aa, int rc)
463 {
464         struct ost_body *body;
465         ENTRY;
466
467         if (rc != 0)
468                 GOTO(out, rc);
469
470         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof (*body),
471                                   lustre_swab_ost_body);
472         if (body == NULL) {
473                 CERROR ("can't unpack ost_body\n");
474                 GOTO(out, rc = -EPROTO);
475         }
476
477         *aa->aa_oi->oi_oa = body->oa;
478 out:
479         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
480         RETURN(rc);
481 }
482
483 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
484                      struct obd_trans_info *oti,
485                      struct ptlrpc_request_set *rqset)
486 {
487         struct ptlrpc_request *req;
488         struct osc_async_args *aa;
489         struct ost_body *body;
490         int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
491         ENTRY;
492
493         if (!oinfo->oi_oa) {
494                 CERROR("oa NULL\n");
495                 RETURN(-EINVAL);
496         }
497
498         size[REQ_REC_OFF + 1] = oinfo->oi_capa? sizeof(struct lustre_capa) : 0;
499         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
500                               OST_PUNCH, 3, size, NULL);
501         if (!req)
502                 RETURN(-ENOMEM);
503
504         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
505
506         osc_pack_req_body(req, REQ_REC_OFF, oinfo);
507         /* overload the size and blocks fields in the oa with start/end */
508         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
509         body->oa.o_size = oinfo->oi_policy.l_extent.start;
510         body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
511         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
512
513         ptlrpc_req_set_repsize(req, 2, size);
514
515         req->rq_interpret_reply = osc_punch_interpret;
516         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
517         aa = (struct osc_async_args *)&req->rq_async_args;
518         aa->aa_oi = oinfo;
519         ptlrpc_set_add_req(rqset, req);
520
521         RETURN(0);
522 }
523
524 static int osc_sync(struct obd_export *exp, struct obdo *oa,
525                     struct lov_stripe_md *md, obd_size start, obd_size end,
526                     void *capa)
527 {
528         struct ptlrpc_request *req;
529         struct ost_body *body;
530         int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
531         ENTRY;
532
533         if (!oa) {
534                 CERROR("oa NULL\n");
535                 RETURN(-EINVAL);
536         }
537
538         size[REQ_REC_OFF + 1] = capa ? sizeof(struct lustre_capa) : 0;
539
540         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
541                               OST_SYNC, 3, size, NULL);
542         if (!req)
543                 RETURN(-ENOMEM);
544
545         /* overload the size and blocks fields in the oa with start/end */
546         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
547         body->oa = *oa;
548         body->oa.o_size = start;
549         body->oa.o_blocks = end;
550         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
551
552         osc_pack_capa(req, REQ_REC_OFF + 1, body, capa);
553
554         ptlrpc_req_set_repsize(req, 2, size);
555
556         rc = ptlrpc_queue_wait(req);
557         if (rc)
558                 GOTO(out, rc);
559
560         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
561                                   lustre_swab_ost_body);
562         if (body == NULL) {
563                 CERROR ("can't unpack ost_body\n");
564                 GOTO (out, rc = -EPROTO);
565         }
566
567         *oa = body->oa;
568
569         EXIT;
570  out:
571         ptlrpc_req_finished(req);
572         return rc;
573 }
574
575 /* Find and cancel locally locks matched by @mode in the resource found by
576  * @objid. Found locks are added into @cancel list. Returns the amount of
577  * locks added to @cancels list. */
578 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
579                                    struct list_head *cancels, ldlm_mode_t mode,
580                                    int lock_flags)
581 {
582         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
583         struct ldlm_res_id res_id = { .name = { oa->o_id, 0, oa->o_gr, 0 } };
584         struct ldlm_resource *res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
585         int count;
586         ENTRY;
587
588         if (res == NULL)
589                 RETURN(0);
590
591         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
592                                            lock_flags, 0, NULL);
593         ldlm_resource_putref(res);
594         RETURN(count);
595 }
596
597 /* Destroy requests can be async always on the client, and we don't even really
598  * care about the return code since the client cannot do anything at all about
599  * a destroy failure.
600  * When the MDS is unlinking a filename, it saves the file objects into a
601  * recovery llog, and these object records are cancelled when the OST reports
602  * they were destroyed and sync'd to disk (i.e. transaction committed).
603  * If the client dies, or the OST is down when the object should be destroyed,
604  * the records are not cancelled, and when the OST reconnects to the MDS next,
605  * it will retrieve the llog unlink logs and then sends the log cancellation
606  * cookies to the MDS after committing destroy transactions. */
607 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
608                        struct lov_stripe_md *ea, struct obd_trans_info *oti,
609                        struct obd_export *md_export)
610 {
611         CFS_LIST_HEAD(cancels);
612         struct ptlrpc_request *req;
613         struct ost_body *body;
614         int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body), 0 };
615         int count, bufcount = 2;
616         ENTRY;
617
618         if (!oa) {
619                 CERROR("oa NULL\n");
620                 RETURN(-EINVAL);
621         }
622
623         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
624                                         LDLM_FL_DISCARD_DATA);
625         if (exp_connect_cancelset(exp) && count) {
626                 bufcount = 3;
627                 size[REQ_REC_OFF + 1] = ldlm_request_bufsize(count,
628                                                              OST_DESTROY);
629         }
630         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
631                               OST_DESTROY, bufcount, size, NULL);
632         if (exp_connect_cancelset(exp) && req)
633                 ldlm_cli_cancel_list(&cancels, count, req, REQ_REC_OFF + 1, 0);
634         else
635                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
636
637         if (!req)
638                 RETURN(-ENOMEM);
639
640         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
641
642         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
643         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
644                 memcpy(obdo_logcookie(oa), oti->oti_logcookies,
645                        sizeof(*oti->oti_logcookies));
646         body->oa = *oa;
647
648         ptlrpc_req_set_repsize(req, 2, size);
649
650         ptlrpcd_add_req(req);
651         RETURN(0);
652 }
653
654 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
655                                 long writing_bytes)
656 {
657         obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
658
659         LASSERT(!(oa->o_valid & bits));
660
661         oa->o_valid |= bits;
662         client_obd_list_lock(&cli->cl_loi_list_lock);
663         oa->o_dirty = cli->cl_dirty;
664         if (cli->cl_dirty > cli->cl_dirty_max) {
665                 CERROR("dirty %lu > dirty_max %lu\n",
666                        cli->cl_dirty, cli->cl_dirty_max);
667                 oa->o_undirty = 0;
668         } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
669                 CERROR("dirty %d > system dirty_max %d\n",
670                        atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
671                 oa->o_undirty = 0;
672         } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
673                 CERROR("dirty %lu - dirty_max %lu too big???\n",
674                        cli->cl_dirty, cli->cl_dirty_max);
675                 oa->o_undirty = 0;
676         } else {
677                 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
678                                 (cli->cl_max_rpcs_in_flight + 1);
679                 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
680         }
681         oa->o_grant = cli->cl_avail_grant;
682         oa->o_dropped = cli->cl_lost_grant;
683         cli->cl_lost_grant = 0;
684         client_obd_list_unlock(&cli->cl_loi_list_lock);
685         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
686                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
687 }
688
689 /* caller must hold loi_list_lock */
690 static void osc_consume_write_grant(struct client_obd *cli,
691                                     struct brw_page *pga)
692 {
693         atomic_inc(&obd_dirty_pages);
694         cli->cl_dirty += CFS_PAGE_SIZE;
695         cli->cl_avail_grant -= CFS_PAGE_SIZE;
696         pga->flag |= OBD_BRW_FROM_GRANT;
697         CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
698                CFS_PAGE_SIZE, pga, pga->pg);
699         LASSERT(cli->cl_avail_grant >= 0);
700 }
701
702 /* the companion to osc_consume_write_grant, called when a brw has completed.
703  * must be called with the loi lock held. */
704 static void osc_release_write_grant(struct client_obd *cli,
705                                     struct brw_page *pga, int sent)
706 {
707         int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
708         ENTRY;
709
710         if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
711                 EXIT;
712                 return;
713         }
714
715         pga->flag &= ~OBD_BRW_FROM_GRANT;
716         atomic_dec(&obd_dirty_pages);
717         cli->cl_dirty -= CFS_PAGE_SIZE;
718         if (!sent) {
719                 cli->cl_lost_grant += CFS_PAGE_SIZE;
720                 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
721                        cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
722         } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
723                 /* For short writes we shouldn't count parts of pages that
724                  * span a whole block on the OST side, or our accounting goes
725                  * wrong.  Should match the code in filter_grant_check. */
726                 int offset = pga->off & ~CFS_PAGE_MASK;
727                 int count = pga->count + (offset & (blocksize - 1));
728                 int end = (offset + pga->count) & (blocksize - 1);
729                 if (end)
730                         count += blocksize - end;
731
732                 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
733                 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
734                        CFS_PAGE_SIZE - count, cli->cl_lost_grant,
735                        cli->cl_avail_grant, cli->cl_dirty);
736         }
737
738         EXIT;
739 }
740
741 static unsigned long rpcs_in_flight(struct client_obd *cli)
742 {
743         return cli->cl_r_in_flight + cli->cl_w_in_flight;
744 }
745
746 /* caller must hold loi_list_lock */
747 void osc_wake_cache_waiters(struct client_obd *cli)
748 {
749         struct list_head *l, *tmp;
750         struct osc_cache_waiter *ocw;
751
752         ENTRY;
753         list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
754                 /* if we can't dirty more, we must wait until some is written */
755                 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
756                    (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
757                         CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
758                                "osc max %ld, sys max %d\n", cli->cl_dirty,
759                                cli->cl_dirty_max, obd_max_dirty_pages);
760                         return;
761                 }
762
763                 /* if still dirty cache but no grant wait for pending RPCs that
764                  * may yet return us some grant before doing sync writes */
765                 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
766                         CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
767                                cli->cl_w_in_flight);
768                         return;
769                 }
770
771                 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
772                 list_del_init(&ocw->ocw_entry);
773                 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
774                         /* no more RPCs in flight to return grant, do sync IO */
775                         ocw->ocw_rc = -EDQUOT;
776                         CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
777                 } else {
778                         osc_consume_write_grant(cli,
779                                                 &ocw->ocw_oap->oap_brw_page);
780                 }
781
782                 cfs_waitq_signal(&ocw->ocw_waitq);
783         }
784
785         EXIT;
786 }
787
788 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
789 {
790         client_obd_list_lock(&cli->cl_loi_list_lock);
791         cli->cl_avail_grant = ocd->ocd_grant;
792         client_obd_list_unlock(&cli->cl_loi_list_lock);
793
794         CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
795                cli->cl_avail_grant, cli->cl_lost_grant);
796         LASSERT(cli->cl_avail_grant >= 0);
797 }
798
799 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
800 {
801         client_obd_list_lock(&cli->cl_loi_list_lock);
802         CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
803         if (body->oa.o_valid & OBD_MD_FLGRANT)
804                 cli->cl_avail_grant += body->oa.o_grant;
805         /* waiters are woken in brw_interpret_oap */
806         client_obd_list_unlock(&cli->cl_loi_list_lock);
807 }
808
809 /* We assume that the reason this OSC got a short read is because it read
810  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
811  * via the LOV, and it _knows_ it's reading inside the file, it's just that
812  * this stripe never got written at or beyond this stripe offset yet. */
813 static void handle_short_read(int nob_read, obd_count page_count,
814                               struct brw_page **pga)
815 {
816         char *ptr;
817         int i = 0;
818
819         /* skip bytes read OK */
820         while (nob_read > 0) {
821                 LASSERT (page_count > 0);
822
823                 if (pga[i]->count > nob_read) {
824                         /* EOF inside this page */
825                         ptr = cfs_kmap(pga[i]->pg) +
826                                 (pga[i]->off & ~CFS_PAGE_MASK);
827                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
828                         cfs_kunmap(pga[i]->pg);
829                         page_count--;
830                         i++;
831                         break;
832                 }
833
834                 nob_read -= pga[i]->count;
835                 page_count--;
836                 i++;
837         }
838
839         /* zero remaining pages */
840         while (page_count-- > 0) {
841                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
842                 memset(ptr, 0, pga[i]->count);
843                 cfs_kunmap(pga[i]->pg);
844                 i++;
845         }
846 }
847
848 static int check_write_rcs(struct ptlrpc_request *req,
849                            int requested_nob, int niocount,
850                            obd_count page_count, struct brw_page **pga)
851 {
852         int    *remote_rcs, i;
853
854         /* return error if any niobuf was in error */
855         remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
856                                         sizeof(*remote_rcs) * niocount, NULL);
857         if (remote_rcs == NULL) {
858                 CERROR("Missing/short RC vector on BRW_WRITE reply\n");
859                 return(-EPROTO);
860         }
861         if (lustre_msg_swabbed(req->rq_repmsg))
862                 for (i = 0; i < niocount; i++)
863                         __swab32s(&remote_rcs[i]);
864
865         for (i = 0; i < niocount; i++) {
866                 if (remote_rcs[i] < 0)
867                         return(remote_rcs[i]);
868
869                 if (remote_rcs[i] != 0) {
870                         CERROR("rc[%d] invalid (%d) req %p\n",
871                                 i, remote_rcs[i], req);
872                         return(-EPROTO);
873                 }
874         }
875
876         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
877                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
878                        requested_nob, req->rq_bulk->bd_nob_transferred);
879                 return(-EPROTO);
880         }
881
882         return (0);
883 }
884
885 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
886 {
887         if (p1->flag != p2->flag) {
888                 unsigned mask = ~OBD_BRW_FROM_GRANT;
889
890                 /* warn if we try to combine flags that we don't know to be
891                  * safe to combine */
892                 if ((p1->flag & mask) != (p2->flag & mask))
893                         CERROR("is it ok to have flags 0x%x and 0x%x in the "
894                                "same brw?\n", p1->flag, p2->flag);
895                 return 0;
896         }
897
898         return (p1->off + p1->count == p2->off);
899 }
900
901 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
902                                    struct brw_page **pga, int opc)
903 {
904         __u32 cksum = ~0;
905         int i = 0;
906
907         LASSERT (pg_count > 0);
908         while (nob > 0 && pg_count > 0) {
909                 char *ptr = cfs_kmap(pga[i]->pg);
910                 int off = pga[i]->off & ~CFS_PAGE_MASK;
911                 int count = pga[i]->count > nob ? nob : pga[i]->count;
912
913                 /* corrupt the data before we compute the checksum, to
914                  * simulate an OST->client data error */
915                 if (i == 0 && opc == OST_READ &&
916                     OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
917                         memcpy(ptr + off, "bad1", min(4, nob));
918                 cksum = crc32_le(cksum, ptr + off, count);
919                 cfs_kunmap(pga[i]->pg);
920                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
921                                off, cksum);
922
923                 nob -= pga[i]->count;
924                 pg_count--;
925                 i++;
926         }
927         /* For sending we only compute the wrong checksum instead
928          * of corrupting the data so it is still correct on a redo */
929         if (opc == OST_WRITE && OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_SEND))
930                 cksum++;
931
932         return cksum;
933 }
934
935 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
936                                 struct lov_stripe_md *lsm, obd_count page_count,
937                                 struct brw_page **pga, 
938                                 struct ptlrpc_request **reqp,
939                                 struct obd_capa *ocapa)
940 {
941         struct ptlrpc_request   *req;
942         struct ptlrpc_bulk_desc *desc;
943         struct ost_body         *body;
944         struct obd_ioobj        *ioobj;
945         struct niobuf_remote    *niobuf;
946         int size[5] = { sizeof(struct ptlrpc_body), sizeof(*body) };
947         int niocount, i, requested_nob, opc, rc;
948         struct ptlrpc_request_pool *pool;
949         struct lustre_capa      *capa;
950         struct osc_brw_async_args *aa;
951
952         ENTRY;
953         OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM); /* Recoverable */
954         OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ2, -EINVAL); /* Fatal */
955
956         if ((cmd & OBD_BRW_WRITE) != 0) {
957                 opc = OST_WRITE;
958                 pool = cli->cl_import->imp_rq_pool;
959         } else {
960                 opc = OST_READ;
961                 pool = NULL;
962         }
963
964         for (niocount = i = 1; i < page_count; i++) {
965                 if (!can_merge_pages(pga[i - 1], pga[i]))
966                         niocount++;
967         }
968
969         size[REQ_REC_OFF + 1] = sizeof(*ioobj);
970         size[REQ_REC_OFF + 2] = niocount * sizeof(*niobuf);
971         if (ocapa)
972                 size[REQ_REC_OFF + 3] = sizeof(*capa);
973
974         req = ptlrpc_prep_req_pool(cli->cl_import, LUSTRE_OST_VERSION, opc, 5,
975                                    size, NULL, pool, NULL);
976         if (req == NULL)
977                 RETURN (-ENOMEM);
978
979         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
980
981         if (opc == OST_WRITE)
982                 desc = ptlrpc_prep_bulk_imp (req, page_count,
983                                              BULK_GET_SOURCE, OST_BULK_PORTAL);
984         else
985                 desc = ptlrpc_prep_bulk_imp (req, page_count,
986                                              BULK_PUT_SINK, OST_BULK_PORTAL);
987         if (desc == NULL)
988                 GOTO(out, rc = -ENOMEM);
989         /* NB request now owns desc and will free it when it gets freed */
990
991         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
992         ioobj = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioobj));
993         niobuf = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
994                                 niocount * sizeof(*niobuf));
995
996         body->oa = *oa;
997
998         obdo_to_ioobj(oa, ioobj);
999         ioobj->ioo_bufcnt = niocount;
1000         if (ocapa) {
1001                 capa = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 3,
1002                                       sizeof(*capa));
1003                 capa_cpy(capa, ocapa);
1004                 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
1005         }
1006
1007         LASSERT (page_count > 0);
1008         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1009                 struct brw_page *pg = pga[i];
1010                 struct brw_page *pg_prev = pga[i - 1];
1011
1012                 LASSERT(pg->count > 0);
1013                 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
1014                          "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
1015                          pg->off, pg->count);
1016 #ifdef __LINUX__
1017                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1018                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1019                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1020                          i, page_count,
1021                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1022                          pg_prev->pg, page_private(pg_prev->pg),
1023                          pg_prev->pg->index, pg_prev->off);
1024 #else
1025                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1026                          "i %d p_c %u\n", i, page_count);
1027 #endif
1028                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1029                         (pg->flag & OBD_BRW_SRVLOCK));
1030
1031                 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
1032                                       pg->count);
1033                 requested_nob += pg->count;
1034
1035                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1036                         niobuf--;
1037                         niobuf->len += pg->count;
1038                 } else {
1039                         niobuf->offset = pg->off;
1040                         niobuf->len    = pg->count;
1041                         niobuf->flags  = pg->flag;
1042                 }
1043         }
1044
1045         LASSERT((void *)(niobuf - niocount) ==
1046                 lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1047                                niocount * sizeof(*niobuf)));
1048         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1049
1050         /* size[REQ_REC_OFF] still sizeof (*body) */
1051         if (opc == OST_WRITE) {
1052                 if (unlikely(cli->cl_checksum)) {
1053                         body->oa.o_valid |= OBD_MD_FLCKSUM;
1054                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1055                                                              page_count, pga,
1056                                                              OST_WRITE);
1057                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1058                                body->oa.o_cksum);
1059                         /* save this in 'oa', too, for later checking */
1060                         oa->o_valid |= OBD_MD_FLCKSUM;
1061                 } else {
1062                         /* clear out the checksum flag, in case this is a
1063                          * resend but cl_checksum is no longer set. b=11238 */
1064                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1065                 }
1066                 oa->o_cksum = body->oa.o_cksum;
1067                 /* 1 RC per niobuf */
1068                 size[REPLY_REC_OFF + 1] = sizeof(__u32) * niocount;
1069                 ptlrpc_req_set_repsize(req, 3, size);
1070         } else {
1071                 if (unlikely(cli->cl_checksum))
1072                         body->oa.o_valid |= OBD_MD_FLCKSUM;
1073                 /* 1 RC for the whole I/O */
1074                 ptlrpc_req_set_repsize(req, 2, size);
1075         }
1076
1077         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1078         aa = (struct osc_brw_async_args *)&req->rq_async_args;
1079         aa->aa_oa = oa;
1080         aa->aa_requested_nob = requested_nob;
1081         aa->aa_nio_count = niocount;
1082         aa->aa_page_count = page_count;
1083         aa->aa_resends = 0;
1084         aa->aa_ppga = pga;
1085         aa->aa_cli = cli;
1086         INIT_LIST_HEAD(&aa->aa_oaps);
1087
1088         *reqp = req;
1089         RETURN (0);
1090
1091  out:
1092         ptlrpc_req_finished (req);
1093         RETURN (rc);
1094 }
1095
1096 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1097                                 __u32 client_cksum, __u32 server_cksum,
1098                                 int nob, obd_count page_count,
1099                                 struct brw_page **pga)
1100 {
1101         __u32 new_cksum;
1102         char *msg;
1103
1104         if (server_cksum == client_cksum) {
1105                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1106                 return 0;
1107         }
1108
1109         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE);
1110
1111         if (new_cksum == server_cksum)
1112                 msg = "changed on the client after we checksummed it - "
1113                       "likely false positive due to mmap IO (bug 11742)";
1114         else if (new_cksum == client_cksum)
1115                 msg = "changed in transit before arrival at OST";
1116         else
1117                 msg = "changed in transit AND doesn't match the original - "
1118                       "likely false positive due to mmap IO (bug 11742)";
1119
1120         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1121                            LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1122                            "["LPU64"-"LPU64"]\n",
1123                            msg, libcfs_nid2str(peer->nid),
1124                            oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1125                            oa->o_valid & OBD_MD_FLFID ? oa->o_generation : 
1126                                                         (__u64)0,
1127                            oa->o_id,
1128                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1129                            pga[0]->off,
1130                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1131         CERROR("original client csum %x, server csum %x, client csum now %x\n",
1132                client_cksum, server_cksum, new_cksum);
1133         return 1;        
1134 }
1135
1136 /* Note rc enters this function as number of bytes transferred */
1137 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1138 {
1139         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1140         const lnet_process_id_t *peer =
1141                         &req->rq_import->imp_connection->c_peer;
1142         struct client_obd *cli = aa->aa_cli;
1143         struct ost_body *body;
1144         __u32 client_cksum = 0;
1145         ENTRY;
1146
1147         if (rc < 0 && rc != -EDQUOT)
1148                 RETURN(rc);
1149
1150         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1151         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1152                                   lustre_swab_ost_body);
1153         if (body == NULL) {
1154                 CERROR ("Can't unpack body\n");
1155                 RETURN(-EPROTO);
1156         }
1157
1158         /* set/clear over quota flag for a uid/gid */
1159         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1160             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1161                 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1162                              body->oa.o_gid, body->oa.o_valid,
1163                              body->oa.o_flags);
1164
1165         if (rc < 0)
1166                 RETURN(rc);
1167
1168         if (unlikely(aa->aa_oa->o_valid & OBD_MD_FLCKSUM))
1169                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1170
1171         osc_update_grant(cli, body);
1172
1173         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1174                 if (rc > 0) {
1175                         CERROR ("Unexpected +ve rc %d\n", rc);
1176                         RETURN(-EPROTO);
1177                 }
1178                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1179
1180                 if (unlikely((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) &&
1181                              client_cksum &&
1182                              check_write_checksum(&body->oa, peer, client_cksum,
1183                                                   body->oa.o_cksum,
1184                                                   aa->aa_requested_nob,
1185                                                   aa->aa_page_count,
1186                                                   aa->aa_ppga)))
1187                         RETURN(-EAGAIN);
1188
1189                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1190                         RETURN(-EAGAIN);
1191
1192                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1193                                      aa->aa_page_count, aa->aa_ppga);
1194                 GOTO(out, rc);
1195         }
1196
1197         /* The rest of this function executes only for OST_READs */
1198         if (rc > aa->aa_requested_nob) {
1199                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1200                        aa->aa_requested_nob);
1201                 RETURN(-EPROTO);
1202         }
1203
1204         if (rc != req->rq_bulk->bd_nob_transferred) {
1205                 CERROR ("Unexpected rc %d (%d transferred)\n",
1206                         rc, req->rq_bulk->bd_nob_transferred);
1207                 return (-EPROTO);
1208         }
1209
1210         if (rc < aa->aa_requested_nob)
1211                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1212
1213         if (sptlrpc_cli_unwrap_bulk_read(req, rc, aa->aa_page_count,
1214                                          aa->aa_ppga))
1215                 GOTO(out, rc = -EAGAIN);
1216
1217         if (unlikely(body->oa.o_valid & OBD_MD_FLCKSUM)) {
1218                 static int cksum_counter;
1219                 __u32      server_cksum = body->oa.o_cksum;
1220                 char      *via;
1221                 char      *router;
1222
1223                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1224                                                  aa->aa_ppga, OST_READ);
1225
1226                 if (peer->nid == req->rq_bulk->bd_sender) {
1227                         via = router = "";
1228                 } else {
1229                         via = " via ";
1230                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1231                 }
1232
1233                 if (server_cksum == ~0 && rc > 0) {
1234                         CERROR("Protocol error: server %s set the 'checksum' "
1235                                "bit, but didn't send a checksum.  Not fatal, "
1236                                "but please tell CFS.\n",
1237                                libcfs_nid2str(peer->nid));
1238                 } else if (server_cksum != client_cksum) {
1239                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1240                                            "%s%s%s inum "LPU64"/"LPU64" object "
1241                                            LPU64"/"LPU64" extent "
1242                                            "["LPU64"-"LPU64"]\n",
1243                                            req->rq_import->imp_obd->obd_name,
1244                                            libcfs_nid2str(peer->nid),
1245                                            via, router,
1246                                            body->oa.o_valid & OBD_MD_FLFID ?
1247                                                 body->oa.o_fid : (__u64)0,
1248                                            body->oa.o_valid & OBD_MD_FLFID ?
1249                                                 body->oa.o_generation :(__u64)0,
1250                                            body->oa.o_id,
1251                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1252                                                 body->oa.o_gr : (__u64)0,
1253                                            aa->aa_ppga[0]->off,
1254                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1255                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1256                                                                         1);
1257                         CERROR("client %x, server %x\n",
1258                                client_cksum, server_cksum);
1259                         cksum_counter = 0;
1260                         aa->aa_oa->o_cksum = client_cksum;
1261                         rc = -EAGAIN;
1262                 } else {
1263                         cksum_counter++;
1264                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1265                         rc = 0;
1266                 }
1267         } else if (unlikely(client_cksum)) {
1268                 static int cksum_missed;
1269
1270                 cksum_missed++;
1271                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1272                         CERROR("Checksum %u requested from %s but not sent\n",
1273                                cksum_missed, libcfs_nid2str(peer->nid));
1274         } else {
1275                 rc = 0;
1276         }
1277 out:
1278         if (rc >= 0)
1279                 *aa->aa_oa = body->oa;
1280
1281         RETURN(rc);
1282 }
1283
1284 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1285                             struct lov_stripe_md *lsm,
1286                             obd_count page_count, struct brw_page **pga,
1287                             struct obd_capa *ocapa)
1288 {
1289         struct ptlrpc_request *req;
1290         int                    rc;
1291         cfs_waitq_t            waitq;
1292         int                    resends = 0;
1293         struct l_wait_info     lwi;
1294
1295         ENTRY;
1296
1297         cfs_waitq_init(&waitq);
1298
1299 restart_bulk:
1300         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1301                                   page_count, pga, &req, ocapa);
1302         if (rc != 0)
1303                 return (rc);
1304
1305         rc = ptlrpc_queue_wait(req);
1306
1307         if (rc == -ETIMEDOUT && req->rq_resend) {
1308                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1309                 ptlrpc_req_finished(req);
1310                 goto restart_bulk;
1311         }
1312
1313         rc = osc_brw_fini_request(req, rc);
1314
1315         ptlrpc_req_finished(req);
1316         if (osc_recoverable_error(rc)) {
1317                 resends++;
1318                 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1319                         CERROR("too many resend retries, returning error\n");
1320                         RETURN(-EIO);
1321                 }
1322
1323                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1324                 l_wait_event(waitq, 0, &lwi);
1325
1326                 goto restart_bulk;
1327         }
1328         
1329         RETURN (rc);
1330 }
1331
1332 int osc_brw_redo_request(struct ptlrpc_request *request,
1333                          struct osc_brw_async_args *aa)
1334 {
1335         struct ptlrpc_request *new_req;
1336         struct ptlrpc_request_set *set = request->rq_set;
1337         struct osc_brw_async_args *new_aa;
1338         struct osc_async_page *oap;
1339         int rc = 0;
1340         ENTRY;
1341
1342         if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1343                 CERROR("too many resend retries, returning error\n");
1344                 RETURN(-EIO);
1345         }
1346         
1347         DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1348 /*
1349         body = lustre_msg_buf(request->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
1350         if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
1351                 ocapa = lustre_unpack_capa(request->rq_reqmsg,
1352                                            REQ_REC_OFF + 3);
1353 */
1354         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1355                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1356                                   aa->aa_cli, aa->aa_oa,
1357                                   NULL /* lsm unused by osc currently */,
1358                                   aa->aa_page_count, aa->aa_ppga, 
1359                                   &new_req, NULL /* ocapa */);
1360         if (rc)
1361                 RETURN(rc);
1362
1363         client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1364    
1365         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1366                 if (oap->oap_request != NULL) {
1367                         LASSERTF(request == oap->oap_request,
1368                                  "request %p != oap_request %p\n",
1369                                  request, oap->oap_request);
1370                         if (oap->oap_interrupted) {
1371                                 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1372                                 ptlrpc_req_finished(new_req);                        
1373                                 RETURN(-EINTR);
1374                         }
1375                 }
1376         }
1377         /* New request takes over pga and oaps from old request.
1378          * Note that copying a list_head doesn't work, need to move it... */
1379         aa->aa_resends++;
1380         new_req->rq_interpret_reply = request->rq_interpret_reply;
1381         new_req->rq_async_args = request->rq_async_args;
1382         new_req->rq_sent = CURRENT_SECONDS + aa->aa_resends;
1383
1384         new_aa = (struct osc_brw_async_args *)&new_req->rq_async_args;
1385
1386         INIT_LIST_HEAD(&new_aa->aa_oaps);
1387         list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1388         INIT_LIST_HEAD(&aa->aa_oaps);
1389
1390         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1391                 if (oap->oap_request) {
1392                         ptlrpc_req_finished(oap->oap_request);
1393                         oap->oap_request = ptlrpc_request_addref(new_req);
1394                 }
1395         }
1396
1397         /* use ptlrpc_set_add_req is safe because interpret functions work 
1398          * in check_set context. only one way exist with access to request 
1399          * from different thread got -EINTR - this way protected with 
1400          * cl_loi_list_lock */
1401         ptlrpc_set_add_req(set, new_req);
1402
1403         client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1404
1405         DEBUG_REQ(D_INFO, new_req, "new request");
1406         RETURN(0);
1407 }
1408
1409 static int brw_interpret(struct ptlrpc_request *req, void *data, int rc)
1410 {
1411         struct osc_brw_async_args *aa = data;
1412         int                        i;
1413         int                        nob = rc;
1414         ENTRY;
1415
1416         rc = osc_brw_fini_request(req, rc);
1417         if (osc_recoverable_error(rc)) {
1418                 rc = osc_brw_redo_request(req, aa);
1419                 if (rc == 0)
1420                         RETURN(0);
1421         }
1422         if ((rc >= 0) && req->rq_set && req->rq_set->set_countp)
1423                 atomic_add(nob, (atomic_t *)req->rq_set->set_countp);
1424
1425         client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1426         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1427                 aa->aa_cli->cl_w_in_flight--;
1428         else
1429                 aa->aa_cli->cl_r_in_flight--;
1430         for (i = 0; i < aa->aa_page_count; i++)
1431                 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
1432         client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1433
1434         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1435
1436         RETURN(rc);
1437 }
1438
1439 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1440                           struct lov_stripe_md *lsm, obd_count page_count,
1441                           struct brw_page **pga, struct ptlrpc_request_set *set,
1442                           struct obd_capa *ocapa)
1443 {
1444         struct ptlrpc_request     *req;
1445         struct client_obd         *cli = &exp->exp_obd->u.cli;
1446         int                        rc, i;
1447         struct osc_brw_async_args *aa;
1448         ENTRY;
1449
1450         /* Consume write credits even if doing a sync write -
1451          * otherwise we may run out of space on OST due to grant. */
1452         if (cmd == OBD_BRW_WRITE) {
1453                 spin_lock(&cli->cl_loi_list_lock);
1454                 for (i = 0; i < page_count; i++) {
1455                         if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
1456                                 osc_consume_write_grant(cli, pga[i]);
1457                 }
1458                 spin_unlock(&cli->cl_loi_list_lock);
1459         }
1460
1461         rc = osc_brw_prep_request(cmd, cli, oa, lsm, page_count, pga,
1462                                   &req, ocapa);
1463
1464         aa = (struct osc_brw_async_args *)&req->rq_async_args;
1465         if (cmd == OBD_BRW_READ) {
1466                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1467                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1468                 ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
1469         } else {
1470                  lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1471                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
1472                                  cli->cl_w_in_flight);
1473                 ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
1474         }
1475
1476         if (rc == 0) {
1477                 req->rq_interpret_reply = brw_interpret;
1478                 ptlrpc_set_add_req(set, req);
1479                 client_obd_list_lock(&cli->cl_loi_list_lock);
1480                 if (cmd == OBD_BRW_READ)
1481                         cli->cl_r_in_flight++;
1482                 else
1483                         cli->cl_w_in_flight++;
1484                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1485         } else if (cmd == OBD_BRW_WRITE) {
1486                 client_obd_list_lock(&cli->cl_loi_list_lock);
1487                 for (i = 0; i < page_count; i++)
1488                         osc_release_write_grant(cli, pga[i], 0);
1489                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1490         }
1491         RETURN (rc);
1492 }
1493
1494 /*
1495  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1496  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1497  * fine for our small page arrays and doesn't require allocation.  its an
1498  * insertion sort that swaps elements that are strides apart, shrinking the
1499  * stride down until its '1' and the array is sorted.
1500  */
1501 static void sort_brw_pages(struct brw_page **array, int num)
1502 {
1503         int stride, i, j;
1504         struct brw_page *tmp;
1505
1506         if (num == 1)
1507                 return;
1508         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1509                 ;
1510
1511         do {
1512                 stride /= 3;
1513                 for (i = stride ; i < num ; i++) {
1514                         tmp = array[i];
1515                         j = i;
1516                         while (j >= stride && array[j - stride]->off > tmp->off) {
1517                                 array[j] = array[j - stride];
1518                                 j -= stride;
1519                         }
1520                         array[j] = tmp;
1521                 }
1522         } while (stride > 1);
1523 }
1524
1525 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1526 {
1527         int count = 1;
1528         int offset;
1529         int i = 0;
1530
1531         LASSERT (pages > 0);
1532         offset = pg[i]->off & ~CFS_PAGE_MASK;
1533
1534         for (;;) {
1535                 pages--;
1536                 if (pages == 0)         /* that's all */
1537                         return count;
1538
1539                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1540                         return count;   /* doesn't end on page boundary */
1541
1542                 i++;
1543                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1544                 if (offset != 0)        /* doesn't start on page boundary */
1545                         return count;
1546
1547                 count++;
1548         }
1549 }
1550
1551 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1552 {
1553         struct brw_page **ppga;
1554         int i;
1555
1556         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1557         if (ppga == NULL)
1558                 return NULL;
1559
1560         for (i = 0; i < count; i++)
1561                 ppga[i] = pga + i;
1562         return ppga;
1563 }
1564
1565 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1566 {
1567         LASSERT(ppga != NULL);
1568         OBD_FREE(ppga, sizeof(*ppga) * count);
1569 }
1570
1571 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1572                    obd_count page_count, struct brw_page *pga,
1573                    struct obd_trans_info *oti)
1574 {
1575         struct obdo *saved_oa = NULL;
1576         struct brw_page **ppga, **orig;
1577         struct obd_import *imp = class_exp2cliimp(exp);
1578         struct client_obd *cli = &imp->imp_obd->u.cli;
1579         int rc, page_count_orig;
1580         ENTRY;
1581
1582         if (cmd & OBD_BRW_CHECK) {
1583                 /* The caller just wants to know if there's a chance that this
1584                  * I/O can succeed */
1585
1586                 if (imp == NULL || imp->imp_invalid)
1587                         RETURN(-EIO);
1588                 RETURN(0);
1589         }
1590
1591         /* test_brw with a failed create can trip this, maybe others. */
1592         LASSERT(cli->cl_max_pages_per_rpc);
1593
1594         rc = 0;
1595
1596         orig = ppga = osc_build_ppga(pga, page_count);
1597         if (ppga == NULL)
1598                 RETURN(-ENOMEM);
1599         page_count_orig = page_count;
1600
1601         sort_brw_pages(ppga, page_count);
1602         while (page_count) {
1603                 obd_count pages_per_brw;
1604
1605                 if (page_count > cli->cl_max_pages_per_rpc)
1606                         pages_per_brw = cli->cl_max_pages_per_rpc;
1607                 else
1608                         pages_per_brw = page_count;
1609
1610                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1611
1612                 if (saved_oa != NULL) {
1613                         /* restore previously saved oa */
1614                         *oinfo->oi_oa = *saved_oa;
1615                 } else if (page_count > pages_per_brw) {
1616                         /* save a copy of oa (brw will clobber it) */
1617                         OBDO_ALLOC(saved_oa);
1618                         if (saved_oa == NULL)
1619                                 GOTO(out, rc = -ENOMEM);
1620                         *saved_oa = *oinfo->oi_oa;
1621                 }
1622
1623                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1624                                       pages_per_brw, ppga, oinfo->oi_capa);
1625
1626                 if (rc != 0)
1627                         break;
1628
1629                 page_count -= pages_per_brw;
1630                 ppga += pages_per_brw;
1631         }
1632
1633 out:
1634         osc_release_ppga(orig, page_count_orig);
1635
1636         if (saved_oa != NULL)
1637                 OBDO_FREE(saved_oa);
1638
1639         RETURN(rc);
1640 }
1641
1642 static int osc_brw_async(int cmd, struct obd_export *exp,
1643                          struct obd_info *oinfo, obd_count page_count,
1644                          struct brw_page *pga, struct obd_trans_info *oti,
1645                          struct ptlrpc_request_set *set)
1646 {
1647         struct brw_page **ppga, **orig;
1648         struct client_obd *cli = &exp->exp_obd->u.cli;
1649         int page_count_orig;
1650         int rc = 0;
1651         ENTRY;
1652
1653         if (cmd & OBD_BRW_CHECK) {
1654                 struct obd_import *imp = class_exp2cliimp(exp);
1655                 /* The caller just wants to know if there's a chance that this
1656                  * I/O can succeed */
1657
1658                 if (imp == NULL || imp->imp_invalid)
1659                         RETURN(-EIO);
1660                 RETURN(0);
1661         }
1662
1663         orig = ppga = osc_build_ppga(pga, page_count);
1664         if (ppga == NULL)
1665                 RETURN(-ENOMEM);
1666         page_count_orig = page_count;
1667
1668         sort_brw_pages(ppga, page_count);
1669         while (page_count) {
1670                 struct brw_page **copy;
1671                 obd_count pages_per_brw;
1672
1673                 pages_per_brw = min_t(obd_count, page_count,
1674                                       cli->cl_max_pages_per_rpc);
1675
1676                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1677
1678                 /* use ppga only if single RPC is going to fly */
1679                 if (pages_per_brw != page_count_orig || ppga != orig) {
1680                         OBD_ALLOC(copy, sizeof(*copy) * pages_per_brw);
1681                         if (copy == NULL)
1682                                 GOTO(out, rc = -ENOMEM);
1683                         memcpy(copy, ppga, sizeof(*copy) * pages_per_brw);
1684                 } else
1685                         copy = ppga;
1686
1687                 rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1688                                     pages_per_brw, copy, set, oinfo->oi_capa);
1689
1690                 if (rc != 0) {
1691                         if (copy != ppga)
1692                                 OBD_FREE(copy, sizeof(*copy) * pages_per_brw);
1693                         break;
1694                 }
1695                 if (copy == orig) {
1696                         /* we passed it to async_internal() which is
1697                          * now responsible for releasing memory */
1698                         orig = NULL;
1699                 }
1700
1701                 page_count -= pages_per_brw;
1702                 ppga += pages_per_brw;
1703         }
1704 out:
1705         if (orig)
1706                 osc_release_ppga(orig, page_count_orig);
1707         RETURN(rc);
1708 }
1709
1710 static void osc_check_rpcs(struct client_obd *cli);
1711
1712 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1713  * the dirty accounting.  Writeback completes or truncate happens before
1714  * writing starts.  Must be called with the loi lock held. */
1715 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1716                            int sent)
1717 {
1718         osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1719 }
1720
1721
1722 /* This maintains the lists of pending pages to read/write for a given object
1723  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1724  * to quickly find objects that are ready to send an RPC. */
1725 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1726                          int cmd)
1727 {
1728         int optimal;
1729         ENTRY;
1730
1731         if (lop->lop_num_pending == 0)
1732                 RETURN(0);
1733
1734         /* if we have an invalid import we want to drain the queued pages
1735          * by forcing them through rpcs that immediately fail and complete
1736          * the pages.  recovery relies on this to empty the queued pages
1737          * before canceling the locks and evicting down the llite pages */
1738         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1739                 RETURN(1);
1740
1741         /* stream rpcs in queue order as long as as there is an urgent page
1742          * queued.  this is our cheap solution for good batching in the case
1743          * where writepage marks some random page in the middle of the file
1744          * as urgent because of, say, memory pressure */
1745         if (!list_empty(&lop->lop_urgent)) {
1746                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1747                 RETURN(1);
1748         }
1749         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1750         optimal = cli->cl_max_pages_per_rpc;
1751         if (cmd & OBD_BRW_WRITE) {
1752                 /* trigger a write rpc stream as long as there are dirtiers
1753                  * waiting for space.  as they're waiting, they're not going to
1754                  * create more pages to coallesce with what's waiting.. */
1755                 if (!list_empty(&cli->cl_cache_waiters)) {
1756                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1757                         RETURN(1);
1758                 }
1759                 /* +16 to avoid triggering rpcs that would want to include pages
1760                  * that are being queued but which can't be made ready until
1761                  * the queuer finishes with the page. this is a wart for
1762                  * llite::commit_write() */
1763                 optimal += 16;
1764         }
1765         if (lop->lop_num_pending >= optimal)
1766                 RETURN(1);
1767
1768         RETURN(0);
1769 }
1770
/* Make the membership of @item in @list match @should_be_on: link it at the
 * tail if it is wanted but currently unlinked, unlink it if it is linked but
 * no longer wanted, and leave it alone otherwise. */
static void on_list(struct list_head *item, struct list_head *list,
                    int should_be_on)
{
        int linked = !list_empty(item);

        if (should_be_on && !linked)
                list_add_tail(item, list);
        else if (!should_be_on && linked)
                list_del_init(item);
}
1779
1780 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1781  * can find pages to build into rpcs quickly */
1782 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1783 {
1784         on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
1785                 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
1786                 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1787
1788         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1789                 loi->loi_write_lop.lop_num_pending);
1790
1791         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1792                 loi->loi_read_lop.lop_num_pending);
1793 }
1794
1795 static void lop_update_pending(struct client_obd *cli,
1796                                struct loi_oap_pages *lop, int cmd, int delta)
1797 {
1798         lop->lop_num_pending += delta;
1799         if (cmd & OBD_BRW_WRITE)
1800                 cli->cl_pending_w_pages += delta;
1801         else
1802                 cli->cl_pending_r_pages += delta;
1803 }
1804
1805 /* this is called when a sync waiter receives an interruption.  Its job is to
1806  * get the caller woken as soon as possible.  If its page hasn't been put in an
1807  * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
1808  * desiring interruption which will forcefully complete the rpc once the rpc
1809  * has timed out */
1810 static void osc_occ_interrupted(struct oig_callback_context *occ)
1811 {
1812         struct osc_async_page *oap;
1813         struct loi_oap_pages *lop;
1814         struct lov_oinfo *loi;
1815         ENTRY;
1816
1817         /* XXX member_of() */
1818         oap = list_entry(occ, struct osc_async_page, oap_occ);
1819
1820         client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);
1821
1822         oap->oap_interrupted = 1;
1823
1824         /* ok, it's been put in an rpc. only one oap gets a request reference */
1825         if (oap->oap_request != NULL) {
1826                 ptlrpc_mark_interrupted(oap->oap_request);
1827                 ptlrpcd_wake(oap->oap_request);
1828                 GOTO(unlock, 0);
1829         }
1830
1831         /* we don't get interruption callbacks until osc_trigger_group_io()
1832          * has been called and put the sync oaps in the pending/urgent lists.*/
1833         if (!list_empty(&oap->oap_pending_item)) {
1834                 list_del_init(&oap->oap_pending_item);
1835                 list_del_init(&oap->oap_urgent_item);
1836
1837                 loi = oap->oap_loi;
1838                 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
1839                         &loi->loi_write_lop : &loi->loi_read_lop;
1840                 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
1841                 loi_list_maint(oap->oap_cli, oap->oap_loi);
1842
1843                 oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
1844                 oap->oap_oig = NULL;
1845         }
1846
1847 unlock:
1848         client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
1849 }
1850
1851 /* this is trying to propogate async writeback errors back up to the
1852  * application.  As an async write fails we record the error code for later if
1853  * the app does an fsync.  As long as errors persist we force future rpcs to be
1854  * sync so that the app can get a sync error and break the cycle of queueing
1855  * pages for which writeback will fail. */
1856 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
1857                            int rc)
1858 {
1859         if (rc) {
1860                 if (!ar->ar_rc)
1861                         ar->ar_rc = rc;
1862
1863                 ar->ar_force_sync = 1;
1864                 ar->ar_min_xid = ptlrpc_sample_next_xid();
1865                 return;
1866
1867         }
1868
1869         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
1870                 ar->ar_force_sync = 0;
1871 }
1872
1873 static void osc_oap_to_pending(struct osc_async_page *oap)
1874 {
1875         struct loi_oap_pages *lop;
1876
1877         if (oap->oap_cmd & OBD_BRW_WRITE)
1878                 lop = &oap->oap_loi->loi_write_lop;
1879         else
1880                 lop = &oap->oap_loi->loi_read_lop;
1881
1882         if (oap->oap_async_flags & ASYNC_URGENT)
1883                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1884         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1885         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
1886 }
1887
1888 /* this must be called holding the loi list lock to give coverage to exit_cache,
1889  * async_flag maintenance, and oap_request */
1890 static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
1891                               struct osc_async_page *oap, int sent, int rc)
1892 {
1893         __u64 xid = 0;
1894
1895         ENTRY;
1896         if (oap->oap_request != NULL) {
1897                 xid = ptlrpc_req_xid(oap->oap_request);
1898                 ptlrpc_req_finished(oap->oap_request);
1899                 oap->oap_request = NULL;
1900         }
1901
1902         oap->oap_async_flags = 0;
1903         oap->oap_interrupted = 0;
1904
1905         if (oap->oap_cmd & OBD_BRW_WRITE) {
1906                 osc_process_ar(&cli->cl_ar, xid, rc);
1907                 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
1908         }
1909
1910         if (rc == 0 && oa != NULL) {
1911                 if (oa->o_valid & OBD_MD_FLBLOCKS)
1912                         oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
1913                 if (oa->o_valid & OBD_MD_FLMTIME)
1914                         oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
1915                 if (oa->o_valid & OBD_MD_FLATIME)
1916                         oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
1917                 if (oa->o_valid & OBD_MD_FLCTIME)
1918                         oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
1919         }
1920
1921         if (oap->oap_oig) {
1922                 osc_exit_cache(cli, oap, sent);
1923                 oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
1924                 oap->oap_oig = NULL;
1925                 EXIT;
1926                 return;
1927         }
1928
1929         rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
1930                                                 oap->oap_cmd, oa, rc);
1931
1932         /* ll_ap_completion (from llite) drops PG_locked. so, a new
1933          * I/O on the page could start, but OSC calls it under lock
1934          * and thus we can add oap back to pending safely */
1935         if (rc)
1936                 /* upper layer wants to leave the page on pending queue */
1937                 osc_oap_to_pending(oap);
1938         else
1939                 osc_exit_cache(cli, oap, sent);
1940         EXIT;
1941 }
1942
1943 static int brw_interpret_oap(struct ptlrpc_request *req, void *data, int rc)
1944 {
1945         struct osc_async_page *oap, *tmp;
1946         struct osc_brw_async_args *aa = data;
1947         struct client_obd *cli;
1948         ENTRY;
1949
1950         rc = osc_brw_fini_request(req, rc);
1951         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1952         if (osc_recoverable_error(rc)) {
1953                 rc = osc_brw_redo_request(req, aa);
1954                 if (rc == 0)
1955                         RETURN(0);
1956         }
1957
1958         cli = aa->aa_cli;
1959
1960         client_obd_list_lock(&cli->cl_loi_list_lock);
1961
1962         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1963          * is called so we know whether to go to sync BRWs or wait for more
1964          * RPCs to complete */
1965         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1966                 cli->cl_w_in_flight--;
1967         else
1968                 cli->cl_r_in_flight--;
1969
1970         /* the caller may re-use the oap after the completion call so
1971          * we need to clean it up a little */
1972         list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
1973                 list_del_init(&oap->oap_rpc_item);
1974                 osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
1975         }
1976
1977         osc_wake_cache_waiters(cli);
1978         osc_check_rpcs(cli);
1979
1980         client_obd_list_unlock(&cli->cl_loi_list_lock);
1981
1982         OBDO_FREE(aa->aa_oa);
1983         
1984         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1985         RETURN(rc);
1986 }
1987
1988 static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
1989                                             struct list_head *rpc_list,
1990                                             int page_count, int cmd)
1991 {
1992         struct ptlrpc_request *req;
1993         struct brw_page **pga = NULL;
1994         struct osc_brw_async_args *aa;
1995         struct obdo *oa = NULL;
1996         struct obd_async_page_ops *ops = NULL;
1997         void *caller_data = NULL;
1998         struct obd_capa *ocapa;
1999         struct osc_async_page *oap;
2000         int i, rc;
2001
2002         ENTRY;
2003         LASSERT(!list_empty(rpc_list));
2004
2005         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2006         if (pga == NULL)
2007                 RETURN(ERR_PTR(-ENOMEM));
2008
2009         OBDO_ALLOC(oa);
2010         if (oa == NULL)
2011                 GOTO(out, req = ERR_PTR(-ENOMEM));
2012
2013         i = 0;
2014         list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2015                 if (ops == NULL) {
2016                         ops = oap->oap_caller_ops;
2017                         caller_data = oap->oap_caller_data;
2018                 }
2019                 pga[i] = &oap->oap_brw_page;
2020                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2021                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2022                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2023                 i++;
2024         }
2025
2026         /* always get the data for the obdo for the rpc */
2027         LASSERT(ops != NULL);
2028         ops->ap_fill_obdo(caller_data, cmd, oa);
2029         ocapa = ops->ap_lookup_capa(caller_data, cmd);
2030
2031         sort_brw_pages(pga, page_count);
2032         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2033                                   pga, &req, ocapa);
2034         capa_put(ocapa);
2035         if (rc != 0) {
2036                 CERROR("prep_req failed: %d\n", rc);
2037                 GOTO(out, req = ERR_PTR(rc));
2038         }
2039
2040         /* Need to update the timestamps after the request is built in case
2041          * we race with setattr (locally or in queue at OST).  If OST gets
2042          * later setattr before earlier BRW (as determined by the request xid),
2043          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2044          * way to do this in a single call.  bug 10150 */
2045         ops->ap_update_obdo(caller_data, cmd, oa,
2046                             OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);
2047
2048         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2049         aa = (struct osc_brw_async_args *)&req->rq_async_args;
2050         INIT_LIST_HEAD(&aa->aa_oaps);
2051         list_splice(rpc_list, &aa->aa_oaps);
2052         INIT_LIST_HEAD(rpc_list);
2053
2054 out:
2055         if (IS_ERR(req)) {
2056                 if (oa)
2057                         OBDO_FREE(oa);
2058                 if (pga)
2059                         OBD_FREE(pga, sizeof(*pga) * page_count);
2060         }
2061         RETURN(req);
2062 }
2063
2064 /* the loi lock is held across this function but it's allowed to release
2065  * and reacquire it during its work */
2066 static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
2067                             int cmd, struct loi_oap_pages *lop)
2068 {
2069         struct ptlrpc_request *req;
2070         obd_count page_count = 0;
2071         struct osc_async_page *oap = NULL, *tmp;
2072         struct osc_brw_async_args *aa;
2073         struct obd_async_page_ops *ops;
2074         CFS_LIST_HEAD(rpc_list);
2075         unsigned int ending_offset;
2076         unsigned  starting_offset = 0;
2077         ENTRY;
2078
2079         /* first we find the pages we're allowed to work with */
2080         list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2081                                  oap_pending_item) {
2082                 ops = oap->oap_caller_ops;
2083
2084                 LASSERT(oap->oap_magic == OAP_MAGIC);
2085
2086                 /* in llite being 'ready' equates to the page being locked
2087                  * until completion unlocks it.  commit_write submits a page
2088                  * as not ready because its unlock will happen unconditionally
2089                  * as the call returns.  if we race with commit_write giving
2090                  * us that page we dont' want to create a hole in the page
2091                  * stream, so we stop and leave the rpc to be fired by
2092                  * another dirtier or kupdated interval (the not ready page
2093                  * will still be on the dirty list).  we could call in
2094                  * at the end of ll_file_write to process the queue again. */
2095                 if (!(oap->oap_async_flags & ASYNC_READY)) {
2096                         int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
2097                         if (rc < 0)
2098                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
2099                                                 "instead of ready\n", oap,
2100                                                 oap->oap_page, rc);
2101                         switch (rc) {
2102                         case -EAGAIN:
2103                                 /* llite is telling us that the page is still
2104                                  * in commit_write and that we should try
2105                                  * and put it in an rpc again later.  we
2106                                  * break out of the loop so we don't create
2107                                  * a hole in the sequence of pages in the rpc
2108                                  * stream.*/
2109                                 oap = NULL;
2110                                 break;
2111                         case -EINTR:
2112                                 /* the io isn't needed.. tell the checks
2113                                  * below to complete the rpc with EINTR */
2114                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2115                                 oap->oap_count = -EINTR;
2116                                 break;
2117                         case 0:
2118                                 oap->oap_async_flags |= ASYNC_READY;
2119                                 break;
2120                         default:
2121                                 LASSERTF(0, "oap %p page %p returned %d "
2122                                             "from make_ready\n", oap,
2123                                             oap->oap_page, rc);
2124                                 break;
2125                         }
2126                 }
2127                 if (oap == NULL)
2128                         break;
2129                 /*
2130                  * Page submitted for IO has to be locked. Either by
2131                  * ->ap_make_ready() or by higher layers.
2132                  *
2133                  * XXX nikita: this assertion should be adjusted when lustre
2134                  * starts using PG_writeback for pages being written out.
2135                  */
2136 #if defined(__KERNEL__) && defined(__LINUX__)
2137                 LASSERT(PageLocked(oap->oap_page));
2138 #endif
2139                 /* If there is a gap at the start of this page, it can't merge
2140                  * with any previous page, so we'll hand the network a
2141                  * "fragmented" page array that it can't transfer in 1 RDMA */
2142                 if (page_count != 0 && oap->oap_page_off != 0)
2143                         break;
2144
2145                 /* take the page out of our book-keeping */
2146                 list_del_init(&oap->oap_pending_item);
2147                 lop_update_pending(cli, lop, cmd, -1);
2148                 list_del_init(&oap->oap_urgent_item);
2149
2150                 if (page_count == 0)
2151                         starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2152                                           (PTLRPC_MAX_BRW_SIZE - 1);
2153
2154                 /* ask the caller for the size of the io as the rpc leaves. */
2155                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
2156                         oap->oap_count =
2157                                 ops->ap_refresh_count(oap->oap_caller_data,cmd);
2158                 if (oap->oap_count <= 0) {
2159                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2160                                oap->oap_count);
2161                         osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
2162                         continue;
2163                 }
2164
2165                 /* now put the page back in our accounting */
2166                 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2167                 if (++page_count >= cli->cl_max_pages_per_rpc)
2168                         break;
2169
2170                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
2171                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2172                  * have the same alignment as the initial writes that allocated
2173                  * extents on the server. */
2174                 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2175                                  oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2176                 if (ending_offset == 0)
2177                         break;
2178
2179                 /* If there is a gap at the end of this page, it can't merge
2180                  * with any subsequent pages, so we'll hand the network a
2181                  * "fragmented" page array that it can't transfer in 1 RDMA */
2182                 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2183                         break;
2184         }
2185
2186         osc_wake_cache_waiters(cli);
2187
2188         if (page_count == 0)
2189                 RETURN(0);
2190
2191         loi_list_maint(cli, loi);
2192
2193         client_obd_list_unlock(&cli->cl_loi_list_lock);
2194
2195         req = osc_build_req(cli, &rpc_list, page_count, cmd);
2196         if (IS_ERR(req)) {
2197                 /* this should happen rarely and is pretty bad, it makes the
2198                  * pending list not follow the dirty order */
2199                 client_obd_list_lock(&cli->cl_loi_list_lock);
2200                 list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
2201                         list_del_init(&oap->oap_rpc_item);
2202
2203                         /* queued sync pages can be torn down while the pages
2204                          * were between the pending list and the rpc */
2205                         if (oap->oap_interrupted) {
2206                                 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2207                                 osc_ap_completion(cli, NULL, oap, 0,
2208                                                   oap->oap_count);
2209                                 continue;
2210                         }
2211                         osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
2212                 }
2213                 loi_list_maint(cli, loi);
2214                 RETURN(PTR_ERR(req));
2215         }
2216
2217         aa = (struct osc_brw_async_args *)&req->rq_async_args;
2218
2219         if (cmd == OBD_BRW_READ) {
2220                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2221                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2222                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2223                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2224                 ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
2225         } else {
2226                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2227                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2228                                  cli->cl_w_in_flight);
2229                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2230                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2231                 ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
2232         }
2233
2234         client_obd_list_lock(&cli->cl_loi_list_lock);
2235
2236         if (cmd == OBD_BRW_READ)
2237                 cli->cl_r_in_flight++;
2238         else
2239                 cli->cl_w_in_flight++;
2240
2241         /* queued sync pages can be torn down while the pages
2242          * were between the pending list and the rpc */
2243         tmp = NULL;
2244         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2245                 /* only one oap gets a request reference */
2246                 if (tmp == NULL)
2247                         tmp = oap;
2248                 if (oap->oap_interrupted && !req->rq_intr) {
2249                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2250                                oap, req);
2251                         ptlrpc_mark_interrupted(req);
2252                 }
2253         }
2254         if (tmp != NULL)
2255                 tmp->oap_request = ptlrpc_request_addref(req);
2256
2257         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2258                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2259
2260         req->rq_interpret_reply = brw_interpret_oap;
2261         ptlrpcd_add_req(req);
2262         RETURN(1);
2263 }
2264
/* Emit a D_INODE debug line describing an object's queue state: whether its
 * loi_cli_item is linked on a list, then for writes and for reads the number
 * of pending pages and whether the urgent list is non-empty.  FMT and the
 * trailing arguments are appended after that fixed prefix; at least one
 * trailing argument must be supplied. */
#define LOI_DEBUG(OBJ, FMT, args...)                                     \
        CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " FMT,           \
               !list_empty(&(OBJ)->loi_cli_item),                        \
               (OBJ)->loi_write_lop.lop_num_pending,                     \
               !list_empty(&(OBJ)->loi_write_lop.lop_urgent),            \
               (OBJ)->loi_read_lop.lop_num_pending,                      \
               !list_empty(&(OBJ)->loi_read_lop.lop_urgent),             \
               args)
2273
2274 /* This is called by osc_check_rpcs() to find which objects have pages that
2275  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2276 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2277 {
2278         ENTRY;
2279         /* first return all objects which we already know to have
2280          * pages ready to be stuffed into rpcs */
2281         if (!list_empty(&cli->cl_loi_ready_list))
2282                 RETURN(list_entry(cli->cl_loi_ready_list.next,
2283                                   struct lov_oinfo, loi_cli_item));
2284
2285         /* then if we have cache waiters, return all objects with queued
2286          * writes.  This is especially important when many small files
2287          * have filled up the cache and not been fired into rpcs because
2288          * they don't pass the nr_pending/object threshhold */
2289         if (!list_empty(&cli->cl_cache_waiters) &&
2290             !list_empty(&cli->cl_loi_write_list))
2291                 RETURN(list_entry(cli->cl_loi_write_list.next,
2292                                   struct lov_oinfo, loi_write_item));
2293
2294         /* then return all queued objects when we have an invalid import
2295          * so that they get flushed */
2296         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2297                 if (!list_empty(&cli->cl_loi_write_list))
2298                         RETURN(list_entry(cli->cl_loi_write_list.next,
2299                                           struct lov_oinfo, loi_write_item));
2300                 if (!list_empty(&cli->cl_loi_read_list))
2301                         RETURN(list_entry(cli->cl_loi_read_list.next,
2302                                           struct lov_oinfo, loi_read_item));
2303         }
2304         RETURN(NULL);
2305 }
2306
2307 /* called with the loi list lock held */
2308 static void osc_check_rpcs(struct client_obd *cli)
2309 {
2310         struct lov_oinfo *loi;
2311         int rc = 0, race_counter = 0;
2312         ENTRY;
2313
2314         while ((loi = osc_next_loi(cli)) != NULL) {
2315                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2316
2317                 if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
2318                         break;
2319
2320                 /* attempt some read/write balancing by alternating between
2321                  * reads and writes in an object.  The makes_rpc checks here
2322                  * would be redundant if we were getting read/write work items
2323                  * instead of objects.  we don't want send_oap_rpc to drain a
2324                  * partial read pending queue when we're given this object to
2325                  * do io on writes while there are cache waiters */
2326                 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2327                         rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
2328                                               &loi->loi_write_lop);
2329                         if (rc < 0)
2330                                 break;
2331                         if (rc > 0)
2332                                 race_counter = 0;
2333                         else
2334                                 race_counter++;
2335                 }
2336                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2337                         rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
2338                                               &loi->loi_read_lop);
2339                         if (rc < 0)
2340                                 break;
2341                         if (rc > 0)
2342                                 race_counter = 0;
2343                         else
2344                                 race_counter++;
2345                 }
2346
2347                 /* attempt some inter-object balancing by issueing rpcs
2348                  * for each object in turn */
2349                 if (!list_empty(&loi->loi_cli_item))
2350                         list_del_init(&loi->loi_cli_item);
2351                 if (!list_empty(&loi->loi_write_item))
2352                         list_del_init(&loi->loi_write_item);
2353                 if (!list_empty(&loi->loi_read_item))
2354                         list_del_init(&loi->loi_read_item);
2355
2356                 loi_list_maint(cli, loi);
2357
2358                 /* send_oap_rpc fails with 0 when make_ready tells it to
2359                  * back off.  llite's make_ready does this when it tries
2360                  * to lock a page queued for write that is already locked.
2361                  * we want to try sending rpcs from many objects, but we
2362                  * don't want to spin failing with 0.  */
2363                 if (race_counter == 10)
2364                         break;
2365         }
2366         EXIT;
2367 }
2368
2369 /* we're trying to queue a page in the osc so we're subject to the
2370  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2371  * If the osc's queued pages are already at that limit, then we want to sleep
2372  * until there is space in the osc's queue for us.  We also may be waiting for
2373  * write credits from the OST if there are RPCs in flight that may return some
2374  * before we fall back to sync writes.
2375  *
2376  * We need this know our allocation was granted in the presence of signals */
2377 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2378 {
2379         int rc;
2380         ENTRY;
2381         client_obd_list_lock(&cli->cl_loi_list_lock);
2382         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2383         client_obd_list_unlock(&cli->cl_loi_list_lock);
2384         RETURN(rc);
2385 };
2386
2387 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2388  * grant or cache space. */
2389 static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
2390                            struct osc_async_page *oap)
2391 {
2392         struct osc_cache_waiter ocw;
2393         struct l_wait_info lwi = { 0 };
2394
2395         ENTRY;
2396
2397         CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2398                "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2399                cli->cl_dirty_max, obd_max_dirty_pages,
2400                cli->cl_lost_grant, cli->cl_avail_grant);
2401
2402         /* force the caller to try sync io.  this can jump the list
2403          * of queued writes and create a discontiguous rpc stream */
2404         if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2405             loi->loi_ar.ar_force_sync)
2406                 RETURN(-EDQUOT);
2407
2408         /* Hopefully normal case - cache space and write credits available */
2409         if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
2410             (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
2411             (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
2412                 /* account for ourselves */
2413                 osc_consume_write_grant(cli, &oap->oap_brw_page);
2414                 RETURN(0);
2415         }
2416
2417         /* Make sure that there are write rpcs in flight to wait for.  This
2418          * is a little silly as this object may not have any pending but
2419          * other objects sure might. */
2420         if (cli->cl_w_in_flight) {
2421                 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2422                 cfs_waitq_init(&ocw.ocw_waitq);
2423                 ocw.ocw_oap = oap;
2424                 ocw.ocw_rc = 0;
2425
2426                 loi_list_maint(cli, loi);
2427                 osc_check_rpcs(cli);
2428                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2429
2430                 CDEBUG(D_CACHE, "sleeping for cache space\n");
2431                 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2432
2433                 client_obd_list_lock(&cli->cl_loi_list_lock);
2434                 if (!list_empty(&ocw.ocw_entry)) {
2435                         list_del(&ocw.ocw_entry);
2436                         RETURN(-EINTR);
2437                 }
2438                 RETURN(ocw.ocw_rc);
2439         }
2440
2441         RETURN(-EDQUOT);
2442 }
2443
2444 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2445                         struct lov_oinfo *loi, cfs_page_t *page,
2446                         obd_off offset, struct obd_async_page_ops *ops,
2447                         void *data, void **res)
2448 {
2449         struct osc_async_page *oap;
2450         ENTRY;
2451
2452         if (!page)
2453                 return size_round(sizeof(*oap));
2454
2455         oap = *res;
2456         oap->oap_magic = OAP_MAGIC;
2457         oap->oap_cli = &exp->exp_obd->u.cli;
2458         oap->oap_loi = loi;
2459
2460         oap->oap_caller_ops = ops;
2461         oap->oap_caller_data = data;
2462
2463         oap->oap_page = page;
2464         oap->oap_obj_off = offset;
2465
2466         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2467         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2468         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2469
2470         oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2471
2472         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2473         RETURN(0);
2474 }
2475
2476 struct osc_async_page *oap_from_cookie(void *cookie)
2477 {
2478         struct osc_async_page *oap = cookie;
2479         if (oap->oap_magic != OAP_MAGIC)
2480                 return ERR_PTR(-EINVAL);
2481         return oap;
2482 };
2483
2484 static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2485                               struct lov_oinfo *loi, void *cookie,
2486                               int cmd, obd_off off, int count,
2487                               obd_flag brw_flags, enum async_flags async_flags)
2488 {
2489         struct client_obd *cli = &exp->exp_obd->u.cli;
2490         struct osc_async_page *oap;
2491         int rc = 0;
2492         ENTRY;
2493
2494         oap = oap_from_cookie(cookie);
2495         if (IS_ERR(oap))
2496                 RETURN(PTR_ERR(oap));
2497
2498         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2499                 RETURN(-EIO);
2500
2501         if (!list_empty(&oap->oap_pending_item) ||
2502             !list_empty(&oap->oap_urgent_item) ||
2503             !list_empty(&oap->oap_rpc_item))
2504                 RETURN(-EBUSY);
2505
2506         /* check if the file's owner/group is over quota */
2507 #ifdef HAVE_QUOTA_SUPPORT
2508         if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
2509                 struct obd_async_page_ops *ops;
2510                 struct obdo *oa;
2511
2512                 OBDO_ALLOC(oa);
2513                 if (oa == NULL)
2514                         RETURN(-ENOMEM);
2515
2516                 ops = oap->oap_caller_ops;
2517                 ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
2518                 if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
2519                     NO_QUOTA)
2520                         rc = -EDQUOT;
2521
2522                 OBDO_FREE(oa);
2523                 if (rc)
2524                         RETURN(rc);
2525         }
2526 #endif
2527
2528         if (loi == NULL)
2529                 loi = lsm->lsm_oinfo[0];
2530
2531         client_obd_list_lock(&cli->cl_loi_list_lock);
2532
2533         oap->oap_cmd = cmd;
2534         oap->oap_page_off = off;
2535         oap->oap_count = count;
2536         oap->oap_brw_flags = brw_flags;
2537         oap->oap_async_flags = async_flags;
2538
2539         if (cmd & OBD_BRW_WRITE) {
2540                 rc = osc_enter_cache(cli, loi, oap);
2541                 if (rc) {
2542                         client_obd_list_unlock(&cli->cl_loi_list_lock);
2543                         RETURN(rc);
2544                 }
2545         }
2546
2547         osc_oap_to_pending(oap);
2548         loi_list_maint(cli, loi);
2549
2550         LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2551                   cmd);
2552
2553         osc_check_rpcs(cli);
2554         client_obd_list_unlock(&cli->cl_loi_list_lock);
2555
2556         RETURN(0);
2557 }
2558
/* Non-zero when FLAG is being newly set: no bit of FLAG is present in WAS
 * and some bit of FLAG is present in NOW (aka ~was & now & flag, but this is
 * more clear :).  Every argument is parenthesised so that expressions such
 * as `A | B' can be passed; FLAG is evaluated twice, so it must not have
 * side effects. */
#define SETTING(was, now, flag) (!((was) & (flag)) && ((now) & (flag)))
2561
2562 static int osc_set_async_flags(struct obd_export *exp,
2563                                struct lov_stripe_md *lsm,
2564                                struct lov_oinfo *loi, void *cookie,
2565                                obd_flag async_flags)
2566 {
2567         struct client_obd *cli = &exp->exp_obd->u.cli;
2568         struct loi_oap_pages *lop;
2569         struct osc_async_page *oap;
2570         int rc = 0;
2571         ENTRY;
2572
2573         oap = oap_from_cookie(cookie);
2574         if (IS_ERR(oap))
2575                 RETURN(PTR_ERR(oap));
2576
2577         /*
2578          * bug 7311: OST-side locking is only supported for liblustre for now
2579          * (and liblustre never calls obd_set_async_flags(). I hope.), generic
2580          * implementation has to handle case where OST-locked page was picked
2581          * up by, e.g., ->writepage().
2582          */
2583         LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
2584         LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
2585                                      * tread here. */
2586
2587         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2588                 RETURN(-EIO);
2589
2590         if (loi == NULL)
2591                 loi = lsm->lsm_oinfo[0];
2592
2593         if (oap->oap_cmd & OBD_BRW_WRITE) {
2594                 lop = &loi->loi_write_lop;
2595         } else {
2596                 lop = &loi->loi_read_lop;
2597         }
2598
2599         client_obd_list_lock(&cli->cl_loi_list_lock);
2600
2601         if (list_empty(&oap->oap_pending_item))
2602                 GOTO(out, rc = -EINVAL);
2603
2604         if ((oap->oap_async_flags & async_flags) == async_flags)
2605                 GOTO(out, rc = 0);
2606
2607         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2608                 oap->oap_async_flags |= ASYNC_READY;
2609
2610         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
2611                 if (list_empty(&oap->oap_rpc_item)) {
2612                         list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2613                         loi_list_maint(cli, loi);
2614                 }
2615         }
2616
2617         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2618                         oap->oap_async_flags);
2619 out:
2620         osc_check_rpcs(cli);
2621         client_obd_list_unlock(&cli->cl_loi_list_lock);
2622         RETURN(rc);
2623 }
2624
2625 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2626                              struct lov_oinfo *loi,
2627                              struct obd_io_group *oig, void *cookie,
2628                              int cmd, obd_off off, int count,
2629                              obd_flag brw_flags,
2630                              obd_flag async_flags)
2631 {
2632         struct client_obd *cli = &exp->exp_obd->u.cli;
2633         struct osc_async_page *oap;
2634         struct loi_oap_pages *lop;
2635         int rc = 0;
2636         ENTRY;
2637
2638         oap = oap_from_cookie(cookie);
2639         if (IS_ERR(oap))
2640                 RETURN(PTR_ERR(oap));
2641
2642         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2643                 RETURN(-EIO);
2644
2645         if (!list_empty(&oap->oap_pending_item) ||
2646             !list_empty(&oap->oap_urgent_item) ||
2647             !list_empty(&oap->oap_rpc_item))
2648                 RETURN(-EBUSY);
2649
2650         if (loi == NULL)
2651                 loi = lsm->lsm_oinfo[0];
2652
2653         client_obd_list_lock(&cli->cl_loi_list_lock);
2654
2655         oap->oap_cmd = cmd;
2656         oap->oap_page_off = off;
2657         oap->oap_count = count;
2658         oap->oap_brw_flags = brw_flags;
2659         oap->oap_async_flags = async_flags;
2660
2661         if (cmd & OBD_BRW_WRITE)
2662                 lop = &loi->loi_write_lop;
2663         else
2664                 lop = &loi->loi_read_lop;
2665
2666         list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
2667         if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
2668                 oap->oap_oig = oig;
2669                 rc = oig_add_one(oig, &oap->oap_occ);
2670         }
2671
2672         LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
2673                   oap, oap->oap_page, rc);
2674
2675         client_obd_list_unlock(&cli->cl_loi_list_lock);
2676
2677         RETURN(rc);
2678 }
2679
2680 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2681                                  struct loi_oap_pages *lop, int cmd)
2682 {
2683         struct list_head *pos, *tmp;
2684         struct osc_async_page *oap;
2685
2686         list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2687                 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2688                 list_del(&oap->oap_pending_item);
2689                 osc_oap_to_pending(oap);
2690         }
2691         loi_list_maint(cli, loi);
2692 }
2693
2694 static int osc_trigger_group_io(struct obd_export *exp,
2695                                 struct lov_stripe_md *lsm,
2696                                 struct lov_oinfo *loi,
2697                                 struct obd_io_group *oig)
2698 {
2699         struct client_obd *cli = &exp->exp_obd->u.cli;
2700         ENTRY;
2701
2702         if (loi == NULL)
2703                 loi = lsm->lsm_oinfo[0];
2704
2705         client_obd_list_lock(&cli->cl_loi_list_lock);
2706
2707         osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
2708         osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
2709
2710         osc_check_rpcs(cli);
2711         client_obd_list_unlock(&cli->cl_loi_list_lock);
2712
2713         RETURN(0);
2714 }
2715
2716 static int osc_teardown_async_page(struct obd_export *exp,
2717                                    struct lov_stripe_md *lsm,
2718                                    struct lov_oinfo *loi, void *cookie)
2719 {
2720         struct client_obd *cli = &exp->exp_obd->u.cli;
2721         struct loi_oap_pages *lop;
2722         struct osc_async_page *oap;
2723         int rc = 0;
2724         ENTRY;
2725
2726         oap = oap_from_cookie(cookie);
2727         if (IS_ERR(oap))
2728                 RETURN(PTR_ERR(oap));
2729
2730         if (loi == NULL)
2731                 loi = lsm->lsm_oinfo[0];
2732
2733         if (oap->oap_cmd & OBD_BRW_WRITE) {
2734                 lop = &loi->loi_write_lop;
2735         } else {
2736                 lop = &loi->loi_read_lop;
2737         }
2738
2739         client_obd_list_lock(&cli->cl_loi_list_lock);
2740
2741         if (!list_empty(&oap->oap_rpc_item))
2742                 GOTO(out, rc = -EBUSY);
2743
2744         osc_exit_cache(cli, oap, 0);
2745         osc_wake_cache_waiters(cli);
2746
2747         if (!list_empty(&oap->oap_urgent_item)) {
2748                 list_del_init(&oap->oap_urgent_item);
2749                 oap->oap_async_flags &= ~ASYNC_URGENT;
2750         }
2751         if (!list_empty(&oap->oap_pending_item)) {
2752                 list_del_init(&oap->oap_pending_item);
2753                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2754         }
2755         loi_list_maint(cli, loi);
2756
2757         LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
2758 out:
2759         client_obd_list_unlock(&cli->cl_loi_list_lock);
2760         RETURN(rc);
2761 }
2762
2763 static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
2764                                     int flags)
2765 {
2766         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2767
2768         if (lock == NULL) {
2769                 CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
2770                 return;
2771         }
2772         lock_res_and_lock(lock);
2773 #if defined (__KERNEL__) && defined (__LINUX__)
2774         /* Liang XXX: Darwin and Winnt checking should be added */
2775         if (lock->l_ast_data && lock->l_ast_data != data) {
2776                 struct inode *new_inode = data;
2777                 struct inode *old_inode = lock->l_ast_data;
2778                 if (!(old_inode->i_state & I_FREEING))
2779                         LDLM_ERROR(lock, "inconsistent l_ast_data found");
2780                 LASSERTF(old_inode->i_state & I_FREEING,
2781                          "Found existing inode %p/%lu/%u state %lu in lock: "
2782                          "setting data to %p/%lu/%u\n", old_inode,
2783                          old_inode->i_ino, old_inode->i_generation,
2784                          old_inode->i_state,
2785                          new_inode, new_inode->i_ino, new_inode->i_generation);
2786         }
2787 #endif
2788         lock->l_ast_data = data;
2789         lock->l_flags |= (flags & LDLM_FL_NO_LRU);
2790         unlock_res_and_lock(lock);
2791         LDLM_LOCK_PUT(lock);
2792 }
2793
2794 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2795                              ldlm_iterator_t replace, void *data)
2796 {
2797         struct ldlm_res_id res_id = { .name = {0} };
2798         struct obd_device *obd = class_exp2obd(exp);
2799
2800         res_id.name[0] = lsm->lsm_object_id;
2801         res_id.name[2] = lsm->lsm_object_gr;
2802
2803         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2804         return 0;
2805 }
2806
2807 static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo,
2808                             int intent, int rc)
2809 {
2810         ENTRY;
2811
2812         if (intent) {
2813                 /* The request was created before ldlm_cli_enqueue call. */
2814                 if (rc == ELDLM_LOCK_ABORTED) {
2815                         struct ldlm_reply *rep;
2816
2817                         /* swabbed by ldlm_cli_enqueue() */
2818                         LASSERT(lustre_rep_swabbed(req, DLM_LOCKREPLY_OFF));
2819                         rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
2820                                              sizeof(*rep));
2821                         LASSERT(rep != NULL);
2822                         if (rep->lock_policy_res1)
2823                                 rc = rep->lock_policy_res1;
2824                 }
2825         }
2826
2827         if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
2828                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2829                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
2830                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
2831                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
2832         }
2833
2834         /* Call the update callback. */
2835         rc = oinfo->oi_cb_up(oinfo, rc);
2836         RETURN(rc);
2837 }
2838
2839 static int osc_enqueue_interpret(struct ptlrpc_request *req,
2840                                  struct osc_enqueue_args *aa, int rc)
2841 {
2842         int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
2843         struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
2844         struct ldlm_lock *lock;
2845
2846         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2847          * be valid. */
2848         lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);
2849
2850         /* Complete obtaining the lock procedure. */
2851         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2852                                    aa->oa_ei->ei_mode,
2853                                    &aa->oa_oi->oi_flags,
2854                                    &lsm->lsm_oinfo[0]->loi_lvb,
2855                                    sizeof(lsm->lsm_oinfo[0]->loi_lvb),
2856                                    lustre_swab_ost_lvb,
2857                                    aa->oa_oi->oi_lockh, rc);
2858
2859         /* Complete osc stuff. */
2860         rc = osc_enqueue_fini(req, aa->oa_oi, intent, rc);
2861
2862         /* Release the lock for async request. */
2863         if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
2864                 ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);
2865
2866         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2867                  aa->oa_oi->oi_lockh, req, aa);
2868         LDLM_LOCK_PUT(lock);
2869         return rc;
2870 }
2871
2872 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2873  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2874  * other synchronous requests, however keeping some locks and trying to obtain
2875  * others may take a considerable amount of time in a case of ost failure; and
2876  * when other sync requests do not get released lock from a client, the client
2877  * is excluded from the cluster -- such scenarious make the life difficult, so
2878  * release locks just after they are obtained. */
2879 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2880                        struct ldlm_enqueue_info *einfo,
2881                        struct ptlrpc_request_set *rqset)
2882 {
2883         struct ldlm_res_id res_id = { .name = {0} };
2884         struct obd_device *obd = exp->exp_obd;
2885         struct ldlm_reply *rep;
2886         struct ptlrpc_request *req = NULL;
2887         int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
2888         ldlm_mode_t mode;
2889         int rc;
2890         ENTRY;
2891
2892         res_id.name[0] = oinfo->oi_md->lsm_object_id;
2893         res_id.name[2] = oinfo->oi_md->lsm_object_gr;
2894
2895         /* Filesystem lock extents are extended to page boundaries so that
2896          * dealing with the page cache is a little smoother.  */
2897         oinfo->oi_policy.l_extent.start -=
2898                 oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
2899         oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;
2900
2901         if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
2902                 goto no_match;
2903
2904         /* Next, search for already existing extent locks that will cover us */
2905         /* If we're trying to read, we also search for an existing PW lock.  The
2906          * VFS and page cache already protect us locally, so lots of readers/
2907          * writers can share a single PW lock.
2908          *
2909          * There are problems with conversion deadlocks, so instead of
2910          * converting a read lock to a write lock, we'll just enqueue a new
2911          * one.
2912          *
2913          * At some point we should cancel the read lock instead of making them
2914          * send us a blocking callback, but there are problems with canceling
2915          * locks out from other users right now, too. */
2916         mode = einfo->ei_mode;
2917         if (einfo->ei_mode == LCK_PR)
2918                 mode |= LCK_PW;
2919         mode = ldlm_lock_match(obd->obd_namespace,
2920                                oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
2921                                einfo->ei_type, &oinfo->oi_policy, mode,
2922                                oinfo->oi_lockh);
2923         if (mode) {
2924                 /* addref the lock only if not async requests and PW lock is
2925                  * matched whereas we asked for PR. */
2926                 if (!rqset && einfo->ei_mode != mode)
2927                         ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
2928                 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
2929                                         oinfo->oi_flags);
2930                 if (intent) {
2931                         /* I would like to be able to ASSERT here that rss <=
2932                          * kms, but I can't, for reasons which are explained in
2933                          * lov_enqueue() */
2934                 }
2935
2936                 /* We already have a lock, and it's referenced */
2937                 oinfo->oi_cb_up(oinfo, ELDLM_OK);
2938
2939                 /* For async requests, decref the lock. */
2940                 if (einfo->ei_mode != mode)
2941                         ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
2942                 else if (rqset)
2943                         ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
2944
2945                 RETURN(ELDLM_OK);
2946         }
2947
2948  no_match:
2949         if (intent) {
2950                 int size[3] = {
2951                         [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
2952                         [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request),
2953                         [DLM_LOCKREQ_OFF + 1] = 0 };
2954
2955                 req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
2956                 if (req == NULL)
2957                         RETURN(-ENOMEM);
2958
2959                 size[DLM_LOCKREPLY_OFF] = sizeof(*rep);
2960                 size[DLM_REPLY_REC_OFF] =
2961                         sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb);
2962                 ptlrpc_req_set_repsize(req, 3, size);
2963         }
2964
2965         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2966         oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;
2967
2968         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id,
2969                               &oinfo->oi_policy, &oinfo->oi_flags,
2970                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2971                               sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
2972                               lustre_swab_ost_lvb, oinfo->oi_lockh,
2973                               rqset ? 1 : 0);
2974         if (rqset) {
2975                 if (!rc) {
2976                         struct osc_enqueue_args *aa;
2977                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2978                         aa = (struct osc_enqueue_args *)&req->rq_async_args;
2979                         aa->oa_oi = oinfo;
2980                         aa->oa_ei = einfo;
2981                         aa->oa_exp = exp;
2982
2983                         req->rq_interpret_reply = osc_enqueue_interpret;
2984                         ptlrpc_set_add_req(rqset, req);
2985                 } else if (intent) {
2986                         ptlrpc_req_finished(req);
2987                 }
2988                 RETURN(rc);
2989         }
2990
2991         rc = osc_enqueue_fini(req, oinfo, intent, rc);
2992         if (intent)
2993                 ptlrpc_req_finished(req);
2994
2995         RETURN(rc);
2996 }
2997
2998 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
2999                      __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3000                      int *flags, void *data, struct lustre_handle *lockh)
3001 {
3002         struct ldlm_res_id res_id = { .name = {0} };
3003         struct obd_device *obd = exp->exp_obd;
3004         int lflags = *flags;
3005         ldlm_mode_t rc;
3006         ENTRY;
3007
3008         res_id.name[0] = lsm->lsm_object_id;
3009         res_id.name[2] = lsm->lsm_object_gr;
3010
3011         OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);
3012
3013         /* Filesystem lock extents are extended to page boundaries so that
3014          * dealing with the page cache is a little smoother */
3015         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3016         policy->l_extent.end |= ~CFS_PAGE_MASK;
3017
3018         /* Next, search for already existing extent locks that will cover us */
3019         /* If we're trying to read, we also search for an existing PW lock.  The
3020          * VFS and page cache already protect us locally, so lots of readers/
3021          * writers can share a single PW lock. */
3022         rc = mode;
3023         if (mode == LCK_PR)
3024                 rc |= LCK_PW;
3025         rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3026                              &res_id, type, policy, rc, lockh);
3027         if (rc) {
3028                 osc_set_data_with_check(lockh, data, lflags);
3029                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3030                         ldlm_lock_addref(lockh, LCK_PR);
3031                         ldlm_lock_decref(lockh, LCK_PW);
3032                 }
3033                 RETURN(rc);
3034         }
3035         RETURN(rc);
3036 }
3037
3038 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3039                       __u32 mode, struct lustre_handle *lockh)
3040 {
3041         ENTRY;
3042
3043         if (unlikely(mode == LCK_GROUP))
3044                 ldlm_lock_decref_and_cancel(lockh, mode);
3045         else
3046                 ldlm_lock_decref(lockh, mode);
3047
3048         RETURN(0);
3049 }
3050
3051 static int osc_cancel_unused(struct obd_export *exp,
3052                              struct lov_stripe_md *lsm, int flags,
3053                              void *opaque)
3054 {
3055         struct obd_device *obd = class_exp2obd(exp);
3056         struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3057
3058         if (lsm != NULL) {
3059                 res_id.name[0] = lsm->lsm_object_id;
3060                 res_id.name[2] = lsm->lsm_object_gr;
3061                 resp = &res_id;
3062         }
3063
3064         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3065 }
3066
3067 static int osc_join_lru(struct obd_export *exp,
3068                         struct lov_stripe_md *lsm, int join)
3069 {
3070         struct obd_device *obd = class_exp2obd(exp);
3071         struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3072
3073         if (lsm != NULL) {
3074                 res_id.name[0] = lsm->lsm_object_id;
3075                 res_id.name[2] = lsm->lsm_object_gr;
3076                 resp = &res_id;
3077         }
3078
3079         return ldlm_cli_join_lru(obd->obd_namespace, resp, join);
3080 }
3081
3082 static int osc_statfs_interpret(struct ptlrpc_request *req,
3083                                 struct osc_async_args *aa, int rc)
3084 {
3085         struct obd_statfs *msfs;
3086         ENTRY;
3087
3088         if (rc != 0)
3089                 GOTO(out, rc);
3090
3091         msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3092                                   lustre_swab_obd_statfs);
3093         if (msfs == NULL) {
3094                 CERROR("Can't unpack obd_statfs\n");
3095                 GOTO(out, rc = -EPROTO);
3096         }
3097
3098         memcpy(aa->aa_oi->oi_osfs, msfs, sizeof(*msfs));
3099 out:
3100         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3101         RETURN(rc);
3102 }
3103
3104 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3105                             __u64 max_age, struct ptlrpc_request_set *rqset)
3106 {
3107         struct ptlrpc_request *req;
3108         struct osc_async_args *aa;
3109         int size[2] = { sizeof(struct ptlrpc_body), sizeof(*oinfo->oi_osfs) };
3110         ENTRY;
3111
3112         /* We could possibly pass max_age in the request (as an absolute
3113          * timestamp or a "seconds.usec ago") so the target can avoid doing
3114          * extra calls into the filesystem if that isn't necessary (e.g.
3115          * during mount that would help a bit).  Having relative timestamps
3116          * is not so great if request processing is slow, while absolute
3117          * timestamps are not ideal because they need time synchronization. */
3118         req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
3119                               OST_STATFS, 1, NULL, NULL);
3120         if (!req)
3121                 RETURN(-ENOMEM);
3122
3123         ptlrpc_req_set_repsize(req, 2, size);
3124         req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3125
3126         req->rq_interpret_reply = osc_statfs_interpret;
3127         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3128         aa = (struct osc_async_args *)&req->rq_async_args;
3129         aa->aa_oi = oinfo;
3130
3131         ptlrpc_set_add_req(rqset, req);
3132         RETURN(0);
3133 }
3134
3135 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3136                       __u64 max_age)
3137 {
3138         struct obd_statfs *msfs;
3139         struct ptlrpc_request *req;
3140         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
3141         ENTRY;
3142
3143         /* We could possibly pass max_age in the request (as an absolute
3144          * timestamp or a "seconds.usec ago") so the target can avoid doing
3145          * extra calls into the filesystem if that isn't necessary (e.g.
3146          * during mount that would help a bit).  Having relative timestamps
3147          * is not so great if request processing is slow, while absolute
3148          * timestamps are not ideal because they need time synchronization. */
3149         req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
3150                               OST_STATFS, 1, NULL, NULL);
3151         if (!req)
3152                 RETURN(-ENOMEM);
3153
3154         ptlrpc_req_set_repsize(req, 2, size);
3155         req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3156
3157         rc = ptlrpc_queue_wait(req);
3158         if (rc)
3159                 GOTO(out, rc);
3160
3161         msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3162                                   lustre_swab_obd_statfs);
3163         if (msfs == NULL) {
3164                 CERROR("Can't unpack obd_statfs\n");
3165                 GOTO(out, rc = -EPROTO);
3166         }
3167
3168         memcpy(osfs, msfs, sizeof(*osfs));
3169
3170         EXIT;
3171  out:
3172         ptlrpc_req_finished(req);
3173         return rc;
3174 }
3175
3176 /* Retrieve object striping information.
3177  *
3178  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
3179  * the maximum number of OST indices which will fit in the user buffer.
3180  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
3181  */
3182 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3183 {
3184         struct lov_user_md lum, *lumk;
3185         int rc = 0, lum_size;
3186         ENTRY;
3187
3188         if (!lsm)
3189                 RETURN(-ENODATA);
3190
3191         if (copy_from_user(&lum, lump, sizeof(lum)))
3192                 RETURN(-EFAULT);
3193
3194         if (lum.lmm_magic != LOV_USER_MAGIC)
3195                 RETURN(-EINVAL);
3196
3197         if (lum.lmm_stripe_count > 0) {
3198                 lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
3199                 OBD_ALLOC(lumk, lum_size);
3200                 if (!lumk)
3201                         RETURN(-ENOMEM);
3202
3203                 lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
3204                 lumk->lmm_objects[0].l_object_gr = lsm->lsm_object_gr;
3205         } else {
3206                 lum_size = sizeof(lum);
3207                 lumk = &lum;
3208         }
3209
3210         lumk->lmm_object_id = lsm->lsm_object_id;
3211         lumk->lmm_object_gr = lsm->lsm_object_gr;
3212         lumk->lmm_stripe_count = 1;
3213
3214         if (copy_to_user(lump, lumk, lum_size))
3215                 rc = -EFAULT;
3216
3217         if (lumk != &lum)
3218                 OBD_FREE(lumk, lum_size);
3219
3220         RETURN(rc);
3221 }
3222
3223
3224 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3225                          void *karg, void *uarg)
3226 {
3227         struct obd_device *obd = exp->exp_obd;
3228         struct obd_ioctl_data *data = karg;
3229         int err = 0;
3230         ENTRY;
3231
3232         if (!try_module_get(THIS_MODULE)) {
3233                 CERROR("Can't get module. Is it alive?");
3234                 return -EINVAL;
3235         }
3236         switch (cmd) {
3237         case OBD_IOC_LOV_GET_CONFIG: {
3238                 char *buf;
3239                 struct lov_desc *desc;
3240                 struct obd_uuid uuid;
3241
3242                 buf = NULL;
3243                 len = 0;
3244                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3245                         GOTO(out, err = -EINVAL);
3246
3247                 data = (struct obd_ioctl_data *)buf;
3248
3249                 if (sizeof(*desc) > data->ioc_inllen1) {
3250                         obd_ioctl_freedata(buf, len);
3251                         GOTO(out, err = -EINVAL);
3252                 }
3253
3254                 if (data->ioc_inllen2 < sizeof(uuid)) {
3255                         obd_ioctl_freedata(buf, len);
3256                         GOTO(out, err = -EINVAL);
3257                 }
3258
3259                 desc = (struct lov_desc *)data->ioc_inlbuf1;
3260                 desc->ld_tgt_count = 1;
3261                 desc->ld_active_tgt_count = 1;
3262                 desc->ld_default_stripe_count = 1;
3263                 desc->ld_default_stripe_size = 0;
3264                 desc->ld_default_stripe_offset = 0;
3265                 desc->ld_pattern = 0;
3266                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3267
3268                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3269
3270                 err = copy_to_user((void *)uarg, buf, len);
3271                 if (err)
3272                         err = -EFAULT;
3273                 obd_ioctl_freedata(buf, len);
3274                 GOTO(out, err);
3275         }
3276         case LL_IOC_LOV_SETSTRIPE:
3277                 err = obd_alloc_memmd(exp, karg);
3278                 if (err > 0)
3279                         err = 0;
3280                 GOTO(out, err);
3281         case LL_IOC_LOV_GETSTRIPE:
3282                 err = osc_getstripe(karg, uarg);
3283                 GOTO(out, err);
3284         case OBD_IOC_CLIENT_RECOVER:
3285                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3286                                             data->ioc_inlbuf1);
3287                 if (err > 0)
3288                         err = 0;
3289                 GOTO(out, err);
3290         case IOC_OSC_SET_ACTIVE:
3291                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3292                                                data->ioc_offset);
3293                 GOTO(out, err);
3294         case OBD_IOC_POLL_QUOTACHECK:
3295                 err = lquota_poll_check(quota_interface, exp,
3296                                         (struct if_quotacheck *)karg);
3297                 GOTO(out, err);
3298         default:
3299                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3300                        cmd, cfs_curproc_comm());
3301                 GOTO(out, err = -ENOTTY);
3302         }
3303 out:
3304         module_put(THIS_MODULE);
3305         return err;
3306 }
3307
3308 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3309                         void *key, __u32 *vallen, void *val)
3310 {
3311         ENTRY;
3312         if (!vallen || !val)
3313                 RETURN(-EFAULT);
3314
3315         if (KEY_IS("lock_to_stripe")) {
3316                 __u32 *stripe = val;
3317                 *vallen = sizeof(*stripe);
3318                 *stripe = 0;
3319                 RETURN(0);
3320         } else if (KEY_IS("last_id")) {
3321                 struct ptlrpc_request *req;
3322                 obd_id *reply;
3323                 char *bufs[2] = { NULL, key };
3324                 int rc, size[2] = { sizeof(struct ptlrpc_body), keylen };
3325
3326                 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
3327                                       OST_GET_INFO, 2, size, bufs);
3328                 if (req == NULL)
3329                         RETURN(-ENOMEM);
3330
3331                 size[REPLY_REC_OFF] = *vallen;
3332                 ptlrpc_req_set_repsize(req, 2, size);
3333                 rc = ptlrpc_queue_wait(req);
3334                 if (rc)
3335                         GOTO(out, rc);
3336
3337                 reply = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*reply),
3338                                            lustre_swab_ost_last_id);
3339                 if (reply == NULL) {
3340                         CERROR("Can't unpack OST last ID\n");
3341                         GOTO(out, rc = -EPROTO);
3342                 }
3343                 *((obd_id *)val) = *reply;
3344         out:
3345                 ptlrpc_req_finished(req);
3346                 RETURN(rc);
3347         }
3348         RETURN(-EINVAL);
3349 }
3350
3351 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3352                                           void *aa, int rc)
3353 {
3354         struct llog_ctxt *ctxt;
3355         struct obd_import *imp = req->rq_import;
3356         ENTRY;
3357
3358         if (rc != 0)
3359                 RETURN(rc);
3360
3361         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3362         if (ctxt) {
3363                 if (rc == 0)
3364                         rc = llog_initiator_connect(ctxt);
3365                 else
3366                         CERROR("cannot establish connection for "
3367                                "ctxt %p: %d\n", ctxt, rc);
3368         }
3369
3370         spin_lock(&imp->imp_lock);
3371         imp->imp_server_timeout = 1;
3372         imp->imp_pingable = 1;
3373         spin_unlock(&imp->imp_lock);
3374         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3375
3376         RETURN(rc);
3377 }
3378
3379 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3380                               void *key, obd_count vallen, void *val,
3381                               struct ptlrpc_request_set *set)
3382 {
3383         struct ptlrpc_request *req;
3384         struct obd_device  *obd = exp->exp_obd;
3385         struct obd_import *imp = class_exp2cliimp(exp);
3386         int size[3] = { sizeof(struct ptlrpc_body), keylen, vallen };
3387         char *bufs[3] = { NULL, key, val };
3388         ENTRY;
3389
3390         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3391
3392         if (KEY_IS(KEY_NEXT_ID)) {
3393                 if (vallen != sizeof(obd_id))
3394                         RETURN(-EINVAL);
3395                 obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
3396                 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3397                        exp->exp_obd->obd_name,
3398                        obd->u.cli.cl_oscc.oscc_next_id);
3399
3400                 RETURN(0);
3401         }
3402
3403         if (KEY_IS("unlinked")) {
3404                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3405                 spin_lock(&oscc->oscc_lock);
3406                 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3407                 spin_unlock(&oscc->oscc_lock);
3408                 RETURN(0);
3409         }
3410
3411         if (KEY_IS(KEY_INIT_RECOV)) {
3412                 if (vallen != sizeof(int))
3413                         RETURN(-EINVAL);
3414                 spin_lock(&imp->imp_lock);
3415                 imp->imp_initial_recov = *(int *)val;
3416                 spin_unlock(&imp->imp_lock);
3417                 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3418                        exp->exp_obd->obd_name,
3419                        imp->imp_initial_recov);
3420                 RETURN(0);
3421         }
3422
3423         if (KEY_IS("checksum")) {
3424                 if (vallen != sizeof(int))
3425                         RETURN(-EINVAL);
3426                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3427                 RETURN(0);
3428         }
3429
3430         if (KEY_IS(KEY_FLUSH_CTX)) {
3431                 sptlrpc_import_flush_my_ctx(imp);
3432                 RETURN(0);
3433         }
3434
3435         if (!set)
3436                 RETURN(-EINVAL);
3437
3438         /* We pass all other commands directly to OST. Since nobody calls osc
3439            methods directly and everybody is supposed to go through LOV, we
3440            assume lov checked invalid values for us.
3441            The only recognised values so far are evict_by_nid and mds_conn.
3442            Even if something bad goes through, we'd get a -EINVAL from OST
3443            anyway. */
3444
3445         req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_SET_INFO, 3, size,
3446                               bufs);
3447         if (req == NULL)
3448                 RETURN(-ENOMEM);
3449
3450         if (KEY_IS(KEY_MDS_CONN)) {
3451                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3452
3453                 oscc->oscc_oa.o_gr = (*(__u32 *)val);
3454                 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
3455                 LASSERT(oscc->oscc_oa.o_gr > 0);
3456                 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3457         }
3458
3459         ptlrpc_req_set_repsize(req, 1, NULL);
3460         ptlrpc_set_add_req(set, req);
3461         ptlrpc_check_set(set);
3462
3463         RETURN(0);
3464 }
3465
3466
3467 static struct llog_operations osc_size_repl_logops = {
3468         lop_cancel: llog_obd_repl_cancel
3469 };
3470
3471 static struct llog_operations osc_mds_ost_orig_logops;
3472 static int osc_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
3473                          struct obd_device *tgt, int count,
3474                          struct llog_catid *catid, struct obd_uuid *uuid)
3475 {
3476         int rc;
3477         ENTRY;
3478
3479         spin_lock(&obd->obd_dev_lock);
3480         if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
3481                 osc_mds_ost_orig_logops = llog_lvfs_ops;
3482                 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3483                 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3484                 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3485                 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
3486         }
3487         spin_unlock(&obd->obd_dev_lock);
3488
3489         rc = llog_setup(obd, llogs, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
3490                         &catid->lci_logid, &osc_mds_ost_orig_logops);
3491         if (rc) {
3492                 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
3493                 GOTO (out, rc);
3494         }
3495
3496         rc = llog_setup(obd, llogs, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
3497                         &osc_size_repl_logops);
3498         if (rc)
3499                 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
3500 out:
3501         if (rc) {
3502                 CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
3503                        obd->obd_name, tgt->obd_name, count, catid, rc);
3504                 CERROR("logid "LPX64":0x%x\n",
3505                        catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
3506         }
3507         RETURN(rc);
3508 }
3509
3510 static int osc_llog_finish(struct obd_device *obd, int count)
3511 {
3512         struct llog_ctxt *ctxt;
3513         int rc = 0, rc2 = 0;
3514         ENTRY;
3515
3516         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3517         if (ctxt)
3518                 rc = llog_cleanup(ctxt);
3519
3520         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3521         if (ctxt)
3522                 rc2 = llog_cleanup(ctxt);
3523         if (!rc)
3524                 rc = rc2;
3525
3526         RETURN(rc);
3527 }
3528
3529 static int osc_reconnect(struct obd_export *exp, struct obd_device *obd,
3530                          struct obd_uuid *cluuid,
3531                          struct obd_connect_data *data)
3532 {
3533         struct client_obd *cli = &obd->u.cli;
3534
3535         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3536                 long lost_grant;
3537
3538                 client_obd_list_lock(&cli->cl_loi_list_lock);
3539                 data->ocd_grant = cli->cl_avail_grant ?:
3540                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3541                 lost_grant = cli->cl_lost_grant;
3542                 cli->cl_lost_grant = 0;
3543                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3544
3545                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3546                        "cl_lost_grant: %ld\n", data->ocd_grant,
3547                        cli->cl_avail_grant, lost_grant);
3548                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3549                        " ocd_grant: %d\n", data->ocd_connect_flags,
3550                        data->ocd_version, data->ocd_grant);
3551         }
3552
3553         RETURN(0);
3554 }
3555
3556 static int osc_disconnect(struct obd_export *exp)
3557 {
3558         struct obd_device *obd = class_exp2obd(exp);
3559         struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3560         int rc;
3561
3562         if (obd->u.cli.cl_conn_count == 1)
3563                 /* flush any remaining cancel messages out to the target */
3564                 llog_sync(ctxt, exp);
3565
3566         rc = client_disconnect_export(exp);
3567         return rc;
3568 }
3569
3570 static int osc_import_event(struct obd_device *obd,
3571                             struct obd_import *imp,
3572                             enum obd_import_event event)
3573 {
3574         struct client_obd *cli;
3575         int rc = 0;
3576
3577         ENTRY;
3578         LASSERT(imp->imp_obd == obd);
3579
3580         switch (event) {
3581         case IMP_EVENT_DISCON: {
3582                 /* Only do this on the MDS OSC's */
3583                 if (imp->imp_server_timeout) {
3584                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3585
3586                         spin_lock(&oscc->oscc_lock);
3587                         oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
3588                         spin_unlock(&oscc->oscc_lock);
3589                 }
3590                 cli = &obd->u.cli;
3591                 client_obd_list_lock(&cli->cl_loi_list_lock);
3592                 cli->cl_avail_grant = 0;
3593                 cli->cl_lost_grant = 0;
3594                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3595                 break;
3596         }
3597         case IMP_EVENT_INACTIVE: {
3598                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3599                 break;
3600         }
3601         case IMP_EVENT_INVALIDATE: {
3602                 struct ldlm_namespace *ns = obd->obd_namespace;
3603
3604                 /* Reset grants */
3605                 cli = &obd->u.cli;
3606                 client_obd_list_lock(&cli->cl_loi_list_lock);
3607                 /* all pages go to failing rpcs due to the invalid import */
3608                 osc_check_rpcs(cli);
3609                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3610
3611                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3612
3613                 break;
3614         }
3615         case IMP_EVENT_ACTIVE: {
3616                 /* Only do this on the MDS OSC's */
3617                 if (imp->imp_server_timeout) {
3618                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3619
3620                         spin_lock(&oscc->oscc_lock);
3621                         oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3622                         spin_unlock(&oscc->oscc_lock);
3623                 }
3624                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3625                 break;
3626         }
3627         case IMP_EVENT_OCD: {
3628                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3629
3630                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3631                         osc_init_grant(&obd->u.cli, ocd);
3632
3633                 /* See bug 7198 */
3634                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3635                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3636
3637                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3638                 break;
3639         }
3640         default:
3641                 CERROR("Unknown import event %d\n", event);
3642                 LBUG();
3643         }
3644         RETURN(rc);
3645 }
3646
3647 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3648 {
3649         int rc;
3650         ENTRY;
3651
3652         ENTRY;
3653         rc = ptlrpcd_addref();
3654         if (rc)
3655                 RETURN(rc);
3656
3657         rc = client_obd_setup(obd, lcfg);
3658         if (rc) {
3659                 ptlrpcd_decref();
3660         } else {
3661                 struct lprocfs_static_vars lvars;
3662                 struct client_obd *cli = &obd->u.cli;
3663
3664                 lprocfs_init_vars(osc, &lvars);
3665                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3666                         lproc_osc_attach_seqstat(obd);
3667                         ptlrpc_lprocfs_register_obd(obd);
3668                 }
3669
3670                 oscc_init(obd);
3671                 /* We need to allocate a few requests more, because
3672                    brw_interpret_oap tries to create new requests before freeing
3673                    previous ones. Ideally we want to have 2x max_rpcs_in_flight
3674                    reserved, but I afraid that might be too much wasted RAM
3675                    in fact, so 2 is just my guess and still should work. */
3676                 cli->cl_import->imp_rq_pool =
3677                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3678                                             OST_MAXREQSIZE,
3679                                             ptlrpc_add_rqs_to_pool);
3680         }
3681
3682         RETURN(rc);
3683 }
3684
3685 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3686 {
3687         int rc = 0;
3688         ENTRY;
3689
3690         switch (stage) {
3691         case OBD_CLEANUP_EARLY: {
3692                 struct obd_import *imp;
3693                 imp = obd->u.cli.cl_import;
3694                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3695                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3696                 ptlrpc_deactivate_import(imp);
3697                 spin_lock(&imp->imp_lock);
3698                 imp->imp_pingable = 0;
3699                 spin_unlock(&imp->imp_lock);
3700                 break;
3701         }
3702         case OBD_CLEANUP_EXPORTS: {
3703                 /* If we set up but never connected, the
3704                    client import will not have been cleaned. */
3705                 if (obd->u.cli.cl_import) {
3706                         struct obd_import *imp;
3707                         imp = obd->u.cli.cl_import;
3708                         CDEBUG(D_CONFIG, "%s: client import never connected\n",
3709                                obd->obd_name);
3710                         ptlrpc_invalidate_import(imp);
3711                         ptlrpc_free_rq_pool(imp->imp_rq_pool);
3712                         class_destroy_import(imp);
3713                         obd->u.cli.cl_import = NULL;
3714                 }
3715                 break;
3716         }
3717         case OBD_CLEANUP_SELF_EXP:
3718                 rc = obd_llog_finish(obd, 0);
3719                 if (rc != 0)
3720                         CERROR("failed to cleanup llogging subsystems\n");
3721                 break;
3722         case OBD_CLEANUP_OBD:
3723                 break;
3724         }
3725         RETURN(rc);
3726 }
3727
3728 int osc_cleanup(struct obd_device *obd)
3729 {
3730         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3731         int rc;
3732
3733         ENTRY;
3734         ptlrpc_lprocfs_unregister_obd(obd);
3735         lprocfs_obd_cleanup(obd);
3736
3737         spin_lock(&oscc->oscc_lock);
3738         oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
3739         oscc->oscc_flags |= OSCC_FLAG_EXITING;
3740         spin_unlock(&oscc->oscc_lock);
3741
3742         /* free memory of osc quota cache */
3743         lquota_cleanup(quota_interface, obd);
3744
3745         rc = client_obd_cleanup(obd);
3746
3747         ptlrpcd_decref();
3748         RETURN(rc);
3749 }
3750
3751 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3752 {
3753         struct lustre_cfg *lcfg = buf;
3754         struct lprocfs_static_vars lvars;
3755         int rc = 0;
3756
3757         lprocfs_init_vars(osc, &lvars);
3758
3759         rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars, lcfg, obd);
3760         return(rc);
3761 }
3762
/* OBD method table for the OSC device type; registered with the class
 * layer by osc_init().  Connection add/delete/connect use the generic
 * client_* helpers, everything else is implemented in this file. */
struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup,
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        .o_statfs               = osc_statfs,
        .o_statfs_async         = osc_statfs_async,
        .o_packmd               = osc_packmd,
        .o_unpackmd             = osc_unpackmd,
        .o_precreate            = osc_precreate,
        .o_create               = osc_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_getattr_async        = osc_getattr_async,
        .o_setattr              = osc_setattr,
        .o_setattr_async        = osc_setattr_async,
        .o_brw                  = osc_brw,
        .o_brw_async            = osc_brw_async,
        .o_prep_async_page      = osc_prep_async_page,
        .o_queue_async_io       = osc_queue_async_io,
        .o_set_async_flags      = osc_set_async_flags,
        .o_queue_group_io       = osc_queue_group_io,
        .o_trigger_group_io     = osc_trigger_group_io,
        .o_teardown_async_page  = osc_teardown_async_page,
        .o_punch                = osc_punch,
        .o_sync                 = osc_sync,
        .o_enqueue              = osc_enqueue,
        .o_match                = osc_match,
        .o_change_cbdata        = osc_change_cbdata,
        .o_cancel               = osc_cancel,
        .o_cancel_unused        = osc_cancel_unused,
        .o_join_lru             = osc_join_lru,
        .o_iocontrol            = osc_iocontrol,
        .o_get_info             = osc_get_info,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_llog_init            = osc_llog_init,
        .o_llog_finish          = osc_llog_finish,
        .o_process_config       = osc_process_config,
};
3808 int __init osc_init(void)
3809 {
3810         struct lprocfs_static_vars lvars;
3811         int rc;
3812         ENTRY;
3813
3814         lprocfs_init_vars(osc, &lvars);
3815
3816         request_module("lquota");
3817         quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
3818         lquota_init(quota_interface);
3819         init_obd_quota_ops(quota_interface, &osc_obd_ops);
3820
3821         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3822                                  LUSTRE_OSC_NAME, NULL);
3823         if (rc) {
3824                 if (quota_interface)
3825                         PORTAL_SYMBOL_PUT(osc_quota_interface);
3826                 RETURN(rc);
3827         }
3828
3829         RETURN(rc);
3830 }
3831
#ifdef __KERNEL__
/*
 * Module teardown for the OSC (kernel builds only).
 *
 * Calls lquota_exit() on the quota interface, drops the reference on
 * osc_quota_interface when quota_interface is non-NULL, and finally
 * unregisters the LUSTRE_OSC_NAME obd type.
 */
static void /*__exit*/ osc_exit(void)
{
        lquota_exit(quota_interface);

        /* Only put the symbol if the lookup in osc_init() returned one. */
        if (quota_interface != NULL) {
                PORTAL_SYMBOL_PUT(osc_quota_interface);
        }

        class_unregister_type(LUSTRE_OSC_NAME);
}

/* Module metadata and init/exit registration. */
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");

cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
#endif