Whamcloud - gitweb
b=20433 decrease the usage of memory on clients.
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #ifndef EXPORT_SYMTAB
38 # define EXPORT_SYMTAB
39 #endif
40 #define DEBUG_SUBSYSTEM S_OSC
41
42 #ifdef __KERNEL__
43 # include <libcfs/libcfs.h>
44 #else /* __KERNEL__ */
45 # include <liblustre.h>
46 #endif
47
48 # include <lustre_dlm.h>
49 #include <libcfs/kp30.h>
50 #include <lustre_net.h>
51 #include <lustre/lustre_user.h>
52 #include <obd_ost.h>
53 #include <obd_lov.h>
54
55 #ifdef  __CYGWIN__
56 # include <ctype.h>
57 #endif
58
59 #include <lustre_ha.h>
60 #include <lprocfs_status.h>
61 #include <lustre_log.h>
62 #include <lustre_debug.h>
63 #include <lustre_param.h>
64 #include <lustre_cache.h>
65 #include "osc_internal.h"
66
static quota_interface_t *quota_interface = NULL;
extern quota_interface_t osc_quota_interface;

/* Forward declarations for helpers defined later in this file. */
static void osc_release_ppga(struct brw_page **ppga, obd_count count);
static int brw_interpret(struct ptlrpc_request *request, void *data, int rc);
int osc_cleanup(struct obd_device *obd);

/* NOTE(review): the next two lines duplicate the quota_interface /
 * osc_quota_interface declarations above.  The repeat is harmless in C
 * (a tentative definition plus a repeated extern) but one copy should
 * be removed. */
static quota_interface_t *quota_interface;
extern quota_interface_t osc_quota_interface;

/* by default 10s */
atomic_t osc_resend_time;
79
80 /* Pack OSC object metadata for disk storage (LE byte order). */
81 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
82                       struct lov_stripe_md *lsm)
83 {
84         int lmm_size;
85         ENTRY;
86
87         lmm_size = sizeof(**lmmp);
88         if (!lmmp)
89                 RETURN(lmm_size);
90
91         if (*lmmp && !lsm) {
92                 OBD_FREE(*lmmp, lmm_size);
93                 *lmmp = NULL;
94                 RETURN(0);
95         }
96
97         if (!*lmmp) {
98                 OBD_ALLOC(*lmmp, lmm_size);
99                 if (!*lmmp)
100                         RETURN(-ENOMEM);
101         }
102
103         if (lsm) {
104                 LASSERT(lsm->lsm_object_id);
105                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
106         }
107
108         RETURN(lmm_size);
109 }
110
/* Unpack OSC object metadata from disk storage (LE byte order).
 * With a NULL @lsmp only the in-memory size is reported; with a NULL @lmm
 * an existing lsm is freed.  Returns the lsm size or -errno. */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
{
        int lsm_size;
        ENTRY;

        if (lmm != NULL) {
                /* Reject buffers too small to hold an on-disk lov_mds_md. */
                if (lmm_bytes < sizeof (*lmm)) {
                        CERROR("lov_mds_md too small: %d, need %d\n",
                               lmm_bytes, (int)sizeof(*lmm));
                        RETURN(-EINVAL);
                }
                /* XXX LOV_MAGIC etc check? */

                if (lmm->lmm_object_id == 0) {
                        CERROR("lov_mds_md: zero lmm_object_id\n");
                        RETURN(-EINVAL);
                }
        }

        /* An OSC object always has exactly one stripe. */
        lsm_size = lov_stripe_md_size(1);
        if (lsmp == NULL)
                RETURN(lsm_size);

        /* No disk data supplied: free the existing lsm and its oinfo. */
        if (*lsmp != NULL && lmm == NULL) {
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);
                *lsmp = NULL;
                RETURN(0);
        }

        /* First use: allocate the lsm and its single lov_oinfo. */
        if (*lsmp == NULL) {
                OBD_ALLOC(*lsmp, lsm_size);
                if (*lsmp == NULL)
                        RETURN(-ENOMEM);
                OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                if ((*lsmp)->lsm_oinfo[0] == NULL) {
                        OBD_FREE(*lsmp, lsm_size);
                        RETURN(-ENOMEM);
                }
                loi_init((*lsmp)->lsm_oinfo[0]);
        }

        if (lmm != NULL) {
                /* XXX zero *lsmp? */
                (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
                LASSERT((*lsmp)->lsm_object_id);
        }

        (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

        RETURN(lsm_size);
}
165
/* Completion callback for an async GETATTR: unpack the reply obdo into
 * the caller's obd_info and run its oi_cb_up callback. */
static int osc_getattr_interpret(struct ptlrpc_request *req,
                                 void *data, int rc)
{
        struct ost_body *body;
        struct osc_async_args *aa = data;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body) {
                CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
                lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);

                /* This should really be sent by the OST */
                aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
                aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
        } else {
                CERROR("can't unpack ost_body\n");
                rc = -EPROTO;
                /* Invalidate the obdo so callers don't consume stale data. */
                aa->aa_oi->oi_oa->o_valid = 0;
        }
out:
        /* The caller's callback always runs, even on error. */
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}
194
195 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
196                              struct ptlrpc_request_set *set)
197 {
198         struct ptlrpc_request *req;
199         struct ost_body *body;
200         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
201         struct osc_async_args *aa;
202         ENTRY;
203
204         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
205                               OST_GETATTR, 2, size,NULL);
206         if (!req)
207                 RETURN(-ENOMEM);
208
209         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
210         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
211
212         ptlrpc_req_set_repsize(req, 2, size);
213         req->rq_interpret_reply = osc_getattr_interpret;
214
215         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
216         aa = ptlrpc_req_async_args(req);
217         aa->aa_oi = oinfo;
218
219         ptlrpc_set_add_req(set, req);
220         RETURN (0);
221 }
222
/* Synchronous GETATTR: fetch the attributes of the object named by
 * oinfo->oi_oa from the OST, blocking until the reply arrives. */
static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        int rc;
        ENTRY;

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_GETATTR, 2, size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);

        ptlrpc_req_set_repsize(req, 2, size);

        /* Block until the OST replies. */
        rc = ptlrpc_queue_wait(req);
        if (rc) {
                CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
                GOTO(out, rc);
        }

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR ("can't unpack ost_body\n");
                GOTO (out, rc = -EPROTO);
        }

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);

        /* This should really be sent by the OST */
        oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
266
/* Synchronous SETATTR: send the attributes in oinfo->oi_oa to the OST and
 * copy the updated obdo from the reply back into oinfo.  @oti is unused
 * here (cookies are handled in osc_setattr_async). */
static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
                       struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        int rc;
        ENTRY;

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_SETATTR, 2, size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);

        ptlrpc_req_set_repsize(req, 2, size);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
302
/* Completion callback for an async SETATTR: unpack the reply obdo and
 * run the caller's oi_cb_up callback. */
static int osc_setattr_interpret(struct ptlrpc_request *req,
                                 void *data, int rc)
{
        struct ost_body *body;
        struct osc_async_args *aa = data;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
out:
        /* The caller's callback always runs, even on error. */
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}
325
/* Asynchronous SETATTR.  With a NULL @rqset the request is handed to
 * ptlrpcd and the reply is not processed; otherwise the request joins
 * @rqset and osc_setattr_interpret() delivers the result via oi_cb_up. */
static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct obd_trans_info *oti,
                             struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        __u32 size[3] = { sizeof(struct ptlrpc_body), sizeof(*body), 0 };
        int bufcount = 2;
        struct osc_async_args *aa;
        ENTRY;

        /* 2.0 servers take a third (zero-length) request buffer. */
        if (osc_exp_is_2_0_server(exp)) {
                bufcount = 3;
        }

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_SETATTR, bufcount, size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));

        /* Carry the llog cookie along so the server can cancel the
         * matching log record — presumably the MDS unlink log; see the
         * comment above osc_destroy(). */
        if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
                LASSERT(oti);
                oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
        }

        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
        ptlrpc_req_set_repsize(req, 2, size);
        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
                req->rq_interpret_reply = osc_setattr_interpret;

                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = ptlrpc_req_async_args(req);
                aa->aa_oi = oinfo;

                ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}
371
/* Create an object on the OST (synchronous).  If the caller did not
 * supply an lsm in *ea, a single-stripe one is allocated here and freed
 * again on failure; on success it is returned through *ea. */
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        struct lov_stripe_md *lsm;
        __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        int rc;
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                /* Allocate a temporary lsm for the duration of the call. */
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_CREATE, 2, size, NULL);
        if (!req)
                GOTO(out, rc = -ENOMEM);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        lustre_set_wire_obdo(&body->oa, oa);

        ptlrpc_req_set_repsize(req, 2, size);
        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_DELORPHAN) {
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR ("can't unpack ost_body\n");
                GOTO (out_req, rc = -EPROTO);
        }

        lustre_get_wire_obdo(oa, &body->oa);

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_object_id = oa->o_id;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        *oti->oti_logcookies = oa->o_lcookie;
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        /* Free the lsm only if we allocated it here (*ea still unset). */
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        RETURN(rc);
}
452
/* Completion callback for an async PUNCH: unpack the reply obdo and run
 * the caller's oi_cb_up callback. */
static int osc_punch_interpret(struct ptlrpc_request *req,
                               void *data, int rc)
{
        struct ost_body *body;
        struct osc_async_args *aa = data;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof (*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR ("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
out:
        /* The caller's callback always runs, even on error. */
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}
475
/* Asynchronous PUNCH (truncate the extent in oinfo->oi_policy).  The
 * request joins @rqset; osc_punch_interpret() handles the reply. */
static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
                     struct obd_trans_info *oti,
                     struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        struct ost_body *body;
        __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        ENTRY;

        if (!oinfo->oi_oa) {
                CERROR("oa NULL\n");
                RETURN(-EINVAL);
        }

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_PUNCH, 2, size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        req->rq_request_portal = OST_IO_PORTAL;         /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);

        /* overload the size and blocks fields in the oa with start/end */
        body->oa.o_size = oinfo->oi_policy.l_extent.start;
        body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);

        ptlrpc_req_set_repsize(req, 2, size);

        /* Stash the obd_info for the interpret callback. */
        req->rq_interpret_reply = osc_punch_interpret;
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;
        ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}
517
518 static int osc_sync_interpret(struct ptlrpc_request *req,
519                               void *data, int rc)
520 {
521         struct ost_body *body;
522         struct osc_async_args *aa = data;
523         ENTRY;
524
525         if (rc)
526                 GOTO(out, rc);
527
528         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
529                                   lustre_swab_ost_body);
530         if (body == NULL) {
531                 CERROR ("can't unpack ost_body\n");
532                 GOTO(out, rc = -EPROTO);
533         }
534
535         *aa->aa_oi->oi_oa = body->oa;
536 out:
537         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
538         RETURN(rc);
539 }
540
/* Asynchronous OST_SYNC of bytes [start, end] of the object named by
 * oinfo->oi_oa; the reply is processed by osc_sync_interpret(). */
static int osc_sync(struct obd_export *exp, struct obd_info *oinfo,
                    obd_size start, obd_size end,
                    struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        struct osc_async_args *aa;
        ENTRY;

        if (!oinfo->oi_oa) {
                CERROR("oa NULL\n");
                RETURN(-EINVAL);
        }

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_SYNC, 2, size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);

        /* overload the size and blocks fields in the oa with start/end */
        body->oa.o_size = start;
        body->oa.o_blocks = end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);

        ptlrpc_req_set_repsize(req, 2, size);
        req->rq_interpret_reply = osc_sync_interpret;

        /* Stash the obd_info for the interpret callback. */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(set, req);
        RETURN (0);
}
579
580 /* Find and cancel locally locks matched by @mode in the resource found by
581  * @objid. Found locks are added into @cancel list. Returns the amount of
582  * locks added to @cancels list. */
583 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
584                                    struct list_head *cancels, ldlm_mode_t mode,
585                                    int lock_flags)
586 {
587         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
588         struct ldlm_res_id res_id;
589         struct ldlm_resource *res;
590         int count;
591         ENTRY;
592
593         osc_build_res_name(oa->o_id, oa->o_gr, &res_id);
594         res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
595         if (res == NULL)
596                 RETURN(0);
597
598         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
599                                            lock_flags, 0, NULL);
600         ldlm_resource_putref(res);
601         RETURN(count);
602 }
603
/* Completion callback for OST_DESTROY: drop the in-flight counter and
 * wake any sender throttled in osc_destroy(). */
static int osc_destroy_interpret(struct ptlrpc_request *req, void *data,
                                 int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        cfs_waitq_signal(&cli->cl_destroy_waitq);
        return 0;
}
613
/* Throttle destroy RPCs: returns 1 when a slot was claimed (the counter
 * was raised without exceeding cl_max_rpcs_in_flight), 0 when the caller
 * must wait on cl_destroy_waitq.  Lock-free; the inc/dec pair may race
 * with other senders, hence the signal on the back-out path. */
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        /* Over the limit: give the slot back. */
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                cfs_waitq_signal(&cli->cl_destroy_waitq);
        }
        return 0;
}
631
632 /* Destroy requests can be async always on the client, and we don't even really
633  * care about the return code since the client cannot do anything at all about
634  * a destroy failure.
635  * When the MDS is unlinking a filename, it saves the file objects into a
636  * recovery llog, and these object records are cancelled when the OST reports
637  * they were destroyed and sync'd to disk (i.e. transaction committed).
638  * If the client dies, or the OST is down when the object should be destroyed,
639  * the records are not cancelled, and when the OST reconnects to the MDS next,
640  * it will retrieve the llog unlink logs and then sends the log cancellation
641  * cookies to the MDS after committing destroy transactions. */
/* Destroy an object on the OST; see the block comment above for the
 * llog-cookie recovery protocol.  The RPC is fired through ptlrpcd and
 * the reply is not waited for. */
static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                       struct lov_stripe_md *ea, struct obd_trans_info *oti,
                       struct obd_export *md_export)
{
        CFS_LIST_HEAD(cancels);
        struct ptlrpc_request *req;
        struct ost_body *body;
        __u32 size[3] = { sizeof(struct ptlrpc_body), sizeof(*body),
                        sizeof(struct ldlm_request) };
        int count, bufcount = 2;
        struct client_obd *cli = &exp->exp_obd->u.cli;
        ENTRY;

        if (!oa) {
                CERROR("oa NULL\n");
                RETURN(-EINVAL);
        }

        LASSERT(oa->o_id != 0);

        /* Cancel our local PW locks on the object, discarding cached data;
         * the matched locks ride along as early cancels in the RPC. */
        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);
        if (exp_connect_cancelset(exp))
                bufcount = 3;
        req = ldlm_prep_elc_req(exp, LUSTRE_OST_VERSION, OST_DESTROY, bufcount,
                                size, REQ_REC_OFF + 1, 0, &cancels, count);
        if (!req)
                RETURN(-ENOMEM);

        req->rq_request_portal = OST_IO_PORTAL;         /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));

        /* Pass the unlink llog cookie so the OST can cancel the log record
         * once the destroy commits. */
        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE) {
                oa->o_lcookie = *oti->oti_logcookies;
        }

        lustre_set_wire_obdo(&body->oa, oa);
        ptlrpc_req_set_repsize(req, 2, size);

        /* don't throttle destroy RPCs for the MDT */
        if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
                req->rq_interpret_reply = osc_destroy_interpret;
                if (!osc_can_send_destroy(cli)) {
                        struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
                                                          NULL);

                        /*
                         * Wait until the number of on-going destroy RPCs drops
                         * under max_rpc_in_flight
                         */
                        l_wait_event_exclusive(cli->cl_destroy_waitq,
                                               osc_can_send_destroy(cli), &lwi);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req);
        RETURN(0);
}
703
/* Fill the dirty/grant accounting fields of @oa so the OST learns how
 * much cache and grant this client holds.  Takes cl_loi_list_lock
 * internally.  Note: @writing_bytes is currently unused. */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        /* The caller must not have filled these fields already. */
        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (cli->cl_dirty > cli->cl_dirty_max) {
                CERROR("dirty %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages + 1) {
                /* The atomic_read() allowing the atomic_inc() are not covered
                 * by a lock thus they may safely race and trip this CERROR()
                 * unless we add in a small fudge factor (+1). */
                CERROR("dirty %d > system dirty_max %d\n",
                       atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                /* Tell the OST how much more we could still dirty. */
                long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
                                (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant;
        oa->o_dropped = cli->cl_lost_grant;
        /* Lost grant is reported once, then reset. */
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);

}
742
/* Push the next grant-shrink deadline one cl_grant_shrink_interval into
 * the future. */
static void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant =
                cfs_time_shift(cli->cl_grant_shrink_interval);
        CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
               cli->cl_next_shrink_grant);
}
750
751 /* caller must hold loi_list_lock */
752 static void osc_consume_write_grant(struct client_obd *cli,struct brw_page *pga)
753 {
754         atomic_inc(&obd_dirty_pages);
755         cli->cl_dirty += CFS_PAGE_SIZE;
756         cli->cl_avail_grant -= CFS_PAGE_SIZE;
757         pga->flag |= OBD_BRW_FROM_GRANT;
758         CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
759                CFS_PAGE_SIZE, pga, pga->pg);
760         LASSERTF(cli->cl_avail_grant >= 0, "invalid avail grant is %ld \n",
761                  cli->cl_avail_grant);
762         osc_update_next_shrink(cli);
763 }
764
/* the companion to osc_consume_write_grant, called when a brw has completed.
 * must be called with the loi lock held. */
static void osc_release_write_grant(struct client_obd *cli,
                                    struct brw_page *pga, int sent)
{
        /* OST block size; falls back to 4096 when osfs has not been filled. */
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
        ENTRY;

        /* Nothing to do for pages that never consumed grant. */
        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
                EXIT;
                return;
        }

        pga->flag &= ~OBD_BRW_FROM_GRANT;
        atomic_dec(&obd_dirty_pages);
        cli->cl_dirty -= CFS_PAGE_SIZE;
        if (!sent) {
                /* Page was never sent: the whole page of grant is lost
                 * until the OST re-grants it. */
                cli->cl_lost_grant += CFS_PAGE_SIZE;
                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
        } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole block on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
                int offset = pga->off & ~CFS_PAGE_MASK;
                int count = pga->count + (offset & (blocksize - 1));
                int end = (offset + pga->count) & (blocksize - 1);
                if (end)
                        count += blocksize - end;

                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
                       cli->cl_avail_grant, cli->cl_dirty);
        }

        EXIT;
}
803
/* Total BRW RPCs (read + write) currently in flight for this client. */
static unsigned long rpcs_in_flight(struct client_obd *cli)
{
        return cli->cl_r_in_flight + cli->cl_w_in_flight;
}
808
/* caller must hold loi_list_lock */
/* Walk cl_cache_waiters in order, waking each waiter that can now either
 * consume a page of write grant or must fall back to sync IO (-EDQUOT).
 * Stops early when neither dirty room nor grant can become available. */
