Whamcloud - gitweb
branch: HEAD
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author Peter Braam <braam@clusterfs.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  *   You may have signed or agreed to another license before downloading
11  *   this software.  If so, you are bound by the terms and conditions
12  *   of that agreement, and the following does not apply to you.  See the
13  *   LICENSE file included with this distribution for more information.
14  *
15  *   If you did not agree to a different license, then this copy of Lustre
16  *   is open source software; you can redistribute it and/or modify it
17  *   under the terms of version 2 of the GNU General Public License as
18  *   published by the Free Software Foundation.
19  *
20  *   In either case, Lustre is distributed in the hope that it will be
21  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23  *   license text for more details.
24  *
 *  For testing and management it is treated as an obd_device,
 *  although it does not export a full OBD method table (the
 *  requests are coming in over the wire, so object target modules
 *  do not have a full method table.)
29  *
30  */
31
32 #ifndef EXPORT_SYMTAB
33 # define EXPORT_SYMTAB
34 #endif
35 #define DEBUG_SUBSYSTEM S_OSC
36
37 #ifdef __KERNEL__
38 # include <libcfs/libcfs.h>
39 #else /* __KERNEL__ */
40 # include <liblustre.h>
41 #endif
42
43 #include <lustre_dlm.h>
44 #include <libcfs/kp30.h>
45 #include <lustre_net.h>
46 #include <lustre/lustre_user.h>
47 #include <obd_ost.h>
48 #include <obd_lov.h>
49
50 #ifdef  __CYGWIN__
51 # include <ctype.h>
52 #endif
53
54 #include <lustre_ha.h>
55 #include <lprocfs_status.h>
56 #include <lustre_log.h>
57 #include <lustre_debug.h>
58 #include <lustre_param.h>
59 #include "osc_internal.h"
60
61 static quota_interface_t *quota_interface = NULL;
62 extern quota_interface_t osc_quota_interface;
63
64 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
65
/* Pack OSC object metadata for disk storage (LE byte order).
 *
 * Dual-mode helper following the obd md-packing convention:
 *   - @lmmp == NULL: return the size a packed buffer would need;
 *   - *@lmmp != NULL and @lsm == NULL: free the packed buffer;
 *   - otherwise allocate (if needed) and fill *@lmmp from @lsm.
 * Returns the packed size on success, 0 after a free, -ENOMEM on
 * allocation failure. */
static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
                      struct lov_stripe_md *lsm)
{
        int lmm_size;
        ENTRY;

        /* sizeof() never dereferences, so this is safe even when the
         * caller passed lmmp == NULL just to query the size. */
        lmm_size = sizeof(**lmmp);
        if (!lmmp)
                RETURN(lmm_size);

        /* Existing buffer plus no lsm means "free the packed md". */
        if (*lmmp && !lsm) {
                OBD_FREE(*lmmp, lmm_size);
                *lmmp = NULL;
                RETURN(0);
        }

        if (!*lmmp) {
                OBD_ALLOC(*lmmp, lmm_size);
                if (!*lmmp)
                        RETURN(-ENOMEM);
        }

        if (lsm) {
                LASSERT(lsm->lsm_object_id);
                LASSERT(lsm->lsm_object_gr);
                /* On-disk wire format is little-endian. */
                (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
                (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
        }

        RETURN(lmm_size);
}
98
/* Unpack OSC object metadata from disk storage (LE byte order).
 *
 * Mirror of osc_packmd():
 *   - @lsmp == NULL: return the in-memory lsm size;
 *   - *@lsmp != NULL and @lmm == NULL: free the in-memory lsm;
 *   - otherwise allocate (if needed) and fill *@lsmp from @lmm,
 *     validating @lmm_bytes and the object id first.
 * Returns the lsm size on success, 0 after a free, or a negative errno. */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
{
        int lsm_size;
        ENTRY;

        if (lmm != NULL) {
                if (lmm_bytes < sizeof (*lmm)) {
                        CERROR("lov_mds_md too small: %d, need %d\n",
                               lmm_bytes, (int)sizeof(*lmm));
                        RETURN(-EINVAL);
                }
                /* XXX LOV_MAGIC etc check? */

                if (lmm->lmm_object_id == 0) {
                        CERROR("lov_mds_md: zero lmm_object_id\n");
                        RETURN(-EINVAL);
                }
        }

        /* An OSC object always has exactly one stripe. */
        lsm_size = lov_stripe_md_size(1);
        if (lsmp == NULL)
                RETURN(lsm_size);

        if (*lsmp != NULL && lmm == NULL) {
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);
                *lsmp = NULL;
                RETURN(0);
        }

        if (*lsmp == NULL) {
                OBD_ALLOC(*lsmp, lsm_size);
                if (*lsmp == NULL)
                        RETURN(-ENOMEM);
                OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                if ((*lsmp)->lsm_oinfo[0] == NULL) {
                        /* Partial allocation: undo the lsm itself. */
                        OBD_FREE(*lsmp, lsm_size);
                        RETURN(-ENOMEM);
                }
                loi_init((*lsmp)->lsm_oinfo[0]);
        }

        if (lmm != NULL) {
                /* XXX zero *lsmp? */
                (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
                (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
                LASSERT((*lsmp)->lsm_object_id);
                LASSERT((*lsmp)->lsm_object_gr);
        }

        (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

        RETURN(lsm_size);
}
155
/* Copy capability @capa into the request buffer at @offset and flag its
 * presence in @body->oa.o_valid; no-op when no capability is supplied.
 * The caller must have reserved a lustre_capa-sized buffer at @offset. */
static inline void osc_pack_capa(struct ptlrpc_request *req, int offset,
                                 struct ost_body *body, void *capa)
{
        struct obd_capa *oc = (struct obd_capa *)capa;
        struct lustre_capa *c;

        if (!capa)
                return;

        c = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*c));
        LASSERT(c);
        capa_cpy(c, oc);
        body->oa.o_valid |= OBD_MD_FLOSSCAPA;
        DEBUG_CAPA(D_SEC, c, "pack");
}
171
/* Fill the ost_body at @offset of @req from @oinfo's obdo, and pack the
 * optional capability into the following buffer (@offset + 1). */
static inline void osc_pack_req_body(struct ptlrpc_request *req, int offset,
                                     struct obd_info *oinfo)
{
        struct ost_body *body;

        body = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*body));
        body->oa = *oinfo->oi_oa;
        osc_pack_capa(req, offset + 1, body, oinfo->oi_capa);
}
181
/* Reply-interpret callback for async OST_GETATTR: unpack the returned
 * ost_body into the caller's obdo, then invoke the completion callback
 * with the final rc.  Returns whatever the callback returns. */
static int osc_getattr_interpret(struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body) {
                CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
                memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));

                /* This should really be sent by the OST */
                aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
                aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
        } else {
                CERROR("can't unpack ost_body\n");
                rc = -EPROTO;
                /* Don't let the caller trust stale obdo contents. */
                aa->aa_oi->oi_oa->o_valid = 0;
        }
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}
209
/* Queue an asynchronous OST_GETATTR on @set; the reply is handled by
 * osc_getattr_interpret(), which fires oinfo->oi_cb_up on completion.
 * Returns 0 on successful enqueue or -ENOMEM. */
static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        /* [ptlrpc_body, ost_body, optional capa] */
        int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        struct osc_async_args *aa;
        ENTRY;

        /* Capability buffer is zero-sized when no capa is attached. */
        size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_GETATTR, 3, size,NULL);
        if (!req)
                RETURN(-ENOMEM);

        osc_pack_req_body(req, REQ_REC_OFF, oinfo);

        ptlrpc_req_set_repsize(req, 2, size);
        req->rq_interpret_reply = osc_getattr_interpret;

        /* Async args live inside the request; make sure they fit. */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_async_args *)&req->rq_async_args;
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(set, req);
        RETURN (0);
}
237
/* Synchronous OST_GETATTR: send the request, wait, and copy the returned
 * attributes into oinfo->oi_oa.  Returns 0 or a negative errno. */
static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        /* [ptlrpc_body, ost_body, optional capa] */
        int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        ENTRY;

        size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_GETATTR, 3, size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        osc_pack_req_body(req, REQ_REC_OFF, oinfo);

        ptlrpc_req_set_repsize(req, 2, size);

        rc = ptlrpc_queue_wait(req);
        if (rc) {
                CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
                GOTO(out, rc);
        }

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR ("can't unpack ost_body\n");
                GOTO (out, rc = -EPROTO);
        }

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        *oinfo->oi_oa = body->oa;

        /* This should really be sent by the OST */
        oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
280
/* Synchronous OST_SETATTR: push oinfo->oi_oa attributes to the OST and
 * copy back the server's view of the obdo.  Returns 0 or negative errno. */
static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
                       struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        ENTRY;

        /* If a group is claimed valid it must be non-zero. */
        LASSERT(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
                                        oinfo->oi_oa->o_gr > 0);
        size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_SETATTR, 3, size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        osc_pack_req_body(req, REQ_REC_OFF, oinfo);

        ptlrpc_req_set_repsize(req, 2, size);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *oinfo->oi_oa = body->oa;

        EXIT;
out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
317
318 static int osc_setattr_interpret(struct ptlrpc_request *req,
319                                  struct osc_async_args *aa, int rc)
320 {
321         struct ost_body *body;
322         ENTRY;
323
324         if (rc != 0)
325                 GOTO(out, rc);
326
327         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
328                                   lustre_swab_ost_body);
329         if (body == NULL) {
330                 CERROR("can't unpack ost_body\n");
331                 GOTO(out, rc = -EPROTO);
332         }
333
334         *aa->aa_oi->oi_oa = body->oa;
335 out:
336         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
337         RETURN(rc);
338 }
339
/* Asynchronous OST_SETATTR.  With @rqset the request is added to the set
 * and the reply handled by osc_setattr_interpret(); with rqset == NULL it
 * is handed to ptlrpcd fire-and-forget (no reply processing).
 * Returns 0 on enqueue or -ENOMEM. */
static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct obd_trans_info *oti,
                             struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        int size[3] = { sizeof(struct ptlrpc_body), sizeof(struct ost_body) };
        struct osc_async_args *aa;
        ENTRY;

        size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_SETATTR, 3, size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        osc_pack_req_body(req, REQ_REC_OFF, oinfo);
        if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
                /* Carry the MDS llog cookie along inside the obdo. */
                LASSERT(oti);
                *obdo_logcookie(oinfo->oi_oa) = *oti->oti_logcookies;
        }

        ptlrpc_req_set_repsize(req, 2, size);
        /* Do MDS-to-OST setattr asynchronously. */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
                req->rq_interpret_reply = osc_setattr_interpret;

                CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = (struct osc_async_args *)&req->rq_async_args;
                aa->aa_oi = oinfo;

                ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}
378
/* Create an object on the OST (synchronous OST_CREATE).
 *
 * @oa is updated in place with the created object's attributes; *@ea
 * receives the (possibly newly allocated) single-stripe lsm; @oti, if
 * given, records the transno and any returned llog cookie.
 * On failure a locally-allocated lsm is freed again.
 * Returns 0 or a negative errno. */
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        struct lov_stripe_md *lsm;
        int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                /* Caller supplied no lsm: allocate one we own until
                 * success hands it over via *ea. */
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_CREATE, 2, size, NULL);
        if (!req)
                GOTO(out, rc = -ENOMEM);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        body->oa = *oa;

        ptlrpc_req_set_repsize(req, 2, size);
        if (oa->o_valid & OBD_MD_FLINLINE) {
                LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
                        oa->o_flags == OBD_FL_DELORPHAN);
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR ("can't unpack ost_body\n");
                GOTO (out_req, rc = -EPROTO);
        }

        *oa = body->oa;

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_object_id = oa->o_id;
        lsm->lsm_object_gr = oa->o_gr;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        *oti->oti_logcookies = *obdo_logcookie(oa);
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
        EXIT;
out_req:
        ptlrpc_req_finished(req);
out:
        /* Only free the lsm if we allocated it here (caller's *ea is
         * still NULL) and the create failed. */
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        return rc;
}
461
462 static int osc_punch_interpret(struct ptlrpc_request *req,
463                                struct osc_async_args *aa, int rc)
464 {
465         struct ost_body *body;
466         ENTRY;
467
468         if (rc != 0)
469                 GOTO(out, rc);
470
471         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof (*body),
472                                   lustre_swab_ost_body);
473         if (body == NULL) {
474                 CERROR ("can't unpack ost_body\n");
475                 GOTO(out, rc = -EPROTO);
476         }
477
478         *aa->aa_oi->oi_oa = body->oa;
479 out:
480         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
481         RETURN(rc);
482 }
483
/* Asynchronous OST_PUNCH (truncate a byte range): queue the request on
 * @rqset; osc_punch_interpret() fires oinfo->oi_cb_up on completion.
 * The punch extent is smuggled through oa.o_size/o_blocks.
 * Returns 0 on enqueue or a negative errno. */
static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
                     struct obd_trans_info *oti,
                     struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        struct ost_body *body;
        int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        ENTRY;

        if (!oinfo->oi_oa) {
                CERROR("oa NULL\n");
                RETURN(-EINVAL);
        }

        size[REQ_REC_OFF + 1] = oinfo->oi_capa? sizeof(struct lustre_capa) : 0;
        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_PUNCH, 3, size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */

        osc_pack_req_body(req, REQ_REC_OFF, oinfo);
        /* overload the size and blocks fields in the oa with start/end */
        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        body->oa.o_size = oinfo->oi_policy.l_extent.start;
        body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);

        ptlrpc_req_set_repsize(req, 2, size);

        req->rq_interpret_reply = osc_punch_interpret;
        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_async_args *)&req->rq_async_args;
        aa->aa_oi = oinfo;
        ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}
524
/* Synchronous OST_SYNC: ask the OST to flush [start, end] of the object
 * described by @oa to stable storage, then copy back the returned obdo.
 * The range is carried in oa.o_size/o_blocks.  Returns 0 or -errno. */
static int osc_sync(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md *md, obd_size start, obd_size end,
                    void *capa)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        ENTRY;

        if (!oa) {
                CERROR("oa NULL\n");
                RETURN(-EINVAL);
        }

        size[REQ_REC_OFF + 1] = capa ? sizeof(struct lustre_capa) : 0;

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_SYNC, 3, size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        /* overload the size and blocks fields in the oa with start/end */
        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        body->oa = *oa;
        body->oa.o_size = start;
        body->oa.o_blocks = end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);

        osc_pack_capa(req, REQ_REC_OFF + 1, body, capa);

        ptlrpc_req_set_repsize(req, 2, size);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR ("can't unpack ost_body\n");
                GOTO (out, rc = -EPROTO);
        }

        *oa = body->oa;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
575
/* Find and cancel locally locks matched by @mode in the resource found by
 * @oa's object id/group.  Found locks are added into @cancels list.
 * Returns the number of locks added to @cancels (0 if the resource does
 * not exist locally). */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels, ldlm_mode_t mode,
                                   int lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id = { .name = { oa->o_id, 0, oa->o_gr, 0 } };
        /* Lookup only -- don't create the resource if it isn't there. */
        struct ldlm_resource *res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        int count;
        ENTRY;

        if (res == NULL)
                RETURN(0);

        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        ldlm_resource_putref(res);
        RETURN(count);
}
597
/* Destroy requests can be async always on the client, and we don't even really
 * care about the return code since the client cannot do anything at all about
 * a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST reconnects to the MDS next,
 * it will retrieve the llog unlink logs and then sends the log cancellation
 * cookies to the MDS after committing destroy transactions. */
