Whamcloud - gitweb
LU-2513 osc: compute grant targets in bytes
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <libcfs/libcfs.h>
40
41 #ifndef __KERNEL__
42 # include <liblustre.h>
43 #endif
44
45 #include <lustre_dlm.h>
46 #include <lustre_net.h>
47 #include <lustre/lustre_user.h>
48 #include <obd_cksum.h>
49 #include <obd_ost.h>
50 #include <obd_lov.h>
51
52 #ifdef  __CYGWIN__
53 # include <ctype.h>
54 #endif
55
56 #include <lustre_ha.h>
57 #include <lprocfs_status.h>
58 #include <lustre_log.h>
59 #include <lustre_debug.h>
60 #include <lustre_param.h>
61 #include <lustre_fid.h>
62 #include "osc_internal.h"
63 #include "osc_cl_internal.h"
64
65 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
66 static int brw_interpret(const struct lu_env *env,
67                          struct ptlrpc_request *req, void *data, int rc);
68 int osc_cleanup(struct obd_device *obd);
69
70 /* Pack OSC object metadata for disk storage (LE byte order). */
71 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
72                       struct lov_stripe_md *lsm)
73 {
74         int lmm_size;
75         ENTRY;
76
77         lmm_size = sizeof(**lmmp);
78         if (lmmp == NULL)
79                 RETURN(lmm_size);
80
81         if (*lmmp != NULL && lsm == NULL) {
82                 OBD_FREE(*lmmp, lmm_size);
83                 *lmmp = NULL;
84                 RETURN(0);
85         } else if (unlikely(lsm != NULL && lsm->lsm_object_id == 0)) {
86                 RETURN(-EBADF);
87         }
88
89         if (*lmmp == NULL) {
90                 OBD_ALLOC(*lmmp, lmm_size);
91                 if (*lmmp == NULL)
92                         RETURN(-ENOMEM);
93         }
94
95         if (lsm != NULL) {
96                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
97                 (*lmmp)->lmm_object_seq = cpu_to_le64(lsm->lsm_object_seq);
98         }
99
100         RETURN(lmm_size);
101 }
102
103 /* Unpack OSC object metadata from disk storage (LE byte order). */
104 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
105                         struct lov_mds_md *lmm, int lmm_bytes)
106 {
107         int lsm_size;
108         struct obd_import *imp = class_exp2cliimp(exp);
109         ENTRY;
110
111         if (lmm != NULL) {
112                 if (lmm_bytes < sizeof(*lmm)) {
113                         CERROR("%s: lov_mds_md too small: %d, need %d\n",
114                                exp->exp_obd->obd_name, lmm_bytes,
115                                (int)sizeof(*lmm));
116                         RETURN(-EINVAL);
117                 }
118                 /* XXX LOV_MAGIC etc check? */
119
120                 if (unlikely(lmm->lmm_object_id == 0)) {
121                         CERROR("%s: zero lmm_object_id\n",
122                                exp->exp_obd->obd_name);
123                         RETURN(-EINVAL);
124                 }
125         }
126
127         lsm_size = lov_stripe_md_size(1);
128         if (lsmp == NULL)
129                 RETURN(lsm_size);
130
131         if (*lsmp != NULL && lmm == NULL) {
132                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
133                 OBD_FREE(*lsmp, lsm_size);
134                 *lsmp = NULL;
135                 RETURN(0);
136         }
137
138         if (*lsmp == NULL) {
139                 OBD_ALLOC(*lsmp, lsm_size);
140                 if (unlikely(*lsmp == NULL))
141                         RETURN(-ENOMEM);
142                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
143                 if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {
144                         OBD_FREE(*lsmp, lsm_size);
145                         RETURN(-ENOMEM);
146                 }
147                 loi_init((*lsmp)->lsm_oinfo[0]);
148         } else if (unlikely((*lsmp)->lsm_object_id == 0)) {
149                 RETURN(-EBADF);
150         }
151
152         if (lmm != NULL) {
153                 /* XXX zero *lsmp? */
154                 (*lsmp)->lsm_object_id = le64_to_cpu(lmm->lmm_object_id);
155                 (*lsmp)->lsm_object_seq = le64_to_cpu(lmm->lmm_object_seq);
156         }
157
158         if (imp != NULL &&
159             (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
160                 (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
161         else
162                 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
163
164         RETURN(lsm_size);
165 }
166
167 static inline void osc_pack_capa(struct ptlrpc_request *req,
168                                  struct ost_body *body, void *capa)
169 {
170         struct obd_capa *oc = (struct obd_capa *)capa;
171         struct lustre_capa *c;
172
173         if (!capa)
174                 return;
175
176         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
177         LASSERT(c);
178         capa_cpy(c, oc);
179         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
180         DEBUG_CAPA(D_SEC, c, "pack");
181 }
182
183 static inline void osc_pack_req_body(struct ptlrpc_request *req,
184                                      struct obd_info *oinfo)
185 {
186         struct ost_body *body;
187
188         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
189         LASSERT(body);
190
191         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
192         osc_pack_capa(req, body, oinfo->oi_capa);
193 }
194
195 static inline void osc_set_capa_size(struct ptlrpc_request *req,
196                                      const struct req_msg_field *field,
197                                      struct obd_capa *oc)
198 {
199         if (oc == NULL)
200                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
201         else
202                 /* it is already calculated as sizeof struct obd_capa */
203                 ;
204 }
205
206 static int osc_getattr_interpret(const struct lu_env *env,
207                                  struct ptlrpc_request *req,
208                                  struct osc_async_args *aa, int rc)
209 {
210         struct ost_body *body;
211         ENTRY;
212
213         if (rc != 0)
214                 GOTO(out, rc);
215
216         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
217         if (body) {
218                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
219                 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
220
221                 /* This should really be sent by the OST */
222                 aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
223                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
224         } else {
225                 CDEBUG(D_INFO, "can't unpack ost_body\n");
226                 rc = -EPROTO;
227                 aa->aa_oi->oi_oa->o_valid = 0;
228         }
229 out:
230         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
231         RETURN(rc);
232 }
233
234 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
235                              struct ptlrpc_request_set *set)
236 {
237         struct ptlrpc_request *req;
238         struct osc_async_args *aa;
239         int                    rc;
240         ENTRY;
241
242         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
243         if (req == NULL)
244                 RETURN(-ENOMEM);
245
246         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
247         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
248         if (rc) {
249                 ptlrpc_request_free(req);
250                 RETURN(rc);
251         }
252
253         osc_pack_req_body(req, oinfo);
254
255         ptlrpc_request_set_replen(req);
256         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
257
258         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
259         aa = ptlrpc_req_async_args(req);
260         aa->aa_oi = oinfo;
261
262         ptlrpc_set_add_req(set, req);
263         RETURN(0);
264 }
265
266 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
267                        struct obd_info *oinfo)
268 {
269         struct ptlrpc_request *req;
270         struct ost_body       *body;
271         int                    rc;
272         ENTRY;
273
274         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
275         if (req == NULL)
276                 RETURN(-ENOMEM);
277
278         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
279         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
280         if (rc) {
281                 ptlrpc_request_free(req);
282                 RETURN(rc);
283         }
284
285         osc_pack_req_body(req, oinfo);
286
287         ptlrpc_request_set_replen(req);
288
289         rc = ptlrpc_queue_wait(req);
290         if (rc)
291                 GOTO(out, rc);
292
293         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
294         if (body == NULL)
295                 GOTO(out, rc = -EPROTO);
296
297         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
298         lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
299
300         oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
301         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
302
303         EXIT;
304  out:
305         ptlrpc_req_finished(req);
306         return rc;
307 }
308
309 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
310                        struct obd_info *oinfo, struct obd_trans_info *oti)
311 {
312         struct ptlrpc_request *req;
313         struct ost_body       *body;
314         int                    rc;
315         ENTRY;
316
317         LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
318
319         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
320         if (req == NULL)
321                 RETURN(-ENOMEM);
322
323         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
324         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
325         if (rc) {
326                 ptlrpc_request_free(req);
327                 RETURN(rc);
328         }
329
330         osc_pack_req_body(req, oinfo);
331
332         ptlrpc_request_set_replen(req);
333
334         rc = ptlrpc_queue_wait(req);
335         if (rc)
336                 GOTO(out, rc);
337
338         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
339         if (body == NULL)
340                 GOTO(out, rc = -EPROTO);
341
342         lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
343
344         EXIT;
345 out:
346         ptlrpc_req_finished(req);
347         RETURN(rc);
348 }
349
350 static int osc_setattr_interpret(const struct lu_env *env,
351                                  struct ptlrpc_request *req,
352                                  struct osc_setattr_args *sa, int rc)
353 {
354         struct ost_body *body;
355         ENTRY;
356
357         if (rc != 0)
358                 GOTO(out, rc);
359
360         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
361         if (body == NULL)
362                 GOTO(out, rc = -EPROTO);
363
364         lustre_get_wire_obdo(sa->sa_oa, &body->oa);
365 out:
366         rc = sa->sa_upcall(sa->sa_cookie, rc);
367         RETURN(rc);
368 }
369
370 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
371                            struct obd_trans_info *oti,
372                            obd_enqueue_update_f upcall, void *cookie,
373                            struct ptlrpc_request_set *rqset)
374 {
375         struct ptlrpc_request   *req;
376         struct osc_setattr_args *sa;
377         int                      rc;
378         ENTRY;
379
380         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
381         if (req == NULL)
382                 RETURN(-ENOMEM);
383
384         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
385         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
386         if (rc) {
387                 ptlrpc_request_free(req);
388                 RETURN(rc);
389         }
390
391         if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
392                 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
393
394         osc_pack_req_body(req, oinfo);
395
396         ptlrpc_request_set_replen(req);
397
398         /* do mds to ost setattr asynchronously */
399         if (!rqset) {
400                 /* Do not wait for response. */
401                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
402         } else {
403                 req->rq_interpret_reply =
404                         (ptlrpc_interpterer_t)osc_setattr_interpret;
405
406                 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
407                 sa = ptlrpc_req_async_args(req);
408                 sa->sa_oa = oinfo->oi_oa;
409                 sa->sa_upcall = upcall;
410                 sa->sa_cookie = cookie;
411
412                 if (rqset == PTLRPCD_SET)
413                         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
414                 else
415                         ptlrpc_set_add_req(rqset, req);
416         }
417
418         RETURN(0);
419 }
420
421 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
422                              struct obd_trans_info *oti,
423                              struct ptlrpc_request_set *rqset)
424 {
425         return osc_setattr_async_base(exp, oinfo, oti,
426                                       oinfo->oi_cb_up, oinfo, rqset);
427 }
428
429 int osc_real_create(struct obd_export *exp, struct obdo *oa,
430                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
431 {
432         struct ptlrpc_request *req;
433         struct ost_body       *body;
434         struct lov_stripe_md  *lsm;
435         int                    rc;
436         ENTRY;
437
438         LASSERT(oa);
439         LASSERT(ea);
440
441         lsm = *ea;
442         if (!lsm) {
443                 rc = obd_alloc_memmd(exp, &lsm);
444                 if (rc < 0)
445                         RETURN(rc);
446         }
447
448         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
449         if (req == NULL)
450                 GOTO(out, rc = -ENOMEM);
451
452         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
453         if (rc) {
454                 ptlrpc_request_free(req);
455                 GOTO(out, rc);
456         }
457
458         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
459         LASSERT(body);
460         lustre_set_wire_obdo(&body->oa, oa);
461
462         ptlrpc_request_set_replen(req);
463
464         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
465             oa->o_flags == OBD_FL_DELORPHAN) {
466                 DEBUG_REQ(D_HA, req,
467                           "delorphan from OST integration");
468                 /* Don't resend the delorphan req */
469                 req->rq_no_resend = req->rq_no_delay = 1;
470         }
471
472         rc = ptlrpc_queue_wait(req);
473         if (rc)
474                 GOTO(out_req, rc);
475
476         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
477         if (body == NULL)
478                 GOTO(out_req, rc = -EPROTO);
479
480         lustre_get_wire_obdo(oa, &body->oa);
481
482         oa->o_blksize = cli_brw_size(exp->exp_obd);
483         oa->o_valid |= OBD_MD_FLBLKSZ;
484
485         /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
486          * have valid lsm_oinfo data structs, so don't go touching that.
487          * This needs to be fixed in a big way.
488          */
489         lsm->lsm_object_id = oa->o_id;
490         lsm->lsm_object_seq = oa->o_seq;
491         *ea = lsm;
492
493         if (oti != NULL) {
494                 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
495
496                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
497                         if (!oti->oti_logcookies)
498                                 oti_alloc_cookies(oti, 1);
499                         *oti->oti_logcookies = oa->o_lcookie;
500                 }
501         }
502
503         CDEBUG(D_HA, "transno: "LPD64"\n",
504                lustre_msg_get_transno(req->rq_repmsg));
505 out_req:
506         ptlrpc_req_finished(req);
507 out:
508         if (rc && !*ea)
509                 obd_free_memmd(exp, &lsm);
510         RETURN(rc);
511 }
512
513 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
514                    obd_enqueue_update_f upcall, void *cookie,
515                    struct ptlrpc_request_set *rqset)
516 {
517         struct ptlrpc_request   *req;
518         struct osc_setattr_args *sa;
519         struct ost_body         *body;
520         int                      rc;
521         ENTRY;
522
523         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
524         if (req == NULL)
525                 RETURN(-ENOMEM);
526
527         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
528         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
529         if (rc) {
530                 ptlrpc_request_free(req);
531                 RETURN(rc);
532         }
533         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
534         ptlrpc_at_set_req_timeout(req);
535
536         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
537         LASSERT(body);
538         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
539         osc_pack_capa(req, body, oinfo->oi_capa);
540
541         ptlrpc_request_set_replen(req);
542
543         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
544         CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
545         sa = ptlrpc_req_async_args(req);
546         sa->sa_oa     = oinfo->oi_oa;
547         sa->sa_upcall = upcall;
548         sa->sa_cookie = cookie;
549         if (rqset == PTLRPCD_SET)
550                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
551         else
552                 ptlrpc_set_add_req(rqset, req);
553
554         RETURN(0);
555 }
556
557 static int osc_punch(const struct lu_env *env, struct obd_export *exp,
558                      struct obd_info *oinfo, struct obd_trans_info *oti,
559                      struct ptlrpc_request_set *rqset)
560 {
561         oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
562         oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
563         oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
564         return osc_punch_base(exp, oinfo,
565                               oinfo->oi_cb_up, oinfo, rqset);
566 }
567
568 static int osc_sync_interpret(const struct lu_env *env,
569                               struct ptlrpc_request *req,
570                               void *arg, int rc)
571 {
572         struct osc_fsync_args *fa = arg;
573         struct ost_body *body;
574         ENTRY;
575
576         if (rc)
577                 GOTO(out, rc);
578
579         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
580         if (body == NULL) {
581                 CERROR ("can't unpack ost_body\n");
582                 GOTO(out, rc = -EPROTO);
583         }
584
585         *fa->fa_oi->oi_oa = body->oa;
586 out:
587         rc = fa->fa_upcall(fa->fa_cookie, rc);
588         RETURN(rc);
589 }
590
591 int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
592                   obd_enqueue_update_f upcall, void *cookie,
593                   struct ptlrpc_request_set *rqset)
594 {
595         struct ptlrpc_request *req;
596         struct ost_body       *body;
597         struct osc_fsync_args *fa;
598         int                    rc;
599         ENTRY;
600
601         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
602         if (req == NULL)
603                 RETURN(-ENOMEM);
604
605         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
606         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
607         if (rc) {
608                 ptlrpc_request_free(req);
609                 RETURN(rc);
610         }
611
612         /* overload the size and blocks fields in the oa with start/end */
613         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
614         LASSERT(body);
615         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
616         osc_pack_capa(req, body, oinfo->oi_capa);
617
618         ptlrpc_request_set_replen(req);
619         req->rq_interpret_reply = osc_sync_interpret;
620
621         CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
622         fa = ptlrpc_req_async_args(req);
623         fa->fa_oi = oinfo;
624         fa->fa_upcall = upcall;
625         fa->fa_cookie = cookie;
626
627         if (rqset == PTLRPCD_SET)
628                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
629         else
630                 ptlrpc_set_add_req(rqset, req);
631
632         RETURN (0);
633 }
634
635 static int osc_sync(const struct lu_env *env, struct obd_export *exp,
636                     struct obd_info *oinfo, obd_size start, obd_size end,
637                     struct ptlrpc_request_set *set)
638 {
639         ENTRY;
640
641         if (!oinfo->oi_oa) {
642                 CDEBUG(D_INFO, "oa NULL\n");
643                 RETURN(-EINVAL);
644         }
645
646         oinfo->oi_oa->o_size = start;
647         oinfo->oi_oa->o_blocks = end;
648         oinfo->oi_oa->o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
649
650         RETURN(osc_sync_base(exp, oinfo, oinfo->oi_cb_up, oinfo, set));
651 }
652
653 /* Find and cancel locally locks matched by @mode in the resource found by
654  * @objid. Found locks are added into @cancel list. Returns the amount of
655  * locks added to @cancels list. */
656 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
657                                    cfs_list_t *cancels,
658                                    ldlm_mode_t mode, int lock_flags)
659 {
660         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
661         struct ldlm_res_id res_id;
662         struct ldlm_resource *res;
663         int count;
664         ENTRY;
665
666         /* Return, i.e. cancel nothing, only if ELC is supported (flag in
667          * export) but disabled through procfs (flag in NS).
668          *
669          * This distinguishes from a case when ELC is not supported originally,
670          * when we still want to cancel locks in advance and just cancel them
671          * locally, without sending any RPC. */
672         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
673                 RETURN(0);
674
675         ostid_build_res_name(&oa->o_oi, &res_id);
676         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
677         if (res == NULL)
678                 RETURN(0);
679
680         LDLM_RESOURCE_ADDREF(res);
681         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
682                                            lock_flags, 0, NULL);
683         LDLM_RESOURCE_DELREF(res);
684         ldlm_resource_putref(res);
685         RETURN(count);
686 }
687
688 static int osc_destroy_interpret(const struct lu_env *env,
689                                  struct ptlrpc_request *req, void *data,
690                                  int rc)
691 {
692         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
693
694         cfs_atomic_dec(&cli->cl_destroy_in_flight);
695         cfs_waitq_signal(&cli->cl_destroy_waitq);
696         return 0;
697 }
698
699 static int osc_can_send_destroy(struct client_obd *cli)
700 {
701         if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <=
702             cli->cl_max_rpcs_in_flight) {
703                 /* The destroy request can be sent */
704                 return 1;
705         }
706         if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) <
707             cli->cl_max_rpcs_in_flight) {
708                 /*
709                  * The counter has been modified between the two atomic
710                  * operations.
711                  */
712                 cfs_waitq_signal(&cli->cl_destroy_waitq);
713         }
714         return 0;
715 }
716
717 int osc_create(const struct lu_env *env, struct obd_export *exp,
718                struct obdo *oa, struct lov_stripe_md **ea,
719                struct obd_trans_info *oti)
720 {
721         int rc = 0;
722         ENTRY;
723
724         LASSERT(oa);
725         LASSERT(ea);
726         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
727
728         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
729             oa->o_flags == OBD_FL_RECREATE_OBJS) {
730                 RETURN(osc_real_create(exp, oa, ea, oti));
731         }
732
733         if (!fid_seq_is_mdt(oa->o_seq))
734                 RETURN(osc_real_create(exp, oa, ea, oti));
735
736         /* we should not get here anymore */
737         LBUG();
738
739         RETURN(rc);
740 }
741
742 /* Destroy requests can be async always on the client, and we don't even really
743  * care about the return code since the client cannot do anything at all about
744  * a destroy failure.
745  * When the MDS is unlinking a filename, it saves the file objects into a
746  * recovery llog, and these object records are cancelled when the OST reports
747  * they were destroyed and sync'd to disk (i.e. transaction committed).
748  * If the client dies, or the OST is down when the object should be destroyed,
749  * the records are not cancelled, and when the OST reconnects to the MDS next,
750  * it will retrieve the llog unlink logs and then sends the log cancellation
751  * cookies to the MDS after committing destroy transactions. */
752 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
753                        struct obdo *oa, struct lov_stripe_md *ea,
754                        struct obd_trans_info *oti, struct obd_export *md_export,
755                        void *capa)
756 {
757         struct client_obd     *cli = &exp->exp_obd->u.cli;
758         struct ptlrpc_request *req;
759         struct ost_body       *body;
760         CFS_LIST_HEAD(cancels);
761         int rc, count;
762         ENTRY;
763
764         if (!oa) {
765                 CDEBUG(D_INFO, "oa NULL\n");
766                 RETURN(-EINVAL);
767         }
768
769         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
770                                         LDLM_FL_DISCARD_DATA);
771
772         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
773         if (req == NULL) {
774                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
775                 RETURN(-ENOMEM);
776         }
777
778         osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
779         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
780                                0, &cancels, count);
781         if (rc) {
782                 ptlrpc_request_free(req);
783                 RETURN(rc);
784         }
785
786         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
787         ptlrpc_at_set_req_timeout(req);
788
789         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
790                 oa->o_lcookie = *oti->oti_logcookies;
791         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
792         LASSERT(body);
793         lustre_set_wire_obdo(&body->oa, oa);
794
795         osc_pack_capa(req, body, (struct obd_capa *)capa);
796         ptlrpc_request_set_replen(req);
797
798         /* If osc_destory is for destroying the unlink orphan,
799          * sent from MDT to OST, which should not be blocked here,
800          * because the process might be triggered by ptlrpcd, and
801          * it is not good to block ptlrpcd thread (b=16006)*/
802         if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
803                 req->rq_interpret_reply = osc_destroy_interpret;
804                 if (!osc_can_send_destroy(cli)) {
805                         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
806                                                           NULL);
807
808                         /*
809                          * Wait until the number of on-going destroy RPCs drops
810                          * under max_rpc_in_flight
811                          */
812                         l_wait_event_exclusive(cli->cl_destroy_waitq,
813                                                osc_can_send_destroy(cli), &lwi);
814                 }
815         }
816
817         /* Do not wait for response */
818         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
819         RETURN(0);
820 }
821
822 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
823                                 long writing_bytes)
824 {
825         obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
826
827         LASSERT(!(oa->o_valid & bits));
828
829         oa->o_valid |= bits;
830         client_obd_list_lock(&cli->cl_loi_list_lock);
831         oa->o_dirty = cli->cl_dirty;
832         if (unlikely(cli->cl_dirty - cli->cl_dirty_transit >
833                      cli->cl_dirty_max)) {
834                 CERROR("dirty %lu - %lu > dirty_max %lu\n",
835                        cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
836                 oa->o_undirty = 0;
837         } else if (unlikely(cfs_atomic_read(&obd_dirty_pages) -
838                             cfs_atomic_read(&obd_dirty_transit_pages) >
839                             (long)(obd_max_dirty_pages + 1))) {
840                 /* The cfs_atomic_read() allowing the cfs_atomic_inc() are
841                  * not covered by a lock thus they may safely race and trip
842                  * this CERROR() unless we add in a small fudge factor (+1). */
843                 CERROR("dirty %d - %d > system dirty_max %d\n",
844                        cfs_atomic_read(&obd_dirty_pages),
845                        cfs_atomic_read(&obd_dirty_transit_pages),
846                        obd_max_dirty_pages);
847                 oa->o_undirty = 0;
848         } else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) {
849                 CERROR("dirty %lu - dirty_max %lu too big???\n",
850                        cli->cl_dirty, cli->cl_dirty_max);
851                 oa->o_undirty = 0;
852         } else {
853                 long max_in_flight = (cli->cl_max_pages_per_rpc <<
854                                       CFS_PAGE_SHIFT)*
855                                      (cli->cl_max_rpcs_in_flight + 1);
856                 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
857         }
858         oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
859         oa->o_dropped = cli->cl_lost_grant;
860         cli->cl_lost_grant = 0;
861         client_obd_list_unlock(&cli->cl_loi_list_lock);
862         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
863                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
864
865 }
866
867 void osc_update_next_shrink(struct client_obd *cli)
868 {
869         cli->cl_next_shrink_grant =
870                 cfs_time_shift(cli->cl_grant_shrink_interval);
871         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
872                cli->cl_next_shrink_grant);
873 }
874
875 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
876 {
877         client_obd_list_lock(&cli->cl_loi_list_lock);
878         cli->cl_avail_grant += grant;
879         client_obd_list_unlock(&cli->cl_loi_list_lock);
880 }
881
882 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
883 {
884         if (body->oa.o_valid & OBD_MD_FLGRANT) {
885                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
886                 __osc_update_grant(cli, body->oa.o_grant);
887         }
888 }
889
890 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
891                               obd_count keylen, void *key, obd_count vallen,
892                               void *val, struct ptlrpc_request_set *set);
893
894 static int osc_shrink_grant_interpret(const struct lu_env *env,
895                                       struct ptlrpc_request *req,
896                                       void *aa, int rc)
897 {
898         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
899         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
900         struct ost_body *body;
901
902         if (rc != 0) {
903                 __osc_update_grant(cli, oa->o_grant);
904                 GOTO(out, rc);
905         }
906
907         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
908         LASSERT(body);
909         osc_update_grant(cli, body);
910 out:
911         OBDO_FREE(oa);
912         return rc;
913 }
914
915 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
916 {
917         client_obd_list_lock(&cli->cl_loi_list_lock);
918         oa->o_grant = cli->cl_avail_grant / 4;
919         cli->cl_avail_grant -= oa->o_grant;
920         client_obd_list_unlock(&cli->cl_loi_list_lock);
921         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
922                 oa->o_valid |= OBD_MD_FLFLAGS;
923                 oa->o_flags = 0;
924         }
925         oa->o_flags |= OBD_FL_SHRINK_GRANT;
926         osc_update_next_shrink(cli);
927 }
928
929 /* Shrink the current grant, either from some large amount to enough for a
930  * full set of in-flight RPCs, or if we have already shrunk to that limit
931  * then to enough for a single RPC.  This avoids keeping more grant than
932  * needed, and avoids shrinking the grant piecemeal. */
933 static int osc_shrink_grant(struct client_obd *cli)
934 {
935         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
936                              (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT);
937
938         client_obd_list_lock(&cli->cl_loi_list_lock);
939         if (cli->cl_avail_grant <= target_bytes)
940                 target_bytes = cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
941         client_obd_list_unlock(&cli->cl_loi_list_lock);
942
943         return osc_shrink_grant_to_target(cli, target_bytes);
944 }
945
946 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
947 {
948         int                     rc = 0;
949         struct ost_body        *body;
950         ENTRY;
951
952         client_obd_list_lock(&cli->cl_loi_list_lock);
953         /* Don't shrink if we are already above or below the desired limit
954          * We don't want to shrink below a single RPC, as that will negatively
955          * impact block allocation and long-term performance. */
956         if (target_bytes < cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)
957                 target_bytes = cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
958
959         if (target_bytes >= cli->cl_avail_grant) {
960                 client_obd_list_unlock(&cli->cl_loi_list_lock);
961                 RETURN(0);
962         }
963         client_obd_list_unlock(&cli->cl_loi_list_lock);
964
965         OBD_ALLOC_PTR(body);
966         if (!body)
967                 RETURN(-ENOMEM);
968
969         osc_announce_cached(cli, &body->oa, 0);
970
971         client_obd_list_lock(&cli->cl_loi_list_lock);
972         body->oa.o_grant = cli->cl_avail_grant - target_bytes;
973         cli->cl_avail_grant = target_bytes;
974         client_obd_list_unlock(&cli->cl_loi_list_lock);
975         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
976                 body->oa.o_valid |= OBD_MD_FLFLAGS;
977                 body->oa.o_flags = 0;
978         }
979         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
980         osc_update_next_shrink(cli);
981
982         rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
983                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
984                                 sizeof(*body), body, NULL);
985         if (rc != 0)
986                 __osc_update_grant(cli, body->oa.o_grant);
987         OBD_FREE_PTR(body);
988         RETURN(rc);
989 }
990
991 static int osc_should_shrink_grant(struct client_obd *client)
992 {
993         cfs_time_t time = cfs_time_current();
994         cfs_time_t next_shrink = client->cl_next_shrink_grant;
995
996         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
997              OBD_CONNECT_GRANT_SHRINK) == 0)
998                 return 0;
999
1000         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
1001                 /* Get the current RPC size directly, instead of going via:
1002                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
1003                  * Keep comment here so that it can be found by searching. */
1004                 int brw_size = client->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
1005
1006                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
1007                     client->cl_avail_grant > brw_size)
1008                         return 1;
1009                 else
1010                         osc_update_next_shrink(client);
1011         }
1012         return 0;
1013 }
1014
1015 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1016 {
1017         struct client_obd *client;
1018
1019         cfs_list_for_each_entry(client, &item->ti_obd_list,
1020                                 cl_grant_shrink_list) {
1021                 if (osc_should_shrink_grant(client))
1022                         osc_shrink_grant(client);
1023         }
1024         return 0;
1025 }
1026
1027 static int osc_add_shrink_grant(struct client_obd *client)
1028 {
1029         int rc;
1030
1031         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1032                                        TIMEOUT_GRANT,
1033                                        osc_grant_shrink_grant_cb, NULL,
1034                                        &client->cl_grant_shrink_list);
1035         if (rc) {
1036                 CERROR("add grant client %s error %d\n",
1037                         client->cl_import->imp_obd->obd_name, rc);
1038                 return rc;
1039         }
1040         CDEBUG(D_CACHE, "add grant client %s \n",
1041                client->cl_import->imp_obd->obd_name);
1042         osc_update_next_shrink(client);
1043         return 0;
1044 }
1045
1046 static int osc_del_shrink_grant(struct client_obd *client)
1047 {
1048         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1049                                          TIMEOUT_GRANT);
1050 }
1051
1052 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1053 {
1054         /*
1055          * ocd_grant is the total grant amount we're expect to hold: if we've
1056          * been evicted, it's the new avail_grant amount, cl_dirty will drop
1057          * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1058          *
1059          * race is tolerable here: if we're evicted, but imp_state already
1060          * left EVICTED state, then cl_dirty must be 0 already.
1061          */
1062         client_obd_list_lock(&cli->cl_loi_list_lock);
1063         if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1064                 cli->cl_avail_grant = ocd->ocd_grant;
1065         else
1066                 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1067
1068         if (cli->cl_avail_grant < 0) {
1069                 CWARN("%s: available grant < 0, the OSS is probably not running"
1070                       " with patch from bug20278 (%ld) \n",
1071                       cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant);
1072                 /* workaround for 1.6 servers which do not have
1073                  * the patch from bug20278 */
1074                 cli->cl_avail_grant = ocd->ocd_grant;
1075         }
1076
1077         /* determine the appropriate chunk size used by osc_extent. */
1078         cli->cl_chunkbits = max_t(int, CFS_PAGE_SHIFT, ocd->ocd_blocksize);
1079         client_obd_list_unlock(&cli->cl_loi_list_lock);
1080
1081         CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
1082                 "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
1083                 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
1084
1085         if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1086             cfs_list_empty(&cli->cl_grant_shrink_list))
1087                 osc_add_shrink_grant(cli);
1088 }
1089
1090 /* We assume that the reason this OSC got a short read is because it read
1091  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1092  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1093  * this stripe never got written at or beyond this stripe offset yet. */
1094 static void handle_short_read(int nob_read, obd_count page_count,
1095                               struct brw_page **pga)
1096 {
1097         char *ptr;
1098         int i = 0;
1099
1100         /* skip bytes read OK */
1101         while (nob_read > 0) {
1102                 LASSERT (page_count > 0);
1103
1104                 if (pga[i]->count > nob_read) {
1105                         /* EOF inside this page */
1106                         ptr = cfs_kmap(pga[i]->pg) +
1107                                 (pga[i]->off & ~CFS_PAGE_MASK);
1108                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1109                         cfs_kunmap(pga[i]->pg);
1110                         page_count--;
1111                         i++;
1112                         break;
1113                 }
1114
1115                 nob_read -= pga[i]->count;
1116                 page_count--;
1117                 i++;
1118         }
1119
1120         /* zero remaining pages */
1121         while (page_count-- > 0) {
1122                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1123                 memset(ptr, 0, pga[i]->count);
1124                 cfs_kunmap(pga[i]->pg);
1125                 i++;
1126         }
1127 }
1128
1129 static int check_write_rcs(struct ptlrpc_request *req,
1130                            int requested_nob, int niocount,
1131                            obd_count page_count, struct brw_page **pga)
1132 {
1133         int     i;
1134         __u32   *remote_rcs;
1135
1136         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1137                                                   sizeof(*remote_rcs) *
1138                                                   niocount);
1139         if (remote_rcs == NULL) {
1140                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1141                 return(-EPROTO);
1142         }
1143
1144         /* return error if any niobuf was in error */
1145         for (i = 0; i < niocount; i++) {
1146                 if ((int)remote_rcs[i] < 0)
1147                         return(remote_rcs[i]);
1148
1149                 if (remote_rcs[i] != 0) {
1150                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1151                                 i, remote_rcs[i], req);
1152                         return(-EPROTO);
1153                 }
1154         }
1155
1156         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1157                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1158                        req->rq_bulk->bd_nob_transferred, requested_nob);
1159                 return(-EPROTO);
1160         }
1161
1162         return (0);
1163 }
1164
1165 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1166 {
1167         if (p1->flag != p2->flag) {
1168                 unsigned mask = ~(OBD_BRW_FROM_GRANT| OBD_BRW_NOCACHE|
1169                                   OBD_BRW_SYNC|OBD_BRW_ASYNC|OBD_BRW_NOQUOTA);
1170
1171                 /* warn if we try to combine flags that we don't know to be
1172                  * safe to combine */
1173                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1174                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1175                               "report this at http://bugs.whamcloud.com/\n",
1176                               p1->flag, p2->flag);
1177                 }
1178                 return 0;
1179         }
1180
1181         return (p1->off + p1->count == p2->off);
1182 }
1183
1184 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1185                                    struct brw_page **pga, int opc,
1186                                    cksum_type_t cksum_type)
1187 {
1188         __u32                           cksum;
1189         int                             i = 0;
1190         struct cfs_crypto_hash_desc     *hdesc;
1191         unsigned int                    bufsize;
1192         int                             err;
1193         unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
1194
1195         LASSERT(pg_count > 0);
1196
1197         hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1198         if (IS_ERR(hdesc)) {
1199                 CERROR("Unable to initialize checksum hash %s\n",
1200                        cfs_crypto_hash_name(cfs_alg));
1201                 return PTR_ERR(hdesc);
1202         }
1203
1204         while (nob > 0 && pg_count > 0) {
1205                 int count = pga[i]->count > nob ? nob : pga[i]->count;
1206
1207                 /* corrupt the data before we compute the checksum, to
1208                  * simulate an OST->client data error */
1209                 if (i == 0 && opc == OST_READ &&
1210                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1211                         unsigned char *ptr = cfs_kmap(pga[i]->pg);
1212                         int off = pga[i]->off & ~CFS_PAGE_MASK;
1213                         memcpy(ptr + off, "bad1", min(4, nob));
1214                         cfs_kunmap(pga[i]->pg);
1215                 }
1216                 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1217                                   pga[i]->off & ~CFS_PAGE_MASK,
1218                                   count);
1219                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1220                                (int)(pga[i]->off & ~CFS_PAGE_MASK), cksum);
1221
1222                 nob -= pga[i]->count;
1223                 pg_count--;
1224                 i++;
1225         }
1226
1227         bufsize = 4;
1228         err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1229
1230         if (err)
1231                 cfs_crypto_hash_final(hdesc, NULL, NULL);
1232
1233         /* For sending we only compute the wrong checksum instead
1234          * of corrupting the data so it is still correct on a redo */
1235         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1236                 cksum++;
1237
1238         return cksum;
1239 }
1240
1241 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1242                                 struct lov_stripe_md *lsm, obd_count page_count,
1243                                 struct brw_page **pga,
1244                                 struct ptlrpc_request **reqp,
1245                                 struct obd_capa *ocapa, int reserve,
1246                                 int resend)
1247 {
1248         struct ptlrpc_request   *req;
1249         struct ptlrpc_bulk_desc *desc;
1250         struct ost_body         *body;
1251         struct obd_ioobj        *ioobj;
1252         struct niobuf_remote    *niobuf;
1253         int niocount, i, requested_nob, opc, rc;
1254         struct osc_brw_async_args *aa;
1255         struct req_capsule      *pill;
1256         struct brw_page *pg_prev;
1257
1258         ENTRY;
1259         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1260                 RETURN(-ENOMEM); /* Recoverable */
1261         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1262                 RETURN(-EINVAL); /* Fatal */
1263
1264         if ((cmd & OBD_BRW_WRITE) != 0) {
1265                 opc = OST_WRITE;
1266                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1267                                                 cli->cl_import->imp_rq_pool,
1268                                                 &RQF_OST_BRW_WRITE);
1269         } else {
1270                 opc = OST_READ;
1271                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1272         }
1273         if (req == NULL)
1274                 RETURN(-ENOMEM);
1275
1276         for (niocount = i = 1; i < page_count; i++) {
1277                 if (!can_merge_pages(pga[i - 1], pga[i]))
1278                         niocount++;
1279         }
1280
1281         pill = &req->rq_pill;
1282         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1283                              sizeof(*ioobj));
1284         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1285                              niocount * sizeof(*niobuf));
1286         osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1287
1288         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1289         if (rc) {
1290                 ptlrpc_request_free(req);
1291                 RETURN(rc);
1292         }
1293         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1294         ptlrpc_at_set_req_timeout(req);
1295         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1296          * retry logic */
1297         req->rq_no_retry_einprogress = 1;
1298
1299         desc = ptlrpc_prep_bulk_imp(req, page_count,
1300                 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1301                 opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
1302                 OST_BULK_PORTAL);
1303
1304         if (desc == NULL)
1305                 GOTO(out, rc = -ENOMEM);
1306         /* NB request now owns desc and will free it when it gets freed */
1307
1308         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1309         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1310         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1311         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1312
1313         lustre_set_wire_obdo(&body->oa, oa);
1314
1315         obdo_to_ioobj(oa, ioobj);
1316         ioobj->ioo_bufcnt = niocount;
1317         /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1318          * that might be send for this request.  The actual number is decided
1319          * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1320          * "max - 1" for old client compatibility sending "0", and also so the
1321          * the actual maximum is a power-of-two number, not one less. LU-1431 */
1322         ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1323         osc_pack_capa(req, body, ocapa);
1324         LASSERT(page_count > 0);
1325         pg_prev = pga[0];
1326         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1327                 struct brw_page *pg = pga[i];
1328                 int poff = pg->off & ~CFS_PAGE_MASK;
1329
1330                 LASSERT(pg->count > 0);
1331                 /* make sure there is no gap in the middle of page array */
1332                 LASSERTF(page_count == 1 ||
1333                          (ergo(i == 0, poff + pg->count == CFS_PAGE_SIZE) &&
1334                           ergo(i > 0 && i < page_count - 1,
1335                                poff == 0 && pg->count == CFS_PAGE_SIZE)   &&
1336                           ergo(i == page_count - 1, poff == 0)),
1337                          "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1338                          i, page_count, pg, pg->off, pg->count);
1339 #ifdef __linux__
1340                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1341                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1342                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1343                          i, page_count,
1344                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1345                          pg_prev->pg, page_private(pg_prev->pg),
1346                          pg_prev->pg->index, pg_prev->off);
1347 #else
1348                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1349                          "i %d p_c %u\n", i, page_count);
1350 #endif
1351                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1352                         (pg->flag & OBD_BRW_SRVLOCK));
1353
1354                 ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
1355                 requested_nob += pg->count;
1356
1357                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1358                         niobuf--;
1359                         niobuf->len += pg->count;
1360                 } else {
1361                         niobuf->offset = pg->off;
1362                         niobuf->len    = pg->count;
1363                         niobuf->flags  = pg->flag;
1364                 }
1365                 pg_prev = pg;
1366         }
1367
1368         LASSERTF((void *)(niobuf - niocount) ==
1369                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1370                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1371                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1372
1373         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1374         if (resend) {
1375                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1376                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1377                         body->oa.o_flags = 0;
1378                 }
1379                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1380         }
1381
1382         if (osc_should_shrink_grant(cli))
1383                 osc_shrink_grant_local(cli, &body->oa);
1384
1385         /* size[REQ_REC_OFF] still sizeof (*body) */
1386         if (opc == OST_WRITE) {
1387                 if (cli->cl_checksum &&
1388                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1389                         /* store cl_cksum_type in a local variable since
1390                          * it can be changed via lprocfs */
1391                         cksum_type_t cksum_type = cli->cl_cksum_type;
1392
1393                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1394                                 oa->o_flags &= OBD_FL_LOCAL_MASK;
1395                                 body->oa.o_flags = 0;
1396                         }
1397                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1398                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1399                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1400                                                              page_count, pga,
1401                                                              OST_WRITE,
1402                                                              cksum_type);
1403                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1404                                body->oa.o_cksum);
1405                         /* save this in 'oa', too, for later checking */
1406                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1407                         oa->o_flags |= cksum_type_pack(cksum_type);
1408                 } else {
1409                         /* clear out the checksum flag, in case this is a
1410                          * resend but cl_checksum is no longer set. b=11238 */
1411                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1412                 }
1413                 oa->o_cksum = body->oa.o_cksum;
1414                 /* 1 RC per niobuf */
1415                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1416                                      sizeof(__u32) * niocount);
1417         } else {
1418                 if (cli->cl_checksum &&
1419                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1420                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1421                                 body->oa.o_flags = 0;
1422                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1423                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1424                 }
1425         }
1426         ptlrpc_request_set_replen(req);
1427
1428         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1429         aa = ptlrpc_req_async_args(req);
1430         aa->aa_oa = oa;
1431         aa->aa_requested_nob = requested_nob;
1432         aa->aa_nio_count = niocount;
1433         aa->aa_page_count = page_count;
1434         aa->aa_resends = 0;
1435         aa->aa_ppga = pga;
1436         aa->aa_cli = cli;
1437         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1438         if (ocapa && reserve)
1439                 aa->aa_ocapa = capa_get(ocapa);
1440
1441         *reqp = req;
1442         RETURN(0);
1443
1444  out:
1445         ptlrpc_req_finished(req);
1446         RETURN(rc);
1447 }
1448
1449 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1450                                 __u32 client_cksum, __u32 server_cksum, int nob,
1451                                 obd_count page_count, struct brw_page **pga,
1452                                 cksum_type_t client_cksum_type)
1453 {
1454         __u32 new_cksum;
1455         char *msg;
1456         cksum_type_t cksum_type;
1457
1458         if (server_cksum == client_cksum) {
1459                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1460                 return 0;
1461         }
1462
1463         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1464                                        oa->o_flags : 0);
1465         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1466                                       cksum_type);
1467
1468         if (cksum_type != client_cksum_type)
1469                 msg = "the server did not use the checksum type specified in "
1470                       "the original request - likely a protocol problem";
1471         else if (new_cksum == server_cksum)
1472                 msg = "changed on the client after we checksummed it - "
1473                       "likely false positive due to mmap IO (bug 11742)";
1474         else if (new_cksum == client_cksum)
1475                 msg = "changed in transit before arrival at OST";
1476         else
1477                 msg = "changed in transit AND doesn't match the original - "
1478                       "likely false positive due to mmap IO (bug 11742)";
1479
1480         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1481                            " object "LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
1482                            msg, libcfs_nid2str(peer->nid),
1483                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1484                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1485                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1486                            oa->o_id,
1487                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_seq : (__u64)0,
1488                            pga[0]->off,
1489                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1490         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1491                "client csum now %x\n", client_cksum, client_cksum_type,
1492                server_cksum, cksum_type, new_cksum);
1493         return 1;
1494 }
1495
1496 /* Note rc enters this function as number of bytes transferred */
1497 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1498 {
1499         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1500         const lnet_process_id_t *peer =
1501                         &req->rq_import->imp_connection->c_peer;
1502         struct client_obd *cli = aa->aa_cli;
1503         struct ost_body *body;
1504         __u32 client_cksum = 0;
1505         ENTRY;
1506
1507         if (rc < 0 && rc != -EDQUOT) {
1508                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1509                 RETURN(rc);
1510         }
1511
1512         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1513         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1514         if (body == NULL) {
1515                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1516                 RETURN(-EPROTO);
1517         }
1518
1519         /* set/clear over quota flag for a uid/gid */
1520         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1521             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1522                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1523
1524                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1525                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1526                        body->oa.o_flags);
1527                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1528         }
1529
1530         osc_update_grant(cli, body);
1531
1532         if (rc < 0)
1533                 RETURN(rc);
1534
1535         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1536                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1537
1538         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1539                 if (rc > 0) {
1540                         CERROR("Unexpected +ve rc %d\n", rc);
1541                         RETURN(-EPROTO);
1542                 }
1543                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1544
1545                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1546                         RETURN(-EAGAIN);
1547
1548                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1549                     check_write_checksum(&body->oa, peer, client_cksum,
1550                                          body->oa.o_cksum, aa->aa_requested_nob,
1551                                          aa->aa_page_count, aa->aa_ppga,
1552                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1553                         RETURN(-EAGAIN);
1554
1555                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1556                                      aa->aa_page_count, aa->aa_ppga);
1557                 GOTO(out, rc);
1558         }
1559
1560         /* The rest of this function executes only for OST_READs */
1561
1562         /* if unwrap_bulk failed, return -EAGAIN to retry */
1563         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1564         if (rc < 0)
1565                 GOTO(out, rc = -EAGAIN);
1566
1567         if (rc > aa->aa_requested_nob) {
1568                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1569                        aa->aa_requested_nob);
1570                 RETURN(-EPROTO);
1571         }
1572
1573         if (rc != req->rq_bulk->bd_nob_transferred) {
1574                 CERROR ("Unexpected rc %d (%d transferred)\n",
1575                         rc, req->rq_bulk->bd_nob_transferred);
1576                 return (-EPROTO);
1577         }
1578
1579         if (rc < aa->aa_requested_nob)
1580                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1581
1582         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1583                 static int cksum_counter;
1584                 __u32      server_cksum = body->oa.o_cksum;
1585                 char      *via;
1586                 char      *router;
1587                 cksum_type_t cksum_type;
1588
1589                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1590                                                body->oa.o_flags : 0);
1591                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1592                                                  aa->aa_ppga, OST_READ,
1593                                                  cksum_type);
1594
1595                 if (peer->nid == req->rq_bulk->bd_sender) {
1596                         via = router = "";
1597                 } else {
1598                         via = " via ";
1599                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1600                 }
1601
1602                 if (server_cksum == ~0 && rc > 0) {
1603                         CERROR("Protocol error: server %s set the 'checksum' "
1604                                "bit, but didn't send a checksum.  Not fatal, "
1605                                "but please notify on http://bugs.whamcloud.com/\n",
1606                                libcfs_nid2str(peer->nid));
1607                 } else if (server_cksum != client_cksum) {
1608                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1609                                            "%s%s%s inode "DFID" object "
1610                                            LPU64"/"LPU64" extent "
1611                                            "["LPU64"-"LPU64"]\n",
1612                                            req->rq_import->imp_obd->obd_name,
1613                                            libcfs_nid2str(peer->nid),
1614                                            via, router,
1615                                            body->oa.o_valid & OBD_MD_FLFID ?
1616                                                 body->oa.o_parent_seq : (__u64)0,
1617                                            body->oa.o_valid & OBD_MD_FLFID ?
1618                                                 body->oa.o_parent_oid : 0,
1619                                            body->oa.o_valid & OBD_MD_FLFID ?
1620                                                 body->oa.o_parent_ver : 0,
1621                                            body->oa.o_id,
1622                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1623                                                 body->oa.o_seq : (__u64)0,
1624                                            aa->aa_ppga[0]->off,
1625                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1626                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1627                                                                         1);
1628                         CERROR("client %x, server %x, cksum_type %x\n",
1629                                client_cksum, server_cksum, cksum_type);
1630                         cksum_counter = 0;
1631                         aa->aa_oa->o_cksum = client_cksum;
1632                         rc = -EAGAIN;
1633                 } else {
1634                         cksum_counter++;
1635                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1636                         rc = 0;
1637                 }
1638         } else if (unlikely(client_cksum)) {
1639                 static int cksum_missed;
1640
1641                 cksum_missed++;
1642                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1643                         CERROR("Checksum %u requested from %s but not sent\n",
1644                                cksum_missed, libcfs_nid2str(peer->nid));
1645         } else {
1646                 rc = 0;
1647         }
1648 out:
1649         if (rc >= 0)
1650                 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
1651
1652         RETURN(rc);
1653 }
1654
1655 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1656                             struct lov_stripe_md *lsm,
1657                             obd_count page_count, struct brw_page **pga,
1658                             struct obd_capa *ocapa)
1659 {
1660         struct ptlrpc_request *req;
1661         int                    rc;
1662         cfs_waitq_t            waitq;
1663         int                    generation, resends = 0;
1664         struct l_wait_info     lwi;
1665
1666         ENTRY;
1667
1668         cfs_waitq_init(&waitq);
1669         generation = exp->exp_obd->u.cli.cl_import->imp_generation;
1670
1671 restart_bulk:
1672         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1673                                   page_count, pga, &req, ocapa, 0, resends);
1674         if (rc != 0)
1675                 return (rc);
1676
1677         if (resends) {
1678                 req->rq_generation_set = 1;
1679                 req->rq_import_generation = generation;
1680                 req->rq_sent = cfs_time_current_sec() + resends;
1681         }
1682
1683         rc = ptlrpc_queue_wait(req);
1684
1685         if (rc == -ETIMEDOUT && req->rq_resend) {
1686                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1687                 ptlrpc_req_finished(req);
1688                 goto restart_bulk;
1689         }
1690
1691         rc = osc_brw_fini_request(req, rc);
1692
1693         ptlrpc_req_finished(req);
1694         /* When server return -EINPROGRESS, client should always retry
1695          * regardless of the number of times the bulk was resent already.*/
1696         if (osc_recoverable_error(rc)) {
1697                 resends++;
1698                 if (rc != -EINPROGRESS &&
1699                     !client_should_resend(resends, &exp->exp_obd->u.cli)) {
1700                         CERROR("%s: too many resend retries for object: "
1701                                ""LPU64":"LPU64", rc = %d.\n",
1702                                exp->exp_obd->obd_name, oa->o_id, oa->o_seq, rc);
1703                         goto out;
1704                 }
1705                 if (generation !=
1706                     exp->exp_obd->u.cli.cl_import->imp_generation) {
1707                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1708                                ""LPU64":"LPU64", rc = %d.\n",
1709                                exp->exp_obd->obd_name, oa->o_id, oa->o_seq, rc);
1710                         goto out;
1711                 }
1712
1713                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL,
1714                                        NULL);
1715                 l_wait_event(waitq, 0, &lwi);
1716
1717                 goto restart_bulk;
1718         }
1719 out:
1720         if (rc == -EAGAIN || rc == -EINPROGRESS)
1721                 rc = -EIO;
1722         RETURN (rc);
1723 }
1724
1725 static int osc_brw_redo_request(struct ptlrpc_request *request,
1726                                 struct osc_brw_async_args *aa, int rc)
1727 {
1728         struct ptlrpc_request *new_req;
1729         struct osc_brw_async_args *new_aa;
1730         struct osc_async_page *oap;
1731         ENTRY;
1732
1733         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1734                   "redo for recoverable error %d", rc);
1735
1736         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1737                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1738                                   aa->aa_cli, aa->aa_oa,
1739                                   NULL /* lsm unused by osc currently */,
1740                                   aa->aa_page_count, aa->aa_ppga,
1741                                   &new_req, aa->aa_ocapa, 0, 1);
1742         if (rc)
1743                 RETURN(rc);
1744
1745         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1746                 if (oap->oap_request != NULL) {
1747                         LASSERTF(request == oap->oap_request,
1748                                  "request %p != oap_request %p\n",
1749                                  request, oap->oap_request);
1750                         if (oap->oap_interrupted) {
1751                                 ptlrpc_req_finished(new_req);
1752                                 RETURN(-EINTR);
1753                         }
1754                 }
1755         }
1756         /* New request takes over pga and oaps from old request.
1757          * Note that copying a list_head doesn't work, need to move it... */
1758         aa->aa_resends++;
1759         new_req->rq_interpret_reply = request->rq_interpret_reply;
1760         new_req->rq_async_args = request->rq_async_args;
1761         /* cap resend delay to the current request timeout, this is similar to
1762          * what ptlrpc does (see after_reply()) */
1763         if (aa->aa_resends > new_req->rq_timeout)
1764                 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1765         else
1766                 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1767         new_req->rq_generation_set = 1;
1768         new_req->rq_import_generation = request->rq_import_generation;
1769
1770         new_aa = ptlrpc_req_async_args(new_req);
1771
1772         CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1773         cfs_list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1774         CFS_INIT_LIST_HEAD(&new_aa->aa_exts);
1775         cfs_list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1776         new_aa->aa_resends = aa->aa_resends;
1777
1778         cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1779                 if (oap->oap_request) {
1780                         ptlrpc_req_finished(oap->oap_request);
1781                         oap->oap_request = ptlrpc_request_addref(new_req);
1782                 }
1783         }
1784
1785         new_aa->aa_ocapa = aa->aa_ocapa;
1786         aa->aa_ocapa = NULL;
1787
1788         /* XXX: This code will run into problem if we're going to support
1789          * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1790          * and wait for all of them to be finished. We should inherit request
1791          * set from old request. */
1792         ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
1793
1794         DEBUG_REQ(D_INFO, new_req, "new request");
1795         RETURN(0);
1796 }
1797
1798 /*
1799  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1800  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1801  * fine for our small page arrays and doesn't require allocation.  its an
1802  * insertion sort that swaps elements that are strides apart, shrinking the
1803  * stride down until its '1' and the array is sorted.
1804  */
1805 static void sort_brw_pages(struct brw_page **array, int num)
1806 {
1807         int stride, i, j;
1808         struct brw_page *tmp;
1809
1810         if (num == 1)
1811                 return;
1812         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1813                 ;
1814
1815         do {
1816                 stride /= 3;
1817                 for (i = stride ; i < num ; i++) {
1818                         tmp = array[i];
1819                         j = i;
1820                         while (j >= stride && array[j - stride]->off > tmp->off) {
1821                                 array[j] = array[j - stride];
1822                                 j -= stride;
1823                         }
1824                         array[j] = tmp;
1825                 }
1826         } while (stride > 1);
1827 }
1828
1829 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1830 {
1831         int count = 1;
1832         int offset;
1833         int i = 0;
1834
1835         LASSERT (pages > 0);
1836         offset = pg[i]->off & ~CFS_PAGE_MASK;
1837
1838         for (;;) {
1839                 pages--;
1840                 if (pages == 0)         /* that's all */
1841                         return count;
1842
1843                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1844                         return count;   /* doesn't end on page boundary */
1845
1846                 i++;
1847                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1848                 if (offset != 0)        /* doesn't start on page boundary */
1849                         return count;
1850
1851                 count++;
1852         }
1853 }
1854
1855 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1856 {
1857         struct brw_page **ppga;
1858         int i;
1859
1860         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1861         if (ppga == NULL)
1862                 return NULL;
1863
1864         for (i = 0; i < count; i++)
1865                 ppga[i] = pga + i;
1866         return ppga;
1867 }
1868
1869 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1870 {
1871         LASSERT(ppga != NULL);
1872         OBD_FREE(ppga, sizeof(*ppga) * count);
1873 }
1874
1875 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1876                    obd_count page_count, struct brw_page *pga,
1877                    struct obd_trans_info *oti)
1878 {
1879         struct obdo *saved_oa = NULL;
1880         struct brw_page **ppga, **orig;
1881         struct obd_import *imp = class_exp2cliimp(exp);
1882         struct client_obd *cli;
1883         int rc, page_count_orig;
1884         ENTRY;
1885
1886         LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1887         cli = &imp->imp_obd->u.cli;
1888
1889         if (cmd & OBD_BRW_CHECK) {
1890                 /* The caller just wants to know if there's a chance that this
1891                  * I/O can succeed */
1892
1893                 if (imp->imp_invalid)
1894                         RETURN(-EIO);
1895                 RETURN(0);
1896         }
1897
1898         /* test_brw with a failed create can trip this, maybe others. */
1899         LASSERT(cli->cl_max_pages_per_rpc);
1900
1901         rc = 0;
1902
1903         orig = ppga = osc_build_ppga(pga, page_count);
1904         if (ppga == NULL)
1905                 RETURN(-ENOMEM);
1906         page_count_orig = page_count;
1907
1908         sort_brw_pages(ppga, page_count);
1909         while (page_count) {
1910                 obd_count pages_per_brw;
1911
1912                 if (page_count > cli->cl_max_pages_per_rpc)
1913                         pages_per_brw = cli->cl_max_pages_per_rpc;
1914                 else
1915                         pages_per_brw = page_count;
1916
1917                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1918
1919                 if (saved_oa != NULL) {
1920                         /* restore previously saved oa */
1921                         *oinfo->oi_oa = *saved_oa;
1922                 } else if (page_count > pages_per_brw) {
1923                         /* save a copy of oa (brw will clobber it) */
1924                         OBDO_ALLOC(saved_oa);
1925                         if (saved_oa == NULL)
1926                                 GOTO(out, rc = -ENOMEM);
1927                         *saved_oa = *oinfo->oi_oa;
1928                 }
1929
1930                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1931                                       pages_per_brw, ppga, oinfo->oi_capa);
1932
1933                 if (rc != 0)
1934                         break;
1935
1936                 page_count -= pages_per_brw;
1937                 ppga += pages_per_brw;
1938         }
1939
1940 out:
1941         osc_release_ppga(orig, page_count_orig);
1942
1943         if (saved_oa != NULL)
1944                 OBDO_FREE(saved_oa);
1945
1946         RETURN(rc);
1947 }
1948
1949 static int brw_interpret(const struct lu_env *env,
1950                          struct ptlrpc_request *req, void *data, int rc)
1951 {
1952         struct osc_brw_async_args *aa = data;
1953         struct osc_extent *ext;
1954         struct osc_extent *tmp;
1955         struct cl_object  *obj = NULL;
1956         struct client_obd *cli = aa->aa_cli;
1957         ENTRY;
1958
1959         rc = osc_brw_fini_request(req, rc);
1960         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1961         /* When server return -EINPROGRESS, client should always retry
1962          * regardless of the number of times the bulk was resent already. */
1963         if (osc_recoverable_error(rc)) {
1964                 if (req->rq_import_generation !=
1965                     req->rq_import->imp_generation) {
1966                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1967                                ""LPU64":"LPU64", rc = %d.\n",
1968                                req->rq_import->imp_obd->obd_name,
1969                                aa->aa_oa->o_id, aa->aa_oa->o_seq, rc);
1970                 } else if (rc == -EINPROGRESS ||
1971                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1972                         rc = osc_brw_redo_request(req, aa, rc);
1973                 } else {
1974                         CERROR("%s: too many resent retries for object: "
1975                                ""LPU64":"LPU64", rc = %d.\n",
1976                                req->rq_import->imp_obd->obd_name,
1977                                aa->aa_oa->o_id, aa->aa_oa->o_seq, rc);
1978                 }
1979
1980                 if (rc == 0)
1981                         RETURN(0);
1982                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1983                         rc = -EIO;
1984         }
1985
1986         if (aa->aa_ocapa) {
1987                 capa_put(aa->aa_ocapa);
1988                 aa->aa_ocapa = NULL;
1989         }
1990
1991         cfs_list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1992                 if (obj == NULL && rc == 0) {
1993                         obj = osc2cl(ext->oe_obj);
1994                         cl_object_get(obj);
1995                 }
1996
1997                 cfs_list_del_init(&ext->oe_link);
1998                 osc_extent_finish(env, ext, 1, rc);
1999         }
2000         LASSERT(cfs_list_empty(&aa->aa_exts));
2001         LASSERT(cfs_list_empty(&aa->aa_oaps));
2002
2003         if (obj != NULL) {
2004                 struct obdo *oa = aa->aa_oa;
2005                 struct cl_attr *attr  = &osc_env_info(env)->oti_attr;
2006                 unsigned long valid = 0;
2007
2008                 LASSERT(rc == 0);
2009                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2010                         attr->cat_blocks = oa->o_blocks;
2011                         valid |= CAT_BLOCKS;
2012                 }
2013                 if (oa->o_valid & OBD_MD_FLMTIME) {
2014                         attr->cat_mtime = oa->o_mtime;
2015                         valid |= CAT_MTIME;
2016                 }
2017                 if (oa->o_valid & OBD_MD_FLATIME) {
2018                         attr->cat_atime = oa->o_atime;
2019                         valid |= CAT_ATIME;
2020                 }
2021                 if (oa->o_valid & OBD_MD_FLCTIME) {
2022                         attr->cat_ctime = oa->o_ctime;
2023                         valid |= CAT_CTIME;
2024                 }
2025                 if (valid != 0) {
2026                         cl_object_attr_lock(obj);
2027                         cl_object_attr_set(env, obj, attr, valid);
2028                         cl_object_attr_unlock(obj);
2029                 }
2030                 cl_object_put(env, obj);
2031         }
2032         OBDO_FREE(aa->aa_oa);
2033
2034         cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
2035                           req->rq_bulk->bd_nob_transferred);
2036         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2037         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
2038
2039         client_obd_list_lock(&cli->cl_loi_list_lock);
2040         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2041          * is called so we know whether to go to sync BRWs or wait for more
2042          * RPCs to complete */
2043         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2044                 cli->cl_w_in_flight--;
2045         else
2046                 cli->cl_r_in_flight--;
2047         osc_wake_cache_waiters(cli);
2048         client_obd_list_unlock(&cli->cl_loi_list_lock);
2049
2050         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
2051         RETURN(rc);
2052 }
2053
2054 /**
2055  * Build an RPC by the list of extent @ext_list. The caller must ensure
2056  * that the total pages in this list are NOT over max pages per RPC.
2057  * Extents in the list must be in OES_RPC state.
2058  */
2059 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2060                   cfs_list_t *ext_list, int cmd, pdl_policy_t pol)
2061 {
2062         struct ptlrpc_request *req = NULL;
2063         struct osc_extent *ext;
2064         CFS_LIST_HEAD(rpc_list);
2065         struct brw_page **pga = NULL;
2066         struct osc_brw_async_args *aa = NULL;
2067         struct obdo *oa = NULL;
2068         struct osc_async_page *oap;
2069         struct osc_async_page *tmp;
2070         struct cl_req *clerq = NULL;
2071         enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2072         struct ldlm_lock *lock = NULL;
2073         struct cl_req_attr crattr;
2074         obd_off starting_offset = OBD_OBJECT_EOF;
2075         obd_off ending_offset = 0;
2076         int i, rc, mpflag = 0, mem_tight = 0, page_count = 0;
2077
2078         ENTRY;
2079         LASSERT(!cfs_list_empty(ext_list));
2080
2081         /* add pages into rpc_list to build BRW rpc */
2082         cfs_list_for_each_entry(ext, ext_list, oe_link) {
2083                 LASSERT(ext->oe_state == OES_RPC);
2084                 mem_tight |= ext->oe_memalloc;
2085                 cfs_list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2086                         ++page_count;
2087                         cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
2088                         if (starting_offset > oap->oap_obj_off)
2089                                 starting_offset = oap->oap_obj_off;
2090                         else
2091                                 LASSERT(oap->oap_page_off == 0);
2092                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
2093                                 ending_offset = oap->oap_obj_off +
2094                                                 oap->oap_count;
2095                         else
2096                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2097                                         CFS_PAGE_SIZE);
2098                 }
2099         }
2100
2101         if (mem_tight)
2102                 mpflag = cfs_memory_pressure_get_and_set();
2103
2104         memset(&crattr, 0, sizeof crattr);
2105         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2106         if (pga == NULL)
2107                 GOTO(out, rc = -ENOMEM);
2108
2109         OBDO_ALLOC(oa);
2110         if (oa == NULL)
2111                 GOTO(out, rc = -ENOMEM);
2112
2113         i = 0;
2114         cfs_list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
2115                 struct cl_page *page = oap2cl_page(oap);
2116                 if (clerq == NULL) {
2117                         clerq = cl_req_alloc(env, page, crt,
2118                                              1 /* only 1-object rpcs for
2119                                                 * now */);
2120                         if (IS_ERR(clerq))
2121                                 GOTO(out, rc = PTR_ERR(clerq));
2122                         lock = oap->oap_ldlm_lock;
2123                 }
2124                 if (mem_tight)
2125                         oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2126                 pga[i] = &oap->oap_brw_page;
2127                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2128                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2129                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2130                 i++;
2131                 cl_req_page_add(env, clerq, page);
2132         }
2133
2134         /* always get the data for the obdo for the rpc */
2135         LASSERT(clerq != NULL);
2136         crattr.cra_oa = oa;
2137         crattr.cra_capa = NULL;
2138         memset(crattr.cra_jobid, 0, JOBSTATS_JOBID_SIZE);
2139         cl_req_attr_set(env, clerq, &crattr, ~0ULL);
2140         if (lock) {
2141                 oa->o_handle = lock->l_remote_handle;
2142                 oa->o_valid |= OBD_MD_FLHANDLE;
2143         }
2144
2145         rc = cl_req_prep(env, clerq);
2146         if (rc != 0) {
2147                 CERROR("cl_req_prep failed: %d\n", rc);
2148                 GOTO(out, rc);
2149         }
2150
2151         sort_brw_pages(pga, page_count);
2152         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2153                         pga, &req, crattr.cra_capa, 1, 0);
2154         if (rc != 0) {
2155                 CERROR("prep_req failed: %d\n", rc);
2156                 GOTO(out, rc);
2157         }
2158
2159         req->rq_interpret_reply = brw_interpret;
2160         if (mem_tight != 0)
2161                 req->rq_memalloc = 1;
2162
2163         /* Need to update the timestamps after the request is built in case
2164          * we race with setattr (locally or in queue at OST).  If OST gets
2165          * later setattr before earlier BRW (as determined by the request xid),
2166          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2167          * way to do this in a single call.  bug 10150 */
2168         cl_req_attr_set(env, clerq, &crattr,
2169                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2170
2171         lustre_msg_set_jobid(req->rq_reqmsg, crattr.cra_jobid);
2172
2173         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2174         aa = ptlrpc_req_async_args(req);
2175         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2176         cfs_list_splice_init(&rpc_list, &aa->aa_oaps);
2177         CFS_INIT_LIST_HEAD(&aa->aa_exts);
2178         cfs_list_splice_init(ext_list, &aa->aa_exts);
2179         aa->aa_clerq = clerq;
2180
2181         /* queued sync pages can be torn down while the pages
2182          * were between the pending list and the rpc */
2183         tmp = NULL;
2184         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2185                 /* only one oap gets a request reference */
2186                 if (tmp == NULL)
2187                         tmp = oap;
2188                 if (oap->oap_interrupted && !req->rq_intr) {
2189                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2190                                         oap, req);
2191                         ptlrpc_mark_interrupted(req);
2192                 }
2193         }
2194         if (tmp != NULL)
2195                 tmp->oap_request = ptlrpc_request_addref(req);
2196
2197         client_obd_list_lock(&cli->cl_loi_list_lock);
2198         starting_offset >>= CFS_PAGE_SHIFT;
2199         if (cmd == OBD_BRW_READ) {
2200                 cli->cl_r_in_flight++;
2201                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2202                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2203                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2204                                       starting_offset + 1);
2205         } else {
2206                 cli->cl_w_in_flight++;
2207                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2208                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2209                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2210                                       starting_offset + 1);
2211         }
2212         client_obd_list_unlock(&cli->cl_loi_list_lock);
2213
2214         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2215                   page_count, aa, cli->cl_r_in_flight,
2216                   cli->cl_w_in_flight);
2217
2218         /* XXX: Maybe the caller can check the RPC bulk descriptor to
2219          * see which CPU/NUMA node the majority of pages were allocated
2220          * on, and try to assign the async RPC to the CPU core
2221          * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
2222          *
2223          * But on the other hand, we expect that multiple ptlrpcd
2224          * threads and the initial write sponsor can run in parallel,
2225          * especially when data checksum is enabled, which is CPU-bound
2226          * operation and single ptlrpcd thread cannot process in time.
2227          * So more ptlrpcd threads sharing BRW load
2228          * (with PDL_POLICY_ROUND) seems better.
2229          */
2230         ptlrpcd_add_req(req, pol, -1);
2231         rc = 0;
2232         EXIT;
2233
2234 out:
2235         if (mem_tight != 0)
2236                 cfs_memory_pressure_restore(mpflag);
2237
2238         capa_put(crattr.cra_capa);
2239         if (rc != 0) {
2240                 LASSERT(req == NULL);
2241
2242                 if (oa)
2243                         OBDO_FREE(oa);
2244                 if (pga)
2245                         OBD_FREE(pga, sizeof(*pga) * page_count);
2246                 /* this should happen rarely and is pretty bad, it makes the
2247                  * pending list not follow the dirty order */
2248                 while (!cfs_list_empty(ext_list)) {
2249                         ext = cfs_list_entry(ext_list->next, struct osc_extent,
2250                                              oe_link);
2251                         cfs_list_del_init(&ext->oe_link);
2252                         osc_extent_finish(env, ext, 0, rc);
2253                 }
2254                 if (clerq && !IS_ERR(clerq))
2255                         cl_req_completion(env, clerq, rc);
2256         }
2257         RETURN(rc);
2258 }
2259
2260 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2261                                         struct ldlm_enqueue_info *einfo)
2262 {
2263         void *data = einfo->ei_cbdata;
2264         int set = 0;
2265
2266         LASSERT(lock != NULL);
2267         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2268         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2269         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2270         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2271
2272         lock_res_and_lock(lock);
2273         spin_lock(&osc_ast_guard);
2274
2275         if (lock->l_ast_data == NULL)
2276                 lock->l_ast_data = data;
2277         if (lock->l_ast_data == data)
2278                 set = 1;
2279
2280         spin_unlock(&osc_ast_guard);
2281         unlock_res_and_lock(lock);
2282
2283         return set;
2284 }
2285
2286 static int osc_set_data_with_check(struct lustre_handle *lockh,
2287                                    struct ldlm_enqueue_info *einfo)
2288 {
2289         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2290         int set = 0;
2291
2292         if (lock != NULL) {
2293                 set = osc_set_lock_data_with_check(lock, einfo);
2294                 LDLM_LOCK_PUT(lock);
2295         } else
2296                 CERROR("lockh %p, data %p - client evicted?\n",
2297                        lockh, einfo->ei_cbdata);
2298         return set;
2299 }
2300
2301 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2302                              ldlm_iterator_t replace, void *data)
2303 {
2304         struct ldlm_res_id res_id;
2305         struct obd_device *obd = class_exp2obd(exp);
2306
2307         ostid_build_res_name(&lsm->lsm_object_oid, &res_id);
2308         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2309         return 0;
2310 }
2311
2312 /* find any ldlm lock of the inode in osc
2313  * return 0    not find
2314  *        1    find one
2315  *      < 0    error */
2316 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2317                            ldlm_iterator_t replace, void *data)
2318 {
2319         struct ldlm_res_id res_id;
2320         struct obd_device *obd = class_exp2obd(exp);
2321         int rc = 0;
2322
2323         ostid_build_res_name(&lsm->lsm_object_oid, &res_id);
2324         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2325         if (rc == LDLM_ITER_STOP)
2326                 return(1);
2327         if (rc == LDLM_ITER_CONTINUE)
2328                 return(0);
2329         return(rc);
2330 }
2331
2332 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2333                             obd_enqueue_update_f upcall, void *cookie,
2334                             __u64 *flags, int agl, int rc)
2335 {
2336         int intent = *flags & LDLM_FL_HAS_INTENT;
2337         ENTRY;
2338
2339         if (intent) {
2340                 /* The request was created before ldlm_cli_enqueue call. */
2341                 if (rc == ELDLM_LOCK_ABORTED) {
2342                         struct ldlm_reply *rep;
2343                         rep = req_capsule_server_get(&req->rq_pill,
2344                                                      &RMF_DLM_REP);
2345
2346                         LASSERT(rep != NULL);
2347                         if (rep->lock_policy_res1)
2348                                 rc = rep->lock_policy_res1;
2349                 }
2350         }
2351
2352         if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
2353             (rc == 0)) {
2354                 *flags |= LDLM_FL_LVB_READY;
2355                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2356                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2357         }
2358
2359         /* Call the update callback. */
2360         rc = (*upcall)(cookie, rc);
2361         RETURN(rc);
2362 }
2363
2364 static int osc_enqueue_interpret(const struct lu_env *env,
2365                                  struct ptlrpc_request *req,
2366                                  struct osc_enqueue_args *aa, int rc)
2367 {
2368         struct ldlm_lock *lock;
2369         struct lustre_handle handle;
2370         __u32 mode;
2371         struct ost_lvb *lvb;
2372         __u32 lvb_len;
2373         __u64 *flags = aa->oa_flags;
2374
2375         /* Make a local copy of a lock handle and a mode, because aa->oa_*
2376          * might be freed anytime after lock upcall has been called. */
2377         lustre_handle_copy(&handle, aa->oa_lockh);
2378         mode = aa->oa_ei->ei_mode;
2379
2380         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2381          * be valid. */
2382         lock = ldlm_handle2lock(&handle);
2383
2384         /* Take an additional reference so that a blocking AST that
2385          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2386          * to arrive after an upcall has been executed by
2387          * osc_enqueue_fini(). */
2388         ldlm_lock_addref(&handle, mode);
2389
2390         /* Let CP AST to grant the lock first. */
2391         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2392
2393         if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
2394                 lvb = NULL;
2395                 lvb_len = 0;
2396         } else {
2397                 lvb = aa->oa_lvb;
2398                 lvb_len = sizeof(*aa->oa_lvb);
2399         }
2400
2401         /* Complete obtaining the lock procedure. */
2402         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2403                                    mode, flags, lvb, lvb_len, &handle, rc);
2404         /* Complete osc stuff. */
2405         rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
2406                               flags, aa->oa_agl, rc);
2407
2408         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2409
2410         /* Release the lock for async request. */
2411         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
2412                 /*
2413                  * Releases a reference taken by ldlm_cli_enqueue(), if it is
2414                  * not already released by
2415                  * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
2416                  */
2417                 ldlm_lock_decref(&handle, mode);
2418
2419         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2420                  aa->oa_lockh, req, aa);
2421         ldlm_lock_decref(&handle, mode);
2422         LDLM_LOCK_PUT(lock);
2423         return rc;
2424 }
2425
2426 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
2427                         struct lov_oinfo *loi, int flags,
2428                         struct ost_lvb *lvb, __u32 mode, int rc)
2429 {
2430         struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
2431
2432         if (rc == ELDLM_OK) {
2433                 __u64 tmp;
2434
2435                 LASSERT(lock != NULL);
2436                 loi->loi_lvb = *lvb;
2437                 tmp = loi->loi_lvb.lvb_size;
2438                 /* Extend KMS up to the end of this lock and no further
2439                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
2440                 if (tmp > lock->l_policy_data.l_extent.end)
2441                         tmp = lock->l_policy_data.l_extent.end + 1;
2442                 if (tmp >= loi->loi_kms) {
2443                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
2444                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
2445                         loi_kms_set(loi, tmp);
2446                 } else {
2447                         LDLM_DEBUG(lock, "lock acquired, setting rss="
2448                                    LPU64"; leaving kms="LPU64", end="LPU64,
2449                                    loi->loi_lvb.lvb_size, loi->loi_kms,
2450                                    lock->l_policy_data.l_extent.end);
2451                 }
2452                 ldlm_lock_allow_match(lock);
2453         } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
2454                 LASSERT(lock != NULL);
2455                 loi->loi_lvb = *lvb;
2456                 ldlm_lock_allow_match(lock);
2457                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
2458                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
2459                 rc = ELDLM_OK;
2460         }
2461
2462         if (lock != NULL) {
2463                 if (rc != ELDLM_OK)
2464                         ldlm_lock_fail_match(lock);
2465
2466                 LDLM_LOCK_PUT(lock);
2467         }
2468 }
2469 EXPORT_SYMBOL(osc_update_enqueue);
2470
2471 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2472
2473 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2474  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2475  * other synchronous requests, however keeping some locks and trying to obtain
2476  * others may take a considerable amount of time in a case of ost failure; and
2477  * when other sync requests do not get released lock from a client, the client
2478  * is excluded from the cluster -- such scenarious make the life difficult, so
2479  * release locks just after they are obtained. */
2480 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2481                      __u64 *flags, ldlm_policy_data_t *policy,
2482                      struct ost_lvb *lvb, int kms_valid,
2483                      obd_enqueue_update_f upcall, void *cookie,
2484                      struct ldlm_enqueue_info *einfo,
2485                      struct lustre_handle *lockh,
2486                      struct ptlrpc_request_set *rqset, int async, int agl)
2487 {
2488         struct obd_device *obd = exp->exp_obd;
2489         struct ptlrpc_request *req = NULL;
2490         int intent = *flags & LDLM_FL_HAS_INTENT;
2491         int match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
2492         ldlm_mode_t mode;
2493         int rc;
2494         ENTRY;
2495
2496         /* Filesystem lock extents are extended to page boundaries so that
2497          * dealing with the page cache is a little smoother.  */
2498         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2499         policy->l_extent.end |= ~CFS_PAGE_MASK;
2500
2501         /*
2502          * kms is not valid when either object is completely fresh (so that no
2503          * locks are cached), or object was evicted. In the latter case cached
2504          * lock cannot be used, because it would prime inode state with
2505          * potentially stale LVB.
2506          */
2507         if (!kms_valid)
2508                 goto no_match;
2509
2510         /* Next, search for already existing extent locks that will cover us */
2511         /* If we're trying to read, we also search for an existing PW lock.  The
2512          * VFS and page cache already protect us locally, so lots of readers/
2513          * writers can share a single PW lock.
2514          *
2515          * There are problems with conversion deadlocks, so instead of
2516          * converting a read lock to a write lock, we'll just enqueue a new
2517          * one.
2518          *
2519          * At some point we should cancel the read lock instead of making them
2520          * send us a blocking callback, but there are problems with canceling
2521          * locks out from other users right now, too. */
2522         mode = einfo->ei_mode;
2523         if (einfo->ei_mode == LCK_PR)
2524                 mode |= LCK_PW;
2525         mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2526                                einfo->ei_type, policy, mode, lockh, 0);
2527         if (mode) {
2528                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
2529
2530                 if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
2531                         /* For AGL, if enqueue RPC is sent but the lock is not
2532                          * granted, then skip to process this strpe.
2533                          * Return -ECANCELED to tell the caller. */
2534                         ldlm_lock_decref(lockh, mode);
2535                         LDLM_LOCK_PUT(matched);
2536                         RETURN(-ECANCELED);
2537                 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2538                         *flags |= LDLM_FL_LVB_READY;
2539                         /* addref the lock only if not async requests and PW
2540                          * lock is matched whereas we asked for PR. */
2541                         if (!rqset && einfo->ei_mode != mode)
2542                                 ldlm_lock_addref(lockh, LCK_PR);
2543                         if (intent) {
2544                                 /* I would like to be able to ASSERT here that
2545                                  * rss <= kms, but I can't, for reasons which
2546                                  * are explained in lov_enqueue() */
2547                         }
2548
2549                         /* We already have a lock, and it's referenced.
2550                          *
2551                          * At this point, the cl_lock::cll_state is CLS_QUEUING,
2552                          * AGL upcall may change it to CLS_HELD directly. */
2553                         (*upcall)(cookie, ELDLM_OK);
2554
2555                         if (einfo->ei_mode != mode)
2556                                 ldlm_lock_decref(lockh, LCK_PW);
2557                         else if (rqset)
2558                                 /* For async requests, decref the lock. */
2559                                 ldlm_lock_decref(lockh, einfo->ei_mode);
2560                         LDLM_LOCK_PUT(matched);
2561                         RETURN(ELDLM_OK);
2562                 } else {
2563                         ldlm_lock_decref(lockh, mode);
2564                         LDLM_LOCK_PUT(matched);
2565                 }
2566         }
2567
2568  no_match:
2569         if (intent) {
2570                 CFS_LIST_HEAD(cancels);
2571                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2572                                            &RQF_LDLM_ENQUEUE_LVB);
2573                 if (req == NULL)
2574                         RETURN(-ENOMEM);
2575
2576                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
2577                 if (rc) {
2578                         ptlrpc_request_free(req);
2579                         RETURN(rc);
2580                 }
2581
2582                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2583                                      sizeof *lvb);
2584                 ptlrpc_request_set_replen(req);
2585         }
2586
2587         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2588         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2589
2590         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2591                               sizeof(*lvb), LVB_T_OST, lockh, async);
2592         if (rqset) {
2593                 if (!rc) {
2594                         struct osc_enqueue_args *aa;
2595                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2596                         aa = ptlrpc_req_async_args(req);
2597                         aa->oa_ei = einfo;
2598                         aa->oa_exp = exp;
2599                         aa->oa_flags  = flags;
2600                         aa->oa_upcall = upcall;
2601                         aa->oa_cookie = cookie;
2602                         aa->oa_lvb    = lvb;
2603                         aa->oa_lockh  = lockh;
2604                         aa->oa_agl    = !!agl;
2605
2606                         req->rq_interpret_reply =
2607                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2608                         if (rqset == PTLRPCD_SET)
2609                                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2610                         else
2611                                 ptlrpc_set_add_req(rqset, req);
2612                 } else if (intent) {
2613                         ptlrpc_req_finished(req);
2614                 }
2615                 RETURN(rc);
2616         }
2617
2618         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
2619         if (intent)
2620                 ptlrpc_req_finished(req);
2621
2622         RETURN(rc);
2623 }
2624
2625 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2626                        struct ldlm_enqueue_info *einfo,
2627                        struct ptlrpc_request_set *rqset)
2628 {
2629         struct ldlm_res_id res_id;
2630         int rc;
2631         ENTRY;
2632
2633         ostid_build_res_name(&oinfo->oi_md->lsm_object_oid, &res_id);
2634         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
2635                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2636                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
2637                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
2638                               rqset, rqset != NULL, 0);
2639         RETURN(rc);
2640 }
2641
2642 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2643                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2644                    int *flags, void *data, struct lustre_handle *lockh,
2645                    int unref)
2646 {
2647         struct obd_device *obd = exp->exp_obd;
2648         int lflags = *flags;
2649         ldlm_mode_t rc;
2650         ENTRY;
2651
2652         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2653                 RETURN(-EIO);
2654
2655         /* Filesystem lock extents are extended to page boundaries so that
2656          * dealing with the page cache is a little smoother */
2657         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2658         policy->l_extent.end |= ~CFS_PAGE_MASK;
2659
2660         /* Next, search for already existing extent locks that will cover us */
2661         /* If we're trying to read, we also search for an existing PW lock.  The
2662          * VFS and page cache already protect us locally, so lots of readers/
2663          * writers can share a single PW lock. */
2664         rc = mode;
2665         if (mode == LCK_PR)
2666                 rc |= LCK_PW;
2667         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2668                              res_id, type, policy, rc, lockh, unref);
2669         if (rc) {
2670                 if (data != NULL) {
2671                         if (!osc_set_data_with_check(lockh, data)) {
2672                                 if (!(lflags & LDLM_FL_TEST_LOCK))
2673                                         ldlm_lock_decref(lockh, rc);
2674                                 RETURN(0);
2675                         }
2676                 }
2677                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2678                         ldlm_lock_addref(lockh, LCK_PR);
2679                         ldlm_lock_decref(lockh, LCK_PW);
2680                 }
2681                 RETURN(rc);
2682         }
2683         RETURN(rc);
2684 }
2685
2686 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2687 {
2688         ENTRY;
2689
2690         if (unlikely(mode == LCK_GROUP))
2691                 ldlm_lock_decref_and_cancel(lockh, mode);
2692         else
2693                 ldlm_lock_decref(lockh, mode);
2694
2695         RETURN(0);
2696 }
2697
2698 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2699                       __u32 mode, struct lustre_handle *lockh)
2700 {
2701         ENTRY;
2702         RETURN(osc_cancel_base(lockh, mode));
2703 }
2704
2705 static int osc_cancel_unused(struct obd_export *exp,
2706                              struct lov_stripe_md *lsm,
2707                              ldlm_cancel_flags_t flags,
2708                              void *opaque)
2709 {
2710         struct obd_device *obd = class_exp2obd(exp);
2711         struct ldlm_res_id res_id, *resp = NULL;
2712
2713         if (lsm != NULL) {
2714                 ostid_build_res_name(&lsm->lsm_object_oid, &res_id);
2715                 resp = &res_id;
2716         }
2717
2718         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
2719 }
2720
2721 static int osc_statfs_interpret(const struct lu_env *env,
2722                                 struct ptlrpc_request *req,
2723                                 struct osc_async_args *aa, int rc)
2724 {
2725         struct obd_statfs *msfs;
2726         ENTRY;
2727
2728         if (rc == -EBADR)
2729                 /* The request has in fact never been sent
2730                  * due to issues at a higher level (LOV).
2731                  * Exit immediately since the caller is
2732                  * aware of the problem and takes care
2733                  * of the clean up */
2734                  RETURN(rc);
2735
2736         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2737             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2738                 GOTO(out, rc = 0);
2739
2740         if (rc != 0)
2741                 GOTO(out, rc);
2742
2743         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2744         if (msfs == NULL) {
2745                 GOTO(out, rc = -EPROTO);
2746         }
2747
2748         *aa->aa_oi->oi_osfs = *msfs;
2749 out:
2750         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2751         RETURN(rc);
2752 }
2753
2754 static int osc_statfs_async(struct obd_export *exp,
2755                             struct obd_info *oinfo, __u64 max_age,
2756                             struct ptlrpc_request_set *rqset)
2757 {
2758         struct obd_device     *obd = class_exp2obd(exp);
2759         struct ptlrpc_request *req;
2760         struct osc_async_args *aa;
2761         int                    rc;
2762         ENTRY;
2763
2764         /* We could possibly pass max_age in the request (as an absolute
2765          * timestamp or a "seconds.usec ago") so the target can avoid doing
2766          * extra calls into the filesystem if that isn't necessary (e.g.
2767          * during mount that would help a bit).  Having relative timestamps
2768          * is not so great if request processing is slow, while absolute
2769          * timestamps are not ideal because they need time synchronization. */
2770         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2771         if (req == NULL)
2772                 RETURN(-ENOMEM);
2773
2774         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2775         if (rc) {
2776                 ptlrpc_request_free(req);
2777                 RETURN(rc);
2778         }
2779         ptlrpc_request_set_replen(req);
2780         req->rq_request_portal = OST_CREATE_PORTAL;
2781         ptlrpc_at_set_req_timeout(req);
2782
2783         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2784                 /* procfs requests not want stat in wait for avoid deadlock */
2785                 req->rq_no_resend = 1;
2786                 req->rq_no_delay = 1;
2787         }
2788
2789         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2790         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2791         aa = ptlrpc_req_async_args(req);
2792         aa->aa_oi = oinfo;
2793
2794         ptlrpc_set_add_req(rqset, req);
2795         RETURN(0);
2796 }
2797
2798 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2799                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2800 {
2801         struct obd_device     *obd = class_exp2obd(exp);
2802         struct obd_statfs     *msfs;
2803         struct ptlrpc_request *req;
2804         struct obd_import     *imp = NULL;
2805         int rc;
2806         ENTRY;
2807
2808         /*Since the request might also come from lprocfs, so we need
2809          *sync this with client_disconnect_export Bug15684*/
2810         down_read(&obd->u.cli.cl_sem);
2811         if (obd->u.cli.cl_import)
2812                 imp = class_import_get(obd->u.cli.cl_import);
2813         up_read(&obd->u.cli.cl_sem);
2814         if (!imp)
2815                 RETURN(-ENODEV);
2816
2817         /* We could possibly pass max_age in the request (as an absolute
2818          * timestamp or a "seconds.usec ago") so the target can avoid doing
2819          * extra calls into the filesystem if that isn't necessary (e.g.
2820          * during mount that would help a bit).  Having relative timestamps
2821          * is not so great if request processing is slow, while absolute
2822          * timestamps are not ideal because they need time synchronization. */
2823         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2824
2825         class_import_put(imp);
2826
2827         if (req == NULL)
2828                 RETURN(-ENOMEM);
2829
2830         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2831         if (rc) {
2832                 ptlrpc_request_free(req);
2833                 RETURN(rc);
2834         }
2835         ptlrpc_request_set_replen(req);
2836         req->rq_request_portal = OST_CREATE_PORTAL;
2837         ptlrpc_at_set_req_timeout(req);
2838
2839         if (flags & OBD_STATFS_NODELAY) {
2840                 /* procfs requests not want stat in wait for avoid deadlock */
2841                 req->rq_no_resend = 1;
2842                 req->rq_no_delay = 1;
2843         }
2844
2845         rc = ptlrpc_queue_wait(req);
2846         if (rc)
2847                 GOTO(out, rc);
2848
2849         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2850         if (msfs == NULL) {
2851                 GOTO(out, rc = -EPROTO);
2852         }
2853
2854         *osfs = *msfs;
2855
2856         EXIT;
2857  out:
2858         ptlrpc_req_finished(req);
2859         return rc;
2860 }
2861
2862 /* Retrieve object striping information.
2863  *
2864  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
2865  * the maximum number of OST indices which will fit in the user buffer.
2866  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
2867  */
2868 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
2869 {
2870         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
2871         struct lov_user_md_v3 lum, *lumk;
2872         struct lov_user_ost_data_v1 *lmm_objects;
2873         int rc = 0, lum_size;
2874         ENTRY;
2875
2876         if (!lsm)
2877                 RETURN(-ENODATA);
2878
2879         /* we only need the header part from user space to get lmm_magic and
2880          * lmm_stripe_count, (the header part is common to v1 and v3) */
2881         lum_size = sizeof(struct lov_user_md_v1);
2882         if (cfs_copy_from_user(&lum, lump, lum_size))
2883                 RETURN(-EFAULT);
2884
2885         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
2886             (lum.lmm_magic != LOV_USER_MAGIC_V3))
2887                 RETURN(-EINVAL);
2888
2889         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
2890         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
2891         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
2892         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
2893
2894         /* we can use lov_mds_md_size() to compute lum_size
2895          * because lov_user_md_vX and lov_mds_md_vX have the same size */
2896         if (lum.lmm_stripe_count > 0) {
2897                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
2898                 OBD_ALLOC(lumk, lum_size);
2899                 if (!lumk)
2900                         RETURN(-ENOMEM);
2901
2902                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
2903                         lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
2904                 else
2905                         lmm_objects = &(lumk->lmm_objects[0]);
2906                 lmm_objects->l_object_id = lsm->lsm_object_id;
2907         } else {
2908                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
2909                 lumk = &lum;
2910         }
2911
2912         lumk->lmm_object_id = lsm->lsm_object_id;
2913         lumk->lmm_object_seq = lsm->lsm_object_seq;
2914         lumk->lmm_stripe_count = 1;
2915
2916         if (cfs_copy_to_user(lump, lumk, lum_size))
2917                 rc = -EFAULT;
2918
2919         if (lumk != &lum)
2920                 OBD_FREE(lumk, lum_size);
2921
2922         RETURN(rc);
2923 }
2924
2925
2926 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2927                          void *karg, void *uarg)
2928 {
2929         struct obd_device *obd = exp->exp_obd;
2930         struct obd_ioctl_data *data = karg;
2931         int err = 0;
2932         ENTRY;
2933
2934         if (!cfs_try_module_get(THIS_MODULE)) {
2935                 CERROR("Can't get module. Is it alive?");
2936                 return -EINVAL;
2937         }
2938         switch (cmd) {
2939         case OBD_IOC_LOV_GET_CONFIG: {
2940                 char *buf;
2941                 struct lov_desc *desc;
2942                 struct obd_uuid uuid;
2943
2944                 buf = NULL;
2945                 len = 0;
2946                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
2947                         GOTO(out, err = -EINVAL);
2948
2949                 data = (struct obd_ioctl_data *)buf;
2950
2951                 if (sizeof(*desc) > data->ioc_inllen1) {
2952                         obd_ioctl_freedata(buf, len);
2953                         GOTO(out, err = -EINVAL);
2954                 }
2955
2956                 if (data->ioc_inllen2 < sizeof(uuid)) {
2957                         obd_ioctl_freedata(buf, len);
2958                         GOTO(out, err = -EINVAL);
2959                 }
2960
2961                 desc = (struct lov_desc *)data->ioc_inlbuf1;
2962                 desc->ld_tgt_count = 1;
2963                 desc->ld_active_tgt_count = 1;
2964                 desc->ld_default_stripe_count = 1;
2965                 desc->ld_default_stripe_size = 0;
2966                 desc->ld_default_stripe_offset = 0;
2967                 desc->ld_pattern = 0;
2968                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
2969
2970                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
2971
2972                 err = cfs_copy_to_user((void *)uarg, buf, len);
2973                 if (err)
2974                         err = -EFAULT;
2975                 obd_ioctl_freedata(buf, len);
2976                 GOTO(out, err);
2977         }
2978         case LL_IOC_LOV_SETSTRIPE:
2979                 err = obd_alloc_memmd(exp, karg);
2980                 if (err > 0)
2981                         err = 0;
2982                 GOTO(out, err);
2983         case LL_IOC_LOV_GETSTRIPE:
2984                 err = osc_getstripe(karg, uarg);
2985                 GOTO(out, err);
2986         case OBD_IOC_CLIENT_RECOVER:
2987                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2988                                             data->ioc_inlbuf1, 0);
2989                 if (err > 0)
2990                         err = 0;
2991                 GOTO(out, err);
2992         case IOC_OSC_SET_ACTIVE:
2993                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2994                                                data->ioc_offset);
2995                 GOTO(out, err);
2996         case OBD_IOC_POLL_QUOTACHECK:
2997                 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
2998                 GOTO(out, err);
2999         case OBD_IOC_PING_TARGET:
3000                 err = ptlrpc_obd_ping(obd);
3001                 GOTO(out, err);
3002         default:
3003                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3004                        cmd, cfs_curproc_comm());
3005                 GOTO(out, err = -ENOTTY);
3006         }
3007 out:
3008         cfs_module_put(THIS_MODULE);
3009         return err;
3010 }
3011
3012 static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
3013                         obd_count keylen, void *key, __u32 *vallen, void *val,
3014                         struct lov_stripe_md *lsm)
3015 {
3016         ENTRY;
3017         if (!vallen || !val)
3018                 RETURN(-EFAULT);
3019
3020         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3021                 __u32 *stripe = val;
3022                 *vallen = sizeof(*stripe);
3023                 *stripe = 0;
3024                 RETURN(0);
3025         } else if (KEY_IS(KEY_LAST_ID)) {
3026                 struct ptlrpc_request *req;
3027                 obd_id                *reply;
3028                 char                  *tmp;
3029                 int                    rc;
3030
3031                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3032                                            &RQF_OST_GET_INFO_LAST_ID);
3033                 if (req == NULL)
3034                         RETURN(-ENOMEM);
3035
3036                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3037                                      RCL_CLIENT, keylen);
3038                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3039                 if (rc) {
3040                         ptlrpc_request_free(req);
3041                         RETURN(rc);
3042                 }
3043
3044                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3045                 memcpy(tmp, key, keylen);
3046
3047                 req->rq_no_delay = req->rq_no_resend = 1;
3048                 ptlrpc_request_set_replen(req);
3049                 rc = ptlrpc_queue_wait(req);
3050                 if (rc)
3051                         GOTO(out, rc);
3052
3053                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3054                 if (reply == NULL)
3055                         GOTO(out, rc = -EPROTO);
3056
3057                 *((obd_id *)val) = *reply;
3058         out:
3059                 ptlrpc_req_finished(req);
3060                 RETURN(rc);
3061         } else if (KEY_IS(KEY_FIEMAP)) {
3062                 struct ptlrpc_request *req;
3063                 struct ll_user_fiemap *reply;
3064                 char *tmp;
3065                 int rc;
3066
3067                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3068                                            &RQF_OST_GET_INFO_FIEMAP);
3069                 if (req == NULL)
3070                         RETURN(-ENOMEM);
3071
3072                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3073                                      RCL_CLIENT, keylen);
3074                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3075                                      RCL_CLIENT, *vallen);
3076                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3077                                      RCL_SERVER, *vallen);
3078
3079                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3080                 if (rc) {
3081                         ptlrpc_request_free(req);
3082                         RETURN(rc);
3083                 }
3084
3085                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3086                 memcpy(tmp, key, keylen);
3087                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3088                 memcpy(tmp, val, *vallen);
3089
3090                 ptlrpc_request_set_replen(req);
3091                 rc = ptlrpc_queue_wait(req);
3092                 if (rc)
3093                         GOTO(out1, rc);
3094
3095                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3096                 if (reply == NULL)
3097                         GOTO(out1, rc = -EPROTO);
3098
3099                 memcpy(val, reply, *vallen);
3100         out1:
3101                 ptlrpc_req_finished(req);
3102
3103                 RETURN(rc);
3104         }
3105
3106         RETURN(-EINVAL);
3107 }
3108
3109 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
3110                               obd_count keylen, void *key, obd_count vallen,
3111                               void *val, struct ptlrpc_request_set *set)
3112 {
3113         struct ptlrpc_request *req;
3114         struct obd_device     *obd = exp->exp_obd;
3115         struct obd_import     *imp = class_exp2cliimp(exp);
3116         char                  *tmp;
3117         int                    rc;
3118         ENTRY;
3119
3120         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3121
3122         if (KEY_IS(KEY_CHECKSUM)) {
3123                 if (vallen != sizeof(int))
3124                         RETURN(-EINVAL);
3125                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3126                 RETURN(0);
3127         }
3128
3129         if (KEY_IS(KEY_SPTLRPC_CONF)) {
3130                 sptlrpc_conf_client_adapt(obd);
3131                 RETURN(0);
3132         }
3133
3134         if (KEY_IS(KEY_FLUSH_CTX)) {
3135                 sptlrpc_import_flush_my_ctx(imp);
3136                 RETURN(0);
3137         }
3138
3139         if (KEY_IS(KEY_CACHE_SET)) {
3140                 struct client_obd *cli = &obd->u.cli;
3141
3142                 LASSERT(cli->cl_cache == NULL); /* only once */
3143                 cli->cl_cache = (struct cl_client_cache *)val;
3144                 cfs_atomic_inc(&cli->cl_cache->ccc_users);
3145                 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
3146
3147                 /* add this osc into entity list */
3148                 LASSERT(cfs_list_empty(&cli->cl_lru_osc));
3149                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3150                 cfs_list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
3151                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3152
3153                 RETURN(0);
3154         }
3155
3156         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
3157                 struct client_obd *cli = &obd->u.cli;
3158                 int nr = cfs_atomic_read(&cli->cl_lru_in_list) >> 1;
3159                 int target = *(int *)val;
3160
3161                 nr = osc_lru_shrink(cli, min(nr, target));
3162                 *(int *)val -= nr;
3163                 RETURN(0);
3164         }
3165
3166         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3167                 RETURN(-EINVAL);
3168
3169         /* We pass all other commands directly to OST. Since nobody calls osc
3170            methods directly and everybody is supposed to go through LOV, we
3171            assume lov checked invalid values for us.
3172            The only recognised values so far are evict_by_nid and mds_conn.
3173            Even if something bad goes through, we'd get a -EINVAL from OST
3174            anyway. */
3175
3176         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
3177                                                 &RQF_OST_SET_GRANT_INFO :
3178                                                 &RQF_OBD_SET_INFO);
3179         if (req == NULL)
3180                 RETURN(-ENOMEM);
3181
3182         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3183                              RCL_CLIENT, keylen);
3184         if (!KEY_IS(KEY_GRANT_SHRINK))
3185                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3186                                      RCL_CLIENT, vallen);
3187         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3188         if (rc) {
3189                 ptlrpc_request_free(req);
3190                 RETURN(rc);
3191         }
3192
3193         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3194         memcpy(tmp, key, keylen);
3195         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
3196                                                         &RMF_OST_BODY :
3197                                                         &RMF_SETINFO_VAL);
3198         memcpy(tmp, val, vallen);
3199
3200         if (KEY_IS(KEY_GRANT_SHRINK)) {
3201                 struct osc_grant_args *aa;
3202                 struct obdo *oa;
3203
3204                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3205                 aa = ptlrpc_req_async_args(req);
3206                 OBDO_ALLOC(oa);
3207                 if (!oa) {
3208                         ptlrpc_req_finished(req);
3209                         RETURN(-ENOMEM);
3210                 }
3211                 *oa = ((struct ost_body *)val)->oa;
3212                 aa->aa_oa = oa;
3213                 req->rq_interpret_reply = osc_shrink_grant_interpret;
3214         }
3215
3216         ptlrpc_request_set_replen(req);
3217         if (!KEY_IS(KEY_GRANT_SHRINK)) {
3218                 LASSERT(set != NULL);
3219                 ptlrpc_set_add_req(set, req);
3220                 ptlrpc_check_set(NULL, set);
3221         } else
3222                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
3223
3224         RETURN(0);
3225 }
3226
3227
3228 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
3229                          struct obd_device *disk_obd, int *index)
3230 {
3231         /* this code is not supposed to be used with LOD/OSP
3232          * to be removed soon */
3233         LBUG();
3234         return 0;
3235 }
3236
3237 static int osc_llog_finish(struct obd_device *obd, int count)
3238 {
3239         struct llog_ctxt *ctxt;
3240
3241         ENTRY;
3242
3243         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3244         if (ctxt) {
3245                 llog_cat_close(NULL, ctxt->loc_handle);
3246                 llog_cleanup(NULL, ctxt);
3247         }
3248
3249         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3250         if (ctxt)
3251                 llog_cleanup(NULL, ctxt);
3252         RETURN(0);
3253 }
3254
3255 static int osc_reconnect(const struct lu_env *env,
3256                          struct obd_export *exp, struct obd_device *obd,
3257                          struct obd_uuid *cluuid,
3258                          struct obd_connect_data *data,
3259                          void *localdata)
3260 {
3261         struct client_obd *cli = &obd->u.cli;
3262
3263         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3264                 long lost_grant;
3265
3266                 client_obd_list_lock(&cli->cl_loi_list_lock);
3267                 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
3268                                 2 * cli_brw_size(obd);
3269                 lost_grant = cli->cl_lost_grant;
3270                 cli->cl_lost_grant = 0;
3271                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3272
3273                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3274                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3275                        data->ocd_version, data->ocd_grant, lost_grant);
3276         }
3277
3278         RETURN(0);
3279 }
3280
3281 static int osc_disconnect(struct obd_export *exp)
3282 {
3283         struct obd_device *obd = class_exp2obd(exp);
3284         struct llog_ctxt  *ctxt;
3285         int rc;
3286
3287         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3288         if (ctxt) {
3289                 if (obd->u.cli.cl_conn_count == 1) {
3290                         /* Flush any remaining cancel messages out to the
3291                          * target */
3292                         llog_sync(ctxt, exp, 0);
3293                 }
3294                 llog_ctxt_put(ctxt);
3295         } else {
3296                 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
3297                        obd);
3298         }
3299
3300         rc = client_disconnect_export(exp);
3301         /**
3302          * Initially we put del_shrink_grant before disconnect_export, but it
3303          * causes the following problem if setup (connect) and cleanup
3304          * (disconnect) are tangled together.
3305          *      connect p1                     disconnect p2
3306          *   ptlrpc_connect_import
3307          *     ...............               class_manual_cleanup
3308          *                                     osc_disconnect
3309          *                                     del_shrink_grant
3310          *   ptlrpc_connect_interrupt
3311          *     init_grant_shrink
3312          *   add this client to shrink list
3313          *                                      cleanup_osc
3314          * Bang! pinger trigger the shrink.
3315          * So the osc should be disconnected from the shrink list, after we
3316          * are sure the import has been destroyed. BUG18662
3317          */
3318         if (obd->u.cli.cl_import == NULL)
3319                 osc_del_shrink_grant(&obd->u.cli);
3320         return rc;
3321 }
3322
3323 static int osc_import_event(struct obd_device *obd,
3324                             struct obd_import *imp,
3325                             enum obd_import_event event)
3326 {
3327         struct client_obd *cli;
3328         int rc = 0;
3329
3330         ENTRY;
3331         LASSERT(imp->imp_obd == obd);
3332
3333         switch (event) {
3334         case IMP_EVENT_DISCON: {
3335                 cli = &obd->u.cli;
3336                 client_obd_list_lock(&cli->cl_loi_list_lock);
3337                 cli->cl_avail_grant = 0;
3338                 cli->cl_lost_grant = 0;
3339                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3340                 break;
3341         }
3342         case IMP_EVENT_INACTIVE: {
3343                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3344                 break;
3345         }
3346         case IMP_EVENT_INVALIDATE: {
3347                 struct ldlm_namespace *ns = obd->obd_namespace;
3348                 struct lu_env         *env;
3349                 int                    refcheck;
3350
3351                 env = cl_env_get(&refcheck);
3352                 if (!IS_ERR(env)) {
3353                         /* Reset grants */
3354                         cli = &obd->u.cli;
3355                         /* all pages go to failing rpcs due to the invalid
3356                          * import */
3357                         osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
3358
3359                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3360                         cl_env_put(env, &refcheck);
3361                 } else
3362                         rc = PTR_ERR(env);
3363                 break;
3364         }
3365         case IMP_EVENT_ACTIVE: {
3366                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3367                 break;
3368         }
3369         case IMP_EVENT_OCD: {
3370                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3371
3372                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3373                         osc_init_grant(&obd->u.cli, ocd);
3374
3375                 /* See bug 7198 */
3376                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3377                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3378
3379                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3380                 break;
3381         }
3382         case IMP_EVENT_DEACTIVATE: {
3383                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
3384                 break;
3385         }
3386         case IMP_EVENT_ACTIVATE: {
3387                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
3388                 break;
3389         }
3390         default:
3391                 CERROR("Unknown import event %d\n", event);
3392                 LBUG();
3393         }
3394         RETURN(rc);
3395 }
3396
3397 /**
3398  * Determine whether the lock can be canceled before replaying the lock
3399  * during recovery, see bug16774 for detailed information.
3400  *
3401  * \retval zero the lock can't be canceled
3402  * \retval other ok to cancel
3403  */
3404 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
3405 {
3406         check_res_locked(lock->l_resource);
3407
3408         /*
3409          * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
3410          *
3411          * XXX as a future improvement, we can also cancel unused write lock
3412          * if it doesn't have dirty data and active mmaps.
3413          */
3414         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3415             (lock->l_granted_mode == LCK_PR ||
3416              lock->l_granted_mode == LCK_CR) &&
3417             (osc_dlm_lock_pageref(lock) == 0))
3418                 RETURN(1);
3419
3420         RETURN(0);
3421 }
3422
3423 static int brw_queue_work(const struct lu_env *env, void *data)
3424 {
3425         struct client_obd *cli = data;
3426
3427         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3428
3429         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
3430         RETURN(0);
3431 }
3432
3433 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3434 {
3435         struct lprocfs_static_vars lvars = { 0 };
3436         struct client_obd          *cli = &obd->u.cli;
3437         void                       *handler;
3438         int                        rc;
3439         ENTRY;
3440
3441         rc = ptlrpcd_addref();
3442         if (rc)
3443                 RETURN(rc);
3444
3445         rc = client_obd_setup(obd, lcfg);
3446         if (rc)
3447                 GOTO(out_ptlrpcd, rc);
3448
3449         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3450         if (IS_ERR(handler))
3451                 GOTO(out_client_setup, rc = PTR_ERR(handler));
3452         cli->cl_writeback_work = handler;
3453
3454         rc = osc_quota_setup(obd);
3455         if (rc)
3456                 GOTO(out_ptlrpcd_work, rc);
3457
3458         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3459         lprocfs_osc_init_vars(&lvars);
3460         if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3461                 lproc_osc_attach_seqstat(obd);
3462                 sptlrpc_lprocfs_cliobd_attach(obd);
3463                 ptlrpc_lprocfs_register_obd(obd);
3464         }
3465
3466         /* We need to allocate a few requests more, because
3467          * brw_interpret tries to create new requests before freeing
3468          * previous ones, Ideally we want to have 2x max_rpcs_in_flight
3469          * reserved, but I'm afraid that might be too much wasted RAM
3470          * in fact, so 2 is just my guess and still should work. */
3471         cli->cl_import->imp_rq_pool =
3472                 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3473                                     OST_MAXREQSIZE,
3474                                     ptlrpc_add_rqs_to_pool);
3475
3476         CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
3477         ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
3478         RETURN(rc);
3479
3480 out_ptlrpcd_work:
3481         ptlrpcd_destroy_work(handler);
3482 out_client_setup:
3483         client_obd_cleanup(obd);
3484 out_ptlrpcd:
3485         ptlrpcd_decref();
3486         RETURN(rc);
3487 }
3488
3489 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3490 {
3491         int rc = 0;
3492         ENTRY;
3493
3494         switch (stage) {
3495         case OBD_CLEANUP_EARLY: {
3496                 struct obd_import *imp;
3497                 imp = obd->u.cli.cl_import;
3498                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3499                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3500                 ptlrpc_deactivate_import(imp);
3501                 spin_lock(&imp->imp_lock);
3502                 imp->imp_pingable = 0;
3503                 spin_unlock(&imp->imp_lock);
3504                 break;
3505         }
3506         case OBD_CLEANUP_EXPORTS: {
3507                 struct client_obd *cli = &obd->u.cli;
3508                 /* LU-464
3509                  * for echo client, export may be on zombie list, wait for
3510                  * zombie thread to cull it, because cli.cl_import will be
3511                  * cleared in client_disconnect_export():
3512                  *   class_export_destroy() -> obd_cleanup() ->
3513                  *   echo_device_free() -> echo_client_cleanup() ->
3514                  *   obd_disconnect() -> osc_disconnect() ->
3515                  *   client_disconnect_export()
3516                  */
3517                 obd_zombie_barrier();
3518                 if (cli->cl_writeback_work) {
3519                         ptlrpcd_destroy_work(cli->cl_writeback_work);
3520                         cli->cl_writeback_work = NULL;
3521                 }
3522                 obd_cleanup_client_import(obd);
3523                 ptlrpc_lprocfs_unregister_obd(obd);
3524                 lprocfs_obd_cleanup(obd);
3525                 rc = obd_llog_finish(obd, 0);
3526                 if (rc != 0)
3527                         CERROR("failed to cleanup llogging subsystems\n");
3528                 break;
3529                 }
3530         }
3531         RETURN(rc);
3532 }
3533
3534 int osc_cleanup(struct obd_device *obd)
3535 {
3536         struct client_obd *cli = &obd->u.cli;
3537         int rc;
3538
3539         ENTRY;
3540
3541         /* lru cleanup */
3542         if (cli->cl_cache != NULL) {
3543                 LASSERT(cfs_atomic_read(&cli->cl_cache->ccc_users) > 0);
3544                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3545                 cfs_list_del_init(&cli->cl_lru_osc);
3546                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3547                 cli->cl_lru_left = NULL;
3548                 cfs_atomic_dec(&cli->cl_cache->ccc_users);
3549                 cli->cl_cache = NULL;
3550         }
3551
3552         /* free memory of osc quota cache */
3553         osc_quota_cleanup(obd);
3554
3555         rc = client_obd_cleanup(obd);
3556
3557         ptlrpcd_decref();
3558         RETURN(rc);
3559 }
3560
3561 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3562 {
3563         struct lprocfs_static_vars lvars = { 0 };
3564         int rc = 0;
3565
3566         lprocfs_osc_init_vars(&lvars);
3567
3568         switch (lcfg->lcfg_command) {
3569         default:
3570                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
3571                                               lcfg, obd);
3572                 if (rc > 0)
3573                         rc = 0;
3574                 break;
3575         }
3576
3577         return(rc);
3578 }
3579
3580 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3581 {
3582         return osc_process_config_base(obd, buf);
3583 }
3584
3585 struct obd_ops osc_obd_ops = {
3586         .o_owner                = THIS_MODULE,
3587         .o_setup                = osc_setup,
3588         .o_precleanup           = osc_precleanup,
3589         .o_cleanup              = osc_cleanup,
3590         .o_add_conn             = client_import_add_conn,
3591         .o_del_conn             = client_import_del_conn,
3592         .o_connect              = client_connect_import,
3593         .o_reconnect            = osc_reconnect,
3594         .o_disconnect           = osc_disconnect,
3595         .o_statfs               = osc_statfs,
3596         .o_statfs_async         = osc_statfs_async,
3597         .o_packmd               = osc_packmd,
3598         .o_unpackmd             = osc_unpackmd,
3599         .o_create               = osc_create,
3600         .o_destroy              = osc_destroy,
3601         .o_getattr              = osc_getattr,
3602         .o_getattr_async        = osc_getattr_async,
3603         .o_setattr              = osc_setattr,
3604         .o_setattr_async        = osc_setattr_async,
3605         .o_brw                  = osc_brw,
3606         .o_punch                = osc_punch,
3607         .o_sync                 = osc_sync,
3608         .o_enqueue              = osc_enqueue,
3609         .o_change_cbdata        = osc_change_cbdata,
3610         .o_find_cbdata          = osc_find_cbdata,
3611         .o_cancel               = osc_cancel,
3612         .o_cancel_unused        = osc_cancel_unused,
3613         .o_iocontrol            = osc_iocontrol,
3614         .o_get_info             = osc_get_info,
3615         .o_set_info_async       = osc_set_info_async,
3616         .o_import_event         = osc_import_event,
3617         .o_llog_init            = osc_llog_init,
3618         .o_llog_finish          = osc_llog_finish,
3619         .o_process_config       = osc_process_config,
3620         .o_quotactl             = osc_quotactl,
3621         .o_quotacheck           = osc_quotacheck,
3622 };
3623
3624 extern struct lu_kmem_descr osc_caches[];
3625 extern spinlock_t osc_ast_guard;
3626 extern struct lock_class_key osc_ast_guard_class;
3627
3628 int __init osc_init(void)
3629 {
3630         struct lprocfs_static_vars lvars = { 0 };
3631         int rc;
3632         ENTRY;
3633
3634         /* print an address of _any_ initialized kernel symbol from this
3635          * module, to allow debugging with gdb that doesn't support data
3636          * symbols from modules.*/
3637         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3638
3639         rc = lu_kmem_init(osc_caches);
3640
3641         lprocfs_osc_init_vars(&lvars);
3642
3643         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3644                                  LUSTRE_OSC_NAME, &osc_device_type);
3645         if (rc) {
3646                 lu_kmem_fini(osc_caches);
3647                 RETURN(rc);
3648         }
3649
3650         spin_lock_init(&osc_ast_guard);
3651         lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
3652
3653         RETURN(rc);
3654 }
3655
3656 #ifdef __KERNEL__
3657 static void /*__exit*/ osc_exit(void)
3658 {
3659         class_unregister_type(LUSTRE_OSC_NAME);
3660         lu_kmem_fini(osc_caches);
3661 }
3662
3663 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
3664 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3665 MODULE_LICENSE("GPL");
3666
3667 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
3668 #endif