Whamcloud - gitweb
LU-3281 osc: some cleanup to reduce stack overflow chance
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <libcfs/libcfs.h>
40
41 #ifndef __KERNEL__
42 # include <liblustre.h>
43 #endif
44
45 #include <lustre_dlm.h>
46 #include <lustre_net.h>
47 #include <lustre/lustre_user.h>
48 #include <obd_cksum.h>
49 #include <obd_ost.h>
50 #include <obd_lov.h>
51
52 #ifdef  __CYGWIN__
53 # include <ctype.h>
54 #endif
55
56 #include <lustre_ha.h>
57 #include <lprocfs_status.h>
58 #include <lustre_log.h>
59 #include <lustre_debug.h>
60 #include <lustre_param.h>
61 #include <lustre_fid.h>
62 #include "osc_internal.h"
63 #include "osc_cl_internal.h"
64
65 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
66 static int brw_interpret(const struct lu_env *env,
67                          struct ptlrpc_request *req, void *data, int rc);
68 int osc_cleanup(struct obd_device *obd);
69
70 /* Pack OSC object metadata for disk storage (LE byte order). */
71 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
72                       struct lov_stripe_md *lsm)
73 {
74         int lmm_size;
75         ENTRY;
76
77         lmm_size = sizeof(**lmmp);
78         if (lmmp == NULL)
79                 RETURN(lmm_size);
80
81         if (*lmmp != NULL && lsm == NULL) {
82                 OBD_FREE(*lmmp, lmm_size);
83                 *lmmp = NULL;
84                 RETURN(0);
85         } else if (unlikely(lsm != NULL && ostid_id(&lsm->lsm_oi) == 0)) {
86                 RETURN(-EBADF);
87         }
88
89         if (*lmmp == NULL) {
90                 OBD_ALLOC(*lmmp, lmm_size);
91                 if (*lmmp == NULL)
92                         RETURN(-ENOMEM);
93         }
94
95         if (lsm)
96                 ostid_cpu_to_le(&lsm->lsm_oi, &(*lmmp)->lmm_oi);
97
98         RETURN(lmm_size);
99 }
100
101 /* Unpack OSC object metadata from disk storage (LE byte order). */
102 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
103                         struct lov_mds_md *lmm, int lmm_bytes)
104 {
105         int lsm_size;
106         struct obd_import *imp = class_exp2cliimp(exp);
107         ENTRY;
108
109         if (lmm != NULL) {
110                 if (lmm_bytes < sizeof(*lmm)) {
111                         CERROR("%s: lov_mds_md too small: %d, need %d\n",
112                                exp->exp_obd->obd_name, lmm_bytes,
113                                (int)sizeof(*lmm));
114                         RETURN(-EINVAL);
115                 }
116                 /* XXX LOV_MAGIC etc check? */
117
118                 if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) {
119                         CERROR("%s: zero lmm_object_id: rc = %d\n",
120                                exp->exp_obd->obd_name, -EINVAL);
121                         RETURN(-EINVAL);
122                 }
123         }
124
125         lsm_size = lov_stripe_md_size(1);
126         if (lsmp == NULL)
127                 RETURN(lsm_size);
128
129         if (*lsmp != NULL && lmm == NULL) {
130                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
131                 OBD_FREE(*lsmp, lsm_size);
132                 *lsmp = NULL;
133                 RETURN(0);
134         }
135
136         if (*lsmp == NULL) {
137                 OBD_ALLOC(*lsmp, lsm_size);
138                 if (unlikely(*lsmp == NULL))
139                         RETURN(-ENOMEM);
140                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
141                 if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {
142                         OBD_FREE(*lsmp, lsm_size);
143                         RETURN(-ENOMEM);
144                 }
145                 loi_init((*lsmp)->lsm_oinfo[0]);
146         } else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) {
147                 RETURN(-EBADF);
148         }
149
150         if (lmm != NULL)
151                 /* XXX zero *lsmp? */
152                 ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi);
153
154         if (imp != NULL &&
155             (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
156                 (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
157         else
158                 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
159
160         RETURN(lsm_size);
161 }
162
163 static inline void osc_pack_capa(struct ptlrpc_request *req,
164                                  struct ost_body *body, void *capa)
165 {
166         struct obd_capa *oc = (struct obd_capa *)capa;
167         struct lustre_capa *c;
168
169         if (!capa)
170                 return;
171
172         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
173         LASSERT(c);
174         capa_cpy(c, oc);
175         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
176         DEBUG_CAPA(D_SEC, c, "pack");
177 }
178
179 static inline void osc_pack_req_body(struct ptlrpc_request *req,
180                                      struct obd_info *oinfo)
181 {
182         struct ost_body *body;
183
184         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
185         LASSERT(body);
186
187         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
188         osc_pack_capa(req, body, oinfo->oi_capa);
189 }
190
191 static inline void osc_set_capa_size(struct ptlrpc_request *req,
192                                      const struct req_msg_field *field,
193                                      struct obd_capa *oc)
194 {
195         if (oc == NULL)
196                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
197         else
198                 /* it is already calculated as sizeof struct obd_capa */
199                 ;
200 }
201
202 static int osc_getattr_interpret(const struct lu_env *env,
203                                  struct ptlrpc_request *req,
204                                  struct osc_async_args *aa, int rc)
205 {
206         struct ost_body *body;
207         ENTRY;
208
209         if (rc != 0)
210                 GOTO(out, rc);
211
212         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
213         if (body) {
214                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
215                 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
216
217                 /* This should really be sent by the OST */
218                 aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
219                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
220         } else {
221                 CDEBUG(D_INFO, "can't unpack ost_body\n");
222                 rc = -EPROTO;
223                 aa->aa_oi->oi_oa->o_valid = 0;
224         }
225 out:
226         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
227         RETURN(rc);
228 }
229
230 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
231                              struct ptlrpc_request_set *set)
232 {
233         struct ptlrpc_request *req;
234         struct osc_async_args *aa;
235         int                    rc;
236         ENTRY;
237
238         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
239         if (req == NULL)
240                 RETURN(-ENOMEM);
241
242         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
243         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
244         if (rc) {
245                 ptlrpc_request_free(req);
246                 RETURN(rc);
247         }
248
249         osc_pack_req_body(req, oinfo);
250
251         ptlrpc_request_set_replen(req);
252         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
253
254         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
255         aa = ptlrpc_req_async_args(req);
256         aa->aa_oi = oinfo;
257
258         ptlrpc_set_add_req(set, req);
259         RETURN(0);
260 }
261
262 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
263                        struct obd_info *oinfo)
264 {
265         struct ptlrpc_request *req;
266         struct ost_body       *body;
267         int                    rc;
268         ENTRY;
269
270         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
271         if (req == NULL)
272                 RETURN(-ENOMEM);
273
274         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
275         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
276         if (rc) {
277                 ptlrpc_request_free(req);
278                 RETURN(rc);
279         }
280
281         osc_pack_req_body(req, oinfo);
282
283         ptlrpc_request_set_replen(req);
284
285         rc = ptlrpc_queue_wait(req);
286         if (rc)
287                 GOTO(out, rc);
288
289         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
290         if (body == NULL)
291                 GOTO(out, rc = -EPROTO);
292
293         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
294         lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
295
296         oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
297         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
298
299         EXIT;
300  out:
301         ptlrpc_req_finished(req);
302         return rc;
303 }
304
305 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
306                        struct obd_info *oinfo, struct obd_trans_info *oti)
307 {
308         struct ptlrpc_request *req;
309         struct ost_body       *body;
310         int                    rc;
311         ENTRY;
312
313         LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
314
315         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
316         if (req == NULL)
317                 RETURN(-ENOMEM);
318
319         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
320         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
321         if (rc) {
322                 ptlrpc_request_free(req);
323                 RETURN(rc);
324         }
325
326         osc_pack_req_body(req, oinfo);
327
328         ptlrpc_request_set_replen(req);
329
330         rc = ptlrpc_queue_wait(req);
331         if (rc)
332                 GOTO(out, rc);
333
334         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
335         if (body == NULL)
336                 GOTO(out, rc = -EPROTO);
337
338         lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
339
340         EXIT;
341 out:
342         ptlrpc_req_finished(req);
343         RETURN(rc);
344 }
345
346 static int osc_setattr_interpret(const struct lu_env *env,
347                                  struct ptlrpc_request *req,
348                                  struct osc_setattr_args *sa, int rc)
349 {
350         struct ost_body *body;
351         ENTRY;
352
353         if (rc != 0)
354                 GOTO(out, rc);
355
356         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
357         if (body == NULL)
358                 GOTO(out, rc = -EPROTO);
359
360         lustre_get_wire_obdo(sa->sa_oa, &body->oa);
361 out:
362         rc = sa->sa_upcall(sa->sa_cookie, rc);
363         RETURN(rc);
364 }
365
366 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
367                            struct obd_trans_info *oti,
368                            obd_enqueue_update_f upcall, void *cookie,
369                            struct ptlrpc_request_set *rqset)
370 {
371         struct ptlrpc_request   *req;
372         struct osc_setattr_args *sa;
373         int                      rc;
374         ENTRY;
375
376         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
377         if (req == NULL)
378                 RETURN(-ENOMEM);
379
380         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
381         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
382         if (rc) {
383                 ptlrpc_request_free(req);
384                 RETURN(rc);
385         }
386
387         if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
388                 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
389
390         osc_pack_req_body(req, oinfo);
391
392         ptlrpc_request_set_replen(req);
393
394         /* do mds to ost setattr asynchronously */
395         if (!rqset) {
396                 /* Do not wait for response. */
397                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
398         } else {
399                 req->rq_interpret_reply =
400                         (ptlrpc_interpterer_t)osc_setattr_interpret;
401
402                 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
403                 sa = ptlrpc_req_async_args(req);
404                 sa->sa_oa = oinfo->oi_oa;
405                 sa->sa_upcall = upcall;
406                 sa->sa_cookie = cookie;
407
408                 if (rqset == PTLRPCD_SET)
409                         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
410                 else
411                         ptlrpc_set_add_req(rqset, req);
412         }
413
414         RETURN(0);
415 }
416
417 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
418                              struct obd_trans_info *oti,
419                              struct ptlrpc_request_set *rqset)
420 {
421         return osc_setattr_async_base(exp, oinfo, oti,
422                                       oinfo->oi_cb_up, oinfo, rqset);
423 }
424
425 int osc_real_create(struct obd_export *exp, struct obdo *oa,
426                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
427 {
428         struct ptlrpc_request *req;
429         struct ost_body       *body;
430         struct lov_stripe_md  *lsm;
431         int                    rc;
432         ENTRY;
433
434         LASSERT(oa);
435         LASSERT(ea);
436
437         lsm = *ea;
438         if (!lsm) {
439                 rc = obd_alloc_memmd(exp, &lsm);
440                 if (rc < 0)
441                         RETURN(rc);
442         }
443
444         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
445         if (req == NULL)
446                 GOTO(out, rc = -ENOMEM);
447
448         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
449         if (rc) {
450                 ptlrpc_request_free(req);
451                 GOTO(out, rc);
452         }
453
454         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
455         LASSERT(body);
456         lustre_set_wire_obdo(&body->oa, oa);
457
458         ptlrpc_request_set_replen(req);
459
460         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
461             oa->o_flags == OBD_FL_DELORPHAN) {
462                 DEBUG_REQ(D_HA, req,
463                           "delorphan from OST integration");
464                 /* Don't resend the delorphan req */
465                 req->rq_no_resend = req->rq_no_delay = 1;
466         }
467
468         rc = ptlrpc_queue_wait(req);
469         if (rc)
470                 GOTO(out_req, rc);
471
472         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
473         if (body == NULL)
474                 GOTO(out_req, rc = -EPROTO);
475
476         lustre_get_wire_obdo(oa, &body->oa);
477
478         oa->o_blksize = cli_brw_size(exp->exp_obd);
479         oa->o_valid |= OBD_MD_FLBLKSZ;
480
481         /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
482          * have valid lsm_oinfo data structs, so don't go touching that.
483          * This needs to be fixed in a big way.
484          */
485         lsm->lsm_oi = oa->o_oi;
486         *ea = lsm;
487
488         if (oti != NULL) {
489                 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
490
491                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
492                         if (!oti->oti_logcookies)
493                                 oti_alloc_cookies(oti, 1);
494                         *oti->oti_logcookies = oa->o_lcookie;
495                 }
496         }
497
498         CDEBUG(D_HA, "transno: "LPD64"\n",
499                lustre_msg_get_transno(req->rq_repmsg));
500 out_req:
501         ptlrpc_req_finished(req);
502 out:
503         if (rc && !*ea)
504                 obd_free_memmd(exp, &lsm);
505         RETURN(rc);
506 }
507
508 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
509                    obd_enqueue_update_f upcall, void *cookie,
510                    struct ptlrpc_request_set *rqset)
511 {
512         struct ptlrpc_request   *req;
513         struct osc_setattr_args *sa;
514         struct ost_body         *body;
515         int                      rc;
516         ENTRY;
517
518         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
519         if (req == NULL)
520                 RETURN(-ENOMEM);
521
522         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
523         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
524         if (rc) {
525                 ptlrpc_request_free(req);
526                 RETURN(rc);
527         }
528         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
529         ptlrpc_at_set_req_timeout(req);
530
531         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
532         LASSERT(body);
533         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
534         osc_pack_capa(req, body, oinfo->oi_capa);
535
536         ptlrpc_request_set_replen(req);
537
538         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
539         CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
540         sa = ptlrpc_req_async_args(req);
541         sa->sa_oa     = oinfo->oi_oa;
542         sa->sa_upcall = upcall;
543         sa->sa_cookie = cookie;
544         if (rqset == PTLRPCD_SET)
545                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
546         else
547                 ptlrpc_set_add_req(rqset, req);
548
549         RETURN(0);
550 }
551
552 static int osc_punch(const struct lu_env *env, struct obd_export *exp,
553                      struct obd_info *oinfo, struct obd_trans_info *oti,
554                      struct ptlrpc_request_set *rqset)
555 {
556         oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
557         oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
558         oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
559         return osc_punch_base(exp, oinfo,
560                               oinfo->oi_cb_up, oinfo, rqset);
561 }
562
563 static int osc_sync_interpret(const struct lu_env *env,
564                               struct ptlrpc_request *req,
565                               void *arg, int rc)
566 {
567         struct osc_fsync_args *fa = arg;
568         struct ost_body *body;
569         ENTRY;
570
571         if (rc)
572                 GOTO(out, rc);
573
574         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
575         if (body == NULL) {
576                 CERROR ("can't unpack ost_body\n");
577                 GOTO(out, rc = -EPROTO);
578         }
579
580         *fa->fa_oi->oi_oa = body->oa;
581 out:
582         rc = fa->fa_upcall(fa->fa_cookie, rc);
583         RETURN(rc);
584 }
585
586 int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
587                   obd_enqueue_update_f upcall, void *cookie,
588                   struct ptlrpc_request_set *rqset)
589 {
590         struct ptlrpc_request *req;
591         struct ost_body       *body;
592         struct osc_fsync_args *fa;
593         int                    rc;
594         ENTRY;
595
596         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
597         if (req == NULL)
598                 RETURN(-ENOMEM);
599
600         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
601         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
602         if (rc) {
603                 ptlrpc_request_free(req);
604                 RETURN(rc);
605         }
606
607         /* overload the size and blocks fields in the oa with start/end */
608         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
609         LASSERT(body);
610         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
611         osc_pack_capa(req, body, oinfo->oi_capa);
612
613         ptlrpc_request_set_replen(req);
614         req->rq_interpret_reply = osc_sync_interpret;
615
616         CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
617         fa = ptlrpc_req_async_args(req);
618         fa->fa_oi = oinfo;
619         fa->fa_upcall = upcall;
620         fa->fa_cookie = cookie;
621
622         if (rqset == PTLRPCD_SET)
623                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
624         else
625                 ptlrpc_set_add_req(rqset, req);
626
627         RETURN (0);
628 }
629
630 static int osc_sync(const struct lu_env *env, struct obd_export *exp,
631                     struct obd_info *oinfo, obd_size start, obd_size end,
632                     struct ptlrpc_request_set *set)
633 {
634         ENTRY;
635
636         if (!oinfo->oi_oa) {
637                 CDEBUG(D_INFO, "oa NULL\n");
638                 RETURN(-EINVAL);
639         }
640
641         oinfo->oi_oa->o_size = start;
642         oinfo->oi_oa->o_blocks = end;
643         oinfo->oi_oa->o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
644
645         RETURN(osc_sync_base(exp, oinfo, oinfo->oi_cb_up, oinfo, set));
646 }
647
648 /* Find and cancel locally locks matched by @mode in the resource found by
649  * @objid. Found locks are added into @cancel list. Returns the amount of
650  * locks added to @cancels list. */
651 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
652                                    cfs_list_t *cancels,
653                                    ldlm_mode_t mode, int lock_flags)
654 {
655         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
656         struct ldlm_res_id res_id;
657         struct ldlm_resource *res;
658         int count;
659         ENTRY;
660
661         /* Return, i.e. cancel nothing, only if ELC is supported (flag in
662          * export) but disabled through procfs (flag in NS).
663          *
664          * This distinguishes from a case when ELC is not supported originally,
665          * when we still want to cancel locks in advance and just cancel them
666          * locally, without sending any RPC. */
667         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
668                 RETURN(0);
669
670         ostid_build_res_name(&oa->o_oi, &res_id);
671         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
672         if (res == NULL)
673                 RETURN(0);
674
675         LDLM_RESOURCE_ADDREF(res);
676         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
677                                            lock_flags, 0, NULL);
678         LDLM_RESOURCE_DELREF(res);
679         ldlm_resource_putref(res);
680         RETURN(count);
681 }
682
683 static int osc_destroy_interpret(const struct lu_env *env,
684                                  struct ptlrpc_request *req, void *data,
685                                  int rc)
686 {
687         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
688
689         cfs_atomic_dec(&cli->cl_destroy_in_flight);
690         cfs_waitq_signal(&cli->cl_destroy_waitq);
691         return 0;
692 }
693
694 static int osc_can_send_destroy(struct client_obd *cli)
695 {
696         if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <=
697             cli->cl_max_rpcs_in_flight) {
698                 /* The destroy request can be sent */
699                 return 1;
700         }
701         if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) <
702             cli->cl_max_rpcs_in_flight) {
703                 /*
704                  * The counter has been modified between the two atomic
705                  * operations.
706                  */
707                 cfs_waitq_signal(&cli->cl_destroy_waitq);
708         }
709         return 0;
710 }
711
712 int osc_create(const struct lu_env *env, struct obd_export *exp,
713                struct obdo *oa, struct lov_stripe_md **ea,
714                struct obd_trans_info *oti)
715 {
716         int rc = 0;
717         ENTRY;
718
719         LASSERT(oa);
720         LASSERT(ea);
721         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
722
723         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
724             oa->o_flags == OBD_FL_RECREATE_OBJS) {
725                 RETURN(osc_real_create(exp, oa, ea, oti));
726         }
727
728         if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi)))
729                 RETURN(osc_real_create(exp, oa, ea, oti));
730
731         /* we should not get here anymore */
732         LBUG();
733
734         RETURN(rc);
735 }
736
737 /* Destroy requests can be async always on the client, and we don't even really
738  * care about the return code since the client cannot do anything at all about
739  * a destroy failure.
740  * When the MDS is unlinking a filename, it saves the file objects into a
741  * recovery llog, and these object records are cancelled when the OST reports
742  * they were destroyed and sync'd to disk (i.e. transaction committed).
743  * If the client dies, or the OST is down when the object should be destroyed,
744  * the records are not cancelled, and when the OST reconnects to the MDS next,
745  * it will retrieve the llog unlink logs and then sends the log cancellation
746  * cookies to the MDS after committing destroy transactions. */
747 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
748                        struct obdo *oa, struct lov_stripe_md *ea,
749                        struct obd_trans_info *oti, struct obd_export *md_export,
750                        void *capa)
751 {
752         struct client_obd     *cli = &exp->exp_obd->u.cli;
753         struct ptlrpc_request *req;
754         struct ost_body       *body;
755         CFS_LIST_HEAD(cancels);
756         int rc, count;
757         ENTRY;
758
759         if (!oa) {
760                 CDEBUG(D_INFO, "oa NULL\n");
761                 RETURN(-EINVAL);
762         }
763
764         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
765                                         LDLM_FL_DISCARD_DATA);
766
767         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
768         if (req == NULL) {
769                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
770                 RETURN(-ENOMEM);
771         }
772
773         osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
774         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
775                                0, &cancels, count);
776         if (rc) {
777                 ptlrpc_request_free(req);
778                 RETURN(rc);
779         }
780
781         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
782         ptlrpc_at_set_req_timeout(req);
783
784         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
785                 oa->o_lcookie = *oti->oti_logcookies;
786         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
787         LASSERT(body);
788         lustre_set_wire_obdo(&body->oa, oa);
789
790         osc_pack_capa(req, body, (struct obd_capa *)capa);
791         ptlrpc_request_set_replen(req);
792
793         /* If osc_destory is for destroying the unlink orphan,
794          * sent from MDT to OST, which should not be blocked here,
795          * because the process might be triggered by ptlrpcd, and
796          * it is not good to block ptlrpcd thread (b=16006)*/
797         if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
798                 req->rq_interpret_reply = osc_destroy_interpret;
799                 if (!osc_can_send_destroy(cli)) {
800                         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
801                                                           NULL);
802
803                         /*
804                          * Wait until the number of on-going destroy RPCs drops
805                          * under max_rpc_in_flight
806                          */
807                         l_wait_event_exclusive(cli->cl_destroy_waitq,
808                                                osc_can_send_destroy(cli), &lwi);
809                 }
810         }
811
812         /* Do not wait for response */
813         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
814         RETURN(0);
815 }
816
817 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
818                                 long writing_bytes)
819 {
820         obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
821
822         LASSERT(!(oa->o_valid & bits));
823
824         oa->o_valid |= bits;
825         client_obd_list_lock(&cli->cl_loi_list_lock);
826         oa->o_dirty = cli->cl_dirty;
827         if (unlikely(cli->cl_dirty - cli->cl_dirty_transit >
828                      cli->cl_dirty_max)) {
829                 CERROR("dirty %lu - %lu > dirty_max %lu\n",
830                        cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
831                 oa->o_undirty = 0;
832         } else if (unlikely(cfs_atomic_read(&obd_dirty_pages) -
833                             cfs_atomic_read(&obd_dirty_transit_pages) >
834                             (long)(obd_max_dirty_pages + 1))) {
835                 /* The cfs_atomic_read() allowing the cfs_atomic_inc() are
836                  * not covered by a lock thus they may safely race and trip
837                  * this CERROR() unless we add in a small fudge factor (+1). */
838                 CERROR("dirty %d - %d > system dirty_max %d\n",
839                        cfs_atomic_read(&obd_dirty_pages),
840                        cfs_atomic_read(&obd_dirty_transit_pages),
841                        obd_max_dirty_pages);
842                 oa->o_undirty = 0;
843         } else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) {
844                 CERROR("dirty %lu - dirty_max %lu too big???\n",
845                        cli->cl_dirty, cli->cl_dirty_max);
846                 oa->o_undirty = 0;
847         } else {
848                 long max_in_flight = (cli->cl_max_pages_per_rpc <<
849                                       CFS_PAGE_SHIFT)*
850                                      (cli->cl_max_rpcs_in_flight + 1);
851                 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
852         }
853         oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
854         oa->o_dropped = cli->cl_lost_grant;
855         cli->cl_lost_grant = 0;
856         client_obd_list_unlock(&cli->cl_loi_list_lock);
857         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
858                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
859
860 }
861
862 void osc_update_next_shrink(struct client_obd *cli)
863 {
864         cli->cl_next_shrink_grant =
865                 cfs_time_shift(cli->cl_grant_shrink_interval);
866         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
867                cli->cl_next_shrink_grant);
868 }
869
870 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
871 {
872         client_obd_list_lock(&cli->cl_loi_list_lock);
873         cli->cl_avail_grant += grant;
874         client_obd_list_unlock(&cli->cl_loi_list_lock);
875 }
876
877 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
878 {
879         if (body->oa.o_valid & OBD_MD_FLGRANT) {
880                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
881                 __osc_update_grant(cli, body->oa.o_grant);
882         }
883 }
884
885 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
886                               obd_count keylen, void *key, obd_count vallen,
887                               void *val, struct ptlrpc_request_set *set);
888
889 static int osc_shrink_grant_interpret(const struct lu_env *env,
890                                       struct ptlrpc_request *req,
891                                       void *aa, int rc)
892 {
893         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
894         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
895         struct ost_body *body;
896
897         if (rc != 0) {
898                 __osc_update_grant(cli, oa->o_grant);
899                 GOTO(out, rc);
900         }
901
902         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
903         LASSERT(body);
904         osc_update_grant(cli, body);
905 out:
906         OBDO_FREE(oa);
907         return rc;
908 }
909
910 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
911 {
912         client_obd_list_lock(&cli->cl_loi_list_lock);
913         oa->o_grant = cli->cl_avail_grant / 4;
914         cli->cl_avail_grant -= oa->o_grant;
915         client_obd_list_unlock(&cli->cl_loi_list_lock);
916         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
917                 oa->o_valid |= OBD_MD_FLFLAGS;
918                 oa->o_flags = 0;
919         }
920         oa->o_flags |= OBD_FL_SHRINK_GRANT;
921         osc_update_next_shrink(cli);
922 }
923
924 /* Shrink the current grant, either from some large amount to enough for a
925  * full set of in-flight RPCs, or if we have already shrunk to that limit
926  * then to enough for a single RPC.  This avoids keeping more grant than
927  * needed, and avoids shrinking the grant piecemeal. */
928 static int osc_shrink_grant(struct client_obd *cli)
929 {
930         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
931                              (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT);
932
933         client_obd_list_lock(&cli->cl_loi_list_lock);
934         if (cli->cl_avail_grant <= target_bytes)
935                 target_bytes = cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
936         client_obd_list_unlock(&cli->cl_loi_list_lock);
937
938         return osc_shrink_grant_to_target(cli, target_bytes);
939 }
940
941 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
942 {
943         int                     rc = 0;
944         struct ost_body        *body;
945         ENTRY;
946
947         client_obd_list_lock(&cli->cl_loi_list_lock);
948         /* Don't shrink if we are already above or below the desired limit
949          * We don't want to shrink below a single RPC, as that will negatively
950          * impact block allocation and long-term performance. */
951         if (target_bytes < cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)
952                 target_bytes = cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
953
954         if (target_bytes >= cli->cl_avail_grant) {
955                 client_obd_list_unlock(&cli->cl_loi_list_lock);
956                 RETURN(0);
957         }
958         client_obd_list_unlock(&cli->cl_loi_list_lock);
959
960         OBD_ALLOC_PTR(body);
961         if (!body)
962                 RETURN(-ENOMEM);
963
964         osc_announce_cached(cli, &body->oa, 0);
965
966         client_obd_list_lock(&cli->cl_loi_list_lock);
967         body->oa.o_grant = cli->cl_avail_grant - target_bytes;
968         cli->cl_avail_grant = target_bytes;
969         client_obd_list_unlock(&cli->cl_loi_list_lock);
970         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
971                 body->oa.o_valid |= OBD_MD_FLFLAGS;
972                 body->oa.o_flags = 0;
973         }
974         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
975         osc_update_next_shrink(cli);
976
977         rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
978                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
979                                 sizeof(*body), body, NULL);
980         if (rc != 0)
981                 __osc_update_grant(cli, body->oa.o_grant);
982         OBD_FREE_PTR(body);
983         RETURN(rc);
984 }
985
986 static int osc_should_shrink_grant(struct client_obd *client)
987 {
988         cfs_time_t time = cfs_time_current();
989         cfs_time_t next_shrink = client->cl_next_shrink_grant;
990
991         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
992              OBD_CONNECT_GRANT_SHRINK) == 0)
993                 return 0;
994
995         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
996                 /* Get the current RPC size directly, instead of going via:
997                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
998                  * Keep comment here so that it can be found by searching. */
999                 int brw_size = client->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
1000
1001                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
1002                     client->cl_avail_grant > brw_size)
1003                         return 1;
1004                 else
1005                         osc_update_next_shrink(client);
1006         }
1007         return 0;
1008 }
1009
1010 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1011 {
1012         struct client_obd *client;
1013
1014         cfs_list_for_each_entry(client, &item->ti_obd_list,
1015                                 cl_grant_shrink_list) {
1016                 if (osc_should_shrink_grant(client))
1017                         osc_shrink_grant(client);
1018         }
1019         return 0;
1020 }
1021
1022 static int osc_add_shrink_grant(struct client_obd *client)
1023 {
1024         int rc;
1025
1026         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1027                                        TIMEOUT_GRANT,
1028                                        osc_grant_shrink_grant_cb, NULL,
1029                                        &client->cl_grant_shrink_list);
1030         if (rc) {
1031                 CERROR("add grant client %s error %d\n",
1032                         client->cl_import->imp_obd->obd_name, rc);
1033                 return rc;
1034         }
1035         CDEBUG(D_CACHE, "add grant client %s \n",
1036                client->cl_import->imp_obd->obd_name);
1037         osc_update_next_shrink(client);
1038         return 0;
1039 }
1040
1041 static int osc_del_shrink_grant(struct client_obd *client)
1042 {
1043         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1044                                          TIMEOUT_GRANT);
1045 }
1046
1047 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1048 {
1049         /*
1050          * ocd_grant is the total grant amount we're expect to hold: if we've
1051          * been evicted, it's the new avail_grant amount, cl_dirty will drop
1052          * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1053          *
1054          * race is tolerable here: if we're evicted, but imp_state already
1055          * left EVICTED state, then cl_dirty must be 0 already.
1056          */
1057         client_obd_list_lock(&cli->cl_loi_list_lock);
1058         if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1059                 cli->cl_avail_grant = ocd->ocd_grant;
1060         else
1061                 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1062
1063         if (cli->cl_avail_grant < 0) {
1064                 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
1065                       cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
1066                       ocd->ocd_grant, cli->cl_dirty);
1067                 /* workaround for servers which do not have the patch from
1068                  * LU-2679 */
1069                 cli->cl_avail_grant = ocd->ocd_grant;
1070         }
1071
1072         /* determine the appropriate chunk size used by osc_extent. */
1073         cli->cl_chunkbits = max_t(int, CFS_PAGE_SHIFT, ocd->ocd_blocksize);
1074         client_obd_list_unlock(&cli->cl_loi_list_lock);
1075
1076         CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
1077                 "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
1078                 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
1079
1080         if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1081             cfs_list_empty(&cli->cl_grant_shrink_list))
1082                 osc_add_shrink_grant(cli);
1083 }
1084
1085 /* We assume that the reason this OSC got a short read is because it read
1086  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1087  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1088  * this stripe never got written at or beyond this stripe offset yet. */
1089 static void handle_short_read(int nob_read, obd_count page_count,
1090                               struct brw_page **pga)
1091 {
1092         char *ptr;
1093         int i = 0;
1094
1095         /* skip bytes read OK */
1096         while (nob_read > 0) {
1097                 LASSERT (page_count > 0);
1098
1099                 if (pga[i]->count > nob_read) {
1100                         /* EOF inside this page */
1101                         ptr = cfs_kmap(pga[i]->pg) +
1102                                 (pga[i]->off & ~CFS_PAGE_MASK);
1103                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1104                         cfs_kunmap(pga[i]->pg);
1105                         page_count--;
1106                         i++;
1107                         break;
1108                 }
1109
1110                 nob_read -= pga[i]->count;
1111                 page_count--;
1112                 i++;
1113         }
1114
1115         /* zero remaining pages */
1116         while (page_count-- > 0) {
1117                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1118                 memset(ptr, 0, pga[i]->count);
1119                 cfs_kunmap(pga[i]->pg);
1120                 i++;
1121         }
1122 }
1123
1124 static int check_write_rcs(struct ptlrpc_request *req,
1125                            int requested_nob, int niocount,
1126                            obd_count page_count, struct brw_page **pga)
1127 {
1128         int     i;
1129         __u32   *remote_rcs;
1130
1131         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1132                                                   sizeof(*remote_rcs) *
1133                                                   niocount);
1134         if (remote_rcs == NULL) {
1135                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1136                 return(-EPROTO);
1137         }
1138
1139         /* return error if any niobuf was in error */
1140         for (i = 0; i < niocount; i++) {
1141                 if ((int)remote_rcs[i] < 0)
1142                         return(remote_rcs[i]);
1143
1144                 if (remote_rcs[i] != 0) {
1145                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1146                                 i, remote_rcs[i], req);
1147                         return(-EPROTO);
1148                 }
1149         }
1150
1151         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1152                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1153                        req->rq_bulk->bd_nob_transferred, requested_nob);
1154                 return(-EPROTO);
1155         }
1156
1157         return (0);
1158 }
1159
1160 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1161 {
1162         if (p1->flag != p2->flag) {
1163                 unsigned mask = ~(OBD_BRW_FROM_GRANT| OBD_BRW_NOCACHE|
1164                                   OBD_BRW_SYNC|OBD_BRW_ASYNC|OBD_BRW_NOQUOTA);
1165
1166                 /* warn if we try to combine flags that we don't know to be
1167                  * safe to combine */
1168                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1169                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1170                               "report this at http://bugs.whamcloud.com/\n",
1171                               p1->flag, p2->flag);
1172                 }
1173                 return 0;
1174         }
1175
1176         return (p1->off + p1->count == p2->off);
1177 }
1178
1179 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1180                                    struct brw_page **pga, int opc,
1181                                    cksum_type_t cksum_type)
1182 {
1183         __u32                           cksum;
1184         int                             i = 0;
1185         struct cfs_crypto_hash_desc     *hdesc;
1186         unsigned int                    bufsize;
1187         int                             err;
1188         unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
1189
1190         LASSERT(pg_count > 0);
1191
1192         hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1193         if (IS_ERR(hdesc)) {
1194                 CERROR("Unable to initialize checksum hash %s\n",
1195                        cfs_crypto_hash_name(cfs_alg));
1196                 return PTR_ERR(hdesc);
1197         }
1198
1199         while (nob > 0 && pg_count > 0) {
1200                 int count = pga[i]->count > nob ? nob : pga[i]->count;
1201
1202                 /* corrupt the data before we compute the checksum, to
1203                  * simulate an OST->client data error */
1204                 if (i == 0 && opc == OST_READ &&
1205                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1206                         unsigned char *ptr = cfs_kmap(pga[i]->pg);
1207                         int off = pga[i]->off & ~CFS_PAGE_MASK;
1208                         memcpy(ptr + off, "bad1", min(4, nob));
1209                         cfs_kunmap(pga[i]->pg);
1210                 }
1211                 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1212                                   pga[i]->off & ~CFS_PAGE_MASK,
1213                                   count);
1214                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1215                                (int)(pga[i]->off & ~CFS_PAGE_MASK));
1216
1217                 nob -= pga[i]->count;
1218                 pg_count--;
1219                 i++;
1220         }
1221
1222         bufsize = 4;
1223         err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1224
1225         if (err)
1226                 cfs_crypto_hash_final(hdesc, NULL, NULL);
1227
1228         /* For sending we only compute the wrong checksum instead
1229          * of corrupting the data so it is still correct on a redo */
1230         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1231                 cksum++;
1232
1233         return cksum;
1234 }
1235
/**
 * Allocate and pack a bulk read or write (BRW) request for a page array.
 *
 * The request body, ioobj and remote niobuf vector are filled in, every page
 * is added to a bulk descriptor, cache/grant state is announced in the body
 * and, if checksums are enabled and the security flavor does not already
 * cover the bulk, checksum flags (plus the checksum itself for writes) are
 * set.  The async args of the request are initialised for the completion
 * path.
 *
 * \param cmd         OBD_BRW_WRITE set selects OST_WRITE, otherwise OST_READ
 * \param cli         client the request is built for
 * \param oa          obdo copied into the wire body; its checksum fields are
 *                    updated for writes and it is remembered in the async args
 * \param lsm         not referenced by this function
 * \param page_count  number of entries in \a pga; asserted to be > 0
 * \param pga         pages to transfer, in strictly increasing offset order
 *                    (asserted)
 * \param reqp        receives the prepared request on success
 * \param ocapa       capability packed into the request; may be NULL
 *                    (presumably -- it is tested against NULL below)
 * \param reserve     if non-zero and \a ocapa is set, a capa reference is
 *                    stored in the async args
 * \param resend      if non-zero the body is flagged OBD_FL_RECOV_RESEND
 *
 * \retval 0        success, *reqp is valid
 * \retval -ENOMEM  request or bulk descriptor allocation failed (also
 *                  injected by OBD_FAIL_OSC_BRW_PREP_REQ)
 * \retval -EINVAL  injected by OBD_FAIL_OSC_BRW_PREP_REQ2
 * \retval other    error from ptlrpc_request_pack()
 */
