Whamcloud - gitweb
LU-498 pass oap instead of cookie to osc_teardown_async_page/osc_queue_async_io
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #ifndef EXPORT_SYMTAB
38 # define EXPORT_SYMTAB
39 #endif
40 #define DEBUG_SUBSYSTEM S_OSC
41
42 #include <libcfs/libcfs.h>
43
44 #ifndef __KERNEL__
45 # include <liblustre.h>
46 #endif
47
48 #include <lustre_dlm.h>
49 #include <lustre_net.h>
50 #include <lustre/lustre_user.h>
51 #include <obd_cksum.h>
52 #include <obd_ost.h>
53 #include <obd_lov.h>
54
55 #ifdef  __CYGWIN__
56 # include <ctype.h>
57 #endif
58
59 #include <lustre_ha.h>
60 #include <lprocfs_status.h>
61 #include <lustre_log.h>
62 #include <lustre_debug.h>
63 #include <lustre_param.h>
64 #include "osc_internal.h"
65
66 static quota_interface_t *quota_interface = NULL;
67 extern quota_interface_t osc_quota_interface;
68
69 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
70 static int brw_interpret(const struct lu_env *env,
71                          struct ptlrpc_request *req, void *data, int rc);
72 int osc_cleanup(struct obd_device *obd);
73
74 /* Pack OSC object metadata for disk storage (LE byte order). */
75 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
76                       struct lov_stripe_md *lsm)
77 {
78         int lmm_size;
79         ENTRY;
80
81         lmm_size = sizeof(**lmmp);
82         if (!lmmp)
83                 RETURN(lmm_size);
84
85         if (*lmmp && !lsm) {
86                 OBD_FREE(*lmmp, lmm_size);
87                 *lmmp = NULL;
88                 RETURN(0);
89         }
90
91         if (!*lmmp) {
92                 OBD_ALLOC(*lmmp, lmm_size);
93                 if (!*lmmp)
94                         RETURN(-ENOMEM);
95         }
96
97         if (lsm) {
98                 LASSERT(lsm->lsm_object_id);
99                 LASSERT_SEQ_IS_MDT(lsm->lsm_object_seq);
100                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
101                 (*lmmp)->lmm_object_seq = cpu_to_le64(lsm->lsm_object_seq);
102         }
103
104         RETURN(lmm_size);
105 }
106
107 /* Unpack OSC object metadata from disk storage (LE byte order). */
108 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
109                         struct lov_mds_md *lmm, int lmm_bytes)
110 {
111         int lsm_size;
112         ENTRY;
113
114         if (lmm != NULL) {
115                 if (lmm_bytes < sizeof (*lmm)) {
116                         CERROR("lov_mds_md too small: %d, need %d\n",
117                                lmm_bytes, (int)sizeof(*lmm));
118                         RETURN(-EINVAL);
119                 }
120                 /* XXX LOV_MAGIC etc check? */
121
122                 if (lmm->lmm_object_id == 0) {
123                         CERROR("lov_mds_md: zero lmm_object_id\n");
124                         RETURN(-EINVAL);
125                 }
126         }
127
128         lsm_size = lov_stripe_md_size(1);
129         if (lsmp == NULL)
130                 RETURN(lsm_size);
131
132         if (*lsmp != NULL && lmm == NULL) {
133                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
134                 OBD_FREE(*lsmp, lsm_size);
135                 *lsmp = NULL;
136                 RETURN(0);
137         }
138
139         if (*lsmp == NULL) {
140                 OBD_ALLOC(*lsmp, lsm_size);
141                 if (*lsmp == NULL)
142                         RETURN(-ENOMEM);
143                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
144                 if ((*lsmp)->lsm_oinfo[0] == NULL) {
145                         OBD_FREE(*lsmp, lsm_size);
146                         RETURN(-ENOMEM);
147                 }
148                 loi_init((*lsmp)->lsm_oinfo[0]);
149         }
150
151         if (lmm != NULL) {
152                 /* XXX zero *lsmp? */
153                 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
154                 (*lsmp)->lsm_object_seq = le64_to_cpu (lmm->lmm_object_seq);
155                 LASSERT((*lsmp)->lsm_object_id);
156                 LASSERT_SEQ_IS_MDT((*lsmp)->lsm_object_seq);
157         }
158
159         (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
160
161         RETURN(lsm_size);
162 }
163
164 static inline void osc_pack_capa(struct ptlrpc_request *req,
165                                  struct ost_body *body, void *capa)
166 {
167         struct obd_capa *oc = (struct obd_capa *)capa;
168         struct lustre_capa *c;
169
170         if (!capa)
171                 return;
172
173         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
174         LASSERT(c);
175         capa_cpy(c, oc);
176         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
177         DEBUG_CAPA(D_SEC, c, "pack");
178 }
179
180 static inline void osc_pack_req_body(struct ptlrpc_request *req,
181                                      struct obd_info *oinfo)
182 {
183         struct ost_body *body;
184
185         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
186         LASSERT(body);
187
188         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
189         osc_pack_capa(req, body, oinfo->oi_capa);
190 }
191
192 static inline void osc_set_capa_size(struct ptlrpc_request *req,
193                                      const struct req_msg_field *field,
194                                      struct obd_capa *oc)
195 {
196         if (oc == NULL)
197                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
198         else
199                 /* it is already calculated as sizeof struct obd_capa */
200                 ;
201 }
202
203 static int osc_getattr_interpret(const struct lu_env *env,
204                                  struct ptlrpc_request *req,
205                                  struct osc_async_args *aa, int rc)
206 {
207         struct ost_body *body;
208         ENTRY;
209
210         if (rc != 0)
211                 GOTO(out, rc);
212
213         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
214         if (body) {
215                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
216                 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
217
218                 /* This should really be sent by the OST */
219                 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
220                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
221         } else {
222                 CDEBUG(D_INFO, "can't unpack ost_body\n");
223                 rc = -EPROTO;
224                 aa->aa_oi->oi_oa->o_valid = 0;
225         }
226 out:
227         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
228         RETURN(rc);
229 }
230
231 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
232                              struct ptlrpc_request_set *set)
233 {
234         struct ptlrpc_request *req;
235         struct osc_async_args *aa;
236         int                    rc;
237         ENTRY;
238
239         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
240         if (req == NULL)
241                 RETURN(-ENOMEM);
242
243         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
244         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
245         if (rc) {
246                 ptlrpc_request_free(req);
247                 RETURN(rc);
248         }
249
250         osc_pack_req_body(req, oinfo);
251
252         ptlrpc_request_set_replen(req);
253         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
254
255         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
256         aa = ptlrpc_req_async_args(req);
257         aa->aa_oi = oinfo;
258
259         ptlrpc_set_add_req(set, req);
260         RETURN(0);
261 }
262
263 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
264 {
265         struct ptlrpc_request *req;
266         struct ost_body       *body;
267         int                    rc;
268         ENTRY;
269
270         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
271         if (req == NULL)
272                 RETURN(-ENOMEM);
273
274         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
275         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
276         if (rc) {
277                 ptlrpc_request_free(req);
278                 RETURN(rc);
279         }
280
281         osc_pack_req_body(req, oinfo);
282
283         ptlrpc_request_set_replen(req);
284
285         rc = ptlrpc_queue_wait(req);
286         if (rc)
287                 GOTO(out, rc);
288
289         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
290         if (body == NULL)
291                 GOTO(out, rc = -EPROTO);
292
293         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
294         lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
295
296         /* This should really be sent by the OST */
297         oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
298         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
299
300         EXIT;
301  out:
302         ptlrpc_req_finished(req);
303         return rc;
304 }
305
306 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
307                        struct obd_trans_info *oti)
308 {
309         struct ptlrpc_request *req;
310         struct ost_body       *body;
311         int                    rc;
312         ENTRY;
313
314         LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
315
316         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
317         if (req == NULL)
318                 RETURN(-ENOMEM);
319
320         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
321         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
322         if (rc) {
323                 ptlrpc_request_free(req);
324                 RETURN(rc);
325         }
326
327         osc_pack_req_body(req, oinfo);
328
329         ptlrpc_request_set_replen(req);
330
331         rc = ptlrpc_queue_wait(req);
332         if (rc)
333                 GOTO(out, rc);
334
335         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
336         if (body == NULL)
337                 GOTO(out, rc = -EPROTO);
338
339         lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
340
341         EXIT;
342 out:
343         ptlrpc_req_finished(req);
344         RETURN(rc);
345 }
346
347 static int osc_setattr_interpret(const struct lu_env *env,
348                                  struct ptlrpc_request *req,
349                                  struct osc_setattr_args *sa, int rc)
350 {
351         struct ost_body *body;
352         ENTRY;
353
354         if (rc != 0)
355                 GOTO(out, rc);
356
357         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
358         if (body == NULL)
359                 GOTO(out, rc = -EPROTO);
360
361         lustre_get_wire_obdo(sa->sa_oa, &body->oa);
362 out:
363         rc = sa->sa_upcall(sa->sa_cookie, rc);
364         RETURN(rc);
365 }
366
367 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
368                            struct obd_trans_info *oti,
369                            obd_enqueue_update_f upcall, void *cookie,
370                            struct ptlrpc_request_set *rqset)
371 {
372         struct ptlrpc_request   *req;
373         struct osc_setattr_args *sa;
374         int                      rc;
375         ENTRY;
376
377         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
378         if (req == NULL)
379                 RETURN(-ENOMEM);
380
381         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
382         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
383         if (rc) {
384                 ptlrpc_request_free(req);
385                 RETURN(rc);
386         }
387
388         if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
389                 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
390
391         osc_pack_req_body(req, oinfo);
392
393         ptlrpc_request_set_replen(req);
394
395         /* do mds to ost setattr asynchronously */
396         if (!rqset) {
397                 /* Do not wait for response. */
398                 ptlrpcd_add_req(req, PSCOPE_OTHER);
399         } else {
400                 req->rq_interpret_reply =
401                         (ptlrpc_interpterer_t)osc_setattr_interpret;
402
403                 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
404                 sa = ptlrpc_req_async_args(req);
405                 sa->sa_oa = oinfo->oi_oa;
406                 sa->sa_upcall = upcall;
407                 sa->sa_cookie = cookie;
408
409                 if (rqset == PTLRPCD_SET)
410                         ptlrpcd_add_req(req, PSCOPE_OTHER);
411                 else
412                         ptlrpc_set_add_req(rqset, req);
413         }
414
415         RETURN(0);
416 }
417
418 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
419                              struct obd_trans_info *oti,
420                              struct ptlrpc_request_set *rqset)
421 {
422         return osc_setattr_async_base(exp, oinfo, oti,
423                                       oinfo->oi_cb_up, oinfo, rqset);
424 }
425
426 int osc_real_create(struct obd_export *exp, struct obdo *oa,
427                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
428 {
429         struct ptlrpc_request *req;
430         struct ost_body       *body;
431         struct lov_stripe_md  *lsm;
432         int                    rc;
433         ENTRY;
434
435         LASSERT(oa);
436         LASSERT(ea);
437
438         lsm = *ea;
439         if (!lsm) {
440                 rc = obd_alloc_memmd(exp, &lsm);
441                 if (rc < 0)
442                         RETURN(rc);
443         }
444
445         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
446         if (req == NULL)
447                 GOTO(out, rc = -ENOMEM);
448
449         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
450         if (rc) {
451                 ptlrpc_request_free(req);
452                 GOTO(out, rc);
453         }
454
455         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
456         LASSERT(body);
457         lustre_set_wire_obdo(&body->oa, oa);
458
459         ptlrpc_request_set_replen(req);
460
461         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
462             oa->o_flags == OBD_FL_DELORPHAN) {
463                 DEBUG_REQ(D_HA, req,
464                           "delorphan from OST integration");
465                 /* Don't resend the delorphan req */
466                 req->rq_no_resend = req->rq_no_delay = 1;
467         }
468
469         rc = ptlrpc_queue_wait(req);
470         if (rc)
471                 GOTO(out_req, rc);
472
473         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
474         if (body == NULL)
475                 GOTO(out_req, rc = -EPROTO);
476
477         lustre_get_wire_obdo(oa, &body->oa);
478
479         /* This should really be sent by the OST */
480         oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
481         oa->o_valid |= OBD_MD_FLBLKSZ;
482
483         /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
484          * have valid lsm_oinfo data structs, so don't go touching that.
485          * This needs to be fixed in a big way.
486          */
487         lsm->lsm_object_id = oa->o_id;
488         lsm->lsm_object_seq = oa->o_seq;
489         *ea = lsm;
490
491         if (oti != NULL) {
492                 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
493
494                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
495                         if (!oti->oti_logcookies)
496                                 oti_alloc_cookies(oti, 1);
497                         *oti->oti_logcookies = oa->o_lcookie;
498                 }
499         }
500
501         CDEBUG(D_HA, "transno: "LPD64"\n",
502                lustre_msg_get_transno(req->rq_repmsg));
503 out_req:
504         ptlrpc_req_finished(req);
505 out:
506         if (rc && !*ea)
507                 obd_free_memmd(exp, &lsm);
508         RETURN(rc);
509 }
510
511 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
512                    obd_enqueue_update_f upcall, void *cookie,
513                    struct ptlrpc_request_set *rqset)
514 {
515         struct ptlrpc_request   *req;
516         struct osc_setattr_args *sa;
517         struct ost_body         *body;
518         int                      rc;
519         ENTRY;
520
521         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
522         if (req == NULL)
523                 RETURN(-ENOMEM);
524
525         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
526         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
527         if (rc) {
528                 ptlrpc_request_free(req);
529                 RETURN(rc);
530         }
531         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
532         ptlrpc_at_set_req_timeout(req);
533
534         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
535         LASSERT(body);
536         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
537         osc_pack_capa(req, body, oinfo->oi_capa);
538
539         ptlrpc_request_set_replen(req);
540
541
542         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
543         CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
544         sa = ptlrpc_req_async_args(req);
545         sa->sa_oa     = oinfo->oi_oa;
546         sa->sa_upcall = upcall;
547         sa->sa_cookie = cookie;
548         if (rqset == PTLRPCD_SET)
549                 ptlrpcd_add_req(req, PSCOPE_OTHER);
550         else
551                 ptlrpc_set_add_req(rqset, req);
552
553         RETURN(0);
554 }
555
556 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
557                      struct obd_trans_info *oti,
558                      struct ptlrpc_request_set *rqset)
559 {
560         oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
561         oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
562         oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
563         return osc_punch_base(exp, oinfo,
564                               oinfo->oi_cb_up, oinfo, rqset);
565 }
566
567 static int osc_sync_interpret(const struct lu_env *env,
568                               struct ptlrpc_request *req,
569                               void *arg, int rc)
570 {
571         struct osc_async_args *aa = arg;
572         struct ost_body *body;
573         ENTRY;
574
575         if (rc)
576                 GOTO(out, rc);
577
578         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
579         if (body == NULL) {
580                 CERROR ("can't unpack ost_body\n");
581                 GOTO(out, rc = -EPROTO);
582         }
583
584         *aa->aa_oi->oi_oa = body->oa;
585 out:
586         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
587         RETURN(rc);
588 }
589
590 static int osc_sync(struct obd_export *exp, struct obd_info *oinfo,
591                     obd_size start, obd_size end,
592                     struct ptlrpc_request_set *set)
593 {
594         struct ptlrpc_request *req;
595         struct ost_body       *body;
596         struct osc_async_args *aa;
597         int                    rc;
598         ENTRY;
599
600         if (!oinfo->oi_oa) {
601                 CDEBUG(D_INFO, "oa NULL\n");
602                 RETURN(-EINVAL);
603         }
604
605         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
606         if (req == NULL)
607                 RETURN(-ENOMEM);
608
609         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
610         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
611         if (rc) {
612                 ptlrpc_request_free(req);
613                 RETURN(rc);
614         }
615
616         /* overload the size and blocks fields in the oa with start/end */
617         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
618         LASSERT(body);
619         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
620         body->oa.o_size = start;
621         body->oa.o_blocks = end;
622         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
623         osc_pack_capa(req, body, oinfo->oi_capa);
624
625         ptlrpc_request_set_replen(req);
626         req->rq_interpret_reply = osc_sync_interpret;
627
628         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
629         aa = ptlrpc_req_async_args(req);
630         aa->aa_oi = oinfo;
631
632         ptlrpc_set_add_req(set, req);
633         RETURN (0);
634 }
635
636 /* Find and cancel locally locks matched by @mode in the resource found by
637  * @objid. Found locks are added into @cancel list. Returns the amount of
638  * locks added to @cancels list. */
639 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
640                                    cfs_list_t *cancels,
641                                    ldlm_mode_t mode, int lock_flags)
642 {
643         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
644         struct ldlm_res_id res_id;
645         struct ldlm_resource *res;
646         int count;
647         ENTRY;
648
649         osc_build_res_name(oa->o_id, oa->o_seq, &res_id);
650         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
651         if (res == NULL)
652                 RETURN(0);
653
654         LDLM_RESOURCE_ADDREF(res);
655         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
656                                            lock_flags, 0, NULL);
657         LDLM_RESOURCE_DELREF(res);
658         ldlm_resource_putref(res);
659         RETURN(count);
660 }
661
662 static int osc_destroy_interpret(const struct lu_env *env,
663                                  struct ptlrpc_request *req, void *data,
664                                  int rc)
665 {
666         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
667
668         cfs_atomic_dec(&cli->cl_destroy_in_flight);
669         cfs_waitq_signal(&cli->cl_destroy_waitq);
670         return 0;
671 }
672
673 static int osc_can_send_destroy(struct client_obd *cli)
674 {
675         if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <=
676             cli->cl_max_rpcs_in_flight) {
677                 /* The destroy request can be sent */
678                 return 1;
679         }
680         if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) <
681             cli->cl_max_rpcs_in_flight) {
682                 /*
683                  * The counter has been modified between the two atomic
684                  * operations.
685                  */
686                 cfs_waitq_signal(&cli->cl_destroy_waitq);
687         }
688         return 0;
689 }
690
691 /* Destroy requests can be async always on the client, and we don't even really
692  * care about the return code since the client cannot do anything at all about
693  * a destroy failure.
694  * When the MDS is unlinking a filename, it saves the file objects into a
695  * recovery llog, and these object records are cancelled when the OST reports
696  * they were destroyed and sync'd to disk (i.e. transaction committed).
697  * If the client dies, or the OST is down when the object should be destroyed,
698  * the records are not cancelled, and when the OST reconnects to the MDS next,
699  * it will retrieve the llog unlink logs and then sends the log cancellation
700  * cookies to the MDS after committing destroy transactions. */
701 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
702                        struct lov_stripe_md *ea, struct obd_trans_info *oti,
703                        struct obd_export *md_export, void *capa)
704 {
705         struct client_obd     *cli = &exp->exp_obd->u.cli;
706         struct ptlrpc_request *req;
707         struct ost_body       *body;
708         CFS_LIST_HEAD(cancels);
709         int rc, count;
710         ENTRY;
711
712         if (!oa) {
713                 CDEBUG(D_INFO, "oa NULL\n");
714                 RETURN(-EINVAL);
715         }
716
717         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
718                                         LDLM_FL_DISCARD_DATA);
719
720         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
721         if (req == NULL) {
722                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
723                 RETURN(-ENOMEM);
724         }
725
726         osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
727         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
728                                0, &cancels, count);
729         if (rc) {
730                 ptlrpc_request_free(req);
731                 RETURN(rc);
732         }
733
734         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
735         ptlrpc_at_set_req_timeout(req);
736
737         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
738                 oa->o_lcookie = *oti->oti_logcookies;
739         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
740         LASSERT(body);
741         lustre_set_wire_obdo(&body->oa, oa);
742
743         osc_pack_capa(req, body, (struct obd_capa *)capa);
744         ptlrpc_request_set_replen(req);
745
746         /* don't throttle destroy RPCs for the MDT */
747         if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
748                 req->rq_interpret_reply = osc_destroy_interpret;
749                 if (!osc_can_send_destroy(cli)) {
750                         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
751                                                           NULL);
752
753                         /*
754                          * Wait until the number of on-going destroy RPCs drops
755                          * under max_rpc_in_flight
756                          */
757                         l_wait_event_exclusive(cli->cl_destroy_waitq,
758                                                osc_can_send_destroy(cli), &lwi);
759                 }
760         }
761
762         /* Do not wait for response */
763         ptlrpcd_add_req(req, PSCOPE_OTHER);
764         RETURN(0);
765 }
766
767 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
768                                 long writing_bytes)
769 {
770         obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
771
772         LASSERT(!(oa->o_valid & bits));
773
774         oa->o_valid |= bits;
775         client_obd_list_lock(&cli->cl_loi_list_lock);
776         oa->o_dirty = cli->cl_dirty;
777         if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
778                 CERROR("dirty %lu - %lu > dirty_max %lu\n",
779                        cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
780                 oa->o_undirty = 0;
781         } else if (cfs_atomic_read(&obd_dirty_pages) -
782                    cfs_atomic_read(&obd_dirty_transit_pages) >
783                    obd_max_dirty_pages + 1){
784                 /* The cfs_atomic_read() allowing the cfs_atomic_inc() are
785                  * not covered by a lock thus they may safely race and trip
786                  * this CERROR() unless we add in a small fudge factor (+1). */
787                 CERROR("dirty %d - %d > system dirty_max %d\n",
788                        cfs_atomic_read(&obd_dirty_pages),
789                        cfs_atomic_read(&obd_dirty_transit_pages),
790                        obd_max_dirty_pages);
791                 oa->o_undirty = 0;
792         } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
793                 CERROR("dirty %lu - dirty_max %lu too big???\n",
794                        cli->cl_dirty, cli->cl_dirty_max);
795                 oa->o_undirty = 0;
796         } else {
797                 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
798                                 (cli->cl_max_rpcs_in_flight + 1);
799                 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
800         }
801         oa->o_grant = cli->cl_avail_grant;
802         oa->o_dropped = cli->cl_lost_grant;
803         cli->cl_lost_grant = 0;
804         client_obd_list_unlock(&cli->cl_loi_list_lock);
805         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
806                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
807
808 }
809
810 static void osc_update_next_shrink(struct client_obd *cli)
811 {
812         cli->cl_next_shrink_grant =
813                 cfs_time_shift(cli->cl_grant_shrink_interval);
814         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
815                cli->cl_next_shrink_grant);
816 }
817
818 /* caller must hold loi_list_lock */
819 static void osc_consume_write_grant(struct client_obd *cli,
820                                     struct brw_page *pga)
821 {
822         LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
823         LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
824         cfs_atomic_inc(&obd_dirty_pages);
825         cli->cl_dirty += CFS_PAGE_SIZE;
826         cli->cl_avail_grant -= CFS_PAGE_SIZE;
827         pga->flag |= OBD_BRW_FROM_GRANT;
828         CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
829                CFS_PAGE_SIZE, pga, pga->pg);
830         LASSERT(cli->cl_avail_grant >= 0);
831         osc_update_next_shrink(cli);
832 }
833
834 /* the companion to osc_consume_write_grant, called when a brw has completed.
835  * must be called with the loi lock held. */
836 static void osc_release_write_grant(struct client_obd *cli,
837                                     struct brw_page *pga, int sent)
838 {
839         int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
840         ENTRY;
841
842         LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
843         if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
844                 EXIT;
845                 return;
846         }
847
848         pga->flag &= ~OBD_BRW_FROM_GRANT;
849         cfs_atomic_dec(&obd_dirty_pages);
850         cli->cl_dirty -= CFS_PAGE_SIZE;
851         if (pga->flag & OBD_BRW_NOCACHE) {
852                 pga->flag &= ~OBD_BRW_NOCACHE;
853                 cfs_atomic_dec(&obd_dirty_transit_pages);
854                 cli->cl_dirty_transit -= CFS_PAGE_SIZE;
855         }
856         if (!sent) {
857                 cli->cl_lost_grant += CFS_PAGE_SIZE;
858                 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
859                        cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
860         } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
861                 /* For short writes we shouldn't count parts of pages that
862                  * span a whole block on the OST side, or our accounting goes
863                  * wrong.  Should match the code in filter_grant_check. */
864                 int offset = pga->off & ~CFS_PAGE_MASK;
865                 int count = pga->count + (offset & (blocksize - 1));
866                 int end = (offset + pga->count) & (blocksize - 1);
867                 if (end)
868                         count += blocksize - end;
869
870                 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
871                 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
872                        CFS_PAGE_SIZE - count, cli->cl_lost_grant,
873                        cli->cl_avail_grant, cli->cl_dirty);
874         }
875
876         EXIT;
877 }
878
879 static unsigned long rpcs_in_flight(struct client_obd *cli)
880 {
881         return cli->cl_r_in_flight + cli->cl_w_in_flight;
882 }
883
884 /* caller must hold loi_list_lock */
885 void osc_wake_cache_waiters(struct client_obd *cli)
886 {
887         cfs_list_t *l, *tmp;
888         struct osc_cache_waiter *ocw;
889
890         ENTRY;
891         cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
892                 /* if we can't dirty more, we must wait until some is written */
893                 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
894                    (cfs_atomic_read(&obd_dirty_pages) + 1 >
895                     obd_max_dirty_pages)) {
896                         CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
897                                "osc max %ld, sys max %d\n", cli->cl_dirty,
898                                cli->cl_dirty_max, obd_max_dirty_pages);
899                         return;
900                 }
901
902                 /* if still dirty cache but no grant wait for pending RPCs that
903                  * may yet return us some grant before doing sync writes */
904                 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
905                         CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
906                                cli->cl_w_in_flight);
907                         return;
908                 }
909
910                 ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry);
911                 cfs_list_del_init(&ocw->ocw_entry);
912                 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
913                         /* no more RPCs in flight to return grant, do sync IO */
914                         ocw->ocw_rc = -EDQUOT;
915                         CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
916                 } else {
917                         osc_consume_write_grant(cli,
918                                                 &ocw->ocw_oap->oap_brw_page);
919                 }
920
921                 cfs_waitq_signal(&ocw->ocw_waitq);
922         }
923
924         EXIT;
925 }
926
927 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
928 {
929         client_obd_list_lock(&cli->cl_loi_list_lock);
930         cli->cl_avail_grant += grant;
931         client_obd_list_unlock(&cli->cl_loi_list_lock);
932 }
933
934 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
935 {
936         if (body->oa.o_valid & OBD_MD_FLGRANT) {
937                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
938                 __osc_update_grant(cli, body->oa.o_grant);
939         }
940 }
941
942 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
943                               void *key, obd_count vallen, void *val,
944                               struct ptlrpc_request_set *set);
945
946 static int osc_shrink_grant_interpret(const struct lu_env *env,
947                                       struct ptlrpc_request *req,
948                                       void *aa, int rc)
949 {
950         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
951         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
952         struct ost_body *body;
953
954         if (rc != 0) {
955                 __osc_update_grant(cli, oa->o_grant);
956                 GOTO(out, rc);
957         }
958
959         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
960         LASSERT(body);
961         osc_update_grant(cli, body);
962 out:
963         OBDO_FREE(oa);
964         return rc;
965 }
966
967 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
968 {
969         client_obd_list_lock(&cli->cl_loi_list_lock);
970         oa->o_grant = cli->cl_avail_grant / 4;
971         cli->cl_avail_grant -= oa->o_grant;
972         client_obd_list_unlock(&cli->cl_loi_list_lock);
973         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
974                 oa->o_valid |= OBD_MD_FLFLAGS;
975                 oa->o_flags = 0;
976         }
977         oa->o_flags |= OBD_FL_SHRINK_GRANT;
978         osc_update_next_shrink(cli);
979 }
980
981 /* Shrink the current grant, either from some large amount to enough for a
982  * full set of in-flight RPCs, or if we have already shrunk to that limit
983  * then to enough for a single RPC.  This avoids keeping more grant than
984  * needed, and avoids shrinking the grant piecemeal. */
985 static int osc_shrink_grant(struct client_obd *cli)
986 {
987         long target = (cli->cl_max_rpcs_in_flight + 1) *
988                       cli->cl_max_pages_per_rpc;
989
990         client_obd_list_lock(&cli->cl_loi_list_lock);
991         if (cli->cl_avail_grant <= target)
992                 target = cli->cl_max_pages_per_rpc;
993         client_obd_list_unlock(&cli->cl_loi_list_lock);
994
995         return osc_shrink_grant_to_target(cli, target);
996 }
997
998 int osc_shrink_grant_to_target(struct client_obd *cli, long target)
999 {
1000         int    rc = 0;
1001         struct ost_body     *body;
1002         ENTRY;
1003
1004         client_obd_list_lock(&cli->cl_loi_list_lock);
1005         /* Don't shrink if we are already above or below the desired limit
1006          * We don't want to shrink below a single RPC, as that will negatively
1007          * impact block allocation and long-term performance. */
1008         if (target < cli->cl_max_pages_per_rpc)
1009                 target = cli->cl_max_pages_per_rpc;
1010
1011         if (target >= cli->cl_avail_grant) {
1012                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1013                 RETURN(0);
1014         }
1015         client_obd_list_unlock(&cli->cl_loi_list_lock);
1016
1017         OBD_ALLOC_PTR(body);
1018         if (!body)
1019                 RETURN(-ENOMEM);
1020
1021         osc_announce_cached(cli, &body->oa, 0);
1022
1023         client_obd_list_lock(&cli->cl_loi_list_lock);
1024         body->oa.o_grant = cli->cl_avail_grant - target;
1025         cli->cl_avail_grant = target;
1026         client_obd_list_unlock(&cli->cl_loi_list_lock);
1027         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
1028                 body->oa.o_valid |= OBD_MD_FLFLAGS;
1029                 body->oa.o_flags = 0;
1030         }
1031         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
1032         osc_update_next_shrink(cli);
1033
1034         rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
1035                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
1036                                 sizeof(*body), body, NULL);
1037         if (rc != 0)
1038                 __osc_update_grant(cli, body->oa.o_grant);
1039         OBD_FREE_PTR(body);
1040         RETURN(rc);
1041 }
1042
1043 #define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
1044 static int osc_should_shrink_grant(struct client_obd *client)
1045 {
1046         cfs_time_t time = cfs_time_current();
1047         cfs_time_t next_shrink = client->cl_next_shrink_grant;
1048
1049         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
1050              OBD_CONNECT_GRANT_SHRINK) == 0)
1051                 return 0;
1052
1053         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
1054                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
1055                     client->cl_avail_grant > GRANT_SHRINK_LIMIT)
1056                         return 1;
1057                 else
1058                         osc_update_next_shrink(client);
1059         }
1060         return 0;
1061 }
1062
1063 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1064 {
1065         struct client_obd *client;
1066
1067         cfs_list_for_each_entry(client, &item->ti_obd_list,
1068                                 cl_grant_shrink_list) {
1069                 if (osc_should_shrink_grant(client))
1070                         osc_shrink_grant(client);
1071         }
1072         return 0;
1073 }
1074
1075 static int osc_add_shrink_grant(struct client_obd *client)
1076 {
1077         int rc;
1078
1079         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1080                                        TIMEOUT_GRANT,
1081                                        osc_grant_shrink_grant_cb, NULL,
1082                                        &client->cl_grant_shrink_list);
1083         if (rc) {
1084                 CERROR("add grant client %s error %d\n",
1085                         client->cl_import->imp_obd->obd_name, rc);
1086                 return rc;
1087         }
1088         CDEBUG(D_CACHE, "add grant client %s \n",
1089                client->cl_import->imp_obd->obd_name);
1090         osc_update_next_shrink(client);
1091         return 0;
1092 }
1093
1094 static int osc_del_shrink_grant(struct client_obd *client)
1095 {
1096         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1097                                          TIMEOUT_GRANT);
1098 }
1099
1100 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1101 {
1102         /*
1103          * ocd_grant is the total grant amount we're expect to hold: if we've
1104          * been evicted, it's the new avail_grant amount, cl_dirty will drop
1105          * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1106          *
1107          * race is tolerable here: if we're evicted, but imp_state already
1108          * left EVICTED state, then cl_dirty must be 0 already.
1109          */
1110         client_obd_list_lock(&cli->cl_loi_list_lock);
1111         if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1112                 cli->cl_avail_grant = ocd->ocd_grant;
1113         else
1114                 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1115
1116         if (cli->cl_avail_grant < 0) {
1117                 CWARN("%s: available grant < 0, the OSS is probably not running"
1118                       " with patch from bug20278 (%ld) \n",
1119                       cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant);
1120                 /* workaround for 1.6 servers which do not have 
1121                  * the patch from bug20278 */
1122                 cli->cl_avail_grant = ocd->ocd_grant;
1123         }
1124
1125         client_obd_list_unlock(&cli->cl_loi_list_lock);
1126
1127         CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld \n",
1128                cli->cl_import->imp_obd->obd_name,
1129                cli->cl_avail_grant, cli->cl_lost_grant);
1130
1131         if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1132             cfs_list_empty(&cli->cl_grant_shrink_list))
1133                 osc_add_shrink_grant(cli);
1134 }
1135
1136 /* We assume that the reason this OSC got a short read is because it read
1137  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1138  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1139  * this stripe never got written at or beyond this stripe offset yet. */
1140 static void handle_short_read(int nob_read, obd_count page_count,
1141                               struct brw_page **pga)
1142 {
1143         char *ptr;
1144         int i = 0;
1145
1146         /* skip bytes read OK */
1147         while (nob_read > 0) {
1148                 LASSERT (page_count > 0);
1149
1150                 if (pga[i]->count > nob_read) {
1151                         /* EOF inside this page */
1152                         ptr = cfs_kmap(pga[i]->pg) +
1153                                 (pga[i]->off & ~CFS_PAGE_MASK);
1154                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1155                         cfs_kunmap(pga[i]->pg);
1156                         page_count--;
1157                         i++;
1158                         break;
1159                 }
1160
1161                 nob_read -= pga[i]->count;
1162                 page_count--;
1163                 i++;
1164         }
1165
1166         /* zero remaining pages */
1167         while (page_count-- > 0) {
1168                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1169                 memset(ptr, 0, pga[i]->count);
1170                 cfs_kunmap(pga[i]->pg);
1171                 i++;
1172         }
1173 }
1174
1175 static int check_write_rcs(struct ptlrpc_request *req,
1176                            int requested_nob, int niocount,
1177                            obd_count page_count, struct brw_page **pga)
1178 {
1179         int     i;
1180         __u32   *remote_rcs;
1181
1182         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1183                                                   sizeof(*remote_rcs) *
1184                                                   niocount);
1185         if (remote_rcs == NULL) {
1186                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1187                 return(-EPROTO);
1188         }
1189
1190         /* return error if any niobuf was in error */
1191         for (i = 0; i < niocount; i++) {
1192                 if (remote_rcs[i] < 0)
1193                         return(remote_rcs[i]);
1194
1195                 if (remote_rcs[i] != 0) {
1196                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1197                                 i, remote_rcs[i], req);
1198                         return(-EPROTO);
1199                 }
1200         }
1201
1202         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1203                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1204                        req->rq_bulk->bd_nob_transferred, requested_nob);
1205                 return(-EPROTO);
1206         }
1207
1208         return (0);
1209 }
1210
1211 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1212 {
1213         if (p1->flag != p2->flag) {
1214                 unsigned mask = ~(OBD_BRW_FROM_GRANT|
1215                                   OBD_BRW_NOCACHE|OBD_BRW_SYNC|OBD_BRW_ASYNC);
1216
1217                 /* warn if we try to combine flags that we don't know to be
1218                  * safe to combine */
1219                 if ((p1->flag & mask) != (p2->flag & mask))
1220                         CERROR("is it ok to have flags 0x%x and 0x%x in the "
1221                                "same brw?\n", p1->flag, p2->flag);
1222                 return 0;
1223         }
1224
1225         return (p1->off + p1->count == p2->off);
1226 }
1227
1228 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1229                                    struct brw_page **pga, int opc,
1230                                    cksum_type_t cksum_type)
1231 {
1232         __u32 cksum;
1233         int i = 0;
1234
1235         LASSERT (pg_count > 0);
1236         cksum = init_checksum(cksum_type);
1237         while (nob > 0 && pg_count > 0) {
1238                 unsigned char *ptr = cfs_kmap(pga[i]->pg);
1239                 int off = pga[i]->off & ~CFS_PAGE_MASK;
1240                 int count = pga[i]->count > nob ? nob : pga[i]->count;
1241
1242                 /* corrupt the data before we compute the checksum, to
1243                  * simulate an OST->client data error */
1244                 if (i == 0 && opc == OST_READ &&
1245                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
1246                         memcpy(ptr + off, "bad1", min(4, nob));
1247                 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
1248                 cfs_kunmap(pga[i]->pg);
1249                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1250                                off, cksum);
1251
1252                 nob -= pga[i]->count;
1253                 pg_count--;
1254                 i++;
1255         }
1256         /* For sending we only compute the wrong checksum instead
1257          * of corrupting the data so it is still correct on a redo */
1258         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1259                 cksum++;
1260
1261         return cksum;
1262 }
1263
1264 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1265                                 struct lov_stripe_md *lsm, obd_count page_count,
1266                                 struct brw_page **pga,
1267                                 struct ptlrpc_request **reqp,
1268                                 struct obd_capa *ocapa, int reserve,
1269                                 int resend)
1270 {
1271         struct ptlrpc_request   *req;
1272         struct ptlrpc_bulk_desc *desc;
1273         struct ost_body         *body;
1274         struct obd_ioobj        *ioobj;
1275         struct niobuf_remote    *niobuf;
1276         int niocount, i, requested_nob, opc, rc;
1277         struct osc_brw_async_args *aa;
1278         struct req_capsule      *pill;
1279         struct brw_page *pg_prev;
1280
1281         ENTRY;
1282         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1283                 RETURN(-ENOMEM); /* Recoverable */
1284         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1285                 RETURN(-EINVAL); /* Fatal */
1286
1287         if ((cmd & OBD_BRW_WRITE) != 0) {
1288                 opc = OST_WRITE;
1289                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1290                                                 cli->cl_import->imp_rq_pool,
1291                                                 &RQF_OST_BRW_WRITE);
1292         } else {
1293                 opc = OST_READ;
1294                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1295         }
1296         if (req == NULL)
1297                 RETURN(-ENOMEM);
1298
1299         for (niocount = i = 1; i < page_count; i++) {
1300                 if (!can_merge_pages(pga[i - 1], pga[i]))
1301                         niocount++;
1302         }
1303
1304         pill = &req->rq_pill;
1305         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1306                              sizeof(*ioobj));
1307         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1308                              niocount * sizeof(*niobuf));
1309         osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1310
1311         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1312         if (rc) {
1313                 ptlrpc_request_free(req);
1314                 RETURN(rc);
1315         }
1316         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1317         ptlrpc_at_set_req_timeout(req);
1318
1319         if (opc == OST_WRITE)
1320                 desc = ptlrpc_prep_bulk_imp(req, page_count,
1321                                             BULK_GET_SOURCE, OST_BULK_PORTAL);
1322         else
1323                 desc = ptlrpc_prep_bulk_imp(req, page_count,
1324                                             BULK_PUT_SINK, OST_BULK_PORTAL);
1325
1326         if (desc == NULL)
1327                 GOTO(out, rc = -ENOMEM);
1328         /* NB request now owns desc and will free it when it gets freed */
1329
1330         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1331         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1332         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1333         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1334
1335         lustre_set_wire_obdo(&body->oa, oa);
1336
1337         obdo_to_ioobj(oa, ioobj);
1338         ioobj->ioo_bufcnt = niocount;
1339         osc_pack_capa(req, body, ocapa);
1340         LASSERT (page_count > 0);
1341         pg_prev = pga[0];
1342         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1343                 struct brw_page *pg = pga[i];
1344
1345                 LASSERT(pg->count > 0);
1346                 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
1347                          "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
1348                          pg->off, pg->count);
1349 #ifdef __linux__
1350                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1351                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1352                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1353                          i, page_count,
1354                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1355                          pg_prev->pg, page_private(pg_prev->pg),
1356                          pg_prev->pg->index, pg_prev->off);
1357 #else
1358                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1359                          "i %d p_c %u\n", i, page_count);
1360 #endif
1361                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1362                         (pg->flag & OBD_BRW_SRVLOCK));
1363
1364                 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
1365                                       pg->count);
1366                 requested_nob += pg->count;
1367
1368                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1369                         niobuf--;
1370                         niobuf->len += pg->count;
1371                 } else {
1372                         niobuf->offset = pg->off;
1373                         niobuf->len    = pg->count;
1374                         niobuf->flags  = pg->flag;
1375                 }
1376                 pg_prev = pg;
1377         }
1378
1379         LASSERTF((void *)(niobuf - niocount) ==
1380                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1381                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1382                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1383
1384         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1385         if (resend) {
1386                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1387                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1388                         body->oa.o_flags = 0;
1389                 }
1390                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1391         }
1392
1393         if (osc_should_shrink_grant(cli))
1394                 osc_shrink_grant_local(cli, &body->oa);
1395
1396         /* size[REQ_REC_OFF] still sizeof (*body) */
1397         if (opc == OST_WRITE) {
1398                 if (unlikely(cli->cl_checksum) &&
1399                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1400                         /* store cl_cksum_type in a local variable since
1401                          * it can be changed via lprocfs */
1402                         cksum_type_t cksum_type = cli->cl_cksum_type;
1403
1404                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1405                                 oa->o_flags &= OBD_FL_LOCAL_MASK;
1406                                 body->oa.o_flags = 0;
1407                         }
1408                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1409                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1410                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1411                                                              page_count, pga,
1412                                                              OST_WRITE,
1413                                                              cksum_type);
1414                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1415                                body->oa.o_cksum);
1416                         /* save this in 'oa', too, for later checking */
1417                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1418                         oa->o_flags |= cksum_type_pack(cksum_type);
1419                 } else {
1420                         /* clear out the checksum flag, in case this is a
1421                          * resend but cl_checksum is no longer set. b=11238 */
1422                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1423                 }
1424                 oa->o_cksum = body->oa.o_cksum;
1425                 /* 1 RC per niobuf */
1426                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1427                                      sizeof(__u32) * niocount);
1428         } else {
1429                 if (unlikely(cli->cl_checksum) &&
1430                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1431                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1432                                 body->oa.o_flags = 0;
1433                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1434                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1435                 }
1436         }
1437         ptlrpc_request_set_replen(req);
1438
1439         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1440         aa = ptlrpc_req_async_args(req);
1441         aa->aa_oa = oa;
1442         aa->aa_requested_nob = requested_nob;
1443         aa->aa_nio_count = niocount;
1444         aa->aa_page_count = page_count;
1445         aa->aa_resends = 0;
1446         aa->aa_ppga = pga;
1447         aa->aa_cli = cli;
1448         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1449         if (ocapa && reserve)
1450                 aa->aa_ocapa = capa_get(ocapa);
1451
1452         *reqp = req;
1453         RETURN(0);
1454
1455  out:
1456         ptlrpc_req_finished(req);
1457         RETURN(rc);
1458 }
1459
1460 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1461                                 __u32 client_cksum, __u32 server_cksum, int nob,
1462                                 obd_count page_count, struct brw_page **pga,
1463                                 cksum_type_t client_cksum_type)
1464 {
1465         __u32 new_cksum;
1466         char *msg;
1467         cksum_type_t cksum_type;
1468
1469         if (server_cksum == client_cksum) {
1470                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1471                 return 0;
1472         }
1473
1474         /* If this is mmaped file - it can be changed at any time */
1475         if (oa->o_valid & OBD_MD_FLFLAGS && oa->o_flags & OBD_FL_MMAP)
1476                 return 1;
1477
1478         if (oa->o_valid & OBD_MD_FLFLAGS)
1479                 cksum_type = cksum_type_unpack(oa->o_flags);
1480         else
1481                 cksum_type = OBD_CKSUM_CRC32;
1482
1483         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1484                                       cksum_type);
1485
1486         if (cksum_type != client_cksum_type)
1487                 msg = "the server did not use the checksum type specified in "
1488                       "the original request - likely a protocol problem";
1489         else if (new_cksum == server_cksum)
1490                 msg = "changed on the client after we checksummed it - "
1491                       "likely false positive due to mmap IO (bug 11742)";
1492         else if (new_cksum == client_cksum)
1493                 msg = "changed in transit before arrival at OST";
1494         else
1495                 msg = "changed in transit AND doesn't match the original - "
1496                       "likely false positive due to mmap IO (bug 11742)";
1497
1498         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1499                            " object "LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
1500                            msg, libcfs_nid2str(peer->nid),
1501                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1502                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1503                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1504                            oa->o_id,
1505                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_seq : (__u64)0,
1506                            pga[0]->off,
1507                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1508         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1509                "client csum now %x\n", client_cksum, client_cksum_type,
1510                server_cksum, cksum_type, new_cksum);
1511         return 1;
1512 }
1513
1514 /* Note rc enters this function as number of bytes transferred */
1515 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1516 {
1517         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1518         const lnet_process_id_t *peer =
1519                         &req->rq_import->imp_connection->c_peer;
1520         struct client_obd *cli = aa->aa_cli;
1521         struct ost_body *body;
1522         __u32 client_cksum = 0;
1523         ENTRY;
1524
1525         if (rc < 0 && rc != -EDQUOT) {
1526                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1527                 RETURN(rc);
1528         }
1529
1530         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1531         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1532         if (body == NULL) {
1533                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1534                 RETURN(-EPROTO);
1535         }
1536
1537 #ifdef HAVE_QUOTA_SUPPORT
1538         /* set/clear over quota flag for a uid/gid */
1539         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1540             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1541                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1542
1543                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1544                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1545                        body->oa.o_flags);
1546                 lquota_setdq(quota_interface, cli, qid, body->oa.o_valid,
1547                              body->oa.o_flags);
1548         }
1549 #endif
1550
1551         osc_update_grant(cli, body);
1552
1553         if (rc < 0)
1554                 RETURN(rc);
1555
1556         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1557                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1558
1559         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1560                 if (rc > 0) {
1561                         CERROR("Unexpected +ve rc %d\n", rc);
1562                         RETURN(-EPROTO);
1563                 }
1564                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1565
1566                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1567                         RETURN(-EAGAIN);
1568
1569                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1570                     check_write_checksum(&body->oa, peer, client_cksum,
1571                                          body->oa.o_cksum, aa->aa_requested_nob,
1572                                          aa->aa_page_count, aa->aa_ppga,
1573                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1574                         RETURN(-EAGAIN);
1575
1576                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1577                                      aa->aa_page_count, aa->aa_ppga);
1578                 GOTO(out, rc);
1579         }
1580
1581         /* The rest of this function executes only for OST_READs */
1582
1583         /* if unwrap_bulk failed, return -EAGAIN to retry */
1584         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1585         if (rc < 0)
1586                 GOTO(out, rc = -EAGAIN);
1587
1588         if (rc > aa->aa_requested_nob) {
1589                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1590                        aa->aa_requested_nob);
1591                 RETURN(-EPROTO);
1592         }
1593
1594         if (rc != req->rq_bulk->bd_nob_transferred) {
1595                 CERROR ("Unexpected rc %d (%d transferred)\n",
1596                         rc, req->rq_bulk->bd_nob_transferred);
1597                 return (-EPROTO);
1598         }
1599
1600         if (rc < aa->aa_requested_nob)
1601                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1602
1603         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1604                 static int cksum_counter;
1605                 __u32      server_cksum = body->oa.o_cksum;
1606                 char      *via;
1607                 char      *router;
1608                 cksum_type_t cksum_type;
1609
1610                 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1611                         cksum_type = cksum_type_unpack(body->oa.o_flags);
1612                 else
1613                         cksum_type = OBD_CKSUM_CRC32;
1614                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1615                                                  aa->aa_ppga, OST_READ,
1616                                                  cksum_type);
1617
1618                 if (peer->nid == req->rq_bulk->bd_sender) {
1619                         via = router = "";
1620                 } else {
1621                         via = " via ";
1622                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1623                 }
1624
1625                 if (server_cksum == ~0 && rc > 0) {
1626                         CERROR("Protocol error: server %s set the 'checksum' "
1627                                "bit, but didn't send a checksum.  Not fatal, "
1628                                "but please notify on http://bugzilla.lustre.org/\n",
1629                                libcfs_nid2str(peer->nid));
1630                 } else if (server_cksum != client_cksum) {
1631                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1632                                            "%s%s%s inode "DFID" object "
1633                                            LPU64"/"LPU64" extent "
1634                                            "["LPU64"-"LPU64"]\n",
1635                                            req->rq_import->imp_obd->obd_name,
1636                                            libcfs_nid2str(peer->nid),
1637                                            via, router,
1638                                            body->oa.o_valid & OBD_MD_FLFID ?
1639                                                 body->oa.o_parent_seq : (__u64)0,
1640                                            body->oa.o_valid & OBD_MD_FLFID ?
1641                                                 body->oa.o_parent_oid : 0,
1642                                            body->oa.o_valid & OBD_MD_FLFID ?
1643                                                 body->oa.o_parent_ver : 0,
1644                                            body->oa.o_id,
1645                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1646                                                 body->oa.o_seq : (__u64)0,
1647                                            aa->aa_ppga[0]->off,
1648                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1649                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1650                                                                         1);
1651                         CERROR("client %x, server %x, cksum_type %x\n",
1652                                client_cksum, server_cksum, cksum_type);
1653                         cksum_counter = 0;
1654                         aa->aa_oa->o_cksum = client_cksum;
1655                         rc = -EAGAIN;
1656                 } else {
1657                         cksum_counter++;
1658                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1659                         rc = 0;
1660                 }
1661         } else if (unlikely(client_cksum)) {
1662                 static int cksum_missed;
1663
1664                 cksum_missed++;
1665                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1666                         CERROR("Checksum %u requested from %s but not sent\n",
1667                                cksum_missed, libcfs_nid2str(peer->nid));
1668         } else {
1669                 rc = 0;
1670         }
1671 out:
1672         if (rc >= 0)
1673                 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
1674
1675         RETURN(rc);
1676 }
1677
1678 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1679                             struct lov_stripe_md *lsm,
1680                             obd_count page_count, struct brw_page **pga,
1681                             struct obd_capa *ocapa)
1682 {
1683         struct ptlrpc_request *req;
1684         int                    rc;
1685         cfs_waitq_t            waitq;
1686         int                    resends = 0;
1687         struct l_wait_info     lwi;
1688
1689         ENTRY;
1690
1691         cfs_waitq_init(&waitq);
1692
1693 restart_bulk:
1694         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1695                                   page_count, pga, &req, ocapa, 0, resends);
1696         if (rc != 0)
1697                 return (rc);
1698
1699         rc = ptlrpc_queue_wait(req);
1700
1701         if (rc == -ETIMEDOUT && req->rq_resend) {
1702                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1703                 ptlrpc_req_finished(req);
1704                 goto restart_bulk;
1705         }
1706
1707         rc = osc_brw_fini_request(req, rc);
1708
1709         ptlrpc_req_finished(req);
1710         if (osc_recoverable_error(rc)) {
1711                 resends++;
1712                 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1713                         CERROR("too many resend retries, returning error\n");
1714                         RETURN(-EIO);
1715                 }
1716
1717                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1718                 l_wait_event(waitq, 0, &lwi);
1719
1720                 goto restart_bulk;
1721         }
1722
1723         RETURN (rc);
1724 }
1725
1726 int osc_brw_redo_request(struct ptlrpc_request *request,
1727                          struct osc_brw_async_args *aa)
1728 {
1729         struct ptlrpc_request *new_req;
1730         struct ptlrpc_request_set *set = request->rq_set;
1731         struct osc_brw_async_args *new_aa;
1732         struct osc_async_page *oap;
1733         int rc = 0;
1734         ENTRY;
1735
1736         if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1737                 CERROR("too many resent retries, returning error\n");
1738                 RETURN(-EIO);
1739         }
1740
1741         DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1742
1743         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1744                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1745                                   aa->aa_cli, aa->aa_oa,
1746                                   NULL /* lsm unused by osc currently */,
1747                                   aa->aa_page_count, aa->aa_ppga,
1748                                   &new_req, aa->aa_ocapa, 0, 1);
1749         if (rc)
1750                 RETURN(rc);
1751
1752         client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1753
1754         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1755                 if (oap->oap_request != NULL) {
1756                         LASSERTF(request == oap->oap_request,
1757                                  "request %p != oap_request %p\n",
1758                                  request, oap->oap_request);
1759                         if (oap->oap_interrupted) {
1760                                 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1761                                 ptlrpc_req_finished(new_req);
1762                                 RETURN(-EINTR);
1763                         }
1764                 }
1765         }
1766         /* New request takes over pga and oaps from old request.
1767          * Note that copying a list_head doesn't work, need to move it... */
1768         aa->aa_resends++;
1769         new_req->rq_interpret_reply = request->rq_interpret_reply;
1770         new_req->rq_async_args = request->rq_async_args;
1771         new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1772
1773         new_aa = ptlrpc_req_async_args(new_req);
1774
1775         CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1776         cfs_list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1777         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1778
1779         cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1780                 if (oap->oap_request) {
1781                         ptlrpc_req_finished(oap->oap_request);
1782                         oap->oap_request = ptlrpc_request_addref(new_req);
1783                 }
1784         }
1785
1786         new_aa->aa_ocapa = aa->aa_ocapa;
1787         aa->aa_ocapa = NULL;
1788
1789         /* use ptlrpc_set_add_req is safe because interpret functions work
1790          * in check_set context. only one way exist with access to request
1791          * from different thread got -EINTR - this way protected with
1792          * cl_loi_list_lock */
1793         ptlrpc_set_add_req(set, new_req);
1794
1795         client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1796
1797         DEBUG_REQ(D_INFO, new_req, "new request");
1798         RETURN(0);
1799 }
1800
1801 /*
1802  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1803  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1804  * fine for our small page arrays and doesn't require allocation.  its an
1805  * insertion sort that swaps elements that are strides apart, shrinking the
1806  * stride down until its '1' and the array is sorted.
1807  */
1808 static void sort_brw_pages(struct brw_page **array, int num)
1809 {
1810         int stride, i, j;
1811         struct brw_page *tmp;
1812
1813         if (num == 1)
1814                 return;
1815         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1816                 ;
1817
1818         do {
1819                 stride /= 3;
1820                 for (i = stride ; i < num ; i++) {
1821                         tmp = array[i];
1822                         j = i;
1823                         while (j >= stride && array[j - stride]->off > tmp->off) {
1824                                 array[j] = array[j - stride];
1825                                 j -= stride;
1826                         }
1827                         array[j] = tmp;
1828                 }
1829         } while (stride > 1);
1830 }
1831
1832 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1833 {
1834         int count = 1;
1835         int offset;
1836         int i = 0;
1837
1838         LASSERT (pages > 0);
1839         offset = pg[i]->off & ~CFS_PAGE_MASK;
1840
1841         for (;;) {
1842                 pages--;
1843                 if (pages == 0)         /* that's all */
1844                         return count;
1845
1846                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1847                         return count;   /* doesn't end on page boundary */
1848
1849                 i++;
1850                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1851                 if (offset != 0)        /* doesn't start on page boundary */
1852                         return count;
1853
1854                 count++;
1855         }
1856 }
1857
1858 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1859 {
1860         struct brw_page **ppga;
1861         int i;
1862
1863         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1864         if (ppga == NULL)
1865                 return NULL;
1866
1867         for (i = 0; i < count; i++)
1868                 ppga[i] = pga + i;
1869         return ppga;
1870 }
1871
1872 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1873 {
1874         LASSERT(ppga != NULL);
1875         OBD_FREE(ppga, sizeof(*ppga) * count);
1876 }
1877
1878 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1879                    obd_count page_count, struct brw_page *pga,
1880                    struct obd_trans_info *oti)
1881 {
1882         struct obdo *saved_oa = NULL;
1883         struct brw_page **ppga, **orig;
1884         struct obd_import *imp = class_exp2cliimp(exp);
1885         struct client_obd *cli;
1886         int rc, page_count_orig;
1887         ENTRY;
1888
1889         LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1890         cli = &imp->imp_obd->u.cli;
1891
1892         if (cmd & OBD_BRW_CHECK) {
1893                 /* The caller just wants to know if there's a chance that this
1894                  * I/O can succeed */
1895
1896                 if (imp->imp_invalid)
1897                         RETURN(-EIO);
1898                 RETURN(0);
1899         }
1900
1901         /* test_brw with a failed create can trip this, maybe others. */
1902         LASSERT(cli->cl_max_pages_per_rpc);
1903
1904         rc = 0;
1905
1906         orig = ppga = osc_build_ppga(pga, page_count);
1907         if (ppga == NULL)
1908                 RETURN(-ENOMEM);
1909         page_count_orig = page_count;
1910
1911         sort_brw_pages(ppga, page_count);
1912         while (page_count) {
1913                 obd_count pages_per_brw;
1914
1915                 if (page_count > cli->cl_max_pages_per_rpc)
1916                         pages_per_brw = cli->cl_max_pages_per_rpc;
1917                 else
1918                         pages_per_brw = page_count;
1919
1920                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1921
1922                 if (saved_oa != NULL) {
1923                         /* restore previously saved oa */
1924                         *oinfo->oi_oa = *saved_oa;
1925                 } else if (page_count > pages_per_brw) {
1926                         /* save a copy of oa (brw will clobber it) */
1927                         OBDO_ALLOC(saved_oa);
1928                         if (saved_oa == NULL)
1929                                 GOTO(out, rc = -ENOMEM);
1930                         *saved_oa = *oinfo->oi_oa;
1931                 }
1932
1933                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1934                                       pages_per_brw, ppga, oinfo->oi_capa);
1935
1936                 if (rc != 0)
1937                         break;
1938
1939                 page_count -= pages_per_brw;
1940                 ppga += pages_per_brw;
1941         }
1942
1943 out:
1944         osc_release_ppga(orig, page_count_orig);
1945
1946         if (saved_oa != NULL)
1947                 OBDO_FREE(saved_oa);
1948
1949         RETURN(rc);
1950 }
1951
1952 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1953  * the dirty accounting.  Writeback completes or truncate happens before
1954  * writing starts.  Must be called with the loi lock held. */
1955 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1956                            int sent)
1957 {
1958         osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1959 }
1960
1961
1962 /* This maintains the lists of pending pages to read/write for a given object
1963  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1964  * to quickly find objects that are ready to send an RPC. */
1965 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1966                          int cmd)
1967 {
1968         int optimal;
1969         ENTRY;
1970
1971         if (lop->lop_num_pending == 0)
1972                 RETURN(0);
1973
1974         /* if we have an invalid import we want to drain the queued pages
1975          * by forcing them through rpcs that immediately fail and complete
1976          * the pages.  recovery relies on this to empty the queued pages
1977          * before canceling the locks and evicting down the llite pages */
1978         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1979                 RETURN(1);
1980
1981         /* stream rpcs in queue order as long as as there is an urgent page
1982          * queued.  this is our cheap solution for good batching in the case
1983          * where writepage marks some random page in the middle of the file
1984          * as urgent because of, say, memory pressure */
1985         if (!cfs_list_empty(&lop->lop_urgent)) {
1986                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1987                 RETURN(1);
1988         }
1989         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1990         optimal = cli->cl_max_pages_per_rpc;
1991         if (cmd & OBD_BRW_WRITE) {
1992                 /* trigger a write rpc stream as long as there are dirtiers
1993                  * waiting for space.  as they're waiting, they're not going to
1994                  * create more pages to coalesce with what's waiting.. */
1995                 if (!cfs_list_empty(&cli->cl_cache_waiters)) {
1996                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1997                         RETURN(1);
1998                 }
1999                 /* +16 to avoid triggering rpcs that would want to include pages
2000                  * that are being queued but which can't be made ready until
2001                  * the queuer finishes with the page. this is a wart for
2002                  * llite::commit_write() */
2003                 optimal += 16;
2004         }
2005         if (lop->lop_num_pending >= optimal)
2006                 RETURN(1);
2007
2008         RETURN(0);
2009 }
2010
2011 static int lop_makes_hprpc(struct loi_oap_pages *lop)
2012 {
2013         struct osc_async_page *oap;
2014         ENTRY;
2015
2016         if (cfs_list_empty(&lop->lop_urgent))
2017                 RETURN(0);
2018
2019         oap = cfs_list_entry(lop->lop_urgent.next,
2020                          struct osc_async_page, oap_urgent_item);
2021
2022         if (oap->oap_async_flags & ASYNC_HP) {
2023                 CDEBUG(D_CACHE, "hp request forcing RPC\n");
2024                 RETURN(1);
2025         }
2026
2027         RETURN(0);
2028 }
2029
2030 static void on_list(cfs_list_t *item, cfs_list_t *list,
2031                     int should_be_on)
2032 {
2033         if (cfs_list_empty(item) && should_be_on)
2034                 cfs_list_add_tail(item, list);
2035         else if (!cfs_list_empty(item) && !should_be_on)
2036                 cfs_list_del_init(item);
2037 }
2038
2039 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
2040  * can find pages to build into rpcs quickly */
2041 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
2042 {
2043         if (lop_makes_hprpc(&loi->loi_write_lop) ||
2044             lop_makes_hprpc(&loi->loi_read_lop)) {
2045                 /* HP rpc */
2046                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
2047                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
2048         } else {
2049                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
2050                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
2051                         lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
2052                         lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
2053         }
2054
2055         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
2056                 loi->loi_write_lop.lop_num_pending);
2057
2058         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
2059                 loi->loi_read_lop.lop_num_pending);
2060 }
2061
2062 static void lop_update_pending(struct client_obd *cli,
2063                                struct loi_oap_pages *lop, int cmd, int delta)
2064 {
2065         lop->lop_num_pending += delta;
2066         if (cmd & OBD_BRW_WRITE)
2067                 cli->cl_pending_w_pages += delta;
2068         else
2069                 cli->cl_pending_r_pages += delta;
2070 }
2071
2072 /**
2073  * this is called when a sync waiter receives an interruption.  Its job is to
2074  * get the caller woken as soon as possible.  If its page hasn't been put in an
2075  * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
2076  * desiring interruption which will forcefully complete the rpc once the rpc
2077  * has timed out.
2078  */
2079 int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
2080 {
2081         struct loi_oap_pages *lop;
2082         struct lov_oinfo *loi;
2083         int rc = -EBUSY;
2084         ENTRY;
2085
2086         LASSERT(!oap->oap_interrupted);
2087         oap->oap_interrupted = 1;
2088
2089         /* ok, it's been put in an rpc. only one oap gets a request reference */
2090         if (oap->oap_request != NULL) {
2091                 ptlrpc_mark_interrupted(oap->oap_request);
2092                 ptlrpcd_wake(oap->oap_request);
2093                 ptlrpc_req_finished(oap->oap_request);
2094                 oap->oap_request = NULL;
2095         }
2096
2097         /*
2098          * page completion may be called only if ->cpo_prep() method was
2099          * executed by osc_io_submit(), that also adds page the to pending list
2100          */
2101         if (!cfs_list_empty(&oap->oap_pending_item)) {
2102                 cfs_list_del_init(&oap->oap_pending_item);
2103                 cfs_list_del_init(&oap->oap_urgent_item);
2104
2105                 loi = oap->oap_loi;
2106                 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
2107                         &loi->loi_write_lop : &loi->loi_read_lop;
2108                 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
2109                 loi_list_maint(oap->oap_cli, oap->oap_loi);
2110                 rc = oap->oap_caller_ops->ap_completion(env,
2111                                           oap->oap_caller_data,
2112                                           oap->oap_cmd, NULL, -EINTR);
2113         }
2114
2115         RETURN(rc);
2116 }
2117
2118 /* this is trying to propogate async writeback errors back up to the
2119  * application.  As an async write fails we record the error code for later if
2120  * the app does an fsync.  As long as errors persist we force future rpcs to be
2121  * sync so that the app can get a sync error and break the cycle of queueing
2122  * pages for which writeback will fail. */
2123 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
2124                            int rc)
2125 {
2126         if (rc) {
2127                 if (!ar->ar_rc)
2128                         ar->ar_rc = rc;
2129
2130                 ar->ar_force_sync = 1;
2131                 ar->ar_min_xid = ptlrpc_sample_next_xid();
2132                 return;
2133
2134         }
2135
2136         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2137                 ar->ar_force_sync = 0;
2138 }
2139
2140 void osc_oap_to_pending(struct osc_async_page *oap)
2141 {
2142         struct loi_oap_pages *lop;
2143
2144         if (oap->oap_cmd & OBD_BRW_WRITE)
2145                 lop = &oap->oap_loi->loi_write_lop;
2146         else
2147                 lop = &oap->oap_loi->loi_read_lop;
2148
2149         if (oap->oap_async_flags & ASYNC_HP)
2150                 cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2151         else if (oap->oap_async_flags & ASYNC_URGENT)
2152                 cfs_list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2153         cfs_list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2154         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2155 }
2156
2157 /* this must be called holding the loi list lock to give coverage to exit_cache,
2158  * async_flag maintenance, and oap_request */
2159 static void osc_ap_completion(const struct lu_env *env,
2160                               struct client_obd *cli, struct obdo *oa,
2161                               struct osc_async_page *oap, int sent, int rc)
2162 {
2163         __u64 xid = 0;
2164
2165         ENTRY;
2166         if (oap->oap_request != NULL) {
2167                 xid = ptlrpc_req_xid(oap->oap_request);
2168                 ptlrpc_req_finished(oap->oap_request);
2169                 oap->oap_request = NULL;
2170         }
2171
2172         cfs_spin_lock(&oap->oap_lock);
2173         oap->oap_async_flags = 0;
2174         cfs_spin_unlock(&oap->oap_lock);
2175         oap->oap_interrupted = 0;
2176
2177         if (oap->oap_cmd & OBD_BRW_WRITE) {
2178                 osc_process_ar(&cli->cl_ar, xid, rc);
2179                 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
2180         }
2181
2182         if (rc == 0 && oa != NULL) {
2183                 if (oa->o_valid & OBD_MD_FLBLOCKS)
2184                         oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2185                 if (oa->o_valid & OBD_MD_FLMTIME)
2186                         oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2187                 if (oa->o_valid & OBD_MD_FLATIME)
2188                         oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2189                 if (oa->o_valid & OBD_MD_FLCTIME)
2190                         oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2191         }
2192
2193         rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
2194                                                 oap->oap_cmd, oa, rc);
2195
2196         /* ll_ap_completion (from llite) drops PG_locked. so, a new
2197          * I/O on the page could start, but OSC calls it under lock
2198          * and thus we can add oap back to pending safely */
2199         if (rc)
2200                 /* upper layer wants to leave the page on pending queue */
2201                 osc_oap_to_pending(oap);
2202         else
2203                 osc_exit_cache(cli, oap, sent);
2204         EXIT;
2205 }
2206
2207 static int brw_interpret(const struct lu_env *env,
2208                          struct ptlrpc_request *req, void *data, int rc)
2209 {
2210         struct osc_brw_async_args *aa = data;
2211         struct client_obd *cli;
2212         int async;
2213         ENTRY;
2214
2215         rc = osc_brw_fini_request(req, rc);
2216         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2217         if (osc_recoverable_error(rc)) {
2218                 /* Only retry once for mmaped files since the mmaped page
2219                  * might be modified at anytime. We have to retry at least
2220                  * once in case there WAS really a corruption of the page
2221                  * on the network, that was not caused by mmap() modifying
2222                  * the page. Bug11742 */
2223                 if ((rc == -EAGAIN) && (aa->aa_resends > 0) &&
2224                     aa->aa_oa->o_valid & OBD_MD_FLFLAGS &&
2225                     aa->aa_oa->o_flags & OBD_FL_MMAP) {
2226                         rc = 0;
2227                 } else {
2228                         rc = osc_brw_redo_request(req, aa);
2229                         if (rc == 0)
2230                                 RETURN(0);
2231                 }
2232         }
2233
2234         if (aa->aa_ocapa) {
2235                 capa_put(aa->aa_ocapa);
2236                 aa->aa_ocapa = NULL;
2237         }
2238
2239         cli = aa->aa_cli;
2240
2241         client_obd_list_lock(&cli->cl_loi_list_lock);
2242
2243         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2244          * is called so we know whether to go to sync BRWs or wait for more
2245          * RPCs to complete */
2246         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2247                 cli->cl_w_in_flight--;
2248         else
2249                 cli->cl_r_in_flight--;
2250
2251         async = cfs_list_empty(&aa->aa_oaps);
2252         if (!async) { /* from osc_send_oap_rpc() */
2253                 struct osc_async_page *oap, *tmp;
2254                 /* the caller may re-use the oap after the completion call so
2255                  * we need to clean it up a little */
2256                 cfs_list_for_each_entry_safe(oap, tmp, &aa->aa_oaps,
2257                                              oap_rpc_item) {
2258                         cfs_list_del_init(&oap->oap_rpc_item);
2259                         osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
2260                 }
2261                 OBDO_FREE(aa->aa_oa);
2262         } else { /* from async_internal() */
2263                 obd_count i;
2264                 for (i = 0; i < aa->aa_page_count; i++)
2265                         osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
2266         }
2267         osc_wake_cache_waiters(cli);
2268         osc_check_rpcs(env, cli);
2269         client_obd_list_unlock(&cli->cl_loi_list_lock);
2270         if (!async)
2271                 cl_req_completion(env, aa->aa_clerq, rc);
2272         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2273
2274         RETURN(rc);
2275 }
2276
2277 static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
2278                                             struct client_obd *cli,
2279                                             cfs_list_t *rpc_list,
2280                                             int page_count, int cmd)
2281 {
2282         struct ptlrpc_request *req;
2283         struct brw_page **pga = NULL;
2284         struct osc_brw_async_args *aa;
2285         struct obdo *oa = NULL;
2286         const struct obd_async_page_ops *ops = NULL;
2287         void *caller_data = NULL;
2288         struct osc_async_page *oap;
2289         struct osc_async_page *tmp;
2290         struct ost_body *body;
2291         struct cl_req *clerq = NULL;
2292         enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2293         struct ldlm_lock *lock = NULL;
2294         struct cl_req_attr crattr;
2295         int i, rc, mpflag = 0;
2296
2297         ENTRY;
2298         LASSERT(!cfs_list_empty(rpc_list));
2299
2300         if (cmd & OBD_BRW_MEMALLOC)
2301                 mpflag = cfs_memory_pressure_get_and_set();
2302
2303         memset(&crattr, 0, sizeof crattr);
2304         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2305         if (pga == NULL)
2306                 GOTO(out, req = ERR_PTR(-ENOMEM));
2307
2308         OBDO_ALLOC(oa);
2309         if (oa == NULL)
2310                 GOTO(out, req = ERR_PTR(-ENOMEM));
2311
2312         i = 0;
2313         cfs_list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2314                 struct cl_page *page = osc_oap2cl_page(oap);
2315                 if (ops == NULL) {
2316                         ops = oap->oap_caller_ops;
2317                         caller_data = oap->oap_caller_data;
2318
2319                         clerq = cl_req_alloc(env, page, crt,
2320                                              1 /* only 1-object rpcs for
2321                                                 * now */);
2322                         if (IS_ERR(clerq))
2323                                 GOTO(out, req = (void *)clerq);
2324                         lock = oap->oap_ldlm_lock;
2325                 }
2326                 pga[i] = &oap->oap_brw_page;
2327                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2328                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2329                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2330                 i++;
2331                 cl_req_page_add(env, clerq, page);
2332         }
2333
2334         /* always get the data for the obdo for the rpc */
2335         LASSERT(ops != NULL);
2336         crattr.cra_oa = oa;
2337         crattr.cra_capa = NULL;
2338         cl_req_attr_set(env, clerq, &crattr, ~0ULL);
2339         if (lock) {
2340                 oa->o_handle = lock->l_remote_handle;
2341                 oa->o_valid |= OBD_MD_FLHANDLE;
2342         }
2343
2344         rc = cl_req_prep(env, clerq);
2345         if (rc != 0) {
2346                 CERROR("cl_req_prep failed: %d\n", rc);
2347                 GOTO(out, req = ERR_PTR(rc));
2348         }
2349
2350         sort_brw_pages(pga, page_count);
2351         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2352                                   pga, &req, crattr.cra_capa, 1, 0);
2353         if (rc != 0) {
2354                 CERROR("prep_req failed: %d\n", rc);
2355                 GOTO(out, req = ERR_PTR(rc));
2356         }
2357
2358         if (cmd & OBD_BRW_MEMALLOC)
2359                 req->rq_memalloc = 1;
2360
2361         /* Need to update the timestamps after the request is built in case
2362          * we race with setattr (locally or in queue at OST).  If OST gets
2363          * later setattr before earlier BRW (as determined by the request xid),
2364          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2365          * way to do this in a single call.  bug 10150 */
2366         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2367         cl_req_attr_set(env, clerq, &crattr,
2368                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2369
2370         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2371         aa = ptlrpc_req_async_args(req);
2372         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2373         cfs_list_splice(rpc_list, &aa->aa_oaps);
2374         CFS_INIT_LIST_HEAD(rpc_list);
2375         aa->aa_clerq = clerq;
2376 out:
2377         if (cmd & OBD_BRW_MEMALLOC)
2378                 cfs_memory_pressure_restore(mpflag);
2379
2380         capa_put(crattr.cra_capa);
2381         if (IS_ERR(req)) {
2382                 if (oa)
2383                         OBDO_FREE(oa);
2384                 if (pga)
2385                         OBD_FREE(pga, sizeof(*pga) * page_count);
2386                 /* this should happen rarely and is pretty bad, it makes the
2387                  * pending list not follow the dirty order */
2388                 client_obd_list_lock(&cli->cl_loi_list_lock);
2389                 cfs_list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
2390                         cfs_list_del_init(&oap->oap_rpc_item);
2391
2392                         /* queued sync pages can be torn down while the pages
2393                          * were between the pending list and the rpc */
2394                         if (oap->oap_interrupted) {
2395                                 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2396                                 osc_ap_completion(env, cli, NULL, oap, 0,
2397                                                   oap->oap_count);
2398                                 continue;
2399                         }
2400                         osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
2401                 }
2402                 if (clerq && !IS_ERR(clerq))
2403                         cl_req_completion(env, clerq, PTR_ERR(req));
2404         }
2405         RETURN(req);
2406 }
2407
2408 /**
2409  * Prepare pages for ASYNC io and put pages in send queue.
      *
      * Pages are taken from the head of \a lop's pending list (after the
      * urgent pages have been moved to its front) until one of the stop
      * conditions in the main loop is hit, then a single BRW request is
      * built from them and handed to ptlrpcd.
      *
      * Locking: cli->cl_loi_list_lock is released inside this function
      * without being taken first, so the caller must hold it on entry.  It
      * is dropped while the request is built and is held again on every
      * return path.
2410  *
      * \param env  environment passed to the page callbacks and cl_* calls
      * \param cli  client obd owning the queues and in-flight counters
      * \param loi  object whose list membership is refreshed via
      *             loi_list_maint()
2411  * \param cmd OBD_BRW_* macros
2412  * \param lop pending pages
2413  *
2414  * \return zero if no page added to send queue.
2415  * \return 1 if pages successfully added to send queue.
2416  * \return negative on errors.
2417  */
2418 static int
2419 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
2420                  struct lov_oinfo *loi,
2421                  int cmd, struct loi_oap_pages *lop)
2422 {
2423         struct ptlrpc_request *req;
2424         obd_count page_count = 0;
2425         struct osc_async_page *oap = NULL, *tmp;
2426         struct osc_brw_async_args *aa;
2427         const struct obd_async_page_ops *ops;
2428         CFS_LIST_HEAD(rpc_list);
2429         CFS_LIST_HEAD(tmp_list);
2430         unsigned int ending_offset;
2431         unsigned  starting_offset = 0;
2432         int srvlock = 0, mem_tight = 0;
2433         struct cl_object *clob = NULL;
2434         ENTRY;
2435
2436         /* ASYNC_HP pages first. At present, when the lock the pages is
2437          * to be canceled, the pages covered by the lock will be sent out
2438          * with ASYNC_HP. We have to send out them as soon as possible. */
             /* The walk is over the urgent list, but it is each page's
              * pending-list linkage that gets moved: HP pages go to the head
              * of tmp_list, other urgent pages to its tail. */
2439         cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_urgent, oap_urgent_item) {
2440                 if (oap->oap_async_flags & ASYNC_HP) 
2441                         cfs_list_move(&oap->oap_pending_item, &tmp_list);
2442                 else
2443                         cfs_list_move_tail(&oap->oap_pending_item, &tmp_list);
2444                 if (++page_count >= cli->cl_max_pages_per_rpc)
2445                         break;
2446         }
2447
             /* put the collected urgent pages back on the pending list
              * (presumably at its head, as with the Linux list_splice()) and
              * restart the count for the real selection pass below */