static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                       struct lov_stripe_md *ea, struct obd_trans_info *oti,
                       struct obd_export *md_export)
{
        CFS_LIST_HEAD(cancels);
        struct ptlrpc_request *req;
        struct ost_body *body;
        /* [ptlrpc_body, ost_body, optional early-cancel ldlm_request] */
        int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body), 0 };
        int count, bufcount = 2;
        ENTRY;

        if (!oa) {
                CERROR("oa NULL\n");
                RETURN(-EINVAL);
        }

        /* Cancel our own PW locks first; data is being discarded anyway. */
        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);
        if (exp_connect_cancelset(exp) && count) {
                /* Piggy-back the cancels on the destroy request. */
                bufcount = 3;
                size[REQ_REC_OFF + 1] = ldlm_request_bufsize(count,OST_DESTROY);
        }
        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_DESTROY, bufcount, size, NULL);
        if (exp_connect_cancelset(exp) && req)
                ldlm_cli_cancel_list(&cancels, count, req, REQ_REC_OFF + 1, 0);
        else
                /* No request to carry them -- drop the references. */
                ldlm_lock_list_put(&cancels, l_bl_ast, count);

        if (!req)
                RETURN(-ENOMEM);

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                memcpy(obdo_logcookie(oa), oti->oti_logcookies,
                       sizeof(*oti->oti_logcookies));
        body->oa = *oa;

        ptlrpc_req_set_repsize(req, 2, size);

        /* Fire and forget via ptlrpcd; see comment above. */
        ptlrpcd_add_req(req);
        RETURN(0);
}
653
/* Fill @oa's dirty/undirty/grant/dropped fields so the next RPC reports
 * our cache state to the OST.  Sanity-checks the dirty accounting and
 * clamps o_undirty to 0 when the counters look inconsistent. */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        /* Caller must not have set these fields already. */
        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (cli->cl_dirty > cli->cl_dirty_max) {
                CERROR("dirty %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
                CERROR("dirty %d > system dirty_max %d\n",
                       atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                /* Advertise how much more we could dirty before blocking. */
                long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
                                (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant;
        oa->o_dropped = cli->cl_lost_grant;
        /* Lost grant has been reported; reset the counter. */
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}
688
/* Account one page of write grant to @pga: bump the dirty counters and
 * take a page's worth of grant.  Caller must hold loi_list_lock and must
 * have verified grant is available (the LASSERT enforces this). */
static void osc_consume_write_grant(struct client_obd *cli,
                                    struct brw_page *pga)
{
        atomic_inc(&obd_dirty_pages);
        cli->cl_dirty += CFS_PAGE_SIZE;
        cli->cl_avail_grant -= CFS_PAGE_SIZE;
        /* Mark the page so osc_release_write_grant knows to undo this. */
        pga->flag |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
               CFS_PAGE_SIZE, pga, pga->pg);
        LASSERT(cli->cl_avail_grant >= 0);
}
701
/* the companion to osc_consume_write_grant, called when a brw has completed.
 * must be called with the loi lock held.
 *
 * @sent == 0 means the page was never written, so its whole grant is
 * "lost" (reported back to the OST via o_dropped).  For short writes the
 * grant covering the untouched tail of the server-side block is lost too,
 * matching the server's accounting in filter_grant_check. */
static void osc_release_write_grant(struct client_obd *cli,
                                    struct brw_page *pga, int sent)
{
        /* Fall back to 4096 if the OST never reported a block size. */
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
        ENTRY;

        /* Pages that never consumed grant have nothing to release. */
        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
                EXIT;
                return;
        }

        pga->flag &= ~OBD_BRW_FROM_GRANT;
        atomic_dec(&obd_dirty_pages);
        cli->cl_dirty -= CFS_PAGE_SIZE;
        if (!sent) {
                cli->cl_lost_grant += CFS_PAGE_SIZE;
                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
        } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole block on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
                int offset = pga->off & ~CFS_PAGE_MASK;
                int count = pga->count + (offset & (blocksize - 1));
                int end = (offset + pga->count) & (blocksize - 1);
                if (end)
                        count += blocksize - end;

                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
                       cli->cl_avail_grant, cli->cl_dirty);
        }

        EXIT;
}
740
741 static unsigned long rpcs_in_flight(struct client_obd *cli)
742 {
743         return cli->cl_r_in_flight + cli->cl_w_in_flight;
744 }
745
/* caller must hold loi_list_lock
 *
 * Walk the cache-waiter list and wake waiters as long as resources allow:
 * a waiter either gets a page of write grant, or -- when no grant can be
 * expected -- is woken with -EDQUOT to fall back to sync IO.  Returns
 * early (leaving the rest queued) while dirty limits or in-flight writes
 * say more room may yet appear. */
void osc_wake_cache_waiters(struct client_obd *cli)
{
        struct list_head *l, *tmp;
        struct osc_cache_waiter *ocw;

        ENTRY;
        list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                /* if we can't dirty more, we must wait until some is written */
                if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
                   (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);
                        return;
                }

                /* if still dirty cache but no grant wait for pending RPCs that
                 * may yet return us some grant before doing sync writes */
                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
                               cli->cl_w_in_flight);
                        return;
                }

                ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
                list_del_init(&ocw->ocw_entry);
                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        /* no more RPCs in flight to return grant, do sync IO */
                        ocw->ocw_rc = -EDQUOT;
                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
                } else {
                        osc_consume_write_grant(cli,
                                                &ocw->ocw_oap->oap_brw_page);
                }

                cfs_waitq_signal(&ocw->ocw_waitq);
        }

        EXIT;
}
787
/* Seed the client's available grant from the server's connect reply. */
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
               cli->cl_avail_grant, cli->cl_lost_grant);
        LASSERT(cli->cl_avail_grant >= 0);
}
798
/* Add grant returned in an RPC reply (@body->oa.o_grant) to the client's
 * available grant.  Waking any cache waiters is left to the BRW-completion
 * path. */
static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
        cli->cl_avail_grant += body->oa.o_grant;
        /* waiters are woken in brw_interpret_oap */
        client_obd_list_unlock(&cli->cl_loi_list_lock);
}
807
808 /* We assume that the reason this OSC got a short read is because it read
809  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
810  * via the LOV, and it _knows_ it's reading inside the file, it's just that
811  * this stripe never got written at or beyond this stripe offset yet. */
812 static void handle_short_read(int nob_read, obd_count page_count,
813                               struct brw_page **pga)
814 {
815         char *ptr;
816         int i = 0;
817
818         /* skip bytes read OK */
819         while (nob_read > 0) {
820                 LASSERT (page_count > 0);
821
822                 if (pga[i]->count > nob_read) {
823                         /* EOF inside this page */
824                         ptr = cfs_kmap(pga[i]->pg) +
825                                 (pga[i]->off & ~CFS_PAGE_MASK);
826                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
827                         cfs_kunmap(pga[i]->pg);
828                         page_count--;
829                         i++;
830                         break;
831                 }
832
833                 nob_read -= pga[i]->count;
834                 page_count--;
835                 i++;
836         }
837
838         /* zero remaining pages */
839         while (page_count-- > 0) {
840                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
841                 memset(ptr, 0, pga[i]->count);
842                 cfs_kunmap(pga[i]->pg);
843                 i++;
844         }
845 }
846
847 static int check_write_rcs(struct ptlrpc_request *req,
848                            int requested_nob, int niocount,
849                            obd_count page_count, struct brw_page **pga)
850 {
851         int    *remote_rcs, i;
852
853         /* return error if any niobuf was in error */
854         remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
855                                         sizeof(*remote_rcs) * niocount, NULL);
856         if (remote_rcs == NULL) {
857                 CERROR("Missing/short RC vector on BRW_WRITE reply\n");
858                 return(-EPROTO);
859         }
860         if (lustre_msg_swabbed(req->rq_repmsg))
861                 for (i = 0; i < niocount; i++)
862                         __swab32s(&remote_rcs[i]);
863
864         for (i = 0; i < niocount; i++) {
865                 if (remote_rcs[i] < 0)
866                         return(remote_rcs[i]);
867
868                 if (remote_rcs[i] != 0) {
869                         CERROR("rc[%d] invalid (%d) req %p\n",
870                                 i, remote_rcs[i], req);
871                         return(-EPROTO);
872                 }
873         }
874
875         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
876                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
877                        requested_nob, req->rq_bulk->bd_nob_transferred);
878                 return(-EPROTO);
879         }
880
881         return (0);
882 }
883
884 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
885 {
886         if (p1->flag != p2->flag) {
887                 unsigned mask = ~OBD_BRW_FROM_GRANT;
888
889                 /* warn if we try to combine flags that we don't know to be
890                  * safe to combine */
891                 if ((p1->flag & mask) != (p2->flag & mask))
892                         CERROR("is it ok to have flags 0x%x and 0x%x in the "
893                                "same brw?\n", p1->flag, p2->flag);
894                 return 0;
895         }
896
897         return (p1->off + p1->count == p2->off);
898 }
899
/* Compute a CRC32 over the first @nob bytes of the bulk data described
 * by @pga.  Pages are checksummed in array order; on the page where @nob
 * runs out only the covered bytes are included.
 *
 * Two fault-injection points are honoured: OBD_FAIL_OSC_CHECKSUM_RECEIVE
 * corrupts the first page's data before checksumming (simulating an
 * OST->client data error), while OBD_FAIL_OSC_CHECKSUM_SEND perturbs only
 * the returned checksum (see comment below). */
