Whamcloud - gitweb
LU-3219 ost: Ensure dirty flushed on fiemap ioctl
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <libcfs/libcfs.h>
40
41 #ifndef __KERNEL__
42 # include <liblustre.h>
43 #endif
44
45 #include <lustre_dlm.h>
46 #include <lustre_net.h>
47 #include <lustre/lustre_user.h>
48 #include <obd_cksum.h>
49 #include <obd_ost.h>
50 #include <obd_lov.h>
51
52 #ifdef  __CYGWIN__
53 # include <ctype.h>
54 #endif
55
56 #include <lustre_ha.h>
57 #include <lprocfs_status.h>
58 #include <lustre_log.h>
59 #include <lustre_debug.h>
60 #include <lustre_param.h>
61 #include <lustre_fid.h>
62 #include "osc_internal.h"
63 #include "osc_cl_internal.h"
64
65 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
66 static int brw_interpret(const struct lu_env *env,
67                          struct ptlrpc_request *req, void *data, int rc);
68 int osc_cleanup(struct obd_device *obd);
69
/* Pack OSC object metadata for disk storage (LE byte order).
 *
 * Multiplexes on its arguments like the other obd packmd methods:
 *  - lmmp == NULL:                 return the size needed for the packed md;
 *  - *lmmp != NULL and lsm NULL:   free the previously packed md;
 *  - otherwise:                    allocate *lmmp if needed and fill it
 *                                  from @lsm.
 *
 * Returns the lmm size on success, 0 after a free, or a negative errno.
 */
static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
                      struct lov_stripe_md *lsm)
{
        int lmm_size;
        ENTRY;

        /* sizeof(**lmmp) is evaluated at compile time, so this is safe
         * even when lmmp == NULL. */
        lmm_size = sizeof(**lmmp);
        if (lmmp == NULL)
                RETURN(lmm_size);

        if (*lmmp != NULL && lsm == NULL) {
                /* Caller is releasing a previously packed md. */
                OBD_FREE(*lmmp, lmm_size);
                *lmmp = NULL;
                RETURN(0);
        } else if (unlikely(lsm != NULL && ostid_id(&lsm->lsm_oi) == 0)) {
                /* A zero object id can never be valid. */
                RETURN(-EBADF);
        }

        if (*lmmp == NULL) {
                OBD_ALLOC(*lmmp, lmm_size);
                if (*lmmp == NULL)
                        RETURN(-ENOMEM);
        }

        if (lsm)
                /* Store the object id in little-endian byte order. */
                ostid_cpu_to_le(&lsm->lsm_oi, &(*lmmp)->lmm_oi);

        RETURN(lmm_size);
}
100
/* Unpack OSC object metadata from disk storage (LE byte order).
 *
 * Mirror of osc_packmd():
 *  - lsmp == NULL:                 return the in-memory lsm size;
 *  - *lsmp != NULL and lmm NULL:   free the previously unpacked lsm;
 *  - otherwise:                    allocate *lsmp if needed and fill it
 *                                  from @lmm.
 *
 * Returns the lsm size on success, 0 after a free, or a negative errno.
 */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
{
        int lsm_size;
        struct obd_import *imp = class_exp2cliimp(exp);
        ENTRY;

        if (lmm != NULL) {
                /* Validate the incoming buffer before touching it. */
                if (lmm_bytes < sizeof(*lmm)) {
                        CERROR("%s: lov_mds_md too small: %d, need %d\n",
                               exp->exp_obd->obd_name, lmm_bytes,
                               (int)sizeof(*lmm));
                        RETURN(-EINVAL);
                }
                /* XXX LOV_MAGIC etc check? */

                if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) {
                        CERROR("%s: zero lmm_object_id: rc = %d\n",
                               exp->exp_obd->obd_name, -EINVAL);
                        RETURN(-EINVAL);
                }
        }

        /* An OSC-level lsm always describes exactly one stripe. */
        lsm_size = lov_stripe_md_size(1);
        if (lsmp == NULL)
                RETURN(lsm_size);

        if (*lsmp != NULL && lmm == NULL) {
                /* Release a previously unpacked lsm and its single oinfo. */
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);
                *lsmp = NULL;
                RETURN(0);
        }

        if (*lsmp == NULL) {
                OBD_ALLOC(*lsmp, lsm_size);
                if (unlikely(*lsmp == NULL))
                        RETURN(-ENOMEM);
                OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {
                        /* Unwind the partial allocation. */
                        OBD_FREE(*lsmp, lsm_size);
                        RETURN(-ENOMEM);
                }
                loi_init((*lsmp)->lsm_oinfo[0]);
        } else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) {
                RETURN(-EBADF);
        }

        if (lmm != NULL)
                /* XXX zero *lsmp? */
                ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi);

        /* Use the server-advertised per-object size limit when the import
         * negotiated OBD_CONNECT_MAXBYTES, otherwise fall back to the
         * compile-time default. */
        if (imp != NULL &&
            (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
                (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
        else
                (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

        RETURN(lsm_size);
}
162
/* Copy the OSS capability @capa (if any) into the request's RMF_CAPA1
 * buffer and flag its presence in the request obdo. NULL capa is a no-op.
 * The capsule must have been packed with room for RMF_CAPA1 already
 * (see osc_set_capa_size()). */
static inline void osc_pack_capa(struct ptlrpc_request *req,
                                 struct ost_body *body, void *capa)
{
        struct obd_capa *oc = (struct obd_capa *)capa;
        struct lustre_capa *c;

        if (!capa)
                return;

        c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
        LASSERT(c);
        capa_cpy(c, oc);
        body->oa.o_valid |= OBD_MD_FLOSSCAPA;
        DEBUG_CAPA(D_SEC, c, "pack");
}
178
/* Fill the request's ost_body from @oinfo: convert the obdo into wire
 * format (honouring the import's connect flags) and append the OSS
 * capability if the caller attached one. */
static inline void osc_pack_req_body(struct ptlrpc_request *req,
                                     struct obd_info *oinfo)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);
}
191
192 static inline void osc_set_capa_size(struct ptlrpc_request *req,
193                                      const struct req_msg_field *field,
194                                      struct obd_capa *oc)
195 {
196         if (oc == NULL)
197                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
198         else
199                 /* it is already calculated as sizeof struct obd_capa */
200                 ;
201 }
202
/* Completion handler for an async OST_GETATTR: unpack the reply obdo into
 * the caller's obd_info and invoke its up-call with the final status.
 * Returns the up-call's return value. */
static int osc_getattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body) {
                CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
                lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
                                     aa->aa_oi->oi_oa, &body->oa);

                /* This should really be sent by the OST */
                /* NOTE(review): the synchronous path (osc_getattr) uses
                 * cli_brw_size() here instead of DT_MAX_BRW_SIZE — confirm
                 * whether this divergence is intentional. */
                aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
                aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
        } else {
                CDEBUG(D_INFO, "can't unpack ost_body\n");
                rc = -EPROTO;
                aa->aa_oi->oi_oa->o_valid = 0;
        }
out:
        /* The up-call is always made, on success and on failure. */
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}
231
/* Queue an asynchronous OST_GETATTR on @set. The reply is consumed by
 * osc_getattr_interpret(), which fires oinfo->oi_cb_up.
 * Returns 0 if the request was queued, or a negative errno. */
static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        /* Size the capability field before packing the capsule. */
        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;

        /* Stash the obd_info in the request's private async-args area. */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(set, req);
        RETURN(0);
}
263
/* Synchronous OST_GETATTR: send the request, wait for the reply and copy
 * the returned attributes back into oinfo->oi_oa.
 * Returns 0 on success or a negative errno. */
static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obd_info *oinfo)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
                             &body->oa);

        /* The blocksize is not sent by the OST; synthesize it from the
         * client's negotiated BRW size. */
        oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
307
/* Synchronous OST_SETATTR: send the attributes in oinfo->oi_oa to the OST,
 * wait for the reply, and copy the server's resulting attributes back into
 * oinfo->oi_oa. Returns 0 on success or a negative errno. */
static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
                       struct obd_info *oinfo, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        /* The object's group must always accompany a setattr. */
        LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
                             &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
349
/* Completion handler for async OST_SETATTR (also reused by OST_PUNCH):
 * copy the server's reply obdo into sa->sa_oa and run the caller's upcall
 * with the final status. Returns the upcall's return value. */
static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_setattr_args *sa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
out:
        /* The upcall runs on both the success and the error path. */
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}
370
/* Send an OST_SETATTR asynchronously.
 *
 * @rqset selects the completion mode:
 *  - NULL:        fire-and-forget via ptlrpcd, no interpret callback;
 *  - PTLRPCD_SET: handled by ptlrpcd, with osc_setattr_interpret()
 *                 driving @upcall(@cookie, rc) on completion;
 *  - otherwise:   added to the caller's set, same interpret path.
 *
 * Returns 0 if the request was dispatched, or a negative errno. */
int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
                           struct obd_trans_info *oti,
                           obd_enqueue_update_f upcall, void *cookie,
                           struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* Carry the llog cookie along so the server can cancel the unlink
         * record once the setattr commits. */
        if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
                oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
                sa = ptlrpc_req_async_args(req);
                sa->sa_oa = oinfo->oi_oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}
421
422 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
423                              struct obd_trans_info *oti,
424                              struct ptlrpc_request_set *rqset)
425 {
426         return osc_setattr_async_base(exp, oinfo, oti,
427                                       oinfo->oi_cb_up, oinfo, rqset);
428 }
429
/* Create an object on the OST synchronously (also used for object
 * recreation and orphan cleanup).
 *
 * If the caller passes *ea == NULL a temporary single-stripe lsm is
 * allocated here; on success it is handed back through *ea, on failure it
 * is freed again. An lsm supplied by the caller is never freed here.
 *
 * Returns 0 on success or a negative errno. */
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct lov_stripe_md  *lsm;
        int                    rc;
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_DELORPHAN) {
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        /* The blocksize is not sent by the OST; use the client BRW size. */
        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_oi = oa->o_oi;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        *oti->oti_logcookies = oa->o_lcookie;
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        /* Only free the lsm allocated above: a caller-supplied lsm leaves
         * *ea non-NULL, and on success *ea was just set to lsm. */
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        RETURN(rc);
}
514
/* Send an asynchronous OST_PUNCH (truncate/hole-punch).
 *
 * The extent to punch must already be encoded in oinfo->oi_oa's
 * size/blocks fields (see osc_punch()). Completion is driven by
 * osc_setattr_interpret(), which invokes @upcall(@cookie, rc).
 * Returns 0 if the request was dispatched, or a negative errno. */
int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
                   obd_enqueue_update_f upcall, void *cookie,
                   struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        struct ost_body         *body;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);

        /* Reuse the setattr interpreter: it copies back the reply obdo and
         * fires the upcall. */
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
        CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(req);
        sa->sa_oa     = oinfo->oi_oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;
        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}
559
560 static int osc_punch(const struct lu_env *env, struct obd_export *exp,
561                      struct obd_info *oinfo, struct obd_trans_info *oti,
562                      struct ptlrpc_request_set *rqset)
563 {
564         oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
565         oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
566         oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
567         return osc_punch_base(exp, oinfo,
568                               oinfo->oi_cb_up, oinfo, rqset);
569 }
570
/* Completion handler for async OST_SYNC: copy the reply obdo back into the
 * caller's obd_info and run the fsync upcall with the final status.
 * Returns the upcall's return value. */