void osc_wake_cache_waiters(struct client_obd *cli)
{
        struct list_head *l, *tmp;
        struct osc_cache_waiter *ocw;

        ENTRY;
        list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                /* if we can't dirty more, we must wait until some is written */
                if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
                   ((atomic_read(&obd_dirty_pages)+1)>(obd_max_dirty_pages))) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);
                        return;
                }

                /* if still dirty cache but no grant wait for pending RPCs that
                 * may yet return us some grant before doing sync writes */
                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
                               cli->cl_w_in_flight);
                        return;
                }

                ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
                list_del_init(&ocw->ocw_entry);
                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        /* no more RPCs in flight to return grant, do sync IO */
                        ocw->ocw_rc = -EDQUOT;
                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
                } else {
                        osc_consume_write_grant(cli,
                                                &ocw->ocw_oap->oap_brw_page);
                }

                cfs_waitq_signal(&ocw->ocw_waitq);
        }

        EXIT;
}
850
/*
 * Fold grant returned by the server in an OST reply into cl_avail_grant,
 * under loi_list_lock.  Cache waiters are not woken here (see comment
 * below).  NOTE(review): the CDEBUG prints body->oa.o_grant before checking
 * OBD_MD_FLGRANT, so the logged value may be stale when the flag is unset.
 */
static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
        if (body->oa.o_valid & OBD_MD_FLGRANT)
                cli->cl_avail_grant += body->oa.o_grant;
        /* waiters are woken in brw_interpret */
        client_obd_list_unlock(&cli->cl_loi_list_lock);
}
860
861 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
862                               void *key, obd_count vallen, void *val,
863                               struct ptlrpc_request_set *set);
864
865 static int osc_shrink_grant_interpret(struct ptlrpc_request *req,
866                                       void *data, int rc)
867 {
868         struct osc_grant_args *aa = data;
869         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
870         struct obdo *oa = aa->aa_oa;
871         struct ost_body *body;
872
873         if (rc != 0) {
874                 client_obd_list_lock(&cli->cl_loi_list_lock);
875                 cli->cl_avail_grant += oa->o_grant;
876                 client_obd_list_unlock(&cli->cl_loi_list_lock);
877                 GOTO(out, rc);
878         }
879         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*oa),
880                                 lustre_swab_ost_body);
881         osc_update_grant(cli, body);
882 out:
883         OBD_FREE_PTR(oa);
884         return rc;
885 }
886
/*
 * Piggy-back a grant shrink on an outgoing BRW: move a quarter of our
 * available grant into oa->o_grant (under loi_list_lock), flag the obdo
 * with OBD_FL_SHRINK_GRANT so the server reclaims it, and reschedule the
 * next shrink time.  NOTE(review): o_flags is updated after the lock is
 * dropped; callers appear to own the obdo exclusively at this point.
 */
static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}
896
897 /* Shrink the current grant, either from some large amount to enough for a
898  * full set of in-flight RPCs, or if we have already shrunk to that limit
899  * then to enough for a single RPC.  This avoids keeping more grant than
900  * needed, and avoids shrinking the grant piecemeal. */
901 static int osc_shrink_grant(struct client_obd *cli)
902 {
903         long target = (cli->cl_max_rpcs_in_flight + 1) *
904                       cli->cl_max_pages_per_rpc;
905
906         client_obd_list_lock(&cli->cl_loi_list_lock);
907         if (cli->cl_avail_grant <= target)
908                 target = cli->cl_max_pages_per_rpc;
909         client_obd_list_unlock(&cli->cl_loi_list_lock);
910
911         return osc_shrink_grant_to_target(cli, target);
912 }
913
/*
 * Give grant above 'target' back to the server via an async set_info RPC
 * (KEY_GRANT_SHRINK).  cl_avail_grant is reduced immediately; on RPC
 * failure here (or in osc_shrink_grant_interpret) the grant is restored.
 * Returns 0 on success or a negative errno.
 */
int osc_shrink_grant_to_target(struct client_obd *cli, long target)
{
        int    rc = 0;
        struct ost_body     *body;
        ENTRY;

        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target < cli->cl_max_pages_per_rpc)
                target = cli->cl_max_pages_per_rpc;

        if (target >= cli->cl_avail_grant) {
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        /* NOTE(review): the lock was dropped above, so cl_avail_grant may
         * have changed since the target >= cl_avail_grant check; confirm
         * o_grant cannot go negative here under concurrent consumers. */
        client_obd_list_lock(&cli->cl_loi_list_lock);
        body->oa.o_grant = cli->cl_avail_grant - target;
        cli->cl_avail_grant = target;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc) {
                /* send failed: take the grant back */
                client_obd_list_lock(&cli->cl_loi_list_lock);
                cli->cl_avail_grant += body->oa.o_grant;
                client_obd_list_unlock(&cli->cl_loi_list_lock);
        }
        OBD_FREE_PTR(body);
        RETURN(rc);
}
957
958 #define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
959 static int osc_should_shrink_grant(struct client_obd *client)
960 {
961         cfs_time_t time = cfs_time_current();
962         cfs_time_t next_shrink = client->cl_next_shrink_grant;
963         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
964                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
965                     client->cl_avail_grant > GRANT_SHRINK_LIMIT)
966                         return 1;
967                 else
968                         osc_update_next_shrink(client);
969         }
970         return 0;
971 }
972
973 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
974 {
975         struct client_obd *client;
976
977         list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
978                 if (osc_should_shrink_grant(client))
979                         osc_shrink_grant(client);
980         }
981         return 0;
982 }
983
984 static int osc_add_shrink_grant(struct client_obd *client)
985 {
986         int rc;
987
988         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
989                                        TIMEOUT_GRANT,
990                                        osc_grant_shrink_grant_cb, NULL,
991                                        &client->cl_grant_shrink_list);
992         if (rc) {
993                 CERROR("add grant client %s error %d\n",
994                         client->cl_import->imp_obd->obd_name, rc);
995                 return rc;
996         }
997         CDEBUG(D_CACHE, "add grant client %s \n",
998                client->cl_import->imp_obd->obd_name);
999         osc_update_next_shrink(client);
1000         return 0;
1001 }
1002
1003 static int osc_del_shrink_grant(struct client_obd *client)
1004 {
1005         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1006                                          TIMEOUT_GRANT);
1007 }
1008
/*
 * Initialize this client's grant from the server's connect data and, if
 * the server supports OBD_CONNECT_GRANT_SHRINK and we are not already
 * registered, hook the client into the periodic shrink machinery.
 * Called at (re)connect time.
 */
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);

        CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld \n",
               cli->cl_avail_grant, cli->cl_lost_grant);
        /* a negative grant from the server would corrupt later accounting */
        LASSERT(cli->cl_avail_grant >= 0);
}
1023
/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
/*
 * Zero-fill the portion of the page array beyond the nob_read bytes the
 * server actually returned: first the tail of the partially-read page (if
 * any), then every page after it.  pshift adjusts file offsets to
 * in-memory page offsets via OSC_FILE2MEM_OFF.
 */
static void handle_short_read(int nob_read, obd_count page_count,
                              struct brw_page **pga, int pshift)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT (page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = cfs_kmap(pga[i]->pg) +
                              (OSC_FILE2MEM_OFF(pga[i]->off,pshift)&~CFS_PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        cfs_kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = cfs_kmap(pga[i]->pg) +
                      (OSC_FILE2MEM_OFF(pga[i]->off, pshift) & ~CFS_PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                cfs_kunmap(pga[i]->pg);
                i++;
        }
}
1063
1064 static int check_write_rcs(struct ptlrpc_request *req,
1065                            int requested_nob, int niocount,
1066                            obd_count page_count, struct brw_page **pga)
1067 {
1068         int    *remote_rcs, i;
1069
1070         /* return error if any niobuf was in error */
1071         remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
1072                                         sizeof(*remote_rcs) * niocount, NULL);
1073         if (remote_rcs == NULL) {
1074                 CERROR("Missing/short RC vector on BRW_WRITE reply\n");
1075                 return(-EPROTO);
1076         }
1077         if (lustre_rep_need_swab(req))
1078                 for (i = 0; i < niocount; i++)
1079                         __swab32s(&remote_rcs[i]);
1080
1081         for (i = 0; i < niocount; i++) {
1082                 if (remote_rcs[i] < 0)
1083                         return(remote_rcs[i]);
1084
1085                 if (remote_rcs[i] != 0) {
1086                         CERROR("rc[%d] invalid (%d) req %p\n",
1087                                 i, remote_rcs[i], req);
1088                         return(-EPROTO);
1089                 }
1090         }
1091
1092         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1093                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1094                        req->rq_bulk->bd_nob_transferred, requested_nob);
1095                 return(-EPROTO);
1096         }
1097
1098         return (0);
1099 }
1100
1101 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1102 {
1103         if (p1->flag != p2->flag) {
1104                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_ASYNC);
1105
1106                 /* warn if we try to combine flags that we don't know to be
1107                  * safe to combine */
1108                 if ((p1->flag & mask) != (p2->flag & mask))
1109                         CERROR("is it ok to have flags 0x%x and 0x%x in the "
1110                                "same brw?\n", p1->flag, p2->flag);
1111                 return 0;
1112         }
1113
1114         return (p1->off + p1->count == p2->off);
1115 }
1116
/*
 * Compute the bulk checksum over the first 'nob' bytes of the page array,
 * using the algorithm selected by cksum_type.  Includes two fault-injection
 * hooks: OBD_FAIL_OSC_CHECKSUM_RECEIVE corrupts read data before
 * checksumming, OBD_FAIL_OSC_CHECKSUM_SEND skews the checksum of a write
 * without touching the data (so a resend is still correct).
 */
static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga, int opc,
                                   cksum_type_t cksum_type, int pshift)
{
        __u32 cksum;
        int i = 0;

        LASSERT (pg_count > 0);
        cksum = init_checksum(cksum_type);
        while (nob > 0 && pg_count > 0) {
                unsigned char *ptr = cfs_kmap(pga[i]->pg);
                int off = OSC_FILE2MEM_OFF(pga[i]->off, pshift) & ~CFS_PAGE_MASK;
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
                        memcpy(ptr + off, "bad1", min(4, nob));
                cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
                cfs_kunmap(pga[i]->pg);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
                               off, cksum);

                /* NOTE(review): nob is decremented by the full page count
                 * even on the final partial page; the loop then exits on
                 * nob <= 0, so no bytes past 'nob' are ever checksummed */
                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}
1152
/*
 * Build (but do not send) a BRW read or write request for the given page
 * array: allocates the ptlrpc request, its bulk descriptor, and the
 * ost_body/ioobj/niobuf_remote records; merges contiguous pages into
 * shared niobufs; announces cached dirty/grant state; optionally attaches
 * checksums; and stashes the async args used by the completion paths.
 * On success *reqp is set and 0 is returned; the caller owns the request.
 */
static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp, int pshift)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        __u32 size[4] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        int niocount, i, requested_nob, opc, rc;
        struct ptlrpc_request_pool *pool;
        struct osc_brw_async_args *aa;
        struct brw_page *pg_prev;

        ENTRY;
        OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM); /* Recoverable */
        OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ2, -EINVAL); /* Fatal */

        /* writes draw from the import's preallocated pool so they can
         * proceed under memory pressure; reads allocate normally */
        opc = ((cmd & OBD_BRW_WRITE) != 0) ? OST_WRITE : OST_READ;
        pool = ((cmd & OBD_BRW_WRITE) != 0) ? cli->cl_import->imp_rq_pool :NULL;

        /* count remote niobufs: adjacent mergeable pages share one */
        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        size[REQ_REC_OFF + 1] = sizeof(*ioobj);
        size[REQ_REC_OFF + 2] = niocount * sizeof(*niobuf);

        req = ptlrpc_prep_req_pool(cli->cl_import, LUSTRE_OST_VERSION, opc, 4, size,
                                   NULL, pool);
        if (req == NULL)
                RETURN (-ENOMEM);

        req->rq_request_portal = OST_IO_PORTAL;         /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (opc == OST_WRITE)
                desc = ptlrpc_prep_bulk_imp (req, page_count,
                                             BULK_GET_SOURCE, OST_BULK_PORTAL);
        else
                desc = ptlrpc_prep_bulk_imp (req, page_count,
                                             BULK_PUT_SINK, OST_BULK_PORTAL);
        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        ioobj = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioobj));
        niobuf = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                                niocount * sizeof(*niobuf));

        lustre_set_wire_obdo(&body->oa, oa);
        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;

        LASSERT (page_count > 0);
        pg_prev = pga[0];
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];

                /* each fragment must lie within one memory page */
                LASSERT(pg->count > 0);
                LASSERTF((OSC_FILE2MEM_OFF(pg->off, pshift) & ~CFS_PAGE_MASK) +
                         pg->count <= CFS_PAGE_SIZE,
                         "i: %d pg: %p off: "LPU64", count: %u, shift: %d\n",
                         i, pg, pg->off, pg->count, pshift);
#ifdef __linux__
                /* pages must arrive in strictly ascending file order */
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
#else
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u\n", i, page_count);
#endif
                /* server locking must be all-or-nothing for the batch */
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page(desc, pg->pg,
                                      OSC_FILE2MEM_OFF(pg->off,pshift)&~CFS_PAGE_MASK,
                                      pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        /* extend the previous niobuf instead of starting one */
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len    = pg->count;
                        niobuf->flags  = pg->flag;
                }
                pg_prev = pg;
        }

        /* sanity: we filled exactly the niobuf array we sized above */
        LASSERTF((void *)(niobuf - niocount) ==
                lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                               niocount * sizeof(*niobuf)),
                "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg,
                REQ_REC_OFF + 2, niocount * sizeof(*niobuf)),
                (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (cli->cl_checksum) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                                oa->o_flags &= OBD_FL_LOCAL_MASK;
                                body->oa.o_flags = 0;
                        }
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type, pshift);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                size[REPLY_REC_OFF + 1] = sizeof(__u32) * niocount;
                ptlrpc_req_set_repsize(req, 3, size);
        } else {
                if (cli->cl_checksum) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
                /* 1 RC for the whole I/O */
                ptlrpc_req_set_repsize(req, 2, size);
        }

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        aa->aa_pshift = pshift;
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);

        *reqp = req;
        RETURN (0);

 out:
        ptlrpc_req_finished (req);
        RETURN (rc);
}
1324
/*
 * Compare the client-computed write checksum with the one the server
 * reports.  On mismatch, recompute the checksum over the (still cached)
 * pages to decide where the corruption most likely happened, log it
 * loudly, and return 1 so the caller can resend; returns 0 on match.
 */
static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum, int nob,
                                obd_count page_count, struct brw_page **pga,
                                cksum_type_t client_cksum_type, int pshift)
{
        __u32 new_cksum;
        char *msg;
        cksum_type_t cksum_type;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                return 0;
        }

        /* recompute with the algorithm the server says it used */
        if (oa->o_valid & OBD_MD_FLFLAGS)
                cksum_type = cksum_type_unpack(oa->o_flags);
        else
                cksum_type = OBD_CKSUM_CRC32;

        new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
                                      cksum_type, pshift);

        /* triangulate: which of the three checksums agree tells us where
         * the data changed */
        if (cksum_type != client_cksum_type)
                msg = "the server did not use the checksum type specified in "
                      "the original request - likely a protocol problem";
        else if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it - "
                      "likely false positive due to mmap IO (bug 11742)";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original - "
                      "likely false positive due to mmap IO (bug 11742)";

        LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
                           LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
                           "["LPU64"-"LPU64"]\n",
                           msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
                                                        (__u64)0,
                           oa->o_id,
                           oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
                           pga[0]->off,
                           pga[page_count-1]->off + pga[page_count-1]->count - 1);
        CERROR("original client csum %x (type %x), server csum %x (type %x), "
               "client csum now %x\n", client_cksum, client_cksum_type,
               server_cksum, cksum_type, new_cksum);

        return 1;
}
1376
/* Note rc enters this function as number of bytes transferred */
/*
 * Validate and finish a completed BRW request.  Unpacks the reply body,
 * updates quota flags and grant, then:
 *  - for writes: verifies the write checksum (returning -EAGAIN to force
 *    a resend on mismatch) and the per-niobuf rc vector;
 *  - for reads: validates the transferred byte counts, zero-fills short
 *    reads, and verifies the read checksum if the server sent one.
 * On success the wire obdo from the reply is copied back into aa_oa.
 * Returns 0, a negative errno, or -EAGAIN to request a resend.
 */
static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
{
        struct osc_brw_async_args *aa = ptlrpc_req_async_args(req);
        const lnet_process_id_t *peer =
                        &req->rq_import->imp_connection->c_peer;
        struct client_obd *cli = aa->aa_cli;
        struct ost_body *body;
        __u32 client_cksum = 0;
        ENTRY;

        /* -EDQUOT replies still carry a body we must process below */
        if (rc < 0 && rc != -EDQUOT)
                RETURN(rc);

        LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR ("Can't unpack body\n");
                RETURN(-EPROTO);
        }

        /* set/clear over quota flag for a uid/gid */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
            body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
                lquota_setdq(quota_interface, cli, body->oa.o_uid,
                             body->oa.o_gid, body->oa.o_valid,
                             body->oa.o_flags);

        if (rc < 0)
                RETURN(rc);

        if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
                client_cksum = aa->aa_oa->o_cksum; /* save for later */

        osc_update_grant(cli, body);

        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
                /* a write's byte count comes from the rc vector, not rc */
                if (rc > 0) {
                        CERROR ("Unexpected +ve rc %d\n", rc);
                        RETURN(-EPROTO);
                }
                LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);

                if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
                    check_write_checksum(&body->oa, peer, client_cksum,
                                         body->oa.o_cksum, aa->aa_requested_nob,
                                         aa->aa_page_count, aa->aa_ppga,
                                         cksum_type_unpack(aa->aa_oa->o_flags),
                                         aa->aa_pshift))
                        RETURN(-EAGAIN);

                rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
                                     aa->aa_page_count, aa->aa_ppga);
                GOTO(out, rc);
        }

        /* The rest of this function executes only for OST_READs */
        if (rc > aa->aa_requested_nob) {
                CERROR("Unexpected rc %d (%d requested)\n", rc,
                       aa->aa_requested_nob);
                RETURN(-EPROTO);
        }

        if (rc != req->rq_bulk->bd_nob_transferred) {
                CERROR ("Unexpected rc %d (%d transferred)\n",
                        rc, req->rq_bulk->bd_nob_transferred);
                return (-EPROTO);
        }

        /* short read: pad the remainder of the pages with zeroes */
        if (rc < aa->aa_requested_nob)
                handle_short_read(rc, aa->aa_page_count, aa->aa_ppga, aa->aa_pshift);

        if (body->oa.o_valid & OBD_MD_FLCKSUM) {
                static int cksum_counter;
                __u32      server_cksum = body->oa.o_cksum;
                char      *via;
                char      *router;
                cksum_type_t cksum_type;

                if (body->oa.o_valid & OBD_MD_FLFLAGS)
                        cksum_type = cksum_type_unpack(body->oa.o_flags);
                else
                        cksum_type = OBD_CKSUM_CRC32;
                client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
                                                 aa->aa_ppga, OST_READ,
                                                 cksum_type, aa->aa_pshift);

                /* note any LNET router that relayed the bulk, for the log */
                if (peer->nid == req->rq_bulk->bd_sender) {
                        via = router = "";
                } else {
                        via = " via ";
                        router = libcfs_nid2str(req->rq_bulk->bd_sender);
                }

                if (server_cksum == ~0 && rc > 0) {
                        CERROR("Protocol error: server %s set the 'checksum' "
                               "bit, but didn't send a checksum.  Not fatal, "
                               "but please notify on http://bugzilla.lustre.org/\n",
                               libcfs_nid2str(peer->nid));
                } else if (server_cksum != client_cksum) {
                        LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
                                           "%s%s%s inum "LPU64"/"LPU64" object "
                                           LPU64"/"LPU64" extent "
                                           "["LPU64"-"LPU64"]\n",
                                           req->rq_import->imp_obd->obd_name,
                                           libcfs_nid2str(peer->nid),
                                           via, router,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                                body->oa.o_fid : (__u64)0,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                                body->oa.o_generation :(__u64)0,
                                           body->oa.o_id,
                                           body->oa.o_valid & OBD_MD_FLGROUP ?
                                                body->oa.o_gr : (__u64)0,
                                           aa->aa_ppga[0]->off,
                                           aa->aa_ppga[aa->aa_page_count-1]->off +
                                           aa->aa_ppga[aa->aa_page_count-1]->count -
                                                                        1);
                        CERROR("client %x, server %x, cksum_type %x\n",
                               client_cksum, server_cksum, cksum_type);
                        cksum_counter = 0;
                        aa->aa_oa->o_cksum = client_cksum;
                        rc = -EAGAIN;
                } else {
                        cksum_counter++;
                        CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                        rc = 0;
                }
        } else if (unlikely(client_cksum)) {
                static int cksum_missed;

                /* log only at power-of-two counts to avoid flooding */
                cksum_missed++;
                if ((cksum_missed & (-cksum_missed)) == cksum_missed)
                        CERROR("Checksum %u requested from %s but not sent\n",
                               cksum_missed, libcfs_nid2str(peer->nid));
        } else {
                rc = 0;
        }