static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga)
{
        __u32 cksum = ~0;
        int i = 0;

        LASSERT (pg_count > 0);
        while (nob > 0 && pg_count > 0) {
                char *ptr = cfs_kmap(pga[i]->pg);
                int off = pga[i]->off & ~CFS_PAGE_MASK;
                /* clamp the final page to the bytes actually covered */
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 &&
                    OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
                        memcpy(ptr + off, "bad1", min(4, nob));
                cksum = crc32_le(cksum, ptr + off, count);
                cfs_kunmap(pga[i]->pg);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
                               off, cksum);

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}
933
934 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
935                                 struct lov_stripe_md *lsm, obd_count page_count,
936                                 struct brw_page **pga, 
937                                 struct ptlrpc_request **reqp,
938                                 struct obd_capa *ocapa)
939 {
940         struct ptlrpc_request   *req;
941         struct ptlrpc_bulk_desc *desc;
942         struct ost_body         *body;
943         struct obd_ioobj        *ioobj;
944         struct niobuf_remote    *niobuf;
945         int size[5] = { sizeof(struct ptlrpc_body), sizeof(*body) };
946         int niocount, i, requested_nob, opc, rc;
947         struct ptlrpc_request_pool *pool;
948         struct lustre_capa      *capa;
949         struct osc_brw_async_args *aa;
950
951         ENTRY;
952         if ((cmd & OBD_BRW_WRITE) != 0) {
953                 opc = OST_WRITE;
954                 pool = cli->cl_import->imp_rq_pool;
955         } else {
956                 opc = OST_READ;
957                 pool = NULL;
958         }
959
960         for (niocount = i = 1; i < page_count; i++) {
961                 if (!can_merge_pages(pga[i - 1], pga[i]))
962                         niocount++;
963         }
964
965         size[REQ_REC_OFF + 1] = sizeof(*ioobj);
966         size[REQ_REC_OFF + 2] = niocount * sizeof(*niobuf);
967         if (ocapa)
968                 size[REQ_REC_OFF + 3] = sizeof(*capa);
969
970         OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM);
971         req = ptlrpc_prep_req_pool(cli->cl_import, LUSTRE_OST_VERSION, opc, 5,
972                                    size, NULL, pool, NULL);
973         if (req == NULL)
974                 RETURN (-ENOMEM);
975
976         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
977
978         if (opc == OST_WRITE)
979                 desc = ptlrpc_prep_bulk_imp (req, page_count,
980                                              BULK_GET_SOURCE, OST_BULK_PORTAL);
981         else
982                 desc = ptlrpc_prep_bulk_imp (req, page_count,
983                                              BULK_PUT_SINK, OST_BULK_PORTAL);
984         if (desc == NULL)
985                 GOTO(out, rc = -ENOMEM);
986         /* NB request now owns desc and will free it when it gets freed */
987
988         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
989         ioobj = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioobj));
990         niobuf = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
991                                 niocount * sizeof(*niobuf));
992
993         body->oa = *oa;
994
995         obdo_to_ioobj(oa, ioobj);
996         ioobj->ioo_bufcnt = niocount;
997         if (ocapa) {
998                 capa = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 3,
999                                       sizeof(*capa));
1000                 capa_cpy(capa, ocapa);
1001                 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
1002         }
1003
1004         LASSERT (page_count > 0);
1005         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1006                 struct brw_page *pg = pga[i];
1007                 struct brw_page *pg_prev = pga[i - 1];
1008
1009                 LASSERT(pg->count > 0);
1010                 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
1011                          "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
1012                          pg->off, pg->count);
1013 #ifdef __LINUX__
1014                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1015                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1016                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1017                          i, page_count,
1018                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1019                          pg_prev->pg, page_private(pg_prev->pg),
1020                          pg_prev->pg->index, pg_prev->off);
1021 #else
1022                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1023                          "i %d p_c %u\n", i, page_count);
1024 #endif
1025                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1026                         (pg->flag & OBD_BRW_SRVLOCK));
1027
1028                 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
1029                                       pg->count);
1030                 requested_nob += pg->count;
1031
1032                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1033                         niobuf--;
1034                         niobuf->len += pg->count;
1035                 } else {
1036                         niobuf->offset = pg->off;
1037                         niobuf->len    = pg->count;
1038                         niobuf->flags  = pg->flag;
1039                 }
1040         }
1041
1042         LASSERT((void *)(niobuf - niocount) ==
1043                 lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1044                                niocount * sizeof(*niobuf)));
1045         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1046
1047         /* size[REQ_REC_OFF] still sizeof (*body) */
1048         if (opc == OST_WRITE) {
1049                 if (unlikely(cli->cl_checksum)) {
1050                         body->oa.o_valid |= OBD_MD_FLCKSUM;
1051                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1052                                                              page_count, pga);
1053                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1054                                body->oa.o_cksum);
1055                         /* save this in 'oa', too, for later checking */
1056                         oa->o_valid |= OBD_MD_FLCKSUM;
1057                 } else {
1058                         /* clear out the checksum flag, in case this is a
1059                          * resend but cl_checksum is no longer set. b=11238 */
1060                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1061                 }
1062                 oa->o_cksum = body->oa.o_cksum;
1063                 /* 1 RC per niobuf */
1064                 size[REPLY_REC_OFF + 1] = sizeof(__u32) * niocount;
1065                 ptlrpc_req_set_repsize(req, 3, size);
1066         } else {
1067                 if (unlikely(cli->cl_checksum))
1068                         body->oa.o_valid |= OBD_MD_FLCKSUM;
1069                 /* 1 RC for the whole I/O */
1070                 ptlrpc_req_set_repsize(req, 2, size);
1071         }
1072
1073         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1074         aa = (struct osc_brw_async_args *)&req->rq_async_args;
1075         aa->aa_oa = oa;
1076         aa->aa_requested_nob = requested_nob;
1077         aa->aa_nio_count = niocount;
1078         aa->aa_page_count = page_count;
1079         aa->aa_retries = 5;     /*retry for checksum errors; lprocfs? */
1080         aa->aa_ppga = pga;
1081         aa->aa_cli = cli;
1082         INIT_LIST_HEAD(&aa->aa_oaps);
1083
1084         *reqp = req;
1085         RETURN (0);
1086
1087  out:
1088         ptlrpc_req_finished (req);
1089         RETURN (rc);
1090 }
1091
1092 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1093                                 __u32 client_cksum, __u32 server_cksum,
1094                                 int nob, obd_count page_count,
1095                                 struct brw_page **pga)
1096 {
1097         __u32 new_cksum;
1098         char *msg;
1099
1100         if (server_cksum == client_cksum) {
1101                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1102                 return 0;
1103         }
1104
1105         new_cksum = osc_checksum_bulk(nob, page_count, pga);
1106
1107         if (new_cksum == server_cksum)
1108                 msg = "changed on the client after we checksummed it - "
1109                       "likely false positive due to mmap IO (bug 11742)";
1110         else if (new_cksum == client_cksum)
1111                 msg = "changed in transit before arrival at OST";
1112         else
1113                 msg = "changed in transit AND doesn't match the original - "
1114                       "likely false positive due to mmap IO (bug 11742)";
1115
1116         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1117                            LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1118                            "["LPU64"-"LPU64"]\n",
1119                            msg, libcfs_nid2str(peer->nid),
1120                            oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1121                            oa->o_valid & OBD_MD_FLFID ? oa->o_generation : 
1122                                                         (__u64)0,
1123                            oa->o_id,
1124                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1125                            pga[0]->off,
1126                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1127         CERROR("original client csum %x, server csum %x, client csum now %x\n",
1128                client_cksum, server_cksum, new_cksum);
1129         return 1;        
1130 }
1131
1132 /* Note rc enters this function as number of bytes transferred */
1133 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1134 {
1135         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1136         const lnet_process_id_t *peer =
1137                         &req->rq_import->imp_connection->c_peer;
1138         struct client_obd *cli = aa->aa_cli;
1139         struct ost_body *body;
1140         __u32 client_cksum = 0;
1141         ENTRY;
1142
1143         if (rc < 0 && rc != -EDQUOT)
1144                 RETURN(rc);
1145
1146         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1147         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1148                                   lustre_swab_ost_body);
1149         if (body == NULL) {
1150                 CERROR ("Can't unpack body\n");
1151                 RETURN(-EPROTO);
1152         }
1153
1154         /* set/clear over quota flag for a uid/gid */
1155         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1156             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1157                 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1158                              body->oa.o_gid, body->oa.o_valid,
1159                              body->oa.o_flags);
1160
1161         if (rc < 0)
1162                 RETURN(rc);
1163
1164         if (unlikely(aa->aa_oa->o_valid & OBD_MD_FLCKSUM))
1165                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1166
1167         osc_update_grant(cli, body);
1168
1169         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1170                 if (rc > 0) {
1171                         CERROR ("Unexpected +ve rc %d\n", rc);
1172                         RETURN(-EPROTO);
1173                 }
1174                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1175
1176                 if (unlikely((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) &&
1177                              client_cksum &&
1178                              check_write_checksum(&body->oa, peer, client_cksum,
1179                                                   body->oa.o_cksum,
1180                                                   aa->aa_requested_nob,
1181                                                   aa->aa_page_count,
1182                                                   aa->aa_ppga)))
1183                         RETURN(-EAGAIN);
1184
1185                 sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk);
1186
1187                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1188                                      aa->aa_page_count, aa->aa_ppga);
1189                 GOTO(out, rc);
1190         }
1191
1192         /* The rest of this function executes only for OST_READs */
1193         if (rc > aa->aa_requested_nob) {
1194                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1195                        aa->aa_requested_nob);
1196                 RETURN(-EPROTO);
1197         }
1198
1199         if (rc != req->rq_bulk->bd_nob_transferred) {
1200                 CERROR ("Unexpected rc %d (%d transferred)\n",
1201                         rc, req->rq_bulk->bd_nob_transferred);
1202                 return (-EPROTO);
1203         }
1204
1205         if (rc < aa->aa_requested_nob)
1206                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1207
1208         sptlrpc_cli_unwrap_bulk_read(req, rc, aa->aa_page_count, aa->aa_ppga);
1209
1210         if (unlikely(body->oa.o_valid & OBD_MD_FLCKSUM)) {
1211                 static int cksum_counter;
1212                 __u32      server_cksum = body->oa.o_cksum;
1213                 char      *via;
1214                 char      *router;
1215
1216                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1217                                                  aa->aa_ppga);
1218
1219                 if (peer->nid == req->rq_bulk->bd_sender) {
1220                         via = router = "";
1221                 } else {
1222                         via = " via ";
1223                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1224                 }
1225
1226                 if (server_cksum == ~0 && rc > 0) {
1227                         CERROR("Protocol error: server %s set the 'checksum' "
1228                                "bit, but didn't send a checksum.  Not fatal, "
1229                                "but please tell CFS.\n",
1230                                libcfs_nid2str(peer->nid));
1231                 } else if (server_cksum != client_cksum) {
1232                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1233                                            "%s%s%s inum "LPU64"/"LPU64" object "
1234                                            LPU64"/"LPU64" extent "
1235                                            "["LPU64"-"LPU64"]\n",
1236                                            req->rq_import->imp_obd->obd_name,
1237                                            libcfs_nid2str(peer->nid),
1238                                            via, router,
1239                                            body->oa.o_valid & OBD_MD_FLFID ?
1240                                                 body->oa.o_fid : (__u64)0,
1241                                            body->oa.o_valid & OBD_MD_FLFID ?
1242                                                 body->oa.o_generation :(__u64)0,
1243                                            body->oa.o_id,
1244                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1245                                                 body->oa.o_gr : (__u64)0,
1246                                            aa->aa_ppga[0]->off,
1247                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1248                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1249                                                                         1);
1250                         CERROR("client %x, server %x\n",
1251                                client_cksum, server_cksum);
1252                         cksum_counter = 0;
1253                         aa->aa_oa->o_cksum = client_cksum;
1254                         rc = -EAGAIN;
1255                 } else {
1256                         cksum_counter++;
1257                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1258                         rc = 0;
1259                 }
1260         } else if (unlikely(client_cksum)) {
1261                 static int cksum_missed;
1262
1263                 cksum_missed++;
1264                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1265                         CERROR("Checksum %u requested from %s but not sent\n",
1266                                cksum_missed, libcfs_nid2str(peer->nid));
1267         } else {
1268                 rc = 0;
1269         }
1270 out:
1271         if (rc >= 0)
1272                 *aa->aa_oa = body->oa;
1273
1274         RETURN(rc);
1275 }
1276
1277 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1278                             struct lov_stripe_md *lsm,
1279                             obd_count page_count, struct brw_page **pga,
1280                             struct obd_capa *ocapa)
1281 {
1282         struct ptlrpc_request *req;
1283         int                    rc, retries = 5; /* lprocfs? */
1284         ENTRY;
1285
1286 restart_bulk:
1287         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1288                                   page_count, pga, &req, ocapa);
1289         if (rc != 0)
1290                 return (rc);
1291
1292         rc = ptlrpc_queue_wait(req);
1293
1294         if (rc == -ETIMEDOUT && req->rq_resend) {
1295                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1296                 ptlrpc_req_finished(req);
1297                 goto restart_bulk;
1298         }
1299
1300         rc = osc_brw_fini_request(req, rc);
1301
1302         ptlrpc_req_finished(req);
1303         if (rc == -EAGAIN) {
1304                 if (retries-- > 0)
1305                         goto restart_bulk;
1306                 rc = -EIO;
1307         }
1308         RETURN (rc);
1309 }
1310
/* Rebuild and requeue a BRW request whose reply failed checksum checks.
 *
 * A fresh request covering the same pages is prepared, the oap list and
 * each oap's request reference are moved from the old request to the new
 * one, and the new request is added to the old request's set.  Returns
 * -EIO once aa->aa_retries is exhausted, -EINTR if any oap was
 * interrupted while the old request was in flight, else 0. */
int osc_brw_redo_request(struct ptlrpc_request *req,
                         struct osc_brw_async_args *aa)
{
        struct ptlrpc_request *new_req;
        struct ptlrpc_request_set *set = req->rq_set;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        if (aa->aa_retries-- <= 0) {
                CERROR("too many checksum retries, returning error\n");
                RETURN(-EIO);
        }

        DEBUG_REQ(D_ERROR, req, "redo for checksum error");
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(req == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 req, oap->oap_request);
                        if (oap->oap_interrupted) {
                                /* a waiter gave up on this oap: don't
                                 * resend on its behalf */
                                ptlrpc_mark_interrupted(oap->oap_request);
                                rc = -EINTR;
                                break;
                        }
                }
        }
        if (rc)
                RETURN(rc);
        /* TODO-MERGE: and where to get ocapa?? */
        rc = osc_brw_prep_request(lustre_msg_get_opc(req->rq_reqmsg) ==
                                        OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa,
                                  NULL /* lsm unused by osc currently */,
                                  aa->aa_page_count, aa->aa_ppga, &new_req,
                                  NULL /* ocapa */);
        if (rc)
                RETURN(rc);

        /* New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it... */
        new_req->rq_interpret_reply = req->rq_interpret_reply;
        new_req->rq_async_args = req->rq_async_args;
        new_aa = (struct osc_brw_async_args *)&new_req->rq_async_args;
        INIT_LIST_HEAD(&new_aa->aa_oaps);
        list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
        INIT_LIST_HEAD(&aa->aa_oaps);

        list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        /* drop the old request's reference and pin the
                         * new request instead */
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);
                }
        }

        ptlrpc_set_add_req(set, new_req);

        RETURN(0);
}
1371
/* ptlrpc reply-interpret callback for async BRW requests queued by
 * async_internal().  @rc enters as the number of bytes transferred (or a
 * negative errno).  On -EAGAIN from osc_brw_fini_request() the request is
 * rebuilt and resent via osc_brw_redo_request() instead of completing;
 * otherwise the byte count is folded into the set's counter and the write
 * grant and page array are released. */
static int brw_interpret(struct ptlrpc_request *req, void *data, int rc)
{
        struct osc_brw_async_args *aa = data;
        int                        i;
        int                        nob = rc;  /* save byte count; rc is reused below */
        ENTRY;

        rc = osc_brw_fini_request(req, rc);
        if (rc == -EAGAIN) {
                /* the redo request takes over aa (pga and oaps), so
                 * release nothing here when it is queued successfully */
                rc = osc_brw_redo_request(req, aa);
                if (rc == 0)
                        RETURN(0);
        }
        if ((rc >= 0) && req->rq_set && req->rq_set->set_countp)
                atomic_add(nob, (atomic_t *)req->rq_set->set_countp);

        /* give back the grant consumed for these pages in async_internal() */
        spin_lock(&aa->aa_cli->cl_loi_list_lock);
        for (i = 0; i < aa->aa_page_count; i++)
                osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
        spin_unlock(&aa->aa_cli->cl_loi_list_lock);

        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);

        RETURN(rc);
}
1397
1398 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1399                           struct lov_stripe_md *lsm, obd_count page_count,
1400                           struct brw_page **pga, struct ptlrpc_request_set *set,
1401                           struct obd_capa *ocapa)
1402 {
1403         struct ptlrpc_request     *req;
1404         struct client_obd         *cli = &exp->exp_obd->u.cli;
1405         int                        rc, i;
1406         ENTRY;
1407
1408         /* Consume write credits even if doing a sync write -
1409          * otherwise we may run out of space on OST due to grant. */
1410         if (cmd == OBD_BRW_WRITE) {
1411                 spin_lock(&cli->cl_loi_list_lock);
1412                 for (i = 0; i < page_count; i++) {
1413                         if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
1414                                 osc_consume_write_grant(cli, pga[i]);
1415                 }
1416                 spin_unlock(&cli->cl_loi_list_lock);
1417         }
1418
1419         rc = osc_brw_prep_request(cmd, cli, oa, lsm, page_count, pga,
1420                                   &req, ocapa);
1421         if (rc == 0) {
1422                 req->rq_interpret_reply = brw_interpret;
1423                 ptlrpc_set_add_req(set, req);
1424         } else if (cmd == OBD_BRW_WRITE) {
1425                 spin_lock(&cli->cl_loi_list_lock);
1426                 for (i = 0; i < page_count; i++)
1427                         osc_release_write_grant(cli, pga[i], 0);
1428                 spin_unlock(&cli->cl_loi_list_lock);
1429         }
1430         RETURN (rc);
1431 }
1432
/*
 * We want disk allocation on the target to happen in offset order, so we'll
 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll
 * do fine for our small page arrays and doesn't require allocation.  It's an
 * insertion sort that swaps elements that are strides apart, shrinking the
 * stride down until it's '1' and the array is sorted.
 */
1440 static void sort_brw_pages(struct brw_page **array, int num)
1441 {
1442         int stride, i, j;
1443         struct brw_page *tmp;
1444
1445         if (num == 1)
1446                 return;
1447         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1448                 ;
1449
1450         do {
1451                 stride /= 3;
1452                 for (i = stride ; i < num ; i++) {
1453                         tmp = array[i];
1454                         j = i;
1455                         while (j >= stride && array[j - stride]->off > tmp->off) {
1456                                 array[j] = array[j - stride];
1457                                 j -= stride;
1458                         }
1459                         array[j] = tmp;
1460                 }
1461         } while (stride > 1);
1462 }
1463
1464 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1465 {
1466         int count = 1;
1467         int offset;
1468         int i = 0;
1469
1470         LASSERT (pages > 0);
1471         offset = pg[i]->off & ~CFS_PAGE_MASK;
1472
1473         for (;;) {
1474                 pages--;
1475                 if (pages == 0)         /* that's all */
1476                         return count;
1477
1478                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1479                         return count;   /* doesn't end on page boundary */
1480
1481                 i++;
1482                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1483                 if (offset != 0)        /* doesn't start on page boundary */
1484                         return count;
1485
1486                 count++;
1487         }
1488 }
1489
1490 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1491 {
1492         struct brw_page **ppga;
1493         int i;
1494
1495         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1496         if (ppga == NULL)
1497                 return NULL;
1498
1499         for (i = 0; i < count; i++)
1500                 ppga[i] = pga + i;
1501         return ppga;
1502 }
1503
/* Free a pointer array obtained from osc_build_ppga(); @count must match
 * the count passed there, since OBD_FREE needs the allocation size. */