2448         cfs_list_splice(&tmp_list, &lop->lop_pending);
2449         page_count = 0;
2450
2451         /* first we find the pages we're allowed to work with */
2452         cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2453                                      oap_pending_item) {
2454                 ops = oap->oap_caller_ops;
2455
2456                 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
2457                          "magic 0x%x\n", oap, oap->oap_magic);
2458
2459                 if (clob == NULL) {
2460                         /* pin object in memory, so that completion call-backs
2461                          * can be safely called under client_obd_list lock. */
2462                         clob = osc_oap2cl_page(oap)->cp_obj;
2463                         cl_object_get(clob);
2464                 }
2465
                     /* all pages of one rpc must agree on OBD_BRW_SRVLOCK;
                      * srvlock was latched from the first page accepted */
2466                 if (page_count != 0 &&
2467                     srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2468                         CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2469                                " oap %p, page %p, srvlock %u\n",
2470                                oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2471                         break;
2472                 }
2473
2474                 /* If there is a gap at the start of this page, it can't merge
2475                  * with any previous page, so we'll hand the network a
2476                  * "fragmented" page array that it can't transfer in 1 RDMA */
2477                 if (page_count != 0 && oap->oap_page_off != 0)
2478                         break;
2479
2480                 /* in llite being 'ready' equates to the page being locked
2481                  * until completion unlocks it.  commit_write submits a page
2482                  * as not ready because its unlock will happen unconditionally
2483                  * as the call returns.  if we race with commit_write giving
2484                  * us that page we don't want to create a hole in the page
2485                  * stream, so we stop and leave the rpc to be fired by
2486                  * another dirtier or kupdated interval (the not ready page
2487                  * will still be on the dirty list).  we could call in
2488                  * at the end of ll_file_write to process the queue again. */
2489                 if (!(oap->oap_async_flags & ASYNC_READY)) {
2490                         int rc = ops->ap_make_ready(env, oap->oap_caller_data,
2491                                                     cmd);
2492                         if (rc < 0)
2493                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
2494                                                 "instead of ready\n", oap,
2495                                                 oap->oap_page, rc);
2496                         switch (rc) {
2497                         case -EAGAIN:
2498                                 /* llite is telling us that the page is still
2499                                  * in commit_write and that we should try
2500                                  * and put it in an rpc again later.  we
2501                                  * break out of the loop so we don't create
2502                                  * a hole in the sequence of pages in the rpc
2503                                  * stream.*/
2504                                 oap = NULL;
2505                                 break;
2506                         case -EINTR:
2507                                 /* the io isn't needed.. tell the checks
2508                                  * below to complete the rpc with EINTR */
2509                                 cfs_spin_lock(&oap->oap_lock);
2510                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2511                                 cfs_spin_unlock(&oap->oap_lock);
2512                                 oap->oap_count = -EINTR;
2513                                 break;
2514                         case 0:
2515                                 cfs_spin_lock(&oap->oap_lock);
2516                                 oap->oap_async_flags |= ASYNC_READY;
2517                                 cfs_spin_unlock(&oap->oap_lock);
2518                                 break;
2519                         default:
2520                                 LASSERTF(0, "oap %p page %p returned %d "
2521                                             "from make_ready\n", oap,
2522                                             oap->oap_page, rc);
2523                                 break;
2524                         }
2525                 }
                     /* oap was cleared by the -EAGAIN case above: stop here */
