Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author Peter Braam <braam@clusterfs.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  *   You may have signed or agreed to another license before downloading
11  *   this software.  If so, you are bound by the terms and conditions
12  *   of that agreement, and the following does not apply to you.  See the
13  *   LICENSE file included with this distribution for more information.
14  *
15  *   If you did not agree to a different license, then this copy of Lustre
16  *   is open source software; you can redistribute it and/or modify it
17  *   under the terms of version 2 of the GNU General Public License as
18  *   published by the Free Software Foundation.
19  *
20  *   In either case, Lustre is distributed in the hope that it will be
21  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23  *   license text for more details.
24  *
25  *  For testing and management it is treated as an obd_device,
26  *  although it does not export a full OBD method table (the
27  *  requests are coming in over the wire, so object target modules
28  *  do not have a full method table.)
29  *
30  */
31
32 #ifndef EXPORT_SYMTAB
33 # define EXPORT_SYMTAB
34 #endif
35 #define DEBUG_SUBSYSTEM S_OSC
36
37 #ifdef __KERNEL__
38 # include <libcfs/libcfs.h>
39 #else /* __KERNEL__ */
40 # include <liblustre.h>
41 #endif
42
43 #include <lustre_dlm.h>
44 #include <libcfs/kp30.h>
45 #include <lustre_net.h>
46 #include <lustre/lustre_user.h>
47 #include <obd_ost.h>
48 #include <obd_lov.h>
49
50 #ifdef  __CYGWIN__
51 # include <ctype.h>
52 #endif
53
54 #include <lustre_ha.h>
55 #include <lprocfs_status.h>
56 #include <lustre_log.h>
57 #include <lustre_debug.h>
58 #include <lustre_param.h>
59 #include "osc_internal.h"
60
61 static quota_interface_t *quota_interface = NULL;
62 extern quota_interface_t osc_quota_interface;
63
64 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
65
66 /* Pack OSC object metadata for disk storage (LE byte order). */
67 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
68                       struct lov_stripe_md *lsm)
69 {
70         int lmm_size;
71         ENTRY;
72
73         lmm_size = sizeof(**lmmp);
74         if (!lmmp)
75                 RETURN(lmm_size);
76
77         if (*lmmp && !lsm) {
78                 OBD_FREE(*lmmp, lmm_size);
79                 *lmmp = NULL;
80                 RETURN(0);
81         }
82
83         if (!*lmmp) {
84                 OBD_ALLOC(*lmmp, lmm_size);
85                 if (!*lmmp)
86                         RETURN(-ENOMEM);
87         }
88
89         if (lsm) {
90                 LASSERT(lsm->lsm_object_id);
91                 LASSERT(lsm->lsm_object_gr);
92                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
93                 (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
94         }
95
96         RETURN(lmm_size);
97 }
98
99 /* Unpack OSC object metadata from disk storage (LE byte order). */
100 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
101                         struct lov_mds_md *lmm, int lmm_bytes)
102 {
103         int lsm_size;
104         ENTRY;
105
106         if (lmm != NULL) {
107                 if (lmm_bytes < sizeof (*lmm)) {
108                         CERROR("lov_mds_md too small: %d, need %d\n",
109                                lmm_bytes, (int)sizeof(*lmm));
110                         RETURN(-EINVAL);
111                 }
112                 /* XXX LOV_MAGIC etc check? */
113
114                 if (lmm->lmm_object_id == 0) {
115                         CERROR("lov_mds_md: zero lmm_object_id\n");
116                         RETURN(-EINVAL);
117                 }
118         }
119
120         lsm_size = lov_stripe_md_size(1);
121         if (lsmp == NULL)
122                 RETURN(lsm_size);
123
124         if (*lsmp != NULL && lmm == NULL) {
125                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
126                 OBD_FREE(*lsmp, lsm_size);
127                 *lsmp = NULL;
128                 RETURN(0);
129         }
130
131         if (*lsmp == NULL) {
132                 OBD_ALLOC(*lsmp, lsm_size);
133                 if (*lsmp == NULL)
134                         RETURN(-ENOMEM);
135                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
136                 if ((*lsmp)->lsm_oinfo[0] == NULL) {
137                         OBD_FREE(*lsmp, lsm_size);
138                         RETURN(-ENOMEM);
139                 }
140                 loi_init((*lsmp)->lsm_oinfo[0]);
141         }
142
143         if (lmm != NULL) {
144                 /* XXX zero *lsmp? */
145                 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
146                 (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
147                 LASSERT((*lsmp)->lsm_object_id);
148                 LASSERT((*lsmp)->lsm_object_gr);
149         }
150
151         (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
152
153         RETURN(lsm_size);
154 }
155
156 static inline void osc_pack_capa(struct ptlrpc_request *req, int offset,
157                                  struct ost_body *body, void *capa)
158 {
159         struct obd_capa *oc = (struct obd_capa *)capa;
160         struct lustre_capa *c;
161
162         if (!capa)
163                 return;
164
165         c = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*c));
166         LASSERT(c);
167         capa_cpy(c, oc);
168         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
169         DEBUG_CAPA(D_SEC, c, "pack");
170 }
171
172 static inline void osc_pack_req_body(struct ptlrpc_request *req, int offset,
173                                      struct obd_info *oinfo)
174 {
175         struct ost_body *body;
176
177         body = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*body));
178         body->oa = *oinfo->oi_oa;
179         osc_pack_capa(req, offset + 1, body, oinfo->oi_capa);
180 }
181
182 static int osc_getattr_interpret(struct ptlrpc_request *req,
183                                  struct osc_async_args *aa, int rc)
184 {
185         struct ost_body *body;
186         ENTRY;
187
188         if (rc != 0)
189                 GOTO(out, rc);
190
191         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
192                                   lustre_swab_ost_body);
193         if (body) {
194                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
195                 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
196
197                 /* This should really be sent by the OST */
198                 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
199                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
200         } else {
201                 CERROR("can't unpack ost_body\n");
202                 rc = -EPROTO;
203                 aa->aa_oi->oi_oa->o_valid = 0;
204         }
205 out:
206         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
207         RETURN(rc);
208 }
209
210 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
211                              struct ptlrpc_request_set *set)
212 {
213         struct ptlrpc_request *req;
214         struct ost_body *body;
215         int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
216         struct osc_async_args *aa;
217         ENTRY;
218
219         size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
220         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
221                               OST_GETATTR, 3, size,NULL);
222         if (!req)
223                 RETURN(-ENOMEM);
224
225         osc_pack_req_body(req, REQ_REC_OFF, oinfo);
226
227         ptlrpc_req_set_repsize(req, 2, size);
228         req->rq_interpret_reply = osc_getattr_interpret;
229
230         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
231         aa = (struct osc_async_args *)&req->rq_async_args;
232         aa->aa_oi = oinfo;
233
234         ptlrpc_set_add_req(set, req);
235         RETURN (0);
236 }
237
238 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
239 {
240         struct ptlrpc_request *req;
241         struct ost_body *body;
242         int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
243         ENTRY;
244
245         size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
246         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
247                               OST_GETATTR, 3, size, NULL);
248         if (!req)
249                 RETURN(-ENOMEM);
250
251         osc_pack_req_body(req, REQ_REC_OFF, oinfo);
252
253         ptlrpc_req_set_repsize(req, 2, size);
254
255         rc = ptlrpc_queue_wait(req);
256         if (rc) {
257                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
258                 GOTO(out, rc);
259         }
260
261         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
262                                   lustre_swab_ost_body);
263         if (body == NULL) {
264                 CERROR ("can't unpack ost_body\n");
265                 GOTO (out, rc = -EPROTO);
266         }
267
268         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
269         *oinfo->oi_oa = body->oa;
270
271         /* This should really be sent by the OST */
272         oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
273         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
274
275         EXIT;
276  out:
277         ptlrpc_req_finished(req);
278         return rc;
279 }
280
281 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
282                        struct obd_trans_info *oti)
283 {
284         struct ptlrpc_request *req;
285         struct ost_body *body;
286         int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
287         ENTRY;
288
289         LASSERT(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
290                                         oinfo->oi_oa->o_gr > 0);
291         size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
292         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
293                               OST_SETATTR, 3, size, NULL);
294         if (!req)
295                 RETURN(-ENOMEM);
296
297         osc_pack_req_body(req, REQ_REC_OFF, oinfo);
298
299         ptlrpc_req_set_repsize(req, 2, size);
300
301         rc = ptlrpc_queue_wait(req);
302         if (rc)
303                 GOTO(out, rc);
304
305         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
306                                   lustre_swab_ost_body);
307         if (body == NULL)
308                 GOTO(out, rc = -EPROTO);
309
310         *oinfo->oi_oa = body->oa;
311
312         EXIT;
313 out:
314         ptlrpc_req_finished(req);
315         RETURN(rc);
316 }
317
318 static int osc_setattr_interpret(struct ptlrpc_request *req,
319                                  struct osc_async_args *aa, int rc)
320 {
321         struct ost_body *body;
322         ENTRY;
323
324         if (rc != 0)
325                 GOTO(out, rc);
326
327         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
328                                   lustre_swab_ost_body);
329         if (body == NULL) {
330                 CERROR("can't unpack ost_body\n");
331                 GOTO(out, rc = -EPROTO);
332         }
333
334         *aa->aa_oi->oi_oa = body->oa;
335 out:
336         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
337         RETURN(rc);
338 }
339
340 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
341                              struct obd_trans_info *oti,
342                              struct ptlrpc_request_set *rqset)
343 {
344         struct ptlrpc_request *req;
345         int size[3] = { sizeof(struct ptlrpc_body), sizeof(struct ost_body) };
346         struct osc_async_args *aa;
347         ENTRY;
348
349         size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
350         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
351                               OST_SETATTR, 3, size, NULL);
352         if (!req)
353                 RETURN(-ENOMEM);
354
355         osc_pack_req_body(req, REQ_REC_OFF, oinfo);
356         if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
357                 LASSERT(oti);
358                 *obdo_logcookie(oinfo->oi_oa) = *oti->oti_logcookies;
359         }
360
361         ptlrpc_req_set_repsize(req, 2, size);
362         /* do mds to ost setattr asynchronouly */
363         if (!rqset) {
364                 /* Do not wait for response. */
365                 ptlrpcd_add_req(req);
366         } else {
367                 req->rq_interpret_reply = osc_setattr_interpret;
368
369                 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
370                 aa = (struct osc_async_args *)&req->rq_async_args;
371                 aa->aa_oi = oinfo;
372
373                 ptlrpc_set_add_req(rqset, req);
374         }
375
376         RETURN(0);
377 }
378
379 int osc_real_create(struct obd_export *exp, struct obdo *oa,
380                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
381 {
382         struct ptlrpc_request *req;
383         struct ost_body *body;
384         struct lov_stripe_md *lsm;
385         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
386         ENTRY;
387
388         LASSERT(oa);
389         LASSERT(ea);
390
391         lsm = *ea;
392         if (!lsm) {
393                 rc = obd_alloc_memmd(exp, &lsm);
394                 if (rc < 0)
395                         RETURN(rc);
396         }
397
398         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
399                               OST_CREATE, 2, size, NULL);
400         if (!req)
401                 GOTO(out, rc = -ENOMEM);
402
403         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
404         body->oa = *oa;
405
406         ptlrpc_req_set_repsize(req, 2, size);
407         if (oa->o_valid & OBD_MD_FLINLINE) {
408                 LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
409                         oa->o_flags == OBD_FL_DELORPHAN);
410                 DEBUG_REQ(D_HA, req,
411                           "delorphan from OST integration");
412                 /* Don't resend the delorphan req */
413                 req->rq_no_resend = req->rq_no_delay = 1;
414         }
415
416         rc = ptlrpc_queue_wait(req);
417         if (rc)
418                 GOTO(out_req, rc);
419
420         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
421                                   lustre_swab_ost_body);
422         if (body == NULL) {
423                 CERROR ("can't unpack ost_body\n");
424                 GOTO (out_req, rc = -EPROTO);
425         }
426
427         *oa = body->oa;
428
429         /* This should really be sent by the OST */
430         oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
431         oa->o_valid |= OBD_MD_FLBLKSZ;
432
433         /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
434          * have valid lsm_oinfo data structs, so don't go touching that.
435          * This needs to be fixed in a big way.
436          */
437         lsm->lsm_object_id = oa->o_id;
438         lsm->lsm_object_gr = oa->o_gr;
439         *ea = lsm;
440
441         if (oti != NULL) {
442                 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
443
444                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
445                         if (!oti->oti_logcookies)
446                                 oti_alloc_cookies(oti, 1);
447                         *oti->oti_logcookies = *obdo_logcookie(oa);
448                 }
449         }
450
451         CDEBUG(D_HA, "transno: "LPD64"\n",
452                lustre_msg_get_transno(req->rq_repmsg));
453 out_req:
454         ptlrpc_req_finished(req);
455 out:
456         if (rc && !*ea)
457                 obd_free_memmd(exp, &lsm);
458         RETURN(rc);
459 }
460
461 static int osc_punch_interpret(struct ptlrpc_request *req,
462                                struct osc_async_args *aa, int rc)
463 {
464         struct ost_body *body;
465         ENTRY;
466
467         if (rc != 0)
468                 GOTO(out, rc);
469
470         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof (*body),
471                                   lustre_swab_ost_body);
472         if (body == NULL) {
473                 CERROR ("can't unpack ost_body\n");
474                 GOTO(out, rc = -EPROTO);
475         }
476
477         *aa->aa_oi->oi_oa = body->oa;
478 out:
479         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
480         RETURN(rc);
481 }
482
483 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
484                      struct obd_trans_info *oti,
485                      struct ptlrpc_request_set *rqset)
486 {
487         struct ptlrpc_request *req;
488         struct osc_async_args *aa;
489         struct ost_body *body;
490         int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
491         ENTRY;
492
493         if (!oinfo->oi_oa) {
494                 CERROR("oa NULL\n");
495                 RETURN(-EINVAL);
496         }
497
498         size[REQ_REC_OFF + 1] = oinfo->oi_capa? sizeof(struct lustre_capa) : 0;
499         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
500                               OST_PUNCH, 3, size, NULL);
501         if (!req)
502                 RETURN(-ENOMEM);
503
504         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
505
506         osc_pack_req_body(req, REQ_REC_OFF, oinfo);
507         /* overload the size and blocks fields in the oa with start/end */
508         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
509         body->oa.o_size = oinfo->oi_policy.l_extent.start;
510         body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
511         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
512
513         ptlrpc_req_set_repsize(req, 2, size);
514
515         req->rq_interpret_reply = osc_punch_interpret;
516         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
517         aa = (struct osc_async_args *)&req->rq_async_args;
518         aa->aa_oi = oinfo;
519         ptlrpc_set_add_req(rqset, req);
520
521         RETURN(0);
522 }
523
524 static int osc_sync(struct obd_export *exp, struct obdo *oa,
525                     struct lov_stripe_md *md, obd_size start, obd_size end,
526                     void *capa)
527 {
528         struct ptlrpc_request *req;
529         struct ost_body *body;
530         int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
531         ENTRY;
532
533         if (!oa) {
534                 CERROR("oa NULL\n");
535                 RETURN(-EINVAL);
536         }
537
538         size[REQ_REC_OFF + 1] = capa ? sizeof(struct lustre_capa) : 0;
539
540         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
541                               OST_SYNC, 3, size, NULL);
542         if (!req)
543                 RETURN(-ENOMEM);
544
545         /* overload the size and blocks fields in the oa with start/end */
546         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
547         body->oa = *oa;
548         body->oa.o_size = start;
549         body->oa.o_blocks = end;
550         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
551
552         osc_pack_capa(req, REQ_REC_OFF + 1, body, capa);
553
554         ptlrpc_req_set_repsize(req, 2, size);
555
556         rc = ptlrpc_queue_wait(req);
557         if (rc)
558                 GOTO(out, rc);
559
560         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
561                                   lustre_swab_ost_body);
562         if (body == NULL) {
563                 CERROR ("can't unpack ost_body\n");
564                 GOTO (out, rc = -EPROTO);
565         }
566
567         *oa = body->oa;
568
569         EXIT;
570  out:
571         ptlrpc_req_finished(req);
572         return rc;
573 }
574
575 /* Find and cancel locally locks matched by @mode in the resource found by
576  * @objid. Found locks are added into @cancel list. Returns the amount of
577  * locks added to @cancels list. */
578 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
579                                    struct list_head *cancels, ldlm_mode_t mode,
580                                    int lock_flags)
581 {
582         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
583         struct ldlm_res_id res_id = { .name = { oa->o_id, 0, oa->o_gr, 0 } };
584         struct ldlm_resource *res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
585         int count;
586         ENTRY;
587
588         if (res == NULL)
589                 RETURN(0);
590
591         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
592                                            lock_flags, 0, NULL);
593         ldlm_resource_putref(res);
594         RETURN(count);
595 }
596
597 /* Destroy requests can be async always on the client, and we don't even really
598  * care about the return code since the client cannot do anything at all about
599  * a destroy failure.
600  * When the MDS is unlinking a filename, it saves the file objects into a
601  * recovery llog, and these object records are cancelled when the OST reports
602  * they were destroyed and sync'd to disk (i.e. transaction committed).
603  * If the client dies, or the OST is down when the object should be destroyed,
604  * the records are not cancelled, and when the OST reconnects to the MDS next,
605  * it will retrieve the llog unlink logs and then sends the log cancellation
606  * cookies to the MDS after committing destroy transactions. */
607 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
608                        struct lov_stripe_md *ea, struct obd_trans_info *oti,
609                        struct obd_export *md_export)
610 {
611         CFS_LIST_HEAD(cancels);
612         struct ptlrpc_request *req;
613         struct ost_body *body;
614         int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body), 0 };
615         int count, bufcount = 2;
616         ENTRY;
617
618         if (!oa) {
619                 CERROR("oa NULL\n");
620                 RETURN(-EINVAL);
621         }
622
623         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
624                                         LDLM_FL_DISCARD_DATA);
625         if (exp_connect_cancelset(exp) && count) {
626                 bufcount = 3;
627                 size[REQ_REC_OFF + 1] = ldlm_request_bufsize(count,
628                                                              OST_DESTROY);
629         }
630         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
631                               OST_DESTROY, bufcount, size, NULL);
632         if (exp_connect_cancelset(exp) && req)
633                 ldlm_cli_cancel_list(&cancels, count, req, REQ_REC_OFF + 1, 0);
634         else
635                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
636
637         if (!req)
638                 RETURN(-ENOMEM);
639
640         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
641
642         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
643         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
644                 memcpy(obdo_logcookie(oa), oti->oti_logcookies,
645                        sizeof(*oti->oti_logcookies));
646         body->oa = *oa;
647
648         ptlrpc_req_set_repsize(req, 2, size);
649
650         ptlrpcd_add_req(req);
651         RETURN(0);
652 }
653
654 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
655                                 long writing_bytes)
656 {
657         obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
658
659         LASSERT(!(oa->o_valid & bits));
660
661         oa->o_valid |= bits;
662         client_obd_list_lock(&cli->cl_loi_list_lock);
663         oa->o_dirty = cli->cl_dirty;
664         if (cli->cl_dirty > cli->cl_dirty_max) {
665                 CERROR("dirty %lu > dirty_max %lu\n",
666                        cli->cl_dirty, cli->cl_dirty_max);
667                 oa->o_undirty = 0;
668         } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
669                 CERROR("dirty %d > system dirty_max %d\n",
670                        atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
671                 oa->o_undirty = 0;
672         } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
673                 CERROR("dirty %lu - dirty_max %lu too big???\n",
674                        cli->cl_dirty, cli->cl_dirty_max);
675                 oa->o_undirty = 0;
676         } else {
677                 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
678                                 (cli->cl_max_rpcs_in_flight + 1);
679                 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
680         }
681         oa->o_grant = cli->cl_avail_grant;
682         oa->o_dropped = cli->cl_lost_grant;
683         cli->cl_lost_grant = 0;
684         client_obd_list_unlock(&cli->cl_loi_list_lock);
685         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
686                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
687 }
688
689 /* caller must hold loi_list_lock */
690 static void osc_consume_write_grant(struct client_obd *cli,
691                                     struct brw_page *pga)
692 {
693         atomic_inc(&obd_dirty_pages);
694         cli->cl_dirty += CFS_PAGE_SIZE;
695         cli->cl_avail_grant -= CFS_PAGE_SIZE;
696         pga->flag |= OBD_BRW_FROM_GRANT;
697         CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
698                CFS_PAGE_SIZE, pga, pga->pg);
699         LASSERT(cli->cl_avail_grant >= 0);
700 }
701
702 /* the companion to osc_consume_write_grant, called when a brw has completed.
703  * must be called with the loi lock held. */
704 static void osc_release_write_grant(struct client_obd *cli,
705                                     struct brw_page *pga, int sent)
706 {
707         int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
708         ENTRY;
709
710         if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
711                 EXIT;
712                 return;
713         }
714
715         pga->flag &= ~OBD_BRW_FROM_GRANT;
716         atomic_dec(&obd_dirty_pages);
717         cli->cl_dirty -= CFS_PAGE_SIZE;
718         if (!sent) {
719                 cli->cl_lost_grant += CFS_PAGE_SIZE;
720                 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
721                        cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
722         } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
723                 /* For short writes we shouldn't count parts of pages that
724                  * span a whole block on the OST side, or our accounting goes
725                  * wrong.  Should match the code in filter_grant_check. */
726                 int offset = pga->off & ~CFS_PAGE_MASK;
727                 int count = pga->count + (offset & (blocksize - 1));
728                 int end = (offset + pga->count) & (blocksize - 1);
729                 if (end)
730                         count += blocksize - end;
731
732                 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
733                 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
734                        CFS_PAGE_SIZE - count, cli->cl_lost_grant,
735                        cli->cl_avail_grant, cli->cl_dirty);
736         }
737
738         EXIT;
739 }
740
741 static unsigned long rpcs_in_flight(struct client_obd *cli)
742 {
743         return cli->cl_r_in_flight + cli->cl_w_in_flight;
744 }
745
746 /* caller must hold loi_list_lock */
747 void osc_wake_cache_waiters(struct client_obd *cli)
748 {
749         struct list_head *l, *tmp;
750         struct osc_cache_waiter *ocw;
751
752         ENTRY;
753         list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
754                 /* if we can't dirty more, we must wait until some is written */
755                 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
756                    (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
757                         CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
758                                "osc max %ld, sys max %d\n", cli->cl_dirty,
759                                cli->cl_dirty_max, obd_max_dirty_pages);
760                         return;
761                 }
762
763                 /* if still dirty cache but no grant wait for pending RPCs that
764                  * may yet return us some grant before doing sync writes */
765                 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
766                         CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
767                                cli->cl_w_in_flight);
768                         return;
769                 }
770
771                 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
772                 list_del_init(&ocw->ocw_entry);
773                 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
774                         /* no more RPCs in flight to return grant, do sync IO */
775                         ocw->ocw_rc = -EDQUOT;
776                         CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
777                 } else {
778                         osc_consume_write_grant(cli,
779                                                 &ocw->ocw_oap->oap_brw_page);
780                 }
781
782                 cfs_waitq_signal(&ocw->ocw_waitq);
783         }
784
785         EXIT;
786 }
787
788 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
789 {
790         client_obd_list_lock(&cli->cl_loi_list_lock);
791         cli->cl_avail_grant = ocd->ocd_grant;
792         client_obd_list_unlock(&cli->cl_loi_list_lock);
793
794         CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
795                cli->cl_avail_grant, cli->cl_lost_grant);
796         LASSERT(cli->cl_avail_grant >= 0);
797 }
798
799 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
800 {
801         client_obd_list_lock(&cli->cl_loi_list_lock);
802         CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
803         if (body->oa.o_valid & OBD_MD_FLGRANT)
804                 cli->cl_avail_grant += body->oa.o_grant;
805         /* waiters are woken in brw_interpret_oap */
806         client_obd_list_unlock(&cli->cl_loi_list_lock);
807 }
808
809 /* We assume that the reason this OSC got a short read is because it read
810  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
811  * via the LOV, and it _knows_ it's reading inside the file, it's just that
812  * this stripe never got written at or beyond this stripe offset yet. */
813 static void handle_short_read(int nob_read, obd_count page_count,
814                               struct brw_page **pga)
815 {
816         char *ptr;
817         int i = 0;
818
819         /* skip bytes read OK */
820         while (nob_read > 0) {
821                 LASSERT (page_count > 0);
822
823                 if (pga[i]->count > nob_read) {
824                         /* EOF inside this page */
825                         ptr = cfs_kmap(pga[i]->pg) +
826                                 (pga[i]->off & ~CFS_PAGE_MASK);
827                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
828                         cfs_kunmap(pga[i]->pg);
829                         page_count--;
830                         i++;
831                         break;
832                 }
833
834                 nob_read -= pga[i]->count;
835                 page_count--;
836                 i++;
837         }
838
839         /* zero remaining pages */
840         while (page_count-- > 0) {
841                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
842                 memset(ptr, 0, pga[i]->count);
843                 cfs_kunmap(pga[i]->pg);
844                 i++;
845         }
846 }
847
848 static int check_write_rcs(struct ptlrpc_request *req,
849                            int requested_nob, int niocount,
850                            obd_count page_count, struct brw_page **pga)
851 {
852         int    *remote_rcs, i;
853
854         /* return error if any niobuf was in error */
855         remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
856                                         sizeof(*remote_rcs) * niocount, NULL);
857         if (remote_rcs == NULL) {
858                 CERROR("Missing/short RC vector on BRW_WRITE reply\n");
859                 return(-EPROTO);
860         }
861         if (lustre_msg_swabbed(req->rq_repmsg))
862                 for (i = 0; i < niocount; i++)
863                         __swab32s(&remote_rcs[i]);
864
865         for (i = 0; i < niocount; i++) {
866                 if (remote_rcs[i] < 0)
867                         return(remote_rcs[i]);
868
869                 if (remote_rcs[i] != 0) {
870                         CERROR("rc[%d] invalid (%d) req %p\n",
871                                 i, remote_rcs[i], req);
872                         return(-EPROTO);
873                 }
874         }
875
876         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
877                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
878                        requested_nob, req->rq_bulk->bd_nob_transferred);
879                 return(-EPROTO);
880         }
881
882         return (0);
883 }
884
885 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
886 {
887         if (p1->flag != p2->flag) {
888                 unsigned mask = ~OBD_BRW_FROM_GRANT;
889
890                 /* warn if we try to combine flags that we don't know to be
891                  * safe to combine */
892                 if ((p1->flag & mask) != (p2->flag & mask))
893                         CERROR("is it ok to have flags 0x%x and 0x%x in the "
894                                "same brw?\n", p1->flag, p2->flag);
895                 return 0;
896         }
897
898         return (p1->off + p1->count == p2->off);
899 }
900
901 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
902                                    struct brw_page **pga, int opc)
903 {
904         __u32 cksum = ~0;
905         int i = 0;
906
907         LASSERT (pg_count > 0);
908         while (nob > 0 && pg_count > 0) {
909                 char *ptr = cfs_kmap(pga[i]->pg);
910                 int off = pga[i]->off & ~CFS_PAGE_MASK;
911                 int count = pga[i]->count > nob ? nob : pga[i]->count;
912
913                 /* corrupt the data before we compute the checksum, to
914                  * simulate an OST->client data error */
915                 if (i == 0 && opc == OST_READ &&
916                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
917                         memcpy(ptr + off, "bad1", min(4, nob));
918                 cksum = crc32_le(cksum, ptr + off, count);
919                 cfs_kunmap(pga[i]->pg);
920                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
921                                off, cksum);
922
923                 nob -= pga[i]->count;
924                 pg_count--;
925                 i++;
926         }
927         /* For sending we only compute the wrong checksum instead
928          * of corrupting the data so it is still correct on a redo */
929         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
930                 cksum++;
931
932         return cksum;
933 }
934
935 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
936                                 struct lov_stripe_md *lsm, obd_count page_count,
937                                 struct brw_page **pga, 
938                                 struct ptlrpc_request **reqp,
939                                 struct obd_capa *ocapa)
940 {
941         struct ptlrpc_request   *req;
942         struct ptlrpc_bulk_desc *desc;
943         struct ost_body         *body;
944         struct obd_ioobj        *ioobj;
945         struct niobuf_remote    *niobuf;
946         int size[5] = { sizeof(struct ptlrpc_body), sizeof(*body) };
947         int niocount, i, requested_nob, opc, rc;
948         struct ptlrpc_request_pool *pool;
949         struct lustre_capa      *capa;
950         struct osc_brw_async_args *aa;
951
952         ENTRY;
953         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
954                 RETURN(-ENOMEM); /* Recoverable */
955         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
956                 RETURN(-EINVAL); /* Fatal */
957
958         if ((cmd & OBD_BRW_WRITE) != 0) {
959                 opc = OST_WRITE;
960                 pool = cli->cl_import->imp_rq_pool;
961         } else {
962                 opc = OST_READ;
963                 pool = NULL;
964         }
965
966         for (niocount = i = 1; i < page_count; i++) {
967                 if (!can_merge_pages(pga[i - 1], pga[i]))
968                         niocount++;
969         }
970
971         size[REQ_REC_OFF + 1] = sizeof(*ioobj);
972         size[REQ_REC_OFF + 2] = niocount * sizeof(*niobuf);
973         if (ocapa)
974                 size[REQ_REC_OFF + 3] = sizeof(*capa);
975
976         req = ptlrpc_prep_req_pool(cli->cl_import, LUSTRE_OST_VERSION, opc, 5,
977                                    size, NULL, pool, NULL);
978         if (req == NULL)
979                 RETURN (-ENOMEM);
980
981         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
982
983         if (opc == OST_WRITE)
984                 desc = ptlrpc_prep_bulk_imp (req, page_count,
985                                              BULK_GET_SOURCE, OST_BULK_PORTAL);
986         else
987                 desc = ptlrpc_prep_bulk_imp (req, page_count,
988                                              BULK_PUT_SINK, OST_BULK_PORTAL);
989         if (desc == NULL)
990                 GOTO(out, rc = -ENOMEM);
991         /* NB request now owns desc and will free it when it gets freed */
992
993         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
994         ioobj = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioobj));
995         niobuf = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
996                                 niocount * sizeof(*niobuf));
997
998         body->oa = *oa;
999
1000         obdo_to_ioobj(oa, ioobj);
1001         ioobj->ioo_bufcnt = niocount;
1002         if (ocapa) {
1003                 capa = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 3,
1004                                       sizeof(*capa));
1005                 capa_cpy(capa, ocapa);
1006                 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
1007         }
1008
1009         LASSERT (page_count > 0);
1010         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1011                 struct brw_page *pg = pga[i];
1012                 struct brw_page *pg_prev = pga[i - 1];
1013
1014                 LASSERT(pg->count > 0);
1015                 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
1016                          "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
1017                          pg->off, pg->count);
1018 #ifdef __linux__
1019                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1020                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1021                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1022                          i, page_count,
1023                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1024                          pg_prev->pg, page_private(pg_prev->pg),
1025                          pg_prev->pg->index, pg_prev->off);
1026 #else
1027                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1028                          "i %d p_c %u\n", i, page_count);
1029 #endif
1030                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1031                         (pg->flag & OBD_BRW_SRVLOCK));
1032
1033                 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
1034                                       pg->count);
1035                 requested_nob += pg->count;
1036
1037                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1038                         niobuf--;
1039                         niobuf->len += pg->count;
1040                 } else {
1041                         niobuf->offset = pg->off;
1042                         niobuf->len    = pg->count;
1043                         niobuf->flags  = pg->flag;
1044                 }
1045         }
1046
1047         LASSERT((void *)(niobuf - niocount) ==
1048                 lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1049                                niocount * sizeof(*niobuf)));
1050         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1051
1052         /* size[REQ_REC_OFF] still sizeof (*body) */
1053         if (opc == OST_WRITE) {
1054                 if (unlikely(cli->cl_checksum)) {
1055                         body->oa.o_valid |= OBD_MD_FLCKSUM;
1056                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1057                                                              page_count, pga,
1058                                                              OST_WRITE);
1059                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1060                                body->oa.o_cksum);
1061                         /* save this in 'oa', too, for later checking */
1062                         oa->o_valid |= OBD_MD_FLCKSUM;
1063                 } else {
1064                         /* clear out the checksum flag, in case this is a
1065                          * resend but cl_checksum is no longer set. b=11238 */
1066                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1067                 }
1068                 oa->o_cksum = body->oa.o_cksum;
1069                 /* 1 RC per niobuf */
1070                 size[REPLY_REC_OFF + 1] = sizeof(__u32) * niocount;
1071                 ptlrpc_req_set_repsize(req, 3, size);
1072         } else {
1073                 if (unlikely(cli->cl_checksum))
1074                         body->oa.o_valid |= OBD_MD_FLCKSUM;
1075                 /* 1 RC for the whole I/O */
1076                 ptlrpc_req_set_repsize(req, 2, size);
1077         }
1078
1079         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1080         aa = (struct osc_brw_async_args *)&req->rq_async_args;
1081         aa->aa_oa = oa;
1082         aa->aa_requested_nob = requested_nob;
1083         aa->aa_nio_count = niocount;
1084         aa->aa_page_count = page_count;
1085         aa->aa_resends = 0;
1086         aa->aa_ppga = pga;
1087         aa->aa_cli = cli;
1088         INIT_LIST_HEAD(&aa->aa_oaps);
1089
1090         *reqp = req;
1091         RETURN (0);
1092
1093  out:
1094         ptlrpc_req_finished (req);
1095         RETURN (rc);
1096 }
1097
1098 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1099                                 __u32 client_cksum, __u32 server_cksum,
1100                                 int nob, obd_count page_count,
1101                                 struct brw_page **pga)
1102 {
1103         __u32 new_cksum;
1104         char *msg;
1105
1106         if (server_cksum == client_cksum) {
1107                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1108                 return 0;
1109         }
1110
1111         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE);
1112
1113         if (new_cksum == server_cksum)
1114                 msg = "changed on the client after we checksummed it - "
1115                       "likely false positive due to mmap IO (bug 11742)";
1116         else if (new_cksum == client_cksum)
1117                 msg = "changed in transit before arrival at OST";
1118         else
1119                 msg = "changed in transit AND doesn't match the original - "
1120                       "likely false positive due to mmap IO (bug 11742)";
1121
1122         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1123                            LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1124                            "["LPU64"-"LPU64"]\n",
1125                            msg, libcfs_nid2str(peer->nid),
1126                            oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1127                            oa->o_valid & OBD_MD_FLFID ? oa->o_generation : 
1128                                                         (__u64)0,
1129                            oa->o_id,
1130                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1131                            pga[0]->off,
1132                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1133         CERROR("original client csum %x, server csum %x, client csum now %x\n",
1134                client_cksum, server_cksum, new_cksum);
1135         return 1;        
1136 }
1137
1138 /* Note rc enters this function as number of bytes transferred */
1139 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1140 {
1141         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1142         const lnet_process_id_t *peer =
1143                         &req->rq_import->imp_connection->c_peer;
1144         struct client_obd *cli = aa->aa_cli;
1145         struct ost_body *body;
1146         __u32 client_cksum = 0;
1147         ENTRY;
1148
1149         if (rc < 0 && rc != -EDQUOT)
1150                 RETURN(rc);
1151
1152         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1153         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1154                                   lustre_swab_ost_body);
1155         if (body == NULL) {
1156                 CERROR ("Can't unpack body\n");
1157                 RETURN(-EPROTO);
1158         }
1159
1160         /* set/clear over quota flag for a uid/gid */
1161         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1162             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1163                 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1164                              body->oa.o_gid, body->oa.o_valid,
1165                              body->oa.o_flags);
1166
1167         if (rc < 0)
1168                 RETURN(rc);
1169
1170         if (unlikely(aa->aa_oa->o_valid & OBD_MD_FLCKSUM))
1171                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1172
1173         osc_update_grant(cli, body);
1174
1175         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1176                 if (rc > 0) {
1177                         CERROR ("Unexpected +ve rc %d\n", rc);
1178                         RETURN(-EPROTO);
1179                 }
1180                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1181
1182                 if (unlikely((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) &&
1183                              client_cksum &&
1184                              check_write_checksum(&body->oa, peer, client_cksum,
1185                                                   body->oa.o_cksum,
1186                                                   aa->aa_requested_nob,
1187                                                   aa->aa_page_count,
1188                                                   aa->aa_ppga)))
1189                         RETURN(-EAGAIN);
1190
1191                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1192                         RETURN(-EAGAIN);
1193
1194                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1195                                      aa->aa_page_count, aa->aa_ppga);
1196                 GOTO(out, rc);
1197         }
1198
1199         /* The rest of this function executes only for OST_READs */
1200         if (rc > aa->aa_requested_nob) {
1201                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1202                        aa->aa_requested_nob);
1203                 RETURN(-EPROTO);
1204         }
1205
1206         if (rc != req->rq_bulk->bd_nob_transferred) {
1207                 CERROR ("Unexpected rc %d (%d transferred)\n",
1208                         rc, req->rq_bulk->bd_nob_transferred);
1209                 return (-EPROTO);
1210         }
1211
1212         if (rc < aa->aa_requested_nob)
1213                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1214
1215         if (sptlrpc_cli_unwrap_bulk_read(req, rc, aa->aa_page_count,
1216                                          aa->aa_ppga))
1217                 GOTO(out, rc = -EAGAIN);
1218
1219         if (unlikely(body->oa.o_valid & OBD_MD_FLCKSUM)) {
1220                 static int cksum_counter;
1221                 __u32      server_cksum = body->oa.o_cksum;
1222                 char      *via;
1223                 char      *router;
1224
1225                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1226                                                  aa->aa_ppga, OST_READ);
1227
1228                 if (peer->nid == req->rq_bulk->bd_sender) {
1229                         via = router = "";
1230                 } else {
1231                         via = " via ";
1232                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1233                 }
1234
1235                 if (server_cksum == ~0 && rc > 0) {
1236                         CERROR("Protocol error: server %s set the 'checksum' "
1237                                "bit, but didn't send a checksum.  Not fatal, "
1238                                "but please tell CFS.\n",
1239                                libcfs_nid2str(peer->nid));
1240                 } else if (server_cksum != client_cksum) {
1241                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1242                                            "%s%s%s inum "LPU64"/"LPU64" object "
1243                                            LPU64"/"LPU64" extent "
1244                                            "["LPU64"-"LPU64"]\n",
1245                                            req->rq_import->imp_obd->obd_name,
1246                                            libcfs_nid2str(peer->nid),
1247                                            via, router,
1248                                            body->oa.o_valid & OBD_MD_FLFID ?
1249                                                 body->oa.o_fid : (__u64)0,
1250                                            body->oa.o_valid & OBD_MD_FLFID ?
1251                                                 body->oa.o_generation :(__u64)0,
1252                                            body->oa.o_id,
1253                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1254                                                 body->oa.o_gr : (__u64)0,
1255                                            aa->aa_ppga[0]->off,
1256                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1257                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1258                                                                         1);
1259                         CERROR("client %x, server %x\n",
1260                                client_cksum, server_cksum);
1261                         cksum_counter = 0;
1262                         aa->aa_oa->o_cksum = client_cksum;
1263                         rc = -EAGAIN;
1264                 } else {
1265                         cksum_counter++;
1266                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1267                         rc = 0;
1268                 }
1269         } else if (unlikely(client_cksum)) {
1270                 static int cksum_missed;
1271
1272                 cksum_missed++;
1273                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1274                         CERROR("Checksum %u requested from %s but not sent\n",
1275                                cksum_missed, libcfs_nid2str(peer->nid));
1276         } else {
1277                 rc = 0;
1278         }
1279 out:
1280         if (rc >= 0)
1281                 *aa->aa_oa = body->oa;
1282
1283         RETURN(rc);
1284 }
1285
1286 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1287                             struct lov_stripe_md *lsm,
1288                             obd_count page_count, struct brw_page **pga,
1289                             struct obd_capa *ocapa)
1290 {
1291         struct ptlrpc_request *req;
1292         int                    rc;
1293         cfs_waitq_t            waitq;
1294         int                    resends = 0;
1295         struct l_wait_info     lwi;
1296
1297         ENTRY;
1298
1299         cfs_waitq_init(&waitq);
1300
1301 restart_bulk:
1302         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1303                                   page_count, pga, &req, ocapa);
1304         if (rc != 0)
1305                 return (rc);
1306
1307         rc = ptlrpc_queue_wait(req);
1308
1309         if (rc == -ETIMEDOUT && req->rq_resend) {
1310                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1311                 ptlrpc_req_finished(req);
1312                 goto restart_bulk;
1313         }
1314
1315         rc = osc_brw_fini_request(req, rc);
1316
1317         ptlrpc_req_finished(req);
1318         if (osc_recoverable_error(rc)) {
1319                 resends++;
1320                 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1321                         CERROR("too many resend retries, returning error\n");
1322                         RETURN(-EIO);
1323                 }
1324
1325                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1326                 l_wait_event(waitq, 0, &lwi);
1327
1328                 goto restart_bulk;
1329         }
1330         
1331         RETURN (rc);
1332 }
1333
1334 int osc_brw_redo_request(struct ptlrpc_request *request,
1335                          struct osc_brw_async_args *aa)
1336 {
1337         struct ptlrpc_request *new_req;
1338         struct ptlrpc_request_set *set = request->rq_set;
1339         struct osc_brw_async_args *new_aa;
1340         struct osc_async_page *oap;
1341         int rc = 0;
1342         ENTRY;
1343
1344         if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1345                 CERROR("too many resend retries, returning error\n");
1346                 RETURN(-EIO);
1347         }
1348         
1349         DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1350 /*
1351         body = lustre_msg_buf(request->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
1352         if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
1353                 ocapa = lustre_unpack_capa(request->rq_reqmsg,
1354                                            REQ_REC_OFF + 3);
1355 */
1356         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1357                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1358                                   aa->aa_cli, aa->aa_oa,
1359                                   NULL /* lsm unused by osc currently */,
1360                                   aa->aa_page_count, aa->aa_ppga, 
1361                                   &new_req, NULL /* ocapa */);
1362         if (rc)
1363                 RETURN(rc);
1364
1365         client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1366    
1367         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1368                 if (oap->oap_request != NULL) {
1369                         LASSERTF(request == oap->oap_request,
1370                                  "request %p != oap_request %p\n",
1371                                  request, oap->oap_request);
1372                         if (oap->oap_interrupted) {
1373                                 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1374                                 ptlrpc_req_finished(new_req);                        
1375                                 RETURN(-EINTR);
1376                         }
1377                 }
1378         }
1379         /* New request takes over pga and oaps from old request.
1380          * Note that copying a list_head doesn't work, need to move it... */
1381         aa->aa_resends++;
1382         new_req->rq_interpret_reply = request->rq_interpret_reply;
1383         new_req->rq_async_args = request->rq_async_args;
1384         new_req->rq_sent = CURRENT_SECONDS + aa->aa_resends;
1385
1386         new_aa = (struct osc_brw_async_args *)&new_req->rq_async_args;
1387
1388         INIT_LIST_HEAD(&new_aa->aa_oaps);
1389         list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1390         INIT_LIST_HEAD(&aa->aa_oaps);
1391
1392         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1393                 if (oap->oap_request) {
1394                         ptlrpc_req_finished(oap->oap_request);
1395                         oap->oap_request = ptlrpc_request_addref(new_req);
1396                 }
1397         }
1398
1399         /* use ptlrpc_set_add_req is safe because interpret functions work 
1400          * in check_set context. only one way exist with access to request 
1401          * from different thread got -EINTR - this way protected with 
1402          * cl_loi_list_lock */
1403         ptlrpc_set_add_req(set, new_req);
1404
1405         client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1406
1407         DEBUG_REQ(D_INFO, new_req, "new request");
1408         RETURN(0);
1409 }
1410
1411 static int brw_interpret(struct ptlrpc_request *req, void *data, int rc)
1412 {
1413         struct osc_brw_async_args *aa = data;
1414         int                        i;
1415         int                        nob = rc;
1416         ENTRY;
1417
1418         rc = osc_brw_fini_request(req, rc);
1419         if (osc_recoverable_error(rc)) {
1420                 rc = osc_brw_redo_request(req, aa);
1421                 if (rc == 0)
1422                         RETURN(0);
1423         }
1424         if ((rc >= 0) && req->rq_set && req->rq_set->set_countp)
1425                 atomic_add(nob, (atomic_t *)req->rq_set->set_countp);
1426
1427         client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1428         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1429                 aa->aa_cli->cl_w_in_flight--;
1430         else
1431                 aa->aa_cli->cl_r_in_flight--;
1432         for (i = 0; i < aa->aa_page_count; i++)
1433                 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
1434         client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1435
1436         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1437
1438         RETURN(rc);
1439 }
1440
1441 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1442                           struct lov_stripe_md *lsm, obd_count page_count,
1443                           struct brw_page **pga, struct ptlrpc_request_set *set,
1444                           struct obd_capa *ocapa)
1445 {
1446         struct ptlrpc_request     *req;
1447         struct client_obd         *cli = &exp->exp_obd->u.cli;
1448         int                        rc, i;
1449         struct osc_brw_async_args *aa;
1450         ENTRY;
1451
1452         /* Consume write credits even if doing a sync write -
1453          * otherwise we may run out of space on OST due to grant. */
1454         if (cmd == OBD_BRW_WRITE) {
1455                 spin_lock(&cli->cl_loi_list_lock);
1456                 for (i = 0; i < page_count; i++) {
1457                         if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
1458                                 osc_consume_write_grant(cli, pga[i]);
1459                 }
1460                 spin_unlock(&cli->cl_loi_list_lock);
1461         }
1462
1463         rc = osc_brw_prep_request(cmd, cli, oa, lsm, page_count, pga,
1464                                   &req, ocapa);
1465
1466         aa = (struct osc_brw_async_args *)&req->rq_async_args;
1467         if (cmd == OBD_BRW_READ) {
1468                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1469                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1470                 ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
1471         } else {
1472                  lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1473                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
1474                                  cli->cl_w_in_flight);
1475                 ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
1476         }
1477
1478         if (rc == 0) {
1479                 req->rq_interpret_reply = brw_interpret;
1480                 ptlrpc_set_add_req(set, req);
1481                 client_obd_list_lock(&cli->cl_loi_list_lock);
1482                 if (cmd == OBD_BRW_READ)
1483                         cli->cl_r_in_flight++;
1484                 else
1485                         cli->cl_w_in_flight++;
1486                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1487         } else if (cmd == OBD_BRW_WRITE) {
1488                 client_obd_list_lock(&cli->cl_loi_list_lock);
1489                 for (i = 0; i < page_count; i++)
1490                         osc_release_write_grant(cli, pga[i], 0);
1491                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1492         }
1493         RETURN (rc);
1494 }
1495
1496 /*
1497  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1498  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1499  * fine for our small page arrays and doesn't require allocation.  its an
1500  * insertion sort that swaps elements that are strides apart, shrinking the
1501  * stride down until its '1' and the array is sorted.
1502  */
1503 static void sort_brw_pages(struct brw_page **array, int num)
1504 {
1505         int stride, i, j;
1506         struct brw_page *tmp;
1507
1508         if (num == 1)
1509                 return;
1510         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1511                 ;
1512
1513         do {
1514                 stride /= 3;
1515                 for (i = stride ; i < num ; i++) {
1516                         tmp = array[i];
1517                         j = i;
1518                         while (j >= stride && array[j - stride]->off > tmp->off) {
1519                                 array[j] = array[j - stride];
1520                                 j -= stride;
1521                         }
1522                         array[j] = tmp;
1523                 }
1524         } while (stride > 1);
1525 }
1526
1527 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1528 {
1529         int count = 1;
1530         int offset;
1531         int i = 0;
1532
1533         LASSERT (pages > 0);
1534         offset = pg[i]->off & ~CFS_PAGE_MASK;
1535
1536         for (;;) {
1537                 pages--;
1538                 if (pages == 0)         /* that's all */
1539                         return count;
1540
1541                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1542                         return count;   /* doesn't end on page boundary */
1543
1544                 i++;
1545                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1546                 if (offset != 0)        /* doesn't start on page boundary */
1547                         return count;
1548
1549                 count++;
1550         }
1551 }
1552
1553 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1554 {
1555         struct brw_page **ppga;
1556         int i;
1557
1558         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1559         if (ppga == NULL)
1560                 return NULL;
1561
1562         for (i = 0; i < count; i++)
1563                 ppga[i] = pga + i;
1564         return ppga;
1565 }
1566
1567 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1568 {
1569         LASSERT(ppga != NULL);
1570         OBD_FREE(ppga, sizeof(*ppga) * count);
1571 }
1572
1573 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1574                    obd_count page_count, struct brw_page *pga,
1575                    struct obd_trans_info *oti)
1576 {
1577         struct obdo *saved_oa = NULL;
1578         struct brw_page **ppga, **orig;
1579         struct obd_import *imp = class_exp2cliimp(exp);
1580         struct client_obd *cli = &imp->imp_obd->u.cli;
1581         int rc, page_count_orig;
1582         ENTRY;
1583
1584         if (cmd & OBD_BRW_CHECK) {
1585                 /* The caller just wants to know if there's a chance that this
1586                  * I/O can succeed */
1587
1588                 if (imp == NULL || imp->imp_invalid)
1589                         RETURN(-EIO);
1590                 RETURN(0);
1591         }
1592
1593         /* test_brw with a failed create can trip this, maybe others. */
1594         LASSERT(cli->cl_max_pages_per_rpc);
1595
1596         rc = 0;
1597
1598         orig = ppga = osc_build_ppga(pga, page_count);
1599         if (ppga == NULL)
1600                 RETURN(-ENOMEM);
1601         page_count_orig = page_count;
1602
1603         sort_brw_pages(ppga, page_count);
1604         while (page_count) {
1605                 obd_count pages_per_brw;
1606
1607                 if (page_count > cli->cl_max_pages_per_rpc)
1608                         pages_per_brw = cli->cl_max_pages_per_rpc;
1609                 else
1610                         pages_per_brw = page_count;
1611
1612                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1613
1614                 if (saved_oa != NULL) {
1615                         /* restore previously saved oa */
1616                         *oinfo->oi_oa = *saved_oa;
1617                 } else if (page_count > pages_per_brw) {
1618                         /* save a copy of oa (brw will clobber it) */
1619                         OBDO_ALLOC(saved_oa);
1620                         if (saved_oa == NULL)
1621                                 GOTO(out, rc = -ENOMEM);
1622                         *saved_oa = *oinfo->oi_oa;
1623                 }
1624
1625                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1626                                       pages_per_brw, ppga, oinfo->oi_capa);
1627
1628                 if (rc != 0)
1629                         break;
1630
1631                 page_count -= pages_per_brw;
1632                 ppga += pages_per_brw;
1633         }
1634
1635 out:
1636         osc_release_ppga(orig, page_count_orig);
1637
1638         if (saved_oa != NULL)
1639                 OBDO_FREE(saved_oa);
1640
1641         RETURN(rc);
1642 }
1643
1644 static int osc_brw_async(int cmd, struct obd_export *exp,
1645                          struct obd_info *oinfo, obd_count page_count,
1646                          struct brw_page *pga, struct obd_trans_info *oti,
1647                          struct ptlrpc_request_set *set)
1648 {
1649         struct brw_page **ppga, **orig;
1650         struct client_obd *cli = &exp->exp_obd->u.cli;
1651         int page_count_orig;
1652         int rc = 0;
1653         ENTRY;
1654
1655         if (cmd & OBD_BRW_CHECK) {
1656                 struct obd_import *imp = class_exp2cliimp(exp);
1657                 /* The caller just wants to know if there's a chance that this
1658                  * I/O can succeed */
1659
1660                 if (imp == NULL || imp->imp_invalid)
1661                         RETURN(-EIO);
1662                 RETURN(0);
1663         }
1664
1665         orig = ppga = osc_build_ppga(pga, page_count);
1666         if (ppga == NULL)
1667                 RETURN(-ENOMEM);
1668         page_count_orig = page_count;
1669
1670         sort_brw_pages(ppga, page_count);
1671         while (page_count) {
1672                 struct brw_page **copy;
1673                 obd_count pages_per_brw;
1674
1675                 pages_per_brw = min_t(obd_count, page_count,
1676                                       cli->cl_max_pages_per_rpc);
1677
1678                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1679
1680                 /* use ppga only if single RPC is going to fly */
1681                 if (pages_per_brw != page_count_orig || ppga != orig) {
1682                         OBD_ALLOC(copy, sizeof(*copy) * pages_per_brw);
1683                         if (copy == NULL)
1684                                 GOTO(out, rc = -ENOMEM);
1685                         memcpy(copy, ppga, sizeof(*copy) * pages_per_brw);
1686                 } else
1687                         copy = ppga;
1688
1689                 rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1690                                     pages_per_brw, copy, set, oinfo->oi_capa);
1691
1692                 if (rc != 0) {
1693                         if (copy != ppga)
1694                                 OBD_FREE(copy, sizeof(*copy) * pages_per_brw);
1695                         break;
1696                 }
1697                 if (copy == orig) {
1698                         /* we passed it to async_internal() which is
1699                          * now responsible for releasing memory */
1700                         orig = NULL;
1701                 }
1702
1703                 page_count -= pages_per_brw;
1704                 ppga += pages_per_brw;
1705         }
1706 out:
1707         if (orig)
1708                 osc_release_ppga(orig, page_count_orig);
1709         RETURN(rc);
1710 }
1711
1712 static void osc_check_rpcs(struct client_obd *cli);
1713
1714 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1715  * the dirty accounting.  Writeback completes or truncate happens before
1716  * writing starts.  Must be called with the loi lock held. */
1717 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1718                            int sent)
1719 {
1720         osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1721 }
1722
1723
1724 /* This maintains the lists of pending pages to read/write for a given object
1725  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1726  * to quickly find objects that are ready to send an RPC. */
1727 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1728                          int cmd)
1729 {
1730         int optimal;
1731         ENTRY;
1732
1733         if (lop->lop_num_pending == 0)
1734                 RETURN(0);
1735
1736         /* if we have an invalid import we want to drain the queued pages
1737          * by forcing them through rpcs that immediately fail and complete
1738          * the pages.  recovery relies on this to empty the queued pages
1739          * before canceling the locks and evicting down the llite pages */
1740         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1741                 RETURN(1);
1742
1743         /* stream rpcs in queue order as long as as there is an urgent page
1744          * queued.  this is our cheap solution for good batching in the case
1745          * where writepage marks some random page in the middle of the file
1746          * as urgent because of, say, memory pressure */
1747         if (!list_empty(&lop->lop_urgent)) {
1748                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1749                 RETURN(1);
1750         }
1751         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1752         optimal = cli->cl_max_pages_per_rpc;
1753         if (cmd & OBD_BRW_WRITE) {
1754                 /* trigger a write rpc stream as long as there are dirtiers
1755                  * waiting for space.  as they're waiting, they're not going to
1756                  * create more pages to coallesce with what's waiting.. */
1757                 if (!list_empty(&cli->cl_cache_waiters)) {
1758                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1759                         RETURN(1);
1760                 }
1761                 /* +16 to avoid triggering rpcs that would want to include pages
1762                  * that are being queued but which can't be made ready until
1763                  * the queuer finishes with the page. this is a wart for
1764                  * llite::commit_write() */
1765                 optimal += 16;
1766         }
1767         if (lop->lop_num_pending >= optimal)
1768                 RETURN(1);
1769
1770         RETURN(0);
1771 }
1772
/* Make @item's membership of @list match @should_be_on: append it if it is
 * unlinked but should be listed, unlink it if it is linked but should not be.
 * An empty @item is taken to mean "not on any list". */
