/*
 * Source: fs/lustre-release.git — lustre/osc/osc_request.c
 * b=16909 Use INFO/WARN instead of WARN/ERROR for the slow messages.
 */
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #ifndef EXPORT_SYMTAB
38 # define EXPORT_SYMTAB
39 #endif
40 #define DEBUG_SUBSYSTEM S_OSC
41
42 #ifdef __KERNEL__
43 # include <libcfs/libcfs.h>
44 #else /* __KERNEL__ */
45 # include <liblustre.h>
46 #endif
47
48 # include <lustre_dlm.h>
49 #include <libcfs/kp30.h>
50 #include <lustre_net.h>
51 #include <lustre/lustre_user.h>
52 #include <obd_ost.h>
53 #include <obd_lov.h>
54
55 #ifdef  __CYGWIN__
56 # include <ctype.h>
57 #endif
58
59 #include <lustre_ha.h>
60 #include <lprocfs_status.h>
61 #include <lustre_log.h>
62 #include <lustre_debug.h>
63 #include <lustre_param.h>
64 #include <lustre_cache.h>
65 #include "osc_internal.h"
66
67 static quota_interface_t *quota_interface = NULL;
68 extern quota_interface_t osc_quota_interface;
69
70 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
71 static int brw_interpret(struct ptlrpc_request *request, void *data, int rc);
72 int osc_cleanup(struct obd_device *obd);
73
74 static quota_interface_t *quota_interface;
75 extern quota_interface_t osc_quota_interface;
76
77 /* by default 10s */
78 atomic_t osc_resend_time;
79
80 /* Pack OSC object metadata for disk storage (LE byte order). */
81 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
82                       struct lov_stripe_md *lsm)
83 {
84         int lmm_size;
85         ENTRY;
86
87         lmm_size = sizeof(**lmmp);
88         if (!lmmp)
89                 RETURN(lmm_size);
90
91         if (*lmmp && !lsm) {
92                 OBD_FREE(*lmmp, lmm_size);
93                 *lmmp = NULL;
94                 RETURN(0);
95         }
96
97         if (!*lmmp) {
98                 OBD_ALLOC(*lmmp, lmm_size);
99                 if (!*lmmp)
100                         RETURN(-ENOMEM);
101         }
102
103         if (lsm) {
104                 LASSERT(lsm->lsm_object_id);
105                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
106         }
107
108         RETURN(lmm_size);
109 }
110
/* Unpack OSC object metadata from disk storage (LE byte order).
 *
 * Mirrors osc_packmd(): lsmp == NULL means "report in-memory size only";
 * lmm == NULL with *lsmp set means "free the in-memory copy"; otherwise
 * (allocate and) fill *lsmp from lmm.  Returns the lsm size on success,
 * 0 on free, negative errno on error. */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
{
        int lsm_size;
        ENTRY;

        if (lmm != NULL) {
                /* Validate the on-disk record before trusting it. */
                if (lmm_bytes < sizeof (*lmm)) {
                        CERROR("lov_mds_md too small: %d, need %d\n",
                               lmm_bytes, (int)sizeof(*lmm));
                        RETURN(-EINVAL);
                }
                /* XXX LOV_MAGIC etc check? */

                if (lmm->lmm_object_id == 0) {
                        CERROR("lov_mds_md: zero lmm_object_id\n");
                        RETURN(-EINVAL);
                }
        }

        /* An OSC object always has exactly one stripe. */
        lsm_size = lov_stripe_md_size(1);
        if (lsmp == NULL)
                RETURN(lsm_size);

        if (*lsmp != NULL && lmm == NULL) {
                /* No disk metadata: free the caller's in-memory copy. */
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);
                *lsmp = NULL;
                RETURN(0);
        }

        if (*lsmp == NULL) {
                OBD_ALLOC(*lsmp, lsm_size);
                if (*lsmp == NULL)
                        RETURN(-ENOMEM);
                OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                if ((*lsmp)->lsm_oinfo[0] == NULL) {
                        /* Second allocation failed: undo the first. */
                        OBD_FREE(*lsmp, lsm_size);
                        RETURN(-ENOMEM);
                }
                loi_init((*lsmp)->lsm_oinfo[0]);
        }

        if (lmm != NULL) {
                /* XXX zero *lsmp? */
                (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
                LASSERT((*lsmp)->lsm_object_id);
        }

        (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

        RETURN(lsm_size);
}
165
166 static int osc_getattr_interpret(struct ptlrpc_request *req,
167                                  void *data, int rc)
168 {
169         struct ost_body *body;
170         struct osc_async_args *aa = data;
171         ENTRY;
172
173         if (rc != 0)
174                 GOTO(out, rc);
175
176         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
177                                   lustre_swab_ost_body);
178         if (body) {
179                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
180                 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
181
182                 /* This should really be sent by the OST */
183                 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
184                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
185         } else {
186                 CERROR("can't unpack ost_body\n");
187                 rc = -EPROTO;
188                 aa->aa_oi->oi_oa->o_valid = 0;
189         }
190 out:
191         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
192         RETURN(rc);
193 }
194
195 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
196                              struct ptlrpc_request_set *set)
197 {
198         struct ptlrpc_request *req;
199         struct ost_body *body;
200         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
201         struct osc_async_args *aa;
202         ENTRY;
203
204         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
205                               OST_GETATTR, 2, size,NULL);
206         if (!req)
207                 RETURN(-ENOMEM);
208
209         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
210         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
211
212         ptlrpc_req_set_repsize(req, 2, size);
213         req->rq_interpret_reply = osc_getattr_interpret;
214
215         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
216         aa = ptlrpc_req_async_args(req);
217         aa->aa_oi = oinfo;
218
219         ptlrpc_set_add_req(set, req);
220         RETURN (0);
221 }
222
223 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
224 {
225         struct ptlrpc_request *req;
226         struct ost_body *body;
227         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
228         int rc;
229         ENTRY;
230
231         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
232                               OST_GETATTR, 2, size, NULL);
233         if (!req)
234                 RETURN(-ENOMEM);
235
236         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
237         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
238
239         ptlrpc_req_set_repsize(req, 2, size);
240
241         rc = ptlrpc_queue_wait(req);
242         if (rc) {
243                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
244                 GOTO(out, rc);
245         }
246
247         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
248                                   lustre_swab_ost_body);
249         if (body == NULL) {
250                 CERROR ("can't unpack ost_body\n");
251                 GOTO (out, rc = -EPROTO);
252         }
253
254         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
255         lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
256
257         /* This should really be sent by the OST */
258         oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
259         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
260
261         EXIT;
262  out:
263         ptlrpc_req_finished(req);
264         return rc;
265 }
266
/* Synchronous OST_SETATTR: push the attributes in oinfo->oi_oa to the OST
 * and copy the (possibly updated) attributes back on success.
 * @oti is unused here (cookies are only handled in the async path).
 * Returns 0 or a negative errno. */
static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
                       struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        int rc;
        ENTRY;

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_SETATTR, 2, size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        /* Pack the caller's obdo into the request in wire format. */
        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);

        ptlrpc_req_set_repsize(req, 2, size);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        /* Reflect the OST's view of the attributes back to the caller. */
        lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
302
303 static int osc_setattr_interpret(struct ptlrpc_request *req,
304                                  void *data, int rc)
305 {
306         struct ost_body *body;
307         struct osc_async_args *aa = data;
308         ENTRY;
309
310         if (rc != 0)
311                 GOTO(out, rc);
312
313         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
314                                   lustre_swab_ost_body);
315         if (body == NULL) {
316                 CERROR("can't unpack ost_body\n");
317                 GOTO(out, rc = -EPROTO);
318         }
319
320         lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
321 out:
322         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
323         RETURN(rc);
324 }
325
/* Asynchronous OST_SETATTR.  If @rqset is NULL the request is handed to
 * ptlrpcd fire-and-forget; otherwise it is added to @rqset and the reply
 * is processed by osc_setattr_interpret().  A third (empty) request buffer
 * is reserved when talking to a 2.0 server, which expects it. */
static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct obd_trans_info *oti,
                             struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        __u32 size[3] = { sizeof(struct ptlrpc_body), sizeof(*body), 0 };
        int bufcount = 2;
        struct osc_async_args *aa;
        ENTRY;

        if (osc_exp_is_2_0_server(exp)) {
                bufcount = 3;
        }

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_SETATTR, bufcount, size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));

        /* Carry the llog cookie along so the OST can cancel the record. */
        if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
                LASSERT(oti);
                oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
        }

        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
        ptlrpc_req_set_repsize(req, 2, size);
        /* Do the MDS-to-OST setattr asynchronously. */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
                req->rq_interpret_reply = osc_setattr_interpret;

                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = ptlrpc_req_async_args(req);
                aa->aa_oi = oinfo;

                ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}
371
/* Create an object on the OST (OST_CREATE) and fill in *ea/oa with the
 * resulting object id.  If *ea is NULL a stripe md is allocated here and
 * freed again on failure; on success ownership passes to the caller.
 * When @oti is supplied, the reply transno and (if present) the llog
 * cookie are stored into it for recovery purposes.
 * Returns 0 or a negative errno. */
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        struct lov_stripe_md *lsm;
        __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        int rc;
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                /* Caller didn't supply a stripe md; allocate one locally. */
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_CREATE, 2, size, NULL);
        if (!req)
                GOTO(out, rc = -ENOMEM);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        lustre_set_wire_obdo(&body->oa, oa);

        ptlrpc_req_set_repsize(req, 2, size);
        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_DELORPHAN) {
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR ("can't unpack ost_body\n");
                GOTO (out_req, rc = -EPROTO);
        }

        lustre_get_wire_obdo(oa, &body->oa);

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_object_id = oa->o_id;
        *ea = lsm;

        if (oti != NULL) {
                /* Record the transno so recovery can replay past it. */
                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        *oti->oti_logcookies = oa->o_lcookie;
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        /* Only free the lsm if we allocated it here (caller's *ea unset). */
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        RETURN(rc);
}
452
453 static int osc_punch_interpret(struct ptlrpc_request *req,
454                                void *data, int rc)
455 {
456         struct ost_body *body;
457         struct osc_async_args *aa = data;
458         ENTRY;
459
460         if (rc != 0)
461                 GOTO(out, rc);
462
463         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof (*body),
464                                   lustre_swab_ost_body);
465         if (body == NULL) {
466                 CERROR ("can't unpack ost_body\n");
467                 GOTO(out, rc = -EPROTO);
468         }
469
470         lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
471 out:
472         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
473         RETURN(rc);
474 }
475
/* Asynchronous OST_PUNCH (truncate/hole-punch) over the extent in
 * oinfo->oi_policy.  The request is added to @rqset; the reply runs
 * osc_punch_interpret() which invokes oinfo->oi_cb_up.
 * Returns 0 or a negative errno. */
static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
                     struct obd_trans_info *oti,
                     struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        struct ost_body *body;
        __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        ENTRY;

        if (!oinfo->oi_oa) {
                CERROR("oa NULL\n");
                RETURN(-EINVAL);
        }

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_PUNCH, 2, size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        req->rq_request_portal = OST_IO_PORTAL;         /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);

        /* overload the size and blocks fields in the oa with start/end */
        body->oa.o_size = oinfo->oi_policy.l_extent.start;
        body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);

        ptlrpc_req_set_repsize(req, 2, size);

        req->rq_interpret_reply = osc_punch_interpret;
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;
        ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}
517
518 static int osc_sync_interpret(struct ptlrpc_request *req,
519                               void *data, int rc)
520 {
521         struct ost_body *body;
522         struct osc_async_args *aa = data;
523         ENTRY;
524
525         if (rc)
526                 GOTO(out, rc);
527
528         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
529                                   lustre_swab_ost_body);
530         if (body == NULL) {
531                 CERROR ("can't unpack ost_body\n");
532                 GOTO(out, rc = -EPROTO);
533         }
534
535         *aa->aa_oi->oi_oa = body->oa;
536 out:
537         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
538         RETURN(rc);
539 }
540
541 static int osc_sync(struct obd_export *exp, struct obd_info *oinfo,
542                     obd_size start, obd_size end,
543                     struct ptlrpc_request_set *set)
544 {
545         struct ptlrpc_request *req;
546         struct ost_body *body;
547         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
548         struct osc_async_args *aa;
549         ENTRY;
550
551         if (!oinfo->oi_oa) {
552                 CERROR("oa NULL\n");
553                 RETURN(-EINVAL);
554         }
555
556         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
557                               OST_SYNC, 2, size, NULL);
558         if (!req)
559                 RETURN(-ENOMEM);
560
561         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
562         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
563
564         /* overload the size and blocks fields in the oa with start/end */
565         body->oa.o_size = start;
566         body->oa.o_blocks = end;
567         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
568
569         ptlrpc_req_set_repsize(req, 2, size);
570         req->rq_interpret_reply = osc_sync_interpret;
571
572         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
573         aa = ptlrpc_req_async_args(req);
574         aa->aa_oi = oinfo;
575
576         ptlrpc_set_add_req(set, req);
577         RETURN (0);
578 }
579
580 /* Find and cancel locally locks matched by @mode in the resource found by
581  * @objid. Found locks are added into @cancel list. Returns the amount of
582  * locks added to @cancels list. */
583 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
584                                    struct list_head *cancels, ldlm_mode_t mode,
585                                    int lock_flags)
586 {
587         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
588         struct ldlm_res_id res_id;
589         struct ldlm_resource *res;
590         int count;
591         ENTRY;
592
593         osc_build_res_name(oa->o_id, oa->o_gr, &res_id);
594         res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
595         if (res == NULL)
596                 RETURN(0);
597
598         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
599                                            lock_flags, 0, NULL);
600         ldlm_resource_putref(res);
601         RETURN(count);
602 }
603
604 static int osc_destroy_interpret(struct ptlrpc_request *req, void *data,
605                                  int rc)
606 {
607         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
608
609         atomic_dec(&cli->cl_destroy_in_flight);
610         cfs_waitq_signal(&cli->cl_destroy_waitq);
611         return 0;
612 }
613
/* Try to reserve a slot for one OST_DESTROY RPC, bounded by
 * cl_max_rpcs_in_flight.  Returns 1 if the caller may send (the in-flight
 * counter stays incremented), 0 if the caller must wait.
 * The inc/dec pair below is deliberately lock-free: if another thread
 * races between our two atomic ops, we may need to re-wake a waiter. */
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        /* Over the limit: undo our increment before waiting. */
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                cfs_waitq_signal(&cli->cl_destroy_waitq);
        }
        return 0;
}
631
/* Destroy requests can be async always on the client, and we don't even really
 * care about the return code since the client cannot do anything at all about
 * a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST reconnects to the MDS next,
 * it will retrieve the llog unlink logs and then sends the log cancellation
 * cookies to the MDS after committing destroy transactions. */
static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                       struct lov_stripe_md *ea, struct obd_trans_info *oti,
                       struct obd_export *md_export)
{
        CFS_LIST_HEAD(cancels);
        struct ptlrpc_request *req;
        struct ost_body *body;
        __u32 size[3] = { sizeof(struct ptlrpc_body), sizeof(*body),
                        sizeof(struct ldlm_request) };
        int count, bufcount = 2;
        struct client_obd *cli = &exp->exp_obd->u.cli;
        ENTRY;

        if (!oa) {
                CERROR("oa NULL\n");
                RETURN(-EINVAL);
        }

        LASSERT(oa->o_id != 0);

        /* Cancel (and discard data under) our local PW locks on the object
         * first, piggybacking the cancels on the destroy RPC when the
         * server supports early lock cancel (ELC). */
        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);
        if (exp_connect_cancelset(exp))
                bufcount = 3;
        req = ldlm_prep_elc_req(exp, LUSTRE_OST_VERSION, OST_DESTROY, bufcount,
                                size, REQ_REC_OFF + 1, 0, &cancels, count);
        if (!req)
                RETURN(-ENOMEM);

        req->rq_request_portal = OST_IO_PORTAL;         /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));

        /* Carry the unlink llog cookie so the OST can cancel the record. */
        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE) {
                oa->o_lcookie = *oti->oti_logcookies;
        }

        lustre_set_wire_obdo(&body->oa, oa);
        ptlrpc_req_set_repsize(req, 2, size);

        /* don't throttle destroy RPCs for the MDT */
        if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
                req->rq_interpret_reply = osc_destroy_interpret;
                if (!osc_can_send_destroy(cli)) {
                        struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
                                                          NULL);

                        /*
                         * Wait until the number of on-going destroy RPCs drops
                         * under max_rpc_in_flight
                         */
                        l_wait_event_exclusive(cli->cl_destroy_waitq,
                                               osc_can_send_destroy(cli), &lwi);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req);
        RETURN(0);
}
703
/* Fill in the dirty/undirty/grant fields of @oa so the next RPC tells the
 * OST how much dirty cache this client holds and how much more grant it
 * could use.  Called before packing a BRW; takes cl_loi_list_lock. */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        /* Caller must not have set these fields already. */
        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (cli->cl_dirty > cli->cl_dirty_max) {
                /* Accounting error: don't ask for more grant. */
                CERROR("dirty %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages + 1) {
                /* The atomic_read() allowing the atomic_inc() are not covered
                 * by a lock thus they may safely race and trip this CERROR()
                 * unless we add in a small fudge factor (+1). */
                CERROR("dirty %d > system dirty_max %d\n",
                       atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                /* Ask for enough grant to keep max_rpcs_in_flight+1 full-size
                 * RPCs in flight, but no less than the per-OSC dirty limit. */
                long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
                                (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);

}
742
743 static void osc_update_next_shrink(struct client_obd *cli)
744 {
745         cli->cl_next_shrink_grant =
746                 cfs_time_shift(cli->cl_grant_shrink_interval);
747         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
748                cli->cl_next_shrink_grant);
749 }
750
/* caller must hold loi_list_lock */
/* Account one page of new dirty data against the client's write grant:
 * bump the global and per-OSC dirty counters, consume one page of grant,
 * and mark the brw_page as grant-backed so the release path knows. */
static void osc_consume_write_grant(struct client_obd *cli,struct brw_page *pga)
{
        atomic_inc(&obd_dirty_pages);
        cli->cl_dirty += CFS_PAGE_SIZE;
        cli->cl_avail_grant -= CFS_PAGE_SIZE;
        pga->flag |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
               CFS_PAGE_SIZE, pga, pga->pg);
        /* Callers check grant availability first, so this must not go
         * negative while the lock is held. */
        LASSERTF(cli->cl_avail_grant >= 0, "invalid avail grant is %ld \n",
                 cli->cl_avail_grant);
        osc_update_next_shrink(cli);
}
764
/* the companion to osc_consume_write_grant, called when a brw has completed.
 * must be called with the loi lock held. */
static void osc_release_write_grant(struct client_obd *cli,
                                    struct brw_page *pga, int sent)
{
        /* Fall back to 4k if the OST hasn't reported its block size yet. */
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
        ENTRY;

