Whamcloud - gitweb
ad5ec8040aa0af3b1166b97b6855143f47d87811
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author Peter Braam <braam@clusterfs.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  *   You may have signed or agreed to another license before downloading
11  *   this software.  If so, you are bound by the terms and conditions
12  *   of that agreement, and the following does not apply to you.  See the
13  *   LICENSE file included with this distribution for more information.
14  *
15  *   If you did not agree to a different license, then this copy of Lustre
16  *   is open source software; you can redistribute it and/or modify it
17  *   under the terms of version 2 of the GNU General Public License as
18  *   published by the Free Software Foundation.
19  *
20  *   In either case, Lustre is distributed in the hope that it will be
21  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23  *   license text for more details.
24  *
 *  For testing and management it is treated as an obd_device,
 *  although it does not export a full OBD method table (the
 *  requests are coming in over the wire, so object target modules
 *  do not have a full method table.)
29  *
30  */
31
32 #ifndef EXPORT_SYMTAB
33 # define EXPORT_SYMTAB
34 #endif
35 #define DEBUG_SUBSYSTEM S_OSC
36
37 #ifdef __KERNEL__
38 # include <libcfs/libcfs.h>
39 #else /* __KERNEL__ */
40 # include <liblustre.h>
41 #endif
42
43 #include <lustre_dlm.h>
44 #include <libcfs/kp30.h>
45 #include <lustre_net.h>
46 #include <lustre/lustre_user.h>
47 #include <obd_ost.h>
48 #include <obd_lov.h>
49
50 #ifdef  __CYGWIN__
51 # include <ctype.h>
52 #endif
53
54 #include <lustre_ha.h>
55 #include <lprocfs_status.h>
56 #include <lustre_log.h>
57 #include <lustre_debug.h>
58 #include <lustre_param.h>
59 #include "osc_internal.h"
60
61 static quota_interface_t *quota_interface = NULL;
62 extern quota_interface_t osc_quota_interface;
63
64 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
65
66 /* Pack OSC object metadata for disk storage (LE byte order). */
67 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
68                       struct lov_stripe_md *lsm)
69 {
70         int lmm_size;
71         ENTRY;
72
73         lmm_size = sizeof(**lmmp);
74         if (!lmmp)
75                 RETURN(lmm_size);
76
77         if (*lmmp && !lsm) {
78                 OBD_FREE(*lmmp, lmm_size);
79                 *lmmp = NULL;
80                 RETURN(0);
81         }
82
83         if (!*lmmp) {
84                 OBD_ALLOC(*lmmp, lmm_size);
85                 if (!*lmmp)
86                         RETURN(-ENOMEM);
87         }
88
89         if (lsm) {
90                 LASSERT(lsm->lsm_object_id);
91                 LASSERT(lsm->lsm_object_gr);
92                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
93                 (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
94         }
95
96         RETURN(lmm_size);
97 }
98
/* Unpack OSC object metadata from disk storage (LE byte order).
 *
 * Calling convention (the usual obd_unpackmd contract):
 *   - @lsmp == NULL:                  just report the in-memory lsm size;
 *   - @*lsmp != NULL and @lmm == NULL: free the previously unpacked lsm;
 *   - otherwise: allocate *lsmp if needed and fill it from @lmm (when
 *     @lmm is non-NULL).
 * Returns the lsm size on success, 0 after a free, or a negative errno. */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
{
        int lsm_size;
        ENTRY;

        /* Validate the on-disk buffer before touching it. */
        if (lmm != NULL) {
                if (lmm_bytes < sizeof (*lmm)) {
                        CERROR("lov_mds_md too small: %d, need %d\n",
                               lmm_bytes, (int)sizeof(*lmm));
                        RETURN(-EINVAL);
                }
                /* XXX LOV_MAGIC etc check? */

                if (lmm->lmm_object_id == 0) {
                        CERROR("lov_mds_md: zero lmm_object_id\n");
                        RETURN(-EINVAL);
                }
        }

        /* OSC always has exactly one stripe. */
        lsm_size = lov_stripe_md_size(1);
        if (lsmp == NULL)
                RETURN(lsm_size);

        if (*lsmp != NULL && lmm == NULL) {
                /* Free mode: release both the oinfo and the lsm itself. */
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);
                *lsmp = NULL;
                RETURN(0);
        }

        if (*lsmp == NULL) {
                OBD_ALLOC(*lsmp, lsm_size);
                if (*lsmp == NULL)
                        RETURN(-ENOMEM);
                OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                if ((*lsmp)->lsm_oinfo[0] == NULL) {
                        /* Unwind the partial allocation. */
                        OBD_FREE(*lsmp, lsm_size);
                        RETURN(-ENOMEM);
                }
                loi_init((*lsmp)->lsm_oinfo[0]);
        }

        if (lmm != NULL) {
                /* XXX zero *lsmp? */
                (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
                (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
                LASSERT((*lsmp)->lsm_object_id);
                LASSERT((*lsmp)->lsm_object_gr);
        }

        (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

        RETURN(lsm_size);
}
155
/* Copy the caller's capability (if any) into the request buffer at
 * @offset and flag its presence in @body->oa.o_valid.  A NULL @capa
 * is a no-op; the caller must have reserved the buffer beforehand. */
static inline void osc_pack_capa(struct ptlrpc_request *req, int offset,
                                 struct ost_body *body, void *capa)
{
        struct obd_capa *oc = (struct obd_capa *)capa;
        struct lustre_capa *c;

        if (!capa)
                return;

        /* Buffer must exist and be large enough; enforced by LASSERT. */
        c = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*c));
        LASSERT(c);
        capa_cpy(c, oc);
        body->oa.o_valid |= OBD_MD_FLOSSCAPA;
        DEBUG_CAPA(D_SEC, c, "pack");
}
171
/* Fill the ost_body request buffer at @offset with the obdo from
 * @oinfo, then pack the (optional) capability into the next buffer. */
static inline void osc_pack_req_body(struct ptlrpc_request *req, int offset,
                                     struct obd_info *oinfo)
{
        struct ost_body *body;

        body = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*body));
        body->oa = *oinfo->oi_oa;
        /* Capability, when present, lives in the buffer after the body. */
        osc_pack_capa(req, offset + 1, body, oinfo->oi_capa);
}
181
/* Reply callback for osc_getattr_async(): unpack the ost_body from the
 * reply, copy the attributes into the caller's obdo, and deliver the
 * result through the oi_cb_up completion callback.  @rc is the RPC
 * status on entry. */
static int osc_getattr_interpret(struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body) {
                CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
                memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));

                /* This should really be sent by the OST */
                aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
                aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
        } else {
                CERROR("can't unpack ost_body\n");
                rc = -EPROTO;
                /* Invalidate the obdo so callers don't trust stale fields. */
                aa->aa_oi->oi_oa->o_valid = 0;
        }
out:
        /* Always notify the upper layer, success or failure. */
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}
209
210 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
211                              struct ptlrpc_request_set *set)
212 {
213         struct ptlrpc_request *req;
214         struct ost_body *body;
215         int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
216         struct osc_async_args *aa;
217         ENTRY;
218
219         size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
220         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
221                               OST_GETATTR, 3, size,NULL);
222         if (!req)
223                 RETURN(-ENOMEM);
224
225         osc_pack_req_body(req, REQ_REC_OFF, oinfo);
226
227         ptlrpc_req_set_repsize(req, 2, size);
228         req->rq_interpret_reply = osc_getattr_interpret;
229
230         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
231         aa = (struct osc_async_args *)&req->rq_async_args;
232         aa->aa_oi = oinfo;
233
234         ptlrpc_set_add_req(set, req);
235         RETURN (0);
236 }
237
/* Synchronous getattr: fetch the current attributes of the object
 * described by oinfo->oi_oa from the OST and copy them back into
 * oinfo->oi_oa.  Returns 0 on success or a negative errno. */
static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        ENTRY;

        /* Third buffer carries the capability only when the caller has one. */
        size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_GETATTR, 3, size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        osc_pack_req_body(req, REQ_REC_OFF, oinfo);

        ptlrpc_req_set_repsize(req, 2, size);

        rc = ptlrpc_queue_wait(req);
        if (rc) {
                CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
                GOTO(out, rc);
        }

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR ("can't unpack ost_body\n");
                GOTO (out, rc = -EPROTO);
        }

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        *oinfo->oi_oa = body->oa;

        /* This should really be sent by the OST */
        oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
280
281 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
282                        struct obd_trans_info *oti)
283 {
284         struct ptlrpc_request *req;
285         struct ost_body *body;
286         int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
287         ENTRY;
288
289         LASSERT(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
290                                         oinfo->oi_oa->o_gr > 0);
291         size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
292         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
293                               OST_SETATTR, 3, size, NULL);
294         if (!req)
295                 RETURN(-ENOMEM);
296
297         osc_pack_req_body(req, REQ_REC_OFF, oinfo);
298
299         ptlrpc_req_set_repsize(req, 2, size);
300
301         rc = ptlrpc_queue_wait(req);
302         if (rc)
303                 GOTO(out, rc);
304
305         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
306                                   lustre_swab_ost_body);
307         if (body == NULL)
308                 GOTO(out, rc = -EPROTO);
309
310         *oinfo->oi_oa = body->oa;
311
312         EXIT;
313 out:
314         ptlrpc_req_finished(req);
315         RETURN(rc);
316 }
317
/* Reply callback for osc_setattr_async(): unpack the ost_body from the
 * reply, copy the applied attributes into the caller's obdo, and deliver
 * the result through the oi_cb_up completion callback. */
static int osc_setattr_interpret(struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *aa->aa_oi->oi_oa = body->oa;
out:
        /* Always notify the upper layer, success or failure. */
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}
339
/* Asynchronous setattr.  If @rqset is NULL the request is handed to
 * ptlrpcd and not waited for (fire and forget); otherwise it is added
 * to @rqset and osc_setattr_interpret() delivers the result through
 * oinfo->oi_cb_up. */
static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct obd_trans_info *oti,
                             struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        int size[3] = { sizeof(struct ptlrpc_body), sizeof(struct ost_body) };
        struct osc_async_args *aa;
        ENTRY;

        /* Capability buffer is present only when the caller holds one. */
        size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_SETATTR, 3, size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        osc_pack_req_body(req, REQ_REC_OFF, oinfo);
        if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
                LASSERT(oti);
                /* Carry the llog cancel cookie inside the obdo. */
                *obdo_logcookie(oinfo->oi_oa) = *oti->oti_logcookies;
        }

        ptlrpc_req_set_repsize(req, 2, size);
        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
                req->rq_interpret_reply = osc_setattr_interpret;

                CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = (struct osc_async_args *)&req->rq_async_args;
                aa->aa_oi = oinfo;

                ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}
378
/* Synchronously create an object on the OST.  @oa carries the creation
 * attributes and is overwritten with the attributes of the new object
 * (notably o_id / o_gr).  If *@ea is NULL a stripe MD is allocated here
 * and returned through @ea; on failure a locally-allocated lsm is freed
 * again.  When @oti is given, the reply transno and (if requested via
 * OBD_MD_FLCOOKIE) the llog cookie are stored into it. */
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        struct lov_stripe_md *lsm;
        int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                /* No stripe MD supplied: allocate one for the caller. */
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_CREATE, 2, size, NULL);
        if (!req)
                GOTO(out, rc = -ENOMEM);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        body->oa = *oa;

        ptlrpc_req_set_repsize(req, 2, size);
        if (oa->o_valid & OBD_MD_FLINLINE) {
                /* FLINLINE is only used for orphan deletion requests here. */
                LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
                        oa->o_flags == OBD_FL_DELORPHAN);
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR ("can't unpack ost_body\n");
                GOTO (out_req, rc = -EPROTO);
        }

        *oa = body->oa;

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_object_id = oa->o_id;
        lsm->lsm_object_gr = oa->o_gr;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        *oti->oti_logcookies = *obdo_logcookie(oa);
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        /* Free the lsm only if we allocated it here (i.e. *ea still NULL). */
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        RETURN(rc);
}
460
461 static int osc_punch_interpret(struct ptlrpc_request *req,
462                                struct osc_async_args *aa, int rc)
463 {
464         struct ost_body *body;
465         ENTRY;
466
467         if (rc != 0)
468                 GOTO(out, rc);
469
470         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof (*body),
471                                   lustre_swab_ost_body);
472         if (body == NULL) {
473                 CERROR ("can't unpack ost_body\n");
474                 GOTO(out, rc = -EPROTO);
475         }
476
477         *aa->aa_oi->oi_oa = body->oa;
478 out:
479         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
480         RETURN(rc);
481 }
482
/* Asynchronous truncate (punch) of the extent described by
 * oinfo->oi_policy.l_extent.  The extent start/end are carried in the
 * overloaded o_size/o_blocks fields of the obdo.  The request is added
 * to @rqset; osc_punch_interpret() handles the reply. */
static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
                     struct obd_trans_info *oti,
                     struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        struct ost_body *body;
        int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        ENTRY;

        if (!oinfo->oi_oa) {
                CERROR("oa NULL\n");
                RETURN(-EINVAL);
        }

        /* Capability buffer only when the caller holds one. */
        size[REQ_REC_OFF + 1] = oinfo->oi_capa? sizeof(struct lustre_capa) : 0;
        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_PUNCH, 3, size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */

        osc_pack_req_body(req, REQ_REC_OFF, oinfo);
        /* overload the size and blocks fields in the oa with start/end */
        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        body->oa.o_size = oinfo->oi_policy.l_extent.start;
        body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);

        ptlrpc_req_set_repsize(req, 2, size);

        req->rq_interpret_reply = osc_punch_interpret;
        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_async_args *)&req->rq_async_args;
        aa->aa_oi = oinfo;
        ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}
523
/* Synchronously ask the OST to flush the byte range [start, end] of
 * the object described by @oa to stable storage.  The range rides in
 * the overloaded o_size/o_blocks fields.  Updated attributes from the
 * reply are copied back into @oa.  Returns 0 or a negative errno. */
static int osc_sync(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md *md, obd_size start, obd_size end,
                    void *capa)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        ENTRY;

        if (!oa) {
                CERROR("oa NULL\n");
                RETURN(-EINVAL);
        }

        /* Capability buffer only when the caller holds one. */
        size[REQ_REC_OFF + 1] = capa ? sizeof(struct lustre_capa) : 0;

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_SYNC, 3, size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        /* overload the size and blocks fields in the oa with start/end */
        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        body->oa = *oa;
        body->oa.o_size = start;
        body->oa.o_blocks = end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);

        osc_pack_capa(req, REQ_REC_OFF + 1, body, capa);

        ptlrpc_req_set_repsize(req, 2, size);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR ("can't unpack ost_body\n");
                GOTO (out, rc = -EPROTO);
        }

        *oa = body->oa;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
574
575 /* Find and cancel locally locks matched by @mode in the resource found by
576  * @objid. Found locks are added into @cancel list. Returns the amount of
577  * locks added to @cancels list. */
578 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
579                                    struct list_head *cancels, ldlm_mode_t mode,
580                                    int lock_flags)
581 {
582         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
583         struct ldlm_res_id res_id = { .name = { oa->o_id, 0, oa->o_gr, 0 } };
584         struct ldlm_resource *res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
585         int count;
586         ENTRY;
587
588         if (res == NULL)
589                 RETURN(0);
590
591         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
592                                            lock_flags, 0, NULL);
593         ldlm_resource_putref(res);
594         RETURN(count);
595 }
596
/* Destroy requests can be async always on the client, and we don't even really
 * care about the return code since the client cannot do anything at all about
 * a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST reconnects to the MDS next,
 * it will retrieve the llog unlink logs and then sends the log cancellation
 * cookies to the MDS after committing destroy transactions. */
