/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif
#define DEBUG_SUBSYSTEM S_OSC

#include <libcfs/libcfs.h>

#ifndef __KERNEL__
# include <liblustre.h>
#endif

#include <lustre_dlm.h>
#include <lustre_net.h>
#include <lustre/lustre_user.h>
#include <obd_cksum.h>
#include <obd_ost.h>
#include <obd_lov.h>

#ifdef  __CYGWIN__
# include <ctype.h>
#endif

#include <lustre_ha.h>
#include <lprocfs_status.h>
#include <lustre_log.h>
#include <lustre_debug.h>
#include <lustre_param.h>
#include "osc_internal.h"

static quota_interface_t *quota_interface = NULL;
extern quota_interface_t osc_quota_interface;

static void osc_release_ppga(struct brw_page **ppga, obd_count count);
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc);
int osc_cleanup(struct obd_device *obd);

/* Pack OSC object metadata for disk storage (LE byte order). */
static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
                      struct lov_stripe_md *lsm)
{
        int lmm_size;
        ENTRY;

        lmm_size = sizeof(**lmmp);
        if (!lmmp)
                RETURN(lmm_size);

        if (*lmmp && !lsm) {
                OBD_FREE(*lmmp, lmm_size);
                *lmmp = NULL;
                RETURN(0);
        }

        if (!*lmmp) {
                OBD_ALLOC(*lmmp, lmm_size);
                if (!*lmmp)
                        RETURN(-ENOMEM);
        }

        if (lsm) {
                LASSERT(lsm->lsm_object_id);
                LASSERT_SEQ_IS_MDT(lsm->lsm_object_seq);
                (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
                (*lmmp)->lmm_object_seq = cpu_to_le64(lsm->lsm_object_seq);
        }

        RETURN(lmm_size);
}

/* Unpack OSC object metadata from disk storage (LE byte order). */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
{
        int lsm_size;
        ENTRY;

        if (lmm != NULL) {
                if (lmm_bytes < sizeof (*lmm)) {
                        CERROR("lov_mds_md too small: %d, need %d\n",
                               lmm_bytes, (int)sizeof(*lmm));
                        RETURN(-EINVAL);
                }
                /* XXX LOV_MAGIC etc check? */

                if (lmm->lmm_object_id == 0) {
                        CERROR("lov_mds_md: zero lmm_object_id\n");
                        RETURN(-EINVAL);
                }
        }

        lsm_size = lov_stripe_md_size(1);
        if (lsmp == NULL)
                RETURN(lsm_size);

        if (*lsmp != NULL && lmm == NULL) {
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);
                *lsmp = NULL;
                RETURN(0);
        }

        if (*lsmp == NULL) {
                OBD_ALLOC(*lsmp, lsm_size);
                if (*lsmp == NULL)
                        RETURN(-ENOMEM);
                OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                if ((*lsmp)->lsm_oinfo[0] == NULL) {
                        OBD_FREE(*lsmp, lsm_size);
                        RETURN(-ENOMEM);
                }
                loi_init((*lsmp)->lsm_oinfo[0]);
        }

        if (lmm != NULL) {
                /* XXX zero *lsmp? */
                (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
                (*lsmp)->lsm_object_seq = le64_to_cpu (lmm->lmm_object_seq);
                LASSERT((*lsmp)->lsm_object_id);
                LASSERT_SEQ_IS_MDT((*lsmp)->lsm_object_seq);
        }

        (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

        RETURN(lsm_size);
}
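
/*
 * Usage sketch (illustrative only): the two helpers above follow the common
 * obd_packmd()/obd_unpackmd() calling convention through which they are
 * reached via the OBD method table.  A NULL buffer pointer queries the
 * required size, a pointer to a NULL buffer allocates and fills it, and a
 * non-NULL buffer with a NULL source frees it again:
 *
 *      struct lov_mds_md *lmm = NULL;
 *      int lmm_size;
 *
 *      lmm_size = obd_packmd(exp, NULL, lsm);  size query only
 *      rc = obd_packmd(exp, &lmm, lsm);        allocates and packs *lmm
 *      rc = obd_packmd(exp, &lmm, NULL);       frees *lmm and NULLs it
 */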

static inline void osc_pack_capa(struct ptlrpc_request *req,
                                 struct ost_body *body, void *capa)
{
        struct obd_capa *oc = (struct obd_capa *)capa;
        struct lustre_capa *c;

        if (!capa)
                return;

        c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
        LASSERT(c);
        capa_cpy(c, oc);
        body->oa.o_valid |= OBD_MD_FLOSSCAPA;
        DEBUG_CAPA(D_SEC, c, "pack");
}

static inline void osc_pack_req_body(struct ptlrpc_request *req,
                                     struct obd_info *oinfo)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);
}

static inline void osc_set_capa_size(struct ptlrpc_request *req,
                                     const struct req_msg_field *field,
                                     struct obd_capa *oc)
{
        if (oc == NULL)
                req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
        else
                /* the size is already calculated as sizeof(struct obd_capa) */
                ;
}

static int osc_getattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body) {
                CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
                lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);

                /* This should really be sent by the OST */
                aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
                aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
        } else {
                CDEBUG(D_INFO, "can't unpack ost_body\n");
                rc = -EPROTO;
                aa->aa_oi->oi_oa->o_valid = 0;
        }
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}

static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(set, req);
        RETURN(0);
}

static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);

        /* This should really be sent by the OST */
        oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
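
/*
 * Usage sketch (illustrative): osc_getattr() above is the canonical
 * synchronous client RPC life cycle used throughout this file:
 *
 *      ptlrpc_request_alloc() -> ptlrpc_request_pack() -> fill request
 *      body -> ptlrpc_request_set_replen() -> ptlrpc_queue_wait() ->
 *      unpack reply -> ptlrpc_req_finished()
 *
 * The async variant, osc_getattr_async(), replaces the queue-and-wait step
 * with an rq_interpret_reply callback plus ptlrpc_set_add_req().
 */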

static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
                       struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}

static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_setattr_args *sa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(sa->sa_oa, &body->oa);
out:
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}

int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
                           struct obd_trans_info *oti,
                           obd_enqueue_update_f upcall, void *cookie,
                           struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
                oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        /* Do the MDS-to-OST setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req, PSCOPE_OTHER);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
                sa = ptlrpc_req_async_args(req);
                sa->sa_oa = oinfo->oi_oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req, PSCOPE_OTHER);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}
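
/*
 * Dispatch sketch (illustrative): osc_setattr_async_base() supports three
 * queueing modes, selected by the rqset argument:
 *
 *      ...(exp, oinfo, oti, upcall, cookie, NULL);
 *              fire-and-forget via ptlrpcd; no interpret callback is set,
 *              so the upcall is never invoked
 *      ...(exp, oinfo, oti, upcall, cookie, PTLRPCD_SET);
 *              queued to ptlrpcd; upcall runs from osc_setattr_interpret()
 *      ...(exp, oinfo, oti, upcall, cookie, rqset);
 *              added to the caller's set, which the caller drives itself
 */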

static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct obd_trans_info *oti,
                             struct ptlrpc_request_set *rqset)
{
        return osc_setattr_async_base(exp, oinfo, oti,
                                      oinfo->oi_cb_up, oinfo, rqset);
}

int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct lov_stripe_md  *lsm;
        int                    rc;
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oa);

        ptlrpc_request_set_replen(req);

        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_DELORPHAN) {
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        lustre_get_wire_obdo(oa, &body->oa);

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_object_id = oa->o_id;
        lsm->lsm_object_seq = oa->o_seq;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        *oti->oti_logcookies = oa->o_lcookie;
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        RETURN(rc);
}

int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
                   obd_enqueue_update_f upcall, void *cookie,
                   struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        struct ost_body         *body;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
        CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(req);
        sa->sa_oa     = oinfo->oi_oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;
        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PSCOPE_OTHER);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
                     struct obd_trans_info *oti,
                     struct ptlrpc_request_set *rqset)
{
        oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
        oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
        oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
        return osc_punch_base(exp, oinfo,
                              oinfo->oi_cb_up, oinfo, rqset);
}
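
/*
 * Illustrative example: a truncate is typically expressed as a punch with
 * oi_policy.l_extent.start set to the new size and l_extent.end set to
 * OBD_OBJECT_EOF; osc_punch() folds that extent into o_size/o_blocks
 * before osc_punch_base() sends the OST_PUNCH RPC.
 */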

static int osc_sync(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md *md, obd_size start, obd_size end,
                    void *capa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oa);
        body->oa.o_size = start;
        body->oa.o_blocks = end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
        osc_pack_capa(req, body, capa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(oa, &body->oa);

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}

/* Find and cancel locally granted locks matched by @mode in the resource
 * corresponding to @oa.  Found locks are added to the @cancels list.
 * Returns the number of locks added to that list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   cfs_list_t *cancels,
                                   ldlm_mode_t mode, int lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        osc_build_res_name(oa->o_id, oa->o_seq, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (res == NULL)
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}

static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *data,
                                 int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        cfs_atomic_dec(&cli->cl_destroy_in_flight);
        cfs_waitq_signal(&cli->cl_destroy_waitq);
        return 0;
}

static int osc_can_send_destroy(struct client_obd *cli)
{
        if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                cfs_waitq_signal(&cli->cl_destroy_waitq);
        }
        return 0;
}

/* Destroy requests can always be async on the client, and we don't even
 * really care about the return code, since the client cannot do anything
 * at all about a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and the next time the OST reconnects to the
 * MDS, it will retrieve the llog unlink logs and then send the log
 * cancellation cookies to the MDS after committing the destroy
 * transactions. */
static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                       struct lov_stripe_md *ea, struct obd_trans_info *oti,
                       struct obd_export *md_export, void *capa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        CFS_LIST_HEAD(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                oa->o_lcookie = *oti->oti_logcookies;
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oa);

        osc_pack_capa(req, body, (struct obd_capa *)capa);
        ptlrpc_request_set_replen(req);

        /* don't throttle destroy RPCs for the MDT */
        if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
                req->rq_interpret_reply = osc_destroy_interpret;
                if (!osc_can_send_destroy(cli)) {
                        struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
                                                          NULL);

                        /*
                         * Wait until the number of on-going destroy RPCs
                         * drops below cl_max_rpcs_in_flight.
                         */
                        l_wait_event_exclusive(cli->cl_destroy_waitq,
                                               osc_can_send_destroy(cli), &lwi);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req, PSCOPE_OTHER);
        RETURN(0);
}

static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (cfs_atomic_read(&obd_dirty_pages) -
                   cfs_atomic_read(&obd_dirty_transit_pages) >
                   obd_max_dirty_pages + 1) {
                /* The cfs_atomic_read() and cfs_atomic_inc() on these
                 * counters are not covered by a lock, so they may race and
                 * spuriously trip this CERROR() unless we add in a small
                 * fudge factor (+1). */
                CERROR("dirty %d - %d > system dirty_max %d\n",
                       cfs_atomic_read(&obd_dirty_pages),
                       cfs_atomic_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
                                (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}
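
/*
 * Worked example (assumed tunables, for illustration): with
 * cl_max_pages_per_rpc = 256, CFS_PAGE_SIZE = 4096 and
 * cl_max_rpcs_in_flight = 8, max_in_flight = 256 * 4096 * (8 + 1) = 9 MiB,
 * so o_undirty requests at least a full RPC pipeline's worth of grant
 * (or cl_dirty_max, whichever is larger).
 */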

static void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant =
                cfs_time_shift(cli->cl_grant_shrink_interval);
        CDEBUG(D_CACHE, "next time %ld to shrink grant\n",
               cli->cl_next_shrink_grant);
}

/* caller must hold loi_list_lock */
static void osc_consume_write_grant(struct client_obd *cli,
                                    struct brw_page *pga)
{
        LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
        LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
        cfs_atomic_inc(&obd_dirty_pages);
        cli->cl_dirty += CFS_PAGE_SIZE;
        cli->cl_avail_grant -= CFS_PAGE_SIZE;
        pga->flag |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
               CFS_PAGE_SIZE, pga, pga->pg);
        LASSERT(cli->cl_avail_grant >= 0);
        osc_update_next_shrink(cli);
}

/* The companion to osc_consume_write_grant, called when a brw has completed.
 * Must be called with the loi lock held. */
static void osc_release_write_grant(struct client_obd *cli,
                                    struct brw_page *pga, int sent)
{
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
        ENTRY;

        LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
                EXIT;
                return;
        }

        pga->flag &= ~OBD_BRW_FROM_GRANT;
        cfs_atomic_dec(&obd_dirty_pages);
        cli->cl_dirty -= CFS_PAGE_SIZE;
        if (pga->flag & OBD_BRW_NOCACHE) {
                pga->flag &= ~OBD_BRW_NOCACHE;
                cfs_atomic_dec(&obd_dirty_transit_pages);
                cli->cl_dirty_transit -= CFS_PAGE_SIZE;
        }
        if (!sent) {
                cli->cl_lost_grant += CFS_PAGE_SIZE;
                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
        } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole block on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
                int offset = pga->off & ~CFS_PAGE_MASK;
                int count = pga->count + (offset & (blocksize - 1));
                int end = (offset + pga->count) & (blocksize - 1);
                if (end)
                        count += blocksize - end;

                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
                       cli->cl_avail_grant, cli->cl_dirty);
        }

        EXIT;
}
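
/*
 * Worked example for the short-write case above (illustrative numbers):
 * with CFS_PAGE_SIZE = 4096 and an OST blocksize of 1024, a 100-byte
 * write at page offset 0 gives offset = 0, count = 100, end = 100, so
 * count is rounded up to the covering 1024-byte block and cl_lost_grant
 * grows by 4096 - 1024 = 3072 bytes rather than 4096 - 100.
 */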

static unsigned long rpcs_in_flight(struct client_obd *cli)
{
        return cli->cl_r_in_flight + cli->cl_w_in_flight;
}

/* caller must hold loi_list_lock */
void osc_wake_cache_waiters(struct client_obd *cli)
{
        cfs_list_t *l, *tmp;
        struct osc_cache_waiter *ocw;

        ENTRY;
        cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                /* if we can't dirty more, we must wait until some is written */
                if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
                   (cfs_atomic_read(&obd_dirty_pages) + 1 >
                    obd_max_dirty_pages)) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);
                        return;
                }

                /* if there is still dirty cache but no grant, wait for
                 * pending RPCs that may yet return us some grant before
                 * doing sync writes */
                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
                               cli->cl_w_in_flight);
                        return;
                }

                ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry);
                cfs_list_del_init(&ocw->ocw_entry);
                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        /* no more RPCs in flight to return grant, do sync IO */
                        ocw->ocw_rc = -EDQUOT;
                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
                } else {
                        osc_consume_write_grant(cli,
                                                &ocw->ocw_oap->oap_brw_page);
                }

                cfs_waitq_signal(&ocw->ocw_waitq);
        }

        EXIT;
}

static void __osc_update_grant(struct client_obd *cli, obd_size grant)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}

static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
                              void *key, obd_count vallen, void *val,
                              struct ptlrpc_request_set *set);

static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBDO_FREE(oa);
        return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        long target = (cli->cl_max_rpcs_in_flight + 1) *
                      cli->cl_max_pages_per_rpc;

        client_obd_list_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target)
                target = cli->cl_max_pages_per_rpc;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target);
}

int osc_shrink_grant_to_target(struct client_obd *cli, long target)
{
        int    rc = 0;
        struct ost_body     *body;
        ENTRY;

        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit.
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target < cli->cl_max_pages_per_rpc)
                target = cli->cl_max_pages_per_rpc;

        if (target >= cli->cl_avail_grant) {
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        client_obd_list_lock(&cli->cl_loi_list_lock);
        body->oa.o_grant = cli->cl_avail_grant - target;
        cli->cl_avail_grant = target;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
        OBD_FREE_PTR(body);
        RETURN(rc);
}

#define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
static int osc_should_shrink_grant(struct client_obd *client)
{
        cfs_time_t time = cfs_time_current();
        cfs_time_t next_shrink = client->cl_next_shrink_grant;

        if ((client->cl_import->imp_connect_data.ocd_connect_flags &
             OBD_CONNECT_GRANT_SHRINK) == 0)
                return 0;

        if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > GRANT_SHRINK_LIMIT)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}

static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
{
        struct client_obd *client;

        cfs_list_for_each_entry(client, &item->ti_obd_list,
                                cl_grant_shrink_list) {
                if (osc_should_shrink_grant(client))
                        osc_shrink_grant(client);
        }
        return 0;
}

static int osc_add_shrink_grant(struct client_obd *client)
{
        int rc;

        rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
                                       TIMEOUT_GRANT,
                                       osc_grant_shrink_grant_cb, NULL,
                                       &client->cl_grant_shrink_list);
        if (rc) {
                CERROR("add grant client %s error %d\n",
                        client->cl_import->imp_obd->obd_name, rc);
                return rc;
        }
        CDEBUG(D_CACHE, "add grant client %s\n",
               client->cl_import->imp_obd->obd_name);
        osc_update_next_shrink(client);
        return 0;
}

static int osc_del_shrink_grant(struct client_obd *client)
{
        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
                                         TIMEOUT_GRANT);
}

static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we expect to hold: if we have
         * been evicted, it is the new avail_grant amount, and cl_dirty will
         * drop to 0 as in-flight RPCs fail out; otherwise, it is
         * avail_grant + dirty.
         *
         * The race is tolerable here: if we're evicted, but imp_state has
         * already left EVICTED state, then cl_dirty must be 0 already.
         */
        client_obd_list_lock(&cli->cl_loi_list_lock);
        if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
                cli->cl_avail_grant = ocd->ocd_grant;
        else
                cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;

        if (cli->cl_avail_grant < 0) {
                CWARN("%s: available grant < 0, the OSS is probably not running"
                      " with the patch from bug 20278 (%ld)\n",
                      cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant);
                /* workaround for 1.6 servers which do not have
                 * the patch from bug 20278 */
                cli->cl_avail_grant = ocd->ocd_grant;
        }

        client_obd_list_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
               cli->cl_import->imp_obd->obd_name,
               cli->cl_avail_grant, cli->cl_lost_grant);

        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            cfs_list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
}

/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe object; i.e. Lustre is reading a sparse file
 * via the LOV and _knows_ it is reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, obd_count page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT (page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = cfs_kmap(pga[i]->pg) +
                                (pga[i]->off & ~CFS_PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        cfs_kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                cfs_kunmap(pga[i]->pg);
                i++;
        }
}
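
/*
 * Worked example (illustrative): for a 3-page read (3 x 4096 bytes
 * requested) that returns nob_read = 5000, page 0 is left untouched
 * (fully read), bytes 904..4095 of page 1 are zeroed (5000 - 4096 = 904
 * valid bytes), and page 2 is zeroed entirely.
 */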

static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           obd_count page_count, struct brw_page **pga)
{
        int     i;
        __u32   *remote_rcs;

        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
                                                  sizeof(*remote_rcs) *
                                                  niocount);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return(-EPROTO);
        }

        /* return error if any niobuf was in error; the RCs arrive on the
         * wire as __u32, so cast before the signedness check */
        for (i = 0; i < niocount; i++) {
                if ((int)remote_rcs[i] < 0)
                        return((int)remote_rcs[i]);

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                                i, remote_rcs[i], req);
                        return(-EPROTO);
                }
        }

        if (req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return(-EPROTO);
        }

        return (0);
}

static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT|
                                  OBD_BRW_NOCACHE|OBD_BRW_SYNC|OBD_BRW_ASYNC);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if ((p1->flag & mask) != (p2->flag & mask))
                        CERROR("is it ok to have flags 0x%x and 0x%x in the "
                               "same brw?\n", p1->flag, p2->flag);
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}
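
/*
 * Example (illustrative): two pages with identical flags covering
 * [off 0, count 4096] and [off 4096, count 4096] are contiguous, so
 * can_merge_pages() returns nonzero and osc_brw_prep_request() below
 * folds them into a single 8192-byte niobuf_remote instead of two.
 */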

static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga, int opc,
                                   cksum_type_t cksum_type)
{
        __u32 cksum;
        int i = 0;

        LASSERT (pg_count > 0);
        cksum = init_checksum(cksum_type);
        while (nob > 0 && pg_count > 0) {
                unsigned char *ptr = cfs_kmap(pga[i]->pg);
                int off = pga[i]->off & ~CFS_PAGE_MASK;
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
                        memcpy(ptr + off, "bad1", min(4, nob));
                cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
                cfs_kunmap(pga[i]->pg);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
                               off, cksum);

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        /* When sending, we only compute a wrong checksum instead of
         * corrupting the data, so the data is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}
1246
1247 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1248                                 struct lov_stripe_md *lsm, obd_count page_count,
1249                                 struct brw_page **pga,
1250                                 struct ptlrpc_request **reqp,
1251                                 struct obd_capa *ocapa, int reserve,
1252                                 int resend)
1253 {
1254         struct ptlrpc_request   *req;
1255         struct ptlrpc_bulk_desc *desc;
1256         struct ost_body         *body;
1257         struct obd_ioobj        *ioobj;
1258         struct niobuf_remote    *niobuf;
1259         int niocount, i, requested_nob, opc, rc;
1260         struct osc_brw_async_args *aa;
1261         struct req_capsule      *pill;
1262         struct brw_page *pg_prev;
1263
1264         ENTRY;
1265         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1266                 RETURN(-ENOMEM); /* Recoverable */
1267         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1268                 RETURN(-EINVAL); /* Fatal */
1269
1270         if ((cmd & OBD_BRW_WRITE) != 0) {
1271                 opc = OST_WRITE;
1272                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1273                                                 cli->cl_import->imp_rq_pool,
1274                                                 &RQF_OST_BRW_WRITE);
1275         } else {
1276                 opc = OST_READ;
1277                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1278         }
1279         if (req == NULL)
1280                 RETURN(-ENOMEM);
1281
1282         for (niocount = i = 1; i < page_count; i++) {
1283                 if (!can_merge_pages(pga[i - 1], pga[i]))
1284                         niocount++;
1285         }
1286
1287         pill = &req->rq_pill;
1288         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1289                              sizeof(*ioobj));
1290         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1291                              niocount * sizeof(*niobuf));
1292         osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1293
1294         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1295         if (rc) {
1296                 ptlrpc_request_free(req);
1297                 RETURN(rc);
1298         }
1299         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1300         ptlrpc_at_set_req_timeout(req);
1301
1302         if (opc == OST_WRITE)
1303                 desc = ptlrpc_prep_bulk_imp(req, page_count,
1304                                             BULK_GET_SOURCE, OST_BULK_PORTAL);
1305         else
1306                 desc = ptlrpc_prep_bulk_imp(req, page_count,
1307                                             BULK_PUT_SINK, OST_BULK_PORTAL);
1308
1309         if (desc == NULL)
1310                 GOTO(out, rc = -ENOMEM);
1311         /* NB request now owns desc and will free it when it gets freed */
1312
1313         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1314         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1315         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1316         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1317
1318         lustre_set_wire_obdo(&body->oa, oa);
1319
1320         obdo_to_ioobj(oa, ioobj);
1321         ioobj->ioo_bufcnt = niocount;
1322         osc_pack_capa(req, body, ocapa);
1323         LASSERT (page_count > 0);
1324         pg_prev = pga[0];
1325         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1326                 struct brw_page *pg = pga[i];
1327
1328                 LASSERT(pg->count > 0);
1329                 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
1330                          "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
1331                          pg->off, pg->count);
1332 #ifdef __linux__
1333                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1334                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1335                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1336                          i, page_count,
1337                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1338                          pg_prev->pg, page_private(pg_prev->pg),
1339                          pg_prev->pg->index, pg_prev->off);
1340 #else
1341                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1342                          "i %d p_c %u\n", i, page_count);
1343 #endif
1344                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1345                         (pg->flag & OBD_BRW_SRVLOCK));
1346
1347                 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
1348                                       pg->count);
1349                 requested_nob += pg->count;
1350
1351                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1352                         niobuf--;
1353                         niobuf->len += pg->count;
1354                 } else {
1355                         niobuf->offset = pg->off;
1356                         niobuf->len    = pg->count;
1357                         niobuf->flags  = pg->flag;
1358                 }
1359                 pg_prev = pg;
1360         }
1361
1362         LASSERTF((void *)(niobuf - niocount) ==
1363                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1364                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1365                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1366
1367         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1368         if (resend) {
1369                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1370                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1371                         body->oa.o_flags = 0;
1372                 }
1373                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1374         }
1375
1376         if (osc_should_shrink_grant(cli))
1377                 osc_shrink_grant_local(cli, &body->oa);
1378
1379         /* size[REQ_REC_OFF] still sizeof (*body) */
1380         if (opc == OST_WRITE) {
1381                 if (unlikely(cli->cl_checksum) &&
1382                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1383                         /* store cl_cksum_type in a local variable since
1384                          * it can be changed via lprocfs */
1385                         cksum_type_t cksum_type = cli->cl_cksum_type;
1386
1387                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1388                                 oa->o_flags &= OBD_FL_LOCAL_MASK;
1389                                 body->oa.o_flags = 0;
1390                         }
1391                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1392                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1393                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1394                                                              page_count, pga,
1395                                                              OST_WRITE,
1396                                                              cksum_type);
1397                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1398                                body->oa.o_cksum);
1399                         /* save this in 'oa', too, for later checking */
1400                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1401                         oa->o_flags |= cksum_type_pack(cksum_type);
1402                 } else {
1403                         /* clear out the checksum flag, in case this is a
1404                          * resend but cl_checksum is no longer set. b=11238 */
1405                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1406                 }
1407                 oa->o_cksum = body->oa.o_cksum;
1408                 /* 1 RC per niobuf */
1409                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1410                                      sizeof(__u32) * niocount);
1411         } else {
1412                 if (unlikely(cli->cl_checksum) &&
1413                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1414                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1415                                 body->oa.o_flags = 0;
1416                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1417                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1418                 }
1419         }
1420         ptlrpc_request_set_replen(req);
1421
1422         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1423         aa = ptlrpc_req_async_args(req);
1424         aa->aa_oa = oa;
1425         aa->aa_requested_nob = requested_nob;
1426         aa->aa_nio_count = niocount;
1427         aa->aa_page_count = page_count;
1428         aa->aa_resends = 0;
1429         aa->aa_ppga = pga;
1430         aa->aa_cli = cli;
1431         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1432         if (ocapa && reserve)
1433                 aa->aa_ocapa = capa_get(ocapa);
1434
1435         *reqp = req;
1436         RETURN(0);
1437
1438  out:
1439         ptlrpc_req_finished(req);
1440         RETURN(rc);
1441 }
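
/*
 * Illustrative sketch (editor's addition, compiled out): how the loop in
 * osc_brw_prep_request() coalesces contiguous brw_pages into one remote
 * niobuf.  Two pages covering [0, 4096) and [4096, 8192) with identical
 * flags merge into a single niobuf of len 8192; a gap or a flag change
 * starts a new niobuf instead.  The helper name is hypothetical.
 */
#if 0
static void niobuf_merge_sketch(void)
{
        struct niobuf_remote nb   = { .offset = 0, .len = 4096, .flags = 0 };
        struct brw_page      next = { .off = 4096, .count = 4096, .flag = 0 };

        /* can_merge_pages() succeeds, so extend the previous niobuf ... */
        nb.len += next.count;                   /* nb now covers [0, 8192) */
        LASSERT(nb.offset + nb.len == next.off + next.count);
}
#endif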
1442
1443 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1444                                 __u32 client_cksum, __u32 server_cksum, int nob,
1445                                 obd_count page_count, struct brw_page **pga,
1446                                 cksum_type_t client_cksum_type)
1447 {
1448         __u32 new_cksum;
1449         char *msg;
1450         cksum_type_t cksum_type;
1451
1452         if (server_cksum == client_cksum) {
1453                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1454                 return 0;
1455         }
1456
1457         /* If this is an mmapped file, it can be changed at any time */
1458         if (oa->o_valid & OBD_MD_FLFLAGS && oa->o_flags & OBD_FL_MMAP)
1459                 return 1;
1460
1461         if (oa->o_valid & OBD_MD_FLFLAGS)
1462                 cksum_type = cksum_type_unpack(oa->o_flags);
1463         else
1464                 cksum_type = OBD_CKSUM_CRC32;
1465
1466         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1467                                       cksum_type);
1468
1469         if (cksum_type != client_cksum_type)
1470                 msg = "the server did not use the checksum type specified in "
1471                       "the original request - likely a protocol problem";
1472         else if (new_cksum == server_cksum)
1473                 msg = "changed on the client after we checksummed it - "
1474                       "likely false positive due to mmap IO (bug 11742)";
1475         else if (new_cksum == client_cksum)
1476                 msg = "changed in transit before arrival at OST";
1477         else
1478                 msg = "changed in transit AND doesn't match the original - "
1479                       "likely false positive due to mmap IO (bug 11742)";
1480
1481         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1482                            " object "LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
1483                            msg, libcfs_nid2str(peer->nid),
1484                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1485                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1486                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1487                            oa->o_id,
1488                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_seq : (__u64)0,
1489                            pga[0]->off,
1490                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1491         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1492                "client csum now %x\n", client_cksum, client_cksum_type,
1493                server_cksum, cksum_type, new_cksum);
1494         return 1;
1495 }
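
/*
 * Illustrative sketch (editor's addition, compiled out): the checksum
 * algorithm travels with the request in oa->o_flags.  cksum_type_pack()
 * encodes it into flag bits and cksum_type_unpack() recovers it, which is
 * how check_write_checksum() above can tell when the server used a
 * different algorithm than the client requested.  The helper name is
 * hypothetical.
 */
#if 0
static void cksum_type_roundtrip_sketch(struct obdo *oa)
{
        cksum_type_t cksum_type = OBD_CKSUM_CRC32;

        oa->o_flags |= cksum_type_pack(cksum_type);
        oa->o_valid |= OBD_MD_FLFLAGS;
        LASSERT(cksum_type_unpack(oa->o_flags) == cksum_type);
}
#endif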
1496
1497 /* Note rc enters this function as the number of bytes transferred */
1498 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1499 {
1500         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1501         const lnet_process_id_t *peer =
1502                         &req->rq_import->imp_connection->c_peer;
1503         struct client_obd *cli = aa->aa_cli;
1504         struct ost_body *body;
1505         __u32 client_cksum = 0;
1506         ENTRY;
1507
1508         if (rc < 0 && rc != -EDQUOT) {
1509                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d", rc);
1510                 RETURN(rc);
1511         }
1512
1513         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1514         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1515         if (body == NULL) {
1516                 DEBUG_REQ(D_INFO, req, "Can't unpack body");
1517                 RETURN(-EPROTO);
1518         }
1519
1520 #ifdef HAVE_QUOTA_SUPPORT
1521         /* set/clear over quota flag for a uid/gid */
1522         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1523             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1524                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1525
1526                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1527                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1528                        body->oa.o_flags);
1529                 lquota_setdq(quota_interface, cli, qid, body->oa.o_valid,
1530                              body->oa.o_flags);
1531         }
1532 #endif
1533
1534         osc_update_grant(cli, body);
1535
1536         if (rc < 0)
1537                 RETURN(rc);
1538
1539         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1540                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1541
1542         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1543                 if (rc > 0) {
1544                         CERROR("Unexpected +ve rc %d\n", rc);
1545                         RETURN(-EPROTO);
1546                 }
1547                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1548
1549                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1550                         RETURN(-EAGAIN);
1551
1552                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1553                     check_write_checksum(&body->oa, peer, client_cksum,
1554                                          body->oa.o_cksum, aa->aa_requested_nob,
1555                                          aa->aa_page_count, aa->aa_ppga,
1556                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1557                         RETURN(-EAGAIN);
1558
1559                 rc = check_write_rcs(req, aa->aa_requested_nob, aa->aa_nio_count,
1560                                      aa->aa_page_count, aa->aa_ppga);
1561                 GOTO(out, rc);
1562         }
1563
1564         /* The rest of this function executes only for OST_READs */
1565
1566         /* if unwrap_bulk failed, return -EAGAIN to retry */
1567         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1568         if (rc < 0)
1569                 GOTO(out, rc = -EAGAIN);
1570
1571         if (rc > aa->aa_requested_nob) {
1572                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1573                        aa->aa_requested_nob);
1574                 RETURN(-EPROTO);
1575         }
1576
1577         if (rc != req->rq_bulk->bd_nob_transferred) {
1578                 CERROR("Unexpected rc %d (%d transferred)\n",
1579                        rc, req->rq_bulk->bd_nob_transferred);
1580                 RETURN(-EPROTO);
1581         }
1582
1583         if (rc < aa->aa_requested_nob)
1584                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1585
1586         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1587                 static int cksum_counter;
1588                 __u32      server_cksum = body->oa.o_cksum;
1589                 char      *via;
1590                 char      *router;
1591                 cksum_type_t cksum_type;
1592
1593                 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1594                         cksum_type = cksum_type_unpack(body->oa.o_flags);
1595                 else
1596                         cksum_type = OBD_CKSUM_CRC32;
1597                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1598                                                  aa->aa_ppga, OST_READ,
1599                                                  cksum_type);
1600
1601                 if (peer->nid == req->rq_bulk->bd_sender) {
1602                         via = router = "";
1603                 } else {
1604                         via = " via ";
1605                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1606                 }
1607
1608                 if (server_cksum == ~0 && rc > 0) {
1609                         CERROR("Protocol error: server %s set the 'checksum' "
1610                                "bit, but didn't send a checksum.  Not fatal, "
1611                                "but please notify on http://bugzilla.lustre.org/\n",
1612                                libcfs_nid2str(peer->nid));
1613                 } else if (server_cksum != client_cksum) {
1614                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1615                                            "%s%s%s inode "DFID" object "
1616                                            LPU64"/"LPU64" extent "
1617                                            "["LPU64"-"LPU64"]\n",
1618                                            req->rq_import->imp_obd->obd_name,
1619                                            libcfs_nid2str(peer->nid),
1620                                            via, router,
1621                                            body->oa.o_valid & OBD_MD_FLFID ?
1622                                                 body->oa.o_parent_seq : (__u64)0,
1623                                            body->oa.o_valid & OBD_MD_FLFID ?
1624                                                 body->oa.o_parent_oid : 0,
1625                                            body->oa.o_valid & OBD_MD_FLFID ?
1626                                                 body->oa.o_parent_ver : 0,
1627                                            body->oa.o_id,
1628                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1629                                                 body->oa.o_seq : (__u64)0,
1630                                            aa->aa_ppga[0]->off,
1631                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1632                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1633                                                                         1);
1634                         CERROR("client %x, server %x, cksum_type %x\n",
1635                                client_cksum, server_cksum, cksum_type);
1636                         cksum_counter = 0;
1637                         aa->aa_oa->o_cksum = client_cksum;
1638                         rc = -EAGAIN;
1639                 } else {
1640                         cksum_counter++;
1641                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1642                         rc = 0;
1643                 }
1644         } else if (unlikely(client_cksum)) {
1645                 static int cksum_missed;
1646
1647                 cksum_missed++;
1648                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1649                         CERROR("Checksum %u requested from %s but not sent\n",
1650                                cksum_missed, libcfs_nid2str(peer->nid));
1651         } else {
1652                 rc = 0;
1653         }
1654 out:
1655         if (rc >= 0)
1656                 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
1657
1658         RETURN(rc);
1659 }
1660
1661 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1662                             struct lov_stripe_md *lsm,
1663                             obd_count page_count, struct brw_page **pga,
1664                             struct obd_capa *ocapa)
1665 {
1666         struct ptlrpc_request *req;
1667         int                    rc;
1668         cfs_waitq_t            waitq;
1669         int                    resends = 0;
1670         struct l_wait_info     lwi;
1671
1672         ENTRY;
1673
1674         cfs_waitq_init(&waitq);
1675
1676 restart_bulk:
1677         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1678                                   page_count, pga, &req, ocapa, 0, resends);
1679         if (rc != 0)
1680                 return (rc);
1681
1682         rc = ptlrpc_queue_wait(req);
1683
1684         if (rc == -ETIMEDOUT && req->rq_resend) {
1685                 DEBUG_REQ(D_HA, req, "BULK TIMEOUT");
1686                 ptlrpc_req_finished(req);
1687                 goto restart_bulk;
1688         }
1689
1690         rc = osc_brw_fini_request(req, rc);
1691
1692         ptlrpc_req_finished(req);
1693         if (osc_recoverable_error(rc)) {
1694                 resends++;
1695                 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1696                         CERROR("too many resend retries, returning error\n");
1697                         RETURN(-EIO);
1698                 }
1699
1700                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1701                 l_wait_event(waitq, 0, &lwi);
1702
1703                 goto restart_bulk;
1704         }
1705
1706         RETURN (rc);
1707 }
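
/*
 * Editor's note on the retry loop above: the backoff is linear -- before
 * resend N the caller sleeps N seconds via LWI_TIMEOUT_INTR(), and gives
 * up once osc_should_resend() rejects the resend count.  So a request
 * that needs three resends waits 1s + 2s + 3s in total.
 */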
1708
1709 int osc_brw_redo_request(struct ptlrpc_request *request,
1710                          struct osc_brw_async_args *aa)
1711 {
1712         struct ptlrpc_request *new_req;
1713         struct ptlrpc_request_set *set = request->rq_set;
1714         struct osc_brw_async_args *new_aa;
1715         struct osc_async_page *oap;
1716         int rc = 0;
1717         ENTRY;
1718
1719         if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1720                 CERROR("too many resend retries, returning error\n");
1721                 RETURN(-EIO);
1722         }
1723
1724         DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1725
1726         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1727                                   OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1728                                   aa->aa_cli, aa->aa_oa,
1729                                   NULL /* lsm unused by osc currently */,
1730                                   aa->aa_page_count, aa->aa_ppga,
1731                                   &new_req, aa->aa_ocapa, 0, 1);
1732         if (rc)
1733                 RETURN(rc);
1734
1735         client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1736
1737         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1738                 if (oap->oap_request != NULL) {
1739                         LASSERTF(request == oap->oap_request,
1740                                  "request %p != oap_request %p\n",
1741                                  request, oap->oap_request);
1742                         if (oap->oap_interrupted) {
1743                                 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1744                                 ptlrpc_req_finished(new_req);
1745                                 RETURN(-EINTR);
1746                         }
1747                 }
1748         }
1749         /* New request takes over pga and oaps from old request.
1750          * Note that copying a list_head doesn't work, need to move it... */
1751         aa->aa_resends++;
1752         new_req->rq_interpret_reply = request->rq_interpret_reply;
1753         new_req->rq_async_args = request->rq_async_args;
1754         new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1755
1756         new_aa = ptlrpc_req_async_args(new_req);
1757
1758         CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1759         cfs_list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1760         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1761
1762         cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1763                 if (oap->oap_request) {
1764                         ptlrpc_req_finished(oap->oap_request);
1765                         oap->oap_request = ptlrpc_request_addref(new_req);
1766                 }
1767         }
1768
1769         new_aa->aa_ocapa = aa->aa_ocapa;
1770         aa->aa_ocapa = NULL;
1771
1772         /* using ptlrpc_set_add_req() is safe here because interpret
1773          * functions run in check_set context.  the only path by which
1774          * another thread can reach this request is the -EINTR case above,
1775          * and that path is protected by cl_loi_list_lock */
1776         ptlrpc_set_add_req(set, new_req);
1777
1778         client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1779
1780         DEBUG_REQ(D_INFO, new_req, "new request");
1781         RETURN(0);
1782 }
1783
1784 /*
1785  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1786  * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1787  * fine for our small page arrays and doesn't require allocation.  it's an
1788  * insertion sort that swaps elements that are strides apart, shrinking the
1789  * stride down until it's 1 and the array is sorted.
1790  */
1791 static void sort_brw_pages(struct brw_page **array, int num)
1792 {
1793         int stride, i, j;
1794         struct brw_page *tmp;
1795
1796         if (num == 1)
1797                 return;
1798         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1799                 ;
1800
1801         do {
1802                 stride /= 3;
1803                 for (i = stride ; i < num ; i++) {
1804                         tmp = array[i];
1805                         j = i;
1806                         while (j >= stride && array[j - stride]->off > tmp->off) {
1807                                 array[j] = array[j - stride];
1808                                 j -= stride;
1809                         }
1810                         array[j] = tmp;
1811                 }
1812         } while (stride > 1);
1813 }
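
/*
 * Editor's note, a worked example of the stride sequence above: for
 * num == 100 the first loop grows the stride 1, 4, 13, 40, 121 and stops
 * at 121; the do/while then sorts with strides 40, 13, 4 and finally 1,
 * each pass being an insertion sort over elements that far apart.
 */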
1814
1815 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1816 {
1817         int count = 1;
1818         int offset;
1819         int i = 0;
1820
1821         LASSERT(pages > 0);
1822         offset = pg[i]->off & ~CFS_PAGE_MASK;
1823
1824         for (;;) {
1825                 pages--;
1826                 if (pages == 0)         /* that's all */
1827                         return count;
1828
1829                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1830                         return count;   /* doesn't end on page boundary */
1831
1832                 i++;
1833                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1834                 if (offset != 0)        /* doesn't start on page boundary */
1835                         return count;
1836
1837                 count++;
1838         }
1839 }
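
/*
 * Editor's note, a worked example (assuming 4K CFS_PAGE_SIZE): pages
 * covering [0, 4096) + [4096, 8192) + [8192, 8292) yield 3 -- a short
 * tail on the last page is fine.  Pages covering [0, 4096) + [4096, 4196)
 * + anything yield 2, because the second page ends mid-page and the
 * transfer would be fragmented.
 */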
1840
1841 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1842 {
1843         struct brw_page **ppga;
1844         int i;
1845
1846         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1847         if (ppga == NULL)
1848                 return NULL;
1849
1850         for (i = 0; i < count; i++)
1851                 ppga[i] = pga + i;
1852         return ppga;
1853 }
1854
1855 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1856 {
1857         LASSERT(ppga != NULL);
1858         OBD_FREE(ppga, sizeof(*ppga) * count);
1859 }
1860
1861 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1862                    obd_count page_count, struct brw_page *pga,
1863                    struct obd_trans_info *oti)
1864 {
1865         struct obdo *saved_oa = NULL;
1866         struct brw_page **ppga, **orig;
1867         struct obd_import *imp = class_exp2cliimp(exp);
1868         struct client_obd *cli;
1869         int rc, page_count_orig;
1870         ENTRY;
1871
1872         LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1873         cli = &imp->imp_obd->u.cli;
1874
1875         if (cmd & OBD_BRW_CHECK) {
1876                 /* The caller just wants to know if there's a chance that this
1877                  * I/O can succeed */
1878
1879                 if (imp->imp_invalid)
1880                         RETURN(-EIO);
1881                 RETURN(0);
1882         }
1883
1884         /* test_brw with a failed create can trip this, maybe others. */
1885         LASSERT(cli->cl_max_pages_per_rpc);
1886
1887         rc = 0;
1888
1889         orig = ppga = osc_build_ppga(pga, page_count);
1890         if (ppga == NULL)
1891                 RETURN(-ENOMEM);
1892         page_count_orig = page_count;
1893
1894         sort_brw_pages(ppga, page_count);
1895         while (page_count) {
1896                 obd_count pages_per_brw;
1897
1898                 if (page_count > cli->cl_max_pages_per_rpc)
1899                         pages_per_brw = cli->cl_max_pages_per_rpc;
1900                 else
1901                         pages_per_brw = page_count;
1902
1903                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1904
1905                 if (saved_oa != NULL) {
1906                         /* restore previously saved oa */
1907                         *oinfo->oi_oa = *saved_oa;
1908                 } else if (page_count > pages_per_brw) {
1909                         /* save a copy of oa (brw will clobber it) */
1910                         OBDO_ALLOC(saved_oa);
1911                         if (saved_oa == NULL)
1912                                 GOTO(out, rc = -ENOMEM);
1913                         *saved_oa = *oinfo->oi_oa;
1914                 }
1915
1916                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1917                                       pages_per_brw, ppga, oinfo->oi_capa);
1918
1919                 if (rc != 0)
1920                         break;
1921
1922                 page_count -= pages_per_brw;
1923                 ppga += pages_per_brw;
1924         }
1925
1926 out:
1927         osc_release_ppga(orig, page_count_orig);
1928
1929         if (saved_oa != NULL)
1930                 OBDO_FREE(saved_oa);
1931
1932         RETURN(rc);
1933 }
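
/*
 * Editor's note, a worked example of the chunking above (assuming
 * cl_max_pages_per_rpc == 256 and no fragmentation): a 1000-page brw is
 * issued as four RPCs of 256 + 256 + 256 + 232 pages, with the oa saved
 * before the first chunk and restored before each later one, since
 * osc_brw_internal() clobbers it.
 */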
1934
1935 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1936  * the dirty accounting: either writeback completed or a truncate happened
1937  * before writing started.  Must be called with the loi lock held. */
1938 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1939                            int sent)
1940 {
1941         osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1942 }
1943
1944
1945 /* This maintains the lists of pending pages to read/write for a given object
1946  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1947  * to quickly find objects that are ready to send an RPC. */
1948 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1949                          int cmd)
1950 {
1951         int optimal;
1952         ENTRY;
1953
1954         if (lop->lop_num_pending == 0)
1955                 RETURN(0);
1956
1957         /* if we have an invalid import we want to drain the queued pages
1958          * by forcing them through rpcs that immediately fail and complete
1959          * the pages.  recovery relies on this to empty the queued pages
1960          * before canceling the locks and evicting the llite pages */
1961         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1962                 RETURN(1);
1963
1964         /* stream rpcs in queue order as long as there is an urgent page
1965          * queued.  this is our cheap solution for good batching in the case
1966          * where writepage marks some random page in the middle of the file
1967          * as urgent because of, say, memory pressure */
1968         if (!cfs_list_empty(&lop->lop_urgent)) {
1969                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1970                 RETURN(1);
1971         }
1972         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1973         optimal = cli->cl_max_pages_per_rpc;
1974         if (cmd & OBD_BRW_WRITE) {
1975                 /* trigger a write rpc stream as long as there are dirtiers
1976                  * waiting for space.  as they're waiting, they're not going to
1977                  * create more pages to coalesce with what's waiting... */
1978                 if (!cfs_list_empty(&cli->cl_cache_waiters)) {
1979                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1980                         RETURN(1);
1981                 }
1982                 /* +16 to avoid triggering rpcs that would want to include pages
1983                  * that are being queued but which can't be made ready until
1984                  * the queuer finishes with the page. this is a wart for
1985                  * llite::commit_write() */
1986                 optimal += 16;
1987         }
1988         if (lop->lop_num_pending >= optimal)
1989                 RETURN(1);
1990
1991         RETURN(0);
1992 }
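
/*
 * Editor's note, illustrative numbers (assuming cl_max_pages_per_rpc ==
 * 256): a plain write stream fires only once 256 + 16 pages are pending,
 * while an urgent page, a cache waiter, or an invalid import forces an
 * RPC immediately, no matter how few pages are queued.
 */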
1993
1994 static int lop_makes_hprpc(struct loi_oap_pages *lop)
1995 {
1996         struct osc_async_page *oap;
1997         ENTRY;
1998
1999         if (cfs_list_empty(&lop->lop_urgent))
2000                 RETURN(0);
2001
2002         oap = cfs_list_entry(lop->lop_urgent.next,
2003                              struct osc_async_page, oap_urgent_item);
2004
2005         if (oap->oap_async_flags & ASYNC_HP) {
2006                 CDEBUG(D_CACHE, "hp request forcing RPC\n");
2007                 RETURN(1);
2008         }
2009
2010         RETURN(0);
2011 }
2012
2013 static void on_list(cfs_list_t *item, cfs_list_t *list,
2014                     int should_be_on)
2015 {
2016         if (cfs_list_empty(item) && should_be_on)
2017                 cfs_list_add_tail(item, list);
2018         else if (!cfs_list_empty(item) && !should_be_on)
2019                 cfs_list_del_init(item);
2020 }
2021
2022 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
2023  * can find pages to build into rpcs quickly */
2024 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
2025 {
2026         if (lop_makes_hprpc(&loi->loi_write_lop) ||
2027             lop_makes_hprpc(&loi->loi_read_lop)) {
2028                 /* HP rpc */
2029                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
2030                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
2031         } else {
2032                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
2033                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
2034                         lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
2035                         lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
2036         }
2037
2038         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
2039                 loi->loi_write_lop.lop_num_pending);
2040
2041         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
2042                 loi->loi_read_lop.lop_num_pending);
2043 }
2044
2045 static void lop_update_pending(struct client_obd *cli,
2046                                struct loi_oap_pages *lop, int cmd, int delta)
2047 {
2048         lop->lop_num_pending += delta;
2049         if (cmd & OBD_BRW_WRITE)
2050                 cli->cl_pending_w_pages += delta;
2051         else
2052                 cli->cl_pending_r_pages += delta;
2053 }
2054
2055 /**
2056  * this is called when a sync waiter receives an interruption.  Its job is to
2057  * get the caller woken as soon as possible.  If its page hasn't been put in an
2058  * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
2059  * desiring interruption which will forcefully complete the rpc once the rpc
2060  * has timed out.
2061  */
2062 int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
2063 {
2064         struct loi_oap_pages *lop;
2065         struct lov_oinfo *loi;
2066         int rc = -EBUSY;
2067         ENTRY;
2068
2069         LASSERT(!oap->oap_interrupted);
2070         oap->oap_interrupted = 1;
2071
2072         /* if it has been put in an rpc - only one oap gets a request reference */
2073         if (oap->oap_request != NULL) {
2074                 ptlrpc_mark_interrupted(oap->oap_request);
2075                 ptlrpcd_wake(oap->oap_request);
2076                 ptlrpc_req_finished(oap->oap_request);
2077                 oap->oap_request = NULL;
2078         }
2079
2080         /*
2081          * page completion may be called only if ->cpo_prep() method was
2082          * executed by osc_io_submit(), which also adds the page to the pending list
2083          */
2084         if (!cfs_list_empty(&oap->oap_pending_item)) {
2085                 cfs_list_del_init(&oap->oap_pending_item);
2086                 cfs_list_del_init(&oap->oap_urgent_item);
2087
2088                 loi = oap->oap_loi;
2089                 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
2090                         &loi->loi_write_lop : &loi->loi_read_lop;
2091                 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
2092                 loi_list_maint(oap->oap_cli, oap->oap_loi);
2093                 rc = oap->oap_caller_ops->ap_completion(env,
2094                                           oap->oap_caller_data,
2095                                           oap->oap_cmd, NULL, -EINTR);
2096         }
2097
2098         RETURN(rc);
2099 }
2100
2101 /* this is trying to propagate async writeback errors back up to the
2102  * application.  As an async write fails we record the error code for later if
2103  * the app does an fsync.  As long as errors persist we force future rpcs to be
2104  * sync so that the app can get a sync error and break the cycle of queueing
2105  * pages for which writeback will fail. */
2106 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
2107                            int rc)
2108 {
2109         if (rc) {
2110                 if (!ar->ar_rc)
2111                         ar->ar_rc = rc;
2112
2113                 ar->ar_force_sync = 1;
2114                 ar->ar_min_xid = ptlrpc_sample_next_xid();
2115                 return;
2116
2117         }
2118
2119         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2120                 ar->ar_force_sync = 0;
2121 }
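
/*
 * Illustrative sketch (editor's addition, compiled out) of the forced-sync
 * window above: an error latches ar_rc and forces sync for every xid below
 * the one sampled at failure time; the first success at or past ar_min_xid
 * lifts the sync requirement, while ar_rc stays latched for fsync to
 * report.  The helper name is hypothetical.
 */
#if 0
static void osc_process_ar_sketch(void)
{
        struct osc_async_rc ar = { 0 };

        osc_process_ar(&ar, 10, -ENOSPC);      /* failure: force sync   */
        LASSERT(ar.ar_force_sync && ar.ar_rc == -ENOSPC);
        osc_process_ar(&ar, ar.ar_min_xid, 0); /* success past window   */
        LASSERT(!ar.ar_force_sync && ar.ar_rc == -ENOSPC);
}
#endif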
2122
2123 void osc_oap_to_pending(struct osc_async_page *oap)
2124 {
2125         struct loi_oap_pages *lop;
2126
2127         if (oap->oap_cmd & OBD_BRW_WRITE)
2128                 lop = &oap->oap_loi->loi_write_lop;
2129         else
2130                 lop = &oap->oap_loi->loi_read_lop;
2131
2132         if (oap->oap_async_flags & ASYNC_HP)
2133                 cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2134         else if (oap->oap_async_flags & ASYNC_URGENT)
2135                 cfs_list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2136         cfs_list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2137         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2138 }
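
/*
 * Editor's note: the queueing order above encodes priority -- ASYNC_HP
 * pages go to the head of lop_urgent, plain ASYNC_URGENT pages to its
 * tail, and every page also lands on lop_pending so lop_makes_rpc() and
 * osc_send_oap_rpc() can find it.
 */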
2139
2140 /* this must be called holding the loi list lock to give coverage to exit_cache,
2141  * async_flag maintenance, and oap_request */
2142 static void osc_ap_completion(const struct lu_env *env,
2143                               struct client_obd *cli, struct obdo *oa,
2144                               struct osc_async_page *oap, int sent, int rc)
2145 {
2146         __u64 xid = 0;
2147
2148         ENTRY;
2149         if (oap->oap_request != NULL) {
2150                 xid = ptlrpc_req_xid(oap->oap_request);
2151                 ptlrpc_req_finished(oap->oap_request);
2152                 oap->oap_request = NULL;
2153         }
2154
2155         cfs_spin_lock(&oap->oap_lock);
2156         oap->oap_async_flags = 0;
2157         cfs_spin_unlock(&oap->oap_lock);
2158         oap->oap_interrupted = 0;
2159
2160         if (oap->oap_cmd & OBD_BRW_WRITE) {
2161                 osc_process_ar(&cli->cl_ar, xid, rc);
2162                 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
2163         }
2164
2165         if (rc == 0 && oa != NULL) {
2166                 if (oa->o_valid & OBD_MD_FLBLOCKS)
2167                         oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2168                 if (oa->o_valid & OBD_MD_FLMTIME)
2169                         oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2170                 if (oa->o_valid & OBD_MD_FLATIME)
2171                         oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2172                 if (oa->o_valid & OBD_MD_FLCTIME)
2173                         oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2174         }
2175
2176         rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
2177                                                 oap->oap_cmd, oa, rc);
2178
2179         /* ll_ap_completion (from llite) drops PG_locked. so, a new
2180          * I/O on the page could start, but OSC calls it under lock
2181          * and thus we can add oap back to pending safely */
2182         if (rc)
2183                 /* upper layer wants to leave the page on pending queue */
2184                 osc_oap_to_pending(oap);
2185         else
2186                 osc_exit_cache(cli, oap, sent);
2187         EXIT;
2188 }
2189
2190 static int brw_interpret(const struct lu_env *env,
2191                          struct ptlrpc_request *req, void *data, int rc)
2192 {
2193         struct osc_brw_async_args *aa = data;
2194         struct client_obd *cli;
2195         int async;
2196         ENTRY;
2197
2198         rc = osc_brw_fini_request(req, rc);
2199         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2200         if (osc_recoverable_error(rc)) {
2201                 /* Only retry once for mmapped files, since an mmapped page
2202                  * might be modified at any time. We have to retry at least
2203                  * once in case the page really WAS corrupted on the network
2204                  * rather than changed by mmap() modifying the page.
2205                  * Bug 11742 */
2206                 if ((rc == -EAGAIN) && (aa->aa_resends > 0) &&
2207                     aa->aa_oa->o_valid & OBD_MD_FLFLAGS &&
2208                     aa->aa_oa->o_flags & OBD_FL_MMAP) {
2209                         rc = 0;
2210                 } else {
2211                         rc = osc_brw_redo_request(req, aa);
2212                         if (rc == 0)
2213                                 RETURN(0);
2214                 }
2215         }
2216
2217         if (aa->aa_ocapa) {
2218                 capa_put(aa->aa_ocapa);
2219                 aa->aa_ocapa = NULL;
2220         }
2221
2222         cli = aa->aa_cli;
2223
2224         client_obd_list_lock(&cli->cl_loi_list_lock);
2225
2226         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2227          * is called so we know whether to go to sync BRWs or wait for more
2228          * RPCs to complete */
2229         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2230                 cli->cl_w_in_flight--;
2231         else
2232                 cli->cl_r_in_flight--;
2233
2234         async = cfs_list_empty(&aa->aa_oaps);
2235         if (!async) { /* from osc_send_oap_rpc() */
2236                 struct osc_async_page *oap, *tmp;
2237                 /* the caller may re-use the oap after the completion call so
2238                  * we need to clean it up a little */
2239                 cfs_list_for_each_entry_safe(oap, tmp, &aa->aa_oaps,
2240                                              oap_rpc_item) {
2241                         cfs_list_del_init(&oap->oap_rpc_item);
2242                         osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
2243                 }
2244                 OBDO_FREE(aa->aa_oa);
2245         } else { /* from async_internal() */
2246                 obd_count i;
2247                 for (i = 0; i < aa->aa_page_count; i++)
2248                         osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
2249         }
2250         osc_wake_cache_waiters(cli);
2251         osc_check_rpcs(env, cli);
2252         client_obd_list_unlock(&cli->cl_loi_list_lock);
2253         if (!async)
2254                 cl_req_completion(env, aa->aa_clerq, rc);
2255         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2256
2257         RETURN(rc);
2258 }
2259
2260 static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
2261                                             struct client_obd *cli,
2262                                             cfs_list_t *rpc_list,
2263                                             int page_count, int cmd)
2264 {
2265         struct ptlrpc_request *req;
2266         struct brw_page **pga = NULL;
2267         struct osc_brw_async_args *aa;
2268         struct obdo *oa = NULL;
2269         const struct obd_async_page_ops *ops = NULL;
2270         void *caller_data = NULL;
2271         struct osc_async_page *oap;
2272         struct osc_async_page *tmp;
2273         struct ost_body *body;
2274         struct cl_req *clerq = NULL;
2275         enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2276         struct ldlm_lock *lock = NULL;
2277         struct cl_req_attr crattr;
2278         int i, rc, mpflag = 0;
2279
2280         ENTRY;
2281         LASSERT(!cfs_list_empty(rpc_list));
2282
2283         if (cmd & OBD_BRW_MEMALLOC)
2284                 mpflag = cfs_memory_pressure_get_and_set();
2285
2286         memset(&crattr, 0, sizeof crattr);
2287         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2288         if (pga == NULL)
2289                 GOTO(out, req = ERR_PTR(-ENOMEM));
2290
2291         OBDO_ALLOC(oa);
2292         if (oa == NULL)
2293                 GOTO(out, req = ERR_PTR(-ENOMEM));
2294
2295         i = 0;
2296         cfs_list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2297                 struct cl_page *page = osc_oap2cl_page(oap);
2298                 if (ops == NULL) {
2299                         ops = oap->oap_caller_ops;
2300                         caller_data = oap->oap_caller_data;
2301
2302                         clerq = cl_req_alloc(env, page, crt,
2303                                              1 /* only 1-object rpcs for
2304                                                 * now */);
2305                         if (IS_ERR(clerq))
2306                                 GOTO(out, req = (void *)clerq);
2307                         lock = oap->oap_ldlm_lock;
2308                 }
2309                 pga[i] = &oap->oap_brw_page;
2310                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2311                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2312                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2313                 i++;
2314                 cl_req_page_add(env, clerq, page);
2315         }
2316
2317         /* always get the data for the obdo for the rpc */
2318         LASSERT(ops != NULL);
2319         crattr.cra_oa = oa;
2320         crattr.cra_capa = NULL;
2321         cl_req_attr_set(env, clerq, &crattr, ~0ULL);
2322         if (lock) {
2323                 oa->o_handle = lock->l_remote_handle;
2324                 oa->o_valid |= OBD_MD_FLHANDLE;
2325         }
2326
2327         rc = cl_req_prep(env, clerq);
2328         if (rc != 0) {
2329                 CERROR("cl_req_prep failed: %d\n", rc);
2330                 GOTO(out, req = ERR_PTR(rc));
2331         }
2332
2333         sort_brw_pages(pga, page_count);
2334         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2335                                   pga, &req, crattr.cra_capa, 1, 0);
2336         if (rc != 0) {
2337                 CERROR("prep_req failed: %d\n", rc);
2338                 GOTO(out, req = ERR_PTR(rc));
2339         }
2340
2341         if (cmd & OBD_BRW_MEMALLOC)
2342                 req->rq_memalloc = 1;
2343
2344         /* Need to update the timestamps after the request is built in case
2345          * we race with setattr (locally or in queue at OST).  If the OST gets
2346          * a later setattr before an earlier BRW (as determined by the request xid),
2347          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2348          * way to do this in a single call.  bug 10150 */
2349         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2350         cl_req_attr_set(env, clerq, &crattr,
2351                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2352
2353         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2354         aa = ptlrpc_req_async_args(req);
2355         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2356         cfs_list_splice(rpc_list, &aa->aa_oaps);
2357         CFS_INIT_LIST_HEAD(rpc_list);
2358         aa->aa_clerq = clerq;
2359 out:
2360         if (cmd & OBD_BRW_MEMALLOC)
2361                 cfs_memory_pressure_restore(mpflag);
2362
2363         capa_put(crattr.cra_capa);
2364         if (IS_ERR(req)) {
2365                 if (oa)
2366                         OBDO_FREE(oa);
2367                 if (pga)
2368                         OBD_FREE(pga, sizeof(*pga) * page_count);
2369                 /* this should happen rarely and is pretty bad; it makes the
2370                  * pending list stop following the dirty order */
2371                 client_obd_list_lock(&cli->cl_loi_list_lock);
2372                 cfs_list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
2373                         cfs_list_del_init(&oap->oap_rpc_item);
2374
2375                         /* queued sync pages can be torn down while the pages
2376                          * are between the pending list and the rpc */
2377                         if (oap->oap_interrupted) {
2378                                 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2379                                 osc_ap_completion(env, cli, NULL, oap, 0,
2380                                                   oap->oap_count);
2381                                 continue;
2382                         }
2383                         osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
2384                 }
2385                 if (clerq && !IS_ERR(clerq))
2386                         cl_req_completion(env, clerq, PTR_ERR(req));
2387         }
2388         RETURN(req);
2389 }
2390
2391 /**
2392  * prepare pages for ASYNC io and put pages in send queue.
2393  *
2394  * \param cmd OBD_BRW_* macros
2395  * \param lop pending pages
2396  *
2397  * \return zero if no page was added to the send queue.
2398  * \return 1 if pages were successfully added to the send queue.
2399  * \return negative on errors.
2400  */
2401 static int
2402 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
2403                  struct lov_oinfo *loi,
2404                  int cmd, struct loi_oap_pages *lop)
2405 {
2406         struct ptlrpc_request *req;
2407         obd_count page_count = 0;
2408         struct osc_async_page *oap = NULL, *tmp;
2409         struct osc_brw_async_args *aa;
2410         const struct obd_async_page_ops *ops;
2411         CFS_LIST_HEAD(rpc_list);
2412         CFS_LIST_HEAD(tmp_list);
2413         unsigned int ending_offset;
2414         unsigned  starting_offset = 0;
2415         int srvlock = 0, mem_tight = 0;
2416         struct cl_object *clob = NULL;
2417         ENTRY;
2418
2419         /* ASYNC_HP pages first. At present, when the lock covering the pages
2420          * is to be canceled, the pages under that lock will be sent out with
2421          * ASYNC_HP. We have to send them out as soon as possible. */
2422         cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_urgent, oap_urgent_item) {
2423                 if (oap->oap_async_flags & ASYNC_HP)
2424                         cfs_list_move(&oap->oap_pending_item, &tmp_list);
2425                 else
2426                         cfs_list_move_tail(&oap->oap_pending_item, &tmp_list);
2427                 if (++page_count >= cli->cl_max_pages_per_rpc)
2428                         break;
2429         }
2430
2431         cfs_list_splice(&tmp_list, &lop->lop_pending);
2432         page_count = 0;
2433
2434         /* first we find the pages we're allowed to work with */
2435         cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2436                                      oap_pending_item) {
2437                 ops = oap->oap_caller_ops;
2438
2439                 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
2440                          "magic 0x%x\n", oap, oap->oap_magic);
2441
2442                 if (clob == NULL) {
2443                         /* pin object in memory, so that completion call-backs
2444                          * can be safely called under client_obd_list lock. */
2445                         clob = osc_oap2cl_page(oap)->cp_obj;
2446                         cl_object_get(clob);
2447                 }
2448
2449                 if (page_count != 0 &&
2450                     srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2451                         CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2452                                " oap %p, page %p, srvlock %u\n",
2453                                oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2454                         break;
2455                 }
2456
2457                 /* If there is a gap at the start of this page, it can't merge
2458                  * with any previous page, so we'll hand the network a
2459                  * "fragmented" page array that it can't transfer in 1 RDMA */
2460                 if (page_count != 0 && oap->oap_page_off != 0)
2461                         break;
2462
2463                 /* in llite being 'ready' equates to the page being locked
2464                  * until completion unlocks it.  commit_write submits a page
2465                  * as not ready because its unlock will happen unconditionally
2466                  * as the call returns.  if we race with commit_write giving
2467                  * us that page we don't want to create a hole in the page
2468                  * stream, so we stop and leave the rpc to be fired by
2469                  * another dirtier or kupdated interval (the not ready page
2470                  * will still be on the dirty list).  we could call in
2471                  * at the end of ll_file_write to process the queue again. */
2472                 if (!(oap->oap_async_flags & ASYNC_READY)) {
2473                         int rc = ops->ap_make_ready(env, oap->oap_caller_data,
2474                                                     cmd);
2475                         if (rc < 0)
2476                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
2477                                                 "instead of ready\n", oap,
2478                                                 oap->oap_page, rc);
2479                         switch (rc) {
2480                         case -EAGAIN:
2481                                 /* llite is telling us that the page is still
2482                                  * in commit_write and that we should try
2483                                  * and put it in an rpc again later.  we
2484                                  * break out of the loop so we don't create
2485                                  * a hole in the sequence of pages in the rpc
2486                                  * stream.*/
2487                                 oap = NULL;
2488                                 break;
2489                         case -EINTR:
2490                                 /* the io isn't needed... tell the checks
2491                                  * below to complete the rpc with EINTR */
2492                                 cfs_spin_lock(&oap->oap_lock);
2493                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2494                                 cfs_spin_unlock(&oap->oap_lock);
2495                                 oap->oap_count = -EINTR;
2496                                 break;
2497                         case 0:
2498                                 cfs_spin_lock(&oap->oap_lock);
2499                                 oap->oap_async_flags |= ASYNC_READY;
2500                                 cfs_spin_unlock(&oap->oap_lock);
2501                                 break;
2502                         default:
2503                                 LASSERTF(0, "oap %p page %p returned %d "
2504                                             "from make_ready\n", oap,
2505                                             oap->oap_page, rc);
2506                                 break;
2507                         }
2508                 }
2509                 if (oap == NULL)
2510                         break;
2511                 /*
2512                  * Page submitted for IO has to be locked. Either by
2513                  * ->ap_make_ready() or by higher layers.
2514                  */
2515 #if defined(__KERNEL__) && defined(__linux__)
2516                 {
2517                         struct cl_page *page;
2518
2519                         page = osc_oap2cl_page(oap);
2520
2521                         if (page->cp_type == CPT_CACHEABLE &&
2522                             !(PageLocked(oap->oap_page) &&
2523                               (CheckWriteback(oap->oap_page, cmd)))) {
2524                                 CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2525                                        oap->oap_page,
2526                                        (long)oap->oap_page->flags,
2527                                        oap->oap_async_flags);
2528                                 LBUG();
2529                         }
2530                 }
2531 #endif
2532
2533                 /* take the page out of our book-keeping */
2534                 cfs_list_del_init(&oap->oap_pending_item);
2535                 lop_update_pending(cli, lop, cmd, -1);
2536                 cfs_list_del_init(&oap->oap_urgent_item);
2537
2538                 if (page_count == 0)
2539                         starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2540                                           (PTLRPC_MAX_BRW_SIZE - 1);
2541
2542                 /* ask the caller for the size of the io as the rpc leaves. */
2543                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
2544                         oap->oap_count =
2545                                 ops->ap_refresh_count(env, oap->oap_caller_data,
2546                                                       cmd);
2547                         LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
2548                 }
2549                 if (oap->oap_count <= 0) {
2550                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2551                                oap->oap_count);
2552                         osc_ap_completion(env, cli, NULL,
2553                                           oap, 0, oap->oap_count);
2554                         continue;
2555                 }
2556
2557                 /* now put the page back in our accounting */
2558                 cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
2559                 if (oap->oap_brw_flags & OBD_BRW_MEMALLOC)
2560                         mem_tight = 1;
2561                 if (page_count == 0)
2562                         srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2563                 if (++page_count >= cli->cl_max_pages_per_rpc)
2564                         break;
2565
2566                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
2567                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2568                  * have the same alignment as the initial writes that allocated
2569                  * extents on the server. */
2570                 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2571                                  oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2572                 if (ending_offset == 0)
2573                         break;
2574
2575                 /* If there is a gap at the end of this page, it can't merge
2576                  * with any subsequent pages, so we'll hand the network a
2577                  * "fragmented" page array that it can't transfer in 1 RDMA */
2578                 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2579                         break;
2580         }
2581
2582         osc_wake_cache_waiters(cli);
2583
2584         loi_list_maint(cli, loi);
2585
2586         client_obd_list_unlock(&cli->cl_loi_list_lock);
2587
2588         if (clob != NULL)
2589                 cl_object_put(env, clob);
2590
2591         if (page_count == 0) {
2592                 client_obd_list_lock(&cli->cl_loi_list_lock);
2593                 RETURN(0);
2594         }
2595
2596         req = osc_build_req(env, cli, &rpc_list, page_count,
2597                             mem_tight ? (cmd | OBD_BRW_MEMALLOC) : cmd);
2598         if (IS_ERR(req)) {
2599                 LASSERT(cfs_list_empty(&rpc_list));
2600                 loi_list_maint(cli, loi);
2601                 RETURN(PTR_ERR(req));
2602         }
2603
2604         aa = ptlrpc_req_async_args(req);
2605
2606         if (cmd == OBD_BRW_READ) {
2607                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2608                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2609                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2610                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2611         } else {
2612                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2613                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2614                                  cli->cl_w_in_flight);
2615                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2616                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2617         }
2618         ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2619
2620         client_obd_list_lock(&cli->cl_loi_list_lock);
2621
2622         if (cmd == OBD_BRW_READ)
2623                 cli->cl_r_in_flight++;
2624         else
2625                 cli->cl_w_in_flight++;
2626
2627         /* queued sync pages can be torn down while the pages
2628          * are between the pending list and the rpc */
2629         tmp = NULL;
2630         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2631                 /* only one oap gets a request reference */
2632                 if (tmp == NULL)
2633                         tmp = oap;
2634                 if (oap->oap_interrupted && !req->rq_intr) {
2635                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2636                                oap, req);
2637                         ptlrpc_mark_interrupted(req);
2638                 }
2639         }
2640         if (tmp != NULL)
2641                 tmp->oap_request = ptlrpc_request_addref(req);
2642
2643         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2644                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2645
2646         req->rq_interpret_reply = brw_interpret;
2647         ptlrpcd_add_req(req, PSCOPE_BRW);
2648         RETURN(1);
2649 }
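
/*
 * Editor's note, an illustrative alignment example (assuming 1M
 * PTLRPC_MAX_BRW_SIZE): a dirty stream starting at file offset 768K is
 * cut at the 1M boundary even if more pages are ready, so the next RPC
 * starts 1M-aligned and matches the write pattern that originally
 * allocated the server-side extents.
 */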
2650
2651 #define LOI_DEBUG(LOI, STR, args...)                                     \
2652         CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
2653                !cfs_list_empty(&(LOI)->loi_ready_item) ||                \
2654                !cfs_list_empty(&(LOI)->loi_hp_ready_item),               \
2655                (LOI)->loi_write_lop.lop_num_pending,                     \
2656                !cfs_list_empty(&(LOI)->loi_write_lop.lop_urgent),        \
2657                (LOI)->loi_read_lop.lop_num_pending,                      \
2658                !cfs_list_empty(&(LOI)->loi_read_lop.lop_urgent),         \
2659                args)
2660
2661 /* This is called by osc_check_rpcs() to find which objects have pages that
2662  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2663 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2664 {
2665         ENTRY;
2666
2667         /* First return objects that have blocked locks so that they
2668          * will be flushed quickly and other clients can get the lock,
2669          * then objects which have pages ready to be stuffed into RPCs */
2670         if (!cfs_list_empty(&cli->cl_loi_hp_ready_list))
2671                 RETURN(cfs_list_entry(cli->cl_loi_hp_ready_list.next,
2672                                       struct lov_oinfo, loi_hp_ready_item));
2673         if (!cfs_list_empty(&cli->cl_loi_ready_list))
2674                 RETURN(cfs_list_entry(cli->cl_loi_ready_list.next,
2675                                       struct lov_oinfo, loi_ready_item));
2676
2677         /* then if we have cache waiters, return all objects with queued
2678          * writes.  This is especially important when many small files
2679          * have filled up the cache and not been fired into rpcs because
2680          * they don't pass the nr_pending/object threshold */
2681         if (!cfs_list_empty(&cli->cl_cache_waiters) &&
2682             !cfs_list_empty(&cli->cl_loi_write_list))
2683                 RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2684                                       struct lov_oinfo, loi_write_item));
2685
2686         /* then return all queued objects when we have an invalid import
2687          * so that they get flushed */
2688         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2689                 if (!cfs_list_empty(&cli->cl_loi_write_list))
2690                         RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2691                                               struct lov_oinfo,
2692                                               loi_write_item));
2693                 if (!cfs_list_empty(&cli->cl_loi_read_list))
2694                         RETURN(cfs_list_entry(cli->cl_loi_read_list.next,
2695                                               struct lov_oinfo, loi_read_item));
2696         }
2697         RETURN(NULL);
2698 }
2699
2700 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2701 {
2702         struct osc_async_page *oap;
2703         int hprpc = 0;
2704
2705         if (!cfs_list_empty(&loi->loi_write_lop.lop_urgent)) {
2706                 oap = cfs_list_entry(loi->loi_write_lop.lop_urgent.next,
2707                                      struct osc_async_page, oap_urgent_item);
2708                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2709         }
2710
2711         if (!hprpc && !cfs_list_empty(&loi->loi_read_lop.lop_urgent)) {
2712                 oap = cfs_list_entry(loi->loi_read_lop.lop_urgent.next,
2713                                      struct osc_async_page, oap_urgent_item);
2714                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2715         }
2716
2717         return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
2718 }
2719
2720 /* called with the loi list lock held */
2721 void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
2722 {
2723         struct lov_oinfo *loi;
2724         int rc = 0, race_counter = 0;
2725         ENTRY;
2726
2727         while ((loi = osc_next_loi(cli)) != NULL) {
2728                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2729
2730                 if (osc_max_rpc_in_flight(cli, loi))
2731                         break;
2732
2733                 /* attempt some read/write balancing by alternating between
2734                  * reads and writes in an object.  The makes_rpc checks here
2735                  * would be redundant if we were getting read/write work items
2736                  * instead of objects.  We don't want send_oap_rpc to drain a
2737                  * partial read pending queue when we're given this object to
2738                  * do writes on while there are cache waiters */
2739                 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2740                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
2741                                               &loi->loi_write_lop);
2742                         if (rc < 0) {
2743                                 CERROR("Write request failed with %d\n", rc);
2744
2745                                 /* osc_send_oap_rpc failed, mostly because of
2746                                  * memory pressure.
2747                                  *
2748                                  * We can't break here, because if:
2749                                  *  - a page was submitted by osc_io_submit,
2750                                  *    so that page is locked;
2751                                  *  - no request is in flight;
2752                                  *  - no subsequent request will be sent;
2753                                  * then the system would live-lock: there
2754                                  * would be no further chance to call
2755                                  * osc_io_unplug() or osc_check_rpcs(), and
2756                                  * pdflush can't help in this case because
2757                                  * it might be blocked grabbing the page
2758                                  * lock mentioned above.
2759                                  *
2760                                  * Anyway, continue to drain pages. */
2761                                 /* break; */
2762                         }
2763
2764                         if (rc > 0)
2765                                 race_counter = 0;
2766                         else
2767                                 race_counter++;
2768                 }
2769                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2770                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
2771                                               &loi->loi_read_lop);
2772                         if (rc < 0)
2773                                 CERROR("Read request failed with %d\n", rc);
2774
2775                         if (rc > 0)
2776                                 race_counter = 0;
2777                         else
2778                                 race_counter++;
2779                 }
2780
2781                 /* attempt some inter-object balancing by issuing rpcs
2782                  * for each object in turn */
2783                 if (!cfs_list_empty(&loi->loi_hp_ready_item))
2784                         cfs_list_del_init(&loi->loi_hp_ready_item);
2785                 if (!cfs_list_empty(&loi->loi_ready_item))
2786                         cfs_list_del_init(&loi->loi_ready_item);
2787                 if (!cfs_list_empty(&loi->loi_write_item))
2788                         cfs_list_del_init(&loi->loi_write_item);
2789                 if (!cfs_list_empty(&loi->loi_read_item))
2790                         cfs_list_del_init(&loi->loi_read_item);
2791
2792                 loi_list_maint(cli, loi);
2793
2794                 /* send_oap_rpc returns 0 when make_ready tells it to
2795                  * back off.  llite's make_ready does this when it tries
2796                  * to lock a page queued for write that is already locked.
2797                  * we want to try sending rpcs from many objects, but we
2798                  * don't want to spin failing with 0.  */
2799                 if (race_counter == 10)
2800                         break;
2801         }
2802         EXIT;
2803 }
2804
2805 /* we're trying to queue a page in the osc so we're subject to the
2806  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2807  * If the osc's queued pages are already at that limit, then we want to sleep
2808  * until there is space in the osc's queue for us.  We also may be waiting for
2809  * write credits from the OST if there are RPCs in flight that may return some
2810  * before we fall back to sync writes.
2811  *
2812  * We need this to know our allocation was granted in the presence of signals */
2813 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2814 {
2815         int rc;
2816         ENTRY;
2817         client_obd_list_lock(&cli->cl_loi_list_lock);
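        /* A hedged note, not from the original source: the wait is over
         * either when this waiter's entry has been removed from
         * cl_cache_waiters (grant was handed to it, e.g. by
         * osc_wake_cache_waiters()), or when no RPCs remain in flight, in
         * which case no further grant can be returned and the caller should
         * stop waiting and fall back to sync i/o. */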
2818         rc = cfs_list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2819         client_obd_list_unlock(&cli->cl_loi_list_lock);
2820         RETURN(rc);
2821 }
2822
2823 /**
2824  * Non-blocking version of osc_enter_cache() that consumes grant only when it
2825  * is available.
2826  */
2827 int osc_enter_cache_try(const struct lu_env *env,
2828                         struct client_obd *cli, struct lov_oinfo *loi,
2829                         struct osc_async_page *oap, int transient)
2830 {
2831         int has_grant;
2832
2833         has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
2834         if (has_grant) {
2835                 osc_consume_write_grant(cli, &oap->oap_brw_page);
2836                 if (transient) {
2837                         cli->cl_dirty_transit += CFS_PAGE_SIZE;
2838                         cfs_atomic_inc(&obd_dirty_transit_pages);
2839                         oap->oap_brw_flags |= OBD_BRW_NOCACHE;
2840                 }
2841         }
2842         return has_grant;
2843 }
2844
2845 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2846  * grant or cache space. */
2847 static int osc_enter_cache(const struct lu_env *env,
2848                            struct client_obd *cli, struct lov_oinfo *loi,
2849                            struct osc_async_page *oap)
2850 {
2851         struct osc_cache_waiter ocw;
2852         struct l_wait_info lwi = { 0 };
2853
2854         ENTRY;
2855
2856         CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2857                "grant: %lu\n", cli->cl_dirty, cfs_atomic_read(&obd_dirty_pages),
2858                cli->cl_dirty_max, obd_max_dirty_pages,
2859                cli->cl_lost_grant, cli->cl_avail_grant);
2860
2861         /* force the caller to try sync io.  this can jump the list
2862          * of queued writes and create a discontiguous rpc stream */
2863         if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2864             loi->loi_ar.ar_force_sync)
2865                 RETURN(-EDQUOT);
2866
2867         /* Hopefully normal case - cache space and write credits available */
2868         if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
2869             cfs_atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
2870             osc_enter_cache_try(env, cli, loi, oap, 0))
2871                 RETURN(0);
2872
2873         /* It is safe to block as a cache waiter as long as there is grant
2874          * space available or the hope of additional grant being returned
2875          * when an in flight write completes.  Using the write back cache
2876          * if possible is preferable to sending the data synchronously
2877          * because write pages can then be merged in to large requests.
2878          * The addition of this cache waiter will cause pending write
2879          * pages to be sent immediately. */
2880         if (cli->cl_w_in_flight || cli->cl_avail_grant >= CFS_PAGE_SIZE) {
2881                 cfs_list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2882                 cfs_waitq_init(&ocw.ocw_waitq);
2883                 ocw.ocw_oap = oap;
2884                 ocw.ocw_rc = 0;
2885
2886                 loi_list_maint(cli, loi);
2887                 osc_check_rpcs(env, cli);
2888                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2889
2890                 CDEBUG(D_CACHE, "sleeping for cache space\n");
2891                 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2892
2893                 client_obd_list_lock(&cli->cl_loi_list_lock);
2894                 if (!cfs_list_empty(&ocw.ocw_entry)) {
2895                         cfs_list_del(&ocw.ocw_entry);
2896                         RETURN(-EINTR);
2897                 }
2898                 RETURN(ocw.ocw_rc);
2899         }
2900
2901         RETURN(-EDQUOT);
2902 }
2903
2904
2905 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2906                         struct lov_oinfo *loi, cfs_page_t *page,
2907                         obd_off offset, const struct obd_async_page_ops *ops,
2908                         void *data, void **res, int nocache,
2909                         struct lustre_handle *lockh)
2910 {
2911         struct osc_async_page *oap;
2912
2913         ENTRY;
2914
2915         if (!page)
2916                 RETURN(cfs_size_round(sizeof(*oap)));
2917
2918         oap = *res;
2919         oap->oap_magic = OAP_MAGIC;
2920         oap->oap_cli = &exp->exp_obd->u.cli;
2921         oap->oap_loi = loi;
2922
2923         oap->oap_caller_ops = ops;
2924         oap->oap_caller_data = data;
2925
2926         oap->oap_page = page;
2927         oap->oap_obj_off = offset;
2928         if (!client_is_remote(exp) &&
2929             cfs_capable(CFS_CAP_SYS_RESOURCE))
2930                 oap->oap_brw_flags = OBD_BRW_NOQUOTA;
2931
2932         LASSERT(!(offset & ~CFS_PAGE_MASK));
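        /* A hedged illustration, not from the original source: assuming
         * CFS_PAGE_SIZE == 4096, ~CFS_PAGE_MASK == 0xfff, so offset 8192
         * passes (8192 & 0xfff == 0) while offset 8200 would trip the
         * assertion (8200 & 0xfff == 8); object offsets must be page
         * aligned. */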
2933
2934         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2935         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2936         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2937         CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2938
2939         cfs_spin_lock_init(&oap->oap_lock);
2940         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2941         RETURN(0);
2942 }
2943
2944 struct osc_async_page *oap_from_cookie(void *cookie)
2945 {
2946         struct osc_async_page *oap = cookie;
2947         if (oap->oap_magic != OAP_MAGIC)
2948                 return ERR_PTR(-EINVAL);
2949         return oap;
2950 }
2951
2952 int osc_queue_async_io(const struct lu_env *env,
2953                        struct obd_export *exp, struct lov_stripe_md *lsm,
2954                        struct lov_oinfo *loi, void *cookie,
2955                        int cmd, obd_off off, int count,
2956                        obd_flag brw_flags, enum async_flags async_flags)
2957 {
2958         struct client_obd *cli = &exp->exp_obd->u.cli;
2959         struct osc_async_page *oap;
2960         int rc = 0;
2961         ENTRY;
2962
2963         oap = oap_from_cookie(cookie);
2964         if (IS_ERR(oap))
2965                 RETURN(PTR_ERR(oap));
2966
2967         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2968                 RETURN(-EIO);
2969
2970         if (!cfs_list_empty(&oap->oap_pending_item) ||
2971             !cfs_list_empty(&oap->oap_urgent_item) ||
2972             !cfs_list_empty(&oap->oap_rpc_item))
2973                 RETURN(-EBUSY);
2974
2975         /* check if the file's owner/group is over quota */
2976         if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
2977                 struct cl_object *obj;
2978                 struct cl_attr    attr; /* XXX put attr into thread info */
2979                 unsigned int qid[MAXQUOTAS];
2980
2981                 obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
2982
2983                 cl_object_attr_lock(obj);
2984                 rc = cl_object_attr_get(env, obj, &attr);
2985                 cl_object_attr_unlock(obj);
2986
2987                 qid[USRQUOTA] = attr.cat_uid;
2988                 qid[GRPQUOTA] = attr.cat_gid;
2989                 if (rc == 0 &&
2990                     lquota_chkdq(quota_interface, cli, qid) == NO_QUOTA)
2991                         rc = -EDQUOT;
2992                 if (rc)
2993                         RETURN(rc);
2994         }
2995
2996         if (loi == NULL)
2997                 loi = lsm->lsm_oinfo[0];
2998
2999         client_obd_list_lock(&cli->cl_loi_list_lock);
3000
3001         LASSERT(off + count <= CFS_PAGE_SIZE);
3002         oap->oap_cmd = cmd;
3003         oap->oap_page_off = off;
3004         oap->oap_count = count;
3005         oap->oap_brw_flags = brw_flags;
3006         /* Give a hint to OST that requests are coming from kswapd - bug19529 */
3007         if (cfs_memory_pressure_get())
3008                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
3009         cfs_spin_lock(&oap->oap_lock);
3010         oap->oap_async_flags = async_flags;
3011         cfs_spin_unlock(&oap->oap_lock);
3012
3013         if (cmd & OBD_BRW_WRITE) {
3014                 rc = osc_enter_cache(env, cli, loi, oap);
3015                 if (rc) {
3016                         client_obd_list_unlock(&cli->cl_loi_list_lock);
3017                         RETURN(rc);
3018                 }
3019         }
3020
3021         osc_oap_to_pending(oap);
3022         loi_list_maint(cli, loi);
3023
3024         LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
3025                   cmd);
3026
3027         osc_check_rpcs(env, cli);
3028         client_obd_list_unlock(&cli->cl_loi_list_lock);
3029
3030         RETURN(0);
3031 }
3032
3033 /* aka (~was & now & flag), but this is more clear :) */
3034 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
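/* A hedged usage sketch, not from the original source: with
 * was == ASYNC_READY and now == (ASYNC_READY | ASYNC_URGENT),
 * SETTING(was, now, ASYNC_URGENT) is 1 because ASYNC_URGENT is newly set,
 * while SETTING(was, now, ASYNC_READY) is 0 since ASYNC_READY was already
 * set beforehand. */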
3035
3036 int osc_set_async_flags_base(struct client_obd *cli,
3037                              struct lov_oinfo *loi, struct osc_async_page *oap,
3038                              obd_flag async_flags)
3039 {
3040         struct loi_oap_pages *lop;
3041         int flags = 0;
3042         ENTRY;
3043
3044         LASSERT(!cfs_list_empty(&oap->oap_pending_item));
3045
3046         if (oap->oap_cmd & OBD_BRW_WRITE) {
3047                 lop = &loi->loi_write_lop;
3048         } else {
3049                 lop = &loi->loi_read_lop;
3050         }
3051
3052         if ((oap->oap_async_flags & async_flags) == async_flags)
3053                 RETURN(0);
3054
3055         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
3056                 flags |= ASYNC_READY;
3057
3058         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
3059             cfs_list_empty(&oap->oap_rpc_item)) {
3060                 if (oap->oap_async_flags & ASYNC_HP)
3061                         cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
3062                 else
3063                         cfs_list_add_tail(&oap->oap_urgent_item,
3064                                           &lop->lop_urgent);
3065                 flags |= ASYNC_URGENT;
3066                 loi_list_maint(cli, loi);
3067         }
3068         cfs_spin_lock(&oap->oap_lock);
3069         oap->oap_async_flags |= flags;
3070         cfs_spin_unlock(&oap->oap_lock);
3071
3072         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
3073                         oap->oap_async_flags);
3074         RETURN(0);
3075 }
3076
3077 int osc_teardown_async_page(struct obd_export *exp,
3078                             struct lov_stripe_md *lsm,
3079                             struct lov_oinfo *loi, void *cookie)
3080 {
3081         struct client_obd *cli = &exp->exp_obd->u.cli;
3082         struct loi_oap_pages *lop;
3083         struct osc_async_page *oap;
3084         int rc = 0;
3085         ENTRY;
3086
3087         oap = oap_from_cookie(cookie);
3088         if (IS_ERR(oap))
3089                 RETURN(PTR_ERR(oap));
3090
3091         if (loi == NULL)
3092                 loi = lsm->lsm_oinfo[0];
3093
3094         if (oap->oap_cmd & OBD_BRW_WRITE) {
3095                 lop = &loi->loi_write_lop;
3096         } else {
3097                 lop = &loi->loi_read_lop;
3098         }
3099
3100         client_obd_list_lock(&cli->cl_loi_list_lock);
3101
3102         if (!cfs_list_empty(&oap->oap_rpc_item))
3103                 GOTO(out, rc = -EBUSY);
3104
3105         osc_exit_cache(cli, oap, 0);
3106         osc_wake_cache_waiters(cli);
3107
3108         if (!cfs_list_empty(&oap->oap_urgent_item)) {
3109                 cfs_list_del_init(&oap->oap_urgent_item);
3110                 cfs_spin_lock(&oap->oap_lock);
3111                 oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
3112                 cfs_spin_unlock(&oap->oap_lock);
3113         }
3114         if (!cfs_list_empty(&oap->oap_pending_item)) {
3115                 cfs_list_del_init(&oap->oap_pending_item);
3116                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
3117         }
3118         loi_list_maint(cli, loi);
3119         LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
3120 out:
3121         client_obd_list_unlock(&cli->cl_loi_list_lock);
3122         RETURN(rc);
3123 }
3124
3125 static void osc_set_lock_data_with_check(struct ldlm_lock *lock,
3126                                          struct ldlm_enqueue_info *einfo,
3127                                          int flags)
3128 {
3129         void *data = einfo->ei_cbdata;
3130
3131         LASSERT(lock != NULL);
3132         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
3133         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
3134         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
3135         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
3136
3137         lock_res_and_lock(lock);
3138         cfs_spin_lock(&osc_ast_guard);
3139         LASSERT(lock->l_ast_data == NULL || lock->l_ast_data == data);
3140         lock->l_ast_data = data;
3141         cfs_spin_unlock(&osc_ast_guard);
3142         unlock_res_and_lock(lock);
3143 }
3144
3145 static void osc_set_data_with_check(struct lustre_handle *lockh,
3146                                     struct ldlm_enqueue_info *einfo,
3147                                     int flags)
3148 {
3149         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
3150
3151         if (lock != NULL) {
3152                 osc_set_lock_data_with_check(lock, einfo, flags);
3153                 LDLM_LOCK_PUT(lock);
3154         } else
3155                 CERROR("lockh %p, data %p - client evicted?\n",
3156                        lockh, einfo->ei_cbdata);
3157 }
3158
3159 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3160                              ldlm_iterator_t replace, void *data)
3161 {
3162         struct ldlm_res_id res_id;
3163         struct obd_device *obd = class_exp2obd(exp);
3164
3165         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
3166         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3167         return 0;
3168 }
3169
3170 /* Find any ldlm lock of the inode in the osc.
3171  * Returns:  0    if no lock was found
3172  *           1    if a lock was found
3173  *         < 0    on error */
3174 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3175                            ldlm_iterator_t replace, void *data)
3176 {
3177         struct ldlm_res_id res_id;
3178         struct obd_device *obd = class_exp2obd(exp);
3179         int rc = 0;
3180
3181         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
3182         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3183         if (rc == LDLM_ITER_STOP)
3184                 return(1);
3185         if (rc == LDLM_ITER_CONTINUE)
3186                 return(0);
3187         return(rc);
3188 }
3189
3190 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
3191                             obd_enqueue_update_f upcall, void *cookie,
3192                             int *flags, int rc)
3193 {
3194         int intent = *flags & LDLM_FL_HAS_INTENT;
3195         ENTRY;
3196
3197         if (intent) {
3198                 /* The request was created before ldlm_cli_enqueue call. */
3199                 if (rc == ELDLM_LOCK_ABORTED) {
3200                         struct ldlm_reply *rep;
3201                         rep = req_capsule_server_get(&req->rq_pill,
3202                                                      &RMF_DLM_REP);
3203
3204                         LASSERT(rep != NULL);
3205                         if (rep->lock_policy_res1)
3206                                 rc = rep->lock_policy_res1;
3207                 }
3208         }
3209
3210         if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
3211                 *flags |= LDLM_FL_LVB_READY;
3212                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
3213                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
3214         }
3215
3216         /* Call the update callback. */
3217         rc = (*upcall)(cookie, rc);
3218         RETURN(rc);
3219 }
3220
3221 static int osc_enqueue_interpret(const struct lu_env *env,
3222                                  struct ptlrpc_request *req,
3223                                  struct osc_enqueue_args *aa, int rc)
3224 {
3225         struct ldlm_lock *lock;
3226         struct lustre_handle handle;
3227         __u32 mode;
3228
3229         /* Make a local copy of a lock handle and a mode, because aa->oa_*
3230          * might be freed anytime after lock upcall has been called. */
3231         lustre_handle_copy(&handle, aa->oa_lockh);
3232         mode = aa->oa_ei->ei_mode;
3233
3234         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
3235          * be valid. */
3236         lock = ldlm_handle2lock(&handle);
3237
3238         /* Take an additional reference so that a blocking AST that
3239          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
3240          * to arrive after an upcall has been executed by
3241          * osc_enqueue_fini(). */
3242         ldlm_lock_addref(&handle, mode);
3243
3244         /* Let the CP AST grant the lock first. */
3245         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
3246
3247         /* Complete the lock acquisition procedure. */
3248         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
3249                                    mode, aa->oa_flags, aa->oa_lvb,
3250                                    sizeof(*aa->oa_lvb), &handle, rc);
3251         /* Complete the OSC-level processing. */
3252         rc = osc_enqueue_fini(req, aa->oa_lvb,
3253                               aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc);
3254
3255         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
3256
3257         /* Release the lock for async request. */
3258         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
3259                 /*
3260                  * Releases a reference taken by ldlm_cli_enqueue(), if it is
3261                  * not already released by
3262                  * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
3263                  */
3264                 ldlm_lock_decref(&handle, mode);
3265
3266         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
3267                  aa->oa_lockh, req, aa);
3268         ldlm_lock_decref(&handle, mode);
3269         LDLM_LOCK_PUT(lock);
3270         return rc;
3271 }
3272
3273 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
3274                         struct lov_oinfo *loi, int flags,
3275                         struct ost_lvb *lvb, __u32 mode, int rc)
3276 {
3277         if (rc == ELDLM_OK) {
3278                 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
3279                 __u64 tmp;
3280
3281                 LASSERT(lock != NULL);
3282                 loi->loi_lvb = *lvb;
3283                 tmp = loi->loi_lvb.lvb_size;
3284                 /* Extend KMS up to the end of this lock and no further
3285                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
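                /* A hedged worked example, not from the original source: if
                 * this lock covers extent [0, 8191] and the OST reports
                 * lvb_size == 100000, tmp is clamped to 8192 (end + 1), so
                 * kms only grows to the end of what the lock actually
                 * protects. */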
3286                 if (tmp > lock->l_policy_data.l_extent.end)
3287                         tmp = lock->l_policy_data.l_extent.end + 1;
3288                 if (tmp >= loi->loi_kms) {
3289                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
3290                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
3291                         loi_kms_set(loi, tmp);
3292                 } else {
3293                         LDLM_DEBUG(lock, "lock acquired, setting rss="
3294                                    LPU64"; leaving kms="LPU64", end="LPU64,
3295                                    loi->loi_lvb.lvb_size, loi->loi_kms,
3296                                    lock->l_policy_data.l_extent.end);
3297                 }
3298                 ldlm_lock_allow_match(lock);
3299                 LDLM_LOCK_PUT(lock);
3300         } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
3301                 loi->loi_lvb = *lvb;
3302                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
3303                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
3304                 rc = ELDLM_OK;
3305         }
3306 }
3307 EXPORT_SYMBOL(osc_update_enqueue);
3308
3309 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
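/* A hedged note, not from the original source: PTLRPCD_SET is a sentinel
 * pointer that is never dereferenced; callers pass it to have
 * osc_enqueue_base() hand the request to ptlrpcd (see the
 * rqset == PTLRPCD_SET check there) instead of a caller-owned set. */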
3310
3311 /* When enqueuing asynchronously, locks are not ordered, so we can obtain a lock
3312  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
3313  * other synchronous requests; however, holding some locks while trying to obtain
3314  * others may take a considerable amount of time in the case of OST failure, and
3315  * when a client does not release a lock that other sync requests are waiting
3316  * for, the client is evicted from the cluster -- such scenarios make life
3317  * difficult, so release locks just after they are obtained. */
3318 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3319                      int *flags, ldlm_policy_data_t *policy,
3320                      struct ost_lvb *lvb, int kms_valid,
3321                      obd_enqueue_update_f upcall, void *cookie,
3322                      struct ldlm_enqueue_info *einfo,
3323                      struct lustre_handle *lockh,
3324                      struct ptlrpc_request_set *rqset, int async)
3325 {
3326         struct obd_device *obd = exp->exp_obd;
3327         struct ptlrpc_request *req = NULL;
3328         int intent = *flags & LDLM_FL_HAS_INTENT;
3329         ldlm_mode_t mode;
3330         int rc;
3331         ENTRY;
3332
3333         /* Filesystem lock extents are extended to page boundaries so that
3334          * dealing with the page cache is a little smoother.  */
3335         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3336         policy->l_extent.end |= ~CFS_PAGE_MASK;
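        /* A hedged worked example, not from the original source: assuming
         * CFS_PAGE_SIZE == 4096 (~CFS_PAGE_MASK == 0xfff), a requested
         * extent [6000, 9000] is widened to [4096, 12287]: the start drops
         * by (6000 & 0xfff) == 1904 and the end is rounded up by OR-ing in
         * 0xfff. */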
3337
3338         /*
3339          * kms is not valid when either object is completely fresh (so that no
3340          * locks are cached), or the object was evicted. In the latter case a
3341          * cached lock cannot be used, because it would prime the inode state
3342          * with a potentially stale LVB.
3343          */
3344         if (!kms_valid)
3345                 goto no_match;
3346
3347         /* Next, search for already existing extent locks that will cover us */
3348         /* If we're trying to read, we also search for an existing PW lock.  The
3349          * VFS and page cache already protect us locally, so lots of readers/
3350          * writers can share a single PW lock.
3351          *
3352          * There are problems with conversion deadlocks, so instead of
3353          * converting a read lock to a write lock, we'll just enqueue a new
3354          * one.
3355          *
3356          * At some point we should cancel the read lock instead of making them
3357          * send us a blocking callback, but there are problems with canceling
3358          * locks out from other users right now, too. */
3359         mode = einfo->ei_mode;
3360         if (einfo->ei_mode == LCK_PR)
3361                 mode |= LCK_PW;
3362         mode = ldlm_lock_match(obd->obd_namespace,
3363                                *flags | LDLM_FL_LVB_READY, res_id,
3364                                einfo->ei_type, policy, mode, lockh, 0);
3365         if (mode) {
3366                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
3367
3368                 if (matched->l_ast_data == NULL ||
3369                     matched->l_ast_data == einfo->ei_cbdata) {
3370                         /* addref the lock only if not async requests and PW
3371                          * lock is matched whereas we asked for PR. */
3372                         if (!rqset && einfo->ei_mode != mode)
3373                                 ldlm_lock_addref(lockh, LCK_PR);
3374                         osc_set_lock_data_with_check(matched, einfo, *flags);
3375                         if (intent) {
3376                                 /* I would like to be able to ASSERT here that
3377                                  * rss <= kms, but I can't, for reasons which
3378                                  * are explained in lov_enqueue() */
3379                         }
3380
3381                         /* We already have a lock, and it's referenced */
3382                         (*upcall)(cookie, ELDLM_OK);
3383
3384                         /* For async requests, decref the lock. */
3385                         if (einfo->ei_mode != mode)
3386                                 ldlm_lock_decref(lockh, LCK_PW);
3387                         else if (rqset)
3388                                 ldlm_lock_decref(lockh, einfo->ei_mode);
3389                         LDLM_LOCK_PUT(matched);
3390                         RETURN(ELDLM_OK);
3391                 } else
3392                         ldlm_lock_decref(lockh, mode);
3393                 LDLM_LOCK_PUT(matched);
3394         }
3395
3396  no_match:
3397         if (intent) {
3398                 CFS_LIST_HEAD(cancels);
3399                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3400                                            &RQF_LDLM_ENQUEUE_LVB);
3401                 if (req == NULL)
3402                         RETURN(-ENOMEM);
3403
3404                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3405                 if (rc) {
3406                         ptlrpc_request_free(req);
3407                         RETURN(rc);
3408                 }
3409
3410                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3411                                      sizeof(*lvb));
3412                 ptlrpc_request_set_replen(req);
3413         }
3414
3415         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3416         *flags &= ~LDLM_FL_BLOCK_GRANTED;
3417
3418         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
3419                               sizeof(*lvb), lockh, async);
3420         if (rqset) {
3421                 if (!rc) {
3422                         struct osc_enqueue_args *aa;
3423                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3424                         aa = ptlrpc_req_async_args(req);
3425                         aa->oa_ei = einfo;
3426                         aa->oa_exp = exp;
3427                         aa->oa_flags  = flags;
3428                         aa->oa_upcall = upcall;
3429                         aa->oa_cookie = cookie;
3430                         aa->oa_lvb    = lvb;
3431                         aa->oa_lockh  = lockh;
3432
3433                         req->rq_interpret_reply =
3434                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
3435                         if (rqset == PTLRPCD_SET)
3436                                 ptlrpcd_add_req(req, PSCOPE_OTHER);
3437                         else
3438                                 ptlrpc_set_add_req(rqset, req);
3439                 } else if (intent) {
3440                         ptlrpc_req_finished(req);
3441                 }
3442                 RETURN(rc);
3443         }
3444
3445         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, rc);
3446         if (intent)
3447                 ptlrpc_req_finished(req);
3448
3449         RETURN(rc);
3450 }
3451
3452 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3453                        struct ldlm_enqueue_info *einfo,
3454                        struct ptlrpc_request_set *rqset)
3455 {
3456         struct ldlm_res_id res_id;
3457         int rc;
3458         ENTRY;
3459
3460         osc_build_res_name(oinfo->oi_md->lsm_object_id,
3461                            oinfo->oi_md->lsm_object_seq, &res_id);
3462
3463         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
3464                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3465                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
3466                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
3467                               rqset, rqset != NULL);
3468         RETURN(rc);
3469 }
3470
3471 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3472                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3473                    int *flags, void *data, struct lustre_handle *lockh,
3474                    int unref)
3475 {
3476         struct obd_device *obd = exp->exp_obd;
3477         int lflags = *flags;
3478         ldlm_mode_t rc;
3479         ENTRY;
3480
3481         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3482                 RETURN(-EIO);
3483
3484         /* Filesystem lock extents are extended to page boundaries so that
3485          * dealing with the page cache is a little smoother */
3486         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3487         policy->l_extent.end |= ~CFS_PAGE_MASK;
3488
3489         /* Next, search for already existing extent locks that will cover us */
3490         /* If we're trying to read, we also search for an existing PW lock.  The
3491          * VFS and page cache already protect us locally, so lots of readers/
3492          * writers can share a single PW lock. */
3493         rc = mode;
3494         if (mode == LCK_PR)
3495                 rc |= LCK_PW;
3496         rc = ldlm_lock_match(obd->obd_namespace, lflags,
3497                              res_id, type, policy, rc, lockh, unref);
3498         if (rc) {
3499                 if (data != NULL)
3500                         osc_set_data_with_check(lockh, data, lflags);
3501                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3502                         ldlm_lock_addref(lockh, LCK_PR);
3503                         ldlm_lock_decref(lockh, LCK_PW);
3504                 }
3505                 RETURN(rc);
3506         }
3507         RETURN(rc);
3508 }
3509
3510 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
3511 {
3512         ENTRY;
3513
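        /* A hedged note, not from the original source: group locks are not
         * kept in the LDLM LRU for lazy cancellation, so the last decref
         * cancels them explicitly. */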
3514         if (unlikely(mode == LCK_GROUP))
3515                 ldlm_lock_decref_and_cancel(lockh, mode);
3516         else
3517                 ldlm_lock_decref(lockh, mode);
3518
3519         RETURN(0);
3520 }
3521
3522 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3523                       __u32 mode, struct lustre_handle *lockh)
3524 {
3525         ENTRY;
3526         RETURN(osc_cancel_base(lockh, mode));
3527 }
3528
3529 static int osc_cancel_unused(struct obd_export *exp,
3530                              struct lov_stripe_md *lsm,
3531                              ldlm_cancel_flags_t flags,
3532                              void *opaque)
3533 {
3534         struct obd_device *obd = class_exp2obd(exp);
3535         struct ldlm_res_id res_id, *resp = NULL;
3536
3537         if (lsm != NULL) {
3538                 resp = osc_build_res_name(lsm->lsm_object_id,
3539                                           lsm->lsm_object_seq, &res_id);
3540         }
3541
3542         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3543 }
3544
3545 static int osc_statfs_interpret(const struct lu_env *env,
3546                                 struct ptlrpc_request *req,
3547                                 struct osc_async_args *aa, int rc)
3548 {
3549         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
3550         struct obd_statfs *msfs;
3551         __u64 used;
3552         ENTRY;
3553
3554         if (rc == -EBADR)
3555                 /* The request has in fact never been sent
3556                  * due to issues at a higher level (LOV).
3557                  * Exit immediately since the caller is
3558                  * aware of the problem and takes care
3559                  * of the cleanup */
3560                 RETURN(rc);
3561
3562         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
3563             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
3564                 GOTO(out, rc = 0);
3565
3566         if (rc != 0)
3567                 GOTO(out, rc);
3568
3569         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3570         if (msfs == NULL) {
3571                 GOTO(out, rc = -EPROTO);
3572         }
3573
3574         /* Reinitialize the RDONLY and DEGRADED flags at the client
3575          * on each statfs, so they don't stay set permanently. */
3576         cfs_spin_lock(&cli->cl_oscc.oscc_lock);
3577
3578         if (unlikely(msfs->os_state & OS_STATE_DEGRADED))
3579                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED;
3580         else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_DEGRADED))
3581                 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_DEGRADED;
3582
3583         if (unlikely(msfs->os_state & OS_STATE_READONLY))
3584                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY;
3585         else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_RDONLY))
3586                 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_RDONLY;
3587
3588         /* Add a bit of hysteresis so this flag isn't continually flapping,
3589          * and ensure that new files don't get extremely fragmented due to
3590          * only a small amount of available space in the filesystem.
3591          * We want to set the NOSPC flag when there is less than ~0.1% free
3592          * and clear it when there is at least ~0.2% free space, so:
3593          *                   avail < ~0.1% max          max = avail + used
3594          *            1025 * avail < avail + used       used = blocks - free
3595          *            1024 * avail < used
3596          *            1024 * avail < blocks - free
3597          *                   avail < ((blocks - free) >> 10)
3598          *
3599          * On a very large disk, say 16TB, 0.1% is 16GB. We don't want to
3600          * lose that amount of space, so in those cases we report no space
3601          * left if there is less than 1GB left.                           */
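        /* A hedged worked example, not from the original source, assuming
         * 4KB blocks: with os_blocks - os_bfree == 2^30 blocks (~4TB used),
         * used == min(2^30 >> 10, 1 << 30) == 2^20 blocks (~4GB), so
         * OSCC_FLAG_NOSPC is set once os_bavail falls below 2^20 blocks
         * (or os_ffree < 32) and cleared only once os_bavail exceeds
         * 2^21 blocks and os_ffree > 64. */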
3602         used = min_t(__u64,(msfs->os_blocks - msfs->os_bfree) >> 10, 1 << 30);
3603         if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) == 0) &&
3604                      ((msfs->os_ffree < 32) || (msfs->os_bavail < used))))
3605                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC;
3606         else if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
3607                 (msfs->os_ffree > 64) && (msfs->os_bavail > (used << 1))))
3608                         cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_NOSPC;
3609
3610         cfs_spin_unlock(&cli->cl_oscc.oscc_lock);
3611
3612         *aa->aa_oi->oi_osfs = *msfs;
3613 out:
3614         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3615         RETURN(rc);
3616 }
3617
3618 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3619                             __u64 max_age, struct ptlrpc_request_set *rqset)
3620 {
3621         struct ptlrpc_request *req;
3622         struct osc_async_args *aa;
3623         int                    rc;
3624         ENTRY;
3625
3626         /* We could possibly pass max_age in the request (as an absolute
3627          * timestamp or a "seconds.usec ago") so the target can avoid doing
3628          * extra calls into the filesystem if that isn't necessary (e.g.
3629          * during mount that would help a bit).  Having relative timestamps
3630          * is not so great if request processing is slow, while absolute
3631          * timestamps are not ideal because they need time synchronization. */
3632         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3633         if (req == NULL)
3634                 RETURN(-ENOMEM);
3635
3636         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3637         if (rc) {
3638                 ptlrpc_request_free(req);
3639                 RETURN(rc);
3640         }
3641         ptlrpc_request_set_replen(req);
3642         req->rq_request_portal = OST_CREATE_PORTAL;
3643         ptlrpc_at_set_req_timeout(req);
3644
3645         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3646                 /* procfs requests must not wait or be resent, to avoid deadlock */
3647                 req->rq_no_resend = 1;
3648                 req->rq_no_delay = 1;
3649         }
3650
3651         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
3652         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3653         aa = ptlrpc_req_async_args(req);
3654         aa->aa_oi = oinfo;
3655
3656         ptlrpc_set_add_req(rqset, req);
3657         RETURN(0);
3658 }
3659
3660 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3661                       __u64 max_age, __u32 flags)
3662 {
3663         struct obd_statfs     *msfs;
3664         struct ptlrpc_request *req;
3665         struct obd_import     *imp = NULL;
3666         int rc;
3667         ENTRY;
3668
3669         /* Since the request might also come from lprocfs, we need to
3670          * sync this with client_disconnect_export (bug 15684). */
3671         cfs_down_read(&obd->u.cli.cl_sem);
3672         if (obd->u.cli.cl_import)
3673                 imp = class_import_get(obd->u.cli.cl_import);
3674         cfs_up_read(&obd->u.cli.cl_sem);
3675         if (!imp)
3676                 RETURN(-ENODEV);
3677
3678         /* We could possibly pass max_age in the request (as an absolute
3679          * timestamp or a "seconds.usec ago") so the target can avoid doing
3680          * extra calls into the filesystem if that isn't necessary (e.g.
3681          * during mount that would help a bit).  Having relative timestamps
3682          * is not so great if request processing is slow, while absolute
3683          * timestamps are not ideal because they need time synchronization. */
3684         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
3685
3686         class_import_put(imp);
3687
3688         if (req == NULL)
3689                 RETURN(-ENOMEM);
3690
3691         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3692         if (rc) {
3693                 ptlrpc_request_free(req);
3694                 RETURN(rc);
3695         }
3696         ptlrpc_request_set_replen(req);
3697         req->rq_request_portal = OST_CREATE_PORTAL;
3698         ptlrpc_at_set_req_timeout(req);
3699
3700         if (flags & OBD_STATFS_NODELAY) {
3701                 /* procfs requests must not wait or be resent, to avoid deadlock */
3702                 req->rq_no_resend = 1;
3703                 req->rq_no_delay = 1;
3704         }
3705
3706         rc = ptlrpc_queue_wait(req);
3707         if (rc)
3708                 GOTO(out, rc);
3709
3710         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3711         if (msfs == NULL) {
3712                 GOTO(out, rc = -EPROTO);
3713         }
3714
3715         *osfs = *msfs;
3716
3717         EXIT;
3718  out:
3719         ptlrpc_req_finished(req);
3720         return rc;
3721 }
3722
3723 /* Retrieve object striping information.
3724  *
3725  * @lump is a pointer to an in-core struct with lmm_stripe_count indicating
3726  * the maximum number of OST indices which will fit in the user buffer.
3727  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
3728  */
3729 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3730 {
3731         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
3732         struct lov_user_md_v3 lum, *lumk;
3733         struct lov_user_ost_data_v1 *lmm_objects;
3734         int rc = 0, lum_size;
3735         ENTRY;
3736
3737         if (!lsm)
3738                 RETURN(-ENODATA);
3739
3740         /* we only need the header part from user space to get lmm_magic and
3741          * lmm_stripe_count, (the header part is common to v1 and v3) */
3742         lum_size = sizeof(struct lov_user_md_v1);
3743         if (cfs_copy_from_user(&lum, lump, lum_size))
3744                 RETURN(-EFAULT);
3745
3746         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
3747             (lum.lmm_magic != LOV_USER_MAGIC_V3))
3748                 RETURN(-EINVAL);
3749
3750         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
3751         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
3752         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
3753         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
3754
3755         /* we can use lov_mds_md_size() to compute lum_size
3756          * because lov_user_md_vX and lov_mds_md_vX have the same size */
3757         if (lum.lmm_stripe_count > 0) {
3758                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
3759                 OBD_ALLOC(lumk, lum_size);
3760                 if (!lumk)
3761                         RETURN(-ENOMEM);
3762
3763                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
3764                         lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
3765                 else
3766                         lmm_objects = &(lumk->lmm_objects[0]);
3767                 lmm_objects->l_object_id = lsm->lsm_object_id;
3768         } else {
3769                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
3770                 lumk = &lum;
3771         }
3772
3773         lumk->lmm_object_id = lsm->lsm_object_id;
3774         lumk->lmm_object_seq = lsm->lsm_object_seq;
3775         lumk->lmm_stripe_count = 1;
3776
3777         if (cfs_copy_to_user(lump, lumk, lum_size))
3778                 rc = -EFAULT;
3779
3780         if (lumk != &lum)
3781                 OBD_FREE(lumk, lum_size);
3782
3783         RETURN(rc);
3784 }
3785
3786
3787 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3788                          void *karg, void *uarg)
3789 {
3790         struct obd_device *obd = exp->exp_obd;
3791         struct obd_ioctl_data *data = karg;
3792         int err = 0;
3793         ENTRY;
3794
3795         if (!cfs_try_module_get(THIS_MODULE)) {
3796                 CERROR("Can't get module. Is it alive?\n");
3797                 return -EINVAL;
3798         }
3799         switch (cmd) {
3800         case OBD_IOC_LOV_GET_CONFIG: {
3801                 char *buf;
3802                 struct lov_desc *desc;
3803                 struct obd_uuid uuid;
3804
3805                 buf = NULL;
3806                 len = 0;
3807                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3808                         GOTO(out, err = -EINVAL);
3809
3810                 data = (struct obd_ioctl_data *)buf;
3811
3812                 if (sizeof(*desc) > data->ioc_inllen1) {
3813                         obd_ioctl_freedata(buf, len);
3814                         GOTO(out, err = -EINVAL);
3815                 }
3816
3817                 if (data->ioc_inllen2 < sizeof(uuid)) {
3818                         obd_ioctl_freedata(buf, len);
3819                         GOTO(out, err = -EINVAL);
3820                 }
3821
3822                 desc = (struct lov_desc *)data->ioc_inlbuf1;
3823                 desc->ld_tgt_count = 1;
3824                 desc->ld_active_tgt_count = 1;
3825                 desc->ld_default_stripe_count = 1;
3826                 desc->ld_default_stripe_size = 0;
3827                 desc->ld_default_stripe_offset = 0;
3828                 desc->ld_pattern = 0;
3829                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3830
3831                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3832
3833                 err = cfs_copy_to_user((void *)uarg, buf, len);
3834                 if (err)
3835                         err = -EFAULT;
3836                 obd_ioctl_freedata(buf, len);
3837                 GOTO(out, err);
3838         }
3839         case LL_IOC_LOV_SETSTRIPE:
3840                 err = obd_alloc_memmd(exp, karg);
3841                 if (err > 0)
3842                         err = 0;
3843                 GOTO(out, err);
3844         case LL_IOC_LOV_GETSTRIPE:
3845                 err = osc_getstripe(karg, uarg);
3846                 GOTO(out, err);
3847         case OBD_IOC_CLIENT_RECOVER:
3848                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3849                                             data->ioc_inlbuf1);
3850                 if (err > 0)
3851                         err = 0;
3852                 GOTO(out, err);
3853         case IOC_OSC_SET_ACTIVE:
3854                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3855                                                data->ioc_offset);
3856                 GOTO(out, err);
3857         case OBD_IOC_POLL_QUOTACHECK:
3858                 err = lquota_poll_check(quota_interface, exp,
3859                                         (struct if_quotacheck *)karg);
3860                 GOTO(out, err);
3861         case OBD_IOC_PING_TARGET:
3862                 err = ptlrpc_obd_ping(obd);
3863                 GOTO(out, err);
3864         default:
3865                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3866                        cmd, cfs_curproc_comm());
3867                 GOTO(out, err = -ENOTTY);
3868         }
3869 out:
3870         cfs_module_put(THIS_MODULE);
3871         return err;
3872 }
3873
3874 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3875                         void *key, __u32 *vallen, void *val,
3876                         struct lov_stripe_md *lsm)
3877 {
3878         ENTRY;
3879         if (!vallen || !val)
3880                 RETURN(-EFAULT);
3881
3882         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3883                 __u32 *stripe = val;
3884                 *vallen = sizeof(*stripe);
3885                 *stripe = 0;
3886                 RETURN(0);
3887         } else if (KEY_IS(KEY_LAST_ID)) {
3888                 struct ptlrpc_request *req;
3889                 obd_id                *reply;
3890                 char                  *tmp;
3891                 int                    rc;
3892
3893                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3894                                            &RQF_OST_GET_INFO_LAST_ID);
3895                 if (req == NULL)
3896                         RETURN(-ENOMEM);
3897
3898                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3899                                      RCL_CLIENT, keylen);
3900                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3901                 if (rc) {
3902                         ptlrpc_request_free(req);
3903                         RETURN(rc);
3904                 }
3905
3906                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3907                 memcpy(tmp, key, keylen);
3908
3909                 req->rq_no_delay = req->rq_no_resend = 1;
3910                 ptlrpc_request_set_replen(req);
3911                 rc = ptlrpc_queue_wait(req);
3912                 if (rc)
3913                         GOTO(out, rc);
3914
3915                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3916                 if (reply == NULL)
3917                         GOTO(out, rc = -EPROTO);
3918
3919                 *((obd_id *)val) = *reply;
3920         out:
3921                 ptlrpc_req_finished(req);
3922                 RETURN(rc);
3923         } else if (KEY_IS(KEY_FIEMAP)) {
3924                 struct ptlrpc_request *req;
3925                 struct ll_user_fiemap *reply;
3926                 char *tmp;
3927                 int rc;
3928
3929                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3930                                            &RQF_OST_GET_INFO_FIEMAP);
3931                 if (req == NULL)
3932                         RETURN(-ENOMEM);
3933
3934                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3935                                      RCL_CLIENT, keylen);
3936                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3937                                      RCL_CLIENT, *vallen);
3938                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3939                                      RCL_SERVER, *vallen);
3940
3941                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3942                 if (rc) {
3943                         ptlrpc_request_free(req);
3944                         RETURN(rc);
3945                 }
3946
3947                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3948                 memcpy(tmp, key, keylen);
3949                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3950                 memcpy(tmp, val, *vallen);
3951
3952                 ptlrpc_request_set_replen(req);
3953                 rc = ptlrpc_queue_wait(req);
3954                 if (rc)
3955                         GOTO(out1, rc);
3956
3957                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3958                 if (reply == NULL)
3959                         GOTO(out1, rc = -EPROTO);
3960
3961                 memcpy(val, reply, *vallen);
3962         out1:
3963                 ptlrpc_req_finished(req);
3964
3965                 RETURN(rc);
3966         }
3967
3968         RETURN(-EINVAL);
3969 }
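
/*
 * Illustrative sketch only, not part of the build: how a caller might
 * fetch the last allocated object id through the generic obd_get_info()
 * entry point, which lands in the KEY_LAST_ID branch of osc_get_info()
 * above.  The variable names here are assumptions for the example.
 *
 *      obd_id last_id = 0;
 *      __u32  vallen  = sizeof(last_id);
 *      int    rc;
 *
 *      rc = obd_get_info(exp, strlen(KEY_LAST_ID) + 1, KEY_LAST_ID,
 *                        &vallen, &last_id, NULL);
 *      if (rc == 0)
 *              CDEBUG(D_INFO, "last object id: "LPU64"\n", last_id);
 */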
3970
3971 static int osc_setinfo_mds_connect_import(struct obd_import *imp)
3972 {
3973         struct llog_ctxt *ctxt;
3974         int rc = 0;
3975         ENTRY;
3976
3977         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3978         if (ctxt) {
3979                 rc = llog_initiator_connect(ctxt);
3980                 llog_ctxt_put(ctxt);
3981         } else {
3982                 /* XXX return an error? skip setting below flags? */
3983         }
3984
3985         cfs_spin_lock(&imp->imp_lock);
3986         imp->imp_server_timeout = 1;
3987         imp->imp_pingable = 1;
3988         cfs_spin_unlock(&imp->imp_lock);
3989         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3990
3991         RETURN(rc);
3992 }
3993
3994 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
3995                                           struct ptlrpc_request *req,
3996                                           void *aa, int rc)
3997 {
3998         ENTRY;
3999         if (rc != 0)
4000                 RETURN(rc);
4001
4002         RETURN(osc_setinfo_mds_connect_import(req->rq_import));
4003 }
4004
4005 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
4006                               void *key, obd_count vallen, void *val,
4007                               struct ptlrpc_request_set *set)
4008 {
4009         struct ptlrpc_request *req;
4010         struct obd_device     *obd = exp->exp_obd;
4011         struct obd_import     *imp = class_exp2cliimp(exp);
4012         char                  *tmp;
4013         int                    rc;
4014         ENTRY;
4015
4016         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
4017
4018         if (KEY_IS(KEY_NEXT_ID)) {
4019                 obd_id new_val;
4020                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4021
4022                 if (vallen != sizeof(obd_id))
4023                         RETURN(-ERANGE);
4024                 if (val == NULL)
4025                         RETURN(-EINVAL);
4026
4029
4030                 /* Avoid a race between allocating a new object and
4031                  * setting the next id from the ll_sync thread. */
4032                 cfs_spin_lock(&oscc->oscc_lock);
4033                 new_val = *((obd_id*)val) + 1;
4034                 if (new_val > oscc->oscc_next_id)
4035                         oscc->oscc_next_id = new_val;
4036                 cfs_spin_unlock(&oscc->oscc_lock);
4037                 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
4038                        exp->exp_obd->obd_name,
4039                        obd->u.cli.cl_oscc.oscc_next_id);
4040
4041                 RETURN(0);
4042         }
4043
4044         if (KEY_IS(KEY_CHECKSUM)) {
4045                 if (vallen != sizeof(int))
4046                         RETURN(-EINVAL);
4047                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
4048                 RETURN(0);
4049         }
4050
4051         if (KEY_IS(KEY_SPTLRPC_CONF)) {
4052                 sptlrpc_conf_client_adapt(obd);
4053                 RETURN(0);
4054         }
4055
4056         if (KEY_IS(KEY_FLUSH_CTX)) {
4057                 sptlrpc_import_flush_my_ctx(imp);
4058                 RETURN(0);
4059         }
4060
4061         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
4062                 RETURN(-EINVAL);
4063
4064         /* We pass all other commands directly to the OST. Since nobody
4065            calls osc methods directly and everybody is supposed to go
4066            through LOV, we assume LOV has already validated the values
4067            for us. The only recognised values so far are evict_by_nid and
4068            mds_conn. Even if something bad gets through, we'd get a
4069            -EINVAL from the OST anyway. */
4070
4071         if (KEY_IS(KEY_GRANT_SHRINK))
4072                 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO);
4073         else
4074                 req = ptlrpc_request_alloc(imp, &RQF_OBD_SET_INFO);
4075
4076         if (req == NULL)
4077                 RETURN(-ENOMEM);
4078
4079         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
4080                              RCL_CLIENT, keylen);
4081         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
4082                              RCL_CLIENT, vallen);
4083         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
4084         if (rc) {
4085                 ptlrpc_request_free(req);
4086                 RETURN(rc);
4087         }
4088
4089         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
4090         memcpy(tmp, key, keylen);
4091         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
4092         memcpy(tmp, val, vallen);
4093
4094         if (KEY_IS(KEY_MDS_CONN)) {
4095                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4096
4097                 oscc->oscc_oa.o_seq = (*(__u32 *)val);
4098                 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
4099                 LASSERT_SEQ_IS_MDT(oscc->oscc_oa.o_seq);
4100                 req->rq_no_delay = req->rq_no_resend = 1;
4101                 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
4102         } else if (KEY_IS(KEY_GRANT_SHRINK)) {
4103                 struct osc_grant_args *aa;
4104                 struct obdo *oa;
4105
4106                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
4107                 aa = ptlrpc_req_async_args(req);
4108                 OBDO_ALLOC(oa);
4109                 if (!oa) {
4110                         ptlrpc_req_finished(req);
4111                         RETURN(-ENOMEM);
4112                 }
4113                 *oa = ((struct ost_body *)val)->oa;
4114                 aa->aa_oa = oa;
4115                 req->rq_interpret_reply = osc_shrink_grant_interpret;
4116         }
4117
4118         ptlrpc_request_set_replen(req);
4119         if (!KEY_IS(KEY_GRANT_SHRINK)) {
4120                 LASSERT(set != NULL);
4121                 ptlrpc_set_add_req(set, req);
4122                 ptlrpc_check_set(NULL, set);
4123         } else
4124                 ptlrpcd_add_req(req, PSCOPE_OTHER);
4125
4126         RETURN(0);
4127 }
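
/*
 * Illustrative sketch only, not part of the build: toggling wire
 * checksums on an export via obd_set_info_async(), which is handled
 * locally by the KEY_CHECKSUM branch above (no request set is required
 * because no RPC is sent for this key).  Variable names are assumptions
 * for the example.
 *
 *      int on = 1;
 *      int rc;
 *
 *      rc = obd_set_info_async(exp, strlen(KEY_CHECKSUM) + 1, KEY_CHECKSUM,
 *                              sizeof(on), &on, NULL);
 *      if (rc)
 *              CERROR("failed to enable checksums: rc = %d\n", rc);
 */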
4128
4129
4130 static struct llog_operations osc_size_repl_logops = {
4131         lop_cancel: llog_obd_repl_cancel
4132 };
4133
4134 static struct llog_operations osc_mds_ost_orig_logops;
4135
4136 static int __osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4137                            struct obd_device *tgt, struct llog_catid *catid)
4138 {
4139         int rc;
4140         ENTRY;
4141
4142         rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, 1,
4143                         &catid->lci_logid, &osc_mds_ost_orig_logops);
4144         if (rc) {
4145                 CERROR("failed to set up LLOG_MDS_OST_ORIG_CTXT\n");
4146                 GOTO(out, rc);
4147         }
4148
4149         rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, 1,
4150                         NULL, &osc_size_repl_logops);
4151         if (rc) {
4152                 struct llog_ctxt *ctxt =
4153                         llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4154                 if (ctxt)
4155                         llog_cleanup(ctxt);
4156                 CERROR("failed to set up LLOG_SIZE_REPL_CTXT\n");
4157         }
4158         GOTO(out, rc);
4159 out:
4160         if (rc) {
4161                 CERROR("osc '%s' tgt '%s' catid %p rc=%d\n",
4162                        obd->obd_name, tgt->obd_name, catid, rc);
4163                 CERROR("logid "LPX64":0x%x\n",
4164                        catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
4165         }
4166         return rc;
4167 }
4168
4169 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4170                          struct obd_device *disk_obd, int *index)
4171 {
4172         struct llog_catid catid;
4173         static char name[32] = CATLIST;
4174         int rc;
4175         ENTRY;
4176
4177         LASSERT(olg == &obd->obd_olg);
4178
4179         cfs_mutex_down(&olg->olg_cat_processing);
4180         rc = llog_get_cat_list(disk_obd, name, *index, 1, &catid);
4181         if (rc) {
4182                 CERROR("llog_get_cat_list failed: rc = %d\n", rc);
4183                 GOTO(out, rc);
4184         }
4185
4186         CDEBUG(D_INFO, "%s: Init llog for %d - catid "LPX64"/"LPX64":%x\n",
4187                obd->obd_name, *index, catid.lci_logid.lgl_oid,
4188                catid.lci_logid.lgl_oseq, catid.lci_logid.lgl_ogen);
4189
4190         rc = __osc_llog_init(obd, olg, disk_obd, &catid);
4191         if (rc) {
4192                 CERROR("__osc_llog_init failed: rc = %d\n", rc);
4193                 GOTO(out, rc);
4194         }
4195
4196         rc = llog_put_cat_list(disk_obd, name, *index, 1, &catid);
4197         if (rc) {
4198                 CERROR("llog_put_cat_list failed: rc = %d\n", rc);
4199                 GOTO(out, rc);
4200         }
4201
4202  out:
4203         cfs_mutex_up(&olg->olg_cat_processing);
4204
4205         return rc;
4206 }
4207
4208 static int osc_llog_finish(struct obd_device *obd, int count)
4209 {
4210         struct llog_ctxt *ctxt;
4211         int rc = 0, rc2 = 0;
4212         ENTRY;
4213
4214         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4215         if (ctxt)
4216                 rc = llog_cleanup(ctxt);
4217
4218         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4219         if (ctxt)
4220                 rc2 = llog_cleanup(ctxt);
4221         if (!rc)
4222                 rc = rc2;
4223
4224         RETURN(rc);
4225 }
4226
4227 static int osc_reconnect(const struct lu_env *env,
4228                          struct obd_export *exp, struct obd_device *obd,
4229                          struct obd_uuid *cluuid,
4230                          struct obd_connect_data *data,
4231                          void *localdata)
4232 {
4233         struct client_obd *cli = &obd->u.cli;
4234
4235         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
4236                 long lost_grant;
4237
4238                 client_obd_list_lock(&cli->cl_loi_list_lock);
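                /* Ask the server to restore our outstanding grant on
                 * reconnect; if we hold none (no cached grant and no dirty
                 * pages), fall back to enough grant for two full-sized RPCs,
                 * e.g. 2 * 256 pages * 4096 bytes = 2MB with typical
                 * settings (figures are illustrative and depend on tuning). */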
4239                 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
4240                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
4241                 lost_grant = cli->cl_lost_grant;
4242                 cli->cl_lost_grant = 0;
4243                 client_obd_list_unlock(&cli->cl_loi_list_lock);
4244
4245                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
4246                        "cl_dirty: %ld cl_lost_grant: %ld\n", data->ocd_grant,
4247                        cli->cl_avail_grant, cli->cl_dirty, lost_grant);
4248                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
4249                        " ocd_grant: %d\n", data->ocd_connect_flags,
4250                        data->ocd_version, data->ocd_grant);
4251         }
4252
4253         RETURN(0);
4254 }
4255
4256 static int osc_disconnect(struct obd_export *exp)
4257 {
4258         struct obd_device *obd = class_exp2obd(exp);
4259         struct llog_ctxt  *ctxt;
4260         int rc;
4261
4262         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4263         if (ctxt) {
4264                 if (obd->u.cli.cl_conn_count == 1) {
4265                         /* Flush any remaining cancel messages out to the
4266                          * target */
4267                         llog_sync(ctxt, exp);
4268                 }
4269                 llog_ctxt_put(ctxt);
4270         } else {
4271                 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
4272                        obd);
4273         }
4274
4275         rc = client_disconnect_export(exp);
4276         /**
4277          * Initially we put del_shrink_grant before disconnect_export, but it
4278          * causes the following problem if setup (connect) and cleanup
4279          * (disconnect) are tangled together.
4280          *      connect p1                     disconnect p2
4281          *   ptlrpc_connect_import
4282          *     ...............               class_manual_cleanup
4283          *                                     osc_disconnect
4284          *                                     del_shrink_grant
4285          *   ptlrpc_connect_interrupt
4286          *     init_grant_shrink
4287          *   add this client to shrink list
4288          *                                      cleanup_osc
4289          * Bang! the pinger triggers the shrink.
4290          * So the osc must be removed from the shrink list only after we
4291          * are sure the import has been destroyed. BUG18662
4292          */
4293         if (obd->u.cli.cl_import == NULL)
4294                 osc_del_shrink_grant(&obd->u.cli);
4295         return rc;
4296 }
4297
4298 static int osc_import_event(struct obd_device *obd,
4299                             struct obd_import *imp,
4300                             enum obd_import_event event)
4301 {
4302         struct client_obd *cli;
4303         int rc = 0;
4304
4305         ENTRY;
4306         LASSERT(imp->imp_obd == obd);
4307
4308         switch (event) {
4309         case IMP_EVENT_DISCON: {
4310                 /* Only do this on the MDS OSCs */
4311                 if (imp->imp_server_timeout) {
4312                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4313
4314                         cfs_spin_lock(&oscc->oscc_lock);
4315                         oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
4316                         cfs_spin_unlock(&oscc->oscc_lock);
4317                 }
4318                 cli = &obd->u.cli;
4319                 client_obd_list_lock(&cli->cl_loi_list_lock);
4320                 cli->cl_avail_grant = 0;
4321                 cli->cl_lost_grant = 0;
4322                 client_obd_list_unlock(&cli->cl_loi_list_lock);
4323                 break;
4324         }
4325         case IMP_EVENT_INACTIVE: {
4326                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
4327                 break;
4328         }
4329         case IMP_EVENT_INVALIDATE: {
4330                 struct ldlm_namespace *ns = obd->obd_namespace;
4331                 struct lu_env         *env;
4332                 int                    refcheck;
4333
4334                 env = cl_env_get(&refcheck);
4335                 if (!IS_ERR(env)) {
4336                         /* Reset grants */
4337                         cli = &obd->u.cli;
4338                         client_obd_list_lock(&cli->cl_loi_list_lock);
4339                         /* all pages go to failing rpcs due to the invalid
4340                          * import */
4341                         osc_check_rpcs(env, cli);
4342                         client_obd_list_unlock(&cli->cl_loi_list_lock);
4343
4344                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
4345                         cl_env_put(env, &refcheck);
4346                 } else
4347                         rc = PTR_ERR(env);
4348                 break;
4349         }
4350         case IMP_EVENT_ACTIVE: {
4351                 /* Only do this on the MDS OSCs */
4352                 if (imp->imp_server_timeout) {
4353                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4354
4355                         cfs_spin_lock(&oscc->oscc_lock);
4356                         oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
4357                         cfs_spin_unlock(&oscc->oscc_lock);
4358                 }
4359                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
4360                 break;
4361         }
4362         case IMP_EVENT_OCD: {
4363                 struct obd_connect_data *ocd = &imp->imp_connect_data;
4364
4365                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
4366                         osc_init_grant(&obd->u.cli, ocd);
4367
4368                 /* See bug 7198 */
4369                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
4370                         imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
4371
4372                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
4373                 break;
4374         }
4375         default:
4376                 CERROR("Unknown import event %d\n", event);
4377                 LBUG();
4378         }
4379         RETURN(rc);
4380 }
4381
4382 /**
4383  * Determine whether the lock can be canceled before replaying the lock
4384  * during recovery, see bug16774 for detailed information.
4385  *
4386  * \retval zero the lock can't be canceled
4387  * \retval other ok to cancel
4388  */
4389 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
4390 {
4391         check_res_locked(lock->l_resource);
4392
4393         /*
4394          * Cancel all unused extent locks granted in LCK_PR or LCK_CR mode.
4395          *
4396          * XXX as a future improvement, we can also cancel unused write lock
4397          * if it doesn't have dirty data and active mmaps.
4398          */
4399         if (lock->l_resource->lr_type == LDLM_EXTENT &&
4400             (lock->l_granted_mode == LCK_PR ||
4401              lock->l_granted_mode == LCK_CR) &&
4402             (osc_dlm_lock_pageref(lock) == 0))
4403                 RETURN(1);
4404
4405         RETURN(0);
4406 }
4407
4408 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
4409 {
4410         int rc;
4411         ENTRY;
4412
4414         rc = ptlrpcd_addref();
4415         if (rc)
4416                 RETURN(rc);
4417
4418         rc = client_obd_setup(obd, lcfg);
4419         if (rc) {
4420                 ptlrpcd_decref();
4421         } else {
4422                 struct lprocfs_static_vars lvars = { 0 };
4423                 struct client_obd *cli = &obd->u.cli;
4424
4425                 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
4426                 lprocfs_osc_init_vars(&lvars);
4427                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
4428                         lproc_osc_attach_seqstat(obd);
4429                         sptlrpc_lprocfs_cliobd_attach(obd);
4430                         ptlrpc_lprocfs_register_obd(obd);
4431                 }
4432
4433                 oscc_init(obd);
4434                 /* We need to allocate a few extra requests, because
4435                    brw_interpret tries to create new requests before
4436                    freeing previous ones. Ideally we would reserve
4437                    2x max_rpcs_in_flight, but that would likely waste too
4438                    much RAM, so two extra requests should still work. */
4439                 cli->cl_import->imp_rq_pool =
4440                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4441                                             OST_MAXREQSIZE,
4442                                             ptlrpc_add_rqs_to_pool);
4443
4444                 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
4445                 cfs_sema_init(&cli->cl_grant_sem, 1);
4446
4447                 ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
4448         }
4449
4450         RETURN(rc);
4451 }
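
/*
 * Sizing sketch for the request pool created above (illustrative,
 * assuming the common default of cl_max_rpcs_in_flight == 8): the pool
 * pre-allocates 8 + 2 = 10 requests of OST_MAXREQSIZE bytes each, the
 * two extras covering brw_interpret() allocating new requests before it
 * frees previous ones.
 */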
4452
4453 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
4454 {
4455         int rc = 0;
4456         ENTRY;
4457
4458         switch (stage) {
4459         case OBD_CLEANUP_EARLY: {
4460                 struct obd_import *imp;
4461                 imp = obd->u.cli.cl_import;
4462                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
4463                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
4464                 ptlrpc_deactivate_import(imp);
4465                 cfs_spin_lock(&imp->imp_lock);
4466                 imp->imp_pingable = 0;
4467                 cfs_spin_unlock(&imp->imp_lock);
4468                 break;
4469         }
4470         case OBD_CLEANUP_EXPORTS: {
4471                 /* If we set up but never connected, the
4472                    client import will not have been cleaned. */
4473                 if (obd->u.cli.cl_import) {
4474                         struct obd_import *imp;
4475                         cfs_down_write(&obd->u.cli.cl_sem);
4476                         imp = obd->u.cli.cl_import;
4477                         CDEBUG(D_CONFIG, "%s: client import never connected\n",
4478                                obd->obd_name);
4479                         ptlrpc_invalidate_import(imp);
4480                         if (imp->imp_rq_pool) {
4481                                 ptlrpc_free_rq_pool(imp->imp_rq_pool);
4482                                 imp->imp_rq_pool = NULL;
4483                         }
4484                         class_destroy_import(imp);
4485                         cfs_up_write(&obd->u.cli.cl_sem);
4486                         obd->u.cli.cl_import = NULL;
4487                 }
4488                 rc = obd_llog_finish(obd, 0);
4489                 if (rc != 0)
4490                         CERROR("failed to cleanup llogging subsystems\n");
4491                 break;
4492         }
4493         }
4494         RETURN(rc);
4495 }
4496
4497 int osc_cleanup(struct obd_device *obd)
4498 {
4499         int rc;
4500
4501         ENTRY;
4502         ptlrpc_lprocfs_unregister_obd(obd);
4503         lprocfs_obd_cleanup(obd);
4504
4505         /* free memory of osc quota cache */
4506         lquota_cleanup(quota_interface, obd);
4507
4508         rc = client_obd_cleanup(obd);
4509
4510         ptlrpcd_decref();
4511         RETURN(rc);
4512 }
4513
4514 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
4515 {
4516         struct lprocfs_static_vars lvars = { 0 };
4517         int rc = 0;
4518
4519         lprocfs_osc_init_vars(&lvars);
4520
4521         switch (lcfg->lcfg_command) {
4522         default:
4523                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
4524                                               lcfg, obd);
4525                 if (rc > 0)
4526                         rc = 0;
4527                 break;
4528         }
4529
4530         return(rc);
4531 }
4532
4533 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4534 {
4535         return osc_process_config_base(obd, buf);
4536 }
4537
4538 struct obd_ops osc_obd_ops = {
4539         .o_owner                = THIS_MODULE,
4540         .o_setup                = osc_setup,
4541         .o_precleanup           = osc_precleanup,
4542         .o_cleanup              = osc_cleanup,
4543         .o_add_conn             = client_import_add_conn,
4544         .o_del_conn             = client_import_del_conn,
4545         .o_connect              = client_connect_import,
4546         .o_reconnect            = osc_reconnect,
4547         .o_disconnect           = osc_disconnect,
4548         .o_statfs               = osc_statfs,
4549         .o_statfs_async         = osc_statfs_async,
4550         .o_packmd               = osc_packmd,
4551         .o_unpackmd             = osc_unpackmd,
4552         .o_precreate            = osc_precreate,
4553         .o_create               = osc_create,
4554         .o_create_async         = osc_create_async,
4555         .o_destroy              = osc_destroy,
4556         .o_getattr              = osc_getattr,
4557         .o_getattr_async        = osc_getattr_async,
4558         .o_setattr              = osc_setattr,
4559         .o_setattr_async        = osc_setattr_async,
4560         .o_brw                  = osc_brw,
4561         .o_punch                = osc_punch,
4562         .o_sync                 = osc_sync,
4563         .o_enqueue              = osc_enqueue,
4564         .o_change_cbdata        = osc_change_cbdata,
4565         .o_find_cbdata          = osc_find_cbdata,
4566         .o_cancel               = osc_cancel,
4567         .o_cancel_unused        = osc_cancel_unused,
4568         .o_iocontrol            = osc_iocontrol,
4569         .o_get_info             = osc_get_info,
4570         .o_set_info_async       = osc_set_info_async,
4571         .o_import_event         = osc_import_event,
4572         .o_llog_init            = osc_llog_init,
4573         .o_llog_finish          = osc_llog_finish,
4574         .o_process_config       = osc_process_config,
4575 };
4576
4577 extern struct lu_kmem_descr osc_caches[];
4578 extern cfs_spinlock_t       osc_ast_guard;
4579 extern cfs_lock_class_key_t osc_ast_guard_class;
4580
4581 int __init osc_init(void)
4582 {
4583         struct lprocfs_static_vars lvars = { 0 };
4584         int rc;
4585         ENTRY;
4586
4587         /* Print the address of _any_ initialized kernel symbol from this
4588          * module, to allow debugging with a gdb that doesn't support data
4589          * symbols from modules. */
4590         CDEBUG(D_CONSOLE, "Lustre OSC module (%p).\n", &osc_caches);
4591
4592         rc = lu_kmem_init(osc_caches);
             if (rc)
                     RETURN(rc);
4593
4594         lprocfs_osc_init_vars(&lvars);
4595
4596         cfs_request_module("lquota");
4597         quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
4598         lquota_init(quota_interface);
4599         init_obd_quota_ops(quota_interface, &osc_obd_ops);
4600
4601         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
4602                                  LUSTRE_OSC_NAME, &osc_device_type);
4603         if (rc) {
4604                 if (quota_interface)
4605                         PORTAL_SYMBOL_PUT(osc_quota_interface);
4606                 lu_kmem_fini(osc_caches);
4607                 RETURN(rc);
4608         }
4609
4610         cfs_spin_lock_init(&osc_ast_guard);
4611         cfs_lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
4612
4613         osc_mds_ost_orig_logops = llog_lvfs_ops;
4614         osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
4615         osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
4616         osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
4617         osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
4618
4619         RETURN(rc);
4620 }
4621
4622 #ifdef __KERNEL__
4623 static void /*__exit*/ osc_exit(void)
4624 {
4625         lu_device_type_fini(&osc_device_type);
4626
4627         lquota_exit(quota_interface);
4628         if (quota_interface)
4629                 PORTAL_SYMBOL_PUT(osc_quota_interface);
4630
4631         class_unregister_type(LUSTRE_OSC_NAME);
4632         lu_kmem_fini(osc_caches);
4633 }
4634
4635 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4636 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4637 MODULE_LICENSE("GPL");
4638
4639 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
4640 #endif