Whamcloud - gitweb
9e6fec39c6fb2f3ca5fa2194edec93a15d4e8d74
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author Peter Braam <braam@clusterfs.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  *   You may have signed or agreed to another license before downloading
11  *   this software.  If so, you are bound by the terms and conditions
12  *   of that agreement, and the following does not apply to you.  See the
13  *   LICENSE file included with this distribution for more information.
14  *
15  *   If you did not agree to a different license, then this copy of Lustre
16  *   is open source software; you can redistribute it and/or modify it
17  *   under the terms of version 2 of the GNU General Public License as
18  *   published by the Free Software Foundation.
19  *
20  *   In either case, Lustre is distributed in the hope that it will be
21  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23  *   license text for more details.
24  *
 *  For testing and management it is treated as an obd_device,
 *  although it does not export a full OBD method table (the
 *  requests are coming in over the wire, so object target modules
 *  do not have a full method table.)
29  *
30  */
31
32 #ifndef EXPORT_SYMTAB
33 # define EXPORT_SYMTAB
34 #endif
35 #define DEBUG_SUBSYSTEM S_OSC
36
37 #ifdef __KERNEL__
38 # include <libcfs/libcfs.h>
39 #else /* __KERNEL__ */
40 # include <liblustre.h>
41 #endif
42
43 # include <lustre_dlm.h>
44 #include <libcfs/kp30.h>
45 #include <lustre_net.h>
46 #include <lustre/lustre_user.h>
47 #include <obd_ost.h>
48 #include <obd_lov.h>
49
50 #ifdef  __CYGWIN__
51 # include <ctype.h>
52 #endif
53
54 #include <lustre_ha.h>
55 #include <lprocfs_status.h>
56 #include <lustre_log.h>
57 #include <lustre_debug.h>
58 #include <lustre_param.h>
59 #include "osc_internal.h"
60
61 static quota_interface_t *quota_interface = NULL;
62 extern quota_interface_t osc_quota_interface;
63
64 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
65
66 static quota_interface_t *quota_interface;
67 extern quota_interface_t osc_quota_interface;
68
69 /* by default 10s */
70 atomic_t osc_resend_time; 
71
72 /* Pack OSC object metadata for disk storage (LE byte order). */
73 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
74                       struct lov_stripe_md *lsm)
75 {
76         int lmm_size;
77         ENTRY;
78
79         lmm_size = sizeof(**lmmp);
80         if (!lmmp)
81                 RETURN(lmm_size);
82
83         if (*lmmp && !lsm) {
84                 OBD_FREE(*lmmp, lmm_size);
85                 *lmmp = NULL;
86                 RETURN(0);
87         }
88
89         if (!*lmmp) {
90                 OBD_ALLOC(*lmmp, lmm_size);
91                 if (!*lmmp)
92                         RETURN(-ENOMEM);
93         }
94
95         if (lsm) {
96                 LASSERT(lsm->lsm_object_id);
97                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
98         }
99
100         RETURN(lmm_size);
101 }
102
/* Unpack OSC object metadata from disk storage (LE byte order).
 *
 * Tri-state helper mirroring osc_packmd:
 *  - lsmp == NULL:          size query; return the lsm size needed
 *  - *lsmp set, lmm NULL:   free the existing lsm, return 0
 *  - otherwise:             allocate *lsmp if necessary, fill from @lmm
 * Returns the lsm size on success or a negative errno. */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
{
        int lsm_size;
        ENTRY;

        if (lmm != NULL) {
                /* Reject buffers too short to hold the on-disk struct. */
                if (lmm_bytes < sizeof (*lmm)) {
                        CERROR("lov_mds_md too small: %d, need %d\n",
                               lmm_bytes, (int)sizeof(*lmm));
                        RETURN(-EINVAL);
                }
                /* XXX LOV_MAGIC etc check? */

                if (lmm->lmm_object_id == 0) {
                        CERROR("lov_mds_md: zero lmm_object_id\n");
                        RETURN(-EINVAL);
                }
        }

        /* lsm sized for a single stripe (the OSC only handles one object). */
        lsm_size = lov_stripe_md_size(1);
        if (lsmp == NULL)
                RETURN(lsm_size);

        if (*lsmp != NULL && lmm == NULL) {
                /* free request: release the oinfo, then the lsm itself */
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);
                *lsmp = NULL;
                RETURN(0);
        }

        if (*lsmp == NULL) {
                OBD_ALLOC(*lsmp, lsm_size);
                if (*lsmp == NULL)
                        RETURN(-ENOMEM);
                OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                if ((*lsmp)->lsm_oinfo[0] == NULL) {
                        /* unwind the partial allocation */
                        OBD_FREE(*lsmp, lsm_size);
                        RETURN(-ENOMEM);
                }
                loi_init((*lsmp)->lsm_oinfo[0]);
        }

        if (lmm != NULL) {
                /* XXX zero *lsmp? */
                (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
                LASSERT((*lsmp)->lsm_object_id);
        }

        (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

        RETURN(lsm_size);
}
157
158 static int osc_getattr_interpret(struct ptlrpc_request *req,
159                                  struct osc_async_args *aa, int rc)
160 {
161         struct ost_body *body;
162         ENTRY;
163
164         if (rc != 0)
165                 GOTO(out, rc);
166
167         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
168                                   lustre_swab_ost_body);
169         if (body) {
170                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
171                 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
172
173                 /* This should really be sent by the OST */
174                 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
175                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
176         } else {
177                 CERROR("can't unpack ost_body\n");
178                 rc = -EPROTO;
179                 aa->aa_oi->oi_oa->o_valid = 0;
180         }
181 out:
182         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
183         RETURN(rc);
184 }
185
186 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
187                              struct ptlrpc_request_set *set)
188 {
189         struct ptlrpc_request *req;
190         struct ost_body *body;
191         int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
192         struct osc_async_args *aa;
193         ENTRY;
194
195         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
196                               OST_GETATTR, 2, size,NULL);
197         if (!req)
198                 RETURN(-ENOMEM);
199
200         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
201         memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
202
203         ptlrpc_req_set_repsize(req, 2, size);
204         req->rq_interpret_reply = osc_getattr_interpret;
205
206         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
207         aa = (struct osc_async_args *)&req->rq_async_args;
208         aa->aa_oi = oinfo;
209
210         ptlrpc_set_add_req(set, req);
211         RETURN (0);
212 }
213
214 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
215 {
216         struct ptlrpc_request *req;
217         struct ost_body *body;
218         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
219         ENTRY;
220
221         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
222                               OST_GETATTR, 2, size, NULL);
223         if (!req)
224                 RETURN(-ENOMEM);
225
226         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
227         memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
228
229         ptlrpc_req_set_repsize(req, 2, size);
230
231         rc = ptlrpc_queue_wait(req);
232         if (rc) {
233                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
234                 GOTO(out, rc);
235         }
236
237         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
238                                   lustre_swab_ost_body);
239         if (body == NULL) {
240                 CERROR ("can't unpack ost_body\n");
241                 GOTO (out, rc = -EPROTO);
242         }
243
244         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
245         memcpy(oinfo->oi_oa, &body->oa, sizeof(*oinfo->oi_oa));
246
247         /* This should really be sent by the OST */
248         oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
249         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
250
251         EXIT;
252  out:
253         ptlrpc_req_finished(req);
254         return rc;
255 }
256
257 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
258                        struct obd_trans_info *oti)
259 {
260         struct ptlrpc_request *req;
261         struct ost_body *body;
262         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
263         ENTRY;
264
265         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
266                               OST_SETATTR, 2, size, NULL);
267         if (!req)
268                 RETURN(-ENOMEM);
269
270         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
271         memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
272
273         ptlrpc_req_set_repsize(req, 2, size);
274
275         rc = ptlrpc_queue_wait(req);
276         if (rc)
277                 GOTO(out, rc);
278
279         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
280                                   lustre_swab_ost_body);
281         if (body == NULL)
282                 GOTO(out, rc = -EPROTO);
283
284         memcpy(oinfo->oi_oa, &body->oa, sizeof(*oinfo->oi_oa));
285
286         EXIT;
287 out:
288         ptlrpc_req_finished(req);
289         RETURN(rc);
290 }
291
292 static int osc_setattr_interpret(struct ptlrpc_request *req,
293                                  struct osc_async_args *aa, int rc)
294 {
295         struct ost_body *body;
296         ENTRY;
297
298         if (rc != 0)
299                 GOTO(out, rc);
300
301         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
302                                   lustre_swab_ost_body);
303         if (body == NULL) {
304                 CERROR("can't unpack ost_body\n");
305                 GOTO(out, rc = -EPROTO);
306         }
307
308         memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
309 out:
310         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
311         RETURN(rc);
312 }
313
314 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
315                              struct obd_trans_info *oti,
316                              struct ptlrpc_request_set *rqset)
317 {
318         struct ptlrpc_request *req;
319         struct ost_body *body;
320         int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
321         struct osc_async_args *aa;
322         ENTRY;
323
324         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
325                               OST_SETATTR, 2, size, NULL);
326         if (!req)
327                 RETURN(-ENOMEM);
328
329         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
330
331         if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
332                 LASSERT(oti);
333                 memcpy(obdo_logcookie(oinfo->oi_oa), oti->oti_logcookies,
334                        sizeof(*oti->oti_logcookies));
335         }
336
337         memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
338         ptlrpc_req_set_repsize(req, 2, size);
339         /* do mds to ost setattr asynchronouly */
340         if (!rqset) {
341                 /* Do not wait for response. */
342                 ptlrpcd_add_req(req);
343         } else {
344                 req->rq_interpret_reply = osc_setattr_interpret;
345
346                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
347                 aa = (struct osc_async_args *)&req->rq_async_args;
348                 aa->aa_oi = oinfo;
349
350                 ptlrpc_set_add_req(rqset, req);
351         }
352
353         RETURN(0);
354 }
355
/* Create one object on the OST (synchronous).
 *
 * If *ea is NULL a single-stripe md is allocated here and returned through
 * *ea on success; on failure any md allocated locally is freed again before
 * returning.  The reply obdo is copied back into @oa, and the transaction
 * number and llog cookie (if present) are handed back through @oti. */
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        struct lov_stripe_md *lsm;
        int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                /* Caller gave us no md: allocate one locally. */
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_CREATE, 2, size, NULL);
        if (!req)
                GOTO(out, rc = -ENOMEM);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        memcpy(&body->oa, oa, sizeof(body->oa));

        ptlrpc_req_set_repsize(req, 2, size);
        /* OBD_FL_DELORPHAN create: used during MDS-OST orphan recovery. */
        if (oa->o_valid & OBD_MD_FLINLINE) {
                LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
                        oa->o_flags == OBD_FL_DELORPHAN);
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR ("can't unpack ost_body\n");
                GOTO (out_req, rc = -EPROTO);
        }

        memcpy(oa, &body->oa, sizeof(*oa));

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_object_id = oa->o_id;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                /* Hand back the llog cookie embedded in the reply obdo;
                 * allocate cookie space if the caller did not provide it. */
                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        memcpy(oti->oti_logcookies, obdo_logcookie(oa),
                               sizeof(oti->oti_onecookie));
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        /* Only free the md if we allocated it here (*ea still unset). */
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        RETURN(rc);
}
437
438 static int osc_punch_interpret(struct ptlrpc_request *req,
439                                struct osc_async_args *aa, int rc)
440 {
441         struct ost_body *body;
442         ENTRY;
443
444         if (rc != 0)
445                 GOTO(out, rc);
446
447         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof (*body),
448                                   lustre_swab_ost_body);
449         if (body == NULL) {
450                 CERROR ("can't unpack ost_body\n");
451                 GOTO(out, rc = -EPROTO);
452         }
453
454         memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
455 out:
456         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
457         RETURN(rc);
458 }
459
460 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
461                      struct obd_trans_info *oti,
462                      struct ptlrpc_request_set *rqset)
463 {
464         struct ptlrpc_request *req;
465         struct osc_async_args *aa;
466         struct ost_body *body;
467         int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
468         ENTRY;
469
470         if (!oinfo->oi_oa) {
471                 CERROR("oa NULL\n");
472                 RETURN(-EINVAL);
473         }
474
475         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
476                               OST_PUNCH, 2, size, NULL);
477         if (!req)
478                 RETURN(-ENOMEM);
479
480         req->rq_request_portal = OST_IO_PORTAL;         /* bug 7198 */
481
482         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
483         memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
484
485         /* overload the size and blocks fields in the oa with start/end */
486         body->oa.o_size = oinfo->oi_policy.l_extent.start;
487         body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
488         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
489
490         ptlrpc_req_set_repsize(req, 2, size);
491
492         req->rq_interpret_reply = osc_punch_interpret;
493         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
494         aa = (struct osc_async_args *)&req->rq_async_args;
495         aa->aa_oi = oinfo;
496         ptlrpc_set_add_req(rqset, req);
497
498         RETURN(0);
499 }
500
501 static int osc_sync(struct obd_export *exp, struct obdo *oa,
502                     struct lov_stripe_md *md, obd_size start, obd_size end)
503 {
504         struct ptlrpc_request *req;
505         struct ost_body *body;
506         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
507         ENTRY;
508
509         if (!oa) {
510                 CERROR("oa NULL\n");
511                 RETURN(-EINVAL);
512         }
513
514         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
515                               OST_SYNC, 2, size, NULL);
516         if (!req)
517                 RETURN(-ENOMEM);
518
519         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
520         memcpy(&body->oa, oa, sizeof(*oa));
521
522         /* overload the size and blocks fields in the oa with start/end */
523         body->oa.o_size = start;
524         body->oa.o_blocks = end;
525         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
526
527         ptlrpc_req_set_repsize(req, 2, size);
528
529         rc = ptlrpc_queue_wait(req);
530         if (rc)
531                 GOTO(out, rc);
532
533         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
534                                   lustre_swab_ost_body);
535         if (body == NULL) {
536                 CERROR ("can't unpack ost_body\n");
537                 GOTO (out, rc = -EPROTO);
538         }
539
540         memcpy(oa, &body->oa, sizeof(*oa));
541
542         EXIT;
543  out:
544         ptlrpc_req_finished(req);
545         return rc;
546 }
547
548 /* Find and cancel locally locks matched by @mode in the resource found by
549  * @objid. Found locks are added into @cancel list. Returns the amount of
550  * locks added to @cancels list. */
551 static int osc_resource_get_unused(struct obd_export *exp, __u64 objid,
552                                    struct list_head *cancels, ldlm_mode_t mode,
553                                    int lock_flags)
554 {
555         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
556         struct ldlm_res_id res_id = { .name = { objid } };
557         struct ldlm_resource *res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
558         int count;
559         ENTRY;
560
561         if (res == NULL)
562                 RETURN(0);
563
564         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
565                                            lock_flags, 0, NULL);
566         ldlm_resource_putref(res);
567         RETURN(count);
568 }
569
/* Destroy requests can be async always on the client, and we don't even really
 * care about the return code since the client cannot do anything at all about
 * a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST reconnects to the MDS next,
 * it will retrieve the llog unlink logs and then sends the log cancellation
 * cookies to the MDS after committing destroy transactions. */
