Whamcloud - gitweb
Land b1_8_gate onto b1_8 (20081218_1708)
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #ifndef EXPORT_SYMTAB
38 # define EXPORT_SYMTAB
39 #endif
40 #define DEBUG_SUBSYSTEM S_OSC
41
42 #ifdef __KERNEL__
43 # include <libcfs/libcfs.h>
44 #else /* __KERNEL__ */
45 # include <liblustre.h>
46 #endif
47
48 # include <lustre_dlm.h>
49 #include <libcfs/kp30.h>
50 #include <lustre_net.h>
51 #include <lustre/lustre_user.h>
52 #include <obd_ost.h>
53 #include <obd_lov.h>
54
55 #ifdef  __CYGWIN__
56 # include <ctype.h>
57 #endif
58
59 #include <lustre_ha.h>
60 #include <lprocfs_status.h>
61 #include <lustre_log.h>
62 #include <lustre_debug.h>
63 #include <lustre_param.h>
64 #include <lustre_cache.h>
65 #include "osc_internal.h"
66
67 static quota_interface_t *quota_interface = NULL;
68 extern quota_interface_t osc_quota_interface;
69
70 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
71 static int brw_interpret(struct ptlrpc_request *request, void *data, int rc);
72 int osc_cleanup(struct obd_device *obd);
73
74 static quota_interface_t *quota_interface;
75 extern quota_interface_t osc_quota_interface;
76
77 /* by default 10s */
78 atomic_t osc_resend_time;
79
80 /* Pack OSC object metadata for disk storage (LE byte order). */
81 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
82                       struct lov_stripe_md *lsm)
83 {
84         int lmm_size;
85         ENTRY;
86
87         lmm_size = sizeof(**lmmp);
88         if (!lmmp)
89                 RETURN(lmm_size);
90
91         if (*lmmp && !lsm) {
92                 OBD_FREE(*lmmp, lmm_size);
93                 *lmmp = NULL;
94                 RETURN(0);
95         }
96
97         if (!*lmmp) {
98                 OBD_ALLOC(*lmmp, lmm_size);
99                 if (!*lmmp)
100                         RETURN(-ENOMEM);
101         }
102
103         if (lsm) {
104                 LASSERT(lsm->lsm_object_id);
105                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
106         }
107
108         RETURN(lmm_size);
109 }
110
111 /* Unpack OSC object metadata from disk storage (LE byte order). */
112 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
113                         struct lov_mds_md *lmm, int lmm_bytes)
114 {
115         int lsm_size;
116         ENTRY;
117
118         if (lmm != NULL) {
119                 if (lmm_bytes < sizeof (*lmm)) {
120                         CERROR("lov_mds_md too small: %d, need %d\n",
121                                lmm_bytes, (int)sizeof(*lmm));
122                         RETURN(-EINVAL);
123                 }
124                 /* XXX LOV_MAGIC etc check? */
125
126                 if (lmm->lmm_object_id == 0) {
127                         CERROR("lov_mds_md: zero lmm_object_id\n");
128                         RETURN(-EINVAL);
129                 }
130         }
131
132         lsm_size = lov_stripe_md_size(1);
133         if (lsmp == NULL)
134                 RETURN(lsm_size);
135
136         if (*lsmp != NULL && lmm == NULL) {
137                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
138                 OBD_FREE(*lsmp, lsm_size);
139                 *lsmp = NULL;
140                 RETURN(0);
141         }
142
143         if (*lsmp == NULL) {
144                 OBD_ALLOC(*lsmp, lsm_size);
145                 if (*lsmp == NULL)
146                         RETURN(-ENOMEM);
147                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
148                 if ((*lsmp)->lsm_oinfo[0] == NULL) {
149                         OBD_FREE(*lsmp, lsm_size);
150                         RETURN(-ENOMEM);
151                 }
152                 loi_init((*lsmp)->lsm_oinfo[0]);
153         }
154
155         if (lmm != NULL) {
156                 /* XXX zero *lsmp? */
157                 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
158                 LASSERT((*lsmp)->lsm_object_id);
159         }
160
161         (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
162
163         RETURN(lsm_size);
164 }
165
166 static int osc_getattr_interpret(struct ptlrpc_request *req,
167                                  struct osc_async_args *aa, int rc)
168 {
169         struct ost_body *body;
170         ENTRY;
171
172         if (rc != 0)
173                 GOTO(out, rc);
174
175         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
176                                   lustre_swab_ost_body);
177         if (body) {
178                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
179                 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
180
181                 /* This should really be sent by the OST */
182                 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
183                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
184         } else {
185                 CERROR("can't unpack ost_body\n");
186                 rc = -EPROTO;
187                 aa->aa_oi->oi_oa->o_valid = 0;
188         }
189 out:
190         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
191         RETURN(rc);
192 }
193
194 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
195                              struct ptlrpc_request_set *set)
196 {
197         struct ptlrpc_request *req;
198         struct ost_body *body;
199         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
200         struct osc_async_args *aa;
201         ENTRY;
202
203         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
204                               OST_GETATTR, 2, size,NULL);
205         if (!req)
206                 RETURN(-ENOMEM);
207
208         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
209         memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
210
211         ptlrpc_req_set_repsize(req, 2, size);
212         req->rq_interpret_reply = osc_getattr_interpret;
213
214         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
215         aa = ptlrpc_req_async_args(req);
216         aa->aa_oi = oinfo;
217
218         ptlrpc_set_add_req(set, req);
219         RETURN (0);
220 }
221
222 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
223 {
224         struct ptlrpc_request *req;
225         struct ost_body *body;
226         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
227         int rc;
228         ENTRY;
229
230         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
231                               OST_GETATTR, 2, size, NULL);
232         if (!req)
233                 RETURN(-ENOMEM);
234
235         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
236         memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
237
238         ptlrpc_req_set_repsize(req, 2, size);
239
240         rc = ptlrpc_queue_wait(req);
241         if (rc) {
242                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
243                 GOTO(out, rc);
244         }
245
246         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
247                                   lustre_swab_ost_body);
248         if (body == NULL) {
249                 CERROR ("can't unpack ost_body\n");
250                 GOTO (out, rc = -EPROTO);
251         }
252
253         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
254         memcpy(oinfo->oi_oa, &body->oa, sizeof(*oinfo->oi_oa));
255
256         /* This should really be sent by the OST */
257         oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
258         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
259
260         EXIT;
261  out:
262         ptlrpc_req_finished(req);
263         return rc;
264 }
265
266 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
267                        struct obd_trans_info *oti)
268 {
269         struct ptlrpc_request *req;
270         struct ost_body *body;
271         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
272         int rc;
273         ENTRY;
274
275         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
276                               OST_SETATTR, 2, size, NULL);
277         if (!req)
278                 RETURN(-ENOMEM);
279
280         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
281         memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
282
283         ptlrpc_req_set_repsize(req, 2, size);
284
285         rc = ptlrpc_queue_wait(req);
286         if (rc)
287                 GOTO(out, rc);
288
289         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
290                                   lustre_swab_ost_body);
291         if (body == NULL)
292                 GOTO(out, rc = -EPROTO);
293
294         memcpy(oinfo->oi_oa, &body->oa, sizeof(*oinfo->oi_oa));
295
296         EXIT;
297 out:
298         ptlrpc_req_finished(req);
299         RETURN(rc);
300 }
301
302 static int osc_setattr_interpret(struct ptlrpc_request *req,
303                                  struct osc_async_args *aa, int rc)
304 {
305         struct ost_body *body;
306         ENTRY;
307
308         if (rc != 0)
309                 GOTO(out, rc);
310
311         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
312                                   lustre_swab_ost_body);
313         if (body == NULL) {
314                 CERROR("can't unpack ost_body\n");
315                 GOTO(out, rc = -EPROTO);
316         }
317
318         memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
319 out:
320         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
321         RETURN(rc);
322 }
323
324 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
325                              struct obd_trans_info *oti,
326                              struct ptlrpc_request_set *rqset)
327 {
328         struct ptlrpc_request *req;
329         struct ost_body *body;
330         __u32 size[3] = { sizeof(struct ptlrpc_body), sizeof(*body), 0 };
331         int bufcount = 2;
332         struct osc_async_args *aa;
333         ENTRY;
334
335         if (osc_exp_is_2_0_server(exp)) {
336                 bufcount = 3;
337         }
338
339         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
340                               OST_SETATTR, bufcount, size, NULL);
341         if (!req)
342                 RETURN(-ENOMEM);
343
344         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
345
346         if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
347                 LASSERT(oti);
348                 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
349         }
350
351         memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
352         ptlrpc_req_set_repsize(req, 2, size);
353         /* do mds to ost setattr asynchronouly */
354         if (!rqset) {
355                 /* Do not wait for response. */
356                 ptlrpcd_add_req(req);
357         } else {
358                 req->rq_interpret_reply = osc_setattr_interpret;
359
360                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
361                 aa = ptlrpc_req_async_args(req);
362                 aa->aa_oi = oinfo;
363
364                 ptlrpc_set_add_req(rqset, req);
365         }
366
367         RETURN(0);
368 }
369
/* Create a single object on the OST.
 *
 * If *ea is NULL a temporary lov_stripe_md is allocated for the call (and
 * freed again if creation fails); on success the new object id is stored
 * into the lsm and it is returned to the caller through *ea.
 *
 * \param exp  export to the target OST
 * \param oa   in: creation attributes; out: attributes of the new object
 * \param ea   in/out: stripe metadata pointer (may point to NULL on entry)
 * \param oti  optional transaction info; on success receives the reply
 *             transno and, if the server sent one, the unlink llog cookie
 *
 * \retval 0 on success, negative errno on failure
 */
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        struct lov_stripe_md *lsm;
        __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        int rc;
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                /* Caller supplied no stripe MD; allocate one locally. */
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_CREATE, 2, size, NULL);
        if (!req)
                GOTO(out, rc = -ENOMEM);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        memcpy(&body->oa, oa, sizeof(body->oa));

        ptlrpc_req_set_repsize(req, 2, size);
        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_DELORPHAN) {
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR ("can't unpack ost_body\n");
                GOTO (out_req, rc = -EPROTO);
        }

        /* Return the new object's attributes to the caller. */
        memcpy(oa, &body->oa, sizeof(*oa));

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_object_id = oa->o_id;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                /* Keep the llog cookie the server sent so the unlink log
                 * record can be cancelled later. */
                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        *oti->oti_logcookies = oa->o_lcookie;
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        /* Free the lsm only if we allocated it here (caller's *ea is
         * still NULL on failure paths reached before *ea was set). */
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        RETURN(rc);
}
450
451 static int osc_punch_interpret(struct ptlrpc_request *req,
452                                struct osc_async_args *aa, int rc)
453 {
454         struct ost_body *body;
455         ENTRY;
456
457         if (rc != 0)
458                 GOTO(out, rc);
459
460         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof (*body),
461                                   lustre_swab_ost_body);
462         if (body == NULL) {
463                 CERROR ("can't unpack ost_body\n");
464                 GOTO(out, rc = -EPROTO);
465         }
466
467         memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
468 out:
469         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
470         RETURN(rc);
471 }
472
473 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
474                      struct obd_trans_info *oti,
475                      struct ptlrpc_request_set *rqset)
476 {
477         struct ptlrpc_request *req;
478         struct osc_async_args *aa;
479         struct ost_body *body;
480         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
481         ENTRY;
482
483         if (!oinfo->oi_oa) {
484                 CERROR("oa NULL\n");
485                 RETURN(-EINVAL);
486         }
487
488         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
489                               OST_PUNCH, 2, size, NULL);
490         if (!req)
491                 RETURN(-ENOMEM);
492
493         req->rq_request_portal = OST_IO_PORTAL;         /* bug 7198 */
494         ptlrpc_at_set_req_timeout(req);
495
496         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
497         memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
498
499         /* overload the size and blocks fields in the oa with start/end */
500         body->oa.o_size = oinfo->oi_policy.l_extent.start;
501         body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
502         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
503
504         ptlrpc_req_set_repsize(req, 2, size);
505
506         req->rq_interpret_reply = osc_punch_interpret;
507         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
508         aa = ptlrpc_req_async_args(req);
509         aa->aa_oi = oinfo;
510         ptlrpc_set_add_req(rqset, req);
511
512         RETURN(0);
513 }
514
515 static int osc_sync_interpret(struct ptlrpc_request *req,
516                               struct osc_async_args *aa, int rc)
517 {
518         struct ost_body *body;
519         ENTRY;
520
521         if (rc)
522                 GOTO(out, rc);
523
524         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
525                                   lustre_swab_ost_body);
526         if (body == NULL) {
527                 CERROR ("can't unpack ost_body\n");
528                 GOTO(out, rc = -EPROTO);
529         }
530
531         *aa->aa_oi->oi_oa = body->oa;
532 out:
533         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
534         RETURN(rc);
535 }
536
537 static int osc_sync(struct obd_export *exp, struct obd_info *oinfo,
538                     obd_size start, obd_size end,
539                     struct ptlrpc_request_set *set)
540 {
541         struct ptlrpc_request *req;
542         struct ost_body *body;
543         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
544         struct osc_async_args *aa;
545         ENTRY;
546
547         if (!oinfo->oi_oa) {
548                 CERROR("oa NULL\n");
549                 RETURN(-EINVAL);
550         }
551
552         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
553                               OST_SYNC, 2, size, NULL);
554         if (!req)
555                 RETURN(-ENOMEM);
556
557         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
558         memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
559
560         /* overload the size and blocks fields in the oa with start/end */
561         body->oa.o_size = start;
562         body->oa.o_blocks = end;
563         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
564
565         ptlrpc_req_set_repsize(req, 2, size);
566         req->rq_interpret_reply = osc_sync_interpret;
567
568         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
569         aa = ptlrpc_req_async_args(req);
570         aa->aa_oi = oinfo;
571
572         ptlrpc_set_add_req(set, req);
573         RETURN (0);
574 }
575
576 /* Find and cancel locally locks matched by @mode in the resource found by
577  * @objid. Found locks are added into @cancel list. Returns the amount of
578  * locks added to @cancels list. */
579 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
580                                    struct list_head *cancels, ldlm_mode_t mode,
581                                    int lock_flags)
582 {
583         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
584         struct ldlm_res_id res_id;
585         struct ldlm_resource *res;
586         int count;
587         ENTRY;
588
589         osc_build_res_name(oa->o_id, oa->o_gr, &res_id);
590         res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
591         if (res == NULL)
592                 RETURN(0);
593
594         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
595                                            lock_flags, 0, NULL);
596         ldlm_resource_putref(res);
597         RETURN(count);
598 }
599
/* Reply callback for OST_DESTROY: release the in-flight slot taken by
 * osc_can_send_destroy() and wake any thread waiting to send a destroy. */
static int osc_destroy_interpret(struct ptlrpc_request *req, void *data,
                                 int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        cfs_waitq_signal(&cli->cl_destroy_waitq);
        return 0;
}
609
/* Try to reserve a slot for one more OST_DESTROY RPC.
 *
 * Optimistically increments cl_destroy_in_flight; if that pushed the count
 * past cl_max_rpcs_in_flight, the reservation is rolled back.  Because a
 * concurrent completion may have decremented the counter between the two
 * atomic operations, the rollback path re-checks the value and signals the
 * waitqueue if a slot did become free in that window (otherwise that wakeup
 * would be lost).
 *
 * Returns 1 if the destroy may be sent now, 0 if the caller must wait on
 * cl_destroy_waitq. */
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                cfs_waitq_signal(&cli->cl_destroy_waitq);
        }
        return 0;
}
627
/* Destroy requests can be async always on the client, and we don't even really
 * care about the return code since the client cannot do anything at all about
 * a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST reconnects to the MDS next,
 * it will retrieve the llog unlink logs and then sends the log cancellation
 * cookies to the MDS after committing destroy transactions. */
static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                       struct lov_stripe_md *ea, struct obd_trans_info *oti,
                       struct obd_export *md_export)
{
        CFS_LIST_HEAD(cancels);
        struct ptlrpc_request *req;
        struct ost_body *body;
        __u32 size[3] = { sizeof(struct ptlrpc_body), sizeof(*body),
                        sizeof(struct ldlm_request) };
        int count, bufcount = 2;
        struct client_obd *cli = &exp->exp_obd->u.cli;
        ENTRY;

        if (!oa) {
                CERROR("oa NULL\n");
                RETURN(-EINVAL);
        }

        LASSERT(oa->o_id != 0);