static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req,
                              void *arg, int rc)
{
        struct osc_fsync_args *fa = arg;
        struct ost_body *body;
        ENTRY;

        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR ("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        /* Struct copy of the server's view of the attributes. */
        *fa->fa_oi->oi_oa = body->oa;
out:
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        RETURN(rc);
}
593
/* Send an asynchronous OST_SYNC for the byte range already encoded in
 * oinfo->oi_oa's size/blocks fields (see osc_sync()). Completion is driven
 * by osc_sync_interpret(), which invokes @upcall(@cookie, rc).
 * Returns 0 if the request was dispatched, or a negative errno. */
int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_fsync_args *fa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
        fa = ptlrpc_req_async_args(req);
        fa->fa_oi = oinfo;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN (0);
}
638
639 static int osc_sync(const struct lu_env *env, struct obd_export *exp,
640                     struct obd_info *oinfo, obd_size start, obd_size end,
641                     struct ptlrpc_request_set *set)
642 {
643         ENTRY;
644
645         if (!oinfo->oi_oa) {
646                 CDEBUG(D_INFO, "oa NULL\n");
647                 RETURN(-EINVAL);
648         }
649
650         oinfo->oi_oa->o_size = start;
651         oinfo->oi_oa->o_blocks = end;
652         oinfo->oi_oa->o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
653
654         RETURN(osc_sync_base(exp, oinfo, oinfo->oi_cb_up, oinfo, set));
655 }
656
/* Find and cancel locally locks matched by @mode in the resource found by
 * @objid. Found locks are added into @cancel list. Returns the amount of
 * locks added to @cancels list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   cfs_list_t *cancels,
                                   ldlm_mode_t mode, int lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This distinguishes from a case when ELC is not supported originally,
         * when we still want to cancel locks in advance and just cancel them
         * locally, without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                RETURN(0);

        /* Look up the resource by the object id; no resource means no
         * locks to cancel. */
        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (res == NULL)
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}
691
692 static int osc_destroy_interpret(const struct lu_env *env,
693                                  struct ptlrpc_request *req, void *data,
694                                  int rc)
695 {
696         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
697
698         cfs_atomic_dec(&cli->cl_destroy_in_flight);
699         cfs_waitq_signal(&cli->cl_destroy_waitq);
700         return 0;
701 }
702
/* Try to reserve a slot for one more destroy RPC.
 *
 * Returns 1 (and keeps the incremented counter as the reservation) when
 * fewer than cl_max_rpcs_in_flight destroys are outstanding; returns 0
 * after undoing the increment otherwise. The inc/dec pair is deliberately
 * unlocked, so the second atomic re-checks the limit and signals the
 * waitq if a completion raced in between — keep the statement order. */
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                cfs_waitq_signal(&cli->cl_destroy_waitq);
        }
        return 0;
}
720
721 int osc_create(const struct lu_env *env, struct obd_export *exp,
722                struct obdo *oa, struct lov_stripe_md **ea,
723                struct obd_trans_info *oti)
724 {
725         int rc = 0;
726         ENTRY;
727
728         LASSERT(oa);
729         LASSERT(ea);
730         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
731
732         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
733             oa->o_flags == OBD_FL_RECREATE_OBJS) {
734                 RETURN(osc_real_create(exp, oa, ea, oti));
735         }
736
737         if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi)))
738                 RETURN(osc_real_create(exp, oa, ea, oti));
739
740         /* we should not get here anymore */
741         LBUG();
742
743         RETURN(rc);
744 }
745
746 /* Destroy requests can be async always on the client, and we don't even really
747  * care about the return code since the client cannot do anything at all about
748  * a destroy failure.
749  * When the MDS is unlinking a filename, it saves the file objects into a
750  * recovery llog, and these object records are cancelled when the OST reports
751  * they were destroyed and sync'd to disk (i.e. transaction committed).
752  * If the client dies, or the OST is down when the object should be destroyed,
753  * the records are not cancelled, and when the OST reconnects to the MDS next,
754  * it will retrieve the llog unlink logs and then sends the log cancellation
755  * cookies to the MDS after committing destroy transactions. */
756 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
757                        struct obdo *oa, struct lov_stripe_md *ea,
758                        struct obd_trans_info *oti, struct obd_export *md_export,
759                        void *capa)
760 {
761         struct client_obd     *cli = &exp->exp_obd->u.cli;
762         struct ptlrpc_request *req;
763         struct ost_body       *body;
764         CFS_LIST_HEAD(cancels);
765         int rc, count;
766         ENTRY;
767
768         if (!oa) {
769                 CDEBUG(D_INFO, "oa NULL\n");
770                 RETURN(-EINVAL);
771         }
772
773         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
774                                         LDLM_FL_DISCARD_DATA);
775
776         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
777         if (req == NULL) {
778                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
779                 RETURN(-ENOMEM);
780         }
781
782         osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
783         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
784                                0, &cancels, count);
785         if (rc) {
786                 ptlrpc_request_free(req);
787                 RETURN(rc);
788         }
789
790         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
791         ptlrpc_at_set_req_timeout(req);
792
793         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
794                 oa->o_lcookie = *oti->oti_logcookies;
795         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
796         LASSERT(body);
797         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
798
799         osc_pack_capa(req, body, (struct obd_capa *)capa);
800         ptlrpc_request_set_replen(req);
801
802         /* If osc_destory is for destroying the unlink orphan,
803          * sent from MDT to OST, which should not be blocked here,
804          * because the process might be triggered by ptlrpcd, and
805          * it is not good to block ptlrpcd thread (b=16006)*/
806         if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
807                 req->rq_interpret_reply = osc_destroy_interpret;
808                 if (!osc_can_send_destroy(cli)) {
809                         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
810                                                           NULL);
811
812                         /*
813                          * Wait until the number of on-going destroy RPCs drops
814                          * under max_rpc_in_flight
815                          */
816                         l_wait_event_exclusive(cli->cl_destroy_waitq,
817                                                osc_can_send_destroy(cli), &lwi);
818                 }
819         }
820
821         /* Do not wait for response */
822         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
823         RETURN(0);
824 }
825
/* Fill the dirty/grant accounting fields of @oa, under the LOI list lock,
 * so the server learns how much cache this client holds (o_dirty), how
 * much additional grant it could use (o_undirty), its current grant
 * (o_grant) and any grant lost across reconnects (o_dropped). */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (unlikely(cli->cl_dirty - cli->cl_dirty_transit >
                     cli->cl_dirty_max)) {
                /* Accounting got out of sync: claim no headroom at all. */
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (unlikely(cfs_atomic_read(&obd_dirty_pages) -
                            cfs_atomic_read(&obd_dirty_transit_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The cfs_atomic_read() allowing the cfs_atomic_inc() are
                 * not covered by a lock thus they may safely race and trip
                 * this CERROR() unless we add in a small fudge factor (+1). */
                CERROR("dirty %d - %d > system dirty_max %d\n",
                       cfs_atomic_read(&obd_dirty_pages),
                       cfs_atomic_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) {
                /* Implausibly large headroom also indicates corruption. */
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                /* Ask for enough grant to keep the pipeline of RPCs full. */
                long max_in_flight = (cli->cl_max_pages_per_rpc <<
                                      CFS_PAGE_SHIFT)*
                                     (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);

}
870
871 void osc_update_next_shrink(struct client_obd *cli)
872 {
873         cli->cl_next_shrink_grant =
874                 cfs_time_shift(cli->cl_grant_shrink_interval);
875         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
876                cli->cl_next_shrink_grant);
877 }
878
879 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
880 {
881         client_obd_list_lock(&cli->cl_loi_list_lock);
882         cli->cl_avail_grant += grant;
883         client_obd_list_unlock(&cli->cl_loi_list_lock);
884 }
885
886 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
887 {
888         if (body->oa.o_valid & OBD_MD_FLGRANT) {
889                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
890                 __osc_update_grant(cli, body->oa.o_grant);
891         }
892 }
893
894 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
895                               obd_count keylen, void *key, obd_count vallen,
896                               void *val, struct ptlrpc_request_set *set);
897
/*
 * Reply interpreter for a grant-shrink set_info RPC.  On RPC failure the
 * grant we tentatively gave back (stashed in aa_oa->o_grant) is restored to
 * the local pool; on success any extra grant in the reply is absorbed.
 * Frees the obdo that osc_set_info_async() attached to the request.
 */
static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
        struct ost_body *body;

        if (rc != 0) {
                /* shrink RPC failed: the server never saw it, take the
                 * grant back locally */
                __osc_update_grant(cli, oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBDO_FREE(oa);
        return rc;
}
918
919 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
920 {
921         client_obd_list_lock(&cli->cl_loi_list_lock);
922         oa->o_grant = cli->cl_avail_grant / 4;
923         cli->cl_avail_grant -= oa->o_grant;
924         client_obd_list_unlock(&cli->cl_loi_list_lock);
925         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
926                 oa->o_valid |= OBD_MD_FLFLAGS;
927                 oa->o_flags = 0;
928         }
929         oa->o_flags |= OBD_FL_SHRINK_GRANT;
930         osc_update_next_shrink(cli);
931 }
932
933 /* Shrink the current grant, either from some large amount to enough for a
934  * full set of in-flight RPCs, or if we have already shrunk to that limit
935  * then to enough for a single RPC.  This avoids keeping more grant than
936  * needed, and avoids shrinking the grant piecemeal. */
937 static int osc_shrink_grant(struct client_obd *cli)
938 {
939         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
940                              (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT);
941
942         client_obd_list_lock(&cli->cl_loi_list_lock);
943         if (cli->cl_avail_grant <= target_bytes)
944                 target_bytes = cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
945         client_obd_list_unlock(&cli->cl_loi_list_lock);
946
947         return osc_shrink_grant_to_target(cli, target_bytes);
948 }
949
/*
 * Shrink this client's available grant down to @target_bytes and tell the
 * server via an asynchronous set_info(KEY_GRANT_SHRINK) RPC.  The target is
 * clamped so we never shrink below one RPC's worth of grant.  Returns 0 if
 * no shrink was needed or the RPC was queued, negative errno on failure.
 */
int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int                     rc = 0;
        struct ost_body        *body;
        ENTRY;

        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        /* body->oa starts zeroed, so osc_announce_cached's LASSERT on
         * o_valid holds */
        osc_announce_cached(cli, &body->oa, 0);

        /* deduct the surplus locally first; it is restored by
         * __osc_update_grant() below if the RPC cannot be sent */
        client_obd_list_lock(&cli->cl_loi_list_lock);
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
        OBD_FREE_PTR(body);
        RETURN(rc);
}
994
995 static int osc_should_shrink_grant(struct client_obd *client)
996 {
997         cfs_time_t time = cfs_time_current();
998         cfs_time_t next_shrink = client->cl_next_shrink_grant;
999
1000         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
1001              OBD_CONNECT_GRANT_SHRINK) == 0)
1002                 return 0;
1003
1004         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
1005                 /* Get the current RPC size directly, instead of going via:
1006                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
1007                  * Keep comment here so that it can be found by searching. */
1008                 int brw_size = client->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
1009
1010                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
1011                     client->cl_avail_grant > brw_size)
1012                         return 1;
1013                 else
1014                         osc_update_next_shrink(client);
1015         }
1016         return 0;
1017 }
1018
1019 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1020 {
1021         struct client_obd *client;
1022
1023         cfs_list_for_each_entry(client, &item->ti_obd_list,
1024                                 cl_grant_shrink_list) {
1025                 if (osc_should_shrink_grant(client))
1026                         osc_shrink_grant(client);
1027         }
1028         return 0;
1029 }
1030
/*
 * Register @client on the periodic TIMEOUT_GRANT list so that
 * osc_grant_shrink_grant_cb() is invoked every cl_grant_shrink_interval
 * seconds.  Also primes the first shrink deadline.  Returns 0 on success
 * or the error from ptlrpc_add_timeout_client().
 */
static int osc_add_shrink_grant(struct client_obd *client)
{
        int rc;

        rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
                                       TIMEOUT_GRANT,
                                       osc_grant_shrink_grant_cb, NULL,
                                       &client->cl_grant_shrink_list);
        if (rc) {
                CERROR("add grant client %s error %d\n",
                        client->cl_import->imp_obd->obd_name, rc);
                return rc;
        }
        CDEBUG(D_CACHE, "add grant client %s \n",
               client->cl_import->imp_obd->obd_name);
        osc_update_next_shrink(client);
        return 0;
}
1049
/* Remove @client from the periodic TIMEOUT_GRANT shrink list (inverse of
 * osc_add_shrink_grant()). */
static int osc_del_shrink_grant(struct client_obd *client)
{
        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
                                         TIMEOUT_GRANT);
}
1055
/*
 * Initialise this client's grant state from the server's connect reply
 * @ocd.  Called at (re)connect time; also derives the osc_extent chunk
 * size from the server's preferred block size and (re)arms the grant
 * shrink timer when the server supports it.
 */
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we're expect to hold: if we've
         * been evicted, it's the new avail_grant amount, cl_dirty will drop
         * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
         *
         * race is tolerable here: if we're evicted, but imp_state already
         * left EVICTED state, then cl_dirty must be 0 already.
         */
        client_obd_list_lock(&cli->cl_loi_list_lock);
        if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
                cli->cl_avail_grant = ocd->ocd_grant;
        else
                cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;

        if (cli->cl_avail_grant < 0) {
                CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
                      cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
                      ocd->ocd_grant, cli->cl_dirty);
                /* workaround for servers which do not have the patch from
                 * LU-2679 */
                cli->cl_avail_grant = ocd->ocd_grant;
        }

        /* determine the appropriate chunk size used by osc_extent. */
        cli->cl_chunkbits = max_t(int, CFS_PAGE_SHIFT, ocd->ocd_blocksize);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
                "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
                cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);

        /* arm the shrink timer only once per client */
        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            cfs_list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
}
1093
/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet.
 *
 * @nob_read is the number of bytes the server actually returned; every
 * byte past it in the page array is zero-filled so the caller sees a
 * well-defined sparse region. */
static void handle_short_read(int nob_read, obd_count page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT (page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page: zero its tail, then fall
                         * through to zero the remaining whole pages */
                        ptr = cfs_kmap(pga[i]->pg) +
                                (pga[i]->off & ~CFS_PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        cfs_kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                cfs_kunmap(pga[i]->pg);
                i++;
        }
}
1132
1133 static int check_write_rcs(struct ptlrpc_request *req,
1134                            int requested_nob, int niocount,
1135                            obd_count page_count, struct brw_page **pga)
1136 {
1137         int     i;
1138         __u32   *remote_rcs;
1139
1140         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1141                                                   sizeof(*remote_rcs) *
1142                                                   niocount);
1143         if (remote_rcs == NULL) {
1144                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1145                 return(-EPROTO);
1146         }
1147
1148         /* return error if any niobuf was in error */
1149         for (i = 0; i < niocount; i++) {
1150                 if ((int)remote_rcs[i] < 0)
1151                         return(remote_rcs[i]);
1152
1153                 if (remote_rcs[i] != 0) {
1154                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1155                                 i, remote_rcs[i], req);
1156                         return(-EPROTO);
1157                 }
1158         }
1159
1160         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1161                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1162                        req->rq_bulk->bd_nob_transferred, requested_nob);
1163                 return(-EPROTO);
1164         }
1165
1166         return (0);
1167 }
1168
1169 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1170 {
1171         if (p1->flag != p2->flag) {
1172                 unsigned mask = ~(OBD_BRW_FROM_GRANT| OBD_BRW_NOCACHE|
1173                                   OBD_BRW_SYNC|OBD_BRW_ASYNC|OBD_BRW_NOQUOTA);
1174
1175                 /* warn if we try to combine flags that we don't know to be
1176                  * safe to combine */
1177                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1178                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1179                               "report this at http://bugs.whamcloud.com/\n",
1180                               p1->flag, p2->flag);
1181                 }
1182                 return 0;
1183         }
1184
1185         return (p1->off + p1->count == p2->off);
1186 }
1187
/*
 * Compute the bulk checksum over the first @nob bytes of the pages in
 * @pga using the algorithm selected by @cksum_type.  Contains two fault
 * injection points: OBD_FAIL_OSC_CHECKSUM_RECEIVE corrupts read data
 * before checksumming, OBD_FAIL_OSC_CHECKSUM_SEND perturbs the computed
 * write checksum (the data itself stays correct for a resend).
 * NOTE(review): on hash-init failure this returns PTR_ERR() through an
 * unsigned obd_count return type; callers see a large positive value.
 */
