Whamcloud - gitweb
Branch b1_6
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author Peter Braam <braam@clusterfs.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  *   You may have signed or agreed to another license before downloading
11  *   this software.  If so, you are bound by the terms and conditions
12  *   of that agreement, and the following does not apply to you.  See the
13  *   LICENSE file included with this distribution for more information.
14  *
15  *   If you did not agree to a different license, then this copy of Lustre
16  *   is open source software; you can redistribute it and/or modify it
17  *   under the terms of version 2 of the GNU General Public License as
18  *   published by the Free Software Foundation.
19  *
20  *   In either case, Lustre is distributed in the hope that it will be
21  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23  *   license text for more details.
24  *
25  *  For testing and management it is treated as an obd_device,
26  *  although * it does not export a full OBD method table (the
27  *  requests are coming * in over the wire, so object target modules
28  *  do not have a full * method table.)
29  *
30  */
31
32 #ifndef EXPORT_SYMTAB
33 # define EXPORT_SYMTAB
34 #endif
35 #define DEBUG_SUBSYSTEM S_OSC
36
37 #ifdef __KERNEL__
38 # include <libcfs/libcfs.h>
39 #else /* __KERNEL__ */
40 # include <liblustre.h>
41 #endif
42
43 # include <lustre_dlm.h>
44 #include <libcfs/kp30.h>
45 #include <lustre_net.h>
46 #include <lustre/lustre_user.h>
47 #include <obd_ost.h>
48 #include <obd_lov.h>
49
50 #ifdef  __CYGWIN__
51 # include <ctype.h>
52 #endif
53
54 #include <lustre_ha.h>
55 #include <lprocfs_status.h>
56 #include <lustre_log.h>
57 #include <lustre_debug.h>
58 #include <lustre_param.h>
59 #include "osc_internal.h"
60
61 static quota_interface_t *quota_interface = NULL;
62 extern quota_interface_t osc_quota_interface;
63
64 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
65
66 static quota_interface_t *quota_interface;
67 extern quota_interface_t osc_quota_interface;
68
69 /* by default 10s */
70 atomic_t osc_resend_time; 
71
72 /* Pack OSC object metadata for disk storage (LE byte order). */
73 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
74                       struct lov_stripe_md *lsm)
75 {
76         int lmm_size;
77         ENTRY;
78
79         lmm_size = sizeof(**lmmp);
80         if (!lmmp)
81                 RETURN(lmm_size);
82
83         if (*lmmp && !lsm) {
84                 OBD_FREE(*lmmp, lmm_size);
85                 *lmmp = NULL;
86                 RETURN(0);
87         }
88
89         if (!*lmmp) {
90                 OBD_ALLOC(*lmmp, lmm_size);
91                 if (!*lmmp)
92                         RETURN(-ENOMEM);
93         }
94
95         if (lsm) {
96                 LASSERT(lsm->lsm_object_id);
97                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
98         }
99
100         RETURN(lmm_size);
101 }
102
103 /* Unpack OSC object metadata from disk storage (LE byte order). */
104 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
105                         struct lov_mds_md *lmm, int lmm_bytes)
106 {
107         int lsm_size;
108         ENTRY;
109
110         if (lmm != NULL) {
111                 if (lmm_bytes < sizeof (*lmm)) {
112                         CERROR("lov_mds_md too small: %d, need %d\n",
113                                lmm_bytes, (int)sizeof(*lmm));
114                         RETURN(-EINVAL);
115                 }
116                 /* XXX LOV_MAGIC etc check? */
117
118                 if (lmm->lmm_object_id == 0) {
119                         CERROR("lov_mds_md: zero lmm_object_id\n");
120                         RETURN(-EINVAL);
121                 }
122         }
123
124         lsm_size = lov_stripe_md_size(1);
125         if (lsmp == NULL)
126                 RETURN(lsm_size);
127
128         if (*lsmp != NULL && lmm == NULL) {
129                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
130                 OBD_FREE(*lsmp, lsm_size);
131                 *lsmp = NULL;
132                 RETURN(0);
133         }
134
135         if (*lsmp == NULL) {
136                 OBD_ALLOC(*lsmp, lsm_size);
137                 if (*lsmp == NULL)
138                         RETURN(-ENOMEM);
139                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
140                 if ((*lsmp)->lsm_oinfo[0] == NULL) {
141                         OBD_FREE(*lsmp, lsm_size);
142                         RETURN(-ENOMEM);
143                 }
144                 loi_init((*lsmp)->lsm_oinfo[0]);
145         }
146
147         if (lmm != NULL) {
148                 /* XXX zero *lsmp? */
149                 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
150                 LASSERT((*lsmp)->lsm_object_id);
151         }
152
153         (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
154
155         RETURN(lsm_size);
156 }
157
158 static int osc_getattr_interpret(struct ptlrpc_request *req,
159                                  struct osc_async_args *aa, int rc)
160 {
161         struct ost_body *body;
162         ENTRY;
163
164         if (rc != 0)
165                 GOTO(out, rc);
166
167         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
168                                   lustre_swab_ost_body);
169         if (body) {
170                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
171                 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
172
173                 /* This should really be sent by the OST */
174                 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
175                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
176         } else {
177                 CERROR("can't unpack ost_body\n");
178                 rc = -EPROTO;
179                 aa->aa_oi->oi_oa->o_valid = 0;
180         }
181 out:
182         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
183         RETURN(rc);
184 }
185
186 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
187                              struct ptlrpc_request_set *set)
188 {
189         struct ptlrpc_request *req;
190         struct ost_body *body;
191         int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
192         struct osc_async_args *aa;
193         ENTRY;
194
195         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
196                               OST_GETATTR, 2, size,NULL);
197         if (!req)
198                 RETURN(-ENOMEM);
199
200         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
201         memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
202
203         ptlrpc_req_set_repsize(req, 2, size);
204         req->rq_interpret_reply = osc_getattr_interpret;
205
206         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
207         aa = (struct osc_async_args *)&req->rq_async_args;
208         aa->aa_oi = oinfo;
209
210         ptlrpc_set_add_req(set, req);
211         RETURN (0);
212 }
213
214 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
215 {
216         struct ptlrpc_request *req;
217         struct ost_body *body;
218         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
219         ENTRY;
220
221         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
222                               OST_GETATTR, 2, size, NULL);
223         if (!req)
224                 RETURN(-ENOMEM);
225
226         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
227         memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
228
229         ptlrpc_req_set_repsize(req, 2, size);
230
231         rc = ptlrpc_queue_wait(req);
232         if (rc) {
233                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
234                 GOTO(out, rc);
235         }
236
237         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
238                                   lustre_swab_ost_body);
239         if (body == NULL) {
240                 CERROR ("can't unpack ost_body\n");
241                 GOTO (out, rc = -EPROTO);
242         }
243
244         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
245         memcpy(oinfo->oi_oa, &body->oa, sizeof(*oinfo->oi_oa));
246
247         /* This should really be sent by the OST */
248         oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
249         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
250
251         EXIT;
252  out:
253         ptlrpc_req_finished(req);
254         return rc;
255 }
256
257 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
258                        struct obd_trans_info *oti)
259 {
260         struct ptlrpc_request *req;
261         struct ost_body *body;
262         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
263         ENTRY;
264
265         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
266                               OST_SETATTR, 2, size, NULL);
267         if (!req)
268                 RETURN(-ENOMEM);
269
270         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
271         memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
272
273         ptlrpc_req_set_repsize(req, 2, size);
274
275         rc = ptlrpc_queue_wait(req);
276         if (rc)
277                 GOTO(out, rc);
278
279         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
280                                   lustre_swab_ost_body);
281         if (body == NULL)
282                 GOTO(out, rc = -EPROTO);
283
284         memcpy(oinfo->oi_oa, &body->oa, sizeof(*oinfo->oi_oa));
285
286         EXIT;
287 out:
288         ptlrpc_req_finished(req);
289         RETURN(rc);
290 }
291
292 static int osc_setattr_interpret(struct ptlrpc_request *req,
293                                  struct osc_async_args *aa, int rc)
294 {
295         struct ost_body *body;
296         ENTRY;
297
298         if (rc != 0)
299                 GOTO(out, rc);
300
301         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
302                                   lustre_swab_ost_body);
303         if (body == NULL) {
304                 CERROR("can't unpack ost_body\n");
305                 GOTO(out, rc = -EPROTO);
306         }
307
308         memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
309 out:
310         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
311         RETURN(rc);
312 }
313
314 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
315                              struct obd_trans_info *oti,
316                              struct ptlrpc_request_set *rqset)
317 {
318         struct ptlrpc_request *req;
319         struct ost_body *body;
320         int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
321         struct osc_async_args *aa;
322         ENTRY;
323
324         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
325                               OST_SETATTR, 2, size, NULL);
326         if (!req)
327                 RETURN(-ENOMEM);
328
329         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
330
331         if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
332                 LASSERT(oti);
333                 memcpy(obdo_logcookie(oinfo->oi_oa), oti->oti_logcookies,
334                        sizeof(*oti->oti_logcookies));
335         }
336
337         memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
338         ptlrpc_req_set_repsize(req, 2, size);
339         /* do mds to ost setattr asynchronouly */
340         if (!rqset) {
341                 /* Do not wait for response. */
342                 ptlrpcd_add_req(req);
343         } else {
344                 req->rq_interpret_reply = osc_setattr_interpret;
345
346                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
347                 aa = (struct osc_async_args *)&req->rq_async_args;
348                 aa->aa_oi = oinfo;
349
350                 ptlrpc_set_add_req(rqset, req);
351         }
352
353         RETURN(0);
354 }
355
356 int osc_real_create(struct obd_export *exp, struct obdo *oa,
357                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
358 {
359         struct ptlrpc_request *req;
360         struct ost_body *body;
361         struct lov_stripe_md *lsm;
362         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
363         ENTRY;
364
365         LASSERT(oa);
366         LASSERT(ea);
367
368         lsm = *ea;
369         if (!lsm) {
370                 rc = obd_alloc_memmd(exp, &lsm);
371                 if (rc < 0)
372                         RETURN(rc);
373         }
374
375         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
376                               OST_CREATE, 2, size, NULL);
377         if (!req)
378                 GOTO(out, rc = -ENOMEM);
379
380         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
381         memcpy(&body->oa, oa, sizeof(body->oa));
382
383         ptlrpc_req_set_repsize(req, 2, size);
384         if (oa->o_valid & OBD_MD_FLINLINE) {
385                 LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
386                         oa->o_flags == OBD_FL_DELORPHAN);
387                 DEBUG_REQ(D_HA, req,
388                           "delorphan from OST integration");
389                 /* Don't resend the delorphan req */
390                 req->rq_no_resend = req->rq_no_delay = 1;
391         }
392
393         rc = ptlrpc_queue_wait(req);
394         if (rc)
395                 GOTO(out_req, rc);
396
397         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
398                                   lustre_swab_ost_body);
399         if (body == NULL) {
400                 CERROR ("can't unpack ost_body\n");
401                 GOTO (out_req, rc = -EPROTO);
402         }
403
404         memcpy(oa, &body->oa, sizeof(*oa));
405
406         /* This should really be sent by the OST */
407         oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
408         oa->o_valid |= OBD_MD_FLBLKSZ;
409
410         /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
411          * have valid lsm_oinfo data structs, so don't go touching that.
412          * This needs to be fixed in a big way.
413          */
414         lsm->lsm_object_id = oa->o_id;
415         *ea = lsm;
416
417         if (oti != NULL) {
418                 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
419
420                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
421                         if (!oti->oti_logcookies)
422                                 oti_alloc_cookies(oti, 1);
423                         memcpy(oti->oti_logcookies, obdo_logcookie(oa),
424                                sizeof(oti->oti_onecookie));
425                 }
426         }
427
428         CDEBUG(D_HA, "transno: "LPD64"\n",
429                lustre_msg_get_transno(req->rq_repmsg));
430 out_req:
431         ptlrpc_req_finished(req);
432 out:
433         if (rc && !*ea)
434                 obd_free_memmd(exp, &lsm);
435         RETURN(rc);
436 }
437
438 static int osc_punch_interpret(struct ptlrpc_request *req,
439                                struct osc_async_args *aa, int rc)
440 {
441         struct ost_body *body;
442         ENTRY;
443
444         if (rc != 0)
445                 GOTO(out, rc);
446
447         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof (*body),
448                                   lustre_swab_ost_body);
449         if (body == NULL) {
450                 CERROR ("can't unpack ost_body\n");
451                 GOTO(out, rc = -EPROTO);
452         }
453
454         memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
455 out:
456         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
457         RETURN(rc);
458 }
459
460 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
461                      struct obd_trans_info *oti,
462                      struct ptlrpc_request_set *rqset)
463 {
464         struct ptlrpc_request *req;
465         struct osc_async_args *aa;
466         struct ost_body *body;
467         int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
468         ENTRY;
469
470         if (!oinfo->oi_oa) {
471                 CERROR("oa NULL\n");
472                 RETURN(-EINVAL);
473         }
474
475         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
476                               OST_PUNCH, 2, size, NULL);
477         if (!req)
478                 RETURN(-ENOMEM);
479
480         req->rq_request_portal = OST_IO_PORTAL;         /* bug 7198 */
481
482         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
483         memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
484
485         /* overload the size and blocks fields in the oa with start/end */
486         body->oa.o_size = oinfo->oi_policy.l_extent.start;
487         body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
488         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
489
490         ptlrpc_req_set_repsize(req, 2, size);
491
492         req->rq_interpret_reply = osc_punch_interpret;
493         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
494         aa = (struct osc_async_args *)&req->rq_async_args;
495         aa->aa_oi = oinfo;
496         ptlrpc_set_add_req(rqset, req);
497
498         RETURN(0);
499 }
500
501 static int osc_sync(struct obd_export *exp, struct obdo *oa,
502                     struct lov_stripe_md *md, obd_size start, obd_size end)
503 {
504         struct ptlrpc_request *req;
505         struct ost_body *body;
506         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
507         ENTRY;
508
509         if (!oa) {
510                 CERROR("oa NULL\n");
511                 RETURN(-EINVAL);
512         }
513
514         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
515                               OST_SYNC, 2, size, NULL);
516         if (!req)
517                 RETURN(-ENOMEM);
518
519         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
520         memcpy(&body->oa, oa, sizeof(*oa));
521
522         /* overload the size and blocks fields in the oa with start/end */
523         body->oa.o_size = start;
524         body->oa.o_blocks = end;
525         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
526
527         ptlrpc_req_set_repsize(req, 2, size);
528
529         rc = ptlrpc_queue_wait(req);
530         if (rc)
531                 GOTO(out, rc);
532
533         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
534                                   lustre_swab_ost_body);
535         if (body == NULL) {
536                 CERROR ("can't unpack ost_body\n");
537                 GOTO (out, rc = -EPROTO);
538         }
539
540         memcpy(oa, &body->oa, sizeof(*oa));
541
542         EXIT;
543  out:
544         ptlrpc_req_finished(req);
545         return rc;
546 }
547
548 /* Find and cancel locally locks matched by @mode in the resource found by
549  * @objid. Found locks are added into @cancel list. Returns the amount of
550  * locks added to @cancels list. */
551 static int osc_resource_get_unused(struct obd_export *exp, __u64 objid,
552                                    struct list_head *cancels, ldlm_mode_t mode,
553                                    int lock_flags)
554 {
555         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
556         struct ldlm_res_id res_id = { .name = { objid } };
557         struct ldlm_resource *res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
558         int count;
559         ENTRY;
560
561         if (res == NULL)
562                 RETURN(0);
563
564         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
565                                            lock_flags, 0, NULL);
566         ldlm_resource_putref(res);
567         RETURN(count);
568 }
569
570 /* Destroy requests can be async always on the client, and we don't even really
571  * care about the return code since the client cannot do anything at all about
572  * a destroy failure.
573  * When the MDS is unlinking a filename, it saves the file objects into a
574  * recovery llog, and these object records are cancelled when the OST reports
575  * they were destroyed and sync'd to disk (i.e. transaction committed).
576  * If the client dies, or the OST is down when the object should be destroyed,
577  * the records are not cancelled, and when the OST reconnects to the MDS next,
578  * it will retrieve the llog unlink logs and then sends the log cancellation
579  * cookies to the MDS after committing destroy transactions. */
580 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
581                        struct lov_stripe_md *ea, struct obd_trans_info *oti,
582                        struct obd_export *md_export)
583 {
584         CFS_LIST_HEAD(cancels);
585         struct ptlrpc_request *req;
586         struct ost_body *body;
587         int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body), 0 };
588         int count, bufcount = 2;
589         ENTRY;
590
591         if (!oa) {
592                 CERROR("oa NULL\n");
593                 RETURN(-EINVAL);
594         }
595
596         count = osc_resource_get_unused(exp, oa->o_id, &cancels, LCK_PW,
597                                         LDLM_FL_DISCARD_DATA);
598         if (exp_connect_cancelset(exp) && count) {
599                 bufcount = 3;
600                 size[REQ_REC_OFF + 1] = ldlm_request_bufsize(count,OST_DESTROY);
601         }
602         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
603                               OST_DESTROY, bufcount, size, NULL);
604         if (req)
605                 ldlm_cli_cancel_list(&cancels, count, req, REQ_REC_OFF + 1);
606         else
607                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
608
609         if (!req)
610                 RETURN(-ENOMEM);
611
612         req->rq_request_portal = OST_IO_PORTAL;         /* bug 7198 */
613
614         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
615
616         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE) {
617                 memcpy(obdo_logcookie(oa), oti->oti_logcookies,
618                        sizeof(*oti->oti_logcookies));
619         }
620
621         memcpy(&body->oa, oa, sizeof(*oa));
622         ptlrpc_req_set_repsize(req, 2, size);
623
624         ptlrpcd_add_req(req);
625         RETURN(0);
626 }
627
628 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
629                                 long writing_bytes)
630 {
631         obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
632
633         LASSERT(!(oa->o_valid & bits));
634
635         oa->o_valid |= bits;
636         client_obd_list_lock(&cli->cl_loi_list_lock);
637         oa->o_dirty = cli->cl_dirty;
638         if (cli->cl_dirty > cli->cl_dirty_max) {
639                 CERROR("dirty %lu > dirty_max %lu\n",
640                        cli->cl_dirty, cli->cl_dirty_max);
641                 oa->o_undirty = 0;
642         } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
643                 CERROR("dirty %d > system dirty_max %d\n",
644                        atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
645                 oa->o_undirty = 0;
646         } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
647                 CERROR("dirty %lu - dirty_max %lu too big???\n",
648                        cli->cl_dirty, cli->cl_dirty_max);
649                 oa->o_undirty = 0;
650         } else {
651                 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
652                                 (cli->cl_max_rpcs_in_flight + 1);
653                 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
654         }
655         oa->o_grant = cli->cl_avail_grant;
656         oa->o_dropped = cli->cl_lost_grant;
657         cli->cl_lost_grant = 0;
658         client_obd_list_unlock(&cli->cl_loi_list_lock);
659         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
660                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
661 }
662
663 /* caller must hold loi_list_lock */
664 static void osc_consume_write_grant(struct client_obd *cli,struct brw_page *pga)
665 {
666         atomic_inc(&obd_dirty_pages);
667         cli->cl_dirty += CFS_PAGE_SIZE;
668         cli->cl_avail_grant -= CFS_PAGE_SIZE;
669         pga->flag |= OBD_BRW_FROM_GRANT;
670         CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
671                CFS_PAGE_SIZE, pga, pga->pg);
672         LASSERT(cli->cl_avail_grant >= 0);
673 }
674
675 /* the companion to osc_consume_write_grant, called when a brw has completed.
676  * must be called with the loi lock held. */
677 static void osc_release_write_grant(struct client_obd *cli,
678                                     struct brw_page *pga, int sent)
679 {
680         int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
681         ENTRY;
682
683         if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
684                 EXIT;
685                 return;
686         }
687
688         pga->flag &= ~OBD_BRW_FROM_GRANT;
689         atomic_dec(&obd_dirty_pages);
690         cli->cl_dirty -= CFS_PAGE_SIZE;
691         if (!sent) {
692                 cli->cl_lost_grant += CFS_PAGE_SIZE;
693                 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
694                        cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
695         } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
696                 /* For short writes we shouldn't count parts of pages that
697                  * span a whole block on the OST side, or our accounting goes
698                  * wrong.  Should match the code in filter_grant_check. */
699                 int offset = pga->off & ~CFS_PAGE_MASK;
700                 int count = pga->count + (offset & (blocksize - 1));
701                 int end = (offset + pga->count) & (blocksize - 1);
702                 if (end)
703                         count += blocksize - end;
704
705                 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
706                 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
707                        CFS_PAGE_SIZE - count, cli->cl_lost_grant,
708                        cli->cl_avail_grant, cli->cl_dirty);
709         }
710
711         EXIT;
712 }
713
714 static unsigned long rpcs_in_flight(struct client_obd *cli)
715 {
716         return cli->cl_r_in_flight + cli->cl_w_in_flight;
717 }
718
719 /* caller must hold loi_list_lock */
720 void osc_wake_cache_waiters(struct client_obd *cli)
721 {
722         struct list_head *l, *tmp;
723         struct osc_cache_waiter *ocw;
724
725         ENTRY;
726         list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
727                 /* if we can't dirty more, we must wait until some is written */
728                 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
729                    ((atomic_read(&obd_dirty_pages)+1)>(obd_max_dirty_pages))) {
730                         CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
731                                "osc max %ld, sys max %d\n", cli->cl_dirty,
732                                cli->cl_dirty_max, obd_max_dirty_pages);
733                         return;
734                 }
735
736                 /* if still dirty cache but no grant wait for pending RPCs that
737                  * may yet return us some grant before doing sync writes */
738                 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
739                         CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
740                                cli->cl_w_in_flight);
741                         return;
742                 }
743
744                 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
745                 list_del_init(&ocw->ocw_entry);
746                 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
747                         /* no more RPCs in flight to return grant, do sync IO */
748                         ocw->ocw_rc = -EDQUOT;
749                         CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
750                 } else {
751                         osc_consume_write_grant(cli,
752                                                 &ocw->ocw_oap->oap_brw_page);
753                 }
754
755                 cfs_waitq_signal(&ocw->ocw_waitq);
756         }
757
758         EXIT;
759 }
760
761 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
762 {
763         client_obd_list_lock(&cli->cl_loi_list_lock);
764         cli->cl_avail_grant = ocd->ocd_grant;
765         client_obd_list_unlock(&cli->cl_loi_list_lock);
766
767         CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
768                cli->cl_avail_grant, cli->cl_lost_grant);
769         LASSERT(cli->cl_avail_grant >= 0);
770 }
771
772 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
773 {
774         client_obd_list_lock(&cli->cl_loi_list_lock);
775         CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
776         cli->cl_avail_grant += body->oa.o_grant;
777         /* waiters are woken in brw_interpret_oap */
778         client_obd_list_unlock(&cli->cl_loi_list_lock);
779 }
780
781 /* We assume that the reason this OSC got a short read is because it read
782  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
783  * via the LOV, and it _knows_ it's reading inside the file, it's just that
784  * this stripe never got written at or beyond this stripe offset yet. */
785 static void handle_short_read(int nob_read, obd_count page_count,
786                               struct brw_page **pga)
787 {
788         char *ptr;
789         int i = 0;
790
791         /* skip bytes read OK */
792         while (nob_read > 0) {
793                 LASSERT (page_count > 0);
794
795                 if (pga[i]->count > nob_read) {
796                         /* EOF inside this page */
797                         ptr = cfs_kmap(pga[i]->pg) + 
798                                 (pga[i]->off & ~CFS_PAGE_MASK);
799                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
800                         cfs_kunmap(pga[i]->pg);
801                         page_count--;
802                         i++;
803                         break;
804                 }
805
806                 nob_read -= pga[i]->count;
807                 page_count--;
808                 i++;
809         }
810
811         /* zero remaining pages */
812         while (page_count-- > 0) {
813                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
814                 memset(ptr, 0, pga[i]->count);
815                 cfs_kunmap(pga[i]->pg);
816                 i++;
817         }
818 }
819
820 static int check_write_rcs(struct ptlrpc_request *req,
821                            int requested_nob, int niocount,
822                            obd_count page_count, struct brw_page **pga)
823 {
824         int    *remote_rcs, i;
825
826         /* return error if any niobuf was in error */
827         remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
828                                         sizeof(*remote_rcs) * niocount, NULL);
829         if (remote_rcs == NULL) {
830                 CERROR("Missing/short RC vector on BRW_WRITE reply\n");
831                 return(-EPROTO);
832         }
833         if (lustre_msg_swabbed(req->rq_repmsg))
834                 for (i = 0; i < niocount; i++)
835                         __swab32s(&remote_rcs[i]);
836
837         for (i = 0; i < niocount; i++) {
838                 if (remote_rcs[i] < 0)
839                         return(remote_rcs[i]);
840
841                 if (remote_rcs[i] != 0) {
842                         CERROR("rc[%d] invalid (%d) req %p\n",
843                                 i, remote_rcs[i], req);
844                         return(-EPROTO);
845                 }
846         }
847
848         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
849                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
850                        requested_nob, req->rq_bulk->bd_nob_transferred);
851                 return(-EPROTO);
852         }
853
854         return (0);
855 }
856
857 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
858 {
859         if (p1->flag != p2->flag) {
860                 unsigned mask = ~OBD_BRW_FROM_GRANT;
861
862                 /* warn if we try to combine flags that we don't know to be
863                  * safe to combine */
864                 if ((p1->flag & mask) != (p2->flag & mask))
865                         CERROR("is it ok to have flags 0x%x and 0x%x in the "
866                                "same brw?\n", p1->flag, p2->flag);
867                 return 0;
868         }
869
870         return (p1->off + p1->count == p2->off);
871 }
872
873 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
874                                    struct brw_page **pga)
875 {
876         __u32 cksum = ~0;
877         int i = 0;
878
879         LASSERT (pg_count > 0);
880         while (nob > 0 && pg_count > 0) {
881                 char *ptr = cfs_kmap(pga[i]->pg);
882                 int off = pga[i]->off & ~CFS_PAGE_MASK;
883                 int count = pga[i]->count > nob ? nob : pga[i]->count;
884
885                 /* corrupt the data before we compute the checksum, to
886                  * simulate an OST->client data error */
887                 if (i == 0 &&OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
888                         memcpy(ptr + off, "bad1", min(4, nob));
889                 cksum = crc32_le(cksum, ptr + off, count);
890                 cfs_kunmap(pga[i]->pg);
891                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
892                                off, cksum);
893
894                 nob -= pga[i]->count;
895                 pg_count--;
896                 i++;
897         }
898         /* For sending we only compute the wrong checksum instead
899          * of corrupting the data so it is still correct on a redo */
900         if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_SEND))
901                 cksum++;
902
903         return cksum;
904 }
905
906 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
907                                 struct lov_stripe_md *lsm, obd_count page_count,
908                                 struct brw_page **pga,
909                                 struct ptlrpc_request **reqp)
910 {
911         struct ptlrpc_request   *req;
912         struct ptlrpc_bulk_desc *desc;
913         struct ost_body         *body;
914         struct obd_ioobj        *ioobj;
915         struct niobuf_remote    *niobuf;
916         int size[4] = { sizeof(struct ptlrpc_body), sizeof(*body) };
917         int niocount, i, requested_nob, opc, rc;
918         struct ptlrpc_request_pool *pool;
919         struct osc_brw_async_args *aa;
920
921         ENTRY;
922         OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM); /* Recoverable */
923         OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ2, -EINVAL); /* Fatal */
924
925         opc = ((cmd & OBD_BRW_WRITE) != 0) ? OST_WRITE : OST_READ;
926         pool = ((cmd & OBD_BRW_WRITE) != 0) ? cli->cl_import->imp_rq_pool :NULL;
927
928         for (niocount = i = 1; i < page_count; i++) {
929                 if (!can_merge_pages(pga[i - 1], pga[i]))
930                         niocount++;
931         }
932
933         size[REQ_REC_OFF + 1] = sizeof(*ioobj);
934         size[REQ_REC_OFF + 2] = niocount * sizeof(*niobuf);
935
936         req = ptlrpc_prep_req_pool(cli->cl_import, LUSTRE_OST_VERSION, opc, 4, size,
937                                    NULL, pool);
938         if (req == NULL)
939                 RETURN (-ENOMEM);
940
941         req->rq_request_portal = OST_IO_PORTAL;         /* bug 7198 */
942
943         if (opc == OST_WRITE)
944                 desc = ptlrpc_prep_bulk_imp (req, page_count,
945                                              BULK_GET_SOURCE, OST_BULK_PORTAL);
946         else
947                 desc = ptlrpc_prep_bulk_imp (req, page_count,
948                                              BULK_PUT_SINK, OST_BULK_PORTAL);
949         if (desc == NULL)
950                 GOTO(out, rc = -ENOMEM);
951         /* NB request now owns desc and will free it when it gets freed */
952
953         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
954         ioobj = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioobj));
955         niobuf = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
956                                 niocount * sizeof(*niobuf));
957
958         memcpy(&body->oa, oa, sizeof(*oa));
959
960         obdo_to_ioobj(oa, ioobj);
961         ioobj->ioo_bufcnt = niocount;
962
963         LASSERT (page_count > 0);
964         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
965                 struct brw_page *pg = pga[i];
966                 struct brw_page *pg_prev = pga[i - 1];
967
968                 LASSERT(pg->count > 0);
969                 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
970                          "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
971                          pg->off, pg->count);
972 #ifdef __LINUX__
973                 LASSERTF(i == 0 || pg->off > pg_prev->off,
974                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
975                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
976                          i, page_count,
977                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
978                          pg_prev->pg, page_private(pg_prev->pg),
979                          pg_prev->pg->index, pg_prev->off);
980 #else
981                 LASSERTF(i == 0 || pg->off > pg_prev->off,
982                          "i %d p_c %u\n", i, page_count);
983 #endif
984                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
985                         (pg->flag & OBD_BRW_SRVLOCK));
986
987                 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
988                                       pg->count);
989                 requested_nob += pg->count;
990
991                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
992                         niobuf--;
993                         niobuf->len += pg->count;
994                 } else {
995                         niobuf->offset = pg->off;
996                         niobuf->len    = pg->count;
997                         niobuf->flags  = pg->flag;
998                 }
999         }
1000
1001         LASSERT((void *)(niobuf - niocount) ==
1002                 lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1003                                niocount * sizeof(*niobuf)));
1004         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1005
1006         /* size[REQ_REC_OFF] still sizeof (*body) */
1007         if (opc == OST_WRITE) {
1008                 if (unlikely(cli->cl_checksum)) {
1009                         body->oa.o_valid |= OBD_MD_FLCKSUM;
1010                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1011                                                              page_count, pga);
1012                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1013                                body->oa.o_cksum);
1014                         /* save this in 'oa', too, for later checking */
1015                         oa->o_valid |= OBD_MD_FLCKSUM;
1016                 } else {
1017                         /* clear out the checksum flag, in case this is a
1018                          * resend but cl_checksum is no longer set. b=11238 */
1019                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1020                 }
1021                 oa->o_cksum = body->oa.o_cksum;
1022                 /* 1 RC per niobuf */
1023                 size[REPLY_REC_OFF + 1] = sizeof(__u32) * niocount;
1024                 ptlrpc_req_set_repsize(req, 3, size);
1025         } else {
1026                 if (unlikely(cli->cl_checksum))
1027                         body->oa.o_valid |= OBD_MD_FLCKSUM;
1028                 /* 1 RC for the whole I/O */
1029                 ptlrpc_req_set_repsize(req, 2, size);
1030         }
1031
1032         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1033         aa = (struct osc_brw_async_args *)&req->rq_async_args;
1034         aa->aa_oa = oa;
1035         aa->aa_requested_nob = requested_nob;
1036         aa->aa_nio_count = niocount;
1037         aa->aa_page_count = page_count;
1038         aa->aa_start_send = cfs_time_current();
1039         aa->aa_ppga = pga;
1040         aa->aa_cli = cli;
1041         INIT_LIST_HEAD(&aa->aa_oaps);
1042
1043         *reqp = req;
1044         RETURN (0);
1045
1046  out:
1047         ptlrpc_req_finished (req);
1048         RETURN (rc);
1049 }
1050
1051 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1052                                  __u32 client_cksum, __u32 server_cksum, int nob,
1053                                  obd_count page_count, struct brw_page **pga)
1054 {
1055         __u32 new_cksum;
1056         char *msg;
1057
1058         if (server_cksum == client_cksum) {
1059                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1060                 return 0;
1061         }
1062
1063         new_cksum = osc_checksum_bulk(nob, page_count, pga);
1064
1065         if (new_cksum == server_cksum)
1066                 msg = "changed on the client after we checksummed it - "
1067                       "likely false positive due to mmap IO (bug 11742)";
1068         else if (new_cksum == client_cksum)
1069                 msg = "changed in transit before arrival at OST";
1070         else
1071                 msg = "changed in transit AND doesn't match the original - "
1072                       "likely false positive due to mmap IO (bug 11742)";
1073
1074         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1075                            LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1076                            "["LPU64"-"LPU64"]\n",
1077                            msg, libcfs_nid2str(peer->nid),
1078                            oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1079                            oa->o_valid & OBD_MD_FLFID ? oa->o_generation : 
1080                                                         (__u64)0,
1081                            oa->o_id,
1082                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1083                            pga[0]->off,
1084                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1085         CERROR("original client csum %x, server csum %x, client csum now %x\n",
1086                client_cksum, server_cksum, new_cksum);
1087
1088         return 1;
1089 }
1090
1091 /* Note rc enters this function as number of bytes transferred */
1092 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1093 {
1094         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1095         const lnet_process_id_t *peer =
1096                         &req->rq_import->imp_connection->c_peer;
1097         struct client_obd *cli = aa->aa_cli;
1098         struct ost_body *body;
1099         __u32 client_cksum = 0;
1100         ENTRY;
1101
1102         if (rc < 0 && rc != -EDQUOT)
1103                 RETURN(rc);
1104
1105         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1106         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1107                                   lustre_swab_ost_body);
1108         if (body == NULL) {
1109                 CERROR ("Can't unpack body\n");
1110                 RETURN(-EPROTO);
1111         }
1112
1113         /* set/clear over quota flag for a uid/gid */
1114         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1115             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1116                 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1117                              body->oa.o_gid, body->oa.o_valid,
1118                              body->oa.o_flags);
1119
1120         if (rc < 0)
1121                 RETURN(rc);
1122
1123         if (unlikely(aa->aa_oa->o_valid & OBD_MD_FLCKSUM))
1124                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1125
1126         osc_update_grant(cli, body);
1127
1128         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1129                 if (rc > 0) {
1130                         CERROR ("Unexpected +ve rc %d\n", rc);
1131                         RETURN(-EPROTO);
1132                 }
1133                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1134
1135                 if (unlikely((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) &&
1136                              client_cksum &&
1137                              check_write_checksum(&body->oa, peer, client_cksum,
1138                                                   body->oa.o_cksum,
1139                                                   aa->aa_requested_nob,
1140                                                   aa->aa_page_count,
1141                                                   aa->aa_ppga)))
1142                         RETURN(-EAGAIN);
1143
1144                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1145                                      aa->aa_page_count, aa->aa_ppga);
1146                 GOTO(out, rc);
1147         }
1148
1149         /* The rest of this function executes only for OST_READs */
1150         if (rc > aa->aa_requested_nob) {
1151                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1152                        aa->aa_requested_nob);
1153                 RETURN(-EPROTO);
1154         }
1155
1156         if (rc != req->rq_bulk->bd_nob_transferred) {
1157                 CERROR ("Unexpected rc %d (%d transferred)\n",
1158                         rc, req->rq_bulk->bd_nob_transferred);
1159                 return (-EPROTO);
1160         }
1161
1162         if (rc < aa->aa_requested_nob)
1163                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1164
1165         if (unlikely(body->oa.o_valid & OBD_MD_FLCKSUM)) {
1166                 static int cksum_counter;
1167                 __u32      server_cksum = body->oa.o_cksum;
1168                 char      *via;
1169                 char      *router;
1170
1171                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1172                                                  aa->aa_ppga);
1173
1174                 if (peer->nid == req->rq_bulk->bd_sender) {
1175                         via = router = "";
1176                 } else {
1177                         via = " via ";
1178                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1179                 }
1180                 
1181                 if (server_cksum == ~0 && rc > 0) {
1182                         CERROR("Protocol error: server %s set the 'checksum' "
1183                                "bit, but didn't send a checksum.  Not fatal, "
1184                                "but please tell CFS.\n",
1185                                libcfs_nid2str(peer->nid));
1186                 } else if (server_cksum != client_cksum) {
1187                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1188                                            "%s%s%s inum "LPU64"/"LPU64" object "
1189                                            LPU64"/"LPU64" extent "
1190                                            "["LPU64"-"LPU64"]\n",
1191                                            req->rq_import->imp_obd->obd_name,
1192                                            libcfs_nid2str(peer->nid),
1193                                            via, router,
1194                                            body->oa.o_valid & OBD_MD_FLFID ?
1195                                                 body->oa.o_fid : (__u64)0,
1196                                            body->oa.o_valid & OBD_MD_FLFID ?
1197                                                 body->oa.o_generation :(__u64)0,
1198                                            body->oa.o_id,
1199                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1200                                                 body->oa.o_gr : (__u64)0,
1201                                            aa->aa_ppga[0]->off,
1202                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1203                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1204                                                                         1);
1205                         CERROR("client %x, server %x\n",
1206                                client_cksum, server_cksum);
1207                         cksum_counter = 0;
1208                         aa->aa_oa->o_cksum = client_cksum;
1209                         rc = -EAGAIN;
1210                 } else {
1211                         cksum_counter++;
1212                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1213                         rc = 0;
1214                 }
1215         } else if (unlikely(client_cksum)) {
1216                 static int cksum_missed;
1217
1218                 cksum_missed++;
1219                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1220                         CERROR("Checksum %u requested from %s but not sent\n",
1221                                cksum_missed, libcfs_nid2str(peer->nid));
1222         } else {
1223                 rc = 0;
1224         }
1225 out:
1226         if (rc >= 0)
1227                 memcpy(aa->aa_oa, &body->oa, sizeof(*aa->aa_oa));
1228
1229         RETURN(rc);
1230 }
1231
1232 static int osc_brw_internal(int cmd, struct obd_export *exp,struct obdo *oa,
1233                             struct lov_stripe_md *lsm,
1234                             obd_count page_count, struct brw_page **pga)
1235 {
1236         struct ptlrpc_request *request;
1237         int                    rc;
1238         cfs_time_t             start_send = cfs_time_current();
1239         ENTRY;
1240
1241 restart_bulk:
1242         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1243                                   page_count, pga, &request);
1244         if (rc != 0)
1245                 return (rc);
1246
1247         rc = ptlrpc_queue_wait(request);
1248
1249         if (rc == -ETIMEDOUT && request->rq_resend) {
1250                 DEBUG_REQ(D_HA, request,  "BULK TIMEOUT");
1251                 ptlrpc_req_finished(request);
1252                 goto restart_bulk;
1253         }
1254
1255         rc = osc_brw_fini_request(request, rc);
1256
1257         ptlrpc_req_finished(request);
1258         if (osc_recoverable_error(rc)) {
1259                 if (!osc_should_resend(start_send)) {
1260                         CERROR("too many resend retries, returning error\n");
1261                         RETURN(-EIO);
1262                 }
1263                 goto restart_bulk;
1264         }
1265         RETURN(rc);
1266 }
1267
1268 int osc_brw_redo_request(struct ptlrpc_request *request,
1269                          struct osc_brw_async_args *aa)
1270 {
1271         struct ptlrpc_request *new_req;
1272         struct ptlrpc_request_set *set = request->rq_set;
1273         struct osc_brw_async_args *new_aa;
1274         struct osc_async_page *oap;
1275         int rc = 0;
1276         ENTRY;
1277
1278         if (!osc_should_resend(aa->aa_start_send)) {
1279                 CERROR("too many resend retries, returning error\n");
1280                 RETURN(-EIO);
1281         }
1282
1283         DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1284         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1285                 if (oap->oap_request != NULL) {
1286                         LASSERTF(request == oap->oap_request,
1287                                  "request %p != oap_request %p\n",
1288                                  request, oap->oap_request);
1289                         if (oap->oap_interrupted) {
1290                                 ptlrpc_mark_interrupted(oap->oap_request);
1291                                 rc = -EINTR;
1292                                 break;
1293                         }
1294                 }
1295         }
1296         if (rc)
1297                 RETURN(rc);
1298
1299         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1300                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1301                                   aa->aa_cli, aa->aa_oa,
1302                                   NULL /* lsm unused by osc currently */,
1303                                   aa->aa_page_count, aa->aa_ppga, &new_req);
1304         if (rc)
1305                 RETURN(rc);
1306
1307         /* New request takes over pga and oaps from old request.
1308          * Note that copying a list_head doesn't work, need to move it... */
1309         new_req->rq_interpret_reply = request->rq_interpret_reply;
1310         new_req->rq_async_args = request->rq_async_args;
1311         new_aa = (struct osc_brw_async_args *)&new_req->rq_async_args;
1312         INIT_LIST_HEAD(&new_aa->aa_oaps);
1313         list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1314         INIT_LIST_HEAD(&aa->aa_oaps);
1315
1316         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1317                 if (oap->oap_request) {
1318                         ptlrpc_req_finished(oap->oap_request);
1319                         oap->oap_request = ptlrpc_request_addref(new_req);
1320                 }
1321         }
1322
1323         ptlrpc_set_add_req(set, new_req);
1324
1325         RETURN(0);
1326 }
1327
1328 static int brw_interpret(struct ptlrpc_request *request, void *data, int rc)
1329 {
1330         struct osc_brw_async_args *aa = data;
1331         int                        i;
1332         int                        nob = rc;
1333         ENTRY;
1334
1335         rc = osc_brw_fini_request(request, rc);
1336         CDEBUG(D_INODE, "request %p aa %p rc %d\n", request, aa, rc);   
1337         if (osc_recoverable_error(rc)) {
1338                 rc = osc_brw_redo_request(request, aa);
1339                 if (rc == 0)
1340                         RETURN(0);
1341         }
1342         if ((rc >= 0) && request->rq_set && request->rq_set->set_countp)
1343                 atomic_add(nob, (atomic_t *)request->rq_set->set_countp);
1344
1345         spin_lock(&aa->aa_cli->cl_loi_list_lock);
1346         for (i = 0; i < aa->aa_page_count; i++)
1347                 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
1348         spin_unlock(&aa->aa_cli->cl_loi_list_lock);
1349
1350         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1351
1352         RETURN(rc);
1353 }
1354
1355 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1356                           struct lov_stripe_md *lsm, obd_count page_count,
1357                           struct brw_page **pga, struct ptlrpc_request_set *set)
1358 {
1359         struct ptlrpc_request     *request;
1360         struct client_obd         *cli = &exp->exp_obd->u.cli;
1361         int                        rc, i;
1362         ENTRY;
1363
1364         /* Consume write credits even if doing a sync write -
1365          * otherwise we may run out of space on OST due to grant. */
1366         if (cmd == OBD_BRW_WRITE) {
1367                 spin_lock(&cli->cl_loi_list_lock);
1368                 for (i = 0; i < page_count; i++) {
1369                         if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
1370                                 osc_consume_write_grant(cli, pga[i]);
1371                 }
1372                 spin_unlock(&cli->cl_loi_list_lock);
1373         }
1374
1375         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1376                                   page_count, pga, &request);
1377
1378         if (rc == 0) {
1379                 request->rq_interpret_reply = brw_interpret;
1380                 ptlrpc_set_add_req(set, request);
1381         } else if (cmd == OBD_BRW_WRITE) {
1382                 spin_lock(&cli->cl_loi_list_lock);
1383                 for (i = 0; i < page_count; i++)
1384                         osc_release_write_grant(cli, pga[i], 0);
1385                 spin_unlock(&cli->cl_loi_list_lock);
1386         }
1387
1388         RETURN (rc);
1389 }
1390
1391 /*
1392  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1393  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1394  * fine for our small page arrays and doesn't require allocation.  its an
1395  * insertion sort that swaps elements that are strides apart, shrinking the
1396  * stride down until its '1' and the array is sorted.
1397  */
1398 static void sort_brw_pages(struct brw_page **array, int num)
1399 {
1400         int stride, i, j;
1401         struct brw_page *tmp;
1402
1403         if (num == 1)
1404                 return;
1405         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1406                 ;
1407
1408         do {
1409                 stride /= 3;
1410                 for (i = stride ; i < num ; i++) {
1411                         tmp = array[i];
1412                         j = i;
1413                         while (j >= stride && array[j-stride]->off > tmp->off) {
1414                                 array[j] = array[j - stride];
1415                                 j -= stride;
1416                         }
1417                         array[j] = tmp;
1418                 }
1419         } while (stride > 1);
1420 }
1421
1422 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1423 {
1424         int count = 1;
1425         int offset;
1426         int i = 0;
1427
1428         LASSERT (pages > 0);
1429         offset = pg[i]->off & (~CFS_PAGE_MASK);
1430
1431         for (;;) {
1432                 pages--;
1433                 if (pages == 0)         /* that's all */
1434                         return count;
1435
1436                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1437                         return count;   /* doesn't end on page boundary */
1438
1439                 i++;
1440                 offset = pg[i]->off & (~CFS_PAGE_MASK);
1441                 if (offset != 0)        /* doesn't start on page boundary */
1442                         return count;
1443
1444                 count++;
1445         }
1446 }
1447
1448 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1449 {
1450         struct brw_page **ppga;
1451         int i;
1452
1453         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1454         if (ppga == NULL)
1455                 return NULL;
1456
1457         for (i = 0; i < count; i++)
1458                 ppga[i] = pga + i;
1459         return ppga;
1460 }
1461
1462 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1463 {
1464         LASSERT(ppga != NULL);
1465         OBD_FREE(ppga, sizeof(*ppga) * count);
1466 }
1467
1468 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1469                    obd_count page_count, struct brw_page *pga,
1470                    struct obd_trans_info *oti)
1471 {
1472         struct obdo *saved_oa = NULL;
1473         struct brw_page **ppga, **orig;
1474         struct obd_import *imp = class_exp2cliimp(exp);
1475         struct client_obd *cli = &imp->imp_obd->u.cli;
1476         int rc, page_count_orig;
1477         ENTRY;
1478
1479         if (cmd & OBD_BRW_CHECK) {
1480                 /* The caller just wants to know if there's a chance that this
1481                  * I/O can succeed */
1482
1483                 if (imp == NULL || imp->imp_invalid)
1484                         RETURN(-EIO);
1485                 RETURN(0);
1486         }
1487
1488         /* test_brw with a failed create can trip this, maybe others. */
1489         LASSERT(cli->cl_max_pages_per_rpc);
1490
1491         rc = 0;
1492
1493         orig = ppga = osc_build_ppga(pga, page_count);
1494         if (ppga == NULL)
1495                 RETURN(-ENOMEM);
1496         page_count_orig = page_count;
1497
1498         sort_brw_pages(ppga, page_count);
1499         while (page_count) {
1500                 obd_count pages_per_brw;
1501
1502                 if (page_count > cli->cl_max_pages_per_rpc)
1503                         pages_per_brw = cli->cl_max_pages_per_rpc;
1504                 else
1505                         pages_per_brw = page_count;
1506
1507                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1508
1509                 if (saved_oa != NULL) {
1510                         /* restore previously saved oa */
1511                         *oinfo->oi_oa = *saved_oa;
1512                 } else if (page_count > pages_per_brw) {
1513                         /* save a copy of oa (brw will clobber it) */
1514                         OBDO_ALLOC(saved_oa);
1515                         if (saved_oa == NULL)
1516                                 GOTO(out, rc = -ENOMEM);
1517                         *saved_oa = *oinfo->oi_oa;
1518                 }
1519
1520                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1521                                       pages_per_brw, ppga);
1522
1523                 if (rc != 0)
1524                         break;
1525
1526                 page_count -= pages_per_brw;
1527                 ppga += pages_per_brw;
1528         }
1529
1530 out:
1531         osc_release_ppga(orig, page_count_orig);
1532
1533         if (saved_oa != NULL)
1534                 OBDO_FREE(saved_oa);
1535
1536         RETURN(rc);
1537 }
1538
1539 static int osc_brw_async(int cmd, struct obd_export *exp,
1540                          struct obd_info *oinfo, obd_count page_count,
1541                          struct brw_page *pga, struct obd_trans_info *oti,
1542                          struct ptlrpc_request_set *set)
1543 {
1544         struct brw_page **ppga, **orig;
1545         int page_count_orig;
1546         int rc = 0;
1547         ENTRY;
1548
1549         if (cmd & OBD_BRW_CHECK) {
1550                 /* The caller just wants to know if there's a chance that this
1551                  * I/O can succeed */
1552                 struct obd_import *imp = class_exp2cliimp(exp);
1553
1554                 if (imp == NULL || imp->imp_invalid)
1555                         RETURN(-EIO);
1556                 RETURN(0);
1557         }
1558
1559         orig = ppga = osc_build_ppga(pga, page_count);
1560         if (ppga == NULL)
1561                 RETURN(-ENOMEM);
1562         page_count_orig = page_count;
1563
1564         sort_brw_pages(ppga, page_count);
1565         while (page_count) {
1566                 struct brw_page **copy;
1567                 obd_count pages_per_brw;
1568
1569                 pages_per_brw = min_t(obd_count, page_count,
1570                     class_exp2cliimp(exp)->imp_obd->u.cli.cl_max_pages_per_rpc);
1571
1572                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1573
1574                 /* use ppga only if single RPC is going to fly */
1575                 if (pages_per_brw != page_count_orig || ppga != orig) {
1576                         OBD_ALLOC(copy, pages_per_brw * sizeof(*copy));
1577                         if (copy == NULL)
1578                                 GOTO(out, rc = -ENOMEM);
1579                         memcpy(copy, ppga, pages_per_brw * sizeof(*copy));
1580                 } else
1581                         copy = ppga;
1582
1583                 rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1584                                     pages_per_brw, copy, set);
1585
1586                 if (rc != 0) {
1587                         if (copy != ppga)
1588                                 OBD_FREE(copy, pages_per_brw * sizeof(*copy));
1589                         break;
1590                 }
1591
1592                 if (copy == orig) {
1593                         /* we passed it to async_internal() which is
1594                          * now responsible for releasing memory */
1595                         orig = NULL;
1596                 }
1597
1598                 page_count -= pages_per_brw;
1599                 ppga += pages_per_brw;
1600         }
1601 out:
1602         if (orig)
1603                 osc_release_ppga(orig, page_count_orig);
1604         RETURN(rc);
1605 }
1606
1607 static void osc_check_rpcs(struct client_obd *cli);
1608
1609 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1610  * the dirty accounting.  Writeback completes or truncate happens before
1611  * writing starts.  Must be called with the loi lock held. */
1612 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1613                            int sent)
1614 {
1615         osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1616 }
1617
1618 /* This maintains the lists of pending pages to read/write for a given object
1619  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1620  * to quickly find objects that are ready to send an RPC. */
1621 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1622                          int cmd)
1623 {
1624         int optimal;
1625         ENTRY;
1626
1627         if (lop->lop_num_pending == 0)
1628                 RETURN(0);
1629
1630         /* if we have an invalid import we want to drain the queued pages
1631          * by forcing them through rpcs that immediately fail and complete
1632          * the pages.  recovery relies on this to empty the queued pages
1633          * before canceling the locks and evicting down the llite pages */
1634         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1635                 RETURN(1);
1636
1637         /* stream rpcs in queue order as long as as there is an urgent page
1638          * queued.  this is our cheap solution for good batching in the case
1639          * where writepage marks some random page in the middle of the file
1640          * as urgent because of, say, memory pressure */
1641         if (!list_empty(&lop->lop_urgent)) {
1642                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1643                 RETURN(1);
1644         }
1645
1646         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1647         optimal = cli->cl_max_pages_per_rpc;
1648         if (cmd & OBD_BRW_WRITE) {
1649                 /* trigger a write rpc stream as long as there are dirtiers
1650                  * waiting for space.  as they're waiting, they're not going to
1651                  * create more pages to coallesce with what's waiting.. */
1652                 if (!list_empty(&cli->cl_cache_waiters)) {
1653                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1654                         RETURN(1);
1655                 }
1656
1657                 /* +16 to avoid triggering rpcs that would want to include pages
1658                  * that are being queued but which can't be made ready until
1659                  * the queuer finishes with the page. this is a wart for
1660                  * llite::commit_write() */
1661                 optimal += 16;
1662         }
1663         if (lop->lop_num_pending >= optimal)
1664                 RETURN(1);
1665
1666         RETURN(0);
1667 }
1668
1669 static void on_list(struct list_head *item, struct list_head *list,
1670                     int should_be_on)
1671 {
1672         if (list_empty(item) && should_be_on)
1673                 list_add_tail(item, list);
1674         else if (!list_empty(item) && !should_be_on)
1675                 list_del_init(item);
1676 }
1677
1678 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1679  * can find pages to build into rpcs quickly */
1680 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1681 {
1682         on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
1683                 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
1684                 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1685
1686         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1687                 loi->loi_write_lop.lop_num_pending);
1688
1689         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1690                 loi->loi_read_lop.lop_num_pending);
1691 }
1692
1693 static void lop_update_pending(struct client_obd *cli,
1694                                struct loi_oap_pages *lop, int cmd, int delta)
1695 {
1696         lop->lop_num_pending += delta;
1697         if (cmd & OBD_BRW_WRITE)
1698                 cli->cl_pending_w_pages += delta;
1699         else
1700                 cli->cl_pending_r_pages += delta;
1701 }
1702
1703 /* this is called when a sync waiter receives an interruption.  Its job is to
1704  * get the caller woken as soon as possible.  If its page hasn't been put in an
1705  * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
1706  * desiring interruption which will forcefully complete the rpc once the rpc
1707  * has timed out */
1708 static void osc_occ_interrupted(struct oig_callback_context *occ)
1709 {
1710         struct osc_async_page *oap;
1711         struct loi_oap_pages *lop;
1712         struct lov_oinfo *loi;
1713         ENTRY;
1714
1715         /* XXX member_of() */
1716         oap = list_entry(occ, struct osc_async_page, oap_occ);
1717
1718         client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);
1719
1720         oap->oap_interrupted = 1;
1721
1722         /* ok, it's been put in an rpc. only one oap gets a request reference */
1723         if (oap->oap_request != NULL) {
1724                 ptlrpc_mark_interrupted(oap->oap_request);
1725                 ptlrpcd_wake(oap->oap_request);
1726                 GOTO(unlock, 0);
1727         }
1728
1729         /* we don't get interruption callbacks until osc_trigger_group_io()
1730          * has been called and put the sync oaps in the pending/urgent lists.*/
1731         if (!list_empty(&oap->oap_pending_item)) {
1732                 list_del_init(&oap->oap_pending_item);
1733                 list_del_init(&oap->oap_urgent_item);
1734
1735                 loi = oap->oap_loi;
1736                 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
1737                         &loi->loi_write_lop : &loi->loi_read_lop;
1738                 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
1739                 loi_list_maint(oap->oap_cli, oap->oap_loi);
1740
1741                 oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
1742                 oap->oap_oig = NULL;
1743         }
1744
1745 unlock:
1746         client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
1747 }
1748
1749 /* this is trying to propogate async writeback errors back up to the
1750  * application.  As an async write fails we record the error code for later if
1751  * the app does an fsync.  As long as errors persist we force future rpcs to be
1752  * sync so that the app can get a sync error and break the cycle of queueing
1753  * pages for which writeback will fail. */
1754 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
1755                            int rc)
1756 {
1757         if (rc) {
1758                 if (!ar->ar_rc)
1759                         ar->ar_rc = rc;
1760
1761                 ar->ar_force_sync = 1;
1762                 ar->ar_min_xid = ptlrpc_sample_next_xid();
1763                 return;
1764
1765         }
1766
1767         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
1768                 ar->ar_force_sync = 0;
1769 }
1770
1771 static void osc_oap_to_pending(struct osc_async_page *oap)
1772 {
1773         struct loi_oap_pages *lop;
1774
1775         if (oap->oap_cmd & OBD_BRW_WRITE)
1776                 lop = &oap->oap_loi->loi_write_lop;
1777         else
1778                 lop = &oap->oap_loi->loi_read_lop;
1779
1780         if (oap->oap_async_flags & ASYNC_URGENT)
1781                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1782         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1783         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
1784 }
1785
1786 /* this must be called holding the loi list lock to give coverage to exit_cache,
1787  * async_flag maintenance, and oap_request */
1788 static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
1789                               struct osc_async_page *oap, int sent, int rc)
1790 {
1791         __u64 xid = 0;
1792
1793         ENTRY;
1794         if (oap->oap_request != NULL) {
1795                 xid = ptlrpc_req_xid(oap->oap_request);
1796                 ptlrpc_req_finished(oap->oap_request);
1797                 oap->oap_request = NULL;
1798         }
1799
1800         oap->oap_async_flags = 0;
1801         oap->oap_interrupted = 0;
1802
1803         if (oap->oap_cmd & OBD_BRW_WRITE) {
1804                 osc_process_ar(&cli->cl_ar, xid, rc);
1805                 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
1806         }
1807
1808         if (rc == 0 && oa != NULL) {
1809                 if (oa->o_valid & OBD_MD_FLBLOCKS)
1810                         oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
1811                 if (oa->o_valid & OBD_MD_FLMTIME)
1812                         oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
1813                 if (oa->o_valid & OBD_MD_FLATIME)
1814                         oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
1815                 if (oa->o_valid & OBD_MD_FLCTIME)
1816                         oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
1817         }
1818
1819         if (oap->oap_oig) {
1820                 osc_exit_cache(cli, oap, sent);
1821                 oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
1822                 oap->oap_oig = NULL;
1823                 EXIT;
1824                 return;
1825         }
1826
1827         rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
1828                                                 oap->oap_cmd, oa, rc);
1829
1830         /* ll_ap_completion (from llite) drops PG_locked. so, a new
1831          * I/O on the page could start, but OSC calls it under lock
1832          * and thus we can add oap back to pending safely */
1833         if (rc)
1834                 /* upper layer wants to leave the page on pending queue */
1835                 osc_oap_to_pending(oap);
1836         else
1837                 osc_exit_cache(cli, oap, sent);
1838         EXIT;
1839 }
1840
1841 static int brw_interpret_oap(struct ptlrpc_request *request, void *data, int rc)
1842 {
1843         struct osc_brw_async_args *aa = data;
1844         struct osc_async_page *oap, *tmp;
1845         struct client_obd *cli;
1846         ENTRY;
1847
1848         rc = osc_brw_fini_request(request, rc);
1849         CDEBUG(D_INODE, "request %p aa %p rc %d\n", request, aa, rc);
1850
1851         if (osc_recoverable_error(rc)) {
1852                 rc = osc_brw_redo_request(request, aa);
1853                 if (rc == 0)
1854                         RETURN(0);
1855         }
1856
1857         cli = aa->aa_cli;
1858
1859         client_obd_list_lock(&cli->cl_loi_list_lock);
1860
1861         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1862          * is called so we know whether to go to sync BRWs or wait for more
1863          * RPCs to complete */
1864         if (lustre_msg_get_opc(request->rq_reqmsg) == OST_WRITE)
1865                 cli->cl_w_in_flight--;
1866         else
1867                 cli->cl_r_in_flight--;
1868
1869         /* the caller may re-use the oap after the completion call so
1870          * we need to clean it up a little */
1871         list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
1872                 list_del_init(&oap->oap_rpc_item);
1873                 osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
1874         }
1875
1876         osc_wake_cache_waiters(cli);
1877         osc_check_rpcs(cli);
1878
1879         client_obd_list_unlock(&cli->cl_loi_list_lock);
1880
1881         OBDO_FREE(aa->aa_oa);
1882
1883         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1884         RETURN(rc);
1885 }
1886
1887 static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
1888                                             struct list_head *rpc_list,
1889                                             int page_count, int cmd)
1890 {
1891         struct ptlrpc_request *req;
1892         struct brw_page **pga = NULL;
1893         struct osc_brw_async_args *aa;
1894         struct obdo *oa = NULL;
1895         struct obd_async_page_ops *ops = NULL;
1896         void *caller_data = NULL;
1897         struct osc_async_page *oap;
1898         int i, rc;
1899
1900         ENTRY;
1901         LASSERT(!list_empty(rpc_list));
1902
1903         OBD_ALLOC(pga, sizeof(*pga) * page_count);
1904         if (pga == NULL)
1905                 RETURN(ERR_PTR(-ENOMEM));
1906
1907         OBDO_ALLOC(oa);
1908         if (oa == NULL)
1909                 GOTO(out, req = ERR_PTR(-ENOMEM));
1910
1911         i = 0;
1912         list_for_each_entry(oap, rpc_list, oap_rpc_item) {
1913                 if (ops == NULL) {
1914                         ops = oap->oap_caller_ops;
1915                         caller_data = oap->oap_caller_data;
1916                 }
1917                 pga[i] = &oap->oap_brw_page;
1918                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1919                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1920                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
1921                 i++;
1922         }
1923
1924         /* always get the data for the obdo for the rpc */
1925         LASSERT(ops != NULL);
1926         ops->ap_fill_obdo(caller_data, cmd, oa);
1927
1928         sort_brw_pages(pga, page_count);
1929         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count, pga, &req);
1930         if (rc != 0) {
1931                 CERROR("prep_req failed: %d\n", rc);
1932                 GOTO(out, req = ERR_PTR(rc));
1933         }
1934
1935         /* Need to update the timestamps after the request is built in case
1936          * we race with setattr (locally or in queue at OST).  If OST gets
1937          * later setattr before earlier BRW (as determined by the request xid),
1938          * the OST will not use BRW timestamps.  Sadly, there is no obvious
1939          * way to do this in a single call.  bug 10150 */
1940         ops->ap_update_obdo(caller_data, cmd, oa,
1941                             OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);
1942
1943         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1944         aa = (struct osc_brw_async_args *)&req->rq_async_args;
1945         INIT_LIST_HEAD(&aa->aa_oaps);
1946         list_splice(rpc_list, &aa->aa_oaps);
1947         INIT_LIST_HEAD(rpc_list);
1948
1949 out:
1950         if (IS_ERR(req)) {
1951                 if (oa)
1952                         OBDO_FREE(oa);
1953                 if (pga)
1954                         OBD_FREE(pga, sizeof(*pga) * page_count);
1955         }
1956         RETURN(req);
1957 }
1958
1959 /* the loi lock is held across this function but it's allowed to release
1960  * and reacquire it during its work */
1961 static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
1962                             int cmd, struct loi_oap_pages *lop)
1963 {
1964         struct ptlrpc_request *req;
1965         obd_count page_count = 0;
1966         struct osc_async_page *oap = NULL, *tmp;
1967         struct osc_brw_async_args *aa;
1968         struct obd_async_page_ops *ops;
1969         CFS_LIST_HEAD(rpc_list);
1970         unsigned int ending_offset;
1971         unsigned  starting_offset = 0;
1972         ENTRY;
1973
1974         /* first we find the pages we're allowed to work with */
1975         list_for_each_entry_safe(oap, tmp, &lop->lop_pending, oap_pending_item){
1976                 ops = oap->oap_caller_ops;
1977
1978                 LASSERT(oap->oap_magic == OAP_MAGIC);
1979
1980                 /* in llite being 'ready' equates to the page being locked
1981                  * until completion unlocks it.  commit_write submits a page
1982                  * as not ready because its unlock will happen unconditionally
1983                  * as the call returns.  if we race with commit_write giving
1984                  * us that page we dont' want to create a hole in the page
1985                  * stream, so we stop and leave the rpc to be fired by
1986                  * another dirtier or kupdated interval (the not ready page
1987                  * will still be on the dirty list).  we could call in
1988                  * at the end of ll_file_write to process the queue again. */
1989                 if (!(oap->oap_async_flags & ASYNC_READY)) {
1990                         int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
1991                         if (rc < 0)
1992                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
1993                                                 "instead of ready\n", oap,
1994                                                 oap->oap_page, rc);
1995                         switch (rc) {
1996                         case -EAGAIN:
1997                                 /* llite is telling us that the page is still
1998                                  * in commit_write and that we should try
1999                                  * and put it in an rpc again later.  we
2000                                  * break out of the loop so we don't create
2001                                  * a hole in the sequence of pages in the rpc
2002                                  * stream.*/
2003                                 oap = NULL;
2004                                 break;
2005                         case -EINTR:
2006                                 /* the io isn't needed.. tell the checks
2007                                  * below to complete the rpc with EINTR */
2008                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2009                                 oap->oap_count = -EINTR;
2010                                 break;
2011                         case 0:
2012                                 oap->oap_async_flags |= ASYNC_READY;
2013                                 break;
2014                         default:
2015                                 LASSERTF(0, "oap %p page %p returned %d "
2016                                             "from make_ready\n", oap,
2017                                             oap->oap_page, rc);
2018                                 break;
2019                         }
2020                 }
2021                 if (oap == NULL)
2022                         break;
2023                 /*
2024                  * Page submitted for IO has to be locked. Either by
2025                  * ->ap_make_ready() or by higher layers.
2026                  *
2027                  * XXX nikita: this assertion should be adjusted when lustre
2028                  * starts using PG_writeback for pages being written out.
2029                  */
2030 #if defined(__KERNEL__) && defined(__LINUX__)
2031                 LASSERT(PageLocked(oap->oap_page));
2032 #endif
2033                 /* If there is a gap at the start of this page, it can't merge
2034                  * with any previous page, so we'll hand the network a
2035                  * "fragmented" page array that it can't transfer in 1 RDMA */
2036                 if (page_count != 0 && oap->oap_page_off != 0)
2037                         break;
2038
2039                 /* take the page out of our book-keeping */
2040                 list_del_init(&oap->oap_pending_item);
2041                 lop_update_pending(cli, lop, cmd, -1);
2042                 list_del_init(&oap->oap_urgent_item);
2043
2044                 if (page_count == 0)
2045                         starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2046                                           (PTLRPC_MAX_BRW_SIZE - 1);
2047
2048                 /* ask the caller for the size of the io as the rpc leaves. */
2049                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
2050                         oap->oap_count =
2051                                 ops->ap_refresh_count(oap->oap_caller_data,cmd);
2052                 if (oap->oap_count <= 0) {
2053                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2054                                oap->oap_count);
2055                         osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
2056                         continue;
2057                 }
2058
2059                 /* now put the page back in our accounting */
2060                 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2061                 if (++page_count >= cli->cl_max_pages_per_rpc)
2062                         break;
2063
2064                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
2065                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2066                  * have the same alignment as the initial writes that allocated
2067                  * extents on the server. */
2068                 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2069                                  oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2070                 if (ending_offset == 0)
2071                         break;
2072
2073                 /* If there is a gap at the end of this page, it can't merge
2074                  * with any subsequent pages, so we'll hand the network a
2075                  * "fragmented" page array that it can't transfer in 1 RDMA */
2076                 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2077                         break;
2078         }
2079
2080         osc_wake_cache_waiters(cli);
2081
2082         if (page_count == 0)
2083                 RETURN(0);
2084
2085         loi_list_maint(cli, loi);
2086
2087         client_obd_list_unlock(&cli->cl_loi_list_lock);
2088
2089         req = osc_build_req(cli, &rpc_list, page_count, cmd);
2090         if (IS_ERR(req)) {
2091                 /* this should happen rarely and is pretty bad, it makes the
2092                  * pending list not follow the dirty order */
2093                 client_obd_list_lock(&cli->cl_loi_list_lock);
2094                 list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
2095                         list_del_init(&oap->oap_rpc_item);
2096
2097                         /* queued sync pages can be torn down while the pages
2098                          * were between the pending list and the rpc */
2099                         if (oap->oap_interrupted) {
2100                                 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2101                                 osc_ap_completion(cli, NULL, oap, 0,
2102                                                   oap->oap_count);
2103                                 continue;
2104                         }
2105                         osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
2106                 }
2107                 loi_list_maint(cli, loi);
2108                 RETURN(PTR_ERR(req));
2109         }
2110
2111         aa = (struct osc_brw_async_args *)&req->rq_async_args;
2112         if (cmd == OBD_BRW_READ) {
2113                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2114                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2115                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2116                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2117                 ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
2118         } else {
2119                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2120                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2121                                  cli->cl_w_in_flight);
2122                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2123                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2124                 ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
2125         }
2126
2127         client_obd_list_lock(&cli->cl_loi_list_lock);
2128
2129         if (cmd == OBD_BRW_READ)
2130                 cli->cl_r_in_flight++;
2131         else
2132                 cli->cl_w_in_flight++;
2133
2134         /* queued sync pages can be torn down while the pages
2135          * were between the pending list and the rpc */
2136         tmp = NULL;
2137         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2138                 /* only one oap gets a request reference */
2139                 if (tmp == NULL)
2140                         tmp = oap;
2141                 if (oap->oap_interrupted && !req->rq_intr) {
2142                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2143                                oap, req);
2144                         ptlrpc_mark_interrupted(req);
2145                 }
2146         }
2147         if (tmp != NULL)
2148                 tmp->oap_request = ptlrpc_request_addref(req);
2149
2150         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2151                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2152
2153         req->rq_interpret_reply = brw_interpret_oap;
2154         ptlrpcd_add_req(req);
2155         RETURN(1);
2156 }
2157
2158 #define LOI_DEBUG(LOI, STR, args...)                                     \
2159         CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
2160                !list_empty(&(LOI)->loi_cli_item),                        \
2161                (LOI)->loi_write_lop.lop_num_pending,                     \
2162                !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
2163                (LOI)->loi_read_lop.lop_num_pending,                      \
2164                !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
2165                args)                                                     \
2166
2167 /* This is called by osc_check_rpcs() to find which objects have pages that
2168  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2169 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2170 {
2171         ENTRY;
2172         /* first return all objects which we already know to have
2173          * pages ready to be stuffed into rpcs */
2174         if (!list_empty(&cli->cl_loi_ready_list))
2175                 RETURN(list_entry(cli->cl_loi_ready_list.next,
2176                                   struct lov_oinfo, loi_cli_item));
2177
2178         /* then if we have cache waiters, return all objects with queued
2179          * writes.  This is especially important when many small files
2180          * have filled up the cache and not been fired into rpcs because
2181          * they don't pass the nr_pending/object threshhold */
2182         if (!list_empty(&cli->cl_cache_waiters) &&
2183             !list_empty(&cli->cl_loi_write_list))
2184                 RETURN(list_entry(cli->cl_loi_write_list.next,
2185                                   struct lov_oinfo, loi_write_item));
2186
2187         /* then return all queued objects when we have an invalid import
2188          * so that they get flushed */
2189         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2190                 if (!list_empty(&cli->cl_loi_write_list))
2191                         RETURN(list_entry(cli->cl_loi_write_list.next,
2192                                           struct lov_oinfo, loi_write_item));
2193                 if (!list_empty(&cli->cl_loi_read_list))
2194                         RETURN(list_entry(cli->cl_loi_read_list.next,
2195                                           struct lov_oinfo, loi_read_item));
2196         }
2197         RETURN(NULL);
2198 }
2199
2200 /* called with the loi list lock held */
2201 static void osc_check_rpcs(struct client_obd *cli)
2202 {
2203         struct lov_oinfo *loi;
2204         int rc = 0, race_counter = 0;
2205         ENTRY;
2206
2207         while ((loi = osc_next_loi(cli)) != NULL) {
2208                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2209
2210                 if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
2211                         break;
2212
2213                 /* attempt some read/write balancing by alternating between
2214                  * reads and writes in an object.  The makes_rpc checks here
2215                  * would be redundant if we were getting read/write work items
2216                  * instead of objects.  we don't want send_oap_rpc to drain a
2217                  * partial read pending queue when we're given this object to
2218                  * do io on writes while there are cache waiters */
2219                 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2220                         rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
2221                                               &loi->loi_write_lop);
2222                         if (rc < 0)
2223                                 break;
2224                         if (rc > 0)
2225                                 race_counter = 0;
2226                         else
2227                                 race_counter++;
2228                 }
2229                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2230                         rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
2231                                               &loi->loi_read_lop);
2232                         if (rc < 0)
2233                                 break;
2234                         if (rc > 0)
2235                                 race_counter = 0;
2236                         else
2237                                 race_counter++;
2238                 }
2239
2240                 /* attempt some inter-object balancing by issueing rpcs
2241                  * for each object in turn */
2242                 if (!list_empty(&loi->loi_cli_item))
2243                         list_del_init(&loi->loi_cli_item);
2244                 if (!list_empty(&loi->loi_write_item))
2245                         list_del_init(&loi->loi_write_item);
2246                 if (!list_empty(&loi->loi_read_item))
2247                         list_del_init(&loi->loi_read_item);
2248
2249                 loi_list_maint(cli, loi);
2250
2251                 /* send_oap_rpc fails with 0 when make_ready tells it to
2252                  * back off.  llite's make_ready does this when it tries
2253                  * to lock a page queued for write that is already locked.
2254                  * we want to try sending rpcs from many objects, but we
2255                  * don't want to spin failing with 0.  */
2256                 if (race_counter == 10)
2257                         break;
2258         }
2259         EXIT;
2260 }
2261
2262 /* we're trying to queue a page in the osc so we're subject to the
2263  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2264  * If the osc's queued pages are already at that limit, then we want to sleep
2265  * until there is space in the osc's queue for us.  We also may be waiting for
2266  * write credits from the OST if there are RPCs in flight that may return some
2267  * before we fall back to sync writes.
2268  *
2269  * We need this know our allocation was granted in the presence of signals */
2270 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2271 {
2272         int rc;
2273         ENTRY;
2274         client_obd_list_lock(&cli->cl_loi_list_lock);
2275         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2276         client_obd_list_unlock(&cli->cl_loi_list_lock);
2277         RETURN(rc);
2278 };
2279
2280 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2281  * grant or cache space. */
2282 static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
2283                            struct osc_async_page *oap)
2284 {
2285         struct osc_cache_waiter ocw;
2286         struct l_wait_info lwi = { 0 };
2287         ENTRY;
2288
2289         CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2290                "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2291                cli->cl_dirty_max, obd_max_dirty_pages,
2292                cli->cl_lost_grant, cli->cl_avail_grant);
2293
2294         /* force the caller to try sync io.  this can jump the list
2295          * of queued writes and create a discontiguous rpc stream */
2296         if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2297             loi->loi_ar.ar_force_sync)
2298                 RETURN(-EDQUOT);
2299
2300         /* Hopefully normal case - cache space and write credits available */
2301         if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
2302             (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
2303             (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
2304                 /* account for ourselves */
2305                 osc_consume_write_grant(cli, &oap->oap_brw_page);
2306                 RETURN(0);
2307         }
2308
2309         /* Make sure that there are write rpcs in flight to wait for.  This
2310          * is a little silly as this object may not have any pending but
2311          * other objects sure might. */
2312         if (cli->cl_w_in_flight) {
2313                 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2314                 cfs_waitq_init(&ocw.ocw_waitq);
2315                 ocw.ocw_oap = oap;
2316                 ocw.ocw_rc = 0;
2317
2318                 loi_list_maint(cli, loi);
2319                 osc_check_rpcs(cli);
2320                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2321
2322                 CDEBUG(D_CACHE, "sleeping for cache space\n");
2323                 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2324
2325                 client_obd_list_lock(&cli->cl_loi_list_lock);
2326                 if (!list_empty(&ocw.ocw_entry)) {
2327                         list_del(&ocw.ocw_entry);
2328                         RETURN(-EINTR);
2329                 }
2330                 RETURN(ocw.ocw_rc);
2331         }
2332
2333         RETURN(-EDQUOT);
2334 }
2335
2336 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2337                         struct lov_oinfo *loi, cfs_page_t *page,
2338                         obd_off offset, struct obd_async_page_ops *ops,
2339                         void *data, void **res)
2340 {
2341         struct osc_async_page *oap;
2342         ENTRY;
2343
2344         if (!page)
2345                 return size_round(sizeof(*oap));
2346
2347         oap = *res;
2348         oap->oap_magic = OAP_MAGIC;
2349         oap->oap_cli = &exp->exp_obd->u.cli;
2350         oap->oap_loi = loi;
2351
2352         oap->oap_caller_ops = ops;
2353         oap->oap_caller_data = data;
2354
2355         oap->oap_page = page;
2356         oap->oap_obj_off = offset;
2357
2358         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2359         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2360         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2361
2362         oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2363
2364         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2365         RETURN(0);
2366 }
2367
2368 struct osc_async_page *oap_from_cookie(void *cookie)
2369 {
2370         struct osc_async_page *oap = cookie;
2371         if (oap->oap_magic != OAP_MAGIC)
2372                 return ERR_PTR(-EINVAL);
2373         return oap;
2374 };
2375
2376 static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2377                               struct lov_oinfo *loi, void *cookie,
2378                               int cmd, obd_off off, int count,
2379                               obd_flag brw_flags, enum async_flags async_flags)
2380 {
2381         struct client_obd *cli = &exp->exp_obd->u.cli;
2382         struct osc_async_page *oap;
2383         int rc = 0;
2384         ENTRY;
2385
2386         oap = oap_from_cookie(cookie);
2387         if (IS_ERR(oap))
2388                 RETURN(PTR_ERR(oap));
2389
2390         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2391                 RETURN(-EIO);
2392
2393         if (!list_empty(&oap->oap_pending_item) ||
2394             !list_empty(&oap->oap_urgent_item) ||
2395             !list_empty(&oap->oap_rpc_item))
2396                 RETURN(-EBUSY);
2397
2398         /* check if the file's owner/group is over quota */
2399 #ifdef HAVE_QUOTA_SUPPORT
2400         if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
2401                 struct obd_async_page_ops *ops;
2402                 struct obdo *oa;
2403
2404                 OBDO_ALLOC(oa);
2405                 if (oa == NULL)
2406                         RETURN(-ENOMEM);
2407
2408                 ops = oap->oap_caller_ops;
2409                 ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
2410                 if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
2411                     NO_QUOTA)
2412                         rc = -EDQUOT;
2413
2414                 OBDO_FREE(oa);
2415                 if (rc)
2416                         RETURN(rc);
2417         }
2418 #endif
2419
2420         if (loi == NULL)
2421                 loi = lsm->lsm_oinfo[0];
2422
2423         client_obd_list_lock(&cli->cl_loi_list_lock);
2424
2425         oap->oap_cmd = cmd;
2426         oap->oap_page_off = off;
2427         oap->oap_count = count;
2428         oap->oap_brw_flags = brw_flags;
2429         oap->oap_async_flags = async_flags;
2430
2431         if (cmd & OBD_BRW_WRITE) {
2432                 rc = osc_enter_cache(cli, loi, oap);
2433                 if (rc) {
2434                         client_obd_list_unlock(&cli->cl_loi_list_lock);
2435                         RETURN(rc);
2436                 }
2437         }
2438
2439         osc_oap_to_pending(oap);
2440         loi_list_maint(cli, loi);
2441
2442         LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2443                   cmd);
2444
2445         osc_check_rpcs(cli);
2446         client_obd_list_unlock(&cli->cl_loi_list_lock);
2447
2448         RETURN(0);
2449 }
2450
2451 /* aka (~was & now & flag), but this is more clear :) */
2452 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
2453
2454 static int osc_set_async_flags(struct obd_export *exp,
2455                                struct lov_stripe_md *lsm,
2456                                struct lov_oinfo *loi, void *cookie,
2457                                obd_flag async_flags)
2458 {
2459         struct client_obd *cli = &exp->exp_obd->u.cli;
2460         struct loi_oap_pages *lop;
2461         struct osc_async_page *oap;
2462         int rc = 0;
2463         ENTRY;
2464
2465         oap = oap_from_cookie(cookie);
2466         if (IS_ERR(oap))
2467                 RETURN(PTR_ERR(oap));
2468
2469         /*
2470          * bug 7311: OST-side locking is only supported for liblustre for now
2471          * (and liblustre never calls obd_set_async_flags(). I hope.), generic
2472          * implementation has to handle case where OST-locked page was picked
2473          * up by, e.g., ->writepage().
2474          */
2475         LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
2476         LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
2477                                      * tread here. */
2478
2479         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2480                 RETURN(-EIO);
2481
2482         if (loi == NULL)
2483                 loi = lsm->lsm_oinfo[0];
2484
2485         if (oap->oap_cmd & OBD_BRW_WRITE) {
2486                 lop = &loi->loi_write_lop;
2487         } else {
2488                 lop = &loi->loi_read_lop;
2489         }
2490
2491         client_obd_list_lock(&cli->cl_loi_list_lock);
2492
2493         if (list_empty(&oap->oap_pending_item))
2494                 GOTO(out, rc = -EINVAL);
2495
2496         if ((oap->oap_async_flags & async_flags) == async_flags)
2497                 GOTO(out, rc = 0);
2498
2499         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2500                 oap->oap_async_flags |= ASYNC_READY;
2501
2502         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
2503                 if (list_empty(&oap->oap_rpc_item)) {
2504                         list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2505                         loi_list_maint(cli, loi);
2506                 }
2507         }
2508
2509         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2510                         oap->oap_async_flags);
2511 out:
2512         osc_check_rpcs(cli);
2513         client_obd_list_unlock(&cli->cl_loi_list_lock);
2514         RETURN(rc);
2515 }
2516
2517 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2518                              struct lov_oinfo *loi,
2519                              struct obd_io_group *oig, void *cookie,
2520                              int cmd, obd_off off, int count,
2521                              obd_flag brw_flags,
2522                              obd_flag async_flags)
2523 {
2524         struct client_obd *cli = &exp->exp_obd->u.cli;
2525         struct osc_async_page *oap;
2526         struct loi_oap_pages *lop;
2527         int rc = 0;
2528         ENTRY;
2529
2530         oap = oap_from_cookie(cookie);
2531         if (IS_ERR(oap))
2532                 RETURN(PTR_ERR(oap));
2533
2534         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2535                 RETURN(-EIO);
2536
2537         if (!list_empty(&oap->oap_pending_item) ||
2538             !list_empty(&oap->oap_urgent_item) ||
2539             !list_empty(&oap->oap_rpc_item))
2540                 RETURN(-EBUSY);
2541
2542         if (loi == NULL)
2543                 loi = lsm->lsm_oinfo[0];
2544
2545         client_obd_list_lock(&cli->cl_loi_list_lock);
2546
2547         oap->oap_cmd = cmd;
2548         oap->oap_page_off = off;
2549         oap->oap_count = count;
2550         oap->oap_brw_flags = brw_flags;
2551         oap->oap_async_flags = async_flags;
2552
2553         if (cmd & OBD_BRW_WRITE)
2554                 lop = &loi->loi_write_lop;
2555         else
2556                 lop = &loi->loi_read_lop;
2557
2558         list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
2559         if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
2560                 oap->oap_oig = oig;
2561                 rc = oig_add_one(oig, &oap->oap_occ);
2562         }
2563
2564         LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
2565                   oap, oap->oap_page, rc);
2566
2567         client_obd_list_unlock(&cli->cl_loi_list_lock);
2568
2569         RETURN(rc);
2570 }
2571
2572 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2573                                  struct loi_oap_pages *lop, int cmd)
2574 {
2575         struct list_head *pos, *tmp;
2576         struct osc_async_page *oap;
2577
2578         list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2579                 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2580                 list_del(&oap->oap_pending_item);
2581                 osc_oap_to_pending(oap);
2582         }
2583         loi_list_maint(cli, loi);
2584 }
2585
2586 static int osc_trigger_group_io(struct obd_export *exp,
2587                                 struct lov_stripe_md *lsm,
2588                                 struct lov_oinfo *loi,
2589                                 struct obd_io_group *oig)
2590 {
2591         struct client_obd *cli = &exp->exp_obd->u.cli;
2592         ENTRY;
2593
2594         if (loi == NULL)
2595                 loi = lsm->lsm_oinfo[0];
2596
2597         client_obd_list_lock(&cli->cl_loi_list_lock);
2598
2599         osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
2600         osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
2601
2602         osc_check_rpcs(cli);
2603         client_obd_list_unlock(&cli->cl_loi_list_lock);
2604
2605         RETURN(0);
2606 }
2607
2608 static int osc_teardown_async_page(struct obd_export *exp,
2609                                    struct lov_stripe_md *lsm,
2610                                    struct lov_oinfo *loi, void *cookie)
2611 {
2612         struct client_obd *cli = &exp->exp_obd->u.cli;
2613         struct loi_oap_pages *lop;
2614         struct osc_async_page *oap;
2615         int rc = 0;
2616         ENTRY;
2617
2618         oap = oap_from_cookie(cookie);
2619         if (IS_ERR(oap))
2620                 RETURN(PTR_ERR(oap));
2621
2622         if (loi == NULL)
2623                 loi = lsm->lsm_oinfo[0];
2624
2625         if (oap->oap_cmd & OBD_BRW_WRITE) {
2626                 lop = &loi->loi_write_lop;
2627         } else {
2628                 lop = &loi->loi_read_lop;
2629         }
2630
2631         client_obd_list_lock(&cli->cl_loi_list_lock);
2632
2633         if (!list_empty(&oap->oap_rpc_item))
2634                 GOTO(out, rc = -EBUSY);
2635
2636         osc_exit_cache(cli, oap, 0);
2637         osc_wake_cache_waiters(cli);
2638
2639         if (!list_empty(&oap->oap_urgent_item)) {
2640                 list_del_init(&oap->oap_urgent_item);
2641                 oap->oap_async_flags &= ~ASYNC_URGENT;
2642         }
2643         if (!list_empty(&oap->oap_pending_item)) {
2644                 list_del_init(&oap->oap_pending_item);
2645                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2646         }
2647         loi_list_maint(cli, loi);
2648
2649         LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
2650 out:
2651         client_obd_list_unlock(&cli->cl_loi_list_lock);
2652         RETURN(rc);
2653 }
2654
2655 static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
2656                                     int flags)
2657 {
2658         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2659
2660         if (lock == NULL) {
2661                 CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
2662                 return;
2663         }
2664         lock_res_and_lock(lock);
2665 #ifdef __KERNEL__
2666 #ifdef __LINUX__
2667         /* Liang XXX: Darwin and Winnt checking should be added */
2668         if (lock->l_ast_data && lock->l_ast_data != data) {
2669                 struct inode *new_inode = data;
2670                 struct inode *old_inode = lock->l_ast_data;
2671                 if (!(old_inode->i_state & I_FREEING))
2672                         LDLM_ERROR(lock, "inconsistent l_ast_data found");
2673                 LASSERTF(old_inode->i_state & I_FREEING,
2674                          "Found existing inode %p/%lu/%u state %lu in lock: "
2675                          "setting data to %p/%lu/%u\n", old_inode,
2676                          old_inode->i_ino, old_inode->i_generation,
2677                          old_inode->i_state,
2678                          new_inode, new_inode->i_ino, new_inode->i_generation);
2679         }
2680 #endif
2681 #endif
2682         lock->l_ast_data = data;
2683         lock->l_flags |= (flags & LDLM_FL_NO_LRU);
2684         unlock_res_and_lock(lock);
2685         LDLM_LOCK_PUT(lock);
2686 }
2687
2688 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2689                              ldlm_iterator_t replace, void *data)
2690 {
2691         struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2692         struct obd_device *obd = class_exp2obd(exp);
2693
2694         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2695         return 0;
2696 }
2697
2698 static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo,
2699                             int intent, int rc)
2700 {
2701         ENTRY;
2702
2703         if (intent) {
2704                 /* The request was created before ldlm_cli_enqueue call. */
2705                 if (rc == ELDLM_LOCK_ABORTED) {
2706                         struct ldlm_reply *rep;
2707
2708                         /* swabbed by ldlm_cli_enqueue() */
2709                         LASSERT_REPSWABBED(req, DLM_LOCKREPLY_OFF);
2710                         rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
2711                                              sizeof(*rep));
2712                         LASSERT(rep != NULL);
2713                         if (rep->lock_policy_res1)
2714                                 rc = rep->lock_policy_res1;
2715                 }
2716         }
2717
2718         if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
2719                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2720                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
2721                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
2722                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
2723         }
2724
2725         /* Call the update callback. */
2726         rc = oinfo->oi_cb_up(oinfo, rc);
2727         RETURN(rc);
2728 }
2729
2730 static int osc_enqueue_interpret(struct ptlrpc_request *req,
2731                                  struct osc_enqueue_args *aa, int rc)
2732 {
2733         int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
2734         struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
2735         struct ldlm_lock *lock;
2736
2737         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2738          * be valid. */
2739         lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);
2740
2741         /* Complete obtaining the lock procedure. */
2742         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2743                                    aa->oa_ei->ei_mode,
2744                                    &aa->oa_oi->oi_flags,
2745                                    &lsm->lsm_oinfo[0]->loi_lvb,
2746                                    sizeof(lsm->lsm_oinfo[0]->loi_lvb),
2747                                    lustre_swab_ost_lvb,
2748                                    aa->oa_oi->oi_lockh, rc);
2749
2750         /* Complete osc stuff. */
2751         rc = osc_enqueue_fini(req, aa->oa_oi, intent, rc);
2752
2753         /* Release the lock for async request. */
2754         if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
2755                 ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);
2756
2757         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2758                  aa->oa_oi->oi_lockh, req, aa);
2759         LDLM_LOCK_PUT(lock);
2760         return rc;
2761 }
2762
2763 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2764  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2765  * other synchronous requests, however keeping some locks and trying to obtain
2766  * others may take a considerable amount of time in a case of ost failure; and
2767  * when other sync requests do not get released lock from a client, the client
2768  * is excluded from the cluster -- such scenarious make the life difficult, so
2769  * release locks just after they are obtained. */
2770 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2771                        struct ldlm_enqueue_info *einfo,
2772                        struct ptlrpc_request_set *rqset)
2773 {
2774         struct ldlm_res_id res_id = { .name = {oinfo->oi_md->lsm_object_id} };
2775         struct obd_device *obd = exp->exp_obd;
2776         struct ldlm_reply *rep;
2777         struct ptlrpc_request *req = NULL;
2778         int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
2779         int rc;
2780         ENTRY;
2781
2782         /* Filesystem lock extents are extended to page boundaries so that
2783          * dealing with the page cache is a little smoother.  */
2784         oinfo->oi_policy.l_extent.start -=
2785                 oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
2786         oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;
2787
2788         if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
2789                 goto no_match;
2790
2791         /* Next, search for already existing extent locks that will cover us */
2792         rc = ldlm_lock_match(obd->obd_namespace,
2793                              oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
2794                              einfo->ei_type, &oinfo->oi_policy, einfo->ei_mode,
2795                              oinfo->oi_lockh);
2796         if (rc == 1) {
2797                 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
2798                                         oinfo->oi_flags);
2799                 if (intent) {
2800                         /* I would like to be able to ASSERT here that rss <=
2801                          * kms, but I can't, for reasons which are explained in
2802                          * lov_enqueue() */
2803                 }
2804
2805                 /* We already have a lock, and it's referenced */
2806                 oinfo->oi_cb_up(oinfo, ELDLM_OK);
2807
2808                 /* For async requests, decref the lock. */
2809                 if (rqset)
2810                         ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
2811
2812                 RETURN(ELDLM_OK);
2813         }
2814
2815         /* If we're trying to read, we also search for an existing PW lock.  The
2816          * VFS and page cache already protect us locally, so lots of readers/
2817          * writers can share a single PW lock.
2818          *
2819          * There are problems with conversion deadlocks, so instead of
2820          * converting a read lock to a write lock, we'll just enqueue a new
2821          * one.
2822          *
2823          * At some point we should cancel the read lock instead of making them
2824          * send us a blocking callback, but there are problems with canceling
2825          * locks out from other users right now, too. */
2826
2827         if (einfo->ei_mode == LCK_PR) {
2828                 rc = ldlm_lock_match(obd->obd_namespace,
2829                                      oinfo->oi_flags | LDLM_FL_LVB_READY,
2830                                      &res_id, einfo->ei_type, &oinfo->oi_policy,
2831                                      LCK_PW, oinfo->oi_lockh);
2832                 if (rc == 1) {
2833                         /* FIXME: This is not incredibly elegant, but it might
2834                          * be more elegant than adding another parameter to
2835                          * lock_match.  I want a second opinion. */
2836                         /* addref the lock only if not async requests. */
2837                         if (!rqset)
2838                                 ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
2839                         osc_set_data_with_check(oinfo->oi_lockh,
2840                                                 einfo->ei_cbdata,
2841                                                 oinfo->oi_flags);
2842                         oinfo->oi_cb_up(oinfo, ELDLM_OK);
2843                         ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
2844                         RETURN(ELDLM_OK);
2845                 }
2846         }
2847
2848  no_match:
2849         if (intent) {
2850                 int size[3] = {
2851                         [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
2852                         [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request),
2853                         [DLM_LOCKREQ_OFF + 1] = 0 };
2854
2855                 req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
2856                 if (req == NULL)
2857                         RETURN(-ENOMEM);
2858
2859                 size[DLM_LOCKREPLY_OFF] = sizeof(*rep);
2860                 size[DLM_REPLY_REC_OFF] = 
2861                         sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb);
2862                 ptlrpc_req_set_repsize(req, 3, size);
2863         }
2864
2865         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2866         oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;
2867
2868         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id,
2869                               &oinfo->oi_policy, &oinfo->oi_flags,
2870                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2871                               sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
2872                               lustre_swab_ost_lvb, oinfo->oi_lockh,
2873                               rqset ? 1 : 0);
2874         if (rqset) {
2875                 if (!rc) {
2876                         struct osc_enqueue_args *aa;
2877                         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2878                         aa = (struct osc_enqueue_args *)&req->rq_async_args;
2879                         aa->oa_oi = oinfo;
2880                         aa->oa_ei = einfo;
2881                         aa->oa_exp = exp;
2882
2883                         req->rq_interpret_reply = osc_enqueue_interpret;
2884                         ptlrpc_set_add_req(rqset, req);
2885                 } else if (intent) {
2886                         ptlrpc_req_finished(req);
2887                 }
2888                 RETURN(rc);
2889         }
2890
2891         rc = osc_enqueue_fini(req, oinfo, intent, rc);
2892         if (intent)
2893                 ptlrpc_req_finished(req);
2894
2895         RETURN(rc);
2896 }
2897
2898 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
2899                      __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2900                      int *flags, void *data, struct lustre_handle *lockh)
2901 {
2902         struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2903         struct obd_device *obd = exp->exp_obd;
2904         int rc;
2905         int lflags = *flags;
2906         ENTRY;
2907
2908         OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);
2909
2910         /* Filesystem lock extents are extended to page boundaries so that
2911          * dealing with the page cache is a little smoother */
2912         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2913         policy->l_extent.end |= ~CFS_PAGE_MASK;
2914
2915         /* Next, search for already existing extent locks that will cover us */
2916         rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY, &res_id, type,
2917                              policy, mode, lockh);
2918         if (rc) {
2919                 //if (!(*flags & LDLM_FL_TEST_LOCK))
2920                         osc_set_data_with_check(lockh, data, lflags);
2921                 RETURN(rc);
2922         }
2923         /* If we're trying to read, we also search for an existing PW lock.  The
2924          * VFS and page cache already protect us locally, so lots of readers/
2925          * writers can share a single PW lock. */
2926         if (mode == LCK_PR) {
2927                 rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY, 
2928                                      &res_id, type,
2929                                      policy, LCK_PW, lockh);
2930                 if (rc == 1 && !(lflags & LDLM_FL_TEST_LOCK)) {
2931                         /* FIXME: This is not incredibly elegant, but it might
2932                          * be more elegant than adding another parameter to
2933                          * lock_match.  I want a second opinion. */
2934                         osc_set_data_with_check(lockh, data, lflags);
2935                         ldlm_lock_addref(lockh, LCK_PR);
2936                         ldlm_lock_decref(lockh, LCK_PW);
2937                 }
2938         }
2939         RETURN(rc);
2940 }
2941
2942 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2943                       __u32 mode, struct lustre_handle *lockh)
2944 {
2945         ENTRY;
2946
2947         if (unlikely(mode == LCK_GROUP))
2948                 ldlm_lock_decref_and_cancel(lockh, mode);
2949         else
2950                 ldlm_lock_decref(lockh, mode);
2951
2952         RETURN(0);
2953 }
2954
2955 static int osc_cancel_unused(struct obd_export *exp,
2956                              struct lov_stripe_md *lsm, int flags, void *opaque)
2957 {
2958         struct obd_device *obd = class_exp2obd(exp);
2959         struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2960
2961         return ldlm_cli_cancel_unused(obd->obd_namespace, &res_id, flags,
2962                                       opaque);
2963 }
2964
2965 static int osc_join_lru(struct obd_export *exp,
2966                         struct lov_stripe_md *lsm, int join)
2967 {
2968         struct obd_device *obd = class_exp2obd(exp);
2969         struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2970
2971         return ldlm_cli_join_lru(obd->obd_namespace, &res_id, join);
2972 }
2973
2974 static int osc_statfs_interpret(struct ptlrpc_request *req,
2975                                 struct osc_async_args *aa, int rc)
2976 {
2977         struct obd_statfs *msfs;
2978         ENTRY;
2979
2980         if (rc != 0)
2981                 GOTO(out, rc);
2982
2983         msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
2984                                   lustre_swab_obd_statfs);
2985         if (msfs == NULL) {
2986                 CERROR("Can't unpack obd_statfs\n");
2987                 GOTO(out, rc = -EPROTO);
2988         }
2989
2990         memcpy(aa->aa_oi->oi_osfs, msfs, sizeof(*msfs));
2991 out:
2992         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2993         RETURN(rc);
2994 }
2995
2996 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
2997                             __u64 max_age, struct ptlrpc_request_set *rqset)
2998 {
2999         struct ptlrpc_request *req;
3000         struct osc_async_args *aa;
3001         int size[2] = { sizeof(struct ptlrpc_body), sizeof(*oinfo->oi_osfs) };
3002         ENTRY;
3003
3004         /* We could possibly pass max_age in the request (as an absolute
3005          * timestamp or a "seconds.usec ago") so the target can avoid doing
3006          * extra calls into the filesystem if that isn't necessary (e.g.
3007          * during mount that would help a bit).  Having relative timestamps
3008          * is not so great if request processing is slow, while absolute
3009          * timestamps are not ideal because they need time synchronization. */
3010         req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
3011                               OST_STATFS, 1, NULL, NULL);
3012         if (!req)
3013                 RETURN(-ENOMEM);
3014
3015         ptlrpc_req_set_repsize(req, 2, size);
3016         req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3017
3018         req->rq_interpret_reply = osc_statfs_interpret;
3019         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3020         aa = (struct osc_async_args *)&req->rq_async_args;
3021         aa->aa_oi = oinfo;
3022
3023         ptlrpc_set_add_req(rqset, req);
3024         RETURN(0);
3025 }
3026
3027 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3028                       __u64 max_age)
3029 {
3030         struct obd_statfs *msfs;
3031         struct ptlrpc_request *req;
3032         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
3033         ENTRY;
3034
3035         /* We could possibly pass max_age in the request (as an absolute
3036          * timestamp or a "seconds.usec ago") so the target can avoid doing
3037          * extra calls into the filesystem if that isn't necessary (e.g.
3038          * during mount that would help a bit).  Having relative timestamps
3039          * is not so great if request processing is slow, while absolute
3040          * timestamps are not ideal because they need time synchronization. */
3041         req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
3042                               OST_STATFS, 1, NULL, NULL);
3043         if (!req)
3044                 RETURN(-ENOMEM);
3045
3046         ptlrpc_req_set_repsize(req, 2, size);
3047         req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3048
3049         rc = ptlrpc_queue_wait(req);
3050         if (rc)
3051                 GOTO(out, rc);
3052
3053         msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3054                                   lustre_swab_obd_statfs);
3055         if (msfs == NULL) {
3056                 CERROR("Can't unpack obd_statfs\n");
3057                 GOTO(out, rc = -EPROTO);
3058         }
3059
3060         memcpy(osfs, msfs, sizeof(*osfs));
3061
3062         EXIT;
3063  out:
3064         ptlrpc_req_finished(req);
3065         return rc;
3066 }
3067
3068 /* Retrieve object striping information.
3069  *
3070  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
3071  * the maximum number of OST indices which will fit in the user buffer.
3072  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
3073  */
3074 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3075 {
3076         struct lov_user_md lum, *lumk;
3077         int rc = 0, lum_size;
3078         ENTRY;
3079
3080         if (!lsm)
3081                 RETURN(-ENODATA);
3082
3083         if (copy_from_user(&lum, lump, sizeof(lum)))
3084                 RETURN(-EFAULT);
3085
3086         if (lum.lmm_magic != LOV_USER_MAGIC)
3087                 RETURN(-EINVAL);
3088
3089         if (lum.lmm_stripe_count > 0) {
3090                 lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
3091                 OBD_ALLOC(lumk, lum_size);
3092                 if (!lumk)
3093                         RETURN(-ENOMEM);
3094
3095                 lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
3096         } else {
3097                 lum_size = sizeof(lum);
3098                 lumk = &lum;
3099         }
3100
3101         lumk->lmm_object_id = lsm->lsm_object_id;
3102         lumk->lmm_stripe_count = 1;
3103
3104         if (copy_to_user(lump, lumk, lum_size))
3105                 rc = -EFAULT;
3106
3107         if (lumk != &lum)
3108                 OBD_FREE(lumk, lum_size);
3109
3110         RETURN(rc);
3111 }
3112
3113
3114 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3115                          void *karg, void *uarg)
3116 {
3117         struct obd_device *obd = exp->exp_obd;
3118         struct obd_ioctl_data *data = karg;
3119         int err = 0;
3120         ENTRY;
3121
3122 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3123         MOD_INC_USE_COUNT;
3124 #else
3125         if (!try_module_get(THIS_MODULE)) {
3126                 CERROR("Can't get module. Is it alive?");
3127                 return -EINVAL;
3128         }
3129 #endif
3130         switch (cmd) {
3131         case OBD_IOC_LOV_GET_CONFIG: {
3132                 char *buf;
3133                 struct lov_desc *desc;
3134                 struct obd_uuid uuid;
3135
3136                 buf = NULL;
3137                 len = 0;
3138                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3139                         GOTO(out, err = -EINVAL);
3140
3141                 data = (struct obd_ioctl_data *)buf;
3142
3143                 if (sizeof(*desc) > data->ioc_inllen1) {
3144                         obd_ioctl_freedata(buf, len);
3145                         GOTO(out, err = -EINVAL);
3146                 }
3147
3148                 if (data->ioc_inllen2 < sizeof(uuid)) {
3149                         obd_ioctl_freedata(buf, len);
3150                         GOTO(out, err = -EINVAL);
3151                 }
3152
3153                 desc = (struct lov_desc *)data->ioc_inlbuf1;
3154                 desc->ld_tgt_count = 1;
3155                 desc->ld_active_tgt_count = 1;
3156                 desc->ld_default_stripe_count = 1;
3157                 desc->ld_default_stripe_size = 0;
3158                 desc->ld_default_stripe_offset = 0;
3159                 desc->ld_pattern = 0;
3160                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3161
3162                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3163
3164                 err = copy_to_user((void *)uarg, buf, len);
3165                 if (err)
3166                         err = -EFAULT;
3167                 obd_ioctl_freedata(buf, len);
3168                 GOTO(out, err);
3169         }
3170         case LL_IOC_LOV_SETSTRIPE:
3171                 err = obd_alloc_memmd(exp, karg);
3172                 if (err > 0)
3173                         err = 0;
3174                 GOTO(out, err);
3175         case LL_IOC_LOV_GETSTRIPE:
3176                 err = osc_getstripe(karg, uarg);
3177                 GOTO(out, err);
3178         case OBD_IOC_CLIENT_RECOVER:
3179                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3180                                             data->ioc_inlbuf1);
3181                 if (err > 0)
3182                         err = 0;
3183                 GOTO(out, err);
3184         case IOC_OSC_SET_ACTIVE:
3185                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3186                                                data->ioc_offset);
3187                 GOTO(out, err);
3188         case OBD_IOC_POLL_QUOTACHECK:
3189                 err = lquota_poll_check(quota_interface, exp,
3190                                         (struct if_quotacheck *)karg);
3191                 GOTO(out, err);
3192         default:
3193                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3194                        cmd, cfs_curproc_comm());
3195                 GOTO(out, err = -ENOTTY);
3196         }
3197 out:
3198 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3199         MOD_DEC_USE_COUNT;
3200 #else
3201         module_put(THIS_MODULE);
3202 #endif
3203         return err;
3204 }
3205
3206 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3207                         void *key, __u32 *vallen, void *val)
3208 {
3209         ENTRY;
3210         if (!vallen || !val)
3211                 RETURN(-EFAULT);
3212
3213         if (keylen > strlen("lock_to_stripe") &&
3214             strcmp(key, "lock_to_stripe") == 0) {
3215                 __u32 *stripe = val;
3216                 *vallen = sizeof(*stripe);
3217                 *stripe = 0;
3218                 RETURN(0);
3219         } else if (keylen >= strlen("last_id") && strcmp(key, "last_id") == 0) {
3220                 struct ptlrpc_request *req;
3221                 obd_id *reply;
3222                 char *bufs[2] = { NULL, key };
3223                 int rc, size[2] = { sizeof(struct ptlrpc_body), keylen };
3224
3225                 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
3226                                       OST_GET_INFO, 2, size, bufs);
3227                 if (req == NULL)
3228                         RETURN(-ENOMEM);
3229
3230                 size[REPLY_REC_OFF] = *vallen;
3231                 ptlrpc_req_set_repsize(req, 2, size);
3232                 rc = ptlrpc_queue_wait(req);
3233                 if (rc)
3234                         GOTO(out, rc);
3235
3236                 reply = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*reply),
3237                                            lustre_swab_ost_last_id);
3238                 if (reply == NULL) {
3239                         CERROR("Can't unpack OST last ID\n");
3240                         GOTO(out, rc = -EPROTO);
3241                 }
3242                 *((obd_id *)val) = *reply;
3243         out:
3244                 ptlrpc_req_finished(req);
3245                 RETURN(rc);
3246         }
3247         RETURN(-EINVAL);
3248 }
3249
3250 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3251                                           void *aa, int rc)
3252 {
3253         struct llog_ctxt *ctxt;
3254         struct obd_import *imp = req->rq_import;
3255         ENTRY;
3256
3257         if (rc != 0)
3258                 RETURN(rc);
3259
3260         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3261         if (ctxt) {
3262                 if (rc == 0)
3263                         rc = llog_initiator_connect(ctxt);
3264                 else
3265                         CERROR("cannot establish connection for "
3266                                "ctxt %p: %d\n", ctxt, rc);
3267         }
3268
3269         spin_lock(&imp->imp_lock);
3270         imp->imp_server_timeout = 1;
3271         imp->imp_pingable = 1;
3272         spin_unlock(&imp->imp_lock);
3273         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3274
3275         RETURN(rc);
3276 }
3277
3278 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3279                               void *key, obd_count vallen, void *val,
3280                               struct ptlrpc_request_set *set)
3281 {
3282         struct ptlrpc_request *req;
3283         struct obd_device  *obd = exp->exp_obd;
3284         struct obd_import *imp = class_exp2cliimp(exp);
3285         int size[3] = { sizeof(struct ptlrpc_body), keylen, vallen };
3286         char *bufs[3] = { NULL, key, val };
3287         ENTRY;
3288
3289         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3290
3291         if (KEY_IS(KEY_NEXT_ID)) {
3292                 if (vallen != sizeof(obd_id))
3293                         RETURN(-EINVAL);
3294                 obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
3295                 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3296                        exp->exp_obd->obd_name,
3297                        obd->u.cli.cl_oscc.oscc_next_id);
3298
3299                 RETURN(0);
3300         }
3301
3302         if (KEY_IS("unlinked")) {
3303                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3304                 spin_lock(&oscc->oscc_lock);
3305                 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3306                 spin_unlock(&oscc->oscc_lock);
3307                 RETURN(0);
3308         }
3309
3310         if (KEY_IS(KEY_INIT_RECOV)) {
3311                 if (vallen != sizeof(int))
3312                         RETURN(-EINVAL);
3313                 spin_lock(&imp->imp_lock);
3314                 imp->imp_initial_recov = *(int *)val;
3315                 spin_unlock(&imp->imp_lock);
3316                 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3317                        exp->exp_obd->obd_name,
3318                        imp->imp_initial_recov);
3319                 RETURN(0);
3320         }
3321
3322         if (KEY_IS("checksum")) {
3323                 if (vallen != sizeof(int))
3324                         RETURN(-EINVAL);
3325                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3326                 RETURN(0);
3327         }
3328
3329         if (!set)
3330                 RETURN(-EINVAL);
3331
3332         /* We pass all other commands directly to OST. Since nobody calls osc
3333            methods directly and everybody is supposed to go through LOV, we
3334            assume lov checked invalid values for us.
3335            The only recognised values so far are evict_by_nid and mds_conn.
3336            Even if something bad goes through, we'd get a -EINVAL from OST
3337            anyway. */
3338
3339         req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_SET_INFO, 3, size,
3340                               bufs);
3341         if (req == NULL)
3342                 RETURN(-ENOMEM);
3343
3344         if (KEY_IS(KEY_MDS_CONN))
3345                 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3346
3347         ptlrpc_req_set_repsize(req, 1, NULL);
3348         ptlrpc_set_add_req(set, req);
3349         ptlrpc_check_set(set);
3350
3351         RETURN(0);
3352 }
3353
3354
3355 static struct llog_operations osc_size_repl_logops = {
3356         lop_cancel: llog_obd_repl_cancel
3357 };
3358
3359 static struct llog_operations osc_mds_ost_orig_logops;
3360 static int osc_llog_init(struct obd_device *obd, struct obd_device *tgt,
3361                          int count, struct llog_catid *catid, 
3362                          struct obd_uuid *uuid)
3363 {
3364         int rc;
3365         ENTRY;
3366
3367         spin_lock(&obd->obd_dev_lock);
3368         if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
3369                 osc_mds_ost_orig_logops = llog_lvfs_ops;
3370                 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3371                 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3372                 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3373                 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
3374         }
3375         spin_unlock(&obd->obd_dev_lock);
3376
3377         rc = llog_setup(obd, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
3378                         &catid->lci_logid, &osc_mds_ost_orig_logops);
3379         if (rc) {
3380                 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
3381                 GOTO (out, rc);
3382         }
3383
3384         rc = llog_setup(obd, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
3385                         &osc_size_repl_logops);
3386         if (rc) 
3387                 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
3388 out:
3389         if (rc) {
3390                 CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n", 
3391                        obd->obd_name, tgt->obd_name, count, catid, rc);
3392                 CERROR("logid "LPX64":0x%x\n",
3393                        catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
3394         }
3395         RETURN(rc);
3396 }
3397
3398 static int osc_llog_finish(struct obd_device *obd, int count)
3399 {
3400         struct llog_ctxt *ctxt;
3401         int rc = 0, rc2 = 0;
3402         ENTRY;
3403
3404         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3405         if (ctxt)
3406                 rc = llog_cleanup(ctxt);
3407
3408         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3409         if (ctxt)
3410                 rc2 = llog_cleanup(ctxt);
3411         if (!rc)
3412                 rc = rc2;
3413
3414         RETURN(rc);
3415 }
3416
3417 static int osc_reconnect(struct obd_export *exp, struct obd_device *obd,
3418                          struct obd_uuid *cluuid,
3419                          struct obd_connect_data *data)
3420 {
3421         struct client_obd *cli = &obd->u.cli;
3422
3423         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3424                 long lost_grant;
3425
3426                 client_obd_list_lock(&cli->cl_loi_list_lock);
3427                 data->ocd_grant = cli->cl_avail_grant ?:
3428                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3429                 lost_grant = cli->cl_lost_grant;
3430                 cli->cl_lost_grant = 0;
3431                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3432
3433                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3434                        "cl_lost_grant: %ld\n", data->ocd_grant,
3435                        cli->cl_avail_grant, lost_grant);
3436                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3437                        " ocd_grant: %d\n", data->ocd_connect_flags,
3438                        data->ocd_version, data->ocd_grant);
3439         }
3440
3441         RETURN(0);
3442 }
3443
3444 static int osc_disconnect(struct obd_export *exp)
3445 {
3446         struct obd_device *obd = class_exp2obd(exp);
3447         struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3448         int rc;
3449
3450         if (obd->u.cli.cl_conn_count == 1)
3451                 /* flush any remaining cancel messages out to the target */
3452                 llog_sync(ctxt, exp);
3453
3454         rc = client_disconnect_export(exp);
3455         return rc;
3456 }
3457
3458 static int osc_import_event(struct obd_device *obd,
3459                             struct obd_import *imp,
3460                             enum obd_import_event event)
3461 {
3462         struct client_obd *cli;
3463         int rc = 0;
3464
3465         ENTRY;
3466         LASSERT(imp->imp_obd == obd);
3467
3468         switch (event) {
3469         case IMP_EVENT_DISCON: {
3470                 /* Only do this on the MDS OSC's */
3471                 if (imp->imp_server_timeout) {
3472                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3473
3474                         spin_lock(&oscc->oscc_lock);
3475                         oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
3476                         spin_unlock(&oscc->oscc_lock);
3477                 }
3478                 cli = &obd->u.cli;
3479                 client_obd_list_lock(&cli->cl_loi_list_lock);
3480                 cli->cl_avail_grant = 0;
3481                 cli->cl_lost_grant = 0;
3482                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3483
3484                 break;
3485         }
3486         case IMP_EVENT_INACTIVE: {
3487                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3488                 break;
3489         }
3490         case IMP_EVENT_INVALIDATE: {
3491                 struct ldlm_namespace *ns = obd->obd_namespace;
3492
3493                 /* Reset grants */
3494                 cli = &obd->u.cli;
3495                 client_obd_list_lock(&cli->cl_loi_list_lock);
3496                 /* all pages go to failing rpcs due to the invalid import */
3497                 osc_check_rpcs(cli);
3498                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3499
3500                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3501
3502                 break;
3503         }
3504         case IMP_EVENT_ACTIVE: {
3505                 /* Only do this on the MDS OSC's */
3506                 if (imp->imp_server_timeout) {
3507                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3508
3509                         spin_lock(&oscc->oscc_lock);
3510                         oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3511                         spin_unlock(&oscc->oscc_lock);
3512                 }
3513                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3514                 break;
3515         }
3516         case IMP_EVENT_OCD: {
3517                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3518
3519                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3520                         osc_init_grant(&obd->u.cli, ocd);
3521
3522                 /* See bug 7198 */
3523                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3524                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3525
3526                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3527                 break;
3528         }
3529         default:
3530                 CERROR("Unknown import event %d\n", event);
3531                 LBUG();
3532         }
3533         RETURN(rc);
3534 }
3535
3536 int osc_setup(struct obd_device *obd, obd_count len, void *buf)
3537 {
3538         int rc;
3539         ENTRY;
3540
3541         ENTRY;
3542         rc = ptlrpcd_addref();
3543         if (rc)
3544                 RETURN(rc);
3545
3546         rc = client_obd_setup(obd, len, buf);
3547         if (rc) {
3548                 ptlrpcd_decref();
3549         } else {
3550                 struct lprocfs_static_vars lvars;
3551                 struct client_obd *cli = &obd->u.cli;
3552
3553                 lprocfs_init_vars(osc, &lvars);
3554                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3555                         lproc_osc_attach_seqstat(obd);
3556                         ptlrpc_lprocfs_register_obd(obd);
3557                 }
3558
3559                 oscc_init(obd);
3560                 /* We need to allocate a few requests more, because
3561                    brw_interpret_oap tries to create new requests before freeing
3562                    previous ones. Ideally we want to have 2x max_rpcs_in_flight
3563                    reserved, but I afraid that might be too much wasted RAM
3564                    in fact, so 2 is just my guess and still should work. */
3565                 cli->cl_import->imp_rq_pool =
3566                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3567                                             OST_MAXREQSIZE,
3568                                             ptlrpc_add_rqs_to_pool);
3569         }
3570
3571         RETURN(rc);
3572 }
3573
3574 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3575 {
3576         int rc = 0;
3577         ENTRY;
3578
3579         switch (stage) {
3580         case OBD_CLEANUP_EARLY: {
3581                 struct obd_import *imp;
3582                 imp = obd->u.cli.cl_import;
3583                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3584                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3585                 ptlrpc_deactivate_import(imp);
3586                 break;
3587         }
3588         case OBD_CLEANUP_EXPORTS: {
3589                 /* If we set up but never connected, the
3590                    client import will not have been cleaned. */
3591                 if (obd->u.cli.cl_import) {
3592                         struct obd_import *imp;
3593                         imp = obd->u.cli.cl_import;
3594                         CDEBUG(D_CONFIG, "%s: client import never connected\n",
3595                                obd->obd_name);
3596                         ptlrpc_invalidate_import(imp);
3597                         ptlrpc_free_rq_pool(imp->imp_rq_pool);
3598                         class_destroy_import(imp);
3599                         obd->u.cli.cl_import = NULL;
3600                 }
3601                 break;
3602         }
3603         case OBD_CLEANUP_SELF_EXP:
3604                 rc = obd_llog_finish(obd, 0);
3605                 if (rc != 0)
3606                         CERROR("failed to cleanup llogging subsystems\n");
3607                 break;
3608         case OBD_CLEANUP_OBD:
3609                 break;
3610         }
3611         RETURN(rc);
3612 }
3613
3614 int osc_cleanup(struct obd_device *obd)
3615 {
3616         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3617         int rc;
3618
3619         ENTRY;
3620         ptlrpc_lprocfs_unregister_obd(obd);
3621         lprocfs_obd_cleanup(obd);
3622
3623         spin_lock(&oscc->oscc_lock);
3624         oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
3625         oscc->oscc_flags |= OSCC_FLAG_EXITING;
3626         spin_unlock(&oscc->oscc_lock);
3627
3628         /* free memory of osc quota cache */
3629         lquota_cleanup(quota_interface, obd);
3630
3631         rc = client_obd_cleanup(obd);
3632
3633         ptlrpcd_decref();
3634         RETURN(rc);
3635 }
3636
3637 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3638 {
3639         struct lustre_cfg *lcfg = buf;
3640         struct lprocfs_static_vars lvars;
3641         int rc = 0;
3642
3643         lprocfs_init_vars(osc, &lvars);
3644
3645         rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars, lcfg, obd);
3646         return(rc);
3647 }
3648
3649 struct obd_ops osc_obd_ops = {
3650         .o_owner                = THIS_MODULE,
3651         .o_setup                = osc_setup,
3652         .o_precleanup           = osc_precleanup,
3653         .o_cleanup              = osc_cleanup,
3654         .o_add_conn             = client_import_add_conn,
3655         .o_del_conn             = client_import_del_conn,
3656         .o_connect              = client_connect_import,
3657         .o_reconnect            = osc_reconnect,
3658         .o_disconnect           = osc_disconnect,
3659         .o_statfs               = osc_statfs,
3660         .o_statfs_async         = osc_statfs_async,
3661         .o_packmd               = osc_packmd,
3662         .o_unpackmd             = osc_unpackmd,
3663         .o_create               = osc_create,
3664         .o_destroy              = osc_destroy,
3665         .o_getattr              = osc_getattr,
3666         .o_getattr_async        = osc_getattr_async,
3667         .o_setattr              = osc_setattr,
3668         .o_setattr_async        = osc_setattr_async,
3669         .o_brw                  = osc_brw,
3670         .o_brw_async            = osc_brw_async,
3671         .o_prep_async_page      = osc_prep_async_page,
3672         .o_queue_async_io       = osc_queue_async_io,
3673         .o_set_async_flags      = osc_set_async_flags,
3674         .o_queue_group_io       = osc_queue_group_io,
3675         .o_trigger_group_io     = osc_trigger_group_io,
3676         .o_teardown_async_page  = osc_teardown_async_page,
3677         .o_punch                = osc_punch,
3678         .o_sync                 = osc_sync,
3679         .o_enqueue              = osc_enqueue,
3680         .o_match                = osc_match,
3681         .o_change_cbdata        = osc_change_cbdata,
3682         .o_cancel               = osc_cancel,
3683         .o_cancel_unused        = osc_cancel_unused,
3684         .o_join_lru             = osc_join_lru,
3685         .o_iocontrol            = osc_iocontrol,
3686         .o_get_info             = osc_get_info,
3687         .o_set_info_async       = osc_set_info_async,
3688         .o_import_event         = osc_import_event,
3689         .o_llog_init            = osc_llog_init,
3690         .o_llog_finish          = osc_llog_finish,
3691         .o_process_config       = osc_process_config,
3692 };
3693 int __init osc_init(void)
3694 {
3695         struct lprocfs_static_vars lvars;
3696         int rc;
3697         ENTRY;
3698
3699         atomic_set(&osc_resend_time, cfs_time_seconds(OSC_DEFAULT_TIMEOUT));
3700         lprocfs_init_vars(osc, &lvars);
3701
3702         request_module("lquota");
3703         quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
3704         lquota_init(quota_interface);
3705         init_obd_quota_ops(quota_interface, &osc_obd_ops);
3706
3707         rc = class_register_type(&osc_obd_ops, lvars.module_vars,
3708                                  LUSTRE_OSC_NAME);
3709         if (rc) {
3710                 if (quota_interface)
3711                         PORTAL_SYMBOL_PUT(osc_quota_interface);
3712                 RETURN(rc);
3713         }
3714
3715         RETURN(rc);
3716 }
3717
3718 #ifdef __KERNEL__
3719 static void /*__exit*/ osc_exit(void)
3720 {
3721         lquota_exit(quota_interface);
3722         if (quota_interface)
3723                 PORTAL_SYMBOL_PUT(osc_quota_interface);
3724
3725         class_unregister_type(LUSTRE_OSC_NAME);
3726 }
3727
3728 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
3729 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3730 MODULE_LICENSE("GPL");
3731
3732 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
3733 #endif