        /* Cancel our own PW locks on the dying object, discarding any
         * cached data under them. */
        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);
        /* With early lock cancel support, piggyback the cancels on the
         * destroy request itself (third request buffer). */
        if (exp_connect_cancelset(exp))
                bufcount = 3;
        req = ldlm_prep_elc_req(exp, LUSTRE_OST_VERSION, OST_DESTROY, bufcount,
                                size, REQ_REC_OFF + 1, 0, &cancels, count);
        if (!req)
                RETURN(-ENOMEM);

        req->rq_request_portal = OST_IO_PORTAL;         /* bug 7198 */
        req->rq_interpret_reply = osc_destroy_interpret;
        ptlrpc_at_set_req_timeout(req);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));

        /* Carry the MDS llog cookie so the record can be cancelled once
         * the destroy commits on the OST. */
        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE) {
                oa->o_lcookie = *oti->oti_logcookies;
        }

        memcpy(&body->oa, oa, sizeof(*oa));
        ptlrpc_req_set_repsize(req, 2, size);

        /* Throttle: at most cl_max_rpcs_in_flight destroys in flight. */
        if (!osc_can_send_destroy(cli)) {
                struct l_wait_info lwi = { 0 };

                /*
                 * Wait until the number of on-going destroy RPCs drops
                 * under max_rpc_in_flight
                 */
                l_wait_event_exclusive(cli->cl_destroy_waitq,
                                       osc_can_send_destroy(cli), &lwi);
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req);
        RETURN(0);
}
695
/* Report this client's cache state (dirty bytes and grant) to the server
 * in @oa, so the OST can adjust our grant.  Fields filled in:
 *   o_dirty   - bytes currently dirty in our cache
 *   o_undirty - additional dirty data we could still cache; forced to 0 on
 *               any accounting anomaly so the server stays conservative
 *   o_grant   - grant we currently hold
 *   o_dropped - grant we lost since the last report (counter is reset) */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        /* Caller must not have set these fields already. */
        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (cli->cl_dirty > cli->cl_dirty_max) {
                /* Per-OSC dirty limit exceeded - accounting anomaly. */
                CERROR("dirty %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
                /* System-wide dirty page limit exceeded. */
                CERROR("dirty %d > system dirty_max %d\n",
                       atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                /* Headroom exceeds 0x7fffffff - implausible, treat as an
                 * accounting anomaly rather than reporting it. */
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
                                (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}
730
/* Charge one page of dirty data against this client's grant and mark the
 * brw_page as grant-backed.  Caller must hold loi_list_lock. */
static void osc_consume_write_grant(struct client_obd *cli,struct brw_page *pga)
{
        atomic_inc(&obd_dirty_pages);
        cli->cl_dirty += CFS_PAGE_SIZE;
        cli->cl_avail_grant -= CFS_PAGE_SIZE;
        /* Remember this page consumed grant so osc_release_write_grant can
         * undo the accounting later. */
        pga->flag |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
               CFS_PAGE_SIZE, pga, pga->pg);
        LASSERTF(cli->cl_avail_grant >= 0, "invalid avail grant is %ld \n",
                 cli->cl_avail_grant);
}
743
/* the companion to osc_consume_write_grant, called when a brw has completed.
 * must be called with the loi lock held.
 *
 * @sent is non-zero when the page actually went out in a write RPC; in that
 * case the server accounts the grant itself, except for the part of a short
 * write that rounds up to a server block.  Grant that the server will not
 * see again is accumulated in cl_lost_grant and returned via o_dropped in
 * osc_announce_cached(). */
static void osc_release_write_grant(struct client_obd *cli,
                                    struct brw_page *pga, int sent)
{
        /* Server-side block size; fall back to 4k if statfs data absent. */
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
        ENTRY;

        /* Nothing to do for pages that never consumed grant. */
        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
                EXIT;
                return;
        }

        pga->flag &= ~OBD_BRW_FROM_GRANT;
        atomic_dec(&obd_dirty_pages);
        cli->cl_dirty -= CFS_PAGE_SIZE;
        if (!sent) {
                /* Page was dropped without being written: the whole page's
                 * grant is lost to us until the server re-issues it. */
                cli->cl_lost_grant += CFS_PAGE_SIZE;
                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
        } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole block on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
                int offset = pga->off & ~CFS_PAGE_MASK;
                int count = pga->count + (offset & (blocksize - 1));
                int end = (offset + pga->count) & (blocksize - 1);
                if (end)
                        count += blocksize - end;

                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
                       cli->cl_avail_grant, cli->cl_dirty);
        }

        EXIT;
}
782
/* Total BRW RPCs (reads plus writes) this client currently has in flight. */
static unsigned long rpcs_in_flight(struct client_obd *cli)
{
        return cli->cl_r_in_flight + cli->cl_w_in_flight;
}
787
/* caller must hold loi_list_lock
 *
 * Walk cl_cache_waiters and wake waiters in order, either granting each one
 * a page of write grant or, when no grant can arrive any more, telling it
 * (ocw_rc = -EDQUOT) to fall back to synchronous I/O.  Stops early while
 * dirty limits are hit or while in-flight writes may still return grant. */
void osc_wake_cache_waiters(struct client_obd *cli)
{
        struct list_head *l, *tmp;
        struct osc_cache_waiter *ocw;

        ENTRY;
        list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                /* if we can't dirty more, we must wait until some is written */
                if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
                   ((atomic_read(&obd_dirty_pages)+1)>(obd_max_dirty_pages))) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);
                        return;
                }

                /* if still dirty cache but no grant wait for pending RPCs that
                 * may yet return us some grant before doing sync writes */
                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
                               cli->cl_w_in_flight);
                        return;
                }

                /* This waiter either gets grant now or will never get any. */
                ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
                list_del_init(&ocw->ocw_entry);
                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        /* no more RPCs in flight to return grant, do sync IO */
                        ocw->ocw_rc = -EDQUOT;
                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
                } else {
                        osc_consume_write_grant(cli,
                                                &ocw->ocw_oap->oap_brw_page);
                }

                cfs_waitq_signal(&ocw->ocw_waitq);
        }

        EXIT;
}
829
/* Record the initial write grant handed to us by the server in the connect
 * reply (ocd_grant). */
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
               cli->cl_avail_grant, cli->cl_lost_grant);
        LASSERT(cli->cl_avail_grant >= 0);
}
840
/* Absorb extra write grant carried in a BRW reply (oa.o_grant) into our
 * available-grant pool. */
static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
        if (body->oa.o_valid & OBD_MD_FLGRANT)
                cli->cl_avail_grant += body->oa.o_grant;
        /* waiters are woken in brw_interpret */
        client_obd_list_unlock(&cli->cl_loi_list_lock);
}
850
851 /* We assume that the reason this OSC got a short read is because it read
852  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
853  * via the LOV, and it _knows_ it's reading inside the file, it's just that
854  * this stripe never got written at or beyond this stripe offset yet. */
855 static void handle_short_read(int nob_read, obd_count page_count,
856                               struct brw_page **pga)
857 {
858         char *ptr;
859         int i = 0;
860
861         /* skip bytes read OK */
862         while (nob_read > 0) {
863                 LASSERT (page_count > 0);
864
865                 if (pga[i]->count > nob_read) {
866                         /* EOF inside this page */
867                         ptr = cfs_kmap(pga[i]->pg) +
868                                 (pga[i]->off & ~CFS_PAGE_MASK);
869                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
870                         cfs_kunmap(pga[i]->pg);
871                         page_count--;
872                         i++;
873                         break;
874                 }
875
876                 nob_read -= pga[i]->count;
877                 page_count--;
878                 i++;
879         }
880
881         /* zero remaining pages */
882         while (page_count-- > 0) {
883                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
884                 memset(ptr, 0, pga[i]->count);
885                 cfs_kunmap(pga[i]->pg);
886                 i++;
887         }
888 }
889
890 static int check_write_rcs(struct ptlrpc_request *req,
891                            int requested_nob, int niocount,
892                            obd_count page_count, struct brw_page **pga)
893 {
894         int    *remote_rcs, i;
895
896         /* return error if any niobuf was in error */
897         remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
898                                         sizeof(*remote_rcs) * niocount, NULL);
899         if (remote_rcs == NULL) {
900                 CERROR("Missing/short RC vector on BRW_WRITE reply\n");
901                 return(-EPROTO);
902         }
903         if (lustre_rep_need_swab(req))
904                 for (i = 0; i < niocount; i++)
905                         __swab32s(&remote_rcs[i]);
906
907         for (i = 0; i < niocount; i++) {
908                 if (remote_rcs[i] < 0)
909                         return(remote_rcs[i]);
910
911                 if (remote_rcs[i] != 0) {
912                         CERROR("rc[%d] invalid (%d) req %p\n",
913                                 i, remote_rcs[i], req);
914                         return(-EPROTO);
915                 }
916         }
917
918         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
919                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
920                        req->rq_bulk->bd_nob_transferred, requested_nob);
921                 return(-EPROTO);
922         }
923
924         return (0);
925 }
926
927 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
928 {
929         if (p1->flag != p2->flag) {
930                 unsigned mask = ~OBD_BRW_FROM_GRANT;
931
932                 /* warn if we try to combine flags that we don't know to be
933                  * safe to combine */
934                 if ((p1->flag & mask) != (p2->flag & mask))
935                         CERROR("is it ok to have flags 0x%x and 0x%x in the "
936                                "same brw?\n", p1->flag, p2->flag);
937                 return 0;
938         }
939
940         return (p1->off + p1->count == p2->off);
941 }
942
/* Checksum 'nob' bytes spread across the leading pages of pga[] with the
 * algorithm selected by 'cksum_type'.  Under the OBD_FAIL_OSC_CHECKSUM_*
 * fail locks this deliberately produces a bad checksum - corrupting read
 * data before summing, or bumping the write checksum - to exercise the
 * checksum-retry paths. */
static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga, int opc,
                                   cksum_type_t cksum_type)
{
        __u32 cksum;
        int i = 0;

        LASSERT (pg_count > 0);
        cksum = init_checksum(cksum_type);
        while (nob > 0 && pg_count > 0) {
                unsigned char *ptr = cfs_kmap(pga[i]->pg);
                int off = pga[i]->off & ~CFS_PAGE_MASK;
                /* clamp the final (possibly partial) page to 'nob' bytes */
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
                        memcpy(ptr + off, "bad1", min(4, nob));
                cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
                cfs_kunmap(pga[i]->pg);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
                               off, cksum);

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}
978
/* Build (but do not send) an OST_READ/OST_WRITE BRW request covering
 * page_count pages from pga[].  Pages must be sorted by ascending offset;
 * adjacent mergeable pages share one remote niobuf.  On success *reqp is
 * set to the prepared request (async args initialized) and 0 is returned;
 * on failure a negative errno is returned and the request is freed. */
static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        __u32 size[4] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        int niocount, i, requested_nob, opc, rc;
        struct ptlrpc_request_pool *pool;
        struct osc_brw_async_args *aa;
        struct brw_page *pg_prev;

        ENTRY;
        OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM); /* Recoverable */
        OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ2, -EINVAL); /* Fatal */

        /* writes draw from the import's pre-allocated request pool;
         * reads allocate normally (NULL pool) */
        opc = ((cmd & OBD_BRW_WRITE) != 0) ? OST_WRITE : OST_READ;
        pool = ((cmd & OBD_BRW_WRITE) != 0) ? cli->cl_import->imp_rq_pool :NULL;

        /* count remote niobufs: one per run of mergeable adjacent pages */
        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        size[REQ_REC_OFF + 1] = sizeof(*ioobj);
        size[REQ_REC_OFF + 2] = niocount * sizeof(*niobuf);

        req = ptlrpc_prep_req_pool(cli->cl_import, LUSTRE_OST_VERSION, opc, 4, size,
                                   NULL, pool);
        if (req == NULL)
                RETURN (-ENOMEM);

        req->rq_request_portal = OST_IO_PORTAL;         /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (opc == OST_WRITE)
                desc = ptlrpc_prep_bulk_imp (req, page_count,
                                             BULK_GET_SOURCE, OST_BULK_PORTAL);
        else
                desc = ptlrpc_prep_bulk_imp (req, page_count,
                                             BULK_PUT_SINK, OST_BULK_PORTAL);
        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        ioobj = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioobj));
        niobuf = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                                niocount * sizeof(*niobuf));

        memcpy(&body->oa, oa, sizeof(*oa));

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;

        /* attach each page to the bulk descriptor and fill the remote
         * niobufs, folding contiguous pages into the previous niobuf */
        LASSERT (page_count > 0);
        pg_prev = pga[0];
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];

                LASSERT(pg->count > 0);
                LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
                         "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
                         pg->off, pg->count);
#ifdef __linux__
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
#else
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u\n", i, page_count);
#endif
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
                                      pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len    = pg->count;
                        niobuf->flags  = pg->flag;
                }
                pg_prev = pg;
        }

        /* the merge loop must have advanced niobuf by exactly niocount */
        LASSERTF((void *)(niobuf - niocount) ==
                lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                               niocount * sizeof(*niobuf)),
                "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg,
                REQ_REC_OFF + 2, niocount * sizeof(*niobuf)),
                (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (cli->cl_checksum) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                oa->o_flags = body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                size[REPLY_REC_OFF + 1] = sizeof(__u32) * niocount;
                ptlrpc_req_set_repsize(req, 3, size);
        } else {
                if (cli->cl_checksum) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
                /* 1 RC for the whole I/O */
                ptlrpc_req_set_repsize(req, 2, size);
        }

        /* stash everything the completion path needs in the async args */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);

        *reqp = req;
        RETURN (0);

 out:
        ptlrpc_req_finished (req);
        RETURN (rc);
}
1144
1145 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1146                                 __u32 client_cksum, __u32 server_cksum, int nob,
1147                                 obd_count page_count, struct brw_page **pga,
1148                                 cksum_type_t client_cksum_type)
1149 {
1150         __u32 new_cksum;
1151         char *msg;
1152         cksum_type_t cksum_type;
1153
1154         if (server_cksum == client_cksum) {
1155                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1156                 return 0;
1157         }
1158
1159         if (oa->o_valid & OBD_MD_FLFLAGS)
1160                 cksum_type = cksum_type_unpack(oa->o_flags);
1161         else
1162                 cksum_type = OBD_CKSUM_CRC32;
1163
1164         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1165                                       cksum_type);
1166
1167         if (cksum_type != client_cksum_type)
1168                 msg = "the server did not use the checksum type specified in "
1169                       "the original request - likely a protocol problem";
1170         else if (new_cksum == server_cksum)
1171                 msg = "changed on the client after we checksummed it - "
1172                       "likely false positive due to mmap IO (bug 11742)";
1173         else if (new_cksum == client_cksum)
1174                 msg = "changed in transit before arrival at OST";
1175         else
1176                 msg = "changed in transit AND doesn't match the original - "
1177                       "likely false positive due to mmap IO (bug 11742)";
1178
1179         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1180                            LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1181                            "["LPU64"-"LPU64"]\n",
1182                            msg, libcfs_nid2str(peer->nid),
1183                            oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1184                            oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
1185                                                         (__u64)0,
1186                            oa->o_id,
1187                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1188                            pga[0]->off,
1189                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1190         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1191                "client csum now %x\n", client_cksum, client_cksum_type,
1192                server_cksum, cksum_type, new_cksum);
1193
1194         return 1;
1195 }
1196
/* Complete a BRW request.  Note rc enters this function as the number of
 * bytes transferred (or a negative errno).  Unpacks the reply, updates
 * quota and grant state, validates per-niobuf rcs for writes and verifies
 * bulk checksums in both directions.  Returns a negative errno on error;
 * -EAGAIN specifically signals a checksum mismatch worth resending. */
