Whamcloud - gitweb
12852727d7a2a1666d947f51ce38d007560f9d6e
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #ifndef EXPORT_SYMTAB
38 # define EXPORT_SYMTAB
39 #endif
40 #define DEBUG_SUBSYSTEM S_OSC
41
42 #ifdef __KERNEL__
43 # include <libcfs/libcfs.h>
44 #else /* __KERNEL__ */
45 # include <liblustre.h>
46 #endif
47
48 # include <lustre_dlm.h>
49 #include <libcfs/kp30.h>
50 #include <lustre_net.h>
51 #include <lustre/lustre_user.h>
52 #include <obd_ost.h>
53 #include <obd_lov.h>
54
55 #ifdef  __CYGWIN__
56 # include <ctype.h>
57 #endif
58
59 #include <lustre_ha.h>
60 #include <lprocfs_status.h>
61 #include <lustre_log.h>
62 #include <lustre_debug.h>
63 #include <lustre_param.h>
64 #include <lustre_cache.h>
65 #include "osc_internal.h"
66
67 static quota_interface_t *quota_interface = NULL;
68 extern quota_interface_t osc_quota_interface;
69
70 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
71 static int brw_interpret(struct ptlrpc_request *request, void *data, int rc);
72 int osc_cleanup(struct obd_device *obd);
73
74 static quota_interface_t *quota_interface;
75 extern quota_interface_t osc_quota_interface;
76
77 /* by default 10s */
78 atomic_t osc_resend_time;
79
80 /* Pack OSC object metadata for disk storage (LE byte order). */
81 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
82                       struct lov_stripe_md *lsm)
83 {
84         int lmm_size;
85         ENTRY;
86
87         lmm_size = sizeof(**lmmp);
88         if (!lmmp)
89                 RETURN(lmm_size);
90
91         if (*lmmp && !lsm) {
92                 OBD_FREE(*lmmp, lmm_size);
93                 *lmmp = NULL;
94                 RETURN(0);
95         }
96
97         if (!*lmmp) {
98                 OBD_ALLOC(*lmmp, lmm_size);
99                 if (!*lmmp)
100                         RETURN(-ENOMEM);
101         }
102
103         if (lsm) {
104                 LASSERT(lsm->lsm_object_id);
105                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
106         }
107
108         RETURN(lmm_size);
109 }
110
/* Unpack OSC object metadata from disk storage (LE byte order).
 *
 * Mirror of osc_packmd(): with @lsmp == NULL just return the in-memory
 * md size; with *@lsmp set and @lmm == NULL free the in-memory md;
 * otherwise allocate *@lsmp as needed and fill it from @lmm. */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
{
        int lsm_size;
        ENTRY;

        if (lmm != NULL) {
                /* Reject a source buffer too small to hold a lov_mds_md. */
                if (lmm_bytes < sizeof (*lmm)) {
                        CERROR("lov_mds_md too small: %d, need %d\n",
                               lmm_bytes, (int)sizeof(*lmm));
                        RETURN(-EINVAL);
                }
                /* XXX LOV_MAGIC etc check? */

                if (lmm->lmm_object_id == 0) {
                        CERROR("lov_mds_md: zero lmm_object_id\n");
                        RETURN(-EINVAL);
                }
        }

        /* An OSC object always has exactly one stripe. */
        lsm_size = lov_stripe_md_size(1);
        if (lsmp == NULL)
                RETURN(lsm_size);

        /* Free request: in-memory md present, no source to unpack. */
        if (*lsmp != NULL && lmm == NULL) {
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);
                *lsmp = NULL;
                RETURN(0);
        }

        if (*lsmp == NULL) {
                OBD_ALLOC(*lsmp, lsm_size);
                if (*lsmp == NULL)
                        RETURN(-ENOMEM);
                OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                if ((*lsmp)->lsm_oinfo[0] == NULL) {
                        /* unwind the partial allocation */
                        OBD_FREE(*lsmp, lsm_size);
                        RETURN(-ENOMEM);
                }
                loi_init((*lsmp)->lsm_oinfo[0]);
        }

        if (lmm != NULL) {
                /* XXX zero *lsmp? */
                (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
                LASSERT((*lsmp)->lsm_object_id);
        }

        (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

        RETURN(lsm_size);
}
165
/* Reply callback for an async OST_GETATTR: unpack the returned attributes
 * into the caller's obdo and invoke the obd_info completion upcall. */
static int osc_getattr_interpret(struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body) {
                CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
                lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);

                /* This should really be sent by the OST */
                aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
                aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
        } else {
                CERROR("can't unpack ost_body\n");
                rc = -EPROTO;
                /* invalidate the obdo so the caller doesn't trust stale data */
                aa->aa_oi->oi_oa->o_valid = 0;
        }
out:
        /* the completion upcall runs even on error so waiters are released */
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}
193
/* Queue an asynchronous OST_GETATTR on @set; the reply is processed by
 * osc_getattr_interpret().  Returns 0 when the request has been queued. */
static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        struct osc_async_args *aa;
        ENTRY;

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_GETATTR, 2, size,NULL);
        if (!req)
                RETURN(-ENOMEM);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);

        ptlrpc_req_set_repsize(req, 2, size);
        req->rq_interpret_reply = osc_getattr_interpret;

        /* stash the obd_info in the request's async-args area */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(set, req);
        RETURN (0);
}
221
/* Synchronous OST_GETATTR: send the object id in @oinfo->oi_oa and copy
 * the attributes from the server's reply back into it. */
static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        int rc;
        ENTRY;

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_GETATTR, 2, size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);

        ptlrpc_req_set_repsize(req, 2, size);

        rc = ptlrpc_queue_wait(req);
        if (rc) {
                CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
                GOTO(out, rc);
        }

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR ("can't unpack ost_body\n");
                GOTO (out, rc = -EPROTO);
        }

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);

        /* This should really be sent by the OST */
        oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
265
/* Synchronous OST_SETATTR: send the attributes in @oinfo->oi_oa and copy
 * the server's returned obdo back into it.  @oti is unused here. */
static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
                       struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        int rc;
        ENTRY;

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_SETATTR, 2, size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);

        ptlrpc_req_set_repsize(req, 2, size);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
301
302 static int osc_setattr_interpret(struct ptlrpc_request *req,
303                                  struct osc_async_args *aa, int rc)
304 {
305         struct ost_body *body;
306         ENTRY;
307
308         if (rc != 0)
309                 GOTO(out, rc);
310
311         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
312                                   lustre_swab_ost_body);
313         if (body == NULL) {
314                 CERROR("can't unpack ost_body\n");
315                 GOTO(out, rc = -EPROTO);
316         }
317
318         lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
319 out:
320         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
321         RETURN(rc);
322 }
323
/* Asynchronous OST_SETATTR.  If @rqset is NULL the request is handed to
 * ptlrpcd fire-and-forget (used for MDS-to-OST setattr); otherwise it is
 * added to @rqset and completed via osc_setattr_interpret(). */
static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct obd_trans_info *oti,
                             struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        __u32 size[3] = { sizeof(struct ptlrpc_body), sizeof(*body), 0 };
        int bufcount = 2;
        struct osc_async_args *aa;
        ENTRY;

        /* 2.0 servers expect a third (zero-length) request buffer --
         * NOTE(review): purpose not visible here; confirm against the
         * 2.0 request format before changing. */
        if (osc_exp_is_2_0_server(exp)) {
                bufcount = 3;
        }

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_SETATTR, bufcount, size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));

        /* pass along the llog cookie from the transaction info */
        if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
                LASSERT(oti);
                oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
        }

        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
        ptlrpc_req_set_repsize(req, 2, size);
        /* do mds to ost setattr asynchronouly */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
                req->rq_interpret_reply = osc_setattr_interpret;

                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = ptlrpc_req_async_args(req);
                aa->aa_oi = oinfo;

                ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}
369
/* Synchronous object create on the OST.
 *
 * If the caller did not supply an in-memory md (*@ea == NULL) a default
 * one is allocated here and freed again on failure.  On success the new
 * object id is stored in the md, *@ea is set, and @oa carries the
 * server-returned attributes; the transno and (optionally) the llog
 * cookie are recorded in @oti. */
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        struct lov_stripe_md *lsm;
        __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        int rc;
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_CREATE, 2, size, NULL);
        if (!req)
                GOTO(out, rc = -ENOMEM);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        lustre_set_wire_obdo(&body->oa, oa);

        ptlrpc_req_set_repsize(req, 2, size);
        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_DELORPHAN) {
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR ("can't unpack ost_body\n");
                GOTO (out_req, rc = -EPROTO);
        }

        lustre_get_wire_obdo(oa, &body->oa);

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_object_id = oa->o_id;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        *oti->oti_logcookies = oa->o_lcookie;
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        /* only free the md if it was allocated here (caller's *ea unset) */
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        RETURN(rc);
}
450
451 static int osc_punch_interpret(struct ptlrpc_request *req,
452                                struct osc_async_args *aa, int rc)
453 {
454         struct ost_body *body;
455         ENTRY;
456
457         if (rc != 0)
458                 GOTO(out, rc);
459
460         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof (*body),
461                                   lustre_swab_ost_body);
462         if (body == NULL) {
463                 CERROR ("can't unpack ost_body\n");
464                 GOTO(out, rc = -EPROTO);
465         }
466
467         lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
468 out:
469         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
470         RETURN(rc);
471 }
472
/* Queue an asynchronous OST_PUNCH (truncate) of the extent in
 * @oinfo->oi_policy on @rqset; the reply is handled by
 * osc_punch_interpret(). */
static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
                     struct obd_trans_info *oti,
                     struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        struct ost_body *body;
        __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        ENTRY;

        if (!oinfo->oi_oa) {
                CERROR("oa NULL\n");
                RETURN(-EINVAL);
        }

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_PUNCH, 2, size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        req->rq_request_portal = OST_IO_PORTAL;         /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);

        /* overload the size and blocks fields in the oa with start/end */
        body->oa.o_size = oinfo->oi_policy.l_extent.start;
        body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);

        ptlrpc_req_set_repsize(req, 2, size);

        req->rq_interpret_reply = osc_punch_interpret;
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;
        ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}
514
515 static int osc_sync_interpret(struct ptlrpc_request *req,
516                               struct osc_async_args *aa, int rc)
517 {
518         struct ost_body *body;
519         ENTRY;
520
521         if (rc)
522                 GOTO(out, rc);
523
524         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
525                                   lustre_swab_ost_body);
526         if (body == NULL) {
527                 CERROR ("can't unpack ost_body\n");
528                 GOTO(out, rc = -EPROTO);
529         }
530
531         *aa->aa_oi->oi_oa = body->oa;
532 out:
533         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
534         RETURN(rc);
535 }
536
/* Queue an asynchronous OST_SYNC over [@start, @end] of the object in
 * @oinfo->oi_oa; the reply is processed by osc_sync_interpret(). */
static int osc_sync(struct obd_export *exp, struct obd_info *oinfo,
                    obd_size start, obd_size end,
                    struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        struct osc_async_args *aa;
        ENTRY;

        if (!oinfo->oi_oa) {
                CERROR("oa NULL\n");
                RETURN(-EINVAL);
        }

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_SYNC, 2, size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);

        /* overload the size and blocks fields in the oa with start/end */
        body->oa.o_size = start;
        body->oa.o_blocks = end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);

        ptlrpc_req_set_repsize(req, 2, size);
        req->rq_interpret_reply = osc_sync_interpret;

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(set, req);
        RETURN (0);
}
575
576 /* Find and cancel locally locks matched by @mode in the resource found by
577  * @objid. Found locks are added into @cancel list. Returns the amount of
578  * locks added to @cancels list. */
579 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
580                                    struct list_head *cancels, ldlm_mode_t mode,
581                                    int lock_flags)
582 {
583         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
584         struct ldlm_res_id res_id;
585         struct ldlm_resource *res;
586         int count;
587         ENTRY;
588
589         osc_build_res_name(oa->o_id, oa->o_gr, &res_id);
590         res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
591         if (res == NULL)
592                 RETURN(0);
593
594         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
595                                            lock_flags, 0, NULL);
596         ldlm_resource_putref(res);
597         RETURN(count);
598 }
599
/* Reply callback for OST_DESTROY: drop the in-flight destroy counter and
 * wake any thread throttled in osc_destroy()/osc_can_send_destroy(). */
static int osc_destroy_interpret(struct ptlrpc_request *req, void *data,
                                 int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        cfs_waitq_signal(&cli->cl_destroy_waitq);
        return 0;
}
609
/* Try to reserve a slot for a destroy RPC.  Returns 1 when the in-flight
 * count (after incrementing) is within cl_max_rpcs_in_flight; otherwise
 * undoes the increment and returns 0. */
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        /* Undo our increment; if the count dropped below the limit in the
         * meantime another sender has finished, so wake a waiter. */
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                cfs_waitq_signal(&cli->cl_destroy_waitq);
        }
        return 0;
}
627
/* Destroy requests can be async always on the client, and we don't even really
 * care about the return code since the client cannot do anything at all about
 * a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST reconnects to the MDS next,
 * it will retrieve the llog unlink logs and then sends the log cancellation
 * cookies to the MDS after committing destroy transactions. */
static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                       struct lov_stripe_md *ea, struct obd_trans_info *oti,
                       struct obd_export *md_export)
{
        CFS_LIST_HEAD(cancels);
        struct ptlrpc_request *req;
        struct ost_body *body;
        __u32 size[3] = { sizeof(struct ptlrpc_body), sizeof(*body),
                        sizeof(struct ldlm_request) };
        int count, bufcount = 2;
        struct client_obd *cli = &exp->exp_obd->u.cli;
        ENTRY;

        if (!oa) {
                CERROR("oa NULL\n");
                RETURN(-EINVAL);
        }

        LASSERT(oa->o_id != 0);

        /* cancel our PW locks on the dying object first; the cancels can
         * be piggy-backed on the destroy request (early lock cancel) */
        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);
        if (exp_connect_cancelset(exp))
                bufcount = 3;
        req = ldlm_prep_elc_req(exp, LUSTRE_OST_VERSION, OST_DESTROY, bufcount,
                                size, REQ_REC_OFF + 1, 0, &cancels, count);
        if (!req)
                RETURN(-ENOMEM);

        req->rq_request_portal = OST_IO_PORTAL;         /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));

        /* pass along the llog cookie from the transaction info */
        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE) {
                oa->o_lcookie = *oti->oti_logcookies;
        }

        lustre_set_wire_obdo(&body->oa, oa);
        ptlrpc_req_set_repsize(req, 2, size);

        /* don't throttle destroy RPCs for the MDT */
        if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
                req->rq_interpret_reply = osc_destroy_interpret;
                if (!osc_can_send_destroy(cli)) {
                        struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
                                                          NULL);

                        /*
                         * Wait until the number of on-going destroy RPCs drops
                         * under max_rpc_in_flight
                         */
                        l_wait_event_exclusive(cli->cl_destroy_waitq,
                                               osc_can_send_destroy(cli), &lwi);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req);
        RETURN(0);
}
699
/* Fill the cache-accounting fields of @oa (o_dirty, o_undirty, o_grant,
 * o_dropped) from the client_obd counters, under cl_loi_list_lock, so
 * they can be reported to the OST with the next RPC.  The caller must
 * not have set the OBD_MD_FLBLOCKS|OBD_MD_FLGRANT valid bits already. */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (cli->cl_dirty > cli->cl_dirty_max) {
                /* accounting is broken; don't ask for more dirty room */
                CERROR("dirty %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
                CERROR("dirty %d > system dirty_max %d\n",
                       atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
                                (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant;
        oa->o_dropped = cli->cl_lost_grant;
        /* lost grant is reported once, then reset */
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);

}
735
/* Record when the next grant-shrink check should happen:
 * cl_grant_shrink_interval from now (converted via cfs_time_shift). */
static void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant =
                cfs_time_shift(cli->cl_grant_shrink_interval);
        CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
               cli->cl_next_shrink_grant);
}
743
/* caller must hold loi_list_lock */
/* Account one page of dirty data against the client's write grant: bump
 * the global and per-client dirty counters, take one page of available
 * grant and tag the brw_page as grant-covered. */
static void osc_consume_write_grant(struct client_obd *cli,struct brw_page *pga)
{
        atomic_inc(&obd_dirty_pages);
        cli->cl_dirty += CFS_PAGE_SIZE;
        cli->cl_avail_grant -= CFS_PAGE_SIZE;
        pga->flag |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
               CFS_PAGE_SIZE, pga, pga->pg);
        LASSERTF(cli->cl_avail_grant >= 0, "invalid avail grant is %ld \n",
                 cli->cl_avail_grant);
        osc_update_next_shrink(cli);
}
757
/* the companion to osc_consume_write_grant, called when a brw has completed.
 * must be called with the loi lock held.
 *
 * @sent is non-zero when the page actually went to the OST; when it did
 * not, the whole page's grant is recorded as lost so it can be reported
 * back via o_dropped (see osc_announce_cached). */
static void osc_release_write_grant(struct client_obd *cli,
                                    struct brw_page *pga, int sent)
{
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
        ENTRY;

        /* page was never charged against grant; nothing to release */
        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
                EXIT;
                return;
        }

        pga->flag &= ~OBD_BRW_FROM_GRANT;
        atomic_dec(&obd_dirty_pages);
        cli->cl_dirty -= CFS_PAGE_SIZE;
        if (!sent) {
                cli->cl_lost_grant += CFS_PAGE_SIZE;
                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
        } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole block on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
                int offset = pga->off & ~CFS_PAGE_MASK;
                int count = pga->count + (offset & (blocksize - 1));
                int end = (offset + pga->count) & (blocksize - 1);
                if (end)
                        count += blocksize - end;

                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
                       cli->cl_avail_grant, cli->cl_dirty);
        }

        EXIT;
}
796
797 static unsigned long rpcs_in_flight(struct client_obd *cli)
798 {
799         return cli->cl_r_in_flight + cli->cl_w_in_flight;
800 }
801
/* caller must hold loi_list_lock */
/* Walk the cache-waiter list and release waiters that can now proceed:
 * either grant them a page of write grant, or wake them with -EDQUOT so
 * they fall back to sync IO.  Stops early while dirty limits are hit or
 * while in-flight writes may still return grant. */
