Whamcloud - gitweb
Revert "b=20433 decrease the usage of memory on clients."
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #ifndef EXPORT_SYMTAB
38 # define EXPORT_SYMTAB
39 #endif
40 #define DEBUG_SUBSYSTEM S_OSC
41
42 #ifdef __KERNEL__
43 # include <libcfs/libcfs.h>
44 #else /* __KERNEL__ */
45 # include <liblustre.h>
46 #endif
47
48 # include <lustre_dlm.h>
49 #include <libcfs/kp30.h>
50 #include <lustre_net.h>
51 #include <lustre/lustre_user.h>
52 #include <obd_ost.h>
53 #include <obd_lov.h>
54
55 #ifdef  __CYGWIN__
56 # include <ctype.h>
57 #endif
58
59 #include <lustre_ha.h>
60 #include <lprocfs_status.h>
61 #include <lustre_log.h>
62 #include <lustre_debug.h>
63 #include <lustre_param.h>
64 #include <lustre_cache.h>
65 #include "osc_internal.h"
66
67 static quota_interface_t *quota_interface = NULL;
68 extern quota_interface_t osc_quota_interface;
69
70 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
71 static int brw_interpret(struct ptlrpc_request *request, void *data, int rc);
72 int osc_cleanup(struct obd_device *obd);
73
74 static quota_interface_t *quota_interface;
75 extern quota_interface_t osc_quota_interface;
76
77 /* by default 10s */
78 atomic_t osc_resend_time;
79
80 /* Pack OSC object metadata for disk storage (LE byte order). */
81 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
82                       struct lov_stripe_md *lsm)
83 {
84         int lmm_size;
85         ENTRY;
86
87         lmm_size = sizeof(**lmmp);
88         if (!lmmp)
89                 RETURN(lmm_size);
90
91         if (*lmmp && !lsm) {
92                 OBD_FREE(*lmmp, lmm_size);
93                 *lmmp = NULL;
94                 RETURN(0);
95         }
96
97         if (!*lmmp) {
98                 OBD_ALLOC(*lmmp, lmm_size);
99                 if (!*lmmp)
100                         RETURN(-ENOMEM);
101         }
102
103         if (lsm) {
104                 LASSERT(lsm->lsm_object_id);
105                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
106         }
107
108         RETURN(lmm_size);
109 }
110
111 /* Unpack OSC object metadata from disk storage (LE byte order). */
112 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
113                         struct lov_mds_md *lmm, int lmm_bytes)
114 {
115         int lsm_size;
116         ENTRY;
117
118         if (lmm != NULL) {
119                 if (lmm_bytes < sizeof (*lmm)) {
120                         CERROR("lov_mds_md too small: %d, need %d\n",
121                                lmm_bytes, (int)sizeof(*lmm));
122                         RETURN(-EINVAL);
123                 }
124                 /* XXX LOV_MAGIC etc check? */
125
126                 if (lmm->lmm_object_id == 0) {
127                         CERROR("lov_mds_md: zero lmm_object_id\n");
128                         RETURN(-EINVAL);
129                 }
130         }
131
132         lsm_size = lov_stripe_md_size(1);
133         if (lsmp == NULL)
134                 RETURN(lsm_size);
135
136         if (*lsmp != NULL && lmm == NULL) {
137                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
138                 OBD_FREE(*lsmp, lsm_size);
139                 *lsmp = NULL;
140                 RETURN(0);
141         }
142
143         if (*lsmp == NULL) {
144                 OBD_ALLOC(*lsmp, lsm_size);
145                 if (*lsmp == NULL)
146                         RETURN(-ENOMEM);
147                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
148                 if ((*lsmp)->lsm_oinfo[0] == NULL) {
149                         OBD_FREE(*lsmp, lsm_size);
150                         RETURN(-ENOMEM);
151                 }
152                 loi_init((*lsmp)->lsm_oinfo[0]);
153         }
154
155         if (lmm != NULL) {
156                 /* XXX zero *lsmp? */
157                 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
158                 LASSERT((*lsmp)->lsm_object_id);
159         }
160
161         (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
162
163         RETURN(lsm_size);
164 }
165
166 static int osc_getattr_interpret(struct ptlrpc_request *req,
167                                  void *data, int rc)
168 {
169         struct ost_body *body;
170         struct osc_async_args *aa = data;
171         ENTRY;
172
173         if (rc != 0)
174                 GOTO(out, rc);
175
176         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
177                                   lustre_swab_ost_body);
178         if (body) {
179                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
180                 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
181
182                 /* This should really be sent by the OST */
183                 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
184                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
185         } else {
186                 CERROR("can't unpack ost_body\n");
187                 rc = -EPROTO;
188                 aa->aa_oi->oi_oa->o_valid = 0;
189         }
190 out:
191         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
192         RETURN(rc);
193 }
194
195 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
196                              struct ptlrpc_request_set *set)
197 {
198         struct ptlrpc_request *req;
199         struct ost_body *body;
200         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
201         struct osc_async_args *aa;
202         ENTRY;
203
204         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
205                               OST_GETATTR, 2, size,NULL);
206         if (!req)
207                 RETURN(-ENOMEM);
208
209         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
210         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
211
212         ptlrpc_req_set_repsize(req, 2, size);
213         req->rq_interpret_reply = osc_getattr_interpret;
214
215         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
216         aa = ptlrpc_req_async_args(req);
217         aa->aa_oi = oinfo;
218
219         ptlrpc_set_add_req(set, req);
220         RETURN (0);
221 }
222
223 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
224 {
225         struct ptlrpc_request *req;
226         struct ost_body *body;
227         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
228         int rc;
229         ENTRY;
230
231         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
232                               OST_GETATTR, 2, size, NULL);
233         if (!req)
234                 RETURN(-ENOMEM);
235
236         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
237         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
238
239         ptlrpc_req_set_repsize(req, 2, size);
240
241         rc = ptlrpc_queue_wait(req);
242         if (rc) {
243                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
244                 GOTO(out, rc);
245         }
246
247         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
248                                   lustre_swab_ost_body);
249         if (body == NULL) {
250                 CERROR ("can't unpack ost_body\n");
251                 GOTO (out, rc = -EPROTO);
252         }
253
254         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
255         lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
256
257         /* This should really be sent by the OST */
258         oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
259         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
260
261         EXIT;
262  out:
263         ptlrpc_req_finished(req);
264         return rc;
265 }
266
267 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
268                        struct obd_trans_info *oti)
269 {
270         struct ptlrpc_request *req;
271         struct ost_body *body;
272         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
273         int rc;
274         ENTRY;
275
276         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
277                               OST_SETATTR, 2, size, NULL);
278         if (!req)
279                 RETURN(-ENOMEM);
280
281         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
282         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
283
284         ptlrpc_req_set_repsize(req, 2, size);
285
286         rc = ptlrpc_queue_wait(req);
287         if (rc)
288                 GOTO(out, rc);
289
290         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
291                                   lustre_swab_ost_body);
292         if (body == NULL)
293                 GOTO(out, rc = -EPROTO);
294
295         lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
296
297         EXIT;
298 out:
299         ptlrpc_req_finished(req);
300         RETURN(rc);
301 }
302
303 static int osc_setattr_interpret(struct ptlrpc_request *req,
304                                  void *data, int rc)
305 {
306         struct ost_body *body;
307         struct osc_async_args *aa = data;
308         ENTRY;
309
310         if (rc != 0)
311                 GOTO(out, rc);
312
313         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
314                                   lustre_swab_ost_body);
315         if (body == NULL) {
316                 CERROR("can't unpack ost_body\n");
317                 GOTO(out, rc = -EPROTO);
318         }
319
320         lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
321 out:
322         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
323         RETURN(rc);
324 }
325
326 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
327                              struct obd_trans_info *oti,
328                              struct ptlrpc_request_set *rqset)
329 {
330         struct ptlrpc_request *req;
331         struct ost_body *body;
332         __u32 size[3] = { sizeof(struct ptlrpc_body), sizeof(*body), 0 };
333         int bufcount = 2;
334         struct osc_async_args *aa;
335         ENTRY;
336
337         if (osc_exp_is_2_0_server(exp)) {
338                 bufcount = 3;
339         }
340
341         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
342                               OST_SETATTR, bufcount, size, NULL);
343         if (!req)
344                 RETURN(-ENOMEM);
345
346         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
347
348         if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
349                 LASSERT(oti);
350                 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
351         }
352
353         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
354         ptlrpc_req_set_repsize(req, 2, size);
355         /* do mds to ost setattr asynchronouly */
356         if (!rqset) {
357                 /* Do not wait for response. */
358                 ptlrpcd_add_req(req);
359         } else {
360                 req->rq_interpret_reply = osc_setattr_interpret;
361
362                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
363                 aa = ptlrpc_req_async_args(req);
364                 aa->aa_oi = oinfo;
365
366                 ptlrpc_set_add_req(rqset, req);
367         }
368
369         RETURN(0);
370 }
371
372 int osc_real_create(struct obd_export *exp, struct obdo *oa,
373                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
374 {
375         struct ptlrpc_request *req;
376         struct ost_body *body;
377         struct lov_stripe_md *lsm;
378         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
379         int rc;
380         ENTRY;
381
382         LASSERT(oa);
383         LASSERT(ea);
384
385         lsm = *ea;
386         if (!lsm) {
387                 rc = obd_alloc_memmd(exp, &lsm);
388                 if (rc < 0)
389                         RETURN(rc);
390         }
391
392         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
393                               OST_CREATE, 2, size, NULL);
394         if (!req)
395                 GOTO(out, rc = -ENOMEM);
396
397         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
398         lustre_set_wire_obdo(&body->oa, oa);
399
400         ptlrpc_req_set_repsize(req, 2, size);
401         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
402             oa->o_flags == OBD_FL_DELORPHAN) {
403                 DEBUG_REQ(D_HA, req,
404                           "delorphan from OST integration");
405                 /* Don't resend the delorphan req */
406                 req->rq_no_resend = req->rq_no_delay = 1;
407         }
408
409         rc = ptlrpc_queue_wait(req);
410         if (rc)
411                 GOTO(out_req, rc);
412
413         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
414                                   lustre_swab_ost_body);
415         if (body == NULL) {
416                 CERROR ("can't unpack ost_body\n");
417                 GOTO (out_req, rc = -EPROTO);
418         }
419
420         lustre_get_wire_obdo(oa, &body->oa);
421
422         /* This should really be sent by the OST */
423         oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
424         oa->o_valid |= OBD_MD_FLBLKSZ;
425
426         /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
427          * have valid lsm_oinfo data structs, so don't go touching that.
428          * This needs to be fixed in a big way.
429          */
430         lsm->lsm_object_id = oa->o_id;
431         *ea = lsm;
432
433         if (oti != NULL) {
434                 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
435
436                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
437                         if (!oti->oti_logcookies)
438                                 oti_alloc_cookies(oti, 1);
439                         *oti->oti_logcookies = oa->o_lcookie;
440                 }
441         }
442
443         CDEBUG(D_HA, "transno: "LPD64"\n",
444                lustre_msg_get_transno(req->rq_repmsg));
445 out_req:
446         ptlrpc_req_finished(req);
447 out:
448         if (rc && !*ea)
449                 obd_free_memmd(exp, &lsm);
450         RETURN(rc);
451 }
452
453 static int osc_punch_interpret(struct ptlrpc_request *req,
454                                void *data, int rc)
455 {
456         struct ost_body *body;
457         struct osc_async_args *aa = data;
458         ENTRY;
459
460         if (rc != 0)
461                 GOTO(out, rc);
462
463         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof (*body),
464                                   lustre_swab_ost_body);
465         if (body == NULL) {
466                 CERROR ("can't unpack ost_body\n");
467                 GOTO(out, rc = -EPROTO);
468         }
469
470         lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
471 out:
472         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
473         RETURN(rc);
474 }
475
476 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
477                      struct obd_trans_info *oti,
478                      struct ptlrpc_request_set *rqset)
479 {
480         struct ptlrpc_request *req;
481         struct osc_async_args *aa;
482         struct ost_body *body;
483         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
484         ENTRY;
485
486         if (!oinfo->oi_oa) {
487                 CERROR("oa NULL\n");
488                 RETURN(-EINVAL);
489         }
490
491         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
492                               OST_PUNCH, 2, size, NULL);
493         if (!req)
494                 RETURN(-ENOMEM);
495
496         req->rq_request_portal = OST_IO_PORTAL;         /* bug 7198 */
497         ptlrpc_at_set_req_timeout(req);
498
499         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
500         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
501
502         /* overload the size and blocks fields in the oa with start/end */
503         body->oa.o_size = oinfo->oi_policy.l_extent.start;
504         body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
505         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
506
507         ptlrpc_req_set_repsize(req, 2, size);
508
509         req->rq_interpret_reply = osc_punch_interpret;
510         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
511         aa = ptlrpc_req_async_args(req);
512         aa->aa_oi = oinfo;
513         ptlrpc_set_add_req(rqset, req);
514
515         RETURN(0);
516 }
517
518 static int osc_sync_interpret(struct ptlrpc_request *req,
519                               void *data, int rc)
520 {
521         struct ost_body *body;
522         struct osc_async_args *aa = data;
523         ENTRY;
524
525         if (rc)
526                 GOTO(out, rc);
527
528         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
529                                   lustre_swab_ost_body);
530         if (body == NULL) {
531                 CERROR ("can't unpack ost_body\n");
532                 GOTO(out, rc = -EPROTO);
533         }
534
535         *aa->aa_oi->oi_oa = body->oa;
536 out:
537         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
538         RETURN(rc);
539 }
540
541 static int osc_sync(struct obd_export *exp, struct obd_info *oinfo,
542                     obd_size start, obd_size end,
543                     struct ptlrpc_request_set *set)
544 {
545         struct ptlrpc_request *req;
546         struct ost_body *body;
547         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
548         struct osc_async_args *aa;
549         ENTRY;
550
551         if (!oinfo->oi_oa) {
552                 CERROR("oa NULL\n");
553                 RETURN(-EINVAL);
554         }
555
556         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
557                               OST_SYNC, 2, size, NULL);
558         if (!req)
559                 RETURN(-ENOMEM);
560
561         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
562         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
563
564         /* overload the size and blocks fields in the oa with start/end */
565         body->oa.o_size = start;
566         body->oa.o_blocks = end;
567         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
568
569         ptlrpc_req_set_repsize(req, 2, size);
570         req->rq_interpret_reply = osc_sync_interpret;
571
572         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
573         aa = ptlrpc_req_async_args(req);
574         aa->aa_oi = oinfo;
575
576         ptlrpc_set_add_req(set, req);
577         RETURN (0);
578 }
579
580 /* Find and cancel locally locks matched by @mode in the resource found by
581  * @objid. Found locks are added into @cancel list. Returns the amount of
582  * locks added to @cancels list. */
583 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
584                                    struct list_head *cancels, ldlm_mode_t mode,
585                                    int lock_flags)
586 {
587         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
588         struct ldlm_res_id res_id;
589         struct ldlm_resource *res;
590         int count;
591         ENTRY;
592
593         osc_build_res_name(oa->o_id, oa->o_gr, &res_id);
594         res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
595         if (res == NULL)
596                 RETURN(0);
597
598         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
599                                            lock_flags, 0, NULL);
600         ldlm_resource_putref(res);
601         RETURN(count);
602 }
603
604 static int osc_destroy_interpret(struct ptlrpc_request *req, void *data,
605                                  int rc)
606 {
607         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
608
609         atomic_dec(&cli->cl_destroy_in_flight);
610         cfs_waitq_signal(&cli->cl_destroy_waitq);
611         return 0;
612 }
613
614 static int osc_can_send_destroy(struct client_obd *cli)
615 {
616         if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
617             cli->cl_max_rpcs_in_flight) {
618                 /* The destroy request can be sent */
619                 return 1;
620         }
621         if (atomic_dec_return(&cli->cl_destroy_in_flight) <
622             cli->cl_max_rpcs_in_flight) {
623                 /*
624                  * The counter has been modified between the two atomic
625                  * operations.
626                  */
627                 cfs_waitq_signal(&cli->cl_destroy_waitq);
628         }
629         return 0;
630 }
631
632 /* Destroy requests can be async always on the client, and we don't even really
633  * care about the return code since the client cannot do anything at all about
634  * a destroy failure.
635  * When the MDS is unlinking a filename, it saves the file objects into a
636  * recovery llog, and these object records are cancelled when the OST reports
637  * they were destroyed and sync'd to disk (i.e. transaction committed).
638  * If the client dies, or the OST is down when the object should be destroyed,
639  * the records are not cancelled, and when the OST reconnects to the MDS next,
640  * it will retrieve the llog unlink logs and then sends the log cancellation
641  * cookies to the MDS after committing destroy transactions. */
642 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
643                        struct lov_stripe_md *ea, struct obd_trans_info *oti,
644                        struct obd_export *md_export)
645 {
646         CFS_LIST_HEAD(cancels);
647         struct ptlrpc_request *req;
648         struct ost_body *body;
649         __u32 size[3] = { sizeof(struct ptlrpc_body), sizeof(*body),
650                         sizeof(struct ldlm_request) };
651         int count, bufcount = 2;
652         struct client_obd *cli = &exp->exp_obd->u.cli;
653         ENTRY;
654
655         if (!oa) {
656                 CERROR("oa NULL\n");
657                 RETURN(-EINVAL);
658         }
659
660         LASSERT(oa->o_id != 0);
661
662         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
663                                         LDLM_FL_DISCARD_DATA);
664         if (exp_connect_cancelset(exp))
665                 bufcount = 3;
666         req = ldlm_prep_elc_req(exp, LUSTRE_OST_VERSION, OST_DESTROY, bufcount,
667                                 size, REQ_REC_OFF + 1, 0, &cancels, count);
668         if (!req)
669                 RETURN(-ENOMEM);
670
671         req->rq_request_portal = OST_IO_PORTAL;         /* bug 7198 */
672         ptlrpc_at_set_req_timeout(req);
673
674         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
675
676         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE) {
677                 oa->o_lcookie = *oti->oti_logcookies;
678         }
679
680         lustre_set_wire_obdo(&body->oa, oa);
681         ptlrpc_req_set_repsize(req, 2, size);
682
683         /* don't throttle destroy RPCs for the MDT */
684         if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
685                 req->rq_interpret_reply = osc_destroy_interpret;
686                 if (!osc_can_send_destroy(cli)) {
687                         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
688                                                           NULL);
689
690                         /*
691                          * Wait until the number of on-going destroy RPCs drops
692                          * under max_rpc_in_flight
693                          */
694                         l_wait_event_exclusive(cli->cl_destroy_waitq,
695                                                osc_can_send_destroy(cli), &lwi);
696                 }
697         }
698
699         /* Do not wait for response */
700         ptlrpcd_add_req(req);
701         RETURN(0);
702 }
703
704 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
705                                 long writing_bytes)
706 {
707         obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
708
709         LASSERT(!(oa->o_valid & bits));
710
711         oa->o_valid |= bits;
712         client_obd_list_lock(&cli->cl_loi_list_lock);
713         oa->o_dirty = cli->cl_dirty;
714         if (cli->cl_dirty > cli->cl_dirty_max) {
715                 CERROR("dirty %lu > dirty_max %lu\n",
716                        cli->cl_dirty, cli->cl_dirty_max);
717                 oa->o_undirty = 0;
718         } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages + 1) {
719                 /* The atomic_read() allowing the atomic_inc() are not covered
720                  * by a lock thus they may safely race and trip this CERROR()
721                  * unless we add in a small fudge factor (+1). */
722                 CERROR("dirty %d > system dirty_max %d\n",
723                        atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
724                 oa->o_undirty = 0;
725         } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
726                 CERROR("dirty %lu - dirty_max %lu too big???\n",
727                        cli->cl_dirty, cli->cl_dirty_max);
728                 oa->o_undirty = 0;
729         } else {
730                 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
731                                 (cli->cl_max_rpcs_in_flight + 1);
732                 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
733         }
734         oa->o_grant = cli->cl_avail_grant;
735         oa->o_dropped = cli->cl_lost_grant;
736         cli->cl_lost_grant = 0;
737         client_obd_list_unlock(&cli->cl_loi_list_lock);
738         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
739                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
740
741 }
742
743 static void osc_update_next_shrink(struct client_obd *cli)
744 {
745         cli->cl_next_shrink_grant =
746                 cfs_time_shift(cli->cl_grant_shrink_interval);
747         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
748                cli->cl_next_shrink_grant);
749 }
750
751 /* caller must hold loi_list_lock */
752 static void osc_consume_write_grant(struct client_obd *cli,struct brw_page *pga)
753 {
754         atomic_inc(&obd_dirty_pages);
755         cli->cl_dirty += CFS_PAGE_SIZE;
756         cli->cl_avail_grant -= CFS_PAGE_SIZE;
757         pga->flag |= OBD_BRW_FROM_GRANT;
758         CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
759                CFS_PAGE_SIZE, pga, pga->pg);
760         LASSERTF(cli->cl_avail_grant >= 0, "invalid avail grant is %ld \n",
761                  cli->cl_avail_grant);
762         osc_update_next_shrink(cli);
763 }
764
765 /* the companion to osc_consume_write_grant, called when a brw has completed.
766  * must be called with the loi lock held. */
767 static void osc_release_write_grant(struct client_obd *cli,
768                                     struct brw_page *pga, int sent)
769 {
770         int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
771         ENTRY;
772
773         if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
774                 EXIT;
775                 return;
776         }
777
778         pga->flag &= ~OBD_BRW_FROM_GRANT;
779         atomic_dec(&obd_dirty_pages);
780         cli->cl_dirty -= CFS_PAGE_SIZE;
781         if (!sent) {
782                 cli->cl_lost_grant += CFS_PAGE_SIZE;
783                 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
784                        cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
785         } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
786                 /* For short writes we shouldn't count parts of pages that
787                  * span a whole block on the OST side, or our accounting goes
788                  * wrong.  Should match the code in filter_grant_check. */
789                 int offset = pga->off & ~CFS_PAGE_MASK;
790                 int count = pga->count + (offset & (blocksize - 1));
791                 int end = (offset + pga->count) & (blocksize - 1);
792                 if (end)
793                         count += blocksize - end;
794
795                 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
796                 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
797                        CFS_PAGE_SIZE - count, cli->cl_lost_grant,
798                        cli->cl_avail_grant, cli->cl_dirty);
799         }
800
801         EXIT;
802 }
803
804 static unsigned long rpcs_in_flight(struct client_obd *cli)
805 {
806         return cli->cl_r_in_flight + cli->cl_w_in_flight;
807 }
808
809 /* caller must hold loi_list_lock */
810 void osc_wake_cache_waiters(struct client_obd *cli)
811 {
812         struct list_head *l, *tmp;
813         struct osc_cache_waiter *ocw;
814
815         ENTRY;
816         list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
817                 /* if we can't dirty more, we must wait until some is written */
818                 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
819                    ((atomic_read(&obd_dirty_pages)+1)>(obd_max_dirty_pages))) {
820                         CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
821                                "osc max %ld, sys max %d\n", cli->cl_dirty,
822                                cli->cl_dirty_max, obd_max_dirty_pages);
823                         return;
824                 }
825
826                 /* if still dirty cache but no grant wait for pending RPCs that
827                  * may yet return us some grant before doing sync writes */
828                 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
829                         CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
830                                cli->cl_w_in_flight);
831                         return;
832                 }
833
834                 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
835                 list_del_init(&ocw->ocw_entry);
836                 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
837                         /* no more RPCs in flight to return grant, do sync IO */
838                         ocw->ocw_rc = -EDQUOT;
839                         CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
840                 } else {
841                         osc_consume_write_grant(cli,
842                                                 &ocw->ocw_oap->oap_brw_page);
843                 }
844
845                 cfs_waitq_signal(&ocw->ocw_waitq);
846         }
847
848         EXIT;
849 }
850
851 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
852 {
853         client_obd_list_lock(&cli->cl_loi_list_lock);
854         CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
855         if (body->oa.o_valid & OBD_MD_FLGRANT)
856                 cli->cl_avail_grant += body->oa.o_grant;
857         /* waiters are woken in brw_interpret */
858         client_obd_list_unlock(&cli->cl_loi_list_lock);
859 }
860
861 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
862                               void *key, obd_count vallen, void *val,
863                               struct ptlrpc_request_set *set);
864
865 static int osc_shrink_grant_interpret(struct ptlrpc_request *req,
866                                       void *data, int rc)
867 {
868         struct osc_grant_args *aa = data;
869         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
870         struct obdo *oa = aa->aa_oa;
871         struct ost_body *body;
872
873         if (rc != 0) {
874                 client_obd_list_lock(&cli->cl_loi_list_lock);
875                 cli->cl_avail_grant += oa->o_grant;
876                 client_obd_list_unlock(&cli->cl_loi_list_lock);
877                 GOTO(out, rc);
878         }
879         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*oa),
880                                 lustre_swab_ost_body);
881         osc_update_grant(cli, body);
882 out:
883         OBD_FREE_PTR(oa);
884         return rc;
885 }
886
887 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
888 {
889         client_obd_list_lock(&cli->cl_loi_list_lock);
890         oa->o_grant = cli->cl_avail_grant / 4;
891         cli->cl_avail_grant -= oa->o_grant;
892         client_obd_list_unlock(&cli->cl_loi_list_lock);
893         oa->o_flags |= OBD_FL_SHRINK_GRANT;
894         osc_update_next_shrink(cli);
895 }
896
897 /* Shrink the current grant, either from some large amount to enough for a
898  * full set of in-flight RPCs, or if we have already shrunk to that limit
899  * then to enough for a single RPC.  This avoids keeping more grant than
900  * needed, and avoids shrinking the grant piecemeal. */
901 static int osc_shrink_grant(struct client_obd *cli)
902 {
903         long target = (cli->cl_max_rpcs_in_flight + 1) *
904                       cli->cl_max_pages_per_rpc;
905
906         client_obd_list_lock(&cli->cl_loi_list_lock);
907         if (cli->cl_avail_grant <= target)
908                 target = cli->cl_max_pages_per_rpc;
909         client_obd_list_unlock(&cli->cl_loi_list_lock);
910
911         return osc_shrink_grant_to_target(cli, target);
912 }
913
914 int osc_shrink_grant_to_target(struct client_obd *cli, long target)
915 {
916         int    rc = 0;
917         struct ost_body     *body;
918         ENTRY;
919
920         client_obd_list_lock(&cli->cl_loi_list_lock);
921         /* Don't shrink if we are already above or below the desired limit
922          * We don't want to shrink below a single RPC, as that will negatively
923          * impact block allocation and long-term performance. */
924         if (target < cli->cl_max_pages_per_rpc)
925                 target = cli->cl_max_pages_per_rpc;
926
927         if (target >= cli->cl_avail_grant) {
928                 client_obd_list_unlock(&cli->cl_loi_list_lock);
929                 RETURN(0);
930         }
931         client_obd_list_unlock(&cli->cl_loi_list_lock);
932
933         OBD_ALLOC_PTR(body);
934         if (!body)
935                 RETURN(-ENOMEM);
936
937         osc_announce_cached(cli, &body->oa, 0);
938
939         client_obd_list_lock(&cli->cl_loi_list_lock);
940         body->oa.o_grant = cli->cl_avail_grant - target;
941         cli->cl_avail_grant = target;
942         client_obd_list_unlock(&cli->cl_loi_list_lock);
943         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
944         osc_update_next_shrink(cli);
945
946         rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
947                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
948                                 sizeof(*body), body, NULL);
949         if (rc) {
950                 client_obd_list_lock(&cli->cl_loi_list_lock);
951                 cli->cl_avail_grant += body->oa.o_grant;
952                 client_obd_list_unlock(&cli->cl_loi_list_lock);
953         }
954         OBD_FREE_PTR(body);
955         RETURN(rc);
956 }
957
958 #define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
959 static int osc_should_shrink_grant(struct client_obd *client)
960 {
961         cfs_time_t time = cfs_time_current();
962         cfs_time_t next_shrink = client->cl_next_shrink_grant;
963         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
964                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
965                     client->cl_avail_grant > GRANT_SHRINK_LIMIT)
966                         return 1;
967                 else
968                         osc_update_next_shrink(client);
969         }
970         return 0;
971 }
972
973 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
974 {
975         struct client_obd *client;
976
977         list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
978                 if (osc_should_shrink_grant(client))
979                         osc_shrink_grant(client);
980         }
981         return 0;
982 }
983
984 static int osc_add_shrink_grant(struct client_obd *client)
985 {
986         int rc;
987
988         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
989                                        TIMEOUT_GRANT,
990                                        osc_grant_shrink_grant_cb, NULL,
991                                        &client->cl_grant_shrink_list);
992         if (rc) {
993                 CERROR("add grant client %s error %d\n",
994                         client->cl_import->imp_obd->obd_name, rc);
995                 return rc;
996         }
997         CDEBUG(D_CACHE, "add grant client %s \n",
998                client->cl_import->imp_obd->obd_name);
999         osc_update_next_shrink(client);
1000         return 0;
1001 }
1002
1003 static int osc_del_shrink_grant(struct client_obd *client)
1004 {
1005         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1006                                          TIMEOUT_GRANT);
1007 }
1008
1009 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1010 {
1011         client_obd_list_lock(&cli->cl_loi_list_lock);
1012         cli->cl_avail_grant = ocd->ocd_grant;
1013         client_obd_list_unlock(&cli->cl_loi_list_lock);
1014
1015         if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1016             list_empty(&cli->cl_grant_shrink_list))
1017                 osc_add_shrink_grant(cli);
1018
1019         CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld \n",
1020                cli->cl_avail_grant, cli->cl_lost_grant);
1021         LASSERT(cli->cl_avail_grant >= 0);
1022 }
1023
1024 /* We assume that the reason this OSC got a short read is because it read
1025  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1026  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1027  * this stripe never got written at or beyond this stripe offset yet. */
1028 static void handle_short_read(int nob_read, obd_count page_count,
1029                               struct brw_page **pga, int pshift)
1030 {
1031         char *ptr;
1032         int i = 0;
1033
1034         /* skip bytes read OK */
1035         while (nob_read > 0) {
1036                 LASSERT (page_count > 0);
1037
1038                 if (pga[i]->count > nob_read) {
1039                         /* EOF inside this page */
1040                         ptr = cfs_kmap(pga[i]->pg) +
1041                               (OSC_FILE2MEM_OFF(pga[i]->off,pshift)&~CFS_PAGE_MASK);
1042                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1043                         cfs_kunmap(pga[i]->pg);
1044                         page_count--;
1045                         i++;
1046                         break;
1047                 }
1048
1049                 nob_read -= pga[i]->count;
1050                 page_count--;
1051                 i++;
1052         }
1053
1054         /* zero remaining pages */
1055         while (page_count-- > 0) {
1056                 ptr = cfs_kmap(pga[i]->pg) +
1057                       (OSC_FILE2MEM_OFF(pga[i]->off, pshift) & ~CFS_PAGE_MASK);
1058                 memset(ptr, 0, pga[i]->count);
1059                 cfs_kunmap(pga[i]->pg);
1060                 i++;
1061         }
1062 }
1063
1064 static int check_write_rcs(struct ptlrpc_request *req,
1065                            int requested_nob, int niocount,
1066                            obd_count page_count, struct brw_page **pga)
1067 {
1068         int    *remote_rcs, i;
1069
1070         /* return error if any niobuf was in error */
1071         remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
1072                                         sizeof(*remote_rcs) * niocount, NULL);
1073         if (remote_rcs == NULL) {
1074                 CERROR("Missing/short RC vector on BRW_WRITE reply\n");
1075                 return(-EPROTO);
1076         }
1077         if (lustre_rep_need_swab(req))
1078                 for (i = 0; i < niocount; i++)
1079                         __swab32s(&remote_rcs[i]);
1080
1081         for (i = 0; i < niocount; i++) {
1082                 if (remote_rcs[i] < 0)
1083                         return(remote_rcs[i]);
1084
1085                 if (remote_rcs[i] != 0) {
1086                         CERROR("rc[%d] invalid (%d) req %p\n",
1087                                 i, remote_rcs[i], req);
1088                         return(-EPROTO);
1089                 }
1090         }
1091
1092         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1093                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1094                        req->rq_bulk->bd_nob_transferred, requested_nob);
1095                 return(-EPROTO);
1096         }
1097
1098         return (0);
1099 }
1100
1101 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1102 {
1103         if (p1->flag != p2->flag) {
1104                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_ASYNC);
1105
1106                 /* warn if we try to combine flags that we don't know to be
1107                  * safe to combine */
1108                 if ((p1->flag & mask) != (p2->flag & mask))
1109                         CERROR("is it ok to have flags 0x%x and 0x%x in the "
1110                                "same brw?\n", p1->flag, p2->flag);
1111                 return 0;
1112         }
1113
1114         return (p1->off + p1->count == p2->off);
1115 }
1116
1117 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1118                                    struct brw_page **pga, int opc,
1119                                    cksum_type_t cksum_type, int pshift)
1120 {
1121         __u32 cksum;
1122         int i = 0;
1123
1124         LASSERT (pg_count > 0);
1125         cksum = init_checksum(cksum_type);
1126         while (nob > 0 && pg_count > 0) {
1127                 unsigned char *ptr = cfs_kmap(pga[i]->pg);
1128                 int off = OSC_FILE2MEM_OFF(pga[i]->off, pshift) & ~CFS_PAGE_MASK;
1129                 int count = pga[i]->count > nob ? nob : pga[i]->count;
1130
1131                 /* corrupt the data before we compute the checksum, to
1132                  * simulate an OST->client data error */
1133                 if (i == 0 && opc == OST_READ &&
1134                     OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
1135                         memcpy(ptr + off, "bad1", min(4, nob));
1136                 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
1137                 cfs_kunmap(pga[i]->pg);
1138                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1139                                off, cksum);
1140
1141                 nob -= pga[i]->count;
1142                 pg_count--;
1143                 i++;
1144         }
1145         /* For sending we only compute the wrong checksum instead
1146          * of corrupting the data so it is still correct on a redo */
1147         if (opc == OST_WRITE && OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_SEND))
1148                 cksum++;
1149
1150         return cksum;
1151 }
1152
1153 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1154                                 struct lov_stripe_md *lsm, obd_count page_count,
1155                                 struct brw_page **pga,
1156                                 struct ptlrpc_request **reqp, int pshift)
1157 {
1158         struct ptlrpc_request   *req;
1159         struct ptlrpc_bulk_desc *desc;
1160         struct ost_body         *body;
1161         struct obd_ioobj        *ioobj;
1162         struct niobuf_remote    *niobuf;
1163         __u32 size[4] = { sizeof(struct ptlrpc_body), sizeof(*body) };
1164         int niocount, i, requested_nob, opc, rc;
1165         struct ptlrpc_request_pool *pool;
1166         struct osc_brw_async_args *aa;
1167         struct brw_page *pg_prev;
1168
1169         ENTRY;
1170         OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM); /* Recoverable */
1171         OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ2, -EINVAL); /* Fatal */
1172
1173         opc = ((cmd & OBD_BRW_WRITE) != 0) ? OST_WRITE : OST_READ;
1174         pool = ((cmd & OBD_BRW_WRITE) != 0) ? cli->cl_import->imp_rq_pool :NULL;
1175
1176         for (niocount = i = 1; i < page_count; i++) {
1177                 if (!can_merge_pages(pga[i - 1], pga[i]))
1178                         niocount++;
1179         }
1180
1181         size[REQ_REC_OFF + 1] = sizeof(*ioobj);
1182         size[REQ_REC_OFF + 2] = niocount * sizeof(*niobuf);
1183
1184         req = ptlrpc_prep_req_pool(cli->cl_import, LUSTRE_OST_VERSION, opc, 4, size,
1185                                    NULL, pool);
1186         if (req == NULL)
1187                 RETURN (-ENOMEM);
1188
1189         req->rq_request_portal = OST_IO_PORTAL;         /* bug 7198 */
1190         ptlrpc_at_set_req_timeout(req);
1191
1192         if (opc == OST_WRITE)
1193                 desc = ptlrpc_prep_bulk_imp (req, page_count,
1194                                              BULK_GET_SOURCE, OST_BULK_PORTAL);
1195         else
1196                 desc = ptlrpc_prep_bulk_imp (req, page_count,
1197                                              BULK_PUT_SINK, OST_BULK_PORTAL);
1198         if (desc == NULL)
1199                 GOTO(out, rc = -ENOMEM);
1200         /* NB request now owns desc and will free it when it gets freed */
1201
1202         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
1203         ioobj = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioobj));
1204         niobuf = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1205                                 niocount * sizeof(*niobuf));
1206
1207         lustre_set_wire_obdo(&body->oa, oa);
1208         obdo_to_ioobj(oa, ioobj);
1209         ioobj->ioo_bufcnt = niocount;
1210
1211         LASSERT (page_count > 0);
1212         pg_prev = pga[0];
1213         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1214                 struct brw_page *pg = pga[i];
1215
1216                 LASSERT(pg->count > 0);
1217                 LASSERTF((OSC_FILE2MEM_OFF(pg->off, pshift) & ~CFS_PAGE_MASK) +
1218                          pg->count <= CFS_PAGE_SIZE,
1219                          "i: %d pg: %p off: "LPU64", count: %u, shift: %d\n",
1220                          i, pg, pg->off, pg->count, pshift);
1221 #ifdef __linux__
1222                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1223                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1224                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1225                          i, page_count,
1226                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1227                          pg_prev->pg, page_private(pg_prev->pg),
1228                          pg_prev->pg->index, pg_prev->off);
1229 #else
1230                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1231                          "i %d p_c %u\n", i, page_count);
1232 #endif
1233                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1234                         (pg->flag & OBD_BRW_SRVLOCK));
1235
1236                 ptlrpc_prep_bulk_page(desc, pg->pg,
1237                                       OSC_FILE2MEM_OFF(pg->off,pshift)&~CFS_PAGE_MASK,
1238                                       pg->count);
1239                 requested_nob += pg->count;
1240
1241                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1242                         niobuf--;
1243                         niobuf->len += pg->count;
1244                 } else {
1245                         niobuf->offset = pg->off;
1246                         niobuf->len    = pg->count;
1247                         niobuf->flags  = pg->flag;
1248                 }
1249                 pg_prev = pg;
1250         }
1251
1252         LASSERTF((void *)(niobuf - niocount) ==
1253                 lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1254                                niocount * sizeof(*niobuf)),
1255                 "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg,
1256                 REQ_REC_OFF + 2, niocount * sizeof(*niobuf)),
1257                 (void *)(niobuf - niocount));
1258
1259         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1260         if (osc_should_shrink_grant(cli))
1261                 osc_shrink_grant_local(cli, &body->oa);
1262
1263         /* size[REQ_REC_OFF] still sizeof (*body) */
1264         if (opc == OST_WRITE) {
1265                 if (cli->cl_checksum) {
1266                         /* store cl_cksum_type in a local variable since
1267                          * it can be changed via lprocfs */
1268                         cksum_type_t cksum_type = cli->cl_cksum_type;
1269
1270                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1271                                 oa->o_flags &= OBD_FL_LOCAL_MASK;
1272                                 body->oa.o_flags = 0;
1273                         }
1274                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1275                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1276                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1277                                                              page_count, pga,
1278                                                              OST_WRITE,
1279                                                              cksum_type, pshift);
1280                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1281                                body->oa.o_cksum);
1282                         /* save this in 'oa', too, for later checking */
1283                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1284                         oa->o_flags |= cksum_type_pack(cksum_type);
1285                 } else {
1286                         /* clear out the checksum flag, in case this is a
1287                          * resend but cl_checksum is no longer set. b=11238 */
1288                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1289                 }
1290                 oa->o_cksum = body->oa.o_cksum;
1291                 /* 1 RC per niobuf */
1292                 size[REPLY_REC_OFF + 1] = sizeof(__u32) * niocount;
1293                 ptlrpc_req_set_repsize(req, 3, size);
1294         } else {
1295                 if (cli->cl_checksum) {
1296                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1297                                 body->oa.o_flags = 0;
1298                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1299                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1300                 }
1301                 /* 1 RC for the whole I/O */
1302                 ptlrpc_req_set_repsize(req, 2, size);
1303         }
1304
1305         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1306         aa = ptlrpc_req_async_args(req);
1307         aa->aa_oa = oa;
1308         aa->aa_requested_nob = requested_nob;
1309         aa->aa_nio_count = niocount;
1310         aa->aa_page_count = page_count;
1311         aa->aa_resends = 0;
1312         aa->aa_ppga = pga;
1313         aa->aa_cli = cli;
1314         aa->aa_pshift = pshift;
1315         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1316
1317         *reqp = req;
1318         RETURN (0);
1319
1320  out:
1321         ptlrpc_req_finished (req);
1322         RETURN (rc);
1323 }
1324
1325 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1326                                 __u32 client_cksum, __u32 server_cksum, int nob,
1327                                 obd_count page_count, struct brw_page **pga,
1328                                 cksum_type_t client_cksum_type, int pshift)
1329 {
1330         __u32 new_cksum;
1331         char *msg;
1332         cksum_type_t cksum_type;
1333
1334         if (server_cksum == client_cksum) {
1335                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1336                 return 0;
1337         }
1338
1339         if (oa->o_valid & OBD_MD_FLFLAGS)
1340                 cksum_type = cksum_type_unpack(oa->o_flags);
1341         else
1342                 cksum_type = OBD_CKSUM_CRC32;
1343
1344         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1345                                       cksum_type, pshift);
1346
1347         if (cksum_type != client_cksum_type)
1348                 msg = "the server did not use the checksum type specified in "
1349                       "the original request - likely a protocol problem";
1350         else if (new_cksum == server_cksum)
1351                 msg = "changed on the client after we checksummed it - "
1352                       "likely false positive due to mmap IO (bug 11742)";
1353         else if (new_cksum == client_cksum)
1354                 msg = "changed in transit before arrival at OST";
1355         else
1356                 msg = "changed in transit AND doesn't match the original - "
1357                       "likely false positive due to mmap IO (bug 11742)";
1358
1359         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1360                            LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1361                            "["LPU64"-"LPU64"]\n",
1362                            msg, libcfs_nid2str(peer->nid),
1363                            oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1364                            oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
1365                                                         (__u64)0,
1366                            oa->o_id,
1367                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1368                            pga[0]->off,
1369                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1370         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1371                "client csum now %x\n", client_cksum, client_cksum_type,
1372                server_cksum, cksum_type, new_cksum);
1373
1374         return 1;
1375 }
1376
1377 /* Note rc enters this function as number of bytes transferred */
1378 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1379 {
1380         struct osc_brw_async_args *aa = ptlrpc_req_async_args(req);
1381         const lnet_process_id_t *peer =
1382                         &req->rq_import->imp_connection->c_peer;
1383         struct client_obd *cli = aa->aa_cli;
1384         struct ost_body *body;
1385         __u32 client_cksum = 0;
1386         ENTRY;
1387
1388         if (rc < 0 && rc != -EDQUOT)
1389                 RETURN(rc);
1390
1391         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1392         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1393                                   lustre_swab_ost_body);
1394         if (body == NULL) {
1395                 CERROR ("Can't unpack body\n");
1396                 RETURN(-EPROTO);
1397         }
1398
1399         /* set/clear over quota flag for a uid/gid */
1400         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1401             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1402                 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1403                              body->oa.o_gid, body->oa.o_valid,
1404                              body->oa.o_flags);
1405
1406         if (rc < 0)
1407                 RETURN(rc);
1408
1409         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1410                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1411
1412         osc_update_grant(cli, body);
1413
1414         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1415                 if (rc > 0) {
1416                         CERROR ("Unexpected +ve rc %d\n", rc);
1417                         RETURN(-EPROTO);
1418                 }
1419                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1420
1421                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1422                     check_write_checksum(&body->oa, peer, client_cksum,
1423                                          body->oa.o_cksum, aa->aa_requested_nob,
1424                                          aa->aa_page_count, aa->aa_ppga,
1425                                          cksum_type_unpack(aa->aa_oa->o_flags),
1426                                          aa->aa_pshift))
1427                         RETURN(-EAGAIN);
1428
1429                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1430                                      aa->aa_page_count, aa->aa_ppga);
1431                 GOTO(out, rc);
1432         }
1433
1434         /* The rest of this function executes only for OST_READs */
1435         if (rc > aa->aa_requested_nob) {
1436                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1437                        aa->aa_requested_nob);
1438                 RETURN(-EPROTO);
1439         }
1440
1441         if (rc != req->rq_bulk->bd_nob_transferred) {
1442                 CERROR ("Unexpected rc %d (%d transferred)\n",
1443                         rc, req->rq_bulk->bd_nob_transferred);
1444                 return (-EPROTO);
1445         }
1446
1447         if (rc < aa->aa_requested_nob)
1448                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga, aa->aa_pshift);
1449
1450         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1451                 static int cksum_counter;
1452                 __u32      server_cksum = body->oa.o_cksum;
1453                 char      *via;
1454                 char      *router;
1455                 cksum_type_t cksum_type;
1456
1457                 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1458                         cksum_type = cksum_type_unpack(body->oa.o_flags);
1459                 else
1460                         cksum_type = OBD_CKSUM_CRC32;
1461                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1462                                                  aa->aa_ppga, OST_READ,
1463                                                  cksum_type, aa->aa_pshift);
1464
1465                 if (peer->nid == req->rq_bulk->bd_sender) {
1466                         via = router = "";
1467                 } else {
1468                         via = " via ";
1469                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1470                 }
1471
1472                 if (server_cksum == ~0 && rc > 0) {
1473                         CERROR("Protocol error: server %s set the 'checksum' "
1474                                "bit, but didn't send a checksum.  Not fatal, "
1475                                "but please notify on http://bugzilla.lustre.org/\n",
1476                                libcfs_nid2str(peer->nid));
1477                 } else if (server_cksum != client_cksum) {
1478                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1479                                            "%s%s%s inum "LPU64"/"LPU64" object "
1480                                            LPU64"/"LPU64" extent "
1481                                            "["LPU64"-"LPU64"]\n",
1482                                            req->rq_import->imp_obd->obd_name,
1483                                            libcfs_nid2str(peer->nid),
1484                                            via, router,
1485                                            body->oa.o_valid & OBD_MD_FLFID ?
1486                                                 body->oa.o_fid : (__u64)0,
1487                                            body->oa.o_valid & OBD_MD_FLFID ?
1488                                                 body->oa.o_generation :(__u64)0,
1489                                            body->oa.o_id,
1490                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1491                                                 body->oa.o_gr : (__u64)0,
1492                                            aa->aa_ppga[0]->off,
1493                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1494                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1495                                                                         1);
1496                         CERROR("client %x, server %x, cksum_type %x\n",
1497                                client_cksum, server_cksum, cksum_type);
1498                         cksum_counter = 0;
1499                         aa->aa_oa->o_cksum = client_cksum;
1500                         rc = -EAGAIN;
1501                 } else {
1502                         cksum_counter++;
1503                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1504                         rc = 0;
1505                 }
1506         } else if (unlikely(client_cksum)) {
1507                 static int cksum_missed;
1508
1509                 cksum_missed++;
1510                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1511                         CERROR("Checksum %u requested from %s but not sent\n",
1512                                cksum_missed, libcfs_nid2str(peer->nid));
1513         } else {
1514                 rc = 0;
1515         }
1516 out:
1517         if (rc >= 0)
1518                 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
1519
1520         RETURN(rc);
1521 }
1522
1523 static int osc_brw_internal(int cmd, struct obd_export *exp,struct obdo *oa,
1524                             struct lov_stripe_md *lsm,
1525                             obd_count page_count, struct brw_page **pga)
1526 {
1527         struct ptlrpc_request *request;
1528         int                    rc;
1529         cfs_waitq_t            waitq;
1530         int                    resends = 0;
1531         struct l_wait_info     lwi;
1532
1533         ENTRY;
1534         init_waitqueue_head(&waitq);
1535
1536 restart_bulk:
1537         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1538                                   page_count, pga, &request, 0);
1539         if (rc != 0)
1540                 return (rc);
1541
1542         rc = ptlrpc_queue_wait(request);
1543
1544         if (rc == -ETIMEDOUT && request->rq_resend) {
1545                 DEBUG_REQ(D_HA, request,  "BULK TIMEOUT");
1546                 ptlrpc_req_finished(request);
1547                 goto restart_bulk;
1548         }
1549
1550         rc = osc_brw_fini_request(request, rc);
1551
1552         ptlrpc_req_finished(request);
1553         if (osc_recoverable_error(rc)) {
1554                 resends++;
1555                 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1556                         CERROR("too many resend retries, returning error\n");
1557                         RETURN(-EIO);
1558                 }
1559
1560                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1561                 l_wait_event(waitq, 0, &lwi);
1562
1563                 goto restart_bulk;
1564         }
1565         RETURN(rc);
1566 }
1567
1568 int osc_brw_redo_request(struct ptlrpc_request *request,
1569                          struct osc_brw_async_args *aa)
1570 {
1571         struct ptlrpc_request *new_req;
1572         struct ptlrpc_request_set *set = request->rq_set;
1573         struct osc_brw_async_args *new_aa;
1574         struct osc_async_page *oap;
1575         int rc = 0;
1576         ENTRY;
1577
1578         if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1579                 CERROR("too many resend retries, returning error\n");
1580                 RETURN(-EIO);
1581         }
1582
1583         DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1584
1585         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1586                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1587                                   aa->aa_cli, aa->aa_oa,
1588                                   NULL /* lsm unused by osc currently */,
1589                                   aa->aa_page_count, aa->aa_ppga, &new_req,
1590                                   aa->aa_pshift);
1591         if (rc)
1592                 RETURN(rc);
1593
1594         client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1595
1596         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1597                 if (oap->oap_request != NULL) {
1598                         LASSERTF(request == oap->oap_request,
1599                                  "request %p != oap_request %p\n",
1600                                  request, oap->oap_request);
1601                         if (oap->oap_interrupted) {
1602                                 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1603                                 ptlrpc_req_finished(new_req);
1604                                 RETURN(-EINTR);
1605                         }
1606                 }
1607         }
1608         /* New request takes over pga and oaps from old request.
1609          * Note that copying a list_head doesn't work, need to move it... */
1610         aa->aa_resends++;
1611         new_req->rq_interpret_reply = request->rq_interpret_reply;
1612         new_req->rq_async_args = request->rq_async_args;
1613         new_req->rq_sent = CURRENT_SECONDS + aa->aa_resends;
1614
1615         new_aa = ptlrpc_req_async_args(new_req);
1616
1617         CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1618         list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1619         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1620
1621         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1622                 if (oap->oap_request) {
1623                         ptlrpc_req_finished(oap->oap_request);
1624                         oap->oap_request = ptlrpc_request_addref(new_req);
1625                 }
1626         }
1627
1628         /* use ptlrpc_set_add_req is safe because interpret functions work
1629          * in check_set context. only one way exist with access to request
1630          * from different thread got -EINTR - this way protected with
1631          * cl_loi_list_lock */
1632         ptlrpc_set_add_req(set, new_req);
1633
1634         client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1635
1636         DEBUG_REQ(D_INFO, new_req, "new request");
1637         RETURN(0);
1638 }
1639
1640 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1641                           struct lov_stripe_md *lsm, obd_count page_count,
1642                           struct brw_page **pga, struct ptlrpc_request_set *set,
1643                           int pshift)
1644 {
1645         struct ptlrpc_request     *request;
1646         struct client_obd         *cli = &exp->exp_obd->u.cli;
1647         int                        rc, i;
1648         struct osc_brw_async_args *aa;
1649         ENTRY;
1650
1651         /* Consume write credits even if doing a sync write -
1652          * otherwise we may run out of space on OST due to grant. */
1653         /* FIXME: unaligned writes must use write grants too */
1654         if (cmd == OBD_BRW_WRITE && pshift == 0) {
1655                 client_obd_list_lock(&cli->cl_loi_list_lock);
1656                 for (i = 0; i < page_count; i++) {
1657                         if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
1658                                 osc_consume_write_grant(cli, pga[i]);
1659                 }
1660                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1661         }
1662
1663         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1664                                   page_count, pga, &request, pshift);
1665
1666         CLASSERT(sizeof(*aa) <= sizeof(request->rq_async_args));
1667
1668         if (rc == 0) {
1669                 aa = ptlrpc_req_async_args(request);
1670                 if (cmd == OBD_BRW_READ) {
1671                         lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1672                         lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1673                 } else {
1674                         lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1675                         lprocfs_oh_tally(&cli->cl_write_rpc_hist,
1676                                          cli->cl_w_in_flight);
1677                 }
1678                 ptlrpc_lprocfs_brw(request, aa->aa_requested_nob);
1679
1680                 LASSERT(list_empty(&aa->aa_oaps));
1681
1682                 request->rq_interpret_reply = brw_interpret;
1683                 ptlrpc_set_add_req(set, request);
1684                 client_obd_list_lock(&cli->cl_loi_list_lock);
1685                 if (cmd == OBD_BRW_READ)
1686                         cli->cl_r_in_flight++;
1687                 else
1688                         cli->cl_w_in_flight++;
1689                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1690                 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DIO_PAUSE, 3);
1691         } else if (cmd == OBD_BRW_WRITE) {
1692                 client_obd_list_lock(&cli->cl_loi_list_lock);
1693                 for (i = 0; i < page_count; i++)
1694                         osc_release_write_grant(cli, pga[i], 0);
1695                 osc_wake_cache_waiters(cli);
1696                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1697         }
1698
1699         RETURN (rc);
1700 }
1701
1702 /*
1703  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1704  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1705  * fine for our small page arrays and doesn't require allocation.  its an
1706  * insertion sort that swaps elements that are strides apart, shrinking the
1707  * stride down until its '1' and the array is sorted.
1708  */
1709 static void sort_brw_pages(struct brw_page **array, int num)
1710 {
1711         int stride, i, j;
1712         struct brw_page *tmp;
1713
1714         if (num == 1)
1715                 return;
1716         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1717                 ;
1718
1719         do {
1720                 stride /= 3;
1721                 for (i = stride ; i < num ; i++) {
1722                         tmp = array[i];
1723                         j = i;
1724                         while (j >= stride && array[j-stride]->off > tmp->off) {
1725                                 array[j] = array[j - stride];
1726                                 j -= stride;
1727                         }
1728                         array[j] = tmp;
1729                 }
1730         } while (stride > 1);
1731 }
1732
1733 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages,
1734                                         int pshift)
1735 {
1736         int count = 1;
1737         int offset;
1738         int i = 0;
1739
1740         LASSERT (pages > 0);
1741         offset = OSC_FILE2MEM_OFF(pg[i]->off, pshift) & ~CFS_PAGE_MASK;
1742
1743         for (;;) {
1744                 pages--;
1745                 if (pages == 0)         /* that's all */
1746                         return count;
1747
1748                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1749                         return count;   /* doesn't end on page boundary */
1750
1751                 i++;
1752                 offset = OSC_FILE2MEM_OFF(pg[i]->off, pshift) & ~CFS_PAGE_MASK;
1753                 if (offset != 0)        /* doesn't start on page boundary */
1754                         return count;
1755
1756                 count++;
1757         }
1758 }
1759
1760 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1761 {
1762         struct brw_page **ppga;
1763         int i;
1764
1765         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1766         if (ppga == NULL)
1767                 return NULL;
1768
1769         for (i = 0; i < count; i++)
1770                 ppga[i] = pga + i;
1771         return ppga;
1772 }
1773
1774 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1775 {
1776         LASSERT(ppga != NULL);
1777         OBD_FREE(ppga, sizeof(*ppga) * count);
1778 }
1779
1780 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1781                    obd_count page_count, struct brw_page *pga,
1782                    struct obd_trans_info *oti)
1783 {
1784         struct obdo *saved_oa = NULL;
1785         struct brw_page **ppga, **orig;
1786         struct obd_import *imp = class_exp2cliimp(exp);
1787         struct client_obd *cli;
1788         int rc, page_count_orig;
1789         ENTRY;
1790
1791         LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1792         cli = &imp->imp_obd->u.cli;
1793
1794         if (cmd & OBD_BRW_CHECK) {
1795                 /* The caller just wants to know if there's a chance that this
1796                  * I/O can succeed */
1797
1798                 if (imp->imp_invalid)
1799                         RETURN(-EIO);
1800                 RETURN(0);
1801         }
1802
1803         /* test_brw with a failed create can trip this, maybe others. */
1804         LASSERT(cli->cl_max_pages_per_rpc);
1805
1806         rc = 0;
1807
1808         orig = ppga = osc_build_ppga(pga, page_count);
1809         if (ppga == NULL)
1810                 RETURN(-ENOMEM);
1811         page_count_orig = page_count;
1812
1813         sort_brw_pages(ppga, page_count);
1814         while (page_count) {
1815                 obd_count pages_per_brw;
1816
1817                 if (page_count > cli->cl_max_pages_per_rpc)
1818                         pages_per_brw = cli->cl_max_pages_per_rpc;
1819                 else
1820                         pages_per_brw = page_count;
1821
1822                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw, 0);
1823
1824                 if (saved_oa != NULL) {
1825                         /* restore previously saved oa */
1826                         *oinfo->oi_oa = *saved_oa;
1827                 } else if (page_count > pages_per_brw) {
1828                         /* save a copy of oa (brw will clobber it) */
1829                         OBDO_ALLOC(saved_oa);
1830                         if (saved_oa == NULL)
1831                                 GOTO(out, rc = -ENOMEM);
1832                         *saved_oa = *oinfo->oi_oa;
1833                 }
1834
1835                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1836                                       pages_per_brw, ppga);
1837
1838                 if (rc != 0)
1839                         break;
1840
1841                 page_count -= pages_per_brw;
1842                 ppga += pages_per_brw;
1843         }
1844
1845 out:
1846         osc_release_ppga(orig, page_count_orig);
1847
1848         if (saved_oa != NULL)
1849                 OBDO_FREE(saved_oa);
1850
1851         RETURN(rc);
1852 }
1853
1854 static int osc_brw_async(int cmd, struct obd_export *exp,
1855                          struct obd_info *oinfo, obd_count page_count,
1856                          struct brw_page *pga, struct obd_trans_info *oti,
1857                          struct ptlrpc_request_set *set, int pshift)
1858 {
1859         struct brw_page **ppga, **orig;
1860         int page_count_orig;
1861         int rc = 0;
1862         ENTRY;
1863
1864         if (cmd & OBD_BRW_CHECK) {
1865                 /* The caller just wants to know if there's a chance that this
1866                  * I/O can succeed */
1867                 struct obd_import *imp = class_exp2cliimp(exp);
1868
1869                 if (imp == NULL || imp->imp_invalid)
1870                         RETURN(-EIO);
1871                 RETURN(0);
1872         }
1873
1874         orig = ppga = osc_build_ppga(pga, page_count);
1875         if (ppga == NULL)
1876                 RETURN(-ENOMEM);
1877         page_count_orig = page_count;
1878
1879         sort_brw_pages(ppga, page_count);
1880         while (page_count) {
1881                 struct brw_page **copy;
1882                 struct obdo *oa;
1883                 obd_count pages_per_brw;
1884
1885                 /* one page less under unaligned direct i/o */
1886                 pages_per_brw = min_t(obd_count, page_count,
1887                     class_exp2cliimp(exp)->imp_obd->u.cli.cl_max_pages_per_rpc -
1888                                       !!pshift);
1889
1890                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw,
1891                                                        pshift);
1892
1893                 /* use ppga only if single RPC is going to fly */
1894                 if (pages_per_brw != page_count_orig || ppga != orig) {
1895                         OBD_ALLOC(copy, pages_per_brw * sizeof(*copy));
1896                         if (copy == NULL)
1897                                 GOTO(out, rc = -ENOMEM);
1898                         memcpy(copy, ppga, pages_per_brw * sizeof(*copy));
1899
1900                         OBDO_ALLOC(oa);
1901                         if (oa == NULL) {
1902                                 OBD_FREE(copy, pages_per_brw * sizeof(*copy));
1903                                 GOTO(out, rc = -ENOMEM);
1904                         }
1905                         memcpy(oa, oinfo->oi_oa, sizeof(*oa));
1906                         oa->o_flags |= OBD_FL_TEMPORARY;
1907                 } else {
1908                         copy = ppga;
1909                         oa = oinfo->oi_oa;
1910                         LASSERT(!(oa->o_flags & OBD_FL_TEMPORARY));
1911                 }
1912
1913                 rc = async_internal(cmd, exp, oa, oinfo->oi_md, pages_per_brw,
1914                                     copy, set, pshift);
1915
1916                 if (rc != 0) {
1917                         if (copy != ppga)
1918                                 OBD_FREE(copy, pages_per_brw * sizeof(*copy));
1919
1920                         if (oa->o_flags & OBD_FL_TEMPORARY)
1921                                 OBDO_FREE(oa);
1922                         break;
1923                 }
1924
1925                 if (copy == orig) {
1926                         /* we passed it to async_internal() which is
1927                          * now responsible for releasing memory */
1928                         orig = NULL;
1929                 }
1930
1931                 page_count -= pages_per_brw;
1932                 ppga += pages_per_brw;
1933         }
1934 out:
1935         if (orig)
1936                 osc_release_ppga(orig, page_count_orig);
1937         RETURN(rc);
1938 }
1939
1940 static void osc_check_rpcs(struct client_obd *cli);
1941
1942 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1943  * the dirty accounting.  Writeback completes or truncate happens before
1944  * writing starts.  Must be called with the loi lock held. */
1945 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1946                            int sent)
1947 {
1948         osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1949 }
1950
1951 /* This maintains the lists of pending pages to read/write for a given object
1952  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1953  * to quickly find objects that are ready to send an RPC. */
1954 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1955                          int cmd)
1956 {
1957         int optimal;
1958         ENTRY;
1959
1960         if (lop->lop_num_pending == 0)
1961                 RETURN(0);
1962
1963         /* if we have an invalid import we want to drain the queued pages
1964          * by forcing them through rpcs that immediately fail and complete
1965          * the pages.  recovery relies on this to empty the queued pages
1966          * before canceling the locks and evicting down the llite pages */
1967         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1968                 RETURN(1);
1969
1970         /* stream rpcs in queue order as long as as there is an urgent page
1971          * queued.  this is our cheap solution for good batching in the case
1972          * where writepage marks some random page in the middle of the file
1973          * as urgent because of, say, memory pressure */
1974         if (!list_empty(&lop->lop_urgent)) {
1975                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1976                 RETURN(1);
1977         }
1978
1979         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1980         optimal = cli->cl_max_pages_per_rpc;
1981         if (cmd & OBD_BRW_WRITE) {
1982                 /* trigger a write rpc stream as long as there are dirtiers
1983                  * waiting for space.  as they're waiting, they're not going to
1984                  * create more pages to coallesce with what's waiting.. */
1985                 if (!list_empty(&cli->cl_cache_waiters)) {
1986                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1987                         RETURN(1);
1988                 }
1989
1990                 /* +16 to avoid triggering rpcs that would want to include pages
1991                  * that are being queued but which can't be made ready until
1992                  * the queuer finishes with the page. this is a wart for
1993                  * llite::commit_write() */
1994                 optimal += 16;
1995         }
1996         if (lop->lop_num_pending >= optimal)
1997                 RETURN(1);
1998
1999         RETURN(0);
2000 }
2001
2002 static int lop_makes_hprpc(struct loi_oap_pages *lop)
2003 {
2004         struct osc_async_page *oap;
2005         ENTRY;
2006
2007         if (list_empty(&lop->lop_urgent))
2008                 RETURN(0);
2009
2010         oap = list_entry(lop->lop_urgent.next,
2011                          struct osc_async_page, oap_urgent_item);
2012
2013         if (oap->oap_async_flags & ASYNC_HP) {
2014                 CDEBUG(D_CACHE, "hp request forcing RPC\n");
2015                 RETURN(1);
2016         }
2017
2018         RETURN(0);
2019 }
2020
2021 static void on_list(struct list_head *item, struct list_head *list,
2022                     int should_be_on)
2023 {
2024         if (list_empty(item) && should_be_on)
2025                 list_add_tail(item, list);
2026         else if (!list_empty(item) && !should_be_on)
2027                 list_del_init(item);
2028 }
2029
2030 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
2031  * can find pages to build into rpcs quickly */
2032 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
2033 {
2034         if (lop_makes_hprpc(&loi->loi_write_lop) ||
2035             lop_makes_hprpc(&loi->loi_read_lop)) {
2036                 /* HP rpc */
2037                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
2038                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
2039         } else {
2040                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
2041                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
2042                         lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
2043                         lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
2044         }
2045
2046         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
2047                 loi->loi_write_lop.lop_num_pending);
2048
2049         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
2050                 loi->loi_read_lop.lop_num_pending);
2051 }
2052
2053 static void lop_update_pending(struct client_obd *cli,
2054                                struct loi_oap_pages *lop, int cmd, int delta)
2055 {
2056         lop->lop_num_pending += delta;
2057         if (cmd & OBD_BRW_WRITE)
2058                 cli->cl_pending_w_pages += delta;
2059         else
2060                 cli->cl_pending_r_pages += delta;
2061 }
2062
2063 /* this is called when a sync waiter receives an interruption.  Its job is to
2064  * get the caller woken as soon as possible.  If its page hasn't been put in an
2065  * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
2066  * desiring interruption which will forcefully complete the rpc once the rpc
2067  * has timed out */
2068 static void osc_occ_interrupted(struct oig_callback_context *occ)
2069 {
2070         struct osc_async_page *oap;
2071         struct loi_oap_pages *lop;
2072         struct lov_oinfo *loi;
2073         ENTRY;
2074
2075         /* XXX member_of() */
2076         oap = list_entry(occ, struct osc_async_page, oap_occ);
2077
2078         client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);
2079
2080         oap->oap_interrupted = 1;
2081
2082         /* ok, it's been put in an rpc. only one oap gets a request reference */
2083         if (oap->oap_request != NULL) {
2084                 ptlrpc_mark_interrupted(oap->oap_request);
2085                 ptlrpcd_wake(oap->oap_request);
2086                 GOTO(unlock, 0);
2087         }
2088
2089         /* we don't get interruption callbacks until osc_trigger_group_io()
2090          * has been called and put the sync oaps in the pending/urgent lists.*/
2091         if (!list_empty(&oap->oap_pending_item)) {
2092                 list_del_init(&oap->oap_pending_item);
2093                 list_del_init(&oap->oap_urgent_item);
2094
2095                 loi = oap->oap_loi;
2096                 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
2097                         &loi->loi_write_lop : &loi->loi_read_lop;
2098                 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
2099                 loi_list_maint(oap->oap_cli, oap->oap_loi);
2100
2101                 oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
2102                 oap->oap_oig = NULL;
2103         }
2104
2105 unlock:
2106         client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
2107 }
2108
2109 /* this is trying to propogate async writeback errors back up to the
2110  * application.  As an async write fails we record the error code for later if
2111  * the app does an fsync.  As long as errors persist we force future rpcs to be
2112  * sync so that the app can get a sync error and break the cycle of queueing
2113  * pages for which writeback will fail. */
2114 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
2115                            int rc)
2116 {
2117         if (rc) {
2118                 if (!ar->ar_rc)
2119                         ar->ar_rc = rc;
2120
2121                 ar->ar_force_sync = 1;
2122                 ar->ar_min_xid = ptlrpc_sample_next_xid();
2123                 return;
2124
2125         }
2126
2127         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2128                 ar->ar_force_sync = 0;
2129 }
2130
2131 static void osc_oap_to_pending(struct osc_async_page *oap)
2132 {
2133         struct loi_oap_pages *lop;
2134
2135         if (oap->oap_cmd & OBD_BRW_WRITE)
2136                 lop = &oap->oap_loi->loi_write_lop;
2137         else
2138                 lop = &oap->oap_loi->loi_read_lop;
2139
2140         if (oap->oap_async_flags & ASYNC_HP)
2141                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2142         else if (oap->oap_async_flags & ASYNC_URGENT)
2143                 list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2144         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2145         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2146 }
2147
2148 /* this must be called holding the loi list lock to give coverage to exit_cache,
2149  * async_flag maintenance, and oap_request */
2150 static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
2151                               struct osc_async_page *oap, int sent, int rc)
2152 {
2153         __u64 xid = 0;
2154
2155         ENTRY;
2156         if (oap->oap_request != NULL) {
2157                 xid = ptlrpc_req_xid(oap->oap_request);
2158                 ptlrpc_req_finished(oap->oap_request);
2159                 oap->oap_request = NULL;
2160         }
2161
2162         spin_lock(&oap->oap_lock);
2163         oap->oap_async_flags = 0;
2164         spin_unlock(&oap->oap_lock);
2165         oap->oap_interrupted = 0;
2166
2167         if (oap->oap_cmd & OBD_BRW_WRITE) {
2168                 osc_process_ar(&cli->cl_ar, xid, rc);
2169                 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
2170         }
2171
2172         if (rc == 0 && oa != NULL) {
2173                 if (oa->o_valid & OBD_MD_FLBLOCKS)
2174                         oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2175                 if (oa->o_valid & OBD_MD_FLMTIME)
2176                         oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2177                 if (oa->o_valid & OBD_MD_FLATIME)
2178                         oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2179                 if (oa->o_valid & OBD_MD_FLCTIME)
2180                         oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2181         }
2182
2183         if (oap->oap_oig) {
2184                 osc_exit_cache(cli, oap, sent);
2185                 oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
2186                 oap->oap_oig = NULL;
2187                 EXIT;
2188                 return;
2189         }
2190
2191         rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
2192                                                 oap->oap_cmd, oa, rc);
2193
2194         /* ll_ap_completion (from llite) drops PG_locked. so, a new
2195          * I/O on the page could start, but OSC calls it under lock
2196          * and thus we can add oap back to pending safely */
2197         if (rc)
2198                 /* upper layer wants to leave the page on pending queue */
2199                 osc_oap_to_pending(oap);
2200         else
2201                 osc_exit_cache(cli, oap, sent);
2202         EXIT;
2203 }
2204
2205 static int brw_interpret(struct ptlrpc_request *request, void *data, int rc)
2206 {
2207         struct osc_brw_async_args *aa = data;
2208         struct client_obd *cli;
2209         ENTRY;
2210
2211         rc = osc_brw_fini_request(request, rc);
2212         CDEBUG(D_INODE, "request %p aa %p rc %d\n", request, aa, rc);
2213
2214         if (osc_recoverable_error(rc)) {
2215                 rc = osc_brw_redo_request(request, aa);
2216                 if (rc == 0)
2217                         RETURN(0);
2218         }
2219
2220         cli = aa->aa_cli;
2221         client_obd_list_lock(&cli->cl_loi_list_lock);
2222         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2223          * is called so we know whether to go to sync BRWs or wait for more
2224          * RPCs to complete */
2225         if (lustre_msg_get_opc(request->rq_reqmsg) == OST_WRITE)
2226                 cli->cl_w_in_flight--;
2227         else
2228                 cli->cl_r_in_flight--;
2229
2230         if (!list_empty(&aa->aa_oaps)) { /* from osc_send_oap_rpc() */
2231                 struct osc_async_page *oap, *tmp;
2232                 /* the caller may re-use the oap after the completion call so
2233                  * we need to clean it up a little */
2234                 list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
2235                         list_del_init(&oap->oap_rpc_item);
2236                         osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
2237                 }
2238                 OBDO_FREE(aa->aa_oa);
2239         } else { /* from async_internal() */
2240                 obd_count i;
2241                 for (i = 0; i < aa->aa_page_count; i++)
2242                         osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
2243
2244                 if (aa->aa_oa->o_flags & OBD_FL_TEMPORARY)
2245                         OBDO_FREE(aa->aa_oa);
2246         }
2247         osc_wake_cache_waiters(cli);
2248         osc_check_rpcs(cli);
2249         client_obd_list_unlock(&cli->cl_loi_list_lock);
2250
2251         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2252
2253         RETURN(rc);
2254 }
2255
2256 static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
2257                                             struct list_head *rpc_list,
2258                                             int page_count, int cmd)
2259 {
2260         struct ptlrpc_request *req;
2261         struct brw_page **pga = NULL;
2262         struct osc_brw_async_args *aa;
2263         struct obdo *oa = NULL;
2264         struct obd_async_page_ops *ops = NULL;
2265         void *caller_data = NULL;
2266         struct osc_async_page *oap;
2267         struct ldlm_lock *lock = NULL;
2268         obd_valid valid;
2269         int i, rc;
2270
2271         ENTRY;
2272         LASSERT(!list_empty(rpc_list));
2273
2274         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2275         if (pga == NULL)
2276                 RETURN(ERR_PTR(-ENOMEM));
2277
2278         OBDO_ALLOC(oa);
2279         if (oa == NULL)
2280                 GOTO(out, req = ERR_PTR(-ENOMEM));
2281
2282         i = 0;
2283         list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2284                 if (ops == NULL) {
2285                         ops = oap->oap_caller_ops;
2286                         caller_data = oap->oap_caller_data;
2287                         lock = oap->oap_ldlm_lock;
2288                 }
2289                 pga[i] = &oap->oap_brw_page;
2290                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2291                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2292                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2293                 i++;
2294         }
2295
2296         /* always get the data for the obdo for the rpc */
2297         LASSERT(ops != NULL);
2298         ops->ap_fill_obdo(caller_data, cmd, oa);
2299         if (lock) {
2300                 oa->o_handle = lock->l_remote_handle;
2301                 oa->o_valid |= OBD_MD_FLHANDLE;
2302         }
2303
2304         sort_brw_pages(pga, page_count);
2305         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count, pga, &req, 0);
2306         if (rc != 0) {
2307                 CERROR("prep_req failed: %d\n", rc);
2308                 GOTO(out, req = ERR_PTR(rc));
2309         }
2310         oa = &((struct ost_body *)lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF,
2311                                                  sizeof(struct ost_body)))->oa;
2312
2313         /* Need to update the timestamps after the request is built in case
2314          * we race with setattr (locally or in queue at OST).  If OST gets
2315          * later setattr before earlier BRW (as determined by the request xid),
2316          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2317          * way to do this in a single call.  bug 10150 */
2318         if (pga[0]->flag & OBD_BRW_SRVLOCK) {
2319                 /* in case of lockless read/write do not use inode's
2320                  * timestamps because concurrent stat might fill the
2321                  * inode with out-of-date times, send current
2322                  * instead */
2323                 if (cmd & OBD_BRW_WRITE) {
2324                         oa->o_mtime = oa->o_ctime = LTIME_S(CURRENT_TIME);
2325                         oa->o_valid |= OBD_MD_FLMTIME | OBD_MD_FLCTIME;
2326                         valid = OBD_MD_FLATIME;
2327                 } else {
2328                         oa->o_atime = LTIME_S(CURRENT_TIME);
2329                         oa->o_valid |= OBD_MD_FLATIME;
2330                         valid = OBD_MD_FLMTIME | OBD_MD_FLCTIME;
2331                 }
2332         } else {
2333                 valid = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2334         }
2335         ops->ap_update_obdo(caller_data, cmd, oa, valid);
2336
2337         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2338         aa = ptlrpc_req_async_args(req);
2339         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2340         list_splice(rpc_list, &aa->aa_oaps);
2341         CFS_INIT_LIST_HEAD(rpc_list);
2342
2343 out:
2344         if (IS_ERR(req)) {
2345                 if (oa)
2346                         OBDO_FREE(oa);
2347                 if (pga)
2348                         OBD_FREE(pga, sizeof(*pga) * page_count);
2349         }
2350         RETURN(req);
2351 }
2352
2353 /* the loi lock is held across this function but it's allowed to release
2354  * and reacquire it during its work */
2355 /**
2356  * prepare pages for ASYNC io and put pages in send queue.
2357  *
2358  * \param cli -
2359  * \param loi -
2360  * \param cmd - OBD_BRW_* macroses
2361  * \param lop - pending pages
2362  *
2363  * \return zero if pages successfully add to send queue.
2364  * \return not zere if error occurring.
2365  */
2366 static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
2367                             int cmd, struct loi_oap_pages *lop)
2368 {
2369         struct ptlrpc_request *req;
2370         obd_count page_count = 0;
2371         struct osc_async_page *oap = NULL, *tmp;
2372         struct osc_brw_async_args *aa;
2373         struct obd_async_page_ops *ops;
2374         CFS_LIST_HEAD(rpc_list);
2375         unsigned int ending_offset;
2376         unsigned  starting_offset = 0;
2377         int srvlock = 0;
2378         ENTRY;
2379
2380         /* If there are HP OAPs we need to handle at least 1 of them,
2381          * move it the beginning of the pending list for that. */
2382         if (!list_empty(&lop->lop_urgent)) {
2383                 oap = list_entry(lop->lop_urgent.next,
2384                                  struct osc_async_page, oap_urgent_item);
2385                 if (oap->oap_async_flags & ASYNC_HP)
2386                         list_move(&oap->oap_pending_item, &lop->lop_pending);
2387         }
2388
2389         /* first we find the pages we're allowed to work with */
2390         list_for_each_entry_safe(oap, tmp, &lop->lop_pending, oap_pending_item){
2391                 ops = oap->oap_caller_ops;
2392
2393                 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
2394                          "magic 0x%x\n", oap, oap->oap_magic);
2395
2396                 if (page_count != 0 &&
2397                     srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2398                         CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2399                                " oap %p, page %p, srvlock %u\n",
2400                                oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2401                         break;
2402                 }
2403                 /* in llite being 'ready' equates to the page being locked
2404                  * until completion unlocks it.  commit_write submits a page
2405                  * as not ready because its unlock will happen unconditionally
2406                  * as the call returns.  if we race with commit_write giving
2407                  * us that page we dont' want to create a hole in the page
2408                  * stream, so we stop and leave the rpc to be fired by
2409                  * another dirtier or kupdated interval (the not ready page
2410                  * will still be on the dirty list).  we could call in
2411                  * at the end of ll_file_write to process the queue again. */
2412                 if (!(oap->oap_async_flags & ASYNC_READY)) {
2413                         int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
2414                         if (rc < 0)
2415                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
2416                                                 "instead of ready\n", oap,
2417                                                 oap->oap_page, rc);
2418                         switch (rc) {
2419                         case -EAGAIN:
2420                                 /* llite is telling us that the page is still
2421                                  * in commit_write and that we should try
2422                                  * and put it in an rpc again later.  we
2423                                  * break out of the loop so we don't create
2424                                  * a hole in the sequence of pages in the rpc
2425                                  * stream.*/
2426                                 oap = NULL;
2427                                 break;
2428                         case -EINTR:
2429                                 /* the io isn't needed.. tell the checks
2430                                  * below to complete the rpc with EINTR */
2431                                 spin_lock(&oap->oap_lock);
2432                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2433                                 spin_unlock(&oap->oap_lock);
2434                                 oap->oap_count = -EINTR;
2435                                 break;
2436                         case 0:
2437                                 spin_lock(&oap->oap_lock);
2438                                 oap->oap_async_flags |= ASYNC_READY;
2439                                 spin_unlock(&oap->oap_lock);
2440                                 break;
2441                         default:
2442                                 LASSERTF(0, "oap %p page %p returned %d "
2443                                             "from make_ready\n", oap,
2444                                             oap->oap_page, rc);
2445                                 break;
2446                         }
2447                 }
2448                 if (oap == NULL)
2449                         break;
2450                 /*
2451                  * Page submitted for IO has to be locked. Either by
2452                  * ->ap_make_ready() or by higher layers.
2453                  */
2454 #if defined(__KERNEL__) && defined(__linux__)
2455                  if(!(PageLocked(oap->oap_page) &&
2456                      (CheckWriteback(oap->oap_page, cmd) || oap->oap_oig !=NULL))) {
2457                         CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2458                                oap->oap_page, (long)oap->oap_page->flags, oap->oap_async_flags);
2459                         LBUG();
2460                 }
2461 #endif
2462                 /* If there is a gap at the start of this page, it can't merge
2463                  * with any previous page, so we'll hand the network a
2464                  * "fragmented" page array that it can't transfer in 1 RDMA */
2465                 if (page_count != 0 && oap->oap_page_off != 0)
2466                         break;
2467
2468                 /* take the page out of our book-keeping */
2469                 list_del_init(&oap->oap_pending_item);
2470                 lop_update_pending(cli, lop, cmd, -1);
2471                 list_del_init(&oap->oap_urgent_item);
2472
2473                 if (page_count == 0)
2474                         starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2475                                           (PTLRPC_MAX_BRW_SIZE - 1);
2476
2477                 /* ask the caller for the size of the io as the rpc leaves. */
2478                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
2479                         oap->oap_count =
2480                                 ops->ap_refresh_count(oap->oap_caller_data,cmd);
2481                 if (oap->oap_count <= 0) {
2482                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2483                                oap->oap_count);
2484                         osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
2485                         continue;
2486                 }
2487
2488                 /* now put the page back in our accounting */
2489                 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2490                 if (page_count == 0)
2491                         srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2492                 if (++page_count >= cli->cl_max_pages_per_rpc)
2493                         break;
2494
2495                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
2496                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2497                  * have the same alignment as the initial writes that allocated
2498                  * extents on the server. */
2499                 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2500                                  oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2501                 if (ending_offset == 0)
2502                         break;
2503
2504                 /* If there is a gap at the end of this page, it can't merge
2505                  * with any subsequent pages, so we'll hand the network a
2506                  * "fragmented" page array that it can't transfer in 1 RDMA */
2507                 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2508                         break;
2509         }
2510
2511         osc_wake_cache_waiters(cli);
2512
2513         if (page_count == 0)
2514                 RETURN(0);
2515
2516         loi_list_maint(cli, loi);
2517
2518         client_obd_list_unlock(&cli->cl_loi_list_lock);
2519
2520         req = osc_build_req(cli, &rpc_list, page_count, cmd);
2521         if (IS_ERR(req)) {
2522                 /* this should happen rarely and is pretty bad, it makes the
2523                  * pending list not follow the dirty order */
2524                 client_obd_list_lock(&cli->cl_loi_list_lock);
2525                 list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
2526                         list_del_init(&oap->oap_rpc_item);
2527
2528                         /* queued sync pages can be torn down while the pages
2529                          * were between the pending list and the rpc */
2530                         if (oap->oap_interrupted) {
2531                                 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2532                                 osc_ap_completion(cli, NULL, oap, 0,
2533                                                   oap->oap_count);
2534                                 continue;
2535                         }
2536                         osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
2537                 }
2538                 loi_list_maint(cli, loi);
2539                 RETURN(PTR_ERR(req));
2540         }
2541
2542         aa = ptlrpc_req_async_args(req);
2543         if (cmd == OBD_BRW_READ) {
2544                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2545                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2546                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2547                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2548         } else {
2549                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2550                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2551                                  cli->cl_w_in_flight);
2552                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2553                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2554         }
2555         ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2556
2557         client_obd_list_lock(&cli->cl_loi_list_lock);
2558
2559         if (cmd == OBD_BRW_READ)
2560                 cli->cl_r_in_flight++;
2561         else
2562                 cli->cl_w_in_flight++;
2563
2564         /* queued sync pages can be torn down while the pages
2565          * were between the pending list and the rpc */
2566         tmp = NULL;
2567         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2568                 /* only one oap gets a request reference */
2569                 if (tmp == NULL)
2570                         tmp = oap;
2571                 if (oap->oap_interrupted && !req->rq_intr) {
2572                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2573                                oap, req);
2574                         ptlrpc_mark_interrupted(req);
2575                 }
2576         }
2577         if (tmp != NULL)
2578                 tmp->oap_request = ptlrpc_request_addref(req);
2579
2580         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2581                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2582
2583         req->rq_interpret_reply = brw_interpret;
2584         ptlrpcd_add_req(req);
2585         RETURN(1);
2586 }
2587
2588 #define LOI_DEBUG(LOI, STR, args...)                                     \
2589         CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
2590                !list_empty(&(LOI)->loi_ready_item) ||                    \
2591                !list_empty(&(LOI)->loi_hp_ready_item),                   \
2592                (LOI)->loi_write_lop.lop_num_pending,                     \
2593                !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
2594                (LOI)->loi_read_lop.lop_num_pending,                      \
2595                !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
2596                args)                                                     \
2597
2598 /* This is called by osc_check_rpcs() to find which objects have pages that
2599  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2600 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2601 {
2602         ENTRY;
2603         /* First return objects that have blocked locks so that they
2604          * will be flushed quickly and other clients can get the lock,
2605          * then objects which have pages ready to be stuffed into RPCs */
2606         if (!list_empty(&cli->cl_loi_hp_ready_list))
2607                 RETURN(list_entry(cli->cl_loi_hp_ready_list.next,
2608                                   struct lov_oinfo, loi_hp_ready_item));
2609         if (!list_empty(&cli->cl_loi_ready_list))
2610                 RETURN(list_entry(cli->cl_loi_ready_list.next,
2611                                   struct lov_oinfo, loi_ready_item));
2612
2613         /* then if we have cache waiters, return all objects with queued
2614          * writes.  This is especially important when many small files
2615          * have filled up the cache and not been fired into rpcs because
2616          * they don't pass the nr_pending/object threshhold */
2617         if (!list_empty(&cli->cl_cache_waiters) &&
2618             !list_empty(&cli->cl_loi_write_list))
2619                 RETURN(list_entry(cli->cl_loi_write_list.next,
2620                                   struct lov_oinfo, loi_write_item));
2621
2622         /* then return all queued objects when we have an invalid import
2623          * so that they get flushed */
2624         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2625                 if (!list_empty(&cli->cl_loi_write_list))
2626                         RETURN(list_entry(cli->cl_loi_write_list.next,
2627                                           struct lov_oinfo, loi_write_item));
2628                 if (!list_empty(&cli->cl_loi_read_list))
2629                         RETURN(list_entry(cli->cl_loi_read_list.next,
2630                                           struct lov_oinfo, loi_read_item));
2631         }
2632         RETURN(NULL);
2633 }
2634
2635 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2636 {
2637         struct osc_async_page *oap;
2638         int hprpc = 0;
2639
2640         if (!list_empty(&loi->loi_write_lop.lop_urgent)) {
2641                 oap = list_entry(loi->loi_write_lop.lop_urgent.next,
2642                                  struct osc_async_page, oap_urgent_item);
2643                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2644         }
2645
2646         if (!hprpc && !list_empty(&loi->loi_read_lop.lop_urgent)) {
2647                 oap = list_entry(loi->loi_read_lop.lop_urgent.next,
2648                                  struct osc_async_page, oap_urgent_item);
2649                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2650         }
2651
2652         return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
2653 }
2654
2655 /* called with the loi list lock held */
2656 static void osc_check_rpcs(struct client_obd *cli)
2657 {
2658         struct lov_oinfo *loi;
2659         int rc = 0, race_counter = 0;
2660         ENTRY;
2661
2662         while ((loi = osc_next_loi(cli)) != NULL) {
2663                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2664
2665                 if (osc_max_rpc_in_flight(cli, loi))
2666                         break;
2667
2668                 /* attempt some read/write balancing by alternating between
2669                  * reads and writes in an object.  The makes_rpc checks here
2670                  * would be redundant if we were getting read/write work items
2671                  * instead of objects.  we don't want send_oap_rpc to drain a
2672                  * partial read pending queue when we're given this object to
2673                  * do io on writes while there are cache waiters */
2674                 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2675                         rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
2676                                               &loi->loi_write_lop);
2677                         if (rc < 0)
2678                                 break;
2679                         if (rc > 0)
2680                                 race_counter = 0;
2681                         else
2682                                 race_counter++;
2683                 }
2684                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2685                         rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
2686                                               &loi->loi_read_lop);
2687                         if (rc < 0)
2688                                 break;
2689                         if (rc > 0)
2690                                 race_counter = 0;
2691                         else
2692                                 race_counter++;
2693                 }
2694
2695                 /* attempt some inter-object balancing by issueing rpcs
2696                  * for each object in turn */
2697                 if (!list_empty(&loi->loi_hp_ready_item))
2698                         list_del_init(&loi->loi_hp_ready_item);
2699                 if (!list_empty(&loi->loi_ready_item))
2700                         list_del_init(&loi->loi_ready_item);
2701                 if (!list_empty(&loi->loi_write_item))
2702                         list_del_init(&loi->loi_write_item);
2703                 if (!list_empty(&loi->loi_read_item))
2704                         list_del_init(&loi->loi_read_item);
2705
2706                 loi_list_maint(cli, loi);
2707
2708                 /* send_oap_rpc fails with 0 when make_ready tells it to
2709                  * back off.  llite's make_ready does this when it tries
2710                  * to lock a page queued for write that is already locked.
2711                  * we want to try sending rpcs from many objects, but we
2712                  * don't want to spin failing with 0.  */
2713                 if (race_counter == 10)
2714                         break;
2715         }
2716         EXIT;
2717 }
2718
2719 /* we're trying to queue a page in the osc so we're subject to the
2720  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2721  * If the osc's queued pages are already at that limit, then we want to sleep
2722  * until there is space in the osc's queue for us.  We also may be waiting for
2723  * write credits from the OST if there are RPCs in flight that may return some
2724  * before we fall back to sync writes.
2725  *
2726  * We need this know our allocation was granted in the presence of signals */
2727 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2728 {
2729         int rc;
2730         ENTRY;
2731         client_obd_list_lock(&cli->cl_loi_list_lock);
2732         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2733         client_obd_list_unlock(&cli->cl_loi_list_lock);
2734         RETURN(rc);
2735 };
2736
2737 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2738  * grant or cache space. */
2739 static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
2740                            struct osc_async_page *oap)
2741 {
2742         struct osc_cache_waiter ocw;
2743         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2744         ENTRY;
2745
2746         CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2747                "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2748                cli->cl_dirty_max, obd_max_dirty_pages,
2749                cli->cl_lost_grant, cli->cl_avail_grant);
2750
2751         /* force the caller to try sync io.  this can jump the list
2752          * of queued writes and create a discontiguous rpc stream */
2753         if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2754             loi->loi_ar.ar_force_sync)
2755                 RETURN(-EDQUOT);
2756
2757         /* Hopefully normal case - cache space and write credits available */
2758         if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
2759             (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
2760             (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
2761                 /* account for ourselves */
2762                 osc_consume_write_grant(cli, &oap->oap_brw_page);
2763                 RETURN(0);
2764         }
2765
2766         /* It is safe to block as a cache waiter as long as there is grant
2767          * space available or the hope of additional grant being returned
2768          * when an in flight write completes.  Using the write back cache
2769          * if possible is preferable to sending the data synchronously
2770          * because write pages can then be merged in to large requests.
2771          * The addition of this cache waiter will causing pending write
2772          * pages to be sent immediately. */
2773         if (cli->cl_w_in_flight || cli->cl_avail_grant >= CFS_PAGE_SIZE) {
2774                 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2775                 cfs_waitq_init(&ocw.ocw_waitq);
2776                 ocw.ocw_oap = oap;
2777                 ocw.ocw_rc = 0;
2778
2779                 loi_list_maint(cli, loi);
2780                 osc_check_rpcs(cli);
2781                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2782
2783                 CDEBUG(D_CACHE, "sleeping for cache space\n");
2784                 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2785
2786                 client_obd_list_lock(&cli->cl_loi_list_lock);
2787                 if (!list_empty(&ocw.ocw_entry)) {
2788                         list_del(&ocw.ocw_entry);
2789                         RETURN(-EINTR);
2790                 }
2791                 RETURN(ocw.ocw_rc);
2792         }
2793
2794         RETURN(-EDQUOT);
2795 }
2796
2797 static int osc_get_lock(struct obd_export *exp, struct lov_stripe_md *lsm,
2798                         void **res, int rw, obd_off start, obd_off end,
2799                         struct lustre_handle *lockh, int flags)
2800 {
2801         struct ldlm_lock *lock = NULL;
2802         int rc, release = 0;
2803
2804         ENTRY;
2805
2806         if (lockh && lustre_handle_is_used(lockh)) {
2807                 /* if a valid lockh is passed, just check that the corresponding
2808                  * lock covers the extent */
2809                 lock = ldlm_handle2lock(lockh);
2810                 release = 1;
2811         } else {
2812                 struct osc_async_page *oap = *res;
2813                 spin_lock(&oap->oap_lock);
2814                 lock = oap->oap_ldlm_lock;
2815                 if (likely(lock))
2816                         LDLM_LOCK_GET(lock);
2817                 spin_unlock(&oap->oap_lock);
2818         }
2819         /* lock can be NULL in case race obd_get_lock vs lock cancel
2820          * so we should be don't try match this */
2821         if (unlikely(!lock))
2822                 return 0;
2823
2824         rc = ldlm_lock_fast_match(lock, rw, start, end, lockh);
2825         if (release == 1 && rc == 1)
2826                 /* if a valid lockh was passed, we just need to check
2827                  * that the lock covers the page, no reference should be
2828                  * taken*/
2829                 ldlm_lock_decref(lockh,
2830                                  rw == OBD_BRW_WRITE ? LCK_PW : LCK_PR);
2831         LDLM_LOCK_PUT(lock);
2832         RETURN(rc);
2833 }
2834
2835 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2836                         struct lov_oinfo *loi, cfs_page_t *page,
2837                         obd_off offset, struct obd_async_page_ops *ops,
2838                         void *data, void **res, int flags,
2839                         struct lustre_handle *lockh)
2840 {
2841         struct osc_async_page *oap;
2842         struct ldlm_res_id oid = {{0}};
2843         int rc = 0;
2844
2845         ENTRY;
2846
2847         if (!page)
2848                 return size_round(sizeof(*oap));
2849
2850         oap = *res;
2851         oap->oap_magic = OAP_MAGIC;
2852         oap->oap_cli = &exp->exp_obd->u.cli;
2853         oap->oap_loi = loi;
2854
2855         oap->oap_caller_ops = ops;
2856         oap->oap_caller_data = data;
2857
2858         oap->oap_page = page;
2859         oap->oap_obj_off = offset;
2860
2861         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2862         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2863         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2864         CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2865
2866         oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2867
2868         spin_lock_init(&oap->oap_lock);
2869
2870         /* If the page was marked as notcacheable - don't add to any locks */
2871         if (!(flags & OBD_PAGE_NO_CACHE)) {
2872                 osc_build_res_name(loi->loi_id, loi->loi_gr, &oid);
2873                 /* This is the only place where we can call cache_add_extent
2874                    without oap_lock, because this page is locked now, and
2875                    the lock we are adding it to is referenced, so cannot lose
2876                    any pages either. */
2877                 rc = cache_add_extent(oap->oap_cli->cl_cache, &oid, oap, lockh);
2878                 if (rc)
2879                         RETURN(rc);
2880         }
2881
2882         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2883         RETURN(0);
2884 }
2885
2886 struct osc_async_page *oap_from_cookie(void *cookie)
2887 {
2888         struct osc_async_page *oap = cookie;
2889         if (oap->oap_magic != OAP_MAGIC)
2890                 return ERR_PTR(-EINVAL);
2891         return oap;
2892 };
2893
2894 static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2895                               struct lov_oinfo *loi, void *cookie,
2896                               int cmd, obd_off off, int count,
2897                               obd_flag brw_flags, enum async_flags async_flags)
2898 {
2899         struct client_obd *cli = &exp->exp_obd->u.cli;
2900         struct osc_async_page *oap;
2901         int rc = 0;
2902         ENTRY;
2903
2904         oap = oap_from_cookie(cookie);
2905         if (IS_ERR(oap))
2906                 RETURN(PTR_ERR(oap));
2907
2908         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2909                 RETURN(-EIO);
2910
2911         if (!list_empty(&oap->oap_pending_item) ||
2912             !list_empty(&oap->oap_urgent_item) ||
2913             !list_empty(&oap->oap_rpc_item))
2914                 RETURN(-EBUSY);
2915
2916         /* check if the file's owner/group is over quota */
2917         if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
2918                 struct obd_async_page_ops *ops;
2919                 struct obdo *oa;
2920
2921                 OBDO_ALLOC(oa);
2922                 if (oa == NULL)
2923                         RETURN(-ENOMEM);
2924
2925                 ops = oap->oap_caller_ops;
2926                 ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
2927                 if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
2928                     NO_QUOTA)
2929                         rc = -EDQUOT;
2930
2931                 OBDO_FREE(oa);
2932                 if (rc)
2933                         RETURN(rc);
2934         }
2935
2936         if (loi == NULL)
2937                 loi = lsm->lsm_oinfo[0];
2938
2939         client_obd_list_lock(&cli->cl_loi_list_lock);
2940
2941         oap->oap_cmd = cmd;
2942         oap->oap_page_off = off;
2943         oap->oap_count = count;
2944         oap->oap_brw_flags = brw_flags;
2945         /* Give a hint to OST that requests are coming from kswapd - bug19529 */
2946         if (libcfs_memory_pressure_get())
2947                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2948         spin_lock(&oap->oap_lock);
2949         oap->oap_async_flags = async_flags;
2950         spin_unlock(&oap->oap_lock);
2951
2952         if (cmd & OBD_BRW_WRITE) {
2953                 rc = osc_enter_cache(cli, loi, oap);
2954                 if (rc) {
2955                         client_obd_list_unlock(&cli->cl_loi_list_lock);
2956                         RETURN(rc);
2957                 }
2958         }
2959
2960         osc_oap_to_pending(oap);
2961         loi_list_maint(cli, loi);
2962
2963         LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2964                   cmd);
2965
2966         osc_check_rpcs(cli);
2967         client_obd_list_unlock(&cli->cl_loi_list_lock);
2968
2969         RETURN(0);
2970 }
2971
2972 /* aka (~was & now & flag), but this is more clear :) */
2973 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
2974
2975 static int osc_set_async_flags(struct obd_export *exp,
2976                                struct lov_stripe_md *lsm,
2977                                struct lov_oinfo *loi, void *cookie,
2978                                obd_flag async_flags)
2979 {
2980         struct client_obd *cli = &exp->exp_obd->u.cli;
2981         struct loi_oap_pages *lop;
2982         struct osc_async_page *oap;
2983         int rc = 0;
2984         ENTRY;
2985
2986         oap = oap_from_cookie(cookie);
2987         if (IS_ERR(oap))
2988                 RETURN(PTR_ERR(oap));
2989
2990         /*
2991          * bug 7311: OST-side locking is only supported for liblustre for now
2992          * (and liblustre never calls obd_set_async_flags(). I hope.), generic
2993          * implementation has to handle case where OST-locked page was picked
2994          * up by, e.g., ->writepage().
2995          */
2996         LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
2997         LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
2998                                      * tread here. */
2999
3000         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
3001                 RETURN(-EIO);
3002
3003         if (loi == NULL)
3004                 loi = lsm->lsm_oinfo[0];
3005
3006         if (oap->oap_cmd & OBD_BRW_WRITE) {
3007                 lop = &loi->loi_write_lop;
3008         } else {
3009                 lop = &loi->loi_read_lop;
3010         }
3011
3012         client_obd_list_lock(&cli->cl_loi_list_lock);
3013         /* oap_lock provides atomic semantics of oap_async_flags access */
3014         spin_lock(&oap->oap_lock);
3015         if (list_empty(&oap->oap_pending_item))
3016                 GOTO(out, rc = -EINVAL);
3017
3018         if ((oap->oap_async_flags & async_flags) == async_flags)
3019                 GOTO(out, rc = 0);
3020
3021         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
3022                 oap->oap_async_flags |= ASYNC_READY;
3023
3024         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
3025             list_empty(&oap->oap_rpc_item)) {
3026                 if (oap->oap_async_flags & ASYNC_HP)
3027                         list_add(&oap->oap_urgent_item, &lop->lop_urgent);
3028                 else
3029                         list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
3030                 oap->oap_async_flags |= ASYNC_URGENT;
3031                 loi_list_maint(cli, loi);
3032         }
3033
3034         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
3035                         oap->oap_async_flags);
3036 out:
3037         spin_unlock(&oap->oap_lock);
3038         osc_check_rpcs(cli);
3039         client_obd_list_unlock(&cli->cl_loi_list_lock);
3040         RETURN(rc);
3041 }
3042
3043 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
3044                              struct lov_oinfo *loi,
3045                              struct obd_io_group *oig, void *cookie,
3046                              int cmd, obd_off off, int count,
3047                              obd_flag brw_flags,
3048                              obd_flag async_flags)
3049 {
3050         struct client_obd *cli = &exp->exp_obd->u.cli;
3051         struct osc_async_page *oap;
3052         struct loi_oap_pages *lop;
3053         int rc = 0;
3054         ENTRY;
3055
3056         oap = oap_from_cookie(cookie);
3057         if (IS_ERR(oap))
3058                 RETURN(PTR_ERR(oap));
3059
3060         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
3061                 RETURN(-EIO);
3062
3063         if (!list_empty(&oap->oap_pending_item) ||
3064             !list_empty(&oap->oap_urgent_item) ||
3065             !list_empty(&oap->oap_rpc_item))
3066                 RETURN(-EBUSY);
3067
3068         if (loi == NULL)
3069                 loi = lsm->lsm_oinfo[0];
3070
3071         client_obd_list_lock(&cli->cl_loi_list_lock);
3072
3073         oap->oap_cmd = cmd;
3074         oap->oap_page_off = off;
3075         oap->oap_count = count;
3076         oap->oap_brw_flags = brw_flags;
3077         /* Give a hint to OST that requests are coming from kswapd - bug19529 */
3078         if (libcfs_memory_pressure_get())
3079                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
3080         spin_lock(&oap->oap_lock);
3081         oap->oap_async_flags = async_flags;
3082         spin_unlock(&oap->oap_lock);
3083
3084         if (cmd & OBD_BRW_WRITE)
3085                 lop = &loi->loi_write_lop;
3086         else
3087                 lop = &loi->loi_read_lop;
3088
3089         list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
3090         if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
3091                 oap->oap_oig = oig;
3092                 rc = oig_add_one(oig, &oap->oap_occ);
3093         }
3094
3095         LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
3096                   oap, oap->oap_page, rc);
3097
3098         client_obd_list_unlock(&cli->cl_loi_list_lock);
3099
3100         RETURN(rc);
3101 }
3102
3103 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
3104                                  struct loi_oap_pages *lop, int cmd)
3105 {
3106         struct list_head *pos, *tmp;
3107         struct osc_async_page *oap;
3108
3109         list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
3110                 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
3111                 list_del(&oap->oap_pending_item);
3112                 osc_oap_to_pending(oap);
3113         }
3114         loi_list_maint(cli, loi);
3115 }
3116
3117 static int osc_trigger_group_io(struct obd_export *exp,
3118                                 struct lov_stripe_md *lsm,
3119                                 struct lov_oinfo *loi,
3120                                 struct obd_io_group *oig)
3121 {
3122         struct client_obd *cli = &exp->exp_obd->u.cli;
3123         ENTRY;
3124
3125         if (loi == NULL)
3126                 loi = lsm->lsm_oinfo[0];
3127
3128         client_obd_list_lock(&cli->cl_loi_list_lock);
3129
3130         osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
3131         osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
3132
3133         osc_check_rpcs(cli);
3134         client_obd_list_unlock(&cli->cl_loi_list_lock);
3135
3136         RETURN(0);
3137 }
3138
3139 static int osc_teardown_async_page(struct obd_export *exp,
3140                                    struct lov_stripe_md *lsm,
3141                                    struct lov_oinfo *loi, void *cookie)
3142 {
3143         struct client_obd *cli = &exp->exp_obd->u.cli;
3144         struct loi_oap_pages *lop;
3145         struct osc_async_page *oap;
3146         int rc = 0;
3147         ENTRY;
3148
3149         oap = oap_from_cookie(cookie);
3150         if (IS_ERR(oap))
3151                 RETURN(PTR_ERR(oap));
3152
3153         if (loi == NULL)
3154                 loi = lsm->lsm_oinfo[0];
3155
3156         if (oap->oap_cmd & OBD_BRW_WRITE) {
3157                 lop = &loi->loi_write_lop;
3158         } else {
3159                 lop = &loi->loi_read_lop;
3160         }
3161
3162         client_obd_list_lock(&cli->cl_loi_list_lock);
3163
3164         if (!list_empty(&oap->oap_rpc_item))
3165                 GOTO(out, rc = -EBUSY);
3166
3167         osc_exit_cache(cli, oap, 0);
3168         osc_wake_cache_waiters(cli);
3169
3170         if (!list_empty(&oap->oap_urgent_item)) {
3171                 list_del_init(&oap->oap_urgent_item);
3172                 spin_lock(&oap->oap_lock);
3173                 oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
3174                 spin_unlock(&oap->oap_lock);
3175         }
3176
3177         if (!list_empty(&oap->oap_pending_item)) {
3178                 list_del_init(&oap->oap_pending_item);
3179                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
3180         }
3181         loi_list_maint(cli, loi);
3182         cache_remove_extent(cli->cl_cache, oap);
3183
3184         LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
3185 out:
3186         client_obd_list_unlock(&cli->cl_loi_list_lock);
3187         RETURN(rc);
3188 }
3189
3190 int osc_extent_blocking_cb(struct ldlm_lock *lock,
3191                            struct ldlm_lock_desc *new, void *data,
3192                            int flag)
3193 {
3194         struct lustre_handle lockh = { 0 };
3195         int rc;
3196         ENTRY;
3197
3198         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
3199                 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
3200                 LBUG();
3201         }
3202
3203         switch (flag) {
3204         case LDLM_CB_BLOCKING:
3205                 ldlm_lock2handle(lock, &lockh);
3206                 rc = ldlm_cli_cancel(&lockh);
3207                 if (rc != ELDLM_OK)
3208                         CERROR("ldlm_cli_cancel failed: %d\n", rc);
3209                 break;
3210         case LDLM_CB_CANCELING: {
3211
3212                 ldlm_lock2handle(lock, &lockh);
3213                 /* This lock wasn't granted, don't try to do anything */
3214                 if (lock->l_req_mode != lock->l_granted_mode)
3215                         RETURN(0);
3216
3217                 cache_remove_lock(lock->l_conn_export->exp_obd->u.cli.cl_cache,
3218                                   &lockh);
3219
3220                 if (lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb)
3221                         lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb(
3222                                                           lock, new, data,flag);
3223                 break;
3224         }
3225         default:
3226                 LBUG();
3227         }
3228
3229         RETURN(0);
3230 }
3231 EXPORT_SYMBOL(osc_extent_blocking_cb);
3232
3233 static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
3234                                     int flags)
3235 {
3236         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
3237
3238         if (lock == NULL) {
3239                 CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
3240                 return;
3241         }
3242         lock_res_and_lock(lock);
3243 #if defined (__KERNEL__) && defined (__linux__)
3244         /* Liang XXX: Darwin and Winnt checking should be added */
3245         if (lock->l_ast_data && lock->l_ast_data != data) {
3246                 struct inode *new_inode = data;
3247                 struct inode *old_inode = lock->l_ast_data;
3248                 if (!(old_inode->i_state & I_FREEING))
3249                         LDLM_ERROR(lock, "inconsistent l_ast_data found");
3250                 LASSERTF(old_inode->i_state & I_FREEING,
3251                          "Found existing inode %p/%lu/%u state %lu in lock: "
3252                          "setting data to %p/%lu/%u\n", old_inode,
3253                          old_inode->i_ino, old_inode->i_generation,
3254                          old_inode->i_state,
3255                          new_inode, new_inode->i_ino, new_inode->i_generation);
3256         }
3257 #endif
3258         lock->l_ast_data = data;
3259         lock->l_flags |= (flags & LDLM_FL_NO_LRU);
3260         unlock_res_and_lock(lock);
3261         LDLM_LOCK_PUT(lock);
3262 }
3263
3264 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3265                              ldlm_iterator_t replace, void *data)
3266 {
3267         struct ldlm_res_id res_id;
3268         struct obd_device *obd = class_exp2obd(exp);
3269
3270         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
3271         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3272         return 0;
3273 }
3274
3275 static int osc_enqueue_fini(struct obd_device *obd, struct ptlrpc_request *req,
3276                             struct obd_info *oinfo, int intent, int rc)
3277 {
3278         ENTRY;
3279
3280         if (intent) {
3281                 /* The request was created before ldlm_cli_enqueue call. */
3282                 if (rc == ELDLM_LOCK_ABORTED) {
3283                         struct ldlm_reply *rep;
3284
3285                         /* swabbed by ldlm_cli_enqueue() */
3286                         LASSERT(lustre_rep_swabbed(req, DLM_LOCKREPLY_OFF));
3287                         rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
3288                                              sizeof(*rep));
3289                         LASSERT(rep != NULL);
3290                         if (rep->lock_policy_res1)
3291                                 rc = rep->lock_policy_res1;
3292                 }
3293         }
3294
3295         if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
3296                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
3297                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
3298                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
3299                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
3300         }
3301
3302         if (!rc)
3303                 cache_add_lock(obd->u.cli.cl_cache, oinfo->oi_lockh);
3304
3305         /* Call the update callback. */
3306         rc = oinfo->oi_cb_up(oinfo, rc);
3307         RETURN(rc);
3308 }
3309
3310 static int osc_enqueue_interpret(struct ptlrpc_request *req,
3311                                  void *data, int rc)
3312 {
3313         struct osc_enqueue_args *aa = data;
3314         int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
3315         struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
3316         struct ldlm_lock *lock;
3317
3318         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
3319          * be valid. */
3320         lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);
3321
3322         /* Complete obtaining the lock procedure. */
3323         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
3324                                    aa->oa_ei->ei_mode,
3325                                    &aa->oa_oi->oi_flags,
3326                                    &lsm->lsm_oinfo[0]->loi_lvb,
3327                                    sizeof(lsm->lsm_oinfo[0]->loi_lvb),
3328                                    lustre_swab_ost_lvb,
3329                                    aa->oa_oi->oi_lockh, rc);
3330
3331         /* Complete osc stuff. */
3332         rc = osc_enqueue_fini(aa->oa_exp->exp_obd, req, aa->oa_oi, intent, rc);
3333
3334         /* Release the lock for async request. */
3335         if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
3336                 ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);
3337
3338         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
3339                  aa->oa_oi->oi_lockh, req, aa);
3340         LDLM_LOCK_PUT(lock);
3341         return rc;
3342 }
3343
3344 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
3345  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
3346  * other synchronous requests, however keeping some locks and trying to obtain
3347  * others may take a considerable amount of time in a case of ost failure; and
3348  * when other sync requests do not get released lock from a client, the client
3349  * is excluded from the cluster -- such scenarious make the life difficult, so
3350  * release locks just after they are obtained. */
3351 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3352                        struct ldlm_enqueue_info *einfo,
3353                        struct ptlrpc_request_set *rqset)
3354 {
3355         struct ldlm_res_id res_id;
3356         struct obd_device *obd = exp->exp_obd;
3357         struct ldlm_reply *rep;
3358         struct ptlrpc_request *req = NULL;
3359         int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
3360         ldlm_mode_t mode;
3361         int rc;
3362         ENTRY;
3363
3364         osc_build_res_name(oinfo->oi_md->lsm_object_id,
3365                            oinfo->oi_md->lsm_object_gr, &res_id);
3366         /* Filesystem lock extents are extended to page boundaries so that
3367          * dealing with the page cache is a little smoother.  */
3368         oinfo->oi_policy.l_extent.start -=
3369                 oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
3370         oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;
3371
3372         if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
3373                 goto no_match;
3374
3375         /* Next, search for already existing extent locks that will cover us */
3376         /* If we're trying to read, we also search for an existing PW lock.  The
3377          * VFS and page cache already protect us locally, so lots of readers/
3378          * writers can share a single PW lock.
3379          *
3380          * There are problems with conversion deadlocks, so instead of
3381          * converting a read lock to a write lock, we'll just enqueue a new
3382          * one.
3383          *
3384          * At some point we should cancel the read lock instead of making them
3385          * send us a blocking callback, but there are problems with canceling
3386          * locks out from other users right now, too. */
3387         mode = einfo->ei_mode;
3388         if (einfo->ei_mode == LCK_PR)
3389                 mode |= LCK_PW;
3390         mode = ldlm_lock_match(obd->obd_namespace,
3391                                oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
3392                                einfo->ei_type, &oinfo->oi_policy, mode,
3393                                oinfo->oi_lockh);
3394         if (mode) {
3395                 /* addref the lock only if not async requests and PW lock is
3396                  * matched whereas we asked for PR. */
3397                 if (!rqset && einfo->ei_mode != mode)
3398                         ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
3399                 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
3400                                         oinfo->oi_flags);
3401                 if (intent) {
3402                         /* I would like to be able to ASSERT here that rss <=
3403                          * kms, but I can't, for reasons which are explained in
3404                          * lov_enqueue() */
3405                 }
3406
3407                 /* We already have a lock, and it's referenced */
3408                 oinfo->oi_cb_up(oinfo, ELDLM_OK);
3409
3410                 /* For async requests, decref the lock. */
3411                 if (einfo->ei_mode != mode)
3412                         ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
3413                 else if (rqset)
3414                         ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
3415
3416                 RETURN(ELDLM_OK);
3417         }
3418
3419  no_match:
3420         if (intent) {
3421                 __u32 size[3] = {
3422                         [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
3423                         [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request),
3424                         [DLM_LOCKREQ_OFF + 1] = 0 };
3425
3426                 req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
3427                 if (req == NULL)
3428                         RETURN(-ENOMEM);
3429
3430                 size[DLM_LOCKREPLY_OFF] = sizeof(*rep);
3431                 size[DLM_REPLY_REC_OFF] =
3432                         sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb);
3433                 ptlrpc_req_set_repsize(req, 3, size);
3434         }
3435
3436         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3437         oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;
3438
3439         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id,
3440                               &oinfo->oi_policy, &oinfo->oi_flags,
3441                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3442                               sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
3443                               lustre_swab_ost_lvb, oinfo->oi_lockh,
3444                               rqset ? 1 : 0);
3445         if (rqset) {
3446                 if (!rc) {
3447                         struct osc_enqueue_args *aa;
3448                         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3449                         aa = ptlrpc_req_async_args(req);
3450                         aa->oa_oi = oinfo;
3451                         aa->oa_ei = einfo;
3452                         aa->oa_exp = exp;
3453
3454                         req->rq_interpret_reply = osc_enqueue_interpret;
3455                         ptlrpc_set_add_req(rqset, req);
3456                 } else if (intent) {
3457                         ptlrpc_req_finished(req);
3458                 }
3459                 RETURN(rc);
3460         }
3461
3462         rc = osc_enqueue_fini(obd, req, oinfo, intent, rc);
3463         if (intent)
3464                 ptlrpc_req_finished(req);
3465
3466         RETURN(rc);
3467 }
3468
3469 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
3470                      __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3471                      int *flags, void *data, struct lustre_handle *lockh,
3472                      int *n_matches)
3473 {
3474         struct ldlm_res_id res_id;
3475         struct obd_device *obd = exp->exp_obd;
3476         int lflags = *flags;
3477         ldlm_mode_t rc;
3478         ENTRY;
3479
3480         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
3481
3482         OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);
3483
3484         /* Filesystem lock extents are extended to page boundaries so that
3485          * dealing with the page cache is a little smoother */
3486         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3487         policy->l_extent.end |= ~CFS_PAGE_MASK;
3488
3489         /* Next, search for already existing extent locks that will cover us */
3490         /* If we're trying to read, we also search for an existing PW lock.  The
3491          * VFS and page cache already protect us locally, so lots of readers/
3492          * writers can share a single PW lock. */
3493         rc = mode;
3494         if (mode == LCK_PR)
3495                 rc |= LCK_PW;
3496         rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3497                              &res_id, type, policy, rc, lockh);
3498         if (rc) {
3499                 osc_set_data_with_check(lockh, data, lflags);
3500                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3501                         ldlm_lock_addref(lockh, LCK_PR);
3502                         ldlm_lock_decref(lockh, LCK_PW);
3503                 }
3504                 if (n_matches != NULL)
3505                         (*n_matches)++;
3506         }
3507
3508         RETURN(rc);
3509 }
3510
3511 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3512                       __u32 mode, struct lustre_handle *lockh, int flags,
3513                       obd_off end)
3514 {
3515         ENTRY;
3516
3517         if (unlikely(mode == LCK_GROUP))
3518                 ldlm_lock_decref_and_cancel(lockh, mode);
3519         else
3520                 ldlm_lock_decref(lockh, mode);
3521
3522         RETURN(0);
3523 }
3524
3525 static int osc_cancel_unused(struct obd_export *exp,
3526                              struct lov_stripe_md *lsm, int flags, void *opaque)
3527 {
3528         struct obd_device *obd = class_exp2obd(exp);
3529         struct ldlm_res_id res_id, *resp = NULL;
3530
3531         if (lsm != NULL) {
3532                 resp = osc_build_res_name(lsm->lsm_object_id,
3533                                           lsm->lsm_object_gr, &res_id);
3534         }
3535
3536         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3537
3538 }
3539
3540 static int osc_join_lru(struct obd_export *exp,
3541                         struct lov_stripe_md *lsm, int join)
3542 {
3543         struct obd_device *obd = class_exp2obd(exp);
3544         struct ldlm_res_id res_id, *resp = NULL;
3545
3546         if (lsm != NULL) {
3547                 resp = osc_build_res_name(lsm->lsm_object_id,
3548                                           lsm->lsm_object_gr, &res_id);
3549         }
3550
3551         return ldlm_cli_join_lru(obd->obd_namespace, resp, join);
3552
3553 }
3554
3555 static int osc_statfs_interpret(struct ptlrpc_request *req,
3556                                 void *data, int rc)
3557 {
3558         struct osc_async_args *aa = data;
3559         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
3560         struct obd_statfs *msfs;
3561         __u64 used;
3562         ENTRY;
3563
3564         if (rc == -EBADR)
3565                 /* The request has in fact never been sent
3566                  * due to issues at a higher level (LOV).
3567                  * Exit immediately since the caller is
3568                  * aware of the problem and takes care
3569                  * of the clean up */
3570                  RETURN(rc);
3571
3572         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
3573             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
3574                 GOTO(out, rc = 0);
3575
3576         if (rc != 0)
3577                 GOTO(out, rc);
3578
3579         msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3580                                   lustre_swab_obd_statfs);
3581         if (msfs == NULL) {
3582                 CERROR("Can't unpack obd_statfs\n");
3583                 GOTO(out, rc = -EPROTO);
3584         }
3585
3586         /* Reinitialize the RDONLY and DEGRADED flags at the client
3587          * on each statfs, so they don't stay set permanently. */
3588         spin_lock(&cli->cl_oscc.oscc_lock);
3589
3590         if (unlikely(msfs->os_state & OS_STATE_DEGRADED))
3591                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED;
3592         else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_DEGRADED))
3593                 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_DEGRADED;
3594
3595         if (unlikely(msfs->os_state & OS_STATE_READONLY))
3596                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY;
3597         else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_RDONLY))
3598                 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_RDONLY;
3599
3600         /* Add a bit of hysteresis so this flag isn't continually flapping,
3601          * and ensure that new files don't get extremely fragmented due to
3602          * only a small amount of available space in the filesystem.
3603          * We want to set the NOSPC flag when there is less than ~0.1% free
3604          * and clear it when there is at least ~0.2% free space, so:
3605          *                   avail < ~0.1% max          max = avail + used
3606          *            1025 * avail < avail + used       used = blocks - free
3607          *            1024 * avail < used
3608          *            1024 * avail < blocks - free
3609          *                   avail < ((blocks - free) >> 10)
3610          *
3611          * On very large disk, say 16TB 0.1% will be 16 GB. We don't want to
3612          * lose that amount of space so in those cases we report no space left
3613          * if their is less than 1 GB left.                             */
3614         used = min_t(__u64, (msfs->os_blocks - msfs->os_bfree) >> 10, 1 << 30);
3615         if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) == 0) &&
3616                      ((msfs->os_ffree < 32) || (msfs->os_bavail < used))))
3617                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC;
3618         else if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
3619                 (msfs->os_ffree > 64) && (msfs->os_bavail > (used << 1))))
3620                         cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_NOSPC;
3621
3622         spin_unlock(&cli->cl_oscc.oscc_lock);
3623
3624         memcpy(aa->aa_oi->oi_osfs, msfs, sizeof(*msfs));
3625 out:
3626         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3627         RETURN(rc);
3628 }
3629
3630 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3631                             __u64 max_age, struct ptlrpc_request_set *rqset)
3632 {
3633         struct ptlrpc_request *req;
3634         struct osc_async_args *aa;
3635         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*oinfo->oi_osfs) };
3636         ENTRY;
3637
3638         /* We could possibly pass max_age in the request (as an absolute
3639          * timestamp or a "seconds.usec ago") so the target can avoid doing
3640          * extra calls into the filesystem if that isn't necessary (e.g.
3641          * during mount that would help a bit).  Having relative timestamps
3642          * is not so great if request processing is slow, while absolute
3643          * timestamps are not ideal because they need time synchronization. */
3644         req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
3645                               OST_STATFS, 1, NULL, NULL);
3646         if (!req)
3647                 RETURN(-ENOMEM);
3648
3649         ptlrpc_req_set_repsize(req, 2, size);
3650         req->rq_request_portal = OST_CREATE_PORTAL;
3651         ptlrpc_at_set_req_timeout(req);
3652         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3653                 /* procfs requests not want stat in wait for avoid deadlock */
3654                 req->rq_no_resend = 1;
3655                 req->rq_no_delay = 1;
3656         }
3657
3658         req->rq_interpret_reply = osc_statfs_interpret;
3659         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3660         aa = ptlrpc_req_async_args(req);
3661         aa->aa_oi = oinfo;
3662
3663         ptlrpc_set_add_req(rqset, req);
3664         RETURN(0);
3665 }
3666
3667 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3668                       __u64 max_age, __u32 flags)
3669 {
3670         struct obd_statfs *msfs;
3671         struct ptlrpc_request *req;
3672         struct obd_import     *imp = NULL;
3673         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
3674         int rc;
3675         ENTRY;
3676
3677         /*Since the request might also come from lprocfs, so we need
3678          *sync this with client_disconnect_export Bug15684*/
3679         down_read(&obd->u.cli.cl_sem);
3680         if (obd->u.cli.cl_import)
3681                 imp = class_import_get(obd->u.cli.cl_import);
3682         up_read(&obd->u.cli.cl_sem);
3683         if (!imp)
3684                 RETURN(-ENODEV);
3685
3686         /* We could possibly pass max_age in the request (as an absolute
3687          * timestamp or a "seconds.usec ago") so the target can avoid doing
3688          * extra calls into the filesystem if that isn't necessary (e.g.
3689          * during mount that would help a bit).  Having relative timestamps
3690          * is not so great if request processing is slow, while absolute
3691          * timestamps are not ideal because they need time synchronization. */
3692         req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION,
3693                               OST_STATFS, 1, NULL, NULL);
3694
3695         class_import_put(imp);
3696         if (!req)
3697                 RETURN(-ENOMEM);
3698
3699         ptlrpc_req_set_repsize(req, 2, size);
3700         req->rq_request_portal = OST_CREATE_PORTAL;
3701         ptlrpc_at_set_req_timeout(req);
3702
3703         if (flags & OBD_STATFS_NODELAY) {
3704                 /* procfs requests not want stat in wait for avoid deadlock */
3705                 req->rq_no_resend = 1;
3706                 req->rq_no_delay = 1;
3707         }
3708
3709         rc = ptlrpc_queue_wait(req);
3710         if (rc)
3711                 GOTO(out, rc);
3712
3713         msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3714                                   lustre_swab_obd_statfs);
3715         if (msfs == NULL) {
3716                 CERROR("Can't unpack obd_statfs\n");
3717                 GOTO(out, rc = -EPROTO);
3718         }
3719
3720         memcpy(osfs, msfs, sizeof(*osfs));
3721
3722         EXIT;
3723  out:
3724         ptlrpc_req_finished(req);
3725         return rc;
3726 }
3727
3728 /* Retrieve object striping information.
3729  *
3730  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
3731  * the maximum number of OST indices which will fit in the user buffer.
3732  * lmm_magic must be LOV_MAGIC_V1 or LOV_MAGIC_V3 (we only use 1 slot here).
3733  */
3734 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3735 {
3736         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
3737         struct lov_user_md_v3 lum, *lumk;
3738         int rc = 0, lum_size;
3739         struct lov_user_ost_data_v1 *lmm_objects;
3740         ENTRY;
3741
3742         if (!lsm)
3743                 RETURN(-ENODATA);
3744
3745         /* we only need the header part from user space to get lmm_magic and
3746          * lmm_stripe_count, (the header part is common to v1 and v3) */
3747         lum_size = sizeof(struct lov_user_md_v1);
3748         memset(&lum, 0x00, sizeof(lum));
3749         if (copy_from_user(&lum, lump, lum_size))
3750                 RETURN(-EFAULT);
3751
3752         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
3753             (lum.lmm_magic != LOV_USER_MAGIC_V3))
3754                 RETURN(-EINVAL);
3755
3756         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
3757         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
3758         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
3759         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
3760
3761         /* we can use lov_mds_md_size() to compute lum_size
3762          * because lov_user_md_vX and lov_mds_md_vX have the same size */
3763         if (lum.lmm_stripe_count > 0) {
3764                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
3765                 OBD_ALLOC(lumk, lum_size);
3766                 if (!lumk)
3767                         RETURN(-ENOMEM);
3768                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
3769                         lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
3770                 else
3771                         lmm_objects = &(lumk->lmm_objects[0]);
3772                 lmm_objects->l_object_id = lsm->lsm_object_id;
3773         } else {
3774                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
3775                 lumk = &lum;
3776         }
3777
3778         lumk->lmm_magic = lum.lmm_magic;
3779         lumk->lmm_stripe_count = 1;
3780         lumk->lmm_object_id = lsm->lsm_object_id;
3781
3782         if ((lsm->lsm_magic == LOV_USER_MAGIC_V1_SWABBED) ||
3783             (lsm->lsm_magic == LOV_USER_MAGIC_V3_SWABBED)) {
3784                /* lsm not in host order, so count also need be in same order */
3785                 __swab32s(&lumk->lmm_magic);
3786                 __swab16s(&lumk->lmm_stripe_count);
3787                 lustre_swab_lov_user_md((struct lov_user_md_v1*)lumk);
3788                 if (lum.lmm_stripe_count > 0)
3789                         lustre_swab_lov_user_md_objects(
3790                                 (struct lov_user_md_v1*)lumk);
3791         }
3792
3793         if (copy_to_user(lump, lumk, lum_size))
3794                 rc = -EFAULT;
3795
3796         if (lumk != &lum)
3797                 OBD_FREE(lumk, lum_size);
3798
3799         RETURN(rc);
3800 }
3801
3802
3803 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3804                          void *karg, void *uarg)
3805 {
3806         struct obd_device *obd = exp->exp_obd;
3807         struct obd_ioctl_data *data = karg;
3808         int err = 0;
3809         ENTRY;
3810
3811         if (!try_module_get(THIS_MODULE)) {
3812                 CERROR("Can't get module. Is it alive?");
3813                 return -EINVAL;
3814         }
3815         switch (cmd) {
3816         case OBD_IOC_LOV_GET_CONFIG: {
3817                 char *buf;
3818                 struct lov_desc *desc;
3819                 struct obd_uuid uuid;
3820
3821                 buf = NULL;
3822                 len = 0;
3823                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3824                         GOTO(out, err = -EINVAL);
3825
3826                 data = (struct obd_ioctl_data *)buf;
3827
3828                 if (sizeof(*desc) > data->ioc_inllen1) {
3829                         obd_ioctl_freedata(buf, len);
3830                         GOTO(out, err = -EINVAL);
3831                 }
3832
3833                 if (data->ioc_inllen2 < sizeof(uuid)) {
3834                         obd_ioctl_freedata(buf, len);
3835                         GOTO(out, err = -EINVAL);
3836                 }
3837
3838                 desc = (struct lov_desc *)data->ioc_inlbuf1;
3839                 desc->ld_tgt_count = 1;
3840                 desc->ld_active_tgt_count = 1;
3841                 desc->ld_default_stripe_count = 1;
3842                 desc->ld_default_stripe_size = 0;
3843                 desc->ld_default_stripe_offset = 0;
3844                 desc->ld_pattern = 0;
3845                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3846
3847                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3848
3849                 err = copy_to_user((void *)uarg, buf, len);
3850                 if (err)
3851                         err = -EFAULT;
3852                 obd_ioctl_freedata(buf, len);
3853                 GOTO(out, err);
3854         }
3855         case LL_IOC_LOV_SETSTRIPE:
3856                 err = obd_alloc_memmd(exp, karg);
3857                 if (err > 0)
3858                         err = 0;
3859                 GOTO(out, err);
3860         case LL_IOC_LOV_GETSTRIPE:
3861                 err = osc_getstripe(karg, uarg);
3862                 GOTO(out, err);
3863         case OBD_IOC_CLIENT_RECOVER:
3864                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3865                                             data->ioc_inlbuf1);
3866                 if (err > 0)
3867                         err = 0;
3868                 GOTO(out, err);
3869         case IOC_OSC_SET_ACTIVE:
3870                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3871                                                data->ioc_offset);
3872                 GOTO(out, err);
3873         case OBD_IOC_POLL_QUOTACHECK:
3874                 err = lquota_poll_check(quota_interface, exp,
3875                                         (struct if_quotacheck *)karg);
3876                 GOTO(out, err);
3877         case OBD_IOC_DESTROY: {
3878                 struct obdo            *oa;
3879
3880                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
3881                         GOTO (out, err = -EPERM);
3882                 oa = &data->ioc_obdo1;
3883
3884                 if (oa->o_id == 0)
3885                         GOTO(out, err = -EINVAL);
3886
3887                 oa->o_valid |= OBD_MD_FLGROUP;
3888
3889                 err = osc_destroy(exp, oa, NULL, NULL, NULL);
3890                 GOTO(out, err);
3891         }
3892         case OBD_IOC_PING_TARGET:
3893                 err = ptlrpc_obd_ping(obd);
3894                 GOTO(out, err);
3895         default:
3896                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3897                        cmd, cfs_curproc_comm());
3898                 GOTO(out, err = -ENOTTY);
3899         }
3900 out:
3901         module_put(THIS_MODULE);
3902         return err;
3903 }
3904
3905 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3906                         void *key, __u32 *vallen, void *val, struct lov_stripe_md *lsm)
3907 {
3908         ENTRY;
3909         if (!vallen || !val)
3910                 RETURN(-EFAULT);
3911
3912         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3913                 __u32 *stripe = val;
3914                 *vallen = sizeof(*stripe);
3915                 *stripe = 0;
3916                 RETURN(0);
3917         } else if (KEY_IS(KEY_OFF_RPCSIZE)) {
3918                 struct client_obd *cli = &exp->exp_obd->u.cli;
3919                 __u64 *rpcsize = val;
3920                 LASSERT(*vallen == sizeof(__u64));
3921                 *rpcsize = (__u64)cli->cl_max_pages_per_rpc;
3922                 RETURN(0);
3923         } else if (KEY_IS(KEY_LAST_ID)) {
3924                 struct ptlrpc_request *req;
3925                 obd_id *reply;
3926                 char *bufs[2] = { NULL, key };
3927                 __u32 size[2] = { sizeof(struct ptlrpc_body), keylen };
3928                 int rc;
3929
3930                 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
3931                                       OST_GET_INFO, 2, size, bufs);
3932                 if (req == NULL)
3933                         RETURN(-ENOMEM);
3934
3935                 size[REPLY_REC_OFF] = *vallen;
3936                 ptlrpc_req_set_repsize(req, 2, size);
3937                 rc = ptlrpc_queue_wait(req);
3938                 if (rc)
3939                         GOTO(out, rc);
3940
3941                 reply = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*reply),
3942                                            lustre_swab_ost_last_id);
3943                 if (reply == NULL) {
3944                         CERROR("Can't unpack OST last ID\n");
3945                         GOTO(out, rc = -EPROTO);
3946                 }
3947                 *((obd_id *)val) = *reply;
3948         out:
3949                 ptlrpc_req_finished(req);
3950                 RETURN(rc);
3951         } else if (KEY_IS(KEY_FIEMAP)) {
3952                 struct ptlrpc_request *req;
3953                 struct ll_user_fiemap *reply;
3954                 char *bufs[2] = { NULL, key };
3955                 __u32 size[2] = { sizeof(struct ptlrpc_body), keylen };
3956                 int rc;
3957
3958                 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
3959                                       OST_GET_INFO, 2, size, bufs);
3960                 if (req == NULL)
3961                         RETURN(-ENOMEM);
3962
3963                 size[REPLY_REC_OFF] = *vallen;
3964                 ptlrpc_req_set_repsize(req, 2, size);
3965
3966                 rc = ptlrpc_queue_wait(req);
3967                 if (rc)
3968                         GOTO(out1, rc);
3969                 reply = lustre_swab_repbuf(req, REPLY_REC_OFF, *vallen,
3970                                            lustre_swab_fiemap);
3971                 if (reply == NULL) {
3972                         CERROR("Can't unpack FIEMAP reply.\n");
3973                         GOTO(out1, rc = -EPROTO);
3974                 }
3975
3976                 memcpy(val, reply, *vallen);
3977
3978         out1:
3979                 ptlrpc_req_finished(req);
3980
3981                 RETURN(rc);
3982         }
3983
3984         RETURN(-EINVAL);
3985 }
3986
3987 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3988                                           void *aa, int rc)
3989 {
3990         struct llog_ctxt *ctxt;
3991         struct obd_import *imp = req->rq_import;
3992         ENTRY;
3993
3994         if (rc != 0)
3995                 RETURN(rc);
3996
3997         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3998         if (ctxt) {
3999                 if (rc == 0)
4000                         rc = llog_initiator_connect(ctxt);
4001                 else
4002                         CERROR("cannot establish connection for "
4003                                "ctxt %p: %d\n", ctxt, rc);
4004         }
4005
4006         llog_ctxt_put(ctxt);
4007         spin_lock(&imp->imp_lock);
4008         imp->imp_server_timeout = 1;
4009         imp->imp_pingable = 1;
4010         spin_unlock(&imp->imp_lock);
4011         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
4012
4013         RETURN(rc);
4014 }
4015
4016 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
4017                               void *key, obd_count vallen, void *val,
4018                               struct ptlrpc_request_set *set)
4019 {
4020         struct ptlrpc_request *req;
4021         struct obd_device  *obd = exp->exp_obd;
4022         struct obd_import *imp = class_exp2cliimp(exp);
4023         __u32 size[3] = { sizeof(struct ptlrpc_body), keylen, vallen };
4024         char *bufs[3] = { NULL, key, val };
4025         ENTRY;
4026
4027         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
4028
4029         if (KEY_IS(KEY_NEXT_ID)) {
4030                 obd_id new_val;
4031                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4032
4033                 if (vallen != sizeof(obd_id))
4034                         RETURN(-EINVAL);
4035
4036                 /* avoid race between allocate new object and set next id
4037                  * from ll_sync thread */
4038                 spin_lock(&oscc->oscc_lock);
4039                 new_val = *((obd_id*)val) + 1;
4040                 if (new_val > oscc->oscc_next_id)
4041                         oscc->oscc_next_id = new_val;
4042                 spin_unlock(&oscc->oscc_lock);
4043
4044                 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
4045                        exp->exp_obd->obd_name,
4046                        oscc->oscc_next_id);
4047
4048                 RETURN(0);
4049         }
4050
4051         if (KEY_IS(KEY_INIT_RECOV)) {
4052                 if (vallen != sizeof(int))
4053                         RETURN(-EINVAL);
4054                 spin_lock(&imp->imp_lock);
4055                 imp->imp_initial_recov = *(int *)val;
4056                 spin_unlock(&imp->imp_lock);
4057                 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
4058                        exp->exp_obd->obd_name,
4059                        imp->imp_initial_recov);
4060                 RETURN(0);
4061         }
4062
4063         if (KEY_IS(KEY_CHECKSUM)) {
4064                 if (vallen != sizeof(int))
4065                         RETURN(-EINVAL);
4066                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
4067                 RETURN(0);
4068         }
4069
4070         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
4071                 RETURN(-EINVAL);
4072
4073         /* We pass all other commands directly to OST. Since nobody calls osc
4074            methods directly and everybody is supposed to go through LOV, we
4075            assume lov checked invalid values for us.
4076            The only recognised values so far are evict_by_nid and mds_conn.
4077            Even if something bad goes through, we'd get a -EINVAL from OST
4078            anyway. */
4079
4080         req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_SET_INFO, 3, size,
4081                               bufs);
4082         if (req == NULL)
4083                 RETURN(-ENOMEM);
4084
4085         if (KEY_IS(KEY_MDS_CONN))
4086                 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
4087         else if (KEY_IS(KEY_GRANT_SHRINK))
4088                 req->rq_interpret_reply = osc_shrink_grant_interpret;
4089
4090         if (KEY_IS(KEY_GRANT_SHRINK)) {
4091                 struct osc_grant_args *aa;
4092                 struct obdo *oa;
4093
4094                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
4095                 aa = ptlrpc_req_async_args(req);
4096                 OBD_ALLOC_PTR(oa);
4097                 if (!oa) {
4098                         ptlrpc_req_finished(req);
4099                         RETURN(-ENOMEM);
4100                 }
4101                 *oa = ((struct ost_body *)val)->oa;
4102                 aa->aa_oa = oa;
4103
4104                 size[1] = vallen;
4105                 ptlrpc_req_set_repsize(req, 2, size);
4106                 ptlrpcd_add_req(req);
4107         } else {
4108                 ptlrpc_req_set_repsize(req, 1, NULL);
4109                 ptlrpc_set_add_req(set, req);
4110                 ptlrpc_check_set(set);
4111         }
4112
4113         RETURN(0);
4114 }
4115
4116
4117 static struct llog_operations osc_size_repl_logops = {
4118         lop_cancel: llog_obd_repl_cancel
4119 };
4120
4121 static struct llog_operations osc_mds_ost_orig_logops;
4122 static int osc_llog_init(struct obd_device *obd, struct obd_device *disk_obd,
4123                          int *index)
4124 {
4125         struct llog_catid catid;
4126         static char name[32] = CATLIST;
4127         int rc;
4128         ENTRY;
4129
4130         LASSERT(index);
4131
4132         mutex_down(&disk_obd->obd_llog_cat_process);
4133
4134         rc = llog_get_cat_list(disk_obd, disk_obd, name, *index, 1, &catid);
4135         if (rc) {
4136                 CERROR("rc: %d\n", rc);
4137                 GOTO(out_unlock, rc);
4138         }
4139 #if 0
4140         CDEBUG(D_INFO, "%s: Init llog for %s/%d - catid "LPX64"/"LPX64":%x\n",
4141                obd->obd_name, uuid->uuid, idx, catid.lci_logid.lgl_oid,
4142                catid.lci_logid.lgl_ogr, catid.lci_logid.lgl_ogen);
4143 #endif
4144
4145         rc = llog_setup(obd, LLOG_MDS_OST_ORIG_CTXT, disk_obd, 1,
4146                         &catid.lci_logid, &osc_mds_ost_orig_logops);
4147         if (rc) {
4148                 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
4149                 GOTO (out, rc);
4150         }
4151
4152         rc = llog_setup(obd, LLOG_SIZE_REPL_CTXT, disk_obd, 1, NULL,
4153                         &osc_size_repl_logops);
4154         if (rc) {
4155                 struct llog_ctxt *ctxt =
4156                         llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4157                 if (ctxt)
4158                         llog_cleanup(ctxt);
4159                 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
4160         }
4161 out:
4162         if (rc) {
4163                 CERROR("osc '%s' tgt '%s' rc=%d\n",
4164                        obd->obd_name, disk_obd->obd_name, rc);
4165                 CERROR("logid "LPX64":0x%x\n",
4166                        catid.lci_logid.lgl_oid, catid.lci_logid.lgl_ogen);
4167         } else {
4168                 rc = llog_put_cat_list(disk_obd, disk_obd, name, *index, 1,
4169                                        &catid);
4170                 if (rc)
4171                         CERROR("rc: %d\n", rc);
4172         }
4173 out_unlock:
4174         mutex_up(&disk_obd->obd_llog_cat_process);
4175
4176         RETURN(rc);
4177 }
4178
4179 static int osc_llog_finish(struct obd_device *obd, int count)
4180 {
4181         struct llog_ctxt *ctxt;
4182         int rc = 0, rc2 = 0;
4183         ENTRY;
4184
4185         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4186         if (ctxt)
4187                 rc = llog_cleanup(ctxt);
4188
4189         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4190         if (ctxt)
4191                 rc2 = llog_cleanup(ctxt);
4192         if (!rc)
4193                 rc = rc2;
4194
4195         RETURN(rc);
4196 }
4197
4198 static int osc_reconnect(struct obd_export *exp, struct obd_device *obd,
4199                          struct obd_uuid *cluuid,
4200                          struct obd_connect_data *data,
4201                          void *localdata)
4202 {
4203         struct client_obd *cli = &obd->u.cli;
4204
4205         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
4206                 long lost_grant;
4207
4208                 client_obd_list_lock(&cli->cl_loi_list_lock);
4209                 data->ocd_grant = cli->cl_avail_grant ?:
4210                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
4211                 lost_grant = cli->cl_lost_grant;
4212                 cli->cl_lost_grant = 0;
4213                 client_obd_list_unlock(&cli->cl_loi_list_lock);
4214
4215                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
4216                        "cl_lost_grant: %ld\n", data->ocd_grant,
4217                        cli->cl_avail_grant, lost_grant);
4218                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
4219                        " ocd_grant: %d\n", data->ocd_connect_flags,
4220                        data->ocd_version, data->ocd_grant);
4221         }
4222
4223         RETURN(0);
4224 }
4225
4226 static int osc_disconnect(struct obd_export *exp)
4227 {
4228         struct obd_device *obd = class_exp2obd(exp);
4229         struct llog_ctxt  *ctxt;
4230         int rc;
4231
4232         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4233         if (ctxt) {
4234                 if (obd->u.cli.cl_conn_count == 1) {
4235                         /* Flush any remaining cancel messages out to the
4236                          * target */
4237                         llog_sync(ctxt, exp);
4238                 }
4239                 llog_ctxt_put(ctxt);
4240         } else {
4241                 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
4242                        obd);
4243         }
4244
4245         rc = client_disconnect_export(exp);
4246         /**
4247          * Initially we put del_shrink_grant before disconnect_export, but it
4248          * causes the following problem if setup (connect) and cleanup
4249          * (disconnect) are tangled together.
4250          *      connect p1                     disconnect p2
4251          *   ptlrpc_connect_import
4252          *     ...............               class_manual_cleanup
4253          *                                     osc_disconnect
4254          *                                     del_shrink_grant
4255          *   ptlrpc_connect_interrupt
4256          *     init_grant_shrink
4257          *   add this client to shrink list
4258          *                                      cleanup_osc
4259          * Bang! pinger trigger the shrink.
4260          * So the osc should be disconnected from the shrink list, after we
4261          * are sure the import has been destroyed. BUG18662
4262          */
4263         if (obd->u.cli.cl_import == NULL)
4264                 osc_del_shrink_grant(&obd->u.cli);
4265         return rc;
4266 }
4267
4268 static int osc_import_event(struct obd_device *obd,
4269                             struct obd_import *imp,
4270                             enum obd_import_event event)
4271 {
4272         struct client_obd *cli;
4273         int rc = 0;
4274
4275         ENTRY;
4276         LASSERT(imp->imp_obd == obd);
4277
4278         switch (event) {
4279         case IMP_EVENT_DISCON: {
4280                 /* Only do this on the MDS OSC's */
4281                 if (imp->imp_server_timeout) {
4282                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4283
4284                         spin_lock(&oscc->oscc_lock);
4285                         oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
4286                         spin_unlock(&oscc->oscc_lock);
4287                 }
4288                 cli = &obd->u.cli;
4289                 client_obd_list_lock(&cli->cl_loi_list_lock);
4290                 cli->cl_avail_grant = 0;
4291                 cli->cl_lost_grant = 0;
4292                 client_obd_list_unlock(&cli->cl_loi_list_lock);
4293                 ptlrpc_import_setasync(imp, -1);
4294
4295                 break;
4296         }
4297         case IMP_EVENT_INACTIVE: {
4298                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
4299                 break;
4300         }
4301         case IMP_EVENT_INVALIDATE: {
4302                 struct ldlm_namespace *ns = obd->obd_namespace;
4303
4304                 /* Reset grants */
4305                 cli = &obd->u.cli;
4306                 client_obd_list_lock(&cli->cl_loi_list_lock);
4307                 /* all pages go to failing rpcs due to the invalid import */
4308                 osc_check_rpcs(cli);
4309                 client_obd_list_unlock(&cli->cl_loi_list_lock);
4310
4311                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
4312
4313                 break;
4314         }
4315         case IMP_EVENT_ACTIVE: {
4316                 /* Only do this on the MDS OSC's */
4317                 if (imp->imp_server_timeout) {
4318                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4319
4320                         spin_lock(&oscc->oscc_lock);
4321                         oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
4322                         spin_unlock(&oscc->oscc_lock);
4323                 }
4324                 CDEBUG(D_INFO, "notify server \n");
4325                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
4326                 break;
4327         }
4328         case IMP_EVENT_OCD: {
4329                 struct obd_connect_data *ocd = &imp->imp_connect_data;
4330
4331                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
4332                         osc_init_grant(&obd->u.cli, ocd);
4333
4334                 /* See bug 7198 */
4335                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
4336                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
4337
4338                 ptlrpc_import_setasync(imp, 1);
4339                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
4340                 break;
4341         }
4342         default:
4343                 CERROR("Unknown import event %d\n", event);
4344                 LBUG();
4345         }
4346         RETURN(rc);
4347 }
4348
4349 /* determine whether the lock can be canceled before replaying the lock
4350  * during recovery, see bug16774 for detailed information 
4351  *
4352  * return values:
4353  *  zero  - the lock can't be canceled
4354  *  other - ok to cancel
4355  */
4356 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
4357 {
4358         check_res_locked(lock->l_resource);
4359         if (lock->l_granted_mode == LCK_GROUP || 
4360             lock->l_resource->lr_type != LDLM_EXTENT)
4361                 RETURN(0);
4362
4363         /* cancel all unused extent locks with granted mode LCK_PR or LCK_CR */
4364         if (lock->l_granted_mode == LCK_PR ||
4365             lock->l_granted_mode == LCK_CR)
4366                 RETURN(1);
4367
4368         RETURN(0);       
4369 }
4370
4371 int osc_setup(struct obd_device *obd, obd_count len, void *buf)
4372 {
4373         int rc;
4374         ENTRY;
4375
4376         ENTRY;
4377         rc = ptlrpcd_addref();
4378         if (rc)
4379                 RETURN(rc);
4380
4381         rc = client_obd_setup(obd, len, buf);
4382         if (rc) {
4383                 ptlrpcd_decref();
4384         } else {
4385                 struct lprocfs_static_vars lvars = { 0 };
4386                 struct client_obd *cli = &obd->u.cli;
4387
4388                 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
4389                 lprocfs_osc_init_vars(&lvars);
4390                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
4391                         lproc_osc_attach_seqstat(obd);
4392                         ptlrpc_lprocfs_register_obd(obd);
4393                 }
4394
4395                 oscc_init(obd);
4396                 /* We need to allocate a few requests more, because
4397                    brw_interpret tries to create new requests before freeing
4398                    previous ones. Ideally we want to have 2x max_rpcs_in_flight
4399                    reserved, but I afraid that might be too much wasted RAM
4400                    in fact, so 2 is just my guess and still should work. */
4401                 cli->cl_import->imp_rq_pool =
4402                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4403                                             OST_MAXREQSIZE,
4404                                             ptlrpc_add_rqs_to_pool);
4405                 cli->cl_cache = cache_create(obd);
4406                 if (!cli->cl_cache) {
4407                         osc_cleanup(obd);
4408                         rc = -ENOMEM;
4409                 }
4410                 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
4411                 sema_init(&cli->cl_grant_sem, 1);
4412
4413                 ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
4414         }
4415
4416         RETURN(rc);
4417 }
4418
4419 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
4420 {
4421         int rc = 0;
4422         ENTRY;
4423
4424         switch (stage) {
4425         case OBD_CLEANUP_EARLY: {
4426                 struct obd_import *imp;
4427                 imp = obd->u.cli.cl_import;
4428                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
4429                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
4430                 ptlrpc_deactivate_import(imp);
4431                 break;
4432         }
4433         case OBD_CLEANUP_EXPORTS: {
4434                 /* If we set up but never connected, the
4435                    client import will not have been cleaned. */
4436                 down_write(&obd->u.cli.cl_sem);
4437                 if (obd->u.cli.cl_import) {
4438                         struct obd_import *imp;
4439                         imp = obd->u.cli.cl_import;
4440                         CDEBUG(D_CONFIG, "%s: client import never connected\n",
4441                                obd->obd_name);
4442                         ptlrpc_invalidate_import(imp);
4443                         if (imp->imp_rq_pool) {
4444                                 ptlrpc_free_rq_pool(imp->imp_rq_pool);
4445                                 imp->imp_rq_pool = NULL;
4446                         }
4447                         class_destroy_import(imp);
4448                         obd->u.cli.cl_import = NULL;
4449                 }
4450                 up_write(&obd->u.cli.cl_sem);
4451
4452                 rc = obd_llog_finish(obd, 0);
4453                 if (rc != 0)
4454                         CERROR("failed to cleanup llogging subsystems\n");
4455                 break;
4456         }
4457         case OBD_CLEANUP_SELF_EXP:
4458                 break;
4459         case OBD_CLEANUP_OBD:
4460                 break;
4461         }
4462         RETURN(rc);
4463 }
4464
4465 int osc_cleanup(struct obd_device *obd)
4466 {
4467         int rc;
4468
4469         ENTRY;
4470         ptlrpc_lprocfs_unregister_obd(obd);
4471         lprocfs_obd_cleanup(obd);
4472
4473         /* free memory of osc quota cache */
4474         lquota_cleanup(quota_interface, obd);
4475
4476         cache_destroy(obd->u.cli.cl_cache);
4477         rc = client_obd_cleanup(obd);
4478
4479         ptlrpcd_decref();
4480         RETURN(rc);
4481 }
4482
4483 static int osc_register_page_removal_cb(struct obd_device *obd,
4484                                         obd_page_removal_cb_t func,
4485                                         obd_pin_extent_cb pin_cb)
4486 {
4487         ENTRY;
4488
4489         /* this server - not need init */
4490         if (func == NULL)
4491                 return 0;
4492
4493         return cache_add_extent_removal_cb(obd->u.cli.cl_cache, func,
4494                                            pin_cb);
4495 }
4496
4497 static int osc_unregister_page_removal_cb(struct obd_device *obd,
4498                                           obd_page_removal_cb_t func)
4499 {
4500         ENTRY;
4501         return cache_del_extent_removal_cb(obd->u.cli.cl_cache, func);
4502 }
4503
4504 static int osc_register_lock_cancel_cb(struct obd_device *obd,
4505                                        obd_lock_cancel_cb cb)
4506 {
4507         ENTRY;
4508         LASSERT(obd->u.cli.cl_ext_lock_cancel_cb == NULL);
4509
4510         /* this server - not need init */
4511         if (cb == NULL)
4512                 return 0;
4513
4514         obd->u.cli.cl_ext_lock_cancel_cb = cb;
4515         return 0;
4516 }
4517
4518 static int osc_unregister_lock_cancel_cb(struct obd_device *obd,
4519                                          obd_lock_cancel_cb cb)
4520 {
4521         ENTRY;
4522
4523         if (obd->u.cli.cl_ext_lock_cancel_cb != cb) {
4524                 CERROR("Unregistering cancel cb %p, while only %p was "
4525                        "registered\n", cb,
4526                        obd->u.cli.cl_ext_lock_cancel_cb);
4527                 RETURN(-EINVAL);
4528         }
4529
4530         obd->u.cli.cl_ext_lock_cancel_cb = NULL;
4531         return 0;
4532 }
4533
4534 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4535 {
4536         struct lustre_cfg *lcfg = buf;
4537         struct lprocfs_static_vars lvars = { 0 };
4538         int rc = 0;
4539
4540         lprocfs_osc_init_vars(&lvars);
4541
4542         rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars, lcfg, obd);
4543         return(rc);
4544 }
4545
4546 struct obd_ops osc_obd_ops = {
4547         .o_owner                = THIS_MODULE,
4548         .o_setup                = osc_setup,
4549         .o_precleanup           = osc_precleanup,
4550         .o_cleanup              = osc_cleanup,
4551         .o_add_conn             = client_import_add_conn,
4552         .o_del_conn             = client_import_del_conn,
4553         .o_connect              = client_connect_import,
4554         .o_reconnect            = osc_reconnect,
4555         .o_disconnect           = osc_disconnect,
4556         .o_statfs               = osc_statfs,
4557         .o_statfs_async         = osc_statfs_async,
4558         .o_packmd               = osc_packmd,
4559         .o_unpackmd             = osc_unpackmd,
4560         .o_precreate            = osc_precreate,
4561         .o_create               = osc_create,
4562         .o_create_async         = osc_create_async,
4563         .o_destroy              = osc_destroy,
4564         .o_getattr              = osc_getattr,
4565         .o_getattr_async        = osc_getattr_async,
4566         .o_setattr              = osc_setattr,
4567         .o_setattr_async        = osc_setattr_async,
4568         .o_brw                  = osc_brw,
4569         .o_brw_async            = osc_brw_async,
4570         .o_prep_async_page      = osc_prep_async_page,
4571         .o_get_lock             = osc_get_lock,
4572         .o_queue_async_io       = osc_queue_async_io,
4573         .o_set_async_flags      = osc_set_async_flags,
4574         .o_queue_group_io       = osc_queue_group_io,
4575         .o_trigger_group_io     = osc_trigger_group_io,
4576         .o_teardown_async_page  = osc_teardown_async_page,
4577         .o_punch                = osc_punch,
4578         .o_sync                 = osc_sync,
4579         .o_enqueue              = osc_enqueue,
4580         .o_match                = osc_match,
4581         .o_change_cbdata        = osc_change_cbdata,
4582         .o_cancel               = osc_cancel,
4583         .o_cancel_unused        = osc_cancel_unused,
4584         .o_join_lru             = osc_join_lru,
4585         .o_iocontrol            = osc_iocontrol,
4586         .o_get_info             = osc_get_info,
4587         .o_set_info_async       = osc_set_info_async,
4588         .o_import_event         = osc_import_event,
4589         .o_llog_init            = osc_llog_init,
4590         .o_llog_finish          = osc_llog_finish,
4591         .o_process_config       = osc_process_config,
4592         .o_register_page_removal_cb = osc_register_page_removal_cb,
4593         .o_unregister_page_removal_cb = osc_unregister_page_removal_cb,
4594         .o_register_lock_cancel_cb = osc_register_lock_cancel_cb,
4595         .o_unregister_lock_cancel_cb = osc_unregister_lock_cancel_cb,
4596 };
4597 int __init osc_init(void)
4598 {
4599         struct lprocfs_static_vars lvars = { 0 };
4600         int rc;
4601         ENTRY;
4602
4603         lprocfs_osc_init_vars(&lvars);
4604
4605         request_module("lquota");
4606         quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
4607         lquota_init(quota_interface);
4608         init_obd_quota_ops(quota_interface, &osc_obd_ops);
4609
4610         rc = class_register_type(&osc_obd_ops, lvars.module_vars,
4611                                  LUSTRE_OSC_NAME);
4612         if (rc) {
4613                 if (quota_interface)
4614                         PORTAL_SYMBOL_PUT(osc_quota_interface);
4615                 RETURN(rc);
4616         }
4617
4618         osc_mds_ost_orig_logops = llog_lvfs_ops;
4619         osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
4620         osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
4621         osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
4622         osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
4623
4624         RETURN(rc);
4625 }
4626
4627 #ifdef __KERNEL__
4628 static void /*__exit*/ osc_exit(void)
4629 {
4630         lquota_exit(quota_interface);
4631         if (quota_interface)
4632                 PORTAL_SYMBOL_PUT(osc_quota_interface);
4633
4634         class_unregister_type(LUSTRE_OSC_NAME);
4635 }
4636
4637 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4638 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4639 MODULE_LICENSE("GPL");
4640
4641 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
4642 #endif