Whamcloud - gitweb
17f088c3114124057cbbb5d41e78e7fe26ed4621
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #ifndef EXPORT_SYMTAB
38 # define EXPORT_SYMTAB
39 #endif
40 #define DEBUG_SUBSYSTEM S_OSC
41
42 #include <libcfs/libcfs.h>
43
44 #ifndef __KERNEL__
45 # include <liblustre.h>
46 #endif
47
48 #include <lustre_dlm.h>
49 #include <lustre_net.h>
50 #include <lustre/lustre_user.h>
51 #include <obd_cksum.h>
52 #include <obd_ost.h>
53 #include <obd_lov.h>
54
55 #ifdef  __CYGWIN__
56 # include <ctype.h>
57 #endif
58
59 #include <lustre_ha.h>
60 #include <lprocfs_status.h>
61 #include <lustre_log.h>
62 #include <lustre_debug.h>
63 #include <lustre_param.h>
64 #include "osc_internal.h"
65
66 static quota_interface_t *quota_interface = NULL;
67 extern quota_interface_t osc_quota_interface;
68
69 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
70 static int brw_interpret(const struct lu_env *env,
71                          struct ptlrpc_request *req, void *data, int rc);
72 int osc_cleanup(struct obd_device *obd);
73
74 /* Pack OSC object metadata for disk storage (LE byte order). */
75 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
76                       struct lov_stripe_md *lsm)
77 {
78         int lmm_size;
79         ENTRY;
80
81         lmm_size = sizeof(**lmmp);
82         if (!lmmp)
83                 RETURN(lmm_size);
84
85         if (*lmmp && !lsm) {
86                 OBD_FREE(*lmmp, lmm_size);
87                 *lmmp = NULL;
88                 RETURN(0);
89         }
90
91         if (!*lmmp) {
92                 OBD_ALLOC(*lmmp, lmm_size);
93                 if (!*lmmp)
94                         RETURN(-ENOMEM);
95         }
96
97         if (lsm) {
98                 LASSERT(lsm->lsm_object_id);
99                 LASSERT_SEQ_IS_MDT(lsm->lsm_object_seq);
100                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
101                 (*lmmp)->lmm_object_seq = cpu_to_le64(lsm->lsm_object_seq);
102         }
103
104         RETURN(lmm_size);
105 }
106
107 /* Unpack OSC object metadata from disk storage (LE byte order). */
108 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
109                         struct lov_mds_md *lmm, int lmm_bytes)
110 {
111         int lsm_size;
112         ENTRY;
113
114         if (lmm != NULL) {
115                 if (lmm_bytes < sizeof (*lmm)) {
116                         CERROR("lov_mds_md too small: %d, need %d\n",
117                                lmm_bytes, (int)sizeof(*lmm));
118                         RETURN(-EINVAL);
119                 }
120                 /* XXX LOV_MAGIC etc check? */
121
122                 if (lmm->lmm_object_id == 0) {
123                         CERROR("lov_mds_md: zero lmm_object_id\n");
124                         RETURN(-EINVAL);
125                 }
126         }
127
128         lsm_size = lov_stripe_md_size(1);
129         if (lsmp == NULL)
130                 RETURN(lsm_size);
131
132         if (*lsmp != NULL && lmm == NULL) {
133                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
134                 OBD_FREE(*lsmp, lsm_size);
135                 *lsmp = NULL;
136                 RETURN(0);
137         }
138
139         if (*lsmp == NULL) {
140                 OBD_ALLOC(*lsmp, lsm_size);
141                 if (*lsmp == NULL)
142                         RETURN(-ENOMEM);
143                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
144                 if ((*lsmp)->lsm_oinfo[0] == NULL) {
145                         OBD_FREE(*lsmp, lsm_size);
146                         RETURN(-ENOMEM);
147                 }
148                 loi_init((*lsmp)->lsm_oinfo[0]);
149         }
150
151         if (lmm != NULL) {
152                 /* XXX zero *lsmp? */
153                 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
154                 (*lsmp)->lsm_object_seq = le64_to_cpu (lmm->lmm_object_seq);
155                 LASSERT((*lsmp)->lsm_object_id);
156                 LASSERT_SEQ_IS_MDT((*lsmp)->lsm_object_seq);
157         }
158
159         (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
160
161         RETURN(lsm_size);
162 }
163
164 static inline void osc_pack_capa(struct ptlrpc_request *req,
165                                  struct ost_body *body, void *capa)
166 {
167         struct obd_capa *oc = (struct obd_capa *)capa;
168         struct lustre_capa *c;
169
170         if (!capa)
171                 return;
172
173         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
174         LASSERT(c);
175         capa_cpy(c, oc);
176         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
177         DEBUG_CAPA(D_SEC, c, "pack");
178 }
179
180 static inline void osc_pack_req_body(struct ptlrpc_request *req,
181                                      struct obd_info *oinfo)
182 {
183         struct ost_body *body;
184
185         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
186         LASSERT(body);
187
188         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
189         osc_pack_capa(req, body, oinfo->oi_capa);
190 }
191
192 static inline void osc_set_capa_size(struct ptlrpc_request *req,
193                                      const struct req_msg_field *field,
194                                      struct obd_capa *oc)
195 {
196         if (oc == NULL)
197                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
198         else
199                 /* it is already calculated as sizeof struct obd_capa */
200                 ;
201 }
202
203 static int osc_getattr_interpret(const struct lu_env *env,
204                                  struct ptlrpc_request *req,
205                                  struct osc_async_args *aa, int rc)
206 {
207         struct ost_body *body;
208         ENTRY;
209
210         if (rc != 0)
211                 GOTO(out, rc);
212
213         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
214         if (body) {
215                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
216                 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
217
218                 /* This should really be sent by the OST */
219                 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
220                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
221         } else {
222                 CDEBUG(D_INFO, "can't unpack ost_body\n");
223                 rc = -EPROTO;
224                 aa->aa_oi->oi_oa->o_valid = 0;
225         }
226 out:
227         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
228         RETURN(rc);
229 }
230
231 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
232                              struct ptlrpc_request_set *set)
233 {
234         struct ptlrpc_request *req;
235         struct osc_async_args *aa;
236         int                    rc;
237         ENTRY;
238
239         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
240         if (req == NULL)
241                 RETURN(-ENOMEM);
242
243         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
244         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
245         if (rc) {
246                 ptlrpc_request_free(req);
247                 RETURN(rc);
248         }
249
250         osc_pack_req_body(req, oinfo);
251
252         ptlrpc_request_set_replen(req);
253         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
254
255         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
256         aa = ptlrpc_req_async_args(req);
257         aa->aa_oi = oinfo;
258
259         ptlrpc_set_add_req(set, req);
260         RETURN(0);
261 }
262
263 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
264 {
265         struct ptlrpc_request *req;
266         struct ost_body       *body;
267         int                    rc;
268         ENTRY;
269
270         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
271         if (req == NULL)
272                 RETURN(-ENOMEM);
273
274         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
275         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
276         if (rc) {
277                 ptlrpc_request_free(req);
278                 RETURN(rc);
279         }
280
281         osc_pack_req_body(req, oinfo);
282
283         ptlrpc_request_set_replen(req);
284
285         rc = ptlrpc_queue_wait(req);
286         if (rc)
287                 GOTO(out, rc);
288
289         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
290         if (body == NULL)
291                 GOTO(out, rc = -EPROTO);
292
293         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
294         lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
295
296         /* This should really be sent by the OST */
297         oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
298         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
299
300         EXIT;
301  out:
302         ptlrpc_req_finished(req);
303         return rc;
304 }
305
306 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
307                        struct obd_trans_info *oti)
308 {
309         struct ptlrpc_request *req;
310         struct ost_body       *body;
311         int                    rc;
312         ENTRY;
313
314         LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
315
316         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
317         if (req == NULL)
318                 RETURN(-ENOMEM);
319
320         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
321         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
322         if (rc) {
323                 ptlrpc_request_free(req);
324                 RETURN(rc);
325         }
326
327         osc_pack_req_body(req, oinfo);
328
329         ptlrpc_request_set_replen(req);
330
331         rc = ptlrpc_queue_wait(req);
332         if (rc)
333                 GOTO(out, rc);
334
335         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
336         if (body == NULL)
337                 GOTO(out, rc = -EPROTO);
338
339         lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
340
341         EXIT;
342 out:
343         ptlrpc_req_finished(req);
344         RETURN(rc);
345 }
346
347 static int osc_setattr_interpret(const struct lu_env *env,
348                                  struct ptlrpc_request *req,
349                                  struct osc_setattr_args *sa, int rc)
350 {
351         struct ost_body *body;
352         ENTRY;
353
354         if (rc != 0)
355                 GOTO(out, rc);
356
357         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
358         if (body == NULL)
359                 GOTO(out, rc = -EPROTO);
360
361         lustre_get_wire_obdo(sa->sa_oa, &body->oa);
362 out:
363         rc = sa->sa_upcall(sa->sa_cookie, rc);
364         RETURN(rc);
365 }
366
367 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
368                            struct obd_trans_info *oti,
369                            obd_enqueue_update_f upcall, void *cookie,
370                            struct ptlrpc_request_set *rqset)
371 {
372         struct ptlrpc_request   *req;
373         struct osc_setattr_args *sa;
374         int                      rc;
375         ENTRY;
376
377         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
378         if (req == NULL)
379                 RETURN(-ENOMEM);
380
381         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
382         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
383         if (rc) {
384                 ptlrpc_request_free(req);
385                 RETURN(rc);
386         }
387
388         if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
389                 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
390
391         osc_pack_req_body(req, oinfo);
392
393         ptlrpc_request_set_replen(req);
394
395         /* do mds to ost setattr asynchronously */
396         if (!rqset) {
397                 /* Do not wait for response. */
398                 ptlrpcd_add_req(req, PSCOPE_OTHER);
399         } else {
400                 req->rq_interpret_reply =
401                         (ptlrpc_interpterer_t)osc_setattr_interpret;
402
403                 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
404                 sa = ptlrpc_req_async_args(req);
405                 sa->sa_oa = oinfo->oi_oa;
406                 sa->sa_upcall = upcall;
407                 sa->sa_cookie = cookie;
408
409                 if (rqset == PTLRPCD_SET)
410                         ptlrpcd_add_req(req, PSCOPE_OTHER);
411                 else
412                         ptlrpc_set_add_req(rqset, req);
413         }
414
415         RETURN(0);
416 }
417
418 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
419                              struct obd_trans_info *oti,
420                              struct ptlrpc_request_set *rqset)
421 {
422         return osc_setattr_async_base(exp, oinfo, oti,
423                                       oinfo->oi_cb_up, oinfo, rqset);
424 }
425
426 int osc_real_create(struct obd_export *exp, struct obdo *oa,
427                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
428 {
429         struct ptlrpc_request *req;
430         struct ost_body       *body;
431         struct lov_stripe_md  *lsm;
432         int                    rc;
433         ENTRY;
434
435         LASSERT(oa);
436         LASSERT(ea);
437
438         lsm = *ea;
439         if (!lsm) {
440                 rc = obd_alloc_memmd(exp, &lsm);
441                 if (rc < 0)
442                         RETURN(rc);
443         }
444
445         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
446         if (req == NULL)
447                 GOTO(out, rc = -ENOMEM);
448
449         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
450         if (rc) {
451                 ptlrpc_request_free(req);
452                 GOTO(out, rc);
453         }
454
455         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
456         LASSERT(body);
457         lustre_set_wire_obdo(&body->oa, oa);
458
459         ptlrpc_request_set_replen(req);
460
461         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
462             oa->o_flags == OBD_FL_DELORPHAN) {
463                 DEBUG_REQ(D_HA, req,
464                           "delorphan from OST integration");
465                 /* Don't resend the delorphan req */
466                 req->rq_no_resend = req->rq_no_delay = 1;
467         }
468
469         rc = ptlrpc_queue_wait(req);
470         if (rc)
471                 GOTO(out_req, rc);
472
473         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
474         if (body == NULL)
475                 GOTO(out_req, rc = -EPROTO);
476
477         lustre_get_wire_obdo(oa, &body->oa);
478
479         /* This should really be sent by the OST */
480         oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
481         oa->o_valid |= OBD_MD_FLBLKSZ;
482
483         /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
484          * have valid lsm_oinfo data structs, so don't go touching that.
485          * This needs to be fixed in a big way.
486          */
487         lsm->lsm_object_id = oa->o_id;
488         lsm->lsm_object_seq = oa->o_seq;
489         *ea = lsm;
490
491         if (oti != NULL) {
492                 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
493
494                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
495                         if (!oti->oti_logcookies)
496                                 oti_alloc_cookies(oti, 1);
497                         *oti->oti_logcookies = oa->o_lcookie;
498                 }
499         }
500
501         CDEBUG(D_HA, "transno: "LPD64"\n",
502                lustre_msg_get_transno(req->rq_repmsg));
503 out_req:
504         ptlrpc_req_finished(req);
505 out:
506         if (rc && !*ea)
507                 obd_free_memmd(exp, &lsm);
508         RETURN(rc);
509 }
510
511 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
512                    obd_enqueue_update_f upcall, void *cookie,
513                    struct ptlrpc_request_set *rqset)
514 {
515         struct ptlrpc_request   *req;
516         struct osc_setattr_args *sa;
517         struct ost_body         *body;
518         int                      rc;
519         ENTRY;
520
521         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
522         if (req == NULL)
523                 RETURN(-ENOMEM);
524
525         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
526         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
527         if (rc) {
528                 ptlrpc_request_free(req);
529                 RETURN(rc);
530         }
531         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
532         ptlrpc_at_set_req_timeout(req);
533
534         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
535         LASSERT(body);
536         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
537         osc_pack_capa(req, body, oinfo->oi_capa);
538
539         ptlrpc_request_set_replen(req);
540
541
542         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
543         CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
544         sa = ptlrpc_req_async_args(req);
545         sa->sa_oa     = oinfo->oi_oa;
546         sa->sa_upcall = upcall;
547         sa->sa_cookie = cookie;
548         if (rqset == PTLRPCD_SET)
549                 ptlrpcd_add_req(req, PSCOPE_OTHER);
550         else
551                 ptlrpc_set_add_req(rqset, req);
552
553         RETURN(0);
554 }
555
556 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
557                      struct obd_trans_info *oti,
558                      struct ptlrpc_request_set *rqset)
559 {
560         oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
561         oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
562         oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
563         return osc_punch_base(exp, oinfo,
564                               oinfo->oi_cb_up, oinfo, rqset);
565 }
566
567 static int osc_sync(struct obd_export *exp, struct obdo *oa,
568                     struct lov_stripe_md *md, obd_size start, obd_size end,
569                     void *capa)
570 {
571         struct ptlrpc_request *req;
572         struct ost_body       *body;
573         int                    rc;
574         ENTRY;
575
576         if (!oa) {
577                 CDEBUG(D_INFO, "oa NULL\n");
578                 RETURN(-EINVAL);
579         }
580
581         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
582         if (req == NULL)
583                 RETURN(-ENOMEM);
584
585         osc_set_capa_size(req, &RMF_CAPA1, capa);
586         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
587         if (rc) {
588                 ptlrpc_request_free(req);
589                 RETURN(rc);
590         }
591
592         /* overload the size and blocks fields in the oa with start/end */
593         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
594         LASSERT(body);
595         lustre_set_wire_obdo(&body->oa, oa);
596         body->oa.o_size = start;
597         body->oa.o_blocks = end;
598         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
599         osc_pack_capa(req, body, capa);
600
601         ptlrpc_request_set_replen(req);
602
603         rc = ptlrpc_queue_wait(req);
604         if (rc)
605                 GOTO(out, rc);
606
607         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
608         if (body == NULL)
609                 GOTO(out, rc = -EPROTO);
610
611         lustre_get_wire_obdo(oa, &body->oa);
612
613         EXIT;
614  out:
615         ptlrpc_req_finished(req);
616         return rc;
617 }
618
619 /* Find and cancel locally locks matched by @mode in the resource found by
620  * @objid. Found locks are added into @cancel list. Returns the amount of
621  * locks added to @cancels list. */
622 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
623                                    cfs_list_t *cancels,
624                                    ldlm_mode_t mode, int lock_flags)
625 {
626         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
627         struct ldlm_res_id res_id;
628         struct ldlm_resource *res;
629         int count;
630         ENTRY;
631
632         osc_build_res_name(oa->o_id, oa->o_seq, &res_id);
633         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
634         if (res == NULL)
635                 RETURN(0);
636
637         LDLM_RESOURCE_ADDREF(res);
638         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
639                                            lock_flags, 0, NULL);
640         LDLM_RESOURCE_DELREF(res);
641         ldlm_resource_putref(res);
642         RETURN(count);
643 }
644
645 static int osc_destroy_interpret(const struct lu_env *env,
646                                  struct ptlrpc_request *req, void *data,
647                                  int rc)
648 {
649         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
650
651         cfs_atomic_dec(&cli->cl_destroy_in_flight);
652         cfs_waitq_signal(&cli->cl_destroy_waitq);
653         return 0;
654 }
655
656 static int osc_can_send_destroy(struct client_obd *cli)
657 {
658         if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <=
659             cli->cl_max_rpcs_in_flight) {
660                 /* The destroy request can be sent */
661                 return 1;
662         }
663         if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) <
664             cli->cl_max_rpcs_in_flight) {
665                 /*
666                  * The counter has been modified between the two atomic
667                  * operations.
668                  */
669                 cfs_waitq_signal(&cli->cl_destroy_waitq);
670         }
671         return 0;
672 }
673
674 /* Destroy requests can be async always on the client, and we don't even really
675  * care about the return code since the client cannot do anything at all about
676  * a destroy failure.
677  * When the MDS is unlinking a filename, it saves the file objects into a
678  * recovery llog, and these object records are cancelled when the OST reports
679  * they were destroyed and sync'd to disk (i.e. transaction committed).
680  * If the client dies, or the OST is down when the object should be destroyed,
681  * the records are not cancelled, and when the OST reconnects to the MDS next,
682  * it will retrieve the llog unlink logs and then sends the log cancellation
683  * cookies to the MDS after committing destroy transactions. */
684 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
685                        struct lov_stripe_md *ea, struct obd_trans_info *oti,
686                        struct obd_export *md_export, void *capa)
687 {
688         struct client_obd     *cli = &exp->exp_obd->u.cli;
689         struct ptlrpc_request *req;
690         struct ost_body       *body;
691         CFS_LIST_HEAD(cancels);
692         int rc, count;
693         ENTRY;
694
695         if (!oa) {
696                 CDEBUG(D_INFO, "oa NULL\n");
697                 RETURN(-EINVAL);
698         }
699
700         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
701                                         LDLM_FL_DISCARD_DATA);
702
703         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
704         if (req == NULL) {
705                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
706                 RETURN(-ENOMEM);
707         }
708
709         osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
710         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
711                                0, &cancels, count);
712         if (rc) {
713                 ptlrpc_request_free(req);
714                 RETURN(rc);
715         }
716
717         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
718         ptlrpc_at_set_req_timeout(req);
719
720         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
721                 oa->o_lcookie = *oti->oti_logcookies;
722         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
723         LASSERT(body);
724         lustre_set_wire_obdo(&body->oa, oa);
725
726         osc_pack_capa(req, body, (struct obd_capa *)capa);
727         ptlrpc_request_set_replen(req);
728
729         /* don't throttle destroy RPCs for the MDT */
730         if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
731                 req->rq_interpret_reply = osc_destroy_interpret;
732                 if (!osc_can_send_destroy(cli)) {
733                         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
734                                                           NULL);
735
736                         /*
737                          * Wait until the number of on-going destroy RPCs drops
738                          * under max_rpc_in_flight
739                          */
740                         l_wait_event_exclusive(cli->cl_destroy_waitq,
741                                                osc_can_send_destroy(cli), &lwi);
742                 }
743         }
744
745         /* Do not wait for response */
746         ptlrpcd_add_req(req, PSCOPE_OTHER);
747         RETURN(0);
748 }
749
750 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
751                                 long writing_bytes)
752 {
753         obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
754
755         LASSERT(!(oa->o_valid & bits));
756
757         oa->o_valid |= bits;
758         client_obd_list_lock(&cli->cl_loi_list_lock);
759         oa->o_dirty = cli->cl_dirty;
760         if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
761                 CERROR("dirty %lu - %lu > dirty_max %lu\n",
762                        cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
763                 oa->o_undirty = 0;
764         } else if (cfs_atomic_read(&obd_dirty_pages) -
765                    cfs_atomic_read(&obd_dirty_transit_pages) >
766                    obd_max_dirty_pages + 1){
767                 /* The cfs_atomic_read() allowing the cfs_atomic_inc() are
768                  * not covered by a lock thus they may safely race and trip
769                  * this CERROR() unless we add in a small fudge factor (+1). */
770                 CERROR("dirty %d - %d > system dirty_max %d\n",
771                        cfs_atomic_read(&obd_dirty_pages),
772                        cfs_atomic_read(&obd_dirty_transit_pages),
773                        obd_max_dirty_pages);
774                 oa->o_undirty = 0;
775         } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
776                 CERROR("dirty %lu - dirty_max %lu too big???\n",
777                        cli->cl_dirty, cli->cl_dirty_max);
778                 oa->o_undirty = 0;
779         } else {
780                 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
781                                 (cli->cl_max_rpcs_in_flight + 1);
782                 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
783         }
784         oa->o_grant = cli->cl_avail_grant;
785         oa->o_dropped = cli->cl_lost_grant;
786         cli->cl_lost_grant = 0;
787         client_obd_list_unlock(&cli->cl_loi_list_lock);
788         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
789                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
790
791 }
792
793 static void osc_update_next_shrink(struct client_obd *cli)
794 {
795         cli->cl_next_shrink_grant =
796                 cfs_time_shift(cli->cl_grant_shrink_interval);
797         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
798                cli->cl_next_shrink_grant);
799 }
800
801 /* caller must hold loi_list_lock */
802 static void osc_consume_write_grant(struct client_obd *cli,
803                                     struct brw_page *pga)
804 {
805         LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
806         LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
807         cfs_atomic_inc(&obd_dirty_pages);
808         cli->cl_dirty += CFS_PAGE_SIZE;
809         cli->cl_avail_grant -= CFS_PAGE_SIZE;
810         pga->flag |= OBD_BRW_FROM_GRANT;
811         CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
812                CFS_PAGE_SIZE, pga, pga->pg);
813         LASSERT(cli->cl_avail_grant >= 0);
814         osc_update_next_shrink(cli);
815 }
816
817 /* the companion to osc_consume_write_grant, called when a brw has completed.
818  * must be called with the loi lock held. */
819 static void osc_release_write_grant(struct client_obd *cli,
820                                     struct brw_page *pga, int sent)
821 {
822         int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
823         ENTRY;
824
825         LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
826         if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
827                 EXIT;
828                 return;
829         }
830
831         pga->flag &= ~OBD_BRW_FROM_GRANT;
832         cfs_atomic_dec(&obd_dirty_pages);
833         cli->cl_dirty -= CFS_PAGE_SIZE;
834         if (pga->flag & OBD_BRW_NOCACHE) {
835                 pga->flag &= ~OBD_BRW_NOCACHE;
836                 cfs_atomic_dec(&obd_dirty_transit_pages);
837                 cli->cl_dirty_transit -= CFS_PAGE_SIZE;
838         }
839         if (!sent) {
840                 cli->cl_lost_grant += CFS_PAGE_SIZE;
841                 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
842                        cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
843         } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
844                 /* For short writes we shouldn't count parts of pages that
845                  * span a whole block on the OST side, or our accounting goes
846                  * wrong.  Should match the code in filter_grant_check. */
847                 int offset = pga->off & ~CFS_PAGE_MASK;
848                 int count = pga->count + (offset & (blocksize - 1));
849                 int end = (offset + pga->count) & (blocksize - 1);
850                 if (end)
851                         count += blocksize - end;
852
853                 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
854                 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
855                        CFS_PAGE_SIZE - count, cli->cl_lost_grant,
856                        cli->cl_avail_grant, cli->cl_dirty);
857         }
858
859         EXIT;
860 }
861
862 static unsigned long rpcs_in_flight(struct client_obd *cli)
863 {
864         return cli->cl_r_in_flight + cli->cl_w_in_flight;
865 }
866
867 int osc_wake_sync_fs(struct client_obd *cli)
868 {
869         int rc = 0;
870         ENTRY;
871         if (cfs_list_empty(&cli->cl_loi_sync_fs_list) &&
872             cli->cl_sf_wait.started) {
873                 cli->cl_sf_wait.sfw_upcall(cli->cl_sf_wait.sfw_oi, rc);
874                 cli->cl_sf_wait.started = 0;
875                 CDEBUG(D_CACHE, "sync_fs_loi list is empty\n");
876         }
877         RETURN(rc);
878 }
879
880 /* caller must hold loi_list_lock */
881 void osc_wake_cache_waiters(struct client_obd *cli)
882 {
883         cfs_list_t *l, *tmp;
884         struct osc_cache_waiter *ocw;
885
886         ENTRY;
887         cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
888                 /* if we can't dirty more, we must wait until some is written */
889                 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
890                    (cfs_atomic_read(&obd_dirty_pages) + 1 >
891                     obd_max_dirty_pages)) {
892                         CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
893                                "osc max %ld, sys max %d\n", cli->cl_dirty,
894                                cli->cl_dirty_max, obd_max_dirty_pages);
895                         return;
896                 }
897
898                 /* if still dirty cache but no grant wait for pending RPCs that
899                  * may yet return us some grant before doing sync writes */
900                 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
901                         CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
902                                cli->cl_w_in_flight);
903                         return;
904                 }
905
906                 ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry);
907                 cfs_list_del_init(&ocw->ocw_entry);
908                 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
909                         /* no more RPCs in flight to return grant, do sync IO */
910                         ocw->ocw_rc = -EDQUOT;
911                         CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
912                 } else {
913                         osc_consume_write_grant(cli,
914                                                 &ocw->ocw_oap->oap_brw_page);
915                 }
916
917                 cfs_waitq_signal(&ocw->ocw_waitq);
918         }
919
920         EXIT;
921 }
922
923 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
924 {
925         client_obd_list_lock(&cli->cl_loi_list_lock);
926         cli->cl_avail_grant += grant;
927         client_obd_list_unlock(&cli->cl_loi_list_lock);
928 }
929
930 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
931 {
932         if (body->oa.o_valid & OBD_MD_FLGRANT) {
933                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
934                 __osc_update_grant(cli, body->oa.o_grant);
935         }
936 }
937
/* Forward declaration: osc_shrink_grant_to_target() below calls this, and
 * the definition is not located before that point in the file. */