static void on_list(struct list_head *item, struct list_head *list,
                    int should_be_on)
{
        int linked = !list_empty(item);

        if (should_be_on && !linked)
                list_add_tail(item, list);
        else if (!should_be_on && linked)
                list_del_init(item);
}
1781
1782 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1783  * can find pages to build into rpcs quickly */
1784 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1785 {
1786         on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
1787                 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
1788                 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1789
1790         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1791                 loi->loi_write_lop.lop_num_pending);
1792
1793         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1794                 loi->loi_read_lop.lop_num_pending);
1795 }
1796
1797 static void lop_update_pending(struct client_obd *cli,
1798                                struct loi_oap_pages *lop, int cmd, int delta)
1799 {
1800         lop->lop_num_pending += delta;
1801         if (cmd & OBD_BRW_WRITE)
1802                 cli->cl_pending_w_pages += delta;
1803         else
1804                 cli->cl_pending_r_pages += delta;
1805 }
1806
1807 /* this is called when a sync waiter receives an interruption.  Its job is to
1808  * get the caller woken as soon as possible.  If its page hasn't been put in an
1809  * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
1810  * desiring interruption which will forcefully complete the rpc once the rpc
1811  * has timed out */
1812 static void osc_occ_interrupted(struct oig_callback_context *occ)
1813 {
1814         struct osc_async_page *oap;
1815         struct loi_oap_pages *lop;
1816         struct lov_oinfo *loi;
1817         ENTRY;
1818
1819         /* XXX member_of() */
1820         oap = list_entry(occ, struct osc_async_page, oap_occ);
1821
1822         client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);
1823
1824         oap->oap_interrupted = 1;
1825
1826         /* ok, it's been put in an rpc. only one oap gets a request reference */
1827         if (oap->oap_request != NULL) {
1828                 ptlrpc_mark_interrupted(oap->oap_request);
1829                 ptlrpcd_wake(oap->oap_request);
1830                 GOTO(unlock, 0);
1831         }
1832
1833         /* we don't get interruption callbacks until osc_trigger_group_io()
1834          * has been called and put the sync oaps in the pending/urgent lists.*/
1835         if (!list_empty(&oap->oap_pending_item)) {
1836                 list_del_init(&oap->oap_pending_item);
1837                 list_del_init(&oap->oap_urgent_item);
1838
1839                 loi = oap->oap_loi;
1840                 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
1841                         &loi->loi_write_lop : &loi->loi_read_lop;
1842                 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
1843                 loi_list_maint(oap->oap_cli, oap->oap_loi);
1844
1845                 oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
1846                 oap->oap_oig = NULL;
1847         }
1848
1849 unlock:
1850         client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
1851 }
1852
1853 /* this is trying to propogate async writeback errors back up to the
1854  * application.  As an async write fails we record the error code for later if
1855  * the app does an fsync.  As long as errors persist we force future rpcs to be
1856  * sync so that the app can get a sync error and break the cycle of queueing
1857  * pages for which writeback will fail. */
1858 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
1859                            int rc)
1860 {
1861         if (rc) {
1862                 if (!ar->ar_rc)
1863                         ar->ar_rc = rc;
1864
1865                 ar->ar_force_sync = 1;
1866                 ar->ar_min_xid = ptlrpc_sample_next_xid();
1867                 return;
1868
1869         }
1870
1871         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
1872                 ar->ar_force_sync = 0;
1873 }
1874
1875 static void osc_oap_to_pending(struct osc_async_page *oap)
1876 {
1877         struct loi_oap_pages *lop;
1878
1879         if (oap->oap_cmd & OBD_BRW_WRITE)
1880                 lop = &oap->oap_loi->loi_write_lop;
1881         else
1882                 lop = &oap->oap_loi->loi_read_lop;
1883
1884         if (oap->oap_async_flags & ASYNC_URGENT)
1885                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1886         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1887         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
1888 }
1889
1890 /* this must be called holding the loi list lock to give coverage to exit_cache,
1891  * async_flag maintenance, and oap_request */
1892 static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
1893                               struct osc_async_page *oap, int sent, int rc)
1894 {
1895         __u64 xid = 0;
1896
1897         ENTRY;
1898         if (oap->oap_request != NULL) {
1899                 xid = ptlrpc_req_xid(oap->oap_request);
1900                 ptlrpc_req_finished(oap->oap_request);
1901                 oap->oap_request = NULL;
1902         }
1903
1904         oap->oap_async_flags = 0;
1905         oap->oap_interrupted = 0;
1906
1907         if (oap->oap_cmd & OBD_BRW_WRITE) {
1908                 osc_process_ar(&cli->cl_ar, xid, rc);
1909                 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
1910         }
1911
1912         if (rc == 0 && oa != NULL) {
1913                 if (oa->o_valid & OBD_MD_FLBLOCKS)
1914                         oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
1915                 if (oa->o_valid & OBD_MD_FLMTIME)
1916                         oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
1917                 if (oa->o_valid & OBD_MD_FLATIME)
1918                         oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
1919                 if (oa->o_valid & OBD_MD_FLCTIME)
1920                         oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
1921         }
1922
1923         if (oap->oap_oig) {
1924                 osc_exit_cache(cli, oap, sent);
1925                 oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
1926                 oap->oap_oig = NULL;
1927                 EXIT;
1928                 return;
1929         }
1930
1931         rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
1932                                                 oap->oap_cmd, oa, rc);
1933
1934         /* ll_ap_completion (from llite) drops PG_locked. so, a new
1935          * I/O on the page could start, but OSC calls it under lock
1936          * and thus we can add oap back to pending safely */
1937         if (rc)
1938                 /* upper layer wants to leave the page on pending queue */
1939                 osc_oap_to_pending(oap);
1940         else
1941                 osc_exit_cache(cli, oap, sent);
1942         EXIT;
1943 }
1944
1945 static int brw_interpret_oap(struct ptlrpc_request *req, void *data, int rc)
1946 {
1947         struct osc_async_page *oap, *tmp;
1948         struct osc_brw_async_args *aa = data;
1949         struct client_obd *cli;
1950         ENTRY;
1951
1952         rc = osc_brw_fini_request(req, rc);
1953         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1954         if (osc_recoverable_error(rc)) {
1955                 rc = osc_brw_redo_request(req, aa);
1956                 if (rc == 0)
1957                         RETURN(0);
1958         }
1959
1960         cli = aa->aa_cli;
1961
1962         client_obd_list_lock(&cli->cl_loi_list_lock);
1963
1964         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1965          * is called so we know whether to go to sync BRWs or wait for more
1966          * RPCs to complete */
1967         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1968                 cli->cl_w_in_flight--;
1969         else
1970                 cli->cl_r_in_flight--;
1971
1972         /* the caller may re-use the oap after the completion call so
1973          * we need to clean it up a little */
1974         list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
1975                 list_del_init(&oap->oap_rpc_item);
1976                 osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
1977         }
1978
1979         osc_wake_cache_waiters(cli);
1980         osc_check_rpcs(cli);
1981
1982         client_obd_list_unlock(&cli->cl_loi_list_lock);
1983
1984         OBDO_FREE(aa->aa_oa);
1985         
1986         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1987         RETURN(rc);
1988 }
1989
1990 static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
1991                                             struct list_head *rpc_list,
1992                                             int page_count, int cmd)
1993 {
1994         struct ptlrpc_request *req;
1995         struct brw_page **pga = NULL;
1996         struct osc_brw_async_args *aa;
1997         struct obdo *oa = NULL;
1998         struct obd_async_page_ops *ops = NULL;
1999         void *caller_data = NULL;
2000         struct obd_capa *ocapa;
2001         struct osc_async_page *oap;
2002         int i, rc;
2003
2004         ENTRY;
2005         LASSERT(!list_empty(rpc_list));
2006
2007         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2008         if (pga == NULL)
2009                 RETURN(ERR_PTR(-ENOMEM));
2010
2011         OBDO_ALLOC(oa);
2012         if (oa == NULL)
2013                 GOTO(out, req = ERR_PTR(-ENOMEM));
2014
2015         i = 0;
2016         list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2017                 if (ops == NULL) {
2018                         ops = oap->oap_caller_ops;
2019                         caller_data = oap->oap_caller_data;
2020                 }
2021                 pga[i] = &oap->oap_brw_page;
2022                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2023                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2024                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2025                 i++;
2026         }
2027
2028         /* always get the data for the obdo for the rpc */
2029         LASSERT(ops != NULL);
2030         ops->ap_fill_obdo(caller_data, cmd, oa);
2031         ocapa = ops->ap_lookup_capa(caller_data, cmd);
2032
2033         sort_brw_pages(pga, page_count);
2034         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2035                                   pga, &req, ocapa);
2036         capa_put(ocapa);
2037         if (rc != 0) {
2038                 CERROR("prep_req failed: %d\n", rc);
2039                 GOTO(out, req = ERR_PTR(rc));
2040         }
2041
2042         /* Need to update the timestamps after the request is built in case
2043          * we race with setattr (locally or in queue at OST).  If OST gets
2044          * later setattr before earlier BRW (as determined by the request xid),
2045          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2046          * way to do this in a single call.  bug 10150 */
2047         ops->ap_update_obdo(caller_data, cmd, oa,
2048                             OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);
2049
2050         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2051         aa = (struct osc_brw_async_args *)&req->rq_async_args;
2052         INIT_LIST_HEAD(&aa->aa_oaps);
2053         list_splice(rpc_list, &aa->aa_oaps);
2054         INIT_LIST_HEAD(rpc_list);
2055
2056 out:
2057         if (IS_ERR(req)) {
2058                 if (oa)
2059                         OBDO_FREE(oa);
2060                 if (pga)
2061                         OBD_FREE(pga, sizeof(*pga) * page_count);
2062         }
2063         RETURN(req);
2064 }
2065
2066 /* the loi lock is held across this function but it's allowed to release
2067  * and reacquire it during its work */
2068 static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
2069                             int cmd, struct loi_oap_pages *lop)
2070 {
2071         struct ptlrpc_request *req;
2072         obd_count page_count = 0;
2073         struct osc_async_page *oap = NULL, *tmp;
2074         struct osc_brw_async_args *aa;
2075         struct obd_async_page_ops *ops;
2076         CFS_LIST_HEAD(rpc_list);
2077         unsigned int ending_offset;
2078         unsigned  starting_offset = 0;
2079         ENTRY;
2080
2081         /* first we find the pages we're allowed to work with */
2082         list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2083                                  oap_pending_item) {
2084                 ops = oap->oap_caller_ops;
2085
2086                 LASSERT(oap->oap_magic == OAP_MAGIC);
2087
2088                 /* in llite being 'ready' equates to the page being locked
2089                  * until completion unlocks it.  commit_write submits a page
2090                  * as not ready because its unlock will happen unconditionally
2091                  * as the call returns.  if we race with commit_write giving
2092                  * us that page we dont' want to create a hole in the page
2093                  * stream, so we stop and leave the rpc to be fired by
2094                  * another dirtier or kupdated interval (the not ready page
2095                  * will still be on the dirty list).  we could call in
2096                  * at the end of ll_file_write to process the queue again. */
2097                 if (!(oap->oap_async_flags & ASYNC_READY)) {
2098                         int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
2099                         if (rc < 0)
2100                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
2101                                                 "instead of ready\n", oap,
2102                                                 oap->oap_page, rc);
2103                         switch (rc) {
2104                         case -EAGAIN:
2105                                 /* llite is telling us that the page is still
2106                                  * in commit_write and that we should try
2107                                  * and put it in an rpc again later.  we
2108                                  * break out of the loop so we don't create
2109                                  * a hole in the sequence of pages in the rpc
2110                                  * stream.*/
2111                                 oap = NULL;
2112                                 break;
2113                         case -EINTR:
2114                                 /* the io isn't needed.. tell the checks
2115                                  * below to complete the rpc with EINTR */
2116                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2117                                 oap->oap_count = -EINTR;
2118                                 break;
2119                         case 0:
2120                                 oap->oap_async_flags |= ASYNC_READY;
2121                                 break;
2122                         default:
2123                                 LASSERTF(0, "oap %p page %p returned %d "
2124                                             "from make_ready\n", oap,
2125                                             oap->oap_page, rc);
2126                                 break;
2127                         }
2128                 }
2129                 if (oap == NULL)
2130                         break;
2131                 /*
2132                  * Page submitted for IO has to be locked. Either by
2133                  * ->ap_make_ready() or by higher layers.
2134                  *
2135                  * XXX nikita: this assertion should be adjusted when lustre
2136                  * starts using PG_writeback for pages being written out.
2137                  */
2138 #if defined(__KERNEL__) && defined(__linux__)
2139                 LASSERT(PageLocked(oap->oap_page));
2140 #endif
2141                 /* If there is a gap at the start of this page, it can't merge
2142                  * with any previous page, so we'll hand the network a
2143                  * "fragmented" page array that it can't transfer in 1 RDMA */
2144                 if (page_count != 0 && oap->oap_page_off != 0)
2145                         break;
2146
2147                 /* take the page out of our book-keeping */
2148                 list_del_init(&oap->oap_pending_item);
2149                 lop_update_pending(cli, lop, cmd, -1);
2150                 list_del_init(&oap->oap_urgent_item);
2151
2152                 if (page_count == 0)
2153                         starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2154                                           (PTLRPC_MAX_BRW_SIZE - 1);
2155
2156                 /* ask the caller for the size of the io as the rpc leaves. */
2157                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
2158                         oap->oap_count =
2159                                 ops->ap_refresh_count(oap->oap_caller_data,cmd);
2160                 if (oap->oap_count <= 0) {
2161                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2162                                oap->oap_count);
2163                         osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
2164                         continue;
2165                 }
2166
2167                 /* now put the page back in our accounting */
2168                 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2169                 if (++page_count >= cli->cl_max_pages_per_rpc)
2170                         break;
2171
2172                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
2173                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2174                  * have the same alignment as the initial writes that allocated
2175                  * extents on the server. */
2176                 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2177                                  oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2178                 if (ending_offset == 0)
2179                         break;
2180
2181                 /* If there is a gap at the end of this page, it can't merge
2182                  * with any subsequent pages, so we'll hand the network a
2183                  * "fragmented" page array that it can't transfer in 1 RDMA */
2184                 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2185                         break;
2186         }
2187
2188         osc_wake_cache_waiters(cli);
2189
2190         if (page_count == 0)
2191                 RETURN(0);
2192
2193         loi_list_maint(cli, loi);
2194
2195         client_obd_list_unlock(&cli->cl_loi_list_lock);
2196
2197         req = osc_build_req(cli, &rpc_list, page_count, cmd);
2198         if (IS_ERR(req)) {
2199                 /* this should happen rarely and is pretty bad, it makes the
2200                  * pending list not follow the dirty order */
2201                 client_obd_list_lock(&cli->cl_loi_list_lock);
2202                 list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
2203                         list_del_init(&oap->oap_rpc_item);
2204
2205                         /* queued sync pages can be torn down while the pages
2206                          * were between the pending list and the rpc */
2207                         if (oap->oap_interrupted) {
2208                                 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2209                                 osc_ap_completion(cli, NULL, oap, 0,
2210                                                   oap->oap_count);
2211                                 continue;
2212                         }
2213                         osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
2214                 }
2215                 loi_list_maint(cli, loi);
2216                 RETURN(PTR_ERR(req));
2217         }
2218
2219         aa = (struct osc_brw_async_args *)&req->rq_async_args;
2220
2221         if (cmd == OBD_BRW_READ) {
2222                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2223                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2224                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2225                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2226                 ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
2227         } else {
2228                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2229                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2230                                  cli->cl_w_in_flight);
2231                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2232                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2233                 ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
2234         }
2235
2236         client_obd_list_lock(&cli->cl_loi_list_lock);
2237
2238         if (cmd == OBD_BRW_READ)
2239                 cli->cl_r_in_flight++;
2240         else
2241                 cli->cl_w_in_flight++;
2242
2243         /* queued sync pages can be torn down while the pages
2244          * were between the pending list and the rpc */
2245         tmp = NULL;
2246         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2247                 /* only one oap gets a request reference */
2248                 if (tmp == NULL)
2249                         tmp = oap;
2250                 if (oap->oap_interrupted && !req->rq_intr) {
2251                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2252                                oap, req);
2253                         ptlrpc_mark_interrupted(req);
2254                 }
2255         }
2256         if (tmp != NULL)
2257                 tmp->oap_request = ptlrpc_request_addref(req);
2258
2259         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2260                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2261
2262         req->rq_interpret_reply = brw_interpret_oap;
2263         ptlrpcd_add_req(req);
2264         RETURN(1);
2265 }
2266
/* Emit a D_INODE debug line describing LOI: whether loi_cli_item is linked
 * on a list, followed by lop_num_pending and "lop_urgent is non-empty" for
 * the write queue and then for the read queue.  STR is a printf-style format
 * that is pasted after the fixed prefix; because 'args' is expanded after a
 * comma, at least one argument has to follow STR.
 *
 * The definition deliberately ends without a line continuation so that it
 * can never swallow whatever line happens to follow it. */