static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                       struct lov_stripe_md *ea, struct obd_trans_info *oti,
                       struct obd_export *md_export)
{
        CFS_LIST_HEAD(cancels);
        struct ptlrpc_request *req;
        struct ost_body *body;
        int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body), 0 };
        int count, bufcount = 2;
        ENTRY;

        if (!oa) {
                CERROR("oa NULL\n");
                RETURN(-EINVAL);
        }

        /* Cancel local PW locks on the object being destroyed; its pages
         * are discarded rather than written back. */
        count = osc_resource_get_unused(exp, oa->o_id, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);
        /* If the server supports piggybacked cancels, grow the request by a
         * third buffer carrying the cancel records. */
        if (exp_connect_cancelset(exp) && count) {
                bufcount = 3;
                size[REQ_REC_OFF + 1] = ldlm_request_bufsize(count,OST_DESTROY);
        }
        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_DESTROY, bufcount, size, NULL);
        if (exp_connect_cancelset(exp) && req)
                ldlm_cli_cancel_list(&cancels, count, req, REQ_REC_OFF + 1);
        else
                /* request prep failed or no cancelset support: release the
                 * collected lock references */
                ldlm_lock_list_put(&cancels, l_bl_ast, count);

        if (!req)
                RETURN(-ENOMEM);

        req->rq_request_portal = OST_IO_PORTAL;         /* bug 7198 */

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));

        /* Pass the caller's unlink-llog cookie along inside the obdo. */
        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE) {
                memcpy(obdo_logcookie(oa), oti->oti_logcookies,
                       sizeof(*oti->oti_logcookies));
        }

        memcpy(&body->oa, oa, sizeof(*oa));
        ptlrpc_req_set_repsize(req, 2, size);

        /* Fire and forget via ptlrpcd; see the comment above about not
         * caring for the destroy return code. */
        ptlrpcd_add_req(req);
        RETURN(0);
}
627
/* Piggyback the client's dirty/grant accounting onto @oa so each request
 * tells the OST how much dirty data this client is caching.
 * @writing_bytes is unused in this function. */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        /* Caller must not have filled these fields already. */
        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (cli->cl_dirty > cli->cl_dirty_max) {
                /* Per-OSC dirty limit exceeded - should not happen. */
                CERROR("dirty %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
                /* System-wide dirty limit exceeded - should not happen. */
                CERROR("dirty %d > system dirty_max %d\n",
                       atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                /* Headroom would overflow the 32-bit o_undirty field. */
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                /* Advertise how much more we could dirty: at least the OSC
                 * dirty max, or what fits in max_rpcs_in_flight + 1 RPCs. */
                long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
                                (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant;
        /* Report grant lost locally (o_dropped), then reset the counter. */
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}
662
663 /* caller must hold loi_list_lock */
664 static void osc_consume_write_grant(struct client_obd *cli,struct brw_page *pga)
665 {
666         atomic_inc(&obd_dirty_pages);
667         cli->cl_dirty += CFS_PAGE_SIZE;
668         cli->cl_avail_grant -= CFS_PAGE_SIZE;
669         pga->flag |= OBD_BRW_FROM_GRANT;
670         CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
671                CFS_PAGE_SIZE, pga, pga->pg);
672         LASSERT(cli->cl_avail_grant >= 0);
673 }
674
/* the companion to osc_consume_write_grant, called when a brw has completed.
 * must be called with the loi lock held. */
static void osc_release_write_grant(struct client_obd *cli,
                                    struct brw_page *pga, int sent)
{
        /* OST block size, defaulting to 4096 when statfs has not reported
         * one yet. */
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
        ENTRY;

        /* Nothing to release if this page never consumed grant. */
        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
                EXIT;
                return;
        }

        pga->flag &= ~OBD_BRW_FROM_GRANT;
        atomic_dec(&obd_dirty_pages);
        cli->cl_dirty -= CFS_PAGE_SIZE;
        if (!sent) {
                /* Page dropped without being sent: its whole page of grant
                 * is counted as lost. */
                cli->cl_lost_grant += CFS_PAGE_SIZE;
                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
        } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole block on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
                int offset = pga->off & ~CFS_PAGE_MASK;
                int count = pga->count + (offset & (blocksize - 1));
                int end = (offset + pga->count) & (blocksize - 1);
                if (end)
                        count += blocksize - end;

                /* The block-rounded short write consumed @count bytes; the
                 * rest of the page's grant is lost. */
                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
                       cli->cl_avail_grant, cli->cl_dirty);
        }

        EXIT;
}
713
714 static unsigned long rpcs_in_flight(struct client_obd *cli)
715 {
716         return cli->cl_r_in_flight + cli->cl_w_in_flight;
717 }
718
/* Walk the cache-waiter list, waking each waiter either with grant consumed
 * on its behalf (cached IO can proceed) or with ocw_rc = -EDQUOT (it must
 * fall back to sync IO).  Stops early if dirty limits or grant run out.
 * caller must hold loi_list_lock */
void osc_wake_cache_waiters(struct client_obd *cli)
{
        struct list_head *l, *tmp;
        struct osc_cache_waiter *ocw;

        ENTRY;
        list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                /* if we can't dirty more, we must wait until some is written */
                if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
                   ((atomic_read(&obd_dirty_pages)+1)>(obd_max_dirty_pages))) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);
                        return;
                }

                /* if still dirty cache but no grant wait for pending RPCs that
                 * may yet return us some grant before doing sync writes */
                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
                               cli->cl_w_in_flight);
                        return;
                }

                ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
                list_del_init(&ocw->ocw_entry);
                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        /* no more RPCs in flight to return grant, do sync IO */
                        ocw->ocw_rc = -EDQUOT;
                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
                } else {
                        /* Grant available: consume it for the waiter's page
                         * so it can continue with cached IO. */
                        osc_consume_write_grant(cli,
                                                &ocw->ocw_oap->oap_brw_page);
                }

                cfs_waitq_signal(&ocw->ocw_waitq);
        }

        EXIT;
}
760
761 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
762 {
763         client_obd_list_lock(&cli->cl_loi_list_lock);
764         cli->cl_avail_grant = ocd->ocd_grant;
765         client_obd_list_unlock(&cli->cl_loi_list_lock);
766
767         CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
768                cli->cl_avail_grant, cli->cl_lost_grant);
769         LASSERT(cli->cl_avail_grant >= 0);
770 }
771
772 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
773 {
774         client_obd_list_lock(&cli->cl_loi_list_lock);
775         CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
776         if (body->oa.o_valid & OBD_MD_FLGRANT)
777                 cli->cl_avail_grant += body->oa.o_grant;
778         /* waiters are woken in brw_interpret_oap */
779         client_obd_list_unlock(&cli->cl_loi_list_lock);
780 }
781
782 /* We assume that the reason this OSC got a short read is because it read
783  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
784  * via the LOV, and it _knows_ it's reading inside the file, it's just that
785  * this stripe never got written at or beyond this stripe offset yet. */
786 static void handle_short_read(int nob_read, obd_count page_count,
787                               struct brw_page **pga)
788 {
789         char *ptr;
790         int i = 0;
791
792         /* skip bytes read OK */
793         while (nob_read > 0) {
794                 LASSERT (page_count > 0);
795
796                 if (pga[i]->count > nob_read) {
797                         /* EOF inside this page */
798                         ptr = cfs_kmap(pga[i]->pg) + 
799                                 (pga[i]->off & ~CFS_PAGE_MASK);
800                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
801                         cfs_kunmap(pga[i]->pg);
802                         page_count--;
803                         i++;
804                         break;
805                 }
806
807                 nob_read -= pga[i]->count;
808                 page_count--;
809                 i++;
810         }
811
812         /* zero remaining pages */
813         while (page_count-- > 0) {
814                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
815                 memset(ptr, 0, pga[i]->count);
816                 cfs_kunmap(pga[i]->pg);
817                 i++;
818         }
819 }
820
821 static int check_write_rcs(struct ptlrpc_request *req,
822                            int requested_nob, int niocount,
823                            obd_count page_count, struct brw_page **pga)
824 {
825         int    *remote_rcs, i;
826
827         /* return error if any niobuf was in error */
828         remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
829                                         sizeof(*remote_rcs) * niocount, NULL);
830         if (remote_rcs == NULL) {
831                 CERROR("Missing/short RC vector on BRW_WRITE reply\n");
832                 return(-EPROTO);
833         }
834         if (lustre_msg_swabbed(req->rq_repmsg))
835                 for (i = 0; i < niocount; i++)
836                         __swab32s(&remote_rcs[i]);
837
838         for (i = 0; i < niocount; i++) {
839                 if (remote_rcs[i] < 0)
840                         return(remote_rcs[i]);
841
842                 if (remote_rcs[i] != 0) {
843                         CERROR("rc[%d] invalid (%d) req %p\n",
844                                 i, remote_rcs[i], req);
845                         return(-EPROTO);
846                 }
847         }
848
849         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
850                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
851                        requested_nob, req->rq_bulk->bd_nob_transferred);
852                 return(-EPROTO);
853         }
854
855         return (0);
856 }
857
858 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
859 {
860         if (p1->flag != p2->flag) {
861                 unsigned mask = ~OBD_BRW_FROM_GRANT;
862
863                 /* warn if we try to combine flags that we don't know to be
864                  * safe to combine */
865                 if ((p1->flag & mask) != (p2->flag & mask))
866                         CERROR("is it ok to have flags 0x%x and 0x%x in the "
867                                "same brw?\n", p1->flag, p2->flag);
868                 return 0;
869         }
870
871         return (p1->off + p1->count == p2->off);
872 }
873
/* Compute a CRC32 over the first 'nob' bytes spanned by the pages in
 * 'pga'.  'opc' (OST_READ or OST_WRITE) selects which checksum fault-
 * injection point may fire.  Returns the accumulated checksum. */
static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga, int opc)
{
        __u32 cksum = ~0;
        int i = 0;

        LASSERT (pg_count > 0);
        while (nob > 0 && pg_count > 0) {
                char *ptr = cfs_kmap(pga[i]->pg);
                int off = pga[i]->off & ~CFS_PAGE_MASK;
                /* only checksum the bytes still covered by 'nob' */
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
                        memcpy(ptr + off, "bad1", min(4, nob));
                cksum = crc32_le(cksum, ptr + off, count);
                cfs_kunmap(pga[i]->pg);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
                               off, cksum);

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}
907
/* Build a BRW (bulk read/write) RPC covering 'page_count' pages.
 *
 * Contiguous pages with identical flags are coalesced into a single
 * remote niobuf (see can_merge_pages()).  On success the prepared
 * request is returned through 'reqp' with async args stashed in
 * rq_async_args; 'oa' is referenced (not copied) by those args, so the
 * caller must keep it alive until the RPC completes. */
static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int size[4] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        int niocount, i, requested_nob, opc, rc;
        struct ptlrpc_request_pool *pool;
        struct osc_brw_async_args *aa;
        struct brw_page *pg_prev;

        ENTRY;
        OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM); /* Recoverable */
        OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ2, -EINVAL); /* Fatal */

        /* writes draw from the import's emergency request pool so they
         * can make progress under memory pressure */
        opc = ((cmd & OBD_BRW_WRITE) != 0) ? OST_WRITE : OST_READ;
        pool = ((cmd & OBD_BRW_WRITE) != 0) ? cli->cl_import->imp_rq_pool :NULL;

        /* count how many niobufs remain after merging adjacent pages */
        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        size[REQ_REC_OFF + 1] = sizeof(*ioobj);
        size[REQ_REC_OFF + 2] = niocount * sizeof(*niobuf);

        req = ptlrpc_prep_req_pool(cli->cl_import, LUSTRE_OST_VERSION, opc, 4, size,
                                   NULL, pool);
        if (req == NULL)
                RETURN (-ENOMEM);

        req->rq_request_portal = OST_IO_PORTAL;         /* bug 7198 */

        /* writes are a GET from the server's viewpoint, reads a PUT */
        if (opc == OST_WRITE)
                desc = ptlrpc_prep_bulk_imp (req, page_count,
                                             BULK_GET_SOURCE, OST_BULK_PORTAL);
        else
                desc = ptlrpc_prep_bulk_imp (req, page_count,
                                             BULK_PUT_SINK, OST_BULK_PORTAL);
        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        ioobj = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioobj));
        niobuf = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                                niocount * sizeof(*niobuf));

        memcpy(&body->oa, oa, sizeof(*oa));

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;

        LASSERT (page_count > 0);
        pg_prev = pga[0];
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];

                /* a page must not straddle a CFS_PAGE boundary */
                LASSERT(pg->count > 0);
                LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
                         "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
                         pg->off, pg->count);
#ifdef __LINUX__
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
#else
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u\n", i, page_count);
#endif
                /* all pages in one brw must agree on server-side locking */
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
                                      pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        /* extend the previous niobuf instead of starting
                         * a new one */
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len    = pg->count;
                        niobuf->flags  = pg->flag;
                }
                pg_prev = pg;
        }

        /* niobuf should have been advanced exactly niocount slots */
        LASSERTF((void *)(niobuf - niocount) ==
                lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                               niocount * sizeof(*niobuf)),
                "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg, 
                REQ_REC_OFF + 2, niocount * sizeof(*niobuf)), 
                (void *)(niobuf - niocount));

        /* piggy-back cache/grant accounting onto the request body */
        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (unlikely(cli->cl_checksum)) {
                        body->oa.o_valid |= OBD_MD_FLCKSUM;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM;
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                size[REPLY_REC_OFF + 1] = sizeof(__u32) * niocount;
                ptlrpc_req_set_repsize(req, 3, size);
        } else {
                if (unlikely(cli->cl_checksum))
                        body->oa.o_valid |= OBD_MD_FLCKSUM;
                /* 1 RC for the whole I/O */
                ptlrpc_req_set_repsize(req, 2, size);
        }

        /* stash everything the reply handler will need in rq_async_args */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_brw_async_args *)&req->rq_async_args;
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        INIT_LIST_HEAD(&aa->aa_oaps);

        *reqp = req;
        RETURN (0);

 out:
        ptlrpc_req_finished (req);
        RETURN (rc);
}
1059
/* Diagnose a write checksum mismatch reported by the server by
 * re-checksumming the pages as they are now.  Comparing the fresh
 * checksum against both the client's original and the server's value
 * distinguishes "changed under us" (e.g. mmap IO) from corruption in
 * transit.  Returns 1 on mismatch (caller resends), 0 if confirmed. */