static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
                              void *key, obd_count vallen, void *val,
                              struct ptlrpc_request_set *set);
941
942 static int osc_shrink_grant_interpret(const struct lu_env *env,
943                                       struct ptlrpc_request *req,
944                                       void *aa, int rc)
945 {
946         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
947         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
948         struct ost_body *body;
949
950         if (rc != 0) {
951                 __osc_update_grant(cli, oa->o_grant);
952                 GOTO(out, rc);
953         }
954
955         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
956         LASSERT(body);
957         osc_update_grant(cli, body);
958 out:
959         OBDO_FREE(oa);
960         return rc;
961 }
962
963 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
964 {
965         client_obd_list_lock(&cli->cl_loi_list_lock);
966         oa->o_grant = cli->cl_avail_grant / 4;
967         cli->cl_avail_grant -= oa->o_grant;
968         client_obd_list_unlock(&cli->cl_loi_list_lock);
969         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
970                 oa->o_valid |= OBD_MD_FLFLAGS;
971                 oa->o_flags = 0;
972         }
973         oa->o_flags |= OBD_FL_SHRINK_GRANT;
974         osc_update_next_shrink(cli);
975 }
976
977 /* Shrink the current grant, either from some large amount to enough for a
978  * full set of in-flight RPCs, or if we have already shrunk to that limit
979  * then to enough for a single RPC.  This avoids keeping more grant than
980  * needed, and avoids shrinking the grant piecemeal. */
981 static int osc_shrink_grant(struct client_obd *cli)
982 {
983         long target = (cli->cl_max_rpcs_in_flight + 1) *
984                       cli->cl_max_pages_per_rpc;
985
986         client_obd_list_lock(&cli->cl_loi_list_lock);
987         if (cli->cl_avail_grant <= target)
988                 target = cli->cl_max_pages_per_rpc;
989         client_obd_list_unlock(&cli->cl_loi_list_lock);
990
991         return osc_shrink_grant_to_target(cli, target);
992 }
993
994 int osc_shrink_grant_to_target(struct client_obd *cli, long target)
995 {
996         int    rc = 0;
997         struct ost_body     *body;
998         ENTRY;
999
1000         client_obd_list_lock(&cli->cl_loi_list_lock);
1001         /* Don't shrink if we are already above or below the desired limit
1002          * We don't want to shrink below a single RPC, as that will negatively
1003          * impact block allocation and long-term performance. */
1004         if (target < cli->cl_max_pages_per_rpc)
1005                 target = cli->cl_max_pages_per_rpc;
1006
1007         if (target >= cli->cl_avail_grant) {
1008                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1009                 RETURN(0);
1010         }
1011         client_obd_list_unlock(&cli->cl_loi_list_lock);
1012
1013         OBD_ALLOC_PTR(body);
1014         if (!body)
1015                 RETURN(-ENOMEM);
1016
1017         osc_announce_cached(cli, &body->oa, 0);
1018
1019         client_obd_list_lock(&cli->cl_loi_list_lock);
1020         body->oa.o_grant = cli->cl_avail_grant - target;
1021         cli->cl_avail_grant = target;
1022         client_obd_list_unlock(&cli->cl_loi_list_lock);
1023         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
1024                 body->oa.o_valid |= OBD_MD_FLFLAGS;
1025                 body->oa.o_flags = 0;
1026         }
1027         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
1028         osc_update_next_shrink(cli);
1029
1030         rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
1031                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
1032                                 sizeof(*body), body, NULL);
1033         if (rc != 0)
1034                 __osc_update_grant(cli, body->oa.o_grant);
1035         OBD_FREE_PTR(body);
1036         RETURN(rc);
1037 }
1038
1039 #define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
1040 static int osc_should_shrink_grant(struct client_obd *client)
1041 {
1042         cfs_time_t time = cfs_time_current();
1043         cfs_time_t next_shrink = client->cl_next_shrink_grant;
1044
1045         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
1046              OBD_CONNECT_GRANT_SHRINK) == 0)
1047                 return 0;
1048
1049         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
1050                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
1051                     client->cl_avail_grant > GRANT_SHRINK_LIMIT)
1052                         return 1;
1053                 else
1054                         osc_update_next_shrink(client);
1055         }
1056         return 0;
1057 }
1058
1059 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1060 {
1061         struct client_obd *client;
1062
1063         cfs_list_for_each_entry(client, &item->ti_obd_list,
1064                                 cl_grant_shrink_list) {
1065                 if (osc_should_shrink_grant(client))
1066                         osc_shrink_grant(client);
1067         }
1068         return 0;
1069 }
1070
1071 static int osc_add_shrink_grant(struct client_obd *client)
1072 {
1073         int rc;
1074
1075         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1076                                        TIMEOUT_GRANT,
1077                                        osc_grant_shrink_grant_cb, NULL,
1078                                        &client->cl_grant_shrink_list);
1079         if (rc) {
1080                 CERROR("add grant client %s error %d\n",
1081                         client->cl_import->imp_obd->obd_name, rc);
1082                 return rc;
1083         }
1084         CDEBUG(D_CACHE, "add grant client %s \n",
1085                client->cl_import->imp_obd->obd_name);
1086         osc_update_next_shrink(client);
1087         return 0;
1088 }
1089
1090 static int osc_del_shrink_grant(struct client_obd *client)
1091 {
1092         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1093                                          TIMEOUT_GRANT);
1094 }
1095
1096 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1097 {
1098         /*
1099          * ocd_grant is the total grant amount we're expect to hold: if we've
1100          * been evicted, it's the new avail_grant amount, cl_dirty will drop
1101          * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1102          *
1103          * race is tolerable here: if we're evicted, but imp_state already
1104          * left EVICTED state, then cl_dirty must be 0 already.
1105          */
1106         client_obd_list_lock(&cli->cl_loi_list_lock);
1107         if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1108                 cli->cl_avail_grant = ocd->ocd_grant;
1109         else
1110                 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1111
1112         if (cli->cl_avail_grant < 0) {
1113                 CWARN("%s: available grant < 0, the OSS is probably not running"
1114                       " with patch from bug20278 (%ld) \n",
1115                       cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant);
1116                 /* workaround for 1.6 servers which do not have 
1117                  * the patch from bug20278 */
1118                 cli->cl_avail_grant = ocd->ocd_grant;
1119         }
1120
1121         client_obd_list_unlock(&cli->cl_loi_list_lock);
1122
1123         CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld \n",
1124                cli->cl_import->imp_obd->obd_name,
1125                cli->cl_avail_grant, cli->cl_lost_grant);
1126
1127         if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1128             cfs_list_empty(&cli->cl_grant_shrink_list))
1129                 osc_add_shrink_grant(cli);
1130 }
1131
1132 /* We assume that the reason this OSC got a short read is because it read
1133  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1134  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1135  * this stripe never got written at or beyond this stripe offset yet. */
1136 static void handle_short_read(int nob_read, obd_count page_count,
1137                               struct brw_page **pga)
1138 {
1139         char *ptr;
1140         int i = 0;
1141
1142         /* skip bytes read OK */
1143         while (nob_read > 0) {
1144                 LASSERT (page_count > 0);
1145
1146                 if (pga[i]->count > nob_read) {
1147                         /* EOF inside this page */
1148                         ptr = cfs_kmap(pga[i]->pg) +
1149                                 (pga[i]->off & ~CFS_PAGE_MASK);
1150                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1151                         cfs_kunmap(pga[i]->pg);
1152                         page_count--;
1153                         i++;
1154                         break;
1155                 }
1156
1157                 nob_read -= pga[i]->count;
1158                 page_count--;
1159                 i++;
1160         }
1161
1162         /* zero remaining pages */
1163         while (page_count-- > 0) {
1164                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1165                 memset(ptr, 0, pga[i]->count);
1166                 cfs_kunmap(pga[i]->pg);
1167                 i++;
1168         }
1169 }
1170
1171 static int check_write_rcs(struct ptlrpc_request *req,
1172                            int requested_nob, int niocount,
1173                            obd_count page_count, struct brw_page **pga)
1174 {
1175         int     i;
1176         __u32   *remote_rcs;
1177
1178         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1179                                                   sizeof(*remote_rcs) *
1180                                                   niocount);
1181         if (remote_rcs == NULL) {
1182                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1183                 return(-EPROTO);
1184         }
1185
1186         /* return error if any niobuf was in error */
1187         for (i = 0; i < niocount; i++) {
1188                 if (remote_rcs[i] < 0)
1189                         return(remote_rcs[i]);
1190
1191                 if (remote_rcs[i] != 0) {
1192                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1193                                 i, remote_rcs[i], req);
1194                         return(-EPROTO);
1195                 }
1196         }
1197
1198         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1199                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1200                        req->rq_bulk->bd_nob_transferred, requested_nob);
1201                 return(-EPROTO);
1202         }
1203
1204         return (0);
1205 }
1206
1207 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1208 {
1209         if (p1->flag != p2->flag) {
1210                 unsigned mask = ~(OBD_BRW_FROM_GRANT|
1211                                   OBD_BRW_NOCACHE|OBD_BRW_SYNC|OBD_BRW_ASYNC);
1212
1213                 /* warn if we try to combine flags that we don't know to be
1214                  * safe to combine */
1215                 if ((p1->flag & mask) != (p2->flag & mask))
1216                         CERROR("is it ok to have flags 0x%x and 0x%x in the "
1217                                "same brw?\n", p1->flag, p2->flag);
1218                 return 0;
1219         }
1220
1221         return (p1->off + p1->count == p2->off);
1222 }
1223
1224 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1225                                    struct brw_page **pga, int opc,
1226                                    cksum_type_t cksum_type)
1227 {
1228         __u32 cksum;
1229         int i = 0;
1230
1231         LASSERT (pg_count > 0);
1232         cksum = init_checksum(cksum_type);
1233         while (nob > 0 && pg_count > 0) {
1234                 unsigned char *ptr = cfs_kmap(pga[i]->pg);
1235                 int off = pga[i]->off & ~CFS_PAGE_MASK;
1236                 int count = pga[i]->count > nob ? nob : pga[i]->count;
1237
1238                 /* corrupt the data before we compute the checksum, to
1239                  * simulate an OST->client data error */
1240                 if (i == 0 && opc == OST_READ &&
1241                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
1242                         memcpy(ptr + off, "bad1", min(4, nob));
1243                 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
1244                 cfs_kunmap(pga[i]->pg);
1245                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1246                                off, cksum);
1247
1248                 nob -= pga[i]->count;
1249                 pg_count--;
1250                 i++;
1251         }
1252         /* For sending we only compute the wrong checksum instead
1253          * of corrupting the data so it is still correct on a redo */
1254         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1255                 cksum++;
1256
1257         return cksum;
1258 }
1259
/*
 * Build an OST_READ or OST_WRITE bulk request for a set of pages.
 *
 * cmd         OBD_BRW_WRITE selects a write, anything else a read
 * cli         client the request is sent for
 * oa          obdo copied into the request body; for writes its checksum
 *             fields are updated as well, and it is kept in the async args
 * lsm         not used in this function
 * page_count  number of entries in pga, must be non-zero
 * pga         pages to transfer, with strictly increasing offsets
 * reqp        receives the prepared request on success
 * ocapa       capability packed into the request
 * reserve     when set together with ocapa, a reference on ocapa is taken
 *             and stored in the async args
 *
 * Returns 0 on success, or a negative errno: -ENOMEM when the request or
 * the bulk descriptor cannot be allocated, the error from
 * ptlrpc_request_pack(), or the value injected through the
 * OBD_FAIL_OSC_BRW_PREP_REQ* fail points.
 */
static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa, int reserve)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        /* writes are allocated from the import's request pool, reads are
         * allocated normally */
        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                cli->cl_import->imp_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        /* one remote niobuf per run of pages that can_merge_pages() accepts */
        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        /* the variable-size request fields must be sized before packing */
        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));
        osc_set_capa_size(req, &RMF_CAPA1, ocapa);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (opc == OST_WRITE)
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_GET_SOURCE, OST_BULK_PORTAL);
        else
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_PUT_SINK, OST_BULK_PORTAL);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&body->oa, oa);

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        osc_pack_capa(req, body, ocapa);
        LASSERT (page_count > 0);
        pg_prev = pga[0];
        /* add every page to the bulk descriptor and fill the niobufs; a page
         * that merges with its predecessor extends the previous niobuf */
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];

                LASSERT(pg->count > 0);
                /* a brw_page may not extend past the end of its page */
                LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
                         "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
                         pg->off, pg->count);