#define LOI_DEBUG(LOI, STR, args...)                                     \
        CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
               !list_empty(&(LOI)->loi_cli_item),                        \
               (LOI)->loi_write_lop.lop_num_pending,                     \
               !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
               (LOI)->loi_read_lop.lop_num_pending,                      \
               !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
               args)
2275
2276 /* This is called by osc_check_rpcs() to find which objects have pages that
2277  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2278 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2279 {
2280         ENTRY;
2281         /* first return all objects which we already know to have
2282          * pages ready to be stuffed into rpcs */
2283         if (!list_empty(&cli->cl_loi_ready_list))
2284                 RETURN(list_entry(cli->cl_loi_ready_list.next,
2285                                   struct lov_oinfo, loi_cli_item));
2286
2287         /* then if we have cache waiters, return all objects with queued
2288          * writes.  This is especially important when many small files
2289          * have filled up the cache and not been fired into rpcs because
2290          * they don't pass the nr_pending/object threshhold */
2291         if (!list_empty(&cli->cl_cache_waiters) &&
2292             !list_empty(&cli->cl_loi_write_list))
2293                 RETURN(list_entry(cli->cl_loi_write_list.next,
2294                                   struct lov_oinfo, loi_write_item));
2295
2296         /* then return all queued objects when we have an invalid import
2297          * so that they get flushed */
2298         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2299                 if (!list_empty(&cli->cl_loi_write_list))
2300                         RETURN(list_entry(cli->cl_loi_write_list.next,
2301                                           struct lov_oinfo, loi_write_item));
2302                 if (!list_empty(&cli->cl_loi_read_list))
2303                         RETURN(list_entry(cli->cl_loi_read_list.next,
2304                                           struct lov_oinfo, loi_read_item));
2305         }
2306         RETURN(NULL);
2307 }
2308
2309 /* called with the loi list lock held */
2310 static void osc_check_rpcs(struct client_obd *cli)
2311 {
2312         struct lov_oinfo *loi;
2313         int rc = 0, race_counter = 0;
2314         ENTRY;
2315
2316         while ((loi = osc_next_loi(cli)) != NULL) {
2317                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2318
2319                 if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
2320                         break;
2321
2322                 /* attempt some read/write balancing by alternating between
2323                  * reads and writes in an object.  The makes_rpc checks here
2324                  * would be redundant if we were getting read/write work items
2325                  * instead of objects.  we don't want send_oap_rpc to drain a
2326                  * partial read pending queue when we're given this object to
2327                  * do io on writes while there are cache waiters */
2328                 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2329                         rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
2330                                               &loi->loi_write_lop);
2331                         if (rc < 0)
2332                                 break;
2333                         if (rc > 0)
2334                                 race_counter = 0;
2335                         else
2336                                 race_counter++;
2337                 }
2338                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2339                         rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
2340                                               &loi->loi_read_lop);
2341                         if (rc < 0)
2342                                 break;
2343                         if (rc > 0)
2344                                 race_counter = 0;
2345                         else
2346                                 race_counter++;
2347                 }
2348
2349                 /* attempt some inter-object balancing by issueing rpcs
2350                  * for each object in turn */
2351                 if (!list_empty(&loi->loi_cli_item))
2352                         list_del_init(&loi->loi_cli_item);
2353                 if (!list_empty(&loi->loi_write_item))
2354                         list_del_init(&loi->loi_write_item);
2355                 if (!list_empty(&loi->loi_read_item))
2356                         list_del_init(&loi->loi_read_item);
2357
2358                 loi_list_maint(cli, loi);
2359
2360                 /* send_oap_rpc fails with 0 when make_ready tells it to
2361                  * back off.  llite's make_ready does this when it tries
2362                  * to lock a page queued for write that is already locked.
2363                  * we want to try sending rpcs from many objects, but we
2364                  * don't want to spin failing with 0.  */
2365                 if (race_counter == 10)
2366                         break;
2367         }
2368         EXIT;
2369 }
2370
2371 /* we're trying to queue a page in the osc so we're subject to the
2372  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2373  * If the osc's queued pages are already at that limit, then we want to sleep
2374  * until there is space in the osc's queue for us.  We also may be waiting for
2375  * write credits from the OST if there are RPCs in flight that may return some
2376  * before we fall back to sync writes.
2377  *
2378  * We need this know our allocation was granted in the presence of signals */
2379 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2380 {
2381         int rc;
2382         ENTRY;
2383         client_obd_list_lock(&cli->cl_loi_list_lock);
2384         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2385         client_obd_list_unlock(&cli->cl_loi_list_lock);
2386         RETURN(rc);
2387 };
2388
2389 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2390  * grant or cache space. */
2391 static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
2392                            struct osc_async_page *oap)
2393 {
2394         struct osc_cache_waiter ocw;
2395         struct l_wait_info lwi = { 0 };
2396
2397         ENTRY;
2398
2399         CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2400                "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2401                cli->cl_dirty_max, obd_max_dirty_pages,
2402                cli->cl_lost_grant, cli->cl_avail_grant);
2403
2404         /* force the caller to try sync io.  this can jump the list
2405          * of queued writes and create a discontiguous rpc stream */
2406         if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2407             loi->loi_ar.ar_force_sync)
2408                 RETURN(-EDQUOT);
2409
2410         /* Hopefully normal case - cache space and write credits available */
2411         if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
2412             (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
2413             (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
2414                 /* account for ourselves */
2415                 osc_consume_write_grant(cli, &oap->oap_brw_page);
2416                 RETURN(0);
2417         }
2418
2419         /* Make sure that there are write rpcs in flight to wait for.  This
2420          * is a little silly as this object may not have any pending but
2421          * other objects sure might. */
2422         if (cli->cl_w_in_flight) {
2423                 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2424                 cfs_waitq_init(&ocw.ocw_waitq);
2425                 ocw.ocw_oap = oap;
2426                 ocw.ocw_rc = 0;
2427
2428                 loi_list_maint(cli, loi);
2429                 osc_check_rpcs(cli);
2430                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2431
2432                 CDEBUG(D_CACHE, "sleeping for cache space\n");
2433                 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2434
2435                 client_obd_list_lock(&cli->cl_loi_list_lock);
2436                 if (!list_empty(&ocw.ocw_entry)) {
2437                         list_del(&ocw.ocw_entry);
2438                         RETURN(-EINTR);
2439                 }
2440                 RETURN(ocw.ocw_rc);
2441         }
2442
2443         RETURN(-EDQUOT);
2444 }
2445
2446 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2447                         struct lov_oinfo *loi, cfs_page_t *page,
2448                         obd_off offset, struct obd_async_page_ops *ops,
2449                         void *data, void **res)
2450 {
2451         struct osc_async_page *oap;
2452         ENTRY;
2453
2454         if (!page)
2455                 return size_round(sizeof(*oap));
2456
2457         oap = *res;
2458         oap->oap_magic = OAP_MAGIC;
2459         oap->oap_cli = &exp->exp_obd->u.cli;
2460         oap->oap_loi = loi;
2461
2462         oap->oap_caller_ops = ops;
2463         oap->oap_caller_data = data;
2464
2465         oap->oap_page = page;
2466         oap->oap_obj_off = offset;
2467
2468         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2469         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2470         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2471
2472         oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2473
2474         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2475         RETURN(0);
2476 }
2477
2478 struct osc_async_page *oap_from_cookie(void *cookie)
2479 {
2480         struct osc_async_page *oap = cookie;
2481         if (oap->oap_magic != OAP_MAGIC)
2482                 return ERR_PTR(-EINVAL);
2483         return oap;
2484 };
2485
2486 static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2487                               struct lov_oinfo *loi, void *cookie,
2488                               int cmd, obd_off off, int count,
2489                               obd_flag brw_flags, enum async_flags async_flags)
2490 {
2491         struct client_obd *cli = &exp->exp_obd->u.cli;
2492         struct osc_async_page *oap;
2493         int rc = 0;
2494         ENTRY;
2495
2496         oap = oap_from_cookie(cookie);
2497         if (IS_ERR(oap))
2498                 RETURN(PTR_ERR(oap));
2499
2500         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2501                 RETURN(-EIO);
2502
2503         if (!list_empty(&oap->oap_pending_item) ||
2504             !list_empty(&oap->oap_urgent_item) ||
2505             !list_empty(&oap->oap_rpc_item))
2506                 RETURN(-EBUSY);
2507
2508         /* check if the file's owner/group is over quota */
2509 #ifdef HAVE_QUOTA_SUPPORT
2510         if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
2511                 struct obd_async_page_ops *ops;
2512                 struct obdo *oa;
2513
2514                 OBDO_ALLOC(oa);
2515                 if (oa == NULL)
2516                         RETURN(-ENOMEM);
2517
2518                 ops = oap->oap_caller_ops;
2519                 ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
2520                 if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
2521                     NO_QUOTA)
2522                         rc = -EDQUOT;
2523
2524                 OBDO_FREE(oa);
2525                 if (rc)
2526                         RETURN(rc);
2527         }
2528 #endif
2529
2530         if (loi == NULL)
2531                 loi = lsm->lsm_oinfo[0];
2532
2533         client_obd_list_lock(&cli->cl_loi_list_lock);
2534
2535         oap->oap_cmd = cmd;
2536         oap->oap_page_off = off;
2537         oap->oap_count = count;
2538         oap->oap_brw_flags = brw_flags;
2539         oap->oap_async_flags = async_flags;
2540
2541         if (cmd & OBD_BRW_WRITE) {
2542                 rc = osc_enter_cache(cli, loi, oap);
2543                 if (rc) {
2544                         client_obd_list_unlock(&cli->cl_loi_list_lock);
2545                         RETURN(rc);
2546                 }
2547         }
2548
2549         osc_oap_to_pending(oap);
2550         loi_list_maint(cli, loi);
2551
2552         LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2553                   cmd);
2554
2555         osc_check_rpcs(cli);
2556         client_obd_list_unlock(&cli->cl_loi_list_lock);
2557
2558         RETURN(0);
2559 }
2560
/* True (1) when 'flag' is being newly set: absent from 'was' but present in
 * 'now'; aka (~was & now & flag), but this is more clear :)
 * Every argument is parenthesized so that compound expressions such as
 * (A | B) can be passed for any of them without precedence surprises. */
