Whamcloud - gitweb
8404c52c64b8294dcd042877b62d43a6de60aeb3
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author Peter Braam <braam@clusterfs.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  *   You may have signed or agreed to another license before downloading
11  *   this software.  If so, you are bound by the terms and conditions
12  *   of that agreement, and the following does not apply to you.  See the
13  *   LICENSE file included with this distribution for more information.
14  *
15  *   If you did not agree to a different license, then this copy of Lustre
16  *   is open source software; you can redistribute it and/or modify it
17  *   under the terms of version 2 of the GNU General Public License as
18  *   published by the Free Software Foundation.
19  *
20  *   In either case, Lustre is distributed in the hope that it will be
21  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23  *   license text for more details.
24  *
25  *  For testing and management it is treated as an obd_device,
26  *  although * it does not export a full OBD method table (the
27  *  requests are coming * in over the wire, so object target modules
28  *  do not have a full * method table.)
29  *
30  */
31
32 #ifndef EXPORT_SYMTAB
33 # define EXPORT_SYMTAB
34 #endif
35 #define DEBUG_SUBSYSTEM S_OSC
36
37 #ifdef __KERNEL__
38 # include <libcfs/libcfs.h>
39 #else /* __KERNEL__ */
40 # include <liblustre.h>
41 #endif
42
43 #include <lustre_dlm.h>
44 #include <libcfs/kp30.h>
45 #include <lustre_net.h>
46 #include <lustre/lustre_user.h>
47 #include <obd_ost.h>
48 #include <obd_lov.h>
49
50 #ifdef  __CYGWIN__
51 # include <ctype.h>
52 #endif
53
54 #include <lustre_ha.h>
55 #include <lprocfs_status.h>
56 #include <lustre_log.h>
57 #include <lustre_debug.h>
58 #include <lustre_param.h>
59 #include "osc_internal.h"
60
61 static quota_interface_t *quota_interface = NULL;
62 extern quota_interface_t osc_quota_interface;
63
64 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
65
66 /* Pack OSC object metadata for disk storage (LE byte order). */
67 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
68                       struct lov_stripe_md *lsm)
69 {
70         int lmm_size;
71         ENTRY;
72
73         lmm_size = sizeof(**lmmp);
74         if (!lmmp)
75                 RETURN(lmm_size);
76
77         if (*lmmp && !lsm) {
78                 OBD_FREE(*lmmp, lmm_size);
79                 *lmmp = NULL;
80                 RETURN(0);
81         }
82
83         if (!*lmmp) {
84                 OBD_ALLOC(*lmmp, lmm_size);
85                 if (!*lmmp)
86                         RETURN(-ENOMEM);
87         }
88
89         if (lsm) {
90                 LASSERT(lsm->lsm_object_id);
91                 LASSERT(lsm->lsm_object_gr);
92                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
93                 (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
94         }
95
96         RETURN(lmm_size);
97 }
98
99 /* Unpack OSC object metadata from disk storage (LE byte order). */
100 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
101                         struct lov_mds_md *lmm, int lmm_bytes)
102 {
103         int lsm_size;
104         ENTRY;
105
106         if (lmm != NULL) {
107                 if (lmm_bytes < sizeof (*lmm)) {
108                         CERROR("lov_mds_md too small: %d, need %d\n",
109                                lmm_bytes, (int)sizeof(*lmm));
110                         RETURN(-EINVAL);
111                 }
112                 /* XXX LOV_MAGIC etc check? */
113
114                 if (lmm->lmm_object_id == 0) {
115                         CERROR("lov_mds_md: zero lmm_object_id\n");
116                         RETURN(-EINVAL);
117                 }
118         }
119
120         lsm_size = lov_stripe_md_size(1);
121         if (lsmp == NULL)
122                 RETURN(lsm_size);
123
124         if (*lsmp != NULL && lmm == NULL) {
125                 OBD_FREE(*lsmp, lsm_size);
126                 *lsmp = NULL;
127                 RETURN(0);
128         }
129
130         if (*lsmp == NULL) {
131                 OBD_ALLOC(*lsmp, lsm_size);
132                 if (*lsmp == NULL)
133                         RETURN(-ENOMEM);
134                 loi_init((*lsmp)->lsm_oinfo);
135         }
136
137         if (lmm != NULL) {
138                 /* XXX zero *lsmp? */
139                 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
140                 (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
141                 LASSERT((*lsmp)->lsm_object_id);
142                 LASSERT((*lsmp)->lsm_object_gr);
143         }
144
145         (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
146
147         RETURN(lsm_size);
148 }
149
150 static inline void osc_pack_capa(struct ptlrpc_request *req, int offset,
151                                  struct ost_body *body, void *capa)
152 {
153         struct obd_capa *oc = (struct obd_capa *)capa;
154         struct lustre_capa *c;
155
156         if (!capa)
157                 return;
158
159         c = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*c));
160         LASSERT(c);
161         capa_cpy(c, oc);
162         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
163         DEBUG_CAPA(D_SEC, c, "pack");
164 }
165
166 static inline void osc_pack_req_body(struct ptlrpc_request *req, int offset,
167                                      struct obd_info *oinfo)
168 {
169         struct ost_body *body;
170
171         body = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*body));
172         body->oa = *oinfo->oi_oa;
173         osc_pack_capa(req, offset + 1, body, oinfo->oi_capa);
174 }
175
176 static int osc_getattr_interpret(struct ptlrpc_request *req,
177                                  struct osc_async_args *aa, int rc)
178 {
179         struct ost_body *body;
180         ENTRY;
181
182         if (rc != 0)
183                 GOTO(out, rc);
184
185         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
186                                   lustre_swab_ost_body);
187         if (body) {
188                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
189                 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
190
191                 /* This should really be sent by the OST */
192                 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
193                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
194         } else {
195                 CERROR("can't unpack ost_body\n");
196                 rc = -EPROTO;
197                 aa->aa_oi->oi_oa->o_valid = 0;
198         }
199 out:
200         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
201         RETURN(rc);
202 }
203
204 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
205                              struct ptlrpc_request_set *set)
206 {
207         struct ptlrpc_request *req;
208         struct ost_body *body;
209         int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
210         struct osc_async_args *aa;
211         ENTRY;
212
213         size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
214         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
215                               OST_GETATTR, 3, size,NULL);
216         if (!req)
217                 RETURN(-ENOMEM);
218
219         osc_pack_req_body(req, REQ_REC_OFF, oinfo);
220
221         ptlrpc_req_set_repsize(req, 2, size);
222         req->rq_interpret_reply = osc_getattr_interpret;
223
224         LASSERT (sizeof (*aa) <= sizeof (req->rq_async_args));
225         aa = (struct osc_async_args *)&req->rq_async_args;
226         aa->aa_oi = oinfo;
227
228         ptlrpc_set_add_req(set, req);
229         RETURN (0);
230 }
231
232 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
233 {
234         struct ptlrpc_request *req;
235         struct ost_body *body;
236         int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
237         ENTRY;
238
239         size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
240         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
241                               OST_GETATTR, 3, size, NULL);
242         if (!req)
243                 RETURN(-ENOMEM);
244
245         osc_pack_req_body(req, REQ_REC_OFF, oinfo);
246
247         ptlrpc_req_set_repsize(req, 2, size);
248
249         rc = ptlrpc_queue_wait(req);
250         if (rc) {
251                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
252                 GOTO(out, rc);
253         }
254
255         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
256                                   lustre_swab_ost_body);
257         if (body == NULL) {
258                 CERROR ("can't unpack ost_body\n");
259                 GOTO (out, rc = -EPROTO);
260         }
261
262         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
263         memcpy(oinfo->oi_oa, &body->oa, sizeof(*oinfo->oi_oa));
264
265         /* This should really be sent by the OST */
266         oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
267         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
268
269         EXIT;
270  out:
271         ptlrpc_req_finished(req);
272         return rc;
273 }
274
275 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
276                        struct obd_trans_info *oti)
277 {
278         struct ptlrpc_request *req;
279         struct ost_body *body;
280         int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
281         ENTRY;
282
283         LASSERT(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) || 
284                                         oinfo->oi_oa->o_gr > 0);
285         size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
286         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
287                               OST_SETATTR, 3, size, NULL);
288         if (!req)
289                 RETURN(-ENOMEM);
290
291         osc_pack_req_body(req, REQ_REC_OFF, oinfo);
292
293         ptlrpc_req_set_repsize(req, 2, size);
294
295         rc = ptlrpc_queue_wait(req);
296         if (rc)
297                 GOTO(out, rc);
298
299         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
300                                   lustre_swab_ost_body);
301         if (body == NULL)
302                 GOTO(out, rc = -EPROTO);
303
304         memcpy(oinfo->oi_oa, &body->oa, sizeof(*oinfo->oi_oa));
305
306         EXIT;
307 out:
308         ptlrpc_req_finished(req);
309         RETURN(rc);
310 }
311
312 static int osc_setattr_interpret(struct ptlrpc_request *req,
313                                  struct osc_async_args *aa, int rc)
314 {
315         struct ost_body *body;
316         ENTRY;
317
318         if (rc != 0)
319                 GOTO(out, rc);
320
321         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
322                                   lustre_swab_ost_body);
323         if (body == NULL) {
324                 CERROR("can't unpack ost_body\n");
325                 GOTO(out, rc = -EPROTO);
326         }
327
328         memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
329 out:
330         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
331         RETURN(rc);
332 }
333
334 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
335                              struct obd_trans_info *oti,
336                              struct ptlrpc_request_set *rqset)
337 {
338         struct ptlrpc_request *req;
339         struct ost_body *body;
340         int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
341         struct osc_async_args *aa;
342         ENTRY;
343
344         size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
345         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
346                               OST_SETATTR, 3, size, NULL);
347         if (!req)
348                 RETURN(-ENOMEM);
349
350         osc_pack_req_body(req, REQ_REC_OFF, oinfo);
351         if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
352                 LASSERT(oti);
353                 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF,
354                                       sizeof(*body));
355                 memcpy(obdo_logcookie(oinfo->oi_oa), oti->oti_logcookies,
356                        sizeof(*oti->oti_logcookies));
357         }
358
359         ptlrpc_req_set_repsize(req, 2, size);
360         /* do mds to ost setattr asynchronouly */
361         if (!rqset) {
362                 /* Do not wait for response. */
363                 ptlrpcd_add_req(req);
364         } else {
365                 req->rq_interpret_reply = osc_setattr_interpret;
366
367                 LASSERT (sizeof (*aa) <= sizeof (req->rq_async_args));
368                 aa = (struct osc_async_args *)&req->rq_async_args;
369                 aa->aa_oi = oinfo;
370
371                 ptlrpc_set_add_req(rqset, req);
372         }
373
374         RETURN(0);
375 }
376
/* Synchronously create an object on the OST.  On success *ea points at a
 * stripe md whose object id/group are filled in from the reply; @oa is
 * updated with the server's obdo.  If the caller passed *ea == NULL a
 * single-stripe md is allocated here (and freed again on failure). */
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        struct lov_stripe_md *lsm;
        int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        /* Allocate a local md if the caller didn't supply one; ownership
         * transfers to the caller via *ea only on success. */
        lsm = *ea;
        if (!lsm) {
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_CREATE, 2, size, NULL);
        if (!req)
                GOTO(out, rc = -ENOMEM);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        body->oa = *oa;

        ptlrpc_req_set_repsize(req, 2, size);
        /* FLINLINE marks the MDS's orphan-deletion path during OST
         * integration; such requests must not be replayed or delayed. */
        if (oa->o_valid & OBD_MD_FLINLINE) {
                LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
                        oa->o_flags == OBD_FL_DELORPHAN);
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR ("can't unpack ost_body\n");
                GOTO (out_req, rc = -EPROTO);
        }

        memcpy(oa, &body->oa, sizeof(*oa));

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_object_id = oa->o_id;
        lsm->lsm_object_gr = oa->o_gr;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                /* Return the llog cancel cookie (from the reply obdo) to the
                 * caller, allocating cookie space if it didn't provide any. */
                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        memcpy(oti->oti_logcookies, obdo_logcookie(oa),
                               sizeof(oti->oti_onecookie));
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
        EXIT;
out_req:
        ptlrpc_req_finished(req);