static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa, int reserve,
                                int resend)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        /* writes are allocated from the import's request pool, reads are
         * allocated normally */
        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                cli->cl_import->imp_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        /* one remote niobuf per run of mergeable pages: start at one and
         * add one for every adjacent pair that cannot be merged */
        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));
        osc_set_capa_size(req, &RMF_CAPA1, ocapa);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);
        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
         * retry logic */
        req->rq_no_retry_einprogress = 1;

        /* writes are the source of a bulk GET, reads the sink of a bulk PUT */
        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
                opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
                OST_BULK_PORTAL);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&body->oa, oa);

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
         * that might be send for this request.  The actual number is decided
         * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
         * "max - 1" for old client compatibility sending "0", and also so the
         * the actual maximum is a power-of-two number, not one less. LU-1431 */
        ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
        osc_pack_capa(req, body, ocapa);
        LASSERT(page_count > 0);
        pg_prev = pga[0];
        /* walk the pages: add each one to the bulk descriptor and either
         * extend the current niobuf or start a new one.  niobuf is advanced
         * by the loop header and stepped back again when a page is merged. */
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                /* offset of the data within its page */
                int poff = pg->off & ~CFS_PAGE_MASK;

                LASSERT(pg->count > 0);
                /* make sure there is no gap in the middle of page array */
                LASSERTF(page_count == 1 ||
                         (ergo(i == 0, poff + pg->count == CFS_PAGE_SIZE) &&
                          ergo(i > 0 && i < page_count - 1,
                               poff == 0 && pg->count == CFS_PAGE_SIZE)   &&
                          ergo(i == page_count - 1, poff == 0)),
                         "i: %d/%d pg: %p off: "LPU64", count: %u\n",
                         i, page_count, pg, pg->off, pg->count);