        /* Pages not charged against grant have nothing to release. */
        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
                EXIT;
                return;
        }

        pga->flag &= ~OBD_BRW_FROM_GRANT;
        atomic_dec(&obd_dirty_pages);
        cli->cl_dirty -= CFS_PAGE_SIZE;
        if (!sent) {
                /* Page never reached the OST; its grant is lost until the
                 * server re-grants it (reported via o_dropped). */
                cli->cl_lost_grant += CFS_PAGE_SIZE;
                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
        } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole block on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
                int offset = pga->off & ~CFS_PAGE_MASK;
                int count = pga->count + (offset & (blocksize - 1));
                int end = (offset + pga->count) & (blocksize - 1);
                if (end)
                        count += blocksize - end;

                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
                       cli->cl_avail_grant, cli->cl_dirty);
        }

        EXIT;
}
803
804 static unsigned long rpcs_in_flight(struct client_obd *cli)
805 {
806         return cli->cl_r_in_flight + cli->cl_w_in_flight;
807 }
808
/* caller must hold loi_list_lock */
/* Wake threads blocked in the cache-waiter queue, granting each either a
 * page of write grant (so it can cache) or -EDQUOT (so it falls back to
 * sync I/O).  Stops early if dirty limits are hit or if in-flight writes
 * may still return grant. */
void osc_wake_cache_waiters(struct client_obd *cli)
{
        struct list_head *l, *tmp;
        struct osc_cache_waiter *ocw;

        ENTRY;
        list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                /* if we can't dirty more, we must wait until some is written */
                if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
                   ((atomic_read(&obd_dirty_pages)+1)>(obd_max_dirty_pages))) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);
                        return;
                }

                /* if still dirty cache but no grant wait for pending RPCs that
                 * may yet return us some grant before doing sync writes */
                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
                               cli->cl_w_in_flight);
                        return;
                }

                ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
                list_del_init(&ocw->ocw_entry);
                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        /* no more RPCs in flight to return grant, do sync IO */
                        ocw->ocw_rc = -EDQUOT;
                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
                } else {
                        osc_consume_write_grant(cli,
                                                &ocw->ocw_oap->oap_brw_page);
                }

                cfs_waitq_signal(&ocw->ocw_waitq);
        }

        EXIT;
}
850
851 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
852 {
853         client_obd_list_lock(&cli->cl_loi_list_lock);
854         CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
855         if (body->oa.o_valid & OBD_MD_FLGRANT)
856                 cli->cl_avail_grant += body->oa.o_grant;
857         /* waiters are woken in brw_interpret */
858         client_obd_list_unlock(&cli->cl_loi_list_lock);
859 }
860
861 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
862                               void *key, obd_count vallen, void *val,
863                               struct ptlrpc_request_set *set);
864
865 static int osc_shrink_grant_interpret(struct ptlrpc_request *req,
866                                       void *data, int rc)
867 {
868         struct osc_grant_args *aa = data;
869         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
870         struct obdo *oa = aa->aa_oa;
871         struct ost_body *body;
872
873         if (rc != 0) {
874                 client_obd_list_lock(&cli->cl_loi_list_lock);
875                 cli->cl_avail_grant += oa->o_grant;
876                 client_obd_list_unlock(&cli->cl_loi_list_lock);
877                 GOTO(out, rc);
878         }
879         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*oa),
880                                 lustre_swab_ost_body);
881         osc_update_grant(cli, body);
882 out:
883         OBD_FREE_PTR(oa);
884         return rc;
885 }
886
887 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
888 {
889         client_obd_list_lock(&cli->cl_loi_list_lock);
890         oa->o_grant = cli->cl_avail_grant / 4;
891         cli->cl_avail_grant -= oa->o_grant;
892         client_obd_list_unlock(&cli->cl_loi_list_lock);
893         oa->o_flags |= OBD_FL_SHRINK_GRANT;
894         osc_update_next_shrink(cli);
895 }
896
897 /* Shrink the current grant, either from some large amount to enough for a
898  * full set of in-flight RPCs, or if we have already shrunk to that limit
899  * then to enough for a single RPC.  This avoids keeping more grant than
900  * needed, and avoids shrinking the grant piecemeal. */
901 static int osc_shrink_grant(struct client_obd *cli)
902 {
903         long target = (cli->cl_max_rpcs_in_flight + 1) *
904                       cli->cl_max_pages_per_rpc;
905
906         client_obd_list_lock(&cli->cl_loi_list_lock);
907         if (cli->cl_avail_grant <= target)
908                 target = cli->cl_max_pages_per_rpc;
909         client_obd_list_unlock(&cli->cl_loi_list_lock);
910
911         return osc_shrink_grant_to_target(cli, target);
912 }
913
914 int osc_shrink_grant_to_target(struct client_obd *cli, long target)
915 {
916         int    rc = 0;
917         struct ost_body     *body;
918         ENTRY;
919
920         client_obd_list_lock(&cli->cl_loi_list_lock);
921         /* Don't shrink if we are already above or below the desired limit
922          * We don't want to shrink below a single RPC, as that will negatively
923          * impact block allocation and long-term performance. */
924         if (target < cli->cl_max_pages_per_rpc)
925                 target = cli->cl_max_pages_per_rpc;
926
927         if (target >= cli->cl_avail_grant) {
928                 client_obd_list_unlock(&cli->cl_loi_list_lock);
929                 RETURN(0);
930         }
931         client_obd_list_unlock(&cli->cl_loi_list_lock);
932
933         OBD_ALLOC_PTR(body);
934         if (!body)
935                 RETURN(-ENOMEM);
936
937         osc_announce_cached(cli, &body->oa, 0);
938
939         client_obd_list_lock(&cli->cl_loi_list_lock);
940         body->oa.o_grant = cli->cl_avail_grant - target;
941         cli->cl_avail_grant = target;
942         client_obd_list_unlock(&cli->cl_loi_list_lock);
943         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
944         osc_update_next_shrink(cli);
945
946         rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
947                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
948                                 sizeof(*body), body, NULL);
949         if (rc) {
950                 client_obd_list_lock(&cli->cl_loi_list_lock);
951                 cli->cl_avail_grant += body->oa.o_grant;
952                 client_obd_list_unlock(&cli->cl_loi_list_lock);
953         }
954         OBD_FREE_PTR(body);
955         RETURN(rc);
956 }
957
958 #define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
959 static int osc_should_shrink_grant(struct client_obd *client)
960 {
961         cfs_time_t time = cfs_time_current();
962         cfs_time_t next_shrink = client->cl_next_shrink_grant;
963         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
964                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
965                     client->cl_avail_grant > GRANT_SHRINK_LIMIT)
966                         return 1;
967                 else
968                         osc_update_next_shrink(client);
969         }
970         return 0;
971 }
972
973 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
974 {
975         struct client_obd *client;
976
977         list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
978                 if (osc_should_shrink_grant(client))
979                         osc_shrink_grant(client);
980         }
981         return 0;
982 }
983
984 static int osc_add_shrink_grant(struct client_obd *client)
985 {
986         int rc;
987
988         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
989                                        TIMEOUT_GRANT,
990                                        osc_grant_shrink_grant_cb, NULL,
991                                        &client->cl_grant_shrink_list);
992         if (rc) {
993                 CERROR("add grant client %s error %d\n",
994                         client->cl_import->imp_obd->obd_name, rc);
995                 return rc;
996         }
997         CDEBUG(D_CACHE, "add grant client %s \n",
998                client->cl_import->imp_obd->obd_name);
999         osc_update_next_shrink(client);
1000         return 0;
1001 }
1002
/* Unregister this client from the periodic grant-shrink timer; the inverse
 * of osc_add_shrink_grant().  Returns the ptlrpc_del_timeout_client() rc. */
static int osc_del_shrink_grant(struct client_obd *client)
{
        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
                                         TIMEOUT_GRANT);
}
1008
/* Initialize cl_avail_grant from the server's connect data at (re)connect
 * time, and start the periodic grant shrinker if the server supports it. */
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we're expected to hold: if we've
         * been evicted, it's the new avail_grant amount, cl_dirty will drop
         * to 0 as inflight rpcs fail out; otherwise, it's avail_grant + dirty.
         *
         * race is tolerable here: if we're evicted, but imp_state already
         * left EVICTED state, then cl_dirty must be 0 already.
         */
        client_obd_list_lock(&cli->cl_loi_list_lock);
        if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
                cli->cl_avail_grant = ocd->ocd_grant;
        else
                cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld \n",
               cli->cl_avail_grant, cli->cl_lost_grant);
        LASSERT(cli->cl_avail_grant >= 0);

        /* register with the shrink timer only once per client */
        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
}
1034
1035 /* We assume that the reason this OSC got a short read is because it read
1036  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1037  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1038  * this stripe never got written at or beyond this stripe offset yet. */
1039 static void handle_short_read(int nob_read, obd_count page_count,
1040                               struct brw_page **pga, int pshift)
1041 {
1042         char *ptr;
1043         int i = 0;
1044
1045         /* skip bytes read OK */
1046         while (nob_read > 0) {
1047                 LASSERT (page_count > 0);
1048
1049                 if (pga[i]->count > nob_read) {
1050                         /* EOF inside this page */
1051                         ptr = cfs_kmap(pga[i]->pg) +
1052                               (OSC_FILE2MEM_OFF(pga[i]->off,pshift)&~CFS_PAGE_MASK);
1053                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1054                         cfs_kunmap(pga[i]->pg);
1055                         page_count--;
1056                         i++;
1057                         break;
1058                 }
1059
1060                 nob_read -= pga[i]->count;
1061                 page_count--;
1062                 i++;
1063         }
1064
1065         /* zero remaining pages */
1066         while (page_count-- > 0) {
1067                 ptr = cfs_kmap(pga[i]->pg) +
1068                       (OSC_FILE2MEM_OFF(pga[i]->off, pshift) & ~CFS_PAGE_MASK);
1069                 memset(ptr, 0, pga[i]->count);
1070                 cfs_kunmap(pga[i]->pg);
1071                 i++;
1072         }
1073 }
1074
1075 static int check_write_rcs(struct ptlrpc_request *req,
1076                            int requested_nob, int niocount,
1077                            obd_count page_count, struct brw_page **pga)
1078 {
1079         int    *remote_rcs, i;
1080
1081         /* return error if any niobuf was in error */
1082         remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
1083                                         sizeof(*remote_rcs) * niocount, NULL);
1084         if (remote_rcs == NULL) {
1085                 CERROR("Missing/short RC vector on BRW_WRITE reply\n");
1086                 return(-EPROTO);
1087         }
1088         if (lustre_rep_need_swab(req))
1089                 for (i = 0; i < niocount; i++)
1090                         __swab32s(&remote_rcs[i]);
1091
1092         for (i = 0; i < niocount; i++) {
1093                 if (remote_rcs[i] < 0)
1094                         return(remote_rcs[i]);
1095
1096                 if (remote_rcs[i] != 0) {
1097                         CERROR("rc[%d] invalid (%d) req %p\n",
1098                                 i, remote_rcs[i], req);
1099                         return(-EPROTO);
1100                 }
1101         }
1102
1103         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1104                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1105                        req->rq_bulk->bd_nob_transferred, requested_nob);
1106                 return(-EPROTO);
1107         }
1108
1109         return (0);
1110 }
1111
1112 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1113 {
1114         if (p1->flag != p2->flag) {
1115                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_ASYNC);
1116
1117                 /* warn if we try to combine flags that we don't know to be
1118                  * safe to combine */
1119                 if ((p1->flag & mask) != (p2->flag & mask))
1120                         CERROR("is it ok to have flags 0x%x and 0x%x in the "
1121                                "same brw?\n", p1->flag, p2->flag);
1122                 return 0;
1123         }
1124
1125         return (p1->off + p1->count == p2->off);
1126 }
1127
/* Checksum the first 'nob' bytes spread across the 'pga' page array using
 * 'cksum_type'; 'pshift' converts file offsets to in-page offsets.  The
 * OBD_FAIL hooks deliberately inject errors for testing: corrupting read
 * data in the first page, or bumping the checksum for writes. */
static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga, int opc,
                                   cksum_type_t cksum_type, int pshift)
{
        __u32 cksum;
        int i = 0;

        LASSERT (pg_count > 0);
        cksum = init_checksum(cksum_type);
        while (nob > 0 && pg_count > 0) {
                unsigned char *ptr = cfs_kmap(pga[i]->pg);
                int off = OSC_FILE2MEM_OFF(pga[i]->off, pshift) & ~CFS_PAGE_MASK;
                /* clamp to 'nob' on the final, partial page */
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
                        memcpy(ptr + off, "bad1", min(4, nob));
                cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
                cfs_kunmap(pga[i]->pg);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
                               off, cksum);

                /* nob is reduced by the full page count, so it may go
                 * negative on a partial last page and end the loop */
                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}
1163
/* Build a BRW (bulk read/write) RPC for 'page_count' pages in 'pga'
 * against the object described by 'oa'.  Adjacent pages with compatible
 * flags are merged into single remote niobufs.  On success the prepared
 * request is returned through 'reqp' (caller owns it) with its async args
 * filled in; 'pshift' converts file offsets to in-page memory offsets. */
static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp, int pshift)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        __u32 size[4] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        int niocount, i, requested_nob, opc, rc;
        struct ptlrpc_request_pool *pool;
        struct osc_brw_async_args *aa;
        struct brw_page *pg_prev;

        ENTRY;
        OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM); /* Recoverable */
        OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ2, -EINVAL); /* Fatal */

        opc = ((cmd & OBD_BRW_WRITE) != 0) ? OST_WRITE : OST_READ;
        /* writes draw from the pre-allocated request pool so dirty data can
         * still be flushed under memory pressure */
        pool = ((cmd & OBD_BRW_WRITE) != 0) ? cli->cl_import->imp_rq_pool :NULL;

        /* one remote niobuf per run of mergeable pages */
        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        size[REQ_REC_OFF + 1] = sizeof(*ioobj);
        size[REQ_REC_OFF + 2] = niocount * sizeof(*niobuf);

        req = ptlrpc_prep_req_pool(cli->cl_import, LUSTRE_OST_VERSION, opc, 4, size,
                                   NULL, pool);
        if (req == NULL)
                RETURN (-ENOMEM);

        req->rq_request_portal = OST_IO_PORTAL;         /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (opc == OST_WRITE)
                desc = ptlrpc_prep_bulk_imp (req, page_count,
                                             BULK_GET_SOURCE, OST_BULK_PORTAL);
        else
                desc = ptlrpc_prep_bulk_imp (req, page_count,
                                             BULK_PUT_SINK, OST_BULK_PORTAL);
        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        ioobj = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioobj));
        niobuf = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                                niocount * sizeof(*niobuf));

        lustre_set_wire_obdo(&body->oa, oa);
        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;

        LASSERT (page_count > 0);
        pg_prev = pga[0];
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];

                /* each fragment must fit in one page frame, and pages must
                 * arrive in strictly increasing file-offset order */
                LASSERT(pg->count > 0);
                LASSERTF((OSC_FILE2MEM_OFF(pg->off, pshift) & ~CFS_PAGE_MASK) +
                         pg->count <= CFS_PAGE_SIZE,
                         "i: %d pg: %p off: "LPU64", count: %u, shift: %d\n",
                         i, pg, pg->off, pg->count, pshift);