out:
        /* On failure, free the md only if it was allocated here
         * (*ea still NULL means the caller never owned it). */
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        return rc;
}
460
461 static int osc_punch_interpret(struct ptlrpc_request *req,
462                                struct osc_async_args *aa, int rc)
463 {
464         struct ost_body *body;
465         ENTRY;
466
467         if (rc != 0)
468                 GOTO(out, rc);
469
470         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof (*body),
471                                   lustre_swab_ost_body);
472         if (body == NULL) {
473                 CERROR ("can't unpack ost_body\n");
474                 GOTO(out, rc = -EPROTO);
475         }
476
477         memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
478 out:
479         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
480         RETURN(rc);
481 }
482
483 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
484                      struct obd_trans_info *oti,
485                      struct ptlrpc_request_set *rqset)
486 {
487         struct ptlrpc_request *req;
488         struct osc_async_args *aa;
489         struct ost_body *body;
490         int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
491         ENTRY;
492
493         if (!oinfo->oi_oa) {
494                 CERROR("oa NULL\n");
495                 RETURN(-EINVAL);
496         }
497
498         size[REQ_REC_OFF + 1] = oinfo->oi_capa? sizeof(struct lustre_capa) : 0;
499         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
500                               OST_PUNCH, 3, size, NULL);
501         if (!req)
502                 RETURN(-ENOMEM);
503
504         /* FIXME bug 249. Also see bug 7198 */
505         if (class_exp2cliimp(exp)->imp_connect_data.ocd_connect_flags &
506             OBD_CONNECT_REQPORTAL)
507                 req->rq_request_portal = OST_IO_PORTAL;
508
509         osc_pack_req_body(req, REQ_REC_OFF, oinfo);
510         /* overload the size and blocks fields in the oa with start/end */
511         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
512         body->oa.o_size = oinfo->oi_policy.l_extent.start;
513         body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
514         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
515
516         ptlrpc_req_set_repsize(req, 2, size);
517
518         req->rq_interpret_reply = osc_punch_interpret;
519         LASSERT (sizeof (*aa) <= sizeof (req->rq_async_args));
520         aa = (struct osc_async_args *)&req->rq_async_args;
521         aa->aa_oi = oinfo;
522         ptlrpc_set_add_req(rqset, req);
523
524         RETURN(0);
525 }
526
527 static int osc_sync(struct obd_export *exp, struct obdo *oa,
528                     struct lov_stripe_md *md, obd_size start, obd_size end,
529                     void *capa)
530 {
531         struct ptlrpc_request *req;
532         struct ost_body *body;
533         int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
534         ENTRY;
535
536         if (!oa) {
537                 CERROR("oa NULL\n");
538                 RETURN(-EINVAL);
539         }
540
541         size[REQ_REC_OFF + 1] = capa ? sizeof(struct lustre_capa) : 0;
542
543         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
544                               OST_SYNC, 3, size, NULL);
545         if (!req)
546                 RETURN(-ENOMEM);
547
548         /* overload the size and blocks fields in the oa with start/end */
549         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
550         body->oa = *oa;
551         body->oa.o_size = start;
552         body->oa.o_blocks = end;
553         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
554
555         osc_pack_capa(req, REQ_REC_OFF + 1, body, capa);
556
557         ptlrpc_req_set_repsize(req, 2, size);
558
559         rc = ptlrpc_queue_wait(req);
560         if (rc)
561                 GOTO(out, rc);
562
563         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
564                                   lustre_swab_ost_body);
565         if (body == NULL) {
566                 CERROR ("can't unpack ost_body\n");
567                 GOTO (out, rc = -EPROTO);
568         }
569
570         memcpy(oa, &body->oa, sizeof(*oa));
571
572         EXIT;
573  out:
574         ptlrpc_req_finished(req);
575         return rc;
576 }
577
578 /* Destroy requests can be async always on the client, and we don't even really
579  * care about the return code since the client cannot do anything at all about
580  * a destroy failure.
581  * When the MDS is unlinking a filename, it saves the file objects into a
582  * recovery llog, and these object records are cancelled when the OST reports
583  * they were destroyed and sync'd to disk (i.e. transaction committed).
584  * If the client dies, or the OST is down when the object should be destroyed,
585  * the records are not cancelled, and when the OST reconnects to the MDS next,
586  * it will retrieve the llog unlink logs and then sends the log cancellation
587  * cookies to the MDS after committing destroy transactions. */
588 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
589                        struct lov_stripe_md *ea, struct obd_trans_info *oti,
590                        struct obd_export *md_export)
591 {
592         struct ptlrpc_request *req;
593         struct ost_body *body;
594         int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
595         ENTRY;
596
597         if (!oa) {
598                 CERROR("oa NULL\n");
599                 RETURN(-EINVAL);
600         }
601
602         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
603                               OST_DESTROY, 2, size, NULL);
604         if (!req)
605                 RETURN(-ENOMEM);
606
607         /* FIXME bug 249. Also see bug 7198 */
608         if (class_exp2cliimp(exp)->imp_connect_data.ocd_connect_flags &
609             OBD_CONNECT_REQPORTAL)
610                 req->rq_request_portal = OST_IO_PORTAL;
611
612         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
613         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
614                 memcpy(obdo_logcookie(oa), oti->oti_logcookies,
615                        sizeof(*oti->oti_logcookies));
616         body->oa = *oa;
617
618         ptlrpc_req_set_repsize(req, 2, size);
619
620         ptlrpcd_add_req(req);
621         RETURN(0);
622 }
623
/* Fill the dirty/grant accounting fields of @oa so the OST can see this
 * client's cache state; takes the loi_list_lock itself.  @writing_bytes is
 * currently unused here. */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        /* Caller must not have filled these fields already. */
        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (cli->cl_dirty > cli->cl_dirty_max) {
                /* Accounting went wrong; report no headroom rather than a
                 * bogus (negative) value. */
                CERROR("dirty %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                /* Headroom wouldn't fit in 31 bits (o_undirty is logged with
                 * %u below — presumably a 32-bit wire field; verify). */
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
                                (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        /* Report current grant and any grant lost (e.g. over eviction);
         * lost grant is cleared once it has been reported. */
        oa->o_grant = cli->cl_avail_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}
654
655 /* caller must hold loi_list_lock */
656 static void osc_consume_write_grant(struct client_obd *cli,
657                                     struct osc_async_page *oap)
658 {
659         cli->cl_dirty += CFS_PAGE_SIZE;
660         cli->cl_avail_grant -= CFS_PAGE_SIZE;
661         oap->oap_brw_flags |= OBD_BRW_FROM_GRANT;
662         CDEBUG(D_CACHE, "using %lu grant credits for oap %p\n", CFS_PAGE_SIZE, oap);
663         LASSERT(cli->cl_avail_grant >= 0);
664 }
665
666 static unsigned long rpcs_in_flight(struct client_obd *cli)
667 {
668         return cli->cl_r_in_flight + cli->cl_w_in_flight;
669 }
670
/* caller must hold loi_list_lock */
/* Wake threads blocked waiting for cache space/grant.  Each waiter is
 * either given grant (osc_consume_write_grant) or told to fall back to
 * sync IO (-EDQUOT) when no grant can be expected. */
void osc_wake_cache_waiters(struct client_obd *cli)
{
        struct list_head *l, *tmp;
        struct osc_cache_waiter *ocw;

        ENTRY;
        list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                /* if we can't dirty more, we must wait until some is written */
                if (cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld max %ld\n",
                               cli->cl_dirty, cli->cl_dirty_max);
                        return;
                }

                /* if still dirty cache but no grant wait for pending RPCs that
                 * may yet return us some grant before doing sync writes */
                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
                               cli->cl_w_in_flight);
                        return;
                }

                ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
                list_del_init(&ocw->ocw_entry);
                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        /* no more RPCs in flight to return grant, do sync IO */
                        ocw->ocw_rc = -EDQUOT;
                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
                } else {
                        osc_consume_write_grant(cli, ocw->ocw_oap);
                }

                /* Waiter sleeps on ocw_waitq; it reads ocw_rc after waking. */
                cfs_waitq_signal(&ocw->ocw_waitq);
        }

        EXIT;
}
709
/* Seed the client's available grant from the amount the server handed us
 * in the connect reply. */
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
               cli->cl_avail_grant, cli->cl_lost_grant);
        LASSERT(cli->cl_avail_grant >= 0);
}
720
/* Fold the extra grant returned in a BRW reply into the client's total. */
static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
        cli->cl_avail_grant += body->oa.o_grant;
        /* waiters are woken in brw_interpret_oap */
        client_obd_list_unlock(&cli->cl_loi_list_lock);
}
729
730 /* We assume that the reason this OSC got a short read is because it read
731  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
732  * via the LOV, and it _knows_ it's reading inside the file, it's just that
733  * this stripe never got written at or beyond this stripe offset yet. */
734 static void handle_short_read(int nob_read, obd_count page_count,
735                               struct brw_page **pga)
736 {
737         char *ptr;
738         int i = 0;
739
740         /* skip bytes read OK */
741         while (nob_read > 0) {
742                 LASSERT (page_count > 0);
743
744                 if (pga[i]->count > nob_read) {
745                         /* EOF inside this page */
746                         ptr = cfs_kmap(pga[i]->pg) + 
747                                 (pga[i]->off & ~CFS_PAGE_MASK);
748                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
749                         cfs_kunmap(pga[i]->pg);
750                         page_count--;
751                         i++;
752                         break;
753                 }
754
755                 nob_read -= pga[i]->count;
756                 page_count--;
757                 i++;
758         }
759
760         /* zero remaining pages */
761         while (page_count-- > 0) {
762                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
763                 memset(ptr, 0, pga[i]->count);
764                 cfs_kunmap(pga[i]->pg);
765                 i++;
766         }
767 }
768
769 static int check_write_rcs(struct ptlrpc_request *req,
770                            int requested_nob, int niocount,
771                            obd_count page_count, struct brw_page **pga)
772 {
773         int    *remote_rcs, i;
774
775         /* return error if any niobuf was in error */
776         remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
777                                         sizeof(*remote_rcs) * niocount, NULL);
778         if (remote_rcs == NULL) {
779                 CERROR("Missing/short RC vector on BRW_WRITE reply\n");
780                 return(-EPROTO);
781         }
782         if (lustre_msg_swabbed(req->rq_repmsg))
783                 for (i = 0; i < niocount; i++)
784                         __swab32s(&remote_rcs[i]);
785
786         for (i = 0; i < niocount; i++) {
787                 if (remote_rcs[i] < 0)
788                         return(remote_rcs[i]);
789
790                 if (remote_rcs[i] != 0) {
791                         CERROR("rc[%d] invalid (%d) req %p\n",
792                                 i, remote_rcs[i], req);
793                         return(-EPROTO);
794                 }
795         }
796
797         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
798                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
799                        requested_nob, req->rq_bulk->bd_nob_transferred);
800                 return(-EPROTO);
801         }
802
803         return (0);
804 }
805
806 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
807 {
808         if (p1->flag != p2->flag) {
809                 unsigned mask = ~OBD_BRW_FROM_GRANT;
810
811                 /* warn if we try to combine flags that we don't know to be
812                  * safe to combine */
813                 if ((p1->flag & mask) != (p2->flag & mask))
814                         CERROR("is it ok to have flags 0x%x and 0x%x in the "
815                                "same brw?\n", p1->flag, p2->flag);
816                 return 0;
817         }
818
819         return (p1->off + p1->count == p2->off);
820 }
821
822 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
823                                    struct brw_page **pga)
824 {
825         __u32 cksum = ~0;
826         int i = 0;
827
828         LASSERT (pg_count > 0);
829         while (nob > 0 && pg_count > 0) {
830                 char *ptr = cfs_kmap(pga[i]->pg);
831                 int off = pga[i]->off & ~CFS_PAGE_MASK;
832                 int count = pga[i]->count > nob ? nob : pga[i]->count;
833
834                 cksum = crc32_le(cksum, ptr + off, count);
835                 cfs_kunmap(pga[i]->pg);
836                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
837                                off, cksum);
838
839                 nob -= pga[i]->count;
840                 pg_count--;
841                 i++;
842         }
843
844         return cksum;
845 }
846
847 static int osc_brw_prep_request(int cmd, struct obd_import *imp,struct obdo *oa,
848                                 struct lov_stripe_md *lsm, obd_count page_count,
849                                 struct brw_page **pga, int *requested_nobp,
850                                 int *niocountp, struct ptlrpc_request **reqp,
851                                 struct obd_capa *ocapa)
852 {
853         struct ptlrpc_request   *req;
854         struct ptlrpc_bulk_desc *desc;
855         struct client_obd       *cli = &imp->imp_obd->u.cli;
856         struct ost_body         *body;
857         struct obd_ioobj        *ioobj;
858         struct niobuf_remote    *niobuf;
859         int size[5] = { sizeof(struct ptlrpc_body), sizeof(*body) };
860         int niocount, i, requested_nob, opc, rc;
861         struct ptlrpc_request_pool *pool;
862         struct lustre_capa      *capa;
863
864         ENTRY;
865         opc = ((cmd & OBD_BRW_WRITE) != 0) ? OST_WRITE : OST_READ;
866         pool = ((cmd & OBD_BRW_WRITE) != 0) ? imp->imp_rq_pool : NULL;
867
868         for (niocount = i = 1; i < page_count; i++) {
869                 if (!can_merge_pages(pga[i - 1], pga[i]))
870                         niocount++;
871         }
872
873         size[REQ_REC_OFF + 1] = sizeof(*ioobj);
874         size[REQ_REC_OFF + 2] = niocount * sizeof(*niobuf);
875         if (ocapa)
876                 size[REQ_REC_OFF + 3] = sizeof(*capa);
877
878         OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM);
879         req = ptlrpc_prep_req_pool(imp, LUSTRE_OST_VERSION, opc, 5, size, NULL,
880                                    pool, NULL);
881         if (req == NULL)
882                 RETURN (-ENOMEM);
883
884         /* FIXME bug 249. Also see bug 7198 */
885         if (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_REQPORTAL)
886                 req->rq_request_portal = OST_IO_PORTAL;
887
888         if (opc == OST_WRITE)
889                 desc = ptlrpc_prep_bulk_imp (req, page_count,
890                                              BULK_GET_SOURCE, OST_BULK_PORTAL);
891         else
892                 desc = ptlrpc_prep_bulk_imp (req, page_count,
893                                              BULK_PUT_SINK, OST_BULK_PORTAL);
894         if (desc == NULL)
895                 GOTO(out, rc = -ENOMEM);
896         /* NB request now owns desc and will free it when it gets freed */
897
898         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
899         ioobj = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioobj));
900         niobuf = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
901                                 niocount * sizeof(*niobuf));
902
903         body->oa = *oa;
904
905         obdo_to_ioobj(oa, ioobj);
906         ioobj->ioo_bufcnt = niocount;
907         if (ocapa) {
908                 capa = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 3,
909                                       sizeof(*capa));
910                 capa_cpy(capa, ocapa);
911                 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
912         }
913
914         LASSERT (page_count > 0);
915         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
916                 struct brw_page *pg = pga[i];
917                 struct brw_page *pg_prev = pga[i - 1];
918
919                 LASSERT(pg->count > 0);
920                 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
921                          "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
922                          pg->off, pg->count);
923 #ifdef __LINUX__
924                 LASSERTF(i == 0 || pg->off > pg_prev->off,
925                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
926                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
927                          i, page_count,
928                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
929                          pg_prev->pg, page_private(pg_prev->pg),
930                          pg_prev->pg->index, pg_prev->off);
931 #else
932                 LASSERTF(i == 0 || pg->off > pg_prev->off,
933                          "i %d p_c %u\n", i, page_count);
934 #endif
935                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
936                         (pg->flag & OBD_BRW_SRVLOCK));
937
938                 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
939                                       pg->count);
940                 requested_nob += pg->count;
941
942                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
943                         niobuf--;
944                         niobuf->len += pg->count;
945                 } else {
946                         niobuf->offset = pg->off;
947                         niobuf->len    = pg->count;
948                         niobuf->flags  = pg->flag;
949                 }
950         }
951
952         LASSERT((void *)(niobuf - niocount) ==
953                 lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
954                                niocount * sizeof(*niobuf)));
955         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
956
957         /* size[REQ_REC_OFF] still sizeof (*body) */
958         if (opc == OST_WRITE) {
959                 if (unlikely(cli->cl_checksum)) {
960                         body->oa.o_valid |= OBD_MD_FLCKSUM;
961                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
962                                                              page_count, pga);
963                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
964                                body->oa.o_cksum);
965                         /* save this in 'oa', too, for later checking */
966                         oa->o_valid |= OBD_MD_FLCKSUM;
967                         oa->o_cksum = body->oa.o_cksum;
968                 }
969                 /* 1 RC per niobuf */
970                 size[REPLY_REC_OFF + 1] = sizeof(__u32) * niocount;
971                 ptlrpc_req_set_repsize(req, 3, size);
972         } else {
973                 if (unlikely(cli->cl_checksum))
974                         body->oa.o_valid |= OBD_MD_FLCKSUM;
975                 /* 1 RC for the whole I/O */
976                 ptlrpc_req_set_repsize(req, 2, size);
977         }
978
979         *niocountp = niocount;
980         *requested_nobp = requested_nob;
981         *reqp = req;
982         RETURN (0);
983
984  out:
985         ptlrpc_req_finished (req);
986         RETURN (rc);
987 }
988
989 static void check_write_csum(__u32 cli, __u32 srv, int requested_nob,
990                              obd_count page_count, struct brw_page **pga)
991 {
992         __u32 new_csum;
993
994         if (srv == cli) {
995                 CDEBUG(D_PAGE, "checksum %x confirmed\n", cli);
996                 return;
997         }
998
999         new_csum = osc_checksum_bulk(requested_nob, page_count, pga);
1000
1001         if (new_csum == srv) {
1002                 CERROR("BAD CHECKSUM (WRITE): pages were mutated on the client"
1003                        "after we checksummed them (original client csum:"
1004                        " %x; server csum: %x; client csum now: %x)\n",
1005                        cli, srv, new_csum);
1006                 return;
1007         }
1008
1009         if (new_csum == cli) {
1010                 CERROR("BAD CHECKSUM (WRITE): pages were mutated in transit "
1011                        "(original client csum: %x; server csum: %x; client "
1012                        "csum now: %x)\n", cli, srv, new_csum);
1013                 return;
1014         }
1015
1016         CERROR("BAD CHECKSUM (WRITE): pages were mutated in transit, and the "
1017                "current page contents don't match the originals OR what the "
1018                "server received (original client csum: %x; server csum: %x; "
1019                "client csum now: %x)\n", cli, srv, new_csum);
1020 }
1021
1022 static int osc_brw_fini_request(struct ptlrpc_request *req, struct obdo *oa,
1023                                 int requested_nob, int niocount,
1024                                 obd_count page_count, struct brw_page **pga,
1025                                 int rc)
1026 {
1027         const lnet_process_id_t *peer =
1028                         &req->rq_import->imp_connection->c_peer;
1029         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
1030         struct ost_body *body;
1031         __u32 client_cksum = 0;
1032         ENTRY;
1033
1034         if (rc < 0 && rc != -EDQUOT)
1035                 RETURN(rc);
1036
1037         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1038         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1039                                   lustre_swab_ost_body);
1040         if (body == NULL) {
1041                 CERROR ("Can't unpack body\n");
1042                 RETURN(-EPROTO);
1043         }
1044
1045         /* set/clear over quota flag for a uid/gid */
1046         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1047             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1048                 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1049                              body->oa.o_gid, body->oa.o_valid,
1050                              body->oa.o_flags);
1051
1052         if (rc < 0)
1053                 RETURN(rc);
1054
1055         if (unlikely(oa->o_valid & OBD_MD_FLCKSUM))
1056                 client_cksum = oa->o_cksum; /* save for later */
1057
1058         osc_update_grant(cli, body);
1059         memcpy(oa, &body->oa, sizeof(*oa));
1060
1061         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1062                 if (rc > 0) {
1063                         CERROR ("Unexpected +ve rc %d\n", rc);
1064                         RETURN(-EPROTO);
1065                 }
1066                 LASSERT (req->rq_bulk->bd_nob == requested_nob);
1067
1068                 if (unlikely((oa->o_valid & OBD_MD_FLCKSUM) &&
1069                              client_cksum)) {
1070                         check_write_csum(client_cksum, oa->o_cksum,
1071                                          requested_nob, page_count, pga);
1072                 }
1073
1074                 sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk);
1075
1076                 RETURN(check_write_rcs(req, requested_nob, niocount,
1077                                        page_count, pga));
1078         }
1079
1080         /* The rest of this function executes only for OST_READs */
1081         if (rc > requested_nob) {
1082                 CERROR("Unexpected rc %d (%d requested)\n", rc, requested_nob);
1083                 RETURN(-EPROTO);
1084         }
1085
1086         if (rc != req->rq_bulk->bd_nob_transferred) {
1087                 CERROR ("Unexpected rc %d (%d transferred)\n",
1088                         rc, req->rq_bulk->bd_nob_transferred);
1089                 return (-EPROTO);
1090         }
1091
1092         if (rc < requested_nob)
1093                 handle_short_read(rc, page_count, pga);
1094
1095         if (unlikely(oa->o_valid & OBD_MD_FLCKSUM)) {
1096                 static int cksum_counter;
1097                 __u32 cksum = osc_checksum_bulk(rc, page_count, pga);
1098                 __u32 server_cksum = oa->o_cksum;
1099
1100                 if (server_cksum == ~0 && rc > 0) {
1101                         CERROR("Protocol error: server %s set the 'checksum' "
1102                                "bit, but didn't send a checksum.  Not fatal, "
1103                                "but please tell CFS.\n",
1104                                libcfs_nid2str(peer->nid));
1105                         RETURN(0);
1106                 }
1107
1108                 cksum_counter++;
1109
1110                 if (server_cksum != cksum) {
1111                         CERROR("Bad checksum from %s: server %x != client %x\n",
1112                                libcfs_nid2str(peer->nid), server_cksum, cksum);
1113                         cksum_counter = 0;
1114                         oa->o_cksum = cksum;
1115                 } else if ((cksum_counter & (-cksum_counter)) == cksum_counter){
1116                         CWARN("Checksum %u from %s OK: %x\n",
1117                               cksum_counter, libcfs_nid2str(peer->nid), cksum);
1118                 }
1119                 CDEBUG(D_PAGE, "checksum %x confirmed\n", cksum);
1120         } else if (unlikely(client_cksum)) {
1121                 static int cksum_missed;
1122
1123                 cksum_missed++;
1124                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1125                         CERROR("Checksum %u requested from %s but not sent\n",
1126                                cksum_missed, libcfs_nid2str(peer->nid));
1127         }
1128
1129         sptlrpc_cli_unwrap_bulk_read(req, rc, page_count, pga);
1130
1131         RETURN(0);
1132 }
1133
1134 static int osc_brw_internal(int cmd, struct obd_export *exp,struct obdo *oa,
1135                             struct lov_stripe_md *lsm,
1136                             obd_count page_count, struct brw_page **pga,
1137                             struct obd_capa *ocapa)
1138 {
1139         int                    requested_nob;
1140         int                    niocount;
1141         struct ptlrpc_request *req;
1142         int                    rc;
1143         ENTRY;
1144
1145 restart_bulk:
1146         rc = osc_brw_prep_request(cmd, class_exp2cliimp(exp), oa, lsm,
1147                                   page_count, pga, &requested_nob, &niocount,
1148                                   &req, ocapa);
1149         if (rc != 0)
1150                 return (rc);
1151
1152         rc = ptlrpc_queue_wait(req);
1153
1154         if (rc == -ETIMEDOUT && req->rq_resend) {
1155                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1156                 ptlrpc_req_finished(req);
1157                 goto restart_bulk;
1158         }
1159
1160         rc = osc_brw_fini_request(req, oa, requested_nob, niocount,
1161                                   page_count, pga, rc);
1162
1163         ptlrpc_req_finished(req);
1164         RETURN (rc);
1165 }
1166
1167 static int brw_interpret(struct ptlrpc_request *req,
1168                          struct osc_brw_async_args *aa, int rc)
1169 {
1170         struct obdo *oa      = aa->aa_oa;
1171         int requested_nob    = aa->aa_requested_nob;
1172         int niocount         = aa->aa_nio_count;
1173         obd_count page_count = aa->aa_page_count;
1174         struct brw_page **pga = aa->aa_ppga;
1175         ENTRY;
1176
1177         rc = osc_brw_fini_request(req, oa, requested_nob, niocount,
1178                                   page_count, pga, rc);
1179         osc_release_ppga(pga, page_count);
1180         RETURN (rc);
1181 }
1182
1183 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1184                           struct lov_stripe_md *lsm, obd_count page_count,
1185                           struct brw_page **pga, struct ptlrpc_request_set *set,
1186                           struct obd_capa *ocapa)
1187 {
1188         struct ptlrpc_request     *req;
1189         int                        requested_nob;
1190         int                        nio_count;
1191         struct osc_brw_async_args *aa;
1192         int                        rc;
1193         ENTRY;
1194
1195         /* Consume write credits even if doing a sync write -
1196          * otherwise we may run out of space on OST due to grant. */
1197         spin_lock(&exp->exp_obd->u.cli.cl_loi_list_lock);
1198         for (nio_count = 0; nio_count < page_count; nio_count++) {
1199                 if (exp->exp_obd->u.cli.cl_avail_grant >= PAGE_SIZE) {
1200                         exp->exp_obd->u.cli.cl_avail_grant -= PAGE_SIZE;
1201                         pga[nio_count]->flag |= OBD_BRW_FROM_GRANT;
1202                 }
1203         }
1204         spin_unlock(&exp->exp_obd->u.cli.cl_loi_list_lock);
1205
1206         rc = osc_brw_prep_request(cmd, class_exp2cliimp(exp), oa, lsm,
1207                                   page_count, pga, &requested_nob, &nio_count,
1208                                   &req, ocapa);
1209
1210         if (rc == 0) {
1211                 LASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1212                 aa = (struct osc_brw_async_args *)&req->rq_async_args;
1213                 aa->aa_oa = oa;
1214                 aa->aa_requested_nob = requested_nob;
1215                 aa->aa_nio_count = nio_count;
1216                 aa->aa_page_count = page_count;
1217                 aa->aa_ppga = pga;
1218
1219                 req->rq_interpret_reply = brw_interpret;
1220                 ptlrpc_set_add_req(set, req);
1221         }
1222         RETURN (rc);
1223 }
1224
1225 /*
1226  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1227  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1228  * fine for our small page arrays and doesn't require allocation.  its an
1229  * insertion sort that swaps elements that are strides apart, shrinking the
1230  * stride down until its '1' and the array is sorted.
1231  */
1232 static void sort_brw_pages(struct brw_page **array, int num)
1233 {
1234         int stride, i, j;
1235         struct brw_page *tmp;
1236
1237         if (num == 1)
1238                 return;
1239         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1240                 ;
1241
1242         do {
1243                 stride /= 3;
1244                 for (i = stride ; i < num ; i++) {
1245                         tmp = array[i];
1246                         j = i;
1247                         while (j >= stride && array[j - stride]->off > tmp->off) {
1248                                 array[j] = array[j - stride];
1249                                 j -= stride;
1250                         }
1251                         array[j] = tmp;
1252                 }
1253         } while (stride > 1);
1254 }
1255
1256 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1257 {
1258         int count = 1;
1259         int offset;
1260         int i = 0;
1261
1262         LASSERT (pages > 0);
1263         offset = pg[i]->off & (CFS_PAGE_SIZE - 1);
1264
1265         for (;;) {
1266                 pages--;
1267                 if (pages == 0)         /* that's all */
1268                         return count;
1269
1270                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1271                         return count;   /* doesn't end on page boundary */
1272
1273                 i++;
1274                 offset = pg[i]->off & (CFS_PAGE_SIZE - 1);
1275                 if (offset != 0)        /* doesn't start on page boundary */
1276                         return count;
1277
1278                 count++;
1279         }
1280 }
1281
1282 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1283 {
1284         struct brw_page **ppga;
1285         int i;
1286
1287         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1288         if (ppga == NULL)
1289                 return NULL;
1290
1291         for (i = 0; i < count; i++)
1292                 ppga[i] = pga + i;
1293         return ppga;
1294 }
1295
/* Free a pointer array previously built by osc_build_ppga(); 'count' must
 * match the element count it was built with. */