2526                 if (oap == NULL)
2527                         break;
2528                 /*
2529                  * Page submitted for IO has to be locked. Either by
2530                  * ->ap_make_ready() or by higher layers.
2531                  */
2532 #if defined(__KERNEL__) && defined(__linux__)
2533                 {
2534                         struct cl_page *page;
2535
2536                         page = osc_oap2cl_page(oap);
2537
2538                         if (page->cp_type == CPT_CACHEABLE &&
2539                             !(PageLocked(oap->oap_page) &&
2540                               (CheckWriteback(oap->oap_page, cmd)))) {
2541                                 CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2542                                        oap->oap_page,
2543                                        (long)oap->oap_page->flags,
2544                                        oap->oap_async_flags);
2545                                 LBUG();
2546                         }
2547                 }
2548 #endif
2549
2550                 /* take the page out of our book-keeping */
2551                 cfs_list_del_init(&oap->oap_pending_item);
2552                 lop_update_pending(cli, lop, cmd, -1);
2553                 cfs_list_del_init(&oap->oap_urgent_item);
2554
                     /* offset of the first page within a PTLRPC_MAX_BRW_SIZE
                      * window; only used for the offset histograms below */
2555                 if (page_count == 0)
2556                         starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2557                                           (PTLRPC_MAX_BRW_SIZE - 1);
2558
2559                 /* ask the caller for the size of the io as the rpc leaves. */
2560                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
2561                         oap->oap_count =
2562                                 ops->ap_refresh_count(env, oap->oap_caller_data,
2563                                                       cmd);
2564                         LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
2565                 }
                     /* a non-positive count (including the -EINTR stored
                      * above) means there is nothing to transfer: complete
                      * the page right away with that value */