static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
{
        struct osc_brw_async_args *aa = ptlrpc_req_async_args(req);
        const lnet_process_id_t *peer =
                        &req->rq_import->imp_connection->c_peer;
        struct client_obd *cli = aa->aa_cli;
        struct ost_body *body;
        __u32 client_cksum = 0;
        ENTRY;

        /* -EDQUOT still carries a reply we must process for quota flags */
        if (rc < 0 && rc != -EDQUOT)
                RETURN(rc);

        LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR ("Can't unpack body\n");
                RETURN(-EPROTO);
        }

        /* set/clear over quota flag for a uid/gid */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
            body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
                lquota_setdq(quota_interface, cli, body->oa.o_uid,
                             body->oa.o_gid, body->oa.o_valid,
                             body->oa.o_flags);

        if (rc < 0)
                RETURN(rc);

        if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
                client_cksum = aa->aa_oa->o_cksum; /* save for later */

        osc_update_grant(cli, body);

        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
                if (rc > 0) {
                        CERROR ("Unexpected +ve rc %d\n", rc);
                        RETURN(-EPROTO);
                }
                LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);

                /* a confirmed write-checksum mismatch asks for a resend */
                if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
                    check_write_checksum(&body->oa, peer, client_cksum,
                                         body->oa.o_cksum, aa->aa_requested_nob,
                                         aa->aa_page_count, aa->aa_ppga,
                                         cksum_type_unpack(aa->aa_oa->o_flags)))
                        RETURN(-EAGAIN);

                rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
                                     aa->aa_page_count, aa->aa_ppga);
                GOTO(out, rc);
        }

        /* The rest of this function executes only for OST_READs */
        if (rc > aa->aa_requested_nob) {
                CERROR("Unexpected rc %d (%d requested)\n", rc,
                       aa->aa_requested_nob);
                RETURN(-EPROTO);
        }

        if (rc != req->rq_bulk->bd_nob_transferred) {
                CERROR ("Unexpected rc %d (%d transferred)\n",
                        rc, req->rq_bulk->bd_nob_transferred);
                return (-EPROTO);
        }

        /* short read: zero out everything past the bytes that arrived */
        if (rc < aa->aa_requested_nob)
                handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);

        if (body->oa.o_valid & OBD_MD_FLCKSUM) {
                static int cksum_counter;
                __u32      server_cksum = body->oa.o_cksum;
                char      *via;
                char      *router;
                cksum_type_t cksum_type;

                if (body->oa.o_valid & OBD_MD_FLFLAGS)
                        cksum_type = cksum_type_unpack(body->oa.o_flags);
                else
                        cksum_type = OBD_CKSUM_CRC32;
                client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
                                                 aa->aa_ppga, OST_READ,
                                                 cksum_type);

                /* name any router between us and the sender for the log */
                if (peer->nid == req->rq_bulk->bd_sender) {
                        via = router = "";
                } else {
                        via = " via ";
                        router = libcfs_nid2str(req->rq_bulk->bd_sender);
                }

                if (server_cksum == ~0 && rc > 0) {
                        CERROR("Protocol error: server %s set the 'checksum' "
                               "bit, but didn't send a checksum.  Not fatal, "
                               "but please notify on http://bugzilla.lustre.org/\n",
                               libcfs_nid2str(peer->nid));
                } else if (server_cksum != client_cksum) {
                        LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
                                           "%s%s%s inum "LPU64"/"LPU64" object "
                                           LPU64"/"LPU64" extent "
                                           "["LPU64"-"LPU64"]\n",
                                           req->rq_import->imp_obd->obd_name,
                                           libcfs_nid2str(peer->nid),
                                           via, router,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                                body->oa.o_fid : (__u64)0,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                                body->oa.o_generation :(__u64)0,
                                           body->oa.o_id,
                                           body->oa.o_valid & OBD_MD_FLGROUP ?
                                                body->oa.o_gr : (__u64)0,
                                           aa->aa_ppga[0]->off,
                                           aa->aa_ppga[aa->aa_page_count-1]->off +
                                           aa->aa_ppga[aa->aa_page_count-1]->count -
                                                                        1);
                        CERROR("client %x, server %x, cksum_type %x\n",
                               client_cksum, server_cksum, cksum_type);
                        cksum_counter = 0;
                        aa->aa_oa->o_cksum = client_cksum;
                        rc = -EAGAIN;
                } else {
                        cksum_counter++;
                        CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                        rc = 0;
                }
        } else if (unlikely(client_cksum)) {
                static int cksum_missed;

                cksum_missed++;
                /* log only when cksum_missed is a power of two (backoff) */
                if ((cksum_missed & (-cksum_missed)) == cksum_missed)
                        CERROR("Checksum %u requested from %s but not sent\n",
                               cksum_missed, libcfs_nid2str(peer->nid));
        } else {
                rc = 0;
        }
out:
        if (rc >= 0)
                memcpy(aa->aa_oa, &body->oa, sizeof(*aa->aa_oa));

        RETURN(rc);
}
1341
/* Synchronous BRW: build the request, queue it and wait for completion,
 * restarting on bulk timeouts and on recoverable errors (sleeping
 * 'resends' seconds between attempts) until osc_should_resend() says to
 * give up, in which case -EIO is returned. */
static int osc_brw_internal(int cmd, struct obd_export *exp,struct obdo *oa,
                            struct lov_stripe_md *lsm,
                            obd_count page_count, struct brw_page **pga)
{
        struct ptlrpc_request *request;
        int                    rc;
        cfs_waitq_t            waitq;
        int                    resends = 0;
        struct l_wait_info     lwi;

        ENTRY;
        init_waitqueue_head(&waitq);

restart_bulk:
        rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
                                  page_count, pga, &request);
        if (rc != 0)
                return (rc);

        rc = ptlrpc_queue_wait(request);

        /* a bulk timeout with resend allowed restarts from scratch */
        if (rc == -ETIMEDOUT && request->rq_resend) {
                DEBUG_REQ(D_HA, request,  "BULK TIMEOUT");
                ptlrpc_req_finished(request);
                goto restart_bulk;
        }

        rc = osc_brw_fini_request(request, rc);

        ptlrpc_req_finished(request);
        if (osc_recoverable_error(rc)) {
                resends++;
                if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
                        CERROR("too many resend retries, returning error\n");
                        RETURN(-EIO);
                }

                /* back off: sleep 'resends' seconds before the retry */
                lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
                l_wait_event(waitq, 0, &lwi);

                goto restart_bulk;
        }
        RETURN(rc);
}
1386
/* Re-issue a BRW request that failed with a recoverable error: prepare a
 * fresh request from the saved async args, move ownership of the pga and
 * oap lists from the old request to the new one, and queue it on the
 * original request set.  Returns 0 on success, -EIO when the resend limit
 * is exceeded, or -EINTR if any page in the request was interrupted. */
int osc_brw_redo_request(struct ptlrpc_request *request,
                         struct osc_brw_async_args *aa)
{
        struct ptlrpc_request *new_req;
        struct ptlrpc_request_set *set = request->rq_set;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
                CERROR("too many resend retries, returning error\n");
                RETURN(-EIO);
        }

        DEBUG_REQ(D_ERROR, request, "redo for recoverable error");

        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                        OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa,
                                  NULL /* lsm unused by osc currently */,
                                  aa->aa_page_count, aa->aa_ppga, &new_req);
        if (rc)
                RETURN(rc);

        client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);

        /* abort the redo if any page in the old request was interrupted */
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(request == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 request, oap->oap_request);
                        if (oap->oap_interrupted) {
                                client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
                                ptlrpc_req_finished(new_req);
                                RETURN(-EINTR);
                        }
                }
        }
        /* New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it... */
        aa->aa_resends++;
        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
        /* delay the resend: back off by aa_resends seconds */
        new_req->rq_sent = CURRENT_SECONDS + aa->aa_resends;

        new_aa = ptlrpc_req_async_args(new_req);

        CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
        list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);

        /* repoint each oap's request reference at the new request */
        list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);
                }
        }

        /* use ptlrpc_set_add_req is safe because interpret functions work
         * in check_set context. only one way exist with access to request
         * from different thread got -EINTR - this way protected with
         * cl_loi_list_lock */
        ptlrpc_set_add_req(set, new_req);

        client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);

        DEBUG_REQ(D_INFO, new_req, "new request");
        RETURN(0);
}
1457
1458 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1459                           struct lov_stripe_md *lsm, obd_count page_count,
1460                           struct brw_page **pga, struct ptlrpc_request_set *set)
1461 {
1462         struct ptlrpc_request     *request;
1463         struct client_obd         *cli = &exp->exp_obd->u.cli;
1464         int                        rc, i;
1465         struct osc_brw_async_args *aa;
1466         ENTRY;
1467
1468         /* Consume write credits even if doing a sync write -
1469          * otherwise we may run out of space on OST due to grant. */
1470         if (cmd == OBD_BRW_WRITE) {
1471                 client_obd_list_lock(&cli->cl_loi_list_lock);
1472                 for (i = 0; i < page_count; i++) {
1473                         if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
1474                                 osc_consume_write_grant(cli, pga[i]);
1475                 }
1476                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1477         }
1478
1479         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1480                                   page_count, pga, &request);
1481
1482         CLASSERT(sizeof(*aa) <= sizeof(request->rq_async_args));
1483         aa = ptlrpc_req_async_args(request);
1484         if (cmd == OBD_BRW_READ) {
1485                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1486                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1487         } else {
1488                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1489                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
1490                                  cli->cl_w_in_flight);
1491         }
1492         ptlrpc_lprocfs_brw(request, aa->aa_requested_nob);
1493
1494         LASSERT(list_empty(&aa->aa_oaps));
1495
1496         if (rc == 0) {
1497                 request->rq_interpret_reply = brw_interpret;
1498                 ptlrpc_set_add_req(set, request);
1499                 client_obd_list_lock(&cli->cl_loi_list_lock);
1500                 if (cmd == OBD_BRW_READ)
1501                         cli->cl_r_in_flight++;
1502                 else
1503                         cli->cl_w_in_flight++;
1504                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1505                 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DIO_PAUSE, 3);
1506         } else if (cmd == OBD_BRW_WRITE) {
1507                 client_obd_list_lock(&cli->cl_loi_list_lock);
1508                 for (i = 0; i < page_count; i++)
1509                         osc_release_write_grant(cli, pga[i], 0);
1510                 osc_wake_cache_waiters(cli);
1511                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1512         }
1513
1514         RETURN (rc);
1515 }
1516
1517 /*
1518  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1519  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1520  * fine for our small page arrays and doesn't require allocation.  its an
1521  * insertion sort that swaps elements that are strides apart, shrinking the
1522  * stride down until its '1' and the array is sorted.
1523  */
1524 static void sort_brw_pages(struct brw_page **array, int num)
1525 {
1526         int stride, i, j;
1527         struct brw_page *tmp;
1528
1529         if (num == 1)
1530                 return;
1531         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1532                 ;
1533
1534         do {
1535                 stride /= 3;
1536                 for (i = stride ; i < num ; i++) {
1537                         tmp = array[i];
1538                         j = i;
1539                         while (j >= stride && array[j-stride]->off > tmp->off) {
1540                                 array[j] = array[j - stride];
1541                                 j -= stride;
1542                         }
1543                         array[j] = tmp;
1544                 }
1545         } while (stride > 1);
1546 }
1547
1548 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1549 {
1550         int count = 1;
1551         int offset;
1552         int i = 0;
1553
1554         LASSERT (pages > 0);
1555         offset = pg[i]->off & (~CFS_PAGE_MASK);
1556
1557         for (;;) {
1558                 pages--;
1559                 if (pages == 0)         /* that's all */
1560                         return count;
1561
1562                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1563                         return count;   /* doesn't end on page boundary */
1564
1565                 i++;
1566                 offset = pg[i]->off & (~CFS_PAGE_MASK);
1567                 if (offset != 0)        /* doesn't start on page boundary */
1568                         return count;
1569
1570                 count++;
1571         }
1572 }
1573
1574 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1575 {
1576         struct brw_page **ppga;
1577         int i;
1578
1579         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1580         if (ppga == NULL)
1581                 return NULL;
1582
1583         for (i = 0; i < count; i++)
1584                 ppga[i] = pga + i;
1585         return ppga;
1586 }
1587
/* Free a pointer array built by osc_build_ppga().  Frees only the array
 * itself, never the brw_page descriptors it points at. */
static void osc_release_ppga(struct brw_page **ppga, obd_count count)
{
        LASSERT(ppga != NULL);
        OBD_FREE(ppga, sizeof(*ppga) * count);
}
1593
1594 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1595                    obd_count page_count, struct brw_page *pga,
1596                    struct obd_trans_info *oti)
1597 {
1598         struct obdo *saved_oa = NULL;
1599         struct brw_page **ppga, **orig;
1600         struct obd_import *imp = class_exp2cliimp(exp);
1601         struct client_obd *cli = &imp->imp_obd->u.cli;
1602         int rc, page_count_orig;
1603         ENTRY;
1604
1605         if (cmd & OBD_BRW_CHECK) {
1606                 /* The caller just wants to know if there's a chance that this
1607                  * I/O can succeed */
1608
1609                 if (imp == NULL || imp->imp_invalid)
1610                         RETURN(-EIO);
1611                 RETURN(0);
1612         }
1613
1614         /* test_brw with a failed create can trip this, maybe others. */
1615         LASSERT(cli->cl_max_pages_per_rpc);
1616
1617         rc = 0;
1618
1619         orig = ppga = osc_build_ppga(pga, page_count);
1620         if (ppga == NULL)
1621                 RETURN(-ENOMEM);
1622         page_count_orig = page_count;
1623
1624         sort_brw_pages(ppga, page_count);
1625         while (page_count) {
1626                 obd_count pages_per_brw;
1627
1628                 if (page_count > cli->cl_max_pages_per_rpc)
1629                         pages_per_brw = cli->cl_max_pages_per_rpc;
1630                 else
1631                         pages_per_brw = page_count;
1632
1633                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1634
1635                 if (saved_oa != NULL) {
1636                         /* restore previously saved oa */
1637                         *oinfo->oi_oa = *saved_oa;
1638                 } else if (page_count > pages_per_brw) {
1639                         /* save a copy of oa (brw will clobber it) */
1640                         OBDO_ALLOC(saved_oa);
1641                         if (saved_oa == NULL)
1642                                 GOTO(out, rc = -ENOMEM);
1643                         *saved_oa = *oinfo->oi_oa;
1644                 }
1645
1646                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1647                                       pages_per_brw, ppga);
1648
1649                 if (rc != 0)
1650                         break;
1651
1652                 page_count -= pages_per_brw;
1653                 ppga += pages_per_brw;
1654         }
1655
1656 out:
1657         osc_release_ppga(orig, page_count_orig);
1658
1659         if (saved_oa != NULL)
1660                 OBDO_FREE(saved_oa);
1661
1662         RETURN(rc);
1663 }
1664
/* Asynchronous bulk read/write: queue one RPC per unfragmented chunk of
 * @pga on @set via async_internal().
 *
 * Ownership of the page-pointer array is subtle: if the whole request fits
 * in a single RPC the original array (orig) is handed to async_internal()
 * and released by brw_interpret(); otherwise each RPC gets its own copy and
 * orig is freed here.  Returns 0 on success or a negative errno. */
static int osc_brw_async(int cmd, struct obd_export *exp,
                         struct obd_info *oinfo, obd_count page_count,
                         struct brw_page *pga, struct obd_trans_info *oti,
                         struct ptlrpc_request_set *set)
{
        struct brw_page **ppga, **orig;
        int page_count_orig;
        int rc = 0;
        ENTRY;

        if (cmd & OBD_BRW_CHECK) {
                /* The caller just wants to know if there's a chance that this
                 * I/O can succeed */
                struct obd_import *imp = class_exp2cliimp(exp);

                if (imp == NULL || imp->imp_invalid)
                        RETURN(-EIO);
                RETURN(0);
        }

        orig = ppga = osc_build_ppga(pga, page_count);
        if (ppga == NULL)
                RETURN(-ENOMEM);
        page_count_orig = page_count;

        /* offset order helps the OST allocate disk blocks sequentially */
        sort_brw_pages(ppga, page_count);
        while (page_count) {
                struct brw_page **copy;
                obd_count pages_per_brw;

                pages_per_brw = min_t(obd_count, page_count,
                    class_exp2cliimp(exp)->imp_obd->u.cli.cl_max_pages_per_rpc);

                /* shrink further so each RPC covers one unfragmented span */
                pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);

                /* use ppga only if single RPC is going to fly */
                if (pages_per_brw != page_count_orig || ppga != orig) {
                        OBD_ALLOC(copy, pages_per_brw * sizeof(*copy));
                        if (copy == NULL)
                                GOTO(out, rc = -ENOMEM);
                        memcpy(copy, ppga, pages_per_brw * sizeof(*copy));
                } else
                        copy = ppga;

                rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
                                    pages_per_brw, copy, set);

                if (rc != 0) {
                        /* our copy was never handed off; drop it here */
                        if (copy != ppga)
                                OBD_FREE(copy, pages_per_brw * sizeof(*copy));
                        break;
                }

                if (copy == orig) {
                        /* we passed it to async_internal() which is
                         * now responsible for releasing memory */
                        orig = NULL;
                }

                page_count -= pages_per_brw;
                ppga += pages_per_brw;
        }
out:
        if (orig)
                osc_release_ppga(orig, page_count_orig);
        RETURN(rc);
}
1732
1733 static void osc_check_rpcs(struct client_obd *cli);
1734
/* The companion to osc_enter_cache(), called when @oap is no longer part of
 * the dirty accounting.  Writeback completes or truncate happens before
 * writing starts.  Must be called with the loi lock held. */
static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
                           int sent)
{
        /* return the page's write grant to the client_obd accounting */
        osc_release_write_grant(cli, &oap->oap_brw_page, sent);
}
1743
/* This maintains the lists of pending pages to read/write for a given object
 * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
 * to quickly find objects that are ready to send an RPC.
 *
 * Returns 1 if the queued pages in the @cmd direction justify firing an RPC
 * now (dead import, urgent page, cache waiters, or enough pages for an
 * optimally-sized RPC), 0 otherwise. */
