Whamcloud - gitweb
Branch: b1_8
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #ifndef EXPORT_SYMTAB
38 # define EXPORT_SYMTAB
39 #endif
40 #define DEBUG_SUBSYSTEM S_OSC
41
42 #ifdef __KERNEL__
43 # include <libcfs/libcfs.h>
44 #else /* __KERNEL__ */
45 # include <liblustre.h>
46 #endif
47
48 # include <lustre_dlm.h>
49 #include <libcfs/kp30.h>
50 #include <lustre_net.h>
51 #include <lustre/lustre_user.h>
52 #include <obd_ost.h>
53 #include <obd_lov.h>
54
55 #ifdef  __CYGWIN__
56 # include <ctype.h>
57 #endif
58
59 #include <lustre_ha.h>
60 #include <lprocfs_status.h>
61 #include <lustre_log.h>
62 #include <lustre_debug.h>
63 #include <lustre_param.h>
64 #include <lustre_cache.h>
65 #include "osc_internal.h"
66
67 static quota_interface_t *quota_interface = NULL;
68 extern quota_interface_t osc_quota_interface;
69
70 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
71 static int brw_interpret(struct ptlrpc_request *request, void *data, int rc);
72 int osc_cleanup(struct obd_device *obd);
73
74 static quota_interface_t *quota_interface;
75 extern quota_interface_t osc_quota_interface;
76
77 /* by default 10s */
78 atomic_t osc_resend_time;
79
80 /* Pack OSC object metadata for disk storage (LE byte order). */
81 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
82                       struct lov_stripe_md *lsm)
83 {
84         int lmm_size;
85         ENTRY;
86
87         lmm_size = sizeof(**lmmp);
88         if (!lmmp)
89                 RETURN(lmm_size);
90
91         if (*lmmp && !lsm) {
92                 OBD_FREE(*lmmp, lmm_size);
93                 *lmmp = NULL;
94                 RETURN(0);
95         }
96
97         if (!*lmmp) {
98                 OBD_ALLOC(*lmmp, lmm_size);
99                 if (!*lmmp)
100                         RETURN(-ENOMEM);
101         }
102
103         if (lsm) {
104                 LASSERT(lsm->lsm_object_id);
105                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
106         }
107
108         RETURN(lmm_size);
109 }
110
111 /* Unpack OSC object metadata from disk storage (LE byte order). */
112 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
113                         struct lov_mds_md *lmm, int lmm_bytes)
114 {
115         int lsm_size;
116         ENTRY;
117
118         if (lmm != NULL) {
119                 if (lmm_bytes < sizeof (*lmm)) {
120                         CERROR("lov_mds_md too small: %d, need %d\n",
121                                lmm_bytes, (int)sizeof(*lmm));
122                         RETURN(-EINVAL);
123                 }
124                 /* XXX LOV_MAGIC etc check? */
125
126                 if (lmm->lmm_object_id == 0) {
127                         CERROR("lov_mds_md: zero lmm_object_id\n");
128                         RETURN(-EINVAL);
129                 }
130         }
131
132         lsm_size = lov_stripe_md_size(1);
133         if (lsmp == NULL)
134                 RETURN(lsm_size);
135
136         if (*lsmp != NULL && lmm == NULL) {
137                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
138                 OBD_FREE(*lsmp, lsm_size);
139                 *lsmp = NULL;
140                 RETURN(0);
141         }
142
143         if (*lsmp == NULL) {
144                 OBD_ALLOC(*lsmp, lsm_size);
145                 if (*lsmp == NULL)
146                         RETURN(-ENOMEM);
147                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
148                 if ((*lsmp)->lsm_oinfo[0] == NULL) {
149                         OBD_FREE(*lsmp, lsm_size);
150                         RETURN(-ENOMEM);
151                 }
152                 loi_init((*lsmp)->lsm_oinfo[0]);
153         }
154
155         if (lmm != NULL) {
156                 /* XXX zero *lsmp? */
157                 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
158                 LASSERT((*lsmp)->lsm_object_id);
159         }
160
161         (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
162
163         RETURN(lsm_size);
164 }
165
166 static int osc_getattr_interpret(struct ptlrpc_request *req,
167                                  struct osc_async_args *aa, int rc)
168 {
169         struct ost_body *body;
170         ENTRY;
171
172         if (rc != 0)
173                 GOTO(out, rc);
174
175         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
176                                   lustre_swab_ost_body);
177         if (body) {
178                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
179                 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
180
181                 /* This should really be sent by the OST */
182                 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
183                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
184         } else {
185                 CERROR("can't unpack ost_body\n");
186                 rc = -EPROTO;
187                 aa->aa_oi->oi_oa->o_valid = 0;
188         }
189 out:
190         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
191         RETURN(rc);
192 }
193
194 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
195                              struct ptlrpc_request_set *set)
196 {
197         struct ptlrpc_request *req;
198         struct ost_body *body;
199         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
200         struct osc_async_args *aa;
201         ENTRY;
202
203         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
204                               OST_GETATTR, 2, size,NULL);
205         if (!req)
206                 RETURN(-ENOMEM);
207
208         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
209         memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
210
211         ptlrpc_req_set_repsize(req, 2, size);
212         req->rq_interpret_reply = osc_getattr_interpret;
213
214         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
215         aa = ptlrpc_req_async_args(req);
216         aa->aa_oi = oinfo;
217
218         ptlrpc_set_add_req(set, req);
219         RETURN (0);
220 }
221
222 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
223 {
224         struct ptlrpc_request *req;
225         struct ost_body *body;
226         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
227         int rc;
228         ENTRY;
229
230         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
231                               OST_GETATTR, 2, size, NULL);
232         if (!req)
233                 RETURN(-ENOMEM);
234
235         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
236         memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
237
238         ptlrpc_req_set_repsize(req, 2, size);
239
240         rc = ptlrpc_queue_wait(req);
241         if (rc) {
242                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
243                 GOTO(out, rc);
244         }
245
246         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
247                                   lustre_swab_ost_body);
248         if (body == NULL) {
249                 CERROR ("can't unpack ost_body\n");
250                 GOTO (out, rc = -EPROTO);
251         }
252
253         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
254         memcpy(oinfo->oi_oa, &body->oa, sizeof(*oinfo->oi_oa));
255
256         /* This should really be sent by the OST */
257         oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
258         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
259
260         EXIT;
261  out:
262         ptlrpc_req_finished(req);
263         return rc;
264 }
265
266 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
267                        struct obd_trans_info *oti)
268 {
269         struct ptlrpc_request *req;
270         struct ost_body *body;
271         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
272         int rc;
273         ENTRY;
274
275         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
276                               OST_SETATTR, 2, size, NULL);
277         if (!req)
278                 RETURN(-ENOMEM);
279
280         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
281         memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
282
283         ptlrpc_req_set_repsize(req, 2, size);
284
285         rc = ptlrpc_queue_wait(req);
286         if (rc)
287                 GOTO(out, rc);
288
289         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
290                                   lustre_swab_ost_body);
291         if (body == NULL)
292                 GOTO(out, rc = -EPROTO);
293
294         memcpy(oinfo->oi_oa, &body->oa, sizeof(*oinfo->oi_oa));
295
296         EXIT;
297 out:
298         ptlrpc_req_finished(req);
299         RETURN(rc);
300 }
301
302 static int osc_setattr_interpret(struct ptlrpc_request *req,
303                                  struct osc_async_args *aa, int rc)
304 {
305         struct ost_body *body;
306         ENTRY;
307
308         if (rc != 0)
309                 GOTO(out, rc);
310
311         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
312                                   lustre_swab_ost_body);
313         if (body == NULL) {
314                 CERROR("can't unpack ost_body\n");
315                 GOTO(out, rc = -EPROTO);
316         }
317
318         memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
319 out:
320         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
321         RETURN(rc);
322 }
323
324 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
325                              struct obd_trans_info *oti,
326                              struct ptlrpc_request_set *rqset)
327 {
328         struct ptlrpc_request *req;
329         struct ost_body *body;
330         __u32 size[3] = { sizeof(struct ptlrpc_body), sizeof(*body), 0 };
331         int bufcount = 2;
332         struct osc_async_args *aa;
333         ENTRY;
334
335         if (osc_exp_is_2_0_server(exp)) {
336                 bufcount = 3;
337         }
338
339         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
340                               OST_SETATTR, bufcount, size, NULL);
341         if (!req)
342                 RETURN(-ENOMEM);
343
344         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
345
346         if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
347                 LASSERT(oti);
348                 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
349         }
350
351         memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
352         ptlrpc_req_set_repsize(req, 2, size);
353         /* do mds to ost setattr asynchronouly */
354         if (!rqset) {
355                 /* Do not wait for response. */
356                 ptlrpcd_add_req(req);
357         } else {
358                 req->rq_interpret_reply = osc_setattr_interpret;
359
360                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
361                 aa = ptlrpc_req_async_args(req);
362                 aa->aa_oi = oinfo;
363
364                 ptlrpc_set_add_req(rqset, req);
365         }
366
367         RETURN(0);
368 }
369
370 int osc_real_create(struct obd_export *exp, struct obdo *oa,
371                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
372 {
373         struct ptlrpc_request *req;
374         struct ost_body *body;
375         struct lov_stripe_md *lsm;
376         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
377         int rc;
378         ENTRY;
379
380         LASSERT(oa);
381         LASSERT(ea);
382
383         lsm = *ea;
384         if (!lsm) {
385                 rc = obd_alloc_memmd(exp, &lsm);
386                 if (rc < 0)
387                         RETURN(rc);
388         }
389
390         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
391                               OST_CREATE, 2, size, NULL);
392         if (!req)
393                 GOTO(out, rc = -ENOMEM);
394
395         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
396         memcpy(&body->oa, oa, sizeof(body->oa));
397
398         ptlrpc_req_set_repsize(req, 2, size);
399         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
400             oa->o_flags == OBD_FL_DELORPHAN) {
401                 DEBUG_REQ(D_HA, req,
402                           "delorphan from OST integration");
403                 /* Don't resend the delorphan req */
404                 req->rq_no_resend = req->rq_no_delay = 1;
405         }
406
407         rc = ptlrpc_queue_wait(req);
408         if (rc)
409                 GOTO(out_req, rc);
410
411         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
412                                   lustre_swab_ost_body);
413         if (body == NULL) {
414                 CERROR ("can't unpack ost_body\n");
415                 GOTO (out_req, rc = -EPROTO);
416         }
417
418         memcpy(oa, &body->oa, sizeof(*oa));
419
420         /* This should really be sent by the OST */
421         oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
422         oa->o_valid |= OBD_MD_FLBLKSZ;
423
424         /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
425          * have valid lsm_oinfo data structs, so don't go touching that.
426          * This needs to be fixed in a big way.
427          */
428         lsm->lsm_object_id = oa->o_id;
429         *ea = lsm;
430
431         if (oti != NULL) {
432                 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
433
434                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
435                         if (!oti->oti_logcookies)
436                                 oti_alloc_cookies(oti, 1);
437                         *oti->oti_logcookies = oa->o_lcookie;
438                 }
439         }
440
441         CDEBUG(D_HA, "transno: "LPD64"\n",
442                lustre_msg_get_transno(req->rq_repmsg));
443 out_req:
444         ptlrpc_req_finished(req);
445 out:
446         if (rc && !*ea)
447                 obd_free_memmd(exp, &lsm);
448         RETURN(rc);
449 }
450
451 static int osc_punch_interpret(struct ptlrpc_request *req,
452                                struct osc_async_args *aa, int rc)
453 {
454         struct ost_body *body;
455         ENTRY;
456
457         if (rc != 0)
458                 GOTO(out, rc);
459
460         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof (*body),
461                                   lustre_swab_ost_body);
462         if (body == NULL) {
463                 CERROR ("can't unpack ost_body\n");
464                 GOTO(out, rc = -EPROTO);
465         }
466
467         memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
468 out:
469         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
470         RETURN(rc);
471 }
472
473 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
474                      struct obd_trans_info *oti,
475                      struct ptlrpc_request_set *rqset)
476 {
477         struct ptlrpc_request *req;
478         struct osc_async_args *aa;
479         struct ost_body *body;
480         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
481         ENTRY;
482
483         if (!oinfo->oi_oa) {
484                 CERROR("oa NULL\n");
485                 RETURN(-EINVAL);
486         }
487
488         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
489                               OST_PUNCH, 2, size, NULL);
490         if (!req)
491                 RETURN(-ENOMEM);
492
493         req->rq_request_portal = OST_IO_PORTAL;         /* bug 7198 */
494         ptlrpc_at_set_req_timeout(req);
495
496         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
497         memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
498
499         /* overload the size and blocks fields in the oa with start/end */
500         body->oa.o_size = oinfo->oi_policy.l_extent.start;
501         body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
502         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
503
504         ptlrpc_req_set_repsize(req, 2, size);
505
506         req->rq_interpret_reply = osc_punch_interpret;
507         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
508         aa = ptlrpc_req_async_args(req);
509         aa->aa_oi = oinfo;
510         ptlrpc_set_add_req(rqset, req);
511
512         RETURN(0);
513 }
514
515 static int osc_sync_interpret(struct ptlrpc_request *req,
516                               struct osc_async_args *aa, int rc)
517 {
518         struct ost_body *body;
519         ENTRY;
520
521         if (rc)
522                 GOTO(out, rc);
523
524         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
525                                   lustre_swab_ost_body);
526         if (body == NULL) {
527                 CERROR ("can't unpack ost_body\n");
528                 GOTO(out, rc = -EPROTO);
529         }
530
531         *aa->aa_oi->oi_oa = body->oa;
532 out:
533         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
534         RETURN(rc);
535 }
536
537 static int osc_sync(struct obd_export *exp, struct obd_info *oinfo,
538                     obd_size start, obd_size end,
539                     struct ptlrpc_request_set *set)
540 {
541         struct ptlrpc_request *req;
542         struct ost_body *body;
543         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
544         struct osc_async_args *aa;
545         ENTRY;
546
547         if (!oinfo->oi_oa) {
548                 CERROR("oa NULL\n");
549                 RETURN(-EINVAL);
550         }
551
552         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
553                               OST_SYNC, 2, size, NULL);
554         if (!req)
555                 RETURN(-ENOMEM);
556
557         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
558         memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
559
560         /* overload the size and blocks fields in the oa with start/end */
561         body->oa.o_size = start;
562         body->oa.o_blocks = end;
563         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
564
565         ptlrpc_req_set_repsize(req, 2, size);
566         req->rq_interpret_reply = osc_sync_interpret;
567
568         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
569         aa = ptlrpc_req_async_args(req);
570         aa->aa_oi = oinfo;
571
572         ptlrpc_set_add_req(set, req);
573         RETURN (0);
574 }
575
576 /* Find and cancel locally locks matched by @mode in the resource found by
577  * @objid. Found locks are added into @cancel list. Returns the amount of
578  * locks added to @cancels list. */
579 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
580                                    struct list_head *cancels, ldlm_mode_t mode,
581                                    int lock_flags)
582 {
583         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
584         struct ldlm_res_id res_id;
585         struct ldlm_resource *res;
586         int count;
587         ENTRY;
588
589         osc_build_res_name(oa->o_id, oa->o_gr, &res_id);
590         res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
591         if (res == NULL)
592                 RETURN(0);
593
594         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
595                                            lock_flags, 0, NULL);
596         ldlm_resource_putref(res);
597         RETURN(count);
598 }
599
600 static int osc_destroy_interpret(struct ptlrpc_request *req, void *data,
601                                  int rc)
602 {
603         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
604
605         atomic_dec(&cli->cl_destroy_in_flight);
606         cfs_waitq_signal(&cli->cl_destroy_waitq);
607         return 0;
608 }
609
610 static int osc_can_send_destroy(struct client_obd *cli)
611 {
612         if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
613             cli->cl_max_rpcs_in_flight) {
614                 /* The destroy request can be sent */
615                 return 1;
616         }
617         if (atomic_dec_return(&cli->cl_destroy_in_flight) <
618             cli->cl_max_rpcs_in_flight) {
619                 /*
620                  * The counter has been modified between the two atomic
621                  * operations.
622                  */
623                 cfs_waitq_signal(&cli->cl_destroy_waitq);
624         }
625         return 0;
626 }
627
628 /* Destroy requests can be async always on the client, and we don't even really
629  * care about the return code since the client cannot do anything at all about
630  * a destroy failure.
631  * When the MDS is unlinking a filename, it saves the file objects into a
632  * recovery llog, and these object records are cancelled when the OST reports
633  * they were destroyed and sync'd to disk (i.e. transaction committed).
634  * If the client dies, or the OST is down when the object should be destroyed,
635  * the records are not cancelled, and when the OST reconnects to the MDS next,
636  * it will retrieve the llog unlink logs and then sends the log cancellation
637  * cookies to the MDS after committing destroy transactions. */
638 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
639                        struct lov_stripe_md *ea, struct obd_trans_info *oti,
640                        struct obd_export *md_export)
641 {
642         CFS_LIST_HEAD(cancels);
643         struct ptlrpc_request *req;
644         struct ost_body *body;
645         __u32 size[3] = { sizeof(struct ptlrpc_body), sizeof(*body),
646                         sizeof(struct ldlm_request) };
647         int count, bufcount = 2;
648         struct client_obd *cli = &exp->exp_obd->u.cli;
649         ENTRY;
650
651         if (!oa) {
652                 CERROR("oa NULL\n");
653                 RETURN(-EINVAL);
654         }
655
656         LASSERT(oa->o_id != 0);
657
658         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
659                                         LDLM_FL_DISCARD_DATA);
660         if (exp_connect_cancelset(exp))
661                 bufcount = 3;
662         req = ldlm_prep_elc_req(exp, LUSTRE_OST_VERSION, OST_DESTROY, bufcount,
663                                 size, REQ_REC_OFF + 1, 0, &cancels, count);
664         if (!req)
665                 RETURN(-ENOMEM);
666
667         req->rq_request_portal = OST_IO_PORTAL;         /* bug 7198 */
668         ptlrpc_at_set_req_timeout(req);
669
670         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
671
672         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE) {
673                 oa->o_lcookie = *oti->oti_logcookies;
674         }
675
676         memcpy(&body->oa, oa, sizeof(*oa));
677         ptlrpc_req_set_repsize(req, 2, size);
678
679         /* don't throttle destroy RPCs for the MDT */
680         if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
681                 req->rq_interpret_reply = osc_destroy_interpret;
682                 if (!osc_can_send_destroy(cli)) {
683                         struct l_wait_info lwi = { 0 };
684
685                         /*
686                          * Wait until the number of on-going destroy RPCs drops
687                          * under max_rpc_in_flight
688                          */
689                         l_wait_event_exclusive(cli->cl_destroy_waitq,
690                                                osc_can_send_destroy(cli), &lwi);
691                 }
692         }
693
694         /* Do not wait for response */
695         ptlrpcd_add_req(req);
696         RETURN(0);
697 }
698
699 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
700                                 long writing_bytes)
701 {
702         obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
703
704         LASSERT(!(oa->o_valid & bits));
705
706         oa->o_valid |= bits;
707         client_obd_list_lock(&cli->cl_loi_list_lock);
708         oa->o_dirty = cli->cl_dirty;
709         if (cli->cl_dirty > cli->cl_dirty_max) {
710                 CERROR("dirty %lu > dirty_max %lu\n",
711                        cli->cl_dirty, cli->cl_dirty_max);
712                 oa->o_undirty = 0;
713         } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
714                 CERROR("dirty %d > system dirty_max %d\n",
715                        atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
716                 oa->o_undirty = 0;
717         } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
718                 CERROR("dirty %lu - dirty_max %lu too big???\n",
719                        cli->cl_dirty, cli->cl_dirty_max);
720                 oa->o_undirty = 0;
721         } else {
722                 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
723                                 (cli->cl_max_rpcs_in_flight + 1);
724                 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
725         }
726         oa->o_grant = cli->cl_avail_grant;
727         oa->o_dropped = cli->cl_lost_grant;
728         cli->cl_lost_grant = 0;
729         client_obd_list_unlock(&cli->cl_loi_list_lock);
730         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
731                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
732
733 }
734
735 static void osc_update_next_shrink(struct client_obd *cli)
736 {
737         int time = GRANT_SHRINK_INTERVAL;
738         cli->cl_next_shrink_grant = cfs_time_shift(time);
739         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
740                cli->cl_next_shrink_grant);
741 }
742
743 /* caller must hold loi_list_lock */
744 static void osc_consume_write_grant(struct client_obd *cli,struct brw_page *pga)
745 {
746         atomic_inc(&obd_dirty_pages);
747         cli->cl_dirty += CFS_PAGE_SIZE;
748         cli->cl_avail_grant -= CFS_PAGE_SIZE;
749         pga->flag |= OBD_BRW_FROM_GRANT;
750         CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
751                CFS_PAGE_SIZE, pga, pga->pg);
752         LASSERTF(cli->cl_avail_grant >= 0, "invalid avail grant is %ld \n",
753                  cli->cl_avail_grant);
754         osc_update_next_shrink(cli);
755 }
756
757 /* the companion to osc_consume_write_grant, called when a brw has completed.
758  * must be called with the loi lock held. */
759 static void osc_release_write_grant(struct client_obd *cli,
760                                     struct brw_page *pga, int sent)
761 {
762         int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
763         ENTRY;
764
765         if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
766                 EXIT;
767                 return;
768         }
769
770         pga->flag &= ~OBD_BRW_FROM_GRANT;
771         atomic_dec(&obd_dirty_pages);
772         cli->cl_dirty -= CFS_PAGE_SIZE;
773         if (!sent) {
774                 cli->cl_lost_grant += CFS_PAGE_SIZE;
775                 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
776                        cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
777         } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
778                 /* For short writes we shouldn't count parts of pages that
779                  * span a whole block on the OST side, or our accounting goes
780                  * wrong.  Should match the code in filter_grant_check. */
781                 int offset = pga->off & ~CFS_PAGE_MASK;
782                 int count = pga->count + (offset & (blocksize - 1));
783                 int end = (offset + pga->count) & (blocksize - 1);
784                 if (end)
785                         count += blocksize - end;
786
787                 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
788                 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
789                        CFS_PAGE_SIZE - count, cli->cl_lost_grant,
790                        cli->cl_avail_grant, cli->cl_dirty);
791         }
792
793         EXIT;
794 }
795
796 static unsigned long rpcs_in_flight(struct client_obd *cli)
797 {
798         return cli->cl_r_in_flight + cli->cl_w_in_flight;
799 }
800
801 /* caller must hold loi_list_lock */
802 void osc_wake_cache_waiters(struct client_obd *cli)
803 {
804         struct list_head *l, *tmp;
805         struct osc_cache_waiter *ocw;
806
807         ENTRY;
808         list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
809                 /* if we can't dirty more, we must wait until some is written */
810                 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
811                    ((atomic_read(&obd_dirty_pages)+1)>(obd_max_dirty_pages))) {
812                         CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
813                                "osc max %ld, sys max %d\n", cli->cl_dirty,
814                                cli->cl_dirty_max, obd_max_dirty_pages);
815                         return;
816                 }
817
818                 /* if still dirty cache but no grant wait for pending RPCs that
819                  * may yet return us some grant before doing sync writes */
820                 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
821                         CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
822                                cli->cl_w_in_flight);
823                         return;
824                 }
825
826                 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
827                 list_del_init(&ocw->ocw_entry);
828                 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
829                         /* no more RPCs in flight to return grant, do sync IO */
830                         ocw->ocw_rc = -EDQUOT;
831                         CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
832                 } else {
833                         osc_consume_write_grant(cli,
834                                                 &ocw->ocw_oap->oap_brw_page);
835                 }
836
837                 cfs_waitq_signal(&ocw->ocw_waitq);
838         }
839
840         EXIT;
841 }
842
843 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
844 {
845         client_obd_list_lock(&cli->cl_loi_list_lock);
846         CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
847         if (body->oa.o_valid & OBD_MD_FLGRANT)
848                 cli->cl_avail_grant += body->oa.o_grant;
849         /* waiters are woken in brw_interpret */
850         client_obd_list_unlock(&cli->cl_loi_list_lock);
851 }
852
853 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
854                               void *key, obd_count vallen, void *val,
855                               struct ptlrpc_request_set *set);
856
857 static int osc_shrink_grant_interpret(struct ptlrpc_request *req,
858                                       struct osc_grant_args *aa, int rc)
859 {
860         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
861         struct obdo *oa = aa->aa_oa;
862         struct ost_body *body;
863         
864         if (rc != 0) {
865                 client_obd_list_lock(&cli->cl_loi_list_lock);
866                 cli->cl_avail_grant += oa->o_grant;
867                 client_obd_list_unlock(&cli->cl_loi_list_lock);
868                 GOTO(out, rc);
869         }
870         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*oa),
871                                 lustre_swab_ost_body);
872         osc_update_grant(cli, body);
873 out:
874         OBD_FREE_PTR(oa);
875         return rc;        
876 }
877
878 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
879 {
880         client_obd_list_lock(&cli->cl_loi_list_lock);
881         oa->o_grant = cli->cl_avail_grant / 4;
882         cli->cl_avail_grant -= oa->o_grant; 
883         client_obd_list_unlock(&cli->cl_loi_list_lock);
884         oa->o_flags |= OBD_FL_SHRINK_GRANT;
885         osc_update_next_shrink(cli);
886 }
887
888 static int osc_shrink_grant(struct client_obd *cli)
889 {
890         int    rc = 0;
891         struct ost_body     *body;
892         ENTRY;
893
894         OBD_ALLOC_PTR(body);
895         if (!body)
896                 RETURN(-ENOMEM);
897
898         osc_announce_cached(cli, &body->oa, 0);
899         osc_shrink_grant_local(cli, &body->oa);
900         rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
901                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
902                                 sizeof(*body), body, NULL);
903         if (rc) {
904                 client_obd_list_lock(&cli->cl_loi_list_lock);
905                 cli->cl_avail_grant += body->oa.o_grant;
906                 client_obd_list_unlock(&cli->cl_loi_list_lock);
907         }
908         if (body)
909                OBD_FREE_PTR(body);
910         RETURN(rc);
911 }
912
913 #define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
914 static int osc_should_shrink_grant(struct client_obd *client)
915 {
916         cfs_time_t time = cfs_time_current();
917         cfs_time_t next_shrink = client->cl_next_shrink_grant;
918         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
919                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
920                     client->cl_avail_grant > GRANT_SHRINK_LIMIT)
921                         return 1;
922                 else
923                         osc_update_next_shrink(client);
924         }
925         return 0;
926 }
927
928 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
929 {
930         struct client_obd *client;
931
932         list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
933                 if (osc_should_shrink_grant(client))
934                         osc_shrink_grant(client);
935         }
936         return 0;
937 }
938
939 static int osc_add_shrink_grant(struct client_obd *client)
940 {
941         int rc;
942
943         rc = ptlrpc_add_timeout_client(GRANT_SHRINK_INTERVAL, 
944                                          TIMEOUT_GRANT,
945                                          osc_grant_shrink_grant_cb, NULL,
946                                          &client->cl_grant_shrink_list);
947         if (rc) {
948                 CERROR("add grant client %s error %d\n", 
949                         client->cl_import->imp_obd->obd_name, rc);
950                 return rc;
951         }
952         CDEBUG(D_CACHE, "add grant client %s \n", 
953                client->cl_import->imp_obd->obd_name);
954         osc_update_next_shrink(client);
955         return 0; 
956 }
957
958 static int osc_del_shrink_grant(struct client_obd *client)
959 {
960         CDEBUG(D_CACHE, "del grant client %s \n", 
961                client->cl_import->imp_obd->obd_name);
962         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list);
963 }
964
965 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
966 {
967         client_obd_list_lock(&cli->cl_loi_list_lock);
968         cli->cl_avail_grant = ocd->ocd_grant;
969         client_obd_list_unlock(&cli->cl_loi_list_lock);
970
971         if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
972             list_empty(&cli->cl_grant_shrink_list))
973                 osc_add_shrink_grant(cli);
974
975         CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld \n",
976                cli->cl_avail_grant, cli->cl_lost_grant);
977         LASSERT(cli->cl_avail_grant >= 0);
978 }
979
980 /* We assume that the reason this OSC got a short read is because it read
981  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
982  * via the LOV, and it _knows_ it's reading inside the file, it's just that
983  * this stripe never got written at or beyond this stripe offset yet. */
984 static void handle_short_read(int nob_read, obd_count page_count,
985                               struct brw_page **pga)
986 {
987         char *ptr;
988         int i = 0;
989
990         /* skip bytes read OK */
991         while (nob_read > 0) {
992                 LASSERT (page_count > 0);
993
994                 if (pga[i]->count > nob_read) {
995                         /* EOF inside this page */
996                         ptr = cfs_kmap(pga[i]->pg) +
997                                 (pga[i]->off & ~CFS_PAGE_MASK);
998                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
999                         cfs_kunmap(pga[i]->pg);
1000                         page_count--;
1001                         i++;
1002                         break;
1003                 }
1004
1005                 nob_read -= pga[i]->count;
1006                 page_count--;
1007                 i++;
1008         }
1009
1010         /* zero remaining pages */
1011         while (page_count-- > 0) {
1012                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1013                 memset(ptr, 0, pga[i]->count);
1014                 cfs_kunmap(pga[i]->pg);
1015                 i++;
1016         }
1017 }
1018
1019 static int check_write_rcs(struct ptlrpc_request *req,
1020                            int requested_nob, int niocount,
1021                            obd_count page_count, struct brw_page **pga)
1022 {
1023         int    *remote_rcs, i;
1024
1025         /* return error if any niobuf was in error */
1026         remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
1027                                         sizeof(*remote_rcs) * niocount, NULL);
1028         if (remote_rcs == NULL) {
1029                 CERROR("Missing/short RC vector on BRW_WRITE reply\n");
1030                 return(-EPROTO);
1031         }
1032         if (lustre_rep_need_swab(req))
1033                 for (i = 0; i < niocount; i++)
1034                         __swab32s(&remote_rcs[i]);
1035
1036         for (i = 0; i < niocount; i++) {
1037                 if (remote_rcs[i] < 0)
1038                         return(remote_rcs[i]);
1039
1040                 if (remote_rcs[i] != 0) {
1041                         CERROR("rc[%d] invalid (%d) req %p\n",
1042                                 i, remote_rcs[i], req);
1043                         return(-EPROTO);
1044                 }
1045         }
1046
1047         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1048                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1049                        req->rq_bulk->bd_nob_transferred, requested_nob);
1050                 return(-EPROTO);
1051         }
1052
1053         return (0);
1054 }
1055
1056 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1057 {
1058         if (p1->flag != p2->flag) {
1059                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_ASYNC);
1060
1061                 /* warn if we try to combine flags that we don't know to be
1062                  * safe to combine */
1063                 if ((p1->flag & mask) != (p2->flag & mask))
1064                         CERROR("is it ok to have flags 0x%x and 0x%x in the "
1065                                "same brw?\n", p1->flag, p2->flag);
1066                 return 0;
1067         }
1068
1069         return (p1->off + p1->count == p2->off);
1070 }
1071
1072 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1073                                    struct brw_page **pga, int opc,
1074                                    cksum_type_t cksum_type)
1075 {
1076         __u32 cksum;
1077         int i = 0;
1078
1079         LASSERT (pg_count > 0);
1080         cksum = init_checksum(cksum_type);
1081         while (nob > 0 && pg_count > 0) {
1082                 unsigned char *ptr = cfs_kmap(pga[i]->pg);
1083                 int off = pga[i]->off & ~CFS_PAGE_MASK;
1084                 int count = pga[i]->count > nob ? nob : pga[i]->count;
1085
1086                 /* corrupt the data before we compute the checksum, to
1087                  * simulate an OST->client data error */
1088                 if (i == 0 && opc == OST_READ &&
1089                     OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
1090                         memcpy(ptr + off, "bad1", min(4, nob));
1091                 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
1092                 cfs_kunmap(pga[i]->pg);
1093                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1094                                off, cksum);
1095
1096                 nob -= pga[i]->count;
1097                 pg_count--;
1098                 i++;
1099         }
1100         /* For sending we only compute the wrong checksum instead
1101          * of corrupting the data so it is still correct on a redo */
1102         if (opc == OST_WRITE && OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_SEND))
1103                 cksum++;
1104
1105         return cksum;
1106 }
1107
1108 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1109                                 struct lov_stripe_md *lsm, obd_count page_count,
1110                                 struct brw_page **pga,
1111                                 struct ptlrpc_request **reqp)
1112 {
1113         struct ptlrpc_request   *req;
1114         struct ptlrpc_bulk_desc *desc;
1115         struct ost_body         *body;
1116         struct obd_ioobj        *ioobj;
1117         struct niobuf_remote    *niobuf;
1118         __u32 size[4] = { sizeof(struct ptlrpc_body), sizeof(*body) };
1119         int niocount, i, requested_nob, opc, rc;
1120         struct ptlrpc_request_pool *pool;
1121         struct osc_brw_async_args *aa;
1122         struct brw_page *pg_prev;
1123
1124         ENTRY;
1125         OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM); /* Recoverable */
1126         OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ2, -EINVAL); /* Fatal */
1127
1128         opc = ((cmd & OBD_BRW_WRITE) != 0) ? OST_WRITE : OST_READ;
1129         pool = ((cmd & OBD_BRW_WRITE) != 0) ? cli->cl_import->imp_rq_pool :NULL;
1130
1131         for (niocount = i = 1; i < page_count; i++) {
1132                 if (!can_merge_pages(pga[i - 1], pga[i]))
1133                         niocount++;
1134         }
1135
1136         size[REQ_REC_OFF + 1] = sizeof(*ioobj);
1137         size[REQ_REC_OFF + 2] = niocount * sizeof(*niobuf);
1138
1139         req = ptlrpc_prep_req_pool(cli->cl_import, LUSTRE_OST_VERSION, opc, 4, size,
1140                                    NULL, pool);
1141         if (req == NULL)
1142                 RETURN (-ENOMEM);
1143
1144         req->rq_request_portal = OST_IO_PORTAL;         /* bug 7198 */
1145         ptlrpc_at_set_req_timeout(req);
1146
1147         if (opc == OST_WRITE)
1148                 desc = ptlrpc_prep_bulk_imp (req, page_count,
1149                                              BULK_GET_SOURCE, OST_BULK_PORTAL);
1150         else
1151                 desc = ptlrpc_prep_bulk_imp (req, page_count,
1152                                              BULK_PUT_SINK, OST_BULK_PORTAL);
1153         if (desc == NULL)
1154                 GOTO(out, rc = -ENOMEM);
1155         /* NB request now owns desc and will free it when it gets freed */
1156
1157         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
1158         ioobj = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioobj));
1159         niobuf = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1160                                 niocount * sizeof(*niobuf));
1161
1162         memcpy(&body->oa, oa, sizeof(*oa));
1163
1164         obdo_to_ioobj(oa, ioobj);
1165         ioobj->ioo_bufcnt = niocount;
1166
1167         LASSERT (page_count > 0);
1168         pg_prev = pga[0];
1169         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1170                 struct brw_page *pg = pga[i];
1171
1172                 LASSERT(pg->count > 0);
1173                 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
1174                          "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
1175                          pg->off, pg->count);
1176 #ifdef __linux__
1177                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1178                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1179                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1180                          i, page_count,
1181                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1182                          pg_prev->pg, page_private(pg_prev->pg),
1183                          pg_prev->pg->index, pg_prev->off);
1184 #else
1185                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1186                          "i %d p_c %u\n", i, page_count);
1187 #endif
1188                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1189                         (pg->flag & OBD_BRW_SRVLOCK));
1190
1191                 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
1192                                       pg->count);
1193                 requested_nob += pg->count;
1194
1195                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1196                         niobuf--;
1197                         niobuf->len += pg->count;
1198                 } else {
1199                         niobuf->offset = pg->off;
1200                         niobuf->len    = pg->count;
1201                         niobuf->flags  = pg->flag;
1202                 }
1203                 pg_prev = pg;
1204         }
1205
1206         LASSERTF((void *)(niobuf - niocount) ==
1207                 lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1208                                niocount * sizeof(*niobuf)),
1209                 "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg,
1210                 REQ_REC_OFF + 2, niocount * sizeof(*niobuf)),
1211                 (void *)(niobuf - niocount));
1212
1213         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1214         if (osc_should_shrink_grant(cli))
1215                 osc_shrink_grant_local(cli, &body->oa); 
1216
1217         /* size[REQ_REC_OFF] still sizeof (*body) */
1218         if (opc == OST_WRITE) {
1219                 if (cli->cl_checksum) {
1220                         /* store cl_cksum_type in a local variable since
1221                          * it can be changed via lprocfs */
1222                         cksum_type_t cksum_type = cli->cl_cksum_type;
1223
1224                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1225                                 oa->o_flags = body->oa.o_flags = 0;
1226                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1227                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1228                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1229                                                              page_count, pga,
1230                                                              OST_WRITE,
1231                                                              cksum_type);
1232                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1233                                body->oa.o_cksum);
1234                         /* save this in 'oa', too, for later checking */
1235                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1236                         oa->o_flags |= cksum_type_pack(cksum_type);
1237                 } else {
1238                         /* clear out the checksum flag, in case this is a
1239                          * resend but cl_checksum is no longer set. b=11238 */
1240                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1241                 }
1242                 oa->o_cksum = body->oa.o_cksum;
1243                 /* 1 RC per niobuf */
1244                 size[REPLY_REC_OFF + 1] = sizeof(__u32) * niocount;
1245                 ptlrpc_req_set_repsize(req, 3, size);
1246         } else {
1247                 if (cli->cl_checksum) {
1248                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1249                                 body->oa.o_flags = 0;
1250                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1251                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1252                 }
1253                 /* 1 RC for the whole I/O */
1254                 ptlrpc_req_set_repsize(req, 2, size);
1255         }
1256
1257         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1258         aa = ptlrpc_req_async_args(req);
1259         aa->aa_oa = oa;
1260         aa->aa_requested_nob = requested_nob;
1261         aa->aa_nio_count = niocount;
1262         aa->aa_page_count = page_count;
1263         aa->aa_resends = 0;
1264         aa->aa_ppga = pga;
1265         aa->aa_cli = cli;
1266         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1267
1268         *reqp = req;
1269         RETURN (0);
1270
1271  out:
1272         ptlrpc_req_finished (req);
1273         RETURN (rc);
1274 }
1275
1276 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1277                                 __u32 client_cksum, __u32 server_cksum, int nob,
1278                                 obd_count page_count, struct brw_page **pga,
1279                                 cksum_type_t client_cksum_type)
1280 {
1281         __u32 new_cksum;
1282         char *msg;
1283         cksum_type_t cksum_type;
1284
1285         if (server_cksum == client_cksum) {
1286                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1287                 return 0;
1288         }
1289
1290         if (oa->o_valid & OBD_MD_FLFLAGS)
1291                 cksum_type = cksum_type_unpack(oa->o_flags);
1292         else
1293                 cksum_type = OBD_CKSUM_CRC32;
1294
1295         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1296                                       cksum_type);
1297
1298         if (cksum_type != client_cksum_type)
1299                 msg = "the server did not use the checksum type specified in "
1300                       "the original request - likely a protocol problem";
1301         else if (new_cksum == server_cksum)
1302                 msg = "changed on the client after we checksummed it - "
1303                       "likely false positive due to mmap IO (bug 11742)";
1304         else if (new_cksum == client_cksum)
1305                 msg = "changed in transit before arrival at OST";
1306         else
1307                 msg = "changed in transit AND doesn't match the original - "
1308                       "likely false positive due to mmap IO (bug 11742)";
1309
1310         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1311                            LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1312                            "["LPU64"-"LPU64"]\n",
1313                            msg, libcfs_nid2str(peer->nid),
1314                            oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1315                            oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
1316                                                         (__u64)0,
1317                            oa->o_id,
1318                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1319                            pga[0]->off,
1320                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1321         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1322                "client csum now %x\n", client_cksum, client_cksum_type,
1323                server_cksum, cksum_type, new_cksum);
1324
1325         return 1;
1326 }
1327
1328 /* Note rc enters this function as number of bytes transferred */
1329 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1330 {
1331         struct osc_brw_async_args *aa = ptlrpc_req_async_args(req);
1332         const lnet_process_id_t *peer =
1333                         &req->rq_import->imp_connection->c_peer;
1334         struct client_obd *cli = aa->aa_cli;
1335         struct ost_body *body;
1336         __u32 client_cksum = 0;
1337         ENTRY;
1338
1339         if (rc < 0 && rc != -EDQUOT)
1340                 RETURN(rc);
1341
1342         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1343         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1344                                   lustre_swab_ost_body);
1345         if (body == NULL) {
1346                 CERROR ("Can't unpack body\n");
1347                 RETURN(-EPROTO);
1348         }
1349
1350         /* set/clear over quota flag for a uid/gid */
1351         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1352             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1353                 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1354                              body->oa.o_gid, body->oa.o_valid,
1355                              body->oa.o_flags);
1356
1357         if (rc < 0)
1358                 RETURN(rc);
1359
1360         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1361                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1362
1363         osc_update_grant(cli, body);
1364
1365         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1366                 if (rc > 0) {
1367                         CERROR ("Unexpected +ve rc %d\n", rc);
1368                         RETURN(-EPROTO);
1369                 }
1370                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1371
1372                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1373                     check_write_checksum(&body->oa, peer, client_cksum,
1374                                          body->oa.o_cksum, aa->aa_requested_nob,
1375                                          aa->aa_page_count, aa->aa_ppga,
1376                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1377                         RETURN(-EAGAIN);
1378
1379                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1380                                      aa->aa_page_count, aa->aa_ppga);
1381                 GOTO(out, rc);
1382         }
1383
1384         /* The rest of this function executes only for OST_READs */
1385         if (rc > aa->aa_requested_nob) {
1386                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1387                        aa->aa_requested_nob);
1388                 RETURN(-EPROTO);
1389         }
1390
1391         if (rc != req->rq_bulk->bd_nob_transferred) {
1392                 CERROR ("Unexpected rc %d (%d transferred)\n",
1393                         rc, req->rq_bulk->bd_nob_transferred);
1394                 return (-EPROTO);
1395         }
1396
1397         if (rc < aa->aa_requested_nob)
1398                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1399
1400         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1401                 static int cksum_counter;
1402                 __u32      server_cksum = body->oa.o_cksum;
1403                 char      *via;
1404                 char      *router;
1405                 cksum_type_t cksum_type;
1406
1407                 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1408                         cksum_type = cksum_type_unpack(body->oa.o_flags);
1409                 else
1410                         cksum_type = OBD_CKSUM_CRC32;
1411                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1412                                                  aa->aa_ppga, OST_READ,
1413                                                  cksum_type);
1414
1415                 if (peer->nid == req->rq_bulk->bd_sender) {
1416                         via = router = "";
1417                 } else {
1418                         via = " via ";
1419                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1420                 }
1421
1422                 if (server_cksum == ~0 && rc > 0) {
1423                         CERROR("Protocol error: server %s set the 'checksum' "
1424                                "bit, but didn't send a checksum.  Not fatal, "
1425                                "but please notify on http://bugzilla.lustre.org/\n",
1426                                libcfs_nid2str(peer->nid));
1427                 } else if (server_cksum != client_cksum) {
1428                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1429                                            "%s%s%s inum "LPU64"/"LPU64" object "
1430                                            LPU64"/"LPU64" extent "
1431                                            "["LPU64"-"LPU64"]\n",
1432                                            req->rq_import->imp_obd->obd_name,
1433                                            libcfs_nid2str(peer->nid),
1434                                            via, router,
1435                                            body->oa.o_valid & OBD_MD_FLFID ?
1436                                                 body->oa.o_fid : (__u64)0,
1437                                            body->oa.o_valid & OBD_MD_FLFID ?
1438                                                 body->oa.o_generation :(__u64)0,
1439                                            body->oa.o_id,
1440                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1441                                                 body->oa.o_gr : (__u64)0,
1442                                            aa->aa_ppga[0]->off,
1443                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1444                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1445                                                                         1);
1446                         CERROR("client %x, server %x, cksum_type %x\n",
1447                                client_cksum, server_cksum, cksum_type);
1448                         cksum_counter = 0;
1449                         aa->aa_oa->o_cksum = client_cksum;
1450                         rc = -EAGAIN;
1451                 } else {
1452                         cksum_counter++;
1453                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1454                         rc = 0;
1455                 }
1456         } else if (unlikely(client_cksum)) {
1457                 static int cksum_missed;
1458
1459                 cksum_missed++;
1460                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1461                         CERROR("Checksum %u requested from %s but not sent\n",
1462                                cksum_missed, libcfs_nid2str(peer->nid));
1463         } else {
1464                 rc = 0;
1465         }
1466 out:
1467         if (rc >= 0)
1468                 memcpy(aa->aa_oa, &body->oa, sizeof(*aa->aa_oa));
1469
1470         RETURN(rc);
1471 }
1472
1473 static int osc_brw_internal(int cmd, struct obd_export *exp,struct obdo *oa,
1474                             struct lov_stripe_md *lsm,
1475                             obd_count page_count, struct brw_page **pga)
1476 {
1477         struct ptlrpc_request *request;
1478         int                    rc;
1479         cfs_waitq_t            waitq;
1480         int                    resends = 0;
1481         struct l_wait_info     lwi;
1482
1483         ENTRY;
1484         init_waitqueue_head(&waitq);
1485
1486 restart_bulk:
1487         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1488                                   page_count, pga, &request);
1489         if (rc != 0)
1490                 return (rc);
1491
1492         rc = ptlrpc_queue_wait(request);
1493
1494         if (rc == -ETIMEDOUT && request->rq_resend) {
1495                 DEBUG_REQ(D_HA, request,  "BULK TIMEOUT");
1496                 ptlrpc_req_finished(request);
1497                 goto restart_bulk;
1498         }
1499
1500         rc = osc_brw_fini_request(request, rc);
1501
1502         ptlrpc_req_finished(request);
1503         if (osc_recoverable_error(rc)) {
1504                 resends++;
1505                 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1506                         CERROR("too many resend retries, returning error\n");
1507                         RETURN(-EIO);
1508                 }
1509
1510                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1511                 l_wait_event(waitq, 0, &lwi);
1512
1513                 goto restart_bulk;
1514         }
1515         RETURN(rc);
1516 }
1517
1518 int osc_brw_redo_request(struct ptlrpc_request *request,
1519                          struct osc_brw_async_args *aa)
1520 {
1521         struct ptlrpc_request *new_req;
1522         struct ptlrpc_request_set *set = request->rq_set;
1523         struct osc_brw_async_args *new_aa;
1524         struct osc_async_page *oap;
1525         int rc = 0;
1526         ENTRY;
1527
1528         if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1529                 CERROR("too many resend retries, returning error\n");
1530                 RETURN(-EIO);
1531         }
1532
1533         DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1534
1535         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1536                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1537                                   aa->aa_cli, aa->aa_oa,
1538                                   NULL /* lsm unused by osc currently */,
1539                                   aa->aa_page_count, aa->aa_ppga, &new_req);
1540         if (rc)
1541                 RETURN(rc);
1542
1543         client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1544
1545         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1546                 if (oap->oap_request != NULL) {
1547                         LASSERTF(request == oap->oap_request,
1548                                  "request %p != oap_request %p\n",
1549                                  request, oap->oap_request);
1550                         if (oap->oap_interrupted) {
1551                                 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1552                                 ptlrpc_req_finished(new_req);
1553                                 RETURN(-EINTR);
1554                         }
1555                 }
1556         }
1557         /* New request takes over pga and oaps from old request.
1558          * Note that copying a list_head doesn't work, need to move it... */
1559         aa->aa_resends++;
1560         new_req->rq_interpret_reply = request->rq_interpret_reply;
1561         new_req->rq_async_args = request->rq_async_args;
1562         new_req->rq_sent = CURRENT_SECONDS + aa->aa_resends;
1563
1564         new_aa = ptlrpc_req_async_args(new_req);
1565
1566         CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1567         list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1568         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1569
1570         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1571                 if (oap->oap_request) {
1572                         ptlrpc_req_finished(oap->oap_request);
1573                         oap->oap_request = ptlrpc_request_addref(new_req);
1574                 }
1575         }
1576
1577         /* use ptlrpc_set_add_req is safe because interpret functions work
1578          * in check_set context. only one way exist with access to request
1579          * from different thread got -EINTR - this way protected with
1580          * cl_loi_list_lock */
1581         ptlrpc_set_add_req(set, new_req);
1582
1583         client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1584
1585         DEBUG_REQ(D_INFO, new_req, "new request");
1586         RETURN(0);
1587 }
1588
1589 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1590                           struct lov_stripe_md *lsm, obd_count page_count,
1591                           struct brw_page **pga, struct ptlrpc_request_set *set)
1592 {
1593         struct ptlrpc_request     *request;
1594         struct client_obd         *cli = &exp->exp_obd->u.cli;
1595         int                        rc, i;
1596         struct osc_brw_async_args *aa;
1597         ENTRY;
1598
1599         /* Consume write credits even if doing a sync write -
1600          * otherwise we may run out of space on OST due to grant. */
1601         if (cmd == OBD_BRW_WRITE) {
1602                 client_obd_list_lock(&cli->cl_loi_list_lock);
1603                 for (i = 0; i < page_count; i++) {
1604                         if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
1605                                 osc_consume_write_grant(cli, pga[i]);
1606                 }
1607                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1608         }
1609
1610         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1611                                   page_count, pga, &request);
1612
1613         CLASSERT(sizeof(*aa) <= sizeof(request->rq_async_args));
1614         aa = ptlrpc_req_async_args(request);
1615         if (cmd == OBD_BRW_READ) {
1616                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1617                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1618         } else {
1619                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1620                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
1621                                  cli->cl_w_in_flight);
1622         }
1623         ptlrpc_lprocfs_brw(request, aa->aa_requested_nob);
1624
1625         LASSERT(list_empty(&aa->aa_oaps));
1626
1627         if (rc == 0) {
1628                 request->rq_interpret_reply = brw_interpret;
1629                 ptlrpc_set_add_req(set, request);
1630                 client_obd_list_lock(&cli->cl_loi_list_lock);
1631                 if (cmd == OBD_BRW_READ)
1632                         cli->cl_r_in_flight++;
1633                 else
1634                         cli->cl_w_in_flight++;
1635                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1636                 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DIO_PAUSE, 3);
1637         } else if (cmd == OBD_BRW_WRITE) {
1638                 client_obd_list_lock(&cli->cl_loi_list_lock);
1639                 for (i = 0; i < page_count; i++)
1640                         osc_release_write_grant(cli, pga[i], 0);
1641                 osc_wake_cache_waiters(cli);
1642                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1643         }
1644
1645         RETURN (rc);
1646 }
1647
1648 /*
1649  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1650  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1651  * fine for our small page arrays and doesn't require allocation.  its an
1652  * insertion sort that swaps elements that are strides apart, shrinking the
1653  * stride down until its '1' and the array is sorted.
1654  */
1655 static void sort_brw_pages(struct brw_page **array, int num)
1656 {
1657         int stride, i, j;
1658         struct brw_page *tmp;
1659
1660         if (num == 1)
1661                 return;
1662         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1663                 ;
1664
1665         do {
1666                 stride /= 3;
1667                 for (i = stride ; i < num ; i++) {
1668                         tmp = array[i];
1669                         j = i;
1670                         while (j >= stride && array[j-stride]->off > tmp->off) {
1671                                 array[j] = array[j - stride];
1672                                 j -= stride;
1673                         }
1674                         array[j] = tmp;
1675                 }
1676         } while (stride > 1);
1677 }
1678
1679 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1680 {
1681         int count = 1;
1682         int offset;
1683         int i = 0;
1684
1685         LASSERT (pages > 0);
1686         offset = pg[i]->off & (~CFS_PAGE_MASK);
1687
1688         for (;;) {
1689                 pages--;
1690                 if (pages == 0)         /* that's all */
1691                         return count;
1692
1693                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1694                         return count;   /* doesn't end on page boundary */
1695
1696                 i++;
1697                 offset = pg[i]->off & (~CFS_PAGE_MASK);
1698                 if (offset != 0)        /* doesn't start on page boundary */
1699                         return count;
1700
1701                 count++;
1702         }
1703 }
1704
1705 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1706 {
1707         struct brw_page **ppga;
1708         int i;
1709
1710         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1711         if (ppga == NULL)
1712                 return NULL;
1713
1714         for (i = 0; i < count; i++)
1715                 ppga[i] = pga + i;
1716         return ppga;
1717 }
1718
1719 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1720 {
1721         LASSERT(ppga != NULL);
1722         OBD_FREE(ppga, sizeof(*ppga) * count);
1723 }
1724
1725 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1726                    obd_count page_count, struct brw_page *pga,
1727                    struct obd_trans_info *oti)
1728 {
1729         struct obdo *saved_oa = NULL;
1730         struct brw_page **ppga, **orig;
1731         struct obd_import *imp = class_exp2cliimp(exp);
1732         struct client_obd *cli = &imp->imp_obd->u.cli;
1733         int rc, page_count_orig;
1734         ENTRY;
1735
1736         if (cmd & OBD_BRW_CHECK) {
1737                 /* The caller just wants to know if there's a chance that this
1738                  * I/O can succeed */
1739
1740                 if (imp == NULL || imp->imp_invalid)
1741                         RETURN(-EIO);
1742                 RETURN(0);
1743         }
1744
1745         /* test_brw with a failed create can trip this, maybe others. */
1746         LASSERT(cli->cl_max_pages_per_rpc);
1747
1748         rc = 0;
1749
1750         orig = ppga = osc_build_ppga(pga, page_count);
1751         if (ppga == NULL)
1752                 RETURN(-ENOMEM);
1753         page_count_orig = page_count;
1754
1755         sort_brw_pages(ppga, page_count);
1756         while (page_count) {
1757                 obd_count pages_per_brw;
1758
1759                 if (page_count > cli->cl_max_pages_per_rpc)
1760                         pages_per_brw = cli->cl_max_pages_per_rpc;
1761                 else
1762                         pages_per_brw = page_count;
1763
1764                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1765
1766                 if (saved_oa != NULL) {
1767                         /* restore previously saved oa */
1768                         *oinfo->oi_oa = *saved_oa;
1769                 } else if (page_count > pages_per_brw) {
1770                         /* save a copy of oa (brw will clobber it) */
1771                         OBDO_ALLOC(saved_oa);
1772                         if (saved_oa == NULL)
1773                                 GOTO(out, rc = -ENOMEM);
1774                         *saved_oa = *oinfo->oi_oa;
1775                 }
1776
1777                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1778                                       pages_per_brw, ppga);
1779
1780                 if (rc != 0)
1781                         break;
1782
1783                 page_count -= pages_per_brw;
1784                 ppga += pages_per_brw;
1785         }
1786
1787 out:
1788         osc_release_ppga(orig, page_count_orig);
1789
1790         if (saved_oa != NULL)
1791                 OBDO_FREE(saved_oa);
1792
1793         RETURN(rc);
1794 }
1795
1796 static int osc_brw_async(int cmd, struct obd_export *exp,
1797                          struct obd_info *oinfo, obd_count page_count,
1798                          struct brw_page *pga, struct obd_trans_info *oti,
1799                          struct ptlrpc_request_set *set)
1800 {
1801         struct brw_page **ppga, **orig;
1802         int page_count_orig;
1803         int rc = 0;
1804         ENTRY;
1805
1806         if (cmd & OBD_BRW_CHECK) {
1807                 /* The caller just wants to know if there's a chance that this
1808                  * I/O can succeed */
1809                 struct obd_import *imp = class_exp2cliimp(exp);
1810
1811                 if (imp == NULL || imp->imp_invalid)
1812                         RETURN(-EIO);
1813                 RETURN(0);
1814         }
1815
1816         orig = ppga = osc_build_ppga(pga, page_count);
1817         if (ppga == NULL)
1818                 RETURN(-ENOMEM);
1819         page_count_orig = page_count;
1820
1821         sort_brw_pages(ppga, page_count);
1822         while (page_count) {
1823                 struct brw_page **copy;
1824                 obd_count pages_per_brw;
1825
1826                 pages_per_brw = min_t(obd_count, page_count,
1827                     class_exp2cliimp(exp)->imp_obd->u.cli.cl_max_pages_per_rpc);
1828
1829                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1830
1831                 /* use ppga only if single RPC is going to fly */
1832                 if (pages_per_brw != page_count_orig || ppga != orig) {
1833                         OBD_ALLOC(copy, pages_per_brw * sizeof(*copy));
1834                         if (copy == NULL)
1835                                 GOTO(out, rc = -ENOMEM);
1836                         memcpy(copy, ppga, pages_per_brw * sizeof(*copy));
1837                 } else
1838                         copy = ppga;
1839
1840                 rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1841                                     pages_per_brw, copy, set);
1842
1843                 if (rc != 0) {
1844                         if (copy != ppga)
1845                                 OBD_FREE(copy, pages_per_brw * sizeof(*copy));
1846                         break;
1847                 }
1848
1849                 if (copy == orig) {
1850                         /* we passed it to async_internal() which is
1851                          * now responsible for releasing memory */
1852                         orig = NULL;
1853                 }
1854
1855                 page_count -= pages_per_brw;
1856                 ppga += pages_per_brw;
1857         }
1858 out:
1859         if (orig)
1860                 osc_release_ppga(orig, page_count_orig);
1861         RETURN(rc);
1862 }
1863
1864 static void osc_check_rpcs(struct client_obd *cli);
1865
1866 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1867  * the dirty accounting.  Writeback completes or truncate happens before
1868  * writing starts.  Must be called with the loi lock held. */
1869 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1870                            int sent)
1871 {
1872         osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1873 }
1874
1875 /* This maintains the lists of pending pages to read/write for a given object
1876  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1877  * to quickly find objects that are ready to send an RPC. */
1878 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1879                          int cmd)
1880 {
1881         int optimal;
1882         ENTRY;
1883
1884         if (lop->lop_num_pending == 0)
1885                 RETURN(0);
1886
1887         /* if we have an invalid import we want to drain the queued pages
1888          * by forcing them through rpcs that immediately fail and complete
1889          * the pages.  recovery relies on this to empty the queued pages
1890          * before canceling the locks and evicting down the llite pages */
1891         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1892                 RETURN(1);
1893
1894         /* stream rpcs in queue order as long as as there is an urgent page
1895          * queued.  this is our cheap solution for good batching in the case
1896          * where writepage marks some random page in the middle of the file
1897          * as urgent because of, say, memory pressure */
1898         if (!list_empty(&lop->lop_urgent)) {
1899                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1900                 RETURN(1);
1901         }
1902
1903         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1904         optimal = cli->cl_max_pages_per_rpc;
1905         if (cmd & OBD_BRW_WRITE) {
1906                 /* trigger a write rpc stream as long as there are dirtiers
1907                  * waiting for space.  as they're waiting, they're not going to
1908                  * create more pages to coallesce with what's waiting.. */
1909                 if (!list_empty(&cli->cl_cache_waiters)) {
1910                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1911                         RETURN(1);
1912                 }
1913
1914                 /* +16 to avoid triggering rpcs that would want to include pages
1915                  * that are being queued but which can't be made ready until
1916                  * the queuer finishes with the page. this is a wart for
1917                  * llite::commit_write() */
1918                 optimal += 16;
1919         }
1920         if (lop->lop_num_pending >= optimal)
1921                 RETURN(1);
1922
1923         RETURN(0);
1924 }
1925
1926 static int lop_makes_hprpc(struct loi_oap_pages *lop)
1927 {
1928         struct osc_async_page *oap;
1929         ENTRY;
1930
1931         if (list_empty(&lop->lop_urgent))
1932                 RETURN(0);
1933
1934         oap = list_entry(lop->lop_urgent.next,
1935                          struct osc_async_page, oap_urgent_item);
1936
1937         if (oap->oap_async_flags & ASYNC_HP) {
1938                 CDEBUG(D_CACHE, "hp request forcing RPC\n");
1939                 RETURN(1);
1940         }
1941
1942         RETURN(0);
1943 }
1944
1945 static void on_list(struct list_head *item, struct list_head *list,
1946                     int should_be_on)
1947 {
1948         if (list_empty(item) && should_be_on)
1949                 list_add_tail(item, list);
1950         else if (!list_empty(item) && !should_be_on)
1951                 list_del_init(item);
1952 }
1953
1954 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1955  * can find pages to build into rpcs quickly */
1956 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1957 {
1958         if (lop_makes_hprpc(&loi->loi_write_lop) ||
1959             lop_makes_hprpc(&loi->loi_read_lop)) {
1960                 /* HP rpc */
1961                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
1962                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
1963         } else {
1964                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
1965                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
1966                         lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
1967                         lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1968         }
1969
1970         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1971                 loi->loi_write_lop.lop_num_pending);
1972
1973         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1974                 loi->loi_read_lop.lop_num_pending);
1975 }
1976
1977 static void lop_update_pending(struct client_obd *cli,
1978                                struct loi_oap_pages *lop, int cmd, int delta)
1979 {
1980         lop->lop_num_pending += delta;
1981         if (cmd & OBD_BRW_WRITE)
1982                 cli->cl_pending_w_pages += delta;
1983         else
1984                 cli->cl_pending_r_pages += delta;
1985 }
1986
1987 /* this is called when a sync waiter receives an interruption.  Its job is to
1988  * get the caller woken as soon as possible.  If its page hasn't been put in an
1989  * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
1990  * desiring interruption which will forcefully complete the rpc once the rpc
1991  * has timed out */
1992 static void osc_occ_interrupted(struct oig_callback_context *occ)
1993 {
1994         struct osc_async_page *oap;
1995         struct loi_oap_pages *lop;
1996         struct lov_oinfo *loi;
1997         ENTRY;
1998
1999         /* XXX member_of() */
2000         oap = list_entry(occ, struct osc_async_page, oap_occ);
2001
2002         client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);
2003
2004         oap->oap_interrupted = 1;
2005
2006         /* ok, it's been put in an rpc. only one oap gets a request reference */
2007         if (oap->oap_request != NULL) {
2008                 ptlrpc_mark_interrupted(oap->oap_request);
2009                 ptlrpcd_wake(oap->oap_request);
2010                 GOTO(unlock, 0);
2011         }
2012
2013         /* we don't get interruption callbacks until osc_trigger_group_io()
2014          * has been called and put the sync oaps in the pending/urgent lists.*/
2015         if (!list_empty(&oap->oap_pending_item)) {
2016                 list_del_init(&oap->oap_pending_item);
2017                 list_del_init(&oap->oap_urgent_item);
2018
2019                 loi = oap->oap_loi;
2020                 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
2021                         &loi->loi_write_lop : &loi->loi_read_lop;
2022                 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
2023                 loi_list_maint(oap->oap_cli, oap->oap_loi);
2024
2025                 oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
2026                 oap->oap_oig = NULL;
2027         }
2028
2029 unlock:
2030         client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
2031 }
2032
2033 /* this is trying to propogate async writeback errors back up to the
2034  * application.  As an async write fails we record the error code for later if
2035  * the app does an fsync.  As long as errors persist we force future rpcs to be
2036  * sync so that the app can get a sync error and break the cycle of queueing
2037  * pages for which writeback will fail. */
2038 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
2039                            int rc)
2040 {
2041         if (rc) {
2042                 if (!ar->ar_rc)
2043                         ar->ar_rc = rc;
2044
2045                 ar->ar_force_sync = 1;
2046                 ar->ar_min_xid = ptlrpc_sample_next_xid();
2047                 return;
2048
2049         }
2050
2051         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2052                 ar->ar_force_sync = 0;
2053 }
2054
2055 static void osc_oap_to_pending(struct osc_async_page *oap)
2056 {
2057         struct loi_oap_pages *lop;
2058
2059         if (oap->oap_cmd & OBD_BRW_WRITE)
2060                 lop = &oap->oap_loi->loi_write_lop;
2061         else
2062                 lop = &oap->oap_loi->loi_read_lop;
2063
2064         if (oap->oap_async_flags & ASYNC_HP)
2065                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2066         else if (oap->oap_async_flags & ASYNC_URGENT)
2067                 list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2068         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2069         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2070 }
2071
2072 /* this must be called holding the loi list lock to give coverage to exit_cache,
2073  * async_flag maintenance, and oap_request */
2074 static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
2075                               struct osc_async_page *oap, int sent, int rc)
2076 {
2077         __u64 xid = 0;
2078
2079         ENTRY;
2080         if (oap->oap_request != NULL) {
2081                 xid = ptlrpc_req_xid(oap->oap_request);
2082                 ptlrpc_req_finished(oap->oap_request);
2083                 oap->oap_request = NULL;
2084         }
2085
2086         oap->oap_async_flags = 0;
2087         oap->oap_interrupted = 0;
2088
2089         if (oap->oap_cmd & OBD_BRW_WRITE) {
2090                 osc_process_ar(&cli->cl_ar, xid, rc);
2091                 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
2092         }
2093
2094         if (rc == 0 && oa != NULL) {
2095                 if (oa->o_valid & OBD_MD_FLBLOCKS)
2096                         oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2097                 if (oa->o_valid & OBD_MD_FLMTIME)
2098                         oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2099                 if (oa->o_valid & OBD_MD_FLATIME)
2100                         oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2101                 if (oa->o_valid & OBD_MD_FLCTIME)
2102                         oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2103         }
2104
2105         if (oap->oap_oig) {
2106                 osc_exit_cache(cli, oap, sent);
2107                 oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
2108                 oap->oap_oig = NULL;
2109                 EXIT;
2110                 return;
2111         }
2112
2113         rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
2114                                                 oap->oap_cmd, oa, rc);
2115
2116         /* ll_ap_completion (from llite) drops PG_locked. so, a new
2117          * I/O on the page could start, but OSC calls it under lock
2118          * and thus we can add oap back to pending safely */
2119         if (rc)
2120                 /* upper layer wants to leave the page on pending queue */
2121                 osc_oap_to_pending(oap);
2122         else
2123                 osc_exit_cache(cli, oap, sent);
2124         EXIT;
2125 }
2126
2127 static int brw_interpret(struct ptlrpc_request *request, void *data, int rc)
2128 {
2129         struct osc_brw_async_args *aa = data;
2130         struct client_obd *cli;
2131         ENTRY;
2132
2133         rc = osc_brw_fini_request(request, rc);
2134         CDEBUG(D_INODE, "request %p aa %p rc %d\n", request, aa, rc);
2135
2136         if (osc_recoverable_error(rc)) {
2137                 rc = osc_brw_redo_request(request, aa);
2138                 if (rc == 0)
2139                         RETURN(0);
2140         }
2141
2142         cli = aa->aa_cli;
2143         client_obd_list_lock(&cli->cl_loi_list_lock);
2144         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2145          * is called so we know whether to go to sync BRWs or wait for more
2146          * RPCs to complete */
2147         if (lustre_msg_get_opc(request->rq_reqmsg) == OST_WRITE)
2148                 cli->cl_w_in_flight--;
2149         else
2150                 cli->cl_r_in_flight--;
2151
2152         if (!list_empty(&aa->aa_oaps)) { /* from osc_send_oap_rpc() */
2153                 struct osc_async_page *oap, *tmp;
2154                 /* the caller may re-use the oap after the completion call so
2155                  * we need to clean it up a little */
2156                 list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
2157                         list_del_init(&oap->oap_rpc_item);
2158                         osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
2159                 }
2160                 OBDO_FREE(aa->aa_oa);
2161         } else { /* from async_internal() */
2162                 obd_count i;
2163                 for (i = 0; i < aa->aa_page_count; i++)
2164                         osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
2165         }
2166         osc_wake_cache_waiters(cli);
2167         osc_check_rpcs(cli);
2168         client_obd_list_unlock(&cli->cl_loi_list_lock);
2169
2170         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2171
2172         RETURN(rc);
2173 }
2174
2175 static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
2176                                             struct list_head *rpc_list,
2177                                             int page_count, int cmd)
2178 {
2179         struct ptlrpc_request *req;
2180         struct brw_page **pga = NULL;
2181         struct osc_brw_async_args *aa;
2182         struct obdo *oa = NULL;
2183         struct obd_async_page_ops *ops = NULL;
2184         void *caller_data = NULL;
2185         struct osc_async_page *oap;
2186         struct ldlm_lock *lock = NULL;
2187         obd_valid valid;
2188         int i, rc;
2189
2190         ENTRY;
2191         LASSERT(!list_empty(rpc_list));
2192
2193         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2194         if (pga == NULL)
2195                 RETURN(ERR_PTR(-ENOMEM));
2196
2197         OBDO_ALLOC(oa);
2198         if (oa == NULL)
2199                 GOTO(out, req = ERR_PTR(-ENOMEM));
2200
2201         i = 0;
2202         list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2203                 if (ops == NULL) {
2204                         ops = oap->oap_caller_ops;
2205                         caller_data = oap->oap_caller_data;
2206                         lock = oap->oap_ldlm_lock;
2207                 }
2208                 pga[i] = &oap->oap_brw_page;
2209                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2210                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2211                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2212                 i++;
2213         }
2214
2215         /* always get the data for the obdo for the rpc */
2216         LASSERT(ops != NULL);
2217         ops->ap_fill_obdo(caller_data, cmd, oa);
2218         if (lock) {
2219                 oa->o_handle = lock->l_remote_handle;
2220                 oa->o_valid |= OBD_MD_FLHANDLE;
2221         }
2222
2223         sort_brw_pages(pga, page_count);
2224         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count, pga, &req);
2225         if (rc != 0) {
2226                 CERROR("prep_req failed: %d\n", rc);
2227                 GOTO(out, req = ERR_PTR(rc));
2228         }
2229         oa = &((struct ost_body *)lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF,
2230                                                  sizeof(struct ost_body)))->oa;
2231
2232         /* Need to update the timestamps after the request is built in case
2233          * we race with setattr (locally or in queue at OST).  If OST gets
2234          * later setattr before earlier BRW (as determined by the request xid),
2235          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2236          * way to do this in a single call.  bug 10150 */
2237         if (pga[0]->flag & OBD_BRW_SRVLOCK) {
2238                 /* in case of lockless read/write do not use inode's
2239                  * timestamps because concurrent stat might fill the
2240                  * inode with out-of-date times, send current
2241                  * instead */
2242                 if (cmd & OBD_BRW_WRITE) {
2243                         oa->o_mtime = oa->o_ctime = LTIME_S(CURRENT_TIME);
2244                         oa->o_valid |= OBD_MD_FLMTIME | OBD_MD_FLCTIME;
2245                         valid = OBD_MD_FLATIME;
2246                 } else {
2247                         oa->o_atime = LTIME_S(CURRENT_TIME);
2248                         oa->o_valid |= OBD_MD_FLATIME;
2249                         valid = OBD_MD_FLMTIME | OBD_MD_FLCTIME;
2250                 }
2251         } else {
2252                 valid = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2253         }
2254         ops->ap_update_obdo(caller_data, cmd, oa, valid);
2255
2256         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2257         aa = ptlrpc_req_async_args(req);
2258         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2259         list_splice(rpc_list, &aa->aa_oaps);
2260         CFS_INIT_LIST_HEAD(rpc_list);
2261
2262 out:
2263         if (IS_ERR(req)) {
2264                 if (oa)
2265                         OBDO_FREE(oa);
2266                 if (pga)
2267                         OBD_FREE(pga, sizeof(*pga) * page_count);
2268         }
2269         RETURN(req);
2270 }
2271
2272 /* the loi lock is held across this function but it's allowed to release
2273  * and reacquire it during its work */
2274 /**
2275  * prepare pages for ASYNC io and put pages in send queue.
2276  *
2277  * \param cli -
2278  * \param loi -
2279  * \param cmd - OBD_BRW_* macroses
2280  * \param lop - pending pages
2281  *
2282  * \return zero if pages successfully add to send queue.
2283  * \return not zere if error occurring.
2284  */
2285 static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
2286                             int cmd, struct loi_oap_pages *lop)
2287 {
2288         struct ptlrpc_request *req;
2289         obd_count page_count = 0;
2290         struct osc_async_page *oap = NULL, *tmp;
2291         struct osc_brw_async_args *aa;
2292         struct obd_async_page_ops *ops;
2293         CFS_LIST_HEAD(rpc_list);
2294         unsigned int ending_offset;
2295         unsigned  starting_offset = 0;
2296         int srvlock = 0;
2297         ENTRY;
2298
2299         /* If there are HP OAPs we need to handle at least 1 of them,
2300          * move it the beginning of the pending list for that. */
2301         if (!list_empty(&lop->lop_urgent)) {
2302                 oap = list_entry(lop->lop_urgent.next,
2303                                  struct osc_async_page, oap_urgent_item);
2304                 if (oap->oap_async_flags & ASYNC_HP)
2305                         list_move(&oap->oap_pending_item, &lop->lop_pending);
2306         }
2307
2308         /* first we find the pages we're allowed to work with */
2309         list_for_each_entry_safe(oap, tmp, &lop->lop_pending, oap_pending_item){
2310                 ops = oap->oap_caller_ops;
2311
2312                 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
2313                          "magic 0x%x\n", oap, oap->oap_magic);
2314
2315                 if (page_count != 0 &&
2316                     srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2317                         CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2318                                " oap %p, page %p, srvlock %u\n",
2319                                oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2320                         break;
2321                 }
2322                 /* in llite being 'ready' equates to the page being locked
2323                  * until completion unlocks it.  commit_write submits a page
2324                  * as not ready because its unlock will happen unconditionally
2325                  * as the call returns.  if we race with commit_write giving
2326                  * us that page we dont' want to create a hole in the page
2327                  * stream, so we stop and leave the rpc to be fired by
2328                  * another dirtier or kupdated interval (the not ready page
2329                  * will still be on the dirty list).  we could call in
2330                  * at the end of ll_file_write to process the queue again. */
2331                 if (!(oap->oap_async_flags & ASYNC_READY)) {
2332                         int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
2333                         if (rc < 0)
2334                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
2335                                                 "instead of ready\n", oap,
2336                                                 oap->oap_page, rc);
2337                         switch (rc) {
2338                         case -EAGAIN:
2339                                 /* llite is telling us that the page is still
2340                                  * in commit_write and that we should try
2341                                  * and put it in an rpc again later.  we
2342                                  * break out of the loop so we don't create
2343                                  * a hole in the sequence of pages in the rpc
2344                                  * stream.*/
2345                                 oap = NULL;
2346                                 break;
2347                         case -EINTR:
2348                                 /* the io isn't needed.. tell the checks
2349                                  * below to complete the rpc with EINTR */
2350                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2351                                 oap->oap_count = -EINTR;
2352                                 break;
2353                         case 0:
2354                                 oap->oap_async_flags |= ASYNC_READY;
2355                                 break;
2356                         default:
2357                                 LASSERTF(0, "oap %p page %p returned %d "
2358                                             "from make_ready\n", oap,
2359                                             oap->oap_page, rc);
2360                                 break;
2361                         }
2362                 }
2363                 if (oap == NULL)
2364                         break;
2365                 /*
2366                  * Page submitted for IO has to be locked. Either by
2367                  * ->ap_make_ready() or by higher layers.
2368                  */
2369 #if defined(__KERNEL__) && defined(__linux__)
2370                  if(!(PageLocked(oap->oap_page) &&
2371                      (CheckWriteback(oap->oap_page, cmd) || oap->oap_oig !=NULL))) {
2372                         CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2373                                oap->oap_page, (long)oap->oap_page->flags, oap->oap_async_flags);
2374                         LBUG();
2375                 }
2376 #endif
2377                 /* If there is a gap at the start of this page, it can't merge
2378                  * with any previous page, so we'll hand the network a
2379                  * "fragmented" page array that it can't transfer in 1 RDMA */
2380                 if (page_count != 0 && oap->oap_page_off != 0)
2381                         break;
2382
2383                 /* take the page out of our book-keeping */
2384                 list_del_init(&oap->oap_pending_item);
2385                 lop_update_pending(cli, lop, cmd, -1);
2386                 list_del_init(&oap->oap_urgent_item);
2387
2388                 if (page_count == 0)
2389                         starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2390                                           (PTLRPC_MAX_BRW_SIZE - 1);
2391
2392                 /* ask the caller for the size of the io as the rpc leaves. */
2393                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
2394                         oap->oap_count =
2395                                 ops->ap_refresh_count(oap->oap_caller_data,cmd);
2396                 if (oap->oap_count <= 0) {
2397                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2398                                oap->oap_count);
2399                         osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
2400                         continue;
2401                 }
2402
2403                 /* now put the page back in our accounting */
2404                 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2405                 if (page_count == 0)
2406                         srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2407                 if (++page_count >= cli->cl_max_pages_per_rpc)
2408                         break;
2409
2410                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
2411                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2412                  * have the same alignment as the initial writes that allocated
2413                  * extents on the server. */
2414                 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2415                                  oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2416                 if (ending_offset == 0)
2417                         break;
2418
2419                 /* If there is a gap at the end of this page, it can't merge
2420                  * with any subsequent pages, so we'll hand the network a
2421                  * "fragmented" page array that it can't transfer in 1 RDMA */
2422                 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2423                         break;
2424         }
2425
2426         osc_wake_cache_waiters(cli);
2427
2428         if (page_count == 0)
2429                 RETURN(0);
2430
2431         loi_list_maint(cli, loi);
2432
2433         client_obd_list_unlock(&cli->cl_loi_list_lock);
2434
2435         req = osc_build_req(cli, &rpc_list, page_count, cmd);
2436         if (IS_ERR(req)) {
2437                 /* this should happen rarely and is pretty bad, it makes the
2438                  * pending list not follow the dirty order */
2439                 client_obd_list_lock(&cli->cl_loi_list_lock);
2440                 list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
2441                         list_del_init(&oap->oap_rpc_item);
2442
2443                         /* queued sync pages can be torn down while the pages
2444                          * were between the pending list and the rpc */
2445                         if (oap->oap_interrupted) {
2446                                 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2447                                 osc_ap_completion(cli, NULL, oap, 0,
2448                                                   oap->oap_count);
2449                                 continue;
2450                         }
2451                         osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
2452                 }
2453                 loi_list_maint(cli, loi);
2454                 RETURN(PTR_ERR(req));
2455         }
2456
2457         aa = ptlrpc_req_async_args(req);
2458         if (cmd == OBD_BRW_READ) {
2459                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2460                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2461                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2462                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2463         } else {
2464                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2465                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2466                                  cli->cl_w_in_flight);
2467                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2468                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2469         }
2470         ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2471
2472         client_obd_list_lock(&cli->cl_loi_list_lock);
2473
2474         if (cmd == OBD_BRW_READ)
2475                 cli->cl_r_in_flight++;
2476         else
2477                 cli->cl_w_in_flight++;
2478
2479         /* queued sync pages can be torn down while the pages
2480          * were between the pending list and the rpc */
2481         tmp = NULL;
2482         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2483                 /* only one oap gets a request reference */
2484                 if (tmp == NULL)
2485                         tmp = oap;
2486                 if (oap->oap_interrupted && !req->rq_intr) {
2487                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2488                                oap, req);
2489                         ptlrpc_mark_interrupted(req);
2490                 }
2491         }
2492         if (tmp != NULL)
2493                 tmp->oap_request = ptlrpc_request_addref(req);
2494
2495         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2496                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2497
2498         req->rq_interpret_reply = brw_interpret;
2499         ptlrpcd_add_req(req);
2500         RETURN(1);
2501 }
2502
2503 #define LOI_DEBUG(LOI, STR, args...)                                     \
2504         CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
2505                !list_empty(&(LOI)->loi_ready_item) ||                    \
2506                !list_empty(&(LOI)->loi_hp_ready_item),                   \
2507                (LOI)->loi_write_lop.lop_num_pending,                     \
2508                !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
2509                (LOI)->loi_read_lop.lop_num_pending,                      \
2510                !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
2511                args)                                                     \
2512
2513 /* This is called by osc_check_rpcs() to find which objects have pages that
2514  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2515 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2516 {
2517         ENTRY;
2518         /* First return objects that have blocked locks so that they
2519          * will be flushed quickly and other clients can get the lock,
2520          * then objects which have pages ready to be stuffed into RPCs */
2521         if (!list_empty(&cli->cl_loi_hp_ready_list))
2522                 RETURN(list_entry(cli->cl_loi_hp_ready_list.next,
2523                                   struct lov_oinfo, loi_hp_ready_item));
2524         if (!list_empty(&cli->cl_loi_ready_list))
2525                 RETURN(list_entry(cli->cl_loi_ready_list.next,
2526                                   struct lov_oinfo, loi_ready_item));
2527
2528         /* then if we have cache waiters, return all objects with queued
2529          * writes.  This is especially important when many small files
2530          * have filled up the cache and not been fired into rpcs because
2531          * they don't pass the nr_pending/object threshhold */
2532         if (!list_empty(&cli->cl_cache_waiters) &&
2533             !list_empty(&cli->cl_loi_write_list))
2534                 RETURN(list_entry(cli->cl_loi_write_list.next,
2535                                   struct lov_oinfo, loi_write_item));
2536
2537         /* then return all queued objects when we have an invalid import
2538          * so that they get flushed */
2539         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2540                 if (!list_empty(&cli->cl_loi_write_list))
2541                         RETURN(list_entry(cli->cl_loi_write_list.next,
2542                                           struct lov_oinfo, loi_write_item));
2543                 if (!list_empty(&cli->cl_loi_read_list))
2544                         RETURN(list_entry(cli->cl_loi_read_list.next,
2545                                           struct lov_oinfo, loi_read_item));
2546         }
2547         RETURN(NULL);
2548 }
2549
2550 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2551 {
2552         struct osc_async_page *oap;
2553         int hprpc = 0;
2554
2555         if (!list_empty(&loi->loi_write_lop.lop_urgent)) {
2556                 oap = list_entry(loi->loi_write_lop.lop_urgent.next,
2557                                  struct osc_async_page, oap_urgent_item);
2558                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2559         }
2560
2561         if (!hprpc && !list_empty(&loi->loi_read_lop.lop_urgent)) {
2562                 oap = list_entry(loi->loi_write_lop.lop_urgent.next,
2563                                  struct osc_async_page, oap_urgent_item);
2564                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2565         }
2566
2567         return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
2568 }
2569
2570 /* called with the loi list lock held */
2571 static void osc_check_rpcs(struct client_obd *cli)
2572 {
2573         struct lov_oinfo *loi;
2574         int rc = 0, race_counter = 0;
2575         ENTRY;
2576
2577         while ((loi = osc_next_loi(cli)) != NULL) {
2578                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2579
2580                 if (osc_max_rpc_in_flight(cli, loi))
2581                         break;
2582
2583                 /* attempt some read/write balancing by alternating between
2584                  * reads and writes in an object.  The makes_rpc checks here
2585                  * would be redundant if we were getting read/write work items
2586                  * instead of objects.  we don't want send_oap_rpc to drain a
2587                  * partial read pending queue when we're given this object to
2588                  * do io on writes while there are cache waiters */
2589                 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2590                         rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
2591                                               &loi->loi_write_lop);
2592                         if (rc < 0)
2593                                 break;
2594                         if (rc > 0)
2595                                 race_counter = 0;
2596                         else
2597                                 race_counter++;
2598                 }
2599                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2600                         rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
2601                                               &loi->loi_read_lop);
2602                         if (rc < 0)
2603                                 break;
2604                         if (rc > 0)
2605                                 race_counter = 0;
2606                         else
2607                                 race_counter++;
2608                 }
2609
2610                 /* attempt some inter-object balancing by issueing rpcs
2611                  * for each object in turn */
2612                 if (!list_empty(&loi->loi_hp_ready_item))
2613                         list_del_init(&loi->loi_hp_ready_item);
2614                 if (!list_empty(&loi->loi_ready_item))
2615                         list_del_init(&loi->loi_ready_item);
2616                 if (!list_empty(&loi->loi_write_item))
2617                         list_del_init(&loi->loi_write_item);
2618                 if (!list_empty(&loi->loi_read_item))
2619                         list_del_init(&loi->loi_read_item);
2620
2621                 loi_list_maint(cli, loi);
2622
2623                 /* send_oap_rpc fails with 0 when make_ready tells it to
2624                  * back off.  llite's make_ready does this when it tries
2625                  * to lock a page queued for write that is already locked.
2626                  * we want to try sending rpcs from many objects, but we
2627                  * don't want to spin failing with 0.  */
2628                 if (race_counter == 10)
2629                         break;
2630         }
2631         EXIT;
2632 }
2633
2634 /* we're trying to queue a page in the osc so we're subject to the
2635  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2636  * If the osc's queued pages are already at that limit, then we want to sleep
2637  * until there is space in the osc's queue for us.  We also may be waiting for
2638  * write credits from the OST if there are RPCs in flight that may return some
2639  * before we fall back to sync writes.
2640  *
2641  * We need this know our allocation was granted in the presence of signals */
2642 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2643 {
2644         int rc;
2645         ENTRY;
2646         client_obd_list_lock(&cli->cl_loi_list_lock);
2647         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2648         client_obd_list_unlock(&cli->cl_loi_list_lock);
2649         RETURN(rc);
2650 };
2651
2652 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2653  * grant or cache space. */
2654 static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
2655                            struct osc_async_page *oap)
2656 {
2657         struct osc_cache_waiter ocw;
2658         struct l_wait_info lwi = { 0 };
2659         ENTRY;
2660
2661         CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2662                "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2663                cli->cl_dirty_max, obd_max_dirty_pages,
2664                cli->cl_lost_grant, cli->cl_avail_grant);
2665
2666         /* force the caller to try sync io.  this can jump the list
2667          * of queued writes and create a discontiguous rpc stream */
2668         if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2669             loi->loi_ar.ar_force_sync)
2670                 RETURN(-EDQUOT);
2671
2672         /* Hopefully normal case - cache space and write credits available */
2673         if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
2674             (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
2675             (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
2676                 /* account for ourselves */
2677                 osc_consume_write_grant(cli, &oap->oap_brw_page);
2678                 RETURN(0);
2679         }
2680
2681         /* Make sure that there are write rpcs in flight to wait for.  This
2682          * is a little silly as this object may not have any pending but
2683          * other objects sure might. */
2684         if (cli->cl_w_in_flight) {
2685                 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2686                 cfs_waitq_init(&ocw.ocw_waitq);
2687                 ocw.ocw_oap = oap;
2688                 ocw.ocw_rc = 0;
2689
2690                 loi_list_maint(cli, loi);
2691                 osc_check_rpcs(cli);
2692                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2693
2694                 CDEBUG(D_CACHE, "sleeping for cache space\n");
2695                 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2696
2697                 client_obd_list_lock(&cli->cl_loi_list_lock);
2698                 if (!list_empty(&ocw.ocw_entry)) {
2699                         list_del(&ocw.ocw_entry);
2700                         RETURN(-EINTR);
2701                 }
2702                 RETURN(ocw.ocw_rc);
2703         }
2704
2705         RETURN(-EDQUOT);
2706 }
2707
2708 static int osc_reget_short_lock(struct obd_export *exp,
2709                                 struct lov_stripe_md *lsm,
2710                                 void **res, int rw,
2711                                 obd_off start, obd_off end,
2712                                 void **cookie)
2713 {
2714         struct osc_async_page *oap = *res;
2715         int rc;
2716
2717         ENTRY;
2718
2719         spin_lock(&oap->oap_lock);
2720         rc = ldlm_lock_fast_match(oap->oap_ldlm_lock, rw,
2721                                   start, end, cookie);
2722         spin_unlock(&oap->oap_lock);
2723
2724         RETURN(rc);
2725 }
2726
2727 static int osc_release_short_lock(struct obd_export *exp,
2728                                   struct lov_stripe_md *lsm, obd_off end,
2729                                   void *cookie, int rw)
2730 {
2731         ENTRY;
2732         ldlm_lock_fast_release(cookie, rw);
2733         /* no error could have happened at this layer */
2734         RETURN(0);
2735 }
2736
2737 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2738                         struct lov_oinfo *loi, cfs_page_t *page,
2739                         obd_off offset, struct obd_async_page_ops *ops,
2740                         void *data, void **res, int nocache,
2741                         struct lustre_handle *lockh)
2742 {
2743         struct osc_async_page *oap;
2744         struct ldlm_res_id oid = {{0}};
2745         int rc = 0;
2746
2747         ENTRY;
2748
2749         if (!page)
2750                 return size_round(sizeof(*oap));
2751
2752         oap = *res;
2753         oap->oap_magic = OAP_MAGIC;
2754         oap->oap_cli = &exp->exp_obd->u.cli;
2755         oap->oap_loi = loi;
2756
2757         oap->oap_caller_ops = ops;
2758         oap->oap_caller_data = data;
2759
2760         oap->oap_page = page;
2761         oap->oap_obj_off = offset;
2762
2763         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2764         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2765         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2766         CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2767
2768         oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2769
2770         spin_lock_init(&oap->oap_lock);
2771
2772         /* If the page was marked as notcacheable - don't add to any locks */
2773         if (!nocache) {
2774                 osc_build_res_name(loi->loi_id, loi->loi_gr, &oid);
2775                 /* This is the only place where we can call cache_add_extent
2776                    without oap_lock, because this page is locked now, and
2777                    the lock we are adding it to is referenced, so cannot lose
2778                    any pages either. */
2779                 rc = cache_add_extent(oap->oap_cli->cl_cache, &oid, oap, lockh);
2780                 if (rc)
2781                         RETURN(rc);
2782         }
2783
2784         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2785         RETURN(0);
2786 }
2787
2788 struct osc_async_page *oap_from_cookie(void *cookie)
2789 {
2790         struct osc_async_page *oap = cookie;
2791         if (oap->oap_magic != OAP_MAGIC)
2792                 return ERR_PTR(-EINVAL);
2793         return oap;
2794 };
2795
2796 static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2797                               struct lov_oinfo *loi, void *cookie,
2798                               int cmd, obd_off off, int count,
2799                               obd_flag brw_flags, enum async_flags async_flags)
2800 {
2801         struct client_obd *cli = &exp->exp_obd->u.cli;
2802         struct osc_async_page *oap;
2803         int rc = 0;
2804         ENTRY;
2805
2806         oap = oap_from_cookie(cookie);
2807         if (IS_ERR(oap))
2808                 RETURN(PTR_ERR(oap));
2809
2810         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2811                 RETURN(-EIO);
2812
2813         if (!list_empty(&oap->oap_pending_item) ||
2814             !list_empty(&oap->oap_urgent_item) ||
2815             !list_empty(&oap->oap_rpc_item))
2816                 RETURN(-EBUSY);
2817
2818         /* check if the file's owner/group is over quota */
2819         if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
2820                 struct obd_async_page_ops *ops;
2821                 struct obdo *oa;
2822
2823                 OBDO_ALLOC(oa);
2824                 if (oa == NULL)
2825                         RETURN(-ENOMEM);
2826
2827                 ops = oap->oap_caller_ops;
2828                 ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
2829                 if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
2830                     NO_QUOTA)
2831                         rc = -EDQUOT;
2832
2833                 OBDO_FREE(oa);
2834                 if (rc)
2835                         RETURN(rc);
2836         }
2837
2838         if (loi == NULL)
2839                 loi = lsm->lsm_oinfo[0];
2840
2841         client_obd_list_lock(&cli->cl_loi_list_lock);
2842
2843         oap->oap_cmd = cmd;
2844         oap->oap_page_off = off;
2845         oap->oap_count = count;
2846         oap->oap_brw_flags = brw_flags;
2847         oap->oap_async_flags = async_flags;
2848
2849         if (cmd & OBD_BRW_WRITE) {
2850                 rc = osc_enter_cache(cli, loi, oap);
2851                 if (rc) {
2852                         client_obd_list_unlock(&cli->cl_loi_list_lock);
2853                         RETURN(rc);
2854                 }
2855         }
2856
2857         osc_oap_to_pending(oap);
2858         loi_list_maint(cli, loi);
2859
2860         LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2861                   cmd);
2862
2863         osc_check_rpcs(cli);
2864         client_obd_list_unlock(&cli->cl_loi_list_lock);
2865
2866         RETURN(0);
2867 }
2868
2869 /* aka (~was & now & flag), but this is more clear :) */
2870 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
2871
2872 static int osc_set_async_flags(struct obd_export *exp,
2873                                struct lov_stripe_md *lsm,
2874                                struct lov_oinfo *loi, void *cookie,
2875                                obd_flag async_flags)
2876 {
2877         struct client_obd *cli = &exp->exp_obd->u.cli;
2878         struct loi_oap_pages *lop;
2879         struct osc_async_page *oap;
2880         int rc = 0;
2881         ENTRY;
2882
2883         oap = oap_from_cookie(cookie);
2884         if (IS_ERR(oap))
2885                 RETURN(PTR_ERR(oap));
2886
2887         /*
2888          * bug 7311: OST-side locking is only supported for liblustre for now
2889          * (and liblustre never calls obd_set_async_flags(). I hope.), generic
2890          * implementation has to handle case where OST-locked page was picked
2891          * up by, e.g., ->writepage().
2892          */
2893         LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
2894         LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
2895                                      * tread here. */
2896
2897         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2898                 RETURN(-EIO);
2899
2900         if (loi == NULL)
2901                 loi = lsm->lsm_oinfo[0];
2902
2903         if (oap->oap_cmd & OBD_BRW_WRITE) {
2904                 lop = &loi->loi_write_lop;
2905         } else {
2906                 lop = &loi->loi_read_lop;
2907         }
2908
2909         client_obd_list_lock(&cli->cl_loi_list_lock);
2910
2911         if (list_empty(&oap->oap_pending_item))
2912                 GOTO(out, rc = -EINVAL);
2913
2914         if ((oap->oap_async_flags & async_flags) == async_flags)
2915                 GOTO(out, rc = 0);
2916
2917         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2918                 oap->oap_async_flags |= ASYNC_READY;
2919
2920         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
2921             list_empty(&oap->oap_rpc_item)) {
2922                 if (oap->oap_async_flags & ASYNC_HP)
2923                         list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2924                 else
2925                         list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2926                 oap->oap_async_flags |= ASYNC_URGENT;
2927                 loi_list_maint(cli, loi);
2928         }
2929
2930         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2931                         oap->oap_async_flags);
2932 out:
2933         osc_check_rpcs(cli);
2934         client_obd_list_unlock(&cli->cl_loi_list_lock);
2935         RETURN(rc);
2936 }
2937
2938 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2939                              struct lov_oinfo *loi,
2940                              struct obd_io_group *oig, void *cookie,
2941                              int cmd, obd_off off, int count,
2942                              obd_flag brw_flags,
2943                              obd_flag async_flags)
2944 {
2945         struct client_obd *cli = &exp->exp_obd->u.cli;
2946         struct osc_async_page *oap;
2947         struct loi_oap_pages *lop;
2948         int rc = 0;
2949         ENTRY;
2950
2951         oap = oap_from_cookie(cookie);
2952         if (IS_ERR(oap))
2953                 RETURN(PTR_ERR(oap));
2954
2955         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2956                 RETURN(-EIO);
2957
2958         if (!list_empty(&oap->oap_pending_item) ||
2959             !list_empty(&oap->oap_urgent_item) ||
2960             !list_empty(&oap->oap_rpc_item))
2961                 RETURN(-EBUSY);
2962
2963         if (loi == NULL)
2964                 loi = lsm->lsm_oinfo[0];
2965
2966         client_obd_list_lock(&cli->cl_loi_list_lock);
2967
2968         oap->oap_cmd = cmd;
2969         oap->oap_page_off = off;
2970         oap->oap_count = count;
2971         oap->oap_brw_flags = brw_flags;
2972         oap->oap_async_flags = async_flags;
2973
2974         if (cmd & OBD_BRW_WRITE)
2975                 lop = &loi->loi_write_lop;
2976         else
2977                 lop = &loi->loi_read_lop;
2978
2979         list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
2980         if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
2981                 oap->oap_oig = oig;
2982                 rc = oig_add_one(oig, &oap->oap_occ);
2983         }
2984
2985         LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
2986                   oap, oap->oap_page, rc);
2987
2988         client_obd_list_unlock(&cli->cl_loi_list_lock);
2989
2990         RETURN(rc);
2991 }
2992
2993 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2994                                  struct loi_oap_pages *lop, int cmd)
2995 {
2996         struct list_head *pos, *tmp;
2997         struct osc_async_page *oap;
2998
2999         list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
3000                 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
3001                 list_del(&oap->oap_pending_item);
3002                 osc_oap_to_pending(oap);
3003         }
3004         loi_list_maint(cli, loi);
3005 }
3006
3007 static int osc_trigger_group_io(struct obd_export *exp,
3008                                 struct lov_stripe_md *lsm,
3009                                 struct lov_oinfo *loi,
3010                                 struct obd_io_group *oig)
3011 {
3012         struct client_obd *cli = &exp->exp_obd->u.cli;
3013         ENTRY;
3014
3015         if (loi == NULL)
3016                 loi = lsm->lsm_oinfo[0];
3017
3018         client_obd_list_lock(&cli->cl_loi_list_lock);
3019
3020         osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
3021         osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
3022
3023         osc_check_rpcs(cli);
3024         client_obd_list_unlock(&cli->cl_loi_list_lock);
3025
3026         RETURN(0);
3027 }
3028
3029 static int osc_teardown_async_page(struct obd_export *exp,
3030                                    struct lov_stripe_md *lsm,
3031                                    struct lov_oinfo *loi, void *cookie)
3032 {
3033         struct client_obd *cli = &exp->exp_obd->u.cli;
3034         struct loi_oap_pages *lop;
3035         struct osc_async_page *oap;
3036         int rc = 0;
3037         ENTRY;
3038
3039         oap = oap_from_cookie(cookie);
3040         if (IS_ERR(oap))
3041                 RETURN(PTR_ERR(oap));
3042
3043         if (loi == NULL)
3044                 loi = lsm->lsm_oinfo[0];
3045
3046         if (oap->oap_cmd & OBD_BRW_WRITE) {
3047                 lop = &loi->loi_write_lop;
3048         } else {
3049                 lop = &loi->loi_read_lop;
3050         }
3051
3052         client_obd_list_lock(&cli->cl_loi_list_lock);
3053
3054         if (!list_empty(&oap->oap_rpc_item))
3055                 GOTO(out, rc = -EBUSY);
3056
3057         osc_exit_cache(cli, oap, 0);
3058         osc_wake_cache_waiters(cli);
3059
3060         if (!list_empty(&oap->oap_urgent_item)) {
3061                 list_del_init(&oap->oap_urgent_item);
3062                 oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
3063         }
3064
3065         if (!list_empty(&oap->oap_pending_item)) {
3066                 list_del_init(&oap->oap_pending_item);
3067                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
3068         }
3069         loi_list_maint(cli, loi);
3070         cache_remove_extent(cli->cl_cache, oap);
3071
3072         LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
3073 out:
3074         client_obd_list_unlock(&cli->cl_loi_list_lock);
3075         RETURN(rc);
3076 }
3077
3078 int osc_extent_blocking_cb(struct ldlm_lock *lock,
3079                            struct ldlm_lock_desc *new, void *data,
3080                            int flag)
3081 {
3082         struct lustre_handle lockh = { 0 };
3083         int rc;
3084         ENTRY;
3085
3086         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
3087                 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
3088                 LBUG();
3089         }
3090
3091         switch (flag) {
3092         case LDLM_CB_BLOCKING:
3093                 ldlm_lock2handle(lock, &lockh);
3094                 rc = ldlm_cli_cancel(&lockh);
3095                 if (rc != ELDLM_OK)
3096                         CERROR("ldlm_cli_cancel failed: %d\n", rc);
3097                 break;
3098         case LDLM_CB_CANCELING: {
3099
3100                 ldlm_lock2handle(lock, &lockh);
3101                 /* This lock wasn't granted, don't try to do anything */
3102                 if (lock->l_req_mode != lock->l_granted_mode)
3103                         RETURN(0);
3104
3105                 cache_remove_lock(lock->l_conn_export->exp_obd->u.cli.cl_cache,
3106                                   &lockh);
3107
3108                 if (lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb)
3109                         lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb(
3110                                                           lock, new, data,flag);
3111                 break;
3112         }
3113         default:
3114                 LBUG();
3115         }
3116
3117         RETURN(0);
3118 }
3119 EXPORT_SYMBOL(osc_extent_blocking_cb);
3120
3121 static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
3122                                     int flags)
3123 {
3124         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
3125
3126         if (lock == NULL) {
3127                 CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
3128                 return;
3129         }
3130         lock_res_and_lock(lock);
3131 #if defined (__KERNEL__) && defined (__linux__)
3132         /* Liang XXX: Darwin and Winnt checking should be added */
3133         if (lock->l_ast_data && lock->l_ast_data != data) {
3134                 struct inode *new_inode = data;
3135                 struct inode *old_inode = lock->l_ast_data;
3136                 if (!(old_inode->i_state & I_FREEING))
3137                         LDLM_ERROR(lock, "inconsistent l_ast_data found");
3138                 LASSERTF(old_inode->i_state & I_FREEING,
3139                          "Found existing inode %p/%lu/%u state %lu in lock: "
3140                          "setting data to %p/%lu/%u\n", old_inode,
3141                          old_inode->i_ino, old_inode->i_generation,
3142                          old_inode->i_state,
3143                          new_inode, new_inode->i_ino, new_inode->i_generation);
3144         }
3145 #endif
3146         lock->l_ast_data = data;
3147         lock->l_flags |= (flags & LDLM_FL_NO_LRU);
3148         unlock_res_and_lock(lock);
3149         LDLM_LOCK_PUT(lock);
3150 }
3151
3152 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3153                              ldlm_iterator_t replace, void *data)
3154 {
3155         struct ldlm_res_id res_id;
3156         struct obd_device *obd = class_exp2obd(exp);
3157
3158         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
3159         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3160         return 0;
3161 }
3162
3163 static int osc_enqueue_fini(struct obd_device *obd, struct ptlrpc_request *req,
3164                             struct obd_info *oinfo, int intent, int rc)
3165 {
3166         ENTRY;
3167
3168         if (intent) {
3169                 /* The request was created before ldlm_cli_enqueue call. */
3170                 if (rc == ELDLM_LOCK_ABORTED) {
3171                         struct ldlm_reply *rep;
3172
3173                         /* swabbed by ldlm_cli_enqueue() */
3174                         LASSERT(lustre_rep_swabbed(req, DLM_LOCKREPLY_OFF));
3175                         rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
3176                                              sizeof(*rep));
3177                         LASSERT(rep != NULL);
3178                         if (rep->lock_policy_res1)
3179                                 rc = rep->lock_policy_res1;
3180                 }
3181         }
3182
3183         if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
3184                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
3185                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
3186                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
3187                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
3188         }
3189
3190         if (!rc)
3191                 cache_add_lock(obd->u.cli.cl_cache, oinfo->oi_lockh);
3192
3193         /* Call the update callback. */
3194         rc = oinfo->oi_cb_up(oinfo, rc);
3195         RETURN(rc);
3196 }
3197
3198 static int osc_enqueue_interpret(struct ptlrpc_request *req,
3199                                  struct osc_enqueue_args *aa, int rc)
3200 {
3201         int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
3202         struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
3203         struct ldlm_lock *lock;
3204
3205         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
3206          * be valid. */
3207         lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);
3208
3209         /* Complete obtaining the lock procedure. */
3210         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
3211                                    aa->oa_ei->ei_mode,
3212                                    &aa->oa_oi->oi_flags,
3213                                    &lsm->lsm_oinfo[0]->loi_lvb,
3214                                    sizeof(lsm->lsm_oinfo[0]->loi_lvb),
3215                                    lustre_swab_ost_lvb,
3216                                    aa->oa_oi->oi_lockh, rc);
3217
3218         /* Complete osc stuff. */
3219         rc = osc_enqueue_fini(aa->oa_exp->exp_obd, req, aa->oa_oi, intent, rc);
3220
3221         /* Release the lock for async request. */
3222         if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
3223                 ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);
3224
3225         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
3226                  aa->oa_oi->oi_lockh, req, aa);
3227         LDLM_LOCK_PUT(lock);
3228         return rc;
3229 }
3230
3231 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
3232  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
3233  * other synchronous requests, however keeping some locks and trying to obtain
3234  * others may take a considerable amount of time in a case of ost failure; and
3235  * when other sync requests do not get released lock from a client, the client
3236  * is excluded from the cluster -- such scenarious make the life difficult, so
3237  * release locks just after they are obtained. */
3238 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3239                        struct ldlm_enqueue_info *einfo,
3240                        struct ptlrpc_request_set *rqset)
3241 {
3242         struct ldlm_res_id res_id;
3243         struct obd_device *obd = exp->exp_obd;
3244         struct ldlm_reply *rep;
3245         struct ptlrpc_request *req = NULL;
3246         int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
3247         ldlm_mode_t mode;
3248         int rc;
3249         ENTRY;
3250
3251         osc_build_res_name(oinfo->oi_md->lsm_object_id,
3252                            oinfo->oi_md->lsm_object_gr, &res_id);
3253         /* Filesystem lock extents are extended to page boundaries so that
3254          * dealing with the page cache is a little smoother.  */
3255         oinfo->oi_policy.l_extent.start -=
3256                 oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
3257         oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;
3258
3259         if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
3260                 goto no_match;
3261
3262         /* Next, search for already existing extent locks that will cover us */
3263         /* If we're trying to read, we also search for an existing PW lock.  The
3264          * VFS and page cache already protect us locally, so lots of readers/
3265          * writers can share a single PW lock.
3266          *
3267          * There are problems with conversion deadlocks, so instead of
3268          * converting a read lock to a write lock, we'll just enqueue a new
3269          * one.
3270          *
3271          * At some point we should cancel the read lock instead of making them
3272          * send us a blocking callback, but there are problems with canceling
3273          * locks out from other users right now, too. */
3274         mode = einfo->ei_mode;
3275         if (einfo->ei_mode == LCK_PR)
3276                 mode |= LCK_PW;
3277         mode = ldlm_lock_match(obd->obd_namespace,
3278                                oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
3279                                einfo->ei_type, &oinfo->oi_policy, mode,
3280                                oinfo->oi_lockh);
3281         if (mode) {
3282                 /* addref the lock only if not async requests and PW lock is
3283                  * matched whereas we asked for PR. */
3284                 if (!rqset && einfo->ei_mode != mode)
3285                         ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
3286                 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
3287                                         oinfo->oi_flags);
3288                 if (intent) {
3289                         /* I would like to be able to ASSERT here that rss <=
3290                          * kms, but I can't, for reasons which are explained in
3291                          * lov_enqueue() */
3292                 }
3293
3294                 /* We already have a lock, and it's referenced */
3295                 oinfo->oi_cb_up(oinfo, ELDLM_OK);
3296
3297                 /* For async requests, decref the lock. */
3298                 if (einfo->ei_mode != mode)
3299                         ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
3300                 else if (rqset)
3301                         ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
3302
3303                 RETURN(ELDLM_OK);
3304         }
3305
3306  no_match:
3307         if (intent) {
3308                 __u32 size[3] = {
3309                         [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
3310                         [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request),
3311                         [DLM_LOCKREQ_OFF + 1] = 0 };
3312
3313                 req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
3314                 if (req == NULL)
3315                         RETURN(-ENOMEM);
3316
3317                 size[DLM_LOCKREPLY_OFF] = sizeof(*rep);
3318                 size[DLM_REPLY_REC_OFF] =
3319                         sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb);
3320                 ptlrpc_req_set_repsize(req, 3, size);
3321         }
3322
3323         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3324         oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;
3325
3326         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id,
3327                               &oinfo->oi_policy, &oinfo->oi_flags,
3328                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3329                               sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
3330                               lustre_swab_ost_lvb, oinfo->oi_lockh,
3331                               rqset ? 1 : 0);
3332         if (rqset) {
3333                 if (!rc) {
3334                         struct osc_enqueue_args *aa;
3335                         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3336                         aa = ptlrpc_req_async_args(req);
3337                         aa->oa_oi = oinfo;
3338                         aa->oa_ei = einfo;
3339                         aa->oa_exp = exp;
3340
3341                         req->rq_interpret_reply = osc_enqueue_interpret;
3342                         ptlrpc_set_add_req(rqset, req);
3343                 } else if (intent) {
3344                         ptlrpc_req_finished(req);
3345                 }
3346                 RETURN(rc);
3347         }
3348
3349         rc = osc_enqueue_fini(obd, req, oinfo, intent, rc);
3350         if (intent)
3351                 ptlrpc_req_finished(req);
3352
3353         RETURN(rc);
3354 }
3355
3356 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
3357                      __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3358                      int *flags, void *data, struct lustre_handle *lockh,
3359                      int *n_matches)
3360 {
3361         struct ldlm_res_id res_id;
3362         struct obd_device *obd = exp->exp_obd;
3363         int lflags = *flags;
3364         ldlm_mode_t rc;
3365         ENTRY;
3366
3367         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
3368
3369         OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);
3370
3371         /* Filesystem lock extents are extended to page boundaries so that
3372          * dealing with the page cache is a little smoother */
3373         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3374         policy->l_extent.end |= ~CFS_PAGE_MASK;
3375
3376         /* Next, search for already existing extent locks that will cover us */
3377         /* If we're trying to read, we also search for an existing PW lock.  The
3378          * VFS and page cache already protect us locally, so lots of readers/
3379          * writers can share a single PW lock. */
3380         rc = mode;
3381         if (mode == LCK_PR)
3382                 rc |= LCK_PW;
3383         rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3384                              &res_id, type, policy, rc, lockh);
3385         if (rc) {
3386                 osc_set_data_with_check(lockh, data, lflags);
3387                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3388                         ldlm_lock_addref(lockh, LCK_PR);
3389                         ldlm_lock_decref(lockh, LCK_PW);
3390                 }
3391                 if (n_matches != NULL)
3392                         (*n_matches)++;
3393         }
3394
3395         RETURN(rc);
3396 }
3397
3398 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3399                       __u32 mode, struct lustre_handle *lockh)
3400 {
3401         ENTRY;
3402
3403         if (unlikely(mode == LCK_GROUP))
3404                 ldlm_lock_decref_and_cancel(lockh, mode);
3405         else
3406                 ldlm_lock_decref(lockh, mode);
3407
3408         RETURN(0);
3409 }
3410
3411 static int osc_cancel_unused(struct obd_export *exp,
3412                              struct lov_stripe_md *lsm, int flags, void *opaque)
3413 {
3414         struct obd_device *obd = class_exp2obd(exp);
3415         struct ldlm_res_id res_id, *resp = NULL;
3416
3417         if (lsm != NULL) {
3418                 resp = osc_build_res_name(lsm->lsm_object_id,
3419                                           lsm->lsm_object_gr, &res_id);
3420         }
3421
3422         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3423
3424 }
3425
3426 static int osc_join_lru(struct obd_export *exp,
3427                         struct lov_stripe_md *lsm, int join)
3428 {
3429         struct obd_device *obd = class_exp2obd(exp);
3430         struct ldlm_res_id res_id, *resp = NULL;
3431
3432         if (lsm != NULL) {
3433                 resp = osc_build_res_name(lsm->lsm_object_id,
3434                                           lsm->lsm_object_gr, &res_id);
3435         }
3436
3437         return ldlm_cli_join_lru(obd->obd_namespace, resp, join);
3438
3439 }
3440
3441 static int osc_statfs_interpret(struct ptlrpc_request *req,
3442                                 struct osc_async_args *aa, int rc)
3443 {
3444         struct obd_statfs *msfs;
3445         ENTRY;
3446
3447         if (rc != 0)
3448                 GOTO(out, rc);
3449
3450         msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3451                                   lustre_swab_obd_statfs);
3452         if (msfs == NULL) {
3453                 CERROR("Can't unpack obd_statfs\n");
3454                 GOTO(out, rc = -EPROTO);
3455         }
3456
3457         memcpy(aa->aa_oi->oi_osfs, msfs, sizeof(*msfs));
3458 out:
3459         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3460         RETURN(rc);
3461 }
3462
3463 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3464                             __u64 max_age, struct ptlrpc_request_set *rqset)
3465 {
3466         struct ptlrpc_request *req;
3467         struct osc_async_args *aa;
3468         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*oinfo->oi_osfs) };
3469         ENTRY;
3470
3471         /* We could possibly pass max_age in the request (as an absolute
3472          * timestamp or a "seconds.usec ago") so the target can avoid doing
3473          * extra calls into the filesystem if that isn't necessary (e.g.
3474          * during mount that would help a bit).  Having relative timestamps
3475          * is not so great if request processing is slow, while absolute
3476          * timestamps are not ideal because they need time synchronization. */
3477         req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
3478                               OST_STATFS, 1, NULL, NULL);
3479         if (!req)
3480                 RETURN(-ENOMEM);
3481
3482         ptlrpc_req_set_repsize(req, 2, size);
3483         req->rq_request_portal = OST_CREATE_PORTAL;
3484         ptlrpc_at_set_req_timeout(req);
3485         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3486                 /* procfs requests not want stat in wait for avoid deadlock */
3487                 req->rq_no_resend = 1;
3488                 req->rq_no_delay = 1;
3489         }
3490
3491         req->rq_interpret_reply = osc_statfs_interpret;
3492         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3493         aa = ptlrpc_req_async_args(req);
3494         aa->aa_oi = oinfo;
3495
3496         ptlrpc_set_add_req(rqset, req);
3497         RETURN(0);
3498 }
3499
3500 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3501                       __u64 max_age, __u32 flags)
3502 {
3503         struct obd_statfs *msfs;
3504         struct ptlrpc_request *req;
3505         struct obd_import     *imp = NULL;
3506         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
3507         int rc;
3508         ENTRY;
3509
3510         /*Since the request might also come from lprocfs, so we need
3511          *sync this with client_disconnect_export Bug15684*/
3512         down_read(&obd->u.cli.cl_sem);
3513         if (obd->u.cli.cl_import)
3514                 imp = class_import_get(obd->u.cli.cl_import);
3515         up_read(&obd->u.cli.cl_sem);
3516         if (!imp)
3517                 RETURN(-ENODEV);
3518
3519         /* We could possibly pass max_age in the request (as an absolute
3520          * timestamp or a "seconds.usec ago") so the target can avoid doing
3521          * extra calls into the filesystem if that isn't necessary (e.g.
3522          * during mount that would help a bit).  Having relative timestamps
3523          * is not so great if request processing is slow, while absolute
3524          * timestamps are not ideal because they need time synchronization. */
3525         req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION,
3526                               OST_STATFS, 1, NULL, NULL);
3527
3528         class_import_put(imp);
3529         if (!req)
3530                 RETURN(-ENOMEM);
3531
3532         ptlrpc_req_set_repsize(req, 2, size);
3533         req->rq_request_portal = OST_CREATE_PORTAL;
3534         ptlrpc_at_set_req_timeout(req);
3535
3536         if (flags & OBD_STATFS_NODELAY) {
3537                 /* procfs requests not want stat in wait for avoid deadlock */
3538                 req->rq_no_resend = 1;
3539                 req->rq_no_delay = 1;
3540         }
3541
3542         rc = ptlrpc_queue_wait(req);
3543         if (rc)
3544                 GOTO(out, rc);
3545
3546         msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3547                                   lustre_swab_obd_statfs);
3548         if (msfs == NULL) {
3549                 CERROR("Can't unpack obd_statfs\n");
3550                 GOTO(out, rc = -EPROTO);
3551         }
3552
3553         memcpy(osfs, msfs, sizeof(*osfs));
3554
3555         EXIT;
3556  out:
3557         ptlrpc_req_finished(req);
3558         return rc;
3559 }
3560
3561 /* Retrieve object striping information.
3562  *
3563  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
3564  * the maximum number of OST indices which will fit in the user buffer.
3565  * lmm_magic must be LOV_MAGIC_V1 or LOV_MAGIC_V3 (we only use 1 slot here).
3566  */
3567 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3568 {
3569         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
3570         struct lov_user_md_v3 lum, *lumk;
3571         int rc = 0, lum_size;
3572         struct lov_user_ost_data_v1 *lmm_objects;
3573         ENTRY;
3574
3575         if (!lsm)
3576                 RETURN(-ENODATA);
3577
3578         /* we only need the header part from user space to get lmm_magic and
3579          * lmm_stripe_count, (the header part is common to v1 and v3) */
3580         lum_size = sizeof(struct lov_user_md_v1);
3581         if (copy_from_user(&lum, lump, lum_size))
3582                 RETURN(-EFAULT);
3583
3584         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
3585             (lum.lmm_magic != LOV_USER_MAGIC_V3))
3586                 RETURN(-EINVAL);
3587
3588         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
3589         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
3590         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
3591         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
3592
3593         /* we can use lov_mds_md_size() to compute lum_size
3594          * because lov_user_md_vX and lov_mds_md_vX have the same size */
3595         if (lum.lmm_stripe_count > 0) {
3596                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
3597                 OBD_ALLOC(lumk, lum_size);
3598                 if (!lumk)
3599                         RETURN(-ENOMEM);
3600                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
3601                         lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
3602                 else
3603                         lmm_objects = &(lumk->lmm_objects[0]);
3604                 lmm_objects->l_object_id = lsm->lsm_object_id;
3605         } else {
3606                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
3607                 lumk = &lum;
3608         }
3609
3610         lumk->lmm_object_id = lsm->lsm_object_id;
3611         lumk->lmm_stripe_count = 1;
3612
3613         if (copy_to_user(lump, lumk, lum_size))
3614                 rc = -EFAULT;
3615
3616         if (lumk != &lum)
3617                 OBD_FREE(lumk, lum_size);
3618
3619         RETURN(rc);
3620 }
3621
3622
3623 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3624                          void *karg, void *uarg)
3625 {
3626         struct obd_device *obd = exp->exp_obd;
3627         struct obd_ioctl_data *data = karg;
3628         int err = 0;
3629         ENTRY;
3630
3631         if (!try_module_get(THIS_MODULE)) {
3632                 CERROR("Can't get module. Is it alive?");
3633                 return -EINVAL;
3634         }
3635         switch (cmd) {
3636         case OBD_IOC_LOV_GET_CONFIG: {
3637                 char *buf;
3638                 struct lov_desc *desc;
3639                 struct obd_uuid uuid;
3640
3641                 buf = NULL;
3642                 len = 0;
3643                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3644                         GOTO(out, err = -EINVAL);
3645
3646                 data = (struct obd_ioctl_data *)buf;
3647
3648                 if (sizeof(*desc) > data->ioc_inllen1) {
3649                         obd_ioctl_freedata(buf, len);
3650                         GOTO(out, err = -EINVAL);
3651                 }
3652
3653                 if (data->ioc_inllen2 < sizeof(uuid)) {
3654                         obd_ioctl_freedata(buf, len);
3655                         GOTO(out, err = -EINVAL);
3656                 }
3657
3658                 desc = (struct lov_desc *)data->ioc_inlbuf1;
3659                 desc->ld_tgt_count = 1;
3660                 desc->ld_active_tgt_count = 1;
3661                 desc->ld_default_stripe_count = 1;
3662                 desc->ld_default_stripe_size = 0;
3663                 desc->ld_default_stripe_offset = 0;
3664                 desc->ld_pattern = 0;
3665                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3666
3667                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3668
3669                 err = copy_to_user((void *)uarg, buf, len);
3670                 if (err)
3671                         err = -EFAULT;
3672                 obd_ioctl_freedata(buf, len);
3673                 GOTO(out, err);
3674         }
3675         case LL_IOC_LOV_SETSTRIPE:
3676                 err = obd_alloc_memmd(exp, karg);
3677                 if (err > 0)
3678                         err = 0;
3679                 GOTO(out, err);
3680         case LL_IOC_LOV_GETSTRIPE:
3681                 err = osc_getstripe(karg, uarg);
3682                 GOTO(out, err);
3683         case OBD_IOC_CLIENT_RECOVER:
3684                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3685                                             data->ioc_inlbuf1);
3686                 if (err > 0)
3687                         err = 0;
3688                 GOTO(out, err);
3689         case IOC_OSC_SET_ACTIVE:
3690                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3691                                                data->ioc_offset);
3692                 GOTO(out, err);
3693         case OBD_IOC_POLL_QUOTACHECK:
3694                 err = lquota_poll_check(quota_interface, exp,
3695                                         (struct if_quotacheck *)karg);
3696                 GOTO(out, err);
3697         case OBD_IOC_DESTROY: {
3698                 struct obdo            *oa;
3699
3700                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
3701                         GOTO (out, err = -EPERM);
3702                 oa = &data->ioc_obdo1;
3703
3704                 if (oa->o_id == 0)
3705                         GOTO(out, err = -EINVAL);
3706
3707                 oa->o_valid |= OBD_MD_FLGROUP;
3708
3709                 err = osc_destroy(exp, oa, NULL, NULL, NULL);
3710                 GOTO(out, err);
3711         }
3712         case OBD_IOC_PING_TARGET:
3713                 err = ptlrpc_obd_ping(obd);
3714                 GOTO(out, err);
3715         default:
3716                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3717                        cmd, cfs_curproc_comm());
3718                 GOTO(out, err = -ENOTTY);
3719         }
3720 out:
3721         module_put(THIS_MODULE);
3722         return err;
3723 }
3724
3725 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3726                         void *key, __u32 *vallen, void *val, struct lov_stripe_md *lsm)
3727 {
3728         ENTRY;
3729         if (!vallen || !val)
3730                 RETURN(-EFAULT);
3731
3732         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3733                 __u32 *stripe = val;
3734                 *vallen = sizeof(*stripe);
3735                 *stripe = 0;
3736                 RETURN(0);
3737         } else if (KEY_IS(KEY_LAST_ID)) {
3738                 struct ptlrpc_request *req;
3739                 obd_id *reply;
3740                 char *bufs[2] = { NULL, key };
3741                 __u32 size[2] = { sizeof(struct ptlrpc_body), keylen };
3742                 int rc;
3743
3744                 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
3745                                       OST_GET_INFO, 2, size, bufs);
3746                 if (req == NULL)
3747                         RETURN(-ENOMEM);
3748
3749                 size[REPLY_REC_OFF] = *vallen;
3750                 ptlrpc_req_set_repsize(req, 2, size);
3751                 rc = ptlrpc_queue_wait(req);
3752                 if (rc)
3753                         GOTO(out, rc);
3754
3755                 reply = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*reply),
3756                                            lustre_swab_ost_last_id);
3757                 if (reply == NULL) {
3758                         CERROR("Can't unpack OST last ID\n");
3759                         GOTO(out, rc = -EPROTO);
3760                 }
3761                 *((obd_id *)val) = *reply;
3762         out:
3763                 ptlrpc_req_finished(req);
3764                 RETURN(rc);
3765         } else if (KEY_IS(KEY_FIEMAP)) {
3766                 struct ptlrpc_request *req;
3767                 struct ll_user_fiemap *reply;
3768                 char *bufs[2] = { NULL, key };
3769                 __u32 size[2] = { sizeof(struct ptlrpc_body), keylen };
3770                 int rc;
3771
3772                 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
3773                                       OST_GET_INFO, 2, size, bufs);
3774                 if (req == NULL)
3775                         RETURN(-ENOMEM);
3776
3777                 size[REPLY_REC_OFF] = *vallen;
3778                 ptlrpc_req_set_repsize(req, 2, size);
3779
3780                 rc = ptlrpc_queue_wait(req);
3781                 if (rc)
3782                         GOTO(out1, rc);
3783                 reply = lustre_swab_repbuf(req, REPLY_REC_OFF, *vallen,
3784                                            lustre_swab_fiemap);
3785                 if (reply == NULL) {
3786                         CERROR("Can't unpack FIEMAP reply.\n");
3787                         GOTO(out1, rc = -EPROTO);
3788                 }
3789
3790                 memcpy(val, reply, *vallen);
3791
3792         out1:
3793                 ptlrpc_req_finished(req);
3794
3795                 RETURN(rc);
3796         }
3797
3798         RETURN(-EINVAL);
3799 }
3800
3801 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3802                                           void *aa, int rc)
3803 {
3804         struct llog_ctxt *ctxt;
3805         struct obd_import *imp = req->rq_import;
3806         ENTRY;
3807
3808         if (rc != 0)
3809                 RETURN(rc);
3810
3811         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3812         if (ctxt) {
3813                 if (rc == 0)
3814                         rc = llog_initiator_connect(ctxt);
3815                 else
3816                         CERROR("cannot establish connection for "
3817                                "ctxt %p: %d\n", ctxt, rc);
3818         }
3819
3820         llog_ctxt_put(ctxt);
3821         spin_lock(&imp->imp_lock);
3822         imp->imp_server_timeout = 1;
3823         imp->imp_pingable = 1;
3824         spin_unlock(&imp->imp_lock);
3825         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3826
3827         RETURN(rc);
3828 }
3829
3830 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3831                               void *key, obd_count vallen, void *val,
3832                               struct ptlrpc_request_set *set)
3833 {
3834         struct ptlrpc_request *req;
3835         struct obd_device  *obd = exp->exp_obd;
3836         struct obd_import *imp = class_exp2cliimp(exp);
3837         __u32 size[3] = { sizeof(struct ptlrpc_body), keylen, vallen };
3838         char *bufs[3] = { NULL, key, val };
3839         ENTRY;
3840
3841         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3842
3843         if (KEY_IS(KEY_NEXT_ID)) {
3844                 if (vallen != sizeof(obd_id))
3845                         RETURN(-EINVAL);
3846                 obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
3847                 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3848                        exp->exp_obd->obd_name,
3849                        obd->u.cli.cl_oscc.oscc_next_id);
3850
3851                 RETURN(0);
3852         }
3853
3854         if (KEY_IS(KEY_UNLINKED)) {
3855                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3856                 spin_lock(&oscc->oscc_lock);
3857                 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3858                 spin_unlock(&oscc->oscc_lock);
3859                 RETURN(0);
3860         }
3861
3862         if (KEY_IS(KEY_INIT_RECOV)) {
3863                 if (vallen != sizeof(int))
3864                         RETURN(-EINVAL);
3865                 spin_lock(&imp->imp_lock);
3866                 imp->imp_initial_recov = *(int *)val;
3867                 spin_unlock(&imp->imp_lock);
3868                 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3869                        exp->exp_obd->obd_name,
3870                        imp->imp_initial_recov);
3871                 RETURN(0);
3872         }
3873
3874         if (KEY_IS(KEY_CHECKSUM)) {
3875                 if (vallen != sizeof(int))
3876                         RETURN(-EINVAL);
3877                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3878                 RETURN(0);
3879         }
3880
3881         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3882                 RETURN(-EINVAL);
3883
3884         /* We pass all other commands directly to OST. Since nobody calls osc
3885            methods directly and everybody is supposed to go through LOV, we
3886            assume lov checked invalid values for us.
3887            The only recognised values so far are evict_by_nid and mds_conn.
3888            Even if something bad goes through, we'd get a -EINVAL from OST
3889            anyway. */
3890
3891         req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_SET_INFO, 3, size,
3892                               bufs);
3893         if (req == NULL)
3894                 RETURN(-ENOMEM);
3895
3896         if (KEY_IS(KEY_MDS_CONN))
3897                 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3898         else if (KEY_IS(KEY_GRANT_SHRINK))
3899                 req->rq_interpret_reply = osc_shrink_grant_interpret;
3900
3901         if (KEY_IS(KEY_GRANT_SHRINK)) {
3902                 struct osc_grant_args *aa;
3903                 struct obdo *oa;
3904
3905                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3906                 aa = ptlrpc_req_async_args(req);
3907                 OBD_ALLOC_PTR(oa);
3908                 if (!oa) {
3909                         ptlrpc_req_finished(req);
3910                         RETURN(-ENOMEM);
3911                 }
3912                 *oa = ((struct ost_body *)val)->oa;
3913                 aa->aa_oa = oa;
3914                  
3915                 size[1] = vallen;
3916                 ptlrpc_req_set_repsize(req, 2, size);
3917                 ptlrpcd_add_req(req);
3918         } else {
3919                 ptlrpc_req_set_repsize(req, 1, NULL);
3920                 ptlrpc_set_add_req(set, req);
3921                 ptlrpc_check_set(set);
3922         }
3923
3924         RETURN(0);
3925 }
3926
3927
3928 static struct llog_operations osc_size_repl_logops = {
3929         lop_cancel: llog_obd_repl_cancel
3930 };
3931
3932 static struct llog_operations osc_mds_ost_orig_logops;
3933 static int osc_llog_init(struct obd_device *obd, struct obd_device *tgt,
3934                          int count, struct llog_catid *catid,
3935                          struct obd_uuid *uuid)
3936 {
3937         int rc;
3938         ENTRY;
3939
3940         spin_lock(&obd->obd_dev_lock);
3941         if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
3942                 osc_mds_ost_orig_logops = llog_lvfs_ops;
3943                 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3944                 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3945                 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3946                 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
3947         }
3948         spin_unlock(&obd->obd_dev_lock);
3949
3950         rc = llog_setup(obd, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
3951                         &catid->lci_logid, &osc_mds_ost_orig_logops);
3952         if (rc) {
3953                 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
3954                 GOTO (out, rc);
3955         }
3956
3957         rc = llog_setup(obd, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
3958                         &osc_size_repl_logops);
3959         if (rc) {
3960                 struct llog_ctxt *ctxt =
3961                         llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3962                 if (ctxt)
3963                         llog_cleanup(ctxt);
3964                 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
3965         }
3966 out:
3967         if (rc) {
3968                 CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
3969                        obd->obd_name, tgt->obd_name, count, catid, rc);
3970                 CERROR("logid "LPX64":0x%x\n",
3971                        catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
3972         }
3973         RETURN(rc);
3974 }
3975
3976 static int osc_llog_finish(struct obd_device *obd, int count)
3977 {
3978         struct llog_ctxt *ctxt;
3979         int rc = 0, rc2 = 0;
3980         ENTRY;
3981
3982         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3983         if (ctxt)
3984                 rc = llog_cleanup(ctxt);
3985
3986         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3987         if (ctxt)
3988                 rc2 = llog_cleanup(ctxt);
3989         if (!rc)
3990                 rc = rc2;
3991
3992         RETURN(rc);
3993 }
3994
3995 static int osc_reconnect(struct obd_export *exp, struct obd_device *obd,
3996                          struct obd_uuid *cluuid,
3997                          struct obd_connect_data *data,
3998                          void *localdata)
3999 {
4000         struct client_obd *cli = &obd->u.cli;
4001
4002         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
4003                 long lost_grant;
4004
4005                 client_obd_list_lock(&cli->cl_loi_list_lock);
4006                 data->ocd_grant = cli->cl_avail_grant ?:
4007                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
4008                 lost_grant = cli->cl_lost_grant;
4009                 cli->cl_lost_grant = 0;
4010                 client_obd_list_unlock(&cli->cl_loi_list_lock);
4011
4012                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
4013                        "cl_lost_grant: %ld\n", data->ocd_grant,
4014                        cli->cl_avail_grant, lost_grant);
4015                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
4016                        " ocd_grant: %d\n", data->ocd_connect_flags,
4017                        data->ocd_version, data->ocd_grant);
4018         }
4019
4020         RETURN(0);
4021 }
4022
4023 static int osc_disconnect(struct obd_export *exp)
4024 {
4025         struct obd_device *obd = class_exp2obd(exp);
4026         struct llog_ctxt  *ctxt;
4027         int rc;
4028
4029         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4030         if (ctxt) {
4031                 if (obd->u.cli.cl_conn_count == 1) {
4032                         /* Flush any remaining cancel messages out to the
4033                          * target */
4034                         llog_sync(ctxt, exp);
4035                 }
4036                 llog_ctxt_put(ctxt);
4037         } else {
4038                 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
4039                        obd);
4040         }
4041
4042         osc_del_shrink_grant(&obd->u.cli);
4043         rc = client_disconnect_export(exp);
4044         return rc;
4045 }
4046
4047 static int osc_import_event(struct obd_device *obd,
4048                             struct obd_import *imp,
4049                             enum obd_import_event event)
4050 {
4051         struct client_obd *cli;
4052         int rc = 0;
4053
4054         ENTRY;
4055         LASSERT(imp->imp_obd == obd);
4056
4057         switch (event) {
4058         case IMP_EVENT_DISCON: {
4059                 /* Only do this on the MDS OSC's */
4060                 if (imp->imp_server_timeout) {
4061                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4062
4063                         spin_lock(&oscc->oscc_lock);
4064                         oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
4065                         spin_unlock(&oscc->oscc_lock);
4066                 }
4067                 cli = &obd->u.cli;
4068                 client_obd_list_lock(&cli->cl_loi_list_lock);
4069                 cli->cl_avail_grant = 0;
4070                 cli->cl_lost_grant = 0;
4071                 client_obd_list_unlock(&cli->cl_loi_list_lock);
4072                 ptlrpc_import_setasync(imp, -1);
4073
4074                 break;
4075         }
4076         case IMP_EVENT_INACTIVE: {
4077                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
4078                 break;
4079         }
4080         case IMP_EVENT_INVALIDATE: {
4081                 struct ldlm_namespace *ns = obd->obd_namespace;
4082
4083                 /* Reset grants */
4084                 cli = &obd->u.cli;
4085                 client_obd_list_lock(&cli->cl_loi_list_lock);
4086                 /* all pages go to failing rpcs due to the invalid import */
4087                 osc_check_rpcs(cli);
4088                 client_obd_list_unlock(&cli->cl_loi_list_lock);
4089
4090                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
4091
4092                 break;
4093         }
4094         case IMP_EVENT_ACTIVE: {
4095                 /* Only do this on the MDS OSC's */
4096                 if (imp->imp_server_timeout) {
4097                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4098
4099                         spin_lock(&oscc->oscc_lock);
4100                         oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
4101                         spin_unlock(&oscc->oscc_lock);
4102                 }
4103                 CDEBUG(D_INFO, "notify server \n");
4104                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
4105                 break;
4106         }
4107         case IMP_EVENT_OCD: {
4108                 struct obd_connect_data *ocd = &imp->imp_connect_data;
4109
4110                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
4111                         osc_init_grant(&obd->u.cli, ocd);
4112
4113                 /* See bug 7198 */
4114                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
4115                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
4116
4117                 ptlrpc_import_setasync(imp, 1);
4118                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
4119                 break;
4120         }
4121         default:
4122                 CERROR("Unknown import event %d\n", event);
4123                 LBUG();
4124         }
4125         RETURN(rc);
4126 }
4127
4128 int osc_setup(struct obd_device *obd, obd_count len, void *buf)
4129 {
4130         int rc;
4131         ENTRY;
4132
4133         ENTRY;
4134         rc = ptlrpcd_addref();
4135         if (rc)
4136                 RETURN(rc);
4137
4138         rc = client_obd_setup(obd, len, buf);
4139         if (rc) {
4140                 ptlrpcd_decref();
4141         } else {
4142                 struct lprocfs_static_vars lvars = { 0 };
4143                 struct client_obd *cli = &obd->u.cli;
4144
4145                 lprocfs_osc_init_vars(&lvars);
4146                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
4147                         lproc_osc_attach_seqstat(obd);
4148                         ptlrpc_lprocfs_register_obd(obd);
4149                 }
4150
4151                 oscc_init(obd);
4152                 /* We need to allocate a few requests more, because
4153                    brw_interpret tries to create new requests before freeing
4154                    previous ones. Ideally we want to have 2x max_rpcs_in_flight
4155                    reserved, but I afraid that might be too much wasted RAM
4156                    in fact, so 2 is just my guess and still should work. */
4157                 cli->cl_import->imp_rq_pool =
4158                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4159                                             OST_MAXREQSIZE,
4160                                             ptlrpc_add_rqs_to_pool);
4161                 cli->cl_cache = cache_create(obd);
4162                 if (!cli->cl_cache) {
4163                         osc_cleanup(obd);
4164                         rc = -ENOMEM;
4165                 }
4166                 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
4167                 sema_init(&cli->cl_grant_sem, 1);
4168         }
4169
4170         RETURN(rc);
4171 }
4172
4173 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
4174 {
4175         int rc = 0;
4176         ENTRY;
4177
4178         switch (stage) {
4179         case OBD_CLEANUP_EARLY: {
4180                 struct obd_import *imp;
4181                 imp = obd->u.cli.cl_import;
4182                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
4183                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
4184                 ptlrpc_deactivate_import(imp);
4185                 break;
4186         }
4187         case OBD_CLEANUP_EXPORTS: {
4188                 /* If we set up but never connected, the
4189                    client import will not have been cleaned. */
4190                 if (obd->u.cli.cl_import) {
4191                         struct obd_import *imp;
4192                         down_write(&obd->u.cli.cl_sem);
4193                         imp = obd->u.cli.cl_import;
4194                         CDEBUG(D_CONFIG, "%s: client import never connected\n",
4195                                obd->obd_name);
4196                         ptlrpc_invalidate_import(imp);
4197                         if (imp->imp_rq_pool) {
4198                                 ptlrpc_free_rq_pool(imp->imp_rq_pool);
4199                                 imp->imp_rq_pool = NULL;
4200                         }
4201                         class_destroy_import(imp);
4202                         up_write(&obd->u.cli.cl_sem);
4203                         obd->u.cli.cl_import = NULL;
4204                 }
4205                 rc = obd_llog_finish(obd, 0);
4206                 if (rc != 0)
4207                         CERROR("failed to cleanup llogging subsystems\n");
4208                 break;
4209         }
4210         case OBD_CLEANUP_SELF_EXP:
4211                 break;
4212         case OBD_CLEANUP_OBD:
4213                 break;
4214         }
4215         RETURN(rc);
4216 }
4217
4218 int osc_cleanup(struct obd_device *obd)
4219 {
4220         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4221         int rc;
4222
4223         ENTRY;
4224         ptlrpc_lprocfs_unregister_obd(obd);
4225         lprocfs_obd_cleanup(obd);
4226
4227         spin_lock(&oscc->oscc_lock);
4228         oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
4229         oscc->oscc_flags |= OSCC_FLAG_EXITING;
4230         spin_unlock(&oscc->oscc_lock);
4231
4232         /* free memory of osc quota cache */
4233         lquota_cleanup(quota_interface, obd);
4234
4235         cache_destroy(obd->u.cli.cl_cache);
4236         rc = client_obd_cleanup(obd);
4237
4238         ptlrpcd_decref();
4239         RETURN(rc);
4240 }
4241
4242 static int osc_register_page_removal_cb(struct obd_device *obd,
4243                                         obd_page_removal_cb_t func,
4244                                         obd_pin_extent_cb pin_cb)
4245 {
4246         ENTRY;
4247
4248         /* this server - not need init */
4249         if (func == NULL)
4250                 return 0;
4251
4252         return cache_add_extent_removal_cb(obd->u.cli.cl_cache, func,
4253                                            pin_cb);
4254 }
4255
4256 static int osc_unregister_page_removal_cb(struct obd_device *obd,
4257                                           obd_page_removal_cb_t func)
4258 {
4259         ENTRY;
4260         return cache_del_extent_removal_cb(obd->u.cli.cl_cache, func);
4261 }
4262
4263 static int osc_register_lock_cancel_cb(struct obd_device *obd,
4264                                        obd_lock_cancel_cb cb)
4265 {
4266         ENTRY;
4267         LASSERT(obd->u.cli.cl_ext_lock_cancel_cb == NULL);
4268
4269         /* this server - not need init */
4270         if (cb == NULL)
4271                 return 0;
4272
4273         obd->u.cli.cl_ext_lock_cancel_cb = cb;
4274         return 0;
4275 }
4276
4277 static int osc_unregister_lock_cancel_cb(struct obd_device *obd,
4278                                          obd_lock_cancel_cb cb)
4279 {
4280         ENTRY;
4281
4282         if (obd->u.cli.cl_ext_lock_cancel_cb != cb) {
4283                 CERROR("Unregistering cancel cb %p, while only %p was "
4284                        "registered\n", cb,
4285                        obd->u.cli.cl_ext_lock_cancel_cb);
4286                 RETURN(-EINVAL);
4287         }
4288
4289         obd->u.cli.cl_ext_lock_cancel_cb = NULL;
4290         return 0;
4291 }
4292
4293 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4294 {
4295         struct lustre_cfg *lcfg = buf;
4296         struct lprocfs_static_vars lvars = { 0 };
4297         int rc = 0;
4298
4299         lprocfs_osc_init_vars(&lvars);
4300
4301         rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars, lcfg, obd);
4302         return(rc);
4303 }
4304
4305 struct obd_ops osc_obd_ops = {
4306         .o_owner                = THIS_MODULE,
4307         .o_setup                = osc_setup,
4308         .o_precleanup           = osc_precleanup,
4309         .o_cleanup              = osc_cleanup,
4310         .o_add_conn             = client_import_add_conn,
4311         .o_del_conn             = client_import_del_conn,
4312         .o_connect              = client_connect_import,
4313         .o_reconnect            = osc_reconnect,
4314         .o_disconnect           = osc_disconnect,
4315         .o_statfs               = osc_statfs,
4316         .o_statfs_async         = osc_statfs_async,
4317         .o_packmd               = osc_packmd,
4318         .o_unpackmd             = osc_unpackmd,
4319         .o_precreate            = osc_precreate,
4320         .o_create               = osc_create,
4321         .o_destroy              = osc_destroy,
4322         .o_getattr              = osc_getattr,
4323         .o_getattr_async        = osc_getattr_async,
4324         .o_setattr              = osc_setattr,
4325         .o_setattr_async        = osc_setattr_async,
4326         .o_brw                  = osc_brw,
4327         .o_brw_async            = osc_brw_async,
4328         .o_prep_async_page      = osc_prep_async_page,
4329         .o_reget_short_lock     = osc_reget_short_lock,
4330         .o_release_short_lock   = osc_release_short_lock,
4331         .o_queue_async_io       = osc_queue_async_io,
4332         .o_set_async_flags      = osc_set_async_flags,
4333         .o_queue_group_io       = osc_queue_group_io,
4334         .o_trigger_group_io     = osc_trigger_group_io,
4335         .o_teardown_async_page  = osc_teardown_async_page,
4336         .o_punch                = osc_punch,
4337         .o_sync                 = osc_sync,
4338         .o_enqueue              = osc_enqueue,
4339         .o_match                = osc_match,
4340         .o_change_cbdata        = osc_change_cbdata,
4341         .o_cancel               = osc_cancel,
4342         .o_cancel_unused        = osc_cancel_unused,
4343         .o_join_lru             = osc_join_lru,
4344         .o_iocontrol            = osc_iocontrol,
4345         .o_get_info             = osc_get_info,
4346         .o_set_info_async       = osc_set_info_async,
4347         .o_import_event         = osc_import_event,
4348         .o_llog_init            = osc_llog_init,
4349         .o_llog_finish          = osc_llog_finish,
4350         .o_process_config       = osc_process_config,
4351         .o_register_page_removal_cb = osc_register_page_removal_cb,
4352         .o_unregister_page_removal_cb = osc_unregister_page_removal_cb,
4353         .o_register_lock_cancel_cb = osc_register_lock_cancel_cb,
4354         .o_unregister_lock_cancel_cb = osc_unregister_lock_cancel_cb,
4355 };
4356 int __init osc_init(void)
4357 {
4358         struct lprocfs_static_vars lvars = { 0 };
4359         int rc;
4360         ENTRY;
4361
4362         lprocfs_osc_init_vars(&lvars);
4363
4364         request_module("lquota");
4365         quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
4366         lquota_init(quota_interface);
4367         init_obd_quota_ops(quota_interface, &osc_obd_ops);
4368
4369         rc = class_register_type(&osc_obd_ops, lvars.module_vars,
4370                                  LUSTRE_OSC_NAME);
4371         if (rc) {
4372                 if (quota_interface)
4373                         PORTAL_SYMBOL_PUT(osc_quota_interface);
4374                 RETURN(rc);
4375         }
4376
4377         RETURN(rc);
4378 }
4379
4380 #ifdef __KERNEL__
4381 static void /*__exit*/ osc_exit(void)
4382 {
4383         lquota_exit(quota_interface);
4384         if (quota_interface)
4385                 PORTAL_SYMBOL_PUT(osc_quota_interface);
4386
4387         class_unregister_type(LUSTRE_OSC_NAME);
4388 }
4389
4390 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4391 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4392 MODULE_LICENSE("GPL");
4393
4394 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
4395 #endif