out:
        if (rc >= 0)
                lustre_get_wire_obdo(aa->aa_oa, &body->oa);

        RETURN(rc);
}
1522
1523 static int osc_brw_internal(int cmd, struct obd_export *exp,struct obdo *oa,
1524                             struct lov_stripe_md *lsm,
1525                             obd_count page_count, struct brw_page **pga)
1526 {
1527         struct ptlrpc_request *request;
1528         int                    rc;
1529         cfs_waitq_t            waitq;
1530         int                    resends = 0;
1531         struct l_wait_info     lwi;
1532
1533         ENTRY;
1534         init_waitqueue_head(&waitq);
1535
1536 restart_bulk:
1537         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1538                                   page_count, pga, &request, 0);
1539         if (rc != 0)
1540                 return (rc);
1541
1542         rc = ptlrpc_queue_wait(request);
1543
1544         if (rc == -ETIMEDOUT && request->rq_resend) {
1545                 DEBUG_REQ(D_HA, request,  "BULK TIMEOUT");
1546                 ptlrpc_req_finished(request);
1547                 goto restart_bulk;
1548         }
1549
1550         rc = osc_brw_fini_request(request, rc);
1551
1552         ptlrpc_req_finished(request);
1553         if (osc_recoverable_error(rc)) {
1554                 resends++;
1555                 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1556                         CERROR("too many resend retries, returning error\n");
1557                         RETURN(-EIO);
1558                 }
1559
1560                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1561                 l_wait_event(waitq, 0, &lwi);
1562
1563                 goto restart_bulk;
1564         }
1565         RETURN(rc);
1566 }
1567
/* Rebuild an async bulk RPC that failed with a recoverable error and queue
 * the replacement on the original request's set.  The new request inherits
 * the old one's pga, oaps, async args and interpret callback.  Returns 0
 * on success, -EIO when the resend limit is exceeded, -EINTR if any oap in
 * the RPC was interrupted, or the osc_brw_prep_request() error. */
int osc_brw_redo_request(struct ptlrpc_request *request,
                         struct osc_brw_async_args *aa)
{
        struct ptlrpc_request *new_req;
        struct ptlrpc_request_set *set = request->rq_set;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
                CERROR("too many resend retries, returning error\n");
                RETURN(-EIO);
        }

        DEBUG_REQ(D_ERROR, request, "redo for recoverable error");

        /* re-derive the direction from the original request's opcode */
        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                        OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa,
                                  NULL /* lsm unused by osc currently */,
                                  aa->aa_page_count, aa->aa_ppga, &new_req,
                                  aa->aa_pshift);
        if (rc)
                RETURN(rc);

        client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);

        /* abort the redo (under the lock) if any member oap was interrupted;
         * only one oap holds a reference on the request */
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(request == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 request, oap->oap_request);
                        if (oap->oap_interrupted) {
                                client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
                                ptlrpc_req_finished(new_req);
                                RETURN(-EINTR);
                        }
                }
        }
        /* New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it... */
        aa->aa_resends++;
        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
        /* delay the resend by one second per retry already made */
        new_req->rq_sent = CURRENT_SECONDS + aa->aa_resends;

        new_aa = ptlrpc_req_async_args(new_req);

        CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
        list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);

        /* swap each oap's request reference from the old RPC to the new */
        list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);
                }
        }

        /* use ptlrpc_set_add_req is safe because interpret functions work
         * in check_set context. only one way exist with access to request
         * from different thread got -EINTR - this way protected with
         * cl_loi_list_lock */
        ptlrpc_set_add_req(set, new_req);

        client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);

        DEBUG_REQ(D_INFO, new_req, "new request");
        RETURN(0);
}
1639
/* Build an async bulk read/write RPC for 'page_count' pages and add it to
 * 'set'; completion is handled by brw_interpret().  For page-aligned writes
 * the pages consume write grant up front; on prep failure that grant is
 * released again and cache waiters are woken.  Returns 0 or the
 * osc_brw_prep_request() error. */
static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
                          struct lov_stripe_md *lsm, obd_count page_count,
                          struct brw_page **pga, struct ptlrpc_request_set *set,
                          int pshift)
{
        struct ptlrpc_request     *request;
        struct client_obd         *cli = &exp->exp_obd->u.cli;
        int                        rc, i;
        struct osc_brw_async_args *aa;
        ENTRY;

        /* Consume write credits even if doing a sync write -
         * otherwise we may run out of space on OST due to grant. */
        /* FIXME: unaligned writes must use write grants too */
        if (cmd == OBD_BRW_WRITE && pshift == 0) {
                client_obd_list_lock(&cli->cl_loi_list_lock);
                for (i = 0; i < page_count; i++) {
                        if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
                                osc_consume_write_grant(cli, pga[i]);
                }
                client_obd_list_unlock(&cli->cl_loi_list_lock);
        }

        rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
                                  page_count, pga, &request, pshift);

        /* aa is carved out of the request's embedded async-args space */
        CLASSERT(sizeof(*aa) <= sizeof(request->rq_async_args));

        if (rc == 0) {
                aa = ptlrpc_req_async_args(request);
                /* record per-direction page and rpcs-in-flight histograms */
                if (cmd == OBD_BRW_READ) {
                        lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
                        lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
                } else {
                        lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
                        lprocfs_oh_tally(&cli->cl_write_rpc_hist,
                                         cli->cl_w_in_flight);
                }
                ptlrpc_lprocfs_brw(request, aa->aa_requested_nob);

                LASSERT(list_empty(&aa->aa_oaps));

                request->rq_interpret_reply = brw_interpret;
                ptlrpc_set_add_req(set, request);
                /* bump in-flight count under the lock; brw_interpret()
                 * decrements it on completion */
                client_obd_list_lock(&cli->cl_loi_list_lock);
                if (cmd == OBD_BRW_READ)
                        cli->cl_r_in_flight++;
                else
                        cli->cl_w_in_flight++;
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DIO_PAUSE, 3);
        } else if (cmd == OBD_BRW_WRITE) {
                /* prep failed: give back the grant taken above */
                client_obd_list_lock(&cli->cl_loi_list_lock);
                for (i = 0; i < page_count; i++)
                        osc_release_write_grant(cli, pga[i], 0);
                osc_wake_cache_waiters(cli);
                client_obd_list_unlock(&cli->cl_loi_list_lock);
        }

        RETURN (rc);
}
1701
1702 /*
1703  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1704  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1705  * fine for our small page arrays and doesn't require allocation.  its an
1706  * insertion sort that swaps elements that are strides apart, shrinking the
1707  * stride down until its '1' and the array is sorted.
1708  */
1709 static void sort_brw_pages(struct brw_page **array, int num)
1710 {
1711         int stride, i, j;
1712         struct brw_page *tmp;
1713
1714         if (num == 1)
1715                 return;
1716         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1717                 ;
1718
1719         do {
1720                 stride /= 3;
1721                 for (i = stride ; i < num ; i++) {
1722                         tmp = array[i];
1723                         j = i;
1724                         while (j >= stride && array[j-stride]->off > tmp->off) {
1725                                 array[j] = array[j - stride];
1726                                 j -= stride;
1727                         }
1728                         array[j] = tmp;
1729                 }
1730         } while (stride > 1);
1731 }
1732
1733 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages,
1734                                         int pshift)
1735 {
1736         int count = 1;
1737         int offset;
1738         int i = 0;
1739
1740         LASSERT (pages > 0);
1741         offset = OSC_FILE2MEM_OFF(pg[i]->off, pshift) & ~CFS_PAGE_MASK;
1742
1743         for (;;) {
1744                 pages--;
1745                 if (pages == 0)         /* that's all */
1746                         return count;
1747
1748                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1749                         return count;   /* doesn't end on page boundary */
1750
1751                 i++;
1752                 offset = OSC_FILE2MEM_OFF(pg[i]->off, pshift) & ~CFS_PAGE_MASK;
1753                 if (offset != 0)        /* doesn't start on page boundary */
1754                         return count;
1755
1756                 count++;
1757         }
1758 }
1759
1760 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1761 {
1762         struct brw_page **ppga;
1763         int i;
1764
1765         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1766         if (ppga == NULL)
1767                 return NULL;
1768
1769         for (i = 0; i < count; i++)
1770                 ppga[i] = pga + i;
1771         return ppga;
1772 }
1773
1774 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1775 {
1776         LASSERT(ppga != NULL);
1777         OBD_FREE(ppga, sizeof(*ppga) * count);
1778 }
1779
/* Synchronous bulk entry point: split 'page_count' pages into one or more
 * unfragmented RPCs of at most cl_max_pages_per_rpc pages each, issuing
 * them in offset order via osc_brw_internal().  With OBD_BRW_CHECK in
 * 'cmd' this only probes whether the import can do I/O.  Because each RPC
 * clobbers the obdo, a saved copy is restored between RPCs.  Returns 0 or
 * a negative errno from the first failing RPC. */
static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
                   obd_count page_count, struct brw_page *pga,
                   struct obd_trans_info *oti)
{
        struct obdo *saved_oa = NULL;
        struct brw_page **ppga, **orig;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct client_obd *cli;
        int rc, page_count_orig;
        ENTRY;

        LASSERT((imp != NULL) && (imp->imp_obd != NULL));
        cli = &imp->imp_obd->u.cli;

        if (cmd & OBD_BRW_CHECK) {
                /* The caller just wants to know if there's a chance that this
                 * I/O can succeed */

                if (imp->imp_invalid)
                        RETURN(-EIO);
                RETURN(0);
        }

        /* test_brw with a failed create can trip this, maybe others. */
        LASSERT(cli->cl_max_pages_per_rpc);

        rc = 0;

        /* keep 'orig'/'page_count_orig' for the final release; 'ppga' and
         * 'page_count' advance through the array as RPCs are issued */
        orig = ppga = osc_build_ppga(pga, page_count);
        if (ppga == NULL)
                RETURN(-ENOMEM);
        page_count_orig = page_count;

        sort_brw_pages(ppga, page_count);
        while (page_count) {
                obd_count pages_per_brw;

                if (page_count > cli->cl_max_pages_per_rpc)
                        pages_per_brw = cli->cl_max_pages_per_rpc;
                else
                        pages_per_brw = page_count;

                /* shrink further so the RPC covers one contiguous run */
                pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw, 0);

                if (saved_oa != NULL) {
                        /* restore previously saved oa */
                        *oinfo->oi_oa = *saved_oa;
                } else if (page_count > pages_per_brw) {
                        /* save a copy of oa (brw will clobber it) */
                        OBDO_ALLOC(saved_oa);
                        if (saved_oa == NULL)
                                GOTO(out, rc = -ENOMEM);
                        *saved_oa = *oinfo->oi_oa;
                }

                rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
                                      pages_per_brw, ppga);

                if (rc != 0)
                        break;

                page_count -= pages_per_brw;
                ppga += pages_per_brw;
        }

out:
        osc_release_ppga(orig, page_count_orig);

        if (saved_oa != NULL)
                OBDO_FREE(saved_oa);

        RETURN(rc);
}
1853
/* Async bulk entry point: split the pages into unfragmented RPCs and queue
 * each through async_internal() on 'set'.  When more than one RPC is
 * needed, each gets its own copy of the page-pointer slice and of the obdo
 * (flagged OBD_FL_TEMPORARY so brw_interpret() frees it); a single RPC
 * reuses the caller's ppga and oa directly, with ownership of the ppga
 * array transferring to the RPC.  With OBD_BRW_CHECK this only probes the
 * import.  Returns 0 or a negative errno. */
static int osc_brw_async(int cmd, struct obd_export *exp,
                         struct obd_info *oinfo, obd_count page_count,
                         struct brw_page *pga, struct obd_trans_info *oti,
                         struct ptlrpc_request_set *set, int pshift)
{
        struct brw_page **ppga, **orig;
        int page_count_orig;
        int rc = 0;
        ENTRY;

        if (cmd & OBD_BRW_CHECK) {
                /* The caller just wants to know if there's a chance that this
                 * I/O can succeed */
                struct obd_import *imp = class_exp2cliimp(exp);

                if (imp == NULL || imp->imp_invalid)
                        RETURN(-EIO);
                RETURN(0);
        }

        orig = ppga = osc_build_ppga(pga, page_count);
        if (ppga == NULL)
                RETURN(-ENOMEM);
        page_count_orig = page_count;

        sort_brw_pages(ppga, page_count);
        while (page_count) {
                struct brw_page **copy;
                struct obdo *oa;
                obd_count pages_per_brw;

                /* one page less under unaligned direct i/o */
                pages_per_brw = min_t(obd_count, page_count,
                    class_exp2cliimp(exp)->imp_obd->u.cli.cl_max_pages_per_rpc -
                                      !!pshift);

                pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw,
                                                       pshift);

                /* use ppga only if single RPC is going to fly */
                if (pages_per_brw != page_count_orig || ppga != orig) {
                        /* multiple RPCs: each needs a private slice of the
                         * pointer array and a private obdo copy */
                        OBD_ALLOC(copy, pages_per_brw * sizeof(*copy));
                        if (copy == NULL)
                                GOTO(out, rc = -ENOMEM);
                        memcpy(copy, ppga, pages_per_brw * sizeof(*copy));

                        OBDO_ALLOC(oa);
                        if (oa == NULL) {
                                OBD_FREE(copy, pages_per_brw * sizeof(*copy));
                                GOTO(out, rc = -ENOMEM);
                        }
                        memcpy(oa, oinfo->oi_oa, sizeof(*oa));
                        /* mark it so brw_interpret() knows to free it */
                        oa->o_flags |= OBD_FL_TEMPORARY;
                } else {
                        copy = ppga;
                        oa = oinfo->oi_oa;
                        LASSERT(!(oa->o_flags & OBD_FL_TEMPORARY));
                }

                rc = async_internal(cmd, exp, oa, oinfo->oi_md, pages_per_brw,
                                    copy, set, pshift);

                if (rc != 0) {
                        /* queueing failed: release whatever we allocated */
                        if (copy != ppga)
                                OBD_FREE(copy, pages_per_brw * sizeof(*copy));

                        if (oa->o_flags & OBD_FL_TEMPORARY)
                                OBDO_FREE(oa);
                        break;
                }

                if (copy == orig) {
                        /* we passed it to async_internal() which is
                         * now responsible for releasing memory */
                        orig = NULL;
                }

                page_count -= pages_per_brw;
                ppga += pages_per_brw;
        }
out:
        if (orig)
                osc_release_ppga(orig, page_count_orig);
        RETURN(rc);
}
1939
1940 static void osc_check_rpcs(struct client_obd *cli);
1941
/* The companion to osc_enter_cache(), called when @oap is no longer part of
 * the dirty accounting.  Writeback completes or truncate happens before
 * writing starts.  Must be called with the loi lock held.
 * 'sent' is forwarded to osc_release_write_grant() to indicate whether the
 * page actually went to the wire. */
static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
                           int sent)
{
        osc_release_write_grant(cli, &oap->oap_brw_page, sent);
}
1950
1951 /* This maintains the lists of pending pages to read/write for a given object
1952  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1953  * to quickly find objects that are ready to send an RPC. */
1954 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1955                          int cmd)
1956 {
1957         int optimal;
1958         ENTRY;
1959
1960         if (lop->lop_num_pending == 0)
1961                 RETURN(0);
1962
1963         /* if we have an invalid import we want to drain the queued pages
1964          * by forcing them through rpcs that immediately fail and complete
1965          * the pages.  recovery relies on this to empty the queued pages
1966          * before canceling the locks and evicting down the llite pages */
1967         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1968                 RETURN(1);
1969
1970         /* stream rpcs in queue order as long as as there is an urgent page
1971          * queued.  this is our cheap solution for good batching in the case
1972          * where writepage marks some random page in the middle of the file
1973          * as urgent because of, say, memory pressure */
1974         if (!list_empty(&lop->lop_urgent)) {
1975                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1976                 RETURN(1);
1977         }
1978
1979         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1980         optimal = cli->cl_max_pages_per_rpc;
1981         if (cmd & OBD_BRW_WRITE) {
1982                 /* trigger a write rpc stream as long as there are dirtiers
1983                  * waiting for space.  as they're waiting, they're not going to
1984                  * create more pages to coallesce with what's waiting.. */
1985                 if (!list_empty(&cli->cl_cache_waiters)) {
1986                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1987                         RETURN(1);
1988                 }
1989
1990                 /* +16 to avoid triggering rpcs that would want to include pages
1991                  * that are being queued but which can't be made ready until
1992                  * the queuer finishes with the page. this is a wart for
1993                  * llite::commit_write() */
1994                 optimal += 16;
1995         }
1996         if (lop->lop_num_pending >= optimal)
1997                 RETURN(1);
1998
1999         RETURN(0);
2000 }
2001
2002 static int lop_makes_hprpc(struct loi_oap_pages *lop)
2003 {
2004         struct osc_async_page *oap;
2005         ENTRY;
2006
2007         if (list_empty(&lop->lop_urgent))
2008                 RETURN(0);
2009
2010         oap = list_entry(lop->lop_urgent.next,
2011                          struct osc_async_page, oap_urgent_item);
2012
2013         if (oap->oap_async_flags & ASYNC_HP) {
2014                 CDEBUG(D_CACHE, "hp request forcing RPC\n");
2015                 RETURN(1);
2016         }
2017
2018         RETURN(0);
2019 }
2020
/* Make 'item' a member of 'list' exactly when 'should_be_on' is true:
 * append it if it is missing, unlink it if it should not be there. */