static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
                         int cmd)
{
        int optimal;
        ENTRY;

        if (lop->lop_num_pending == 0)
                RETURN(0);

        /* if we have an invalid import we want to drain the queued pages
         * by forcing them through rpcs that immediately fail and complete
         * the pages.  recovery relies on this to empty the queued pages
         * before canceling the locks and evicting down the llite pages */
        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(1);

        /* stream rpcs in queue order as long as there is an urgent page
         * queued.  this is our cheap solution for good batching in the case
         * where writepage marks some random page in the middle of the file
         * as urgent because of, say, memory pressure */
        if (!list_empty(&lop->lop_urgent)) {
                CDEBUG(D_CACHE, "urgent request forcing RPC\n");
                RETURN(1);
        }

        /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
        optimal = cli->cl_max_pages_per_rpc;
        if (cmd & OBD_BRW_WRITE) {
                /* trigger a write rpc stream as long as there are dirtiers
                 * waiting for space.  as they're waiting, they're not going to
                 * create more pages to coalesce with what's waiting.. */
                if (!list_empty(&cli->cl_cache_waiters)) {
                        CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
                        RETURN(1);
                }

                /* +16 to avoid triggering rpcs that would want to include pages
                 * that are being queued but which can't be made ready until
                 * the queuer finishes with the page. this is a wart for
                 * llite::commit_write() */
                optimal += 16;
        }
        if (lop->lop_num_pending >= optimal)
                RETURN(1);

        RETURN(0);
}
1794
1795 static int lop_makes_hprpc(struct loi_oap_pages *lop)
1796 {
1797         struct osc_async_page *oap;
1798         ENTRY;
1799
1800         if (list_empty(&lop->lop_urgent))
1801                 RETURN(0);
1802
1803         oap = list_entry(lop->lop_urgent.next,
1804                          struct osc_async_page, oap_urgent_item);
1805
1806         if (oap->oap_async_flags & ASYNC_HP) {
1807                 CDEBUG(D_CACHE, "hp request forcing RPC\n");
1808                 RETURN(1);
1809         }
1810
1811         RETURN(0);
1812 }
1813
/* Reconcile @item's membership of @list with the desired state
 * @should_be_on; a no-op when the state already matches. */
static void on_list(struct list_head *item, struct list_head *list,
                    int should_be_on)
{
        int currently_on = !list_empty(item);

        if (should_be_on && !currently_on)
                list_add_tail(item, list);
        else if (!should_be_on && currently_on)
                list_del_init(item);
}
1822
/* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
 * can find pages to build into rpcs quickly */
static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
{
        /* a high-priority page in either direction puts the loi on the HP
         * ready list and takes it off the normal ready list */
        if (lop_makes_hprpc(&loi->loi_write_lop) ||
            lop_makes_hprpc(&loi->loi_read_lop)) {
                /* HP rpc */
                on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
                on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
        } else {
                on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
                on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
                        lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
                        lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
        }

        /* the write/read lists simply track whether any pages are pending
         * in that direction */
        on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
                loi->loi_write_lop.lop_num_pending);

        on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
                loi->loi_read_lop.lop_num_pending);
}
1845
1846 static void lop_update_pending(struct client_obd *cli,
1847                                struct loi_oap_pages *lop, int cmd, int delta)
1848 {
1849         lop->lop_num_pending += delta;
1850         if (cmd & OBD_BRW_WRITE)
1851                 cli->cl_pending_w_pages += delta;
1852         else
1853                 cli->cl_pending_r_pages += delta;
1854 }
1855
/* this is called when a sync waiter receives an interruption.  Its job is to
 * get the caller woken as soon as possible.  If its page hasn't been put in an
 * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
 * desiring interruption which will forcefully complete the rpc once the rpc
 * has timed out */
static void osc_occ_interrupted(struct oig_callback_context *occ)
{
        struct osc_async_page *oap;
        struct loi_oap_pages *lop;
        struct lov_oinfo *loi;
        ENTRY;

        /* XXX member_of() */
        oap = list_entry(occ, struct osc_async_page, oap_occ);

        /* loi lock covers oap state, list membership and oap_request */
        client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);

        oap->oap_interrupted = 1;

        /* ok, it's been put in an rpc. only one oap gets a request reference */
        if (oap->oap_request != NULL) {
                ptlrpc_mark_interrupted(oap->oap_request);
                ptlrpcd_wake(oap->oap_request);
                GOTO(unlock, 0);
        }

        /* we don't get interruption callbacks until osc_trigger_group_io()
         * has been called and put the sync oaps in the pending/urgent lists.*/
        if (!list_empty(&oap->oap_pending_item)) {
                list_del_init(&oap->oap_pending_item);
                list_del_init(&oap->oap_urgent_item);

                loi = oap->oap_loi;
                lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
                        &loi->loi_write_lop : &loi->loi_read_lop;
                lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
                loi_list_maint(oap->oap_cli, oap->oap_loi);

                /* wake the group-io waiter with -EINTR for this page */
                oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
                oap->oap_oig = NULL;
        }

unlock:
        client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
}
1901
1902 /* this is trying to propogate async writeback errors back up to the
1903  * application.  As an async write fails we record the error code for later if
1904  * the app does an fsync.  As long as errors persist we force future rpcs to be
1905  * sync so that the app can get a sync error and break the cycle of queueing
1906  * pages for which writeback will fail. */
1907 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
1908                            int rc)
1909 {
1910         if (rc) {
1911                 if (!ar->ar_rc)
1912                         ar->ar_rc = rc;
1913
1914                 ar->ar_force_sync = 1;
1915                 ar->ar_min_xid = ptlrpc_sample_next_xid();
1916                 return;
1917
1918         }
1919
1920         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
1921                 ar->ar_force_sync = 0;
1922 }
1923
1924 static void osc_oap_to_pending(struct osc_async_page *oap)
1925 {
1926         struct loi_oap_pages *lop;
1927
1928         if (oap->oap_cmd & OBD_BRW_WRITE)
1929                 lop = &oap->oap_loi->loi_write_lop;
1930         else
1931                 lop = &oap->oap_loi->loi_read_lop;
1932
1933         if (oap->oap_async_flags & ASYNC_HP)
1934                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1935         else if (oap->oap_async_flags & ASYNC_URGENT)
1936                 list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
1937         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1938         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
1939 }
1940
/* this must be called holding the loi list lock to give coverage to exit_cache,
 * async_flag maintenance, and oap_request */
static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
                              struct osc_async_page *oap, int sent, int rc)
{
        __u64 xid = 0;

        ENTRY;
        /* drop the request reference this oap may hold; remember its xid
         * for the async-error bookkeeping below */
        if (oap->oap_request != NULL) {
                xid = ptlrpc_req_xid(oap->oap_request);
                ptlrpc_req_finished(oap->oap_request);
                oap->oap_request = NULL;
        }

        oap->oap_async_flags = 0;
        oap->oap_interrupted = 0;

        /* record write errors (or successes) at both the client and the
         * stripe level so fsync can report them */
        if (oap->oap_cmd & OBD_BRW_WRITE) {
                osc_process_ar(&cli->cl_ar, xid, rc);
                osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
        }

        /* copy server-returned attributes into the cached lvb */
        if (rc == 0 && oa != NULL) {
                if (oa->o_valid & OBD_MD_FLBLOCKS)
                        oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
                if (oa->o_valid & OBD_MD_FLMTIME)
                        oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
                if (oa->o_valid & OBD_MD_FLATIME)
                        oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
                if (oa->o_valid & OBD_MD_FLCTIME)
                        oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
        }

        /* group-io pages complete through the oig instead of the caller's
         * completion callback */
        if (oap->oap_oig) {
                osc_exit_cache(cli, oap, sent);
                oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
                oap->oap_oig = NULL;
                EXIT;
                return;
        }

        rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
                                                oap->oap_cmd, oa, rc);

        /* ll_ap_completion (from llite) drops PG_locked. so, a new
         * I/O on the page could start, but OSC calls it under lock
         * and thus we can add oap back to pending safely */
        if (rc)
                /* upper layer wants to leave the page on pending queue */
                osc_oap_to_pending(oap);
        else
                osc_exit_cache(cli, oap, sent);
        EXIT;
}
1995
/* RPC interpret callback for async brw requests: finish (or resend) the
 * request, update the in-flight counters, complete every attached oap (or
 * release grants for async_internal pages), and kick off more RPCs. */
static int brw_interpret(struct ptlrpc_request *request, void *data, int rc)
{
        struct osc_brw_async_args *aa = data;
        struct client_obd *cli;
        ENTRY;

        rc = osc_brw_fini_request(request, rc);
        CDEBUG(D_INODE, "request %p aa %p rc %d\n", request, aa, rc);

        /* recoverable errors are retried with a fresh request; on a
         * successful redo there is nothing more to do here */
        if (osc_recoverable_error(rc)) {
                rc = osc_brw_redo_request(request, aa);
                if (rc == 0)
                        RETURN(0);
        }

        cli = aa->aa_cli;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (lustre_msg_get_opc(request->rq_reqmsg) == OST_WRITE)
                cli->cl_w_in_flight--;
        else
                cli->cl_r_in_flight--;

        if (!list_empty(&aa->aa_oaps)) { /* from osc_send_oap_rpc() */
                struct osc_async_page *oap, *tmp;
                /* the caller may re-use the oap after the completion call so
                 * we need to clean it up a little */
                list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
                        list_del_init(&oap->oap_rpc_item);
                        osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
                }
                OBDO_FREE(aa->aa_oa);
        } else { /* from async_internal() */
                int i;
                for (i = 0; i < aa->aa_page_count; i++)
                        osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
        }
        osc_wake_cache_waiters(cli);
        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        /* the interpret callback owns the ppga array; see osc_brw_async() */
        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
        RETURN(rc);
}
2042
/* Build a brw ptlrpc request from the oaps on @rpc_list.
 *
 * Fills a brw_page pointer array from the oaps, lets the caller's ops fill
 * the obdo, sorts the pages by offset and prepares the request.  On success
 * the oaps are moved onto the request's async args (aa_oaps) and @rpc_list
 * is left empty; on failure the temporary oa/pga allocations are released
 * and an ERR_PTR is returned. */
static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
                                            struct list_head *rpc_list,
                                            int page_count, int cmd)
{
        struct ptlrpc_request *req;
        struct brw_page **pga = NULL;
        struct osc_brw_async_args *aa;
        struct obdo *oa = NULL;
        struct obd_async_page_ops *ops = NULL;
        void *caller_data = NULL;
        struct osc_async_page *oap;
        struct ldlm_lock *lock = NULL;
        obd_valid valid;
        int i, rc;

        ENTRY;
        LASSERT(!list_empty(rpc_list));

        OBD_ALLOC(pga, sizeof(*pga) * page_count);
        if (pga == NULL)
                RETURN(ERR_PTR(-ENOMEM));

        OBDO_ALLOC(oa);
        if (oa == NULL)
                GOTO(out, req = ERR_PTR(-ENOMEM));

        /* collect the brw_pages; caller ops/data/lock are taken from the
         * first oap (all oaps in one rpc share them) */
        i = 0;
        list_for_each_entry(oap, rpc_list, oap_rpc_item) {
                if (ops == NULL) {
                        ops = oap->oap_caller_ops;
                        caller_data = oap->oap_caller_data;
                        lock = oap->oap_ldlm_lock;
                }
                pga[i] = &oap->oap_brw_page;
                pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
                CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
                       pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
                i++;
        }

        /* always get the data for the obdo for the rpc */
        LASSERT(ops != NULL);
        ops->ap_fill_obdo(caller_data, cmd, oa);
        if (lock) {
                oa->o_handle = lock->l_remote_handle;
                oa->o_valid |= OBD_MD_FLHANDLE;
        }

        sort_brw_pages(pga, page_count);
        rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count, pga, &req);
        if (rc != 0) {
                CERROR("prep_req failed: %d\n", rc);
                GOTO(out, req = ERR_PTR(rc));
        }
        /* from here on, work on the oa embedded in the request body so the
         * timestamp update below lands in the message that goes on the wire;
         * the temporary oa was copied in by prep_request */
        oa = &((struct ost_body *)lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF,
                                                 sizeof(struct ost_body)))->oa;

        /* Need to update the timestamps after the request is built in case
         * we race with setattr (locally or in queue at OST).  If OST gets
         * later setattr before earlier BRW (as determined by the request xid),
         * the OST will not use BRW timestamps.  Sadly, there is no obvious
         * way to do this in a single call.  bug 10150 */
        if (pga[0]->flag & OBD_BRW_SRVLOCK) {
                /* in case of lockless read/write do not use inode's
                 * timestamps because concurrent stat might fill the
                 * inode with out-of-date times, send current
                 * instead */
                if (cmd & OBD_BRW_WRITE) {
                        oa->o_mtime = oa->o_ctime = LTIME_S(CURRENT_TIME);
                        oa->o_valid |= OBD_MD_FLMTIME | OBD_MD_FLCTIME;
                        valid = OBD_MD_FLATIME;
                } else {
                        oa->o_atime = LTIME_S(CURRENT_TIME);
                        oa->o_valid |= OBD_MD_FLATIME;
                        valid = OBD_MD_FLMTIME | OBD_MD_FLCTIME;
                }
        } else {
                valid = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
        }
        ops->ap_update_obdo(caller_data, cmd, oa, valid);

        /* hang the oaps off the request for brw_interpret() to complete */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);
        list_splice(rpc_list, &aa->aa_oaps);
        CFS_INIT_LIST_HEAD(rpc_list);

out:
        /* on failure, release whatever was allocated; note that on the
         * error paths oa still points at the OBDO_ALLOC'd copy */
        if (IS_ERR(req)) {
                if (oa)
                        OBDO_FREE(oa);
                if (pga)
                        OBD_FREE(pga, sizeof(*pga) * page_count);
        }
        RETURN(req);
}
2139
2140 /* the loi lock is held across this function but it's allowed to release
2141  * and reacquire it during its work */
2142 /**
2143  * prepare pages for ASYNC io and put pages in send queue.
2144  *
2145  * \param cli -
2146  * \param loi -
2147  * \param cmd - OBD_BRW_* macroses
2148  * \param lop - pending pages
2149  *
2150  * \return zero if pages successfully add to send queue.
2151  * \return not zere if error occurring.
2152  */
2153 static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
2154                             int cmd, struct loi_oap_pages *lop)
2155 {
2156         struct ptlrpc_request *req;
2157         obd_count page_count = 0;
2158         struct osc_async_page *oap = NULL, *tmp;
2159         struct osc_brw_async_args *aa;
2160         struct obd_async_page_ops *ops;
2161         CFS_LIST_HEAD(rpc_list);
2162         unsigned int ending_offset;
2163         unsigned  starting_offset = 0;
2164         int srvlock = 0;
2165         ENTRY;
2166
2167         /* If there are HP OAPs we need to handle at least 1 of them,
2168          * move it the beginning of the pending list for that. */
2169         if (!list_empty(&lop->lop_urgent)) {
2170                 oap = list_entry(lop->lop_urgent.next,
2171                                  struct osc_async_page, oap_urgent_item);
2172                 if (oap->oap_async_flags & ASYNC_HP)
2173                         list_move(&oap->oap_pending_item, &lop->lop_pending);
2174         }
2175
2176         /* first we find the pages we're allowed to work with */
2177         list_for_each_entry_safe(oap, tmp, &lop->lop_pending, oap_pending_item){
2178                 ops = oap->oap_caller_ops;
2179
2180                 LASSERT(oap->oap_magic == OAP_MAGIC);
2181
2182                 if (page_count != 0 &&
2183                     srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2184                         CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2185                                " oap %p, page %p, srvlock %u\n",
2186                                oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2187                         break;
2188                 }
2189                 /* in llite being 'ready' equates to the page being locked
2190                  * until completion unlocks it.  commit_write submits a page
2191                  * as not ready because its unlock will happen unconditionally
2192                  * as the call returns.  if we race with commit_write giving
2193                  * us that page we dont' want to create a hole in the page
2194                  * stream, so we stop and leave the rpc to be fired by
2195                  * another dirtier or kupdated interval (the not ready page
2196                  * will still be on the dirty list).  we could call in
2197                  * at the end of ll_file_write to process the queue again. */
2198                 if (!(oap->oap_async_flags & ASYNC_READY)) {
2199                         int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
2200                         if (rc < 0)
2201                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
2202                                                 "instead of ready\n", oap,
2203                                                 oap->oap_page, rc);
2204                         switch (rc) {
2205                         case -EAGAIN:
2206                                 /* llite is telling us that the page is still
2207                                  * in commit_write and that we should try
2208                                  * and put it in an rpc again later.  we
2209                                  * break out of the loop so we don't create
2210                                  * a hole in the sequence of pages in the rpc
2211                                  * stream.*/
2212                                 oap = NULL;
2213                                 break;
2214                         case -EINTR:
2215                                 /* the io isn't needed.. tell the checks
2216                                  * below to complete the rpc with EINTR */
2217                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2218                                 oap->oap_count = -EINTR;
2219                                 break;
2220                         case 0:
2221                                 oap->oap_async_flags |= ASYNC_READY;
2222                                 break;
2223                         default:
2224                                 LASSERTF(0, "oap %p page %p returned %d "
2225                                             "from make_ready\n", oap,
2226                                             oap->oap_page, rc);
2227                                 break;
2228                         }
2229                 }
2230                 if (oap == NULL)
2231                         break;
2232                 /*
2233                  * Page submitted for IO has to be locked. Either by
2234                  * ->ap_make_ready() or by higher layers.
2235                  */
2236 #if defined(__KERNEL__) && defined(__linux__)
2237                  if(!(PageLocked(oap->oap_page) &&
2238                      (CheckWriteback(oap->oap_page, cmd) || oap->oap_oig !=NULL))) {
2239                         CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2240                                oap->oap_page, (long)oap->oap_page->flags, oap->oap_async_flags);
2241                         LBUG();
2242                 }
2243 #endif
2244                 /* If there is a gap at the start of this page, it can't merge
2245                  * with any previous page, so we'll hand the network a
2246                  * "fragmented" page array that it can't transfer in 1 RDMA */
2247                 if (page_count != 0 && oap->oap_page_off != 0)
2248                         break;
2249
2250                 /* take the page out of our book-keeping */
2251                 list_del_init(&oap->oap_pending_item);
2252                 lop_update_pending(cli, lop, cmd, -1);
2253                 list_del_init(&oap->oap_urgent_item);
2254
2255                 if (page_count == 0)
2256                         starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2257                                           (PTLRPC_MAX_BRW_SIZE - 1);
2258
2259                 /* ask the caller for the size of the io as the rpc leaves. */
2260                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
2261                         oap->oap_count =
2262                                 ops->ap_refresh_count(oap->oap_caller_data,cmd);
2263                 if (oap->oap_count <= 0) {
2264                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2265                                oap->oap_count);
2266                         osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
2267                         continue;
2268                 }
2269
2270                 /* now put the page back in our accounting */
2271                 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2272                 if (page_count == 0)
2273                         srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2274                 if (++page_count >= cli->cl_max_pages_per_rpc)
2275                         break;
2276
2277                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
2278                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2279                  * have the same alignment as the initial writes that allocated
2280                  * extents on the server. */
2281                 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2282                                  oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2283                 if (ending_offset == 0)
2284                         break;
2285
2286                 /* If there is a gap at the end of this page, it can't merge
2287                  * with any subsequent pages, so we'll hand the network a
2288                  * "fragmented" page array that it can't transfer in 1 RDMA */
2289                 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2290                         break;
2291         }
2292
2293         osc_wake_cache_waiters(cli);
2294
2295         if (page_count == 0)
2296                 RETURN(0);
2297
2298         loi_list_maint(cli, loi);
2299
2300         client_obd_list_unlock(&cli->cl_loi_list_lock);
2301
2302         req = osc_build_req(cli, &rpc_list, page_count, cmd);
2303         if (IS_ERR(req)) {
2304                 /* this should happen rarely and is pretty bad, it makes the
2305                  * pending list not follow the dirty order */
2306                 client_obd_list_lock(&cli->cl_loi_list_lock);
2307                 list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
2308                         list_del_init(&oap->oap_rpc_item);
2309
2310                         /* queued sync pages can be torn down while the pages
2311                          * were between the pending list and the rpc */
2312                         if (oap->oap_interrupted) {
2313                                 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2314                                 osc_ap_completion(cli, NULL, oap, 0,
2315                                                   oap->oap_count);
2316                                 continue;
2317                         }
2318                         osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
2319                 }
2320                 loi_list_maint(cli, loi);
2321                 RETURN(PTR_ERR(req));
2322         }
2323
2324         aa = ptlrpc_req_async_args(req);
2325         if (cmd == OBD_BRW_READ) {
2326                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2327                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2328                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2329                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2330         } else {
2331                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2332                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2333                                  cli->cl_w_in_flight);
2334                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2335                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2336         }
2337         ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2338
2339         client_obd_list_lock(&cli->cl_loi_list_lock);
2340
2341         if (cmd == OBD_BRW_READ)
2342                 cli->cl_r_in_flight++;
2343         else
2344                 cli->cl_w_in_flight++;
2345
2346         /* queued sync pages can be torn down while the pages
2347          * were between the pending list and the rpc */
2348         tmp = NULL;
2349         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2350                 /* only one oap gets a request reference */
2351                 if (tmp == NULL)
2352                         tmp = oap;
2353                 if (oap->oap_interrupted && !req->rq_intr) {
2354                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2355                                oap, req);
2356                         ptlrpc_mark_interrupted(req);
2357                 }
2358         }
2359         if (tmp != NULL)
2360                 tmp->oap_request = ptlrpc_request_addref(req);
2361
2362         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2363                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2364
2365         req->rq_interpret_reply = brw_interpret;
2366         ptlrpcd_add_req(req);
2367         RETURN(1);
2368 }
2369
/* Dump an object's queue state: whether it sits on a ready list, plus the
 * pending count and urgent-list status for both the write and read lops.
 * NOTE: the original definition ended with a stray line-continuation '\'
 * after "args)", which silently appended the next source line to the macro;
 * that trailing backslash is removed here. */