static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga, int opc,
                                   cksum_type_t cksum_type)
{
        __u32                           cksum;
        int                             i = 0;
        struct cfs_crypto_hash_desc     *hdesc;
        unsigned int                    bufsize;
        int                             err;
        unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(hdesc)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(hdesc);
        }

        while (nob > 0 && pg_count > 0) {
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = cfs_kmap(pga[i]->pg);
                        int off = pga[i]->off & ~CFS_PAGE_MASK;
                        memcpy(ptr + off, "bad1", min(4, nob));
                        cfs_kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
                                  pga[i]->off & ~CFS_PAGE_MASK,
                                  count);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
                               (int)(pga[i]->off & ~CFS_PAGE_MASK));

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        bufsize = 4;
        err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);

        /* on error, call _final again with NULL to release the descriptor
         * (presumably the libcfs cleanup convention -- confirm against the
         * cfs_crypto_hash API) */
        if (err)
                cfs_crypto_hash_final(hdesc, NULL, NULL);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}
1244
/*
 * Build (but do not send) a bulk read or write RPC covering @page_count
 * pages described by @pga.  Adjacent mergeable pages are coalesced into
 * shared remote niobufs, grant/dirty state is announced in the request
 * body, and (for writes) a bulk checksum is computed when enabled.  On
 * success *reqp holds the prepared request and the async args carry
 * everything osc_brw_fini_request() needs; @reserve pins @ocapa for the
 * reply path, @resend tags the request with OBD_FL_RECOV_RESEND.
 */
static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa, int reserve,
                                int resend)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        /* writes draw from the preallocated pool so dirty flushes cannot
         * fail on allocation; reads allocate normally */
        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                cli->cl_import->imp_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        /* count how many niobufs remain after merging adjacent pages */
        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));
        osc_set_capa_size(req, &RMF_CAPA1, ocapa);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);
        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
         * retry logic */
        req->rq_no_retry_einprogress = 1;

        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
                opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
                OST_BULK_PORTAL);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
         * that might be send for this request.  The actual number is decided
         * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
         * "max - 1" for old client compatibility sending "0", and also so the
         * the actual maximum is a power-of-two number, not one less. LU-1431 */
        ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
        osc_pack_capa(req, body, ocapa);
        LASSERT(page_count > 0);
        pg_prev = pga[0];
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                int poff = pg->off & ~CFS_PAGE_MASK;

                LASSERT(pg->count > 0);
                /* make sure there is no gap in the middle of page array */
                LASSERTF(page_count == 1 ||
                         (ergo(i == 0, poff + pg->count == CFS_PAGE_SIZE) &&
                          ergo(i > 0 && i < page_count - 1,
                               poff == 0 && pg->count == CFS_PAGE_SIZE)   &&
                          ergo(i == page_count - 1, poff == 0)),
                         "i: %d/%d pg: %p off: "LPU64", count: %u\n",
                         i, page_count, pg, pg->off, pg->count);
#ifdef __linux__
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
#else
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u\n", i, page_count);
#endif
                /* the SRVLOCK flag must be uniform across the whole brw */
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
                requested_nob += pg->count;

                /* extend the previous niobuf when contiguous, otherwise
                 * start a new one */
                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len    = pg->count;
                        niobuf->flags  = pg->flag;
                }
                pg_prev = pg;
        }

        LASSERTF((void *)(niobuf - niocount) ==
                req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
                "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
                &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
        if (resend) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_RECOV_RESEND;
        }

        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                                oa->o_flags &= OBD_FL_LOCAL_MASK;
                                body->oa.o_flags = 0;
                        }
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                /* for reads, ask the server to checksum its reply */
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
        }
        ptlrpc_request_set_replen(req);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);
        if (ocapa && reserve)
                aa->aa_ocapa = capa_get(ocapa);

        *reqp = req;
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
1452
/*
 * Diagnose a write checksum mismatch between what the client sent
 * (@client_cksum, computed with @client_cksum_type) and what the server
 * saw (@server_cksum).  The data is re-checksummed locally with the type
 * the server reported, to distinguish "changed on the client" (mmap race),
 * "changed in transit", and protocol problems.  Returns 0 when the
 * checksums actually match, 1 when a genuine mismatch was logged.
 */
static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum, int nob,
                                obd_count page_count, struct brw_page **pga,
                                cksum_type_t client_cksum_type)
{
        __u32 new_cksum;
        char *msg;
        cksum_type_t cksum_type;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                return 0;
        }

        /* recompute with the checksum type the server replied with */
        cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
                                       oa->o_flags : 0);
        new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
                                      cksum_type);

        if (cksum_type != client_cksum_type)
                msg = "the server did not use the checksum type specified in "
                      "the original request - likely a protocol problem";
        else if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it - "
                      "likely false positive due to mmap IO (bug 11742)";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original - "
                      "likely false positive due to mmap IO (bug 11742)";

        LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
                           " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
                           msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
                           POSTID(&oa->o_oi), pga[0]->off,
                           pga[page_count-1]->off + pga[page_count-1]->count - 1);
        CERROR("original client csum %x (type %x), server csum %x (type %x), "
               "client csum now %x\n", client_cksum, client_cksum_type,
               server_cksum, cksum_type, new_cksum);
        return 1;
}
1497
1498 /* Note rc enters this function as number of bytes transferred */
1499 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1500 {
1501         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1502         const lnet_process_id_t *peer =
1503                         &req->rq_import->imp_connection->c_peer;
1504         struct client_obd *cli = aa->aa_cli;
1505         struct ost_body *body;
1506         __u32 client_cksum = 0;
1507         ENTRY;
1508
1509         if (rc < 0 && rc != -EDQUOT) {
1510                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1511                 RETURN(rc);
1512         }
1513
1514         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1515         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1516         if (body == NULL) {
1517                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1518                 RETURN(-EPROTO);
1519         }
1520
1521         /* set/clear over quota flag for a uid/gid */
1522         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1523             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1524                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1525
1526                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1527                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1528                        body->oa.o_flags);
1529                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1530         }
1531
1532         osc_update_grant(cli, body);
1533
1534         if (rc < 0)
1535                 RETURN(rc);
1536
1537         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1538                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1539
1540         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1541                 if (rc > 0) {
1542                         CERROR("Unexpected +ve rc %d\n", rc);
1543                         RETURN(-EPROTO);
1544                 }
1545                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1546
1547                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1548                         RETURN(-EAGAIN);
1549
1550                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1551                     check_write_checksum(&body->oa, peer, client_cksum,
1552                                          body->oa.o_cksum, aa->aa_requested_nob,
1553                                          aa->aa_page_count, aa->aa_ppga,
1554                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1555                         RETURN(-EAGAIN);
1556
1557                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1558                                      aa->aa_page_count, aa->aa_ppga);
1559                 GOTO(out, rc);
1560         }
1561
1562         /* The rest of this function executes only for OST_READs */
1563
1564         /* if unwrap_bulk failed, return -EAGAIN to retry */
1565         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1566         if (rc < 0)
1567                 GOTO(out, rc = -EAGAIN);
1568
1569         if (rc > aa->aa_requested_nob) {
1570                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1571                        aa->aa_requested_nob);
1572                 RETURN(-EPROTO);
1573         }
1574
1575         if (rc != req->rq_bulk->bd_nob_transferred) {
1576                 CERROR ("Unexpected rc %d (%d transferred)\n",
1577                         rc, req->rq_bulk->bd_nob_transferred);
1578                 return (-EPROTO);
1579         }
1580
1581         if (rc < aa->aa_requested_nob)
1582                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1583
1584         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1585                 static int cksum_counter;
1586                 __u32      server_cksum = body->oa.o_cksum;
1587                 char      *via;
1588                 char      *router;
1589                 cksum_type_t cksum_type;
1590
1591                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1592                                                body->oa.o_flags : 0);
1593                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1594                                                  aa->aa_ppga, OST_READ,
1595                                                  cksum_type);
1596
1597                 if (peer->nid == req->rq_bulk->bd_sender) {
1598                         via = router = "";
1599                 } else {
1600                         via = " via ";
1601                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1602                 }
1603
1604                 if (server_cksum == ~0 && rc > 0) {
1605                         CERROR("Protocol error: server %s set the 'checksum' "
1606                                "bit, but didn't send a checksum.  Not fatal, "
1607                                "but please notify on http://bugs.whamcloud.com/\n",
1608                                libcfs_nid2str(peer->nid));
1609                 } else if (server_cksum != client_cksum) {
1610                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1611                                            "%s%s%s inode "DFID" object "DOSTID
1612                                            " extent ["LPU64"-"LPU64"]\n",
1613                                            req->rq_import->imp_obd->obd_name,
1614                                            libcfs_nid2str(peer->nid),
1615                                            via, router,
1616                                            body->oa.o_valid & OBD_MD_FLFID ?
1617                                                 body->oa.o_parent_seq : (__u64)0,
1618                                            body->oa.o_valid & OBD_MD_FLFID ?
1619                                                 body->oa.o_parent_oid : 0,
1620                                            body->oa.o_valid & OBD_MD_FLFID ?
1621                                                 body->oa.o_parent_ver : 0,
1622                                            POSTID(&body->oa.o_oi),
1623                                            aa->aa_ppga[0]->off,
1624                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1625                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1626                                                                         1);
1627                         CERROR("client %x, server %x, cksum_type %x\n",
1628                                client_cksum, server_cksum, cksum_type);
1629                         cksum_counter = 0;
1630                         aa->aa_oa->o_cksum = client_cksum;
1631                         rc = -EAGAIN;
1632                 } else {
1633                         cksum_counter++;
1634                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1635                         rc = 0;
1636                 }
1637         } else if (unlikely(client_cksum)) {
1638                 static int cksum_missed;
1639
1640                 cksum_missed++;
1641                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1642                         CERROR("Checksum %u requested from %s but not sent\n",
1643                                cksum_missed, libcfs_nid2str(peer->nid));
1644         } else {
1645                 rc = 0;
1646         }
1647 out:
1648         if (rc >= 0)
1649                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1650                                      aa->aa_oa, &body->oa);
1651
1652         RETURN(rc);
1653 }
1654
/**
 * Synchronous bulk read/write: build one BRW request, queue it, and wait
 * for completion, resending on recoverable errors.
 *
 * \param cmd        OBD_BRW_READ or OBD_BRW_WRITE (plus flags)
 * \param exp        export to the target OST
 * \param oa         obdo describing the object; clobbered by the reply
 * \param lsm        stripe metadata (passed through to request prep)
 * \param page_count number of pages in \a pga
 * \param pga        sorted array of brw_page pointers
 * \param ocapa      capability for the operation, may be NULL
 *
 * \retval 0 on success, negative errno on failure.  -EAGAIN/-EINPROGRESS
 *         are converted to -EIO once retries are exhausted.
 */
static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
                            struct lov_stripe_md *lsm,
                            obd_count page_count, struct brw_page **pga,
                            struct obd_capa *ocapa)
{
        struct ptlrpc_request *req;
        int                    rc;
        cfs_waitq_t            waitq;
        int                    generation, resends = 0;
        struct l_wait_info     lwi;

        ENTRY;

        cfs_waitq_init(&waitq);
        /* remember the import generation so a resend that crosses an
         * eviction can be detected and aborted below */
        generation = exp->exp_obd->u.cli.cl_import->imp_generation;

restart_bulk:
        rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
                                  page_count, pga, &req, ocapa, 0, resends);
        if (rc != 0)
                return (rc);

        if (resends) {
                /* delay the resend by 'resends' seconds via rq_sent and pin
                 * the request to the generation captured before the first
                 * attempt */
                req->rq_generation_set = 1;
                req->rq_import_generation = generation;
                req->rq_sent = cfs_time_current_sec() + resends;
        }

        rc = ptlrpc_queue_wait(req);

        if (rc == -ETIMEDOUT && req->rq_resend) {
                DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
                ptlrpc_req_finished(req);
                goto restart_bulk;
        }

        rc = osc_brw_fini_request(req, rc);

        ptlrpc_req_finished(req);
        /* When server return -EINPROGRESS, client should always retry
         * regardless of the number of times the bulk was resent already.*/
        if (osc_recoverable_error(rc)) {
                resends++;
                if (rc != -EINPROGRESS &&
                    !client_should_resend(resends, &exp->exp_obd->u.cli)) {
                        CERROR("%s: too many resend retries for object: "
                               ""DOSTID", rc = %d.\n", exp->exp_obd->obd_name,
                               POSTID(&oa->o_oi), rc);
                        goto out;
                }
                if (generation !=
                    exp->exp_obd->u.cli.cl_import->imp_generation) {
                        CDEBUG(D_HA, "%s: resend cross eviction for object: "
                               ""DOSTID", rc = %d.\n", exp->exp_obd->obd_name,
                               POSTID(&oa->o_oi), rc);
                        goto out;
                }

                /* back off for 'resends' seconds (interruptible) before
                 * retrying the whole bulk */
                lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL,
                                       NULL);
                l_wait_event(waitq, 0, &lwi);

                goto restart_bulk;
        }