#ifdef __linux__
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
#else
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u\n", i, page_count);
#endif
                /* OBD_BRW_SRVLOCK must be the same on every page */
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len    = pg->count;
                        niobuf->flags  = pg->flag;
                }
                pg_prev = pg;
        }

        /* the loop must have consumed exactly niocount niobufs */
        LASSERTF((void *)(niobuf - niocount) ==
                req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
                "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
                &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
        if (resend) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_RECOV_RESEND;
        }

        /* piggy-back a grant shrink on this request if one is due */
        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                                oa->o_flags &= OBD_FL_LOCAL_MASK;
                                body->oa.o_flags = 0;
                        }
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                /* reads only announce the checksum type in the request */
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
        }
        ptlrpc_request_set_replen(req);

        /* stash what the completion path needs in the request's async args */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);
        if (ocapa && reserve)
                aa->aa_ocapa = capa_get(ocapa);

        *reqp = req;
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
1443
1444 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1445                                 __u32 client_cksum, __u32 server_cksum, int nob,
1446                                 obd_count page_count, struct brw_page **pga,
1447                                 cksum_type_t client_cksum_type)
1448 {
1449         __u32 new_cksum;
1450         char *msg;
1451         cksum_type_t cksum_type;
1452
1453         if (server_cksum == client_cksum) {
1454                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1455                 return 0;
1456         }
1457
1458         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1459                                        oa->o_flags : 0);
1460         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1461                                       cksum_type);
1462
1463         if (cksum_type != client_cksum_type)
1464                 msg = "the server did not use the checksum type specified in "
1465                       "the original request - likely a protocol problem";
1466         else if (new_cksum == server_cksum)
1467                 msg = "changed on the client after we checksummed it - "
1468                       "likely false positive due to mmap IO (bug 11742)";
1469         else if (new_cksum == client_cksum)
1470                 msg = "changed in transit before arrival at OST";
1471         else
1472                 msg = "changed in transit AND doesn't match the original - "
1473                       "likely false positive due to mmap IO (bug 11742)";
1474
1475         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1476                            " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
1477                            msg, libcfs_nid2str(peer->nid),
1478                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1479                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1480                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1481                            POSTID(&oa->o_oi), pga[0]->off,
1482                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1483         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1484                "client csum now %x\n", client_cksum, client_cksum_type,
1485                server_cksum, cksum_type, new_cksum);
1486         return 1;
1487 }
1488
1489 /* Note rc enters this function as number of bytes transferred */
1490 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1491 {
1492         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1493         const lnet_process_id_t *peer =
1494                         &req->rq_import->imp_connection->c_peer;
1495         struct client_obd *cli = aa->aa_cli;
1496         struct ost_body *body;
1497         __u32 client_cksum = 0;
1498         ENTRY;
1499
1500         if (rc < 0 && rc != -EDQUOT) {
1501                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1502                 RETURN(rc);
1503         }
1504
1505         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1506         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1507         if (body == NULL) {
1508                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1509                 RETURN(-EPROTO);
1510         }
1511
1512         /* set/clear over quota flag for a uid/gid */
1513         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1514             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1515                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1516
1517                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1518                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1519                        body->oa.o_flags);
1520                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1521         }
1522
1523         osc_update_grant(cli, body);
1524
1525         if (rc < 0)
1526                 RETURN(rc);
1527
1528         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1529                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1530
1531         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1532                 if (rc > 0) {
1533                         CERROR("Unexpected +ve rc %d\n", rc);
1534                         RETURN(-EPROTO);
1535                 }
1536                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1537
1538                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1539                         RETURN(-EAGAIN);
1540
1541                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1542                     check_write_checksum(&body->oa, peer, client_cksum,
1543                                          body->oa.o_cksum, aa->aa_requested_nob,
1544                                          aa->aa_page_count, aa->aa_ppga,
1545                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1546                         RETURN(-EAGAIN);
1547
1548                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1549                                      aa->aa_page_count, aa->aa_ppga);
1550                 GOTO(out, rc);
1551         }
1552
1553         /* The rest of this function executes only for OST_READs */
1554
1555         /* if unwrap_bulk failed, return -EAGAIN to retry */
1556         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1557         if (rc < 0)
1558                 GOTO(out, rc = -EAGAIN);
1559
1560         if (rc > aa->aa_requested_nob) {
1561                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1562                        aa->aa_requested_nob);
1563                 RETURN(-EPROTO);
1564         }
1565
1566         if (rc != req->rq_bulk->bd_nob_transferred) {
1567                 CERROR ("Unexpected rc %d (%d transferred)\n",
1568                         rc, req->rq_bulk->bd_nob_transferred);
1569                 return (-EPROTO);
1570         }
1571
1572         if (rc < aa->aa_requested_nob)
1573                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1574
1575         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1576                 static int cksum_counter;
1577                 __u32      server_cksum = body->oa.o_cksum;
1578                 char      *via;
1579                 char      *router;
1580                 cksum_type_t cksum_type;
1581
1582                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1583                                                body->oa.o_flags : 0);
1584                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1585                                                  aa->aa_ppga, OST_READ,
1586                                                  cksum_type);
1587
1588                 if (peer->nid == req->rq_bulk->bd_sender) {
1589                         via = router = "";
1590                 } else {
1591                         via = " via ";
1592                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1593                 }
1594
1595                 if (server_cksum == ~0 && rc > 0) {
1596                         CERROR("Protocol error: server %s set the 'checksum' "
1597                                "bit, but didn't send a checksum.  Not fatal, "
1598                                "but please notify on http://bugs.whamcloud.com/\n",
1599                                libcfs_nid2str(peer->nid));
1600                 } else if (server_cksum != client_cksum) {
1601                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1602                                            "%s%s%s inode "DFID" object "DOSTID
1603                                            " extent ["LPU64"-"LPU64"]\n",
1604                                            req->rq_import->imp_obd->obd_name,
1605                                            libcfs_nid2str(peer->nid),
1606                                            via, router,
1607                                            body->oa.o_valid & OBD_MD_FLFID ?
1608                                                 body->oa.o_parent_seq : (__u64)0,
1609                                            body->oa.o_valid & OBD_MD_FLFID ?
1610                                                 body->oa.o_parent_oid : 0,
1611                                            body->oa.o_valid & OBD_MD_FLFID ?
1612                                                 body->oa.o_parent_ver : 0,
1613                                            POSTID(&body->oa.o_oi),
1614                                            aa->aa_ppga[0]->off,
1615                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1616                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1617                                                                         1);
1618                         CERROR("client %x, server %x, cksum_type %x\n",
1619                                client_cksum, server_cksum, cksum_type);
1620                         cksum_counter = 0;
1621                         aa->aa_oa->o_cksum = client_cksum;
1622                         rc = -EAGAIN;
1623                 } else {
1624                         cksum_counter++;
1625                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1626                         rc = 0;
1627                 }
1628         } else if (unlikely(client_cksum)) {
1629                 static int cksum_missed;
1630
1631                 cksum_missed++;
1632                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1633                         CERROR("Checksum %u requested from %s but not sent\n",
1634                                cksum_missed, libcfs_nid2str(peer->nid));
1635         } else {
1636                 rc = 0;
1637         }
1638 out:
1639         if (rc >= 0)
1640                 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
1641
1642         RETURN(rc);
1643 }
1644
1645 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1646                             struct lov_stripe_md *lsm,
1647                             obd_count page_count, struct brw_page **pga,
1648                             struct obd_capa *ocapa)
1649 {
1650         struct ptlrpc_request *req;
1651         int                    rc;
1652         cfs_waitq_t            waitq;
1653         int                    generation, resends = 0;
1654         struct l_wait_info     lwi;
1655
1656         ENTRY;
1657
1658         cfs_waitq_init(&waitq);
1659         generation = exp->exp_obd->u.cli.cl_import->imp_generation;
1660
1661 restart_bulk:
1662         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1663                                   page_count, pga, &req, ocapa, 0, resends);
1664         if (rc != 0)
1665                 return (rc);
1666
1667         if (resends) {
1668                 req->rq_generation_set = 1;
1669                 req->rq_import_generation = generation;
1670                 req->rq_sent = cfs_time_current_sec() + resends;
1671         }
1672
1673         rc = ptlrpc_queue_wait(req);
1674
1675         if (rc == -ETIMEDOUT && req->rq_resend) {
1676                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1677                 ptlrpc_req_finished(req);
1678                 goto restart_bulk;
1679         }
1680
1681         rc = osc_brw_fini_request(req, rc);
1682
1683         ptlrpc_req_finished(req);
1684         /* When server return -EINPROGRESS, client should always retry
1685          * regardless of the number of times the bulk was resent already.*/
1686         if (osc_recoverable_error(rc)) {
1687                 resends++;
1688                 if (rc != -EINPROGRESS &&
1689                     !client_should_resend(resends, &exp->exp_obd->u.cli)) {
1690                         CERROR("%s: too many resend retries for object: "
1691                                ""DOSTID", rc = %d.\n", exp->exp_obd->obd_name,
1692                                POSTID(&oa->o_oi), rc);
1693                         goto out;
1694                 }
1695                 if (generation !=
1696                     exp->exp_obd->u.cli.cl_import->imp_generation) {
1697                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1698                                ""DOSTID", rc = %d.\n", exp->exp_obd->obd_name,
1699                                POSTID(&oa->o_oi), rc);
1700                         goto out;
1701                 }
1702
1703                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL,
1704                                        NULL);
1705                 l_wait_event(waitq, 0, &lwi);
1706
1707                 goto restart_bulk;
1708         }
1709 out:
1710         if (rc == -EAGAIN || rc == -EINPROGRESS)
1711                 rc = -EIO;
1712         RETURN (rc);
1713 }
1714
1715 static int osc_brw_redo_request(struct ptlrpc_request *request,
1716                                 struct osc_brw_async_args *aa, int rc)
1717 {
1718         struct ptlrpc_request *new_req;
1719         struct osc_brw_async_args *new_aa;
1720         struct osc_async_page *oap;
1721         ENTRY;
1722
1723         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1724                   "redo for recoverable error %d", rc);
1725
1726         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1727                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1728                                   aa->aa_cli, aa->aa_oa,
1729                                   NULL /* lsm unused by osc currently */,
1730                                   aa->aa_page_count, aa->aa_ppga,
1731                                   &new_req, aa->aa_ocapa, 0, 1);
1732         if (rc)
1733                 RETURN(rc);
1734
1735         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1736                 if (oap->oap_request != NULL) {
1737                         LASSERTF(request == oap->oap_request,
1738                                  "request %p != oap_request %p\n",
1739                                  request, oap->oap_request);
1740                         if (oap->oap_interrupted) {
1741                                 ptlrpc_req_finished(new_req);
1742                                 RETURN(-EINTR);
1743                         }
1744                 }
1745         }
1746         /* New request takes over pga and oaps from old request.
1747          * Note that copying a list_head doesn't work, need to move it... */
1748         aa->aa_resends++;
1749         new_req->rq_interpret_reply = request->rq_interpret_reply;
1750         new_req->rq_async_args = request->rq_async_args;
1751         /* cap resend delay to the current request timeout, this is similar to
1752          * what ptlrpc does (see after_reply()) */
1753         if (aa->aa_resends > new_req->rq_timeout)
1754                 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1755         else
1756                 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1757         new_req->rq_generation_set = 1;
1758         new_req->rq_import_generation = request->rq_import_generation;
1759
1760         new_aa = ptlrpc_req_async_args(new_req);
1761
1762         CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1763         cfs_list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1764         CFS_INIT_LIST_HEAD(&new_aa->aa_exts);
1765         cfs_list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1766         new_aa->aa_resends = aa->aa_resends;
1767
1768         cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1769                 if (oap->oap_request) {
1770                         ptlrpc_req_finished(oap->oap_request);
1771                         oap->oap_request = ptlrpc_request_addref(new_req);
1772                 }
1773         }
1774
1775         new_aa->aa_ocapa = aa->aa_ocapa;
1776         aa->aa_ocapa = NULL;
1777
1778         /* XXX: This code will run into problem if we're going to support
1779          * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1780          * and wait for all of them to be finished. We should inherit request
1781          * set from old request. */
1782         ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
1783
1784         DEBUG_REQ(D_INFO, new_req, "new request");
1785         RETURN(0);
1786 }
1787
1788 /*
1789  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1790  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1791  * fine for our small page arrays and doesn't require allocation.  its an
1792  * insertion sort that swaps elements that are strides apart, shrinking the
1793  * stride down until its '1' and the array is sorted.
1794  */
1795 static void sort_brw_pages(struct brw_page **array, int num)
1796 {
1797         int stride, i, j;
1798         struct brw_page *tmp;
1799
1800         if (num == 1)
1801                 return;
1802         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1803                 ;
1804
1805         do {
1806                 stride /= 3;
1807                 for (i = stride ; i < num ; i++) {
1808                         tmp = array[i];
1809                         j = i;
1810                         while (j >= stride && array[j - stride]->off > tmp->off) {
1811                                 array[j] = array[j - stride];
1812                                 j -= stride;
1813                         }
1814                         array[j] = tmp;
1815                 }
1816         } while (stride > 1);
1817 }
1818
1819 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1820 {
1821         int count = 1;
1822         int offset;
1823         int i = 0;
1824
1825         LASSERT (pages > 0);
1826         offset = pg[i]->off & ~CFS_PAGE_MASK;
1827
1828         for (;;) {
1829                 pages--;
1830                 if (pages == 0)         /* that's all */
1831                         return count;
1832
1833                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1834                         return count;   /* doesn't end on page boundary */
1835
1836                 i++;
1837                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1838                 if (offset != 0)        /* doesn't start on page boundary */
1839                         return count;
1840
1841                 count++;
1842         }
1843 }
1844
1845 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1846 {
1847         struct brw_page **ppga;
1848         int i;
1849
1850         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1851         if (ppga == NULL)
1852                 return NULL;
1853
1854         for (i = 0; i < count; i++)
1855                 ppga[i] = pga + i;
1856         return ppga;
1857 }
1858
1859 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1860 {
1861         LASSERT(ppga != NULL);
1862         OBD_FREE(ppga, sizeof(*ppga) * count);
1863 }
1864
1865 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1866                    obd_count page_count, struct brw_page *pga,
1867                    struct obd_trans_info *oti)
1868 {
1869         struct obdo *saved_oa = NULL;
1870         struct brw_page **ppga, **orig;
1871         struct obd_import *imp = class_exp2cliimp(exp);
1872         struct client_obd *cli;
1873         int rc, page_count_orig;
1874         ENTRY;
1875
1876         LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1877         cli = &imp->imp_obd->u.cli;
1878
1879         if (cmd & OBD_BRW_CHECK) {
1880                 /* The caller just wants to know if there's a chance that this
1881                  * I/O can succeed */
1882
1883                 if (imp->imp_invalid)
1884                         RETURN(-EIO);
1885                 RETURN(0);
1886         }
1887
1888         /* test_brw with a failed create can trip this, maybe others. */
1889         LASSERT(cli->cl_max_pages_per_rpc);
1890
1891         rc = 0;
1892
1893         orig = ppga = osc_build_ppga(pga, page_count);
1894         if (ppga == NULL)
1895                 RETURN(-ENOMEM);
1896         page_count_orig = page_count;
1897
1898         sort_brw_pages(ppga, page_count);
1899         while (page_count) {
1900                 obd_count pages_per_brw;
1901
1902                 if (page_count > cli->cl_max_pages_per_rpc)
1903                         pages_per_brw = cli->cl_max_pages_per_rpc;
1904                 else
1905                         pages_per_brw = page_count;
1906
1907                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1908
1909                 if (saved_oa != NULL) {
1910                         /* restore previously saved oa */
1911                         *oinfo->oi_oa = *saved_oa;
1912                 } else if (page_count > pages_per_brw) {
1913                         /* save a copy of oa (brw will clobber it) */
1914                         OBDO_ALLOC(saved_oa);
1915                         if (saved_oa == NULL)
1916                                 GOTO(out, rc = -ENOMEM);
1917                         *saved_oa = *oinfo->oi_oa;
1918                 }
1919
1920                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1921                                       pages_per_brw, ppga, oinfo->oi_capa);
1922
1923                 if (rc != 0)
1924                         break;
1925
1926                 page_count -= pages_per_brw;
1927                 ppga += pages_per_brw;
1928         }
1929
1930 out:
1931         osc_release_ppga(orig, page_count_orig);
1932
1933         if (saved_oa != NULL)
1934                 OBDO_FREE(saved_oa);
1935
1936         RETURN(rc);
1937 }
1938
1939 static int brw_interpret(const struct lu_env *env,
1940                          struct ptlrpc_request *req, void *data, int rc)
1941 {
1942         struct osc_brw_async_args *aa = data;
1943         struct osc_extent *ext;
1944         struct osc_extent *tmp;
1945         struct cl_object  *obj = NULL;
1946         struct client_obd *cli = aa->aa_cli;
1947         ENTRY;
1948
1949         rc = osc_brw_fini_request(req, rc);
1950         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1951         /* When server return -EINPROGRESS, client should always retry
1952          * regardless of the number of times the bulk was resent already. */
1953         if (osc_recoverable_error(rc)) {
1954                 if (req->rq_import_generation !=
1955                     req->rq_import->imp_generation) {
1956                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1957                                ""DOSTID", rc = %d.\n",
1958                                req->rq_import->imp_obd->obd_name,
1959                                POSTID(&aa->aa_oa->o_oi), rc);
1960                 } else if (rc == -EINPROGRESS ||
1961                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1962                         rc = osc_brw_redo_request(req, aa, rc);
1963                 } else {
1964                         CERROR("%s: too many resent retries for object: "
1965                                ""LPU64":"LPU64", rc = %d.\n",
1966                                req->rq_import->imp_obd->obd_name,
1967                                POSTID(&aa->aa_oa->o_oi), rc);
1968                 }
1969
1970                 if (rc == 0)
1971                         RETURN(0);
1972                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1973                         rc = -EIO;
1974         }
1975
1976         if (aa->aa_ocapa) {
1977                 capa_put(aa->aa_ocapa);
1978                 aa->aa_ocapa = NULL;
1979         }
1980
1981         cfs_list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1982                 if (obj == NULL && rc == 0) {
1983                         obj = osc2cl(ext->oe_obj);
1984                         cl_object_get(obj);
1985                 }
1986
1987                 cfs_list_del_init(&ext->oe_link);
1988                 osc_extent_finish(env, ext, 1, rc);
1989         }
1990         LASSERT(cfs_list_empty(&aa->aa_exts));
1991         LASSERT(cfs_list_empty(&aa->aa_oaps));
1992
1993         if (obj != NULL) {
1994                 struct obdo *oa = aa->aa_oa;
1995                 struct cl_attr *attr  = &osc_env_info(env)->oti_attr;
1996                 unsigned long valid = 0;
1997
1998                 LASSERT(rc == 0);
1999                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2000                         attr->cat_blocks = oa->o_blocks;
2001                         valid |= CAT_BLOCKS;
2002                 }
2003                 if (oa->o_valid & OBD_MD_FLMTIME) {
2004                         attr->cat_mtime = oa->o_mtime;
2005                         valid |= CAT_MTIME;
2006                 }
2007                 if (oa->o_valid & OBD_MD_FLATIME) {
2008                         attr->cat_atime = oa->o_atime;
2009                         valid |= CAT_ATIME;
2010                 }
2011                 if (oa->o_valid & OBD_MD_FLCTIME) {
2012                         attr->cat_ctime = oa->o_ctime;
2013                         valid |= CAT_CTIME;
2014                 }
2015                 if (valid != 0) {
2016                         cl_object_attr_lock(obj);
2017                         cl_object_attr_set(env, obj, attr, valid);
2018                         cl_object_attr_unlock(obj);
2019                 }
2020                 cl_object_put(env, obj);
2021         }
2022         OBDO_FREE(aa->aa_oa);
2023
2024         cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
2025                           req->rq_bulk->bd_nob_transferred);
2026         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2027         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
2028
2029         client_obd_list_lock(&cli->cl_loi_list_lock);
2030         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2031          * is called so we know whether to go to sync BRWs or wait for more
2032          * RPCs to complete */
2033         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2034                 cli->cl_w_in_flight--;
2035         else
2036                 cli->cl_r_in_flight--;
2037         osc_wake_cache_waiters(cli);
2038         client_obd_list_unlock(&cli->cl_loi_list_lock);
2039
2040         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
2041         RETURN(rc);
2042 }
2043
2044 /**
2045  * Build an RPC by the list of extent @ext_list. The caller must ensure
2046  * that the total pages in this list are NOT over max pages per RPC.
2047  * Extents in the list must be in OES_RPC state.
2048  */
2049 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2050                   cfs_list_t *ext_list, int cmd, pdl_policy_t pol)
2051 {
2052         struct ptlrpc_request           *req = NULL;
2053         struct osc_extent               *ext;
2054         struct brw_page                 **pga = NULL;
2055         struct osc_brw_async_args       *aa = NULL;
2056         struct obdo                     *oa = NULL;
2057         struct osc_async_page           *oap;
2058         struct osc_async_page           *tmp;
2059         struct cl_req                   *clerq = NULL;
2060         enum cl_req_type                crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
2061                                                                       CRT_READ;
2062         struct ldlm_lock                *lock = NULL;
2063         struct cl_req_attr              *crattr = NULL;
2064         obd_off                         starting_offset = OBD_OBJECT_EOF;
2065         obd_off                         ending_offset = 0;
2066         int                             mpflag = 0;
2067         int                             mem_tight = 0;
2068         int                             page_count = 0;
2069         int                             i;
2070         int                             rc;
2071         CFS_LIST_HEAD(rpc_list);
2072
2073         ENTRY;
2074         LASSERT(!cfs_list_empty(ext_list));
2075
2076         /* add pages into rpc_list to build BRW rpc */
2077         cfs_list_for_each_entry(ext, ext_list, oe_link) {
2078                 LASSERT(ext->oe_state == OES_RPC);
2079                 mem_tight |= ext->oe_memalloc;
2080                 cfs_list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2081                         ++page_count;
2082                         cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
2083                         if (starting_offset > oap->oap_obj_off)
2084                                 starting_offset = oap->oap_obj_off;
2085                         else
2086                                 LASSERT(oap->oap_page_off == 0);
2087                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
2088                                 ending_offset = oap->oap_obj_off +
2089                                                 oap->oap_count;
2090                         else
2091                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2092                                         CFS_PAGE_SIZE);
2093                 }
2094         }
2095
2096         if (mem_tight)
2097                 mpflag = cfs_memory_pressure_get_and_set();
2098
2099         OBD_ALLOC(crattr, sizeof(*crattr));
2100         if (crattr == NULL)
2101                 GOTO(out, rc = -ENOMEM);
2102
2103         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2104         if (pga == NULL)
2105                 GOTO(out, rc = -ENOMEM);
2106
2107         OBDO_ALLOC(oa);
2108         if (oa == NULL)
2109                 GOTO(out, rc = -ENOMEM);
2110
2111         i = 0;
2112         cfs_list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
2113                 struct cl_page *page = oap2cl_page(oap);
2114                 if (clerq == NULL) {
2115                         clerq = cl_req_alloc(env, page, crt,
2116                                              1 /* only 1-object rpcs for now */);
2117                         if (IS_ERR(clerq))
2118                                 GOTO(out, rc = PTR_ERR(clerq));
2119                         lock = oap->oap_ldlm_lock;
2120                 }
2121                 if (mem_tight)
2122                         oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2123                 pga[i] = &oap->oap_brw_page;
2124                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2125                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2126                        pga[i]->pg, cfs_page_index(oap->oap_page), oap,
2127                        pga[i]->flag);
2128                 i++;
2129                 cl_req_page_add(env, clerq, page);
2130         }
2131
2132         /* always get the data for the obdo for the rpc */
2133         LASSERT(clerq != NULL);
2134         crattr->cra_oa = oa;
2135         cl_req_attr_set(env, clerq, crattr, ~0ULL);
2136         if (lock) {
2137                 oa->o_handle = lock->l_remote_handle;
2138                 oa->o_valid |= OBD_MD_FLHANDLE;
2139         }
2140
2141         rc = cl_req_prep(env, clerq);
2142         if (rc != 0) {
2143                 CERROR("cl_req_prep failed: %d\n", rc);
2144                 GOTO(out, rc);
2145         }
2146
2147         sort_brw_pages(pga, page_count);
2148         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2149                         pga, &req, crattr->cra_capa, 1, 0);
2150         if (rc != 0) {
2151                 CERROR("prep_req failed: %d\n", rc);
2152                 GOTO(out, rc);
2153         }
2154
2155         req->rq_interpret_reply = brw_interpret;
2156         if (mem_tight != 0)
2157                 req->rq_memalloc = 1;
2158
2159         /* Need to update the timestamps after the request is built in case
2160          * we race with setattr (locally or in queue at OST).  If OST gets
2161          * later setattr before earlier BRW (as determined by the request xid),
2162          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2163          * way to do this in a single call.  bug 10150 */
2164         cl_req_attr_set(env, clerq, crattr,
2165                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2166
2167         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2168
2169         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2170         aa = ptlrpc_req_async_args(req);
2171         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2172         cfs_list_splice_init(&rpc_list, &aa->aa_oaps);
2173         CFS_INIT_LIST_HEAD(&aa->aa_exts);
2174         cfs_list_splice_init(ext_list, &aa->aa_exts);
2175         aa->aa_clerq = clerq;
2176
2177         /* queued sync pages can be torn down while the pages
2178          * were between the pending list and the rpc */
2179         tmp = NULL;
2180         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2181                 /* only one oap gets a request reference */
2182                 if (tmp == NULL)
2183                         tmp = oap;
2184                 if (oap->oap_interrupted && !req->rq_intr) {
2185                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2186                                         oap, req);
2187                         ptlrpc_mark_interrupted(req);
2188                 }
2189         }
2190         if (tmp != NULL)
2191                 tmp->oap_request = ptlrpc_request_addref(req);
2192
2193         client_obd_list_lock(&cli->cl_loi_list_lock);
2194         starting_offset >>= CFS_PAGE_SHIFT;
2195         if (cmd == OBD_BRW_READ) {
2196                 cli->cl_r_in_flight++;
2197                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2198                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2199                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2200                                       starting_offset + 1);
2201         } else {
2202                 cli->cl_w_in_flight++;
2203                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2204                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2205                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2206                                       starting_offset + 1);
2207         }
2208         client_obd_list_unlock(&cli->cl_loi_list_lock);
2209
2210         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2211                   page_count, aa, cli->cl_r_in_flight,
2212                   cli->cl_w_in_flight);
2213
2214         /* XXX: Maybe the caller can check the RPC bulk descriptor to
2215          * see which CPU/NUMA node the majority of pages were allocated
2216          * on, and try to assign the async RPC to the CPU core
2217          * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
2218          *
2219          * But on the other hand, we expect that multiple ptlrpcd
2220          * threads and the initial write sponsor can run in parallel,
2221          * especially when data checksum is enabled, which is CPU-bound
2222          * operation and single ptlrpcd thread cannot process in time.
2223          * So more ptlrpcd threads sharing BRW load
2224          * (with PDL_POLICY_ROUND) seems better.
2225          */
2226         ptlrpcd_add_req(req, pol, -1);
2227         rc = 0;
2228         EXIT;
2229
2230 out:
2231         if (mem_tight != 0)
2232                 cfs_memory_pressure_restore(mpflag);
2233
2234         if (crattr != NULL) {
2235                 capa_put(crattr->cra_capa);
2236                 OBD_FREE(crattr, sizeof(*crattr));
2237         }
2238
2239         if (rc != 0) {
2240                 LASSERT(req == NULL);
2241
2242                 if (oa)
2243                         OBDO_FREE(oa);
2244                 if (pga)
2245                         OBD_FREE(pga, sizeof(*pga) * page_count);
2246                 /* this should happen rarely and is pretty bad, it makes the
2247                  * pending list not follow the dirty order */
2248                 while (!cfs_list_empty(ext_list)) {
2249                         ext = cfs_list_entry(ext_list->next, struct osc_extent,
2250                                              oe_link);
2251                         cfs_list_del_init(&ext->oe_link);
2252                         osc_extent_finish(env, ext, 0, rc);
2253                 }
2254                 if (clerq && !IS_ERR(clerq))
2255                         cl_req_completion(env, clerq, rc);
2256         }
2257         RETURN(rc);
2258 }
2259
2260 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2261                                         struct ldlm_enqueue_info *einfo)
2262 {
2263         void *data = einfo->ei_cbdata;
2264         int set = 0;
2265
2266         LASSERT(lock != NULL);
2267         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2268         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2269         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2270         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2271
2272         lock_res_and_lock(lock);
2273         spin_lock(&osc_ast_guard);
2274
2275         if (lock->l_ast_data == NULL)
2276                 lock->l_ast_data = data;
2277         if (lock->l_ast_data == data)
2278                 set = 1;
2279
2280         spin_unlock(&osc_ast_guard);
2281         unlock_res_and_lock(lock);
2282
2283         return set;
2284 }
2285
2286 static int osc_set_data_with_check(struct lustre_handle *lockh,
2287                                    struct ldlm_enqueue_info *einfo)
2288 {
2289         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2290         int set = 0;
2291
2292         if (lock != NULL) {
2293                 set = osc_set_lock_data_with_check(lock, einfo);
2294                 LDLM_LOCK_PUT(lock);
2295         } else
2296                 CERROR("lockh %p, data %p - client evicted?\n",
2297                        lockh, einfo->ei_cbdata);
2298         return set;
2299 }
2300
2301 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2302                              ldlm_iterator_t replace, void *data)
2303 {
2304         struct ldlm_res_id res_id;
2305         struct obd_device *obd = class_exp2obd(exp);
2306
2307         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2308         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2309         return 0;
2310 }
2311
2312 /* find any ldlm lock of the inode in osc
2313  * return 0    not find
2314  *        1    find one
2315  *      < 0    error */
2316 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2317                            ldlm_iterator_t replace, void *data)
2318 {
2319         struct ldlm_res_id res_id;
2320         struct obd_device *obd = class_exp2obd(exp);
2321         int rc = 0;
2322
2323         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2324         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2325         if (rc == LDLM_ITER_STOP)
2326                 return(1);
2327         if (rc == LDLM_ITER_CONTINUE)
2328                 return(0);
2329         return(rc);
2330 }
2331
2332 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2333                             obd_enqueue_update_f upcall, void *cookie,
2334                             __u64 *flags, int agl, int rc)
2335 {
2336         int intent = *flags & LDLM_FL_HAS_INTENT;
2337         ENTRY;
2338
2339         if (intent) {
2340                 /* The request was created before ldlm_cli_enqueue call. */
2341                 if (rc == ELDLM_LOCK_ABORTED) {
2342                         struct ldlm_reply *rep;
2343                         rep = req_capsule_server_get(&req->rq_pill,
2344                                                      &RMF_DLM_REP);
2345
2346                         LASSERT(rep != NULL);
2347                         if (rep->lock_policy_res1)
2348                                 rc = rep->lock_policy_res1;
2349                 }
2350         }
2351
2352         if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
2353             (rc == 0)) {
2354                 *flags |= LDLM_FL_LVB_READY;
2355                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2356                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2357         }
2358
2359         /* Call the update callback. */
2360         rc = (*upcall)(cookie, rc);
2361         RETURN(rc);
2362 }
2363
2364 static int osc_enqueue_interpret(const struct lu_env *env,
2365                                  struct ptlrpc_request *req,
2366                                  struct osc_enqueue_args *aa, int rc)
2367 {
2368         struct ldlm_lock *lock;
2369         struct lustre_handle handle;
2370         __u32 mode;
2371         struct ost_lvb *lvb;
2372         __u32 lvb_len;
2373         __u64 *flags = aa->oa_flags;
2374
2375         /* Make a local copy of a lock handle and a mode, because aa->oa_*
2376          * might be freed anytime after lock upcall has been called. */
2377         lustre_handle_copy(&handle, aa->oa_lockh);
2378         mode = aa->oa_ei->ei_mode;
2379
2380         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2381          * be valid. */
2382         lock = ldlm_handle2lock(&handle);
2383
2384         /* Take an additional reference so that a blocking AST that
2385          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2386          * to arrive after an upcall has been executed by
2387          * osc_enqueue_fini(). */
2388         ldlm_lock_addref(&handle, mode);
2389
2390         /* Let CP AST to grant the lock first. */
2391         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2392
2393         if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
2394                 lvb = NULL;
2395                 lvb_len = 0;
2396         } else {
2397                 lvb = aa->oa_lvb;
2398                 lvb_len = sizeof(*aa->oa_lvb);
2399         }
2400
2401         /* Complete obtaining the lock procedure. */
2402         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2403                                    mode, flags, lvb, lvb_len, &handle, rc);
2404         /* Complete osc stuff. */
2405         rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
2406                               flags, aa->oa_agl, rc);
2407
2408         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2409
2410         /* Release the lock for async request. */
2411         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
2412                 /*
2413                  * Releases a reference taken by ldlm_cli_enqueue(), if it is
2414                  * not already released by
2415                  * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
2416                  */
2417                 ldlm_lock_decref(&handle, mode);
2418
2419         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2420                  aa->oa_lockh, req, aa);
2421         ldlm_lock_decref(&handle, mode);
2422         LDLM_LOCK_PUT(lock);
2423         return rc;
2424 }
2425
2426 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
2427                         struct lov_oinfo *loi, int flags,
2428                         struct ost_lvb *lvb, __u32 mode, int rc)
2429 {
2430         struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
2431
2432         if (rc == ELDLM_OK) {
2433                 __u64 tmp;
2434
2435                 LASSERT(lock != NULL);
2436                 loi->loi_lvb = *lvb;
2437                 tmp = loi->loi_lvb.lvb_size;
2438                 /* Extend KMS up to the end of this lock and no further
2439                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
2440                 if (tmp > lock->l_policy_data.l_extent.end)
2441                         tmp = lock->l_policy_data.l_extent.end + 1;
2442                 if (tmp >= loi->loi_kms) {
2443                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
2444                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
2445                         loi_kms_set(loi, tmp);
2446                 } else {
2447                         LDLM_DEBUG(lock, "lock acquired, setting rss="
2448                                    LPU64"; leaving kms="LPU64", end="LPU64,
2449                                    loi->loi_lvb.lvb_size, loi->loi_kms,
2450                                    lock->l_policy_data.l_extent.end);
2451                 }
2452                 ldlm_lock_allow_match(lock);
2453         } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
2454                 LASSERT(lock != NULL);
2455                 loi->loi_lvb = *lvb;
2456                 ldlm_lock_allow_match(lock);
2457                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
2458                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
2459                 rc = ELDLM_OK;
2460         }
2461
2462         if (lock != NULL) {
2463                 if (rc != ELDLM_OK)
2464                         ldlm_lock_fail_match(lock);
2465
2466                 LDLM_LOCK_PUT(lock);
2467         }
2468 }
2469 EXPORT_SYMBOL(osc_update_enqueue);
2470
/* Marker value for the rqset argument of osc_enqueue_base(): when rqset
 * compares equal to it, the request is handed to ptlrpcd_add_req() instead
 * of being added to a caller-supplied request set. */
struct ptlrpc_request_set *PTLRPCD_SET = (struct ptlrpc_request_set *)1;
2472
2473 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2474  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2475  * other synchronous requests, however keeping some locks and trying to obtain
2476  * others may take a considerable amount of time in a case of ost failure; and
2477  * when other sync requests do not get released lock from a client, the client
2478  * is excluded from the cluster -- such scenarious make the life difficult, so
2479  * release locks just after they are obtained. */
2480 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2481                      __u64 *flags, ldlm_policy_data_t *policy,
2482                      struct ost_lvb *lvb, int kms_valid,
2483                      obd_enqueue_update_f upcall, void *cookie,
2484                      struct ldlm_enqueue_info *einfo,
2485                      struct lustre_handle *lockh,
2486                      struct ptlrpc_request_set *rqset, int async, int agl)
2487 {
2488         struct obd_device *obd = exp->exp_obd;
2489         struct ptlrpc_request *req = NULL;
2490         int intent = *flags & LDLM_FL_HAS_INTENT;
2491         int match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
2492         ldlm_mode_t mode;
2493         int rc;
2494         ENTRY;
2495
2496         /* Filesystem lock extents are extended to page boundaries so that
2497          * dealing with the page cache is a little smoother.  */
2498         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2499         policy->l_extent.end |= ~CFS_PAGE_MASK;
2500
2501         /*
2502          * kms is not valid when either object is completely fresh (so that no
2503          * locks are cached), or object was evicted. In the latter case cached
2504          * lock cannot be used, because it would prime inode state with
2505          * potentially stale LVB.
2506          */
2507         if (!kms_valid)
2508                 goto no_match;
2509
2510         /* Next, search for already existing extent locks that will cover us */
2511         /* If we're trying to read, we also search for an existing PW lock.  The
2512          * VFS and page cache already protect us locally, so lots of readers/
2513          * writers can share a single PW lock.
2514          *
2515          * There are problems with conversion deadlocks, so instead of
2516          * converting a read lock to a write lock, we'll just enqueue a new
2517          * one.
2518          *
2519          * At some point we should cancel the read lock instead of making them
2520          * send us a blocking callback, but there are problems with canceling
2521          * locks out from other users right now, too. */
2522         mode = einfo->ei_mode;
2523         if (einfo->ei_mode == LCK_PR)
2524                 mode |= LCK_PW;
2525         mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2526                                einfo->ei_type, policy, mode, lockh, 0);
2527         if (mode) {
2528                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
2529
2530                 if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
2531                         /* For AGL, if enqueue RPC is sent but the lock is not
2532                          * granted, then skip to process this strpe.
2533                          * Return -ECANCELED to tell the caller. */
2534                         ldlm_lock_decref(lockh, mode);
2535                         LDLM_LOCK_PUT(matched);
2536                         RETURN(-ECANCELED);
2537                 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2538                         *flags |= LDLM_FL_LVB_READY;
2539                         /* addref the lock only if not async requests and PW
2540                          * lock is matched whereas we asked for PR. */
2541                         if (!rqset && einfo->ei_mode != mode)
2542                                 ldlm_lock_addref(lockh, LCK_PR);
2543                         if (intent) {
2544                                 /* I would like to be able to ASSERT here that
2545                                  * rss <= kms, but I can't, for reasons which
2546                                  * are explained in lov_enqueue() */
2547                         }
2548
2549                         /* We already have a lock, and it's referenced.
2550                          *
2551                          * At this point, the cl_lock::cll_state is CLS_QUEUING,
2552                          * AGL upcall may change it to CLS_HELD directly. */
2553                         (*upcall)(cookie, ELDLM_OK);
2554
2555                         if (einfo->ei_mode != mode)
2556                                 ldlm_lock_decref(lockh, LCK_PW);
2557                         else if (rqset)
2558                                 /* For async requests, decref the lock. */
2559                                 ldlm_lock_decref(lockh, einfo->ei_mode);
2560                         LDLM_LOCK_PUT(matched);
2561                         RETURN(ELDLM_OK);
2562                 } else {
2563                         ldlm_lock_decref(lockh, mode);
2564                         LDLM_LOCK_PUT(matched);
2565                 }
2566         }
2567
2568  no_match:
2569         if (intent) {
2570                 CFS_LIST_HEAD(cancels);
2571                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2572                                            &RQF_LDLM_ENQUEUE_LVB);
2573                 if (req == NULL)
2574                         RETURN(-ENOMEM);
2575
2576                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
2577                 if (rc) {
2578                         ptlrpc_request_free(req);
2579                         RETURN(rc);
2580                 }
2581
2582                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2583                                      sizeof *lvb);
2584                 ptlrpc_request_set_replen(req);
2585         }
2586
2587         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2588         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2589
2590         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2591                               sizeof(*lvb), LVB_T_OST, lockh, async);
2592         if (rqset) {
2593                 if (!rc) {
2594                         struct osc_enqueue_args *aa;
2595                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2596                         aa = ptlrpc_req_async_args(req);
2597                         aa->oa_ei = einfo;
2598                         aa->oa_exp = exp;
2599                         aa->oa_flags  = flags;
2600                         aa->oa_upcall = upcall;
2601                         aa->oa_cookie = cookie;
2602                         aa->oa_lvb    = lvb;
2603                         aa->oa_lockh  = lockh;
2604                         aa->oa_agl    = !!agl;
2605
2606                         req->rq_interpret_reply =
2607                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2608                         if (rqset == PTLRPCD_SET)
2609                                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2610                         else
2611                                 ptlrpc_set_add_req(rqset, req);
2612                 } else if (intent) {
2613                         ptlrpc_req_finished(req);
2614                 }
2615                 RETURN(rc);
2616         }
2617
2618         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
2619         if (intent)
2620                 ptlrpc_req_finished(req);
2621
2622         RETURN(rc);
2623 }
2624
2625 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2626                        struct ldlm_enqueue_info *einfo,
2627                        struct ptlrpc_request_set *rqset)
2628 {
2629         struct ldlm_res_id res_id;
2630         int rc;
2631         ENTRY;
2632
2633         ostid_build_res_name(&oinfo->oi_md->lsm_oi, &res_id);
2634         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
2635                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2636                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
2637                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
2638                               rqset, rqset != NULL, 0);
2639         RETURN(rc);
2640 }
2641
2642 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2643                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2644                    int *flags, void *data, struct lustre_handle *lockh,
2645                    int unref)
2646 {
2647         struct obd_device *obd = exp->exp_obd;
2648         int lflags = *flags;
2649         ldlm_mode_t rc;
2650         ENTRY;
2651
2652         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2653                 RETURN(-EIO);
2654
2655         /* Filesystem lock extents are extended to page boundaries so that
2656          * dealing with the page cache is a little smoother */
2657         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2658         policy->l_extent.end |= ~CFS_PAGE_MASK;
2659
2660         /* Next, search for already existing extent locks that will cover us */
2661         /* If we're trying to read, we also search for an existing PW lock.  The
2662          * VFS and page cache already protect us locally, so lots of readers/
2663          * writers can share a single PW lock. */
2664         rc = mode;
2665         if (mode == LCK_PR)
2666                 rc |= LCK_PW;
2667         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2668                              res_id, type, policy, rc, lockh, unref);
2669         if (rc) {
2670                 if (data != NULL) {
2671                         if (!osc_set_data_with_check(lockh, data)) {
2672                                 if (!(lflags & LDLM_FL_TEST_LOCK))
2673                                         ldlm_lock_decref(lockh, rc);
2674                                 RETURN(0);
2675                         }
2676                 }
2677                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2678                         ldlm_lock_addref(lockh, LCK_PR);
2679                         ldlm_lock_decref(lockh, LCK_PW);
2680                 }
2681                 RETURN(rc);
2682         }
2683         RETURN(rc);
2684 }
2685
2686 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2687 {
2688         ENTRY;
2689
2690         if (unlikely(mode == LCK_GROUP))
2691                 ldlm_lock_decref_and_cancel(lockh, mode);
2692         else
2693                 ldlm_lock_decref(lockh, mode);
2694
2695         RETURN(0);
2696 }
2697
2698 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2699                       __u32 mode, struct lustre_handle *lockh)
2700 {
2701         ENTRY;
2702         RETURN(osc_cancel_base(lockh, mode));
2703 }
2704
2705 static int osc_cancel_unused(struct obd_export *exp,
2706                              struct lov_stripe_md *lsm,
2707                              ldlm_cancel_flags_t flags,
2708                              void *opaque)
2709 {
2710         struct obd_device *obd = class_exp2obd(exp);
2711         struct ldlm_res_id res_id, *resp = NULL;
2712
2713         if (lsm != NULL) {
2714                 ostid_build_res_name(&lsm->lsm_oi, &res_id);
2715                 resp = &res_id;
2716         }
2717
2718         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
2719 }
2720
2721 static int osc_statfs_interpret(const struct lu_env *env,
2722                                 struct ptlrpc_request *req,
2723                                 struct osc_async_args *aa, int rc)
2724 {
2725         struct obd_statfs *msfs;
2726         ENTRY;
2727
2728         if (rc == -EBADR)
2729                 /* The request has in fact never been sent
2730                  * due to issues at a higher level (LOV).
2731                  * Exit immediately since the caller is
2732                  * aware of the problem and takes care
2733                  * of the clean up */
2734                  RETURN(rc);
2735
2736         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2737             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2738                 GOTO(out, rc = 0);
2739
2740         if (rc != 0)
2741                 GOTO(out, rc);
2742
2743         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2744         if (msfs == NULL) {
2745                 GOTO(out, rc = -EPROTO);
2746         }
2747
2748         *aa->aa_oi->oi_osfs = *msfs;
2749 out:
2750         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2751         RETURN(rc);
2752 }
2753
2754 static int osc_statfs_async(struct obd_export *exp,
2755                             struct obd_info *oinfo, __u64 max_age,
2756                             struct ptlrpc_request_set *rqset)
2757 {
2758         struct obd_device     *obd = class_exp2obd(exp);
2759         struct ptlrpc_request *req;
2760         struct osc_async_args *aa;
2761         int                    rc;
2762         ENTRY;
2763
2764         /* We could possibly pass max_age in the request (as an absolute
2765          * timestamp or a "seconds.usec ago") so the target can avoid doing
2766          * extra calls into the filesystem if that isn't necessary (e.g.
2767          * during mount that would help a bit).  Having relative timestamps
2768          * is not so great if request processing is slow, while absolute
2769          * timestamps are not ideal because they need time synchronization. */
2770         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2771         if (req == NULL)
2772                 RETURN(-ENOMEM);
2773
2774         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2775         if (rc) {
2776                 ptlrpc_request_free(req);
2777                 RETURN(rc);
2778         }
2779         ptlrpc_request_set_replen(req);
2780         req->rq_request_portal = OST_CREATE_PORTAL;
2781         ptlrpc_at_set_req_timeout(req);
2782
2783         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2784                 /* procfs requests not want stat in wait for avoid deadlock */
2785                 req->rq_no_resend = 1;
2786                 req->rq_no_delay = 1;
2787         }
2788
2789         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2790         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2791         aa = ptlrpc_req_async_args(req);
2792         aa->aa_oi = oinfo;
2793
2794         ptlrpc_set_add_req(rqset, req);
2795         RETURN(0);
2796 }
2797
2798 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2799                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2800 {
2801         struct obd_device     *obd = class_exp2obd(exp);
2802         struct obd_statfs     *msfs;
2803         struct ptlrpc_request *req;
2804         struct obd_import     *imp = NULL;
2805         int rc;
2806         ENTRY;
2807
2808         /*Since the request might also come from lprocfs, so we need
2809          *sync this with client_disconnect_export Bug15684*/
2810         down_read(&obd->u.cli.cl_sem);
2811         if (obd->u.cli.cl_import)
2812                 imp = class_import_get(obd->u.cli.cl_import);
2813         up_read(&obd->u.cli.cl_sem);
2814         if (!imp)
2815                 RETURN(-ENODEV);
2816
2817         /* We could possibly pass max_age in the request (as an absolute
2818          * timestamp or a "seconds.usec ago") so the target can avoid doing
2819          * extra calls into the filesystem if that isn't necessary (e.g.
2820          * during mount that would help a bit).  Having relative timestamps
2821          * is not so great if request processing is slow, while absolute
2822          * timestamps are not ideal because they need time synchronization. */
2823         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2824
2825         class_import_put(imp);
2826
2827         if (req == NULL)
2828                 RETURN(-ENOMEM);
2829
2830         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2831         if (rc) {
2832                 ptlrpc_request_free(req);
2833                 RETURN(rc);
2834         }
2835         ptlrpc_request_set_replen(req);
2836         req->rq_request_portal = OST_CREATE_PORTAL;
2837         ptlrpc_at_set_req_timeout(req);
2838
2839         if (flags & OBD_STATFS_NODELAY) {
2840                 /* procfs requests not want stat in wait for avoid deadlock */
2841                 req->rq_no_resend = 1;
2842                 req->rq_no_delay = 1;
2843         }
2844
2845         rc = ptlrpc_queue_wait(req);
2846         if (rc)
2847                 GOTO(out, rc);
2848
2849         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2850         if (msfs == NULL) {
2851                 GOTO(out, rc = -EPROTO);
2852         }
2853
2854         *osfs = *msfs;
2855
2856         EXIT;
2857  out:
2858         ptlrpc_req_finished(req);
2859         return rc;
2860 }
2861
2862 /* Retrieve object striping information.
2863  *
2864  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
2865  * the maximum number of OST indices which will fit in the user buffer.
2866  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
2867  */
2868 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
2869 {
2870         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
2871         struct lov_user_md_v3 lum, *lumk;
2872         struct lov_user_ost_data_v1 *lmm_objects;
2873         int rc = 0, lum_size;
2874         ENTRY;
2875
2876         if (!lsm)
2877                 RETURN(-ENODATA);
2878
2879         /* we only need the header part from user space to get lmm_magic and
2880          * lmm_stripe_count, (the header part is common to v1 and v3) */
2881         lum_size = sizeof(struct lov_user_md_v1);
2882         if (cfs_copy_from_user(&lum, lump, lum_size))
2883                 RETURN(-EFAULT);
2884
2885         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
2886             (lum.lmm_magic != LOV_USER_MAGIC_V3))
2887                 RETURN(-EINVAL);
2888
2889         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
2890         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
2891         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
2892         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
2893
2894         /* we can use lov_mds_md_size() to compute lum_size
2895          * because lov_user_md_vX and lov_mds_md_vX have the same size */
2896         if (lum.lmm_stripe_count > 0) {
2897                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
2898                 OBD_ALLOC(lumk, lum_size);
2899                 if (!lumk)
2900                         RETURN(-ENOMEM);
2901
2902                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
2903                         lmm_objects =
2904                             &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
2905                 else
2906                         lmm_objects = &(lumk->lmm_objects[0]);
2907                 lmm_objects->l_ost_oi = lsm->lsm_oi;
2908         } else {
2909                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
2910                 lumk = &lum;
2911         }
2912
2913         lumk->lmm_oi = lsm->lsm_oi;
2914         lumk->lmm_stripe_count = 1;
2915
2916         if (cfs_copy_to_user(lump, lumk, lum_size))
2917                 rc = -EFAULT;
2918
2919         if (lumk != &lum)
2920                 OBD_FREE(lumk, lum_size);
2921
2922         RETURN(rc);
2923 }
2924
2925
2926 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2927                          void *karg, void *uarg)
2928 {
2929         struct obd_device *obd = exp->exp_obd;
2930         struct obd_ioctl_data *data = karg;
2931         int err = 0;
2932         ENTRY;
2933
2934         if (!cfs_try_module_get(THIS_MODULE)) {
2935                 CERROR("Can't get module. Is it alive?");
2936                 return -EINVAL;
2937         }
2938         switch (cmd) {
2939         case OBD_IOC_LOV_GET_CONFIG: {
2940                 char *buf;
2941                 struct lov_desc *desc;
2942                 struct obd_uuid uuid;
2943
2944                 buf = NULL;
2945                 len = 0;
2946                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
2947                         GOTO(out, err = -EINVAL);
2948
2949                 data = (struct obd_ioctl_data *)buf;
2950
2951                 if (sizeof(*desc) > data->ioc_inllen1) {
2952                         obd_ioctl_freedata(buf, len);
2953                         GOTO(out, err = -EINVAL);
2954                 }
2955
2956                 if (data->ioc_inllen2 < sizeof(uuid)) {
2957                         obd_ioctl_freedata(buf, len);
2958                         GOTO(out, err = -EINVAL);
2959                 }
2960
2961                 desc = (struct lov_desc *)data->ioc_inlbuf1;
2962                 desc->ld_tgt_count = 1;
2963                 desc->ld_active_tgt_count = 1;
2964                 desc->ld_default_stripe_count = 1;
2965                 desc->ld_default_stripe_size = 0;
2966                 desc->ld_default_stripe_offset = 0;
2967                 desc->ld_pattern = 0;
2968                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
2969
2970                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
2971
2972                 err = cfs_copy_to_user((void *)uarg, buf, len);
2973                 if (err)
2974                         err = -EFAULT;
2975                 obd_ioctl_freedata(buf, len);
2976                 GOTO(out, err);
2977         }
2978         case LL_IOC_LOV_SETSTRIPE:
2979                 err = obd_alloc_memmd(exp, karg);
2980                 if (err > 0)
2981                         err = 0;
2982                 GOTO(out, err);
2983         case LL_IOC_LOV_GETSTRIPE:
2984                 err = osc_getstripe(karg, uarg);
2985                 GOTO(out, err);
2986         case OBD_IOC_CLIENT_RECOVER:
2987                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2988                                             data->ioc_inlbuf1, 0);
2989                 if (err > 0)
2990                         err = 0;
2991                 GOTO(out, err);
2992         case IOC_OSC_SET_ACTIVE:
2993                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2994                                                data->ioc_offset);
2995                 GOTO(out, err);
2996         case OBD_IOC_POLL_QUOTACHECK:
2997                 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
2998                 GOTO(out, err);
2999         case OBD_IOC_PING_TARGET:
3000                 err = ptlrpc_obd_ping(obd);
3001                 GOTO(out, err);
3002         default:
3003                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3004                        cmd, cfs_curproc_comm());
3005                 GOTO(out, err = -ENOTTY);
3006         }
3007 out:
3008         cfs_module_put(THIS_MODULE);
3009         return err;
3010 }
3011
3012 static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
3013                         obd_count keylen, void *key, __u32 *vallen, void *val,
3014                         struct lov_stripe_md *lsm)
3015 {
3016         ENTRY;
3017         if (!vallen || !val)
3018                 RETURN(-EFAULT);
3019
3020         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3021                 __u32 *stripe = val;
3022                 *vallen = sizeof(*stripe);
3023                 *stripe = 0;
3024                 RETURN(0);
3025         } else if (KEY_IS(KEY_LAST_ID)) {
3026                 struct ptlrpc_request *req;
3027                 obd_id                *reply;
3028                 char                  *tmp;
3029                 int                    rc;
3030
3031                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3032                                            &RQF_OST_GET_INFO_LAST_ID);
3033                 if (req == NULL)
3034                         RETURN(-ENOMEM);
3035
3036                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3037                                      RCL_CLIENT, keylen);
3038                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3039                 if (rc) {
3040                         ptlrpc_request_free(req);
3041                         RETURN(rc);
3042                 }
3043
3044                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3045                 memcpy(tmp, key, keylen);
3046
3047                 req->rq_no_delay = req->rq_no_resend = 1;
3048                 ptlrpc_request_set_replen(req);
3049                 rc = ptlrpc_queue_wait(req);
3050                 if (rc)
3051                         GOTO(out, rc);
3052
3053                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3054                 if (reply == NULL)
3055                         GOTO(out, rc = -EPROTO);
3056
3057                 *((obd_id *)val) = *reply;
3058         out:
3059                 ptlrpc_req_finished(req);
3060                 RETURN(rc);
3061         } else if (KEY_IS(KEY_FIEMAP)) {
3062                 struct ptlrpc_request *req;
3063                 struct ll_user_fiemap *reply;
3064                 char *tmp;
3065                 int rc;
3066
3067                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3068                                            &RQF_OST_GET_INFO_FIEMAP);
3069                 if (req == NULL)
3070                         RETURN(-ENOMEM);
3071
3072                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3073                                      RCL_CLIENT, keylen);
3074                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3075                                      RCL_CLIENT, *vallen);
3076                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3077                                      RCL_SERVER, *vallen);
3078
3079                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3080                 if (rc) {
3081                         ptlrpc_request_free(req);
3082                         RETURN(rc);
3083                 }
3084
3085                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3086                 memcpy(tmp, key, keylen);
3087                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3088                 memcpy(tmp, val, *vallen);
3089
3090                 ptlrpc_request_set_replen(req);
3091                 rc = ptlrpc_queue_wait(req);
3092                 if (rc)
3093                         GOTO(out1, rc);
3094
3095                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3096                 if (reply == NULL)
3097                         GOTO(out1, rc = -EPROTO);
3098
3099                 memcpy(val, reply, *vallen);
3100         out1:
3101                 ptlrpc_req_finished(req);
3102
3103                 RETURN(rc);
3104         }
3105
3106         RETURN(-EINVAL);
3107 }
3108
3109 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
3110                               obd_count keylen, void *key, obd_count vallen,
3111                               void *val, struct ptlrpc_request_set *set)
3112 {
3113         struct ptlrpc_request *req;
3114         struct obd_device     *obd = exp->exp_obd;
3115         struct obd_import     *imp = class_exp2cliimp(exp);
3116         char                  *tmp;
3117         int                    rc;
3118         ENTRY;
3119
3120         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3121
3122         if (KEY_IS(KEY_CHECKSUM)) {
3123                 if (vallen != sizeof(int))
3124                         RETURN(-EINVAL);
3125                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3126                 RETURN(0);
3127         }
3128
3129         if (KEY_IS(KEY_SPTLRPC_CONF)) {
3130                 sptlrpc_conf_client_adapt(obd);
3131                 RETURN(0);
3132         }
3133
3134         if (KEY_IS(KEY_FLUSH_CTX)) {
3135                 sptlrpc_import_flush_my_ctx(imp);
3136                 RETURN(0);
3137         }
3138
3139         if (KEY_IS(KEY_CACHE_SET)) {
3140                 struct client_obd *cli = &obd->u.cli;
3141
3142                 LASSERT(cli->cl_cache == NULL); /* only once */
3143                 cli->cl_cache = (struct cl_client_cache *)val;
3144                 cfs_atomic_inc(&cli->cl_cache->ccc_users);
3145                 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
3146
3147                 /* add this osc into entity list */
3148                 LASSERT(cfs_list_empty(&cli->cl_lru_osc));
3149                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3150                 cfs_list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
3151                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3152
3153                 RETURN(0);
3154         }
3155
3156         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
3157                 struct client_obd *cli = &obd->u.cli;
3158                 int nr = cfs_atomic_read(&cli->cl_lru_in_list) >> 1;
3159                 int target = *(int *)val;
3160
3161                 nr = osc_lru_shrink(cli, min(nr, target));
3162                 *(int *)val -= nr;
3163                 RETURN(0);
3164         }
3165
3166         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3167                 RETURN(-EINVAL);
3168
3169         /* We pass all other commands directly to OST. Since nobody calls osc
3170            methods directly and everybody is supposed to go through LOV, we
3171            assume lov checked invalid values for us.
3172            The only recognised values so far are evict_by_nid and mds_conn.
3173            Even if something bad goes through, we'd get a -EINVAL from OST
3174            anyway. */
3175
3176         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
3177                                                 &RQF_OST_SET_GRANT_INFO :
3178                                                 &RQF_OBD_SET_INFO);
3179         if (req == NULL)
3180                 RETURN(-ENOMEM);
3181
3182         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3183                              RCL_CLIENT, keylen);
3184         if (!KEY_IS(KEY_GRANT_SHRINK))
3185                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3186                                      RCL_CLIENT, vallen);
3187         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3188         if (rc) {
3189                 ptlrpc_request_free(req);
3190                 RETURN(rc);
3191         }
3192
3193         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3194         memcpy(tmp, key, keylen);
3195         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
3196                                                         &RMF_OST_BODY :
3197                                                         &RMF_SETINFO_VAL);
3198         memcpy(tmp, val, vallen);
3199
3200         if (KEY_IS(KEY_GRANT_SHRINK)) {
3201                 struct osc_grant_args *aa;
3202                 struct obdo *oa;
3203
3204                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3205                 aa = ptlrpc_req_async_args(req);
3206                 OBDO_ALLOC(oa);
3207                 if (!oa) {
3208                         ptlrpc_req_finished(req);
3209                         RETURN(-ENOMEM);
3210                 }
3211                 *oa = ((struct ost_body *)val)->oa;
3212                 aa->aa_oa = oa;
3213                 req->rq_interpret_reply = osc_shrink_grant_interpret;
3214         }
3215
3216         ptlrpc_request_set_replen(req);
3217         if (!KEY_IS(KEY_GRANT_SHRINK)) {
3218                 LASSERT(set != NULL);
3219                 ptlrpc_set_add_req(set, req);
3220                 ptlrpc_check_set(NULL, set);
3221         } else
3222                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
3223
3224         RETURN(0);
3225 }
3226
3227
/**
 * llog initialisation hook of the OSC.
 *
 * This path is not supposed to be used together with LOD/OSP and is due to
 * be removed soon, so reaching it is treated as a bug: the function hits
 * LBUG() before its return statement.
 */