static void on_list(struct list_head *item, struct list_head *list,
                    int should_be_on)
{
        int currently_on = !list_empty(item);

        if (should_be_on && !currently_on)
                list_add_tail(item, list);
        else if (currently_on && !should_be_on)
                list_del_init(item);
}
2029
2030 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
2031  * can find pages to build into rpcs quickly */
2032 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
2033 {
2034         if (lop_makes_hprpc(&loi->loi_write_lop) ||
2035             lop_makes_hprpc(&loi->loi_read_lop)) {
2036                 /* HP rpc */
2037                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
2038                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
2039         } else {
2040                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
2041                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
2042                         lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
2043                         lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
2044         }
2045
2046         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
2047                 loi->loi_write_lop.lop_num_pending);
2048
2049         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
2050                 loi->loi_read_lop.lop_num_pending);
2051 }
2052
2053 static void lop_update_pending(struct client_obd *cli,
2054                                struct loi_oap_pages *lop, int cmd, int delta)
2055 {
2056         lop->lop_num_pending += delta;
2057         if (cmd & OBD_BRW_WRITE)
2058                 cli->cl_pending_w_pages += delta;
2059         else
2060                 cli->cl_pending_r_pages += delta;
2061 }
2062
/* this is called when a sync waiter receives an interruption.  Its job is to
 * get the caller woken as soon as possible.  If its page hasn't been put in an
 * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
 * desiring interruption which will forcefully complete the rpc once the rpc
 * has timed out */
static void osc_occ_interrupted(struct oig_callback_context *occ)
{
        struct osc_async_page *oap;
        struct loi_oap_pages *lop;
        struct lov_oinfo *loi;
        ENTRY;

        /* XXX member_of() */
        oap = list_entry(occ, struct osc_async_page, oap_occ);

        client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);

        oap->oap_interrupted = 1;

        /* ok, it's been put in an rpc. only one oap gets a request reference */
        if (oap->oap_request != NULL) {
                ptlrpc_mark_interrupted(oap->oap_request);
                ptlrpcd_wake(oap->oap_request);
                GOTO(unlock, 0);
        }

        /* we don't get interruption callbacks until osc_trigger_group_io()
         * has been called and put the sync oaps in the pending/urgent lists.*/
        if (!list_empty(&oap->oap_pending_item)) {
                /* not in an RPC yet: dequeue now and complete with -EINTR */
                list_del_init(&oap->oap_pending_item);
                list_del_init(&oap->oap_urgent_item);

                loi = oap->oap_loi;
                lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
                        &loi->loi_write_lop : &loi->loi_read_lop;
                /* keep the pending-page accounting and list state in sync */
                lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
                loi_list_maint(oap->oap_cli, oap->oap_loi);

                oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
                oap->oap_oig = NULL;
        }

unlock:
        client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
}
2108
2109 /* this is trying to propogate async writeback errors back up to the
2110  * application.  As an async write fails we record the error code for later if
2111  * the app does an fsync.  As long as errors persist we force future rpcs to be
2112  * sync so that the app can get a sync error and break the cycle of queueing
2113  * pages for which writeback will fail. */
2114 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
2115                            int rc)
2116 {
2117         if (rc) {
2118                 if (!ar->ar_rc)
2119                         ar->ar_rc = rc;
2120
2121                 ar->ar_force_sync = 1;
2122                 ar->ar_min_xid = ptlrpc_sample_next_xid();
2123                 return;
2124
2125         }
2126
2127         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2128                 ar->ar_force_sync = 0;
2129 }
2130
2131 static void osc_oap_to_pending(struct osc_async_page *oap)
2132 {
2133         struct loi_oap_pages *lop;
2134
2135         if (oap->oap_cmd & OBD_BRW_WRITE)
2136                 lop = &oap->oap_loi->loi_write_lop;
2137         else
2138                 lop = &oap->oap_loi->loi_read_lop;
2139
2140         if (oap->oap_async_flags & ASYNC_HP)
2141                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2142         else if (oap->oap_async_flags & ASYNC_URGENT)
2143                 list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2144         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2145         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2146 }
2147
/* this must be called holding the loi list lock to give coverage to exit_cache,
 * async_flag maintenance, and oap_request.
 * Completes one async page: drops its request reference, feeds the RPC
 * result into the async-error records, copies returned attributes into the
 * loi's lvb, then completes via the group-io context (if any) or the
 * caller's ap_completion hook. */
static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
                              struct osc_async_page *oap, int sent, int rc)
{
        __u64 xid = 0;

        ENTRY;
        if (oap->oap_request != NULL) {
                /* take the xid before dropping our reference */
                xid = ptlrpc_req_xid(oap->oap_request);
                ptlrpc_req_finished(oap->oap_request);
                oap->oap_request = NULL;
        }

        spin_lock(&oap->oap_lock);
        oap->oap_async_flags = 0;
        spin_unlock(&oap->oap_lock);
        oap->oap_interrupted = 0;

        if (oap->oap_cmd & OBD_BRW_WRITE) {
                /* record write result for both client- and object-level
                 * async error tracking (affects future force-sync) */
                osc_process_ar(&cli->cl_ar, xid, rc);
                osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
        }

        if (rc == 0 && oa != NULL) {
                /* cache attributes the server returned into the loi's lvb */
                if (oa->o_valid & OBD_MD_FLBLOCKS)
                        oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
                if (oa->o_valid & OBD_MD_FLMTIME)
                        oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
                if (oa->o_valid & OBD_MD_FLATIME)
                        oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
                if (oa->o_valid & OBD_MD_FLCTIME)
                        oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
        }

        if (oap->oap_oig) {
                /* group I/O path: leave the cache and notify the group */
                osc_exit_cache(cli, oap, sent);
                oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
                oap->oap_oig = NULL;
                EXIT;
                return;
        }

        rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
                                                oap->oap_cmd, oa, rc);

        /* ll_ap_completion (from llite) drops PG_locked. so, a new
         * I/O on the page could start, but OSC calls it under lock
         * and thus we can add oap back to pending safely */
        if (rc)
                /* upper layer wants to leave the page on pending queue */
                osc_oap_to_pending(oap);
        else
                osc_exit_cache(cli, oap, sent);
        EXIT;
}
2204
/* Reply-interpret callback for async bulk RPCs queued by async_internal()
 * or osc_send_oap_rpc.  Retries recoverable errors via
 * osc_brw_redo_request(); otherwise it decrements the in-flight count,
 * completes every member oap (or releases write grants for the oap-less
 * path), frees the temporary obdo when one was allocated, wakes cache
 * waiters, kicks osc_check_rpcs() and releases the page-pointer array. */
static int brw_interpret(struct ptlrpc_request *request, void *data, int rc)
{
        struct osc_brw_async_args *aa = data;
        struct client_obd *cli;
        ENTRY;

        rc = osc_brw_fini_request(request, rc);
        CDEBUG(D_INODE, "request %p aa %p rc %d\n", request, aa, rc);

        if (osc_recoverable_error(rc)) {
                /* if the redo is queued successfully, completion will run
                 * again for the new request -- nothing more to do here */
                rc = osc_brw_redo_request(request, aa);
                if (rc == 0)
                        RETURN(0);
        }

        cli = aa->aa_cli;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (lustre_msg_get_opc(request->rq_reqmsg) == OST_WRITE)
                cli->cl_w_in_flight--;
        else
                cli->cl_r_in_flight--;

        if (!list_empty(&aa->aa_oaps)) { /* from osc_send_oap_rpc() */
                struct osc_async_page *oap, *tmp;
                /* the caller may re-use the oap after the completion call so
                 * we need to clean it up a little */
                list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
                        list_del_init(&oap->oap_rpc_item);
                        osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
                }
                OBDO_FREE(aa->aa_oa);
        } else { /* from async_internal() */
                obd_count i;
                /* no oaps: just give back the write grant taken up front */
                for (i = 0; i < aa->aa_page_count; i++)
                        osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);

                /* obdo copies made by osc_brw_async() carry this flag */
                if (aa->aa_oa->o_flags & OBD_FL_TEMPORARY)
                        OBDO_FREE(aa->aa_oa);
        }
        osc_wake_cache_waiters(cli);
        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);

        RETURN(rc);
}
2255
/* Build a single BRW read or write RPC covering the osc_async_pages queued
 * on \a rpc_list.
 *
 * On success the oaps are spliced from \a rpc_list onto the request's async
 * args (aa->aa_oaps) and \a rpc_list is reinitialized empty; the pga array
 * and obdo are then referenced from the request (presumably released by the
 * rpc interpret callback -- TODO confirm against brw_interpret).  On failure
 * an ERR_PTR() is returned and the locally allocated pga/oa are freed here.
 *
 * \param cli        client obd the RPC will be sent on
 * \param rpc_list   osc_async_pages (linked by oap_rpc_item) to put in the RPC
 * \param page_count number of pages on \a rpc_list
 * \param cmd        OBD_BRW_READ or OBD_BRW_WRITE
 *
 * \return the prepared request, or an ERR_PTR() on failure.
 */
static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
                                            struct list_head *rpc_list,
                                            int page_count, int cmd)
{
        struct ptlrpc_request *req;
        struct brw_page **pga = NULL;
        struct osc_brw_async_args *aa;
        struct obdo *oa = NULL;
        struct obd_async_page_ops *ops = NULL;
        void *caller_data = NULL;
        struct osc_async_page *oap;
        struct ldlm_lock *lock = NULL;
        obd_valid valid;
        int i, rc;

        ENTRY;
        LASSERT(!list_empty(rpc_list));

        OBD_ALLOC(pga, sizeof(*pga) * page_count);
        if (pga == NULL)
                RETURN(ERR_PTR(-ENOMEM));

        OBDO_ALLOC(oa);
        if (oa == NULL)
                GOTO(out, req = ERR_PTR(-ENOMEM));

        /* Collect the brw_pages.  The caller ops/data and the ldlm lock are
         * taken from the first oap; all pages in one RPC share them. */
        i = 0;
        list_for_each_entry(oap, rpc_list, oap_rpc_item) {
                if (ops == NULL) {
                        ops = oap->oap_caller_ops;
                        caller_data = oap->oap_caller_data;
                        lock = oap->oap_ldlm_lock;
                }
                pga[i] = &oap->oap_brw_page;
                pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
                CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
                       pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
                i++;
        }

        /* always get the data for the obdo for the rpc */
        LASSERT(ops != NULL);
        ops->ap_fill_obdo(caller_data, cmd, oa);
        if (lock) {
                /* send the server the lock handle so it can match the IO
                 * against the client's lock */
                oa->o_handle = lock->l_remote_handle;
                oa->o_valid |= OBD_MD_FLHANDLE;
        }

        sort_brw_pages(pga, page_count);
        rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count, pga, &req, 0);
        if (rc != 0) {
                CERROR("prep_req failed: %d\n", rc);
                GOTO(out, req = ERR_PTR(rc));
        }
        /* From here on, work on the obdo embedded in the request message so
         * the timestamp updates below end up in what is actually sent. */
        oa = &((struct ost_body *)lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF,
                                                 sizeof(struct ost_body)))->oa;

        /* Need to update the timestamps after the request is built in case
         * we race with setattr (locally or in queue at OST).  If OST gets
         * later setattr before earlier BRW (as determined by the request xid),
         * the OST will not use BRW timestamps.  Sadly, there is no obvious
         * way to do this in a single call.  bug 10150 */
        if (pga[0]->flag & OBD_BRW_SRVLOCK) {
                /* in case of lockless read/write do not use inode's
                 * timestamps because concurrent stat might fill the
                 * inode with out-of-date times, send current
                 * instead */
                if (cmd & OBD_BRW_WRITE) {
                        oa->o_mtime = oa->o_ctime = LTIME_S(CURRENT_TIME);
                        oa->o_valid |= OBD_MD_FLMTIME | OBD_MD_FLCTIME;
                        valid = OBD_MD_FLATIME;
                } else {
                        oa->o_atime = LTIME_S(CURRENT_TIME);
                        oa->o_valid |= OBD_MD_FLATIME;
                        valid = OBD_MD_FLMTIME | OBD_MD_FLCTIME;
                }
        } else {
                valid = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
        }
        ops->ap_update_obdo(caller_data, cmd, oa, valid);

        /* Hand the oap list over to the request's async args; rpc_list is
         * left empty for the caller. */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);
        list_splice(rpc_list, &aa->aa_oaps);
        CFS_INIT_LIST_HEAD(rpc_list);

out:
        /* on the error paths oa/pga are still owned locally and must be
         * freed here; on success they belong to the request */
        if (IS_ERR(req)) {
                if (oa)
                        OBDO_FREE(oa);
                if (pga)
                        OBD_FREE(pga, sizeof(*pga) * page_count);
        }
        RETURN(req);
}
2352
/* the loi lock is held across this function but it's allowed to release
 * and reacquire it during its work */
/**
 * Prepare pages for ASYNC io and put them in the send queue.
 *
 * \param cli - client obd the RPC is sent on
 * \param loi - object whose pages are being sent
 * \param cmd - OBD_BRW_* macros (OBD_BRW_READ / OBD_BRW_WRITE)
 * \param lop - pending pages for that object/direction
 *
 * \return 1 if an RPC was built and queued, 0 if there was nothing to send
 * (or make_ready asked us to back off), negative errno on error.
 */
static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
                            int cmd, struct loi_oap_pages *lop)
{
        struct ptlrpc_request *req;
        obd_count page_count = 0;
        struct osc_async_page *oap = NULL, *tmp;
        struct osc_brw_async_args *aa;
        struct obd_async_page_ops *ops;
        CFS_LIST_HEAD(rpc_list);
        unsigned int ending_offset;
        unsigned  starting_offset = 0;
        int srvlock = 0;
        ENTRY;

        /* If there are HP OAPs we need to handle at least 1 of them,
         * move it to the beginning of the pending list for that. */
        if (!list_empty(&lop->lop_urgent)) {
                oap = list_entry(lop->lop_urgent.next,
                                 struct osc_async_page, oap_urgent_item);
                if (oap->oap_async_flags & ASYNC_HP)
                        list_move(&oap->oap_pending_item, &lop->lop_pending);
        }

        /* first we find the pages we're allowed to work with */
        list_for_each_entry_safe(oap, tmp, &lop->lop_pending, oap_pending_item){
                ops = oap->oap_caller_ops;

                LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
                         "magic 0x%x\n", oap, oap->oap_magic);

                /* all pages in one RPC must agree on SRVLOCK (lockless)
                 * vs. locked IO; stop at the first mismatch */
                if (page_count != 0 &&
                    srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
                        CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
                               " oap %p, page %p, srvlock %u\n",
                               oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
                        break;
                }
                /* in llite being 'ready' equates to the page being locked
                 * until completion unlocks it.  commit_write submits a page
                 * as not ready because its unlock will happen unconditionally
                 * as the call returns.  if we race with commit_write giving
                 * us that page we dont' want to create a hole in the page
                 * stream, so we stop and leave the rpc to be fired by
                 * another dirtier or kupdated interval (the not ready page
                 * will still be on the dirty list).  we could call in
                 * at the end of ll_file_write to process the queue again. */
                if (!(oap->oap_async_flags & ASYNC_READY)) {
                        int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
                        if (rc < 0)
                                CDEBUG(D_INODE, "oap %p page %p returned %d "
                                                "instead of ready\n", oap,
                                                oap->oap_page, rc);
                        switch (rc) {
                        case -EAGAIN:
                                /* llite is telling us that the page is still
                                 * in commit_write and that we should try
                                 * and put it in an rpc again later.  we
                                 * break out of the loop so we don't create
                                 * a hole in the sequence of pages in the rpc
                                 * stream.*/
                                oap = NULL;
                                break;
                        case -EINTR:
                                /* the io isn't needed.. tell the checks
                                 * below to complete the rpc with EINTR */
                                spin_lock(&oap->oap_lock);
                                oap->oap_async_flags |= ASYNC_COUNT_STABLE;
                                spin_unlock(&oap->oap_lock);
                                oap->oap_count = -EINTR;
                                break;
                        case 0:
                                spin_lock(&oap->oap_lock);
                                oap->oap_async_flags |= ASYNC_READY;
                                spin_unlock(&oap->oap_lock);
                                break;
                        default:
                                LASSERTF(0, "oap %p page %p returned %d "
                                            "from make_ready\n", oap,
                                            oap->oap_page, rc);
                                break;
                        }
                }
                if (oap == NULL)
                        break;
                /*
                 * Page submitted for IO has to be locked. Either by
                 * ->ap_make_ready() or by higher layers.
                 */
#if defined(__KERNEL__) && defined(__linux__)
                 if(!(PageLocked(oap->oap_page) &&
                     (CheckWriteback(oap->oap_page, cmd) || oap->oap_oig !=NULL))) {
                        CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
                               oap->oap_page, (long)oap->oap_page->flags, oap->oap_async_flags);
                        LBUG();
                }