void osc_wake_cache_waiters(struct client_obd *cli)
{
        struct list_head *l, *tmp;
        struct osc_cache_waiter *ocw;

        ENTRY;
        list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                /* if we can't dirty more, we must wait until some is written */
                if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
                   ((atomic_read(&obd_dirty_pages)+1)>(obd_max_dirty_pages))) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);
                        return;
                }

                /* if still dirty cache but no grant wait for pending RPCs that
                 * may yet return us some grant before doing sync writes */
                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
                               cli->cl_w_in_flight);
                        return;
                }

                ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
                list_del_init(&ocw->ocw_entry);
                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        /* no more RPCs in flight to return grant, do sync IO */
                        ocw->ocw_rc = -EDQUOT;
                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
                } else {
                        osc_consume_write_grant(cli,
                                                &ocw->ocw_oap->oap_brw_page);
                }

                cfs_waitq_signal(&ocw->ocw_waitq);
        }

        EXIT;
}
843
/* Fold any grant returned in an RPC reply into the local available pool,
 * under cl_loi_list_lock.  NB: o_grant is logged unconditionally but only
 * applied when the reply actually carries OBD_MD_FLGRANT. */
static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
        if (body->oa.o_valid & OBD_MD_FLGRANT)
                cli->cl_avail_grant += body->oa.o_grant;
        /* waiters are woken in brw_interpret */
        client_obd_list_unlock(&cli->cl_loi_list_lock);
}
853
854 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
855                               void *key, obd_count vallen, void *val,
856                               struct ptlrpc_request_set *set);
857
858 static int osc_shrink_grant_interpret(struct ptlrpc_request *req,
859                                       struct osc_grant_args *aa, int rc)
860 {
861         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
862         struct obdo *oa = aa->aa_oa;
863         struct ost_body *body;
864
865         if (rc != 0) {
866                 client_obd_list_lock(&cli->cl_loi_list_lock);
867                 cli->cl_avail_grant += oa->o_grant;
868                 client_obd_list_unlock(&cli->cl_loi_list_lock);
869                 GOTO(out, rc);
870         }
871         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*oa),
872                                 lustre_swab_ost_body);
873         osc_update_grant(cli, body);
874 out:
875         OBD_FREE_PTR(oa);
876         return rc;
877 }
878
879 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
880 {
881         client_obd_list_lock(&cli->cl_loi_list_lock);
882         oa->o_grant = cli->cl_avail_grant / 4;
883         cli->cl_avail_grant -= oa->o_grant;
884         client_obd_list_unlock(&cli->cl_loi_list_lock);
885         oa->o_flags |= OBD_FL_SHRINK_GRANT;
886         osc_update_next_shrink(cli);
887 }
888
889 /* Shrink the current grant, either from some large amount to enough for a
890  * full set of in-flight RPCs, or if we have already shrunk to that limit
891  * then to enough for a single RPC.  This avoids keeping more grant than
892  * needed, and avoids shrinking the grant piecemeal. */
893 static int osc_shrink_grant(struct client_obd *cli)
894 {
895         long target = (cli->cl_max_rpcs_in_flight + 1) *
896                       cli->cl_max_pages_per_rpc;
897
898         client_obd_list_lock(&cli->cl_loi_list_lock);
899         if (cli->cl_avail_grant <= target)
900                 target = cli->cl_max_pages_per_rpc;
901         client_obd_list_unlock(&cli->cl_loi_list_lock);
902
903         return osc_shrink_grant_to_target(cli, target);
904 }
905
906 int osc_shrink_grant_to_target(struct client_obd *cli, long target)
907 {
908         int    rc = 0;
909         struct ost_body     *body;
910         ENTRY;
911
912         client_obd_list_lock(&cli->cl_loi_list_lock);
913         /* Don't shrink if we are already above or below the desired limit
914          * We don't want to shrink below a single RPC, as that will negatively
915          * impact block allocation and long-term performance. */
916         if (target < cli->cl_max_pages_per_rpc)
917                 target = cli->cl_max_pages_per_rpc;
918
919         if (target >= cli->cl_avail_grant) {
920                 client_obd_list_unlock(&cli->cl_loi_list_lock);
921                 RETURN(0);
922         }
923         client_obd_list_unlock(&cli->cl_loi_list_lock);
924
925         OBD_ALLOC_PTR(body);
926         if (!body)
927                 RETURN(-ENOMEM);
928
929         osc_announce_cached(cli, &body->oa, 0);
930
931         client_obd_list_lock(&cli->cl_loi_list_lock);
932         body->oa.o_grant = cli->cl_avail_grant - target;
933         cli->cl_avail_grant = target;
934         client_obd_list_unlock(&cli->cl_loi_list_lock);
935         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
936         osc_update_next_shrink(cli);
937
938         rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
939                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
940                                 sizeof(*body), body, NULL);
941         if (rc) {
942                 client_obd_list_lock(&cli->cl_loi_list_lock);
943                 cli->cl_avail_grant += body->oa.o_grant;
944                 client_obd_list_unlock(&cli->cl_loi_list_lock);
945         }
946         OBD_FREE_PTR(body);
947         RETURN(rc);
948 }
949
950 #define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
951 static int osc_should_shrink_grant(struct client_obd *client)
952 {
953         cfs_time_t time = cfs_time_current();
954         cfs_time_t next_shrink = client->cl_next_shrink_grant;
955         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
956                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
957                     client->cl_avail_grant > GRANT_SHRINK_LIMIT)
958                         return 1;
959                 else
960                         osc_update_next_shrink(client);
961         }
962         return 0;
963 }
964
965 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
966 {
967         struct client_obd *client;
968
969         list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
970                 if (osc_should_shrink_grant(client))
971                         osc_shrink_grant(client);
972         }
973         return 0;
974 }
975
976 static int osc_add_shrink_grant(struct client_obd *client)
977 {
978         int rc;
979
980         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
981                                        TIMEOUT_GRANT,
982                                        osc_grant_shrink_grant_cb, NULL,
983                                        &client->cl_grant_shrink_list);
984         if (rc) {
985                 CERROR("add grant client %s error %d\n",
986                         client->cl_import->imp_obd->obd_name, rc);
987                 return rc;
988         }
989         CDEBUG(D_CACHE, "add grant client %s \n",
990                client->cl_import->imp_obd->obd_name);
991         osc_update_next_shrink(client);
992         return 0;
993 }
994
/* Remove @client from the periodic grant-shrink timeout list; inverse of
 * osc_add_shrink_grant().  Returns the ptlrpc timeout-client rc. */
static int osc_del_shrink_grant(struct client_obd *client)
{
        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
                                         TIMEOUT_GRANT);
}
1000
/* Initialise cl_avail_grant from the grant the server offered at connect
 * time, and enrol this client in periodic grant shrinking when the server
 * advertised OBD_CONNECT_GRANT_SHRINK. */
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        /* list_empty() guards against double registration on reconnect */
        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);

        CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld \n",
               cli->cl_avail_grant, cli->cl_lost_grant);
        LASSERT(cli->cl_avail_grant >= 0);
}
1015
1016 /* We assume that the reason this OSC got a short read is because it read
1017  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1018  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1019  * this stripe never got written at or beyond this stripe offset yet. */
1020 static void handle_short_read(int nob_read, obd_count page_count,
1021                               struct brw_page **pga)
1022 {
1023         char *ptr;
1024         int i = 0;
1025
1026         /* skip bytes read OK */
1027         while (nob_read > 0) {
1028                 LASSERT (page_count > 0);
1029
1030                 if (pga[i]->count > nob_read) {
1031                         /* EOF inside this page */
1032                         ptr = cfs_kmap(pga[i]->pg) +
1033                                 (pga[i]->off & ~CFS_PAGE_MASK);
1034                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1035                         cfs_kunmap(pga[i]->pg);
1036                         page_count--;
1037                         i++;
1038                         break;
1039                 }
1040
1041                 nob_read -= pga[i]->count;
1042                 page_count--;
1043                 i++;
1044         }
1045
1046         /* zero remaining pages */
1047         while (page_count-- > 0) {
1048                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1049                 memset(ptr, 0, pga[i]->count);
1050                 cfs_kunmap(pga[i]->pg);
1051                 i++;
1052         }
1053 }
1054
1055 static int check_write_rcs(struct ptlrpc_request *req,
1056                            int requested_nob, int niocount,
1057                            obd_count page_count, struct brw_page **pga)
1058 {
1059         int    *remote_rcs, i;
1060
1061         /* return error if any niobuf was in error */
1062         remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
1063                                         sizeof(*remote_rcs) * niocount, NULL);
1064         if (remote_rcs == NULL) {
1065                 CERROR("Missing/short RC vector on BRW_WRITE reply\n");
1066                 return(-EPROTO);
1067         }
1068         if (lustre_rep_need_swab(req))
1069                 for (i = 0; i < niocount; i++)
1070                         __swab32s(&remote_rcs[i]);
1071
1072         for (i = 0; i < niocount; i++) {
1073                 if (remote_rcs[i] < 0)
1074                         return(remote_rcs[i]);
1075
1076                 if (remote_rcs[i] != 0) {
1077                         CERROR("rc[%d] invalid (%d) req %p\n",
1078                                 i, remote_rcs[i], req);
1079                         return(-EPROTO);
1080                 }
1081         }
1082
1083         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1084                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1085                        req->rq_bulk->bd_nob_transferred, requested_nob);
1086                 return(-EPROTO);
1087         }
1088
1089         return (0);
1090 }
1091
1092 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1093 {
1094         if (p1->flag != p2->flag) {
1095                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_ASYNC);
1096
1097                 /* warn if we try to combine flags that we don't know to be
1098                  * safe to combine */
1099                 if ((p1->flag & mask) != (p2->flag & mask))
1100                         CERROR("is it ok to have flags 0x%x and 0x%x in the "
1101                                "same brw?\n", p1->flag, p2->flag);
1102                 return 0;
1103         }
1104
1105         return (p1->off + p1->count == p2->off);
1106 }
1107
/* Compute the bulk checksum over the first @nob bytes of the @pga pages.
 * @opc selects the fault-injection hooks: OST_READ corrupts received data
 * before checksumming; OST_WRITE only perturbs the computed checksum so
 * the data stays correct for a resend.  @pshift converts file offsets to
 * in-memory offsets -- presumably for liblustre unaligned I/O; confirm
 * against OSC_FILE2MEM_OFF's definition. */
static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga, int opc,
                                   cksum_type_t cksum_type, int pshift)
{
        __u32 cksum;
        int i = 0;

        LASSERT (pg_count > 0);
        cksum = init_checksum(cksum_type);
        while (nob > 0 && pg_count > 0) {
                unsigned char *ptr = cfs_kmap(pga[i]->pg);
                int off = OSC_FILE2MEM_OFF(pga[i]->off, pshift) & ~CFS_PAGE_MASK;
                /* clamp the last page's contribution to the remaining nob */
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
                        memcpy(ptr + off, "bad1", min(4, nob));
                cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
                cfs_kunmap(pga[i]->pg);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
                               off, cksum);

                /* NB nob is decremented by the whole page count even when
                 * only @count bytes of the final page were checksummed */
                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}
1143
/* Build (but do not send) a BRW read or write request for @page_count
 * pages of @pga, merging file-contiguous pages into shared niobufs,
 * attaching the bulk descriptor and (optionally) bulk checksums.
 * On success *reqp owns the request and the async args reference @oa and
 * @pga; @lsm is unused here.  Returns 0 or a negative errno. */
static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp, int pshift)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        __u32 size[4] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        int niocount, i, requested_nob, opc, rc;
        struct ptlrpc_request_pool *pool;
        struct osc_brw_async_args *aa;
        struct brw_page *pg_prev;

        ENTRY;
        OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM); /* Recoverable */
        OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ2, -EINVAL); /* Fatal */

        /* writes draw from the import's preallocated pool so that dirty
         * page flushing can't deadlock on memory allocation */
        opc = ((cmd & OBD_BRW_WRITE) != 0) ? OST_WRITE : OST_READ;
        pool = ((cmd & OBD_BRW_WRITE) != 0) ? cli->cl_import->imp_rq_pool :NULL;

        /* count the niobufs needed: adjacent mergeable pages share one */
        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        size[REQ_REC_OFF + 1] = sizeof(*ioobj);
        size[REQ_REC_OFF + 2] = niocount * sizeof(*niobuf);

        req = ptlrpc_prep_req_pool(cli->cl_import, LUSTRE_OST_VERSION, opc, 4, size,
                                   NULL, pool);
        if (req == NULL)
                RETURN (-ENOMEM);

        req->rq_request_portal = OST_IO_PORTAL;         /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (opc == OST_WRITE)
                desc = ptlrpc_prep_bulk_imp (req, page_count,
                                             BULK_GET_SOURCE, OST_BULK_PORTAL);
        else
                desc = ptlrpc_prep_bulk_imp (req, page_count,
                                             BULK_PUT_SINK, OST_BULK_PORTAL);
        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        ioobj = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioobj));
        niobuf = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                                niocount * sizeof(*niobuf));

        lustre_set_wire_obdo(&body->oa, oa);
        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;

        LASSERT (page_count > 0);
        pg_prev = pga[0];
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];

                /* sanity: each fragment fits in one page, and pages are
                 * in strictly ascending file-offset order */
                LASSERT(pg->count > 0);
                LASSERTF((OSC_FILE2MEM_OFF(pg->off, pshift) & ~CFS_PAGE_MASK) +
                         pg->count <= CFS_PAGE_SIZE,
                         "i: %d pg: %p off: "LPU64", count: %u, shift: %d\n",
                         i, pg, pg->off, pg->count, pshift);
#ifdef __linux__
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
#else
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u\n", i, page_count);
#endif
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page(desc, pg->pg,
                                      OSC_FILE2MEM_OFF(pg->off,pshift)&~CFS_PAGE_MASK,
                                      pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        /* extend the previous niobuf instead of starting
                         * a new one */
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len    = pg->count;
                        niobuf->flags  = pg->flag;
                }
                pg_prev = pg;
        }

        LASSERTF((void *)(niobuf - niocount) ==
                lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                               niocount * sizeof(*niobuf)),
                "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg,
                REQ_REC_OFF + 2, niocount * sizeof(*niobuf)),
                (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (cli->cl_checksum) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                                oa->o_flags &= OBD_FL_LOCAL_MASK;
                                body->oa.o_flags = 0;
                        }
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type, pshift);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                size[REPLY_REC_OFF + 1] = sizeof(__u32) * niocount;
                ptlrpc_req_set_repsize(req, 3, size);
        } else {
                if (cli->cl_checksum) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
                /* 1 RC for the whole I/O */
                ptlrpc_req_set_repsize(req, 2, size);
        }

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        aa->aa_pshift = pshift;
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);

        *reqp = req;
        RETURN (0);

 out:
        ptlrpc_req_finished (req);
        RETURN (rc);
}
1315
/* Compare the client and server checksums for a completed write; on a
 * mismatch, recompute the checksum over the still-cached pages to decide
 * where the corruption most likely occurred and log it.  Returns 0 when
 * the checksums agree, 1 on mismatch (caller treats this as retryable). */
