Whamcloud - gitweb
ad912ec98c6ba6f17ce78868faa3946b9289fabc
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <libcfs/libcfs.h>
40
41 #ifndef __KERNEL__
42 # include <liblustre.h>
43 #endif
44
45 #include <lustre_dlm.h>
46 #include <lustre_net.h>
47 #include <lustre/lustre_user.h>
48 #include <obd_cksum.h>
49 #include <obd_ost.h>
50 #include <obd_lov.h>
51
52 #ifdef  __CYGWIN__
53 # include <ctype.h>
54 #endif
55
56 #include <lustre_ha.h>
57 #include <lprocfs_status.h>
58 #include <lustre_log.h>
59 #include <lustre_debug.h>
60 #include <lustre_param.h>
61 #include <lustre_fid.h>
62 #include "osc_internal.h"
63 #include "osc_cl_internal.h"
64
65 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
66 static int brw_interpret(const struct lu_env *env,
67                          struct ptlrpc_request *req, void *data, int rc);
68 int osc_cleanup(struct obd_device *obd);
69
70 /* Pack OSC object metadata for disk storage (LE byte order). */
71 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
72                       struct lov_stripe_md *lsm)
73 {
74         int lmm_size;
75         ENTRY;
76
77         lmm_size = sizeof(**lmmp);
78         if (lmmp == NULL)
79                 RETURN(lmm_size);
80
81         if (*lmmp != NULL && lsm == NULL) {
82                 OBD_FREE(*lmmp, lmm_size);
83                 *lmmp = NULL;
84                 RETURN(0);
85         } else if (unlikely(lsm != NULL && ostid_id(&lsm->lsm_oi) == 0)) {
86                 RETURN(-EBADF);
87         }
88
89         if (*lmmp == NULL) {
90                 OBD_ALLOC(*lmmp, lmm_size);
91                 if (*lmmp == NULL)
92                         RETURN(-ENOMEM);
93         }
94
95         if (lsm)
96                 ostid_cpu_to_le(&lsm->lsm_oi, &(*lmmp)->lmm_oi);
97
98         RETURN(lmm_size);
99 }
100
101 /* Unpack OSC object metadata from disk storage (LE byte order). */
102 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
103                         struct lov_mds_md *lmm, int lmm_bytes)
104 {
105         int lsm_size;
106         struct obd_import *imp = class_exp2cliimp(exp);
107         ENTRY;
108
109         if (lmm != NULL) {
110                 if (lmm_bytes < sizeof(*lmm)) {
111                         CERROR("%s: lov_mds_md too small: %d, need %d\n",
112                                exp->exp_obd->obd_name, lmm_bytes,
113                                (int)sizeof(*lmm));
114                         RETURN(-EINVAL);
115                 }
116                 /* XXX LOV_MAGIC etc check? */
117
118                 if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) {
119                         CERROR("%s: zero lmm_object_id: rc = %d\n",
120                                exp->exp_obd->obd_name, -EINVAL);
121                         RETURN(-EINVAL);
122                 }
123         }
124
125         lsm_size = lov_stripe_md_size(1);
126         if (lsmp == NULL)
127                 RETURN(lsm_size);
128
129         if (*lsmp != NULL && lmm == NULL) {
130                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
131                 OBD_FREE(*lsmp, lsm_size);
132                 *lsmp = NULL;
133                 RETURN(0);
134         }
135
136         if (*lsmp == NULL) {
137                 OBD_ALLOC(*lsmp, lsm_size);
138                 if (unlikely(*lsmp == NULL))
139                         RETURN(-ENOMEM);
140                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
141                 if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {
142                         OBD_FREE(*lsmp, lsm_size);
143                         RETURN(-ENOMEM);
144                 }
145                 loi_init((*lsmp)->lsm_oinfo[0]);
146         } else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) {
147                 RETURN(-EBADF);
148         }
149
150         if (lmm != NULL)
151                 /* XXX zero *lsmp? */
152                 ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi);
153
154         if (imp != NULL &&
155             (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
156                 (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
157         else
158                 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
159
160         RETURN(lsm_size);
161 }
162
163 static inline void osc_pack_capa(struct ptlrpc_request *req,
164                                  struct ost_body *body, void *capa)
165 {
166         struct obd_capa *oc = (struct obd_capa *)capa;
167         struct lustre_capa *c;
168
169         if (!capa)
170                 return;
171
172         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
173         LASSERT(c);
174         capa_cpy(c, oc);
175         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
176         DEBUG_CAPA(D_SEC, c, "pack");
177 }
178
179 static inline void osc_pack_req_body(struct ptlrpc_request *req,
180                                      struct obd_info *oinfo)
181 {
182         struct ost_body *body;
183
184         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
185         LASSERT(body);
186
187         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
188         osc_pack_capa(req, body, oinfo->oi_capa);
189 }
190
191 static inline void osc_set_capa_size(struct ptlrpc_request *req,
192                                      const struct req_msg_field *field,
193                                      struct obd_capa *oc)
194 {
195         if (oc == NULL)
196                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
197         else
198                 /* it is already calculated as sizeof struct obd_capa */
199                 ;
200 }
201
202 static int osc_getattr_interpret(const struct lu_env *env,
203                                  struct ptlrpc_request *req,
204                                  struct osc_async_args *aa, int rc)
205 {
206         struct ost_body *body;
207         ENTRY;
208
209         if (rc != 0)
210                 GOTO(out, rc);
211
212         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
213         if (body) {
214                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
215                 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
216
217                 /* This should really be sent by the OST */
218                 aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
219                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
220         } else {
221                 CDEBUG(D_INFO, "can't unpack ost_body\n");
222                 rc = -EPROTO;
223                 aa->aa_oi->oi_oa->o_valid = 0;
224         }
225 out:
226         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
227         RETURN(rc);
228 }
229
230 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
231                              struct ptlrpc_request_set *set)
232 {
233         struct ptlrpc_request *req;
234         struct osc_async_args *aa;
235         int                    rc;
236         ENTRY;
237
238         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
239         if (req == NULL)
240                 RETURN(-ENOMEM);
241
242         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
243         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
244         if (rc) {
245                 ptlrpc_request_free(req);
246                 RETURN(rc);
247         }
248
249         osc_pack_req_body(req, oinfo);
250
251         ptlrpc_request_set_replen(req);
252         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
253
254         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
255         aa = ptlrpc_req_async_args(req);
256         aa->aa_oi = oinfo;
257
258         ptlrpc_set_add_req(set, req);
259         RETURN(0);
260 }
261
262 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
263                        struct obd_info *oinfo)
264 {
265         struct ptlrpc_request *req;
266         struct ost_body       *body;
267         int                    rc;
268         ENTRY;
269
270         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
271         if (req == NULL)
272                 RETURN(-ENOMEM);
273
274         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
275         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
276         if (rc) {
277                 ptlrpc_request_free(req);
278                 RETURN(rc);
279         }
280
281         osc_pack_req_body(req, oinfo);
282
283         ptlrpc_request_set_replen(req);
284
285         rc = ptlrpc_queue_wait(req);
286         if (rc)
287                 GOTO(out, rc);
288
289         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
290         if (body == NULL)
291                 GOTO(out, rc = -EPROTO);
292
293         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
294         lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
295
296         oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
297         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
298
299         EXIT;
300  out:
301         ptlrpc_req_finished(req);
302         return rc;
303 }
304
305 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
306                        struct obd_info *oinfo, struct obd_trans_info *oti)
307 {
308         struct ptlrpc_request *req;
309         struct ost_body       *body;
310         int                    rc;
311         ENTRY;
312
313         LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
314
315         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
316         if (req == NULL)
317                 RETURN(-ENOMEM);
318
319         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
320         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
321         if (rc) {
322                 ptlrpc_request_free(req);
323                 RETURN(rc);
324         }
325
326         osc_pack_req_body(req, oinfo);
327
328         ptlrpc_request_set_replen(req);
329
330         rc = ptlrpc_queue_wait(req);
331         if (rc)
332                 GOTO(out, rc);
333
334         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
335         if (body == NULL)
336                 GOTO(out, rc = -EPROTO);
337
338         lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
339
340         EXIT;
341 out:
342         ptlrpc_req_finished(req);
343         RETURN(rc);
344 }
345
346 static int osc_setattr_interpret(const struct lu_env *env,
347                                  struct ptlrpc_request *req,
348                                  struct osc_setattr_args *sa, int rc)
349 {
350         struct ost_body *body;
351         ENTRY;
352
353         if (rc != 0)
354                 GOTO(out, rc);
355
356         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
357         if (body == NULL)
358                 GOTO(out, rc = -EPROTO);
359
360         lustre_get_wire_obdo(sa->sa_oa, &body->oa);
361 out:
362         rc = sa->sa_upcall(sa->sa_cookie, rc);
363         RETURN(rc);
364 }
365
366 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
367                            struct obd_trans_info *oti,
368                            obd_enqueue_update_f upcall, void *cookie,
369                            struct ptlrpc_request_set *rqset)
370 {
371         struct ptlrpc_request   *req;
372         struct osc_setattr_args *sa;
373         int                      rc;
374         ENTRY;
375
376         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
377         if (req == NULL)
378                 RETURN(-ENOMEM);
379
380         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
381         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
382         if (rc) {
383                 ptlrpc_request_free(req);
384                 RETURN(rc);
385         }
386
387         if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
388                 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
389
390         osc_pack_req_body(req, oinfo);
391
392         ptlrpc_request_set_replen(req);
393
394         /* do mds to ost setattr asynchronously */
395         if (!rqset) {
396                 /* Do not wait for response. */
397                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
398         } else {
399                 req->rq_interpret_reply =
400                         (ptlrpc_interpterer_t)osc_setattr_interpret;
401
402                 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
403                 sa = ptlrpc_req_async_args(req);
404                 sa->sa_oa = oinfo->oi_oa;
405                 sa->sa_upcall = upcall;
406                 sa->sa_cookie = cookie;
407
408                 if (rqset == PTLRPCD_SET)
409                         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
410                 else
411                         ptlrpc_set_add_req(rqset, req);
412         }
413
414         RETURN(0);
415 }
416
417 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
418                              struct obd_trans_info *oti,
419                              struct ptlrpc_request_set *rqset)
420 {
421         return osc_setattr_async_base(exp, oinfo, oti,
422                                       oinfo->oi_cb_up, oinfo, rqset);
423 }
424
425 int osc_real_create(struct obd_export *exp, struct obdo *oa,
426                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
427 {
428         struct ptlrpc_request *req;
429         struct ost_body       *body;
430         struct lov_stripe_md  *lsm;
431         int                    rc;
432         ENTRY;
433
434         LASSERT(oa);
435         LASSERT(ea);
436
437         lsm = *ea;
438         if (!lsm) {
439                 rc = obd_alloc_memmd(exp, &lsm);
440                 if (rc < 0)
441                         RETURN(rc);
442         }
443
444         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
445         if (req == NULL)
446                 GOTO(out, rc = -ENOMEM);
447
448         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
449         if (rc) {
450                 ptlrpc_request_free(req);
451                 GOTO(out, rc);
452         }
453
454         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
455         LASSERT(body);
456         lustre_set_wire_obdo(&body->oa, oa);
457
458         ptlrpc_request_set_replen(req);
459
460         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
461             oa->o_flags == OBD_FL_DELORPHAN) {
462                 DEBUG_REQ(D_HA, req,
463                           "delorphan from OST integration");
464                 /* Don't resend the delorphan req */
465                 req->rq_no_resend = req->rq_no_delay = 1;
466         }
467
468         rc = ptlrpc_queue_wait(req);
469         if (rc)
470                 GOTO(out_req, rc);
471
472         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
473         if (body == NULL)
474                 GOTO(out_req, rc = -EPROTO);
475
476         lustre_get_wire_obdo(oa, &body->oa);
477
478         oa->o_blksize = cli_brw_size(exp->exp_obd);
479         oa->o_valid |= OBD_MD_FLBLKSZ;
480
481         /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
482          * have valid lsm_oinfo data structs, so don't go touching that.
483          * This needs to be fixed in a big way.
484          */
485         lsm->lsm_oi = oa->o_oi;
486         *ea = lsm;
487
488         if (oti != NULL) {
489                 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
490
491                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
492                         if (!oti->oti_logcookies)
493                                 oti_alloc_cookies(oti, 1);
494                         *oti->oti_logcookies = oa->o_lcookie;
495                 }
496         }
497
498         CDEBUG(D_HA, "transno: "LPD64"\n",
499                lustre_msg_get_transno(req->rq_repmsg));
500 out_req:
501         ptlrpc_req_finished(req);
502 out:
503         if (rc && !*ea)
504                 obd_free_memmd(exp, &lsm);
505         RETURN(rc);
506 }
507
508 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
509                    obd_enqueue_update_f upcall, void *cookie,
510                    struct ptlrpc_request_set *rqset)
511 {
512         struct ptlrpc_request   *req;
513         struct osc_setattr_args *sa;
514         struct ost_body         *body;
515         int                      rc;
516         ENTRY;
517
518         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
519         if (req == NULL)
520                 RETURN(-ENOMEM);
521
522         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
523         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
524         if (rc) {
525                 ptlrpc_request_free(req);
526                 RETURN(rc);
527         }
528         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
529         ptlrpc_at_set_req_timeout(req);
530
531         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
532         LASSERT(body);
533         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
534         osc_pack_capa(req, body, oinfo->oi_capa);
535
536         ptlrpc_request_set_replen(req);
537
538         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
539         CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
540         sa = ptlrpc_req_async_args(req);
541         sa->sa_oa     = oinfo->oi_oa;
542         sa->sa_upcall = upcall;
543         sa->sa_cookie = cookie;
544         if (rqset == PTLRPCD_SET)
545                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
546         else
547                 ptlrpc_set_add_req(rqset, req);
548
549         RETURN(0);
550 }
551
552 static int osc_punch(const struct lu_env *env, struct obd_export *exp,
553                      struct obd_info *oinfo, struct obd_trans_info *oti,
554                      struct ptlrpc_request_set *rqset)
555 {
556         oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
557         oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
558         oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
559         return osc_punch_base(exp, oinfo,
560                               oinfo->oi_cb_up, oinfo, rqset);
561 }
562
563 static int osc_sync_interpret(const struct lu_env *env,
564                               struct ptlrpc_request *req,
565                               void *arg, int rc)
566 {
567         struct osc_fsync_args *fa = arg;
568         struct ost_body *body;
569         ENTRY;
570
571         if (rc)
572                 GOTO(out, rc);
573
574         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
575         if (body == NULL) {
576                 CERROR ("can't unpack ost_body\n");
577                 GOTO(out, rc = -EPROTO);
578         }
579
580         *fa->fa_oi->oi_oa = body->oa;
581 out:
582         rc = fa->fa_upcall(fa->fa_cookie, rc);
583         RETURN(rc);
584 }
585
586 int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
587                   obd_enqueue_update_f upcall, void *cookie,
588                   struct ptlrpc_request_set *rqset)
589 {
590         struct ptlrpc_request *req;
591         struct ost_body       *body;
592         struct osc_fsync_args *fa;
593         int                    rc;
594         ENTRY;
595
596         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
597         if (req == NULL)
598                 RETURN(-ENOMEM);
599
600         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
601         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
602         if (rc) {
603                 ptlrpc_request_free(req);
604                 RETURN(rc);
605         }
606
607         /* overload the size and blocks fields in the oa with start/end */
608         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
609         LASSERT(body);
610         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
611         osc_pack_capa(req, body, oinfo->oi_capa);
612
613         ptlrpc_request_set_replen(req);
614         req->rq_interpret_reply = osc_sync_interpret;
615
616         CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
617         fa = ptlrpc_req_async_args(req);
618         fa->fa_oi = oinfo;
619         fa->fa_upcall = upcall;
620         fa->fa_cookie = cookie;
621
622         if (rqset == PTLRPCD_SET)
623                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
624         else
625                 ptlrpc_set_add_req(rqset, req);
626
627         RETURN (0);
628 }
629
630 static int osc_sync(const struct lu_env *env, struct obd_export *exp,
631                     struct obd_info *oinfo, obd_size start, obd_size end,
632                     struct ptlrpc_request_set *set)
633 {
634         ENTRY;
635
636         if (!oinfo->oi_oa) {
637                 CDEBUG(D_INFO, "oa NULL\n");
638                 RETURN(-EINVAL);
639         }
640
641         oinfo->oi_oa->o_size = start;
642         oinfo->oi_oa->o_blocks = end;
643         oinfo->oi_oa->o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
644
645         RETURN(osc_sync_base(exp, oinfo, oinfo->oi_cb_up, oinfo, set));
646 }
647
648 /* Find and cancel locally locks matched by @mode in the resource found by
649  * @objid. Found locks are added into @cancel list. Returns the amount of
650  * locks added to @cancels list. */
651 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
652                                    cfs_list_t *cancels,
653                                    ldlm_mode_t mode, int lock_flags)
654 {
655         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
656         struct ldlm_res_id res_id;
657         struct ldlm_resource *res;
658         int count;
659         ENTRY;
660
661         /* Return, i.e. cancel nothing, only if ELC is supported (flag in
662          * export) but disabled through procfs (flag in NS).
663          *
664          * This distinguishes from a case when ELC is not supported originally,
665          * when we still want to cancel locks in advance and just cancel them
666          * locally, without sending any RPC. */
667         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
668                 RETURN(0);
669
670         ostid_build_res_name(&oa->o_oi, &res_id);
671         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
672         if (res == NULL)
673                 RETURN(0);
674
675         LDLM_RESOURCE_ADDREF(res);
676         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
677                                            lock_flags, 0, NULL);
678         LDLM_RESOURCE_DELREF(res);
679         ldlm_resource_putref(res);
680         RETURN(count);
681 }
682
683 static int osc_destroy_interpret(const struct lu_env *env,
684                                  struct ptlrpc_request *req, void *data,
685                                  int rc)
686 {
687         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
688
689         cfs_atomic_dec(&cli->cl_destroy_in_flight);
690         cfs_waitq_signal(&cli->cl_destroy_waitq);
691         return 0;
692 }
693
694 static int osc_can_send_destroy(struct client_obd *cli)
695 {
696         if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <=
697             cli->cl_max_rpcs_in_flight) {
698                 /* The destroy request can be sent */
699                 return 1;
700         }
701         if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) <
702             cli->cl_max_rpcs_in_flight) {
703                 /*
704                  * The counter has been modified between the two atomic
705                  * operations.
706                  */
707                 cfs_waitq_signal(&cli->cl_destroy_waitq);
708         }
709         return 0;
710 }
711
712 int osc_create(const struct lu_env *env, struct obd_export *exp,
713                struct obdo *oa, struct lov_stripe_md **ea,
714                struct obd_trans_info *oti)
715 {
716         int rc = 0;
717         ENTRY;
718
719         LASSERT(oa);
720         LASSERT(ea);
721         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
722
723         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
724             oa->o_flags == OBD_FL_RECREATE_OBJS) {
725                 RETURN(osc_real_create(exp, oa, ea, oti));
726         }
727
728         if (!fid_seq_is_mdt(oa->o_seq))
729                 RETURN(osc_real_create(exp, oa, ea, oti));
730
731         /* we should not get here anymore */
732         LBUG();
733
734         RETURN(rc);
735 }
736
737 /* Destroy requests can be async always on the client, and we don't even really
738  * care about the return code since the client cannot do anything at all about
739  * a destroy failure.
740  * When the MDS is unlinking a filename, it saves the file objects into a
741  * recovery llog, and these object records are cancelled when the OST reports
742  * they were destroyed and sync'd to disk (i.e. transaction committed).
743  * If the client dies, or the OST is down when the object should be destroyed,
744  * the records are not cancelled, and when the OST reconnects to the MDS next,
745  * it will retrieve the llog unlink logs and then sends the log cancellation
746  * cookies to the MDS after committing destroy transactions. */
747 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
748                        struct obdo *oa, struct lov_stripe_md *ea,
749                        struct obd_trans_info *oti, struct obd_export *md_export,
750                        void *capa)
751 {
752         struct client_obd     *cli = &exp->exp_obd->u.cli;
753         struct ptlrpc_request *req;
754         struct ost_body       *body;
755         CFS_LIST_HEAD(cancels);
756         int rc, count;
757         ENTRY;
758
759         if (!oa) {
760                 CDEBUG(D_INFO, "oa NULL\n");
761                 RETURN(-EINVAL);
762         }
763
764         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
765                                         LDLM_FL_DISCARD_DATA);
766
767         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
768         if (req == NULL) {
769                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
770                 RETURN(-ENOMEM);
771         }
772
773         osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
774         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
775                                0, &cancels, count);
776         if (rc) {
777                 ptlrpc_request_free(req);
778                 RETURN(rc);
779         }
780
781         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
782         ptlrpc_at_set_req_timeout(req);
783
784         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
785                 oa->o_lcookie = *oti->oti_logcookies;
786         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
787         LASSERT(body);
788         lustre_set_wire_obdo(&body->oa, oa);
789
790         osc_pack_capa(req, body, (struct obd_capa *)capa);
791         ptlrpc_request_set_replen(req);
792
793         /* If osc_destory is for destroying the unlink orphan,
794          * sent from MDT to OST, which should not be blocked here,
795          * because the process might be triggered by ptlrpcd, and
796          * it is not good to block ptlrpcd thread (b=16006)*/
797         if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
798                 req->rq_interpret_reply = osc_destroy_interpret;
799                 if (!osc_can_send_destroy(cli)) {
800                         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
801                                                           NULL);
802
803                         /*
804                          * Wait until the number of on-going destroy RPCs drops
805                          * under max_rpc_in_flight
806                          */
807                         l_wait_event_exclusive(cli->cl_destroy_waitq,
808                                                osc_can_send_destroy(cli), &lwi);
809                 }
810         }
811
812         /* Do not wait for response */
813         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
814         RETURN(0);
815 }
816
817 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
818                                 long writing_bytes)
819 {
820         obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
821
822         LASSERT(!(oa->o_valid & bits));
823
824         oa->o_valid |= bits;
825         client_obd_list_lock(&cli->cl_loi_list_lock);
826         oa->o_dirty = cli->cl_dirty;
827         if (unlikely(cli->cl_dirty - cli->cl_dirty_transit >
828                      cli->cl_dirty_max)) {
829                 CERROR("dirty %lu - %lu > dirty_max %lu\n",
830                        cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
831                 oa->o_undirty = 0;
832         } else if (unlikely(cfs_atomic_read(&obd_dirty_pages) -
833                             cfs_atomic_read(&obd_dirty_transit_pages) >
834                             (long)(obd_max_dirty_pages + 1))) {
835                 /* The cfs_atomic_read() allowing the cfs_atomic_inc() are
836                  * not covered by a lock thus they may safely race and trip
837                  * this CERROR() unless we add in a small fudge factor (+1). */
838                 CERROR("dirty %d - %d > system dirty_max %d\n",
839                        cfs_atomic_read(&obd_dirty_pages),
840                        cfs_atomic_read(&obd_dirty_transit_pages),
841                        obd_max_dirty_pages);
842                 oa->o_undirty = 0;
843         } else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) {
844                 CERROR("dirty %lu - dirty_max %lu too big???\n",
845                        cli->cl_dirty, cli->cl_dirty_max);
846                 oa->o_undirty = 0;
847         } else {
848                 long max_in_flight = (cli->cl_max_pages_per_rpc <<
849                                       CFS_PAGE_SHIFT)*
850                                      (cli->cl_max_rpcs_in_flight + 1);
851                 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
852         }
853         oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
854         oa->o_dropped = cli->cl_lost_grant;
855         cli->cl_lost_grant = 0;
856         client_obd_list_unlock(&cli->cl_loi_list_lock);
857         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
858                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
859
860 }
861
862 void osc_update_next_shrink(struct client_obd *cli)
863 {
864         cli->cl_next_shrink_grant =
865                 cfs_time_shift(cli->cl_grant_shrink_interval);
866         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
867                cli->cl_next_shrink_grant);
868 }
869
870 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
871 {
872         client_obd_list_lock(&cli->cl_loi_list_lock);
873         cli->cl_avail_grant += grant;
874         client_obd_list_unlock(&cli->cl_loi_list_lock);
875 }
876
877 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
878 {
879         if (body->oa.o_valid & OBD_MD_FLGRANT) {
880                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
881                 __osc_update_grant(cli, body->oa.o_grant);
882         }
883 }
884
885 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
886                               obd_count keylen, void *key, obd_count vallen,
887                               void *val, struct ptlrpc_request_set *set);
888
889 static int osc_shrink_grant_interpret(const struct lu_env *env,
890                                       struct ptlrpc_request *req,
891                                       void *aa, int rc)
892 {
893         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
894         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
895         struct ost_body *body;
896
897         if (rc != 0) {
898                 __osc_update_grant(cli, oa->o_grant);
899                 GOTO(out, rc);
900         }
901
902         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
903         LASSERT(body);
904         osc_update_grant(cli, body);
905 out:
906         OBDO_FREE(oa);
907         return rc;
908 }
909
910 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
911 {
912         client_obd_list_lock(&cli->cl_loi_list_lock);
913         oa->o_grant = cli->cl_avail_grant / 4;
914         cli->cl_avail_grant -= oa->o_grant;
915         client_obd_list_unlock(&cli->cl_loi_list_lock);
916         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
917                 oa->o_valid |= OBD_MD_FLFLAGS;
918                 oa->o_flags = 0;
919         }
920         oa->o_flags |= OBD_FL_SHRINK_GRANT;
921         osc_update_next_shrink(cli);
922 }
923
924 /* Shrink the current grant, either from some large amount to enough for a
925  * full set of in-flight RPCs, or if we have already shrunk to that limit
926  * then to enough for a single RPC.  This avoids keeping more grant than
927  * needed, and avoids shrinking the grant piecemeal. */
928 static int osc_shrink_grant(struct client_obd *cli)
929 {
930         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
931                              (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT);
932
933         client_obd_list_lock(&cli->cl_loi_list_lock);
934         if (cli->cl_avail_grant <= target_bytes)
935                 target_bytes = cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
936         client_obd_list_unlock(&cli->cl_loi_list_lock);
937
938         return osc_shrink_grant_to_target(cli, target_bytes);
939 }
940
941 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
942 {
943         int                     rc = 0;
944         struct ost_body        *body;
945         ENTRY;
946
947         client_obd_list_lock(&cli->cl_loi_list_lock);
948         /* Don't shrink if we are already above or below the desired limit
949          * We don't want to shrink below a single RPC, as that will negatively
950          * impact block allocation and long-term performance. */
951         if (target_bytes < cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)
952                 target_bytes = cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
953
954         if (target_bytes >= cli->cl_avail_grant) {
955                 client_obd_list_unlock(&cli->cl_loi_list_lock);
956                 RETURN(0);
957         }
958         client_obd_list_unlock(&cli->cl_loi_list_lock);
959
960         OBD_ALLOC_PTR(body);
961         if (!body)
962                 RETURN(-ENOMEM);
963
964         osc_announce_cached(cli, &body->oa, 0);
965
966         client_obd_list_lock(&cli->cl_loi_list_lock);
967         body->oa.o_grant = cli->cl_avail_grant - target_bytes;
968         cli->cl_avail_grant = target_bytes;
969         client_obd_list_unlock(&cli->cl_loi_list_lock);
970         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
971                 body->oa.o_valid |= OBD_MD_FLFLAGS;
972                 body->oa.o_flags = 0;
973         }
974         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
975         osc_update_next_shrink(cli);
976
977         rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
978                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
979                                 sizeof(*body), body, NULL);
980         if (rc != 0)
981                 __osc_update_grant(cli, body->oa.o_grant);
982         OBD_FREE_PTR(body);
983         RETURN(rc);
984 }
985
986 static int osc_should_shrink_grant(struct client_obd *client)
987 {
988         cfs_time_t time = cfs_time_current();
989         cfs_time_t next_shrink = client->cl_next_shrink_grant;
990
991         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
992              OBD_CONNECT_GRANT_SHRINK) == 0)
993                 return 0;
994
995         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
996                 /* Get the current RPC size directly, instead of going via:
997                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
998                  * Keep comment here so that it can be found by searching. */
999                 int brw_size = client->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
1000
1001                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
1002                     client->cl_avail_grant > brw_size)
1003                         return 1;
1004                 else
1005                         osc_update_next_shrink(client);
1006         }
1007         return 0;
1008 }
1009
1010 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1011 {
1012         struct client_obd *client;
1013
1014         cfs_list_for_each_entry(client, &item->ti_obd_list,
1015                                 cl_grant_shrink_list) {
1016                 if (osc_should_shrink_grant(client))
1017                         osc_shrink_grant(client);
1018         }
1019         return 0;
1020 }
1021
1022 static int osc_add_shrink_grant(struct client_obd *client)
1023 {
1024         int rc;
1025
1026         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1027                                        TIMEOUT_GRANT,
1028                                        osc_grant_shrink_grant_cb, NULL,
1029                                        &client->cl_grant_shrink_list);
1030         if (rc) {
1031                 CERROR("add grant client %s error %d\n",
1032                         client->cl_import->imp_obd->obd_name, rc);
1033                 return rc;
1034         }
1035         CDEBUG(D_CACHE, "add grant client %s \n",
1036                client->cl_import->imp_obd->obd_name);
1037         osc_update_next_shrink(client);
1038         return 0;
1039 }
1040
1041 static int osc_del_shrink_grant(struct client_obd *client)
1042 {
1043         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1044                                          TIMEOUT_GRANT);
1045 }
1046
1047 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1048 {
1049         /*
1050          * ocd_grant is the total grant amount we're expect to hold: if we've
1051          * been evicted, it's the new avail_grant amount, cl_dirty will drop
1052          * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1053          *
1054          * race is tolerable here: if we're evicted, but imp_state already
1055          * left EVICTED state, then cl_dirty must be 0 already.
1056          */
1057         client_obd_list_lock(&cli->cl_loi_list_lock);
1058         if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1059                 cli->cl_avail_grant = ocd->ocd_grant;
1060         else
1061                 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1062
1063         if (cli->cl_avail_grant < 0) {
1064                 CWARN("%s: available grant < 0, the OSS is probably not running"
1065                       " with patch from bug20278 (%ld) \n",
1066                       cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant);
1067                 /* workaround for 1.6 servers which do not have
1068                  * the patch from bug20278 */
1069                 cli->cl_avail_grant = ocd->ocd_grant;
1070         }
1071
1072         /* determine the appropriate chunk size used by osc_extent. */
1073         cli->cl_chunkbits = max_t(int, CFS_PAGE_SHIFT, ocd->ocd_blocksize);
1074         client_obd_list_unlock(&cli->cl_loi_list_lock);
1075
1076         CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
1077                 "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
1078                 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
1079
1080         if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1081             cfs_list_empty(&cli->cl_grant_shrink_list))
1082                 osc_add_shrink_grant(cli);
1083 }
1084
1085 /* We assume that the reason this OSC got a short read is because it read
1086  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1087  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1088  * this stripe never got written at or beyond this stripe offset yet. */
1089 static void handle_short_read(int nob_read, obd_count page_count,
1090                               struct brw_page **pga)
1091 {
1092         char *ptr;
1093         int i = 0;
1094
1095         /* skip bytes read OK */
1096         while (nob_read > 0) {
1097                 LASSERT (page_count > 0);
1098
1099                 if (pga[i]->count > nob_read) {
1100                         /* EOF inside this page */
1101                         ptr = cfs_kmap(pga[i]->pg) +
1102                                 (pga[i]->off & ~CFS_PAGE_MASK);
1103                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1104                         cfs_kunmap(pga[i]->pg);
1105                         page_count--;
1106                         i++;
1107                         break;
1108                 }
1109
1110                 nob_read -= pga[i]->count;
1111                 page_count--;
1112                 i++;
1113         }
1114
1115         /* zero remaining pages */
1116         while (page_count-- > 0) {
1117                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1118                 memset(ptr, 0, pga[i]->count);
1119                 cfs_kunmap(pga[i]->pg);
1120                 i++;
1121         }
1122 }
1123
1124 static int check_write_rcs(struct ptlrpc_request *req,
1125                            int requested_nob, int niocount,
1126                            obd_count page_count, struct brw_page **pga)
1127 {
1128         int     i;
1129         __u32   *remote_rcs;
1130
1131         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1132                                                   sizeof(*remote_rcs) *
1133                                                   niocount);
1134         if (remote_rcs == NULL) {
1135                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1136                 return(-EPROTO);
1137         }
1138
1139         /* return error if any niobuf was in error */
1140         for (i = 0; i < niocount; i++) {
1141                 if ((int)remote_rcs[i] < 0)
1142                         return(remote_rcs[i]);
1143
1144                 if (remote_rcs[i] != 0) {
1145                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1146                                 i, remote_rcs[i], req);
1147                         return(-EPROTO);
1148                 }
1149         }
1150
1151         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1152                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1153                        req->rq_bulk->bd_nob_transferred, requested_nob);
1154                 return(-EPROTO);
1155         }
1156
1157         return (0);
1158 }
1159
1160 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1161 {
1162         if (p1->flag != p2->flag) {
1163                 unsigned mask = ~(OBD_BRW_FROM_GRANT| OBD_BRW_NOCACHE|
1164                                   OBD_BRW_SYNC|OBD_BRW_ASYNC|OBD_BRW_NOQUOTA);
1165
1166                 /* warn if we try to combine flags that we don't know to be
1167                  * safe to combine */
1168                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1169                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1170                               "report this at http://bugs.whamcloud.com/\n",
1171                               p1->flag, p2->flag);
1172                 }
1173                 return 0;
1174         }
1175
1176         return (p1->off + p1->count == p2->off);
1177 }
1178
1179 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1180                                    struct brw_page **pga, int opc,
1181                                    cksum_type_t cksum_type)
1182 {
1183         __u32                           cksum;
1184         int                             i = 0;
1185         struct cfs_crypto_hash_desc     *hdesc;
1186         unsigned int                    bufsize;
1187         int                             err;
1188         unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
1189
1190         LASSERT(pg_count > 0);
1191
1192         hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1193         if (IS_ERR(hdesc)) {
1194                 CERROR("Unable to initialize checksum hash %s\n",
1195                        cfs_crypto_hash_name(cfs_alg));
1196                 return PTR_ERR(hdesc);
1197         }
1198
1199         while (nob > 0 && pg_count > 0) {
1200                 int count = pga[i]->count > nob ? nob : pga[i]->count;
1201
1202                 /* corrupt the data before we compute the checksum, to
1203                  * simulate an OST->client data error */
1204                 if (i == 0 && opc == OST_READ &&
1205                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1206                         unsigned char *ptr = cfs_kmap(pga[i]->pg);
1207                         int off = pga[i]->off & ~CFS_PAGE_MASK;
1208                         memcpy(ptr + off, "bad1", min(4, nob));
1209                         cfs_kunmap(pga[i]->pg);
1210                 }
1211                 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1212                                   pga[i]->off & ~CFS_PAGE_MASK,
1213                                   count);
1214                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1215                                (int)(pga[i]->off & ~CFS_PAGE_MASK), cksum);
1216
1217                 nob -= pga[i]->count;
1218                 pg_count--;
1219                 i++;
1220         }
1221
1222         bufsize = 4;
1223         err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1224
1225         if (err)
1226                 cfs_crypto_hash_final(hdesc, NULL, NULL);
1227
1228         /* For sending we only compute the wrong checksum instead
1229          * of corrupting the data so it is still correct on a redo */
1230         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1231                 cksum++;
1232
1233         return cksum;
1234 }
1235
1236 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1237                                 struct lov_stripe_md *lsm, obd_count page_count,
1238                                 struct brw_page **pga,
1239                                 struct ptlrpc_request **reqp,
1240                                 struct obd_capa *ocapa, int reserve,
1241                                 int resend)
1242 {
1243         struct ptlrpc_request   *req;
1244         struct ptlrpc_bulk_desc *desc;
1245         struct ost_body         *body;
1246         struct obd_ioobj        *ioobj;
1247         struct niobuf_remote    *niobuf;
1248         int niocount, i, requested_nob, opc, rc;
1249         struct osc_brw_async_args *aa;
1250         struct req_capsule      *pill;
1251         struct brw_page *pg_prev;
1252
1253         ENTRY;
1254         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1255                 RETURN(-ENOMEM); /* Recoverable */
1256         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1257                 RETURN(-EINVAL); /* Fatal */
1258
1259         if ((cmd & OBD_BRW_WRITE) != 0) {
1260                 opc = OST_WRITE;
1261                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1262                                                 cli->cl_import->imp_rq_pool,
1263                                                 &RQF_OST_BRW_WRITE);
1264         } else {
1265                 opc = OST_READ;
1266                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1267         }
1268         if (req == NULL)
1269                 RETURN(-ENOMEM);
1270
1271         for (niocount = i = 1; i < page_count; i++) {
1272                 if (!can_merge_pages(pga[i - 1], pga[i]))
1273                         niocount++;
1274         }
1275
1276         pill = &req->rq_pill;
1277         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1278                              sizeof(*ioobj));
1279         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1280                              niocount * sizeof(*niobuf));
1281         osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1282
1283         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1284         if (rc) {
1285                 ptlrpc_request_free(req);
1286                 RETURN(rc);
1287         }
1288         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1289         ptlrpc_at_set_req_timeout(req);
1290         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1291          * retry logic */
1292         req->rq_no_retry_einprogress = 1;
1293
1294         desc = ptlrpc_prep_bulk_imp(req, page_count,
1295                 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1296                 opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
1297                 OST_BULK_PORTAL);
1298
1299         if (desc == NULL)
1300                 GOTO(out, rc = -ENOMEM);
1301         /* NB request now owns desc and will free it when it gets freed */
1302
1303         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1304         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1305         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1306         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1307
1308         lustre_set_wire_obdo(&body->oa, oa);
1309
1310         obdo_to_ioobj(oa, ioobj);
1311         ioobj->ioo_bufcnt = niocount;
1312         /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1313          * that might be send for this request.  The actual number is decided
1314          * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1315          * "max - 1" for old client compatibility sending "0", and also so the
1316          * the actual maximum is a power-of-two number, not one less. LU-1431 */
1317         ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1318         osc_pack_capa(req, body, ocapa);
1319         LASSERT(page_count > 0);
1320         pg_prev = pga[0];
1321         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1322                 struct brw_page *pg = pga[i];
1323                 int poff = pg->off & ~CFS_PAGE_MASK;
1324
1325                 LASSERT(pg->count > 0);
1326                 /* make sure there is no gap in the middle of page array */
1327                 LASSERTF(page_count == 1 ||
1328                          (ergo(i == 0, poff + pg->count == CFS_PAGE_SIZE) &&
1329                           ergo(i > 0 && i < page_count - 1,
1330                                poff == 0 && pg->count == CFS_PAGE_SIZE)   &&
1331                           ergo(i == page_count - 1, poff == 0)),
1332                          "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1333                          i, page_count, pg, pg->off, pg->count);
1334 #ifdef __linux__
1335                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1336                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1337                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1338                          i, page_count,
1339                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1340                          pg_prev->pg, page_private(pg_prev->pg),
1341                          pg_prev->pg->index, pg_prev->off);
1342 #else
1343                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1344                          "i %d p_c %u\n", i, page_count);
1345 #endif
1346                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1347                         (pg->flag & OBD_BRW_SRVLOCK));
1348
1349                 ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
1350                 requested_nob += pg->count;
1351
1352                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1353                         niobuf--;
1354                         niobuf->len += pg->count;
1355                 } else {
1356                         niobuf->offset = pg->off;
1357                         niobuf->len    = pg->count;
1358                         niobuf->flags  = pg->flag;
1359                 }
1360                 pg_prev = pg;
1361         }
1362
1363         LASSERTF((void *)(niobuf - niocount) ==
1364                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1365                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1366                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1367
1368         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1369         if (resend) {
1370                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1371                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1372                         body->oa.o_flags = 0;
1373                 }
1374                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1375         }
1376
1377         if (osc_should_shrink_grant(cli))
1378                 osc_shrink_grant_local(cli, &body->oa);
1379
1380         /* size[REQ_REC_OFF] still sizeof (*body) */
1381         if (opc == OST_WRITE) {
1382                 if (cli->cl_checksum &&
1383                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1384                         /* store cl_cksum_type in a local variable since
1385                          * it can be changed via lprocfs */
1386                         cksum_type_t cksum_type = cli->cl_cksum_type;
1387
1388                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1389                                 oa->o_flags &= OBD_FL_LOCAL_MASK;
1390                                 body->oa.o_flags = 0;
1391                         }
1392                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1393                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1394                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1395                                                              page_count, pga,
1396                                                              OST_WRITE,
1397                                                              cksum_type);
1398                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1399                                body->oa.o_cksum);
1400                         /* save this in 'oa', too, for later checking */
1401                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1402                         oa->o_flags |= cksum_type_pack(cksum_type);
1403                 } else {
1404                         /* clear out the checksum flag, in case this is a
1405                          * resend but cl_checksum is no longer set. b=11238 */
1406                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1407                 }
1408                 oa->o_cksum = body->oa.o_cksum;
1409                 /* 1 RC per niobuf */
1410                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1411                                      sizeof(__u32) * niocount);
1412         } else {
1413                 if (cli->cl_checksum &&
1414                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1415                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1416                                 body->oa.o_flags = 0;
1417                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1418                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1419                 }
1420         }
1421         ptlrpc_request_set_replen(req);
1422
1423         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1424         aa = ptlrpc_req_async_args(req);
1425         aa->aa_oa = oa;
1426         aa->aa_requested_nob = requested_nob;
1427         aa->aa_nio_count = niocount;
1428         aa->aa_page_count = page_count;
1429         aa->aa_resends = 0;
1430         aa->aa_ppga = pga;
1431         aa->aa_cli = cli;
1432         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1433         if (ocapa && reserve)
1434                 aa->aa_ocapa = capa_get(ocapa);
1435
1436         *reqp = req;
1437         RETURN(0);
1438
1439  out:
1440         ptlrpc_req_finished(req);
1441         RETURN(rc);
1442 }
1443
1444 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1445                                 __u32 client_cksum, __u32 server_cksum, int nob,
1446                                 obd_count page_count, struct brw_page **pga,
1447                                 cksum_type_t client_cksum_type)
1448 {
1449         __u32 new_cksum;
1450         char *msg;
1451         cksum_type_t cksum_type;
1452
1453         if (server_cksum == client_cksum) {
1454                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1455                 return 0;
1456         }
1457
1458         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1459                                        oa->o_flags : 0);
1460         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1461                                       cksum_type);
1462
1463         if (cksum_type != client_cksum_type)
1464                 msg = "the server did not use the checksum type specified in "
1465                       "the original request - likely a protocol problem";
1466         else if (new_cksum == server_cksum)
1467                 msg = "changed on the client after we checksummed it - "
1468                       "likely false positive due to mmap IO (bug 11742)";
1469         else if (new_cksum == client_cksum)
1470                 msg = "changed in transit before arrival at OST";
1471         else
1472                 msg = "changed in transit AND doesn't match the original - "
1473                       "likely false positive due to mmap IO (bug 11742)";
1474
1475         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1476                            " object "LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
1477                            msg, libcfs_nid2str(peer->nid),
1478                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1479                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1480                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1481                            oa->o_id,
1482                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_seq : (__u64)0,
1483                            pga[0]->off,
1484                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1485         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1486                "client csum now %x\n", client_cksum, client_cksum_type,
1487                server_cksum, cksum_type, new_cksum);
1488         return 1;
1489 }
1490
1491 /* Note rc enters this function as number of bytes transferred */
1492 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1493 {
1494         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1495         const lnet_process_id_t *peer =
1496                         &req->rq_import->imp_connection->c_peer;
1497         struct client_obd *cli = aa->aa_cli;
1498         struct ost_body *body;
1499         __u32 client_cksum = 0;
1500         ENTRY;
1501
1502         if (rc < 0 && rc != -EDQUOT) {
1503                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1504                 RETURN(rc);
1505         }
1506
1507         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1508         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1509         if (body == NULL) {
1510                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1511                 RETURN(-EPROTO);
1512         }
1513
1514         /* set/clear over quota flag for a uid/gid */
1515         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1516             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1517                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1518
1519                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1520                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1521                        body->oa.o_flags);
1522                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1523         }
1524
1525         osc_update_grant(cli, body);
1526
1527         if (rc < 0)
1528                 RETURN(rc);
1529
1530         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1531                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1532
1533         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1534                 if (rc > 0) {
1535                         CERROR("Unexpected +ve rc %d\n", rc);
1536                         RETURN(-EPROTO);
1537                 }
1538                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1539
1540                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1541                         RETURN(-EAGAIN);
1542
1543                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1544                     check_write_checksum(&body->oa, peer, client_cksum,
1545                                          body->oa.o_cksum, aa->aa_requested_nob,
1546                                          aa->aa_page_count, aa->aa_ppga,
1547                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1548                         RETURN(-EAGAIN);
1549
1550                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1551                                      aa->aa_page_count, aa->aa_ppga);
1552                 GOTO(out, rc);
1553         }
1554
1555         /* The rest of this function executes only for OST_READs */
1556
1557         /* if unwrap_bulk failed, return -EAGAIN to retry */
1558         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1559         if (rc < 0)
1560                 GOTO(out, rc = -EAGAIN);
1561
1562         if (rc > aa->aa_requested_nob) {
1563                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1564                        aa->aa_requested_nob);
1565                 RETURN(-EPROTO);
1566         }
1567
1568         if (rc != req->rq_bulk->bd_nob_transferred) {
1569                 CERROR ("Unexpected rc %d (%d transferred)\n",
1570                         rc, req->rq_bulk->bd_nob_transferred);
1571                 return (-EPROTO);
1572         }
1573
1574         if (rc < aa->aa_requested_nob)
1575                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1576
1577         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1578                 static int cksum_counter;
1579                 __u32      server_cksum = body->oa.o_cksum;
1580                 char      *via;
1581                 char      *router;
1582                 cksum_type_t cksum_type;
1583
1584                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1585                                                body->oa.o_flags : 0);
1586                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1587                                                  aa->aa_ppga, OST_READ,
1588                                                  cksum_type);
1589
1590                 if (peer->nid == req->rq_bulk->bd_sender) {
1591                         via = router = "";
1592                 } else {
1593                         via = " via ";
1594                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1595                 }
1596
1597                 if (server_cksum == ~0 && rc > 0) {
1598                         CERROR("Protocol error: server %s set the 'checksum' "
1599                                "bit, but didn't send a checksum.  Not fatal, "
1600                                "but please notify on http://bugs.whamcloud.com/\n",
1601                                libcfs_nid2str(peer->nid));
1602                 } else if (server_cksum != client_cksum) {
1603                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1604                                            "%s%s%s inode "DFID" object "
1605                                            LPU64"/"LPU64" extent "
1606                                            "["LPU64"-"LPU64"]\n",
1607                                            req->rq_import->imp_obd->obd_name,
1608                                            libcfs_nid2str(peer->nid),
1609                                            via, router,
1610                                            body->oa.o_valid & OBD_MD_FLFID ?
1611                                                 body->oa.o_parent_seq : (__u64)0,
1612                                            body->oa.o_valid & OBD_MD_FLFID ?
1613                                                 body->oa.o_parent_oid : 0,
1614                                            body->oa.o_valid & OBD_MD_FLFID ?
1615                                                 body->oa.o_parent_ver : 0,
1616                                            body->oa.o_id,
1617                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1618                                                 body->oa.o_seq : (__u64)0,
1619                                            aa->aa_ppga[0]->off,
1620                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1621                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1622                                                                         1);
1623                         CERROR("client %x, server %x, cksum_type %x\n",
1624                                client_cksum, server_cksum, cksum_type);
1625                         cksum_counter = 0;
1626                         aa->aa_oa->o_cksum = client_cksum;
1627                         rc = -EAGAIN;
1628                 } else {
1629                         cksum_counter++;
1630                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1631                         rc = 0;
1632                 }
1633         } else if (unlikely(client_cksum)) {
1634                 static int cksum_missed;
1635
1636                 cksum_missed++;
1637                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1638                         CERROR("Checksum %u requested from %s but not sent\n",
1639                                cksum_missed, libcfs_nid2str(peer->nid));
1640         } else {
1641                 rc = 0;
1642         }
1643 out:
1644         if (rc >= 0)
1645                 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
1646
1647         RETURN(rc);
1648 }
1649
1650 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1651                             struct lov_stripe_md *lsm,
1652                             obd_count page_count, struct brw_page **pga,
1653                             struct obd_capa *ocapa)
1654 {
1655         struct ptlrpc_request *req;
1656         int                    rc;
1657         cfs_waitq_t            waitq;
1658         int                    generation, resends = 0;
1659         struct l_wait_info     lwi;
1660
1661         ENTRY;
1662
1663         cfs_waitq_init(&waitq);
1664         generation = exp->exp_obd->u.cli.cl_import->imp_generation;
1665
1666 restart_bulk:
1667         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1668                                   page_count, pga, &req, ocapa, 0, resends);
1669         if (rc != 0)
1670                 return (rc);
1671
1672         if (resends) {
1673                 req->rq_generation_set = 1;
1674                 req->rq_import_generation = generation;
1675                 req->rq_sent = cfs_time_current_sec() + resends;
1676         }
1677
1678         rc = ptlrpc_queue_wait(req);
1679
1680         if (rc == -ETIMEDOUT && req->rq_resend) {
1681                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1682                 ptlrpc_req_finished(req);
1683                 goto restart_bulk;
1684         }
1685
1686         rc = osc_brw_fini_request(req, rc);
1687
1688         ptlrpc_req_finished(req);
1689         /* When server return -EINPROGRESS, client should always retry
1690          * regardless of the number of times the bulk was resent already.*/
1691         if (osc_recoverable_error(rc)) {
1692                 resends++;
1693                 if (rc != -EINPROGRESS &&
1694                     !client_should_resend(resends, &exp->exp_obd->u.cli)) {
1695                         CERROR("%s: too many resend retries for object: "
1696                                ""LPU64":"LPU64", rc = %d.\n",
1697                                exp->exp_obd->obd_name, oa->o_id, oa->o_seq, rc);
1698                         goto out;
1699                 }
1700                 if (generation !=
1701                     exp->exp_obd->u.cli.cl_import->imp_generation) {
1702                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1703                                ""LPU64":"LPU64", rc = %d.\n",
1704                                exp->exp_obd->obd_name, oa->o_id, oa->o_seq, rc);
1705                         goto out;
1706                 }
1707
1708                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL,
1709                                        NULL);
1710                 l_wait_event(waitq, 0, &lwi);
1711
1712                 goto restart_bulk;
1713         }
1714 out:
1715         if (rc == -EAGAIN || rc == -EINPROGRESS)
1716                 rc = -EIO;
1717         RETURN (rc);
1718 }
1719
1720 static int osc_brw_redo_request(struct ptlrpc_request *request,
1721                                 struct osc_brw_async_args *aa, int rc)
1722 {
1723         struct ptlrpc_request *new_req;
1724         struct osc_brw_async_args *new_aa;
1725         struct osc_async_page *oap;
1726         ENTRY;
1727
1728         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1729                   "redo for recoverable error %d", rc);
1730
1731         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1732                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1733                                   aa->aa_cli, aa->aa_oa,
1734                                   NULL /* lsm unused by osc currently */,
1735                                   aa->aa_page_count, aa->aa_ppga,
1736                                   &new_req, aa->aa_ocapa, 0, 1);
1737         if (rc)
1738                 RETURN(rc);
1739
1740         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1741                 if (oap->oap_request != NULL) {
1742                         LASSERTF(request == oap->oap_request,
1743                                  "request %p != oap_request %p\n",
1744                                  request, oap->oap_request);
1745                         if (oap->oap_interrupted) {
1746                                 ptlrpc_req_finished(new_req);
1747                                 RETURN(-EINTR);
1748                         }
1749                 }
1750         }
1751         /* New request takes over pga and oaps from old request.
1752          * Note that copying a list_head doesn't work, need to move it... */
1753         aa->aa_resends++;
1754         new_req->rq_interpret_reply = request->rq_interpret_reply;
1755         new_req->rq_async_args = request->rq_async_args;
1756         /* cap resend delay to the current request timeout, this is similar to
1757          * what ptlrpc does (see after_reply()) */
1758         if (aa->aa_resends > new_req->rq_timeout)
1759                 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1760         else
1761                 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1762         new_req->rq_generation_set = 1;
1763         new_req->rq_import_generation = request->rq_import_generation;
1764
1765         new_aa = ptlrpc_req_async_args(new_req);
1766
1767         CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1768         cfs_list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1769         CFS_INIT_LIST_HEAD(&new_aa->aa_exts);
1770         cfs_list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1771         new_aa->aa_resends = aa->aa_resends;
1772
1773         cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1774                 if (oap->oap_request) {
1775                         ptlrpc_req_finished(oap->oap_request);
1776                         oap->oap_request = ptlrpc_request_addref(new_req);
1777                 }
1778         }
1779
1780         new_aa->aa_ocapa = aa->aa_ocapa;
1781         aa->aa_ocapa = NULL;
1782
1783         /* XXX: This code will run into problem if we're going to support
1784          * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1785          * and wait for all of them to be finished. We should inherit request
1786          * set from old request. */
1787         ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
1788
1789         DEBUG_REQ(D_INFO, new_req, "new request");
1790         RETURN(0);
1791 }
1792
1793 /*
1794  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1795  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1796  * fine for our small page arrays and doesn't require allocation.  its an
1797  * insertion sort that swaps elements that are strides apart, shrinking the
1798  * stride down until its '1' and the array is sorted.
1799  */
1800 static void sort_brw_pages(struct brw_page **array, int num)
1801 {
1802         int stride, i, j;
1803         struct brw_page *tmp;
1804
1805         if (num == 1)
1806                 return;
1807         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1808                 ;
1809
1810         do {
1811                 stride /= 3;
1812                 for (i = stride ; i < num ; i++) {
1813                         tmp = array[i];
1814                         j = i;
1815                         while (j >= stride && array[j - stride]->off > tmp->off) {
1816                                 array[j] = array[j - stride];
1817                                 j -= stride;
1818                         }
1819                         array[j] = tmp;
1820                 }
1821         } while (stride > 1);
1822 }
1823
1824 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1825 {
1826         int count = 1;
1827         int offset;
1828         int i = 0;
1829
1830         LASSERT (pages > 0);
1831         offset = pg[i]->off & ~CFS_PAGE_MASK;
1832
1833         for (;;) {
1834                 pages--;
1835                 if (pages == 0)         /* that's all */
1836                         return count;
1837
1838                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1839                         return count;   /* doesn't end on page boundary */
1840
1841                 i++;
1842                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1843                 if (offset != 0)        /* doesn't start on page boundary */
1844                         return count;
1845
1846                 count++;
1847         }
1848 }
1849
1850 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1851 {
1852         struct brw_page **ppga;
1853         int i;
1854
1855         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1856         if (ppga == NULL)
1857                 return NULL;
1858
1859         for (i = 0; i < count; i++)
1860                 ppga[i] = pga + i;
1861         return ppga;
1862 }
1863
1864 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1865 {
1866         LASSERT(ppga != NULL);
1867         OBD_FREE(ppga, sizeof(*ppga) * count);
1868 }
1869
1870 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1871                    obd_count page_count, struct brw_page *pga,
1872                    struct obd_trans_info *oti)
1873 {
1874         struct obdo *saved_oa = NULL;
1875         struct brw_page **ppga, **orig;
1876         struct obd_import *imp = class_exp2cliimp(exp);
1877         struct client_obd *cli;
1878         int rc, page_count_orig;
1879         ENTRY;
1880
1881         LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1882         cli = &imp->imp_obd->u.cli;
1883
1884         if (cmd & OBD_BRW_CHECK) {
1885                 /* The caller just wants to know if there's a chance that this
1886                  * I/O can succeed */
1887
1888                 if (imp->imp_invalid)
1889                         RETURN(-EIO);
1890                 RETURN(0);
1891         }
1892
1893         /* test_brw with a failed create can trip this, maybe others. */
1894         LASSERT(cli->cl_max_pages_per_rpc);
1895
1896         rc = 0;
1897
1898         orig = ppga = osc_build_ppga(pga, page_count);
1899         if (ppga == NULL)
1900                 RETURN(-ENOMEM);
1901         page_count_orig = page_count;
1902
1903         sort_brw_pages(ppga, page_count);
1904         while (page_count) {
1905                 obd_count pages_per_brw;
1906
1907                 if (page_count > cli->cl_max_pages_per_rpc)
1908                         pages_per_brw = cli->cl_max_pages_per_rpc;
1909                 else
1910                         pages_per_brw = page_count;
1911
1912                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1913
1914                 if (saved_oa != NULL) {
1915                         /* restore previously saved oa */
1916                         *oinfo->oi_oa = *saved_oa;
1917                 } else if (page_count > pages_per_brw) {
1918                         /* save a copy of oa (brw will clobber it) */
1919                         OBDO_ALLOC(saved_oa);
1920                         if (saved_oa == NULL)
1921                                 GOTO(out, rc = -ENOMEM);
1922                         *saved_oa = *oinfo->oi_oa;
1923                 }
1924
1925                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1926                                       pages_per_brw, ppga, oinfo->oi_capa);
1927
1928                 if (rc != 0)
1929                         break;
1930
1931                 page_count -= pages_per_brw;
1932                 ppga += pages_per_brw;
1933         }
1934
1935 out:
1936         osc_release_ppga(orig, page_count_orig);
1937
1938         if (saved_oa != NULL)
1939                 OBDO_FREE(saved_oa);
1940
1941         RETURN(rc);
1942 }
1943
1944 static int brw_interpret(const struct lu_env *env,
1945                          struct ptlrpc_request *req, void *data, int rc)
1946 {
1947         struct osc_brw_async_args *aa = data;
1948         struct osc_extent *ext;
1949         struct osc_extent *tmp;
1950         struct cl_object  *obj = NULL;
1951         struct client_obd *cli = aa->aa_cli;
1952         ENTRY;
1953
1954         rc = osc_brw_fini_request(req, rc);
1955         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1956         /* When server return -EINPROGRESS, client should always retry
1957          * regardless of the number of times the bulk was resent already. */
1958         if (osc_recoverable_error(rc)) {
1959                 if (req->rq_import_generation !=
1960                     req->rq_import->imp_generation) {
1961                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1962                                ""LPU64":"LPU64", rc = %d.\n",
1963                                req->rq_import->imp_obd->obd_name,
1964                                aa->aa_oa->o_id, aa->aa_oa->o_seq, rc);
1965                 } else if (rc == -EINPROGRESS ||
1966                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1967                         rc = osc_brw_redo_request(req, aa, rc);
1968                 } else {
1969                         CERROR("%s: too many resent retries for object: "
1970                                ""LPU64":"LPU64", rc = %d.\n",
1971                                req->rq_import->imp_obd->obd_name,
1972                                aa->aa_oa->o_id, aa->aa_oa->o_seq, rc);
1973                 }
1974
1975                 if (rc == 0)
1976                         RETURN(0);
1977                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1978                         rc = -EIO;
1979         }
1980
1981         if (aa->aa_ocapa) {
1982                 capa_put(aa->aa_ocapa);
1983                 aa->aa_ocapa = NULL;
1984         }
1985
1986         cfs_list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1987                 if (obj == NULL && rc == 0) {
1988                         obj = osc2cl(ext->oe_obj);
1989                         cl_object_get(obj);
1990                 }
1991
1992                 cfs_list_del_init(&ext->oe_link);
1993                 osc_extent_finish(env, ext, 1, rc);
1994         }
1995         LASSERT(cfs_list_empty(&aa->aa_exts));
1996         LASSERT(cfs_list_empty(&aa->aa_oaps));
1997
1998         if (obj != NULL) {
1999                 struct obdo *oa = aa->aa_oa;
2000                 struct cl_attr *attr  = &osc_env_info(env)->oti_attr;
2001                 unsigned long valid = 0;
2002
2003                 LASSERT(rc == 0);
2004                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2005                         attr->cat_blocks = oa->o_blocks;
2006                         valid |= CAT_BLOCKS;
2007                 }
2008                 if (oa->o_valid & OBD_MD_FLMTIME) {
2009                         attr->cat_mtime = oa->o_mtime;
2010                         valid |= CAT_MTIME;
2011                 }
2012                 if (oa->o_valid & OBD_MD_FLATIME) {
2013                         attr->cat_atime = oa->o_atime;
2014                         valid |= CAT_ATIME;
2015                 }
2016                 if (oa->o_valid & OBD_MD_FLCTIME) {
2017                         attr->cat_ctime = oa->o_ctime;
2018                         valid |= CAT_CTIME;
2019                 }
2020                 if (valid != 0) {
2021                         cl_object_attr_lock(obj);
2022                         cl_object_attr_set(env, obj, attr, valid);
2023                         cl_object_attr_unlock(obj);
2024                 }
2025                 cl_object_put(env, obj);
2026         }
2027         OBDO_FREE(aa->aa_oa);
2028
2029         cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
2030                           req->rq_bulk->bd_nob_transferred);
2031         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2032         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
2033
2034         client_obd_list_lock(&cli->cl_loi_list_lock);
2035         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2036          * is called so we know whether to go to sync BRWs or wait for more
2037          * RPCs to complete */
2038         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2039                 cli->cl_w_in_flight--;
2040         else
2041                 cli->cl_r_in_flight--;
2042         osc_wake_cache_waiters(cli);
2043         client_obd_list_unlock(&cli->cl_loi_list_lock);
2044
2045         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
2046         RETURN(rc);
2047 }
2048
2049 /**
2050  * Build an RPC by the list of extent @ext_list. The caller must ensure
2051  * that the total pages in this list are NOT over max pages per RPC.
2052  * Extents in the list must be in OES_RPC state.
2053  */
2054 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2055                   cfs_list_t *ext_list, int cmd, pdl_policy_t pol)
2056 {
2057         struct ptlrpc_request *req = NULL;
2058         struct osc_extent *ext;
2059         CFS_LIST_HEAD(rpc_list);
2060         struct brw_page **pga = NULL;
2061         struct osc_brw_async_args *aa = NULL;
2062         struct obdo *oa = NULL;
2063         struct osc_async_page *oap;
2064         struct osc_async_page *tmp;
2065         struct cl_req *clerq = NULL;
2066         enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2067         struct ldlm_lock *lock = NULL;
2068         struct cl_req_attr crattr;
2069         obd_off starting_offset = OBD_OBJECT_EOF;
2070         obd_off ending_offset = 0;
2071         int i, rc, mpflag = 0, mem_tight = 0, page_count = 0;
2072
2073         ENTRY;
2074         LASSERT(!cfs_list_empty(ext_list));
2075
2076         /* add pages into rpc_list to build BRW rpc */
2077         cfs_list_for_each_entry(ext, ext_list, oe_link) {
2078                 LASSERT(ext->oe_state == OES_RPC);
2079                 mem_tight |= ext->oe_memalloc;
2080                 cfs_list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2081                         ++page_count;
2082                         cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
2083                         if (starting_offset > oap->oap_obj_off)
2084                                 starting_offset = oap->oap_obj_off;
2085                         else
2086                                 LASSERT(oap->oap_page_off == 0);
2087                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
2088                                 ending_offset = oap->oap_obj_off +
2089                                                 oap->oap_count;
2090                         else
2091                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2092                                         CFS_PAGE_SIZE);
2093                 }
2094         }
2095
2096         if (mem_tight)
2097                 mpflag = cfs_memory_pressure_get_and_set();
2098
2099         memset(&crattr, 0, sizeof crattr);
2100         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2101         if (pga == NULL)
2102                 GOTO(out, rc = -ENOMEM);
2103
2104         OBDO_ALLOC(oa);
2105         if (oa == NULL)
2106                 GOTO(out, rc = -ENOMEM);
2107
2108         i = 0;
2109         cfs_list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
2110                 struct cl_page *page = oap2cl_page(oap);
2111                 if (clerq == NULL) {
2112                         clerq = cl_req_alloc(env, page, crt,
2113                                              1 /* only 1-object rpcs for
2114                                                 * now */);
2115                         if (IS_ERR(clerq))
2116                                 GOTO(out, rc = PTR_ERR(clerq));
2117                         lock = oap->oap_ldlm_lock;
2118                 }
2119                 if (mem_tight)
2120                         oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2121                 pga[i] = &oap->oap_brw_page;
2122                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2123                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2124                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2125                 i++;
2126                 cl_req_page_add(env, clerq, page);
2127         }
2128
2129         /* always get the data for the obdo for the rpc */
2130         LASSERT(clerq != NULL);
2131         crattr.cra_oa = oa;
2132         crattr.cra_capa = NULL;
2133         memset(crattr.cra_jobid, 0, JOBSTATS_JOBID_SIZE);
2134         cl_req_attr_set(env, clerq, &crattr, ~0ULL);
2135         if (lock) {
2136                 oa->o_handle = lock->l_remote_handle;
2137                 oa->o_valid |= OBD_MD_FLHANDLE;
2138         }
2139
2140         rc = cl_req_prep(env, clerq);
2141         if (rc != 0) {
2142                 CERROR("cl_req_prep failed: %d\n", rc);
2143                 GOTO(out, rc);
2144         }
2145
2146         sort_brw_pages(pga, page_count);
2147         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2148                         pga, &req, crattr.cra_capa, 1, 0);
2149         if (rc != 0) {
2150                 CERROR("prep_req failed: %d\n", rc);
2151                 GOTO(out, rc);
2152         }
2153
2154         req->rq_interpret_reply = brw_interpret;
2155         if (mem_tight != 0)
2156                 req->rq_memalloc = 1;
2157
2158         /* Need to update the timestamps after the request is built in case
2159          * we race with setattr (locally or in queue at OST).  If OST gets
2160          * later setattr before earlier BRW (as determined by the request xid),
2161          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2162          * way to do this in a single call.  bug 10150 */
2163         cl_req_attr_set(env, clerq, &crattr,
2164                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2165
2166         lustre_msg_set_jobid(req->rq_reqmsg, crattr.cra_jobid);
2167
2168         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2169         aa = ptlrpc_req_async_args(req);
2170         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2171         cfs_list_splice_init(&rpc_list, &aa->aa_oaps);
2172         CFS_INIT_LIST_HEAD(&aa->aa_exts);
2173         cfs_list_splice_init(ext_list, &aa->aa_exts);
2174         aa->aa_clerq = clerq;
2175
2176         /* queued sync pages can be torn down while the pages
2177          * were between the pending list and the rpc */
2178         tmp = NULL;
2179         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2180                 /* only one oap gets a request reference */
2181                 if (tmp == NULL)
2182                         tmp = oap;
2183                 if (oap->oap_interrupted && !req->rq_intr) {
2184                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2185                                         oap, req);
2186                         ptlrpc_mark_interrupted(req);
2187                 }
2188         }
2189         if (tmp != NULL)
2190                 tmp->oap_request = ptlrpc_request_addref(req);
2191
2192         client_obd_list_lock(&cli->cl_loi_list_lock);
2193         starting_offset >>= CFS_PAGE_SHIFT;
2194         if (cmd == OBD_BRW_READ) {
2195                 cli->cl_r_in_flight++;
2196                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2197                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2198                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2199                                       starting_offset + 1);
2200         } else {
2201                 cli->cl_w_in_flight++;
2202                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2203                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2204                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2205                                       starting_offset + 1);
2206         }
2207         client_obd_list_unlock(&cli->cl_loi_list_lock);
2208
2209         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2210                   page_count, aa, cli->cl_r_in_flight,
2211                   cli->cl_w_in_flight);
2212
2213         /* XXX: Maybe the caller can check the RPC bulk descriptor to
2214          * see which CPU/NUMA node the majority of pages were allocated
2215          * on, and try to assign the async RPC to the CPU core
2216          * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
2217          *
2218          * But on the other hand, we expect that multiple ptlrpcd
2219          * threads and the initial write sponsor can run in parallel,
2220          * especially when data checksum is enabled, which is CPU-bound
2221          * operation and single ptlrpcd thread cannot process in time.
2222          * So more ptlrpcd threads sharing BRW load
2223          * (with PDL_POLICY_ROUND) seems better.
2224          */
2225         ptlrpcd_add_req(req, pol, -1);
2226         rc = 0;
2227         EXIT;
2228
2229 out:
2230         if (mem_tight != 0)
2231                 cfs_memory_pressure_restore(mpflag);
2232
2233         capa_put(crattr.cra_capa);
2234         if (rc != 0) {
2235                 LASSERT(req == NULL);
2236
2237                 if (oa)
2238                         OBDO_FREE(oa);
2239                 if (pga)
2240                         OBD_FREE(pga, sizeof(*pga) * page_count);
2241                 /* this should happen rarely and is pretty bad, it makes the
2242                  * pending list not follow the dirty order */
2243                 while (!cfs_list_empty(ext_list)) {
2244                         ext = cfs_list_entry(ext_list->next, struct osc_extent,
2245                                              oe_link);
2246                         cfs_list_del_init(&ext->oe_link);
2247                         osc_extent_finish(env, ext, 0, rc);
2248                 }
2249                 if (clerq && !IS_ERR(clerq))
2250                         cl_req_completion(env, clerq, rc);
2251         }
2252         RETURN(rc);
2253 }
2254
2255 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2256                                         struct ldlm_enqueue_info *einfo)
2257 {
2258         void *data = einfo->ei_cbdata;
2259         int set = 0;
2260
2261         LASSERT(lock != NULL);
2262         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2263         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2264         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2265         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2266
2267         lock_res_and_lock(lock);
2268         spin_lock(&osc_ast_guard);
2269
2270         if (lock->l_ast_data == NULL)
2271                 lock->l_ast_data = data;
2272         if (lock->l_ast_data == data)
2273                 set = 1;
2274
2275         spin_unlock(&osc_ast_guard);
2276         unlock_res_and_lock(lock);
2277
2278         return set;
2279 }
2280
2281 static int osc_set_data_with_check(struct lustre_handle *lockh,
2282                                    struct ldlm_enqueue_info *einfo)
2283 {
2284         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2285         int set = 0;
2286
2287         if (lock != NULL) {
2288                 set = osc_set_lock_data_with_check(lock, einfo);
2289                 LDLM_LOCK_PUT(lock);
2290         } else
2291                 CERROR("lockh %p, data %p - client evicted?\n",
2292                        lockh, einfo->ei_cbdata);
2293         return set;
2294 }
2295
2296 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2297                              ldlm_iterator_t replace, void *data)
2298 {
2299         struct ldlm_res_id res_id;
2300         struct obd_device *obd = class_exp2obd(exp);
2301
2302         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2303         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2304         return 0;
2305 }
2306
2307 /* find any ldlm lock of the inode in osc
2308  * return 0    not find
2309  *        1    find one
2310  *      < 0    error */
2311 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2312                            ldlm_iterator_t replace, void *data)
2313 {
2314         struct ldlm_res_id res_id;
2315         struct obd_device *obd = class_exp2obd(exp);
2316         int rc = 0;
2317
2318         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2319         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2320         if (rc == LDLM_ITER_STOP)
2321                 return(1);
2322         if (rc == LDLM_ITER_CONTINUE)
2323                 return(0);
2324         return(rc);
2325 }
2326
2327 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2328                             obd_enqueue_update_f upcall, void *cookie,
2329                             __u64 *flags, int agl, int rc)
2330 {
2331         int intent = *flags & LDLM_FL_HAS_INTENT;
2332         ENTRY;
2333
2334         if (intent) {
2335                 /* The request was created before ldlm_cli_enqueue call. */
2336                 if (rc == ELDLM_LOCK_ABORTED) {
2337                         struct ldlm_reply *rep;
2338                         rep = req_capsule_server_get(&req->rq_pill,
2339                                                      &RMF_DLM_REP);
2340
2341                         LASSERT(rep != NULL);
2342                         if (rep->lock_policy_res1)
2343                                 rc = rep->lock_policy_res1;
2344                 }
2345         }
2346
2347         if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
2348             (rc == 0)) {
2349                 *flags |= LDLM_FL_LVB_READY;
2350                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2351                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2352         }
2353
2354         /* Call the update callback. */
2355         rc = (*upcall)(cookie, rc);
2356         RETURN(rc);
2357 }
2358
2359 static int osc_enqueue_interpret(const struct lu_env *env,
2360                                  struct ptlrpc_request *req,
2361                                  struct osc_enqueue_args *aa, int rc)
2362 {
2363         struct ldlm_lock *lock;
2364         struct lustre_handle handle;
2365         __u32 mode;
2366         struct ost_lvb *lvb;
2367         __u32 lvb_len;
2368         __u64 *flags = aa->oa_flags;
2369
2370         /* Make a local copy of a lock handle and a mode, because aa->oa_*
2371          * might be freed anytime after lock upcall has been called. */
2372         lustre_handle_copy(&handle, aa->oa_lockh);
2373         mode = aa->oa_ei->ei_mode;
2374
2375         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2376          * be valid. */
2377         lock = ldlm_handle2lock(&handle);
2378
2379         /* Take an additional reference so that a blocking AST that
2380          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2381          * to arrive after an upcall has been executed by
2382          * osc_enqueue_fini(). */
2383         ldlm_lock_addref(&handle, mode);
2384
2385         /* Let CP AST to grant the lock first. */
2386         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2387
2388         if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
2389                 lvb = NULL;
2390                 lvb_len = 0;
2391         } else {
2392                 lvb = aa->oa_lvb;
2393                 lvb_len = sizeof(*aa->oa_lvb);
2394         }
2395
2396         /* Complete obtaining the lock procedure. */
2397         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2398                                    mode, flags, lvb, lvb_len, &handle, rc);
2399         /* Complete osc stuff. */
2400         rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
2401                               flags, aa->oa_agl, rc);
2402
2403         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2404
2405         /* Release the lock for async request. */
2406         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
2407                 /*
2408                  * Releases a reference taken by ldlm_cli_enqueue(), if it is
2409                  * not already released by
2410                  * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
2411                  */
2412                 ldlm_lock_decref(&handle, mode);
2413
2414         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2415                  aa->oa_lockh, req, aa);
2416         ldlm_lock_decref(&handle, mode);
2417         LDLM_LOCK_PUT(lock);
2418         return rc;
2419 }
2420
2421 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
2422                         struct lov_oinfo *loi, int flags,
2423                         struct ost_lvb *lvb, __u32 mode, int rc)
2424 {
2425         struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
2426
2427         if (rc == ELDLM_OK) {
2428                 __u64 tmp;
2429
2430                 LASSERT(lock != NULL);
2431                 loi->loi_lvb = *lvb;
2432                 tmp = loi->loi_lvb.lvb_size;
2433                 /* Extend KMS up to the end of this lock and no further
2434                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
2435                 if (tmp > lock->l_policy_data.l_extent.end)
2436                         tmp = lock->l_policy_data.l_extent.end + 1;
2437                 if (tmp >= loi->loi_kms) {
2438                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
2439                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
2440                         loi_kms_set(loi, tmp);
2441                 } else {
2442                         LDLM_DEBUG(lock, "lock acquired, setting rss="
2443                                    LPU64"; leaving kms="LPU64", end="LPU64,
2444                                    loi->loi_lvb.lvb_size, loi->loi_kms,
2445                                    lock->l_policy_data.l_extent.end);
2446                 }
2447                 ldlm_lock_allow_match(lock);
2448         } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
2449                 LASSERT(lock != NULL);
2450                 loi->loi_lvb = *lvb;
2451                 ldlm_lock_allow_match(lock);
2452                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
2453                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
2454                 rc = ELDLM_OK;
2455         }
2456
2457         if (lock != NULL) {
2458                 if (rc != ELDLM_OK)
2459                         ldlm_lock_fail_match(lock);
2460
2461                 LDLM_LOCK_PUT(lock);
2462         }
2463 }
2464 EXPORT_SYMBOL(osc_update_enqueue);
2465
2466 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2467
2468 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2469  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2470  * other synchronous requests, however keeping some locks and trying to obtain
2471  * others may take a considerable amount of time in a case of ost failure; and
2472  * when other sync requests do not get released lock from a client, the client
2473  * is excluded from the cluster -- such scenarious make the life difficult, so
2474  * release locks just after they are obtained. */
2475 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2476                      __u64 *flags, ldlm_policy_data_t *policy,
2477                      struct ost_lvb *lvb, int kms_valid,
2478                      obd_enqueue_update_f upcall, void *cookie,
2479                      struct ldlm_enqueue_info *einfo,
2480                      struct lustre_handle *lockh,
2481                      struct ptlrpc_request_set *rqset, int async, int agl)
2482 {
2483         struct obd_device *obd = exp->exp_obd;
2484         struct ptlrpc_request *req = NULL;
2485         int intent = *flags & LDLM_FL_HAS_INTENT;
2486         int match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
2487         ldlm_mode_t mode;
2488         int rc;
2489         ENTRY;
2490
2491         /* Filesystem lock extents are extended to page boundaries so that
2492          * dealing with the page cache is a little smoother.  */
2493         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2494         policy->l_extent.end |= ~CFS_PAGE_MASK;
2495
2496         /*
2497          * kms is not valid when either object is completely fresh (so that no
2498          * locks are cached), or object was evicted. In the latter case cached
2499          * lock cannot be used, because it would prime inode state with
2500          * potentially stale LVB.
2501          */
2502         if (!kms_valid)
2503                 goto no_match;
2504
2505         /* Next, search for already existing extent locks that will cover us */
2506         /* If we're trying to read, we also search for an existing PW lock.  The
2507          * VFS and page cache already protect us locally, so lots of readers/
2508          * writers can share a single PW lock.
2509          *
2510          * There are problems with conversion deadlocks, so instead of
2511          * converting a read lock to a write lock, we'll just enqueue a new
2512          * one.
2513          *
2514          * At some point we should cancel the read lock instead of making them
2515          * send us a blocking callback, but there are problems with canceling
2516          * locks out from other users right now, too. */
2517         mode = einfo->ei_mode;
2518         if (einfo->ei_mode == LCK_PR)
2519                 mode |= LCK_PW;
2520         mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2521                                einfo->ei_type, policy, mode, lockh, 0);
2522         if (mode) {
2523                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
2524
2525                 if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
2526                         /* For AGL, if enqueue RPC is sent but the lock is not
2527                          * granted, then skip to process this strpe.
2528                          * Return -ECANCELED to tell the caller. */
2529                         ldlm_lock_decref(lockh, mode);
2530                         LDLM_LOCK_PUT(matched);
2531                         RETURN(-ECANCELED);
2532                 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2533                         *flags |= LDLM_FL_LVB_READY;
2534                         /* addref the lock only if not async requests and PW
2535                          * lock is matched whereas we asked for PR. */
2536                         if (!rqset && einfo->ei_mode != mode)
2537                                 ldlm_lock_addref(lockh, LCK_PR);
2538                         if (intent) {
2539                                 /* I would like to be able to ASSERT here that
2540                                  * rss <= kms, but I can't, for reasons which
2541                                  * are explained in lov_enqueue() */
2542                         }
2543
2544                         /* We already have a lock, and it's referenced.
2545                          *
2546                          * At this point, the cl_lock::cll_state is CLS_QUEUING,
2547                          * AGL upcall may change it to CLS_HELD directly. */
2548                         (*upcall)(cookie, ELDLM_OK);
2549
2550                         if (einfo->ei_mode != mode)
2551                                 ldlm_lock_decref(lockh, LCK_PW);
2552                         else if (rqset)
2553                                 /* For async requests, decref the lock. */
2554                                 ldlm_lock_decref(lockh, einfo->ei_mode);
2555                         LDLM_LOCK_PUT(matched);
2556                         RETURN(ELDLM_OK);
2557                 } else {
2558                         ldlm_lock_decref(lockh, mode);
2559                         LDLM_LOCK_PUT(matched);
2560                 }
2561         }
2562
2563  no_match:
2564         if (intent) {
2565                 CFS_LIST_HEAD(cancels);
2566                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2567                                            &RQF_LDLM_ENQUEUE_LVB);
2568                 if (req == NULL)
2569                         RETURN(-ENOMEM);
2570
2571                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
2572                 if (rc) {
2573                         ptlrpc_request_free(req);
2574                         RETURN(rc);
2575                 }
2576
2577                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2578                                      sizeof *lvb);
2579                 ptlrpc_request_set_replen(req);
2580         }
2581
2582         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2583         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2584
2585         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2586                               sizeof(*lvb), LVB_T_OST, lockh, async);
2587         if (rqset) {
2588                 if (!rc) {
2589                         struct osc_enqueue_args *aa;
2590                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2591                         aa = ptlrpc_req_async_args(req);
2592                         aa->oa_ei = einfo;
2593                         aa->oa_exp = exp;
2594                         aa->oa_flags  = flags;
2595                         aa->oa_upcall = upcall;
2596                         aa->oa_cookie = cookie;
2597                         aa->oa_lvb    = lvb;
2598                         aa->oa_lockh  = lockh;
2599                         aa->oa_agl    = !!agl;
2600
2601                         req->rq_interpret_reply =
2602                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2603                         if (rqset == PTLRPCD_SET)
2604                                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2605                         else
2606                                 ptlrpc_set_add_req(rqset, req);
2607                 } else if (intent) {
2608                         ptlrpc_req_finished(req);
2609                 }
2610                 RETURN(rc);
2611         }
2612
2613         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
2614         if (intent)
2615                 ptlrpc_req_finished(req);
2616
2617         RETURN(rc);
2618 }
2619
2620 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2621                        struct ldlm_enqueue_info *einfo,
2622                        struct ptlrpc_request_set *rqset)
2623 {
2624         struct ldlm_res_id res_id;
2625         int rc;
2626         ENTRY;
2627
2628         ostid_build_res_name(&oinfo->oi_md->lsm_oi, &res_id);
2629         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
2630                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2631                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
2632                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
2633                               rqset, rqset != NULL, 0);
2634         RETURN(rc);
2635 }
2636
2637 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2638                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2639                    int *flags, void *data, struct lustre_handle *lockh,
2640                    int unref)
2641 {
2642         struct obd_device *obd = exp->exp_obd;
2643         int lflags = *flags;
2644         ldlm_mode_t rc;
2645         ENTRY;
2646
2647         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2648                 RETURN(-EIO);
2649
2650         /* Filesystem lock extents are extended to page boundaries so that
2651          * dealing with the page cache is a little smoother */
2652         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2653         policy->l_extent.end |= ~CFS_PAGE_MASK;
2654
2655         /* Next, search for already existing extent locks that will cover us */
2656         /* If we're trying to read, we also search for an existing PW lock.  The
2657          * VFS and page cache already protect us locally, so lots of readers/
2658          * writers can share a single PW lock. */
2659         rc = mode;
2660         if (mode == LCK_PR)
2661                 rc |= LCK_PW;
2662         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2663                              res_id, type, policy, rc, lockh, unref);
2664         if (rc) {
2665                 if (data != NULL) {
2666                         if (!osc_set_data_with_check(lockh, data)) {
2667                                 if (!(lflags & LDLM_FL_TEST_LOCK))
2668                                         ldlm_lock_decref(lockh, rc);
2669                                 RETURN(0);
2670                         }
2671                 }
2672                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2673                         ldlm_lock_addref(lockh, LCK_PR);
2674                         ldlm_lock_decref(lockh, LCK_PW);
2675                 }
2676                 RETURN(rc);
2677         }
2678         RETURN(rc);
2679 }
2680
2681 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2682 {
2683         ENTRY;
2684
2685         if (unlikely(mode == LCK_GROUP))
2686                 ldlm_lock_decref_and_cancel(lockh, mode);
2687         else
2688                 ldlm_lock_decref(lockh, mode);
2689
2690         RETURN(0);
2691 }
2692
2693 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2694                       __u32 mode, struct lustre_handle *lockh)
2695 {
2696         ENTRY;
2697         RETURN(osc_cancel_base(lockh, mode));
2698 }
2699
2700 static int osc_cancel_unused(struct obd_export *exp,
2701                              struct lov_stripe_md *lsm,
2702                              ldlm_cancel_flags_t flags,
2703                              void *opaque)
2704 {
2705         struct obd_device *obd = class_exp2obd(exp);
2706         struct ldlm_res_id res_id, *resp = NULL;
2707
2708         if (lsm != NULL) {
2709                 ostid_build_res_name(&lsm->lsm_oi, &res_id);
2710                 resp = &res_id;
2711         }
2712
2713         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
2714 }
2715
2716 static int osc_statfs_interpret(const struct lu_env *env,
2717                                 struct ptlrpc_request *req,
2718                                 struct osc_async_args *aa, int rc)
2719 {
2720         struct obd_statfs *msfs;
2721         ENTRY;
2722
2723         if (rc == -EBADR)
2724                 /* The request has in fact never been sent
2725                  * due to issues at a higher level (LOV).
2726                  * Exit immediately since the caller is
2727                  * aware of the problem and takes care
2728                  * of the clean up */
2729                  RETURN(rc);
2730
2731         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2732             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2733                 GOTO(out, rc = 0);
2734
2735         if (rc != 0)
2736                 GOTO(out, rc);
2737
2738         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2739         if (msfs == NULL) {
2740                 GOTO(out, rc = -EPROTO);
2741         }
2742
2743         *aa->aa_oi->oi_osfs = *msfs;
2744 out:
2745         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2746         RETURN(rc);
2747 }
2748
2749 static int osc_statfs_async(struct obd_export *exp,
2750                             struct obd_info *oinfo, __u64 max_age,
2751                             struct ptlrpc_request_set *rqset)
2752 {
2753         struct obd_device     *obd = class_exp2obd(exp);
2754         struct ptlrpc_request *req;
2755         struct osc_async_args *aa;
2756         int                    rc;
2757         ENTRY;
2758
2759         /* We could possibly pass max_age in the request (as an absolute
2760          * timestamp or a "seconds.usec ago") so the target can avoid doing
2761          * extra calls into the filesystem if that isn't necessary (e.g.
2762          * during mount that would help a bit).  Having relative timestamps
2763          * is not so great if request processing is slow, while absolute
2764          * timestamps are not ideal because they need time synchronization. */
2765         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2766         if (req == NULL)
2767                 RETURN(-ENOMEM);
2768
2769         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2770         if (rc) {
2771                 ptlrpc_request_free(req);
2772                 RETURN(rc);
2773         }
2774         ptlrpc_request_set_replen(req);
2775         req->rq_request_portal = OST_CREATE_PORTAL;
2776         ptlrpc_at_set_req_timeout(req);
2777
2778         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2779                 /* procfs requests not want stat in wait for avoid deadlock */
2780                 req->rq_no_resend = 1;
2781                 req->rq_no_delay = 1;
2782         }
2783
2784         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2785         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2786         aa = ptlrpc_req_async_args(req);
2787         aa->aa_oi = oinfo;
2788
2789         ptlrpc_set_add_req(rqset, req);
2790         RETURN(0);
2791 }
2792
2793 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2794                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2795 {
2796         struct obd_device     *obd = class_exp2obd(exp);
2797         struct obd_statfs     *msfs;
2798         struct ptlrpc_request *req;
2799         struct obd_import     *imp = NULL;
2800         int rc;
2801         ENTRY;
2802
2803         /*Since the request might also come from lprocfs, so we need
2804          *sync this with client_disconnect_export Bug15684*/
2805         down_read(&obd->u.cli.cl_sem);
2806         if (obd->u.cli.cl_import)
2807                 imp = class_import_get(obd->u.cli.cl_import);
2808         up_read(&obd->u.cli.cl_sem);
2809         if (!imp)
2810                 RETURN(-ENODEV);
2811
2812         /* We could possibly pass max_age in the request (as an absolute
2813          * timestamp or a "seconds.usec ago") so the target can avoid doing
2814          * extra calls into the filesystem if that isn't necessary (e.g.
2815          * during mount that would help a bit).  Having relative timestamps
2816          * is not so great if request processing is slow, while absolute
2817          * timestamps are not ideal because they need time synchronization. */
2818         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2819
2820         class_import_put(imp);
2821
2822         if (req == NULL)
2823                 RETURN(-ENOMEM);
2824
2825         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2826         if (rc) {
2827                 ptlrpc_request_free(req);
2828                 RETURN(rc);
2829         }
2830         ptlrpc_request_set_replen(req);
2831         req->rq_request_portal = OST_CREATE_PORTAL;
2832         ptlrpc_at_set_req_timeout(req);
2833
2834         if (flags & OBD_STATFS_NODELAY) {
2835                 /* procfs requests not want stat in wait for avoid deadlock */
2836                 req->rq_no_resend = 1;
2837                 req->rq_no_delay = 1;
2838         }
2839
2840         rc = ptlrpc_queue_wait(req);
2841         if (rc)
2842                 GOTO(out, rc);
2843
2844         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2845         if (msfs == NULL) {
2846                 GOTO(out, rc = -EPROTO);
2847         }
2848
2849         *osfs = *msfs;
2850
2851         EXIT;
2852  out:
2853         ptlrpc_req_finished(req);
2854         return rc;
2855 }
2856
2857 /* Retrieve object striping information.
2858  *
2859  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
2860  * the maximum number of OST indices which will fit in the user buffer.
2861  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
2862  */
2863 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
2864 {
2865         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
2866         struct lov_user_md_v3 lum, *lumk;
2867         struct lov_user_ost_data_v1 *lmm_objects;
2868         int rc = 0, lum_size;
2869         ENTRY;
2870
2871         if (!lsm)
2872                 RETURN(-ENODATA);
2873
2874         /* we only need the header part from user space to get lmm_magic and
2875          * lmm_stripe_count, (the header part is common to v1 and v3) */
2876         lum_size = sizeof(struct lov_user_md_v1);
2877         if (cfs_copy_from_user(&lum, lump, lum_size))
2878                 RETURN(-EFAULT);
2879
2880         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
2881             (lum.lmm_magic != LOV_USER_MAGIC_V3))
2882                 RETURN(-EINVAL);
2883
2884         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
2885         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
2886         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
2887         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
2888
2889         /* we can use lov_mds_md_size() to compute lum_size
2890          * because lov_user_md_vX and lov_mds_md_vX have the same size */
2891         if (lum.lmm_stripe_count > 0) {
2892                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
2893                 OBD_ALLOC(lumk, lum_size);
2894                 if (!lumk)
2895                         RETURN(-ENOMEM);
2896
2897                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
2898                         lmm_objects =
2899                             &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
2900                 else
2901                         lmm_objects = &(lumk->lmm_objects[0]);
2902                 lmm_objects->l_ost_oi = lsm->lsm_oi;
2903         } else {
2904                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
2905                 lumk = &lum;
2906         }
2907
2908         lumk->lmm_oi = lsm->lsm_oi;
2909         lumk->lmm_stripe_count = 1;
2910
2911         if (cfs_copy_to_user(lump, lumk, lum_size))
2912                 rc = -EFAULT;
2913
2914         if (lumk != &lum)
2915                 OBD_FREE(lumk, lum_size);
2916
2917         RETURN(rc);
2918 }
2919
2920
2921 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2922                          void *karg, void *uarg)
2923 {
2924         struct obd_device *obd = exp->exp_obd;
2925         struct obd_ioctl_data *data = karg;
2926         int err = 0;
2927         ENTRY;
2928
2929         if (!cfs_try_module_get(THIS_MODULE)) {
2930                 CERROR("Can't get module. Is it alive?");
2931                 return -EINVAL;
2932         }
2933         switch (cmd) {
2934         case OBD_IOC_LOV_GET_CONFIG: {
2935                 char *buf;
2936                 struct lov_desc *desc;
2937                 struct obd_uuid uuid;
2938
2939                 buf = NULL;
2940                 len = 0;
2941                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
2942                         GOTO(out, err = -EINVAL);
2943
2944                 data = (struct obd_ioctl_data *)buf;
2945
2946                 if (sizeof(*desc) > data->ioc_inllen1) {
2947                         obd_ioctl_freedata(buf, len);
2948                         GOTO(out, err = -EINVAL);
2949                 }
2950
2951                 if (data->ioc_inllen2 < sizeof(uuid)) {
2952                         obd_ioctl_freedata(buf, len);
2953                         GOTO(out, err = -EINVAL);
2954                 }
2955
2956                 desc = (struct lov_desc *)data->ioc_inlbuf1;
2957                 desc->ld_tgt_count = 1;
2958                 desc->ld_active_tgt_count = 1;
2959                 desc->ld_default_stripe_count = 1;
2960                 desc->ld_default_stripe_size = 0;
2961                 desc->ld_default_stripe_offset = 0;
2962                 desc->ld_pattern = 0;
2963                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
2964
2965                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
2966
2967                 err = cfs_copy_to_user((void *)uarg, buf, len);
2968                 if (err)
2969                         err = -EFAULT;
2970                 obd_ioctl_freedata(buf, len);
2971                 GOTO(out, err);
2972         }
2973         case LL_IOC_LOV_SETSTRIPE:
2974                 err = obd_alloc_memmd(exp, karg);
2975                 if (err > 0)
2976                         err = 0;
2977                 GOTO(out, err);
2978         case LL_IOC_LOV_GETSTRIPE:
2979                 err = osc_getstripe(karg, uarg);
2980                 GOTO(out, err);
2981         case OBD_IOC_CLIENT_RECOVER:
2982                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2983                                             data->ioc_inlbuf1, 0);
2984                 if (err > 0)
2985                         err = 0;
2986                 GOTO(out, err);
2987         case IOC_OSC_SET_ACTIVE:
2988                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2989                                                data->ioc_offset);
2990                 GOTO(out, err);
2991         case OBD_IOC_POLL_QUOTACHECK:
2992                 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
2993                 GOTO(out, err);
2994         case OBD_IOC_PING_TARGET:
2995                 err = ptlrpc_obd_ping(obd);
2996                 GOTO(out, err);
2997         default:
2998                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2999                        cmd, cfs_curproc_comm());
3000                 GOTO(out, err = -ENOTTY);
3001         }
3002 out:
3003         cfs_module_put(THIS_MODULE);
3004         return err;
3005 }
3006
3007 static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
3008                         obd_count keylen, void *key, __u32 *vallen, void *val,
3009                         struct lov_stripe_md *lsm)
3010 {
3011         ENTRY;
3012         if (!vallen || !val)
3013                 RETURN(-EFAULT);
3014
3015         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3016                 __u32 *stripe = val;
3017                 *vallen = sizeof(*stripe);
3018                 *stripe = 0;
3019                 RETURN(0);
3020         } else if (KEY_IS(KEY_LAST_ID)) {
3021                 struct ptlrpc_request *req;
3022                 obd_id                *reply;
3023                 char                  *tmp;
3024                 int                    rc;
3025
3026                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3027                                            &RQF_OST_GET_INFO_LAST_ID);
3028                 if (req == NULL)
3029                         RETURN(-ENOMEM);
3030
3031                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3032                                      RCL_CLIENT, keylen);
3033                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3034                 if (rc) {
3035                         ptlrpc_request_free(req);
3036                         RETURN(rc);
3037                 }
3038
3039                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3040                 memcpy(tmp, key, keylen);
3041
3042                 req->rq_no_delay = req->rq_no_resend = 1;
3043                 ptlrpc_request_set_replen(req);
3044                 rc = ptlrpc_queue_wait(req);
3045                 if (rc)
3046                         GOTO(out, rc);
3047
3048                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3049                 if (reply == NULL)
3050                         GOTO(out, rc = -EPROTO);
3051
3052                 *((obd_id *)val) = *reply;
3053         out:
3054                 ptlrpc_req_finished(req);
3055                 RETURN(rc);
3056         } else if (KEY_IS(KEY_FIEMAP)) {
3057                 struct ptlrpc_request *req;
3058                 struct ll_user_fiemap *reply;
3059                 char *tmp;
3060                 int rc;
3061
3062                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3063                                            &RQF_OST_GET_INFO_FIEMAP);
3064                 if (req == NULL)
3065                         RETURN(-ENOMEM);
3066
3067                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3068                                      RCL_CLIENT, keylen);
3069                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3070                                      RCL_CLIENT, *vallen);
3071                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3072                                      RCL_SERVER, *vallen);
3073
3074                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3075                 if (rc) {
3076                         ptlrpc_request_free(req);
3077                         RETURN(rc);
3078                 }
3079
3080                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3081                 memcpy(tmp, key, keylen);
3082                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3083                 memcpy(tmp, val, *vallen);
3084
3085                 ptlrpc_request_set_replen(req);
3086                 rc = ptlrpc_queue_wait(req);
3087                 if (rc)
3088                         GOTO(out1, rc);
3089
3090                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3091                 if (reply == NULL)
3092                         GOTO(out1, rc = -EPROTO);
3093
3094                 memcpy(val, reply, *vallen);
3095         out1:
3096                 ptlrpc_req_finished(req);
3097
3098                 RETURN(rc);
3099         }
3100
3101         RETURN(-EINVAL);
3102 }
3103
3104 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
3105                               obd_count keylen, void *key, obd_count vallen,
3106                               void *val, struct ptlrpc_request_set *set)
3107 {
3108         struct ptlrpc_request *req;
3109         struct obd_device     *obd = exp->exp_obd;
3110         struct obd_import     *imp = class_exp2cliimp(exp);
3111         char                  *tmp;
3112         int                    rc;
3113         ENTRY;
3114
3115         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3116
3117         if (KEY_IS(KEY_CHECKSUM)) {
3118                 if (vallen != sizeof(int))
3119                         RETURN(-EINVAL);
3120                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3121                 RETURN(0);
3122         }
3123
3124         if (KEY_IS(KEY_SPTLRPC_CONF)) {
3125                 sptlrpc_conf_client_adapt(obd);
3126                 RETURN(0);
3127         }
3128
3129         if (KEY_IS(KEY_FLUSH_CTX)) {
3130                 sptlrpc_import_flush_my_ctx(imp);
3131                 RETURN(0);
3132         }
3133
3134         if (KEY_IS(KEY_CACHE_SET)) {
3135                 struct client_obd *cli = &obd->u.cli;
3136
3137                 LASSERT(cli->cl_cache == NULL); /* only once */
3138                 cli->cl_cache = (struct cl_client_cache *)val;
3139                 cfs_atomic_inc(&cli->cl_cache->ccc_users);
3140                 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
3141
3142                 /* add this osc into entity list */
3143                 LASSERT(cfs_list_empty(&cli->cl_lru_osc));
3144                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3145                 cfs_list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
3146                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3147
3148                 RETURN(0);
3149         }
3150
3151         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
3152                 struct client_obd *cli = &obd->u.cli;
3153                 int nr = cfs_atomic_read(&cli->cl_lru_in_list) >> 1;
3154                 int target = *(int *)val;
3155
3156                 nr = osc_lru_shrink(cli, min(nr, target));
3157                 *(int *)val -= nr;
3158                 RETURN(0);
3159         }
3160
3161         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3162                 RETURN(-EINVAL);
3163
3164         /* We pass all other commands directly to OST. Since nobody calls osc
3165            methods directly and everybody is supposed to go through LOV, we
3166            assume lov checked invalid values for us.
3167            The only recognised values so far are evict_by_nid and mds_conn.
3168            Even if something bad goes through, we'd get a -EINVAL from OST
3169            anyway. */
3170
3171         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
3172                                                 &RQF_OST_SET_GRANT_INFO :
3173                                                 &RQF_OBD_SET_INFO);
3174         if (req == NULL)
3175                 RETURN(-ENOMEM);
3176
3177         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3178                              RCL_CLIENT, keylen);
3179         if (!KEY_IS(KEY_GRANT_SHRINK))
3180                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3181                                      RCL_CLIENT, vallen);
3182         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3183         if (rc) {
3184                 ptlrpc_request_free(req);
3185                 RETURN(rc);
3186         }
3187
3188         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3189         memcpy(tmp, key, keylen);
3190         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
3191                                                         &RMF_OST_BODY :
3192                                                         &RMF_SETINFO_VAL);
3193         memcpy(tmp, val, vallen);
3194
3195         if (KEY_IS(KEY_GRANT_SHRINK)) {
3196                 struct osc_grant_args *aa;
3197                 struct obdo *oa;
3198
3199                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3200                 aa = ptlrpc_req_async_args(req);
3201                 OBDO_ALLOC(oa);
3202                 if (!oa) {
3203                         ptlrpc_req_finished(req);
3204                         RETURN(-ENOMEM);
3205                 }
3206                 *oa = ((struct ost_body *)val)->oa;
3207                 aa->aa_oa = oa;
3208                 req->rq_interpret_reply = osc_shrink_grant_interpret;
3209         }
3210
3211         ptlrpc_request_set_replen(req);
3212         if (!KEY_IS(KEY_GRANT_SHRINK)) {
3213                 LASSERT(set != NULL);
3214                 ptlrpc_set_add_req(set, req);
3215                 ptlrpc_check_set(NULL, set);
3216         } else
3217                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
3218
3219         RETURN(0);
3220 }
3221
3222
3223 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
3224                          struct obd_device *disk_obd, int *index)
3225 {
3226         /* this code is not supposed to be used with LOD/OSP
3227          * to be removed soon */
3228         LBUG();
3229         return 0;
3230 }
3231
3232 static int osc_llog_finish(struct obd_device *obd, int count)
3233 {
3234         struct llog_ctxt *ctxt;
3235
3236         ENTRY;
3237
3238         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3239         if (ctxt) {
3240                 llog_cat_close(NULL, ctxt->loc_handle);
3241                 llog_cleanup(NULL, ctxt);
3242         }
3243
3244         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3245         if (ctxt)
3246                 llog_cleanup(NULL, ctxt);
3247         RETURN(0);
3248 }
3249
3250 static int osc_reconnect(const struct lu_env *env,
3251                          struct obd_export *exp, struct obd_device *obd,
3252                          struct obd_uuid *cluuid,
3253                          struct obd_connect_data *data,
3254                          void *localdata)
3255 {
3256         struct client_obd *cli = &obd->u.cli;
3257
3258         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3259                 long lost_grant;
3260
3261                 client_obd_list_lock(&cli->cl_loi_list_lock);
3262                 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
3263                                 2 * cli_brw_size(obd);
3264                 lost_grant = cli->cl_lost_grant;
3265                 cli->cl_lost_grant = 0;
3266                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3267
3268                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3269                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3270                        data->ocd_version, data->ocd_grant, lost_grant);
3271         }
3272
3273         RETURN(0);
3274 }
3275
3276 static int osc_disconnect(struct obd_export *exp)
3277 {
3278         struct obd_device *obd = class_exp2obd(exp);
3279         struct llog_ctxt  *ctxt;
3280         int rc;
3281
3282         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3283         if (ctxt) {
3284                 if (obd->u.cli.cl_conn_count == 1) {
3285                         /* Flush any remaining cancel messages out to the
3286                          * target */
3287                         llog_sync(ctxt, exp, 0);
3288                 }
3289                 llog_ctxt_put(ctxt);
3290         } else {
3291                 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
3292                        obd);
3293         }
3294
3295         rc = client_disconnect_export(exp);
3296         /**
3297          * Initially we put del_shrink_grant before disconnect_export, but it
3298          * causes the following problem if setup (connect) and cleanup
3299          * (disconnect) are tangled together.
3300          *      connect p1                     disconnect p2
3301          *   ptlrpc_connect_import
3302          *     ...............               class_manual_cleanup
3303          *                                     osc_disconnect
3304          *                                     del_shrink_grant
3305          *   ptlrpc_connect_interrupt
3306          *     init_grant_shrink
3307          *   add this client to shrink list
3308          *                                      cleanup_osc
3309          * Bang! pinger trigger the shrink.
3310          * So the osc should be disconnected from the shrink list, after we
3311          * are sure the import has been destroyed. BUG18662
3312          */
3313         if (obd->u.cli.cl_import == NULL)
3314                 osc_del_shrink_grant(&obd->u.cli);
3315         return rc;
3316 }
3317
3318 static int osc_import_event(struct obd_device *obd,
3319                             struct obd_import *imp,
3320                             enum obd_import_event event)
3321 {
3322         struct client_obd *cli;
3323         int rc = 0;
3324
3325         ENTRY;
3326         LASSERT(imp->imp_obd == obd);
3327
3328         switch (event) {
3329         case IMP_EVENT_DISCON: {
3330                 cli = &obd->u.cli;
3331                 client_obd_list_lock(&cli->cl_loi_list_lock);
3332                 cli->cl_avail_grant = 0;
3333                 cli->cl_lost_grant = 0;
3334                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3335                 break;
3336         }
3337         case IMP_EVENT_INACTIVE: {
3338                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3339                 break;
3340         }
3341         case IMP_EVENT_INVALIDATE: {
3342                 struct ldlm_namespace *ns = obd->obd_namespace;
3343                 struct lu_env         *env;
3344                 int                    refcheck;
3345
3346                 env = cl_env_get(&refcheck);
3347                 if (!IS_ERR(env)) {
3348                         /* Reset grants */
3349                         cli = &obd->u.cli;
3350                         /* all pages go to failing rpcs due to the invalid
3351                          * import */
3352                         osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
3353
3354                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3355                         cl_env_put(env, &refcheck);
3356                 } else
3357                         rc = PTR_ERR(env);
3358                 break;
3359         }
3360         case IMP_EVENT_ACTIVE: {
3361                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3362                 break;
3363         }
3364         case IMP_EVENT_OCD: {
3365                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3366
3367                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3368                         osc_init_grant(&obd->u.cli, ocd);
3369
3370                 /* See bug 7198 */
3371                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3372                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3373
3374                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3375                 break;
3376         }
3377         case IMP_EVENT_DEACTIVATE: {
3378                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
3379                 break;
3380         }
3381         case IMP_EVENT_ACTIVATE: {
3382                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
3383                 break;
3384         }
3385         default:
3386                 CERROR("Unknown import event %d\n", event);
3387                 LBUG();
3388         }
3389         RETURN(rc);
3390 }
3391
3392 /**
3393  * Determine whether the lock can be canceled before replaying the lock
3394  * during recovery, see bug16774 for detailed information.
3395  *
3396  * \retval zero the lock can't be canceled
3397  * \retval other ok to cancel
3398  */
3399 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
3400 {
3401         check_res_locked(lock->l_resource);
3402
3403         /*
3404          * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
3405          *
3406          * XXX as a future improvement, we can also cancel unused write lock
3407          * if it doesn't have dirty data and active mmaps.
3408          */
3409         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3410             (lock->l_granted_mode == LCK_PR ||
3411              lock->l_granted_mode == LCK_CR) &&
3412             (osc_dlm_lock_pageref(lock) == 0))
3413                 RETURN(1);
3414
3415         RETURN(0);
3416 }
3417
3418 static int brw_queue_work(const struct lu_env *env, void *data)
3419 {
3420         struct client_obd *cli = data;
3421
3422         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3423
3424         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
3425         RETURN(0);
3426 }
3427
3428 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3429 {
3430         struct lprocfs_static_vars lvars = { 0 };
3431         struct client_obd          *cli = &obd->u.cli;
3432         void                       *handler;
3433         int                        rc;
3434         ENTRY;
3435
3436         rc = ptlrpcd_addref();
3437         if (rc)
3438                 RETURN(rc);
3439
3440         rc = client_obd_setup(obd, lcfg);
3441         if (rc)
3442                 GOTO(out_ptlrpcd, rc);
3443
3444         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3445         if (IS_ERR(handler))
3446                 GOTO(out_client_setup, rc = PTR_ERR(handler));
3447         cli->cl_writeback_work = handler;
3448
3449         rc = osc_quota_setup(obd);
3450         if (rc)
3451                 GOTO(out_ptlrpcd_work, rc);
3452
3453         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3454         lprocfs_osc_init_vars(&lvars);
3455         if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3456                 lproc_osc_attach_seqstat(obd);
3457                 sptlrpc_lprocfs_cliobd_attach(obd);
3458                 ptlrpc_lprocfs_register_obd(obd);
3459         }
3460
3461         /* We need to allocate a few requests more, because
3462          * brw_interpret tries to create new requests before freeing
3463          * previous ones, Ideally we want to have 2x max_rpcs_in_flight
3464          * reserved, but I'm afraid that might be too much wasted RAM
3465          * in fact, so 2 is just my guess and still should work. */
3466         cli->cl_import->imp_rq_pool =
3467                 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3468                                     OST_MAXREQSIZE,
3469                                     ptlrpc_add_rqs_to_pool);
3470
3471         CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
3472         ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
3473         RETURN(rc);
3474
3475 out_ptlrpcd_work:
3476         ptlrpcd_destroy_work(handler);
3477 out_client_setup:
3478         client_obd_cleanup(obd);
3479 out_ptlrpcd:
3480         ptlrpcd_decref();
3481         RETURN(rc);
3482 }
3483
3484 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3485 {
3486         int rc = 0;
3487         ENTRY;
3488
3489         switch (stage) {
3490         case OBD_CLEANUP_EARLY: {
3491                 struct obd_import *imp;
3492                 imp = obd->u.cli.cl_import;
3493                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3494                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3495                 ptlrpc_deactivate_import(imp);
3496                 spin_lock(&imp->imp_lock);
3497                 imp->imp_pingable = 0;
3498                 spin_unlock(&imp->imp_lock);
3499                 break;
3500         }
3501         case OBD_CLEANUP_EXPORTS: {
3502                 struct client_obd *cli = &obd->u.cli;
3503                 /* LU-464
3504                  * for echo client, export may be on zombie list, wait for
3505                  * zombie thread to cull it, because cli.cl_import will be
3506                  * cleared in client_disconnect_export():
3507                  *   class_export_destroy() -> obd_cleanup() ->
3508                  *   echo_device_free() -> echo_client_cleanup() ->
3509                  *   obd_disconnect() -> osc_disconnect() ->
3510                  *   client_disconnect_export()
3511                  */
3512                 obd_zombie_barrier();
3513                 if (cli->cl_writeback_work) {
3514                         ptlrpcd_destroy_work(cli->cl_writeback_work);
3515                         cli->cl_writeback_work = NULL;
3516                 }
3517                 obd_cleanup_client_import(obd);
3518                 ptlrpc_lprocfs_unregister_obd(obd);
3519                 lprocfs_obd_cleanup(obd);
3520                 rc = obd_llog_finish(obd, 0);
3521                 if (rc != 0)
3522                         CERROR("failed to cleanup llogging subsystems\n");
3523                 break;
3524                 }
3525         }
3526         RETURN(rc);
3527 }
3528
3529 int osc_cleanup(struct obd_device *obd)
3530 {
3531         struct client_obd *cli = &obd->u.cli;
3532         int rc;
3533
3534         ENTRY;
3535
3536         /* lru cleanup */
3537         if (cli->cl_cache != NULL) {
3538                 LASSERT(cfs_atomic_read(&cli->cl_cache->ccc_users) > 0);
3539                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3540                 cfs_list_del_init(&cli->cl_lru_osc);
3541                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3542                 cli->cl_lru_left = NULL;
3543                 cfs_atomic_dec(&cli->cl_cache->ccc_users);
3544                 cli->cl_cache = NULL;
3545         }
3546
3547         /* free memory of osc quota cache */
3548         osc_quota_cleanup(obd);
3549
3550         rc = client_obd_cleanup(obd);
3551
3552         ptlrpcd_decref();
3553         RETURN(rc);
3554 }
3555
3556 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3557 {
3558         struct lprocfs_static_vars lvars = { 0 };
3559         int rc = 0;
3560
3561         lprocfs_osc_init_vars(&lvars);
3562
3563         switch (lcfg->lcfg_command) {
3564         default:
3565                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
3566                                               lcfg, obd);
3567                 if (rc > 0)
3568                         rc = 0;
3569                 break;
3570         }
3571
3572         return(rc);
3573 }
3574
3575 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3576 {
3577         return osc_process_config_base(obd, buf);
3578 }
3579
3580 struct obd_ops osc_obd_ops = {
3581         .o_owner                = THIS_MODULE,
3582         .o_setup                = osc_setup,
3583         .o_precleanup           = osc_precleanup,
3584         .o_cleanup              = osc_cleanup,
3585         .o_add_conn             = client_import_add_conn,
3586         .o_del_conn             = client_import_del_conn,
3587         .o_connect              = client_connect_import,
3588         .o_reconnect            = osc_reconnect,
3589         .o_disconnect           = osc_disconnect,
3590         .o_statfs               = osc_statfs,
3591         .o_statfs_async         = osc_statfs_async,
3592         .o_packmd               = osc_packmd,
3593         .o_unpackmd             = osc_unpackmd,
3594         .o_create               = osc_create,
3595         .o_destroy              = osc_destroy,
3596         .o_getattr              = osc_getattr,
3597         .o_getattr_async        = osc_getattr_async,
3598         .o_setattr              = osc_setattr,
3599         .o_setattr_async        = osc_setattr_async,
3600         .o_brw                  = osc_brw,
3601         .o_punch                = osc_punch,
3602         .o_sync                 = osc_sync,
3603         .o_enqueue              = osc_enqueue,
3604         .o_change_cbdata        = osc_change_cbdata,
3605         .o_find_cbdata          = osc_find_cbdata,
3606         .o_cancel               = osc_cancel,
3607         .o_cancel_unused        = osc_cancel_unused,
3608         .o_iocontrol            = osc_iocontrol,
3609         .o_get_info             = osc_get_info,
3610         .o_set_info_async       = osc_set_info_async,
3611         .o_import_event         = osc_import_event,
3612         .o_llog_init            = osc_llog_init,
3613         .o_llog_finish          = osc_llog_finish,
3614         .o_process_config       = osc_process_config,
3615         .o_quotactl             = osc_quotactl,
3616         .o_quotacheck           = osc_quotacheck,
3617 };
3618
3619 extern struct lu_kmem_descr osc_caches[];
3620 extern spinlock_t osc_ast_guard;
3621 extern struct lock_class_key osc_ast_guard_class;
3622
3623 int __init osc_init(void)
3624 {
3625         struct lprocfs_static_vars lvars = { 0 };
3626         int rc;
3627         ENTRY;
3628
3629         /* print an address of _any_ initialized kernel symbol from this
3630          * module, to allow debugging with gdb that doesn't support data
3631          * symbols from modules.*/
3632         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3633
3634         rc = lu_kmem_init(osc_caches);
3635
3636         lprocfs_osc_init_vars(&lvars);
3637
3638         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3639                                  LUSTRE_OSC_NAME, &osc_device_type);
3640         if (rc) {
3641                 lu_kmem_fini(osc_caches);
3642                 RETURN(rc);
3643         }
3644
3645         spin_lock_init(&osc_ast_guard);
3646         lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
3647
3648         RETURN(rc);
3649 }
3650
3651 #ifdef __KERNEL__
3652 static void /*__exit*/ osc_exit(void)
3653 {
3654         class_unregister_type(LUSTRE_OSC_NAME);
3655         lu_kmem_fini(osc_caches);
3656 }
3657
3658 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
3659 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3660 MODULE_LICENSE("GPL");
3661
3662 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
3663 #endif