#ifdef __linux__
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
#else
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u\n", i, page_count);
#endif
                /* all pages in one BRW share the SRVLOCK setting */
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page(desc, pg->pg,
                                      OSC_FILE2MEM_OFF(pg->off,pshift)&~CFS_PAGE_MASK,
                                      pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        /* contiguous with the previous page: extend its
                         * niobuf rather than starting a new one */
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len    = pg->count;
                        niobuf->flags  = pg->flag;
                }
                pg_prev = pg;
        }

        /* sanity: we filled exactly the niobuf array we allocated */
        LASSERTF((void *)(niobuf - niocount) ==
                lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                               niocount * sizeof(*niobuf)),
                "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg,
                REQ_REC_OFF + 2, niocount * sizeof(*niobuf)),
                (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (cli->cl_checksum) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                                oa->o_flags &= OBD_FL_LOCAL_MASK;
                                body->oa.o_flags = 0;
                        }
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type, pshift);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                size[REPLY_REC_OFF + 1] = sizeof(__u32) * niocount;
                ptlrpc_req_set_repsize(req, 3, size);
        } else {
                if (cli->cl_checksum) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
                /* 1 RC for the whole I/O */
                ptlrpc_req_set_repsize(req, 2, size);
        }

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        aa->aa_pshift = pshift;
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);

        *reqp = req;
        RETURN (0);

 out:
        ptlrpc_req_finished (req);
        RETURN (rc);
}
1335
1336 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1337                                 __u32 client_cksum, __u32 server_cksum, int nob,
1338                                 obd_count page_count, struct brw_page **pga,
1339                                 cksum_type_t client_cksum_type, int pshift)
1340 {
1341         __u32 new_cksum;
1342         char *msg;
1343         cksum_type_t cksum_type;
1344
1345         if (server_cksum == client_cksum) {
1346                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1347                 return 0;
1348         }
1349
1350         if (oa->o_valid & OBD_MD_FLFLAGS)
1351                 cksum_type = cksum_type_unpack(oa->o_flags);
1352         else
1353                 cksum_type = OBD_CKSUM_CRC32;
1354
1355         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1356                                       cksum_type, pshift);
1357
1358         if (cksum_type != client_cksum_type)
1359                 msg = "the server did not use the checksum type specified in "
1360                       "the original request - likely a protocol problem";
1361         else if (new_cksum == server_cksum)
1362                 msg = "changed on the client after we checksummed it - "
1363                       "likely false positive due to mmap IO (bug 11742)";
1364         else if (new_cksum == client_cksum)
1365                 msg = "changed in transit before arrival at OST";
1366         else
1367                 msg = "changed in transit AND doesn't match the original - "
1368                       "likely false positive due to mmap IO (bug 11742)";
1369
1370         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1371                            LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1372                            "["LPU64"-"LPU64"]\n",
1373                            msg, libcfs_nid2str(peer->nid),
1374                            oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1375                            oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
1376                                                         (__u64)0,
1377                            oa->o_id,
1378                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1379                            pga[0]->off,
1380                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1381         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1382                "client csum now %x\n", client_cksum, client_cksum_type,
1383                server_cksum, cksum_type, new_cksum);
1384
1385         return 1;
1386 }
1387
/* Note rc enters this function as number of bytes transferred */
static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
{
        struct osc_brw_async_args *aa = ptlrpc_req_async_args(req);
        const lnet_process_id_t *peer =
                        &req->rq_import->imp_connection->c_peer;
        struct client_obd *cli = aa->aa_cli;
        struct ost_body *body;
        __u32 client_cksum = 0;
        ENTRY;

        /* -EDQUOT replies still carry a body we must process (quota flags,
         * grant); any other error is returned as-is */
        if (rc < 0 && rc != -EDQUOT)
                RETURN(rc);

        LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR ("Can't unpack body\n");
                RETURN(-EPROTO);
        }

        /* set/clear over quota flag for a uid/gid */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
            body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
                lquota_setdq(quota_interface, cli, body->oa.o_uid,
                             body->oa.o_gid, body->oa.o_valid,
                             body->oa.o_flags);

        if (rc < 0)
                RETURN(rc);

        if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
                client_cksum = aa->aa_oa->o_cksum; /* save for later */

        osc_update_grant(cli, body);

        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
                if (rc > 0) {
                        CERROR ("Unexpected +ve rc %d\n", rc);
                        RETURN(-EPROTO);
                }
                LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);

                /* -EAGAIN makes the caller redo the whole write */
                if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
                    check_write_checksum(&body->oa, peer, client_cksum,
                                         body->oa.o_cksum, aa->aa_requested_nob,
                                         aa->aa_page_count, aa->aa_ppga,
                                         cksum_type_unpack(aa->aa_oa->o_flags),
                                         aa->aa_pshift))
                        RETURN(-EAGAIN);

                rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
                                     aa->aa_page_count, aa->aa_ppga);
                GOTO(out, rc);
        }

        /* The rest of this function executes only for OST_READs */
        if (rc > aa->aa_requested_nob) {
                CERROR("Unexpected rc %d (%d requested)\n", rc,
                       aa->aa_requested_nob);
                RETURN(-EPROTO);
        }

        if (rc != req->rq_bulk->bd_nob_transferred) {
                CERROR ("Unexpected rc %d (%d transferred)\n",
                        rc, req->rq_bulk->bd_nob_transferred);
                return (-EPROTO);
        }

        /* a short read means we hit EOF on this stripe: zero-fill the rest */
        if (rc < aa->aa_requested_nob)
                handle_short_read(rc, aa->aa_page_count, aa->aa_ppga, aa->aa_pshift);

        if (body->oa.o_valid & OBD_MD_FLCKSUM) {
                static int cksum_counter;
                __u32      server_cksum = body->oa.o_cksum;
                char      *via;
                char      *router;
                cksum_type_t cksum_type;

                if (body->oa.o_valid & OBD_MD_FLFLAGS)
                        cksum_type = cksum_type_unpack(body->oa.o_flags);
                else
                        cksum_type = OBD_CKSUM_CRC32;
                client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
                                                 aa->aa_ppga, OST_READ,
                                                 cksum_type, aa->aa_pshift);

                /* name any LNET router involved, for the error messages */
                if (peer->nid == req->rq_bulk->bd_sender) {
                        via = router = "";
                } else {
                        via = " via ";
                        router = libcfs_nid2str(req->rq_bulk->bd_sender);
                }

                if (server_cksum == ~0 && rc > 0) {
                        CERROR("Protocol error: server %s set the 'checksum' "
                               "bit, but didn't send a checksum.  Not fatal, "
                               "but please notify on http://bugzilla.lustre.org/\n",
                               libcfs_nid2str(peer->nid));
                } else if (server_cksum != client_cksum) {
                        LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
                                           "%s%s%s inum "LPU64"/"LPU64" object "
                                           LPU64"/"LPU64" extent "
                                           "["LPU64"-"LPU64"]\n",
                                           req->rq_import->imp_obd->obd_name,
                                           libcfs_nid2str(peer->nid),
                                           via, router,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                                body->oa.o_fid : (__u64)0,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                                body->oa.o_generation :(__u64)0,
                                           body->oa.o_id,
                                           body->oa.o_valid & OBD_MD_FLGROUP ?
                                                body->oa.o_gr : (__u64)0,
                                           aa->aa_ppga[0]->off,
                                           aa->aa_ppga[aa->aa_page_count-1]->off +
                                           aa->aa_ppga[aa->aa_page_count-1]->count -
                                                                        1);
                        CERROR("client %x, server %x, cksum_type %x\n",
                               client_cksum, server_cksum, cksum_type);
                        cksum_counter = 0;
                        aa->aa_oa->o_cksum = client_cksum;
                        rc = -EAGAIN;
                } else {
                        cksum_counter++;
                        CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                        rc = 0;
                }
        } else if (unlikely(client_cksum)) {
                static int cksum_missed;

                cksum_missed++;
                /* log with exponential backoff: only when cksum_missed is a
                 * power of two */
                if ((cksum_missed & (-cksum_missed)) == cksum_missed)
                        CERROR("Checksum %u requested from %s but not sent\n",
                               cksum_missed, libcfs_nid2str(peer->nid));
        } else {
                rc = 0;
        }
out:
        if (rc >= 0)
                lustre_get_wire_obdo(aa->aa_oa, &body->oa);

        RETURN(rc);
}
1533
1534 static int osc_brw_internal(int cmd, struct obd_export *exp,struct obdo *oa,
1535                             struct lov_stripe_md *lsm,
1536                             obd_count page_count, struct brw_page **pga)
1537 {
1538         struct ptlrpc_request *request;
1539         int                    rc;
1540         cfs_waitq_t            waitq;
1541         int                    resends = 0;
1542         struct l_wait_info     lwi;
1543
1544         ENTRY;
1545         init_waitqueue_head(&waitq);
1546
1547 restart_bulk:
1548         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1549                                   page_count, pga, &request, 0);
1550         if (rc != 0)
1551                 return (rc);
1552
1553         rc = ptlrpc_queue_wait(request);
1554
1555         if (rc == -ETIMEDOUT && request->rq_resend) {
1556                 DEBUG_REQ(D_HA, request,  "BULK TIMEOUT");
1557                 ptlrpc_req_finished(request);
1558                 goto restart_bulk;
1559         }
1560
1561         rc = osc_brw_fini_request(request, rc);
1562
1563         ptlrpc_req_finished(request);
1564         if (osc_recoverable_error(rc)) {
1565                 resends++;
1566                 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1567                         CERROR("too many resend retries, returning error\n");
1568                         RETURN(-EIO);
1569                 }
1570
1571                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1572                 l_wait_event(waitq, 0, &lwi);
1573
1574                 goto restart_bulk;
1575         }
1576         RETURN(rc);
1577 }
1578
/* Rebuild and requeue a bulk RPC that failed with a recoverable error.
 * The replacement request inherits the page array, the async args and the
 * oap list of the old request.  Returns 0 on success, -EIO when the resend
 * limit is exceeded, -EINTR if any covered oap was interrupted, or the
 * osc_brw_prep_request() error. */
int osc_brw_redo_request(struct ptlrpc_request *request,
                         struct osc_brw_async_args *aa)
{
        struct ptlrpc_request *new_req;
        struct ptlrpc_request_set *set = request->rq_set;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
                CERROR("too many resend retries, returning error\n");
                RETURN(-EIO);
        }

        DEBUG_REQ(D_ERROR, request, "redo for recoverable error");

        /* rebuild the RPC with the same direction, pages and obdo */
        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                        OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa,
                                  NULL /* lsm unused by osc currently */,
                                  aa->aa_page_count, aa->aa_ppga, &new_req,
                                  aa->aa_pshift);
        if (rc)
                RETURN(rc);

        client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);

        /* bail out before taking over anything if a sync waiter already
         * interrupted one of the pages carried by the old request */
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(request == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 request, oap->oap_request);
                        if (oap->oap_interrupted) {
                                client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
                                ptlrpc_req_finished(new_req);
                                RETURN(-EINTR);
                        }
                }
        }
        /* New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it... */
        aa->aa_resends++;
        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
        /* delay the resend proportionally to the number of retries */
        new_req->rq_sent = CURRENT_SECONDS + aa->aa_resends;

        new_aa = ptlrpc_req_async_args(new_req);

        CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
        list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);

        /* repoint each oap's request reference at the replacement RPC */
        list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);
                }
        }

        /* use ptlrpc_set_add_req is safe because interpret functions work
         * in check_set context. only one way exist with access to request
         * from different thread got -EINTR - this way protected with
         * cl_loi_list_lock */
        ptlrpc_set_add_req(set, new_req);

        client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);

        DEBUG_REQ(D_INFO, new_req, "new request");
        RETURN(0);
}
1650
/* Build an asynchronous bulk RPC for @page_count pages and queue it on
 * @set.  For page-aligned writes, write grant is consumed up front so even
 * a sync write accounts for OST space; on prep failure the grant is given
 * back and cache waiters are woken.  Returns 0 or the prep error. */
static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
                          struct lov_stripe_md *lsm, obd_count page_count,
                          struct brw_page **pga, struct ptlrpc_request_set *set,
                          int pshift)
{
        struct ptlrpc_request     *request;
        struct client_obd         *cli = &exp->exp_obd->u.cli;
        int                        rc, i;
        struct osc_brw_async_args *aa;
        ENTRY;

        /* Consume write credits even if doing a sync write -
         * otherwise we may run out of space on OST due to grant. */
        /* FIXME: unaligned writes must use write grants too */
        if (cmd == OBD_BRW_WRITE && pshift == 0) {
                client_obd_list_lock(&cli->cl_loi_list_lock);
                for (i = 0; i < page_count; i++) {
                        if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
                                osc_consume_write_grant(cli, pga[i]);
                }
                client_obd_list_unlock(&cli->cl_loi_list_lock);
        }

        rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
                                  page_count, pga, &request, pshift);

        CLASSERT(sizeof(*aa) <= sizeof(request->rq_async_args));

        if (rc == 0) {
                aa = ptlrpc_req_async_args(request);
                /* record per-direction page-count and RPCs-in-flight stats */
                if (cmd == OBD_BRW_READ) {
                        lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
                        lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
                } else {
                        lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
                        lprocfs_oh_tally(&cli->cl_write_rpc_hist,
                                         cli->cl_w_in_flight);
                }
                ptlrpc_lprocfs_brw(request, aa->aa_requested_nob);

                LASSERT(list_empty(&aa->aa_oaps));

                request->rq_interpret_reply = brw_interpret;
                ptlrpc_set_add_req(set, request);
                client_obd_list_lock(&cli->cl_loi_list_lock);
                if (cmd == OBD_BRW_READ)
                        cli->cl_r_in_flight++;
                else
                        cli->cl_w_in_flight++;
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DIO_PAUSE, 3);
        } else if (cmd == OBD_BRW_WRITE) {
                /* prep failed: return the grant taken above and wake anyone
                 * waiting on cache space */
                client_obd_list_lock(&cli->cl_loi_list_lock);
                for (i = 0; i < page_count; i++)
                        osc_release_write_grant(cli, pga[i], 0);
                osc_wake_cache_waiters(cli);
                client_obd_list_unlock(&cli->cl_loi_list_lock);
        }

        RETURN (rc);
}
1712
1713 /*
1714  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1715  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1716  * fine for our small page arrays and doesn't require allocation.  its an
1717  * insertion sort that swaps elements that are strides apart, shrinking the
1718  * stride down until its '1' and the array is sorted.
1719  */
1720 static void sort_brw_pages(struct brw_page **array, int num)
1721 {
1722         int stride, i, j;
1723         struct brw_page *tmp;
1724
1725         if (num == 1)
1726                 return;
1727         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1728                 ;
1729
1730         do {
1731                 stride /= 3;
1732                 for (i = stride ; i < num ; i++) {
1733                         tmp = array[i];
1734                         j = i;
1735                         while (j >= stride && array[j-stride]->off > tmp->off) {
1736                                 array[j] = array[j - stride];
1737                                 j -= stride;
1738                         }
1739                         array[j] = tmp;
1740                 }
1741         } while (stride > 1);
1742 }
1743
1744 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages,
1745                                         int pshift)
1746 {
1747         int count = 1;
1748         int offset;
1749         int i = 0;
1750
1751         LASSERT (pages > 0);
1752         offset = OSC_FILE2MEM_OFF(pg[i]->off, pshift) & ~CFS_PAGE_MASK;
1753
1754         for (;;) {
1755                 pages--;
1756                 if (pages == 0)         /* that's all */
1757                         return count;
1758
1759                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1760                         return count;   /* doesn't end on page boundary */
1761
1762                 i++;
1763                 offset = OSC_FILE2MEM_OFF(pg[i]->off, pshift) & ~CFS_PAGE_MASK;
1764                 if (offset != 0)        /* doesn't start on page boundary */
1765                         return count;
1766
1767                 count++;
1768         }
1769 }
1770
1771 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1772 {
1773         struct brw_page **ppga;
1774         int i;
1775
1776         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1777         if (ppga == NULL)
1778                 return NULL;
1779
1780         for (i = 0; i < count; i++)
1781                 ppga[i] = pga + i;
1782         return ppga;
1783 }
1784
/* Free a page-pointer array obtained from osc_build_ppga(); @count must be
 * the element count the array was built with. */
static void osc_release_ppga(struct brw_page **ppga, obd_count count)
{
        LASSERT(ppga != NULL);
        OBD_FREE(ppga, sizeof(*ppga) * count);
}
1790
/* Synchronous bulk read/write of @page_count pages described by @pga.
 * Pages are sorted by offset and issued as one or more RPCs of at most
 * cl_max_pages_per_rpc unfragmented pages each.  Because the brw clobbers
 * the obdo, a copy is saved before the first of several RPCs and restored
 * between them.  OBD_BRW_CHECK only probes whether I/O could succeed. */
static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
                   obd_count page_count, struct brw_page *pga,
                   struct obd_trans_info *oti)
{
        struct obdo *saved_oa = NULL;
        struct brw_page **ppga, **orig;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct client_obd *cli;
        int rc, page_count_orig;
        ENTRY;

        LASSERT((imp != NULL) && (imp->imp_obd != NULL));
        cli = &imp->imp_obd->u.cli;

        if (cmd & OBD_BRW_CHECK) {
                /* The caller just wants to know if there's a chance that this
                 * I/O can succeed */

                if (imp->imp_invalid)
                        RETURN(-EIO);
                RETURN(0);
        }

        /* test_brw with a failed create can trip this, maybe others. */
        LASSERT(cli->cl_max_pages_per_rpc);

        rc = 0;

        orig = ppga = osc_build_ppga(pga, page_count);
        if (ppga == NULL)
                RETURN(-ENOMEM);
        page_count_orig = page_count;

        sort_brw_pages(ppga, page_count);
        while (page_count) {
                obd_count pages_per_brw;

                /* cap the chunk at the per-RPC limit... */
                if (page_count > cli->cl_max_pages_per_rpc)
                        pages_per_brw = cli->cl_max_pages_per_rpc;
                else
                        pages_per_brw = page_count;

                /* ...and shrink it to the leading contiguous run */
                pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw, 0);

                if (saved_oa != NULL) {
                        /* restore previously saved oa */
                        *oinfo->oi_oa = *saved_oa;
                } else if (page_count > pages_per_brw) {
                        /* save a copy of oa (brw will clobber it) */
                        OBDO_ALLOC(saved_oa);
                        if (saved_oa == NULL)
                                GOTO(out, rc = -ENOMEM);
                        *saved_oa = *oinfo->oi_oa;
                }

                rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
                                      pages_per_brw, ppga);

                if (rc != 0)
                        break;

                page_count -= pages_per_brw;
                ppga += pages_per_brw;
        }

out:
        osc_release_ppga(orig, page_count_orig);

        if (saved_oa != NULL)
                OBDO_FREE(saved_oa);

        RETURN(rc);
}
1864
/* Asynchronous counterpart of osc_brw(): splits @pga into chunks that each
 * fit in one RPC and queues them on @set via async_internal().  When the
 * transfer needs more than one RPC, every chunk gets private copies of the
 * page-pointer array and of the obdo (flagged OBD_FL_TEMPORARY so the
 * completion path frees them); only a single-RPC transfer hands over the
 * caller's ppga/oa directly. */
static int osc_brw_async(int cmd, struct obd_export *exp,
                         struct obd_info *oinfo, obd_count page_count,
                         struct brw_page *pga, struct obd_trans_info *oti,
                         struct ptlrpc_request_set *set, int pshift)
{
        struct brw_page **ppga, **orig;
        int page_count_orig;
        int rc = 0;
        ENTRY;

        if (cmd & OBD_BRW_CHECK) {
                /* The caller just wants to know if there's a chance that this
                 * I/O can succeed */
                struct obd_import *imp = class_exp2cliimp(exp);

                if (imp == NULL || imp->imp_invalid)
                        RETURN(-EIO);
                RETURN(0);
        }

        orig = ppga = osc_build_ppga(pga, page_count);
        if (ppga == NULL)
                RETURN(-ENOMEM);
        page_count_orig = page_count;

        sort_brw_pages(ppga, page_count);
        while (page_count) {
                struct brw_page **copy;
                struct obdo *oa;
                obd_count pages_per_brw;

                /* one page less under unaligned direct i/o */
                pages_per_brw = min_t(obd_count, page_count,
                    class_exp2cliimp(exp)->imp_obd->u.cli.cl_max_pages_per_rpc -
                                      !!pshift);

                pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw,
                                                       pshift);

                /* use ppga only if single RPC is going to fly */
                if (pages_per_brw != page_count_orig || ppga != orig) {
                        OBD_ALLOC(copy, pages_per_brw * sizeof(*copy));
                        if (copy == NULL)
                                GOTO(out, rc = -ENOMEM);
                        memcpy(copy, ppga, pages_per_brw * sizeof(*copy));

                        OBDO_ALLOC(oa);
                        if (oa == NULL) {
                                OBD_FREE(copy, pages_per_brw * sizeof(*copy));
                                GOTO(out, rc = -ENOMEM);
                        }
                        memcpy(oa, oinfo->oi_oa, sizeof(*oa));
                        /* mark the copy so the completion path frees it */
                        oa->o_flags |= OBD_FL_TEMPORARY;
                } else {
                        copy = ppga;
                        oa = oinfo->oi_oa;
                        LASSERT(!(oa->o_flags & OBD_FL_TEMPORARY));
                }

                rc = async_internal(cmd, exp, oa, oinfo->oi_md, pages_per_brw,
                                    copy, set, pshift);

                if (rc != 0) {
                        /* undo only the allocations made for this chunk */
                        if (copy != ppga)
                                OBD_FREE(copy, pages_per_brw * sizeof(*copy));

                        if (oa->o_flags & OBD_FL_TEMPORARY)
                                OBDO_FREE(oa);
                        break;
                }

                if (copy == orig) {
                        /* we passed it to async_internal() which is
                         * now responsible for releasing memory */
                        orig = NULL;
                }

                page_count -= pages_per_brw;
                ppga += pages_per_brw;
        }
out:
        if (orig)
                osc_release_ppga(orig, page_count_orig);
        RETURN(rc);
}
1950
1951 static void osc_check_rpcs(struct client_obd *cli);
1952
/* The companion to osc_enter_cache(), called when @oap is no longer part of
 * the dirty accounting.  Writeback completes or truncate happens before
 * writing starts.  Must be called with the loi lock held. */
static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
                           int sent)
{
        /* hand the page's write grant back to the client accounting */
        osc_release_write_grant(cli, &oap->oap_brw_page, sent);
}
1961
/* This maintains the lists of pending pages to read/write for a given object
 * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
 * to quickly find objects that are ready to send an RPC.
 *
 * Returns 1 when the lop has enough pending or urgent work for @cmd to
 * justify building an RPC now, 0 otherwise. */