static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum, int nob,
                                obd_count page_count, struct brw_page **pga,
                                cksum_type_t client_cksum_type, int pshift)
{
        __u32 new_cksum;
        char *msg;
        cksum_type_t cksum_type;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                return 0;
        }

        /* recompute with whatever algorithm the server actually used */
        if (oa->o_valid & OBD_MD_FLFLAGS)
                cksum_type = cksum_type_unpack(oa->o_flags);
        else
                cksum_type = OBD_CKSUM_CRC32;

        new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
                                      cksum_type, pshift);

        /* triangulate: which of the three checksums agree tells us where
         * the data changed */
        if (cksum_type != client_cksum_type)
                msg = "the server did not use the checksum type specified in "
                      "the original request - likely a protocol problem";
        else if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it - "
                      "likely false positive due to mmap IO (bug 11742)";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original - "
                      "likely false positive due to mmap IO (bug 11742)";

        LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
                           LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
                           "["LPU64"-"LPU64"]\n",
                           msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
                                                        (__u64)0,
                           oa->o_id,
                           oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
                           pga[0]->off,
                           pga[page_count-1]->off + pga[page_count-1]->count - 1);
        CERROR("original client csum %x (type %x), server csum %x (type %x), "
               "client csum now %x\n", client_cksum, client_cksum_type,
               server_cksum, cksum_type, new_cksum);

        return 1;
}
1367
1368 /* Note rc enters this function as number of bytes transferred */
1369 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1370 {
1371         struct osc_brw_async_args *aa = ptlrpc_req_async_args(req);
1372         const lnet_process_id_t *peer =
1373                         &req->rq_import->imp_connection->c_peer;
1374         struct client_obd *cli = aa->aa_cli;
1375         struct ost_body *body;
1376         __u32 client_cksum = 0;
1377         ENTRY;
1378
1379         if (rc < 0 && rc != -EDQUOT)
1380                 RETURN(rc);
1381
1382         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1383         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1384                                   lustre_swab_ost_body);
1385         if (body == NULL) {
1386                 CERROR ("Can't unpack body\n");
1387                 RETURN(-EPROTO);
1388         }
1389
1390         /* set/clear over quota flag for a uid/gid */
1391         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1392             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1393                 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1394                              body->oa.o_gid, body->oa.o_valid,
1395                              body->oa.o_flags);
1396
1397         if (rc < 0)
1398                 RETURN(rc);
1399
1400         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1401                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1402
1403         osc_update_grant(cli, body);
1404
1405         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1406                 if (rc > 0) {
1407                         CERROR ("Unexpected +ve rc %d\n", rc);
1408                         RETURN(-EPROTO);
1409                 }
1410                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1411
1412                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1413                     check_write_checksum(&body->oa, peer, client_cksum,
1414                                          body->oa.o_cksum, aa->aa_requested_nob,
1415                                          aa->aa_page_count, aa->aa_ppga,
1416                                          cksum_type_unpack(aa->aa_oa->o_flags),
1417                                          aa->aa_pshift))
1418                         RETURN(-EAGAIN);
1419
1420                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1421                                      aa->aa_page_count, aa->aa_ppga);
1422                 GOTO(out, rc);
1423         }
1424
1425         /* The rest of this function executes only for OST_READs */
1426         if (rc > aa->aa_requested_nob) {
1427                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1428                        aa->aa_requested_nob);
1429                 RETURN(-EPROTO);
1430         }
1431
1432         if (rc != req->rq_bulk->bd_nob_transferred) {
1433                 CERROR ("Unexpected rc %d (%d transferred)\n",
1434                         rc, req->rq_bulk->bd_nob_transferred);
1435                 return (-EPROTO);
1436         }
1437
1438         if (rc < aa->aa_requested_nob)
1439                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1440
1441         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1442                 static int cksum_counter;
1443                 __u32      server_cksum = body->oa.o_cksum;
1444                 char      *via;
1445                 char      *router;
1446                 cksum_type_t cksum_type;
1447
1448                 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1449                         cksum_type = cksum_type_unpack(body->oa.o_flags);
1450                 else
1451                         cksum_type = OBD_CKSUM_CRC32;
1452                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1453                                                  aa->aa_ppga, OST_READ,
1454                                                  cksum_type, aa->aa_pshift);
1455
1456                 if (peer->nid == req->rq_bulk->bd_sender) {
1457                         via = router = "";
1458                 } else {
1459                         via = " via ";
1460                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1461                 }
1462
1463                 if (server_cksum == ~0 && rc > 0) {
1464                         CERROR("Protocol error: server %s set the 'checksum' "
1465                                "bit, but didn't send a checksum.  Not fatal, "
1466                                "but please notify on http://bugzilla.lustre.org/\n",
1467                                libcfs_nid2str(peer->nid));
1468                 } else if (server_cksum != client_cksum) {
1469                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1470                                            "%s%s%s inum "LPU64"/"LPU64" object "
1471                                            LPU64"/"LPU64" extent "
1472                                            "["LPU64"-"LPU64"]\n",
1473                                            req->rq_import->imp_obd->obd_name,
1474                                            libcfs_nid2str(peer->nid),
1475                                            via, router,
1476                                            body->oa.o_valid & OBD_MD_FLFID ?
1477                                                 body->oa.o_fid : (__u64)0,
1478                                            body->oa.o_valid & OBD_MD_FLFID ?
1479                                                 body->oa.o_generation :(__u64)0,
1480                                            body->oa.o_id,
1481                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1482                                                 body->oa.o_gr : (__u64)0,
1483                                            aa->aa_ppga[0]->off,
1484                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1485                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1486                                                                         1);
1487                         CERROR("client %x, server %x, cksum_type %x\n",
1488                                client_cksum, server_cksum, cksum_type);
1489                         cksum_counter = 0;
1490                         aa->aa_oa->o_cksum = client_cksum;
1491                         rc = -EAGAIN;
1492                 } else {
1493                         cksum_counter++;
1494                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1495                         rc = 0;
1496                 }
1497         } else if (unlikely(client_cksum)) {
1498                 static int cksum_missed;
1499
1500                 cksum_missed++;
1501                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1502                         CERROR("Checksum %u requested from %s but not sent\n",
1503                                cksum_missed, libcfs_nid2str(peer->nid));
1504         } else {
1505                 rc = 0;
1506         }
1507 out:
1508         if (rc >= 0)
1509                 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
1510
1511         RETURN(rc);
1512 }
1513
1514 static int osc_brw_internal(int cmd, struct obd_export *exp,struct obdo *oa,
1515                             struct lov_stripe_md *lsm,
1516                             obd_count page_count, struct brw_page **pga)
1517 {
1518         struct ptlrpc_request *request;
1519         int                    rc;
1520         cfs_waitq_t            waitq;
1521         int                    resends = 0;
1522         struct l_wait_info     lwi;
1523
1524         ENTRY;
1525         init_waitqueue_head(&waitq);
1526
1527 restart_bulk:
1528         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1529                                   page_count, pga, &request, 0);
1530         if (rc != 0)
1531                 return (rc);
1532
1533         rc = ptlrpc_queue_wait(request);
1534
1535         if (rc == -ETIMEDOUT && request->rq_resend) {
1536                 DEBUG_REQ(D_HA, request,  "BULK TIMEOUT");
1537                 ptlrpc_req_finished(request);
1538                 goto restart_bulk;
1539         }
1540
1541         rc = osc_brw_fini_request(request, rc);
1542
1543         ptlrpc_req_finished(request);
1544         if (osc_recoverable_error(rc)) {
1545                 resends++;
1546                 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1547                         CERROR("too many resend retries, returning error\n");
1548                         RETURN(-EIO);
1549                 }
1550
1551                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1552                 l_wait_event(waitq, 0, &lwi);
1553
1554                 goto restart_bulk;
1555         }
1556         RETURN(rc);
1557 }
1558
/* Rebuild and resend a bulk RPC that failed with a recoverable error.
 *
 * A fresh request is built from the async args of the failed one; the pga
 * array and the oap list are handed over to the new request.  Note that a
 * list_head cannot simply be copied -- it must be spliced across.
 *
 * \retval 0       new request queued on the original request's set
 * \retval -EIO    resend budget (osc_should_resend()) exhausted
 * \retval -EINTR  one of the pages was interrupted while the list lock
 *                 was held
 */
int osc_brw_redo_request(struct ptlrpc_request *request,
                         struct osc_brw_async_args *aa)
{
        struct ptlrpc_request *new_req;
        struct ptlrpc_request_set *set = request->rq_set;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
                CERROR("too many resend retries, returning error\n");
                RETURN(-EIO);
        }

        DEBUG_REQ(D_ERROR, request, "redo for recoverable error");

        /* rebuild with the same direction and page set as the original */
        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                        OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa,
                                  NULL /* lsm unused by osc currently */,
                                  aa->aa_page_count, aa->aa_ppga, &new_req,
                                  aa->aa_pshift);
        if (rc)
                RETURN(rc);

        client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);

        /* bail out if any oap was interrupted while the old RPC was in
         * flight; only one oap holds a reference on the request */
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(request == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 request, oap->oap_request);
                        if (oap->oap_interrupted) {
                                client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
                                ptlrpc_req_finished(new_req);
                                RETURN(-EINTR);
                        }
                }
        }
        /* New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it... */
        aa->aa_resends++;
        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
        /* delay the resend by aa_resends seconds as a crude backoff */
        new_req->rq_sent = CURRENT_SECONDS + aa->aa_resends;

        new_aa = ptlrpc_req_async_args(new_req);

        CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
        list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);

        /* transfer the per-oap request references to the new request */
        list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);
                }
        }

        /* use ptlrpc_set_add_req is safe because interpret functions work
         * in check_set context. only one way exist with access to request
         * from different thread got -EINTR - this way protected with
         * cl_loi_list_lock */
        ptlrpc_set_add_req(set, new_req);

        client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);

        DEBUG_REQ(D_INFO, new_req, "new request");
        RETURN(0);
}
1630
1631 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1632                           struct lov_stripe_md *lsm, obd_count page_count,
1633                           struct brw_page **pga, struct ptlrpc_request_set *set,
1634                           int pshift)
1635 {
1636         struct ptlrpc_request     *request;
1637         struct client_obd         *cli = &exp->exp_obd->u.cli;
1638         int                        rc, i;
1639         struct osc_brw_async_args *aa;
1640         ENTRY;
1641
1642         /* Consume write credits even if doing a sync write -
1643          * otherwise we may run out of space on OST due to grant. */
1644         /* FIXME: unaligned writes must use write grants too */
1645         if (cmd == OBD_BRW_WRITE && pshift == 0) {
1646                 client_obd_list_lock(&cli->cl_loi_list_lock);
1647                 for (i = 0; i < page_count; i++) {
1648                         if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
1649                                 osc_consume_write_grant(cli, pga[i]);
1650                 }
1651                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1652         }
1653
1654         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1655                                   page_count, pga, &request, pshift);
1656
1657         CLASSERT(sizeof(*aa) <= sizeof(request->rq_async_args));
1658
1659         if (rc == 0) {
1660                 aa = ptlrpc_req_async_args(request);
1661                 if (cmd == OBD_BRW_READ) {
1662                         lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1663                         lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1664                 } else {
1665                         lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1666                         lprocfs_oh_tally(&cli->cl_write_rpc_hist,
1667                                          cli->cl_w_in_flight);
1668                 }
1669                 ptlrpc_lprocfs_brw(request, aa->aa_requested_nob);
1670
1671                 LASSERT(list_empty(&aa->aa_oaps));
1672
1673                 request->rq_interpret_reply = brw_interpret;
1674                 ptlrpc_set_add_req(set, request);
1675                 client_obd_list_lock(&cli->cl_loi_list_lock);
1676                 if (cmd == OBD_BRW_READ)
1677                         cli->cl_r_in_flight++;
1678                 else
1679                         cli->cl_w_in_flight++;
1680                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1681                 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DIO_PAUSE, 3);
1682         } else if (cmd == OBD_BRW_WRITE) {
1683                 client_obd_list_lock(&cli->cl_loi_list_lock);
1684                 for (i = 0; i < page_count; i++)
1685                         osc_release_write_grant(cli, pga[i], 0);
1686                 osc_wake_cache_waiters(cli);
1687                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1688         }
1689
1690         RETURN (rc);
1691 }
1692
1693 /*
1694  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1695  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1696  * fine for our small page arrays and doesn't require allocation.  its an
1697  * insertion sort that swaps elements that are strides apart, shrinking the
1698  * stride down until its '1' and the array is sorted.
1699  */
1700 static void sort_brw_pages(struct brw_page **array, int num)
1701 {
1702         int stride, i, j;
1703         struct brw_page *tmp;
1704
1705         if (num == 1)
1706                 return;
1707         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1708                 ;
1709
1710         do {
1711                 stride /= 3;
1712                 for (i = stride ; i < num ; i++) {
1713                         tmp = array[i];
1714                         j = i;
1715                         while (j >= stride && array[j-stride]->off > tmp->off) {
1716                                 array[j] = array[j - stride];
1717                                 j -= stride;
1718                         }
1719                         array[j] = tmp;
1720                 }
1721         } while (stride > 1);
1722 }
1723
1724 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages,
1725                                         int pshift)
1726 {
1727         int count = 1;
1728         int offset;
1729         int i = 0;
1730
1731         LASSERT (pages > 0);
1732         offset = OSC_FILE2MEM_OFF(pg[i]->off, pshift) & ~CFS_PAGE_MASK;
1733
1734         for (;;) {
1735                 pages--;
1736                 if (pages == 0)         /* that's all */
1737                         return count;
1738
1739                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1740                         return count;   /* doesn't end on page boundary */
1741
1742                 i++;
1743                 offset = OSC_FILE2MEM_OFF(pg[i]->off, pshift) & ~CFS_PAGE_MASK;
1744                 if (offset != 0)        /* doesn't start on page boundary */
1745                         return count;
1746
1747                 count++;
1748         }
1749 }
1750
1751 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1752 {
1753         struct brw_page **ppga;
1754         int i;
1755
1756         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1757         if (ppga == NULL)
1758                 return NULL;
1759
1760         for (i = 0; i < count; i++)
1761                 ppga[i] = pga + i;
1762         return ppga;
1763 }
1764
/* Free a pointer array built by osc_build_ppga().  @count must match the
 * count passed to osc_build_ppga(), since the allocation size depends on
 * it. */
static void osc_release_ppga(struct brw_page **ppga, obd_count count)
{
        LASSERT(ppga != NULL);
        OBD_FREE(ppga, sizeof(*ppga) * count);
}
1770
/* Synchronous bulk read/write entry point: sort the pages by offset, split
 * them into chunks no larger than cl_max_pages_per_rpc (and no larger than
 * one unfragmented run), and issue one osc_brw_internal() RPC per chunk.
 *
 * Because the brw clobbers oinfo->oi_oa, a copy is saved before the first
 * multi-chunk RPC and restored before each subsequent one.
 *
 * With OBD_BRW_CHECK set in @cmd, no I/O is done; only the import validity
 * is reported (0 or -EIO). */
static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
                   obd_count page_count, struct brw_page *pga,
                   struct obd_trans_info *oti)
{
        struct obdo *saved_oa = NULL;
        struct brw_page **ppga, **orig;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct client_obd *cli;
        int rc, page_count_orig;
        ENTRY;

        LASSERT((imp != NULL) && (imp->imp_obd != NULL));
        cli = &imp->imp_obd->u.cli;

        if (cmd & OBD_BRW_CHECK) {
                /* The caller just wants to know if there's a chance that this
                 * I/O can succeed */

                if (imp->imp_invalid)
                        RETURN(-EIO);
                RETURN(0);
        }

        /* test_brw with a failed create can trip this, maybe others. */
        LASSERT(cli->cl_max_pages_per_rpc);

        rc = 0;

        orig = ppga = osc_build_ppga(pga, page_count);
        if (ppga == NULL)
                RETURN(-ENOMEM);
        page_count_orig = page_count;

        sort_brw_pages(ppga, page_count);
        while (page_count) {
                obd_count pages_per_brw;

                /* cap the chunk at the per-RPC page limit ... */
                if (page_count > cli->cl_max_pages_per_rpc)
                        pages_per_brw = cli->cl_max_pages_per_rpc;
                else
                        pages_per_brw = page_count;

                /* ... and at the first fragmentation point */
                pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw, 0);

                if (saved_oa != NULL) {
                        /* restore previously saved oa */
                        *oinfo->oi_oa = *saved_oa;
                } else if (page_count > pages_per_brw) {
                        /* save a copy of oa (brw will clobber it) */
                        OBDO_ALLOC(saved_oa);
                        if (saved_oa == NULL)
                                GOTO(out, rc = -ENOMEM);
                        *saved_oa = *oinfo->oi_oa;
                }

                rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
                                      pages_per_brw, ppga);

                if (rc != 0)
                        break;

                /* advance past the chunk that just completed */
                page_count -= pages_per_brw;
                ppga += pages_per_brw;
        }

out:
        osc_release_ppga(orig, page_count_orig);

        if (saved_oa != NULL)
                OBDO_FREE(saved_oa);

        RETURN(rc);
}
1844
/* Asynchronous bulk read/write entry point: sort the pages, split them
 * into RPC-sized unfragmented chunks, and queue each chunk on @set via
 * async_internal() without waiting.
 *
 * When more than one RPC will be issued (or the pointer array has already
 * been advanced), a private copy of the page-pointer slice and of the oa
 * is made for each chunk, because the caller's versions cannot be shared
 * between in-flight RPCs; such copies are marked OBD_FL_TEMPORARY so
 * brw_interpret() knows to free them.  Ownership of @orig transfers to
 * async_internal() only when it was passed through unchanged.
 *
 * With OBD_BRW_CHECK set in @cmd, no I/O is done; only the import validity
 * is reported (0 or -EIO). */
static int osc_brw_async(int cmd, struct obd_export *exp,
                         struct obd_info *oinfo, obd_count page_count,
                         struct brw_page *pga, struct obd_trans_info *oti,
                         struct ptlrpc_request_set *set, int pshift)
{
        struct brw_page **ppga, **orig;
        int page_count_orig;
        int rc = 0;
        ENTRY;

        if (cmd & OBD_BRW_CHECK) {
                /* The caller just wants to know if there's a chance that this
                 * I/O can succeed */
                struct obd_import *imp = class_exp2cliimp(exp);

                if (imp == NULL || imp->imp_invalid)
                        RETURN(-EIO);
                RETURN(0);
        }

        orig = ppga = osc_build_ppga(pga, page_count);
        if (ppga == NULL)
                RETURN(-ENOMEM);
        page_count_orig = page_count;

        sort_brw_pages(ppga, page_count);
        while (page_count) {
                struct brw_page **copy;
                struct obdo *oa;
                obd_count pages_per_brw;

                /* one page less under unaligned direct i/o */
                pages_per_brw = min_t(obd_count, page_count,
                    class_exp2cliimp(exp)->imp_obd->u.cli.cl_max_pages_per_rpc -
                                      !!pshift);

                pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw,
                                                       pshift);

                /* use ppga only if single RPC is going to fly */
                if (pages_per_brw != page_count_orig || ppga != orig) {
                        /* multiple RPCs: give this chunk its own pointer
                         * slice and its own temporary oa */
                        OBD_ALLOC(copy, pages_per_brw * sizeof(*copy));
                        if (copy == NULL)
                                GOTO(out, rc = -ENOMEM);
                        memcpy(copy, ppga, pages_per_brw * sizeof(*copy));

                        OBDO_ALLOC(oa);
                        if (oa == NULL) {
                                OBD_FREE(copy, pages_per_brw * sizeof(*copy));
                                GOTO(out, rc = -ENOMEM);
                        }
                        memcpy(oa, oinfo->oi_oa, sizeof(*oa));
                        /* mark the copy so brw_interpret() frees it */
                        oa->o_flags |= OBD_FL_TEMPORARY;
                } else {
                        copy = ppga;
                        oa = oinfo->oi_oa;
                        LASSERT(!(oa->o_flags & OBD_FL_TEMPORARY));
                }

                rc = async_internal(cmd, exp, oa, oinfo->oi_md, pages_per_brw,
                                    copy, set, pshift);

                if (rc != 0) {
                        /* queuing failed: free the per-chunk copies we
                         * still own */
                        if (copy != ppga)
                                OBD_FREE(copy, pages_per_brw * sizeof(*copy));

                        if (oa->o_flags & OBD_FL_TEMPORARY)
                                OBDO_FREE(oa);
                        break;
                }

                if (copy == orig) {
                        /* we passed it to async_internal() which is
                         * now responsible for releasing memory */
                        orig = NULL;
                }

                page_count -= pages_per_brw;
                ppga += pages_per_brw;
        }
out:
        if (orig)
                osc_release_ppga(orig, page_count_orig);
        RETURN(rc);
}
1930
1931 static void osc_check_rpcs(struct client_obd *cli);
1932
/* The companion to osc_enter_cache(), called when @oap is no longer part of
 * the dirty accounting.  Writeback completes or truncate happens before
 * writing starts.  Must be called with the loi lock held. */