static void osc_release_ppga(struct brw_page **ppga, obd_count count)
{
        LASSERT(ppga != NULL);
        OBD_FREE(ppga, sizeof(*ppga) * count);
}
1509
1510 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1511                    obd_count page_count, struct brw_page *pga,
1512                    struct obd_trans_info *oti)
1513 {
1514         struct obdo *saved_oa = NULL;
1515         struct brw_page **ppga, **orig;
1516         struct obd_import *imp = class_exp2cliimp(exp);
1517         struct client_obd *cli = &imp->imp_obd->u.cli;
1518         int rc, page_count_orig;
1519         ENTRY;
1520
1521         if (cmd & OBD_BRW_CHECK) {
1522                 /* The caller just wants to know if there's a chance that this
1523                  * I/O can succeed */
1524
1525                 if (imp == NULL || imp->imp_invalid)
1526                         RETURN(-EIO);
1527                 RETURN(0);
1528         }
1529
1530         /* test_brw with a failed create can trip this, maybe others. */
1531         LASSERT(cli->cl_max_pages_per_rpc);
1532
1533         rc = 0;
1534
1535         orig = ppga = osc_build_ppga(pga, page_count);
1536         if (ppga == NULL)
1537                 RETURN(-ENOMEM);
1538         page_count_orig = page_count;
1539
1540         sort_brw_pages(ppga, page_count);
1541         while (page_count) {
1542                 obd_count pages_per_brw;
1543
1544                 if (page_count > cli->cl_max_pages_per_rpc)
1545                         pages_per_brw = cli->cl_max_pages_per_rpc;
1546                 else
1547                         pages_per_brw = page_count;
1548
1549                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1550
1551                 if (saved_oa != NULL) {
1552                         /* restore previously saved oa */
1553                         *oinfo->oi_oa = *saved_oa;
1554                 } else if (page_count > pages_per_brw) {
1555                         /* save a copy of oa (brw will clobber it) */
1556                         OBDO_ALLOC(saved_oa);
1557                         if (saved_oa == NULL)
1558                                 GOTO(out, rc = -ENOMEM);
1559                         *saved_oa = *oinfo->oi_oa;
1560                 }
1561
1562                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1563                                       pages_per_brw, ppga, oinfo->oi_capa);
1564
1565                 if (rc != 0)
1566                         break;
1567
1568                 page_count -= pages_per_brw;
1569                 ppga += pages_per_brw;
1570         }
1571
1572 out:
1573         osc_release_ppga(orig, page_count_orig);
1574
1575         if (saved_oa != NULL)
1576                 OBDO_FREE(saved_oa);
1577
1578         RETURN(rc);
1579 }
1580
/* Asynchronous bulk read/write: like osc_brw() but each chunk is handed
 * to async_internal() / @set instead of being waited on.  Ownership of
 * the ppga array is transferred to async_internal() only when a single
 * RPC covers the whole request; otherwise a per-RPC copy is made. */
static int osc_brw_async(int cmd, struct obd_export *exp,
                         struct obd_info *oinfo, obd_count page_count,
                         struct brw_page *pga, struct obd_trans_info *oti,
                         struct ptlrpc_request_set *set)
{
        struct brw_page **ppga, **orig;
        struct client_obd *cli = &exp->exp_obd->u.cli;
        int page_count_orig;
        int rc = 0;
        ENTRY;

        if (cmd & OBD_BRW_CHECK) {
                struct obd_import *imp = class_exp2cliimp(exp);
                /* The caller just wants to know if there's a chance that this
                 * I/O can succeed */

                if (imp == NULL || imp->imp_invalid)
                        RETURN(-EIO);
                RETURN(0);
        }

        orig = ppga = osc_build_ppga(pga, page_count);
        if (ppga == NULL)
                RETURN(-ENOMEM);
        page_count_orig = page_count;

        sort_brw_pages(ppga, page_count);
        while (page_count) {
                struct brw_page **copy;
                obd_count pages_per_brw;

                pages_per_brw = min_t(obd_count, page_count,
                                      cli->cl_max_pages_per_rpc);

                /* shrink the chunk so one RPC covers a contiguous run */
                pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);

                /* use ppga only if single RPC is going to fly */
                if (pages_per_brw != page_count_orig || ppga != orig) {
                        OBD_ALLOC(copy, sizeof(*copy) * pages_per_brw);
                        if (copy == NULL)
                                GOTO(out, rc = -ENOMEM);
                        memcpy(copy, ppga, sizeof(*copy) * pages_per_brw);
                } else
                        copy = ppga;

                rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
                                    pages_per_brw, copy, set, oinfo->oi_capa);

                if (rc != 0) {
                        /* a failed async_internal() never took ownership
                         * of @copy, so free it here unless it aliases ppga
                         * (which is released via @orig below) */
                        if (copy != ppga)
                                OBD_FREE(copy, sizeof(*copy) * pages_per_brw);
                        break;
                }
                if (copy == orig) {
                        /* we passed it to async_internal() which is
                         * now responsible for releasing memory */
                        orig = NULL;
                }

                page_count -= pages_per_brw;
                ppga += pages_per_brw;
        }
out:
        if (orig)
                osc_release_ppga(orig, page_count_orig);
        RETURN(rc);
}
1648
1649 static void osc_check_rpcs(struct client_obd *cli);
1650
1651 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1652  * the dirty accounting.  Writeback completes or truncate happens before
1653  * writing starts.  Must be called with the loi lock held. */
/* Release the page's write grant via osc_release_write_grant(); @sent
 * tells it whether the page was actually written out or torn down
 * before the write started. */
static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
                           int sent)
{
        osc_release_write_grant(cli, &oap->oap_brw_page, sent);
}
1659
1660
1661 /* This maintains the lists of pending pages to read/write for a given object
1662  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1663  * to quickly find objects that are ready to send an RPC. */
1664 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1665                          int cmd)
1666 {
1667         int optimal;
1668         ENTRY;
1669
1670         if (lop->lop_num_pending == 0)
1671                 RETURN(0);
1672
1673         /* if we have an invalid import we want to drain the queued pages
1674          * by forcing them through rpcs that immediately fail and complete
1675          * the pages.  recovery relies on this to empty the queued pages
1676          * before canceling the locks and evicting down the llite pages */
1677         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1678                 RETURN(1);
1679
1680         /* stream rpcs in queue order as long as as there is an urgent page
1681          * queued.  this is our cheap solution for good batching in the case
1682          * where writepage marks some random page in the middle of the file
1683          * as urgent because of, say, memory pressure */
1684         if (!list_empty(&lop->lop_urgent)) {
1685                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1686                 RETURN(1);
1687         }
1688         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1689         optimal = cli->cl_max_pages_per_rpc;
1690         if (cmd & OBD_BRW_WRITE) {
1691                 /* trigger a write rpc stream as long as there are dirtiers
1692                  * waiting for space.  as they're waiting, they're not going to
1693                  * create more pages to coallesce with what's waiting.. */
1694                 if (!list_empty(&cli->cl_cache_waiters)) {
1695                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1696                         RETURN(1);
1697                 }
1698                 /* +16 to avoid triggering rpcs that would want to include pages
1699                  * that are being queued but which can't be made ready until
1700                  * the queuer finishes with the page. this is a wart for
1701                  * llite::commit_write() */
1702                 optimal += 16;
1703         }
1704         if (lop->lop_num_pending >= optimal)
1705                 RETURN(1);
1706
1707         RETURN(0);
1708 }
1709
/* Keep @item's membership of @list in sync with @should_be_on: link it
 * when it should be on and is not, unlink it when it should not be and
 * is.  Already-consistent states are no-ops. */
static void on_list(struct list_head *item, struct list_head *list,
                    int should_be_on)
{
        if (should_be_on) {
                if (list_empty(item))
                        list_add_tail(item, list);
        } else {
                if (!list_empty(item))
                        list_del_init(item);
        }
}
1718
1719 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1720  * can find pages to build into rpcs quickly */
1721 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1722 {
1723         on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
1724                 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
1725                 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1726
1727         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1728                 loi->loi_write_lop.lop_num_pending);
1729
1730         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1731                 loi->loi_read_lop.lop_num_pending);
1732 }
1733
1734 static void lop_update_pending(struct client_obd *cli,
1735                                struct loi_oap_pages *lop, int cmd, int delta)
1736 {
1737         lop->lop_num_pending += delta;
1738         if (cmd & OBD_BRW_WRITE)
1739                 cli->cl_pending_w_pages += delta;
1740         else
1741                 cli->cl_pending_r_pages += delta;
1742 }
1743
1744 /* this is called when a sync waiter receives an interruption.  Its job is to
1745  * get the caller woken as soon as possible.  If its page hasn't been put in an
1746  * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
1747  * desiring interruption which will forcefully complete the rpc once the rpc
1748  * has timed out */
/* Sync-waiter interruption callback.  Wake the interrupted caller as
 * soon as possible: if the page is already in an rpc, mark that rpc
 * interrupted; otherwise pull the page off the pending/urgent lists
 * and complete its group-io slot with -EINTR. */
static void osc_occ_interrupted(struct oig_callback_context *occ)
{
        struct osc_async_page *oap;
        struct loi_oap_pages *lop;
        struct lov_oinfo *loi;
        ENTRY;

        /* XXX member_of() */
        oap = list_entry(occ, struct osc_async_page, oap_occ);

        /* all oap state below is protected by the client's loi list lock */
        client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);

        oap->oap_interrupted = 1;

        /* ok, it's been put in an rpc. only one oap gets a request reference */
        if (oap->oap_request != NULL) {
                ptlrpc_mark_interrupted(oap->oap_request);
                ptlrpcd_wake(oap->oap_request);
                GOTO(unlock, 0);
        }

        /* we don't get interruption callbacks until osc_trigger_group_io()
         * has been called and put the sync oaps in the pending/urgent lists.*/
        if (!list_empty(&oap->oap_pending_item)) {
                list_del_init(&oap->oap_pending_item);
                list_del_init(&oap->oap_urgent_item);

                loi = oap->oap_loi;
                lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
                        &loi->loi_write_lop : &loi->loi_read_lop;
                /* keep counters and cli list membership consistent with
                 * the removal above */
                lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
                loi_list_maint(oap->oap_cli, oap->oap_loi);

                oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
                oap->oap_oig = NULL;
        }

unlock:
        client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
}
1789
1790 /* this is trying to propogate async writeback errors back up to the
1791  * application.  As an async write fails we record the error code for later if
1792  * the app does an fsync.  As long as errors persist we force future rpcs to be
1793  * sync so that the app can get a sync error and break the cycle of queueing
1794  * pages for which writeback will fail. */
1795 static void osc_process_ar(struct osc_async_rc *ar, struct ptlrpc_request *req,
1796                            int rc)
1797 {
1798         if (rc) {
1799                 if (!ar->ar_rc)
1800                         ar->ar_rc = rc;
1801
1802                 ar->ar_force_sync = 1;
1803                 ar->ar_min_xid = ptlrpc_sample_next_xid();
1804                 return;
1805
1806         }
1807
1808         if (ar->ar_force_sync && req && (ptlrpc_req_xid(req) >= ar->ar_min_xid))
1809                 ar->ar_force_sync = 0;
1810 }
1811
1812 static void osc_oap_to_pending(struct osc_async_page *oap)
1813 {
1814         struct loi_oap_pages *lop;
1815
1816         if (oap->oap_cmd & OBD_BRW_WRITE)
1817                 lop = &oap->oap_loi->loi_write_lop;
1818         else
1819                 lop = &oap->oap_loi->loi_read_lop;
1820
1821         if (oap->oap_async_flags & ASYNC_URGENT)
1822                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1823         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1824         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
1825 }
1826
1827 /* this must be called holding the loi list lock to give coverage to exit_cache,
1828  * async_flag maintenance, and oap_request */
/* Finish one async page: propagate write errors into the async-rc
 * state, drop the request reference, fold returned attributes into the
 * object's lvb, then complete either the group-io slot or the caller's
 * ap_completion hook and release the cache grant. */
static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
                              struct osc_async_page *oap, int sent, int rc)
{
        ENTRY;
        oap->oap_async_flags = 0;
        oap->oap_interrupted = 0;

        /* record write errors both client-wide and per-object so fsync
         * can report them and future rpcs go sync until writes succeed */
        if (oap->oap_cmd & OBD_BRW_WRITE) {
                osc_process_ar(&cli->cl_ar, oap->oap_request, rc);
                osc_process_ar(&oap->oap_loi->loi_ar, oap->oap_request, rc);
        }

        if (oap->oap_request != NULL) {
                ptlrpc_req_finished(oap->oap_request);
                oap->oap_request = NULL;
        }

        /* cache the size/time attributes the server returned */
        if (rc == 0 && oa != NULL) {
                if (oa->o_valid & OBD_MD_FLBLOCKS)
                        oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
                if (oa->o_valid & OBD_MD_FLMTIME)
                        oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
                if (oa->o_valid & OBD_MD_FLATIME)
                        oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
                if (oa->o_valid & OBD_MD_FLCTIME)
                        oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
        }

        /* a sync group-io page completes through its oig, not the
         * caller's ap_completion hook */
        if (oap->oap_oig) {
                osc_exit_cache(cli, oap, sent);
                oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
                oap->oap_oig = NULL;
                EXIT;
                return;
        }

        rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
                                                oap->oap_cmd, oa, rc);

        /* ll_ap_completion (from llite) drops PG_locked. so, a new
         * I/O on the page could start, but OSC calls it under lock
         * and thus we can add oap back to pending safely */
        if (rc)
                /* upper layer wants to leave the page on pending queue */
                osc_oap_to_pending(oap);
        else
                osc_exit_cache(cli, oap, sent);
        EXIT;
}
1878
/* Interpret callback for async brw rpcs built by osc_send_oap_rpc():
 * finalize the request, retry via osc_brw_redo_request() on -EAGAIN,
 * then complete every attached oap and kick the rpc engine again. */
static int brw_interpret_oap(struct ptlrpc_request *req, void *data, int rc)
{
        struct osc_async_page *oap, *tmp;
        struct osc_brw_async_args *aa = data;
        struct client_obd *cli;
        ENTRY;

        rc = osc_brw_fini_request(req, rc);
        CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
        if (rc == -EAGAIN) {
                rc = osc_brw_redo_request(req, aa);
                if (rc == 0)
                        RETURN(0);
                /* NOTE(review): on redo failure we jump past the oap
                 * completion and the aa_oa free below -- confirm the redo
                 * path owns that cleanup in the failure case */
                GOTO(out, rc);
        }

        cli = aa->aa_cli;

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
                cli->cl_w_in_flight--;
        else
                cli->cl_r_in_flight--;

        /* the caller may re-use the oap after the completion call so
         * we need to clean it up a little */
        list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
                list_del_init(&oap->oap_rpc_item);
                osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
        }

        osc_wake_cache_waiters(cli);
        osc_check_rpcs(cli);

        client_obd_list_unlock(&cli->cl_loi_list_lock);

        OBDO_FREE(aa->aa_oa);
        rc = 0;
out:
        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
        RETURN(rc);
}
1925
/* Turn the oaps on @rpc_list into a brw ptlrpc request.  On success the
 * oaps are moved onto the request's aa_oaps list and @rpc_list is left
 * empty; on failure the list is untouched and an ERR_PTR is returned.
 * Note: every path to 'out' assigns @req, so the IS_ERR(req) test there
 * never reads it uninitialized. */