out:
        /* callers cannot act on -EAGAIN/-EINPROGRESS here; report -EIO */
        if (rc == -EAGAIN || rc == -EINPROGRESS)
                rc = -EIO;
        RETURN (rc);
}
1724
/**
 * Rebuild and resend an async BRW request that failed with a recoverable
 * error.  A brand-new ptlrpc request is prepared from the old request's
 * async args; the page array, oap list and extent list are moved (not
 * copied) onto the new request, and per-oap request references are
 * switched from the old request to the new one.
 *
 * \param request  the failed request (still holds the async args)
 * \param aa       async args of \a request (pages, extents, obdo, capa)
 * \param rc       the recoverable error that triggered the redo
 *
 * \retval 0 when the new request was queued, negative errno otherwise.
 */
static int osc_brw_redo_request(struct ptlrpc_request *request,
                                struct osc_brw_async_args *aa, int rc)
{
        struct ptlrpc_request *new_req;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;
        ENTRY;

        /* -EINPROGRESS resends are expected; log those quietly */
        DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
                  "redo for recoverable error %d", rc);

        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                        OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa,
                                  NULL /* lsm unused by osc currently */,
                                  aa->aa_page_count, aa->aa_ppga,
                                  &new_req, aa->aa_ocapa, 0, 1);
        if (rc)
                RETURN(rc);

        /* abort the redo if any page in the RPC was interrupted while
         * the request was in flight */
        cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(request == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 request, oap->oap_request);
                        if (oap->oap_interrupted) {
                                ptlrpc_req_finished(new_req);
                                RETURN(-EINTR);
                        }
                }
        }
        /* New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it... */
        aa->aa_resends++;
        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
        /* cap resend delay to the current request timeout, this is similar to
         * what ptlrpc does (see after_reply()) */
        if (aa->aa_resends > new_req->rq_timeout)
                new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
        else
                new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
        new_req->rq_generation_set = 1;
        new_req->rq_import_generation = request->rq_import_generation;

        new_aa = ptlrpc_req_async_args(new_req);

        CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
        cfs_list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
        CFS_INIT_LIST_HEAD(&new_aa->aa_exts);
        cfs_list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
        new_aa->aa_resends = aa->aa_resends;

        /* move each oap's request reference from the old request to the
         * new one */
        cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);
                }
        }

        /* capability ownership moves to the new request's async args */
        new_aa->aa_ocapa = aa->aa_ocapa;
        aa->aa_ocapa = NULL;

        /* XXX: This code will run into problem if we're going to support
         * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
         * and wait for all of them to be finished. We should inherit request
         * set from old request. */
        ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);

        DEBUG_REQ(D_INFO, new_req, "new request");
        RETURN(0);
}
1797
1798 /*
1799  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1800  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1801  * fine for our small page arrays and doesn't require allocation.  its an
1802  * insertion sort that swaps elements that are strides apart, shrinking the
1803  * stride down until its '1' and the array is sorted.
1804  */
1805 static void sort_brw_pages(struct brw_page **array, int num)
1806 {
1807         int stride, i, j;
1808         struct brw_page *tmp;
1809
1810         if (num == 1)
1811                 return;
1812         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1813                 ;
1814
1815         do {
1816                 stride /= 3;
1817                 for (i = stride ; i < num ; i++) {
1818                         tmp = array[i];
1819                         j = i;
1820                         while (j >= stride && array[j - stride]->off > tmp->off) {
1821                                 array[j] = array[j - stride];
1822                                 j -= stride;
1823                         }
1824                         array[j] = tmp;
1825                 }
1826         } while (stride > 1);
1827 }
1828
1829 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1830 {
1831         int count = 1;
1832         int offset;
1833         int i = 0;
1834
1835         LASSERT (pages > 0);
1836         offset = pg[i]->off & ~CFS_PAGE_MASK;
1837
1838         for (;;) {
1839                 pages--;
1840                 if (pages == 0)         /* that's all */
1841                         return count;
1842
1843                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1844                         return count;   /* doesn't end on page boundary */
1845
1846                 i++;
1847                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1848                 if (offset != 0)        /* doesn't start on page boundary */
1849                         return count;
1850
1851                 count++;
1852         }
1853 }
1854
1855 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1856 {
1857         struct brw_page **ppga;
1858         int i;
1859
1860         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1861         if (ppga == NULL)
1862                 return NULL;
1863
1864         for (i = 0; i < count; i++)
1865                 ppga[i] = pga + i;
1866         return ppga;
1867 }
1868
1869 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1870 {
1871         LASSERT(ppga != NULL);
1872         OBD_FREE(ppga, sizeof(*ppga) * count);
1873 }
1874
/**
 * Synchronous BRW entry point: split \a pga into per-RPC chunks that fit
 * cl_max_pages_per_rpc and are unfragmented, and issue them one at a time
 * via osc_brw_internal().
 *
 * \param cmd        OBD_BRW_READ/OBD_BRW_WRITE, optionally OBD_BRW_CHECK
 * \param exp        export to the target OST
 * \param oinfo      carries the obdo, stripe md, and capability
 * \param page_count total number of pages in \a pga
 * \param pga        flat array of brw pages
 * \param oti        transaction info (unused here)
 *
 * \retval 0 on success, negative errno on the first failing chunk.
 */
static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
                   obd_count page_count, struct brw_page *pga,
                   struct obd_trans_info *oti)
{
        struct obdo *saved_oa = NULL;
        struct brw_page **ppga, **orig;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct client_obd *cli;
        int rc, page_count_orig;
        ENTRY;

        LASSERT((imp != NULL) && (imp->imp_obd != NULL));
        cli = &imp->imp_obd->u.cli;

        if (cmd & OBD_BRW_CHECK) {
                /* The caller just wants to know if there's a chance that this
                 * I/O can succeed */

                if (imp->imp_invalid)
                        RETURN(-EIO);
                RETURN(0);
        }

        /* test_brw with a failed create can trip this, maybe others. */
        LASSERT(cli->cl_max_pages_per_rpc);

        rc = 0;

        /* work on a sortable pointer array; keep the original pointer and
         * count for release at the end since ppga is advanced per chunk */
        orig = ppga = osc_build_ppga(pga, page_count);
        if (ppga == NULL)
                RETURN(-ENOMEM);
        page_count_orig = page_count;

        sort_brw_pages(ppga, page_count);
        while (page_count) {
                obd_count pages_per_brw;

                if (page_count > cli->cl_max_pages_per_rpc)
                        pages_per_brw = cli->cl_max_pages_per_rpc;
                else
                        pages_per_brw = page_count;

                /* shrink the chunk further so it never straddles a page
                 * fragmentation boundary */
                pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);

                if (saved_oa != NULL) {
                        /* restore previously saved oa */
                        *oinfo->oi_oa = *saved_oa;
                } else if (page_count > pages_per_brw) {
                        /* save a copy of oa (brw will clobber it) */
                        OBDO_ALLOC(saved_oa);
                        if (saved_oa == NULL)
                                GOTO(out, rc = -ENOMEM);
                        *saved_oa = *oinfo->oi_oa;
                }

                rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
                                      pages_per_brw, ppga, oinfo->oi_capa);

                if (rc != 0)
                        break;

                page_count -= pages_per_brw;
                ppga += pages_per_brw;
        }

out:
        osc_release_ppga(orig, page_count_orig);

        if (saved_oa != NULL)
                OBDO_FREE(saved_oa);

        RETURN(rc);
}
1948
1949 static int brw_interpret(const struct lu_env *env,
1950                          struct ptlrpc_request *req, void *data, int rc)
1951 {
1952         struct osc_brw_async_args *aa = data;
1953         struct osc_extent *ext;
1954         struct osc_extent *tmp;
1955         struct cl_object  *obj = NULL;
1956         struct client_obd *cli = aa->aa_cli;
1957         ENTRY;
1958
1959         rc = osc_brw_fini_request(req, rc);
1960         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1961         /* When server return -EINPROGRESS, client should always retry
1962          * regardless of the number of times the bulk was resent already. */
1963         if (osc_recoverable_error(rc)) {
1964                 if (req->rq_import_generation !=
1965                     req->rq_import->imp_generation) {
1966                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1967                                ""DOSTID", rc = %d.\n",
1968                                req->rq_import->imp_obd->obd_name,
1969                                POSTID(&aa->aa_oa->o_oi), rc);
1970                 } else if (rc == -EINPROGRESS ||
1971                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1972                         rc = osc_brw_redo_request(req, aa, rc);
1973                 } else {
1974                         CERROR("%s: too many resent retries for object: "
1975                                ""LPU64":"LPU64", rc = %d.\n",
1976                                req->rq_import->imp_obd->obd_name,
1977                                POSTID(&aa->aa_oa->o_oi), rc);
1978                 }
1979
1980                 if (rc == 0)
1981                         RETURN(0);
1982                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1983                         rc = -EIO;
1984         }
1985
1986         if (aa->aa_ocapa) {
1987                 capa_put(aa->aa_ocapa);
1988                 aa->aa_ocapa = NULL;
1989         }
1990
1991         cfs_list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1992                 if (obj == NULL && rc == 0) {
1993                         obj = osc2cl(ext->oe_obj);
1994                         cl_object_get(obj);
1995                 }
1996
1997                 cfs_list_del_init(&ext->oe_link);
1998                 osc_extent_finish(env, ext, 1, rc);
1999         }
2000         LASSERT(cfs_list_empty(&aa->aa_exts));
2001         LASSERT(cfs_list_empty(&aa->aa_oaps));
2002
2003         if (obj != NULL) {
2004                 struct obdo *oa = aa->aa_oa;
2005                 struct cl_attr *attr  = &osc_env_info(env)->oti_attr;
2006                 unsigned long valid = 0;
2007
2008                 LASSERT(rc == 0);
2009                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2010                         attr->cat_blocks = oa->o_blocks;
2011                         valid |= CAT_BLOCKS;
2012                 }
2013                 if (oa->o_valid & OBD_MD_FLMTIME) {
2014                         attr->cat_mtime = oa->o_mtime;
2015                         valid |= CAT_MTIME;
2016                 }
2017                 if (oa->o_valid & OBD_MD_FLATIME) {
2018                         attr->cat_atime = oa->o_atime;
2019                         valid |= CAT_ATIME;
2020                 }
2021                 if (oa->o_valid & OBD_MD_FLCTIME) {
2022                         attr->cat_ctime = oa->o_ctime;
2023                         valid |= CAT_CTIME;
2024                 }
2025                 if (valid != 0) {
2026                         cl_object_attr_lock(obj);
2027                         cl_object_attr_set(env, obj, attr, valid);
2028                         cl_object_attr_unlock(obj);
2029                 }
2030                 cl_object_put(env, obj);
2031         }
2032         OBDO_FREE(aa->aa_oa);
2033
2034         cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
2035                           req->rq_bulk->bd_nob_transferred);
2036         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2037         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
2038
2039         client_obd_list_lock(&cli->cl_loi_list_lock);
2040         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2041          * is called so we know whether to go to sync BRWs or wait for more
2042          * RPCs to complete */
2043         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2044                 cli->cl_w_in_flight--;
2045         else
2046                 cli->cl_r_in_flight--;
2047         osc_wake_cache_waiters(cli);
2048         client_obd_list_unlock(&cli->cl_loi_list_lock);
2049
2050         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
2051         RETURN(rc);
2052 }
2053
2054 /**
2055  * Build an RPC by the list of extent @ext_list. The caller must ensure
2056  * that the total pages in this list are NOT over max pages per RPC.
2057  * Extents in the list must be in OES_RPC state.
2058  */
int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
                  cfs_list_t *ext_list, int cmd, pdl_policy_t pol)
{
        struct ptlrpc_request           *req = NULL;
        struct osc_extent               *ext;
        struct brw_page                 **pga = NULL;
        struct osc_brw_async_args       *aa = NULL;
        struct obdo                     *oa = NULL;
        struct osc_async_page           *oap;
        struct osc_async_page           *tmp;
        struct cl_req                   *clerq = NULL;
        enum cl_req_type                crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
                                                                      CRT_READ;
        struct ldlm_lock                *lock = NULL;
        struct cl_req_attr              *crattr = NULL;
        obd_off                         starting_offset = OBD_OBJECT_EOF;
        obd_off                         ending_offset = 0;
        int                             mpflag = 0;
        int                             mem_tight = 0;
        int                             page_count = 0;
        int                             i;
        int                             rc;
        CFS_LIST_HEAD(rpc_list);

        ENTRY;
        LASSERT(!cfs_list_empty(ext_list));

        /* add pages into rpc_list to build BRW rpc */
        cfs_list_for_each_entry(ext, ext_list, oe_link) {
                LASSERT(ext->oe_state == OES_RPC);
                mem_tight |= ext->oe_memalloc;
                cfs_list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
                        ++page_count;
                        cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
                        /* track the [starting_offset, ending_offset) span
                         * covered by the RPC; interior pages must be full
                         * pages (asserted in the else branches) */
                        if (starting_offset > oap->oap_obj_off)
                                starting_offset = oap->oap_obj_off;
                        else
                                LASSERT(oap->oap_page_off == 0);
                        if (ending_offset < oap->oap_obj_off + oap->oap_count)
                                ending_offset = oap->oap_obj_off +
                                                oap->oap_count;
                        else
                                LASSERT(oap->oap_page_off + oap->oap_count ==
                                        CFS_PAGE_SIZE);
                }
        }

        /* memory-tight IO may allocate under memory pressure */
        if (mem_tight)
                mpflag = cfs_memory_pressure_get_and_set();

        OBD_ALLOC(crattr, sizeof(*crattr));
        if (crattr == NULL)
                GOTO(out, rc = -ENOMEM);

        OBD_ALLOC(pga, sizeof(*pga) * page_count);
        if (pga == NULL)
                GOTO(out, rc = -ENOMEM);

        OBDO_ALLOC(oa);
        if (oa == NULL)
                GOTO(out, rc = -ENOMEM);

        /* fill the pga array and attach each page to the cl_req; the
         * cl_req (and the ldlm lock handle) come from the first page */
        i = 0;
        cfs_list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
                struct cl_page *page = oap2cl_page(oap);
                if (clerq == NULL) {
                        clerq = cl_req_alloc(env, page, crt,
                                             1 /* only 1-object rpcs for now */);
                        if (IS_ERR(clerq))
                                GOTO(out, rc = PTR_ERR(clerq));
                        lock = oap->oap_ldlm_lock;
                }
                if (mem_tight)
                        oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
                pga[i] = &oap->oap_brw_page;
                pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
                CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
                       pga[i]->pg, cfs_page_index(oap->oap_page), oap,
                       pga[i]->flag);
                i++;
                cl_req_page_add(env, clerq, page);
        }

        /* always get the data for the obdo for the rpc */
        LASSERT(clerq != NULL);
        crattr->cra_oa = oa;
        cl_req_attr_set(env, clerq, crattr, ~0ULL);
        if (lock) {
                oa->o_handle = lock->l_remote_handle;
                oa->o_valid |= OBD_MD_FLHANDLE;
        }

        rc = cl_req_prep(env, clerq);
        if (rc != 0) {
                CERROR("cl_req_prep failed: %d\n", rc);
                GOTO(out, rc);
        }

        sort_brw_pages(pga, page_count);
        rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
                        pga, &req, crattr->cra_capa, 1, 0);
        if (rc != 0) {
                CERROR("prep_req failed: %d\n", rc);
                GOTO(out, rc);
        }

        req->rq_interpret_reply = brw_interpret;
        if (mem_tight != 0)
                req->rq_memalloc = 1;

        /* Need to update the timestamps after the request is built in case
         * we race with setattr (locally or in queue at OST).  If OST gets
         * later setattr before earlier BRW (as determined by the request xid),
         * the OST will not use BRW timestamps.  Sadly, there is no obvious
         * way to do this in a single call.  bug 10150 */
        cl_req_attr_set(env, clerq, crattr,
                        OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);

        lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);

        /* hand the page/extent lists over to the request's async args;
         * splice moves the list_heads (a plain copy would not work) */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);
        cfs_list_splice_init(&rpc_list, &aa->aa_oaps);
        CFS_INIT_LIST_HEAD(&aa->aa_exts);
        cfs_list_splice_init(ext_list, &aa->aa_exts);
        aa->aa_clerq = clerq;

        /* queued sync pages can be torn down while the pages
         * were between the pending list and the rpc */
        tmp = NULL;
        cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                /* only one oap gets a request reference */
                if (tmp == NULL)
                        tmp = oap;
                if (oap->oap_interrupted && !req->rq_intr) {
                        CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
                                        oap, req);
                        ptlrpc_mark_interrupted(req);
                }
        }
        if (tmp != NULL)
                tmp->oap_request = ptlrpc_request_addref(req);

        /* bump in-flight counters and feed the lprocfs histograms under
         * the loi list lock */
        client_obd_list_lock(&cli->cl_loi_list_lock);
        starting_offset >>= CFS_PAGE_SHIFT;
        if (cmd == OBD_BRW_READ) {
                cli->cl_r_in_flight++;
                lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
                                      starting_offset + 1);
        } else {
                cli->cl_w_in_flight++;
                lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
                                      starting_offset + 1);
        }
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
                  page_count, aa, cli->cl_r_in_flight,
                  cli->cl_w_in_flight);

        /* XXX: Maybe the caller can check the RPC bulk descriptor to
         * see which CPU/NUMA node the majority of pages were allocated
         * on, and try to assign the async RPC to the CPU core
         * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
         *
         * But on the other hand, we expect that multiple ptlrpcd
         * threads and the initial write sponsor can run in parallel,
         * especially when data checksum is enabled, which is CPU-bound
         * operation and single ptlrpcd thread cannot process in time.
         * So more ptlrpcd threads sharing BRW load
         * (with PDL_POLICY_ROUND) seems better.
         */
        ptlrpcd_add_req(req, pol, -1);
        rc = 0;
        EXIT;