static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
                           int sent)
{
        /* give back this page's write grant; @sent tells the grant code
         * whether the page actually made it to the OST */
        osc_release_write_grant(cli, &oap->oap_brw_page, sent);
}
1941
1942 /* This maintains the lists of pending pages to read/write for a given object
1943  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1944  * to quickly find objects that are ready to send an RPC. */
1945 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1946                          int cmd)
1947 {
1948         int optimal;
1949         ENTRY;
1950
1951         if (lop->lop_num_pending == 0)
1952                 RETURN(0);
1953
1954         /* if we have an invalid import we want to drain the queued pages
1955          * by forcing them through rpcs that immediately fail and complete
1956          * the pages.  recovery relies on this to empty the queued pages
1957          * before canceling the locks and evicting down the llite pages */
1958         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1959                 RETURN(1);
1960
1961         /* stream rpcs in queue order as long as as there is an urgent page
1962          * queued.  this is our cheap solution for good batching in the case
1963          * where writepage marks some random page in the middle of the file
1964          * as urgent because of, say, memory pressure */
1965         if (!list_empty(&lop->lop_urgent)) {
1966                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1967                 RETURN(1);
1968         }
1969
1970         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1971         optimal = cli->cl_max_pages_per_rpc;
1972         if (cmd & OBD_BRW_WRITE) {
1973                 /* trigger a write rpc stream as long as there are dirtiers
1974                  * waiting for space.  as they're waiting, they're not going to
1975                  * create more pages to coallesce with what's waiting.. */
1976                 if (!list_empty(&cli->cl_cache_waiters)) {
1977                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1978                         RETURN(1);
1979                 }
1980
1981                 /* +16 to avoid triggering rpcs that would want to include pages
1982                  * that are being queued but which can't be made ready until
1983                  * the queuer finishes with the page. this is a wart for
1984                  * llite::commit_write() */
1985                 optimal += 16;
1986         }
1987         if (lop->lop_num_pending >= optimal)
1988                 RETURN(1);
1989
1990         RETURN(0);
1991 }
1992
1993 static int lop_makes_hprpc(struct loi_oap_pages *lop)
1994 {
1995         struct osc_async_page *oap;
1996         ENTRY;
1997
1998         if (list_empty(&lop->lop_urgent))
1999                 RETURN(0);
2000
2001         oap = list_entry(lop->lop_urgent.next,
2002                          struct osc_async_page, oap_urgent_item);
2003
2004         if (oap->oap_async_flags & ASYNC_HP) {
2005                 CDEBUG(D_CACHE, "hp request forcing RPC\n");
2006                 RETURN(1);
2007         }
2008
2009         RETURN(0);
2010 }
2011
/* Make list membership of @item match @should_be_on: add it to the tail of
 * @list if it should be on but isn't, remove it if it shouldn't but is. */
static void on_list(struct list_head *item, struct list_head *list,
                    int should_be_on)
{
        int currently_on = !list_empty(item);

        if (should_be_on && !currently_on)
                list_add_tail(item, list);
        else if (!should_be_on && currently_on)
                list_del_init(item);
}
2020
2021 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
2022  * can find pages to build into rpcs quickly */
2023 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
2024 {
2025         if (lop_makes_hprpc(&loi->loi_write_lop) ||
2026             lop_makes_hprpc(&loi->loi_read_lop)) {
2027                 /* HP rpc */
2028                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
2029                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
2030         } else {
2031                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
2032                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
2033                         lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
2034                         lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
2035         }
2036
2037         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
2038                 loi->loi_write_lop.lop_num_pending);
2039
2040         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
2041                 loi->loi_read_lop.lop_num_pending);
2042 }
2043
2044 static void lop_update_pending(struct client_obd *cli,
2045                                struct loi_oap_pages *lop, int cmd, int delta)
2046 {
2047         lop->lop_num_pending += delta;
2048         if (cmd & OBD_BRW_WRITE)
2049                 cli->cl_pending_w_pages += delta;
2050         else
2051                 cli->cl_pending_r_pages += delta;
2052 }
2053
/* this is called when a sync waiter receives an interruption.  Its job is to
 * get the caller woken as soon as possible.  If its page hasn't been put in an
 * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
 * desiring interruption which will forcefully complete the rpc once the rpc
 * has timed out */
static void osc_occ_interrupted(struct oig_callback_context *occ)
{
        struct osc_async_page *oap;
        struct loi_oap_pages *lop;
        struct lov_oinfo *loi;
        ENTRY;

        /* XXX member_of() */
        oap = list_entry(occ, struct osc_async_page, oap_occ);

        /* all list/oig manipulation below happens under the list lock */
        client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);

        oap->oap_interrupted = 1;

        /* ok, it's been put in an rpc. only one oap gets a request reference */
        if (oap->oap_request != NULL) {
                ptlrpc_mark_interrupted(oap->oap_request);
                ptlrpcd_wake(oap->oap_request);
                GOTO(unlock, 0);
        }

        /* we don't get interruption callbacks until osc_trigger_group_io()
         * has been called and put the sync oaps in the pending/urgent lists.*/
        if (!list_empty(&oap->oap_pending_item)) {
                /* not in an rpc yet: dequeue immediately and complete the
                 * group-io slot with -EINTR so the waiter wakes up */
                list_del_init(&oap->oap_pending_item);
                list_del_init(&oap->oap_urgent_item);

                loi = oap->oap_loi;
                lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
                        &loi->loi_write_lop : &loi->loi_read_lop;
                lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
                loi_list_maint(oap->oap_cli, oap->oap_loi);

                oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
                oap->oap_oig = NULL;
        }

unlock:
        client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
}
2099
2100 /* this is trying to propogate async writeback errors back up to the
2101  * application.  As an async write fails we record the error code for later if
2102  * the app does an fsync.  As long as errors persist we force future rpcs to be
2103  * sync so that the app can get a sync error and break the cycle of queueing
2104  * pages for which writeback will fail. */
2105 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
2106                            int rc)
2107 {
2108         if (rc) {
2109                 if (!ar->ar_rc)
2110                         ar->ar_rc = rc;
2111
2112                 ar->ar_force_sync = 1;
2113                 ar->ar_min_xid = ptlrpc_sample_next_xid();
2114                 return;
2115
2116         }
2117
2118         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2119                 ar->ar_force_sync = 0;
2120 }
2121
2122 static void osc_oap_to_pending(struct osc_async_page *oap)
2123 {
2124         struct loi_oap_pages *lop;
2125
2126         if (oap->oap_cmd & OBD_BRW_WRITE)
2127                 lop = &oap->oap_loi->loi_write_lop;
2128         else
2129                 lop = &oap->oap_loi->loi_read_lop;
2130
2131         if (oap->oap_async_flags & ASYNC_HP)
2132                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2133         else if (oap->oap_async_flags & ASYNC_URGENT)
2134                 list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2135         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2136         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2137 }
2138
/* this must be called holding the loi list lock to give coverage to exit_cache,
 * async_flag maintenance, and oap_request */
static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
                              struct osc_async_page *oap, int sent, int rc)
{
        __u64 xid = 0;

        ENTRY;
        /* drop the request reference this oap held; remember its xid for
         * the async-error bookkeeping below */
        if (oap->oap_request != NULL) {
                xid = ptlrpc_req_xid(oap->oap_request);
                ptlrpc_req_finished(oap->oap_request);
                oap->oap_request = NULL;
        }

        spin_lock(&oap->oap_lock);
        oap->oap_async_flags = 0;
        spin_unlock(&oap->oap_lock);
        oap->oap_interrupted = 0;

        /* record write success/failure for fsync error propagation, both
         * per-client and per-object */
        if (oap->oap_cmd & OBD_BRW_WRITE) {
                osc_process_ar(&cli->cl_ar, xid, rc);
                osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
        }

        /* refresh the cached lvb attributes from the reply's oa */
        if (rc == 0 && oa != NULL) {
                if (oa->o_valid & OBD_MD_FLBLOCKS)
                        oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
                if (oa->o_valid & OBD_MD_FLMTIME)
                        oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
                if (oa->o_valid & OBD_MD_FLATIME)
                        oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
                if (oa->o_valid & OBD_MD_FLCTIME)
                        oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
        }

        /* group-io pages complete through the oig, not the caller ops */
        if (oap->oap_oig) {
                osc_exit_cache(cli, oap, sent);
                oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
                oap->oap_oig = NULL;
                EXIT;
                return;
        }

        rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
                                                oap->oap_cmd, oa, rc);

        /* ll_ap_completion (from llite) drops PG_locked. so, a new
         * I/O on the page could start, but OSC calls it under lock
         * and thus we can add oap back to pending safely */
        if (rc)
                /* upper layer wants to leave the page on pending queue */
                osc_oap_to_pending(oap);
        else
                osc_exit_cache(cli, oap, sent);
        EXIT;
}
2195
/* Interpret callback for async bulk RPCs (set via rq_interpret_reply in
 * async_internal()/osc_build_req()).  Finishes the request, retries
 * recoverable errors via osc_brw_redo_request(), and otherwise completes
 * every attached oap (or, for async_internal() requests with no oaps,
 * releases the write grants) and frees the pga array. */
static int brw_interpret(struct ptlrpc_request *request, void *data, int rc)
{
        struct osc_brw_async_args *aa = data;
        struct client_obd *cli;
        ENTRY;

        rc = osc_brw_fini_request(request, rc);
        CDEBUG(D_INODE, "request %p aa %p rc %d\n", request, aa, rc);

        /* a successfully queued redo takes over the oaps/pga; nothing more
         * to do here */
        if (osc_recoverable_error(rc)) {
                rc = osc_brw_redo_request(request, aa);
                if (rc == 0)
                        RETURN(0);
        }

        cli = aa->aa_cli;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (lustre_msg_get_opc(request->rq_reqmsg) == OST_WRITE)
                cli->cl_w_in_flight--;
        else
                cli->cl_r_in_flight--;

        if (!list_empty(&aa->aa_oaps)) { /* from osc_send_oap_rpc() */
                struct osc_async_page *oap, *tmp;
                /* the caller may re-use the oap after the completion call so
                 * we need to clean it up a little */
                list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
                        list_del_init(&oap->oap_rpc_item);
                        osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
                }
                OBDO_FREE(aa->aa_oa);
        } else { /* from async_internal() */
                obd_count i;
                for (i = 0; i < aa->aa_page_count; i++)
                        osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);

                /* the oa was a per-chunk copy made by osc_brw_async() */
                if (aa->aa_oa->o_flags & OBD_FL_TEMPORARY)
                        OBDO_FREE(aa->aa_oa);
        }
        osc_wake_cache_waiters(cli);
        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);

        RETURN(rc);
}
2246
/* Build a bulk RPC from the oaps on @rpc_list: assemble the pga array,
 * fill and update the obdo via the caller's ap_* ops, and splice the oaps
 * onto the request's async args (emptying @rpc_list).
 *
 * Returns the prepared request, or an ERR_PTR on failure (in which case
 * the local oa/pga allocations are freed and @rpc_list is left intact). */
static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
                                            struct list_head *rpc_list,
                                            int page_count, int cmd)
{
        struct ptlrpc_request *req;
        struct brw_page **pga = NULL;
        struct osc_brw_async_args *aa;
        struct obdo *oa = NULL;
        struct obd_async_page_ops *ops = NULL;
        void *caller_data = NULL;
        struct osc_async_page *oap;
        struct ldlm_lock *lock = NULL;
        obd_valid valid;
        int i, rc;

        ENTRY;
        LASSERT(!list_empty(rpc_list));

        OBD_ALLOC(pga, sizeof(*pga) * page_count);
        if (pga == NULL)
                RETURN(ERR_PTR(-ENOMEM));

        OBDO_ALLOC(oa);
        if (oa == NULL)
                GOTO(out, req = ERR_PTR(-ENOMEM));

        /* caller ops/data/lock are taken from the first oap; the loop
         * builds the pga entries for every page */
        i = 0;
        list_for_each_entry(oap, rpc_list, oap_rpc_item) {
                if (ops == NULL) {
                        ops = oap->oap_caller_ops;
                        caller_data = oap->oap_caller_data;
                        lock = oap->oap_ldlm_lock;
                }
                pga[i] = &oap->oap_brw_page;
                pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
                CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
                       pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
                i++;
        }

        /* always get the data for the obdo for the rpc */
        LASSERT(ops != NULL);
        ops->ap_fill_obdo(caller_data, cmd, oa);
        if (lock) {
                oa->o_handle = lock->l_remote_handle;
                oa->o_valid |= OBD_MD_FLHANDLE;
        }

        sort_brw_pages(pga, page_count);
        rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count, pga, &req, 0);
        if (rc != 0) {
                CERROR("prep_req failed: %d\n", rc);
                GOTO(out, req = ERR_PTR(rc));
        }
        /* from here on @oa points INTO the request message, so the updates
         * below land in the wire buffer (the heap oa was captured by
         * prep_request above) */
        oa = &((struct ost_body *)lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF,
                                                 sizeof(struct ost_body)))->oa;

        /* Need to update the timestamps after the request is built in case
         * we race with setattr (locally or in queue at OST).  If OST gets
         * later setattr before earlier BRW (as determined by the request xid),
         * the OST will not use BRW timestamps.  Sadly, there is no obvious
         * way to do this in a single call.  bug 10150 */
        if (pga[0]->flag & OBD_BRW_SRVLOCK) {
                /* in case of lockless read/write do not use inode's
                 * timestamps because concurrent stat might fill the
                 * inode with out-of-date times, send current
                 * instead */
                if (cmd & OBD_BRW_WRITE) {
                        oa->o_mtime = oa->o_ctime = LTIME_S(CURRENT_TIME);
                        oa->o_valid |= OBD_MD_FLMTIME | OBD_MD_FLCTIME;
                        valid = OBD_MD_FLATIME;
                } else {
                        oa->o_atime = LTIME_S(CURRENT_TIME);
                        oa->o_valid |= OBD_MD_FLATIME;
                        valid = OBD_MD_FLMTIME | OBD_MD_FLCTIME;
                }
        } else {
                valid = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
        }
        ops->ap_update_obdo(caller_data, cmd, oa, valid);

        /* move the oaps from the caller's list onto the request */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);
        list_splice(rpc_list, &aa->aa_oaps);
        CFS_INIT_LIST_HEAD(rpc_list);

out:
        if (IS_ERR(req)) {
                if (oa)
                        OBDO_FREE(oa);
                if (pga)
                        OBD_FREE(pga, sizeof(*pga) * page_count);
        }
        RETURN(req);
}
2343
2344 /* the loi lock is held across this function but it's allowed to release
2345  * and reacquire it during its work */
2346 /**
2347  * prepare pages for ASYNC io and put pages in send queue.
2348  *
2349  * \param cli -
2350  * \param loi -
2351  * \param cmd - OBD_BRW_* macroses
2352  * \param lop - pending pages
2353  *
2354  * \return zero if pages successfully add to send queue.
2355  * \return not zere if error occurring.
2356  */
static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
                            int cmd, struct loi_oap_pages *lop)
{
        struct ptlrpc_request *req;
        obd_count page_count = 0;
        struct osc_async_page *oap = NULL, *tmp;
        struct osc_brw_async_args *aa;
        struct obd_async_page_ops *ops;
        CFS_LIST_HEAD(rpc_list);
        unsigned int ending_offset;
        unsigned  starting_offset = 0;
        int srvlock = 0;
        ENTRY;

        /* If there are HP OAPs we need to handle at least 1 of them,
         * move it the beginning of the pending list for that. */
        if (!list_empty(&lop->lop_urgent)) {
                oap = list_entry(lop->lop_urgent.next,
                                 struct osc_async_page, oap_urgent_item);
                if (oap->oap_async_flags & ASYNC_HP)
                        list_move(&oap->oap_pending_item, &lop->lop_pending);
        }

        /* first we find the pages we're allowed to work with */
        list_for_each_entry_safe(oap, tmp, &lop->lop_pending, oap_pending_item){
                ops = oap->oap_caller_ops;

                LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
                         "magic 0x%x\n", oap, oap->oap_magic);

                /* one RPC carries pages under a single lock mode; stop
                 * batching when this page's OBD_BRW_SRVLOCK flag differs
                 * from the pages already collected */
                if (page_count != 0 &&
                    srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
                        CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
                               " oap %p, page %p, srvlock %u\n",
                               oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
                        break;
                }
                /* in llite being 'ready' equates to the page being locked
                 * until completion unlocks it.  commit_write submits a page
                 * as not ready because its unlock will happen unconditionally
                 * as the call returns.  if we race with commit_write giving
                 * us that page we dont' want to create a hole in the page
                 * stream, so we stop and leave the rpc to be fired by
                 * another dirtier or kupdated interval (the not ready page
                 * will still be on the dirty list).  we could call in
                 * at the end of ll_file_write to process the queue again. */
                if (!(oap->oap_async_flags & ASYNC_READY)) {
                        int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
                        if (rc < 0)
                                CDEBUG(D_INODE, "oap %p page %p returned %d "
                                                "instead of ready\n", oap,
                                                oap->oap_page, rc);
                        switch (rc) {
                        case -EAGAIN:
                                /* llite is telling us that the page is still
                                 * in commit_write and that we should try
                                 * and put it in an rpc again later.  we
                                 * break out of the loop so we don't create
                                 * a hole in the sequence of pages in the rpc
                                 * stream.*/
                                oap = NULL;
                                break;
                        case -EINTR:
                                /* the io isn't needed.. tell the checks
                                 * below to complete the rpc with EINTR */
                                spin_lock(&oap->oap_lock);
                                oap->oap_async_flags |= ASYNC_COUNT_STABLE;
                                spin_unlock(&oap->oap_lock);
                                oap->oap_count = -EINTR;
                                break;
                        case 0:
                                spin_lock(&oap->oap_lock);
                                oap->oap_async_flags |= ASYNC_READY;
                                spin_unlock(&oap->oap_lock);
                                break;
                        default:
                                LASSERTF(0, "oap %p page %p returned %d "
                                            "from make_ready\n", oap,
                                            oap->oap_page, rc);
                                break;
                        }
                }
                if (oap == NULL)
                        break;
                /*
                 * Page submitted for IO has to be locked. Either by
                 * ->ap_make_ready() or by higher layers.
                 */
#if defined(__KERNEL__) && defined(__linux__)
                 if(!(PageLocked(oap->oap_page) &&
                     (CheckWriteback(oap->oap_page, cmd) || oap->oap_oig !=NULL))) {
                        CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
                               oap->oap_page, (long)oap->oap_page->flags, oap->oap_async_flags);
                        LBUG();
                }