static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
                                            struct list_head *rpc_list,
                                            int page_count, int cmd)
{
        struct ptlrpc_request *req;
        struct brw_page **pga = NULL;
        struct osc_brw_async_args *aa;
        struct obdo *oa = NULL;
        struct obd_async_page_ops *ops = NULL;
        void *caller_data = NULL;
        struct obd_capa *ocapa;
        struct osc_async_page *oap;
        int i, rc;

        ENTRY;
        LASSERT(!list_empty(rpc_list));

        OBD_ALLOC(pga, sizeof(*pga) * page_count);
        if (pga == NULL)
                RETURN(ERR_PTR(-ENOMEM));

        OBDO_ALLOC(oa);
        if (oa == NULL)
                GOTO(out, req = ERR_PTR(-ENOMEM));

        /* collect the brw_page of each oap into the pga array; the ops
         * and caller_data of the first oap stand in for the whole batch */
        i = 0;
        list_for_each_entry(oap, rpc_list, oap_rpc_item) {
                if (ops == NULL) {
                        ops = oap->oap_caller_ops;
                        caller_data = oap->oap_caller_data;
                }
                pga[i] = &oap->oap_brw_page;
                pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
                CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
                       pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
                i++;
        }

        /* always get the data for the obdo for the rpc */
        LASSERT(ops != NULL);
        ops->ap_fill_obdo(caller_data, cmd, oa);
        ocapa = ops->ap_lookup_capa(caller_data, cmd);

        sort_brw_pages(pga, page_count);
        rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
                                  pga, &req, ocapa);
        capa_put(ocapa);
        if (rc != 0) {
                CERROR("prep_req failed: %d\n", rc);
                GOTO(out, req = ERR_PTR(rc));
        }

        /* Need to update the timestamps after the request is built in case
         * we race with setattr (locally or in queue at OST).  If OST gets
         * later setattr before earlier BRW (as determined by the request xid),
         * the OST will not use BRW timestamps.  Sadly, there is no obvious
         * way to do this in a single call.  bug 10150 */
        ops->ap_update_obdo(caller_data, cmd, oa,
                            OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);

        /* hand the oaps over to the request's async args; the interpret
         * callback (brw_interpret_oap) completes and frees them */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_brw_async_args *)&req->rq_async_args;
        INIT_LIST_HEAD(&aa->aa_oaps);
        list_splice(rpc_list, &aa->aa_oaps);
        INIT_LIST_HEAD(rpc_list);

out:
        if (IS_ERR(req)) {
                if (oa)
                        OBDO_FREE(oa);
                if (pga)
                        OBD_FREE(pga, sizeof(*pga) * page_count);
        }
        RETURN(req);
}
2001
2002 /* the loi lock is held across this function but it's allowed to release
2003  * and reacquire it during its work */
/* Build and launch one RPC from @lop's pending pages.  Returns 1 when
 * an rpc was sent, 0 when there was nothing to send, negative errno on
 * failure.  NOTE(review): entered with cl_loi_list_lock held; the lock
 * is dropped across osc_build_req() and retaken below -- see the
 * unlock/lock pair around it. */
static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
                            int cmd, struct loi_oap_pages *lop)
{
        struct ptlrpc_request *req;
        obd_count page_count = 0;
        struct osc_async_page *oap = NULL, *tmp;
        struct osc_brw_async_args *aa;
        struct obd_async_page_ops *ops;
        CFS_LIST_HEAD(rpc_list);
        unsigned int ending_offset;
        unsigned  starting_offset = 0;
        ENTRY;

        /* first we find the pages we're allowed to work with */
        list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
                                 oap_pending_item) {
                ops = oap->oap_caller_ops;

                LASSERT(oap->oap_magic == OAP_MAGIC);

                /* in llite being 'ready' equates to the page being locked
                 * until completion unlocks it.  commit_write submits a page
                 * as not ready because its unlock will happen unconditionally
                 * as the call returns.  if we race with commit_write giving
                 * us that page we dont' want to create a hole in the page
                 * stream, so we stop and leave the rpc to be fired by
                 * another dirtier or kupdated interval (the not ready page
                 * will still be on the dirty list).  we could call in
                 * at the end of ll_file_write to process the queue again. */
                if (!(oap->oap_async_flags & ASYNC_READY)) {
                        int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
                        if (rc < 0)
                                CDEBUG(D_INODE, "oap %p page %p returned %d "
                                                "instead of ready\n", oap,
                                                oap->oap_page, rc);
                        switch (rc) {
                        case -EAGAIN:
                                /* llite is telling us that the page is still
                                 * in commit_write and that we should try
                                 * and put it in an rpc again later.  we
                                 * break out of the loop so we don't create
                                 * a hole in the sequence of pages in the rpc
                                 * stream.*/
                                oap = NULL;
                                break;
                        case -EINTR:
                                /* the io isn't needed.. tell the checks
                                 * below to complete the rpc with EINTR */
                                oap->oap_async_flags |= ASYNC_COUNT_STABLE;
                                oap->oap_count = -EINTR;
                                break;
                        case 0:
                                oap->oap_async_flags |= ASYNC_READY;
                                break;
                        default:
                                LASSERTF(0, "oap %p page %p returned %d "
                                            "from make_ready\n", oap,
                                            oap->oap_page, rc);
                                break;
                        }
                }
                /* the -EAGAIN case above cleared oap to stop the scan */
                if (oap == NULL)
                        break;
                /*
                 * Page submitted for IO has to be locked. Either by
                 * ->ap_make_ready() or by higher layers.
                 *
                 * XXX nikita: this assertion should be adjusted when lustre
                 * starts using PG_writeback for pages being written out.
                 */
#if defined(__KERNEL__) && defined(__LINUX__)
                LASSERT(PageLocked(oap->oap_page));
#endif
                /* If there is a gap at the start of this page, it can't merge
                 * with any previous page, so we'll hand the network a
                 * "fragmented" page array that it can't transfer in 1 RDMA */
                if (page_count != 0 && oap->oap_page_off != 0)
                        break;

                /* take the page out of our book-keeping */
                list_del_init(&oap->oap_pending_item);
                lop_update_pending(cli, lop, cmd, -1);
                list_del_init(&oap->oap_urgent_item);

                if (page_count == 0)
                        starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
                                          (PTLRPC_MAX_BRW_SIZE - 1);

                /* ask the caller for the size of the io as the rpc leaves. */
                if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
                        oap->oap_count =
                                ops->ap_refresh_count(oap->oap_caller_data,cmd);
                if (oap->oap_count <= 0) {
                        CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
                               oap->oap_count);
                        osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
                        continue;
                }

                /* now put the page back in our accounting */
                list_add_tail(&oap->oap_rpc_item, &rpc_list);
                if (++page_count >= cli->cl_max_pages_per_rpc)
                        break;

                /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
                 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
                 * have the same alignment as the initial writes that allocated
                 * extents on the server. */
                ending_offset = (oap->oap_obj_off + oap->oap_page_off +
                                 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
                if (ending_offset == 0)
                        break;

                /* If there is a gap at the end of this page, it can't merge
                 * with any subsequent pages, so we'll hand the network a
                 * "fragmented" page array that it can't transfer in 1 RDMA */
                if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
                        break;
        }

        osc_wake_cache_waiters(cli);

        if (page_count == 0)
                RETURN(0);

        loi_list_maint(cli, loi);

        /* drop the list lock while building the request; retaken below */
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        req = osc_build_req(cli, &rpc_list, page_count, cmd);
        if (IS_ERR(req)) {
                /* this should happen rarely and is pretty bad, it makes the
                 * pending list not follow the dirty order */
                client_obd_list_lock(&cli->cl_loi_list_lock);
                list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
                        list_del_init(&oap->oap_rpc_item);

                        /* queued sync pages can be torn down while the pages
                         * were between the pending list and the rpc */
                        if (oap->oap_interrupted) {
                                CDEBUG(D_INODE, "oap %p interrupted\n", oap);
                                osc_ap_completion(cli, NULL, oap, 0,
                                                  oap->oap_count);
                                continue;
                        }
                        osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
                }
                loi_list_maint(cli, loi);
                RETURN(PTR_ERR(req));
        }

        aa = (struct osc_brw_async_args *)&req->rq_async_args;

        /* feed the per-client rpc statistics */
        if (cmd == OBD_BRW_READ) {
                lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
                                      (starting_offset >> CFS_PAGE_SHIFT) + 1);
                ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
        } else {
                lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_write_rpc_hist,
                                 cli->cl_w_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
                                      (starting_offset >> CFS_PAGE_SHIFT) + 1);
                ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);

        if (cmd == OBD_BRW_READ)
                cli->cl_r_in_flight++;
        else
                cli->cl_w_in_flight++;

        /* queued sync pages can be torn down while the pages
         * were between the pending list and the rpc */
        tmp = NULL;
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                /* only one oap gets a request reference */
                if (tmp == NULL)
                        tmp = oap;
                if (oap->oap_interrupted && !req->rq_intr) {
                        CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
                               oap, req);
                        ptlrpc_mark_interrupted(req);
                }
        }
        if (tmp != NULL)
                tmp->oap_request = ptlrpc_request_addref(req);

        DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
                  page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);

        req->rq_interpret_reply = brw_interpret_oap;
        ptlrpcd_add_req(req);
        RETURN(1);
}
2202
/* Dump an object's per-list state -- readiness plus pending/urgent
 * counts for the write and read queues -- followed by the caller's
 * message.  (The stray trailing line-continuation backslash after the
 * final 'args)' was removed: it glued the macro to the following line,
 * ready to silently swallow any statement later inserted there.) */
#define LOI_DEBUG(LOI, STR, args...)                                     \
        CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
               !list_empty(&(LOI)->loi_cli_item),                        \
               (LOI)->loi_write_lop.lop_num_pending,                     \
               !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
               (LOI)->loi_read_lop.lop_num_pending,                      \
               !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
               args)
2212 /* This is called by osc_check_rpcs() to find which objects have pages that
2213  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2214 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2215 {
2216         ENTRY;
2217         /* first return all objects which we already know to have
2218          * pages ready to be stuffed into rpcs */
2219         if (!list_empty(&cli->cl_loi_ready_list))
2220                 RETURN(list_entry(cli->cl_loi_ready_list.next,
2221                                   struct lov_oinfo, loi_cli_item));
2222
2223         /* then if we have cache waiters, return all objects with queued
2224          * writes.  This is especially important when many small files
2225          * have filled up the cache and not been fired into rpcs because
2226          * they don't pass the nr_pending/object threshhold */
2227         if (!list_empty(&cli->cl_cache_waiters) &&
2228             !list_empty(&cli->cl_loi_write_list))
2229                 RETURN(list_entry(cli->cl_loi_write_list.next,
2230                                   struct lov_oinfo, loi_write_item));
2231
2232         /* then return all queued objects when we have an invalid import
2233          * so that they get flushed */
2234         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2235                 if (!list_empty(&cli->cl_loi_write_list))
2236                         RETURN(list_entry(cli->cl_loi_write_list.next,
2237                                           struct lov_oinfo, loi_write_item));
2238                 if (!list_empty(&cli->cl_loi_read_list))
2239                         RETURN(list_entry(cli->cl_loi_read_list.next,
2240                                           struct lov_oinfo, loi_read_item));
2241         }
2242         RETURN(NULL);
2243 }
2244
/* Walk the client's objects that have work pending and fire off read and
 * write RPCs until the cl_max_rpcs_in_flight cap is reached or no object
 * has an rpc's worth of work ready.
 * called with the loi list lock held */
static void osc_check_rpcs(struct client_obd *cli)
{
        struct lov_oinfo *loi;
        int rc = 0, race_counter = 0;
        ENTRY;

        while ((loi = osc_next_loi(cli)) != NULL) {
                LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));

                /* stop dispatching once the in-flight rpc cap is reached */
                if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
                        break;

                /* attempt some read/write balancing by alternating between
                 * reads and writes in an object.  The makes_rpc checks here
                 * would be redundant if we were getting read/write work items
                 * instead of objects.  we don't want send_oap_rpc to drain a
                 * partial read pending queue when we're given this object to
                 * do io on writes while there are cache waiters */
                if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
                        rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
                                              &loi->loi_write_lop);
                        /* rc < 0: hard failure, stop dispatching entirely;
                         * rc > 0: an rpc went out, reset the back-off count */
                        if (rc < 0)
                                break;
                        if (rc > 0)
                                race_counter = 0;
                        else
                                race_counter++;
                }
                if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
                        rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
                                              &loi->loi_read_lop);
                        if (rc < 0)
                                break;
                        if (rc > 0)
                                race_counter = 0;
                        else
                                race_counter++;
                }

                /* attempt some inter-object balancing by issueing rpcs
                 * for each object in turn */
                if (!list_empty(&loi->loi_cli_item))
                        list_del_init(&loi->loi_cli_item);
                if (!list_empty(&loi->loi_write_item))
                        list_del_init(&loi->loi_write_item);
                if (!list_empty(&loi->loi_read_item))
                        list_del_init(&loi->loi_read_item);

                /* requeue the object on whichever work lists it still needs */
                loi_list_maint(cli, loi);

                /* send_oap_rpc fails with 0 when make_ready tells it to
                 * back off.  llite's make_ready does this when it tries
                 * to lock a page queued for write that is already locked.
                 * we want to try sending rpcs from many objects, but we
                 * don't want to spin failing with 0.  */
                if (race_counter == 10)
                        break;
        }
        EXIT;
}
2306
2307 /* we're trying to queue a page in the osc so we're subject to the
2308  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2309  * If the osc's queued pages are already at that limit, then we want to sleep
2310  * until there is space in the osc's queue for us.  We also may be waiting for
2311  * write credits from the OST if there are RPCs in flight that may return some
2312  * before we fall back to sync writes.
2313  *
2314  * We need this know our allocation was granted in the presence of signals */
2315 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2316 {
2317         int rc;
2318         ENTRY;
2319         client_obd_list_lock(&cli->cl_loi_list_lock);
2320         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2321         client_obd_list_unlock(&cli->cl_loi_list_lock);
2322         RETURN(rc);
2323 };
2324
/* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
 * grant or cache space.  Returns 0 once the page is accounted for, -EDQUOT
 * when the caller should fall back to sync io, -EINTR if the wait for cache
 * space was interrupted. */
static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
                           struct osc_async_page *oap)
{
        struct osc_cache_waiter ocw;
        struct l_wait_info lwi = { 0 };

        ENTRY;

        CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
               "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
               cli->cl_dirty_max, obd_max_dirty_pages,
               cli->cl_lost_grant, cli->cl_avail_grant);

        /* force the caller to try sync io.  this can jump the list
         * of queued writes and create a discontiguous rpc stream */
        if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
            loi->loi_ar.ar_force_sync)
                RETURN(-EDQUOT);

        /* Hopefully normal case - cache space and write credits available */
        if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
            (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
            (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
                /* account for ourselves */
                osc_consume_write_grant(cli, &oap->oap_brw_page);
                RETURN(0);
        }

        /* Make sure that there are write rpcs in flight to wait for.  This
         * is a little silly as this object may not have any pending but
         * other objects sure might. */
        if (cli->cl_w_in_flight) {
                /* join the waiter queue; osc_wake_cache_waiters() is one
                 * path that removes us from it and wakes ocw_waitq */
                list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
                cfs_waitq_init(&ocw.ocw_waitq);
                ocw.ocw_oap = oap;
                ocw.ocw_rc = 0;

                loi_list_maint(cli, loi);
                /* kick rpc generation so something really is in flight to
                 * return grant, then drop the lock before sleeping */
                osc_check_rpcs(cli);
                client_obd_list_unlock(&cli->cl_loi_list_lock);

                CDEBUG(D_CACHE, "sleeping for cache space\n");
                l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);

                client_obd_list_lock(&cli->cl_loi_list_lock);
                /* still on the waiter list means we were never granted -
                 * the wait ended without a wake-up that filled our grant */
                if (!list_empty(&ocw.ocw_entry)) {
                        list_del(&ocw.ocw_entry);
                        RETURN(-EINTR);
                }
                RETURN(ocw.ocw_rc);
        }

        RETURN(-EDQUOT);
}
2381
2382 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2383                         struct lov_oinfo *loi, cfs_page_t *page,
2384                         obd_off offset, struct obd_async_page_ops *ops,
2385                         void *data, void **res)
2386 {
2387         struct osc_async_page *oap;
2388         ENTRY;
2389
2390         if (!page)
2391                 return size_round(sizeof(*oap));
2392
2393         oap = *res;
2394         oap->oap_magic = OAP_MAGIC;
2395         oap->oap_cli = &exp->exp_obd->u.cli;
2396         oap->oap_loi = loi;
2397
2398         oap->oap_caller_ops = ops;
2399         oap->oap_caller_data = data;
2400
2401         oap->oap_page = page;
2402         oap->oap_obj_off = offset;
2403
2404         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2405         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2406         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2407
2408         oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2409
2410         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2411         RETURN(0);
2412 }
2413
2414 struct osc_async_page *oap_from_cookie(void *cookie)
2415 {
2416         struct osc_async_page *oap = cookie;
2417         if (oap->oap_magic != OAP_MAGIC)
2418                 return ERR_PTR(-EINVAL);
2419         return oap;
2420 };
2421
/* Queue one prepared async page for ordinary (non-group) io.  On success
 * the page sits on its object's pending list and rpc generation has been
 * kicked. */