static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                 __u32 client_cksum, __u32 server_cksum, int nob,
                                 obd_count page_count, struct brw_page **pga)
{
        __u32 new_cksum;
        char *msg;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                return 0;
        }

        /* recompute over the same byte range we originally sent */
        new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE);

        if (new_cksum == server_cksum)
                /* pages match what the server saw: data was modified
                 * locally after the original checksum was taken */
                msg = "changed on the client after we checksummed it - "
                      "likely false positive due to mmap IO (bug 11742)";
        else if (new_cksum == client_cksum)
                /* pages unchanged locally: corruption happened in transit */
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original - "
                      "likely false positive due to mmap IO (bug 11742)";

        LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
                           LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
                           "["LPU64"-"LPU64"]\n",
                           msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_generation : 
                                                        (__u64)0,
                           oa->o_id,
                           oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
                           pga[0]->off,
                           pga[page_count-1]->off + pga[page_count-1]->count - 1);
        CERROR("original client csum %x, server csum %x, client csum now %x\n",
               client_cksum, server_cksum, new_cksum);

        return 1;
}
1099
1100 /* Note rc enters this function as number of bytes transferred */
1101 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1102 {
1103         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1104         const lnet_process_id_t *peer =
1105                         &req->rq_import->imp_connection->c_peer;
1106         struct client_obd *cli = aa->aa_cli;
1107         struct ost_body *body;
1108         __u32 client_cksum = 0;
1109         ENTRY;
1110
1111         if (rc < 0 && rc != -EDQUOT)
1112                 RETURN(rc);
1113
1114         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1115         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1116                                   lustre_swab_ost_body);
1117         if (body == NULL) {
1118                 CERROR ("Can't unpack body\n");
1119                 RETURN(-EPROTO);
1120         }
1121
1122         /* set/clear over quota flag for a uid/gid */
1123         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1124             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1125                 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1126                              body->oa.o_gid, body->oa.o_valid,
1127                              body->oa.o_flags);
1128
1129         if (rc < 0)
1130                 RETURN(rc);
1131
1132         if (unlikely(aa->aa_oa->o_valid & OBD_MD_FLCKSUM))
1133                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1134
1135         osc_update_grant(cli, body);
1136
1137         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1138                 if (rc > 0) {
1139                         CERROR ("Unexpected +ve rc %d\n", rc);
1140                         RETURN(-EPROTO);
1141                 }
1142                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1143
1144                 if (unlikely((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) &&
1145                              client_cksum &&
1146                              check_write_checksum(&body->oa, peer, client_cksum,
1147                                                   body->oa.o_cksum,
1148                                                   aa->aa_requested_nob,
1149                                                   aa->aa_page_count,
1150                                                   aa->aa_ppga)))
1151                         RETURN(-EAGAIN);
1152
1153                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1154                                      aa->aa_page_count, aa->aa_ppga);
1155                 GOTO(out, rc);
1156         }
1157
1158         /* The rest of this function executes only for OST_READs */
1159         if (rc > aa->aa_requested_nob) {
1160                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1161                        aa->aa_requested_nob);
1162                 RETURN(-EPROTO);
1163         }
1164
1165         if (rc != req->rq_bulk->bd_nob_transferred) {
1166                 CERROR ("Unexpected rc %d (%d transferred)\n",
1167                         rc, req->rq_bulk->bd_nob_transferred);
1168                 return (-EPROTO);
1169         }
1170
1171         if (rc < aa->aa_requested_nob)
1172                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1173
1174         if (unlikely(body->oa.o_valid & OBD_MD_FLCKSUM)) {
1175                 static int cksum_counter;
1176                 __u32      server_cksum = body->oa.o_cksum;
1177                 char      *via;
1178                 char      *router;
1179
1180                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1181                                                  aa->aa_ppga, OST_READ);
1182
1183                 if (peer->nid == req->rq_bulk->bd_sender) {
1184                         via = router = "";
1185                 } else {
1186                         via = " via ";
1187                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1188                 }
1189                 
1190                 if (server_cksum == ~0 && rc > 0) {
1191                         CERROR("Protocol error: server %s set the 'checksum' "
1192                                "bit, but didn't send a checksum.  Not fatal, "
1193                                "but please tell CFS.\n",
1194                                libcfs_nid2str(peer->nid));
1195                 } else if (server_cksum != client_cksum) {
1196                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1197                                            "%s%s%s inum "LPU64"/"LPU64" object "
1198                                            LPU64"/"LPU64" extent "
1199                                            "["LPU64"-"LPU64"]\n",
1200                                            req->rq_import->imp_obd->obd_name,
1201                                            libcfs_nid2str(peer->nid),
1202                                            via, router,
1203                                            body->oa.o_valid & OBD_MD_FLFID ?
1204                                                 body->oa.o_fid : (__u64)0,
1205                                            body->oa.o_valid & OBD_MD_FLFID ?
1206                                                 body->oa.o_generation :(__u64)0,
1207                                            body->oa.o_id,
1208                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1209                                                 body->oa.o_gr : (__u64)0,
1210                                            aa->aa_ppga[0]->off,
1211                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1212                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1213                                                                         1);
1214                         CERROR("client %x, server %x\n",
1215                                client_cksum, server_cksum);
1216                         cksum_counter = 0;
1217                         aa->aa_oa->o_cksum = client_cksum;
1218                         rc = -EAGAIN;
1219                 } else {
1220                         cksum_counter++;
1221                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1222                         rc = 0;
1223                 }
1224         } else if (unlikely(client_cksum)) {
1225                 static int cksum_missed;
1226
1227                 cksum_missed++;
1228                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1229                         CERROR("Checksum %u requested from %s but not sent\n",
1230                                cksum_missed, libcfs_nid2str(peer->nid));
1231         } else {
1232                 rc = 0;
1233         }
1234 out:
1235         if (rc >= 0)
1236                 memcpy(aa->aa_oa, &body->oa, sizeof(*aa->aa_oa));
1237
1238         RETURN(rc);
1239 }
1240
1241 static int osc_brw_internal(int cmd, struct obd_export *exp,struct obdo *oa,
1242                             struct lov_stripe_md *lsm,
1243                             obd_count page_count, struct brw_page **pga)
1244 {
1245         struct ptlrpc_request *request;
1246         int                    rc;
1247         cfs_waitq_t            waitq;
1248         int                    resends = 0;
1249         struct l_wait_info     lwi;
1250
1251         ENTRY;
1252         init_waitqueue_head(&waitq);
1253
1254 restart_bulk:
1255         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1256                                   page_count, pga, &request);
1257         if (rc != 0)
1258                 return (rc);
1259
1260         rc = ptlrpc_queue_wait(request);
1261
1262         if (rc == -ETIMEDOUT && request->rq_resend) {
1263                 DEBUG_REQ(D_HA, request,  "BULK TIMEOUT");
1264                 ptlrpc_req_finished(request);
1265                 goto restart_bulk;
1266         }
1267
1268         rc = osc_brw_fini_request(request, rc);
1269
1270         ptlrpc_req_finished(request);
1271         if (osc_recoverable_error(rc)) {
1272                 resends++;
1273                 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1274                         CERROR("too many resend retries, returning error\n");
1275                         RETURN(-EIO);
1276                 }
1277                 
1278                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1279                 l_wait_event(waitq, 0, &lwi);
1280
1281                 goto restart_bulk;
1282         }
1283         RETURN(rc);
1284 }
1285
/* Re-issue a BRW whose completion indicated a recoverable error.
 * Builds a fresh request, moves ownership of the pga and async pages
 * from the old request to the new one under cl_loi_list_lock, and adds
 * the new request to the old request's set.  Returns 0 on success,
 * -EIO once the resend limit is exceeded, -EINTR if any waiting async
 * page was interrupted. */
int osc_brw_redo_request(struct ptlrpc_request *request,
                         struct osc_brw_async_args *aa)
{
        struct ptlrpc_request *new_req;
        struct ptlrpc_request_set *set = request->rq_set;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
                CERROR("too many resend retries, returning error\n");
                RETURN(-EIO);
        }

        DEBUG_REQ(D_ERROR, request, "redo for recoverable error");

        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                        OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa,
                                  NULL /* lsm unused by osc currently */,
                                  aa->aa_page_count, aa->aa_ppga, &new_req);
        if (rc)
                RETURN(rc);

        client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);

        /* bail out before transferring anything if an async page was
         * interrupted while waiting on the old request */
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(request == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 request, oap->oap_request);
                        if (oap->oap_interrupted) {
                                client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
                                ptlrpc_req_finished(new_req);
                                RETURN(-EINTR);
                        }
                }
        }
        /* New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it... */
        aa->aa_resends++;
        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
        /* delay each resend so repeated attempts back off linearly */
        new_req->rq_sent = CURRENT_SECONDS + aa->aa_resends;

        new_aa = (struct osc_brw_async_args *)&new_req->rq_async_args;

        INIT_LIST_HEAD(&new_aa->aa_oaps);
        list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
        INIT_LIST_HEAD(&aa->aa_oaps);

        /* repoint each async page's request reference at the new request */
        list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);
                }
        }

        /* use ptlrpc_set_add_req is safe because interpret functions work 
         * in check_set context. only one way exist with access to request 
         * from different thread got -EINTR - this way protected with 
         * cl_loi_list_lock */
        ptlrpc_set_add_req(set, new_req);

        client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);

        DEBUG_REQ(D_INFO, new_req, "new request");
        RETURN(0);
}
1356
1357 static int brw_interpret(struct ptlrpc_request *request, void *data, int rc)
1358 {
1359         struct osc_brw_async_args *aa = data;
1360         int                        i;
1361         ENTRY;
1362
1363         rc = osc_brw_fini_request(request, rc);
1364         CDEBUG(D_INODE, "request %p aa %p rc %d\n", request, aa, rc);   
1365         if (osc_recoverable_error(rc)) {
1366                 rc = osc_brw_redo_request(request, aa);
1367                 if (rc == 0)
1368                         RETURN(0);
1369         }
1370         client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1371         if (lustre_msg_get_opc(request->rq_reqmsg) == OST_WRITE)
1372                 aa->aa_cli->cl_w_in_flight--;
1373         else
1374                 aa->aa_cli->cl_r_in_flight--;
1375
1376         for (i = 0; i < aa->aa_page_count; i++)
1377                 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
1378         client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1379         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1380
1381         RETURN(rc);
1382 }
1383
1384 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1385                           struct lov_stripe_md *lsm, obd_count page_count,
1386                           struct brw_page **pga, struct ptlrpc_request_set *set)
1387 {
1388         struct ptlrpc_request     *request;
1389         struct client_obd         *cli = &exp->exp_obd->u.cli;
1390         int                        rc, i;
1391         struct osc_brw_async_args *aa;
1392         ENTRY;
1393
1394         /* Consume write credits even if doing a sync write -
1395          * otherwise we may run out of space on OST due to grant. */
1396         if (cmd == OBD_BRW_WRITE) {
1397                 client_obd_list_lock(&cli->cl_loi_list_lock);
1398                 for (i = 0; i < page_count; i++) {
1399                         if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
1400                                 osc_consume_write_grant(cli, pga[i]);
1401                 }
1402                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1403         }
1404
1405         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1406                                   page_count, pga, &request);
1407
1408         aa = (struct osc_brw_async_args *)&request->rq_async_args;
1409         if (cmd == OBD_BRW_READ) {
1410                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1411                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1412                 ptlrpc_lprocfs_brw(request, OST_READ, aa->aa_requested_nob);
1413         } else {
1414                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1415                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
1416                                  cli->cl_w_in_flight);
1417                 ptlrpc_lprocfs_brw(request, OST_WRITE, aa->aa_requested_nob);
1418         }
1419
1420         if (rc == 0) {
1421                 request->rq_interpret_reply = brw_interpret;
1422                 ptlrpc_set_add_req(set, request);
1423                 client_obd_list_lock(&cli->cl_loi_list_lock);
1424                 if (cmd == OBD_BRW_READ)
1425                         cli->cl_r_in_flight++;
1426                 else
1427                         cli->cl_w_in_flight++;
1428                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1429         } else if (cmd == OBD_BRW_WRITE) {
1430                 client_obd_list_lock(&cli->cl_loi_list_lock);
1431                 for (i = 0; i < page_count; i++)
1432                         osc_release_write_grant(cli, pga[i], 0);
1433                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1434         }
1435
1436         RETURN (rc);
1437 }
1438
1439 /*
1440  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1441  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1442  * fine for our small page arrays and doesn't require allocation.  its an
1443  * insertion sort that swaps elements that are strides apart, shrinking the
1444  * stride down until its '1' and the array is sorted.
1445  */
1446 static void sort_brw_pages(struct brw_page **array, int num)
1447 {
1448         int stride, i, j;
1449         struct brw_page *tmp;
1450
1451         if (num == 1)
1452                 return;
1453         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1454                 ;
1455
1456         do {
1457                 stride /= 3;
1458                 for (i = stride ; i < num ; i++) {
1459                         tmp = array[i];
1460                         j = i;
1461                         while (j >= stride && array[j-stride]->off > tmp->off) {
1462                                 array[j] = array[j - stride];
1463                                 j -= stride;
1464                         }
1465                         array[j] = tmp;
1466                 }
1467         } while (stride > 1);
1468 }
1469
1470 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1471 {
1472         int count = 1;
1473         int offset;
1474         int i = 0;
1475
1476         LASSERT (pages > 0);
1477         offset = pg[i]->off & (~CFS_PAGE_MASK);
1478
1479         for (;;) {
1480                 pages--;
1481                 if (pages == 0)         /* that's all */
1482                         return count;
1483
1484                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1485                         return count;   /* doesn't end on page boundary */
1486
1487                 i++;
1488                 offset = pg[i]->off & (~CFS_PAGE_MASK);
1489                 if (offset != 0)        /* doesn't start on page boundary */
1490                         return count;
1491
1492                 count++;
1493         }
1494 }
1495
1496 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1497 {
1498         struct brw_page **ppga;
1499         int i;
1500
1501         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1502         if (ppga == NULL)
1503                 return NULL;
1504
1505         for (i = 0; i < count; i++)
1506                 ppga[i] = pga + i;
1507         return ppga;
1508 }
1509
/* Free a pointer array built by osc_build_ppga(); 'count' must be the
 * page count the array was allocated with (not a partial count). */
static void osc_release_ppga(struct brw_page **ppga, obd_count count)
{
        LASSERT(ppga != NULL);
        OBD_FREE(ppga, sizeof(*ppga) * count);
}
1515
/* Top-level synchronous brw entry point.  Sorts the pages by offset,
 * then issues one blocking RPC per chunk, where each chunk is capped at
 * cl_max_pages_per_rpc pages and must be an unfragmented run (see
 * max_unfragmented_pages()).  OBD_BRW_CHECK only probes whether the
 * import could accept I/O at all. */
static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
                   obd_count page_count, struct brw_page *pga,
                   struct obd_trans_info *oti)
{
        struct obdo *saved_oa = NULL;
        struct brw_page **ppga, **orig;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct client_obd *cli = &imp->imp_obd->u.cli;
        int rc, page_count_orig;
        ENTRY;

        if (cmd & OBD_BRW_CHECK) {
                /* The caller just wants to know if there's a chance that this
                 * I/O can succeed */

                if (imp == NULL || imp->imp_invalid)
                        RETURN(-EIO);
                RETURN(0);
        }

        /* test_brw with a failed create can trip this, maybe others. */
        LASSERT(cli->cl_max_pages_per_rpc);