out:
        if (mem_tight != 0)
                cfs_memory_pressure_restore(mpflag);

        if (crattr != NULL) {
                capa_put(crattr->cra_capa);
                OBD_FREE(crattr, sizeof(*crattr));
        }

        if (rc != 0) {
                LASSERT(req == NULL);

                if (oa)
                        OBDO_FREE(oa);
                if (pga)
                        OBD_FREE(pga, sizeof(*pga) * page_count);
                /* this should happen rarely and is pretty bad, it makes the
                 * pending list not follow the dirty order */
                while (!cfs_list_empty(ext_list)) {
                        ext = cfs_list_entry(ext_list->next, struct osc_extent,
                                             oe_link);
                        cfs_list_del_init(&ext->oe_link);
                        osc_extent_finish(env, ext, 0, rc);
                }
                if (clerq && !IS_ERR(clerq))
                        cl_req_completion(env, clerq, rc);
        }
        RETURN(rc);
}
2269
2270 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2271                                         struct ldlm_enqueue_info *einfo)
2272 {
2273         void *data = einfo->ei_cbdata;
2274         int set = 0;
2275
2276         LASSERT(lock != NULL);
2277         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2278         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2279         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2280         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2281
2282         lock_res_and_lock(lock);
2283         spin_lock(&osc_ast_guard);
2284
2285         if (lock->l_ast_data == NULL)
2286                 lock->l_ast_data = data;
2287         if (lock->l_ast_data == data)
2288                 set = 1;
2289
2290         spin_unlock(&osc_ast_guard);
2291         unlock_res_and_lock(lock);
2292
2293         return set;
2294 }
2295
2296 static int osc_set_data_with_check(struct lustre_handle *lockh,
2297                                    struct ldlm_enqueue_info *einfo)
2298 {
2299         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2300         int set = 0;
2301
2302         if (lock != NULL) {
2303                 set = osc_set_lock_data_with_check(lock, einfo);
2304                 LDLM_LOCK_PUT(lock);
2305         } else
2306                 CERROR("lockh %p, data %p - client evicted?\n",
2307                        lockh, einfo->ei_cbdata);
2308         return set;
2309 }
2310
2311 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2312                              ldlm_iterator_t replace, void *data)
2313 {
2314         struct ldlm_res_id res_id;
2315         struct obd_device *obd = class_exp2obd(exp);
2316
2317         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2318         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2319         return 0;
2320 }
2321
2322 /* find any ldlm lock of the inode in osc
2323  * return 0    not find
2324  *        1    find one
2325  *      < 0    error */
2326 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2327                            ldlm_iterator_t replace, void *data)
2328 {
2329         struct ldlm_res_id res_id;
2330         struct obd_device *obd = class_exp2obd(exp);
2331         int rc = 0;
2332
2333         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2334         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2335         if (rc == LDLM_ITER_STOP)
2336                 return(1);
2337         if (rc == LDLM_ITER_CONTINUE)
2338                 return(0);
2339         return(rc);
2340 }
2341
2342 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2343                             obd_enqueue_update_f upcall, void *cookie,
2344                             __u64 *flags, int agl, int rc)
2345 {
2346         int intent = *flags & LDLM_FL_HAS_INTENT;
2347         ENTRY;
2348
2349         if (intent) {
2350                 /* The request was created before ldlm_cli_enqueue call. */
2351                 if (rc == ELDLM_LOCK_ABORTED) {
2352                         struct ldlm_reply *rep;
2353                         rep = req_capsule_server_get(&req->rq_pill,
2354                                                      &RMF_DLM_REP);
2355
2356                         LASSERT(rep != NULL);
2357                         if (rep->lock_policy_res1)
2358                                 rc = rep->lock_policy_res1;
2359                 }
2360         }
2361
2362         if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
2363             (rc == 0)) {
2364                 *flags |= LDLM_FL_LVB_READY;
2365                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2366                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2367         }
2368
2369         /* Call the update callback. */
2370         rc = (*upcall)(cookie, rc);
2371         RETURN(rc);
2372 }
2373
/*
 * Interpret callback for an asynchronous lock enqueue RPC, run in
 * ptlrpcd context: finishes the DLM-level enqueue, runs the osc-level
 * upcall, and drops the lock references taken for the request.  The
 * decref ordering below is deliberate — do not reorder.
 */
static int osc_enqueue_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_enqueue_args *aa, int rc)
{
        struct ldlm_lock *lock;
        struct lustre_handle handle;
        __u32 mode;
        struct ost_lvb *lvb;
        __u32 lvb_len;
        __u64 *flags = aa->oa_flags;

        /* Make a local copy of a lock handle and a mode, because aa->oa_*
         * might be freed anytime after lock upcall has been called. */
        lustre_handle_copy(&handle, aa->oa_lockh);
        mode = aa->oa_ei->ei_mode;

        /* ldlm_cli_enqueue is holding a reference on the lock, so it must
         * be valid. */
        lock = ldlm_handle2lock(&handle);

        /* Take an additional reference so that a blocking AST that
         * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
         * to arrive after an upcall has been executed by
         * osc_enqueue_fini(). */
        ldlm_lock_addref(&handle, mode);

        /* Let CP AST to grant the lock first. */
        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);

        /* An aborted AGL enqueue has no LVB in the reply to unpack. */
        if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
                lvb = NULL;
                lvb_len = 0;
        } else {
                lvb = aa->oa_lvb;
                lvb_len = sizeof(*aa->oa_lvb);
        }

        /* Complete obtaining the lock procedure. */
        rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
                                   mode, flags, lvb, lvb_len, &handle, rc);
        /* Complete osc stuff. */
        rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
                              flags, aa->oa_agl, rc);

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);

        /* Release the lock for async request. */
        if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
                /*
                 * Releases a reference taken by ldlm_cli_enqueue(), if it is
                 * not already released by
                 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
                 */
                ldlm_lock_decref(&handle, mode);

        LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
                 aa->oa_lockh, req, aa);
        /* Drop the extra reference taken above and the handle2lock ref. */
        ldlm_lock_decref(&handle, mode);
        LDLM_LOCK_PUT(lock);
        return rc;
}
2435
/*
 * Update a stripe's cached LVB and known-minimal-size (KMS) after an
 * enqueue on @lov_lockhp completed with status @rc.
 *
 * - ELDLM_OK: cache the new LVB, extend KMS up to (and no further than)
 *   the end of the granted extent, and allow the lock to be matched.
 * - ELDLM_LOCK_ABORTED on an intent enqueue (glimpse): refresh the LVB
 *   only; KMS is left alone.
 * - any other failure: make concurrent matchers of this lock fail.
 */
void osc_update_enqueue(struct lustre_handle *lov_lockhp,
                        struct lov_oinfo *loi, int flags,
                        struct ost_lvb *lvb, __u32 mode, int rc)
{
        struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);

        if (rc == ELDLM_OK) {
                __u64 tmp;

                LASSERT(lock != NULL);
                loi->loi_lvb = *lvb;
                tmp = loi->loi_lvb.lvb_size;
                /* Extend KMS up to the end of this lock and no further
                 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
                if (tmp > lock->l_policy_data.l_extent.end)
                        tmp = lock->l_policy_data.l_extent.end + 1;
                if (tmp >= loi->loi_kms) {
                        LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
                                   ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
                        loi_kms_set(loi, tmp);
                } else {
                        LDLM_DEBUG(lock, "lock acquired, setting rss="
                                   LPU64"; leaving kms="LPU64", end="LPU64,
                                   loi->loi_lvb.lvb_size, loi->loi_kms,
                                   lock->l_policy_data.l_extent.end);
                }
                ldlm_lock_allow_match(lock);
        } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
                /* Glimpse: LVB is valid even though the lock was aborted. */
                LASSERT(lock != NULL);
                loi->loi_lvb = *lvb;
                ldlm_lock_allow_match(lock);
                CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
                       " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
                rc = ELDLM_OK;
        }

        if (lock != NULL) {
                if (rc != ELDLM_OK)
                        ldlm_lock_fail_match(lock);

                LDLM_LOCK_PUT(lock);
        }
}
2479 EXPORT_SYMBOL(osc_update_enqueue);
2480
/* Sentinel request-set value: tells osc_enqueue_base() to hand the
 * request to a ptlrpcd daemon instead of adding it to a real set. */
struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2482
/* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
 * other synchronous requests, however keeping some locks and trying to obtain
 * others may take a considerable amount of time in a case of ost failure; and
 * when other sync requests do not get released lock from a client, the client
 * is excluded from the cluster -- such scenarious make the life difficult, so
 * release locks just after they are obtained. */
/*
 * Enqueue an extent lock on @res_id, first trying to reuse a compatible
 * cached lock.  On a cache hit the upcall runs immediately; otherwise an
 * LDLM enqueue RPC is issued (async via @rqset, or synchronously).
 * @agl != 0 marks an asynchronous glimpse lock request.
 */
int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                     __u64 *flags, ldlm_policy_data_t *policy,
                     struct ost_lvb *lvb, int kms_valid,
                     obd_enqueue_update_f upcall, void *cookie,
                     struct ldlm_enqueue_info *einfo,
                     struct lustre_handle *lockh,
                     struct ptlrpc_request_set *rqset, int async, int agl)
{
        struct obd_device *obd = exp->exp_obd;
        struct ptlrpc_request *req = NULL;
        int intent = *flags & LDLM_FL_HAS_INTENT;
        /* AGL may match a lock whose LVB is not yet valid; other matches
         * require LDLM_FL_LVB_READY. */
        int match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
        ldlm_mode_t mode;
        int rc;
        ENTRY;

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother.  */
        policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
        policy->l_extent.end |= ~CFS_PAGE_MASK;