static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
                         struct obd_device *disk_obd, int *index)
{
        LBUG();
        return 0;
}
3236
3237 static int osc_llog_finish(struct obd_device *obd, int count)
3238 {
3239         struct llog_ctxt *ctxt;
3240
3241         ENTRY;
3242
3243         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3244         if (ctxt) {
3245                 llog_cat_close(NULL, ctxt->loc_handle);
3246                 llog_cleanup(NULL, ctxt);
3247         }
3248
3249         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3250         if (ctxt)
3251                 llog_cleanup(NULL, ctxt);
3252         RETURN(0);
3253 }
3254
3255 static int osc_reconnect(const struct lu_env *env,
3256                          struct obd_export *exp, struct obd_device *obd,
3257                          struct obd_uuid *cluuid,
3258                          struct obd_connect_data *data,
3259                          void *localdata)
3260 {
3261         struct client_obd *cli = &obd->u.cli;
3262
3263         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3264                 long lost_grant;
3265
3266                 client_obd_list_lock(&cli->cl_loi_list_lock);
3267                 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
3268                                 2 * cli_brw_size(obd);
3269                 lost_grant = cli->cl_lost_grant;
3270                 cli->cl_lost_grant = 0;
3271                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3272
3273                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3274                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3275                        data->ocd_version, data->ocd_grant, lost_grant);
3276         }
3277
3278         RETURN(0);
3279 }
3280
3281 static int osc_disconnect(struct obd_export *exp)
3282 {
3283         struct obd_device *obd = class_exp2obd(exp);
3284         struct llog_ctxt  *ctxt;
3285         int rc;
3286
3287         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3288         if (ctxt) {
3289                 if (obd->u.cli.cl_conn_count == 1) {
3290                         /* Flush any remaining cancel messages out to the
3291                          * target */
3292                         llog_sync(ctxt, exp, 0);
3293                 }
3294                 llog_ctxt_put(ctxt);
3295         } else {
3296                 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
3297                        obd);
3298         }
3299
3300         rc = client_disconnect_export(exp);
3301         /**
3302          * Initially we put del_shrink_grant before disconnect_export, but it
3303          * causes the following problem if setup (connect) and cleanup
3304          * (disconnect) are tangled together.
3305          *      connect p1                     disconnect p2
3306          *   ptlrpc_connect_import
3307          *     ...............               class_manual_cleanup
3308          *                                     osc_disconnect
3309          *                                     del_shrink_grant
3310          *   ptlrpc_connect_interrupt
3311          *     init_grant_shrink
3312          *   add this client to shrink list
3313          *                                      cleanup_osc
3314          * Bang! pinger trigger the shrink.
3315          * So the osc should be disconnected from the shrink list, after we
3316          * are sure the import has been destroyed. BUG18662
3317          */
3318         if (obd->u.cli.cl_import == NULL)
3319                 osc_del_shrink_grant(&obd->u.cli);
3320         return rc;
3321 }
3322
3323 static int osc_import_event(struct obd_device *obd,
3324                             struct obd_import *imp,
3325                             enum obd_import_event event)
3326 {
3327         struct client_obd *cli;
3328         int rc = 0;
3329
3330         ENTRY;
3331         LASSERT(imp->imp_obd == obd);
3332
3333         switch (event) {
3334         case IMP_EVENT_DISCON: {
3335                 cli = &obd->u.cli;
3336                 client_obd_list_lock(&cli->cl_loi_list_lock);
3337                 cli->cl_avail_grant = 0;
3338                 cli->cl_lost_grant = 0;
3339                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3340                 break;
3341         }
3342         case IMP_EVENT_INACTIVE: {
3343                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3344                 break;
3345         }
3346         case IMP_EVENT_INVALIDATE: {
3347                 struct ldlm_namespace *ns = obd->obd_namespace;
3348                 struct lu_env         *env;
3349                 int                    refcheck;
3350
3351                 env = cl_env_get(&refcheck);
3352                 if (!IS_ERR(env)) {
3353                         /* Reset grants */
3354                         cli = &obd->u.cli;
3355                         /* all pages go to failing rpcs due to the invalid
3356                          * import */
3357                         osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
3358
3359                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3360                         cl_env_put(env, &refcheck);
3361                 } else
3362                         rc = PTR_ERR(env);
3363                 break;
3364         }
3365         case IMP_EVENT_ACTIVE: {
3366                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3367                 break;
3368         }
3369         case IMP_EVENT_OCD: {
3370                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3371
3372                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3373                         osc_init_grant(&obd->u.cli, ocd);
3374
3375                 /* See bug 7198 */
3376                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3377                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3378
3379                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3380                 break;
3381         }
3382         case IMP_EVENT_DEACTIVATE: {
3383                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
3384                 break;
3385         }
3386         case IMP_EVENT_ACTIVATE: {
3387                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
3388                 break;
3389         }
3390         default:
3391                 CERROR("Unknown import event %d\n", event);
3392                 LBUG();
3393         }
3394         RETURN(rc);
3395 }
3396
3397 /**
3398  * Determine whether the lock can be canceled before replaying the lock
3399  * during recovery, see bug16774 for detailed information.
3400  *
3401  * \retval zero the lock can't be canceled
3402  * \retval other ok to cancel
3403  */
3404 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
3405 {
3406         check_res_locked(lock->l_resource);
3407
3408         /*
3409          * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
3410          *
3411          * XXX as a future improvement, we can also cancel unused write lock
3412          * if it doesn't have dirty data and active mmaps.
3413          */
3414         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3415             (lock->l_granted_mode == LCK_PR ||
3416              lock->l_granted_mode == LCK_CR) &&
3417             (osc_dlm_lock_pageref(lock) == 0))
3418                 RETURN(1);
3419
3420         RETURN(0);
3421 }
3422
3423 static int brw_queue_work(const struct lu_env *env, void *data)
3424 {
3425         struct client_obd *cli = data;
3426
3427         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3428
3429         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
3430         RETURN(0);
3431 }
3432
3433 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3434 {
3435         struct lprocfs_static_vars lvars = { 0 };
3436         struct client_obd          *cli = &obd->u.cli;
3437         void                       *handler;
3438         int                        rc;
3439         ENTRY;
3440
3441         rc = ptlrpcd_addref();
3442         if (rc)
3443                 RETURN(rc);
3444
3445         rc = client_obd_setup(obd, lcfg);
3446         if (rc)
3447                 GOTO(out_ptlrpcd, rc);
3448
3449         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3450         if (IS_ERR(handler))
3451                 GOTO(out_client_setup, rc = PTR_ERR(handler));
3452         cli->cl_writeback_work = handler;
3453
3454         rc = osc_quota_setup(obd);
3455         if (rc)
3456                 GOTO(out_ptlrpcd_work, rc);
3457
3458         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3459         lprocfs_osc_init_vars(&lvars);
3460         if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3461                 lproc_osc_attach_seqstat(obd);
3462                 sptlrpc_lprocfs_cliobd_attach(obd);
3463                 ptlrpc_lprocfs_register_obd(obd);
3464         }
3465
3466         /* We need to allocate a few requests more, because
3467          * brw_interpret tries to create new requests before freeing
3468          * previous ones, Ideally we want to have 2x max_rpcs_in_flight
3469          * reserved, but I'm afraid that might be too much wasted RAM
3470          * in fact, so 2 is just my guess and still should work. */
3471         cli->cl_import->imp_rq_pool =
3472                 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3473                                     OST_MAXREQSIZE,
3474                                     ptlrpc_add_rqs_to_pool);
3475
3476         CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
3477         ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
3478         RETURN(rc);
3479
3480 out_ptlrpcd_work:
3481         ptlrpcd_destroy_work(handler);
3482 out_client_setup:
3483         client_obd_cleanup(obd);
3484 out_ptlrpcd:
3485         ptlrpcd_decref();
3486         RETURN(rc);
3487 }
3488
3489 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3490 {
3491         int rc = 0;
3492         ENTRY;
3493
3494         switch (stage) {
3495         case OBD_CLEANUP_EARLY: {
3496                 struct obd_import *imp;
3497                 imp = obd->u.cli.cl_import;
3498                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3499                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3500                 ptlrpc_deactivate_import(imp);
3501                 spin_lock(&imp->imp_lock);
3502                 imp->imp_pingable = 0;
3503                 spin_unlock(&imp->imp_lock);
3504                 break;
3505         }
3506         case OBD_CLEANUP_EXPORTS: {
3507                 struct client_obd *cli = &obd->u.cli;
3508                 /* LU-464
3509                  * for echo client, export may be on zombie list, wait for
3510                  * zombie thread to cull it, because cli.cl_import will be
3511                  * cleared in client_disconnect_export():
3512                  *   class_export_destroy() -> obd_cleanup() ->
3513                  *   echo_device_free() -> echo_client_cleanup() ->
3514                  *   obd_disconnect() -> osc_disconnect() ->
3515                  *   client_disconnect_export()
3516                  */
3517                 obd_zombie_barrier();
3518                 if (cli->cl_writeback_work) {
3519                         ptlrpcd_destroy_work(cli->cl_writeback_work);
3520                         cli->cl_writeback_work = NULL;
3521                 }
3522                 obd_cleanup_client_import(obd);
3523                 ptlrpc_lprocfs_unregister_obd(obd);
3524                 lprocfs_obd_cleanup(obd);
3525                 rc = obd_llog_finish(obd, 0);
3526                 if (rc != 0)
3527                         CERROR("failed to cleanup llogging subsystems\n");
3528                 break;
3529                 }
3530         }
3531         RETURN(rc);
3532 }
3533
3534 int osc_cleanup(struct obd_device *obd)
3535 {
3536         struct client_obd *cli = &obd->u.cli;
3537         int rc;
3538
3539         ENTRY;
3540
3541         /* lru cleanup */
3542         if (cli->cl_cache != NULL) {
3543                 LASSERT(cfs_atomic_read(&cli->cl_cache->ccc_users) > 0);
3544                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3545                 cfs_list_del_init(&cli->cl_lru_osc);
3546                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3547                 cli->cl_lru_left = NULL;
3548                 cfs_atomic_dec(&cli->cl_cache->ccc_users);
3549                 cli->cl_cache = NULL;
3550         }
3551
3552         /* free memory of osc quota cache */
3553         osc_quota_cleanup(obd);
3554
3555         rc = client_obd_cleanup(obd);
3556
3557         ptlrpcd_decref();
3558         RETURN(rc);
3559 }
3560
3561 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3562 {
3563         struct lprocfs_static_vars lvars = { 0 };
3564         int rc = 0;
3565
3566         lprocfs_osc_init_vars(&lvars);
3567
3568         switch (lcfg->lcfg_command) {
3569         default:
3570                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
3571                                               lcfg, obd);
3572                 if (rc > 0)
3573                         rc = 0;
3574                 break;
3575         }
3576
3577         return(rc);
3578 }
3579
3580 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3581 {
3582         return osc_process_config_base(obd, buf);
3583 }
3584
3585 struct obd_ops osc_obd_ops = {
3586         .o_owner                = THIS_MODULE,
3587         .o_setup                = osc_setup,
3588         .o_precleanup           = osc_precleanup,
3589         .o_cleanup              = osc_cleanup,
3590         .o_add_conn             = client_import_add_conn,
3591         .o_del_conn             = client_import_del_conn,
3592         .o_connect              = client_connect_import,
3593         .o_reconnect            = osc_reconnect,
3594         .o_disconnect           = osc_disconnect,
3595         .o_statfs               = osc_statfs,
3596         .o_statfs_async         = osc_statfs_async,
3597         .o_packmd               = osc_packmd,
3598         .o_unpackmd             = osc_unpackmd,
3599         .o_create               = osc_create,
3600         .o_destroy              = osc_destroy,
3601         .o_getattr              = osc_getattr,
3602         .o_getattr_async        = osc_getattr_async,
3603         .o_setattr              = osc_setattr,
3604         .o_setattr_async        = osc_setattr_async,
3605         .o_brw                  = osc_brw,
3606         .o_punch                = osc_punch,
3607         .o_sync                 = osc_sync,
3608         .o_enqueue              = osc_enqueue,
3609         .o_change_cbdata        = osc_change_cbdata,
3610         .o_find_cbdata          = osc_find_cbdata,
3611         .o_cancel               = osc_cancel,
3612         .o_cancel_unused        = osc_cancel_unused,
3613         .o_iocontrol            = osc_iocontrol,
3614         .o_get_info             = osc_get_info,
3615         .o_set_info_async       = osc_set_info_async,
3616         .o_import_event         = osc_import_event,
3617         .o_llog_init            = osc_llog_init,
3618         .o_llog_finish          = osc_llog_finish,
3619         .o_process_config       = osc_process_config,
3620         .o_quotactl             = osc_quotactl,
3621         .o_quotacheck           = osc_quotacheck,
3622 };
3623
3624 extern struct lu_kmem_descr osc_caches[];
3625 extern spinlock_t osc_ast_guard;
3626 extern struct lock_class_key osc_ast_guard_class;
3627
3628 int __init osc_init(void)
3629 {
3630         struct lprocfs_static_vars lvars = { 0 };
3631         int rc;
3632         ENTRY;
3633
3634         /* print an address of _any_ initialized kernel symbol from this
3635          * module, to allow debugging with gdb that doesn't support data
3636          * symbols from modules.*/
3637         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3638
3639         rc = lu_kmem_init(osc_caches);
3640
3641         lprocfs_osc_init_vars(&lvars);
3642
3643         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3644                                  LUSTRE_OSC_NAME, &osc_device_type);
3645         if (rc) {
3646                 lu_kmem_fini(osc_caches);
3647                 RETURN(rc);
3648         }
3649
3650         spin_lock_init(&osc_ast_guard);
3651         lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
3652
3653         RETURN(rc);
3654 }
3655
#ifdef __KERNEL__
/**
 * Module exit: unregisters the OSC obd type, then releases the OSC kmem
 * caches.
 */
static void /*__exit*/ osc_exit(void)
{
        class_unregister_type(LUSTRE_OSC_NAME);
        lu_kmem_fini(osc_caches);
}

/* module metadata and entry points */
MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");

cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
#endif /* __KERNEL__ */