#endif
                /* If there is a gap at the start of this page, it can't merge
                 * with any previous page, so we'll hand the network a
                 * "fragmented" page array that it can't transfer in 1 RDMA */
                if (page_count != 0 && oap->oap_page_off != 0)
                        break;

                /* take the page out of our book-keeping */
                list_del_init(&oap->oap_pending_item);
                lop_update_pending(cli, lop, cmd, -1);
                list_del_init(&oap->oap_urgent_item);

                if (page_count == 0)
                        starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
                                          (PTLRPC_MAX_BRW_SIZE - 1);

                /* ask the caller for the size of the io as the rpc leaves. */
                if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
                        oap->oap_count =
                                ops->ap_refresh_count(oap->oap_caller_data,cmd);
                if (oap->oap_count <= 0) {
                        /* nothing to transfer (or -EINTR from above):
                         * complete the page immediately instead of sending */
                        CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
                               oap->oap_count);
                        osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
                        continue;
                }

                /* now put the page back in our accounting */
                list_add_tail(&oap->oap_rpc_item, &rpc_list);
                if (page_count == 0)
                        srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
                if (++page_count >= cli->cl_max_pages_per_rpc)
                        break;

                /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
                 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
                 * have the same alignment as the initial writes that allocated
                 * extents on the server. */
                ending_offset = (oap->oap_obj_off + oap->oap_page_off +
                                 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
                if (ending_offset == 0)
                        break;

                /* If there is a gap at the end of this page, it can't merge
                 * with any subsequent pages, so we'll hand the network a
                 * "fragmented" page array that it can't transfer in 1 RDMA */
                if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
                        break;
        }

        /* pages were removed from pending above, so some cache waiters may
         * now fit under cl_dirty_max */
        osc_wake_cache_waiters(cli);

        if (page_count == 0)
                RETURN(0);

        loi_list_maint(cli, loi);

        /* drop the list lock while building the request; the selected pages
         * are safely parked on our private rpc_list meanwhile */
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        req = osc_build_req(cli, &rpc_list, page_count, cmd);
        if (IS_ERR(req)) {
                /* this should happen rarely and is pretty bad, it makes the
                 * pending list not follow the dirty order */
                client_obd_list_lock(&cli->cl_loi_list_lock);
                list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
                        list_del_init(&oap->oap_rpc_item);

                        /* queued sync pages can be torn down while the pages
                         * were between the pending list and the rpc */
                        if (oap->oap_interrupted) {
                                CDEBUG(D_INODE, "oap %p interrupted\n", oap);
                                osc_ap_completion(cli, NULL, oap, 0,
                                                  oap->oap_count);
                                continue;
                        }
                        osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
                }
                loi_list_maint(cli, loi);
                RETURN(PTR_ERR(req));
        }

        /* update the read/write page and rpc-size histograms for procfs;
         * safe without the list lock -- TODO confirm lprocfs locking rules */
        aa = ptlrpc_req_async_args(req);
        if (cmd == OBD_BRW_READ) {
                lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
                                      (starting_offset >> CFS_PAGE_SHIFT) + 1);
        } else {
                lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_write_rpc_hist,
                                 cli->cl_w_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
                                      (starting_offset >> CFS_PAGE_SHIFT) + 1);
        }
        ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);

        client_obd_list_lock(&cli->cl_loi_list_lock);

        if (cmd == OBD_BRW_READ)
                cli->cl_r_in_flight++;
        else
                cli->cl_w_in_flight++;

        /* queued sync pages can be torn down while the pages
         * were between the pending list and the rpc */
        tmp = NULL;
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                /* only one oap gets a request reference */
                if (tmp == NULL)
                        tmp = oap;
                if (oap->oap_interrupted && !req->rq_intr) {
                        CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
                               oap, req);
                        ptlrpc_mark_interrupted(req);
                }
        }
        if (tmp != NULL)
                tmp->oap_request = ptlrpc_request_addref(req);

        DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
                  page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);

        /* hand the request to ptlrpcd for asynchronous sending; completion
         * is handled by brw_interpret */
        req->rq_interpret_reply = brw_interpret;
        ptlrpcd_add_req(req);
        RETURN(1);
}
2587
/* Dump the queueing state of a lov_oinfo: whether it is on a ready list,
 * and the pending/urgent state of its write and read queues.  Note: the
 * stray trailing line-continuation after "args)" was removed; it made the
 * following blank line (and anything later placed there) part of the
 * macro body. */
#define LOI_DEBUG(LOI, STR, args...)                                     \
        CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
               !list_empty(&(LOI)->loi_ready_item) ||                    \
               !list_empty(&(LOI)->loi_hp_ready_item),                   \
               (LOI)->loi_write_lop.lop_num_pending,                     \
               !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
               (LOI)->loi_read_lop.lop_num_pending,                      \
               !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
               args)
2598 /* This is called by osc_check_rpcs() to find which objects have pages that
2599  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2600 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2601 {
2602         ENTRY;
2603         /* First return objects that have blocked locks so that they
2604          * will be flushed quickly and other clients can get the lock,
2605          * then objects which have pages ready to be stuffed into RPCs */
2606         if (!list_empty(&cli->cl_loi_hp_ready_list))
2607                 RETURN(list_entry(cli->cl_loi_hp_ready_list.next,
2608                                   struct lov_oinfo, loi_hp_ready_item));
2609         if (!list_empty(&cli->cl_loi_ready_list))
2610                 RETURN(list_entry(cli->cl_loi_ready_list.next,
2611                                   struct lov_oinfo, loi_ready_item));
2612
2613         /* then if we have cache waiters, return all objects with queued
2614          * writes.  This is especially important when many small files
2615          * have filled up the cache and not been fired into rpcs because
2616          * they don't pass the nr_pending/object threshhold */
2617         if (!list_empty(&cli->cl_cache_waiters) &&
2618             !list_empty(&cli->cl_loi_write_list))
2619                 RETURN(list_entry(cli->cl_loi_write_list.next,
2620                                   struct lov_oinfo, loi_write_item));
2621
2622         /* then return all queued objects when we have an invalid import
2623          * so that they get flushed */
2624         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2625                 if (!list_empty(&cli->cl_loi_write_list))
2626                         RETURN(list_entry(cli->cl_loi_write_list.next,
2627                                           struct lov_oinfo, loi_write_item));
2628                 if (!list_empty(&cli->cl_loi_read_list))
2629                         RETURN(list_entry(cli->cl_loi_read_list.next,
2630                                           struct lov_oinfo, loi_read_item));
2631         }
2632         RETURN(NULL);
2633 }
2634
2635 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2636 {
2637         struct osc_async_page *oap;
2638         int hprpc = 0;
2639
2640         if (!list_empty(&loi->loi_write_lop.lop_urgent)) {
2641                 oap = list_entry(loi->loi_write_lop.lop_urgent.next,
2642                                  struct osc_async_page, oap_urgent_item);
2643                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2644         }
2645
2646         if (!hprpc && !list_empty(&loi->loi_read_lop.lop_urgent)) {
2647                 oap = list_entry(loi->loi_read_lop.lop_urgent.next,
2648                                  struct osc_async_page, oap_urgent_item);
2649                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2650         }
2651
2652         return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
2653 }
2654
/* called with the loi list lock held */
/* Walk the objects returned by osc_next_loi() and fire read/write RPCs for
 * each until either no object is ready, the rpcs-in-flight limit is hit,
 * or make_ready keeps backing off (race_counter). */
static void osc_check_rpcs(struct client_obd *cli)
{
        struct lov_oinfo *loi;
        int rc = 0, race_counter = 0;
        ENTRY;

        while ((loi = osc_next_loi(cli)) != NULL) {
                LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));

                if (osc_max_rpc_in_flight(cli, loi))
                        break;

                /* attempt some read/write balancing by alternating between
                 * reads and writes in an object.  The makes_rpc checks here
                 * would be redundant if we were getting read/write work items
                 * instead of objects.  we don't want send_oap_rpc to drain a
                 * partial read pending queue when we're given this object to
                 * do io on writes while there are cache waiters */
                if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
                        rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
                                              &loi->loi_write_lop);
                        if (rc < 0)
                                break;
                        if (rc > 0)
                                race_counter = 0;
                        else
                                race_counter++;
                }
                if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
                        rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
                                              &loi->loi_read_lop);
                        if (rc < 0)
                                break;
                        if (rc > 0)
                                race_counter = 0;
                        else
                                race_counter++;
                }

                /* attempt some inter-object balancing by issuing rpcs
                 * for each object in turn */
                if (!list_empty(&loi->loi_hp_ready_item))
                        list_del_init(&loi->loi_hp_ready_item);
                if (!list_empty(&loi->loi_ready_item))
                        list_del_init(&loi->loi_ready_item);
                if (!list_empty(&loi->loi_write_item))
                        list_del_init(&loi->loi_write_item);
                if (!list_empty(&loi->loi_read_item))
                        list_del_init(&loi->loi_read_item);

                loi_list_maint(cli, loi);

                /* send_oap_rpc fails with 0 when make_ready tells it to
                 * back off.  llite's make_ready does this when it tries
                 * to lock a page queued for write that is already locked.
                 * we want to try sending rpcs from many objects, but we
                 * don't want to spin failing with 0.  */
                if (race_counter == 10)
                        break;
        }
        EXIT;
}
2718
2719 /* we're trying to queue a page in the osc so we're subject to the
2720  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2721  * If the osc's queued pages are already at that limit, then we want to sleep
2722  * until there is space in the osc's queue for us.  We also may be waiting for
2723  * write credits from the OST if there are RPCs in flight that may return some
2724  * before we fall back to sync writes.
2725  *
2726  * We need this know our allocation was granted in the presence of signals */
2727 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2728 {
2729         int rc;
2730         ENTRY;
2731         client_obd_list_lock(&cli->cl_loi_list_lock);
2732         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2733         client_obd_list_unlock(&cli->cl_loi_list_lock);
2734         RETURN(rc);
2735 };
2736
/* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
 * grant or cache space. */
/* Account one page of \a oap against the client's dirty limit and grant.
 * Returns 0 when the page may be cached, -EDQUOT to force the caller to do
 * sync IO, or -EINTR/waiter rc if we slept for space and were interrupted. */
static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
                           struct osc_async_page *oap)
{
        struct osc_cache_waiter ocw;
        struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
        ENTRY;

        CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
               "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
               cli->cl_dirty_max, obd_max_dirty_pages,
               cli->cl_lost_grant, cli->cl_avail_grant);

        /* force the caller to try sync io.  this can jump the list
         * of queued writes and create a discontiguous rpc stream */
        if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
            loi->loi_ar.ar_force_sync)
                RETURN(-EDQUOT);

        /* Hopefully normal case - cache space and write credits available */
        if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
            (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
            (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
                /* account for ourselves */
                osc_consume_write_grant(cli, &oap->oap_brw_page);
                RETURN(0);
        }

        /* It is safe to block as a cache waiter as long as there is grant
         * space available or the hope of additional grant being returned
         * when an in flight write completes.  Using the write back cache
         * if possible is preferable to sending the data synchronously
         * because write pages can then be merged in to large requests.
         * The addition of this cache waiter will causing pending write
         * pages to be sent immediately. */
        if (cli->cl_w_in_flight || cli->cl_avail_grant >= CFS_PAGE_SIZE) {
                list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
                cfs_waitq_init(&ocw.ocw_waitq);
                ocw.ocw_oap = oap;
                ocw.ocw_rc = 0;

                loi_list_maint(cli, loi);
                osc_check_rpcs(cli);
                /* drop the list lock while sleeping; ocw_granted() retakes
                 * it to evaluate the wake-up condition */
                client_obd_list_unlock(&cli->cl_loi_list_lock);

                CDEBUG(D_CACHE, "sleeping for cache space\n");
                l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);

                client_obd_list_lock(&cli->cl_loi_list_lock);
                if (!list_empty(&ocw.ocw_entry)) {
                        /* still queued: we were woken by a signal (or no
                         * RPCs in flight), not by a grant */
                        list_del(&ocw.ocw_entry);
                        RETURN(-EINTR);
                }
                RETURN(ocw.ocw_rc);
        }

        RETURN(-EDQUOT);
}
2796
/* Check whether a DLM lock covers the extent [start, end] for the given IO
 * mode.  The lock comes either from \a lockh (if the caller passed a used
 * handle) or from the async page in \a *res.  Returns the result of
 * ldlm_lock_fast_match() (1 on match), or 0 when no lock is found. */
static int osc_get_lock(struct obd_export *exp, struct lov_stripe_md *lsm,
                        void **res, int rw, obd_off start, obd_off end,
                        struct lustre_handle *lockh, int flags)
{
        struct ldlm_lock *lock = NULL;
        int rc, release = 0;

        ENTRY;

        if (lockh && lustre_handle_is_used(lockh)) {
                /* if a valid lockh is passed, just check that the corresponding
                 * lock covers the extent */
                lock = ldlm_handle2lock(lockh);
                release = 1;
        } else {
                /* otherwise take the lock attached to the async page, under
                 * oap_lock so it can't be cancelled from under us */
                struct osc_async_page *oap = *res;
                spin_lock(&oap->oap_lock);
                lock = oap->oap_ldlm_lock;
                if (likely(lock))
                        LDLM_LOCK_GET(lock);
                spin_unlock(&oap->oap_lock);
        }
        /* lock can be NULL in case of a race between obd_get_lock and lock
         * cancel, so don't try to match it */
        if (unlikely(!lock))
                return 0;

        rc = ldlm_lock_fast_match(lock, rw, start, end, lockh);
        if (release == 1 && rc == 1)
                /* if a valid lockh was passed, we just need to check
                 * that the lock covers the page, no reference should be
                 * taken*/
                ldlm_lock_decref(lockh,
                                 rw == OBD_BRW_WRITE ? LCK_PW : LCK_PR);
        LDLM_LOCK_PUT(lock);
        RETURN(rc);
}
2834
/* Initialize the osc_async_page in \a *res for \a page at \a offset in the
 * object, and (unless OBD_PAGE_NO_CACHE) attach it to the extent cache.
 * When called with page == NULL, only reports the size the caller must
 * allocate for the oap.  Returns 0 on success or a cache_add_extent()
 * error. */
int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
                        struct lov_oinfo *loi, cfs_page_t *page,
                        obd_off offset, struct obd_async_page_ops *ops,
                        void *data, void **res, int flags,
                        struct lustre_handle *lockh)
{
        struct osc_async_page *oap;
        struct ldlm_res_id oid = {{0}};
        int rc = 0;

        ENTRY;

        /* sizing probe: tell the caller how much space an oap needs */
        if (!page)
                return size_round(sizeof(*oap));

        oap = *res;
        oap->oap_magic = OAP_MAGIC;
        oap->oap_cli = &exp->exp_obd->u.cli;
        oap->oap_loi = loi;

        oap->oap_caller_ops = ops;
        oap->oap_caller_data = data;

        oap->oap_page = page;
        oap->oap_obj_off = offset;

        CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
        CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
        CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
        CFS_INIT_LIST_HEAD(&oap->oap_page_list);

        oap->oap_occ.occ_interrupted = osc_occ_interrupted;

        spin_lock_init(&oap->oap_lock);

        /* If the page was marked as notcacheable - don't add to any locks */
        if (!(flags & OBD_PAGE_NO_CACHE)) {
                osc_build_res_name(loi->loi_id, loi->loi_gr, &oid);
                /* This is the only place where we can call cache_add_extent
                   without oap_lock, because this page is locked now, and
                   the lock we are adding it to is referenced, so cannot lose
                   any pages either. */
                rc = cache_add_extent(oap->oap_cli->cl_cache, &oid, oap, lockh);
                if (rc)
                        RETURN(rc);
        }

        CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
        RETURN(0);
}
2885
2886 struct osc_async_page *oap_from_cookie(void *cookie)
2887 {
2888         struct osc_async_page *oap = cookie;
2889         if (oap->oap_magic != OAP_MAGIC)
2890                 return ERR_PTR(-EINVAL);
2891         return oap;
2892 };
2893
/* Queue an async page for IO: validate the cookie, check quota for writes,
 * fill in the IO parameters, enter the write-back cache (for writes) and
 * move the oap onto the pending lists, kicking osc_check_rpcs().
 * Returns 0 on success, -EBUSY if the page is already queued, -EIO on an
 * invalid import, -EDQUOT/-ENOMEM on quota or allocation failure. */
static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
                              struct lov_oinfo *loi, void *cookie,
                              int cmd, obd_off off, int count,
                              obd_flag brw_flags, enum async_flags async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        /* already on a pending/urgent/rpc list: refuse to double-queue */
        if (!list_empty(&oap->oap_pending_item) ||
            !list_empty(&oap->oap_urgent_item) ||
            !list_empty(&oap->oap_rpc_item))
                RETURN(-EBUSY);

        /* check if the file's owner/group is over quota */
        if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
                struct obd_async_page_ops *ops;
                struct obdo *oa;

                /* temporary obdo just to learn the uid/gid for the check */
                OBDO_ALLOC(oa);
                if (oa == NULL)
                        RETURN(-ENOMEM);

                ops = oap->oap_caller_ops;
                ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
                if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
                    NO_QUOTA)
                        rc = -EDQUOT;

                OBDO_FREE(oa);
                if (rc)
                        RETURN(rc);
        }

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        client_obd_list_lock(&cli->cl_loi_list_lock);

        oap->oap_cmd = cmd;
        oap->oap_page_off = off;
        oap->oap_count = count;
        oap->oap_brw_flags = brw_flags;
        /* Give a hint to OST that requests are coming from kswapd - bug19529 */
        if (libcfs_memory_pressure_get())
                oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
        spin_lock(&oap->oap_lock);
        oap->oap_async_flags = async_flags;
        spin_unlock(&oap->oap_lock);

        if (cmd & OBD_BRW_WRITE) {
                /* may drop and retake the list lock while waiting for
                 * cache space / grant */
                rc = osc_enter_cache(cli, loi, oap);
                if (rc) {
                        client_obd_list_unlock(&cli->cl_loi_list_lock);
                        RETURN(rc);
                }
        }

        osc_oap_to_pending(oap);
        loi_list_maint(cli, loi);

        LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
                  cmd);

        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        RETURN(0);
}
2971
/* True iff @flag is being newly set: clear in @was, set in @now.
 * aka (~was & now & flag), but this is more clear :)
 * All arguments are parenthesized so callers may pass compound
 * expressions (e.g. `a | b`) without precedence surprises. */
#define SETTING(was, now, flag) (!((was) & (flag)) && ((now) & (flag)))
2974
/* Update the async flags of a queued page (identified by @cookie) and,
 * when the page newly becomes ASYNC_URGENT, move it onto the urgent list
 * of its stripe's read or write lop so RPC generation picks it up sooner.
 *
 * Returns 0 on success, -EIO if the import is gone or invalid, -EINVAL
 * if the page is no longer on a pending list, or the PTR_ERR from a bad
 * cookie. */
static int osc_set_async_flags(struct obd_export *exp,
                               struct lov_stripe_md *lsm,
                               struct lov_oinfo *loi, void *cookie,
                               obd_flag async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct loi_oap_pages *lop;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        /*
         * bug 7311: OST-side locking is only supported for liblustre for now
         * (and liblustre never calls obd_set_async_flags(). I hope.), generic
         * implementation has to handle case where OST-locked page was picked
         * up by, e.g., ->writepage().
         */
        LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
        LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
                                     * tread here. */

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        /* default to the first stripe's oinfo when none was supplied */
        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        /* pick the lop matching the command the page was queued with */
        if (oap->oap_cmd & OBD_BRW_WRITE) {
                lop = &loi->loi_write_lop;
        } else {
                lop = &loi->loi_read_lop;
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* oap_lock provides atomic semantics of oap_async_flags access */
        spin_lock(&oap->oap_lock);
        if (list_empty(&oap->oap_pending_item))
                GOTO(out, rc = -EINVAL);

        /* nothing to do if every requested flag is already set */
        if ((oap->oap_async_flags & async_flags) == async_flags)
                GOTO(out, rc = 0);

        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
                oap->oap_async_flags |= ASYNC_READY;

        /* Pages already attached to an RPC keep their list placement;
         * high-priority pages jump the urgent queue, others go to its
         * tail. */
        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
            list_empty(&oap->oap_rpc_item)) {
                if (oap->oap_async_flags & ASYNC_HP)
                        list_add(&oap->oap_urgent_item, &lop->lop_urgent);
                else
                        list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
                oap->oap_async_flags |= ASYNC_URGENT;
                loi_list_maint(cli, loi);
        }

        LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
                        oap->oap_async_flags);
out:
        /* osc_check_rpcs() runs on all exits, including error paths; it
         * is a no-op when there is nothing ready to send */
        spin_unlock(&oap->oap_lock);
        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        RETURN(rc);
}
3042
/* Queue a page for group (oig-tracked) I/O.  Unlike the regular async
 * path this adds the page to lop_pending_group and does NOT trigger any
 * RPCs; osc_trigger_group_io() later moves the whole group onto the
 * normal pending lists.
 *
 * Returns 0 (or the rc of oig_add_one() for ASYNC_GROUP_SYNC pages),
 * -EIO on a dead/invalid import, -EBUSY if the page is already queued
 * somewhere, or the PTR_ERR from a bad cookie. */