        /*
         * kms is not valid when either object is completely fresh (so that no
         * locks are cached), or object was evicted. In the latter case cached
         * lock cannot be used, because it would prime inode state with
         * potentially stale LVB.
         */
        if (!kms_valid)
                goto no_match;

        /* Next, search for already existing extent locks that will cover us */
        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock.
         *
         * There are problems with conversion deadlocks, so instead of
         * converting a read lock to a write lock, we'll just enqueue a new
         * one.
         *
         * At some point we should cancel the read lock instead of making them
         * send us a blocking callback, but there are problems with canceling
         * locks out from other users right now, too. */
        mode = einfo->ei_mode;
        if (einfo->ei_mode == LCK_PR)
                mode |= LCK_PW;
        mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
                               einfo->ei_type, policy, mode, lockh, 0);
        if (mode) {
                struct ldlm_lock *matched = ldlm_handle2lock(lockh);

                if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
                        /* For AGL, if enqueue RPC is sent but the lock is not
                         * granted, then skip to process this strpe.
                         * Return -ECANCELED to tell the caller. */
                        ldlm_lock_decref(lockh, mode);
                        LDLM_LOCK_PUT(matched);
                        RETURN(-ECANCELED);
                } else if (osc_set_lock_data_with_check(matched, einfo)) {
                        *flags |= LDLM_FL_LVB_READY;
                        /* addref the lock only if not async requests and PW
                         * lock is matched whereas we asked for PR. */
                        if (!rqset && einfo->ei_mode != mode)
                                ldlm_lock_addref(lockh, LCK_PR);
                        if (intent) {
                                /* I would like to be able to ASSERT here that
                                 * rss <= kms, but I can't, for reasons which
                                 * are explained in lov_enqueue() */
                        }

                        /* We already have a lock, and it's referenced.
                         *
                         * At this point, the cl_lock::cll_state is CLS_QUEUING,
                         * AGL upcall may change it to CLS_HELD directly. */
                        (*upcall)(cookie, ELDLM_OK);

                        if (einfo->ei_mode != mode)
                                ldlm_lock_decref(lockh, LCK_PW);
                        else if (rqset)
                                /* For async requests, decref the lock. */
                                ldlm_lock_decref(lockh, einfo->ei_mode);
                        LDLM_LOCK_PUT(matched);
                        RETURN(ELDLM_OK);
                } else {
                        /* Data slot taken by someone else: fall through and
                         * enqueue a fresh lock. */
                        ldlm_lock_decref(lockh, mode);
                        LDLM_LOCK_PUT(matched);
                }
        }

 no_match:
        /* Intent enqueues need a reply buffer for the server's LVB. */
        if (intent) {
                CFS_LIST_HEAD(cancels);
                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_LDLM_ENQUEUE_LVB);
                if (req == NULL)
                        RETURN(-ENOMEM);

                rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
                if (rc) {
                        ptlrpc_request_free(req);
                        RETURN(rc);
                }

                req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
                                     sizeof *lvb);
                ptlrpc_request_set_replen(req);
        }

        /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
        *flags &= ~LDLM_FL_BLOCK_GRANTED;

        rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
                              sizeof(*lvb), LVB_T_OST, lockh, async);
        if (rqset) {
                if (!rc) {
                        struct osc_enqueue_args *aa;
                        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
                        aa = ptlrpc_req_async_args(req);
                        aa->oa_ei = einfo;
                        aa->oa_exp = exp;
                        aa->oa_flags  = flags;
                        aa->oa_upcall = upcall;
                        aa->oa_cookie = cookie;
                        aa->oa_lvb    = lvb;
                        aa->oa_lockh  = lockh;
                        aa->oa_agl    = !!agl;

                        req->rq_interpret_reply =
                                (ptlrpc_interpterer_t)osc_enqueue_interpret;
                        if (rqset == PTLRPCD_SET)
                                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
                        else
                                ptlrpc_set_add_req(rqset, req);
                } else if (intent) {
                        ptlrpc_req_finished(req);
                }
                RETURN(rc);
        }

        /* Synchronous path: finish up and drop the intent request here. */
        rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
        if (intent)
                ptlrpc_req_finished(req);

        RETURN(rc);
}
2634
2635 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2636                        struct ldlm_enqueue_info *einfo,
2637                        struct ptlrpc_request_set *rqset)
2638 {
2639         struct ldlm_res_id res_id;
2640         int rc;
2641         ENTRY;
2642
2643         ostid_build_res_name(&oinfo->oi_md->lsm_oi, &res_id);
2644         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
2645                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2646                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
2647                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
2648                               rqset, rqset != NULL, 0);
2649         RETURN(rc);
2650 }
2651
2652 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2653                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2654                    int *flags, void *data, struct lustre_handle *lockh,
2655                    int unref)
2656 {
2657         struct obd_device *obd = exp->exp_obd;
2658         int lflags = *flags;
2659         ldlm_mode_t rc;
2660         ENTRY;
2661
2662         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2663                 RETURN(-EIO);
2664
2665         /* Filesystem lock extents are extended to page boundaries so that
2666          * dealing with the page cache is a little smoother */
2667         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2668         policy->l_extent.end |= ~CFS_PAGE_MASK;
2669
2670         /* Next, search for already existing extent locks that will cover us */
2671         /* If we're trying to read, we also search for an existing PW lock.  The
2672          * VFS and page cache already protect us locally, so lots of readers/
2673          * writers can share a single PW lock. */
2674         rc = mode;
2675         if (mode == LCK_PR)
2676                 rc |= LCK_PW;
2677         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2678                              res_id, type, policy, rc, lockh, unref);
2679         if (rc) {
2680                 if (data != NULL) {
2681                         if (!osc_set_data_with_check(lockh, data)) {
2682                                 if (!(lflags & LDLM_FL_TEST_LOCK))
2683                                         ldlm_lock_decref(lockh, rc);
2684                                 RETURN(0);
2685                         }
2686                 }
2687                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2688                         ldlm_lock_addref(lockh, LCK_PR);
2689                         ldlm_lock_decref(lockh, LCK_PW);
2690                 }
2691                 RETURN(rc);
2692         }
2693         RETURN(rc);
2694 }
2695
2696 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2697 {
2698         ENTRY;
2699
2700         if (unlikely(mode == LCK_GROUP))
2701                 ldlm_lock_decref_and_cancel(lockh, mode);
2702         else
2703                 ldlm_lock_decref(lockh, mode);
2704
2705         RETURN(0);
2706 }
2707
2708 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2709                       __u32 mode, struct lustre_handle *lockh)
2710 {
2711         ENTRY;
2712         RETURN(osc_cancel_base(lockh, mode));
2713 }
2714
2715 static int osc_cancel_unused(struct obd_export *exp,
2716                              struct lov_stripe_md *lsm,
2717                              ldlm_cancel_flags_t flags,
2718                              void *opaque)
2719 {
2720         struct obd_device *obd = class_exp2obd(exp);
2721         struct ldlm_res_id res_id, *resp = NULL;
2722
2723         if (lsm != NULL) {
2724                 ostid_build_res_name(&lsm->lsm_oi, &res_id);
2725                 resp = &res_id;
2726         }
2727
2728         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
2729 }
2730
2731 static int osc_statfs_interpret(const struct lu_env *env,
2732                                 struct ptlrpc_request *req,
2733                                 struct osc_async_args *aa, int rc)
2734 {
2735         struct obd_statfs *msfs;
2736         ENTRY;
2737
2738         if (rc == -EBADR)
2739                 /* The request has in fact never been sent
2740                  * due to issues at a higher level (LOV).
2741                  * Exit immediately since the caller is
2742                  * aware of the problem and takes care
2743                  * of the clean up */
2744                  RETURN(rc);
2745
2746         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2747             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2748                 GOTO(out, rc = 0);
2749
2750         if (rc != 0)
2751                 GOTO(out, rc);
2752
2753         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2754         if (msfs == NULL) {
2755                 GOTO(out, rc = -EPROTO);
2756         }
2757
2758         *aa->aa_oi->oi_osfs = *msfs;
2759 out:
2760         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2761         RETURN(rc);
2762 }
2763
2764 static int osc_statfs_async(struct obd_export *exp,
2765                             struct obd_info *oinfo, __u64 max_age,
2766                             struct ptlrpc_request_set *rqset)
2767 {
2768         struct obd_device     *obd = class_exp2obd(exp);
2769         struct ptlrpc_request *req;
2770         struct osc_async_args *aa;
2771         int                    rc;
2772         ENTRY;
2773
2774         /* We could possibly pass max_age in the request (as an absolute
2775          * timestamp or a "seconds.usec ago") so the target can avoid doing
2776          * extra calls into the filesystem if that isn't necessary (e.g.
2777          * during mount that would help a bit).  Having relative timestamps
2778          * is not so great if request processing is slow, while absolute
2779          * timestamps are not ideal because they need time synchronization. */
2780         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2781         if (req == NULL)
2782                 RETURN(-ENOMEM);
2783
2784         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2785         if (rc) {
2786                 ptlrpc_request_free(req);
2787                 RETURN(rc);
2788         }
2789         ptlrpc_request_set_replen(req);
2790         req->rq_request_portal = OST_CREATE_PORTAL;
2791         ptlrpc_at_set_req_timeout(req);
2792
2793         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2794                 /* procfs requests not want stat in wait for avoid deadlock */
2795                 req->rq_no_resend = 1;
2796                 req->rq_no_delay = 1;
2797         }
2798
2799         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2800         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2801         aa = ptlrpc_req_async_args(req);
2802         aa->aa_oi = oinfo;
2803
2804         ptlrpc_set_add_req(rqset, req);
2805         RETURN(0);
2806 }
2807
/*
 * Synchronous OST_STATFS: query the target's filesystem statistics
 * into @osfs.  With OBD_STATFS_NODELAY the request neither waits nor
 * resends, so an unreachable OST fails fast instead of blocking the
 * caller.  @max_age is currently unused (see comment below).
 */
static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
                      struct obd_statfs *osfs, __u64 max_age, __u32 flags)
{
        struct obd_device     *obd = class_exp2obd(exp);
        struct obd_statfs     *msfs;
        struct ptlrpc_request *req;
        struct obd_import     *imp = NULL;
        int rc;
        ENTRY;