static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                       struct lov_stripe_md *ea, struct obd_trans_info *oti,
                       struct obd_export *md_export)
{
        CFS_LIST_HEAD(cancels);
        struct ptlrpc_request *req;
        struct ost_body *body;
        int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body), 0 };
        int count, bufcount = 2;
        ENTRY;

        if (!oa) {
                CERROR("oa NULL\n");
                RETURN(-EINVAL);
        }

        /* Cancel local PW locks on the doomed object, discarding their
         * cached pages; when the server supports a cancel set the cancels
         * piggyback in the destroy request itself. */
        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);
        if (exp_connect_cancelset(exp) && count) {
                bufcount = 3;
                size[REQ_REC_OFF + 1] = ldlm_request_bufsize(count,
                                                             OST_DESTROY);
        }
        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_DESTROY, bufcount, size, NULL);
        if (exp_connect_cancelset(exp) && req)
                ldlm_cli_cancel_list(&cancels, count, req, REQ_REC_OFF + 1, 0);
        else
                /* No cancelset support or prep failed: drop lock refs. */
                ldlm_lock_list_put(&cancels, l_bl_ast, count);

        if (!req)
                RETURN(-ENOMEM);

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                /* Ship the llog cancel cookie inside the obdo. */
                memcpy(obdo_logcookie(oa), oti->oti_logcookies,
                       sizeof(*oti->oti_logcookies));
        body->oa = *oa;

        ptlrpc_req_set_repsize(req, 2, size);

        /* Fire and forget via ptlrpcd; see the comment above for why the
         * return code does not matter. */
        ptlrpcd_add_req(req);
        RETURN(0);
}
653
/* Fill the dirty/grant accounting fields of @oa (under loi_list_lock)
 * so the OST can see how much this client has cached and how much more
 * it would like to dirty.  Note: @writing_bytes is not used in this
 * function's body. */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        /* Caller must not have set these fields already. */
        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (cli->cl_dirty > cli->cl_dirty_max) {
                /* Per-OSC dirty accounting has gone inconsistent. */
                CERROR("dirty %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
                /* System-wide dirty page limit exceeded. */
                CERROR("dirty %d > system dirty_max %d\n",
                       atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                /* Difference would not fit in a 32-bit o_undirty. */
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
                                (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant;
        oa->o_dropped = cli->cl_lost_grant;
        /* Lost grant is reported exactly once, then reset. */
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}
688
689 /* caller must hold loi_list_lock */
690 static void osc_consume_write_grant(struct client_obd *cli,
691                                     struct brw_page *pga)
692 {
693         atomic_inc(&obd_dirty_pages);
694         cli->cl_dirty += CFS_PAGE_SIZE;
695         cli->cl_avail_grant -= CFS_PAGE_SIZE;
696         pga->flag |= OBD_BRW_FROM_GRANT;
697         CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
698                CFS_PAGE_SIZE, pga, pga->pg);
699         LASSERT(cli->cl_avail_grant >= 0);
700 }
701
/* the companion to osc_consume_write_grant, called when a brw has completed.
 * must be called with the loi lock held. */
static void osc_release_write_grant(struct client_obd *cli,
                                    struct brw_page *pga, int sent)
{
        /* Server block size; fall back to 4096 when statfs data is absent. */
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
        ENTRY;

        /* Only pages charged by osc_consume_write_grant are accounted. */
        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
                EXIT;
                return;
        }

        pga->flag &= ~OBD_BRW_FROM_GRANT;
        atomic_dec(&obd_dirty_pages);
        cli->cl_dirty -= CFS_PAGE_SIZE;
        if (!sent) {
                /* Page was never written: the whole page's grant is lost. */
                cli->cl_lost_grant += CFS_PAGE_SIZE;
                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
        } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole block on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
                int offset = pga->off & ~CFS_PAGE_MASK;
                int count = pga->count + (offset & (blocksize - 1));
                int end = (offset + pga->count) & (blocksize - 1);
                if (end)
                        count += blocksize - end;

                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
                       cli->cl_avail_grant, cli->cl_dirty);
        }

        EXIT;
}
740
741 static unsigned long rpcs_in_flight(struct client_obd *cli)
742 {
743         return cli->cl_r_in_flight + cli->cl_w_in_flight;
744 }
745
/* caller must hold loi_list_lock */
/* Wake threads queued on cl_cache_waiters.  Each waiter either gets a
 * page of write grant consumed on its behalf, or — when no grant is
 * available and no writes are in flight to replenish it — is woken
 * with -EDQUOT so it falls back to sync I/O.  Stops early when dirty
 * limits leave no room at all. */
void osc_wake_cache_waiters(struct client_obd *cli)
{
        struct list_head *l, *tmp;
        struct osc_cache_waiter *ocw;

        ENTRY;
        list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                /* if we can't dirty more, we must wait until some is written */
                if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
                   (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);
                        return;
                }

                /* if still dirty cache but no grant wait for pending RPCs that
                 * may yet return us some grant before doing sync writes */
                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
                               cli->cl_w_in_flight);
                        return;
                }

                ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
                list_del_init(&ocw->ocw_entry);
                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        /* no more RPCs in flight to return grant, do sync IO */
                        ocw->ocw_rc = -EDQUOT;
                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
                } else {
                        /* Grant the waiter's page before waking it. */
                        osc_consume_write_grant(cli,
                                                &ocw->ocw_oap->oap_brw_page);
                }

                cfs_waitq_signal(&ocw->ocw_waitq);
        }

        EXIT;
}
787
788 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
789 {
790         client_obd_list_lock(&cli->cl_loi_list_lock);
791         cli->cl_avail_grant = ocd->ocd_grant;
792         client_obd_list_unlock(&cli->cl_loi_list_lock);
793
794         CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
795                cli->cl_avail_grant, cli->cl_lost_grant);
796         LASSERT(cli->cl_avail_grant >= 0);
797 }
798
799 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
800 {
801         client_obd_list_lock(&cli->cl_loi_list_lock);
802         CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
803         if (body->oa.o_valid & OBD_MD_FLGRANT)
804                 cli->cl_avail_grant += body->oa.o_grant;
805         /* waiters are woken in brw_interpret_oap */
806         client_obd_list_unlock(&cli->cl_loi_list_lock);
807 }
808
/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
/* Walk the page array: leave the first @nob_read bytes alone, then
 * zero-fill the remainder of the partially-read page and every page
 * after it. */
static void handle_short_read(int nob_read, obd_count page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT (page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = cfs_kmap(pga[i]->pg) +
                                (pga[i]->off & ~CFS_PAGE_MASK);
                        /* Zero only the unread tail of this page. */
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        cfs_kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                cfs_kunmap(pga[i]->pg);
                i++;
        }
}
847
848 static int check_write_rcs(struct ptlrpc_request *req,
849                            int requested_nob, int niocount,
850                            obd_count page_count, struct brw_page **pga)
851 {
852         int    *remote_rcs, i;
853
854         /* return error if any niobuf was in error */
855         remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
856                                         sizeof(*remote_rcs) * niocount, NULL);
857         if (remote_rcs == NULL) {
858                 CERROR("Missing/short RC vector on BRW_WRITE reply\n");
859                 return(-EPROTO);
860         }
861         if (lustre_msg_swabbed(req->rq_repmsg))
862                 for (i = 0; i < niocount; i++)
863                         __swab32s(&remote_rcs[i]);
864
865         for (i = 0; i < niocount; i++) {
866                 if (remote_rcs[i] < 0)
867                         return(remote_rcs[i]);
868
869                 if (remote_rcs[i] != 0) {
870                         CERROR("rc[%d] invalid (%d) req %p\n",
871                                 i, remote_rcs[i], req);
872                         return(-EPROTO);
873                 }
874         }
875
876         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
877                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
878                        requested_nob, req->rq_bulk->bd_nob_transferred);
879                 return(-EPROTO);
880         }
881
882         return (0);
883 }
884
885 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
886 {
887         if (p1->flag != p2->flag) {
888                 unsigned mask = ~OBD_BRW_FROM_GRANT;
889
890                 /* warn if we try to combine flags that we don't know to be
891                  * safe to combine */
892                 if ((p1->flag & mask) != (p2->flag & mask))
893                         CERROR("is it ok to have flags 0x%x and 0x%x in the "
894                                "same brw?\n", p1->flag, p2->flag);
895                 return 0;
896         }
897
898         return (p1->off + p1->count == p2->off);
899 }
900
901 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
902                                    struct brw_page **pga, int opc)
903 {
904         __u32 cksum = ~0;
905         int i = 0;
906
907         LASSERT (pg_count > 0);
908         while (nob > 0 && pg_count > 0) {
909                 char *ptr = cfs_kmap(pga[i]->pg);
910                 int off = pga[i]->off & ~CFS_PAGE_MASK;
911                 int count = pga[i]->count > nob ? nob : pga[i]->count;
912
913                 /* corrupt the data before we compute the checksum, to
914                  * simulate an OST->client data error */
915                 if (i == 0 && opc == OST_READ &&
916                     OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
917                         memcpy(ptr + off, "bad1", min(4, nob));
918                 cksum = crc32_le(cksum, ptr + off, count);
919                 cfs_kunmap(pga[i]->pg);
920                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
921                                off, cksum);
922
923                 nob -= pga[i]->count;
924                 pg_count--;
925                 i++;
926         }
927         /* For sending we only compute the wrong checksum instead
928          * of corrupting the data so it is still correct on a redo */
929         if (opc == OST_WRITE && OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_SEND))
930                 cksum++;
931
932         return cksum;
933 }
934
/* Build a BRW (bulk read/write) RPC covering @page_count pages in @pga.
 * Adjacent pages with compatible flags are merged into single remote
 * niobufs; a bulk descriptor is attached and, for writes, an optional
 * checksum and one reply RC per niobuf are requested.  @ocapa, if
 * given, is copied into the request as a capability.  On success the
 * prepared request is returned in *reqp with its async args filled in
 * for the completion path.  @pga must already be sorted by ascending
 * offset.  Returns 0 or a negative errno. */
static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int size[5] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        int niocount, i, requested_nob, opc, rc;
        struct ptlrpc_request_pool *pool;
        struct lustre_capa      *capa;
        struct osc_brw_async_args *aa;

        ENTRY;
        OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM); /* Recoverable */
        OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ2, -EINVAL); /* Fatal */

        /* writes draw from the import's preallocated request pool so
         * dirty pages can still be flushed under memory pressure */
        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                pool = cli->cl_import->imp_rq_pool;
        } else {
                opc = OST_READ;
                pool = NULL;
        }

        /* count the niobufs needed: each run of mergeable pages is one */
        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        size[REQ_REC_OFF + 1] = sizeof(*ioobj);
        size[REQ_REC_OFF + 2] = niocount * sizeof(*niobuf);
        if (ocapa)
                size[REQ_REC_OFF + 3] = sizeof(*capa);

        req = ptlrpc_prep_req_pool(cli->cl_import, LUSTRE_OST_VERSION, opc, 5,
                                   size, NULL, pool, NULL);
        if (req == NULL)
                RETURN (-ENOMEM);

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */

        if (opc == OST_WRITE)
                desc = ptlrpc_prep_bulk_imp (req, page_count,
                                             BULK_GET_SOURCE, OST_BULK_PORTAL);
        else
                desc = ptlrpc_prep_bulk_imp (req, page_count,
                                             BULK_PUT_SINK, OST_BULK_PORTAL);
        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        ioobj = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioobj));
        niobuf = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                                niocount * sizeof(*niobuf));

        body->oa = *oa;

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        if (ocapa) {
                capa = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 3,
                                      sizeof(*capa));
                capa_cpy(capa, ocapa);
                body->oa.o_valid |= OBD_MD_FLOSSCAPA;
        }

        /* add each page to the bulk and fill the niobufs, folding
         * contiguous compatible pages into the previous niobuf */
        LASSERT (page_count > 0);
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                struct brw_page *pg_prev = pga[i - 1];

                LASSERT(pg->count > 0);
                /* a brw_page may never straddle a page boundary */
                LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
                         "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
                         pg->off, pg->count);
#ifdef __LINUX__
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
#else
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u\n", i, page_count);
#endif
                /* the SRVLOCK flag must be uniform across the RPC */
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
                                      pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len    = pg->count;
                        niobuf->flags  = pg->flag;
                }
        }

        /* sanity check: we advanced through exactly niocount niobufs */
        LASSERT((void *)(niobuf - niocount) ==
                lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                               niocount * sizeof(*niobuf)));
        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (unlikely(cli->cl_checksum)) {
                        body->oa.o_valid |= OBD_MD_FLCKSUM;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM;
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                size[REPLY_REC_OFF + 1] = sizeof(__u32) * niocount;
                ptlrpc_req_set_repsize(req, 3, size);
        } else {
                if (unlikely(cli->cl_checksum))
                        body->oa.o_valid |= OBD_MD_FLCKSUM;
                /* 1 RC for the whole I/O */
                ptlrpc_req_set_repsize(req, 2, size);
        }

        /* stash everything the completion path will need */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_brw_async_args *)&req->rq_async_args;
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        INIT_LIST_HEAD(&aa->aa_oaps);

        *reqp = req;
        RETURN (0);

 out:
        ptlrpc_req_finished (req);
        RETURN (rc);
}
1095
1096 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1097                                 __u32 client_cksum, __u32 server_cksum,
1098                                 int nob, obd_count page_count,
1099                                 struct brw_page **pga)
1100 {
1101         __u32 new_cksum;
1102         char *msg;
1103
1104         if (server_cksum == client_cksum) {
1105                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1106                 return 0;
1107         }
1108
1109         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE);
1110
1111         if (new_cksum == server_cksum)
1112                 msg = "changed on the client after we checksummed it - "
1113                       "likely false positive due to mmap IO (bug 11742)";
1114         else if (new_cksum == client_cksum)
1115                 msg = "changed in transit before arrival at OST";
1116         else
1117                 msg = "changed in transit AND doesn't match the original - "
1118                       "likely false positive due to mmap IO (bug 11742)";
1119
1120         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1121                            LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1122                            "["LPU64"-"LPU64"]\n",
1123                            msg, libcfs_nid2str(peer->nid),
1124                            oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1125                            oa->o_valid & OBD_MD_FLFID ? oa->o_generation : 
1126                                                         (__u64)0,
1127                            oa->o_id,
1128                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1129                            pga[0]->off,
1130                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1131         CERROR("original client csum %x, server csum %x, client csum now %x\n",
1132                client_cksum, server_cksum, new_cksum);
1133         return 1;        
1134 }
1135
/* Note rc enters this function as number of bytes transferred */
/* Complete a BRW RPC: unpack and validate the reply.  @rc enters as
 * the number of bytes transferred (or a negative errno) and leaves as
 * 0 or a negative errno; -EAGAIN asks the caller to redo the request.
 * On success the reply's oa is copied back into aa->aa_oa. */