static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
                         int cmd)
{
        int optimal;
        ENTRY;

        /* nothing queued at all */
        if (lop->lop_num_pending == 0)
                RETURN(0);

        /* if we have an invalid import we want to drain the queued pages
         * by forcing them through rpcs that immediately fail and complete
         * the pages.  recovery relies on this to empty the queued pages
         * before canceling the locks and evicting down the llite pages */
        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(1);

        /* stream rpcs in queue order as long as as there is an urgent page
         * queued.  this is our cheap solution for good batching in the case
         * where writepage marks some random page in the middle of the file
         * as urgent because of, say, memory pressure */
        if (!list_empty(&lop->lop_urgent)) {
                CDEBUG(D_CACHE, "urgent request forcing RPC\n");
                RETURN(1);
        }

        /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
        optimal = cli->cl_max_pages_per_rpc;
        if (cmd & OBD_BRW_WRITE) {
                /* trigger a write rpc stream as long as there are dirtiers
                 * waiting for space.  as they're waiting, they're not going to
                 * create more pages to coallesce with what's waiting.. */
                if (!list_empty(&cli->cl_cache_waiters)) {
                        CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
                        RETURN(1);
                }

                /* +16 to avoid triggering rpcs that would want to include pages
                 * that are being queued but which can't be made ready until
                 * the queuer finishes with the page. this is a wart for
                 * llite::commit_write() */
                optimal += 16;
        }
        if (lop->lop_num_pending >= optimal)
                RETURN(1);

        RETURN(0);
}
2012
2013 static int lop_makes_hprpc(struct loi_oap_pages *lop)
2014 {
2015         struct osc_async_page *oap;
2016         ENTRY;
2017
2018         if (list_empty(&lop->lop_urgent))
2019                 RETURN(0);
2020
2021         oap = list_entry(lop->lop_urgent.next,
2022                          struct osc_async_page, oap_urgent_item);
2023
2024         if (oap->oap_async_flags & ASYNC_HP) {
2025                 CDEBUG(D_CACHE, "hp request forcing RPC\n");
2026                 RETURN(1);
2027         }
2028
2029         RETURN(0);
2030 }
2031
/* Keep @item's membership of @list in sync with @should_be_on: link it
 * at the tail when it should be listed but isn't, unlink it when it is
 * listed but shouldn't be.  No-op when already in the right state. */
static void on_list(struct list_head *item, struct list_head *list,
                    int should_be_on)
{
        int currently_on = !list_empty(item);

        if (should_be_on && !currently_on)
                list_add_tail(item, list);
        else if (!should_be_on && currently_on)
                list_del_init(item);
}
2040
/* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
 * can find pages to build into rpcs quickly */
static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
{
        /* a high-priority urgent page anywhere on the object trumps the
         * normal ready list: the loi sits on exactly one of the two */
        if (lop_makes_hprpc(&loi->loi_write_lop) ||
            lop_makes_hprpc(&loi->loi_read_lop)) {
                /* HP rpc */
                on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
                on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
        } else {
                on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
                on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
                        lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
                        lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
        }

        /* track objects that have any pending write/read pages at all */
        on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
                loi->loi_write_lop.lop_num_pending);

        on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
                loi->loi_read_lop.lop_num_pending);
}
2063
/* Adjust the pending-page count on @lop by @delta (may be negative) and
 * mirror the change into the client-wide read or write pending counter,
 * chosen by @cmd. */
static void lop_update_pending(struct client_obd *cli,
                               struct loi_oap_pages *lop, int cmd, int delta)
{
        lop->lop_num_pending += delta;
        if (cmd & OBD_BRW_WRITE)
                cli->cl_pending_w_pages += delta;
        else
                cli->cl_pending_r_pages += delta;
}
2073
/* this is called when a sync waiter receives an interruption.  Its job is to
 * get the caller woken as soon as possible.  If its page hasn't been put in an
 * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
 * desiring interruption which will forcefully complete the rpc once the rpc
 * has timed out */
static void osc_occ_interrupted(struct oig_callback_context *occ)
{
        struct osc_async_page *oap;
        struct loi_oap_pages *lop;
        struct lov_oinfo *loi;
        ENTRY;

        /* XXX member_of() */
        oap = list_entry(occ, struct osc_async_page, oap_occ);

        client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);

        oap->oap_interrupted = 1;

        /* ok, it's been put in an rpc. only one oap gets a request reference */
        if (oap->oap_request != NULL) {
                ptlrpc_mark_interrupted(oap->oap_request);
                ptlrpcd_wake(oap->oap_request);
                GOTO(unlock, 0);
        }

        /* we don't get interruption callbacks until osc_trigger_group_io()
         * has been called and put the sync oaps in the pending/urgent lists.*/
        if (!list_empty(&oap->oap_pending_item)) {
                /* not in an rpc yet: dequeue the page, fix up the pending
                 * accounting and complete the group waiter with -EINTR now */
                list_del_init(&oap->oap_pending_item);
                list_del_init(&oap->oap_urgent_item);

                loi = oap->oap_loi;
                lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
                        &loi->loi_write_lop : &loi->loi_read_lop;
                lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
                loi_list_maint(oap->oap_cli, oap->oap_loi);

                oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
                oap->oap_oig = NULL;
        }

unlock:
        client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
}
2119
2120 /* this is trying to propogate async writeback errors back up to the
2121  * application.  As an async write fails we record the error code for later if
2122  * the app does an fsync.  As long as errors persist we force future rpcs to be
2123  * sync so that the app can get a sync error and break the cycle of queueing
2124  * pages for which writeback will fail. */
2125 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
2126                            int rc)
2127 {
2128         if (rc) {
2129                 if (!ar->ar_rc)
2130                         ar->ar_rc = rc;
2131
2132                 ar->ar_force_sync = 1;
2133                 ar->ar_min_xid = ptlrpc_sample_next_xid();
2134                 return;
2135
2136         }
2137
2138         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2139                 ar->ar_force_sync = 0;
2140 }
2141
2142 static void osc_oap_to_pending(struct osc_async_page *oap)
2143 {
2144         struct loi_oap_pages *lop;
2145
2146         if (oap->oap_cmd & OBD_BRW_WRITE)
2147                 lop = &oap->oap_loi->loi_write_lop;
2148         else
2149                 lop = &oap->oap_loi->loi_read_lop;
2150
2151         if (oap->oap_async_flags & ASYNC_HP)
2152                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2153         else if (oap->oap_async_flags & ASYNC_URGENT)
2154                 list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2155         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2156         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2157 }
2158
/* this must be called holding the loi list lock to give coverage to exit_cache,
 * async_flag maintenance, and oap_request.
 *
 * Finishes @oap after its RPC completes (or fails): drops the request
 * reference, records write errors in the async-rc state, refreshes the
 * loi's cached lvb attributes from @oa, then either completes the group
 * I/O waiter or invokes the caller's ap_completion hook. */
static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
                              struct osc_async_page *oap, int sent, int rc)
{
        __u64 xid = 0;

        ENTRY;
        if (oap->oap_request != NULL) {
                /* grab the xid for osc_process_ar() before dropping our
                 * reference on the request */
                xid = ptlrpc_req_xid(oap->oap_request);
                ptlrpc_req_finished(oap->oap_request);
                oap->oap_request = NULL;
        }

        spin_lock(&oap->oap_lock);
        oap->oap_async_flags = 0;
        spin_unlock(&oap->oap_lock);
        oap->oap_interrupted = 0;

        if (oap->oap_cmd & OBD_BRW_WRITE) {
                osc_process_ar(&cli->cl_ar, xid, rc);
                osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
        }

        /* refresh the cached attributes returned with the reply */
        if (rc == 0 && oa != NULL) {
                if (oa->o_valid & OBD_MD_FLBLOCKS)
                        oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
                if (oa->o_valid & OBD_MD_FLMTIME)
                        oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
                if (oa->o_valid & OBD_MD_FLATIME)
                        oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
                if (oa->o_valid & OBD_MD_FLCTIME)
                        oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
        }

        if (oap->oap_oig) {
                /* group I/O: drop the grant and signal the waiter directly */
                osc_exit_cache(cli, oap, sent);
                oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
                oap->oap_oig = NULL;
                EXIT;
                return;
        }

        rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
                                                oap->oap_cmd, oa, rc);

        /* ll_ap_completion (from llite) drops PG_locked. so, a new
         * I/O on the page could start, but OSC calls it under lock
         * and thus we can add oap back to pending safely */
        if (rc)
                /* upper layer wants to leave the page on pending queue */
                osc_oap_to_pending(oap);
        else
                osc_exit_cache(cli, oap, sent);
        EXIT;
}
2215
/* Reply interpreter for async bulk RPCs queued by async_internal().
 * Finishes the brw, retries recoverable errors via osc_brw_redo_request(),
 * decrements the in-flight counters, and completes or releases the pages
 * carried by the RPC before kicking osc_check_rpcs() for more work. */
static int brw_interpret(struct ptlrpc_request *request, void *data, int rc)
{
        struct osc_brw_async_args *aa = data;
        struct client_obd *cli;
        ENTRY;

        rc = osc_brw_fini_request(request, rc);
        CDEBUG(D_INODE, "request %p aa %p rc %d\n", request, aa, rc);

        /* a successful redo hands everything to the new request; its own
         * interpret call will do the completion, so stop here */
        if (osc_recoverable_error(rc)) {
                rc = osc_brw_redo_request(request, aa);
                if (rc == 0)
                        RETURN(0);
        }

        cli = aa->aa_cli;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (lustre_msg_get_opc(request->rq_reqmsg) == OST_WRITE)
                cli->cl_w_in_flight--;
        else
                cli->cl_r_in_flight--;

        if (!list_empty(&aa->aa_oaps)) { /* from osc_send_oap_rpc() */
                struct osc_async_page *oap, *tmp;
                /* the caller may re-use the oap after the completion call so
                 * we need to clean it up a little */
                list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
                        list_del_init(&oap->oap_rpc_item);
                        osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
                }
                OBDO_FREE(aa->aa_oa);
        } else { /* from async_internal() */
                obd_count i;
                for (i = 0; i < aa->aa_page_count; i++)
                        osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);

                /* temporary obdos were allocated in osc_brw_async() */
                if (aa->aa_oa->o_flags & OBD_FL_TEMPORARY)
                        OBDO_FREE(aa->aa_oa);
        }
        osc_wake_cache_waiters(cli);
        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);

        RETURN(rc);
}
2266
/* Build a single BRW RPC out of the osc_async_pages queued on \a rpc_list.
 *
 * \param cli        client obd the RPC is built for
 * \param rpc_list   osc_async_pages (linked by oap_rpc_item) to send; all
 *                   pages are expected to share the same caller ops
 * \param page_count number of pages on \a rpc_list
 * \param cmd        OBD_BRW_READ or OBD_BRW_WRITE
 *
 * \retval prepared request on success; the pages are spliced onto the
 *         request's async args (aa_oaps) and \a rpc_list is re-initialized
 * \retval ERR_PTR(-errno) on failure; \a rpc_list is left as passed in
 */
static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
                                            struct list_head *rpc_list,
                                            int page_count, int cmd)
{
        struct ptlrpc_request *req;
        struct brw_page **pga = NULL;
        struct osc_brw_async_args *aa;
        struct obdo *oa = NULL;
        struct obd_async_page_ops *ops = NULL;
        void *caller_data = NULL;
        struct osc_async_page *oap;
        struct ldlm_lock *lock = NULL;
        obd_valid valid;
        int i, rc;

        ENTRY;
        LASSERT(!list_empty(rpc_list));

        OBD_ALLOC(pga, sizeof(*pga) * page_count);
        if (pga == NULL)
                RETURN(ERR_PTR(-ENOMEM));

        OBDO_ALLOC(oa);
        if (oa == NULL)
                GOTO(out, req = ERR_PTR(-ENOMEM));

        /* collect the brw_page descriptors; ops/caller_data/lock are taken
         * from the first page and assumed common to the whole list */
        i = 0;
        list_for_each_entry(oap, rpc_list, oap_rpc_item) {
                if (ops == NULL) {
                        ops = oap->oap_caller_ops;
                        caller_data = oap->oap_caller_data;
                        lock = oap->oap_ldlm_lock;
                }
                pga[i] = &oap->oap_brw_page;
                pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
                CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
                       pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
                i++;
        }

        /* always get the data for the obdo for the rpc */
        LASSERT(ops != NULL);
        ops->ap_fill_obdo(caller_data, cmd, oa);
        if (lock) {
                oa->o_handle = lock->l_remote_handle;
                oa->o_valid |= OBD_MD_FLHANDLE;
        }

        sort_brw_pages(pga, page_count);
        rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count, pga, &req, 0);
        if (rc != 0) {
                CERROR("prep_req failed: %d\n", rc);
                GOTO(out, req = ERR_PTR(rc));
        }
        /* From here on \a oa points into the request buffer.  The obdo
         * allocated above is presumably retained as aa->aa_oa by
         * osc_brw_prep_request() and freed in brw_interpret() --
         * NOTE(review): confirm against osc_brw_prep_request */
        oa = &((struct ost_body *)lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF,
                                                 sizeof(struct ost_body)))->oa;

        /* Need to update the timestamps after the request is built in case
         * we race with setattr (locally or in queue at OST).  If OST gets
         * later setattr before earlier BRW (as determined by the request xid),
         * the OST will not use BRW timestamps.  Sadly, there is no obvious
         * way to do this in a single call.  bug 10150 */
        if (pga[0]->flag & OBD_BRW_SRVLOCK) {
                /* in case of lockless read/write do not use inode's
                 * timestamps because concurrent stat might fill the
                 * inode with out-of-date times, send current
                 * instead */
                if (cmd & OBD_BRW_WRITE) {
                        oa->o_mtime = oa->o_ctime = LTIME_S(CURRENT_TIME);
                        oa->o_valid |= OBD_MD_FLMTIME | OBD_MD_FLCTIME;
                        valid = OBD_MD_FLATIME;
                } else {
                        oa->o_atime = LTIME_S(CURRENT_TIME);
                        oa->o_valid |= OBD_MD_FLATIME;
                        valid = OBD_MD_FLMTIME | OBD_MD_FLCTIME;
                }
        } else {
                valid = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
        }
        ops->ap_update_obdo(caller_data, cmd, oa, valid);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        /* hand the page list over to the request's async args */
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);
        list_splice(rpc_list, &aa->aa_oaps);
        CFS_INIT_LIST_HEAD(rpc_list);

out:
        if (IS_ERR(req)) {
                if (oa)
                        OBDO_FREE(oa);
                if (pga)
                        OBD_FREE(pga, sizeof(*pga) * page_count);
        }
        RETURN(req);
}
2363
/* the loi lock is held across this function but it's allowed to release
 * and reacquire it during its work */
/**
 * Prepare pages for ASYNC io and put pages in send queue.
 *
 * \param cli - client obd to send the pages through
 * \param loi - object the pages belong to
 * \param cmd - OBD_BRW_* macros (OBD_BRW_READ or OBD_BRW_WRITE)
 * \param lop - pending pages
 *
 * \return positive if an RPC was built and handed to ptlrpcd.
 * \return zero if no pages were ready to send.
 * \return negative errno if building the RPC failed.
 */
static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
                            int cmd, struct loi_oap_pages *lop)
{
        struct ptlrpc_request *req;
        obd_count page_count = 0;
        struct osc_async_page *oap = NULL, *tmp;
        struct osc_brw_async_args *aa;
        struct obd_async_page_ops *ops;
        CFS_LIST_HEAD(rpc_list);
        unsigned int ending_offset;
        unsigned  starting_offset = 0;
        int srvlock = 0;
        ENTRY;

        /* If there are HP OAPs we need to handle at least 1 of them,
         * move it the beginning of the pending list for that. */
        if (!list_empty(&lop->lop_urgent)) {
                oap = list_entry(lop->lop_urgent.next,
                                 struct osc_async_page, oap_urgent_item);
                if (oap->oap_async_flags & ASYNC_HP)
                        list_move(&oap->oap_pending_item, &lop->lop_pending);
        }

        /* first we find the pages we're allowed to work with */
        list_for_each_entry_safe(oap, tmp, &lop->lop_pending, oap_pending_item){
                ops = oap->oap_caller_ops;

                LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
                         "magic 0x%x\n", oap, oap->oap_magic);

                /* an RPC must not mix SRVLOCK and non-SRVLOCK pages; stop at
                 * the first page whose flag differs from the first page's */
                if (page_count != 0 &&
                    srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
                        CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
                               " oap %p, page %p, srvlock %u\n",
                               oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
                        break;
                }
                /* in llite being 'ready' equates to the page being locked
                 * until completion unlocks it.  commit_write submits a page
                 * as not ready because its unlock will happen unconditionally
                 * as the call returns.  if we race with commit_write giving
                 * us that page we dont' want to create a hole in the page
                 * stream, so we stop and leave the rpc to be fired by
                 * another dirtier or kupdated interval (the not ready page
                 * will still be on the dirty list).  we could call in
                 * at the end of ll_file_write to process the queue again. */
                if (!(oap->oap_async_flags & ASYNC_READY)) {
                        int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
                        if (rc < 0)
                                CDEBUG(D_INODE, "oap %p page %p returned %d "
                                                "instead of ready\n", oap,
                                                oap->oap_page, rc);
                        switch (rc) {
                        case -EAGAIN:
                                /* llite is telling us that the page is still
                                 * in commit_write and that we should try
                                 * and put it in an rpc again later.  we
                                 * break out of the loop so we don't create
                                 * a hole in the sequence of pages in the rpc
                                 * stream.*/
                                oap = NULL;
                                break;
                        case -EINTR:
                                /* the io isn't needed.. tell the checks
                                 * below to complete the rpc with EINTR */
                                spin_lock(&oap->oap_lock);
                                oap->oap_async_flags |= ASYNC_COUNT_STABLE;
                                spin_unlock(&oap->oap_lock);
                                oap->oap_count = -EINTR;
                                break;
                        case 0:
                                spin_lock(&oap->oap_lock);
                                oap->oap_async_flags |= ASYNC_READY;
                                spin_unlock(&oap->oap_lock);
                                break;
                        default:
                                LASSERTF(0, "oap %p page %p returned %d "
                                            "from make_ready\n", oap,
                                            oap->oap_page, rc);
                                break;
                        }
                }
                /* oap was NULLed above on -EAGAIN to stop the scan */
                if (oap == NULL)
                        break;
                /*
                 * Page submitted for IO has to be locked. Either by
                 * ->ap_make_ready() or by higher layers.
                 */
#if defined(__KERNEL__) && defined(__linux__)
                 if(!(PageLocked(oap->oap_page) &&
                     (CheckWriteback(oap->oap_page, cmd) || oap->oap_oig !=NULL))) {
                        CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
                               oap->oap_page, (long)oap->oap_page->flags, oap->oap_async_flags);
                        LBUG();
                }