        rc = 0;

        orig = ppga = osc_build_ppga(pga, page_count);
        if (ppga == NULL)
                RETURN(-ENOMEM);
        page_count_orig = page_count;

        sort_brw_pages(ppga, page_count);
        while (page_count) {
                obd_count pages_per_brw;

                if (page_count > cli->cl_max_pages_per_rpc)
                        pages_per_brw = cli->cl_max_pages_per_rpc;
                else
                        pages_per_brw = page_count;

                /* further limit the chunk to an unfragmented page run */
                pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);

                if (saved_oa != NULL) {
                        /* restore previously saved oa */
                        *oinfo->oi_oa = *saved_oa;
                } else if (page_count > pages_per_brw) {
                        /* save a copy of oa (brw will clobber it) */
                        OBDO_ALLOC(saved_oa);
                        if (saved_oa == NULL)
                                GOTO(out, rc = -ENOMEM);
                        *saved_oa = *oinfo->oi_oa;
                }

                rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
                                      pages_per_brw, ppga);

                if (rc != 0)
                        break;

                page_count -= pages_per_brw;
                ppga += pages_per_brw;
        }

out:
        /* free with the original base and count - ppga was advanced above */
        osc_release_ppga(orig, page_count_orig);

        if (saved_oa != NULL)
                OBDO_FREE(saved_oa);

        RETURN(rc);
}
1586
/* Asynchronous counterpart of osc_brw(): instead of blocking, queue one or
 * more brw RPCs on @set via async_internal().  Each chunk normally gets its
 * own copy of the page-pointer slice; when a single RPC covers the whole
 * request, the original array is handed over and async_internal() becomes
 * responsible for releasing it (orig is cleared below to record that). */
static int osc_brw_async(int cmd, struct obd_export *exp,
                         struct obd_info *oinfo, obd_count page_count,
                         struct brw_page *pga, struct obd_trans_info *oti,
                         struct ptlrpc_request_set *set)
{
        struct brw_page **ppga, **orig;
        int page_count_orig;
        int rc = 0;
        ENTRY;

        if (cmd & OBD_BRW_CHECK) {
                /* The caller just wants to know if there's a chance that this
                 * I/O can succeed */
                struct obd_import *imp = class_exp2cliimp(exp);

                if (imp == NULL || imp->imp_invalid)
                        RETURN(-EIO);
                RETURN(0);
        }

        orig = ppga = osc_build_ppga(pga, page_count);
        if (ppga == NULL)
                RETURN(-ENOMEM);
        page_count_orig = page_count;

        sort_brw_pages(ppga, page_count);
        while (page_count) {
                struct brw_page **copy;
                obd_count pages_per_brw;

                pages_per_brw = min_t(obd_count, page_count,
                    class_exp2cliimp(exp)->imp_obd->u.cli.cl_max_pages_per_rpc);

                /* don't let one RPC span a fragmented page boundary */
                pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);

                /* use ppga only if single RPC is going to fly */
                if (pages_per_brw != page_count_orig || ppga != orig) {
                        OBD_ALLOC(copy, pages_per_brw * sizeof(*copy));
                        if (copy == NULL)
                                GOTO(out, rc = -ENOMEM);
                        memcpy(copy, ppga, pages_per_brw * sizeof(*copy));
                } else
                        copy = ppga;

                rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
                                    pages_per_brw, copy, set);

                if (rc != 0) {
                        /* our private chunk copy was never handed over */
                        if (copy != ppga)
                                OBD_FREE(copy, pages_per_brw * sizeof(*copy));
                        break;
                }

                if (copy == orig) {
                        /* we passed it to async_internal() which is
                         * now responsible for releasing memory */
                        orig = NULL;
                }

                page_count -= pages_per_brw;
                ppga += pages_per_brw;
        }
out:
        if (orig)
                osc_release_ppga(orig, page_count_orig);
        RETURN(rc);
}
1654
1655 static void osc_check_rpcs(struct client_obd *cli);
1656
/* The companion to osc_enter_cache(), called when @oap is no longer part of
 * the dirty accounting.  Writeback completes or truncate happens before
 * writing starts.  Must be called with the loi lock held. */
static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
                           int sent)
{
        /* hand the page's write grant back; @sent is forwarded unchanged to
         * the grant accounting */
        osc_release_write_grant(cli, &oap->oap_brw_page, sent);
}
1665
1666 /* This maintains the lists of pending pages to read/write for a given object
1667  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1668  * to quickly find objects that are ready to send an RPC. */
1669 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1670                          int cmd)
1671 {
1672         int optimal;
1673         ENTRY;
1674
1675         if (lop->lop_num_pending == 0)
1676                 RETURN(0);
1677
1678         /* if we have an invalid import we want to drain the queued pages
1679          * by forcing them through rpcs that immediately fail and complete
1680          * the pages.  recovery relies on this to empty the queued pages
1681          * before canceling the locks and evicting down the llite pages */
1682         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1683                 RETURN(1);
1684
1685         /* stream rpcs in queue order as long as as there is an urgent page
1686          * queued.  this is our cheap solution for good batching in the case
1687          * where writepage marks some random page in the middle of the file
1688          * as urgent because of, say, memory pressure */
1689         if (!list_empty(&lop->lop_urgent)) {
1690                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1691                 RETURN(1);
1692         }
1693
1694         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1695         optimal = cli->cl_max_pages_per_rpc;
1696         if (cmd & OBD_BRW_WRITE) {
1697                 /* trigger a write rpc stream as long as there are dirtiers
1698                  * waiting for space.  as they're waiting, they're not going to
1699                  * create more pages to coallesce with what's waiting.. */
1700                 if (!list_empty(&cli->cl_cache_waiters)) {
1701                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1702                         RETURN(1);
1703                 }
1704
1705                 /* +16 to avoid triggering rpcs that would want to include pages
1706                  * that are being queued but which can't be made ready until
1707                  * the queuer finishes with the page. this is a wart for
1708                  * llite::commit_write() */
1709                 optimal += 16;
1710         }
1711         if (lop->lop_num_pending >= optimal)
1712                 RETURN(1);
1713
1714         RETURN(0);
1715 }
1716
/* Reconcile @item's membership of @list with @should_be_on: link it when it
 * ought to be listed but isn't, unlink it when it's listed but shouldn't be,
 * and do nothing when the state already matches. */
static void on_list(struct list_head *item, struct list_head *list,
                    int should_be_on)
{
        int on_now = !list_empty(item);

        if (should_be_on && !on_now)
                list_add_tail(item, list);
        else if (!should_be_on && on_now)
                list_del_init(item);
}
1725
1726 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1727  * can find pages to build into rpcs quickly */
1728 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1729 {
1730         on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
1731                 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
1732                 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1733
1734         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1735                 loi->loi_write_lop.lop_num_pending);
1736
1737         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1738                 loi->loi_read_lop.lop_num_pending);
1739 }
1740
1741 static void lop_update_pending(struct client_obd *cli,
1742                                struct loi_oap_pages *lop, int cmd, int delta)
1743 {
1744         lop->lop_num_pending += delta;
1745         if (cmd & OBD_BRW_WRITE)
1746                 cli->cl_pending_w_pages += delta;
1747         else
1748                 cli->cl_pending_r_pages += delta;
1749 }
1750
/* this is called when a sync waiter receives an interruption.  Its job is to
 * get the caller woken as soon as possible.  If its page hasn't been put in an
 * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
 * desiring interruption which will forcefully complete the rpc once the rpc
 * has timed out */
static void osc_occ_interrupted(struct oig_callback_context *occ)
{
        struct osc_async_page *oap;
        struct loi_oap_pages *lop;
        struct lov_oinfo *loi;
        ENTRY;

        /* XXX member_of() -- recover the oap embedding this callback ctx */
        oap = list_entry(occ, struct osc_async_page, oap_occ);

        client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);

        oap->oap_interrupted = 1;

        /* ok, it's been put in an rpc. only one oap gets a request reference */
        if (oap->oap_request != NULL) {
                ptlrpc_mark_interrupted(oap->oap_request);
                ptlrpcd_wake(oap->oap_request);
                GOTO(unlock, 0);
        }

        /* we don't get interruption callbacks until osc_trigger_group_io()
         * has been called and put the sync oaps in the pending/urgent lists.*/
        if (!list_empty(&oap->oap_pending_item)) {
                /* not in an rpc yet: dequeue immediately and complete the
                 * group waiter with -EINTR */
                list_del_init(&oap->oap_pending_item);
                list_del_init(&oap->oap_urgent_item);

                loi = oap->oap_loi;
                lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
                        &loi->loi_write_lop : &loi->loi_read_lop;
                lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
                loi_list_maint(oap->oap_cli, oap->oap_loi);

                oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
                oap->oap_oig = NULL;
        }

unlock:
        client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
}
1796
1797 /* this is trying to propogate async writeback errors back up to the
1798  * application.  As an async write fails we record the error code for later if
1799  * the app does an fsync.  As long as errors persist we force future rpcs to be
1800  * sync so that the app can get a sync error and break the cycle of queueing
1801  * pages for which writeback will fail. */
1802 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
1803                            int rc)
1804 {
1805         if (rc) {
1806                 if (!ar->ar_rc)
1807                         ar->ar_rc = rc;
1808
1809                 ar->ar_force_sync = 1;
1810                 ar->ar_min_xid = ptlrpc_sample_next_xid();
1811                 return;
1812
1813         }
1814
1815         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
1816                 ar->ar_force_sync = 0;
1817 }
1818
1819 static void osc_oap_to_pending(struct osc_async_page *oap)
1820 {
1821         struct loi_oap_pages *lop;
1822
1823         if (oap->oap_cmd & OBD_BRW_WRITE)
1824                 lop = &oap->oap_loi->loi_write_lop;
1825         else
1826                 lop = &oap->oap_loi->loi_read_lop;
1827
1828         if (oap->oap_async_flags & ASYNC_URGENT)
1829                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1830         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1831         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
1832 }
1833
/* this must be called holding the loi list lock to give coverage to exit_cache,
 * async_flag maintenance, and oap_request */
static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
                              struct osc_async_page *oap, int sent, int rc)
{
        __u64 xid = 0;

        ENTRY;
        /* drop the request reference taken when this oap went into an rpc,
         * keeping its xid for the async-rc bookkeeping below */
        if (oap->oap_request != NULL) {
                xid = ptlrpc_req_xid(oap->oap_request);
                ptlrpc_req_finished(oap->oap_request);
                oap->oap_request = NULL;
        }

        oap->oap_async_flags = 0;
        oap->oap_interrupted = 0;

        if (oap->oap_cmd & OBD_BRW_WRITE) {
                /* record write results at both client and object scope */
                osc_process_ar(&cli->cl_ar, xid, rc);
                osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
        }

        if (rc == 0 && oa != NULL) {
                /* on success, mirror server-returned attributes into the
                 * cached lvb */
                if (oa->o_valid & OBD_MD_FLBLOCKS)
                        oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
                if (oa->o_valid & OBD_MD_FLMTIME)
                        oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
                if (oa->o_valid & OBD_MD_FLATIME)
                        oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
                if (oa->o_valid & OBD_MD_FLCTIME)
                        oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
        }

        if (oap->oap_oig) {
                /* group IO: report to the group and skip the per-page
                 * caller completion callback */
                osc_exit_cache(cli, oap, sent);
                oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
                oap->oap_oig = NULL;
                EXIT;
                return;
        }

        rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
                                                oap->oap_cmd, oa, rc);

        /* ll_ap_completion (from llite) drops PG_locked. so, a new
         * I/O on the page could start, but OSC calls it under lock
         * and thus we can add oap back to pending safely */
        if (rc)
                /* upper layer wants to leave the page on pending queue */
                osc_oap_to_pending(oap);
        else
                osc_exit_cache(cli, oap, sent);
        EXIT;
}
1888
/* Interpret callback for an async brw rpc built by osc_send_oap_rpc():
 * finishes the request, retries recoverable failures, runs completion on
 * every oap the rpc carried, and kicks the cache waiters / rpc engine. */
static int brw_interpret_oap(struct ptlrpc_request *request, void *data, int rc)
{
        struct osc_brw_async_args *aa = data;
        struct osc_async_page *oap, *tmp;
        struct client_obd *cli;
        ENTRY;

        rc = osc_brw_fini_request(request, rc);
        CDEBUG(D_INODE, "request %p aa %p rc %d\n", request, aa, rc);

        /* recoverable errors are resent instead of completing the pages;
         * on a successful redo we are done here */
        if (osc_recoverable_error(rc)) {
                rc = osc_brw_redo_request(request, aa);
                if (rc == 0)
                        RETURN(0);
        }

        cli = aa->aa_cli;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (lustre_msg_get_opc(request->rq_reqmsg) == OST_WRITE)
                cli->cl_w_in_flight--;
        else
                cli->cl_r_in_flight--;

        /* the caller may re-use the oap after the completion call so
         * we need to clean it up a little */
        list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
                list_del_init(&oap->oap_rpc_item);
                osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
        }

        osc_wake_cache_waiters(cli);
        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        /* release the obdo and page-pointer array owned by this rpc */
        OBDO_FREE(aa->aa_oa);

        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
        RETURN(rc);
}
1931
/* Turn the oaps queued on @rpc_list into a single brw ptlrpc request.
 * On success the oaps are spliced onto the request's async args and
 * @rpc_list is reinitialized to empty; on failure an ERR_PTR() is returned
 * and @rpc_list is left intact for the caller to clean up. */
static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
                                            struct list_head *rpc_list,
                                            int page_count, int cmd)
{
        struct ptlrpc_request *req;
        struct brw_page **pga = NULL;
        struct osc_brw_async_args *aa;
        struct obdo *oa = NULL;
        struct obd_async_page_ops *ops = NULL;
        void *caller_data = NULL;
        struct osc_async_page *oap;
        int i, rc;

        ENTRY;
        LASSERT(!list_empty(rpc_list));

        OBD_ALLOC(pga, sizeof(*pga) * page_count);
        if (pga == NULL)
                RETURN(ERR_PTR(-ENOMEM));

        OBDO_ALLOC(oa);
        if (oa == NULL)
                GOTO(out, req = ERR_PTR(-ENOMEM));

        i = 0;
        list_for_each_entry(oap, rpc_list, oap_rpc_item) {
                /* caller ops/data are taken from the first oap; assumed the
                 * same for every page in one rpc -- TODO confirm with callers */
                if (ops == NULL) {
                        ops = oap->oap_caller_ops;
                        caller_data = oap->oap_caller_data;
                }
                pga[i] = &oap->oap_brw_page;
                pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
                CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
                       pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
                i++;
        }

        /* always get the data for the obdo for the rpc */
        LASSERT(ops != NULL);
        ops->ap_fill_obdo(caller_data, cmd, oa);

        sort_brw_pages(pga, page_count);
        rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count, pga, &req);
        if (rc != 0) {
                CERROR("prep_req failed: %d\n", rc);
                GOTO(out, req = ERR_PTR(rc));
        }

        /* Need to update the timestamps after the request is built in case
         * we race with setattr (locally or in queue at OST).  If OST gets
         * later setattr before earlier BRW (as determined by the request xid),
         * the OST will not use BRW timestamps.  Sadly, there is no obvious
         * way to do this in a single call.  bug 10150 */
        ops->ap_update_obdo(caller_data, cmd, oa,
                            OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_brw_async_args *)&req->rq_async_args;
        INIT_LIST_HEAD(&aa->aa_oaps);
        list_splice(rpc_list, &aa->aa_oaps);
        INIT_LIST_HEAD(rpc_list);