static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
{
        struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
        const lnet_process_id_t *peer =
                        &req->rq_import->imp_connection->c_peer;
        struct client_obd *cli = aa->aa_cli;
        struct ost_body *body;
        __u32 client_cksum = 0;
        ENTRY;

        /* -EDQUOT replies still carry a body we must process (quota
         * flags, grant); all other errors return immediately */
        if (rc < 0 && rc != -EDQUOT)
                RETURN(rc);

        LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR ("Can't unpack body\n");
                RETURN(-EPROTO);
        }

        /* set/clear over quota flag for a uid/gid */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
            body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
                lquota_setdq(quota_interface, cli, body->oa.o_uid,
                             body->oa.o_gid, body->oa.o_valid,
                             body->oa.o_flags);

        /* the only rc < 0 that reaches this point is -EDQUOT */
        if (rc < 0)
                RETURN(rc);

        /* checksum computed at send time, stashed in the oa */
        if (unlikely(aa->aa_oa->o_valid & OBD_MD_FLCKSUM))
                client_cksum = aa->aa_oa->o_cksum; /* save for later */

        osc_update_grant(cli, body);

        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
                /* writes complete with rc == 0, never a byte count */
                if (rc > 0) {
                        CERROR ("Unexpected +ve rc %d\n", rc);
                        RETURN(-EPROTO);
                }
                LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);

                /* server checksum disagrees with ours: log and resend */
                if (unlikely((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) &&
                             client_cksum &&
                             check_write_checksum(&body->oa, peer, client_cksum,
                                                  body->oa.o_cksum,
                                                  aa->aa_requested_nob,
                                                  aa->aa_page_count,
                                                  aa->aa_ppga)))
                        RETURN(-EAGAIN);

                if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
                        RETURN(-EAGAIN);

                rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
                                     aa->aa_page_count, aa->aa_ppga);
                GOTO(out, rc);
        }

        /* The rest of this function executes only for OST_READs */
        if (rc > aa->aa_requested_nob) {
                CERROR("Unexpected rc %d (%d requested)\n", rc,
                       aa->aa_requested_nob);
                RETURN(-EPROTO);
        }

        if (rc != req->rq_bulk->bd_nob_transferred) {
                CERROR ("Unexpected rc %d (%d transferred)\n",
                        rc, req->rq_bulk->bd_nob_transferred);
                return (-EPROTO);
        }

        /* short read: zero-fill the pages past the data we received */
        if (rc < aa->aa_requested_nob)
                handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);

        if (sptlrpc_cli_unwrap_bulk_read(req, rc, aa->aa_page_count,
                                         aa->aa_ppga))
                GOTO(out, rc = -EAGAIN);

        /* verify the read data against the server's checksum, if any */
        if (unlikely(body->oa.o_valid & OBD_MD_FLCKSUM)) {
                static int cksum_counter;
                __u32      server_cksum = body->oa.o_cksum;
                char      *via;
                char      *router;

                client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
                                                 aa->aa_ppga, OST_READ);

                /* note any intermediate router for the error message */
                if (peer->nid == req->rq_bulk->bd_sender) {
                        via = router = "";
                } else {
                        via = " via ";
                        router = libcfs_nid2str(req->rq_bulk->bd_sender);
                }

                if (server_cksum == ~0 && rc > 0) {
                        CERROR("Protocol error: server %s set the 'checksum' "
                               "bit, but didn't send a checksum.  Not fatal, "
                               "but please tell CFS.\n",
                               libcfs_nid2str(peer->nid));
                } else if (server_cksum != client_cksum) {
                        LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
                                           "%s%s%s inum "LPU64"/"LPU64" object "
                                           LPU64"/"LPU64" extent "
                                           "["LPU64"-"LPU64"]\n",
                                           req->rq_import->imp_obd->obd_name,
                                           libcfs_nid2str(peer->nid),
                                           via, router,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                                body->oa.o_fid : (__u64)0,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                                body->oa.o_generation :(__u64)0,
                                           body->oa.o_id,
                                           body->oa.o_valid & OBD_MD_FLGROUP ?
                                                body->oa.o_gr : (__u64)0,
                                           aa->aa_ppga[0]->off,
                                           aa->aa_ppga[aa->aa_page_count-1]->off +
                                           aa->aa_ppga[aa->aa_page_count-1]->count -
                                                                        1);
                        CERROR("client %x, server %x\n",
                               client_cksum, server_cksum);
                        cksum_counter = 0;
                        aa->aa_oa->o_cksum = client_cksum;
                        rc = -EAGAIN;
                } else {
                        cksum_counter++;
                        CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                        rc = 0;
                }
        } else if (unlikely(client_cksum)) {
                static int cksum_missed;

                cksum_missed++;
                /* log only at power-of-two counts to avoid flooding */
                if ((cksum_missed & (-cksum_missed)) == cksum_missed)
                        CERROR("Checksum %u requested from %s but not sent\n",
                               cksum_missed, libcfs_nid2str(peer->nid));
        } else {
                rc = 0;
        }
out:
        if (rc >= 0)
                *aa->aa_oa = body->oa;

        RETURN(rc);
}
1283
/* Synchronous BRW: prepare the RPC, queue it and wait for completion.
 * Bulk timeouts where the server asked for a resend restart
 * immediately; other recoverable errors retry after sleeping
 * 'resends' seconds, until osc_should_resend() says to give up. */
static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
                            struct lov_stripe_md *lsm,
                            obd_count page_count, struct brw_page **pga,
                            struct obd_capa *ocapa)
{
        struct ptlrpc_request *req;
        int                    rc;
        cfs_waitq_t            waitq;
        int                    resends = 0;
        struct l_wait_info     lwi;

        ENTRY;

        cfs_waitq_init(&waitq);

restart_bulk:
        rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
                                  page_count, pga, &req, ocapa);
        if (rc != 0)
                return (rc);

        rc = ptlrpc_queue_wait(req);

        if (rc == -ETIMEDOUT && req->rq_resend) {
                DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
                ptlrpc_req_finished(req);
                goto restart_bulk;
        }

        rc = osc_brw_fini_request(req, rc);

        ptlrpc_req_finished(req);
        if (osc_recoverable_error(rc)) {
                resends++;
                if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
                        CERROR("too many resend retries, returning error\n");
                        RETURN(-EIO);
                }

                /* back off linearly with the number of resends */
                lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
                l_wait_event(waitq, 0, &lwi);

                goto restart_bulk;
        }

        RETURN (rc);
}
1331
/* Rebuild and resubmit a BRW whose reply indicated a recoverable error
 * (e.g. a checksum mismatch).  A new request is prepared from the saved
 * async args; the oaps and pga are handed over from the old request to
 * the new one, which is then added to the same request set.  Returns 0
 * when the new request was queued, or a negative errno. */
int osc_brw_redo_request(struct ptlrpc_request *request,
                         struct osc_brw_async_args *aa)
{
        struct ptlrpc_request *new_req;
        struct ptlrpc_request_set *set = request->rq_set;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
                CERROR("too many resend retries, returning error\n");
                RETURN(-EIO);
        }

        DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
/*
        body = lustre_msg_buf(request->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
                ocapa = lustre_unpack_capa(request->rq_reqmsg,
                                           REQ_REC_OFF + 3);
*/
        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                        OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa,
                                  NULL /* lsm unused by osc currently */,
                                  aa->aa_page_count, aa->aa_ppga,
                                  &new_req, NULL /* ocapa */);
        if (rc)
                RETURN(rc);

        client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);

        /* abandon the redo (freeing the new request) if any oap of the
         * old request was interrupted in the meantime */
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(request == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 request, oap->oap_request);
                        if (oap->oap_interrupted) {
                                client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
                                ptlrpc_req_finished(new_req);
                                RETURN(-EINTR);
                        }
                }
        }
        /* New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it... */
        aa->aa_resends++;
        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
        /* delay the resend so repeated failures back off linearly */
        new_req->rq_sent = CURRENT_SECONDS + aa->aa_resends;

        new_aa = (struct osc_brw_async_args *)&new_req->rq_async_args;

        INIT_LIST_HEAD(&new_aa->aa_oaps);
        list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
        INIT_LIST_HEAD(&aa->aa_oaps);

        /* repoint each oap at the new request, dropping its reference
         * on the old one */
        list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);
                }
        }
        client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);

        DEBUG_REQ(D_INFO, new_req, "new request");

        ptlrpc_set_add_req(set, new_req);

        RETURN(0);
}
1404
/* Request-set interpret callback for async BRWs.  @rc enters as the
 * number of bytes transferred (or a negative errno).  Recoverable
 * errors are handed to osc_brw_redo_request(); on a successful redo we
 * return early, leaving the accounting below to the new request's own
 * completion.  Otherwise in-flight counts and write grant are settled
 * here and the pga pointer array is released. */
static int brw_interpret(struct ptlrpc_request *req, void *data, int rc)
{
        struct osc_brw_async_args *aa = data;
        int                        i;
        int                        nob = rc; /* bytes moved, before rc is reused */
        ENTRY;

        rc = osc_brw_fini_request(req, rc);
        if (osc_recoverable_error(rc)) {
                rc = osc_brw_redo_request(req, aa);
                if (rc == 0)
                        RETURN(0);
        }
        /* expose the byte count to callers tracking progress through
         * the set's counter, when one was provided */
        if ((rc >= 0) && req->rq_set && req->rq_set->set_countp)
                atomic_add(nob, (atomic_t *)req->rq_set->set_countp);

        client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
                aa->aa_cli->cl_w_in_flight--;
        else
                aa->aa_cli->cl_r_in_flight--;
        /* return the write grant that was consumed when the RPC was built */
        for (i = 0; i < aa->aa_page_count; i++)
                osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
        client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);

        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);

        RETURN(rc);
}
1434
1435 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1436                           struct lov_stripe_md *lsm, obd_count page_count,
1437                           struct brw_page **pga, struct ptlrpc_request_set *set,
1438                           struct obd_capa *ocapa)
1439 {
1440         struct ptlrpc_request     *req;
1441         struct client_obd         *cli = &exp->exp_obd->u.cli;
1442         int                        rc, i;
1443         struct osc_brw_async_args *aa;
1444         ENTRY;
1445
1446         /* Consume write credits even if doing a sync write -
1447          * otherwise we may run out of space on OST due to grant. */
1448         if (cmd == OBD_BRW_WRITE) {
1449                 spin_lock(&cli->cl_loi_list_lock);
1450                 for (i = 0; i < page_count; i++) {
1451                         if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
1452                                 osc_consume_write_grant(cli, pga[i]);
1453                 }
1454                 spin_unlock(&cli->cl_loi_list_lock);
1455         }
1456
1457         rc = osc_brw_prep_request(cmd, cli, oa, lsm, page_count, pga,
1458                                   &req, ocapa);
1459
1460         aa = (struct osc_brw_async_args *)&req->rq_async_args;
1461         if (cmd == OBD_BRW_READ) {
1462                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1463                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1464                 ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
1465         } else {
1466                  lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1467                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
1468                                  cli->cl_w_in_flight);
1469                 ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
1470         }
1471
1472         if (rc == 0) {
1473                 req->rq_interpret_reply = brw_interpret;
1474                 ptlrpc_set_add_req(set, req);
1475                 client_obd_list_lock(&cli->cl_loi_list_lock);
1476                 if (cmd == OBD_BRW_READ)
1477                         cli->cl_r_in_flight++;
1478                 else
1479                         cli->cl_w_in_flight++;
1480                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1481         } else if (cmd == OBD_BRW_WRITE) {
1482                 client_obd_list_lock(&cli->cl_loi_list_lock);
1483                 for (i = 0; i < page_count; i++)
1484                         osc_release_write_grant(cli, pga[i], 0);
1485                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1486         }
1487         RETURN (rc);
1488 }
1489
1490 /*
1491  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1492  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1493  * fine for our small page arrays and doesn't require allocation.  its an
1494  * insertion sort that swaps elements that are strides apart, shrinking the
1495  * stride down until its '1' and the array is sorted.
1496  */
1497 static void sort_brw_pages(struct brw_page **array, int num)
1498 {
1499         int stride, i, j;
1500         struct brw_page *tmp;
1501
1502         if (num == 1)
1503                 return;
1504         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1505                 ;
1506
1507         do {
1508                 stride /= 3;
1509                 for (i = stride ; i < num ; i++) {
1510                         tmp = array[i];
1511                         j = i;
1512                         while (j >= stride && array[j - stride]->off > tmp->off) {
1513                                 array[j] = array[j - stride];
1514                                 j -= stride;
1515                         }
1516                         array[j] = tmp;
1517                 }
1518         } while (stride > 1);
1519 }
1520
1521 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1522 {
1523         int count = 1;
1524         int offset;
1525         int i = 0;
1526
1527         LASSERT (pages > 0);
1528         offset = pg[i]->off & ~CFS_PAGE_MASK;
1529
1530         for (;;) {
1531                 pages--;
1532                 if (pages == 0)         /* that's all */
1533                         return count;
1534
1535                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1536                         return count;   /* doesn't end on page boundary */
1537
1538                 i++;
1539                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1540                 if (offset != 0)        /* doesn't start on page boundary */
1541                         return count;
1542
1543                 count++;
1544         }
1545 }
1546
1547 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1548 {
1549         struct brw_page **ppga;
1550         int i;
1551
1552         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1553         if (ppga == NULL)
1554                 return NULL;
1555
1556         for (i = 0; i < count; i++)
1557                 ppga[i] = pga + i;
1558         return ppga;
1559 }
1560
/* Free a pointer array built by osc_build_ppga().  @count must be the
 * original element count passed to osc_build_ppga(), not the number of
 * entries remaining after chunks were consumed. */
static void osc_release_ppga(struct brw_page **ppga, obd_count count)
{
        LASSERT(ppga != NULL);
        OBD_FREE(ppga, sizeof(*ppga) * count);
}
1566
1567 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1568                    obd_count page_count, struct brw_page *pga,
1569                    struct obd_trans_info *oti)
1570 {
1571         struct obdo *saved_oa = NULL;
1572         struct brw_page **ppga, **orig;
1573         struct obd_import *imp = class_exp2cliimp(exp);
1574         struct client_obd *cli = &imp->imp_obd->u.cli;
1575         int rc, page_count_orig;
1576         ENTRY;
1577
1578         if (cmd & OBD_BRW_CHECK) {
1579                 /* The caller just wants to know if there's a chance that this
1580                  * I/O can succeed */
1581
1582                 if (imp == NULL || imp->imp_invalid)
1583                         RETURN(-EIO);
1584                 RETURN(0);
1585         }
1586
1587         /* test_brw with a failed create can trip this, maybe others. */
1588         LASSERT(cli->cl_max_pages_per_rpc);
1589
1590         rc = 0;
1591
1592         orig = ppga = osc_build_ppga(pga, page_count);
1593         if (ppga == NULL)
1594                 RETURN(-ENOMEM);
1595         page_count_orig = page_count;
1596
1597         sort_brw_pages(ppga, page_count);
1598         while (page_count) {
1599                 obd_count pages_per_brw;
1600
1601                 if (page_count > cli->cl_max_pages_per_rpc)
1602                         pages_per_brw = cli->cl_max_pages_per_rpc;
1603                 else
1604                         pages_per_brw = page_count;
1605
1606                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1607
1608                 if (saved_oa != NULL) {
1609                         /* restore previously saved oa */
1610                         *oinfo->oi_oa = *saved_oa;
1611                 } else if (page_count > pages_per_brw) {
1612                         /* save a copy of oa (brw will clobber it) */
1613                         OBDO_ALLOC(saved_oa);
1614                         if (saved_oa == NULL)
1615                                 GOTO(out, rc = -ENOMEM);
1616                         *saved_oa = *oinfo->oi_oa;
1617                 }
1618
1619                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1620                                       pages_per_brw, ppga, oinfo->oi_capa);
1621
1622                 if (rc != 0)
1623                         break;
1624
1625                 page_count -= pages_per_brw;
1626                 ppga += pages_per_brw;
1627         }
1628
1629 out:
1630         osc_release_ppga(orig, page_count_orig);
1631
1632         if (saved_oa != NULL)
1633                 OBDO_FREE(saved_oa);
1634
1635         RETURN(rc);
1636 }
1637
/* Asynchronous bulk read/write: queue one or more BRW RPCs on @set and
 * return without waiting for completion.
 *
 * ppga ownership is subtle: when the whole transfer fits in a single RPC
 * the original pointer array is handed to async_internal(), which then
 * owns it (we record that by clearing @orig); otherwise each RPC gets a
 * freshly allocated copy of its slice and the original is freed at out:.
 *
 * Returns 0 on success or a negative errno from the first failure. */