static void osc_release_ppga(struct brw_page **ppga, obd_count count)
{
        LASSERT(ppga != NULL);
        OBD_FREE(ppga, sizeof(*ppga) * count);
}
1301
/* Synchronous bulk read/write entry point.  Sorts the pages by offset,
 * splits them into chunks of at most cl_max_pages_per_rpc unfragmented
 * pages, and issues one blocking RPC per chunk via osc_brw_internal().
 * With OBD_BRW_CHECK set, only reports whether I/O could succeed at all.
 * Returns 0 or a negative errno (first failing chunk aborts the rest). */
static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
                   obd_count page_count, struct brw_page *pga,
                   struct obd_trans_info *oti)
{
        struct obdo *saved_oa = NULL;
        struct brw_page **ppga, **orig;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct client_obd *cli = &imp->imp_obd->u.cli;
        int rc, page_count_orig;
        ENTRY;

        if (cmd & OBD_BRW_CHECK) {
                /* The caller just wants to know if there's a chance that this
                 * I/O can succeed */

                if (imp == NULL || imp->imp_invalid)
                        RETURN(-EIO);
                RETURN(0);
        }

        rc = 0;

        /* work on a sorted pointer array; 'orig' keeps the base for the
         * final release since 'ppga' advances chunk by chunk */
        orig = ppga = osc_build_ppga(pga, page_count);
        if (ppga == NULL)
                RETURN(-ENOMEM);
        page_count_orig = page_count;

        sort_brw_pages(ppga, page_count);
        while (page_count) {
                obd_count pages_per_brw;

                if (page_count > cli->cl_max_pages_per_rpc)
                        pages_per_brw = cli->cl_max_pages_per_rpc;
                else
                        pages_per_brw = page_count;

                /* shrink further so the chunk has no internal page gaps */
                pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);

                if (saved_oa != NULL) {
                        /* restore previously saved oa */
                        *oinfo->oi_oa = *saved_oa;
                } else if (page_count > pages_per_brw) {
                        /* save a copy of oa (brw will clobber it) */
                        saved_oa = obdo_alloc();
                        if (saved_oa == NULL)
                                GOTO(out, rc = -ENOMEM);
                        *saved_oa = *oinfo->oi_oa;
                }

                rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
                                      pages_per_brw, ppga, oinfo->oi_capa);

                if (rc != 0)
                        break;

                page_count -= pages_per_brw;
                ppga += pages_per_brw;
        }

out:
        osc_release_ppga(orig, page_count_orig);

        if (saved_oa != NULL)
                obdo_free(saved_oa);

        RETURN(rc);
}
1369
1370 static int osc_brw_async(int cmd, struct obd_export *exp,
1371                          struct obd_info *oinfo, obd_count page_count,
1372                          struct brw_page *pga, struct obd_trans_info *oti,
1373                          struct ptlrpc_request_set *set)
1374 {
1375         struct brw_page **ppga, **orig, **copy;
1376         struct obd_import *imp = class_exp2cliimp(exp);
1377         struct client_obd *cli = &imp->imp_obd->u.cli;
1378         int page_count_orig;
1379         int rc = 0;
1380         ENTRY;
1381
1382         if (cmd & OBD_BRW_CHECK) {
1383                 /* The caller just wants to know if there's a chance that this
1384                  * I/O can succeed */
1385
1386                 if (imp == NULL || imp->imp_invalid)
1387                         RETURN(-EIO);
1388                 RETURN(0);
1389         }
1390
1391         orig = ppga = osc_build_ppga(pga, page_count);
1392         if (ppga == NULL)
1393                 RETURN(-ENOMEM);
1394         page_count_orig = page_count;
1395
1396         sort_brw_pages(ppga, page_count);
1397         while (page_count) {
1398                 obd_count pages_per_brw;
1399
1400                 if (page_count > cli->cl_max_pages_per_rpc)
1401                         pages_per_brw = cli->cl_max_pages_per_rpc;
1402                 else
1403                         pages_per_brw = page_count;
1404
1405                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1406
1407                 /* use ppga only if single RPC is going to fly */
1408                 if (pages_per_brw != page_count_orig || ppga != orig) {
1409                         int size = sizeof(struct brw_page *) * pages_per_brw;
1410                         OBD_ALLOC(copy, size);
1411                         if (copy == NULL)
1412                                 GOTO(out, rc = -ENOMEM);
1413                         memcpy(copy, ppga, size);
1414                 } else
1415                         copy = ppga;
1416
1417                 rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1418                                     pages_per_brw, copy, set, oinfo->oi_capa);
1419
1420                 if (rc != 0)
1421                         break;
1422
1423                 if (copy == orig) {
1424                         /* we passed it to async_internal() which is
1425                          * now responsible for releasing memory */
1426                         orig = NULL;
1427                 }
1428
1429                 page_count -= pages_per_brw;
1430                 ppga += pages_per_brw;
1431         }
1432 out:
1433         if (orig)
1434                 osc_release_ppga(orig, page_count_orig);
1435         RETURN(rc);
1436 }
1437
1438 static void osc_check_rpcs(struct client_obd *cli);
1439 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1440                            int sent);
1441
/* This maintains the lists of pending pages to read/write for a given object
 * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
 * to quickly find objects that are ready to send an RPC.
 *
 * Returns non-zero when the given lop should have an RPC built for it now:
 * on invalid import (to drain queues), on any urgent page, on cache waiters
 * (writes), or when enough pages have accumulated. */