#endif
                /* If there is a gap at the start of this page, it can't merge
                 * with any previous page, so we'll hand the network a
                 * "fragmented" page array that it can't transfer in 1 RDMA */
                if (page_count != 0 && oap->oap_page_off != 0)
                        break;

                /* take the page out of our book-keeping */
                list_del_init(&oap->oap_pending_item);
                lop_update_pending(cli, lop, cmd, -1);
                list_del_init(&oap->oap_urgent_item);

                /* remember where in the object the RPC starts, for the
                 * lprocfs offset histograms below */
                if (page_count == 0)
                        starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
                                          (PTLRPC_MAX_BRW_SIZE - 1);

                /* ask the caller for the size of the io as the rpc leaves. */
                if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
                        oap->oap_count =
                                ops->ap_refresh_count(oap->oap_caller_data,cmd);
                if (oap->oap_count <= 0) {
                        CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
                               oap->oap_count);
                        osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
                        continue;
                }

                /* now put the page back in our accounting */
                list_add_tail(&oap->oap_rpc_item, &rpc_list);
                if (page_count == 0)
                        srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
                if (++page_count >= cli->cl_max_pages_per_rpc)
                        break;

                /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
                 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
                 * have the same alignment as the initial writes that allocated
                 * extents on the server. */
                ending_offset = (oap->oap_obj_off + oap->oap_page_off +
                                 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
                if (ending_offset == 0)
                        break;

                /* If there is a gap at the end of this page, it can't merge
                 * with any subsequent pages, so we'll hand the network a
                 * "fragmented" page array that it can't transfer in 1 RDMA */
                if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
                        break;
        }

        osc_wake_cache_waiters(cli);

        if (page_count == 0)
                RETURN(0);

        loi_list_maint(cli, loi);

        /* drop the list lock for the (potentially blocking) request build;
         * the pages to send are already isolated on our local rpc_list */
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        req = osc_build_req(cli, &rpc_list, page_count, cmd);
        if (IS_ERR(req)) {
                /* this should happen rarely and is pretty bad, it makes the
                 * pending list not follow the dirty order */
                client_obd_list_lock(&cli->cl_loi_list_lock);
                list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
                        list_del_init(&oap->oap_rpc_item);

                        /* queued sync pages can be torn down while the pages
                         * were between the pending list and the rpc */
                        if (oap->oap_interrupted) {
                                CDEBUG(D_INODE, "oap %p interrupted\n", oap);
                                osc_ap_completion(cli, NULL, oap, 0,
                                                  oap->oap_count);
                                continue;
                        }
                        osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
                }
                loi_list_maint(cli, loi);
                RETURN(PTR_ERR(req));
        }

        aa = ptlrpc_req_async_args(req);
        /* update the per-client page-count/RPCs-in-flight/offset histograms */
        if (cmd == OBD_BRW_READ) {
                lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
                                      (starting_offset >> CFS_PAGE_SHIFT) + 1);
        } else {
                lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_write_rpc_hist,
                                 cli->cl_w_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
                                      (starting_offset >> CFS_PAGE_SHIFT) + 1);
        }
        ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);

        client_obd_list_lock(&cli->cl_loi_list_lock);

        if (cmd == OBD_BRW_READ)
                cli->cl_r_in_flight++;
        else
                cli->cl_w_in_flight++;

        /* queued sync pages can be torn down while the pages
         * were between the pending list and the rpc */
        tmp = NULL;
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                /* only one oap gets a request reference */
                if (tmp == NULL)
                        tmp = oap;
                if (oap->oap_interrupted && !req->rq_intr) {
                        CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
                               oap, req);
                        ptlrpc_mark_interrupted(req);
                }
        }
        if (tmp != NULL)
                tmp->oap_request = ptlrpc_request_addref(req);

        DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
                  page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);

        req->rq_interpret_reply = brw_interpret;
        /* hand the request to ptlrpcd for asynchronous sending */
        ptlrpcd_add_req(req);
        RETURN(1);
}
2578
/* Log a lov_oinfo's state: whether it sits on a ready list, plus the
 * pending-page count and urgent-list status for both the write and read
 * queues.  STR/args extend the message printf-style.
 *
 * Note: the stray line-continuation backslash after the final "args)" was
 * removed; it used to splice the following line into the macro body. */
#define LOI_DEBUG(LOI, STR, args...)                                     \
        CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
               !list_empty(&(LOI)->loi_ready_item) ||                    \
               !list_empty(&(LOI)->loi_hp_ready_item),                   \
               (LOI)->loi_write_lop.lop_num_pending,                     \
               !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
               (LOI)->loi_read_lop.lop_num_pending,                      \
               !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
               args)
2588
2589 /* This is called by osc_check_rpcs() to find which objects have pages that
2590  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2591 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2592 {
2593         ENTRY;
2594         /* First return objects that have blocked locks so that they
2595          * will be flushed quickly and other clients can get the lock,
2596          * then objects which have pages ready to be stuffed into RPCs */
2597         if (!list_empty(&cli->cl_loi_hp_ready_list))
2598                 RETURN(list_entry(cli->cl_loi_hp_ready_list.next,
2599                                   struct lov_oinfo, loi_hp_ready_item));
2600         if (!list_empty(&cli->cl_loi_ready_list))
2601                 RETURN(list_entry(cli->cl_loi_ready_list.next,
2602                                   struct lov_oinfo, loi_ready_item));
2603
2604         /* then if we have cache waiters, return all objects with queued
2605          * writes.  This is especially important when many small files
2606          * have filled up the cache and not been fired into rpcs because
2607          * they don't pass the nr_pending/object threshhold */
2608         if (!list_empty(&cli->cl_cache_waiters) &&
2609             !list_empty(&cli->cl_loi_write_list))
2610                 RETURN(list_entry(cli->cl_loi_write_list.next,
2611                                   struct lov_oinfo, loi_write_item));
2612
2613         /* then return all queued objects when we have an invalid import
2614          * so that they get flushed */
2615         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2616                 if (!list_empty(&cli->cl_loi_write_list))
2617                         RETURN(list_entry(cli->cl_loi_write_list.next,
2618                                           struct lov_oinfo, loi_write_item));
2619                 if (!list_empty(&cli->cl_loi_read_list))
2620                         RETURN(list_entry(cli->cl_loi_read_list.next,
2621                                           struct lov_oinfo, loi_read_item));
2622         }
2623         RETURN(NULL);
2624 }
2625
2626 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2627 {
2628         struct osc_async_page *oap;
2629         int hprpc = 0;
2630
2631         if (!list_empty(&loi->loi_write_lop.lop_urgent)) {
2632                 oap = list_entry(loi->loi_write_lop.lop_urgent.next,
2633                                  struct osc_async_page, oap_urgent_item);
2634                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2635         }
2636
2637         if (!hprpc && !list_empty(&loi->loi_read_lop.lop_urgent)) {
2638                 oap = list_entry(loi->loi_read_lop.lop_urgent.next,
2639                                  struct osc_async_page, oap_urgent_item);
2640                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2641         }
2642
2643         return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
2644 }
2645
/* called with the loi list lock held */
static void osc_check_rpcs(struct client_obd *cli)
{
        struct lov_oinfo *loi;
        int rc = 0, race_counter = 0;
        ENTRY;

        /* note: osc_send_oap_rpc() drops and re-takes the loi list lock
         * while it builds a request, so list state can change between
         * iterations */
        while ((loi = osc_next_loi(cli)) != NULL) {
                LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));

                /* stop once the RPCs-in-flight limit is reached */
                if (osc_max_rpc_in_flight(cli, loi))
                        break;

                /* attempt some read/write balancing by alternating between
                 * reads and writes in an object.  The makes_rpc checks here
                 * would be redundant if we were getting read/write work items
                 * instead of objects.  we don't want send_oap_rpc to drain a
                 * partial read pending queue when we're given this object to
                 * do io on writes while there are cache waiters */
                if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
                        rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
                                              &loi->loi_write_lop);
                        if (rc < 0)
                                break;
                        if (rc > 0)
                                race_counter = 0;
                        else
                                race_counter++;
                }
                if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
                        rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
                                              &loi->loi_read_lop);
                        if (rc < 0)
                                break;
                        if (rc > 0)
                                race_counter = 0;
                        else
                                race_counter++;
                }

                /* attempt some inter-object balancing by issueing rpcs
                 * for each object in turn */
                if (!list_empty(&loi->loi_hp_ready_item))
                        list_del_init(&loi->loi_hp_ready_item);
                if (!list_empty(&loi->loi_ready_item))
                        list_del_init(&loi->loi_ready_item);
                if (!list_empty(&loi->loi_write_item))
                        list_del_init(&loi->loi_write_item);
                if (!list_empty(&loi->loi_read_item))
                        list_del_init(&loi->loi_read_item);

                /* re-queue the object on whichever lists it still belongs */
                loi_list_maint(cli, loi);

                /* send_oap_rpc fails with 0 when make_ready tells it to
                 * back off.  llite's make_ready does this when it tries
                 * to lock a page queued for write that is already locked.
                 * we want to try sending rpcs from many objects, but we
                 * don't want to spin failing with 0.  */
                if (race_counter == 10)
                        break;
        }
        EXIT;
}
2709
2710 /* we're trying to queue a page in the osc so we're subject to the
2711  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2712  * If the osc's queued pages are already at that limit, then we want to sleep
2713  * until there is space in the osc's queue for us.  We also may be waiting for
2714  * write credits from the OST if there are RPCs in flight that may return some
2715  * before we fall back to sync writes.
2716  *
2717  * We need this know our allocation was granted in the presence of signals */
2718 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2719 {
2720         int rc;
2721         ENTRY;
2722         client_obd_list_lock(&cli->cl_loi_list_lock);
2723         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2724         client_obd_list_unlock(&cli->cl_loi_list_lock);
2725         RETURN(rc);
2726 };
2727
/* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
 * grant or cache space. */
static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
                           struct osc_async_page *oap)
{
        struct osc_cache_waiter ocw;
        struct l_wait_info lwi = { 0 };
        ENTRY;

        CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
               "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
               cli->cl_dirty_max, obd_max_dirty_pages,
               cli->cl_lost_grant, cli->cl_avail_grant);

        /* force the caller to try sync io.  this can jump the list
         * of queued writes and create a discontiguous rpc stream */
        if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
            loi->loi_ar.ar_force_sync)
                RETURN(-EDQUOT);

        /* Hopefully normal case - cache space and write credits available */
        if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
            (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
            (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
                /* account for ourselves */
                osc_consume_write_grant(cli, &oap->oap_brw_page);
                RETURN(0);
        }

        /* Make sure that there are write rpcs in flight to wait for.  This
         * is a little silly as this object may not have any pending but
         * other objects sure might. */
        if (cli->cl_w_in_flight) {
                /* queue ourselves as a cache waiter and kick off any RPCs
                 * that might free up dirty/grant space before we sleep */
                list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
                cfs_waitq_init(&ocw.ocw_waitq);
                ocw.ocw_oap = oap;
                ocw.ocw_rc = 0;

                loi_list_maint(cli, loi);
                osc_check_rpcs(cli);
                client_obd_list_unlock(&cli->cl_loi_list_lock);

                CDEBUG(D_CACHE, "sleeping for cache space\n");
                l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);

                client_obd_list_lock(&cli->cl_loi_list_lock);
                /* still on the waiters list means we were never granted
                 * space (the wait ended for another reason, e.g. no RPCs
                 * left in flight); remove ourselves and bail */
                if (!list_empty(&ocw.ocw_entry)) {
                        list_del(&ocw.ocw_entry);
                        RETURN(-EINTR);
                }
                RETURN(ocw.ocw_rc);
        }

        RETURN(-EDQUOT);
}
2783
2784 static int osc_get_lock(struct obd_export *exp, struct lov_stripe_md *lsm,
2785                         void **res, int rw, obd_off start, obd_off end,
2786                         struct lustre_handle *lockh, int flags)
2787 {
2788         struct ldlm_lock *lock = NULL;
2789         int rc, release = 0;
2790
2791         ENTRY;
2792
2793         if (lockh && lustre_handle_is_used(lockh)) {
2794                 /* if a valid lockh is passed, just check that the corresponding
2795                  * lock covers the extent */
2796                 lock = ldlm_handle2lock(lockh);
2797                 release = 1;
2798         } else {
2799                 struct osc_async_page *oap = *res;
2800                 spin_lock(&oap->oap_lock);
2801                 lock = oap->oap_ldlm_lock;
2802                 if (likely(lock))
2803                         LDLM_LOCK_GET(lock);
2804                 spin_unlock(&oap->oap_lock);
2805         }
2806         /* lock can be NULL in case race obd_get_lock vs lock cancel
2807          * so we should be don't try match this */
2808         if (unlikely(!lock))
2809                 return 0;
2810
2811         rc = ldlm_lock_fast_match(lock, rw, start, end, lockh);
2812         if (release == 1 && rc == 1)
2813                 /* if a valid lockh was passed, we just need to check
2814                  * that the lock covers the page, no reference should be
2815                  * taken*/
2816                 ldlm_lock_decref(lockh,
2817                                  rw == OBD_BRW_WRITE ? LCK_PW : LCK_PR);
2818         LDLM_LOCK_PUT(lock);
2819         RETURN(rc);
2820 }
2821
2822 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2823                         struct lov_oinfo *loi, cfs_page_t *page,
2824                         obd_off offset, struct obd_async_page_ops *ops,
2825                         void *data, void **res, int flags,
2826                         struct lustre_handle *lockh)
2827 {
2828         struct osc_async_page *oap;
2829         struct ldlm_res_id oid = {{0}};
2830         int rc = 0;
2831
2832         ENTRY;
2833
2834         if (!page)
2835                 return size_round(sizeof(*oap));
2836
2837         oap = *res;
2838         oap->oap_magic = OAP_MAGIC;
2839         oap->oap_cli = &exp->exp_obd->u.cli;
2840         oap->oap_loi = loi;
2841
2842         oap->oap_caller_ops = ops;
2843         oap->oap_caller_data = data;
2844
2845         oap->oap_page = page;
2846         oap->oap_obj_off = offset;
2847
2848         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2849         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2850         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2851         CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2852
2853         oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2854
2855         spin_lock_init(&oap->oap_lock);
2856
2857         /* If the page was marked as notcacheable - don't add to any locks */
2858         if (!(flags & OBD_PAGE_NO_CACHE)) {
2859                 osc_build_res_name(loi->loi_id, loi->loi_gr, &oid);
2860                 /* This is the only place where we can call cache_add_extent
2861                    without oap_lock, because this page is locked now, and
2862                    the lock we are adding it to is referenced, so cannot lose
2863                    any pages either. */
2864                 rc = cache_add_extent(oap->oap_cli->cl_cache, &oid, oap, lockh);
2865                 if (rc)
2866                         RETURN(rc);
2867         }
2868
2869         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2870         RETURN(0);
2871 }
2872
2873 struct osc_async_page *oap_from_cookie(void *cookie)
2874 {
2875         struct osc_async_page *oap = cookie;
2876         if (oap->oap_magic != OAP_MAGIC)
2877                 return ERR_PTR(-EINVAL);
2878         return oap;
2879 };
2880
/* Queue a prepared async page for read or write.  Writes first reserve
 * dirty/grant space via osc_enter_cache(), which may sleep.  Returns 0 on
 * success or a negative errno (-EIO invalid import, -EBUSY already queued,
 * -EDQUOT over quota or no cache space, -ENOMEM). */
static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
                              struct lov_oinfo *loi, void *cookie,
                              int cmd, obd_off off, int count,
                              obd_flag brw_flags, enum async_flags async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        /* the cookie must carry a valid oap magic */
        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        /* refuse pages that are already queued or inside an RPC */
        if (!list_empty(&oap->oap_pending_item) ||
            !list_empty(&oap->oap_urgent_item) ||
            !list_empty(&oap->oap_rpc_item))
                RETURN(-EBUSY);

        /* check if the file's owner/group is over quota */
        if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
                struct obd_async_page_ops *ops;
                struct obdo *oa;

                OBDO_ALLOC(oa);
                if (oa == NULL)
                        RETURN(-ENOMEM);

                ops = oap->oap_caller_ops;
                ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
                if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
                    NO_QUOTA)
                        rc = -EDQUOT;

                OBDO_FREE(oa);
                if (rc)
                        RETURN(rc);
        }

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        client_obd_list_lock(&cli->cl_loi_list_lock);

        oap->oap_cmd = cmd;
        oap->oap_page_off = off;
        oap->oap_count = count;
        oap->oap_brw_flags = brw_flags;
        /* Give a hint to OST that requests are coming from kswapd - bug19529 */
        if (libcfs_memory_pressure_get())
                oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
        /* oap_lock provides atomic access to oap_async_flags */
        spin_lock(&oap->oap_lock);
        oap->oap_async_flags = async_flags;
        spin_unlock(&oap->oap_lock);

        /* osc_enter_cache() may drop and re-take the loi list lock while
         * waiting for cache space */
        if (cmd & OBD_BRW_WRITE) {
                rc = osc_enter_cache(cli, loi, oap);
                if (rc) {
                        client_obd_list_unlock(&cli->cl_loi_list_lock);
                        RETURN(rc);
                }
        }

        osc_oap_to_pending(oap);
        loi_list_maint(cli, loi);

        LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
                  cmd);

        /* try to launch RPCs now that the page is queued */
        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        RETURN(0);
}
2958
/* True iff `flag' is being newly set: clear in `was' and set in `now'.
 * aka (~was & now & flag), but this is more clear :)
 * Arguments are fully parenthesized so operator-bearing expressions
 * (e.g. SETTING(a | b, x, f)) associate correctly. */
#define SETTING(was, now, flag) (!((was) & (flag)) && ((now) & (flag)))
2961
/* Update the async flags of an already-queued page (located via @cookie).
 *
 * A page gaining ASYNC_URGENT (and not yet in an RPC) is moved onto the
 * stripe's urgent list -- at the head if it is ASYNC_HP -- and RPC
 * generation is kicked before returning.
 *
 * Returns 0 on success (including "flags already set"), -EINVAL if the
 * page is not on a pending list, -EIO if the import is missing/invalid,
 * or PTR_ERR() from a bad cookie.
 */