static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
                              struct lov_oinfo *loi, void *cookie,
                              int cmd, obd_off off, int count,
                              obd_flag brw_flags, enum async_flags async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        /* turn the caller's opaque cookie back into an oap */
        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        /* nothing can be sent without a valid import */
        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        /* the page must not already be queued anywhere */
        if (!list_empty(&oap->oap_pending_item) ||
            !list_empty(&oap->oap_urgent_item) ||
            !list_empty(&oap->oap_rpc_item))
                RETURN(-EBUSY);

        /* check if the file's owner/group is over quota */
#ifdef HAVE_QUOTA_SUPPORT
        if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
                struct obd_async_page_ops *ops;
                struct obdo *oa;

                OBDO_ALLOC(oa);
                if (oa == NULL)
                        RETURN(-ENOMEM);

                /* have the caller fill in the uid/gid for the quota check */
                ops = oap->oap_caller_ops;
                ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
                if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
                    NO_QUOTA)
                        rc = -EDQUOT;

                OBDO_FREE(oa);
                if (rc)
                        RETURN(rc);
        }
#endif

        /* default to the object's first stripe when no loi was given */
        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        client_obd_list_lock(&cli->cl_loi_list_lock);

        oap->oap_cmd = cmd;
        oap->oap_page_off = off;
        oap->oap_count = count;
        oap->oap_brw_flags = brw_flags;
        oap->oap_async_flags = async_flags;

        if (cmd & OBD_BRW_WRITE) {
                /* may drop and retake the list lock while waiting for cache
                 * space or grant; fails with -EDQUOT or -EINTR */
                rc = osc_enter_cache(cli, loi, oap);
                if (rc) {
                        client_obd_list_unlock(&cli->cl_loi_list_lock);
                        RETURN(rc);
                }
        }

        osc_oap_to_pending(oap);
        loi_list_maint(cli, loi);

        LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
                  cmd);

        /* try to turn freshly queued pages into rpcs right away */
        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        RETURN(0);
}
2496
/* aka (~was & now & flag), but this is more clear :)
 * true iff `flag' is being newly set: clear in `was', set in `now'.
 * all three arguments are parenthesized so compound expressions such as
 * SETTING(a | b, x, f) expand correctly (CERT PRE01-C). */
#define SETTING(was, now, flag) (!((was) & (flag)) && ((now) & (flag)))
2499
/* Add async flags (ASYNC_READY, ASYNC_URGENT) to a page that is already
 * queued for io.  Returns -EINVAL if the page is no longer pending and
 * -EIO when there is no usable import. */
static int osc_set_async_flags(struct obd_export *exp,
                               struct lov_stripe_md *lsm,
                               struct lov_oinfo *loi, void *cookie,
                               obd_flag async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct loi_oap_pages *lop;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        /*
         * bug 7311: OST-side locking is only supported for liblustre for now
         * (and liblustre never calls obd_set_async_flags(). I hope.), generic
         * implementation has to handle case where OST-locked page was picked
         * up by, e.g., ->writepage().
         */
        LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
        LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
                                     * tread here. */

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        /* pick the list set matching the direction of the queued io */
        if (oap->oap_cmd & OBD_BRW_WRITE) {
                lop = &loi->loi_write_lop;
        } else {
                lop = &loi->loi_read_lop;
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* only pages still waiting for io may have their flags changed */
        if (list_empty(&oap->oap_pending_item))
                GOTO(out, rc = -EINVAL);

        /* nothing to do when every requested flag is already set */
        if ((oap->oap_async_flags & async_flags) == async_flags)
                GOTO(out, rc = 0);

        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
                oap->oap_async_flags |= ASYNC_READY;

        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
                /* urgency is expressed here via lop_urgent membership, not
                 * by setting the bit in oap_async_flags; pages already in
                 * an rpc are left alone */
                if (list_empty(&oap->oap_rpc_item)) {
                        list_add(&oap->oap_urgent_item, &lop->lop_urgent);
                        loi_list_maint(cli, loi);
                }
        }

        LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
                        oap->oap_async_flags);
out:
        /* flag changes may have made new rpcs possible */
        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        RETURN(rc);
}
2562
2563 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2564                              struct lov_oinfo *loi,
2565                              struct obd_io_group *oig, void *cookie,
2566                              int cmd, obd_off off, int count,
2567                              obd_flag brw_flags,
2568                              obd_flag async_flags)
2569 {
2570         struct client_obd *cli = &exp->exp_obd->u.cli;
2571         struct osc_async_page *oap;
2572         struct loi_oap_pages *lop;
2573         int rc = 0;
2574         ENTRY;
2575
2576         oap = oap_from_cookie(cookie);
2577         if (IS_ERR(oap))
2578                 RETURN(PTR_ERR(oap));
2579
2580         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2581                 RETURN(-EIO);
2582
2583         if (!list_empty(&oap->oap_pending_item) ||
2584             !list_empty(&oap->oap_urgent_item) ||
2585             !list_empty(&oap->oap_rpc_item))
2586                 RETURN(-EBUSY);
2587
2588         if (loi == NULL)
2589                 loi = lsm->lsm_oinfo[0];
2590
2591         client_obd_list_lock(&cli->cl_loi_list_lock);
2592
2593         oap->oap_cmd = cmd;
2594         oap->oap_page_off = off;
2595         oap->oap_count = count;
2596         oap->oap_brw_flags = brw_flags;
2597         oap->oap_async_flags = async_flags;
2598
2599         if (cmd & OBD_BRW_WRITE)
2600                 lop = &loi->loi_write_lop;
2601         else
2602                 lop = &loi->loi_read_lop;
2603
2604         list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
2605         if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
2606                 oap->oap_oig = oig;
2607                 rc = oig_add_one(oig, &oap->oap_occ);
2608         }
2609
2610         LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
2611                   oap, oap->oap_page, rc);
2612
2613         client_obd_list_unlock(&cli->cl_loi_list_lock);
2614
2615         RETURN(rc);
2616 }
2617
2618 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2619                                  struct loi_oap_pages *lop, int cmd)
2620 {
2621         struct list_head *pos, *tmp;
2622         struct osc_async_page *oap;
2623
2624         list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2625                 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2626                 list_del(&oap->oap_pending_item);
2627                 osc_oap_to_pending(oap);
2628         }
2629         loi_list_maint(cli, loi);
2630 }
2631
/* Release all pages previously queued with osc_queue_group_io() onto the
 * regular pending lists and kick rpc generation for the object. */
static int osc_trigger_group_io(struct obd_export *exp,
                                struct lov_stripe_md *lsm,
                                struct lov_oinfo *loi,
                                struct obd_io_group *oig)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        ENTRY;

        /* default to the object's first stripe when no loi was given */
        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* promote both queued write and read group pages to pending */
        osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
        osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);

        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        RETURN(0);
}
2653
/* Remove a page from all of the osc's queues and release its cache
 * accounting.  Fails with -EBUSY when the page belongs to an rpc that is
 * already in flight. */
static int osc_teardown_async_page(struct obd_export *exp,
                                   struct lov_stripe_md *lsm,
                                   struct lov_oinfo *loi, void *cookie)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct loi_oap_pages *lop;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        /* pick the list set matching the direction of the queued io */
        if (oap->oap_cmd & OBD_BRW_WRITE) {
                lop = &loi->loi_write_lop;
        } else {
                lop = &loi->loi_read_lop;
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* pages already claimed by an rpc can't be torn down */
        if (!list_empty(&oap->oap_rpc_item))
                GOTO(out, rc = -EBUSY);

        /* return the page's dirty/grant accounting and let anyone waiting
         * for cache space retry */
        osc_exit_cache(cli, oap, 0);
        osc_wake_cache_waiters(cli);

        if (!list_empty(&oap->oap_urgent_item)) {
                list_del_init(&oap->oap_urgent_item);
                oap->oap_async_flags &= ~ASYNC_URGENT;
        }
        if (!list_empty(&oap->oap_pending_item)) {
                list_del_init(&oap->oap_pending_item);
                /* keep the per-lop pending counters in sync */
                lop_update_pending(cli, lop, oap->oap_cmd, -1);
        }
        loi_list_maint(cli, loi);

        LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
out:
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        RETURN(rc);
}
2700
/* Attach caller data (an inode, on linux) to a lock's l_ast_data, checking
 * that any previously attached inode is already being freed.  Also lets the
 * caller pin the lock out of the lru via LDLM_FL_NO_LRU in `flags'. */
static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
                                    int flags)
{
        struct ldlm_lock *lock = ldlm_handle2lock(lockh);

        if (lock == NULL) {
                CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
                return;
        }
        lock_res_and_lock(lock);
#ifdef __KERNEL__
#ifdef __LINUX__
        /* Liang XXX: Darwin and Winnt checking should be added */
        if (lock->l_ast_data && lock->l_ast_data != data) {
                struct inode *new_inode = data;
                struct inode *old_inode = lock->l_ast_data;
                /* a still-live old inode would mean two inodes share this
                 * lock's ast data, which must never happen */
                if (!(old_inode->i_state & I_FREEING))
                        LDLM_ERROR(lock, "inconsistent l_ast_data found");
                LASSERTF(old_inode->i_state & I_FREEING,
                         "Found existing inode %p/%lu/%u state %lu in lock: "
                         "setting data to %p/%lu/%u\n", old_inode,
                         old_inode->i_ino, old_inode->i_generation,
                         old_inode->i_state,
                         new_inode, new_inode->i_ino, new_inode->i_generation);
        }
#endif
#endif
        lock->l_ast_data = data;
        /* only the NO_LRU bit from the caller's flags is applied */
        lock->l_flags |= (flags & LDLM_FL_NO_LRU);
        unlock_res_and_lock(lock);
        LDLM_LOCK_PUT(lock);
}
2733
2734 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2735                              ldlm_iterator_t replace, void *data)
2736 {
2737         struct ldlm_res_id res_id = { .name = {0} };
2738         struct obd_device *obd = class_exp2obd(exp);
2739
2740         res_id.name[0] = lsm->lsm_object_id;
2741         res_id.name[2] = lsm->lsm_object_gr;
2742
2743         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2744         return 0;
2745 }
2746
/* Common enqueue completion for the sync and async paths: extract the
 * server's intent verdict from the reply, log the returned lvb data and
 * pass the result on to the caller's update callback. */
static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo,
                            int intent, int rc)
{
        ENTRY;

        if (intent) {
                /* The request was created before ldlm_cli_enqueue call. */
                if (rc == ELDLM_LOCK_ABORTED) {
                        struct ldlm_reply *rep;

                        /* swabbed by ldlm_cli_enqueue() */
                        LASSERT_REPSWABBED(req, DLM_LOCKREPLY_OFF);
                        rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
                                             sizeof(*rep));
                        LASSERT(rep != NULL);
                        /* a nonzero intent result overrides the enqueue rc */
                        if (rep->lock_policy_res1)
                                rc = rep->lock_policy_res1;
                }
        }

        if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
                CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
                       oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
                       oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
                       oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
        }

        /* Call the update callback. */
        rc = oinfo->oi_cb_up(oinfo, rc);
        RETURN(rc);
}
2778
/* ptlrpc interpret callback for asynchronous enqueues: finish the ldlm
 * handshake, run the osc-level completion and drop the reference the async
 * request held on the lock. */
static int osc_enqueue_interpret(struct ptlrpc_request *req,
                                 struct osc_enqueue_args *aa, int rc)
{
        int intent = aa->oa_ei->ei_flags & LDLM_FL_HAS_INTENT;
        struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
        struct ldlm_lock *lock;

        /* ldlm_cli_enqueue is holding a reference on the lock, so it must
         * be valid. */
        lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);

        /* Complete obtaining the lock procedure. */
        rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
                                   aa->oa_ei->ei_mode,
                                   &aa->oa_ei->ei_flags,
                                   &lsm->lsm_oinfo[0]->loi_lvb,
                                   sizeof(lsm->lsm_oinfo[0]->loi_lvb),
                                   lustre_swab_ost_lvb,
                                   aa->oa_oi->oi_lockh, rc);

        /* Complete osc stuff. */
        rc = osc_enqueue_fini(req, aa->oa_oi, intent, rc);

        /* Release the lock for async request. */
        if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
                ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);

        LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
                 aa->oa_oi->oi_lockh, req, aa);
        LDLM_LOCK_PUT(lock);
        return rc;
}
2811
/* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
 * other synchronous requests, however keeping some locks and trying to obtain
 * others may take a considerable amount of time in a case of ost failure; and
 * when other sync requests do not get released lock from a client, the client
 * is excluded from the cluster -- such scenarious make the life difficult, so
 * release locks just after they are obtained. */
static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
                       struct obd_enqueue_info *einfo)
{
        struct ldlm_res_id res_id = { .name = {0} };
        struct obd_device *obd = exp->exp_obd;
        struct ldlm_reply *rep;
        struct ptlrpc_request *req = NULL;
        int intent = einfo->ei_flags & LDLM_FL_HAS_INTENT;
        int rc;
        ENTRY;

        res_id.name[0] = oinfo->oi_md->lsm_object_id;
        res_id.name[2] = oinfo->oi_md->lsm_object_gr;

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother.  */
        oinfo->oi_policy.l_extent.start -=
                oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
        oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;

        /* with no valid kms a cached lock can't help us; go to the server */
        if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
                goto no_match;

        /* Next, search for already existing extent locks that will cover us */
        rc = ldlm_lock_match(obd->obd_namespace,
                             einfo->ei_flags | LDLM_FL_LVB_READY, &res_id,
                             einfo->ei_type, &oinfo->oi_policy, einfo->ei_mode,
                             oinfo->oi_lockh);
        if (rc == 1) {
                osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
                                        einfo->ei_flags);
                if (intent) {
                        /* I would like to be able to ASSERT here that rss <=
                         * kms, but I can't, for reasons which are explained in
                         * lov_enqueue() */
                }

                /* We already have a lock, and it's referenced */
                oinfo->oi_cb_up(oinfo, ELDLM_OK);

