/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 */
/*
 * Copyright (c) 2011 Whamcloud, Inc.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif
#define DEBUG_SUBSYSTEM S_OSC

#include <libcfs/libcfs.h>

#ifndef __KERNEL__
# include <liblustre.h>
#endif

#include <lustre_dlm.h>
#include <lustre_net.h>
#include <lustre/lustre_user.h>
#include <obd_cksum.h>
#include <obd_ost.h>
#include <obd_lov.h>

#ifdef  __CYGWIN__
# include <ctype.h>
#endif

#include <lustre_ha.h>
#include <lprocfs_status.h>
#include <lustre_log.h>
#include <lustre_debug.h>
#include <lustre_param.h>
#include "osc_internal.h"

static void osc_release_ppga(struct brw_page **ppga, obd_count count);
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc);
int osc_cleanup(struct obd_device *obd);

/* Pack OSC object metadata for disk storage (LE byte order). */
static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
                      struct lov_stripe_md *lsm)
{
        int lmm_size;
        ENTRY;

        lmm_size = sizeof(**lmmp);
        if (!lmmp)
                RETURN(lmm_size);

        if (*lmmp && !lsm) {
                OBD_FREE(*lmmp, lmm_size);
                *lmmp = NULL;
                RETURN(0);
        }

        if (!*lmmp) {
                OBD_ALLOC(*lmmp, lmm_size);
                if (!*lmmp)
                        RETURN(-ENOMEM);
        }

        if (lsm) {
                LASSERT(lsm->lsm_object_id);
                LASSERT_SEQ_IS_MDT(lsm->lsm_object_seq);
                (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
                (*lmmp)->lmm_object_seq = cpu_to_le64(lsm->lsm_object_seq);
        }

        RETURN(lmm_size);
}

/* Unpack OSC object metadata from disk storage (LE byte order). */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
{
        int lsm_size;
        struct obd_import *imp = class_exp2cliimp(exp);
        ENTRY;

        if (lmm != NULL) {
                if (lmm_bytes < sizeof(*lmm)) {
                        CERROR("lov_mds_md too small: %d, need %d\n",
                               lmm_bytes, (int)sizeof(*lmm));
                        RETURN(-EINVAL);
                }
                /* XXX LOV_MAGIC etc check? */

                if (lmm->lmm_object_id == 0) {
                        CERROR("lov_mds_md: zero lmm_object_id\n");
                        RETURN(-EINVAL);
                }
        }

        lsm_size = lov_stripe_md_size(1);
        if (lsmp == NULL)
                RETURN(lsm_size);

        if (*lsmp != NULL && lmm == NULL) {
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);
                *lsmp = NULL;
                RETURN(0);
        }

        if (*lsmp == NULL) {
                OBD_ALLOC(*lsmp, lsm_size);
                if (*lsmp == NULL)
                        RETURN(-ENOMEM);
                OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                if ((*lsmp)->lsm_oinfo[0] == NULL) {
                        OBD_FREE(*lsmp, lsm_size);
                        RETURN(-ENOMEM);
                }
                loi_init((*lsmp)->lsm_oinfo[0]);
        }

        if (lmm != NULL) {
                /* XXX zero *lsmp? */
                (*lsmp)->lsm_object_id = le64_to_cpu(lmm->lmm_object_id);
                (*lsmp)->lsm_object_seq = le64_to_cpu(lmm->lmm_object_seq);
                LASSERT((*lsmp)->lsm_object_id);
                LASSERT_SEQ_IS_MDT((*lsmp)->lsm_object_seq);
        }

        if (imp != NULL &&
            (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
                (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
        else
                (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

        RETURN(lsm_size);
}

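/* Copy the capability @capa into the request capsule and flag its presence
 * in the obdo; a no-op if no capability was supplied. */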
static inline void osc_pack_capa(struct ptlrpc_request *req,
                                 struct ost_body *body, void *capa)
{
        struct obd_capa *oc = (struct obd_capa *)capa;
        struct lustre_capa *c;

        if (!capa)
                return;

        c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
        LASSERT(c);
        capa_cpy(c, oc);
        body->oa.o_valid |= OBD_MD_FLOSSCAPA;
        DEBUG_CAPA(D_SEC, c, "pack");
}

static inline void osc_pack_req_body(struct ptlrpc_request *req,
                                     struct obd_info *oinfo)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);
}

static inline void osc_set_capa_size(struct ptlrpc_request *req,
                                     const struct req_msg_field *field,
                                     struct obd_capa *oc)
{
        if (oc == NULL)
                req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
        else
                /* it is already calculated as sizeof struct obd_capa */
                ;
}

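/* Reply callback for asynchronous getattr: unpack the ost_body from the
 * reply, copy the attributes into the caller's obdo, then run the upcall. */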
static int osc_getattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body) {
                CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
                lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);

                /* This should really be sent by the OST */
                aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
                aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
        } else {
                CDEBUG(D_INFO, "can't unpack ost_body\n");
                rc = -EPROTO;
                aa->aa_oi->oi_oa->o_valid = 0;
        }
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}

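/* Pack an OST_GETATTR RPC and queue it on @set; the reply is handled
 * asynchronously by osc_getattr_interpret(). */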
static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(set, req);
        RETURN(0);
}

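/* Synchronous getattr: send an OST_GETATTR RPC, wait for the reply and copy
 * the returned attributes into oinfo->oi_oa. */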
static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);

        /* This should really be sent by the OST */
        oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}

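/* Synchronous setattr: send an OST_SETATTR RPC and wait for the updated
 * attributes in the reply. */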
static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
                       struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}

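/* Reply callback shared by setattr and punch: unpack the returned obdo and
 * pass the result to the caller's upcall. */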
static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_setattr_args *sa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(sa->sa_oa, &body->oa);
out:
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}

int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
                           struct obd_trans_info *oti,
                           obd_enqueue_update_f upcall, void *cookie,
                           struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
                oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
                sa = ptlrpc_req_async_args(req);
                sa->sa_oa = oinfo->oi_oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}

static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct obd_trans_info *oti,
                             struct ptlrpc_request_set *rqset)
{
        return osc_setattr_async_base(exp, oinfo, oti,
                                      oinfo->oi_cb_up, oinfo, rqset);
}

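/* Create an object on the OST.  If *ea is NULL a temporary stripe MD is
 * allocated; on success the new object id/seq are stored in it and it is
 * returned through *ea. */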
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct lov_stripe_md  *lsm;
        int                    rc;
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oa);

        ptlrpc_request_set_replen(req);

        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_DELORPHAN) {
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        lustre_get_wire_obdo(oa, &body->oa);

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_object_id = oa->o_id;
        lsm->lsm_object_seq = oa->o_seq;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        *oti->oti_logcookies = oa->o_lcookie;
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        RETURN(rc);
}

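/* Send an OST_PUNCH RPC to truncate an object.  The punch range is carried
 * in the o_size/o_blocks fields of oinfo->oi_oa (see osc_punch() below);
 * the reply is handled by osc_setattr_interpret(). */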
int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
                   obd_enqueue_update_f upcall, void *cookie,
                   struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        struct ost_body         *body;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
        CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(req);
        sa->sa_oa     = oinfo->oi_oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;
        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
                     struct obd_trans_info *oti,
                     struct ptlrpc_request_set *rqset)
{
        oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
        oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
        oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
        return osc_punch_base(exp, oinfo,
                              oinfo->oi_cb_up, oinfo, rqset);
}

static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req,
                              void *arg, int rc)
{
        struct osc_async_args *aa = arg;
        struct ost_body *body;
        ENTRY;

        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *aa->aa_oi->oi_oa = body->oa;
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}

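/* Queue an OST_SYNC RPC on @set to flush an object's dirty data to disk on
 * the OST; @start and @end bound the byte range to be synced. */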
static int osc_sync(struct obd_export *exp, struct obd_info *oinfo,
                    obd_size start, obd_size end,
                    struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        if (!oinfo->oi_oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
        body->oa.o_size = start;
        body->oa.o_blocks = end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(set, req);
        RETURN(0);
}

/* Find and cancel locally granted locks matching @mode in the resource
 * identified by @oa.  Found locks are added to the @cancels list.  Returns
 * the number of locks added to the list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   cfs_list_t *cancels,
                                   ldlm_mode_t mode, int lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        osc_build_res_name(oa->o_id, oa->o_seq, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (res == NULL)
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}

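/* Completion handler for destroy RPCs: drop the in-flight counter and wake
 * up any sender throttled in osc_destroy(). */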
static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *data,
                                 int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        cfs_atomic_dec(&cli->cl_destroy_in_flight);
        cfs_waitq_signal(&cli->cl_destroy_waitq);
        return 0;
}

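/* Returns 1 and takes an in-flight slot if another destroy RPC may be sent
 * without exceeding cl_max_rpcs_in_flight; otherwise undoes the increment
 * and returns 0. */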
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                cfs_waitq_signal(&cli->cl_destroy_waitq);
        }
        return 0;
}

/* Destroy requests can always be async on the client, and we don't really
 * care about the return code since the client cannot do anything at all about
 * a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST next reconnects to the MDS,
 * it will retrieve the llog unlink logs and then send the log cancellation
 * cookies to the MDS after committing destroy transactions. */
static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                       struct lov_stripe_md *ea, struct obd_trans_info *oti,
                       struct obd_export *md_export, void *capa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        CFS_LIST_HEAD(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                oa->o_lcookie = *oti->oti_logcookies;
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oa);

        osc_pack_capa(req, body, (struct obd_capa *)capa);
        ptlrpc_request_set_replen(req);

        /* don't throttle destroy RPCs for the MDT */
        if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
                req->rq_interpret_reply = osc_destroy_interpret;
                if (!osc_can_send_destroy(cli)) {
                        struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
                                                          NULL);

                        /*
                         * Wait until the number of on-going destroy RPCs
                         * drops below cl_max_rpcs_in_flight.
                         */
                        l_wait_event_exclusive(cli->cl_destroy_waitq,
                                               osc_can_send_destroy(cli), &lwi);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        RETURN(0);
}

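/* Fill the dirty and grant accounting fields of @oa (o_dirty, o_undirty,
 * o_grant, o_dropped) under the loi list lock, so the server can see how
 * much cache the client is holding and adjust its grant accordingly. */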
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (cfs_atomic_read(&obd_dirty_pages) -
                   cfs_atomic_read(&obd_dirty_transit_pages) >
                   obd_max_dirty_pages + 1) {
                /* The cfs_atomic_read() and cfs_atomic_inc() are not covered
                 * by a lock, so they may race; the race is harmless, but it
                 * can trip this CERROR() unless we add in a small fudge
                 * factor (+1). */
                CERROR("dirty %d - %d > system dirty_max %d\n",
                       cfs_atomic_read(&obd_dirty_pages),
                       cfs_atomic_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT) *
                                (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}

static void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant =
                cfs_time_shift(cli->cl_grant_shrink_interval);
        CDEBUG(D_CACHE, "next time %ld to shrink grant\n",
               cli->cl_next_shrink_grant);
}

/* caller must hold loi_list_lock */
static void osc_consume_write_grant(struct client_obd *cli,
                                    struct brw_page *pga)
{
        LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
        LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
        cfs_atomic_inc(&obd_dirty_pages);
        cli->cl_dirty += CFS_PAGE_SIZE;
        cli->cl_avail_grant -= CFS_PAGE_SIZE;
        pga->flag |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
               CFS_PAGE_SIZE, pga, pga->pg);
        LASSERT(cli->cl_avail_grant >= 0);
        osc_update_next_shrink(cli);
}

/* the companion to osc_consume_write_grant, called when a brw has completed.
 * must be called with the loi lock held. */
static void osc_release_write_grant(struct client_obd *cli,
                                    struct brw_page *pga, int sent)
{
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
        ENTRY;

        LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
                EXIT;
                return;
        }

        pga->flag &= ~OBD_BRW_FROM_GRANT;
        cfs_atomic_dec(&obd_dirty_pages);
        cli->cl_dirty -= CFS_PAGE_SIZE;
        if (pga->flag & OBD_BRW_NOCACHE) {
                pga->flag &= ~OBD_BRW_NOCACHE;
                cfs_atomic_dec(&obd_dirty_transit_pages);
                cli->cl_dirty_transit -= CFS_PAGE_SIZE;
        }
        if (!sent) {
                cli->cl_lost_grant += CFS_PAGE_SIZE;
                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
        } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole block on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
                int offset = pga->off & ~CFS_PAGE_MASK;
                int count = pga->count + (offset & (blocksize - 1));
                int end = (offset + pga->count) & (blocksize - 1);
                if (end)
                        count += blocksize - end;

                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
                       cli->cl_avail_grant, cli->cl_dirty);
        }

        EXIT;
}

static unsigned long rpcs_in_flight(struct client_obd *cli)
{
        return cli->cl_r_in_flight + cli->cl_w_in_flight;
}

/* caller must hold loi_list_lock */
void osc_wake_cache_waiters(struct client_obd *cli)
{
        cfs_list_t *l, *tmp;
        struct osc_cache_waiter *ocw;

        ENTRY;
        cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                /* if we can't dirty more, we must wait until some is written */
                if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
                   (cfs_atomic_read(&obd_dirty_pages) + 1 >
                    obd_max_dirty_pages)) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);
                        return;
                }

                /* if still dirty cache but no grant wait for pending RPCs that
                 * may yet return us some grant before doing sync writes */
                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
                               cli->cl_w_in_flight);
                        return;
                }

                ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry);
                cfs_list_del_init(&ocw->ocw_entry);
                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        /* no more RPCs in flight to return grant, do sync IO */
                        ocw->ocw_rc = -EDQUOT;
                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
                } else {
                        osc_consume_write_grant(cli,
                                                &ocw->ocw_oap->oap_brw_page);
                }

                cfs_waitq_signal(&ocw->ocw_waitq);
        }

        EXIT;
}

static void __osc_update_grant(struct client_obd *cli, obd_size grant)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}

static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
                              void *key, obd_count vallen, void *val,
                              struct ptlrpc_request_set *set);

static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBDO_FREE(oa);
        return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        long target = (cli->cl_max_rpcs_in_flight + 1) *
                      cli->cl_max_pages_per_rpc;

        client_obd_list_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target)
                target = cli->cl_max_pages_per_rpc;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target);
}

int osc_shrink_grant_to_target(struct client_obd *cli, long target)
{
        int    rc = 0;
        struct ost_body     *body;
        ENTRY;

        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit.
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target < cli->cl_max_pages_per_rpc)
                target = cli->cl_max_pages_per_rpc;

        if (target >= cli->cl_avail_grant) {
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        client_obd_list_lock(&cli->cl_loi_list_lock);
        body->oa.o_grant = cli->cl_avail_grant - target;
        cli->cl_avail_grant = target;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
        OBD_FREE_PTR(body);
        RETURN(rc);
}

#define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
static int osc_should_shrink_grant(struct client_obd *client)
{
        cfs_time_t time = cfs_time_current();
        cfs_time_t next_shrink = client->cl_next_shrink_grant;

        if ((client->cl_import->imp_connect_data.ocd_connect_flags &
             OBD_CONNECT_GRANT_SHRINK) == 0)
                return 0;

        if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > GRANT_SHRINK_LIMIT)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}

static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
{
        struct client_obd *client;

        cfs_list_for_each_entry(client, &item->ti_obd_list,
                                cl_grant_shrink_list) {
                if (osc_should_shrink_grant(client))
                        osc_shrink_grant(client);
        }
        return 0;
}

static int osc_add_shrink_grant(struct client_obd *client)
{
        int rc;

        rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
                                       TIMEOUT_GRANT,
                                       osc_grant_shrink_grant_cb, NULL,
                                       &client->cl_grant_shrink_list);
        if (rc) {
                CERROR("add grant client %s error %d\n",
                       client->cl_import->imp_obd->obd_name, rc);
                return rc;
        }
        CDEBUG(D_CACHE, "add grant client %s\n",
               client->cl_import->imp_obd->obd_name);
        osc_update_next_shrink(client);
        return 0;
}

static int osc_del_shrink_grant(struct client_obd *client)
{
        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
                                         TIMEOUT_GRANT);
}

static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we expect to hold: if we have
         * been evicted, it's the new avail_grant amount, and cl_dirty will
         * drop to 0 as in-flight RPCs fail out; otherwise, it's
         * avail_grant + dirty.
         *
         * The race is tolerable here: if we're evicted, but imp_state has
         * already left EVICTED state, then cl_dirty must be 0 already.
         */
        client_obd_list_lock(&cli->cl_loi_list_lock);
        if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
                cli->cl_avail_grant = ocd->ocd_grant;
        else
                cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;

        if (cli->cl_avail_grant < 0) {
                CWARN("%s: available grant < 0, the OSS is probably not running"
                      " with patch from bug20278 (%ld)\n",
                      cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant);
                /* workaround for 1.6 servers which do not have
                 * the patch from bug20278 */
                cli->cl_avail_grant = ocd->ocd_grant;
        }

        client_obd_list_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
               cli->cl_import->imp_obd->obd_name,
               cli->cl_avail_grant, cli->cl_lost_grant);

        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            cfs_list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
}

/* We assume that the reason this OSC got a short read is that it read
 * beyond the end of a stripe file; i.e. Lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, obd_count page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = cfs_kmap(pga[i]->pg) +
                                (pga[i]->off & ~CFS_PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        cfs_kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                cfs_kunmap(pga[i]->pg);
                i++;
        }
}

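/* Verify the per-niobuf return codes and the total byte count of a
 * BRW_WRITE reply; returns the first negative niobuf rc found, or -EPROTO
 * if the reply is malformed. */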
static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           obd_count page_count, struct brw_page **pga)
{
        int     i;
        __u32   *remote_rcs;

        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
                                                  sizeof(*remote_rcs) *
                                                  niocount);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return(-EPROTO);
        }

        /* return error if any niobuf was in error */
        for (i = 0; i < niocount; i++) {
                if ((int)remote_rcs[i] < 0)
                        return(remote_rcs[i]);

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                               i, remote_rcs[i], req);
                        return(-EPROTO);
                }
        }

        if (req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return(-EPROTO);
        }

        return 0;
}

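/* Two brw_pages can be merged into a single niobuf when their flags match
 * and the pages are contiguous in the file. */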
static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
                                  OBD_BRW_SYNC | OBD_BRW_ASYNC | OBD_BRW_NOQUOTA);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
                        CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
                              "report this at http://bugs.whamcloud.com/\n",
                              p1->flag, p2->flag);
                }
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}

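/* Compute the bulk checksum over the first @nob bytes of the page array,
 * optionally corrupting the data or the checksum under OBD_FAIL injection
 * to exercise the checksum error paths. */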
static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga, int opc,
                                   cksum_type_t cksum_type)
{
        __u32 cksum;
        int i = 0;

        LASSERT(pg_count > 0);
        cksum = init_checksum(cksum_type);
        while (nob > 0 && pg_count > 0) {
                unsigned char *ptr = cfs_kmap(pga[i]->pg);
                int off = pga[i]->off & ~CFS_PAGE_MASK;
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
                        memcpy(ptr + off, "bad1", min(4, nob));
                cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
                cfs_kunmap(pga[i]->pg);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
                               off, cksum);

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        /* For sending, we only compute a wrong checksum instead of corrupting
         * the data, so it is still correct on a resend */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}

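/* Build a BRW read or write request: allocate the RPC and its bulk
 * descriptor, merge contiguous pages into niobufs, attach grant and
 * checksum information, and return the prepared (unsent) request in
 * @reqp. */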
1271 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1272                                 struct lov_stripe_md *lsm, obd_count page_count,
1273                                 struct brw_page **pga,
1274                                 struct ptlrpc_request **reqp,
1275                                 struct obd_capa *ocapa, int reserve,
1276                                 int resend)
1277 {
1278         struct ptlrpc_request   *req;
1279         struct ptlrpc_bulk_desc *desc;
1280         struct ost_body         *body;
1281         struct obd_ioobj        *ioobj;
1282         struct niobuf_remote    *niobuf;
1283         int niocount, i, requested_nob, opc, rc;
1284         struct osc_brw_async_args *aa;
1285         struct req_capsule      *pill;
1286         struct brw_page *pg_prev;
1287
1288         ENTRY;
1289         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1290                 RETURN(-ENOMEM); /* Recoverable */
1291         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1292                 RETURN(-EINVAL); /* Fatal */
1293
1294         if ((cmd & OBD_BRW_WRITE) != 0) {
1295                 opc = OST_WRITE;
1296                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1297                                                 cli->cl_import->imp_rq_pool,
1298                                                 &RQF_OST_BRW_WRITE);
1299         } else {
1300                 opc = OST_READ;
1301                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1302         }
1303         if (req == NULL)
1304                 RETURN(-ENOMEM);
1305
1306         for (niocount = i = 1; i < page_count; i++) {
1307                 if (!can_merge_pages(pga[i - 1], pga[i]))
1308                         niocount++;
1309         }
1310
1311         pill = &req->rq_pill;
1312         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1313                              sizeof(*ioobj));
1314         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1315                              niocount * sizeof(*niobuf));
1316         osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1317
1318         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1319         if (rc) {
1320                 ptlrpc_request_free(req);
1321                 RETURN(rc);
1322         }
1323         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1324         ptlrpc_at_set_req_timeout(req);
1325
1326         if (opc == OST_WRITE)
1327                 desc = ptlrpc_prep_bulk_imp(req, page_count,
1328                                             BULK_GET_SOURCE, OST_BULK_PORTAL);
1329         else
1330                 desc = ptlrpc_prep_bulk_imp(req, page_count,
1331                                             BULK_PUT_SINK, OST_BULK_PORTAL);
1332
1333         if (desc == NULL)
1334                 GOTO(out, rc = -ENOMEM);
1335         /* NB request now owns desc and will free it when it gets freed */
1336
1337         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1338         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1339         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1340         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1341
1342         lustre_set_wire_obdo(&body->oa, oa);
1343
1344         obdo_to_ioobj(oa, ioobj);
1345         ioobj->ioo_bufcnt = niocount;
1346         osc_pack_capa(req, body, ocapa);
1347         LASSERT (page_count > 0);
1348         pg_prev = pga[0];
1349         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1350                 struct brw_page *pg = pga[i];
1351                 int poff = pg->off & ~CFS_PAGE_MASK;
1352
1353                 LASSERT(pg->count > 0);
1354                 /* make sure there is no gap in the middle of page array */
1355                 LASSERTF(page_count == 1 ||
1356                          (ergo(i == 0, poff + pg->count == CFS_PAGE_SIZE) &&
1357                           ergo(i > 0 && i < page_count - 1,
1358                                poff == 0 && pg->count == CFS_PAGE_SIZE)   &&
1359                           ergo(i == page_count - 1, poff == 0)),
1360                          "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1361                          i, page_count, pg, pg->off, pg->count);
1362 #ifdef __linux__
1363                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1364                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1365                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1366                          i, page_count,
1367                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1368                          pg_prev->pg, page_private(pg_prev->pg),
1369                          pg_prev->pg->index, pg_prev->off);
1370 #else
1371                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1372                          "i %d p_c %u\n", i, page_count);
1373 #endif
1374                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1375                         (pg->flag & OBD_BRW_SRVLOCK));
1376
1377                 ptlrpc_prep_bulk_page(desc, pg->pg, poff, pg->count);
1378                 requested_nob += pg->count;
1379
1380                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1381                         niobuf--;
1382                         niobuf->len += pg->count;
1383                 } else {
1384                         niobuf->offset = pg->off;
1385                         niobuf->len    = pg->count;
1386                         niobuf->flags  = pg->flag;
1387                 }
1388                 pg_prev = pg;
1389         }
1390
1391         LASSERTF((void *)(niobuf - niocount) ==
1392                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1393                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1394                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1395
1396         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1397         if (resend) {
1398                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1399                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1400                         body->oa.o_flags = 0;
1401                 }
1402                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1403         }
1404
1405         if (osc_should_shrink_grant(cli))
1406                 osc_shrink_grant_local(cli, &body->oa);
1407
1408         /* size[REQ_REC_OFF] still sizeof (*body) */
1409         if (opc == OST_WRITE) {
1410                 if (unlikely(cli->cl_checksum) &&
1411                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1412                         /* store cl_cksum_type in a local variable since
1413                          * it can be changed via lprocfs */
1414                         cksum_type_t cksum_type = cli->cl_cksum_type;
1415
1416                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1417                                 oa->o_flags &= OBD_FL_LOCAL_MASK;
1418                                 body->oa.o_flags = 0;
1419                         }
1420                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1421                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1422                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1423                                                              page_count, pga,
1424                                                              OST_WRITE,
1425                                                              cksum_type);
1426                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1427                                body->oa.o_cksum);
1428                         /* save this in 'oa', too, for later checking */
1429                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1430                         oa->o_flags |= cksum_type_pack(cksum_type);
1431                 } else {
1432                         /* clear out the checksum flag, in case this is a
1433                          * resend but cl_checksum is no longer set. b=11238 */
1434                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1435                 }
1436                 oa->o_cksum = body->oa.o_cksum;
1437                 /* 1 RC per niobuf */
1438                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1439                                      sizeof(__u32) * niocount);
1440         } else {
1441                 if (unlikely(cli->cl_checksum) &&
1442                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1443                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1444                                 body->oa.o_flags = 0;
1445                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1446                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1447                 }
1448         }
1449         ptlrpc_request_set_replen(req);
1450
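             /* compile-time check that the brw async args fit in the space
              * embedded in the request (rq_async_args) */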
1451         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1452         aa = ptlrpc_req_async_args(req);
1453         aa->aa_oa = oa;
1454         aa->aa_requested_nob = requested_nob;
1455         aa->aa_nio_count = niocount;
1456         aa->aa_page_count = page_count;
1457         aa->aa_resends = 0;
1458         aa->aa_ppga = pga;
1459         aa->aa_cli = cli;
1460         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
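             /* when 'reserve' is set, take an extra capability reference;
              * brw_interpret() releases it with capa_put() */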
1461         if (ocapa && reserve)
1462                 aa->aa_ocapa = capa_get(ocapa);
1463
1464         *reqp = req;
1465         RETURN(0);
1466
1467  out:
1468         ptlrpc_req_finished(req);
1469         RETURN(rc);
1470 }
1471
1472 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1473                                 __u32 client_cksum, __u32 server_cksum, int nob,
1474                                 obd_count page_count, struct brw_page **pga,
1475                                 cksum_type_t client_cksum_type)
1476 {
1477         __u32 new_cksum;
1478         char *msg;
1479         cksum_type_t cksum_type;
1480
1481         if (server_cksum == client_cksum) {
1482                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1483                 return 0;
1484         }
1485
1486         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1487                                        oa->o_flags : 0);
1488         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1489                                       cksum_type);
1490
1491         if (cksum_type != client_cksum_type)
1492                 msg = "the server did not use the checksum type specified in "
1493                       "the original request - likely a protocol problem";
1494         else if (new_cksum == server_cksum)
1495                 msg = "changed on the client after we checksummed it - "
1496                       "likely false positive due to mmap IO (bug 11742)";
1497         else if (new_cksum == client_cksum)
1498                 msg = "changed in transit before arrival at OST";
1499         else
1500                 msg = "changed in transit AND doesn't match the original - "
1501                       "likely false positive due to mmap IO (bug 11742)";
1502
1503         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1504                            " object "LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
1505                            msg, libcfs_nid2str(peer->nid),
1506                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1507                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1508                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1509                            oa->o_id,
1510                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_seq : (__u64)0,
1511                            pga[0]->off,
1512                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1513         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1514                "client csum now %x\n", client_cksum, client_cksum_type,
1515                server_cksum, cksum_type, new_cksum);
1516         return 1;
1517 }
1518
1519 /* Note: rc enters this function as the number of bytes transferred */
1520 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1521 {
1522         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1523         const lnet_process_id_t *peer =
1524                         &req->rq_import->imp_connection->c_peer;
1525         struct client_obd *cli = aa->aa_cli;
1526         struct ost_body *body;
1527         __u32 client_cksum = 0;
1528         ENTRY;
1529
1530         if (rc < 0 && rc != -EDQUOT) {
1531                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1532                 RETURN(rc);
1533         }
1534
1535         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1536         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1537         if (body == NULL) {
1538                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1539                 RETURN(-EPROTO);
1540         }
1541
1542         /* set/clear the over-quota flag for a uid/gid */
1543         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1544             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1545                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1546
1547                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1548                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1549                        body->oa.o_flags);
1550                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1551         }
1552
1553         osc_update_grant(cli, body);
1554
1555         if (rc < 0)
1556                 RETURN(rc);
1557
1558         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1559                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1560
1561         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1562                 if (rc > 0) {
1563                         CERROR("Unexpected +ve rc %d\n", rc);
1564                         RETURN(-EPROTO);
1565                 }
1566                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1567
1568                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1569                         RETURN(-EAGAIN);
1570
1571                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1572                     check_write_checksum(&body->oa, peer, client_cksum,
1573                                          body->oa.o_cksum, aa->aa_requested_nob,
1574                                          aa->aa_page_count, aa->aa_ppga,
1575                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1576                         RETURN(-EAGAIN);
1577
1578                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1579                                      aa->aa_page_count, aa->aa_ppga);
1580                 GOTO(out, rc);
1581         }
1582
1583         /* The rest of this function executes only for OST_READs */
1584
1585         /* if unwrap_bulk failed, return -EAGAIN to retry */
1586         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1587         if (rc < 0)
1588                 GOTO(out, rc = -EAGAIN);
1589
1590         if (rc > aa->aa_requested_nob) {
1591                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1592                        aa->aa_requested_nob);
1593                 RETURN(-EPROTO);
1594         }
1595
1596         if (rc != req->rq_bulk->bd_nob_transferred) {
1597                 CERROR ("Unexpected rc %d (%d transferred)\n",
1598                         rc, req->rq_bulk->bd_nob_transferred);
1599                 RETURN(-EPROTO);
1600         }
1601
1602         if (rc < aa->aa_requested_nob)
1603                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1604
1605         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1606                 static int cksum_counter;
1607                 __u32      server_cksum = body->oa.o_cksum;
1608                 char      *via;
1609                 char      *router;
1610                 cksum_type_t cksum_type;
1611
1612                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1613                                                body->oa.o_flags : 0);
1614                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1615                                                  aa->aa_ppga, OST_READ,
1616                                                  cksum_type);
1617
1618                 if (peer->nid == req->rq_bulk->bd_sender) {
1619                         via = router = "";
1620                 } else {
1621                         via = " via ";
1622                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1623                 }
1624
1625                 if (server_cksum == ~0 && rc > 0) {
1626                         CERROR("Protocol error: server %s set the 'checksum' "
1627                                "bit, but didn't send a checksum.  Not fatal, "
1628                                "but please notify on http://bugs.whamcloud.com/\n",
1629                                libcfs_nid2str(peer->nid));
1630                 } else if (server_cksum != client_cksum) {
1631                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1632                                            "%s%s%s inode "DFID" object "
1633                                            LPU64"/"LPU64" extent "
1634                                            "["LPU64"-"LPU64"]\n",
1635                                            req->rq_import->imp_obd->obd_name,
1636                                            libcfs_nid2str(peer->nid),
1637                                            via, router,
1638                                            body->oa.o_valid & OBD_MD_FLFID ?
1639                                                 body->oa.o_parent_seq : (__u64)0,
1640                                            body->oa.o_valid & OBD_MD_FLFID ?
1641                                                 body->oa.o_parent_oid : 0,
1642                                            body->oa.o_valid & OBD_MD_FLFID ?
1643                                                 body->oa.o_parent_ver : 0,
1644                                            body->oa.o_id,
1645                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1646                                                 body->oa.o_seq : (__u64)0,
1647                                            aa->aa_ppga[0]->off,
1648                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1649                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1650                                                                         1);
1651                         CERROR("client %x, server %x, cksum_type %x\n",
1652                                client_cksum, server_cksum, cksum_type);
1653                         cksum_counter = 0;
1654                         aa->aa_oa->o_cksum = client_cksum;
1655                         rc = -EAGAIN;
1656                 } else {
1657                         cksum_counter++;
1658                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1659                         rc = 0;
1660                 }
1661         } else if (unlikely(client_cksum)) {
1662                 static int cksum_missed;
1663
1664                 cksum_missed++;
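                     /* (x & -x) == x only for powers of two, so this message
                      * is logged with exponential back-off */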
1665                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1666                         CERROR("Checksum %u requested from %s but not sent\n",
1667                                cksum_missed, libcfs_nid2str(peer->nid));
1668         } else {
1669                 rc = 0;
1670         }
1671 out:
1672         if (rc >= 0)
1673                 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
1674
1675         RETURN(rc);
1676 }
1677
1678 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1679                             struct lov_stripe_md *lsm,
1680                             obd_count page_count, struct brw_page **pga,
1681                             struct obd_capa *ocapa)
1682 {
1683         struct ptlrpc_request *req;
1684         int                    rc;
1685         cfs_waitq_t            waitq;
1686         int                    resends = 0;
1687         struct l_wait_info     lwi;
1688
1689         ENTRY;
1690
1691         cfs_waitq_init(&waitq);
1692
1693 restart_bulk:
1694         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1695                                   page_count, pga, &req, ocapa, 0, resends);
1696         if (rc != 0)
1697                 RETURN(rc);
1698
1699         rc = ptlrpc_queue_wait(req);
1700
1701         if (rc == -ETIMEDOUT && req->rq_resend) {
1702                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1703                 ptlrpc_req_finished(req);
1704                 goto restart_bulk;
1705         }
1706
1707         rc = osc_brw_fini_request(req, rc);
1708
1709         ptlrpc_req_finished(req);
1710         if (osc_recoverable_error(rc)) {
1711                 resends++;
1712                 if (!client_should_resend(resends, &exp->exp_obd->u.cli)) {
1713                         CERROR("too many resend retries, returning error\n");
1714                         RETURN(-EIO);
1715                 }
1716
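                     /* the wait condition below is always false, so
                      * l_wait_event() just sleeps for 'resends' seconds:
                      * a linear back-off between retries */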
1717                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1718                 l_wait_event(waitq, 0, &lwi);
1719
1720                 goto restart_bulk;
1721         }
1722
1723         RETURN (rc);
1724 }
1725
1726 int osc_brw_redo_request(struct ptlrpc_request *request,
1727                          struct osc_brw_async_args *aa)
1728 {
1729         struct ptlrpc_request *new_req;
1730         struct ptlrpc_request_set *set = request->rq_set;
1731         struct osc_brw_async_args *new_aa;
1732         struct osc_async_page *oap;
1733         int rc = 0;
1734         ENTRY;
1735
1736         if (!client_should_resend(aa->aa_resends, aa->aa_cli)) {
1737                 CERROR("too many resend retries, returning error\n");
1738                 RETURN(-EIO);
1739         }
1740
1741         DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1742
1743         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1744                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1745                                   aa->aa_cli, aa->aa_oa,
1746                                   NULL /* lsm unused by osc currently */,
1747                                   aa->aa_page_count, aa->aa_ppga,
1748                                   &new_req, aa->aa_ocapa, 0, 1);
1749         if (rc)
1750                 RETURN(rc);
1751
1752         client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1753
1754         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1755                 if (oap->oap_request != NULL) {
1756                         LASSERTF(request == oap->oap_request,
1757                                  "request %p != oap_request %p\n",
1758                                  request, oap->oap_request);
1759                         if (oap->oap_interrupted) {
1760                                 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1761                                 ptlrpc_req_finished(new_req);
1762                                 RETURN(-EINTR);
1763                         }
1764                 }
1765         }
1766         /* New request takes over pga and oaps from old request.
1767          * Note that copying a list_head doesn't work, need to move it... */
1768         aa->aa_resends++;
1769         new_req->rq_interpret_reply = request->rq_interpret_reply;
1770         new_req->rq_async_args = request->rq_async_args;
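             /* pushing rq_sent into the future defers the send, backing off
              * by one second per resend made so far */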
1771         new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1772
1773         new_aa = ptlrpc_req_async_args(new_req);
1774
1775         CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1776         cfs_list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1777         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1778
1779         cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1780                 if (oap->oap_request) {
1781                         ptlrpc_req_finished(oap->oap_request);
1782                         oap->oap_request = ptlrpc_request_addref(new_req);
1783                 }
1784         }
1785
1786         new_aa->aa_ocapa = aa->aa_ocapa;
1787         aa->aa_ocapa = NULL;
1788
1789         /* using ptlrpc_set_add_req() is safe here because interpret
1790          * functions run in check_set context.  The only path by which
1791          * another thread can reach this request and return -EINTR is
1792          * protected by cl_loi_list_lock */
1793         ptlrpc_set_add_req(set, new_req);
1794
1795         client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1796
1797         DEBUG_REQ(D_INFO, new_req, "new request");
1798         RETURN(0);
1799 }
1800
1801 /*
1802  * ugh, we want disk allocation on the target to happen in offset order.  We'll
1803  * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1804  * fine for our small page arrays and doesn't require allocation.  It's an
1805  * insertion sort that swaps elements that are strides apart, shrinking the
1806  * stride down until it's 1 and the array is sorted.
1807  */
1808 static void sort_brw_pages(struct brw_page **array, int num)
1809 {
1810         int stride, i, j;
1811         struct brw_page *tmp;
1812
1813         if (num == 1)
1814                 return;
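         /* build the 3h+1 stride sequence (1, 4, 13, 40, ...) up past num;
          * the do/while below shrinks it by thirds as it sorts */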
1815         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1816                 ;
1817
1818         do {
1819                 stride /= 3;
1820                 for (i = stride ; i < num ; i++) {
1821                         tmp = array[i];
1822                         j = i;
1823                         while (j >= stride && array[j - stride]->off > tmp->off) {
1824                                 array[j] = array[j - stride];
1825                                 j -= stride;
1826                         }
1827                         array[j] = tmp;
1828                 }
1829         } while (stride > 1);
1830 }
1831
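     /* Return how many leading pages of 'pg' form a single contiguous,
      * page-aligned region that the network can transfer in one RDMA. */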
1832 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1833 {
1834         int count = 1;
1835         int offset;
1836         int i = 0;
1837
1838         LASSERT (pages > 0);
1839         offset = pg[i]->off & ~CFS_PAGE_MASK;
1840
1841         for (;;) {
1842                 pages--;
1843                 if (pages == 0)         /* that's all */
1844                         return count;
1845
1846                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1847                         return count;   /* doesn't end on page boundary */
1848
1849                 i++;
1850                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1851                 if (offset != 0)        /* doesn't start on page boundary */
1852                         return count;
1853
1854                 count++;
1855         }
1856 }
1857
1858 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1859 {
1860         struct brw_page **ppga;
1861         int i;
1862
1863         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1864         if (ppga == NULL)
1865                 return NULL;
1866
1867         for (i = 0; i < count; i++)
1868                 ppga[i] = pga + i;
1869         return ppga;
1870 }
1871
1872 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1873 {
1874         LASSERT(ppga != NULL);
1875         OBD_FREE(ppga, sizeof(*ppga) * count);
1876 }
1877
1878 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1879                    obd_count page_count, struct brw_page *pga,
1880                    struct obd_trans_info *oti)
1881 {
1882         struct obdo *saved_oa = NULL;
1883         struct brw_page **ppga, **orig;
1884         struct obd_import *imp = class_exp2cliimp(exp);
1885         struct client_obd *cli;
1886         int rc, page_count_orig;
1887         ENTRY;
1888
1889         LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1890         cli = &imp->imp_obd->u.cli;
1891
1892         if (cmd & OBD_BRW_CHECK) {
1893                 /* The caller just wants to know if there's a chance that this
1894                  * I/O can succeed */
1895
1896                 if (imp->imp_invalid)
1897                         RETURN(-EIO);
1898                 RETURN(0);
1899         }
1900
1901         /* test_brw with a failed create can trip this, maybe others. */
1902         LASSERT(cli->cl_max_pages_per_rpc);
1903
1904         rc = 0;
1905
1906         orig = ppga = osc_build_ppga(pga, page_count);
1907         if (ppga == NULL)
1908                 RETURN(-ENOMEM);
1909         page_count_orig = page_count;
1910
1911         sort_brw_pages(ppga, page_count);
1912         while (page_count) {
1913                 obd_count pages_per_brw;
1914
1915                 if (page_count > cli->cl_max_pages_per_rpc)
1916                         pages_per_brw = cli->cl_max_pages_per_rpc;
1917                 else
1918                         pages_per_brw = page_count;
1919
1920                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1921
1922                 if (saved_oa != NULL) {
1923                         /* restore previously saved oa */
1924                         *oinfo->oi_oa = *saved_oa;
1925                 } else if (page_count > pages_per_brw) {
1926                         /* save a copy of oa (brw will clobber it) */
1927                         OBDO_ALLOC(saved_oa);
1928                         if (saved_oa == NULL)
1929                                 GOTO(out, rc = -ENOMEM);
1930                         *saved_oa = *oinfo->oi_oa;
1931                 }
1932
1933                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1934                                       pages_per_brw, ppga, oinfo->oi_capa);
1935
1936                 if (rc != 0)
1937                         break;
1938
1939                 page_count -= pages_per_brw;
1940                 ppga += pages_per_brw;
1941         }
1942
1943 out:
1944         osc_release_ppga(orig, page_count_orig);
1945
1946         if (saved_oa != NULL)
1947                 OBDO_FREE(saved_oa);
1948
1949         RETURN(rc);
1950 }
1951
1952 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1953  * the dirty accounting: either writeback completed or a truncate happened
1954  * before writing started.  Must be called with the loi lock held. */
1955 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1956                            int sent)
1957 {
1958         osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1959 }
1960
1961
1962 /* Decides whether the pending pages to read/write for a given object (lop)
1963  * warrant an RPC yet.  This is used by osc_check_rpcs->osc_next_loi() and
1964  * loi_list_maint() to quickly find objects that are ready to send an RPC. */
1965 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1966                          int cmd)
1967 {
1968         int optimal;
1969         ENTRY;
1970
1971         if (lop->lop_num_pending == 0)
1972                 RETURN(0);
1973
1974         /* if we have an invalid import we want to drain the queued pages
1975          * by forcing them through rpcs that immediately fail and complete
1976          * the pages.  recovery relies on this to empty the queued pages
1977          * before canceling the locks and evicting down the llite pages */
1978         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1979                 RETURN(1);
1980
1981         /* stream rpcs in queue order as long as there is an urgent page
1982          * queued.  this is our cheap solution for good batching in the case
1983          * where writepage marks some random page in the middle of the file
1984          * as urgent because of, say, memory pressure */
1985         if (!cfs_list_empty(&lop->lop_urgent)) {
1986                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1987                 RETURN(1);
1988         }
1989         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1990         optimal = cli->cl_max_pages_per_rpc;
1991         if (cmd & OBD_BRW_WRITE) {
1992                 /* trigger a write rpc stream as long as there are dirtiers
1993                  * waiting for space.  as they're waiting, they're not going to
1994                  * create more pages to coalesce with what's waiting.. */
1995                 if (!cfs_list_empty(&cli->cl_cache_waiters)) {
1996                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1997                         RETURN(1);
1998                 }
1999                 /* +16 to avoid triggering rpcs that would want to include pages
2000                  * that are being queued but which can't be made ready until
2001                  * the queuer finishes with the page. this is a wart for
2002                  * llite::commit_write() */
2003                 optimal += 16;
2004         }
2005         if (lop->lop_num_pending >= optimal)
2006                 RETURN(1);
2007
2008         RETURN(0);
2009 }
2010
2011 static int lop_makes_hprpc(struct loi_oap_pages *lop)
2012 {
2013         struct osc_async_page *oap;
2014         ENTRY;
2015
2016         if (cfs_list_empty(&lop->lop_urgent))
2017                 RETURN(0);
2018
2019         oap = cfs_list_entry(lop->lop_urgent.next,
2020                          struct osc_async_page, oap_urgent_item);
2021
2022         if (oap->oap_async_flags & ASYNC_HP) {
2023                 CDEBUG(D_CACHE, "hp request forcing RPC\n");
2024                 RETURN(1);
2025         }
2026
2027         RETURN(0);
2028 }
2029
2030 static void on_list(cfs_list_t *item, cfs_list_t *list,
2031                     int should_be_on)
2032 {
2033         if (cfs_list_empty(item) && should_be_on)
2034                 cfs_list_add_tail(item, list);
2035         else if (!cfs_list_empty(item) && !should_be_on)
2036                 cfs_list_del_init(item);
2037 }
2038
2039 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
2040  * can find pages to build into rpcs quickly */
2041 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
2042 {
2043         if (lop_makes_hprpc(&loi->loi_write_lop) ||
2044             lop_makes_hprpc(&loi->loi_read_lop)) {
2045                 /* HP rpc */
2046                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
2047                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
2048         } else {
2049                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
2050                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
2051                         lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
2052                         lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
2053         }
2054
2055         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
2056                 loi->loi_write_lop.lop_num_pending);
2057
2058         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
2059                 loi->loi_read_lop.lop_num_pending);
2060 }
2061
2062 static void lop_update_pending(struct client_obd *cli,
2063                                struct loi_oap_pages *lop, int cmd, int delta)
2064 {
2065         lop->lop_num_pending += delta;
2066         if (cmd & OBD_BRW_WRITE)
2067                 cli->cl_pending_w_pages += delta;
2068         else
2069                 cli->cl_pending_r_pages += delta;
2070 }
2071
2072 /**
2073  * this is called when a sync waiter receives an interruption.  Its job is to
2074  * get the caller woken as soon as possible.  If its page hasn't been put in an
2075  * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
2076  * desiring interruption which will forcefully complete the rpc once the rpc
2077  * has timed out.
2078  */
2079 int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
2080 {
2081         struct loi_oap_pages *lop;
2082         struct lov_oinfo *loi;
2083         int rc = -EBUSY;
2084         ENTRY;
2085
2086         LASSERT(!oap->oap_interrupted);
2087         oap->oap_interrupted = 1;
2088
2089         /* ok, it's been put in an rpc. only one oap gets a request reference */
2090         if (oap->oap_request != NULL) {
2091                 ptlrpc_mark_interrupted(oap->oap_request);
2092                 ptlrpcd_wake(oap->oap_request);
2093                 ptlrpc_req_finished(oap->oap_request);
2094                 oap->oap_request = NULL;
2095         }
2096
2097         /*
2098          * page completion may be called only if ->cpo_prep() method was
2099          * executed by osc_io_submit(), which also adds the page to the pending list
2100          */
2101         if (!cfs_list_empty(&oap->oap_pending_item)) {
2102                 cfs_list_del_init(&oap->oap_pending_item);
2103                 cfs_list_del_init(&oap->oap_urgent_item);
2104
2105                 loi = oap->oap_loi;
2106                 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
2107                         &loi->loi_write_lop : &loi->loi_read_lop;
2108                 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
2109                 loi_list_maint(oap->oap_cli, oap->oap_loi);
2110                 rc = oap->oap_caller_ops->ap_completion(env,
2111                                           oap->oap_caller_data,
2112                                           oap->oap_cmd, NULL, -EINTR);
2113         }
2114
2115         RETURN(rc);
2116 }
2117
2118 /* this is trying to propagate async writeback errors back up to the
2119  * application.  As an async write fails we record the error code for later if
2120  * the app does an fsync.  As long as errors persist we force future rpcs to be
2121  * sync so that the app can get a sync error and break the cycle of queueing
2122  * pages for which writeback will fail. */
2123 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
2124                            int rc)
2125 {
2126         if (rc) {
2127                 if (!ar->ar_rc)
2128                         ar->ar_rc = rc;
2129
2130                 ar->ar_force_sync = 1;
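                     /* force_sync stays set until a write with xid at or
                      * beyond this sample completes successfully (see the
                      * check below) */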
2131                 ar->ar_min_xid = ptlrpc_sample_next_xid();
2132                 return;
2133
2134         }
2135
2136         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2137                 ar->ar_force_sync = 0;
2138 }
2139
2140 void osc_oap_to_pending(struct osc_async_page *oap)
2141 {
2142         struct loi_oap_pages *lop;
2143
2144         if (oap->oap_cmd & OBD_BRW_WRITE)
2145                 lop = &oap->oap_loi->loi_write_lop;
2146         else
2147                 lop = &oap->oap_loi->loi_read_lop;
2148
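             /* HP pages go to the head of the urgent list; merely urgent
              * pages queue at its tail */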
2149         if (oap->oap_async_flags & ASYNC_HP)
2150                 cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2151         else if (oap->oap_async_flags & ASYNC_URGENT)
2152                 cfs_list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2153         cfs_list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2154         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2155 }
2156
2157 /* this must be called holding the loi list lock to give coverage to exit_cache,
2158  * async_flag maintenance, and oap_request */
2159 static void osc_ap_completion(const struct lu_env *env,
2160                               struct client_obd *cli, struct obdo *oa,
2161                               struct osc_async_page *oap, int sent, int rc)
2162 {
2163         __u64 xid = 0;
2164
2165         ENTRY;
2166         if (oap->oap_request != NULL) {
2167                 xid = ptlrpc_req_xid(oap->oap_request);
2168                 ptlrpc_req_finished(oap->oap_request);
2169                 oap->oap_request = NULL;
2170         }
2171
2172         cfs_spin_lock(&oap->oap_lock);
2173         oap->oap_async_flags = 0;
2174         cfs_spin_unlock(&oap->oap_lock);
2175         oap->oap_interrupted = 0;
2176
2177         if (oap->oap_cmd & OBD_BRW_WRITE) {
2178                 osc_process_ar(&cli->cl_ar, xid, rc);
2179                 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
2180         }
2181
2182         if (rc == 0 && oa != NULL) {
2183                 if (oa->o_valid & OBD_MD_FLBLOCKS)
2184                         oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2185                 if (oa->o_valid & OBD_MD_FLMTIME)
2186                         oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2187                 if (oa->o_valid & OBD_MD_FLATIME)
2188                         oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2189                 if (oa->o_valid & OBD_MD_FLCTIME)
2190                         oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2191         }
2192
2193         rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
2194                                                 oap->oap_cmd, oa, rc);
2195
2196         /* cl_page_completion() drops PG_locked, so a new I/O on the page
2197          * could start; but OSC calls it under the loi list lock, so we can
2198          * safely add the oap back to the pending list */
2199         if (rc)
2200                 /* upper layer wants to leave the page on pending queue */
2201                 osc_oap_to_pending(oap);
2202         else
2203                 osc_exit_cache(cli, oap, sent);
2204         EXIT;
2205 }
2206
2207 static int brw_interpret(const struct lu_env *env,
2208                          struct ptlrpc_request *req, void *data, int rc)
2209 {
2210         struct osc_brw_async_args *aa = data;
2211         struct client_obd *cli;
2212         int async;
2213         ENTRY;
2214
2215         rc = osc_brw_fini_request(req, rc);
2216         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2217         if (osc_recoverable_error(rc)) {
2218                 rc = osc_brw_redo_request(req, aa);
2219                 if (rc == 0)
2220                         RETURN(0);
2221         }
2222
2223         if (aa->aa_ocapa) {
2224                 capa_put(aa->aa_ocapa);
2225                 aa->aa_ocapa = NULL;
2226         }
2227
2228         cli = aa->aa_cli;
2229
2230         client_obd_list_lock(&cli->cl_loi_list_lock);
2231
2232         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2233          * is called so we know whether to go to sync BRWs or wait for more
2234          * RPCs to complete */
2235         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2236                 cli->cl_w_in_flight--;
2237         else
2238                 cli->cl_r_in_flight--;
2239
2240         async = cfs_list_empty(&aa->aa_oaps);
2241         if (!async) { /* from osc_send_oap_rpc() */
2242                 struct osc_async_page *oap, *tmp;
2243                 /* the caller may re-use the oap after the completion call so
2244                  * we need to clean it up a little */
2245                 cfs_list_for_each_entry_safe(oap, tmp, &aa->aa_oaps,
2246                                              oap_rpc_item) {
2247                         cfs_list_del_init(&oap->oap_rpc_item);
2248                         osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
2249                 }
2250                 OBDO_FREE(aa->aa_oa);
2251         } else { /* from async_internal() */
2252                 obd_count i;
2253                 for (i = 0; i < aa->aa_page_count; i++)
2254                         osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
2255         }
2256         osc_wake_cache_waiters(cli);
2257         osc_check_rpcs(env, cli);
2258         client_obd_list_unlock(&cli->cl_loi_list_lock);
2259         if (!async)
2260                 cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
2261                                   req->rq_bulk->bd_nob_transferred);
2262         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2263         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
2264
2265         RETURN(rc);
2266 }
2267
2268 static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
2269                                             struct client_obd *cli,
2270                                             cfs_list_t *rpc_list,
2271                                             int page_count, int cmd)
2272 {
2273         struct ptlrpc_request *req;
2274         struct brw_page **pga = NULL;
2275         struct osc_brw_async_args *aa;
2276         struct obdo *oa = NULL;
2277         const struct obd_async_page_ops *ops = NULL;
2278         struct osc_async_page *oap;
2279         struct osc_async_page *tmp;
2280         struct cl_req *clerq = NULL;
2281         enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2282         struct ldlm_lock *lock = NULL;
2283         struct cl_req_attr crattr;
2284         int i, rc, mpflag = 0;
2285
2286         ENTRY;
2287         LASSERT(!cfs_list_empty(rpc_list));
2288
2289         if (cmd & OBD_BRW_MEMALLOC)
2290                 mpflag = cfs_memory_pressure_get_and_set();
2291
2292         memset(&crattr, 0, sizeof crattr);
2293         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2294         if (pga == NULL)
2295                 GOTO(out, req = ERR_PTR(-ENOMEM));
2296
2297         OBDO_ALLOC(oa);
2298         if (oa == NULL)
2299                 GOTO(out, req = ERR_PTR(-ENOMEM));
2300
2301         i = 0;
2302         cfs_list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2303                 struct cl_page *page = osc_oap2cl_page(oap);
2304                 if (ops == NULL) {
2305                         ops = oap->oap_caller_ops;
2306
2307                         clerq = cl_req_alloc(env, page, crt,
2308                                              1 /* only 1-object rpcs for
2309                                                 * now */);
2310                         if (IS_ERR(clerq))
2311                                 GOTO(out, req = (void *)clerq);
2312                         lock = oap->oap_ldlm_lock;
2313                 }
2314                 pga[i] = &oap->oap_brw_page;
2315                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2316                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2317                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2318                 i++;
2319                 cl_req_page_add(env, clerq, page);
2320         }
2321
2322         /* always get the data for the obdo for the rpc */
2323         LASSERT(ops != NULL);
2324         crattr.cra_oa = oa;
2325         crattr.cra_capa = NULL;
2326         cl_req_attr_set(env, clerq, &crattr, ~0ULL);
2327         if (lock) {
2328                 oa->o_handle = lock->l_remote_handle;
2329                 oa->o_valid |= OBD_MD_FLHANDLE;
2330         }
2331
2332         rc = cl_req_prep(env, clerq);
2333         if (rc != 0) {
2334                 CERROR("cl_req_prep failed: %d\n", rc);
2335                 GOTO(out, req = ERR_PTR(rc));
2336         }
2337
2338         sort_brw_pages(pga, page_count);
2339         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2340                                   pga, &req, crattr.cra_capa, 1, 0);
2341         if (rc != 0) {
2342                 CERROR("prep_req failed: %d\n", rc);
2343                 GOTO(out, req = ERR_PTR(rc));
2344         }
2345
2346         if (cmd & OBD_BRW_MEMALLOC)
2347                 req->rq_memalloc = 1;
2348
2349         /* Need to update the timestamps after the request is built in case
2350          * we race with setattr (locally or in queue at OST).  If OST gets
2351          * later setattr before earlier BRW (as determined by the request xid),
2352          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2353          * way to do this in a single call.  bug 10150 */
2354         cl_req_attr_set(env, clerq, &crattr,
2355                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2356
2357         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2358         aa = ptlrpc_req_async_args(req);
2359         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2360         cfs_list_splice(rpc_list, &aa->aa_oaps);
2361         CFS_INIT_LIST_HEAD(rpc_list);
2362         aa->aa_clerq = clerq;
2363 out:
2364         if (cmd & OBD_BRW_MEMALLOC)
2365                 cfs_memory_pressure_restore(mpflag);
2366
2367         capa_put(crattr.cra_capa);
2368         if (IS_ERR(req)) {
2369                 if (oa)
2370                         OBDO_FREE(oa);
2371                 if (pga)
2372                         OBD_FREE(pga, sizeof(*pga) * page_count);
2373                 /* this should happen rarely and is pretty bad: it makes the
2374                  * pending list not follow the dirty order */
2375                 client_obd_list_lock(&cli->cl_loi_list_lock);
2376                 cfs_list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
2377                         cfs_list_del_init(&oap->oap_rpc_item);
2378
2379                         /* queued sync pages can be torn down while the pages
2380                          * are between the pending list and the rpc */
2381                         if (oap->oap_interrupted) {
2382                                 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2383                                 osc_ap_completion(env, cli, NULL, oap, 0,
2384                                                   oap->oap_count);
2385                                 continue;
2386                         }
2387                         osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
2388                 }
2389                 if (clerq && !IS_ERR(clerq))
2390                         cl_req_completion(env, clerq, PTR_ERR(req));
2391         }
2392         RETURN(req);
2393 }
2394
2395 /**
2396  * prepare pages for ASYNC io and put them in the send queue.
2397  *
2398  * \param cmd OBD_BRW_* macros
2399  * \param lop pending pages
2400  *
2401  * \return zero if no pages were added to the send queue.
2402  * \return 1 if pages were successfully added to the send queue.
2403  * \return negative on errors.
2404  */
2405 static int
2406 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
2407                  struct lov_oinfo *loi,
2408                  int cmd, struct loi_oap_pages *lop)
2409 {
2410         struct ptlrpc_request *req;
2411         obd_count page_count = 0;
2412         struct osc_async_page *oap = NULL, *tmp;
2413         struct osc_brw_async_args *aa;
2414         const struct obd_async_page_ops *ops;
2415         CFS_LIST_HEAD(rpc_list);
2416         int srvlock = 0, mem_tight = 0;
2417         struct cl_object *clob = NULL;
2418         obd_off starting_offset = OBD_OBJECT_EOF;
2419         unsigned int ending_offset;
2420         int starting_page_off = 0;
2421         ENTRY;
2422
2423         /* ASYNC_HP pages first. At present, when the lock covering the pages
2424          * is to be canceled, the pages under the lock will be sent out with
2425          * ASYNC_HP. We have to send them out as soon as possible. */
2426         cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_urgent, oap_urgent_item) {
2427                 if (oap->oap_async_flags & ASYNC_HP)
2428                         cfs_list_move(&oap->oap_pending_item, &lop->lop_pending);
2429                 if (++page_count >= cli->cl_max_pages_per_rpc)
2430                         break;
2431         }
2432         page_count = 0;
2433
2434         /* first we find the pages we're allowed to work with */
2435         cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2436                                      oap_pending_item) {
2437                 ops = oap->oap_caller_ops;
2438
2439                 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
2440                          "magic 0x%x\n", oap, oap->oap_magic);
2441
2442                 if (clob == NULL) {
2443                         /* pin object in memory, so that completion call-backs
2444                          * can be safely called under client_obd_list lock. */
2445                         clob = osc_oap2cl_page(oap)->cp_obj;
2446                         cl_object_get(clob);
2447                 }
2448
2449                 if (page_count != 0 &&
2450                     srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2451                         CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2452                                " oap %p, page %p, srvlock %u\n",
2453                                oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2454                         break;
2455                 }
2456
2457                 /* If there is a gap at the start of this page, it can't merge
2458                  * with any previous page, so we'll hand the network a
2459                  * "fragmented" page array that it can't transfer in 1 RDMA */
2460                 if (oap->oap_obj_off < starting_offset) {
2461                         if (starting_page_off != 0)
2462                                 break;
2463
2464                         starting_page_off = oap->oap_page_off;
2465                         starting_offset = oap->oap_obj_off + starting_page_off;
2466                 } else if (oap->oap_page_off != 0)
2467                         break;
2468
2469                 /* in llite being 'ready' equates to the page being locked
2470                  * until completion unlocks it.  commit_write submits a page
2471                  * as not ready because its unlock will happen unconditionally
2472                  * as the call returns.  if we race with commit_write giving
2473                  * us that page we don't want to create a hole in the page
2474                  * stream, so we stop and leave the rpc to be fired by
2475                  * another dirtier or kupdated interval (the not ready page
2476                  * will still be on the dirty list).  we could call in
2477                  * at the end of ll_file_write to process the queue again. */
2478                 if (!(oap->oap_async_flags & ASYNC_READY)) {
2479                         int rc = ops->ap_make_ready(env, oap->oap_caller_data,
2480                                                     cmd);
2481                         if (rc < 0)
2482                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
2483                                                 "instead of ready\n", oap,
2484                                                 oap->oap_page, rc);
2485                         switch (rc) {
2486                         case -EAGAIN:
2487                                 /* llite is telling us that the page is still
2488                                  * in commit_write and that we should try
2489                                  * and put it in an rpc again later.  we
2490                                  * break out of the loop so we don't create
2491                                  * a hole in the sequence of pages in the rpc
2492                                  * stream.*/
2493                                 oap = NULL;
2494                                 break;
2495                         case -EINTR:
2496                                 /* the io isn't needed; tell the checks
2497                                  * below to complete the rpc with -EINTR */
2498                                 cfs_spin_lock(&oap->oap_lock);
2499                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2500                                 cfs_spin_unlock(&oap->oap_lock);
2501                                 oap->oap_count = -EINTR;
2502                                 break;
2503                         case 0:
2504                                 cfs_spin_lock(&oap->oap_lock);
2505                                 oap->oap_async_flags |= ASYNC_READY;
2506                                 cfs_spin_unlock(&oap->oap_lock);
2507                                 break;
2508                         default:
2509                                 LASSERTF(0, "oap %p page %p returned %d "
2510                                             "from make_ready\n", oap,
2511                                             oap->oap_page, rc);
2512                                 break;
2513                         }
2514                 }
2515                 if (oap == NULL)
2516                         break;
2517
2518                 /* take the page out of our book-keeping */
2519                 cfs_list_del_init(&oap->oap_pending_item);
2520                 lop_update_pending(cli, lop, cmd, -1);
2521                 cfs_list_del_init(&oap->oap_urgent_item);
2522
2523                 /* ask the caller for the size of the io as the rpc leaves. */
2524                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
2525                         oap->oap_count =
2526                                 ops->ap_refresh_count(env, oap->oap_caller_data,
2527                                                       cmd);
2528                         LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
2529                 }
2530                 if (oap->oap_count <= 0) {
2531                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2532                                oap->oap_count);
2533                         osc_ap_completion(env, cli, NULL,
2534                                           oap, 0, oap->oap_count);
2535                         continue;
2536                 }
2537
2538                 /* now put the page back in our accounting */
2539                 cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
2540                 if (page_count++ == 0)
2541                         srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2542
2543                 if (oap->oap_brw_flags & OBD_BRW_MEMALLOC)
2544                         mem_tight = 1;
2545
2546                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
2547                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2548                  * have the same alignment as the initial writes that allocated
2549                  * extents on the server. */
2550                 ending_offset = oap->oap_obj_off + oap->oap_page_off +
2551                                 oap->oap_count;
2552                 if (!(ending_offset & (PTLRPC_MAX_BRW_SIZE - 1)))
2553                         break;
2554
2555                 if (page_count >= cli->cl_max_pages_per_rpc)
2556                         break;
2557
2558                 /* If there is a gap at the end of this page, it can't merge
2559                  * with any subsequent pages, so we'll hand the network a
2560                  * "fragmented" page array that it can't transfer in 1 RDMA */
2561                 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2562                         break;
2563         }
2564
2565         osc_wake_cache_waiters(cli);
2566
2567         loi_list_maint(cli, loi);
2568
2569         client_obd_list_unlock(&cli->cl_loi_list_lock);
2570
2571         if (clob != NULL)
2572                 cl_object_put(env, clob);
2573
2574         if (page_count == 0) {
2575                 client_obd_list_lock(&cli->cl_loi_list_lock);
2576                 RETURN(0);
2577         }
2578
2579         req = osc_build_req(env, cli, &rpc_list, page_count,
2580                             mem_tight ? (cmd | OBD_BRW_MEMALLOC) : cmd);
2581         if (IS_ERR(req)) {
2582                 LASSERT(cfs_list_empty(&rpc_list));
2583                 loi_list_maint(cli, loi);
2584                 RETURN(PTR_ERR(req));
2585         }
2586
2587         aa = ptlrpc_req_async_args(req);
2588
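             /* keep only the offset within one PTLRPC_MAX_BRW_SIZE window;
              * the histograms below record where in that window (in pages)
              * the rpc starts */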
2589         starting_offset &= PTLRPC_MAX_BRW_SIZE - 1;
2590         if (cmd == OBD_BRW_READ) {
2591                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2592                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2593                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2594                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2595         } else {
2596                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2597                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2598                                  cli->cl_w_in_flight);
2599                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2600                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2601         }
2602
2603         client_obd_list_lock(&cli->cl_loi_list_lock);
2604
2605         if (cmd == OBD_BRW_READ)
2606                 cli->cl_r_in_flight++;
2607         else
2608                 cli->cl_w_in_flight++;
2609
2610         /* queued sync pages can be torn down while the pages
2611          * are between the pending list and the rpc */
2612         tmp = NULL;
2613         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2614                 /* only one oap gets a request reference */
2615                 if (tmp == NULL)
2616                         tmp = oap;
2617                 if (oap->oap_interrupted && !req->rq_intr) {
2618                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2619                                oap, req);
2620                         ptlrpc_mark_interrupted(req);
2621                 }
2622         }
2623         if (tmp != NULL)
2624                 tmp->oap_request = ptlrpc_request_addref(req);
2625
2626         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2627                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2628
2629         req->rq_interpret_reply = brw_interpret;
2630
2631         /* XXX: Maybe the caller can check the RPC bulk descriptor to see which
2632          *      CPU/NUMA node the majority of pages were allocated on, and try
2633          *      to assign the async RPC to the CPU core (PDL_POLICY_PREFERRED)
2634          *      to reduce cross-CPU memory traffic.
2635          *
2636          *      But on the other hand, we expect that multiple ptlrpcd threads
2637          *      and the initial write sponsor can run in parallel, especially
2638          *      when data checksum is enabled, which is CPU-bound operation and
2639          *      single ptlrpcd thread cannot process in time. So more ptlrpcd
2640          *      threads sharing BRW load (with PDL_POLICY_ROUND) seems better.
2641          */
2642         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2643         RETURN(1);
2644 }
2645
2646 #define LOI_DEBUG(LOI, STR, args...)                                     \
2647         CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
2648                !cfs_list_empty(&(LOI)->loi_ready_item) ||                \
2649                !cfs_list_empty(&(LOI)->loi_hp_ready_item),               \
2650                (LOI)->loi_write_lop.lop_num_pending,                     \
2651                !cfs_list_empty(&(LOI)->loi_write_lop.lop_urgent),        \
2652                (LOI)->loi_read_lop.lop_num_pending,                      \
2653                !cfs_list_empty(&(LOI)->loi_read_lop.lop_urgent),         \
2654                args)
2655
2656 /* This is called by osc_check_rpcs() to find which objects have pages that
2657  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2658 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2659 {
2660         ENTRY;
2661
2662         /* First return objects that have blocked locks so that they
2663          * will be flushed quickly and other clients can get the lock,
2664          * then objects which have pages ready to be stuffed into RPCs */
2665         if (!cfs_list_empty(&cli->cl_loi_hp_ready_list))
2666                 RETURN(cfs_list_entry(cli->cl_loi_hp_ready_list.next,
2667                                       struct lov_oinfo, loi_hp_ready_item));
2668         if (!cfs_list_empty(&cli->cl_loi_ready_list))
2669                 RETURN(cfs_list_entry(cli->cl_loi_ready_list.next,
2670                                       struct lov_oinfo, loi_ready_item));
2671
2672         /* then if we have cache waiters, return all objects with queued
2673          * writes.  This is especially important when many small files
2674          * have filled up the cache and not been fired into rpcs because
2675          * they don't pass the nr_pending/object threshold */
2676         if (!cfs_list_empty(&cli->cl_cache_waiters) &&
2677             !cfs_list_empty(&cli->cl_loi_write_list))
2678                 RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2679                                       struct lov_oinfo, loi_write_item));
2680
2681         /* then return all queued objects when we have an invalid import
2682          * so that they get flushed */
2683         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2684                 if (!cfs_list_empty(&cli->cl_loi_write_list))
2685                         RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2686                                               struct lov_oinfo,
2687                                               loi_write_item));
2688                 if (!cfs_list_empty(&cli->cl_loi_read_list))
2689                         RETURN(cfs_list_entry(cli->cl_loi_read_list.next,
2690                                               struct lov_oinfo, loi_read_item));
2691         }
2692         RETURN(NULL);
2693 }
2694
2695 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2696 {
2697         struct osc_async_page *oap;
2698         int hprpc = 0;
2699
2700         if (!cfs_list_empty(&loi->loi_write_lop.lop_urgent)) {
2701                 oap = cfs_list_entry(loi->loi_write_lop.lop_urgent.next,
2702                                      struct osc_async_page, oap_urgent_item);
2703                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2704         }
2705
2706         if (!hprpc && !cfs_list_empty(&loi->loi_read_lop.lop_urgent)) {
2707                 oap = cfs_list_entry(loi->loi_read_lop.lop_urgent.next,
2708                                      struct osc_async_page, oap_urgent_item);
2709                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2710         }
2711
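        /* Worked example (illustrative numbers): with cl_max_rpcs_in_flight
         * == 8 and an ASYNC_HP page at the head of an urgent list, hprpc == 1
         * and the check below throttles only once 9 RPCs are in flight;
         * without a high-priority page the cap stays at 8. */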
2712         return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
2713 }
2714
2715 /* called with the loi list lock held */
2716 void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
2717 {
2718         struct lov_oinfo *loi;
2719         int rc = 0, race_counter = 0;
2720         ENTRY;
2721
2722         while ((loi = osc_next_loi(cli)) != NULL) {
2723                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2724
2725                 if (osc_max_rpc_in_flight(cli, loi))
2726                         break;
2727
2728                 /* attempt some read/write balancing by alternating between
2729                  * reads and writes in an object.  The makes_rpc checks here
2730                  * would be redundant if we were getting read/write work items
2731                  * instead of objects.  We don't want send_oap_rpc to drain a
2732                  * partial read pending queue when we're given this object to
2733                  * do write io on while there are cache waiters. */
2734                 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2735                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
2736                                               &loi->loi_write_lop);
2737                         if (rc < 0) {
2738                                 CERROR("Write request failed with %d\n", rc);
2739
2740                                 /* osc_send_oap_rpc failed, mostly because of
2741                                  * memory pressure.
2742                                  *
2743                                  * We can't break here, because if:
2744                                  *  - a page was submitted by osc_io_submit, so
2745                                  *    the page is locked;
2746                                  *  - no request is in flight; and
2747                                  *  - no subsequent request is made,
2748                                  * then the system ends up in a live-lock state,
2749                                  * because there is no chance to call
2750                                  * osc_io_unplug() and osc_check_rpcs() any
2751                                  * more.  pdflush can't help in this case,
2752                                  * because it might be blocked grabbing the
2753                                  * page lock, as mentioned above.
2754                                  *
2755                                  * Anyway, continue to drain pages. */
2756                                 /* break; */
2757                         }
2758
2759                         if (rc > 0)
2760                                 race_counter = 0;
2761                         else if (rc == 0)
2762                                 race_counter++;
2763                 }
2764                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2765                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
2766                                               &loi->loi_read_lop);
2767                         if (rc < 0)
2768                                 CERROR("Read request failed with %d\n", rc);
2769
2770                         if (rc > 0)
2771                                 race_counter = 0;
2772                         else if (rc == 0)
2773                                 race_counter++;
2774                 }
2775
2776                 /* attempt some inter-object balancing by issuing rpcs
2777                  * for each object in turn */
2778                 if (!cfs_list_empty(&loi->loi_hp_ready_item))
2779                         cfs_list_del_init(&loi->loi_hp_ready_item);
2780                 if (!cfs_list_empty(&loi->loi_ready_item))
2781                         cfs_list_del_init(&loi->loi_ready_item);
2782                 if (!cfs_list_empty(&loi->loi_write_item))
2783                         cfs_list_del_init(&loi->loi_write_item);
2784                 if (!cfs_list_empty(&loi->loi_read_item))
2785                         cfs_list_del_init(&loi->loi_read_item);
2786
2787                 loi_list_maint(cli, loi);
2788
2789                 /* send_oap_rpc returns 0 when make_ready tells it to
2790                  * back off.  llite's make_ready does this when it tries
2791                  * to lock a page queued for write that is already locked.
2792                  * we want to try sending rpcs from many objects, but we
2793                  * don't want to spin on repeated 0 returns.  */
2794                 if (race_counter == 10)
2795                         break;
2796         }
2797         EXIT;
2798 }
2799
2800 /* we're trying to queue a page in the osc so we're subject to the
2801  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2802  * If the osc's queued pages are already at that limit, then we want to sleep
2803  * until there is space in the osc's queue for us.  We also may be waiting for
2804  * write credits from the OST if there are RPCs in flight that may return some
2805  * before we fall back to sync writes.
2806  *
2807  * We need this to know our allocation was granted in the presence of signals */
2808 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2809 {
2810         int rc;
2811         ENTRY;
2812         client_obd_list_lock(&cli->cl_loi_list_lock);
2813         rc = cfs_list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2814         client_obd_list_unlock(&cli->cl_loi_list_lock);
2815         RETURN(rc);
2816 }
2817
2818 /**
2819  * Non-blocking version of osc_enter_cache() that consumes grant only when it
2820  * is available.
2821  */
2822 int osc_enter_cache_try(const struct lu_env *env,
2823                         struct client_obd *cli, struct lov_oinfo *loi,
2824                         struct osc_async_page *oap, int transient)
2825 {
2826         int has_grant;
2827
2828         has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
2829         if (has_grant) {
2830                 osc_consume_write_grant(cli, &oap->oap_brw_page);
2831                 if (transient) {
2832                         cli->cl_dirty_transit += CFS_PAGE_SIZE;
2833                         cfs_atomic_inc(&obd_dirty_transit_pages);
2834                         oap->oap_brw_flags |= OBD_BRW_NOCACHE;
2835                 }
2836         }
2837         return has_grant;
2838 }
2839
2840 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2841  * grant or cache space. */
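/* (Summary of the return paths below: 0 once cache space is granted, -EDQUOT
 * to make the caller fall back to sync i/o, -EINTR when the wait is
 * interrupted by a signal, and otherwise the waiter's ocw_rc.) */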
2842 static int osc_enter_cache(const struct lu_env *env,
2843                            struct client_obd *cli, struct lov_oinfo *loi,
2844                            struct osc_async_page *oap)
2845 {
2846         struct osc_cache_waiter ocw;
2847         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2848
2849         ENTRY;
2850
2851         CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2852                "grant: %lu\n", cli->cl_dirty, cfs_atomic_read(&obd_dirty_pages),
2853                cli->cl_dirty_max, obd_max_dirty_pages,
2854                cli->cl_lost_grant, cli->cl_avail_grant);
2855
2856         /* force the caller to try sync io.  this can jump the list
2857          * of queued writes and create a discontiguous rpc stream */
2858         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_NO_GRANT) ||
2859             cli->cl_dirty_max < CFS_PAGE_SIZE     ||
2860             cli->cl_ar.ar_force_sync || loi->loi_ar.ar_force_sync)
2861                 RETURN(-EDQUOT);
2862
2863         /* Hopefully normal case - cache space and write credits available */
2864         if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
2865             cfs_atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
2866             osc_enter_cache_try(env, cli, loi, oap, 0))
2867                 RETURN(0);
2868
2869         /* It is safe to block as a cache waiter as long as there is grant
2870          * space available or the hope of additional grant being returned
2871          * when an in flight write completes.  Using the write back cache
2872          * if possible is preferable to sending the data synchronously
2873          * because write pages can then be merged into large requests.
2874          * The addition of this cache waiter will cause pending write
2875          * pages to be sent immediately. */
2876         if (cli->cl_w_in_flight || cli->cl_avail_grant >= CFS_PAGE_SIZE) {
2877                 cfs_list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2878                 cfs_waitq_init(&ocw.ocw_waitq);
2879                 ocw.ocw_oap = oap;
2880                 ocw.ocw_rc = 0;
2881
2882                 loi_list_maint(cli, loi);
2883                 osc_check_rpcs(env, cli);
2884                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2885
2886                 CDEBUG(D_CACHE, "sleeping for cache space\n");
2887                 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2888
2889                 client_obd_list_lock(&cli->cl_loi_list_lock);
2890                 if (!cfs_list_empty(&ocw.ocw_entry)) {
2891                         cfs_list_del(&ocw.ocw_entry);
2892                         RETURN(-EINTR);
2893                 }
2894                 RETURN(ocw.ocw_rc);
2895         }
2896
2897         RETURN(-EDQUOT);
2898 }
2899
2900
2901 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2902                         struct lov_oinfo *loi, cfs_page_t *page,
2903                         obd_off offset, const struct obd_async_page_ops *ops,
2904                         void *data, void **res, int nocache,
2905                         struct lustre_handle *lockh)
2906 {
2907         struct osc_async_page *oap;
2908
2909         ENTRY;
2910
2911         if (!page)
2912                 RETURN(cfs_size_round(sizeof(*oap)));
2913
2914         oap = *res;
2915         oap->oap_magic = OAP_MAGIC;
2916         oap->oap_cli = &exp->exp_obd->u.cli;
2917         oap->oap_loi = loi;
2918
2919         oap->oap_caller_ops = ops;
2920         oap->oap_caller_data = data;
2921
2922         oap->oap_page = page;
2923         oap->oap_obj_off = offset;
2924         if (!client_is_remote(exp) &&
2925             cfs_capable(CFS_CAP_SYS_RESOURCE))
2926                 oap->oap_brw_flags = OBD_BRW_NOQUOTA;
2927
2928         LASSERT(!(offset & ~CFS_PAGE_MASK));
2929
2930         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2931         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2932         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2933         CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2934
2935         cfs_spin_lock_init(&oap->oap_lock);
2936         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2937         RETURN(0);
2938 }
2939
2940 int osc_queue_async_io(const struct lu_env *env, struct obd_export *exp,
2941                        struct lov_stripe_md *lsm, struct lov_oinfo *loi,
2942                        struct osc_async_page *oap, int cmd, int off,
2943                        int count, obd_flag brw_flags, enum async_flags async_flags)
2944 {
2945         struct client_obd *cli = &exp->exp_obd->u.cli;
2946         int rc = 0;
2947         ENTRY;
2948
2949         if (oap->oap_magic != OAP_MAGIC)
2950                 RETURN(-EINVAL);
2951
2952         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2953                 RETURN(-EIO);
2954
2955         if (!cfs_list_empty(&oap->oap_pending_item) ||
2956             !cfs_list_empty(&oap->oap_urgent_item) ||
2957             !cfs_list_empty(&oap->oap_rpc_item))
2958                 RETURN(-EBUSY);
2959
2960         /* check if the file's owner/group is over quota */
2961         if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
2962                 struct cl_object *obj;
2963                 struct cl_attr    attr; /* XXX put attr into thread info */
2964                 unsigned int qid[MAXQUOTAS];
2965
2966                 obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
2967
2968                 cl_object_attr_lock(obj);
2969                 rc = cl_object_attr_get(env, obj, &attr);
2970                 cl_object_attr_unlock(obj);
2971
2972                 qid[USRQUOTA] = attr.cat_uid;
2973                 qid[GRPQUOTA] = attr.cat_gid;
2974                 if (rc == 0 &&
2975                     osc_quota_chkdq(cli, qid) == NO_QUOTA)
2976                         rc = -EDQUOT;
2977                 if (rc)
2978                         RETURN(rc);
2979         }
2980
2981         if (loi == NULL)
2982                 loi = lsm->lsm_oinfo[0];
2983
2984         client_obd_list_lock(&cli->cl_loi_list_lock);
2985
2986         LASSERT(off + count <= CFS_PAGE_SIZE);
2987         oap->oap_cmd = cmd;
2988         oap->oap_page_off = off;
2989         oap->oap_count = count;
2990         oap->oap_brw_flags = brw_flags;
2991         /* Give a hint to OST that requests are coming from kswapd - bug19529 */
2992         if (cfs_memory_pressure_get())
2993                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2994         cfs_spin_lock(&oap->oap_lock);
2995         oap->oap_async_flags = async_flags;
2996         cfs_spin_unlock(&oap->oap_lock);
2997
2998         if (cmd & OBD_BRW_WRITE) {
2999                 rc = osc_enter_cache(env, cli, loi, oap);
3000                 if (rc) {
3001                         client_obd_list_unlock(&cli->cl_loi_list_lock);
3002                         RETURN(rc);
3003                 }
3004         }
3005
3006         osc_oap_to_pending(oap);
3007         loi_list_maint(cli, loi);
3008
3009         LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
3010                   cmd);
3011
3012         osc_check_rpcs(env, cli);
3013         client_obd_list_unlock(&cli->cl_loi_list_lock);
3014
3015         RETURN(0);
3016 }
3017
3018 /* aka (~was & now & flag), but this is more clear :) */
3019 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
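/* Example: SETTING(ASYNC_READY, ASYNC_READY | ASYNC_URGENT, ASYNC_URGENT) is
 * non-zero, since ASYNC_URGENT is clear in 'was' and set in 'now'; a flag
 * that is already set in 'was' never triggers SETTING. */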
3020
3021 int osc_set_async_flags_base(struct client_obd *cli,
3022                              struct lov_oinfo *loi, struct osc_async_page *oap,
3023                              obd_flag async_flags)
3024 {
3025         struct loi_oap_pages *lop;
3026         int flags = 0;
3027         ENTRY;
3028
3029         LASSERT(!cfs_list_empty(&oap->oap_pending_item));
3030
3031         if (oap->oap_cmd & OBD_BRW_WRITE) {
3032                 lop = &loi->loi_write_lop;
3033         } else {
3034                 lop = &loi->loi_read_lop;
3035         }
3036
3037         if ((oap->oap_async_flags & async_flags) == async_flags)
3038                 RETURN(0);
3039
3040         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
3041                 flags |= ASYNC_READY;
3042
3043         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
3044             cfs_list_empty(&oap->oap_rpc_item)) {
3045                 if (oap->oap_async_flags & ASYNC_HP)
3046                         cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
3047                 else
3048                         cfs_list_add_tail(&oap->oap_urgent_item,
3049                                           &lop->lop_urgent);
3050                 flags |= ASYNC_URGENT;
3051                 loi_list_maint(cli, loi);
3052         }
3053         cfs_spin_lock(&oap->oap_lock);
3054         oap->oap_async_flags |= flags;
3055         cfs_spin_unlock(&oap->oap_lock);
3056
3057         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
3058                         oap->oap_async_flags);
3059         RETURN(0);
3060 }
3061
3062 int osc_teardown_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
3063                             struct lov_oinfo *loi, struct osc_async_page *oap)
3064 {
3065         struct client_obd *cli = &exp->exp_obd->u.cli;
3066         struct loi_oap_pages *lop;
3067         int rc = 0;
3068         ENTRY;
3069
3070         if (oap->oap_magic != OAP_MAGIC)
3071                 RETURN(-EINVAL);
3072
3073         if (loi == NULL)
3074                 loi = lsm->lsm_oinfo[0];
3075
3076         if (oap->oap_cmd & OBD_BRW_WRITE) {
3077                 lop = &loi->loi_write_lop;
3078         } else {
3079                 lop = &loi->loi_read_lop;
3080         }
3081
3082         client_obd_list_lock(&cli->cl_loi_list_lock);
3083
3084         if (!cfs_list_empty(&oap->oap_rpc_item))
3085                 GOTO(out, rc = -EBUSY);
3086
3087         osc_exit_cache(cli, oap, 0);
3088         osc_wake_cache_waiters(cli);
3089
3090         if (!cfs_list_empty(&oap->oap_urgent_item)) {
3091                 cfs_list_del_init(&oap->oap_urgent_item);
3092                 cfs_spin_lock(&oap->oap_lock);
3093                 oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
3094                 cfs_spin_unlock(&oap->oap_lock);
3095         }
3096         if (!cfs_list_empty(&oap->oap_pending_item)) {
3097                 cfs_list_del_init(&oap->oap_pending_item);
3098                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
3099         }
3100         loi_list_maint(cli, loi);
3101         LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
3102 out:
3103         client_obd_list_unlock(&cli->cl_loi_list_lock);
3104         RETURN(rc);
3105 }
3106
3107 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
3108                                         struct ldlm_enqueue_info *einfo)
3109 {
3110         void *data = einfo->ei_cbdata;
3111         int set = 0;
3112
3113         LASSERT(lock != NULL);
3114         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
3115         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
3116         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
3117         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
3118
3119         lock_res_and_lock(lock);
3120         cfs_spin_lock(&osc_ast_guard);
3121
3122         if (lock->l_ast_data == NULL)
3123                 lock->l_ast_data = data;
3124         if (lock->l_ast_data == data)
3125                 set = 1;
3126
3127         cfs_spin_unlock(&osc_ast_guard);
3128         unlock_res_and_lock(lock);
3129
3130         return set;
3131 }
3132
3133 static int osc_set_data_with_check(struct lustre_handle *lockh,
3134                                    struct ldlm_enqueue_info *einfo)
3135 {
3136         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
3137         int set = 0;
3138
3139         if (lock != NULL) {
3140                 set = osc_set_lock_data_with_check(lock, einfo);
3141                 LDLM_LOCK_PUT(lock);
3142         } else
3143                 CERROR("lockh %p, data %p - client evicted?\n",
3144                        lockh, einfo->ei_cbdata);
3145         return set;
3146 }
3147
3148 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3149                              ldlm_iterator_t replace, void *data)
3150 {
3151         struct ldlm_res_id res_id;
3152         struct obd_device *obd = class_exp2obd(exp);
3153
3154         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
3155         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3156         return 0;
3157 }
3158
3159 /* find any ldlm lock of the inode in osc
3160  * return 0    if none is found
3161  *        1    if one is found
3162  *      < 0    on error */
3163 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3164                            ldlm_iterator_t replace, void *data)
3165 {
3166         struct ldlm_res_id res_id;
3167         struct obd_device *obd = class_exp2obd(exp);
3168         int rc = 0;
3169
3170         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
3171         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3172         if (rc == LDLM_ITER_STOP)
3173                 return(1);
3174         if (rc == LDLM_ITER_CONTINUE)
3175                 return(0);
3176         return(rc);
3177 }
3178
3179 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
3180                             obd_enqueue_update_f upcall, void *cookie,
3181                             int *flags, int agl, int rc)
3182 {
3183         int intent = *flags & LDLM_FL_HAS_INTENT;
3184         ENTRY;
3185
3186         if (intent) {
3187                 /* The request was created before ldlm_cli_enqueue call. */
3188                 if (rc == ELDLM_LOCK_ABORTED) {
3189                         struct ldlm_reply *rep;
3190                         rep = req_capsule_server_get(&req->rq_pill,
3191                                                      &RMF_DLM_REP);
3192
3193                         LASSERT(rep != NULL);
3194                         if (rep->lock_policy_res1)
3195                                 rc = rep->lock_policy_res1;
3196                 }
3197         }
3198
3199         if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
3200             (rc == 0)) {
3201                 *flags |= LDLM_FL_LVB_READY;
3202                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
3203                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
3204         }
3205
3206         /* Call the update callback. */
3207         rc = (*upcall)(cookie, rc);
3208         RETURN(rc);
3209 }
3210
3211 static int osc_enqueue_interpret(const struct lu_env *env,
3212                                  struct ptlrpc_request *req,
3213                                  struct osc_enqueue_args *aa, int rc)
3214 {
3215         struct ldlm_lock *lock;
3216         struct lustre_handle handle;
3217         __u32 mode;
3218         struct ost_lvb *lvb;
3219         __u32 lvb_len;
3220         int *flags = aa->oa_flags;
3221
3222         /* Make a local copy of a lock handle and a mode, because aa->oa_*
3223          * might be freed anytime after lock upcall has been called. */
3224         lustre_handle_copy(&handle, aa->oa_lockh);
3225         mode = aa->oa_ei->ei_mode;
3226
3227         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
3228          * be valid. */
3229         lock = ldlm_handle2lock(&handle);
3230
3231         /* Take an additional reference so that a blocking AST that
3232          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
3233          * to arrive after an upcall has been executed by
3234          * osc_enqueue_fini(). */
3235         ldlm_lock_addref(&handle, mode);
3236
3237         /* Let the CP AST grant the lock first. */
3238         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
3239
3240         if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
3241                 lvb = NULL;
3242                 lvb_len = 0;
3243         } else {
3244                 lvb = aa->oa_lvb;
3245                 lvb_len = sizeof(*aa->oa_lvb);
3246         }
3247
3248         /* Complete obtaining the lock procedure. */
3249         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
3250                                    mode, flags, lvb, lvb_len, &handle, rc);
3251         /* Complete osc stuff. */
3252         rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
3253                               flags, aa->oa_agl, rc);
3254
3255         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
3256
3257         /* Release the lock for async request. */
3258         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
3259                 /*
3260                  * Releases a reference taken by ldlm_cli_enqueue(), if it is
3261                  * not already released by
3262                  * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
3263                  */
3264                 ldlm_lock_decref(&handle, mode);
3265
3266         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
3267                  aa->oa_lockh, req, aa);
3268         ldlm_lock_decref(&handle, mode);
3269         LDLM_LOCK_PUT(lock);
3270         return rc;
3271 }
3272
3273 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
3274                         struct lov_oinfo *loi, int flags,
3275                         struct ost_lvb *lvb, __u32 mode, int rc)
3276 {
3277         struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
3278
3279         if (rc == ELDLM_OK) {
3280                 __u64 tmp;
3281
3282                 LASSERT(lock != NULL);
3283                 loi->loi_lvb = *lvb;
3284                 tmp = loi->loi_lvb.lvb_size;
3285                 /* Extend KMS up to the end of this lock and no further.
3286                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
3287                 if (tmp > lock->l_policy_data.l_extent.end)
3288                         tmp = lock->l_policy_data.l_extent.end + 1;
3289                 if (tmp >= loi->loi_kms) {
3290                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
3291                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
3292                         loi_kms_set(loi, tmp);
3293                 } else {
3294                         LDLM_DEBUG(lock, "lock acquired, setting rss="
3295                                    LPU64"; leaving kms="LPU64", end="LPU64,
3296                                    loi->loi_lvb.lvb_size, loi->loi_kms,
3297                                    lock->l_policy_data.l_extent.end);
3298                 }
3299                 ldlm_lock_allow_match(lock);
3300         } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
3301                 LASSERT(lock != NULL);
3302                 loi->loi_lvb = *lvb;
3303                 ldlm_lock_allow_match(lock);
3304                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
3305                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
3306                 rc = ELDLM_OK;
3307         }
3308
3309         if (lock != NULL) {
3310                 if (rc != ELDLM_OK)
3311                         ldlm_lock_fail_match(lock, rc);
3312
3313                 LDLM_LOCK_PUT(lock);
3314         }
3315 }
3316 EXPORT_SYMBOL(osc_update_enqueue);
3317
3318 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
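/* PTLRPCD_SET is a sentinel, not a real set: callers pass it as 'rqset' so
 * that osc_enqueue_base() hands the enqueue RPC to a ptlrpcd daemon (see the
 * rqset == PTLRPCD_SET check below) instead of a caller-owned request set. */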
3319
3320 /* When enqueuing asynchronously, locks are not ordered, so we can obtain a lock
3321  * from the 2nd OSC before a lock from the 1st one.  This does not deadlock with
3322  * other synchronous requests; however, keeping some locks while trying to obtain
3323  * others may take a considerable amount of time in case of OST failure, and a
3324  * client that does not release locks which other sync requests are waiting on
3325  * is excluded from the cluster -- such scenarios make life difficult, so
3326  * release locks just after they are obtained. */
3327 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3328                      int *flags, ldlm_policy_data_t *policy,
3329                      struct ost_lvb *lvb, int kms_valid,
3330                      obd_enqueue_update_f upcall, void *cookie,
3331                      struct ldlm_enqueue_info *einfo,
3332                      struct lustre_handle *lockh,
3333                      struct ptlrpc_request_set *rqset, int async, int agl)
3334 {
3335         struct obd_device *obd = exp->exp_obd;
3336         struct ptlrpc_request *req = NULL;
3337         int intent = *flags & LDLM_FL_HAS_INTENT;
3338         int match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
3339         ldlm_mode_t mode;
3340         int rc;
3341         ENTRY;
3342
3343         /* Filesystem lock extents are extended to page boundaries so that
3344          * dealing with the page cache is a little smoother.  */
3345         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3346         policy->l_extent.end |= ~CFS_PAGE_MASK;
3347
3348         /*
3349          * kms is not valid when either object is completely fresh (so that no
3350          * locks are cached), or object was evicted. In the latter case cached
3351          * lock cannot be used, because it would prime inode state with
3352          * potentially stale LVB.
3353          */
3354         if (!kms_valid)
3355                 goto no_match;
3356
3357         /* Next, search for already existing extent locks that will cover us */
3358         /* If we're trying to read, we also search for an existing PW lock.  The
3359          * VFS and page cache already protect us locally, so lots of readers/
3360          * writers can share a single PW lock.
3361          *
3362          * There are problems with conversion deadlocks, so instead of
3363          * converting a read lock to a write lock, we'll just enqueue a new
3364          * one.
3365          *
3366          * At some point we should cancel the read lock instead of making them
3367          * send us a blocking callback, but there are problems with canceling
3368          * locks out from other users right now, too. */
3369         mode = einfo->ei_mode;
3370         if (einfo->ei_mode == LCK_PR)
3371                 mode |= LCK_PW;
3372         mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
3373                                einfo->ei_type, policy, mode, lockh, 0);
3374         if (mode) {
3375                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
3376
3377                 if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
3378                         /* For AGL, if the enqueue RPC is sent but the lock
3379                          * is not granted, then skip processing this
3380                          * stripe.  Return -ECANCELED to tell the caller. */
3381                         ldlm_lock_decref(lockh, mode);
3382                         LDLM_LOCK_PUT(matched);
3383                         RETURN(-ECANCELED);
3384                 } else if (osc_set_lock_data_with_check(matched, einfo)) {
3385                         *flags |= LDLM_FL_LVB_READY;
3386                         /* addref the lock only if not async requests and PW
3387                          * lock is matched whereas we asked for PR. */
3388                         if (!rqset && einfo->ei_mode != mode)
3389                                 ldlm_lock_addref(lockh, LCK_PR);
3390                         if (intent) {
3391                                 /* I would like to be able to ASSERT here that
3392                                  * rss <= kms, but I can't, for reasons which
3393                                  * are explained in lov_enqueue() */
3394                         }
3395
3396                         /* We already have a lock, and it's referenced */
3397                         (*upcall)(cookie, ELDLM_OK);
3398
3399                         if (einfo->ei_mode != mode)
3400                                 ldlm_lock_decref(lockh, LCK_PW);
3401                         else if (rqset)
3402                                 /* For async requests, decref the lock. */
3403                                 ldlm_lock_decref(lockh, einfo->ei_mode);
3404                         LDLM_LOCK_PUT(matched);
3405                         RETURN(ELDLM_OK);
3406                 } else {
3407                         ldlm_lock_decref(lockh, mode);
3408                         LDLM_LOCK_PUT(matched);
3409                 }
3410         }
3411
3412  no_match:
3413         if (intent) {
3414                 CFS_LIST_HEAD(cancels);
3415                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3416                                            &RQF_LDLM_ENQUEUE_LVB);
3417                 if (req == NULL)
3418                         RETURN(-ENOMEM);
3419
3420                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3421                 if (rc) {
3422                         ptlrpc_request_free(req);
3423                         RETURN(rc);
3424                 }
3425
3426                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3427                                      sizeof *lvb);
3428                 ptlrpc_request_set_replen(req);
3429         }
3430
3431         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3432         *flags &= ~LDLM_FL_BLOCK_GRANTED;
3433
3434         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
3435                               sizeof(*lvb), lockh, async);
3436         if (rqset) {
3437                 if (!rc) {
3438                         struct osc_enqueue_args *aa;
3439                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3440                         aa = ptlrpc_req_async_args(req);
3441                         aa->oa_ei = einfo;
3442                         aa->oa_exp = exp;
3443                         aa->oa_flags  = flags;
3444                         aa->oa_upcall = upcall;
3445                         aa->oa_cookie = cookie;
3446                         aa->oa_lvb    = lvb;
3447                         aa->oa_lockh  = lockh;
3448                         aa->oa_agl    = !!agl;
3449
3450                         req->rq_interpret_reply =
3451                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
3452                         if (rqset == PTLRPCD_SET)
3453                                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
3454                         else
3455                                 ptlrpc_set_add_req(rqset, req);
3456                 } else if (intent) {
3457                         ptlrpc_req_finished(req);
3458                 }
3459                 RETURN(rc);
3460         }
3461
3462         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
3463         if (intent)
3464                 ptlrpc_req_finished(req);
3465
3466         RETURN(rc);
3467 }
3468
3469 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3470                        struct ldlm_enqueue_info *einfo,
3471                        struct ptlrpc_request_set *rqset)
3472 {
3473         struct ldlm_res_id res_id;
3474         int rc;
3475         ENTRY;
3476
3477         osc_build_res_name(oinfo->oi_md->lsm_object_id,
3478                            oinfo->oi_md->lsm_object_seq, &res_id);
3479
3480         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
3481                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3482                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
3483                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
3484                               rqset, rqset != NULL, 0);
3485         RETURN(rc);
3486 }
3487
3488 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3489                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3490                    int *flags, void *data, struct lustre_handle *lockh,
3491                    int unref)
3492 {
3493         struct obd_device *obd = exp->exp_obd;
3494         int lflags = *flags;
3495         ldlm_mode_t rc;
3496         ENTRY;
3497
3498         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3499                 RETURN(-EIO);
3500
3501         /* Filesystem lock extents are extended to page boundaries so that
3502          * dealing with the page cache is a little smoother */
3503         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3504         policy->l_extent.end |= ~CFS_PAGE_MASK;
3505
3506         /* Next, search for already existing extent locks that will cover us */
3507         /* If we're trying to read, we also search for an existing PW lock.  The
3508          * VFS and page cache already protect us locally, so lots of readers/
3509          * writers can share a single PW lock. */
3510         rc = mode;
3511         if (mode == LCK_PR)
3512                 rc |= LCK_PW;
3513         rc = ldlm_lock_match(obd->obd_namespace, lflags,
3514                              res_id, type, policy, rc, lockh, unref);
3515         if (rc) {
3516                 if (data != NULL) {
3517                         if (!osc_set_data_with_check(lockh, data)) {
3518                                 if (!(lflags & LDLM_FL_TEST_LOCK))
3519                                         ldlm_lock_decref(lockh, rc);
3520                                 RETURN(0);
3521                         }
3522                 }
3523                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3524                         ldlm_lock_addref(lockh, LCK_PR);
3525                         ldlm_lock_decref(lockh, LCK_PW);
3526                 }
3527                 RETURN(rc);
3528         }
3529         RETURN(rc);
3530 }
3531
3532 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
3533 {
3534         ENTRY;
3535
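        /* Presumably group locks are cancelled outright on release, rather
         * than cached, because they block all other access to the object
         * (an inference, not stated in the original code). */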
3536         if (unlikely(mode == LCK_GROUP))
3537                 ldlm_lock_decref_and_cancel(lockh, mode);
3538         else
3539                 ldlm_lock_decref(lockh, mode);
3540
3541         RETURN(0);
3542 }
3543
3544 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3545                       __u32 mode, struct lustre_handle *lockh)
3546 {
3547         ENTRY;
3548         RETURN(osc_cancel_base(lockh, mode));
3549 }
3550
3551 static int osc_cancel_unused(struct obd_export *exp,
3552                              struct lov_stripe_md *lsm,
3553                              ldlm_cancel_flags_t flags,
3554                              void *opaque)
3555 {
3556         struct obd_device *obd = class_exp2obd(exp);
3557         struct ldlm_res_id res_id, *resp = NULL;
3558
3559         if (lsm != NULL) {
3560                 resp = osc_build_res_name(lsm->lsm_object_id,
3561                                           lsm->lsm_object_seq, &res_id);
3562         }
3563
3564         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3565 }
3566
3567 static int osc_statfs_interpret(const struct lu_env *env,
3568                                 struct ptlrpc_request *req,
3569                                 struct osc_async_args *aa, int rc)
3570 {
3571         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
3572         struct obd_statfs *msfs;
3573         __u64 used;
3574         ENTRY;
3575
3576         if (rc == -EBADR)
3577                 /* The request has in fact never been sent
3578                  * due to issues at a higher level (LOV).
3579                  * Exit immediately since the caller is
3580                  * aware of the problem and takes care
3581                  * of the clean up */
3582                  RETURN(rc);
3583
3584         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
3585             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
3586                 GOTO(out, rc = 0);
3587
3588         if (rc != 0)
3589                 GOTO(out, rc);
3590
3591         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3592         if (msfs == NULL) {
3593                 GOTO(out, rc = -EPROTO);
3594         }
3595
3596         /* Reinitialize the RDONLY and DEGRADED flags at the client
3597          * on each statfs, so they don't stay set permanently. */
3598         cfs_spin_lock(&cli->cl_oscc.oscc_lock);
3599
3600         if (unlikely(msfs->os_state & OS_STATE_DEGRADED))
3601                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED;
3602         else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_DEGRADED))
3603                 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_DEGRADED;
3604
3605         if (unlikely(msfs->os_state & OS_STATE_READONLY))
3606                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY;
3607         else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_RDONLY))
3608                 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_RDONLY;
3609
3610         /* Add a bit of hysteresis so this flag isn't continually flapping,
3611          * and ensure that new files don't get extremely fragmented due to
3612          * only a small amount of available space in the filesystem.
3613          * We want to set the NOSPC flag when there is less than ~0.1% free
3614          * and clear it when there is at least ~0.2% free space, so:
3615          *                   avail < ~0.1% max          max = avail + used
3616          *            1025 * avail < avail + used       used = blocks - free
3617          *            1024 * avail < used
3618          *            1024 * avail < blocks - free
3619          *                   avail < ((blocks - free) >> 10)
3620          *
3621          * On a very large disk, say 16TB, 0.1% will be 16GB.  We don't want
3622          * to lose that amount of space, so in those cases we report no space
3623          * left if there is less than 1GB left.                           */
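        /* Worked example (illustrative numbers only): with os_blocks = 2^32
         * and os_bfree = 2^23, used = min((2^32 - 2^23) >> 10, 2^30) ~= 2^22,
         * so NOSPC is set once os_bavail drops below ~2^22 blocks (or
         * os_ffree < 32), i.e. roughly 0.1% of the device. */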
3624         used = min_t(__u64, (msfs->os_blocks - msfs->os_bfree) >> 10, 1 << 30);
3625         if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) == 0) &&
3626                      ((msfs->os_ffree < 32) || (msfs->os_bavail < used))))
3627                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC;
3628         else if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
3629                           (msfs->os_ffree > 64) &&
3630                           (msfs->os_bavail > (used << 1)))) {
3631                 cli->cl_oscc.oscc_flags &= ~(OSCC_FLAG_NOSPC |
3632                                              OSCC_FLAG_NOSPC_BLK);
3633         }
3634
3635         if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
3636                      (msfs->os_bavail < used)))
3637                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC_BLK;
3638
3639         cfs_spin_unlock(&cli->cl_oscc.oscc_lock);
3640
3641         *aa->aa_oi->oi_osfs = *msfs;
3642 out:
3643         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3644         RETURN(rc);
3645 }
3646
3647 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3648                             __u64 max_age, struct ptlrpc_request_set *rqset)
3649 {
3650         struct ptlrpc_request *req;
3651         struct osc_async_args *aa;
3652         int                    rc;
3653         ENTRY;
3654
3655         /* We could possibly pass max_age in the request (as an absolute
3656          * timestamp or a "seconds.usec ago") so the target can avoid doing
3657          * extra calls into the filesystem if that isn't necessary (e.g.
3658          * during mount that would help a bit).  Having relative timestamps
3659          * is not so great if request processing is slow, while absolute
3660          * timestamps are not ideal because they need time synchronization. */
3661         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3662         if (req == NULL)
3663                 RETURN(-ENOMEM);
3664
3665         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3666         if (rc) {
3667                 ptlrpc_request_free(req);
3668                 RETURN(rc);
3669         }
3670         ptlrpc_request_set_replen(req);
3671         req->rq_request_portal = OST_CREATE_PORTAL;
3672         ptlrpc_at_set_req_timeout(req);
3673
3674         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3675                 /* procfs requests should not wait on statfs, to avoid a deadlock */
3676                 req->rq_no_resend = 1;
3677                 req->rq_no_delay = 1;
3678         }
3679
3680         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
3681         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3682         aa = ptlrpc_req_async_args(req);
3683         aa->aa_oi = oinfo;
3684
3685         ptlrpc_set_add_req(rqset, req);
3686         RETURN(0);
3687 }
3688
3689 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3690                       __u64 max_age, __u32 flags)
3691 {
3692         struct obd_statfs     *msfs;
3693         struct ptlrpc_request *req;
3694         struct obd_import     *imp = NULL;
3695         int rc;
3696         ENTRY;
3697
3698         /* Since the request might also come from lprocfs, we need to
3699          * sync this with client_disconnect_export (bug 15684) */
3700         cfs_down_read(&obd->u.cli.cl_sem);
3701         if (obd->u.cli.cl_import)
3702                 imp = class_import_get(obd->u.cli.cl_import);
3703         cfs_up_read(&obd->u.cli.cl_sem);
3704         if (!imp)
3705                 RETURN(-ENODEV);
3706
3707         /* We could possibly pass max_age in the request (as an absolute
3708          * timestamp or a "seconds.usec ago") so the target can avoid doing
3709          * extra calls into the filesystem if that isn't necessary (e.g.
3710          * during mount that would help a bit).  Having relative timestamps
3711          * is not so great if request processing is slow, while absolute
3712          * timestamps are not ideal because they need time synchronization. */
3713         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
3714
3715         class_import_put(imp);
3716
3717         if (req == NULL)
3718                 RETURN(-ENOMEM);
3719
3720         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3721         if (rc) {
3722                 ptlrpc_request_free(req);
3723                 RETURN(rc);
3724         }
3725         ptlrpc_request_set_replen(req);
3726         req->rq_request_portal = OST_CREATE_PORTAL;
3727         ptlrpc_at_set_req_timeout(req);
3728
3729         if (flags & OBD_STATFS_NODELAY) {
3730                 /* procfs requests should not wait on statfs, to avoid a deadlock */
3731                 req->rq_no_resend = 1;
3732                 req->rq_no_delay = 1;
3733         }
3734
3735         rc = ptlrpc_queue_wait(req);
3736         if (rc)
3737                 GOTO(out, rc);
3738
3739         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3740         if (msfs == NULL) {
3741                 GOTO(out, rc = -EPROTO);
3742         }
3743
3744         *osfs = *msfs;
3745
3746         EXIT;
3747  out:
3748         ptlrpc_req_finished(req);
3749         return rc;
3750 }
3751
3752 /* Retrieve object striping information.
3753  *
3754  * @lump is a pointer to an in-core struct with lmm_stripe_count indicating
3755  * the maximum number of OST indices which will fit in the user buffer.
3756  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
3757  */
3758 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3759 {
3760         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
3761         struct lov_user_md_v3 lum, *lumk;
3762         struct lov_user_ost_data_v1 *lmm_objects;
3763         int rc = 0, lum_size;
3764         ENTRY;
3765
3766         if (!lsm)
3767                 RETURN(-ENODATA);
3768
3769         /* we only need the header part from user space to get lmm_magic and
3770  * lmm_stripe_count (the header part is common to v1 and v3) */
3771         lum_size = sizeof(struct lov_user_md_v1);
3772         if (cfs_copy_from_user(&lum, lump, lum_size))
3773                 RETURN(-EFAULT);
3774
3775         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
3776             (lum.lmm_magic != LOV_USER_MAGIC_V3))
3777                 RETURN(-EINVAL);
3778
3779         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
3780         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
3781         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
3782         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
3783
3784         /* we can use lov_mds_md_size() to compute lum_size
3785          * because lov_user_md_vX and lov_mds_md_vX have the same size */
3786         if (lum.lmm_stripe_count > 0) {
3787                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
3788                 OBD_ALLOC(lumk, lum_size);
3789                 if (!lumk)
3790                         RETURN(-ENOMEM);
3791
3792                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
3793                         lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
3794                 else
3795                         lmm_objects = &(lumk->lmm_objects[0]);
3796                 lmm_objects->l_object_id = lsm->lsm_object_id;
3797         } else {
3798                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
3799                 lumk = &lum;
3800         }
3801
3802         lumk->lmm_object_id = lsm->lsm_object_id;
3803         lumk->lmm_object_seq = lsm->lsm_object_seq;
3804         lumk->lmm_stripe_count = 1;
3805
3806         if (cfs_copy_to_user(lump, lumk, lum_size))
3807                 rc = -EFAULT;
3808
3809         if (lumk != &lum)
3810                 OBD_FREE(lumk, lum_size);
3811
3812         RETURN(rc);
3813 }
3814
3815
3816 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3817                          void *karg, void *uarg)
3818 {
3819         struct obd_device *obd = exp->exp_obd;
3820         struct obd_ioctl_data *data = karg;
3821         int err = 0;
3822         ENTRY;
3823
3824         if (!cfs_try_module_get(THIS_MODULE)) {
3825                 CERROR("Can't get module. Is it alive?\n");
3826                 return -EINVAL;
3827         }
3828         switch (cmd) {
3829         case OBD_IOC_LOV_GET_CONFIG: {
3830                 char *buf;
3831                 struct lov_desc *desc;
3832                 struct obd_uuid uuid;
3833
3834                 buf = NULL;
3835                 len = 0;
3836                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3837                         GOTO(out, err = -EINVAL);
3838
3839                 data = (struct obd_ioctl_data *)buf;
3840
3841                 if (sizeof(*desc) > data->ioc_inllen1) {
3842                         obd_ioctl_freedata(buf, len);
3843                         GOTO(out, err = -EINVAL);
3844                 }
3845
3846                 if (data->ioc_inllen2 < sizeof(uuid)) {
3847                         obd_ioctl_freedata(buf, len);
3848                         GOTO(out, err = -EINVAL);
3849                 }
3850
3851                 desc = (struct lov_desc *)data->ioc_inlbuf1;
3852                 desc->ld_tgt_count = 1;
3853                 desc->ld_active_tgt_count = 1;
3854                 desc->ld_default_stripe_count = 1;
3855                 desc->ld_default_stripe_size = 0;
3856                 desc->ld_default_stripe_offset = 0;
3857                 desc->ld_pattern = 0;
3858                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3859
3860                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3861
3862                 err = cfs_copy_to_user((void *)uarg, buf, len);
3863                 if (err)
3864                         err = -EFAULT;
3865                 obd_ioctl_freedata(buf, len);
3866                 GOTO(out, err);
3867         }
3868         case LL_IOC_LOV_SETSTRIPE:
3869                 err = obd_alloc_memmd(exp, karg);
3870                 if (err > 0)
3871                         err = 0;
3872                 GOTO(out, err);
3873         case LL_IOC_LOV_GETSTRIPE:
3874                 err = osc_getstripe(karg, uarg);
3875                 GOTO(out, err);
3876         case OBD_IOC_CLIENT_RECOVER:
3877                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3878                                             data->ioc_inlbuf1, 0);
3879                 if (err > 0)
3880                         err = 0;
3881                 GOTO(out, err);
3882         case IOC_OSC_SET_ACTIVE:
3883                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3884                                                data->ioc_offset);
3885                 GOTO(out, err);
3886         case OBD_IOC_POLL_QUOTACHECK:
3887                 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
3888                 GOTO(out, err);
3889         case OBD_IOC_PING_TARGET:
3890                 err = ptlrpc_obd_ping(obd);
3891                 GOTO(out, err);
3892         default:
3893                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3894                        cmd, cfs_curproc_comm());
3895                 GOTO(out, err = -ENOTTY);
3896         }
3897 out:
3898         cfs_module_put(THIS_MODULE);
3899         return err;
3900 }
3901
3902 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3903                         void *key, __u32 *vallen, void *val,
3904                         struct lov_stripe_md *lsm)
3905 {
3906         ENTRY;
3907         if (!vallen || !val)
3908                 RETURN(-EFAULT);
3909
3910         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3911                 __u32 *stripe = val;
3912                 *vallen = sizeof(*stripe);
3913                 *stripe = 0;
3914                 RETURN(0);
3915         } else if (KEY_IS(KEY_LAST_ID)) {
3916                 struct ptlrpc_request *req;
3917                 obd_id                *reply;
3918                 char                  *tmp;
3919                 int                    rc;
3920
3921                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3922                                            &RQF_OST_GET_INFO_LAST_ID);
3923                 if (req == NULL)
3924                         RETURN(-ENOMEM);
3925
3926                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3927                                      RCL_CLIENT, keylen);
3928                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3929                 if (rc) {
3930                         ptlrpc_request_free(req);
3931                         RETURN(rc);
3932                 }
3933
3934                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3935                 memcpy(tmp, key, keylen);
3936
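                /* Fail fast: do not wait for a disconnected import to
                 * recover and do not resend on timeout; the caller
                 * handles the error instead. */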
3937                 req->rq_no_delay = req->rq_no_resend = 1;
3938                 ptlrpc_request_set_replen(req);
3939                 rc = ptlrpc_queue_wait(req);
3940                 if (rc)
3941                         GOTO(out, rc);
3942
3943                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3944                 if (reply == NULL)
3945                         GOTO(out, rc = -EPROTO);
3946
3947                 *((obd_id *)val) = *reply;
3948         out:
3949                 ptlrpc_req_finished(req);
3950                 RETURN(rc);
3951         } else if (KEY_IS(KEY_FIEMAP)) {
3952                 struct ptlrpc_request *req;
3953                 struct ll_user_fiemap *reply;
3954                 char *tmp;
3955                 int rc;
3956
3957                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3958                                            &RQF_OST_GET_INFO_FIEMAP);
3959                 if (req == NULL)
3960                         RETURN(-ENOMEM);
3961
3962                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3963                                      RCL_CLIENT, keylen);
3964                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3965                                      RCL_CLIENT, *vallen);
3966                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3967                                      RCL_SERVER, *vallen);
3968
3969                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3970                 if (rc) {
3971                         ptlrpc_request_free(req);
3972                         RETURN(rc);
3973                 }
3974
3975                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3976                 memcpy(tmp, key, keylen);
3977                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3978                 memcpy(tmp, val, *vallen);
3979
3980                 ptlrpc_request_set_replen(req);
3981                 rc = ptlrpc_queue_wait(req);
3982                 if (rc)
3983                         GOTO(out1, rc);
3984
3985                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3986                 if (reply == NULL)
3987                         GOTO(out1, rc = -EPROTO);
3988
3989                 memcpy(val, reply, *vallen);
3990         out1:
3991                 ptlrpc_req_finished(req);
3992
3993                 RETURN(rc);
3994         }
3995
3996         RETURN(-EINVAL);
3997 }
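
/*
 * Usage sketch (hypothetical caller, not part of this file): the
 * branches above are reached through the generic obd_get_info()
 * entry point.  Querying the last object id allocated on the OST,
 * with error handling trimmed, might look like:
 *
 *      obd_id last_id = 0;
 *      __u32 vallen = sizeof(last_id);
 *      int rc;
 *
 *      rc = obd_get_info(exp, sizeof(KEY_LAST_ID), KEY_LAST_ID,
 *                        &vallen, &last_id, NULL);
 */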
3998
3999 static int osc_setinfo_mds_connect_import(struct obd_import *imp)
4000 {
4001         struct llog_ctxt *ctxt;
4002         int rc = 0;
4003         ENTRY;
4004
4005         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
4006         if (ctxt) {
4007                 rc = llog_initiator_connect(ctxt);
4008                 llog_ctxt_put(ctxt);
4009         } else {
4010                 /* XXX return an error? skip setting below flags? */
4011         }
4012
4013         cfs_spin_lock(&imp->imp_lock);
4014         imp->imp_server_timeout = 1;
4015         imp->imp_pingable = 1;
4016         cfs_spin_unlock(&imp->imp_lock);
4017         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
4018
4019         RETURN(rc);
4020 }
4021
4022 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
4023                                           struct ptlrpc_request *req,
4024                                           void *aa, int rc)
4025 {
4026         ENTRY;
4027         if (rc != 0)
4028                 RETURN(rc);
4029
4030         RETURN(osc_setinfo_mds_connect_import(req->rq_import));
4031 }
4032
4033 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
4034                               void *key, obd_count vallen, void *val,
4035                               struct ptlrpc_request_set *set)
4036 {
4037         struct ptlrpc_request *req;
4038         struct obd_device     *obd = exp->exp_obd;
4039         struct obd_import     *imp = class_exp2cliimp(exp);
4040         char                  *tmp;
4041         int                    rc;
4042         ENTRY;
4043
4044         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
4045
4046         if (KEY_IS(KEY_NEXT_ID)) {
4047                 obd_id new_val;
4048                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4049
4050                 if (vallen != sizeof(obd_id))
4051                         RETURN(-ERANGE);
4052                 if (val == NULL)
4053                         RETURN(-EINVAL);
4057
                /* Avoid a race between allocating a new object and setting
                 * the next id from the ll_sync thread. */
4060                 cfs_spin_lock(&oscc->oscc_lock);
4061                 new_val = *((obd_id*)val) + 1;
4062                 if (new_val > oscc->oscc_next_id)
4063                         oscc->oscc_next_id = new_val;
4064                 cfs_spin_unlock(&oscc->oscc_lock);
4065                 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
4066                        exp->exp_obd->obd_name,
4067                        obd->u.cli.cl_oscc.oscc_next_id);
4068
4069                 RETURN(0);
4070         }
4071
4072         if (KEY_IS(KEY_CHECKSUM)) {
4073                 if (vallen != sizeof(int))
4074                         RETURN(-EINVAL);
4075                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
4076                 RETURN(0);
4077         }
4078
4079         if (KEY_IS(KEY_SPTLRPC_CONF)) {
4080                 sptlrpc_conf_client_adapt(obd);
4081                 RETURN(0);
4082         }
4083
4084         if (KEY_IS(KEY_FLUSH_CTX)) {
4085                 sptlrpc_import_flush_my_ctx(imp);
4086                 RETURN(0);
4087         }
4088
4089         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
4090                 RETURN(-EINVAL);
4091
        /* We pass all other commands directly to the OST. Since nobody calls
           OSC methods directly and everybody is supposed to go through LOV,
           we assume LOV has already rejected invalid values for us.
           The only keys recognised so far are evict_by_nid and mds_conn.
           Even if something bad slips through, the OST returns -EINVAL
           anyway. */
4098
4099         if (KEY_IS(KEY_GRANT_SHRINK))
4100                 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO);
4101         else
4102                 req = ptlrpc_request_alloc(imp, &RQF_OBD_SET_INFO);
4103
4104         if (req == NULL)
4105                 RETURN(-ENOMEM);
4106
4107         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
4108                              RCL_CLIENT, keylen);
4109         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
4110                              RCL_CLIENT, vallen);
4111         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
4112         if (rc) {
4113                 ptlrpc_request_free(req);
4114                 RETURN(rc);
4115         }
4116
4117         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
4118         memcpy(tmp, key, keylen);
4119         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
4120         memcpy(tmp, val, vallen);
4121
4122         if (KEY_IS(KEY_MDS_CONN)) {
4123                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4124
4125                 oscc->oscc_oa.o_seq = (*(__u32 *)val);
4126                 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
4127                 LASSERT_SEQ_IS_MDT(oscc->oscc_oa.o_seq);
4128                 req->rq_no_delay = req->rq_no_resend = 1;
4129                 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
4130         } else if (KEY_IS(KEY_GRANT_SHRINK)) {
4131                 struct osc_grant_args *aa;
4132                 struct obdo *oa;
4133
4134                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
4135                 aa = ptlrpc_req_async_args(req);
4136                 OBDO_ALLOC(oa);
4137                 if (!oa) {
4138                         ptlrpc_req_finished(req);
4139                         RETURN(-ENOMEM);
4140                 }
4141                 *oa = ((struct ost_body *)val)->oa;
4142                 aa->aa_oa = oa;
4143                 req->rq_interpret_reply = osc_shrink_grant_interpret;
4144         }
4145
4146         ptlrpc_request_set_replen(req);
4147         if (!KEY_IS(KEY_GRANT_SHRINK)) {
4148                 LASSERT(set != NULL);
4149                 ptlrpc_set_add_req(set, req);
4150                 ptlrpc_check_set(NULL, set);
4151         } else
4152                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
4153
4154         RETURN(0);
4155 }
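
/*
 * Usage sketch (hypothetical caller): osc_set_info_async() is reached
 * through the generic obd_set_info_async() entry point.  The
 * KEY_CHECKSUM branch above completes locally, so no request set is
 * needed for it; toggling wire checksums might look like:
 *
 *      int on = 1;
 *      int rc;
 *
 *      rc = obd_set_info_async(exp, sizeof(KEY_CHECKSUM),
 *                              KEY_CHECKSUM, sizeof(on), &on, NULL);
 */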
4156
4157
4158 static struct llog_operations osc_size_repl_logops = {
4159         lop_cancel: llog_obd_repl_cancel
4160 };
4161
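/* Cloned from llog_lvfs_ops and customized in osc_init() below. */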
4162 static struct llog_operations osc_mds_ost_orig_logops;
4163
4164 static int __osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4165                            struct obd_device *tgt, struct llog_catid *catid)
4166 {
4167         int rc;
4168         ENTRY;
4169
4170         rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, 1,
4171                         &catid->lci_logid, &osc_mds_ost_orig_logops);
4172         if (rc) {
                CERROR("failed to setup LLOG_MDS_OST_ORIG_CTXT: rc = %d\n", rc);
4174                 GOTO(out, rc);
4175         }
4176
4177         rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, 1,
4178                         NULL, &osc_size_repl_logops);
4179         if (rc) {
4180                 struct llog_ctxt *ctxt =
4181                         llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4182                 if (ctxt)
4183                         llog_cleanup(ctxt);
                CERROR("failed to setup LLOG_SIZE_REPL_CTXT: rc = %d\n", rc);
4185         }
4186         GOTO(out, rc);
4187 out:
4188         if (rc) {
4189                 CERROR("osc '%s' tgt '%s' catid %p rc=%d\n",
4190                        obd->obd_name, tgt->obd_name, catid, rc);
4191                 CERROR("logid "LPX64":0x%x\n",
4192                        catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
4193         }
4194         return rc;
4195 }
4196
4197 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4198                          struct obd_device *disk_obd, int *index)
4199 {
4200         struct llog_catid catid;
4201         static char name[32] = CATLIST;
4202         int rc;
4203         ENTRY;
4204
4205         LASSERT(olg == &obd->obd_olg);
4206
4207         cfs_mutex_down(&olg->olg_cat_processing);
4208         rc = llog_get_cat_list(disk_obd, name, *index, 1, &catid);
4209         if (rc) {
                CERROR("can't get llog catlist: rc = %d\n", rc);
4211                 GOTO(out, rc);
4212         }
4213
4214         CDEBUG(D_INFO, "%s: Init llog for %d - catid "LPX64"/"LPX64":%x\n",
4215                obd->obd_name, *index, catid.lci_logid.lgl_oid,
4216                catid.lci_logid.lgl_oseq, catid.lci_logid.lgl_ogen);
4217
4218         rc = __osc_llog_init(obd, olg, disk_obd, &catid);
4219         if (rc) {
                CERROR("llog catalog setup failed: rc = %d\n", rc);
4221                 GOTO(out, rc);
4222         }
4223
4224         rc = llog_put_cat_list(disk_obd, name, *index, 1, &catid);
4225         if (rc) {
                CERROR("can't put llog catlist: rc = %d\n", rc);
4227                 GOTO(out, rc);
4228         }
4229
4230  out:
4231         cfs_mutex_up(&olg->olg_cat_processing);
4232
4233         return rc;
4234 }
4235
4236 static int osc_llog_finish(struct obd_device *obd, int count)
4237 {
4238         struct llog_ctxt *ctxt;
4239         int rc = 0, rc2 = 0;
4240         ENTRY;
4241
4242         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4243         if (ctxt)
4244                 rc = llog_cleanup(ctxt);
4245
4246         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4247         if (ctxt)
4248                 rc2 = llog_cleanup(ctxt);
4249         if (!rc)
4250                 rc = rc2;
4251
4252         RETURN(rc);
4253 }
4254
4255 static int osc_reconnect(const struct lu_env *env,
4256                          struct obd_export *exp, struct obd_device *obd,
4257                          struct obd_uuid *cluuid,
4258                          struct obd_connect_data *data,
4259                          void *localdata)
4260 {
4261         struct client_obd *cli = &obd->u.cli;
4262
4263         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
4264                 long lost_grant;
4265
4266                 client_obd_list_lock(&cli->cl_loi_list_lock);
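                /* Ask the server to restore the grant we believe we
                 * still hold (available + dirty); if that totals zero,
                 * fall back to two full RPCs worth of pages.  "a ?: b"
                 * is the GCC shorthand for "a ? a : b". */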
4267                 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
4268                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
4269                 lost_grant = cli->cl_lost_grant;
4270                 cli->cl_lost_grant = 0;
4271                 client_obd_list_unlock(&cli->cl_loi_list_lock);
4272
4273                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
4274                        "cl_dirty: %ld cl_lost_grant: %ld\n", data->ocd_grant,
4275                        cli->cl_avail_grant, cli->cl_dirty, lost_grant);
4276                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
4277                        " ocd_grant: %d\n", data->ocd_connect_flags,
4278                        data->ocd_version, data->ocd_grant);
4279         }
4280
4281         RETURN(0);
4282 }
4283
4284 static int osc_disconnect(struct obd_export *exp)
4285 {
4286         struct obd_device *obd = class_exp2obd(exp);
4287         struct llog_ctxt  *ctxt;
4288         int rc;
4289
4290         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4291         if (ctxt) {
4292                 if (obd->u.cli.cl_conn_count == 1) {
4293                         /* Flush any remaining cancel messages out to the
4294                          * target */
4295                         llog_sync(ctxt, exp);
4296                 }
4297                 llog_ctxt_put(ctxt);
4298         } else {
4299                 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
4300                        obd);
4301         }
4302
4303         rc = client_disconnect_export(exp);
4304         /**
4305          * Initially we put del_shrink_grant before disconnect_export, but it
4306          * causes the following problem if setup (connect) and cleanup
4307          * (disconnect) are tangled together.
4308          *      connect p1                     disconnect p2
4309          *   ptlrpc_connect_import
4310          *     ...............               class_manual_cleanup
4311          *                                     osc_disconnect
4312          *                                     del_shrink_grant
4313          *   ptlrpc_connect_interrupt
4314          *     init_grant_shrink
4315          *   add this client to shrink list
4316          *                                      cleanup_osc
         * Bang! the pinger triggers the shrink.
         * So the osc should be removed from the shrink list only after we
         * are sure the import has been destroyed. See bug 18662.
4320          */
4321         if (obd->u.cli.cl_import == NULL)
4322                 osc_del_shrink_grant(&obd->u.cli);
4323         return rc;
4324 }
4325
4326 static int osc_import_event(struct obd_device *obd,
4327                             struct obd_import *imp,
4328                             enum obd_import_event event)
4329 {
4330         struct client_obd *cli;
4331         int rc = 0;
4332
4333         ENTRY;
4334         LASSERT(imp->imp_obd == obd);
4335
4336         switch (event) {
4337         case IMP_EVENT_DISCON: {
                /* Only do this for OSC devices on the MDS */
4339                 if (imp->imp_server_timeout) {
4340                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4341
4342                         cfs_spin_lock(&oscc->oscc_lock);
4343                         oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
4344                         cfs_spin_unlock(&oscc->oscc_lock);
4345                 }
4346                 cli = &obd->u.cli;
4347                 client_obd_list_lock(&cli->cl_loi_list_lock);
4348                 cli->cl_avail_grant = 0;
4349                 cli->cl_lost_grant = 0;
4350                 client_obd_list_unlock(&cli->cl_loi_list_lock);
4351                 break;
4352         }
4353         case IMP_EVENT_INACTIVE: {
4354                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
4355                 break;
4356         }
4357         case IMP_EVENT_INVALIDATE: {
4358                 struct ldlm_namespace *ns = obd->obd_namespace;
4359                 struct lu_env         *env;
4360                 int                    refcheck;
4361
4362                 env = cl_env_get(&refcheck);
4363                 if (!IS_ERR(env)) {
4364                         /* Reset grants */
4365                         cli = &obd->u.cli;
4366                         client_obd_list_lock(&cli->cl_loi_list_lock);
4367                         /* all pages go to failing rpcs due to the invalid
4368                          * import */
4369                         osc_check_rpcs(env, cli);
4370                         client_obd_list_unlock(&cli->cl_loi_list_lock);
4371
4372                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
4373                         cl_env_put(env, &refcheck);
4374                 } else
4375                         rc = PTR_ERR(env);
4376                 break;
4377         }
4378         case IMP_EVENT_ACTIVE: {
                /* Only do this for OSC devices on the MDS */
4380                 if (imp->imp_server_timeout) {
4381                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4382
4383                         cfs_spin_lock(&oscc->oscc_lock);
4384                         oscc->oscc_flags &= ~(OSCC_FLAG_NOSPC |
4385                                               OSCC_FLAG_NOSPC_BLK);
4386                         cfs_spin_unlock(&oscc->oscc_lock);
4387                 }
4388                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
4389                 break;
4390         }
4391         case IMP_EVENT_OCD: {
4392                 struct obd_connect_data *ocd = &imp->imp_connect_data;
4393
4394                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
4395                         osc_init_grant(&obd->u.cli, ocd);
4396
4397                 /* See bug 7198 */
4398                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal =
                                OST_REQUEST_PORTAL;
4400
4401                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
4402                 break;
4403         }
4404         case IMP_EVENT_DEACTIVATE: {
4405                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
4406                 break;
4407         }
4408         case IMP_EVENT_ACTIVATE: {
4409                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
4410                 break;
4411         }
4412         default:
4413                 CERROR("Unknown import event %d\n", event);
4414                 LBUG();
4415         }
4416         RETURN(rc);
4417 }
4418
4419 /**
4420  * Determine whether the lock can be canceled before replaying the lock
4421  * during recovery, see bug16774 for detailed information.
4422  *
4423  * \retval zero the lock can't be canceled
4424  * \retval other ok to cancel
4425  */
4426 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
4427 {
4428         check_res_locked(lock->l_resource);
4429
        /*
         * Cancel any unused extent lock granted in LCK_PR or LCK_CR mode.
         *
         * XXX as a future improvement, we could also cancel an unused write
         * lock if it has no dirty data and no active mmaps.
         */
4436         if (lock->l_resource->lr_type == LDLM_EXTENT &&
4437             (lock->l_granted_mode == LCK_PR ||
4438              lock->l_granted_mode == LCK_CR) &&
4439             (osc_dlm_lock_pageref(lock) == 0))
4440                 RETURN(1);
4441
4442         RETURN(0);
4443 }
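
/*
 * Sketch of how the callback above is consumed: osc_setup() below
 * registers it as the namespace cancel-for-recovery policy,
 *
 *      ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
 *
 * after which the ldlm layer consults it while collecting locks for
 * replay, cancelling locally any lock for which it returns non-zero
 * instead of replaying it.
 */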
4444
4445 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
4446 {
4447         int rc;
4448         ENTRY;
4449
4451         rc = ptlrpcd_addref();
4452         if (rc)
4453                 RETURN(rc);
4454
4455         rc = client_obd_setup(obd, lcfg);
4456         if (rc) {
4457                 ptlrpcd_decref();
4458         } else {
4459                 struct lprocfs_static_vars lvars = { 0 };
4460                 struct client_obd *cli = &obd->u.cli;
4461
4462                 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
4463                 lprocfs_osc_init_vars(&lvars);
4464                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
4465                         lproc_osc_attach_seqstat(obd);
4466                         sptlrpc_lprocfs_cliobd_attach(obd);
4467                         ptlrpc_lprocfs_register_obd(obd);
4468                 }
4469
4470                 oscc_init(obd);
                /* We need to allocate a few extra requests, because
                   brw_interpret tries to create new requests before freeing
                   previous ones. Ideally we want 2x max_rpcs_in_flight
                   reserved, but that is probably too much wasted RAM in
                   practice, so an extra 2 is a guess that should still work. */
4476                 cli->cl_import->imp_rq_pool =
4477                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4478                                             OST_MAXREQSIZE,
4479                                             ptlrpc_add_rqs_to_pool);
4480
4481                 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
4482                 cfs_sema_init(&cli->cl_grant_sem, 1);
4483
4484                 ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
4485         }
4486
4487         RETURN(rc);
4488 }
4489
4490 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
4491 {
4492         int rc = 0;
4493         ENTRY;
4494
4495         switch (stage) {
4496         case OBD_CLEANUP_EARLY: {
4497                 struct obd_import *imp;
4498                 imp = obd->u.cli.cl_import;
4499                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
4500                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
4501                 ptlrpc_deactivate_import(imp);
4502                 cfs_spin_lock(&imp->imp_lock);
4503                 imp->imp_pingable = 0;
4504                 cfs_spin_unlock(&imp->imp_lock);
4505                 break;
4506         }
4507         case OBD_CLEANUP_EXPORTS: {
4508                 /* LU-464
4509                  * for echo client, export may be on zombie list, wait for
4510                  * zombie thread to cull it, because cli.cl_import will be
4511                  * cleared in client_disconnect_export():
4512                  *   class_export_destroy() -> obd_cleanup() ->
4513                  *   echo_device_free() -> echo_client_cleanup() ->
4514                  *   obd_disconnect() -> osc_disconnect() ->
4515                  *   client_disconnect_export()
4516                  */
4517                 obd_zombie_barrier();
4518                 obd_cleanup_client_import(obd);
4519                 ptlrpc_lprocfs_unregister_obd(obd);
4520                 lprocfs_obd_cleanup(obd);
4521                 rc = obd_llog_finish(obd, 0);
4522                 if (rc != 0)
                        CERROR("failed to cleanup llog subsystem: rc = %d\n",
                               rc);
4524                 break;
        }
4526         }
4527         RETURN(rc);
4528 }
4529
4530 int osc_cleanup(struct obd_device *obd)
4531 {
4532         int rc;
4533
4534         ENTRY;
4535
4536         /* free memory of osc quota cache */
4537         osc_quota_cleanup(obd);
4538
4539         rc = client_obd_cleanup(obd);
4540
4541         ptlrpcd_decref();
4542         RETURN(rc);
4543 }
4544
4545 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
4546 {
4547         struct lprocfs_static_vars lvars = { 0 };
4548         int rc = 0;
4549
4550         lprocfs_osc_init_vars(&lvars);
4551
4552         switch (lcfg->lcfg_command) {
4553         default:
4554                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
4555                                               lcfg, obd);
4556                 if (rc > 0)
4557                         rc = 0;
4558                 break;
4559         }
4560
        return rc;
4562 }
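
/*
 * Illustration (hypothetical values): parameters arrive here from
 * "lctl conf_param" as <obd>.<param>=<value> records, e.g.
 *
 *      lctl conf_param testfs-OST0000.osc.checksums=1
 *
 * class_process_proc_param() matches the "checksums" name against the
 * PARAM_OSC lprocfs variable table and writes the value through the
 * matching proc handler, exactly as if it were echoed into the
 * corresponding /proc file.
 */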
4563
4564 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4565 {
4566         return osc_process_config_base(obd, buf);
4567 }
4568
4569 struct obd_ops osc_obd_ops = {
4570         .o_owner                = THIS_MODULE,
4571         .o_setup                = osc_setup,
4572         .o_precleanup           = osc_precleanup,
4573         .o_cleanup              = osc_cleanup,
4574         .o_add_conn             = client_import_add_conn,
4575         .o_del_conn             = client_import_del_conn,
4576         .o_connect              = client_connect_import,
4577         .o_reconnect            = osc_reconnect,
4578         .o_disconnect           = osc_disconnect,
4579         .o_statfs               = osc_statfs,
4580         .o_statfs_async         = osc_statfs_async,
4581         .o_packmd               = osc_packmd,
4582         .o_unpackmd             = osc_unpackmd,
4583         .o_precreate            = osc_precreate,
4584         .o_create               = osc_create,
4585         .o_create_async         = osc_create_async,
4586         .o_destroy              = osc_destroy,
4587         .o_getattr              = osc_getattr,
4588         .o_getattr_async        = osc_getattr_async,
4589         .o_setattr              = osc_setattr,
4590         .o_setattr_async        = osc_setattr_async,
4591         .o_brw                  = osc_brw,
4592         .o_punch                = osc_punch,
4593         .o_sync                 = osc_sync,
4594         .o_enqueue              = osc_enqueue,
4595         .o_change_cbdata        = osc_change_cbdata,
4596         .o_find_cbdata          = osc_find_cbdata,
4597         .o_cancel               = osc_cancel,
4598         .o_cancel_unused        = osc_cancel_unused,
4599         .o_iocontrol            = osc_iocontrol,
4600         .o_get_info             = osc_get_info,
4601         .o_set_info_async       = osc_set_info_async,
4602         .o_import_event         = osc_import_event,
4603         .o_llog_init            = osc_llog_init,
4604         .o_llog_finish          = osc_llog_finish,
4605         .o_process_config       = osc_process_config,
4606         .o_quotactl             = osc_quotactl,
4607         .o_quotacheck           = osc_quotacheck,
4608         .o_quota_adjust_qunit   = osc_quota_adjust_qunit,
4609 };
4610
4611 extern struct lu_kmem_descr osc_caches[];
4612 extern cfs_spinlock_t       osc_ast_guard;
4613 extern cfs_lock_class_key_t osc_ast_guard_class;
4614
4615 int __init osc_init(void)
4616 {
4617         struct lprocfs_static_vars lvars = { 0 };
4618         int rc;
4619         ENTRY;
4620
4621         /* print an address of _any_ initialized kernel symbol from this
4622          * module, to allow debugging with gdb that doesn't support data
         * symbols from modules. */
4624         CDEBUG(D_CONSOLE, "Lustre OSC module (%p).\n", &osc_caches);
4625
        rc = lu_kmem_init(osc_caches);
        if (rc)
                RETURN(rc);

4628         lprocfs_osc_init_vars(&lvars);
4629
4630         osc_quota_init();
4631         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
4632                                  LUSTRE_OSC_NAME, &osc_device_type);
4633         if (rc) {
4634                 lu_kmem_fini(osc_caches);
4635                 RETURN(rc);
4636         }
4637
4638         cfs_spin_lock_init(&osc_ast_guard);
4639         cfs_lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
4640
4641         osc_mds_ost_orig_logops = llog_lvfs_ops;
4642         osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
4643         osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
4644         osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
4645         osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
4646
4647         RETURN(rc);
4648 }
4649
4650 #ifdef __KERNEL__
4651 static void /*__exit*/ osc_exit(void)
4652 {
4653         lu_device_type_fini(&osc_device_type);
4654
4655         osc_quota_exit();
4656         class_unregister_type(LUSTRE_OSC_NAME);
4657         lu_kmem_fini(osc_caches);
4658 }
4659
4660 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4661 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4662 MODULE_LICENSE("GPL");
4663
4664 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
4665 #endif