#ifdef __linux__
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
#else
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u\n", i, page_count);
#endif
                /* all pages must agree on OBD_BRW_SRVLOCK */
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
                                      pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        /* step back so that the loop increment lands on the
                         * same niobuf again */
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len    = pg->count;
                        niobuf->flags  = pg->flag;
                }
                pg_prev = pg;
        }

        /* exactly niocount niobufs must have been filled */
        LASSERTF((void *)(niobuf - niocount) ==
                req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
                "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
                &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                /* checksum here only when enabled and the security flavor
                 * does not already cover the bulk data */
                if (unlikely(cli->cl_checksum) &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                                oa->o_flags &= OBD_FL_LOCAL_MASK;
                                body->oa.o_flags = 0;
                        }
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                /* for reads only the checksum type is put in the request */
                if (unlikely(cli->cl_checksum) &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
        }
        ptlrpc_request_set_replen(req);

        /* record what is needed to process the reply in the request's
         * async args */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);
        if (ocapa && reserve)
                aa->aa_ocapa = capa_get(ocapa);

        *reqp = req;
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
1446
1447 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1448                                 __u32 client_cksum, __u32 server_cksum, int nob,
1449                                 obd_count page_count, struct brw_page **pga,
1450                                 cksum_type_t client_cksum_type)
1451 {
1452         __u32 new_cksum;
1453         char *msg;
1454         cksum_type_t cksum_type;
1455
1456         if (server_cksum == client_cksum) {
1457                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1458                 return 0;
1459         }
1460
1461         /* If this is mmaped file - it can be changed at any time */
1462         if (oa->o_valid & OBD_MD_FLFLAGS && oa->o_flags & OBD_FL_MMAP)
1463                 return 1;
1464
1465         if (oa->o_valid & OBD_MD_FLFLAGS)
1466                 cksum_type = cksum_type_unpack(oa->o_flags);
1467         else
1468                 cksum_type = OBD_CKSUM_CRC32;
1469
1470         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1471                                       cksum_type);
1472
1473         if (cksum_type != client_cksum_type)
1474                 msg = "the server did not use the checksum type specified in "
1475                       "the original request - likely a protocol problem";
1476         else if (new_cksum == server_cksum)
1477                 msg = "changed on the client after we checksummed it - "
1478                       "likely false positive due to mmap IO (bug 11742)";
1479         else if (new_cksum == client_cksum)
1480                 msg = "changed in transit before arrival at OST";
1481         else
1482                 msg = "changed in transit AND doesn't match the original - "
1483                       "likely false positive due to mmap IO (bug 11742)";
1484
1485         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1486                            " object "LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
1487                            msg, libcfs_nid2str(peer->nid),
1488                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1489                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1490                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1491                            oa->o_id,
1492                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_seq : (__u64)0,
1493                            pga[0]->off,
1494                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1495         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1496                "client csum now %x\n", client_cksum, client_cksum_type,
1497                server_cksum, cksum_type, new_cksum);
1498         return 1;
1499 }
1500
1501 /* Note rc enters this function as number of bytes transferred */
1502 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1503 {
1504         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1505         const lnet_process_id_t *peer =
1506                         &req->rq_import->imp_connection->c_peer;
1507         struct client_obd *cli = aa->aa_cli;
1508         struct ost_body *body;
1509         __u32 client_cksum = 0;
1510         ENTRY;
1511
1512         if (rc < 0 && rc != -EDQUOT) {
1513                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1514                 RETURN(rc);
1515         }
1516
1517         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1518         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1519         if (body == NULL) {
1520                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1521                 RETURN(-EPROTO);
1522         }
1523
1524 #ifdef HAVE_QUOTA_SUPPORT
1525         /* set/clear over quota flag for a uid/gid */
1526         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1527             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1528                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1529
1530                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1531                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1532                        body->oa.o_flags);
1533                 lquota_setdq(quota_interface, cli, qid, body->oa.o_valid,
1534                              body->oa.o_flags);
1535         }
1536 #endif
1537
1538         osc_update_grant(cli, body);
1539
1540         if (rc < 0)
1541                 RETURN(rc);
1542
1543         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1544                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1545
1546         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1547                 if (rc > 0) {
1548                         CERROR("Unexpected +ve rc %d\n", rc);
1549                         RETURN(-EPROTO);
1550                 }
1551                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1552
1553                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1554                         RETURN(-EAGAIN);
1555
1556                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1557                     check_write_checksum(&body->oa, peer, client_cksum,
1558                                          body->oa.o_cksum, aa->aa_requested_nob,
1559                                          aa->aa_page_count, aa->aa_ppga,
1560                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1561                         RETURN(-EAGAIN);
1562
1563                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1564                                      aa->aa_page_count, aa->aa_ppga);
1565                 GOTO(out, rc);
1566         }
1567
1568         /* The rest of this function executes only for OST_READs */
1569
1570         /* if unwrap_bulk failed, return -EAGAIN to retry */
1571         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1572         if (rc < 0)
1573                 GOTO(out, rc = -EAGAIN);
1574
1575         if (rc > aa->aa_requested_nob) {
1576                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1577                        aa->aa_requested_nob);
1578                 RETURN(-EPROTO);
1579         }
1580
1581         if (rc != req->rq_bulk->bd_nob_transferred) {
1582                 CERROR ("Unexpected rc %d (%d transferred)\n",
1583                         rc, req->rq_bulk->bd_nob_transferred);
1584                 return (-EPROTO);
1585         }
1586
1587         if (rc < aa->aa_requested_nob)
1588                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1589
1590         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1591                 static int cksum_counter;
1592                 __u32      server_cksum = body->oa.o_cksum;
1593                 char      *via;
1594                 char      *router;
1595                 cksum_type_t cksum_type;
1596
1597                 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1598                         cksum_type = cksum_type_unpack(body->oa.o_flags);
1599                 else
1600                         cksum_type = OBD_CKSUM_CRC32;
1601                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1602                                                  aa->aa_ppga, OST_READ,
1603                                                  cksum_type);
1604
1605                 if (peer->nid == req->rq_bulk->bd_sender) {
1606                         via = router = "";
1607                 } else {
1608                         via = " via ";
1609                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1610                 }
1611
1612                 if (server_cksum == ~0 && rc > 0) {
1613                         CERROR("Protocol error: server %s set the 'checksum' "
1614                                "bit, but didn't send a checksum.  Not fatal, "
1615                                "but please notify on http://bugzilla.lustre.org/\n",
1616                                libcfs_nid2str(peer->nid));
1617                 } else if (server_cksum != client_cksum) {
1618                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1619                                            "%s%s%s inode "DFID" object "
1620                                            LPU64"/"LPU64" extent "
1621                                            "["LPU64"-"LPU64"]\n",
1622                                            req->rq_import->imp_obd->obd_name,
1623                                            libcfs_nid2str(peer->nid),
1624                                            via, router,
1625                                            body->oa.o_valid & OBD_MD_FLFID ?
1626                                                 body->oa.o_parent_seq : (__u64)0,
1627                                            body->oa.o_valid & OBD_MD_FLFID ?
1628                                                 body->oa.o_parent_oid : 0,
1629                                            body->oa.o_valid & OBD_MD_FLFID ?
1630                                                 body->oa.o_parent_ver : 0,
1631                                            body->oa.o_id,
1632                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1633                                                 body->oa.o_seq : (__u64)0,
1634                                            aa->aa_ppga[0]->off,
1635                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1636                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1637                                                                         1);
1638                         CERROR("client %x, server %x, cksum_type %x\n",
1639                                client_cksum, server_cksum, cksum_type);
1640                         cksum_counter = 0;
1641                         aa->aa_oa->o_cksum = client_cksum;
1642                         rc = -EAGAIN;
1643                 } else {
1644                         cksum_counter++;
1645                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1646                         rc = 0;
1647                 }
1648         } else if (unlikely(client_cksum)) {
1649                 static int cksum_missed;
1650
1651                 cksum_missed++;
1652                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1653                         CERROR("Checksum %u requested from %s but not sent\n",
1654                                cksum_missed, libcfs_nid2str(peer->nid));
1655         } else {
1656                 rc = 0;
1657         }
1658 out:
1659         if (rc >= 0)
1660                 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
1661
1662         RETURN(rc);
1663 }
1664
1665 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1666                             struct lov_stripe_md *lsm,
1667                             obd_count page_count, struct brw_page **pga,
1668                             struct obd_capa *ocapa)
1669 {
1670         struct ptlrpc_request *req;
1671         int                    rc;
1672         cfs_waitq_t            waitq;
1673         int                    resends = 0;
1674         struct l_wait_info     lwi;
1675
1676         ENTRY;
1677
1678         cfs_waitq_init(&waitq);
1679
1680 restart_bulk:
1681         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1682                                   page_count, pga, &req, ocapa, 0);
1683         if (rc != 0)
1684                 return (rc);
1685
1686         rc = ptlrpc_queue_wait(req);
1687
1688         if (rc == -ETIMEDOUT && req->rq_resend) {
1689                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1690                 ptlrpc_req_finished(req);
1691                 goto restart_bulk;
1692         }
1693
1694         rc = osc_brw_fini_request(req, rc);
1695
1696         ptlrpc_req_finished(req);
1697         if (osc_recoverable_error(rc)) {
1698                 resends++;
1699                 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1700                         CERROR("too many resend retries, returning error\n");
1701                         RETURN(-EIO);
1702                 }
1703
1704                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1705                 l_wait_event(waitq, 0, &lwi);
1706
1707                 goto restart_bulk;
1708         }
1709
1710         RETURN (rc);
1711 }
1712
1713 int osc_brw_redo_request(struct ptlrpc_request *request,
1714                          struct osc_brw_async_args *aa)
1715 {
1716         struct ptlrpc_request *new_req;
1717         struct ptlrpc_request_set *set = request->rq_set;
1718         struct osc_brw_async_args *new_aa;
1719         struct osc_async_page *oap;
1720         int rc = 0;
1721         ENTRY;
1722
1723         if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1724                 CERROR("too many resent retries, returning error\n");
1725                 RETURN(-EIO);
1726         }
1727
1728         DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1729
1730         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1731                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1732                                   aa->aa_cli, aa->aa_oa,
1733                                   NULL /* lsm unused by osc currently */,
1734                                   aa->aa_page_count, aa->aa_ppga,
1735                                   &new_req, aa->aa_ocapa, 0);
1736         if (rc)
1737                 RETURN(rc);
1738
1739         client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1740
1741         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1742                 if (oap->oap_request != NULL) {
1743                         LASSERTF(request == oap->oap_request,
1744                                  "request %p != oap_request %p\n",
1745                                  request, oap->oap_request);
1746                         if (oap->oap_interrupted) {
1747                                 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1748                                 ptlrpc_req_finished(new_req);
1749                                 RETURN(-EINTR);
1750                         }
1751                 }
1752         }
1753         /* New request takes over pga and oaps from old request.
1754          * Note that copying a list_head doesn't work, need to move it... */
1755         aa->aa_resends++;
1756         new_req->rq_interpret_reply = request->rq_interpret_reply;
1757         new_req->rq_async_args = request->rq_async_args;
1758         new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1759
1760         new_aa = ptlrpc_req_async_args(new_req);
1761
1762         CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1763         cfs_list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1764         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1765
1766         cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1767                 if (oap->oap_request) {
1768                         ptlrpc_req_finished(oap->oap_request);
1769                         oap->oap_request = ptlrpc_request_addref(new_req);
1770                 }
1771         }
1772
1773         new_aa->aa_ocapa = aa->aa_ocapa;
1774         aa->aa_ocapa = NULL;
1775
1776         /* use ptlrpc_set_add_req is safe because interpret functions work
1777          * in check_set context. only one way exist with access to request
1778          * from different thread got -EINTR - this way protected with
1779          * cl_loi_list_lock */
1780         ptlrpc_set_add_req(set, new_req);
1781
1782         client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1783
1784         DEBUG_REQ(D_INFO, new_req, "new request");
1785         RETURN(0);
1786 }
1787
1788 /*
1789  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1790  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1791  * fine for our small page arrays and doesn't require allocation.  its an
1792  * insertion sort that swaps elements that are strides apart, shrinking the
1793  * stride down until its '1' and the array is sorted.
1794  */
1795 static void sort_brw_pages(struct brw_page **array, int num)
1796 {
1797         int stride, i, j;
1798         struct brw_page *tmp;
1799
1800         if (num == 1)
1801                 return;
1802         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1803                 ;
1804
1805         do {
1806                 stride /= 3;
1807                 for (i = stride ; i < num ; i++) {
1808                         tmp = array[i];
1809                         j = i;
1810                         while (j >= stride && array[j - stride]->off > tmp->off) {
1811                                 array[j] = array[j - stride];
1812                                 j -= stride;
1813                         }
1814                         array[j] = tmp;
1815                 }
1816         } while (stride > 1);
1817 }
1818
1819 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1820 {
1821         int count = 1;
1822         int offset;
1823         int i = 0;
1824
1825         LASSERT (pages > 0);
1826         offset = pg[i]->off & ~CFS_PAGE_MASK;
1827
1828         for (;;) {
1829                 pages--;
1830                 if (pages == 0)         /* that's all */
1831                         return count;
1832
1833                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1834                         return count;   /* doesn't end on page boundary */
1835
1836                 i++;
1837                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1838                 if (offset != 0)        /* doesn't start on page boundary */
1839                         return count;
1840
1841                 count++;
1842         }
1843 }
1844
1845 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1846 {
1847         struct brw_page **ppga;
1848         int i;
1849
1850         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1851         if (ppga == NULL)
1852                 return NULL;
1853
1854         for (i = 0; i < count; i++)
1855                 ppga[i] = pga + i;
1856         return ppga;
1857 }
1858
1859 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1860 {
1861         LASSERT(ppga != NULL);
1862         OBD_FREE(ppga, sizeof(*ppga) * count);
1863 }
1864
1865 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1866                    obd_count page_count, struct brw_page *pga,
1867                    struct obd_trans_info *oti)
1868 {
1869         struct obdo *saved_oa = NULL;
1870         struct brw_page **ppga, **orig;
1871         struct obd_import *imp = class_exp2cliimp(exp);
1872         struct client_obd *cli;
1873         int rc, page_count_orig;
1874         ENTRY;
1875
1876         LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1877         cli = &imp->imp_obd->u.cli;
1878
1879         if (cmd & OBD_BRW_CHECK) {
1880                 /* The caller just wants to know if there's a chance that this
1881                  * I/O can succeed */
1882
1883                 if (imp->imp_invalid)
1884                         RETURN(-EIO);
1885                 RETURN(0);
1886         }
1887
1888         /* test_brw with a failed create can trip this, maybe others. */
1889         LASSERT(cli->cl_max_pages_per_rpc);
1890
1891         rc = 0;
1892
1893         orig = ppga = osc_build_ppga(pga, page_count);
1894         if (ppga == NULL)
1895                 RETURN(-ENOMEM);
1896         page_count_orig = page_count;
1897
1898         sort_brw_pages(ppga, page_count);
1899         while (page_count) {
1900                 obd_count pages_per_brw;
1901
1902                 if (page_count > cli->cl_max_pages_per_rpc)
1903                         pages_per_brw = cli->cl_max_pages_per_rpc;
1904                 else
1905                         pages_per_brw = page_count;
1906
1907                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1908
1909                 if (saved_oa != NULL) {
1910                         /* restore previously saved oa */
1911                         *oinfo->oi_oa = *saved_oa;
1912                 } else if (page_count > pages_per_brw) {
1913                         /* save a copy of oa (brw will clobber it) */
1914                         OBDO_ALLOC(saved_oa);
1915                         if (saved_oa == NULL)
1916                                 GOTO(out, rc = -ENOMEM);
1917                         *saved_oa = *oinfo->oi_oa;
1918                 }
1919
1920                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1921                                       pages_per_brw, ppga, oinfo->oi_capa);
1922
1923                 if (rc != 0)
1924                         break;
1925
1926                 page_count -= pages_per_brw;
1927                 ppga += pages_per_brw;
1928         }
1929
1930 out:
1931         osc_release_ppga(orig, page_count_orig);
1932
1933         if (saved_oa != NULL)
1934                 OBDO_FREE(saved_oa);
1935
1936         RETURN(rc);
1937 }
1938
1939 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1940  * the dirty accounting.  Writeback completes or truncate happens before
1941  * writing starts.  Must be called with the loi lock held. */
1942 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1943                            int sent)
1944 {
1945         osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1946 }
1947
1948 static int lop_makes_syncfs_rpc(struct loi_oap_pages *lop)
1949 {
1950         struct osc_async_page *oap;
1951         ENTRY;
1952
1953         if (cfs_list_empty(&lop->lop_urgent))
1954                 RETURN(0);
1955
1956         oap = cfs_list_entry(lop->lop_urgent.next,
1957                              struct osc_async_page, oap_urgent_item);
1958
1959         if (oap->oap_async_flags & ASYNC_SYNCFS) {
1960                 CDEBUG(D_CACHE, "syncfs request forcing RPC\n");
1961                 RETURN(1);
1962         }
1963
1964         RETURN(0);
1965 }
1966
1967 /* This maintains the lists of pending pages to read/write for a given object
1968  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1969  * to quickly find objects that are ready to send an RPC. */
1970 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1971                          int cmd)
1972 {
1973         int optimal;
1974         ENTRY;
1975
1976         if (lop->lop_num_pending == 0)
1977                 RETURN(0);
1978
1979         /* if we have an invalid import we want to drain the queued pages
1980          * by forcing them through rpcs that immediately fail and complete
1981          * the pages.  recovery relies on this to empty the queued pages
1982          * before canceling the locks and evicting down the llite pages */
1983         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1984                 RETURN(1);
1985
1986         /* stream rpcs in queue order as long as as there is an urgent page
1987          * queued.  this is our cheap solution for good batching in the case
1988          * where writepage marks some random page in the middle of the file
1989          * as urgent because of, say, memory pressure */
1990         if (!cfs_list_empty(&lop->lop_urgent)) {
1991                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1992                 RETURN(1);
1993         }
1994         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1995         optimal = cli->cl_max_pages_per_rpc;
1996         if (cmd & OBD_BRW_WRITE) {
1997                 /* trigger a write rpc stream as long as there are dirtiers
1998                  * waiting for space.  as they're waiting, they're not going to
1999                  * create more pages to coalesce with what's waiting.. */
2000                 if (!cfs_list_empty(&cli->cl_cache_waiters)) {
2001                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
2002                         RETURN(1);
2003                 }
2004                 /* +16 to avoid triggering rpcs that would want to include pages
2005                  * that are being queued but which can't be made ready until
2006                  * the queuer finishes with the page. this is a wart for
2007                  * llite::commit_write() */
2008                 optimal += 16;
2009         }
2010         if (lop->lop_num_pending >= optimal)
2011                 RETURN(1);
2012
2013         RETURN(0);
2014 }
2015
2016 static int lop_makes_hprpc(struct loi_oap_pages *lop)
2017 {
2018         struct osc_async_page *oap;
2019         ENTRY;
2020
2021         if (cfs_list_empty(&lop->lop_urgent))
2022                 RETURN(0);
2023
2024         oap = cfs_list_entry(lop->lop_urgent.next,
2025                          struct osc_async_page, oap_urgent_item);
2026
2027         if (oap->oap_async_flags & ASYNC_HP) {
2028                 CDEBUG(D_CACHE, "hp request forcing RPC\n");
2029                 RETURN(1);
2030         }
2031
2032         RETURN(0);
2033 }
2034
2035 static void on_list(cfs_list_t *item, cfs_list_t *list,
2036                     int should_be_on)
2037 {
2038         if (cfs_list_empty(item) && should_be_on)
2039                 cfs_list_add_tail(item, list);
2040         else if (!cfs_list_empty(item) && !should_be_on)
2041                 cfs_list_del_init(item);
2042 }
2043
2044 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
2045  * can find pages to build into rpcs quickly */
2046 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
2047 {
2048         if (lop_makes_hprpc(&loi->loi_write_lop) ||
2049             lop_makes_hprpc(&loi->loi_read_lop)) {
2050                 /* HP rpc */
2051                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
2052                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
2053         } else {
2054                 if (lop_makes_syncfs_rpc(&loi->loi_write_lop)) {
2055                         on_list(&loi->loi_sync_fs_item,
2056                                 &cli->cl_loi_sync_fs_list,
2057                                 loi->loi_write_lop.lop_num_pending);
2058                 } else {
2059                         on_list(&loi->loi_hp_ready_item,
2060                                 &cli->cl_loi_hp_ready_list, 0);
2061                         on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
2062                                 lop_makes_rpc(cli, &loi->loi_write_lop,
2063                                               OBD_BRW_WRITE)||
2064                                 lop_makes_rpc(cli, &loi->loi_read_lop,
2065                                               OBD_BRW_READ));
2066                 }
2067         }
2068
2069         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
2070                 loi->loi_write_lop.lop_num_pending);
2071
2072         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
2073                 loi->loi_read_lop.lop_num_pending);
2074 }
2075
2076 static void lop_update_pending(struct client_obd *cli,
2077                                struct loi_oap_pages *lop, int cmd, int delta)
2078 {
2079         lop->lop_num_pending += delta;
2080         if (cmd & OBD_BRW_WRITE)
2081                 cli->cl_pending_w_pages += delta;
2082         else
2083                 cli->cl_pending_r_pages += delta;
2084 }
2085
2086 /**
2087  * this is called when a sync waiter receives an interruption.  Its job is to
2088  * get the caller woken as soon as possible.  If its page hasn't been put in an
2089  * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
2090  * desiring interruption which will forcefully complete the rpc once the rpc
2091  * has timed out.
2092  */
2093 int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
2094 {
2095         struct loi_oap_pages *lop;
2096         struct lov_oinfo *loi;
2097         int rc = -EBUSY;
2098         ENTRY;
2099
2100         LASSERT(!oap->oap_interrupted);
2101         oap->oap_interrupted = 1;
2102
2103         /* ok, it's been put in an rpc. only one oap gets a request reference */
2104         if (oap->oap_request != NULL) {
2105                 ptlrpc_mark_interrupted(oap->oap_request);
2106                 ptlrpcd_wake(oap->oap_request);
2107                 ptlrpc_req_finished(oap->oap_request);
2108                 oap->oap_request = NULL;
2109         }
2110
2111         /*
2112          * page completion may be called only if ->cpo_prep() method was
2113          * executed by osc_io_submit(), that also adds page the to pending list
2114          */
2115         if (!cfs_list_empty(&oap->oap_pending_item)) {
2116                 cfs_list_del_init(&oap->oap_pending_item);
2117                 cfs_list_del_init(&oap->oap_urgent_item);
2118
2119                 loi = oap->oap_loi;
2120                 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
2121                         &loi->loi_write_lop : &loi->loi_read_lop;
2122                 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
2123                 loi_list_maint(oap->oap_cli, oap->oap_loi);
2124                 rc = oap->oap_caller_ops->ap_completion(env,
2125                                           oap->oap_caller_data,
2126                                           oap->oap_cmd, NULL, -EINTR);
2127         }
2128
2129         RETURN(rc);
2130 }
2131
2132 /* this is trying to propogate async writeback errors back up to the
2133  * application.  As an async write fails we record the error code for later if
2134  * the app does an fsync.  As long as errors persist we force future rpcs to be
2135  * sync so that the app can get a sync error and break the cycle of queueing
2136  * pages for which writeback will fail. */
2137 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
2138                            int rc)
2139 {
2140         if (rc) {
2141                 if (!ar->ar_rc)
2142                         ar->ar_rc = rc;
2143
2144                 ar->ar_force_sync = 1;
2145                 ar->ar_min_xid = ptlrpc_sample_next_xid();
2146                 return;
2147
2148         }
2149
2150         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2151                 ar->ar_force_sync = 0;
2152 }
2153
2154 void osc_oap_to_pending(struct osc_async_page *oap)
2155 {
2156         struct loi_oap_pages *lop;
2157
2158         if (oap->oap_cmd & OBD_BRW_WRITE)
2159                 lop = &oap->oap_loi->loi_write_lop;
2160         else
2161                 lop = &oap->oap_loi->loi_read_lop;
2162
2163         if (oap->oap_async_flags & ASYNC_HP)
2164                 cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2165         else if (oap->oap_async_flags & ASYNC_URGENT)
2166                 cfs_list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2167         cfs_list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2168         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2169 }
2170
2171 /* this must be called holding the loi list lock to give coverage to exit_cache,
2172  * async_flag maintenance, and oap_request */
2173 static void osc_ap_completion(const struct lu_env *env,
2174                               struct client_obd *cli, struct obdo *oa,
2175                               struct osc_async_page *oap, int sent, int rc)
2176 {
2177         __u64 xid = 0;
2178
2179         ENTRY;
2180         if (oap->oap_request != NULL) {
2181                 xid = ptlrpc_req_xid(oap->oap_request);
2182                 ptlrpc_req_finished(oap->oap_request);
2183                 oap->oap_request = NULL;
2184         }
2185
2186         cfs_spin_lock(&oap->oap_lock);
2187         oap->oap_async_flags = 0;
2188         cfs_spin_unlock(&oap->oap_lock);
2189         oap->oap_interrupted = 0;
2190
2191         if (oap->oap_cmd & OBD_BRW_WRITE) {
2192                 osc_process_ar(&cli->cl_ar, xid, rc);
2193                 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
2194         }
2195
2196         if (rc == 0 && oa != NULL) {
2197                 if (oa->o_valid & OBD_MD_FLBLOCKS)
2198                         oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2199                 if (oa->o_valid & OBD_MD_FLMTIME)
2200                         oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2201                 if (oa->o_valid & OBD_MD_FLATIME)
2202                         oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2203                 if (oa->o_valid & OBD_MD_FLCTIME)
2204                         oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2205         }
2206
2207         rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
2208                                                 oap->oap_cmd, oa, rc);
2209
2210         /* ll_ap_completion (from llite) drops PG_locked. so, a new
2211          * I/O on the page could start, but OSC calls it under lock
2212          * and thus we can add oap back to pending safely */
2213         if (rc)
2214                 /* upper layer wants to leave the page on pending queue */
2215                 osc_oap_to_pending(oap);
2216         else
2217                 osc_exit_cache(cli, oap, sent);
2218         EXIT;
2219 }
2220
2221 static int brw_interpret(const struct lu_env *env,
2222                          struct ptlrpc_request *req, void *data, int rc)
2223 {
2224         struct osc_brw_async_args *aa = data;
2225         struct client_obd *cli;
2226         int async;
2227         ENTRY;
2228
2229         rc = osc_brw_fini_request(req, rc);
2230         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2231         if (osc_recoverable_error(rc)) {
2232                 /* Only retry once for mmaped files since the mmaped page
2233                  * might be modified at anytime. We have to retry at least
2234                  * once in case there WAS really a corruption of the page
2235                  * on the network, that was not caused by mmap() modifying
2236                  * the page. Bug11742 */
2237                 if ((rc == -EAGAIN) && (aa->aa_resends > 0) &&
2238                     aa->aa_oa->o_valid & OBD_MD_FLFLAGS &&
2239                     aa->aa_oa->o_flags & OBD_FL_MMAP) {
2240                         rc = 0;
2241                 } else {
2242                         rc = osc_brw_redo_request(req, aa);
2243                         if (rc == 0)
2244                                 RETURN(0);
2245                 }
2246         }
2247
2248         if (aa->aa_ocapa) {
2249                 capa_put(aa->aa_ocapa);
2250                 aa->aa_ocapa = NULL;
2251         }
2252
2253         cli = aa->aa_cli;
2254
2255         client_obd_list_lock(&cli->cl_loi_list_lock);
2256
2257         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2258          * is called so we know whether to go to sync BRWs or wait for more
2259          * RPCs to complete */
2260         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2261                 cli->cl_w_in_flight--;
2262         else
2263                 cli->cl_r_in_flight--;
2264
2265         async = cfs_list_empty(&aa->aa_oaps);
2266         if (!async) { /* from osc_send_oap_rpc() */
2267                 struct osc_async_page *oap, *tmp;
2268                 /* the caller may re-use the oap after the completion call so
2269                  * we need to clean it up a little */
2270                 cfs_list_for_each_entry_safe(oap, tmp, &aa->aa_oaps,
2271                                              oap_rpc_item) {
2272                         cfs_list_del_init(&oap->oap_rpc_item);
2273                         osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
2274                 }
2275                 OBDO_FREE(aa->aa_oa);
2276         } else { /* from async_internal() */
2277                 obd_count i;
2278                 for (i = 0; i < aa->aa_page_count; i++)
2279                         osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
2280         }
2281         osc_wake_cache_waiters(cli);
2282         osc_wake_sync_fs(cli);
2283         osc_check_rpcs(env, cli);
2284         client_obd_list_unlock(&cli->cl_loi_list_lock);
2285         if (!async)
2286                 cl_req_completion(env, aa->aa_clerq, rc);
2287         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2288
2289         RETURN(rc);
2290 }
2291
/**
 * Build one bulk read/write request out of the pages queued on \a rpc_list.
 *
 * An array of \a page_count brw_page pointers and an obdo are allocated, the
 * brw_page of every oap on \a rpc_list is collected into the array, and the
 * result is handed to osc_brw_prep_request().  The first oap on the list
 * supplies the caller ops, the cl_req and the ldlm lock for the whole request.
 *
 * On success the oaps are spliced from \a rpc_list onto aa_oaps in the
 * request's async args, \a rpc_list is re-initialised (left empty) and the
 * cl_req is stored in aa_clerq.
 *
 * On failure the obdo and the page array are freed, every oap still on
 * \a rpc_list is completed through osc_ap_completion(), the cl_req (if one
 * was allocated) is completed with the error, and the function returns with
 * cli->cl_loi_list_lock held: the lock is taken in the error path and is not
 * released again in this function.
 *
 * \param env        environment passed through to the cl_req and completion
 *                   calls
 * \param cli        client obd whose list lock protects the page lists
 * \param rpc_list   non-empty list of oaps linked through oap_rpc_item
 * \param page_count number of slots allocated for the page array; the fill
 *                   loop does not check it, so it is expected to match the
 *                   number of oaps on \a rpc_list
 * \param cmd        OBD_BRW_* flags; OBD_BRW_WRITE selects CRT_WRITE and
 *                   OBD_BRW_MEMALLOC sets rq_memalloc on the request
 *
 * \return the prepared request on success
 * \return ERR_PTR() of a negative errno on failure (-ENOMEM for the
 *         allocations here, otherwise the error from cl_req_alloc(),
 *         cl_req_prep() or osc_brw_prep_request())
 */
