/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif
#define DEBUG_SUBSYSTEM S_OSC

#include <libcfs/libcfs.h>

#ifndef __KERNEL__
# include <liblustre.h>
#endif

#include <lustre_dlm.h>
#include <lustre_net.h>
#include <lustre/lustre_user.h>
#include <obd_cksum.h>
#include <obd_ost.h>
#include <obd_lov.h>

#ifdef  __CYGWIN__
# include <ctype.h>
#endif

#include <lustre_ha.h>
#include <lprocfs_status.h>
#include <lustre_log.h>
#include <lustre_debug.h>
#include <lustre_param.h>
#include "osc_internal.h"

static quota_interface_t *quota_interface = NULL;
extern quota_interface_t osc_quota_interface;

static void osc_release_ppga(struct brw_page **ppga, obd_count count);
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc);
int osc_cleanup(struct obd_device *obd);

/* Pack OSC object metadata for disk storage (LE byte order). */
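/* Calling convention (the common obd_packmd contract): if lmmp is NULL,
 * only the packed size is returned; if *lmmp is set but lsm is NULL, the
 * buffer is freed; if *lmmp is NULL, it is allocated before packing.
 * On success the size of the packed lov_mds_md is returned. */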
static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
                      struct lov_stripe_md *lsm)
{
        int lmm_size;
        ENTRY;

        lmm_size = sizeof(**lmmp);
        if (!lmmp)
                RETURN(lmm_size);

        if (*lmmp && !lsm) {
                OBD_FREE(*lmmp, lmm_size);
                *lmmp = NULL;
                RETURN(0);
        }

        if (!*lmmp) {
                OBD_ALLOC(*lmmp, lmm_size);
                if (!*lmmp)
                        RETURN(-ENOMEM);
        }

        if (lsm) {
                LASSERT(lsm->lsm_object_id);
                LASSERT_SEQ_IS_MDT(lsm->lsm_object_seq);
                (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
                (*lmmp)->lmm_object_seq = cpu_to_le64(lsm->lsm_object_seq);
        }

        RETURN(lmm_size);
}

/* Unpack OSC object metadata from disk storage (LE byte order). */
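/* Calling convention mirrors osc_packmd: if lsmp is NULL, only the
 * in-memory size is returned; if *lsmp is set but lmm is NULL, the stripe
 * md (and its lsm_oinfo[0]) is freed; if *lsmp is NULL, it is allocated
 * before unpacking. */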
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
{
        int lsm_size;
        ENTRY;

        if (lmm != NULL) {
                if (lmm_bytes < sizeof(*lmm)) {
                        CERROR("lov_mds_md too small: %d, need %d\n",
                               lmm_bytes, (int)sizeof(*lmm));
                        RETURN(-EINVAL);
                }
                /* XXX LOV_MAGIC etc check? */

                if (lmm->lmm_object_id == 0) {
                        CERROR("lov_mds_md: zero lmm_object_id\n");
                        RETURN(-EINVAL);
                }
        }

        lsm_size = lov_stripe_md_size(1);
        if (lsmp == NULL)
                RETURN(lsm_size);

        if (*lsmp != NULL && lmm == NULL) {
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);
                *lsmp = NULL;
                RETURN(0);
        }

        if (*lsmp == NULL) {
                OBD_ALLOC(*lsmp, lsm_size);
                if (*lsmp == NULL)
                        RETURN(-ENOMEM);
                OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                if ((*lsmp)->lsm_oinfo[0] == NULL) {
                        OBD_FREE(*lsmp, lsm_size);
                        RETURN(-ENOMEM);
                }
                loi_init((*lsmp)->lsm_oinfo[0]);
        }

        if (lmm != NULL) {
                /* XXX zero *lsmp? */
                (*lsmp)->lsm_object_id = le64_to_cpu(lmm->lmm_object_id);
                (*lsmp)->lsm_object_seq = le64_to_cpu(lmm->lmm_object_seq);
                LASSERT((*lsmp)->lsm_object_id);
                LASSERT_SEQ_IS_MDT((*lsmp)->lsm_object_seq);
        }

        (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

        RETURN(lsm_size);
}

static inline void osc_pack_capa(struct ptlrpc_request *req,
                                 struct ost_body *body, void *capa)
{
        struct obd_capa *oc = (struct obd_capa *)capa;
        struct lustre_capa *c;

        if (!capa)
                return;

        c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
        LASSERT(c);
        capa_cpy(c, oc);
        body->oa.o_valid |= OBD_MD_FLOSSCAPA;
        DEBUG_CAPA(D_SEC, c, "pack");
}

static inline void osc_pack_req_body(struct ptlrpc_request *req,
                                     struct obd_info *oinfo)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);
}

static inline void osc_set_capa_size(struct ptlrpc_request *req,
                                     const struct req_msg_field *field,
                                     struct obd_capa *oc)
{
        if (oc == NULL)
                req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
        else
                /* the request format already sized this field as
                 * sizeof(struct lustre_capa) */
                ;
}

static int osc_getattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body) {
                CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
                lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);

                /* This should really be sent by the OST */
                aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
                aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
        } else {
                CDEBUG(D_INFO, "can't unpack ost_body\n");
                rc = -EPROTO;
                aa->aa_oi->oi_oa->o_valid = 0;
        }
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}

static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(set, req);
        RETURN(0);
}

static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);

        /* This should really be sent by the OST */
        oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}

static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
                       struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}

static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_setattr_args *sa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(sa->sa_oa, &body->oa);
out:
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}

int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
                           struct obd_trans_info *oti,
                           obd_enqueue_update_f upcall, void *cookie,
                           struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
                oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        /* Perform the MDS-to-OST setattr asynchronously. */
        if (!rqset) {
                /* No request set: fire and forget, do not wait for a reply. */
                ptlrpcd_add_req(req, PSCOPE_OTHER);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
                sa = ptlrpc_req_async_args(req);
                sa->sa_oa = oinfo->oi_oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req, PSCOPE_OTHER);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}

static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct obd_trans_info *oti,
                             struct ptlrpc_request_set *rqset)
{
        return osc_setattr_async_base(exp, oinfo, oti,
                                      oinfo->oi_cb_up, oinfo, rqset);
}

int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct lov_stripe_md  *lsm;
        int                    rc;
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oa);

        ptlrpc_request_set_replen(req);

        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_DELORPHAN) {
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        lustre_get_wire_obdo(oa, &body->oa);

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_object_id = oa->o_id;
        lsm->lsm_object_seq = oa->o_seq;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        *oti->oti_logcookies = oa->o_lcookie;
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        RETURN(rc);
}

int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
                   obd_enqueue_update_f upcall, void *cookie,
                   struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        struct ost_body         *body;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
        CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(req);
        sa->sa_oa     = oinfo->oi_oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;
        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PSCOPE_OTHER);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

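/* Truncate: the start of the punched range travels in o_size and the end
 * in o_blocks, so a single OST_PUNCH RPC can describe the whole extent. */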
static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
                     struct obd_trans_info *oti,
                     struct ptlrpc_request_set *rqset)
{
        oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
        oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
        oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
        return osc_punch_base(exp, oinfo,
                              oinfo->oi_cb_up, oinfo, rqset);
}

static int osc_sync(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md *md, obd_size start, obd_size end,
                    void *capa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oa);
        body->oa.o_size = start;
        body->oa.o_blocks = end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
        osc_pack_capa(req, body, capa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(oa, &body->oa);

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}

/* Find and cancel locally the locks matched by @mode in the resource named
 * by @oa's object id. Found locks are added to the @cancels list. Returns
 * the number of locks added to @cancels. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   cfs_list_t *cancels,
                                   ldlm_mode_t mode, int lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        osc_build_res_name(oa->o_id, oa->o_seq, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (res == NULL)
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}

static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *data,
                                 int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        cfs_atomic_dec(&cli->cl_destroy_in_flight);
        cfs_waitq_signal(&cli->cl_destroy_waitq);
        return 0;
}

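/* Throttle destroys to cl_max_rpcs_in_flight: optimistically bump the
 * in-flight counter, and back off if that overshot the limit.  If another
 * destroy completed between the two atomic operations, a slot has opened
 * up, so re-signal the waitqueue rather than strand a sleeping sender. */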
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                cfs_waitq_signal(&cli->cl_destroy_waitq);
        }
        return 0;
}

/* Destroy requests can always be asynchronous on the client, and we don't
 * even really care about the return code, since the client can do nothing
 * at all about a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST next reconnects to the MDS,
 * it will retrieve the llog unlink logs and then send the log cancellation
 * cookies to the MDS after committing destroy transactions. */
static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                       struct lov_stripe_md *ea, struct obd_trans_info *oti,
                       struct obd_export *md_export, void *capa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        CFS_LIST_HEAD(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                oa->o_lcookie = *oti->oti_logcookies;
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oa);

        osc_pack_capa(req, body, (struct obd_capa *)capa);
        ptlrpc_request_set_replen(req);

        /* don't throttle destroy RPCs for the MDT */
        if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
                req->rq_interpret_reply = osc_destroy_interpret;
                if (!osc_can_send_destroy(cli)) {
                        struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
                                                          NULL);

                        /*
                         * Wait until the number of on-going destroy RPCs
                         * drops below cl_max_rpcs_in_flight.
                         */
                        l_wait_event_exclusive(cli->cl_destroy_waitq,
                                               osc_can_send_destroy(cli), &lwi);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req, PSCOPE_OTHER);
        RETURN(0);
}

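/* Fill in the dirty and grant accounting that piggy-backs on bulk RPCs:
 * o_dirty is what this client has cached, o_undirty how much more it would
 * like to cache, o_grant the grant it still holds, and o_dropped the grant
 * it lost track of (cl_lost_grant, e.g. from unsent or short writes). */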
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (cfs_atomic_read(&obd_dirty_pages) -
                   cfs_atomic_read(&obd_dirty_transit_pages) >
                   obd_max_dirty_pages + 1) {
                /* The cfs_atomic_read() and the cfs_atomic_inc() are
                 * not covered by a lock thus they may safely race and trip
                 * this CERROR() unless we add in a small fudge factor (+1). */
                CERROR("dirty %d - %d > system dirty_max %d\n",
                       cfs_atomic_read(&obd_dirty_pages),
                       cfs_atomic_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                long max_in_flight = (cli->cl_max_pages_per_rpc <<
                                      CFS_PAGE_SHIFT) *
                                     (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}

static void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant =
                cfs_time_shift(cli->cl_grant_shrink_interval);
        CDEBUG(D_CACHE, "next time %ld to shrink grant\n",
               cli->cl_next_shrink_grant);
}

/* caller must hold loi_list_lock */
static void osc_consume_write_grant(struct client_obd *cli,
                                    struct brw_page *pga)
{
        LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
        LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
        cfs_atomic_inc(&obd_dirty_pages);
        cli->cl_dirty += CFS_PAGE_SIZE;
        cli->cl_avail_grant -= CFS_PAGE_SIZE;
        pga->flag |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
               CFS_PAGE_SIZE, pga, pga->pg);
        LASSERT(cli->cl_avail_grant >= 0);
        osc_update_next_shrink(cli);
}

/* the companion to osc_consume_write_grant, called when a brw has completed.
 * must be called with the loi lock held. */
static void osc_release_write_grant(struct client_obd *cli,
                                    struct brw_page *pga, int sent)
{
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
        ENTRY;

        LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
                EXIT;
                return;
        }

        pga->flag &= ~OBD_BRW_FROM_GRANT;
        cfs_atomic_dec(&obd_dirty_pages);
        cli->cl_dirty -= CFS_PAGE_SIZE;
        if (pga->flag & OBD_BRW_NOCACHE) {
                pga->flag &= ~OBD_BRW_NOCACHE;
                cfs_atomic_dec(&obd_dirty_transit_pages);
                cli->cl_dirty_transit -= CFS_PAGE_SIZE;
        }
        if (!sent) {
                cli->cl_lost_grant += CFS_PAGE_SIZE;
                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
        } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole block on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
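                /* Worked example, assuming a 4096-byte page and 512-byte
                 * OST blocks: a 100-byte write at page offset 700 lies in
                 * block 1, so count = 100 + (700 & 511) = 288, end =
                 * (700 + 100) & 511 = 288, count rounds up to the full
                 * block (512), and 4096 - 512 = 3584 bytes of this page's
                 * grant are accounted as lost. */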
                int offset = pga->off & ~CFS_PAGE_MASK;
                int count = pga->count + (offset & (blocksize - 1));
                int end = (offset + pga->count) & (blocksize - 1);
                if (end)
                        count += blocksize - end;

                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
                       cli->cl_avail_grant, cli->cl_dirty);
        }

        EXIT;
}

static unsigned long rpcs_in_flight(struct client_obd *cli)
{
        return cli->cl_r_in_flight + cli->cl_w_in_flight;
}

/* caller must hold loi_list_lock */
void osc_wake_cache_waiters(struct client_obd *cli)
{
        cfs_list_t *l, *tmp;
        struct osc_cache_waiter *ocw;

        ENTRY;
        cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                /* if we can't dirty more, we must wait until some is written */
                if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
                   (cfs_atomic_read(&obd_dirty_pages) + 1 >
                    obd_max_dirty_pages)) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);
                        return;
                }

                /* if still dirty cache but no grant wait for pending RPCs that
                 * may yet return us some grant before doing sync writes */
                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
                               cli->cl_w_in_flight);
                        return;
                }

                ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry);
                cfs_list_del_init(&ocw->ocw_entry);
                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        /* no more RPCs in flight to return grant, do sync IO */
                        ocw->ocw_rc = -EDQUOT;
                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
                } else {
                        osc_consume_write_grant(cli,
                                                &ocw->ocw_oap->oap_brw_page);
                }

                cfs_waitq_signal(&ocw->ocw_waitq);
        }

        EXIT;
}

static void __osc_update_grant(struct client_obd *cli, obd_size grant)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}

static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
                              void *key, obd_count vallen, void *val,
                              struct ptlrpc_request_set *set);

static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBDO_FREE(oa);
        return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        long target = (cli->cl_max_rpcs_in_flight + 1) *
                      cli->cl_max_pages_per_rpc;

        client_obd_list_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target)
                target = cli->cl_max_pages_per_rpc;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target);
}

int osc_shrink_grant_to_target(struct client_obd *cli, long target)
{
        int    rc = 0;
        struct ost_body     *body;
        ENTRY;

        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already at or below the desired limit.
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target < cli->cl_max_pages_per_rpc)
                target = cli->cl_max_pages_per_rpc;

        if (target >= cli->cl_avail_grant) {
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        client_obd_list_lock(&cli->cl_loi_list_lock);
        body->oa.o_grant = cli->cl_avail_grant - target;
        cli->cl_avail_grant = target;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
        OBD_FREE_PTR(body);
        RETURN(rc);
}

#define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
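/* Shrink only when the import is fully connected, we hold more grant than
 * GRANT_SHRINK_LIMIT, and the shrink interval has (almost) elapsed; the
 * 5 * CFS_TICK slack lets a callback that fires slightly early still count
 * as due. */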
static int osc_should_shrink_grant(struct client_obd *client)
{
        cfs_time_t time = cfs_time_current();
        cfs_time_t next_shrink = client->cl_next_shrink_grant;

        if ((client->cl_import->imp_connect_data.ocd_connect_flags &
             OBD_CONNECT_GRANT_SHRINK) == 0)
                return 0;

        if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > GRANT_SHRINK_LIMIT)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}

static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
{
        struct client_obd *client;

        cfs_list_for_each_entry(client, &item->ti_obd_list,
                                cl_grant_shrink_list) {
                if (osc_should_shrink_grant(client))
                        osc_shrink_grant(client);
        }
        return 0;
}

static int osc_add_shrink_grant(struct client_obd *client)
{
        int rc;

        rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
                                       TIMEOUT_GRANT,
                                       osc_grant_shrink_grant_cb, NULL,
                                       &client->cl_grant_shrink_list);
        if (rc) {
                CERROR("add grant client %s error %d\n",
                        client->cl_import->imp_obd->obd_name, rc);
                return rc;
        }
        CDEBUG(D_CACHE, "add grant client %s\n",
               client->cl_import->imp_obd->obd_name);
        osc_update_next_shrink(client);
        return 0;
}

static int osc_del_shrink_grant(struct client_obd *client)
{
        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
                                         TIMEOUT_GRANT);
}

static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we are expected to hold: if we
         * have been evicted, it is the new avail_grant amount, and cl_dirty
         * will drop to 0 as in-flight RPCs fail out; otherwise, it is
         * avail_grant + dirty.
         *
         * The race is tolerable here: if we are evicted, but imp_state has
         * already left EVICTED state, then cl_dirty must be 0 already.
         */
        client_obd_list_lock(&cli->cl_loi_list_lock);
        if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
                cli->cl_avail_grant = ocd->ocd_grant;
        else
                cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;

        if (cli->cl_avail_grant < 0) {
                CWARN("%s: available grant < 0, the OSS is probably not running"
                      " with the patch from bug20278 (%ld)\n",
                      cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant);
                /* workaround for 1.6 servers which do not have
                 * the patch from bug20278 */
                cli->cl_avail_grant = ocd->ocd_grant;
        }

        client_obd_list_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
               cli->cl_import->imp_obd->obd_name,
               cli->cl_avail_grant, cli->cl_lost_grant);

        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            cfs_list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
}

/* We assume that the reason this OSC got a short read is that it read
 * beyond the end of a stripe object; i.e. Lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, obd_count page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = cfs_kmap(pga[i]->pg) +
                                (pga[i]->off & ~CFS_PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        cfs_kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                cfs_kunmap(pga[i]->pg);
                i++;
        }
}

static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           obd_count page_count, struct brw_page **pga)
{
        int     i;
        __s32   *remote_rcs;

        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
                                                  sizeof(*remote_rcs) *
                                                  niocount);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return(-EPROTO);
        }

        /* return error if any niobuf was in error; the per-niobuf RCs must
         * be treated as signed or negative errnos can never be detected */
        for (i = 0; i < niocount; i++) {
                if (remote_rcs[i] < 0)
                        return(remote_rcs[i]);

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                                i, remote_rcs[i], req);
                        return(-EPROTO);
                }
        }

        if (req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return(-EPROTO);
        }

        return (0);
}

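/* Two brw_pages can share one remote niobuf only if they are byte-contiguous
 * and their flags match exactly; a mismatch in bits outside the known
 * client-side flags (grant/cache/sync hints) additionally logs a warning. */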
static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT|
                                  OBD_BRW_NOCACHE|OBD_BRW_SYNC|OBD_BRW_ASYNC);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if ((p1->flag & mask) != (p2->flag & mask))
                        CERROR("is it ok to have flags 0x%x and 0x%x in the "
                               "same brw?\n", p1->flag, p2->flag);
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}

static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga, int opc,
                                   cksum_type_t cksum_type)
{
        __u32 cksum;
        int i = 0;

        LASSERT(pg_count > 0);
        cksum = init_checksum(cksum_type);
        while (nob > 0 && pg_count > 0) {
                unsigned char *ptr = cfs_kmap(pga[i]->pg);
                int off = pga[i]->off & ~CFS_PAGE_MASK;
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
                        memcpy(ptr + off, "bad1", min(4, nob));
                cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
                cfs_kunmap(pga[i]->pg);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
                               off, cksum);

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}

static int osc_brw_prep_request(int cmd, struct client_obd *cli,
                                struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa, int reserve)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                cli->cl_import->imp_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));
        osc_set_capa_size(req, &RMF_CAPA1, ocapa);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (opc == OST_WRITE)
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_GET_SOURCE, OST_BULK_PORTAL);
        else
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_PUT_SINK, OST_BULK_PORTAL);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&body->oa, oa);

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        osc_pack_capa(req, body, ocapa);
        LASSERT(page_count > 0);
        pg_prev = pga[0];
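        /* Walk the pages in ascending file-offset order, attaching each one
         * to the bulk descriptor; runs of mergeable pages collapse into a
         * single remote niobuf, matching the niocount computed above. */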
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];

                LASSERT(pg->count > 0);
                LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
                         "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
                         pg->off, pg->count);
#ifdef __linux__
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
#else
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u\n", i, page_count);
#endif
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
                                      pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len    = pg->count;
                        niobuf->flags  = pg->flag;
                }
                pg_prev = pg;
        }

        LASSERTF((void *)(niobuf - niocount) ==
                req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
                "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
                &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob : 0);
        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (unlikely(cli->cl_checksum) &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                                oa->o_flags &= OBD_FL_LOCAL_MASK;
                                body->oa.o_flags = 0;
                        }
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                if (unlikely(cli->cl_checksum) &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
        }
        ptlrpc_request_set_replen(req);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);
        if (ocapa && reserve)
                aa->aa_ocapa = capa_get(ocapa);

        *reqp = req;
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
1433
1434 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1435                                 __u32 client_cksum, __u32 server_cksum, int nob,
1436                                 obd_count page_count, struct brw_page **pga,
1437                                 cksum_type_t client_cksum_type)
1438 {
1439         __u32 new_cksum;
1440         char *msg;
1441         cksum_type_t cksum_type;
1442
1443         if (server_cksum == client_cksum) {
1444                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1445                 return 0;
1446         }
1447
1448         /* If this is an mmapped file it can be changed at any time */
1449         if (oa->o_valid & OBD_MD_FLFLAGS && oa->o_flags & OBD_FL_MMAP)
1450                 return 1;
1451
1452         if (oa->o_valid & OBD_MD_FLFLAGS)
1453                 cksum_type = cksum_type_unpack(oa->o_flags);
1454         else
1455                 cksum_type = OBD_CKSUM_CRC32;
1456
1457         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1458                                       cksum_type);
1459
1460         if (cksum_type != client_cksum_type)
1461                 msg = "the server did not use the checksum type specified in "
1462                       "the original request - likely a protocol problem";
1463         else if (new_cksum == server_cksum)
1464                 msg = "changed on the client after we checksummed it - "
1465                       "likely false positive due to mmap IO (bug 11742)";
1466         else if (new_cksum == client_cksum)
1467                 msg = "changed in transit before arrival at OST";
1468         else
1469                 msg = "changed in transit AND doesn't match the original - "
1470                       "likely false positive due to mmap IO (bug 11742)";
1471
1472         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1473                            " object "LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
1474                            msg, libcfs_nid2str(peer->nid),
1475                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1476                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1477                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1478                            oa->o_id,
1479                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_seq : (__u64)0,
1480                            pga[0]->off,
1481                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1482         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1483                "client csum now %x\n", client_cksum, client_cksum_type,
1484                server_cksum, cksum_type, new_cksum);
1485         return 1;
1486 }
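
/*
 * The diagnosis above reduces to a small decision table over four
 * checksums.  A minimal standalone restatement of that classification
 * (illustrative only; the stand-in types and main() below are
 * assumptions, not driver definitions):
 */
#if 0
#include <stdio.h>

/* mirrors the msg selection in check_write_checksum() above */
static const char *classify_cksum(unsigned int client, unsigned int server,
                                  unsigned int recomputed,
                                  int client_type, int server_type)
{
        if (server_type != client_type)
                return "server used a different checksum type";
        if (recomputed == server)
                return "changed on the client after checksumming (mmap?)";
        if (recomputed == client)
                return "changed in transit before arrival at OST";
        return "changed in transit AND doesn't match the original";
}

int main(void)
{
        /* recomputed matches the server's value: client-side change */
        printf("%s\n", classify_cksum(0x1111, 0x2222, 0x2222, 1, 1));
        return 0;
}
#endif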
1487
1488 /* Note: rc enters this function as the number of bytes transferred */
1489 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1490 {
1491         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1492         const lnet_process_id_t *peer =
1493                         &req->rq_import->imp_connection->c_peer;
1494         struct client_obd *cli = aa->aa_cli;
1495         struct ost_body *body;
1496         __u32 client_cksum = 0;
1497         ENTRY;
1498
1499         if (rc < 0 && rc != -EDQUOT) {
1500                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1501                 RETURN(rc);
1502         }
1503
1504         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1505         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1506         if (body == NULL) {
1507                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1508                 RETURN(-EPROTO);
1509         }
1510
1511 #ifdef HAVE_QUOTA_SUPPORT
1512         /* set/clear the over-quota flag for a uid/gid */
1513         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1514             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1515                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1516
1517                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1518                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1519                        body->oa.o_flags);
1520                 lquota_setdq(quota_interface, cli, qid, body->oa.o_valid,
1521                              body->oa.o_flags);
1522         }
1523 #endif
1524
1525         osc_update_grant(cli, body);
1526
1527         if (rc < 0)
1528                 RETURN(rc);
1529
1530         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1531                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1532
1533         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1534                 if (rc > 0) {
1535                         CERROR("Unexpected positive rc %d\n", rc);
1536                         RETURN(-EPROTO);
1537                 }
1538                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1539
1540                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1541                         RETURN(-EAGAIN);
1542
1543                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1544                     check_write_checksum(&body->oa, peer, client_cksum,
1545                                          body->oa.o_cksum, aa->aa_requested_nob,
1546                                          aa->aa_page_count, aa->aa_ppga,
1547                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1548                         RETURN(-EAGAIN);
1549
1550                 rc = check_write_rcs(req, aa->aa_requested_nob, aa->aa_nio_count,
1551                                      aa->aa_page_count, aa->aa_ppga);
1552                 GOTO(out, rc);
1553         }
1554
1555         /* The rest of this function executes only for OST_READs */
1556
1557         /* if unwrap_bulk failed, return -EAGAIN to retry */
1558         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1559         if (rc < 0)
1560                 GOTO(out, rc = -EAGAIN);
1561
1562         if (rc > aa->aa_requested_nob) {
1563                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1564                        aa->aa_requested_nob);
1565                 RETURN(-EPROTO);
1566         }
1567
1568         if (rc != req->rq_bulk->bd_nob_transferred) {
1569                 CERROR("Unexpected rc %d (%d transferred)\n",
1570                        rc, req->rq_bulk->bd_nob_transferred);
1571                 RETURN(-EPROTO);
1572         }
1573
1574         if (rc < aa->aa_requested_nob)
1575                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1576
1577         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1578                 static int cksum_counter;
1579                 __u32      server_cksum = body->oa.o_cksum;
1580                 char      *via;
1581                 char      *router;
1582                 cksum_type_t cksum_type;
1583
1584                 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1585                         cksum_type = cksum_type_unpack(body->oa.o_flags);
1586                 else
1587                         cksum_type = OBD_CKSUM_CRC32;
1588                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1589                                                  aa->aa_ppga, OST_READ,
1590                                                  cksum_type);
1591
1592                 if (peer->nid == req->rq_bulk->bd_sender) {
1593                         via = router = "";
1594                 } else {
1595                         via = " via ";
1596                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1597                 }
1598
1599                 if (server_cksum == ~0 && rc > 0) {
1600                         CERROR("Protocol error: server %s set the 'checksum' "
1601                                "bit, but didn't send a checksum.  Not fatal, "
1602                                "but please notify on http://bugzilla.lustre.org/\n",
1603                                libcfs_nid2str(peer->nid));
1604                 } else if (server_cksum != client_cksum) {
1605                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1606                                            "%s%s%s inode "DFID" object "
1607                                            LPU64"/"LPU64" extent "
1608                                            "["LPU64"-"LPU64"]\n",
1609                                            req->rq_import->imp_obd->obd_name,
1610                                            libcfs_nid2str(peer->nid),
1611                                            via, router,
1612                                            body->oa.o_valid & OBD_MD_FLFID ?
1613                                                 body->oa.o_parent_seq : (__u64)0,
1614                                            body->oa.o_valid & OBD_MD_FLFID ?
1615                                                 body->oa.o_parent_oid : 0,
1616                                            body->oa.o_valid & OBD_MD_FLFID ?
1617                                                 body->oa.o_parent_ver : 0,
1618                                            body->oa.o_id,
1619                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1620                                                 body->oa.o_seq : (__u64)0,
1621                                            aa->aa_ppga[0]->off,
1622                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1623                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1624                                                                         1);
1625                         CERROR("client %x, server %x, cksum_type %x\n",
1626                                client_cksum, server_cksum, cksum_type);
1627                         cksum_counter = 0;
1628                         aa->aa_oa->o_cksum = client_cksum;
1629                         rc = -EAGAIN;
1630                 } else {
1631                         cksum_counter++;
1632                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1633                         rc = 0;
1634                 }
1635         } else if (unlikely(client_cksum)) {
1636                 static int cksum_missed;
1637
1638                 cksum_missed++;
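                /* rate-limit: log only when cksum_missed is a power of two */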
1639                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1640                         CERROR("Checksum %u requested from %s but not sent\n",
1641                                cksum_missed, libcfs_nid2str(peer->nid));
1642         } else {
1643                 rc = 0;
1644         }
1645 out:
1646         if (rc >= 0)
1647                 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
1648
1649         RETURN(rc);
1650 }
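
/*
 * The read path above recomputes a checksum over the received pages and
 * compares it with body->oa.o_cksum, retrying with -EAGAIN on mismatch.
 * A minimal userspace sketch of that verify step, assuming zlib's
 * crc32() stands in for the negotiated cksum_type and a simplified
 * fragment struct stands in for brw_page:
 */
#if 0
#include <stdio.h>
#include <zlib.h>

struct page_frag {
        const unsigned char *buf;
        unsigned int         len;
};

/* checksum all fragments in transfer order, like osc_checksum_bulk() */
static unsigned long cksum_bulk(const struct page_frag *pg, int count)
{
        unsigned long cksum = crc32(0L, Z_NULL, 0);
        int i;

        for (i = 0; i < count; i++)
                cksum = crc32(cksum, pg[i].buf, pg[i].len);
        return cksum;
}

int main(void)
{
        unsigned char a[] = "hello ", b[] = "world";
        struct page_frag pga[2] = { { a, 6 }, { b, 5 } };
        unsigned long client = cksum_bulk(pga, 2);
        unsigned long server = client;          /* pretend reply value */

        /* a mismatch here maps to the -EAGAIN retry above */
        printf(client == server ? "checksum confirmed\n" : "BAD CHECKSUM\n");
        return 0;
}
#endif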
1651
1652 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1653                             struct lov_stripe_md *lsm,
1654                             obd_count page_count, struct brw_page **pga,
1655                             struct obd_capa *ocapa)
1656 {
1657         struct ptlrpc_request *req;
1658         int                    rc;
1659         cfs_waitq_t            waitq;
1660         int                    resends = 0;
1661         struct l_wait_info     lwi;
1662
1663         ENTRY;
1664
1665         cfs_waitq_init(&waitq);
1666
1667 restart_bulk:
1668         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1669                                   page_count, pga, &req, ocapa, 0);
1670         if (rc != 0)
1671                 return (rc);
1672
1673         rc = ptlrpc_queue_wait(req);
1674
1675         if (rc == -ETIMEDOUT && req->rq_resend) {
1676                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1677                 ptlrpc_req_finished(req);
1678                 goto restart_bulk;
1679         }
1680
1681         rc = osc_brw_fini_request(req, rc);
1682
1683         ptlrpc_req_finished(req);
1684         if (osc_recoverable_error(rc)) {
1685                 resends++;
1686                 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1687                         CERROR("too many resend retries, returning error\n");
1688                         RETURN(-EIO);
1689                 }
1690
1691                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1692                 l_wait_event(waitq, 0, &lwi);
1693
1694                 goto restart_bulk;
1695         }
1696
1697         RETURN (rc);
1698 }
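
/*
 * osc_brw_internal() is a bounded-retry loop: every recoverable failure
 * sleeps for 'resends' seconds (a linearly growing backoff) and retries,
 * giving up with -EIO once osc_should_resend() says no.  A compact
 * sketch of the same shape, where do_io(), MAX_RESENDS and sleep()
 * stand in for the ptlrpc machinery (all three are assumptions):
 */
#if 0
#include <stdio.h>
#include <unistd.h>
#include <errno.h>

#define MAX_RESENDS 10

static int attempts;

/* fails twice with a recoverable error, then succeeds */
static int do_io(void)
{
        return ++attempts < 3 ? -EAGAIN : 0;
}

static int brw_with_resend(void)
{
        int resends = 0;
        int rc;

        while ((rc = do_io()) == -EAGAIN) {
                if (++resends > MAX_RESENDS)
                        return -EIO;
                sleep(resends);         /* wait longer on each retry */
        }
        return rc;
}

int main(void)
{
        printf("rc = %d after %d attempts\n", brw_with_resend(), attempts);
        return 0;
}
#endif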
1699
1700 int osc_brw_redo_request(struct ptlrpc_request *request,
1701                          struct osc_brw_async_args *aa)
1702 {
1703         struct ptlrpc_request *new_req;
1704         struct ptlrpc_request_set *set = request->rq_set;
1705         struct osc_brw_async_args *new_aa;
1706         struct osc_async_page *oap;
1707         int rc = 0;
1708         ENTRY;
1709
1710         if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1711                 CERROR("too many resend retries, returning error\n");
1712                 RETURN(-EIO);
1713         }
1714
1715         DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1716
1717         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1718                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1719                                   aa->aa_cli, aa->aa_oa,
1720                                   NULL /* lsm unused by osc currently */,
1721                                   aa->aa_page_count, aa->aa_ppga,
1722                                   &new_req, aa->aa_ocapa, 0);
1723         if (rc)
1724                 RETURN(rc);
1725
1726         client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1727
1728         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1729                 if (oap->oap_request != NULL) {
1730                         LASSERTF(request == oap->oap_request,
1731                                  "request %p != oap_request %p\n",
1732                                  request, oap->oap_request);
1733                         if (oap->oap_interrupted) {
1734                                 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1735                                 ptlrpc_req_finished(new_req);
1736                                 RETURN(-EINTR);
1737                         }
1738                 }
1739         }
1740         /* The new request takes over pga and oaps from the old request.
1741          * Note that copying a list_head doesn't work; it has to be moved. */
1742         aa->aa_resends++;
1743         new_req->rq_interpret_reply = request->rq_interpret_reply;
1744         new_req->rq_async_args = request->rq_async_args;
1745         new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1746
1747         new_aa = ptlrpc_req_async_args(new_req);
1748
1749         CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1750         cfs_list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1751         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1752
1753         cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1754                 if (oap->oap_request) {
1755                         ptlrpc_req_finished(oap->oap_request);
1756                         oap->oap_request = ptlrpc_request_addref(new_req);
1757                 }
1758         }
1759
1760         new_aa->aa_ocapa = aa->aa_ocapa;
1761         aa->aa_ocapa = NULL;
1762
1763         /* using ptlrpc_set_add_req() is safe here because the interpret
1764          * functions run in check_set context.  the only path by which
1765          * another thread can reach this request is the -EINTR case, and
1766          * that path is protected by cl_loi_list_lock */
1767         ptlrpc_set_add_req(set, new_req);
1768
1769         client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1770
1771         DEBUG_REQ(D_INFO, new_req, "new request");
1772         RETURN(0);
1773 }
1774
1775 /*
1776  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1777  * follow Sedgewick's advice and stick to the dead-simple shellsort -- it'll do
1778  * fine for our small page arrays and doesn't require allocation.  it's an
1779  * insertion sort that swaps elements that are strides apart, shrinking the
1780  * stride down until it's 1 and the array is sorted.
1781  */
1782 static void sort_brw_pages(struct brw_page **array, int num)
1783 {
1784         int stride, i, j;
1785         struct brw_page *tmp;
1786
1787         if (num == 1)
1788                 return;
1789         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1790                 ;
1791
1792         do {
1793                 stride /= 3;
1794                 for (i = stride ; i < num ; i++) {
1795                         tmp = array[i];
1796                         j = i;
1797                         while (j >= stride && array[j - stride]->off > tmp->off) {
1798                                 array[j] = array[j - stride];
1799                                 j -= stride;
1800                         }
1801                         array[j] = tmp;
1802                 }
1803         } while (stride > 1);
1804 }
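
/*
 * A standalone rendering of the shellsort above, with plain longs
 * standing in for brw_page offsets (illustrative only).  The stride
 * sequence is the familiar 1, 4, 13, 40, ...:
 */
#if 0
#include <stdio.h>

static void shellsort(long *a, int num)
{
        int stride, i, j;
        long tmp;

        if (num == 1)
                return;
        for (stride = 1; stride < num; stride = stride * 3 + 1)
                ;                               /* 1, 4, 13, 40, ... */
        do {
                stride /= 3;
                for (i = stride; i < num; i++) {
                        tmp = a[i];
                        for (j = i; j >= stride && a[j - stride] > tmp;
                             j -= stride)
                                a[j] = a[j - stride];
                        a[j] = tmp;
                }
        } while (stride > 1);
}

int main(void)
{
        long off[] = { 40960, 0, 8192, 4096, 12288 };
        int i;

        shellsort(off, 5);
        for (i = 0; i < 5; i++)
                printf("%ld ", off[i]);         /* 0 4096 8192 12288 40960 */
        printf("\n");
        return 0;
}
#endif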
1805
1806 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1807 {
1808         int count = 1;
1809         int offset;
1810         int i = 0;
1811
1812         LASSERT(pages > 0);
1813         offset = pg[i]->off & ~CFS_PAGE_MASK;
1814
1815         for (;;) {
1816                 pages--;
1817                 if (pages == 0)         /* that's all */
1818                         return count;
1819
1820                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1821                         return count;   /* doesn't end on page boundary */
1822
1823                 i++;
1824                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1825                 if (offset != 0)        /* doesn't start on page boundary */
1826                         return count;
1827
1828                 count++;
1829         }
1830 }
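
/*
 * The rule above: a prefix of pages fits in one RDMA only while every
 * page in it ends exactly on a page boundary and its successor starts
 * at page offset 0 (the first page alone is always acceptable).  A
 * small sketch of the same scan, assuming 4096-byte pages and a
 * simplified frag struct (both assumptions for illustration):
 */
#if 0
#define PAGE_SZ 4096UL

struct frag {
        unsigned long off;      /* byte offset within the object */
        unsigned int  count;    /* bytes used within this page */
};

static int max_unfragmented(const struct frag *pg, int pages)
{
        int count = 1;
        int i;

        for (i = 0; i < pages - 1; i++) {
                if ((pg[i].off % PAGE_SZ) + pg[i].count < PAGE_SZ)
                        break;          /* gap at the tail of page i */
                if (pg[i + 1].off % PAGE_SZ != 0)
                        break;          /* gap at the head of page i+1 */
                count++;
        }
        return count;
}
#endif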
1831
1832 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1833 {
1834         struct brw_page **ppga;
1835         int i;
1836
1837         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1838         if (ppga == NULL)
1839                 return NULL;
1840
1841         for (i = 0; i < count; i++)
1842                 ppga[i] = pga + i;
1843         return ppga;
1844 }
1845
1846 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1847 {
1848         LASSERT(ppga != NULL);
1849         OBD_FREE(ppga, sizeof(*ppga) * count);
1850 }
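
/*
 * Design note: osc_build_ppga() layers an array of pointers over the
 * caller's contiguous brw_page array, so sort_brw_pages() can reorder
 * pages and osc_brw() can advance through them without copying any
 * brw_page struct.  A tiny sketch of that indirection (illustrative
 * only):
 */
#if 0
#include <stdio.h>
#include <stdlib.h>

struct item { long off; };

int main(void)
{
        struct item data[3] = { { 300 }, { 100 }, { 200 } };
        struct item **pp = malloc(sizeof(*pp) * 3);
        struct item *tmp;
        int i;

        for (i = 0; i < 3; i++)
                pp[i] = &data[i];
        /* swap pointers, not items: data[] itself never moves */
        tmp = pp[0];
        pp[0] = pp[1];
        pp[1] = pp[2];
        pp[2] = tmp;
        for (i = 0; i < 3; i++)
                printf("%ld\n", pp[i]->off);    /* 100, 200, 300 */
        free(pp);
        return 0;
}
#endif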
1851
1852 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1853                    obd_count page_count, struct brw_page *pga,
1854                    struct obd_trans_info *oti)
1855 {
1856         struct obdo *saved_oa = NULL;
1857         struct brw_page **ppga, **orig;
1858         struct obd_import *imp = class_exp2cliimp(exp);
1859         struct client_obd *cli;
1860         int rc, page_count_orig;
1861         ENTRY;
1862
1863         LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1864         cli = &imp->imp_obd->u.cli;
1865
1866         if (cmd & OBD_BRW_CHECK) {
1867                 /* The caller just wants to know if there's a chance that this
1868                  * I/O can succeed */
1869
1870                 if (imp->imp_invalid)
1871                         RETURN(-EIO);
1872                 RETURN(0);
1873         }
1874
1875         /* test_brw with a failed create can trip this, maybe others. */
1876         LASSERT(cli->cl_max_pages_per_rpc);
1877
1878         rc = 0;
1879
1880         orig = ppga = osc_build_ppga(pga, page_count);
1881         if (ppga == NULL)
1882                 RETURN(-ENOMEM);
1883         page_count_orig = page_count;
1884
1885         sort_brw_pages(ppga, page_count);
1886         while (page_count) {
1887                 obd_count pages_per_brw;
1888
1889                 if (page_count > cli->cl_max_pages_per_rpc)
1890                         pages_per_brw = cli->cl_max_pages_per_rpc;
1891                 else
1892                         pages_per_brw = page_count;
1893
1894                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1895
1896                 if (saved_oa != NULL) {
1897                         /* restore previously saved oa */
1898                         *oinfo->oi_oa = *saved_oa;
1899                 } else if (page_count > pages_per_brw) {
1900                         /* save a copy of oa (brw will clobber it) */
1901                         OBDO_ALLOC(saved_oa);
1902                         if (saved_oa == NULL)
1903                                 GOTO(out, rc = -ENOMEM);
1904                         *saved_oa = *oinfo->oi_oa;
1905                 }
1906
1907                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1908                                       pages_per_brw, ppga, oinfo->oi_capa);
1909
1910                 if (rc != 0)
1911                         break;
1912
1913                 page_count -= pages_per_brw;
1914                 ppga += pages_per_brw;
1915         }
1916
1917 out:
1918         osc_release_ppga(orig, page_count_orig);
1919
1920         if (saved_oa != NULL)
1921                 OBDO_FREE(saved_oa);
1922
1923         RETURN(rc);
1924 }
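
/*
 * The while loop above slices a large request into RPC-sized chunks:
 * each pass takes at most cl_max_pages_per_rpc pages, trims that to the
 * largest unfragmented prefix, then advances the pointer array.  A
 * stripped-down sketch of the slicing (max_unfrag() is a trivial
 * stand-in for max_unfragmented_pages() above):
 */
#if 0
#include <stdio.h>

static int max_unfrag(int npages)
{
        return npages;          /* trivial stand-in: no fragmentation */
}

static void brw_in_chunks(int page_count, int max_per_rpc)
{
        int start = 0;

        while (page_count > 0) {
                int chunk = page_count > max_per_rpc ?
                            max_per_rpc : page_count;

                chunk = max_unfrag(chunk);
                printf("RPC for pages [%d, %d)\n", start, start + chunk);
                page_count -= chunk;
                start += chunk;
        }
}

int main(void)
{
        brw_in_chunks(600, 256);        /* 256 + 256 + 88 */
        return 0;
}
#endif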
1925
1926 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1927  * the dirty accounting: either writeback has completed, or a truncate hit
1928  * before writing started.  Must be called with the loi lock held. */
1929 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1930                            int sent)
1931 {
1932         osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1933 }
1934
1935
1936 /* This maintains the lists of pending pages to read/write for a given object
1937  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1938  * to quickly find objects that are ready to send an RPC. */
1939 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1940                          int cmd)
1941 {
1942         int optimal;
1943         ENTRY;
1944
1945         if (lop->lop_num_pending == 0)
1946                 RETURN(0);
1947
1948         /* if we have an invalid import we want to drain the queued pages
1949          * by forcing them through rpcs that immediately fail and complete
1950          * the pages.  recovery relies on this to empty the queued pages
1951          * before canceling the locks and evicting the llite pages */
1952         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1953                 RETURN(1);
1954
1955         /* stream rpcs in queue order as long as there is an urgent page
1956          * queued.  this is our cheap solution for good batching in the case
1957          * where writepage marks some random page in the middle of the file
1958          * as urgent because of, say, memory pressure */
1959         if (!cfs_list_empty(&lop->lop_urgent)) {
1960                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1961                 RETURN(1);
1962         }
1963         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1964         optimal = cli->cl_max_pages_per_rpc;
1965         if (cmd & OBD_BRW_WRITE) {
1966                 /* trigger a write rpc stream as long as there are dirtiers
1967                  * waiting for space.  as they're waiting, they're not going to
1968                  * create more pages to coalesce with what's already waiting. */
1969                 if (!cfs_list_empty(&cli->cl_cache_waiters)) {
1970                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1971                         RETURN(1);
1972                 }
1973                 /* +16 to avoid triggering rpcs that would want to include pages
1974                  * that are being queued but which can't be made ready until
1975                  * the queuer finishes with the page. this is a wart for
1976                  * llite::commit_write() */
1977                 optimal += 16;
1978         }
1979         if (lop->lop_num_pending >= optimal)
1980                 RETURN(1);
1981
1982         RETURN(0);
1983 }
1984
1985 static int lop_makes_hprpc(struct loi_oap_pages *lop)
1986 {
1987         struct osc_async_page *oap;
1988         ENTRY;
1989
1990         if (cfs_list_empty(&lop->lop_urgent))
1991                 RETURN(0);
1992
1993         oap = cfs_list_entry(lop->lop_urgent.next,
1994                          struct osc_async_page, oap_urgent_item);
1995
1996         if (oap->oap_async_flags & ASYNC_HP) {
1997                 CDEBUG(D_CACHE, "hp request forcing RPC\n");
1998                 RETURN(1);
1999         }
2000
2001         RETURN(0);
2002 }
2003
2004 static void on_list(cfs_list_t *item, cfs_list_t *list,
2005                     int should_be_on)
2006 {
2007         if (cfs_list_empty(item) && should_be_on)
2008                 cfs_list_add_tail(item, list);
2009         else if (!cfs_list_empty(item) && !should_be_on)
2010                 cfs_list_del_init(item);
2011 }
2012
2013 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
2014  * can find pages to build into rpcs quickly */
2015 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
2016 {
2017         if (lop_makes_hprpc(&loi->loi_write_lop) ||
2018             lop_makes_hprpc(&loi->loi_read_lop)) {
2019                 /* HP rpc */
2020                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
2021                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
2022         } else {
2023                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
2024                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
2025                         lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
2026                         lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
2027         }
2028
2029         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
2030                 loi->loi_write_lop.lop_num_pending);
2031
2032         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
2033                 loi->loi_read_lop.lop_num_pending);
2034 }
2035
2036 static void lop_update_pending(struct client_obd *cli,
2037                                struct loi_oap_pages *lop, int cmd, int delta)
2038 {
2039         lop->lop_num_pending += delta;
2040         if (cmd & OBD_BRW_WRITE)
2041                 cli->cl_pending_w_pages += delta;
2042         else
2043                 cli->cl_pending_r_pages += delta;
2044 }
2045
2046 /**
2047  * this is called when a sync waiter receives an interruption.  Its job is to
2048  * get the caller woken as soon as possible.  If its page hasn't been put in an
2049  * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
2050  * desiring interruption, which will forcefully complete the rpc once it
2051  * has timed out.
2052  */
2053 int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
2054 {
2055         struct loi_oap_pages *lop;
2056         struct lov_oinfo *loi;
2057         int rc = -EBUSY;
2058         ENTRY;
2059
2060         LASSERT(!oap->oap_interrupted);
2061         oap->oap_interrupted = 1;
2062
2063         /* ok, it's been put in an rpc. only one oap gets a request reference */
2064         if (oap->oap_request != NULL) {
2065                 ptlrpc_mark_interrupted(oap->oap_request);
2066                 ptlrpcd_wake(oap->oap_request);
2067                 ptlrpc_req_finished(oap->oap_request);
2068                 oap->oap_request = NULL;
2069         }
2070
2071         /*
2072          * page completion may be called only if ->cpo_prep() was executed by
2073          * osc_io_submit(), which also adds the page to the pending list
2074          */
2075         if (!cfs_list_empty(&oap->oap_pending_item)) {
2076                 cfs_list_del_init(&oap->oap_pending_item);
2077                 cfs_list_del_init(&oap->oap_urgent_item);
2078
2079                 loi = oap->oap_loi;
2080                 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
2081                         &loi->loi_write_lop : &loi->loi_read_lop;
2082                 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
2083                 loi_list_maint(oap->oap_cli, oap->oap_loi);
2084                 rc = oap->oap_caller_ops->ap_completion(env,
2085                                           oap->oap_caller_data,
2086                                           oap->oap_cmd, NULL, -EINTR);
2087         }
2088
2089         RETURN(rc);
2090 }
2091
2092 /* this is trying to propagate async writeback errors back up to the
2093  * application.  When an async write fails we record the error code for later
2094  * in case the app does an fsync.  As long as errors persist we force future
2095  * rpcs to be sync so that the app can get a sync error and break the cycle
2096  * of queueing pages for which writeback will fail. */
2097 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
2098                            int rc)
2099 {
2100         if (rc) {
2101                 if (!ar->ar_rc)
2102                         ar->ar_rc = rc;
2103
2104                 ar->ar_force_sync = 1;
2105                 ar->ar_min_xid = ptlrpc_sample_next_xid();
2106                 return;
2107
2108         }
2109
2110         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2111                 ar->ar_force_sync = 0;
2112 }
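
/*
 * osc_process_ar() is a latch: the first async write error is saved,
 * force_sync is set, and ar_min_xid records the next xid; only when a
 * write with xid >= ar_min_xid later succeeds is force_sync cleared.
 * A compact sketch of that state machine (the struct and next_xid()
 * below are assumptions standing in for the driver's types):
 */
#if 0
struct async_rc {
        int                rc;          /* first recorded error */
        int                force_sync;
        unsigned long long min_xid;
};

static unsigned long long xid_counter;

static unsigned long long next_xid(void)
{
        return ++xid_counter;           /* monotonic xid source */
}

static void process_ar(struct async_rc *ar, unsigned long long xid, int rc)
{
        if (rc) {
                if (!ar->rc)
                        ar->rc = rc;    /* keep only the first error */
                ar->force_sync = 1;
                ar->min_xid = next_xid();  /* older xids stay forced sync */
                return;
        }
        if (ar->force_sync && xid >= ar->min_xid)
                ar->force_sync = 0;     /* a newer write succeeded */
}
#endif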
2113
2114 void osc_oap_to_pending(struct osc_async_page *oap)
2115 {
2116         struct loi_oap_pages *lop;
2117
2118         if (oap->oap_cmd & OBD_BRW_WRITE)
2119                 lop = &oap->oap_loi->loi_write_lop;
2120         else
2121                 lop = &oap->oap_loi->loi_read_lop;
2122
2123         if (oap->oap_async_flags & ASYNC_HP)
2124                 cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2125         else if (oap->oap_async_flags & ASYNC_URGENT)
2126                 cfs_list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2127         cfs_list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2128         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2129 }
2130
2131 /* this must be called holding the loi list lock to give coverage to exit_cache,
2132  * async_flag maintenance, and oap_request */
2133 static void osc_ap_completion(const struct lu_env *env,
2134                               struct client_obd *cli, struct obdo *oa,
2135                               struct osc_async_page *oap, int sent, int rc)
2136 {
2137         __u64 xid = 0;
2138
2139         ENTRY;
2140         if (oap->oap_request != NULL) {
2141                 xid = ptlrpc_req_xid(oap->oap_request);
2142                 ptlrpc_req_finished(oap->oap_request);
2143                 oap->oap_request = NULL;
2144         }
2145
2146         cfs_spin_lock(&oap->oap_lock);
2147         oap->oap_async_flags = 0;
2148         cfs_spin_unlock(&oap->oap_lock);
2149         oap->oap_interrupted = 0;
2150
2151         if (oap->oap_cmd & OBD_BRW_WRITE) {
2152                 osc_process_ar(&cli->cl_ar, xid, rc);
2153                 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
2154         }
2155
2156         if (rc == 0 && oa != NULL) {
2157                 if (oa->o_valid & OBD_MD_FLBLOCKS)
2158                         oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2159                 if (oa->o_valid & OBD_MD_FLMTIME)
2160                         oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2161                 if (oa->o_valid & OBD_MD_FLATIME)
2162                         oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2163                 if (oa->o_valid & OBD_MD_FLCTIME)
2164                         oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2165         }
2166
2167         rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
2168                                                 oap->oap_cmd, oa, rc);
2169
2170         /* ll_ap_completion (from llite) drops PG_locked, so a new
2171          * I/O on the page could start; but OSC calls it under lock
2172          * and thus we can add the oap back to pending safely */
2173         if (rc)
2174                 /* upper layer wants to leave the page on pending queue */
2175                 osc_oap_to_pending(oap);
2176         else
2177                 osc_exit_cache(cli, oap, sent);
2178         EXIT;
2179 }
2180
2181 static int brw_interpret(const struct lu_env *env,
2182                          struct ptlrpc_request *req, void *data, int rc)
2183 {
2184         struct osc_brw_async_args *aa = data;
2185         struct client_obd *cli;
2186         int async;
2187         ENTRY;
2188
2189         rc = osc_brw_fini_request(req, rc);
2190         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2191         if (osc_recoverable_error(rc)) {
2192                 /* Only retry once for mmapped files, since an mmapped
2193                  * page might be modified at any time. We have to retry at
2194                  * least once in case the page really was corrupted on the
2195                  * network and the corruption was not caused by mmap()
2196                  * modifying the page. Bug 11742 */
2197                 if ((rc == -EAGAIN) && (aa->aa_resends > 0) &&
2198                     aa->aa_oa->o_valid & OBD_MD_FLFLAGS &&
2199                     aa->aa_oa->o_flags & OBD_FL_MMAP) {
2200                         rc = 0;
2201                 } else {
2202                         rc = osc_brw_redo_request(req, aa);
2203                         if (rc == 0)
2204                                 RETURN(0);
2205                 }
2206         }
2207
2208         if (aa->aa_ocapa) {
2209                 capa_put(aa->aa_ocapa);
2210                 aa->aa_ocapa = NULL;
2211         }
2212
2213         cli = aa->aa_cli;
2214
2215         client_obd_list_lock(&cli->cl_loi_list_lock);
2216
2217         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2218          * is called so we know whether to go to sync BRWs or wait for more
2219          * RPCs to complete */
2220         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2221                 cli->cl_w_in_flight--;
2222         else
2223                 cli->cl_r_in_flight--;
2224
2225         async = cfs_list_empty(&aa->aa_oaps);
2226         if (!async) { /* from osc_send_oap_rpc() */
2227                 struct osc_async_page *oap, *tmp;
2228                 /* the caller may re-use the oap after the completion call so
2229                  * we need to clean it up a little */
2230                 cfs_list_for_each_entry_safe(oap, tmp, &aa->aa_oaps,
2231                                              oap_rpc_item) {
2232                         cfs_list_del_init(&oap->oap_rpc_item);
2233                         osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
2234                 }
2235                 OBDO_FREE(aa->aa_oa);
2236         } else { /* from async_internal() */
2237                 obd_count i;
2238                 for (i = 0; i < aa->aa_page_count; i++)
2239                         osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
2240         }
2241         osc_wake_cache_waiters(cli);
2242         osc_check_rpcs(env, cli);
2243         client_obd_list_unlock(&cli->cl_loi_list_lock);
2244         if (!async)
2245                 cl_req_completion(env, aa->aa_clerq, rc);
2246         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2247
2248         RETURN(rc);
2249 }
2250
2251 static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
2252                                             struct client_obd *cli,
2253                                             cfs_list_t *rpc_list,
2254                                             int page_count, int cmd)
2255 {
2256         struct ptlrpc_request *req;
2257         struct brw_page **pga = NULL;
2258         struct osc_brw_async_args *aa;
2259         struct obdo *oa = NULL;
2260         const struct obd_async_page_ops *ops = NULL;
2261         void *caller_data = NULL;
2262         struct osc_async_page *oap;
2263         struct osc_async_page *tmp;
2264         struct ost_body *body;
2265         struct cl_req *clerq = NULL;
2266         enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2267         struct ldlm_lock *lock = NULL;
2268         struct cl_req_attr crattr;
2269         int i, rc, mpflag = 0;
2270
2271         ENTRY;
2272         LASSERT(!cfs_list_empty(rpc_list));
2273
2274         if (cmd & OBD_BRW_MEMALLOC)
2275                 mpflag = cfs_memory_pressure_get_and_set();
2276
2277         memset(&crattr, 0, sizeof crattr);
2278         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2279         if (pga == NULL)
2280                 GOTO(out, req = ERR_PTR(-ENOMEM));
2281
2282         OBDO_ALLOC(oa);
2283         if (oa == NULL)
2284                 GOTO(out, req = ERR_PTR(-ENOMEM));
2285
2286         i = 0;
2287         cfs_list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2288                 struct cl_page *page = osc_oap2cl_page(oap);
2289                 if (ops == NULL) {
2290                         ops = oap->oap_caller_ops;
2291                         caller_data = oap->oap_caller_data;
2292
2293                         clerq = cl_req_alloc(env, page, crt,
2294                                              1 /* only 1-object rpcs for
2295                                                 * now */);
2296                         if (IS_ERR(clerq))
2297                                 GOTO(out, req = (void *)clerq);
2298                         lock = oap->oap_ldlm_lock;
2299                 }
2300                 pga[i] = &oap->oap_brw_page;
2301                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2302                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2303                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2304                 i++;
2305                 cl_req_page_add(env, clerq, page);
2306         }
2307
2308         /* always get the data for the obdo for the rpc */
2309         LASSERT(ops != NULL);
2310         crattr.cra_oa = oa;
2311         crattr.cra_capa = NULL;
2312         cl_req_attr_set(env, clerq, &crattr, ~0ULL);
2313         if (lock) {
2314                 oa->o_handle = lock->l_remote_handle;
2315                 oa->o_valid |= OBD_MD_FLHANDLE;
2316         }
2317
2318         rc = cl_req_prep(env, clerq);
2319         if (rc != 0) {
2320                 CERROR("cl_req_prep failed: %d\n", rc);
2321                 GOTO(out, req = ERR_PTR(rc));
2322         }
2323
2324         sort_brw_pages(pga, page_count);
2325         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2326                                   pga, &req, crattr.cra_capa, 1);
2327         if (rc != 0) {
2328                 CERROR("prep_req failed: %d\n", rc);
2329                 GOTO(out, req = ERR_PTR(rc));
2330         }
2331
2332         if (cmd & OBD_BRW_MEMALLOC)
2333                 req->rq_memalloc = 1;
2334
2335         /* Need to update the timestamps after the request is built in case
2336          * we race with setattr (locally or in queue at OST).  If the OST gets
2337          * a later setattr before an earlier BRW (as determined by the request
2338          * xid), the OST will not use the BRW timestamps.  Sadly, there is no
2339          * obvious way to do this in a single call.  bug 10150 */
2340         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2341         cl_req_attr_set(env, clerq, &crattr,
2342                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2343
2344         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2345         aa = ptlrpc_req_async_args(req);
2346         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2347         cfs_list_splice(rpc_list, &aa->aa_oaps);
2348         CFS_INIT_LIST_HEAD(rpc_list);
2349         aa->aa_clerq = clerq;
2350 out:
2351         if (cmd & OBD_BRW_MEMALLOC)
2352                 cfs_memory_pressure_restore(mpflag);
2353
2354         capa_put(crattr.cra_capa);
2355         if (IS_ERR(req)) {
2356                 if (oa)
2357                         OBDO_FREE(oa);
2358                 if (pga)
2359                         OBD_FREE(pga, sizeof(*pga) * page_count);
2360                 /* this should happen rarely and is pretty bad: it makes the
2361                  * pending list stop following the dirty order */
2362                 client_obd_list_lock(&cli->cl_loi_list_lock);
2363                 cfs_list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
2364                         cfs_list_del_init(&oap->oap_rpc_item);
2365
2366                         /* queued sync pages can be torn down while the pages
2367                          * are between the pending list and the rpc */
2368                         if (oap->oap_interrupted) {
2369                                 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2370                                 osc_ap_completion(env, cli, NULL, oap, 0,
2371                                                   oap->oap_count);
2372                                 continue;
2373                         }
2374                         osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
2375                 }
2376                 if (clerq && !IS_ERR(clerq))
2377                         cl_req_completion(env, clerq, PTR_ERR(req));
2378         }
2379         RETURN(req);
2380 }
2381
2382 /**
2383  * Prepare pages for ASYNC io and put them in the send queue.
2384  *
2385  * \param cmd OBD_BRW_* macros
2386  * \param lop pending pages
2387  *
2388  * \return zero if no pages were added to the send queue.
2389  * \return 1 if pages were successfully added to the send queue.
2390  * \return negative on errors.
2391  */
2392 static int
2393 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
2394                  struct lov_oinfo *loi,
2395                  int cmd, struct loi_oap_pages *lop)
2396 {
2397         struct ptlrpc_request *req;
2398         obd_count page_count = 0;
2399         struct osc_async_page *oap = NULL, *tmp;
2400         struct osc_brw_async_args *aa;
2401         const struct obd_async_page_ops *ops;
2402         CFS_LIST_HEAD(rpc_list);
2403         CFS_LIST_HEAD(tmp_list);
2404         unsigned int ending_offset;
2405         unsigned  starting_offset = 0;
2406         int srvlock = 0, mem_tight = 0;
2407         struct cl_object *clob = NULL;
2408         ENTRY;
2409
2410         /* ASYNC_HP pages first. At present, when the lock covering the
2411          * pages is to be canceled, the pages under it will be sent out
2412          * with ASYNC_HP. We have to send them out as soon as possible. */
2413         cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_urgent, oap_urgent_item) {
2414                 if (oap->oap_async_flags & ASYNC_HP) 
2415                         cfs_list_move(&oap->oap_pending_item, &tmp_list);
2416                 else
2417                         cfs_list_move_tail(&oap->oap_pending_item, &tmp_list);
2418                 if (++page_count >= cli->cl_max_pages_per_rpc)
2419                         break;
2420         }
2421
2422         cfs_list_splice(&tmp_list, &lop->lop_pending);
2423         page_count = 0;
2424
2425         /* first we find the pages we're allowed to work with */
2426         cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2427                                      oap_pending_item) {
2428                 ops = oap->oap_caller_ops;
2429
2430                 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
2431                          "magic 0x%x\n", oap, oap->oap_magic);
2432
2433                 if (clob == NULL) {
2434                         /* pin object in memory, so that completion call-backs
2435                          * can be safely called under client_obd_list lock. */
2436                         clob = osc_oap2cl_page(oap)->cp_obj;
2437                         cl_object_get(clob);
2438                 }
2439
2440                 if (page_count != 0 &&
2441                     srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2442                         CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2443                                " oap %p, page %p, srvlock %u\n",
2444                                oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2445                         break;
2446                 }
2447
2448                 /* If there is a gap at the start of this page, it can't merge
2449                  * with any previous page, so we'll hand the network a
2450                  * "fragmented" page array that it can't transfer in 1 RDMA */
2451                 if (page_count != 0 && oap->oap_page_off != 0)
2452                         break;
2453
2454                 /* in llite being 'ready' equates to the page being locked
2455                  * until completion unlocks it.  commit_write submits a page
2456                  * as not ready because its unlock will happen unconditionally
2457                  * as the call returns.  if we race with commit_write giving
2458                  * us that page we don't want to create a hole in the page
2459                  * stream, so we stop and leave the rpc to be fired by
2460                  * another dirtier or kupdated interval (the not ready page
2461                  * will still be on the dirty list).  we could call in
2462                  * at the end of ll_file_write to process the queue again. */
2463                 if (!(oap->oap_async_flags & ASYNC_READY)) {
2464                         int rc = ops->ap_make_ready(env, oap->oap_caller_data,
2465                                                     cmd);
2466                         if (rc < 0)
2467                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
2468                                                 "instead of ready\n", oap,
2469                                                 oap->oap_page, rc);
2470                         switch (rc) {
2471                         case -EAGAIN:
2472                                 /* llite is telling us that the page is still
2473                                  * in commit_write and that we should try to
2474                                  * put it in an rpc again later.  we break
2475                                  * out of the loop so we don't create a hole
2476                                  * in the sequence of pages in the rpc
2477                                  * stream. */
2478                                 oap = NULL;
2479                                 break;
2480                         case -EINTR:
2481                                 /* the io isn't needed; tell the checks
2482                                  * below to complete the rpc with EINTR */
2483                                 cfs_spin_lock(&oap->oap_lock);
2484                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2485                                 cfs_spin_unlock(&oap->oap_lock);
2486                                 oap->oap_count = -EINTR;
2487                                 break;
2488                         case 0:
2489                                 cfs_spin_lock(&oap->oap_lock);
2490                                 oap->oap_async_flags |= ASYNC_READY;
2491                                 cfs_spin_unlock(&oap->oap_lock);
2492                                 break;
2493                         default:
2494                                 LASSERTF(0, "oap %p page %p returned %d "
2495                                             "from make_ready\n", oap,
2496                                             oap->oap_page, rc);
2497                                 break;
2498                         }
2499                 }
2500                 if (oap == NULL)
2501                         break;
2502                 /*
2503                  * Page submitted for IO has to be locked. Either by
2504                  * ->ap_make_ready() or by higher layers.
2505                  */
2506 #if defined(__KERNEL__) && defined(__linux__)
2507                 {
2508                         struct cl_page *page;
2509
2510                         page = osc_oap2cl_page(oap);
2511
2512                         if (page->cp_type == CPT_CACHEABLE &&
2513                             !(PageLocked(oap->oap_page) &&
2514                               (CheckWriteback(oap->oap_page, cmd)))) {
2515                                 CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2516                                        oap->oap_page,
2517                                        (long)oap->oap_page->flags,
2518                                        oap->oap_async_flags);
2519                                 LBUG();
2520                         }
2521                 }
2522 #endif
2523
2524                 /* take the page out of our book-keeping */
2525                 cfs_list_del_init(&oap->oap_pending_item);
2526                 lop_update_pending(cli, lop, cmd, -1);
2527                 cfs_list_del_init(&oap->oap_urgent_item);
2528
2529                 if (page_count == 0)
2530                         starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2531                                           (PTLRPC_MAX_BRW_SIZE - 1);
2532
2533                 /* ask the caller for the size of the io as the rpc leaves. */
2534                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
2535                         oap->oap_count =
2536                                 ops->ap_refresh_count(env, oap->oap_caller_data,
2537                                                       cmd);
2538                         LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
2539                 }
2540                 if (oap->oap_count <= 0) {
2541                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2542                                oap->oap_count);
2543                         osc_ap_completion(env, cli, NULL,
2544                                           oap, 0, oap->oap_count);
2545                         continue;
2546                 }
2547
2548                 /* now put the page back in our accounting */
2549                 cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
2550                 if (oap->oap_brw_flags & OBD_BRW_MEMALLOC)
2551                         mem_tight = 1;
2552                 if (page_count == 0)
2553                         srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2554                 if (++page_count >= cli->cl_max_pages_per_rpc)
2555                         break;
2556
2557                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
2558                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2559                  * have the same alignment as the initial writes that allocated
2560                  * extents on the server. */
2561                 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2562                                  oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2563                 if (ending_offset == 0)
2564                         break;
2565
2566                 /* If there is a gap at the end of this page, it can't merge
2567                  * with any subsequent pages, so we'll hand the network a
2568                  * "fragmented" page array that it can't transfer in 1 RDMA */
2569                 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2570                         break;
2571         }
2572
2573         osc_wake_cache_waiters(cli);
2574
2575         loi_list_maint(cli, loi);
2576
2577         client_obd_list_unlock(&cli->cl_loi_list_lock);
2578
2579         if (clob != NULL)
2580                 cl_object_put(env, clob);
2581
2582         if (page_count == 0) {
2583                 client_obd_list_lock(&cli->cl_loi_list_lock);
2584                 RETURN(0);
2585         }
2586
2587         req = osc_build_req(env, cli, &rpc_list, page_count,
2588                             mem_tight ? (cmd | OBD_BRW_MEMALLOC) : cmd);
2589         if (IS_ERR(req)) {
2590                 LASSERT(cfs_list_empty(&rpc_list));
2591                 loi_list_maint(cli, loi);
2592                 RETURN(PTR_ERR(req));
2593         }
2594
2595         aa = ptlrpc_req_async_args(req);
2596
2597         if (cmd == OBD_BRW_READ) {
2598                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2599                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2600                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2601                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2602         } else {
2603                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2604                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2605                                  cli->cl_w_in_flight);
2606                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2607                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2608         }
2609         ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2610
2611         client_obd_list_lock(&cli->cl_loi_list_lock);
2612
2613         if (cmd == OBD_BRW_READ)
2614                 cli->cl_r_in_flight++;
2615         else
2616                 cli->cl_w_in_flight++;
2617
2618         /* queued sync pages can be torn down while the pages
2619          * are between the pending list and the rpc */
2620         tmp = NULL;
2621         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2622                 /* only one oap gets a request reference */
2623                 if (tmp == NULL)
2624                         tmp = oap;
2625                 if (oap->oap_interrupted && !req->rq_intr) {
2626                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2627                                oap, req);
2628                         ptlrpc_mark_interrupted(req);
2629                 }
2630         }
2631         if (tmp != NULL)
2632                 tmp->oap_request = ptlrpc_request_addref(req);
2633
2634         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2635                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2636
2637         req->rq_interpret_reply = brw_interpret;
2638         ptlrpcd_add_req(req, PSCOPE_BRW);
2639         RETURN(1);
2640 }
2641
2642 #define LOI_DEBUG(LOI, STR, args...)                                     \
2643         CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
2644                !cfs_list_empty(&(LOI)->loi_ready_item) ||                \
2645                !cfs_list_empty(&(LOI)->loi_hp_ready_item),               \
2646                (LOI)->loi_write_lop.lop_num_pending,                     \
2647                !cfs_list_empty(&(LOI)->loi_write_lop.lop_urgent),        \
2648                (LOI)->loi_read_lop.lop_num_pending,                      \
2649                !cfs_list_empty(&(LOI)->loi_read_lop.lop_urgent),         \
2650                args)
2651
2652 /* This is called by osc_check_rpcs() to find which objects have pages that
2653  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2654 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2655 {
2656         ENTRY;
2657
2658         /* First return objects that have blocked locks so that they
2659          * will be flushed quickly and other clients can get the lock,
2660          * then objects which have pages ready to be stuffed into RPCs */
2661         if (!cfs_list_empty(&cli->cl_loi_hp_ready_list))
2662                 RETURN(cfs_list_entry(cli->cl_loi_hp_ready_list.next,
2663                                       struct lov_oinfo, loi_hp_ready_item));
2664         if (!cfs_list_empty(&cli->cl_loi_ready_list))
2665                 RETURN(cfs_list_entry(cli->cl_loi_ready_list.next,
2666                                       struct lov_oinfo, loi_ready_item));
2667
2668         /* then if we have cache waiters, return all objects with queued
2669          * writes.  This is especially important when many small files
2670          * have filled up the cache but have not been fired into RPCs
2671          * because they don't pass the per-object nr_pending threshold */
2672         if (!cfs_list_empty(&cli->cl_cache_waiters) &&
2673             !cfs_list_empty(&cli->cl_loi_write_list))
2674                 RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2675                                       struct lov_oinfo, loi_write_item));
2676
2677         /* then return all queued objects when we have an invalid import
2678          * so that they get flushed */
2679         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2680                 if (!cfs_list_empty(&cli->cl_loi_write_list))
2681                         RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2682                                               struct lov_oinfo,
2683                                               loi_write_item));
2684                 if (!cfs_list_empty(&cli->cl_loi_read_list))
2685                         RETURN(cfs_list_entry(cli->cl_loi_read_list.next,
2686                                               struct lov_oinfo, loi_read_item));
2687         }
2688         RETURN(NULL);
2689 }
2690
2691 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2692 {
2693         struct osc_async_page *oap;
2694         int hprpc = 0;
2695
2696         if (!cfs_list_empty(&loi->loi_write_lop.lop_urgent)) {
2697                 oap = cfs_list_entry(loi->loi_write_lop.lop_urgent.next,
2698                                      struct osc_async_page, oap_urgent_item);
2699                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2700         }
2701
2702         if (!hprpc && !cfs_list_empty(&loi->loi_read_lop.lop_urgent)) {
2703                 oap = cfs_list_entry(loi->loi_read_lop.lop_urgent.next,
2704                                      struct osc_async_page, oap_urgent_item);
2705                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2706         }
2707
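        /* The "+ hprpc" below allows one extra RPC in flight: e.g. with
         * cl_max_rpcs_in_flight == 8 and an ASYNC_HP page at the head of an
         * urgent list, up to 9 RPCs may be sent, so high-priority pages do
         * not queue behind ordinary I/O. */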
2708         return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
2709 }
2710
2711 /* called with the loi list lock held */
2712 void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
2713 {
2714         struct lov_oinfo *loi;
2715         int rc = 0, race_counter = 0;
2716         ENTRY;
2717
2718         while ((loi = osc_next_loi(cli)) != NULL) {
2719                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2720
2721                 if (osc_max_rpc_in_flight(cli, loi))
2722                         break;
2723
2724                 /* attempt some read/write balancing by alternating between
2725                  * reads and writes in an object.  The makes_rpc checks here
2726                  * would be redundant if we were getting read/write work items
2727                  * instead of objects.  We don't want send_oap_rpc to drain a
2728                  * partially-read pending queue when we're given this object
2729                  * to do write I/O while there are cache waiters */
2730                 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2731                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
2732                                               &loi->loi_write_lop);
2733                         if (rc < 0) {
2734                                 CERROR("Write request failed with %d\n", rc);
2735
2736                                 /* osc_send_oap_rpc failed, mostly because of
2737                                  * memory pressure.
2738                                  *
2739                                  * We can't break out here, because if:
2740                                  *  - a page was submitted by osc_io_submit,
2741                                  *    so that page is locked;
2742                                  *  - no request is in flight; and
2743                                  *  - no subsequent request will be issued,
2744                                  * then the system will live-lock, because
2745                                  * there is no further chance to call
2746                                  * osc_io_unplug() or osc_check_rpcs().
2747                                  * pdflush can't help in this case either,
2748                                  * because it might block on the page lock
2749                                  * mentioned above.
2750                                  *
2751                                  * So continue to drain pages anyway. */
2752                                 /* break; */
2753                         }
2754
2755                         if (rc > 0)
2756                                 race_counter = 0;
2757                         else
2758                                 race_counter++;
2759                 }
2760                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2761                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
2762                                               &loi->loi_read_lop);
2763                         if (rc < 0)
2764                                 CERROR("Read request failed with %d\n", rc);
2765
2766                         if (rc > 0)
2767                                 race_counter = 0;
2768                         else
2769                                 race_counter++;
2770                 }
2771
2772                 /* attempt some inter-object balancing by issuing rpcs
2773                  * for each object in turn */
2774                 if (!cfs_list_empty(&loi->loi_hp_ready_item))
2775                         cfs_list_del_init(&loi->loi_hp_ready_item);
2776                 if (!cfs_list_empty(&loi->loi_ready_item))
2777                         cfs_list_del_init(&loi->loi_ready_item);
2778                 if (!cfs_list_empty(&loi->loi_write_item))
2779                         cfs_list_del_init(&loi->loi_write_item);
2780                 if (!cfs_list_empty(&loi->loi_read_item))
2781                         cfs_list_del_init(&loi->loi_read_item);
2782
2783                 loi_list_maint(cli, loi);
2784
2785                 /* send_oap_rpc returns 0 when make_ready tells it to
2786                  * back off.  llite's make_ready does this when it tries
2787                  * to lock a page queued for write that is already locked.
2788                  * We want to try sending RPCs from many objects, but we
2789                  * don't want to spin, repeatedly failing with 0. */
2790                 if (race_counter == 10)
2791                         break;
2792         }
2793         EXIT;
2794 }
2795
2796 /* we're trying to queue a page in the osc so we're subject to the
2797  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2798  * If the osc's queued pages are already at that limit, then we want to sleep
2799  * until there is space in the osc's queue for us.  We also may be waiting for
2800  * write credits from the OST if there are RPCs in flight that may return some
2801  * before we fall back to sync writes.
2802  *
2803  * We need this to know our allocation was granted in the presence of signals */
2804 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2805 {
2806         int rc;
2807         ENTRY;
2808         client_obd_list_lock(&cli->cl_loi_list_lock);
2809         rc = cfs_list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2810         client_obd_list_unlock(&cli->cl_loi_list_lock);
2811         RETURN(rc);
2812 }
2813
2814 /**
2815  * Non-blocking version of osc_enter_cache() that consumes grant only when it
2816  * is available.
2817  */
2818 int osc_enter_cache_try(const struct lu_env *env,
2819                         struct client_obd *cli, struct lov_oinfo *loi,
2820                         struct osc_async_page *oap, int transient)
2821 {
2822         int has_grant;
2823
2824         has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
2825         if (has_grant) {
2826                 osc_consume_write_grant(cli, &oap->oap_brw_page);
2827                 if (transient) {
2828                         cli->cl_dirty_transit += CFS_PAGE_SIZE;
2829                         cfs_atomic_inc(&obd_dirty_transit_pages);
2830                         oap->oap_brw_flags |= OBD_BRW_NOCACHE;
2831                 }
2832         }
2833         return has_grant;
2834 }
2835
2836 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2837  * grant or cache space. */
2838 static int osc_enter_cache(const struct lu_env *env,
2839                            struct client_obd *cli, struct lov_oinfo *loi,
2840                            struct osc_async_page *oap)
2841 {
2842         struct osc_cache_waiter ocw;
2843         struct l_wait_info lwi = { 0 };
2844
2845         ENTRY;
2846
2847         CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2848                "grant: %lu\n", cli->cl_dirty, cfs_atomic_read(&obd_dirty_pages),
2849                cli->cl_dirty_max, obd_max_dirty_pages,
2850                cli->cl_lost_grant, cli->cl_avail_grant);
2851
2852         /* force the caller to try sync io.  this can jump the list
2853          * of queued writes and create a discontiguous rpc stream */
2854         if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2855             loi->loi_ar.ar_force_sync)
2856                 RETURN(-EDQUOT);
2857
2858         /* Hopefully normal case - cache space and write credits available */
2859         if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
2860             cfs_atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
2861             osc_enter_cache_try(env, cli, loi, oap, 0))
2862                 RETURN(0);
2863
2864         /* It is safe to block as a cache waiter as long as there is grant
2865          * space available or the hope of additional grant being returned
2866          * when an in flight write completes.  Using the write back cache
2867          * if possible is preferable to sending the data synchronously
2868          * because write pages can then be merged in to large requests.
2869          * The addition of this cache waiter will cause pending write
2870          * pages to be sent immediately. */
2871         if (cli->cl_w_in_flight || cli->cl_avail_grant >= CFS_PAGE_SIZE) {
2872                 cfs_list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2873                 cfs_waitq_init(&ocw.ocw_waitq);
2874                 ocw.ocw_oap = oap;
2875                 ocw.ocw_rc = 0;
2876
2877                 loi_list_maint(cli, loi);
2878                 osc_check_rpcs(env, cli);
2879                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2880
2881                 CDEBUG(D_CACHE, "sleeping for cache space\n");
2882                 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
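                /* We wake up either because osc_wake_cache_waiters() removed
                 * us from cl_cache_waiters (grant or cache space became
                 * available, with the verdict in ocw_rc), or because no RPCs
                 * remain in flight, i.e. no more grant can be expected. */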
2883
2884                 client_obd_list_lock(&cli->cl_loi_list_lock);
2885                 if (!cfs_list_empty(&ocw.ocw_entry)) {
2886                         cfs_list_del(&ocw.ocw_entry);
2887                         RETURN(-EINTR);
2888                 }
2889                 RETURN(ocw.ocw_rc);
2890         }
2891
2892         RETURN(-EDQUOT);
2893 }
2894
2895
2896 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2897                         struct lov_oinfo *loi, cfs_page_t *page,
2898                         obd_off offset, const struct obd_async_page_ops *ops,
2899                         void *data, void **res, int nocache,
2900                         struct lustre_handle *lockh)
2901 {
2902         struct osc_async_page *oap;
2903
2904         ENTRY;
2905
2906         if (!page)
2907                 return cfs_size_round(sizeof(*oap));
2908
2909         oap = *res;
2910         oap->oap_magic = OAP_MAGIC;
2911         oap->oap_cli = &exp->exp_obd->u.cli;
2912         oap->oap_loi = loi;
2913
2914         oap->oap_caller_ops = ops;
2915         oap->oap_caller_data = data;
2916
2917         oap->oap_page = page;
2918         oap->oap_obj_off = offset;
2919         if (!client_is_remote(exp) &&
2920             cfs_capable(CFS_CAP_SYS_RESOURCE))
2921                 oap->oap_brw_flags = OBD_BRW_NOQUOTA;
2922
2923         LASSERT(!(offset & ~CFS_PAGE_MASK));
2924
2925         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2926         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2927         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2928         CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2929
2930         cfs_spin_lock_init(&oap->oap_lock);
2931         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2932         RETURN(0);
2933 }
2934
2935 struct osc_async_page *oap_from_cookie(void *cookie)
2936 {
2937         struct osc_async_page *oap = cookie;
2938         if (oap->oap_magic != OAP_MAGIC)
2939                 return ERR_PTR(-EINVAL);
2940         return oap;
2941 }
2942
2943 int osc_queue_async_io(const struct lu_env *env,
2944                        struct obd_export *exp, struct lov_stripe_md *lsm,
2945                        struct lov_oinfo *loi, void *cookie,
2946                        int cmd, obd_off off, int count,
2947                        obd_flag brw_flags, enum async_flags async_flags)
2948 {
2949         struct client_obd *cli = &exp->exp_obd->u.cli;
2950         struct osc_async_page *oap;
2951         int rc = 0;
2952         ENTRY;
2953
2954         oap = oap_from_cookie(cookie);
2955         if (IS_ERR(oap))
2956                 RETURN(PTR_ERR(oap));
2957
2958         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2959                 RETURN(-EIO);
2960
2961         if (!cfs_list_empty(&oap->oap_pending_item) ||
2962             !cfs_list_empty(&oap->oap_urgent_item) ||
2963             !cfs_list_empty(&oap->oap_rpc_item))
2964                 RETURN(-EBUSY);
2965
2966         /* check if the file's owner/group is over quota */
2967         if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
2968                 struct cl_object *obj;
2969                 struct cl_attr    attr; /* XXX put attr into thread info */
2970                 unsigned int qid[MAXQUOTAS];
2971
2972                 obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
2973
2974                 cl_object_attr_lock(obj);
2975                 rc = cl_object_attr_get(env, obj, &attr);
2976                 cl_object_attr_unlock(obj);
2977
2978                 qid[USRQUOTA] = attr.cat_uid;
2979                 qid[GRPQUOTA] = attr.cat_gid;
2980                 if (rc == 0 &&
2981                     lquota_chkdq(quota_interface, cli, qid) == NO_QUOTA)
2982                         rc = -EDQUOT;
2983                 if (rc)
2984                         RETURN(rc);
2985         }
2986
2987         if (loi == NULL)
2988                 loi = lsm->lsm_oinfo[0];
2989
2990         client_obd_list_lock(&cli->cl_loi_list_lock);
2991
2992         LASSERT(off + count <= CFS_PAGE_SIZE);
2993         oap->oap_cmd = cmd;
2994         oap->oap_page_off = off;
2995         oap->oap_count = count;
2996         oap->oap_brw_flags = brw_flags;
2997         /* Give a hint to OST that requests are coming from kswapd - bug19529 */
2998         if (cfs_memory_pressure_get())
2999                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
3000         cfs_spin_lock(&oap->oap_lock);
3001         oap->oap_async_flags = async_flags;
3002         cfs_spin_unlock(&oap->oap_lock);
3003
3004         if (cmd & OBD_BRW_WRITE) {
3005                 rc = osc_enter_cache(env, cli, loi, oap);
3006                 if (rc) {
3007                         client_obd_list_unlock(&cli->cl_loi_list_lock);
3008                         RETURN(rc);
3009                 }
3010         }
3011
3012         osc_oap_to_pending(oap);
3013         loi_list_maint(cli, loi);
3014
3015         LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
3016                   cmd);
3017
3018         osc_check_rpcs(env, cli);
3019         client_obd_list_unlock(&cli->cl_loi_list_lock);
3020
3021         RETURN(0);
3022 }
3023
3024 /* aka (~was & now & flag), but this is more clear :) */
3025 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
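/* For example, SETTING(ASYNC_READY, ASYNC_READY | ASYNC_URGENT, ASYNC_URGENT)
 * is 1 (URGENT is being newly set), while SETTING(ASYNC_URGENT, ASYNC_URGENT,
 * ASYNC_URGENT) is 0 (it was already set). */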
3026
3027 int osc_set_async_flags_base(struct client_obd *cli,
3028                              struct lov_oinfo *loi, struct osc_async_page *oap,
3029                              obd_flag async_flags)
3030 {
3031         struct loi_oap_pages *lop;
3032         int flags = 0;
3033         ENTRY;
3034
3035         LASSERT(!cfs_list_empty(&oap->oap_pending_item));
3036
3037         if (oap->oap_cmd & OBD_BRW_WRITE) {
3038                 lop = &loi->loi_write_lop;
3039         } else {
3040                 lop = &loi->loi_read_lop;
3041         }
3042
3043         if ((oap->oap_async_flags & async_flags) == async_flags)
3044                 RETURN(0);
3045
3046         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
3047                 flags |= ASYNC_READY;
3048
3049         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
3050             cfs_list_empty(&oap->oap_rpc_item)) {
3051                 if (oap->oap_async_flags & ASYNC_HP)
3052                         cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
3053                 else
3054                         cfs_list_add_tail(&oap->oap_urgent_item,
3055                                           &lop->lop_urgent);
3056                 flags |= ASYNC_URGENT;
3057                 loi_list_maint(cli, loi);
3058         }
3059         cfs_spin_lock(&oap->oap_lock);
3060         oap->oap_async_flags |= flags;
3061         cfs_spin_unlock(&oap->oap_lock);
3062
3063         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
3064                         oap->oap_async_flags);
3065         RETURN(0);
3066 }
3067
3068 int osc_teardown_async_page(struct obd_export *exp,
3069                             struct lov_stripe_md *lsm,
3070                             struct lov_oinfo *loi, void *cookie)
3071 {
3072         struct client_obd *cli = &exp->exp_obd->u.cli;
3073         struct loi_oap_pages *lop;
3074         struct osc_async_page *oap;
3075         int rc = 0;
3076         ENTRY;
3077
3078         oap = oap_from_cookie(cookie);
3079         if (IS_ERR(oap))
3080                 RETURN(PTR_ERR(oap));
3081
3082         if (loi == NULL)
3083                 loi = lsm->lsm_oinfo[0];
3084
3085         if (oap->oap_cmd & OBD_BRW_WRITE) {
3086                 lop = &loi->loi_write_lop;
3087         } else {
3088                 lop = &loi->loi_read_lop;
3089         }
3090
3091         client_obd_list_lock(&cli->cl_loi_list_lock);
3092
3093         if (!cfs_list_empty(&oap->oap_rpc_item))
3094                 GOTO(out, rc = -EBUSY);
3095
3096         osc_exit_cache(cli, oap, 0);
3097         osc_wake_cache_waiters(cli);
3098
3099         if (!cfs_list_empty(&oap->oap_urgent_item)) {
3100                 cfs_list_del_init(&oap->oap_urgent_item);
3101                 cfs_spin_lock(&oap->oap_lock);
3102                 oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
3103                 cfs_spin_unlock(&oap->oap_lock);
3104         }
3105         if (!cfs_list_empty(&oap->oap_pending_item)) {
3106                 cfs_list_del_init(&oap->oap_pending_item);
3107                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
3108         }
3109         loi_list_maint(cli, loi);
3110         LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
3111 out:
3112         client_obd_list_unlock(&cli->cl_loi_list_lock);
3113         RETURN(rc);
3114 }
3115
3116 static void osc_set_lock_data_with_check(struct ldlm_lock *lock,
3117                                          struct ldlm_enqueue_info *einfo,
3118                                          int flags)
3119 {
3120         void *data = einfo->ei_cbdata;
3121
3122         LASSERT(lock != NULL);
3123         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
3124         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
3125         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
3126         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
3127
3128         lock_res_and_lock(lock);
3129         cfs_spin_lock(&osc_ast_guard);
3130         LASSERT(lock->l_ast_data == NULL || lock->l_ast_data == data);
3131         lock->l_ast_data = data;
3132         cfs_spin_unlock(&osc_ast_guard);
3133         unlock_res_and_lock(lock);
3134 }
3135
3136 static void osc_set_data_with_check(struct lustre_handle *lockh,
3137                                     struct ldlm_enqueue_info *einfo,
3138                                     int flags)
3139 {
3140         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
3141
3142         if (lock != NULL) {
3143                 osc_set_lock_data_with_check(lock, einfo, flags);
3144                 LDLM_LOCK_PUT(lock);
3145         } else
3146                 CERROR("lockh %p, data %p - client evicted?\n",
3147                        lockh, einfo->ei_cbdata);
3148 }
3149
3150 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3151                              ldlm_iterator_t replace, void *data)
3152 {
3153         struct ldlm_res_id res_id;
3154         struct obd_device *obd = class_exp2obd(exp);
3155
3156         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
3157         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3158         return 0;
3159 }
3160
3161 /* find any ldlm lock of the inode in the osc
3162  * return 0    if no lock is found
3163  *        1    if one is found
3164  *      < 0    on error */
3165 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3166                            ldlm_iterator_t replace, void *data)
3167 {
3168         struct ldlm_res_id res_id;
3169         struct obd_device *obd = class_exp2obd(exp);
3170         int rc = 0;
3171
3172         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
3173         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3174         if (rc == LDLM_ITER_STOP)
3175                 return(1);
3176         if (rc == LDLM_ITER_CONTINUE)
3177                 return(0);
3178         return(rc);
3179 }
3180
3181 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
3182                             obd_enqueue_update_f upcall, void *cookie,
3183                             int *flags, int rc)
3184 {
3185         int intent = *flags & LDLM_FL_HAS_INTENT;
3186         ENTRY;
3187
3188         if (intent) {
3189                 /* The request was created before the ldlm_cli_enqueue call. */
3190                 if (rc == ELDLM_LOCK_ABORTED) {
3191                         struct ldlm_reply *rep;
3192                         rep = req_capsule_server_get(&req->rq_pill,
3193                                                      &RMF_DLM_REP);
3194
3195                         LASSERT(rep != NULL);
3196                         if (rep->lock_policy_res1)
3197                                 rc = rep->lock_policy_res1;
3198                 }
3199         }
3200
3201         if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
3202                 *flags |= LDLM_FL_LVB_READY;
3203                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
3204                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
3205         }
3206
3207         /* Call the update callback. */
3208         rc = (*upcall)(cookie, rc);
3209         RETURN(rc);
3210 }
3211
3212 static int osc_enqueue_interpret(const struct lu_env *env,
3213                                  struct ptlrpc_request *req,
3214                                  struct osc_enqueue_args *aa, int rc)
3215 {
3216         struct ldlm_lock *lock;
3217         struct lustre_handle handle;
3218         __u32 mode;
3219
3220         /* Make a local copy of a lock handle and a mode, because aa->oa_*
3221          * might be freed anytime after lock upcall has been called. */
3222         lustre_handle_copy(&handle, aa->oa_lockh);
3223         mode = aa->oa_ei->ei_mode;
3224
3225         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
3226          * be valid. */
3227         lock = ldlm_handle2lock(&handle);
3228
3229         /* Take an additional reference so that a blocking AST that
3230          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
3231          * to arrive after an upcall has been executed by
3232          * osc_enqueue_fini(). */
3233         ldlm_lock_addref(&handle, mode);
3234
3235         /* Let the CP AST grant the lock first. */
3236         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
3237
3238         /* Complete the lock-obtaining procedure. */
3239         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
3240                                    mode, aa->oa_flags, aa->oa_lvb,
3241                                    sizeof(*aa->oa_lvb), &handle, rc);
3242         /* Complete osc stuff. */
3243         rc = osc_enqueue_fini(req, aa->oa_lvb,
3244                               aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc);
3245
3246         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
3247
3248         /* Release the lock for async request. */
3249         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
3250                 /*
3251                  * Releases a reference taken by ldlm_cli_enqueue(), if it is
3252                  * not already released by
3253                  * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
3254                  */
3255                 ldlm_lock_decref(&handle, mode);
3256
3257         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
3258                  aa->oa_lockh, req, aa);
3259         ldlm_lock_decref(&handle, mode);
3260         LDLM_LOCK_PUT(lock);
3261         return rc;
3262 }
3263
3264 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
3265                         struct lov_oinfo *loi, int flags,
3266                         struct ost_lvb *lvb, __u32 mode, int rc)
3267 {
3268         if (rc == ELDLM_OK) {
3269                 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
3270                 __u64 tmp;
3271
3272                 LASSERT(lock != NULL);
3273                 loi->loi_lvb = *lvb;
3274                 tmp = loi->loi_lvb.lvb_size;
3275                 /* Extend KMS up to the end of this lock and no further
3276                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
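                /* e.g. an lvb_size of 1 MB under a lock covering [0, 4095]
                 * caps tmp at 4096, so kms is raised to at most 4096 here. */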
3277                 if (tmp > lock->l_policy_data.l_extent.end)
3278                         tmp = lock->l_policy_data.l_extent.end + 1;
3279                 if (tmp >= loi->loi_kms) {
3280                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
3281                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
3282                         loi_kms_set(loi, tmp);
3283                 } else {
3284                         LDLM_DEBUG(lock, "lock acquired, setting rss="
3285                                    LPU64"; leaving kms="LPU64", end="LPU64,
3286                                    loi->loi_lvb.lvb_size, loi->loi_kms,
3287                                    lock->l_policy_data.l_extent.end);
3288                 }
3289                 ldlm_lock_allow_match(lock);
3290                 LDLM_LOCK_PUT(lock);
3291         } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
3292                 loi->loi_lvb = *lvb;
3293                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
3294                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
3295                 rc = ELDLM_OK;
3296         }
3297 }
3298 EXPORT_SYMBOL(osc_update_enqueue);
3299
3300 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
3301
3302 /* When enqueuing asynchronously, locks are not ordered: we can obtain a lock
3303  * from the 2nd OSC before a lock from the 1st one.  This does not deadlock
3304  * with other synchronous requests, but holding some locks while trying to
3305  * obtain others may take a considerable amount of time if an OST fails; and
3306  * when a client does not release a lock that other sync requests are waiting
3307  * on, that client is excluded from the cluster -- such scenarios make life
3308  * difficult, so release locks just after they are obtained. */
3309 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3310                      int *flags, ldlm_policy_data_t *policy,
3311                      struct ost_lvb *lvb, int kms_valid,
3312                      obd_enqueue_update_f upcall, void *cookie,
3313                      struct ldlm_enqueue_info *einfo,
3314                      struct lustre_handle *lockh,
3315                      struct ptlrpc_request_set *rqset, int async)
3316 {
3317         struct obd_device *obd = exp->exp_obd;
3318         struct ptlrpc_request *req = NULL;
3319         int intent = *flags & LDLM_FL_HAS_INTENT;
3320         ldlm_mode_t mode;
3321         int rc;
3322         ENTRY;
3323
3324         /* Filesystem lock extents are extended to page boundaries so that
3325          * dealing with the page cache is a little smoother.  */
3326         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3327         policy->l_extent.end |= ~CFS_PAGE_MASK;
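        /* For example, assuming 4 kB pages, a requested extent of
         * [5000, 6000] is widened here to [4096, 8191], i.e. whole pages. */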
3328
3329         /*
3330          * kms is not valid when either object is completely fresh (so that no
3331          * locks are cached), or object was evicted. In the latter case cached
3332          * lock cannot be used, because it would prime inode state with
3333          * potentially stale LVB.
3334          */
3335         if (!kms_valid)
3336                 goto no_match;
3337
3338         /* Next, search for already existing extent locks that will cover us */
3339         /* If we're trying to read, we also search for an existing PW lock.  The
3340          * VFS and page cache already protect us locally, so lots of readers/
3341          * writers can share a single PW lock.
3342          *
3343          * There are problems with conversion deadlocks, so instead of
3344          * converting a read lock to a write lock, we'll just enqueue a new
3345          * one.
3346          *
3347          * At some point we should cancel the read lock instead of making them
3348          * send us a blocking callback, but there are problems with canceling
3349          * locks out from other users right now, too. */
3350         mode = einfo->ei_mode;
3351         if (einfo->ei_mode == LCK_PR)
3352                 mode |= LCK_PW;
3353         mode = ldlm_lock_match(obd->obd_namespace,
3354                                *flags | LDLM_FL_LVB_READY, res_id,
3355                                einfo->ei_type, policy, mode, lockh, 0);
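        /* mode now holds the mode actually matched (possibly LCK_PW when we
         * asked for LCK_PR), or 0 if no suitable lock was found. */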
3356         if (mode) {
3357                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
3358
3359                 if (matched->l_ast_data == NULL ||
3360                     matched->l_ast_data == einfo->ei_cbdata) {
3361                         /* addref the lock only for sync requests where a
3362                          * PW lock was matched although we asked for PR. */
3363                         if (!rqset && einfo->ei_mode != mode)
3364                                 ldlm_lock_addref(lockh, LCK_PR);
3365                         osc_set_lock_data_with_check(matched, einfo, *flags);
3366                         if (intent) {
3367                                 /* I would like to be able to ASSERT here that
3368                                  * rss <= kms, but I can't, for reasons which
3369                                  * are explained in lov_enqueue() */
3370                         }
3371
3372                         /* We already have a lock, and it's referenced */
3373                         (*upcall)(cookie, ELDLM_OK);
3374
3375                         /* For async requests, decref the lock. */
3376                         if (einfo->ei_mode != mode)
3377                                 ldlm_lock_decref(lockh, LCK_PW);
3378                         else if (rqset)
3379                                 ldlm_lock_decref(lockh, einfo->ei_mode);
3380                         LDLM_LOCK_PUT(matched);
3381                         RETURN(ELDLM_OK);
3382                 } else
3383                         ldlm_lock_decref(lockh, mode);
3384                 LDLM_LOCK_PUT(matched);
3385         }
3386
3387  no_match:
3388         if (intent) {
3389                 CFS_LIST_HEAD(cancels);
3390                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3391                                            &RQF_LDLM_ENQUEUE_LVB);
3392                 if (req == NULL)
3393                         RETURN(-ENOMEM);
3394
3395                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3396                 if (rc) {
3397                         ptlrpc_request_free(req);
3398                         RETURN(rc);
3399                 }
3400
3401                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3402                                      sizeof *lvb);
3403                 ptlrpc_request_set_replen(req);
3404         }
3405
3406         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3407         *flags &= ~LDLM_FL_BLOCK_GRANTED;
3408
3409         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
3410                               sizeof(*lvb), lockh, async);
3411         if (rqset) {
3412                 if (!rc) {
3413                         struct osc_enqueue_args *aa;
3414                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3415                         aa = ptlrpc_req_async_args(req);
3416                         aa->oa_ei = einfo;
3417                         aa->oa_exp = exp;
3418                         aa->oa_flags  = flags;
3419                         aa->oa_upcall = upcall;
3420                         aa->oa_cookie = cookie;
3421                         aa->oa_lvb    = lvb;
3422                         aa->oa_lockh  = lockh;
3423
3424                         req->rq_interpret_reply =
3425                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
3426                         if (rqset == PTLRPCD_SET)
3427                                 ptlrpcd_add_req(req, PSCOPE_OTHER);
3428                         else
3429                                 ptlrpc_set_add_req(rqset, req);
3430                 } else if (intent) {
3431                         ptlrpc_req_finished(req);
3432                 }
3433                 RETURN(rc);
3434         }
3435
3436         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, rc);
3437         if (intent)
3438                 ptlrpc_req_finished(req);
3439
3440         RETURN(rc);
3441 }
3442
3443 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3444                        struct ldlm_enqueue_info *einfo,
3445                        struct ptlrpc_request_set *rqset)
3446 {
3447         struct ldlm_res_id res_id;
3448         int rc;
3449         ENTRY;
3450
3451         osc_build_res_name(oinfo->oi_md->lsm_object_id,
3452                            oinfo->oi_md->lsm_object_seq, &res_id);
3453
3454         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
3455                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3456                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
3457                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
3458                               rqset, rqset != NULL);
3459         RETURN(rc);
3460 }
3461
3462 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3463                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3464                    int *flags, void *data, struct lustre_handle *lockh,
3465                    int unref)
3466 {
3467         struct obd_device *obd = exp->exp_obd;
3468         int lflags = *flags;
3469         ldlm_mode_t rc;
3470         ENTRY;
3471
3472         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3473                 RETURN(-EIO);
3474
3475         /* Filesystem lock extents are extended to page boundaries so that
3476          * dealing with the page cache is a little smoother */
3477         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3478         policy->l_extent.end |= ~CFS_PAGE_MASK;
3479
3480         /* Next, search for already existing extent locks that will cover us */
3481         /* If we're trying to read, we also search for an existing PW lock.  The
3482          * VFS and page cache already protect us locally, so lots of readers/
3483          * writers can share a single PW lock. */
3484         rc = mode;
3485         if (mode == LCK_PR)
3486                 rc |= LCK_PW;
3487         rc = ldlm_lock_match(obd->obd_namespace, lflags,
3488                              res_id, type, policy, rc, lockh, unref);
3489         if (rc) {
3490                 if (data != NULL)
3491                         osc_set_data_with_check(lockh, data, lflags);
3492                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3493                         ldlm_lock_addref(lockh, LCK_PR);
3494                         ldlm_lock_decref(lockh, LCK_PW);
3495                 }
3496                 RETURN(rc);
3497         }
3498         RETURN(rc);
3499 }
3500
3501 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
3502 {
3503         ENTRY;
3504
3505         if (unlikely(mode == LCK_GROUP))
3506                 ldlm_lock_decref_and_cancel(lockh, mode);
3507         else
3508                 ldlm_lock_decref(lockh, mode);
3509
3510         RETURN(0);
3511 }
3512
3513 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3514                       __u32 mode, struct lustre_handle *lockh)
3515 {
3516         ENTRY;
3517         RETURN(osc_cancel_base(lockh, mode));
3518 }
3519
3520 static int osc_cancel_unused(struct obd_export *exp,
3521                              struct lov_stripe_md *lsm,
3522                              ldlm_cancel_flags_t flags,
3523                              void *opaque)
3524 {
3525         struct obd_device *obd = class_exp2obd(exp);
3526         struct ldlm_res_id res_id, *resp = NULL;
3527
3528         if (lsm != NULL) {
3529                 resp = osc_build_res_name(lsm->lsm_object_id,
3530                                           lsm->lsm_object_seq, &res_id);
3531         }
3532
3533         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3534 }
3535
3536 static int osc_statfs_interpret(const struct lu_env *env,
3537                                 struct ptlrpc_request *req,
3538                                 struct osc_async_args *aa, int rc)
3539 {
3540         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
3541         struct obd_statfs *msfs;
3542         __u64 used;
3543         ENTRY;
3544
3545         if (rc == -EBADR)
3546                 /* The request has in fact never been sent
3547                  * due to issues at a higher level (LOV).
3548                  * Exit immediately since the caller is
3549                  * aware of the problem and takes care
3550                  * of the cleanup */
3551                  RETURN(rc);
3552
3553         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
3554             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
3555                 GOTO(out, rc = 0);
3556
3557         if (rc != 0)
3558                 GOTO(out, rc);
3559
3560         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3561         if (msfs == NULL) {
3562                 GOTO(out, rc = -EPROTO);
3563         }
3564
3565         /* Reinitialize the RDONLY and DEGRADED flags at the client
3566          * on each statfs, so they don't stay set permanently. */
3567         cfs_spin_lock(&cli->cl_oscc.oscc_lock);
3568
3569         if (unlikely(msfs->os_state & OS_STATE_DEGRADED))
3570                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED;
3571         else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_DEGRADED))
3572                 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_DEGRADED;
3573
3574         if (unlikely(msfs->os_state & OS_STATE_READONLY))
3575                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY;
3576         else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_RDONLY))
3577                 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_RDONLY;
3578
3579         /* Add a bit of hysteresis so this flag isn't continually flapping,
3580          * and ensure that new files don't get extremely fragmented due to
3581          * only a small amount of available space in the filesystem.
3582          * We want to set the NOSPC flag when there is less than ~0.1% free
3583          * and clear it when there is at least ~0.2% free space, so:
3584          *                   avail < ~0.1% max          max = avail + used
3585          *            1025 * avail < avail + used       used = blocks - free
3586          *            1024 * avail < used
3587          *            1024 * avail < blocks - free
3588          *                   avail < ((blocks - free) >> 10)
3589          *
3590          * On a very large filesystem, say 16 TB, 0.1% is 16 GB.  We don't
3591          * want to lose that much space, so in those cases we report no
3592          * space left if there is less than 1 GB left. */
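        /* For example, with os_blocks - os_bfree == 2^20 blocks, used is
         * then 1024 blocks, so NOSPC is set once os_bavail drops below 1024
         * blocks (or os_ffree below 32), and is cleared only once os_bavail
         * exceeds 2048 blocks and os_ffree exceeds 64. */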
3593         used = min_t(__u64, (msfs->os_blocks - msfs->os_bfree) >> 10, 1 << 30);
3594         if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) == 0) &&
3595                      ((msfs->os_ffree < 32) || (msfs->os_bavail < used))))
3596                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC;
3597         else if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
3598                 (msfs->os_ffree > 64) && (msfs->os_bavail > (used << 1))))
3599                         cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_NOSPC;
3600
3601         cfs_spin_unlock(&cli->cl_oscc.oscc_lock);
3602
3603         *aa->aa_oi->oi_osfs = *msfs;
3604 out:
3605         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3606         RETURN(rc);
3607 }
3608
3609 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3610                             __u64 max_age, struct ptlrpc_request_set *rqset)
3611 {
3612         struct ptlrpc_request *req;
3613         struct osc_async_args *aa;
3614         int                    rc;
3615         ENTRY;
3616
3617         /* We could possibly pass max_age in the request (as an absolute
3618          * timestamp or a "seconds.usec ago") so the target can avoid doing
3619          * extra calls into the filesystem if that isn't necessary (e.g.
3620          * during mount that would help a bit).  Having relative timestamps
3621          * is not so great if request processing is slow, while absolute
3622          * timestamps are not ideal because they need time synchronization. */
3623         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3624         if (req == NULL)
3625                 RETURN(-ENOMEM);
3626
3627         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3628         if (rc) {
3629                 ptlrpc_request_free(req);
3630                 RETURN(rc);
3631         }
3632         ptlrpc_request_set_replen(req);
3633         req->rq_request_portal = OST_CREATE_PORTAL;
3634         ptlrpc_at_set_req_timeout(req);
3635
3636         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3637                 /* procfs requests should not wait for statfs, to avoid deadlock */
3638                 req->rq_no_resend = 1;
3639                 req->rq_no_delay = 1;
3640         }
3641
3642         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
3643         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3644         aa = ptlrpc_req_async_args(req);
3645         aa->aa_oi = oinfo;
3646
3647         ptlrpc_set_add_req(rqset, req);
3648         RETURN(0);
3649 }
3650
3651 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3652                       __u64 max_age, __u32 flags)
3653 {
3654         struct obd_statfs     *msfs;
3655         struct ptlrpc_request *req;
3656         struct obd_import     *imp = NULL;
3657         int rc;
3658         ENTRY;
3659
3660         /* Since the request might also come from lprocfs, we need to
3661          * sync this with client_disconnect_export (bug 15684) */
3662         cfs_down_read(&obd->u.cli.cl_sem);
3663         if (obd->u.cli.cl_import)
3664                 imp = class_import_get(obd->u.cli.cl_import);
3665         cfs_up_read(&obd->u.cli.cl_sem);
3666         if (!imp)
3667                 RETURN(-ENODEV);
3668
3669         /* We could possibly pass max_age in the request (as an absolute
3670          * timestamp or a "seconds.usec ago") so the target can avoid doing
3671          * extra calls into the filesystem if that isn't necessary (e.g.
3672          * during mount that would help a bit).  Having relative timestamps
3673          * is not so great if request processing is slow, while absolute
3674          * timestamps are not ideal because they need time synchronization. */
3675         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
3676
3677         class_import_put(imp);
3678
3679         if (req == NULL)
3680                 RETURN(-ENOMEM);
3681
3682         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3683         if (rc) {
3684                 ptlrpc_request_free(req);
3685                 RETURN(rc);
3686         }
3687         ptlrpc_request_set_replen(req);
3688         req->rq_request_portal = OST_CREATE_PORTAL;
3689         ptlrpc_at_set_req_timeout(req);
3690
3691         if (flags & OBD_STATFS_NODELAY) {
3692                 /* procfs requests should not wait for statfs, to avoid deadlock */
3693                 req->rq_no_resend = 1;
3694                 req->rq_no_delay = 1;
3695         }
3696
3697         rc = ptlrpc_queue_wait(req);
3698         if (rc)
3699                 GOTO(out, rc);
3700
3701         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3702         if (msfs == NULL) {
3703                 GOTO(out, rc = -EPROTO);
3704         }
3705
3706         *osfs = *msfs;
3707
3708         EXIT;
3709  out:
3710         ptlrpc_req_finished(req);
3711         return rc;
3712 }
3713
3714 /* Retrieve object striping information.
3715  *
3716  * @lump is a pointer to an in-core struct with lmm_stripe_count indicating
3717  * the maximum number of OST indices which will fit in the user buffer.
3718  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
3719  */
3720 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3721 {
3722         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
3723         struct lov_user_md_v3 lum, *lumk;
3724         struct lov_user_ost_data_v1 *lmm_objects;
3725         int rc = 0, lum_size;
3726         ENTRY;
3727
3728         if (!lsm)
3729                 RETURN(-ENODATA);
3730
3731         /* we only need the header part from user space to get lmm_magic and
3732          * lmm_stripe_count (the header part is common to v1 and v3) */
3733         lum_size = sizeof(struct lov_user_md_v1);
3734         if (cfs_copy_from_user(&lum, lump, lum_size))
3735                 RETURN(-EFAULT);
3736
3737         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
3738             (lum.lmm_magic != LOV_USER_MAGIC_V3))
3739                 RETURN(-EINVAL);
3740
3741         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
3742         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
3743         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
3744         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
3745
3746         /* we can use lov_mds_md_size() to compute lum_size
3747          * because lov_user_md_vX and lov_mds_md_vX have the same size */
3748         if (lum.lmm_stripe_count > 0) {
3749                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
3750                 OBD_ALLOC(lumk, lum_size);
3751                 if (!lumk)
3752                         RETURN(-ENOMEM);
3753
3754                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
3755                         lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
3756                 else
3757                         lmm_objects = &(lumk->lmm_objects[0]);
3758                 lmm_objects->l_object_id = lsm->lsm_object_id;
3759         } else {
3760                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
3761                 lumk = &lum;
3762         }
3763
3764         lumk->lmm_object_id = lsm->lsm_object_id;
3765         lumk->lmm_object_seq = lsm->lsm_object_seq;
3766         lumk->lmm_stripe_count = 1;
3767
3768         if (cfs_copy_to_user(lump, lumk, lum_size))
3769                 rc = -EFAULT;
3770
3771         if (lumk != &lum)
3772                 OBD_FREE(lumk, lum_size);
3773
3774         RETURN(rc);
3775 }
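
/* A minimal userspace sketch of driving the LL_IOC_LOV_GETSTRIPE path served
 * above (illustrative only, not part of this file; assumes a file on a Lustre
 * mount and the definitions from <lustre/lustre_user.h>):
 *
 *      struct lov_user_md_v1 *lum;
 *      int fd = open("/mnt/lustre/file", O_RDONLY);
 *
 *      lum = malloc(sizeof(*lum) + sizeof(struct lov_user_ost_data_v1));
 *      lum->lmm_magic = LOV_USER_MAGIC_V1;
 *      lum->lmm_stripe_count = 1;      // room for one OST object
 *      if (fd >= 0 && lum != NULL &&
 *          ioctl(fd, LL_IOC_LOV_GETSTRIPE, lum) == 0)
 *              printf("object id %llu\n",
 *                     (unsigned long long)lum->lmm_objects[0].l_object_id);
 */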
3776
3777
3778 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3779                          void *karg, void *uarg)
3780 {
3781         struct obd_device *obd = exp->exp_obd;
3782         struct obd_ioctl_data *data = karg;
3783         int err = 0;
3784         ENTRY;
3785
3786         if (!cfs_try_module_get(THIS_MODULE)) {
3787                 CERROR("Can't get module. Is it alive?\n");
3788                 return -EINVAL;
3789         }
3790         switch (cmd) {
3791         case OBD_IOC_LOV_GET_CONFIG: {
3792                 char *buf;
3793                 struct lov_desc *desc;
3794                 struct obd_uuid uuid;
3795
3796                 buf = NULL;
3797                 len = 0;
3798                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3799                         GOTO(out, err = -EINVAL);
3800
3801                 data = (struct obd_ioctl_data *)buf;
3802
3803                 if (sizeof(*desc) > data->ioc_inllen1) {
3804                         obd_ioctl_freedata(buf, len);
3805                         GOTO(out, err = -EINVAL);
3806                 }
3807
3808                 if (data->ioc_inllen2 < sizeof(uuid)) {
3809                         obd_ioctl_freedata(buf, len);
3810                         GOTO(out, err = -EINVAL);
3811                 }
3812
3813                 desc = (struct lov_desc *)data->ioc_inlbuf1;
3814                 desc->ld_tgt_count = 1;
3815                 desc->ld_active_tgt_count = 1;
3816                 desc->ld_default_stripe_count = 1;
3817                 desc->ld_default_stripe_size = 0;
3818                 desc->ld_default_stripe_offset = 0;
3819                 desc->ld_pattern = 0;
3820                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3821
3822                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3823
3824                 err = cfs_copy_to_user((void *)uarg, buf, len);
3825                 if (err)
3826                         err = -EFAULT;
3827                 obd_ioctl_freedata(buf, len);
3828                 GOTO(out, err);
3829         }
3830         case LL_IOC_LOV_SETSTRIPE:
3831                 err = obd_alloc_memmd(exp, karg);
3832                 if (err > 0)
3833                         err = 0;
3834                 GOTO(out, err);
3835         case LL_IOC_LOV_GETSTRIPE:
3836                 err = osc_getstripe(karg, uarg);
3837                 GOTO(out, err);
3838         case OBD_IOC_CLIENT_RECOVER:
3839                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3840                                             data->ioc_inlbuf1);
3841                 if (err > 0)
3842                         err = 0;
3843                 GOTO(out, err);
3844         case IOC_OSC_SET_ACTIVE:
3845                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3846                                                data->ioc_offset);
3847                 GOTO(out, err);
3848         case OBD_IOC_POLL_QUOTACHECK:
3849                 err = lquota_poll_check(quota_interface, exp,
3850                                         (struct if_quotacheck *)karg);
3851                 GOTO(out, err);
3852         case OBD_IOC_PING_TARGET:
3853                 err = ptlrpc_obd_ping(obd);
3854                 GOTO(out, err);
3855         default:
3856                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3857                        cmd, cfs_curproc_comm());
3858                 GOTO(out, err = -ENOTTY);
3859         }
3860 out:
3861         cfs_module_put(THIS_MODULE);
3862         return err;
3863 }
3864
3865 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3866                         void *key, __u32 *vallen, void *val,
3867                         struct lov_stripe_md *lsm)
3868 {
3869         ENTRY;
3870         if (!vallen || !val)
3871                 RETURN(-EFAULT);
3872
3873         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3874                 __u32 *stripe = val;
3875                 *vallen = sizeof(*stripe);
3876                 *stripe = 0;
3877                 RETURN(0);
3878         } else if (KEY_IS(KEY_LAST_ID)) {
3879                 struct ptlrpc_request *req;
3880                 obd_id                *reply;
3881                 char                  *tmp;
3882                 int                    rc;
3883
3884                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3885                                            &RQF_OST_GET_INFO_LAST_ID);
3886                 if (req == NULL)
3887                         RETURN(-ENOMEM);
3888
3889                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3890                                      RCL_CLIENT, keylen);
3891                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3892                 if (rc) {
3893                         ptlrpc_request_free(req);
3894                         RETURN(rc);
3895                 }
3896
3897                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3898                 memcpy(tmp, key, keylen);
3899
3900                 req->rq_no_delay = req->rq_no_resend = 1;
3901                 ptlrpc_request_set_replen(req);
3902                 rc = ptlrpc_queue_wait(req);
3903                 if (rc)
3904                         GOTO(out, rc);
3905
3906                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3907                 if (reply == NULL)
3908                         GOTO(out, rc = -EPROTO);
3909
3910                 *((obd_id *)val) = *reply;
3911         out:
3912                 ptlrpc_req_finished(req);
3913                 RETURN(rc);
3914         } else if (KEY_IS(KEY_FIEMAP)) {
3915                 struct ptlrpc_request *req;
3916                 struct ll_user_fiemap *reply;
3917                 char *tmp;
3918                 int rc;
3919
3920                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3921                                            &RQF_OST_GET_INFO_FIEMAP);
3922                 if (req == NULL)
3923                         RETURN(-ENOMEM);
3924
3925                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3926                                      RCL_CLIENT, keylen);
3927                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3928                                      RCL_CLIENT, *vallen);
3929                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3930                                      RCL_SERVER, *vallen);
3931
3932                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3933                 if (rc) {
3934                         ptlrpc_request_free(req);
3935                         RETURN(rc);
3936                 }
3937
3938                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3939                 memcpy(tmp, key, keylen);
3940                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3941                 memcpy(tmp, val, *vallen);
3942
3943                 ptlrpc_request_set_replen(req);
3944                 rc = ptlrpc_queue_wait(req);
3945                 if (rc)
3946                         GOTO(out1, rc);
3947
3948                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3949                 if (reply == NULL)
3950                         GOTO(out1, rc = -EPROTO);
3951
3952                 memcpy(val, reply, *vallen);
3953         out1:
3954                 ptlrpc_req_finished(req);
3955
3956                 RETURN(rc);
3957         }
3958
3959         RETURN(-EINVAL);
3960 }
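
/*
 * Illustrative sketch only, not part of the driver: how a caller might
 * fetch the last allocated object id through the generic obd_get_info()
 * entry point, which dispatches to osc_get_info() above and issues a
 * synchronous RPC to the OST.  The helper name is hypothetical and
 * error handling is elided; "exp" is assumed to be a valid OSC export.
 */
#if 0
static int example_get_last_id(struct obd_export *exp, obd_id *last_id)
{
        __u32 vallen = sizeof(*last_id);

        /* keylen conventionally includes the terminating NUL */
        return obd_get_info(exp, sizeof(KEY_LAST_ID), KEY_LAST_ID,
                            &vallen, last_id, NULL);
}
#endif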
3961
3962 static int osc_setinfo_mds_connect_import(struct obd_import *imp)
3963 {
3964         struct llog_ctxt *ctxt;
3965         int rc = 0;
3966         ENTRY;
3967
3968         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3969         if (ctxt) {
3970                 rc = llog_initiator_connect(ctxt);
3971                 llog_ctxt_put(ctxt);
3972         } else {
3973                 /* XXX return an error? skip setting below flags? */
3974         }
3975
3976         cfs_spin_lock(&imp->imp_lock);
3977         imp->imp_server_timeout = 1;
3978         imp->imp_pingable = 1;
3979         cfs_spin_unlock(&imp->imp_lock);
3980         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3981
3982         RETURN(rc);
3983 }
3984
3985 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
3986                                           struct ptlrpc_request *req,
3987                                           void *aa, int rc)
3988 {
3989         ENTRY;
3990         if (rc != 0)
3991                 RETURN(rc);
3992
3993         RETURN(osc_setinfo_mds_connect_import(req->rq_import));
3994 }
3995
3996 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3997                               void *key, obd_count vallen, void *val,
3998                               struct ptlrpc_request_set *set)
3999 {
4000         struct ptlrpc_request *req;
4001         struct obd_device     *obd = exp->exp_obd;
4002         struct obd_import     *imp = class_exp2cliimp(exp);
4003         char                  *tmp;
4004         int                    rc;
4005         ENTRY;
4006
4007         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
4008
4009         if (KEY_IS(KEY_NEXT_ID)) {
4010                 obd_id new_val;
4011                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4012
4013                 if (vallen != sizeof(obd_id))
4014                         RETURN(-ERANGE);
4015                 if (val == NULL)
4016                         RETURN(-EINVAL);
4017
4021                 /* avoid a race between allocating a new object and setting
4022                  * the next id from the ll_sync thread */
4023                 cfs_spin_lock(&oscc->oscc_lock);
4024                 new_val = *((obd_id*)val) + 1;
4025                 if (new_val > oscc->oscc_next_id)
4026                         oscc->oscc_next_id = new_val;
4027                 cfs_spin_unlock(&oscc->oscc_lock);
4028                 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
4029                        exp->exp_obd->obd_name,
4030                        obd->u.cli.cl_oscc.oscc_next_id);
4031
4032                 RETURN(0);
4033         }
4034
4035         if (KEY_IS(KEY_CHECKSUM)) {
4036                 if (vallen != sizeof(int))
4037                         RETURN(-EINVAL);
4038                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
4039                 RETURN(0);
4040         }
4041
4042         if (KEY_IS(KEY_SPTLRPC_CONF)) {
4043                 sptlrpc_conf_client_adapt(obd);
4044                 RETURN(0);
4045         }
4046
4047         if (KEY_IS(KEY_FLUSH_CTX)) {
4048                 sptlrpc_import_flush_my_ctx(imp);
4049                 RETURN(0);
4050         }
4051
4052         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
4053                 RETURN(-EINVAL);
4054
4055         /* We pass all other commands directly to OST. Since nobody calls
4056            osc methods directly and everyone is supposed to go through LOV,
4057            we assume LOV has already rejected invalid values for us.
4058            The only recognised values so far are evict_by_nid and mds_conn.
4059            Even if something bad slips through, we would get -EINVAL from
4060            the OST anyway. */
4061
4062         if (KEY_IS(KEY_GRANT_SHRINK))
4063                 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO);
4064         else
4065                 req = ptlrpc_request_alloc(imp, &RQF_OBD_SET_INFO);
4066
4067         if (req == NULL)
4068                 RETURN(-ENOMEM);
4069
4070         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
4071                              RCL_CLIENT, keylen);
4072         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
4073                              RCL_CLIENT, vallen);
4074         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
4075         if (rc) {
4076                 ptlrpc_request_free(req);
4077                 RETURN(rc);
4078         }
4079
4080         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
4081         memcpy(tmp, key, keylen);
4082         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
4083         memcpy(tmp, val, vallen);
4084
4085         if (KEY_IS(KEY_MDS_CONN)) {
4086                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4087
4088                 oscc->oscc_oa.o_seq = (*(__u32 *)val);
4089                 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
4090                 LASSERT_SEQ_IS_MDT(oscc->oscc_oa.o_seq);
4091                 req->rq_no_delay = req->rq_no_resend = 1;
4092                 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
4093         } else if (KEY_IS(KEY_GRANT_SHRINK)) {
4094                 struct osc_grant_args *aa;
4095                 struct obdo *oa;
4096
4097                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
4098                 aa = ptlrpc_req_async_args(req);
4099                 OBDO_ALLOC(oa);
4100                 if (!oa) {
4101                         ptlrpc_req_finished(req);
4102                         RETURN(-ENOMEM);
4103                 }
4104                 *oa = ((struct ost_body *)val)->oa;
4105                 aa->aa_oa = oa;
4106                 req->rq_interpret_reply = osc_shrink_grant_interpret;
4107         }
4108
4109         ptlrpc_request_set_replen(req);
4110         if (!KEY_IS(KEY_GRANT_SHRINK)) {
4111                 LASSERT(set != NULL);
4112                 ptlrpc_set_add_req(set, req);
4113                 ptlrpc_check_set(NULL, set);
4114         } else
4115                 ptlrpcd_add_req(req, PSCOPE_OTHER);
4116
4117         RETURN(0);
4118 }
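
/*
 * Illustrative sketch only, not part of the driver: enabling or
 * disabling wire checksums through the generic obd_set_info_async()
 * entry point, which dispatches to osc_set_info_async() above.
 * KEY_CHECKSUM is handled locally (no RPC is sent), so passing a NULL
 * request set is safe for this key; the helper name is hypothetical.
 */
#if 0
static int example_set_checksum(struct obd_export *exp, int enable)
{
        return obd_set_info_async(exp, sizeof(KEY_CHECKSUM), KEY_CHECKSUM,
                                  sizeof(enable), &enable, NULL);
}
#endif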
4119
4120
4121 static struct llog_operations osc_size_repl_logops = {
4122         lop_cancel: llog_obd_repl_cancel
4123 };
4124
4125 static struct llog_operations osc_mds_ost_orig_logops;
4126
4127 static int __osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4128                            struct obd_device *tgt, struct llog_catid *catid)
4129 {
4130         int rc;
4131         ENTRY;
4132
4133         rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, 1,
4134                         &catid->lci_logid, &osc_mds_ost_orig_logops);
4135         if (rc) {
4136                 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
4137                 GOTO(out, rc);
4138         }
4139
4140         rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, 1,
4141                         NULL, &osc_size_repl_logops);
4142         if (rc) {
4143                 struct llog_ctxt *ctxt =
4144                         llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4145                 if (ctxt)
4146                         llog_cleanup(ctxt);
4147                 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
4148         }
4149         GOTO(out, rc);
4150 out:
4151         if (rc) {
4152                 CERROR("osc '%s' tgt '%s' catid %p rc=%d\n",
4153                        obd->obd_name, tgt->obd_name, catid, rc);
4154                 CERROR("logid "LPX64":0x%x\n",
4155                        catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
4156         }
4157         return rc;
4158 }
4159
4160 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4161                          struct obd_device *disk_obd, int *index)
4162 {
4163         struct llog_catid catid;
4164         static char name[32] = CATLIST;
4165         int rc;
4166         ENTRY;
4167
4168         LASSERT(olg == &obd->obd_olg);
4169
4170         cfs_mutex_down(&olg->olg_cat_processing);
4171         rc = llog_get_cat_list(disk_obd, name, *index, 1, &catid);
4172         if (rc) {
4173                 CERROR("%s: can't get llog catlist: rc = %d\n",
4174                        obd->obd_name, rc);
4174                 GOTO(out, rc);
4175         }
4176
4177         CDEBUG(D_INFO, "%s: Init llog for %d - catid "LPX64"/"LPX64":%x\n",
4178                obd->obd_name, *index, catid.lci_logid.lgl_oid,
4179                catid.lci_logid.lgl_oseq, catid.lci_logid.lgl_ogen);
4180
4181         rc = __osc_llog_init(obd, olg, disk_obd, &catid);
4182         if (rc) {
4183                 CERROR("%s: llog setup failed: rc = %d\n",
4184                        obd->obd_name, rc);
4184                 GOTO(out, rc);
4185         }
4186
4187         rc = llog_put_cat_list(disk_obd, name, *index, 1, &catid);
4188         if (rc) {
4189                 CERROR("%s: can't put llog catlist: rc = %d\n",
4190                        obd->obd_name, rc);
4190                 GOTO(out, rc);
4191         }
4192
4193  out:
4194         cfs_mutex_up(&olg->olg_cat_processing);
4195
4196         return rc;
4197 }
4198
4199 static int osc_llog_finish(struct obd_device *obd, int count)
4200 {
4201         struct llog_ctxt *ctxt;
4202         int rc = 0, rc2 = 0;
4203         ENTRY;
4204
4205         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4206         if (ctxt)
4207                 rc = llog_cleanup(ctxt);
4208
4209         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4210         if (ctxt)
4211                 rc2 = llog_cleanup(ctxt);
4212         if (!rc)
4213                 rc = rc2;
4214
4215         RETURN(rc);
4216 }
4217
4218 static int osc_reconnect(const struct lu_env *env,
4219                          struct obd_export *exp, struct obd_device *obd,
4220                          struct obd_uuid *cluuid,
4221                          struct obd_connect_data *data,
4222                          void *localdata)
4223 {
4224         struct client_obd *cli = &obd->u.cli;
4225
4226         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
4227                 long lost_grant;
4228
4229                 client_obd_list_lock(&cli->cl_loi_list_lock);
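                /* GNU "a ?: b" shorthand for "a ? a : b": re-request the
                 * grant we already account for (avail + dirty) or, if that
                 * is zero, enough grant for two full-sized RPCs to start */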
4230                 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
4231                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
4232                 lost_grant = cli->cl_lost_grant;
4233                 cli->cl_lost_grant = 0;
4234                 client_obd_list_unlock(&cli->cl_loi_list_lock);
4235
4236                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
4237                        "cl_dirty: %ld cl_lost_grant: %ld\n", data->ocd_grant,
4238                        cli->cl_avail_grant, cli->cl_dirty, lost_grant);
4239                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
4240                        " ocd_grant: %d\n", data->ocd_connect_flags,
4241                        data->ocd_version, data->ocd_grant);
4242         }
4243
4244         RETURN(0);
4245 }
4246
4247 static int osc_disconnect(struct obd_export *exp)
4248 {
4249         struct obd_device *obd = class_exp2obd(exp);
4250         struct llog_ctxt  *ctxt;
4251         int rc;
4252
4253         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4254         if (ctxt) {
4255                 if (obd->u.cli.cl_conn_count == 1) {
4256                         /* Flush any remaining cancel messages out to the
4257                          * target */
4258                         llog_sync(ctxt, exp);
4259                 }
4260                 llog_ctxt_put(ctxt);
4261         } else {
4262                 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
4263                        obd);
4264         }
4265
4266         rc = client_disconnect_export(exp);
4267         /**
4268          * Initially del_shrink_grant was called before disconnect_export,
4269          * but that causes the following problem when setup (connect) and
4270          * cleanup (disconnect) run concurrently:
4271          *      connect p1                     disconnect p2
4272          *   ptlrpc_connect_import
4273          *     ...............               class_manual_cleanup
4274          *                                     osc_disconnect
4275          *                                     del_shrink_grant
4276          *   ptlrpc_connect_interpret
4277          *     init_grant_shrink
4278          *   add this client to shrink list
4279          *                                      cleanup_osc
4280          * Bang! the pinger triggers the shrink.
4281          * So the osc should be removed from the shrink list only after we
4282          * are sure the import has been destroyed.  See BUG18662.
4283          */
4284         if (obd->u.cli.cl_import == NULL)
4285                 osc_del_shrink_grant(&obd->u.cli);
4286         return rc;
4287 }
4288
4289 static int osc_import_event(struct obd_device *obd,
4290                             struct obd_import *imp,
4291                             enum obd_import_event event)
4292 {
4293         struct client_obd *cli;
4294         int rc = 0;
4295
4296         ENTRY;
4297         LASSERT(imp->imp_obd == obd);
4298
4299         switch (event) {
4300         case IMP_EVENT_DISCON: {
4301                 /* Only do this on the MDS OSCs */
4302                 if (imp->imp_server_timeout) {
4303                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4304
4305                         cfs_spin_lock(&oscc->oscc_lock);
4306                         oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
4307                         cfs_spin_unlock(&oscc->oscc_lock);
4308                 }
4309                 cli = &obd->u.cli;
4310                 client_obd_list_lock(&cli->cl_loi_list_lock);
4311                 cli->cl_avail_grant = 0;
4312                 cli->cl_lost_grant = 0;
4313                 client_obd_list_unlock(&cli->cl_loi_list_lock);
4314                 break;
4315         }
4316         case IMP_EVENT_INACTIVE: {
4317                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
4318                 break;
4319         }
4320         case IMP_EVENT_INVALIDATE: {
4321                 struct ldlm_namespace *ns = obd->obd_namespace;
4322                 struct lu_env         *env;
4323                 int                    refcheck;
4324
4325                 env = cl_env_get(&refcheck);
4326                 if (!IS_ERR(env)) {
4327                         /* Reset grants */
4328                         cli = &obd->u.cli;
4329                         client_obd_list_lock(&cli->cl_loi_list_lock);
4330                         /* all pages go to failing RPCs due to the
4331                          * invalid import */
4332                         osc_check_rpcs(env, cli);
4333                         client_obd_list_unlock(&cli->cl_loi_list_lock);
4334
4335                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
4336                         cl_env_put(env, &refcheck);
4337                 } else
4338                         rc = PTR_ERR(env);
4339                 break;
4340         }
4341         case IMP_EVENT_ACTIVE: {
4342                 /* Only do this on the MDS OSCs */
4343                 if (imp->imp_server_timeout) {
4344                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4345
4346                         cfs_spin_lock(&oscc->oscc_lock);
4347                         oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
4348                         cfs_spin_unlock(&oscc->oscc_lock);
4349                 }
4350                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
4351                 break;
4352         }
4353         case IMP_EVENT_OCD: {
4354                 struct obd_connect_data *ocd = &imp->imp_connect_data;
4355
4356                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
4357                         osc_init_grant(&obd->u.cli, ocd);
4358
4359                 /* See bug 7198 */
4360                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
4361                         imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
4362
4363                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
4364                 break;
4365         }
4366         default:
4367                 CERROR("Unknown import event %d\n", event);
4368                 LBUG();
4369         }
4370         RETURN(rc);
4371 }
4372
4373 /**
4374  * Determine whether the lock can be canceled before replaying the lock
4375  * during recovery, see bug16774 for detailed information.
4376  *
4377  * \retval zero the lock can't be canceled
4378  * \retval other ok to cancel
4379  */
4380 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
4381 {
4382         check_res_locked(lock->l_resource);
4383
4384         /*
4385          * Cancel all unused extent locks granted in LCK_PR or LCK_CR mode.
4386          *
4387          * XXX as a future improvement, we could also cancel an unused write
4388          * lock if it has no dirty data and no active mmaps.
4389          */
4390         if (lock->l_resource->lr_type == LDLM_EXTENT &&
4391             (lock->l_granted_mode == LCK_PR ||
4392              lock->l_granted_mode == LCK_CR) &&
4393             (osc_dlm_lock_pageref(lock) == 0))
4394                 RETURN(1);
4395
4396         RETURN(0);
4397 }
4398
4399 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
4400 {
4401         int rc;
4402         ENTRY;
4403
4405         rc = ptlrpcd_addref();
4406         if (rc)
4407                 RETURN(rc);
4408
4409         rc = client_obd_setup(obd, lcfg);
4410         if (rc) {
4411                 ptlrpcd_decref();
4412         } else {
4413                 struct lprocfs_static_vars lvars = { 0 };
4414                 struct client_obd *cli = &obd->u.cli;
4415
4416                 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
4417                 lprocfs_osc_init_vars(&lvars);
4418                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
4419                         lproc_osc_attach_seqstat(obd);
4420                         sptlrpc_lprocfs_cliobd_attach(obd);
4421                         ptlrpc_lprocfs_register_obd(obd);
4422                 }
4423
4424                 oscc_init(obd);
4425                 /* We need to allocate a few extra requests, because
4426                    brw_interpret tries to create new requests before freeing
4427                    previous ones. Ideally we would reserve 2x max_rpcs_in_flight,
4428                    but that might waste too much RAM in practice, so reserving
4429                    2 extra requests is a guess that should still work. */
4430                 cli->cl_import->imp_rq_pool =
4431                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4432                                             OST_MAXREQSIZE,
4433                                             ptlrpc_add_rqs_to_pool);
4434
4435                 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
4436                 cfs_sema_init(&cli->cl_grant_sem, 1);
4437
4438                 ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
4439         }
4440
4441         RETURN(rc);
4442 }
4443
4444 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
4445 {
4446         int rc = 0;
4447         ENTRY;
4448
4449         switch (stage) {
4450         case OBD_CLEANUP_EARLY: {
4451                 struct obd_import *imp;
4452                 imp = obd->u.cli.cl_import;
4453                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
4454                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
4455                 ptlrpc_deactivate_import(imp);
4456                 cfs_spin_lock(&imp->imp_lock);
4457                 imp->imp_pingable = 0;
4458                 cfs_spin_unlock(&imp->imp_lock);
4459                 break;
4460         }
4461         case OBD_CLEANUP_EXPORTS: {
4462                 /* If we set up but never connected, the
4463                    client import will not have been cleaned. */
4464                 if (obd->u.cli.cl_import) {
4465                         struct obd_import *imp;
4466                         cfs_down_write(&obd->u.cli.cl_sem);
4467                         imp = obd->u.cli.cl_import;
4468                         CDEBUG(D_CONFIG, "%s: client import never connected\n",
4469                                obd->obd_name);
4470                         ptlrpc_invalidate_import(imp);
4471                         if (imp->imp_rq_pool) {
4472                                 ptlrpc_free_rq_pool(imp->imp_rq_pool);
4473                                 imp->imp_rq_pool = NULL;
4474                         }
4475                         class_destroy_import(imp);
4476                         cfs_up_write(&obd->u.cli.cl_sem);
4477                         obd->u.cli.cl_import = NULL;
4478                 }
4479                 rc = obd_llog_finish(obd, 0);
4480                 if (rc != 0)
4481                         CERROR("failed to cleanup llogging subsystems\n");
4482                 break;
4483                 }
4484         }
4485         RETURN(rc);
4486 }
4487
4488 int osc_cleanup(struct obd_device *obd)
4489 {
4490         int rc;
4491
4492         ENTRY;
4493         ptlrpc_lprocfs_unregister_obd(obd);
4494         lprocfs_obd_cleanup(obd);
4495
4496         /* free memory of osc quota cache */
4497         lquota_cleanup(quota_interface, obd);
4498
4499         rc = client_obd_cleanup(obd);
4500
4501         ptlrpcd_decref();
4502         RETURN(rc);
4503 }
4504
4505 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
4506 {
4507         struct lprocfs_static_vars lvars = { 0 };
4508         int rc = 0;
4509
4510         lprocfs_osc_init_vars(&lvars);
4511
4512         switch (lcfg->lcfg_command) {
4513         default:
4514                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
4515                                               lcfg, obd);
4516                 if (rc > 0)
4517                         rc = 0;
4518                 break;
4519         }
4520
4521         return(rc);
4522 }
4523
4524 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4525 {
4526         return osc_process_config_base(obd, buf);
4527 }
4528
4529 struct obd_ops osc_obd_ops = {
4530         .o_owner                = THIS_MODULE,
4531         .o_setup                = osc_setup,
4532         .o_precleanup           = osc_precleanup,
4533         .o_cleanup              = osc_cleanup,
4534         .o_add_conn             = client_import_add_conn,
4535         .o_del_conn             = client_import_del_conn,
4536         .o_connect              = client_connect_import,
4537         .o_reconnect            = osc_reconnect,
4538         .o_disconnect           = osc_disconnect,
4539         .o_statfs               = osc_statfs,
4540         .o_statfs_async         = osc_statfs_async,
4541         .o_packmd               = osc_packmd,
4542         .o_unpackmd             = osc_unpackmd,
4543         .o_precreate            = osc_precreate,
4544         .o_create               = osc_create,
4545         .o_create_async         = osc_create_async,
4546         .o_destroy              = osc_destroy,
4547         .o_getattr              = osc_getattr,
4548         .o_getattr_async        = osc_getattr_async,
4549         .o_setattr              = osc_setattr,
4550         .o_setattr_async        = osc_setattr_async,
4551         .o_brw                  = osc_brw,
4552         .o_punch                = osc_punch,
4553         .o_sync                 = osc_sync,
4554         .o_enqueue              = osc_enqueue,
4555         .o_change_cbdata        = osc_change_cbdata,
4556         .o_find_cbdata          = osc_find_cbdata,
4557         .o_cancel               = osc_cancel,
4558         .o_cancel_unused        = osc_cancel_unused,
4559         .o_iocontrol            = osc_iocontrol,
4560         .o_get_info             = osc_get_info,
4561         .o_set_info_async       = osc_set_info_async,
4562         .o_import_event         = osc_import_event,
4563         .o_llog_init            = osc_llog_init,
4564         .o_llog_finish          = osc_llog_finish,
4565         .o_process_config       = osc_process_config,
4566 };
4567
4568 extern struct lu_kmem_descr osc_caches[];
4569 extern cfs_spinlock_t       osc_ast_guard;
4570 extern cfs_lock_class_key_t osc_ast_guard_class;
4571
4572 int __init osc_init(void)
4573 {
4574         struct lprocfs_static_vars lvars = { 0 };
4575         int rc;
4576         ENTRY;
4577
4578         /* Print the address of _any_ initialized kernel symbol from this
4579          * module, to allow debugging with a gdb that does not support
4580          * data symbols from modules. */
4581         CDEBUG(D_CONSOLE, "Lustre OSC module (%p).\n", &osc_caches);
4582
4583         rc = lu_kmem_init(osc_caches);
4584
4585         lprocfs_osc_init_vars(&lvars);
4586
4587         cfs_request_module("lquota");
4588         quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
4589         lquota_init(quota_interface);
4590         init_obd_quota_ops(quota_interface, &osc_obd_ops);
4591
4592         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
4593                                  LUSTRE_OSC_NAME, &osc_device_type);
4594         if (rc) {
4595                 if (quota_interface)
4596                         PORTAL_SYMBOL_PUT(osc_quota_interface);
4597                 lu_kmem_fini(osc_caches);
4598                 RETURN(rc);
4599         }
4600
4601         cfs_spin_lock_init(&osc_ast_guard);
4602         cfs_lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
4603
4604         osc_mds_ost_orig_logops = llog_lvfs_ops;
4605         osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
4606         osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
4607         osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
4608         osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
4609
4610         RETURN(rc);
4611 }
4612
4613 #ifdef __KERNEL__
4614 static void /*__exit*/ osc_exit(void)
4615 {
4616         lu_device_type_fini(&osc_device_type);
4617
4618         lquota_exit(quota_interface);
4619         if (quota_interface)
4620                 PORTAL_SYMBOL_PUT(osc_quota_interface);
4621
4622         class_unregister_type(LUSTRE_OSC_NAME);
4623         lu_kmem_fini(osc_caches);
4624 }
4625
4626 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4627 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4628 MODULE_LICENSE("GPL");
4629
4630 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
4631 #endif