#endif
                /* If there is a gap at the start of this page, it can't merge
                 * with any previous page, so we'll hand the network a
                 * "fragmented" page array that it can't transfer in 1 RDMA */
                if (page_count != 0 && oap->oap_page_off != 0)
                        break;

                /* take the page out of our book-keeping */
                list_del_init(&oap->oap_pending_item);
                lop_update_pending(cli, lop, cmd, -1);
                list_del_init(&oap->oap_urgent_item);

                if (page_count == 0)
                        starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
                                          (PTLRPC_MAX_BRW_SIZE - 1);

                /* ask the caller for the size of the io as the rpc leaves. */
                if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
                        oap->oap_count =
                                ops->ap_refresh_count(oap->oap_caller_data,cmd);
                if (oap->oap_count <= 0) {
                        CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
                               oap->oap_count);
                        osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
                        continue;
                }

                /* now put the page back in our accounting */
                list_add_tail(&oap->oap_rpc_item, &rpc_list);
                if (page_count == 0)
                        srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
                if (++page_count >= cli->cl_max_pages_per_rpc)
                        break;

                /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
                 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
                 * have the same alignment as the initial writes that allocated
                 * extents on the server. */
                ending_offset = (oap->oap_obj_off + oap->oap_page_off +
                                 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
                if (ending_offset == 0)
                        break;

                /* If there is a gap at the end of this page, it can't merge
                 * with any subsequent pages, so we'll hand the network a
                 * "fragmented" page array that it can't transfer in 1 RDMA */
                if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
                        break;
        }

        osc_wake_cache_waiters(cli);

        if (page_count == 0)
                RETURN(0);

        loi_list_maint(cli, loi);

        /* drop the list lock while building/accounting the request */
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        req = osc_build_req(cli, &rpc_list, page_count, cmd);
        if (IS_ERR(req)) {
                /* this should happen rarely and is pretty bad, it makes the
                 * pending list not follow the dirty order */
                client_obd_list_lock(&cli->cl_loi_list_lock);
                list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
                        list_del_init(&oap->oap_rpc_item);

                        /* queued sync pages can be torn down while the pages
                         * were between the pending list and the rpc */
                        if (oap->oap_interrupted) {
                                CDEBUG(D_INODE, "oap %p interrupted\n", oap);
                                osc_ap_completion(cli, NULL, oap, 0,
                                                  oap->oap_count);
                                continue;
                        }
                        osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
                }
                loi_list_maint(cli, loi);
                RETURN(PTR_ERR(req));
        }

        aa = ptlrpc_req_async_args(req);
        if (cmd == OBD_BRW_READ) {
                lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
                                      (starting_offset >> CFS_PAGE_SHIFT) + 1);
        } else {
                lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_write_rpc_hist,
                                 cli->cl_w_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
                                      (starting_offset >> CFS_PAGE_SHIFT) + 1);
        }
        ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);

        client_obd_list_lock(&cli->cl_loi_list_lock);

        if (cmd == OBD_BRW_READ)
                cli->cl_r_in_flight++;
        else
                cli->cl_w_in_flight++;

        /* queued sync pages can be torn down while the pages
         * were between the pending list and the rpc */
        tmp = NULL;
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                /* only one oap gets a request reference */
                if (tmp == NULL)
                        tmp = oap;
                if (oap->oap_interrupted && !req->rq_intr) {
                        CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
                               oap, req);
                        ptlrpc_mark_interrupted(req);
                }
        }
        if (tmp != NULL)
                tmp->oap_request = ptlrpc_request_addref(req);

        DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
                  page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);

        req->rq_interpret_reply = brw_interpret;
        ptlrpcd_add_req(req);
        RETURN(1);
}
2598
/* Dump an object's pending-IO state: readiness (on a ready list), pending
 * write/read page counts and whether urgent pages are queued.  Note: the
 * previous definition ended with a stray '\' line-continuation that silently
 * swallowed the following source line into the macro; removed. */
#define LOI_DEBUG(LOI, STR, args...)                                     \
        CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
               !list_empty(&(LOI)->loi_ready_item) ||                    \
               !list_empty(&(LOI)->loi_hp_ready_item),                   \
               (LOI)->loi_write_lop.lop_num_pending,                     \
               !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
               (LOI)->loi_read_lop.lop_num_pending,                      \
               !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
               args)
2608
2609 /* This is called by osc_check_rpcs() to find which objects have pages that
2610  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2611 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2612 {
2613         ENTRY;
2614         /* First return objects that have blocked locks so that they
2615          * will be flushed quickly and other clients can get the lock,
2616          * then objects which have pages ready to be stuffed into RPCs */
2617         if (!list_empty(&cli->cl_loi_hp_ready_list))
2618                 RETURN(list_entry(cli->cl_loi_hp_ready_list.next,
2619                                   struct lov_oinfo, loi_hp_ready_item));
2620         if (!list_empty(&cli->cl_loi_ready_list))
2621                 RETURN(list_entry(cli->cl_loi_ready_list.next,
2622                                   struct lov_oinfo, loi_ready_item));
2623
2624         /* then if we have cache waiters, return all objects with queued
2625          * writes.  This is especially important when many small files
2626          * have filled up the cache and not been fired into rpcs because
2627          * they don't pass the nr_pending/object threshhold */
2628         if (!list_empty(&cli->cl_cache_waiters) &&
2629             !list_empty(&cli->cl_loi_write_list))
2630                 RETURN(list_entry(cli->cl_loi_write_list.next,
2631                                   struct lov_oinfo, loi_write_item));
2632
2633         /* then return all queued objects when we have an invalid import
2634          * so that they get flushed */
2635         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2636                 if (!list_empty(&cli->cl_loi_write_list))
2637                         RETURN(list_entry(cli->cl_loi_write_list.next,
2638                                           struct lov_oinfo, loi_write_item));
2639                 if (!list_empty(&cli->cl_loi_read_list))
2640                         RETURN(list_entry(cli->cl_loi_read_list.next,
2641                                           struct lov_oinfo, loi_read_item));
2642         }
2643         RETURN(NULL);
2644 }
2645
2646 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2647 {
2648         struct osc_async_page *oap;
2649         int hprpc = 0;
2650
2651         if (!list_empty(&loi->loi_write_lop.lop_urgent)) {
2652                 oap = list_entry(loi->loi_write_lop.lop_urgent.next,
2653                                  struct osc_async_page, oap_urgent_item);
2654                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2655         }
2656
2657         if (!hprpc && !list_empty(&loi->loi_read_lop.lop_urgent)) {
2658                 oap = list_entry(loi->loi_read_lop.lop_urgent.next,
2659                                  struct osc_async_page, oap_urgent_item);
2660                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2661         }
2662
2663         return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
2664 }
2665
/* called with the loi list lock held.
 *
 * Walk the objects returned by osc_next_loi() and fire read/write RPCs for
 * each until the max-rpcs-in-flight limit is hit, an RPC fails, or ten
 * consecutive attempts produce no RPC (race with make_ready backing off). */
static void osc_check_rpcs(struct client_obd *cli)
{
        struct lov_oinfo *loi;
        int rc = 0, race_counter = 0;
        ENTRY;

        while ((loi = osc_next_loi(cli)) != NULL) {
                LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));

                if (osc_max_rpc_in_flight(cli, loi))
                        break;

                /* attempt some read/write balancing by alternating between
                 * reads and writes in an object.  The makes_rpc checks here
                 * would be redundant if we were getting read/write work items
                 * instead of objects.  we don't want send_oap_rpc to drain a
                 * partial read pending queue when we're given this object to
                 * do io on writes while there are cache waiters */
                if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
                        rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
                                              &loi->loi_write_lop);
                        if (rc < 0)
                                break;
                        if (rc > 0)
                                race_counter = 0;
                        else
                                race_counter++;
                }
                if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
                        rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
                                              &loi->loi_read_lop);
                        if (rc < 0)
                                break;
                        if (rc > 0)
                                race_counter = 0;
                        else
                                race_counter++;
                }

                /* attempt some inter-object balancing by issueing rpcs
                 * for each object in turn: drop this object off every ready
                 * list so loi_list_maint() re-queues it behind the others */
                if (!list_empty(&loi->loi_hp_ready_item))
                        list_del_init(&loi->loi_hp_ready_item);
                if (!list_empty(&loi->loi_ready_item))
                        list_del_init(&loi->loi_ready_item);
                if (!list_empty(&loi->loi_write_item))
                        list_del_init(&loi->loi_write_item);
                if (!list_empty(&loi->loi_read_item))
                        list_del_init(&loi->loi_read_item);

                loi_list_maint(cli, loi);

                /* send_oap_rpc fails with 0 when make_ready tells it to
                 * back off.  llite's make_ready does this when it tries
                 * to lock a page queued for write that is already locked.
                 * we want to try sending rpcs from many objects, but we
                 * don't want to spin failing with 0.  */
                if (race_counter == 10)
                        break;
        }
        EXIT;
}
2729
2730 /* we're trying to queue a page in the osc so we're subject to the
2731  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2732  * If the osc's queued pages are already at that limit, then we want to sleep
2733  * until there is space in the osc's queue for us.  We also may be waiting for
2734  * write credits from the OST if there are RPCs in flight that may return some
2735  * before we fall back to sync writes.
2736  *
2737  * We need this know our allocation was granted in the presence of signals */
2738 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2739 {
2740         int rc;
2741         ENTRY;
2742         client_obd_list_lock(&cli->cl_loi_list_lock);
2743         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2744         client_obd_list_unlock(&cli->cl_loi_list_lock);
2745         RETURN(rc);
2746 };
2747
/* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
 * grant or cache space.
 *
 * Decide whether \a oap may enter the write-back cache:
 * \retval 0       the page was accounted against the dirty/grant limits
 * \retval -EDQUOT caller must fall back to sync i/o
 * \retval -EINTR  woken without being granted space (signal, or no RPCs
 *                 left in flight)
 * \retval other   ocw_rc as set by whoever removed us from the waiter list */
static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
                           struct osc_async_page *oap)
{
        struct osc_cache_waiter ocw;
        struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
        ENTRY;

        CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
               "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
               cli->cl_dirty_max, obd_max_dirty_pages,
               cli->cl_lost_grant, cli->cl_avail_grant);

        /* force the caller to try sync io.  this can jump the list
         * of queued writes and create a discontiguous rpc stream */
        if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
            loi->loi_ar.ar_force_sync)
                RETURN(-EDQUOT);

        /* Hopefully normal case - cache space and write credits available */
        if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
            (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
            (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
                /* account for ourselves */
                osc_consume_write_grant(cli, &oap->oap_brw_page);
                RETURN(0);
        }

        /* It is safe to block as a cache waiter as long as there is grant
         * space available or the hope of additional grant being returned
         * when an in flight write completes.  Using the write back cache
         * if possible is preferable to sending the data synchronously
         * because write pages can then be merged in to large requests.
         * The addition of this cache waiter will causing pending write
         * pages to be sent immediately. */
        if (cli->cl_w_in_flight || cli->cl_avail_grant >= CFS_PAGE_SIZE) {
                list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
                cfs_waitq_init(&ocw.ocw_waitq);
                ocw.ocw_oap = oap;
                ocw.ocw_rc = 0;

                /* kick pending writes out, then drop the lock and sleep */
                loi_list_maint(cli, loi);
                osc_check_rpcs(cli);
                client_obd_list_unlock(&cli->cl_loi_list_lock);

                CDEBUG(D_CACHE, "sleeping for cache space\n");
                l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);

                client_obd_list_lock(&cli->cl_loi_list_lock);
                /* still on the waiter list means we were woken without being
                 * granted space (signal or no RPCs in flight) - bail out */
                if (!list_empty(&ocw.ocw_entry)) {
                        list_del(&ocw.ocw_entry);
                        RETURN(-EINTR);
                }
                RETURN(ocw.ocw_rc);
        }

        RETURN(-EDQUOT);
}
2807
/* Check whether the extent [start, end] of an async page is covered by an
 * LDLM lock.  If \a lockh is a used handle, check the lock it refers to;
 * otherwise use the lock cached on the osc_async_page (*res).  Returns the
 * ldlm_lock_fast_match() result (1 on a match, per the check below), or 0
 * when no lock could be found. */
static int osc_get_lock(struct obd_export *exp, struct lov_stripe_md *lsm,
                        void **res, int rw, obd_off start, obd_off end,
                        struct lustre_handle *lockh, int flags)
{
        struct ldlm_lock *lock = NULL;
        int rc, release = 0;

        ENTRY;

        if (lockh && lustre_handle_is_used(lockh)) {
                /* if a valid lockh is passed, just check that the corresponding
                 * lock covers the extent */
                lock = ldlm_handle2lock(lockh);
                release = 1;
        } else {
                struct osc_async_page *oap = *res;
                /* take a reference under oap_lock so the cached lock cannot
                 * disappear between reading and using it */
                spin_lock(&oap->oap_lock);
                lock = oap->oap_ldlm_lock;
                if (likely(lock))
                        LDLM_LOCK_GET(lock);
                spin_unlock(&oap->oap_lock);
        }
        /* lock can be NULL if obd_get_lock races with lock cancellation,
         * in which case there is nothing to match against */
        if (unlikely(!lock))
                return 0;

        rc = ldlm_lock_fast_match(lock, rw, start, end, lockh);
        if (release == 1 && rc == 1)
                /* if a valid lockh was passed, we just need to check
                 * that the lock covers the page, no reference should be
                 * taken*/
                ldlm_lock_decref(lockh,
                                 rw == OBD_BRW_WRITE ? LCK_PW : LCK_PR);
        LDLM_LOCK_PUT(lock);
        RETURN(rc);
}
2845
2846 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2847                         struct lov_oinfo *loi, cfs_page_t *page,
2848                         obd_off offset, struct obd_async_page_ops *ops,
2849                         void *data, void **res, int flags,
2850                         struct lustre_handle *lockh)
2851 {
2852         struct osc_async_page *oap;
2853         struct ldlm_res_id oid = {{0}};
2854         int rc = 0;
2855
2856         ENTRY;
2857
2858         if (!page)
2859                 return size_round(sizeof(*oap));
2860
2861         oap = *res;
2862         oap->oap_magic = OAP_MAGIC;
2863         oap->oap_cli = &exp->exp_obd->u.cli;
2864         oap->oap_loi = loi;
2865
2866         oap->oap_caller_ops = ops;
2867         oap->oap_caller_data = data;
2868
2869         oap->oap_page = page;
2870         oap->oap_obj_off = offset;
2871
2872         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2873         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2874         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2875         CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2876
2877         oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2878
2879         spin_lock_init(&oap->oap_lock);
2880
2881         /* If the page was marked as notcacheable - don't add to any locks */
2882         if (!(flags & OBD_PAGE_NO_CACHE)) {
2883                 osc_build_res_name(loi->loi_id, loi->loi_gr, &oid);
2884                 /* This is the only place where we can call cache_add_extent
2885                    without oap_lock, because this page is locked now, and
2886                    the lock we are adding it to is referenced, so cannot lose
2887                    any pages either. */
2888                 rc = cache_add_extent(oap->oap_cli->cl_cache, &oid, oap, lockh);
2889                 if (rc)
2890                         RETURN(rc);
2891         }
2892
2893         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2894         RETURN(0);
2895 }
2896
2897 struct osc_async_page *oap_from_cookie(void *cookie)
2898 {
2899         struct osc_async_page *oap = cookie;
2900         if (oap->oap_magic != OAP_MAGIC)
2901                 return ERR_PTR(-EINVAL);
2902         return oap;
2903 };
2904
/* Queue a prepared async page (see osc_prep_async_page) for i/o.
 *
 * \param cmd        OBD_BRW_READ or OBD_BRW_WRITE, optionally OR'd with
 *                   OBD_BRW_NOQUOTA
 * \param off, count i/o region within the page
 *
 * \retval 0        page placed on the object's pending list
 * \retval -EIO     import is invalid
 * \retval -EBUSY   page already queued somewhere
 * \retval -EDQUOT  owner/group over quota (writes only)
 * \retval other    errors from osc_enter_cache() for writes */
static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
                              struct lov_oinfo *loi, void *cookie,
                              int cmd, obd_off off, int count,
                              obd_flag brw_flags, enum async_flags async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        if (!list_empty(&oap->oap_pending_item) ||
            !list_empty(&oap->oap_urgent_item) ||
            !list_empty(&oap->oap_rpc_item))
                RETURN(-EBUSY);

        /* check if the file's owner/group is over quota */
        if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
                struct obd_async_page_ops *ops;
                struct obdo *oa;

                OBDO_ALLOC(oa);
                if (oa == NULL)
                        RETURN(-ENOMEM);

                /* fill a temporary obdo just to learn the uid/gid */
                ops = oap->oap_caller_ops;
                ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
                if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
                    NO_QUOTA)
                        rc = -EDQUOT;

                OBDO_FREE(oa);
                if (rc)
                        RETURN(rc);
        }

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        client_obd_list_lock(&cli->cl_loi_list_lock);

        oap->oap_cmd = cmd;
        oap->oap_page_off = off;
        oap->oap_count = count;
        oap->oap_brw_flags = brw_flags;
        /* Give a hint to OST that requests are coming from kswapd - bug19529 */
        if (libcfs_memory_pressure_get())
                oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
        spin_lock(&oap->oap_lock);
        oap->oap_async_flags = async_flags;
        spin_unlock(&oap->oap_lock);

        /* writes must reserve dirty/grant space before being cached */
        if (cmd & OBD_BRW_WRITE) {
                rc = osc_enter_cache(cli, loi, oap);
                if (rc) {
                        client_obd_list_unlock(&cli->cl_loi_list_lock);
                        RETURN(rc);
                }
        }

        osc_oap_to_pending(oap);
        loi_list_maint(cli, loi);

        LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
                  cmd);

        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        RETURN(0);
}
2982
/* aka (~was & now & flag), but this is more clear :)
 * Arguments are fully parenthesized so compound expressions such as
 * (a | b) can be passed without operator-precedence surprises. */
#define SETTING(was, now, flag) (!((was) & (flag)) && ((now) & (flag)))
2985
/* obd_set_async_flags() handler: raise async flags on an already-queued
 * page.  Flags are only ever set here, never cleared: ASYNC_READY marks
 * the page ready for inclusion in an RPC; ASYNC_URGENT additionally puts
 * it on the urgent list (at the head if it is already ASYNC_HP).
 *
 * Returns 0 on success (including when all requested flags are already
 * set), -EIO if the import is invalid, -EINVAL if the page is not on a
 * pending list. */