2292 static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
2293                                             struct client_obd *cli,
2294                                             cfs_list_t *rpc_list,
2295                                             int page_count, int cmd)
2296 {
2297         struct ptlrpc_request *req;
2298         struct brw_page **pga = NULL;
2299         struct osc_brw_async_args *aa;
2300         struct obdo *oa = NULL;
2301         const struct obd_async_page_ops *ops = NULL;
2302         void *caller_data = NULL;
2303         struct osc_async_page *oap;
2304         struct osc_async_page *tmp;
2305         struct ost_body *body;
2306         struct cl_req *clerq = NULL;
2307         enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2308         struct ldlm_lock *lock = NULL;
2309         struct cl_req_attr crattr;
2310         int i, rc, mpflag = 0;
2311
2312         ENTRY;
2313         LASSERT(!cfs_list_empty(rpc_list));
2314
             /* keep the value returned here; it is handed back to
              * cfs_memory_pressure_restore() at the out label */
2315         if (cmd & OBD_BRW_MEMALLOC)
2316                 mpflag = cfs_memory_pressure_get_and_set();
2317
2318         memset(&crattr, 0, sizeof crattr);
2319         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2320         if (pga == NULL)
2321                 GOTO(out, req = ERR_PTR(-ENOMEM));
2322
2323         OBDO_ALLOC(oa);
2324         if (oa == NULL)
2325                 GOTO(out, req = ERR_PTR(-ENOMEM));
2326
             /* collect the brw_page of every queued oap into pga[]; the
              * first oap also provides ops, caller_data, the cl_req and the
              * lock used for the whole request */
2327         i = 0;
2328         cfs_list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2329                 struct cl_page *page = osc_oap2cl_page(oap);
2330                 if (ops == NULL) {
2331                         ops = oap->oap_caller_ops;
2332                         caller_data = oap->oap_caller_data;
2333
2334                         clerq = cl_req_alloc(env, page, crt,
2335                                              1 /* only 1-object rpcs for
2336                                                 * now */);
2337                         if (IS_ERR(clerq))
2338                                 GOTO(out, req = (void *)clerq);
2339                         lock = oap->oap_ldlm_lock;
2340                 }
2341                 pga[i] = &oap->oap_brw_page;
2342                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2343                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2344                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2345                 i++;
2346                 cl_req_page_add(env, clerq, page);
2347         }
2348
2349         /* always get the data for the obdo for the rpc */
2350         LASSERT(ops != NULL);
2351         crattr.cra_oa = oa;
2352         crattr.cra_capa = NULL;
2353         cl_req_attr_set(env, clerq, &crattr, ~0ULL);
             /* pass the remote handle of the first page's lock, if any */
2354         if (lock) {
2355                 oa->o_handle = lock->l_remote_handle;
2356                 oa->o_valid |= OBD_MD_FLHANDLE;
2357         }
2358
2359         rc = cl_req_prep(env, clerq);
2360         if (rc != 0) {
2361                 CERROR("cl_req_prep failed: %d\n", rc);
2362                 GOTO(out, req = ERR_PTR(rc));
2363         }
2364
2365         sort_brw_pages(pga, page_count);
2366         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2367                                   pga, &req, crattr.cra_capa, 1);
2368         if (rc != 0) {
2369                 CERROR("prep_req failed: %d\n", rc);
2370                 GOTO(out, req = ERR_PTR(rc));
2371         }
2372
2373         if (cmd & OBD_BRW_MEMALLOC)
2374                 req->rq_memalloc = 1;
2375
2376         /* Need to update the timestamps after the request is built in case
2377          * we race with setattr (locally or in queue at OST).  If OST gets
2378          * later setattr before earlier BRW (as determined by the request xid),
2379          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2380          * way to do this in a single call.  bug 10150 */
2381         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2382         cl_req_attr_set(env, clerq, &crattr,
2383                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
             /* NOTE(review): 'body' is fetched above but never read in this
              * function, and crattr.cra_oa still points at 'oa', so the
              * refreshed timestamps are not visibly copied into the request
              * body here -- confirm this is intended. */
2384
             /* hand the oaps and the cl_req over to the request's async
              * args; rpc_list is left empty */
2385         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2386         aa = ptlrpc_req_async_args(req);
2387         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2388         cfs_list_splice(rpc_list, &aa->aa_oaps);
2389         CFS_INIT_LIST_HEAD(rpc_list);
2390         aa->aa_clerq = clerq;
2391 out:
             /* reached on both the success and the failure path */
2392         if (cmd & OBD_BRW_MEMALLOC)
2393                 cfs_memory_pressure_restore(mpflag);
2394
2395         capa_put(crattr.cra_capa);
2396         if (IS_ERR(req)) {
2397                 if (oa)
2398                         OBDO_FREE(oa);
2399                 if (pga)
2400                         OBD_FREE(pga, sizeof(*pga) * page_count);
2401                 /* this should happen rarely and is pretty bad, it makes the
2402                  * pending list not follow the dirty order */
                     /* NOTE: cl_loi_list_lock is taken here and is still held
                      * when this function returns the error */
2403                 client_obd_list_lock(&cli->cl_loi_list_lock);
2404                 cfs_list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
2405                         cfs_list_del_init(&oap->oap_rpc_item);
2406
2407                         /* queued sync pages can be torn down while the pages
2408                          * were between the pending list and the rpc */
2409                         if (oap->oap_interrupted) {
2410                                 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2411                                 osc_ap_completion(env, cli, NULL, oap, 0,
2412                                                   oap->oap_count);
2413                                 continue;
2414                         }
2415                         osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
2416                 }
                     /* clerq may be NULL (never allocated) or an ERR_PTR
                      * (cl_req_alloc() failed); only a real cl_req is
                      * completed */
2417                 if (clerq && !IS_ERR(clerq))
2418                         cl_req_completion(env, clerq, PTR_ERR(req));
2419         }
2420         RETURN(req);
2421 }
2422
2423 /**
2424  * prepare pages for ASYNC io and put pages in send queue.
2425  *
      * Entered with cli->cl_loi_list_lock held.  The lock is dropped once the
      * pages for the rpc have been selected and is held again on every return
      * path: it is re-taken here when no page was selected and after a
      * request was built, and osc_build_req() takes it in its own error path.
      *
      * \param env execution environment passed to the page callbacks
      * \param cli client obd owning the lists and counters
      * \param loi object whose list membership is refreshed by loi_list_maint()
2426  * \param cmd OBD_BRW_* macros
2427  * \param lop pending pages
2428  *
2429  * \return zero if no page added to send queue.
2430  * \return 1 if pages successfully added to send queue.
2431  * \return negative on errors.
2432  */
2433 static int
2434 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
2435                  struct lov_oinfo *loi,
2436                  int cmd, struct loi_oap_pages *lop)
2437 {
2438         struct ptlrpc_request *req;
2439         obd_count page_count = 0;
2440         struct osc_async_page *oap = NULL, *tmp;
2441         struct osc_brw_async_args *aa;
2442         const struct obd_async_page_ops *ops;
2443         CFS_LIST_HEAD(rpc_list);
2444         CFS_LIST_HEAD(tmp_list);
2445         unsigned int ending_offset;
2446         unsigned  starting_offset = 0;
2447         int srvlock = 0, mem_tight = 0;
2448         struct cl_object *clob = NULL;
2449         ENTRY;
2450
2451         /* ASYNC_HP pages first. At present, when the lock the pages is
2452          * to be canceled, the pages covered by the lock will be sent out
2453          * with ASYNC_HP. We have to send out them as soon as possible. */
2454         cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_urgent, oap_urgent_item) {
2455                 if (oap->oap_async_flags & ASYNC_HP)
2456                         cfs_list_move(&oap->oap_pending_item, &tmp_list);
2457                 else
2458                         cfs_list_move_tail(&oap->oap_pending_item, &tmp_list);
2459                 if (++page_count >= cli->cl_max_pages_per_rpc)
2460                         break;
2461         }
2462
             /* put the urgent pages back on the pending list (presumably at
              * its head, as with list_splice()) and restart the count, which
              * so far only bounded the urgent scan */
2463         cfs_list_splice(&tmp_list, &lop->lop_pending);
2464         page_count = 0;
2465
2466         /* first we find the pages we're allowed to work with */
2467         cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2468                                      oap_pending_item) {
2469                 ops = oap->oap_caller_ops;
2470
2471                 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
2472                          "magic 0x%x\n", oap, oap->oap_magic);
2473
2474                 if (clob == NULL) {
2475                         /* pin object in memory, so that completion call-backs
2476                          * can be safely called under client_obd_list lock. */
2477                         clob = osc_oap2cl_page(oap)->cp_obj;
2478                         cl_object_get(clob);
2479                 }
2480
                     /* all pages of one rpc must agree on OBD_BRW_SRVLOCK;
                      * the first accepted page decides (see below) */