static int osc_brw_async(int cmd, struct obd_export *exp,
                         struct obd_info *oinfo, obd_count page_count,
                         struct brw_page *pga, struct obd_trans_info *oti,
                         struct ptlrpc_request_set *set)
{
        struct brw_page **ppga, **orig;
        struct client_obd *cli = &exp->exp_obd->u.cli;
        int page_count_orig;
        int rc = 0;
        ENTRY;

        if (cmd & OBD_BRW_CHECK) {
                struct obd_import *imp = class_exp2cliimp(exp);
                /* The caller just wants to know if there's a chance that this
                 * I/O can succeed */

                if (imp == NULL || imp->imp_invalid)
                        RETURN(-EIO);
                RETURN(0);
        }

        orig = ppga = osc_build_ppga(pga, page_count);
        if (ppga == NULL)
                RETURN(-ENOMEM);
        page_count_orig = page_count;

        /* sort by offset so each RPC covers a contiguous ascending range */
        sort_brw_pages(ppga, page_count);
        while (page_count) {
                struct brw_page **copy;
                obd_count pages_per_brw;

                pages_per_brw = min_t(obd_count, page_count,
                                      cli->cl_max_pages_per_rpc);

                /* shrink the chunk so one RPC never spans a page gap */
                pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);

                /* use ppga only if single RPC is going to fly */
                if (pages_per_brw != page_count_orig || ppga != orig) {
                        OBD_ALLOC(copy, sizeof(*copy) * pages_per_brw);
                        if (copy == NULL)
                                GOTO(out, rc = -ENOMEM);
                        memcpy(copy, ppga, sizeof(*copy) * pages_per_brw);
                } else
                        copy = ppga;

                rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
                                    pages_per_brw, copy, set, oinfo->oi_capa);

                if (rc != 0) {
                        /* async_internal() did not take the copy; drop it
                         * here (the shared-orig case is freed at out:) */
                        if (copy != ppga)
                                OBD_FREE(copy, sizeof(*copy) * pages_per_brw);
                        break;
                }
                if (copy == orig) {
                        /* we passed it to async_internal() which is
                         * now responsible for releasing memory */
                        orig = NULL;
                }

                page_count -= pages_per_brw;
                ppga += pages_per_brw;
        }
out:
        if (orig)
                osc_release_ppga(orig, page_count_orig);
        RETURN(rc);
}
1705
1706 static void osc_check_rpcs(struct client_obd *cli);
1707
/* The companion to osc_enter_cache(), called when @oap is no longer part of
 * the dirty accounting.  Writeback completes or truncate happens before
 * writing starts.  Must be called with the loi lock held.  @sent is passed
 * through to osc_release_write_grant() to indicate whether the page was
 * actually transferred. */
static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
                           int sent)
{
        osc_release_write_grant(cli, &oap->oap_brw_page, sent);
}
1716
1717
1718 /* This maintains the lists of pending pages to read/write for a given object
1719  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1720  * to quickly find objects that are ready to send an RPC. */
1721 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1722                          int cmd)
1723 {
1724         int optimal;
1725         ENTRY;
1726
1727         if (lop->lop_num_pending == 0)
1728                 RETURN(0);
1729
1730         /* if we have an invalid import we want to drain the queued pages
1731          * by forcing them through rpcs that immediately fail and complete
1732          * the pages.  recovery relies on this to empty the queued pages
1733          * before canceling the locks and evicting down the llite pages */
1734         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1735                 RETURN(1);
1736
1737         /* stream rpcs in queue order as long as as there is an urgent page
1738          * queued.  this is our cheap solution for good batching in the case
1739          * where writepage marks some random page in the middle of the file
1740          * as urgent because of, say, memory pressure */
1741         if (!list_empty(&lop->lop_urgent)) {
1742                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1743                 RETURN(1);
1744         }
1745         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1746         optimal = cli->cl_max_pages_per_rpc;
1747         if (cmd & OBD_BRW_WRITE) {
1748                 /* trigger a write rpc stream as long as there are dirtiers
1749                  * waiting for space.  as they're waiting, they're not going to
1750                  * create more pages to coallesce with what's waiting.. */
1751                 if (!list_empty(&cli->cl_cache_waiters)) {
1752                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1753                         RETURN(1);
1754                 }
1755                 /* +16 to avoid triggering rpcs that would want to include pages
1756                  * that are being queued but which can't be made ready until
1757                  * the queuer finishes with the page. this is a wart for
1758                  * llite::commit_write() */
1759                 optimal += 16;
1760         }
1761         if (lop->lop_num_pending >= optimal)
1762                 RETURN(1);
1763
1764         RETURN(0);
1765 }
1766
/* Make @item's membership of @list match @should_be_on: enqueue it at
 * the tail when it should be on the list but isn't, remove it when it
 * shouldn't be but is, and do nothing otherwise. */
static void on_list(struct list_head *item, struct list_head *list,
                    int should_be_on)
{
        int currently_on = !list_empty(item);

        if (should_be_on && !currently_on)
                list_add_tail(item, list);
        else if (!should_be_on && currently_on)
                list_del_init(item);
}
1775
1776 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1777  * can find pages to build into rpcs quickly */
1778 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1779 {
1780         on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
1781                 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
1782                 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1783
1784         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1785                 loi->loi_write_lop.lop_num_pending);
1786
1787         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1788                 loi->loi_read_lop.lop_num_pending);
1789 }
1790
1791 static void lop_update_pending(struct client_obd *cli,
1792                                struct loi_oap_pages *lop, int cmd, int delta)
1793 {
1794         lop->lop_num_pending += delta;
1795         if (cmd & OBD_BRW_WRITE)
1796                 cli->cl_pending_w_pages += delta;
1797         else
1798                 cli->cl_pending_r_pages += delta;
1799 }
1800
/* this is called when a sync waiter receives an interruption.  Its job is to
 * get the caller woken as soon as possible.  If its page hasn't been put in an
 * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
 * desiring interruption which will forcefully complete the rpc once the rpc
 * has timed out */
static void osc_occ_interrupted(struct oig_callback_context *occ)
{
        struct osc_async_page *oap;
        struct loi_oap_pages *lop;
        struct lov_oinfo *loi;
        ENTRY;

        /* XXX member_of() */
        /* recover the enclosing oap from its embedded occ member */
        oap = list_entry(occ, struct osc_async_page, oap_occ);

        client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);

        oap->oap_interrupted = 1;

        /* ok, it's been put in an rpc. only one oap gets a request reference */
        if (oap->oap_request != NULL) {
                ptlrpc_mark_interrupted(oap->oap_request);
                ptlrpcd_wake(oap->oap_request);
                GOTO(unlock, 0);
        }

        /* we don't get interruption callbacks until osc_trigger_group_io()
         * has been called and put the sync oaps in the pending/urgent lists.*/
        if (!list_empty(&oap->oap_pending_item)) {
                /* still queued: dequeue immediately and complete with -EINTR */
                list_del_init(&oap->oap_pending_item);
                list_del_init(&oap->oap_urgent_item);

                loi = oap->oap_loi;
                lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
                        &loi->loi_write_lop : &loi->loi_read_lop;
                lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
                loi_list_maint(oap->oap_cli, oap->oap_loi);

                oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
                oap->oap_oig = NULL;
        }

unlock:
        client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
}
1846
1847 /* this is trying to propogate async writeback errors back up to the
1848  * application.  As an async write fails we record the error code for later if
1849  * the app does an fsync.  As long as errors persist we force future rpcs to be
1850  * sync so that the app can get a sync error and break the cycle of queueing
1851  * pages for which writeback will fail. */
1852 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
1853                            int rc)
1854 {
1855         if (rc) {
1856                 if (!ar->ar_rc)
1857                         ar->ar_rc = rc;
1858
1859                 ar->ar_force_sync = 1;
1860                 ar->ar_min_xid = ptlrpc_sample_next_xid();
1861                 return;
1862
1863         }
1864
1865         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
1866                 ar->ar_force_sync = 0;
1867 }
1868
1869 static void osc_oap_to_pending(struct osc_async_page *oap)
1870 {
1871         struct loi_oap_pages *lop;
1872
1873         if (oap->oap_cmd & OBD_BRW_WRITE)
1874                 lop = &oap->oap_loi->loi_write_lop;
1875         else
1876                 lop = &oap->oap_loi->loi_read_lop;
1877
1878         if (oap->oap_async_flags & ASYNC_URGENT)
1879                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1880         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1881         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
1882 }
1883
/* this must be called holding the loi list lock to give coverage to exit_cache,
 * async_flag maintenance, and oap_request.  Completes one async page: drops
 * the request reference, updates async-error state for writes, folds any
 * returned size/time attributes into the loi's lvb, and hands completion to
 * either the group-io context (oap_oig) or the caller's ap_completion hook. */
static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
                              struct osc_async_page *oap, int sent, int rc)
{
        __u64 xid = 0;

        ENTRY;
        if (oap->oap_request != NULL) {
                /* remember the xid for osc_process_ar() before dropping it */
                xid = ptlrpc_req_xid(oap->oap_request);
                ptlrpc_req_finished(oap->oap_request);
                oap->oap_request = NULL;
        }

        oap->oap_async_flags = 0;
        oap->oap_interrupted = 0;

        if (oap->oap_cmd & OBD_BRW_WRITE) {
                /* track write errors at both client and per-object scope */
                osc_process_ar(&cli->cl_ar, xid, rc);
                osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
        }

        /* on success, mirror the attributes the server returned into the lvb */
        if (rc == 0 && oa != NULL) {
                if (oa->o_valid & OBD_MD_FLBLOCKS)
                        oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
                if (oa->o_valid & OBD_MD_FLMTIME)
                        oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
                if (oa->o_valid & OBD_MD_FLATIME)
                        oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
                if (oa->o_valid & OBD_MD_FLCTIME)
                        oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
        }

        if (oap->oap_oig) {
                /* group-io page: release grant and notify the group context */
                osc_exit_cache(cli, oap, sent);
                oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
                oap->oap_oig = NULL;
                EXIT;
                return;
        }

        rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
                                                oap->oap_cmd, oa, rc);

        /* ll_ap_completion (from llite) drops PG_locked. so, a new
         * I/O on the page could start, but OSC calls it under lock
         * and thus we can add oap back to pending safely */
        if (rc)
                /* upper layer wants to leave the page on pending queue */
                osc_oap_to_pending(oap);
        else
                osc_exit_cache(cli, oap, sent);
        EXIT;
}
1938
/* ptlrpc interpret callback for an async BRW built from oaps.  Finishes
 * the request, retries recoverable errors, then completes every oap that
 * was attached to the RPC and frees the resources osc_build_req() /
 * osc_send_oap_rpc() attached to @data (the osc_brw_async_args). */
static int brw_interpret_oap(struct ptlrpc_request *req, void *data, int rc)
{
        struct osc_async_page *oap, *tmp;
        struct osc_brw_async_args *aa = data;
        struct client_obd *cli;
        ENTRY;

        rc = osc_brw_fini_request(req, rc);
        CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
        if (osc_recoverable_error(rc)) {
                /* a resent request will call this interpret again; the oaps
                 * stay attached to aa until it finally succeeds or fails */
                rc = osc_brw_redo_request(req, aa);
                if (rc == 0)
                        RETURN(0);
        }

        cli = aa->aa_cli;

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
                cli->cl_w_in_flight--;
        else
                cli->cl_r_in_flight--;

        /* the caller may re-use the oap after the completion call so
         * we need to clean it up a little */
        list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
                list_del_init(&oap->oap_rpc_item);
                osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
        }

        /* freed pages may let cache waiters proceed, and a freed RPC slot
         * may let another RPC launch */
        osc_wake_cache_waiters(cli);
        osc_check_rpcs(cli);

        client_obd_list_unlock(&cli->cl_loi_list_lock);

        OBDO_FREE(aa->aa_oa);

        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
        RETURN(rc);
}
1983
/* Build a BRW ptlrpc request covering every oap on @rpc_list (exactly
 * @page_count entries).  On success the oaps are moved from @rpc_list onto
 * the request's async args (aa->aa_oaps) and @rpc_list is left empty; the
 * pga array and oa become the request's to free in brw_interpret_oap().
 * On failure returns an ERR_PTR and @rpc_list is left untouched. */
static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
                                            struct list_head *rpc_list,
                                            int page_count, int cmd)
{
        struct ptlrpc_request *req;
        struct brw_page **pga = NULL;
        struct osc_brw_async_args *aa;
        struct obdo *oa = NULL;
        struct obd_async_page_ops *ops = NULL;
        void *caller_data = NULL;
        struct obd_capa *ocapa;
        struct osc_async_page *oap;
        int i, rc;

        ENTRY;
        LASSERT(!list_empty(rpc_list));

        OBD_ALLOC(pga, sizeof(*pga) * page_count);
        if (pga == NULL)
                RETURN(ERR_PTR(-ENOMEM));

        OBDO_ALLOC(oa);
        if (oa == NULL)
                GOTO(out, req = ERR_PTR(-ENOMEM));

        /* collect the brw_page of every oap; the caller ops/data are taken
         * from the first oap (all pages of one RPC share one object) */
        i = 0;
        list_for_each_entry(oap, rpc_list, oap_rpc_item) {
                if (ops == NULL) {
                        ops = oap->oap_caller_ops;
                        caller_data = oap->oap_caller_data;
                }
                pga[i] = &oap->oap_brw_page;
                pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
                /* NOTE(review): CDEBUG with a 0 debug mask -- presumably a
                 * D_PAGE/D_CACHE mask was intended; confirm before relying
                 * on this message appearing */
                CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
                       pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
                i++;
        }

        /* always get the data for the obdo for the rpc */
        LASSERT(ops != NULL);
        ops->ap_fill_obdo(caller_data, cmd, oa);
        ocapa = ops->ap_lookup_capa(caller_data, cmd);

        sort_brw_pages(pga, page_count);
        rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
                                  pga, &req, ocapa);
        capa_put(ocapa);
        if (rc != 0) {
                CERROR("prep_req failed: %d\n", rc);
                GOTO(out, req = ERR_PTR(rc));
        }

        /* Need to update the timestamps after the request is built in case
         * we race with setattr (locally or in queue at OST).  If OST gets
         * later setattr before earlier BRW (as determined by the request xid),
         * the OST will not use BRW timestamps.  Sadly, there is no obvious
         * way to do this in a single call.  bug 10150 */
        ops->ap_update_obdo(caller_data, cmd, oa,
                            OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);

        /* hand the oaps over to the request's async args for the interpret */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_brw_async_args *)&req->rq_async_args;
        INIT_LIST_HEAD(&aa->aa_oaps);
        list_splice(rpc_list, &aa->aa_oaps);
        INIT_LIST_HEAD(rpc_list);