static int osc_set_async_flags(struct obd_export *exp,
                               struct lov_stripe_md *lsm,
                               struct lov_oinfo *loi, void *cookie,
                               obd_flag async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct loi_oap_pages *lop;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        /*
         * bug 7311: OST-side locking is only supported for liblustre for now
         * (and liblustre never calls obd_set_async_flags(). I hope.), generic
         * implementation has to handle case where OST-locked page was picked
         * up by, e.g., ->writepage().
         */
        LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
        LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
                                     * tread here. */

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        /* Pick the read or write list the page belongs to, based on the
         * command recorded when it was queued. */
        if (oap->oap_cmd & OBD_BRW_WRITE) {
                lop = &loi->loi_write_lop;
        } else {
                lop = &loi->loi_read_lop;
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* oap_lock provides atomic semantics of oap_async_flags access */
        spin_lock(&oap->oap_lock);
        if (list_empty(&oap->oap_pending_item))
                GOTO(out, rc = -EINVAL);

        if ((oap->oap_async_flags & async_flags) == async_flags)
                GOTO(out, rc = 0);

        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
                oap->oap_async_flags |= ASYNC_READY;

        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
            list_empty(&oap->oap_rpc_item)) {
                /* High-priority pages jump the urgent queue. */
                if (oap->oap_async_flags & ASYNC_HP)
                        list_add(&oap->oap_urgent_item, &lop->lop_urgent);
                else
                        list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
                oap->oap_async_flags |= ASYNC_URGENT;
                loi_list_maint(cli, loi);
        }

        LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
                        oap->oap_async_flags);
out:
        spin_unlock(&oap->oap_lock);
        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        RETURN(rc);
}
3053
/* obd_queue_group_io() handler: queue a page on the group-pending list
 * of its stripe rather than the normal pending list.  The I/O is not
 * started until osc_trigger_group_io() moves the group over.  For
 * ASYNC_GROUP_SYNC pages the obd_io_group is recorded so completion can
 * be signalled through it.
 *
 * Returns 0 on success, -EIO if the import is invalid, -EBUSY if the
 * page is already queued, or the oig_add_one() result. */
static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
                             struct lov_oinfo *loi,
                             struct obd_io_group *oig, void *cookie,
                             int cmd, obd_off off, int count,
                             obd_flag brw_flags,
                             obd_flag async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct osc_async_page *oap;
        struct loi_oap_pages *lop;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        /* A page on any of the three lists is already queued or in an RPC. */
        if (!list_empty(&oap->oap_pending_item) ||
            !list_empty(&oap->oap_urgent_item) ||
            !list_empty(&oap->oap_rpc_item))
                RETURN(-EBUSY);

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        client_obd_list_lock(&cli->cl_loi_list_lock);

        oap->oap_cmd = cmd;
        oap->oap_page_off = off;
        oap->oap_count = count;
        oap->oap_brw_flags = brw_flags;
        /* Give a hint to OST that requests are coming from kswapd - bug19529 */
        if (libcfs_memory_pressure_get())
                oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
        /* oap_lock gives oap_async_flags updates atomic semantics */
        spin_lock(&oap->oap_lock);
        oap->oap_async_flags = async_flags;
        spin_unlock(&oap->oap_lock);

        if (cmd & OBD_BRW_WRITE)
                lop = &loi->loi_write_lop;
        else
                lop = &loi->loi_read_lop;

        list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
        if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
                oap->oap_oig = oig;
                rc = oig_add_one(oig, &oap->oap_occ);
        }

        LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
                  oap, oap->oap_page, rc);

        client_obd_list_unlock(&cli->cl_loi_list_lock);

        RETURN(rc);
}
3113
3114 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
3115                                  struct loi_oap_pages *lop, int cmd)
3116 {
3117         struct list_head *pos, *tmp;
3118         struct osc_async_page *oap;
3119
3120         list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
3121                 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
3122                 list_del(&oap->oap_pending_item);
3123                 osc_oap_to_pending(oap);
3124         }
3125         loi_list_maint(cli, loi);
3126 }
3127
3128 static int osc_trigger_group_io(struct obd_export *exp,
3129                                 struct lov_stripe_md *lsm,
3130                                 struct lov_oinfo *loi,
3131                                 struct obd_io_group *oig)
3132 {
3133         struct client_obd *cli = &exp->exp_obd->u.cli;
3134         ENTRY;
3135
3136         if (loi == NULL)
3137                 loi = lsm->lsm_oinfo[0];
3138
3139         client_obd_list_lock(&cli->cl_loi_list_lock);
3140
3141         osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
3142         osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
3143
3144         osc_check_rpcs(cli);
3145         client_obd_list_unlock(&cli->cl_loi_list_lock);
3146
3147         RETURN(0);
3148 }
3149
/* obd_teardown_async_page() handler: detach a page from all OSC I/O
 * state before it is freed.  Fails with -EBUSY while the page is part
 * of an RPC in flight; otherwise releases its cache/grant reservation,
 * removes it from the urgent and pending lists (clearing the matching
 * flags) and drops it from the extent cache. */
static int osc_teardown_async_page(struct obd_export *exp,
                                   struct lov_stripe_md *lsm,
                                   struct lov_oinfo *loi, void *cookie)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct loi_oap_pages *lop;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        /* Pick the list matching the command recorded at queue time. */
        if (oap->oap_cmd & OBD_BRW_WRITE) {
                lop = &loi->loi_write_lop;
        } else {
                lop = &loi->loi_read_lop;
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);

        if (!list_empty(&oap->oap_rpc_item))
                GOTO(out, rc = -EBUSY);

        osc_exit_cache(cli, oap, 0);
        osc_wake_cache_waiters(cli);

        if (!list_empty(&oap->oap_urgent_item)) {
                list_del_init(&oap->oap_urgent_item);
                /* oap_lock gives oap_async_flags updates atomic semantics */
                spin_lock(&oap->oap_lock);
                oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
                spin_unlock(&oap->oap_lock);
        }

        if (!list_empty(&oap->oap_pending_item)) {
                list_del_init(&oap->oap_pending_item);
                lop_update_pending(cli, lop, oap->oap_cmd, -1);
        }
        loi_list_maint(cli, loi);
        cache_remove_extent(cli->cl_cache, oap);

        LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
out:
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        RETURN(rc);
}
3200
/* Blocking/cancel AST for OSC extent locks.
 *
 * LDLM_CB_BLOCKING: a conflicting request is waiting, so cancel our
 * lock.  LDLM_CB_CANCELING: the lock is going away; purge any cached
 * extents held under it and invoke the optional per-client cancel
 * callback.  Always returns 0. */
int osc_extent_blocking_cb(struct ldlm_lock *lock,
                           struct ldlm_lock_desc *new, void *data,
                           int flag)
{
        struct lustre_handle lockh = { 0 };
        int rc;
        ENTRY;

        /* Catch obviously-corrupt ast data (small non-NULL integers). */
        if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
                LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
                LBUG();
        }

        switch (flag) {
        case LDLM_CB_BLOCKING:
                ldlm_lock2handle(lock, &lockh);
                rc = ldlm_cli_cancel(&lockh);
                if (rc != ELDLM_OK)
                        CERROR("ldlm_cli_cancel failed: %d\n", rc);
                break;
        case LDLM_CB_CANCELING: {

                ldlm_lock2handle(lock, &lockh);
                /* This lock wasn't granted, don't try to do anything */
                if (lock->l_req_mode != lock->l_granted_mode)
                        RETURN(0);

                cache_remove_lock(lock->l_conn_export->exp_obd->u.cli.cl_cache,
                                  &lockh);

                if (lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb)
                        lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb(
                                                          lock, new, data,flag);
                break;
        }
        default:
                LBUG();
        }

        RETURN(0);
}
EXPORT_SYMBOL(osc_extent_blocking_cb);
3243
/* Attach @data (an inode on Linux) as the l_ast_data of the lock behind
 * @lockh, asserting that any different inode already attached is being
 * freed.  Also propagates LDLM_FL_NO_LRU from @flags onto the lock. */
static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
                                    int flags)
{
        struct ldlm_lock *lock = ldlm_handle2lock(lockh);

        if (lock == NULL) {
                CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
                return;
        }
        lock_res_and_lock(lock);
#if defined (__KERNEL__) && defined (__linux__)
        /* Liang XXX: Darwin and Winnt checking should be added */
        if (lock->l_ast_data && lock->l_ast_data != data) {
                struct inode *new_inode = data;
                struct inode *old_inode = lock->l_ast_data;
                /* A stale l_ast_data is only legal if the old inode is on
                 * its way out; anything else is an accounting bug. */
                if (!(old_inode->i_state & I_FREEING))
                        LDLM_ERROR(lock, "inconsistent l_ast_data found");
                LASSERTF(old_inode->i_state & I_FREEING,
                         "Found existing inode %p/%lu/%u state %lu in lock: "
                         "setting data to %p/%lu/%u\n", old_inode,
                         old_inode->i_ino, old_inode->i_generation,
                         old_inode->i_state,
                         new_inode, new_inode->i_ino, new_inode->i_generation);
        }
#endif
        lock->l_ast_data = data;
        lock->l_flags |= (flags & LDLM_FL_NO_LRU);
        unlock_res_and_lock(lock);
        LDLM_LOCK_PUT(lock);
}
3274
3275 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3276                              ldlm_iterator_t replace, void *data)
3277 {
3278         struct ldlm_res_id res_id;
3279         struct obd_device *obd = class_exp2obd(exp);
3280
3281         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
3282         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3283         return 0;
3284 }
3285
/* Finish an extent lock enqueue on the OSC side.
 *
 * For intent enqueues that the server aborted, the real status is
 * carried in the DLM reply's lock_policy_res1 and replaces @rc.  On
 * success (or aborted intent) the glimpsed lvb values are logged, and
 * on success the lock is added to the extent cache.  The final result
 * is passed to the caller's oi_cb_up() callback, whose return value is
 * returned. */
static int osc_enqueue_fini(struct obd_device *obd, struct ptlrpc_request *req,
                            struct obd_info *oinfo, int intent, int rc)
{
        ENTRY;

        if (intent) {
                /* The request was created before ldlm_cli_enqueue call. */
                if (rc == ELDLM_LOCK_ABORTED) {
                        struct ldlm_reply *rep;

                        /* swabbed by ldlm_cli_enqueue() */
                        LASSERT(lustre_rep_swabbed(req, DLM_LOCKREPLY_OFF));
                        rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
                                             sizeof(*rep));
                        LASSERT(rep != NULL);
                        if (rep->lock_policy_res1)
                                rc = rep->lock_policy_res1;
                }
        }

        if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
                CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
                       oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
                       oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
                       oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
        }

        if (!rc)
                cache_add_lock(obd->u.cli.cl_cache, oinfo->oi_lockh);

        /* Call the update callback. */
        rc = oinfo->oi_cb_up(oinfo, rc);
        RETURN(rc);
}
3320
/* Interpret callback for an asynchronous extent lock enqueue.
 *
 * Completes the LDLM half with ldlm_cli_enqueue_fini() (which unpacks
 * the lvb), then the OSC half with osc_enqueue_fini(), and finally
 * drops the reference that was taken for the async request so locks are
 * released right after they are obtained (see the comment above
 * osc_enqueue()). */
static int osc_enqueue_interpret(struct ptlrpc_request *req,
                                 void *data, int rc)
{
        struct osc_enqueue_args *aa = data;
        int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
        struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
        struct ldlm_lock *lock;

        /* ldlm_cli_enqueue is holding a reference on the lock, so it must
         * be valid. */
        lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);

        /* Complete obtaining the lock procedure. */
        rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
                                   aa->oa_ei->ei_mode,
                                   &aa->oa_oi->oi_flags,
                                   &lsm->lsm_oinfo[0]->loi_lvb,
                                   sizeof(lsm->lsm_oinfo[0]->loi_lvb),
                                   lustre_swab_ost_lvb,
                                   aa->oa_oi->oi_lockh, rc);

        /* Complete osc stuff. */
        rc = osc_enqueue_fini(aa->oa_exp->exp_obd, req, aa->oa_oi, intent, rc);

        /* Release the lock for async request. */
        if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
                ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);

        LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
                 aa->oa_oi->oi_lockh, req, aa);
        LDLM_LOCK_PUT(lock);
        return rc;
}
3354
/* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
 * other synchronous requests, however keeping some locks and trying to obtain
 * others may take a considerable amount of time in a case of ost failure; and
 * when other sync requests do not get released lock from a client, the client
 * is excluded from the cluster -- such scenarious make the life difficult, so
 * release locks just after they are obtained.
 *
 * Flow: first try to match an existing granted lock (a PR request also
 * matches PW); on a match, invoke oi_cb_up() directly and return
 * ELDLM_OK.  Otherwise enqueue a new lock, either synchronously or, if
 * @rqset is given, asynchronously with osc_enqueue_interpret() as the
 * completion handler. */
static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
                       struct ldlm_enqueue_info *einfo,
                       struct ptlrpc_request_set *rqset)
{
        struct ldlm_res_id res_id;
        struct obd_device *obd = exp->exp_obd;
        struct ldlm_reply *rep;
        struct ptlrpc_request *req = NULL;
        int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
        ldlm_mode_t mode;
        int rc;
        ENTRY;

        osc_build_res_name(oinfo->oi_md->lsm_object_id,
                           oinfo->oi_md->lsm_object_gr, &res_id);
        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother.  */
        oinfo->oi_policy.l_extent.start -=
                oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
        oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;

        /* Without a valid kms there is no point matching cached locks. */
        if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
                goto no_match;

        /* Next, search for already existing extent locks that will cover us */
        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock.
         *
         * There are problems with conversion deadlocks, so instead of
         * converting a read lock to a write lock, we'll just enqueue a new
         * one.
         *
         * At some point we should cancel the read lock instead of making them
         * send us a blocking callback, but there are problems with canceling
         * locks out from other users right now, too. */
        mode = einfo->ei_mode;
        if (einfo->ei_mode == LCK_PR)
                mode |= LCK_PW;
        mode = ldlm_lock_match(obd->obd_namespace,
                               oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
                               einfo->ei_type, &oinfo->oi_policy, mode,
                               oinfo->oi_lockh);
        if (mode) {
                /* addref the lock only if not async requests and PW lock is
                 * matched whereas we asked for PR. */
                if (!rqset && einfo->ei_mode != mode)
                        ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
                osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
                                        oinfo->oi_flags);
                if (intent) {
                        /* I would like to be able to ASSERT here that rss <=
                         * kms, but I can't, for reasons which are explained in
                         * lov_enqueue() */
                }

                /* We already have a lock, and it's referenced */
                oinfo->oi_cb_up(oinfo, ELDLM_OK);

                /* For async requests, decref the lock. */
                if (einfo->ei_mode != mode)
                        ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
                else if (rqset)
                        ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);

                RETURN(ELDLM_OK);
        }

 no_match:
        if (intent) {
                /* Intent enqueues carry an extra (initially empty) buffer
                 * and expect lockrep + lvb in the reply. */
                __u32 size[3] = {
                        [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                        [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request),
                        [DLM_LOCKREQ_OFF + 1] = 0 };

                req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
                if (req == NULL)
                        RETURN(-ENOMEM);

                size[DLM_LOCKREPLY_OFF] = sizeof(*rep);
                size[DLM_REPLY_REC_OFF] =
                        sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb);
                ptlrpc_req_set_repsize(req, 3, size);
        }

        /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
        oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;

        rc = ldlm_cli_enqueue(exp, &req, einfo, res_id,
                              &oinfo->oi_policy, &oinfo->oi_flags,
                              &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
                              sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
                              lustre_swab_ost_lvb, oinfo->oi_lockh,
                              rqset ? 1 : 0);
        if (rqset) {
                if (!rc) {
                        struct osc_enqueue_args *aa;
                        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                        aa = ptlrpc_req_async_args(req);
                        aa->oa_oi = oinfo;
                        aa->oa_ei = einfo;
                        aa->oa_exp = exp;

                        req->rq_interpret_reply = osc_enqueue_interpret;
                        ptlrpc_set_add_req(rqset, req);
                } else if (intent) {
                        ptlrpc_req_finished(req);
                }
                RETURN(rc);
        }

        rc = osc_enqueue_fini(obd, req, oinfo, intent, rc);
        if (intent)
                ptlrpc_req_finished(req);

        RETURN(rc);
}
3479
/* obd_match() handler: look for an already-granted lock compatible with
 * the request without enqueuing a new one.  A PR request also matches an
 * existing PW lock; in that case (unless LDLM_FL_TEST_LOCK) the held
 * reference is moved from PW to PR.  Increments *n_matches (when
 * non-NULL) on a hit.
 *
 * Returns the matched lock mode, or 0 if nothing matched. */
static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
                     __u32 type, ldlm_policy_data_t *policy, __u32 mode,
                     int *flags, void *data, struct lustre_handle *lockh,
                     int *n_matches)
{
        struct ldlm_res_id res_id;
        struct obd_device *obd = exp->exp_obd;
        int lflags = *flags;
        ldlm_mode_t rc;
        ENTRY;

        osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);

        OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother */
        policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
        policy->l_extent.end |= ~CFS_PAGE_MASK;

        /* Next, search for already existing extent locks that will cover us */
        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock. */
        rc = mode;
        if (mode == LCK_PR)
                rc |= LCK_PW;
        rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
                             &res_id, type, policy, rc, lockh);
        if (rc) {
                osc_set_data_with_check(lockh, data, lflags);
                /* Matched a PW lock for a PR request: swap the reference
                 * over to PR so the eventual decref mode is right. */
                if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
                        ldlm_lock_addref(lockh, LCK_PR);
                        ldlm_lock_decref(lockh, LCK_PW);
                }
                if (n_matches != NULL)
                        (*n_matches)++;
        }

        RETURN(rc);
}
3521
3522 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3523                       __u32 mode, struct lustre_handle *lockh, int flags,
3524                       obd_off end)
3525 {
3526         ENTRY;
3527
3528         if (unlikely(mode == LCK_GROUP))
3529                 ldlm_lock_decref_and_cancel(lockh, mode);
3530         else
3531                 ldlm_lock_decref(lockh, mode);
3532
3533         RETURN(0);
3534 }
3535
3536 static int osc_cancel_unused(struct obd_export *exp,
3537                              struct lov_stripe_md *lsm, int flags, void *opaque)
3538 {
3539         struct obd_device *obd = class_exp2obd(exp);
3540         struct ldlm_res_id res_id, *resp = NULL;
3541
3542         if (lsm != NULL) {
3543                 resp = osc_build_res_name(lsm->lsm_object_id,
3544                                           lsm->lsm_object_gr, &res_id);
3545         }
3546
3547         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3548
3549 }
3550
3551 static int osc_join_lru(struct obd_export *exp,
3552                         struct lov_stripe_md *lsm, int join)
3553 {
3554         struct obd_device *obd = class_exp2obd(exp);
3555         struct ldlm_res_id res_id, *resp = NULL;
3556
3557         if (lsm != NULL) {
3558                 resp = osc_build_res_name(lsm->lsm_object_id,
3559                                           lsm->lsm_object_gr, &res_id);
3560         }
3561
3562         return ldlm_cli_join_lru(obd->obd_namespace, resp, join);
3563
3564 }
3565
/* Interpret callback for an async OST_STATFS request: unpack the statfs
 * reply, refresh the creator's DEGRADED/RDONLY/NOSPC state flags, copy
 * the result into the caller's buffer and invoke oi_cb_up(). */