                /* For async requests, decref the lock. */
                if (einfo->ei_rqset)
                        ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);

                RETURN(ELDLM_OK);
        }

        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock.
         *
         * There are problems with conversion deadlocks, so instead of
         * converting a read lock to a write lock, we'll just enqueue a new
         * one.
         *
         * At some point we should cancel the read lock instead of making them
         * send us a blocking callback, but there are problems with canceling
         * locks out from other users right now, too. */

        if (einfo->ei_mode == LCK_PR) {
                rc = ldlm_lock_match(obd->obd_namespace,
                                     einfo->ei_flags | LDLM_FL_LVB_READY,
                                     &res_id, einfo->ei_type, &oinfo->oi_policy,
                                     LCK_PW, oinfo->oi_lockh);
                if (rc == 1) {
                        /* FIXME: This is not incredibly elegant, but it might
                         * be more elegant than adding another parameter to
                         * lock_match.  I want a second opinion. */
                        /* addref the lock only if not async requests. */
                        if (!einfo->ei_rqset)
                                ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
                        osc_set_data_with_check(oinfo->oi_lockh,
                                                einfo->ei_cbdata,
                                                einfo->ei_flags);
                        oinfo->oi_cb_up(oinfo, ELDLM_OK);
                        ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
                        RETURN(ELDLM_OK);
                }
        }

 no_match:
        if (intent) {
                /* intent enqueues carry extra reply buffers for the lock
                 * reply and the lvb */
                int size[3] = {
                        [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                        [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request),
                        [DLM_LOCKREQ_OFF + 1] = 0 };

                req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
                if (req == NULL)
                        RETURN(-ENOMEM);

                size[DLM_LOCKREPLY_OFF] = sizeof(*rep);
                size[DLM_REPLY_REC_OFF] =
                        sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb);
                ptlrpc_req_set_repsize(req, 3, size);
        }

        /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
        einfo->ei_flags &= ~LDLM_FL_BLOCK_GRANTED;

        rc = ldlm_cli_enqueue(exp, &req, &res_id, einfo->ei_type,
                              &oinfo->oi_policy, einfo->ei_mode,
                              &einfo->ei_flags, einfo->ei_cb_bl,
                              einfo->ei_cb_cp, einfo->ei_cb_gl,
                              einfo->ei_cbdata,
                              &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
                              sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
                              lustre_swab_ost_lvb, oinfo->oi_lockh,
                              einfo->ei_rqset ? 1 : 0);
        if (einfo->ei_rqset) {
                if (!rc) {
                        struct osc_enqueue_args *aa;
                        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
                        aa = (struct osc_enqueue_args *)&req->rq_async_args;
                        aa->oa_oi = oinfo;
                        aa->oa_ei = einfo;
                        aa->oa_exp = exp;

                        /* completion continues in osc_enqueue_interpret() */
                        req->rq_interpret_reply = osc_enqueue_interpret;
                        ptlrpc_set_add_req(einfo->ei_rqset, req);
                } else if (intent) {
                        ptlrpc_req_finished(req);
                }
                RETURN(rc);
        }

        rc = osc_enqueue_fini(req, oinfo, intent, rc);
        if (intent)
                ptlrpc_req_finished(req);

        RETURN(rc);
}
2951
/* Search the local namespace for an already-granted lock covering the
 * request, also accepting a cached PW lock when PR was asked for.  Returns
 * nonzero and fills *lockh on a match. */
static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
                     __u32 type, ldlm_policy_data_t *policy, __u32 mode,
                     int *flags, void *data, struct lustre_handle *lockh)
{
        struct ldlm_res_id res_id = { .name = {0} };
        struct obd_device *obd = exp->exp_obd;
        int rc;
        int lflags = *flags;
        ENTRY;

        res_id.name[0] = lsm->lsm_object_id;
        res_id.name[2] = lsm->lsm_object_gr;

        /* fault-injection hook for testing */
        OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother */
        policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
        policy->l_extent.end |= ~CFS_PAGE_MASK;

        /* Next, search for already existing extent locks that will cover us */
        rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
                             &res_id, type, policy, mode, lockh);
        if (rc) {
                /* note: the TEST_LOCK guard below is deliberately disabled;
                 * data is attached even for test-only matches */
                //if (!(*flags & LDLM_FL_TEST_LOCK))
                        osc_set_data_with_check(lockh, data, lflags);
                RETURN(rc);
        }
        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock. */
        if (mode == LCK_PR) {
                rc = ldlm_lock_match(obd->obd_namespace,
                                     lflags | LDLM_FL_LVB_READY, &res_id,
                                     type, policy, LCK_PW, lockh);
                if (rc == 1 && !(lflags & LDLM_FL_TEST_LOCK)) {
                        /* FIXME: This is not incredibly elegant, but it might
                         * be more elegant than adding another parameter to
                         * lock_match.  I want a second opinion. */
                        /* swap the reference to PR so the caller's eventual
                         * decref of the requested mode balances */
                        osc_set_data_with_check(lockh, data, lflags);
                        ldlm_lock_addref(lockh, LCK_PR);
                        ldlm_lock_decref(lockh, LCK_PW);
                }
        }
        RETURN(rc);
}
2998
2999 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3000                       __u32 mode, struct lustre_handle *lockh)
3001 {
3002         ENTRY;
3003
3004         if (unlikely(mode == LCK_GROUP))
3005                 ldlm_lock_decref_and_cancel(lockh, mode);
3006         else
3007                 ldlm_lock_decref(lockh, mode);
3008
3009         RETURN(0);
3010 }
3011
3012 static int osc_cancel_unused(struct obd_export *exp,
3013                              struct lov_stripe_md *lsm, int flags,
3014                              void *opaque)
3015 {
3016         struct obd_device *obd = class_exp2obd(exp);
3017         struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3018
3019         if (lsm != NULL) {
3020                 res_id.name[0] = lsm->lsm_object_id;
3021                 res_id.name[2] = lsm->lsm_object_gr;
3022                 resp = &res_id;
3023         }
3024
3025         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3026 }
3027
3028 static int osc_join_lru(struct obd_export *exp,
3029                         struct lov_stripe_md *lsm, int join)
3030 {
3031         struct obd_device *obd = class_exp2obd(exp);
3032         struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3033
3034         if (lsm != NULL) {
3035                 res_id.name[0] = lsm->lsm_object_id;
3036                 res_id.name[2] = lsm->lsm_object_gr;
3037                 resp = &res_id;
3038         }
3039
3040         return ldlm_cli_join_lru(obd->obd_namespace, resp, join);
3041 }
3042
/* Completion callback for an async OST_STATFS request: unpack the
 * (possibly byte-swapped) obd_statfs reply into the caller's buffer,
 * then run the caller's up-call with the final status. */
static int osc_statfs_interpret(struct ptlrpc_request *req,
                                struct osc_async_args *aa, int rc)
{
        struct obd_statfs *msfs;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
                                  lustre_swab_obd_statfs);
        if (msfs == NULL) {
                CERROR("Can't unpack obd_statfs\n");
                GOTO(out, rc = -EPROTO);
        }

        memcpy(aa->aa_oi->oi_osfs, msfs, sizeof(*msfs));
out:
        /* The up-call runs even on error so the waiter is always released. */
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}
3064
/* Queue an asynchronous OST_STATFS request on @rqset.  The reply is
 * delivered to oinfo->oi_osfs by osc_statfs_interpret(), which then
 * invokes oinfo->oi_cb_up.  @max_age is currently unused on the wire
 * (see the comment below). */
static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
                            __u64 max_age, struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int size[2] = { sizeof(struct ptlrpc_body), sizeof(*oinfo->oi_osfs) };
        ENTRY;

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
                              OST_STATFS, 1, NULL, NULL);
        if (!req)
                RETURN(-ENOMEM);

        ptlrpc_req_set_repsize(req, 2, size);
        req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249

        /* Stash the obd_info in the request's async-args slot for the
         * interpret callback; CLASSERT proves it fits at compile time. */
        req->rq_interpret_reply = osc_statfs_interpret;
        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_async_args *)&req->rq_async_args;
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(rqset, req);
        RETURN(0);
}
3095
/* Synchronous OST_STATFS: send the request, wait for the reply, and
 * copy the unpacked obd_statfs into @osfs.  @max_age is currently not
 * transmitted (see the comment below). */
static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
                      __u64 max_age)
{
        struct obd_statfs *msfs;
        struct ptlrpc_request *req;
        int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
        ENTRY;

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
                              OST_STATFS, 1, NULL, NULL);
        if (!req)
                RETURN(-ENOMEM);

        ptlrpc_req_set_repsize(req, 2, size);
        req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
                                  lustre_swab_obd_statfs);
        if (msfs == NULL) {
                CERROR("Can't unpack obd_statfs\n");
                GOTO(out, rc = -EPROTO);
        }

        memcpy(osfs, msfs, sizeof(*osfs));

        /* EXIT here balances ENTRY on the success path; the GOTOs above
         * log the exit themselves. */
        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
3136
/* Retrieve object striping information.
 *
 * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
 * the maximum number of OST indices which will fit in the user buffer.
 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
 */
static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
{
        struct lov_user_md lum, *lumk;
        int rc = 0, lum_size;
        ENTRY;

        if (!lsm)
                RETURN(-ENODATA);

        /* Read the user's header to learn how much room they provided. */
        if (copy_from_user(&lum, lump, sizeof(lum)))
                RETURN(-EFAULT);

        if (lum.lmm_magic != LOV_USER_MAGIC)
                RETURN(-EINVAL);

        if (lum.lmm_stripe_count > 0) {
                /* Caller has room for at least one object entry: allocate
                 * header + one lov_user_ost_data and fill in the single
                 * stripe this OSC represents. */
                lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
                OBD_ALLOC(lumk, lum_size);
                if (!lumk)
                        RETURN(-ENOMEM);

                lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
                lumk->lmm_objects[0].l_object_gr = lsm->lsm_object_gr;
        } else {
                /* Header only: reuse the on-stack copy. */
                lum_size = sizeof(lum);
                lumk = &lum;
        }

        lumk->lmm_object_id = lsm->lsm_object_id;
        lumk->lmm_object_gr = lsm->lsm_object_gr;
        /* An OSC always reports exactly one stripe. */
        lumk->lmm_stripe_count = 1;

        if (copy_to_user(lump, lumk, lum_size))
                rc = -EFAULT;

        if (lumk != &lum)
                OBD_FREE(lumk, lum_size);

        RETURN(rc);
}
3183
3184
/* ioctl dispatcher for the OSC device.  Takes a module reference for
 * the duration of the call so the module cannot be unloaded while an
 * ioctl is in flight. */
static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         void *karg, void *uarg)
{
        struct obd_device *obd = exp->exp_obd;
        struct obd_ioctl_data *data = karg;
        int err = 0;
        ENTRY;

#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
        MOD_INC_USE_COUNT;
#else
        if (!try_module_get(THIS_MODULE)) {
                CERROR("Can't get module. Is it alive?");
                return -EINVAL;
        }
#endif
        switch (cmd) {
        case OBD_IOC_LOV_GET_CONFIG: {
                char *buf;
                struct lov_desc *desc;
                struct obd_uuid uuid;

                buf = NULL;
                len = 0;
                /* Pull the variable-size ioctl payload from userspace. */
                if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
                        GOTO(out, err = -EINVAL);

                data = (struct obd_ioctl_data *)buf;

                /* inlbuf1 must be large enough for a lov_desc... */
                if (sizeof(*desc) > data->ioc_inllen1) {
                        obd_ioctl_freedata(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                /* ...and inlbuf2 for a uuid. */
                if (data->ioc_inllen2 < sizeof(uuid)) {
                        obd_ioctl_freedata(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                /* An OSC presents itself as a degenerate one-target LOV. */
                desc = (struct lov_desc *)data->ioc_inlbuf1;
                desc->ld_tgt_count = 1;
                desc->ld_active_tgt_count = 1;
                desc->ld_default_stripe_count = 1;
                desc->ld_default_stripe_size = 0;
                desc->ld_default_stripe_offset = 0;
                desc->ld_pattern = 0;
                memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));

                memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));

                err = copy_to_user((void *)uarg, buf, len);
                if (err)
                        err = -EFAULT;
                obd_ioctl_freedata(buf, len);
                GOTO(out, err);
        }
        case LL_IOC_LOV_SETSTRIPE:
                /* obd_alloc_memmd returns the md size on success;
                 * normalize positive values to 0. */
                err = obd_alloc_memmd(exp, karg);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case LL_IOC_LOV_GETSTRIPE:
                err = osc_getstripe(karg, uarg);
                GOTO(out, err);
        case OBD_IOC_CLIENT_RECOVER:
                err = ptlrpc_recover_import(obd->u.cli.cl_import,
                                            data->ioc_inlbuf1);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case IOC_OSC_SET_ACTIVE:
                err = ptlrpc_set_import_active(obd->u.cli.cl_import,
                                               data->ioc_offset);
                GOTO(out, err);
        case OBD_IOC_POLL_QUOTACHECK:
                err = lquota_poll_check(quota_interface, exp,
                                        (struct if_quotacheck *)karg);
                GOTO(out, err);
        default:
                CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
                       cmd, cfs_curproc_comm());
                GOTO(out, err = -ENOTTY);
        }
out:
#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
        MOD_DEC_USE_COUNT;
#else
        module_put(THIS_MODULE);
#endif
        return err;
}
3276
/* obd get_info handler.  Two keys are understood:
 *   "lock_to_stripe" - answered locally: an OSC has a single stripe, 0.
 *   "last_id"        - forwarded to the OST as an OST_GET_INFO RPC.
 * Unknown keys return -EINVAL.
 * NOTE(review): the first comparison uses '>' strlen while the second
 * uses '>='; presumably keylen includes the NUL terminator for the
 * first caller — confirm against the callers before unifying. */