out:
        /* on error, unwind whatever was allocated before the failure */
        if (IS_ERR(req)) {
                if (oa)
                        OBDO_FREE(oa);
                if (pga)
                        OBD_FREE(pga, sizeof(*pga) * page_count);
        }
        RETURN(req);
}
2059
/* the loi lock is held across this function but it's allowed to release
 * and reacquire it during its work.  Pulls ready pages off @lop, builds
 * one BRW RPC for them and launches it via ptlrpcd.  Returns 1 when an
 * RPC was sent, 0 when nothing was ready, negative errno on failure. */
static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
                            int cmd, struct loi_oap_pages *lop)
{
        struct ptlrpc_request *req;
        obd_count page_count = 0;
        struct osc_async_page *oap = NULL, *tmp;
        struct osc_brw_async_args *aa;
        struct obd_async_page_ops *ops;
        CFS_LIST_HEAD(rpc_list);
        unsigned int ending_offset;
        unsigned  starting_offset = 0;
        ENTRY;

        /* first we find the pages we're allowed to work with */
        list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
                                 oap_pending_item) {
                ops = oap->oap_caller_ops;

                LASSERT(oap->oap_magic == OAP_MAGIC);

                /* in llite being 'ready' equates to the page being locked
                 * until completion unlocks it.  commit_write submits a page
                 * as not ready because its unlock will happen unconditionally
                 * as the call returns.  if we race with commit_write giving
                 * us that page we dont' want to create a hole in the page
                 * stream, so we stop and leave the rpc to be fired by
                 * another dirtier or kupdated interval (the not ready page
                 * will still be on the dirty list).  we could call in
                 * at the end of ll_file_write to process the queue again. */
                if (!(oap->oap_async_flags & ASYNC_READY)) {
                        int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
                        if (rc < 0)
                                CDEBUG(D_INODE, "oap %p page %p returned %d "
                                                "instead of ready\n", oap,
                                                oap->oap_page, rc);
                        switch (rc) {
                        case -EAGAIN:
                                /* llite is telling us that the page is still
                                 * in commit_write and that we should try
                                 * and put it in an rpc again later.  we
                                 * break out of the loop so we don't create
                                 * a hole in the sequence of pages in the rpc
                                 * stream.*/
                                oap = NULL;
                                break;
                        case -EINTR:
                                /* the io isn't needed.. tell the checks
                                 * below to complete the rpc with EINTR */
                                oap->oap_async_flags |= ASYNC_COUNT_STABLE;
                                oap->oap_count = -EINTR;
                                break;
                        case 0:
                                oap->oap_async_flags |= ASYNC_READY;
                                break;
                        default:
                                LASSERTF(0, "oap %p page %p returned %d "
                                            "from make_ready\n", oap,
                                            oap->oap_page, rc);
                                break;
                        }
                }
                /* oap was NULLed above on -EAGAIN: stop scanning entirely */
                if (oap == NULL)
                        break;
                /*
                 * Page submitted for IO has to be locked. Either by
                 * ->ap_make_ready() or by higher layers.
                 *
                 * XXX nikita: this assertion should be adjusted when lustre
                 * starts using PG_writeback for pages being written out.
                 */
#if defined(__KERNEL__) && defined(__LINUX__)
                LASSERT(PageLocked(oap->oap_page));
#endif
                /* If there is a gap at the start of this page, it can't merge
                 * with any previous page, so we'll hand the network a
                 * "fragmented" page array that it can't transfer in 1 RDMA */
                if (page_count != 0 && oap->oap_page_off != 0)
                        break;

                /* take the page out of our book-keeping */
                list_del_init(&oap->oap_pending_item);
                lop_update_pending(cli, lop, cmd, -1);
                list_del_init(&oap->oap_urgent_item);

                /* remember where the RPC starts within the max-BRW window
                 * for the offset histogram below */
                if (page_count == 0)
                        starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
                                          (PTLRPC_MAX_BRW_SIZE - 1);

                /* ask the caller for the size of the io as the rpc leaves. */
                if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
                        oap->oap_count =
                                ops->ap_refresh_count(oap->oap_caller_data,cmd);
                if (oap->oap_count <= 0) {
                        /* nothing to transfer (or -EINTR from above):
                         * complete the page immediately and keep scanning */
                        CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
                               oap->oap_count);
                        osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
                        continue;
                }

                /* now put the page back in our accounting */
                list_add_tail(&oap->oap_rpc_item, &rpc_list);
                if (++page_count >= cli->cl_max_pages_per_rpc)
                        break;

                /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
                 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
                 * have the same alignment as the initial writes that allocated
                 * extents on the server. */
                ending_offset = (oap->oap_obj_off + oap->oap_page_off +
                                 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
                if (ending_offset == 0)
                        break;

                /* If there is a gap at the end of this page, it can't merge
                 * with any subsequent pages, so we'll hand the network a
                 * "fragmented" page array that it can't transfer in 1 RDMA */
                if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
                        break;
        }

        /* dequeued pages freed grant; waiters may be able to proceed */
        osc_wake_cache_waiters(cli);

        if (page_count == 0)
                RETURN(0);

        loi_list_maint(cli, loi);

        /* drop the loi lock while building the request; the pages on
         * rpc_list are already out of the pending accounting */
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        req = osc_build_req(cli, &rpc_list, page_count, cmd);
        if (IS_ERR(req)) {
                /* this should happen rarely and is pretty bad, it makes the
                 * pending list not follow the dirty order */
                client_obd_list_lock(&cli->cl_loi_list_lock);
                list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
                        list_del_init(&oap->oap_rpc_item);

                        /* queued sync pages can be torn down while the pages
                         * were between the pending list and the rpc */
                        if (oap->oap_interrupted) {
                                CDEBUG(D_INODE, "oap %p interrupted\n", oap);
                                osc_ap_completion(cli, NULL, oap, 0,
                                                  oap->oap_count);
                                continue;
                        }
                        osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
                }
                loi_list_maint(cli, loi);
                RETURN(PTR_ERR(req));
        }

        aa = (struct osc_brw_async_args *)&req->rq_async_args;

        /* account this RPC in the read/write histograms */
        if (cmd == OBD_BRW_READ) {
                lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
                                      (starting_offset >> CFS_PAGE_SHIFT) + 1);
                ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
        } else {
                lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_write_rpc_hist,
                                 cli->cl_w_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
                                      (starting_offset >> CFS_PAGE_SHIFT) + 1);
                ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);

        if (cmd == OBD_BRW_READ)
                cli->cl_r_in_flight++;
        else
                cli->cl_w_in_flight++;

        /* queued sync pages can be torn down while the pages
         * were between the pending list and the rpc */
        tmp = NULL;
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                /* only one oap gets a request reference */
                if (tmp == NULL)
                        tmp = oap;
                if (oap->oap_interrupted && !req->rq_intr) {
                        CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
                               oap, req);
                        ptlrpc_mark_interrupted(req);
                }
        }
        if (tmp != NULL)
                tmp->oap_request = ptlrpc_request_addref(req);

        DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
                  page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);

        req->rq_interpret_reply = brw_interpret_oap;
        ptlrpcd_add_req(req);
        RETURN(1);
}
2260
/* Dump the RPC-readiness state of a lov_oinfo: whether it is on the
 * ready list, and the pending/urgent counts of its write and read lops.
 * (The previous definition ended with a stray '\' continuation after
 * "args)", silently extending the macro onto the following line.) */
#define LOI_DEBUG(LOI, STR, args...)                                     \
        CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
               !list_empty(&(LOI)->loi_cli_item),                        \
               (LOI)->loi_write_lop.lop_num_pending,                     \
               !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
               (LOI)->loi_read_lop.lop_num_pending,                      \
               !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
               args)

2270 /* This is called by osc_check_rpcs() to find which objects have pages that
2271  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2272 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2273 {
2274         ENTRY;
2275         /* first return all objects which we already know to have
2276          * pages ready to be stuffed into rpcs */
2277         if (!list_empty(&cli->cl_loi_ready_list))
2278                 RETURN(list_entry(cli->cl_loi_ready_list.next,
2279                                   struct lov_oinfo, loi_cli_item));
2280
2281         /* then if we have cache waiters, return all objects with queued
2282          * writes.  This is especially important when many small files
2283          * have filled up the cache and not been fired into rpcs because
2284          * they don't pass the nr_pending/object threshhold */
2285         if (!list_empty(&cli->cl_cache_waiters) &&
2286             !list_empty(&cli->cl_loi_write_list))
2287                 RETURN(list_entry(cli->cl_loi_write_list.next,
2288                                   struct lov_oinfo, loi_write_item));
2289
2290         /* then return all queued objects when we have an invalid import
2291          * so that they get flushed */
2292         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2293                 if (!list_empty(&cli->cl_loi_write_list))
2294                         RETURN(list_entry(cli->cl_loi_write_list.next,
2295                                           struct lov_oinfo, loi_write_item));
2296                 if (!list_empty(&cli->cl_loi_read_list))
2297                         RETURN(list_entry(cli->cl_loi_read_list.next,
2298                                           struct lov_oinfo, loi_read_item));
2299         }
2300         RETURN(NULL);
2301 }
2302
/* Build and send as many brw rpcs as the in-flight cap allows, drawing
 * candidate objects from osc_next_loi().  Called with the loi list lock
 * held. */
static void osc_check_rpcs(struct client_obd *cli)
{
        struct lov_oinfo *loi;
        /* rc: last osc_send_oap_rpc() result; race_counter counts
         * consecutive rc==0 "back off" results - see comment at the
         * bottom of the loop */
        int rc = 0, race_counter = 0;
        ENTRY;

        while ((loi = osc_next_loi(cli)) != NULL) {
                LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));

                /* stop once the client-wide rpc concurrency cap is hit */
                if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
                        break;

                /* attempt some read/write balancing by alternating between
                 * reads and writes in an object.  The makes_rpc checks here
                 * would be redundant if we were getting read/write work items
                 * instead of objects.  we don't want send_oap_rpc to drain a
                 * partial read pending queue when we're given this object to
                 * do io on writes while there are cache waiters */
                if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
                        rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
                                              &loi->loi_write_lop);
                        if (rc < 0)
                                break;
                        if (rc > 0)
                                race_counter = 0;
                        else
                                race_counter++;
                }
                if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
                        rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
                                              &loi->loi_read_lop);
                        if (rc < 0)
                                break;
                        if (rc > 0)
                                race_counter = 0;
                        else
                                race_counter++;
                }

                /* attempt some inter-object balancing by issuing rpcs
                 * for each object in turn: drop this object from all the
                 * candidate lists so the next iteration picks another one */
                if (!list_empty(&loi->loi_cli_item))
                        list_del_init(&loi->loi_cli_item);
                if (!list_empty(&loi->loi_write_item))
                        list_del_init(&loi->loi_write_item);
                if (!list_empty(&loi->loi_read_item))
                        list_del_init(&loi->loi_read_item);

                /* re-run list maintenance so the object is requeued if it
                 * still has work */
                loi_list_maint(cli, loi);

                /* send_oap_rpc fails with 0 when make_ready tells it to
                 * back off.  llite's make_ready does this when it tries
                 * to lock a page queued for write that is already locked.
                 * we want to try sending rpcs from many objects, but we
                 * don't want to spin failing with 0.  */
                if (race_counter == 10)
                        break;
        }
        EXIT;
}
2364
2365 /* we're trying to queue a page in the osc so we're subject to the
2366  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2367  * If the osc's queued pages are already at that limit, then we want to sleep
2368  * until there is space in the osc's queue for us.  We also may be waiting for
2369  * write credits from the OST if there are RPCs in flight that may return some
2370  * before we fall back to sync writes.
2371  *
2372  * We need this know our allocation was granted in the presence of signals */
2373 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2374 {
2375         int rc;
2376         ENTRY;
2377         client_obd_list_lock(&cli->cl_loi_list_lock);
2378         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2379         client_obd_list_unlock(&cli->cl_loi_list_lock);
2380         RETURN(rc);
2381 };
2382
/* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
 * grant or cache space.
 *
 * Reserve dirty-cache space and write grant for one page.  Returns 0 on
 * success, -EDQUOT to make the caller fall back to sync io, -EINTR if the
 * wait for space was interrupted, or the rc set by whoever granted us. */
static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
                           struct osc_async_page *oap)
{
        struct osc_cache_waiter ocw;
        struct l_wait_info lwi = { 0 };

        ENTRY;

        CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
               "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
               cli->cl_dirty_max, obd_max_dirty_pages,
               cli->cl_lost_grant, cli->cl_avail_grant);

        /* force the caller to try sync io.  this can jump the list
         * of queued writes and create a discontiguous rpc stream */
        if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
            loi->loi_ar.ar_force_sync)
                RETURN(-EDQUOT);

        /* Hopefully normal case - cache space and write credits available */
        if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
            (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
            (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
                /* account for ourselves */
                osc_consume_write_grant(cli, &oap->oap_brw_page);
                RETURN(0);
        }

        /* Make sure that there are write rpcs in flight to wait for.  This
         * is a little silly as this object may not have any pending but
         * other objects sure might. */
        if (cli->cl_w_in_flight) {
                list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
                cfs_waitq_init(&ocw.ocw_waitq);
                ocw.ocw_oap = oap;
                ocw.ocw_rc = 0;

                loi_list_maint(cli, loi);
                /* kick rpcs so their replies can eventually free up space,
                 * then drop the list lock for the duration of the sleep */
                osc_check_rpcs(cli);
                client_obd_list_unlock(&cli->cl_loi_list_lock);

                CDEBUG(D_CACHE, "sleeping for cache space\n");
                l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);

                client_obd_list_lock(&cli->cl_loi_list_lock);
                /* still on the waiter list means we woke without being
                 * granted; take ourselves off and report interruption */
                if (!list_empty(&ocw.ocw_entry)) {
                        list_del(&ocw.ocw_entry);
                        RETURN(-EINTR);
                }
                /* whoever dequeued us recorded the outcome in ocw_rc */
                RETURN(ocw.ocw_rc);
        }

        RETURN(-EDQUOT);
}
2439
2440 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2441                         struct lov_oinfo *loi, cfs_page_t *page,
2442                         obd_off offset, struct obd_async_page_ops *ops,
2443                         void *data, void **res)
2444 {
2445         struct osc_async_page *oap;
2446         ENTRY;
2447
2448         if (!page)
2449                 return size_round(sizeof(*oap));
2450
2451         oap = *res;
2452         oap->oap_magic = OAP_MAGIC;
2453         oap->oap_cli = &exp->exp_obd->u.cli;
2454         oap->oap_loi = loi;
2455
2456         oap->oap_caller_ops = ops;
2457         oap->oap_caller_data = data;
2458
2459         oap->oap_page = page;
2460         oap->oap_obj_off = offset;
2461
2462         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2463         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2464         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2465
2466         oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2467
2468         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2469         RETURN(0);
2470 }
2471
2472 struct osc_async_page *oap_from_cookie(void *cookie)
2473 {
2474         struct osc_async_page *oap = cookie;
2475         if (oap->oap_magic != OAP_MAGIC)
2476                 return ERR_PTR(-EINVAL);
2477         return oap;
2478 };
2479
/* Queue a single page for regular (non-group) async io.  The cookie is the
 * oap set up by osc_prep_async_page().  For writes this may wait in
 * osc_enter_cache() for dirty/grant space, and may return -EDQUOT to make
 * the caller fall back to sync io. */
