Whamcloud - gitweb
43f0c7e603290ba536841d1127586e6a885b8681
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <libcfs/libcfs.h>
40
41 #ifndef __KERNEL__
42 # include <liblustre.h>
43 #endif
44
45 #include <lustre_dlm.h>
46 #include <lustre_net.h>
47 #include <lustre/lustre_user.h>
48 #include <obd_cksum.h>
49 #include <obd_ost.h>
50 #include <obd_lov.h>
51
52 #ifdef  __CYGWIN__
53 # include <ctype.h>
54 #endif
55
56 #include <lustre_ha.h>
57 #include <lprocfs_status.h>
58 #include <lustre_log.h>
59 #include <lustre_debug.h>
60 #include <lustre_param.h>
61 #include <lustre_fid.h>
62 #include "osc_internal.h"
63 #include "osc_cl_internal.h"
64
65 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
66 static int brw_interpret(const struct lu_env *env,
67                          struct ptlrpc_request *req, void *data, int rc);
68 int osc_cleanup(struct obd_device *obd);
69
70 /* Pack OSC object metadata for disk storage (LE byte order). */
71 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
72                       struct lov_stripe_md *lsm)
73 {
74         int lmm_size;
75         ENTRY;
76
77         lmm_size = sizeof(**lmmp);
78         if (lmmp == NULL)
79                 RETURN(lmm_size);
80
81         if (*lmmp != NULL && lsm == NULL) {
82                 OBD_FREE(*lmmp, lmm_size);
83                 *lmmp = NULL;
84                 RETURN(0);
85         } else if (unlikely(lsm != NULL && ostid_id(&lsm->lsm_oi) == 0)) {
86                 RETURN(-EBADF);
87         }
88
89         if (*lmmp == NULL) {
90                 OBD_ALLOC(*lmmp, lmm_size);
91                 if (*lmmp == NULL)
92                         RETURN(-ENOMEM);
93         }
94
95         if (lsm)
96                 ostid_cpu_to_le(&lsm->lsm_oi, &(*lmmp)->lmm_oi);
97
98         RETURN(lmm_size);
99 }
100
101 /* Unpack OSC object metadata from disk storage (LE byte order). */
102 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
103                         struct lov_mds_md *lmm, int lmm_bytes)
104 {
105         int lsm_size;
106         struct obd_import *imp = class_exp2cliimp(exp);
107         ENTRY;
108
109         if (lmm != NULL) {
110                 if (lmm_bytes < sizeof(*lmm)) {
111                         CERROR("%s: lov_mds_md too small: %d, need %d\n",
112                                exp->exp_obd->obd_name, lmm_bytes,
113                                (int)sizeof(*lmm));
114                         RETURN(-EINVAL);
115                 }
116                 /* XXX LOV_MAGIC etc check? */
117
118                 if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) {
119                         CERROR("%s: zero lmm_object_id: rc = %d\n",
120                                exp->exp_obd->obd_name, -EINVAL);
121                         RETURN(-EINVAL);
122                 }
123         }
124
125         lsm_size = lov_stripe_md_size(1);
126         if (lsmp == NULL)
127                 RETURN(lsm_size);
128
129         if (*lsmp != NULL && lmm == NULL) {
130                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
131                 OBD_FREE(*lsmp, lsm_size);
132                 *lsmp = NULL;
133                 RETURN(0);
134         }
135
136         if (*lsmp == NULL) {
137                 OBD_ALLOC(*lsmp, lsm_size);
138                 if (unlikely(*lsmp == NULL))
139                         RETURN(-ENOMEM);
140                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
141                 if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {
142                         OBD_FREE(*lsmp, lsm_size);
143                         RETURN(-ENOMEM);
144                 }
145                 loi_init((*lsmp)->lsm_oinfo[0]);
146         } else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) {
147                 RETURN(-EBADF);
148         }
149
150         if (lmm != NULL)
151                 /* XXX zero *lsmp? */
152                 ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi);
153
154         if (imp != NULL &&
155             (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
156                 (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
157         else
158                 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
159
160         RETURN(lsm_size);
161 }
162
163 static inline void osc_pack_capa(struct ptlrpc_request *req,
164                                  struct ost_body *body, void *capa)
165 {
166         struct obd_capa *oc = (struct obd_capa *)capa;
167         struct lustre_capa *c;
168
169         if (!capa)
170                 return;
171
172         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
173         LASSERT(c);
174         capa_cpy(c, oc);
175         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
176         DEBUG_CAPA(D_SEC, c, "pack");
177 }
178
179 static inline void osc_pack_req_body(struct ptlrpc_request *req,
180                                      struct obd_info *oinfo)
181 {
182         struct ost_body *body;
183
184         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
185         LASSERT(body);
186
187         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
188         osc_pack_capa(req, body, oinfo->oi_capa);
189 }
190
191 static inline void osc_set_capa_size(struct ptlrpc_request *req,
192                                      const struct req_msg_field *field,
193                                      struct obd_capa *oc)
194 {
195         if (oc == NULL)
196                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
197         else
198                 /* it is already calculated as sizeof struct obd_capa */
199                 ;
200 }
201
202 static int osc_getattr_interpret(const struct lu_env *env,
203                                  struct ptlrpc_request *req,
204                                  struct osc_async_args *aa, int rc)
205 {
206         struct ost_body *body;
207         ENTRY;
208
209         if (rc != 0)
210                 GOTO(out, rc);
211
212         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
213         if (body) {
214                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
215                 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
216
217                 /* This should really be sent by the OST */
218                 aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
219                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
220         } else {
221                 CDEBUG(D_INFO, "can't unpack ost_body\n");
222                 rc = -EPROTO;
223                 aa->aa_oi->oi_oa->o_valid = 0;
224         }
225 out:
226         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
227         RETURN(rc);
228 }
229
230 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
231                              struct ptlrpc_request_set *set)
232 {
233         struct ptlrpc_request *req;
234         struct osc_async_args *aa;
235         int                    rc;
236         ENTRY;
237
238         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
239         if (req == NULL)
240                 RETURN(-ENOMEM);
241
242         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
243         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
244         if (rc) {
245                 ptlrpc_request_free(req);
246                 RETURN(rc);
247         }
248
249         osc_pack_req_body(req, oinfo);
250
251         ptlrpc_request_set_replen(req);
252         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
253
254         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
255         aa = ptlrpc_req_async_args(req);
256         aa->aa_oi = oinfo;
257
258         ptlrpc_set_add_req(set, req);
259         RETURN(0);
260 }
261
262 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
263                        struct obd_info *oinfo)
264 {
265         struct ptlrpc_request *req;
266         struct ost_body       *body;
267         int                    rc;
268         ENTRY;
269
270         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
271         if (req == NULL)
272                 RETURN(-ENOMEM);
273
274         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
275         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
276         if (rc) {
277                 ptlrpc_request_free(req);
278                 RETURN(rc);
279         }
280
281         osc_pack_req_body(req, oinfo);
282
283         ptlrpc_request_set_replen(req);
284
285         rc = ptlrpc_queue_wait(req);
286         if (rc)
287                 GOTO(out, rc);
288
289         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
290         if (body == NULL)
291                 GOTO(out, rc = -EPROTO);
292
293         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
294         lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
295
296         oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
297         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
298
299         EXIT;
300  out:
301         ptlrpc_req_finished(req);
302         return rc;
303 }
304
305 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
306                        struct obd_info *oinfo, struct obd_trans_info *oti)
307 {
308         struct ptlrpc_request *req;
309         struct ost_body       *body;
310         int                    rc;
311         ENTRY;
312
313         LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
314
315         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
316         if (req == NULL)
317                 RETURN(-ENOMEM);
318
319         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
320         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
321         if (rc) {
322                 ptlrpc_request_free(req);
323                 RETURN(rc);
324         }
325
326         osc_pack_req_body(req, oinfo);
327
328         ptlrpc_request_set_replen(req);
329
330         rc = ptlrpc_queue_wait(req);
331         if (rc)
332                 GOTO(out, rc);
333
334         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
335         if (body == NULL)
336                 GOTO(out, rc = -EPROTO);
337
338         lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
339
340         EXIT;
341 out:
342         ptlrpc_req_finished(req);
343         RETURN(rc);
344 }
345
346 static int osc_setattr_interpret(const struct lu_env *env,
347                                  struct ptlrpc_request *req,
348                                  struct osc_setattr_args *sa, int rc)
349 {
350         struct ost_body *body;
351         ENTRY;
352
353         if (rc != 0)
354                 GOTO(out, rc);
355
356         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
357         if (body == NULL)
358                 GOTO(out, rc = -EPROTO);
359
360         lustre_get_wire_obdo(sa->sa_oa, &body->oa);
361 out:
362         rc = sa->sa_upcall(sa->sa_cookie, rc);
363         RETURN(rc);
364 }
365
366 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
367                            struct obd_trans_info *oti,
368                            obd_enqueue_update_f upcall, void *cookie,
369                            struct ptlrpc_request_set *rqset)
370 {
371         struct ptlrpc_request   *req;
372         struct osc_setattr_args *sa;
373         int                      rc;
374         ENTRY;
375
376         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
377         if (req == NULL)
378                 RETURN(-ENOMEM);
379
380         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
381         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
382         if (rc) {
383                 ptlrpc_request_free(req);
384                 RETURN(rc);
385         }
386
387         if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
388                 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
389
390         osc_pack_req_body(req, oinfo);
391
392         ptlrpc_request_set_replen(req);
393
394         /* do mds to ost setattr asynchronously */
395         if (!rqset) {
396                 /* Do not wait for response. */
397                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
398         } else {
399                 req->rq_interpret_reply =
400                         (ptlrpc_interpterer_t)osc_setattr_interpret;
401
402                 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
403                 sa = ptlrpc_req_async_args(req);
404                 sa->sa_oa = oinfo->oi_oa;
405                 sa->sa_upcall = upcall;
406                 sa->sa_cookie = cookie;
407
408                 if (rqset == PTLRPCD_SET)
409                         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
410                 else
411                         ptlrpc_set_add_req(rqset, req);
412         }
413
414         RETURN(0);
415 }
416
417 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
418                              struct obd_trans_info *oti,
419                              struct ptlrpc_request_set *rqset)
420 {
421         return osc_setattr_async_base(exp, oinfo, oti,
422                                       oinfo->oi_cb_up, oinfo, rqset);
423 }
424
425 int osc_real_create(struct obd_export *exp, struct obdo *oa,
426                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
427 {
428         struct ptlrpc_request *req;
429         struct ost_body       *body;
430         struct lov_stripe_md  *lsm;
431         int                    rc;
432         ENTRY;
433
434         LASSERT(oa);
435         LASSERT(ea);
436
437         lsm = *ea;
438         if (!lsm) {
439                 rc = obd_alloc_memmd(exp, &lsm);
440                 if (rc < 0)
441                         RETURN(rc);
442         }
443
444         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
445         if (req == NULL)
446                 GOTO(out, rc = -ENOMEM);
447
448         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
449         if (rc) {
450                 ptlrpc_request_free(req);
451                 GOTO(out, rc);
452         }
453
454         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
455         LASSERT(body);
456         lustre_set_wire_obdo(&body->oa, oa);
457
458         ptlrpc_request_set_replen(req);
459
460         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
461             oa->o_flags == OBD_FL_DELORPHAN) {
462                 DEBUG_REQ(D_HA, req,
463                           "delorphan from OST integration");
464                 /* Don't resend the delorphan req */
465                 req->rq_no_resend = req->rq_no_delay = 1;
466         }
467
468         rc = ptlrpc_queue_wait(req);
469         if (rc)
470                 GOTO(out_req, rc);
471
472         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
473         if (body == NULL)
474                 GOTO(out_req, rc = -EPROTO);
475
476         lustre_get_wire_obdo(oa, &body->oa);
477
478         oa->o_blksize = cli_brw_size(exp->exp_obd);
479         oa->o_valid |= OBD_MD_FLBLKSZ;
480
481         /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
482          * have valid lsm_oinfo data structs, so don't go touching that.
483          * This needs to be fixed in a big way.
484          */
485         lsm->lsm_oi = oa->o_oi;
486         *ea = lsm;
487
488         if (oti != NULL) {
489                 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
490
491                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
492                         if (!oti->oti_logcookies)
493                                 oti_alloc_cookies(oti, 1);
494                         *oti->oti_logcookies = oa->o_lcookie;
495                 }
496         }
497
498         CDEBUG(D_HA, "transno: "LPD64"\n",
499                lustre_msg_get_transno(req->rq_repmsg));
500 out_req:
501         ptlrpc_req_finished(req);
502 out:
503         if (rc && !*ea)
504                 obd_free_memmd(exp, &lsm);
505         RETURN(rc);
506 }
507
508 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
509                    obd_enqueue_update_f upcall, void *cookie,
510                    struct ptlrpc_request_set *rqset)
511 {
512         struct ptlrpc_request   *req;
513         struct osc_setattr_args *sa;
514         struct ost_body         *body;
515         int                      rc;
516         ENTRY;
517
518         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
519         if (req == NULL)
520                 RETURN(-ENOMEM);
521
522         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
523         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
524         if (rc) {
525                 ptlrpc_request_free(req);
526                 RETURN(rc);
527         }
528         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
529         ptlrpc_at_set_req_timeout(req);
530
531         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
532         LASSERT(body);
533         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
534         osc_pack_capa(req, body, oinfo->oi_capa);
535
536         ptlrpc_request_set_replen(req);
537
538         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
539         CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
540         sa = ptlrpc_req_async_args(req);
541         sa->sa_oa     = oinfo->oi_oa;
542         sa->sa_upcall = upcall;
543         sa->sa_cookie = cookie;
544         if (rqset == PTLRPCD_SET)
545                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
546         else
547                 ptlrpc_set_add_req(rqset, req);
548
549         RETURN(0);
550 }
551
552 static int osc_punch(const struct lu_env *env, struct obd_export *exp,
553                      struct obd_info *oinfo, struct obd_trans_info *oti,
554                      struct ptlrpc_request_set *rqset)
555 {
556         oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
557         oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
558         oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
559         return osc_punch_base(exp, oinfo,
560                               oinfo->oi_cb_up, oinfo, rqset);
561 }
562
563 static int osc_sync_interpret(const struct lu_env *env,
564                               struct ptlrpc_request *req,
565                               void *arg, int rc)
566 {
567         struct osc_fsync_args *fa = arg;
568         struct ost_body *body;
569         ENTRY;
570
571         if (rc)
572                 GOTO(out, rc);
573
574         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
575         if (body == NULL) {
576                 CERROR ("can't unpack ost_body\n");
577                 GOTO(out, rc = -EPROTO);
578         }
579
580         *fa->fa_oi->oi_oa = body->oa;
581 out:
582         rc = fa->fa_upcall(fa->fa_cookie, rc);
583         RETURN(rc);
584 }
585
586 int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
587                   obd_enqueue_update_f upcall, void *cookie,
588                   struct ptlrpc_request_set *rqset)
589 {
590         struct ptlrpc_request *req;
591         struct ost_body       *body;
592         struct osc_fsync_args *fa;
593         int                    rc;
594         ENTRY;
595
596         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
597         if (req == NULL)
598                 RETURN(-ENOMEM);
599
600         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
601         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
602         if (rc) {
603                 ptlrpc_request_free(req);
604                 RETURN(rc);
605         }
606
607         /* overload the size and blocks fields in the oa with start/end */
608         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
609         LASSERT(body);
610         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
611         osc_pack_capa(req, body, oinfo->oi_capa);
612
613         ptlrpc_request_set_replen(req);
614         req->rq_interpret_reply = osc_sync_interpret;
615
616         CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
617         fa = ptlrpc_req_async_args(req);
618         fa->fa_oi = oinfo;
619         fa->fa_upcall = upcall;
620         fa->fa_cookie = cookie;
621
622         if (rqset == PTLRPCD_SET)
623                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
624         else
625                 ptlrpc_set_add_req(rqset, req);
626
627         RETURN (0);
628 }
629
630 static int osc_sync(const struct lu_env *env, struct obd_export *exp,
631                     struct obd_info *oinfo, obd_size start, obd_size end,
632                     struct ptlrpc_request_set *set)
633 {
634         ENTRY;
635
636         if (!oinfo->oi_oa) {
637                 CDEBUG(D_INFO, "oa NULL\n");
638                 RETURN(-EINVAL);
639         }
640
641         oinfo->oi_oa->o_size = start;
642         oinfo->oi_oa->o_blocks = end;
643         oinfo->oi_oa->o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
644
645         RETURN(osc_sync_base(exp, oinfo, oinfo->oi_cb_up, oinfo, set));
646 }
647
648 /* Find and cancel locally locks matched by @mode in the resource found by
649  * @objid. Found locks are added into @cancel list. Returns the amount of
650  * locks added to @cancels list. */
651 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
652                                    cfs_list_t *cancels,
653                                    ldlm_mode_t mode, int lock_flags)
654 {
655         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
656         struct ldlm_res_id res_id;
657         struct ldlm_resource *res;
658         int count;
659         ENTRY;
660
661         /* Return, i.e. cancel nothing, only if ELC is supported (flag in
662          * export) but disabled through procfs (flag in NS).
663          *
664          * This distinguishes from a case when ELC is not supported originally,
665          * when we still want to cancel locks in advance and just cancel them
666          * locally, without sending any RPC. */
667         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
668                 RETURN(0);
669
670         ostid_build_res_name(&oa->o_oi, &res_id);
671         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
672         if (res == NULL)
673                 RETURN(0);
674
675         LDLM_RESOURCE_ADDREF(res);
676         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
677                                            lock_flags, 0, NULL);
678         LDLM_RESOURCE_DELREF(res);
679         ldlm_resource_putref(res);
680         RETURN(count);
681 }
682
683 static int osc_destroy_interpret(const struct lu_env *env,
684                                  struct ptlrpc_request *req, void *data,
685                                  int rc)
686 {
687         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
688
689         cfs_atomic_dec(&cli->cl_destroy_in_flight);
690         cfs_waitq_signal(&cli->cl_destroy_waitq);
691         return 0;
692 }
693
694 static int osc_can_send_destroy(struct client_obd *cli)
695 {
696         if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <=
697             cli->cl_max_rpcs_in_flight) {
698                 /* The destroy request can be sent */
699                 return 1;
700         }
701         if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) <
702             cli->cl_max_rpcs_in_flight) {
703                 /*
704                  * The counter has been modified between the two atomic
705                  * operations.
706                  */
707                 cfs_waitq_signal(&cli->cl_destroy_waitq);
708         }
709         return 0;
710 }
711
712 int osc_create(const struct lu_env *env, struct obd_export *exp,
713                struct obdo *oa, struct lov_stripe_md **ea,
714                struct obd_trans_info *oti)
715 {
716         int rc = 0;
717         ENTRY;
718
719         LASSERT(oa);
720         LASSERT(ea);
721         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
722
723         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
724             oa->o_flags == OBD_FL_RECREATE_OBJS) {
725                 RETURN(osc_real_create(exp, oa, ea, oti));
726         }
727
728         if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi)))
729                 RETURN(osc_real_create(exp, oa, ea, oti));
730
731         /* we should not get here anymore */
732         LBUG();
733
734         RETURN(rc);
735 }
736
737 /* Destroy requests can be async always on the client, and we don't even really
738  * care about the return code since the client cannot do anything at all about
739  * a destroy failure.
740  * When the MDS is unlinking a filename, it saves the file objects into a
741  * recovery llog, and these object records are cancelled when the OST reports
742  * they were destroyed and sync'd to disk (i.e. transaction committed).
743  * If the client dies, or the OST is down when the object should be destroyed,
744  * the records are not cancelled, and when the OST reconnects to the MDS next,
745  * it will retrieve the llog unlink logs and then sends the log cancellation
746  * cookies to the MDS after committing destroy transactions. */
747 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
748                        struct obdo *oa, struct lov_stripe_md *ea,
749                        struct obd_trans_info *oti, struct obd_export *md_export,
750                        void *capa)
751 {
752         struct client_obd     *cli = &exp->exp_obd->u.cli;
753         struct ptlrpc_request *req;
754         struct ost_body       *body;
755         CFS_LIST_HEAD(cancels);
756         int rc, count;
757         ENTRY;
758
759         if (!oa) {
760                 CDEBUG(D_INFO, "oa NULL\n");
761                 RETURN(-EINVAL);
762         }
763
764         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
765                                         LDLM_FL_DISCARD_DATA);
766
767         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
768         if (req == NULL) {
769                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
770                 RETURN(-ENOMEM);
771         }
772
773         osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
774         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
775                                0, &cancels, count);
776         if (rc) {
777                 ptlrpc_request_free(req);
778                 RETURN(rc);
779         }
780
781         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
782         ptlrpc_at_set_req_timeout(req);
783
784         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
785                 oa->o_lcookie = *oti->oti_logcookies;
786         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
787         LASSERT(body);
788         lustre_set_wire_obdo(&body->oa, oa);
789
790         osc_pack_capa(req, body, (struct obd_capa *)capa);
791         ptlrpc_request_set_replen(req);
792
793         /* If osc_destory is for destroying the unlink orphan,
794          * sent from MDT to OST, which should not be blocked here,
795          * because the process might be triggered by ptlrpcd, and
796          * it is not good to block ptlrpcd thread (b=16006)*/
797         if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
798                 req->rq_interpret_reply = osc_destroy_interpret;
799                 if (!osc_can_send_destroy(cli)) {
800                         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
801                                                           NULL);
802
803                         /*
804                          * Wait until the number of on-going destroy RPCs drops
805                          * under max_rpc_in_flight
806                          */
807                         l_wait_event_exclusive(cli->cl_destroy_waitq,
808                                                osc_can_send_destroy(cli), &lwi);
809                 }
810         }
811
812         /* Do not wait for response */
813         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
814         RETURN(0);
815 }
816
817 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
818                                 long writing_bytes)
819 {
820         obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
821
822         LASSERT(!(oa->o_valid & bits));
823
824         oa->o_valid |= bits;
825         client_obd_list_lock(&cli->cl_loi_list_lock);
826         oa->o_dirty = cli->cl_dirty;
827         if (unlikely(cli->cl_dirty - cli->cl_dirty_transit >
828                      cli->cl_dirty_max)) {
829                 CERROR("dirty %lu - %lu > dirty_max %lu\n",
830                        cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
831                 oa->o_undirty = 0;
832         } else if (unlikely(cfs_atomic_read(&obd_dirty_pages) -
833                             cfs_atomic_read(&obd_dirty_transit_pages) >
834                             (long)(obd_max_dirty_pages + 1))) {
835                 /* The cfs_atomic_read() allowing the cfs_atomic_inc() are
836                  * not covered by a lock thus they may safely race and trip
837                  * this CERROR() unless we add in a small fudge factor (+1). */
838                 CERROR("dirty %d - %d > system dirty_max %d\n",
839                        cfs_atomic_read(&obd_dirty_pages),
840                        cfs_atomic_read(&obd_dirty_transit_pages),
841                        obd_max_dirty_pages);
842                 oa->o_undirty = 0;
843         } else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) {
844                 CERROR("dirty %lu - dirty_max %lu too big???\n",
845                        cli->cl_dirty, cli->cl_dirty_max);
846                 oa->o_undirty = 0;
847         } else {
848                 long max_in_flight = (cli->cl_max_pages_per_rpc <<
849                                       CFS_PAGE_SHIFT)*
850                                      (cli->cl_max_rpcs_in_flight + 1);
851                 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
852         }
853         oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
854         oa->o_dropped = cli->cl_lost_grant;
855         cli->cl_lost_grant = 0;
856         client_obd_list_unlock(&cli->cl_loi_list_lock);
857         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
858                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
859
860 }
861
862 void osc_update_next_shrink(struct client_obd *cli)
863 {
864         cli->cl_next_shrink_grant =
865                 cfs_time_shift(cli->cl_grant_shrink_interval);
866         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
867                cli->cl_next_shrink_grant);
868 }
869
870 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
871 {
872         client_obd_list_lock(&cli->cl_loi_list_lock);
873         cli->cl_avail_grant += grant;
874         client_obd_list_unlock(&cli->cl_loi_list_lock);
875 }
876
877 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
878 {
879         if (body->oa.o_valid & OBD_MD_FLGRANT) {
880                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
881                 __osc_update_grant(cli, body->oa.o_grant);
882         }
883 }
884
885 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
886                               obd_count keylen, void *key, obd_count vallen,
887                               void *val, struct ptlrpc_request_set *set);
888
889 static int osc_shrink_grant_interpret(const struct lu_env *env,
890                                       struct ptlrpc_request *req,
891                                       void *aa, int rc)
892 {
893         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
894         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
895         struct ost_body *body;
896
897         if (rc != 0) {
898                 __osc_update_grant(cli, oa->o_grant);
899                 GOTO(out, rc);
900         }
901
902         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
903         LASSERT(body);
904         osc_update_grant(cli, body);
905 out:
906         OBDO_FREE(oa);
907         return rc;
908 }
909
910 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
911 {
912         client_obd_list_lock(&cli->cl_loi_list_lock);
913         oa->o_grant = cli->cl_avail_grant / 4;
914         cli->cl_avail_grant -= oa->o_grant;
915         client_obd_list_unlock(&cli->cl_loi_list_lock);
916         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
917                 oa->o_valid |= OBD_MD_FLFLAGS;
918                 oa->o_flags = 0;
919         }
920         oa->o_flags |= OBD_FL_SHRINK_GRANT;
921         osc_update_next_shrink(cli);
922 }
923
924 /* Shrink the current grant, either from some large amount to enough for a
925  * full set of in-flight RPCs, or if we have already shrunk to that limit
926  * then to enough for a single RPC.  This avoids keeping more grant than
927  * needed, and avoids shrinking the grant piecemeal. */
928 static int osc_shrink_grant(struct client_obd *cli)
929 {
930         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
931                              (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT);
932
933         client_obd_list_lock(&cli->cl_loi_list_lock);
934         if (cli->cl_avail_grant <= target_bytes)
935                 target_bytes = cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
936         client_obd_list_unlock(&cli->cl_loi_list_lock);
937
938         return osc_shrink_grant_to_target(cli, target_bytes);
939 }
940
941 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
942 {
943         int                     rc = 0;
944         struct ost_body        *body;
945         ENTRY;
946
947         client_obd_list_lock(&cli->cl_loi_list_lock);
948         /* Don't shrink if we are already above or below the desired limit
949          * We don't want to shrink below a single RPC, as that will negatively
950          * impact block allocation and long-term performance. */
951         if (target_bytes < cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)
952                 target_bytes = cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
953
954         if (target_bytes >= cli->cl_avail_grant) {
955                 client_obd_list_unlock(&cli->cl_loi_list_lock);
956                 RETURN(0);
957         }
958         client_obd_list_unlock(&cli->cl_loi_list_lock);
959
960         OBD_ALLOC_PTR(body);
961         if (!body)
962                 RETURN(-ENOMEM);
963
964         osc_announce_cached(cli, &body->oa, 0);
965
966         client_obd_list_lock(&cli->cl_loi_list_lock);
967         body->oa.o_grant = cli->cl_avail_grant - target_bytes;
968         cli->cl_avail_grant = target_bytes;
969         client_obd_list_unlock(&cli->cl_loi_list_lock);
970         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
971                 body->oa.o_valid |= OBD_MD_FLFLAGS;
972                 body->oa.o_flags = 0;
973         }
974         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
975         osc_update_next_shrink(cli);
976
977         rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
978                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
979                                 sizeof(*body), body, NULL);
980         if (rc != 0)
981                 __osc_update_grant(cli, body->oa.o_grant);
982         OBD_FREE_PTR(body);
983         RETURN(rc);
984 }
985
986 static int osc_should_shrink_grant(struct client_obd *client)
987 {
988         cfs_time_t time = cfs_time_current();
989         cfs_time_t next_shrink = client->cl_next_shrink_grant;
990
991         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
992              OBD_CONNECT_GRANT_SHRINK) == 0)
993                 return 0;
994
995         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
996                 /* Get the current RPC size directly, instead of going via:
997                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
998                  * Keep comment here so that it can be found by searching. */
999                 int brw_size = client->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
1000
1001                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
1002                     client->cl_avail_grant > brw_size)
1003                         return 1;
1004                 else
1005                         osc_update_next_shrink(client);
1006         }
1007         return 0;
1008 }
1009
1010 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1011 {
1012         struct client_obd *client;
1013
1014         cfs_list_for_each_entry(client, &item->ti_obd_list,
1015                                 cl_grant_shrink_list) {
1016                 if (osc_should_shrink_grant(client))
1017                         osc_shrink_grant(client);
1018         }
1019         return 0;
1020 }
1021
1022 static int osc_add_shrink_grant(struct client_obd *client)
1023 {
1024         int rc;
1025
1026         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1027                                        TIMEOUT_GRANT,
1028                                        osc_grant_shrink_grant_cb, NULL,
1029                                        &client->cl_grant_shrink_list);
1030         if (rc) {
1031                 CERROR("add grant client %s error %d\n",
1032                         client->cl_import->imp_obd->obd_name, rc);
1033                 return rc;
1034         }
1035         CDEBUG(D_CACHE, "add grant client %s \n",
1036                client->cl_import->imp_obd->obd_name);
1037         osc_update_next_shrink(client);
1038         return 0;
1039 }
1040
1041 static int osc_del_shrink_grant(struct client_obd *client)
1042 {
1043         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1044                                          TIMEOUT_GRANT);
1045 }
1046
1047 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1048 {
1049         /*
1050          * ocd_grant is the total grant amount we're expect to hold: if we've
1051          * been evicted, it's the new avail_grant amount, cl_dirty will drop
1052          * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1053          *
1054          * race is tolerable here: if we're evicted, but imp_state already
1055          * left EVICTED state, then cl_dirty must be 0 already.
1056          */
1057         client_obd_list_lock(&cli->cl_loi_list_lock);
1058         if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1059                 cli->cl_avail_grant = ocd->ocd_grant;
1060         else
1061                 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1062
1063         if (cli->cl_avail_grant < 0) {
1064                 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
1065                       cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
1066                       ocd->ocd_grant, cli->cl_dirty);
1067                 /* workaround for servers which do not have the patch from
1068                  * LU-2679 */
1069                 cli->cl_avail_grant = ocd->ocd_grant;
1070         }
1071
1072         /* determine the appropriate chunk size used by osc_extent. */
1073         cli->cl_chunkbits = max_t(int, CFS_PAGE_SHIFT, ocd->ocd_blocksize);
1074         client_obd_list_unlock(&cli->cl_loi_list_lock);
1075
1076         CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
1077                 "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
1078                 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
1079
1080         if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1081             cfs_list_empty(&cli->cl_grant_shrink_list))
1082                 osc_add_shrink_grant(cli);
1083 }
1084
1085 /* We assume that the reason this OSC got a short read is because it read
1086  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1087  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1088  * this stripe never got written at or beyond this stripe offset yet. */
1089 static void handle_short_read(int nob_read, obd_count page_count,
1090                               struct brw_page **pga)
1091 {
1092         char *ptr;
1093         int i = 0;
1094
1095         /* skip bytes read OK */
1096         while (nob_read > 0) {
1097                 LASSERT (page_count > 0);
1098
1099                 if (pga[i]->count > nob_read) {
1100                         /* EOF inside this page */
1101                         ptr = cfs_kmap(pga[i]->pg) +
1102                                 (pga[i]->off & ~CFS_PAGE_MASK);
1103                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1104                         cfs_kunmap(pga[i]->pg);
1105                         page_count--;
1106                         i++;
1107                         break;
1108                 }
1109
1110                 nob_read -= pga[i]->count;
1111                 page_count--;
1112                 i++;
1113         }
1114
1115         /* zero remaining pages */
1116         while (page_count-- > 0) {
1117                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1118                 memset(ptr, 0, pga[i]->count);
1119                 cfs_kunmap(pga[i]->pg);
1120                 i++;
1121         }
1122 }
1123
1124 static int check_write_rcs(struct ptlrpc_request *req,
1125                            int requested_nob, int niocount,
1126                            obd_count page_count, struct brw_page **pga)
1127 {
1128         int     i;
1129         __u32   *remote_rcs;
1130
1131         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1132                                                   sizeof(*remote_rcs) *
1133                                                   niocount);
1134         if (remote_rcs == NULL) {
1135                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1136                 return(-EPROTO);
1137         }
1138
1139         /* return error if any niobuf was in error */
1140         for (i = 0; i < niocount; i++) {
1141                 if ((int)remote_rcs[i] < 0)
1142                         return(remote_rcs[i]);
1143
1144                 if (remote_rcs[i] != 0) {
1145                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1146                                 i, remote_rcs[i], req);
1147                         return(-EPROTO);
1148                 }
1149         }
1150
1151         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1152                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1153                        req->rq_bulk->bd_nob_transferred, requested_nob);
1154                 return(-EPROTO);
1155         }
1156
1157         return (0);
1158 }
1159
1160 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1161 {
1162         if (p1->flag != p2->flag) {
1163                 unsigned mask = ~(OBD_BRW_FROM_GRANT| OBD_BRW_NOCACHE|
1164                                   OBD_BRW_SYNC|OBD_BRW_ASYNC|OBD_BRW_NOQUOTA);
1165
1166                 /* warn if we try to combine flags that we don't know to be
1167                  * safe to combine */
1168                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1169                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1170                               "report this at http://bugs.whamcloud.com/\n",
1171                               p1->flag, p2->flag);
1172                 }
1173                 return 0;
1174         }
1175
1176         return (p1->off + p1->count == p2->off);
1177 }
1178
1179 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1180                                    struct brw_page **pga, int opc,
1181                                    cksum_type_t cksum_type)
1182 {
1183         __u32                           cksum;
1184         int                             i = 0;
1185         struct cfs_crypto_hash_desc     *hdesc;
1186         unsigned int                    bufsize;
1187         int                             err;
1188         unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
1189
1190         LASSERT(pg_count > 0);
1191
1192         hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1193         if (IS_ERR(hdesc)) {
1194                 CERROR("Unable to initialize checksum hash %s\n",
1195                        cfs_crypto_hash_name(cfs_alg));
1196                 return PTR_ERR(hdesc);
1197         }
1198
1199         while (nob > 0 && pg_count > 0) {
1200                 int count = pga[i]->count > nob ? nob : pga[i]->count;
1201
1202                 /* corrupt the data before we compute the checksum, to
1203                  * simulate an OST->client data error */
1204                 if (i == 0 && opc == OST_READ &&
1205                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1206                         unsigned char *ptr = cfs_kmap(pga[i]->pg);
1207                         int off = pga[i]->off & ~CFS_PAGE_MASK;
1208                         memcpy(ptr + off, "bad1", min(4, nob));
1209                         cfs_kunmap(pga[i]->pg);
1210                 }
1211                 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1212                                   pga[i]->off & ~CFS_PAGE_MASK,
1213                                   count);
1214                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1215                                (int)(pga[i]->off & ~CFS_PAGE_MASK), cksum);
1216
1217                 nob -= pga[i]->count;
1218                 pg_count--;
1219                 i++;
1220         }
1221
1222         bufsize = 4;
1223         err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1224
1225         if (err)
1226                 cfs_crypto_hash_final(hdesc, NULL, NULL);
1227
1228         /* For sending we only compute the wrong checksum instead
1229          * of corrupting the data so it is still correct on a redo */
1230         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1231                 cksum++;
1232
1233         return cksum;
1234 }
1235
1236 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1237                                 struct lov_stripe_md *lsm, obd_count page_count,
1238                                 struct brw_page **pga,
1239                                 struct ptlrpc_request **reqp,
1240                                 struct obd_capa *ocapa, int reserve,
1241                                 int resend)
1242 {
1243         struct ptlrpc_request   *req;
1244         struct ptlrpc_bulk_desc *desc;
1245         struct ost_body         *body;
1246         struct obd_ioobj        *ioobj;
1247         struct niobuf_remote    *niobuf;
1248         int niocount, i, requested_nob, opc, rc;
1249         struct osc_brw_async_args *aa;
1250         struct req_capsule      *pill;
1251         struct brw_page *pg_prev;
1252
1253         ENTRY;
1254         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1255                 RETURN(-ENOMEM); /* Recoverable */
1256         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1257                 RETURN(-EINVAL); /* Fatal */
1258
1259         if ((cmd & OBD_BRW_WRITE) != 0) {
1260                 opc = OST_WRITE;
1261                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1262                                                 cli->cl_import->imp_rq_pool,
1263                                                 &RQF_OST_BRW_WRITE);
1264         } else {
1265                 opc = OST_READ;
1266                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1267         }
1268         if (req == NULL)
1269                 RETURN(-ENOMEM);
1270
1271         for (niocount = i = 1; i < page_count; i++) {
1272                 if (!can_merge_pages(pga[i - 1], pga[i]))
1273                         niocount++;
1274         }
1275
1276         pill = &req->rq_pill;
1277         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1278                              sizeof(*ioobj));
1279         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1280                              niocount * sizeof(*niobuf));
1281         osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1282
1283         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1284         if (rc) {
1285                 ptlrpc_request_free(req);
1286                 RETURN(rc);
1287         }
1288         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1289         ptlrpc_at_set_req_timeout(req);
1290         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1291          * retry logic */
1292         req->rq_no_retry_einprogress = 1;
1293
1294         desc = ptlrpc_prep_bulk_imp(req, page_count,
1295                 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1296                 opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
1297                 OST_BULK_PORTAL);
1298
1299         if (desc == NULL)
1300                 GOTO(out, rc = -ENOMEM);
1301         /* NB request now owns desc and will free it when it gets freed */
1302
1303         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1304         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1305         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1306         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1307
1308         lustre_set_wire_obdo(&body->oa, oa);
1309
1310         obdo_to_ioobj(oa, ioobj);
1311         ioobj->ioo_bufcnt = niocount;
1312         /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1313          * that might be send for this request.  The actual number is decided
1314          * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1315          * "max - 1" for old client compatibility sending "0", and also so the
1316          * the actual maximum is a power-of-two number, not one less. LU-1431 */
1317         ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1318         osc_pack_capa(req, body, ocapa);
1319         LASSERT(page_count > 0);
1320         pg_prev = pga[0];
1321         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1322                 struct brw_page *pg = pga[i];
1323                 int poff = pg->off & ~CFS_PAGE_MASK;
1324
1325                 LASSERT(pg->count > 0);
1326                 /* make sure there is no gap in the middle of page array */
1327                 LASSERTF(page_count == 1 ||
1328                          (ergo(i == 0, poff + pg->count == CFS_PAGE_SIZE) &&
1329                           ergo(i > 0 && i < page_count - 1,
1330                                poff == 0 && pg->count == CFS_PAGE_SIZE)   &&
1331                           ergo(i == page_count - 1, poff == 0)),
1332                          "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1333                          i, page_count, pg, pg->off, pg->count);
1334 #ifdef __linux__
1335                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1336                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1337                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1338                          i, page_count,
1339                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1340                          pg_prev->pg, page_private(pg_prev->pg),
1341                          pg_prev->pg->index, pg_prev->off);
1342 #else
1343                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1344                          "i %d p_c %u\n", i, page_count);
1345 #endif
1346                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1347                         (pg->flag & OBD_BRW_SRVLOCK));
1348
1349                 ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
1350                 requested_nob += pg->count;
1351
1352                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1353                         niobuf--;
1354                         niobuf->len += pg->count;
1355                 } else {
1356                         niobuf->offset = pg->off;
1357                         niobuf->len    = pg->count;
1358                         niobuf->flags  = pg->flag;
1359                 }
1360                 pg_prev = pg;
1361         }
1362
1363         LASSERTF((void *)(niobuf - niocount) ==
1364                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1365                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1366                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1367
1368         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1369         if (resend) {
1370                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1371                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1372                         body->oa.o_flags = 0;
1373                 }
1374                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1375         }
1376
1377         if (osc_should_shrink_grant(cli))
1378                 osc_shrink_grant_local(cli, &body->oa);
1379
1380         /* size[REQ_REC_OFF] still sizeof (*body) */
1381         if (opc == OST_WRITE) {
1382                 if (cli->cl_checksum &&
1383                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1384                         /* store cl_cksum_type in a local variable since
1385                          * it can be changed via lprocfs */
1386                         cksum_type_t cksum_type = cli->cl_cksum_type;
1387
1388                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1389                                 oa->o_flags &= OBD_FL_LOCAL_MASK;
1390                                 body->oa.o_flags = 0;
1391                         }
1392                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1393                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1394                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1395                                                              page_count, pga,
1396                                                              OST_WRITE,
1397                                                              cksum_type);
1398                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1399                                body->oa.o_cksum);
1400                         /* save this in 'oa', too, for later checking */
1401                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1402                         oa->o_flags |= cksum_type_pack(cksum_type);
1403                 } else {
1404                         /* clear out the checksum flag, in case this is a
1405                          * resend but cl_checksum is no longer set. b=11238 */
1406                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1407                 }
1408                 oa->o_cksum = body->oa.o_cksum;
1409                 /* 1 RC per niobuf */
1410                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1411                                      sizeof(__u32) * niocount);
1412         } else {
1413                 if (cli->cl_checksum &&
1414                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1415                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1416                                 body->oa.o_flags = 0;
1417                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1418                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1419                 }
1420         }
1421         ptlrpc_request_set_replen(req);
1422
1423         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1424         aa = ptlrpc_req_async_args(req);
1425         aa->aa_oa = oa;
1426         aa->aa_requested_nob = requested_nob;
1427         aa->aa_nio_count = niocount;
1428         aa->aa_page_count = page_count;
1429         aa->aa_resends = 0;
1430         aa->aa_ppga = pga;
1431         aa->aa_cli = cli;
1432         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1433         if (ocapa && reserve)
1434                 aa->aa_ocapa = capa_get(ocapa);
1435
1436         *reqp = req;
1437         RETURN(0);
1438
1439  out:
1440         ptlrpc_req_finished(req);
1441         RETURN(rc);
1442 }
1443
1444 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1445                                 __u32 client_cksum, __u32 server_cksum, int nob,
1446                                 obd_count page_count, struct brw_page **pga,
1447                                 cksum_type_t client_cksum_type)
1448 {
1449         __u32 new_cksum;
1450         char *msg;
1451         cksum_type_t cksum_type;
1452
1453         if (server_cksum == client_cksum) {
1454                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1455                 return 0;
1456         }
1457
1458         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1459                                        oa->o_flags : 0);
1460         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1461                                       cksum_type);
1462
1463         if (cksum_type != client_cksum_type)
1464                 msg = "the server did not use the checksum type specified in "
1465                       "the original request - likely a protocol problem";
1466         else if (new_cksum == server_cksum)
1467                 msg = "changed on the client after we checksummed it - "
1468                       "likely false positive due to mmap IO (bug 11742)";
1469         else if (new_cksum == client_cksum)
1470                 msg = "changed in transit before arrival at OST";
1471         else
1472                 msg = "changed in transit AND doesn't match the original - "
1473                       "likely false positive due to mmap IO (bug 11742)";
1474
1475         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1476                            " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
1477                            msg, libcfs_nid2str(peer->nid),
1478                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1479                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1480                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1481                            POSTID(&oa->o_oi), pga[0]->off,
1482                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1483         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1484                "client csum now %x\n", client_cksum, client_cksum_type,
1485                server_cksum, cksum_type, new_cksum);
1486         return 1;
1487 }
1488
1489 /* Note rc enters this function as number of bytes transferred */
1490 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1491 {
1492         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1493         const lnet_process_id_t *peer =
1494                         &req->rq_import->imp_connection->c_peer;
1495         struct client_obd *cli = aa->aa_cli;
1496         struct ost_body *body;
1497         __u32 client_cksum = 0;
1498         ENTRY;
1499
1500         if (rc < 0 && rc != -EDQUOT) {
1501                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1502                 RETURN(rc);
1503         }
1504
1505         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1506         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1507         if (body == NULL) {
1508                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1509                 RETURN(-EPROTO);
1510         }
1511
1512         /* set/clear over quota flag for a uid/gid */
1513         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1514             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1515                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1516
1517                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1518                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1519                        body->oa.o_flags);
1520                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1521         }
1522
1523         osc_update_grant(cli, body);
1524
1525         if (rc < 0)
1526                 RETURN(rc);
1527
1528         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1529                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1530
1531         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1532                 if (rc > 0) {
1533                         CERROR("Unexpected +ve rc %d\n", rc);
1534                         RETURN(-EPROTO);
1535                 }
1536                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1537
1538                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1539                         RETURN(-EAGAIN);
1540
1541                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1542                     check_write_checksum(&body->oa, peer, client_cksum,
1543                                          body->oa.o_cksum, aa->aa_requested_nob,
1544                                          aa->aa_page_count, aa->aa_ppga,
1545                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1546                         RETURN(-EAGAIN);
1547
1548                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1549                                      aa->aa_page_count, aa->aa_ppga);
1550                 GOTO(out, rc);
1551         }
1552
1553         /* The rest of this function executes only for OST_READs */
1554
1555         /* if unwrap_bulk failed, return -EAGAIN to retry */
1556         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1557         if (rc < 0)
1558                 GOTO(out, rc = -EAGAIN);
1559
1560         if (rc > aa->aa_requested_nob) {
1561                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1562                        aa->aa_requested_nob);
1563                 RETURN(-EPROTO);
1564         }
1565
1566         if (rc != req->rq_bulk->bd_nob_transferred) {
1567                 CERROR ("Unexpected rc %d (%d transferred)\n",
1568                         rc, req->rq_bulk->bd_nob_transferred);
1569                 return (-EPROTO);
1570         }
1571
1572         if (rc < aa->aa_requested_nob)
1573                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1574
1575         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1576                 static int cksum_counter;
1577                 __u32      server_cksum = body->oa.o_cksum;
1578                 char      *via;
1579                 char      *router;
1580                 cksum_type_t cksum_type;
1581
1582                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1583                                                body->oa.o_flags : 0);
1584                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1585                                                  aa->aa_ppga, OST_READ,
1586                                                  cksum_type);
1587
1588                 if (peer->nid == req->rq_bulk->bd_sender) {
1589                         via = router = "";
1590                 } else {
1591                         via = " via ";
1592                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1593                 }
1594
1595                 if (server_cksum == ~0 && rc > 0) {
1596                         CERROR("Protocol error: server %s set the 'checksum' "
1597                                "bit, but didn't send a checksum.  Not fatal, "
1598                                "but please notify on http://bugs.whamcloud.com/\n",
1599                                libcfs_nid2str(peer->nid));
1600                 } else if (server_cksum != client_cksum) {
1601                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1602                                            "%s%s%s inode "DFID" object "DOSTID
1603                                            " extent ["LPU64"-"LPU64"]\n",
1604                                            req->rq_import->imp_obd->obd_name,
1605                                            libcfs_nid2str(peer->nid),
1606                                            via, router,
1607                                            body->oa.o_valid & OBD_MD_FLFID ?
1608                                                 body->oa.o_parent_seq : (__u64)0,
1609                                            body->oa.o_valid & OBD_MD_FLFID ?
1610                                                 body->oa.o_parent_oid : 0,
1611                                            body->oa.o_valid & OBD_MD_FLFID ?
1612                                                 body->oa.o_parent_ver : 0,
1613                                            POSTID(&body->oa.o_oi),
1614                                            aa->aa_ppga[0]->off,
1615                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1616                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1617                                                                         1);
1618                         CERROR("client %x, server %x, cksum_type %x\n",
1619                                client_cksum, server_cksum, cksum_type);
1620                         cksum_counter = 0;
1621                         aa->aa_oa->o_cksum = client_cksum;
1622                         rc = -EAGAIN;
1623                 } else {
1624                         cksum_counter++;
1625                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1626                         rc = 0;
1627                 }
1628         } else if (unlikely(client_cksum)) {
1629                 static int cksum_missed;
1630
1631                 cksum_missed++;
1632                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1633                         CERROR("Checksum %u requested from %s but not sent\n",
1634                                cksum_missed, libcfs_nid2str(peer->nid));
1635         } else {
1636                 rc = 0;
1637         }
1638 out:
1639         if (rc >= 0)
1640                 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
1641
1642         RETURN(rc);
1643 }
1644
1645 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1646                             struct lov_stripe_md *lsm,
1647                             obd_count page_count, struct brw_page **pga,
1648                             struct obd_capa *ocapa)
1649 {
1650         struct ptlrpc_request *req;
1651         int                    rc;
1652         cfs_waitq_t            waitq;
1653         int                    generation, resends = 0;
1654         struct l_wait_info     lwi;
1655
1656         ENTRY;
1657
1658         cfs_waitq_init(&waitq);
1659         generation = exp->exp_obd->u.cli.cl_import->imp_generation;
1660
1661 restart_bulk:
1662         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1663                                   page_count, pga, &req, ocapa, 0, resends);
1664         if (rc != 0)
1665                 return (rc);
1666
1667         if (resends) {
1668                 req->rq_generation_set = 1;
1669                 req->rq_import_generation = generation;
1670                 req->rq_sent = cfs_time_current_sec() + resends;
1671         }
1672
1673         rc = ptlrpc_queue_wait(req);
1674
1675         if (rc == -ETIMEDOUT && req->rq_resend) {
1676                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1677                 ptlrpc_req_finished(req);
1678                 goto restart_bulk;
1679         }
1680
1681         rc = osc_brw_fini_request(req, rc);
1682
1683         ptlrpc_req_finished(req);
1684         /* When server return -EINPROGRESS, client should always retry
1685          * regardless of the number of times the bulk was resent already.*/
1686         if (osc_recoverable_error(rc)) {
1687                 resends++;
1688                 if (rc != -EINPROGRESS &&
1689                     !client_should_resend(resends, &exp->exp_obd->u.cli)) {
1690                         CERROR("%s: too many resend retries for object: "
1691                                ""DOSTID", rc = %d.\n", exp->exp_obd->obd_name,
1692                                POSTID(&oa->o_oi), rc);
1693                         goto out;
1694                 }
1695                 if (generation !=
1696                     exp->exp_obd->u.cli.cl_import->imp_generation) {
1697                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1698                                ""DOSTID", rc = %d.\n", exp->exp_obd->obd_name,
1699                                POSTID(&oa->o_oi), rc);
1700                         goto out;
1701                 }
1702
1703                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL,
1704                                        NULL);
1705                 l_wait_event(waitq, 0, &lwi);
1706
1707                 goto restart_bulk;
1708         }
1709 out:
1710         if (rc == -EAGAIN || rc == -EINPROGRESS)
1711                 rc = -EIO;
1712         RETURN (rc);
1713 }
1714
1715 static int osc_brw_redo_request(struct ptlrpc_request *request,
1716                                 struct osc_brw_async_args *aa, int rc)
1717 {
1718         struct ptlrpc_request *new_req;
1719         struct osc_brw_async_args *new_aa;
1720         struct osc_async_page *oap;
1721         ENTRY;
1722
1723         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1724                   "redo for recoverable error %d", rc);
1725
1726         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1727                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1728                                   aa->aa_cli, aa->aa_oa,
1729                                   NULL /* lsm unused by osc currently */,
1730                                   aa->aa_page_count, aa->aa_ppga,
1731                                   &new_req, aa->aa_ocapa, 0, 1);
1732         if (rc)
1733                 RETURN(rc);
1734
1735         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1736                 if (oap->oap_request != NULL) {
1737                         LASSERTF(request == oap->oap_request,
1738                                  "request %p != oap_request %p\n",
1739                                  request, oap->oap_request);
1740                         if (oap->oap_interrupted) {
1741                                 ptlrpc_req_finished(new_req);
1742                                 RETURN(-EINTR);
1743                         }
1744                 }
1745         }
1746         /* New request takes over pga and oaps from old request.
1747          * Note that copying a list_head doesn't work, need to move it... */
1748         aa->aa_resends++;
1749         new_req->rq_interpret_reply = request->rq_interpret_reply;
1750         new_req->rq_async_args = request->rq_async_args;
1751         /* cap resend delay to the current request timeout, this is similar to
1752          * what ptlrpc does (see after_reply()) */
1753         if (aa->aa_resends > new_req->rq_timeout)
1754                 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1755         else
1756                 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1757         new_req->rq_generation_set = 1;
1758         new_req->rq_import_generation = request->rq_import_generation;
1759
1760         new_aa = ptlrpc_req_async_args(new_req);
1761
1762         CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1763         cfs_list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1764         CFS_INIT_LIST_HEAD(&new_aa->aa_exts);
1765         cfs_list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1766         new_aa->aa_resends = aa->aa_resends;
1767
1768         cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1769                 if (oap->oap_request) {
1770                         ptlrpc_req_finished(oap->oap_request);
1771                         oap->oap_request = ptlrpc_request_addref(new_req);
1772                 }
1773         }
1774
1775         new_aa->aa_ocapa = aa->aa_ocapa;
1776         aa->aa_ocapa = NULL;
1777
1778         /* XXX: This code will run into problem if we're going to support
1779          * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1780          * and wait for all of them to be finished. We should inherit request
1781          * set from old request. */
1782         ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
1783
1784         DEBUG_REQ(D_INFO, new_req, "new request");
1785         RETURN(0);
1786 }
1787
1788 /*
1789  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1790  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1791  * fine for our small page arrays and doesn't require allocation.  its an
1792  * insertion sort that swaps elements that are strides apart, shrinking the
1793  * stride down until its '1' and the array is sorted.
1794  */
1795 static void sort_brw_pages(struct brw_page **array, int num)
1796 {
1797         int stride, i, j;
1798         struct brw_page *tmp;
1799
1800         if (num == 1)
1801                 return;
1802         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1803                 ;
1804
1805         do {
1806                 stride /= 3;
1807                 for (i = stride ; i < num ; i++) {
1808                         tmp = array[i];
1809                         j = i;
1810                         while (j >= stride && array[j - stride]->off > tmp->off) {
1811                                 array[j] = array[j - stride];
1812                                 j -= stride;
1813                         }
1814                         array[j] = tmp;
1815                 }
1816         } while (stride > 1);
1817 }
1818
1819 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1820 {
1821         int count = 1;
1822         int offset;
1823         int i = 0;
1824
1825         LASSERT (pages > 0);
1826         offset = pg[i]->off & ~CFS_PAGE_MASK;
1827
1828         for (;;) {
1829                 pages--;
1830                 if (pages == 0)         /* that's all */
1831                         return count;
1832
1833                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1834                         return count;   /* doesn't end on page boundary */
1835
1836                 i++;
1837                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1838                 if (offset != 0)        /* doesn't start on page boundary */
1839                         return count;
1840
1841                 count++;
1842         }
1843 }
1844
1845 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1846 {
1847         struct brw_page **ppga;
1848         int i;
1849
1850         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1851         if (ppga == NULL)
1852                 return NULL;
1853
1854         for (i = 0; i < count; i++)
1855                 ppga[i] = pga + i;
1856         return ppga;
1857 }
1858
1859 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1860 {
1861         LASSERT(ppga != NULL);
1862         OBD_FREE(ppga, sizeof(*ppga) * count);
1863 }
1864
1865 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1866                    obd_count page_count, struct brw_page *pga,
1867                    struct obd_trans_info *oti)
1868 {
1869         struct obdo *saved_oa = NULL;
1870         struct brw_page **ppga, **orig;
1871         struct obd_import *imp = class_exp2cliimp(exp);
1872         struct client_obd *cli;
1873         int rc, page_count_orig;
1874         ENTRY;
1875
1876         LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1877         cli = &imp->imp_obd->u.cli;
1878
1879         if (cmd & OBD_BRW_CHECK) {
1880                 /* The caller just wants to know if there's a chance that this
1881                  * I/O can succeed */
1882
1883                 if (imp->imp_invalid)
1884                         RETURN(-EIO);
1885                 RETURN(0);
1886         }
1887
1888         /* test_brw with a failed create can trip this, maybe others. */
1889         LASSERT(cli->cl_max_pages_per_rpc);
1890
1891         rc = 0;
1892
1893         orig = ppga = osc_build_ppga(pga, page_count);
1894         if (ppga == NULL)
1895                 RETURN(-ENOMEM);
1896         page_count_orig = page_count;
1897
1898         sort_brw_pages(ppga, page_count);
1899         while (page_count) {
1900                 obd_count pages_per_brw;
1901
1902                 if (page_count > cli->cl_max_pages_per_rpc)
1903                         pages_per_brw = cli->cl_max_pages_per_rpc;
1904                 else
1905                         pages_per_brw = page_count;
1906
1907                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1908
1909                 if (saved_oa != NULL) {
1910                         /* restore previously saved oa */
1911                         *oinfo->oi_oa = *saved_oa;
1912                 } else if (page_count > pages_per_brw) {
1913                         /* save a copy of oa (brw will clobber it) */
1914                         OBDO_ALLOC(saved_oa);
1915                         if (saved_oa == NULL)
1916                                 GOTO(out, rc = -ENOMEM);
1917                         *saved_oa = *oinfo->oi_oa;
1918                 }
1919
1920                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1921                                       pages_per_brw, ppga, oinfo->oi_capa);
1922
1923                 if (rc != 0)
1924                         break;
1925
1926                 page_count -= pages_per_brw;
1927                 ppga += pages_per_brw;
1928         }
1929
1930 out:
1931         osc_release_ppga(orig, page_count_orig);
1932
1933         if (saved_oa != NULL)
1934                 OBDO_FREE(saved_oa);
1935
1936         RETURN(rc);
1937 }
1938
1939 static int brw_interpret(const struct lu_env *env,
1940                          struct ptlrpc_request *req, void *data, int rc)
1941 {
1942         struct osc_brw_async_args *aa = data;
1943         struct osc_extent *ext;
1944         struct osc_extent *tmp;
1945         struct cl_object  *obj = NULL;
1946         struct client_obd *cli = aa->aa_cli;
1947         ENTRY;
1948
1949         rc = osc_brw_fini_request(req, rc);
1950         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1951         /* When server return -EINPROGRESS, client should always retry
1952          * regardless of the number of times the bulk was resent already. */
1953         if (osc_recoverable_error(rc)) {
1954                 if (req->rq_import_generation !=
1955                     req->rq_import->imp_generation) {
1956                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1957                                ""DOSTID", rc = %d.\n",
1958                                req->rq_import->imp_obd->obd_name,
1959                                POSTID(&aa->aa_oa->o_oi), rc);
1960                 } else if (rc == -EINPROGRESS ||
1961                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1962                         rc = osc_brw_redo_request(req, aa, rc);
1963                 } else {
1964                         CERROR("%s: too many resent retries for object: "
1965                                ""LPU64":"LPU64", rc = %d.\n",
1966                                req->rq_import->imp_obd->obd_name,
1967                                POSTID(&aa->aa_oa->o_oi), rc);
1968                 }
1969
1970                 if (rc == 0)
1971                         RETURN(0);
1972                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1973                         rc = -EIO;
1974         }
1975
1976         if (aa->aa_ocapa) {
1977                 capa_put(aa->aa_ocapa);
1978                 aa->aa_ocapa = NULL;
1979         }
1980
1981         cfs_list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1982                 if (obj == NULL && rc == 0) {
1983                         obj = osc2cl(ext->oe_obj);
1984                         cl_object_get(obj);
1985                 }
1986
1987                 cfs_list_del_init(&ext->oe_link);
1988                 osc_extent_finish(env, ext, 1, rc);
1989         }
1990         LASSERT(cfs_list_empty(&aa->aa_exts));
1991         LASSERT(cfs_list_empty(&aa->aa_oaps));
1992
1993         if (obj != NULL) {
1994                 struct obdo *oa = aa->aa_oa;
1995                 struct cl_attr *attr  = &osc_env_info(env)->oti_attr;
1996                 unsigned long valid = 0;
1997
1998                 LASSERT(rc == 0);
1999                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2000                         attr->cat_blocks = oa->o_blocks;
2001                         valid |= CAT_BLOCKS;
2002                 }
2003                 if (oa->o_valid & OBD_MD_FLMTIME) {
2004                         attr->cat_mtime = oa->o_mtime;
2005                         valid |= CAT_MTIME;
2006                 }
2007                 if (oa->o_valid & OBD_MD_FLATIME) {
2008                         attr->cat_atime = oa->o_atime;
2009                         valid |= CAT_ATIME;
2010                 }
2011                 if (oa->o_valid & OBD_MD_FLCTIME) {
2012                         attr->cat_ctime = oa->o_ctime;
2013                         valid |= CAT_CTIME;
2014                 }
2015                 if (valid != 0) {
2016                         cl_object_attr_lock(obj);
2017                         cl_object_attr_set(env, obj, attr, valid);
2018                         cl_object_attr_unlock(obj);
2019                 }
2020                 cl_object_put(env, obj);
2021         }
2022         OBDO_FREE(aa->aa_oa);
2023
2024         cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
2025                           req->rq_bulk->bd_nob_transferred);
2026         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2027         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
2028
2029         client_obd_list_lock(&cli->cl_loi_list_lock);
2030         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2031          * is called so we know whether to go to sync BRWs or wait for more
2032          * RPCs to complete */
2033         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2034                 cli->cl_w_in_flight--;
2035         else
2036                 cli->cl_r_in_flight--;
2037         osc_wake_cache_waiters(cli);
2038         client_obd_list_unlock(&cli->cl_loi_list_lock);
2039
2040         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
2041         RETURN(rc);
2042 }
2043
2044 /**
2045  * Build an RPC by the list of extent @ext_list. The caller must ensure
2046  * that the total pages in this list are NOT over max pages per RPC.
2047  * Extents in the list must be in OES_RPC state.
2048  */
2049 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2050                   cfs_list_t *ext_list, int cmd, pdl_policy_t pol)
2051 {
2052         struct ptlrpc_request *req = NULL;
2053         struct osc_extent *ext;
2054         CFS_LIST_HEAD(rpc_list);
2055         struct brw_page **pga = NULL;
2056         struct osc_brw_async_args *aa = NULL;
2057         struct obdo *oa = NULL;
2058         struct osc_async_page *oap;
2059         struct osc_async_page *tmp;
2060         struct cl_req *clerq = NULL;
2061         enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2062         struct ldlm_lock *lock = NULL;
2063         struct cl_req_attr crattr;
2064         obd_off starting_offset = OBD_OBJECT_EOF;
2065         obd_off ending_offset = 0;
2066         int i, rc, mpflag = 0, mem_tight = 0, page_count = 0;
2067
2068         ENTRY;
2069         LASSERT(!cfs_list_empty(ext_list));
2070
2071         /* add pages into rpc_list to build BRW rpc */
2072         cfs_list_for_each_entry(ext, ext_list, oe_link) {
2073                 LASSERT(ext->oe_state == OES_RPC);
2074                 mem_tight |= ext->oe_memalloc;
2075                 cfs_list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2076                         ++page_count;
2077                         cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
2078                         if (starting_offset > oap->oap_obj_off)
2079                                 starting_offset = oap->oap_obj_off;
2080                         else
2081                                 LASSERT(oap->oap_page_off == 0);
2082                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
2083                                 ending_offset = oap->oap_obj_off +
2084                                                 oap->oap_count;
2085                         else
2086                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2087                                         CFS_PAGE_SIZE);
2088                 }
2089         }
2090
2091         if (mem_tight)
2092                 mpflag = cfs_memory_pressure_get_and_set();
2093
2094         memset(&crattr, 0, sizeof crattr);
2095         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2096         if (pga == NULL)
2097                 GOTO(out, rc = -ENOMEM);
2098
2099         OBDO_ALLOC(oa);
2100         if (oa == NULL)
2101                 GOTO(out, rc = -ENOMEM);
2102
2103         i = 0;
2104         cfs_list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
2105                 struct cl_page *page = oap2cl_page(oap);
2106                 if (clerq == NULL) {
2107                         clerq = cl_req_alloc(env, page, crt,
2108                                              1 /* only 1-object rpcs for
2109                                                 * now */);
2110                         if (IS_ERR(clerq))
2111                                 GOTO(out, rc = PTR_ERR(clerq));
2112                         lock = oap->oap_ldlm_lock;
2113                 }
2114                 if (mem_tight)
2115                         oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2116                 pga[i] = &oap->oap_brw_page;
2117                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2118                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2119                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2120                 i++;
2121                 cl_req_page_add(env, clerq, page);
2122         }
2123
2124         /* always get the data for the obdo for the rpc */
2125         LASSERT(clerq != NULL);
2126         crattr.cra_oa = oa;
2127         crattr.cra_capa = NULL;
2128         memset(crattr.cra_jobid, 0, JOBSTATS_JOBID_SIZE);
2129         cl_req_attr_set(env, clerq, &crattr, ~0ULL);
2130         if (lock) {
2131                 oa->o_handle = lock->l_remote_handle;
2132                 oa->o_valid |= OBD_MD_FLHANDLE;
2133         }
2134
2135         rc = cl_req_prep(env, clerq);
2136         if (rc != 0) {
2137                 CERROR("cl_req_prep failed: %d\n", rc);
2138                 GOTO(out, rc);
2139         }
2140
2141         sort_brw_pages(pga, page_count);
2142         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2143                         pga, &req, crattr.cra_capa, 1, 0);
2144         if (rc != 0) {
2145                 CERROR("prep_req failed: %d\n", rc);
2146                 GOTO(out, rc);
2147         }
2148
2149         req->rq_interpret_reply = brw_interpret;
2150         if (mem_tight != 0)
2151                 req->rq_memalloc = 1;
2152
2153         /* Need to update the timestamps after the request is built in case
2154          * we race with setattr (locally or in queue at OST).  If OST gets
2155          * later setattr before earlier BRW (as determined by the request xid),
2156          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2157          * way to do this in a single call.  bug 10150 */
2158         cl_req_attr_set(env, clerq, &crattr,
2159                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2160
2161         lustre_msg_set_jobid(req->rq_reqmsg, crattr.cra_jobid);
2162
2163         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2164         aa = ptlrpc_req_async_args(req);
2165         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2166         cfs_list_splice_init(&rpc_list, &aa->aa_oaps);
2167         CFS_INIT_LIST_HEAD(&aa->aa_exts);
2168         cfs_list_splice_init(ext_list, &aa->aa_exts);
2169         aa->aa_clerq = clerq;
2170
2171         /* queued sync pages can be torn down while the pages
2172          * were between the pending list and the rpc */
2173         tmp = NULL;
2174         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2175                 /* only one oap gets a request reference */
2176                 if (tmp == NULL)
2177                         tmp = oap;
2178                 if (oap->oap_interrupted && !req->rq_intr) {
2179                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2180                                         oap, req);
2181                         ptlrpc_mark_interrupted(req);
2182                 }
2183         }
2184         if (tmp != NULL)
2185                 tmp->oap_request = ptlrpc_request_addref(req);
2186
2187         client_obd_list_lock(&cli->cl_loi_list_lock);
2188         starting_offset >>= CFS_PAGE_SHIFT;
2189         if (cmd == OBD_BRW_READ) {
2190                 cli->cl_r_in_flight++;
2191                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2192                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2193                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2194                                       starting_offset + 1);
2195         } else {
2196                 cli->cl_w_in_flight++;
2197                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2198                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2199                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2200                                       starting_offset + 1);
2201         }
2202         client_obd_list_unlock(&cli->cl_loi_list_lock);
2203
2204         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2205                   page_count, aa, cli->cl_r_in_flight,
2206                   cli->cl_w_in_flight);
2207
2208         /* XXX: Maybe the caller can check the RPC bulk descriptor to
2209          * see which CPU/NUMA node the majority of pages were allocated
2210          * on, and try to assign the async RPC to the CPU core
2211          * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
2212          *
2213          * But on the other hand, we expect that multiple ptlrpcd
2214          * threads and the initial write sponsor can run in parallel,
2215          * especially when data checksum is enabled, which is CPU-bound
2216          * operation and single ptlrpcd thread cannot process in time.
2217          * So more ptlrpcd threads sharing BRW load
2218          * (with PDL_POLICY_ROUND) seems better.
2219          */
2220         ptlrpcd_add_req(req, pol, -1);
2221         rc = 0;
2222         EXIT;
2223
2224 out:
2225         if (mem_tight != 0)
2226                 cfs_memory_pressure_restore(mpflag);
2227
2228         capa_put(crattr.cra_capa);
2229         if (rc != 0) {
2230                 LASSERT(req == NULL);
2231
2232                 if (oa)
2233                         OBDO_FREE(oa);
2234                 if (pga)
2235                         OBD_FREE(pga, sizeof(*pga) * page_count);
2236                 /* this should happen rarely and is pretty bad, it makes the
2237                  * pending list not follow the dirty order */
2238                 while (!cfs_list_empty(ext_list)) {
2239                         ext = cfs_list_entry(ext_list->next, struct osc_extent,
2240                                              oe_link);
2241                         cfs_list_del_init(&ext->oe_link);
2242                         osc_extent_finish(env, ext, 0, rc);
2243                 }
2244                 if (clerq && !IS_ERR(clerq))
2245                         cl_req_completion(env, clerq, rc);
2246         }
2247         RETURN(rc);
2248 }
2249
2250 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2251                                         struct ldlm_enqueue_info *einfo)
2252 {
2253         void *data = einfo->ei_cbdata;
2254         int set = 0;
2255
2256         LASSERT(lock != NULL);
2257         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2258         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2259         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2260         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2261
2262         lock_res_and_lock(lock);
2263         spin_lock(&osc_ast_guard);
2264
2265         if (lock->l_ast_data == NULL)
2266                 lock->l_ast_data = data;
2267         if (lock->l_ast_data == data)
2268                 set = 1;
2269
2270         spin_unlock(&osc_ast_guard);
2271         unlock_res_and_lock(lock);
2272
2273         return set;
2274 }
2275
2276 static int osc_set_data_with_check(struct lustre_handle *lockh,
2277                                    struct ldlm_enqueue_info *einfo)
2278 {
2279         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2280         int set = 0;
2281
2282         if (lock != NULL) {
2283                 set = osc_set_lock_data_with_check(lock, einfo);
2284                 LDLM_LOCK_PUT(lock);
2285         } else
2286                 CERROR("lockh %p, data %p - client evicted?\n",
2287                        lockh, einfo->ei_cbdata);
2288         return set;
2289 }
2290
2291 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2292                              ldlm_iterator_t replace, void *data)
2293 {
2294         struct ldlm_res_id res_id;
2295         struct obd_device *obd = class_exp2obd(exp);
2296
2297         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2298         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2299         return 0;
2300 }
2301
2302 /* find any ldlm lock of the inode in osc
2303  * return 0    not find
2304  *        1    find one
2305  *      < 0    error */
2306 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2307                            ldlm_iterator_t replace, void *data)
2308 {
2309         struct ldlm_res_id res_id;
2310         struct obd_device *obd = class_exp2obd(exp);
2311         int rc = 0;
2312
2313         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2314         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2315         if (rc == LDLM_ITER_STOP)
2316                 return(1);
2317         if (rc == LDLM_ITER_CONTINUE)
2318                 return(0);
2319         return(rc);
2320 }
2321
2322 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2323                             obd_enqueue_update_f upcall, void *cookie,
2324                             __u64 *flags, int agl, int rc)
2325 {
2326         int intent = *flags & LDLM_FL_HAS_INTENT;
2327         ENTRY;
2328
2329         if (intent) {
2330                 /* The request was created before ldlm_cli_enqueue call. */
2331                 if (rc == ELDLM_LOCK_ABORTED) {
2332                         struct ldlm_reply *rep;
2333                         rep = req_capsule_server_get(&req->rq_pill,
2334                                                      &RMF_DLM_REP);
2335
2336                         LASSERT(rep != NULL);
2337                         if (rep->lock_policy_res1)
2338                                 rc = rep->lock_policy_res1;
2339                 }
2340         }
2341
2342         if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
2343             (rc == 0)) {
2344                 *flags |= LDLM_FL_LVB_READY;
2345                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2346                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2347         }
2348
2349         /* Call the update callback. */
2350         rc = (*upcall)(cookie, rc);
2351         RETURN(rc);
2352 }
2353
2354 static int osc_enqueue_interpret(const struct lu_env *env,
2355                                  struct ptlrpc_request *req,
2356                                  struct osc_enqueue_args *aa, int rc)
2357 {
2358         struct ldlm_lock *lock;
2359         struct lustre_handle handle;
2360         __u32 mode;
2361         struct ost_lvb *lvb;
2362         __u32 lvb_len;
2363         __u64 *flags = aa->oa_flags;
2364
2365         /* Make a local copy of a lock handle and a mode, because aa->oa_*
2366          * might be freed anytime after lock upcall has been called. */
2367         lustre_handle_copy(&handle, aa->oa_lockh);
2368         mode = aa->oa_ei->ei_mode;
2369
2370         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2371          * be valid. */
2372         lock = ldlm_handle2lock(&handle);
2373
2374         /* Take an additional reference so that a blocking AST that
2375          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2376          * to arrive after an upcall has been executed by
2377          * osc_enqueue_fini(). */
2378         ldlm_lock_addref(&handle, mode);
2379
2380         /* Let CP AST to grant the lock first. */
2381         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2382
2383         if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
2384                 lvb = NULL;
2385                 lvb_len = 0;
2386         } else {
2387                 lvb = aa->oa_lvb;
2388                 lvb_len = sizeof(*aa->oa_lvb);
2389         }
2390
2391         /* Complete obtaining the lock procedure. */
2392         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2393                                    mode, flags, lvb, lvb_len, &handle, rc);
2394         /* Complete osc stuff. */
2395         rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
2396                               flags, aa->oa_agl, rc);
2397
2398         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2399
2400         /* Release the lock for async request. */
2401         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
2402                 /*
2403                  * Releases a reference taken by ldlm_cli_enqueue(), if it is
2404                  * not already released by
2405                  * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
2406                  */
2407                 ldlm_lock_decref(&handle, mode);
2408
2409         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2410                  aa->oa_lockh, req, aa);
2411         ldlm_lock_decref(&handle, mode);
2412         LDLM_LOCK_PUT(lock);
2413         return rc;
2414 }
2415
2416 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
2417                         struct lov_oinfo *loi, int flags,
2418                         struct ost_lvb *lvb, __u32 mode, int rc)
2419 {
2420         struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
2421
2422         if (rc == ELDLM_OK) {
2423                 __u64 tmp;
2424
2425                 LASSERT(lock != NULL);
2426                 loi->loi_lvb = *lvb;
2427                 tmp = loi->loi_lvb.lvb_size;
2428                 /* Extend KMS up to the end of this lock and no further
2429                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
2430                 if (tmp > lock->l_policy_data.l_extent.end)
2431                         tmp = lock->l_policy_data.l_extent.end + 1;
2432                 if (tmp >= loi->loi_kms) {
2433                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
2434                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
2435                         loi_kms_set(loi, tmp);
2436                 } else {
2437                         LDLM_DEBUG(lock, "lock acquired, setting rss="
2438                                    LPU64"; leaving kms="LPU64", end="LPU64,
2439                                    loi->loi_lvb.lvb_size, loi->loi_kms,
2440                                    lock->l_policy_data.l_extent.end);
2441                 }
2442                 ldlm_lock_allow_match(lock);
2443         } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
2444                 LASSERT(lock != NULL);
2445                 loi->loi_lvb = *lvb;
2446                 ldlm_lock_allow_match(lock);
2447                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
2448                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
2449                 rc = ELDLM_OK;
2450         }
2451
2452         if (lock != NULL) {
2453                 if (rc != ELDLM_OK)
2454                         ldlm_lock_fail_match(lock);
2455
2456                 LDLM_LOCK_PUT(lock);
2457         }
2458 }
2459 EXPORT_SYMBOL(osc_update_enqueue);
2460
2461 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2462
2463 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2464  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2465  * other synchronous requests, however keeping some locks and trying to obtain
2466  * others may take a considerable amount of time in a case of ost failure; and
2467  * when other sync requests do not get released lock from a client, the client
2468  * is excluded from the cluster -- such scenarious make the life difficult, so
2469  * release locks just after they are obtained. */
2470 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2471                      __u64 *flags, ldlm_policy_data_t *policy,
2472                      struct ost_lvb *lvb, int kms_valid,
2473                      obd_enqueue_update_f upcall, void *cookie,
2474                      struct ldlm_enqueue_info *einfo,
2475                      struct lustre_handle *lockh,
2476                      struct ptlrpc_request_set *rqset, int async, int agl)
2477 {
2478         struct obd_device *obd = exp->exp_obd;
2479         struct ptlrpc_request *req = NULL;
2480         int intent = *flags & LDLM_FL_HAS_INTENT;
2481         int match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
2482         ldlm_mode_t mode;
2483         int rc;
2484         ENTRY;
2485
2486         /* Filesystem lock extents are extended to page boundaries so that
2487          * dealing with the page cache is a little smoother.  */
2488         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2489         policy->l_extent.end |= ~CFS_PAGE_MASK;
2490
2491         /*
2492          * kms is not valid when either object is completely fresh (so that no
2493          * locks are cached), or object was evicted. In the latter case cached
2494          * lock cannot be used, because it would prime inode state with
2495          * potentially stale LVB.
2496          */
2497         if (!kms_valid)
2498                 goto no_match;
2499
2500         /* Next, search for already existing extent locks that will cover us */
2501         /* If we're trying to read, we also search for an existing PW lock.  The
2502          * VFS and page cache already protect us locally, so lots of readers/
2503          * writers can share a single PW lock.
2504          *
2505          * There are problems with conversion deadlocks, so instead of
2506          * converting a read lock to a write lock, we'll just enqueue a new
2507          * one.
2508          *
2509          * At some point we should cancel the read lock instead of making them
2510          * send us a blocking callback, but there are problems with canceling
2511          * locks out from other users right now, too. */
2512         mode = einfo->ei_mode;
2513         if (einfo->ei_mode == LCK_PR)
2514                 mode |= LCK_PW;
2515         mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2516                                einfo->ei_type, policy, mode, lockh, 0);
2517         if (mode) {
2518                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
2519
2520                 if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
2521                         /* For AGL, if enqueue RPC is sent but the lock is not
2522                          * granted, then skip to process this strpe.
2523                          * Return -ECANCELED to tell the caller. */
2524                         ldlm_lock_decref(lockh, mode);
2525                         LDLM_LOCK_PUT(matched);
2526                         RETURN(-ECANCELED);
2527                 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2528                         *flags |= LDLM_FL_LVB_READY;
2529                         /* addref the lock only if not async requests and PW
2530                          * lock is matched whereas we asked for PR. */
2531                         if (!rqset && einfo->ei_mode != mode)
2532                                 ldlm_lock_addref(lockh, LCK_PR);
2533                         if (intent) {
2534                                 /* I would like to be able to ASSERT here that
2535                                  * rss <= kms, but I can't, for reasons which
2536                                  * are explained in lov_enqueue() */
2537                         }
2538
2539                         /* We already have a lock, and it's referenced.
2540                          *
2541                          * At this point, the cl_lock::cll_state is CLS_QUEUING,
2542                          * AGL upcall may change it to CLS_HELD directly. */
2543                         (*upcall)(cookie, ELDLM_OK);
2544
2545                         if (einfo->ei_mode != mode)
2546                                 ldlm_lock_decref(lockh, LCK_PW);
2547                         else if (rqset)
2548                                 /* For async requests, decref the lock. */
2549                                 ldlm_lock_decref(lockh, einfo->ei_mode);
2550                         LDLM_LOCK_PUT(matched);
2551                         RETURN(ELDLM_OK);
2552                 } else {
2553                         ldlm_lock_decref(lockh, mode);
2554                         LDLM_LOCK_PUT(matched);
2555                 }
2556         }
2557
2558  no_match:
2559         if (intent) {
2560                 CFS_LIST_HEAD(cancels);
2561                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2562                                            &RQF_LDLM_ENQUEUE_LVB);
2563                 if (req == NULL)
2564                         RETURN(-ENOMEM);
2565
2566                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
2567                 if (rc) {
2568                         ptlrpc_request_free(req);
2569                         RETURN(rc);
2570                 }
2571
2572                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2573                                      sizeof *lvb);
2574                 ptlrpc_request_set_replen(req);
2575         }
2576
2577         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2578         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2579
2580         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2581                               sizeof(*lvb), LVB_T_OST, lockh, async);
2582         if (rqset) {
2583                 if (!rc) {
2584                         struct osc_enqueue_args *aa;
2585                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2586                         aa = ptlrpc_req_async_args(req);
2587                         aa->oa_ei = einfo;
2588                         aa->oa_exp = exp;
2589                         aa->oa_flags  = flags;
2590                         aa->oa_upcall = upcall;
2591                         aa->oa_cookie = cookie;
2592                         aa->oa_lvb    = lvb;
2593                         aa->oa_lockh  = lockh;
2594                         aa->oa_agl    = !!agl;
2595
2596                         req->rq_interpret_reply =
2597                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2598                         if (rqset == PTLRPCD_SET)
2599                                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2600                         else
2601                                 ptlrpc_set_add_req(rqset, req);
2602                 } else if (intent) {
2603                         ptlrpc_req_finished(req);
2604                 }
2605                 RETURN(rc);
2606         }
2607
2608         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
2609         if (intent)
2610                 ptlrpc_req_finished(req);
2611
2612         RETURN(rc);
2613 }
2614
2615 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2616                        struct ldlm_enqueue_info *einfo,
2617                        struct ptlrpc_request_set *rqset)
2618 {
2619         struct ldlm_res_id res_id;
2620         int rc;
2621         ENTRY;
2622
2623         ostid_build_res_name(&oinfo->oi_md->lsm_oi, &res_id);
2624         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
2625                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2626                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
2627                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
2628                               rqset, rqset != NULL, 0);
2629         RETURN(rc);
2630 }
2631
2632 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2633                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2634                    int *flags, void *data, struct lustre_handle *lockh,
2635                    int unref)
2636 {
2637         struct obd_device *obd = exp->exp_obd;
2638         int lflags = *flags;
2639         ldlm_mode_t rc;
2640         ENTRY;
2641
2642         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2643                 RETURN(-EIO);
2644
2645         /* Filesystem lock extents are extended to page boundaries so that
2646          * dealing with the page cache is a little smoother */
2647         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2648         policy->l_extent.end |= ~CFS_PAGE_MASK;
2649
2650         /* Next, search for already existing extent locks that will cover us */
2651         /* If we're trying to read, we also search for an existing PW lock.  The
2652          * VFS and page cache already protect us locally, so lots of readers/
2653          * writers can share a single PW lock. */
2654         rc = mode;
2655         if (mode == LCK_PR)
2656                 rc |= LCK_PW;
2657         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2658                              res_id, type, policy, rc, lockh, unref);
2659         if (rc) {
2660                 if (data != NULL) {
2661                         if (!osc_set_data_with_check(lockh, data)) {
2662                                 if (!(lflags & LDLM_FL_TEST_LOCK))
2663                                         ldlm_lock_decref(lockh, rc);
2664                                 RETURN(0);
2665                         }
2666                 }
2667                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2668                         ldlm_lock_addref(lockh, LCK_PR);
2669                         ldlm_lock_decref(lockh, LCK_PW);
2670                 }
2671                 RETURN(rc);
2672         }
2673         RETURN(rc);
2674 }
2675
2676 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2677 {
2678         ENTRY;
2679
2680         if (unlikely(mode == LCK_GROUP))
2681                 ldlm_lock_decref_and_cancel(lockh, mode);
2682         else
2683                 ldlm_lock_decref(lockh, mode);
2684
2685         RETURN(0);
2686 }
2687
2688 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2689                       __u32 mode, struct lustre_handle *lockh)
2690 {
2691         ENTRY;
2692         RETURN(osc_cancel_base(lockh, mode));
2693 }
2694
2695 static int osc_cancel_unused(struct obd_export *exp,
2696                              struct lov_stripe_md *lsm,
2697                              ldlm_cancel_flags_t flags,
2698                              void *opaque)
2699 {
2700         struct obd_device *obd = class_exp2obd(exp);
2701         struct ldlm_res_id res_id, *resp = NULL;
2702
2703         if (lsm != NULL) {
2704                 ostid_build_res_name(&lsm->lsm_oi, &res_id);
2705                 resp = &res_id;
2706         }
2707
2708         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
2709 }
2710
2711 static int osc_statfs_interpret(const struct lu_env *env,
2712                                 struct ptlrpc_request *req,
2713                                 struct osc_async_args *aa, int rc)
2714 {
2715         struct obd_statfs *msfs;
2716         ENTRY;
2717
2718         if (rc == -EBADR)
2719                 /* The request has in fact never been sent
2720                  * due to issues at a higher level (LOV).
2721                  * Exit immediately since the caller is
2722                  * aware of the problem and takes care
2723                  * of the clean up */
2724                  RETURN(rc);
2725
2726         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2727             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2728                 GOTO(out, rc = 0);
2729
2730         if (rc != 0)
2731                 GOTO(out, rc);
2732
2733         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2734         if (msfs == NULL) {
2735                 GOTO(out, rc = -EPROTO);
2736         }
2737
2738         *aa->aa_oi->oi_osfs = *msfs;
2739 out:
2740         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2741         RETURN(rc);
2742 }
2743
2744 static int osc_statfs_async(struct obd_export *exp,
2745                             struct obd_info *oinfo, __u64 max_age,
2746                             struct ptlrpc_request_set *rqset)
2747 {
2748         struct obd_device     *obd = class_exp2obd(exp);
2749         struct ptlrpc_request *req;
2750         struct osc_async_args *aa;
2751         int                    rc;
2752         ENTRY;
2753
2754         /* We could possibly pass max_age in the request (as an absolute
2755          * timestamp or a "seconds.usec ago") so the target can avoid doing
2756          * extra calls into the filesystem if that isn't necessary (e.g.
2757          * during mount that would help a bit).  Having relative timestamps
2758          * is not so great if request processing is slow, while absolute
2759          * timestamps are not ideal because they need time synchronization. */
2760         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2761         if (req == NULL)
2762                 RETURN(-ENOMEM);
2763
2764         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2765         if (rc) {
2766                 ptlrpc_request_free(req);
2767                 RETURN(rc);
2768         }
2769         ptlrpc_request_set_replen(req);
2770         req->rq_request_portal = OST_CREATE_PORTAL;
2771         ptlrpc_at_set_req_timeout(req);
2772
2773         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2774                 /* procfs requests not want stat in wait for avoid deadlock */
2775                 req->rq_no_resend = 1;
2776                 req->rq_no_delay = 1;
2777         }
2778
2779         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2780         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2781         aa = ptlrpc_req_async_args(req);
2782         aa->aa_oi = oinfo;
2783
2784         ptlrpc_set_add_req(rqset, req);
2785         RETURN(0);
2786 }
2787
2788 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2789                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2790 {
2791         struct obd_device     *obd = class_exp2obd(exp);
2792         struct obd_statfs     *msfs;
2793         struct ptlrpc_request *req;
2794         struct obd_import     *imp = NULL;
2795         int rc;
2796         ENTRY;
2797
2798         /*Since the request might also come from lprocfs, so we need
2799          *sync this with client_disconnect_export Bug15684*/
2800         down_read(&obd->u.cli.cl_sem);
2801         if (obd->u.cli.cl_import)
2802                 imp = class_import_get(obd->u.cli.cl_import);
2803         up_read(&obd->u.cli.cl_sem);
2804         if (!imp)
2805                 RETURN(-ENODEV);
2806
2807         /* We could possibly pass max_age in the request (as an absolute
2808          * timestamp or a "seconds.usec ago") so the target can avoid doing
2809          * extra calls into the filesystem if that isn't necessary (e.g.
2810          * during mount that would help a bit).  Having relative timestamps
2811          * is not so great if request processing is slow, while absolute
2812          * timestamps are not ideal because they need time synchronization. */
2813         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2814
2815         class_import_put(imp);
2816
2817         if (req == NULL)
2818                 RETURN(-ENOMEM);
2819
2820         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2821         if (rc) {
2822                 ptlrpc_request_free(req);
2823                 RETURN(rc);
2824         }
2825         ptlrpc_request_set_replen(req);
2826         req->rq_request_portal = OST_CREATE_PORTAL;
2827         ptlrpc_at_set_req_timeout(req);
2828
2829         if (flags & OBD_STATFS_NODELAY) {
2830                 /* procfs requests not want stat in wait for avoid deadlock */
2831                 req->rq_no_resend = 1;
2832                 req->rq_no_delay = 1;
2833         }
2834
2835         rc = ptlrpc_queue_wait(req);
2836         if (rc)
2837                 GOTO(out, rc);
2838
2839         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2840         if (msfs == NULL) {
2841                 GOTO(out, rc = -EPROTO);
2842         }
2843
2844         *osfs = *msfs;
2845
2846         EXIT;
2847  out:
2848         ptlrpc_req_finished(req);
2849         return rc;
2850 }
2851
2852 /* Retrieve object striping information.
2853  *
2854  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
2855  * the maximum number of OST indices which will fit in the user buffer.
2856  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
2857  */
2858 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
2859 {
2860         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
2861         struct lov_user_md_v3 lum, *lumk;
2862         struct lov_user_ost_data_v1 *lmm_objects;
2863         int rc = 0, lum_size;
2864         ENTRY;
2865
2866         if (!lsm)
2867                 RETURN(-ENODATA);
2868
2869         /* we only need the header part from user space to get lmm_magic and
2870          * lmm_stripe_count, (the header part is common to v1 and v3) */
2871         lum_size = sizeof(struct lov_user_md_v1);
2872         if (cfs_copy_from_user(&lum, lump, lum_size))
2873                 RETURN(-EFAULT);
2874
2875         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
2876             (lum.lmm_magic != LOV_USER_MAGIC_V3))
2877                 RETURN(-EINVAL);
2878
2879         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
2880         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
2881         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
2882         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
2883
2884         /* we can use lov_mds_md_size() to compute lum_size
2885          * because lov_user_md_vX and lov_mds_md_vX have the same size */
2886         if (lum.lmm_stripe_count > 0) {
2887                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
2888                 OBD_ALLOC(lumk, lum_size);
2889                 if (!lumk)
2890                         RETURN(-ENOMEM);
2891
2892                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
2893                         lmm_objects =
2894                             &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
2895                 else
2896                         lmm_objects = &(lumk->lmm_objects[0]);
2897                 lmm_objects->l_ost_oi = lsm->lsm_oi;
2898         } else {
2899                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
2900                 lumk = &lum;
2901         }
2902
2903         lumk->lmm_oi = lsm->lsm_oi;
2904         lumk->lmm_stripe_count = 1;
2905
2906         if (cfs_copy_to_user(lump, lumk, lum_size))
2907                 rc = -EFAULT;
2908
2909         if (lumk != &lum)
2910                 OBD_FREE(lumk, lum_size);
2911
2912         RETURN(rc);
2913 }
2914
2915
2916 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2917                          void *karg, void *uarg)
2918 {
2919         struct obd_device *obd = exp->exp_obd;
2920         struct obd_ioctl_data *data = karg;
2921         int err = 0;
2922         ENTRY;
2923
2924         if (!cfs_try_module_get(THIS_MODULE)) {
2925                 CERROR("Can't get module. Is it alive?");
2926                 return -EINVAL;
2927         }
2928         switch (cmd) {
2929         case OBD_IOC_LOV_GET_CONFIG: {
2930                 char *buf;
2931                 struct lov_desc *desc;
2932                 struct obd_uuid uuid;
2933
2934                 buf = NULL;
2935                 len = 0;
2936                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
2937                         GOTO(out, err = -EINVAL);
2938
2939                 data = (struct obd_ioctl_data *)buf;
2940
2941                 if (sizeof(*desc) > data->ioc_inllen1) {
2942                         obd_ioctl_freedata(buf, len);
2943                         GOTO(out, err = -EINVAL);
2944                 }
2945
2946                 if (data->ioc_inllen2 < sizeof(uuid)) {
2947                         obd_ioctl_freedata(buf, len);
2948                         GOTO(out, err = -EINVAL);
2949                 }
2950
2951                 desc = (struct lov_desc *)data->ioc_inlbuf1;
2952                 desc->ld_tgt_count = 1;
2953                 desc->ld_active_tgt_count = 1;
2954                 desc->ld_default_stripe_count = 1;
2955                 desc->ld_default_stripe_size = 0;
2956                 desc->ld_default_stripe_offset = 0;
2957                 desc->ld_pattern = 0;
2958                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
2959
2960                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
2961
2962                 err = cfs_copy_to_user((void *)uarg, buf, len);
2963                 if (err)
2964                         err = -EFAULT;
2965                 obd_ioctl_freedata(buf, len);
2966                 GOTO(out, err);
2967         }
2968         case LL_IOC_LOV_SETSTRIPE:
2969                 err = obd_alloc_memmd(exp, karg);
2970                 if (err > 0)
2971                         err = 0;
2972                 GOTO(out, err);
2973         case LL_IOC_LOV_GETSTRIPE:
2974                 err = osc_getstripe(karg, uarg);
2975                 GOTO(out, err);
2976         case OBD_IOC_CLIENT_RECOVER:
2977                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2978                                             data->ioc_inlbuf1, 0);
2979                 if (err > 0)
2980                         err = 0;
2981                 GOTO(out, err);
2982         case IOC_OSC_SET_ACTIVE:
2983                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2984                                                data->ioc_offset);
2985                 GOTO(out, err);
2986         case OBD_IOC_POLL_QUOTACHECK:
2987                 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
2988                 GOTO(out, err);
2989         case OBD_IOC_PING_TARGET:
2990                 err = ptlrpc_obd_ping(obd);
2991                 GOTO(out, err);
2992         default:
2993                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2994                        cmd, cfs_curproc_comm());
2995                 GOTO(out, err = -ENOTTY);
2996         }
2997 out:
2998         cfs_module_put(THIS_MODULE);
2999         return err;
3000 }
3001
3002 static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
3003                         obd_count keylen, void *key, __u32 *vallen, void *val,
3004                         struct lov_stripe_md *lsm)
3005 {
3006         ENTRY;
3007         if (!vallen || !val)
3008                 RETURN(-EFAULT);
3009
3010         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3011                 __u32 *stripe = val;
3012                 *vallen = sizeof(*stripe);
3013                 *stripe = 0;
3014                 RETURN(0);
3015         } else if (KEY_IS(KEY_LAST_ID)) {
3016                 struct ptlrpc_request *req;
3017                 obd_id                *reply;
3018                 char                  *tmp;
3019                 int                    rc;
3020
3021                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3022                                            &RQF_OST_GET_INFO_LAST_ID);
3023                 if (req == NULL)
3024                         RETURN(-ENOMEM);
3025
3026                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3027                                      RCL_CLIENT, keylen);
3028                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3029                 if (rc) {
3030                         ptlrpc_request_free(req);
3031                         RETURN(rc);
3032                 }
3033
3034                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3035                 memcpy(tmp, key, keylen);
3036
3037                 req->rq_no_delay = req->rq_no_resend = 1;
3038                 ptlrpc_request_set_replen(req);
3039                 rc = ptlrpc_queue_wait(req);
3040                 if (rc)
3041                         GOTO(out, rc);
3042
3043                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3044                 if (reply == NULL)
3045                         GOTO(out, rc = -EPROTO);
3046
3047                 *((obd_id *)val) = *reply;
3048         out:
3049                 ptlrpc_req_finished(req);
3050                 RETURN(rc);
3051         } else if (KEY_IS(KEY_FIEMAP)) {
3052                 struct ptlrpc_request *req;
3053                 struct ll_user_fiemap *reply;
3054                 char *tmp;
3055                 int rc;
3056
3057                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3058                                            &RQF_OST_GET_INFO_FIEMAP);
3059                 if (req == NULL)
3060                         RETURN(-ENOMEM);
3061
3062                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3063                                      RCL_CLIENT, keylen);
3064                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3065                                      RCL_CLIENT, *vallen);
3066                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3067                                      RCL_SERVER, *vallen);
3068
3069                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3070                 if (rc) {
3071                         ptlrpc_request_free(req);
3072                         RETURN(rc);
3073                 }
3074
3075                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3076                 memcpy(tmp, key, keylen);
3077                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3078                 memcpy(tmp, val, *vallen);
3079
3080                 ptlrpc_request_set_replen(req);
3081                 rc = ptlrpc_queue_wait(req);
3082                 if (rc)
3083                         GOTO(out1, rc);
3084
3085                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3086                 if (reply == NULL)
3087                         GOTO(out1, rc = -EPROTO);
3088
3089                 memcpy(val, reply, *vallen);
3090         out1:
3091                 ptlrpc_req_finished(req);
3092
3093                 RETURN(rc);
3094         }
3095
3096         RETURN(-EINVAL);
3097 }
3098
3099 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
3100                               obd_count keylen, void *key, obd_count vallen,
3101                               void *val, struct ptlrpc_request_set *set)
3102 {
3103         struct ptlrpc_request *req;
3104         struct obd_device     *obd = exp->exp_obd;
3105         struct obd_import     *imp = class_exp2cliimp(exp);
3106         char                  *tmp;
3107         int                    rc;
3108         ENTRY;
3109
3110         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3111
3112         if (KEY_IS(KEY_CHECKSUM)) {
3113                 if (vallen != sizeof(int))
3114                         RETURN(-EINVAL);
3115                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3116                 RETURN(0);
3117         }
3118
3119         if (KEY_IS(KEY_SPTLRPC_CONF)) {
3120                 sptlrpc_conf_client_adapt(obd);
3121                 RETURN(0);
3122         }
3123
3124         if (KEY_IS(KEY_FLUSH_CTX)) {
3125                 sptlrpc_import_flush_my_ctx(imp);
3126                 RETURN(0);
3127         }
3128
3129         if (KEY_IS(KEY_CACHE_SET)) {
3130                 struct client_obd *cli = &obd->u.cli;
3131
3132                 LASSERT(cli->cl_cache == NULL); /* only once */
3133                 cli->cl_cache = (struct cl_client_cache *)val;
3134                 cfs_atomic_inc(&cli->cl_cache->ccc_users);
3135                 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
3136
3137                 /* add this osc into entity list */
3138                 LASSERT(cfs_list_empty(&cli->cl_lru_osc));
3139                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3140                 cfs_list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
3141                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3142
3143                 RETURN(0);
3144         }
3145
3146         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
3147                 struct client_obd *cli = &obd->u.cli;
3148                 int nr = cfs_atomic_read(&cli->cl_lru_in_list) >> 1;
3149                 int target = *(int *)val;
3150
3151                 nr = osc_lru_shrink(cli, min(nr, target));
3152                 *(int *)val -= nr;
3153                 RETURN(0);
3154         }
3155
3156         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3157                 RETURN(-EINVAL);
3158
3159         /* We pass all other commands directly to OST. Since nobody calls osc
3160            methods directly and everybody is supposed to go through LOV, we
3161            assume lov checked invalid values for us.
3162            The only recognised values so far are evict_by_nid and mds_conn.
3163            Even if something bad goes through, we'd get a -EINVAL from OST
3164            anyway. */
3165
3166         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
3167                                                 &RQF_OST_SET_GRANT_INFO :
3168                                                 &RQF_OBD_SET_INFO);
3169         if (req == NULL)
3170                 RETURN(-ENOMEM);
3171
3172         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3173                              RCL_CLIENT, keylen);
3174         if (!KEY_IS(KEY_GRANT_SHRINK))
3175                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3176                                      RCL_CLIENT, vallen);
3177         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3178         if (rc) {
3179                 ptlrpc_request_free(req);
3180                 RETURN(rc);
3181         }
3182
3183         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3184         memcpy(tmp, key, keylen);
3185         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
3186                                                         &RMF_OST_BODY :
3187                                                         &RMF_SETINFO_VAL);
3188         memcpy(tmp, val, vallen);
3189
3190         if (KEY_IS(KEY_GRANT_SHRINK)) {
3191                 struct osc_grant_args *aa;
3192                 struct obdo *oa;
3193
3194                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3195                 aa = ptlrpc_req_async_args(req);
3196                 OBDO_ALLOC(oa);
3197                 if (!oa) {
3198                         ptlrpc_req_finished(req);
3199                         RETURN(-ENOMEM);
3200                 }
3201                 *oa = ((struct ost_body *)val)->oa;
3202                 aa->aa_oa = oa;
3203                 req->rq_interpret_reply = osc_shrink_grant_interpret;
3204         }
3205
3206         ptlrpc_request_set_replen(req);
3207         if (!KEY_IS(KEY_GRANT_SHRINK)) {
3208                 LASSERT(set != NULL);
3209                 ptlrpc_set_add_req(set, req);
3210                 ptlrpc_check_set(NULL, set);
3211         } else
3212                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
3213
3214         RETURN(0);
3215 }
3216
3217
3218 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
3219                          struct obd_device *disk_obd, int *index)
3220 {
3221         /* this code is not supposed to be used with LOD/OSP
3222          * to be removed soon */
3223         LBUG();
3224         return 0;
3225 }
3226
3227 static int osc_llog_finish(struct obd_device *obd, int count)
3228 {
3229         struct llog_ctxt *ctxt;
3230
3231         ENTRY;
3232
3233         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3234         if (ctxt) {
3235                 llog_cat_close(NULL, ctxt->loc_handle);
3236                 llog_cleanup(NULL, ctxt);
3237         }
3238
3239         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3240         if (ctxt)
3241                 llog_cleanup(NULL, ctxt);
3242         RETURN(0);
3243 }
3244
3245 static int osc_reconnect(const struct lu_env *env,
3246                          struct obd_export *exp, struct obd_device *obd,
3247                          struct obd_uuid *cluuid,
3248                          struct obd_connect_data *data,
3249                          void *localdata)
3250 {
3251         struct client_obd *cli = &obd->u.cli;
3252
3253         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3254                 long lost_grant;
3255
3256                 client_obd_list_lock(&cli->cl_loi_list_lock);
3257                 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
3258                                 2 * cli_brw_size(obd);
3259                 lost_grant = cli->cl_lost_grant;
3260                 cli->cl_lost_grant = 0;
3261                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3262
3263                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3264                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3265                        data->ocd_version, data->ocd_grant, lost_grant);
3266         }
3267
3268         RETURN(0);
3269 }
3270
3271 static int osc_disconnect(struct obd_export *exp)
3272 {
3273         struct obd_device *obd = class_exp2obd(exp);
3274         struct llog_ctxt  *ctxt;
3275         int rc;
3276
3277         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3278         if (ctxt) {
3279                 if (obd->u.cli.cl_conn_count == 1) {
3280                         /* Flush any remaining cancel messages out to the
3281                          * target */
3282                         llog_sync(ctxt, exp, 0);
3283                 }
3284                 llog_ctxt_put(ctxt);
3285         } else {
3286                 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
3287                        obd);
3288         }
3289
3290         rc = client_disconnect_export(exp);
3291         /**
3292          * Initially we put del_shrink_grant before disconnect_export, but it
3293          * causes the following problem if setup (connect) and cleanup
3294          * (disconnect) are tangled together.
3295          *      connect p1                     disconnect p2
3296          *   ptlrpc_connect_import
3297          *     ...............               class_manual_cleanup
3298          *                                     osc_disconnect
3299          *                                     del_shrink_grant
3300          *   ptlrpc_connect_interrupt
3301          *     init_grant_shrink
3302          *   add this client to shrink list
3303          *                                      cleanup_osc
3304          * Bang! pinger trigger the shrink.
3305          * So the osc should be disconnected from the shrink list, after we
3306          * are sure the import has been destroyed. BUG18662
3307          */
3308         if (obd->u.cli.cl_import == NULL)
3309                 osc_del_shrink_grant(&obd->u.cli);
3310         return rc;
3311 }
3312
3313 static int osc_import_event(struct obd_device *obd,
3314                             struct obd_import *imp,
3315                             enum obd_import_event event)
3316 {
3317         struct client_obd *cli;
3318         int rc = 0;
3319
3320         ENTRY;
3321         LASSERT(imp->imp_obd == obd);
3322
3323         switch (event) {
3324         case IMP_EVENT_DISCON: {
3325                 cli = &obd->u.cli;
3326                 client_obd_list_lock(&cli->cl_loi_list_lock);
3327                 cli->cl_avail_grant = 0;
3328                 cli->cl_lost_grant = 0;
3329                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3330                 break;
3331         }
3332         case IMP_EVENT_INACTIVE: {
3333                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3334                 break;
3335         }
3336         case IMP_EVENT_INVALIDATE: {
3337                 struct ldlm_namespace *ns = obd->obd_namespace;
3338                 struct lu_env         *env;
3339                 int                    refcheck;
3340
3341                 env = cl_env_get(&refcheck);
3342                 if (!IS_ERR(env)) {
3343                         /* Reset grants */
3344                         cli = &obd->u.cli;
3345                         /* all pages go to failing rpcs due to the invalid
3346                          * import */
3347                         osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
3348
3349                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3350                         cl_env_put(env, &refcheck);
3351                 } else
3352                         rc = PTR_ERR(env);
3353                 break;
3354         }
3355         case IMP_EVENT_ACTIVE: {
3356                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3357                 break;
3358         }
3359         case IMP_EVENT_OCD: {
3360                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3361
3362                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3363                         osc_init_grant(&obd->u.cli, ocd);
3364
3365                 /* See bug 7198 */
3366                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3367                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3368
3369                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3370                 break;
3371         }
3372         case IMP_EVENT_DEACTIVATE: {
3373                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
3374                 break;
3375         }
3376         case IMP_EVENT_ACTIVATE: {
3377                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
3378                 break;
3379         }
3380         default:
3381                 CERROR("Unknown import event %d\n", event);
3382                 LBUG();
3383         }
3384         RETURN(rc);
3385 }
3386
3387 /**
3388  * Determine whether the lock can be canceled before replaying the lock
3389  * during recovery, see bug16774 for detailed information.
3390  *
3391  * \retval zero the lock can't be canceled
3392  * \retval other ok to cancel
3393  */
3394 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
3395 {
3396         check_res_locked(lock->l_resource);
3397
3398         /*
3399          * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
3400          *
3401          * XXX as a future improvement, we can also cancel unused write lock
3402          * if it doesn't have dirty data and active mmaps.
3403          */
3404         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3405             (lock->l_granted_mode == LCK_PR ||
3406              lock->l_granted_mode == LCK_CR) &&
3407             (osc_dlm_lock_pageref(lock) == 0))
3408                 RETURN(1);
3409
3410         RETURN(0);
3411 }
3412
3413 static int brw_queue_work(const struct lu_env *env, void *data)
3414 {
3415         struct client_obd *cli = data;
3416
3417         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3418
3419         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
3420         RETURN(0);
3421 }
3422
3423 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3424 {
3425         struct lprocfs_static_vars lvars = { 0 };
3426         struct client_obd          *cli = &obd->u.cli;
3427         void                       *handler;
3428         int                        rc;
3429         ENTRY;
3430
3431         rc = ptlrpcd_addref();
3432         if (rc)
3433                 RETURN(rc);
3434
3435         rc = client_obd_setup(obd, lcfg);
3436         if (rc)
3437                 GOTO(out_ptlrpcd, rc);
3438
3439         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3440         if (IS_ERR(handler))
3441                 GOTO(out_client_setup, rc = PTR_ERR(handler));
3442         cli->cl_writeback_work = handler;
3443
3444         rc = osc_quota_setup(obd);
3445         if (rc)
3446                 GOTO(out_ptlrpcd_work, rc);
3447
3448         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3449         lprocfs_osc_init_vars(&lvars);
3450         if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3451                 lproc_osc_attach_seqstat(obd);
3452                 sptlrpc_lprocfs_cliobd_attach(obd);
3453                 ptlrpc_lprocfs_register_obd(obd);
3454         }
3455
3456         /* We need to allocate a few requests more, because
3457          * brw_interpret tries to create new requests before freeing
3458          * previous ones, Ideally we want to have 2x max_rpcs_in_flight
3459          * reserved, but I'm afraid that might be too much wasted RAM
3460          * in fact, so 2 is just my guess and still should work. */
3461         cli->cl_import->imp_rq_pool =
3462                 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3463                                     OST_MAXREQSIZE,
3464                                     ptlrpc_add_rqs_to_pool);
3465
3466         CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
3467         ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
3468         RETURN(rc);
3469
3470 out_ptlrpcd_work:
3471         ptlrpcd_destroy_work(handler);
3472 out_client_setup:
3473         client_obd_cleanup(obd);
3474 out_ptlrpcd:
3475         ptlrpcd_decref();
3476         RETURN(rc);
3477 }
3478
3479 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3480 {
3481         int rc = 0;
3482         ENTRY;
3483
3484         switch (stage) {
3485         case OBD_CLEANUP_EARLY: {
3486                 struct obd_import *imp;
3487                 imp = obd->u.cli.cl_import;
3488                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3489                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3490                 ptlrpc_deactivate_import(imp);
3491                 spin_lock(&imp->imp_lock);
3492                 imp->imp_pingable = 0;
3493                 spin_unlock(&imp->imp_lock);
3494                 break;
3495         }
3496         case OBD_CLEANUP_EXPORTS: {
3497                 struct client_obd *cli = &obd->u.cli;
3498                 /* LU-464
3499                  * for echo client, export may be on zombie list, wait for
3500                  * zombie thread to cull it, because cli.cl_import will be
3501                  * cleared in client_disconnect_export():
3502                  *   class_export_destroy() -> obd_cleanup() ->
3503                  *   echo_device_free() -> echo_client_cleanup() ->
3504                  *   obd_disconnect() -> osc_disconnect() ->
3505                  *   client_disconnect_export()
3506                  */
3507                 obd_zombie_barrier();
3508                 if (cli->cl_writeback_work) {
3509                         ptlrpcd_destroy_work(cli->cl_writeback_work);
3510                         cli->cl_writeback_work = NULL;
3511                 }
3512                 obd_cleanup_client_import(obd);
3513                 ptlrpc_lprocfs_unregister_obd(obd);
3514                 lprocfs_obd_cleanup(obd);
3515                 rc = obd_llog_finish(obd, 0);
3516                 if (rc != 0)
3517                         CERROR("failed to cleanup llogging subsystems\n");
3518                 break;
3519                 }
3520         }
3521         RETURN(rc);
3522 }
3523
3524 int osc_cleanup(struct obd_device *obd)
3525 {
3526         struct client_obd *cli = &obd->u.cli;
3527         int rc;
3528
3529         ENTRY;
3530
3531         /* lru cleanup */
3532         if (cli->cl_cache != NULL) {
3533                 LASSERT(cfs_atomic_read(&cli->cl_cache->ccc_users) > 0);
3534                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3535                 cfs_list_del_init(&cli->cl_lru_osc);
3536                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3537                 cli->cl_lru_left = NULL;
3538                 cfs_atomic_dec(&cli->cl_cache->ccc_users);
3539                 cli->cl_cache = NULL;
3540         }
3541
3542         /* free memory of osc quota cache */
3543         osc_quota_cleanup(obd);
3544
3545         rc = client_obd_cleanup(obd);
3546
3547         ptlrpcd_decref();
3548         RETURN(rc);
3549 }
3550
3551 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3552 {
3553         struct lprocfs_static_vars lvars = { 0 };
3554         int rc = 0;
3555
3556         lprocfs_osc_init_vars(&lvars);
3557
3558         switch (lcfg->lcfg_command) {
3559         default:
3560                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
3561                                               lcfg, obd);
3562                 if (rc > 0)
3563                         rc = 0;
3564                 break;
3565         }
3566
3567         return(rc);
3568 }
3569
3570 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3571 {
3572         return osc_process_config_base(obd, buf);
3573 }
3574
3575 struct obd_ops osc_obd_ops = {
3576         .o_owner                = THIS_MODULE,
3577         .o_setup                = osc_setup,
3578         .o_precleanup           = osc_precleanup,
3579         .o_cleanup              = osc_cleanup,
3580         .o_add_conn             = client_import_add_conn,
3581         .o_del_conn             = client_import_del_conn,
3582         .o_connect              = client_connect_import,
3583         .o_reconnect            = osc_reconnect,
3584         .o_disconnect           = osc_disconnect,
3585         .o_statfs               = osc_statfs,
3586         .o_statfs_async         = osc_statfs_async,
3587         .o_packmd               = osc_packmd,
3588         .o_unpackmd             = osc_unpackmd,
3589         .o_create               = osc_create,
3590         .o_destroy              = osc_destroy,
3591         .o_getattr              = osc_getattr,
3592         .o_getattr_async        = osc_getattr_async,
3593         .o_setattr              = osc_setattr,
3594         .o_setattr_async        = osc_setattr_async,
3595         .o_brw                  = osc_brw,
3596         .o_punch                = osc_punch,
3597         .o_sync                 = osc_sync,
3598         .o_enqueue              = osc_enqueue,
3599         .o_change_cbdata        = osc_change_cbdata,
3600         .o_find_cbdata          = osc_find_cbdata,
3601         .o_cancel               = osc_cancel,
3602         .o_cancel_unused        = osc_cancel_unused,
3603         .o_iocontrol            = osc_iocontrol,
3604         .o_get_info             = osc_get_info,
3605         .o_set_info_async       = osc_set_info_async,
3606         .o_import_event         = osc_import_event,
3607         .o_llog_init            = osc_llog_init,
3608         .o_llog_finish          = osc_llog_finish,
3609         .o_process_config       = osc_process_config,
3610         .o_quotactl             = osc_quotactl,
3611         .o_quotacheck           = osc_quotacheck,
3612 };
3613
3614 extern struct lu_kmem_descr osc_caches[];
3615 extern spinlock_t osc_ast_guard;
3616 extern struct lock_class_key osc_ast_guard_class;
3617
3618 int __init osc_init(void)
3619 {
3620         struct lprocfs_static_vars lvars = { 0 };
3621         int rc;
3622         ENTRY;
3623
3624         /* print an address of _any_ initialized kernel symbol from this
3625          * module, to allow debugging with gdb that doesn't support data
3626          * symbols from modules.*/
3627         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3628
3629         rc = lu_kmem_init(osc_caches);
3630
3631         lprocfs_osc_init_vars(&lvars);
3632
3633         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3634                                  LUSTRE_OSC_NAME, &osc_device_type);
3635         if (rc) {
3636                 lu_kmem_fini(osc_caches);
3637                 RETURN(rc);
3638         }
3639
3640         spin_lock_init(&osc_ast_guard);
3641         lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
3642
3643         RETURN(rc);
3644 }
3645
3646 #ifdef __KERNEL__
3647 static void /*__exit*/ osc_exit(void)
3648 {
3649         class_unregister_type(LUSTRE_OSC_NAME);
3650         lu_kmem_fini(osc_caches);
3651 }
3652
3653 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
3654 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3655 MODULE_LICENSE("GPL");
3656
3657 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
3658 #endif