static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
                         int cmd)
{
        int optimal;
        ENTRY;

        if (lop->lop_num_pending == 0)
                RETURN(0);

        /* if we have an invalid import we want to drain the queued pages
         * by forcing them through rpcs that immediately fail and complete
         * the pages.  recovery relies on this to empty the queued pages
         * before canceling the locks and evicting down the llite pages */
        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(1);

        /* stream rpcs in queue order as long as there is an urgent page
         * queued.  this is our cheap solution for good batching in the case
         * where writepage marks some random page in the middle of the file
         * as urgent because of, say, memory pressure */
        if (!list_empty(&lop->lop_urgent))
                RETURN(1);

        /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
        optimal = cli->cl_max_pages_per_rpc;
        if (cmd & OBD_BRW_WRITE) {
                /* trigger a write rpc stream as long as there are dirtiers
                 * waiting for space.  as they're waiting, they're not going to
                 * create more pages to coalesce with what's waiting.. */
                if (!list_empty(&cli->cl_cache_waiters))
                        RETURN(1);

                /* +16 to avoid triggering rpcs that would want to include pages
                 * that are being queued but which can't be made ready until
                 * the queuer finishes with the page. this is a wart for
                 * llite::commit_write() */
                optimal += 16;
        }
        if (lop->lop_num_pending >= optimal)
                RETURN(1);

        RETURN(0);
}
1488
/* Ensure 'item' is on 'list' exactly when 'should_be_on' is true; a no-op
 * when membership already matches. */
static void on_list(struct list_head *item, struct list_head *list,
                    int should_be_on)
{
        int on_now = !list_empty(item);

        if (should_be_on && !on_now)
                list_add_tail(item, list);
        else if (!should_be_on && on_now)
                list_del_init(item);
}
1497
/* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
 * can find pages to build into rpcs quickly */
static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
{
        /* ready list: object has enough queued work for an RPC either way */
        on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
                lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
                lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));

        /* per-direction lists: membership tracks any pending pages at all */
        on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
                loi->loi_write_lop.lop_num_pending);

        on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
                loi->loi_read_lop.lop_num_pending);
}
1512
/* Adjust the pending-page counters by 'delta' (may be negative): the
 * per-object count on 'lop' and the client-wide count for the direction
 * selected by 'cmd'. */
static void lop_update_pending(struct client_obd *cli,
                               struct loi_oap_pages *lop, int cmd, int delta)
{
        lop->lop_num_pending += delta;
        if (cmd & OBD_BRW_WRITE)
                cli->cl_pending_w_pages += delta;
        else
                cli->cl_pending_r_pages += delta;
}
1522
/* this is called when a sync waiter receives an interruption.  Its job is to
 * get the caller woken as soon as possible.  If its page hasn't been put in an
 * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
 * desiring interruption which will forcefully complete the rpc once the rpc
 * has timed out */
static void osc_occ_interrupted(struct oig_callback_context *occ)
{
        struct osc_async_page *oap;
        struct loi_oap_pages *lop;
        struct lov_oinfo *loi;
        ENTRY;

        /* XXX member_of() */
        oap = list_entry(occ, struct osc_async_page, oap_occ);

        /* the loi list lock covers the oap lists and flags below */
        client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);

        oap->oap_interrupted = 1;

        /* ok, it's been put in an rpc. */
        if (oap->oap_request != NULL) {
                /* can't dequeue; flag the rpc and kick ptlrpcd so it
                 * completes the request promptly */
                ptlrpc_mark_interrupted(oap->oap_request);
                ptlrpcd_wake(oap->oap_request);
                GOTO(unlock, 0);
        }

        /* we don't get interruption callbacks until osc_trigger_group_io()
         * has been called and put the sync oaps in the pending/urgent lists.*/
        if (!list_empty(&oap->oap_pending_item)) {
                /* page never made it into an rpc: pull it off the queues,
                 * fix the pending counters/list invariants, and complete
                 * the group-I/O waiter with -EINTR */
                list_del_init(&oap->oap_pending_item);
                list_del_init(&oap->oap_urgent_item);

                loi = oap->oap_loi;
                lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
                        &loi->loi_write_lop : &loi->loi_read_lop;
                lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
                loi_list_maint(oap->oap_cli, oap->oap_loi);

                oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
                oap->oap_oig = NULL;
        }

unlock:
        client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
}
1568
1569 /* this is trying to propogate async writeback errors back up to the
1570  * application.  As an async write fails we record the error code for later if
1571  * the app does an fsync.  As long as errors persist we force future rpcs to be
1572  * sync so that the app can get a sync error and break the cycle of queueing
1573  * pages for which writeback will fail. */
1574 static void osc_process_ar(struct osc_async_rc *ar, struct ptlrpc_request *req,
1575                            int rc)
1576 {
1577         if (rc) {
1578                 if (!ar->ar_rc)
1579                         ar->ar_rc = rc;
1580
1581                 ar->ar_force_sync = 1;
1582                 ar->ar_min_xid = ptlrpc_sample_next_xid();
1583                 return;
1584
1585         }
1586
1587         if (ar->ar_force_sync && req && (ptlrpc_req_xid(req) >= ar->ar_min_xid))
1588                 ar->ar_force_sync = 0;
1589 }
1590
1591 static void osc_oap_to_pending(struct osc_async_page *oap)
1592 {
1593         struct loi_oap_pages *lop;
1594
1595         if (oap->oap_cmd & OBD_BRW_WRITE)
1596                 lop = &oap->oap_loi->loi_write_lop;
1597         else
1598                 lop = &oap->oap_loi->loi_read_lop;
1599
1600         if (oap->oap_async_flags & ASYNC_URGENT)
1601                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1602         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1603         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
1604 }
1605
1606 /* this must be called holding the loi list lock to give coverage to exit_cache,
1607  * async_flag maintenance, and oap_request */
static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
                              struct osc_async_page *oap, int sent, int rc)
{
        ENTRY;
        /* the oap is leaving its rpc; reset per-io state before anything
         * below can requeue or complete it */
        oap->oap_async_flags = 0;
        oap->oap_interrupted = 0;

        if (oap->oap_cmd & OBD_BRW_WRITE) {
                /* record write errors at both the client and the per-object
                 * level so a later fsync sees them and forces sync rpcs */
                osc_process_ar(&cli->cl_ar, oap->oap_request, rc);
                osc_process_ar(&oap->oap_loi->loi_ar, oap->oap_request, rc);
        }

        /* drop the request reference taken when the oap was put in an rpc */
        if (oap->oap_request != NULL) {
                ptlrpc_req_finished(oap->oap_request);
                oap->oap_request = NULL;
        }

        if (rc == 0 && oa != NULL) {
                /* mirror attributes the server marked valid into the lvb */
                if (oa->o_valid & OBD_MD_FLBLOCKS)
                        oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
                if (oa->o_valid & OBD_MD_FLMTIME)
                        oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
                if (oa->o_valid & OBD_MD_FLATIME)
                        oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
                if (oa->o_valid & OBD_MD_FLCTIME)
                        oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
        }

        if (oap->oap_oig) {
                /* group (sync) io: release the grant and complete the group
                 * member directly instead of calling back into the caller */
                osc_exit_cache(cli, oap, sent);
                oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
                oap->oap_oig = NULL;
                EXIT;
                return;
        }

        rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
                                                oap->oap_cmd, oa, rc);

        /* ll_ap_completion (from llite) drops PG_locked. so, a new
         * I/O on the page could start, but OSC calls it under lock
         * and thus we can add oap back to pending safely */
        if (rc)
                /* upper layer wants to leave the page on pending queue */
                osc_oap_to_pending(oap);
        else
                osc_exit_cache(cli, oap, sent);
        EXIT;
}
1657
/* rpc interpret callback for async brws: finish the bulk transfer, update
 * the in-flight rpc accounting under the loi list lock, and complete every
 * oap that rode in this request. */
static int brw_interpret_oap(struct ptlrpc_request *req,
                             struct osc_brw_async_args *aa, int rc)
{
        struct osc_async_page *oap;
        struct client_obd *cli;
        struct list_head *pos, *n;
        ENTRY;

        rc = osc_brw_fini_request(req, aa->aa_oa, aa->aa_requested_nob,
                                  aa->aa_nio_count, aa->aa_page_count,
                                  aa->aa_ppga, rc);

        CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);

        cli = aa->aa_cli;

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
                cli->cl_w_in_flight--;
        else
                cli->cl_r_in_flight--;

        /* the caller may re-use the oap after the completion call so
         * we need to clean it up a little */
        list_for_each_safe(pos, n, &aa->aa_oaps) {
                oap = list_entry(pos, struct osc_async_page, oap_rpc_item);

                list_del_init(&oap->oap_rpc_item);
                osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
        }

        /* grant/cache space may have been released above: let any waiters
         * retry and see whether new rpcs can now be fired */
        osc_wake_cache_waiters(cli);
        osc_check_rpcs(cli);

        client_obd_list_unlock(&cli->cl_loi_list_lock);

        obdo_free(aa->aa_oa);
        OBD_FREE(aa->aa_ppga, aa->aa_page_count * sizeof(struct brw_page *));

        RETURN(0);
}
1706
/* build one brw rpc from the oaps queued on rpc_list.  on success the
 * returned request carries an osc_brw_async_args cookie describing the
 * transfer; on failure an ERR_PTR is returned and the oa/pga allocated
 * here are freed before returning. */
static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
                                            struct list_head *rpc_list,
                                            int page_count, int cmd)
{
        struct ptlrpc_request *req;
        struct brw_page **pga = NULL;
        int requested_nob, nio_count;
        struct osc_brw_async_args *aa;
        struct obdo *oa = NULL;
        struct obd_async_page_ops *ops = NULL;
        void *caller_data = NULL;
        struct list_head *pos;
        struct obd_capa *ocapa;
        int i, rc;

        ENTRY;
        LASSERT(!list_empty(rpc_list));

        OBD_ALLOC(pga, sizeof(*pga) * page_count);
        if (pga == NULL)
                RETURN(ERR_PTR(-ENOMEM));

        oa = obdo_alloc();
        if (oa == NULL)
                GOTO(out, req = ERR_PTR(-ENOMEM));

        /* collect each oap's embedded brw_page into pga; the caller ops and
         * data are taken from the first oap seen (all oaps in one rpc are
         * expected to share them, per the LASSERT below) */
        i = 0;
        list_for_each(pos, rpc_list) {
                struct osc_async_page *oap;

                oap = list_entry(pos, struct osc_async_page, oap_rpc_item);
                if (ops == NULL) {
                        ops = oap->oap_caller_ops;
                        caller_data = oap->oap_caller_data;
                }
                pga[i] = &oap->oap_brw_page;
                pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
                CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
                       pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
                i++;
        }

        /* always get the data for the obdo for the rpc */
        LASSERT(ops != NULL);
        ops->ap_fill_obdo(caller_data, cmd, oa);
        /* NOTE(review): ap_lookup_capa presumably may return NULL and
         * capa_put is assumed to tolerate that -- confirm in capa code */
        ocapa = ops->ap_lookup_capa(caller_data, cmd);

        sort_brw_pages(pga, page_count);
        rc = osc_brw_prep_request(cmd, cli->cl_import, oa, NULL, page_count,
                                  pga, &requested_nob, &nio_count, &req, ocapa);
        capa_put(ocapa);
        if (rc != 0) {
                CERROR("prep_req failed: %d\n", rc);
                GOTO(out, req = ERR_PTR(rc));
        }

        /* Need to update the timestamps after the request is built in case
         * we race with setattr (locally or in queue at OST).  If OST gets
         * later setattr before earlier BRW (as determined by the request xid),
         * the OST will not use BRW timestamps.  Sadly, there is no obvious
         * way to do this in a single call.  bug 10150 */
        ops->ap_update_obdo(caller_data, cmd, oa,
                            OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);

        /* hang the transfer description off the request for the interpret
         * callback (brw_interpret_oap) */
        LASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_brw_async_args *)&req->rq_async_args;
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = nio_count;
        aa->aa_page_count = page_count;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;