static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
                              struct lov_oinfo *loi, void *cookie,
                              int cmd, obd_off off, int count,
                              obd_flag brw_flags, enum async_flags async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        /* refuse new io against a missing or invalidated import */
        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        /* the page must not already be queued anywhere or in an rpc */
        if (!list_empty(&oap->oap_pending_item) ||
            !list_empty(&oap->oap_urgent_item) ||
            !list_empty(&oap->oap_rpc_item))
                RETURN(-EBUSY);

        /* check if the file's owner/group is over quota */
#ifdef HAVE_QUOTA_SUPPORT
        if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
                struct obd_async_page_ops *ops;
                struct obdo *oa;

                OBDO_ALLOC(oa);
                if (oa == NULL)
                        RETURN(-ENOMEM);

                /* have the caller fill in the obdo so we can read the
                 * owner uid/gid for the quota check */
                ops = oap->oap_caller_ops;
                ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
                if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
                    NO_QUOTA)
                        rc = -EDQUOT;

                OBDO_FREE(oa);
                if (rc)
                        RETURN(rc);
        }
#endif

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        client_obd_list_lock(&cli->cl_loi_list_lock);

        oap->oap_cmd = cmd;
        oap->oap_page_off = off;
        oap->oap_count = count;
        oap->oap_brw_flags = brw_flags;
        oap->oap_async_flags = async_flags;

        if (cmd & OBD_BRW_WRITE) {
                /* may drop and retake the list lock while waiting for
                 * cache space - see osc_enter_cache() */
                rc = osc_enter_cache(cli, loi, oap);
                if (rc) {
                        client_obd_list_unlock(&cli->cl_loi_list_lock);
                        RETURN(rc);
                }
        }

        osc_oap_to_pending(oap);
        loi_list_maint(cli, loi);

        LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
                  cmd);

        /* see if this new page made an rpc ready to go */
        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        RETURN(0);
}
2554
/* True iff `flag` is being newly set: absent in `was`, present in `now`.
 * aka (~was & now & flag), but this is more clear :)
 * Fix: every macro argument is now parenthesized so compound-expression
 * arguments (e.g. `a | b`) bind correctly against `&`. */
#define SETTING(was, now, flag) (!((was) & (flag)) && ((now) & (flag)))
2557
/* Set additional async flags on an already-pending page.  A page becoming
 * ASYNC_URGENT is put on its lop urgent list unless it is already part of
 * an rpc.  Returns 0, -EINVAL if the page is not pending, or -EIO for a
 * dead import. */
static int osc_set_async_flags(struct obd_export *exp,
                               struct lov_stripe_md *lsm,
                               struct lov_oinfo *loi, void *cookie,
                               obd_flag async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct loi_oap_pages *lop;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        /*
         * bug 7311: OST-side locking is only supported for liblustre for now
         * (and liblustre never calls obd_set_async_flags(). I hope.), generic
         * implementation has to handle case where OST-locked page was picked
         * up by, e.g., ->writepage().
         */
        LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
        LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
                                     * tread here. */

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        /* pick the list set matching the page's io direction */
        if (oap->oap_cmd & OBD_BRW_WRITE) {
                lop = &loi->loi_write_lop;
        } else {
                lop = &loi->loi_read_lop;
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* the page must already be on a pending list */
        if (list_empty(&oap->oap_pending_item))
                GOTO(out, rc = -EINVAL);

        /* nothing new being set - done */
        if ((oap->oap_async_flags & async_flags) == async_flags)
                GOTO(out, rc = 0);

        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
                oap->oap_async_flags |= ASYNC_READY;

        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
                /* pages already in an rpc don't get requeued as urgent.
                 * NOTE(review): ASYNC_URGENT is never OR'd into
                 * oap_async_flags here, only the list membership changes -
                 * confirm that is intentional */
                if (list_empty(&oap->oap_rpc_item)) {
                        list_add(&oap->oap_urgent_item, &lop->lop_urgent);
                        loi_list_maint(cli, loi);
                }
            }

        LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
                        oap->oap_async_flags);
out:
        /* NOTE(review): osc_check_rpcs() also runs on the error paths -
         * confirm that is intentional */
        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        RETURN(rc);
}
2620
2621 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2622                              struct lov_oinfo *loi,
2623                              struct obd_io_group *oig, void *cookie,
2624                              int cmd, obd_off off, int count,
2625                              obd_flag brw_flags,
2626                              obd_flag async_flags)
2627 {
2628         struct client_obd *cli = &exp->exp_obd->u.cli;
2629         struct osc_async_page *oap;
2630         struct loi_oap_pages *lop;
2631         int rc = 0;
2632         ENTRY;
2633
2634         oap = oap_from_cookie(cookie);
2635         if (IS_ERR(oap))
2636                 RETURN(PTR_ERR(oap));
2637
2638         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2639                 RETURN(-EIO);
2640
2641         if (!list_empty(&oap->oap_pending_item) ||
2642             !list_empty(&oap->oap_urgent_item) ||
2643             !list_empty(&oap->oap_rpc_item))
2644                 RETURN(-EBUSY);
2645
2646         if (loi == NULL)
2647                 loi = lsm->lsm_oinfo[0];
2648
2649         client_obd_list_lock(&cli->cl_loi_list_lock);
2650
2651         oap->oap_cmd = cmd;
2652         oap->oap_page_off = off;
2653         oap->oap_count = count;
2654         oap->oap_brw_flags = brw_flags;
2655         oap->oap_async_flags = async_flags;
2656
2657         if (cmd & OBD_BRW_WRITE)
2658                 lop = &loi->loi_write_lop;
2659         else
2660                 lop = &loi->loi_read_lop;
2661
2662         list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
2663         if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
2664                 oap->oap_oig = oig;
2665                 rc = oig_add_one(oig, &oap->oap_occ);
2666         }
2667
2668         LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
2669                   oap, oap->oap_page, rc);
2670
2671         client_obd_list_unlock(&cli->cl_loi_list_lock);
2672
2673         RETURN(rc);
2674 }
2675
2676 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2677                                  struct loi_oap_pages *lop, int cmd)
2678 {
2679         struct list_head *pos, *tmp;
2680         struct osc_async_page *oap;
2681
2682         list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2683                 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2684                 list_del(&oap->oap_pending_item);
2685                 osc_oap_to_pending(oap);
2686         }
2687         loi_list_maint(cli, loi);
2688 }
2689
/* Fire off all group-queued io for an object: migrate every group-pending
 * page (both directions) onto the regular pending lists, then let the rpc
 * engine see if anything is ready to send.  Always returns 0. */
static int osc_trigger_group_io(struct obd_export *exp,
                                struct lov_stripe_md *lsm,
                                struct lov_oinfo *loi,
                                struct obd_io_group *oig)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        ENTRY;

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        client_obd_list_lock(&cli->cl_loi_list_lock);

        osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
        osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);

        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        RETURN(0);
}
2711
/* Remove a page from all osc queues and release its cache accounting.
 * Fails with -EBUSY if the page is currently part of an rpc.  The cookie
 * is the oap from osc_prep_async_page(). */
static int osc_teardown_async_page(struct obd_export *exp,
                                   struct lov_stripe_md *lsm,
                                   struct lov_oinfo *loi, void *cookie)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct loi_oap_pages *lop;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        /* pick the list set matching the page's io direction */
        if (oap->oap_cmd & OBD_BRW_WRITE) {
                lop = &loi->loi_write_lop;
        } else {
                lop = &loi->loi_read_lop;
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* can't tear down a page an rpc is currently using */
        if (!list_empty(&oap->oap_rpc_item))
                GOTO(out, rc = -EBUSY);

        /* release the accounting taken in osc_enter_cache() and give any
         * cache waiters a chance at the freed space */
        osc_exit_cache(cli, oap, 0);
        osc_wake_cache_waiters(cli);

        /* unlink from the urgent and pending lists, keeping the pending
         * counters in sync */
        if (!list_empty(&oap->oap_urgent_item)) {
                list_del_init(&oap->oap_urgent_item);
                oap->oap_async_flags &= ~ASYNC_URGENT;
        }
        if (!list_empty(&oap->oap_pending_item)) {
                list_del_init(&oap->oap_pending_item);
                lop_update_pending(cli, lop, oap->oap_cmd, -1);
        }
        loi_list_maint(cli, loi);

        LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
out:
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        RETURN(rc);
}
2758
/* Stash caller data (an inode, in the Linux kernel branch) in a matched
 * lock's l_ast_data, sanity-checking that any existing l_ast_data either
 * matches or points at an inode already being freed.  Only LDLM_FL_NO_LRU
 * is propagated from the caller's flags into the lock. */
static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
                                    int flags)
{
        struct ldlm_lock *lock = ldlm_handle2lock(lockh);

        if (lock == NULL) {
                CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
                return;
        }
        lock_res_and_lock(lock);
#ifdef __KERNEL__
#ifdef __LINUX__
        /* Liang XXX: Darwin and Winnt checking should be added */
        /* NOTE(review): gcc predefines __linux__, not __LINUX__ - confirm
         * the build defines __LINUX__ or this check is compiled out */
        if (lock->l_ast_data && lock->l_ast_data != data) {
                struct inode *new_inode = data;
                struct inode *old_inode = lock->l_ast_data;
                /* a stale pointer to a dying inode is tolerated; anything
                 * else is a real inconsistency */
                if (!(old_inode->i_state & I_FREEING))
                        LDLM_ERROR(lock, "inconsistent l_ast_data found");
                LASSERTF(old_inode->i_state & I_FREEING,
                         "Found existing inode %p/%lu/%u state %lu in lock: "
                         "setting data to %p/%lu/%u\n", old_inode,
                         old_inode->i_ino, old_inode->i_generation,
                         old_inode->i_state,
                         new_inode, new_inode->i_ino, new_inode->i_generation);
        }
#endif
#endif
        lock->l_ast_data = data;
        lock->l_flags |= (flags & LDLM_FL_NO_LRU);
        unlock_res_and_lock(lock);
        LDLM_LOCK_PUT(lock);
}
2791
2792 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2793                              ldlm_iterator_t replace, void *data)
2794 {
2795         struct ldlm_res_id res_id = { .name = {0} };
2796         struct obd_device *obd = class_exp2obd(exp);
2797
2798         res_id.name[0] = lsm->lsm_object_id;
2799         res_id.name[2] = lsm->lsm_object_gr;
2800
2801         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2802         return 0;
2803 }
2804
/* Common completion path for osc_enqueue(): for intent requests, pull the
 * intent handler's disposition out of the reply; log the returned lvb
 * contents on success; then hand the final status to the caller's update
 * callback and return whatever it says. */
static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo,
                            int intent, int rc)
{
        ENTRY;

        if (intent) {
                /* The request was created before ldlm_cli_enqueue call. */
                if (rc == ELDLM_LOCK_ABORTED) {
                        struct ldlm_reply *rep;

                        /* swabbed by ldlm_cli_enqueue() */
                        LASSERT_REPSWABBED(req, DLM_LOCKREPLY_OFF);
                        rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
                                             sizeof(*rep));
                        LASSERT(rep != NULL);
                        /* a non-zero policy result overrides the abort rc */
                        if (rep->lock_policy_res1)
                                rc = rep->lock_policy_res1;
                }
        }

        /* abort-with-intent and plain success both carry a usable lvb */
        if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
                CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
                       oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
                       oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
                       oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
        }

        /* Call the update callback. */
        rc = oinfo->oi_cb_up(oinfo, rc);
        RETURN(rc);
}
2836
/* ptlrpcd interpret callback for an async osc_enqueue(): finish the dlm
 * enqueue, run the osc completion path, and drop the lock reference that
 * async callers don't keep. */
static int osc_enqueue_interpret(struct ptlrpc_request *req,
                                 struct osc_enqueue_args *aa, int rc)
{
        int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
        struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
        struct ldlm_lock *lock;

        /* ldlm_cli_enqueue is holding a reference on the lock, so it must
         * be valid. */
        lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);

        /* Complete obtaining the lock procedure. */
        rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
                                   aa->oa_ei->ei_mode,
                                   &aa->oa_oi->oi_flags,
                                   &lsm->lsm_oinfo[0]->loi_lvb,
                                   sizeof(lsm->lsm_oinfo[0]->loi_lvb),
                                   lustre_swab_ost_lvb,
                                   aa->oa_oi->oi_lockh, rc);

        /* Complete osc stuff. */
        rc = osc_enqueue_fini(req, aa->oa_oi, intent, rc);

        /* Release the lock for async request. */
        if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
                ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);

        /* our own extra reference from ldlm_handle2lock() above is dropped
         * last, after the LASSERTF has had a look at the lock */
        LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
                 aa->oa_oi->oi_lockh, req, aa);
        LDLM_LOCK_PUT(lock);
        return rc;
}
2869
/* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
 * other synchronous requests, however keeping some locks and trying to obtain
 * others may take a considerable amount of time in a case of ost failure; and
 * when other sync requests do not get released lock from a client, the client
 * is excluded from the cluster -- such scenarios make life difficult, so
 * release locks just after they are obtained. */
static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
                       struct ldlm_enqueue_info *einfo,
                       struct ptlrpc_request_set *rqset)
{
        struct ldlm_res_id res_id = { .name = {0} };
        struct obd_device *obd = exp->exp_obd;
        struct ldlm_reply *rep;
        struct ptlrpc_request *req = NULL;
        int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
        int rc;
        ENTRY;

        res_id.name[0] = oinfo->oi_md->lsm_object_id;
        res_id.name[2] = oinfo->oi_md->lsm_object_gr;

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother.  */
        oinfo->oi_policy.l_extent.start -=
                oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
        oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;

        /* no valid kms means no cached lvb data worth matching against */
        if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
                goto no_match;

        /* Next, search for already existing extent locks that will cover us */
        rc = ldlm_lock_match(obd->obd_namespace,
                             oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
                             einfo->ei_type, &oinfo->oi_policy, einfo->ei_mode,
                             oinfo->oi_lockh);
        if (rc == 1) {
                osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
                                        oinfo->oi_flags);
                if (intent) {
                        /* I would like to be able to ASSERT here that rss <=
                         * kms, but I can't, for reasons which are explained in
                         * lov_enqueue() */
                }

                /* We already have a lock, and it's referenced */
                oinfo->oi_cb_up(oinfo, ELDLM_OK);