#define LOI_DEBUG(LOI, STR, args...)                                     \
        CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
               !list_empty(&(LOI)->loi_ready_item) ||                    \
               !list_empty(&(LOI)->loi_hp_ready_item),                   \
               (LOI)->loi_write_lop.lop_num_pending,                     \
               !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
               (LOI)->loi_read_lop.lop_num_pending,                      \
               !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
               args)
2379
/* This is called by osc_check_rpcs() to find which objects have pages that
 * we could be sending.  These lists are maintained by lop_makes_rpc().
 *
 * Returns the next lov_oinfo to service in strict priority order, or NULL
 * when nothing is ready.  Caller holds cl_loi_list_lock (osc_check_rpcs()
 * is documented as being called with the loi list lock held). */
struct lov_oinfo *osc_next_loi(struct client_obd *cli)
{
        ENTRY;
        /* First return objects that have blocked locks so that they
         * will be flushed quickly and other clients can get the lock,
         * then objects which have pages ready to be stuffed into RPCs */
        if (!list_empty(&cli->cl_loi_hp_ready_list))
                RETURN(list_entry(cli->cl_loi_hp_ready_list.next,
                                  struct lov_oinfo, loi_hp_ready_item));
        if (!list_empty(&cli->cl_loi_ready_list))
                RETURN(list_entry(cli->cl_loi_ready_list.next,
                                  struct lov_oinfo, loi_ready_item));

        /* then if we have cache waiters, return all objects with queued
         * writes.  This is especially important when many small files
         * have filled up the cache and not been fired into rpcs because
         * they don't pass the nr_pending/object threshhold */
        if (!list_empty(&cli->cl_cache_waiters) &&
            !list_empty(&cli->cl_loi_write_list))
                RETURN(list_entry(cli->cl_loi_write_list.next,
                                  struct lov_oinfo, loi_write_item));

        /* then return all queued objects when we have an invalid import
         * so that they get flushed */
        if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
                if (!list_empty(&cli->cl_loi_write_list))
                        RETURN(list_entry(cli->cl_loi_write_list.next,
                                          struct lov_oinfo, loi_write_item));
                if (!list_empty(&cli->cl_loi_read_list))
                        RETURN(list_entry(cli->cl_loi_read_list.next,
                                          struct lov_oinfo, loi_read_item));
        }
        RETURN(NULL);
}
2416
2417 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2418 {
2419         struct osc_async_page *oap;
2420         int hprpc = 0;
2421
2422         if (!list_empty(&loi->loi_write_lop.lop_urgent)) {
2423                 oap = list_entry(loi->loi_write_lop.lop_urgent.next,
2424                                  struct osc_async_page, oap_urgent_item);
2425                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2426         }
2427
2428         if (!hprpc && !list_empty(&loi->loi_read_lop.lop_urgent)) {
2429                 oap = list_entry(loi->loi_write_lop.lop_urgent.next,
2430                                  struct osc_async_page, oap_urgent_item);
2431                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2432         }
2433
2434         return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
2435 }
2436
/* called with the loi list lock held
 *
 * Main RPC-generation engine: repeatedly pick the highest-priority object
 * from osc_next_loi() and try to turn its queued pages into read and/or
 * write RPCs, until the in-flight limit is reached, an error occurs, or
 * nothing remains to send. */
static void osc_check_rpcs(struct client_obd *cli)
{
        struct lov_oinfo *loi;
        int rc = 0, race_counter = 0;
        ENTRY;

        while ((loi = osc_next_loi(cli)) != NULL) {
                LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));

                /* stop once we are at the concurrent-RPC cap */
                if (osc_max_rpc_in_flight(cli, loi))
                        break;

                /* attempt some read/write balancing by alternating between
                 * reads and writes in an object.  The makes_rpc checks here
                 * would be redundant if we were getting read/write work items
                 * instead of objects.  we don't want send_oap_rpc to drain a
                 * partial read pending queue when we're given this object to
                 * do io on writes while there are cache waiters */
                if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
                        rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
                                              &loi->loi_write_lop);
                        if (rc < 0)
                                break;
                        if (rc > 0)
                                race_counter = 0;
                        else
                                race_counter++;
                }
                if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
                        rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
                                              &loi->loi_read_lop);
                        if (rc < 0)
                                break;
                        if (rc > 0)
                                race_counter = 0;
                        else
                                race_counter++;
                }

                /* attempt some inter-object balancing by issueing rpcs
                 * for each object in turn */
                if (!list_empty(&loi->loi_hp_ready_item))
                        list_del_init(&loi->loi_hp_ready_item);
                if (!list_empty(&loi->loi_ready_item))
                        list_del_init(&loi->loi_ready_item);
                if (!list_empty(&loi->loi_write_item))
                        list_del_init(&loi->loi_write_item);
                if (!list_empty(&loi->loi_read_item))
                        list_del_init(&loi->loi_read_item);

                /* let loi_list_maint() re-evaluate which lists this object
                 * should be on now that some pages may have been sent */
                loi_list_maint(cli, loi);

                /* send_oap_rpc fails with 0 when make_ready tells it to
                 * back off.  llite's make_ready does this when it tries
                 * to lock a page queued for write that is already locked.
                 * we want to try sending rpcs from many objects, but we
                 * don't want to spin failing with 0.  */
                if (race_counter == 10)
                        break;
        }
        EXIT;
}
2500
2501 /* we're trying to queue a page in the osc so we're subject to the
2502  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2503  * If the osc's queued pages are already at that limit, then we want to sleep
2504  * until there is space in the osc's queue for us.  We also may be waiting for
2505  * write credits from the OST if there are RPCs in flight that may return some
2506  * before we fall back to sync writes.
2507  *
2508  * We need this know our allocation was granted in the presence of signals */
2509 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2510 {
2511         int rc;
2512         ENTRY;
2513         client_obd_list_lock(&cli->cl_loi_list_lock);
2514         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2515         client_obd_list_unlock(&cli->cl_loi_list_lock);
2516         RETURN(rc);
2517 };
2518
/* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
 * grant or cache space.
 *
 * Accounts one page (@oap) against the client's dirty-page limits and
 * available grant.  Returns 0 on success, -EDQUOT to force the caller to
 * fall back to sync IO, -EINTR if the wait for space was interrupted, or
 * the rc stored in the waiter by whoever granted us space. */
static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
                           struct osc_async_page *oap)
{
        struct osc_cache_waiter ocw;
        struct l_wait_info lwi = { 0 };
        ENTRY;

        CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
               "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
               cli->cl_dirty_max, obd_max_dirty_pages,
               cli->cl_lost_grant, cli->cl_avail_grant);

        /* force the caller to try sync io.  this can jump the list
         * of queued writes and create a discontiguous rpc stream */
        if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
            loi->loi_ar.ar_force_sync)
                RETURN(-EDQUOT);

        /* Hopefully normal case - cache space and write credits available */
        if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
            (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
            (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
                /* account for ourselves */
                osc_consume_write_grant(cli, &oap->oap_brw_page);
                RETURN(0);
        }

        /* Make sure that there are write rpcs in flight to wait for.  This
         * is a little silly as this object may not have any pending but
         * other objects sure might. */
        if (cli->cl_w_in_flight) {
                list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
                cfs_waitq_init(&ocw.ocw_waitq);
                ocw.ocw_oap = oap;
                ocw.ocw_rc = 0;

                loi_list_maint(cli, loi);
                /* kick RPC generation so in-flight writes can complete and
                 * release grant before we go to sleep */
                osc_check_rpcs(cli);
                client_obd_list_unlock(&cli->cl_loi_list_lock);

                CDEBUG(D_CACHE, "sleeping for cache space\n");
                l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);

                client_obd_list_lock(&cli->cl_loi_list_lock);
                /* still linked on cl_cache_waiters means nobody granted us
                 * space; we woke up for another reason (see ocw_granted) */
                if (!list_empty(&ocw.ocw_entry)) {
                        list_del(&ocw.ocw_entry);
                        RETURN(-EINTR);
                }
                RETURN(ocw.ocw_rc);
        }

        RETURN(-EDQUOT);
}
2574
2575 static int osc_reget_short_lock(struct obd_export *exp,
2576                                 struct lov_stripe_md *lsm,
2577                                 void **res, int rw,
2578                                 obd_off start, obd_off end,
2579                                 void **cookie)
2580 {
2581         struct osc_async_page *oap = *res;
2582         int rc;
2583
2584         ENTRY;
2585
2586         spin_lock(&oap->oap_lock);
2587         rc = ldlm_lock_fast_match(oap->oap_ldlm_lock, rw,
2588                                   start, end, cookie);
2589         spin_unlock(&oap->oap_lock);
2590
2591         RETURN(rc);
2592 }
2593
/* Release the lock reference obtained by the fast-match in
 * osc_reget_short_lock(); @cookie is the handle filled in there. */
static int osc_release_short_lock(struct obd_export *exp,
                                  struct lov_stripe_md *lsm, obd_off end,
                                  void *cookie, int rw)
{
        ENTRY;
        ldlm_lock_fast_release(cookie, rw);
        /* no error could have happened at this layer */
        RETURN(0);
}
2603
2604 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2605                         struct lov_oinfo *loi, cfs_page_t *page,
2606                         obd_off offset, struct obd_async_page_ops *ops,
2607                         void *data, void **res, int nocache,
2608                         struct lustre_handle *lockh)
2609 {
2610         struct osc_async_page *oap;
2611         struct ldlm_res_id oid = {{0}};
2612         int rc = 0;
2613
2614         ENTRY;
2615
2616         if (!page)
2617                 return size_round(sizeof(*oap));
2618
2619         oap = *res;
2620         oap->oap_magic = OAP_MAGIC;
2621         oap->oap_cli = &exp->exp_obd->u.cli;
2622         oap->oap_loi = loi;
2623
2624         oap->oap_caller_ops = ops;
2625         oap->oap_caller_data = data;
2626
2627         oap->oap_page = page;
2628         oap->oap_obj_off = offset;
2629
2630         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2631         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2632         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2633         CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2634
2635         oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2636
2637         spin_lock_init(&oap->oap_lock);
2638
2639         /* If the page was marked as notcacheable - don't add to any locks */
2640         if (!nocache) {
2641                 osc_build_res_name(loi->loi_id, loi->loi_gr, &oid);
2642                 /* This is the only place where we can call cache_add_extent
2643                    without oap_lock, because this page is locked now, and
2644                    the lock we are adding it to is referenced, so cannot lose
2645                    any pages either. */
2646                 rc = cache_add_extent(oap->oap_cli->cl_cache, &oid, oap, lockh);
2647                 if (rc)
2648                         RETURN(rc);
2649         }
2650
2651         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2652         RETURN(0);
2653 }
2654
2655 struct osc_async_page *oap_from_cookie(void *cookie)
2656 {
2657         struct osc_async_page *oap = cookie;
2658         if (oap->oap_magic != OAP_MAGIC)
2659                 return ERR_PTR(-EINVAL);
2660         return oap;
2661 };
2662
/* Queue one prepared page (identified by @cookie from osc_prep_async_page)
 * for async IO.  For writes this also charges the page against the dirty
 * limits/grant via osc_enter_cache() and checks user/group quota.  Returns
 * 0 on success, -EIO for an invalid import, -EBUSY if the page is already
 * queued, -EDQUOT over quota or cache limits, or -ENOMEM. */
static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
                              struct lov_oinfo *loi, void *cookie,
                              int cmd, obd_off off, int count,
                              obd_flag brw_flags, enum async_flags async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        /* a page may sit on at most one of these queues at a time */
        if (!list_empty(&oap->oap_pending_item) ||
            !list_empty(&oap->oap_urgent_item) ||
            !list_empty(&oap->oap_rpc_item))
                RETURN(-EBUSY);

        /* check if the file's owner/group is over quota */
        if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
                struct obd_async_page_ops *ops;
                struct obdo *oa;

                OBDO_ALLOC(oa);
                if (oa == NULL)
                        RETURN(-ENOMEM);

                /* have the caller fill in uid/gid so quota can be checked */
                ops = oap->oap_caller_ops;
                ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
                if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
                    NO_QUOTA)
                        rc = -EDQUOT;

                OBDO_FREE(oa);
                if (rc)
                        RETURN(rc);
        }

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        client_obd_list_lock(&cli->cl_loi_list_lock);

        oap->oap_cmd = cmd;
        oap->oap_page_off = off;
        oap->oap_count = count;
        oap->oap_brw_flags = brw_flags;
        oap->oap_async_flags = async_flags;

        if (cmd & OBD_BRW_WRITE) {
                /* may drop and retake cl_loi_list_lock while waiting for
                 * cache space; see osc_enter_cache() */
                rc = osc_enter_cache(cli, loi, oap);
                if (rc) {
                        client_obd_list_unlock(&cli->cl_loi_list_lock);
                        RETURN(rc);
                }
        }

        osc_oap_to_pending(oap);
        loi_list_maint(cli, loi);

        LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
                  cmd);

        /* see if the new page lets us build an RPC right away */
        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        RETURN(0);
}
2735
/* aka (~was & now & flag), but this is more clear :)
 * True iff @flag is clear in @was and set in @now.  All arguments are
 * parenthesized in the expansion so that operands containing operators of
 * lower precedence than '&' (e.g. "a | b") expand correctly. */