static int osc_get_info(struct obd_export *exp, obd_count keylen,
                        void *key, __u32 *vallen, void *val)
{
        ENTRY;
        if (!vallen || !val)
                RETURN(-EFAULT);

        if (keylen > strlen("lock_to_stripe") &&
            strcmp(key, "lock_to_stripe") == 0) {
                __u32 *stripe = val;
                *vallen = sizeof(*stripe);
                *stripe = 0;
                RETURN(0);
        } else if (keylen >= strlen("last_id") && strcmp(key, "last_id") == 0) {
                struct ptlrpc_request *req;
                obd_id *reply;
                char *bufs[2] = { NULL, key };
                int rc, size[2] = { sizeof(struct ptlrpc_body), keylen };

                req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                                      OST_GET_INFO, 2, size, bufs);
                if (req == NULL)
                        RETURN(-ENOMEM);

                /* Reply buffer is sized by the caller via *vallen. */
                size[REPLY_REC_OFF] = *vallen;
                ptlrpc_req_set_repsize(req, 2, size);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(out, rc);

                reply = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*reply),
                                           lustre_swab_ost_last_id);
                if (reply == NULL) {
                        CERROR("Can't unpack OST last ID\n");
                        GOTO(out, rc = -EPROTO);
                }
                *((obd_id *)val) = *reply;
        out:
                ptlrpc_req_finished(req);
                RETURN(rc);
        }
        RETURN(-EINVAL);
}
3320
3321 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3322                                           void *aa, int rc)
3323 {
3324         struct llog_ctxt *ctxt;
3325         struct obd_import *imp = req->rq_import;
3326         ENTRY;
3327
3328         if (rc != 0)
3329                 RETURN(rc);
3330
3331         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3332         if (ctxt) {
3333                 if (rc == 0)
3334                         rc = llog_initiator_connect(ctxt);
3335                 else
3336                         CERROR("cannot establish connection for "
3337                                "ctxt %p: %d\n", ctxt, rc);
3338         }
3339
3340         spin_lock(&imp->imp_lock);
3341         imp->imp_server_timeout = 1;
3342         imp->imp_pingable = 1;
3343         spin_unlock(&imp->imp_lock);
3344         CDEBUG(D_HA, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3345
3346         RETURN(rc);
3347 }
3348
/* obd set_info handler.  A handful of keys are consumed locally
 * (KEY_NEXT_ID, "unlinked", KEY_INIT_RECOV, "checksum", KEY_FLUSH_CTX);
 * everything else is forwarded to the OST as an OST_SET_INFO RPC queued
 * on @set. */
static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
                              void *key, obd_count vallen, void *val,
                              struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct obd_device  *obd = exp->exp_obd;
        struct obd_import *imp = class_exp2cliimp(exp);
        int size[3] = { sizeof(struct ptlrpc_body), keylen, vallen };
        char *bufs[3] = { NULL, key, val };
        ENTRY;

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

        /* MDS tells us the next object id to precreate from. */
        if (KEY_IS(KEY_NEXT_ID)) {
                if (vallen != sizeof(obd_id))
                        RETURN(-EINVAL);
                obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
                CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
                       exp->exp_obd->obd_name,
                       obd->u.cli.cl_oscc.oscc_next_id);

                RETURN(0);
        }

        /* Objects were unlinked: the OST may have space again. */
        if (KEY_IS("unlinked")) {
                struct osc_creator *oscc = &obd->u.cli.cl_oscc;
                spin_lock(&oscc->oscc_lock);
                oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
                spin_unlock(&oscc->oscc_lock);
                RETURN(0);
        }

        if (KEY_IS(KEY_INIT_RECOV)) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                spin_lock(&imp->imp_lock);
                imp->imp_initial_recov = *(int *)val;
                spin_unlock(&imp->imp_lock);
                CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
                       exp->exp_obd->obd_name,
                       imp->imp_initial_recov);
                RETURN(0);
        }

        /* Toggle bulk checksumming for this client. */
        if (KEY_IS("checksum")) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
                RETURN(0);
        }

        if (KEY_IS(KEY_FLUSH_CTX)) {
                sptlrpc_import_flush_my_ctx(imp);
                RETURN(0);
        }

        /* Remaining keys require an RPC, and therefore a request set. */
        if (!set)
                RETURN(-EINVAL);

        /* We pass all other commands directly to OST. Since nobody calls osc
           methods directly and everybody is supposed to go through LOV, we
           assume lov checked invalid values for us.
           The only recognised values so far are evict_by_nid and mds_conn.
           Even if something bad goes through, we'd get a -EINVAL from OST
           anyway. */

        req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_SET_INFO, 3, size,
                              bufs);
        if (req == NULL)
                RETURN(-ENOMEM);

        /* "mds_conn" carries the object group this MDS creates in; the
         * interpret callback finishes the llog hookup on reply. */
        if (KEY_IS("mds_conn")) {
                struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                oscc->oscc_oa.o_gr = (*(__u32 *)val);
                oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
                LASSERT(oscc->oscc_oa.o_gr > 0);
                req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
        }

        ptlrpc_req_set_repsize(req, 1, NULL);
        ptlrpc_set_add_req(set, req);
        ptlrpc_check_set(set);

        RETURN(0);
}
3435
3436
3437 static struct llog_operations osc_size_repl_logops = {
3438         lop_cancel: llog_obd_repl_cancel
3439 };
3440
/* Origin llog operations; filled in lazily from llog_lvfs_ops the first
 * time osc_llog_init() runs. */
static struct llog_operations osc_mds_ost_orig_logops;

/* Set up the two llog contexts used by an MDS-side OSC: the MDS->OST
 * originator context (unlink records) and the size-replication context.
 * NOTE(review): osc_mds_ost_orig_logops is a file-scope global but is
 * guarded here by the per-device obd_dev_lock — confirm two devices
 * cannot race through first-time initialization. */
static int osc_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
                         struct obd_device *tgt, int count,
                         struct llog_catid *catid, struct obd_uuid *uuid)
{
        int rc;
        ENTRY;

        spin_lock(&obd->obd_dev_lock);
        /* One-time init: copy the lvfs ops and override the origin hooks. */
        if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
                osc_mds_ost_orig_logops = llog_lvfs_ops;
                osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
                osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
                osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
                osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
        }
        spin_unlock(&obd->obd_dev_lock);

        rc = llog_setup(obd, llogs, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
                        &catid->lci_logid, &osc_mds_ost_orig_logops);
        if (rc) {
                CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
                GOTO (out, rc);
        }

        rc = llog_setup(obd, llogs, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
                        &osc_size_repl_logops);
        if (rc)
                CERROR("failed LLOG_SIZE_REPL_CTXT\n");
out:
        if (rc) {
                CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
                       obd->obd_name, tgt->obd_name, count, catid, rc);
                CERROR("logid "LPX64":0x%x\n",
                       catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
        }
        RETURN(rc);
}
3479
3480 static int osc_llog_finish(struct obd_device *obd, int count)
3481 {
3482         struct llog_ctxt *ctxt;
3483         int rc = 0, rc2 = 0;
3484         ENTRY;
3485
3486         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3487         if (ctxt)
3488                 rc = llog_cleanup(ctxt);
3489
3490         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3491         if (ctxt)
3492                 rc2 = llog_cleanup(ctxt);
3493         if (!rc)
3494                 rc = rc2;
3495
3496         RETURN(rc);
3497 }
3498
3499 static int osc_reconnect(struct obd_export *exp, struct obd_device *obd,
3500                          struct obd_uuid *cluuid,
3501                          struct obd_connect_data *data)
3502 {
3503         struct client_obd *cli = &obd->u.cli;
3504
3505         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3506                 long lost_grant;
3507
3508                 client_obd_list_lock(&cli->cl_loi_list_lock);
3509                 data->ocd_grant = cli->cl_avail_grant ?:
3510                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3511                 lost_grant = cli->cl_lost_grant;
3512                 cli->cl_lost_grant = 0;
3513                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3514
3515                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3516                        "cl_lost_grant: %ld\n", data->ocd_grant,
3517                        cli->cl_avail_grant, lost_grant);
3518                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3519                        " ocd_grant: %d\n", data->ocd_connect_flags,
3520                        data->ocd_version, data->ocd_grant);
3521         }
3522
3523         RETURN(0);
3524 }
3525
/* Disconnect from the OST.  On the last connection, flush any pending
 * size-replication llog cancels to the target first.
 * NOTE(review): ctxt may be NULL if the SIZE_REPL context was never set
 * up — presumably llog_sync() tolerates NULL; confirm. */
static int osc_disconnect(struct obd_export *exp)
{
        struct obd_device *obd = class_exp2obd(exp);
        struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
        int rc;

        if (obd->u.cli.cl_conn_count == 1)
                /* flush any remaining cancel messages out to the target */
                llog_sync(ctxt, exp);

        rc = client_disconnect_export(exp);
        return rc;
}
3539
/* React to import state changes: adjust grant accounting, the object
 * creator's recovery flags (MDS-side OSCs only), and notify the
 * observer obd of activity transitions. */
static int osc_import_event(struct obd_device *obd,
                            struct obd_import *imp,
                            enum obd_import_event event)
{
        struct client_obd *cli;
        int rc = 0;

        ENTRY;
        LASSERT(imp->imp_obd == obd);

        switch (event) {
        case IMP_EVENT_DISCON: {
                /* Only do this on the MDS OSC's */
                if (imp->imp_server_timeout) {
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                        spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
                        spin_unlock(&oscc->oscc_lock);
                }
                /* Grant state is meaningless across a disconnect; it is
                 * renegotiated at reconnect time. */
                cli = &obd->u.cli;
                client_obd_list_lock(&cli->cl_loi_list_lock);
                cli->cl_avail_grant = 0;
                cli->cl_lost_grant = 0;
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                break;
        }
        case IMP_EVENT_INACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
                break;
        }
        case IMP_EVENT_INVALIDATE: {
                struct ldlm_namespace *ns = obd->obd_namespace;

                /* Reset grants */
                cli = &obd->u.cli;
                client_obd_list_lock(&cli->cl_loi_list_lock);
                /* all pages go to failing rpcs due to the invalid import */
                osc_check_rpcs(cli);
                client_obd_list_unlock(&cli->cl_loi_list_lock);

                /* Drop all cached locks locally; the server side is gone. */
                ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);

                break;
        }
        case IMP_EVENT_ACTIVE: {
                /* Only do this on the MDS OSC's */
                if (imp->imp_server_timeout) {
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                        spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
                        spin_unlock(&oscc->oscc_lock);
                }
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
                break;
        }
        case IMP_EVENT_OCD: {
                struct obd_connect_data *ocd = &imp->imp_connect_data;

                if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
                        osc_init_grant(&obd->u.cli, ocd);

                /* See bug 7198 */
                if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;

                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
                break;
        }
        default:
                CERROR("Unknown import event %d\n", event);
                LBUG();
        }
        RETURN(rc);
}
3616
3617 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3618 {
3619         int rc;
3620         ENTRY;
3621
3622         ENTRY;
3623         rc = ptlrpcd_addref();
3624         if (rc)
3625                 RETURN(rc);
3626
3627         rc = client_obd_setup(obd, lcfg);
3628         if (rc) {
3629                 ptlrpcd_decref();
3630         } else {
3631                 struct lprocfs_static_vars lvars;
3632                 struct client_obd *cli = &obd->u.cli;
3633
3634                 lprocfs_init_vars(osc, &lvars);
3635                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3636                         lproc_osc_attach_seqstat(obd);
3637                         ptlrpc_lprocfs_register_obd(obd);
3638                 }
3639
3640                 oscc_init(obd);
3641                 /* We need to allocate a few requests more, because
3642                    brw_interpret_oap tries to create new requests before freeing
3643                    previous ones. Ideally we want to have 2x max_rpcs_in_flight
3644                    reserved, but I afraid that might be too much wasted RAM
3645                    in fact, so 2 is just my guess and still should work. */
3646                 cli->cl_import->imp_rq_pool =
3647                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3648                                             OST_MAXREQSIZE,
3649                                             ptlrpc_add_rqs_to_pool);
3650         }
3651
3652         RETURN(rc);
3653 }
3654
/* Staged pre-cleanup of the OSC device; each stage of the obd cleanup
 * state machine gets its own case. */
static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
{
        int rc = 0;
        ENTRY;

        switch (stage) {
        case OBD_CLEANUP_EARLY: {
                struct obd_import *imp;
                imp = obd->u.cli.cl_import;
                CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
                /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
                ptlrpc_deactivate_import(imp);
                spin_lock(&imp->imp_lock);
                imp->imp_pingable = 0;
                spin_unlock(&imp->imp_lock);
                break;
        }
        case OBD_CLEANUP_EXPORTS: {
                /* If we set up but never connected, the
                   client import will not have been cleaned. */
                if (obd->u.cli.cl_import) {
                        struct obd_import *imp;
                        imp = obd->u.cli.cl_import;
                        /* NOTE(review): this message prints whenever
                         * cl_import is still set at this stage — confirm
                         * that only the "never connected" case reaches
                         * here, otherwise the text is misleading. */
                        CDEBUG(D_CONFIG, "%s: client import never connected\n",
                               obd->obd_name);
                        ptlrpc_invalidate_import(imp);
                        ptlrpc_free_rq_pool(imp->imp_rq_pool);
                        class_destroy_import(imp);
                        obd->u.cli.cl_import = NULL;
                }
                break;
        }
        case OBD_CLEANUP_SELF_EXP:
                rc = obd_llog_finish(obd, 0);
                if (rc != 0)
                        CERROR("failed to cleanup llogging subsystems\n");
                break;
        case OBD_CLEANUP_OBD:
                /* Nothing to do at this stage. */
                break;
        }
        RETURN(rc);
}
3697
3698 int osc_cleanup(struct obd_device *obd)
3699 {
3700         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3701         int rc;
3702
3703         ENTRY;
3704         ptlrpc_lprocfs_unregister_obd(obd);
3705         lprocfs_obd_cleanup(obd);
3706
3707         spin_lock(&oscc->oscc_lock);
3708         oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
3709         oscc->oscc_flags |= OSCC_FLAG_EXITING;
3710         spin_unlock(&oscc->oscc_lock);
3711
3712         /* free memory of osc quota cache */
3713         lquota_cleanup(quota_interface, obd);
3714
3715         rc = client_obd_cleanup(obd);
3716
3717         ptlrpcd_decref();
3718         RETURN(rc);
3719 }
3720
3721 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3722 {
3723         struct lustre_cfg *lcfg = buf;
3724         struct lprocfs_static_vars lvars;
3725         int rc = 0;
3726
3727         lprocfs_init_vars(osc, &lvars);
3728
3729         rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars, lcfg, obd);
3730         return(rc);
3731 }
3732
/* The OSC's obd method table; entries not listed fall back to the
 * generic obd defaults. */
struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        /* Device lifecycle. */
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup,
        /* Connection management. */
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        /* Statfs. */
        .o_statfs               = osc_statfs,
        .o_statfs_async         = osc_statfs_async,
        /* Striping metadata pack/unpack. */
        .o_packmd               = osc_packmd,
        .o_unpackmd             = osc_unpackmd,
        /* Object lifecycle and attributes. */
        .o_precreate            = osc_precreate,
        .o_create               = osc_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_getattr_async        = osc_getattr_async,
        .o_setattr              = osc_setattr,
        .o_setattr_async        = osc_setattr_async,
        /* Bulk I/O and the async page machinery. */
        .o_brw                  = osc_brw,
        .o_brw_async            = osc_brw_async,
        .o_prep_async_page      = osc_prep_async_page,
        .o_queue_async_io       = osc_queue_async_io,
        .o_set_async_flags      = osc_set_async_flags,
        .o_queue_group_io       = osc_queue_group_io,
        .o_trigger_group_io     = osc_trigger_group_io,
        .o_teardown_async_page  = osc_teardown_async_page,
        .o_punch                = osc_punch,
        .o_sync                 = osc_sync,
        /* DLM locking. */
        .o_enqueue              = osc_enqueue,
        .o_match                = osc_match,
        .o_change_cbdata        = osc_change_cbdata,
        .o_cancel               = osc_cancel,
        .o_cancel_unused        = osc_cancel_unused,
        .o_join_lru             = osc_join_lru,
        /* Control and information. */
        .o_iocontrol            = osc_iocontrol,
        .o_get_info             = osc_get_info,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_llog_init            = osc_llog_init,
        .o_llog_finish          = osc_llog_finish,
        .o_process_config       = osc_process_config,
};
3778
/* Module init: hook up the quota interface and register the OSC obd
 * type with the class layer.
 * NOTE(review): the return value of request_module("lquota") is
 * ignored (quota is treated as optional), and lquota_init()'d state is
 * not torn down on the class_register_type() failure path — confirm
 * whether lquota_exit() belongs there. */
int __init osc_init(void)
{
        struct lprocfs_static_vars lvars;
        int rc;
        ENTRY;

        lprocfs_init_vars(osc, &lvars);

        request_module("lquota");
        quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
        lquota_init(quota_interface);
        init_obd_quota_ops(quota_interface, &osc_obd_ops);

        rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
                                 LUSTRE_OSC_NAME, NULL);
        if (rc) {
                /* Drop the quota symbol reference taken above. */
                if (quota_interface)
                        PORTAL_SYMBOL_PUT(osc_quota_interface);
                RETURN(rc);
        }

        RETURN(rc);
}
3802
#ifdef __KERNEL__
/* Module exit: mirror of osc_init() — shut down quota, drop the quota
 * symbol reference, and unregister the obd type. */
static void /*__exit*/ osc_exit(void)
{
        lquota_exit(quota_interface);
        if (quota_interface)
                PORTAL_SYMBOL_PUT(osc_quota_interface);

        class_unregister_type(LUSTRE_OSC_NAME);
}

MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");

cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
#endif