static int osc_set_async_flags(struct obd_export *exp,
                               struct lov_stripe_md *lsm,
                               struct lov_oinfo *loi, void *cookie,
                               obd_flag async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct loi_oap_pages *lop;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        /*
         * bug 7311: OST-side locking is only supported for liblustre for now
         * (and liblustre never calls obd_set_async_flags(). I hope.), generic
         * implementation has to handle case where OST-locked page was picked
         * up by, e.g., ->writepage().
         */
        LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
        LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
                                     * tread here. */

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        /* default to the first stripe when no oinfo was supplied */
        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        if (oap->oap_cmd & OBD_BRW_WRITE) {
                lop = &loi->loi_write_lop;
        } else {
                lop = &loi->loi_read_lop;
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* oap_lock provides atomic semantics of oap_async_flags access */
        spin_lock(&oap->oap_lock);
        if (list_empty(&oap->oap_pending_item))
                GOTO(out, rc = -EINVAL);

        /* nothing to do if every requested flag is already set */
        if ((oap->oap_async_flags & async_flags) == async_flags)
                GOTO(out, rc = 0);

        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
                oap->oap_async_flags |= ASYNC_READY;

        /* only re-list pages that are not already part of an RPC */
        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
            list_empty(&oap->oap_rpc_item)) {
                /* high-priority pages jump to the head of the urgent list */
                if (oap->oap_async_flags & ASYNC_HP)
                        list_add(&oap->oap_urgent_item, &lop->lop_urgent);
                else
                        list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
                oap->oap_async_flags |= ASYNC_URGENT;
                loi_list_maint(cli, loi);
        }

        LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
                        oap->oap_async_flags);
out:
        spin_unlock(&oap->oap_lock);
        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        RETURN(rc);
}
3029
/* Queue a page for group (synchronous) I/O on @oig.
 *
 * The page is parked on the stripe's lop_pending_group list; it becomes
 * eligible for RPCs only when osc_trigger_group_io() releases the group.
 * With ASYNC_GROUP_SYNC set, the page is also registered with @oig so
 * completion can be waited on.
 *
 * Returns 0 on success, -EBUSY if the page already sits on any I/O list,
 * -EIO if the import is missing/invalid, or PTR_ERR() from a bad cookie.
 */
static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
                             struct lov_oinfo *loi,
                             struct obd_io_group *oig, void *cookie,
                             int cmd, obd_off off, int count,
                             obd_flag brw_flags,
                             obd_flag async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct osc_async_page *oap;
        struct loi_oap_pages *lop;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        /* refuse pages that are already queued anywhere */
        if (!list_empty(&oap->oap_pending_item) ||
            !list_empty(&oap->oap_urgent_item) ||
            !list_empty(&oap->oap_rpc_item))
                RETURN(-EBUSY);

        /* default to the first stripe when no oinfo was supplied */
        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        client_obd_list_lock(&cli->cl_loi_list_lock);

        oap->oap_cmd = cmd;
        oap->oap_page_off = off;
        oap->oap_count = count;
        oap->oap_brw_flags = brw_flags;
        /* Give a hint to OST that requests are coming from kswapd - bug19529 */
        if (libcfs_memory_pressure_get())
                oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
        /* oap_lock provides atomic semantics of oap_async_flags access */
        spin_lock(&oap->oap_lock);
        oap->oap_async_flags = async_flags;
        spin_unlock(&oap->oap_lock);

        if (cmd & OBD_BRW_WRITE)
                lop = &loi->loi_write_lop;
        else
                lop = &loi->loi_read_lop;

        list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
        if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
                oap->oap_oig = oig;
                rc = oig_add_one(oig, &oap->oap_occ);
        }

        LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
                  oap, oap->oap_page, rc);

        client_obd_list_unlock(&cli->cl_loi_list_lock);

        RETURN(rc);
}
3089
3090 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
3091                                  struct loi_oap_pages *lop, int cmd)
3092 {
3093         struct list_head *pos, *tmp;
3094         struct osc_async_page *oap;
3095
3096         list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
3097                 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
3098                 list_del(&oap->oap_pending_item);
3099                 osc_oap_to_pending(oap);
3100         }
3101         loi_list_maint(cli, loi);
3102 }
3103
/* Release all group-queued pages (both read and write) on stripe @loi
 * (defaulting to @lsm's first stripe) into the regular pending lists and
 * kick RPC generation.  Always returns 0. */
static int osc_trigger_group_io(struct obd_export *exp,
                                struct lov_stripe_md *lsm,
                                struct lov_oinfo *loi,
                                struct obd_io_group *oig)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        ENTRY;

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        client_obd_list_lock(&cli->cl_loi_list_lock);

        osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
        osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);

        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        RETURN(0);
}
3125
/* Remove the page located via @cookie from all osc I/O lists and release
 * its cache accounting.
 *
 * Fails with -EBUSY if the page is part of an in-flight RPC; otherwise
 * it is dropped from the urgent and pending lists (fixing up the pending
 * counters), removed from the extent cache, and cache waiters are woken.
 * Returns 0 on success or PTR_ERR() from a bad cookie.
 */
static int osc_teardown_async_page(struct obd_export *exp,
                                   struct lov_stripe_md *lsm,
                                   struct lov_oinfo *loi, void *cookie)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct loi_oap_pages *lop;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        /* default to the first stripe when no oinfo was supplied */
        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        if (oap->oap_cmd & OBD_BRW_WRITE) {
                lop = &loi->loi_write_lop;
        } else {
                lop = &loi->loi_read_lop;
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* a page inside an RPC cannot be torn down under the RPC's feet */
        if (!list_empty(&oap->oap_rpc_item))
                GOTO(out, rc = -EBUSY);

        /* give back any cache/grant and wake writers waiting for space */
        osc_exit_cache(cli, oap, 0);
        osc_wake_cache_waiters(cli);

        if (!list_empty(&oap->oap_urgent_item)) {
                list_del_init(&oap->oap_urgent_item);
                /* oap_lock provides atomic semantics of oap_async_flags */
                spin_lock(&oap->oap_lock);
                oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
                spin_unlock(&oap->oap_lock);
        }

        if (!list_empty(&oap->oap_pending_item)) {
                list_del_init(&oap->oap_pending_item);
                lop_update_pending(cli, lop, oap->oap_cmd, -1);
        }
        loi_list_maint(cli, loi);
        cache_remove_extent(cli->cl_cache, oap);

        LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
out:
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        RETURN(rc);
}
3176
/* DLM blocking/cancel AST for OSC extent locks.
 *
 * LDLM_CB_BLOCKING: another lock conflicts with ours, so cancel ours.
 * LDLM_CB_CANCELING: the lock is going away; purge the extent cache
 * entries under it and invoke the upper layer's cancel callback if one
 * is registered.  Always returns 0.
 */
int osc_extent_blocking_cb(struct ldlm_lock *lock,
                           struct ldlm_lock_desc *new, void *data,
                           int flag)
{
        struct lustre_handle lockh = { 0 };
        int rc;
        ENTRY;

        /* a small non-NULL value cannot be a valid l_ast_data pointer */
        if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
                LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
                LBUG();
        }

        switch (flag) {
        case LDLM_CB_BLOCKING:
                ldlm_lock2handle(lock, &lockh);
                rc = ldlm_cli_cancel(&lockh);
                if (rc != ELDLM_OK)
                        CERROR("ldlm_cli_cancel failed: %d\n", rc);
                break;
        case LDLM_CB_CANCELING: {

                ldlm_lock2handle(lock, &lockh);
                /* This lock wasn't granted, don't try to do anything */
                if (lock->l_req_mode != lock->l_granted_mode)
                        RETURN(0);

                cache_remove_lock(lock->l_conn_export->exp_obd->u.cli.cl_cache,
                                  &lockh);

                if (lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb)
                        lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb(
                                                          lock, new, data,flag);
                break;
        }
        default:
                LBUG();
        }

        RETURN(0);
}
EXPORT_SYMBOL(osc_extent_blocking_cb);
3219
/* Attach @data (an inode pointer on Linux) to the lock behind @lockh as
 * l_ast_data, asserting that any previous, different owner is an inode
 * being freed.  Also folds LDLM_FL_NO_LRU from @flags into the lock. */
static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
                                    int flags)
{
        struct ldlm_lock *lock = ldlm_handle2lock(lockh);

        if (lock == NULL) {
                CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
                return;
        }
        lock_res_and_lock(lock);
#if defined (__KERNEL__) && defined (__linux__)
        /* Liang XXX: Darwin and Winnt checking should be added */
        if (lock->l_ast_data && lock->l_ast_data != data) {
                struct inode *new_inode = data;
                struct inode *old_inode = lock->l_ast_data;
                /* a stale owner is tolerated only if it is being freed */
                if (!(old_inode->i_state & I_FREEING))
                        LDLM_ERROR(lock, "inconsistent l_ast_data found");
                LASSERTF(old_inode->i_state & I_FREEING,
                         "Found existing inode %p/%lu/%u state %lu in lock: "
                         "setting data to %p/%lu/%u\n", old_inode,
                         old_inode->i_ino, old_inode->i_generation,
                         old_inode->i_state,
                         new_inode, new_inode->i_ino, new_inode->i_generation);
        }
#endif
        lock->l_ast_data = data;
        lock->l_flags |= (flags & LDLM_FL_NO_LRU);
        unlock_res_and_lock(lock);
        LDLM_LOCK_PUT(lock);
}
3250
3251 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3252                              ldlm_iterator_t replace, void *data)
3253 {
3254         struct ldlm_res_id res_id;
3255         struct obd_device *obd = class_exp2obd(exp);
3256
3257         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
3258         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3259         return 0;
3260 }
3261
/* Common completion for osc_enqueue(): for intent enqueues, pull the
 * real status out of the ldlm reply; log the lvb that was returned;
 * cache the lock on success; finally hand the result to the caller's
 * update callback and return its verdict. */
static int osc_enqueue_fini(struct obd_device *obd, struct ptlrpc_request *req,
                            struct obd_info *oinfo, int intent, int rc)
{
        ENTRY;

        if (intent) {
                /* The request was created before ldlm_cli_enqueue call. */
                if (rc == ELDLM_LOCK_ABORTED) {
                        struct ldlm_reply *rep;

                        /* swabbed by ldlm_cli_enqueue() */
                        LASSERT(lustre_rep_swabbed(req, DLM_LOCKREPLY_OFF));
                        rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
                                             sizeof(*rep));
                        LASSERT(rep != NULL);
                        /* the intent carries the server's real status */
                        if (rep->lock_policy_res1)
                                rc = rep->lock_policy_res1;
                }
        }

        if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
                CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
                       oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
                       oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
                       oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
        }

        if (!rc)
                cache_add_lock(obd->u.cli.cl_cache, oinfo->oi_lockh);

        /* Call the update callback. */
        rc = oinfo->oi_cb_up(oinfo, rc);
        RETURN(rc);
}
3296
/* Reply interpreter for async osc_enqueue(): finish the ldlm enqueue
 * (unpacking the lvb), run the common osc completion, then drop the
 * reference the async request held on the lock (see the comment above
 * osc_enqueue() for why locks are released immediately). */
static int osc_enqueue_interpret(struct ptlrpc_request *req,
                                 struct osc_enqueue_args *aa, int rc)
{
        int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
        struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
        struct ldlm_lock *lock;

        /* ldlm_cli_enqueue is holding a reference on the lock, so it must
         * be valid. */
        lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);

        /* Complete obtaining the lock procedure. */
        rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
                                   aa->oa_ei->ei_mode,
                                   &aa->oa_oi->oi_flags,
                                   &lsm->lsm_oinfo[0]->loi_lvb,
                                   sizeof(lsm->lsm_oinfo[0]->loi_lvb),
                                   lustre_swab_ost_lvb,
                                   aa->oa_oi->oi_lockh, rc);

        /* Complete osc stuff. */
        rc = osc_enqueue_fini(aa->oa_exp->exp_obd, req, aa->oa_oi, intent, rc);

        /* Release the lock for async request. */
        if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
                ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);

        LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
                 aa->oa_oi->oi_lockh, req, aa);
        LDLM_LOCK_PUT(lock);
        return rc;
}
3329
/* When enqueuing asynchronously, locks are not ordered; we can obtain a lock
 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
 * other synchronous requests, but holding some locks while trying to obtain
 * others may take a considerable amount of time in the case of OST failure;
 * and when a client fails to release locks that other sync requests are
 * waiting on, that client is excluded from the cluster -- such scenarios
 * make life difficult, so release locks just after they are obtained. */
/* Obtain an extent lock described by @oinfo/@einfo, synchronously or --
 * when @rqset is given -- asynchronously via osc_enqueue_interpret().
 *
 * The requested extent is first widened to page boundaries.  A matching
 * already-granted lock (including a PW lock when PR was requested) is
 * reused instead of enqueuing; otherwise a new lock is enqueued through
 * ldlm_cli_enqueue(), with an intent request prepared first when
 * LDLM_FL_HAS_INTENT is set in oi_flags.
 */
static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
                       struct ldlm_enqueue_info *einfo,
                       struct ptlrpc_request_set *rqset)
{
        struct ldlm_res_id res_id;
        struct obd_device *obd = exp->exp_obd;
        struct ldlm_reply *rep;
        struct ptlrpc_request *req = NULL;
        int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
        ldlm_mode_t mode;
        int rc;
        ENTRY;

        osc_build_res_name(oinfo->oi_md->lsm_object_id,
                           oinfo->oi_md->lsm_object_gr, &res_id);
        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother.  */
        oinfo->oi_policy.l_extent.start -=
                oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
        oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;

        /* without a valid kms there is nothing cached worth matching */
        if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
                goto no_match;

        /* Next, search for already existing extent locks that will cover us */
        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock.
         *
         * There are problems with conversion deadlocks, so instead of
         * converting a read lock to a write lock, we'll just enqueue a new
         * one.
         *
         * At some point we should cancel the read lock instead of making them
         * send us a blocking callback, but there are problems with canceling
         * locks out from other users right now, too. */
        mode = einfo->ei_mode;
        if (einfo->ei_mode == LCK_PR)
                mode |= LCK_PW;
        mode = ldlm_lock_match(obd->obd_namespace,
                               oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
                               einfo->ei_type, &oinfo->oi_policy, mode,
                               oinfo->oi_lockh);
        if (mode) {
                /* addref the lock only if not async requests and PW lock is
                 * matched whereas we asked for PR. */
                if (!rqset && einfo->ei_mode != mode)
                        ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
                osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
                                        oinfo->oi_flags);
                if (intent) {
                        /* I would like to be able to ASSERT here that rss <=
                         * kms, but I can't, for reasons which are explained in
                         * lov_enqueue() */
                }

                /* We already have a lock, and it's referenced */
                oinfo->oi_cb_up(oinfo, ELDLM_OK);

                /* For async requests, decref the lock. */
                if (einfo->ei_mode != mode)
                        ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
                else if (rqset)
                        ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);

                RETURN(ELDLM_OK);
        }

 no_match:
        if (intent) {
                __u32 size[3] = {
                        [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                        [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request),
                        [DLM_LOCKREQ_OFF + 1] = 0 };

                req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
                if (req == NULL)
                        RETURN(-ENOMEM);

                size[DLM_LOCKREPLY_OFF] = sizeof(*rep);
                size[DLM_REPLY_REC_OFF] =
                        sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb);
                ptlrpc_req_set_repsize(req, 3, size);
        }

        /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
        oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;

        rc = ldlm_cli_enqueue(exp, &req, einfo, res_id,
                              &oinfo->oi_policy, &oinfo->oi_flags,
                              &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
                              sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
                              lustre_swab_ost_lvb, oinfo->oi_lockh,
                              rqset ? 1 : 0);
        if (rqset) {
                if (!rc) {
                        struct osc_enqueue_args *aa;
                        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                        aa = ptlrpc_req_async_args(req);
                        aa->oa_oi = oinfo;
                        aa->oa_ei = einfo;
                        aa->oa_exp = exp;

                        req->rq_interpret_reply = osc_enqueue_interpret;
                        ptlrpc_set_add_req(rqset, req);
                } else if (intent) {
                        ptlrpc_req_finished(req);
                }
                RETURN(rc);
        }

        rc = osc_enqueue_fini(obd, req, oinfo, intent, rc);
        if (intent)
                ptlrpc_req_finished(req);

        RETURN(rc);
}
3454
/* Look for an already-granted lock covering @policy on @lsm's resource
 * without enqueuing a new one.  A PW lock also satisfies a PR request;
 * in that case (unless only testing) the reference is shifted from PW
 * to PR.  Returns the matched mode (non-zero) or 0 for no match, and
 * increments *n_matches on a hit when the pointer is supplied. */
static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
                     __u32 type, ldlm_policy_data_t *policy, __u32 mode,
                     int *flags, void *data, struct lustre_handle *lockh,
                     int *n_matches)
{
        struct ldlm_res_id res_id;
        struct obd_device *obd = exp->exp_obd;
        int lflags = *flags;
        ldlm_mode_t rc;
        ENTRY;

        osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);

        OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother */
        policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
        policy->l_extent.end |= ~CFS_PAGE_MASK;

        /* Next, search for already existing extent locks that will cover us */
        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock. */
        rc = mode;
        if (mode == LCK_PR)
                rc |= LCK_PW;
        rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
                             &res_id, type, policy, rc, lockh);
        if (rc) {
                osc_set_data_with_check(lockh, data, lflags);
                /* matched PW for a PR request: move the ref to PR mode */
                if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
                        ldlm_lock_addref(lockh, LCK_PR);
                        ldlm_lock_decref(lockh, LCK_PW);
                }
                if (n_matches != NULL)
                        (*n_matches)++;
        }

        RETURN(rc);
}
3496
3497 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3498                       __u32 mode, struct lustre_handle *lockh, int flags,
3499                       obd_off end)
3500 {
3501         ENTRY;
3502
3503         if (unlikely(mode == LCK_GROUP))
3504                 ldlm_lock_decref_and_cancel(lockh, mode);
3505         else
3506                 ldlm_lock_decref(lockh, mode);
3507
3508         RETURN(0);
3509 }
3510
3511 static int osc_cancel_unused(struct obd_export *exp,
3512                              struct lov_stripe_md *lsm, int flags, void *opaque)
3513 {
3514         struct obd_device *obd = class_exp2obd(exp);
3515         struct ldlm_res_id res_id, *resp = NULL;
3516
3517         if (lsm != NULL) {
3518                 resp = osc_build_res_name(lsm->lsm_object_id,
3519                                           lsm->lsm_object_gr, &res_id);
3520         }
3521
3522         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3523
3524 }
3525
3526 static int osc_join_lru(struct obd_export *exp,
3527                         struct lov_stripe_md *lsm, int join)
3528 {
3529         struct obd_device *obd = class_exp2obd(exp);
3530         struct ldlm_res_id res_id, *resp = NULL;
3531
3532         if (lsm != NULL) {
3533                 resp = osc_build_res_name(lsm->lsm_object_id,
3534                                           lsm->lsm_object_gr, &res_id);
3535         }
3536
3537         return ldlm_cli_join_lru(obd->obd_namespace, resp, join);
3538
3539 }
3540
/* Reply interpreter for async OST_STATFS: unpack the statfs reply,
 * refresh the RDONLY/DEGRADED flags on the osc object creator so they do
 * not stay stale, copy the stats into the caller's buffer, and pass the
 * final status up through oi_cb_up(). */