out:
        /* oa and pga are freed here only on failure; on success they are
         * presumably referenced from the request and released in
         * brw_interpret_oap() (which frees aa_oa/aa_ppga) -- see that fn */
        if (IS_ERR(req)) {
                if (oa)
                        OBDO_FREE(oa);
                if (pga)
                        OBD_FREE(pga, sizeof(*pga) * page_count);
        }
        RETURN(req);
}
2003
/* Build and fire a single read or write RPC from the pages queued on @lop.
 *
 * the loi lock is held across this function but it's allowed to release
 * and reacquire it during its work (it is dropped around osc_build_req()
 * and retaken before the in-flight accounting is updated).
 *
 * Returns 1 if an RPC was sent, 0 if no eligible pages were found, or a
 * negative errno if building the request failed. */
static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
                            int cmd, struct loi_oap_pages *lop)
{
        struct ptlrpc_request *req;
        obd_count page_count = 0;
        struct osc_async_page *oap = NULL, *tmp;
        struct osc_brw_async_args *aa;
        struct obd_async_page_ops *ops;
        CFS_LIST_HEAD(rpc_list);
        unsigned int ending_offset;
        unsigned  starting_offset = 0;
        int srvlock = 0;
        ENTRY;

        /* first we find the pages we're allowed to work with */
        list_for_each_entry_safe(oap, tmp, &lop->lop_pending, oap_pending_item){
                ops = oap->oap_caller_ops;

                LASSERT(oap->oap_magic == OAP_MAGIC);

                /* pages with differing SRVLOCK flags can't share an rpc */
                if (page_count != 0 &&
                    srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
                        CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
                               " oap %p, page %p, srvlock %u\n",
                               oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
                        break;
                }
                /* in llite being 'ready' equates to the page being locked
                 * until completion unlocks it.  commit_write submits a page
                 * as not ready because its unlock will happen unconditionally
                 * as the call returns.  if we race with commit_write giving
                 * us that page we dont' want to create a hole in the page
                 * stream, so we stop and leave the rpc to be fired by
                 * another dirtier or kupdated interval (the not ready page
                 * will still be on the dirty list).  we could call in
                 * at the end of ll_file_write to process the queue again. */
                if (!(oap->oap_async_flags & ASYNC_READY)) {
                        int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
                        if (rc < 0)
                                CDEBUG(D_INODE, "oap %p page %p returned %d "
                                                "instead of ready\n", oap,
                                                oap->oap_page, rc);
                        switch (rc) {
                        case -EAGAIN:
                                /* llite is telling us that the page is still
                                 * in commit_write and that we should try
                                 * and put it in an rpc again later.  we
                                 * break out of the loop so we don't create
                                 * a hole in the sequence of pages in the rpc
                                 * stream.*/
                                oap = NULL;
                                break;
                        case -EINTR:
                                /* the io isn't needed.. tell the checks
                                 * below to complete the rpc with EINTR */
                                oap->oap_async_flags |= ASYNC_COUNT_STABLE;
                                oap->oap_count = -EINTR;
                                break;
                        case 0:
                                oap->oap_async_flags |= ASYNC_READY;
                                break;
                        default:
                                LASSERTF(0, "oap %p page %p returned %d "
                                            "from make_ready\n", oap,
                                            oap->oap_page, rc);
                                break;
                        }
                }
                /* NULLed above on -EAGAIN: stop scanning the pending list */
                if (oap == NULL)
                        break;
                /*
                 * Page submitted for IO has to be locked. Either by
                 * ->ap_make_ready() or by higher layers.
                 *
                 * XXX nikita: this assertion should be adjusted when lustre
                 * starts using PG_writeback for pages being written out.
                 */
#if defined(__KERNEL__) && defined(__LINUX__)
                LASSERT(PageLocked(oap->oap_page));
#endif
                /* If there is a gap at the start of this page, it can't merge
                 * with any previous page, so we'll hand the network a
                 * "fragmented" page array that it can't transfer in 1 RDMA */
                if (page_count != 0 && oap->oap_page_off != 0)
                        break;

                /* take the page out of our book-keeping */
                list_del_init(&oap->oap_pending_item);
                lop_update_pending(cli, lop, cmd, -1);
                list_del_init(&oap->oap_urgent_item);

                if (page_count == 0)
                        starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
                                          (PTLRPC_MAX_BRW_SIZE - 1);

                /* ask the caller for the size of the io as the rpc leaves. */
                if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
                        oap->oap_count =
                                ops->ap_refresh_count(oap->oap_caller_data,cmd);
                if (oap->oap_count <= 0) {
                        CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
                               oap->oap_count);
                        osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
                        continue;
                }

                /* now put the page back in our accounting */
                list_add_tail(&oap->oap_rpc_item, &rpc_list);
                if (page_count == 0)
                        srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
                if (++page_count >= cli->cl_max_pages_per_rpc)
                        break;

                /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
                 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
                 * have the same alignment as the initial writes that allocated
                 * extents on the server. */
                ending_offset = (oap->oap_obj_off + oap->oap_page_off +
                                 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
                if (ending_offset == 0)
                        break;

                /* If there is a gap at the end of this page, it can't merge
                 * with any subsequent pages, so we'll hand the network a
                 * "fragmented" page array that it can't transfer in 1 RDMA */
                if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
                        break;
        }

        osc_wake_cache_waiters(cli);

        if (page_count == 0)
                RETURN(0);

        loi_list_maint(cli, loi);

        client_obd_list_unlock(&cli->cl_loi_list_lock);

        req = osc_build_req(cli, &rpc_list, page_count, cmd);
        if (IS_ERR(req)) {
                /* this should happen rarely and is pretty bad, it makes the
                 * pending list not follow the dirty order */
                client_obd_list_lock(&cli->cl_loi_list_lock);
                list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
                        list_del_init(&oap->oap_rpc_item);

                        /* queued sync pages can be torn down while the pages
                         * were between the pending list and the rpc */
                        if (oap->oap_interrupted) {
                                CDEBUG(D_INODE, "oap %p interrupted\n", oap);
                                osc_ap_completion(cli, NULL, oap, 0,
                                                  oap->oap_count);
                                continue;
                        }
                        osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
                }
                loi_list_maint(cli, loi);
                RETURN(PTR_ERR(req));
        }

        /* update the per-direction lproc histograms before the send */
        aa = (struct osc_brw_async_args *)&req->rq_async_args;
        if (cmd == OBD_BRW_READ) {
                lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
                                      (starting_offset >> CFS_PAGE_SHIFT) + 1);
                ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
        } else {
                lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_write_rpc_hist,
                                 cli->cl_w_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
                                      (starting_offset >> CFS_PAGE_SHIFT) + 1);
                ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);

        if (cmd == OBD_BRW_READ)
                cli->cl_r_in_flight++;
        else
                cli->cl_w_in_flight++;

        /* queued sync pages can be torn down while the pages
         * were between the pending list and the rpc */
        tmp = NULL;
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                /* only one oap gets a request reference */
                if (tmp == NULL)
                        tmp = oap;
                if (oap->oap_interrupted && !req->rq_intr) {
                        CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
                               oap, req);
                        ptlrpc_mark_interrupted(req);
                }
        }
        if (tmp != NULL)
                tmp->oap_request = ptlrpc_request_addref(req);

        DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
                  page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);

        req->rq_interpret_reply = brw_interpret_oap;
        ptlrpcd_add_req(req);
        RETURN(1);
}
2212
/* Debug helper: dump an loi's readiness flag and per-direction pending and
 * urgent state together with a caller-supplied format string and args. */
#define LOI_DEBUG(LOI, STR, args...)                                     \
        CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
               !list_empty(&(LOI)->loi_cli_item),                        \
               (LOI)->loi_write_lop.lop_num_pending,                     \
               !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
               (LOI)->loi_read_lop.lop_num_pending,                      \
               !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
               args)                                                     \

2222 /* This is called by osc_check_rpcs() to find which objects have pages that
2223  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2224 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2225 {
2226         ENTRY;
2227         /* first return all objects which we already know to have
2228          * pages ready to be stuffed into rpcs */
2229         if (!list_empty(&cli->cl_loi_ready_list))
2230                 RETURN(list_entry(cli->cl_loi_ready_list.next,
2231                                   struct lov_oinfo, loi_cli_item));
2232
2233         /* then if we have cache waiters, return all objects with queued
2234          * writes.  This is especially important when many small files
2235          * have filled up the cache and not been fired into rpcs because
2236          * they don't pass the nr_pending/object threshhold */
2237         if (!list_empty(&cli->cl_cache_waiters) &&
2238             !list_empty(&cli->cl_loi_write_list))
2239                 RETURN(list_entry(cli->cl_loi_write_list.next,
2240                                   struct lov_oinfo, loi_write_item));
2241
2242         /* then return all queued objects when we have an invalid import
2243          * so that they get flushed */
2244         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2245                 if (!list_empty(&cli->cl_loi_write_list))
2246                         RETURN(list_entry(cli->cl_loi_write_list.next,
2247                                           struct lov_oinfo, loi_write_item));
2248                 if (!list_empty(&cli->cl_loi_read_list))
2249                         RETURN(list_entry(cli->cl_loi_read_list.next,
2250                                           struct lov_oinfo, loi_read_item));
2251         }
2252         RETURN(NULL);
2253 }
2254
/* Walk the objects that have queued pages and fire read/write RPCs until we
 * reach cl_max_rpcs_in_flight or run out of ready work.
 * Called with cli->cl_loi_list_lock held; osc_send_oap_rpc() is expected to
 * operate under the same lock. */
static void osc_check_rpcs(struct client_obd *cli)
{
        struct lov_oinfo *loi;
        int rc = 0, race_counter = 0;
        ENTRY;

        while ((loi = osc_next_loi(cli)) != NULL) {
                LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));

                /* respect the per-client cap on concurrent RPCs */
                if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
                        break;

                /* attempt some read/write balancing by alternating between
                 * reads and writes in an object.  The makes_rpc checks here
                 * would be redundant if we were getting read/write work items
                 * instead of objects.  we don't want send_oap_rpc to drain a
                 * partial read pending queue when we're given this object to
                 * do io on writes while there are cache waiters */
                if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
                        rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
                                              &loi->loi_write_lop);
                        if (rc < 0)
                                break;
                        if (rc > 0)
                                race_counter = 0;
                        else
                                race_counter++;
                }
                if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
                        rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
                                              &loi->loi_read_lop);
                        if (rc < 0)
                                break;
                        if (rc > 0)
                                race_counter = 0;
                        else
                                race_counter++;
                }

                /* attempt some inter-object balancing by issueing rpcs
                 * for each object in turn */
                if (!list_empty(&loi->loi_cli_item))
                        list_del_init(&loi->loi_cli_item);
                if (!list_empty(&loi->loi_write_item))
                        list_del_init(&loi->loi_write_item);
                if (!list_empty(&loi->loi_read_item))
                        list_del_init(&loi->loi_read_item);

                /* requeue the object on whichever lists it still belongs on */
                loi_list_maint(cli, loi);

                /* send_oap_rpc fails with 0 when make_ready tells it to
                 * back off.  llite's make_ready does this when it tries
                 * to lock a page queued for write that is already locked.
                 * we want to try sending rpcs from many objects, but we
                 * don't want to spin failing with 0.  */
                if (race_counter == 10)
                        break;
        }
        EXIT;
}
2316
2317 /* we're trying to queue a page in the osc so we're subject to the
2318  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2319  * If the osc's queued pages are already at that limit, then we want to sleep
2320  * until there is space in the osc's queue for us.  We also may be waiting for
2321  * write credits from the OST if there are RPCs in flight that may return some
2322  * before we fall back to sync writes.
2323  *
2324  * We need this know our allocation was granted in the presence of signals */
2325 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2326 {
2327         int rc;
2328         ENTRY;
2329         client_obd_list_lock(&cli->cl_loi_list_lock);
2330         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2331         client_obd_list_unlock(&cli->cl_loi_list_lock);
2332         RETURN(rc);
2333 };
2334
/* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
 * grant or cache space.
 *
 * Returns 0 and consumes one page of write grant on success, -EDQUOT to
 * force the caller to fall back to sync I/O, -EINTR if interrupted while
 * waiting, or the rc stored by the grant path in ocw.ocw_rc. */
static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
                           struct osc_async_page *oap)
{
        struct osc_cache_waiter ocw;
        struct l_wait_info lwi = { 0 };
        ENTRY;

        CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
               "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
               cli->cl_dirty_max, obd_max_dirty_pages,
               cli->cl_lost_grant, cli->cl_avail_grant);

        /* force the caller to try sync io.  this can jump the list
         * of queued writes and create a discontiguous rpc stream */
        if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
            loi->loi_ar.ar_force_sync)
                RETURN(-EDQUOT);

        /* Hopefully normal case - cache space and write credits available */
        if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
            (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
            (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
                /* account for ourselves */
                osc_consume_write_grant(cli, &oap->oap_brw_page);
                RETURN(0);
        }

        /* Make sure that there are write rpcs in flight to wait for.  This
         * is a little silly as this object may not have any pending but
         * other objects sure might. */
        if (cli->cl_w_in_flight) {
                /* register as a cache waiter and kick RPCs so a reply can
                 * eventually return grant and wake us */
                list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
                cfs_waitq_init(&ocw.ocw_waitq);
                ocw.ocw_oap = oap;
                ocw.ocw_rc = 0;

                loi_list_maint(cli, loi);
                osc_check_rpcs(cli);
                /* drop the lock across the sleep; caller expects it retaken */
                client_obd_list_unlock(&cli->cl_loi_list_lock);

                CDEBUG(D_CACHE, "sleeping for cache space\n");
                l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);

                client_obd_list_lock(&cli->cl_loi_list_lock);
                /* still on the waiter list means we were woken without being
                 * granted (e.g. no write RPCs left) -- treat as interrupted */
                if (!list_empty(&ocw.ocw_entry)) {
                        list_del(&ocw.ocw_entry);
                        RETURN(-EINTR);
                }
                RETURN(ocw.ocw_rc);
        }

        RETURN(-EDQUOT);
}
2390
2391 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2392                         struct lov_oinfo *loi, cfs_page_t *page,
2393                         obd_off offset, struct obd_async_page_ops *ops,
2394                         void *data, void **res)
2395 {
2396         struct osc_async_page *oap;
2397         ENTRY;
2398
2399         if (!page)
2400                 return size_round(sizeof(*oap));
2401
2402         oap = *res;
2403         oap->oap_magic = OAP_MAGIC;
2404         oap->oap_cli = &exp->exp_obd->u.cli;
2405         oap->oap_loi = loi;
2406
2407         oap->oap_caller_ops = ops;
2408         oap->oap_caller_data = data;
2409
2410         oap->oap_page = page;
2411         oap->oap_obj_off = offset;
2412
2413         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2414         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2415         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2416
2417         oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2418
2419         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2420         RETURN(0);
2421 }
2422
2423 struct osc_async_page *oap_from_cookie(void *cookie)
2424 {
2425         struct osc_async_page *oap = cookie;
2426         if (oap->oap_magic != OAP_MAGIC)
2427                 return ERR_PTR(-EINVAL);
2428         return oap;
2429 };
2430
/* Queue one page of async I/O on the object.  Validates the cookie, rejects
 * work on an invalid import or an already-queued page, optionally performs a
 * quota check for writes, reserves cache/grant space for writes, and then
 * places the page on the pending lists and kicks RPC generation.
 * Returns 0 on success or a negative errno. */