static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
                             struct lov_oinfo *loi,
                             struct obd_io_group *oig, void *cookie,
                             int cmd, obd_off off, int count,
                             obd_flag brw_flags,
                             obd_flag async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct osc_async_page *oap;
        struct loi_oap_pages *lop;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        /* a page may only be on one queue at a time */
        if (!list_empty(&oap->oap_pending_item) ||
            !list_empty(&oap->oap_urgent_item) ||
            !list_empty(&oap->oap_rpc_item))
                RETURN(-EBUSY);

        /* default to the first stripe's oinfo when none was supplied */
        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        client_obd_list_lock(&cli->cl_loi_list_lock);

        oap->oap_cmd = cmd;
        oap->oap_page_off = off;
        oap->oap_count = count;
        oap->oap_brw_flags = brw_flags;
        /* Give a hint to OST that requests are coming from kswapd - bug19529 */
        if (libcfs_memory_pressure_get())
                oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
        /* oap_lock provides atomic semantics of oap_async_flags access */
        spin_lock(&oap->oap_lock);
        oap->oap_async_flags = async_flags;
        spin_unlock(&oap->oap_lock);

        if (cmd & OBD_BRW_WRITE)
                lop = &loi->loi_write_lop;
        else
                lop = &loi->loi_read_lop;

        list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
        /* synchronous group pages register with the io group so its
         * completion can be waited on */
        if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
                oap->oap_oig = oig;
                rc = oig_add_one(oig, &oap->oap_occ);
        }

        LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
                  oap, oap->oap_page, rc);

        client_obd_list_unlock(&cli->cl_loi_list_lock);

        RETURN(rc);
}
3102
/* Move every page on @lop's pending-group list onto the regular pending
 * list and refresh the loi's position on the client's lists.  Callers
 * (osc_trigger_group_io) hold cli->cl_loi_list_lock across this call.
 * NOTE: @cmd is currently unused by the body. */
static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
                                 struct loi_oap_pages *lop, int cmd)
{
        struct list_head *pos, *tmp;
        struct osc_async_page *oap;

        /* _safe variant: osc_oap_to_pending() relinks oap_pending_item */
        list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
                oap = list_entry(pos, struct osc_async_page, oap_pending_item);
                list_del(&oap->oap_pending_item);
                osc_oap_to_pending(oap);
        }
        loi_list_maint(cli, loi);
}
3116
/* Kick off all I/O previously queued via osc_queue_group_io(): move the
 * write and read group lists onto the normal pending lists and let
 * osc_check_rpcs() build RPCs for them.  Always returns 0. */
static int osc_trigger_group_io(struct obd_export *exp,
                                struct lov_stripe_md *lsm,
                                struct lov_oinfo *loi,
                                struct obd_io_group *oig)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        ENTRY;

        /* default to the first stripe's oinfo when none was supplied */
        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        client_obd_list_lock(&cli->cl_loi_list_lock);

        osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
        osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);

        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        RETURN(0);
}
3138
/* Detach a queued page (identified by @cookie) from all osc lists so the
 * caller can free it.  Fails with -EBUSY if the page is part of an
 * in-flight RPC; otherwise releases its cache/grant accounting and drops
 * it from the urgent and pending lists, clearing the matching flags. */
static int osc_teardown_async_page(struct obd_export *exp,
                                   struct lov_stripe_md *lsm,
                                   struct lov_oinfo *loi, void *cookie)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct loi_oap_pages *lop;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        /* default to the first stripe's oinfo when none was supplied */
        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        /* pick the lop matching the command the page was queued with */
        if (oap->oap_cmd & OBD_BRW_WRITE) {
                lop = &loi->loi_write_lop;
        } else {
                lop = &loi->loi_read_lop;
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* pages in an RPC cannot be torn down until the RPC completes */
        if (!list_empty(&oap->oap_rpc_item))
                GOTO(out, rc = -EBUSY);

        /* return the page's grant and wake anyone waiting for cache space */
        osc_exit_cache(cli, oap, 0);
        osc_wake_cache_waiters(cli);

        if (!list_empty(&oap->oap_urgent_item)) {
                list_del_init(&oap->oap_urgent_item);
                /* oap_lock provides atomic semantics of oap_async_flags */
                spin_lock(&oap->oap_lock);
                oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
                spin_unlock(&oap->oap_lock);
        }

        if (!list_empty(&oap->oap_pending_item)) {
                list_del_init(&oap->oap_pending_item);
                /* keep the lop's pending-page count in sync */
                lop_update_pending(cli, lop, oap->oap_cmd, -1);
        }
        loi_list_maint(cli, loi);
        cache_remove_extent(cli->cl_cache, oap);

        LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
out:
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        RETURN(rc);
}
3189
/* LDLM blocking/cancel callback for OSC extent locks.
 *
 * LDLM_CB_BLOCKING: a conflicting lock request arrived; cancel our lock
 * (errors from the cancel are only logged).
 * LDLM_CB_CANCELING: the lock is going away; if it was ever granted,
 * purge the client extent cache under it and invoke the upper layer's
 * cancel hook, if one is registered.
 *
 * Always returns 0. */
int osc_extent_blocking_cb(struct ldlm_lock *lock,
                           struct ldlm_lock_desc *new, void *data,
                           int flag)
{
        struct lustre_handle lockh = { 0 };
        int rc;
        ENTRY;

        /* catch callers that passed a small integer instead of a pointer */
        if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
                LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
                LBUG();
        }

        switch (flag) {
        case LDLM_CB_BLOCKING:
                ldlm_lock2handle(lock, &lockh);
                rc = ldlm_cli_cancel(&lockh);
                if (rc != ELDLM_OK)
                        CERROR("ldlm_cli_cancel failed: %d\n", rc);
                break;
        case LDLM_CB_CANCELING: {

                ldlm_lock2handle(lock, &lockh);
                /* This lock wasn't granted, don't try to do anything */
                if (lock->l_req_mode != lock->l_granted_mode)
                        RETURN(0);

                cache_remove_lock(lock->l_conn_export->exp_obd->u.cli.cl_cache,
                                  &lockh);

                if (lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb)
                        lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb(
                                                          lock, new, data,flag);
                break;
        }
        default:
                LBUG();
        }

        RETURN(0);
}
3231 EXPORT_SYMBOL(osc_extent_blocking_cb);
3232
/* Attach @data (on Linux, the inode) to the lock behind @lockh as its
 * l_ast_data, asserting that any previously-attached different inode is
 * being freed, and propagate the LDLM_FL_NO_LRU bit from @flags onto the
 * lock.  Silently returns (with an error message) if the handle no
 * longer resolves to a lock, e.g. after an eviction. */
static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
                                    int flags)
{
        struct ldlm_lock *lock = ldlm_handle2lock(lockh);

        if (lock == NULL) {
                CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
                return;
        }
        lock_res_and_lock(lock);
#if defined (__KERNEL__) && defined (__linux__)
        /* Liang XXX: Darwin and Winnt checking should be added */
        if (lock->l_ast_data && lock->l_ast_data != data) {
                struct inode *new_inode = data;
                struct inode *old_inode = lock->l_ast_data;
                /* a different inode is only acceptable if the old one is
                 * on its way out of the icache */
                if (!(old_inode->i_state & I_FREEING))
                        LDLM_ERROR(lock, "inconsistent l_ast_data found");
                LASSERTF(old_inode->i_state & I_FREEING,
                         "Found existing inode %p/%lu/%u state %lu in lock: "
                         "setting data to %p/%lu/%u\n", old_inode,
                         old_inode->i_ino, old_inode->i_generation,
                         old_inode->i_state,
                         new_inode, new_inode->i_ino, new_inode->i_generation);
        }
#endif
        lock->l_ast_data = data;
        lock->l_flags |= (flags & LDLM_FL_NO_LRU);
        unlock_res_and_lock(lock);
        /* drop the reference taken by ldlm_handle2lock() */
        LDLM_LOCK_PUT(lock);
}
3263
3264 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3265                              ldlm_iterator_t replace, void *data)
3266 {
3267         struct ldlm_res_id res_id;
3268         struct obd_device *obd = class_exp2obd(exp);
3269
3270         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
3271         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3272         return 0;
3273 }
3274
3275 /* find any ldlm lock of the inode in osc
3276  * return 0    not find
3277  *        1    find one
3278  *      < 0    error */
3279 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3280                            ldlm_iterator_t replace, void *data)
3281 {
3282         struct ldlm_res_id res_id;
3283         struct obd_device *obd = class_exp2obd(exp);
3284         int rc = 0;
3285
3286         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
3287         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3288         if (rc == LDLM_ITER_STOP)
3289                 return(1);
3290         if (rc == LDLM_ITER_CONTINUE)
3291                 return(0);
3292         return(rc);
3293 }
3294
/* Common completion work for osc_enqueue(): extract the real result of
 * an aborted intent enqueue from the server reply, log the LVB values we
 * got, register the new lock with the client extent cache on success,
 * and finally hand the result to the caller's update callback (whose
 * return value becomes ours). */
static int osc_enqueue_fini(struct obd_device *obd, struct ptlrpc_request *req,
                            struct obd_info *oinfo, int intent, int rc)
{
        ENTRY;

        if (intent) {
                /* The request was created before ldlm_cli_enqueue call. */
                if (rc == ELDLM_LOCK_ABORTED) {
                        struct ldlm_reply *rep;

                        /* swabbed by ldlm_cli_enqueue() */
                        LASSERT(lustre_rep_swabbed(req, DLM_LOCKREPLY_OFF));
                        rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
                                             sizeof(*rep));
                        LASSERT(rep != NULL);
                        /* the intent's actual result overrides the abort */
                        if (rep->lock_policy_res1)
                                rc = rep->lock_policy_res1;
                }
        }

        if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
                CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
                       oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
                       oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
                       oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
        }

        /* track the granted lock in the client extent cache */
        if (!rc)
                cache_add_lock(obd->u.cli.cl_cache, oinfo->oi_lockh);

        /* Call the update callback. */
        rc = oinfo->oi_cb_up(oinfo, rc);
        RETURN(rc);
}
3329
/* Reply interpreter for asynchronous osc_enqueue() requests: finish the
 * ldlm enqueue (unpacking the LVB), run the common osc completion, and
 * then drop the reference the async path kept on a successfully granted
 * lock (async callers never hold locks across the callback). */
static int osc_enqueue_interpret(struct ptlrpc_request *req,
                                 void *data, int rc)
{
        struct osc_enqueue_args *aa = data;
        int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
        struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
        struct ldlm_lock *lock;

        /* ldlm_cli_enqueue is holding a reference on the lock, so it must
         * be valid. */
        lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);

        /* Complete obtaining the lock procedure. */
        rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
                                   aa->oa_ei->ei_mode,
                                   &aa->oa_oi->oi_flags,
                                   &lsm->lsm_oinfo[0]->loi_lvb,
                                   sizeof(lsm->lsm_oinfo[0]->loi_lvb),
                                   lustre_swab_ost_lvb,
                                   aa->oa_oi->oi_lockh, rc);

        /* Complete osc stuff. */
        rc = osc_enqueue_fini(aa->oa_exp->exp_obd, req, aa->oa_oi, intent, rc);

        /* Release the lock for async request. */
        if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
                ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);

        LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
                 aa->oa_oi->oi_lockh, req, aa);
        /* balance the ldlm_handle2lock() reference taken above */
        LDLM_LOCK_PUT(lock);
        return rc;
}
3363
3364 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
3365  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
3366  * other synchronous requests, however keeping some locks and trying to obtain
3367  * others may take a considerable amount of time in a case of ost failure; and
3368  * when other sync requests do not get released lock from a client, the client
3369  * is excluded from the cluster -- such scenarious make the life difficult, so
3370  * release locks just after they are obtained. */
/* Obtain an extent lock for the range in @oinfo: first try to match an
 * already-granted local lock (upgrading PR requests to reuse a PW lock),
 * otherwise enqueue a new one with the server - synchronously, or via
 * @rqset with osc_enqueue_interpret() as the completion handler.  Intent
 * enqueues pre-build the request with room for a glimpse LVB reply. */
static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
                       struct ldlm_enqueue_info *einfo,
                       struct ptlrpc_request_set *rqset)
{
        struct ldlm_res_id res_id;
        struct obd_device *obd = exp->exp_obd;
        struct ldlm_reply *rep;
        struct ptlrpc_request *req = NULL;
        int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
        ldlm_mode_t mode;
        int rc;
        ENTRY;

        osc_build_res_name(oinfo->oi_md->lsm_object_id,
                           oinfo->oi_md->lsm_object_gr, &res_id);
        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother.  */
        oinfo->oi_policy.l_extent.start -=
                oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
        oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;

        /* without a valid kms there is nothing cached worth matching */
        if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
                goto no_match;

        /* Next, search for already existing extent locks that will cover us */
        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock.
         *
         * There are problems with conversion deadlocks, so instead of
         * converting a read lock to a write lock, we'll just enqueue a new
         * one.
         *
         * At some point we should cancel the read lock instead of making them
         * send us a blocking callback, but there are problems with canceling
         * locks out from other users right now, too. */
        mode = einfo->ei_mode;
        if (einfo->ei_mode == LCK_PR)
                mode |= LCK_PW;
        mode = ldlm_lock_match(obd->obd_namespace,
                               oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
                               einfo->ei_type, &oinfo->oi_policy, mode,
                               oinfo->oi_lockh);
        if (mode) {
                /* addref the lock only if not async requests and PW lock is
                 * matched whereas we asked for PR. */
                if (!rqset && einfo->ei_mode != mode)
                        ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
                osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
                                        oinfo->oi_flags);
                if (intent) {
                        /* I would like to be able to ASSERT here that rss <=
                         * kms, but I can't, for reasons which are explained in
                         * lov_enqueue() */
                }

                /* We already have a lock, and it's referenced */
                oinfo->oi_cb_up(oinfo, ELDLM_OK);

                /* For async requests, decref the lock. */
                if (einfo->ei_mode != mode)
                        ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
                else if (rqset)
                        ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);

                RETURN(ELDLM_OK);
        }

 no_match:
        if (intent) {
                /* intent enqueues carry an extra (empty) intent buffer in
                 * the request and expect an LVB in the reply */
                __u32 size[3] = {
                        [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                        [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request),
                        [DLM_LOCKREQ_OFF + 1] = 0 };

                req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
                if (req == NULL)
                        RETURN(-ENOMEM);

                size[DLM_LOCKREPLY_OFF] = sizeof(*rep);
                size[DLM_REPLY_REC_OFF] =
                        sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb);
                ptlrpc_req_set_repsize(req, 3, size);
        }

        /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
        oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;

        rc = ldlm_cli_enqueue(exp, &req, einfo, res_id,
                              &oinfo->oi_policy, &oinfo->oi_flags,
                              &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
                              sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
                              lustre_swab_ost_lvb, oinfo->oi_lockh,
                              rqset ? 1 : 0);
        if (rqset) {
                if (!rc) {
                        /* hand the request to the set; the interpreter
                         * finishes the enqueue and runs the callbacks */
                        struct osc_enqueue_args *aa;
                        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                        aa = ptlrpc_req_async_args(req);
                        aa->oa_oi = oinfo;
                        aa->oa_ei = einfo;
                        aa->oa_exp = exp;

                        req->rq_interpret_reply = osc_enqueue_interpret;
                        ptlrpc_set_add_req(rqset, req);
                } else if (intent) {
                        ptlrpc_req_finished(req);
                }
                RETURN(rc);
        }

        /* synchronous path: finish the enqueue in-line */
        rc = osc_enqueue_fini(obd, req, oinfo, intent, rc);
        if (intent)
                ptlrpc_req_finished(req);

        RETURN(rc);
}
3488
/* Match an already-granted local lock covering @policy's extent.  A PR
 * request may be satisfied by a cached PW lock, in which case the
 * reference is shifted from PW to PR (unless only testing).  On a match
 * the lock's ast data is set to @data, *n_matches (if given) is bumped,
 * and the matched mode is returned; 0 means no match. */
static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
                     __u32 type, ldlm_policy_data_t *policy, __u32 mode,
                     int *flags, void *data, struct lustre_handle *lockh,
                     int *n_matches)
{
        struct ldlm_res_id res_id;
        struct obd_device *obd = exp->exp_obd;
        int lflags = *flags;
        ldlm_mode_t rc;
        ENTRY;

        osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);

        /* fault-injection point for testing match failures */
        OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother */
        policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
        policy->l_extent.end |= ~CFS_PAGE_MASK;

        /* Next, search for already existing extent locks that will cover us */
        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock. */
        rc = mode;
        if (mode == LCK_PR)
                rc |= LCK_PW;
        rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
                             &res_id, type, policy, rc, lockh);
        if (rc) {
                osc_set_data_with_check(lockh, data, lflags);
                /* matched a PW lock for a PR request: move the reference
                 * to the mode the caller actually asked for */
                if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
                        ldlm_lock_addref(lockh, LCK_PR);
                        ldlm_lock_decref(lockh, LCK_PW);
                }
                if (n_matches != NULL)
                        (*n_matches)++;
        }

        RETURN(rc);
}
3530
3531 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3532                       __u32 mode, struct lustre_handle *lockh, int flags,
3533                       obd_off end)
3534 {
3535         ENTRY;
3536
3537         if (unlikely(mode == LCK_GROUP))
3538                 ldlm_lock_decref_and_cancel(lockh, mode);
3539         else
3540                 ldlm_lock_decref(lockh, mode);
3541
3542         RETURN(0);
3543 }
3544
3545 static int osc_cancel_unused(struct obd_export *exp,
3546                              struct lov_stripe_md *lsm, int flags, void *opaque)
3547 {
3548         struct obd_device *obd = class_exp2obd(exp);
3549         struct ldlm_res_id res_id, *resp = NULL;
3550
3551         if (lsm != NULL) {
3552                 resp = osc_build_res_name(lsm->lsm_object_id,
3553                                           lsm->lsm_object_gr, &res_id);
3554         }
3555
3556         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3557
3558 }
3559
3560 static int osc_join_lru(struct obd_export *exp,
3561                         struct lov_stripe_md *lsm, int join)
3562 {
3563         struct obd_device *obd = class_exp2obd(exp);
3564         struct ldlm_res_id res_id, *resp = NULL;
3565
3566         if (lsm != NULL) {
3567                 resp = osc_build_res_name(lsm->lsm_object_id,
3568                                           lsm->lsm_object_gr, &res_id);
3569         }
3570
3571         return ldlm_cli_join_lru(obd->obd_namespace, resp, join);
3572
3573 }
3574
/* Reply interpreter for osc_statfs_async(): unpack the server's
 * obd_statfs, refresh the object-create cache flags (DEGRADED, RDONLY,
 * and NOSPC with hysteresis) from the reported state, copy the result to
 * the caller's buffer, and invoke the caller's callback.
 * NOTE: on -EBADR (request never sent) the callback is deliberately
 * skipped - the caller handles cleanup itself. */