static int osc_statfs_interpret(struct ptlrpc_request *req,
                                struct osc_async_args *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obd_statfs *msfs;
        ENTRY;

        if (rc == -EBADR)
                /* The request has in fact never been sent
                 * due to issues at a higher level (LOV).
                 * Exit immediately since the caller is
                 * aware of the problem and takes care
                 * of the clean up */
                 RETURN(rc);

        /* NODELAY callers treat a disconnected/busy OST as "no data" */
        if ((rc == -ENOTCONN || rc == -EAGAIN) &&
            (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
                GOTO(out, rc = 0);

        if (rc != 0)
                GOTO(out, rc);

        msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
                                  lustre_swab_obd_statfs);
        if (msfs == NULL) {
                CERROR("Can't unpack obd_statfs\n");
                GOTO(out, rc = -EPROTO);
        }

        /* Reinitialize the RDONLY and DEGRADED flags at the client
         * on each statfs, so they don't stay set permanently. */
        spin_lock(&cli->cl_oscc.oscc_lock);
        cli->cl_oscc.oscc_flags &= ~(OSCC_FLAG_RDONLY | OSCC_FLAG_DEGRADED);
        if (msfs->os_state & OS_STATE_DEGRADED)
                cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED;

        if (msfs->os_state & OS_STATE_READONLY)
                cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY;
        spin_unlock(&cli->cl_oscc.oscc_lock);

        memcpy(aa->aa_oi->oi_osfs, msfs, sizeof(*msfs));
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}
3586
/* Fire an asynchronous OST_STATFS request on @rqset; the reply is
 * handled by osc_statfs_interpret() which fills oinfo->oi_osfs and calls
 * oinfo->oi_cb_up().  Returns 0 on submit, -ENOMEM on allocation failure. */
static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
                            __u64 max_age, struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*oinfo->oi_osfs) };
        ENTRY;

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
                              OST_STATFS, 1, NULL, NULL);
        if (!req)
                RETURN(-ENOMEM);

        ptlrpc_req_set_repsize(req, 2, size);
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);
        if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
                /* procfs requests should not wait for recovery,
                 * to avoid a deadlock */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        req->rq_interpret_reply = osc_statfs_interpret;
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(rqset, req);
        RETURN(0);
}
3623
/* Synchronous OST_STATFS: fetch filesystem statistics from the OST into
 * @osfs.  @max_age is not transmitted (see comment below); @flags may
 * carry OBD_STATFS_NODELAY for callers (e.g. lprocfs) that must not
 * block on recovery.  Returns 0 or a negative errno. */
static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
                      __u64 max_age, __u32 flags)
{
        struct obd_statfs *msfs;
        struct ptlrpc_request *req;
        struct obd_import     *imp = NULL;
        __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
        int rc;
        ENTRY;

        /* The request might also come from lprocfs, so we need to
         * synchronize with client_disconnect_export() - bug 15684 */
        down_read(&obd->u.cli.cl_sem);
        if (obd->u.cli.cl_import)
                imp = class_import_get(obd->u.cli.cl_import);
        up_read(&obd->u.cli.cl_sem);
        if (!imp)
                RETURN(-ENODEV);

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION,
                              OST_STATFS, 1, NULL, NULL);

        class_import_put(imp);
        if (!req)
                RETURN(-ENOMEM);

        ptlrpc_req_set_repsize(req, 2, size);
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (flags & OBD_STATFS_NODELAY) {
                /* procfs requests should not wait for recovery,
                 * to avoid a deadlock */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
                                  lustre_swab_obd_statfs);
        if (msfs == NULL) {
                CERROR("Can't unpack obd_statfs\n");
                GOTO(out, rc = -EPROTO);
        }

        memcpy(osfs, msfs, sizeof(*osfs));

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
3684
/* Retrieve object striping information.
 *
 * @lump is a pointer to an in-core struct with lmm_stripe_count indicating
 * the maximum number of OST indices which will fit in the user buffer.
 * lmm_magic must be LOV_USER_MAGIC_V1 or LOV_USER_MAGIC_V3 (we only use
 * one slot here).
 */
/* Copy the single-stripe layout of @lsm out to the user buffer @lump.
 * The user's header supplies lmm_magic (V1 or V3) and lmm_stripe_count
 * (buffer capacity); the reply is swabbed back to the lsm's byte order
 * when the lsm is not in host order.  Returns 0 or a negative errno. */
static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
{
        /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
        struct lov_user_md_v3 lum, *lumk;
        int rc = 0, lum_size;
        struct lov_user_ost_data_v1 *lmm_objects;
        ENTRY;

        if (!lsm)
                RETURN(-ENODATA);

        /* we only need the header part from user space to get lmm_magic and
         * lmm_stripe_count, (the header part is common to v1 and v3) */
        lum_size = sizeof(struct lov_user_md_v1);
        memset(&lum, 0x00, sizeof(lum));
        if (copy_from_user(&lum, lump, lum_size))
                RETURN(-EFAULT);

        if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
            (lum.lmm_magic != LOV_USER_MAGIC_V3))
                RETURN(-EINVAL);

        /* lov_user_md_vX and lov_mds_md_vX must have the same size */
        LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
        LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
        LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));

        /* we can use lov_mds_md_size() to compute lum_size
         * because lov_user_md_vX and lov_mds_md_vX have the same size */
        if (lum.lmm_stripe_count > 0) {
                /* the user buffer has room for objects: allocate a reply
                 * that carries the single object entry */
                lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
                OBD_ALLOC(lumk, lum_size);
                if (!lumk)
                        RETURN(-ENOMEM);
                if (lum.lmm_magic == LOV_USER_MAGIC_V1)
                        lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
                else
                        lmm_objects = &(lumk->lmm_objects[0]);
                lmm_objects->l_object_id = lsm->lsm_object_id;
        } else {
                /* header-only reply; reuse the on-stack header */
                lum_size = lov_mds_md_size(0, lum.lmm_magic);
                lumk = &lum;
        }

        lumk->lmm_magic = lum.lmm_magic;
        lumk->lmm_stripe_count = 1;
        lumk->lmm_object_id = lsm->lsm_object_id;

        if ((lsm->lsm_magic == LOV_USER_MAGIC_V1_SWABBED) ||
            (lsm->lsm_magic == LOV_USER_MAGIC_V3_SWABBED)) {
               /* the lsm is not in host byte order, so the reply (including
                * magic and stripe count) must be swabbed to match */
                __swab32s(&lumk->lmm_magic);
                __swab16s(&lumk->lmm_stripe_count);
                lustre_swab_lov_user_md((struct lov_user_md_v1*)lumk);
                if (lum.lmm_stripe_count > 0)
                        lustre_swab_lov_user_md_objects(
                                (struct lov_user_md_v1*)lumk);
        }

        if (copy_to_user(lump, lumk, lum_size))
                rc = -EFAULT;

        if (lumk != &lum)
                OBD_FREE(lumk, lum_size);

        RETURN(rc);
}
3758
3759
/* Dispatch device ioctls addressed to this OSC.
 *
 * The module is pinned for the duration of the call so it cannot be
 * unloaded while an ioctl is in flight; every exit path goes through
 * "out" to drop that reference. */
static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         void *karg, void *uarg)
{
        struct obd_device *obd = exp->exp_obd;
        struct obd_ioctl_data *data = karg;
        int err = 0;
        ENTRY;

        if (!try_module_get(THIS_MODULE)) {
                CERROR("Can't get module. Is it alive?");
                return -EINVAL;
        }
        switch (cmd) {
        case OBD_IOC_LOV_GET_CONFIG: {
                char *buf;
                struct lov_desc *desc;
                struct obd_uuid uuid;

                buf = NULL;
                len = 0;
                if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
                        GOTO(out, err = -EINVAL);

                data = (struct obd_ioctl_data *)buf;

                /* Caller must supply room for the descriptor and the uuid. */
                if (sizeof(*desc) > data->ioc_inllen1) {
                        obd_ioctl_freedata(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                if (data->ioc_inllen2 < sizeof(uuid)) {
                        obd_ioctl_freedata(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                /* A bare OSC presents itself as a one-target "LOV". */
                desc = (struct lov_desc *)data->ioc_inlbuf1;
                desc->ld_tgt_count = 1;
                desc->ld_active_tgt_count = 1;
                desc->ld_default_stripe_count = 1;
                desc->ld_default_stripe_size = 0;
                desc->ld_default_stripe_offset = 0;
                desc->ld_pattern = 0;
                memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));

                memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));

                err = copy_to_user((void *)uarg, buf, len);
                if (err)
                        err = -EFAULT;
                obd_ioctl_freedata(buf, len);
                GOTO(out, err);
        }
        case LL_IOC_LOV_SETSTRIPE:
                /* obd_alloc_memmd returns the md size on success. */
                err = obd_alloc_memmd(exp, karg);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case LL_IOC_LOV_GETSTRIPE:
                err = osc_getstripe(karg, uarg);
                GOTO(out, err);
        case OBD_IOC_CLIENT_RECOVER:
                err = ptlrpc_recover_import(obd->u.cli.cl_import,
                                            data->ioc_inlbuf1);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case IOC_OSC_SET_ACTIVE:
                err = ptlrpc_set_import_active(obd->u.cli.cl_import,
                                               data->ioc_offset);
                GOTO(out, err);
        case OBD_IOC_POLL_QUOTACHECK:
                err = lquota_poll_check(quota_interface, exp,
                                        (struct if_quotacheck *)karg);
                GOTO(out, err);
        case OBD_IOC_DESTROY: {
                struct obdo            *oa;

                /* Destroying arbitrary objects is an administrative action. */
                if (!cfs_capable(CFS_CAP_SYS_ADMIN))
                        GOTO (out, err = -EPERM);
                oa = &data->ioc_obdo1;

                if (oa->o_id == 0)
                        GOTO(out, err = -EINVAL);

                oa->o_valid |= OBD_MD_FLGROUP;

                err = osc_destroy(exp, oa, NULL, NULL, NULL);
                GOTO(out, err);
        }
        case OBD_IOC_PING_TARGET:
                err = ptlrpc_obd_ping(obd);
                GOTO(out, err);
        default:
                CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
                       cmd, cfs_curproc_comm());
                GOTO(out, err = -ENOTTY);
        }
out:
        module_put(THIS_MODULE);
        return err;
}
3861
/* obd_get_info handler: answer a few keys locally (LOCK_TO_STRIPE,
 * OFF_RPCSIZE) and forward KEY_LAST_ID / KEY_FIEMAP to the OST through
 * synchronous OST_GET_INFO RPCs. */
static int osc_get_info(struct obd_export *exp, obd_count keylen,
                        void *key, __u32 *vallen, void *val, struct lov_stripe_md *lsm)
{
        ENTRY;
        if (!vallen || !val)
                RETURN(-EFAULT);

        if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
                /* A single OSC object is always stripe 0. */
                __u32 *stripe = val;
                *vallen = sizeof(*stripe);
                *stripe = 0;
                RETURN(0);
        } else if (KEY_IS(KEY_OFF_RPCSIZE)) {
                struct client_obd *cli = &exp->exp_obd->u.cli;
                __u64 *rpcsize = val;
                LASSERT(*vallen == sizeof(__u64));
                *rpcsize = (__u64)cli->cl_max_pages_per_rpc;
                RETURN(0);
        } else if (KEY_IS(KEY_LAST_ID)) {
                /* Ask the OST for the last allocated object id. */
                struct ptlrpc_request *req;
                obd_id *reply;
                char *bufs[2] = { NULL, key };
                __u32 size[2] = { sizeof(struct ptlrpc_body), keylen };
                int rc;

                req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                                      OST_GET_INFO, 2, size, bufs);
                if (req == NULL)
                        RETURN(-ENOMEM);

                size[REPLY_REC_OFF] = *vallen;
                ptlrpc_req_set_repsize(req, 2, size);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(out, rc);

                reply = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*reply),
                                           lustre_swab_ost_last_id);
                if (reply == NULL) {
                        CERROR("Can't unpack OST last ID\n");
                        GOTO(out, rc = -EPROTO);
                }
                *((obd_id *)val) = *reply;
        out:
                ptlrpc_req_finished(req);
                RETURN(rc);
        } else if (KEY_IS(KEY_FIEMAP)) {
                /* Forward a FIEMAP extent-mapping query to the OST. */
                struct ptlrpc_request *req;
                struct ll_user_fiemap *reply;
                char *bufs[2] = { NULL, key };
                __u32 size[2] = { sizeof(struct ptlrpc_body), keylen };
                int rc;

                req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                                      OST_GET_INFO, 2, size, bufs);
                if (req == NULL)
                        RETURN(-ENOMEM);

                size[REPLY_REC_OFF] = *vallen;
                ptlrpc_req_set_repsize(req, 2, size);

                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(out1, rc);
                reply = lustre_swab_repbuf(req, REPLY_REC_OFF, *vallen,
                                           lustre_swab_fiemap);
                if (reply == NULL) {
                        CERROR("Can't unpack FIEMAP reply.\n");
                        GOTO(out1, rc = -EPROTO);
                }

                memcpy(val, reply, *vallen);

        out1:
                ptlrpc_req_finished(req);

                RETURN(rc);
        }

        RETURN(-EINVAL);
}
3943
3944 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3945                                           void *aa, int rc)
3946 {
3947         struct llog_ctxt *ctxt;
3948         struct obd_import *imp = req->rq_import;
3949         ENTRY;
3950
3951         if (rc != 0)
3952                 RETURN(rc);
3953
3954         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3955         if (ctxt) {
3956                 if (rc == 0)
3957                         rc = llog_initiator_connect(ctxt);
3958                 else
3959                         CERROR("cannot establish connection for "
3960                                "ctxt %p: %d\n", ctxt, rc);
3961         }
3962
3963         llog_ctxt_put(ctxt);
3964         spin_lock(&imp->imp_lock);
3965         imp->imp_server_timeout = 1;
3966         imp->imp_pingable = 1;
3967         spin_unlock(&imp->imp_lock);
3968         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3969
3970         RETURN(rc);
3971 }
3972
/* obd_set_info_async handler.
 *
 * A few keys (NEXT_ID, UNLINKED, INIT_RECOV, CHECKSUM) only update local
 * client state and return immediately.  All other keys are sent to the OST
 * in an OST_SET_INFO RPC: grant-shrink requests go through ptlrpcd with
 * their own interpret callback, everything else is added to the caller's
 * request set. */
static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
                              void *key, obd_count vallen, void *val,
                              struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct obd_device  *obd = exp->exp_obd;
        struct obd_import *imp = class_exp2cliimp(exp);
        __u32 size[3] = { sizeof(struct ptlrpc_body), keylen, vallen };
        char *bufs[3] = { NULL, key, val };
        ENTRY;

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

        if (KEY_IS(KEY_NEXT_ID)) {
                if (vallen != sizeof(obd_id))
                        RETURN(-EINVAL);
                /* Object creator resumes precreation after the given id. */
                obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
                CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
                       exp->exp_obd->obd_name,
                       obd->u.cli.cl_oscc.oscc_next_id);

                RETURN(0);
        }

        if (KEY_IS(KEY_UNLINKED)) {
                /* Space was freed: clear the creator's no-space flag. */
                struct osc_creator *oscc = &obd->u.cli.cl_oscc;
                spin_lock(&oscc->oscc_lock);
                oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
                spin_unlock(&oscc->oscc_lock);
                RETURN(0);
        }

        if (KEY_IS(KEY_INIT_RECOV)) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                spin_lock(&imp->imp_lock);
                imp->imp_initial_recov = *(int *)val;
                spin_unlock(&imp->imp_lock);
                CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
                       exp->exp_obd->obd_name,
                       imp->imp_initial_recov);
                RETURN(0);
        }

        if (KEY_IS(KEY_CHECKSUM)) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
                RETURN(0);
        }

        /* Only grant shrink may run without a caller-provided set (it is
         * queued on ptlrpcd below). */
        if (!set && !KEY_IS(KEY_GRANT_SHRINK))
                RETURN(-EINVAL);

        /* We pass all other commands directly to OST. Since nobody calls osc
           methods directly and everybody is supposed to go through LOV, we
           assume lov checked invalid values for us.
           The only recognised values so far are evict_by_nid and mds_conn.
           Even if something bad goes through, we'd get a -EINVAL from OST
           anyway. */

        req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_SET_INFO, 3, size,
                              bufs);
        if (req == NULL)
                RETURN(-ENOMEM);

        if (KEY_IS(KEY_MDS_CONN))
                req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
        else if (KEY_IS(KEY_GRANT_SHRINK))
                req->rq_interpret_reply = osc_shrink_grant_interpret;

        if (KEY_IS(KEY_GRANT_SHRINK)) {
                struct osc_grant_args *aa;
                struct obdo *oa;

                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = ptlrpc_req_async_args(req);
                OBD_ALLOC_PTR(oa);
                if (!oa) {
                        ptlrpc_req_finished(req);
                        RETURN(-ENOMEM);
                }
                /* Copy the obdo out of the caller's buffer; ownership of oa
                 * passes to the interpret callback (presumably freed there --
                 * not visible in this file). */
                *oa = ((struct ost_body *)val)->oa;
                aa->aa_oa = oa;

                size[1] = vallen;
                ptlrpc_req_set_repsize(req, 2, size);
                ptlrpcd_add_req(req);
        } else {
                ptlrpc_req_set_repsize(req, 1, NULL);
                ptlrpc_set_add_req(set, req);
                ptlrpc_check_set(set);
        }

        RETURN(0);
}
4069
4070
/* Log operations for the size-replication llog context: only record
 * cancellation is handled on the client side. */