2566                 if (oap->oap_count <= 0) {
2567                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2568                                oap->oap_count);
2569                         osc_ap_completion(env, cli, NULL,
2570                                           oap, 0, oap->oap_count);
2571                         continue;
2572                 }
2573
2574                 /* now put the page back in our accounting */
2575                 cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
2576                 if (oap->oap_brw_flags & OBD_BRW_MEMALLOC)
2577                         mem_tight = 1;
2578                 if (page_count == 0)
2579                         srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2580                 if (++page_count >= cli->cl_max_pages_per_rpc)
2581                         break;
2582
2583                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
2584                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2585                  * have the same alignment as the initial writes that allocated
2586                  * extents on the server. */
2587                 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2588                                  oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2589                 if (ending_offset == 0)
2590                         break;
2591
2592                 /* If there is a gap at the end of this page, it can't merge
2593                  * with any subsequent pages, so we'll hand the network a
2594                  * "fragmented" page array that it can't transfer in 1 RDMA */
2595                 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2596                         break;
2597         }
2598
2599         osc_wake_cache_waiters(cli);
2600
2601         loi_list_maint(cli, loi);
2602
             /* the request is built with cl_loi_list_lock dropped */
2603         client_obd_list_unlock(&cli->cl_loi_list_lock);
2604
2605         if (clob != NULL)
2606                 cl_object_put(env, clob);
2607
             /* nothing was selected: re-take the lock so that we return in
              * the same locking state we were entered with */