static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
                              struct lov_oinfo *loi, void *cookie,
                              int cmd, obd_off off, int count,
                              obd_flag brw_flags, enum async_flags async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        /* a page may only be on one of the pending/urgent/rpc lists at a
         * time; being on any of them means it is already queued */
        if (!list_empty(&oap->oap_pending_item) ||
            !list_empty(&oap->oap_urgent_item) ||
            !list_empty(&oap->oap_rpc_item))
                RETURN(-EBUSY);

        /* check if the file's owner/group is over quota */
#ifdef HAVE_QUOTA_SUPPORT
        if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
                struct obd_async_page_ops *ops;
                struct obdo *oa;

                OBDO_ALLOC(oa);
                if (oa == NULL)
                        RETURN(-ENOMEM);

                /* have the caller fill in uid/gid so quota can be checked */
                ops = oap->oap_caller_ops;
                ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
                if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
                    NO_QUOTA)
                        rc = -EDQUOT;

                OBDO_FREE(oa);
                if (rc)
                        RETURN(rc);
        }
#endif

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        client_obd_list_lock(&cli->cl_loi_list_lock);

        oap->oap_cmd = cmd;
        oap->oap_page_off = off;
        oap->oap_count = count;
        oap->oap_brw_flags = brw_flags;
        oap->oap_async_flags = async_flags;

        if (cmd & OBD_BRW_WRITE) {
                /* may drop and retake the list lock while waiting for grant */
                rc = osc_enter_cache(cli, loi, oap);
                if (rc) {
                        client_obd_list_unlock(&cli->cl_loi_list_lock);
                        RETURN(rc);
                }
        }

        osc_oap_to_pending(oap);
        loi_list_maint(cli, loi);

        LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
                  cmd);

        /* see if the new page makes an RPC worth sending right away */
        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        RETURN(0);
}
2505
/* aka (~was & now & flag), but this is more clear :)
 * True when "flag" is being newly set: absent in "was", present in "now".
 * Arguments fully parenthesized so compound expressions parse correctly. */
#define SETTING(was, now, flag) (!((was) & (flag)) && ((now) & (flag)))
2508
/* Raise async flags (ASYNC_READY / ASYNC_URGENT) on an already-queued page.
 * Flags are only ever added here, never cleared.  Returns 0 on success,
 * -EINVAL if the page is not on a pending list, or -EIO on a dead import. */
static int osc_set_async_flags(struct obd_export *exp,
                               struct lov_stripe_md *lsm,
                               struct lov_oinfo *loi, void *cookie,
                               obd_flag async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct loi_oap_pages *lop;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        /*
         * bug 7311: OST-side locking is only supported for liblustre for now
         * (and liblustre never calls obd_set_async_flags(). I hope.), generic
         * implementation has to handle case where OST-locked page was picked
         * up by, e.g., ->writepage().
         */
        LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
        LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
                                     * tread here. */

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        /* pick the read or write queue matching the page's command */
        if (oap->oap_cmd & OBD_BRW_WRITE) {
                lop = &loi->loi_write_lop;
        } else {
                lop = &loi->loi_read_lop;
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);

        if (list_empty(&oap->oap_pending_item))
                GOTO(out, rc = -EINVAL);

        /* nothing to do if all requested flags are already set */
        if ((oap->oap_async_flags & async_flags) == async_flags)
                GOTO(out, rc = 0);

        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
                oap->oap_async_flags |= ASYNC_READY;

        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
                /* only move to the urgent list if an RPC isn't already
                 * carrying this page */
                if (list_empty(&oap->oap_rpc_item)) {
                        list_add(&oap->oap_urgent_item, &lop->lop_urgent);
                        loi_list_maint(cli, loi);
                }
        }

        LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
                        oap->oap_async_flags);
out:
        /* newly-urgent pages may make an RPC worth sending now */
        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        RETURN(rc);
}
2571
/* Queue a page as part of an I/O group.  Unlike osc_queue_async_io() the
 * page goes on the group's own pending list and is not dispatched until
 * osc_trigger_group_io() moves it to the regular pending lists.
 * Returns 0 on success or a negative errno. */
static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
                             struct lov_oinfo *loi,
                             struct obd_io_group *oig, void *cookie,
                             int cmd, obd_off off, int count,
                             obd_flag brw_flags,
                             obd_flag async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct osc_async_page *oap;
        struct loi_oap_pages *lop;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        /* refuse pages that are already queued anywhere */
        if (!list_empty(&oap->oap_pending_item) ||
            !list_empty(&oap->oap_urgent_item) ||
            !list_empty(&oap->oap_rpc_item))
                RETURN(-EBUSY);

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        client_obd_list_lock(&cli->cl_loi_list_lock);

        oap->oap_cmd = cmd;
        oap->oap_page_off = off;
        oap->oap_count = count;
        oap->oap_brw_flags = brw_flags;
        oap->oap_async_flags = async_flags;

        if (cmd & OBD_BRW_WRITE)
                lop = &loi->loi_write_lop;
        else
                lop = &loi->loi_read_lop;

        /* park the page on the group list until the group is triggered */
        list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
        if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
                oap->oap_oig = oig;
                rc = oig_add_one(oig, &oap->oap_occ);
        }

        LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
                  oap, oap->oap_page, rc);

        client_obd_list_unlock(&cli->cl_loi_list_lock);

        RETURN(rc);
}
2626
2627 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2628                                  struct loi_oap_pages *lop, int cmd)
2629 {
2630         struct list_head *pos, *tmp;
2631         struct osc_async_page *oap;
2632
2633         list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2634                 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2635                 list_del(&oap->oap_pending_item);
2636                 osc_oap_to_pending(oap);
2637         }
2638         loi_list_maint(cli, loi);
2639 }
2640
/* Release a previously-queued I/O group: move its read and write pages
 * onto the normal pending lists and kick RPC generation.  Always returns 0. */
static int osc_trigger_group_io(struct obd_export *exp,
                                struct lov_stripe_md *lsm,
                                struct lov_oinfo *loi,
                                struct obd_io_group *oig)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        ENTRY;

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* promote both directions' group-parked pages to pending */
        osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
        osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);

        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        RETURN(0);
}
2662
/* Undo osc_prep/queue for a page that is being torn down: release its
 * cache/grant accounting and remove it from the urgent and pending lists.
 * Fails with -EBUSY if the page is already part of an in-flight RPC. */
static int osc_teardown_async_page(struct obd_export *exp,
                                   struct lov_stripe_md *lsm,
                                   struct lov_oinfo *loi, void *cookie)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct loi_oap_pages *lop;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        /* pick the read or write queue matching the page's command */
        if (oap->oap_cmd & OBD_BRW_WRITE) {
                lop = &loi->loi_write_lop;
        } else {
                lop = &loi->loi_read_lop;
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* can't tear down a page an RPC is currently carrying */
        if (!list_empty(&oap->oap_rpc_item))
                GOTO(out, rc = -EBUSY);

        /* return any grant the page consumed and wake waiters for it */
        osc_exit_cache(cli, oap, 0);
        osc_wake_cache_waiters(cli);

        if (!list_empty(&oap->oap_urgent_item)) {
                list_del_init(&oap->oap_urgent_item);
                oap->oap_async_flags &= ~ASYNC_URGENT;
        }
        if (!list_empty(&oap->oap_pending_item)) {
                list_del_init(&oap->oap_pending_item);
                lop_update_pending(cli, lop, oap->oap_cmd, -1);
        }
        loi_list_maint(cli, loi);

        LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
out:
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        RETURN(rc);
}
2709
/* Attach caller data (on Linux, the inode) to a lock's l_ast_data, sanity
 * checking that we are not silently replacing data for a still-live inode.
 * Also propagates the NO_LRU flag onto the lock. */
static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
                                    int flags)
{
        struct ldlm_lock *lock = ldlm_handle2lock(lockh);

        if (lock == NULL) {
                CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
                return;
        }
        lock_res_and_lock(lock);
#if defined (__KERNEL__) && defined (__LINUX__)
        /* Liang XXX: Darwin and Winnt checking should be added */
        if (lock->l_ast_data && lock->l_ast_data != data) {
                struct inode *new_inode = data;
                struct inode *old_inode = lock->l_ast_data;
                /* replacing data is only legitimate when the previous inode
                 * is already being freed */
                if (!(old_inode->i_state & I_FREEING))
                        LDLM_ERROR(lock, "inconsistent l_ast_data found");
                LASSERTF(old_inode->i_state & I_FREEING,
                         "Found existing inode %p/%lu/%u state %lu in lock: "
                         "setting data to %p/%lu/%u\n", old_inode,
                         old_inode->i_ino, old_inode->i_generation,
                         old_inode->i_state,
                         new_inode, new_inode->i_ino, new_inode->i_generation);
        }
#endif
        lock->l_ast_data = data;
        lock->l_flags |= (flags & LDLM_FL_NO_LRU);
        unlock_res_and_lock(lock);
        LDLM_LOCK_PUT(lock);
}
2740
2741 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2742                              ldlm_iterator_t replace, void *data)
2743 {
2744         struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2745         struct obd_device *obd = class_exp2obd(exp);
2746
2747         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2748         return 0;
2749 }
2750
/* Finish an enqueue: for intent requests, extract the real result from the
 * server's lock reply; log the refreshed size/blocks/mtime; then hand the
 * result to the caller's oi_cb_up() callback and return its value. */
static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo,
                            int intent, int rc)
{
        ENTRY;

        if (intent) {
                /* The request was created before ldlm_cli_enqueue call. */
                if (rc == ELDLM_LOCK_ABORTED) {
                        struct ldlm_reply *rep;

                        /* swabbed by ldlm_cli_enqueue() */
                        LASSERT(lustre_rep_swabbed(req, DLM_LOCKREPLY_OFF));
                        rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
                                             sizeof(*rep));
                        LASSERT(rep != NULL);
                        /* server's policy result overrides the abort status */
                        if (rep->lock_policy_res1)
                                rc = rep->lock_policy_res1;
                }
        }

        if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
                CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
                       oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
                       oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
                       oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
        }

        /* Call the update callback. */
        rc = oinfo->oi_cb_up(oinfo, rc);
        RETURN(rc);
}
2782
/* Reply-interpret callback for async osc_enqueue(): completes the ldlm
 * enqueue, runs the osc-level fini, and drops the reference the async path
 * holds on the lock.  Runs from ptlrpc reply processing. */
static int osc_enqueue_interpret(struct ptlrpc_request *req,
                                 struct osc_enqueue_args *aa, int rc)
{
        int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
        struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
        struct ldlm_lock *lock;

        /* ldlm_cli_enqueue is holding a reference on the lock, so it must
         * be valid. */
        lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);

        /* Complete obtaining the lock procedure. */
        rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
                                   aa->oa_ei->ei_mode,
                                   &aa->oa_oi->oi_flags,
                                   &lsm->lsm_oinfo[0]->loi_lvb,
                                   sizeof(lsm->lsm_oinfo[0]->loi_lvb),
                                   lustre_swab_ost_lvb,
                                   aa->oa_oi->oi_lockh, rc);

        /* Complete osc stuff. */
        rc = osc_enqueue_fini(req, aa->oa_oi, intent, rc);

        /* Release the lock for async request. */
        if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
                ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);

        LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
                 aa->oa_oi->oi_lockh, req, aa);
        /* drop the reference taken by ldlm_handle2lock() above */
        LDLM_LOCK_PUT(lock);
        return rc;
}
2815
/* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
 * other synchronous requests, however keeping some locks and trying to obtain
 * others may take a considerable amount of time in a case of ost failure; and
 * when other sync requests do not get released lock from a client, the client
 * is excluded from the cluster -- such scenarious make the life difficult, so
 * release locks just after they are obtained. */
static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
                       struct ldlm_enqueue_info *einfo,
                       struct ptlrpc_request_set *rqset)
{
        struct ldlm_res_id res_id = { .name = {oinfo->oi_md->lsm_object_id} };
        struct obd_device *obd = exp->exp_obd;
        struct ldlm_reply *rep;
        struct ptlrpc_request *req = NULL;
        int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
        ldlm_mode_t mode;
        int rc;
        ENTRY;

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother.  */
        oinfo->oi_policy.l_extent.start -=
                oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
        oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;

        /* no valid known-minimum-size means the lvb must be fetched anyway */
        if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
                goto no_match;

        /* Next, search for already existing extent locks that will cover us */
        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock.
         *
         * There are problems with conversion deadlocks, so instead of
         * converting a read lock to a write lock, we'll just enqueue a new
         * one.
         *
         * At some point we should cancel the read lock instead of making them
         * send us a blocking callback, but there are problems with canceling
         * locks out from other users right now, too. */
        mode = einfo->ei_mode;
        if (einfo->ei_mode == LCK_PR)
                mode |= LCK_PW;
        mode = ldlm_lock_match(obd->obd_namespace,
                               oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
                               einfo->ei_type, &oinfo->oi_policy, mode,
                               oinfo->oi_lockh);
        if (mode) {
                /* addref the lock only if not async requests and PW lock is
                 * matched whereas we asked for PR. */
                if (!rqset && einfo->ei_mode != mode)
                        ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
                osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
                                        oinfo->oi_flags);
                if (intent) {
                        /* I would like to be able to ASSERT here that rss <=
                         * kms, but I can't, for reasons which are explained in
                         * lov_enqueue() */
                }

                /* We already have a lock, and it's referenced */
                oinfo->oi_cb_up(oinfo, ELDLM_OK);