#define SETTING(was, now, flag) (!((was) & (flag)) && ((now) & (flag)))
2738
/* Add flags from @async_flags to an already-queued page's oap_async_flags.
 * Only setting ASYNC_READY and ASYNC_URGENT has any effect here; newly
 * urgent pages are moved onto the lop urgent list (at the head if the page
 * is already ASYNC_HP).  Returns 0 on success, -EIO for an invalid import,
 * -EINVAL if the page is not on a pending list. */
static int osc_set_async_flags(struct obd_export *exp,
                               struct lov_stripe_md *lsm,
                               struct lov_oinfo *loi, void *cookie,
                               obd_flag async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct loi_oap_pages *lop;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        /*
         * bug 7311: OST-side locking is only supported for liblustre for now
         * (and liblustre never calls obd_set_async_flags(). I hope.), generic
         * implementation has to handle case where OST-locked page was picked
         * up by, e.g., ->writepage().
         */
        LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
        LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
                                     * tread here. */

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        /* pick the lop matching the direction this page was queued for */
        if (oap->oap_cmd & OBD_BRW_WRITE) {
                lop = &loi->loi_write_lop;
        } else {
                lop = &loi->loi_read_lop;
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);

        if (list_empty(&oap->oap_pending_item))
                GOTO(out, rc = -EINVAL);

        /* nothing to do if every requested flag is already set */
        if ((oap->oap_async_flags & async_flags) == async_flags)
                GOTO(out, rc = 0);

        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
                oap->oap_async_flags |= ASYNC_READY;

        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
            list_empty(&oap->oap_rpc_item)) {
                /* high-priority pages jump to the head of the urgent list */
                if (oap->oap_async_flags & ASYNC_HP)
                        list_add(&oap->oap_urgent_item, &lop->lop_urgent);
                else
                        list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
                oap->oap_async_flags |= ASYNC_URGENT;
                loi_list_maint(cli, loi);
        }

        LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
                        oap->oap_async_flags);
out:
        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        RETURN(rc);
}
2804
2805 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2806                              struct lov_oinfo *loi,
2807                              struct obd_io_group *oig, void *cookie,
2808                              int cmd, obd_off off, int count,
2809                              obd_flag brw_flags,
2810                              obd_flag async_flags)
2811 {
2812         struct client_obd *cli = &exp->exp_obd->u.cli;
2813         struct osc_async_page *oap;
2814         struct loi_oap_pages *lop;
2815         int rc = 0;
2816         ENTRY;
2817
2818         oap = oap_from_cookie(cookie);
2819         if (IS_ERR(oap))
2820                 RETURN(PTR_ERR(oap));
2821
2822         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2823                 RETURN(-EIO);
2824
2825         if (!list_empty(&oap->oap_pending_item) ||
2826             !list_empty(&oap->oap_urgent_item) ||
2827             !list_empty(&oap->oap_rpc_item))
2828                 RETURN(-EBUSY);
2829
2830         if (loi == NULL)
2831                 loi = lsm->lsm_oinfo[0];
2832
2833         client_obd_list_lock(&cli->cl_loi_list_lock);
2834
2835         oap->oap_cmd = cmd;
2836         oap->oap_page_off = off;
2837         oap->oap_count = count;
2838         oap->oap_brw_flags = brw_flags;
2839         oap->oap_async_flags = async_flags;
2840
2841         if (cmd & OBD_BRW_WRITE)
2842                 lop = &loi->loi_write_lop;
2843         else
2844                 lop = &loi->loi_read_lop;
2845
2846         list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
2847         if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
2848                 oap->oap_oig = oig;
2849                 rc = oig_add_one(oig, &oap->oap_occ);
2850         }
2851
2852         LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
2853                   oap, oap->oap_page, rc);
2854
2855         client_obd_list_unlock(&cli->cl_loi_list_lock);
2856
2857         RETURN(rc);
2858 }
2859
2860 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2861                                  struct loi_oap_pages *lop, int cmd)
2862 {
2863         struct list_head *pos, *tmp;
2864         struct osc_async_page *oap;
2865
2866         list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2867                 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2868                 list_del(&oap->oap_pending_item);
2869                 osc_oap_to_pending(oap);
2870         }
2871         loi_list_maint(cli, loi);
2872 }
2873
/* Release all pages queued on the object's group lists (by
 * osc_queue_group_io) into the regular pending pipeline for both
 * directions, then kick RPC generation.  Always returns 0. */
static int osc_trigger_group_io(struct obd_export *exp,
                                struct lov_stripe_md *lsm,
                                struct lov_oinfo *loi,
                                struct obd_io_group *oig)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        ENTRY;

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        client_obd_list_lock(&cli->cl_loi_list_lock);

        osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
        osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);

        /* try to build RPCs from the newly pending pages right away */
        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        RETURN(0);
}
2895
/* Undo osc_prep_async_page()/queueing for a page that is being discarded:
 * release its cache/grant accounting, drop it from the urgent and pending
 * lists, and detach it from the extent cache.  Fails with -EBUSY if the
 * page is already part of an RPC in flight. */
static int osc_teardown_async_page(struct obd_export *exp,
                                   struct lov_stripe_md *lsm,
                                   struct lov_oinfo *loi, void *cookie)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct loi_oap_pages *lop;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        /* pick the lop matching the direction this page was queued for */
        if (oap->oap_cmd & OBD_BRW_WRITE) {
                lop = &loi->loi_write_lop;
        } else {
                lop = &loi->loi_read_lop;
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* can't tear down a page an RPC is currently using */
        if (!list_empty(&oap->oap_rpc_item))
                GOTO(out, rc = -EBUSY);

        /* give back this page's grant/dirty accounting and let anyone
         * waiting for cache space try again */
        osc_exit_cache(cli, oap, 0);
        osc_wake_cache_waiters(cli);

        if (!list_empty(&oap->oap_urgent_item)) {
                list_del_init(&oap->oap_urgent_item);
                oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
        }

        if (!list_empty(&oap->oap_pending_item)) {
                list_del_init(&oap->oap_pending_item);
                lop_update_pending(cli, lop, oap->oap_cmd, -1);
        }
        loi_list_maint(cli, loi);
        cache_remove_extent(cli->cl_cache, oap);

        LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
out:
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        RETURN(rc);
}
2944
/* LDLM blocking/cancel callback for extent locks held by the osc.
 * LDLM_CB_BLOCKING: another client wants the lock, so cancel it ourselves.
 * LDLM_CB_CANCELING: the lock is going away; detach it from the extent
 * cache and notify the upper layer's cancel callback if one is registered.
 * Always returns 0. */
int osc_extent_blocking_cb(struct ldlm_lock *lock,
                           struct ldlm_lock_desc *new, void *data,
                           int flag)
{
        struct lustre_handle lockh = { 0 };
        int rc;
        ENTRY;

        /* small non-NULL values can only be a corrupted ast_data pointer */
        if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
                LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
                LBUG();
        }

        switch (flag) {
        case LDLM_CB_BLOCKING:
                ldlm_lock2handle(lock, &lockh);
                rc = ldlm_cli_cancel(&lockh);
                if (rc != ELDLM_OK)
                        CERROR("ldlm_cli_cancel failed: %d\n", rc);
                break;
        case LDLM_CB_CANCELING: {

                ldlm_lock2handle(lock, &lockh);
                /* This lock wasn't granted, don't try to do anything */
                if (lock->l_req_mode != lock->l_granted_mode)
                        RETURN(0);

                cache_remove_lock(lock->l_conn_export->exp_obd->u.cli.cl_cache,
                                  &lockh);

                if (lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb)
                        lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb(
                                                          lock, new, data,flag);
                break;
        }
        default:
                /* no other callback types exist; anything else is a bug */
                LBUG();
        }

        RETURN(0);
}
2986 EXPORT_SYMBOL(osc_extent_blocking_cb);
2987
/* Store @data (an inode pointer on Linux) as the lock's l_ast_data and fold
 * LDLM_FL_NO_LRU from @flags into the lock flags.  On Linux kernels it
 * sanity-checks that any existing, different l_ast_data belongs to an inode
 * being freed before overwriting it. */
static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
                                    int flags)
{
        struct ldlm_lock *lock = ldlm_handle2lock(lockh);

        /* the handle no longer resolves to a lock, e.g. after eviction */
        if (lock == NULL) {
                CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
                return;
        }
        lock_res_and_lock(lock);
#if defined (__KERNEL__) && defined (__linux__)
        /* Liang XXX: Darwin and Winnt checking should be added */
        if (lock->l_ast_data && lock->l_ast_data != data) {
                struct inode *new_inode = data;
                struct inode *old_inode = lock->l_ast_data;
                /* overwriting live ast_data would leak the old inode ref
                 * semantics; only a dying inode may be replaced */
                if (!(old_inode->i_state & I_FREEING))
                        LDLM_ERROR(lock, "inconsistent l_ast_data found");
                LASSERTF(old_inode->i_state & I_FREEING,
                         "Found existing inode %p/%lu/%u state %lu in lock: "
                         "setting data to %p/%lu/%u\n", old_inode,
                         old_inode->i_ino, old_inode->i_generation,
                         old_inode->i_state,
                         new_inode, new_inode->i_ino, new_inode->i_generation);
        }
#endif
        lock->l_ast_data = data;
        lock->l_flags |= (flags & LDLM_FL_NO_LRU);
        unlock_res_and_lock(lock);
        /* drop the reference taken by ldlm_handle2lock() */
        LDLM_LOCK_PUT(lock);
}
3018
3019 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3020                              ldlm_iterator_t replace, void *data)
3021 {
3022         struct ldlm_res_id res_id;
3023         struct obd_device *obd = class_exp2obd(exp);
3024
3025         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
3026         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3027         return 0;
3028 }
3029
/* Common post-processing after an (intent) enqueue completed: extract
 * the server's verdict from an aborted intent reply, cache the lock on
 * success, and hand the final result to the caller's update callback.
 * Returns whatever oi_cb_up() returns. */
static int osc_enqueue_fini(struct obd_device *obd, struct ptlrpc_request *req,
                            struct obd_info *oinfo, int intent, int rc)
{
        ENTRY;

        if (intent) {
                /* The request was created before ldlm_cli_enqueue call. */
                if (rc == ELDLM_LOCK_ABORTED) {
                        struct ldlm_reply *rep;

                        /* swabbed by ldlm_cli_enqueue() */
                        LASSERT(lustre_rep_swabbed(req, DLM_LOCKREPLY_OFF));
                        rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
                                             sizeof(*rep));
                        LASSERT(rep != NULL);
                        /* The intent was handled without granting the lock;
                         * the real result lives in lock_policy_res1. */
                        if (rep->lock_policy_res1)
                                rc = rep->lock_policy_res1;
                }
        }

        /* Either the lock was granted, or the intent completed without a
         * lock — in both cases the LVB data (kms/blocks/mtime) is valid. */
        if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
                CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
                       oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
                       oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
                       oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
        }

        /* Remember the granted lock in the client-side lock cache. */
        if (!rc)
                cache_add_lock(obd->u.cli.cl_cache, oinfo->oi_lockh);

        /* Call the update callback. */
        rc = oinfo->oi_cb_up(oinfo, rc);
        RETURN(rc);
}
3064
/* Async completion handler for osc_enqueue(): finish the ldlm enqueue,
 * run the common fini path, then immediately release the lock reference
 * (async locks are dropped right away — see the comment above
 * osc_enqueue()). */
static int osc_enqueue_interpret(struct ptlrpc_request *req,
                                 struct osc_enqueue_args *aa, int rc)
{
        int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
        struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
        struct ldlm_lock *lock;

        /* ldlm_cli_enqueue is holding a reference on the lock, so it must
         * be valid. */
        lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);

        /* Complete obtaining the lock procedure. */
        rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
                                   aa->oa_ei->ei_mode,
                                   &aa->oa_oi->oi_flags,
                                   &lsm->lsm_oinfo[0]->loi_lvb,
                                   sizeof(lsm->lsm_oinfo[0]->loi_lvb),
                                   lustre_swab_ost_lvb,
                                   aa->oa_oi->oi_lockh, rc);

        /* Complete osc stuff. */
        rc = osc_enqueue_fini(aa->oa_exp->exp_obd, req, aa->oa_oi, intent, rc);

        /* Release the lock for async request. */
        if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
                ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);

        /* Our extra reference (taken above) kept the lock alive across the
         * fini calls; verify it really existed before dropping it. */
        LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
                 aa->oa_oi->oi_lockh, req, aa);
        LDLM_LOCK_PUT(lock);
        return rc;
}
3097
3098 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
3099  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
3100  * other synchronous requests, however keeping some locks and trying to obtain
3101  * others may take a considerable amount of time in a case of ost failure; and
3102  * when other sync requests do not get released lock from a client, the client
 * is excluded from the cluster -- such scenarios make life difficult, so
3104  * release locks just after they are obtained. */
/* Enqueue an extent lock for the object described by @oinfo.  First try
 * to match an already-granted local lock (possibly PW when PR was asked
 * for); only if that fails is a new enqueue RPC issued — synchronously,
 * or via @rqset when non-NULL.  The caller's oi_cb_up() is always
 * invoked with the final status. */
static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
                       struct ldlm_enqueue_info *einfo,
                       struct ptlrpc_request_set *rqset)
{
        struct ldlm_res_id res_id;
        struct obd_device *obd = exp->exp_obd;
        struct ldlm_reply *rep;
        struct ptlrpc_request *req = NULL;
        int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
        ldlm_mode_t mode;
        int rc;
        ENTRY;

        osc_build_res_name(oinfo->oi_md->lsm_object_id,
                           oinfo->oi_md->lsm_object_gr, &res_id);
        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother.  */
        oinfo->oi_policy.l_extent.start -=
                oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
        oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;

        /* Without a valid KMS we must talk to the server anyway, so skip
         * the local match. */
        if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
                goto no_match;

        /* Next, search for already existing extent locks that will cover us */
        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock.
         *
         * There are problems with conversion deadlocks, so instead of
         * converting a read lock to a write lock, we'll just enqueue a new
         * one.
         *
         * At some point we should cancel the read lock instead of making them
         * send us a blocking callback, but there are problems with canceling
         * locks out from other users right now, too. */
        mode = einfo->ei_mode;
        if (einfo->ei_mode == LCK_PR)
                mode |= LCK_PW;
        mode = ldlm_lock_match(obd->obd_namespace,
                               oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
                               einfo->ei_type, &oinfo->oi_policy, mode,
                               oinfo->oi_lockh);
        if (mode) {
                /* addref the lock only if not async requests and PW lock is
                 * matched whereas we asked for PR. */
                if (!rqset && einfo->ei_mode != mode)
                        ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
                osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
                                        oinfo->oi_flags);
                if (intent) {
                        /* I would like to be able to ASSERT here that rss <=
                         * kms, but I can't, for reasons which are explained in
                         * lov_enqueue() */
                }

                /* We already have a lock, and it's referenced */
                oinfo->oi_cb_up(oinfo, ELDLM_OK);