static int osc_statfs_interpret(struct ptlrpc_request *req,
                                void *data, int rc)
{
        struct osc_async_args *aa = data;
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obd_statfs *msfs;
        __u64 used;
        ENTRY;

        if (rc == -EBADR)
                /* The request has in fact never been sent
                 * due to issues at a higher level (LOV).
                 * Exit immediately since the caller is
                 * aware of the problem and takes care
                 * of the clean up */
                 RETURN(rc);

        /* For NODELAY (e.g. procfs) requests, connection problems are
         * reported as success with no data rather than an error. */
        if ((rc == -ENOTCONN || rc == -EAGAIN) &&
            (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
                GOTO(out, rc = 0);

        if (rc != 0)
                GOTO(out, rc);

        msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
                                  lustre_swab_obd_statfs);
        if (msfs == NULL) {
                CERROR("Can't unpack obd_statfs\n");
                GOTO(out, rc = -EPROTO);
        }

        /* Reinitialize the RDONLY and DEGRADED flags at the client
         * on each statfs, so they don't stay set permanently. */
        spin_lock(&cli->cl_oscc.oscc_lock);

        if (unlikely(msfs->os_state & OS_STATE_DEGRADED))
                cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED;
        else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_DEGRADED))
                cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_DEGRADED;

        if (unlikely(msfs->os_state & OS_STATE_READONLY))
                cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY;
        else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_RDONLY))
                cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_RDONLY;

        /* Add a bit of hysteresis so this flag isn't continually flapping,
         * and ensure that new files don't get extremely fragmented due to
         * only a small amount of available space in the filesystem.
         * We want to set the NOSPC flag when there is less than ~0.1% free
         * and clear it when there is at least ~0.2% free space, so:
         *                   avail < ~0.1% max          max = avail + used
         *            1025 * avail < avail + used       used = blocks - free
         *            1024 * avail < used
         *            1024 * avail < blocks - free
         *                   avail < ((blocks - free) >> 10)
         *
         * On very large disk, say 16TB 0.1% will be 16 GB. We don't want to
         * lose that amount of space so in those cases we report no space left
         * if there is less than 1 GB left.                             */
        used = min_t(__u64, (msfs->os_blocks - msfs->os_bfree) >> 10, 1 << 30);
        if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) == 0) &&
                     ((msfs->os_ffree < 32) || (msfs->os_bavail < used))))
                cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC;
        else if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
                (msfs->os_ffree > 64) && (msfs->os_bavail > (used << 1))))
                        cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_NOSPC;

        spin_unlock(&cli->cl_oscc.oscc_lock);

        memcpy(aa->aa_oi->oi_osfs, msfs, sizeof(*msfs));
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}
3640
/* obd_statfs_async() handler: queue an OST_STATFS request on @rqset and
 * deliver the result through osc_statfs_interpret() -> oi_cb_up().
 * @max_age is currently unused on the wire (see the comment below).
 * Returns 0 on success, -ENOMEM if the request cannot be built. */
static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
                            __u64 max_age, struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*oinfo->oi_osfs) };
        ENTRY;

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
                              OST_STATFS, 1, NULL, NULL);
        if (!req)
                RETURN(-ENOMEM);

        ptlrpc_req_set_repsize(req, 2, size);
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);
        if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
                /* procfs requests must not wait or resend, to avoid
                 * deadlocking on an unresponsive server */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        req->rq_interpret_reply = osc_statfs_interpret;
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(rqset, req);
        RETURN(0);
}
3677
/* obd_statfs() handler: synchronous OST_STATFS.  Copies the server's
 * obd_statfs reply into @osfs.  @max_age is currently unused on the
 * wire (see the comment below).
 *
 * Returns 0 on success, -ENODEV with no import, -ENOMEM on allocation
 * failure, -EPROTO on an unparseable reply, or the RPC wait result. */
static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
                      __u64 max_age, __u32 flags)
{
        struct obd_statfs *msfs;
        struct ptlrpc_request *req;
        struct obd_import     *imp = NULL;
        __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
        int rc;
        ENTRY;

        /*Since the request might also come from lprocfs, so we need
         *sync this with client_disconnect_export Bug15684*/
        down_read(&obd->u.cli.cl_sem);
        if (obd->u.cli.cl_import)
                imp = class_import_get(obd->u.cli.cl_import);
        up_read(&obd->u.cli.cl_sem);
        if (!imp)
                RETURN(-ENODEV);

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION,
                              OST_STATFS, 1, NULL, NULL);

        /* The import reference was only needed to build the request. */
        class_import_put(imp);
        if (!req)
                RETURN(-ENOMEM);

        ptlrpc_req_set_repsize(req, 2, size);
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (flags & OBD_STATFS_NODELAY) {
                /* procfs requests must not wait or resend, to avoid
                 * deadlocking on an unresponsive server */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
                                  lustre_swab_obd_statfs);
        if (msfs == NULL) {
                CERROR("Can't unpack obd_statfs\n");
                GOTO(out, rc = -EPROTO);
        }

        memcpy(osfs, msfs, sizeof(*osfs));

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
3738
/* Retrieve object striping information.
 *
 * @lump is a pointer to an in-core struct with lmm_stripe_count indicating
 * the maximum number of OST indices which will fit in the user buffer.
 * lmm_magic must be LOV_USER_MAGIC_V1 or LOV_USER_MAGIC_V3 (an OSC only
 * ever fills one stripe slot here).
 */
static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
{
        /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
        struct lov_user_md_v3 lum, *lumk;
        int rc = 0, lum_size;
        struct lov_user_ost_data_v1 *lmm_objects;
        ENTRY;

        if (!lsm)
                RETURN(-ENODATA);

        /* we only need the header part from user space to get lmm_magic and
         * lmm_stripe_count, (the header part is common to v1 and v3) */
        lum_size = sizeof(struct lov_user_md_v1);
        memset(&lum, 0x00, sizeof(lum));
        if (copy_from_user(&lum, lump, lum_size))
                RETURN(-EFAULT);

        if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
            (lum.lmm_magic != LOV_USER_MAGIC_V3))
                RETURN(-EINVAL);

        /* lov_user_md_vX and lov_mds_md_vX must have the same size */
        LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
        LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
        LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));

        /* we can use lov_mds_md_size() to compute lum_size
         * because lov_user_md_vX and lov_mds_md_vX have the same size */
        if (lum.lmm_stripe_count > 0) {
                /* Caller provided room for objects: allocate a full-size
                 * buffer and fill in the single object this OSC knows. */
                lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
                OBD_ALLOC(lumk, lum_size);
                if (!lumk)
                        RETURN(-ENOMEM);
                /* lmm_objects lives at different offsets in v1 vs v3 */
                if (lum.lmm_magic == LOV_USER_MAGIC_V1)
                        lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
                else
                        lmm_objects = &(lumk->lmm_objects[0]);
                lmm_objects->l_object_id = lsm->lsm_object_id;
        } else {
                /* Header only: reuse the on-stack copy. */
                lum_size = lov_mds_md_size(0, lum.lmm_magic);
                lumk = &lum;
        }

        lumk->lmm_magic = lum.lmm_magic;
        lumk->lmm_stripe_count = 1;
        lumk->lmm_object_id = lsm->lsm_object_id;

        if ((lsm->lsm_magic == LOV_USER_MAGIC_V1_SWABBED) ||
            (lsm->lsm_magic == LOV_USER_MAGIC_V3_SWABBED)) {
               /* lsm not in host order, so count also need be in same order */
                __swab32s(&lumk->lmm_magic);
                __swab16s(&lumk->lmm_stripe_count);
                lustre_swab_lov_user_md((struct lov_user_md_v1*)lumk);
                if (lum.lmm_stripe_count > 0)
                        lustre_swab_lov_user_md_objects(
                                (struct lov_user_md_v1*)lumk);
        }

        if (copy_to_user(lump, lumk, lum_size))
                rc = -EFAULT;

        /* Free only if we took the allocation path above. */
        if (lumk != &lum)
                OBD_FREE(lumk, lum_size);

        RETURN(rc);
}
3812
3813
/*
 * Ioctl dispatcher for the OSC device.
 *
 * @cmd  ioctl command number
 * @exp  export the ioctl arrived on; exp->exp_obd is this OSC
 * @len  length of @karg
 * @karg kernel-space argument (usually a struct obd_ioctl_data)
 * @uarg original user-space pointer, for commands that copy data themselves
 *
 * Returns 0 or a negative errno; -ENOTTY for unrecognised commands.
 */
static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         void *karg, void *uarg)
{
        struct obd_device *obd = exp->exp_obd;
        struct obd_ioctl_data *data = karg;
        int err = 0;
        ENTRY;

        /* Pin the module so it cannot be unloaded mid-ioctl. */
        if (!try_module_get(THIS_MODULE)) {
                CERROR("Can't get module. Is it alive?");
                return -EINVAL;
        }
        switch (cmd) {
        case OBD_IOC_LOV_GET_CONFIG: {
                char *buf;
                struct lov_desc *desc;
                struct obd_uuid uuid;

                buf = NULL;
                len = 0;
                /* Pull the variably-sized ioctl payload in from user space;
                 * obd_ioctl_getdata() allocates buf and sets len. */
                if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
                        GOTO(out, err = -EINVAL);

                data = (struct obd_ioctl_data *)buf;

                /* Validate the caller's inline buffers before writing. */
                if (sizeof(*desc) > data->ioc_inllen1) {
                        obd_ioctl_freedata(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                if (data->ioc_inllen2 < sizeof(uuid)) {
                        obd_ioctl_freedata(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                /* An OSC presents itself as a single-target LOV here. */
                desc = (struct lov_desc *)data->ioc_inlbuf1;
                desc->ld_tgt_count = 1;
                desc->ld_active_tgt_count = 1;
                desc->ld_default_stripe_count = 1;
                desc->ld_default_stripe_size = 0;
                desc->ld_default_stripe_offset = 0;
                desc->ld_pattern = 0;
                memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));

                memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));

                err = copy_to_user((void *)uarg, buf, len);
                if (err)
                        err = -EFAULT;
                obd_ioctl_freedata(buf, len);
                GOTO(out, err);
        }
        case LL_IOC_LOV_SETSTRIPE:
                /* obd_alloc_memmd() returns the md size on success. */
                err = obd_alloc_memmd(exp, karg);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case LL_IOC_LOV_GETSTRIPE:
                err = osc_getstripe(karg, uarg);
                GOTO(out, err);
        case OBD_IOC_CLIENT_RECOVER:
                err = ptlrpc_recover_import(obd->u.cli.cl_import,
                                            data->ioc_inlbuf1);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case IOC_OSC_SET_ACTIVE:
                err = ptlrpc_set_import_active(obd->u.cli.cl_import,
                                               data->ioc_offset);
                GOTO(out, err);
        case OBD_IOC_POLL_QUOTACHECK:
                err = lquota_poll_check(quota_interface, exp,
                                        (struct if_quotacheck *)karg);
                GOTO(out, err);
        case OBD_IOC_DESTROY: {
                struct obdo            *oa;

                /* Destroying objects by hand is an administrator action. */
                if (!cfs_capable(CFS_CAP_SYS_ADMIN))
                        GOTO (out, err = -EPERM);
                oa = &data->ioc_obdo1;

                if (oa->o_id == 0)
                        GOTO(out, err = -EINVAL);

                oa->o_valid |= OBD_MD_FLGROUP;

                err = osc_destroy(exp, oa, NULL, NULL, NULL);
                GOTO(out, err);
        }
        case OBD_IOC_PING_TARGET:
                err = ptlrpc_obd_ping(obd);
                GOTO(out, err);
        default:
                CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
                       cmd, cfs_curproc_comm());
                GOTO(out, err = -ENOTTY);
        }
out:
        module_put(THIS_MODULE);
        return err;
}
3915
/*
 * obd_get_info() handler for the OSC: answer a keyed query either locally
 * or by forwarding an OST_GET_INFO RPC to the target.
 *
 * @vallen is in/out: on entry the size of @val, on return the size of the
 * data written (for the locally-answered keys).  Returns 0 on success,
 * -EINVAL for unknown keys, or a negative errno.
 */
static int osc_get_info(struct obd_export *exp, obd_count keylen,
                        void *key, __u32 *vallen, void *val, struct lov_stripe_md *lsm)
{
        ENTRY;
        if (!vallen || !val)
                RETURN(-EFAULT);

        if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
                /* An OSC has exactly one "stripe", index 0. */
                __u32 *stripe = val;
                *vallen = sizeof(*stripe);
                *stripe = 0;
                RETURN(0);
        } else if (KEY_IS(KEY_OFF_RPCSIZE)) {
                /* Answered locally from the client obd state. */
                struct client_obd *cli = &exp->exp_obd->u.cli;
                __u64 *rpcsize = val;
                LASSERT(*vallen == sizeof(__u64));
                *rpcsize = (__u64)cli->cl_max_pages_per_rpc;
                RETURN(0);
        } else if (KEY_IS(KEY_LAST_ID)) {
                /* Ask the OST for its last allocated object id. */
                struct ptlrpc_request *req;
                obd_id *reply;
                char *bufs[2] = { NULL, key };
                __u32 size[2] = { sizeof(struct ptlrpc_body), keylen };
                int rc;

                req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                                      OST_GET_INFO, 2, size, bufs);
                if (req == NULL)
                        RETURN(-ENOMEM);

                size[REPLY_REC_OFF] = *vallen;
                ptlrpc_req_set_repsize(req, 2, size);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(out, rc);

                reply = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*reply),
                                           lustre_swab_ost_last_id);
                if (reply == NULL) {
                        CERROR("Can't unpack OST last ID\n");
                        GOTO(out, rc = -EPROTO);
                }
                *((obd_id *)val) = *reply;
        out:
                ptlrpc_req_finished(req);
                RETURN(rc);
        } else if (KEY_IS(KEY_FIEMAP)) {
                /* Forward a FIEMAP extent-mapping query to the OST. */
                struct ptlrpc_request *req;
                struct ll_user_fiemap *reply;
                char *bufs[2] = { NULL, key };
                __u32 size[2] = { sizeof(struct ptlrpc_body), keylen };
                int rc;

                req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                                      OST_GET_INFO, 2, size, bufs);
                if (req == NULL)
                        RETURN(-ENOMEM);

                size[REPLY_REC_OFF] = *vallen;
                ptlrpc_req_set_repsize(req, 2, size);

                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(out1, rc);
                reply = lustre_swab_repbuf(req, REPLY_REC_OFF, *vallen,
                                           lustre_swab_fiemap);
                if (reply == NULL) {
                        CERROR("Can't unpack FIEMAP reply.\n");
                        GOTO(out1, rc = -EPROTO);
                }

                memcpy(val, reply, *vallen);

        out1:
                ptlrpc_req_finished(req);

                RETURN(rc);
        }

        RETURN(-EINVAL);
}
3997
3998 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3999                                           void *aa, int rc)
4000 {
4001         struct llog_ctxt *ctxt;
4002         struct obd_import *imp = req->rq_import;
4003         ENTRY;
4004
4005         if (rc != 0)
4006                 RETURN(rc);
4007
4008         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
4009         if (ctxt) {
4010                 if (rc == 0)
4011                         rc = llog_initiator_connect(ctxt);
4012                 else
4013                         CERROR("cannot establish connection for "
4014                                "ctxt %p: %d\n", ctxt, rc);
4015         }
4016
4017         llog_ctxt_put(ctxt);
4018         spin_lock(&imp->imp_lock);
4019         imp->imp_server_timeout = 1;
4020         imp->imp_pingable = 1;
4021         spin_unlock(&imp->imp_lock);
4022         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
4023
4024         RETURN(rc);
4025 }
4026
/*
 * obd_set_info_async() handler for the OSC: handle a few keys locally and
 * forward everything else to the OST as an OST_SET_INFO RPC.
 *
 * For KEY_GRANT_SHRINK the request is driven by ptlrpcd; otherwise the
 * caller must supply @set, to which the request is added.
 */
static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
                              void *key, obd_count vallen, void *val,
                              struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct obd_device  *obd = exp->exp_obd;
        struct obd_import *imp = class_exp2cliimp(exp);
        __u32 size[3] = { sizeof(struct ptlrpc_body), keylen, vallen };
        char *bufs[3] = { NULL, key, val };
        ENTRY;

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

        if (KEY_IS(KEY_NEXT_ID)) {
                obd_id new_val;
                struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                if (vallen != sizeof(obd_id))
                        RETURN(-EINVAL);

                /* avoid race between allocate new object and set next id
                 * from ll_sync thread */
                spin_lock(&oscc->oscc_lock);
                new_val = *((obd_id*)val) + 1;
                /* only move the next id forwards, never backwards */
                if (new_val > oscc->oscc_next_id)
                        oscc->oscc_next_id = new_val;
                spin_unlock(&oscc->oscc_lock);

                CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
                       exp->exp_obd->obd_name,
                       oscc->oscc_next_id);

                RETURN(0);
        }

        if (KEY_IS(KEY_INIT_RECOV)) {
                /* toggle initial-recovery behaviour on the import */
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                spin_lock(&imp->imp_lock);
                imp->imp_initial_recov = *(int *)val;
                spin_unlock(&imp->imp_lock);
                CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
                       exp->exp_obd->obd_name,
                       imp->imp_initial_recov);
                RETURN(0);
        }

        if (KEY_IS(KEY_CHECKSUM)) {
                /* enable/disable bulk checksums for this client */
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
                RETURN(0);
        }

        /* every remaining key except GRANT_SHRINK needs a request set */
        if (!set && !KEY_IS(KEY_GRANT_SHRINK))
                RETURN(-EINVAL);

        /* We pass all other commands directly to OST. Since nobody calls osc
           methods directly and everybody is supposed to go through LOV, we
           assume lov checked invalid values for us.
           The only recognised values so far are evict_by_nid and mds_conn.
           Even if something bad goes through, we'd get a -EINVAL from OST
           anyway. */

        req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_SET_INFO, 3, size,
                              bufs);
        if (req == NULL)
                RETURN(-ENOMEM);

        if (KEY_IS(KEY_MDS_CONN))
                req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
        else if (KEY_IS(KEY_GRANT_SHRINK))
                req->rq_interpret_reply = osc_shrink_grant_interpret;

        if (KEY_IS(KEY_GRANT_SHRINK)) {
                struct osc_grant_args *aa;
                struct obdo *oa;

                /* stash a private obdo copy for the interpret callback;
                 * it owns (and must free) @oa */
                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = ptlrpc_req_async_args(req);
                OBD_ALLOC_PTR(oa);
                if (!oa) {
                        ptlrpc_req_finished(req);
                        RETURN(-ENOMEM);
                }
                *oa = ((struct ost_body *)val)->oa;
                aa->aa_oa = oa;

                size[1] = vallen;
                ptlrpc_req_set_repsize(req, 2, size);
                /* grant shrink runs asynchronously under ptlrpcd */
                ptlrpcd_add_req(req);
        } else {
                ptlrpc_req_set_repsize(req, 1, NULL);
                ptlrpc_set_add_req(set, req);
                ptlrpc_check_set(set);
        }

        RETURN(0);
}
4126
4127
/* llog operations for the size-replicator context: only cancellation is
 * handled on the client side. */