2481                 if (page_count != 0 &&
2482                     srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2483                         CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2484                                " oap %p, page %p, srvlock %u\n",
2485                                oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2486                         break;
2487                 }
2488
2489                 /* If there is a gap at the start of this page, it can't merge
2490                  * with any previous page, so we'll hand the network a
2491                  * "fragmented" page array that it can't transfer in 1 RDMA */
2492                 if (page_count != 0 && oap->oap_page_off != 0)
2493                         break;
2494
2495                 /* in llite being 'ready' equates to the page being locked
2496                  * until completion unlocks it.  commit_write submits a page
2497                  * as not ready because its unlock will happen unconditionally
2498                  * as the call returns.  if we race with commit_write giving
2499                  * us that page we don't want to create a hole in the page
2500                  * stream, so we stop and leave the rpc to be fired by
2501                  * another dirtier or kupdated interval (the not ready page
2502                  * will still be on the dirty list).  we could call in
2503                  * at the end of ll_file_write to process the queue again. */
2504                 if (!(oap->oap_async_flags & ASYNC_READY)) {
2505                         int rc = ops->ap_make_ready(env, oap->oap_caller_data,
2506                                                     cmd);
2507                         if (rc < 0)
2508                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
2509                                                 "instead of ready\n", oap,
2510                                                 oap->oap_page, rc);
2511                         switch (rc) {
2512                         case -EAGAIN:
2513                                 /* llite is telling us that the page is still
2514                                  * in commit_write and that we should try
2515                                  * and put it in an rpc again later.  we
2516                                  * break out of the loop so we don't create
2517                                  * a hole in the sequence of pages in the rpc
2518                                  * stream.*/
2519                                 oap = NULL;
2520                                 break;
2521                         case -EINTR:
2522                                 /* the io isn't needed.. tell the checks
2523                                  * below to complete the rpc with EINTR */
2524                                 cfs_spin_lock(&oap->oap_lock);
2525                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2526                                 cfs_spin_unlock(&oap->oap_lock);
2527                                 oap->oap_count = -EINTR;
2528                                 break;
2529                         case 0:
2530                                 cfs_spin_lock(&oap->oap_lock);
2531                                 oap->oap_async_flags |= ASYNC_READY;
2532                                 cfs_spin_unlock(&oap->oap_lock);
2533                                 break;
2534                         default:
2535                                 LASSERTF(0, "oap %p page %p returned %d "
2536                                             "from make_ready\n", oap,
2537                                             oap->oap_page, rc);
2538                                 break;
2539                         }
2540                 }
                     /* oap was cleared by the -EAGAIN case above: stop
                      * collecting pages */
2541                 if (oap == NULL)
2542                         break;
2543                 /*
2544                  * Page submitted for IO has to be locked. Either by
2545                  * ->ap_make_ready() or by higher layers.
2546                  */
2547 #if defined(__KERNEL__) && defined(__linux__)
2548                 {
2549                         struct cl_page *page;
2550
2551                         page = osc_oap2cl_page(oap);
2552
2553                         if (page->cp_type == CPT_CACHEABLE &&
2554                             !(PageLocked(oap->oap_page) &&
2555                               (CheckWriteback(oap->oap_page, cmd)))) {
2556                                 CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2557                                        oap->oap_page,
2558                                        (long)oap->oap_page->flags,
2559                                        oap->oap_async_flags);
2560                                 LBUG();
2561                         }
2562                 }
2563 #endif
2564
2565                 /* take the page out of our book-keeping */
2566                 cfs_list_del_init(&oap->oap_pending_item);
2567                 lop_update_pending(cli, lop, cmd, -1);
2568                 cfs_list_del_init(&oap->oap_urgent_item);
2569
                     /* offset of the first page within a PTLRPC_MAX_BRW_SIZE
                      * sized window; only used for the histograms below */
2570                 if (page_count == 0)
2571                         starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2572                                           (PTLRPC_MAX_BRW_SIZE - 1);
2573
2574                 /* ask the caller for the size of the io as the rpc leaves. */
2575                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
2576                         oap->oap_count =
2577                                 ops->ap_refresh_count(env, oap->oap_caller_data,
2578                                                       cmd);
2579                         LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
2580                 }
                     /* nothing to transfer (or -EINTR set above): complete
                      * the page now instead of adding it to the rpc */
2581                 if (oap->oap_count <= 0) {
2582                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2583                                oap->oap_count);
2584                         osc_ap_completion(env, cli, NULL,
2585                                           oap, 0, oap->oap_count);
2586                         continue;
2587                 }
2588
2589                 /* now put the page back in our accounting */
2590                 cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
2591                 if (oap->oap_brw_flags & OBD_BRW_MEMALLOC)
2592                         mem_tight = 1;
2593                 if (page_count == 0)
2594                         srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2595                 if (++page_count >= cli->cl_max_pages_per_rpc)
2596                         break;
2597
2598                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
2599                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2600                  * have the same alignment as the initial writes that allocated
2601                  * extents on the server. */
2602                 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2603                                  oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2604                 if (ending_offset == 0)
2605                         break;
2606
2607                 /* If there is a gap at the end of this page, it can't merge
2608                  * with any subsequent pages, so we'll hand the network a
2609                  * "fragmented" page array that it can't transfer in 1 RDMA */
2610                 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2611                         break;
2612         }
2613
2614         osc_wake_cache_waiters(cli);
2615         osc_wake_sync_fs(cli);
2616         loi_list_maint(cli, loi);
2617
             /* page selection is done; the request is built without the
              * list lock */
2618         client_obd_list_unlock(&cli->cl_loi_list_lock);
2619
2620         if (clob != NULL)
2621                 cl_object_put(env, clob);
2622
             /* nothing to send: re-take the lock, which is held on return */
2623         if (page_count == 0) {
2624                 client_obd_list_lock(&cli->cl_loi_list_lock);
2625                 RETURN(0);
2626         }
2627
2628         req = osc_build_req(env, cli, &rpc_list, page_count,
2629                             mem_tight ? (cmd | OBD_BRW_MEMALLOC) : cmd);
2630         if (IS_ERR(req)) {
                     /* osc_build_req() completed the pages and took
                      * cl_loi_list_lock in its error path, so the list is
                      * empty and the lock is held here */
2631                 LASSERT(cfs_list_empty(&rpc_list));
2632                 loi_list_maint(cli, loi);
2633                 RETURN(PTR_ERR(req));
2634         }
2635
2636         aa = ptlrpc_req_async_args(req);
2637
             /* account the rpc in the read or write histograms */
2638         if (cmd == OBD_BRW_READ) {
2639                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2640                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2641                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2642                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2643         } else {
2644                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2645                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2646                                  cli->cl_w_in_flight);
2647                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2648                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2649         }
2650         ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2651
             /* re-take the list lock; it stays held on return */
2652         client_obd_list_lock(&cli->cl_loi_list_lock);
2653
2654         if (cmd == OBD_BRW_READ)
2655                 cli->cl_r_in_flight++;
2656         else
2657                 cli->cl_w_in_flight++;
2658
2659         /* queued sync pages can be torn down while the pages
2660          * were between the pending list and the rpc */
2661         tmp = NULL;
2662         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2663                 /* only one oap gets a request reference */
2664                 if (tmp == NULL)
2665                         tmp = oap;
2666                 if (oap->oap_interrupted && !req->rq_intr) {
2667                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2668                                oap, req);
2669                         ptlrpc_mark_interrupted(req);
2670                 }
2671         }
             /* tmp is the first oap of the request, if there is one */
2672         if (tmp != NULL)
2673                 tmp->oap_request = ptlrpc_request_addref(req);
2674
2675         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2676                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2677
2678         req->rq_interpret_reply = brw_interpret;
2679         ptlrpcd_add_req(req, PSCOPE_BRW);
2680         RETURN(1);
2681 }
2682
/* Emit a D_INODE debug message describing LOI: whether it sits on a ready
 * list, then the pending count and "has urgent pages" flag of its write and
 * read queues, followed by the caller's STR format and its arguments.
 *
 * The last line deliberately carries no line continuation, so the macro does
 * not depend on whatever line happens to follow it. */
#define LOI_DEBUG(LOI, STR, args...)                                     \
        CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
               !cfs_list_empty(&(LOI)->loi_ready_item) ||                \
               !cfs_list_empty(&(LOI)->loi_hp_ready_item),               \
               (LOI)->loi_write_lop.lop_num_pending,                     \
               !cfs_list_empty(&(LOI)->loi_write_lop.lop_urgent),        \
               (LOI)->loi_read_lop.lop_num_pending,                      \
               !cfs_list_empty(&(LOI)->loi_read_lop.lop_urgent),         \
               args)
2692
2693 /* This is called by osc_check_rpcs() to find which objects have pages that
2694  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2695 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2696 {
2697         ENTRY;
2698
2699         /* First return objects that have blocked locks so that they
2700          * will be flushed quickly and other clients can get the lock,
2701          * then objects which have pages ready to be stuffed into RPCs */
2702         if (!cfs_list_empty(&cli->cl_loi_hp_ready_list))
2703                 RETURN(cfs_list_entry(cli->cl_loi_hp_ready_list.next,
2704                                       struct lov_oinfo, loi_hp_ready_item));
2705         if (!cfs_list_empty(&cli->cl_loi_ready_list))
2706                 RETURN(cfs_list_entry(cli->cl_loi_ready_list.next,
2707                                       struct lov_oinfo, loi_ready_item));
2708         if (!cfs_list_empty(&cli->cl_loi_sync_fs_list))
2709                 RETURN(cfs_list_entry(cli->cl_loi_sync_fs_list.next,
2710                                       struct lov_oinfo, loi_sync_fs_item));
2711
2712         /* then if we have cache waiters, return all objects with queued
2713          * writes.  This is especially important when many small files
2714          * have filled up the cache and not been fired into rpcs because
2715          * they don't pass the nr_pending/object threshhold */
2716         if (!cfs_list_empty(&cli->cl_cache_waiters) &&
2717             !cfs_list_empty(&cli->cl_loi_write_list))
2718                 RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2719                                       struct lov_oinfo, loi_write_item));
2720
2721         /* then return all queued objects when we have an invalid import
2722          * so that they get flushed */
2723         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2724                 if (!cfs_list_empty(&cli->cl_loi_write_list))
2725                         RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2726                                               struct lov_oinfo,
2727                                               loi_write_item));
2728                 if (!cfs_list_empty(&cli->cl_loi_read_list))
2729                         RETURN(cfs_list_entry(cli->cl_loi_read_list.next,
2730                                               struct lov_oinfo, loi_read_item));
2731         }
2732         RETURN(NULL);
2733 }
2734
2735 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2736 {
2737         struct osc_async_page *oap;
2738         int hprpc = 0;
2739
2740         if (!cfs_list_empty(&loi->loi_write_lop.lop_urgent)) {
2741                 oap = cfs_list_entry(loi->loi_write_lop.lop_urgent.next,
2742                                      struct osc_async_page, oap_urgent_item);
2743                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2744         }
2745
2746         if (!hprpc && !cfs_list_empty(&loi->loi_read_lop.lop_urgent)) {
2747                 oap = cfs_list_entry(loi->loi_read_lop.lop_urgent.next,
2748                                      struct osc_async_page, oap_urgent_item);
2749                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2750         }
2751
2752         return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
2753 }
2754
2755 /* called with the loi list lock held */
2756 void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
2757 {
2758         struct lov_oinfo *loi;
2759         int rc = 0, race_counter = 0;
2760         ENTRY;
2761
2762         while ((loi = osc_next_loi(cli)) != NULL) {
2763                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2764
2765                 if (osc_max_rpc_in_flight(cli, loi))
2766                         break;
2767
2768                 /* attempt some read/write balancing by alternating between
2769                  * reads and writes in an object.  The makes_rpc checks here
2770                  * would be redundant if we were getting read/write work items
2771                  * instead of objects.  we don't want send_oap_rpc to drain a
2772                  * partial read pending queue when we're given this object to
2773                  * do io on writes while there are cache waiters */
2774                 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2775                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
2776                                               &loi->loi_write_lop);
2777                         if (rc < 0) {
2778                                 CERROR("Write request failed with %d\n", rc);
2779
2780                                 /* osc_send_oap_rpc failed, mostly because of
2781                                  * memory pressure.
2782                                  *
2783                                  * It can't break here, because if:
2784                                  *  - a page was submitted by osc_io_submit, so
2785                                  *    page locked;
2786                                  *  - no request in flight
2787                                  *  - no subsequent request
2788                                  * The system will be in live-lock state,
2789                                  * because there is no chance to call
2790                                  * osc_io_unplug() and osc_check_rpcs() any
2791                                  * more. pdflush can't help in this case,
2792                                  * because it might be blocked at grabbing
2793                                  * the page lock as we mentioned.
2794                                  *
2795                                  * Anyway, continue to drain pages. */
2796                                 /* break; */
2797                         }
2798
2799                         if (rc > 0)
2800                                 race_counter = 0;
2801                         else
2802                                 race_counter++;
2803                 }
2804                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2805                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
2806                                               &loi->loi_read_lop);
2807                         if (rc < 0)
2808                                 CERROR("Read request failed with %d\n", rc);
2809
2810                         if (rc > 0)
2811                                 race_counter = 0;
2812                         else
2813                                 race_counter++;
2814                 }
2815
2816                 /* attempt some inter-object balancing by issuing rpcs
2817                  * for each object in turn */
2818                 if (!cfs_list_empty(&loi->loi_hp_ready_item))
2819                         cfs_list_del_init(&loi->loi_hp_ready_item);
2820                 if (!cfs_list_empty(&loi->loi_ready_item))
2821                         cfs_list_del_init(&loi->loi_ready_item);
2822                 if (!cfs_list_empty(&loi->loi_write_item))
2823                         cfs_list_del_init(&loi->loi_write_item);
2824                 if (!cfs_list_empty(&loi->loi_read_item))
2825                         cfs_list_del_init(&loi->loi_read_item);
2826                 if (!cfs_list_empty(&loi->loi_sync_fs_item))
2827                         cfs_list_del_init(&loi->loi_sync_fs_item);
2828
2829                 loi_list_maint(cli, loi);
2830
2831                 /* send_oap_rpc fails with 0 when make_ready tells it to
2832                  * back off.  llite's make_ready does this when it tries
2833                  * to lock a page queued for write that is already locked.
2834                  * we want to try sending rpcs from many objects, but we
2835                  * don't want to spin failing with 0.  */
2836                 if (race_counter == 10)
2837                         break;
2838         }
2839         EXIT;
2840 }
2841
2842 /* we're trying to queue a page in the osc so we're subject to the
2843  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2844  * If the osc's queued pages are already at that limit, then we want to sleep
2845  * until there is space in the osc's queue for us.  We also may be waiting for
2846  * write credits from the OST if there are RPCs in flight that may return some
2847  * before we fall back to sync writes.
2848  *
2849  * We need this know our allocation was granted in the presence of signals */
2850 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2851 {
2852         int rc;
2853         ENTRY;
2854         client_obd_list_lock(&cli->cl_loi_list_lock);
2855         rc = cfs_list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2856         client_obd_list_unlock(&cli->cl_loi_list_lock);
2857         RETURN(rc);
2858 };
2859
2860 /**
2861  * Non-blocking version of osc_enter_cache() that consumes grant only when it
2862  * is available.
2863  */
2864 int osc_enter_cache_try(const struct lu_env *env,
2865                         struct client_obd *cli, struct lov_oinfo *loi,
2866                         struct osc_async_page *oap, int transient)
2867 {
2868         int has_grant;
2869
2870         has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
2871         if (has_grant) {
2872                 osc_consume_write_grant(cli, &oap->oap_brw_page);
2873                 if (transient) {
2874                         cli->cl_dirty_transit += CFS_PAGE_SIZE;
2875                         cfs_atomic_inc(&obd_dirty_transit_pages);
2876                         oap->oap_brw_flags |= OBD_BRW_NOCACHE;
2877                 }
2878         }
2879         return has_grant;
2880 }
2881
2882 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2883  * grant or cache space. */
2884 static int osc_enter_cache(const struct lu_env *env,
2885                            struct client_obd *cli, struct lov_oinfo *loi,
2886                            struct osc_async_page *oap)
2887 {
2888         struct osc_cache_waiter ocw;
2889         struct l_wait_info lwi = { 0 };
2890
2891         ENTRY;
2892
2893         CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2894                "grant: %lu\n", cli->cl_dirty, cfs_atomic_read(&obd_dirty_pages),
2895                cli->cl_dirty_max, obd_max_dirty_pages,
2896                cli->cl_lost_grant, cli->cl_avail_grant);
2897
2898         /* force the caller to try sync io.  this can jump the list
2899          * of queued writes and create a discontiguous rpc stream */
2900         if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2901             loi->loi_ar.ar_force_sync)
2902                 RETURN(-EDQUOT);
2903
2904         /* Hopefully normal case - cache space and write credits available */
2905         if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
2906             cfs_atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
2907             osc_enter_cache_try(env, cli, loi, oap, 0))
2908                 RETURN(0);
2909
2910         /* It is safe to block as a cache waiter as long as there is grant
2911          * space available or the hope of additional grant being returned
2912          * when an in flight write completes.  Using the write back cache
2913          * if possible is preferable to sending the data synchronously
2914          * because write pages can then be merged in to large requests.
2915          * The addition of this cache waiter will causing pending write
2916          * pages to be sent immediately. */
2917         if (cli->cl_w_in_flight || cli->cl_avail_grant >= CFS_PAGE_SIZE) {
2918                 cfs_list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2919                 cfs_waitq_init(&ocw.ocw_waitq);
2920                 ocw.ocw_oap = oap;
2921                 ocw.ocw_rc = 0;
2922
2923                 loi_list_maint(cli, loi);
2924                 osc_check_rpcs(env, cli);
2925                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2926
2927                 CDEBUG(D_CACHE, "sleeping for cache space\n");
2928                 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2929
2930                 client_obd_list_lock(&cli->cl_loi_list_lock);
2931                 if (!cfs_list_empty(&ocw.ocw_entry)) {
2932                         cfs_list_del(&ocw.ocw_entry);
2933                         RETURN(-EINTR);
2934                 }
2935                 RETURN(ocw.ocw_rc);
2936         }
2937
2938         RETURN(-EDQUOT);
2939 }
2940
2941
2942 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2943                         struct lov_oinfo *loi, cfs_page_t *page,
2944                         obd_off offset, const struct obd_async_page_ops *ops,
2945                         void *data, void **res, int nocache,
2946                         struct lustre_handle *lockh)
2947 {
2948         struct osc_async_page *oap;
2949
2950         ENTRY;
2951
2952         if (!page)
2953                 return cfs_size_round(sizeof(*oap));
2954
2955         oap = *res;
2956         oap->oap_magic = OAP_MAGIC;
2957         oap->oap_cli = &exp->exp_obd->u.cli;
2958         oap->oap_loi = loi;
2959
2960         oap->oap_caller_ops = ops;
2961         oap->oap_caller_data = data;
2962
2963         oap->oap_page = page;
2964         oap->oap_obj_off = offset;
2965         if (!client_is_remote(exp) &&
2966             cfs_capable(CFS_CAP_SYS_RESOURCE))
2967                 oap->oap_brw_flags = OBD_BRW_NOQUOTA;
2968
2969         LASSERT(!(offset & ~CFS_PAGE_MASK));
2970
2971         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2972         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2973         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2974         CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2975
2976         cfs_spin_lock_init(&oap->oap_lock);
2977         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2978         RETURN(0);
2979 }
2980
2981 struct osc_async_page *oap_from_cookie(void *cookie)
2982 {
2983         struct osc_async_page *oap = cookie;
2984         if (oap->oap_magic != OAP_MAGIC)
2985                 return ERR_PTR(-EINVAL);
2986         return oap;
2987 };
2988
2989 int osc_queue_async_io(const struct lu_env *env,
2990                        struct obd_export *exp, struct lov_stripe_md *lsm,
2991                        struct lov_oinfo *loi, void *cookie,
2992                        int cmd, obd_off off, int count,
2993                        obd_flag brw_flags, enum async_flags async_flags)
2994 {
2995         struct client_obd *cli = &exp->exp_obd->u.cli;
2996         struct osc_async_page *oap;
2997         int rc = 0;
2998         ENTRY;
2999
3000         oap = oap_from_cookie(cookie);
3001         if (IS_ERR(oap))
3002                 RETURN(PTR_ERR(oap));
3003
3004         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
3005                 RETURN(-EIO);
3006
3007         if (!cfs_list_empty(&oap->oap_pending_item) ||
3008             !cfs_list_empty(&oap->oap_urgent_item) ||
3009             !cfs_list_empty(&oap->oap_rpc_item))
3010                 RETURN(-EBUSY);
3011
3012         /* check if the file's owner/group is over quota */
3013         if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
3014                 struct cl_object *obj;
3015                 struct cl_attr    attr; /* XXX put attr into thread info */
3016                 unsigned int qid[MAXQUOTAS];
3017
3018                 obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
3019
3020                 cl_object_attr_lock(obj);
3021                 rc = cl_object_attr_get(env, obj, &attr);
3022                 cl_object_attr_unlock(obj);
3023
3024                 qid[USRQUOTA] = attr.cat_uid;
3025                 qid[GRPQUOTA] = attr.cat_gid;
3026                 if (rc == 0 &&
3027                     lquota_chkdq(quota_interface, cli, qid) == NO_QUOTA)
3028                         rc = -EDQUOT;
3029                 if (rc)
3030                         RETURN(rc);
3031         }
3032
3033         if (loi == NULL)
3034                 loi = lsm->lsm_oinfo[0];
3035
3036         client_obd_list_lock(&cli->cl_loi_list_lock);
3037
3038         LASSERT(off + count <= CFS_PAGE_SIZE);
3039         oap->oap_cmd = cmd;
3040         oap->oap_page_off = off;
3041         oap->oap_count = count;
3042         oap->oap_brw_flags = brw_flags;
3043         /* Give a hint to OST that requests are coming from kswapd - bug19529 */
3044         if (cfs_memory_pressure_get())
3045                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
3046         cfs_spin_lock(&oap->oap_lock);
3047         oap->oap_async_flags = async_flags;
3048         cfs_spin_unlock(&oap->oap_lock);
3049
3050         if (cmd & OBD_BRW_WRITE) {
3051                 rc = osc_enter_cache(env, cli, loi, oap);
3052                 if (rc) {
3053                         client_obd_list_unlock(&cli->cl_loi_list_lock);
3054                         RETURN(rc);
3055                 }
3056         }
3057
3058         osc_oap_to_pending(oap);
3059         loi_list_maint(cli, loi);
3060
3061         LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
3062                   cmd);
3063
3064         osc_check_rpcs(env, cli);
3065         client_obd_list_unlock(&cli->cl_loi_list_lock);
3066
3067         RETURN(0);
3068 }
3069
/* True when no bit of "flag" is set in "was" and at least one bit of "flag"
 * is set in "now", i.e. the flag is being newly turned on.  For a single-bit
 * flag this is (~was & now & flag), but this is more clear :)
 * Every argument is parenthesized so that compound expressions such as
 * (A | B) expand with the intended precedence. */