static int osc_statfs_interpret(struct ptlrpc_request *req,
                                void *data, int rc)
{
        struct osc_async_args *aa = data;
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obd_statfs *msfs;
        __u64 used;
        ENTRY;

        if (rc == -EBADR)
                /* The request has in fact never been sent
                 * due to issues at a higher level (LOV).
                 * Exit immediately since the caller is
                 * aware of the problem and takes care
                 * of the clean up */
                 RETURN(rc);

        /* lazy statfs tolerates a temporarily unreachable target */
        if ((rc == -ENOTCONN || rc == -EAGAIN) &&
            (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
                GOTO(out, rc = 0);

        if (rc != 0)
                GOTO(out, rc);

        msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
                                  lustre_swab_obd_statfs);
        if (msfs == NULL) {
                CERROR("Can't unpack obd_statfs\n");
                GOTO(out, rc = -EPROTO);
        }

        /* Reinitialize the RDONLY and DEGRADED flags at the client
         * on each statfs, so they don't stay set permanently. */
        spin_lock(&cli->cl_oscc.oscc_lock);

        if (unlikely(msfs->os_state & OS_STATE_DEGRADED))
                cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED;
        else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_DEGRADED))
                cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_DEGRADED;

        if (unlikely(msfs->os_state & OS_STATE_READONLY))
                cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY;
        else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_RDONLY))
                cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_RDONLY;

        /* Add a bit of hysteresis so this flag isn't continually flapping,
         * and ensure that new files don't get extremely fragmented due to
         * only a small amount of available space in the filesystem.
         * We want to set the NOSPC flag when there is less than ~0.1% free
         * and clear it when there is at least ~0.2% free space, so:
         *                   avail < ~0.1% max          max = avail + used
         *            1025 * avail < avail + used       used = blocks - free
         *            1024 * avail < used
         *            1024 * avail < blocks - free
         *                   avail < ((blocks - free) >> 10)
         *
         * On very large disk, say 16TB 0.1% will be 16 GB. We don't want to
         * lose that amount of space so in those cases we report no space left
         * if their is less than 1 GB left.                             */
        used = min_t(__u64, (msfs->os_blocks - msfs->os_bfree) >> 10, 1 << 30);
        if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) == 0) &&
                     ((msfs->os_ffree < 32) || (msfs->os_bavail < used))))
                cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC;
        else if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
                (msfs->os_ffree > 64) && (msfs->os_bavail > (used << 1))))
                        cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_NOSPC;

        spin_unlock(&cli->cl_oscc.oscc_lock);

        memcpy(aa->aa_oi->oi_osfs, msfs, sizeof(*msfs));
out:
        /* hand rc to the caller; its callback's result becomes ours */
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}
3649
/* Issue an OST_STATFS request asynchronously via @rqset; the reply is
 * handled by osc_statfs_interpret(), which delivers the result through
 * oinfo->oi_cb_up.  OBD_STATFS_NODELAY requests are sent with no
 * resend/delay so procfs readers cannot block on a dead OST.
 * Returns 0 on successful queueing or -ENOMEM. */
static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
                            __u64 max_age, struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*oinfo->oi_osfs) };
        ENTRY;

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
                              OST_STATFS, 1, NULL, NULL);
        if (!req)
                RETURN(-ENOMEM);

        ptlrpc_req_set_repsize(req, 2, size);
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);
        if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
                /* procfs requests not want stat in wait for avoid deadlock */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        req->rq_interpret_reply = osc_statfs_interpret;
        /* stash the caller's oinfo for the interpreter */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(rqset, req);
        RETURN(0);
}
3686
/* Synchronous OST_STATFS: send the request, wait for the reply, and copy
 * the unpacked obd_statfs into @osfs.  The import is looked up under
 * cl_sem and pinned only long enough to build the request.  Returns 0,
 * -ENODEV if the import is gone, -ENOMEM, -EPROTO on a bad reply, or the
 * rc from ptlrpc_queue_wait(). */
static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
                      __u64 max_age, __u32 flags)
{
        struct obd_statfs *msfs;
        struct ptlrpc_request *req;
        struct obd_import     *imp = NULL;
        __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
        int rc;
        ENTRY;

        /*Since the request might also come from lprocfs, so we need
         *sync this with client_disconnect_export Bug15684*/
        down_read(&obd->u.cli.cl_sem);
        if (obd->u.cli.cl_import)
                imp = class_import_get(obd->u.cli.cl_import);
        up_read(&obd->u.cli.cl_sem);
        if (!imp)
                RETURN(-ENODEV);

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION,
                              OST_STATFS, 1, NULL, NULL);

        /* the import reference was only needed to build the request */
        class_import_put(imp);
        if (!req)
                RETURN(-ENOMEM);

        ptlrpc_req_set_repsize(req, 2, size);
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (flags & OBD_STATFS_NODELAY) {
                /* procfs requests not want stat in wait for avoid deadlock */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
                                  lustre_swab_obd_statfs);
        if (msfs == NULL) {
                CERROR("Can't unpack obd_statfs\n");
                GOTO(out, rc = -EPROTO);
        }

        memcpy(osfs, msfs, sizeof(*osfs));

        EXIT;
 out:
        /* req is valid on every path reaching here; always release it */
        ptlrpc_req_finished(req);
        return rc;
}
3747
/* Retrieve object striping information for a single OSC object.
 *
 * @lump is a user-space pointer; its header (common to v1 and v3) is read
 * to obtain lmm_magic and lmm_stripe_count, and the result is copied back
 * out.  lmm_magic must be LOV_USER_MAGIC_V1 or LOV_USER_MAGIC_V3; since
 * this is an OSC (single object), only one stripe slot is ever filled.
 */
static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
{
        /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
        struct lov_user_md_v3 lum, *lumk;
        int rc = 0, lum_size;
        struct lov_user_ost_data_v1 *lmm_objects;
        ENTRY;

        if (!lsm)
                RETURN(-ENODATA);

        /* we only need the header part from user space to get lmm_magic and
         * lmm_stripe_count, (the header part is common to v1 and v3) */
        lum_size = sizeof(struct lov_user_md_v1);
        memset(&lum, 0x00, sizeof(lum));
        if (copy_from_user(&lum, lump, lum_size))
                RETURN(-EFAULT);

        if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
            (lum.lmm_magic != LOV_USER_MAGIC_V3))
                RETURN(-EINVAL);

        /* lov_user_md_vX and lov_mds_md_vX must have the same size */
        LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
        LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
        LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));

        /* we can use lov_mds_md_size() to compute lum_size
         * because lov_user_md_vX and lov_mds_md_vX have the same size */
        if (lum.lmm_stripe_count > 0) {
                /* user asked for object data too: allocate a full-size
                 * buffer and fill in the (single) object id */
                lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
                OBD_ALLOC(lumk, lum_size);
                if (!lumk)
                        RETURN(-ENOMEM);
                /* the objects array starts at a different offset in v1/v3 */
                if (lum.lmm_magic == LOV_USER_MAGIC_V1)
                        lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
                else
                        lmm_objects = &(lumk->lmm_objects[0]);
                lmm_objects->l_object_id = lsm->lsm_object_id;
        } else {
                /* header only: reuse the on-stack copy */
                lum_size = lov_mds_md_size(0, lum.lmm_magic);
                lumk = &lum;
        }

        lumk->lmm_magic = lum.lmm_magic;
        lumk->lmm_stripe_count = 1;
        lumk->lmm_object_id = lsm->lsm_object_id;

        if ((lsm->lsm_magic == LOV_USER_MAGIC_V1_SWABBED) ||
            (lsm->lsm_magic == LOV_USER_MAGIC_V3_SWABBED)) {
               /* lsm not in host order, so count also need be in same order */
                __swab32s(&lumk->lmm_magic);
                __swab16s(&lumk->lmm_stripe_count);
                lustre_swab_lov_user_md((struct lov_user_md_v1*)lumk);
                if (lum.lmm_stripe_count > 0)
                        lustre_swab_lov_user_md_objects(
                                (struct lov_user_md_v1*)lumk);
        }

        if (copy_to_user(lump, lumk, lum_size))
                rc = -EFAULT;

        if (lumk != &lum)
                OBD_FREE(lumk, lum_size);

        RETURN(rc);
}
3821
3822
/* Handle ioctls directed at an OSC device.
 * @karg is the in-kernel copy of the ioctl argument (an obd_ioctl_data for
 * most commands), @uarg the original user pointer.  A module reference is
 * held across the whole switch so the OSC module cannot be unloaded while
 * a command is in flight.  Returns 0 or a negative errno. */
static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         void *karg, void *uarg)
{
        struct obd_device *obd = exp->exp_obd;
        struct obd_ioctl_data *data = karg;
        int err = 0;
        ENTRY;

        if (!try_module_get(THIS_MODULE)) {
                CERROR("Can't get module. Is it alive?");
                return -EINVAL;
        }
        switch (cmd) {
        case OBD_IOC_LOV_GET_CONFIG: {
                /* Report a synthetic single-target LOV configuration: an OSC
                 * behaves like a LOV with exactly one (active) target. */
                char *buf;
                struct lov_desc *desc;
                struct obd_uuid uuid;

                buf = NULL;
                len = 0;
                if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
                        GOTO(out, err = -EINVAL);

                data = (struct obd_ioctl_data *)buf;

                /* caller's buffers must be big enough for a desc + a uuid */
                if (sizeof(*desc) > data->ioc_inllen1) {
                        obd_ioctl_freedata(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                if (data->ioc_inllen2 < sizeof(uuid)) {
                        obd_ioctl_freedata(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                desc = (struct lov_desc *)data->ioc_inlbuf1;
                desc->ld_tgt_count = 1;
                desc->ld_active_tgt_count = 1;
                desc->ld_default_stripe_count = 1;
                desc->ld_default_stripe_size = 0;
                desc->ld_default_stripe_offset = 0;
                desc->ld_pattern = 0;
                memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));

                memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));

                err = copy_to_user((void *)uarg, buf, len);
                if (err)
                        err = -EFAULT;
                obd_ioctl_freedata(buf, len);
                GOTO(out, err);
        }
        case LL_IOC_LOV_SETSTRIPE:
                err = obd_alloc_memmd(exp, karg);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case LL_IOC_LOV_GETSTRIPE:
                err = osc_getstripe(karg, uarg);
                GOTO(out, err);
        case OBD_IOC_CLIENT_RECOVER:
                err = ptlrpc_recover_import(obd->u.cli.cl_import,
                                            data->ioc_inlbuf1);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case IOC_OSC_SET_ACTIVE:
                err = ptlrpc_set_import_active(obd->u.cli.cl_import,
                                               data->ioc_offset);
                GOTO(out, err);
        case OBD_IOC_POLL_QUOTACHECK:
                err = lquota_poll_check(quota_interface, exp,
                                        (struct if_quotacheck *)karg);
                GOTO(out, err);
        case OBD_IOC_DESTROY: {
                /* Destroy a single object by id; privileged operation. */
                struct obdo            *oa;

                if (!cfs_capable(CFS_CAP_SYS_ADMIN))
                        GOTO (out, err = -EPERM);
                oa = &data->ioc_obdo1;

                if (oa->o_id == 0)
                        GOTO(out, err = -EINVAL);

                oa->o_valid |= OBD_MD_FLGROUP;

                err = osc_destroy(exp, oa, NULL, NULL, NULL);
                GOTO(out, err);
        }
        case OBD_IOC_PING_TARGET:
                err = ptlrpc_obd_ping(obd);
                GOTO(out, err);
        default:
                CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
                       cmd, cfs_curproc_comm());
                GOTO(out, err = -ENOTTY);
        }
out:
        module_put(THIS_MODULE);
        return err;
}
3924
/* obd_get_info handler for the OSC.
 * Supported keys: KEY_LOCK_TO_STRIPE (always stripe 0 on an OSC),
 * KEY_OFF_RPCSIZE (max RPC size in pages, as __u64), KEY_LAST_ID and
 * KEY_FIEMAP (both forwarded to the OST via an OST_GET_INFO RPC).
 * Returns 0 on success, -EINVAL for unknown keys, or a negative errno. */
static int osc_get_info(struct obd_export *exp, obd_count keylen,
                        void *key, __u32 *vallen, void *val, struct lov_stripe_md *lsm)
{
        ENTRY;
        if (!vallen || !val)
                RETURN(-EFAULT);

        if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
                /* an OSC object is a single stripe, always index 0 */
                __u32 *stripe = val;
                *vallen = sizeof(*stripe);
                *stripe = 0;
                RETURN(0);
        } else if (KEY_IS(KEY_OFF_RPCSIZE)) {
                struct client_obd *cli = &exp->exp_obd->u.cli;
                __u64 *rpcsize = val;
                LASSERT(*vallen == sizeof(__u64));
                *rpcsize = (__u64)cli->cl_max_pages_per_rpc;
                RETURN(0);
        } else if (KEY_IS(KEY_LAST_ID)) {
                /* ask the OST for the last allocated object id */
                struct ptlrpc_request *req;
                obd_id *reply;
                char *bufs[2] = { NULL, key };
                __u32 size[2] = { sizeof(struct ptlrpc_body), keylen };
                int rc;

                req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                                      OST_GET_INFO, 2, size, bufs);
                if (req == NULL)
                        RETURN(-ENOMEM);

                size[REPLY_REC_OFF] = *vallen;
                ptlrpc_req_set_repsize(req, 2, size);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(out, rc);

                reply = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*reply),
                                           lustre_swab_ost_last_id);
                if (reply == NULL) {
                        CERROR("Can't unpack OST last ID\n");
                        GOTO(out, rc = -EPROTO);
                }
                *((obd_id *)val) = *reply;
        out:
                ptlrpc_req_finished(req);
                RETURN(rc);
        } else if (KEY_IS(KEY_FIEMAP)) {
                /* forward a FIEMAP (extent mapping) query to the OST */
                struct ptlrpc_request *req;
                struct ll_user_fiemap *reply;
                char *bufs[2] = { NULL, key };
                __u32 size[2] = { sizeof(struct ptlrpc_body), keylen };
                int rc;

                req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                                      OST_GET_INFO, 2, size, bufs);
                if (req == NULL)
                        RETURN(-ENOMEM);

                size[REPLY_REC_OFF] = *vallen;
                ptlrpc_req_set_repsize(req, 2, size);

                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(out1, rc);
                reply = lustre_swab_repbuf(req, REPLY_REC_OFF, *vallen,
                                           lustre_swab_fiemap);
                if (reply == NULL) {
                        CERROR("Can't unpack FIEMAP reply.\n");
                        GOTO(out1, rc = -EPROTO);
                }

                memcpy(val, reply, *vallen);

        out1:
                ptlrpc_req_finished(req);

                RETURN(rc);
        }

        RETURN(-EINVAL);
}
4006
4007 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
4008                                           void *aa, int rc)
4009 {
4010         struct llog_ctxt *ctxt;
4011         struct obd_import *imp = req->rq_import;
4012         ENTRY;
4013
4014         if (rc != 0)
4015                 RETURN(rc);
4016
4017         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
4018         if (ctxt) {
4019                 if (rc == 0)
4020                         rc = llog_initiator_connect(ctxt);
4021                 else
4022                         CERROR("cannot establish connection for "
4023                                "ctxt %p: %d\n", ctxt, rc);
4024         }
4025
4026         llog_ctxt_put(ctxt);
4027         spin_lock(&imp->imp_lock);
4028         imp->imp_server_timeout = 1;
4029         imp->imp_pingable = 1;
4030         spin_unlock(&imp->imp_lock);
4031         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
4032
4033         RETURN(rc);
4034 }
4035
/* obd_set_info_async handler for the OSC.
 * Keys handled locally: KEY_NEXT_ID (bump the precreate next-id),
 * KEY_INIT_RECOV and KEY_CHECKSUM (toggle import/client flags).
 * All other keys are forwarded to the OST via an OST_SET_INFO RPC; the
 * request is added to @set (or to ptlrpcd for KEY_GRANT_SHRINK, which
 * also stashes an obdo copy in the async args for the interpret cb). */
static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
                              void *key, obd_count vallen, void *val,
                              struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct obd_device  *obd = exp->exp_obd;
        struct obd_import *imp = class_exp2cliimp(exp);
        __u32 size[3] = { sizeof(struct ptlrpc_body), keylen, vallen };
        char *bufs[3] = { NULL, key, val };
        ENTRY;

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

        if (KEY_IS(KEY_NEXT_ID)) {
                obd_id new_val;
                struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                if (vallen != sizeof(obd_id))
                        RETURN(-EINVAL);

                /* avoid race between allocate new object and set next id
                 * from ll_sync thread */
                spin_lock(&oscc->oscc_lock);
                new_val = *((obd_id*)val) + 1;
                /* only ever move next_id forward */
                if (new_val > oscc->oscc_next_id)
                        oscc->oscc_next_id = new_val;
                spin_unlock(&oscc->oscc_lock);

                CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
                       exp->exp_obd->obd_name,
                       oscc->oscc_next_id);

                RETURN(0);
        }

        if (KEY_IS(KEY_INIT_RECOV)) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                spin_lock(&imp->imp_lock);
                imp->imp_initial_recov = *(int *)val;
                spin_unlock(&imp->imp_lock);
                CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
                       exp->exp_obd->obd_name,
                       imp->imp_initial_recov);
                RETURN(0);
        }

        if (KEY_IS(KEY_CHECKSUM)) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
                RETURN(0);
        }

        /* a request set is required for everything except grant shrink,
         * which goes through ptlrpcd instead */
        if (!set && !KEY_IS(KEY_GRANT_SHRINK))
                RETURN(-EINVAL);

        /* We pass all other commands directly to OST. Since nobody calls osc
           methods directly and everybody is supposed to go through LOV, we
           assume lov checked invalid values for us.
           The only recognised values so far are evict_by_nid and mds_conn.
           Even if something bad goes through, we'd get a -EINVAL from OST
           anyway. */

        req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_SET_INFO, 3, size,
                              bufs);
        if (req == NULL)
                RETURN(-ENOMEM);

        if (KEY_IS(KEY_MDS_CONN))
                req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
        else if (KEY_IS(KEY_GRANT_SHRINK))
                req->rq_interpret_reply = osc_shrink_grant_interpret;

        if (KEY_IS(KEY_GRANT_SHRINK)) {
                struct osc_grant_args *aa;
                struct obdo *oa;

                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = ptlrpc_req_async_args(req);
                /* keep a private copy of the obdo for the interpret cb;
                 * it is freed there */
                OBD_ALLOC_PTR(oa);
                if (!oa) {
                        ptlrpc_req_finished(req);
                        RETURN(-ENOMEM);
                }
                *oa = ((struct ost_body *)val)->oa;
                aa->aa_oa = oa;

                size[1] = vallen;
                ptlrpc_req_set_repsize(req, 2, size);
                ptlrpcd_add_req(req);
        } else {
                ptlrpc_req_set_repsize(req, 1, NULL);
                ptlrpc_set_add_req(set, req);
                ptlrpc_check_set(set);
        }

        RETURN(0);
}
4135
4136
/* Log operations for the size-replication llog context: the client only
 * needs to cancel replicated records, nothing else. */