                /* For async requests, decref the lock. */
                if (einfo->ei_mode != mode)
                        ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
                else if (rqset)
                        ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);

                RETURN(ELDLM_OK);
        }

 no_match:
        if (intent) {
                /* intent enqueues carry a glimpse-style reply with the lvb */
                int size[3] = {
                        [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                        [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request),
                        [DLM_LOCKREQ_OFF + 1] = 0 };

                req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
                if (req == NULL)
                        RETURN(-ENOMEM);

                size[DLM_LOCKREPLY_OFF] = sizeof(*rep);
                size[DLM_REPLY_REC_OFF] =
                        sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb);
                ptlrpc_req_set_repsize(req, 3, size);
        }

        /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
        oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;

        rc = ldlm_cli_enqueue(exp, &req, einfo, res_id,
                              &oinfo->oi_policy, &oinfo->oi_flags,
                              &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
                              sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
                              lustre_swab_ost_lvb, oinfo->oi_lockh,
                              rqset ? 1 : 0);
        if (rqset) {
                if (!rc) {
                        /* async path: finish in osc_enqueue_interpret() */
                        struct osc_enqueue_args *aa;
                        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                        aa = (struct osc_enqueue_args *)&req->rq_async_args;
                        aa->oa_oi = oinfo;
                        aa->oa_ei = einfo;
                        aa->oa_exp = exp;

                        req->rq_interpret_reply = osc_enqueue_interpret;
                        ptlrpc_set_add_req(rqset, req);
                } else if (intent) {
                        ptlrpc_req_finished(req);
                }
                RETURN(rc);
        }

        /* sync path: complete inline */
        rc = osc_enqueue_fini(req, oinfo, intent, rc);
        if (intent)
                ptlrpc_req_finished(req);

        RETURN(rc);
}
2938
2939 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
2940                      __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2941                      int *flags, void *data, struct lustre_handle *lockh)
2942 {
2943         struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2944         struct obd_device *obd = exp->exp_obd;
2945         int lflags = *flags;
2946         ldlm_mode_t rc;
2947         ENTRY;
2948
2949         OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);
2950
2951         /* Filesystem lock extents are extended to page boundaries so that
2952          * dealing with the page cache is a little smoother */
2953         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2954         policy->l_extent.end |= ~CFS_PAGE_MASK;
2955
2956         /* Next, search for already existing extent locks that will cover us */
2957         /* If we're trying to read, we also search for an existing PW lock.  The
2958          * VFS and page cache already protect us locally, so lots of readers/
2959          * writers can share a single PW lock. */
2960         rc = mode;
2961         if (mode == LCK_PR)
2962                 rc |= LCK_PW;
2963         rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
2964                              &res_id, type, policy, rc, lockh);
2965         if (rc) {
2966                 osc_set_data_with_check(lockh, data, lflags);
2967                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2968                         ldlm_lock_addref(lockh, LCK_PR);
2969                         ldlm_lock_decref(lockh, LCK_PW);
2970                 }
2971                 RETURN(rc);
2972         }
2973
2974         RETURN(rc);
2975 }
2976
2977 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2978                       __u32 mode, struct lustre_handle *lockh)
2979 {
2980         ENTRY;
2981
2982         if (unlikely(mode == LCK_GROUP))
2983                 ldlm_lock_decref_and_cancel(lockh, mode);
2984         else
2985                 ldlm_lock_decref(lockh, mode);
2986
2987         RETURN(0);
2988 }
2989
2990 static int osc_cancel_unused(struct obd_export *exp,
2991                              struct lov_stripe_md *lsm, int flags, void *opaque)
2992 {
2993         struct obd_device *obd = class_exp2obd(exp);
2994         struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2995
2996         return ldlm_cli_cancel_unused(obd->obd_namespace, &res_id, flags,
2997                                       opaque);
2998 }
2999
3000 static int osc_join_lru(struct obd_export *exp,
3001                         struct lov_stripe_md *lsm, int join)
3002 {
3003         struct obd_device *obd = class_exp2obd(exp);
3004         struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
3005
3006         return ldlm_cli_join_lru(obd->obd_namespace, &res_id, join);
3007 }
3008
/* Completion callback for an asynchronous OST_STATFS request: unpack the
 * obd_statfs reply into aa->aa_oi->oi_osfs and invoke the caller's
 * oi_cb_up() callback with the final status.
 *
 * Returns the callback's status. */
static int osc_statfs_interpret(struct ptlrpc_request *req,
                                struct osc_async_args *aa, int rc)
{
        struct obd_statfs *msfs;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
                                  lustre_swab_obd_statfs);
        if (msfs == NULL) {
                CERROR("Can't unpack obd_statfs\n");
                GOTO(out, rc = -EPROTO);
        }

        memcpy(aa->aa_oi->oi_osfs, msfs, sizeof(*msfs));
out:
        /* the up-call always runs, even on error, so the waiter learns
         * the outcome */
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}
3030
/* Fire off an OST_STATFS RPC without blocking.  The request is added to
 * @rqset; osc_statfs_interpret() copies the reply into oinfo->oi_osfs and
 * runs oinfo->oi_cb_up() on completion.
 *
 * Returns 0 on success or -ENOMEM if the request cannot be allocated.
 * NOTE(review): @max_age is accepted but never sent on the wire -- see
 * the comment below. */
static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
                            __u64 max_age, struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int size[2] = { sizeof(struct ptlrpc_body), sizeof(*oinfo->oi_osfs) };
        ENTRY;

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
                              OST_STATFS, 1, NULL, NULL);
        if (!req)
                RETURN(-ENOMEM);

        ptlrpc_req_set_repsize(req, 2, size);
        req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249

        req->rq_interpret_reply = osc_statfs_interpret;
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_async_args *)&req->rq_async_args;
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(rqset, req);
        RETURN(0);
}
3061
/* Synchronous statfs: send OST_STATFS, wait for the reply, and copy the
 * unpacked obd_statfs into @osfs.  See osc_statfs_async() for the
 * non-blocking variant.
 *
 * Returns 0 on success, -ENOMEM on allocation failure, -EPROTO for an
 * unpackable reply, or a negative errno from the RPC.
 * NOTE(review): @max_age is accepted but never sent on the wire -- see
 * the comment below. */
static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
                      __u64 max_age)
{
        struct obd_statfs *msfs;
        struct ptlrpc_request *req;
        int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
        ENTRY;

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
                              OST_STATFS, 1, NULL, NULL);
        if (!req)
                RETURN(-ENOMEM);

        ptlrpc_req_set_repsize(req, 2, size);
        req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
                                  lustre_swab_obd_statfs);
        if (msfs == NULL) {
                CERROR("Can't unpack obd_statfs\n");
                GOTO(out, rc = -EPROTO);
        }

        memcpy(osfs, msfs, sizeof(*osfs));

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
3102
/* Retrieve object striping information into a user-space lov_user_md.
 *
 * @lsm  is the in-core stripe metadata for the object.
 * @lump is a user pointer to an in-core struct whose lmm_magic must be
 *       LOV_USER_MAGIC and whose lmm_stripe_count says how many OST
 *       object slots fit in the user buffer (we only ever use one slot,
 *       since an OSC is single-striped).
 *
 * Returns 0 on success, -ENODATA when there is no striping info, -EINVAL
 * for a bad magic, -EFAULT on copy failures, -ENOMEM on allocation
 * failure. */
static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
{
        struct lov_user_md lum, *lumk;
        int rc = 0, lum_size;
        ENTRY;

        if (!lsm)
                RETURN(-ENODATA);

        if (copy_from_user(&lum, lump, sizeof(lum)))
                RETURN(-EFAULT);

        if (lum.lmm_magic != LOV_USER_MAGIC)
                RETURN(-EINVAL);

        if (lum.lmm_stripe_count > 0) {
                /* the caller left room for at least one object entry:
                 * build a copy with the single lmm_objects slot filled */
                lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
                OBD_ALLOC(lumk, lum_size);
                if (!lumk)
                        RETURN(-ENOMEM);

                lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
        } else {
                /* no room for object entries: copy back the header only */
                lum_size = sizeof(lum);
                lumk = &lum;
        }

        lumk->lmm_object_id = lsm->lsm_object_id;
        lumk->lmm_stripe_count = 1;

        if (copy_to_user(lump, lumk, lum_size))
                rc = -EFAULT;

        if (lumk != &lum)
                OBD_FREE(lumk, lum_size);

        RETURN(rc);
}
3147
3148
/* Handle device-level ioctls on an OSC.  A module reference is held for
 * the duration of the call so the module cannot be unloaded mid-ioctl.
 *
 * @cmd   ioctl command code
 * @exp   export the ioctl was issued on
 * @len   length of @karg (reused as a scratch length below)
 * @karg  kernel-space argument, usually a struct obd_ioctl_data
 * @uarg  original user-space pointer, for commands that copy data back
 *
 * Returns 0 on success or a negative errno. */
static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         void *karg, void *uarg)
{
        struct obd_device *obd = exp->exp_obd;
        struct obd_ioctl_data *data = karg;
        int err = 0;
        ENTRY;

#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
        MOD_INC_USE_COUNT;
#else
        if (!try_module_get(THIS_MODULE)) {
                CERROR("Can't get module. Is it alive?");
                return -EINVAL;
        }
#endif
        switch (cmd) {
        case OBD_IOC_LOV_GET_CONFIG: {
                char *buf;
                struct lov_desc *desc;
                struct obd_uuid uuid;

                buf = NULL;
                len = 0;
                if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
                        GOTO(out, err = -EINVAL);

                data = (struct obd_ioctl_data *)buf;

                /* verify the user buffers are large enough for the lov
                 * descriptor and the uuid before writing into them */
                if (sizeof(*desc) > data->ioc_inllen1) {
                        obd_ioctl_freedata(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                if (data->ioc_inllen2 < sizeof(uuid)) {
                        obd_ioctl_freedata(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                /* present the OSC as a degenerate LOV with one target */
                desc = (struct lov_desc *)data->ioc_inlbuf1;
                desc->ld_tgt_count = 1;
                desc->ld_active_tgt_count = 1;
                desc->ld_default_stripe_count = 1;
                desc->ld_default_stripe_size = 0;
                desc->ld_default_stripe_offset = 0;
                desc->ld_pattern = 0;
                memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));

                memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));

                err = copy_to_user((void *)uarg, buf, len);
                if (err)
                        err = -EFAULT;
                obd_ioctl_freedata(buf, len);
                GOTO(out, err);
        }
        case LL_IOC_LOV_SETSTRIPE:
                err = obd_alloc_memmd(exp, karg);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case LL_IOC_LOV_GETSTRIPE:
                err = osc_getstripe(karg, uarg);
                GOTO(out, err);
        case OBD_IOC_CLIENT_RECOVER:
                err = ptlrpc_recover_import(obd->u.cli.cl_import,
                                            data->ioc_inlbuf1);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case IOC_OSC_SET_ACTIVE:
                err = ptlrpc_set_import_active(obd->u.cli.cl_import,
                                               data->ioc_offset);
                GOTO(out, err);
        case OBD_IOC_POLL_QUOTACHECK:
                err = lquota_poll_check(quota_interface, exp,
                                        (struct if_quotacheck *)karg);
                GOTO(out, err);
        case OBD_IOC_DESTROY: {
                struct obdo            *oa;

                /* destroying objects directly is administrator-only */
                if (!capable (CAP_SYS_ADMIN))
                        GOTO (out, err = -EPERM);
                oa = &data->ioc_obdo1;
                oa->o_valid |= OBD_MD_FLGROUP;

                err = osc_destroy(exp, oa, NULL, NULL, NULL);
                GOTO(out, err);
        }
        default:
                CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
                       cmd, cfs_curproc_comm());
                GOTO(out, err = -ENOTTY);
        }
out:
#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
        MOD_DEC_USE_COUNT;
#else
        module_put(THIS_MODULE);
#endif
        return err;
}
3251
/* Read back a named piece of information about the OSC/OST.
 *
 * "lock_to_stripe": an OSC is single-striped, so the answer is always
 * stripe 0 and no RPC is needed.
 * "last_id": sends an OST_GET_INFO RPC and stores the OST's last
 * allocated object id in *(obd_id *)val.
 *
 * Returns 0 on success, -EFAULT when val/vallen is missing, -EINVAL for
 * an unknown key, -ENOMEM/-EPROTO or another negative errno from the
 * RPC path. */
static int osc_get_info(struct obd_export *exp, obd_count keylen,
                        void *key, __u32 *vallen, void *val)
{
        ENTRY;
        if (!vallen || !val)
                RETURN(-EFAULT);

        if (KEY_IS("lock_to_stripe")) {
                __u32 *stripe = val;
                *vallen = sizeof(*stripe);
                *stripe = 0;
                RETURN(0);
        } else if (KEY_IS("last_id")) {
                struct ptlrpc_request *req;
                obd_id *reply;
                char *bufs[2] = { NULL, key };
                int rc, size[2] = { sizeof(struct ptlrpc_body), keylen };

                req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                                      OST_GET_INFO, 2, size, bufs);
                if (req == NULL)
                        RETURN(-ENOMEM);

                size[REPLY_REC_OFF] = *vallen;
                ptlrpc_req_set_repsize(req, 2, size);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(out, rc);

                reply = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*reply),
                                           lustre_swab_ost_last_id);
                if (reply == NULL) {
                        CERROR("Can't unpack OST last ID\n");
                        GOTO(out, rc = -EPROTO);
                }
                *((obd_id *)val) = *reply;
        out:    /* only reachable from within the "last_id" branch */
                ptlrpc_req_finished(req);
                RETURN(rc);
        }
        RETURN(-EINVAL);
}
3294
3295 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3296                                           void *aa, int rc)
3297 {
3298         struct llog_ctxt *ctxt;
3299         struct obd_import *imp = req->rq_import;
3300         ENTRY;
3301
3302         if (rc != 0)
3303                 RETURN(rc);
3304
3305         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3306         if (ctxt) {
3307                 if (rc == 0)
3308                         rc = llog_initiator_connect(ctxt);
3309                 else
3310                         CERROR("cannot establish connection for "
3311                                "ctxt %p: %d\n", ctxt, rc);
3312         }
3313
3314         llog_ctxt_put(ctxt);
3315         spin_lock(&imp->imp_lock);
3316         imp->imp_server_timeout = 1;
3317         imp->imp_pingable = 1;
3318         spin_unlock(&imp->imp_lock);
3319         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3320
3321         RETURN(rc);
3322 }
3323
/* Set a named parameter, either locally or by forwarding OST_SET_INFO to
 * the OST.
 *
 * Handled locally without an RPC: KEY_NEXT_ID (seed the object creator's
 * next id), "unlinked" (clear the creator's no-space flag),
 * KEY_INIT_RECOV (toggle initial recovery on the import), and "checksum"
 * (enable/disable bulk checksums).  Anything else is forwarded verbatim
 * to the OST via @set; KEY_MDS_CONN replies get post-processed by
 * osc_setinfo_mds_conn_interpret().
 *
 * Returns 0 on success, -EINVAL for a bad value size or a missing @set,
 * -ENOMEM if the RPC cannot be allocated. */