        /* The request may also come from lprocfs, so serialize against
         * client_disconnect_export() (bug 15684) by pinning the import
         * under cl_sem before using it. */
        down_read(&obd->u.cli.cl_sem);
        if (obd->u.cli.cl_import)
                imp = class_import_get(obd->u.cli.cl_import);
        up_read(&obd->u.cli.cl_sem);
        if (!imp)
                RETURN(-ENODEV);

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);

        /* The allocation took its own reference on the import. */
        class_import_put(imp);

        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        ptlrpc_request_set_replen(req);
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (flags & OBD_STATFS_NODELAY) {
                /* procfs requests must not wait for a stat, to avoid
                 * deadlocking the reader. */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
        if (msfs == NULL) {
                GOTO(out, rc = -EPROTO);
        }

        *osfs = *msfs;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
2871
/* Retrieve object striping information for an OSC object (which always
 * has exactly one stripe).
 *
 * @lump is a user-space pointer to a lov_user_md whose lmm_stripe_count
 * indicates how many OST object entries fit in the user buffer; its
 * lmm_magic selects the v1 or v3 layout.  Copies a header (and, when
 * the buffer has room, the single object entry) back to user space.
 */
static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
{
        /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
        struct lov_user_md_v3 lum, *lumk;
        struct lov_user_ost_data_v1 *lmm_objects;
        int rc = 0, lum_size;
        ENTRY;

        if (!lsm)
                RETURN(-ENODATA);

        /* we only need the header part from user space to get lmm_magic and
         * lmm_stripe_count, (the header part is common to v1 and v3) */
        lum_size = sizeof(struct lov_user_md_v1);
        if (cfs_copy_from_user(&lum, lump, lum_size))
                RETURN(-EFAULT);

        if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
            (lum.lmm_magic != LOV_USER_MAGIC_V3))
                RETURN(-EINVAL);

        /* lov_user_md_vX and lov_mds_md_vX must have the same size */
        LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
        LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
        LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));

        /* we can use lov_mds_md_size() to compute lum_size
         * because lov_user_md_vX and lov_mds_md_vX have the same size */
        if (lum.lmm_stripe_count > 0) {
                /* Caller has room for object entries: allocate a reply
                 * buffer sized for the requested count and fill the one
                 * object entry an OSC has. */
                lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
                OBD_ALLOC(lumk, lum_size);
                if (!lumk)
                        RETURN(-ENOMEM);

                /* v1 and v3 place lmm_objects at different offsets. */
                if (lum.lmm_magic == LOV_USER_MAGIC_V1)
                        lmm_objects =
                            &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
                else
                        lmm_objects = &(lumk->lmm_objects[0]);
                lmm_objects->l_ost_oi = lsm->lsm_oi;
        } else {
                /* Header-only reply: reuse the on-stack header. */
                lum_size = lov_mds_md_size(0, lum.lmm_magic);
                lumk = &lum;
        }

        lumk->lmm_oi = lsm->lsm_oi;
        lumk->lmm_stripe_count = 1;

        if (cfs_copy_to_user(lump, lumk, lum_size))
                rc = -EFAULT;

        if (lumk != &lum)
                OBD_FREE(lumk, lum_size);

        RETURN(rc);
}
2934
2935
2936 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2937                          void *karg, void *uarg)
2938 {
2939         struct obd_device *obd = exp->exp_obd;
2940         struct obd_ioctl_data *data = karg;
2941         int err = 0;
2942         ENTRY;
2943
2944         if (!cfs_try_module_get(THIS_MODULE)) {
2945                 CERROR("Can't get module. Is it alive?");
2946                 return -EINVAL;
2947         }
2948         switch (cmd) {
2949         case OBD_IOC_LOV_GET_CONFIG: {
2950                 char *buf;
2951                 struct lov_desc *desc;
2952                 struct obd_uuid uuid;
2953
2954                 buf = NULL;
2955                 len = 0;
2956                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
2957                         GOTO(out, err = -EINVAL);
2958
2959                 data = (struct obd_ioctl_data *)buf;
2960
2961                 if (sizeof(*desc) > data->ioc_inllen1) {
2962                         obd_ioctl_freedata(buf, len);
2963                         GOTO(out, err = -EINVAL);
2964                 }
2965
2966                 if (data->ioc_inllen2 < sizeof(uuid)) {
2967                         obd_ioctl_freedata(buf, len);
2968                         GOTO(out, err = -EINVAL);
2969                 }
2970
2971                 desc = (struct lov_desc *)data->ioc_inlbuf1;
2972                 desc->ld_tgt_count = 1;
2973                 desc->ld_active_tgt_count = 1;
2974                 desc->ld_default_stripe_count = 1;
2975                 desc->ld_default_stripe_size = 0;
2976                 desc->ld_default_stripe_offset = 0;
2977                 desc->ld_pattern = 0;
2978                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
2979
2980                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
2981
2982                 err = cfs_copy_to_user((void *)uarg, buf, len);
2983                 if (err)
2984                         err = -EFAULT;
2985                 obd_ioctl_freedata(buf, len);
2986                 GOTO(out, err);
2987         }
2988         case LL_IOC_LOV_SETSTRIPE:
2989                 err = obd_alloc_memmd(exp, karg);
2990                 if (err > 0)
2991                         err = 0;
2992                 GOTO(out, err);
2993         case LL_IOC_LOV_GETSTRIPE:
2994                 err = osc_getstripe(karg, uarg);
2995                 GOTO(out, err);
2996         case OBD_IOC_CLIENT_RECOVER:
2997                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2998                                             data->ioc_inlbuf1, 0);
2999                 if (err > 0)
3000                         err = 0;
3001                 GOTO(out, err);
3002         case IOC_OSC_SET_ACTIVE:
3003                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3004                                                data->ioc_offset);
3005                 GOTO(out, err);
3006         case OBD_IOC_POLL_QUOTACHECK:
3007                 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
3008                 GOTO(out, err);
3009         case OBD_IOC_PING_TARGET:
3010                 err = ptlrpc_obd_ping(obd);
3011                 GOTO(out, err);
3012         default:
3013                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3014                        cmd, cfs_curproc_comm());
3015                 GOTO(out, err = -ENOTTY);
3016         }
3017 out:
3018         cfs_module_put(THIS_MODULE);
3019         return err;
3020 }
3021
/* Handle OBD get_info queries against an OST export.
 *
 * Supported keys:
 *   KEY_LOCK_TO_STRIPE - trivial on the OSC: every lock maps to stripe 0.
 *   KEY_LAST_ID        - fetch the last allocated object id from the OST
 *                        via an OST_GET_INFO RPC.
 *   KEY_FIEMAP         - forward a fiemap request to the OST.  When
 *                        FIEMAP_FLAG_SYNC is set, a PR extent lock covering
 *                        the mapped range is taken (or a cached one reused)
 *                        first, so dirty client data is flushed before the
 *                        server maps the extents.
 *
 * \param[in]     env    execution environment (unused here)
 * \param[in]     exp    export of the OST to query
 * \param[in]     keylen length of \a key
 * \param[in]     key    which information to retrieve
 * \param[in,out] vallen size of the \a val buffer
 * \param[in,out] val    buffer the reply is copied into
 * \param[in]     lsm    stripe metadata (unused here)
 *
 * \retval 0 on success, negative errno on failure; -EINVAL for unknown keys
 */
static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
                        obd_count keylen, void *key, __u32 *vallen, void *val,
                        struct lov_stripe_md *lsm)
{
        ENTRY;
        if (!vallen || !val)
                RETURN(-EFAULT);

        if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
                /* single-object device: any lock belongs to stripe 0 */
                __u32 *stripe = val;
                *vallen = sizeof(*stripe);
                *stripe = 0;
                RETURN(0);
        } else if (KEY_IS(KEY_LAST_ID)) {
                struct ptlrpc_request *req;
                obd_id                *reply;
                char                  *tmp;
                int                    rc;

                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_OST_GET_INFO_LAST_ID);
                if (req == NULL)
                        RETURN(-ENOMEM);

                req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                                     RCL_CLIENT, keylen);
                rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
                if (rc) {
                        ptlrpc_request_free(req);
                        RETURN(rc);
                }

                tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
                memcpy(tmp, key, keylen);

                /* fail fast: callers of LAST_ID do not want to block on a
                 * recovering import */
                req->rq_no_delay = req->rq_no_resend = 1;
                ptlrpc_request_set_replen(req);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(out, rc);

                reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
                if (reply == NULL)
                        GOTO(out, rc = -EPROTO);

                *((obd_id *)val) = *reply;
        out:
                ptlrpc_req_finished(req);
                RETURN(rc);
        } else if (KEY_IS(KEY_FIEMAP)) {
                struct ll_fiemap_info_key *fm_key =
                                (struct ll_fiemap_info_key *)key;
                struct ldlm_res_id       res_id;
                ldlm_policy_data_t       policy;
                struct lustre_handle     lockh;
                ldlm_mode_t              mode = 0;
                struct ptlrpc_request   *req;
                struct ll_user_fiemap   *reply;
                char                    *tmp;
                int                      rc;

                if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC))
                        goto skip_locking;

                /* compute the page-aligned lock extent covering the
                 * requested fiemap range, clamping at object EOF when the
                 * end would overflow */
                policy.l_extent.start = fm_key->fiemap.fm_start &
                                                CFS_PAGE_MASK;

                if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
                    fm_key->fiemap.fm_start + CFS_PAGE_SIZE - 1)
                        policy.l_extent.end = OBD_OBJECT_EOF;
                else
                        policy.l_extent.end = (fm_key->fiemap.fm_start +
                                fm_key->fiemap.fm_length +
                                CFS_PAGE_SIZE - 1) & CFS_PAGE_MASK;

                ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
                mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
                                       LDLM_FL_BLOCK_GRANTED |
                                       LDLM_FL_LVB_READY,
                                       &res_id, LDLM_EXTENT, &policy,
                                       LCK_PR | LCK_PW, &lockh, 0);
                if (mode) { /* lock is cached on client */
                        if (mode != LCK_PR) {
                                /* convert our reference to PR so the final
                                 * decref below matches the mode we hold */
                                ldlm_lock_addref(&lockh, LCK_PR);
                                ldlm_lock_decref(&lockh, LCK_PW);
                        }
                } else { /* no cached lock, needs acquire lock on server side */
                        fm_key->oa.o_valid |= OBD_MD_FLFLAGS;
                        fm_key->oa.o_flags |= OBD_FL_SRVLOCK;
                }

skip_locking:
                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_OST_GET_INFO_FIEMAP);
                if (req == NULL)
                        GOTO(drop_lock, rc = -ENOMEM);

                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
                                     RCL_CLIENT, keylen);
                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
                                     RCL_CLIENT, *vallen);
                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
                                     RCL_SERVER, *vallen);

                rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
                if (rc) {
                        ptlrpc_request_free(req);
                        GOTO(drop_lock, rc);
                }

                tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
                memcpy(tmp, key, keylen);
                tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
                memcpy(tmp, val, *vallen);

                ptlrpc_request_set_replen(req);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(fini_req, rc);

                reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
                if (reply == NULL)
                        GOTO(fini_req, rc = -EPROTO);

                memcpy(val, reply, *vallen);
fini_req:
                ptlrpc_req_finished(req);
drop_lock:
                if (mode)
                        ldlm_lock_decref(&lockh, LCK_PR);
                RETURN(rc);
        }

        RETURN(-EINVAL);
}
3157
/* Handle OBD set_info requests on an OST export, possibly asynchronously.
 *
 * Keys that only touch local client state (KEY_CHECKSUM, KEY_SPTLRPC_CONF,
 * KEY_FLUSH_CTX, KEY_CACHE_SET, KEY_CACHE_LRU_SHRINK) are processed without
 * any RPC and return immediately.  Everything else is packed into an
 * OST_SET_INFO request and queued on \a set; KEY_GRANT_SHRINK is
 * special-cased to use the grant-shrink request format and the ptlrpcd
 * daemon instead of \a set.
 *
 * \param[in] env    execution environment (unused here)
 * \param[in] exp    export of the target OST
 * \param[in] keylen length of \a key
 * \param[in] key    which information to set
 * \param[in] vallen length of \a val
 * \param[in] val    the new value
 * \param[in] set    request set the RPC is queued on; must be non-NULL for
 *                   all RPC-bound keys except KEY_GRANT_SHRINK
 *
 * \retval 0 on success, negative errno on failure
 */
static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                              obd_count keylen, void *key, obd_count vallen,
                              void *val, struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct obd_device     *obd = exp->exp_obd;
        struct obd_import     *imp = class_exp2cliimp(exp);
        char                  *tmp;
        int                    rc;
        ENTRY;

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

        if (KEY_IS(KEY_CHECKSUM)) {
                /* toggle BRW checksumming for this client */
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
                RETURN(0);
        }

        if (KEY_IS(KEY_SPTLRPC_CONF)) {
                sptlrpc_conf_client_adapt(obd);
                RETURN(0);
        }

        if (KEY_IS(KEY_FLUSH_CTX)) {
                sptlrpc_import_flush_my_ctx(imp);
                RETURN(0);
        }

        if (KEY_IS(KEY_CACHE_SET)) {
                /* attach this OSC to a shared client page cache */
                struct client_obd *cli = &obd->u.cli;

                LASSERT(cli->cl_cache == NULL); /* only once */
                cli->cl_cache = (struct cl_client_cache *)val;
                cfs_atomic_inc(&cli->cl_cache->ccc_users);
                cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;

                /* add this osc into entity list */
                LASSERT(cfs_list_empty(&cli->cl_lru_osc));
                spin_lock(&cli->cl_cache->ccc_lru_lock);
                cfs_list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
                spin_unlock(&cli->cl_cache->ccc_lru_lock);

                RETURN(0);
        }

        if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
                /* shrink at most half of our LRU pages, capped by the
                 * caller's target; report how many were reclaimed by
                 * decrementing *val */
                struct client_obd *cli = &obd->u.cli;
                int nr = cfs_atomic_read(&cli->cl_lru_in_list) >> 1;
                int target = *(int *)val;

                nr = osc_lru_shrink(cli, min(nr, target));
                *(int *)val -= nr;
                RETURN(0);
        }

        if (!set && !KEY_IS(KEY_GRANT_SHRINK))
                RETURN(-EINVAL);

        /* We pass all other commands directly to OST. Since nobody calls osc
           methods directly and everybody is supposed to go through LOV, we
           assume lov checked invalid values for us.
           The only recognised values so far are evict_by_nid and mds_conn.
           Even if something bad goes through, we'd get a -EINVAL from OST
           anyway. */

        req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
                                                &RQF_OST_SET_GRANT_INFO :
                                                &RQF_OBD_SET_INFO);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                             RCL_CLIENT, keylen);
        if (!KEY_IS(KEY_GRANT_SHRINK))
                req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
                                     RCL_CLIENT, vallen);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
        memcpy(tmp, key, keylen);
        /* the grant-shrink request format carries an ost_body instead of a
         * raw value buffer */
        tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
                                                        &RMF_OST_BODY :
                                                        &RMF_SETINFO_VAL);
        memcpy(tmp, val, vallen);

        if (KEY_IS(KEY_GRANT_SHRINK)) {
                /* keep a private obdo copy for the interpret callback,
                 * since *val may be gone by the time the reply arrives */
                struct osc_grant_args *aa;
                struct obdo *oa;

                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = ptlrpc_req_async_args(req);
                OBDO_ALLOC(oa);
                if (!oa) {
                        ptlrpc_req_finished(req);
                        RETURN(-ENOMEM);
                }
                *oa = ((struct ost_body *)val)->oa;
                aa->aa_oa = oa;
                req->rq_interpret_reply = osc_shrink_grant_interpret;
        }

        ptlrpc_request_set_replen(req);
        if (!KEY_IS(KEY_GRANT_SHRINK)) {
                LASSERT(set != NULL);
                ptlrpc_set_add_req(set, req);
                ptlrpc_check_set(NULL, set);
        } else
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);

        RETURN(0);
}
3275
3276
/* Obsolete llog initialization hook.
 *
 * Reaching this function indicates a programming error: the OSC llog path
 * is no longer used with LOD/OSP, so it deliberately panics via LBUG().
 */
static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
                         struct obd_device *disk_obd, int *index)
{
        /* this code is not supposed to be used with LOD/OSP
         * to be removed soon */
        LBUG();
        return 0;
}
3285
3286 static int osc_llog_finish(struct obd_device *obd, int count)
3287 {
3288         struct llog_ctxt *ctxt;
3289
3290         ENTRY;
3291
3292         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3293         if (ctxt) {
3294                 llog_cat_close(NULL, ctxt->loc_handle);
3295                 llog_cleanup(NULL, ctxt);
3296         }
3297
3298         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3299         if (ctxt)
3300                 llog_cleanup(NULL, ctxt);
3301         RETURN(0);
3302 }
3303
3304 static int osc_reconnect(const struct lu_env *env,
3305                          struct obd_export *exp, struct obd_device *obd,
3306                          struct obd_uuid *cluuid,
3307                          struct obd_connect_data *data,
3308                          void *localdata)
3309 {
3310         struct client_obd *cli = &obd->u.cli;
3311
3312         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3313                 long lost_grant;
3314
3315                 client_obd_list_lock(&cli->cl_loi_list_lock);
3316                 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
3317                                 2 * cli_brw_size(obd);
3318                 lost_grant = cli->cl_lost_grant;
3319                 cli->cl_lost_grant = 0;
3320                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3321
3322                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3323                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3324                        data->ocd_version, data->ocd_grant, lost_grant);
3325         }
3326
3327         RETURN(0);
3328 }
3329
/* Disconnect the OSC from its OST.
 *
 * On the last connection reference, any remaining size-replicator llog
 * cancel records are flushed to the target first.  The grant-shrink
 * de-registration must happen only after client_disconnect_export() has
 * destroyed the import — see the BUG18662 note below for the race this
 * ordering avoids.
 */
