/* lustre/osc/osc_request.c */
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif
#define DEBUG_SUBSYSTEM S_OSC

#include <libcfs/libcfs.h>

#ifndef __KERNEL__
# include <liblustre.h>
#endif

#include <lustre_dlm.h>
#include <lustre_net.h>
#include <lustre/lustre_user.h>
#include <obd_cksum.h>
#include <obd_ost.h>
#include <obd_lov.h>

#ifdef  __CYGWIN__
# include <ctype.h>
#endif

#include <lustre_ha.h>
#include <lprocfs_status.h>
#include <lustre_log.h>
#include <lustre_debug.h>
#include <lustre_param.h>
#include <lustre_cache.h>
#include "osc_internal.h"

static quota_interface_t *quota_interface = NULL;
extern quota_interface_t osc_quota_interface;

static void osc_release_ppga(struct brw_page **ppga, obd_count count);
static int brw_interpret(struct ptlrpc_request *request, void *data, int rc);
int osc_cleanup(struct obd_device *obd);

/* Pack OSC object metadata for disk storage (LE byte order). */
static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
                      struct lov_stripe_md *lsm)
{
        int lmm_size;
        ENTRY;

        lmm_size = sizeof(**lmmp);
        if (!lmmp)
                RETURN(lmm_size);

        if (*lmmp && !lsm) {
                OBD_FREE(*lmmp, lmm_size);
                *lmmp = NULL;
                RETURN(0);
        }

        if (!*lmmp) {
                OBD_ALLOC(*lmmp, lmm_size);
                if (!*lmmp)
                        RETURN(-ENOMEM);
        }

        if (lsm) {
                LASSERT(lsm->lsm_object_id);
                LASSERT(lsm->lsm_object_gr);
                (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
                (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
        }

        RETURN(lmm_size);
}

/* Unpack OSC object metadata from disk storage (LE byte order). */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
{
        int lsm_size;
        ENTRY;

        if (lmm != NULL) {
                if (lmm_bytes < sizeof(*lmm)) {
                        CERROR("lov_mds_md too small: %d, need %d\n",
                               lmm_bytes, (int)sizeof(*lmm));
                        RETURN(-EINVAL);
                }
                /* XXX LOV_MAGIC etc check? */

                if (lmm->lmm_object_id == 0) {
                        CERROR("lov_mds_md: zero lmm_object_id\n");
                        RETURN(-EINVAL);
                }
        }

        lsm_size = lov_stripe_md_size(1);
        if (lsmp == NULL)
                RETURN(lsm_size);

        if (*lsmp != NULL && lmm == NULL) {
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);
                *lsmp = NULL;
                RETURN(0);
        }

        if (*lsmp == NULL) {
                OBD_ALLOC(*lsmp, lsm_size);
                if (*lsmp == NULL)
                        RETURN(-ENOMEM);
                OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                if ((*lsmp)->lsm_oinfo[0] == NULL) {
                        OBD_FREE(*lsmp, lsm_size);
                        RETURN(-ENOMEM);
                }
                loi_init((*lsmp)->lsm_oinfo[0]);
        }

        if (lmm != NULL) {
                /* XXX zero *lsmp? */
                (*lsmp)->lsm_object_id = le64_to_cpu(lmm->lmm_object_id);
                (*lsmp)->lsm_object_gr = le64_to_cpu(lmm->lmm_object_gr);
                LASSERT((*lsmp)->lsm_object_id);
                LASSERT((*lsmp)->lsm_object_gr);
        }

        (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

        RETURN(lsm_size);
}

static inline void osc_pack_capa(struct ptlrpc_request *req,
                                 struct ost_body *body, void *capa)
{
        struct obd_capa *oc = (struct obd_capa *)capa;
        struct lustre_capa *c;

        if (!capa)
                return;

        c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
        LASSERT(c);
        capa_cpy(c, oc);
        body->oa.o_valid |= OBD_MD_FLOSSCAPA;
        DEBUG_CAPA(D_SEC, c, "pack");
}

static inline void osc_pack_req_body(struct ptlrpc_request *req,
                                     struct obd_info *oinfo)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        body->oa = *oinfo->oi_oa;
        osc_pack_capa(req, body, oinfo->oi_capa);
}

static inline void osc_set_capa_size(struct ptlrpc_request *req,
                                     const struct req_msg_field *field,
                                     struct obd_capa *oc)
{
        if (oc == NULL)
                req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
        else
                /* it is already calculated as sizeof struct obd_capa */
                ;
}

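/* Completion callback for an async getattr: unpack the reply body and copy
 * the returned attributes into the caller's obd_info before invoking its
 * oi_cb_up callback. */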
static int osc_getattr_interpret(struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body) {
                CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
                memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));

                /* This should really be sent by the OST */
                aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
                aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
        } else {
                CDEBUG(D_INFO, "can't unpack ost_body\n");
                rc = -EPROTO;
                aa->aa_oi->oi_oa->o_valid = 0;
        }
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}

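/* Queue an OST_GETATTR request on @set without waiting; the reply is
 * handled by osc_getattr_interpret(). */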
static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_getattr_interpret;

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(set, req);
        RETURN(0);
}

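/* Synchronous OST_GETATTR: send the request, wait for the reply, and copy
 * the returned attributes into oinfo->oi_oa. */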
static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        *oinfo->oi_oa = body->oa;

        /* This should really be sent by the OST */
        oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}

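/* Synchronous OST_SETATTR: push the attributes in oinfo->oi_oa to the OST
 * and copy back whatever the server returns. */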
static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
                       struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
                oinfo->oi_oa->o_gr > 0);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *oinfo->oi_oa = body->oa;

        EXIT;
out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}

static int osc_setattr_interpret(struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *aa->aa_oi->oi_oa = body->oa;
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}

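/* Asynchronous OST_SETATTR: with a NULL @rqset the request is handed to
 * ptlrpcd and forgotten; otherwise it is added to @rqset and completed
 * through osc_setattr_interpret(). */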
static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct obd_trans_info *oti,
                             struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
                LASSERT(oti);
                *obdo_logcookie(oinfo->oi_oa) = *oti->oti_logcookies;
        }

        /* do MDS-to-OST setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
                req->rq_interpret_reply = osc_setattr_interpret;

                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = ptlrpc_req_async_args(req);
                aa->aa_oi = oinfo;

                ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}

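/* Create an object on the OST.  Allocates a striping descriptor if the
 * caller did not pass one in *ea, and records the new object id/group in
 * the lsm on success. */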
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct lov_stripe_md  *lsm;
        int                    rc;
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        body->oa = *oa;

        ptlrpc_request_set_replen(req);

        if (oa->o_valid & OBD_MD_FLINLINE) {
                LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
                        oa->o_flags == OBD_FL_DELORPHAN);
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        *oa = body->oa;

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_object_id = oa->o_id;
        lsm->lsm_object_gr = oa->o_gr;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        *oti->oti_logcookies = *obdo_logcookie(oa);
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        RETURN(rc);
}

static int osc_punch_interpret(struct ptlrpc_request *req,
                               struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *aa->aa_oi->oi_oa = body->oa;
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}

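/* Asynchronous OST_PUNCH (truncate): the extent to punch travels in the
 * overloaded oa size/blocks fields, and the request completes on @rqset
 * via osc_punch_interpret(). */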
static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
                     struct obd_trans_info *oti,
                     struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        if (!oinfo->oi_oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);
        osc_pack_req_body(req, oinfo);

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        body->oa.o_size = oinfo->oi_policy.l_extent.start;
        body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_punch_interpret;
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;
        ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

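/* Synchronous OST_SYNC: ask the OST to flush [start, end] of the object to
 * disk; size/blocks in the oa are again overloaded to carry the range. */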
static int osc_sync(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md *md, obd_size start, obd_size end,
                    void *capa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        body->oa = *oa;
        body->oa.o_size = start;
        body->oa.o_blocks = end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
        osc_pack_capa(req, body, capa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *oa = body->oa;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}

/* Find and locally cancel locks matched by @mode in the resource found by
 * @objid. Found locks are added into the @cancels list. Returns the number
 * of locks added to the @cancels list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels, ldlm_mode_t mode,
                                   int lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        osc_build_res_name(oa->o_id, oa->o_gr, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (res == NULL)
                RETURN(0);

        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        ldlm_resource_putref(res);
        RETURN(count);
}

static int osc_destroy_interpret(struct ptlrpc_request *req, void *data,
                                 int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        cfs_waitq_signal(&cli->cl_destroy_waitq);
        return 0;
}

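/* Reserve a slot for a destroy RPC: returns 1 with cl_destroy_in_flight
 * incremented if fewer than cl_max_rpcs_in_flight destroys are outstanding,
 * otherwise undoes the increment and returns 0.  Signalling the waitqueue
 * on the decrement path closes the race with a slot being released between
 * the two atomic operations. */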
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                cfs_waitq_signal(&cli->cl_destroy_waitq);
        }
        return 0;
}

/* Destroy requests can always be async on the client, and we don't even really
 * care about the return code since the client cannot do anything at all about
 * a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST reconnects to the MDS next,
 * it will retrieve the llog unlink logs and then send the log cancellation
 * cookies to the MDS after committing destroy transactions. */
static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                       struct lov_stripe_md *ea, struct obd_trans_info *oti,
                       struct obd_export *md_export)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        CFS_LIST_HEAD(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        req->rq_interpret_reply = osc_destroy_interpret;
        ptlrpc_at_set_req_timeout(req);

        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                memcpy(obdo_logcookie(oa), oti->oti_logcookies,
                       sizeof(*oti->oti_logcookies));
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        body->oa = *oa;

        ptlrpc_request_set_replen(req);

        if (!osc_can_send_destroy(cli)) {
                struct l_wait_info lwi = { 0 };

                /*
                 * Wait until the number of on-going destroy RPCs drops
                 * below cl_max_rpcs_in_flight.
                 */
                l_wait_event_exclusive(cli->cl_destroy_waitq,
                                       osc_can_send_destroy(cli), &lwi);
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req);
        RETURN(0);
}

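/* Fill in the dirty accounting fields of @oa (o_dirty, o_undirty, o_grant,
 * o_dropped) under the loi list lock so the OST can adjust this client's
 * grant. */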
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (cli->cl_dirty > cli->cl_dirty_max) {
                CERROR("dirty %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
                CERROR("dirty %d > system dirty_max %d\n",
                       atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT) *
                                     (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}

/* caller must hold loi_list_lock */
static void osc_consume_write_grant(struct client_obd *cli,
                                    struct brw_page *pga)
{
        atomic_inc(&obd_dirty_pages);
        cli->cl_dirty += CFS_PAGE_SIZE;
        cli->cl_avail_grant -= CFS_PAGE_SIZE;
        pga->flag |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
               CFS_PAGE_SIZE, pga, pga->pg);
        LASSERT(cli->cl_avail_grant >= 0);
}

/* the companion to osc_consume_write_grant, called when a brw has completed.
 * must be called with the loi lock held. */
static void osc_release_write_grant(struct client_obd *cli,
                                    struct brw_page *pga, int sent)
{
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
        ENTRY;

        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
                EXIT;
                return;
        }

        pga->flag &= ~OBD_BRW_FROM_GRANT;
        atomic_dec(&obd_dirty_pages);
        cli->cl_dirty -= CFS_PAGE_SIZE;
        if (!sent) {
                cli->cl_lost_grant += CFS_PAGE_SIZE;
                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
        } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole block on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
                int offset = pga->off & ~CFS_PAGE_MASK;
                int count = pga->count + (offset & (blocksize - 1));
                int end = (offset + pga->count) & (blocksize - 1);
                if (end)
                        count += blocksize - end;

                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
                       cli->cl_avail_grant, cli->cl_dirty);
        }

        EXIT;
}

static unsigned long rpcs_in_flight(struct client_obd *cli)
{
        return cli->cl_r_in_flight + cli->cl_w_in_flight;
}

/* caller must hold loi_list_lock */
void osc_wake_cache_waiters(struct client_obd *cli)
{
        struct list_head *l, *tmp;
        struct osc_cache_waiter *ocw;

        ENTRY;
        list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                /* if we can't dirty more, we must wait until some is written */
                if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
                    (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);
                        return;
                }

                /* if still dirty cache but no grant wait for pending RPCs that
                 * may yet return us some grant before doing sync writes */
                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
                               cli->cl_w_in_flight);
                        return;
                }

                ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
                list_del_init(&ocw->ocw_entry);
                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        /* no more RPCs in flight to return grant, do sync IO */
                        ocw->ocw_rc = -EDQUOT;
                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
                } else {
                        osc_consume_write_grant(cli,
                                                &ocw->ocw_oap->oap_brw_page);
                }

                cfs_waitq_signal(&ocw->ocw_waitq);
        }

        EXIT;
}

static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
               cli->cl_avail_grant, cli->cl_lost_grant);
        LASSERT(cli->cl_avail_grant >= 0);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
        if (body->oa.o_valid & OBD_MD_FLGRANT)
                cli->cl_avail_grant += body->oa.o_grant;
        /* waiters are woken in brw_interpret */
        client_obd_list_unlock(&cli->cl_loi_list_lock);
}

/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, obd_count page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = cfs_kmap(pga[i]->pg) +
                                (pga[i]->off & ~CFS_PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        cfs_kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                cfs_kunmap(pga[i]->pg);
                i++;
        }
}

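/* Check the per-niobuf return codes in a bulk write reply, and verify that
 * the byte count the bulk layer transferred matches what was requested. */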
static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           obd_count page_count, struct brw_page **pga)
{
        int    *remote_rcs, i;

        /* return error if any niobuf was in error */
        remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
                                        sizeof(*remote_rcs) * niocount, NULL);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return -EPROTO;
        }
        if (lustre_msg_swabbed(req->rq_repmsg))
                for (i = 0; i < niocount; i++)
                        __swab32s(&remote_rcs[i]);

        for (i = 0; i < niocount; i++) {
                if (remote_rcs[i] < 0)
                        return remote_rcs[i];

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                               i, remote_rcs[i], req);
                        return -EPROTO;
                }
        }

        if (req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return -EPROTO;
        }

        return 0;
}

static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~OBD_BRW_FROM_GRANT;

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if ((p1->flag & mask) != (p2->flag & mask))
                        CERROR("is it ok to have flags 0x%x and 0x%x in the "
                               "same brw?\n", p1->flag, p2->flag);
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}

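/* Checksum the first @nob bytes of the pages in @pga.  Under OBD_FAIL fault
 * injection the data (reads) or the checksum itself (writes) is deliberately
 * corrupted to exercise the checksum error paths. */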
static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga, int opc,
                                   cksum_type_t cksum_type)
{
        __u32 cksum;
        int i = 0;

        LASSERT(pg_count > 0);
        cksum = init_checksum(cksum_type);
        while (nob > 0 && pg_count > 0) {
                unsigned char *ptr = cfs_kmap(pga[i]->pg);
                int off = pga[i]->off & ~CFS_PAGE_MASK;
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
                        memcpy(ptr + off, "bad1", min(4, nob));
                cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
                cfs_kunmap(pga[i]->pg);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
                               off, cksum);

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}

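/* Build a bulk read/write request: allocate it (from the preallocated pool
 * for writes), set up the bulk descriptor, merge contiguous pages into
 * niobufs, attach checksums if enabled, and stash the I/O state in the
 * request's async args for the completion handlers. */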
static int osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                cli->cl_import->imp_rq_pool,
                                                &RQF_OST_BRW);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW);
        }

        if (req == NULL)
                RETURN(-ENOMEM);

        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));
        osc_set_capa_size(req, &RMF_CAPA1, ocapa);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (opc == OST_WRITE)
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_GET_SOURCE, OST_BULK_PORTAL);
        else
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_PUT_SINK, OST_BULK_PORTAL);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body && ioobj && niobuf);

        body->oa = *oa;

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        osc_pack_capa(req, body, ocapa);
        LASSERT(page_count > 0);
        pg_prev = pga[0];
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];

                LASSERT(pg->count > 0);
                LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
                         "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
                         pg->off, pg->count);
#ifdef __linux__
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
#else
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u\n", i, page_count);
#endif
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
                                      pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len    = pg->count;
                        niobuf->flags  = pg->flag;
                }
                pg_prev = pg;
        }

        LASSERTF((void *)(niobuf - niocount) ==
                lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                               niocount * sizeof(*niobuf)),
                "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg,
                REQ_REC_OFF + 2, niocount * sizeof(*niobuf)),
                (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob : 0);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (unlikely(cli->cl_checksum) &&
                    req->rq_flvr.sf_bulk_hash == BULK_HASH_ALG_NULL) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                oa->o_flags = body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                if (unlikely(cli->cl_checksum) &&
                    req->rq_flvr.sf_bulk_hash == BULK_HASH_ALG_NULL) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
                req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER, 0);
                /* 1 RC for the whole I/O */
        }
        ptlrpc_request_set_replen(req);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);

        *reqp = req;
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}

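/* Called on a write checksum mismatch: recompute the checksum locally to
 * classify where the corruption happened, log it, and return 1 so the
 * caller can resend the request. */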
static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum, int nob,
                                obd_count page_count, struct brw_page **pga,
                                cksum_type_t client_cksum_type)
{
        __u32 new_cksum;
        char *msg;
        cksum_type_t cksum_type;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                return 0;
        }

        if (oa->o_valid & OBD_MD_FLFLAGS)
                cksum_type = cksum_type_unpack(oa->o_flags);
        else
                cksum_type = OBD_CKSUM_CRC32;

        new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
                                      cksum_type);

        if (cksum_type != client_cksum_type)
                msg = "the server did not use the checksum type specified in "
                      "the original request - likely a protocol problem";
        else if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it - "
                      "likely false positive due to mmap IO (bug 11742)";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original - "
                      "likely false positive due to mmap IO (bug 11742)";

        LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
                           LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
                           "["LPU64"-"LPU64"]\n",
                           msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
                                                        (__u64)0,
                           oa->o_id,
                           oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
                           pga[0]->off,
                           pga[page_count-1]->off + pga[page_count-1]->count - 1);
        CERROR("original client csum %x (type %x), server csum %x (type %x), "
               "client csum now %x\n", client_cksum, client_cksum_type,
               server_cksum, cksum_type, new_cksum);
        return 1;
}

1259 /* Note rc enters this function as number of bytes transferred */
1260 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1261 {
1262         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1263         const lnet_process_id_t *peer =
1264                         &req->rq_import->imp_connection->c_peer;
1265         struct client_obd *cli = aa->aa_cli;
1266         struct ost_body *body;
1267         __u32 client_cksum = 0;
1268         ENTRY;
1269
1270         if (rc < 0 && rc != -EDQUOT)
1271                 RETURN(rc);
1272
1273         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1274         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1275                                   lustre_swab_ost_body);
1276         if (body == NULL) {
1277                 CDEBUG(D_INFO, "Can't unpack body\n");
1278                 RETURN(-EPROTO);
1279         }
1280
1281         /* set/clear over quota flag for a uid/gid */
1282         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1283             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1284                 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1285                              body->oa.o_gid, body->oa.o_valid,
1286                              body->oa.o_flags);
1287
1288         if (rc < 0)
1289                 RETURN(rc);
1290
1291         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1292                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1293
1294         osc_update_grant(cli, body);
1295
1296         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1297                 if (rc > 0) {
1298                         CERROR("Unexpected +ve rc %d\n", rc);
1299                         RETURN(-EPROTO);
1300                 }
1301                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1302
1303                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1304                     check_write_checksum(&body->oa, peer, client_cksum,
1305                                          body->oa.o_cksum, aa->aa_requested_nob,
1306                                          aa->aa_page_count, aa->aa_ppga,
1307                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1308                         RETURN(-EAGAIN);
1309
1310                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1311                         RETURN(-EAGAIN);
1312
1313                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1314                                      aa->aa_page_count, aa->aa_ppga);
1315                 GOTO(out, rc);
1316         }
1317
1318         /* The rest of this function executes only for OST_READs */
1319         if (rc > aa->aa_requested_nob) {
1320                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1321                        aa->aa_requested_nob);
1322                 RETURN(-EPROTO);
1323         }
1324
1325         if (rc != req->rq_bulk->bd_nob_transferred) {
1326                 CERROR ("Unexpected rc %d (%d transferred)\n",
1327                         rc, req->rq_bulk->bd_nob_transferred);
1328                 return (-EPROTO);
1329         }
1330
1331         if (rc < aa->aa_requested_nob)
1332                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1333
1334         if (sptlrpc_cli_unwrap_bulk_read(req, rc, aa->aa_page_count,
1335                                          aa->aa_ppga))
1336                 GOTO(out, rc = -EAGAIN);
1337
1338         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1339                 static int cksum_counter;
1340                 __u32      server_cksum = body->oa.o_cksum;
1341                 char      *via;
1342                 char      *router;
1343                 cksum_type_t cksum_type;
1344
1345                 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1346                         cksum_type = cksum_type_unpack(body->oa.o_flags);
1347                 else
1348                         cksum_type = OBD_CKSUM_CRC32;
1349                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1350                                                  aa->aa_ppga, OST_READ,
1351                                                  cksum_type);
1352
1353                 if (peer->nid == req->rq_bulk->bd_sender) {
1354                         via = router = "";
1355                 } else {
1356                         via = " via ";
1357                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1358                 }
1359
1360                 if (server_cksum == ~0 && rc > 0) {
1361                         CERROR("Protocol error: server %s set the 'checksum' "
1362                                "bit, but didn't send a checksum.  Not fatal, "
1363                                "but please notify on http://bugzilla.lustre.org/\n",
1364                                libcfs_nid2str(peer->nid));
1365                 } else if (server_cksum != client_cksum) {
1366                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1367                                            "%s%s%s inum "LPU64"/"LPU64" object "
1368                                            LPU64"/"LPU64" extent "
1369                                            "["LPU64"-"LPU64"]\n",
1370                                            req->rq_import->imp_obd->obd_name,
1371                                            libcfs_nid2str(peer->nid),
1372                                            via, router,
1373                                            body->oa.o_valid & OBD_MD_FLFID ?
1374                                                 body->oa.o_fid : (__u64)0,
1375                                            body->oa.o_valid & OBD_MD_FLFID ?
1376                                                 body->oa.o_generation :(__u64)0,
1377                                            body->oa.o_id,
1378                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1379                                                 body->oa.o_gr : (__u64)0,
1380                                            aa->aa_ppga[0]->off,
1381                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1382                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1383                                                                         1);
1384                         CERROR("client %x, server %x, cksum_type %x\n",
1385                                client_cksum, server_cksum, cksum_type);
1386                         cksum_counter = 0;
1387                         aa->aa_oa->o_cksum = client_cksum;
1388                         rc = -EAGAIN;
1389                 } else {
1390                         cksum_counter++;
1391                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1392                         rc = 0;
1393                 }
1394         } else if (unlikely(client_cksum)) {
1395                 static int cksum_missed;
1396
1397                 cksum_missed++;
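                /* log only when cksum_missed is a power of two: x & -x
                 * isolates the lowest set bit of x and equals x exactly
                 * for powers of two, which rate-limits this console
                 * message as misses accumulate */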
1398                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1399                         CERROR("Checksum %u requested from %s but not sent\n",
1400                                cksum_missed, libcfs_nid2str(peer->nid));
1401         } else {
1402                 rc = 0;
1403         }
1404 out:
1405         if (rc >= 0)
1406                 *aa->aa_oa = body->oa;
1407
1408         RETURN(rc);
1409 }
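
/*
 * Read-side checksum handling in brief: when the reply carries
 * OBD_MD_FLCKSUM, the client recomputes the checksum over the received
 * bulk and compares it with the server's.  A mismatch makes this function
 * return -EAGAIN, which the callers below treat as recoverable (via
 * osc_recoverable_error()) and use to trigger a resend.
 */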
1410
1411 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1412                             struct lov_stripe_md *lsm,
1413                             obd_count page_count, struct brw_page **pga,
1414                             struct obd_capa *ocapa)
1415 {
1416         struct ptlrpc_request *req;
1417         int                    rc;
1418         cfs_waitq_t            waitq;
1419         int                    resends = 0;
1420         struct l_wait_info     lwi;
1421
1422         ENTRY;
1423
1424         cfs_waitq_init(&waitq);
1425
1426 restart_bulk:
1427         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1428                                   page_count, pga, &req, ocapa);
1429         if (rc != 0)
1430                 return (rc);
1431
1432         rc = ptlrpc_queue_wait(req);
1433
1434         if (rc == -ETIMEDOUT && req->rq_resend) {
1435                 DEBUG_REQ(D_HA, req, "BULK TIMEOUT");
1436                 ptlrpc_req_finished(req);
1437                 goto restart_bulk;
1438         }
1439
1440         rc = osc_brw_fini_request(req, rc);
1441
1442         ptlrpc_req_finished(req);
1443         if (osc_recoverable_error(rc)) {
1444                 resends++;
1445                 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1446                         CERROR("too many resend retries, returning error\n");
1447                         RETURN(-EIO);
1448                 }
1449
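                /* linear backoff: the Nth resend waits N seconds before
                 * the bulk is rebuilt and requeued */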
1450                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1451                 l_wait_event(waitq, 0, &lwi);
1452
1453                 goto restart_bulk;
1454         }
1455
1456         RETURN(rc);
1457 }
1458
1459 int osc_brw_redo_request(struct ptlrpc_request *request,
1460                          struct osc_brw_async_args *aa)
1461 {
1462         struct ptlrpc_request *new_req;
1463         struct ptlrpc_request_set *set = request->rq_set;
1464         struct osc_brw_async_args *new_aa;
1465         struct osc_async_page *oap;
1466         int rc = 0;
1467         ENTRY;
1468
1469         if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1470                 CERROR("too many resend retries, returning error\n");
1471                 RETURN(-EIO);
1472         }
1473
1474         DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1475 /*
1476         body = lustre_msg_buf(request->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
1477         if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
1478                 ocapa = lustre_unpack_capa(request->rq_reqmsg,
1479                                            REQ_REC_OFF + 3);
1480 */
1481         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1482                                         OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1483                                   aa->aa_cli, aa->aa_oa,
1484                                   NULL /* lsm unused by osc currently */,
1485                                   aa->aa_page_count, aa->aa_ppga,
1486                                   &new_req, NULL /* ocapa */);
1487         if (rc)
1488                 RETURN(rc);
1489
1490         client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1491
1492         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1493                 if (oap->oap_request != NULL) {
1494                         LASSERTF(request == oap->oap_request,
1495                                  "request %p != oap_request %p\n",
1496                                  request, oap->oap_request);
1497                         if (oap->oap_interrupted) {
1498                                 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1499                                 ptlrpc_req_finished(new_req);
1500                                 RETURN(-EINTR);
1501                         }
1502                 }
1503         }
1504         /* New request takes over pga and oaps from old request.
1505          * Note that copying a list_head doesn't work, need to move it... */
1506         aa->aa_resends++;
1507         new_req->rq_interpret_reply = request->rq_interpret_reply;
1508         new_req->rq_async_args = request->rq_async_args;
1509         new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1510
1511         new_aa = ptlrpc_req_async_args(new_req);
1512
1513         CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1514         list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1515         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1516
1517         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1518                 if (oap->oap_request) {
1519                         ptlrpc_req_finished(oap->oap_request);
1520                         oap->oap_request = ptlrpc_request_addref(new_req);
1521                 }
1522         }
1523
1524         /* using ptlrpc_set_add_req() here is safe because the interpret
1525          * functions run in check_set context.  the only path by which
1526          * another thread can reach this request is the -EINTR one, and
1527          * that path is protected by cl_loi_list_lock above. */
1528         ptlrpc_set_add_req(set, new_req);
1529
1530         client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1531
1532         DEBUG_REQ(D_INFO, new_req, "new request");
1533         RETURN(0);
1534 }
1535
1536 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1537                           struct lov_stripe_md *lsm, obd_count page_count,
1538                           struct brw_page **pga, struct ptlrpc_request_set *set,
1539                           struct obd_capa *ocapa)
1540 {
1541         struct ptlrpc_request     *req;
1542         struct client_obd         *cli = &exp->exp_obd->u.cli;
1543         int                        rc, i;
1544         struct osc_brw_async_args *aa;
1545         ENTRY;
1546
1547         /* Consume write credits even if doing a sync write -
1548          * otherwise we may run out of space on the OST due to grant. */
1549         if (cmd == OBD_BRW_WRITE) {
1550                 spin_lock(&cli->cl_loi_list_lock);
1551                 for (i = 0; i < page_count; i++) {
1552                         if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
1553                                 osc_consume_write_grant(cli, pga[i]);
1554                 }
1555                 spin_unlock(&cli->cl_loi_list_lock);
1556         }
1557
1558         rc = osc_brw_prep_request(cmd, cli, oa, lsm, page_count, pga,
1559                                   &req, ocapa);
1560
1561         if (rc == 0) {
1562                 /* only touch req and aa once the request was built */
1563                 aa = ptlrpc_req_async_args(req);
1564                 if (cmd == OBD_BRW_READ) {
1565                         lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1566                         lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1567                         ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
1568                 } else {
1569                         lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1570                         lprocfs_oh_tally(&cli->cl_write_rpc_hist,
1571                                          cli->cl_w_in_flight);
1572                         ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
1573                 }
1574                 LASSERT(list_empty(&aa->aa_oaps));
1575                 req->rq_interpret_reply = brw_interpret;
1576                 ptlrpc_set_add_req(set, req);
1577                 client_obd_list_lock(&cli->cl_loi_list_lock);
1578                 if (cmd == OBD_BRW_READ)
1579                         cli->cl_r_in_flight++;
1580                 else
1581                         cli->cl_w_in_flight++;
1582                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1583                 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DIO_PAUSE, 3);
1584         } else if (cmd == OBD_BRW_WRITE) {
1585                 client_obd_list_lock(&cli->cl_loi_list_lock);
1586                 for (i = 0; i < page_count; i++)
1587                         osc_release_write_grant(cli, pga[i], 0);
1588                 osc_wake_cache_waiters(cli);
1589                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1590         }
1591         RETURN(rc);
1592 }
1593
1594 /*
1595  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1596  * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1597  * fine for our small page arrays and doesn't require allocation.  it's an
1598  * insertion sort that swaps elements that are strides apart, shrinking the
1599  * stride down until it's '1' and the array is sorted.
1600  */
1601 static void sort_brw_pages(struct brw_page **array, int num)
1602 {
1603         int stride, i, j;
1604         struct brw_page *tmp;
1605
1606         if (num == 1)
1607                 return;
1608         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1609                 ;
1610
1611         do {
1612                 stride /= 3;
1613                 for (i = stride ; i < num ; i++) {
1614                         tmp = array[i];
1615                         j = i;
1616                         while (j >= stride && array[j - stride]->off > tmp->off) {
1617                                 array[j] = array[j - stride];
1618                                 j -= stride;
1619                         }
1620                         array[j] = tmp;
1621                 }
1622         } while (stride > 1);
1623 }
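
/*
 * Illustrative sketch (not part of the original source): the same
 * gap-insertion shellsort as sort_brw_pages() above, recast as a
 * standalone userspace program over plain offsets so the 1, 4, 13, 40...
 * stride sequence is easy to observe.  All names here are hypothetical.
 *
 *     #include <stdio.h>
 *
 *     static void shellsort(unsigned long *a, int num)
 *     {
 *             int stride, i, j;
 *             unsigned long tmp;
 *
 *             if (num == 1)
 *                     return;
 *             for (stride = 1; stride < num; stride = stride * 3 + 1)
 *                     ;
 *             do {
 *                     stride /= 3;
 *                     for (i = stride; i < num; i++) {
 *                             tmp = a[i];
 *                             for (j = i; j >= stride && a[j - stride] > tmp;
 *                                  j -= stride)
 *                                     a[j] = a[j - stride];
 *                             a[j] = tmp;
 *                     }
 *             } while (stride > 1);
 *     }
 *
 *     int main(void)
 *     {
 *             unsigned long off[] = { 8192UL, 0UL, 12288UL, 4096UL };
 *             int i;
 *
 *             shellsort(off, 4);
 *             for (i = 0; i < 4; i++)
 *                     printf("%lu\n", off[i]);  // 0 4096 8192 12288
 *             return 0;
 *     }
 */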
1624
1625 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1626 {
1627         int count = 1;
1628         int offset;
1629         int i = 0;
1630
1631         LASSERT (pages > 0);
1632         offset = pg[i]->off & ~CFS_PAGE_MASK;
1633
1634         for (;;) {
1635                 pages--;
1636                 if (pages == 0)         /* that's all */
1637                         return count;
1638
1639                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1640                         return count;   /* doesn't end on page boundary */
1641
1642                 i++;
1643                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1644                 if (offset != 0)        /* doesn't start on page boundary */
1645                         return count;
1646
1647                 count++;
1648         }
1649 }
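
/*
 * Worked example for max_unfragmented_pages(), assuming CFS_PAGE_SIZE is
 * 4096: for sorted pages covering [0-4095], [4096-8191] and [8192-10239],
 * the run ends with a page that stops mid-page, so all three still fit in
 * one unfragmented transfer and the result is 3.  If the second page
 * instead started at offset 4100 it would not begin on a page boundary,
 * and the scan would return 1 after the first page.
 */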
1650
1651 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1652 {
1653         struct brw_page **ppga;
1654         int i;
1655
1656         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1657         if (ppga == NULL)
1658                 return NULL;
1659
1660         for (i = 0; i < count; i++)
1661                 ppga[i] = pga + i;
1662         return ppga;
1663 }
1664
1665 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1666 {
1667         LASSERT(ppga != NULL);
1668         OBD_FREE(ppga, sizeof(*ppga) * count);
1669 }
1670
1671 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1672                    obd_count page_count, struct brw_page *pga,
1673                    struct obd_trans_info *oti)
1674 {
1675         struct obdo *saved_oa = NULL;
1676         struct brw_page **ppga, **orig;
1677         struct obd_import *imp = class_exp2cliimp(exp);
1678         struct client_obd *cli = &imp->imp_obd->u.cli;
1679         int rc, page_count_orig;
1680         ENTRY;
1681
1682         if (cmd & OBD_BRW_CHECK) {
1683                 /* The caller just wants to know if there's a chance that this
1684                  * I/O can succeed */
1685
1686                 if (imp == NULL || imp->imp_invalid)
1687                         RETURN(-EIO);
1688                 RETURN(0);
1689         }
1690
1691         /* test_brw with a failed create can trip this, maybe others. */
1692         LASSERT(cli->cl_max_pages_per_rpc);
1693
1694         rc = 0;
1695
1696         orig = ppga = osc_build_ppga(pga, page_count);
1697         if (ppga == NULL)
1698                 RETURN(-ENOMEM);
1699         page_count_orig = page_count;
1700
1701         sort_brw_pages(ppga, page_count);
1702         while (page_count) {
1703                 obd_count pages_per_brw;
1704
1705                 if (page_count > cli->cl_max_pages_per_rpc)
1706                         pages_per_brw = cli->cl_max_pages_per_rpc;
1707                 else
1708                         pages_per_brw = page_count;
1709
1710                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1711
1712                 if (saved_oa != NULL) {
1713                         /* restore previously saved oa */
1714                         *oinfo->oi_oa = *saved_oa;
1715                 } else if (page_count > pages_per_brw) {
1716                         /* save a copy of oa (brw will clobber it) */
1717                         OBDO_ALLOC(saved_oa);
1718                         if (saved_oa == NULL)
1719                                 GOTO(out, rc = -ENOMEM);
1720                         *saved_oa = *oinfo->oi_oa;
1721                 }
1722
1723                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1724                                       pages_per_brw, ppga, oinfo->oi_capa);
1725
1726                 if (rc != 0)
1727                         break;
1728
1729                 page_count -= pages_per_brw;
1730                 ppga += pages_per_brw;
1731         }
1732
1733 out:
1734         osc_release_ppga(orig, page_count_orig);
1735
1736         if (saved_oa != NULL)
1737                 OBDO_FREE(saved_oa);
1738
1739         RETURN(rc);
1740 }
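
/*
 * The per-RPC page budget used by the loop in osc_brw() above, pulled out
 * as a standalone sketch (a hypothetical helper, not in the original):
 * cap the window at cl_max_pages_per_rpc, then clip it to the longest
 * unfragmented prefix so each RPC can move in a single RDMA transfer.
 *
 *     static obd_count pages_for_next_rpc(struct brw_page **ppga,
 *                                         obd_count remaining,
 *                                         obd_count max_per_rpc)
 *     {
 *             obd_count n = remaining < max_per_rpc ? remaining
 *                                                   : max_per_rpc;
 *
 *             return max_unfragmented_pages(ppga, n);
 *     }
 */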
1741
1742 static int osc_brw_async(int cmd, struct obd_export *exp,
1743                          struct obd_info *oinfo, obd_count page_count,
1744                          struct brw_page *pga, struct obd_trans_info *oti,
1745                          struct ptlrpc_request_set *set)
1746 {
1747         struct brw_page **ppga, **orig;
1748         struct client_obd *cli = &exp->exp_obd->u.cli;
1749         int page_count_orig;
1750         int rc = 0;
1751         ENTRY;
1752
1753         if (cmd & OBD_BRW_CHECK) {
1754                 struct obd_import *imp = class_exp2cliimp(exp);
1755                 /* The caller just wants to know if there's a chance that this
1756                  * I/O can succeed */
1757
1758                 if (imp == NULL || imp->imp_invalid)
1759                         RETURN(-EIO);
1760                 RETURN(0);
1761         }
1762
1763         orig = ppga = osc_build_ppga(pga, page_count);
1764         if (ppga == NULL)
1765                 RETURN(-ENOMEM);
1766         page_count_orig = page_count;
1767
1768         sort_brw_pages(ppga, page_count);
1769         while (page_count) {
1770                 struct brw_page **copy;
1771                 obd_count pages_per_brw;
1772
1773                 pages_per_brw = min_t(obd_count, page_count,
1774                                       cli->cl_max_pages_per_rpc);
1775
1776                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1777
1778                 /* use ppga only if single RPC is going to fly */
1779                 if (pages_per_brw != page_count_orig || ppga != orig) {
1780                         OBD_ALLOC(copy, sizeof(*copy) * pages_per_brw);
1781                         if (copy == NULL)
1782                                 GOTO(out, rc = -ENOMEM);
1783                         memcpy(copy, ppga, sizeof(*copy) * pages_per_brw);
1784                 } else
1785                         copy = ppga;
1786
1787                 rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1788                                     pages_per_brw, copy, set, oinfo->oi_capa);
1789
1790                 if (rc != 0) {
1791                         if (copy != ppga)
1792                                 OBD_FREE(copy, sizeof(*copy) * pages_per_brw);
1793                         break;
1794                 }
1795                 if (copy == orig) {
1796                         /* we passed it to async_internal() which is
1797                          * now responsible for releasing memory */
1798                         orig = NULL;
1799                 }
1800
1801                 page_count -= pages_per_brw;
1802                 ppga += pages_per_brw;
1803         }
1804 out:
1805         if (orig)
1806                 osc_release_ppga(orig, page_count_orig);
1807         RETURN(rc);
1808 }
1809
1810 static void osc_check_rpcs(struct client_obd *cli);
1811
1812 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1813  * the dirty accounting: either writeback completed or a truncate happened
1814  * before writing started.  Must be called with the loi lock held. */
1815 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1816                            int sent)
1817 {
1818         osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1819 }
1820
1821
1822 /* This maintains the lists of pending pages to read/write for a given object
1823  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1824  * to quickly find objects that are ready to send an RPC. */
1825 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1826                          int cmd)
1827 {
1828         int optimal;
1829         ENTRY;
1830
1831         if (lop->lop_num_pending == 0)
1832                 RETURN(0);
1833
1834         /* if we have an invalid import we want to drain the queued pages
1835          * by forcing them through rpcs that immediately fail and complete
1836          * the pages.  recovery relies on this to empty the queued pages
1837          * before canceling the locks and evicting the llite pages */
1838         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1839                 RETURN(1);
1840
1841         /* stream rpcs in queue order as long as there is an urgent page
1842          * queued.  this is our cheap solution for good batching in the case
1843          * where writepage marks some random page in the middle of the file
1844          * as urgent because of, say, memory pressure */
1845         if (!list_empty(&lop->lop_urgent)) {
1846                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1847                 RETURN(1);
1848         }
1849         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1850         optimal = cli->cl_max_pages_per_rpc;
1851         if (cmd & OBD_BRW_WRITE) {
1852                 /* trigger a write rpc stream as long as there are dirtiers
1853                  * waiting for space.  as they're waiting, they're not going to
1854                  * create more pages to coalesce with what's waiting... */
1855                 if (!list_empty(&cli->cl_cache_waiters)) {
1856                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1857                         RETURN(1);
1858                 }
1859                 /* +16 to avoid triggering rpcs that would want to include pages
1860                  * that are being queued but which can't be made ready until
1861                  * the queuer finishes with the page. this is a wart for
1862                  * llite::commit_write() */
1863                 optimal += 16;
1864         }
1865         if (lop->lop_num_pending >= optimal)
1866                 RETURN(1);
1867
1868         RETURN(0);
1869 }
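
/*
 * To summarize the triggers above: lop_makes_rpc() returns 1 when the
 * import is invalid (drain everything), when an urgent page is queued,
 * when a write stream has cache waiters, or when lop_num_pending reaches
 * the 'optimal' size (cl_max_pages_per_rpc, plus 16 slack pages for
 * writes); otherwise the pages keep accumulating.
 */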
1870
1871 static void on_list(struct list_head *item, struct list_head *list,
1872                     int should_be_on)
1873 {
1874         if (list_empty(item) && should_be_on)
1875                 list_add_tail(item, list);
1876         else if (!list_empty(item) && !should_be_on)
1877                 list_del_init(item);
1878 }
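
/*
 * For example, on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list, 1)
 * links the item only if it is not already on a list, and passing 0
 * unlinks it if present; callers like loi_list_maint() below can thus
 * state the desired membership without checking it first.
 */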
1879
1880 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1881  * can find pages to build into rpcs quickly */
1882 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1883 {
1884         on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
1885                 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
1886                 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1887
1888         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1889                 loi->loi_write_lop.lop_num_pending);
1890
1891         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1892                 loi->loi_read_lop.lop_num_pending);
1893 }
1894
1895 static void lop_update_pending(struct client_obd *cli,
1896                                struct loi_oap_pages *lop, int cmd, int delta)
1897 {
1898         lop->lop_num_pending += delta;
1899         if (cmd & OBD_BRW_WRITE)
1900                 cli->cl_pending_w_pages += delta;
1901         else
1902                 cli->cl_pending_r_pages += delta;
1903 }
1904
1905 /* this is called when a sync waiter receives an interruption.  Its job is to
1906  * get the caller woken as soon as possible.  If its page hasn't been put in an
1907  * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
1908  * desiring interruption which will forcefully complete the rpc once the rpc
1909  * has timed out */
1910 static void osc_occ_interrupted(struct oig_callback_context *occ)
1911 {
1912         struct osc_async_page *oap;
1913         struct loi_oap_pages *lop;
1914         struct lov_oinfo *loi;
1915         ENTRY;
1916
1917         /* XXX member_of() */
1918         oap = list_entry(occ, struct osc_async_page, oap_occ);
1919
1920         client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);
1921
1922         oap->oap_interrupted = 1;
1923
1924         /* ok, it's been put in an rpc. only one oap gets a request reference */
1925         if (oap->oap_request != NULL) {
1926                 ptlrpc_mark_interrupted(oap->oap_request);
1927                 ptlrpcd_wake(oap->oap_request);
1928                 GOTO(unlock, 0);
1929         }
1930
1931         /* we don't get interruption callbacks until osc_trigger_group_io()
1932          * has been called and put the sync oaps in the pending/urgent lists.*/
1933         if (!list_empty(&oap->oap_pending_item)) {
1934                 list_del_init(&oap->oap_pending_item);
1935                 list_del_init(&oap->oap_urgent_item);
1936
1937                 loi = oap->oap_loi;
1938                 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
1939                         &loi->loi_write_lop : &loi->loi_read_lop;
1940                 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
1941                 loi_list_maint(oap->oap_cli, oap->oap_loi);
1942
1943                 oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
1944                 oap->oap_oig = NULL;
1945         }
1946
1947 unlock:
1948         client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
1949 }
1950
1951 /* this is trying to propagate async writeback errors back up to the
1952  * application.  As an async write fails we record the error code for later if
1953  * the app does an fsync.  As long as errors persist we force future rpcs to be
1954  * sync so that the app can get a sync error and break the cycle of queueing
1955  * pages for which writeback will fail. */
1956 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
1957                            int rc)
1958 {
1959         if (rc) {
1960                 if (!ar->ar_rc)
1961                         ar->ar_rc = rc;
1962
1963                 ar->ar_force_sync = 1;
1964                 ar->ar_min_xid = ptlrpc_sample_next_xid();
1965                 return;
1966         }
1968
1969         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
1970                 ar->ar_force_sync = 0;
1971 }
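
/*
 * Example timeline: a write with xid 100 fails, so ar_rc records the
 * error, ar_force_sync is set and ar_min_xid is sampled from the next
 * xid.  Later writes are forced synchronous until a write with
 * xid >= ar_min_xid completes cleanly, at which point ar_force_sync is
 * cleared and cached writeback resumes.
 */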
1972
1973 static void osc_oap_to_pending(struct osc_async_page *oap)
1974 {
1975         struct loi_oap_pages *lop;
1976
1977         if (oap->oap_cmd & OBD_BRW_WRITE)
1978                 lop = &oap->oap_loi->loi_write_lop;
1979         else
1980                 lop = &oap->oap_loi->loi_read_lop;
1981
1982         if (oap->oap_async_flags & ASYNC_URGENT)
1983                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1984         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1985         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
1986 }
1987
1988 /* this must be called holding the loi list lock to give coverage to exit_cache,
1989  * async_flag maintenance, and oap_request */
1990 static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
1991                               struct osc_async_page *oap, int sent, int rc)
1992 {
1993         __u64 xid = 0;
1994
1995         ENTRY;
1996         if (oap->oap_request != NULL) {
1997                 xid = ptlrpc_req_xid(oap->oap_request);
1998                 ptlrpc_req_finished(oap->oap_request);
1999                 oap->oap_request = NULL;
2000         }
2001
2002         oap->oap_async_flags = 0;
2003         oap->oap_interrupted = 0;
2004
2005         if (oap->oap_cmd & OBD_BRW_WRITE) {
2006                 osc_process_ar(&cli->cl_ar, xid, rc);
2007                 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
2008         }
2009
2010         if (rc == 0 && oa != NULL) {
2011                 if (oa->o_valid & OBD_MD_FLBLOCKS)
2012                         oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2013                 if (oa->o_valid & OBD_MD_FLMTIME)
2014                         oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2015                 if (oa->o_valid & OBD_MD_FLATIME)
2016                         oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2017                 if (oa->o_valid & OBD_MD_FLCTIME)
2018                         oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2019         }
2020
2021         if (oap->oap_oig) {
2022                 osc_exit_cache(cli, oap, sent);
2023                 oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
2024                 oap->oap_oig = NULL;
2025                 EXIT;
2026                 return;
2027         }
2028
2029         rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
2030                                                 oap->oap_cmd, oa, rc);
2031
2032         /* ll_ap_completion (from llite) drops PG_locked, so new
2033          * I/O on the page could start; but OSC calls it under the loi
2034          * list lock and thus we can safely add the oap back to pending */
2035         if (rc)
2036                 /* upper layer wants to leave the page on pending queue */
2037                 osc_oap_to_pending(oap);
2038         else
2039                 osc_exit_cache(cli, oap, sent);
2040         EXIT;
2041 }
2042
2043 static int brw_interpret(struct ptlrpc_request *req, void *data, int rc)
2044 {
2045         struct osc_brw_async_args *aa = data;
2046         struct client_obd *cli;
2047         ENTRY;
2048
2049         rc = osc_brw_fini_request(req, rc);
2050         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2051         if (osc_recoverable_error(rc)) {
2052                 rc = osc_brw_redo_request(req, aa);
2053                 if (rc == 0)
2054                         RETURN(0);
2055         }
2056
2057         cli = aa->aa_cli;
2058
2059         client_obd_list_lock(&cli->cl_loi_list_lock);
2060
2061         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2062          * is called so we know whether to go to sync BRWs or wait for more
2063          * RPCs to complete */
2064         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2065                 cli->cl_w_in_flight--;
2066         else
2067                 cli->cl_r_in_flight--;
2068
2069         if (!list_empty(&aa->aa_oaps)) { /* from osc_send_oap_rpc() */
2070                 struct osc_async_page *oap, *tmp;
2071                 /* the caller may re-use the oap after the completion call so
2072                  * we need to clean it up a little */
2073                 list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
2074                         list_del_init(&oap->oap_rpc_item);
2075                         osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
2076                 }
2077                 OBDO_FREE(aa->aa_oa);
2078         } else { /* from async_internal() */
2079                 int i;
2080                 for (i = 0; i < aa->aa_page_count; i++)
2081                         osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
2082         }
2083         osc_wake_cache_waiters(cli);
2084         osc_check_rpcs(cli);
2085         client_obd_list_unlock(&cli->cl_loi_list_lock);
2086
2087         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2088         RETURN(rc);
2089 }
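
/*
 * brw_interpret() runs in ptlrpcd context: it finishes the bulk, retries
 * recoverable errors via osc_brw_redo_request(), then under the loi list
 * lock decrements the in-flight count, completes or releases the pages,
 * wakes cache waiters and checks whether more RPCs can be launched.
 */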
2090
2091 static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
2092                                             struct list_head *rpc_list,
2093                                             int page_count, int cmd)
2094 {
2095         struct ptlrpc_request *req;
2096         struct brw_page **pga = NULL;
2097         struct osc_brw_async_args *aa;
2098         struct obdo *oa = NULL;
2099         struct obd_async_page_ops *ops = NULL;
2100         void *caller_data = NULL;
2101         struct obd_capa *ocapa;
2102         struct osc_async_page *oap;
2103         int i, rc;
2104
2105         ENTRY;
2106         LASSERT(!list_empty(rpc_list));
2107
2108         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2109         if (pga == NULL)
2110                 RETURN(ERR_PTR(-ENOMEM));
2111
2112         OBDO_ALLOC(oa);
2113         if (oa == NULL)
2114                 GOTO(out, req = ERR_PTR(-ENOMEM));
2115
2116         i = 0;
2117         list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2118                 if (ops == NULL) {
2119                         ops = oap->oap_caller_ops;
2120                         caller_data = oap->oap_caller_data;
2121                 }
2122                 pga[i] = &oap->oap_brw_page;
2123                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2124                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2125                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2126                 i++;
2127         }
2128
2129         /* always get the data for the obdo for the rpc */
2130         LASSERT(ops != NULL);
2131         ops->ap_fill_obdo(caller_data, cmd, oa);
2132         ocapa = ops->ap_lookup_capa(caller_data, cmd);
2133
2134         sort_brw_pages(pga, page_count);
2135         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2136                                   pga, &req, ocapa);
2137         capa_put(ocapa);
2138         if (rc != 0) {
2139                 CERROR("prep_req failed: %d\n", rc);
2140                 GOTO(out, req = ERR_PTR(rc));
2141         }
2142
2143         /* Need to update the timestamps after the request is built in case
2144          * we race with setattr (locally or in queue at OST).  If OST gets
2145          * later setattr before earlier BRW (as determined by the request xid),
2146          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2147          * way to do this in a single call.  bug 10150 */
2148         ops->ap_update_obdo(caller_data, cmd, oa,
2149                             OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);
2150
2151         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2152         aa = ptlrpc_req_async_args(req);
2153         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2154         list_splice(rpc_list, &aa->aa_oaps);
2155         CFS_INIT_LIST_HEAD(rpc_list);
2156
2157 out:
2158         if (IS_ERR(req)) {
2159                 if (oa)
2160                         OBDO_FREE(oa);
2161                 if (pga)
2162                         OBD_FREE(pga, sizeof(*pga) * page_count);
2163         }
2164         RETURN(req);
2165 }
2166
2167 /* the loi lock is held across this function but it's allowed to release
2168  * and reacquire it during its work */
2169 /**
2170  * Prepare pages for async I/O and put them in the send queue.
2171  *
2172  * \param cli - client_obd the pages belong to
2173  * \param loi - per-object state for the pending pages
2174  * \param cmd - OBD_BRW_* flags
2175  * \param lop - pending pages
2176  *
2177  * \return zero if the pages were successfully added to the send queue.
2178  * \return nonzero if an error occurred.
2179  */
2180 static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
2181                             int cmd, struct loi_oap_pages *lop)
2182 {
2183         struct ptlrpc_request *req;
2184         obd_count page_count = 0;
2185         struct osc_async_page *oap = NULL, *tmp;
2186         struct osc_brw_async_args *aa;
2187         struct obd_async_page_ops *ops;
2188         CFS_LIST_HEAD(rpc_list);
2189         unsigned int ending_offset;
2190         unsigned  starting_offset = 0;
2191         int srvlock = 0;
2192         ENTRY;
2193
2194         /* first we find the pages we're allowed to work with */
2195         list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2196                                  oap_pending_item) {
2197                 ops = oap->oap_caller_ops;
2198
2199                 LASSERT(oap->oap_magic == OAP_MAGIC);
2200
2201                 if (page_count != 0 &&
2202                     srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2203                         CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2204                                " oap %p, page %p, srvlock %u\n",
2205                                oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2206                         break;
2207                 }
2208                 /* in llite being 'ready' equates to the page being locked
2209                  * until completion unlocks it.  commit_write submits a page
2210                  * as not ready because its unlock will happen unconditionally
2211                  * as the call returns.  if we race with commit_write giving
2212                  * us that page we don't want to create a hole in the page
2213                  * stream, so we stop and leave the rpc to be fired by
2214                  * another dirtier or kupdated interval (the not ready page
2215                  * will still be on the dirty list).  we could call in
2216                  * at the end of ll_file_write to process the queue again. */
2217                 if (!(oap->oap_async_flags & ASYNC_READY)) {
2218                         int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
2219                         if (rc < 0)
2220                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
2221                                                 "instead of ready\n", oap,
2222                                                 oap->oap_page, rc);
2223                         switch (rc) {
2224                         case -EAGAIN:
2225                                 /* llite is telling us that the page is still
2226                                  * in commit_write and that we should try
2227                                  * and put it in an rpc again later.  we
2228                                  * break out of the loop so we don't create
2229                                  * a hole in the sequence of pages in the rpc
2230                                  * stream.*/
2231                                 oap = NULL;
2232                                 break;
2233                         case -EINTR:
2234                                 /* the io isn't needed.. tell the checks
2235                                  * below to complete the rpc with EINTR */
2236                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2237                                 oap->oap_count = -EINTR;
2238                                 break;
2239                         case 0:
2240                                 oap->oap_async_flags |= ASYNC_READY;
2241                                 break;
2242                         default:
2243                                 LASSERTF(0, "oap %p page %p returned %d "
2244                                             "from make_ready\n", oap,
2245                                             oap->oap_page, rc);
2246                                 break;
2247                         }
2248                 }
2249                 if (oap == NULL)
2250                         break;
2251                 /*
2252                  * Page submitted for IO has to be locked. Either by
2253                  * ->ap_make_ready() or by higher layers.
2254                  */
2255 #if defined(__KERNEL__) && defined(__linux__)
2256                 if (!(PageLocked(oap->oap_page) &&
2257                     (CheckWriteback(oap->oap_page, cmd) || oap->oap_oig != NULL))) {
2258                         CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2259                                oap->oap_page, (long)oap->oap_page->flags, oap->oap_async_flags);
2260                         LBUG();
2261                 }
2262 #endif
2263                 /* If there is a gap at the start of this page, it can't merge
2264                  * with any previous page, so we'll hand the network a
2265                  * "fragmented" page array that it can't transfer in 1 RDMA */
2266                 if (page_count != 0 && oap->oap_page_off != 0)
2267                         break;
2268
2269                 /* take the page out of our book-keeping */
2270                 list_del_init(&oap->oap_pending_item);
2271                 lop_update_pending(cli, lop, cmd, -1);
2272                 list_del_init(&oap->oap_urgent_item);
2273
2274                 if (page_count == 0)
2275                         starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2276                                           (PTLRPC_MAX_BRW_SIZE - 1);
2277
2278                 /* ask the caller for the size of the io as the rpc leaves. */
2279                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
2280                         oap->oap_count =
2281                                 ops->ap_refresh_count(oap->oap_caller_data,cmd);
2282                 if (oap->oap_count <= 0) {
2283                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2284                                oap->oap_count);
2285                         osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
2286                         continue;
2287                 }
2288
2289                 /* now put the page back in our accounting */
2290                 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2291                 if (page_count == 0)
2292                         srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2293                 if (++page_count >= cli->cl_max_pages_per_rpc)
2294                         break;
2295
2296                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
2297                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2298                  * have the same alignment as the initial writes that allocated
2299                  * extents on the server. */
2300                 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2301                                  oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2302                 if (ending_offset == 0)
2303                         break;
2304
2305                 /* If there is a gap at the end of this page, it can't merge
2306                  * with any subsequent pages, so we'll hand the network a
2307                  * "fragmented" page array that it can't transfer in 1 RDMA */
2308                 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2309                         break;
2310         }
2311
2312         osc_wake_cache_waiters(cli);
2313
2314         if (page_count == 0)
2315                 RETURN(0);
2316
2317         loi_list_maint(cli, loi);
2318
2319         client_obd_list_unlock(&cli->cl_loi_list_lock);
2320
2321         req = osc_build_req(cli, &rpc_list, page_count, cmd);
2322         if (IS_ERR(req)) {
2323                 /* this should happen rarely and is pretty bad, it makes the
2324                  * pending list not follow the dirty order */
2325                 client_obd_list_lock(&cli->cl_loi_list_lock);
2326                 list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
2327                         list_del_init(&oap->oap_rpc_item);
2328
2329                         /* queued sync pages can be torn down while they
2330                          * sit between the pending list and the rpc */
2331                         if (oap->oap_interrupted) {
2332                                 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2333                                 osc_ap_completion(cli, NULL, oap, 0,
2334                                                   oap->oap_count);
2335                                 continue;
2336                         }
2337                         osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
2338                 }
2339                 loi_list_maint(cli, loi);
2340                 RETURN(PTR_ERR(req));
2341         }
2342
2343         aa = ptlrpc_req_async_args(req);
2344
2345         if (cmd == OBD_BRW_READ) {
2346                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2347                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2348                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2349                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2350                 ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
2351         } else {
2352                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2353                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2354                                  cli->cl_w_in_flight);
2355                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2356                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2357                 ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
2358         }
2359
2360         client_obd_list_lock(&cli->cl_loi_list_lock);
2361
2362         if (cmd == OBD_BRW_READ)
2363                 cli->cl_r_in_flight++;
2364         else
2365                 cli->cl_w_in_flight++;
2366
2367         /* queued sync pages can be torn down while they sit
2368          * between the pending list and the rpc */
2369         tmp = NULL;
2370         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2371                 /* only one oap gets a request reference */
2372                 if (tmp == NULL)
2373                         tmp = oap;
2374                 if (oap->oap_interrupted && !req->rq_intr) {
2375                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2376                                oap, req);
2377                         ptlrpc_mark_interrupted(req);
2378                 }
2379         }
2380         if (tmp != NULL)
2381                 tmp->oap_request = ptlrpc_request_addref(req);
2382
2383         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2384                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2385
2386         req->rq_interpret_reply = brw_interpret;
2387         ptlrpcd_add_req(req);
2388         RETURN(1);
2389 }
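
/*
 * osc_send_oap_rpc() in outline: (1) under the loi list lock, collect a
 * run of ready pages from lop_pending that share the same SRVLOCK mode
 * and stay contiguous, stopping at cl_max_pages_per_rpc pages or at a
 * PTLRPC_MAX_BRW_SIZE boundary; (2) drop the lock and build the request
 * from the private rpc_list; (3) retake the lock, bump the in-flight
 * count, mark interrupted oaps on the request and hand it to ptlrpcd.
 */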
2390
2391 #define LOI_DEBUG(LOI, STR, args...)                                     \
2392         CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
2393                !list_empty(&(LOI)->loi_cli_item),                        \
2394                (LOI)->loi_write_lop.lop_num_pending,                     \
2395                !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
2396                (LOI)->loi_read_lop.lop_num_pending,                      \
2397                !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
2398                args)
2399
2400 /* This is called by osc_check_rpcs() to find which objects have pages that
2401  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2402 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2403 {
2404         ENTRY;
2405         /* first return all objects which we already know to have
2406          * pages ready to be stuffed into rpcs */
2407         if (!list_empty(&cli->cl_loi_ready_list))
2408                 RETURN(list_entry(cli->cl_loi_ready_list.next,
2409                                   struct lov_oinfo, loi_cli_item));
2410
2411         /* then if we have cache waiters, return all objects with queued
2412          * writes.  This is especially important when many small files
2413          * have filled up the cache and not been fired into rpcs because
2414          * they don't pass the nr_pending/object threshold */
2415         if (!list_empty(&cli->cl_cache_waiters) &&
2416             !list_empty(&cli->cl_loi_write_list))
2417                 RETURN(list_entry(cli->cl_loi_write_list.next,
2418                                   struct lov_oinfo, loi_write_item));
2419
2420         /* then return all queued objects when we have an invalid import
2421          * so that they get flushed */
2422         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2423                 if (!list_empty(&cli->cl_loi_write_list))
2424                         RETURN(list_entry(cli->cl_loi_write_list.next,
2425                                           struct lov_oinfo, loi_write_item));
2426                 if (!list_empty(&cli->cl_loi_read_list))
2427                         RETURN(list_entry(cli->cl_loi_read_list.next,
2428                                           struct lov_oinfo, loi_read_item));
2429         }
2430         RETURN(NULL);
2431 }
2432
2433 /* called with the loi list lock held */
2434 static void osc_check_rpcs(struct client_obd *cli)
2435 {
2436         struct lov_oinfo *loi;
2437         int rc = 0, race_counter = 0;
2438         ENTRY;
2439
2440         while ((loi = osc_next_loi(cli)) != NULL) {
2441                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2442
2443                 if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
2444                         break;
2445
2446                 /* attempt some read/write balancing by alternating between
2447                  * reads and writes on an object.  The makes_rpc checks here
2448                  * would be redundant if we were getting read/write work items
2449                  * instead of objects.  we don't want send_oap_rpc to drain a
2450                  * partially-filled read pending queue when this object was
2451                  * handed to us for writes while there are cache waiters */
2452                 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2453                         rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
2454                                               &loi->loi_write_lop);
2455                         if (rc < 0)
2456                                 break;
2457                         if (rc > 0)
2458                                 race_counter = 0;
2459                         else
2460                                 race_counter++;
2461                 }
2462                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2463                         rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
2464                                               &loi->loi_read_lop);
2465                         if (rc < 0)
2466                                 break;
2467                         if (rc > 0)
2468                                 race_counter = 0;
2469                         else
2470                                 race_counter++;
2471                 }
2472
2473                 /* attempt some inter-object balancing by issuing rpcs
2474                  * for each object in turn */
2475                 if (!list_empty(&loi->loi_cli_item))
2476                         list_del_init(&loi->loi_cli_item);
2477                 if (!list_empty(&loi->loi_write_item))
2478                         list_del_init(&loi->loi_write_item);
2479                 if (!list_empty(&loi->loi_read_item))
2480                         list_del_init(&loi->loi_read_item);
2481
2482                 loi_list_maint(cli, loi);
2483
2484                 /* send_oap_rpc returns 0 when make_ready tells it to
2485                  * back off.  llite's make_ready does this when it tries
2486                  * to lock a page queued for write that is already locked.
2487                  * we want to try sending rpcs from many objects, but we
2488                  * don't want to spin failing with 0.  */
2489                 if (race_counter == 10)
2490                         break;
2491         }
2492         EXIT;
2493 }
2494
2495 /* we're trying to queue a page in the osc so we're subject to the
2496  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2497  * If the osc's queued pages are already at that limit, then we want to sleep
2498  * until there is space in the osc's queue for us.  We also may be waiting for
2499  * write credits from the OST if there are RPCs in flight that may return some
2500  * before we fall back to sync writes.
2501  *
2502  * We need this to know whether our allocation was granted, even in the presence of signals */
2503 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2504 {
2505         int rc;
2506         ENTRY;
2507         client_obd_list_lock(&cli->cl_loi_list_lock);
2508         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2509         client_obd_list_unlock(&cli->cl_loi_list_lock);
2510         RETURN(rc);
2511 }
2512
2513 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2514  * grant or cache space. */
2515 static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
2516                            struct osc_async_page *oap)
2517 {
2518         struct osc_cache_waiter ocw;
2519         struct l_wait_info lwi = { 0 };
2520
2521         ENTRY;
2522
2523         CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2524                "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2525                cli->cl_dirty_max, obd_max_dirty_pages,
2526                cli->cl_lost_grant, cli->cl_avail_grant);
2527
2528         /* force the caller to try sync io.  this can jump the list
2529          * of queued writes and create a discontiguous rpc stream */
2530         if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2531             loi->loi_ar.ar_force_sync)
2532                 RETURN(-EDQUOT);
2533
2534         /* Hopefully normal case - cache space and write credits available */
2535         if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
2536             (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
2537             (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
2538                 /* account for ourselves */
2539                 osc_consume_write_grant(cli, &oap->oap_brw_page);
2540                 RETURN(0);
2541         }
2542
2543         /* Make sure that there are write rpcs in flight to wait for.  This
2544          * is a little silly as this object may not have any pending but
2545          * is a little silly as this object may not have any pending writes but
2546         if (cli->cl_w_in_flight) {
2547                 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2548                 cfs_waitq_init(&ocw.ocw_waitq);
2549                 ocw.ocw_oap = oap;
2550                 ocw.ocw_rc = 0;
2551
2552                 loi_list_maint(cli, loi);
2553                 osc_check_rpcs(cli);
2554                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2555
2556                 CDEBUG(D_CACHE, "sleeping for cache space\n");
2557                 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2558
2559                 client_obd_list_lock(&cli->cl_loi_list_lock);
2560                 if (!list_empty(&ocw.ocw_entry)) {
2561                         list_del(&ocw.ocw_entry);
2562                         RETURN(-EINTR);
2563                 }
2564                 RETURN(ocw.ocw_rc);
2565         }
2566
2567         RETURN(-EDQUOT);
2568 }
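
/*
 * The fast-path admission test above, restated as a standalone predicate
 * (a sketch with hypothetical names, not the kernel code): a page may
 * enter the cache only while the per-OSC dirty total, the global dirty
 * page count and the available grant all have room for one more page.
 *
 *     static int may_enter_cache(long dirty, long dirty_max,
 *                                int global_dirty, int global_max,
 *                                long avail_grant, long page_size)
 *     {
 *             return dirty + page_size <= dirty_max &&
 *                    global_dirty + 1 <= global_max &&
 *                    avail_grant >= page_size;
 *     }
 */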
2569
2570 /**
2571  * Checks if requested extent lock is compatible with a lock under the page.
2572  *
2573  * Checks if the lock under \a page is compatible with a read or write lock
2574  * (specified by \a rw) for an extent [\a start , \a end].
2575  *
2576  * \param exp osc export
2577  * \param lsm striping information for the file
2578  * \param res osc_async_page placeholder
2579  * \param rw OBD_BRW_READ if requested for reading,
2580  *           OBD_BRW_WRITE if requested for writing
2581  * \param start start of the requested extent
2582  * \param end end of the requested extent
2583  * \param cookie transparent parameter for passing locking context
2584  *
2585  * \post result == 1, *cookie == context, appropriate lock is referenced or
2586  * \post result == 0
2587  *
2588  * \retval 1 owned lock is reused for the request
2589  * \retval 0 no lock reused for the request
2590  *
2591  * \see osc_release_short_lock
2592  */
2593 static int osc_reget_short_lock(struct obd_export *exp,
2594                                 struct lov_stripe_md *lsm,
2595                                 void **res, int rw,
2596                                 obd_off start, obd_off end,
2597                                 void **cookie)
2598 {
2599         struct osc_async_page *oap = *res;
2600         int rc;
2601
2602         ENTRY;
2603
2604         spin_lock(&oap->oap_lock);
2605         rc = ldlm_lock_fast_match(oap->oap_ldlm_lock, rw,
2606                                   start, end, cookie);
2607         spin_unlock(&oap->oap_lock);
2608
2609         RETURN(rc);
2610 }
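
/*
 * Typical pairing (sketch): a caller tries osc_reget_short_lock() to
 * reuse the lock already caching the page; on success (return value 1)
 * it performs its I/O and then drops the reference with
 * osc_release_short_lock(), passing back the same cookie.
 */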
2611
2612 /**
2613  * Releases a reference to a lock taken in a "fast" way.
2614  *
2615  * Releases a read or a write (specified by \a rw) lock
2616  * referenced by \a cookie.
2617  *
2618  * \param exp osc export
2619  * \param lsm striping information for the file
2620  * \param end end of the locked extent
2621  * \param rw OBD_BRW_READ if requested for reading,
2622  *           OBD_BRW_WRITE if requested for writing
2623  * \param cookie transparent parameter for passing locking context
2624  *
2625  * \post appropriate lock is dereferenced
2626  *
2627  * \see osc_reget_short_lock
2628  */
2629 static int osc_release_short_lock(struct obd_export *exp,
2630                                   struct lov_stripe_md *lsm, obd_off end,
2631                                   void *cookie, int rw)
2632 {
2633         ENTRY;
2634         ldlm_lock_fast_release(cookie, rw);
2635         /* no error could have happened at this layer */
2636         RETURN(0);
2637 }
2638
2639 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2640                         struct lov_oinfo *loi, cfs_page_t *page,
2641                         obd_off offset, struct obd_async_page_ops *ops,
2642                         void *data, void **res, int nocache,
2643                         struct lustre_handle *lockh)
2644 {
2645         struct osc_async_page *oap;
2646         struct ldlm_res_id oid;
2647         int rc = 0;
2648         ENTRY;
2649
2650         if (!page)
2651                 return size_round(sizeof(*oap));
2652
2653         oap = *res;
2654         oap->oap_magic = OAP_MAGIC;
2655         oap->oap_cli = &exp->exp_obd->u.cli;
2656         oap->oap_loi = loi;
2657
2658         oap->oap_caller_ops = ops;
2659         oap->oap_caller_data = data;
2660
2661         oap->oap_page = page;
2662         oap->oap_obj_off = offset;
2663
2664         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2665         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2666         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2667         CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2668
2669         oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2670
2671         spin_lock_init(&oap->oap_lock);
2672
2673         /* If the page was marked as not cacheable, don't add it to any locks */
2674         if (!nocache) {
2675                 osc_build_res_name(loi->loi_id, loi->loi_gr, &oid);
2676                 /* This is the only place where we can call cache_add_extent
2677                    without oap_lock: the page is locked now, and the lock we
2678                    are adding it to is referenced, so it cannot lose any
2679                    pages either. */
2680                 rc = cache_add_extent(oap->oap_cli->cl_cache, &oid, oap, lockh);
2681                 if (rc)
2682                         RETURN(rc);
2683         }
2684
2685         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2686         RETURN(0);
2687 }
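/*
 * Editorial note: osc_prep_async_page() is a two-phase interface.  Called
 * with page == NULL it only reports how much storage the caller must reserve
 * for an osc_async_page; called again with *res pointing at that storage it
 * initializes the oap.  A sketch of a hypothetical caller (not compiled):
 */
#if 0
        int oap_size = osc_prep_async_page(exp, lsm, loi, NULL, 0,
                                           NULL, NULL, NULL, 0, NULL);
        /* ... caller reserves oap_size bytes and points res at them ... */
        rc = osc_prep_async_page(exp, lsm, loi, page, offset,
                                 ops, data, &res, 0, lockh);
#endif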
2688
2689 struct osc_async_page *oap_from_cookie(void *cookie)
2690 {
2691         struct osc_async_page *oap = cookie;
2692         if (oap->oap_magic != OAP_MAGIC)
2693                 return ERR_PTR(-EINVAL);
2694         return oap;
2695 }
2696
2697 static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2698                               struct lov_oinfo *loi, void *cookie,
2699                               int cmd, obd_off off, int count,
2700                               obd_flag brw_flags, enum async_flags async_flags)
2701 {
2702         struct client_obd *cli = &exp->exp_obd->u.cli;
2703         struct osc_async_page *oap;
2704         int rc = 0;
2705         ENTRY;
2706
2707         oap = oap_from_cookie(cookie);
2708         if (IS_ERR(oap))
2709                 RETURN(PTR_ERR(oap));
2710
2711         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2712                 RETURN(-EIO);
2713
2714         if (!list_empty(&oap->oap_pending_item) ||
2715             !list_empty(&oap->oap_urgent_item) ||
2716             !list_empty(&oap->oap_rpc_item))
2717                 RETURN(-EBUSY);
2718
2719         /* check if the file's owner/group is over quota */
2720 #ifdef HAVE_QUOTA_SUPPORT
2721         if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
2722                 struct obd_async_page_ops *ops;
2723                 struct obdo *oa;
2724
2725                 OBDO_ALLOC(oa);
2726                 if (oa == NULL)
2727                         RETURN(-ENOMEM);
2728
2729                 ops = oap->oap_caller_ops;
2730                 ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
2731                 if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
2732                     NO_QUOTA)
2733                         rc = -EDQUOT;
2734
2735                 OBDO_FREE(oa);
2736                 if (rc)
2737                         RETURN(rc);
2738         }
2739 #endif
2740
2741         if (loi == NULL)
2742                 loi = lsm->lsm_oinfo[0];
2743
2744         client_obd_list_lock(&cli->cl_loi_list_lock);
2745
2746         oap->oap_cmd = cmd;
2747         oap->oap_page_off = off;
2748         oap->oap_count = count;
2749         oap->oap_brw_flags = brw_flags;
2750         oap->oap_async_flags = async_flags;
2751
2752         if (cmd & OBD_BRW_WRITE) {
2753                 rc = osc_enter_cache(cli, loi, oap);
2754                 if (rc) {
2755                         client_obd_list_unlock(&cli->cl_loi_list_lock);
2756                         RETURN(rc);
2757                 }
2758         }
2759
2760         osc_oap_to_pending(oap);
2761         loi_list_maint(cli, loi);
2762
2763         LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2764                   cmd);
2765
2766         osc_check_rpcs(cli);
2767         client_obd_list_unlock(&cli->cl_loi_list_lock);
2768
2769         RETURN(0);
2770 }
2771
2772 /* aka (~was & now & flag), but this is more clear :) */
2773 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
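/*
 * Editorial examples: SETTING() is true only for a 0 -> 1 transition of
 * `flag` between the old and the new flag masks:
 *
 *   SETTING(0,           ASYNC_READY, ASYNC_READY) == 1
 *   SETTING(ASYNC_READY, ASYNC_READY, ASYNC_READY) == 0  (already set)
 *   SETTING(ASYNC_READY, 0,           ASYNC_READY) == 0  (not being set)
 */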
2774
2775 static int osc_set_async_flags(struct obd_export *exp,
2776                                struct lov_stripe_md *lsm,
2777                                struct lov_oinfo *loi, void *cookie,
2778                                obd_flag async_flags)
2779 {
2780         struct client_obd *cli = &exp->exp_obd->u.cli;
2781         struct loi_oap_pages *lop;
2782         struct osc_async_page *oap;
2783         int rc = 0;
2784         ENTRY;
2785
2786         oap = oap_from_cookie(cookie);
2787         if (IS_ERR(oap))
2788                 RETURN(PTR_ERR(oap));
2789
2790         /*
2791          * bug 7311: OST-side locking is only supported by liblustre for now
2792          * (and liblustre never calls obd_set_async_flags(), we hope); a
2793          * generic implementation would have to handle the case where an
2794          * OST-locked page was picked up by, e.g., ->writepage().
2795          */
2796         LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
2797         LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
2798                                      * tread here. */
2799
2800         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2801                 RETURN(-EIO);
2802
2803         if (loi == NULL)
2804                 loi = lsm->lsm_oinfo[0];
2805
2806         if (oap->oap_cmd & OBD_BRW_WRITE) {
2807                 lop = &loi->loi_write_lop;
2808         } else {
2809                 lop = &loi->loi_read_lop;
2810         }
2811
2812         client_obd_list_lock(&cli->cl_loi_list_lock);
2813
2814         if (list_empty(&oap->oap_pending_item))
2815                 GOTO(out, rc = -EINVAL);
2816
2817         if ((oap->oap_async_flags & async_flags) == async_flags)
2818                 GOTO(out, rc = 0);
2819
2820         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2821                 oap->oap_async_flags |= ASYNC_READY;
2822
2823         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
2824                 if (list_empty(&oap->oap_rpc_item)) {
2825                         list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2826                         loi_list_maint(cli, loi);
2827                 }
2828         }
2829
2830         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2831                         oap->oap_async_flags);
2832 out:
2833         osc_check_rpcs(cli);
2834         client_obd_list_unlock(&cli->cl_loi_list_lock);
2835         RETURN(rc);
2836 }
2837
2838 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2839                              struct lov_oinfo *loi,
2840                              struct obd_io_group *oig, void *cookie,
2841                              int cmd, obd_off off, int count,
2842                              obd_flag brw_flags,
2843                              obd_flag async_flags)
2844 {
2845         struct client_obd *cli = &exp->exp_obd->u.cli;
2846         struct osc_async_page *oap;
2847         struct loi_oap_pages *lop;
2848         int rc = 0;
2849         ENTRY;
2850
2851         oap = oap_from_cookie(cookie);
2852         if (IS_ERR(oap))
2853                 RETURN(PTR_ERR(oap));
2854
2855         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2856                 RETURN(-EIO);
2857
2858         if (!list_empty(&oap->oap_pending_item) ||
2859             !list_empty(&oap->oap_urgent_item) ||
2860             !list_empty(&oap->oap_rpc_item))
2861                 RETURN(-EBUSY);
2862
2863         if (loi == NULL)
2864                 loi = lsm->lsm_oinfo[0];
2865
2866         client_obd_list_lock(&cli->cl_loi_list_lock);
2867
2868         oap->oap_cmd = cmd;
2869         oap->oap_page_off = off;
2870         oap->oap_count = count;
2871         oap->oap_brw_flags = brw_flags;
2872         oap->oap_async_flags = async_flags;
2873
2874         if (cmd & OBD_BRW_WRITE)
2875                 lop = &loi->loi_write_lop;
2876         else
2877                 lop = &loi->loi_read_lop;
2878
2879         list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
2880         if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
2881                 oap->oap_oig = oig;
2882                 rc = oig_add_one(oig, &oap->oap_occ);
2883         }
2884
2885         LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
2886                   oap, oap->oap_page, rc);
2887
2888         client_obd_list_unlock(&cli->cl_loi_list_lock);
2889
2890         RETURN(rc);
2891 }
2892
2893 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2894                                  struct loi_oap_pages *lop, int cmd)
2895 {
2896         struct list_head *pos, *tmp;
2897         struct osc_async_page *oap;
2898
2899         list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2900                 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2901                 list_del(&oap->oap_pending_item);
2902                 osc_oap_to_pending(oap);
2903         }
2904         loi_list_maint(cli, loi);
2905 }
2906
2907 static int osc_trigger_group_io(struct obd_export *exp,
2908                                 struct lov_stripe_md *lsm,
2909                                 struct lov_oinfo *loi,
2910                                 struct obd_io_group *oig)
2911 {
2912         struct client_obd *cli = &exp->exp_obd->u.cli;
2913         ENTRY;
2914
2915         if (loi == NULL)
2916                 loi = lsm->lsm_oinfo[0];
2917
2918         client_obd_list_lock(&cli->cl_loi_list_lock);
2919
2920         osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
2921         osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
2922
2923         osc_check_rpcs(cli);
2924         client_obd_list_unlock(&cli->cl_loi_list_lock);
2925
2926         RETURN(0);
2927 }
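/*
 * Editorial sketch of the group I/O protocol built from the functions above
 * (hypothetical caller, not compiled): pages are first queued on the group's
 * pending list, then the whole group is kicked into the regular pending/RPC
 * machinery at once and the caller waits on the oig.
 */
#if 0
        for (i = 0; i < npages; i++)
                osc_queue_group_io(exp, lsm, loi, oig, cookies[i],
                                   OBD_BRW_READ, offs[i], counts[i],
                                   0, ASYNC_GROUP_SYNC);
        osc_trigger_group_io(exp, lsm, loi, oig);
        /* the caller then waits for completion, e.g. via oig_wait(oig) */
#endif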
2928
2929 static int osc_teardown_async_page(struct obd_export *exp,
2930                                    struct lov_stripe_md *lsm,
2931                                    struct lov_oinfo *loi, void *cookie)
2932 {
2933         struct client_obd *cli = &exp->exp_obd->u.cli;
2934         struct loi_oap_pages *lop;
2935         struct osc_async_page *oap;
2936         int rc = 0;
2937         ENTRY;
2938
2939         oap = oap_from_cookie(cookie);
2940         if (IS_ERR(oap))
2941                 RETURN(PTR_ERR(oap));
2942
2943         if (loi == NULL)
2944                 loi = lsm->lsm_oinfo[0];
2945
2946         if (oap->oap_cmd & OBD_BRW_WRITE) {
2947                 lop = &loi->loi_write_lop;
2948         } else {
2949                 lop = &loi->loi_read_lop;
2950         }
2951
2952         client_obd_list_lock(&cli->cl_loi_list_lock);
2953
2954         if (!list_empty(&oap->oap_rpc_item))
2955                 GOTO(out, rc = -EBUSY);
2956
2957         osc_exit_cache(cli, oap, 0);
2958         osc_wake_cache_waiters(cli);
2959
2960         if (!list_empty(&oap->oap_urgent_item)) {
2961                 list_del_init(&oap->oap_urgent_item);
2962                 oap->oap_async_flags &= ~ASYNC_URGENT;
2963         }
2964         if (!list_empty(&oap->oap_pending_item)) {
2965                 list_del_init(&oap->oap_pending_item);
2966                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2967         }
2968         loi_list_maint(cli, loi);
2969         cache_remove_extent(cli->cl_cache, oap);
2970
2971         LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
2972 out:
2973         client_obd_list_unlock(&cli->cl_loi_list_lock);
2974         RETURN(rc);
2975 }
2976
2977 int osc_extent_blocking_cb(struct ldlm_lock *lock,
2978                            struct ldlm_lock_desc *new, void *data,
2979                            int flag)
2980 {
2981         struct lustre_handle lockh = { 0 };
2982         int rc;
2983         ENTRY;
2984
2985         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
2986                 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
2987                 LBUG();
2988         }
2989
2990         switch (flag) {
2991         case LDLM_CB_BLOCKING:
2992                 ldlm_lock2handle(lock, &lockh);
2993                 rc = ldlm_cli_cancel(&lockh);
2994                 if (rc != ELDLM_OK)
2995                         CERROR("ldlm_cli_cancel failed: %d\n", rc);
2996                 break;
2997         case LDLM_CB_CANCELING: {
2998
2999                 ldlm_lock2handle(lock, &lockh);
3000                 /* This lock wasn't granted, don't try to do anything */
3001                 if (lock->l_req_mode != lock->l_granted_mode)
3002                         RETURN(0);
3003
3004                 cache_remove_lock(lock->l_conn_export->exp_obd->u.cli.cl_cache,
3005                                   &lockh);
3006
3007                 if (lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb)
3008                         lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb(
3009                                                           lock, new, data,flag);
3010                 break;
3011         }
3012         default:
3013                 LBUG();
3014         }
3015
3016         RETURN(0);
3017 }
3018 EXPORT_SYMBOL(osc_extent_blocking_cb);
3019
3020 static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
3021                                     int flags)
3022 {
3023         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
3024
3025         if (lock == NULL) {
3026                 CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
3027                 return;
3028         }
3029         lock_res_and_lock(lock);
3030 #if defined (__KERNEL__) && defined (__linux__)
3031         /* Liang XXX: Darwin and Winnt checking should be added */
3032         if (lock->l_ast_data && lock->l_ast_data != data) {
3033                 struct inode *new_inode = data;
3034                 struct inode *old_inode = lock->l_ast_data;
3035                 if (!(old_inode->i_state & I_FREEING))
3036                         LDLM_ERROR(lock, "inconsistent l_ast_data found");
3037                 LASSERTF(old_inode->i_state & I_FREEING,
3038                          "Found existing inode %p/%lu/%u state %lu in lock: "
3039                          "setting data to %p/%lu/%u\n", old_inode,
3040                          old_inode->i_ino, old_inode->i_generation,
3041                          old_inode->i_state,
3042                          new_inode, new_inode->i_ino, new_inode->i_generation);
3043         }
3044 #endif
3045         lock->l_ast_data = data;
3046         lock->l_flags |= (flags & LDLM_FL_NO_LRU);
3047         unlock_res_and_lock(lock);
3048         LDLM_LOCK_PUT(lock);
3049 }
3050
3051 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3052                              ldlm_iterator_t replace, void *data)
3053 {
3054         struct ldlm_res_id res_id;
3055         struct obd_device *obd = class_exp2obd(exp);
3056
3057         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
3058         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3059         return 0;
3060 }
3061
3062 static int osc_enqueue_fini(struct obd_device *obd, struct ptlrpc_request *req,
3063                             struct obd_info *oinfo, int intent, int rc)
3064 {
3065         ENTRY;
3066
3067         if (intent) {
3068                 /* The request was created before ldlm_cli_enqueue call. */
3069                 if (rc == ELDLM_LOCK_ABORTED) {
3070                         struct ldlm_reply *rep;
3071                         rep = req_capsule_server_get(&req->rq_pill,
3072                                                      &RMF_DLM_REP);
3073
3074                         LASSERT(rep != NULL);
3075                         if (rep->lock_policy_res1)
3076                                 rc = rep->lock_policy_res1;
3077                 }
3078         }
3079
3080         if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
3081                 CDEBUG(D_INODE, "got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
3082                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
3083                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
3084                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
3085         }
3086
3087         if (!rc)
3088                 cache_add_lock(obd->u.cli.cl_cache, oinfo->oi_lockh);
3089
3090         /* Call the update callback. */
3091         rc = oinfo->oi_cb_up(oinfo, rc);
3092         RETURN(rc);
3093 }
3094
3095 static int osc_enqueue_interpret(struct ptlrpc_request *req,
3096                                  struct osc_enqueue_args *aa, int rc)
3097 {
3098         int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
3099         struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
3100         struct ldlm_lock *lock;
3101
3102         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
3103          * be valid. */
3104         lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);
3105
3106         /* Complete obtaining the lock procedure. */
3107         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
3108                                    aa->oa_ei->ei_mode,
3109                                    &aa->oa_oi->oi_flags,
3110                                    &lsm->lsm_oinfo[0]->loi_lvb,
3111                                    sizeof(lsm->lsm_oinfo[0]->loi_lvb),
3112                                    lustre_swab_ost_lvb,
3113                                    aa->oa_oi->oi_lockh, rc);
3114
3115         /* Complete osc stuff. */
3116         rc = osc_enqueue_fini(aa->oa_exp->exp_obd, req, aa->oa_oi, intent, rc);
3117
3118         /* Release the lock for async request. */
3119         if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
3120                 ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);
3121
3122         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
3123                  aa->oa_oi->oi_lockh, req, aa);
3124         LDLM_LOCK_PUT(lock);
3125         return rc;
3126 }
3127
3128 /* When enqueuing asynchronously, locks are not ordered: we can obtain a lock
3129  * from the 2nd OSC before a lock from the 1st one.  This does not deadlock
3130  * with other synchronous requests, but holding some locks while trying to
3131  * obtain others may take a considerable amount of time in the case of an OST
3132  * failure; and when other sync requests cannot get a lock released by a
3133  * client, that client is evicted from the cluster -- such scenarios make
3134  * life difficult, so release locks just after they are obtained. */
3135 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3136                        struct ldlm_enqueue_info *einfo,
3137                        struct ptlrpc_request_set *rqset)
3138 {
3139         struct ldlm_res_id res_id;
3140         struct obd_device *obd = exp->exp_obd;
3141         struct ptlrpc_request *req = NULL;
3142         int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
3143         ldlm_mode_t mode;
3144         int rc;
3145         ENTRY;
3146
3148         osc_build_res_name(oinfo->oi_md->lsm_object_id,
3149                            oinfo->oi_md->lsm_object_gr, &res_id);
3150         /* Filesystem lock extents are extended to page boundaries so that
3151          * dealing with the page cache is a little smoother.  */
3152         oinfo->oi_policy.l_extent.start -=
3153                 oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
3154         oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;
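        /*
         * Editorial worked example, assuming 4K pages so ~CFS_PAGE_MASK ==
         * 0xfff: a requested extent [0x1234, 0x5678] becomes [0x1000, 0x5fff],
         * i.e. start is rounded down to its page boundary and end is extended
         * to the last byte of its page.
         */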
3155
3156         if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
3157                 goto no_match;
3158
3159         /* Next, search for already existing extent locks that will cover us */
3160         /* If we're trying to read, we also search for an existing PW lock.  The
3161          * VFS and page cache already protect us locally, so lots of readers/
3162          * writers can share a single PW lock.
3163          *
3164          * There are problems with conversion deadlocks, so instead of
3165          * converting a read lock to a write lock, we'll just enqueue a new
3166          * one.
3167          *
3168          * At some point we should cancel the read lock instead of making them
3169          * send us a blocking callback, but there are problems with canceling
3170          * locks out from other users right now, too. */
3171         mode = einfo->ei_mode;
3172         if (einfo->ei_mode == LCK_PR)
3173                 mode |= LCK_PW;
3174         mode = ldlm_lock_match(obd->obd_namespace,
3175                                oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
3176                                einfo->ei_type, &oinfo->oi_policy, mode,
3177                                oinfo->oi_lockh);
3178         if (mode) {
3179                 /* addref the lock only for sync requests when a PW lock
3180                  * was matched whereas we asked for PR. */
3181                 if (!rqset && einfo->ei_mode != mode)
3182                         ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
3183                 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
3184                                         oinfo->oi_flags);
3185                 if (intent) {
3186                         /* I would like to be able to ASSERT here that rss <=
3187                          * kms, but I can't, for reasons which are explained in
3188                          * lov_enqueue() */
3189                 }
3190
3191                 /* We already have a lock, and it's referenced */
3192                 oinfo->oi_cb_up(oinfo, ELDLM_OK);
3193
3194                 /* For async requests, decref the lock. */
3195                 if (einfo->ei_mode != mode)
3196                         ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
3197                 else if (rqset)
3198                         ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
3199
3200                 RETURN(ELDLM_OK);
3201         }
3202
3203  no_match:
3204         if (intent) {
3205                 CFS_LIST_HEAD(cancels);
3206                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3207                                            &RQF_LDLM_ENQUEUE_LVB);
3208                 if (req == NULL)
3209                         RETURN(-ENOMEM);
3210
3211                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3212                 if (rc)
3213                         RETURN(rc);
3214
3215                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3216                                      sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb));
3217                 ptlrpc_request_set_replen(req);
3218         }
3219
3220         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3221         oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;
3222
3223         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id,
3224                               &oinfo->oi_policy, &oinfo->oi_flags,
3225                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3226                               sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
3227                               lustre_swab_ost_lvb, oinfo->oi_lockh,
3228                               rqset ? 1 : 0);
3229         if (rqset) {
3230                 if (!rc) {
3231                         struct osc_enqueue_args *aa;
3232                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3233                         aa = ptlrpc_req_async_args(req);
3234                         aa->oa_oi = oinfo;
3235                         aa->oa_ei = einfo;
3236                         aa->oa_exp = exp;
3237
3238                         req->rq_interpret_reply = osc_enqueue_interpret;
3239                         ptlrpc_set_add_req(rqset, req);
3240                 } else if (intent) {
3241                         ptlrpc_req_finished(req);
3242                 }
3243                 RETURN(rc);
3244         }
3245
3246         rc = osc_enqueue_fini(obd, req, oinfo, intent, rc);
3247         if (intent)
3248                 ptlrpc_req_finished(req);
3249
3250         RETURN(rc);
3251 }
3252
3253 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
3254                      __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3255                      int *flags, void *data, struct lustre_handle *lockh)
3256 {
3257         struct ldlm_res_id res_id;
3258         struct obd_device *obd = exp->exp_obd;
3259         int lflags = *flags;
3260         ldlm_mode_t rc;
3261         ENTRY;
3262
3263         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
3264
3265         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3266                 RETURN(-EIO);
3267
3268         /* Filesystem lock extents are extended to page boundaries so that
3269          * dealing with the page cache is a little smoother */
3270         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3271         policy->l_extent.end |= ~CFS_PAGE_MASK;
3272
3273         /* Next, search for already existing extent locks that will cover us */
3274         /* If we're trying to read, we also search for an existing PW lock.  The
3275          * VFS and page cache already protect us locally, so lots of readers/
3276          * writers can share a single PW lock. */
3277         rc = mode;
3278         if (mode == LCK_PR)
3279                 rc |= LCK_PW;
3280         rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3281                              &res_id, type, policy, rc, lockh);
3282         if (rc) {
3283                 osc_set_data_with_check(lockh, data, lflags);
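                /* Editorial note: if a PW lock was matched while the caller
                 * asked for PR, ldlm_lock_match() took a PW reference but the
                 * caller will eventually drop a PR one, so convert the
                 * reference before returning the matched mode. */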
3284                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3285                         ldlm_lock_addref(lockh, LCK_PR);
3286                         ldlm_lock_decref(lockh, LCK_PW);
3287                 }
3288                 RETURN(rc);
3289         }
3290         RETURN(rc);
3291 }
3292
3293 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3294                       __u32 mode, struct lustre_handle *lockh)
3295 {
3296         ENTRY;
3297
3298         if (unlikely(mode == LCK_GROUP))
3299                 ldlm_lock_decref_and_cancel(lockh, mode);
3300         else
3301                 ldlm_lock_decref(lockh, mode);
3302
3303         RETURN(0);
3304 }
3305
3306 static int osc_cancel_unused(struct obd_export *exp,
3307                              struct lov_stripe_md *lsm, int flags,
3308                              void *opaque)
3309 {
3310         struct obd_device *obd = class_exp2obd(exp);
3311         struct ldlm_res_id res_id, *resp = NULL;
3312
3313         if (lsm != NULL) {
3314                 resp = osc_build_res_name(lsm->lsm_object_id,
3315                                           lsm->lsm_object_gr, &res_id);
3316         }
3317
3318         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3319 }
3320
3321 static int osc_join_lru(struct obd_export *exp,
3322                         struct lov_stripe_md *lsm, int join)
3323 {
3324         struct obd_device *obd = class_exp2obd(exp);
3325         struct ldlm_res_id res_id, *resp = NULL;
3326
3327         if (lsm != NULL) {
3328                 resp = osc_build_res_name(lsm->lsm_object_id,
3329                                           lsm->lsm_object_gr, &res_id);
3330         }
3331
3332         return ldlm_cli_join_lru(obd->obd_namespace, resp, join);
3333 }
3334
3335 static int osc_statfs_interpret(struct ptlrpc_request *req,
3336                                 struct osc_async_args *aa, int rc)
3337 {
3338         struct obd_statfs *msfs;
3339         ENTRY;
3340
3341         if (rc != 0)
3342                 GOTO(out, rc);
3343
3344         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3345         if (msfs == NULL) {
3346                 GOTO(out, rc = -EPROTO);
3347         }
3348
3349         *aa->aa_oi->oi_osfs = *msfs;
3350 out:
3351         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3352         RETURN(rc);
3353 }
3354
3355 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3356                             __u64 max_age, struct ptlrpc_request_set *rqset)
3357 {
3358         struct ptlrpc_request *req;
3359         struct osc_async_args *aa;
3360         int                    rc;
3361         ENTRY;
3362
3363         /* We could possibly pass max_age in the request (as an absolute
3364          * timestamp or a "seconds.usec ago") so the target can avoid doing
3365          * extra calls into the filesystem if that isn't necessary (e.g.
3366          * during mount that would help a bit).  Having relative timestamps
3367          * is not so great if request processing is slow, while absolute
3368          * timestamps are not ideal because they need time synchronization. */
3369         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3370         if (req == NULL)
3371                 RETURN(-ENOMEM);
3372
3373         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3374         if (rc) {
3375                 ptlrpc_request_free(req);
3376                 RETURN(rc);
3377         }
3378         ptlrpc_request_set_replen(req);
3379         req->rq_request_portal = OST_CREATE_PORTAL;
3380         ptlrpc_at_set_req_timeout(req);
3381
3382         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3383                 /* procfs requests should not wait on stat, to avoid deadlock */
3384                 req->rq_no_resend = 1;
3385                 req->rq_no_delay = 1;
3386         }
3387
3388         req->rq_interpret_reply = osc_statfs_interpret;
3389         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3390         aa = ptlrpc_req_async_args(req);
3391         aa->aa_oi = oinfo;
3392
3393         ptlrpc_set_add_req(rqset, req);
3394         RETURN(0);
3395 }
3396
3397 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3398                       __u64 max_age, __u32 flags)
3399 {
3400         struct obd_statfs     *msfs;
3401         struct ptlrpc_request *req;
3402         struct obd_import     *imp = NULL;
3403         int rc;
3404         ENTRY;
3405
3406         /* Since the request might also come from lprocfs, we need to sync
3407          * this with client_disconnect_export (bug 15684). */
3408         down_read(&obd->u.cli.cl_sem);
3409         if (obd->u.cli.cl_import)
3410                 imp = class_import_get(obd->u.cli.cl_import);
3411         up_read(&obd->u.cli.cl_sem);
3412         if (!imp)
3413                 RETURN(-ENODEV);
3414
3415         /* We could possibly pass max_age in the request (as an absolute
3416          * timestamp or a "seconds.usec ago") so the target can avoid doing
3417          * extra calls into the filesystem if that isn't necessary (e.g.
3418          * during mount that would help a bit).  Having relative timestamps
3419          * is not so great if request processing is slow, while absolute
3420          * timestamps are not ideal because they need time synchronization. */
3421         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
3422
3423         class_import_put(imp);
3424
3425         if (req == NULL)
3426                 RETURN(-ENOMEM);
3427
3428         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3429         if (rc) {
3430                 ptlrpc_request_free(req);
3431                 RETURN(rc);
3432         }
3433         ptlrpc_request_set_replen(req);
3434         req->rq_request_portal = OST_CREATE_PORTAL;
3435         ptlrpc_at_set_req_timeout(req);
3436
3437         if (flags & OBD_STATFS_NODELAY) {
3438                 /* procfs requests should not wait on stat, to avoid deadlock */
3439                 req->rq_no_resend = 1;
3440                 req->rq_no_delay = 1;
3441         }
3442
3443         rc = ptlrpc_queue_wait(req);
3444         if (rc)
3445                 GOTO(out, rc);
3446
3447         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3448         if (msfs == NULL) {
3449                 GOTO(out, rc = -EPROTO);
3450         }
3451
3452         *osfs = *msfs;
3453
3454         EXIT;
3455  out:
3456         ptlrpc_req_finished(req);
3457         return rc;
3458 }
3459
3460 /* Retrieve object striping information.
3461  *
3462  * @lump is a pointer to an in-core struct with lmm_stripe_count indicating
3463  * the maximum number of OST indices which will fit in the user buffer.
3464  * lmm_magic must be LOV_USER_MAGIC (we only use 1 slot here).
3465  */
3466 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3467 {
3468         struct lov_user_md lum, *lumk;
3469         int rc = 0, lum_size;
3470         ENTRY;
3471
3472         if (!lsm)
3473                 RETURN(-ENODATA);
3474
3475         if (copy_from_user(&lum, lump, sizeof(lum)))
3476                 RETURN(-EFAULT);
3477
3478         if (lum.lmm_magic != LOV_USER_MAGIC)
3479                 RETURN(-EINVAL);
3480
3481         if (lum.lmm_stripe_count > 0) {
3482                 lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
3483                 OBD_ALLOC(lumk, lum_size);
3484                 if (!lumk)
3485                         RETURN(-ENOMEM);
3486
3487                 lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
3488                 lumk->lmm_objects[0].l_object_gr = lsm->lsm_object_gr;
3489         } else {
3490                 lum_size = sizeof(lum);
3491                 lumk = &lum;
3492         }
3493
3494         lumk->lmm_object_id = lsm->lsm_object_id;
3495         lumk->lmm_object_gr = lsm->lsm_object_gr;
3496         lumk->lmm_stripe_count = 1;
3497
3498         if (copy_to_user(lump, lumk, lum_size))
3499                 rc = -EFAULT;
3500
3501         if (lumk != &lum)
3502                 OBD_FREE(lumk, lum_size);
3503
3504         RETURN(rc);
3505 }
3506
3507
3508 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3509                          void *karg, void *uarg)
3510 {
3511         struct obd_device *obd = exp->exp_obd;
3512         struct obd_ioctl_data *data = karg;
3513         int err = 0;
3514         ENTRY;
3515
3516         if (!try_module_get(THIS_MODULE)) {
3517                 CERROR("Can't get module. Is it alive?\n");
3518                 return -EINVAL;
3519         }
3520         switch (cmd) {
3521         case OBD_IOC_LOV_GET_CONFIG: {
3522                 char *buf;
3523                 struct lov_desc *desc;
3524                 struct obd_uuid uuid;
3525
3526                 buf = NULL;
3527                 len = 0;
3528                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3529                         GOTO(out, err = -EINVAL);
3530
3531                 data = (struct obd_ioctl_data *)buf;
3532
3533                 if (sizeof(*desc) > data->ioc_inllen1) {
3534                         obd_ioctl_freedata(buf, len);
3535                         GOTO(out, err = -EINVAL);
3536                 }
3537
3538                 if (data->ioc_inllen2 < sizeof(uuid)) {
3539                         obd_ioctl_freedata(buf, len);
3540                         GOTO(out, err = -EINVAL);
3541                 }
3542
3543                 desc = (struct lov_desc *)data->ioc_inlbuf1;
3544                 desc->ld_tgt_count = 1;
3545                 desc->ld_active_tgt_count = 1;
3546                 desc->ld_default_stripe_count = 1;
3547                 desc->ld_default_stripe_size = 0;
3548                 desc->ld_default_stripe_offset = 0;
3549                 desc->ld_pattern = 0;
3550                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3551
3552                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3553
3554                 err = copy_to_user((void *)uarg, buf, len);
3555                 if (err)
3556                         err = -EFAULT;
3557                 obd_ioctl_freedata(buf, len);
3558                 GOTO(out, err);
3559         }
3560         case LL_IOC_LOV_SETSTRIPE:
3561                 err = obd_alloc_memmd(exp, karg);
3562                 if (err > 0)
3563                         err = 0;
3564                 GOTO(out, err);
3565         case LL_IOC_LOV_GETSTRIPE:
3566                 err = osc_getstripe(karg, uarg);
3567                 GOTO(out, err);
3568         case OBD_IOC_CLIENT_RECOVER:
3569                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3570                                             data->ioc_inlbuf1);
3571                 if (err > 0)
3572                         err = 0;
3573                 GOTO(out, err);
3574         case IOC_OSC_SET_ACTIVE:
3575                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3576                                                data->ioc_offset);
3577                 GOTO(out, err);
3578         case OBD_IOC_POLL_QUOTACHECK:
3579                 err = lquota_poll_check(quota_interface, exp,
3580                                         (struct if_quotacheck *)karg);
3581                 GOTO(out, err);
3582         default:
3583                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3584                        cmd, cfs_curproc_comm());
3585                 GOTO(out, err = -ENOTTY);
3586         }
3587 out:
3588         module_put(THIS_MODULE);
3589         return err;
3590 }
3591
3592 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3593                         void *key, __u32 *vallen, void *val, struct lov_stripe_md *lsm)
3594 {
3595         ENTRY;
3596         if (!vallen || !val)
3597                 RETURN(-EFAULT);
3598
3599         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3600                 __u32 *stripe = val;
3601                 *vallen = sizeof(*stripe);
3602                 *stripe = 0;
3603                 RETURN(0);
3604         } else if (KEY_IS(KEY_LAST_ID)) {
3605                 struct ptlrpc_request *req;
3606                 obd_id                *reply;
3607                 char                  *tmp;
3608                 int                    rc;
3609
3610                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3611                                            &RQF_OST_GET_INFO_LAST_ID);
3612                 if (req == NULL)
3613                         RETURN(-ENOMEM);
3614
3615                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3616                                      RCL_CLIENT, keylen);
3617                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3618                 if (rc) {
3619                         ptlrpc_request_free(req);
3620                         RETURN(rc);
3621                 }
3622
3623                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3624                 memcpy(tmp, key, keylen);
3625
3626                 ptlrpc_request_set_replen(req);
3627                 rc = ptlrpc_queue_wait(req);
3628                 if (rc)
3629                         GOTO(out, rc);
3630
3631                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3632                 if (reply == NULL)
3633                         GOTO(out, rc = -EPROTO);
3634
3635                 *((obd_id *)val) = *reply;
3636         out:
3637                 ptlrpc_req_finished(req);
3638                 RETURN(rc);
3639         }
3640         RETURN(-EINVAL);
3641 }
3642
3643 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3644                                           void *aa, int rc)
3645 {
3646         struct llog_ctxt *ctxt;
3647         struct obd_import *imp = req->rq_import;
3648         ENTRY;
3649
3650         if (rc != 0)
3651                 RETURN(rc);
3652
3653         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3654         if (ctxt) {
3655                 /* rc is known to be 0 here (checked on entry), so connect
3656                  * unconditionally and report a failure of the connect
3657                  * itself rather than testing a stale rc. */
3658                 rc = llog_initiator_connect(ctxt);
3659                 if (rc)
3660                         CERROR("cannot establish connection for "
3661                                "ctxt %p: %d\n", ctxt, rc);
3662         }
3661
3662         llog_ctxt_put(ctxt);
3663         spin_lock(&imp->imp_lock);
3664         imp->imp_server_timeout = 1;
3665         imp->imp_pingable = 1;
3666         spin_unlock(&imp->imp_lock);
3667         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3668
3669         RETURN(rc);
3670 }
3671
3672 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3673                               void *key, obd_count vallen, void *val,
3674                               struct ptlrpc_request_set *set)
3675 {
3676         struct ptlrpc_request *req;
3677         struct obd_device     *obd = exp->exp_obd;
3678         struct obd_import     *imp = class_exp2cliimp(exp);
3679         char                  *tmp;
3680         int                    rc;
3681         ENTRY;
3682
3683         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3684
3685         if (KEY_IS(KEY_NEXT_ID)) {
3686                 if (vallen != sizeof(obd_id))
3687                         RETURN(-ERANGE);
3688                 if (val == NULL)
3689                         RETURN(-EINVAL);
3690                 obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
3691                 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3692                        exp->exp_obd->obd_name,
3693                        obd->u.cli.cl_oscc.oscc_next_id);
3694
3695                 RETURN(0);
3696         }
3697
3698         if (KEY_IS(KEY_UNLINKED)) {
3699                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3700                 spin_lock(&oscc->oscc_lock);
3701                 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3702                 spin_unlock(&oscc->oscc_lock);
3703                 RETURN(0);
3704         }
3705
3706         if (KEY_IS(KEY_INIT_RECOV)) {
3707                 if (vallen != sizeof(int))
3708                         RETURN(-EINVAL);
3709                 spin_lock(&imp->imp_lock);
3710                 imp->imp_initial_recov = *(int *)val;
3711                 spin_unlock(&imp->imp_lock);
3712                 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3713                        exp->exp_obd->obd_name,
3714                        imp->imp_initial_recov);
3715                 RETURN(0);
3716         }
3717
3718         if (KEY_IS(KEY_CHECKSUM)) {
3719                 if (vallen != sizeof(int))
3720                         RETURN(-EINVAL);
3721                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3722                 RETURN(0);
3723         }
3724
3725         if (KEY_IS(KEY_FLUSH_CTX)) {
3726                 sptlrpc_import_flush_my_ctx(imp);
3727                 RETURN(0);
3728         }
3729
3730         if (!set)
3731                 RETURN(-EINVAL);
3732
3733         /* We pass all other commands directly to OST. Since nobody calls osc
3734            methods directly and everybody is supposed to go through LOV, we
3735            assume lov checked invalid values for us.
3736            The only recognised values so far are evict_by_nid and mds_conn.
3737            Even if something bad goes through, we'd get a -EINVAL from OST
3738            anyway. */
3739
3740
3741         req = ptlrpc_request_alloc(imp, &RQF_OST_SET_INFO);
3742         if (req == NULL)
3743                 RETURN(-ENOMEM);
3744
3745         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3746                              RCL_CLIENT, keylen);
3747         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3748                              RCL_CLIENT, vallen);
3749         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3750         if (rc) {
3751                 ptlrpc_request_free(req);
3752                 RETURN(rc);
3753         }
3754
3755         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3756         memcpy(tmp, key, keylen);
3757         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
3758         memcpy(tmp, val, vallen);
3759
3760         if (KEY_IS(KEY_MDS_CONN)) {
3761                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3762
3763                 oscc->oscc_oa.o_gr = (*(__u32 *)val);
3764                 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
3765                 LASSERT(oscc->oscc_oa.o_gr > 0);
3766                 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3767         }
3768
3769         ptlrpc_request_set_replen(req);
3770         ptlrpc_set_add_req(set, req);
3771         ptlrpc_check_set(set);
3772
3773         RETURN(0);
3774 }
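/*
 * Editorial usage sketch (hypothetical caller, not compiled): keys handled
 * locally, such as KEY_CHECKSUM, return before any RPC is built, so no
 * request set is needed:
 */
#if 0
        int on = 1;
        rc = osc_set_info_async(exp, strlen(KEY_CHECKSUM), KEY_CHECKSUM,
                                sizeof(on), &on, NULL);
#endif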
3775
3776
3777 static struct llog_operations osc_size_repl_logops = {
3778         lop_cancel: llog_obd_repl_cancel
3779 };
3780
3781 static struct llog_operations osc_mds_ost_orig_logops;
3782 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
3783                          struct obd_device *tgt, int count,
3784                          struct llog_catid *catid, struct obd_uuid *uuid)
3785 {
3786         int rc;
3787         ENTRY;
3788
3789         LASSERT(olg == &obd->obd_olg);
3790         spin_lock(&obd->obd_dev_lock);
3791         if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
3792                 osc_mds_ost_orig_logops = llog_lvfs_ops;
3793                 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3794                 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3795                 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3796                 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
3797         }
3798         spin_unlock(&obd->obd_dev_lock);
3799
3800         rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
3801                         &catid->lci_logid, &osc_mds_ost_orig_logops);
3802         if (rc) {
3803                 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
3804                 GOTO (out, rc);
3805         }
3806
3807         rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, count,
3808                         NULL, &osc_size_repl_logops);
3809         if (rc)
3810                 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
3811 out:
3812         if (rc) {
3813                 CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
3814                        obd->obd_name, tgt->obd_name, count, catid, rc);
3815                 CERROR("logid "LPX64":0x%x\n",
3816                        catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
3817         }
3818         RETURN(rc);
3819 }
3820
3821 static int osc_llog_finish(struct obd_device *obd, int count)
3822 {
3823         struct llog_ctxt *ctxt;
3824         int rc = 0, rc2 = 0;
3825         ENTRY;
3826
3827         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3828         if (ctxt)
3829                 rc = llog_cleanup(ctxt);
3830
3831         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3832         if (ctxt)
3833                 rc2 = llog_cleanup(ctxt);
3834         if (!rc)
3835                 rc = rc2;
3836
3837         RETURN(rc);
3838 }
3839
3840 static int osc_reconnect(const struct lu_env *env,
3841                          struct obd_export *exp, struct obd_device *obd,
3842                          struct obd_uuid *cluuid,
3843                          struct obd_connect_data *data)
3844 {
3845         struct client_obd *cli = &obd->u.cli;
3846         ENTRY;
3847         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3848                 long lost_grant;
3849
3850                 client_obd_list_lock(&cli->cl_loi_list_lock);
3851                 data->ocd_grant = cli->cl_avail_grant ?:
3852                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
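                /* Editorial note: the `?:` above keeps the currently
                 * available grant if there is any, otherwise it asks for two
                 * full RPCs worth.  With, say, 256 pages per RPC and 4K pages
                 * that is 2 * 256 << 12 = 2MB. */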
3853                 lost_grant = cli->cl_lost_grant;
3854                 cli->cl_lost_grant = 0;
3855                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3856
3857                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3858                        "cl_lost_grant: %ld\n", data->ocd_grant,
3859                        cli->cl_avail_grant, lost_grant);
3860                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3861                        " ocd_grant: %d\n", data->ocd_connect_flags,
3862                        data->ocd_version, data->ocd_grant);
3863         }
3864
3865         RETURN(0);
3866 }
3867
3868 static int osc_disconnect(struct obd_export *exp)
3869 {
3870         struct obd_device *obd = class_exp2obd(exp);
3871         struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3872         int rc;
3873
3874         if (obd->u.cli.cl_conn_count == 1)
3875                 /* flush any remaining cancel messages out to the target */
3876                 llog_sync(ctxt, exp);
3877
3878         llog_ctxt_put(ctxt);
3879
3880         rc = client_disconnect_export(exp);
3881         return rc;
3882 }
3883
3884 static int osc_import_event(struct obd_device *obd,
3885                             struct obd_import *imp,
3886                             enum obd_import_event event)
3887 {
3888         struct client_obd *cli;
3889         int rc = 0;
3890
3891         ENTRY;
3892         LASSERT(imp->imp_obd == obd);
3893
3894         switch (event) {
3895         case IMP_EVENT_DISCON: {
3896                 /* Only do this on the MDS OSCs */
3897                 if (imp->imp_server_timeout) {
3898                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3899
3900                         spin_lock(&oscc->oscc_lock);
3901                         oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
3902                         spin_unlock(&oscc->oscc_lock);
3903                 }
3904                 cli = &obd->u.cli;
3905                 client_obd_list_lock(&cli->cl_loi_list_lock);
3906                 cli->cl_avail_grant = 0;
3907                 cli->cl_lost_grant = 0;
3908                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3909                 break;
3910         }
3911         case IMP_EVENT_INACTIVE: {
3912                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3913                 break;
3914         }
3915         case IMP_EVENT_INVALIDATE: {
3916                 struct ldlm_namespace *ns = obd->obd_namespace;
3917
3918                 /* Reset grants */
3919                 cli = &obd->u.cli;
3920                 client_obd_list_lock(&cli->cl_loi_list_lock);
3921                 /* all pages go to failing rpcs due to the invalid import */
3922                 osc_check_rpcs(cli);
3923                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3924
3925                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3926
3927                 break;
3928         }
3929         case IMP_EVENT_ACTIVE: {
3930                 /* Only do this on the MDS OSCs */
3931                 if (imp->imp_server_timeout) {
3932                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3933
3934                         spin_lock(&oscc->oscc_lock);
3935                         oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3936                         spin_unlock(&oscc->oscc_lock);
3937                 }
3938                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3939                 break;
3940         }
3941         case IMP_EVENT_OCD: {
3942                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3943
3944                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3945                         osc_init_grant(&obd->u.cli, ocd);
3946
3947                 /* See bug 7198 */
3948                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3949                         imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
3950
3951                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3952                 break;
3953         }
3954         default:
3955                 CERROR("Unknown import event %d\n", event);
3956                 LBUG();
3957         }
3958         RETURN(rc);
3959 }
3960
3961 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3962 {
3963         int rc;
3964         ENTRY;
3965
3967         rc = ptlrpcd_addref();
3968         if (rc)
3969                 RETURN(rc);
3970
3971         rc = client_obd_setup(obd, lcfg);
3972         if (rc) {
3973                 ptlrpcd_decref();
3974         } else {
3975                 struct lprocfs_static_vars lvars = { 0 };
3976                 struct client_obd *cli = &obd->u.cli;
3977
3978                 lprocfs_osc_init_vars(&lvars);
3979                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3980                         lproc_osc_attach_seqstat(obd);
3981                         sptlrpc_lprocfs_cliobd_attach(obd);
3982                         ptlrpc_lprocfs_register_obd(obd);
3983                 }
3984
3985                 oscc_init(obd);
3986                 /* We need to allocate a few extra requests, because
3987                    brw_interpret tries to create new requests before freeing
3988                    previous ones.  Ideally we want 2x max_rpcs_in_flight
3989                    reserved, but that might waste too much RAM, so reserving
3990                    2 extra is a guess that should still work. */
3991                 cli->cl_import->imp_rq_pool =
3992                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3993                                             OST_MAXREQSIZE,
3994                                             ptlrpc_add_rqs_to_pool);
3995                 cli->cl_cache = cache_create(obd);
3996                 if (!cli->cl_cache) {
3997                         osc_cleanup(obd);
3998                         rc = -ENOMEM;
3999                 }
4000         }
4001
4002         RETURN(rc);
4003 }
4004
4005 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
4006 {
4007         int rc = 0;
4008         ENTRY;
4009
4010         switch (stage) {
4011         case OBD_CLEANUP_EARLY: {
4012                 struct obd_import *imp;
4013                 imp = obd->u.cli.cl_import;
4014                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
4015                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
4016                 ptlrpc_deactivate_import(imp);
4017                 spin_lock(&imp->imp_lock);
4018                 imp->imp_pingable = 0;
4019                 spin_unlock(&imp->imp_lock);
4020                 break;
4021         }
4022         case OBD_CLEANUP_EXPORTS: {
4023                 /* If we set up but never connected, the
4024                    client import will not have been cleaned. */
4025                 if (obd->u.cli.cl_import) {
4026                         struct obd_import *imp;
4027                         imp = obd->u.cli.cl_import;
4028                         CDEBUG(D_CONFIG, "%s: client import never connected\n",
4029                                obd->obd_name);
4030                         ptlrpc_invalidate_import(imp);
4031                         ptlrpc_free_rq_pool(imp->imp_rq_pool);
4032                         class_destroy_import(imp);
4033                         obd->u.cli.cl_import = NULL;
4034                 }
4035                 rc = obd_llog_finish(obd, 0);
4036                 if (rc != 0)
4037                         CERROR("failed to cleanup llogging subsystems\n");
4038                 break;
4039         }
4040         }
4041         RETURN(rc);
4042 }
4043
4044 int osc_cleanup(struct obd_device *obd)
4045 {
4046         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4047         int rc;
4048
4049         ENTRY;
4050         ptlrpc_lprocfs_unregister_obd(obd);
4051         lprocfs_obd_cleanup(obd);
4052
4053         spin_lock(&oscc->oscc_lock);
4054         oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
4055         oscc->oscc_flags |= OSCC_FLAG_EXITING;
4056         spin_unlock(&oscc->oscc_lock);
4057
4058         /* free memory of osc quota cache */
4059         lquota_cleanup(quota_interface, obd);
4060
4061         cache_destroy(obd->u.cli.cl_cache);
4062         rc = client_obd_cleanup(obd);
4063
4064         ptlrpcd_decref();
4065         RETURN(rc);
4066 }
4067
4068 static int osc_register_page_removal_cb(struct obd_export *exp,
4069                                         obd_page_removal_cb_t func,
4070                                         obd_pin_extent_cb pin_cb)
4071 {
4072         return cache_add_extent_removal_cb(exp->exp_obd->u.cli.cl_cache, func,
4073                                            pin_cb);
4074 }
4075
4076 static int osc_unregister_page_removal_cb(struct obd_export *exp,
4077                                           obd_page_removal_cb_t func)
4078 {
4079         return cache_del_extent_removal_cb(exp->exp_obd->u.cli.cl_cache, func);
4080 }
4081
4082 static int osc_register_lock_cancel_cb(struct obd_export *exp,
4083                                        obd_lock_cancel_cb cb)
4084 {
4085         LASSERT(exp->exp_obd->u.cli.cl_ext_lock_cancel_cb == NULL);
4086
4087         exp->exp_obd->u.cli.cl_ext_lock_cancel_cb = cb;
4088         return 0;
4089 }
4090
4091 static int osc_unregister_lock_cancel_cb(struct obd_export *exp,
4092                                          obd_lock_cancel_cb cb)
4093 {
4094         if (exp->exp_obd->u.cli.cl_ext_lock_cancel_cb != cb) {
4095                 CERROR("Unregistering cancel cb %p, but %p is "
4096                        "registered\n", cb,
4097                        exp->exp_obd->u.cli.cl_ext_lock_cancel_cb);
4098                 return -EINVAL;
4099         }
4100
4101         exp->exp_obd->u.cli.cl_ext_lock_cancel_cb = NULL;
4102         return 0;
4103 }
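
/*
 * For illustration only (sketch, not compiled): unlike the removal-cb
 * list above, cl_ext_lock_cancel_cb is a single slot (see the LASSERT
 * in osc_register_lock_cancel_cb), so register and unregister must be
 * strictly paired with the same function.  my_cancel_cb is a
 * hypothetical callback of type obd_lock_cancel_cb.
 */
#if 0
        rc = osc_register_lock_cancel_cb(exp, my_cancel_cb);
        /* ... */
        osc_unregister_lock_cancel_cb(exp, my_cancel_cb);  /* same cb */
#endif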
4104
4105 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4106 {
4107         struct lustre_cfg *lcfg = buf;
4108         struct lprocfs_static_vars lvars = { 0 };
4109         int rc = 0;
4110
4111         lprocfs_osc_init_vars(&lvars);
4112
4113         switch (lcfg->lcfg_command) {
4114         case LCFG_SPTLRPC_CONF:
4115                 rc = sptlrpc_cliobd_process_config(obd, lcfg);
4116                 break;
4117         default:
4118                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
4119                                               lcfg, obd);
4120                 break;
4121         }
4122
4123         return rc;
4124 }
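
/*
 * For illustration only (sketch, not compiled): every command other
 * than LCFG_SPTLRPC_CONF falls through to class_process_proc_param(),
 * which matches an "osc.<var>=<value>" string against lvars.obd_vars.
 * A hand-built record carrying such a string might look like this
 * ("checksums" is one example OSC proc variable):
 */
#if 0
        struct lustre_cfg_bufs bufs;
        struct lustre_cfg *lcfg;

        lustre_cfg_bufs_reset(&bufs, obd->obd_name);
        lustre_cfg_bufs_set_string(&bufs, 1, "osc.checksums=1");
        lcfg = lustre_cfg_new(LCFG_PARAM, &bufs);
        rc = osc_process_config(obd, lustre_cfg_len(lcfg->lcfg_bufcount,
                                                    lcfg->lcfg_buflens), lcfg);
        lustre_cfg_free(lcfg);
#endif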
4125
4126 struct obd_ops osc_obd_ops = {
4127         .o_owner                = THIS_MODULE,
4128         .o_setup                = osc_setup,
4129         .o_precleanup           = osc_precleanup,
4130         .o_cleanup              = osc_cleanup,
4131         .o_add_conn             = client_import_add_conn,
4132         .o_del_conn             = client_import_del_conn,
4133         .o_connect              = client_connect_import,
4134         .o_reconnect            = osc_reconnect,
4135         .o_disconnect           = osc_disconnect,
4136         .o_statfs               = osc_statfs,
4137         .o_statfs_async         = osc_statfs_async,
4138         .o_packmd               = osc_packmd,
4139         .o_unpackmd             = osc_unpackmd,
4140         .o_precreate            = osc_precreate,
4141         .o_create               = osc_create,
4142         .o_destroy              = osc_destroy,
4143         .o_getattr              = osc_getattr,
4144         .o_getattr_async        = osc_getattr_async,
4145         .o_setattr              = osc_setattr,
4146         .o_setattr_async        = osc_setattr_async,
4147         .o_brw                  = osc_brw,
4148         .o_brw_async            = osc_brw_async,
4149         .o_prep_async_page      = osc_prep_async_page,
4150         .o_reget_short_lock     = osc_reget_short_lock,
4151         .o_release_short_lock   = osc_release_short_lock,
4152         .o_queue_async_io       = osc_queue_async_io,
4153         .o_set_async_flags      = osc_set_async_flags,
4154         .o_queue_group_io       = osc_queue_group_io,
4155         .o_trigger_group_io     = osc_trigger_group_io,
4156         .o_teardown_async_page  = osc_teardown_async_page,
4157         .o_punch                = osc_punch,
4158         .o_sync                 = osc_sync,
4159         .o_enqueue              = osc_enqueue,
4160         .o_match                = osc_match,
4161         .o_change_cbdata        = osc_change_cbdata,
4162         .o_cancel               = osc_cancel,
4163         .o_cancel_unused        = osc_cancel_unused,
4164         .o_join_lru             = osc_join_lru,
4165         .o_iocontrol            = osc_iocontrol,
4166         .o_get_info             = osc_get_info,
4167         .o_set_info_async       = osc_set_info_async,
4168         .o_import_event         = osc_import_event,
4169         .o_llog_init            = osc_llog_init,
4170         .o_llog_finish          = osc_llog_finish,
4171         .o_process_config       = osc_process_config,
4172         .o_register_page_removal_cb = osc_register_page_removal_cb,
4173         .o_unregister_page_removal_cb = osc_unregister_page_removal_cb,
4174         .o_register_lock_cancel_cb = osc_register_lock_cancel_cb,
4175         .o_unregister_lock_cancel_cb = osc_unregister_lock_cancel_cb,
4176 };
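
/*
 * For illustration only (sketch, not compiled): the generic obd_*()
 * wrappers in obd_class.h dispatch through the table above, so e.g. an
 * obd_statfs() call on an OSC device lands in osc_statfs() via the
 * .o_statfs slot (max_age is the caller's staleness bound; the exact
 * wrapper signature varies by version).
 */
#if 0
        struct obd_statfs osfs;
        rc = obd_statfs(obd, &osfs, max_age);   /* -> osc_statfs() */
#endif
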
4177 int __init osc_init(void)
4178 {
4179         struct lprocfs_static_vars lvars = { 0 };
4180         int rc;
4181         ENTRY;
4182
4183         lprocfs_osc_init_vars(&lvars);
4184
4185         request_module("lquota");
4186         quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
4187         lquota_init(quota_interface);
4188         init_obd_quota_ops(quota_interface, &osc_obd_ops);
4189
4190         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
4191                                  LUSTRE_OSC_NAME, NULL);
4192         if (rc) {
4193                 lquota_exit(quota_interface);
4194                 if (quota_interface)
4195                         PORTAL_SYMBOL_PUT(osc_quota_interface);
4196         }
4197
4198         RETURN(rc);
4199 }
4200
4201 #ifdef __KERNEL__
4202 static void /*__exit*/ osc_exit(void)
4203 {
4204         lquota_exit(quota_interface);
4205         if (quota_interface)
4206                 PORTAL_SYMBOL_PUT(osc_quota_interface);
4207
4208         class_unregister_type(LUSTRE_OSC_NAME);
4209 }
4210
4211 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4212 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4213 MODULE_LICENSE("GPL");
4214
4215 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
4216 #endif