out:
        /* shared error path: every failing GOTO above set req to an
         * ERR_PTR, so free what this function allocated */
        if (IS_ERR(req)) {
                if (oa)
                        obdo_free(oa);
                if (pga)
                        OBD_FREE(pga, sizeof(*pga) * page_count);
        }
        RETURN(req);
}
1792
1793 /* the loi lock is held across this function but it's allowed to release
1794  * and reacquire it during its work */
static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
                            int cmd, struct loi_oap_pages *lop)
{
        struct ptlrpc_request *req;
        obd_count page_count = 0;
        struct list_head *tmp, *pos;
        struct osc_async_page *oap = NULL;
        struct osc_brw_async_args *aa;
        struct obd_async_page_ops *ops;
        CFS_LIST_HEAD(rpc_list);
        unsigned int ending_offset;
        unsigned  starting_offset = 0;
        ENTRY;

        /* first we find the pages we're allowed to work with */
        list_for_each_safe(pos, tmp, &lop->lop_pending) {
                oap = list_entry(pos, struct osc_async_page, oap_pending_item);
                ops = oap->oap_caller_ops;

                LASSERT(oap->oap_magic == OAP_MAGIC);

                /* in llite being 'ready' equates to the page being locked
                 * until completion unlocks it.  commit_write submits a page
                 * as not ready because its unlock will happen unconditionally
                 * as the call returns.  if we race with commit_write giving
                 * us that page we dont' want to create a hole in the page
                 * stream, so we stop and leave the rpc to be fired by
                 * another dirtier or kupdated interval (the not ready page
                 * will still be on the dirty list).  we could call in
                 * at the end of ll_file_write to process the queue again. */
                if (!(oap->oap_async_flags & ASYNC_READY)) {
                        int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
                        if (rc < 0)
                                CDEBUG(D_INODE, "oap %p page %p returned %d "
                                                "instead of ready\n", oap,
                                                oap->oap_page, rc);
                        switch (rc) {
                        case -EAGAIN:
                                /* llite is telling us that the page is still
                                 * in commit_write and that we should try
                                 * and put it in an rpc again later.  we
                                 * break out of the loop so we don't create
                                 * a hole in the sequence of pages in the rpc
                                 * stream.*/
                                /* pos = NULL is the signal checked below to
                                 * break out of the enclosing list walk */
                                pos = NULL;
                                break;
                        case -EINTR:
                                /* the io isn't needed.. tell the checks
                                 * below to complete the rpc with EINTR */
                                oap->oap_async_flags |= ASYNC_COUNT_STABLE;
                                oap->oap_count = -EINTR;
                                break;
                        case 0:
                                oap->oap_async_flags |= ASYNC_READY;
                                break;
                        default:
                                LASSERTF(0, "oap %p page %p returned %d "
                                            "from make_ready\n", oap,
                                            oap->oap_page, rc);
                                break;
                        }
                }
                if (pos == NULL)
                        break;
                /*
                 * Page submitted for IO has to be locked. Either by
                 * ->ap_make_ready() or by higher layers.
                 *
                 * XXX nikita: this assertion should be adjusted when lustre
                 * starts using PG_writeback for pages being written out.
                 */
#if defined(__KERNEL__) && defined(__LINUX__)
                LASSERT(PageLocked(oap->oap_page));
#endif
                /* If there is a gap at the start of this page, it can't merge
                 * with any previous page, so we'll hand the network a
                 * "fragmented" page array that it can't transfer in 1 RDMA */
                if (page_count != 0 && oap->oap_page_off != 0)
                        break;

                /* take the page out of our book-keeping */
                list_del_init(&oap->oap_pending_item);
                lop_update_pending(cli, lop, cmd, -1);
                list_del_init(&oap->oap_urgent_item);

                if (page_count == 0)
                        starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
                                          (PTLRPC_MAX_BRW_SIZE - 1);

                /* ask the caller for the size of the io as the rpc leaves. */
                if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
                        oap->oap_count =
                                ops->ap_refresh_count(oap->oap_caller_data,cmd);
                if (oap->oap_count <= 0) {
                        /* nothing to send (e.g. truncated away, or -EINTR
                         * from make_ready above): complete it immediately */
                        CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
                               oap->oap_count);
                        osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
                        continue;
                }

                /* now put the page back in our accounting */
                list_add_tail(&oap->oap_rpc_item, &rpc_list);
                if (++page_count >= cli->cl_max_pages_per_rpc)
                        break;

                /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
                 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
                 * have the same alignment as the initial writes that allocated
                 * extents on the server. */
                ending_offset = (oap->oap_obj_off + oap->oap_page_off +
                                 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
                if (ending_offset == 0)
                        break;

                /* If there is a gap at the end of this page, it can't merge
                 * with any subsequent pages, so we'll hand the network a
                 * "fragmented" page array that it can't transfer in 1 RDMA */
                if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
                        break;
        }

        osc_wake_cache_waiters(cli);

        if (page_count == 0)
                RETURN(0);

        loi_list_maint(cli, loi);

        /* drop the list lock while building/sending the request; the pages
         * for this rpc are already off the pending lists */
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        req = osc_build_req(cli, &rpc_list, page_count, cmd);
        if (IS_ERR(req)) {
                /* this should happen rarely and is pretty bad, it makes the
                 * pending list not follow the dirty order */
                client_obd_list_lock(&cli->cl_loi_list_lock);
                list_for_each_safe(pos, tmp, &rpc_list) {
                        oap = list_entry(pos, struct osc_async_page,
                                         oap_rpc_item);
                        list_del_init(&oap->oap_rpc_item);

                        /* queued sync pages can be torn down while the pages
                         * were between the pending list and the rpc */
                        if (oap->oap_interrupted) {
                                CDEBUG(D_INODE, "oap %p interrupted\n", oap);
                                osc_ap_completion(cli, NULL, oap, 0,
                                                  oap->oap_count);
                                continue;
                        }
                        osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
                }
                loi_list_maint(cli, loi);
                RETURN(PTR_ERR(req));
        }

        /* move the rpc's pages onto the request's own list */
        LASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_brw_async_args *)&req->rq_async_args;
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);
        list_splice(&rpc_list, &aa->aa_oaps);
        CFS_INIT_LIST_HEAD(&rpc_list);

        if (cmd == OBD_BRW_READ) {
                lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
                                      starting_offset/CFS_PAGE_SIZE + 1);
        } else {
                lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_write_rpc_hist,
                                 cli->cl_w_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
                                      starting_offset/CFS_PAGE_SIZE + 1);
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);

        if (cmd == OBD_BRW_READ)
                cli->cl_r_in_flight++;
        else
                cli->cl_w_in_flight++;

        /* queued sync pages can be torn down while the pages
         * were between the pending list and the rpc */
        list_for_each(pos, &aa->aa_oaps) {
                oap = list_entry(pos, struct osc_async_page, oap_rpc_item);
                if (oap->oap_interrupted) {
                        CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
                               oap, req);
                        ptlrpc_mark_interrupted(req);
                        break;
                }
        }

        CDEBUG(D_INODE, "req %p: %d pages, aa %p.  now %dr/%dw in flight\n",
                        req, page_count, aa, cli->cl_r_in_flight,
                        cli->cl_w_in_flight);

        /* NOTE(review): only this one oap (the last visited by the loop
         * above, or the interrupted one) records a reference to the
         * request -- presumably enough for teardown; confirm callers */
        oap->oap_request = ptlrpc_request_addref(req);
        req->rq_interpret_reply = brw_interpret_oap;
        ptlrpcd_add_req(req);
        RETURN(1);
}
1996
/* dump an loi's rpc-readiness state: whether it sits on the ready list and
 * the pending/urgent status of its write and read queues.
 * Fix: the macro previously ended with a dangling line continuation after
 * "args)", which silently spliced the following source line into the macro
 * body (harmless only while that line stayed blank). */
#define LOI_DEBUG(LOI, STR, args...)                                     \
        CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
               !list_empty(&(LOI)->loi_cli_item),                        \
               (LOI)->loi_write_lop.lop_num_pending,                     \
               !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
               (LOI)->loi_read_lop.lop_num_pending,                      \
               !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
               args)
2006 /* This is called by osc_check_rpcs() to find which objects have pages that
2007  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2008 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2009 {
2010         ENTRY;
2011         /* first return all objects which we already know to have
2012          * pages ready to be stuffed into rpcs */
2013         if (!list_empty(&cli->cl_loi_ready_list))
2014                 RETURN(list_entry(cli->cl_loi_ready_list.next,
2015                                   struct lov_oinfo, loi_cli_item));
2016
2017         /* then if we have cache waiters, return all objects with queued
2018          * writes.  This is especially important when many small files
2019          * have filled up the cache and not been fired into rpcs because
2020          * they don't pass the nr_pending/object threshhold */
2021         if (!list_empty(&cli->cl_cache_waiters) &&
2022             !list_empty(&cli->cl_loi_write_list))
2023                 RETURN(list_entry(cli->cl_loi_write_list.next,
2024                                   struct lov_oinfo, loi_write_item));
2025
2026         /* then return all queued objects when we have an invalid import
2027          * so that they get flushed */
2028         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2029                 if (!list_empty(&cli->cl_loi_write_list))
2030                         RETURN(list_entry(cli->cl_loi_write_list.next,
2031                                           struct lov_oinfo, loi_write_item));
2032                 if (!list_empty(&cli->cl_loi_read_list))
2033                         RETURN(list_entry(cli->cl_loi_read_list.next,
2034                                           struct lov_oinfo, loi_read_item));
2035         }
2036         RETURN(NULL);
2037 }
2038
2039 /* called with the loi list lock held */
/* called with the loi list lock held */
static void osc_check_rpcs(struct client_obd *cli)
{
        struct lov_oinfo *loi;
        int rc = 0, race_counter = 0;
        ENTRY;

        /* keep issuing rpcs for ready objects until we hit the in-flight
         * limit, run out of objects, or spin too long without progress */
        while ((loi = osc_next_loi(cli)) != NULL) {
                LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));

                if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
                        break;

                /* attempt some read/write balancing by alternating between
                 * reads and writes in an object.  The makes_rpc checks here
                 * would be redundant if we were getting read/write work items
                 * instead of objects.  we don't want send_oap_rpc to drain a
                 * partial read pending queue when we're given this object to
                 * do io on writes while there are cache waiters */
                if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
                        rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
                                              &loi->loi_write_lop);
                        if (rc < 0)
                                break;
                        if (rc > 0)
                                race_counter = 0;
                        else
                                race_counter++;
                }
                if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
                        rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
                                              &loi->loi_read_lop);
                        if (rc < 0)
                                break;
                        if (rc > 0)
                                race_counter = 0;
                        else
                                race_counter++;
                }

                /* attempt some inter-object balancing by issueing rpcs
                 * for each object in turn */
                if (!list_empty(&loi->loi_cli_item))
                        list_del_init(&loi->loi_cli_item);
                if (!list_empty(&loi->loi_write_item))
                        list_del_init(&loi->loi_write_item);
                if (!list_empty(&loi->loi_read_item))
                        list_del_init(&loi->loi_read_item);

                loi_list_maint(cli, loi);

                /* send_oap_rpc fails with 0 when make_ready tells it to
                 * back off.  llite's make_ready does this when it tries
                 * to lock a page queued for write that is already locked.
                 * we want to try sending rpcs from many objects, but we
                 * don't want to spin failing with 0.  */
                if (race_counter == 10)
                        break;
        }
        EXIT;
}
2100
2101 /* we're trying to queue a page in the osc so we're subject to the
2102  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2103  * If the osc's queued pages are already at that limit, then we want to sleep
2104  * until there is space in the osc's queue for us.  We also may be waiting for
2105  * write credits from the OST if there are RPCs in flight that may return some
2106  * before we fall back to sync writes.
2107  *
2108  * We need this know our allocation was granted in the presence of signals */
2109 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2110 {
2111         int rc;
2112         ENTRY;
2113         client_obd_list_lock(&cli->cl_loi_list_lock);
2114         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2115         client_obd_list_unlock(&cli->cl_loi_list_lock);
2116         RETURN(rc);
2117 };
2118
2119 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2120  * grant or cache space. */
static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
                           struct osc_async_page *oap)
{
        struct osc_cache_waiter ocw;
        struct l_wait_info lwi = { 0 };

        ENTRY;
        CDEBUG(D_CACHE, "dirty: %ld dirty_max: %ld dropped: %lu grant: %lu\n",
               cli->cl_dirty, cli->cl_dirty_max, cli->cl_lost_grant,
               cli->cl_avail_grant);

        /* force the caller to try sync io.  this can jump the list
         * of queued writes and create a discontiguous rpc stream */
        if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
            loi->loi_ar.ar_force_sync)
                RETURN(-EDQUOT);

        /* Hopefully normal case - cache space and write credits available */
        if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
            cli->cl_avail_grant >= CFS_PAGE_SIZE) {
                /* account for ourselves */
                osc_consume_write_grant(cli, oap);
                RETURN(0);
        }

        /* Make sure that there are write rpcs in flight to wait for.  This
         * is a little silly as this object may not have any pending but
         * other objects sure might. */
        if (cli->cl_w_in_flight) {
                list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
                cfs_waitq_init(&ocw.ocw_waitq);
                ocw.ocw_oap = oap;
                ocw.ocw_rc = 0;

                /* kick rpcs so a completion can eventually grant us space,
                 * then drop the list lock for the duration of the wait */
                loi_list_maint(cli, loi);
                osc_check_rpcs(cli);
                client_obd_list_unlock(&cli->cl_loi_list_lock);

                CDEBUG(D_CACHE, "sleeping for cache space\n");
                l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);

                client_obd_list_lock(&cli->cl_loi_list_lock);
                /* still on the waiter list means nobody granted us space;
                 * treat the wakeup as an interruption */
                if (!list_empty(&ocw.ocw_entry)) {
                        list_del(&ocw.ocw_entry);
                        RETURN(-EINTR);
                }
                RETURN(ocw.ocw_rc);
        }

        RETURN(-EDQUOT);
}
2172
2173 /* the companion to enter_cache, called when an oap is no longer part of the
2174  * dirty accounting.. so writeback completes or truncate happens before writing
2175  * starts.  must be called with the loi lock held. */
static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
                           int sent)
{
        /* fall back to 4k when the server hasn't reported a block size */
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
        ENTRY;

        /* only pages that consumed grant in osc_enter_cache() are accounted */
        if (!(oap->oap_brw_flags & OBD_BRW_FROM_GRANT)) {
                EXIT;
                return;
        }

        oap->oap_brw_flags &= ~OBD_BRW_FROM_GRANT;
        cli->cl_dirty -= CFS_PAGE_SIZE;
        if (!sent) {
                /* page never went to the OST, so its whole grant is lost
                 * until the next grant refresh */
                cli->cl_lost_grant += CFS_PAGE_SIZE;
                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
        } else if (CFS_PAGE_SIZE != blocksize && oap->oap_count != CFS_PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole block on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
                int offset = (oap->oap_obj_off +oap->oap_page_off) & ~CFS_PAGE_MASK;
                int count = oap->oap_count + (offset & (blocksize - 1));
                int end = (offset + oap->oap_count) & (blocksize - 1);
                if (end)
                        count += blocksize - end;

                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
                       cli->cl_avail_grant, cli->cl_dirty);
        }

        EXIT;
}
2211
2212 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2213                         struct lov_oinfo *loi, cfs_page_t *page,
2214                         obd_off offset, struct obd_async_page_ops *ops,
2215                         void *data, void **res)
2216 {
2217         struct osc_async_page *oap;
2218         ENTRY;
2219
2220         if (!page)
2221                 return size_round(sizeof(*oap));
2222
2223         oap = *res;
2224         oap->oap_magic = OAP_MAGIC;
2225         oap->oap_cli = &exp->exp_obd->u.cli;
2226         oap->oap_loi = loi;
2227
2228         oap->oap_caller_ops = ops;
2229         oap->oap_caller_data = data;
2230
2231         oap->oap_page = page;
2232         oap->oap_obj_off = offset;
2233
2234         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2235         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2236         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2237
2238         oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2239
2240         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2241         RETURN(0);
2242 }
2243
2244 struct osc_async_page *oap_from_cookie(void *cookie)
2245 {
2246         struct osc_async_page *oap = cookie;
2247         if (oap->oap_magic != OAP_MAGIC)
2248                 return ERR_PTR(-EINVAL);
2249         return oap;
2250 };
2251
/* queue a page for async io: validate the oap, enforce quota for writes,
 * reserve grant/cache space, put the oap on its object's pending lists and
 * kick rpc generation.  returns 0 or a negative errno. */
static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
                              struct lov_oinfo *loi, void *cookie,
                              int cmd, obd_off off, int count,
                              obd_flag brw_flags, enum async_flags async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        /* an oap can only be in one rpc/pending/urgent state at a time */
        if (!list_empty(&oap->oap_pending_item) ||
            !list_empty(&oap->oap_urgent_item) ||
            !list_empty(&oap->oap_rpc_item))
                RETURN(-EBUSY);

        /* check if the file's owner/group is over quota */
#ifdef HAVE_QUOTA_SUPPORT
        if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
                struct obd_async_page_ops *ops;
                struct obdo *oa;

                oa = obdo_alloc();
                if (oa == NULL)
                        RETURN(-ENOMEM);

                /* temporary obdo only to learn the uid/gid for the check */
                ops = oap->oap_caller_ops;
                ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
                if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
                    NO_QUOTA)
                        rc = -EDQUOT;

                obdo_free(oa);
                if (rc)
                        RETURN(rc);
        }
#endif

        if (loi == NULL)
                loi = &lsm->lsm_oinfo[0];

        client_obd_list_lock(&cli->cl_loi_list_lock);

        oap->oap_cmd = cmd;
        oap->oap_page_off = off;
        oap->oap_count = count;
        oap->oap_brw_flags = brw_flags;
        oap->oap_async_flags = async_flags;

        if (cmd & OBD_BRW_WRITE) {
                /* may drop and retake the list lock while waiting for
                 * grant/cache space */
                rc = osc_enter_cache(cli, loi, oap);
                if (rc) {
                        client_obd_list_unlock(&cli->cl_loi_list_lock);
                        RETURN(rc);
                }
        }

        osc_oap_to_pending(oap);
        loi_list_maint(cli, loi);

        LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
                  cmd);

        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        RETURN(0);
}
2326
/* aka (~was & now & flag), but this is more clear :) -- true iff "flag"
 * was clear in "was" and is now set in "now".  All three arguments are
 * parenthesized so compound expressions (e.g. FLAG_A | FLAG_B) expand
 * with the intended precedence. */
#define SETTING(was, now, flag) (!((was) & (flag)) && ((now) & (flag)))
2329
/* Add async flags (ASYNC_READY / ASYNC_URGENT) to an already-queued page.
 * Only flag transitions from clear to set are acted on (see SETTING()).
 *
 * Returns 0 on success, -EIO if the import is unusable, -EINVAL if the
 * page is not on a pending list, or the cookie-validation error. */