static struct llog_operations osc_size_repl_logops = {
        lop_cancel: llog_obd_repl_cancel
};

/* Operations for the origin MDS->OST llog context; initialized elsewhere
 * (not visible in this file section) before osc_llog_init() uses it. */
static struct llog_operations osc_mds_ost_orig_logops;
4142 static int osc_llog_init(struct obd_device *obd, struct obd_device *disk_obd,
4143                          int *index)
4144 {
4145         struct llog_catid catid;
4146         static char name[32] = CATLIST;
4147         int rc;
4148         ENTRY;
4149
4150         LASSERT(index);
4151
4152         mutex_down(&disk_obd->obd_llog_cat_process);
4153
4154         rc = llog_get_cat_list(disk_obd, disk_obd, name, *index, 1, &catid);
4155         if (rc) {
4156                 CERROR("rc: %d\n", rc);
4157                 GOTO(out_unlock, rc);
4158         }
4159 #if 0
4160         CDEBUG(D_INFO, "%s: Init llog for %s/%d - catid "LPX64"/"LPX64":%x\n",
4161                obd->obd_name, uuid->uuid, idx, catid.lci_logid.lgl_oid,
4162                catid.lci_logid.lgl_ogr, catid.lci_logid.lgl_ogen);
4163 #endif
4164
4165         rc = llog_setup(obd, LLOG_MDS_OST_ORIG_CTXT, disk_obd, 1,
4166                         &catid.lci_logid, &osc_mds_ost_orig_logops);
4167         if (rc) {
4168                 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
4169                 GOTO (out, rc);
4170         }
4171
4172         rc = llog_setup(obd, LLOG_SIZE_REPL_CTXT, disk_obd, 1, NULL,
4173                         &osc_size_repl_logops);
4174         if (rc) {
4175                 struct llog_ctxt *ctxt =
4176                         llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4177                 if (ctxt)
4178                         llog_cleanup(ctxt);
4179                 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
4180         }
4181 out:
4182         if (rc) {
4183                 CERROR("osc '%s' tgt '%s' rc=%d\n",
4184                        obd->obd_name, disk_obd->obd_name, rc);
4185                 CERROR("logid "LPX64":0x%x\n",
4186                        catid.lci_logid.lgl_oid, catid.lci_logid.lgl_ogen);
4187         } else {
4188                 rc = llog_put_cat_list(disk_obd, disk_obd, name, *index, 1,
4189                                        &catid);
4190                 if (rc)
4191                         CERROR("rc: %d\n", rc);
4192         }
4193 out_unlock:
4194         mutex_up(&disk_obd->obd_llog_cat_process);
4195
4196         RETURN(rc);
4197 }
4198
4199 static int osc_llog_finish(struct obd_device *obd, int count)
4200 {
4201         struct llog_ctxt *ctxt;
4202         int rc = 0, rc2 = 0;
4203         ENTRY;
4204
4205         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4206         if (ctxt)
4207                 rc = llog_cleanup(ctxt);
4208
4209         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4210         if (ctxt)
4211                 rc2 = llog_cleanup(ctxt);
4212         if (!rc)
4213                 rc = rc2;
4214
4215         RETURN(rc);
4216 }
4217
4218 static int osc_reconnect(struct obd_export *exp, struct obd_device *obd,
4219                          struct obd_uuid *cluuid,
4220                          struct obd_connect_data *data,
4221                          void *localdata)
4222 {
4223         struct client_obd *cli = &obd->u.cli;
4224
4225         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
4226                 long lost_grant;
4227
4228                 client_obd_list_lock(&cli->cl_loi_list_lock);
4229                 data->ocd_grant = cli->cl_avail_grant ?:
4230                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
4231                 lost_grant = cli->cl_lost_grant;
4232                 cli->cl_lost_grant = 0;
4233                 client_obd_list_unlock(&cli->cl_loi_list_lock);
4234
4235                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
4236                        "cl_lost_grant: %ld\n", data->ocd_grant,
4237                        cli->cl_avail_grant, lost_grant);
4238                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
4239                        " ocd_grant: %d\n", data->ocd_connect_flags,
4240                        data->ocd_version, data->ocd_grant);
4241         }
4242
4243         RETURN(0);
4244 }
4245
/* Disconnect the OSC from its target.
 * Flushes pending size-replication llog cancels on the last connection,
 * disconnects the export, and only afterwards removes the client from
 * the grant-shrink list (ordering mandated by BUG18662, see below). */
static int osc_disconnect(struct obd_export *exp)
{
        struct obd_device *obd = class_exp2obd(exp);
        struct llog_ctxt  *ctxt;
        int rc;

        ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
        if (ctxt) {
                if (obd->u.cli.cl_conn_count == 1) {
                        /* Flush any remaining cancel messages out to the
                         * target */
                        llog_sync(ctxt, exp);
                }
                llog_ctxt_put(ctxt);
        } else {
                CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
                       obd);
        }

        rc = client_disconnect_export(exp);
        /**
         * Initially we put del_shrink_grant before disconnect_export, but it
         * causes the following problem if setup (connect) and cleanup
         * (disconnect) are tangled together.
         *      connect p1                     disconnect p2
         *   ptlrpc_connect_import
         *     ...............               class_manual_cleanup
         *                                     osc_disconnect
         *                                     del_shrink_grant
         *   ptlrpc_connect_interrupt
         *     init_grant_shrink
         *   add this client to shrink list
         *                                      cleanup_osc
         * Bang! pinger trigger the shrink.
         * So the osc should be disconnected from the shrink list, after we
         * are sure the import has been destroyed. BUG18662
         */
        if (obd->u.cli.cl_import == NULL)
                osc_del_shrink_grant(&obd->u.cli);
        return rc;
}
4287
/* React to import state transitions for this OSC.
 * Resets grant accounting on disconnect/invalidate, flags the object
 * creator during recovery on MDS-side OSCs (imp_server_timeout set),
 * notifies the observer obd on activate/deactivate, and applies the
 * negotiated connect data on IMP_EVENT_OCD. */
static int osc_import_event(struct obd_device *obd,
                            struct obd_import *imp,
                            enum obd_import_event event)
{
        struct client_obd *cli;
        int rc = 0;

        ENTRY;
        LASSERT(imp->imp_obd == obd);

        switch (event) {
        case IMP_EVENT_DISCON: {
                /* Only do this on the MDS OSC's */
                if (imp->imp_server_timeout) {
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                        spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
                        spin_unlock(&oscc->oscc_lock);
                }
                /* the target's grant accounting is void across a disconnect */
                cli = &obd->u.cli;
                client_obd_list_lock(&cli->cl_loi_list_lock);
                cli->cl_avail_grant = 0;
                cli->cl_lost_grant = 0;
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                ptlrpc_import_setasync(imp, -1);

                break;
        }
        case IMP_EVENT_INACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
                break;
        }
        case IMP_EVENT_INVALIDATE: {
                struct ldlm_namespace *ns = obd->obd_namespace;

                /* Reset grants */
                cli = &obd->u.cli;
                client_obd_list_lock(&cli->cl_loi_list_lock);
                /* all pages go to failing rpcs due to the invalid import */
                osc_check_rpcs(cli);
                client_obd_list_unlock(&cli->cl_loi_list_lock);

                /* drop all locks for this target locally */
                ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);

                break;
        }
        case IMP_EVENT_ACTIVE: {
                /* Only do this on the MDS OSC's */
                if (imp->imp_server_timeout) {
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                        spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
                        spin_unlock(&oscc->oscc_lock);
                }
                CDEBUG(D_INFO, "notify server \n");
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
                break;
        }
        case IMP_EVENT_OCD: {
                struct obd_connect_data *ocd = &imp->imp_connect_data;

                if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
                        osc_init_grant(&obd->u.cli, ocd);

                /* See bug 7198 */
                if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;

                ptlrpc_import_setasync(imp, 1);
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
                break;
        }
        default:
                CERROR("Unknown import event %d\n", event);
                LBUG();
        }
        RETURN(rc);
}
4368
4369 /* determine whether the lock can be canceled before replaying the lock
4370  * during recovery, see bug16774 for detailed information 
4371  *
4372  * return values:
4373  *  zero  - the lock can't be canceled
4374  *  other - ok to cancel
4375  */
4376 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
4377 {
4378         check_res_locked(lock->l_resource);
4379         if (lock->l_granted_mode == LCK_GROUP || 
4380             lock->l_resource->lr_type != LDLM_EXTENT)
4381                 RETURN(0);
4382
4383         /* cancel all unused extent locks with granted mode LCK_PR or LCK_CR */
4384         if (lock->l_granted_mode == LCK_PR ||
4385             lock->l_granted_mode == LCK_CR)
4386                 RETURN(1);
4387
4388         RETURN(0);       
4389 }
4390
4391 int osc_setup(struct obd_device *obd, obd_count len, void *buf)
4392 {
4393         int rc;
4394         ENTRY;
4395
4396         ENTRY;
4397         rc = ptlrpcd_addref();
4398         if (rc)
4399                 RETURN(rc);
4400
4401         rc = client_obd_setup(obd, len, buf);
4402         if (rc) {
4403                 ptlrpcd_decref();
4404         } else {
4405                 struct lprocfs_static_vars lvars = { 0 };
4406                 struct client_obd *cli = &obd->u.cli;
4407
4408                 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
4409                 lprocfs_osc_init_vars(&lvars);
4410                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
4411                         lproc_osc_attach_seqstat(obd);
4412                         ptlrpc_lprocfs_register_obd(obd);
4413                 }
4414
4415                 oscc_init(obd);
4416                 /* We need to allocate a few requests more, because
4417                    brw_interpret tries to create new requests before freeing
4418                    previous ones. Ideally we want to have 2x max_rpcs_in_flight
4419                    reserved, but I afraid that might be too much wasted RAM
4420                    in fact, so 2 is just my guess and still should work. */
4421                 cli->cl_import->imp_rq_pool =
4422                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4423                                             OST_MAXREQSIZE,
4424                                             ptlrpc_add_rqs_to_pool);
4425                 cli->cl_cache = cache_create(obd);
4426                 if (!cli->cl_cache) {
4427                         osc_cleanup(obd);
4428                         rc = -ENOMEM;
4429                 }
4430                 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
4431                 sema_init(&cli->cl_grant_sem, 1);
4432
4433                 ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
4434         }
4435
4436         RETURN(rc);
4437 }
4438
/* Staged pre-cleanup of an OSC obd device.
 * OBD_CLEANUP_EARLY deactivates the import; OBD_CLEANUP_EXPORTS destroys
 * a never-connected import (including its request pool) and finishes the
 * llog subsystems; the remaining stages are no-ops here. */
static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
{
        int rc = 0;
        ENTRY;

        switch (stage) {
        case OBD_CLEANUP_EARLY: {
                struct obd_import *imp;
                imp = obd->u.cli.cl_import;
                CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
                /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
                ptlrpc_deactivate_import(imp);
                break;
        }
        case OBD_CLEANUP_EXPORTS: {
                /* If we set up but never connected, the
                   client import will not have been cleaned. */
                down_write(&obd->u.cli.cl_sem);
                if (obd->u.cli.cl_import) {
                        struct obd_import *imp;
                        imp = obd->u.cli.cl_import;
                        CDEBUG(D_CONFIG, "%s: client import never connected\n",
                               obd->obd_name);
                        ptlrpc_invalidate_import(imp);
                        /* the request pool is not freed by
                         * class_destroy_import, do it explicitly */
                        if (imp->imp_rq_pool) {
                                ptlrpc_free_rq_pool(imp->imp_rq_pool);
                                imp->imp_rq_pool = NULL;
                        }
                        class_destroy_import(imp);
                        obd->u.cli.cl_import = NULL;
                }
                up_write(&obd->u.cli.cl_sem);

                rc = obd_llog_finish(obd, 0);
                if (rc != 0)
                        CERROR("failed to cleanup llogging subsystems\n");
                break;
        }
        case OBD_CLEANUP_SELF_EXP:
                break;
        case OBD_CLEANUP_OBD:
                break;
        }
        RETURN(rc);
}
4484
/* Final cleanup of an OSC obd device.  Order matters: /proc entries are
 * removed first so nothing can poke the device mid-teardown, then the
 * quota cache and page cache accounting are released, the generic client
 * state is cleaned, and the ptlrpcd reference is dropped last.
 * Returns the client_obd_cleanup() result. */
int osc_cleanup(struct obd_device *obd)
{
        int rc;

        ENTRY;
        /* remove lprocfs entries before the state they expose goes away */
        ptlrpc_lprocfs_unregister_obd(obd);
        lprocfs_obd_cleanup(obd);

        /* free memory of osc quota cache */
        lquota_cleanup(quota_interface, obd);

        /* presumably cl_cache was created during setup — confirm against
         * osc_setup()/client_obd_setup() */
        cache_destroy(obd->u.cli.cl_cache);
        rc = client_obd_cleanup(obd);

        /* drop the ptlrpcd reference taken when this client was set up */
        ptlrpcd_decref();
        RETURN(rc);
}
4502
4503 static int osc_register_page_removal_cb(struct obd_device *obd,
4504                                         obd_page_removal_cb_t func,
4505                                         obd_pin_extent_cb pin_cb)
4506 {
4507         ENTRY;
4508
4509         /* this server - not need init */
4510         if (func == NULL)
4511                 return 0;
4512
4513         return cache_add_extent_removal_cb(obd->u.cli.cl_cache, func,
4514                                            pin_cb);
4515 }
4516
4517 static int osc_unregister_page_removal_cb(struct obd_device *obd,
4518                                           obd_page_removal_cb_t func)
4519 {
4520         ENTRY;
4521         return cache_del_extent_removal_cb(obd->u.cli.cl_cache, func);
4522 }
4523
4524 static int osc_register_lock_cancel_cb(struct obd_device *obd,
4525                                        obd_lock_cancel_cb cb)
4526 {
4527         ENTRY;
4528         LASSERT(obd->u.cli.cl_ext_lock_cancel_cb == NULL);
4529
4530         /* this server - not need init */
4531         if (cb == NULL)
4532                 return 0;
4533
4534         obd->u.cli.cl_ext_lock_cancel_cb = cb;
4535         return 0;
4536 }
4537
4538 static int osc_unregister_lock_cancel_cb(struct obd_device *obd,
4539                                          obd_lock_cancel_cb cb)
4540 {
4541         ENTRY;
4542
4543         if (obd->u.cli.cl_ext_lock_cancel_cb != cb) {
4544                 CERROR("Unregistering cancel cb %p, while only %p was "
4545                        "registered\n", cb,
4546                        obd->u.cli.cl_ext_lock_cancel_cb);
4547                 RETURN(-EINVAL);
4548         }
4549
4550         obd->u.cli.cl_ext_lock_cancel_cb = NULL;
4551         return 0;
4552 }
4553
4554 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4555 {
4556         struct lustre_cfg *lcfg = buf;
4557         struct lprocfs_static_vars lvars = { 0 };
4558         int rc = 0;
4559
4560         lprocfs_osc_init_vars(&lvars);
4561
4562         rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars, lcfg, obd);
4563         return(rc);
4564 }
4565
/* Method table wiring the OSC implementation into the generic obd
 * device framework.  Generic client_* helpers handle connection
 * management; everything else is OSC-specific. */
struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        /* device lifecycle */
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup,
        /* connection management (mostly generic client code) */
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        /* statfs and striping metadata */
        .o_statfs               = osc_statfs,
        .o_statfs_async         = osc_statfs_async,
        .o_packmd               = osc_packmd,
        .o_unpackmd             = osc_unpackmd,
        /* object lifecycle and attributes */
        .o_precreate            = osc_precreate,
        .o_create               = osc_create,
        .o_create_async         = osc_create_async,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_getattr_async        = osc_getattr_async,
        .o_setattr              = osc_setattr,
        .o_setattr_async        = osc_setattr_async,
        /* bulk I/O and async page machinery */
        .o_brw                  = osc_brw,
        .o_brw_async            = osc_brw_async,
        .o_prep_async_page      = osc_prep_async_page,
        .o_get_lock             = osc_get_lock,
        .o_queue_async_io       = osc_queue_async_io,
        .o_set_async_flags      = osc_set_async_flags,
        .o_queue_group_io       = osc_queue_group_io,
        .o_trigger_group_io     = osc_trigger_group_io,
        .o_teardown_async_page  = osc_teardown_async_page,
        .o_punch                = osc_punch,
        .o_sync                 = osc_sync,
        /* DLM lock handling */
        .o_enqueue              = osc_enqueue,
        .o_match                = osc_match,
        .o_change_cbdata        = osc_change_cbdata,
        .o_find_cbdata          = osc_find_cbdata,
        .o_cancel               = osc_cancel,
        .o_cancel_unused        = osc_cancel_unused,
        .o_join_lru             = osc_join_lru,
        /* control, configuration and llog plumbing */
        .o_iocontrol            = osc_iocontrol,
        .o_get_info             = osc_get_info,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_llog_init            = osc_llog_init,
        .o_llog_finish          = osc_llog_finish,
        .o_process_config       = osc_process_config,
        /* page-removal / lock-cancel callback registration */
        .o_register_page_removal_cb = osc_register_page_removal_cb,
        .o_unregister_page_removal_cb = osc_unregister_page_removal_cb,
        .o_register_lock_cancel_cb = osc_register_lock_cancel_cb,
        .o_unregister_lock_cancel_cb = osc_unregister_lock_cancel_cb,
};
4618 int __init osc_init(void)
4619 {
4620         struct lprocfs_static_vars lvars = { 0 };
4621         int rc;
4622         ENTRY;
4623
4624         lprocfs_osc_init_vars(&lvars);
4625
4626         request_module("lquota");
4627         quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
4628         lquota_init(quota_interface);
4629         init_obd_quota_ops(quota_interface, &osc_obd_ops);
4630
4631         rc = class_register_type(&osc_obd_ops, lvars.module_vars,
4632                                  LUSTRE_OSC_NAME);
4633         if (rc) {
4634                 if (quota_interface)
4635                         PORTAL_SYMBOL_PUT(osc_quota_interface);
4636                 RETURN(rc);
4637         }
4638
4639         osc_mds_ost_orig_logops = llog_lvfs_ops;
4640         osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
4641         osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
4642         osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
4643         osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
4644
4645         RETURN(rc);
4646 }
4647
#ifdef __KERNEL__
/* Module unload: release the quota state and symbol reference taken in
 * osc_init(), then unregister the OSC obd type.  Not marked __exit in
 * this tree (see the commented-out attribute) — presumably so it can be
 * referenced from non-exit paths; confirm before restoring __exit. */
static void /*__exit*/ osc_exit(void)
{
        /* teardown mirrors the lquota_init()/PORTAL_SYMBOL_GET()
         * sequence in osc_init() */
        lquota_exit(quota_interface);
        if (quota_interface)
                PORTAL_SYMBOL_PUT(osc_quota_interface);

        class_unregister_type(LUSTRE_OSC_NAME);
}

MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");

/* register the module entry/exit points with libcfs */
cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
#endif