static int osc_disconnect(struct obd_export *exp)
{
        struct obd_device *obd = class_exp2obd(exp);
        struct llog_ctxt  *ctxt;
        int rc;

        ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
        if (ctxt) {
                if (obd->u.cli.cl_conn_count == 1) {
                        /* Flush any remaining cancel messages out to the
                         * target */
                        llog_sync(ctxt, exp, 0);
                }
                llog_ctxt_put(ctxt);
        } else {
                CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
                       obd);
        }

        rc = client_disconnect_export(exp);
        /**
         * Initially we put del_shrink_grant before disconnect_export, but it
         * causes the following problem if setup (connect) and cleanup
         * (disconnect) are tangled together.
         *      connect p1                     disconnect p2
         *   ptlrpc_connect_import
         *     ...............               class_manual_cleanup
         *                                     osc_disconnect
         *                                     del_shrink_grant
         *   ptlrpc_connect_interrupt
         *     init_grant_shrink
         *   add this client to shrink list
         *                                      cleanup_osc
         * Bang! pinger trigger the shrink.
         * So the osc should be disconnected from the shrink list, after we
         * are sure the import has been destroyed. BUG18662
         */
        if (obd->u.cli.cl_import == NULL)
                osc_del_shrink_grant(&obd->u.cli);
        return rc;
}
3371
/* React to a state change of the import connected to the OST.
 *
 * Most events are relayed to the OBD observer (typically the LOV above us)
 * via obd_notify_observer(); DISCON and INVALIDATE additionally reset or
 * flush local grant and cached-page state.
 *
 * \param[in] obd   this OSC device
 * \param[in] imp   the import whose state changed; must belong to \a obd
 * \param[in] event what happened to the import
 *
 * \retval 0 on success, negative errno on failure
 */
static int osc_import_event(struct obd_device *obd,
                            struct obd_import *imp,
                            enum obd_import_event event)
{
        struct client_obd *cli;
        int rc = 0;

        ENTRY;
        LASSERT(imp->imp_obd == obd);

        switch (event) {
        case IMP_EVENT_DISCON: {
                /* connection lost: any outstanding grant is void */
                cli = &obd->u.cli;
                client_obd_list_lock(&cli->cl_loi_list_lock);
                cli->cl_avail_grant = 0;
                cli->cl_lost_grant = 0;
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                break;
        }
        case IMP_EVENT_INACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
                break;
        }
        case IMP_EVENT_INVALIDATE: {
                struct ldlm_namespace *ns = obd->obd_namespace;
                struct lu_env         *env;
                int                    refcheck;

                env = cl_env_get(&refcheck);
                if (!IS_ERR(env)) {
                        /* Reset grants */
                        cli = &obd->u.cli;
                        /* all pages go to failing rpcs due to the invalid
                         * import */
                        osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);

                        ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
                        cl_env_put(env, &refcheck);
                } else
                        rc = PTR_ERR(env);
                break;
        }
        case IMP_EVENT_ACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
                break;
        }
        case IMP_EVENT_OCD: {
                struct obd_connect_data *ocd = &imp->imp_connect_data;

                if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
                        osc_init_grant(&obd->u.cli, ocd);

                /* See bug 7198 */
                if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;

                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
                break;
        }
        case IMP_EVENT_DEACTIVATE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
                break;
        }
        case IMP_EVENT_ACTIVATE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
                break;
        }
        default:
                CERROR("Unknown import event %d\n", event);
                LBUG();
        }
        RETURN(rc);
}
3445
3446 /**
3447  * Determine whether the lock can be canceled before replaying the lock
3448  * during recovery, see bug16774 for detailed information.
3449  *
3450  * \retval zero the lock can't be canceled
3451  * \retval other ok to cancel
3452  */
3453 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
3454 {
3455         check_res_locked(lock->l_resource);
3456
3457         /*
3458          * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
3459          *
3460          * XXX as a future improvement, we can also cancel unused write lock
3461          * if it doesn't have dirty data and active mmaps.
3462          */
3463         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3464             (lock->l_granted_mode == LCK_PR ||
3465              lock->l_granted_mode == LCK_CR) &&
3466             (osc_dlm_lock_pageref(lock) == 0))
3467                 RETURN(1);
3468
3469         RETURN(0);
3470 }
3471
3472 static int brw_queue_work(const struct lu_env *env, void *data)
3473 {
3474         struct client_obd *cli = data;
3475
3476         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3477
3478         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
3479         RETURN(0);
3480 }
3481
/* Set up an OSC device.
 *
 * Initializes, in order: a ptlrpcd reference, the generic client OBD
 * state, the writeback work item, quota state, procfs entries, the
 * request pool used by BRW, the grant-shrink list, and the recovery
 * lock-cancel callback.  Failures unwind in reverse via the goto ladder
 * at the bottom.
 *
 * \param[in] obd  device being set up
 * \param[in] lcfg setup configuration record
 *
 * \retval 0 on success, negative errno on failure
 */
int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
{
        struct lprocfs_static_vars lvars = { 0 };
        struct client_obd          *cli = &obd->u.cli;
        void                       *handler;
        int                        rc;
        ENTRY;

        rc = ptlrpcd_addref();
        if (rc)
                RETURN(rc);

        rc = client_obd_setup(obd, lcfg);
        if (rc)
                GOTO(out_ptlrpcd, rc);

        handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
        if (IS_ERR(handler))
                GOTO(out_client_setup, rc = PTR_ERR(handler));
        cli->cl_writeback_work = handler;

        rc = osc_quota_setup(obd);
        if (rc)
                GOTO(out_ptlrpcd_work, rc);

        cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
        lprocfs_osc_init_vars(&lvars);
        /* procfs registration failure is not fatal to the device */
        if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
                lproc_osc_attach_seqstat(obd);
                sptlrpc_lprocfs_cliobd_attach(obd);
                ptlrpc_lprocfs_register_obd(obd);
        }

        /* We need to allocate a few requests more, because
         * brw_interpret tries to create new requests before freeing
         * previous ones, Ideally we want to have 2x max_rpcs_in_flight
         * reserved, but I'm afraid that might be too much wasted RAM
         * in fact, so 2 is just my guess and still should work. */
        cli->cl_import->imp_rq_pool =
                ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
                                    OST_MAXREQSIZE,
                                    ptlrpc_add_rqs_to_pool);

        CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
        ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
        RETURN(rc);

out_ptlrpcd_work:
        ptlrpcd_destroy_work(handler);
out_client_setup:
        client_obd_cleanup(obd);
out_ptlrpcd:
        ptlrpcd_decref();
        RETURN(rc);
}
3537
/* Staged pre-cleanup of an OSC device.
 *
 * OBD_CLEANUP_EARLY deactivates the import and stops pinging it;
 * OBD_CLEANUP_EXPORTS destroys the writeback work item, the client
 * import, procfs entries and the llog contexts.  Other stages are no-ops.
 *
 * \param[in] obd   device being cleaned up
 * \param[in] stage which cleanup phase we are in
 *
 * \retval 0 on success (llog cleanup errors are logged, not returned)
 */
static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
{
        int rc = 0;
        ENTRY;

        switch (stage) {
        case OBD_CLEANUP_EARLY: {
                struct obd_import *imp;
                imp = obd->u.cli.cl_import;
                CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
                /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
                ptlrpc_deactivate_import(imp);
                spin_lock(&imp->imp_lock);
                imp->imp_pingable = 0;
                spin_unlock(&imp->imp_lock);
                break;
        }
        case OBD_CLEANUP_EXPORTS: {
                struct client_obd *cli = &obd->u.cli;
                /* LU-464
                 * for echo client, export may be on zombie list, wait for
                 * zombie thread to cull it, because cli.cl_import will be
                 * cleared in client_disconnect_export():
                 *   class_export_destroy() -> obd_cleanup() ->
                 *   echo_device_free() -> echo_client_cleanup() ->
                 *   obd_disconnect() -> osc_disconnect() ->
                 *   client_disconnect_export()
                 */
                obd_zombie_barrier();
                if (cli->cl_writeback_work) {
                        ptlrpcd_destroy_work(cli->cl_writeback_work);
                        cli->cl_writeback_work = NULL;
                }
                obd_cleanup_client_import(obd);
                ptlrpc_lprocfs_unregister_obd(obd);
                lprocfs_obd_cleanup(obd);
                rc = obd_llog_finish(obd, 0);
                if (rc != 0)
                        CERROR("failed to cleanup llogging subsystems\n");
                break;
                }
        }
        RETURN(rc);
}
3582
/* Final cleanup of an OSC device.
 *
 * Detaches this OSC from the shared client page cache it joined via
 * KEY_CACHE_SET (if any), frees the quota cache, tears down the generic
 * client OBD state and drops the ptlrpcd reference taken in osc_setup().
 *
 * \param[in] obd device being torn down
 *
 * \retval result of client_obd_cleanup()
 */
int osc_cleanup(struct obd_device *obd)
{
        struct client_obd *cli = &obd->u.cli;
        int rc;

        ENTRY;

        /* lru cleanup */
        if (cli->cl_cache != NULL) {
                LASSERT(cfs_atomic_read(&cli->cl_cache->ccc_users) > 0);
                spin_lock(&cli->cl_cache->ccc_lru_lock);
                cfs_list_del_init(&cli->cl_lru_osc);
                spin_unlock(&cli->cl_cache->ccc_lru_lock);
                cli->cl_lru_left = NULL;
                cfs_atomic_dec(&cli->cl_cache->ccc_users);
                cli->cl_cache = NULL;
        }

        /* free memory of osc quota cache */
        osc_quota_cleanup(obd);

        rc = client_obd_cleanup(obd);

        ptlrpcd_decref();
        RETURN(rc);
}
3609
3610 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3611 {
3612         struct lprocfs_static_vars lvars = { 0 };
3613         int rc = 0;
3614
3615         lprocfs_osc_init_vars(&lvars);
3616
3617         switch (lcfg->lcfg_command) {
3618         default:
3619                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
3620                                               lcfg, obd);
3621                 if (rc > 0)
3622                         rc = 0;
3623                 break;
3624         }
3625
3626         return(rc);
3627 }
3628
3629 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3630 {
3631         return osc_process_config_base(obd, buf);
3632 }
3633
/* Method table exported by the OSC OBD type; connection management comes
 * from the generic client code, everything else is implemented in this
 * file. */
struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup,
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        .o_statfs               = osc_statfs,
        .o_statfs_async         = osc_statfs_async,
        .o_packmd               = osc_packmd,
        .o_unpackmd             = osc_unpackmd,
        .o_create               = osc_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_getattr_async        = osc_getattr_async,
        .o_setattr              = osc_setattr,
        .o_setattr_async        = osc_setattr_async,
        .o_brw                  = osc_brw,
        .o_punch                = osc_punch,
        .o_sync                 = osc_sync,
        .o_enqueue              = osc_enqueue,
        .o_change_cbdata        = osc_change_cbdata,
        .o_find_cbdata          = osc_find_cbdata,
        .o_cancel               = osc_cancel,
        .o_cancel_unused        = osc_cancel_unused,
        .o_iocontrol            = osc_iocontrol,
        .o_get_info             = osc_get_info,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_llog_init            = osc_llog_init,
        .o_llog_finish          = osc_llog_finish,
        .o_process_config       = osc_process_config,
        .o_quotactl             = osc_quotactl,
        .o_quotacheck           = osc_quotacheck,
};
3672
/* Shared OSC module state defined in other compilation units of this
 * module: the slab cache descriptors initialized in osc_init(), and the
 * AST guard lock (plus its lockdep class) set up there as well. */
extern struct lu_kmem_descr osc_caches[];
extern spinlock_t osc_ast_guard;
extern struct lock_class_key osc_ast_guard_class;
3676
3677 int __init osc_init(void)
3678 {
3679         struct lprocfs_static_vars lvars = { 0 };
3680         int rc;
3681         ENTRY;
3682
3683         /* print an address of _any_ initialized kernel symbol from this
3684          * module, to allow debugging with gdb that doesn't support data
3685          * symbols from modules.*/
3686         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3687
3688         rc = lu_kmem_init(osc_caches);
3689
3690         lprocfs_osc_init_vars(&lvars);
3691
3692         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3693                                  LUSTRE_OSC_NAME, &osc_device_type);
3694         if (rc) {
3695                 lu_kmem_fini(osc_caches);
3696                 RETURN(rc);
3697         }
3698
3699         spin_lock_init(&osc_ast_guard);
3700         lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
3701
3702         RETURN(rc);
3703 }
3704
#ifdef __KERNEL__
/* Module unload handler: unregister the OBD type first so no new devices
 * can be created, then release the slab caches set up in osc_init().
 * NOTE(review): deliberately not annotated __exit (see the commented-out
 * attribute) — presumably so it stays callable from init error paths;
 * confirm before changing. */
static void /*__exit*/ osc_exit(void)
{
        class_unregister_type(LUSTRE_OSC_NAME);
        lu_kmem_fini(osc_caches);
}

MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");

cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
#endif