static int osc_set_async_flags(struct obd_export *exp,
                               struct lov_stripe_md *lsm,
                               struct lov_oinfo *loi, void *cookie,
                               obd_flag async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct loi_oap_pages *lop;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        /*
         * bug 7311: OST-side locking is only supported for liblustre for now
         * (and liblustre never calls obd_set_async_flags(). I hope.), generic
         * implementation has to handle case where OST-locked page was picked
         * up by, e.g., ->writepage().
         */
        LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
        LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
                                     * tread here. */

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        /* NULL loi means "the object's first stripe". */
        if (loi == NULL)
                loi = &lsm->lsm_oinfo[0];

        /* Select the read or write page accounting for this oap's command. */
        if (oap->oap_cmd & OBD_BRW_WRITE) {
                lop = &loi->loi_write_lop;
        } else {
                lop = &loi->loi_read_lop;
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* The page must already be queued; flags on an idle page mean nothing. */
        if (list_empty(&oap->oap_pending_item))
                GOTO(out, rc = -EINVAL);

        /* Nothing to do if every requested flag is already set. */
        if ((oap->oap_async_flags & async_flags) == async_flags)
                GOTO(out, rc = 0);

        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
                oap->oap_async_flags |= ASYNC_READY;

        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
                /* Only promote to the urgent list if the page is not already
                 * being carried by an in-flight RPC. */
                if (list_empty(&oap->oap_rpc_item)) {
                        list_add(&oap->oap_urgent_item, &lop->lop_urgent);
                        loi_list_maint(cli, loi);
                }
        }

        LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
                        oap->oap_async_flags);
out:
        /* Flag changes may have made an RPC worth sending immediately. */
        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        RETURN(rc);
}
2392
/* Queue a page as part of a grouped I/O (driven through an obd_io_group).
 * The page is parked on the group-pending list and stays ineligible for
 * RPC generation until osc_trigger_group_io() releases the whole group. */
static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
                             struct lov_oinfo *loi,
                             struct obd_io_group *oig, void *cookie,
                             int cmd, obd_off off, int count,
                             obd_flag brw_flags,
                             obd_flag async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct osc_async_page *oap;
        struct loi_oap_pages *lop;
        int rc = 0;
        ENTRY;

        /* Validate the opaque cookie (checks OAP_MAGIC). */
        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        /* A page may only be queued once across all three lists. */
        if (!list_empty(&oap->oap_pending_item) ||
            !list_empty(&oap->oap_urgent_item) ||
            !list_empty(&oap->oap_rpc_item))
                RETURN(-EBUSY);

        /* NULL loi means "the object's first stripe". */
        if (loi == NULL)
                loi = &lsm->lsm_oinfo[0];

        client_obd_list_lock(&cli->cl_loi_list_lock);

        oap->oap_cmd = cmd;
        oap->oap_page_off = off;
        oap->oap_count = count;
        oap->oap_brw_flags = brw_flags;
        oap->oap_async_flags = async_flags;

        if (cmd & OBD_BRW_WRITE)
                lop = &loi->loi_write_lop;
        else
                lop = &loi->loi_read_lop;

        /* Park on the group list; osc_trigger_group_io() moves it later. */
        list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
        if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
                /* Synchronous groups track completion via the occ callback. */
                oap->oap_oig = oig;
                rc = oig_add_one(oig, &oap->oap_occ);
        }

        LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
                  oap, oap->oap_page, rc);

        client_obd_list_unlock(&cli->cl_loi_list_lock);

        RETURN(rc);
}
2447
/* Move every page parked on the group's pending list onto the regular
 * per-object pending list so normal RPC generation will pick them up.
 * Caller holds cli->cl_loi_list_lock (see osc_trigger_group_io()).
 * NOTE(review): "cmd" is currently unused. */
static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
                                 struct loi_oap_pages *lop, int cmd)
{
        struct list_head *pos, *tmp;
        struct osc_async_page *oap;

        /* _safe iteration: each item is unlinked and re-linked below. */
        list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
                oap = list_entry(pos, struct osc_async_page, oap_pending_item);
                list_del(&oap->oap_pending_item);
                osc_oap_to_pending(oap);
        }
        loi_list_maint(cli, loi);
}
2461
/* Release all I/O previously queued via osc_queue_group_io() on this
 * stripe: promote the grouped pages (both reads and writes) to the normal
 * pending lists and let the RPC engine start sending them.
 * Always returns 0. */
static int osc_trigger_group_io(struct obd_export *exp,
                                struct lov_stripe_md *lsm,
                                struct lov_oinfo *loi,
                                struct obd_io_group *oig)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        ENTRY;

        /* NULL loi means "the object's first stripe". */
        if (loi == NULL)
                loi = &lsm->lsm_oinfo[0];

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* Promote both directions under a single hold of the list lock. */
        osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
        osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);

        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        RETURN(0);
}
2483
/* Detach an async page from all OSC lists (urgent, pending) and release
 * its cache reservation, in preparation for the caller freeing it.
 *
 * Returns -EBUSY if the page is attached to an in-flight RPC (caller must
 * retry after the RPC completes), 0 on success, or the cookie error. */
static int osc_teardown_async_page(struct obd_export *exp,
                                   struct lov_stripe_md *lsm,
                                   struct lov_oinfo *loi, void *cookie)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct loi_oap_pages *lop;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        /* NULL loi means "the object's first stripe". */
        if (loi == NULL)
                loi = &lsm->lsm_oinfo[0];

        /* Pick the read or write accounting matching the queued command. */
        if (oap->oap_cmd & OBD_BRW_WRITE) {
                lop = &loi->loi_write_lop;
        } else {
                lop = &loi->loi_read_lop;
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* Cannot tear down a page that an RPC is still carrying. */
        if (!list_empty(&oap->oap_rpc_item))
                GOTO(out, rc = -EBUSY);

        /* Give back the cache/grant reservation and wake anyone waiting. */
        osc_exit_cache(cli, oap, 0);
        osc_wake_cache_waiters(cli);

        if (!list_empty(&oap->oap_urgent_item)) {
                list_del_init(&oap->oap_urgent_item);
                oap->oap_async_flags &= ~ASYNC_URGENT;
        }
        if (!list_empty(&oap->oap_pending_item)) {
                list_del_init(&oap->oap_pending_item);
                lop_update_pending(cli, lop, oap->oap_cmd, -1);
        }
        loi_list_maint(cli, loi);

        LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
out:
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        RETURN(rc);
}
2530
/* Store caller data (the inode, on Linux) in a DLM lock's l_ast_data and
 * optionally mark the lock as exempt from the LRU (LDLM_FL_NO_LRU).
 * On Linux kernels, asserts that an existing, different l_ast_data only
 * ever belongs to an inode that is already being freed (I_FREEING). */
static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
                                    int flags)
{
        struct ldlm_lock *lock = ldlm_handle2lock(lockh);

        /* The handle may no longer resolve if the client was evicted. */
        if (lock == NULL) {
                CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
                return;
        }
        lock_res_and_lock(lock);
#ifdef __KERNEL__
#ifdef __LINUX__
        /* Liang XXX: Darwin and Winnt checking should be added */
        if (lock->l_ast_data && lock->l_ast_data != data) {
                struct inode *new_inode = data;
                struct inode *old_inode = lock->l_ast_data;
                if (!(old_inode->i_state & I_FREEING))
                        LDLM_ERROR(lock, "inconsistent l_ast_data found");
                LASSERTF(old_inode->i_state & I_FREEING,
                         "Found existing inode %p/%lu/%u state %lu in lock: "
                         "setting data to %p/%lu/%u\n", old_inode,
                         old_inode->i_ino, old_inode->i_generation,
                         old_inode->i_state,
                         new_inode, new_inode->i_ino, new_inode->i_generation);
        }
#endif
#endif
        lock->l_ast_data = data;
        /* Only the NO_LRU bit from "flags" is propagated to the lock. */
        lock->l_flags |= (flags & LDLM_FL_NO_LRU);
        unlock_res_and_lock(lock);
        LDLM_LOCK_PUT(lock);
}
2563
2564 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2565                              ldlm_iterator_t replace, void *data)
2566 {
2567         struct ldlm_res_id res_id = { .name = {0} };
2568         struct obd_device *obd = class_exp2obd(exp);
2569
2570         res_id.name[0] = lsm->lsm_object_id;
2571         res_id.name[2] = lsm->lsm_object_gr;
2572
2573         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2574         return 0;
2575 }
2576
/* Common tail of lock enqueue: extract the server's verdict from an
 * intent reply (if any), log the lvb attributes we were granted, and
 * invoke the caller's update callback with the final status. */
static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo,
                            int intent, int rc)
{
        ENTRY;

        if (intent) {
                /* The request was created before ldlm_cli_enqueue call. */
                if (rc == ELDLM_LOCK_ABORTED) {
                        struct ldlm_reply *rep;

                        /* swabbed by ldlm_cli_enqueue() */
                        LASSERT_REPSWABBED(req, DLM_LOCKREPLY_OFF);
                        rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
                                             sizeof(*rep));
                        LASSERT(rep != NULL);
                        /* The intent result overrides the ABORTED status. */
                        if (rep->lock_policy_res1)
                                rc = rep->lock_policy_res1;
                }
        }

        if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
                CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
                       oinfo->oi_md->lsm_oinfo->loi_lvb.lvb_size,
                       oinfo->oi_md->lsm_oinfo->loi_lvb.lvb_blocks,
                       oinfo->oi_md->lsm_oinfo->loi_lvb.lvb_mtime);
        }

        /* Call the update callback. */
        rc = oinfo->oi_cb_up(oinfo, rc);
        RETURN(rc);
}
2608
/* Reply interpreter for asynchronous lock enqueue requests: finish the
 * DLM-level enqueue, run the OSC completion (osc_enqueue_fini), then drop
 * the lock reference held for the async request (see the policy comment
 * above osc_enqueue about releasing locks immediately). */
static int osc_enqueue_interpret(struct ptlrpc_request *req,
                                 struct osc_enqueue_args *aa, int rc)
{
        int intent = aa->oa_ei->ei_flags & LDLM_FL_HAS_INTENT;
        struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
        struct ldlm_lock *lock;

        /* ldlm_cli_enqueue is holding a reference on the lock, so it must
         * be valid. */
        lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);

        /* Complete obtaining the lock procedure. */
        rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
                                   aa->oa_ei->ei_mode,
                                   &aa->oa_ei->ei_flags,
                                   &lsm->lsm_oinfo->loi_lvb,
                                   sizeof(lsm->lsm_oinfo->loi_lvb),
                                   lustre_swab_ost_lvb,
                                   aa->oa_oi->oi_lockh, rc);

        /* Complete osc stuff. */
        rc = osc_enqueue_fini(req, aa->oa_oi, intent, rc);

        /* Release the lock for async request. */
        if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
                ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);

        /* Asserted after the decref so diagnostics carry full context. */
        LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
                 aa->oa_oi->oi_lockh, req, aa);
        LDLM_LOCK_PUT(lock);
        return rc;
}
2641
/* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
 * other synchronous requests, however keeping some locks and trying to obtain
 * others may take a considerable amount of time in a case of ost failure; and
 * when other sync requests do not get released lock from a client, the client
 * is excluded from the cluster -- such scenarious make the life difficult, so
 * release locks just after they are obtained. */
/* Obtain an extent lock for the range in oinfo->oi_policy: first try to
 * match an existing granted lock (including a PW lock for reads), and
 * only enqueue a new lock (optionally with an intent RPC) on a miss.
 * Async callers (einfo->ei_rqset != NULL) get their completion via
 * osc_enqueue_interpret(). */
static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
                       struct obd_enqueue_info *einfo)
{
        struct ldlm_res_id res_id = { .name = {0} };
        struct obd_device *obd = exp->exp_obd;
        struct ldlm_reply *rep;
        struct ptlrpc_request *req = NULL;
        int intent = einfo->ei_flags & LDLM_FL_HAS_INTENT;
        int rc;
        ENTRY;

        res_id.name[0] = oinfo->oi_md->lsm_object_id;
        res_id.name[2] = oinfo->oi_md->lsm_object_gr;

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother.  */
        oinfo->oi_policy.l_extent.start -=
                oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
        oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;

        /* Without a valid known-minimum-size there is no cached lock state
         * worth matching against; go straight to a fresh enqueue. */
        if (oinfo->oi_md->lsm_oinfo->loi_kms_valid == 0)
                goto no_match;

        /* Next, search for already existing extent locks that will cover us */
        rc = ldlm_lock_match(obd->obd_namespace, einfo->ei_flags, &res_id,
                             einfo->ei_type, &oinfo->oi_policy, einfo->ei_mode,
                             oinfo->oi_lockh);
        if (rc == 1) {
                osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
                                        einfo->ei_flags);
                if (intent) {
                        /* I would like to be able to ASSERT here that rss <=
                         * kms, but I can't, for reasons which are explained in
                         * lov_enqueue() */
                }

                /* We already have a lock, and it's referenced */
                oinfo->oi_cb_up(oinfo, ELDLM_OK);
                
                /* For async requests, decref the lock. */
                if (einfo->ei_rqset)
                        ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);

                RETURN(ELDLM_OK);
        }

        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock.
         *
         * There are problems with conversion deadlocks, so instead of
         * converting a read lock to a write lock, we'll just enqueue a new
         * one.
         *
         * At some point we should cancel the read lock instead of making them
         * send us a blocking callback, but there are problems with canceling
         * locks out from other users right now, too. */

        if (einfo->ei_mode == LCK_PR) {
                rc = ldlm_lock_match(obd->obd_namespace, einfo->ei_flags,
                                     &res_id, einfo->ei_type, &oinfo->oi_policy,
                                     LCK_PW, oinfo->oi_lockh);
                if (rc == 1) {
                        /* FIXME: This is not incredibly elegant, but it might
                         * be more elegant than adding another parameter to
                         * lock_match.  I want a second opinion. */
                        /* addref the lock only if not async requests. */
                        if (!einfo->ei_rqset)
                                ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
                        osc_set_data_with_check(oinfo->oi_lockh,
                                                einfo->ei_cbdata,
                                                einfo->ei_flags);
                        oinfo->oi_cb_up(oinfo, ELDLM_OK);
                        /* Drop the PW reference taken by the match above;
                         * we keep (at most) the PR reference added here. */
                        ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
                        RETURN(ELDLM_OK);
                }
        }

 no_match:
        if (intent) {
                /* Intent enqueues pre-build the request so the glimpse/
                 * intent payload can ride along with the lock request. */
                int size[3] = {
                        [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                        [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request) };

                req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION,
                                      LDLM_ENQUEUE, 2, size, NULL);
                if (req == NULL)
                        RETURN(-ENOMEM);

                size[DLM_LOCKREPLY_OFF] = sizeof(*rep);
                size[DLM_REPLY_REC_OFF] = 
                        sizeof(oinfo->oi_md->lsm_oinfo->loi_lvb);
                ptlrpc_req_set_repsize(req, 3, size);
        }

        /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
        einfo->ei_flags &= ~LDLM_FL_BLOCK_GRANTED;

        rc = ldlm_cli_enqueue(exp, &req, res_id, einfo->ei_type,
                              &oinfo->oi_policy, einfo->ei_mode,
                              &einfo->ei_flags, einfo->ei_cb_bl,
                              einfo->ei_cb_cp, einfo->ei_cb_gl,
                              einfo->ei_cbdata,
                              &oinfo->oi_md->lsm_oinfo->loi_lvb,
                              sizeof(oinfo->oi_md->lsm_oinfo->loi_lvb),
                              lustre_swab_ost_lvb, oinfo->oi_lockh,
                              einfo->ei_rqset ? 1 : 0);
        if (einfo->ei_rqset) {
                /* Async path: completion continues in osc_enqueue_interpret. */
                if (!rc) {
                        struct osc_enqueue_args *aa;
                        LASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
                        aa = (struct osc_enqueue_args *)&req->rq_async_args;
                        aa->oa_oi = oinfo;
                        aa->oa_ei = einfo;
                        aa->oa_exp = exp;

                        req->rq_interpret_reply = osc_enqueue_interpret;
                        ptlrpc_set_add_req(einfo->ei_rqset, req);
                } else if (intent) {
                        /* Enqueue failed; we own the pre-built request. */
                        ptlrpc_req_finished(req);
                }
                RETURN(rc);
        }

        rc = osc_enqueue_fini(req, oinfo, intent, rc);
        if (intent)
                ptlrpc_req_finished(req);

        RETURN(rc);
}
2779
/* Try to match an existing granted extent lock for [start, end] without
 * enqueuing a new one.  For PR requests a compatible PW lock is accepted
 * too (readers can share a writer's lock).  Returns the ldlm_lock_match
 * result: non-zero when a lock was found and "lockh" filled in. */