                /* For async requests, decref the lock. */
                if (rqset)
                        ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);

                RETURN(ELDLM_OK);
        }

        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock.
         *
         * There are problems with conversion deadlocks, so instead of
         * converting a read lock to a write lock, we'll just enqueue a new
         * one.
         *
         * At some point we should cancel the read lock instead of making them
         * send us a blocking callback, but there are problems with canceling
         * locks out from other users right now, too. */

        if (einfo->ei_mode == LCK_PR) {
                rc = ldlm_lock_match(obd->obd_namespace,
                                     oinfo->oi_flags | LDLM_FL_LVB_READY,
                                     &res_id, einfo->ei_type, &oinfo->oi_policy,
                                     LCK_PW, oinfo->oi_lockh);
                if (rc == 1) {
                        /* FIXME: This is not incredibly elegant, but it might
                         * be more elegant than adding another parameter to
                         * lock_match.  I want a second opinion. */
                        /* addref the lock only if not async requests. */
                        if (!rqset)
                                ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
                        osc_set_data_with_check(oinfo->oi_lockh,
                                                einfo->ei_cbdata,
                                                oinfo->oi_flags);
                        oinfo->oi_cb_up(oinfo, ELDLM_OK);
                        /* drop the match's PW reference; the PR addref above
                         * (sync case) is what the caller keeps */
                        ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
                        RETURN(ELDLM_OK);
                }
        }

 no_match:
        /* intent enqueues carry extra request/reply buffers for the
         * intent disposition and the lvb */
        if (intent) {
                int size[3] = {
                        [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                        [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request),
                        [DLM_LOCKREQ_OFF + 1] = 0 };

                req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
                if (req == NULL)
                        RETURN(-ENOMEM);

                size[DLM_LOCKREPLY_OFF] = sizeof(*rep);
                size[DLM_REPLY_REC_OFF] =
                        sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb);
                ptlrpc_req_set_repsize(req, 3, size);
        }

        /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
        oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;

        rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id,
                              &oinfo->oi_policy, &oinfo->oi_flags,
                              &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
                              sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
                              lustre_swab_ost_lvb, oinfo->oi_lockh,
                              rqset ? 1 : 0);
        if (rqset) {
                if (!rc) {
                        /* async: hand completion off to the request set via
                         * osc_enqueue_interpret() */
                        struct osc_enqueue_args *aa;
                        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
                        aa = (struct osc_enqueue_args *)&req->rq_async_args;
                        aa->oa_oi = oinfo;
                        aa->oa_ei = einfo;
                        aa->oa_exp = exp;

                        req->rq_interpret_reply = osc_enqueue_interpret;
                        ptlrpc_set_add_req(rqset, req);
                } else if (intent) {
                        ptlrpc_req_finished(req);
                }
                RETURN(rc);
        }

        /* sync: finish here and release the intent request, if any */
        rc = osc_enqueue_fini(req, oinfo, intent, rc);
        if (intent)
                ptlrpc_req_finished(req);

        RETURN(rc);
}
3007
/* Look up an already-granted DLM extent lock covering @policy for the
 * object described by @lsm, without issuing an RPC.  On a match the lock
 * handle is stored in *lockh with a reference held at @mode.
 * Returns non-zero on a match, 0 if no suitable lock exists. */
static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
                     __u32 type, ldlm_policy_data_t *policy, __u32 mode,
                     int *flags, void *data, struct lustre_handle *lockh)
{
        struct ldlm_res_id res_id = { .name = {0} };
        struct obd_device *obd = exp->exp_obd;
        int rc;
        int lflags = *flags;
        ENTRY;

        /* Resource name is object id (slot 0) and group (slot 2). */
        res_id.name[0] = lsm->lsm_object_id;
        res_id.name[2] = lsm->lsm_object_gr;

        OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother */
        policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
        policy->l_extent.end |= ~CFS_PAGE_MASK;

        /* Next, search for already existing extent locks that will cover us */
        rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
                             &res_id, type, policy, mode, lockh);
        if (rc) {
                /* NOTE(review): set_data also runs for LDLM_FL_TEST_LOCK
                 * matches because the guard below is commented out --
                 * confirm this is intentional. */
                //if (!(*flags & LDLM_FL_TEST_LOCK))
                        osc_set_data_with_check(lockh, data, lflags);
                RETURN(rc);
        }
        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock. */
        if (mode == LCK_PR) {
                rc = ldlm_lock_match(obd->obd_namespace,
                                     lflags | LDLM_FL_LVB_READY, &res_id,
                                     type, policy, LCK_PW, lockh);
                if (rc == 1 && !(lflags & LDLM_FL_TEST_LOCK)) {
                        /* FIXME: This is not incredibly elegant, but it might
                         * be more elegant than adding another parameter to
                         * lock_match.  I want a second opinion. */
                        /* Swap the PW reference taken by the match for a PR
                         * one so the caller's eventual decref at LCK_PR
                         * balances correctly. */
                        osc_set_data_with_check(lockh, data, lflags);
                        ldlm_lock_addref(lockh, LCK_PR);
                        ldlm_lock_decref(lockh, LCK_PW);
                }
        }
        RETURN(rc);
}
3054
3055 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3056                       __u32 mode, struct lustre_handle *lockh)
3057 {
3058         ENTRY;
3059
3060         if (unlikely(mode == LCK_GROUP))
3061                 ldlm_lock_decref_and_cancel(lockh, mode);
3062         else
3063                 ldlm_lock_decref(lockh, mode);
3064
3065         RETURN(0);
3066 }
3067
3068 static int osc_cancel_unused(struct obd_export *exp,
3069                              struct lov_stripe_md *lsm, int flags,
3070                              void *opaque)
3071 {
3072         struct obd_device *obd = class_exp2obd(exp);
3073         struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3074
3075         if (lsm != NULL) {
3076                 res_id.name[0] = lsm->lsm_object_id;
3077                 res_id.name[2] = lsm->lsm_object_gr;
3078                 resp = &res_id;
3079         }
3080
3081         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3082 }
3083
3084 static int osc_join_lru(struct obd_export *exp,
3085                         struct lov_stripe_md *lsm, int join)
3086 {
3087         struct obd_device *obd = class_exp2obd(exp);
3088         struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3089
3090         if (lsm != NULL) {
3091                 res_id.name[0] = lsm->lsm_object_id;
3092                 res_id.name[2] = lsm->lsm_object_gr;
3093                 resp = &res_id;
3094         }
3095
3096         return ldlm_cli_join_lru(obd->obd_namespace, resp, join);
3097 }
3098
/* Reply callback for osc_statfs_async(): unpack the obd_statfs from the
 * reply buffer and deliver it (or the error) through oi_cb_up. */
static int osc_statfs_interpret(struct ptlrpc_request *req,
                                struct osc_async_args *aa, int rc)
{
        struct obd_statfs *msfs;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        /* Swabs the reply in place if the peer has different endianness. */
        msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
                                  lustre_swab_obd_statfs);
        if (msfs == NULL) {
                CERROR("Can't unpack obd_statfs\n");
                GOTO(out, rc = -EPROTO);
        }

        memcpy(aa->aa_oi->oi_osfs, msfs, sizeof(*msfs));
out:
        /* The up-call always runs so the waiter learns the outcome. */
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}
3120
/* Queue an OST_STATFS RPC on @rqset and return immediately; the reply is
 * delivered to oinfo->oi_cb_up by osc_statfs_interpret().  @max_age is
 * currently not sent on the wire (see comment below). */
static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
                            __u64 max_age, struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int size[2] = { sizeof(struct ptlrpc_body), sizeof(*oinfo->oi_osfs) };
        ENTRY;

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
                              OST_STATFS, 1, NULL, NULL);
        if (!req)
                RETURN(-ENOMEM);

        ptlrpc_req_set_repsize(req, 2, size);
        req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249

        /* Stash the obd_info in the request's async args so the interpret
         * callback can find it when the reply arrives. */
        req->rq_interpret_reply = osc_statfs_interpret;
        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_async_args *)&req->rq_async_args;
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(rqset, req);
        RETURN(0);
}
3151
/* Synchronous statfs: send OST_STATFS and block for the reply, copying
 * the unpacked obd_statfs into @osfs.  @max_age is currently not sent on
 * the wire (see comment below). */
static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
                      __u64 max_age)
{
        struct obd_statfs *msfs;
        struct ptlrpc_request *req;
        int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
        ENTRY;

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
                              OST_STATFS, 1, NULL, NULL);
        if (!req)
                RETURN(-ENOMEM);

        ptlrpc_req_set_repsize(req, 2, size);
        req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        /* Swabs the reply in place if the peer has different endianness. */
        msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
                                  lustre_swab_obd_statfs);
        if (msfs == NULL) {
                CERROR("Can't unpack obd_statfs\n");
                GOTO(out, rc = -EPROTO);
        }

        memcpy(osfs, msfs, sizeof(*osfs));

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
3192
3193 /* Retrieve object striping information.
3194  *
3195  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
3196  * the maximum number of OST indices which will fit in the user buffer.
3197  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
3198  */
3199 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3200 {
3201         struct lov_user_md lum, *lumk;
3202         int rc = 0, lum_size;
3203         ENTRY;
3204
3205         if (!lsm)
3206                 RETURN(-ENODATA);
3207
3208         if (copy_from_user(&lum, lump, sizeof(lum)))
3209                 RETURN(-EFAULT);
3210
3211         if (lum.lmm_magic != LOV_USER_MAGIC)
3212                 RETURN(-EINVAL);
3213
3214         if (lum.lmm_stripe_count > 0) {
3215                 lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
3216                 OBD_ALLOC(lumk, lum_size);
3217                 if (!lumk)
3218                         RETURN(-ENOMEM);
3219
3220                 lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
3221                 lumk->lmm_objects[0].l_object_gr = lsm->lsm_object_gr;
3222         } else {
3223                 lum_size = sizeof(lum);
3224                 lumk = &lum;
3225         }
3226
3227         lumk->lmm_object_id = lsm->lsm_object_id;
3228         lumk->lmm_object_gr = lsm->lsm_object_gr;
3229         lumk->lmm_stripe_count = 1;
3230
3231         if (copy_to_user(lump, lumk, lum_size))
3232                 rc = -EFAULT;
3233
3234         if (lumk != &lum)
3235                 OBD_FREE(lumk, lum_size);
3236
3237         RETURN(rc);
3238 }
3239
3240
/* obd iocontrol method: service the small set of ioctls the OSC handles
 * directly (LOV config query, stripe get/set, import recovery and quota
 * controls); anything else returns -ENOTTY.  A module reference is held
 * across the call so the module cannot unload mid-ioctl. */
static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         void *karg, void *uarg)
{
        struct obd_device *obd = exp->exp_obd;
        struct obd_ioctl_data *data = karg;
        int err = 0;
        ENTRY;

#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
        MOD_INC_USE_COUNT;
#else
        if (!try_module_get(THIS_MODULE)) {
                CERROR("Can't get module. Is it alive?");
                return -EINVAL;
        }
#endif
        switch (cmd) {
        case OBD_IOC_LOV_GET_CONFIG: {
                char *buf;
                struct lov_desc *desc;
                struct obd_uuid uuid;

                buf = NULL;
                len = 0;
                if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
                        GOTO(out, err = -EINVAL);

                data = (struct obd_ioctl_data *)buf;

                /* Caller's inline buffers must fit a lov_desc and a uuid. */
                if (sizeof(*desc) > data->ioc_inllen1) {
                        obd_ioctl_freedata(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                if (data->ioc_inllen2 < sizeof(uuid)) {
                        obd_ioctl_freedata(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                /* An OSC presents itself as a one-target "LOV". */
                desc = (struct lov_desc *)data->ioc_inlbuf1;
                desc->ld_tgt_count = 1;
                desc->ld_active_tgt_count = 1;
                desc->ld_default_stripe_count = 1;
                desc->ld_default_stripe_size = 0;
                desc->ld_default_stripe_offset = 0;
                desc->ld_pattern = 0;
                memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));

                memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));

                err = copy_to_user((void *)uarg, buf, len);
                if (err)
                        err = -EFAULT;
                obd_ioctl_freedata(buf, len);
                GOTO(out, err);
        }
        case LL_IOC_LOV_SETSTRIPE:
                err = obd_alloc_memmd(exp, karg);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case LL_IOC_LOV_GETSTRIPE:
                err = osc_getstripe(karg, uarg);
                GOTO(out, err);
        case OBD_IOC_CLIENT_RECOVER:
                err = ptlrpc_recover_import(obd->u.cli.cl_import,
                                            data->ioc_inlbuf1);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case IOC_OSC_SET_ACTIVE:
                err = ptlrpc_set_import_active(obd->u.cli.cl_import,
                                               data->ioc_offset);
                GOTO(out, err);
        case OBD_IOC_POLL_QUOTACHECK:
                err = lquota_poll_check(quota_interface, exp,
                                        (struct if_quotacheck *)karg);
                GOTO(out, err);
        default:
                CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
                       cmd, cfs_curproc_comm());
                GOTO(out, err = -ENOTTY);
        }
out:
#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
        MOD_DEC_USE_COUNT;
#else
        module_put(THIS_MODULE);
#endif
        return err;
}
3332
/* obd get_info method.  "lock_to_stripe" is answered locally (an OSC is
 * always its own stripe 0); "last_id" is fetched from the OST with an
 * OST_GET_INFO RPC.  Any other key returns -EINVAL. */
static int osc_get_info(struct obd_export *exp, obd_count keylen,
                        void *key, __u32 *vallen, void *val)
{
        ENTRY;
        if (!vallen || !val)
                RETURN(-EFAULT);

        /* NOTE(review): the first key uses '>' strlen while the second
         * uses '>=' -- presumably keylen includes the trailing NUL for
         * "lock_to_stripe"; confirm against callers. */
        if (keylen > strlen("lock_to_stripe") &&
            strcmp(key, "lock_to_stripe") == 0) {
                __u32 *stripe = val;
                *vallen = sizeof(*stripe);
                *stripe = 0;
                RETURN(0);
        } else if (keylen >= strlen("last_id") && strcmp(key, "last_id") == 0) {
                struct ptlrpc_request *req;
                obd_id *reply;
                char *bufs[2] = { NULL, key };
                int rc, size[2] = { sizeof(struct ptlrpc_body), keylen };

                req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                                      OST_GET_INFO, 2, size, bufs);
                if (req == NULL)
                        RETURN(-ENOMEM);

                /* Reply buffer is sized by what the caller can accept. */
                size[REPLY_REC_OFF] = *vallen;
                ptlrpc_req_set_repsize(req, 2, size);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(out, rc);

                reply = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*reply),
                                           lustre_swab_ost_last_id);
                if (reply == NULL) {
                        CERROR("Can't unpack OST last ID\n");
                        GOTO(out, rc = -EPROTO);
                }
                *((obd_id *)val) = *reply;
        out:
                ptlrpc_req_finished(req);
                RETURN(rc);
        }
        RETURN(-EINVAL);
}
3376
3377 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3378                                           void *aa, int rc)
3379 {
3380         struct llog_ctxt *ctxt;
3381         struct obd_import *imp = req->rq_import;
3382         ENTRY;
3383
3384         if (rc != 0)
3385                 RETURN(rc);
3386
3387         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3388         if (ctxt) {
3389                 if (rc == 0)
3390                         rc = llog_initiator_connect(ctxt);
3391                 else
3392                         CERROR("cannot establish connection for "
3393                                "ctxt %p: %d\n", ctxt, rc);
3394         }
3395
3396         spin_lock(&imp->imp_lock);
3397         imp->imp_server_timeout = 1;
3398         imp->imp_pingable = 1;
3399         spin_unlock(&imp->imp_lock);
3400         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3401
3402         RETURN(rc);
3403 }
3404
/* obd set_info_async method.  Several keys are handled entirely locally
 * (next precreate id, "unlinked" space flag, initial-recovery and
 * checksum toggles, security-context flush); anything else is forwarded
 * verbatim to the OST as an OST_SET_INFO RPC queued on @set. */