static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
                              void *key, obd_count vallen, void *val,
                              struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct obd_device  *obd = exp->exp_obd;
        struct obd_import *imp = class_exp2cliimp(exp);
        int size[3] = { sizeof(struct ptlrpc_body), keylen, vallen };
        char *bufs[3] = { NULL, key, val };
        ENTRY;

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

        if (KEY_IS(KEY_NEXT_ID)) {
                if (vallen != sizeof(obd_id))
                        RETURN(-EINVAL);
                obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
                CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
                       exp->exp_obd->obd_name,
                       obd->u.cli.cl_oscc.oscc_next_id);

                RETURN(0);
        }

        if (KEY_IS("unlinked")) {
                struct osc_creator *oscc = &obd->u.cli.cl_oscc;
                spin_lock(&oscc->oscc_lock);
                oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
                spin_unlock(&oscc->oscc_lock);
                RETURN(0);
        }

        if (KEY_IS(KEY_INIT_RECOV)) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                spin_lock(&imp->imp_lock);
                imp->imp_initial_recov = *(int *)val;
                spin_unlock(&imp->imp_lock);
                CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
                       exp->exp_obd->obd_name,
                       imp->imp_initial_recov);
                RETURN(0);
        }

        if (KEY_IS("checksum")) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
                RETURN(0);
        }

        if (!set)
                RETURN(-EINVAL);

        /* We pass all other commands directly to OST. Since nobody calls osc
           methods directly and everybody is supposed to go through LOV, we
           assume lov checked invalid values for us.
           The only recognised values so far are evict_by_nid and mds_conn.
           Even if something bad goes through, we'd get a -EINVAL from OST
           anyway. */

        req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_SET_INFO, 3, size,
                              bufs);
        if (req == NULL)
                RETURN(-ENOMEM);

        if (KEY_IS(KEY_MDS_CONN))
                req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;

        /* no reply payload expected; kick the set so the RPC goes out now */
        ptlrpc_req_set_repsize(req, 1, NULL);
        ptlrpc_set_add_req(set, req);
        ptlrpc_check_set(set);

        RETURN(0);
}
3399
3400
/* llog ops for the size-replicator context: records originate on the
 * server side, so only local cancellation is needed here. */
static struct llog_operations osc_size_repl_logops = {
        lop_cancel: llog_obd_repl_cancel
};
3404
/* Originator llog ops, cloned from llog_lvfs_ops on first use below. */
static struct llog_operations osc_mds_ost_orig_logops;

/* Set up the two llog contexts used by an MDS-side OSC: the MDS->OST
 * originator context (driven by catalog @catid) and the size-replicator
 * context.
 *
 * NOTE(review): the one-time initialization of the global
 * osc_mds_ost_orig_logops is guarded by this device's obd_dev_lock even
 * though the ops structure is shared across devices -- confirm this is
 * safe against two different devices initializing concurrently.
 *
 * Returns 0 on success or a negative errno from llog_setup(). */
static int osc_llog_init(struct obd_device *obd, struct obd_device *tgt,
                         int count, struct llog_catid *catid, 
                         struct obd_uuid *uuid)
{
        int rc;
        ENTRY;

        spin_lock(&obd->obd_dev_lock);
        if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
                osc_mds_ost_orig_logops = llog_lvfs_ops;
                osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
                osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
                osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
                osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
        }
        spin_unlock(&obd->obd_dev_lock);

        rc = llog_setup(obd, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
                        &catid->lci_logid, &osc_mds_ost_orig_logops);
        if (rc) {
                CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
                GOTO (out, rc);
        }

        rc = llog_setup(obd, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
                        &osc_size_repl_logops);
        if (rc) 
                CERROR("failed LLOG_SIZE_REPL_CTXT\n");
out:
        if (rc) {
                CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n", 
                       obd->obd_name, tgt->obd_name, count, catid, rc);
                CERROR("logid "LPX64":0x%x\n",
                       catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
        }
        RETURN(rc);
}
3443
3444 static int osc_llog_finish(struct obd_device *obd, int count)
3445 {
3446         struct llog_ctxt *ctxt;
3447         int rc = 0, rc2 = 0;
3448         ENTRY;
3449
3450         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3451         if (ctxt)
3452                 rc = llog_cleanup(ctxt);
3453
3454         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3455         if (ctxt)
3456                 rc2 = llog_cleanup(ctxt);
3457         if (!rc)
3458                 rc = rc2;
3459
3460         RETURN(rc);
3461 }
3462
3463 static int osc_reconnect(struct obd_export *exp, struct obd_device *obd,
3464                          struct obd_uuid *cluuid,
3465                          struct obd_connect_data *data)
3466 {
3467         struct client_obd *cli = &obd->u.cli;
3468
3469         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3470                 long lost_grant;
3471
3472                 client_obd_list_lock(&cli->cl_loi_list_lock);
3473                 data->ocd_grant = cli->cl_avail_grant ?:
3474                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3475                 lost_grant = cli->cl_lost_grant;
3476                 cli->cl_lost_grant = 0;
3477                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3478
3479                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3480                        "cl_lost_grant: %ld\n", data->ocd_grant,
3481                        cli->cl_avail_grant, lost_grant);
3482                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3483                        " ocd_grant: %d\n", data->ocd_connect_flags,
3484                        data->ocd_version, data->ocd_grant);
3485         }
3486
3487         RETURN(0);
3488 }
3489
/* Disconnect from the OST.  On the last connection reference, the
 * size-replicator llog is synced first so pending cancel records reach
 * the target before the export goes away; then the generic client
 * disconnect runs.
 *
 * Returns the status of client_disconnect_export(). */
static int osc_disconnect(struct obd_export *exp)
{
        struct obd_device *obd = class_exp2obd(exp);
        struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
        int rc;

        if (obd->u.cli.cl_conn_count == 1)
                /* flush any remaining cancel messages out to the target */
                llog_sync(ctxt, exp);
        
        llog_ctxt_put(ctxt);

        rc = client_disconnect_export(exp);
        return rc;
}
3505
/* React to import state changes (disconnect, recovery, invalidation,
 * reactivation, connect-data negotiation): adjust grant accounting and
 * the object-creator flags, toggle async/ping handling, and notify the
 * observer (the LOV layered above us) where appropriate.
 *
 * Returns the observer's status for the NOTIFY events, otherwise 0. */
static int osc_import_event(struct obd_device *obd,
                            struct obd_import *imp,
                            enum obd_import_event event)
{
        struct client_obd *cli;
        int rc = 0;

        ENTRY;
        LASSERT(imp->imp_obd == obd);

        switch (event) {
        case IMP_EVENT_DISCON: {
                /* Only do this on the MDS OSC's */
                if (imp->imp_server_timeout) {
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                        spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
                        spin_unlock(&oscc->oscc_lock);
                }
                /* grant is only meaningful while connected: forget it */
                cli = &obd->u.cli;
                client_obd_list_lock(&cli->cl_loi_list_lock);
                cli->cl_avail_grant = 0;
                cli->cl_lost_grant = 0;
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                ptlrpc_import_setasync(imp, -1);

                break;
        }
        case IMP_EVENT_INACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
                break;
        }
        case IMP_EVENT_INVALIDATE: {
                struct ldlm_namespace *ns = obd->obd_namespace;

                /* Reset grants */
                cli = &obd->u.cli;
                client_obd_list_lock(&cli->cl_loi_list_lock);
                /* all pages go to failing rpcs due to the invalid import */
                osc_check_rpcs(cli);
                client_obd_list_unlock(&cli->cl_loi_list_lock);

                /* drop all cached locks; no server to cancel them against */
                ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);

                break;
        }
        case IMP_EVENT_ACTIVE: {
                /* Only do this on the MDS OSC's */
                if (imp->imp_server_timeout) {
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                        spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
                        spin_unlock(&oscc->oscc_lock);
                }
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
                break;
        }
        case IMP_EVENT_OCD: {
                struct obd_connect_data *ocd = &imp->imp_connect_data;

                if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
                        osc_init_grant(&obd->u.cli, ocd);

                /* See bug 7198 */
                if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;

                ptlrpc_import_setasync(imp, 1);
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
                break;
        }
        default:
                CERROR("Unknown import event %d\n", event);
                LBUG();
        }
        RETURN(rc);
}
3585
3586 int osc_setup(struct obd_device *obd, obd_count len, void *buf)
3587 {
3588         int rc;
3589         ENTRY;
3590
3591         ENTRY;
3592         rc = ptlrpcd_addref();
3593         if (rc)
3594                 RETURN(rc);
3595
3596         rc = client_obd_setup(obd, len, buf);
3597         if (rc) {
3598                 ptlrpcd_decref();
3599         } else {
3600                 struct lprocfs_static_vars lvars;
3601                 struct client_obd *cli = &obd->u.cli;
3602
3603                 lprocfs_init_vars(osc, &lvars);
3604                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3605                         lproc_osc_attach_seqstat(obd);
3606                         ptlrpc_lprocfs_register_obd(obd);
3607                 }
3608
3609                 oscc_init(obd);
3610                 /* We need to allocate a few requests more, because
3611                    brw_interpret_oap tries to create new requests before freeing
3612                    previous ones. Ideally we want to have 2x max_rpcs_in_flight
3613                    reserved, but I afraid that might be too much wasted RAM
3614                    in fact, so 2 is just my guess and still should work. */
3615                 cli->cl_import->imp_rq_pool =
3616                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3617                                             OST_MAXREQSIZE,
3618                                             ptlrpc_add_rqs_to_pool);
3619         }
3620
3621         RETURN(rc);
3622 }
3623
/* Staged teardown of an OSC device, driven by the generic obd cleanup
 * machinery:
 *  - EARLY:    deactivate the import (stops an in-flight
 *              mds_lov_synchronize, per the comment below)
 *  - EXPORTS:  destroy a client import that was set up but never
 *              connected, since normal disconnect cleanup never ran
 *  - SELF_EXP: shut down the llog subsystems
 *  - OBD:      nothing to do
 *
 * Returns 0, or the llog cleanup status for the SELF_EXP stage. */
static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
{
        int rc = 0;
        ENTRY;

        switch (stage) {
        case OBD_CLEANUP_EARLY: {
                struct obd_import *imp;
                imp = obd->u.cli.cl_import;
                CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
                /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
                ptlrpc_deactivate_import(imp);
                break;
        }
        case OBD_CLEANUP_EXPORTS: {
                /* If we set up but never connected, the
                   client import will not have been cleaned. */
                if (obd->u.cli.cl_import) {
                        struct obd_import *imp;
                        imp = obd->u.cli.cl_import;
                        CDEBUG(D_CONFIG, "%s: client import never connected\n",
                               obd->obd_name);
                        ptlrpc_invalidate_import(imp);
                        ptlrpc_free_rq_pool(imp->imp_rq_pool);
                        class_destroy_import(imp);
                        obd->u.cli.cl_import = NULL;
                }
                break;
        }
        case OBD_CLEANUP_SELF_EXP:
                rc = obd_llog_finish(obd, 0);
                if (rc != 0)
                        CERROR("failed to cleanup llogging subsystems\n");
                break;
        case OBD_CLEANUP_OBD:
                break;
        }
        RETURN(rc);
}
3663
/* Final cleanup of an OSC device: remove procfs entries, mark the object
 * creator as exiting, release the quota state, tear down the generic
 * client obd, and drop the ptlrpcd reference taken in osc_setup().
 *
 * Returns the status of client_obd_cleanup(). */
int osc_cleanup(struct obd_device *obd)
{
        struct osc_creator *oscc = &obd->u.cli.cl_oscc;
        int rc;

        ENTRY;
        ptlrpc_lprocfs_unregister_obd(obd);
        lprocfs_obd_cleanup(obd);

        spin_lock(&oscc->oscc_lock);
        oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
        oscc->oscc_flags |= OSCC_FLAG_EXITING;
        spin_unlock(&oscc->oscc_lock);

        /* free memory of osc quota cache */
        lquota_cleanup(quota_interface, obd);

        rc = client_obd_cleanup(obd);

        ptlrpcd_decref();
        RETURN(rc);
}
3686
3687 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3688 {
3689         struct lustre_cfg *lcfg = buf;
3690         struct lprocfs_static_vars lvars;
3691         int rc = 0;
3692
3693         lprocfs_init_vars(osc, &lvars);
3694
3695         rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars, lcfg, obd);
3696         return(rc);
3697 }
3698
/* Method table for the OSC obd type; generic client_* helpers are used
 * where no OSC-specific behavior is needed (connect/conn management). */
struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup,
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        .o_statfs               = osc_statfs,
        .o_statfs_async         = osc_statfs_async,
        .o_packmd               = osc_packmd,
        .o_unpackmd             = osc_unpackmd,
        .o_precreate            = osc_precreate,
        .o_create               = osc_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_getattr_async        = osc_getattr_async,
        .o_setattr              = osc_setattr,
        .o_setattr_async        = osc_setattr_async,
        .o_brw                  = osc_brw,
        .o_brw_async            = osc_brw_async,
        .o_prep_async_page      = osc_prep_async_page,
        .o_queue_async_io       = osc_queue_async_io,
        .o_set_async_flags      = osc_set_async_flags,
        .o_queue_group_io       = osc_queue_group_io,
        .o_trigger_group_io     = osc_trigger_group_io,
        .o_teardown_async_page  = osc_teardown_async_page,
        .o_punch                = osc_punch,
        .o_sync                 = osc_sync,
        .o_enqueue              = osc_enqueue,
        .o_match                = osc_match,
        .o_change_cbdata        = osc_change_cbdata,
        .o_cancel               = osc_cancel,
        .o_cancel_unused        = osc_cancel_unused,
        .o_join_lru             = osc_join_lru,
        .o_iocontrol            = osc_iocontrol,
        .o_get_info             = osc_get_info,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_llog_init            = osc_llog_init,
        .o_llog_finish          = osc_llog_finish,
        .o_process_config       = osc_process_config,
};
/* Module init: pull in the quota module, hook its ops into osc_obd_ops,
 * and register the OSC obd type.
 *
 * NOTE(review): the return value of lquota_init() is ignored here --
 * confirm that a failing quota interface is intended to be non-fatal.
 *
 * Returns 0 on success or the class_register_type() error; on failure
 * the quota module reference is dropped again. */
int __init osc_init(void)
{
        struct lprocfs_static_vars lvars;
        int rc;
        ENTRY;

        lprocfs_init_vars(osc, &lvars);

        request_module("lquota");
        quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
        lquota_init(quota_interface);
        init_obd_quota_ops(quota_interface, &osc_obd_ops);

        rc = class_register_type(&osc_obd_ops, lvars.module_vars,
                                 LUSTRE_OSC_NAME);
        if (rc) {
                /* registration failed: release the quota module reference */
                if (quota_interface)
                        PORTAL_SYMBOL_PUT(osc_quota_interface);
                RETURN(rc);
        }

        RETURN(rc);
}
3767
3768 #ifdef __KERNEL__
/* Module cleanup: shut down the quota interface, drop its module
 * reference, and unregister the OSC obd type.
 * NOTE(review): __exit is deliberately commented out in the declaration;
 * the reason is not visible from this file -- confirm before changing. */
static void /*__exit*/ osc_exit(void)
{
        lquota_exit(quota_interface);
        if (quota_interface)
                PORTAL_SYMBOL_PUT(osc_quota_interface);

        class_unregister_type(LUSTRE_OSC_NAME);
}
3777
3778 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
3779 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3780 MODULE_LICENSE("GPL");
3781
3782 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
3783 #endif