                /* For async requests, decref the lock. */
                if (einfo->ei_mode != mode)
                        ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
                else if (rqset)
                        ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);

                RETURN(ELDLM_OK);
        }

 no_match:
        if (intent) {
                /* Intent enqueue: prepare the RPC ourselves so we can size
                 * the reply for both the lock reply and the LVB. */
                __u32 size[3] = {
                        [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                        [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request),
                        [DLM_LOCKREQ_OFF + 1] = 0 };

                req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
                if (req == NULL)
                        RETURN(-ENOMEM);

                size[DLM_LOCKREPLY_OFF] = sizeof(*rep);
                size[DLM_REPLY_REC_OFF] =
                        sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb);
                ptlrpc_req_set_repsize(req, 3, size);
        }

        /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
        oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;

        rc = ldlm_cli_enqueue(exp, &req, einfo, res_id,
                              &oinfo->oi_policy, &oinfo->oi_flags,
                              &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
                              sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
                              lustre_swab_ost_lvb, oinfo->oi_lockh,
                              rqset ? 1 : 0);
        if (rqset) {
                if (!rc) {
                        /* Async: completion is handled by
                         * osc_enqueue_interpret() when the set runs. */
                        struct osc_enqueue_args *aa;
                        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                        aa = ptlrpc_req_async_args(req);
                        aa->oa_oi = oinfo;
                        aa->oa_ei = einfo;
                        aa->oa_exp = exp;

                        req->rq_interpret_reply = osc_enqueue_interpret;
                        ptlrpc_set_add_req(rqset, req);
                } else if (intent) {
                        ptlrpc_req_finished(req);
                }
                RETURN(rc);
        }

        rc = osc_enqueue_fini(obd, req, oinfo, intent, rc);
        if (intent)
                ptlrpc_req_finished(req);

        RETURN(rc);
}
3222
3223 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
3224                      __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3225                      int *flags, void *data, struct lustre_handle *lockh)
3226 {
3227         struct ldlm_res_id res_id;
3228         struct obd_device *obd = exp->exp_obd;
3229         int lflags = *flags;
3230         ldlm_mode_t rc;
3231         ENTRY;
3232
3233         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
3234
3235         OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);
3236
3237         /* Filesystem lock extents are extended to page boundaries so that
3238          * dealing with the page cache is a little smoother */
3239         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3240         policy->l_extent.end |= ~CFS_PAGE_MASK;
3241
3242         /* Next, search for already existing extent locks that will cover us */
3243         /* If we're trying to read, we also search for an existing PW lock.  The
3244          * VFS and page cache already protect us locally, so lots of readers/
3245          * writers can share a single PW lock. */
3246         rc = mode;
3247         if (mode == LCK_PR)
3248                 rc |= LCK_PW;
3249         rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3250                              &res_id, type, policy, rc, lockh);
3251         if (rc) {
3252                 osc_set_data_with_check(lockh, data, lflags);
3253                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3254                         ldlm_lock_addref(lockh, LCK_PR);
3255                         ldlm_lock_decref(lockh, LCK_PW);
3256                 }
3257                 RETURN(rc);
3258         }
3259
3260         RETURN(rc);
3261 }
3262
3263 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3264                       __u32 mode, struct lustre_handle *lockh)
3265 {
3266         ENTRY;
3267
3268         if (unlikely(mode == LCK_GROUP))
3269                 ldlm_lock_decref_and_cancel(lockh, mode);
3270         else
3271                 ldlm_lock_decref(lockh, mode);
3272
3273         RETURN(0);
3274 }
3275
3276 static int osc_cancel_unused(struct obd_export *exp,
3277                              struct lov_stripe_md *lsm, int flags, void *opaque)
3278 {
3279         struct obd_device *obd = class_exp2obd(exp);
3280         struct ldlm_res_id res_id, *resp = NULL;
3281
3282         if (lsm != NULL) {
3283                 resp = osc_build_res_name(lsm->lsm_object_id,
3284                                           lsm->lsm_object_gr, &res_id);
3285         }
3286
3287         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3288
3289 }
3290
3291 static int osc_join_lru(struct obd_export *exp,
3292                         struct lov_stripe_md *lsm, int join)
3293 {
3294         struct obd_device *obd = class_exp2obd(exp);
3295         struct ldlm_res_id res_id, *resp = NULL;
3296
3297         if (lsm != NULL) {
3298                 resp = osc_build_res_name(lsm->lsm_object_id,
3299                                           lsm->lsm_object_gr, &res_id);
3300         }
3301
3302         return ldlm_cli_join_lru(obd->obd_namespace, resp, join);
3303
3304 }
3305
/* Async completion handler for osc_statfs_async(): unpack the statfs
 * reply into the caller's buffer, then invoke the caller's callback
 * with the final status. */
static int osc_statfs_interpret(struct ptlrpc_request *req,
                                struct osc_async_args *aa, int rc)
{
        struct obd_statfs *msfs;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
                                  lustre_swab_obd_statfs);
        if (msfs == NULL) {
                CERROR("Can't unpack obd_statfs\n");
                GOTO(out, rc = -EPROTO);
        }

        memcpy(aa->aa_oi->oi_osfs, msfs, sizeof(*msfs));
out:
        /* The callback is run regardless of rc so the caller always
         * learns the outcome. */
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}
3327
/* Queue an OST_STATFS RPC on @rqset; the reply is delivered to the
 * caller through osc_statfs_interpret().  Returns 0 on successful
 * queueing or -ENOMEM if the request cannot be allocated.
 * NB: max_age is currently unused here — see the comment below. */
static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
                            __u64 max_age, struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*oinfo->oi_osfs) };
        ENTRY;

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
                              OST_STATFS, 1, NULL, NULL);
        if (!req)
                RETURN(-ENOMEM);

        ptlrpc_req_set_repsize(req, 2, size);
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);
        if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
                /* procfs-originated requests must not block waiting for
                 * recovery, to avoid deadlock: fail fast instead. */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        req->rq_interpret_reply = osc_statfs_interpret;
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(rqset, req);
        RETURN(0);
}
3364
/* Synchronous OST_STATFS: send the RPC, wait, and copy the result into
 * @osfs.  Returns 0 on success or a negative errno.
 * NB: max_age is currently unused here — see the comment below. */
static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
                      __u64 max_age, __u32 flags)
{
        struct obd_statfs *msfs;
        struct ptlrpc_request *req;
        struct obd_import     *imp = NULL;
        __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
        int rc;
        ENTRY;

        /*Since the request might also come from lprocfs, so we need
         *sync this with client_disconnect_export Bug15684*/
        down_read(&obd->u.cli.cl_sem);
        if (obd->u.cli.cl_import)
                imp = class_import_get(obd->u.cli.cl_import);
        up_read(&obd->u.cli.cl_sem);
        if (!imp)
                RETURN(-ENODEV);

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION,
                              OST_STATFS, 1, NULL, NULL);

        /* The request holds its own import reference (if any); ours is no
         * longer needed. */
        class_import_put(imp);
        if (!req)
                RETURN(-ENOMEM);

        ptlrpc_req_set_repsize(req, 2, size);
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (flags & OBD_STATFS_NODELAY) {
                /* procfs-originated requests must not block waiting for
                 * recovery, to avoid deadlock: fail fast instead. */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
                                  lustre_swab_obd_statfs);
        if (msfs == NULL) {
                CERROR("Can't unpack obd_statfs\n");
                GOTO(out, rc = -EPROTO);
        }

        memcpy(osfs, msfs, sizeof(*osfs));

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
3425
/* Retrieve object striping information.
 *
 * @lump is a pointer to an in-core struct with lmm_ost_count indicating
 * the maximum number of OST indices which will fit in the user buffer.
 * lmm_magic must be LOV_USER_MAGIC_V1 or LOV_USER_MAGIC_V3 (an OSC has a
 * single stripe, so only one object slot is ever used here).
 */
static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
{
        /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
        struct lov_user_md_v3 lum, *lumk;
        int rc = 0, lum_size;
        struct lov_user_ost_data_v1 *lmm_objects;
        ENTRY;

        if (!lsm)
                RETURN(-ENODATA);

        /* we only need the header part from user space to get lmm_magic and
         * lmm_stripe_count, (the header part is common to v1 and v3) */
        lum_size = sizeof(struct lov_user_md_v1);
        if (copy_from_user(&lum, lump, lum_size))
                RETURN(-EFAULT);

        if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
            (lum.lmm_magic != LOV_USER_MAGIC_V3))
                RETURN(-EINVAL);

        /* lov_user_md_vX and lov_mds_md_vX must have the same size */
        LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
        LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
        LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));

        /* we can use lov_mds_md_size() to compute lum_size
         * because lov_user_md_vX and lov_mds_md_vX have the same size */
        if (lum.lmm_stripe_count > 0) {
                /* The user buffer has room for object entries: allocate a
                 * full-size reply and fill in the (single) object id.
                 * The objects array starts at different offsets in v1/v3. */
                lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
                OBD_ALLOC(lumk, lum_size);
                if (!lumk)
                        RETURN(-ENOMEM);
                if (lum.lmm_magic == LOV_USER_MAGIC_V1)
                        lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
                else
                        lmm_objects = &(lumk->lmm_objects[0]);
                lmm_objects->l_object_id = lsm->lsm_object_id;
        } else {
                /* Header-only request: reuse the on-stack copy. */
                lum_size = lov_mds_md_size(0, lum.lmm_magic);
                lumk = &lum;
        }

        /* An OSC object always has exactly one stripe. */
        lumk->lmm_object_id = lsm->lsm_object_id;
        lumk->lmm_stripe_count = 1;

        if (copy_to_user(lump, lumk, lum_size))
                rc = -EFAULT;

        if (lumk != &lum)
                OBD_FREE(lumk, lum_size);

        RETURN(rc);
}
3486
3487
/* ioctl dispatcher for the OSC device.  Pins the module for the
 * duration of the call so it cannot be unloaded mid-ioctl; every case
 * exits through 'out' where the reference is dropped. */
static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         void *karg, void *uarg)
{
        struct obd_device *obd = exp->exp_obd;
        struct obd_ioctl_data *data = karg;
        int err = 0;
        ENTRY;

        if (!try_module_get(THIS_MODULE)) {
                CERROR("Can't get module. Is it alive?");
                return -EINVAL;
        }
        switch (cmd) {
        case OBD_IOC_LOV_GET_CONFIG: {
                /* Synthesize a one-target LOV descriptor describing just
                 * this OSC, for tools that expect the LOV interface. */
                char *buf;
                struct lov_desc *desc;
                struct obd_uuid uuid;

                buf = NULL;
                len = 0;
                if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
                        GOTO(out, err = -EINVAL);

                data = (struct obd_ioctl_data *)buf;

                if (sizeof(*desc) > data->ioc_inllen1) {
                        obd_ioctl_freedata(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                if (data->ioc_inllen2 < sizeof(uuid)) {
                        obd_ioctl_freedata(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                desc = (struct lov_desc *)data->ioc_inlbuf1;
                desc->ld_tgt_count = 1;
                desc->ld_active_tgt_count = 1;
                desc->ld_default_stripe_count = 1;
                desc->ld_default_stripe_size = 0;
                desc->ld_default_stripe_offset = 0;
                desc->ld_pattern = 0;
                memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));

                memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));

                err = copy_to_user((void *)uarg, buf, len);
                if (err)
                        err = -EFAULT;
                obd_ioctl_freedata(buf, len);
                GOTO(out, err);
        }
        case LL_IOC_LOV_SETSTRIPE:
                /* obd_alloc_memmd returns the md size on success. */
                err = obd_alloc_memmd(exp, karg);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case LL_IOC_LOV_GETSTRIPE:
                err = osc_getstripe(karg, uarg);
                GOTO(out, err);
        case OBD_IOC_CLIENT_RECOVER:
                err = ptlrpc_recover_import(obd->u.cli.cl_import,
                                            data->ioc_inlbuf1);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case IOC_OSC_SET_ACTIVE:
                err = ptlrpc_set_import_active(obd->u.cli.cl_import,
                                               data->ioc_offset);
                GOTO(out, err);
        case OBD_IOC_POLL_QUOTACHECK:
                err = lquota_poll_check(quota_interface, exp,
                                        (struct if_quotacheck *)karg);
                GOTO(out, err);
        case OBD_IOC_DESTROY: {
                struct obdo            *oa;

                /* Destroying arbitrary objects is administrator-only. */
                if (!cfs_capable(CFS_CAP_SYS_ADMIN))
                        GOTO (out, err = -EPERM);
                oa = &data->ioc_obdo1;

                if (oa->o_id == 0)
                        GOTO(out, err = -EINVAL);

                oa->o_valid |= OBD_MD_FLGROUP;

                err = osc_destroy(exp, oa, NULL, NULL, NULL);
                GOTO(out, err);
        }
        case OBD_IOC_PING_TARGET:
                err = ptlrpc_obd_ping(obd);
                GOTO(out, err);
        default:
                CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
                       cmd, cfs_curproc_comm());
                GOTO(out, err = -ENOTTY);
        }
out:
        module_put(THIS_MODULE);
        return err;
}
3589
/* Handle obd_get_info() keys for the OSC: lock-to-stripe mapping is
 * answered locally; KEY_LAST_ID and KEY_FIEMAP are forwarded to the OST
 * via an OST_GET_INFO RPC.  Unknown keys return -EINVAL. */
static int osc_get_info(struct obd_export *exp, obd_count keylen,
                        void *key, __u32 *vallen, void *val, struct lov_stripe_md *lsm)
{
        ENTRY;
        if (!vallen || !val)
                RETURN(-EFAULT);

        if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
                /* An OSC object has exactly one stripe, index 0. */
                __u32 *stripe = val;
                *vallen = sizeof(*stripe);
                *stripe = 0;
                RETURN(0);
        } else if (KEY_IS(KEY_LAST_ID)) {
                /* Ask the OST for the last allocated object id. */
                struct ptlrpc_request *req;
                obd_id *reply;
                char *bufs[2] = { NULL, key };
                __u32 size[2] = { sizeof(struct ptlrpc_body), keylen };
                int rc;

                req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                                      OST_GET_INFO, 2, size, bufs);
                if (req == NULL)
                        RETURN(-ENOMEM);

                size[REPLY_REC_OFF] = *vallen;
                ptlrpc_req_set_repsize(req, 2, size);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(out, rc);

                reply = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*reply),
                                           lustre_swab_ost_last_id);
                if (reply == NULL) {
                        CERROR("Can't unpack OST last ID\n");
                        GOTO(out, rc = -EPROTO);
                }
                *((obd_id *)val) = *reply;
        out:
                ptlrpc_req_finished(req);
                RETURN(rc);
        } else if (KEY_IS(KEY_FIEMAP)) {
                /* Forward a FIEMAP (extent mapping) query to the OST;
                 * the caller supplies the fiemap request in @key and the
                 * reply buffer size in *vallen. */
                struct ptlrpc_request *req;
                struct ll_user_fiemap *reply;
                char *bufs[2] = { NULL, key };
                __u32 size[2] = { sizeof(struct ptlrpc_body), keylen };
                int rc;

                req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                                      OST_GET_INFO, 2, size, bufs);
                if (req == NULL)
                        RETURN(-ENOMEM);

                size[REPLY_REC_OFF] = *vallen;
                ptlrpc_req_set_repsize(req, 2, size);

                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(out1, rc);
                reply = lustre_swab_repbuf(req, REPLY_REC_OFF, *vallen,
                                           lustre_swab_fiemap);
                if (reply == NULL) {
                        CERROR("Can't unpack FIEMAP reply.\n");
                        GOTO(out1, rc = -EPROTO);
                }

                memcpy(val, reply, *vallen);

        out1:
                ptlrpc_req_finished(req);

                RETURN(rc);
        }

        RETURN(-EINVAL);
}
3665
3666 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3667                                           void *aa, int rc)
3668 {
3669         struct llog_ctxt *ctxt;
3670         struct obd_import *imp = req->rq_import;
3671         ENTRY;
3672
3673         if (rc != 0)
3674                 RETURN(rc);
3675
3676         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3677         if (ctxt) {
3678                 if (rc == 0)
3679                         rc = llog_initiator_connect(ctxt);
3680                 else
3681                         CERROR("cannot establish connection for "
3682                                "ctxt %p: %d\n", ctxt, rc);
3683         }
3684
3685         llog_ctxt_put(ctxt);
3686         spin_lock(&imp->imp_lock);
3687         imp->imp_server_timeout = 1;
3688         imp->imp_pingable = 1;
3689         spin_unlock(&imp->imp_lock);
3690         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3691
3692         RETURN(rc);
3693 }
3694
/* Handle obd_set_info_async() keys for the OSC.  A few keys are served
 * locally (next id, nospace flag, initial recovery, checksums); all
 * other keys are forwarded to the OST as an OST_SET_INFO RPC added to
 * @set. */
static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
                              void *key, obd_count vallen, void *val,
                              struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct obd_device  *obd = exp->exp_obd;
        struct obd_import *imp = class_exp2cliimp(exp);
        __u32 size[3] = { sizeof(struct ptlrpc_body), keylen, vallen };
        char *bufs[3] = { NULL, key, val };
        ENTRY;

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

        if (KEY_IS(KEY_NEXT_ID)) {
                /* The MDS tells us the last used object id; the next one
                 * to precreate is that plus one. */
                if (vallen != sizeof(obd_id))
                        RETURN(-EINVAL);
                obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
                CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
                       exp->exp_obd->obd_name,
                       obd->u.cli.cl_oscc.oscc_next_id);

                RETURN(0);
        }

        if (KEY_IS(KEY_UNLINKED)) {
                /* Objects were unlinked: clear the no-space flag so
                 * precreation can resume. */
                struct osc_creator *oscc = &obd->u.cli.cl_oscc;
                spin_lock(&oscc->oscc_lock);
                oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
                spin_unlock(&oscc->oscc_lock);
                RETURN(0);
        }

        if (KEY_IS(KEY_INIT_RECOV)) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                spin_lock(&imp->imp_lock);
                imp->imp_initial_recov = *(int *)val;
                spin_unlock(&imp->imp_lock);
                CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
                       exp->exp_obd->obd_name,
                       imp->imp_initial_recov);
                RETURN(0);
        }

        if (KEY_IS(KEY_CHECKSUM)) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
                RETURN(0);
        }

        /* The remaining keys require an RPC, so a request set is needed. */
        if (!set)
                RETURN(-EINVAL);

        /* We pass all other commands directly to OST. Since nobody calls osc
           methods directly and everybody is supposed to go through LOV, we
           assume lov checked invalid values for us.
           The only recognised values so far are evict_by_nid and mds_conn.
           Even if something bad goes through, we'd get a -EINVAL from OST
           anyway. */

        req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_SET_INFO, 3, size,
                              bufs);
        if (req == NULL)
                RETURN(-ENOMEM);

        if (KEY_IS(KEY_MDS_CONN))
                req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;

        ptlrpc_req_set_repsize(req, 1, NULL);
        ptlrpc_set_add_req(set, req);
        ptlrpc_check_set(set);

        RETURN(0);
}
3770
3771
3772 static struct llog_operations osc_size_repl_logops = {
3773         lop_cancel: llog_obd_repl_cancel
3774 };
3775
/* Originator-side llog ops template, filled in lazily on the first
 * osc_llog_init() call (see below). */