2608         if (page_count == 0) {
2609                 client_obd_list_lock(&cli->cl_loi_list_lock);
2610                 RETURN(0);
2611         }
2612
2613         req = osc_build_req(env, cli, &rpc_list, page_count,
2614                             mem_tight ? (cmd | OBD_BRW_MEMALLOC) : cmd);
             /* On failure osc_build_req() has taken cl_loi_list_lock in its
              * error path and completed every oap, so rpc_list is empty and
              * the lock is held again here. */
2615         if (IS_ERR(req)) {
2616                 LASSERT(cfs_list_empty(&rpc_list));
2617                 loi_list_maint(cli, loi);
2618                 RETURN(PTR_ERR(req));
2619         }
2620
2621         aa = ptlrpc_req_async_args(req);
2622
             /* statistics: pages per rpc, rpcs in flight and start offset */
2623         if (cmd == OBD_BRW_READ) {
2624                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2625                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2626                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2627                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2628         } else {
2629                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2630                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2631                                  cli->cl_w_in_flight);
2632                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2633                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2634         }
2635         ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2636
2637         client_obd_list_lock(&cli->cl_loi_list_lock);
2638
2639         if (cmd == OBD_BRW_READ)
2640                 cli->cl_r_in_flight++;
2641         else
2642                 cli->cl_w_in_flight++;
2643
2644         /* queued sync pages can be torn down while the pages
2645          * were between the pending list and the rpc */
             /* tmp is reused here to remember the first oap of the request */
2646         tmp = NULL;
2647         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2648                 /* only one oap gets a request reference */
2649                 if (tmp == NULL)
2650                         tmp = oap;
2651                 if (oap->oap_interrupted && !req->rq_intr) {
2652                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2653                                oap, req);
2654                         ptlrpc_mark_interrupted(req);
2655                 }
2656         }
2657         if (tmp != NULL)
2658                 tmp->oap_request = ptlrpc_request_addref(req);
2659
2660         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2661                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2662
             /* brw_interpret runs on reply; the request is handed to ptlrpcd */
2663         req->rq_interpret_reply = brw_interpret;
2664         ptlrpcd_add_req(req, PSCOPE_BRW);
2665         RETURN(1);
2666 }
2667
/*
 * Emit a D_INODE debug message describing the queue state of \a LOI:
 *   ready  - non-zero if the object is on the ready or the hp-ready list
 *   wr a:b - pending write pages : write urgent list is non-empty
 *   rd a:b - pending read pages  : read urgent list is non-empty
 * followed by the caller's own format \a STR and its arguments.  \a STR must
 * be a string literal (it is pasted onto the prefix) and at least one
 * argument has to follow it.
 */
#define LOI_DEBUG(LOI, STR, args...)                                     \
        CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
               !cfs_list_empty(&(LOI)->loi_ready_item) ||                \
               !cfs_list_empty(&(LOI)->loi_hp_ready_item),               \
               (LOI)->loi_write_lop.lop_num_pending,                     \
               !cfs_list_empty(&(LOI)->loi_write_lop.lop_urgent),        \
               (LOI)->loi_read_lop.lop_num_pending,                      \
               !cfs_list_empty(&(LOI)->loi_read_lop.lop_urgent),         \
               args)
2677
2678 /* This is called by osc_check_rpcs() to find which objects have pages that
2679  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2680 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2681 {
2682         ENTRY;
2683
2684         /* First return objects that have blocked locks so that they
2685          * will be flushed quickly and other clients can get the lock,
2686          * then objects which have pages ready to be stuffed into RPCs */
2687         if (!cfs_list_empty(&cli->cl_loi_hp_ready_list))
2688                 RETURN(cfs_list_entry(cli->cl_loi_hp_ready_list.next,
2689                                       struct lov_oinfo, loi_hp_ready_item));
2690         if (!cfs_list_empty(&cli->cl_loi_ready_list))
2691                 RETURN(cfs_list_entry(cli->cl_loi_ready_list.next,
2692                                       struct lov_oinfo, loi_ready_item));
2693
2694         /* then if we have cache waiters, return all objects with queued
2695          * writes.  This is especially important when many small files
2696          * have filled up the cache and not been fired into rpcs because
2697          * they don't pass the nr_pending/object threshhold */
2698         if (!cfs_list_empty(&cli->cl_cache_waiters) &&
2699             !cfs_list_empty(&cli->cl_loi_write_list))
2700                 RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2701                                       struct lov_oinfo, loi_write_item));
2702
2703         /* then return all queued objects when we have an invalid import
2704          * so that they get flushed */
2705         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2706                 if (!cfs_list_empty(&cli->cl_loi_write_list))
2707                         RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2708                                               struct lov_oinfo,
2709                                               loi_write_item));
2710                 if (!cfs_list_empty(&cli->cl_loi_read_list))
2711                         RETURN(cfs_list_entry(cli->cl_loi_read_list.next,
2712                                               struct lov_oinfo, loi_read_item));
2713         }
2714         RETURN(NULL);
2715 }
2716
2717 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2718 {
2719         struct osc_async_page *oap;
2720         int hprpc = 0;
2721
2722         if (!cfs_list_empty(&loi->loi_write_lop.lop_urgent)) {
2723                 oap = cfs_list_entry(loi->loi_write_lop.lop_urgent.next,
2724                                      struct osc_async_page, oap_urgent_item);
2725                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2726         }
2727
2728         if (!hprpc && !cfs_list_empty(&loi->loi_read_lop.lop_urgent)) {
2729                 oap = cfs_list_entry(loi->loi_read_lop.lop_urgent.next,
2730                                      struct osc_async_page, oap_urgent_item);
2731                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2732         }
2733
2734         return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
2735 }
2736
2737 /* called with the loi list lock held */
2738 void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
2739 {
2740         struct lov_oinfo *loi;
2741         int rc = 0, race_counter = 0;
2742         ENTRY;
2743
2744         while ((loi = osc_next_loi(cli)) != NULL) {
2745                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2746
2747                 if (osc_max_rpc_in_flight(cli, loi))
2748                         break;
2749
2750                 /* attempt some read/write balancing by alternating between
2751                  * reads and writes in an object.  The makes_rpc checks here
2752                  * would be redundant if we were getting read/write work items
2753                  * instead of objects.  we don't want send_oap_rpc to drain a
2754                  * partial read pending queue when we're given this object to
2755                  * do io on writes while there are cache waiters */
2756                 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2757                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
2758                                               &loi->loi_write_lop);
2759                         if (rc < 0) {
2760                                 CERROR("Write request failed with %d\n", rc);
2761
2762                                 /* osc_send_oap_rpc failed, mostly because of
2763                                  * memory pressure.
2764                                  *
2765                                  * It can't break here, because if:
2766                                  *  - a page was submitted by osc_io_submit, so
2767                                  *    page locked;
2768                                  *  - no request in flight
2769                                  *  - no subsequent request
2770                                  * The system will be in live-lock state,
2771                                  * because there is no chance to call
2772                                  * osc_io_unplug() and osc_check_rpcs() any
2773                                  * more. pdflush can't help in this case,
2774                                  * because it might be blocked at grabbing
2775                                  * the page lock as we mentioned.
2776                                  *
2777                                  * Anyway, continue to drain pages. */
2778                                 /* break; */
2779                         }
2780
2781                         if (rc > 0)
2782                                 race_counter = 0;
2783                         else
2784                                 race_counter++;
2785                 }
2786                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2787                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
2788                                               &loi->loi_read_lop);
2789                         if (rc < 0)
2790                                 CERROR("Read request failed with %d\n", rc);
2791
2792                         if (rc > 0)
2793                                 race_counter = 0;
2794                         else
2795                                 race_counter++;
2796                 }
2797
2798                 /* attempt some inter-object balancing by issuing rpcs
2799                  * for each object in turn */
2800                 if (!cfs_list_empty(&loi->loi_hp_ready_item))
2801                         cfs_list_del_init(&loi->loi_hp_ready_item);
2802                 if (!cfs_list_empty(&loi->loi_ready_item))
2803                         cfs_list_del_init(&loi->loi_ready_item);
2804                 if (!cfs_list_empty(&loi->loi_write_item))
2805                         cfs_list_del_init(&loi->loi_write_item);
2806                 if (!cfs_list_empty(&loi->loi_read_item))
2807                         cfs_list_del_init(&loi->loi_read_item);
2808
2809                 loi_list_maint(cli, loi);
2810
2811                 /* send_oap_rpc fails with 0 when make_ready tells it to
2812                  * back off.  llite's make_ready does this when it tries
2813                  * to lock a page queued for write that is already locked.
2814                  * we want to try sending rpcs from many objects, but we
2815                  * don't want to spin failing with 0.  */
2816                 if (race_counter == 10)
2817                         break;
2818         }
2819         EXIT;
2820 }
2821
2822 /* we're trying to queue a page in the osc so we're subject to the
2823  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2824  * If the osc's queued pages are already at that limit, then we want to sleep
2825  * until there is space in the osc's queue for us.  We also may be waiting for
2826  * write credits from the OST if there are RPCs in flight that may return some
2827  * before we fall back to sync writes.
2828  *
2829  * We need this know our allocation was granted in the presence of signals */
2830 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2831 {
2832         int rc;
2833         ENTRY;
2834         client_obd_list_lock(&cli->cl_loi_list_lock);
2835         rc = cfs_list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2836         client_obd_list_unlock(&cli->cl_loi_list_lock);
2837         RETURN(rc);
2838 };
2839
2840 /**
2841  * Non-blocking version of osc_enter_cache() that consumes grant only when it
2842  * is available.
2843  */
2844 int osc_enter_cache_try(const struct lu_env *env,
2845                         struct client_obd *cli, struct lov_oinfo *loi,
2846                         struct osc_async_page *oap, int transient)
2847 {
2848         int has_grant;
2849
2850         has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
2851         if (has_grant) {
2852                 osc_consume_write_grant(cli, &oap->oap_brw_page);
2853                 if (transient) {
2854                         cli->cl_dirty_transit += CFS_PAGE_SIZE;
2855                         cfs_atomic_inc(&obd_dirty_transit_pages);
2856                         oap->oap_brw_flags |= OBD_BRW_NOCACHE;
2857                 }
2858         }
2859         return has_grant;
2860 }
2861
2862 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2863  * grant or cache space. */
2864 static int osc_enter_cache(const struct lu_env *env,
2865                            struct client_obd *cli, struct lov_oinfo *loi,
2866                            struct osc_async_page *oap)
2867 {
2868         struct osc_cache_waiter ocw;
2869         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2870
2871         ENTRY;
2872
2873         CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2874                "grant: %lu\n", cli->cl_dirty, cfs_atomic_read(&obd_dirty_pages),
2875                cli->cl_dirty_max, obd_max_dirty_pages,
2876                cli->cl_lost_grant, cli->cl_avail_grant);
2877
2878         /* force the caller to try sync io.  this can jump the list
2879          * of queued writes and create a discontiguous rpc stream */
2880         if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2881             loi->loi_ar.ar_force_sync)
2882                 RETURN(-EDQUOT);
2883
2884         /* Hopefully normal case - cache space and write credits available */
2885         if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
2886             cfs_atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
2887             osc_enter_cache_try(env, cli, loi, oap, 0))
2888                 RETURN(0);
2889
2890         /* It is safe to block as a cache waiter as long as there is grant
2891          * space available or the hope of additional grant being returned
2892          * when an in flight write completes.  Using the write back cache
2893          * if possible is preferable to sending the data synchronously
2894          * because write pages can then be merged in to large requests.
2895          * The addition of this cache waiter will causing pending write
2896          * pages to be sent immediately. */
2897         if (cli->cl_w_in_flight || cli->cl_avail_grant >= CFS_PAGE_SIZE) {
2898                 cfs_list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2899                 cfs_waitq_init(&ocw.ocw_waitq);
2900                 ocw.ocw_oap = oap;
2901                 ocw.ocw_rc = 0;
2902
2903                 loi_list_maint(cli, loi);
2904                 osc_check_rpcs(env, cli);
2905                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2906
2907                 CDEBUG(D_CACHE, "sleeping for cache space\n");
2908                 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2909
2910                 client_obd_list_lock(&cli->cl_loi_list_lock);
2911                 if (!cfs_list_empty(&ocw.ocw_entry)) {
2912                         cfs_list_del(&ocw.ocw_entry);
2913                         RETURN(-EINTR);
2914                 }
2915                 RETURN(ocw.ocw_rc);
2916         }
2917
2918         RETURN(-EDQUOT);
2919 }
2920
2921
2922 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2923                         struct lov_oinfo *loi, cfs_page_t *page,
2924                         obd_off offset, const struct obd_async_page_ops *ops,
2925                         void *data, void **res, int nocache,
2926                         struct lustre_handle *lockh)
2927 {
2928         struct osc_async_page *oap;
2929
2930         ENTRY;
2931
2932         if (!page)
2933                 return cfs_size_round(sizeof(*oap));
2934
2935         oap = *res;
2936         oap->oap_magic = OAP_MAGIC;
2937         oap->oap_cli = &exp->exp_obd->u.cli;
2938         oap->oap_loi = loi;
2939
2940         oap->oap_caller_ops = ops;
2941         oap->oap_caller_data = data;
2942
2943         oap->oap_page = page;
2944         oap->oap_obj_off = offset;
2945         if (!client_is_remote(exp) &&
2946             cfs_capable(CFS_CAP_SYS_RESOURCE))
2947                 oap->oap_brw_flags = OBD_BRW_NOQUOTA;
2948
2949         LASSERT(!(offset & ~CFS_PAGE_MASK));
2950
2951         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2952         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2953         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2954         CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2955
2956         cfs_spin_lock_init(&oap->oap_lock);
2957         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2958         RETURN(0);
2959 }
2960
2961 int osc_queue_async_io(const struct lu_env *env, struct obd_export *exp,
2962                        struct lov_stripe_md *lsm, struct lov_oinfo *loi,
2963                        struct osc_async_page *oap, int cmd, obd_off off,
2964                        int count, obd_flag brw_flags,
2965                        enum async_flags async_flags)
2966 {
2967         struct client_obd *cli = &exp->exp_obd->u.cli;
2968         int rc = 0;
2969         ENTRY;
2970
2971         if (oap->oap_magic != OAP_MAGIC)
2972                 RETURN(-EINVAL);
2973
2974         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2975                 RETURN(-EIO);
2976
2977         if (!cfs_list_empty(&oap->oap_pending_item) ||
2978             !cfs_list_empty(&oap->oap_urgent_item) ||
2979             !cfs_list_empty(&oap->oap_rpc_item))
2980                 RETURN(-EBUSY);
2981
2982         /* check if the file's owner/group is over quota */
2983         if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
2984                 struct cl_object *obj;
2985                 struct cl_attr    attr; /* XXX put attr into thread info */
2986                 unsigned int qid[MAXQUOTAS];
2987
2988                 obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
2989
2990                 cl_object_attr_lock(obj);
2991                 rc = cl_object_attr_get(env, obj, &attr);
2992                 cl_object_attr_unlock(obj);
2993
2994                 qid[USRQUOTA] = attr.cat_uid;
2995                 qid[GRPQUOTA] = attr.cat_gid;
2996                 if (rc == 0 &&
2997                     lquota_chkdq(quota_interface, cli, qid) == NO_QUOTA)
2998                         rc = -EDQUOT;
2999                 if (rc)
3000                         RETURN(rc);
3001         }
3002
3003         if (loi == NULL)
3004                 loi = lsm->lsm_oinfo[0];
3005
3006         client_obd_list_lock(&cli->cl_loi_list_lock);
3007
3008         LASSERT(off + count <= CFS_PAGE_SIZE);
3009         oap->oap_cmd = cmd;
3010         oap->oap_page_off = off;
3011         oap->oap_count = count;
3012         oap->oap_brw_flags = brw_flags;
3013         /* Give a hint to OST that requests are coming from kswapd - bug19529 */
3014         if (cfs_memory_pressure_get())
3015                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
3016         cfs_spin_lock(&oap->oap_lock);
3017         oap->oap_async_flags = async_flags;
3018         cfs_spin_unlock(&oap->oap_lock);
3019
3020         if (cmd & OBD_BRW_WRITE) {
3021                 rc = osc_enter_cache(env, cli, loi, oap);
3022                 if (rc) {
3023                         client_obd_list_unlock(&cli->cl_loi_list_lock);
3024                         RETURN(rc);
3025                 }
3026         }
3027
3028         osc_oap_to_pending(oap);
3029         loi_list_maint(cli, loi);
3030
3031         LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
3032                   cmd);
3033
3034         osc_check_rpcs(env, cli);
3035         client_obd_list_unlock(&cli->cl_loi_list_lock);
3036
3037         RETURN(0);
3038 }
3039
/* True when \a flag is clear in \a was but set in \a now; aka
 * (~was & now & flag), but this is more clear :)
 * Every argument is parenthesised so compound expressions such as
 * (A | B) are evaluated as a unit. */