#define SETTING(was, now, flag) (!((was) & (flag)) && ((now) & (flag)))
2563
2564 static int osc_set_async_flags(struct obd_export *exp,
2565                                struct lov_stripe_md *lsm,
2566                                struct lov_oinfo *loi, void *cookie,
2567                                obd_flag async_flags)
2568 {
2569         struct client_obd *cli = &exp->exp_obd->u.cli;
2570         struct loi_oap_pages *lop;
2571         struct osc_async_page *oap;
2572         int rc = 0;
2573         ENTRY;
2574
2575         oap = oap_from_cookie(cookie);
2576         if (IS_ERR(oap))
2577                 RETURN(PTR_ERR(oap));
2578
2579         /*
2580          * bug 7311: OST-side locking is only supported for liblustre for now
2581          * (and liblustre never calls obd_set_async_flags(). I hope.), generic
2582          * implementation has to handle case where OST-locked page was picked
2583          * up by, e.g., ->writepage().
2584          */
2585         LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
2586         LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
2587                                      * tread here. */
2588
2589         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2590                 RETURN(-EIO);
2591
2592         if (loi == NULL)
2593                 loi = lsm->lsm_oinfo[0];
2594
2595         if (oap->oap_cmd & OBD_BRW_WRITE) {
2596                 lop = &loi->loi_write_lop;
2597         } else {
2598                 lop = &loi->loi_read_lop;
2599         }
2600
2601         client_obd_list_lock(&cli->cl_loi_list_lock);
2602
2603         if (list_empty(&oap->oap_pending_item))
2604                 GOTO(out, rc = -EINVAL);
2605
2606         if ((oap->oap_async_flags & async_flags) == async_flags)
2607                 GOTO(out, rc = 0);
2608
2609         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2610                 oap->oap_async_flags |= ASYNC_READY;
2611
2612         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
2613                 if (list_empty(&oap->oap_rpc_item)) {
2614                         list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2615                         loi_list_maint(cli, loi);
2616                 }
2617         }
2618
2619         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2620                         oap->oap_async_flags);
2621 out:
2622         osc_check_rpcs(cli);
2623         client_obd_list_unlock(&cli->cl_loi_list_lock);
2624         RETURN(rc);
2625 }
2626
2627 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2628                              struct lov_oinfo *loi,
2629                              struct obd_io_group *oig, void *cookie,
2630                              int cmd, obd_off off, int count,
2631                              obd_flag brw_flags,
2632                              obd_flag async_flags)
2633 {
2634         struct client_obd *cli = &exp->exp_obd->u.cli;
2635         struct osc_async_page *oap;
2636         struct loi_oap_pages *lop;
2637         int rc = 0;
2638         ENTRY;
2639
2640         oap = oap_from_cookie(cookie);
2641         if (IS_ERR(oap))
2642                 RETURN(PTR_ERR(oap));
2643
2644         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2645                 RETURN(-EIO);
2646
2647         if (!list_empty(&oap->oap_pending_item) ||
2648             !list_empty(&oap->oap_urgent_item) ||
2649             !list_empty(&oap->oap_rpc_item))
2650                 RETURN(-EBUSY);
2651
2652         if (loi == NULL)
2653                 loi = lsm->lsm_oinfo[0];
2654
2655         client_obd_list_lock(&cli->cl_loi_list_lock);
2656
2657         oap->oap_cmd = cmd;
2658         oap->oap_page_off = off;
2659         oap->oap_count = count;
2660         oap->oap_brw_flags = brw_flags;
2661         oap->oap_async_flags = async_flags;
2662
2663         if (cmd & OBD_BRW_WRITE)
2664                 lop = &loi->loi_write_lop;
2665         else
2666                 lop = &loi->loi_read_lop;
2667
2668         list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
2669         if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
2670                 oap->oap_oig = oig;
2671                 rc = oig_add_one(oig, &oap->oap_occ);
2672         }
2673
2674         LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
2675                   oap, oap->oap_page, rc);
2676
2677         client_obd_list_unlock(&cli->cl_loi_list_lock);
2678
2679         RETURN(rc);
2680 }
2681
2682 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2683                                  struct loi_oap_pages *lop, int cmd)
2684 {
2685         struct list_head *pos, *tmp;
2686         struct osc_async_page *oap;
2687
2688         list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2689                 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2690                 list_del(&oap->oap_pending_item);
2691                 osc_oap_to_pending(oap);
2692         }
2693         loi_list_maint(cli, loi);
2694 }
2695
2696 static int osc_trigger_group_io(struct obd_export *exp,
2697                                 struct lov_stripe_md *lsm,
2698                                 struct lov_oinfo *loi,
2699                                 struct obd_io_group *oig)
2700 {
2701         struct client_obd *cli = &exp->exp_obd->u.cli;
2702         ENTRY;
2703
2704         if (loi == NULL)
2705                 loi = lsm->lsm_oinfo[0];
2706
2707         client_obd_list_lock(&cli->cl_loi_list_lock);
2708
2709         osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
2710         osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
2711
2712         osc_check_rpcs(cli);
2713         client_obd_list_unlock(&cli->cl_loi_list_lock);
2714
2715         RETURN(0);
2716 }
2717
2718 static int osc_teardown_async_page(struct obd_export *exp,
2719                                    struct lov_stripe_md *lsm,
2720                                    struct lov_oinfo *loi, void *cookie)
2721 {
2722         struct client_obd *cli = &exp->exp_obd->u.cli;
2723         struct loi_oap_pages *lop;
2724         struct osc_async_page *oap;
2725         int rc = 0;
2726         ENTRY;
2727
2728         oap = oap_from_cookie(cookie);
2729         if (IS_ERR(oap))
2730                 RETURN(PTR_ERR(oap));
2731
2732         if (loi == NULL)
2733                 loi = lsm->lsm_oinfo[0];
2734
2735         if (oap->oap_cmd & OBD_BRW_WRITE) {
2736                 lop = &loi->loi_write_lop;
2737         } else {
2738                 lop = &loi->loi_read_lop;
2739         }
2740
2741         client_obd_list_lock(&cli->cl_loi_list_lock);
2742
2743         if (!list_empty(&oap->oap_rpc_item))
2744                 GOTO(out, rc = -EBUSY);
2745
2746         osc_exit_cache(cli, oap, 0);
2747         osc_wake_cache_waiters(cli);
2748
2749         if (!list_empty(&oap->oap_urgent_item)) {
2750                 list_del_init(&oap->oap_urgent_item);
2751                 oap->oap_async_flags &= ~ASYNC_URGENT;
2752         }
2753         if (!list_empty(&oap->oap_pending_item)) {
2754                 list_del_init(&oap->oap_pending_item);
2755                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2756         }
2757         loi_list_maint(cli, loi);
2758
2759         LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
2760 out:
2761         client_obd_list_unlock(&cli->cl_loi_list_lock);
2762         RETURN(rc);
2763 }
2764
2765 static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
2766                                     int flags)
2767 {
2768         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2769
2770         if (lock == NULL) {
2771                 CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
2772                 return;
2773         }
2774         lock_res_and_lock(lock);
2775 #if defined (__KERNEL__) && defined (__linux__)
2776         /* Liang XXX: Darwin and Winnt checking should be added */
2777         if (lock->l_ast_data && lock->l_ast_data != data) {
2778                 struct inode *new_inode = data;
2779                 struct inode *old_inode = lock->l_ast_data;
2780                 if (!(old_inode->i_state & I_FREEING))
2781                         LDLM_ERROR(lock, "inconsistent l_ast_data found");
2782                 LASSERTF(old_inode->i_state & I_FREEING,
2783                          "Found existing inode %p/%lu/%u state %lu in lock: "
2784                          "setting data to %p/%lu/%u\n", old_inode,
2785                          old_inode->i_ino, old_inode->i_generation,
2786                          old_inode->i_state,
2787                          new_inode, new_inode->i_ino, new_inode->i_generation);
2788         }
2789 #endif
2790         lock->l_ast_data = data;
2791         lock->l_flags |= (flags & LDLM_FL_NO_LRU);
2792         unlock_res_and_lock(lock);
2793         LDLM_LOCK_PUT(lock);
2794 }
2795
2796 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2797                              ldlm_iterator_t replace, void *data)
2798 {
2799         struct ldlm_res_id res_id = { .name = {0} };
2800         struct obd_device *obd = class_exp2obd(exp);
2801
2802         res_id.name[0] = lsm->lsm_object_id;
2803         res_id.name[2] = lsm->lsm_object_gr;
2804
2805         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2806         return 0;
2807 }
2808
2809 static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo,
2810                             int intent, int rc)
2811 {
2812         ENTRY;
2813
2814         if (intent) {
2815                 /* The request was created before ldlm_cli_enqueue call. */
2816                 if (rc == ELDLM_LOCK_ABORTED) {
2817                         struct ldlm_reply *rep;
2818
2819                         /* swabbed by ldlm_cli_enqueue() */
2820                         LASSERT(lustre_rep_swabbed(req, DLM_LOCKREPLY_OFF));
2821                         rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
2822                                              sizeof(*rep));
2823                         LASSERT(rep != NULL);
2824                         if (rep->lock_policy_res1)
2825                                 rc = rep->lock_policy_res1;
2826                 }
2827         }
2828
2829         if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
2830                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2831                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
2832                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
2833                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
2834         }
2835
2836         /* Call the update callback. */
2837         rc = oinfo->oi_cb_up(oinfo, rc);
2838         RETURN(rc);
2839 }
2840
2841 static int osc_enqueue_interpret(struct ptlrpc_request *req,
2842                                  struct osc_enqueue_args *aa, int rc)
2843 {
2844         int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
2845         struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
2846         struct ldlm_lock *lock;
2847
2848         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2849          * be valid. */
2850         lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);
2851
2852         /* Complete obtaining the lock procedure. */
2853         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2854                                    aa->oa_ei->ei_mode,
2855                                    &aa->oa_oi->oi_flags,
2856                                    &lsm->lsm_oinfo[0]->loi_lvb,
2857                                    sizeof(lsm->lsm_oinfo[0]->loi_lvb),
2858                                    lustre_swab_ost_lvb,
2859                                    aa->oa_oi->oi_lockh, rc);
2860
2861         /* Complete osc stuff. */
2862         rc = osc_enqueue_fini(req, aa->oa_oi, intent, rc);
2863
2864         /* Release the lock for async request. */
2865         if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
2866                 ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);
2867
2868         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2869                  aa->oa_oi->oi_lockh, req, aa);
2870         LDLM_LOCK_PUT(lock);
2871         return rc;
2872 }
2873
2874 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2875  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2876  * other synchronous requests, however keeping some locks and trying to obtain
2877  * others may take a considerable amount of time in a case of ost failure; and
2878  * when other sync requests do not get released lock from a client, the client
2879  * is excluded from the cluster -- such scenarious make the life difficult, so
2880  * release locks just after they are obtained. */
2881 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2882                        struct ldlm_enqueue_info *einfo,
2883                        struct ptlrpc_request_set *rqset)
2884 {
2885         struct ldlm_res_id res_id = { .name = {0} };
2886         struct obd_device *obd = exp->exp_obd;
2887         struct ldlm_reply *rep;
2888         struct ptlrpc_request *req = NULL;
2889         int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
2890         ldlm_mode_t mode;
2891         int rc;
2892         ENTRY;
2893
2894         res_id.name[0] = oinfo->oi_md->lsm_object_id;
2895         res_id.name[2] = oinfo->oi_md->lsm_object_gr;
2896
2897         /* Filesystem lock extents are extended to page boundaries so that
2898          * dealing with the page cache is a little smoother.  */
2899         oinfo->oi_policy.l_extent.start -=
2900                 oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
2901         oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;
2902
2903         if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
2904                 goto no_match;
2905
2906         /* Next, search for already existing extent locks that will cover us */
2907         /* If we're trying to read, we also search for an existing PW lock.  The
2908          * VFS and page cache already protect us locally, so lots of readers/
2909          * writers can share a single PW lock.
2910          *
2911          * There are problems with conversion deadlocks, so instead of
2912          * converting a read lock to a write lock, we'll just enqueue a new
2913          * one.
2914          *
2915          * At some point we should cancel the read lock instead of making them
2916          * send us a blocking callback, but there are problems with canceling
2917          * locks out from other users right now, too. */
2918         mode = einfo->ei_mode;
2919         if (einfo->ei_mode == LCK_PR)
2920                 mode |= LCK_PW;
2921         mode = ldlm_lock_match(obd->obd_namespace,
2922                                oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
2923                                einfo->ei_type, &oinfo->oi_policy, mode,
2924                                oinfo->oi_lockh);
2925         if (mode) {
2926                 /* addref the lock only if not async requests and PW lock is
2927                  * matched whereas we asked for PR. */
2928                 if (!rqset && einfo->ei_mode != mode)
2929                         ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
2930                 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
2931                                         oinfo->oi_flags);
2932                 if (intent) {
2933                         /* I would like to be able to ASSERT here that rss <=
2934                          * kms, but I can't, for reasons which are explained in
2935                          * lov_enqueue() */
2936                 }
2937
2938                 /* We already have a lock, and it's referenced */
2939                 oinfo->oi_cb_up(oinfo, ELDLM_OK);
2940
2941                 /* For async requests, decref the lock. */
2942                 if (einfo->ei_mode != mode)
2943                         ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
2944                 else if (rqset)
2945                         ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
2946
2947                 RETURN(ELDLM_OK);
2948         }
2949
2950  no_match:
2951         if (intent) {
2952                 int size[3] = {
2953                         [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
2954                         [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request),
2955                         [DLM_LOCKREQ_OFF + 1] = 0 };
2956
2957                 req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
2958                 if (req == NULL)
2959                         RETURN(-ENOMEM);
2960
2961                 size[DLM_LOCKREPLY_OFF] = sizeof(*rep);
2962                 size[DLM_REPLY_REC_OFF] =
2963                         sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb);
2964                 ptlrpc_req_set_repsize(req, 3, size);
2965         }
2966
2967         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2968         oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;
2969
2970         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id,
2971                               &oinfo->oi_policy, &oinfo->oi_flags,
2972                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2973                               sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
2974                               lustre_swab_ost_lvb, oinfo->oi_lockh,
2975                               rqset ? 1 : 0);
2976         if (rqset) {
2977                 if (!rc) {
2978                         struct osc_enqueue_args *aa;
2979                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2980                         aa = (struct osc_enqueue_args *)&req->rq_async_args;
2981                         aa->oa_oi = oinfo;
2982                         aa->oa_ei = einfo;
2983                         aa->oa_exp = exp;
2984
2985                         req->rq_interpret_reply = osc_enqueue_interpret;
2986                         ptlrpc_set_add_req(rqset, req);
2987                 } else if (intent) {
2988                         ptlrpc_req_finished(req);
2989                 }
2990                 RETURN(rc);
2991         }
2992
2993         rc = osc_enqueue_fini(req, oinfo, intent, rc);
2994         if (intent)
2995                 ptlrpc_req_finished(req);
2996
2997         RETURN(rc);
2998 }
2999
3000 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
3001                      __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3002                      int *flags, void *data, struct lustre_handle *lockh)
3003 {
3004         struct ldlm_res_id res_id = { .name = {0} };
3005         struct obd_device *obd = exp->exp_obd;
3006         int lflags = *flags;
3007         ldlm_mode_t rc;
3008         ENTRY;
3009
3010         res_id.name[0] = lsm->lsm_object_id;
3011         res_id.name[2] = lsm->lsm_object_gr;
3012
3013         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3014                 RETURN(-EIO);
3015
3016         /* Filesystem lock extents are extended to page boundaries so that
3017          * dealing with the page cache is a little smoother */
3018         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3019         policy->l_extent.end |= ~CFS_PAGE_MASK;
3020
3021         /* Next, search for already existing extent locks that will cover us */
3022         /* If we're trying to read, we also search for an existing PW lock.  The
3023          * VFS and page cache already protect us locally, so lots of readers/
3024          * writers can share a single PW lock. */
3025         rc = mode;
3026         if (mode == LCK_PR)
3027                 rc |= LCK_PW;
3028         rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3029                              &res_id, type, policy, rc, lockh);
3030         if (rc) {
3031                 osc_set_data_with_check(lockh, data, lflags);
3032                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3033                         ldlm_lock_addref(lockh, LCK_PR);
3034                         ldlm_lock_decref(lockh, LCK_PW);
3035                 }
3036                 RETURN(rc);
3037         }
3038         RETURN(rc);
3039 }
3040
3041 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3042                       __u32 mode, struct lustre_handle *lockh)
3043 {
3044         ENTRY;
3045
3046         if (unlikely(mode == LCK_GROUP))
3047                 ldlm_lock_decref_and_cancel(lockh, mode);
3048         else
3049                 ldlm_lock_decref(lockh, mode);
3050
3051         RETURN(0);
3052 }
3053
3054 static int osc_cancel_unused(struct obd_export *exp,
3055                              struct lov_stripe_md *lsm, int flags,
3056                              void *opaque)
3057 {
3058         struct obd_device *obd = class_exp2obd(exp);
3059         struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3060
3061         if (lsm != NULL) {
3062                 res_id.name[0] = lsm->lsm_object_id;
3063                 res_id.name[2] = lsm->lsm_object_gr;
3064                 resp = &res_id;
3065         }
3066
3067         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3068 }
3069
3070 static int osc_join_lru(struct obd_export *exp,
3071                         struct lov_stripe_md *lsm, int join)
3072 {
3073         struct obd_device *obd = class_exp2obd(exp);
3074         struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3075
3076         if (lsm != NULL) {
3077                 res_id.name[0] = lsm->lsm_object_id;
3078                 res_id.name[2] = lsm->lsm_object_gr;
3079                 resp = &res_id;
3080         }
3081
3082         return ldlm_cli_join_lru(obd->obd_namespace, resp, join);
3083 }
3084
3085 static int osc_statfs_interpret(struct ptlrpc_request *req,
3086                                 struct osc_async_args *aa, int rc)
3087 {
3088         struct obd_statfs *msfs;
3089         ENTRY;
3090
3091         if (rc != 0)
3092                 GOTO(out, rc);
3093
3094         msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3095                                   lustre_swab_obd_statfs);
3096         if (msfs == NULL) {
3097                 CERROR("Can't unpack obd_statfs\n");
3098                 GOTO(out, rc = -EPROTO);
3099         }
3100
3101         memcpy(aa->aa_oi->oi_osfs, msfs, sizeof(*msfs));
3102 out:
3103         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3104         RETURN(rc);
3105 }
3106
3107 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3108                             __u64 max_age, struct ptlrpc_request_set *rqset)
3109 {
3110         struct ptlrpc_request *req;
3111         struct osc_async_args *aa;
3112         int size[2] = { sizeof(struct ptlrpc_body), sizeof(*oinfo->oi_osfs) };
3113         ENTRY;
3114
3115         /* We could possibly pass max_age in the request (as an absolute
3116          * timestamp or a "seconds.usec ago") so the target can avoid doing
3117          * extra calls into the filesystem if that isn't necessary (e.g.
3118          * during mount that would help a bit).  Having relative timestamps
3119          * is not so great if request processing is slow, while absolute
3120          * timestamps are not ideal because they need time synchronization. */
3121         req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
3122                               OST_STATFS, 1, NULL, NULL);
3123         if (!req)
3124                 RETURN(-ENOMEM);
3125
3126         ptlrpc_req_set_repsize(req, 2, size);
3127         req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3128
3129         req->rq_interpret_reply = osc_statfs_interpret;
3130         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3131         aa = (struct osc_async_args *)&req->rq_async_args;
3132         aa->aa_oi = oinfo;
3133
3134         ptlrpc_set_add_req(rqset, req);
3135         RETURN(0);
3136 }
3137
3138 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3139                       __u64 max_age)
3140 {
3141         struct obd_statfs *msfs;
3142         struct ptlrpc_request *req;
3143         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
3144         ENTRY;
3145
3146         /* We could possibly pass max_age in the request (as an absolute
3147          * timestamp or a "seconds.usec ago") so the target can avoid doing
3148          * extra calls into the filesystem if that isn't necessary (e.g.
3149          * during mount that would help a bit).  Having relative timestamps
3150          * is not so great if request processing is slow, while absolute
3151          * timestamps are not ideal because they need time synchronization. */
3152         req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
3153                               OST_STATFS, 1, NULL, NULL);
3154         if (!req)
3155                 RETURN(-ENOMEM);
3156
3157         ptlrpc_req_set_repsize(req, 2, size);
3158         req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3159
3160         rc = ptlrpc_queue_wait(req);
3161         if (rc)
3162                 GOTO(out, rc);
3163
3164         msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3165                                   lustre_swab_obd_statfs);
3166         if (msfs == NULL) {
3167                 CERROR("Can't unpack obd_statfs\n");
3168                 GOTO(out, rc = -EPROTO);
3169         }
3170
3171         memcpy(osfs, msfs, sizeof(*osfs));
3172
3173         EXIT;
3174  out:
3175         ptlrpc_req_finished(req);
3176         return rc;
3177 }
3178
3179 /* Retrieve object striping information.
3180  *
3181  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
3182  * the maximum number of OST indices which will fit in the user buffer.
3183  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
3184  */
3185 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3186 {
3187         struct lov_user_md lum, *lumk;
3188         int rc = 0, lum_size;
3189         ENTRY;
3190
3191         if (!lsm)
3192                 RETURN(-ENODATA);
3193
3194         if (copy_from_user(&lum, lump, sizeof(lum)))
3195                 RETURN(-EFAULT);
3196
3197         if (lum.lmm_magic != LOV_USER_MAGIC)
3198                 RETURN(-EINVAL);
3199
3200         if (lum.lmm_stripe_count > 0) {
3201                 lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
3202                 OBD_ALLOC(lumk, lum_size);
3203                 if (!lumk)
3204                         RETURN(-ENOMEM);
3205
3206                 lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
3207                 lumk->lmm_objects[0].l_object_gr = lsm->lsm_object_gr;
3208         } else {
3209                 lum_size = sizeof(lum);
3210                 lumk = &lum;
3211         }
3212
3213         lumk->lmm_object_id = lsm->lsm_object_id;
3214         lumk->lmm_object_gr = lsm->lsm_object_gr;
3215         lumk->lmm_stripe_count = 1;
3216
3217         if (copy_to_user(lump, lumk, lum_size))
3218                 rc = -EFAULT;
3219
3220         if (lumk != &lum)
3221                 OBD_FREE(lumk, lum_size);
3222
3223         RETURN(rc);
3224 }
3225
3226
3227 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3228                          void *karg, void *uarg)
3229 {
3230         struct obd_device *obd = exp->exp_obd;
3231         struct obd_ioctl_data *data = karg;
3232         int err = 0;
3233         ENTRY;
3234
3235         if (!try_module_get(THIS_MODULE)) {
3236                 CERROR("Can't get module. Is it alive?");
3237                 return -EINVAL;
3238         }
3239         switch (cmd) {
3240         case OBD_IOC_LOV_GET_CONFIG: {
3241                 char *buf;
3242                 struct lov_desc *desc;
3243                 struct obd_uuid uuid;
3244
3245                 buf = NULL;
3246                 len = 0;
3247                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3248                         GOTO(out, err = -EINVAL);
3249
3250                 data = (struct obd_ioctl_data *)buf;
3251
3252                 if (sizeof(*desc) > data->ioc_inllen1) {
3253                         obd_ioctl_freedata(buf, len);
3254                         GOTO(out, err = -EINVAL);
3255                 }
3256
3257                 if (data->ioc_inllen2 < sizeof(uuid)) {
3258                         obd_ioctl_freedata(buf, len);
3259                         GOTO(out, err = -EINVAL);
3260                 }
3261
3262                 desc = (struct lov_desc *)data->ioc_inlbuf1;
3263                 desc->ld_tgt_count = 1;
3264                 desc->ld_active_tgt_count = 1;
3265                 desc->ld_default_stripe_count = 1;
3266                 desc->ld_default_stripe_size = 0;
3267                 desc->ld_default_stripe_offset = 0;
3268                 desc->ld_pattern = 0;
3269                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3270
3271                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3272
3273                 err = copy_to_user((void *)uarg, buf, len);
3274                 if (err)
3275                         err = -EFAULT;
3276                 obd_ioctl_freedata(buf, len);
3277                 GOTO(out, err);
3278         }
3279         case LL_IOC_LOV_SETSTRIPE:
3280                 err = obd_alloc_memmd(exp, karg);
3281                 if (err > 0)
3282                         err = 0;
3283                 GOTO(out, err);
3284         case LL_IOC_LOV_GETSTRIPE:
3285                 err = osc_getstripe(karg, uarg);
3286                 GOTO(out, err);
3287         case OBD_IOC_CLIENT_RECOVER:
3288                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3289                                             data->ioc_inlbuf1);
3290                 if (err > 0)
3291                         err = 0;
3292                 GOTO(out, err);
3293         case IOC_OSC_SET_ACTIVE:
3294                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3295                                                data->ioc_offset);
3296                 GOTO(out, err);
3297         case OBD_IOC_POLL_QUOTACHECK:
3298                 err = lquota_poll_check(quota_interface, exp,
3299                                         (struct if_quotacheck *)karg);
3300                 GOTO(out, err);
3301         default:
3302                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3303                        cmd, cfs_curproc_comm());
3304                 GOTO(out, err = -ENOTTY);
3305         }
3306 out:
3307         module_put(THIS_MODULE);
3308         return err;
3309 }
3310
3311 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3312                         void *key, __u32 *vallen, void *val)
3313 {
3314         ENTRY;
3315         if (!vallen || !val)
3316                 RETURN(-EFAULT);
3317
3318         if (KEY_IS("lock_to_stripe")) {
3319                 __u32 *stripe = val;
3320                 *vallen = sizeof(*stripe);
3321                 *stripe = 0;
3322                 RETURN(0);
3323         } else if (KEY_IS("last_id")) {
3324                 struct ptlrpc_request *req;
3325                 obd_id *reply;
3326                 char *bufs[2] = { NULL, key };
3327                 int rc, size[2] = { sizeof(struct ptlrpc_body), keylen };
3328
3329                 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
3330                                       OST_GET_INFO, 2, size, bufs);
3331                 if (req == NULL)
3332                         RETURN(-ENOMEM);
3333
3334                 size[REPLY_REC_OFF] = *vallen;
3335                 ptlrpc_req_set_repsize(req, 2, size);
3336                 rc = ptlrpc_queue_wait(req);
3337                 if (rc)
3338                         GOTO(out, rc);
3339
3340                 reply = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*reply),
3341                                            lustre_swab_ost_last_id);
3342                 if (reply == NULL) {
3343                         CERROR("Can't unpack OST last ID\n");
3344                         GOTO(out, rc = -EPROTO);
3345                 }
3346                 *((obd_id *)val) = *reply;
3347         out:
3348                 ptlrpc_req_finished(req);
3349                 RETURN(rc);
3350         }
3351         RETURN(-EINVAL);
3352 }
3353
3354 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3355                                           void *aa, int rc)
3356 {
3357         struct llog_ctxt *ctxt;
3358         struct obd_import *imp = req->rq_import;
3359         ENTRY;
3360
3361         if (rc != 0)
3362                 RETURN(rc);
3363
3364         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3365         if (ctxt) {
3366                 if (rc == 0)
3367                         rc = llog_initiator_connect(ctxt);
3368                 else
3369                         CERROR("cannot establish connection for "
3370                                "ctxt %p: %d\n", ctxt, rc);
3371         }
3372
3373         spin_lock(&imp->imp_lock);
3374         imp->imp_server_timeout = 1;
3375         imp->imp_pingable = 1;
3376         spin_unlock(&imp->imp_lock);
3377         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3378
3379         RETURN(rc);
3380 }
3381
3382 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3383                               void *key, obd_count vallen, void *val,
3384                               struct ptlrpc_request_set *set)
3385 {
3386         struct ptlrpc_request *req;
3387         struct obd_device  *obd = exp->exp_obd;
3388         struct obd_import *imp = class_exp2cliimp(exp);
3389         int size[3] = { sizeof(struct ptlrpc_body), keylen, vallen };
3390         char *bufs[3] = { NULL, key, val };
3391         ENTRY;
3392
3393         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3394
3395         if (KEY_IS(KEY_NEXT_ID)) {
3396                 if (vallen != sizeof(obd_id))
3397                         RETURN(-EINVAL);
3398                 obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
3399                 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3400                        exp->exp_obd->obd_name,
3401                        obd->u.cli.cl_oscc.oscc_next_id);
3402
3403                 RETURN(0);
3404         }
3405
3406         if (KEY_IS("unlinked")) {
3407                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3408                 spin_lock(&oscc->oscc_lock);
3409                 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3410                 spin_unlock(&oscc->oscc_lock);
3411                 RETURN(0);
3412         }
3413
3414         if (KEY_IS(KEY_INIT_RECOV)) {
3415                 if (vallen != sizeof(int))
3416                         RETURN(-EINVAL);
3417                 spin_lock(&imp->imp_lock);
3418                 imp->imp_initial_recov = *(int *)val;
3419                 spin_unlock(&imp->imp_lock);
3420                 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3421                        exp->exp_obd->obd_name,
3422                        imp->imp_initial_recov);
3423                 RETURN(0);
3424         }
3425
3426         if (KEY_IS("checksum")) {
3427                 if (vallen != sizeof(int))
3428                         RETURN(-EINVAL);
3429                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3430                 RETURN(0);
3431         }
3432
3433         if (KEY_IS(KEY_FLUSH_CTX)) {
3434                 sptlrpc_import_flush_my_ctx(imp);
3435                 RETURN(0);
3436         }
3437
3438         if (!set)
3439                 RETURN(-EINVAL);
3440
3441         /* We pass all other commands directly to OST. Since nobody calls osc
3442            methods directly and everybody is supposed to go through LOV, we
3443            assume lov checked invalid values for us.
3444            The only recognised values so far are evict_by_nid and mds_conn.
3445            Even if something bad goes through, we'd get a -EINVAL from OST
3446            anyway. */
3447
3448         req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_SET_INFO, 3, size,
3449                               bufs);
3450         if (req == NULL)
3451                 RETURN(-ENOMEM);
3452
3453         if (KEY_IS(KEY_MDS_CONN)) {
3454                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3455
3456                 oscc->oscc_oa.o_gr = (*(__u32 *)val);
3457                 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
3458                 LASSERT(oscc->oscc_oa.o_gr > 0);
3459                 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3460         }
3461
3462         ptlrpc_req_set_repsize(req, 1, NULL);
3463         ptlrpc_set_add_req(set, req);
3464         ptlrpc_check_set(set);
3465
3466         RETURN(0);
3467 }
3468
3469
3470 static struct llog_operations osc_size_repl_logops = {
3471         lop_cancel: llog_obd_repl_cancel
3472 };
3473
3474 static struct llog_operations osc_mds_ost_orig_logops;
3475 static int osc_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
3476                          struct obd_device *tgt, int count,
3477                          struct llog_catid *catid, struct obd_uuid *uuid)
3478 {
3479         int rc;
3480         ENTRY;
3481
3482         spin_lock(&obd->obd_dev_lock);
3483         if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
3484                 osc_mds_ost_orig_logops = llog_lvfs_ops;
3485                 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3486                 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3487                 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3488                 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
3489         }
3490         spin_unlock(&obd->obd_dev_lock);
3491
3492         rc = llog_setup(obd, llogs, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
3493                         &catid->lci_logid, &osc_mds_ost_orig_logops);
3494         if (rc) {
3495                 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
3496                 GOTO (out, rc);
3497         }
3498
3499         rc = llog_setup(obd, llogs, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
3500                         &osc_size_repl_logops);
3501         if (rc)
3502                 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
3503 out:
3504         if (rc) {
3505                 CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
3506                        obd->obd_name, tgt->obd_name, count, catid, rc);
3507                 CERROR("logid "LPX64":0x%x\n",
3508                        catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
3509         }
3510         RETURN(rc);
3511 }
3512
3513 static int osc_llog_finish(struct obd_device *obd, int count)
3514 {
3515         struct llog_ctxt *ctxt;
3516         int rc = 0, rc2 = 0;
3517         ENTRY;
3518
3519         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3520         if (ctxt)
3521                 rc = llog_cleanup(ctxt);
3522
3523         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3524         if (ctxt)
3525                 rc2 = llog_cleanup(ctxt);
3526         if (!rc)
3527                 rc = rc2;
3528
3529         RETURN(rc);
3530 }
3531
3532 static int osc_reconnect(struct obd_export *exp, struct obd_device *obd,
3533                          struct obd_uuid *cluuid,
3534                          struct obd_connect_data *data)
3535 {
3536         struct client_obd *cli = &obd->u.cli;
3537
3538         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3539                 long lost_grant;
3540
3541                 client_obd_list_lock(&cli->cl_loi_list_lock);
3542                 data->ocd_grant = cli->cl_avail_grant ?:
3543                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3544                 lost_grant = cli->cl_lost_grant;
3545                 cli->cl_lost_grant = 0;
3546                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3547
3548                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3549                        "cl_lost_grant: %ld\n", data->ocd_grant,
3550                        cli->cl_avail_grant, lost_grant);
3551                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3552                        " ocd_grant: %d\n", data->ocd_connect_flags,
3553                        data->ocd_version, data->ocd_grant);
3554         }
3555
3556         RETURN(0);
3557 }
3558
3559 static int osc_disconnect(struct obd_export *exp)
3560 {
3561         struct obd_device *obd = class_exp2obd(exp);
3562         struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3563         int rc;
3564
3565         if (obd->u.cli.cl_conn_count == 1)
3566                 /* flush any remaining cancel messages out to the target */
3567                 llog_sync(ctxt, exp);
3568
3569         rc = client_disconnect_export(exp);
3570         return rc;
3571 }
3572
3573 static int osc_import_event(struct obd_device *obd,
3574                             struct obd_import *imp,
3575                             enum obd_import_event event)
3576 {
3577         struct client_obd *cli;
3578         int rc = 0;
3579
3580         ENTRY;
3581         LASSERT(imp->imp_obd == obd);
3582
3583         switch (event) {
3584         case IMP_EVENT_DISCON: {
3585                 /* Only do this on the MDS OSC's */
3586                 if (imp->imp_server_timeout) {
3587                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3588
3589                         spin_lock(&oscc->oscc_lock);
3590                         oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
3591                         spin_unlock(&oscc->oscc_lock);
3592                 }
3593                 cli = &obd->u.cli;
3594                 client_obd_list_lock(&cli->cl_loi_list_lock);
3595                 cli->cl_avail_grant = 0;
3596                 cli->cl_lost_grant = 0;
3597                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3598                 break;
3599         }
3600         case IMP_EVENT_INACTIVE: {
3601                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3602                 break;
3603         }
3604         case IMP_EVENT_INVALIDATE: {
3605                 struct ldlm_namespace *ns = obd->obd_namespace;
3606
3607                 /* Reset grants */
3608                 cli = &obd->u.cli;
3609                 client_obd_list_lock(&cli->cl_loi_list_lock);
3610                 /* all pages go to failing rpcs due to the invalid import */
3611                 osc_check_rpcs(cli);
3612                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3613
3614                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3615
3616                 break;
3617         }
3618         case IMP_EVENT_ACTIVE: {
3619                 /* Only do this on the MDS OSC's */
3620                 if (imp->imp_server_timeout) {
3621                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3622
3623                         spin_lock(&oscc->oscc_lock);
3624                         oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3625                         spin_unlock(&oscc->oscc_lock);
3626                 }
3627                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3628                 break;
3629         }
3630         case IMP_EVENT_OCD: {
3631                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3632
3633                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3634                         osc_init_grant(&obd->u.cli, ocd);
3635
3636                 /* See bug 7198 */
3637                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3638                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3639
3640                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3641                 break;
3642         }
3643         default:
3644                 CERROR("Unknown import event %d\n", event);
3645                 LBUG();
3646         }
3647         RETURN(rc);
3648 }
3649
3650 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3651 {
3652         int rc;
3653         ENTRY;
3654
3655         ENTRY;
3656         rc = ptlrpcd_addref();
3657         if (rc)
3658                 RETURN(rc);
3659
3660         rc = client_obd_setup(obd, lcfg);
3661         if (rc) {
3662                 ptlrpcd_decref();
3663         } else {
3664                 struct lprocfs_static_vars lvars;
3665                 struct client_obd *cli = &obd->u.cli;
3666
3667                 lprocfs_init_vars(osc, &lvars);
3668                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3669                         lproc_osc_attach_seqstat(obd);
3670                         ptlrpc_lprocfs_register_obd(obd);
3671                 }
3672
3673                 oscc_init(obd);
3674                 /* We need to allocate a few requests more, because
3675                    brw_interpret_oap tries to create new requests before freeing
3676                    previous ones. Ideally we want to have 2x max_rpcs_in_flight
3677                    reserved, but I afraid that might be too much wasted RAM
3678                    in fact, so 2 is just my guess and still should work. */
3679                 cli->cl_import->imp_rq_pool =
3680                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3681                                             OST_MAXREQSIZE,
3682                                             ptlrpc_add_rqs_to_pool);
3683         }
3684
3685         RETURN(rc);
3686 }
3687
3688 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3689 {
3690         int rc = 0;
3691         ENTRY;
3692
3693         switch (stage) {
3694         case OBD_CLEANUP_EARLY: {
3695                 struct obd_import *imp;
3696                 imp = obd->u.cli.cl_import;
3697                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3698                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3699                 ptlrpc_deactivate_import(imp);
3700                 spin_lock(&imp->imp_lock);
3701                 imp->imp_pingable = 0;
3702                 spin_unlock(&imp->imp_lock);
3703                 break;
3704         }
3705         case OBD_CLEANUP_EXPORTS: {
3706                 /* If we set up but never connected, the
3707                    client import will not have been cleaned. */
3708                 if (obd->u.cli.cl_import) {
3709                         struct obd_import *imp;
3710                         imp = obd->u.cli.cl_import;
3711                         CDEBUG(D_CONFIG, "%s: client import never connected\n",
3712                                obd->obd_name);
3713                         ptlrpc_invalidate_import(imp);
3714                         ptlrpc_free_rq_pool(imp->imp_rq_pool);
3715                         class_destroy_import(imp);
3716                         obd->u.cli.cl_import = NULL;
3717                 }
3718                 break;
3719         }
3720         case OBD_CLEANUP_SELF_EXP:
3721                 rc = obd_llog_finish(obd, 0);
3722                 if (rc != 0)
3723                         CERROR("failed to cleanup llogging subsystems\n");
3724                 break;
3725         case OBD_CLEANUP_OBD:
3726                 break;
3727         }
3728         RETURN(rc);
3729 }
3730
3731 int osc_cleanup(struct obd_device *obd)
3732 {
3733         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3734         int rc;
3735
3736         ENTRY;
3737         ptlrpc_lprocfs_unregister_obd(obd);
3738         lprocfs_obd_cleanup(obd);
3739
3740         spin_lock(&oscc->oscc_lock);
3741         oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
3742         oscc->oscc_flags |= OSCC_FLAG_EXITING;
3743         spin_unlock(&oscc->oscc_lock);
3744
3745         /* free memory of osc quota cache */
3746         lquota_cleanup(quota_interface, obd);
3747
3748         rc = client_obd_cleanup(obd);
3749
3750         ptlrpcd_decref();
3751         RETURN(rc);
3752 }
3753
3754 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3755 {
3756         struct lustre_cfg *lcfg = buf;
3757         struct lprocfs_static_vars lvars;
3758         int rc = 0;
3759
3760         lprocfs_init_vars(osc, &lvars);
3761
3762         rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars, lcfg, obd);
3763         return(rc);
3764 }
3765
3766 struct obd_ops osc_obd_ops = {
3767         .o_owner                = THIS_MODULE,
3768         .o_setup                = osc_setup,
3769         .o_precleanup           = osc_precleanup,
3770         .o_cleanup              = osc_cleanup,
3771         .o_add_conn             = client_import_add_conn,
3772         .o_del_conn             = client_import_del_conn,
3773         .o_connect              = client_connect_import,
3774         .o_reconnect            = osc_reconnect,
3775         .o_disconnect           = osc_disconnect,
3776         .o_statfs               = osc_statfs,
3777         .o_statfs_async         = osc_statfs_async,
3778         .o_packmd               = osc_packmd,
3779         .o_unpackmd             = osc_unpackmd,
3780         .o_precreate            = osc_precreate,
3781         .o_create               = osc_create,
3782         .o_destroy              = osc_destroy,
3783         .o_getattr              = osc_getattr,
3784         .o_getattr_async        = osc_getattr_async,
3785         .o_setattr              = osc_setattr,
3786         .o_setattr_async        = osc_setattr_async,
3787         .o_brw                  = osc_brw,
3788         .o_brw_async            = osc_brw_async,
3789         .o_prep_async_page      = osc_prep_async_page,
3790         .o_queue_async_io       = osc_queue_async_io,
3791         .o_set_async_flags      = osc_set_async_flags,
3792         .o_queue_group_io       = osc_queue_group_io,
3793         .o_trigger_group_io     = osc_trigger_group_io,
3794         .o_teardown_async_page  = osc_teardown_async_page,
3795         .o_punch                = osc_punch,
3796         .o_sync                 = osc_sync,
3797         .o_enqueue              = osc_enqueue,
3798         .o_match                = osc_match,
3799         .o_change_cbdata        = osc_change_cbdata,
3800         .o_cancel               = osc_cancel,
3801         .o_cancel_unused        = osc_cancel_unused,
3802         .o_join_lru             = osc_join_lru,
3803         .o_iocontrol            = osc_iocontrol,
3804         .o_get_info             = osc_get_info,
3805         .o_set_info_async       = osc_set_info_async,
3806         .o_import_event         = osc_import_event,
3807         .o_llog_init            = osc_llog_init,
3808         .o_llog_finish          = osc_llog_finish,
3809         .o_process_config       = osc_process_config,
3810 };
3811 int __init osc_init(void)
3812 {
3813         struct lprocfs_static_vars lvars;
3814         int rc;
3815         ENTRY;
3816
3817         lprocfs_init_vars(osc, &lvars);
3818
3819         request_module("lquota");
3820         quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
3821         lquota_init(quota_interface);
3822         init_obd_quota_ops(quota_interface, &osc_obd_ops);
3823
3824         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3825                                  LUSTRE_OSC_NAME, NULL);
3826         if (rc) {
3827                 if (quota_interface)
3828                         PORTAL_SYMBOL_PUT(osc_quota_interface);
3829                 RETURN(rc);
3830         }
3831
3832         RETURN(rc);
3833 }
3834
#ifdef __KERNEL__
/*
 * Module teardown for the OSC (kernel builds only).
 *
 * Shuts the quota layer down via lquota_exit(), drops the reference on
 * osc_quota_interface if one was obtained, and then unregisters the
 * LUSTRE_OSC_NAME obd type.
 */
static void /*__exit*/ osc_exit(void)
{
        lquota_exit(quota_interface);
        if (quota_interface != NULL) {
                PORTAL_SYMBOL_PUT(osc_quota_interface);
        }

        class_unregister_type(LUSTRE_OSC_NAME);
}

/* Module metadata. */
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");

/* Hands osc_init/osc_exit to cfs_module() as the module's init and exit
 * routines. */
cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
#endif /* __KERNEL__ */