Whamcloud - gitweb
f4c5dd0a180bfa9031113938d2343e0e0c00ee7e
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #ifndef EXPORT_SYMTAB
38 # define EXPORT_SYMTAB
39 #endif
40 #define DEBUG_SUBSYSTEM S_OSC
41
42 #include <libcfs/libcfs.h>
43
44 #ifndef __KERNEL__
45 # include <liblustre.h>
46 #endif
47
48 #include <lustre_dlm.h>
49 #include <lustre_net.h>
50 #include <lustre/lustre_user.h>
51 #include <obd_cksum.h>
52 #include <obd_ost.h>
53 #include <obd_lov.h>
54
55 #ifdef  __CYGWIN__
56 # include <ctype.h>
57 #endif
58
59 #include <lustre_ha.h>
60 #include <lprocfs_status.h>
61 #include <lustre_log.h>
62 #include <lustre_debug.h>
63 #include <lustre_param.h>
64 #include "osc_internal.h"
65
66 static quota_interface_t *quota_interface = NULL;
67 extern quota_interface_t osc_quota_interface;
68
69 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
70 static int brw_interpret(const struct lu_env *env,
71                          struct ptlrpc_request *req, void *data, int rc);
72 int osc_cleanup(struct obd_device *obd);
73
74 /* Pack OSC object metadata for disk storage (LE byte order). */
75 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
76                       struct lov_stripe_md *lsm)
77 {
78         int lmm_size;
79         ENTRY;
80
81         lmm_size = sizeof(**lmmp);
82         if (!lmmp)
83                 RETURN(lmm_size);
84
85         if (*lmmp && !lsm) {
86                 OBD_FREE(*lmmp, lmm_size);
87                 *lmmp = NULL;
88                 RETURN(0);
89         }
90
91         if (!*lmmp) {
92                 OBD_ALLOC(*lmmp, lmm_size);
93                 if (!*lmmp)
94                         RETURN(-ENOMEM);
95         }
96
97         if (lsm) {
98                 LASSERT(lsm->lsm_object_id);
99                 LASSERT_SEQ_IS_MDT(lsm->lsm_object_seq);
100                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
101                 (*lmmp)->lmm_object_seq = cpu_to_le64(lsm->lsm_object_seq);
102         }
103
104         RETURN(lmm_size);
105 }
106
107 /* Unpack OSC object metadata from disk storage (LE byte order). */
108 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
109                         struct lov_mds_md *lmm, int lmm_bytes)
110 {
111         int lsm_size;
112         ENTRY;
113
114         if (lmm != NULL) {
115                 if (lmm_bytes < sizeof (*lmm)) {
116                         CERROR("lov_mds_md too small: %d, need %d\n",
117                                lmm_bytes, (int)sizeof(*lmm));
118                         RETURN(-EINVAL);
119                 }
120                 /* XXX LOV_MAGIC etc check? */
121
122                 if (lmm->lmm_object_id == 0) {
123                         CERROR("lov_mds_md: zero lmm_object_id\n");
124                         RETURN(-EINVAL);
125                 }
126         }
127
128         lsm_size = lov_stripe_md_size(1);
129         if (lsmp == NULL)
130                 RETURN(lsm_size);
131
132         if (*lsmp != NULL && lmm == NULL) {
133                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
134                 OBD_FREE(*lsmp, lsm_size);
135                 *lsmp = NULL;
136                 RETURN(0);
137         }
138
139         if (*lsmp == NULL) {
140                 OBD_ALLOC(*lsmp, lsm_size);
141                 if (*lsmp == NULL)
142                         RETURN(-ENOMEM);
143                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
144                 if ((*lsmp)->lsm_oinfo[0] == NULL) {
145                         OBD_FREE(*lsmp, lsm_size);
146                         RETURN(-ENOMEM);
147                 }
148                 loi_init((*lsmp)->lsm_oinfo[0]);
149         }
150
151         if (lmm != NULL) {
152                 /* XXX zero *lsmp? */
153                 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
154                 (*lsmp)->lsm_object_seq = le64_to_cpu (lmm->lmm_object_seq);
155                 LASSERT((*lsmp)->lsm_object_id);
156                 LASSERT_SEQ_IS_MDT((*lsmp)->lsm_object_seq);
157         }
158
159         (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
160
161         RETURN(lsm_size);
162 }
163
164 static inline void osc_pack_capa(struct ptlrpc_request *req,
165                                  struct ost_body *body, void *capa)
166 {
167         struct obd_capa *oc = (struct obd_capa *)capa;
168         struct lustre_capa *c;
169
170         if (!capa)
171                 return;
172
173         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
174         LASSERT(c);
175         capa_cpy(c, oc);
176         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
177         DEBUG_CAPA(D_SEC, c, "pack");
178 }
179
180 static inline void osc_pack_req_body(struct ptlrpc_request *req,
181                                      struct obd_info *oinfo)
182 {
183         struct ost_body *body;
184
185         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
186         LASSERT(body);
187
188         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
189         osc_pack_capa(req, body, oinfo->oi_capa);
190 }
191
192 static inline void osc_set_capa_size(struct ptlrpc_request *req,
193                                      const struct req_msg_field *field,
194                                      struct obd_capa *oc)
195 {
196         if (oc == NULL)
197                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
198         else
199                 /* it is already calculated as sizeof struct obd_capa */
200                 ;
201 }
202
203 static int osc_getattr_interpret(const struct lu_env *env,
204                                  struct ptlrpc_request *req,
205                                  struct osc_async_args *aa, int rc)
206 {
207         struct ost_body *body;
208         ENTRY;
209
210         if (rc != 0)
211                 GOTO(out, rc);
212
213         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
214         if (body) {
215                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
216                 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
217
218                 /* This should really be sent by the OST */
219                 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
220                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
221         } else {
222                 CDEBUG(D_INFO, "can't unpack ost_body\n");
223                 rc = -EPROTO;
224                 aa->aa_oi->oi_oa->o_valid = 0;
225         }
226 out:
227         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
228         RETURN(rc);
229 }
230
231 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
232                              struct ptlrpc_request_set *set)
233 {
234         struct ptlrpc_request *req;
235         struct osc_async_args *aa;
236         int                    rc;
237         ENTRY;
238
239         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
240         if (req == NULL)
241                 RETURN(-ENOMEM);
242
243         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
244         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
245         if (rc) {
246                 ptlrpc_request_free(req);
247                 RETURN(rc);
248         }
249
250         osc_pack_req_body(req, oinfo);
251
252         ptlrpc_request_set_replen(req);
253         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
254
255         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
256         aa = ptlrpc_req_async_args(req);
257         aa->aa_oi = oinfo;
258
259         ptlrpc_set_add_req(set, req);
260         RETURN(0);
261 }
262
263 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
264 {
265         struct ptlrpc_request *req;
266         struct ost_body       *body;
267         int                    rc;
268         ENTRY;
269
270         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
271         if (req == NULL)
272                 RETURN(-ENOMEM);
273
274         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
275         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
276         if (rc) {
277                 ptlrpc_request_free(req);
278                 RETURN(rc);
279         }
280
281         osc_pack_req_body(req, oinfo);
282
283         ptlrpc_request_set_replen(req);
284
285         rc = ptlrpc_queue_wait(req);
286         if (rc)
287                 GOTO(out, rc);
288
289         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
290         if (body == NULL)
291                 GOTO(out, rc = -EPROTO);
292
293         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
294         lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
295
296         /* This should really be sent by the OST */
297         oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
298         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
299
300         EXIT;
301  out:
302         ptlrpc_req_finished(req);
303         return rc;
304 }
305
306 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
307                        struct obd_trans_info *oti)
308 {
309         struct ptlrpc_request *req;
310         struct ost_body       *body;
311         int                    rc;
312         ENTRY;
313
314         LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
315
316         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
317         if (req == NULL)
318                 RETURN(-ENOMEM);
319
320         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
321         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
322         if (rc) {
323                 ptlrpc_request_free(req);
324                 RETURN(rc);
325         }
326
327         osc_pack_req_body(req, oinfo);
328
329         ptlrpc_request_set_replen(req);
330
331         rc = ptlrpc_queue_wait(req);
332         if (rc)
333                 GOTO(out, rc);
334
335         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
336         if (body == NULL)
337                 GOTO(out, rc = -EPROTO);
338
339         lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
340
341         EXIT;
342 out:
343         ptlrpc_req_finished(req);
344         RETURN(rc);
345 }
346
347 static int osc_setattr_interpret(const struct lu_env *env,
348                                  struct ptlrpc_request *req,
349                                  struct osc_setattr_args *sa, int rc)
350 {
351         struct ost_body *body;
352         ENTRY;
353
354         if (rc != 0)
355                 GOTO(out, rc);
356
357         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
358         if (body == NULL)
359                 GOTO(out, rc = -EPROTO);
360
361         lustre_get_wire_obdo(sa->sa_oa, &body->oa);
362 out:
363         rc = sa->sa_upcall(sa->sa_cookie, rc);
364         RETURN(rc);
365 }
366
367 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
368                            struct obd_trans_info *oti,
369                            obd_enqueue_update_f upcall, void *cookie,
370                            struct ptlrpc_request_set *rqset)
371 {
372         struct ptlrpc_request   *req;
373         struct osc_setattr_args *sa;
374         int                      rc;
375         ENTRY;
376
377         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
378         if (req == NULL)
379                 RETURN(-ENOMEM);
380
381         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
382         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
383         if (rc) {
384                 ptlrpc_request_free(req);
385                 RETURN(rc);
386         }
387
388         if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
389                 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
390
391         osc_pack_req_body(req, oinfo);
392
393         ptlrpc_request_set_replen(req);
394
395         /* do mds to ost setattr asynchronously */
396         if (!rqset) {
397                 /* Do not wait for response. */
398                 ptlrpcd_add_req(req, PSCOPE_OTHER);
399         } else {
400                 req->rq_interpret_reply =
401                         (ptlrpc_interpterer_t)osc_setattr_interpret;
402
403                 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
404                 sa = ptlrpc_req_async_args(req);
405                 sa->sa_oa = oinfo->oi_oa;
406                 sa->sa_upcall = upcall;
407                 sa->sa_cookie = cookie;
408
409                 if (rqset == PTLRPCD_SET)
410                         ptlrpcd_add_req(req, PSCOPE_OTHER);
411                 else
412                         ptlrpc_set_add_req(rqset, req);
413         }
414
415         RETURN(0);
416 }
417
418 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
419                              struct obd_trans_info *oti,
420                              struct ptlrpc_request_set *rqset)
421 {
422         return osc_setattr_async_base(exp, oinfo, oti,
423                                       oinfo->oi_cb_up, oinfo, rqset);
424 }
425
426 int osc_real_create(struct obd_export *exp, struct obdo *oa,
427                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
428 {
429         struct ptlrpc_request *req;
430         struct ost_body       *body;
431         struct lov_stripe_md  *lsm;
432         int                    rc;
433         ENTRY;
434
435         LASSERT(oa);
436         LASSERT(ea);
437
438         lsm = *ea;
439         if (!lsm) {
440                 rc = obd_alloc_memmd(exp, &lsm);
441                 if (rc < 0)
442                         RETURN(rc);
443         }
444
445         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
446         if (req == NULL)
447                 GOTO(out, rc = -ENOMEM);
448
449         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
450         if (rc) {
451                 ptlrpc_request_free(req);
452                 GOTO(out, rc);
453         }
454
455         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
456         LASSERT(body);
457         lustre_set_wire_obdo(&body->oa, oa);
458
459         ptlrpc_request_set_replen(req);
460
461         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
462             oa->o_flags == OBD_FL_DELORPHAN) {
463                 DEBUG_REQ(D_HA, req,
464                           "delorphan from OST integration");
465                 /* Don't resend the delorphan req */
466                 req->rq_no_resend = req->rq_no_delay = 1;
467         }
468
469         rc = ptlrpc_queue_wait(req);
470         if (rc)
471                 GOTO(out_req, rc);
472
473         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
474         if (body == NULL)
475                 GOTO(out_req, rc = -EPROTO);
476
477         lustre_get_wire_obdo(oa, &body->oa);
478
479         /* This should really be sent by the OST */
480         oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
481         oa->o_valid |= OBD_MD_FLBLKSZ;
482
483         /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
484          * have valid lsm_oinfo data structs, so don't go touching that.
485          * This needs to be fixed in a big way.
486          */
487         lsm->lsm_object_id = oa->o_id;
488         lsm->lsm_object_seq = oa->o_seq;
489         *ea = lsm;
490
491         if (oti != NULL) {
492                 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
493
494                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
495                         if (!oti->oti_logcookies)
496                                 oti_alloc_cookies(oti, 1);
497                         *oti->oti_logcookies = oa->o_lcookie;
498                 }
499         }
500
501         CDEBUG(D_HA, "transno: "LPD64"\n",
502                lustre_msg_get_transno(req->rq_repmsg));
503 out_req:
504         ptlrpc_req_finished(req);
505 out:
506         if (rc && !*ea)
507                 obd_free_memmd(exp, &lsm);
508         RETURN(rc);
509 }
510
511 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
512                    obd_enqueue_update_f upcall, void *cookie,
513                    struct ptlrpc_request_set *rqset)
514 {
515         struct ptlrpc_request   *req;
516         struct osc_setattr_args *sa;
517         struct ost_body         *body;
518         int                      rc;
519         ENTRY;
520
521         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
522         if (req == NULL)
523                 RETURN(-ENOMEM);
524
525         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
526         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
527         if (rc) {
528                 ptlrpc_request_free(req);
529                 RETURN(rc);
530         }
531         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
532         ptlrpc_at_set_req_timeout(req);
533
534         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
535         LASSERT(body);
536         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
537         osc_pack_capa(req, body, oinfo->oi_capa);
538
539         ptlrpc_request_set_replen(req);
540
541
542         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
543         CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
544         sa = ptlrpc_req_async_args(req);
545         sa->sa_oa     = oinfo->oi_oa;
546         sa->sa_upcall = upcall;
547         sa->sa_cookie = cookie;
548         if (rqset == PTLRPCD_SET)
549                 ptlrpcd_add_req(req, PSCOPE_OTHER);
550         else
551                 ptlrpc_set_add_req(rqset, req);
552
553         RETURN(0);
554 }
555
556 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
557                      struct obd_trans_info *oti,
558                      struct ptlrpc_request_set *rqset)
559 {
560         oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
561         oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
562         oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
563         return osc_punch_base(exp, oinfo,
564                               oinfo->oi_cb_up, oinfo, rqset);
565 }
566
567 static int osc_sync(struct obd_export *exp, struct obdo *oa,
568                     struct lov_stripe_md *md, obd_size start, obd_size end,
569                     void *capa)
570 {
571         struct ptlrpc_request *req;
572         struct ost_body       *body;
573         int                    rc;
574         ENTRY;
575
576         if (!oa) {
577                 CDEBUG(D_INFO, "oa NULL\n");
578                 RETURN(-EINVAL);
579         }
580
581         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
582         if (req == NULL)
583                 RETURN(-ENOMEM);
584
585         osc_set_capa_size(req, &RMF_CAPA1, capa);
586         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
587         if (rc) {
588                 ptlrpc_request_free(req);
589                 RETURN(rc);
590         }
591
592         /* overload the size and blocks fields in the oa with start/end */
593         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
594         LASSERT(body);
595         lustre_set_wire_obdo(&body->oa, oa);
596         body->oa.o_size = start;
597         body->oa.o_blocks = end;
598         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
599         osc_pack_capa(req, body, capa);
600
601         ptlrpc_request_set_replen(req);
602
603         rc = ptlrpc_queue_wait(req);
604         if (rc)
605                 GOTO(out, rc);
606
607         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
608         if (body == NULL)
609                 GOTO(out, rc = -EPROTO);
610
611         lustre_get_wire_obdo(oa, &body->oa);
612
613         EXIT;
614  out:
615         ptlrpc_req_finished(req);
616         return rc;
617 }
618
619 /* Find and cancel locally locks matched by @mode in the resource found by
620  * @objid. Found locks are added into @cancel list. Returns the amount of
621  * locks added to @cancels list. */
622 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
623                                    cfs_list_t *cancels,
624                                    ldlm_mode_t mode, int lock_flags)
625 {
626         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
627         struct ldlm_res_id res_id;
628         struct ldlm_resource *res;
629         int count;
630         ENTRY;
631
632         osc_build_res_name(oa->o_id, oa->o_seq, &res_id);
633         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
634         if (res == NULL)
635                 RETURN(0);
636
637         LDLM_RESOURCE_ADDREF(res);
638         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
639                                            lock_flags, 0, NULL);
640         LDLM_RESOURCE_DELREF(res);
641         ldlm_resource_putref(res);
642         RETURN(count);
643 }
644
645 static int osc_destroy_interpret(const struct lu_env *env,
646                                  struct ptlrpc_request *req, void *data,
647                                  int rc)
648 {
649         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
650
651         cfs_atomic_dec(&cli->cl_destroy_in_flight);
652         cfs_waitq_signal(&cli->cl_destroy_waitq);
653         return 0;
654 }
655
656 static int osc_can_send_destroy(struct client_obd *cli)
657 {
658         if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <=
659             cli->cl_max_rpcs_in_flight) {
660                 /* The destroy request can be sent */
661                 return 1;
662         }
663         if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) <
664             cli->cl_max_rpcs_in_flight) {
665                 /*
666                  * The counter has been modified between the two atomic
667                  * operations.
668                  */
669                 cfs_waitq_signal(&cli->cl_destroy_waitq);
670         }
671         return 0;
672 }
673
674 /* Destroy requests can be async always on the client, and we don't even really
675  * care about the return code since the client cannot do anything at all about
676  * a destroy failure.
677  * When the MDS is unlinking a filename, it saves the file objects into a
678  * recovery llog, and these object records are cancelled when the OST reports
679  * they were destroyed and sync'd to disk (i.e. transaction committed).
680  * If the client dies, or the OST is down when the object should be destroyed,
681  * the records are not cancelled, and when the OST reconnects to the MDS next,
682  * it will retrieve the llog unlink logs and then sends the log cancellation
683  * cookies to the MDS after committing destroy transactions. */
684 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
685                        struct lov_stripe_md *ea, struct obd_trans_info *oti,
686                        struct obd_export *md_export, void *capa)
687 {
688         struct client_obd     *cli = &exp->exp_obd->u.cli;
689         struct ptlrpc_request *req;
690         struct ost_body       *body;
691         CFS_LIST_HEAD(cancels);
692         int rc, count;
693         ENTRY;
694
695         if (!oa) {
696                 CDEBUG(D_INFO, "oa NULL\n");
697                 RETURN(-EINVAL);
698         }
699
700         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
701                                         LDLM_FL_DISCARD_DATA);
702
703         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
704         if (req == NULL) {
705                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
706                 RETURN(-ENOMEM);
707         }
708
709         osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
710         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
711                                0, &cancels, count);
712         if (rc) {
713                 ptlrpc_request_free(req);
714                 RETURN(rc);
715         }
716
717         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
718         ptlrpc_at_set_req_timeout(req);
719
720         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
721                 oa->o_lcookie = *oti->oti_logcookies;
722         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
723         LASSERT(body);
724         lustre_set_wire_obdo(&body->oa, oa);
725
726         osc_pack_capa(req, body, (struct obd_capa *)capa);
727         ptlrpc_request_set_replen(req);
728
729         /* don't throttle destroy RPCs for the MDT */
730         if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
731                 req->rq_interpret_reply = osc_destroy_interpret;
732                 if (!osc_can_send_destroy(cli)) {
733                         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
734                                                           NULL);
735
736                         /*
737                          * Wait until the number of on-going destroy RPCs drops
738                          * under max_rpc_in_flight
739                          */
740                         l_wait_event_exclusive(cli->cl_destroy_waitq,
741                                                osc_can_send_destroy(cli), &lwi);
742                 }
743         }
744
745         /* Do not wait for response */
746         ptlrpcd_add_req(req, PSCOPE_OTHER);
747         RETURN(0);
748 }
749
750 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
751                                 long writing_bytes)
752 {
753         obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
754
755         LASSERT(!(oa->o_valid & bits));
756
757         oa->o_valid |= bits;
758         client_obd_list_lock(&cli->cl_loi_list_lock);
759         oa->o_dirty = cli->cl_dirty;
760         if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
761                 CERROR("dirty %lu - %lu > dirty_max %lu\n",
762                        cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
763                 oa->o_undirty = 0;
764         } else if (cfs_atomic_read(&obd_dirty_pages) -
765                    cfs_atomic_read(&obd_dirty_transit_pages) >
766                    obd_max_dirty_pages + 1){
767                 /* The cfs_atomic_read() allowing the cfs_atomic_inc() are
768                  * not covered by a lock thus they may safely race and trip
769                  * this CERROR() unless we add in a small fudge factor (+1). */
770                 CERROR("dirty %d - %d > system dirty_max %d\n",
771                        cfs_atomic_read(&obd_dirty_pages),
772                        cfs_atomic_read(&obd_dirty_transit_pages),
773                        obd_max_dirty_pages);
774                 oa->o_undirty = 0;
775         } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
776                 CERROR("dirty %lu - dirty_max %lu too big???\n",
777                        cli->cl_dirty, cli->cl_dirty_max);
778                 oa->o_undirty = 0;
779         } else {
780                 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
781                                 (cli->cl_max_rpcs_in_flight + 1);
782                 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
783         }
784         oa->o_grant = cli->cl_avail_grant;
785         oa->o_dropped = cli->cl_lost_grant;
786         cli->cl_lost_grant = 0;
787         client_obd_list_unlock(&cli->cl_loi_list_lock);
788         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
789                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
790
791 }
792
793 static void osc_update_next_shrink(struct client_obd *cli)
794 {
795         cli->cl_next_shrink_grant =
796                 cfs_time_shift(cli->cl_grant_shrink_interval);
797         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
798                cli->cl_next_shrink_grant);
799 }
800
801 /* caller must hold loi_list_lock */
802 static void osc_consume_write_grant(struct client_obd *cli,
803                                     struct brw_page *pga)
804 {
805         LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
806         LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
807         cfs_atomic_inc(&obd_dirty_pages);
808         cli->cl_dirty += CFS_PAGE_SIZE;
809         cli->cl_avail_grant -= CFS_PAGE_SIZE;
810         pga->flag |= OBD_BRW_FROM_GRANT;
811         CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
812                CFS_PAGE_SIZE, pga, pga->pg);
813         LASSERT(cli->cl_avail_grant >= 0);
814         osc_update_next_shrink(cli);
815 }
816
817 /* the companion to osc_consume_write_grant, called when a brw has completed.
818  * must be called with the loi lock held. */
819 static void osc_release_write_grant(struct client_obd *cli,
820                                     struct brw_page *pga, int sent)
821 {
822         int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
823         ENTRY;
824
825         LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
826         if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
827                 EXIT;
828                 return;
829         }
830
831         pga->flag &= ~OBD_BRW_FROM_GRANT;
832         cfs_atomic_dec(&obd_dirty_pages);
833         cli->cl_dirty -= CFS_PAGE_SIZE;
834         if (pga->flag & OBD_BRW_NOCACHE) {
835                 pga->flag &= ~OBD_BRW_NOCACHE;
836                 cfs_atomic_dec(&obd_dirty_transit_pages);
837                 cli->cl_dirty_transit -= CFS_PAGE_SIZE;
838         }
839         if (!sent) {
840                 cli->cl_lost_grant += CFS_PAGE_SIZE;
841                 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
842                        cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
843         } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
844                 /* For short writes we shouldn't count parts of pages that
845                  * span a whole block on the OST side, or our accounting goes
846                  * wrong.  Should match the code in filter_grant_check. */
847                 int offset = pga->off & ~CFS_PAGE_MASK;
848                 int count = pga->count + (offset & (blocksize - 1));
849                 int end = (offset + pga->count) & (blocksize - 1);
850                 if (end)
851                         count += blocksize - end;
852
853                 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
854                 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
855                        CFS_PAGE_SIZE - count, cli->cl_lost_grant,
856                        cli->cl_avail_grant, cli->cl_dirty);
857         }
858
859         EXIT;
860 }
861
862 static unsigned long rpcs_in_flight(struct client_obd *cli)
863 {
864         return cli->cl_r_in_flight + cli->cl_w_in_flight;
865 }
866
867 int osc_wake_sync_fs(struct client_obd *cli)
868 {
869         int rc = 0;
870         ENTRY;
871         if (cfs_list_empty(&cli->cl_loi_sync_fs_list) &&
872             cli->cl_sf_wait.started) {
873                 cli->cl_sf_wait.sfw_upcall(cli->cl_sf_wait.sfw_oi, rc);
874                 cli->cl_sf_wait.started = 0;
875                 CDEBUG(D_CACHE, "sync_fs_loi list is empty\n");
876         }
877         RETURN(rc);
878 }
879
880 /* caller must hold loi_list_lock */
881 void osc_wake_cache_waiters(struct client_obd *cli)
882 {
883         cfs_list_t *l, *tmp;
884         struct osc_cache_waiter *ocw;
885
886         ENTRY;
887         cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
888                 /* if we can't dirty more, we must wait until some is written */
889                 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
890                    (cfs_atomic_read(&obd_dirty_pages) + 1 >
891                     obd_max_dirty_pages)) {
892                         CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
893                                "osc max %ld, sys max %d\n", cli->cl_dirty,
894                                cli->cl_dirty_max, obd_max_dirty_pages);
895                         return;
896                 }
897
898                 /* if still dirty cache but no grant wait for pending RPCs that
899                  * may yet return us some grant before doing sync writes */
900                 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
901                         CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
902                                cli->cl_w_in_flight);
903                         return;
904                 }
905
906                 ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry);
907                 cfs_list_del_init(&ocw->ocw_entry);
908                 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
909                         /* no more RPCs in flight to return grant, do sync IO */
910                         ocw->ocw_rc = -EDQUOT;
911                         CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
912                 } else {
913                         osc_consume_write_grant(cli,
914                                                 &ocw->ocw_oap->oap_brw_page);
915                 }
916
917                 cfs_waitq_signal(&ocw->ocw_waitq);
918         }
919
920         EXIT;
921 }
922
923 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
924 {
925         client_obd_list_lock(&cli->cl_loi_list_lock);
926         cli->cl_avail_grant += grant;
927         client_obd_list_unlock(&cli->cl_loi_list_lock);
928 }
929
930 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
931 {
932         if (body->oa.o_valid & OBD_MD_FLGRANT) {
933                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
934                 __osc_update_grant(cli, body->oa.o_grant);
935         }
936 }
937
938 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
939                               void *key, obd_count vallen, void *val,
940                               struct ptlrpc_request_set *set);
941
942 static int osc_shrink_grant_interpret(const struct lu_env *env,
943                                       struct ptlrpc_request *req,
944                                       void *aa, int rc)
945 {
946         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
947         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
948         struct ost_body *body;
949
950         if (rc != 0) {
951                 __osc_update_grant(cli, oa->o_grant);
952                 GOTO(out, rc);
953         }
954
955         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
956         LASSERT(body);
957         osc_update_grant(cli, body);
958 out:
959         OBDO_FREE(oa);
960         return rc;
961 }
962
963 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
964 {
965         client_obd_list_lock(&cli->cl_loi_list_lock);
966         oa->o_grant = cli->cl_avail_grant / 4;
967         cli->cl_avail_grant -= oa->o_grant;
968         client_obd_list_unlock(&cli->cl_loi_list_lock);
969         oa->o_flags |= OBD_FL_SHRINK_GRANT;
970         osc_update_next_shrink(cli);
971 }
972
973 /* Shrink the current grant, either from some large amount to enough for a
974  * full set of in-flight RPCs, or if we have already shrunk to that limit
975  * then to enough for a single RPC.  This avoids keeping more grant than
976  * needed, and avoids shrinking the grant piecemeal. */
977 static int osc_shrink_grant(struct client_obd *cli)
978 {
979         long target = (cli->cl_max_rpcs_in_flight + 1) *
980                       cli->cl_max_pages_per_rpc;
981
982         client_obd_list_lock(&cli->cl_loi_list_lock);
983         if (cli->cl_avail_grant <= target)
984                 target = cli->cl_max_pages_per_rpc;
985         client_obd_list_unlock(&cli->cl_loi_list_lock);
986
987         return osc_shrink_grant_to_target(cli, target);
988 }
989
990 int osc_shrink_grant_to_target(struct client_obd *cli, long target)
991 {
992         int    rc = 0;
993         struct ost_body     *body;
994         ENTRY;
995
996         client_obd_list_lock(&cli->cl_loi_list_lock);
997         /* Don't shrink if we are already above or below the desired limit
998          * We don't want to shrink below a single RPC, as that will negatively
999          * impact block allocation and long-term performance. */
1000         if (target < cli->cl_max_pages_per_rpc)
1001                 target = cli->cl_max_pages_per_rpc;
1002
1003         if (target >= cli->cl_avail_grant) {
1004                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1005                 RETURN(0);
1006         }
1007         client_obd_list_unlock(&cli->cl_loi_list_lock);
1008
1009         OBD_ALLOC_PTR(body);
1010         if (!body)
1011                 RETURN(-ENOMEM);
1012
1013         osc_announce_cached(cli, &body->oa, 0);
1014
1015         client_obd_list_lock(&cli->cl_loi_list_lock);
1016         body->oa.o_grant = cli->cl_avail_grant - target;
1017         cli->cl_avail_grant = target;
1018         client_obd_list_unlock(&cli->cl_loi_list_lock);
1019         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
1020         osc_update_next_shrink(cli);
1021
1022         rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
1023                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
1024                                 sizeof(*body), body, NULL);
1025         if (rc != 0)
1026                 __osc_update_grant(cli, body->oa.o_grant);
1027         OBD_FREE_PTR(body);
1028         RETURN(rc);
1029 }
1030
1031 #define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
1032 static int osc_should_shrink_grant(struct client_obd *client)
1033 {
1034         cfs_time_t time = cfs_time_current();
1035         cfs_time_t next_shrink = client->cl_next_shrink_grant;
1036
1037         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
1038              OBD_CONNECT_GRANT_SHRINK) == 0)
1039                 return 0;
1040
1041         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
1042                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
1043                     client->cl_avail_grant > GRANT_SHRINK_LIMIT)
1044                         return 1;
1045                 else
1046                         osc_update_next_shrink(client);
1047         }
1048         return 0;
1049 }
1050
1051 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1052 {
1053         struct client_obd *client;
1054
1055         cfs_list_for_each_entry(client, &item->ti_obd_list,
1056                                 cl_grant_shrink_list) {
1057                 if (osc_should_shrink_grant(client))
1058                         osc_shrink_grant(client);
1059         }
1060         return 0;
1061 }
1062
1063 static int osc_add_shrink_grant(struct client_obd *client)
1064 {
1065         int rc;
1066
1067         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1068                                        TIMEOUT_GRANT,
1069                                        osc_grant_shrink_grant_cb, NULL,
1070                                        &client->cl_grant_shrink_list);
1071         if (rc) {
1072                 CERROR("add grant client %s error %d\n",
1073                         client->cl_import->imp_obd->obd_name, rc);
1074                 return rc;
1075         }
1076         CDEBUG(D_CACHE, "add grant client %s \n",
1077                client->cl_import->imp_obd->obd_name);
1078         osc_update_next_shrink(client);
1079         return 0;
1080 }
1081
1082 static int osc_del_shrink_grant(struct client_obd *client)
1083 {
1084         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1085                                          TIMEOUT_GRANT);
1086 }
1087
1088 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1089 {
1090         /*
1091          * ocd_grant is the total grant amount we're expect to hold: if we've
1092          * been evicted, it's the new avail_grant amount, cl_dirty will drop
1093          * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1094          *
1095          * race is tolerable here: if we're evicted, but imp_state already
1096          * left EVICTED state, then cl_dirty must be 0 already.
1097          */
1098         client_obd_list_lock(&cli->cl_loi_list_lock);
1099         if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1100                 cli->cl_avail_grant = ocd->ocd_grant;
1101         else
1102                 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1103
1104         if (cli->cl_avail_grant < 0) {
1105                 CWARN("%s: available grant < 0, the OSS is probably not running"
1106                       " with patch from bug20278 (%ld) \n",
1107                       cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant);
1108                 /* workaround for 1.6 servers which do not have 
1109                  * the patch from bug20278 */
1110                 cli->cl_avail_grant = ocd->ocd_grant;
1111         }
1112
1113         client_obd_list_unlock(&cli->cl_loi_list_lock);
1114
1115         CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld \n",
1116                cli->cl_import->imp_obd->obd_name,
1117                cli->cl_avail_grant, cli->cl_lost_grant);
1118
1119         if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1120             cfs_list_empty(&cli->cl_grant_shrink_list))
1121                 osc_add_shrink_grant(cli);
1122 }
1123
1124 /* We assume that the reason this OSC got a short read is because it read
1125  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1126  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1127  * this stripe never got written at or beyond this stripe offset yet. */
1128 static void handle_short_read(int nob_read, obd_count page_count,
1129                               struct brw_page **pga)
1130 {
1131         char *ptr;
1132         int i = 0;
1133
1134         /* skip bytes read OK */
1135         while (nob_read > 0) {
1136                 LASSERT (page_count > 0);
1137
1138                 if (pga[i]->count > nob_read) {
1139                         /* EOF inside this page */
1140                         ptr = cfs_kmap(pga[i]->pg) +
1141                                 (pga[i]->off & ~CFS_PAGE_MASK);
1142                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1143                         cfs_kunmap(pga[i]->pg);
1144                         page_count--;
1145                         i++;
1146                         break;
1147                 }
1148
1149                 nob_read -= pga[i]->count;
1150                 page_count--;
1151                 i++;
1152         }
1153
1154         /* zero remaining pages */
1155         while (page_count-- > 0) {
1156                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1157                 memset(ptr, 0, pga[i]->count);
1158                 cfs_kunmap(pga[i]->pg);
1159                 i++;
1160         }
1161 }
1162
1163 static int check_write_rcs(struct ptlrpc_request *req,
1164                            int requested_nob, int niocount,
1165                            obd_count page_count, struct brw_page **pga)
1166 {
1167         int     i;
1168         __u32   *remote_rcs;
1169
1170         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1171                                                   sizeof(*remote_rcs) *
1172                                                   niocount);
1173         if (remote_rcs == NULL) {
1174                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1175                 return(-EPROTO);
1176         }
1177
1178         /* return error if any niobuf was in error */
1179         for (i = 0; i < niocount; i++) {
1180                 if (remote_rcs[i] < 0)
1181                         return(remote_rcs[i]);
1182
1183                 if (remote_rcs[i] != 0) {
1184                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1185                                 i, remote_rcs[i], req);
1186                         return(-EPROTO);
1187                 }
1188         }
1189
1190         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1191                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1192                        req->rq_bulk->bd_nob_transferred, requested_nob);
1193                 return(-EPROTO);
1194         }
1195
1196         return (0);
1197 }
1198
1199 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1200 {
1201         if (p1->flag != p2->flag) {
1202                 unsigned mask = ~(OBD_BRW_FROM_GRANT|
1203                                   OBD_BRW_NOCACHE|OBD_BRW_SYNC|OBD_BRW_ASYNC);
1204
1205                 /* warn if we try to combine flags that we don't know to be
1206                  * safe to combine */
1207                 if ((p1->flag & mask) != (p2->flag & mask))
1208                         CERROR("is it ok to have flags 0x%x and 0x%x in the "
1209                                "same brw?\n", p1->flag, p2->flag);
1210                 return 0;
1211         }
1212
1213         return (p1->off + p1->count == p2->off);
1214 }
1215
1216 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1217                                    struct brw_page **pga, int opc,
1218                                    cksum_type_t cksum_type)
1219 {
1220         __u32 cksum;
1221         int i = 0;
1222
1223         LASSERT (pg_count > 0);
1224         cksum = init_checksum(cksum_type);
1225         while (nob > 0 && pg_count > 0) {
1226                 unsigned char *ptr = cfs_kmap(pga[i]->pg);
1227                 int off = pga[i]->off & ~CFS_PAGE_MASK;
1228                 int count = pga[i]->count > nob ? nob : pga[i]->count;
1229
1230                 /* corrupt the data before we compute the checksum, to
1231                  * simulate an OST->client data error */
1232                 if (i == 0 && opc == OST_READ &&
1233                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
1234                         memcpy(ptr + off, "bad1", min(4, nob));
1235                 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
1236                 cfs_kunmap(pga[i]->pg);
1237                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1238                                off, cksum);
1239
1240                 nob -= pga[i]->count;
1241                 pg_count--;
1242                 i++;
1243         }
1244         /* For sending we only compute the wrong checksum instead
1245          * of corrupting the data so it is still correct on a redo */
1246         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1247                 cksum++;
1248
1249         return cksum;
1250 }
1251
1252 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1253                                 struct lov_stripe_md *lsm, obd_count page_count,
1254                                 struct brw_page **pga,
1255                                 struct ptlrpc_request **reqp,
1256                                 struct obd_capa *ocapa, int reserve)
1257 {
1258         struct ptlrpc_request   *req;
1259         struct ptlrpc_bulk_desc *desc;
1260         struct ost_body         *body;
1261         struct obd_ioobj        *ioobj;
1262         struct niobuf_remote    *niobuf;
1263         int niocount, i, requested_nob, opc, rc;
1264         struct osc_brw_async_args *aa;
1265         struct req_capsule      *pill;
1266         struct brw_page *pg_prev;
1267
1268         ENTRY;
1269         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1270                 RETURN(-ENOMEM); /* Recoverable */
1271         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1272                 RETURN(-EINVAL); /* Fatal */
1273
1274         if ((cmd & OBD_BRW_WRITE) != 0) {
1275                 opc = OST_WRITE;
1276                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1277                                                 cli->cl_import->imp_rq_pool,
1278                                                 &RQF_OST_BRW_WRITE);
1279         } else {
1280                 opc = OST_READ;
1281                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1282         }
1283         if (req == NULL)
1284                 RETURN(-ENOMEM);
1285
1286         for (niocount = i = 1; i < page_count; i++) {
1287                 if (!can_merge_pages(pga[i - 1], pga[i]))
1288                         niocount++;
1289         }
1290
1291         pill = &req->rq_pill;
1292         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1293                              sizeof(*ioobj));
1294         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1295                              niocount * sizeof(*niobuf));
1296         osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1297
1298         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1299         if (rc) {
1300                 ptlrpc_request_free(req);
1301                 RETURN(rc);
1302         }
1303         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1304         ptlrpc_at_set_req_timeout(req);
1305
1306         if (opc == OST_WRITE)
1307                 desc = ptlrpc_prep_bulk_imp(req, page_count,
1308                                             BULK_GET_SOURCE, OST_BULK_PORTAL);
1309         else
1310                 desc = ptlrpc_prep_bulk_imp(req, page_count,
1311                                             BULK_PUT_SINK, OST_BULK_PORTAL);
1312
1313         if (desc == NULL)
1314                 GOTO(out, rc = -ENOMEM);
1315         /* NB request now owns desc and will free it when it gets freed */
1316
1317         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1318         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1319         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1320         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1321
1322         lustre_set_wire_obdo(&body->oa, oa);
1323
1324         obdo_to_ioobj(oa, ioobj);
1325         ioobj->ioo_bufcnt = niocount;
1326         osc_pack_capa(req, body, ocapa);
1327         LASSERT (page_count > 0);
1328         pg_prev = pga[0];
1329         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1330                 struct brw_page *pg = pga[i];
1331
1332                 LASSERT(pg->count > 0);
1333                 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
1334                          "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
1335                          pg->off, pg->count);
1336 #ifdef __linux__
1337                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1338                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1339                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1340                          i, page_count,
1341                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1342                          pg_prev->pg, page_private(pg_prev->pg),
1343                          pg_prev->pg->index, pg_prev->off);
1344 #else
1345                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1346                          "i %d p_c %u\n", i, page_count);
1347 #endif
1348                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1349                         (pg->flag & OBD_BRW_SRVLOCK));
1350
1351                 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
1352                                       pg->count);
1353                 requested_nob += pg->count;
1354
1355                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1356                         niobuf--;
1357                         niobuf->len += pg->count;
1358                 } else {
1359                         niobuf->offset = pg->off;
1360                         niobuf->len    = pg->count;
1361                         niobuf->flags  = pg->flag;
1362                 }
1363                 pg_prev = pg;
1364         }
1365
1366         LASSERTF((void *)(niobuf - niocount) ==
1367                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1368                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1369                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1370
1371         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1372         if (osc_should_shrink_grant(cli))
1373                 osc_shrink_grant_local(cli, &body->oa);
1374
1375         /* size[REQ_REC_OFF] still sizeof (*body) */
1376         if (opc == OST_WRITE) {
1377                 if (unlikely(cli->cl_checksum) &&
1378                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1379                         /* store cl_cksum_type in a local variable since
1380                          * it can be changed via lprocfs */
1381                         cksum_type_t cksum_type = cli->cl_cksum_type;
1382
1383                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1384                                 oa->o_flags &= OBD_FL_LOCAL_MASK;
1385                                 body->oa.o_flags = 0;
1386                         }
1387                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1388                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1389                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1390                                                              page_count, pga,
1391                                                              OST_WRITE,
1392                                                              cksum_type);
1393                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1394                                body->oa.o_cksum);
1395                         /* save this in 'oa', too, for later checking */
1396                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1397                         oa->o_flags |= cksum_type_pack(cksum_type);
1398                 } else {
1399                         /* clear out the checksum flag, in case this is a
1400                          * resend but cl_checksum is no longer set. b=11238 */
1401                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1402                 }
1403                 oa->o_cksum = body->oa.o_cksum;
1404                 /* 1 RC per niobuf */
1405                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1406                                      sizeof(__u32) * niocount);
1407         } else {
1408                 if (unlikely(cli->cl_checksum) &&
1409                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1410                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1411                                 body->oa.o_flags = 0;
1412                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1413                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1414                 }
1415         }
1416         ptlrpc_request_set_replen(req);
1417
1418         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1419         aa = ptlrpc_req_async_args(req);
1420         aa->aa_oa = oa;
1421         aa->aa_requested_nob = requested_nob;
1422         aa->aa_nio_count = niocount;
1423         aa->aa_page_count = page_count;
1424         aa->aa_resends = 0;
1425         aa->aa_ppga = pga;
1426         aa->aa_cli = cli;
1427         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1428         if (ocapa && reserve)
1429                 aa->aa_ocapa = capa_get(ocapa);
1430
1431         *reqp = req;
1432         RETURN(0);
1433
1434  out:
1435         ptlrpc_req_finished(req);
1436         RETURN(rc);
1437 }
1438
1439 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1440                                 __u32 client_cksum, __u32 server_cksum, int nob,
1441                                 obd_count page_count, struct brw_page **pga,
1442                                 cksum_type_t client_cksum_type)
1443 {
1444         __u32 new_cksum;
1445         char *msg;
1446         cksum_type_t cksum_type;
1447
1448         if (server_cksum == client_cksum) {
1449                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1450                 return 0;
1451         }
1452
1453         /* If this is mmaped file - it can be changed at any time */
1454         if (oa->o_valid & OBD_MD_FLFLAGS && oa->o_flags & OBD_FL_MMAP)
1455                 return 1;
1456
1457         if (oa->o_valid & OBD_MD_FLFLAGS)
1458                 cksum_type = cksum_type_unpack(oa->o_flags);
1459         else
1460                 cksum_type = OBD_CKSUM_CRC32;
1461
1462         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1463                                       cksum_type);
1464
1465         if (cksum_type != client_cksum_type)
1466                 msg = "the server did not use the checksum type specified in "
1467                       "the original request - likely a protocol problem";
1468         else if (new_cksum == server_cksum)
1469                 msg = "changed on the client after we checksummed it - "
1470                       "likely false positive due to mmap IO (bug 11742)";
1471         else if (new_cksum == client_cksum)
1472                 msg = "changed in transit before arrival at OST";
1473         else
1474                 msg = "changed in transit AND doesn't match the original - "
1475                       "likely false positive due to mmap IO (bug 11742)";
1476
1477         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1478                            " object "LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
1479                            msg, libcfs_nid2str(peer->nid),
1480                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1481                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1482                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1483                            oa->o_id,
1484                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_seq : (__u64)0,
1485                            pga[0]->off,
1486                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1487         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1488                "client csum now %x\n", client_cksum, client_cksum_type,
1489                server_cksum, cksum_type, new_cksum);
1490         return 1;
1491 }
1492
1493 /* Note rc enters this function as number of bytes transferred */
1494 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1495 {
1496         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1497         const lnet_process_id_t *peer =
1498                         &req->rq_import->imp_connection->c_peer;
1499         struct client_obd *cli = aa->aa_cli;
1500         struct ost_body *body;
1501         __u32 client_cksum = 0;
1502         ENTRY;
1503
1504         if (rc < 0 && rc != -EDQUOT) {
1505                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1506                 RETURN(rc);
1507         }
1508
1509         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1510         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1511         if (body == NULL) {
1512                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1513                 RETURN(-EPROTO);
1514         }
1515
1516 #ifdef HAVE_QUOTA_SUPPORT
1517         /* set/clear over quota flag for a uid/gid */
1518         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1519             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1520                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1521
1522                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1523                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1524                        body->oa.o_flags);
1525                 lquota_setdq(quota_interface, cli, qid, body->oa.o_valid,
1526                              body->oa.o_flags);
1527         }
1528 #endif
1529
1530         osc_update_grant(cli, body);
1531
1532         if (rc < 0)
1533                 RETURN(rc);
1534
1535         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1536                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1537
1538         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1539                 if (rc > 0) {
1540                         CERROR("Unexpected +ve rc %d\n", rc);
1541                         RETURN(-EPROTO);
1542                 }
1543                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1544
1545                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1546                         RETURN(-EAGAIN);
1547
1548                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1549                     check_write_checksum(&body->oa, peer, client_cksum,
1550                                          body->oa.o_cksum, aa->aa_requested_nob,
1551                                          aa->aa_page_count, aa->aa_ppga,
1552                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1553                         RETURN(-EAGAIN);
1554
1555                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1556                                      aa->aa_page_count, aa->aa_ppga);
1557                 GOTO(out, rc);
1558         }
1559
1560         /* The rest of this function executes only for OST_READs */
1561
1562         /* if unwrap_bulk failed, return -EAGAIN to retry */
1563         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1564         if (rc < 0)
1565                 GOTO(out, rc = -EAGAIN);
1566
1567         if (rc > aa->aa_requested_nob) {
1568                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1569                        aa->aa_requested_nob);
1570                 RETURN(-EPROTO);
1571         }
1572
1573         if (rc != req->rq_bulk->bd_nob_transferred) {
1574                 CERROR ("Unexpected rc %d (%d transferred)\n",
1575                         rc, req->rq_bulk->bd_nob_transferred);
1576                 return (-EPROTO);
1577         }
1578
1579         if (rc < aa->aa_requested_nob)
1580                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1581
1582         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1583                 static int cksum_counter;
1584                 __u32      server_cksum = body->oa.o_cksum;
1585                 char      *via;
1586                 char      *router;
1587                 cksum_type_t cksum_type;
1588
1589                 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1590                         cksum_type = cksum_type_unpack(body->oa.o_flags);
1591                 else
1592                         cksum_type = OBD_CKSUM_CRC32;
1593                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1594                                                  aa->aa_ppga, OST_READ,
1595                                                  cksum_type);
1596
1597                 if (peer->nid == req->rq_bulk->bd_sender) {
1598                         via = router = "";
1599                 } else {
1600                         via = " via ";
1601                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1602                 }
1603
1604                 if (server_cksum == ~0 && rc > 0) {
1605                         CERROR("Protocol error: server %s set the 'checksum' "
1606                                "bit, but didn't send a checksum.  Not fatal, "
1607                                "but please notify on http://bugzilla.lustre.org/\n",
1608                                libcfs_nid2str(peer->nid));
1609                 } else if (server_cksum != client_cksum) {
1610                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1611                                            "%s%s%s inode "DFID" object "
1612                                            LPU64"/"LPU64" extent "
1613                                            "["LPU64"-"LPU64"]\n",
1614                                            req->rq_import->imp_obd->obd_name,
1615                                            libcfs_nid2str(peer->nid),
1616                                            via, router,
1617                                            body->oa.o_valid & OBD_MD_FLFID ?
1618                                                 body->oa.o_parent_seq : (__u64)0,
1619                                            body->oa.o_valid & OBD_MD_FLFID ?
1620                                                 body->oa.o_parent_oid : 0,
1621                                            body->oa.o_valid & OBD_MD_FLFID ?
1622                                                 body->oa.o_parent_ver : 0,
1623                                            body->oa.o_id,
1624                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1625                                                 body->oa.o_seq : (__u64)0,
1626                                            aa->aa_ppga[0]->off,
1627                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1628                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1629                                                                         1);
1630                         CERROR("client %x, server %x, cksum_type %x\n",
1631                                client_cksum, server_cksum, cksum_type);
1632                         cksum_counter = 0;
1633                         aa->aa_oa->o_cksum = client_cksum;
1634                         rc = -EAGAIN;
1635                 } else {
1636                         cksum_counter++;
1637                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1638                         rc = 0;
1639                 }
1640         } else if (unlikely(client_cksum)) {
1641                 static int cksum_missed;
1642
1643                 cksum_missed++;
1644                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1645                         CERROR("Checksum %u requested from %s but not sent\n",
1646                                cksum_missed, libcfs_nid2str(peer->nid));
1647         } else {
1648                 rc = 0;
1649         }
1650 out:
1651         if (rc >= 0)
1652                 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
1653
1654         RETURN(rc);
1655 }
1656
1657 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1658                             struct lov_stripe_md *lsm,
1659                             obd_count page_count, struct brw_page **pga,
1660                             struct obd_capa *ocapa)
1661 {
1662         struct ptlrpc_request *req;
1663         int                    rc;
1664         cfs_waitq_t            waitq;
1665         int                    resends = 0;
1666         struct l_wait_info     lwi;
1667
1668         ENTRY;
1669
1670         cfs_waitq_init(&waitq);
1671
1672 restart_bulk:
1673         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1674                                   page_count, pga, &req, ocapa, 0);
1675         if (rc != 0)
1676                 return (rc);
1677
1678         rc = ptlrpc_queue_wait(req);
1679
1680         if (rc == -ETIMEDOUT && req->rq_resend) {
1681                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1682                 ptlrpc_req_finished(req);
1683                 goto restart_bulk;
1684         }
1685
1686         rc = osc_brw_fini_request(req, rc);
1687
1688         ptlrpc_req_finished(req);
1689         if (osc_recoverable_error(rc)) {
1690                 resends++;
1691                 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1692                         CERROR("too many resend retries, returning error\n");
1693                         RETURN(-EIO);
1694                 }
1695
1696                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1697                 l_wait_event(waitq, 0, &lwi);
1698
1699                 goto restart_bulk;
1700         }
1701
1702         RETURN (rc);
1703 }
1704
1705 int osc_brw_redo_request(struct ptlrpc_request *request,
1706                          struct osc_brw_async_args *aa)
1707 {
1708         struct ptlrpc_request *new_req;
1709         struct ptlrpc_request_set *set = request->rq_set;
1710         struct osc_brw_async_args *new_aa;
1711         struct osc_async_page *oap;
1712         int rc = 0;
1713         ENTRY;
1714
1715         if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1716                 CERROR("too many resent retries, returning error\n");
1717                 RETURN(-EIO);
1718         }
1719
1720         DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1721
1722         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1723                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1724                                   aa->aa_cli, aa->aa_oa,
1725                                   NULL /* lsm unused by osc currently */,
1726                                   aa->aa_page_count, aa->aa_ppga,
1727                                   &new_req, aa->aa_ocapa, 0);
1728         if (rc)
1729                 RETURN(rc);
1730
1731         client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1732
1733         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1734                 if (oap->oap_request != NULL) {
1735                         LASSERTF(request == oap->oap_request,
1736                                  "request %p != oap_request %p\n",
1737                                  request, oap->oap_request);
1738                         if (oap->oap_interrupted) {
1739                                 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1740                                 ptlrpc_req_finished(new_req);
1741                                 RETURN(-EINTR);
1742                         }
1743                 }
1744         }
1745         /* New request takes over pga and oaps from old request.
1746          * Note that copying a list_head doesn't work, need to move it... */
1747         aa->aa_resends++;
1748         new_req->rq_interpret_reply = request->rq_interpret_reply;
1749         new_req->rq_async_args = request->rq_async_args;
1750         new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1751
1752         new_aa = ptlrpc_req_async_args(new_req);
1753
1754         CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1755         cfs_list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1756         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1757
1758         cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1759                 if (oap->oap_request) {
1760                         ptlrpc_req_finished(oap->oap_request);
1761                         oap->oap_request = ptlrpc_request_addref(new_req);
1762                 }
1763         }
1764
1765         new_aa->aa_ocapa = aa->aa_ocapa;
1766         aa->aa_ocapa = NULL;
1767
1768         /* use ptlrpc_set_add_req is safe because interpret functions work
1769          * in check_set context. only one way exist with access to request
1770          * from different thread got -EINTR - this way protected with
1771          * cl_loi_list_lock */
1772         ptlrpc_set_add_req(set, new_req);
1773
1774         client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1775
1776         DEBUG_REQ(D_INFO, new_req, "new request");
1777         RETURN(0);
1778 }
1779
1780 /*
1781  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1782  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1783  * fine for our small page arrays and doesn't require allocation.  its an
1784  * insertion sort that swaps elements that are strides apart, shrinking the
1785  * stride down until its '1' and the array is sorted.
1786  */
1787 static void sort_brw_pages(struct brw_page **array, int num)
1788 {
1789         int stride, i, j;
1790         struct brw_page *tmp;
1791
1792         if (num == 1)
1793                 return;
1794         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1795                 ;
1796
1797         do {
1798                 stride /= 3;
1799                 for (i = stride ; i < num ; i++) {
1800                         tmp = array[i];
1801                         j = i;
1802                         while (j >= stride && array[j - stride]->off > tmp->off) {
1803                                 array[j] = array[j - stride];
1804                                 j -= stride;
1805                         }
1806                         array[j] = tmp;
1807                 }
1808         } while (stride > 1);
1809 }
1810
1811 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1812 {
1813         int count = 1;
1814         int offset;
1815         int i = 0;
1816
1817         LASSERT (pages > 0);
1818         offset = pg[i]->off & ~CFS_PAGE_MASK;
1819
1820         for (;;) {
1821                 pages--;
1822                 if (pages == 0)         /* that's all */
1823                         return count;
1824
1825                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1826                         return count;   /* doesn't end on page boundary */
1827
1828                 i++;
1829                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1830                 if (offset != 0)        /* doesn't start on page boundary */
1831                         return count;
1832
1833                 count++;
1834         }
1835 }
1836
1837 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1838 {
1839         struct brw_page **ppga;
1840         int i;
1841
1842         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1843         if (ppga == NULL)
1844                 return NULL;
1845
1846         for (i = 0; i < count; i++)
1847                 ppga[i] = pga + i;
1848         return ppga;
1849 }
1850
1851 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1852 {
1853         LASSERT(ppga != NULL);
1854         OBD_FREE(ppga, sizeof(*ppga) * count);
1855 }
1856
1857 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1858                    obd_count page_count, struct brw_page *pga,
1859                    struct obd_trans_info *oti)
1860 {
1861         struct obdo *saved_oa = NULL;
1862         struct brw_page **ppga, **orig;
1863         struct obd_import *imp = class_exp2cliimp(exp);
1864         struct client_obd *cli;
1865         int rc, page_count_orig;
1866         ENTRY;
1867
1868         LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1869         cli = &imp->imp_obd->u.cli;
1870
1871         if (cmd & OBD_BRW_CHECK) {
1872                 /* The caller just wants to know if there's a chance that this
1873                  * I/O can succeed */
1874
1875                 if (imp->imp_invalid)
1876                         RETURN(-EIO);
1877                 RETURN(0);
1878         }
1879
1880         /* test_brw with a failed create can trip this, maybe others. */
1881         LASSERT(cli->cl_max_pages_per_rpc);
1882
1883         rc = 0;
1884
1885         orig = ppga = osc_build_ppga(pga, page_count);
1886         if (ppga == NULL)
1887                 RETURN(-ENOMEM);
1888         page_count_orig = page_count;
1889
1890         sort_brw_pages(ppga, page_count);
1891         while (page_count) {
1892                 obd_count pages_per_brw;
1893
1894                 if (page_count > cli->cl_max_pages_per_rpc)
1895                         pages_per_brw = cli->cl_max_pages_per_rpc;
1896                 else
1897                         pages_per_brw = page_count;
1898
1899                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1900
1901                 if (saved_oa != NULL) {
1902                         /* restore previously saved oa */
1903                         *oinfo->oi_oa = *saved_oa;
1904                 } else if (page_count > pages_per_brw) {
1905                         /* save a copy of oa (brw will clobber it) */
1906                         OBDO_ALLOC(saved_oa);
1907                         if (saved_oa == NULL)
1908                                 GOTO(out, rc = -ENOMEM);
1909                         *saved_oa = *oinfo->oi_oa;
1910                 }
1911
1912                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1913                                       pages_per_brw, ppga, oinfo->oi_capa);
1914
1915                 if (rc != 0)
1916                         break;
1917
1918                 page_count -= pages_per_brw;
1919                 ppga += pages_per_brw;
1920         }
1921
1922 out:
1923         osc_release_ppga(orig, page_count_orig);
1924
1925         if (saved_oa != NULL)
1926                 OBDO_FREE(saved_oa);
1927
1928         RETURN(rc);
1929 }
1930
1931 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1932  * the dirty accounting.  Writeback completes or truncate happens before
1933  * writing starts.  Must be called with the loi lock held. */
1934 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1935                            int sent)
1936 {
1937         osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1938 }
1939
1940 static int lop_makes_syncfs_rpc(struct loi_oap_pages *lop)
1941 {
1942         struct osc_async_page *oap;
1943         ENTRY;
1944
1945         if (cfs_list_empty(&lop->lop_urgent))
1946                 RETURN(0);
1947
1948         oap = cfs_list_entry(lop->lop_urgent.next,
1949                              struct osc_async_page, oap_urgent_item);
1950
1951         if (oap->oap_async_flags & ASYNC_SYNCFS) {
1952                 CDEBUG(D_CACHE, "syncfs request forcing RPC\n");
1953                 RETURN(1);
1954         }
1955
1956         RETURN(0);
1957 }
1958
1959 /* This maintains the lists of pending pages to read/write for a given object
1960  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1961  * to quickly find objects that are ready to send an RPC. */
1962 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1963                          int cmd)
1964 {
1965         int optimal;
1966         ENTRY;
1967
1968         if (lop->lop_num_pending == 0)
1969                 RETURN(0);
1970
1971         /* if we have an invalid import we want to drain the queued pages
1972          * by forcing them through rpcs that immediately fail and complete
1973          * the pages.  recovery relies on this to empty the queued pages
1974          * before canceling the locks and evicting down the llite pages */
1975         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1976                 RETURN(1);
1977
1978         /* stream rpcs in queue order as long as as there is an urgent page
1979          * queued.  this is our cheap solution for good batching in the case
1980          * where writepage marks some random page in the middle of the file
1981          * as urgent because of, say, memory pressure */
1982         if (!cfs_list_empty(&lop->lop_urgent)) {
1983                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1984                 RETURN(1);
1985         }
1986         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1987         optimal = cli->cl_max_pages_per_rpc;
1988         if (cmd & OBD_BRW_WRITE) {
1989                 /* trigger a write rpc stream as long as there are dirtiers
1990                  * waiting for space.  as they're waiting, they're not going to
1991                  * create more pages to coalesce with what's waiting.. */
1992                 if (!cfs_list_empty(&cli->cl_cache_waiters)) {
1993                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1994                         RETURN(1);
1995                 }
1996                 /* +16 to avoid triggering rpcs that would want to include pages
1997                  * that are being queued but which can't be made ready until
1998                  * the queuer finishes with the page. this is a wart for
1999                  * llite::commit_write() */
2000                 optimal += 16;
2001         }
2002         if (lop->lop_num_pending >= optimal)
2003                 RETURN(1);
2004
2005         RETURN(0);
2006 }
2007
2008 static int lop_makes_hprpc(struct loi_oap_pages *lop)
2009 {
2010         struct osc_async_page *oap;
2011         ENTRY;
2012
2013         if (cfs_list_empty(&lop->lop_urgent))
2014                 RETURN(0);
2015
2016         oap = cfs_list_entry(lop->lop_urgent.next,
2017                          struct osc_async_page, oap_urgent_item);
2018
2019         if (oap->oap_async_flags & ASYNC_HP) {
2020                 CDEBUG(D_CACHE, "hp request forcing RPC\n");
2021                 RETURN(1);
2022         }
2023
2024         RETURN(0);
2025 }
2026
2027 static void on_list(cfs_list_t *item, cfs_list_t *list,
2028                     int should_be_on)
2029 {
2030         if (cfs_list_empty(item) && should_be_on)
2031                 cfs_list_add_tail(item, list);
2032         else if (!cfs_list_empty(item) && !should_be_on)
2033                 cfs_list_del_init(item);
2034 }
2035
2036 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
2037  * can find pages to build into rpcs quickly */
2038 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
2039 {
2040         if (lop_makes_hprpc(&loi->loi_write_lop) ||
2041             lop_makes_hprpc(&loi->loi_read_lop)) {
2042                 /* HP rpc */
2043                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
2044                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
2045         } else {
2046                 if (lop_makes_syncfs_rpc(&loi->loi_write_lop)) {
2047                         on_list(&loi->loi_sync_fs_item,
2048                                 &cli->cl_loi_sync_fs_list,
2049                                 loi->loi_write_lop.lop_num_pending);
2050                 } else {
2051                         on_list(&loi->loi_hp_ready_item,
2052                                 &cli->cl_loi_hp_ready_list, 0);
2053                         on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
2054                                 lop_makes_rpc(cli, &loi->loi_write_lop,
2055                                               OBD_BRW_WRITE)||
2056                                 lop_makes_rpc(cli, &loi->loi_read_lop,
2057                                               OBD_BRW_READ));
2058                 }
2059         }
2060
2061         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
2062                 loi->loi_write_lop.lop_num_pending);
2063
2064         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
2065                 loi->loi_read_lop.lop_num_pending);
2066 }
2067
2068 static void lop_update_pending(struct client_obd *cli,
2069                                struct loi_oap_pages *lop, int cmd, int delta)
2070 {
2071         lop->lop_num_pending += delta;
2072         if (cmd & OBD_BRW_WRITE)
2073                 cli->cl_pending_w_pages += delta;
2074         else
2075                 cli->cl_pending_r_pages += delta;
2076 }
2077
2078 /**
2079  * this is called when a sync waiter receives an interruption.  Its job is to
2080  * get the caller woken as soon as possible.  If its page hasn't been put in an
2081  * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
2082  * desiring interruption which will forcefully complete the rpc once the rpc
2083  * has timed out.
2084  */
2085 int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
2086 {
2087         struct loi_oap_pages *lop;
2088         struct lov_oinfo *loi;
2089         int rc = -EBUSY;
2090         ENTRY;
2091
2092         LASSERT(!oap->oap_interrupted);
2093         oap->oap_interrupted = 1;
2094
2095         /* ok, it's been put in an rpc. only one oap gets a request reference */
2096         if (oap->oap_request != NULL) {
2097                 ptlrpc_mark_interrupted(oap->oap_request);
2098                 ptlrpcd_wake(oap->oap_request);
2099                 ptlrpc_req_finished(oap->oap_request);
2100                 oap->oap_request = NULL;
2101         }
2102
2103         /*
2104          * page completion may be called only if ->cpo_prep() method was
2105          * executed by osc_io_submit(), that also adds page the to pending list
2106          */
2107         if (!cfs_list_empty(&oap->oap_pending_item)) {
2108                 cfs_list_del_init(&oap->oap_pending_item);
2109                 cfs_list_del_init(&oap->oap_urgent_item);
2110
2111                 loi = oap->oap_loi;
2112                 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
2113                         &loi->loi_write_lop : &loi->loi_read_lop;
2114                 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
2115                 loi_list_maint(oap->oap_cli, oap->oap_loi);
2116                 rc = oap->oap_caller_ops->ap_completion(env,
2117                                           oap->oap_caller_data,
2118                                           oap->oap_cmd, NULL, -EINTR);
2119         }
2120
2121         RETURN(rc);
2122 }
2123
2124 /* this is trying to propogate async writeback errors back up to the
2125  * application.  As an async write fails we record the error code for later if
2126  * the app does an fsync.  As long as errors persist we force future rpcs to be
2127  * sync so that the app can get a sync error and break the cycle of queueing
2128  * pages for which writeback will fail. */
2129 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
2130                            int rc)
2131 {
2132         if (rc) {
2133                 if (!ar->ar_rc)
2134                         ar->ar_rc = rc;
2135
2136                 ar->ar_force_sync = 1;
2137                 ar->ar_min_xid = ptlrpc_sample_next_xid();
2138                 return;
2139
2140         }
2141
2142         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2143                 ar->ar_force_sync = 0;
2144 }
2145
2146 void osc_oap_to_pending(struct osc_async_page *oap)
2147 {
2148         struct loi_oap_pages *lop;
2149
2150         if (oap->oap_cmd & OBD_BRW_WRITE)
2151                 lop = &oap->oap_loi->loi_write_lop;
2152         else
2153                 lop = &oap->oap_loi->loi_read_lop;
2154
2155         if (oap->oap_async_flags & ASYNC_HP)
2156                 cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2157         else if (oap->oap_async_flags & ASYNC_URGENT)
2158                 cfs_list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2159         cfs_list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2160         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2161 }
2162
2163 /* this must be called holding the loi list lock to give coverage to exit_cache,
2164  * async_flag maintenance, and oap_request */
2165 static void osc_ap_completion(const struct lu_env *env,
2166                               struct client_obd *cli, struct obdo *oa,
2167                               struct osc_async_page *oap, int sent, int rc)
2168 {
2169         __u64 xid = 0;
2170
2171         ENTRY;
2172         if (oap->oap_request != NULL) {
2173                 xid = ptlrpc_req_xid(oap->oap_request);
2174                 ptlrpc_req_finished(oap->oap_request);
2175                 oap->oap_request = NULL;
2176         }
2177
2178         cfs_spin_lock(&oap->oap_lock);
2179         oap->oap_async_flags = 0;
2180         cfs_spin_unlock(&oap->oap_lock);
2181         oap->oap_interrupted = 0;
2182
2183         if (oap->oap_cmd & OBD_BRW_WRITE) {
2184                 osc_process_ar(&cli->cl_ar, xid, rc);
2185                 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
2186         }
2187
2188         if (rc == 0 && oa != NULL) {
2189                 if (oa->o_valid & OBD_MD_FLBLOCKS)
2190                         oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2191                 if (oa->o_valid & OBD_MD_FLMTIME)
2192                         oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2193                 if (oa->o_valid & OBD_MD_FLATIME)
2194                         oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2195                 if (oa->o_valid & OBD_MD_FLCTIME)
2196                         oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2197         }
2198
2199         rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
2200                                                 oap->oap_cmd, oa, rc);
2201
2202         /* ll_ap_completion (from llite) drops PG_locked. so, a new
2203          * I/O on the page could start, but OSC calls it under lock
2204          * and thus we can add oap back to pending safely */
2205         if (rc)
2206                 /* upper layer wants to leave the page on pending queue */
2207                 osc_oap_to_pending(oap);
2208         else
2209                 osc_exit_cache(cli, oap, sent);
2210         EXIT;
2211 }
2212
2213 static int brw_interpret(const struct lu_env *env,
2214                          struct ptlrpc_request *req, void *data, int rc)
2215 {
2216         struct osc_brw_async_args *aa = data;
2217         struct client_obd *cli;
2218         int async;
2219         ENTRY;
2220
2221         rc = osc_brw_fini_request(req, rc);
2222         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2223         if (osc_recoverable_error(rc)) {
2224                 /* Only retry once for mmaped files since the mmaped page
2225                  * might be modified at anytime. We have to retry at least
2226                  * once in case there WAS really a corruption of the page
2227                  * on the network, that was not caused by mmap() modifying
2228                  * the page. Bug11742 */
2229                 if ((rc == -EAGAIN) && (aa->aa_resends > 0) &&
2230                     aa->aa_oa->o_valid & OBD_MD_FLFLAGS &&
2231                     aa->aa_oa->o_flags & OBD_FL_MMAP) {
2232                         rc = 0;
2233                 } else {
2234                         rc = osc_brw_redo_request(req, aa);
2235                         if (rc == 0)
2236                                 RETURN(0);
2237                 }
2238         }
2239
2240         if (aa->aa_ocapa) {
2241                 capa_put(aa->aa_ocapa);
2242                 aa->aa_ocapa = NULL;
2243         }
2244
2245         cli = aa->aa_cli;
2246
2247         client_obd_list_lock(&cli->cl_loi_list_lock);
2248
2249         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2250          * is called so we know whether to go to sync BRWs or wait for more
2251          * RPCs to complete */
2252         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2253                 cli->cl_w_in_flight--;
2254         else
2255                 cli->cl_r_in_flight--;
2256
2257         async = cfs_list_empty(&aa->aa_oaps);
2258         if (!async) { /* from osc_send_oap_rpc() */
2259                 struct osc_async_page *oap, *tmp;
2260                 /* the caller may re-use the oap after the completion call so
2261                  * we need to clean it up a little */
2262                 cfs_list_for_each_entry_safe(oap, tmp, &aa->aa_oaps,
2263                                              oap_rpc_item) {
2264                         cfs_list_del_init(&oap->oap_rpc_item);
2265                         osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
2266                 }
2267                 OBDO_FREE(aa->aa_oa);
2268         } else { /* from async_internal() */
2269                 obd_count i;
2270                 for (i = 0; i < aa->aa_page_count; i++)
2271                         osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
2272         }
2273         osc_wake_cache_waiters(cli);
2274         osc_wake_sync_fs(cli);
2275         osc_check_rpcs(env, cli);
2276         client_obd_list_unlock(&cli->cl_loi_list_lock);
2277         if (!async)
2278                 cl_req_completion(env, aa->aa_clerq, rc);
2279         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2280
2281         RETURN(rc);
2282 }
2283
2284 static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
2285                                             struct client_obd *cli,
2286                                             cfs_list_t *rpc_list,
2287                                             int page_count, int cmd)
2288 {
2289         struct ptlrpc_request *req;
2290         struct brw_page **pga = NULL;
2291         struct osc_brw_async_args *aa;
2292         struct obdo *oa = NULL;
2293         const struct obd_async_page_ops *ops = NULL;
2294         void *caller_data = NULL;
2295         struct osc_async_page *oap;
2296         struct osc_async_page *tmp;
2297         struct ost_body *body;
2298         struct cl_req *clerq = NULL;
2299         enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2300         struct ldlm_lock *lock = NULL;
2301         struct cl_req_attr crattr;
2302         int i, rc, mpflag = 0;
2303
2304         ENTRY;
2305         LASSERT(!cfs_list_empty(rpc_list));
2306
2307         if (cmd & OBD_BRW_MEMALLOC)
2308                 mpflag = cfs_memory_pressure_get_and_set();
2309
2310         memset(&crattr, 0, sizeof crattr);
2311         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2312         if (pga == NULL)
2313                 GOTO(out, req = ERR_PTR(-ENOMEM));
2314
2315         OBDO_ALLOC(oa);
2316         if (oa == NULL)
2317                 GOTO(out, req = ERR_PTR(-ENOMEM));
2318
2319         i = 0;
2320         cfs_list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2321                 struct cl_page *page = osc_oap2cl_page(oap);
2322                 if (ops == NULL) {
2323                         ops = oap->oap_caller_ops;
2324                         caller_data = oap->oap_caller_data;
2325
2326                         clerq = cl_req_alloc(env, page, crt,
2327                                              1 /* only 1-object rpcs for
2328                                                 * now */);
2329                         if (IS_ERR(clerq))
2330                                 GOTO(out, req = (void *)clerq);
2331                         lock = oap->oap_ldlm_lock;
2332                 }
2333                 pga[i] = &oap->oap_brw_page;
2334                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2335                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2336                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2337                 i++;
2338                 cl_req_page_add(env, clerq, page);
2339         }
2340
2341         /* always get the data for the obdo for the rpc */
2342         LASSERT(ops != NULL);
2343         crattr.cra_oa = oa;
2344         crattr.cra_capa = NULL;
2345         cl_req_attr_set(env, clerq, &crattr, ~0ULL);
2346         if (lock) {
2347                 oa->o_handle = lock->l_remote_handle;
2348                 oa->o_valid |= OBD_MD_FLHANDLE;
2349         }
2350
2351         rc = cl_req_prep(env, clerq);
2352         if (rc != 0) {
2353                 CERROR("cl_req_prep failed: %d\n", rc);
2354                 GOTO(out, req = ERR_PTR(rc));
2355         }
2356
2357         sort_brw_pages(pga, page_count);
2358         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2359                                   pga, &req, crattr.cra_capa, 1);
2360         if (rc != 0) {
2361                 CERROR("prep_req failed: %d\n", rc);
2362                 GOTO(out, req = ERR_PTR(rc));
2363         }
2364
2365         if (cmd & OBD_BRW_MEMALLOC)
2366                 req->rq_memalloc = 1;
2367
2368         /* Need to update the timestamps after the request is built in case
2369          * we race with setattr (locally or in queue at OST).  If OST gets
2370          * later setattr before earlier BRW (as determined by the request xid),
2371          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2372          * way to do this in a single call.  bug 10150 */
2373         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2374         cl_req_attr_set(env, clerq, &crattr,
2375                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2376
2377         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2378         aa = ptlrpc_req_async_args(req);
2379         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2380         cfs_list_splice(rpc_list, &aa->aa_oaps);
2381         CFS_INIT_LIST_HEAD(rpc_list);
2382         aa->aa_clerq = clerq;
2383 out:
2384         if (cmd & OBD_BRW_MEMALLOC)
2385                 cfs_memory_pressure_restore(mpflag);
2386
2387         capa_put(crattr.cra_capa);
2388         if (IS_ERR(req)) {
2389                 if (oa)
2390                         OBDO_FREE(oa);
2391                 if (pga)
2392                         OBD_FREE(pga, sizeof(*pga) * page_count);
2393                 /* this should happen rarely and is pretty bad, it makes the
2394                  * pending list not follow the dirty order */
2395                 client_obd_list_lock(&cli->cl_loi_list_lock);
2396                 cfs_list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
2397                         cfs_list_del_init(&oap->oap_rpc_item);
2398
2399                         /* queued sync pages can be torn down while the pages
2400                          * were between the pending list and the rpc */
2401                         if (oap->oap_interrupted) {
2402                                 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2403                                 osc_ap_completion(env, cli, NULL, oap, 0,
2404                                                   oap->oap_count);
2405                                 continue;
2406                         }
2407                         osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
2408                 }
2409                 if (clerq && !IS_ERR(clerq))
2410                         cl_req_completion(env, clerq, PTR_ERR(req));
2411         }
2412         RETURN(req);
2413 }
2414
2415 /**
2416  * prepare pages for ASYNC io and put pages in send queue.
2417  *
2418  * \param cmd OBD_BRW_* macroses
2419  * \param lop pending pages
2420  *
2421  * \return zero if no page added to send queue.
2422  * \return 1 if pages successfully added to send queue.
2423  * \return negative on errors.
2424  */
2425 static int
2426 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
2427                  struct lov_oinfo *loi,
2428                  int cmd, struct loi_oap_pages *lop)
2429 {
2430         struct ptlrpc_request *req;
2431         obd_count page_count = 0;
2432         struct osc_async_page *oap = NULL, *tmp;
2433         struct osc_brw_async_args *aa;
2434         const struct obd_async_page_ops *ops;
2435         CFS_LIST_HEAD(rpc_list);
2436         CFS_LIST_HEAD(tmp_list);
2437         unsigned int ending_offset;
2438         unsigned  starting_offset = 0;
2439         int srvlock = 0, mem_tight = 0;
2440         struct cl_object *clob = NULL;
2441         ENTRY;
2442
2443         /* ASYNC_HP pages first. At present, when the lock the pages is
2444          * to be canceled, the pages covered by the lock will be sent out
2445          * with ASYNC_HP. We have to send out them as soon as possible. */
2446         cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_urgent, oap_urgent_item) {
2447                 if (oap->oap_async_flags & ASYNC_HP)
2448                         cfs_list_move(&oap->oap_pending_item, &tmp_list);
2449                 else
2450                         cfs_list_move_tail(&oap->oap_pending_item, &tmp_list);
2451                 if (++page_count >= cli->cl_max_pages_per_rpc)
2452                         break;
2453         }
2454
2455         cfs_list_splice(&tmp_list, &lop->lop_pending);
2456         page_count = 0;
2457
2458         /* first we find the pages we're allowed to work with */
2459         cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2460                                      oap_pending_item) {
2461                 ops = oap->oap_caller_ops;
2462
2463                 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
2464                          "magic 0x%x\n", oap, oap->oap_magic);
2465
2466                 if (clob == NULL) {
2467                         /* pin object in memory, so that completion call-backs
2468                          * can be safely called under client_obd_list lock. */
2469                         clob = osc_oap2cl_page(oap)->cp_obj;
2470                         cl_object_get(clob);
2471                 }
2472
2473                 if (page_count != 0 &&
2474                     srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2475                         CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2476                                " oap %p, page %p, srvlock %u\n",
2477                                oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2478                         break;
2479                 }
2480
2481                 /* If there is a gap at the start of this page, it can't merge
2482                  * with any previous page, so we'll hand the network a
2483                  * "fragmented" page array that it can't transfer in 1 RDMA */
2484                 if (page_count != 0 && oap->oap_page_off != 0)
2485                         break;
2486
2487                 /* in llite being 'ready' equates to the page being locked
2488                  * until completion unlocks it.  commit_write submits a page
2489                  * as not ready because its unlock will happen unconditionally
2490                  * as the call returns.  if we race with commit_write giving
2491                  * us that page we don't want to create a hole in the page
2492                  * stream, so we stop and leave the rpc to be fired by
2493                  * another dirtier or kupdated interval (the not ready page
2494                  * will still be on the dirty list).  we could call in
2495                  * at the end of ll_file_write to process the queue again. */
2496                 if (!(oap->oap_async_flags & ASYNC_READY)) {
2497                         int rc = ops->ap_make_ready(env, oap->oap_caller_data,
2498                                                     cmd);
2499                         if (rc < 0)
2500                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
2501                                                 "instead of ready\n", oap,
2502                                                 oap->oap_page, rc);
2503                         switch (rc) {
2504                         case -EAGAIN:
2505                                 /* llite is telling us that the page is still
2506                                  * in commit_write and that we should try
2507                                  * and put it in an rpc again later.  we
2508                                  * break out of the loop so we don't create
2509                                  * a hole in the sequence of pages in the rpc
2510                                  * stream.*/
2511                                 oap = NULL;
2512                                 break;
2513                         case -EINTR:
2514                                 /* the io isn't needed.. tell the checks
2515                                  * below to complete the rpc with EINTR */
2516                                 cfs_spin_lock(&oap->oap_lock);
2517                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2518                                 cfs_spin_unlock(&oap->oap_lock);
2519                                 oap->oap_count = -EINTR;
2520                                 break;
2521                         case 0:
2522                                 cfs_spin_lock(&oap->oap_lock);
2523                                 oap->oap_async_flags |= ASYNC_READY;
2524                                 cfs_spin_unlock(&oap->oap_lock);
2525                                 break;
2526                         default:
2527                                 LASSERTF(0, "oap %p page %p returned %d "
2528                                             "from make_ready\n", oap,
2529                                             oap->oap_page, rc);
2530                                 break;
2531                         }
2532                 }
2533                 if (oap == NULL)
2534                         break;
2535                 /*
2536                  * Page submitted for IO has to be locked. Either by
2537                  * ->ap_make_ready() or by higher layers.
2538                  */
2539 #if defined(__KERNEL__) && defined(__linux__)
2540                 {
2541                         struct cl_page *page;
2542
2543                         page = osc_oap2cl_page(oap);
2544
2545                         if (page->cp_type == CPT_CACHEABLE &&
2546                             !(PageLocked(oap->oap_page) &&
2547                               (CheckWriteback(oap->oap_page, cmd)))) {
2548                                 CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2549                                        oap->oap_page,
2550                                        (long)oap->oap_page->flags,
2551                                        oap->oap_async_flags);
2552                                 LBUG();
2553                         }
2554                 }
2555 #endif
2556
2557                 /* take the page out of our book-keeping */
2558                 cfs_list_del_init(&oap->oap_pending_item);
2559                 lop_update_pending(cli, lop, cmd, -1);
2560                 cfs_list_del_init(&oap->oap_urgent_item);
2561
2562                 if (page_count == 0)
2563                         starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2564                                           (PTLRPC_MAX_BRW_SIZE - 1);
2565
2566                 /* ask the caller for the size of the io as the rpc leaves. */
2567                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
2568                         oap->oap_count =
2569                                 ops->ap_refresh_count(env, oap->oap_caller_data,
2570                                                       cmd);
2571                         LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
2572                 }
2573                 if (oap->oap_count <= 0) {
2574                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2575                                oap->oap_count);
2576                         osc_ap_completion(env, cli, NULL,
2577                                           oap, 0, oap->oap_count);
2578                         continue;
2579                 }
2580
2581                 /* now put the page back in our accounting */
2582                 cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
2583                 if (oap->oap_brw_flags & OBD_BRW_MEMALLOC)
2584                         mem_tight = 1;
2585                 if (page_count == 0)
2586                         srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2587                 if (++page_count >= cli->cl_max_pages_per_rpc)
2588                         break;
2589
2590                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
2591                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2592                  * have the same alignment as the initial writes that allocated
2593                  * extents on the server. */
2594                 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2595                                  oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2596                 if (ending_offset == 0)
2597                         break;
2598
2599                 /* If there is a gap at the end of this page, it can't merge
2600                  * with any subsequent pages, so we'll hand the network a
2601                  * "fragmented" page array that it can't transfer in 1 RDMA */
2602                 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2603                         break;
2604         }
2605
2606         osc_wake_cache_waiters(cli);
2607         osc_wake_sync_fs(cli);
2608         loi_list_maint(cli, loi);
2609
2610         client_obd_list_unlock(&cli->cl_loi_list_lock);
2611
2612         if (clob != NULL)
2613                 cl_object_put(env, clob);
2614
2615         if (page_count == 0) {
2616                 client_obd_list_lock(&cli->cl_loi_list_lock);
2617                 RETURN(0);
2618         }
2619
2620         req = osc_build_req(env, cli, &rpc_list, page_count,
2621                             mem_tight ? (cmd | OBD_BRW_MEMALLOC) : cmd);
2622         if (IS_ERR(req)) {
2623                 LASSERT(cfs_list_empty(&rpc_list));
2624                 loi_list_maint(cli, loi);
2625                 RETURN(PTR_ERR(req));
2626         }
2627
2628         aa = ptlrpc_req_async_args(req);
2629
2630         if (cmd == OBD_BRW_READ) {
2631                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2632                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2633                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2634                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2635         } else {
2636                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2637                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2638                                  cli->cl_w_in_flight);
2639                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2640                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2641         }
2642         ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2643
2644         client_obd_list_lock(&cli->cl_loi_list_lock);
2645
2646         if (cmd == OBD_BRW_READ)
2647                 cli->cl_r_in_flight++;
2648         else
2649                 cli->cl_w_in_flight++;
2650
2651         /* queued sync pages can be torn down while the pages
2652          * were between the pending list and the rpc */
2653         tmp = NULL;
2654         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2655                 /* only one oap gets a request reference */
2656                 if (tmp == NULL)
2657                         tmp = oap;
2658                 if (oap->oap_interrupted && !req->rq_intr) {
2659                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2660                                oap, req);
2661                         ptlrpc_mark_interrupted(req);
2662                 }
2663         }
2664         if (tmp != NULL)
2665                 tmp->oap_request = ptlrpc_request_addref(req);
2666
2667         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2668                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2669
2670         req->rq_interpret_reply = brw_interpret;
2671         ptlrpcd_add_req(req, PSCOPE_BRW);
2672         RETURN(1);
2673 }
2674
2675 #define LOI_DEBUG(LOI, STR, args...)                                     \
2676         CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
2677                !cfs_list_empty(&(LOI)->loi_ready_item) ||                \
2678                !cfs_list_empty(&(LOI)->loi_hp_ready_item),               \
2679                (LOI)->loi_write_lop.lop_num_pending,                     \
2680                !cfs_list_empty(&(LOI)->loi_write_lop.lop_urgent),        \
2681                (LOI)->loi_read_lop.lop_num_pending,                      \
2682                !cfs_list_empty(&(LOI)->loi_read_lop.lop_urgent),         \
2683                args)                                                     \
2684
2685 /* This is called by osc_check_rpcs() to find which objects have pages that
2686  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2687 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2688 {
2689         ENTRY;
2690
2691         /* First return objects that have blocked locks so that they
2692          * will be flushed quickly and other clients can get the lock,
2693          * then objects which have pages ready to be stuffed into RPCs */
2694         if (!cfs_list_empty(&cli->cl_loi_hp_ready_list))
2695                 RETURN(cfs_list_entry(cli->cl_loi_hp_ready_list.next,
2696                                       struct lov_oinfo, loi_hp_ready_item));
2697         if (!cfs_list_empty(&cli->cl_loi_ready_list))
2698                 RETURN(cfs_list_entry(cli->cl_loi_ready_list.next,
2699                                       struct lov_oinfo, loi_ready_item));
2700         if (!cfs_list_empty(&cli->cl_loi_sync_fs_list))
2701                 RETURN(cfs_list_entry(cli->cl_loi_sync_fs_list.next,
2702                                       struct lov_oinfo, loi_sync_fs_item));
2703
2704         /* then if we have cache waiters, return all objects with queued
2705          * writes.  This is especially important when many small files
2706          * have filled up the cache and not been fired into rpcs because
2707          * they don't pass the nr_pending/object threshhold */
2708         if (!cfs_list_empty(&cli->cl_cache_waiters) &&
2709             !cfs_list_empty(&cli->cl_loi_write_list))
2710                 RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2711                                       struct lov_oinfo, loi_write_item));
2712
2713         /* then return all queued objects when we have an invalid import
2714          * so that they get flushed */
2715         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2716                 if (!cfs_list_empty(&cli->cl_loi_write_list))
2717                         RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2718                                               struct lov_oinfo,
2719                                               loi_write_item));
2720                 if (!cfs_list_empty(&cli->cl_loi_read_list))
2721                         RETURN(cfs_list_entry(cli->cl_loi_read_list.next,
2722                                               struct lov_oinfo, loi_read_item));
2723         }
2724         RETURN(NULL);
2725 }
2726
2727 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2728 {
2729         struct osc_async_page *oap;
2730         int hprpc = 0;
2731
2732         if (!cfs_list_empty(&loi->loi_write_lop.lop_urgent)) {
2733                 oap = cfs_list_entry(loi->loi_write_lop.lop_urgent.next,
2734                                      struct osc_async_page, oap_urgent_item);
2735                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2736         }
2737
2738         if (!hprpc && !cfs_list_empty(&loi->loi_read_lop.lop_urgent)) {
2739                 oap = cfs_list_entry(loi->loi_read_lop.lop_urgent.next,
2740                                      struct osc_async_page, oap_urgent_item);
2741                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2742         }
2743
2744         return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
2745 }
2746
2747 /* called with the loi list lock held */
2748 void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
2749 {
2750         struct lov_oinfo *loi;
2751         int rc = 0, race_counter = 0;
2752         ENTRY;
2753
2754         while ((loi = osc_next_loi(cli)) != NULL) {
2755                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2756
2757                 if (osc_max_rpc_in_flight(cli, loi))
2758                         break;
2759
2760                 /* attempt some read/write balancing by alternating between
2761                  * reads and writes in an object.  The makes_rpc checks here
2762                  * would be redundant if we were getting read/write work items
2763                  * instead of objects.  we don't want send_oap_rpc to drain a
2764                  * partial read pending queue when we're given this object to
2765                  * do io on writes while there are cache waiters */
2766                 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2767                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
2768                                               &loi->loi_write_lop);
2769                         if (rc < 0) {
2770                                 CERROR("Write request failed with %d\n", rc);
2771
2772                                 /* osc_send_oap_rpc failed, mostly because of
2773                                  * memory pressure.
2774                                  *
2775                                  * It can't break here, because if:
2776                                  *  - a page was submitted by osc_io_submit, so
2777                                  *    page locked;
2778                                  *  - no request in flight
2779                                  *  - no subsequent request
2780                                  * The system will be in live-lock state,
2781                                  * because there is no chance to call
2782                                  * osc_io_unplug() and osc_check_rpcs() any
2783                                  * more. pdflush can't help in this case,
2784                                  * because it might be blocked at grabbing
2785                                  * the page lock as we mentioned.
2786                                  *
2787                                  * Anyway, continue to drain pages. */
2788                                 /* break; */
2789                         }
2790
2791                         if (rc > 0)
2792                                 race_counter = 0;
2793                         else
2794                                 race_counter++;
2795                 }
2796                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2797                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
2798                                               &loi->loi_read_lop);
2799                         if (rc < 0)
2800                                 CERROR("Read request failed with %d\n", rc);
2801
2802                         if (rc > 0)
2803                                 race_counter = 0;
2804                         else
2805                                 race_counter++;
2806                 }
2807
2808                 /* attempt some inter-object balancing by issuing rpcs
2809                  * for each object in turn */
2810                 if (!cfs_list_empty(&loi->loi_hp_ready_item))
2811                         cfs_list_del_init(&loi->loi_hp_ready_item);
2812                 if (!cfs_list_empty(&loi->loi_ready_item))
2813                         cfs_list_del_init(&loi->loi_ready_item);
2814                 if (!cfs_list_empty(&loi->loi_write_item))
2815                         cfs_list_del_init(&loi->loi_write_item);
2816                 if (!cfs_list_empty(&loi->loi_read_item))
2817                         cfs_list_del_init(&loi->loi_read_item);
2818                 if (!cfs_list_empty(&loi->loi_sync_fs_item))
2819                         cfs_list_del_init(&loi->loi_sync_fs_item);
2820
2821                 loi_list_maint(cli, loi);
2822
2823                 /* send_oap_rpc fails with 0 when make_ready tells it to
2824                  * back off.  llite's make_ready does this when it tries
2825                  * to lock a page queued for write that is already locked.
2826                  * we want to try sending rpcs from many objects, but we
2827                  * don't want to spin failing with 0.  */
2828                 if (race_counter == 10)
2829                         break;
2830         }
2831         EXIT;
2832 }
2833
2834 /* we're trying to queue a page in the osc so we're subject to the
2835  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2836  * If the osc's queued pages are already at that limit, then we want to sleep
2837  * until there is space in the osc's queue for us.  We also may be waiting for
2838  * write credits from the OST if there are RPCs in flight that may return some
2839  * before we fall back to sync writes.
2840  *
2841  * We need this know our allocation was granted in the presence of signals */
2842 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2843 {
2844         int rc;
2845         ENTRY;
2846         client_obd_list_lock(&cli->cl_loi_list_lock);
2847         rc = cfs_list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2848         client_obd_list_unlock(&cli->cl_loi_list_lock);
2849         RETURN(rc);
2850 };
2851
2852 /**
2853  * Non-blocking version of osc_enter_cache() that consumes grant only when it
2854  * is available.
2855  */
2856 int osc_enter_cache_try(const struct lu_env *env,
2857                         struct client_obd *cli, struct lov_oinfo *loi,
2858                         struct osc_async_page *oap, int transient)
2859 {
2860         int has_grant;
2861
2862         has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
2863         if (has_grant) {
2864                 osc_consume_write_grant(cli, &oap->oap_brw_page);
2865                 if (transient) {
2866                         cli->cl_dirty_transit += CFS_PAGE_SIZE;
2867                         cfs_atomic_inc(&obd_dirty_transit_pages);
2868                         oap->oap_brw_flags |= OBD_BRW_NOCACHE;
2869                 }
2870         }
2871         return has_grant;
2872 }
2873
2874 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2875  * grant or cache space. */
2876 static int osc_enter_cache(const struct lu_env *env,
2877                            struct client_obd *cli, struct lov_oinfo *loi,
2878                            struct osc_async_page *oap)
2879 {
2880         struct osc_cache_waiter ocw;
2881         struct l_wait_info lwi = { 0 };
2882
2883         ENTRY;
2884
2885         CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2886                "grant: %lu\n", cli->cl_dirty, cfs_atomic_read(&obd_dirty_pages),
2887                cli->cl_dirty_max, obd_max_dirty_pages,
2888                cli->cl_lost_grant, cli->cl_avail_grant);
2889
2890         /* force the caller to try sync io.  this can jump the list
2891          * of queued writes and create a discontiguous rpc stream */
2892         if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2893             loi->loi_ar.ar_force_sync)
2894                 RETURN(-EDQUOT);
2895
2896         /* Hopefully normal case - cache space and write credits available */
2897         if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
2898             cfs_atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
2899             osc_enter_cache_try(env, cli, loi, oap, 0))
2900                 RETURN(0);
2901
2902         /* It is safe to block as a cache waiter as long as there is grant
2903          * space available or the hope of additional grant being returned
2904          * when an in flight write completes.  Using the write back cache
2905          * if possible is preferable to sending the data synchronously
2906          * because write pages can then be merged in to large requests.
2907          * The addition of this cache waiter will causing pending write
2908          * pages to be sent immediately. */
2909         if (cli->cl_w_in_flight || cli->cl_avail_grant >= CFS_PAGE_SIZE) {
2910                 cfs_list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2911                 cfs_waitq_init(&ocw.ocw_waitq);
2912                 ocw.ocw_oap = oap;
2913                 ocw.ocw_rc = 0;
2914
2915                 loi_list_maint(cli, loi);
2916                 osc_check_rpcs(env, cli);
2917                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2918
2919                 CDEBUG(D_CACHE, "sleeping for cache space\n");
2920                 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2921
2922                 client_obd_list_lock(&cli->cl_loi_list_lock);
2923                 if (!cfs_list_empty(&ocw.ocw_entry)) {
2924                         cfs_list_del(&ocw.ocw_entry);
2925                         RETURN(-EINTR);
2926                 }
2927                 RETURN(ocw.ocw_rc);
2928         }
2929
2930         RETURN(-EDQUOT);
2931 }
2932
2933
2934 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2935                         struct lov_oinfo *loi, cfs_page_t *page,
2936                         obd_off offset, const struct obd_async_page_ops *ops,
2937                         void *data, void **res, int nocache,
2938                         struct lustre_handle *lockh)
2939 {
2940         struct osc_async_page *oap;
2941
2942         ENTRY;
2943
2944         if (!page)
2945                 return cfs_size_round(sizeof(*oap));
2946
2947         oap = *res;
2948         oap->oap_magic = OAP_MAGIC;
2949         oap->oap_cli = &exp->exp_obd->u.cli;
2950         oap->oap_loi = loi;
2951
2952         oap->oap_caller_ops = ops;
2953         oap->oap_caller_data = data;
2954
2955         oap->oap_page = page;
2956         oap->oap_obj_off = offset;
2957         if (!client_is_remote(exp) &&
2958             cfs_capable(CFS_CAP_SYS_RESOURCE))
2959                 oap->oap_brw_flags = OBD_BRW_NOQUOTA;
2960
2961         LASSERT(!(offset & ~CFS_PAGE_MASK));
2962
2963         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2964         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2965         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2966         CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2967
2968         cfs_spin_lock_init(&oap->oap_lock);
2969         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2970         RETURN(0);
2971 }
2972
2973 struct osc_async_page *oap_from_cookie(void *cookie)
2974 {
2975         struct osc_async_page *oap = cookie;
2976         if (oap->oap_magic != OAP_MAGIC)
2977                 return ERR_PTR(-EINVAL);
2978         return oap;
2979 };
2980
2981 int osc_queue_async_io(const struct lu_env *env,
2982                        struct obd_export *exp, struct lov_stripe_md *lsm,
2983                        struct lov_oinfo *loi, void *cookie,
2984                        int cmd, obd_off off, int count,
2985                        obd_flag brw_flags, enum async_flags async_flags)
2986 {
2987         struct client_obd *cli = &exp->exp_obd->u.cli;
2988         struct osc_async_page *oap;
2989         int rc = 0;
2990         ENTRY;
2991
2992         oap = oap_from_cookie(cookie);
2993         if (IS_ERR(oap))
2994                 RETURN(PTR_ERR(oap));
2995
2996         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2997                 RETURN(-EIO);
2998
2999         if (!cfs_list_empty(&oap->oap_pending_item) ||
3000             !cfs_list_empty(&oap->oap_urgent_item) ||
3001             !cfs_list_empty(&oap->oap_rpc_item))
3002                 RETURN(-EBUSY);
3003
3004         /* check if the file's owner/group is over quota */
3005         if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
3006                 struct cl_object *obj;
3007                 struct cl_attr    attr; /* XXX put attr into thread info */
3008                 unsigned int qid[MAXQUOTAS];
3009
3010                 obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
3011
3012                 cl_object_attr_lock(obj);
3013                 rc = cl_object_attr_get(env, obj, &attr);
3014                 cl_object_attr_unlock(obj);
3015
3016                 qid[USRQUOTA] = attr.cat_uid;
3017                 qid[GRPQUOTA] = attr.cat_gid;
3018                 if (rc == 0 &&
3019                     lquota_chkdq(quota_interface, cli, qid) == NO_QUOTA)
3020                         rc = -EDQUOT;
3021                 if (rc)
3022                         RETURN(rc);
3023         }
3024
3025         if (loi == NULL)
3026                 loi = lsm->lsm_oinfo[0];
3027
3028         client_obd_list_lock(&cli->cl_loi_list_lock);
3029
3030         LASSERT(off + count <= CFS_PAGE_SIZE);
3031         oap->oap_cmd = cmd;
3032         oap->oap_page_off = off;
3033         oap->oap_count = count;
3034         oap->oap_brw_flags = brw_flags;
3035         /* Give a hint to OST that requests are coming from kswapd - bug19529 */
3036         if (cfs_memory_pressure_get())
3037                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
3038         cfs_spin_lock(&oap->oap_lock);
3039         oap->oap_async_flags = async_flags;
3040         cfs_spin_unlock(&oap->oap_lock);
3041
3042         if (cmd & OBD_BRW_WRITE) {
3043                 rc = osc_enter_cache(env, cli, loi, oap);
3044                 if (rc) {
3045                         client_obd_list_unlock(&cli->cl_loi_list_lock);
3046                         RETURN(rc);
3047                 }
3048         }
3049
3050         osc_oap_to_pending(oap);
3051         loi_list_maint(cli, loi);
3052
3053         LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
3054                   cmd);
3055
3056         osc_check_rpcs(env, cli);
3057         client_obd_list_unlock(&cli->cl_loi_list_lock);
3058
3059         RETURN(0);
3060 }
3061
3062 /* aka (~was & now & flag), but this is more clear :) */
3063 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
3064
3065 int osc_set_async_flags_base(struct client_obd *cli,
3066                              struct lov_oinfo *loi, struct osc_async_page *oap,
3067                              obd_flag async_flags)
3068 {
3069         struct loi_oap_pages *lop;
3070         int flags = 0;
3071         ENTRY;
3072
3073         LASSERT(!cfs_list_empty(&oap->oap_pending_item));
3074
3075         if (oap->oap_cmd & OBD_BRW_WRITE) {
3076                 lop = &loi->loi_write_lop;
3077         } else {
3078                 lop = &loi->loi_read_lop;
3079         }
3080
3081         if ((oap->oap_async_flags & async_flags) == async_flags)
3082                 RETURN(0);
3083
3084         /* XXX: This introduces a tiny insignificant race for the case if this
3085          * loi already had other urgent items.
3086          */
3087         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_SYNCFS) &&
3088             cfs_list_empty(&oap->oap_rpc_item) &&
3089             cfs_list_empty(&oap->oap_urgent_item)) {
3090                 cfs_list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
3091                 flags |= ASYNC_SYNCFS;
3092                 cfs_spin_lock(&oap->oap_lock);
3093                 oap->oap_async_flags |= flags;
3094                 cfs_spin_unlock(&oap->oap_lock);
3095                 loi_list_maint(cli, loi);
3096                 RETURN(0);
3097         }
3098
3099         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
3100                 flags |= ASYNC_READY;
3101
3102         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
3103             cfs_list_empty(&oap->oap_rpc_item)) {
3104                 if (oap->oap_async_flags & ASYNC_HP)
3105                         cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
3106                 else
3107                         cfs_list_add_tail(&oap->oap_urgent_item,
3108                                           &lop->lop_urgent);
3109                 flags |= ASYNC_URGENT;
3110                 loi_list_maint(cli, loi);
3111         }
3112         cfs_spin_lock(&oap->oap_lock);
3113         oap->oap_async_flags |= flags;
3114         cfs_spin_unlock(&oap->oap_lock);
3115
3116         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
3117                         oap->oap_async_flags);
3118         RETURN(0);
3119 }
3120
3121 int osc_teardown_async_page(struct obd_export *exp,
3122                             struct lov_stripe_md *lsm,
3123                             struct lov_oinfo *loi, void *cookie)
3124 {
3125         struct client_obd *cli = &exp->exp_obd->u.cli;
3126         struct loi_oap_pages *lop;
3127         struct osc_async_page *oap;
3128         int rc = 0;
3129         ENTRY;
3130
3131         oap = oap_from_cookie(cookie);
3132         if (IS_ERR(oap))
3133                 RETURN(PTR_ERR(oap));
3134
3135         if (loi == NULL)
3136                 loi = lsm->lsm_oinfo[0];
3137
3138         if (oap->oap_cmd & OBD_BRW_WRITE) {
3139                 lop = &loi->loi_write_lop;
3140         } else {
3141                 lop = &loi->loi_read_lop;
3142         }
3143
3144         client_obd_list_lock(&cli->cl_loi_list_lock);
3145
3146         if (!cfs_list_empty(&oap->oap_rpc_item))
3147                 GOTO(out, rc = -EBUSY);
3148
3149         osc_exit_cache(cli, oap, 0);
3150         osc_wake_cache_waiters(cli);
3151
3152         if (!cfs_list_empty(&oap->oap_urgent_item)) {
3153                 cfs_list_del_init(&oap->oap_urgent_item);
3154                 cfs_spin_lock(&oap->oap_lock);
3155                 oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP |
3156                                           ASYNC_SYNCFS);
3157                 cfs_spin_unlock(&oap->oap_lock);
3158         }
3159         if (!cfs_list_empty(&oap->oap_pending_item)) {
3160                 cfs_list_del_init(&oap->oap_pending_item);
3161                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
3162         }
3163         loi_list_maint(cli, loi);
3164         LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
3165 out:
3166         client_obd_list_unlock(&cli->cl_loi_list_lock);
3167         RETURN(rc);
3168 }
3169
3170 static void osc_set_lock_data_with_check(struct ldlm_lock *lock,
3171                                          struct ldlm_enqueue_info *einfo,
3172                                          int flags)
3173 {
3174         void *data = einfo->ei_cbdata;
3175
3176         LASSERT(lock != NULL);
3177         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
3178         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
3179         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
3180         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
3181
3182         lock_res_and_lock(lock);
3183         cfs_spin_lock(&osc_ast_guard);
3184         LASSERT(lock->l_ast_data == NULL || lock->l_ast_data == data);
3185         lock->l_ast_data = data;
3186         cfs_spin_unlock(&osc_ast_guard);
3187         unlock_res_and_lock(lock);
3188 }
3189
3190 static void osc_set_data_with_check(struct lustre_handle *lockh,
3191                                     struct ldlm_enqueue_info *einfo,
3192                                     int flags)
3193 {
3194         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
3195
3196         if (lock != NULL) {
3197                 osc_set_lock_data_with_check(lock, einfo, flags);
3198                 LDLM_LOCK_PUT(lock);
3199         } else
3200                 CERROR("lockh %p, data %p - client evicted?\n",
3201                        lockh, einfo->ei_cbdata);
3202 }
3203
3204 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3205                              ldlm_iterator_t replace, void *data)
3206 {
3207         struct ldlm_res_id res_id;
3208         struct obd_device *obd = class_exp2obd(exp);
3209
3210         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
3211         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3212         return 0;
3213 }
3214
3215 /* find any ldlm lock of the inode in osc
3216  * return 0    not find
3217  *        1    find one
3218  *      < 0    error */
3219 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3220                            ldlm_iterator_t replace, void *data)
3221 {
3222         struct ldlm_res_id res_id;
3223         struct obd_device *obd = class_exp2obd(exp);
3224         int rc = 0;
3225
3226         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
3227         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3228         if (rc == LDLM_ITER_STOP)
3229                 return(1);
3230         if (rc == LDLM_ITER_CONTINUE)
3231                 return(0);
3232         return(rc);
3233 }
3234
3235 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
3236                             obd_enqueue_update_f upcall, void *cookie,
3237                             int *flags, int rc)
3238 {
3239         int intent = *flags & LDLM_FL_HAS_INTENT;
3240         ENTRY;
3241
3242         if (intent) {
3243                 /* The request was created before ldlm_cli_enqueue call. */
3244                 if (rc == ELDLM_LOCK_ABORTED) {
3245                         struct ldlm_reply *rep;
3246                         rep = req_capsule_server_get(&req->rq_pill,
3247                                                      &RMF_DLM_REP);
3248
3249                         LASSERT(rep != NULL);
3250                         if (rep->lock_policy_res1)
3251                                 rc = rep->lock_policy_res1;
3252                 }
3253         }
3254
3255         if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
3256                 *flags |= LDLM_FL_LVB_READY;
3257                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
3258                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
3259         }
3260
3261         /* Call the update callback. */
3262         rc = (*upcall)(cookie, rc);
3263         RETURN(rc);
3264 }
3265
3266 static int osc_enqueue_interpret(const struct lu_env *env,
3267                                  struct ptlrpc_request *req,
3268                                  struct osc_enqueue_args *aa, int rc)
3269 {
3270         struct ldlm_lock *lock;
3271         struct lustre_handle handle;
3272         __u32 mode;
3273
3274         /* Make a local copy of a lock handle and a mode, because aa->oa_*
3275          * might be freed anytime after lock upcall has been called. */
3276         lustre_handle_copy(&handle, aa->oa_lockh);
3277         mode = aa->oa_ei->ei_mode;
3278
3279         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
3280          * be valid. */
3281         lock = ldlm_handle2lock(&handle);
3282
3283         /* Take an additional reference so that a blocking AST that
3284          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
3285          * to arrive after an upcall has been executed by
3286          * osc_enqueue_fini(). */
3287         ldlm_lock_addref(&handle, mode);
3288
3289         /* Let CP AST to grant the lock first. */
3290         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
3291
3292         /* Complete obtaining the lock procedure. */
3293         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
3294                                    mode, aa->oa_flags, aa->oa_lvb,
3295                                    sizeof(*aa->oa_lvb), &handle, rc);
3296         /* Complete osc stuff. */
3297         rc = osc_enqueue_fini(req, aa->oa_lvb,
3298                               aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc);
3299
3300         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
3301
3302         /* Release the lock for async request. */
3303         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
3304                 /*
3305                  * Releases a reference taken by ldlm_cli_enqueue(), if it is
3306                  * not already released by
3307                  * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
3308                  */
3309                 ldlm_lock_decref(&handle, mode);
3310
3311         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
3312                  aa->oa_lockh, req, aa);
3313         ldlm_lock_decref(&handle, mode);
3314         LDLM_LOCK_PUT(lock);
3315         return rc;
3316 }
3317
3318 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
3319                         struct lov_oinfo *loi, int flags,
3320                         struct ost_lvb *lvb, __u32 mode, int rc)
3321 {
3322         if (rc == ELDLM_OK) {
3323                 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
3324                 __u64 tmp;
3325
3326                 LASSERT(lock != NULL);
3327                 loi->loi_lvb = *lvb;
3328                 tmp = loi->loi_lvb.lvb_size;
3329                 /* Extend KMS up to the end of this lock and no further
3330                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
3331                 if (tmp > lock->l_policy_data.l_extent.end)
3332                         tmp = lock->l_policy_data.l_extent.end + 1;
3333                 if (tmp >= loi->loi_kms) {
3334                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
3335                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
3336                         loi_kms_set(loi, tmp);
3337                 } else {
3338                         LDLM_DEBUG(lock, "lock acquired, setting rss="
3339                                    LPU64"; leaving kms="LPU64", end="LPU64,
3340                                    loi->loi_lvb.lvb_size, loi->loi_kms,
3341                                    lock->l_policy_data.l_extent.end);
3342                 }
3343                 ldlm_lock_allow_match(lock);
3344                 LDLM_LOCK_PUT(lock);
3345         } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
3346                 loi->loi_lvb = *lvb;
3347                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
3348                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
3349                 rc = ELDLM_OK;
3350         }
3351 }
3352 EXPORT_SYMBOL(osc_update_enqueue);
3353
3354 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
3355
3356 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
3357  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
3358  * other synchronous requests, however keeping some locks and trying to obtain
3359  * others may take a considerable amount of time in a case of ost failure; and
3360  * when other sync requests do not get released lock from a client, the client
3361  * is excluded from the cluster -- such scenarious make the life difficult, so
3362  * release locks just after they are obtained. */
3363 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3364                      int *flags, ldlm_policy_data_t *policy,
3365                      struct ost_lvb *lvb, int kms_valid,
3366                      obd_enqueue_update_f upcall, void *cookie,
3367                      struct ldlm_enqueue_info *einfo,
3368                      struct lustre_handle *lockh,
3369                      struct ptlrpc_request_set *rqset, int async)
3370 {
3371         struct obd_device *obd = exp->exp_obd;
3372         struct ptlrpc_request *req = NULL;
3373         int intent = *flags & LDLM_FL_HAS_INTENT;
3374         ldlm_mode_t mode;
3375         int rc;
3376         ENTRY;
3377
3378         /* Filesystem lock extents are extended to page boundaries so that
3379          * dealing with the page cache is a little smoother.  */
3380         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3381         policy->l_extent.end |= ~CFS_PAGE_MASK;
3382
3383         /*
3384          * kms is not valid when either object is completely fresh (so that no
3385          * locks are cached), or object was evicted. In the latter case cached
3386          * lock cannot be used, because it would prime inode state with
3387          * potentially stale LVB.
3388          */
3389         if (!kms_valid)
3390                 goto no_match;
3391
3392         /* Next, search for already existing extent locks that will cover us */
3393         /* If we're trying to read, we also search for an existing PW lock.  The
3394          * VFS and page cache already protect us locally, so lots of readers/
3395          * writers can share a single PW lock.
3396          *
3397          * There are problems with conversion deadlocks, so instead of
3398          * converting a read lock to a write lock, we'll just enqueue a new
3399          * one.
3400          *
3401          * At some point we should cancel the read lock instead of making them
3402          * send us a blocking callback, but there are problems with canceling
3403          * locks out from other users right now, too. */
3404         mode = einfo->ei_mode;
3405         if (einfo->ei_mode == LCK_PR)
3406                 mode |= LCK_PW;
3407         mode = ldlm_lock_match(obd->obd_namespace,
3408                                *flags | LDLM_FL_LVB_READY, res_id,
3409                                einfo->ei_type, policy, mode, lockh, 0);
3410         if (mode) {
3411                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
3412
3413                 if (matched->l_ast_data == NULL ||
3414                     matched->l_ast_data == einfo->ei_cbdata) {
3415                         /* addref the lock only if not async requests and PW
3416                          * lock is matched whereas we asked for PR. */
3417                         if (!rqset && einfo->ei_mode != mode)
3418                                 ldlm_lock_addref(lockh, LCK_PR);
3419                         osc_set_lock_data_with_check(matched, einfo, *flags);
3420                         if (intent) {
3421                                 /* I would like to be able to ASSERT here that
3422                                  * rss <= kms, but I can't, for reasons which
3423                                  * are explained in lov_enqueue() */
3424                         }
3425
3426                         /* We already have a lock, and it's referenced */
3427                         (*upcall)(cookie, ELDLM_OK);
3428
3429                         /* For async requests, decref the lock. */
3430                         if (einfo->ei_mode != mode)
3431                                 ldlm_lock_decref(lockh, LCK_PW);
3432                         else if (rqset)
3433                                 ldlm_lock_decref(lockh, einfo->ei_mode);
3434                         LDLM_LOCK_PUT(matched);
3435                         RETURN(ELDLM_OK);
3436                 } else
3437                         ldlm_lock_decref(lockh, mode);
3438                 LDLM_LOCK_PUT(matched);
3439         }
3440
3441  no_match:
3442         if (intent) {
3443                 CFS_LIST_HEAD(cancels);
3444                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3445                                            &RQF_LDLM_ENQUEUE_LVB);
3446                 if (req == NULL)
3447                         RETURN(-ENOMEM);
3448
3449                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3450                 if (rc) {
3451                         ptlrpc_request_free(req);
3452                         RETURN(rc);
3453                 }
3454
3455                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3456                                      sizeof *lvb);
3457                 ptlrpc_request_set_replen(req);
3458         }
3459
3460         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3461         *flags &= ~LDLM_FL_BLOCK_GRANTED;
3462
3463         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
3464                               sizeof(*lvb), lockh, async);
3465         if (rqset) {
3466                 if (!rc) {
3467                         struct osc_enqueue_args *aa;
3468                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3469                         aa = ptlrpc_req_async_args(req);
3470                         aa->oa_ei = einfo;
3471                         aa->oa_exp = exp;
3472                         aa->oa_flags  = flags;
3473                         aa->oa_upcall = upcall;
3474                         aa->oa_cookie = cookie;
3475                         aa->oa_lvb    = lvb;
3476                         aa->oa_lockh  = lockh;
3477
3478                         req->rq_interpret_reply =
3479                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
3480                         if (rqset == PTLRPCD_SET)
3481                                 ptlrpcd_add_req(req, PSCOPE_OTHER);
3482                         else
3483                                 ptlrpc_set_add_req(rqset, req);
3484                 } else if (intent) {
3485                         ptlrpc_req_finished(req);
3486                 }
3487                 RETURN(rc);
3488         }
3489
3490         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, rc);
3491         if (intent)
3492                 ptlrpc_req_finished(req);
3493
3494         RETURN(rc);
3495 }
3496
3497 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3498                        struct ldlm_enqueue_info *einfo,
3499                        struct ptlrpc_request_set *rqset)
3500 {
3501         struct ldlm_res_id res_id;
3502         int rc;
3503         ENTRY;
3504
3505         osc_build_res_name(oinfo->oi_md->lsm_object_id,
3506                            oinfo->oi_md->lsm_object_seq, &res_id);
3507
3508         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
3509                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3510                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
3511                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
3512                               rqset, rqset != NULL);
3513         RETURN(rc);
3514 }
3515
3516 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3517                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3518                    int *flags, void *data, struct lustre_handle *lockh,
3519                    int unref)
3520 {
3521         struct obd_device *obd = exp->exp_obd;
3522         int lflags = *flags;
3523         ldlm_mode_t rc;
3524         ENTRY;
3525
3526         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3527                 RETURN(-EIO);
3528
3529         /* Filesystem lock extents are extended to page boundaries so that
3530          * dealing with the page cache is a little smoother */
3531         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3532         policy->l_extent.end |= ~CFS_PAGE_MASK;
3533
3534         /* Next, search for already existing extent locks that will cover us */
3535         /* If we're trying to read, we also search for an existing PW lock.  The
3536          * VFS and page cache already protect us locally, so lots of readers/
3537          * writers can share a single PW lock. */
3538         rc = mode;
3539         if (mode == LCK_PR)
3540                 rc |= LCK_PW;
3541         rc = ldlm_lock_match(obd->obd_namespace, lflags,
3542                              res_id, type, policy, rc, lockh, unref);
3543         if (rc) {
3544                 if (data != NULL)
3545                         osc_set_data_with_check(lockh, data, lflags);
3546                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3547                         ldlm_lock_addref(lockh, LCK_PR);
3548                         ldlm_lock_decref(lockh, LCK_PW);
3549                 }
3550                 RETURN(rc);
3551         }
3552         RETURN(rc);
3553 }
3554
3555 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
3556 {
3557         ENTRY;
3558
3559         if (unlikely(mode == LCK_GROUP))
3560                 ldlm_lock_decref_and_cancel(lockh, mode);
3561         else
3562                 ldlm_lock_decref(lockh, mode);
3563
3564         RETURN(0);
3565 }
3566
3567 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3568                       __u32 mode, struct lustre_handle *lockh)
3569 {
3570         ENTRY;
3571         RETURN(osc_cancel_base(lockh, mode));
3572 }
3573
3574 static int osc_cancel_unused(struct obd_export *exp,
3575                              struct lov_stripe_md *lsm,
3576                              ldlm_cancel_flags_t flags,
3577                              void *opaque)
3578 {
3579         struct obd_device *obd = class_exp2obd(exp);
3580         struct ldlm_res_id res_id, *resp = NULL;
3581
3582         if (lsm != NULL) {
3583                 resp = osc_build_res_name(lsm->lsm_object_id,
3584                                           lsm->lsm_object_seq, &res_id);
3585         }
3586
3587         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3588 }
3589
3590 static int osc_statfs_interpret(const struct lu_env *env,
3591                                 struct ptlrpc_request *req,
3592                                 struct osc_async_args *aa, int rc)
3593 {
3594         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
3595         struct obd_statfs *msfs;
3596         __u64 used;
3597         ENTRY;
3598
3599         if (rc == -EBADR)
3600                 /* The request has in fact never been sent
3601                  * due to issues at a higher level (LOV).
3602                  * Exit immediately since the caller is
3603                  * aware of the problem and takes care
3604                  * of the clean up */
3605                  RETURN(rc);
3606
3607         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
3608             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
3609                 GOTO(out, rc = 0);
3610
3611         if (rc != 0)
3612                 GOTO(out, rc);
3613
3614         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3615         if (msfs == NULL) {
3616                 GOTO(out, rc = -EPROTO);
3617         }
3618
3619         /* Reinitialize the RDONLY and DEGRADED flags at the client
3620          * on each statfs, so they don't stay set permanently. */
3621         cfs_spin_lock(&cli->cl_oscc.oscc_lock);
3622
3623         if (unlikely(msfs->os_state & OS_STATE_DEGRADED))
3624                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED;
3625         else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_DEGRADED))
3626                 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_DEGRADED;
3627
3628         if (unlikely(msfs->os_state & OS_STATE_READONLY))
3629                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY;
3630         else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_RDONLY))
3631                 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_RDONLY;
3632
3633         /* Add a bit of hysteresis so this flag isn't continually flapping,
3634          * and ensure that new files don't get extremely fragmented due to
3635          * only a small amount of available space in the filesystem.
3636          * We want to set the NOSPC flag when there is less than ~0.1% free
3637          * and clear it when there is at least ~0.2% free space, so:
3638          *                   avail < ~0.1% max          max = avail + used
3639          *            1025 * avail < avail + used       used = blocks - free
3640          *            1024 * avail < used
3641          *            1024 * avail < blocks - free                      
3642          *                   avail < ((blocks - free) >> 10)    
3643          *
3644          * On very large disk, say 16TB 0.1% will be 16 GB. We don't want to
3645          * lose that amount of space so in those cases we report no space left
3646          * if their is less than 1 GB left.                             */
3647         used = min_t(__u64,(msfs->os_blocks - msfs->os_bfree) >> 10, 1 << 30);
3648         if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) == 0) &&
3649                      ((msfs->os_ffree < 32) || (msfs->os_bavail < used))))
3650                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC;
3651         else if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
3652                 (msfs->os_ffree > 64) && (msfs->os_bavail > (used << 1))))
3653                         cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_NOSPC;
3654
3655         cfs_spin_unlock(&cli->cl_oscc.oscc_lock);
3656
3657         *aa->aa_oi->oi_osfs = *msfs;
3658 out:
3659         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3660         RETURN(rc);
3661 }
3662
3663 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3664                             __u64 max_age, struct ptlrpc_request_set *rqset)
3665 {
3666         struct ptlrpc_request *req;
3667         struct osc_async_args *aa;
3668         int                    rc;
3669         ENTRY;
3670
3671         /* We could possibly pass max_age in the request (as an absolute
3672          * timestamp or a "seconds.usec ago") so the target can avoid doing
3673          * extra calls into the filesystem if that isn't necessary (e.g.
3674          * during mount that would help a bit).  Having relative timestamps
3675          * is not so great if request processing is slow, while absolute
3676          * timestamps are not ideal because they need time synchronization. */
3677         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3678         if (req == NULL)
3679                 RETURN(-ENOMEM);
3680
3681         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3682         if (rc) {
3683                 ptlrpc_request_free(req);
3684                 RETURN(rc);
3685         }
3686         ptlrpc_request_set_replen(req);
3687         req->rq_request_portal = OST_CREATE_PORTAL;
3688         ptlrpc_at_set_req_timeout(req);
3689
3690         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3691                 /* procfs requests not want stat in wait for avoid deadlock */
3692                 req->rq_no_resend = 1;
3693                 req->rq_no_delay = 1;
3694         }
3695
3696         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
3697         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3698         aa = ptlrpc_req_async_args(req);
3699         aa->aa_oi = oinfo;
3700
3701         ptlrpc_set_add_req(rqset, req);
3702         RETURN(0);
3703 }
3704
3705 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3706                       __u64 max_age, __u32 flags)
3707 {
3708         struct obd_statfs     *msfs;
3709         struct ptlrpc_request *req;
3710         struct obd_import     *imp = NULL;
3711         int rc;
3712         ENTRY;
3713
3714         /*Since the request might also come from lprocfs, so we need
3715          *sync this with client_disconnect_export Bug15684*/
3716         cfs_down_read(&obd->u.cli.cl_sem);
3717         if (obd->u.cli.cl_import)
3718                 imp = class_import_get(obd->u.cli.cl_import);
3719         cfs_up_read(&obd->u.cli.cl_sem);
3720         if (!imp)
3721                 RETURN(-ENODEV);
3722
3723         /* We could possibly pass max_age in the request (as an absolute
3724          * timestamp or a "seconds.usec ago") so the target can avoid doing
3725          * extra calls into the filesystem if that isn't necessary (e.g.
3726          * during mount that would help a bit).  Having relative timestamps
3727          * is not so great if request processing is slow, while absolute
3728          * timestamps are not ideal because they need time synchronization. */
3729         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
3730
3731         class_import_put(imp);
3732
3733         if (req == NULL)
3734                 RETURN(-ENOMEM);
3735
3736         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3737         if (rc) {
3738                 ptlrpc_request_free(req);
3739                 RETURN(rc);
3740         }
3741         ptlrpc_request_set_replen(req);
3742         req->rq_request_portal = OST_CREATE_PORTAL;
3743         ptlrpc_at_set_req_timeout(req);
3744
3745         if (flags & OBD_STATFS_NODELAY) {
3746                 /* procfs requests not want stat in wait for avoid deadlock */
3747                 req->rq_no_resend = 1;
3748                 req->rq_no_delay = 1;
3749         }
3750
3751         rc = ptlrpc_queue_wait(req);
3752         if (rc)
3753                 GOTO(out, rc);
3754
3755         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3756         if (msfs == NULL) {
3757                 GOTO(out, rc = -EPROTO);
3758         }
3759
3760         *osfs = *msfs;
3761
3762         EXIT;
3763  out:
3764         ptlrpc_req_finished(req);
3765         return rc;
3766 }
3767
3768 /* Retrieve object striping information.
3769  *
3770  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
3771  * the maximum number of OST indices which will fit in the user buffer.
3772  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
3773  */
3774 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3775 {
3776         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
3777         struct lov_user_md_v3 lum, *lumk;
3778         struct lov_user_ost_data_v1 *lmm_objects;
3779         int rc = 0, lum_size;
3780         ENTRY;
3781
3782         if (!lsm)
3783                 RETURN(-ENODATA);
3784
3785         /* we only need the header part from user space to get lmm_magic and
3786          * lmm_stripe_count, (the header part is common to v1 and v3) */
3787         lum_size = sizeof(struct lov_user_md_v1);
3788         if (cfs_copy_from_user(&lum, lump, lum_size))
3789                 RETURN(-EFAULT);
3790
3791         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
3792             (lum.lmm_magic != LOV_USER_MAGIC_V3))
3793                 RETURN(-EINVAL);
3794
3795         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
3796         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
3797         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
3798         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
3799
3800         /* we can use lov_mds_md_size() to compute lum_size
3801          * because lov_user_md_vX and lov_mds_md_vX have the same size */
3802         if (lum.lmm_stripe_count > 0) {
3803                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
3804                 OBD_ALLOC(lumk, lum_size);
3805                 if (!lumk)
3806                         RETURN(-ENOMEM);
3807
3808                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
3809                         lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
3810                 else
3811                         lmm_objects = &(lumk->lmm_objects[0]);
3812                 lmm_objects->l_object_id = lsm->lsm_object_id;
3813         } else {
3814                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
3815                 lumk = &lum;
3816         }
3817
3818         lumk->lmm_object_id = lsm->lsm_object_id;
3819         lumk->lmm_object_seq = lsm->lsm_object_seq;
3820         lumk->lmm_stripe_count = 1;
3821
3822         if (cfs_copy_to_user(lump, lumk, lum_size))
3823                 rc = -EFAULT;
3824
3825         if (lumk != &lum)
3826                 OBD_FREE(lumk, lum_size);
3827
3828         RETURN(rc);
3829 }
3830
3831
3832 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3833                          void *karg, void *uarg)
3834 {
3835         struct obd_device *obd = exp->exp_obd;
3836         struct obd_ioctl_data *data = karg;
3837         int err = 0;
3838         ENTRY;
3839
3840         if (!cfs_try_module_get(THIS_MODULE)) {
3841                 CERROR("Can't get module. Is it alive?");
3842                 return -EINVAL;
3843         }
3844         switch (cmd) {
3845         case OBD_IOC_LOV_GET_CONFIG: {
3846                 char *buf;
3847                 struct lov_desc *desc;
3848                 struct obd_uuid uuid;
3849
3850                 buf = NULL;
3851                 len = 0;
3852                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3853                         GOTO(out, err = -EINVAL);
3854
3855                 data = (struct obd_ioctl_data *)buf;
3856
3857                 if (sizeof(*desc) > data->ioc_inllen1) {
3858                         obd_ioctl_freedata(buf, len);
3859                         GOTO(out, err = -EINVAL);
3860                 }
3861
3862                 if (data->ioc_inllen2 < sizeof(uuid)) {
3863                         obd_ioctl_freedata(buf, len);
3864                         GOTO(out, err = -EINVAL);
3865                 }
3866
3867                 desc = (struct lov_desc *)data->ioc_inlbuf1;
3868                 desc->ld_tgt_count = 1;
3869                 desc->ld_active_tgt_count = 1;
3870                 desc->ld_default_stripe_count = 1;
3871                 desc->ld_default_stripe_size = 0;
3872                 desc->ld_default_stripe_offset = 0;
3873                 desc->ld_pattern = 0;
3874                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3875
3876                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3877
3878                 err = cfs_copy_to_user((void *)uarg, buf, len);
3879                 if (err)
3880                         err = -EFAULT;
3881                 obd_ioctl_freedata(buf, len);
3882                 GOTO(out, err);
3883         }
3884         case LL_IOC_LOV_SETSTRIPE:
3885                 err = obd_alloc_memmd(exp, karg);
3886                 if (err > 0)
3887                         err = 0;
3888                 GOTO(out, err);
3889         case LL_IOC_LOV_GETSTRIPE:
3890                 err = osc_getstripe(karg, uarg);
3891                 GOTO(out, err);
3892         case OBD_IOC_CLIENT_RECOVER:
3893                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3894                                             data->ioc_inlbuf1);
3895                 if (err > 0)
3896                         err = 0;
3897                 GOTO(out, err);
3898         case IOC_OSC_SET_ACTIVE:
3899                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3900                                                data->ioc_offset);
3901                 GOTO(out, err);
3902         case OBD_IOC_POLL_QUOTACHECK:
3903                 err = lquota_poll_check(quota_interface, exp,
3904                                         (struct if_quotacheck *)karg);
3905                 GOTO(out, err);
3906         case OBD_IOC_PING_TARGET:
3907                 err = ptlrpc_obd_ping(obd);
3908                 GOTO(out, err);
3909         default:
3910                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3911                        cmd, cfs_curproc_comm());
3912                 GOTO(out, err = -ENOTTY);
3913         }
3914 out:
3915         cfs_module_put(THIS_MODULE);
3916         return err;
3917 }
3918
3919 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3920                         void *key, __u32 *vallen, void *val,
3921                         struct lov_stripe_md *lsm)
3922 {
3923         ENTRY;
3924         if (!vallen || !val)
3925                 RETURN(-EFAULT);
3926
3927         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3928                 __u32 *stripe = val;
3929                 *vallen = sizeof(*stripe);
3930                 *stripe = 0;
3931                 RETURN(0);
3932         } else if (KEY_IS(KEY_LAST_ID)) {
3933                 struct ptlrpc_request *req;
3934                 obd_id                *reply;
3935                 char                  *tmp;
3936                 int                    rc;
3937
3938                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3939                                            &RQF_OST_GET_INFO_LAST_ID);
3940                 if (req == NULL)
3941                         RETURN(-ENOMEM);
3942
3943                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3944                                      RCL_CLIENT, keylen);
3945                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3946                 if (rc) {
3947                         ptlrpc_request_free(req);
3948                         RETURN(rc);
3949                 }
3950
3951                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3952                 memcpy(tmp, key, keylen);
3953
3954                 req->rq_no_delay = req->rq_no_resend = 1;
3955                 ptlrpc_request_set_replen(req);
3956                 rc = ptlrpc_queue_wait(req);
3957                 if (rc)
3958                         GOTO(out, rc);
3959
3960                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3961                 if (reply == NULL)
3962                         GOTO(out, rc = -EPROTO);
3963
3964                 *((obd_id *)val) = *reply;
3965         out:
3966                 ptlrpc_req_finished(req);
3967                 RETURN(rc);
3968         } else if (KEY_IS(KEY_FIEMAP)) {
3969                 struct ptlrpc_request *req;
3970                 struct ll_user_fiemap *reply;
3971                 char *tmp;
3972                 int rc;
3973
3974                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3975                                            &RQF_OST_GET_INFO_FIEMAP);
3976                 if (req == NULL)
3977                         RETURN(-ENOMEM);
3978
3979                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3980                                      RCL_CLIENT, keylen);
3981                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3982                                      RCL_CLIENT, *vallen);
3983                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3984                                      RCL_SERVER, *vallen);
3985
3986                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3987                 if (rc) {
3988                         ptlrpc_request_free(req);
3989                         RETURN(rc);
3990                 }
3991
3992                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3993                 memcpy(tmp, key, keylen);
3994                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3995                 memcpy(tmp, val, *vallen);
3996
3997                 ptlrpc_request_set_replen(req);
3998                 rc = ptlrpc_queue_wait(req);
3999                 if (rc)
4000                         GOTO(out1, rc);
4001
4002                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
4003                 if (reply == NULL)
4004                         GOTO(out1, rc = -EPROTO);
4005
4006                 memcpy(val, reply, *vallen);
4007         out1:
4008                 ptlrpc_req_finished(req);
4009
4010                 RETURN(rc);
4011         }
4012
4013         RETURN(-EINVAL);
4014 }
4015
4016 static int osc_setinfo_mds_connect_import(struct obd_import *imp)
4017 {
4018         struct llog_ctxt *ctxt;
4019         int rc = 0;
4020         ENTRY;
4021
4022         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
4023         if (ctxt) {
4024                 rc = llog_initiator_connect(ctxt);
4025                 llog_ctxt_put(ctxt);
4026         } else {
4027                 /* XXX return an error? skip setting below flags? */
4028         }
4029
4030         cfs_spin_lock(&imp->imp_lock);
4031         imp->imp_server_timeout = 1;
4032         imp->imp_pingable = 1;
4033         cfs_spin_unlock(&imp->imp_lock);
4034         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
4035
4036         RETURN(rc);
4037 }
4038
4039 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
4040                                           struct ptlrpc_request *req,
4041                                           void *aa, int rc)
4042 {
4043         ENTRY;
4044         if (rc != 0)
4045                 RETURN(rc);
4046
4047         RETURN(osc_setinfo_mds_connect_import(req->rq_import));
4048 }
4049
4050 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
4051                               void *key, obd_count vallen, void *val,
4052                               struct ptlrpc_request_set *set)
4053 {
4054         struct ptlrpc_request *req;
4055         struct obd_device     *obd = exp->exp_obd;
4056         struct obd_import     *imp = class_exp2cliimp(exp);
4057         char                  *tmp;
4058         int                    rc;
4059         ENTRY;
4060
4061         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
4062
4063         if (KEY_IS(KEY_NEXT_ID)) {
4064                 obd_id new_val;
4065                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4066
4067                 if (vallen != sizeof(obd_id))
4068                         RETURN(-ERANGE);
4069                 if (val == NULL)
4070                         RETURN(-EINVAL);
4071
4072                 if (vallen != sizeof(obd_id))
4073                         RETURN(-EINVAL);
4074
4075                 /* avoid race between allocate new object and set next id
4076                  * from ll_sync thread */
4077                 cfs_spin_lock(&oscc->oscc_lock);
4078                 new_val = *((obd_id*)val) + 1;
4079                 if (new_val > oscc->oscc_next_id)
4080                         oscc->oscc_next_id = new_val;
4081                 cfs_spin_unlock(&oscc->oscc_lock);
4082                 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
4083                        exp->exp_obd->obd_name,
4084                        obd->u.cli.cl_oscc.oscc_next_id);
4085
4086                 RETURN(0);
4087         }
4088
4089         if (KEY_IS(KEY_CHECKSUM)) {
4090                 if (vallen != sizeof(int))
4091                         RETURN(-EINVAL);
4092                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
4093                 RETURN(0);
4094         }
4095
4096         if (KEY_IS(KEY_SPTLRPC_CONF)) {
4097                 sptlrpc_conf_client_adapt(obd);
4098                 RETURN(0);
4099         }
4100
4101         if (KEY_IS(KEY_FLUSH_CTX)) {
4102                 sptlrpc_import_flush_my_ctx(imp);
4103                 RETURN(0);
4104         }
4105
4106         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
4107                 RETURN(-EINVAL);
4108
4109         /* We pass all other commands directly to OST. Since nobody calls osc
4110            methods directly and everybody is supposed to go through LOV, we
4111            assume lov checked invalid values for us.
4112            The only recognised values so far are evict_by_nid and mds_conn.
4113            Even if something bad goes through, we'd get a -EINVAL from OST
4114            anyway. */
4115
4116         if (KEY_IS(KEY_GRANT_SHRINK))
4117                 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO);
4118         else
4119                 req = ptlrpc_request_alloc(imp, &RQF_OBD_SET_INFO);
4120
4121         if (req == NULL)
4122                 RETURN(-ENOMEM);
4123
4124         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
4125                              RCL_CLIENT, keylen);
4126         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
4127                              RCL_CLIENT, vallen);
4128         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
4129         if (rc) {
4130                 ptlrpc_request_free(req);
4131                 RETURN(rc);
4132         }
4133
4134         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
4135         memcpy(tmp, key, keylen);
4136         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
4137         memcpy(tmp, val, vallen);
4138
4139         if (KEY_IS(KEY_MDS_CONN)) {
4140                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4141
4142                 oscc->oscc_oa.o_seq = (*(__u32 *)val);
4143                 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
4144                 LASSERT_SEQ_IS_MDT(oscc->oscc_oa.o_seq);
4145                 req->rq_no_delay = req->rq_no_resend = 1;
4146                 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
4147         } else if (KEY_IS(KEY_GRANT_SHRINK)) {
4148                 struct osc_grant_args *aa;
4149                 struct obdo *oa;
4150
4151                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
4152                 aa = ptlrpc_req_async_args(req);
4153                 OBDO_ALLOC(oa);
4154                 if (!oa) {
4155                         ptlrpc_req_finished(req);
4156                         RETURN(-ENOMEM);
4157                 }
4158                 *oa = ((struct ost_body *)val)->oa;
4159                 aa->aa_oa = oa;
4160                 req->rq_interpret_reply = osc_shrink_grant_interpret;
4161         }
4162
4163         ptlrpc_request_set_replen(req);
4164         if (!KEY_IS(KEY_GRANT_SHRINK)) {
4165                 LASSERT(set != NULL);
4166                 ptlrpc_set_add_req(set, req);
4167                 ptlrpc_check_set(NULL, set);
4168         } else
4169                 ptlrpcd_add_req(req, PSCOPE_OTHER);
4170
4171         RETURN(0);
4172 }
4173
4174
4175 static struct llog_operations osc_size_repl_logops = {
4176         lop_cancel: llog_obd_repl_cancel
4177 };
4178
4179 static struct llog_operations osc_mds_ost_orig_logops;
4180
4181 static int __osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4182                            struct obd_device *tgt, struct llog_catid *catid)
4183 {
4184         int rc;
4185         ENTRY;
4186
4187         rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, 1,
4188                         &catid->lci_logid, &osc_mds_ost_orig_logops);
4189         if (rc) {
4190                 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
4191                 GOTO(out, rc);
4192         }
4193
4194         rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, 1,
4195                         NULL, &osc_size_repl_logops);
4196         if (rc) {
4197                 struct llog_ctxt *ctxt =
4198                         llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4199                 if (ctxt)
4200                         llog_cleanup(ctxt);
4201                 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
4202         }
4203         GOTO(out, rc);
4204 out:
4205         if (rc) {
4206                 CERROR("osc '%s' tgt '%s' catid %p rc=%d\n",
4207                        obd->obd_name, tgt->obd_name, catid, rc);
4208                 CERROR("logid "LPX64":0x%x\n",
4209                        catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
4210         }
4211         return rc;
4212 }
4213
4214 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4215                          struct obd_device *disk_obd, int *index)
4216 {
4217         struct llog_catid catid;
4218         static char name[32] = CATLIST;
4219         int rc;
4220         ENTRY;
4221
4222         LASSERT(olg == &obd->obd_olg);
4223
4224         cfs_mutex_down(&olg->olg_cat_processing);
4225         rc = llog_get_cat_list(disk_obd, name, *index, 1, &catid);
4226         if (rc) {
4227                 CERROR("rc: %d\n", rc);
4228                 GOTO(out, rc);
4229         }
4230
4231         CDEBUG(D_INFO, "%s: Init llog for %d - catid "LPX64"/"LPX64":%x\n",
4232                obd->obd_name, *index, catid.lci_logid.lgl_oid,
4233                catid.lci_logid.lgl_oseq, catid.lci_logid.lgl_ogen);
4234
4235         rc = __osc_llog_init(obd, olg, disk_obd, &catid);
4236         if (rc) {
4237                 CERROR("rc: %d\n", rc);
4238                 GOTO(out, rc);
4239         }
4240
4241         rc = llog_put_cat_list(disk_obd, name, *index, 1, &catid);
4242         if (rc) {
4243                 CERROR("rc: %d\n", rc);
4244                 GOTO(out, rc);
4245         }
4246
4247  out:
4248         cfs_mutex_up(&olg->olg_cat_processing);
4249
4250         return rc;
4251 }
4252
4253 static int osc_llog_finish(struct obd_device *obd, int count)
4254 {
4255         struct llog_ctxt *ctxt;
4256         int rc = 0, rc2 = 0;
4257         ENTRY;
4258
4259         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4260         if (ctxt)
4261                 rc = llog_cleanup(ctxt);
4262
4263         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4264         if (ctxt)
4265                 rc2 = llog_cleanup(ctxt);
4266         if (!rc)
4267                 rc = rc2;
4268
4269         RETURN(rc);
4270 }
4271
4272 static int osc_reconnect(const struct lu_env *env,
4273                          struct obd_export *exp, struct obd_device *obd,
4274                          struct obd_uuid *cluuid,
4275                          struct obd_connect_data *data,
4276                          void *localdata)
4277 {
4278         struct client_obd *cli = &obd->u.cli;
4279
4280         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
4281                 long lost_grant;
4282
4283                 client_obd_list_lock(&cli->cl_loi_list_lock);
4284                 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
4285                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
4286                 lost_grant = cli->cl_lost_grant;
4287                 cli->cl_lost_grant = 0;
4288                 client_obd_list_unlock(&cli->cl_loi_list_lock);
4289
4290                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
4291                        "cl_dirty: %ld cl_lost_grant: %ld\n", data->ocd_grant,
4292                        cli->cl_avail_grant, cli->cl_dirty, lost_grant);
4293                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
4294                        " ocd_grant: %d\n", data->ocd_connect_flags,
4295                        data->ocd_version, data->ocd_grant);
4296         }
4297
4298         RETURN(0);
4299 }
4300
4301 static int osc_disconnect(struct obd_export *exp)
4302 {
4303         struct obd_device *obd = class_exp2obd(exp);
4304         struct llog_ctxt  *ctxt;
4305         int rc;
4306
4307         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4308         if (ctxt) {
4309                 if (obd->u.cli.cl_conn_count == 1) {
4310                         /* Flush any remaining cancel messages out to the
4311                          * target */
4312                         llog_sync(ctxt, exp);
4313                 }
4314                 llog_ctxt_put(ctxt);
4315         } else {
4316                 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
4317                        obd);
4318         }
4319
4320         rc = client_disconnect_export(exp);
4321         /**
4322          * Initially we put del_shrink_grant before disconnect_export, but it
4323          * causes the following problem if setup (connect) and cleanup
4324          * (disconnect) are tangled together.
4325          *      connect p1                     disconnect p2
4326          *   ptlrpc_connect_import
4327          *     ...............               class_manual_cleanup
4328          *                                     osc_disconnect
4329          *                                     del_shrink_grant
4330          *   ptlrpc_connect_interrupt
4331          *     init_grant_shrink
4332          *   add this client to shrink list
4333          *                                      cleanup_osc
4334          * Bang! pinger trigger the shrink.
4335          * So the osc should be disconnected from the shrink list, after we
4336          * are sure the import has been destroyed. BUG18662
4337          */
4338         if (obd->u.cli.cl_import == NULL)
4339                 osc_del_shrink_grant(&obd->u.cli);
4340         return rc;
4341 }
4342
4343 static int osc_import_event(struct obd_device *obd,
4344                             struct obd_import *imp,
4345                             enum obd_import_event event)
4346 {
4347         struct client_obd *cli;
4348         int rc = 0;
4349
4350         ENTRY;
4351         LASSERT(imp->imp_obd == obd);
4352
4353         switch (event) {
4354         case IMP_EVENT_DISCON: {
4355                 /* Only do this on the MDS OSC's */
4356                 if (imp->imp_server_timeout) {
4357                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4358
4359                         cfs_spin_lock(&oscc->oscc_lock);
4360                         oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
4361                         cfs_spin_unlock(&oscc->oscc_lock);
4362                 }
4363                 cli = &obd->u.cli;
4364                 client_obd_list_lock(&cli->cl_loi_list_lock);
4365                 cli->cl_avail_grant = 0;
4366                 cli->cl_lost_grant = 0;
4367                 client_obd_list_unlock(&cli->cl_loi_list_lock);
4368                 break;
4369         }
4370         case IMP_EVENT_INACTIVE: {
4371                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
4372                 break;
4373         }
4374         case IMP_EVENT_INVALIDATE: {
4375                 struct ldlm_namespace *ns = obd->obd_namespace;
4376                 struct lu_env         *env;
4377                 int                    refcheck;
4378
4379                 env = cl_env_get(&refcheck);
4380                 if (!IS_ERR(env)) {
4381                         /* Reset grants */
4382                         cli = &obd->u.cli;
4383                         client_obd_list_lock(&cli->cl_loi_list_lock);
4384                         /* all pages go to failing rpcs due to the invalid
4385                          * import */
4386                         osc_check_rpcs(env, cli);
4387                         client_obd_list_unlock(&cli->cl_loi_list_lock);
4388
4389                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
4390                         cl_env_put(env, &refcheck);
4391                 } else
4392                         rc = PTR_ERR(env);
4393                 break;
4394         }
4395         case IMP_EVENT_ACTIVE: {
4396                 /* Only do this on the MDS OSC's */
4397                 if (imp->imp_server_timeout) {
4398                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4399
4400                         cfs_spin_lock(&oscc->oscc_lock);
4401                         oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
4402                         cfs_spin_unlock(&oscc->oscc_lock);
4403                 }
4404                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
4405                 break;
4406         }
4407         case IMP_EVENT_OCD: {
4408                 struct obd_connect_data *ocd = &imp->imp_connect_data;
4409
4410                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
4411                         osc_init_grant(&obd->u.cli, ocd);
4412
4413                 /* See bug 7198 */
4414                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
4415                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
4416
4417                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
4418                 break;
4419         }
4420         default:
4421                 CERROR("Unknown import event %d\n", event);
4422                 LBUG();
4423         }
4424         RETURN(rc);
4425 }
4426
4427 /**
4428  * Determine whether the lock can be canceled before replaying the lock
4429  * during recovery, see bug16774 for detailed information.
4430  *
4431  * \retval zero the lock can't be canceled
4432  * \retval other ok to cancel
4433  */
4434 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
4435 {
4436         check_res_locked(lock->l_resource);
4437
4438         /*
4439          * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
4440          *
4441          * XXX as a future improvement, we can also cancel unused write lock
4442          * if it doesn't have dirty data and active mmaps.
4443          */
4444         if (lock->l_resource->lr_type == LDLM_EXTENT &&
4445             (lock->l_granted_mode == LCK_PR ||
4446              lock->l_granted_mode == LCK_CR) &&
4447             (osc_dlm_lock_pageref(lock) == 0))
4448                 RETURN(1);
4449
4450         RETURN(0);
4451 }
4452
4453 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
4454 {
4455         int rc;
4456         ENTRY;
4457
4458         ENTRY;
4459         rc = ptlrpcd_addref();
4460         if (rc)
4461                 RETURN(rc);
4462
4463         rc = client_obd_setup(obd, lcfg);
4464         if (rc) {
4465                 ptlrpcd_decref();
4466         } else {
4467                 struct lprocfs_static_vars lvars = { 0 };
4468                 struct client_obd *cli = &obd->u.cli;
4469
4470                 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
4471                 lprocfs_osc_init_vars(&lvars);
4472                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
4473                         lproc_osc_attach_seqstat(obd);
4474                         sptlrpc_lprocfs_cliobd_attach(obd);
4475                         ptlrpc_lprocfs_register_obd(obd);
4476                 }
4477
4478                 oscc_init(obd);
4479                 /* We need to allocate a few requests more, because
4480                    brw_interpret tries to create new requests before freeing
4481                    previous ones. Ideally we want to have 2x max_rpcs_in_flight
4482                    reserved, but I afraid that might be too much wasted RAM
4483                    in fact, so 2 is just my guess and still should work. */
4484                 cli->cl_import->imp_rq_pool =
4485                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4486                                             OST_MAXREQSIZE,
4487                                             ptlrpc_add_rqs_to_pool);
4488
4489                 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
4490                 cfs_sema_init(&cli->cl_grant_sem, 1);
4491
4492                 ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
4493         }
4494
4495         RETURN(rc);
4496 }
4497
4498 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
4499 {
4500         int rc = 0;
4501         ENTRY;
4502
4503         switch (stage) {
4504         case OBD_CLEANUP_EARLY: {
4505                 struct obd_import *imp;
4506                 imp = obd->u.cli.cl_import;
4507                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
4508                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
4509                 ptlrpc_deactivate_import(imp);
4510                 cfs_spin_lock(&imp->imp_lock);
4511                 imp->imp_pingable = 0;
4512                 cfs_spin_unlock(&imp->imp_lock);
4513                 break;
4514         }
4515         case OBD_CLEANUP_EXPORTS: {
4516                 /* If we set up but never connected, the
4517                    client import will not have been cleaned. */
4518                 if (obd->u.cli.cl_import) {
4519                         struct obd_import *imp;
4520                         cfs_down_write(&obd->u.cli.cl_sem);
4521                         imp = obd->u.cli.cl_import;
4522                         CDEBUG(D_CONFIG, "%s: client import never connected\n",
4523                                obd->obd_name);
4524                         ptlrpc_invalidate_import(imp);
4525                         if (imp->imp_rq_pool) {
4526                                 ptlrpc_free_rq_pool(imp->imp_rq_pool);
4527                                 imp->imp_rq_pool = NULL;
4528                         }
4529                         class_destroy_import(imp);
4530                         cfs_up_write(&obd->u.cli.cl_sem);
4531                         obd->u.cli.cl_import = NULL;
4532                 }
4533                 rc = obd_llog_finish(obd, 0);
4534                 if (rc != 0)
4535                         CERROR("failed to cleanup llogging subsystems\n");
4536                 break;
4537                 }
4538         }
4539         RETURN(rc);
4540 }
4541
4542 int osc_cleanup(struct obd_device *obd)
4543 {
4544         int rc;
4545
4546         ENTRY;
4547         ptlrpc_lprocfs_unregister_obd(obd);
4548         lprocfs_obd_cleanup(obd);
4549
4550         /* free memory of osc quota cache */
4551         lquota_cleanup(quota_interface, obd);
4552
4553         rc = client_obd_cleanup(obd);
4554
4555         ptlrpcd_decref();
4556         RETURN(rc);
4557 }
4558
4559 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
4560 {
4561         struct lprocfs_static_vars lvars = { 0 };
4562         int rc = 0;
4563
4564         lprocfs_osc_init_vars(&lvars);
4565
4566         switch (lcfg->lcfg_command) {
4567         default:
4568                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
4569                                               lcfg, obd);
4570                 if (rc > 0)
4571                         rc = 0;
4572                 break;
4573         }
4574
4575         return(rc);
4576 }
4577
4578 static int osc_sync_fs(struct obd_device *obd, struct obd_info *oinfo,
4579                        int wait)
4580 {
4581         struct client_obd *cli;
4582         struct lov_oinfo *loi;
4583         struct lov_oinfo *tloi;
4584         struct osc_async_page *oap;
4585         struct osc_async_page *toap;
4586         struct loi_oap_pages *lop;
4587         struct lu_env *env;
4588         int refcheck;
4589         int rc = 0;
4590         ENTRY;
4591
4592         env = cl_env_get(&refcheck);
4593         if (IS_ERR(env))
4594                 RETURN(PTR_ERR(env));
4595
4596         cli = &obd->u.cli;
4597         client_obd_list_lock(&cli->cl_loi_list_lock);
4598         cli->cl_sf_wait.sfw_oi = oinfo;
4599         cli->cl_sf_wait.sfw_upcall = oinfo->oi_cb_up;
4600         cli->cl_sf_wait.started = 1;
4601         /* creating cl_loi_sync_fs list */
4602         cfs_list_for_each_entry_safe(loi, tloi, &cli->cl_loi_write_list,
4603                                      loi_write_item) {
4604                 lop = &loi->loi_write_lop;
4605                 cfs_list_for_each_entry_safe(oap, toap, &lop->lop_pending,
4606                                              oap_pending_item)
4607                         osc_set_async_flags_base(cli, loi, oap, ASYNC_SYNCFS);
4608         }
4609
4610         osc_check_rpcs(env, cli);
4611         osc_wake_sync_fs(cli);
4612         client_obd_list_unlock(&cli->cl_loi_list_lock);
4613         cl_env_put(env, &refcheck);
4614         RETURN(rc);
4615 }
4616
4617 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4618 {
4619         return osc_process_config_base(obd, buf);
4620 }
4621
4622 struct obd_ops osc_obd_ops = {
4623         .o_owner                = THIS_MODULE,
4624         .o_setup                = osc_setup,
4625         .o_precleanup           = osc_precleanup,
4626         .o_cleanup              = osc_cleanup,
4627         .o_add_conn             = client_import_add_conn,
4628         .o_del_conn             = client_import_del_conn,
4629         .o_connect              = client_connect_import,
4630         .o_reconnect            = osc_reconnect,
4631         .o_disconnect           = osc_disconnect,
4632         .o_statfs               = osc_statfs,
4633         .o_statfs_async         = osc_statfs_async,
4634         .o_packmd               = osc_packmd,
4635         .o_unpackmd             = osc_unpackmd,
4636         .o_precreate            = osc_precreate,
4637         .o_create               = osc_create,
4638         .o_create_async         = osc_create_async,
4639         .o_destroy              = osc_destroy,
4640         .o_getattr              = osc_getattr,
4641         .o_getattr_async        = osc_getattr_async,
4642         .o_setattr              = osc_setattr,
4643         .o_setattr_async        = osc_setattr_async,
4644         .o_brw                  = osc_brw,
4645         .o_punch                = osc_punch,
4646         .o_sync                 = osc_sync,
4647         .o_enqueue              = osc_enqueue,
4648         .o_change_cbdata        = osc_change_cbdata,
4649         .o_find_cbdata          = osc_find_cbdata,
4650         .o_cancel               = osc_cancel,
4651         .o_cancel_unused        = osc_cancel_unused,
4652         .o_iocontrol            = osc_iocontrol,
4653         .o_get_info             = osc_get_info,
4654         .o_set_info_async       = osc_set_info_async,
4655         .o_import_event         = osc_import_event,
4656         .o_llog_init            = osc_llog_init,
4657         .o_llog_finish          = osc_llog_finish,
4658         .o_process_config       = osc_process_config,
4659         .o_sync_fs              = osc_sync_fs,
4660 };
4661
4662 extern struct lu_kmem_descr osc_caches[];
4663 extern cfs_spinlock_t       osc_ast_guard;
4664 extern cfs_lock_class_key_t osc_ast_guard_class;
4665
4666 int __init osc_init(void)
4667 {
4668         struct lprocfs_static_vars lvars = { 0 };
4669         int rc;
4670         ENTRY;
4671
4672         /* print an address of _any_ initialized kernel symbol from this
4673          * module, to allow debugging with gdb that doesn't support data
4674          * symbols from modules.*/
4675         CDEBUG(D_CONSOLE, "Lustre OSC module (%p).\n", &osc_caches);
4676
4677         rc = lu_kmem_init(osc_caches);
4678
4679         lprocfs_osc_init_vars(&lvars);
4680
4681         cfs_request_module("lquota");
4682         quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
4683         lquota_init(quota_interface);
4684         init_obd_quota_ops(quota_interface, &osc_obd_ops);
4685
4686         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
4687                                  LUSTRE_OSC_NAME, &osc_device_type);
4688         if (rc) {
4689                 if (quota_interface)
4690                         PORTAL_SYMBOL_PUT(osc_quota_interface);
4691                 lu_kmem_fini(osc_caches);
4692                 RETURN(rc);
4693         }
4694
4695         cfs_spin_lock_init(&osc_ast_guard);
4696         cfs_lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
4697
4698         osc_mds_ost_orig_logops = llog_lvfs_ops;
4699         osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
4700         osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
4701         osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
4702         osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
4703
4704         RETURN(rc);
4705 }
4706
4707 #ifdef __KERNEL__
4708 static void /*__exit*/ osc_exit(void)
4709 {
4710         lu_device_type_fini(&osc_device_type);
4711
4712         lquota_exit(quota_interface);
4713         if (quota_interface)
4714                 PORTAL_SYMBOL_PUT(osc_quota_interface);
4715
4716         class_unregister_type(LUSTRE_OSC_NAME);
4717         lu_kmem_fini(osc_caches);
4718 }
4719
4720 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4721 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4722 MODULE_LICENSE("GPL");
4723
4724 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
4725 #endif