Whamcloud - gitweb
Branch:HEAD
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author Peter Braam <braam@clusterfs.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  *   You may have signed or agreed to another license before downloading
11  *   this software.  If so, you are bound by the terms and conditions
12  *   of that agreement, and the following does not apply to you.  See the
13  *   LICENSE file included with this distribution for more information.
14  *
15  *   If you did not agree to a different license, then this copy of Lustre
16  *   is open source software; you can redistribute it and/or modify it
17  *   under the terms of version 2 of the GNU General Public License as
18  *   published by the Free Software Foundation.
19  *
20  *   In either case, Lustre is distributed in the hope that it will be
21  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23  *   license text for more details.
24  *
25  *  For testing and management it is treated as an obd_device,
26  *  although * it does not export a full OBD method table (the
27  *  requests are coming * in over the wire, so object target modules
28  *  do not have a full * method table.)
29  *
30  */
31
32 #ifndef EXPORT_SYMTAB
33 # define EXPORT_SYMTAB
34 #endif
35 #define DEBUG_SUBSYSTEM S_OSC
36
37 #ifdef __KERNEL__
38 # include <libcfs/libcfs.h>
39 #else /* __KERNEL__ */
40 # include <liblustre.h>
41 #endif
42
43 # include <lustre_dlm.h>
44 #include <libcfs/kp30.h>
45 #include <lustre_net.h>
46 #include <lustre/lustre_user.h>
47 #include <obd_ost.h>
48 #include <obd_lov.h>
49
50 #ifdef  __CYGWIN__
51 # include <ctype.h>
52 #endif
53
54 #include <lustre_ha.h>
55 #include <lprocfs_status.h>
56 #include <lustre_log.h>
57 #include <lustre_debug.h>
58 #include <lustre_param.h>
59 #include "osc_internal.h"
60
61 static quota_interface_t *quota_interface = NULL;
62 extern quota_interface_t osc_quota_interface;
63
64 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
65
66 static quota_interface_t *quota_interface;
67 extern quota_interface_t osc_quota_interface;
68
69 /* Pack OSC object metadata for disk storage (LE byte order). */
70 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
71                       struct lov_stripe_md *lsm)
72 {
73         int lmm_size;
74         ENTRY;
75
76         lmm_size = sizeof(**lmmp);
77         if (!lmmp)
78                 RETURN(lmm_size);
79
80         if (*lmmp && !lsm) {
81                 OBD_FREE(*lmmp, lmm_size);
82                 *lmmp = NULL;
83                 RETURN(0);
84         }
85
86         if (!*lmmp) {
87                 OBD_ALLOC(*lmmp, lmm_size);
88                 if (!*lmmp)
89                         RETURN(-ENOMEM);
90         }
91
92         if (lsm) {
93                 LASSERT(lsm->lsm_object_id);
94                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
95         }
96
97         RETURN(lmm_size);
98 }
99
100 /* Unpack OSC object metadata from disk storage (LE byte order). */
101 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
102                         struct lov_mds_md *lmm, int lmm_bytes)
103 {
104         int lsm_size;
105         ENTRY;
106
107         if (lmm != NULL) {
108                 if (lmm_bytes < sizeof (*lmm)) {
109                         CERROR("lov_mds_md too small: %d, need %d\n",
110                                lmm_bytes, (int)sizeof(*lmm));
111                         RETURN(-EINVAL);
112                 }
113                 /* XXX LOV_MAGIC etc check? */
114
115                 if (lmm->lmm_object_id == 0) {
116                         CERROR("lov_mds_md: zero lmm_object_id\n");
117                         RETURN(-EINVAL);
118                 }
119         }
120
121         lsm_size = lov_stripe_md_size(1);
122         if (lsmp == NULL)
123                 RETURN(lsm_size);
124
125         if (*lsmp != NULL && lmm == NULL) {
126                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
127                 OBD_FREE(*lsmp, lsm_size);
128                 *lsmp = NULL;
129                 RETURN(0);
130         }
131
132         if (*lsmp == NULL) {
133                 OBD_ALLOC(*lsmp, lsm_size);
134                 if (*lsmp == NULL)
135                         RETURN(-ENOMEM);
136                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
137                 if ((*lsmp)->lsm_oinfo[0] == NULL) {
138                         OBD_FREE(*lsmp, lsm_size);
139                         RETURN(-ENOMEM);
140                 }
141                 loi_init((*lsmp)->lsm_oinfo[0]);
142         }
143
144         if (lmm != NULL) {
145                 /* XXX zero *lsmp? */
146                 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
147                 LASSERT((*lsmp)->lsm_object_id);
148         }
149
150         (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
151
152         RETURN(lsm_size);
153 }
154
155 static int osc_getattr_interpret(struct ptlrpc_request *req,
156                                  struct osc_async_args *aa, int rc)
157 {
158         struct ost_body *body;
159         ENTRY;
160
161         if (rc != 0)
162                 GOTO(out, rc);
163
164         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
165                                   lustre_swab_ost_body);
166         if (body) {
167                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
168                 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
169
170                 /* This should really be sent by the OST */
171                 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
172                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
173         } else {
174                 CERROR("can't unpack ost_body\n");
175                 rc = -EPROTO;
176                 aa->aa_oi->oi_oa->o_valid = 0;
177         }
178 out:
179         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
180         RETURN(rc);
181 }
182
183 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
184                              struct ptlrpc_request_set *set)
185 {
186         struct ptlrpc_request *req;
187         struct ost_body *body;
188         int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
189         struct osc_async_args *aa;
190         ENTRY;
191
192         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
193                               OST_GETATTR, 2, size,NULL);
194         if (!req)
195                 RETURN(-ENOMEM);
196
197         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
198         memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
199
200         ptlrpc_req_set_repsize(req, 2, size);
201         req->rq_interpret_reply = osc_getattr_interpret;
202
203         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
204         aa = (struct osc_async_args *)&req->rq_async_args;
205         aa->aa_oi = oinfo;
206
207         ptlrpc_set_add_req(set, req);
208         RETURN (0);
209 }
210
211 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
212 {
213         struct ptlrpc_request *req;
214         struct ost_body *body;
215         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
216         ENTRY;
217
218         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
219                               OST_GETATTR, 2, size, NULL);
220         if (!req)
221                 RETURN(-ENOMEM);
222
223         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
224         memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
225
226         ptlrpc_req_set_repsize(req, 2, size);
227
228         rc = ptlrpc_queue_wait(req);
229         if (rc) {
230                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
231                 GOTO(out, rc);
232         }
233
234         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
235                                   lustre_swab_ost_body);
236         if (body == NULL) {
237                 CERROR ("can't unpack ost_body\n");
238                 GOTO (out, rc = -EPROTO);
239         }
240
241         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
242         memcpy(oinfo->oi_oa, &body->oa, sizeof(*oinfo->oi_oa));
243
244         /* This should really be sent by the OST */
245         oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
246         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
247
248         EXIT;
249  out:
250         ptlrpc_req_finished(req);
251         return rc;
252 }
253
254 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
255                        struct obd_trans_info *oti)
256 {
257         struct ptlrpc_request *req;
258         struct ost_body *body;
259         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
260         ENTRY;
261
262         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
263                               OST_SETATTR, 2, size, NULL);
264         if (!req)
265                 RETURN(-ENOMEM);
266
267         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
268         memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
269
270         ptlrpc_req_set_repsize(req, 2, size);
271
272         rc = ptlrpc_queue_wait(req);
273         if (rc)
274                 GOTO(out, rc);
275
276         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
277                                   lustre_swab_ost_body);
278         if (body == NULL)
279                 GOTO(out, rc = -EPROTO);
280
281         memcpy(oinfo->oi_oa, &body->oa, sizeof(*oinfo->oi_oa));
282
283         EXIT;
284 out:
285         ptlrpc_req_finished(req);
286         RETURN(rc);
287 }
288
289 static int osc_setattr_interpret(struct ptlrpc_request *req,
290                                  struct osc_async_args *aa, int rc)
291 {
292         struct ost_body *body;
293         ENTRY;
294
295         if (rc != 0)
296                 GOTO(out, rc);
297
298         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
299                                   lustre_swab_ost_body);
300         if (body == NULL) {
301                 CERROR("can't unpack ost_body\n");
302                 GOTO(out, rc = -EPROTO);
303         }
304
305         memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
306 out:
307         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
308         RETURN(rc);
309 }
310
311 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
312                              struct obd_trans_info *oti,
313                              struct ptlrpc_request_set *rqset)
314 {
315         struct ptlrpc_request *req;
316         struct ost_body *body;
317         int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
318         struct osc_async_args *aa;
319         ENTRY;
320
321         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
322                               OST_SETATTR, 2, size, NULL);
323         if (!req)
324                 RETURN(-ENOMEM);
325
326         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
327
328         if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
329                 LASSERT(oti);
330                 memcpy(obdo_logcookie(oinfo->oi_oa), oti->oti_logcookies,
331                        sizeof(*oti->oti_logcookies));
332         }
333
334         memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
335         ptlrpc_req_set_repsize(req, 2, size);
336         /* do mds to ost setattr asynchronouly */
337         if (!rqset) {
338                 /* Do not wait for response. */
339                 ptlrpcd_add_req(req);
340         } else {
341                 req->rq_interpret_reply = osc_setattr_interpret;
342
343                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
344                 aa = (struct osc_async_args *)&req->rq_async_args;
345                 aa->aa_oi = oinfo;
346
347                 ptlrpc_set_add_req(rqset, req);
348         }
349
350         RETURN(0);
351 }
352
353 int osc_real_create(struct obd_export *exp, struct obdo *oa,
354                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
355 {
356         struct ptlrpc_request *req;
357         struct ost_body *body;
358         struct lov_stripe_md *lsm;
359         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
360         ENTRY;
361
362         LASSERT(oa);
363         LASSERT(ea);
364
365         lsm = *ea;
366         if (!lsm) {
367                 rc = obd_alloc_memmd(exp, &lsm);
368                 if (rc < 0)
369                         RETURN(rc);
370         }
371
372         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
373                               OST_CREATE, 2, size, NULL);
374         if (!req)
375                 GOTO(out, rc = -ENOMEM);
376
377         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
378         memcpy(&body->oa, oa, sizeof(body->oa));
379
380         ptlrpc_req_set_repsize(req, 2, size);
381         if (oa->o_valid & OBD_MD_FLINLINE) {
382                 LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
383                         oa->o_flags == OBD_FL_DELORPHAN);
384                 DEBUG_REQ(D_HA, req,
385                           "delorphan from OST integration");
386                 /* Don't resend the delorphan req */
387                 req->rq_no_resend = req->rq_no_delay = 1;
388         }
389
390         rc = ptlrpc_queue_wait(req);
391         if (rc)
392                 GOTO(out_req, rc);
393
394         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
395                                   lustre_swab_ost_body);
396         if (body == NULL) {
397                 CERROR ("can't unpack ost_body\n");
398                 GOTO (out_req, rc = -EPROTO);
399         }
400
401         memcpy(oa, &body->oa, sizeof(*oa));
402
403         /* This should really be sent by the OST */
404         oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
405         oa->o_valid |= OBD_MD_FLBLKSZ;
406
407         /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
408          * have valid lsm_oinfo data structs, so don't go touching that.
409          * This needs to be fixed in a big way.
410          */
411         lsm->lsm_object_id = oa->o_id;
412         *ea = lsm;
413
414         if (oti != NULL) {
415                 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
416
417                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
418                         if (!oti->oti_logcookies)
419                                 oti_alloc_cookies(oti, 1);
420                         memcpy(oti->oti_logcookies, obdo_logcookie(oa),
421                                sizeof(oti->oti_onecookie));
422                 }
423         }
424
425         CDEBUG(D_HA, "transno: "LPD64"\n",
426                lustre_msg_get_transno(req->rq_repmsg));
427         EXIT;
428 out_req:
429         ptlrpc_req_finished(req);
430 out:
431         if (rc && !*ea)
432                 obd_free_memmd(exp, &lsm);
433         return rc;
434 }
435
436 static int osc_punch_interpret(struct ptlrpc_request *req,
437                                struct osc_async_args *aa, int rc)
438 {
439         struct ost_body *body;
440         ENTRY;
441
442         if (rc != 0)
443                 GOTO(out, rc);
444
445         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof (*body),
446                                   lustre_swab_ost_body);
447         if (body == NULL) {
448                 CERROR ("can't unpack ost_body\n");
449                 GOTO(out, rc = -EPROTO);
450         }
451
452         memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
453 out:
454         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
455         RETURN(rc);
456 }
457
458 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
459                      struct obd_trans_info *oti,
460                      struct ptlrpc_request_set *rqset)
461 {
462         struct ptlrpc_request *req;
463         struct osc_async_args *aa;
464         struct ost_body *body;
465         int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
466         ENTRY;
467
468         if (!oinfo->oi_oa) {
469                 CERROR("oa NULL\n");
470                 RETURN(-EINVAL);
471         }
472
473         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
474                               OST_PUNCH, 2, size, NULL);
475         if (!req)
476                 RETURN(-ENOMEM);
477
478         /* FIXME bug 249. Also see bug 7198 */
479         if (class_exp2cliimp(exp)->imp_connect_data.ocd_connect_flags &
480             OBD_CONNECT_REQPORTAL)
481                 req->rq_request_portal = OST_IO_PORTAL;
482
483         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
484         memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
485
486         /* overload the size and blocks fields in the oa with start/end */
487         body->oa.o_size = oinfo->oi_policy.l_extent.start;
488         body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
489         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
490
491         ptlrpc_req_set_repsize(req, 2, size);
492
493         req->rq_interpret_reply = osc_punch_interpret;
494         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
495         aa = (struct osc_async_args *)&req->rq_async_args;
496         aa->aa_oi = oinfo;
497         ptlrpc_set_add_req(rqset, req);
498
499         RETURN(0);
500 }
501
502 static int osc_sync(struct obd_export *exp, struct obdo *oa,
503                     struct lov_stripe_md *md, obd_size start, obd_size end)
504 {
505         struct ptlrpc_request *req;
506         struct ost_body *body;
507         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
508         ENTRY;
509
510         if (!oa) {
511                 CERROR("oa NULL\n");
512                 RETURN(-EINVAL);
513         }
514
515         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
516                               OST_SYNC, 2, size, NULL);
517         if (!req)
518                 RETURN(-ENOMEM);
519
520         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
521         memcpy(&body->oa, oa, sizeof(*oa));
522
523         /* overload the size and blocks fields in the oa with start/end */
524         body->oa.o_size = start;
525         body->oa.o_blocks = end;
526         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
527
528         ptlrpc_req_set_repsize(req, 2, size);
529
530         rc = ptlrpc_queue_wait(req);
531         if (rc)
532                 GOTO(out, rc);
533
534         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
535                                   lustre_swab_ost_body);
536         if (body == NULL) {
537                 CERROR ("can't unpack ost_body\n");
538                 GOTO (out, rc = -EPROTO);
539         }
540
541         memcpy(oa, &body->oa, sizeof(*oa));
542
543         EXIT;
544  out:
545         ptlrpc_req_finished(req);
546         return rc;
547 }
548
549 /* Destroy requests can be async always on the client, and we don't even really
550  * care about the return code since the client cannot do anything at all about
551  * a destroy failure.
552  * When the MDS is unlinking a filename, it saves the file objects into a
553  * recovery llog, and these object records are cancelled when the OST reports
554  * they were destroyed and sync'd to disk (i.e. transaction committed).
555  * If the client dies, or the OST is down when the object should be destroyed,
556  * the records are not cancelled, and when the OST reconnects to the MDS next,
557  * it will retrieve the llog unlink logs and then sends the log cancellation
558  * cookies to the MDS after committing destroy transactions. */
559 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
560                        struct lov_stripe_md *ea, struct obd_trans_info *oti,
561                        struct obd_export *md_export)
562 {
563         struct ptlrpc_request *req;
564         struct ost_body *body;
565         int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
566         ENTRY;
567
568         if (!oa) {
569                 CERROR("oa NULL\n");
570                 RETURN(-EINVAL);
571         }
572
573         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
574                               OST_DESTROY, 2, size, NULL);
575         if (!req)
576                 RETURN(-ENOMEM);
577
578         /* FIXME bug 249. Also see bug 7198 */
579         if (class_exp2cliimp(exp)->imp_connect_data.ocd_connect_flags &
580             OBD_CONNECT_REQPORTAL)
581                 req->rq_request_portal = OST_IO_PORTAL;
582
583         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
584
585         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE) {
586                 memcpy(obdo_logcookie(oa), oti->oti_logcookies,
587                        sizeof(*oti->oti_logcookies));
588         }
589
590         memcpy(&body->oa, oa, sizeof(*oa));
591         ptlrpc_req_set_repsize(req, 2, size);
592
593         ptlrpcd_add_req(req);
594         RETURN(0);
595 }
596
597 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
598                                 long writing_bytes)
599 {
600         obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
601
602         LASSERT(!(oa->o_valid & bits));
603
604         oa->o_valid |= bits;
605         client_obd_list_lock(&cli->cl_loi_list_lock);
606         oa->o_dirty = cli->cl_dirty;
607         if (cli->cl_dirty > cli->cl_dirty_max) {
608                 CERROR("dirty %lu > dirty_max %lu\n",
609                        cli->cl_dirty, cli->cl_dirty_max);
610                 oa->o_undirty = 0;
611         } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
612                 CERROR("dirty %d > system dirty_max %d\n",
613                        atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
614                 oa->o_undirty = 0;
615         } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
616                 CERROR("dirty %lu - dirty_max %lu too big???\n",
617                        cli->cl_dirty, cli->cl_dirty_max);
618                 oa->o_undirty = 0;
619         } else {
620                 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
621                                 (cli->cl_max_rpcs_in_flight + 1);
622                 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
623         }
624         oa->o_grant = cli->cl_avail_grant;
625         oa->o_dropped = cli->cl_lost_grant;
626         cli->cl_lost_grant = 0;
627         client_obd_list_unlock(&cli->cl_loi_list_lock);
628         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
629                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
630 }
631
632 /* caller must hold loi_list_lock */
633 static void osc_consume_write_grant(struct client_obd *cli,struct brw_page *pga)
634 {
635         atomic_inc(&obd_dirty_pages);
636         cli->cl_dirty += CFS_PAGE_SIZE;
637         cli->cl_avail_grant -= CFS_PAGE_SIZE;
638         pga->flag |= OBD_BRW_FROM_GRANT;
639         CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
640                CFS_PAGE_SIZE, pga, pga->pg);
641         LASSERT(cli->cl_avail_grant >= 0);
642 }
643
644 /* the companion to osc_consume_write_grant, called when a brw has completed.
645  * must be called with the loi lock held. */
646 static void osc_release_write_grant(struct client_obd *cli,
647                                     struct brw_page *pga, int sent)
648 {
649         int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
650         ENTRY;
651
652         if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
653                 EXIT;
654                 return;
655         }
656
657         pga->flag &= ~OBD_BRW_FROM_GRANT;
658         atomic_dec(&obd_dirty_pages);
659         cli->cl_dirty -= CFS_PAGE_SIZE;
660         if (!sent) {
661                 cli->cl_lost_grant += CFS_PAGE_SIZE;
662                 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
663                        cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
664         } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
665                 /* For short writes we shouldn't count parts of pages that
666                  * span a whole block on the OST side, or our accounting goes
667                  * wrong.  Should match the code in filter_grant_check. */
668                 int offset = pga->off & ~CFS_PAGE_MASK;
669                 int count = pga->count + (offset & (blocksize - 1));
670                 int end = (offset + pga->count) & (blocksize - 1);
671                 if (end)
672                         count += blocksize - end;
673
674                 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
675                 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
676                        CFS_PAGE_SIZE - count, cli->cl_lost_grant,
677                        cli->cl_avail_grant, cli->cl_dirty);
678         }
679
680         EXIT;
681 }
682
683 static unsigned long rpcs_in_flight(struct client_obd *cli)
684 {
685         return cli->cl_r_in_flight + cli->cl_w_in_flight;
686 }
687
688 /* caller must hold loi_list_lock */
689 void osc_wake_cache_waiters(struct client_obd *cli)
690 {
691         struct list_head *l, *tmp;
692         struct osc_cache_waiter *ocw;
693
694         ENTRY;
695         list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
696                 /* if we can't dirty more, we must wait until some is written */
697                 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
698                    ((atomic_read(&obd_dirty_pages)+1)>(obd_max_dirty_pages))) {
699                         CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
700                                "osc max %ld, sys max %d\n", cli->cl_dirty,
701                                cli->cl_dirty_max, obd_max_dirty_pages);
702                         return;
703                 }
704
705                 /* if still dirty cache but no grant wait for pending RPCs that
706                  * may yet return us some grant before doing sync writes */
707                 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
708                         CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
709                                cli->cl_w_in_flight);
710                         return;
711                 }
712
713                 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
714                 list_del_init(&ocw->ocw_entry);
715                 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
716                         /* no more RPCs in flight to return grant, do sync IO */
717                         ocw->ocw_rc = -EDQUOT;
718                         CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
719                 } else {
720                         osc_consume_write_grant(cli,
721                                                 &ocw->ocw_oap->oap_brw_page);
722                 }
723
724                 cfs_waitq_signal(&ocw->ocw_waitq);
725         }
726
727         EXIT;
728 }
729
730 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
731 {
732         client_obd_list_lock(&cli->cl_loi_list_lock);
733         cli->cl_avail_grant = ocd->ocd_grant;
734         client_obd_list_unlock(&cli->cl_loi_list_lock);
735
736         CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
737                cli->cl_avail_grant, cli->cl_lost_grant);
738         LASSERT(cli->cl_avail_grant >= 0);
739 }
740
741 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
742 {
743         client_obd_list_lock(&cli->cl_loi_list_lock);
744         CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
745         cli->cl_avail_grant += body->oa.o_grant;
746         /* waiters are woken in brw_interpret_oap */
747         client_obd_list_unlock(&cli->cl_loi_list_lock);
748 }
749
750 /* We assume that the reason this OSC got a short read is because it read
751  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
752  * via the LOV, and it _knows_ it's reading inside the file, it's just that
753  * this stripe never got written at or beyond this stripe offset yet. */
754 static void handle_short_read(int nob_read, obd_count page_count,
755                               struct brw_page **pga)
756 {
757         char *ptr;
758         int i = 0;
759
760         /* skip bytes read OK */
761         while (nob_read > 0) {
762                 LASSERT (page_count > 0);
763
764                 if (pga[i]->count > nob_read) {
765                         /* EOF inside this page */
766                         ptr = cfs_kmap(pga[i]->pg) + 
767                                 (pga[i]->off & ~CFS_PAGE_MASK);
768                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
769                         cfs_kunmap(pga[i]->pg);
770                         page_count--;
771                         i++;
772                         break;
773                 }
774
775                 nob_read -= pga[i]->count;
776                 page_count--;
777                 i++;
778         }
779
780         /* zero remaining pages */
781         while (page_count-- > 0) {
782                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
783                 memset(ptr, 0, pga[i]->count);
784                 cfs_kunmap(pga[i]->pg);
785                 i++;
786         }
787 }
788
789 static int check_write_rcs(struct ptlrpc_request *req,
790                            int requested_nob, int niocount,
791                            obd_count page_count, struct brw_page **pga)
792 {
793         int    *remote_rcs, i;
794
795         /* return error if any niobuf was in error */
796         remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
797                                         sizeof(*remote_rcs) * niocount, NULL);
798         if (remote_rcs == NULL) {
799                 CERROR("Missing/short RC vector on BRW_WRITE reply\n");
800                 return(-EPROTO);
801         }
802         if (lustre_msg_swabbed(req->rq_repmsg))
803                 for (i = 0; i < niocount; i++)
804                         __swab32s(&remote_rcs[i]);
805
806         for (i = 0; i < niocount; i++) {
807                 if (remote_rcs[i] < 0)
808                         return(remote_rcs[i]);
809
810                 if (remote_rcs[i] != 0) {
811                         CERROR("rc[%d] invalid (%d) req %p\n",
812                                 i, remote_rcs[i], req);
813                         return(-EPROTO);
814                 }
815         }
816
817         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
818                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
819                        requested_nob, req->rq_bulk->bd_nob_transferred);
820                 return(-EPROTO);
821         }
822
823         return (0);
824 }
825
826 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
827 {
828         if (p1->flag != p2->flag) {
829                 unsigned mask = ~OBD_BRW_FROM_GRANT;
830
831                 /* warn if we try to combine flags that we don't know to be
832                  * safe to combine */
833                 if ((p1->flag & mask) != (p2->flag & mask))
834                         CERROR("is it ok to have flags 0x%x and 0x%x in the "
835                                "same brw?\n", p1->flag, p2->flag);
836                 return 0;
837         }
838
839         return (p1->off + p1->count == p2->off);
840 }
841
842 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
843                                    struct brw_page **pga)
844 {
845         __u32 cksum = ~0;
846         int i = 0;
847
848         LASSERT (pg_count > 0);
849         while (nob > 0 && pg_count > 0) {
850                 char *ptr = cfs_kmap(pga[i]->pg);
851                 int off = pga[i]->off & ~CFS_PAGE_MASK;
852                 int count = pga[i]->count > nob ? nob : pga[i]->count;
853
854                 /* corrupt the data before we compute the checksum, to
855                  * simulate an OST->client data error */
856                 if (i == 0 &&OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
857                         memcpy(ptr + off, "bad1", min(4, nob));
858                 cksum = crc32_le(cksum, ptr + off, count);
859                 cfs_kunmap(pga[i]->pg);
860                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
861                                off, cksum);
862
863                 nob -= pga[i]->count;
864                 pg_count--;
865                 i++;
866         }
867         /* For sending we only compute the wrong checksum instead
868          * of corrupting the data so it is still correct on a redo */
869         if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_SEND))
870                 cksum++;
871
872         return cksum;
873 }
874
875 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
876                                 struct lov_stripe_md *lsm, obd_count page_count,
877                                 struct brw_page **pga,
878                                 struct ptlrpc_request **reqp)
879 {
880         struct ptlrpc_request   *req;
881         struct ptlrpc_bulk_desc *desc;
882         struct ost_body         *body;
883         struct obd_ioobj        *ioobj;
884         struct niobuf_remote    *niobuf;
885         int size[4] = { sizeof(struct ptlrpc_body), sizeof(*body) };
886         int niocount, i, requested_nob, opc, rc;
887         struct ptlrpc_request_pool *pool;
888         struct osc_brw_async_args *aa;
889
890         ENTRY;
891         opc = ((cmd & OBD_BRW_WRITE) != 0) ? OST_WRITE : OST_READ;
892         pool = ((cmd & OBD_BRW_WRITE) != 0) ? cli->cl_import->imp_rq_pool :NULL;
893
894         for (niocount = i = 1; i < page_count; i++) {
895                 if (!can_merge_pages(pga[i - 1], pga[i]))
896                         niocount++;
897         }
898
899         size[REQ_REC_OFF + 1] = sizeof(*ioobj);
900         size[REQ_REC_OFF + 2] = niocount * sizeof(*niobuf);
901
902         OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM);
903         req = ptlrpc_prep_req_pool(cli->cl_import, LUSTRE_OST_VERSION, opc, 4, size,
904                                    NULL, pool);
905         if (req == NULL)
906                 RETURN (-ENOMEM);
907
908         /* FIXME bug 249. Also see bug 7198 */
909         if (cli->cl_import->imp_connect_data.ocd_connect_flags &
910             OBD_CONNECT_REQPORTAL)
911                 req->rq_request_portal = OST_IO_PORTAL;
912
913         if (opc == OST_WRITE)
914                 desc = ptlrpc_prep_bulk_imp (req, page_count,
915                                              BULK_GET_SOURCE, OST_BULK_PORTAL);
916         else
917                 desc = ptlrpc_prep_bulk_imp (req, page_count,
918                                              BULK_PUT_SINK, OST_BULK_PORTAL);
919         if (desc == NULL)
920                 GOTO(out, rc = -ENOMEM);
921         /* NB request now owns desc and will free it when it gets freed */
922
923         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
924         ioobj = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioobj));
925         niobuf = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
926                                 niocount * sizeof(*niobuf));
927
928         memcpy(&body->oa, oa, sizeof(*oa));
929
930         obdo_to_ioobj(oa, ioobj);
931         ioobj->ioo_bufcnt = niocount;
932
933         LASSERT (page_count > 0);
934         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
935                 struct brw_page *pg = pga[i];
936                 struct brw_page *pg_prev = pga[i - 1];
937
938                 LASSERT(pg->count > 0);
939                 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
940                          "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
941                          pg->off, pg->count);
942 #ifdef __LINUX__
943                 LASSERTF(i == 0 || pg->off > pg_prev->off,
944                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
945                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
946                          i, page_count,
947                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
948                          pg_prev->pg, page_private(pg_prev->pg),
949                          pg_prev->pg->index, pg_prev->off);
950 #else
951                 LASSERTF(i == 0 || pg->off > pg_prev->off,
952                          "i %d p_c %u\n", i, page_count);
953 #endif
954                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
955                         (pg->flag & OBD_BRW_SRVLOCK));
956
957                 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
958                                       pg->count);
959                 requested_nob += pg->count;
960
961                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
962                         niobuf--;
963                         niobuf->len += pg->count;
964                 } else {
965                         niobuf->offset = pg->off;
966                         niobuf->len    = pg->count;
967                         niobuf->flags  = pg->flag;
968                 }
969         }
970
971         LASSERT((void *)(niobuf - niocount) ==
972                 lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
973                                niocount * sizeof(*niobuf)));
974         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
975
976         /* size[REQ_REC_OFF] still sizeof (*body) */
977         if (opc == OST_WRITE) {
978                 if (unlikely(cli->cl_checksum)) {
979                         body->oa.o_valid |= OBD_MD_FLCKSUM;
980                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
981                                                              page_count, pga);
982                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
983                                body->oa.o_cksum);
984                         /* save this in 'oa', too, for later checking */
985                         oa->o_valid |= OBD_MD_FLCKSUM;
986                 } else {
987                         /* clear out the checksum flag, in case this is a
988                          * resend but cl_checksum is no longer set. b=11238 */
989                         oa->o_valid &= ~OBD_MD_FLCKSUM;
990                 }
991                 oa->o_cksum = body->oa.o_cksum;
992                 /* 1 RC per niobuf */
993                 size[REPLY_REC_OFF + 1] = sizeof(__u32) * niocount;
994                 ptlrpc_req_set_repsize(req, 3, size);
995         } else {
996                 if (unlikely(cli->cl_checksum))
997                         body->oa.o_valid |= OBD_MD_FLCKSUM;
998                 /* 1 RC for the whole I/O */
999                 ptlrpc_req_set_repsize(req, 2, size);
1000         }
1001
1002         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1003         aa = (struct osc_brw_async_args *)&req->rq_async_args;
1004         aa->aa_oa = oa;
1005         aa->aa_requested_nob = requested_nob;
1006         aa->aa_nio_count = niocount;
1007         aa->aa_page_count = page_count;
1008         aa->aa_retries = 5;     /*retry for checksum errors; lprocfs? */
1009         aa->aa_ppga = pga;
1010         aa->aa_cli = cli;
1011         INIT_LIST_HEAD(&aa->aa_oaps);
1012
1013         *reqp = req;
1014         RETURN (0);
1015
1016  out:
1017         ptlrpc_req_finished (req);
1018         RETURN (rc);
1019 }
1020
1021 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1022                                  __u32 client_cksum, __u32 server_cksum, int nob,
1023                                  obd_count page_count, struct brw_page **pga)
1024 {
1025         __u32 new_cksum;
1026         char *msg;
1027
1028         if (server_cksum == client_cksum) {
1029                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1030                 return 0;
1031         }
1032
1033         new_cksum = osc_checksum_bulk(nob, page_count, pga);
1034
1035         if (new_cksum == server_cksum)
1036                 msg = "changed on the client after we checksummed it";
1037         else if (new_cksum == client_cksum)
1038                 msg = "changed in transit before arrival at OST";
1039         else
1040                 msg = "changed in transit AND doesn't match the original";
1041
1042         LCONSOLE_ERROR("BAD WRITE CHECKSUM: %s: from %s inum "LPU64"/"LPU64
1043                        " object "LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
1044                        msg, libcfs_nid2str(peer->nid),
1045                        oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1046                        oa->o_valid & OBD_MD_FLFID ? oa->o_generation : (__u64)0,
1047                        oa->o_id,
1048                        oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1049                        pga[0]->off,
1050                        pga[page_count-1]->off + pga[page_count-1]->count - 1);
1051         CERROR("original client csum %x, server csum %x, client csum now %x\n",
1052                client_cksum, server_cksum, new_cksum);
1053
1054         return 1;
1055 }
1056
1057 /* Note rc enters this function as number of bytes transferred */
1058 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1059 {
1060         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1061         const lnet_process_id_t *peer =
1062                         &req->rq_import->imp_connection->c_peer;
1063         struct client_obd *cli = aa->aa_cli;
1064         struct ost_body *body;
1065         __u32 client_cksum = 0;
1066         ENTRY;
1067
1068         if (rc < 0 && rc != -EDQUOT)
1069                 RETURN(rc);
1070
1071         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1072         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1073                                   lustre_swab_ost_body);
1074         if (body == NULL) {
1075                 CERROR ("Can't unpack body\n");
1076                 RETURN(-EPROTO);
1077         }
1078
1079         /* set/clear over quota flag for a uid/gid */
1080         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1081             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1082                 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1083                              body->oa.o_gid, body->oa.o_valid,
1084                              body->oa.o_flags);
1085
1086         if (rc < 0)
1087                 RETURN(rc);
1088
1089         if (unlikely(aa->aa_oa->o_valid & OBD_MD_FLCKSUM))
1090                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1091
1092         osc_update_grant(cli, body);
1093
1094         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1095                 if (rc > 0) {
1096                         CERROR ("Unexpected +ve rc %d\n", rc);
1097                         RETURN(-EPROTO);
1098                 }
1099                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1100
1101                 if (unlikely((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) &&
1102                              client_cksum &&
1103                              check_write_checksum(&body->oa, peer, client_cksum,
1104                                                  body->oa.o_cksum,
1105                                                  aa->aa_requested_nob,
1106                                                  aa->aa_page_count,
1107                                                  aa->aa_ppga)))
1108                         RETURN(-EAGAIN);
1109
1110                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1111                                      aa->aa_page_count, aa->aa_ppga);
1112                 GOTO(out, rc);
1113         }
1114
1115         /* The rest of this function executes only for OST_READs */
1116         if (rc > aa->aa_requested_nob) {
1117                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1118                        aa->aa_requested_nob);
1119                 RETURN(-EPROTO);
1120         }
1121
1122         if (rc != req->rq_bulk->bd_nob_transferred) {
1123                 CERROR ("Unexpected rc %d (%d transferred)\n",
1124                         rc, req->rq_bulk->bd_nob_transferred);
1125                 return (-EPROTO);
1126         }
1127
1128         if (rc < aa->aa_requested_nob)
1129                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1130
1131         if (unlikely(body->oa.o_valid & OBD_MD_FLCKSUM)) {
1132                 static int cksum_counter;
1133                 __u32 server_cksum = body->oa.o_cksum;
1134                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1135                                                  aa->aa_ppga);
1136
1137                 if (server_cksum == ~0 && rc > 0) {
1138                         CERROR("Protocol error: server %s set the 'checksum' "
1139                                "bit, but didn't send a checksum.  Not fatal, "
1140                                "but please tell CFS.\n",
1141                                libcfs_nid2str(peer->nid));
1142                 } else if (server_cksum != client_cksum) {
1143                         LCONSOLE_ERROR("%s: BAD READ CHECKSUM: from %s inum "
1144                                        LPU64"/"LPU64" object "LPU64"/"LPU64
1145                                        " extent ["LPU64"-"LPU64"]\n",
1146                                        req->rq_import->imp_obd->obd_name,
1147                                        libcfs_nid2str(peer->nid),
1148                                        body->oa.o_valid & OBD_MD_FLFID ?
1149                                                 body->oa.o_fid : (__u64)0,
1150                                        body->oa.o_valid & OBD_MD_FLFID ?
1151                                                 body->oa.o_generation :(__u64)0,
1152                                        body->oa.o_id,
1153                                        body->oa.o_valid & OBD_MD_FLGROUP ?
1154                                                 body->oa.o_gr : (__u64)0,
1155                                        aa->aa_ppga[0]->off,
1156                                        aa->aa_ppga[aa->aa_page_count-1]->off +
1157                                        aa->aa_ppga[aa->aa_page_count-1]->count -
1158                                                                         1);
1159                         CERROR("client %x, server %x\n",
1160                                client_cksum, server_cksum);
1161                         cksum_counter = 0;
1162                         aa->aa_oa->o_cksum = client_cksum;
1163                         rc = -EAGAIN;
1164                 } else {
1165                         cksum_counter++;
1166                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1167                         rc = 0;
1168                 }
1169         } else if (unlikely(client_cksum)) {
1170                 static int cksum_missed;
1171
1172                 cksum_missed++;
1173                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1174                         CERROR("Checksum %u requested from %s but not sent\n",
1175                                cksum_missed, libcfs_nid2str(peer->nid));
1176         } else {
1177                 rc = 0;
1178         }
1179 out:
1180         if (rc >= 0)
1181                 memcpy(aa->aa_oa, &body->oa, sizeof(*aa->aa_oa));
1182
1183         RETURN(rc);
1184 }
1185
1186 static int osc_brw_internal(int cmd, struct obd_export *exp,struct obdo *oa,
1187                             struct lov_stripe_md *lsm,
1188                             obd_count page_count, struct brw_page **pga)
1189 {
1190         struct ptlrpc_request *request;
1191         int                    rc, retries = 5; /* lprocfs? */
1192         ENTRY;
1193
1194 restart_bulk:
1195         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1196                                   page_count, pga, &request);
1197         if (rc != 0)
1198                 return (rc);
1199
1200         rc = ptlrpc_queue_wait(request);
1201
1202         if (rc == -ETIMEDOUT && request->rq_resend) {
1203                 DEBUG_REQ(D_HA, request,  "BULK TIMEOUT");
1204                 ptlrpc_req_finished(request);
1205                 goto restart_bulk;
1206         }
1207
1208         rc = osc_brw_fini_request(request, rc);
1209
1210         ptlrpc_req_finished(request);
1211         if (rc == -EAGAIN) {
1212                 if (retries-- > 0)
1213                         goto restart_bulk;
1214                 rc = -EIO;
1215         }
1216         RETURN(rc);
1217 }
1218
1219 int osc_brw_redo_request(struct ptlrpc_request *request,
1220                          struct osc_brw_async_args *aa)
1221 {
1222         struct ptlrpc_request *new_req;
1223         struct ptlrpc_request_set *set = request->rq_set;
1224         struct osc_brw_async_args *new_aa;
1225         struct osc_async_page *oap;
1226         int rc = 0;
1227         ENTRY;
1228
1229         if (aa->aa_retries-- <= 0) {
1230                 CERROR("too many checksum retries, returning error\n");
1231                 RETURN(-EIO);
1232         }
1233
1234         DEBUG_REQ(D_ERROR, request, "redo for checksum error");
1235         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1236                 if (oap->oap_request != NULL) {
1237                         LASSERTF(request == oap->oap_request,
1238                                  "request %p != oap_request %p\n",
1239                                  request, oap->oap_request);
1240                         if (oap->oap_interrupted) {
1241                                 ptlrpc_mark_interrupted(oap->oap_request);
1242                                 rc = -EINTR;
1243                                 break;
1244                         }
1245                 }
1246         }
1247         if (rc)
1248                 RETURN(rc);
1249
1250         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1251                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1252                                   aa->aa_cli, aa->aa_oa,
1253                                   NULL /* lsm unused by osc currently */,
1254                                   aa->aa_page_count, aa->aa_ppga, &new_req);
1255         if (rc)
1256                 RETURN(rc);
1257
1258         /* New request takes over pga and oaps from old request.
1259          * Note that copying a list_head doesn't work, need to move it... */
1260         new_req->rq_interpret_reply = request->rq_interpret_reply;
1261         new_req->rq_async_args = request->rq_async_args;
1262         new_aa = (struct osc_brw_async_args *)&new_req->rq_async_args;
1263         INIT_LIST_HEAD(&new_aa->aa_oaps);
1264         list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1265         INIT_LIST_HEAD(&aa->aa_oaps);
1266
1267         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1268                 if (oap->oap_request) {
1269                         ptlrpc_req_finished(oap->oap_request);
1270                         oap->oap_request = ptlrpc_request_addref(new_req);
1271                 }
1272         }
1273
1274         ptlrpc_set_add_req(set, new_req);
1275
1276         RETURN(0);
1277 }
1278
1279 static int brw_interpret(struct ptlrpc_request *request, void *data, int rc)
1280 {
1281         struct osc_brw_async_args *aa = data;
1282         int                        i;
1283         ENTRY;
1284
1285         rc = osc_brw_fini_request(request, rc);
1286         if (rc == -EAGAIN) {
1287                 rc = osc_brw_redo_request(request, aa);
1288                 if (rc == 0)
1289                         RETURN(0);
1290         }
1291
1292         spin_lock(&aa->aa_cli->cl_loi_list_lock);
1293         for (i = 0; i < aa->aa_page_count; i++)
1294                 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
1295         spin_unlock(&aa->aa_cli->cl_loi_list_lock);
1296
1297         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1298
1299         RETURN(rc);
1300 }
1301
1302 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1303                           struct lov_stripe_md *lsm, obd_count page_count,
1304                           struct brw_page **pga, struct ptlrpc_request_set *set)
1305 {
1306         struct ptlrpc_request     *request;
1307         struct client_obd         *cli = &exp->exp_obd->u.cli;
1308         int                        rc, i;
1309         ENTRY;
1310
1311         /* Consume write credits even if doing a sync write -
1312          * otherwise we may run out of space on OST due to grant. */
1313         if (cmd == OBD_BRW_WRITE) {
1314                 spin_lock(&cli->cl_loi_list_lock);
1315                 for (i = 0; i < page_count; i++) {
1316                         if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
1317                                 osc_consume_write_grant(cli, pga[i]);
1318                 }
1319                 spin_unlock(&cli->cl_loi_list_lock);
1320         }
1321
1322         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1323                                   page_count, pga, &request);
1324
1325         if (rc == 0) {
1326                 request->rq_interpret_reply = brw_interpret;
1327                 ptlrpc_set_add_req(set, request);
1328         } else if (cmd == OBD_BRW_WRITE) {
1329                 spin_lock(&cli->cl_loi_list_lock);
1330                 for (i = 0; i < page_count; i++)
1331                         osc_release_write_grant(cli, pga[i], 0);
1332                 spin_unlock(&cli->cl_loi_list_lock);
1333         }
1334
1335         RETURN (rc);
1336 }
1337
1338 /*
1339  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1340  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1341  * fine for our small page arrays and doesn't require allocation.  its an
1342  * insertion sort that swaps elements that are strides apart, shrinking the
1343  * stride down until its '1' and the array is sorted.
1344  */
1345 static void sort_brw_pages(struct brw_page **array, int num)
1346 {
1347         int stride, i, j;
1348         struct brw_page *tmp;
1349
1350         if (num == 1)
1351                 return;
1352         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1353                 ;
1354
1355         do {
1356                 stride /= 3;
1357                 for (i = stride ; i < num ; i++) {
1358                         tmp = array[i];
1359                         j = i;
1360                         while (j >= stride && array[j-stride]->off > tmp->off) {
1361                                 array[j] = array[j - stride];
1362                                 j -= stride;
1363                         }
1364                         array[j] = tmp;
1365                 }
1366         } while (stride > 1);
1367 }
1368
1369 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1370 {
1371         int count = 1;
1372         int offset;
1373         int i = 0;
1374
1375         LASSERT (pages > 0);
1376         offset = pg[i]->off & (~CFS_PAGE_MASK);
1377
1378         for (;;) {
1379                 pages--;
1380                 if (pages == 0)         /* that's all */
1381                         return count;
1382
1383                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1384                         return count;   /* doesn't end on page boundary */
1385
1386                 i++;
1387                 offset = pg[i]->off & (~CFS_PAGE_MASK);
1388                 if (offset != 0)        /* doesn't start on page boundary */
1389                         return count;
1390
1391                 count++;
1392         }
1393 }
1394
1395 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1396 {
1397         struct brw_page **ppga;
1398         int i;
1399
1400         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1401         if (ppga == NULL)
1402                 return NULL;
1403
1404         for (i = 0; i < count; i++)
1405                 ppga[i] = pga + i;
1406         return ppga;
1407 }
1408
1409 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1410 {
1411         LASSERT(ppga != NULL);
1412         OBD_FREE(ppga, sizeof(*ppga) * count);
1413 }
1414
1415 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1416                    obd_count page_count, struct brw_page *pga,
1417                    struct obd_trans_info *oti)
1418 {
1419         struct obdo *saved_oa = NULL;
1420         struct brw_page **ppga, **orig;
1421         struct obd_import *imp = class_exp2cliimp(exp);
1422         struct client_obd *cli = &imp->imp_obd->u.cli;
1423         int rc, page_count_orig;
1424         ENTRY;
1425
1426         if (cmd & OBD_BRW_CHECK) {
1427                 /* The caller just wants to know if there's a chance that this
1428                  * I/O can succeed */
1429
1430                 if (imp == NULL || imp->imp_invalid)
1431                         RETURN(-EIO);
1432                 RETURN(0);
1433         }
1434
1435         /* test_brw with a failed create can trip this, maybe others. */
1436         LASSERT(cli->cl_max_pages_per_rpc);
1437
1438         rc = 0;
1439
1440         orig = ppga = osc_build_ppga(pga, page_count);
1441         if (ppga == NULL)
1442                 RETURN(-ENOMEM);
1443         page_count_orig = page_count;
1444
1445         sort_brw_pages(ppga, page_count);
1446         while (page_count) {
1447                 obd_count pages_per_brw;
1448
1449                 if (page_count > cli->cl_max_pages_per_rpc)
1450                         pages_per_brw = cli->cl_max_pages_per_rpc;
1451                 else
1452                         pages_per_brw = page_count;
1453
1454                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1455
1456                 if (saved_oa != NULL) {
1457                         /* restore previously saved oa */
1458                         *oinfo->oi_oa = *saved_oa;
1459                 } else if (page_count > pages_per_brw) {
1460                         /* save a copy of oa (brw will clobber it) */
1461                         saved_oa = obdo_alloc();
1462                         if (saved_oa == NULL)
1463                                 GOTO(out, rc = -ENOMEM);
1464                         *saved_oa = *oinfo->oi_oa;
1465                 }
1466
1467                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1468                                       pages_per_brw, ppga);
1469
1470                 if (rc != 0)
1471                         break;
1472
1473                 page_count -= pages_per_brw;
1474                 ppga += pages_per_brw;
1475         }
1476
1477 out:
1478         osc_release_ppga(orig, page_count_orig);
1479
1480         if (saved_oa != NULL)
1481                 obdo_free(saved_oa);
1482
1483         RETURN(rc);
1484 }
1485
1486 static int osc_brw_async(int cmd, struct obd_export *exp,
1487                          struct obd_info *oinfo, obd_count page_count,
1488                          struct brw_page *pga, struct obd_trans_info *oti,
1489                          struct ptlrpc_request_set *set)
1490 {
1491         struct brw_page **ppga, **orig;
1492         int page_count_orig;
1493         int rc = 0;
1494         ENTRY;
1495
1496         if (cmd & OBD_BRW_CHECK) {
1497                 /* The caller just wants to know if there's a chance that this
1498                  * I/O can succeed */
1499                 struct obd_import *imp = class_exp2cliimp(exp);
1500
1501                 if (imp == NULL || imp->imp_invalid)
1502                         RETURN(-EIO);
1503                 RETURN(0);
1504         }
1505
1506         orig = ppga = osc_build_ppga(pga, page_count);
1507         if (ppga == NULL)
1508                 RETURN(-ENOMEM);
1509         page_count_orig = page_count;
1510
1511         sort_brw_pages(ppga, page_count);
1512         while (page_count) {
1513                 struct brw_page **copy;
1514                 obd_count pages_per_brw;
1515
1516                 pages_per_brw = min_t(obd_count, page_count,
1517                     class_exp2cliimp(exp)->imp_obd->u.cli.cl_max_pages_per_rpc);
1518
1519                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1520
1521                 /* use ppga only if single RPC is going to fly */
1522                 if (pages_per_brw != page_count_orig || ppga != orig) {
1523                         OBD_ALLOC(copy, pages_per_brw * sizeof(*copy));
1524                         if (copy == NULL)
1525                                 GOTO(out, rc = -ENOMEM);
1526                         memcpy(copy, ppga, pages_per_brw * sizeof(*copy));
1527                 } else
1528                         copy = ppga;
1529
1530                 rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1531                                     pages_per_brw, copy, set);
1532
1533                 if (rc != 0) {
1534                         if (copy != ppga)
1535                                 OBD_FREE(copy, pages_per_brw * sizeof(*copy));
1536                         break;
1537                 }
1538
1539                 if (copy == orig) {
1540                         /* we passed it to async_internal() which is
1541                          * now responsible for releasing memory */
1542                         orig = NULL;
1543                 }
1544
1545                 page_count -= pages_per_brw;
1546                 ppga += pages_per_brw;
1547         }
1548 out:
1549         if (orig)
1550                 osc_release_ppga(orig, page_count_orig);
1551         RETURN(rc);
1552 }
1553
1554 static void osc_check_rpcs(struct client_obd *cli);
1555
1556 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1557  * the dirty accounting.  Writeback completes or truncate happens before
1558  * writing starts.  Must be called with the loi lock held. */
1559 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1560                            int sent)
1561 {
1562         osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1563 }
1564
1565 /* This maintains the lists of pending pages to read/write for a given object
1566  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1567  * to quickly find objects that are ready to send an RPC. */
1568 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1569                          int cmd)
1570 {
1571         int optimal;
1572         ENTRY;
1573
1574         if (lop->lop_num_pending == 0)
1575                 RETURN(0);
1576
1577         /* if we have an invalid import we want to drain the queued pages
1578          * by forcing them through rpcs that immediately fail and complete
1579          * the pages.  recovery relies on this to empty the queued pages
1580          * before canceling the locks and evicting down the llite pages */
1581         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1582                 RETURN(1);
1583
1584         /* stream rpcs in queue order as long as as there is an urgent page
1585          * queued.  this is our cheap solution for good batching in the case
1586          * where writepage marks some random page in the middle of the file
1587          * as urgent because of, say, memory pressure */
1588         if (!list_empty(&lop->lop_urgent)) {
1589                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1590                 RETURN(1);
1591         }
1592
1593         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1594         optimal = cli->cl_max_pages_per_rpc;
1595         if (cmd & OBD_BRW_WRITE) {
1596                 /* trigger a write rpc stream as long as there are dirtiers
1597                  * waiting for space.  as they're waiting, they're not going to
1598                  * create more pages to coallesce with what's waiting.. */
1599                 if (!list_empty(&cli->cl_cache_waiters)) {
1600                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1601                         RETURN(1);
1602                 }
1603
1604                 /* +16 to avoid triggering rpcs that would want to include pages
1605                  * that are being queued but which can't be made ready until
1606                  * the queuer finishes with the page. this is a wart for
1607                  * llite::commit_write() */
1608                 optimal += 16;
1609         }
1610         if (lop->lop_num_pending >= optimal)
1611                 RETURN(1);
1612
1613         RETURN(0);
1614 }
1615
1616 static void on_list(struct list_head *item, struct list_head *list,
1617                     int should_be_on)
1618 {
1619         if (list_empty(item) && should_be_on)
1620                 list_add_tail(item, list);
1621         else if (!list_empty(item) && !should_be_on)
1622                 list_del_init(item);
1623 }
1624
1625 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1626  * can find pages to build into rpcs quickly */
1627 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1628 {
1629         on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
1630                 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
1631                 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1632
1633         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1634                 loi->loi_write_lop.lop_num_pending);
1635
1636         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1637                 loi->loi_read_lop.lop_num_pending);
1638 }
1639
1640 static void lop_update_pending(struct client_obd *cli,
1641                                struct loi_oap_pages *lop, int cmd, int delta)
1642 {
1643         lop->lop_num_pending += delta;
1644         if (cmd & OBD_BRW_WRITE)
1645                 cli->cl_pending_w_pages += delta;
1646         else
1647                 cli->cl_pending_r_pages += delta;
1648 }
1649
1650 /* this is called when a sync waiter receives an interruption.  Its job is to
1651  * get the caller woken as soon as possible.  If its page hasn't been put in an
1652  * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
1653  * desiring interruption which will forcefully complete the rpc once the rpc
1654  * has timed out */
1655 static void osc_occ_interrupted(struct oig_callback_context *occ)
1656 {
1657         struct osc_async_page *oap;
1658         struct loi_oap_pages *lop;
1659         struct lov_oinfo *loi;
1660         ENTRY;
1661
1662         /* XXX member_of() */
1663         oap = list_entry(occ, struct osc_async_page, oap_occ);
1664
1665         client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);
1666
1667         oap->oap_interrupted = 1;
1668
1669         /* ok, it's been put in an rpc. only one oap gets a request reference */
1670         if (oap->oap_request != NULL) {
1671                 ptlrpc_mark_interrupted(oap->oap_request);
1672                 ptlrpcd_wake(oap->oap_request);
1673                 GOTO(unlock, 0);
1674         }
1675
1676         /* we don't get interruption callbacks until osc_trigger_group_io()
1677          * has been called and put the sync oaps in the pending/urgent lists.*/
1678         if (!list_empty(&oap->oap_pending_item)) {
1679                 list_del_init(&oap->oap_pending_item);
1680                 list_del_init(&oap->oap_urgent_item);
1681
1682                 loi = oap->oap_loi;
1683                 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
1684                         &loi->loi_write_lop : &loi->loi_read_lop;
1685                 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
1686                 loi_list_maint(oap->oap_cli, oap->oap_loi);
1687
1688                 oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
1689                 oap->oap_oig = NULL;
1690         }
1691
1692 unlock:
1693         client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
1694 }
1695
1696 /* this is trying to propogate async writeback errors back up to the
1697  * application.  As an async write fails we record the error code for later if
1698  * the app does an fsync.  As long as errors persist we force future rpcs to be
1699  * sync so that the app can get a sync error and break the cycle of queueing
1700  * pages for which writeback will fail. */
1701 static void osc_process_ar(struct osc_async_rc *ar, struct ptlrpc_request *req,
1702                            int rc)
1703 {
1704         if (rc) {
1705                 if (!ar->ar_rc)
1706                         ar->ar_rc = rc;
1707
1708                 ar->ar_force_sync = 1;
1709                 ar->ar_min_xid = ptlrpc_sample_next_xid();
1710                 return;
1711
1712         }
1713
1714         if (ar->ar_force_sync && req && (ptlrpc_req_xid(req) >= ar->ar_min_xid))
1715                 ar->ar_force_sync = 0;
1716 }
1717
1718 static void osc_oap_to_pending(struct osc_async_page *oap)
1719 {
1720         struct loi_oap_pages *lop;
1721
1722         if (oap->oap_cmd & OBD_BRW_WRITE)
1723                 lop = &oap->oap_loi->loi_write_lop;
1724         else
1725                 lop = &oap->oap_loi->loi_read_lop;
1726
1727         if (oap->oap_async_flags & ASYNC_URGENT)
1728                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1729         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1730         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
1731 }
1732
1733 /* this must be called holding the loi list lock to give coverage to exit_cache,
1734  * async_flag maintenance, and oap_request */
1735 static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
1736                               struct osc_async_page *oap, int sent, int rc)
1737 {
1738         ENTRY;
1739         oap->oap_async_flags = 0;
1740         oap->oap_interrupted = 0;
1741
1742         if (oap->oap_cmd & OBD_BRW_WRITE) {
1743                 osc_process_ar(&cli->cl_ar, oap->oap_request, rc);
1744                 osc_process_ar(&oap->oap_loi->loi_ar, oap->oap_request, rc);
1745         }
1746
1747         if (oap->oap_request != NULL) {
1748                 ptlrpc_req_finished(oap->oap_request);
1749                 oap->oap_request = NULL;
1750         }
1751
1752         if (rc == 0 && oa != NULL) {
1753                 if (oa->o_valid & OBD_MD_FLBLOCKS)
1754                         oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
1755                 if (oa->o_valid & OBD_MD_FLMTIME)
1756                         oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
1757                 if (oa->o_valid & OBD_MD_FLATIME)
1758                         oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
1759                 if (oa->o_valid & OBD_MD_FLCTIME)
1760                         oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
1761         }
1762
1763         if (oap->oap_oig) {
1764                 osc_exit_cache(cli, oap, sent);
1765                 oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
1766                 oap->oap_oig = NULL;
1767                 EXIT;
1768                 return;
1769         }
1770
1771         rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
1772                                                 oap->oap_cmd, oa, rc);
1773
1774         /* ll_ap_completion (from llite) drops PG_locked. so, a new
1775          * I/O on the page could start, but OSC calls it under lock
1776          * and thus we can add oap back to pending safely */
1777         if (rc)
1778                 /* upper layer wants to leave the page on pending queue */
1779                 osc_oap_to_pending(oap);
1780         else
1781                 osc_exit_cache(cli, oap, sent);
1782         EXIT;
1783 }
1784
1785 static int brw_interpret_oap(struct ptlrpc_request *request, void *data, int rc)
1786 {
1787         struct osc_brw_async_args *aa = data;
1788         struct osc_async_page *oap, *tmp;
1789         struct client_obd *cli;
1790         ENTRY;
1791
1792         rc = osc_brw_fini_request(request, rc);
1793         CDEBUG(D_INODE, "request %p aa %p rc %d\n", request, aa, rc);
1794         if (rc == -EAGAIN) {
1795                 rc = osc_brw_redo_request(request, aa);
1796                 if (rc == 0)
1797                         RETURN(0);
1798                 GOTO(out, rc);
1799         }
1800
1801         cli = aa->aa_cli;
1802
1803         client_obd_list_lock(&cli->cl_loi_list_lock);
1804
1805         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1806          * is called so we know whether to go to sync BRWs or wait for more
1807          * RPCs to complete */
1808         if (lustre_msg_get_opc(request->rq_reqmsg) == OST_WRITE)
1809                 cli->cl_w_in_flight--;
1810         else
1811                 cli->cl_r_in_flight--;
1812
1813         /* the caller may re-use the oap after the completion call so
1814          * we need to clean it up a little */
1815         list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
1816                 list_del_init(&oap->oap_rpc_item);
1817                 osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
1818         }
1819
1820         osc_wake_cache_waiters(cli);
1821         osc_check_rpcs(cli);
1822
1823         client_obd_list_unlock(&cli->cl_loi_list_lock);
1824
1825         obdo_free(aa->aa_oa);
1826
1827         rc = 0;
1828 out:
1829         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1830         RETURN(rc);
1831 }
1832
1833 static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
1834                                             struct list_head *rpc_list,
1835                                             int page_count, int cmd)
1836 {
1837         struct ptlrpc_request *req;
1838         struct brw_page **pga = NULL;
1839         struct osc_brw_async_args *aa;
1840         struct obdo *oa = NULL;
1841         struct obd_async_page_ops *ops = NULL;
1842         void *caller_data = NULL;
1843         struct osc_async_page *oap;
1844         int i, rc;
1845
1846         ENTRY;
1847         LASSERT(!list_empty(rpc_list));
1848
1849         OBD_ALLOC(pga, sizeof(*pga) * page_count);
1850         if (pga == NULL)
1851                 RETURN(ERR_PTR(-ENOMEM));
1852
1853         oa = obdo_alloc();
1854         if (oa == NULL)
1855                 GOTO(out, req = ERR_PTR(-ENOMEM));
1856
1857         i = 0;
1858         list_for_each_entry(oap, rpc_list, oap_rpc_item) {
1859                 if (ops == NULL) {
1860                         ops = oap->oap_caller_ops;
1861                         caller_data = oap->oap_caller_data;
1862                 }
1863                 pga[i] = &oap->oap_brw_page;
1864                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1865                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1866                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
1867                 i++;
1868         }
1869
1870         /* always get the data for the obdo for the rpc */
1871         LASSERT(ops != NULL);
1872         ops->ap_fill_obdo(caller_data, cmd, oa);
1873
1874         sort_brw_pages(pga, page_count);
1875         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count, pga, &req);
1876         if (rc != 0) {
1877                 CERROR("prep_req failed: %d\n", rc);
1878                 GOTO(out, req = ERR_PTR(rc));
1879         }
1880
1881         /* Need to update the timestamps after the request is built in case
1882          * we race with setattr (locally or in queue at OST).  If OST gets
1883          * later setattr before earlier BRW (as determined by the request xid),
1884          * the OST will not use BRW timestamps.  Sadly, there is no obvious
1885          * way to do this in a single call.  bug 10150 */
1886         ops->ap_update_obdo(caller_data, cmd, oa,
1887                             OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);
1888
1889         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1890         aa = (struct osc_brw_async_args *)&req->rq_async_args;
1891         INIT_LIST_HEAD(&aa->aa_oaps);
1892         list_splice(rpc_list, &aa->aa_oaps);
1893         INIT_LIST_HEAD(rpc_list);
1894
1895 out:
1896         if (IS_ERR(req)) {
1897                 if (oa)
1898                         obdo_free(oa);
1899                 if (pga)
1900                         OBD_FREE(pga, sizeof(*pga) * page_count);
1901         }
1902         RETURN(req);
1903 }
1904
1905 /* the loi lock is held across this function but it's allowed to release
1906  * and reacquire it during its work */
1907 static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
1908                             int cmd, struct loi_oap_pages *lop)
1909 {
1910         struct ptlrpc_request *req;
1911         obd_count page_count = 0;
1912         struct osc_async_page *oap = NULL, *tmp;
1913         struct osc_brw_async_args *aa;
1914         struct obd_async_page_ops *ops;
1915         CFS_LIST_HEAD(rpc_list);
1916         unsigned int ending_offset;
1917         unsigned  starting_offset = 0;
1918         ENTRY;
1919
1920         /* first we find the pages we're allowed to work with */
1921         list_for_each_entry_safe(oap, tmp, &lop->lop_pending, oap_pending_item){
1922                 ops = oap->oap_caller_ops;
1923
1924                 LASSERT(oap->oap_magic == OAP_MAGIC);
1925
1926                 /* in llite being 'ready' equates to the page being locked
1927                  * until completion unlocks it.  commit_write submits a page
1928                  * as not ready because its unlock will happen unconditionally
1929                  * as the call returns.  if we race with commit_write giving
1930                  * us that page we dont' want to create a hole in the page
1931                  * stream, so we stop and leave the rpc to be fired by
1932                  * another dirtier or kupdated interval (the not ready page
1933                  * will still be on the dirty list).  we could call in
1934                  * at the end of ll_file_write to process the queue again. */
1935                 if (!(oap->oap_async_flags & ASYNC_READY)) {
1936                         int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
1937                         if (rc < 0)
1938                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
1939                                                 "instead of ready\n", oap,
1940                                                 oap->oap_page, rc);
1941                         switch (rc) {
1942                         case -EAGAIN:
1943                                 /* llite is telling us that the page is still
1944                                  * in commit_write and that we should try
1945                                  * and put it in an rpc again later.  we
1946                                  * break out of the loop so we don't create
1947                                  * a hole in the sequence of pages in the rpc
1948                                  * stream.*/
1949                                 oap = NULL;
1950                                 break;
1951                         case -EINTR:
1952                                 /* the io isn't needed.. tell the checks
1953                                  * below to complete the rpc with EINTR */
1954                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
1955                                 oap->oap_count = -EINTR;
1956                                 break;
1957                         case 0:
1958                                 oap->oap_async_flags |= ASYNC_READY;
1959                                 break;
1960                         default:
1961                                 LASSERTF(0, "oap %p page %p returned %d "
1962                                             "from make_ready\n", oap,
1963                                             oap->oap_page, rc);
1964                                 break;
1965                         }
1966                 }
1967                 if (oap == NULL)
1968                         break;
1969                 /*
1970                  * Page submitted for IO has to be locked. Either by
1971                  * ->ap_make_ready() or by higher layers.
1972                  *
1973                  * XXX nikita: this assertion should be adjusted when lustre
1974                  * starts using PG_writeback for pages being written out.
1975                  */
1976 #if defined(__KERNEL__) && defined(__LINUX__)
1977                 LASSERT(PageLocked(oap->oap_page));
1978 #endif
1979                 /* If there is a gap at the start of this page, it can't merge
1980                  * with any previous page, so we'll hand the network a
1981                  * "fragmented" page array that it can't transfer in 1 RDMA */
1982                 if (page_count != 0 && oap->oap_page_off != 0)
1983                         break;
1984
1985                 /* take the page out of our book-keeping */
1986                 list_del_init(&oap->oap_pending_item);
1987                 lop_update_pending(cli, lop, cmd, -1);
1988                 list_del_init(&oap->oap_urgent_item);
1989
1990                 if (page_count == 0)
1991                         starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
1992                                           (PTLRPC_MAX_BRW_SIZE - 1);
1993
1994                 /* ask the caller for the size of the io as the rpc leaves. */
1995                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
1996                         oap->oap_count =
1997                                 ops->ap_refresh_count(oap->oap_caller_data,cmd);
1998                 if (oap->oap_count <= 0) {
1999                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2000                                oap->oap_count);
2001                         osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
2002                         continue;
2003                 }
2004
2005                 /* now put the page back in our accounting */
2006                 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2007                 if (++page_count >= cli->cl_max_pages_per_rpc)
2008                         break;
2009
2010                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
2011                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2012                  * have the same alignment as the initial writes that allocated
2013                  * extents on the server. */
2014                 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2015                                  oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2016                 if (ending_offset == 0)
2017                         break;
2018
2019                 /* If there is a gap at the end of this page, it can't merge
2020                  * with any subsequent pages, so we'll hand the network a
2021                  * "fragmented" page array that it can't transfer in 1 RDMA */
2022                 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2023                         break;
2024         }
2025
2026         osc_wake_cache_waiters(cli);
2027
2028         if (page_count == 0)
2029                 RETURN(0);
2030
2031         loi_list_maint(cli, loi);
2032
2033         client_obd_list_unlock(&cli->cl_loi_list_lock);
2034
2035         req = osc_build_req(cli, &rpc_list, page_count, cmd);
2036         if (IS_ERR(req)) {
2037                 /* this should happen rarely and is pretty bad, it makes the
2038                  * pending list not follow the dirty order */
2039                 client_obd_list_lock(&cli->cl_loi_list_lock);
2040                 list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
2041                         list_del_init(&oap->oap_rpc_item);
2042
2043                         /* queued sync pages can be torn down while the pages
2044                          * were between the pending list and the rpc */
2045                         if (oap->oap_interrupted) {
2046                                 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2047                                 osc_ap_completion(cli, NULL, oap, 0,
2048                                                   oap->oap_count);
2049                                 continue;
2050                         }
2051                         osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
2052                 }
2053                 loi_list_maint(cli, loi);
2054                 RETURN(PTR_ERR(req));
2055         }
2056
2057         aa = (struct osc_brw_async_args *)&req->rq_async_args;
2058         if (cmd == OBD_BRW_READ) {
2059                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2060                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2061                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2062                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2063                 ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
2064         } else {
2065                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2066                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2067                                  cli->cl_w_in_flight);
2068                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2069                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2070                 ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
2071         }
2072
2073         client_obd_list_lock(&cli->cl_loi_list_lock);
2074
2075         if (cmd == OBD_BRW_READ)
2076                 cli->cl_r_in_flight++;
2077         else
2078                 cli->cl_w_in_flight++;
2079
2080         /* queued sync pages can be torn down while the pages
2081          * were between the pending list and the rpc */
2082         tmp = NULL;
2083         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2084                 /* only one oap gets a request reference */
2085                 if (tmp == NULL)
2086                         tmp = oap;
2087                 if (oap->oap_interrupted && !req->rq_intr) {
2088                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2089                                oap, req);
2090                         ptlrpc_mark_interrupted(req);
2091                 }
2092         }
2093         if (tmp != NULL)
2094                 tmp->oap_request = ptlrpc_request_addref(req);
2095
2096         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2097                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2098
2099         req->rq_interpret_reply = brw_interpret_oap;
2100         ptlrpcd_add_req(req);
2101         RETURN(1);
2102 }
2103
2104 #define LOI_DEBUG(LOI, STR, args...)                                     \
2105         CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
2106                !list_empty(&(LOI)->loi_cli_item),                        \
2107                (LOI)->loi_write_lop.lop_num_pending,                     \
2108                !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
2109                (LOI)->loi_read_lop.lop_num_pending,                      \
2110                !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
2111                args)                                                     \
2112
2113 /* This is called by osc_check_rpcs() to find which objects have pages that
2114  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2115 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2116 {
2117         ENTRY;
2118         /* first return all objects which we already know to have
2119          * pages ready to be stuffed into rpcs */
2120         if (!list_empty(&cli->cl_loi_ready_list))
2121                 RETURN(list_entry(cli->cl_loi_ready_list.next,
2122                                   struct lov_oinfo, loi_cli_item));
2123
2124         /* then if we have cache waiters, return all objects with queued
2125          * writes.  This is especially important when many small files
2126          * have filled up the cache and not been fired into rpcs because
2127          * they don't pass the nr_pending/object threshhold */
2128         if (!list_empty(&cli->cl_cache_waiters) &&
2129             !list_empty(&cli->cl_loi_write_list))
2130                 RETURN(list_entry(cli->cl_loi_write_list.next,
2131                                   struct lov_oinfo, loi_write_item));
2132
2133         /* then return all queued objects when we have an invalid import
2134          * so that they get flushed */
2135         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2136                 if (!list_empty(&cli->cl_loi_write_list))
2137                         RETURN(list_entry(cli->cl_loi_write_list.next,
2138                                           struct lov_oinfo, loi_write_item));
2139                 if (!list_empty(&cli->cl_loi_read_list))
2140                         RETURN(list_entry(cli->cl_loi_read_list.next,
2141                                           struct lov_oinfo, loi_read_item));
2142         }
2143         RETURN(NULL);
2144 }
2145
2146 /* called with the loi list lock held */
2147 static void osc_check_rpcs(struct client_obd *cli)
2148 {
2149         struct lov_oinfo *loi;
2150         int rc = 0, race_counter = 0;
2151         ENTRY;
2152
2153         while ((loi = osc_next_loi(cli)) != NULL) {
2154                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2155
2156                 if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
2157                         break;
2158
2159                 /* attempt some read/write balancing by alternating between
2160                  * reads and writes in an object.  The makes_rpc checks here
2161                  * would be redundant if we were getting read/write work items
2162                  * instead of objects.  we don't want send_oap_rpc to drain a
2163                  * partial read pending queue when we're given this object to
2164                  * do io on writes while there are cache waiters */
2165                 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2166                         rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
2167                                               &loi->loi_write_lop);
2168                         if (rc < 0)
2169                                 break;
2170                         if (rc > 0)
2171                                 race_counter = 0;
2172                         else
2173                                 race_counter++;
2174                 }
2175                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2176                         rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
2177                                               &loi->loi_read_lop);
2178                         if (rc < 0)
2179                                 break;
2180                         if (rc > 0)
2181                                 race_counter = 0;
2182                         else
2183                                 race_counter++;
2184                 }
2185
2186                 /* attempt some inter-object balancing by issueing rpcs
2187                  * for each object in turn */
2188                 if (!list_empty(&loi->loi_cli_item))
2189                         list_del_init(&loi->loi_cli_item);
2190                 if (!list_empty(&loi->loi_write_item))
2191                         list_del_init(&loi->loi_write_item);
2192                 if (!list_empty(&loi->loi_read_item))
2193                         list_del_init(&loi->loi_read_item);
2194
2195                 loi_list_maint(cli, loi);
2196
2197                 /* send_oap_rpc fails with 0 when make_ready tells it to
2198                  * back off.  llite's make_ready does this when it tries
2199                  * to lock a page queued for write that is already locked.
2200                  * we want to try sending rpcs from many objects, but we
2201                  * don't want to spin failing with 0.  */
2202                 if (race_counter == 10)
2203                         break;
2204         }
2205         EXIT;
2206 }
2207
2208 /* we're trying to queue a page in the osc so we're subject to the
2209  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2210  * If the osc's queued pages are already at that limit, then we want to sleep
2211  * until there is space in the osc's queue for us.  We also may be waiting for
2212  * write credits from the OST if there are RPCs in flight that may return some
2213  * before we fall back to sync writes.
2214  *
2215  * We need this know our allocation was granted in the presence of signals */
2216 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2217 {
2218         int rc;
2219         ENTRY;
2220         client_obd_list_lock(&cli->cl_loi_list_lock);
2221         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2222         client_obd_list_unlock(&cli->cl_loi_list_lock);
2223         RETURN(rc);
2224 };
2225
2226 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2227  * grant or cache space. */
2228 static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
2229                            struct osc_async_page *oap)
2230 {
2231         struct osc_cache_waiter ocw;
2232         struct l_wait_info lwi = { 0 };
2233         ENTRY;
2234
2235         CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2236                "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2237                cli->cl_dirty_max, obd_max_dirty_pages,
2238                cli->cl_lost_grant, cli->cl_avail_grant);
2239
2240         /* force the caller to try sync io.  this can jump the list
2241          * of queued writes and create a discontiguous rpc stream */
2242         if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2243             loi->loi_ar.ar_force_sync)
2244                 RETURN(-EDQUOT);
2245
2246         /* Hopefully normal case - cache space and write credits available */
2247         if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
2248             (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
2249             (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
2250                 /* account for ourselves */
2251                 osc_consume_write_grant(cli, &oap->oap_brw_page);
2252                 RETURN(0);
2253         }
2254
2255         /* Make sure that there are write rpcs in flight to wait for.  This
2256          * is a little silly as this object may not have any pending but
2257          * other objects sure might. */
2258         if (cli->cl_w_in_flight) {
2259                 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2260                 cfs_waitq_init(&ocw.ocw_waitq);
2261                 ocw.ocw_oap = oap;
2262                 ocw.ocw_rc = 0;
2263
2264                 loi_list_maint(cli, loi);
2265                 osc_check_rpcs(cli);
2266                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2267
2268                 CDEBUG(D_CACHE, "sleeping for cache space\n");
2269                 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2270
2271                 client_obd_list_lock(&cli->cl_loi_list_lock);
2272                 if (!list_empty(&ocw.ocw_entry)) {
2273                         list_del(&ocw.ocw_entry);
2274                         RETURN(-EINTR);
2275                 }
2276                 RETURN(ocw.ocw_rc);
2277         }
2278
2279         RETURN(-EDQUOT);
2280 }
2281
2282 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2283                         struct lov_oinfo *loi, cfs_page_t *page,
2284                         obd_off offset, struct obd_async_page_ops *ops,
2285                         void *data, void **res)
2286 {
2287         struct osc_async_page *oap;
2288         ENTRY;
2289
2290         if (!page)
2291                 return size_round(sizeof(*oap));
2292
2293         oap = *res;
2294         oap->oap_magic = OAP_MAGIC;
2295         oap->oap_cli = &exp->exp_obd->u.cli;
2296         oap->oap_loi = loi;
2297
2298         oap->oap_caller_ops = ops;
2299         oap->oap_caller_data = data;
2300
2301         oap->oap_page = page;
2302         oap->oap_obj_off = offset;
2303
2304         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2305         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2306         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2307
2308         oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2309
2310         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2311         RETURN(0);
2312 }
2313
2314 struct osc_async_page *oap_from_cookie(void *cookie)
2315 {
2316         struct osc_async_page *oap = cookie;
2317         if (oap->oap_magic != OAP_MAGIC)
2318                 return ERR_PTR(-EINVAL);
2319         return oap;
2320 };
2321
2322 static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2323                               struct lov_oinfo *loi, void *cookie,
2324                               int cmd, obd_off off, int count,
2325                               obd_flag brw_flags, enum async_flags async_flags)
2326 {
2327         struct client_obd *cli = &exp->exp_obd->u.cli;
2328         struct osc_async_page *oap;
2329         int rc = 0;
2330         ENTRY;
2331
2332         oap = oap_from_cookie(cookie);
2333         if (IS_ERR(oap))
2334                 RETURN(PTR_ERR(oap));
2335
2336         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2337                 RETURN(-EIO);
2338
2339         if (!list_empty(&oap->oap_pending_item) ||
2340             !list_empty(&oap->oap_urgent_item) ||
2341             !list_empty(&oap->oap_rpc_item))
2342                 RETURN(-EBUSY);
2343
2344         /* check if the file's owner/group is over quota */
2345 #ifdef HAVE_QUOTA_SUPPORT
2346         if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
2347                 struct obd_async_page_ops *ops;
2348                 struct obdo *oa;
2349
2350                 oa = obdo_alloc();
2351                 if (oa == NULL)
2352                         RETURN(-ENOMEM);
2353
2354                 ops = oap->oap_caller_ops;
2355                 ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
2356                 if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
2357                     NO_QUOTA)
2358                         rc = -EDQUOT;
2359
2360                 obdo_free(oa);
2361                 if (rc)
2362                         RETURN(rc);
2363         }
2364 #endif
2365
2366         if (loi == NULL)
2367                 loi = lsm->lsm_oinfo[0];
2368
2369         client_obd_list_lock(&cli->cl_loi_list_lock);
2370
2371         oap->oap_cmd = cmd;
2372         oap->oap_page_off = off;
2373         oap->oap_count = count;
2374         oap->oap_brw_flags = brw_flags;
2375         oap->oap_async_flags = async_flags;
2376
2377         if (cmd & OBD_BRW_WRITE) {
2378                 rc = osc_enter_cache(cli, loi, oap);
2379                 if (rc) {
2380                         client_obd_list_unlock(&cli->cl_loi_list_lock);
2381                         RETURN(rc);
2382                 }
2383         }
2384
2385         osc_oap_to_pending(oap);
2386         loi_list_maint(cli, loi);
2387
2388         LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2389                   cmd);
2390
2391         osc_check_rpcs(cli);
2392         client_obd_list_unlock(&cli->cl_loi_list_lock);
2393
2394         RETURN(0);
2395 }
2396
2397 /* aka (~was & now & flag), but this is more clear :) */
2398 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
2399
2400 static int osc_set_async_flags(struct obd_export *exp,
2401                                struct lov_stripe_md *lsm,
2402                                struct lov_oinfo *loi, void *cookie,
2403                                obd_flag async_flags)
2404 {
2405         struct client_obd *cli = &exp->exp_obd->u.cli;
2406         struct loi_oap_pages *lop;
2407         struct osc_async_page *oap;
2408         int rc = 0;
2409         ENTRY;
2410
2411         oap = oap_from_cookie(cookie);
2412         if (IS_ERR(oap))
2413                 RETURN(PTR_ERR(oap));
2414
2415         /*
2416          * bug 7311: OST-side locking is only supported for liblustre for now
2417          * (and liblustre never calls obd_set_async_flags(). I hope.), generic
2418          * implementation has to handle case where OST-locked page was picked
2419          * up by, e.g., ->writepage().
2420          */
2421         LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
2422         LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
2423                                      * tread here. */
2424
2425         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2426                 RETURN(-EIO);
2427
2428         if (loi == NULL)
2429                 loi = lsm->lsm_oinfo[0];
2430
2431         if (oap->oap_cmd & OBD_BRW_WRITE) {
2432                 lop = &loi->loi_write_lop;
2433         } else {
2434                 lop = &loi->loi_read_lop;
2435         }
2436
2437         client_obd_list_lock(&cli->cl_loi_list_lock);
2438
2439         if (list_empty(&oap->oap_pending_item))
2440                 GOTO(out, rc = -EINVAL);
2441
2442         if ((oap->oap_async_flags & async_flags) == async_flags)
2443                 GOTO(out, rc = 0);
2444
2445         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2446                 oap->oap_async_flags |= ASYNC_READY;
2447
2448         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
2449                 if (list_empty(&oap->oap_rpc_item)) {
2450                         list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2451                         loi_list_maint(cli, loi);
2452                 }
2453         }
2454
2455         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2456                         oap->oap_async_flags);
2457 out:
2458         osc_check_rpcs(cli);
2459         client_obd_list_unlock(&cli->cl_loi_list_lock);
2460         RETURN(rc);
2461 }
2462
2463 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2464                              struct lov_oinfo *loi,
2465                              struct obd_io_group *oig, void *cookie,
2466                              int cmd, obd_off off, int count,
2467                              obd_flag brw_flags,
2468                              obd_flag async_flags)
2469 {
2470         struct client_obd *cli = &exp->exp_obd->u.cli;
2471         struct osc_async_page *oap;
2472         struct loi_oap_pages *lop;
2473         int rc = 0;
2474         ENTRY;
2475
2476         oap = oap_from_cookie(cookie);
2477         if (IS_ERR(oap))
2478                 RETURN(PTR_ERR(oap));
2479
2480         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2481                 RETURN(-EIO);
2482
2483         if (!list_empty(&oap->oap_pending_item) ||
2484             !list_empty(&oap->oap_urgent_item) ||
2485             !list_empty(&oap->oap_rpc_item))
2486                 RETURN(-EBUSY);
2487
2488         if (loi == NULL)
2489                 loi = lsm->lsm_oinfo[0];
2490
2491         client_obd_list_lock(&cli->cl_loi_list_lock);
2492
2493         oap->oap_cmd = cmd;
2494         oap->oap_page_off = off;
2495         oap->oap_count = count;
2496         oap->oap_brw_flags = brw_flags;
2497         oap->oap_async_flags = async_flags;
2498
2499         if (cmd & OBD_BRW_WRITE)
2500                 lop = &loi->loi_write_lop;
2501         else
2502                 lop = &loi->loi_read_lop;
2503
2504         list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
2505         if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
2506                 oap->oap_oig = oig;
2507                 rc = oig_add_one(oig, &oap->oap_occ);
2508         }
2509
2510         LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
2511                   oap, oap->oap_page, rc);
2512
2513         client_obd_list_unlock(&cli->cl_loi_list_lock);
2514
2515         RETURN(rc);
2516 }
2517
2518 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2519                                  struct loi_oap_pages *lop, int cmd)
2520 {
2521         struct list_head *pos, *tmp;
2522         struct osc_async_page *oap;
2523
2524         list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2525                 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2526                 list_del(&oap->oap_pending_item);
2527                 osc_oap_to_pending(oap);
2528         }
2529         loi_list_maint(cli, loi);
2530 }
2531
2532 static int osc_trigger_group_io(struct obd_export *exp,
2533                                 struct lov_stripe_md *lsm,
2534                                 struct lov_oinfo *loi,
2535                                 struct obd_io_group *oig)
2536 {
2537         struct client_obd *cli = &exp->exp_obd->u.cli;
2538         ENTRY;
2539
2540         if (loi == NULL)
2541                 loi = lsm->lsm_oinfo[0];
2542
2543         client_obd_list_lock(&cli->cl_loi_list_lock);
2544
2545         osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
2546         osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
2547
2548         osc_check_rpcs(cli);
2549         client_obd_list_unlock(&cli->cl_loi_list_lock);
2550
2551         RETURN(0);
2552 }
2553
2554 static int osc_teardown_async_page(struct obd_export *exp,
2555                                    struct lov_stripe_md *lsm,
2556                                    struct lov_oinfo *loi, void *cookie)
2557 {
2558         struct client_obd *cli = &exp->exp_obd->u.cli;
2559         struct loi_oap_pages *lop;
2560         struct osc_async_page *oap;
2561         int rc = 0;
2562         ENTRY;
2563
2564         oap = oap_from_cookie(cookie);
2565         if (IS_ERR(oap))
2566                 RETURN(PTR_ERR(oap));
2567
2568         if (loi == NULL)
2569                 loi = lsm->lsm_oinfo[0];
2570
2571         if (oap->oap_cmd & OBD_BRW_WRITE) {
2572                 lop = &loi->loi_write_lop;
2573         } else {
2574                 lop = &loi->loi_read_lop;
2575         }
2576
2577         client_obd_list_lock(&cli->cl_loi_list_lock);
2578
2579         if (!list_empty(&oap->oap_rpc_item))
2580                 GOTO(out, rc = -EBUSY);
2581
2582         osc_exit_cache(cli, oap, 0);
2583         osc_wake_cache_waiters(cli);
2584
2585         if (!list_empty(&oap->oap_urgent_item)) {
2586                 list_del_init(&oap->oap_urgent_item);
2587                 oap->oap_async_flags &= ~ASYNC_URGENT;
2588         }
2589         if (!list_empty(&oap->oap_pending_item)) {
2590                 list_del_init(&oap->oap_pending_item);
2591                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2592         }
2593         loi_list_maint(cli, loi);
2594
2595         LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
2596 out:
2597         client_obd_list_unlock(&cli->cl_loi_list_lock);
2598         RETURN(rc);
2599 }
2600
2601 static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
2602                                     int flags)
2603 {
2604         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2605
2606         if (lock == NULL) {
2607                 CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
2608                 return;
2609         }
2610         lock_res_and_lock(lock);
2611 #ifdef __KERNEL__
2612 #ifdef __LINUX__
2613         /* Liang XXX: Darwin and Winnt checking should be added */
2614         if (lock->l_ast_data && lock->l_ast_data != data) {
2615                 struct inode *new_inode = data;
2616                 struct inode *old_inode = lock->l_ast_data;
2617                 if (!(old_inode->i_state & I_FREEING))
2618                         LDLM_ERROR(lock, "inconsistent l_ast_data found");
2619                 LASSERTF(old_inode->i_state & I_FREEING,
2620                          "Found existing inode %p/%lu/%u state %lu in lock: "
2621                          "setting data to %p/%lu/%u\n", old_inode,
2622                          old_inode->i_ino, old_inode->i_generation,
2623                          old_inode->i_state,
2624                          new_inode, new_inode->i_ino, new_inode->i_generation);
2625         }
2626 #endif
2627 #endif
2628         lock->l_ast_data = data;
2629         lock->l_flags |= (flags & LDLM_FL_NO_LRU);
2630         unlock_res_and_lock(lock);
2631         LDLM_LOCK_PUT(lock);
2632 }
2633
2634 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2635                              ldlm_iterator_t replace, void *data)
2636 {
2637         struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2638         struct obd_device *obd = class_exp2obd(exp);
2639
2640         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2641         return 0;
2642 }
2643
2644 static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo,
2645                             int intent, int rc)
2646 {
2647         ENTRY;
2648
2649         if (intent) {
2650                 /* The request was created before ldlm_cli_enqueue call. */
2651                 if (rc == ELDLM_LOCK_ABORTED) {
2652                         struct ldlm_reply *rep;
2653
2654                         /* swabbed by ldlm_cli_enqueue() */
2655                         LASSERT_REPSWABBED(req, DLM_LOCKREPLY_OFF);
2656                         rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
2657                                              sizeof(*rep));
2658                         LASSERT(rep != NULL);
2659                         if (rep->lock_policy_res1)
2660                                 rc = rep->lock_policy_res1;
2661                 }
2662         }
2663
2664         if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
2665                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2666                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
2667                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
2668                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
2669         }
2670
2671         /* Call the update callback. */
2672         rc = oinfo->oi_cb_up(oinfo, rc);
2673         RETURN(rc);
2674 }
2675
2676 static int osc_enqueue_interpret(struct ptlrpc_request *req,
2677                                  struct osc_enqueue_args *aa, int rc)
2678 {
2679         int intent = aa->oa_ei->ei_flags & LDLM_FL_HAS_INTENT;
2680         struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
2681         struct ldlm_lock *lock;
2682
2683         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2684          * be valid. */
2685         lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);
2686
2687         /* Complete obtaining the lock procedure. */
2688         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2689                                    aa->oa_ei->ei_mode,
2690                                    &aa->oa_ei->ei_flags,
2691                                    &lsm->lsm_oinfo[0]->loi_lvb,
2692                                    sizeof(lsm->lsm_oinfo[0]->loi_lvb),
2693                                    lustre_swab_ost_lvb,
2694                                    aa->oa_oi->oi_lockh, rc);
2695
2696         /* Complete osc stuff. */
2697         rc = osc_enqueue_fini(req, aa->oa_oi, intent, rc);
2698
2699         /* Release the lock for async request. */
2700         if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
2701                 ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);
2702
2703         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2704                  aa->oa_oi->oi_lockh, req, aa);
2705         LDLM_LOCK_PUT(lock);
2706         return rc;
2707 }
2708
2709 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2710  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2711  * other synchronous requests, however keeping some locks and trying to obtain
2712  * others may take a considerable amount of time in a case of ost failure; and
2713  * when other sync requests do not get released lock from a client, the client
2714  * is excluded from the cluster -- such scenarious make the life difficult, so
2715  * release locks just after they are obtained. */
2716 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2717                        struct obd_enqueue_info *einfo)
2718 {
2719         struct ldlm_res_id res_id = { .name = {oinfo->oi_md->lsm_object_id} };
2720         struct obd_device *obd = exp->exp_obd;
2721         struct ldlm_reply *rep;
2722         struct ptlrpc_request *req = NULL;
2723         int intent = einfo->ei_flags & LDLM_FL_HAS_INTENT;
2724         int rc;
2725         ENTRY;
2726
2727         /* Filesystem lock extents are extended to page boundaries so that
2728          * dealing with the page cache is a little smoother.  */
2729         oinfo->oi_policy.l_extent.start -=
2730                 oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
2731         oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;
2732
2733         if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
2734                 goto no_match;
2735
2736         /* Next, search for already existing extent locks that will cover us */
2737         rc = ldlm_lock_match(obd->obd_namespace, einfo->ei_flags | LDLM_FL_LVB_READY, &res_id,
2738                              einfo->ei_type, &oinfo->oi_policy, einfo->ei_mode,
2739                              oinfo->oi_lockh);
2740         if (rc == 1) {
2741                 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
2742                                         einfo->ei_flags);
2743                 if (intent) {
2744                         /* I would like to be able to ASSERT here that rss <=
2745                          * kms, but I can't, for reasons which are explained in
2746                          * lov_enqueue() */
2747                 }
2748
2749                 /* We already have a lock, and it's referenced */
2750                 oinfo->oi_cb_up(oinfo, ELDLM_OK);
2751
2752                 /* For async requests, decref the lock. */
2753                 if (einfo->ei_rqset)
2754                         ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
2755
2756                 RETURN(ELDLM_OK);
2757         }
2758
2759         /* If we're trying to read, we also search for an existing PW lock.  The
2760          * VFS and page cache already protect us locally, so lots of readers/
2761          * writers can share a single PW lock.
2762          *
2763          * There are problems with conversion deadlocks, so instead of
2764          * converting a read lock to a write lock, we'll just enqueue a new
2765          * one.
2766          *
2767          * At some point we should cancel the read lock instead of making them
2768          * send us a blocking callback, but there are problems with canceling
2769          * locks out from other users right now, too. */
2770
2771         if (einfo->ei_mode == LCK_PR) {
2772                 rc = ldlm_lock_match(obd->obd_namespace, einfo->ei_flags | LDLM_FL_LVB_READY,
2773                                      &res_id, einfo->ei_type, &oinfo->oi_policy,
2774                                      LCK_PW, oinfo->oi_lockh);
2775                 if (rc == 1) {
2776                         /* FIXME: This is not incredibly elegant, but it might
2777                          * be more elegant than adding another parameter to
2778                          * lock_match.  I want a second opinion. */
2779                         /* addref the lock only if not async requests. */
2780                         if (!einfo->ei_rqset)
2781                                 ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
2782                         osc_set_data_with_check(oinfo->oi_lockh,
2783                                                 einfo->ei_cbdata,
2784                                                 einfo->ei_flags);
2785                         oinfo->oi_cb_up(oinfo, ELDLM_OK);
2786                         ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
2787                         RETURN(ELDLM_OK);
2788                 }
2789         }
2790
2791  no_match:
2792         if (intent) {
2793                 int size[3] = {
2794                         [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
2795                         [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request) };
2796
2797                 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION,
2798                                       LDLM_ENQUEUE, 2, size, NULL);
2799                 if (req == NULL)
2800                         RETURN(-ENOMEM);
2801
2802                 size[DLM_LOCKREPLY_OFF] = sizeof(*rep);
2803                 size[DLM_REPLY_REC_OFF] = 
2804                         sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb);
2805                 ptlrpc_req_set_repsize(req, 3, size);
2806         }
2807
2808         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2809         einfo->ei_flags &= ~LDLM_FL_BLOCK_GRANTED;
2810
2811         rc = ldlm_cli_enqueue(exp, &req, res_id, einfo->ei_type,
2812                               &oinfo->oi_policy, einfo->ei_mode,
2813                               &einfo->ei_flags, einfo->ei_cb_bl,
2814                               einfo->ei_cb_cp, einfo->ei_cb_gl,
2815                               einfo->ei_cbdata,
2816                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2817                               sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
2818                               lustre_swab_ost_lvb, oinfo->oi_lockh,
2819                               einfo->ei_rqset ? 1 : 0);
2820         if (einfo->ei_rqset) {
2821                 if (!rc) {
2822                         struct osc_enqueue_args *aa;
2823                         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2824                         aa = (struct osc_enqueue_args *)&req->rq_async_args;
2825                         aa->oa_oi = oinfo;
2826                         aa->oa_ei = einfo;
2827                         aa->oa_exp = exp;
2828
2829                         req->rq_interpret_reply = osc_enqueue_interpret;
2830                         ptlrpc_set_add_req(einfo->ei_rqset, req);
2831                 } else if (intent) {
2832                         ptlrpc_req_finished(req);
2833                 }
2834                 RETURN(rc);
2835         }
2836
2837         rc = osc_enqueue_fini(req, oinfo, intent, rc);
2838         if (intent)
2839                 ptlrpc_req_finished(req);
2840
2841         RETURN(rc);
2842 }
2843
2844 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
2845                      __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2846                      int *flags, void *data, struct lustre_handle *lockh)
2847 {
2848         struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2849         struct obd_device *obd = exp->exp_obd;
2850         int rc;
2851         int lflags = *flags;
2852         ENTRY;
2853
2854         OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);
2855
2856         /* Filesystem lock extents are extended to page boundaries so that
2857          * dealing with the page cache is a little smoother */
2858         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2859         policy->l_extent.end |= ~CFS_PAGE_MASK;
2860
2861         /* Next, search for already existing extent locks that will cover us */
2862         rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY, &res_id, type,
2863                              policy, mode, lockh);
2864         if (rc) {
2865                 //if (!(*flags & LDLM_FL_TEST_LOCK))
2866                         osc_set_data_with_check(lockh, data, lflags);
2867                 RETURN(rc);
2868         }
2869         /* If we're trying to read, we also search for an existing PW lock.  The
2870          * VFS and page cache already protect us locally, so lots of readers/
2871          * writers can share a single PW lock. */
2872         if (mode == LCK_PR) {
2873                 rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY, 
2874                                      &res_id, type,
2875                                      policy, LCK_PW, lockh);
2876                 if (rc == 1 && !(lflags & LDLM_FL_TEST_LOCK)) {
2877                         /* FIXME: This is not incredibly elegant, but it might
2878                          * be more elegant than adding another parameter to
2879                          * lock_match.  I want a second opinion. */
2880                         osc_set_data_with_check(lockh, data, lflags);
2881                         ldlm_lock_addref(lockh, LCK_PR);
2882                         ldlm_lock_decref(lockh, LCK_PW);
2883                 }
2884         }
2885         RETURN(rc);
2886 }
2887
2888 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2889                       __u32 mode, struct lustre_handle *lockh)
2890 {
2891         ENTRY;
2892
2893         if (unlikely(mode == LCK_GROUP))
2894                 ldlm_lock_decref_and_cancel(lockh, mode);
2895         else
2896                 ldlm_lock_decref(lockh, mode);
2897
2898         RETURN(0);
2899 }
2900
2901 static int osc_cancel_unused(struct obd_export *exp,
2902                              struct lov_stripe_md *lsm, int flags, void *opaque)
2903 {
2904         struct obd_device *obd = class_exp2obd(exp);
2905         struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2906
2907         return ldlm_cli_cancel_unused(obd->obd_namespace, &res_id, flags,
2908                                       opaque);
2909 }
2910
2911 static int osc_join_lru(struct obd_export *exp,
2912                         struct lov_stripe_md *lsm, int join)
2913 {
2914         struct obd_device *obd = class_exp2obd(exp);
2915         struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2916
2917         return ldlm_cli_join_lru(obd->obd_namespace, &res_id, join);
2918 }
2919
2920 static int osc_statfs_interpret(struct ptlrpc_request *req,
2921                                 struct osc_async_args *aa, int rc)
2922 {
2923         struct obd_statfs *msfs;
2924         ENTRY;
2925
2926         if (rc != 0)
2927                 GOTO(out, rc);
2928
2929         msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
2930                                   lustre_swab_obd_statfs);
2931         if (msfs == NULL) {
2932                 CERROR("Can't unpack obd_statfs\n");
2933                 GOTO(out, rc = -EPROTO);
2934         }
2935
2936         memcpy(aa->aa_oi->oi_osfs, msfs, sizeof(*msfs));
2937 out:
2938         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2939         RETURN(rc);
2940 }
2941
2942 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
2943                             __u64 max_age, struct ptlrpc_request_set *rqset)
2944 {
2945         struct ptlrpc_request *req;
2946         struct osc_async_args *aa;
2947         int size[2] = { sizeof(struct ptlrpc_body), sizeof(*oinfo->oi_osfs) };
2948         ENTRY;
2949
2950         /* We could possibly pass max_age in the request (as an absolute
2951          * timestamp or a "seconds.usec ago") so the target can avoid doing
2952          * extra calls into the filesystem if that isn't necessary (e.g.
2953          * during mount that would help a bit).  Having relative timestamps
2954          * is not so great if request processing is slow, while absolute
2955          * timestamps are not ideal because they need time synchronization. */
2956         req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
2957                               OST_STATFS, 1, NULL, NULL);
2958         if (!req)
2959                 RETURN(-ENOMEM);
2960
2961         ptlrpc_req_set_repsize(req, 2, size);
2962         req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
2963
2964         req->rq_interpret_reply = osc_statfs_interpret;
2965         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2966         aa = (struct osc_async_args *)&req->rq_async_args;
2967         aa->aa_oi = oinfo;
2968
2969         ptlrpc_set_add_req(rqset, req);
2970         RETURN(0);
2971 }
2972
2973 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
2974                       __u64 max_age)
2975 {
2976         struct obd_statfs *msfs;
2977         struct ptlrpc_request *req;
2978         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
2979         ENTRY;
2980
2981         /* We could possibly pass max_age in the request (as an absolute
2982          * timestamp or a "seconds.usec ago") so the target can avoid doing
2983          * extra calls into the filesystem if that isn't necessary (e.g.
2984          * during mount that would help a bit).  Having relative timestamps
2985          * is not so great if request processing is slow, while absolute
2986          * timestamps are not ideal because they need time synchronization. */
2987         req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
2988                               OST_STATFS, 1, NULL, NULL);
2989         if (!req)
2990                 RETURN(-ENOMEM);
2991
2992         ptlrpc_req_set_repsize(req, 2, size);
2993         req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
2994
2995         rc = ptlrpc_queue_wait(req);
2996         if (rc)
2997                 GOTO(out, rc);
2998
2999         msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3000                                   lustre_swab_obd_statfs);
3001         if (msfs == NULL) {
3002                 CERROR("Can't unpack obd_statfs\n");
3003                 GOTO(out, rc = -EPROTO);
3004         }
3005
3006         memcpy(osfs, msfs, sizeof(*osfs));
3007
3008         EXIT;
3009  out:
3010         ptlrpc_req_finished(req);
3011         return rc;
3012 }
3013
3014 /* Retrieve object striping information.
3015  *
3016  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
3017  * the maximum number of OST indices which will fit in the user buffer.
3018  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
3019  */
3020 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3021 {
3022         struct lov_user_md lum, *lumk;
3023         int rc = 0, lum_size;
3024         ENTRY;
3025
3026         if (!lsm)
3027                 RETURN(-ENODATA);
3028
3029         if (copy_from_user(&lum, lump, sizeof(lum)))
3030                 RETURN(-EFAULT);
3031
3032         if (lum.lmm_magic != LOV_USER_MAGIC)
3033                 RETURN(-EINVAL);
3034
3035         if (lum.lmm_stripe_count > 0) {
3036                 lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
3037                 OBD_ALLOC(lumk, lum_size);
3038                 if (!lumk)
3039                         RETURN(-ENOMEM);
3040
3041                 lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
3042         } else {
3043                 lum_size = sizeof(lum);
3044                 lumk = &lum;
3045         }
3046
3047         lumk->lmm_object_id = lsm->lsm_object_id;
3048         lumk->lmm_stripe_count = 1;
3049
3050         if (copy_to_user(lump, lumk, lum_size))
3051                 rc = -EFAULT;
3052
3053         if (lumk != &lum)
3054                 OBD_FREE(lumk, lum_size);
3055
3056         RETURN(rc);
3057 }
3058
3059
3060 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3061                          void *karg, void *uarg)
3062 {
3063         struct obd_device *obd = exp->exp_obd;
3064         struct obd_ioctl_data *data = karg;
3065         int err = 0;
3066         ENTRY;
3067
3068 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3069         MOD_INC_USE_COUNT;
3070 #else
3071         if (!try_module_get(THIS_MODULE)) {
3072                 CERROR("Can't get module. Is it alive?");
3073                 return -EINVAL;
3074         }
3075 #endif
3076         switch (cmd) {
3077         case OBD_IOC_LOV_GET_CONFIG: {
3078                 char *buf;
3079                 struct lov_desc *desc;
3080                 struct obd_uuid uuid;
3081
3082                 buf = NULL;
3083                 len = 0;
3084                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3085                         GOTO(out, err = -EINVAL);
3086
3087                 data = (struct obd_ioctl_data *)buf;
3088
3089                 if (sizeof(*desc) > data->ioc_inllen1) {
3090                         obd_ioctl_freedata(buf, len);
3091                         GOTO(out, err = -EINVAL);
3092                 }
3093
3094                 if (data->ioc_inllen2 < sizeof(uuid)) {
3095                         obd_ioctl_freedata(buf, len);
3096                         GOTO(out, err = -EINVAL);
3097                 }
3098
3099                 desc = (struct lov_desc *)data->ioc_inlbuf1;
3100                 desc->ld_tgt_count = 1;
3101                 desc->ld_active_tgt_count = 1;
3102                 desc->ld_default_stripe_count = 1;
3103                 desc->ld_default_stripe_size = 0;
3104                 desc->ld_default_stripe_offset = 0;
3105                 desc->ld_pattern = 0;
3106                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3107
3108                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3109
3110                 err = copy_to_user((void *)uarg, buf, len);
3111                 if (err)
3112                         err = -EFAULT;
3113                 obd_ioctl_freedata(buf, len);
3114                 GOTO(out, err);
3115         }
3116         case LL_IOC_LOV_SETSTRIPE:
3117                 err = obd_alloc_memmd(exp, karg);
3118                 if (err > 0)
3119                         err = 0;
3120                 GOTO(out, err);
3121         case LL_IOC_LOV_GETSTRIPE:
3122                 err = osc_getstripe(karg, uarg);
3123                 GOTO(out, err);
3124         case OBD_IOC_CLIENT_RECOVER:
3125                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3126                                             data->ioc_inlbuf1);
3127                 if (err > 0)
3128                         err = 0;
3129                 GOTO(out, err);
3130         case IOC_OSC_SET_ACTIVE:
3131                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3132                                                data->ioc_offset);
3133                 GOTO(out, err);
3134         case OBD_IOC_POLL_QUOTACHECK:
3135                 err = lquota_poll_check(quota_interface, exp,
3136                                         (struct if_quotacheck *)karg);
3137                 GOTO(out, err);
3138         default:
3139                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3140                        cmd, cfs_curproc_comm());
3141                 GOTO(out, err = -ENOTTY);
3142         }
3143 out:
3144 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3145         MOD_DEC_USE_COUNT;
3146 #else
3147         module_put(THIS_MODULE);
3148 #endif
3149         return err;
3150 }
3151
3152 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3153                         void *key, __u32 *vallen, void *val)
3154 {
3155         ENTRY;
3156         if (!vallen || !val)
3157                 RETURN(-EFAULT);
3158
3159         if (keylen > strlen("lock_to_stripe") &&
3160             strcmp(key, "lock_to_stripe") == 0) {
3161                 __u32 *stripe = val;
3162                 *vallen = sizeof(*stripe);
3163                 *stripe = 0;
3164                 RETURN(0);
3165         } else if (keylen >= strlen("last_id") && strcmp(key, "last_id") == 0) {
3166                 struct ptlrpc_request *req;
3167                 obd_id *reply;
3168                 char *bufs[2] = { NULL, key };
3169                 int rc, size[2] = { sizeof(struct ptlrpc_body), keylen };
3170
3171                 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
3172                                       OST_GET_INFO, 2, size, bufs);
3173                 if (req == NULL)
3174                         RETURN(-ENOMEM);
3175
3176                 size[REPLY_REC_OFF] = *vallen;
3177                 ptlrpc_req_set_repsize(req, 2, size);
3178                 rc = ptlrpc_queue_wait(req);
3179                 if (rc)
3180                         GOTO(out, rc);
3181
3182                 reply = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*reply),
3183                                            lustre_swab_ost_last_id);
3184                 if (reply == NULL) {
3185                         CERROR("Can't unpack OST last ID\n");
3186                         GOTO(out, rc = -EPROTO);
3187                 }
3188                 *((obd_id *)val) = *reply;
3189         out:
3190                 ptlrpc_req_finished(req);
3191                 RETURN(rc);
3192         }
3193         RETURN(-EINVAL);
3194 }
3195
3196 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3197                                           void *aa, int rc)
3198 {
3199         struct llog_ctxt *ctxt;
3200         struct obd_import *imp = req->rq_import;
3201         ENTRY;
3202
3203         if (rc != 0)
3204                 RETURN(rc);
3205
3206         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3207         if (ctxt) {
3208                 if (rc == 0)
3209                         rc = llog_initiator_connect(ctxt);
3210                 else
3211                         CERROR("cannot establish connection for "
3212                                "ctxt %p: %d\n", ctxt, rc);
3213         }
3214
3215         imp->imp_server_timeout = 1;
3216         CDEBUG(D_HA, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3217         imp->imp_pingable = 1;
3218
3219         RETURN(rc);
3220 }
3221
3222 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3223                               void *key, obd_count vallen, void *val,
3224                               struct ptlrpc_request_set *set)
3225 {
3226         struct ptlrpc_request *req;
3227         struct obd_device  *obd = exp->exp_obd;
3228         struct obd_import *imp = class_exp2cliimp(exp);
3229         int size[3] = { sizeof(struct ptlrpc_body), keylen, vallen };
3230         char *bufs[3] = { NULL, key, val };
3231         ENTRY;
3232
3233         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3234
3235         if (KEY_IS(KEY_NEXT_ID)) {
3236                 if (vallen != sizeof(obd_id))
3237                         RETURN(-EINVAL);
3238                 obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
3239                 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3240                        exp->exp_obd->obd_name,
3241                        obd->u.cli.cl_oscc.oscc_next_id);
3242
3243                 RETURN(0);
3244         }
3245
3246         if (KEY_IS("unlinked")) {
3247                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3248                 spin_lock(&oscc->oscc_lock);
3249                 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3250                 spin_unlock(&oscc->oscc_lock);
3251                 RETURN(0);
3252         }
3253
3254         if (KEY_IS(KEY_INIT_RECOV)) {
3255                 if (vallen != sizeof(int))
3256                         RETURN(-EINVAL);
3257                 imp->imp_initial_recov = *(int *)val;
3258                 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3259                        exp->exp_obd->obd_name,
3260                        imp->imp_initial_recov);
3261                 RETURN(0);
3262         }
3263
3264         if (KEY_IS("checksum")) {
3265                 if (vallen != sizeof(int))
3266                         RETURN(-EINVAL);
3267                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3268                 RETURN(0);
3269         }
3270
3271         if (!set)
3272                 RETURN(-EINVAL);
3273
3274         /* We pass all other commands directly to OST. Since nobody calls osc
3275            methods directly and everybody is supposed to go through LOV, we
3276            assume lov checked invalid values for us.
3277            The only recognised values so far are evict_by_nid and mds_conn.
3278            Even if something bad goes through, we'd get a -EINVAL from OST
3279            anyway. */
3280
3281         req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_SET_INFO, 3, size,
3282                               bufs);
3283         if (req == NULL)
3284                 RETURN(-ENOMEM);
3285
3286         if (KEY_IS("mds_conn"))
3287                 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3288
3289         ptlrpc_req_set_repsize(req, 1, NULL);
3290         ptlrpc_set_add_req(set, req);
3291         ptlrpc_check_set(set);
3292
3293         RETURN(0);
3294 }
3295
3296
3297 static struct llog_operations osc_size_repl_logops = {
3298         lop_cancel: llog_obd_repl_cancel
3299 };
3300
3301 static struct llog_operations osc_mds_ost_orig_logops;
3302 static int osc_llog_init(struct obd_device *obd, struct obd_device *tgt,
3303                          int count, struct llog_catid *catid, 
3304                          struct obd_uuid *uuid)
3305 {
3306         int rc;
3307         ENTRY;
3308
3309         spin_lock(&obd->obd_dev_lock);
3310         if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
3311                 osc_mds_ost_orig_logops = llog_lvfs_ops;
3312                 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3313                 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3314                 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3315                 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
3316         }
3317         spin_unlock(&obd->obd_dev_lock);
3318
3319         rc = llog_setup(obd, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
3320                         &catid->lci_logid, &osc_mds_ost_orig_logops);
3321         if (rc) {
3322                 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
3323                 GOTO (out, rc);
3324         }
3325
3326         rc = llog_setup(obd, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
3327                         &osc_size_repl_logops);
3328         if (rc) 
3329                 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
3330 out:
3331         if (rc) {
3332                 CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n", 
3333                        obd->obd_name, tgt->obd_name, count, catid, rc);
3334                 CERROR("logid "LPX64":0x%x\n",
3335                        catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
3336         }
3337         RETURN(rc);
3338 }
3339
3340 static int osc_llog_finish(struct obd_device *obd, int count)
3341 {
3342         struct llog_ctxt *ctxt;
3343         int rc = 0, rc2 = 0;
3344         ENTRY;
3345
3346         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3347         if (ctxt)
3348                 rc = llog_cleanup(ctxt);
3349
3350         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3351         if (ctxt)
3352                 rc2 = llog_cleanup(ctxt);
3353         if (!rc)
3354                 rc = rc2;
3355
3356         RETURN(rc);
3357 }
3358
3359 static int osc_reconnect(struct obd_export *exp, struct obd_device *obd,
3360                          struct obd_uuid *cluuid,
3361                          struct obd_connect_data *data)
3362 {
3363         struct client_obd *cli = &obd->u.cli;
3364
3365         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3366                 long lost_grant;
3367
3368                 client_obd_list_lock(&cli->cl_loi_list_lock);
3369                 data->ocd_grant = cli->cl_avail_grant ?:
3370                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3371                 lost_grant = cli->cl_lost_grant;
3372                 cli->cl_lost_grant = 0;
3373                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3374
3375                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3376                        "cl_lost_grant: %ld\n", data->ocd_grant,
3377                        cli->cl_avail_grant, lost_grant);
3378                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3379                        " ocd_grant: %d\n", data->ocd_connect_flags,
3380                        data->ocd_version, data->ocd_grant);
3381         }
3382
3383         RETURN(0);
3384 }
3385
3386 static int osc_disconnect(struct obd_export *exp)
3387 {
3388         struct obd_device *obd = class_exp2obd(exp);
3389         struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3390         int rc;
3391
3392         if (obd->u.cli.cl_conn_count == 1)
3393                 /* flush any remaining cancel messages out to the target */
3394                 llog_sync(ctxt, exp);
3395
3396         rc = client_disconnect_export(exp);
3397         return rc;
3398 }
3399
3400 static int osc_import_event(struct obd_device *obd,
3401                             struct obd_import *imp,
3402                             enum obd_import_event event)
3403 {
3404         struct client_obd *cli;
3405         int rc = 0;
3406
3407         ENTRY;
3408         LASSERT(imp->imp_obd == obd);
3409
3410         switch (event) {
3411         case IMP_EVENT_DISCON: {
3412                 /* Only do this on the MDS OSC's */
3413                 if (imp->imp_server_timeout) {
3414                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3415
3416                         spin_lock(&oscc->oscc_lock);
3417                         oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
3418                         spin_unlock(&oscc->oscc_lock);
3419                 }
3420
3421                 break;
3422         }
3423         case IMP_EVENT_INACTIVE: {
3424                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3425                 break;
3426         }
3427         case IMP_EVENT_INVALIDATE: {
3428                 struct ldlm_namespace *ns = obd->obd_namespace;
3429
3430                 /* Reset grants */
3431                 cli = &obd->u.cli;
3432                 client_obd_list_lock(&cli->cl_loi_list_lock);
3433                 cli->cl_avail_grant = 0;
3434                 cli->cl_lost_grant = 0;
3435                 /* all pages go to failing rpcs due to the invalid import */
3436                 osc_check_rpcs(cli);
3437                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3438
3439                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3440
3441                 break;
3442         }
3443         case IMP_EVENT_ACTIVE: {
3444                 /* Only do this on the MDS OSC's */
3445                 if (imp->imp_server_timeout) {
3446                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3447
3448                         spin_lock(&oscc->oscc_lock);
3449                         oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3450                         spin_unlock(&oscc->oscc_lock);
3451                 }
3452                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3453                 break;
3454         }
3455         case IMP_EVENT_OCD: {
3456                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3457
3458                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3459                         osc_init_grant(&obd->u.cli, ocd);
3460
3461                 /* See bug 7198 */
3462                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3463                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3464
3465                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3466                 break;
3467         }
3468         default:
3469                 CERROR("Unknown import event %d\n", event);
3470                 LBUG();
3471         }
3472         RETURN(rc);
3473 }
3474
3475 int osc_setup(struct obd_device *obd, obd_count len, void *buf)
3476 {
3477         int rc;
3478         ENTRY;
3479
3480         ENTRY;
3481         rc = ptlrpcd_addref();
3482         if (rc)
3483                 RETURN(rc);
3484
3485         rc = client_obd_setup(obd, len, buf);
3486         if (rc) {
3487                 ptlrpcd_decref();
3488         } else {
3489                 struct lprocfs_static_vars lvars;
3490                 struct client_obd *cli = &obd->u.cli;
3491
3492                 lprocfs_init_vars(osc, &lvars);
3493                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3494                         lproc_osc_attach_seqstat(obd);
3495                         ptlrpc_lprocfs_register_obd(obd);
3496                 }
3497
3498                 oscc_init(obd);
3499                 /* We need to allocate a few requests more, because
3500                    brw_interpret_oap tries to create new requests before freeing
3501                    previous ones. Ideally we want to have 2x max_rpcs_in_flight
3502                    reserved, but I afraid that might be too much wasted RAM
3503                    in fact, so 2 is just my guess and still should work. */
3504                 cli->cl_import->imp_rq_pool =
3505                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3506                                             OST_MAXREQSIZE,
3507                                             ptlrpc_add_rqs_to_pool);
3508         }
3509
3510         RETURN(rc);
3511 }
3512
3513 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3514 {
3515         int rc = 0;
3516         ENTRY;
3517
3518         switch (stage) {
3519         case OBD_CLEANUP_EARLY: {
3520                 struct obd_import *imp;
3521                 imp = obd->u.cli.cl_import;
3522                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3523                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3524                 ptlrpc_deactivate_import(imp);
3525                 break;
3526         }
3527         case OBD_CLEANUP_EXPORTS: {
3528                 /* If we set up but never connected, the
3529                    client import will not have been cleaned. */
3530                 if (obd->u.cli.cl_import) {
3531                         struct obd_import *imp;
3532                         imp = obd->u.cli.cl_import;
3533                         CDEBUG(D_CONFIG, "%s: client import never connected\n",
3534                                obd->obd_name);
3535                         ptlrpc_invalidate_import(imp);
3536                         ptlrpc_free_rq_pool(imp->imp_rq_pool);
3537                         class_destroy_import(imp);
3538                         obd->u.cli.cl_import = NULL;
3539                 }
3540                 break;
3541         }
3542         case OBD_CLEANUP_SELF_EXP:
3543                 rc = obd_llog_finish(obd, 0);
3544                 if (rc != 0)
3545                         CERROR("failed to cleanup llogging subsystems\n");
3546                 break;
3547         case OBD_CLEANUP_OBD:
3548                 break;
3549         }
3550         RETURN(rc);
3551 }
3552
3553 int osc_cleanup(struct obd_device *obd)
3554 {
3555         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3556         int rc;
3557
3558         ENTRY;
3559         ptlrpc_lprocfs_unregister_obd(obd);
3560         lprocfs_obd_cleanup(obd);
3561
3562         spin_lock(&oscc->oscc_lock);
3563         oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
3564         oscc->oscc_flags |= OSCC_FLAG_EXITING;
3565         spin_unlock(&oscc->oscc_lock);
3566
3567         /* free memory of osc quota cache */
3568         lquota_cleanup(quota_interface, obd);
3569
3570         rc = client_obd_cleanup(obd);
3571
3572         ptlrpcd_decref();
3573         RETURN(rc);
3574 }
3575
3576 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3577 {
3578         struct lustre_cfg *lcfg = buf;
3579         struct lprocfs_static_vars lvars;
3580         int rc = 0;
3581
3582         lprocfs_init_vars(osc, &lvars);
3583
3584         rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars, lcfg, obd);
3585         return(rc);
3586 }
3587
3588 struct obd_ops osc_obd_ops = {
3589         .o_owner                = THIS_MODULE,
3590         .o_setup                = osc_setup,
3591         .o_precleanup           = osc_precleanup,
3592         .o_cleanup              = osc_cleanup,
3593         .o_add_conn             = client_import_add_conn,
3594         .o_del_conn             = client_import_del_conn,
3595         .o_connect              = client_connect_import,
3596         .o_reconnect            = osc_reconnect,
3597         .o_disconnect           = osc_disconnect,
3598         .o_statfs               = osc_statfs,
3599         .o_statfs_async         = osc_statfs_async,
3600         .o_packmd               = osc_packmd,
3601         .o_unpackmd             = osc_unpackmd,
3602         .o_create               = osc_create,
3603         .o_destroy              = osc_destroy,
3604         .o_getattr              = osc_getattr,
3605         .o_getattr_async        = osc_getattr_async,
3606         .o_setattr              = osc_setattr,
3607         .o_setattr_async        = osc_setattr_async,
3608         .o_brw                  = osc_brw,
3609         .o_brw_async            = osc_brw_async,
3610         .o_prep_async_page      = osc_prep_async_page,
3611         .o_queue_async_io       = osc_queue_async_io,
3612         .o_set_async_flags      = osc_set_async_flags,
3613         .o_queue_group_io       = osc_queue_group_io,
3614         .o_trigger_group_io     = osc_trigger_group_io,
3615         .o_teardown_async_page  = osc_teardown_async_page,
3616         .o_punch                = osc_punch,
3617         .o_sync                 = osc_sync,
3618         .o_enqueue              = osc_enqueue,
3619         .o_match                = osc_match,
3620         .o_change_cbdata        = osc_change_cbdata,
3621         .o_cancel               = osc_cancel,
3622         .o_cancel_unused        = osc_cancel_unused,
3623         .o_join_lru             = osc_join_lru,
3624         .o_iocontrol            = osc_iocontrol,
3625         .o_get_info             = osc_get_info,
3626         .o_set_info_async       = osc_set_info_async,
3627         .o_import_event         = osc_import_event,
3628         .o_llog_init            = osc_llog_init,
3629         .o_llog_finish          = osc_llog_finish,
3630         .o_process_config       = osc_process_config,
3631 };
3632
3633 int __init osc_init(void)
3634 {
3635         struct lprocfs_static_vars lvars;
3636         int rc;
3637         ENTRY;
3638
3639         lprocfs_init_vars(osc, &lvars);
3640
3641         request_module("lquota");
3642         quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
3643         lquota_init(quota_interface);
3644         init_obd_quota_ops(quota_interface, &osc_obd_ops);
3645
3646         rc = class_register_type(&osc_obd_ops, lvars.module_vars,
3647                                  LUSTRE_OSC_NAME);
3648         if (rc) {
3649                 if (quota_interface)
3650                         PORTAL_SYMBOL_PUT(osc_quota_interface);
3651                 RETURN(rc);
3652         }
3653
3654         RETURN(rc);
3655 }
3656
3657 #ifdef __KERNEL__
3658 static void /*__exit*/ osc_exit(void)
3659 {
3660         lquota_exit(quota_interface);
3661         if (quota_interface)
3662                 PORTAL_SYMBOL_PUT(osc_quota_interface);
3663
3664         class_unregister_type(LUSTRE_OSC_NAME);
3665 }
3666
3667 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
3668 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3669 MODULE_LICENSE("GPL");
3670
3671 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
3672 #endif