static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
                              void *key, obd_count vallen, void *val,
                              struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct obd_device  *obd = exp->exp_obd;
        struct obd_import *imp = class_exp2cliimp(exp);
        int size[3] = { sizeof(struct ptlrpc_body), keylen, vallen };
        char *bufs[3] = { NULL, key, val };
        ENTRY;

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

        /* MDS tells us the id to precreate the next object from. */
        if (KEY_IS(KEY_NEXT_ID)) {
                if (vallen != sizeof(obd_id))
                        RETURN(-EINVAL);
                obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
                CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
                       exp->exp_obd->obd_name,
                       obd->u.cli.cl_oscc.oscc_next_id);

                RETURN(0);
        }

        /* Objects were unlinked, so the OST may have space again. */
        if (KEY_IS("unlinked")) {
                struct osc_creator *oscc = &obd->u.cli.cl_oscc;
                spin_lock(&oscc->oscc_lock);
                oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
                spin_unlock(&oscc->oscc_lock);
                RETURN(0);
        }

        if (KEY_IS(KEY_INIT_RECOV)) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                spin_lock(&imp->imp_lock);
                imp->imp_initial_recov = *(int *)val;
                spin_unlock(&imp->imp_lock);
                CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
                       exp->exp_obd->obd_name,
                       imp->imp_initial_recov);
                RETURN(0);
        }

        if (KEY_IS("checksum")) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
                RETURN(0);
        }

        if (KEY_IS(KEY_FLUSH_CTX)) {
                sptlrpc_import_flush_my_ctx(imp);
                RETURN(0);
        }

        /* Remaining keys require an RPC, so a request set is mandatory. */
        if (!set)
                RETURN(-EINVAL);

        /* We pass all other commands directly to OST. Since nobody calls osc
           methods directly and everybody is supposed to go through LOV, we
           assume lov checked invalid values for us.
           The only recognised values so far are evict_by_nid and mds_conn.
           Even if something bad goes through, we'd get a -EINVAL from OST
           anyway. */

        req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_SET_INFO, 3, size,
                              bufs);
        if (req == NULL)
                RETURN(-ENOMEM);

        /* mds_conn carries the object group; finish setup in the reply
         * interpreter once the OST has acknowledged. */
        if (KEY_IS(KEY_MDS_CONN)) {
                struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                oscc->oscc_oa.o_gr = (*(__u32 *)val);
                oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
                LASSERT(oscc->oscc_oa.o_gr > 0);
                req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
        }

        ptlrpc_req_set_repsize(req, 1, NULL);
        ptlrpc_set_add_req(set, req);
        ptlrpc_check_set(set);

        RETURN(0);
}
3491
3492
/* Logops for the size-change replicator context: the OSC only needs the
 * cancel entry point; all other operations stay unset. */
static struct llog_operations osc_size_repl_logops = {
        lop_cancel: llog_obd_repl_cancel
};
3496
/* Originator logops: llog_lvfs_ops with the obd-origin entry points
 * patched in lazily on first osc_llog_init(). */
static struct llog_operations osc_mds_ost_orig_logops;

/* obd llog_init method: create the MDS->OST originator context and the
 * size-change replicator context for this OSC. */
static int osc_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
                         struct obd_device *tgt, int count,
                         struct llog_catid *catid, struct obd_uuid *uuid)
{
        int rc;
        ENTRY;

        /* One-time fill of the shared logops table.
         * NOTE(review): the table is file-global but the guard is this
         * device's obd_dev_lock -- confirm concurrent llog_init on
         * different OSC devices cannot race here. */
        spin_lock(&obd->obd_dev_lock);
        if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
                osc_mds_ost_orig_logops = llog_lvfs_ops;
                osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
                osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
                osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
                osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
        }
        spin_unlock(&obd->obd_dev_lock);

        rc = llog_setup(obd, llogs, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
                        &catid->lci_logid, &osc_mds_ost_orig_logops);
        if (rc) {
                CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
                GOTO (out, rc);
        }

        rc = llog_setup(obd, llogs, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
                        &osc_size_repl_logops);
        if (rc)
                CERROR("failed LLOG_SIZE_REPL_CTXT\n");
out:
        if (rc) {
                CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
                       obd->obd_name, tgt->obd_name, count, catid, rc);
                CERROR("logid "LPX64":0x%x\n",
                       catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
        }
        RETURN(rc);
}
3535
3536 static int osc_llog_finish(struct obd_device *obd, int count)
3537 {
3538         struct llog_ctxt *ctxt;
3539         int rc = 0, rc2 = 0;
3540         ENTRY;
3541
3542         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3543         if (ctxt)
3544                 rc = llog_cleanup(ctxt);
3545
3546         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3547         if (ctxt)
3548                 rc2 = llog_cleanup(ctxt);
3549         if (!rc)
3550                 rc = rc2;
3551
3552         RETURN(rc);
3553 }
3554
3555 static int osc_reconnect(struct obd_export *exp, struct obd_device *obd,
3556                          struct obd_uuid *cluuid,
3557                          struct obd_connect_data *data)
3558 {
3559         struct client_obd *cli = &obd->u.cli;
3560
3561         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3562                 long lost_grant;
3563
3564                 client_obd_list_lock(&cli->cl_loi_list_lock);
3565                 data->ocd_grant = cli->cl_avail_grant ?:
3566                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3567                 lost_grant = cli->cl_lost_grant;
3568                 cli->cl_lost_grant = 0;
3569                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3570
3571                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3572                        "cl_lost_grant: %ld\n", data->ocd_grant,
3573                        cli->cl_avail_grant, lost_grant);
3574                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3575                        " ocd_grant: %d\n", data->ocd_connect_flags,
3576                        data->ocd_version, data->ocd_grant);
3577         }
3578
3579         RETURN(0);
3580 }
3581
/* obd disconnect method: on the last connection, flush any pending llog
 * cancel records to the OST, then tear down the client export. */
static int osc_disconnect(struct obd_export *exp)
{
        struct obd_device *obd = class_exp2obd(exp);
        struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
        int rc;

        if (obd->u.cli.cl_conn_count == 1)
                /* flush any remaining cancel messages out to the target */
                /* NOTE(review): ctxt may be NULL if the context was never
                 * set up -- presumably llog_sync tolerates that; confirm. */
                llog_sync(ctxt, exp);

        rc = client_disconnect_export(exp);
        return rc;
}
3595
/* obd import_event method: react to import state transitions (disconnect,
 * inactive, invalidate, active, connect-data negotiation) and forward a
 * notification to the observer obd where appropriate. */
static int osc_import_event(struct obd_device *obd,
                            struct obd_import *imp,
                            enum obd_import_event event)
{
        struct client_obd *cli;
        int rc = 0;

        ENTRY;
        LASSERT(imp->imp_obd == obd);

        switch (event) {
        case IMP_EVENT_DISCON: {
                /* Only do this on the MDS OSC's */
                if (imp->imp_server_timeout) {
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                        spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
                        spin_unlock(&oscc->oscc_lock);
                }
                /* Grant accounting restarts from zero after reconnect. */
                cli = &obd->u.cli;
                client_obd_list_lock(&cli->cl_loi_list_lock);
                cli->cl_avail_grant = 0;
                cli->cl_lost_grant = 0;
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                break;
        }
        case IMP_EVENT_INACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
                break;
        }
        case IMP_EVENT_INVALIDATE: {
                struct ldlm_namespace *ns = obd->obd_namespace;

                /* Reset grants */
                cli = &obd->u.cli;
                client_obd_list_lock(&cli->cl_loi_list_lock);
                /* all pages go to failing rpcs due to the invalid import */
                osc_check_rpcs(cli);
                client_obd_list_unlock(&cli->cl_loi_list_lock);

                /* Locks of a dead import are cancelled locally only. */
                ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);

                break;
        }
        case IMP_EVENT_ACTIVE: {
                /* Only do this on the MDS OSC's */
                if (imp->imp_server_timeout) {
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                        spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
                        spin_unlock(&oscc->oscc_lock);
                }
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
                break;
        }
        case IMP_EVENT_OCD: {
                struct obd_connect_data *ocd = &imp->imp_connect_data;

                if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
                        osc_init_grant(&obd->u.cli, ocd);

                /* See bug 7198 */
                if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;

                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
                break;
        }
        default:
                CERROR("Unknown import event %d\n", event);
                LBUG();
        }
        RETURN(rc);
}
3672
3673 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3674 {
3675         int rc;
3676         ENTRY;
3677
3678         ENTRY;
3679         rc = ptlrpcd_addref();
3680         if (rc)
3681                 RETURN(rc);
3682
3683         rc = client_obd_setup(obd, lcfg);
3684         if (rc) {
3685                 ptlrpcd_decref();
3686         } else {
3687                 struct lprocfs_static_vars lvars;
3688                 struct client_obd *cli = &obd->u.cli;
3689
3690                 lprocfs_init_vars(osc, &lvars);
3691                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3692                         lproc_osc_attach_seqstat(obd);
3693                         ptlrpc_lprocfs_register_obd(obd);
3694                 }
3695
3696                 oscc_init(obd);
3697                 /* We need to allocate a few requests more, because
3698                    brw_interpret_oap tries to create new requests before freeing
3699                    previous ones. Ideally we want to have 2x max_rpcs_in_flight
3700                    reserved, but I afraid that might be too much wasted RAM
3701                    in fact, so 2 is just my guess and still should work. */
3702                 cli->cl_import->imp_rq_pool =
3703                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3704                                             OST_MAXREQSIZE,
3705                                             ptlrpc_add_rqs_to_pool);
3706         }
3707
3708         RETURN(rc);
3709 }
3710
/* obd precleanup method: staged teardown of the client import before
 * osc_cleanup() runs; called once per obd_cleanup_stage. */
static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
{
        int rc = 0;
        ENTRY;

        switch (stage) {
        case OBD_CLEANUP_EARLY: {
                struct obd_import *imp;
                imp = obd->u.cli.cl_import;
                CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
                /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
                ptlrpc_deactivate_import(imp);
                spin_lock(&imp->imp_lock);
                imp->imp_pingable = 0;
                spin_unlock(&imp->imp_lock);
                break;
        }
        case OBD_CLEANUP_EXPORTS: {
                /* If we set up but never connected, the
                   client import will not have been cleaned. */
                /* NOTE(review): this branch runs whenever cl_import is
                 * still set, not only in the never-connected case; the
                 * CDEBUG text below may over-state -- confirm. */
                if (obd->u.cli.cl_import) {
                        struct obd_import *imp;
                        imp = obd->u.cli.cl_import;
                        CDEBUG(D_CONFIG, "%s: client import never connected\n",
                               obd->obd_name);
                        ptlrpc_invalidate_import(imp);
                        ptlrpc_free_rq_pool(imp->imp_rq_pool);
                        class_destroy_import(imp);
                        obd->u.cli.cl_import = NULL;
                }
                break;
        }
        case OBD_CLEANUP_SELF_EXP:
                rc = obd_llog_finish(obd, 0);
                if (rc != 0)
                        CERROR("failed to cleanup llogging subsystems\n");
                break;
        case OBD_CLEANUP_OBD:
                break;
        }
        RETURN(rc);
}
3753
/* obd cleanup method: final teardown after osc_precleanup() -- remove
 * procfs entries, signal the object creator to exit, release quota
 * state, the client obd, and our ptlrpcd reference. */
int osc_cleanup(struct obd_device *obd)
{
        struct osc_creator *oscc = &obd->u.cli.cl_oscc;
        int rc;

        ENTRY;
        ptlrpc_lprocfs_unregister_obd(obd);
        lprocfs_obd_cleanup(obd);

        /* Flip the creator out of recovery and into its exit state. */
        spin_lock(&oscc->oscc_lock);
        oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
        oscc->oscc_flags |= OSCC_FLAG_EXITING;
        spin_unlock(&oscc->oscc_lock);

        /* free memory of osc quota cache */
        lquota_cleanup(quota_interface, obd);

        rc = client_obd_cleanup(obd);

        ptlrpcd_decref();
        RETURN(rc);
}
3776
3777 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3778 {
3779         struct lustre_cfg *lcfg = buf;
3780         struct lprocfs_static_vars lvars;
3781         int rc = 0;
3782
3783         lprocfs_init_vars(osc, &lvars);
3784
3785         rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars, lcfg, obd);
3786         return(rc);
3787 }
3788
/* Method table exported by the OSC obd type; registered with
 * class_register_type() in osc_init(). */
struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        /* device lifecycle */
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup,
        /* connection management (import-based, mostly generic client code) */
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        /* statfs and striping metadata pack/unpack */
        .o_statfs               = osc_statfs,
        .o_statfs_async         = osc_statfs_async,
        .o_packmd               = osc_packmd,
        .o_unpackmd             = osc_unpackmd,
        /* object lifecycle and attributes */
        .o_precreate            = osc_precreate,
        .o_create               = osc_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_getattr_async        = osc_getattr_async,
        .o_setattr              = osc_setattr,
        .o_setattr_async        = osc_setattr_async,
        /* bulk read/write paths */
        .o_brw                  = osc_brw,
        .o_brw_async            = osc_brw_async,
        .o_prep_async_page      = osc_prep_async_page,
        .o_queue_async_io       = osc_queue_async_io,
        .o_set_async_flags      = osc_set_async_flags,
        .o_queue_group_io       = osc_queue_group_io,
        .o_trigger_group_io     = osc_trigger_group_io,
        .o_teardown_async_page  = osc_teardown_async_page,
        .o_punch                = osc_punch,
        .o_sync                 = osc_sync,
        /* DLM lock handling */
        .o_enqueue              = osc_enqueue,
        .o_match                = osc_match,
        .o_change_cbdata        = osc_change_cbdata,
        .o_cancel               = osc_cancel,
        .o_cancel_unused        = osc_cancel_unused,
        .o_join_lru             = osc_join_lru,
        /* misc: ioctls, key/value info, import events, llog, config */
        .o_iocontrol            = osc_iocontrol,
        .o_get_info             = osc_get_info,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_llog_init            = osc_llog_init,
        .o_llog_finish          = osc_llog_finish,
        .o_process_config       = osc_process_config,
};
3834 int __init osc_init(void)
3835 {
3836         struct lprocfs_static_vars lvars;
3837         int rc;
3838         ENTRY;
3839
3840         lprocfs_init_vars(osc, &lvars);
3841
3842         request_module("lquota");
3843         quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
3844         lquota_init(quota_interface);
3845         init_obd_quota_ops(quota_interface, &osc_obd_ops);
3846
3847         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3848                                  LUSTRE_OSC_NAME, NULL);
3849         if (rc) {
3850                 if (quota_interface)
3851                         PORTAL_SYMBOL_PUT(osc_quota_interface);
3852                 RETURN(rc);
3853         }
3854
3855         RETURN(rc);
3856 }
3857
3858 #ifdef __KERNEL__
/* Module unload: release the quota interface (mirroring the setup done
 * in osc_init()) and unregister the OSC obd type. */
static void /*__exit*/ osc_exit(void)
{
        lquota_exit(quota_interface);
        /* Drop the reference taken by PORTAL_SYMBOL_GET() in osc_init();
         * quota_interface is NULL if the lquota module was absent. */
        if (quota_interface)
                PORTAL_SYMBOL_PUT(osc_quota_interface);

        class_unregister_type(LUSTRE_OSC_NAME);
}
3867
/* Standard kernel module metadata and init/exit registration. */
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");

cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
3873 #endif