static struct llog_operations osc_size_repl_logops = {
        lop_cancel: llog_obd_repl_cancel
};

/* Originator log operations; populated at module init from llog_lvfs_ops
 * with origin-specific setup/cleanup/add hooks (see osc_init()). */
static struct llog_operations osc_mds_ost_orig_logops;
/* Set up this OSC's two llog contexts -- the MDS-OST originator catalog
 * and the size-replication context -- serialised against other users of
 * disk_obd's catalog.  On success the (possibly updated) catalog id is
 * written back; on failure any partially set-up context is torn down. */
static int osc_llog_init(struct obd_device *obd, struct obd_device *disk_obd,
                         int *index)
{
        struct llog_catid catid;
        static char name[32] = CATLIST;
        int rc;
        ENTRY;

        LASSERT(index);

        /* Serialize catalog processing across all users of disk_obd. */
        mutex_down(&disk_obd->obd_llog_cat_process);

        rc = llog_get_cat_list(disk_obd, disk_obd, name, *index, 1, &catid);
        if (rc) {
                CERROR("rc: %d\n", rc);
                GOTO(out_unlock, rc);
        }
#if 0
        CDEBUG(D_INFO, "%s: Init llog for %s/%d - catid "LPX64"/"LPX64":%x\n",
               obd->obd_name, uuid->uuid, idx, catid.lci_logid.lgl_oid,
               catid.lci_logid.lgl_ogr, catid.lci_logid.lgl_ogen);
#endif

        rc = llog_setup(obd, LLOG_MDS_OST_ORIG_CTXT, disk_obd, 1,
                        &catid.lci_logid, &osc_mds_ost_orig_logops);
        if (rc) {
                CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
                GOTO (out, rc);
        }

        rc = llog_setup(obd, LLOG_SIZE_REPL_CTXT, disk_obd, 1, NULL,
                        &osc_size_repl_logops);
        if (rc) {
                /* Unwind the originator context set up just above. */
                struct llog_ctxt *ctxt =
                        llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
                if (ctxt)
                        llog_cleanup(ctxt);
                CERROR("failed LLOG_SIZE_REPL_CTXT\n");
        }
out:
        if (rc) {
                CERROR("osc '%s' tgt '%s' rc=%d\n",
                       obd->obd_name, disk_obd->obd_name, rc);
                CERROR("logid "LPX64":0x%x\n",
                       catid.lci_logid.lgl_oid, catid.lci_logid.lgl_ogen);
        } else {
                /* Persist the catalog id we are now using. */
                rc = llog_put_cat_list(disk_obd, disk_obd, name, *index, 1,
                                       &catid);
                if (rc)
                        CERROR("rc: %d\n", rc);
        }
out_unlock:
        mutex_up(&disk_obd->obd_llog_cat_process);

        RETURN(rc);
}
4132
4133 static int osc_llog_finish(struct obd_device *obd, int count)
4134 {
4135         struct llog_ctxt *ctxt;
4136         int rc = 0, rc2 = 0;
4137         ENTRY;
4138
4139         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4140         if (ctxt)
4141                 rc = llog_cleanup(ctxt);
4142
4143         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4144         if (ctxt)
4145                 rc2 = llog_cleanup(ctxt);
4146         if (!rc)
4147                 rc = rc2;
4148
4149         RETURN(rc);
4150 }
4151
4152 static int osc_reconnect(struct obd_export *exp, struct obd_device *obd,
4153                          struct obd_uuid *cluuid,
4154                          struct obd_connect_data *data,
4155                          void *localdata)
4156 {
4157         struct client_obd *cli = &obd->u.cli;
4158
4159         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
4160                 long lost_grant;
4161
4162                 client_obd_list_lock(&cli->cl_loi_list_lock);
4163                 data->ocd_grant = cli->cl_avail_grant ?:
4164                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
4165                 lost_grant = cli->cl_lost_grant;
4166                 cli->cl_lost_grant = 0;
4167                 client_obd_list_unlock(&cli->cl_loi_list_lock);
4168
4169                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
4170                        "cl_lost_grant: %ld\n", data->ocd_grant,
4171                        cli->cl_avail_grant, lost_grant);
4172                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
4173                        " ocd_grant: %d\n", data->ocd_connect_flags,
4174                        data->ocd_version, data->ocd_grant);
4175         }
4176
4177         RETURN(0);
4178 }
4179
/* Disconnect handler: on the last connection, flush outstanding size
 * llog cancels to the target, then disconnect the export.  Removal from
 * the grant-shrink list must happen only after the import is gone (see
 * the BUG18662 note below). */
static int osc_disconnect(struct obd_export *exp)
{
        struct obd_device *obd = class_exp2obd(exp);
        struct llog_ctxt  *ctxt;
        int rc;

        ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
        if (ctxt) {
                if (obd->u.cli.cl_conn_count == 1) {
                        /* Flush any remaining cancel messages out to the
                         * target */
                        llog_sync(ctxt, exp);
                }
                llog_ctxt_put(ctxt);
        } else {
                CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
                       obd);
        }

        rc = client_disconnect_export(exp);
        /**
         * Initially we put del_shrink_grant before disconnect_export, but it
         * causes the following problem if setup (connect) and cleanup
         * (disconnect) are tangled together.
         *      connect p1                     disconnect p2
         *   ptlrpc_connect_import
         *     ...............               class_manual_cleanup
         *                                     osc_disconnect
         *                                     del_shrink_grant
         *   ptlrpc_connect_interrupt
         *     init_grant_shrink
         *   add this client to shrink list
         *                                      cleanup_osc
         * Bang! pinger trigger the shrink.
         * So the osc should be disconnected from the shrink list, after we
         * are sure the import has been destroyed. BUG18662
         */
        if (obd->u.cli.cl_import == NULL)
                osc_del_shrink_grant(&obd->u.cli);
        return rc;
}
4221
/* React to import state transitions (disconnect, invalidate, activate,
 * connect-data negotiation), adjusting grant state, the object creator's
 * flags, and notifying the obd observer. */
static int osc_import_event(struct obd_device *obd,
                            struct obd_import *imp,
                            enum obd_import_event event)
{
        struct client_obd *cli;
        int rc = 0;

        ENTRY;
        LASSERT(imp->imp_obd == obd);

        switch (event) {
        case IMP_EVENT_DISCON: {
                /* Only do this on the MDS OSC's */
                if (imp->imp_server_timeout) {
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                        spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
                        spin_unlock(&oscc->oscc_lock);
                }
                /* Grants are invalid across a disconnect; drop them. */
                cli = &obd->u.cli;
                client_obd_list_lock(&cli->cl_loi_list_lock);
                cli->cl_avail_grant = 0;
                cli->cl_lost_grant = 0;
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                ptlrpc_import_setasync(imp, -1);

                break;
        }
        case IMP_EVENT_INACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
                break;
        }
        case IMP_EVENT_INVALIDATE: {
                struct ldlm_namespace *ns = obd->obd_namespace;

                /* Reset grants */
                cli = &obd->u.cli;
                client_obd_list_lock(&cli->cl_loi_list_lock);
                /* all pages go to failing rpcs due to the invalid import */
                osc_check_rpcs(cli);
                client_obd_list_unlock(&cli->cl_loi_list_lock);

                /* Drop all locks held against the now-invalid import. */
                ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);

                break;
        }
        case IMP_EVENT_ACTIVE: {
                /* Only do this on the MDS OSC's */
                if (imp->imp_server_timeout) {
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                        spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
                        spin_unlock(&oscc->oscc_lock);
                }
                CDEBUG(D_INFO, "notify server \n");
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
                break;
        }
        case IMP_EVENT_OCD: {
                struct obd_connect_data *ocd = &imp->imp_connect_data;

                if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
                        osc_init_grant(&obd->u.cli, ocd);

                /* See bug 7198 */
                if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;

                ptlrpc_import_setasync(imp, 1);
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
                break;
        }
        default:
                CERROR("Unknown import event %d\n", event);
                LBUG();
        }
        RETURN(rc);
}
4302
4303 int osc_setup(struct obd_device *obd, obd_count len, void *buf)
4304 {
4305         int rc;
4306         ENTRY;
4307
4308         ENTRY;
4309         rc = ptlrpcd_addref();
4310         if (rc)
4311                 RETURN(rc);
4312
4313         rc = client_obd_setup(obd, len, buf);
4314         if (rc) {
4315                 ptlrpcd_decref();
4316         } else {
4317                 struct lprocfs_static_vars lvars = { 0 };
4318                 struct client_obd *cli = &obd->u.cli;
4319
4320                 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
4321                 lprocfs_osc_init_vars(&lvars);
4322                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
4323                         lproc_osc_attach_seqstat(obd);
4324                         ptlrpc_lprocfs_register_obd(obd);
4325                 }
4326
4327                 oscc_init(obd);
4328                 /* We need to allocate a few requests more, because
4329                    brw_interpret tries to create new requests before freeing
4330                    previous ones. Ideally we want to have 2x max_rpcs_in_flight
4331                    reserved, but I afraid that might be too much wasted RAM
4332                    in fact, so 2 is just my guess and still should work. */
4333                 cli->cl_import->imp_rq_pool =
4334                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4335                                             OST_MAXREQSIZE,
4336                                             ptlrpc_add_rqs_to_pool);
4337                 cli->cl_cache = cache_create(obd);
4338                 if (!cli->cl_cache) {
4339                         osc_cleanup(obd);
4340                         rc = -ENOMEM;
4341                 }
4342                 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
4343                 sema_init(&cli->cl_grant_sem, 1);
4344         }
4345
4346         RETURN(rc);
4347 }
4348
/* Staged pre-cleanup: deactivate the import early so in-flight work stops,
 * then, when exports are cleaned, destroy a never-connected client import
 * and tear down the llog contexts. */
static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
{
        int rc = 0;
        ENTRY;

        switch (stage) {
        case OBD_CLEANUP_EARLY: {
                struct obd_import *imp;
                imp = obd->u.cli.cl_import;
                CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
                /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
                ptlrpc_deactivate_import(imp);
                break;
        }
        case OBD_CLEANUP_EXPORTS: {
                /* If we set up but never connected, the
                   client import will not have been cleaned. */
                down_write(&obd->u.cli.cl_sem);
                if (obd->u.cli.cl_import) {
                        struct obd_import *imp;
                        imp = obd->u.cli.cl_import;
                        CDEBUG(D_CONFIG, "%s: client import never connected\n",
                               obd->obd_name);
                        ptlrpc_invalidate_import(imp);
                        if (imp->imp_rq_pool) {
                                ptlrpc_free_rq_pool(imp->imp_rq_pool);
                                imp->imp_rq_pool = NULL;
                        }
                        class_destroy_import(imp);
                        obd->u.cli.cl_import = NULL;
                }
                up_write(&obd->u.cli.cl_sem);

                rc = obd_llog_finish(obd, 0);
                if (rc != 0)
                        CERROR("failed to cleanup llogging subsystems\n");
                break;
        }
        case OBD_CLEANUP_SELF_EXP:
                break;
        case OBD_CLEANUP_OBD:
                break;
        }
        RETURN(rc);
}
4394
/* Final teardown, mirroring osc_setup(): unregister procfs and quota
 * state, destroy the page cache and client obd state, then drop the
 * ptlrpcd reference taken at setup.  Order matters here. */
int osc_cleanup(struct obd_device *obd)
{
        int rc;

        ENTRY;
        ptlrpc_lprocfs_unregister_obd(obd);
        lprocfs_obd_cleanup(obd);

        /* free memory of osc quota cache */
        lquota_cleanup(quota_interface, obd);

        cache_destroy(obd->u.cli.cl_cache);
        rc = client_obd_cleanup(obd);

        /* Balances the ptlrpcd_addref() in osc_setup(). */
        ptlrpcd_decref();
        RETURN(rc);
}
4412
4413 static int osc_register_page_removal_cb(struct obd_device *obd,
4414                                         obd_page_removal_cb_t func,
4415                                         obd_pin_extent_cb pin_cb)
4416 {
4417         ENTRY;
4418
4419         /* this server - not need init */
4420         if (func == NULL)
4421                 return 0;
4422
4423         return cache_add_extent_removal_cb(obd->u.cli.cl_cache, func,
4424                                            pin_cb);
4425 }
4426
4427 static int osc_unregister_page_removal_cb(struct obd_device *obd,
4428                                           obd_page_removal_cb_t func)
4429 {
4430         ENTRY;
4431         return cache_del_extent_removal_cb(obd->u.cli.cl_cache, func);
4432 }
4433
4434 static int osc_register_lock_cancel_cb(struct obd_device *obd,
4435                                        obd_lock_cancel_cb cb)
4436 {
4437         ENTRY;
4438         LASSERT(obd->u.cli.cl_ext_lock_cancel_cb == NULL);
4439
4440         /* this server - not need init */
4441         if (cb == NULL)
4442                 return 0;
4443
4444         obd->u.cli.cl_ext_lock_cancel_cb = cb;
4445         return 0;
4446 }
4447
4448 static int osc_unregister_lock_cancel_cb(struct obd_device *obd,
4449                                          obd_lock_cancel_cb cb)
4450 {
4451         ENTRY;
4452
4453         if (obd->u.cli.cl_ext_lock_cancel_cb != cb) {
4454                 CERROR("Unregistering cancel cb %p, while only %p was "
4455                        "registered\n", cb,
4456                        obd->u.cli.cl_ext_lock_cancel_cb);
4457                 RETURN(-EINVAL);
4458         }
4459
4460         obd->u.cli.cl_ext_lock_cancel_cb = NULL;
4461         return 0;
4462 }
4463
4464 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4465 {
4466         struct lustre_cfg *lcfg = buf;
4467         struct lprocfs_static_vars lvars = { 0 };
4468         int rc = 0;
4469
4470         lprocfs_osc_init_vars(&lvars);
4471
4472         rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars, lcfg, obd);
4473         return(rc);
4474 }
4475
/* Method table registered with the obd layer for the OSC device type
 * (see class_register_type() in osc_init()). */
struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup,
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        .o_statfs               = osc_statfs,
        .o_statfs_async         = osc_statfs_async,
        .o_packmd               = osc_packmd,
        .o_unpackmd             = osc_unpackmd,
        .o_precreate            = osc_precreate,
        .o_create               = osc_create,
        .o_create_async         = osc_create_async,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_getattr_async        = osc_getattr_async,
        .o_setattr              = osc_setattr,
        .o_setattr_async        = osc_setattr_async,
        .o_brw                  = osc_brw,
        .o_brw_async            = osc_brw_async,
        .o_prep_async_page      = osc_prep_async_page,
        .o_get_lock             = osc_get_lock,
        .o_queue_async_io       = osc_queue_async_io,
        .o_set_async_flags      = osc_set_async_flags,
        .o_queue_group_io       = osc_queue_group_io,
        .o_trigger_group_io     = osc_trigger_group_io,
        .o_teardown_async_page  = osc_teardown_async_page,
        .o_punch                = osc_punch,
        .o_sync                 = osc_sync,
        .o_enqueue              = osc_enqueue,
        .o_match                = osc_match,
        .o_change_cbdata        = osc_change_cbdata,
        .o_cancel               = osc_cancel,
        .o_cancel_unused        = osc_cancel_unused,
        .o_join_lru             = osc_join_lru,
        .o_iocontrol            = osc_iocontrol,
        .o_get_info             = osc_get_info,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_llog_init            = osc_llog_init,
        .o_llog_finish          = osc_llog_finish,
        .o_process_config       = osc_process_config,
        .o_register_page_removal_cb = osc_register_page_removal_cb,
        .o_unregister_page_removal_cb = osc_unregister_page_removal_cb,
        .o_register_lock_cancel_cb = osc_register_lock_cancel_cb,
        .o_unregister_lock_cancel_cb = osc_unregister_lock_cancel_cb,
};
4527 int __init osc_init(void)
4528 {
4529         struct lprocfs_static_vars lvars = { 0 };
4530         int rc;
4531         ENTRY;
4532
4533         lprocfs_osc_init_vars(&lvars);
4534
4535         request_module("lquota");
4536         quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
4537         lquota_init(quota_interface);
4538         init_obd_quota_ops(quota_interface, &osc_obd_ops);
4539
4540         rc = class_register_type(&osc_obd_ops, lvars.module_vars,
4541                                  LUSTRE_OSC_NAME);
4542         if (rc) {
4543                 if (quota_interface)
4544                         PORTAL_SYMBOL_PUT(osc_quota_interface);
4545                 RETURN(rc);
4546         }
4547
4548         osc_mds_ost_orig_logops = llog_lvfs_ops;
4549         osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
4550         osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
4551         osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
4552         osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
4553
4554         RETURN(rc);
4555 }
4556
#ifdef __KERNEL__
/* Module unload: shut down the quota interface, drop the lquota module
 * reference taken in osc_init(), then unregister the OSC OBD type.
 * Declared as the notional __exit handler (annotation kept in a comment
 * so the symbol remains referencable; see cfs_module() below). */
static void /*__exit*/ osc_exit(void)
{
        /* NULL-tolerant: quota_interface is NULL if the lquota module
         * was never found by PORTAL_SYMBOL_GET() in osc_init(). */
        lquota_exit(quota_interface);
        if (quota_interface)
                PORTAL_SYMBOL_PUT(osc_quota_interface);

        class_unregister_type(LUSTRE_OSC_NAME);
}

MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");

/* Register osc_init/osc_exit as the module entry/exit points via the
 * libcfs portability glue. */
cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
#endif