static struct llog_operations osc_size_repl_logops = {
        lop_cancel: llog_obd_repl_cancel
};

/* Originator llog operations; populated elsewhere (initialization is not
 * visible in this part of the file — presumably at module init). */
static struct llog_operations osc_mds_ost_orig_logops;
4133 static int osc_llog_init(struct obd_device *obd, struct obd_device *disk_obd,
4134                          int *index)
4135 {
4136         struct llog_catid catid;
4137         static char name[32] = CATLIST;
4138         int rc;
4139         ENTRY;
4140
4141         LASSERT(index);
4142
4143         mutex_down(&disk_obd->obd_llog_cat_process);
4144
4145         rc = llog_get_cat_list(disk_obd, disk_obd, name, *index, 1, &catid);
4146         if (rc) {
4147                 CERROR("rc: %d\n", rc);
4148                 GOTO(out_unlock, rc);
4149         }
4150 #if 0
4151         CDEBUG(D_INFO, "%s: Init llog for %s/%d - catid "LPX64"/"LPX64":%x\n",
4152                obd->obd_name, uuid->uuid, idx, catid.lci_logid.lgl_oid,
4153                catid.lci_logid.lgl_ogr, catid.lci_logid.lgl_ogen);
4154 #endif
4155
4156         rc = llog_setup(obd, LLOG_MDS_OST_ORIG_CTXT, disk_obd, 1,
4157                         &catid.lci_logid, &osc_mds_ost_orig_logops);
4158         if (rc) {
4159                 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
4160                 GOTO (out, rc);
4161         }
4162
4163         rc = llog_setup(obd, LLOG_SIZE_REPL_CTXT, disk_obd, 1, NULL,
4164                         &osc_size_repl_logops);
4165         if (rc) {
4166                 struct llog_ctxt *ctxt =
4167                         llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4168                 if (ctxt)
4169                         llog_cleanup(ctxt);
4170                 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
4171         }
4172 out:
4173         if (rc) {
4174                 CERROR("osc '%s' tgt '%s' rc=%d\n",
4175                        obd->obd_name, disk_obd->obd_name, rc);
4176                 CERROR("logid "LPX64":0x%x\n",
4177                        catid.lci_logid.lgl_oid, catid.lci_logid.lgl_ogen);
4178         } else {
4179                 rc = llog_put_cat_list(disk_obd, disk_obd, name, *index, 1,
4180                                        &catid);
4181                 if (rc)
4182                         CERROR("rc: %d\n", rc);
4183         }
4184 out_unlock:
4185         mutex_up(&disk_obd->obd_llog_cat_process);
4186
4187         RETURN(rc);
4188 }
4189
4190 static int osc_llog_finish(struct obd_device *obd, int count)
4191 {
4192         struct llog_ctxt *ctxt;
4193         int rc = 0, rc2 = 0;
4194         ENTRY;
4195
4196         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4197         if (ctxt)
4198                 rc = llog_cleanup(ctxt);
4199
4200         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4201         if (ctxt)
4202                 rc2 = llog_cleanup(ctxt);
4203         if (!rc)
4204                 rc = rc2;
4205
4206         RETURN(rc);
4207 }
4208
4209 static int osc_reconnect(struct obd_export *exp, struct obd_device *obd,
4210                          struct obd_uuid *cluuid,
4211                          struct obd_connect_data *data,
4212                          void *localdata)
4213 {
4214         struct client_obd *cli = &obd->u.cli;
4215
4216         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
4217                 long lost_grant;
4218
4219                 client_obd_list_lock(&cli->cl_loi_list_lock);
4220                 data->ocd_grant = cli->cl_avail_grant + cli->cl_dirty ?:
4221                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
4222                 lost_grant = cli->cl_lost_grant;
4223                 cli->cl_lost_grant = 0;
4224                 client_obd_list_unlock(&cli->cl_loi_list_lock);
4225
4226                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
4227                        "cl_dirty: %ld cl_lost_grant: %ld\n", data->ocd_grant,
4228                        cli->cl_dirty, cli->cl_avail_grant, lost_grant);
4229                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
4230                        " ocd_grant: %d\n", data->ocd_connect_flags,
4231                        data->ocd_version, data->ocd_grant);
4232         }
4233
4234         RETURN(0);
4235 }
4236
/*
 * Disconnect the OSC from its target.
 *
 * On the last connection, flush pending llog cancels to the OST first,
 * then perform the generic client disconnect.  The grant-shrink list
 * removal deliberately happens last — see the BUG18662 note below.
 */
static int osc_disconnect(struct obd_export *exp)
{
        struct obd_device *obd = class_exp2obd(exp);
        struct llog_ctxt  *ctxt;
        int rc;

        ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
        if (ctxt) {
                if (obd->u.cli.cl_conn_count == 1) {
                        /* Flush any remaining cancel messages out to the
                         * target */
                        llog_sync(ctxt, exp);
                }
                llog_ctxt_put(ctxt);
        } else {
                CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
                       obd);
        }

        rc = client_disconnect_export(exp);
        /**
         * Initially we put del_shrink_grant before disconnect_export, but it
         * causes the following problem if setup (connect) and cleanup
         * (disconnect) are tangled together.
         *      connect p1                     disconnect p2
         *   ptlrpc_connect_import
         *     ...............               class_manual_cleanup
         *                                     osc_disconnect
         *                                     del_shrink_grant
         *   ptlrpc_connect_interrupt
         *     init_grant_shrink
         *   add this client to shrink list
         *                                      cleanup_osc
         * Bang! pinger trigger the shrink.
         * So the osc should be disconnected from the shrink list, after we
         * are sure the import has been destroyed. BUG18662
         */
        if (obd->u.cli.cl_import == NULL)
                osc_del_shrink_grant(&obd->u.cli);
        return rc;
}
4278
/*
 * React to import state changes (disconnect, invalidate, activate, ...)
 * on this OSC's connection to the OST.
 *
 * "MDS OSC" below means an OSC instantiated on the MDS (detected via
 * imp_server_timeout), which additionally manages object precreation
 * state in cl_oscc.
 */
static int osc_import_event(struct obd_device *obd,
                            struct obd_import *imp,
                            enum obd_import_event event)
{
        struct client_obd *cli;
        int rc = 0;

        ENTRY;
        LASSERT(imp->imp_obd == obd);

        switch (event) {
        case IMP_EVENT_DISCON: {
                /* Only do this on the MDS OSC's */
                if (imp->imp_server_timeout) {
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                        spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
                        spin_unlock(&oscc->oscc_lock);
                }
                /* Grants become meaningless while disconnected. */
                cli = &obd->u.cli;
                client_obd_list_lock(&cli->cl_loi_list_lock);
                cli->cl_avail_grant = 0;
                cli->cl_lost_grant = 0;
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                ptlrpc_import_setasync(imp, -1);

                break;
        }
        case IMP_EVENT_INACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
                break;
        }
        case IMP_EVENT_INVALIDATE: {
                struct ldlm_namespace *ns = obd->obd_namespace;

                /* Reset grants */
                cli = &obd->u.cli;
                client_obd_list_lock(&cli->cl_loi_list_lock);
                /* all pages go to failing rpcs due to the invalid import */
                osc_check_rpcs(cli);
                client_obd_list_unlock(&cli->cl_loi_list_lock);

                /* Drop all locks held under the now-invalid import. */
                ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);

                break;
        }
        case IMP_EVENT_ACTIVE: {
                /* Only do this on the MDS OSC's */
                if (imp->imp_server_timeout) {
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                        /* the OST may have space again after reconnect */
                        spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
                        spin_unlock(&oscc->oscc_lock);
                }
                CDEBUG(D_INFO, "notify server \n");
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
                break;
        }
        case IMP_EVENT_OCD: {
                struct obd_connect_data *ocd = &imp->imp_connect_data;

                if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
                        osc_init_grant(&obd->u.cli, ocd);

                /* See bug 7198 */
                if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;

                ptlrpc_import_setasync(imp, 1);
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
                break;
        }
        default:
                CERROR("Unknown import event %d\n", event);
                LBUG();
        }
        RETURN(rc);
}
4359
4360 /* determine whether the lock can be canceled before replaying the lock
4361  * during recovery, see bug16774 for detailed information 
4362  *
4363  * return values:
4364  *  zero  - the lock can't be canceled
4365  *  other - ok to cancel
4366  */
4367 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
4368 {
4369         check_res_locked(lock->l_resource);
4370         if (lock->l_granted_mode == LCK_GROUP || 
4371             lock->l_resource->lr_type != LDLM_EXTENT)
4372                 RETURN(0);
4373
4374         /* cancel all unused extent locks with granted mode LCK_PR or LCK_CR */
4375         if (lock->l_granted_mode == LCK_PR ||
4376             lock->l_granted_mode == LCK_CR)
4377                 RETURN(1);
4378
4379         RETURN(0);       
4380 }
4381
4382 int osc_setup(struct obd_device *obd, obd_count len, void *buf)
4383 {
4384         int rc;
4385         ENTRY;
4386
4387         ENTRY;
4388         rc = ptlrpcd_addref();
4389         if (rc)
4390                 RETURN(rc);
4391
4392         rc = client_obd_setup(obd, len, buf);
4393         if (rc) {
4394                 ptlrpcd_decref();
4395         } else {
4396                 struct lprocfs_static_vars lvars = { 0 };
4397                 struct client_obd *cli = &obd->u.cli;
4398
4399                 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
4400                 lprocfs_osc_init_vars(&lvars);
4401                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
4402                         lproc_osc_attach_seqstat(obd);
4403                         ptlrpc_lprocfs_register_obd(obd);
4404                 }
4405
4406                 oscc_init(obd);
4407                 /* We need to allocate a few requests more, because
4408                    brw_interpret tries to create new requests before freeing
4409                    previous ones. Ideally we want to have 2x max_rpcs_in_flight
4410                    reserved, but I afraid that might be too much wasted RAM
4411                    in fact, so 2 is just my guess and still should work. */
4412                 cli->cl_import->imp_rq_pool =
4413                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4414                                             OST_MAXREQSIZE,
4415                                             ptlrpc_add_rqs_to_pool);
4416                 cli->cl_cache = cache_create(obd);
4417                 if (!cli->cl_cache) {
4418                         osc_cleanup(obd);
4419                         rc = -ENOMEM;
4420                 }
4421                 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
4422                 sema_init(&cli->cl_grant_sem, 1);
4423
4424                 ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
4425         }
4426
4427         RETURN(rc);
4428 }
4429
/*
 * Staged pre-cleanup of the OSC device, driven by the generic obd
 * teardown state machine.
 */
static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
{
        int rc = 0;
        ENTRY;

        switch (stage) {
        case OBD_CLEANUP_EARLY: {
                struct obd_import *imp;
                imp = obd->u.cli.cl_import;
                CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
                /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
                ptlrpc_deactivate_import(imp);
                break;
        }
        case OBD_CLEANUP_EXPORTS: {
                /* If we set up but never connected, the
                   client import will not have been cleaned. */
                down_write(&obd->u.cli.cl_sem);
                if (obd->u.cli.cl_import) {
                        struct obd_import *imp;
                        imp = obd->u.cli.cl_import;
                        /* NOTE(review): this message prints whenever an
                         * import still exists at this stage, not only in
                         * the never-connected case the comment above
                         * describes — confirm intent. */
                        CDEBUG(D_CONFIG, "%s: client import never connected\n",
                               obd->obd_name);
                        ptlrpc_invalidate_import(imp);
                        if (imp->imp_rq_pool) {
                                ptlrpc_free_rq_pool(imp->imp_rq_pool);
                                imp->imp_rq_pool = NULL;
                        }
                        class_destroy_import(imp);
                        obd->u.cli.cl_import = NULL;
                }
                up_write(&obd->u.cli.cl_sem);

                rc = obd_llog_finish(obd, 0);
                if (rc != 0)
                        CERROR("failed to cleanup llogging subsystems\n");
                break;
        }
        case OBD_CLEANUP_SELF_EXP:
                break;
        case OBD_CLEANUP_OBD:
                break;
        }
        RETURN(rc);
}
4475
/*
 * Final cleanup of an OSC device: unhook lprocfs entries, release the
 * quota cache, destroy the page cache, tear down the client obd state,
 * and drop this device's reference on the ptlrpcd threads.
 *
 * Ordering matters: lprocfs must go first (stops stat readers), and
 * ptlrpcd_decref() last since client_obd_cleanup() may still need RPCs.
 */
int osc_cleanup(struct obd_device *obd)
{
        int rc;

        ENTRY;
        ptlrpc_lprocfs_unregister_obd(obd);
        lprocfs_obd_cleanup(obd);

        /* free memory of osc quota cache */
        lquota_cleanup(quota_interface, obd);

        cache_destroy(obd->u.cli.cl_cache);
        rc = client_obd_cleanup(obd);

        ptlrpcd_decref();
        RETURN(rc);
}
4493
4494 static int osc_register_page_removal_cb(struct obd_device *obd,
4495                                         obd_page_removal_cb_t func,
4496                                         obd_pin_extent_cb pin_cb)
4497 {
4498         ENTRY;
4499
4500         /* this server - not need init */
4501         if (func == NULL)
4502                 return 0;
4503
4504         return cache_add_extent_removal_cb(obd->u.cli.cl_cache, func,
4505                                            pin_cb);
4506 }
4507
4508 static int osc_unregister_page_removal_cb(struct obd_device *obd,
4509                                           obd_page_removal_cb_t func)
4510 {
4511         ENTRY;
4512         return cache_del_extent_removal_cb(obd->u.cli.cl_cache, func);
4513 }
4514
4515 static int osc_register_lock_cancel_cb(struct obd_device *obd,
4516                                        obd_lock_cancel_cb cb)
4517 {
4518         ENTRY;
4519         LASSERT(obd->u.cli.cl_ext_lock_cancel_cb == NULL);
4520
4521         /* this server - not need init */
4522         if (cb == NULL)
4523                 return 0;
4524
4525         obd->u.cli.cl_ext_lock_cancel_cb = cb;
4526         return 0;
4527 }
4528
4529 static int osc_unregister_lock_cancel_cb(struct obd_device *obd,
4530                                          obd_lock_cancel_cb cb)
4531 {
4532         ENTRY;
4533
4534         if (obd->u.cli.cl_ext_lock_cancel_cb != cb) {
4535                 CERROR("Unregistering cancel cb %p, while only %p was "
4536                        "registered\n", cb,
4537                        obd->u.cli.cl_ext_lock_cancel_cb);
4538                 RETURN(-EINVAL);
4539         }
4540
4541         obd->u.cli.cl_ext_lock_cancel_cb = NULL;
4542         return 0;
4543 }
4544
4545 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4546 {
4547         struct lustre_cfg *lcfg = buf;
4548         struct lprocfs_static_vars lvars = { 0 };
4549         int rc = 0;
4550
4551         lprocfs_osc_init_vars(&lvars);
4552
4553         rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars, lcfg, obd);
4554         return(rc);
4555 }
4556
/*
 * Method table for the OSC obd type: lifecycle (setup/cleanup),
 * connection management, object create/destroy, attribute get/set,
 * bulk and async I/O, DLM lock operations, llog hooks, and the page
 * removal / lock cancel callback registration used by higher layers.
 */
struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup,
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        .o_statfs               = osc_statfs,
        .o_statfs_async         = osc_statfs_async,
        .o_packmd               = osc_packmd,
        .o_unpackmd             = osc_unpackmd,
        .o_precreate            = osc_precreate,
        .o_create               = osc_create,
        .o_create_async         = osc_create_async,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_getattr_async        = osc_getattr_async,
        .o_setattr              = osc_setattr,
        .o_setattr_async        = osc_setattr_async,
        .o_brw                  = osc_brw,
        .o_brw_async            = osc_brw_async,
        .o_prep_async_page      = osc_prep_async_page,
        .o_get_lock             = osc_get_lock,
        .o_queue_async_io       = osc_queue_async_io,
        .o_set_async_flags      = osc_set_async_flags,
        .o_queue_group_io       = osc_queue_group_io,
        .o_trigger_group_io     = osc_trigger_group_io,
        .o_teardown_async_page  = osc_teardown_async_page,
        .o_punch                = osc_punch,
        .o_sync                 = osc_sync,
        .o_enqueue              = osc_enqueue,
        .o_match                = osc_match,
        .o_change_cbdata        = osc_change_cbdata,
        .o_cancel               = osc_cancel,
        .o_cancel_unused        = osc_cancel_unused,
        .o_join_lru             = osc_join_lru,
        .o_iocontrol            = osc_iocontrol,
        .o_get_info             = osc_get_info,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_llog_init            = osc_llog_init,
        .o_llog_finish          = osc_llog_finish,
        .o_process_config       = osc_process_config,
        .o_register_page_removal_cb = osc_register_page_removal_cb,
        .o_unregister_page_removal_cb = osc_unregister_page_removal_cb,
        .o_register_lock_cancel_cb = osc_register_lock_cancel_cb,
        .o_unregister_lock_cancel_cb = osc_unregister_lock_cancel_cb,
};
4608 int __init osc_init(void)
4609 {
4610         struct lprocfs_static_vars lvars = { 0 };
4611         int rc;
4612         ENTRY;
4613
4614         lprocfs_osc_init_vars(&lvars);
4615
4616         request_module("lquota");
4617         quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
4618         lquota_init(quota_interface);
4619         init_obd_quota_ops(quota_interface, &osc_obd_ops);
4620
4621         rc = class_register_type(&osc_obd_ops, lvars.module_vars,
4622                                  LUSTRE_OSC_NAME);
4623         if (rc) {
4624                 if (quota_interface)
4625                         PORTAL_SYMBOL_PUT(osc_quota_interface);
4626                 RETURN(rc);
4627         }
4628
4629         osc_mds_ost_orig_logops = llog_lvfs_ops;
4630         osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
4631         osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
4632         osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
4633         osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
4634
4635         RETURN(rc);
4636 }
4637
4638 #ifdef __KERNEL__
/*
 * Module exit: release quota state, drop the quota module reference
 * taken in osc_init(), and unregister the OSC obd type.
 * (Not marked __exit because it is also called from error paths.)
 */
static void /*__exit*/ osc_exit(void)
{
        lquota_exit(quota_interface);
        if (quota_interface)
                PORTAL_SYMBOL_PUT(osc_quota_interface);

        class_unregister_type(LUSTRE_OSC_NAME);
}
4647
/* Kernel module metadata and init/exit registration. */
MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");

cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
4653 #endif