#define SETTING(was, now, flag) (!((was) & (flag)) && ((now) & (flag)))
3072
3073 int osc_set_async_flags_base(struct client_obd *cli,
3074                              struct lov_oinfo *loi, struct osc_async_page *oap,
3075                              obd_flag async_flags)
3076 {
3077         struct loi_oap_pages *lop;
3078         int flags = 0;
3079         ENTRY;
3080
3081         LASSERT(!cfs_list_empty(&oap->oap_pending_item));
3082
3083         if (oap->oap_cmd & OBD_BRW_WRITE) {
3084                 lop = &loi->loi_write_lop;
3085         } else {
3086                 lop = &loi->loi_read_lop;
3087         }
3088
3089         if ((oap->oap_async_flags & async_flags) == async_flags)
3090                 RETURN(0);
3091
3092         /* XXX: This introduces a tiny insignificant race for the case if this
3093          * loi already had other urgent items.
3094          */
3095         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_SYNCFS) &&
3096             cfs_list_empty(&oap->oap_rpc_item) &&
3097             cfs_list_empty(&oap->oap_urgent_item)) {
3098                 cfs_list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
3099                 flags |= ASYNC_SYNCFS;
3100                 cfs_spin_lock(&oap->oap_lock);
3101                 oap->oap_async_flags |= flags;
3102                 cfs_spin_unlock(&oap->oap_lock);
3103                 loi_list_maint(cli, loi);
3104                 RETURN(0);
3105         }
3106
3107         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
3108                 flags |= ASYNC_READY;
3109
3110         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
3111             cfs_list_empty(&oap->oap_rpc_item)) {
3112                 if (oap->oap_async_flags & ASYNC_HP)
3113                         cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
3114                 else
3115                         cfs_list_add_tail(&oap->oap_urgent_item,
3116                                           &lop->lop_urgent);
3117                 flags |= ASYNC_URGENT;
3118                 loi_list_maint(cli, loi);
3119         }
3120         cfs_spin_lock(&oap->oap_lock);
3121         oap->oap_async_flags |= flags;
3122         cfs_spin_unlock(&oap->oap_lock);
3123
3124         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
3125                         oap->oap_async_flags);
3126         RETURN(0);
3127 }
3128
3129 int osc_teardown_async_page(struct obd_export *exp,
3130                             struct lov_stripe_md *lsm,
3131                             struct lov_oinfo *loi, void *cookie)
3132 {
3133         struct client_obd *cli = &exp->exp_obd->u.cli;
3134         struct loi_oap_pages *lop;
3135         struct osc_async_page *oap;
3136         int rc = 0;
3137         ENTRY;
3138
3139         oap = oap_from_cookie(cookie);
3140         if (IS_ERR(oap))
3141                 RETURN(PTR_ERR(oap));
3142
3143         if (loi == NULL)
3144                 loi = lsm->lsm_oinfo[0];
3145
3146         if (oap->oap_cmd & OBD_BRW_WRITE) {
3147                 lop = &loi->loi_write_lop;
3148         } else {
3149                 lop = &loi->loi_read_lop;
3150         }
3151
3152         client_obd_list_lock(&cli->cl_loi_list_lock);
3153
3154         if (!cfs_list_empty(&oap->oap_rpc_item))
3155                 GOTO(out, rc = -EBUSY);
3156
3157         osc_exit_cache(cli, oap, 0);
3158         osc_wake_cache_waiters(cli);
3159
3160         if (!cfs_list_empty(&oap->oap_urgent_item)) {
3161                 cfs_list_del_init(&oap->oap_urgent_item);
3162                 cfs_spin_lock(&oap->oap_lock);
3163                 oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP |
3164                                           ASYNC_SYNCFS);
3165                 cfs_spin_unlock(&oap->oap_lock);
3166         }
3167         if (!cfs_list_empty(&oap->oap_pending_item)) {
3168                 cfs_list_del_init(&oap->oap_pending_item);
3169                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
3170         }
3171         loi_list_maint(cli, loi);
3172         LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
3173 out:
3174         client_obd_list_unlock(&cli->cl_loi_list_lock);
3175         RETURN(rc);
3176 }
3177
3178 static void osc_set_lock_data_with_check(struct ldlm_lock *lock,
3179                                          struct ldlm_enqueue_info *einfo,
3180                                          int flags)
3181 {
3182         void *data = einfo->ei_cbdata;
3183
3184         LASSERT(lock != NULL);
3185         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
3186         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
3187         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
3188         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
3189
3190         lock_res_and_lock(lock);
3191         cfs_spin_lock(&osc_ast_guard);
3192         LASSERT(lock->l_ast_data == NULL || lock->l_ast_data == data);
3193         lock->l_ast_data = data;
3194         cfs_spin_unlock(&osc_ast_guard);
3195         unlock_res_and_lock(lock);
3196 }
3197
3198 static void osc_set_data_with_check(struct lustre_handle *lockh,
3199                                     struct ldlm_enqueue_info *einfo,
3200                                     int flags)
3201 {
3202         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
3203
3204         if (lock != NULL) {
3205                 osc_set_lock_data_with_check(lock, einfo, flags);
3206                 LDLM_LOCK_PUT(lock);
3207         } else
3208                 CERROR("lockh %p, data %p - client evicted?\n",
3209                        lockh, einfo->ei_cbdata);
3210 }
3211
3212 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3213                              ldlm_iterator_t replace, void *data)
3214 {
3215         struct ldlm_res_id res_id;
3216         struct obd_device *obd = class_exp2obd(exp);
3217
3218         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
3219         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3220         return 0;
3221 }
3222
3223 /* find any ldlm lock of the inode in osc
3224  * return 0    not find
3225  *        1    find one
3226  *      < 0    error */
3227 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3228                            ldlm_iterator_t replace, void *data)
3229 {
3230         struct ldlm_res_id res_id;
3231         struct obd_device *obd = class_exp2obd(exp);
3232         int rc = 0;
3233
3234         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
3235         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3236         if (rc == LDLM_ITER_STOP)
3237                 return(1);
3238         if (rc == LDLM_ITER_CONTINUE)
3239                 return(0);
3240         return(rc);
3241 }
3242
3243 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
3244                             obd_enqueue_update_f upcall, void *cookie,
3245                             int *flags, int rc)
3246 {
3247         int intent = *flags & LDLM_FL_HAS_INTENT;
3248         ENTRY;
3249
3250         if (intent) {
3251                 /* The request was created before ldlm_cli_enqueue call. */
3252                 if (rc == ELDLM_LOCK_ABORTED) {
3253                         struct ldlm_reply *rep;
3254                         rep = req_capsule_server_get(&req->rq_pill,
3255                                                      &RMF_DLM_REP);
3256
3257                         LASSERT(rep != NULL);
3258                         if (rep->lock_policy_res1)
3259                                 rc = rep->lock_policy_res1;
3260                 }
3261         }
3262
3263         if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
3264                 *flags |= LDLM_FL_LVB_READY;
3265                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
3266                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
3267         }
3268
3269         /* Call the update callback. */
3270         rc = (*upcall)(cookie, rc);
3271         RETURN(rc);
3272 }
3273
3274 static int osc_enqueue_interpret(const struct lu_env *env,
3275                                  struct ptlrpc_request *req,
3276                                  struct osc_enqueue_args *aa, int rc)
3277 {
3278         struct ldlm_lock *lock;
3279         struct lustre_handle handle;
3280         __u32 mode;
3281
3282         /* Make a local copy of a lock handle and a mode, because aa->oa_*
3283          * might be freed anytime after lock upcall has been called. */
3284         lustre_handle_copy(&handle, aa->oa_lockh);
3285         mode = aa->oa_ei->ei_mode;
3286
3287         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
3288          * be valid. */
3289         lock = ldlm_handle2lock(&handle);
3290
3291         /* Take an additional reference so that a blocking AST that
3292          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
3293          * to arrive after an upcall has been executed by
3294          * osc_enqueue_fini(). */
3295         ldlm_lock_addref(&handle, mode);
3296
3297         /* Let CP AST to grant the lock first. */
3298         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
3299
3300         /* Complete obtaining the lock procedure. */
3301         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
3302                                    mode, aa->oa_flags, aa->oa_lvb,
3303                                    sizeof(*aa->oa_lvb), &handle, rc);
3304         /* Complete osc stuff. */
3305         rc = osc_enqueue_fini(req, aa->oa_lvb,
3306                               aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc);
3307
3308         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
3309
3310         /* Release the lock for async request. */
3311         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
3312                 /*
3313                  * Releases a reference taken by ldlm_cli_enqueue(), if it is
3314                  * not already released by
3315                  * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
3316                  */
3317                 ldlm_lock_decref(&handle, mode);
3318
3319         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
3320                  aa->oa_lockh, req, aa);
3321         ldlm_lock_decref(&handle, mode);
3322         LDLM_LOCK_PUT(lock);
3323         return rc;
3324 }
3325
3326 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
3327                         struct lov_oinfo *loi, int flags,
3328                         struct ost_lvb *lvb, __u32 mode, int rc)
3329 {
3330         if (rc == ELDLM_OK) {
3331                 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
3332                 __u64 tmp;
3333
3334                 LASSERT(lock != NULL);
3335                 loi->loi_lvb = *lvb;
3336                 tmp = loi->loi_lvb.lvb_size;
3337                 /* Extend KMS up to the end of this lock and no further
3338                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
3339                 if (tmp > lock->l_policy_data.l_extent.end)
3340                         tmp = lock->l_policy_data.l_extent.end + 1;
3341                 if (tmp >= loi->loi_kms) {
3342                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
3343                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
3344                         loi_kms_set(loi, tmp);
3345                 } else {
3346                         LDLM_DEBUG(lock, "lock acquired, setting rss="
3347                                    LPU64"; leaving kms="LPU64", end="LPU64,
3348                                    loi->loi_lvb.lvb_size, loi->loi_kms,
3349                                    lock->l_policy_data.l_extent.end);
3350                 }
3351                 ldlm_lock_allow_match(lock);
3352                 LDLM_LOCK_PUT(lock);
3353         } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
3354                 loi->loi_lvb = *lvb;
3355                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
3356                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
3357                 rc = ELDLM_OK;
3358         }
3359 }
3360 EXPORT_SYMBOL(osc_update_enqueue);
3361
/* Sentinel request-set pointer (never dereferenced as a real set here):
 * osc_enqueue_base() compares its rqset argument against it and, on a match,
 * hands the request to ptlrpcd_add_req() instead of ptlrpc_set_add_req(). */
struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
3363
3364 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
3365  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
3366  * other synchronous requests, however keeping some locks and trying to obtain
3367  * others may take a considerable amount of time in a case of ost failure; and
3368  * when other sync requests do not get released lock from a client, the client
3369  * is excluded from the cluster -- such scenarious make the life difficult, so
3370  * release locks just after they are obtained. */
3371 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3372                      int *flags, ldlm_policy_data_t *policy,
3373                      struct ost_lvb *lvb, int kms_valid,
3374                      obd_enqueue_update_f upcall, void *cookie,
3375                      struct ldlm_enqueue_info *einfo,
3376                      struct lustre_handle *lockh,
3377                      struct ptlrpc_request_set *rqset, int async)
3378 {
3379         struct obd_device *obd = exp->exp_obd;
3380         struct ptlrpc_request *req = NULL;
3381         int intent = *flags & LDLM_FL_HAS_INTENT;
3382         ldlm_mode_t mode;
3383         int rc;
3384         ENTRY;
3385
3386         /* Filesystem lock extents are extended to page boundaries so that
3387          * dealing with the page cache is a little smoother.  */
3388         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3389         policy->l_extent.end |= ~CFS_PAGE_MASK;
3390
3391         /*
3392          * kms is not valid when either object is completely fresh (so that no
3393          * locks are cached), or object was evicted. In the latter case cached
3394          * lock cannot be used, because it would prime inode state with
3395          * potentially stale LVB.
3396          */
3397         if (!kms_valid)
3398                 goto no_match;
3399
3400         /* Next, search for already existing extent locks that will cover us */
3401         /* If we're trying to read, we also search for an existing PW lock.  The
3402          * VFS and page cache already protect us locally, so lots of readers/
3403          * writers can share a single PW lock.
3404          *
3405          * There are problems with conversion deadlocks, so instead of
3406          * converting a read lock to a write lock, we'll just enqueue a new
3407          * one.
3408          *
3409          * At some point we should cancel the read lock instead of making them
3410          * send us a blocking callback, but there are problems with canceling
3411          * locks out from other users right now, too. */
3412         mode = einfo->ei_mode;
3413         if (einfo->ei_mode == LCK_PR)
3414                 mode |= LCK_PW;
3415         mode = ldlm_lock_match(obd->obd_namespace,
3416                                *flags | LDLM_FL_LVB_READY, res_id,
3417                                einfo->ei_type, policy, mode, lockh, 0);
3418         if (mode) {
3419                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
3420
3421                 if (matched->l_ast_data == NULL ||
3422                     matched->l_ast_data == einfo->ei_cbdata) {
3423                         /* addref the lock only if not async requests and PW
3424                          * lock is matched whereas we asked for PR. */
3425                         if (!rqset && einfo->ei_mode != mode)
3426                                 ldlm_lock_addref(lockh, LCK_PR);
3427                         osc_set_lock_data_with_check(matched, einfo, *flags);
3428                         if (intent) {
3429                                 /* I would like to be able to ASSERT here that
3430                                  * rss <= kms, but I can't, for reasons which
3431                                  * are explained in lov_enqueue() */
3432                         }
3433
3434                         /* We already have a lock, and it's referenced */
3435                         (*upcall)(cookie, ELDLM_OK);
3436
3437                         /* For async requests, decref the lock. */
3438                         if (einfo->ei_mode != mode)
3439                                 ldlm_lock_decref(lockh, LCK_PW);
3440                         else if (rqset)
3441                                 ldlm_lock_decref(lockh, einfo->ei_mode);
3442                         LDLM_LOCK_PUT(matched);
3443                         RETURN(ELDLM_OK);
3444                 } else
3445                         ldlm_lock_decref(lockh, mode);
3446                 LDLM_LOCK_PUT(matched);
3447         }
3448
3449  no_match:
3450         if (intent) {
3451                 CFS_LIST_HEAD(cancels);
3452                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3453                                            &RQF_LDLM_ENQUEUE_LVB);
3454                 if (req == NULL)
3455                         RETURN(-ENOMEM);
3456
3457                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3458                 if (rc) {
3459                         ptlrpc_request_free(req);
3460                         RETURN(rc);
3461                 }
3462
3463                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3464                                      sizeof *lvb);
3465                 ptlrpc_request_set_replen(req);
3466         }
3467
3468         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3469         *flags &= ~LDLM_FL_BLOCK_GRANTED;
3470
3471         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
3472                               sizeof(*lvb), lockh, async);
3473         if (rqset) {
3474                 if (!rc) {
3475                         struct osc_enqueue_args *aa;
3476                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3477                         aa = ptlrpc_req_async_args(req);
3478                         aa->oa_ei = einfo;
3479                         aa->oa_exp = exp;
3480                         aa->oa_flags  = flags;
3481                         aa->oa_upcall = upcall;
3482                         aa->oa_cookie = cookie;
3483                         aa->oa_lvb    = lvb;
3484                         aa->oa_lockh  = lockh;
3485
3486                         req->rq_interpret_reply =
3487                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
3488                         if (rqset == PTLRPCD_SET)
3489                                 ptlrpcd_add_req(req, PSCOPE_OTHER);
3490                         else
3491                                 ptlrpc_set_add_req(rqset, req);
3492                 } else if (intent) {
3493                         ptlrpc_req_finished(req);
3494                 }
3495                 RETURN(rc);
3496         }
3497
3498         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, rc);
3499         if (intent)
3500                 ptlrpc_req_finished(req);
3501
3502         RETURN(rc);
3503 }
3504
3505 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3506                        struct ldlm_enqueue_info *einfo,
3507                        struct ptlrpc_request_set *rqset)
3508 {
3509         struct ldlm_res_id res_id;
3510         int rc;
3511         ENTRY;
3512
3513         osc_build_res_name(oinfo->oi_md->lsm_object_id,
3514                            oinfo->oi_md->lsm_object_seq, &res_id);
3515
3516         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
3517                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3518                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
3519                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
3520                               rqset, rqset != NULL);
3521         RETURN(rc);
3522 }
3523
3524 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3525                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3526                    int *flags, void *data, struct lustre_handle *lockh,
3527                    int unref)
3528 {
3529         struct obd_device *obd = exp->exp_obd;
3530         int lflags = *flags;
3531         ldlm_mode_t rc;
3532         ENTRY;
3533
3534         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3535                 RETURN(-EIO);
3536
3537         /* Filesystem lock extents are extended to page boundaries so that
3538          * dealing with the page cache is a little smoother */
3539         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3540         policy->l_extent.end |= ~CFS_PAGE_MASK;
3541
3542         /* Next, search for already existing extent locks that will cover us */
3543         /* If we're trying to read, we also search for an existing PW lock.  The
3544          * VFS and page cache already protect us locally, so lots of readers/
3545          * writers can share a single PW lock. */
3546         rc = mode;
3547         if (mode == LCK_PR)
3548                 rc |= LCK_PW;
3549         rc = ldlm_lock_match(obd->obd_namespace, lflags,
3550                              res_id, type, policy, rc, lockh, unref);
3551         if (rc) {
3552                 if (data != NULL)
3553                         osc_set_data_with_check(lockh, data, lflags);
3554                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3555                         ldlm_lock_addref(lockh, LCK_PR);
3556                         ldlm_lock_decref(lockh, LCK_PW);
3557                 }
3558                 RETURN(rc);
3559         }
3560         RETURN(rc);
3561 }
3562
3563 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
3564 {
3565         ENTRY;
3566
3567         if (unlikely(mode == LCK_GROUP))
3568                 ldlm_lock_decref_and_cancel(lockh, mode);
3569         else
3570                 ldlm_lock_decref(lockh, mode);
3571
3572         RETURN(0);
3573 }
3574
3575 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3576                       __u32 mode, struct lustre_handle *lockh)
3577 {
3578         ENTRY;
3579         RETURN(osc_cancel_base(lockh, mode));
3580 }
3581
3582 static int osc_cancel_unused(struct obd_export *exp,
3583                              struct lov_stripe_md *lsm,
3584                              ldlm_cancel_flags_t flags,
3585                              void *opaque)
3586 {
3587         struct obd_device *obd = class_exp2obd(exp);
3588         struct ldlm_res_id res_id, *resp = NULL;
3589
3590         if (lsm != NULL) {
3591                 resp = osc_build_res_name(lsm->lsm_object_id,
3592                                           lsm->lsm_object_seq, &res_id);
3593         }
3594
3595         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3596 }
3597
3598 static int osc_statfs_interpret(const struct lu_env *env,
3599                                 struct ptlrpc_request *req,
3600                                 struct osc_async_args *aa, int rc)
3601 {
3602         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
3603         struct obd_statfs *msfs;
3604         __u64 used;
3605         ENTRY;
3606
3607         if (rc == -EBADR)
3608                 /* The request has in fact never been sent
3609                  * due to issues at a higher level (LOV).
3610                  * Exit immediately since the caller is
3611                  * aware of the problem and takes care
3612                  * of the clean up */
3613                  RETURN(rc);
3614
3615         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
3616             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
3617                 GOTO(out, rc = 0);
3618
3619         if (rc != 0)
3620                 GOTO(out, rc);
3621
3622         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3623         if (msfs == NULL) {
3624                 GOTO(out, rc = -EPROTO);
3625         }
3626
3627         /* Reinitialize the RDONLY and DEGRADED flags at the client
3628          * on each statfs, so they don't stay set permanently. */
3629         cfs_spin_lock(&cli->cl_oscc.oscc_lock);
3630
3631         if (unlikely(msfs->os_state & OS_STATE_DEGRADED))
3632                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED;
3633         else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_DEGRADED))
3634                 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_DEGRADED;
3635
3636         if (unlikely(msfs->os_state & OS_STATE_READONLY))
3637                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY;
3638         else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_RDONLY))
3639                 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_RDONLY;
3640
3641         /* Add a bit of hysteresis so this flag isn't continually flapping,
3642          * and ensure that new files don't get extremely fragmented due to
3643          * only a small amount of available space in the filesystem.
3644          * We want to set the NOSPC flag when there is less than ~0.1% free
3645          * and clear it when there is at least ~0.2% free space, so:
3646          *                   avail < ~0.1% max          max = avail + used
3647          *            1025 * avail < avail + used       used = blocks - free
3648          *            1024 * avail < used
3649          *            1024 * avail < blocks - free                      
3650          *                   avail < ((blocks - free) >> 10)    
3651          *
3652          * On very large disk, say 16TB 0.1% will be 16 GB. We don't want to
3653          * lose that amount of space so in those cases we report no space left
3654          * if their is less than 1 GB left.                             */
3655         used = min_t(__u64,(msfs->os_blocks - msfs->os_bfree) >> 10, 1 << 30);
3656         if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) == 0) &&
3657                      ((msfs->os_ffree < 32) || (msfs->os_bavail < used))))
3658                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC;
3659         else if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
3660                 (msfs->os_ffree > 64) && (msfs->os_bavail > (used << 1))))
3661                         cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_NOSPC;
3662
3663         cfs_spin_unlock(&cli->cl_oscc.oscc_lock);
3664
3665         *aa->aa_oi->oi_osfs = *msfs;
3666 out:
3667         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3668         RETURN(rc);
3669 }
3670
3671 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3672                             __u64 max_age, struct ptlrpc_request_set *rqset)
3673 {
3674         struct ptlrpc_request *req;
3675         struct osc_async_args *aa;
3676         int                    rc;
3677         ENTRY;
3678
3679         /* We could possibly pass max_age in the request (as an absolute
3680          * timestamp or a "seconds.usec ago") so the target can avoid doing
3681          * extra calls into the filesystem if that isn't necessary (e.g.
3682          * during mount that would help a bit).  Having relative timestamps
3683          * is not so great if request processing is slow, while absolute
3684          * timestamps are not ideal because they need time synchronization. */
3685         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3686         if (req == NULL)
3687                 RETURN(-ENOMEM);
3688
3689         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3690         if (rc) {
3691                 ptlrpc_request_free(req);
3692                 RETURN(rc);
3693         }
3694         ptlrpc_request_set_replen(req);
3695         req->rq_request_portal = OST_CREATE_PORTAL;
3696         ptlrpc_at_set_req_timeout(req);
3697
3698         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3699                 /* procfs requests not want stat in wait for avoid deadlock */
3700                 req->rq_no_resend = 1;
3701                 req->rq_no_delay = 1;
3702         }
3703
3704         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
3705         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3706         aa = ptlrpc_req_async_args(req);
3707         aa->aa_oi = oinfo;
3708
3709         ptlrpc_set_add_req(rqset, req);
3710         RETURN(0);
3711 }
3712
3713 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3714                       __u64 max_age, __u32 flags)
3715 {
3716         struct obd_statfs     *msfs;
3717         struct ptlrpc_request *req;
3718         struct obd_import     *imp = NULL;
3719         int rc;
3720         ENTRY;
3721
3722         /*Since the request might also come from lprocfs, so we need
3723          *sync this with client_disconnect_export Bug15684*/
3724         cfs_down_read(&obd->u.cli.cl_sem);
3725         if (obd->u.cli.cl_import)
3726                 imp = class_import_get(obd->u.cli.cl_import);
3727         cfs_up_read(&obd->u.cli.cl_sem);
3728         if (!imp)
3729                 RETURN(-ENODEV);
3730
3731         /* We could possibly pass max_age in the request (as an absolute
3732          * timestamp or a "seconds.usec ago") so the target can avoid doing
3733          * extra calls into the filesystem if that isn't necessary (e.g.
3734          * during mount that would help a bit).  Having relative timestamps
3735          * is not so great if request processing is slow, while absolute
3736          * timestamps are not ideal because they need time synchronization. */
3737         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
3738
3739         class_import_put(imp);
3740
3741         if (req == NULL)
3742                 RETURN(-ENOMEM);
3743
3744         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3745         if (rc) {
3746                 ptlrpc_request_free(req);
3747                 RETURN(rc);
3748         }
3749         ptlrpc_request_set_replen(req);
3750         req->rq_request_portal = OST_CREATE_PORTAL;
3751         ptlrpc_at_set_req_timeout(req);
3752
3753         if (flags & OBD_STATFS_NODELAY) {
3754                 /* procfs requests not want stat in wait for avoid deadlock */
3755                 req->rq_no_resend = 1;
3756                 req->rq_no_delay = 1;
3757         }
3758
3759         rc = ptlrpc_queue_wait(req);
3760         if (rc)
3761                 GOTO(out, rc);
3762
3763         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3764         if (msfs == NULL) {
3765                 GOTO(out, rc = -EPROTO);
3766         }
3767
3768         *osfs = *msfs;
3769
3770         EXIT;
3771  out:
3772         ptlrpc_req_finished(req);
3773         return rc;
3774 }
3775
3776 /* Retrieve object striping information.
3777  *
3778  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
3779  * the maximum number of OST indices which will fit in the user buffer.
3780  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
3781  */
3782 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3783 {
3784         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
3785         struct lov_user_md_v3 lum, *lumk;
3786         struct lov_user_ost_data_v1 *lmm_objects;
3787         int rc = 0, lum_size;
3788         ENTRY;
3789
3790         if (!lsm)
3791                 RETURN(-ENODATA);
3792
3793         /* we only need the header part from user space to get lmm_magic and
3794          * lmm_stripe_count, (the header part is common to v1 and v3) */
3795         lum_size = sizeof(struct lov_user_md_v1);
3796         if (cfs_copy_from_user(&lum, lump, lum_size))
3797                 RETURN(-EFAULT);
3798
3799         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
3800             (lum.lmm_magic != LOV_USER_MAGIC_V3))
3801                 RETURN(-EINVAL);
3802
3803         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
3804         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
3805         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
3806         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
3807
3808         /* we can use lov_mds_md_size() to compute lum_size
3809          * because lov_user_md_vX and lov_mds_md_vX have the same size */
3810         if (lum.lmm_stripe_count > 0) {
3811                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
3812                 OBD_ALLOC(lumk, lum_size);
3813                 if (!lumk)
3814                         RETURN(-ENOMEM);
3815
3816                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
3817                         lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
3818                 else
3819                         lmm_objects = &(lumk->lmm_objects[0]);
3820                 lmm_objects->l_object_id = lsm->lsm_object_id;
3821         } else {
3822                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
3823                 lumk = &lum;
3824         }
3825
3826         lumk->lmm_object_id = lsm->lsm_object_id;
3827         lumk->lmm_object_seq = lsm->lsm_object_seq;
3828         lumk->lmm_stripe_count = 1;
3829
3830         if (cfs_copy_to_user(lump, lumk, lum_size))
3831                 rc = -EFAULT;
3832
3833         if (lumk != &lum)
3834                 OBD_FREE(lumk, lum_size);
3835
3836         RETURN(rc);
3837 }
3838
3839
3840 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3841                          void *karg, void *uarg)
3842 {
3843         struct obd_device *obd = exp->exp_obd;
3844         struct obd_ioctl_data *data = karg;
3845         int err = 0;
3846         ENTRY;
3847
3848         if (!cfs_try_module_get(THIS_MODULE)) {
3849                 CERROR("Can't get module. Is it alive?");
3850                 return -EINVAL;
3851         }
3852         switch (cmd) {
3853         case OBD_IOC_LOV_GET_CONFIG: {
3854                 char *buf;
3855                 struct lov_desc *desc;
3856                 struct obd_uuid uuid;
3857
3858                 buf = NULL;
3859                 len = 0;
3860                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3861                         GOTO(out, err = -EINVAL);
3862
3863                 data = (struct obd_ioctl_data *)buf;
3864
3865                 if (sizeof(*desc) > data->ioc_inllen1) {
3866                         obd_ioctl_freedata(buf, len);
3867                         GOTO(out, err = -EINVAL);
3868                 }
3869
3870                 if (data->ioc_inllen2 < sizeof(uuid)) {
3871                         obd_ioctl_freedata(buf, len);
3872                         GOTO(out, err = -EINVAL);
3873                 }
3874
3875                 desc = (struct lov_desc *)data->ioc_inlbuf1;
3876                 desc->ld_tgt_count = 1;
3877                 desc->ld_active_tgt_count = 1;
3878                 desc->ld_default_stripe_count = 1;
3879                 desc->ld_default_stripe_size = 0;
3880                 desc->ld_default_stripe_offset = 0;
3881                 desc->ld_pattern = 0;
3882                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3883
3884                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3885
3886                 err = cfs_copy_to_user((void *)uarg, buf, len);
3887                 if (err)
3888                         err = -EFAULT;
3889                 obd_ioctl_freedata(buf, len);
3890                 GOTO(out, err);
3891         }
3892         case LL_IOC_LOV_SETSTRIPE:
3893                 err = obd_alloc_memmd(exp, karg);
3894                 if (err > 0)
3895                         err = 0;
3896                 GOTO(out, err);
3897         case LL_IOC_LOV_GETSTRIPE:
3898                 err = osc_getstripe(karg, uarg);
3899                 GOTO(out, err);
3900         case OBD_IOC_CLIENT_RECOVER:
3901                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3902                                             data->ioc_inlbuf1);
3903                 if (err > 0)
3904                         err = 0;
3905                 GOTO(out, err);
3906         case IOC_OSC_SET_ACTIVE:
3907                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3908                                                data->ioc_offset);
3909                 GOTO(out, err);
3910         case OBD_IOC_POLL_QUOTACHECK:
3911                 err = lquota_poll_check(quota_interface, exp,
3912                                         (struct if_quotacheck *)karg);
3913                 GOTO(out, err);
3914         case OBD_IOC_PING_TARGET:
3915                 err = ptlrpc_obd_ping(obd);
3916                 GOTO(out, err);
3917         default:
3918                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3919                        cmd, cfs_curproc_comm());
3920                 GOTO(out, err = -ENOTTY);
3921         }
3922 out:
3923         cfs_module_put(THIS_MODULE);
3924         return err;
3925 }
3926
3927 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3928                         void *key, __u32 *vallen, void *val,
3929                         struct lov_stripe_md *lsm)
3930 {
3931         ENTRY;
3932         if (!vallen || !val)
3933                 RETURN(-EFAULT);
3934
3935         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3936                 __u32 *stripe = val;
3937                 *vallen = sizeof(*stripe);
3938                 *stripe = 0;
3939                 RETURN(0);
3940         } else if (KEY_IS(KEY_LAST_ID)) {
3941                 struct ptlrpc_request *req;
3942                 obd_id                *reply;
3943                 char                  *tmp;
3944                 int                    rc;
3945
3946                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3947                                            &RQF_OST_GET_INFO_LAST_ID);
3948                 if (req == NULL)
3949                         RETURN(-ENOMEM);
3950
3951                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3952                                      RCL_CLIENT, keylen);
3953                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3954                 if (rc) {
3955                         ptlrpc_request_free(req);
3956                         RETURN(rc);
3957                 }
3958
3959                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3960                 memcpy(tmp, key, keylen);
3961
3962                 req->rq_no_delay = req->rq_no_resend = 1;
3963                 ptlrpc_request_set_replen(req);
3964                 rc = ptlrpc_queue_wait(req);
3965                 if (rc)
3966                         GOTO(out, rc);
3967
3968                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3969                 if (reply == NULL)
3970                         GOTO(out, rc = -EPROTO);
3971
3972                 *((obd_id *)val) = *reply;
3973         out:
3974                 ptlrpc_req_finished(req);
3975                 RETURN(rc);
3976         } else if (KEY_IS(KEY_FIEMAP)) {
3977                 struct ptlrpc_request *req;
3978                 struct ll_user_fiemap *reply;
3979                 char *tmp;
3980                 int rc;
3981
3982                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3983                                            &RQF_OST_GET_INFO_FIEMAP);
3984                 if (req == NULL)
3985                         RETURN(-ENOMEM);
3986
3987                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3988                                      RCL_CLIENT, keylen);
3989                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3990                                      RCL_CLIENT, *vallen);
3991                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3992                                      RCL_SERVER, *vallen);
3993
3994                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3995                 if (rc) {
3996                         ptlrpc_request_free(req);
3997                         RETURN(rc);
3998                 }
3999
4000                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
4001                 memcpy(tmp, key, keylen);
4002                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
4003                 memcpy(tmp, val, *vallen);
4004
4005                 ptlrpc_request_set_replen(req);
4006                 rc = ptlrpc_queue_wait(req);
4007                 if (rc)
4008                         GOTO(out1, rc);
4009
4010                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
4011                 if (reply == NULL)
4012                         GOTO(out1, rc = -EPROTO);
4013
4014                 memcpy(val, reply, *vallen);
4015         out1:
4016                 ptlrpc_req_finished(req);
4017
4018                 RETURN(rc);
4019         }
4020
4021         RETURN(-EINVAL);
4022 }
4023
4024 static int osc_setinfo_mds_connect_import(struct obd_import *imp)
4025 {
4026         struct llog_ctxt *ctxt;
4027         int rc = 0;
4028         ENTRY;
4029
4030         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
4031         if (ctxt) {
4032                 rc = llog_initiator_connect(ctxt);
4033                 llog_ctxt_put(ctxt);
4034         } else {
4035                 /* XXX return an error? skip setting below flags? */
4036         }
4037
4038         cfs_spin_lock(&imp->imp_lock);
4039         imp->imp_server_timeout = 1;
4040         imp->imp_pingable = 1;
4041         cfs_spin_unlock(&imp->imp_lock);
4042         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
4043
4044         RETURN(rc);
4045 }
4046
4047 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
4048                                           struct ptlrpc_request *req,
4049                                           void *aa, int rc)
4050 {
4051         ENTRY;
4052         if (rc != 0)
4053                 RETURN(rc);
4054
4055         RETURN(osc_setinfo_mds_connect_import(req->rq_import));
4056 }
4057
4058 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
4059                               void *key, obd_count vallen, void *val,
4060                               struct ptlrpc_request_set *set)
4061 {
4062         struct ptlrpc_request *req;
4063         struct obd_device     *obd = exp->exp_obd;
4064         struct obd_import     *imp = class_exp2cliimp(exp);
4065         char                  *tmp;
4066         int                    rc;
4067         ENTRY;
4068
4069         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
4070
4071         if (KEY_IS(KEY_NEXT_ID)) {
4072                 obd_id new_val;
4073                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4074
4075                 if (vallen != sizeof(obd_id))
4076                         RETURN(-ERANGE);
4077                 if (val == NULL)
4078                         RETURN(-EINVAL);
4079
4080                 if (vallen != sizeof(obd_id))
4081                         RETURN(-EINVAL);
4082
4083                 /* avoid race between allocate new object and set next id
4084                  * from ll_sync thread */
4085                 cfs_spin_lock(&oscc->oscc_lock);
4086                 new_val = *((obd_id*)val) + 1;
4087                 if (new_val > oscc->oscc_next_id)
4088                         oscc->oscc_next_id = new_val;
4089                 cfs_spin_unlock(&oscc->oscc_lock);
4090                 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
4091                        exp->exp_obd->obd_name,
4092                        obd->u.cli.cl_oscc.oscc_next_id);
4093
4094                 RETURN(0);
4095         }
4096
4097         if (KEY_IS(KEY_CHECKSUM)) {
4098                 if (vallen != sizeof(int))
4099                         RETURN(-EINVAL);
4100                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
4101                 RETURN(0);
4102         }
4103
4104         if (KEY_IS(KEY_SPTLRPC_CONF)) {
4105                 sptlrpc_conf_client_adapt(obd);
4106                 RETURN(0);
4107         }
4108
4109         if (KEY_IS(KEY_FLUSH_CTX)) {
4110                 sptlrpc_import_flush_my_ctx(imp);
4111                 RETURN(0);
4112         }
4113
4114         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
4115                 RETURN(-EINVAL);
4116
4117         /* We pass all other commands directly to OST. Since nobody calls osc
4118            methods directly and everybody is supposed to go through LOV, we
4119            assume lov checked invalid values for us.
4120            The only recognised values so far are evict_by_nid and mds_conn.
4121            Even if something bad goes through, we'd get a -EINVAL from OST
4122            anyway. */
4123
4124         if (KEY_IS(KEY_GRANT_SHRINK))
4125                 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO);
4126         else
4127                 req = ptlrpc_request_alloc(imp, &RQF_OBD_SET_INFO);
4128
4129         if (req == NULL)
4130                 RETURN(-ENOMEM);
4131
4132         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
4133                              RCL_CLIENT, keylen);
4134         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
4135                              RCL_CLIENT, vallen);
4136         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
4137         if (rc) {
4138                 ptlrpc_request_free(req);
4139                 RETURN(rc);
4140         }
4141
4142         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
4143         memcpy(tmp, key, keylen);
4144         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
4145         memcpy(tmp, val, vallen);
4146
4147         if (KEY_IS(KEY_MDS_CONN)) {
4148                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4149
4150                 oscc->oscc_oa.o_seq = (*(__u32 *)val);
4151                 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
4152                 LASSERT_SEQ_IS_MDT(oscc->oscc_oa.o_seq);
4153                 req->rq_no_delay = req->rq_no_resend = 1;
4154                 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
4155         } else if (KEY_IS(KEY_GRANT_SHRINK)) {
4156                 struct osc_grant_args *aa;
4157                 struct obdo *oa;
4158
4159                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
4160                 aa = ptlrpc_req_async_args(req);
4161                 OBDO_ALLOC(oa);
4162                 if (!oa) {
4163                         ptlrpc_req_finished(req);
4164                         RETURN(-ENOMEM);
4165                 }
4166                 *oa = ((struct ost_body *)val)->oa;
4167                 aa->aa_oa = oa;
4168                 req->rq_interpret_reply = osc_shrink_grant_interpret;
4169         }
4170
4171         ptlrpc_request_set_replen(req);
4172         if (!KEY_IS(KEY_GRANT_SHRINK)) {
4173                 LASSERT(set != NULL);
4174                 ptlrpc_set_add_req(set, req);
4175                 ptlrpc_check_set(NULL, set);
4176         } else
4177                 ptlrpcd_add_req(req, PSCOPE_OTHER);
4178
4179         RETURN(0);
4180 }
4181
4182
4183 static struct llog_operations osc_size_repl_logops = {
4184         lop_cancel: llog_obd_repl_cancel
4185 };
4186
4187 static struct llog_operations osc_mds_ost_orig_logops;
4188
4189 static int __osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4190                            struct obd_device *tgt, struct llog_catid *catid)
4191 {
4192         int rc;
4193         ENTRY;
4194
4195         rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, 1,
4196                         &catid->lci_logid, &osc_mds_ost_orig_logops);
4197         if (rc) {
4198                 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
4199                 GOTO(out, rc);
4200         }
4201
4202         rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, 1,
4203                         NULL, &osc_size_repl_logops);
4204         if (rc) {
4205                 struct llog_ctxt *ctxt =
4206                         llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4207                 if (ctxt)
4208                         llog_cleanup(ctxt);
4209                 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
4210         }
4211         GOTO(out, rc);
4212 out:
4213         if (rc) {
4214                 CERROR("osc '%s' tgt '%s' catid %p rc=%d\n",
4215                        obd->obd_name, tgt->obd_name, catid, rc);
4216                 CERROR("logid "LPX64":0x%x\n",
4217                        catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
4218         }
4219         return rc;
4220 }
4221
4222 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4223                          struct obd_device *disk_obd, int *index)
4224 {
4225         struct llog_catid catid;
4226         static char name[32] = CATLIST;
4227         int rc;
4228         ENTRY;
4229
4230         LASSERT(olg == &obd->obd_olg);
4231
4232         cfs_mutex_down(&olg->olg_cat_processing);
4233         rc = llog_get_cat_list(disk_obd, name, *index, 1, &catid);
4234         if (rc) {
4235                 CERROR("rc: %d\n", rc);
4236                 GOTO(out, rc);
4237         }
4238
4239         CDEBUG(D_INFO, "%s: Init llog for %d - catid "LPX64"/"LPX64":%x\n",
4240                obd->obd_name, *index, catid.lci_logid.lgl_oid,
4241                catid.lci_logid.lgl_oseq, catid.lci_logid.lgl_ogen);
4242
4243         rc = __osc_llog_init(obd, olg, disk_obd, &catid);
4244         if (rc) {
4245                 CERROR("rc: %d\n", rc);
4246                 GOTO(out, rc);
4247         }
4248
4249         rc = llog_put_cat_list(disk_obd, name, *index, 1, &catid);
4250         if (rc) {
4251                 CERROR("rc: %d\n", rc);
4252                 GOTO(out, rc);
4253         }
4254
4255  out:
4256         cfs_mutex_up(&olg->olg_cat_processing);
4257
4258         return rc;
4259 }
4260
4261 static int osc_llog_finish(struct obd_device *obd, int count)
4262 {
4263         struct llog_ctxt *ctxt;
4264         int rc = 0, rc2 = 0;
4265         ENTRY;
4266
4267         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4268         if (ctxt)
4269                 rc = llog_cleanup(ctxt);
4270
4271         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4272         if (ctxt)
4273                 rc2 = llog_cleanup(ctxt);
4274         if (!rc)
4275                 rc = rc2;
4276
4277         RETURN(rc);
4278 }
4279
4280 static int osc_reconnect(const struct lu_env *env,
4281                          struct obd_export *exp, struct obd_device *obd,
4282                          struct obd_uuid *cluuid,
4283                          struct obd_connect_data *data,
4284                          void *localdata)
4285 {
4286         struct client_obd *cli = &obd->u.cli;
4287
4288         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
4289                 long lost_grant;
4290
4291                 client_obd_list_lock(&cli->cl_loi_list_lock);
4292                 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
4293                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
4294                 lost_grant = cli->cl_lost_grant;
4295                 cli->cl_lost_grant = 0;
4296                 client_obd_list_unlock(&cli->cl_loi_list_lock);
4297
4298                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
4299                        "cl_dirty: %ld cl_lost_grant: %ld\n", data->ocd_grant,
4300                        cli->cl_avail_grant, cli->cl_dirty, lost_grant);
4301                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
4302                        " ocd_grant: %d\n", data->ocd_connect_flags,
4303                        data->ocd_version, data->ocd_grant);
4304         }
4305
4306         RETURN(0);
4307 }
4308
4309 static int osc_disconnect(struct obd_export *exp)
4310 {
4311         struct obd_device *obd = class_exp2obd(exp);
4312         struct llog_ctxt  *ctxt;
4313         int rc;
4314
4315         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4316         if (ctxt) {
4317                 if (obd->u.cli.cl_conn_count == 1) {
4318                         /* Flush any remaining cancel messages out to the
4319                          * target */
4320                         llog_sync(ctxt, exp);
4321                 }
4322                 llog_ctxt_put(ctxt);
4323         } else {
4324                 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
4325                        obd);
4326         }
4327
4328         rc = client_disconnect_export(exp);
4329         /**
4330          * Initially we put del_shrink_grant before disconnect_export, but it
4331          * causes the following problem if setup (connect) and cleanup
4332          * (disconnect) are tangled together.
4333          *      connect p1                     disconnect p2
4334          *   ptlrpc_connect_import
4335          *     ...............               class_manual_cleanup
4336          *                                     osc_disconnect
4337          *                                     del_shrink_grant
4338          *   ptlrpc_connect_interrupt
4339          *     init_grant_shrink
4340          *   add this client to shrink list
4341          *                                      cleanup_osc
4342          * Bang! pinger trigger the shrink.
4343          * So the osc should be disconnected from the shrink list, after we
4344          * are sure the import has been destroyed. BUG18662
4345          */
4346         if (obd->u.cli.cl_import == NULL)
4347                 osc_del_shrink_grant(&obd->u.cli);
4348         return rc;
4349 }
4350
4351 static int osc_import_event(struct obd_device *obd,
4352                             struct obd_import *imp,
4353                             enum obd_import_event event)
4354 {
4355         struct client_obd *cli;
4356         int rc = 0;
4357
4358         ENTRY;
4359         LASSERT(imp->imp_obd == obd);
4360
4361         switch (event) {
4362         case IMP_EVENT_DISCON: {
4363                 /* Only do this on the MDS OSC's */
4364                 if (imp->imp_server_timeout) {
4365                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4366
4367                         cfs_spin_lock(&oscc->oscc_lock);
4368                         oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
4369                         cfs_spin_unlock(&oscc->oscc_lock);
4370                 }
4371                 cli = &obd->u.cli;
4372                 client_obd_list_lock(&cli->cl_loi_list_lock);
4373                 cli->cl_avail_grant = 0;
4374                 cli->cl_lost_grant = 0;
4375                 client_obd_list_unlock(&cli->cl_loi_list_lock);
4376                 break;
4377         }
4378         case IMP_EVENT_INACTIVE: {
4379                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
4380                 break;
4381         }
4382         case IMP_EVENT_INVALIDATE: {
4383                 struct ldlm_namespace *ns = obd->obd_namespace;
4384                 struct lu_env         *env;
4385                 int                    refcheck;
4386
4387                 env = cl_env_get(&refcheck);
4388                 if (!IS_ERR(env)) {
4389                         /* Reset grants */
4390                         cli = &obd->u.cli;
4391                         client_obd_list_lock(&cli->cl_loi_list_lock);
4392                         /* all pages go to failing rpcs due to the invalid
4393                          * import */
4394                         osc_check_rpcs(env, cli);
4395                         client_obd_list_unlock(&cli->cl_loi_list_lock);
4396
4397                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
4398                         cl_env_put(env, &refcheck);
4399                 } else
4400                         rc = PTR_ERR(env);
4401                 break;
4402         }
4403         case IMP_EVENT_ACTIVE: {
4404                 /* Only do this on the MDS OSC's */
4405                 if (imp->imp_server_timeout) {
4406                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4407
4408                         cfs_spin_lock(&oscc->oscc_lock);
4409                         oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
4410                         cfs_spin_unlock(&oscc->oscc_lock);
4411                 }
4412                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
4413                 break;
4414         }
4415         case IMP_EVENT_OCD: {
4416                 struct obd_connect_data *ocd = &imp->imp_connect_data;
4417
4418                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
4419                         osc_init_grant(&obd->u.cli, ocd);
4420
4421                 /* See bug 7198 */
4422                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
4423                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
4424
4425                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
4426                 break;
4427         }
4428         default:
4429                 CERROR("Unknown import event %d\n", event);
4430                 LBUG();
4431         }
4432         RETURN(rc);
4433 }
4434
4435 /**
4436  * Determine whether the lock can be canceled before replaying the lock
4437  * during recovery, see bug16774 for detailed information.
4438  *
4439  * \retval zero the lock can't be canceled
4440  * \retval other ok to cancel
4441  */
4442 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
4443 {
4444         check_res_locked(lock->l_resource);
4445
4446         /*
4447          * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
4448          *
4449          * XXX as a future improvement, we can also cancel unused write lock
4450          * if it doesn't have dirty data and active mmaps.
4451          */
4452         if (lock->l_resource->lr_type == LDLM_EXTENT &&
4453             (lock->l_granted_mode == LCK_PR ||
4454              lock->l_granted_mode == LCK_CR) &&
4455             (osc_dlm_lock_pageref(lock) == 0))
4456                 RETURN(1);
4457
4458         RETURN(0);
4459 }
4460
4461 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
4462 {
4463         int rc;
4464         ENTRY;
4465
4466         ENTRY;
4467         rc = ptlrpcd_addref();
4468         if (rc)
4469                 RETURN(rc);
4470
4471         rc = client_obd_setup(obd, lcfg);
4472         if (rc) {
4473                 ptlrpcd_decref();
4474         } else {
4475                 struct lprocfs_static_vars lvars = { 0 };
4476                 struct client_obd *cli = &obd->u.cli;
4477
4478                 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
4479                 lprocfs_osc_init_vars(&lvars);
4480                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
4481                         lproc_osc_attach_seqstat(obd);
4482                         sptlrpc_lprocfs_cliobd_attach(obd);
4483                         ptlrpc_lprocfs_register_obd(obd);
4484                 }
4485
4486                 oscc_init(obd);
4487                 /* We need to allocate a few requests more, because
4488                    brw_interpret tries to create new requests before freeing
4489                    previous ones. Ideally we want to have 2x max_rpcs_in_flight
4490                    reserved, but I afraid that might be too much wasted RAM
4491                    in fact, so 2 is just my guess and still should work. */
4492                 cli->cl_import->imp_rq_pool =
4493                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4494                                             OST_MAXREQSIZE,
4495                                             ptlrpc_add_rqs_to_pool);
4496
4497                 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
4498                 cfs_sema_init(&cli->cl_grant_sem, 1);
4499
4500                 ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
4501         }
4502
4503         RETURN(rc);
4504 }
4505
4506 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
4507 {
4508         int rc = 0;
4509         ENTRY;
4510
4511         switch (stage) {
4512         case OBD_CLEANUP_EARLY: {
4513                 struct obd_import *imp;
4514                 imp = obd->u.cli.cl_import;
4515                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
4516                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
4517                 ptlrpc_deactivate_import(imp);
4518                 cfs_spin_lock(&imp->imp_lock);
4519                 imp->imp_pingable = 0;
4520                 cfs_spin_unlock(&imp->imp_lock);
4521                 break;
4522         }
4523         case OBD_CLEANUP_EXPORTS: {
4524                 /* If we set up but never connected, the
4525                    client import will not have been cleaned. */
4526                 if (obd->u.cli.cl_import) {
4527                         struct obd_import *imp;
4528                         cfs_down_write(&obd->u.cli.cl_sem);
4529                         imp = obd->u.cli.cl_import;
4530                         CDEBUG(D_CONFIG, "%s: client import never connected\n",
4531                                obd->obd_name);
4532                         ptlrpc_invalidate_import(imp);
4533                         if (imp->imp_rq_pool) {
4534                                 ptlrpc_free_rq_pool(imp->imp_rq_pool);
4535                                 imp->imp_rq_pool = NULL;
4536                         }
4537                         class_destroy_import(imp);
4538                         cfs_up_write(&obd->u.cli.cl_sem);
4539                         obd->u.cli.cl_import = NULL;
4540                 }
4541                 rc = obd_llog_finish(obd, 0);
4542                 if (rc != 0)
4543                         CERROR("failed to cleanup llogging subsystems\n");
4544                 break;
4545                 }
4546         }
4547         RETURN(rc);
4548 }
4549
4550 int osc_cleanup(struct obd_device *obd)
4551 {
4552         int rc;
4553
4554         ENTRY;
4555         ptlrpc_lprocfs_unregister_obd(obd);
4556         lprocfs_obd_cleanup(obd);
4557
4558         /* free memory of osc quota cache */
4559         lquota_cleanup(quota_interface, obd);
4560
4561         rc = client_obd_cleanup(obd);
4562
4563         ptlrpcd_decref();
4564         RETURN(rc);
4565 }
4566
4567 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
4568 {
4569         struct lprocfs_static_vars lvars = { 0 };
4570         int rc = 0;
4571
4572         lprocfs_osc_init_vars(&lvars);
4573
4574         switch (lcfg->lcfg_command) {
4575         default:
4576                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
4577                                               lcfg, obd);
4578                 if (rc > 0)
4579                         rc = 0;
4580                 break;
4581         }
4582
4583         return(rc);
4584 }
4585
4586 static int osc_sync_fs(struct obd_device *obd, struct obd_info *oinfo,
4587                        int wait)
4588 {
4589         struct client_obd *cli;
4590         struct lov_oinfo *loi;
4591         struct lov_oinfo *tloi;
4592         struct osc_async_page *oap;
4593         struct osc_async_page *toap;
4594         struct loi_oap_pages *lop;
4595         struct lu_env *env;
4596         int refcheck;
4597         int rc = 0;
4598         ENTRY;
4599
4600         env = cl_env_get(&refcheck);
4601         if (IS_ERR(env))
4602                 RETURN(PTR_ERR(env));
4603
4604         cli = &obd->u.cli;
4605         client_obd_list_lock(&cli->cl_loi_list_lock);
4606         cli->cl_sf_wait.sfw_oi = oinfo;
4607         cli->cl_sf_wait.sfw_upcall = oinfo->oi_cb_up;
4608         cli->cl_sf_wait.started = 1;
4609         /* creating cl_loi_sync_fs list */
4610         cfs_list_for_each_entry_safe(loi, tloi, &cli->cl_loi_write_list,
4611                                      loi_write_item) {
4612                 lop = &loi->loi_write_lop;
4613                 cfs_list_for_each_entry_safe(oap, toap, &lop->lop_pending,
4614                                              oap_pending_item)
4615                         osc_set_async_flags_base(cli, loi, oap, ASYNC_SYNCFS);
4616         }
4617
4618         osc_check_rpcs(env, cli);
4619         osc_wake_sync_fs(cli);
4620         client_obd_list_unlock(&cli->cl_loi_list_lock);
4621         cl_env_put(env, &refcheck);
4622         RETURN(rc);
4623 }
4624
4625 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4626 {
4627         return osc_process_config_base(obd, buf);
4628 }
4629
4630 struct obd_ops osc_obd_ops = {
4631         .o_owner                = THIS_MODULE,
4632         .o_setup                = osc_setup,
4633         .o_precleanup           = osc_precleanup,
4634         .o_cleanup              = osc_cleanup,
4635         .o_add_conn             = client_import_add_conn,
4636         .o_del_conn             = client_import_del_conn,
4637         .o_connect              = client_connect_import,
4638         .o_reconnect            = osc_reconnect,
4639         .o_disconnect           = osc_disconnect,
4640         .o_statfs               = osc_statfs,
4641         .o_statfs_async         = osc_statfs_async,
4642         .o_packmd               = osc_packmd,
4643         .o_unpackmd             = osc_unpackmd,
4644         .o_precreate            = osc_precreate,
4645         .o_create               = osc_create,
4646         .o_create_async         = osc_create_async,
4647         .o_destroy              = osc_destroy,
4648         .o_getattr              = osc_getattr,
4649         .o_getattr_async        = osc_getattr_async,
4650         .o_setattr              = osc_setattr,
4651         .o_setattr_async        = osc_setattr_async,
4652         .o_brw                  = osc_brw,
4653         .o_punch                = osc_punch,
4654         .o_sync                 = osc_sync,
4655         .o_enqueue              = osc_enqueue,
4656         .o_change_cbdata        = osc_change_cbdata,
4657         .o_find_cbdata          = osc_find_cbdata,
4658         .o_cancel               = osc_cancel,
4659         .o_cancel_unused        = osc_cancel_unused,
4660         .o_iocontrol            = osc_iocontrol,
4661         .o_get_info             = osc_get_info,
4662         .o_set_info_async       = osc_set_info_async,
4663         .o_import_event         = osc_import_event,
4664         .o_llog_init            = osc_llog_init,
4665         .o_llog_finish          = osc_llog_finish,
4666         .o_process_config       = osc_process_config,
4667         .o_sync_fs              = osc_sync_fs,
4668 };
4669
4670 extern struct lu_kmem_descr osc_caches[];
4671 extern cfs_spinlock_t       osc_ast_guard;
4672 extern cfs_lock_class_key_t osc_ast_guard_class;
4673
4674 int __init osc_init(void)
4675 {
4676         struct lprocfs_static_vars lvars = { 0 };
4677         int rc;
4678         ENTRY;
4679
4680         /* print an address of _any_ initialized kernel symbol from this
4681          * module, to allow debugging with gdb that doesn't support data
4682          * symbols from modules.*/
4683         CDEBUG(D_CONSOLE, "Lustre OSC module (%p).\n", &osc_caches);
4684
4685         rc = lu_kmem_init(osc_caches);
4686
4687         lprocfs_osc_init_vars(&lvars);
4688
4689         cfs_request_module("lquota");
4690         quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
4691         lquota_init(quota_interface);
4692         init_obd_quota_ops(quota_interface, &osc_obd_ops);
4693
4694         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
4695                                  LUSTRE_OSC_NAME, &osc_device_type);
4696         if (rc) {
4697                 if (quota_interface)
4698                         PORTAL_SYMBOL_PUT(osc_quota_interface);
4699                 lu_kmem_fini(osc_caches);
4700                 RETURN(rc);
4701         }
4702
4703         cfs_spin_lock_init(&osc_ast_guard);
4704         cfs_lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
4705
4706         osc_mds_ost_orig_logops = llog_lvfs_ops;
4707         osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
4708         osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
4709         osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
4710         osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
4711
4712         RETURN(rc);
4713 }
4714
4715 #ifdef __KERNEL__
4716 static void /*__exit*/ osc_exit(void)
4717 {
4718         lu_device_type_fini(&osc_device_type);
4719
4720         lquota_exit(quota_interface);
4721         if (quota_interface)
4722                 PORTAL_SYMBOL_PUT(osc_quota_interface);
4723
4724         class_unregister_type(LUSTRE_OSC_NAME);
4725         lu_kmem_fini(osc_caches);
4726 }
4727
4728 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4729 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4730 MODULE_LICENSE("GPL");
4731
4732 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
4733 #endif