static struct llog_operations osc_mds_ost_orig_logops;

/* Set up the two llog contexts used by an MDS-side OSC:
 *  - LLOG_MDS_OST_ORIG_CTXT: originator context backed by the lazily
 *    initialized osc_mds_ost_orig_logops template, keyed by @catid;
 *  - LLOG_SIZE_REPL_CTXT: replication context using osc_size_repl_logops.
 * If the second setup fails, the first context is torn down again before
 * the error is returned.  @uuid is unused here.  Returns 0 or a negative
 * errno from llog_setup(). */
static int osc_llog_init(struct obd_device *obd, struct obd_device *tgt,
                         int count, struct llog_catid *catid,
                         struct obd_uuid *uuid)
{
        int rc;
        ENTRY;

        /* One-shot initialization of the shared ops template, detected by
         * lop_setup still pointing somewhere other than the origin setup.
         * NOTE(review): the guard is this obd's obd_dev_lock, but the
         * template is a file-scope static shared by every OSC device; two
         * different obds racing here would not exclude each other -- confirm
         * that callers serialize llog initialization externally. */
        spin_lock(&obd->obd_dev_lock);
        if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
                osc_mds_ost_orig_logops = llog_lvfs_ops;
                osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
                osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
                osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
                osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
        }
        spin_unlock(&obd->obd_dev_lock);

        rc = llog_setup(obd, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
                        &catid->lci_logid, &osc_mds_ost_orig_logops);
        if (rc) {
                CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
                GOTO (out, rc);
        }

        rc = llog_setup(obd, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
                        &osc_size_repl_logops);
        if (rc) {
                /* Undo the first context so we don't leak it on failure. */
                struct llog_ctxt *ctxt = 
                        llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
                if (ctxt)
                        llog_cleanup(ctxt);
                CERROR("failed LLOG_SIZE_REPL_CTXT\n");
        }
out:
        /* On any failure, log enough state to identify the bad catalog. */
        if (rc) {
                CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
                       obd->obd_name, tgt->obd_name, count, catid, rc);
                CERROR("logid "LPX64":0x%x\n",
                       catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
        }
        RETURN(rc);
}
3819
3820 static int osc_llog_finish(struct obd_device *obd, int count)
3821 {
3822         struct llog_ctxt *ctxt;
3823         int rc = 0, rc2 = 0;
3824         ENTRY;
3825
3826         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3827         if (ctxt)
3828                 rc = llog_cleanup(ctxt);
3829
3830         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3831         if (ctxt)
3832                 rc2 = llog_cleanup(ctxt);
3833         if (!rc)
3834                 rc = rc2;
3835
3836         RETURN(rc);
3837 }
3838
3839 static int osc_reconnect(struct obd_export *exp, struct obd_device *obd,
3840                          struct obd_uuid *cluuid,
3841                          struct obd_connect_data *data,
3842                          void *localdata)
3843 {
3844         struct client_obd *cli = &obd->u.cli;
3845
3846         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3847                 long lost_grant;
3848
3849                 client_obd_list_lock(&cli->cl_loi_list_lock);
3850                 data->ocd_grant = cli->cl_avail_grant ?:
3851                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3852                 lost_grant = cli->cl_lost_grant;
3853                 cli->cl_lost_grant = 0;
3854                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3855
3856                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3857                        "cl_lost_grant: %ld\n", data->ocd_grant,
3858                        cli->cl_avail_grant, lost_grant);
3859                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3860                        " ocd_grant: %d\n", data->ocd_connect_flags,
3861                        data->ocd_version, data->ocd_grant);
3862         }
3863
3864         RETURN(0);
3865 }
3866
3867 static int osc_disconnect(struct obd_export *exp)
3868 {
3869         struct obd_device *obd = class_exp2obd(exp);
3870         struct llog_ctxt  *ctxt;
3871         int rc;
3872
3873         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3874         if (ctxt) {
3875                 if (obd->u.cli.cl_conn_count == 1) {
3876                         /* Flush any remaining cancel messages out to the 
3877                          * target */
3878                         llog_sync(ctxt, exp);
3879                 }
3880                 llog_ctxt_put(ctxt);
3881         } else {
3882                 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n", 
3883                        obd);
3884         }
3885
3886         rc = client_disconnect_export(exp);
3887         return rc;
3888 }
3889
/* React to state changes of the import backing this OSC.  Dispatches on
 * @event: disconnect, (in)activation, invalidation and connect-data (OCD)
 * events each get their own handling below.  Unknown events are a bug
 * (LBUG).  Returns 0 or the observer-notification result. */
static int osc_import_event(struct obd_device *obd,
                            struct obd_import *imp,
                            enum obd_import_event event)
{
        struct client_obd *cli;
        int rc = 0;

        ENTRY;
        LASSERT(imp->imp_obd == obd);

        switch (event) {
        case IMP_EVENT_DISCON: {
                /* Only do this on the MDS OSC's */
                if (imp->imp_server_timeout) {
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                        /* Mark object pre-creation as recovering until the
                         * import comes back (cleared in IMP_EVENT_ACTIVE). */
                        spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
                        spin_unlock(&oscc->oscc_lock);
                }
                /* Grants are meaningless across a disconnect; zero them. */
                cli = &obd->u.cli;
                client_obd_list_lock(&cli->cl_loi_list_lock);
                cli->cl_avail_grant = 0;
                cli->cl_lost_grant = 0;
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                ptlrpc_import_setasync(imp, -1);

                break;
        }
        case IMP_EVENT_INACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
                break;
        }
        case IMP_EVENT_INVALIDATE: {
                struct ldlm_namespace *ns = obd->obd_namespace;

                /* Reset grants */
                cli = &obd->u.cli;
                client_obd_list_lock(&cli->cl_loi_list_lock);
                /* all pages go to failing rpcs due to the invalid import */
                osc_check_rpcs(cli);
                client_obd_list_unlock(&cli->cl_loi_list_lock);

                /* Drop all locks belonging to the now-invalid import. */
                ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);

                break;
        }
        case IMP_EVENT_ACTIVE: {
                /* Only do this on the MDS OSC's */
                if (imp->imp_server_timeout) {
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                        /* Server is back: allow pre-creation again. */
                        spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
                        spin_unlock(&oscc->oscc_lock);
                }
                CDEBUG(D_INFO, "notify server \n");
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
                break;
        }
        case IMP_EVENT_OCD: {
                struct obd_connect_data *ocd = &imp->imp_connect_data;

                if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
                        osc_init_grant(&obd->u.cli, ocd);

                /* See bug 7198 */
                if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;

                ptlrpc_import_setasync(imp, 1);
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
                break;
        }
        default:
                CERROR("Unknown import event %d\n", event);
                LBUG();
        }
        RETURN(rc);
}
3970
3971 int osc_setup(struct obd_device *obd, obd_count len, void *buf)
3972 {
3973         int rc;
3974         ENTRY;
3975
3976         ENTRY;
3977         rc = ptlrpcd_addref();
3978         if (rc)
3979                 RETURN(rc);
3980
3981         rc = client_obd_setup(obd, len, buf);
3982         if (rc) {
3983                 ptlrpcd_decref();
3984         } else {
3985                 struct lprocfs_static_vars lvars = { 0 };
3986                 struct client_obd *cli = &obd->u.cli;
3987
3988                 lprocfs_osc_init_vars(&lvars);
3989                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3990                         lproc_osc_attach_seqstat(obd);
3991                         ptlrpc_lprocfs_register_obd(obd);
3992                 }
3993
3994                 oscc_init(obd);
3995                 /* We need to allocate a few requests more, because
3996                    brw_interpret tries to create new requests before freeing
3997                    previous ones. Ideally we want to have 2x max_rpcs_in_flight
3998                    reserved, but I afraid that might be too much wasted RAM
3999                    in fact, so 2 is just my guess and still should work. */
4000                 cli->cl_import->imp_rq_pool =
4001                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4002                                             OST_MAXREQSIZE,
4003                                             ptlrpc_add_rqs_to_pool);
4004                 cli->cl_cache = cache_create(obd);
4005                 if (!cli->cl_cache) {
4006                         osc_cleanup(obd);
4007                         rc = -ENOMEM;
4008                 }
4009         }
4010
4011         RETURN(rc);
4012 }
4013
/* Staged pre-cleanup of an OSC device.  EARLY deactivates the import so
 * in-flight RPCs (e.g. an mds_lov_synchronize) are aborted; EXPORTS
 * destroys a never-connected import (freeing its request pool) and shuts
 * down the llog contexts.  SELF_EXP and OBD stages need no work here.
 * Returns 0 or the llog-finish error. */
static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
{
        int rc = 0;
        ENTRY;

        switch (stage) {
        case OBD_CLEANUP_EARLY: {
                struct obd_import *imp;
                imp = obd->u.cli.cl_import;
                CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
                /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
                ptlrpc_deactivate_import(imp);
                break;
        }
        case OBD_CLEANUP_EXPORTS: {
                /* If we set up but never connected, the
                   client import will not have been cleaned. */
                if (obd->u.cli.cl_import) {
                        struct obd_import *imp;
                        /* cl_sem serializes against concurrent (dis)connect. */
                        down_write(&obd->u.cli.cl_sem);
                        imp = obd->u.cli.cl_import;
                        CDEBUG(D_CONFIG, "%s: client import never connected\n",
                               obd->obd_name);
                        ptlrpc_invalidate_import(imp);
                        /* Free the pool before the import that owns it. */
                        if (imp->imp_rq_pool) {
                                ptlrpc_free_rq_pool(imp->imp_rq_pool);
                                imp->imp_rq_pool = NULL;
                        }
                        class_destroy_import(imp);
                        up_write(&obd->u.cli.cl_sem);
                        obd->u.cli.cl_import = NULL;
                }
                rc = obd_llog_finish(obd, 0);
                if (rc != 0)
                        CERROR("failed to cleanup llogging subsystems\n");
                break;
        }
        case OBD_CLEANUP_SELF_EXP:
                break;
        case OBD_CLEANUP_OBD:
                break;
        }
        RETURN(rc);
}
4058
/* Final teardown of an OSC device.  Unregisters procfs entries, marks the
 * object creator as exiting (so no new pre-creations start), releases the
 * quota cache and page cache, runs the generic client cleanup and drops
 * the ptlrpcd reference taken in osc_setup().  The order mirrors setup in
 * reverse.  Returns the generic cleanup result. */
int osc_cleanup(struct obd_device *obd)
{
        struct osc_creator *oscc = &obd->u.cli.cl_oscc;
        int rc;

        ENTRY;
        ptlrpc_lprocfs_unregister_obd(obd);
        lprocfs_obd_cleanup(obd);

        /* Flip the creator out of recovery and into exiting state. */
        spin_lock(&oscc->oscc_lock);
        oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
        oscc->oscc_flags |= OSCC_FLAG_EXITING;
        spin_unlock(&oscc->oscc_lock);

        /* free memory of osc quota cache */
        lquota_cleanup(quota_interface, obd);

        cache_destroy(obd->u.cli.cl_cache);
        rc = client_obd_cleanup(obd);

        ptlrpcd_decref();
        RETURN(rc);
}
4082
4083 static int osc_register_page_removal_cb(struct obd_export *exp,
4084                                         obd_page_removal_cb_t func,
4085                                         obd_pin_extent_cb pin_cb)
4086 {
4087         return cache_add_extent_removal_cb(exp->exp_obd->u.cli.cl_cache, func,
4088                                            pin_cb);
4089 }
4090
4091 static int osc_unregister_page_removal_cb(struct obd_export *exp,
4092                                           obd_page_removal_cb_t func)
4093 {
4094         return cache_del_extent_removal_cb(exp->exp_obd->u.cli.cl_cache, func);
4095 }
4096
4097 static int osc_register_lock_cancel_cb(struct obd_export *exp,
4098                                        obd_lock_cancel_cb cb)
4099 {
4100         LASSERT(exp->exp_obd->u.cli.cl_ext_lock_cancel_cb == NULL);
4101
4102         exp->exp_obd->u.cli.cl_ext_lock_cancel_cb = cb;
4103         return 0;
4104 }
4105
4106 static int osc_unregister_lock_cancel_cb(struct obd_export *exp,
4107                                          obd_lock_cancel_cb cb)
4108 {
4109         if (exp->exp_obd->u.cli.cl_ext_lock_cancel_cb != cb) {
4110                 CERROR("Unregistering cancel cb %p, while only %p was "
4111                        "registered\n", cb,
4112                        exp->exp_obd->u.cli.cl_ext_lock_cancel_cb);
4113                 RETURN(-EINVAL);
4114         }
4115
4116         exp->exp_obd->u.cli.cl_ext_lock_cancel_cb = NULL;
4117         return 0;
4118 }
4119
4120 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4121 {
4122         struct lustre_cfg *lcfg = buf;
4123         struct lprocfs_static_vars lvars = { 0 };
4124         int rc = 0;
4125
4126         lprocfs_osc_init_vars(&lvars);
4127
4128         rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars, lcfg, obd);
4129         return(rc);
4130 }
4131
/* Method table exported by the OSC; entries not defined in this chunk are
 * implemented earlier in this file or by the generic client code. */
struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        /* device lifecycle */
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup,
        /* connection management (mostly generic client helpers) */
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        /* statfs and striping metadata */
        .o_statfs               = osc_statfs,
        .o_statfs_async         = osc_statfs_async,
        .o_packmd               = osc_packmd,
        .o_unpackmd             = osc_unpackmd,
        /* object lifecycle and attributes */
        .o_precreate            = osc_precreate,
        .o_create               = osc_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_getattr_async        = osc_getattr_async,
        .o_setattr              = osc_setattr,
        .o_setattr_async        = osc_setattr_async,
        /* bulk I/O and async page handling */
        .o_brw                  = osc_brw,
        .o_brw_async            = osc_brw_async,
        .o_prep_async_page      = osc_prep_async_page,
        .o_reget_short_lock     = osc_reget_short_lock,
        .o_release_short_lock   = osc_release_short_lock,
        .o_queue_async_io       = osc_queue_async_io,
        .o_set_async_flags      = osc_set_async_flags,
        .o_queue_group_io       = osc_queue_group_io,
        .o_trigger_group_io     = osc_trigger_group_io,
        .o_teardown_async_page  = osc_teardown_async_page,
        .o_punch                = osc_punch,
        .o_sync                 = osc_sync,
        /* DLM locking */
        .o_enqueue              = osc_enqueue,
        .o_match                = osc_match,
        .o_change_cbdata        = osc_change_cbdata,
        .o_cancel               = osc_cancel,
        .o_cancel_unused        = osc_cancel_unused,
        .o_join_lru             = osc_join_lru,
        /* control, info and event plumbing */
        .o_iocontrol            = osc_iocontrol,
        .o_get_info             = osc_get_info,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_llog_init            = osc_llog_init,
        .o_llog_finish          = osc_llog_finish,
        .o_process_config       = osc_process_config,
        /* page cache / lock cancellation callback registration */
        .o_register_page_removal_cb = osc_register_page_removal_cb,
        .o_unregister_page_removal_cb = osc_unregister_page_removal_cb,
        .o_register_lock_cancel_cb = osc_register_lock_cancel_cb,
        .o_unregister_lock_cancel_cb = osc_unregister_lock_cancel_cb,
};
4183 int __init osc_init(void)
4184 {
4185         struct lprocfs_static_vars lvars = { 0 };
4186         int rc;
4187         ENTRY;
4188
4189         lprocfs_osc_init_vars(&lvars);
4190
4191         request_module("lquota");
4192         quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
4193         lquota_init(quota_interface);
4194         init_obd_quota_ops(quota_interface, &osc_obd_ops);
4195
4196         rc = class_register_type(&osc_obd_ops, lvars.module_vars,
4197                                  LUSTRE_OSC_NAME);
4198         if (rc) {
4199                 if (quota_interface)
4200                         PORTAL_SYMBOL_PUT(osc_quota_interface);
4201                 RETURN(rc);
4202         }
4203
4204         RETURN(rc);
4205 }
4206
4207 #ifdef __KERNEL__
/* Module exit: shut down the quota interface, drop its symbol reference
 * (taken in osc_init) if it was obtained, and unregister the OSC type. */
static void /*__exit*/ osc_exit(void)
{
        lquota_exit(quota_interface);
        if (quota_interface)
                PORTAL_SYMBOL_PUT(osc_quota_interface);

        class_unregister_type(LUSTRE_OSC_NAME);
}
4216
4217 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4218 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4219 MODULE_LICENSE("GPL");
4220
4221 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
4222 #endif