static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
                     __u32 type, ldlm_policy_data_t *policy, __u32 mode,
                     int *flags, void *data, struct lustre_handle *lockh)
{
        struct ldlm_res_id res_id = { .name = {0} };
        struct obd_device *obd = exp->exp_obd;
        int rc;
        ENTRY;
        
        res_id.name[0] = lsm->lsm_object_id;
        res_id.name[2] = lsm->lsm_object_gr;

        /* Fault-injection point for testing lock-match failure paths. */
        OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother */
        policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
        policy->l_extent.end |= ~CFS_PAGE_MASK;

        /* Next, search for already existing extent locks that will cover us */
        rc = ldlm_lock_match(obd->obd_namespace, *flags, &res_id, type,
                             policy, mode, lockh);
        if (rc) {
                //if (!(*flags & LDLM_FL_TEST_LOCK))
                        osc_set_data_with_check(lockh, data, *flags);
                RETURN(rc);
        }
        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock. */
        if (mode == LCK_PR) {
                rc = ldlm_lock_match(obd->obd_namespace, *flags, &res_id, type,
                                     policy, LCK_PW, lockh);
                if (rc == 1 && !(*flags & LDLM_FL_TEST_LOCK)) {
                        /* FIXME: This is not incredibly elegant, but it might
                         * be more elegant than adding another parameter to
                         * lock_match.  I want a second opinion. */
                        osc_set_data_with_check(lockh, data, *flags);
                        /* Swap the PW reference from the match for a PR one,
                         * matching what the caller asked for. */
                        ldlm_lock_addref(lockh, LCK_PR);
                        ldlm_lock_decref(lockh, LCK_PW);
                }
        }
        RETURN(rc);
}
2824
2825 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2826                       __u32 mode, struct lustre_handle *lockh)
2827 {
2828         ENTRY;
2829
2830         if (unlikely(mode == LCK_GROUP))
2831                 ldlm_lock_decref_and_cancel(lockh, mode);
2832         else
2833                 ldlm_lock_decref(lockh, mode);
2834
2835         RETURN(0);
2836 }
2837
2838 static int osc_cancel_unused(struct obd_export *exp,
2839                              struct lov_stripe_md *lsm,
2840                              int flags, void *opaque)
2841 {
2842         struct obd_device *obd = class_exp2obd(exp);
2843         struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
2844
2845         if (lsm != NULL) {
2846                 res_id.name[0] = lsm->lsm_object_id;
2847                 res_id.name[2] = lsm->lsm_object_gr;
2848                 resp = &res_id;
2849         }
2850
2851         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, 
2852                                       opaque);
2853 }
2854
2855 static int osc_join_lru(struct obd_export *exp,
2856                         struct lov_stripe_md *lsm, int join)
2857 {
2858         struct obd_device *obd = class_exp2obd(exp);
2859         struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
2860
2861         if (lsm != NULL) {
2862                 res_id.name[0] = lsm->lsm_object_id;
2863                 res_id.name[2] = lsm->lsm_object_gr;
2864                 resp = &res_id;
2865         }
2866
2867         return ldlm_cli_join_lru(obd->obd_namespace, resp, join);
2868 }
2869
/* Reply interpreter for async OST_STATFS: unpack (and byte-swap if
 * needed) the obd_statfs from the reply, copy it into the caller's
 * buffer, then invoke the caller's completion callback with the status. */
static int osc_statfs_interpret(struct ptlrpc_request *req,
                                struct osc_async_args *aa, int rc)
{
        struct obd_statfs *msfs;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
                                  lustre_swab_obd_statfs);
        if (msfs == NULL) {
                CERROR("Can't unpack obd_statfs\n");
                GOTO(out, rc = -EPROTO);
        }

        memcpy(aa->aa_oi->oi_osfs, msfs, sizeof(*msfs));
out:
        /* Always run the completion callback, even on error. */
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}
2891
/* Send an OST_STATFS request asynchronously via "rqset"; the reply is
 * handled by osc_statfs_interpret(), which fills oinfo->oi_osfs and
 * calls oinfo->oi_cb_up.  NOTE(review): max_age is currently unused
 * (see the comment below). */
static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
                            __u64 max_age, struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int size[2] = { sizeof(struct ptlrpc_body), sizeof(*oinfo->oi_osfs) };
        ENTRY;

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
                              OST_STATFS, 1, NULL, NULL);
        if (!req)
                RETURN(-ENOMEM);

        ptlrpc_req_set_repsize(req, 2, size);
        req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249

        /* Stash the obd_info in the request so the interpreter can find it. */
        req->rq_interpret_reply = osc_statfs_interpret;
        LASSERT (sizeof (*aa) <= sizeof (req->rq_async_args));
        aa = (struct osc_async_args *)&req->rq_async_args;
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(rqset, req);
        RETURN(0);
}
2922
/* Synchronous OST_STATFS: send the request, wait for the reply, and copy
 * the unpacked obd_statfs into "osfs".  Returns 0 on success or a
 * negative errno.  NOTE(review): max_age is currently unused (see the
 * comment below). */
static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
                      __u64 max_age)
{
        struct obd_statfs *msfs;
        struct ptlrpc_request *req;
        int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
        ENTRY;

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
                              OST_STATFS, 1, NULL, NULL);
        if (!req)
                RETURN(-ENOMEM);

        ptlrpc_req_set_repsize(req, 2, size);
        req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        /* Unpack and, on a cross-endian peer, byte-swap the reply body. */
        msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
                                  lustre_swab_obd_statfs);
        if (msfs == NULL) {
                CERROR("Can't unpack obd_statfs\n");
                GOTO(out, rc = -EPROTO);
        }

        memcpy(osfs, msfs, sizeof(*osfs));

        EXIT;
 out:
        /* The request is always finished, on both success and error paths. */
        ptlrpc_req_finished(req);
        return rc;
}
2963
/* Retrieve object striping information.
 *
 * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
 * the maximum number of OST indices which will fit in the user buffer.
 * lmm_magic must be LOV_USER_MAGIC (we only use 1 slot here).
 *
 * Copies a single-stripe lov_user_md (plus one object entry if the user
 * asked for stripe data) back to userspace.  Returns 0, -ENODATA,
 * -EFAULT, -EINVAL or -ENOMEM. */
static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
{
        struct lov_user_md lum, *lumk;
        int rc = 0, lum_size;
        ENTRY;

        if (!lsm)
                RETURN(-ENODATA);

        if (copy_from_user(&lum, lump, sizeof(lum)))
                RETURN(-EFAULT);

        if (lum.lmm_magic != LOV_USER_MAGIC)
                RETURN(-EINVAL);

        if (lum.lmm_stripe_count > 0) {
                /* Caller wants per-object data too: allocate header + 1 slot. */
                lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
                OBD_ALLOC(lumk, lum_size);
                if (!lumk)
                        RETURN(-ENOMEM);

                lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
                lumk->lmm_objects[0].l_object_gr = lsm->lsm_object_gr;
        } else {
                /* Header only; reuse the stack copy as the output buffer. */
                lum_size = sizeof(lum);
                lumk = &lum;
        }

        /* A bare OSC always reports exactly one stripe. */
        lumk->lmm_object_id = lsm->lsm_object_id;
        lumk->lmm_object_gr = lsm->lsm_object_gr;
        lumk->lmm_stripe_count = 1;

        if (copy_to_user(lump, lumk, lum_size))
                rc = -EFAULT;

        if (lumk != &lum)
                OBD_FREE(lumk, lum_size);

        RETURN(rc);
}
3010
3011
/* Handle ioctls directed at an OSC device.
 *
 * A module reference is taken for the duration of the call so the osc
 * module cannot be unloaded while an ioctl is executing.
 *
 * \param cmd   ioctl command number
 * \param exp   export the ioctl arrived on
 * \param len   length of the kernel-side argument buffer
 * \param karg  kernel-space argument (struct obd_ioctl_data)
 * \param uarg  user-space argument pointer
 *
 * \retval 0 on success, negative errno on failure; -ENOTTY for commands
 *         this device does not recognise.
 */
static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         void *karg, void *uarg)
{
        struct obd_device *obd = exp->exp_obd;
        struct obd_ioctl_data *data = karg;
        int err = 0;
        ENTRY;

#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
        MOD_INC_USE_COUNT;
#else
        if (!try_module_get(THIS_MODULE)) {
                CERROR("Can't get module. Is it alive?");
                return -EINVAL;
        }
#endif
        switch (cmd) {
        case OBD_IOC_LOV_GET_CONFIG: {
                /* Present this single OST as a one-target "LOV" so LOV-aware
                 * tools work against a bare OSC. */
                char *buf;
                struct lov_desc *desc;
                struct obd_uuid uuid;

                buf = NULL;
                len = 0;
                /* Copies the user's ioctl payload into a kernel buffer;
                 * buf/len are filled in on success. */
                if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
                        GOTO(out, err = -EINVAL);

                data = (struct obd_ioctl_data *)buf;

                /* Caller must have room for the descriptor ... */
                if (sizeof(*desc) > data->ioc_inllen1) {
                        obd_ioctl_freedata(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                /* ... and for one UUID in the second inline buffer. */
                if (data->ioc_inllen2 < sizeof(uuid)) {
                        obd_ioctl_freedata(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                /* Fabricate a trivial descriptor: one active target, no
                 * striping defaults. */
                desc = (struct lov_desc *)data->ioc_inlbuf1;
                desc->ld_tgt_count = 1;
                desc->ld_active_tgt_count = 1;
                desc->ld_default_stripe_count = 1;
                desc->ld_default_stripe_size = 0;
                desc->ld_default_stripe_offset = 0;
                desc->ld_pattern = 0;
                memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));

                /* The single "target" UUID is this OSC's own UUID. */
                memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));

                err = copy_to_user((void *)uarg, buf, len);
                if (err)
                        err = -EFAULT;
                obd_ioctl_freedata(buf, len);
                GOTO(out, err);
        }
        case LL_IOC_LOV_SETSTRIPE:
                /* obd_alloc_memmd returns the md size on success; callers
                 * of the ioctl only care about pass/fail. */
                err = obd_alloc_memmd(exp, karg);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case LL_IOC_LOV_GETSTRIPE:
                err = osc_getstripe(karg, uarg);
                GOTO(out, err);
        case OBD_IOC_CLIENT_RECOVER:
                /* inlbuf1 carries the new server NID to recover against. */
                err = ptlrpc_recover_import(obd->u.cli.cl_import,
                                            data->ioc_inlbuf1);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case IOC_OSC_SET_ACTIVE:
                /* ioc_offset doubles as the active/inactive flag here. */
                err = ptlrpc_set_import_active(obd->u.cli.cl_import,
                                               data->ioc_offset);
                GOTO(out, err);
        case OBD_IOC_POLL_QUOTACHECK:
                err = lquota_poll_check(quota_interface, exp,
                                        (struct if_quotacheck *)karg);
                GOTO(out, err);
        default:
                CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
                       cmd, cfs_curproc_comm());
                GOTO(out, err = -ENOTTY);
        }
out:
#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
        MOD_DEC_USE_COUNT;
#else
        module_put(THIS_MODULE);
#endif
        return err;
}
3103
/* obd_get_info handler for the OSC.
 *
 * Recognised keys:
 *   "lock_to_stripe" - an OSC object is a single stripe, so always 0.
 *   "last_id"        - queries the OST over the wire (OST_GET_INFO) for
 *                      the last allocated object id; blocks on the RPC.
 *
 * \param exp     export to query
 * \param keylen  length of \a key (NOTE(review): the two branches compare
 *                keylen with '>' vs '>=' against strlen(); presumably
 *                keylen includes the NUL terminator - confirm at callers)
 * \param key     key string
 * \param vallen  in: size of \a val buffer; out: bytes written
 * \param val     result buffer
 *
 * \retval 0 on success, -EINVAL for unknown keys, negative errno on
 *         RPC/unpack failure.
 */