#define SETTING(was, now, flag) (!((was) & (flag)) && ((now) & (flag)))
3042
3043 int osc_set_async_flags_base(struct client_obd *cli,
3044                              struct lov_oinfo *loi, struct osc_async_page *oap,
3045                              obd_flag async_flags)
3046 {
3047         struct loi_oap_pages *lop;
3048         int flags = 0;
3049         ENTRY;
3050
3051         LASSERT(!cfs_list_empty(&oap->oap_pending_item));
3052
3053         if (oap->oap_cmd & OBD_BRW_WRITE) {
3054                 lop = &loi->loi_write_lop;
3055         } else {
3056                 lop = &loi->loi_read_lop;
3057         }
3058
3059         if ((oap->oap_async_flags & async_flags) == async_flags)
3060                 RETURN(0);
3061
3062         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
3063                 flags |= ASYNC_READY;
3064
3065         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
3066             cfs_list_empty(&oap->oap_rpc_item)) {
3067                 if (oap->oap_async_flags & ASYNC_HP)
3068                         cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
3069                 else
3070                         cfs_list_add_tail(&oap->oap_urgent_item,
3071                                           &lop->lop_urgent);
3072                 flags |= ASYNC_URGENT;
3073                 loi_list_maint(cli, loi);
3074         }
3075         cfs_spin_lock(&oap->oap_lock);
3076         oap->oap_async_flags |= flags;
3077         cfs_spin_unlock(&oap->oap_lock);
3078
3079         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
3080                         oap->oap_async_flags);
3081         RETURN(0);
3082 }
3083
3084 int osc_teardown_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
3085                             struct lov_oinfo *loi, struct osc_async_page *oap)
3086 {
3087         struct client_obd *cli = &exp->exp_obd->u.cli;
3088         struct loi_oap_pages *lop;
3089         int rc = 0;
3090         ENTRY;
3091
3092         if (oap->oap_magic != OAP_MAGIC)
3093                 RETURN(-EINVAL);
3094
3095         if (loi == NULL)
3096                 loi = lsm->lsm_oinfo[0];
3097
3098         if (oap->oap_cmd & OBD_BRW_WRITE) {
3099                 lop = &loi->loi_write_lop;
3100         } else {
3101                 lop = &loi->loi_read_lop;
3102         }
3103
3104         client_obd_list_lock(&cli->cl_loi_list_lock);
3105
3106         if (!cfs_list_empty(&oap->oap_rpc_item))
3107                 GOTO(out, rc = -EBUSY);
3108
3109         osc_exit_cache(cli, oap, 0);
3110         osc_wake_cache_waiters(cli);
3111
3112         if (!cfs_list_empty(&oap->oap_urgent_item)) {
3113                 cfs_list_del_init(&oap->oap_urgent_item);
3114                 cfs_spin_lock(&oap->oap_lock);
3115                 oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
3116                 cfs_spin_unlock(&oap->oap_lock);
3117         }
3118         if (!cfs_list_empty(&oap->oap_pending_item)) {
3119                 cfs_list_del_init(&oap->oap_pending_item);
3120                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
3121         }
3122         loi_list_maint(cli, loi);
3123         LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
3124 out:
3125         client_obd_list_unlock(&cli->cl_loi_list_lock);
3126         RETURN(rc);
3127 }
3128
3129 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
3130                                         struct ldlm_enqueue_info *einfo)
3131 {
3132         void *data = einfo->ei_cbdata;
3133         int set = 0;
3134
3135         LASSERT(lock != NULL);
3136         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
3137         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
3138         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
3139         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
3140
3141         lock_res_and_lock(lock);
3142         cfs_spin_lock(&osc_ast_guard);
3143
3144         if (lock->l_ast_data == NULL)
3145                 lock->l_ast_data = data;
3146         if (lock->l_ast_data == data)
3147                 set = 1;
3148
3149         cfs_spin_unlock(&osc_ast_guard);
3150         unlock_res_and_lock(lock);
3151
3152         return set;
3153 }
3154
3155 static int osc_set_data_with_check(struct lustre_handle *lockh,
3156                                    struct ldlm_enqueue_info *einfo)
3157 {
3158         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
3159         int set = 0;
3160
3161         if (lock != NULL) {
3162                 set = osc_set_lock_data_with_check(lock, einfo);
3163                 LDLM_LOCK_PUT(lock);
3164         } else
3165                 CERROR("lockh %p, data %p - client evicted?\n",
3166                        lockh, einfo->ei_cbdata);
3167         return set;
3168 }
3169
3170 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3171                              ldlm_iterator_t replace, void *data)
3172 {
3173         struct ldlm_res_id res_id;
3174         struct obd_device *obd = class_exp2obd(exp);
3175
3176         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
3177         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3178         return 0;
3179 }
3180
3181 /* find any ldlm lock of the inode in osc
3182  * return 0    not find
3183  *        1    find one
3184  *      < 0    error */
3185 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3186                            ldlm_iterator_t replace, void *data)
3187 {
3188         struct ldlm_res_id res_id;
3189         struct obd_device *obd = class_exp2obd(exp);
3190         int rc = 0;
3191
3192         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
3193         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3194         if (rc == LDLM_ITER_STOP)
3195                 return(1);
3196         if (rc == LDLM_ITER_CONTINUE)
3197                 return(0);
3198         return(rc);
3199 }
3200
3201 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
3202                             obd_enqueue_update_f upcall, void *cookie,
3203                             int *flags, int rc)
3204 {
3205         int intent = *flags & LDLM_FL_HAS_INTENT;
3206         ENTRY;
3207
3208         if (intent) {
3209                 /* The request was created before ldlm_cli_enqueue call. */
3210                 if (rc == ELDLM_LOCK_ABORTED) {
3211                         struct ldlm_reply *rep;
3212                         rep = req_capsule_server_get(&req->rq_pill,
3213                                                      &RMF_DLM_REP);
3214
3215                         LASSERT(rep != NULL);
3216                         if (rep->lock_policy_res1)
3217                                 rc = rep->lock_policy_res1;
3218                 }
3219         }
3220
3221         if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
3222                 *flags |= LDLM_FL_LVB_READY;
3223                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
3224                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
3225         }
3226
3227         /* Call the update callback. */
3228         rc = (*upcall)(cookie, rc);
3229         RETURN(rc);
3230 }
3231
3232 static int osc_enqueue_interpret(const struct lu_env *env,
3233                                  struct ptlrpc_request *req,
3234                                  struct osc_enqueue_args *aa, int rc)
3235 {
3236         struct ldlm_lock *lock;
3237         struct lustre_handle handle;
3238         __u32 mode;
3239
3240         /* Make a local copy of a lock handle and a mode, because aa->oa_*
3241          * might be freed anytime after lock upcall has been called. */
3242         lustre_handle_copy(&handle, aa->oa_lockh);
3243         mode = aa->oa_ei->ei_mode;
3244
3245         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
3246          * be valid. */
3247         lock = ldlm_handle2lock(&handle);
3248
3249         /* Take an additional reference so that a blocking AST that
3250          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
3251          * to arrive after an upcall has been executed by
3252          * osc_enqueue_fini(). */
3253         ldlm_lock_addref(&handle, mode);
3254
3255         /* Let CP AST to grant the lock first. */
3256         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
3257
3258         /* Complete obtaining the lock procedure. */
3259         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
3260                                    mode, aa->oa_flags, aa->oa_lvb,
3261                                    sizeof(*aa->oa_lvb), &handle, rc);
3262         /* Complete osc stuff. */
3263         rc = osc_enqueue_fini(req, aa->oa_lvb,
3264                               aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc);
3265
3266         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
3267
3268         /* Release the lock for async request. */
3269         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
3270                 /*
3271                  * Releases a reference taken by ldlm_cli_enqueue(), if it is
3272                  * not already released by
3273                  * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
3274                  */
3275                 ldlm_lock_decref(&handle, mode);
3276
3277         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
3278                  aa->oa_lockh, req, aa);
3279         ldlm_lock_decref(&handle, mode);
3280         LDLM_LOCK_PUT(lock);
3281         return rc;
3282 }
3283
3284 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
3285                         struct lov_oinfo *loi, int flags,
3286                         struct ost_lvb *lvb, __u32 mode, int rc)
3287 {
3288         if (rc == ELDLM_OK) {
3289                 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
3290                 __u64 tmp;
3291
3292                 LASSERT(lock != NULL);
3293                 loi->loi_lvb = *lvb;
3294                 tmp = loi->loi_lvb.lvb_size;
3295                 /* Extend KMS up to the end of this lock and no further
3296                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
3297                 if (tmp > lock->l_policy_data.l_extent.end)
3298                         tmp = lock->l_policy_data.l_extent.end + 1;
3299                 if (tmp >= loi->loi_kms) {
3300                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
3301                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
3302                         loi_kms_set(loi, tmp);
3303                 } else {
3304                         LDLM_DEBUG(lock, "lock acquired, setting rss="
3305                                    LPU64"; leaving kms="LPU64", end="LPU64,
3306                                    loi->loi_lvb.lvb_size, loi->loi_kms,
3307                                    lock->l_policy_data.l_extent.end);
3308                 }
3309                 ldlm_lock_allow_match(lock);
3310                 LDLM_LOCK_PUT(lock);
3311         } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
3312                 loi->loi_lvb = *lvb;
3313                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
3314                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
3315                 rc = ELDLM_OK;
3316         }
3317 }
3318 EXPORT_SYMBOL(osc_update_enqueue);
3319
/* Sentinel request-set pointer, never dereferenced in this file:
 * osc_enqueue_base() compares its rqset argument against this value and,
 * on a match, hands the request to ptlrpcd_add_req() instead of adding it
 * to a caller-supplied set. */
struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
3321
/* When enqueuing asynchronously, locks are not ordered, so we can obtain a
 * lock from the 2nd OSC before a lock from the 1st one. This does not deadlock
 * with other synchronous requests; however, holding some locks while trying to
 * obtain others may take a considerable amount of time in case of an OST
 * failure, and when other sync requests do not get a lock released by a
 * client, that client is excluded from the cluster. Such scenarios make life
 * difficult, so release locks just after they are obtained. */
3329 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3330                      int *flags, ldlm_policy_data_t *policy,
3331                      struct ost_lvb *lvb, int kms_valid,
3332                      obd_enqueue_update_f upcall, void *cookie,
3333                      struct ldlm_enqueue_info *einfo,
3334                      struct lustre_handle *lockh,
3335                      struct ptlrpc_request_set *rqset, int async)
3336 {
3337         struct obd_device *obd = exp->exp_obd;
3338         struct ptlrpc_request *req = NULL;
3339         int intent = *flags & LDLM_FL_HAS_INTENT;
3340         ldlm_mode_t mode;
3341         int rc;
3342         ENTRY;
3343
3344         /* Filesystem lock extents are extended to page boundaries so that
3345          * dealing with the page cache is a little smoother.  */
3346         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3347         policy->l_extent.end |= ~CFS_PAGE_MASK;
3348
3349         /*
3350          * kms is not valid when either object is completely fresh (so that no
3351          * locks are cached), or object was evicted. In the latter case cached
3352          * lock cannot be used, because it would prime inode state with
3353          * potentially stale LVB.
3354          */
3355         if (!kms_valid)
3356                 goto no_match;
3357
3358         /* Next, search for already existing extent locks that will cover us */
3359         /* If we're trying to read, we also search for an existing PW lock.  The
3360          * VFS and page cache already protect us locally, so lots of readers/
3361          * writers can share a single PW lock.
3362          *
3363          * There are problems with conversion deadlocks, so instead of
3364          * converting a read lock to a write lock, we'll just enqueue a new
3365          * one.
3366          *
3367          * At some point we should cancel the read lock instead of making them
3368          * send us a blocking callback, but there are problems with canceling
3369          * locks out from other users right now, too. */
3370         mode = einfo->ei_mode;
3371         if (einfo->ei_mode == LCK_PR)
3372                 mode |= LCK_PW;
3373         mode = ldlm_lock_match(obd->obd_namespace,
3374                                *flags | LDLM_FL_LVB_READY, res_id,
3375                                einfo->ei_type, policy, mode, lockh, 0);
3376         if (mode) {
3377                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
3378
3379                 if (osc_set_lock_data_with_check(matched, einfo)) {
3380                         /* addref the lock only if not async requests and PW
3381                          * lock is matched whereas we asked for PR. */
3382                         if (!rqset && einfo->ei_mode != mode)
3383                                 ldlm_lock_addref(lockh, LCK_PR);
3384                         if (intent) {
3385                                 /* I would like to be able to ASSERT here that
3386                                  * rss <= kms, but I can't, for reasons which
3387                                  * are explained in lov_enqueue() */
3388                         }
3389
3390                         /* We already have a lock, and it's referenced */
3391                         (*upcall)(cookie, ELDLM_OK);
3392
3393                         /* For async requests, decref the lock. */
3394                         if (einfo->ei_mode != mode)
3395                                 ldlm_lock_decref(lockh, LCK_PW);
3396                         else if (rqset)
3397                                 ldlm_lock_decref(lockh, einfo->ei_mode);
3398                         LDLM_LOCK_PUT(matched);
3399                         RETURN(ELDLM_OK);
3400                 } else
3401                         ldlm_lock_decref(lockh, mode);
3402                 LDLM_LOCK_PUT(matched);
3403         }
3404
3405  no_match:
3406         if (intent) {
3407                 CFS_LIST_HEAD(cancels);
3408                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3409                                            &RQF_LDLM_ENQUEUE_LVB);
3410                 if (req == NULL)
3411                         RETURN(-ENOMEM);
3412
3413                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3414                 if (rc) {
3415                         ptlrpc_request_free(req);
3416                         RETURN(rc);
3417                 }
3418
3419                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3420                                      sizeof *lvb);
3421                 ptlrpc_request_set_replen(req);
3422         }
3423
3424         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3425         *flags &= ~LDLM_FL_BLOCK_GRANTED;
3426
3427         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
3428                               sizeof(*lvb), lockh, async);
3429         if (rqset) {
3430                 if (!rc) {
3431                         struct osc_enqueue_args *aa;
3432                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3433                         aa = ptlrpc_req_async_args(req);
3434                         aa->oa_ei = einfo;
3435                         aa->oa_exp = exp;
3436                         aa->oa_flags  = flags;
3437                         aa->oa_upcall = upcall;
3438                         aa->oa_cookie = cookie;
3439                         aa->oa_lvb    = lvb;
3440                         aa->oa_lockh  = lockh;
3441
3442                         req->rq_interpret_reply =
3443                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
3444                         if (rqset == PTLRPCD_SET)
3445                                 ptlrpcd_add_req(req, PSCOPE_OTHER);
3446                         else
3447                                 ptlrpc_set_add_req(rqset, req);
3448                 } else if (intent) {
3449                         ptlrpc_req_finished(req);
3450                 }
3451                 RETURN(rc);
3452         }
3453
3454         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, rc);
3455         if (intent)
3456                 ptlrpc_req_finished(req);
3457
3458         RETURN(rc);
3459 }
3460
3461 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3462                        struct ldlm_enqueue_info *einfo,
3463                        struct ptlrpc_request_set *rqset)
3464 {
3465         struct ldlm_res_id res_id;
3466         int rc;
3467         ENTRY;
3468
3469         osc_build_res_name(oinfo->oi_md->lsm_object_id,
3470                            oinfo->oi_md->lsm_object_seq, &res_id);
3471
3472         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
3473                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3474                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
3475                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
3476                               rqset, rqset != NULL);
3477         RETURN(rc);
3478 }
3479
3480 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3481                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3482                    int *flags, void *data, struct lustre_handle *lockh,
3483                    int unref)
3484 {
3485         struct obd_device *obd = exp->exp_obd;
3486         int lflags = *flags;
3487         ldlm_mode_t rc;
3488         ENTRY;
3489
3490         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3491                 RETURN(-EIO);
3492
3493         /* Filesystem lock extents are extended to page boundaries so that
3494          * dealing with the page cache is a little smoother */
3495         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3496         policy->l_extent.end |= ~CFS_PAGE_MASK;
3497
3498         /* Next, search for already existing extent locks that will cover us */
3499         /* If we're trying to read, we also search for an existing PW lock.  The
3500          * VFS and page cache already protect us locally, so lots of readers/
3501          * writers can share a single PW lock. */
3502         rc = mode;
3503         if (mode == LCK_PR)
3504                 rc |= LCK_PW;
3505         rc = ldlm_lock_match(obd->obd_namespace, lflags,
3506                              res_id, type, policy, rc, lockh, unref);
3507         if (rc) {
3508                 if (data != NULL) {
3509                         if (!osc_set_data_with_check(lockh, data)) {
3510                                 if (!(lflags & LDLM_FL_TEST_LOCK))
3511                                         ldlm_lock_decref(lockh, rc);
3512                                 RETURN(0);
3513                         }
3514                 }
3515                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3516                         ldlm_lock_addref(lockh, LCK_PR);
3517                         ldlm_lock_decref(lockh, LCK_PW);
3518                 }
3519                 RETURN(rc);
3520         }
3521         RETURN(rc);
3522 }
3523
3524 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
3525 {
3526         ENTRY;
3527
3528         if (unlikely(mode == LCK_GROUP))
3529                 ldlm_lock_decref_and_cancel(lockh, mode);
3530         else
3531                 ldlm_lock_decref(lockh, mode);
3532
3533         RETURN(0);
3534 }
3535
3536 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3537                       __u32 mode, struct lustre_handle *lockh)
3538 {
3539         ENTRY;
3540         RETURN(osc_cancel_base(lockh, mode));
3541 }
3542
3543 static int osc_cancel_unused(struct obd_export *exp,
3544                              struct lov_stripe_md *lsm,
3545                              ldlm_cancel_flags_t flags,
3546                              void *opaque)
3547 {
3548         struct obd_device *obd = class_exp2obd(exp);
3549         struct ldlm_res_id res_id, *resp = NULL;
3550
3551         if (lsm != NULL) {
3552                 resp = osc_build_res_name(lsm->lsm_object_id,
3553                                           lsm->lsm_object_seq, &res_id);
3554         }
3555
3556         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3557 }
3558
3559 static int osc_statfs_interpret(const struct lu_env *env,
3560                                 struct ptlrpc_request *req,
3561                                 struct osc_async_args *aa, int rc)
3562 {
3563         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
3564         struct obd_statfs *msfs;
3565         __u64 used;
3566         ENTRY;
3567
3568         if (rc == -EBADR)
3569                 /* The request has in fact never been sent
3570                  * due to issues at a higher level (LOV).
3571                  * Exit immediately since the caller is
3572                  * aware of the problem and takes care
3573                  * of the clean up */
3574                  RETURN(rc);
3575
3576         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
3577             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
3578                 GOTO(out, rc = 0);
3579
3580         if (rc != 0)
3581                 GOTO(out, rc);
3582
3583         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3584         if (msfs == NULL) {
3585                 GOTO(out, rc = -EPROTO);
3586         }
3587
3588         /* Reinitialize the RDONLY and DEGRADED flags at the client
3589          * on each statfs, so they don't stay set permanently. */
3590         cfs_spin_lock(&cli->cl_oscc.oscc_lock);
3591
3592         if (unlikely(msfs->os_state & OS_STATE_DEGRADED))
3593                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED;
3594         else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_DEGRADED))
3595                 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_DEGRADED;
3596
3597         if (unlikely(msfs->os_state & OS_STATE_READONLY))
3598                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY;
3599         else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_RDONLY))
3600                 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_RDONLY;
3601
3602         /* Add a bit of hysteresis so this flag isn't continually flapping,
3603          * and ensure that new files don't get extremely fragmented due to
3604          * only a small amount of available space in the filesystem.
3605          * We want to set the NOSPC flag when there is less than ~0.1% free
3606          * and clear it when there is at least ~0.2% free space, so:
3607          *                   avail < ~0.1% max          max = avail + used
3608          *            1025 * avail < avail + used       used = blocks - free
3609          *            1024 * avail < used
3610          *            1024 * avail < blocks - free                      
3611          *                   avail < ((blocks - free) >> 10)    
3612          *
3613          * On very large disk, say 16TB 0.1% will be 16 GB. We don't want to
3614          * lose that amount of space so in those cases we report no space left
3615          * if their is less than 1 GB left.                             */
3616         used = min_t(__u64,(msfs->os_blocks - msfs->os_bfree) >> 10, 1 << 30);
3617         if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) == 0) &&
3618                      ((msfs->os_ffree < 32) || (msfs->os_bavail < used))))
3619                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC;
3620         else if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
3621                 (msfs->os_ffree > 64) && (msfs->os_bavail > (used << 1))))
3622                         cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_NOSPC;
3623
3624         cfs_spin_unlock(&cli->cl_oscc.oscc_lock);
3625
3626         *aa->aa_oi->oi_osfs = *msfs;
3627 out:
3628         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3629         RETURN(rc);
3630 }
3631
3632 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3633                             __u64 max_age, struct ptlrpc_request_set *rqset)
3634 {
3635         struct ptlrpc_request *req;
3636         struct osc_async_args *aa;
3637         int                    rc;
3638         ENTRY;
3639
3640         /* We could possibly pass max_age in the request (as an absolute
3641          * timestamp or a "seconds.usec ago") so the target can avoid doing
3642          * extra calls into the filesystem if that isn't necessary (e.g.
3643          * during mount that would help a bit).  Having relative timestamps
3644          * is not so great if request processing is slow, while absolute
3645          * timestamps are not ideal because they need time synchronization. */
3646         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3647         if (req == NULL)
3648                 RETURN(-ENOMEM);
3649
3650         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3651         if (rc) {
3652                 ptlrpc_request_free(req);
3653                 RETURN(rc);
3654         }
3655         ptlrpc_request_set_replen(req);
3656         req->rq_request_portal = OST_CREATE_PORTAL;
3657         ptlrpc_at_set_req_timeout(req);
3658
3659         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3660                 /* procfs requests not want stat in wait for avoid deadlock */
3661                 req->rq_no_resend = 1;
3662                 req->rq_no_delay = 1;
3663         }
3664
3665         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
3666         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3667         aa = ptlrpc_req_async_args(req);
3668         aa->aa_oi = oinfo;
3669
3670         ptlrpc_set_add_req(rqset, req);
3671         RETURN(0);
3672 }
3673
3674 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3675                       __u64 max_age, __u32 flags)
3676 {
3677         struct obd_statfs     *msfs;
3678         struct ptlrpc_request *req;
3679         struct obd_import     *imp = NULL;
3680         int rc;
3681         ENTRY;
3682
3683         /*Since the request might also come from lprocfs, so we need
3684          *sync this with client_disconnect_export Bug15684*/
3685         cfs_down_read(&obd->u.cli.cl_sem);
3686         if (obd->u.cli.cl_import)
3687                 imp = class_import_get(obd->u.cli.cl_import);
3688         cfs_up_read(&obd->u.cli.cl_sem);
3689         if (!imp)
3690                 RETURN(-ENODEV);
3691
3692         /* We could possibly pass max_age in the request (as an absolute
3693          * timestamp or a "seconds.usec ago") so the target can avoid doing
3694          * extra calls into the filesystem if that isn't necessary (e.g.
3695          * during mount that would help a bit).  Having relative timestamps
3696          * is not so great if request processing is slow, while absolute
3697          * timestamps are not ideal because they need time synchronization. */
3698         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
3699
3700         class_import_put(imp);
3701
3702         if (req == NULL)
3703                 RETURN(-ENOMEM);
3704
3705         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3706         if (rc) {
3707                 ptlrpc_request_free(req);
3708                 RETURN(rc);
3709         }
3710         ptlrpc_request_set_replen(req);
3711         req->rq_request_portal = OST_CREATE_PORTAL;
3712         ptlrpc_at_set_req_timeout(req);
3713
3714         if (flags & OBD_STATFS_NODELAY) {
3715                 /* procfs requests not want stat in wait for avoid deadlock */
3716                 req->rq_no_resend = 1;
3717                 req->rq_no_delay = 1;
3718         }
3719
3720         rc = ptlrpc_queue_wait(req);
3721         if (rc)
3722                 GOTO(out, rc);
3723
3724         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3725         if (msfs == NULL) {
3726                 GOTO(out, rc = -EPROTO);
3727         }
3728
3729         *osfs = *msfs;
3730
3731         EXIT;
3732  out:
3733         ptlrpc_req_finished(req);
3734         return rc;
3735 }
3736
3737 /* Retrieve object striping information.
3738  *
3739  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
3740  * the maximum number of OST indices which will fit in the user buffer.
3741  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
3742  */
3743 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3744 {
3745         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
3746         struct lov_user_md_v3 lum, *lumk;
3747         struct lov_user_ost_data_v1 *lmm_objects;
3748         int rc = 0, lum_size;
3749         ENTRY;
3750
3751         if (!lsm)
3752                 RETURN(-ENODATA);
3753
3754         /* we only need the header part from user space to get lmm_magic and
3755          * lmm_stripe_count, (the header part is common to v1 and v3) */
3756         lum_size = sizeof(struct lov_user_md_v1);
3757         if (cfs_copy_from_user(&lum, lump, lum_size))
3758                 RETURN(-EFAULT);
3759
3760         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
3761             (lum.lmm_magic != LOV_USER_MAGIC_V3))
3762                 RETURN(-EINVAL);
3763
3764         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
3765         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
3766         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
3767         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
3768
3769         /* we can use lov_mds_md_size() to compute lum_size
3770          * because lov_user_md_vX and lov_mds_md_vX have the same size */
3771         if (lum.lmm_stripe_count > 0) {
3772                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
3773                 OBD_ALLOC(lumk, lum_size);
3774                 if (!lumk)
3775                         RETURN(-ENOMEM);
3776
3777                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
3778                         lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
3779                 else
3780                         lmm_objects = &(lumk->lmm_objects[0]);
3781                 lmm_objects->l_object_id = lsm->lsm_object_id;
3782         } else {
3783                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
3784                 lumk = &lum;
3785         }
3786
3787         lumk->lmm_object_id = lsm->lsm_object_id;
3788         lumk->lmm_object_seq = lsm->lsm_object_seq;
3789         lumk->lmm_stripe_count = 1;
3790
3791         if (cfs_copy_to_user(lump, lumk, lum_size))
3792                 rc = -EFAULT;
3793
3794         if (lumk != &lum)
3795                 OBD_FREE(lumk, lum_size);
3796
3797         RETURN(rc);
3798 }
3799
3800
3801 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3802                          void *karg, void *uarg)
3803 {
3804         struct obd_device *obd = exp->exp_obd;
3805         struct obd_ioctl_data *data = karg;
3806         int err = 0;
3807         ENTRY;
3808
3809         if (!cfs_try_module_get(THIS_MODULE)) {
3810                 CERROR("Can't get module. Is it alive?");
3811                 return -EINVAL;
3812         }
3813         switch (cmd) {
3814         case OBD_IOC_LOV_GET_CONFIG: {
3815                 char *buf;
3816                 struct lov_desc *desc;
3817                 struct obd_uuid uuid;
3818
3819                 buf = NULL;
3820                 len = 0;
3821                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3822                         GOTO(out, err = -EINVAL);
3823
3824                 data = (struct obd_ioctl_data *)buf;
3825
3826                 if (sizeof(*desc) > data->ioc_inllen1) {
3827                         obd_ioctl_freedata(buf, len);
3828                         GOTO(out, err = -EINVAL);
3829                 }
3830
3831                 if (data->ioc_inllen2 < sizeof(uuid)) {
3832                         obd_ioctl_freedata(buf, len);
3833                         GOTO(out, err = -EINVAL);
3834                 }
3835
3836                 desc = (struct lov_desc *)data->ioc_inlbuf1;
3837                 desc->ld_tgt_count = 1;
3838                 desc->ld_active_tgt_count = 1;
3839                 desc->ld_default_stripe_count = 1;
3840                 desc->ld_default_stripe_size = 0;
3841                 desc->ld_default_stripe_offset = 0;
3842                 desc->ld_pattern = 0;
3843                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3844
3845                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3846
3847                 err = cfs_copy_to_user((void *)uarg, buf, len);
3848                 if (err)
3849                         err = -EFAULT;
3850                 obd_ioctl_freedata(buf, len);
3851                 GOTO(out, err);
3852         }
3853         case LL_IOC_LOV_SETSTRIPE:
3854                 err = obd_alloc_memmd(exp, karg);
3855                 if (err > 0)
3856                         err = 0;
3857                 GOTO(out, err);
3858         case LL_IOC_LOV_GETSTRIPE:
3859                 err = osc_getstripe(karg, uarg);
3860                 GOTO(out, err);
3861         case OBD_IOC_CLIENT_RECOVER:
3862                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3863                                             data->ioc_inlbuf1);
3864                 if (err > 0)
3865                         err = 0;
3866                 GOTO(out, err);
3867         case IOC_OSC_SET_ACTIVE:
3868                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3869                                                data->ioc_offset);
3870                 GOTO(out, err);
3871         case OBD_IOC_POLL_QUOTACHECK:
3872                 err = lquota_poll_check(quota_interface, exp,
3873                                         (struct if_quotacheck *)karg);
3874                 GOTO(out, err);
3875         case OBD_IOC_PING_TARGET:
3876                 err = ptlrpc_obd_ping(obd);
3877                 GOTO(out, err);
3878         default:
3879                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3880                        cmd, cfs_curproc_comm());
3881                 GOTO(out, err = -ENOTTY);
3882         }
3883 out:
3884         cfs_module_put(THIS_MODULE);
3885         return err;
3886 }
3887
3888 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3889                         void *key, __u32 *vallen, void *val,
3890                         struct lov_stripe_md *lsm)
3891 {
3892         ENTRY;
3893         if (!vallen || !val)
3894                 RETURN(-EFAULT);
3895
3896         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3897                 __u32 *stripe = val;
3898                 *vallen = sizeof(*stripe);
3899                 *stripe = 0;
3900                 RETURN(0);
3901         } else if (KEY_IS(KEY_LAST_ID)) {
3902                 struct ptlrpc_request *req;
3903                 obd_id                *reply;
3904                 char                  *tmp;
3905                 int                    rc;
3906
3907                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3908                                            &RQF_OST_GET_INFO_LAST_ID);
3909                 if (req == NULL)
3910                         RETURN(-ENOMEM);
3911
3912                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3913                                      RCL_CLIENT, keylen);
3914                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3915                 if (rc) {
3916                         ptlrpc_request_free(req);
3917                         RETURN(rc);
3918                 }
3919
3920                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3921                 memcpy(tmp, key, keylen);
3922
3923                 req->rq_no_delay = req->rq_no_resend = 1;
3924                 ptlrpc_request_set_replen(req);
3925                 rc = ptlrpc_queue_wait(req);
3926                 if (rc)
3927                         GOTO(out, rc);
3928
3929                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3930                 if (reply == NULL)
3931                         GOTO(out, rc = -EPROTO);
3932
3933                 *((obd_id *)val) = *reply;
3934         out:
3935                 ptlrpc_req_finished(req);
3936                 RETURN(rc);
3937         } else if (KEY_IS(KEY_FIEMAP)) {
3938                 struct ptlrpc_request *req;
3939                 struct ll_user_fiemap *reply;
3940                 char *tmp;
3941                 int rc;
3942
3943                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3944                                            &RQF_OST_GET_INFO_FIEMAP);
3945                 if (req == NULL)
3946                         RETURN(-ENOMEM);
3947
3948                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3949                                      RCL_CLIENT, keylen);
3950                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3951                                      RCL_CLIENT, *vallen);
3952                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3953                                      RCL_SERVER, *vallen);
3954
3955                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3956                 if (rc) {
3957                         ptlrpc_request_free(req);
3958                         RETURN(rc);
3959                 }
3960
3961                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3962                 memcpy(tmp, key, keylen);
3963                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3964                 memcpy(tmp, val, *vallen);
3965
3966                 ptlrpc_request_set_replen(req);
3967                 rc = ptlrpc_queue_wait(req);
3968                 if (rc)
3969                         GOTO(out1, rc);
3970
3971                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3972                 if (reply == NULL)
3973                         GOTO(out1, rc = -EPROTO);
3974
3975                 memcpy(val, reply, *vallen);
3976         out1:
3977                 ptlrpc_req_finished(req);
3978
3979                 RETURN(rc);
3980         }
3981
3982         RETURN(-EINVAL);
3983 }
3984
3985 static int osc_setinfo_mds_connect_import(struct obd_import *imp)
3986 {
3987         struct llog_ctxt *ctxt;
3988         int rc = 0;
3989         ENTRY;
3990
3991         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3992         if (ctxt) {
3993                 rc = llog_initiator_connect(ctxt);
3994                 llog_ctxt_put(ctxt);
3995         } else {
3996                 /* XXX return an error? skip setting below flags? */
3997         }
3998
3999         cfs_spin_lock(&imp->imp_lock);
4000         imp->imp_server_timeout = 1;
4001         imp->imp_pingable = 1;
4002         cfs_spin_unlock(&imp->imp_lock);
4003         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
4004
4005         RETURN(rc);
4006 }
4007
4008 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
4009                                           struct ptlrpc_request *req,
4010                                           void *aa, int rc)
4011 {
4012         ENTRY;
4013         if (rc != 0)
4014                 RETURN(rc);
4015
4016         RETURN(osc_setinfo_mds_connect_import(req->rq_import));
4017 }
4018
4019 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
4020                               void *key, obd_count vallen, void *val,
4021                               struct ptlrpc_request_set *set)
4022 {
4023         struct ptlrpc_request *req;
4024         struct obd_device     *obd = exp->exp_obd;
4025         struct obd_import     *imp = class_exp2cliimp(exp);
4026         char                  *tmp;
4027         int                    rc;
4028         ENTRY;
4029
4030         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
4031
4032         if (KEY_IS(KEY_NEXT_ID)) {
4033                 obd_id new_val;
4034                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4035
4036                 if (vallen != sizeof(obd_id))
4037                         RETURN(-ERANGE);
4038                 if (val == NULL)
4039                         RETURN(-EINVAL);
4040
4041                 if (vallen != sizeof(obd_id))
4042                         RETURN(-EINVAL);
4043
4044                 /* avoid race between allocate new object and set next id
4045                  * from ll_sync thread */
4046                 cfs_spin_lock(&oscc->oscc_lock);
4047                 new_val = *((obd_id*)val) + 1;
4048                 if (new_val > oscc->oscc_next_id)
4049                         oscc->oscc_next_id = new_val;
4050                 cfs_spin_unlock(&oscc->oscc_lock);
4051                 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
4052                        exp->exp_obd->obd_name,
4053                        obd->u.cli.cl_oscc.oscc_next_id);
4054
4055                 RETURN(0);
4056         }
4057
4058         if (KEY_IS(KEY_CHECKSUM)) {
4059                 if (vallen != sizeof(int))
4060                         RETURN(-EINVAL);
4061                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
4062                 RETURN(0);
4063         }
4064
4065         if (KEY_IS(KEY_SPTLRPC_CONF)) {
4066                 sptlrpc_conf_client_adapt(obd);
4067                 RETURN(0);
4068         }
4069
4070         if (KEY_IS(KEY_FLUSH_CTX)) {
4071                 sptlrpc_import_flush_my_ctx(imp);
4072                 RETURN(0);
4073         }
4074
4075         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
4076                 RETURN(-EINVAL);
4077
4078         /* We pass all other commands directly to OST. Since nobody calls osc
4079            methods directly and everybody is supposed to go through LOV, we
4080            assume lov checked invalid values for us.
4081            The only recognised values so far are evict_by_nid and mds_conn.
4082            Even if something bad goes through, we'd get a -EINVAL from OST
4083            anyway. */
4084
4085         if (KEY_IS(KEY_GRANT_SHRINK))
4086                 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO);
4087         else
4088                 req = ptlrpc_request_alloc(imp, &RQF_OBD_SET_INFO);
4089
4090         if (req == NULL)
4091                 RETURN(-ENOMEM);
4092
4093         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
4094                              RCL_CLIENT, keylen);
4095         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
4096                              RCL_CLIENT, vallen);
4097         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
4098         if (rc) {
4099                 ptlrpc_request_free(req);
4100                 RETURN(rc);
4101         }
4102
4103         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
4104         memcpy(tmp, key, keylen);
4105         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
4106         memcpy(tmp, val, vallen);
4107
4108         if (KEY_IS(KEY_MDS_CONN)) {
4109                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4110
4111                 oscc->oscc_oa.o_seq = (*(__u32 *)val);
4112                 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
4113                 LASSERT_SEQ_IS_MDT(oscc->oscc_oa.o_seq);
4114                 req->rq_no_delay = req->rq_no_resend = 1;
4115                 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
4116         } else if (KEY_IS(KEY_GRANT_SHRINK)) {
4117                 struct osc_grant_args *aa;
4118                 struct obdo *oa;
4119
4120                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
4121                 aa = ptlrpc_req_async_args(req);
4122                 OBDO_ALLOC(oa);
4123                 if (!oa) {
4124                         ptlrpc_req_finished(req);
4125                         RETURN(-ENOMEM);
4126                 }
4127                 *oa = ((struct ost_body *)val)->oa;
4128                 aa->aa_oa = oa;
4129                 req->rq_interpret_reply = osc_shrink_grant_interpret;
4130         }
4131
4132         ptlrpc_request_set_replen(req);
4133         if (!KEY_IS(KEY_GRANT_SHRINK)) {
4134                 LASSERT(set != NULL);
4135                 ptlrpc_set_add_req(set, req);
4136                 ptlrpc_check_set(NULL, set);
4137         } else
4138                 ptlrpcd_add_req(req, PSCOPE_OTHER);
4139
4140         RETURN(0);
4141 }
4142
4143
4144 static struct llog_operations osc_size_repl_logops = {
4145         lop_cancel: llog_obd_repl_cancel
4146 };
4147
4148 static struct llog_operations osc_mds_ost_orig_logops;
4149
4150 static int __osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4151                            struct obd_device *tgt, struct llog_catid *catid)
4152 {
4153         int rc;
4154         ENTRY;
4155
4156         rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, 1,
4157                         &catid->lci_logid, &osc_mds_ost_orig_logops);
4158         if (rc) {
4159                 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
4160                 GOTO(out, rc);
4161         }
4162
4163         rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, 1,
4164                         NULL, &osc_size_repl_logops);
4165         if (rc) {
4166                 struct llog_ctxt *ctxt =
4167                         llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4168                 if (ctxt)
4169                         llog_cleanup(ctxt);
4170                 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
4171         }
4172         GOTO(out, rc);
4173 out:
4174         if (rc) {
4175                 CERROR("osc '%s' tgt '%s' catid %p rc=%d\n",
4176                        obd->obd_name, tgt->obd_name, catid, rc);
4177                 CERROR("logid "LPX64":0x%x\n",
4178                        catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
4179         }
4180         return rc;
4181 }
4182
4183 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4184                          struct obd_device *disk_obd, int *index)
4185 {
4186         struct llog_catid catid;
4187         static char name[32] = CATLIST;
4188         int rc;
4189         ENTRY;
4190
4191         LASSERT(olg == &obd->obd_olg);
4192
4193         cfs_mutex_down(&olg->olg_cat_processing);
4194         rc = llog_get_cat_list(disk_obd, name, *index, 1, &catid);
4195         if (rc) {
4196                 CERROR("rc: %d\n", rc);
4197                 GOTO(out, rc);
4198         }
4199
4200         CDEBUG(D_INFO, "%s: Init llog for %d - catid "LPX64"/"LPX64":%x\n",
4201                obd->obd_name, *index, catid.lci_logid.lgl_oid,
4202                catid.lci_logid.lgl_oseq, catid.lci_logid.lgl_ogen);
4203
4204         rc = __osc_llog_init(obd, olg, disk_obd, &catid);
4205         if (rc) {
4206                 CERROR("rc: %d\n", rc);
4207                 GOTO(out, rc);
4208         }
4209
4210         rc = llog_put_cat_list(disk_obd, name, *index, 1, &catid);
4211         if (rc) {
4212                 CERROR("rc: %d\n", rc);
4213                 GOTO(out, rc);
4214         }
4215
4216  out:
4217         cfs_mutex_up(&olg->olg_cat_processing);
4218
4219         return rc;
4220 }
4221
4222 static int osc_llog_finish(struct obd_device *obd, int count)
4223 {
4224         struct llog_ctxt *ctxt;
4225         int rc = 0, rc2 = 0;
4226         ENTRY;
4227
4228         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4229         if (ctxt)
4230                 rc = llog_cleanup(ctxt);
4231
4232         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4233         if (ctxt)
4234                 rc2 = llog_cleanup(ctxt);
4235         if (!rc)
4236                 rc = rc2;
4237
4238         RETURN(rc);
4239 }
4240
4241 static int osc_reconnect(const struct lu_env *env,
4242                          struct obd_export *exp, struct obd_device *obd,
4243                          struct obd_uuid *cluuid,
4244                          struct obd_connect_data *data,
4245                          void *localdata)
4246 {
4247         struct client_obd *cli = &obd->u.cli;
4248
4249         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
4250                 long lost_grant;
4251
4252                 client_obd_list_lock(&cli->cl_loi_list_lock);
4253                 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
4254                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
4255                 lost_grant = cli->cl_lost_grant;
4256                 cli->cl_lost_grant = 0;
4257                 client_obd_list_unlock(&cli->cl_loi_list_lock);
4258
4259                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
4260                        "cl_dirty: %ld cl_lost_grant: %ld\n", data->ocd_grant,
4261                        cli->cl_avail_grant, cli->cl_dirty, lost_grant);
4262                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
4263                        " ocd_grant: %d\n", data->ocd_connect_flags,
4264                        data->ocd_version, data->ocd_grant);
4265         }
4266
4267         RETURN(0);
4268 }
4269
4270 static int osc_disconnect(struct obd_export *exp)
4271 {
4272         struct obd_device *obd = class_exp2obd(exp);
4273         struct llog_ctxt  *ctxt;
4274         int rc;
4275
4276         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4277         if (ctxt) {
4278                 if (obd->u.cli.cl_conn_count == 1) {
4279                         /* Flush any remaining cancel messages out to the
4280                          * target */
4281                         llog_sync(ctxt, exp);
4282                 }
4283                 llog_ctxt_put(ctxt);
4284         } else {
4285                 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
4286                        obd);
4287         }
4288
4289         rc = client_disconnect_export(exp);
4290         /**
4291          * Initially we put del_shrink_grant before disconnect_export, but it
4292          * causes the following problem if setup (connect) and cleanup
4293          * (disconnect) are tangled together.
4294          *      connect p1                     disconnect p2
4295          *   ptlrpc_connect_import
4296          *     ...............               class_manual_cleanup
4297          *                                     osc_disconnect
4298          *                                     del_shrink_grant
4299          *   ptlrpc_connect_interrupt
4300          *     init_grant_shrink
4301          *   add this client to shrink list
4302          *                                      cleanup_osc
4303          * Bang! pinger trigger the shrink.
4304          * So the osc should be disconnected from the shrink list, after we
4305          * are sure the import has been destroyed. BUG18662
4306          */
4307         if (obd->u.cli.cl_import == NULL)
4308                 osc_del_shrink_grant(&obd->u.cli);
4309         return rc;
4310 }
4311
4312 static int osc_import_event(struct obd_device *obd,
4313                             struct obd_import *imp,
4314                             enum obd_import_event event)
4315 {
4316         struct client_obd *cli;
4317         int rc = 0;
4318
4319         ENTRY;
4320         LASSERT(imp->imp_obd == obd);
4321
4322         switch (event) {
4323         case IMP_EVENT_DISCON: {
4324                 /* Only do this on the MDS OSC's */
4325                 if (imp->imp_server_timeout) {
4326                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4327
4328                         cfs_spin_lock(&oscc->oscc_lock);
4329                         oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
4330                         cfs_spin_unlock(&oscc->oscc_lock);
4331                 }
4332                 cli = &obd->u.cli;
4333                 client_obd_list_lock(&cli->cl_loi_list_lock);
4334                 cli->cl_avail_grant = 0;
4335                 cli->cl_lost_grant = 0;
4336                 client_obd_list_unlock(&cli->cl_loi_list_lock);
4337                 break;
4338         }
4339         case IMP_EVENT_INACTIVE: {
4340                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
4341                 break;
4342         }
4343         case IMP_EVENT_INVALIDATE: {
4344                 struct ldlm_namespace *ns = obd->obd_namespace;
4345                 struct lu_env         *env;
4346                 int                    refcheck;
4347
4348                 env = cl_env_get(&refcheck);
4349                 if (!IS_ERR(env)) {
4350                         /* Reset grants */
4351                         cli = &obd->u.cli;
4352                         client_obd_list_lock(&cli->cl_loi_list_lock);
4353                         /* all pages go to failing rpcs due to the invalid
4354                          * import */
4355                         osc_check_rpcs(env, cli);
4356                         client_obd_list_unlock(&cli->cl_loi_list_lock);
4357
4358                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
4359                         cl_env_put(env, &refcheck);
4360                 } else
4361                         rc = PTR_ERR(env);
4362                 break;
4363         }
4364         case IMP_EVENT_ACTIVE: {
4365                 /* Only do this on the MDS OSC's */
4366                 if (imp->imp_server_timeout) {
4367                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4368
4369                         cfs_spin_lock(&oscc->oscc_lock);
4370                         oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
4371                         cfs_spin_unlock(&oscc->oscc_lock);
4372                 }
4373                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
4374                 break;
4375         }
4376         case IMP_EVENT_OCD: {
4377                 struct obd_connect_data *ocd = &imp->imp_connect_data;
4378
4379                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
4380                         osc_init_grant(&obd->u.cli, ocd);
4381
4382                 /* See bug 7198 */
4383                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
4384                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
4385
4386                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
4387                 break;
4388         }
4389         case IMP_EVENT_DEACTIVATE: {
4390                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
4391                 break;
4392         }
4393         case IMP_EVENT_ACTIVATE: {
4394                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
4395                 break;
4396         }
4397         default:
4398                 CERROR("Unknown import event %d\n", event);
4399                 LBUG();
4400         }
4401         RETURN(rc);
4402 }
4403
4404 /**
4405  * Determine whether the lock can be canceled before replaying the lock
4406  * during recovery, see bug16774 for detailed information.
4407  *
4408  * \retval zero the lock can't be canceled
4409  * \retval other ok to cancel
4410  */
4411 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
4412 {
4413         check_res_locked(lock->l_resource);
4414
4415         /*
4416          * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
4417          *
4418          * XXX as a future improvement, we can also cancel unused write lock
4419          * if it doesn't have dirty data and active mmaps.
4420          */
4421         if (lock->l_resource->lr_type == LDLM_EXTENT &&
4422             (lock->l_granted_mode == LCK_PR ||
4423              lock->l_granted_mode == LCK_CR) &&
4424             (osc_dlm_lock_pageref(lock) == 0))
4425                 RETURN(1);
4426
4427         RETURN(0);
4428 }
4429
4430 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
4431 {
4432         int rc;
4433         ENTRY;
4434
4435         ENTRY;
4436         rc = ptlrpcd_addref();
4437         if (rc)
4438                 RETURN(rc);
4439
4440         rc = client_obd_setup(obd, lcfg);
4441         if (rc) {
4442                 ptlrpcd_decref();
4443         } else {
4444                 struct lprocfs_static_vars lvars = { 0 };
4445                 struct client_obd *cli = &obd->u.cli;
4446
4447                 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
4448                 lprocfs_osc_init_vars(&lvars);
4449                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
4450                         lproc_osc_attach_seqstat(obd);
4451                         sptlrpc_lprocfs_cliobd_attach(obd);
4452                         ptlrpc_lprocfs_register_obd(obd);
4453                 }
4454
4455                 oscc_init(obd);
4456                 /* We need to allocate a few requests more, because
4457                    brw_interpret tries to create new requests before freeing
4458                    previous ones. Ideally we want to have 2x max_rpcs_in_flight
4459                    reserved, but I afraid that might be too much wasted RAM
4460                    in fact, so 2 is just my guess and still should work. */
4461                 cli->cl_import->imp_rq_pool =
4462                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4463                                             OST_MAXREQSIZE,
4464                                             ptlrpc_add_rqs_to_pool);
4465
4466                 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
4467                 cfs_sema_init(&cli->cl_grant_sem, 1);
4468
4469                 ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
4470         }
4471
4472         RETURN(rc);
4473 }
4474
4475 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
4476 {
4477         int rc = 0;
4478         ENTRY;
4479
4480         switch (stage) {
4481         case OBD_CLEANUP_EARLY: {
4482                 struct obd_import *imp;
4483                 imp = obd->u.cli.cl_import;
4484                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
4485                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
4486                 ptlrpc_deactivate_import(imp);
4487                 cfs_spin_lock(&imp->imp_lock);
4488                 imp->imp_pingable = 0;
4489                 cfs_spin_unlock(&imp->imp_lock);
4490                 break;
4491         }
4492         case OBD_CLEANUP_EXPORTS: {
4493                 /* If we set up but never connected, the
4494                    client import will not have been cleaned. */
4495                 if (obd->u.cli.cl_import) {
4496                         struct obd_import *imp;
4497                         cfs_down_write(&obd->u.cli.cl_sem);
4498                         imp = obd->u.cli.cl_import;
4499                         CDEBUG(D_CONFIG, "%s: client import never connected\n",
4500                                obd->obd_name);
4501                         ptlrpc_invalidate_import(imp);
4502                         if (imp->imp_rq_pool) {
4503                                 ptlrpc_free_rq_pool(imp->imp_rq_pool);
4504                                 imp->imp_rq_pool = NULL;
4505                         }
4506                         class_destroy_import(imp);
4507                         cfs_up_write(&obd->u.cli.cl_sem);
4508                         obd->u.cli.cl_import = NULL;
4509                 }
4510                 rc = obd_llog_finish(obd, 0);
4511                 if (rc != 0)
4512                         CERROR("failed to cleanup llogging subsystems\n");
4513                 break;
4514                 }
4515         }
4516         RETURN(rc);
4517 }
4518
4519 int osc_cleanup(struct obd_device *obd)
4520 {
4521         int rc;
4522
4523         ENTRY;
4524         ptlrpc_lprocfs_unregister_obd(obd);
4525         lprocfs_obd_cleanup(obd);
4526
4527         /* free memory of osc quota cache */
4528         lquota_cleanup(quota_interface, obd);
4529
4530         rc = client_obd_cleanup(obd);
4531
4532         ptlrpcd_decref();
4533         RETURN(rc);
4534 }
4535
4536 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
4537 {
4538         struct lprocfs_static_vars lvars = { 0 };
4539         int rc = 0;
4540
4541         lprocfs_osc_init_vars(&lvars);
4542
4543         switch (lcfg->lcfg_command) {
4544         default:
4545                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
4546                                               lcfg, obd);
4547                 if (rc > 0)
4548                         rc = 0;
4549                 break;
4550         }
4551
4552         return(rc);
4553 }
4554
4555 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4556 {
4557         return osc_process_config_base(obd, buf);
4558 }
4559
4560 struct obd_ops osc_obd_ops = {
4561         .o_owner                = THIS_MODULE,
4562         .o_setup                = osc_setup,
4563         .o_precleanup           = osc_precleanup,
4564         .o_cleanup              = osc_cleanup,
4565         .o_add_conn             = client_import_add_conn,
4566         .o_del_conn             = client_import_del_conn,
4567         .o_connect              = client_connect_import,
4568         .o_reconnect            = osc_reconnect,
4569         .o_disconnect           = osc_disconnect,
4570         .o_statfs               = osc_statfs,
4571         .o_statfs_async         = osc_statfs_async,
4572         .o_packmd               = osc_packmd,
4573         .o_unpackmd             = osc_unpackmd,
4574         .o_precreate            = osc_precreate,
4575         .o_create               = osc_create,
4576         .o_create_async         = osc_create_async,
4577         .o_destroy              = osc_destroy,
4578         .o_getattr              = osc_getattr,
4579         .o_getattr_async        = osc_getattr_async,
4580         .o_setattr              = osc_setattr,
4581         .o_setattr_async        = osc_setattr_async,
4582         .o_brw                  = osc_brw,
4583         .o_punch                = osc_punch,
4584         .o_sync                 = osc_sync,
4585         .o_enqueue              = osc_enqueue,
4586         .o_change_cbdata        = osc_change_cbdata,
4587         .o_find_cbdata          = osc_find_cbdata,
4588         .o_cancel               = osc_cancel,
4589         .o_cancel_unused        = osc_cancel_unused,
4590         .o_iocontrol            = osc_iocontrol,
4591         .o_get_info             = osc_get_info,
4592         .o_set_info_async       = osc_set_info_async,
4593         .o_import_event         = osc_import_event,
4594         .o_llog_init            = osc_llog_init,
4595         .o_llog_finish          = osc_llog_finish,
4596         .o_process_config       = osc_process_config,
4597 };
4598
4599 extern struct lu_kmem_descr osc_caches[];
4600 extern cfs_spinlock_t       osc_ast_guard;
4601 extern cfs_lock_class_key_t osc_ast_guard_class;
4602
4603 int __init osc_init(void)
4604 {
4605         struct lprocfs_static_vars lvars = { 0 };
4606         int rc;
4607         ENTRY;
4608
4609         /* print an address of _any_ initialized kernel symbol from this
4610          * module, to allow debugging with gdb that doesn't support data
4611          * symbols from modules.*/
4612         CDEBUG(D_CONSOLE, "Lustre OSC module (%p).\n", &osc_caches);
4613
4614         rc = lu_kmem_init(osc_caches);
4615
4616         lprocfs_osc_init_vars(&lvars);
4617
4618         cfs_request_module("lquota");
4619         quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
4620         lquota_init(quota_interface);
4621         init_obd_quota_ops(quota_interface, &osc_obd_ops);
4622
4623         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
4624                                  LUSTRE_OSC_NAME, &osc_device_type);
4625         if (rc) {
4626                 if (quota_interface)
4627                         PORTAL_SYMBOL_PUT(osc_quota_interface);
4628                 lu_kmem_fini(osc_caches);
4629                 RETURN(rc);
4630         }
4631
4632         cfs_spin_lock_init(&osc_ast_guard);
4633         cfs_lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
4634
4635         osc_mds_ost_orig_logops = llog_lvfs_ops;
4636         osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
4637         osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
4638         osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
4639         osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
4640
4641         RETURN(rc);
4642 }
4643
4644 #ifdef __KERNEL__
4645 static void /*__exit*/ osc_exit(void)
4646 {
4647         lu_device_type_fini(&osc_device_type);
4648
4649         lquota_exit(quota_interface);
4650         if (quota_interface)
4651                 PORTAL_SYMBOL_PUT(osc_quota_interface);
4652
4653         class_unregister_type(LUSTRE_OSC_NAME);
4654         lu_kmem_fini(osc_caches);
4655 }
4656
4657 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4658 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4659 MODULE_LICENSE("GPL");
4660
4661 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
4662 #endif