static int osc_get_info(struct obd_export *exp, obd_count keylen,
                        void *key, __u32 *vallen, void *val)
{
        ENTRY;
        if (!vallen || !val)
                RETURN(-EFAULT);

        if (keylen > strlen("lock_to_stripe") &&
            strcmp(key, "lock_to_stripe") == 0) {
                __u32 *stripe = val;
                *vallen = sizeof(*stripe);
                *stripe = 0;
                RETURN(0);
        } else if (keylen >= strlen("last_id") && strcmp(key, "last_id") == 0) {
                struct ptlrpc_request *req;
                obd_id *reply;
                char *bufs[2] = { NULL, key };
                int rc, size[2] = { sizeof(struct ptlrpc_body), keylen };

                req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                                      OST_GET_INFO, 2, size, bufs);
                if (req == NULL)
                        RETURN(-ENOMEM);

                /* Reply buffer sized to what the caller can accept. */
                size[REPLY_REC_OFF] = *vallen;
                ptlrpc_req_set_repsize(req, 2, size);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(out, rc);

                /* Swab the returned last id in place if needed. */
                reply = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*reply),
                                           lustre_swab_ost_last_id);
                if (reply == NULL) {
                        CERROR("Can't unpack OST last ID\n");
                        GOTO(out, rc = -EPROTO);
                }
                *((obd_id *)val) = *reply;
        out:
                ptlrpc_req_finished(req);
                RETURN(rc);
        }
        RETURN(-EINVAL);
}
3147
3148 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3149                                           void *aa, int rc)
3150 {
3151         struct llog_ctxt *ctxt;
3152         struct obd_import *imp = req->rq_import;
3153         ENTRY;
3154
3155         if (rc != 0)
3156                 RETURN(rc);
3157
3158         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3159         if (ctxt) {
3160                 if (rc == 0)
3161                         rc = llog_initiator_connect(ctxt);
3162                 else
3163                         CERROR("cannot establish connection for "
3164                                "ctxt %p: %d\n", ctxt, rc);
3165         }
3166
3167         imp->imp_server_timeout = 1;
3168         CDEBUG(D_HA, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3169         imp->imp_pingable = 1;
3170
3171         RETURN(rc);
3172 }
3173
/* obd_set_info_async handler for the OSC.
 *
 * Several keys are handled locally (next id, grant/nospc flags, initial
 * recovery, checksums, security context flush); anything else is
 * forwarded to the OST as an OST_SET_INFO RPC added to \a set.
 *
 * \param exp     export to act on
 * \param keylen  length of \a key
 * \param key     key string
 * \param vallen  length of \a val
 * \param val     value buffer
 * \param set     request set for forwarded RPCs; required for keys that
 *                go over the wire
 *
 * \retval 0 on success, -EINVAL on bad value size or missing \a set.
 */
static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
                              void *key, obd_count vallen, void *val,
                              struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct obd_device  *obd = exp->exp_obd;
        struct obd_import *imp = class_exp2cliimp(exp);
        int size[3] = { sizeof(struct ptlrpc_body), keylen, vallen };
        char *bufs[3] = { NULL, key, val };
        ENTRY;

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

        if (KEY_IS(KEY_NEXT_ID)) {
                /* MDS tells us the last used object id; next create starts
                 * one past it. */
                if (vallen != sizeof(obd_id))
                        RETURN(-EINVAL);
                obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
                CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
                       exp->exp_obd->obd_name,
                       obd->u.cli.cl_oscc.oscc_next_id);

                RETURN(0);
        }

        if (KEY_IS("unlinked")) {
                /* Objects were freed on the OST; clear the out-of-space
                 * flag so precreates may resume. */
                struct osc_creator *oscc = &obd->u.cli.cl_oscc;
                spin_lock(&oscc->oscc_lock);
                oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
                spin_unlock(&oscc->oscc_lock);
                RETURN(0);
        }

        if (KEY_IS(KEY_INIT_RECOV)) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                imp->imp_initial_recov = *(int *)val;
                CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
                       exp->exp_obd->obd_name,
                       imp->imp_initial_recov);
                RETURN(0);
        }

        if (KEY_IS("checksum")) {
                /* Toggle bulk data checksumming for this client. */
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
                RETURN(0);
        }

        if (KEY_IS(KEY_FLUSH_CTX)) {
                sptlrpc_import_flush_my_ctx(imp);
                RETURN(0);
        }

        if (!set)
                RETURN(-EINVAL);

        /* We pass all other commands directly to OST. Since nobody calls osc
           methods directly and everybody is supposed to go through LOV, we
           assume lov checked invalid values for us.
           The only recognised values so far are evict_by_nid and mds_conn.
           Even if something bad goes through, we'd get a -EINVAL from OST
           anyway. */

        req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_SET_INFO, 3, size,
                              bufs);
        if (req == NULL)
                RETURN(-ENOMEM);

        if (KEY_IS("mds_conn")) {
                /* Record which object group the MDS uses; the reply
                 * interpreter finishes the MDS-OSC llog hookup. */
                struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                oscc->oscc_oa.o_gr = (*(__u32 *)val);
                oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
                LASSERT(oscc->oscc_oa.o_gr > 0);
                req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
        }

        ptlrpc_req_set_repsize(req, 1, NULL);
        ptlrpc_set_add_req(set, req);
        ptlrpc_check_set(set);

        RETURN(0);
}
3258
3259
3260 static struct llog_operations osc_size_repl_logops = {
3261         lop_cancel: llog_obd_repl_cancel
3262 };
3263
/* Originator ops table, filled in lazily on first osc_llog_init() call. */
static struct llog_operations osc_mds_ost_orig_logops;

/* Set up the two llog contexts an OSC uses:
 *   LLOG_MDS_OST_ORIG_CTXT - MDS->OST originator catalog (\a catid)
 *   LLOG_SIZE_REPL_CTXT    - size-on-MDS replicator (cancel-only)
 *
 * \param obd    this OSC device
 * \param llogs  llog bookkeeping to attach the contexts to
 * \param tgt    device the logs live on
 * \param count  number of logs
 * \param catid  catalog id for the originator context
 * \param uuid   unused here
 *
 * \retval 0 on success, negative errno on llog_setup failure.
 */
static int osc_llog_init(struct obd_device *obd, struct obd_llogs *llogs, 
                         struct obd_device *tgt, int count, 
                         struct llog_catid *catid, struct obd_uuid *uuid)
{
        int rc;
        ENTRY;

        /* One-time lazy initialization of the shared originator ops table;
         * lop_setup is used as the "already initialized" sentinel.  The
         * device lock serializes concurrent first calls. */
        spin_lock(&obd->obd_dev_lock);
        if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
                osc_mds_ost_orig_logops = llog_lvfs_ops;
                osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
                osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
                osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
                osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
        }
        spin_unlock(&obd->obd_dev_lock);

        rc = llog_setup(obd, llogs, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
                        &catid->lci_logid, &osc_mds_ost_orig_logops);
        if (rc) {
                CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
                GOTO (out, rc);
        }

        rc = llog_setup(obd, llogs, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
                        &osc_size_repl_logops);
        if (rc) 
                CERROR("failed LLOG_SIZE_REPL_CTXT\n");
out:
        /* On any failure, dump enough state to identify the broken setup. */
        if (rc) {
                CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n", 
                       obd->obd_name, tgt->obd_name, count, catid, rc);
                CERROR("logid "LPX64":0x%x\n",
                       catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
        }
        RETURN(rc);
}
3302
3303 static int osc_llog_finish(struct obd_device *obd, int count)
3304 {
3305         struct llog_ctxt *ctxt;
3306         int rc = 0, rc2 = 0;
3307         ENTRY;
3308
3309         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3310         if (ctxt)
3311                 rc = llog_cleanup(ctxt);
3312
3313         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3314         if (ctxt)
3315                 rc2 = llog_cleanup(ctxt);
3316         if (!rc)
3317                 rc = rc2;
3318
3319         RETURN(rc);
3320 }
3321
3322 static int osc_reconnect(struct obd_export *exp, struct obd_device *obd,
3323                          struct obd_uuid *cluuid,
3324                          struct obd_connect_data *data)
3325 {
3326         struct client_obd *cli = &obd->u.cli;
3327
3328         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3329                 long lost_grant;
3330
3331                 client_obd_list_lock(&cli->cl_loi_list_lock);
3332                 data->ocd_grant = cli->cl_avail_grant ?:
3333                                 2 * cli->cl_max_pages_per_rpc << PAGE_SHIFT;
3334                 lost_grant = cli->cl_lost_grant;
3335                 cli->cl_lost_grant = 0;
3336                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3337
3338                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3339                        "cl_lost_grant: %ld\n", data->ocd_grant,
3340                        cli->cl_avail_grant, lost_grant);
3341                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3342                        " ocd_grant: %d\n", data->ocd_connect_flags,
3343                        data->ocd_version, data->ocd_grant);
3344         }
3345
3346         RETURN(0);
3347 }
3348
/* Disconnect this OSC export.
 *
 * On the last connection, flush pending llog cancel records to the OST
 * before tearing the export down.
 *
 * NOTE(review): llog_get_context() can return NULL if the size-repl
 * context was never set up; presumably llog_sync() tolerates a NULL
 * ctxt - confirm before relying on this path.
 */
static int osc_disconnect(struct obd_export *exp)
{
        struct obd_device *obd = class_exp2obd(exp);
        struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
        int rc;

        if (obd->u.cli.cl_conn_count == 1)
                /* flush any remaining cancel messages out to the target */
                llog_sync(ctxt, exp);

        rc = client_disconnect_export(exp);
        return rc;
}
3362
/* React to import state transitions for this OSC's connection.
 *
 * \param obd    the OSC device owning \a imp
 * \param imp    the import that changed state
 * \param event  which transition occurred
 *
 * \retval 0 or the observer-notification status; LBUG on unknown event.
 */
static int osc_import_event(struct obd_device *obd,
                            struct obd_import *imp,
                            enum obd_import_event event)
{
        struct client_obd *cli;
        int rc = 0;

        ENTRY;
        LASSERT(imp->imp_obd == obd);

        switch (event) {
        case IMP_EVENT_DISCON: {
                /* Only do this on the MDS OSC's */
                if (imp->imp_server_timeout) {
                        /* Pause object precreation until the OST is back. */
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                        spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
                        spin_unlock(&oscc->oscc_lock);
                }

                break;
        }
        case IMP_EVENT_INACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
                break;
        }
        case IMP_EVENT_INVALIDATE: {
                struct ldlm_namespace *ns = obd->obd_namespace;

                /* Reset grants */
                cli = &obd->u.cli;
                client_obd_list_lock(&cli->cl_loi_list_lock);
                cli->cl_avail_grant = 0;
                cli->cl_lost_grant = 0;
                /* all pages go to failing rpcs due to the invalid import */
                osc_check_rpcs(cli);
                client_obd_list_unlock(&cli->cl_loi_list_lock);

                /* Drop all locks locally; the server's copies are gone. */
                ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);

                break;
        }
        case IMP_EVENT_ACTIVE: {
                /* Only do this on the MDS OSC's */
                if (imp->imp_server_timeout) {
                        /* OST is reachable again; allow precreates. */
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                        spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
                        spin_unlock(&oscc->oscc_lock);
                }
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
                break;
        }
        case IMP_EVENT_OCD: {
                /* Connect data negotiated; apply grant and portal options. */
                struct obd_connect_data *ocd = &imp->imp_connect_data;

                if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
                        osc_init_grant(&obd->u.cli, ocd);

                /* See bug 7198 */
                if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;

                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
                break;
        }
        default:
                CERROR("Unknown import event %d\n", event);
                LBUG();
        }
        RETURN(rc);
}
3437
3438 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3439 {
3440         int rc;
3441         ENTRY;
3442
3443         ENTRY;
3444         rc = ptlrpcd_addref();
3445         if (rc)
3446                 RETURN(rc);
3447
3448         rc = client_obd_setup(obd, lcfg);
3449         if (rc) {
3450                 ptlrpcd_decref();
3451         } else {
3452                 struct lprocfs_static_vars lvars;
3453                 struct client_obd *cli = &obd->u.cli;
3454
3455                 lprocfs_init_vars(osc, &lvars);
3456                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3457                         lproc_osc_attach_seqstat(obd);
3458                         ptlrpc_lprocfs_register_obd(obd);
3459                 }
3460
3461                 oscc_init(obd);
3462                 /* We need to allocate a few requests more, because
3463                    brw_interpret_oap tries to create new requests before freeing
3464                    previous ones. Ideally we want to have 2x max_rpcs_in_flight
3465                    reserved, but I afraid that might be too much wasted RAM
3466                    in fact, so 2 is just my guess and still should work. */
3467                 cli->cl_import->imp_rq_pool =
3468                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3469                                             OST_MAXREQSIZE,
3470                                             ptlrpc_add_rqs_to_pool);
3471         }
3472
3473         RETURN(rc);
3474 }
3475
3476 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3477 {
3478         int rc = 0;
3479         ENTRY;
3480
3481         switch (stage) {
3482         case OBD_CLEANUP_EARLY: {
3483                 struct obd_import *imp;
3484                 imp = obd->u.cli.cl_import;
3485                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3486                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3487                 ptlrpc_deactivate_import(imp);
3488                 break;
3489         }
3490         case OBD_CLEANUP_EXPORTS:
3491                 break;
3492         case OBD_CLEANUP_SELF_EXP:
3493                 rc = obd_llog_finish(obd, 0);
3494                 if (rc != 0)
3495                         CERROR("failed to cleanup llogging subsystems\n");
3496                 break;
3497         case OBD_CLEANUP_OBD:
3498                 break;
3499         }
3500         RETURN(rc);
3501 }
3502
3503 int osc_cleanup(struct obd_device *obd)
3504 {
3505         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3506         int rc;
3507
3508         ENTRY;
3509         ptlrpc_lprocfs_unregister_obd(obd);
3510         lprocfs_obd_cleanup(obd);
3511
3512         spin_lock(&oscc->oscc_lock);
3513         oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
3514         oscc->oscc_flags |= OSCC_FLAG_EXITING;
3515         spin_unlock(&oscc->oscc_lock);
3516
3517         /* free memory of osc quota cache */
3518         lquota_cleanup(quota_interface, obd);
3519
3520         rc = client_obd_cleanup(obd);
3521
3522         ptlrpcd_decref();
3523         RETURN(rc);
3524 }
3525
3526 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3527 {
3528         struct lustre_cfg *lcfg = buf;
3529         struct lprocfs_static_vars lvars;
3530         int rc = 0;
3531
3532         lprocfs_init_vars(osc, &lvars);
3533
3534         rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars, lcfg, obd);
3535         return(rc);
3536 }
3537
/* Method table exported by the OSC to the obd layer. */
struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        /* device lifecycle */
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup,
        /* connection management (generic client helpers where possible) */
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        /* statfs */
        .o_statfs               = osc_statfs,
        .o_statfs_async         = osc_statfs_async,
        /* striping metadata pack/unpack */
        .o_packmd               = osc_packmd,
        .o_unpackmd             = osc_unpackmd,
        /* object operations */
        .o_create               = osc_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_getattr_async        = osc_getattr_async,
        .o_setattr              = osc_setattr,
        .o_setattr_async        = osc_setattr_async,
        /* bulk I/O and the async page cache */
        .o_brw                  = osc_brw,
        .o_brw_async            = osc_brw_async,
        .o_prep_async_page      = osc_prep_async_page,
        .o_queue_async_io       = osc_queue_async_io,
        .o_set_async_flags      = osc_set_async_flags,
        .o_queue_group_io       = osc_queue_group_io,
        .o_trigger_group_io     = osc_trigger_group_io,
        .o_teardown_async_page  = osc_teardown_async_page,
        .o_punch                = osc_punch,
        .o_sync                 = osc_sync,
        /* DLM locking */
        .o_enqueue              = osc_enqueue,
        .o_match                = osc_match,
        .o_change_cbdata        = osc_change_cbdata,
        .o_cancel               = osc_cancel,
        .o_cancel_unused        = osc_cancel_unused,
        .o_join_lru             = osc_join_lru,
        /* control path */
        .o_iocontrol            = osc_iocontrol,
        .o_get_info             = osc_get_info,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_llog_init            = osc_llog_init,
        .o_llog_finish          = osc_llog_finish,
        .o_process_config       = osc_process_config,
};
3582
3583 extern quota_interface_t osc_quota_interface;
3584
/* Module entry point: wire up the quota interface and register the OSC
 * obd type with the class layer.
 *
 * NOTE(review): PORTAL_SYMBOL_GET may return NULL if the lquota module
 * is absent; presumably the lquota_* wrappers tolerate a NULL interface
 * (the later "if (quota_interface)" guard suggests so) - confirm.
 *
 * \retval 0 on success, negative errno from class_register_type().
 */
int __init osc_init(void)
{
        struct lprocfs_static_vars lvars;
        int rc;
        ENTRY;

        lprocfs_init_vars(osc, &lvars);

        /* Pull in the optional quota module and hook its ops into ours. */
        request_module("lquota");
        quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
        lquota_init(quota_interface);
        init_obd_quota_ops(quota_interface, &osc_obd_ops);

        rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
                                 LUSTRE_OSC_NAME, NULL);
        if (rc) {
                /* Registration failed: drop the quota symbol reference. */
                if (quota_interface)
                        PORTAL_SYMBOL_PUT(osc_quota_interface);
                RETURN(rc);
        }

        RETURN(rc);
}
3608
3609 #ifdef __KERNEL__
/* Module exit: shut down quota support, release the lquota symbol
 * reference taken in osc_init(), and unregister the OSC obd type. */
static void /*__exit*/ osc_exit(void)
{
        lquota_exit(quota_interface);
        if (quota_interface)
                PORTAL_SYMBOL_PUT(osc_quota_interface);

        class_unregister_type(LUSTRE_OSC_NAME);
}
3618
3619 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
3620 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3621 MODULE_LICENSE("GPL");
3622
3623 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
3624 #endif