Whamcloud - gitweb
ed8b962c33382e81cec6aefed14c152973075a46
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <libcfs/libcfs.h>
40
41 #ifndef __KERNEL__
42 # include <liblustre.h>
43 #endif
44
45 #include <lustre_dlm.h>
46 #include <lustre_net.h>
47 #include <lustre/lustre_user.h>
48 #include <obd_cksum.h>
49 #include <obd_ost.h>
50
51 #ifdef  __CYGWIN__
52 # include <ctype.h>
53 #endif
54
55 #include <lustre_ha.h>
56 #include <lprocfs_status.h>
57 #include <lustre_log.h>
58 #include <lustre_debug.h>
59 #include <lustre_param.h>
60 #include <lustre_fid.h>
61 #include "osc_internal.h"
62 #include "osc_cl_internal.h"
63
64 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
65 static int brw_interpret(const struct lu_env *env,
66                          struct ptlrpc_request *req, void *data, int rc);
67 int osc_cleanup(struct obd_device *obd);
68
69 /* Pack OSC object metadata for disk storage (LE byte order). */
70 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
71                       struct lov_stripe_md *lsm)
72 {
73         int lmm_size;
74         ENTRY;
75
76         lmm_size = sizeof(**lmmp);
77         if (lmmp == NULL)
78                 RETURN(lmm_size);
79
80         if (*lmmp != NULL && lsm == NULL) {
81                 OBD_FREE(*lmmp, lmm_size);
82                 *lmmp = NULL;
83                 RETURN(0);
84         } else if (unlikely(lsm != NULL && ostid_id(&lsm->lsm_oi) == 0)) {
85                 RETURN(-EBADF);
86         }
87
88         if (*lmmp == NULL) {
89                 OBD_ALLOC(*lmmp, lmm_size);
90                 if (*lmmp == NULL)
91                         RETURN(-ENOMEM);
92         }
93
94         if (lsm)
95                 ostid_cpu_to_le(&lsm->lsm_oi, &(*lmmp)->lmm_oi);
96
97         RETURN(lmm_size);
98 }
99
100 /* Unpack OSC object metadata from disk storage (LE byte order). */
101 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
102                         struct lov_mds_md *lmm, int lmm_bytes)
103 {
104         int lsm_size;
105         struct obd_import *imp = class_exp2cliimp(exp);
106         ENTRY;
107
108         if (lmm != NULL) {
109                 if (lmm_bytes < sizeof(*lmm)) {
110                         CERROR("%s: lov_mds_md too small: %d, need %d\n",
111                                exp->exp_obd->obd_name, lmm_bytes,
112                                (int)sizeof(*lmm));
113                         RETURN(-EINVAL);
114                 }
115                 /* XXX LOV_MAGIC etc check? */
116
117                 if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) {
118                         CERROR("%s: zero lmm_object_id: rc = %d\n",
119                                exp->exp_obd->obd_name, -EINVAL);
120                         RETURN(-EINVAL);
121                 }
122         }
123
124         lsm_size = lov_stripe_md_size(1);
125         if (lsmp == NULL)
126                 RETURN(lsm_size);
127
128         if (*lsmp != NULL && lmm == NULL) {
129                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
130                 OBD_FREE(*lsmp, lsm_size);
131                 *lsmp = NULL;
132                 RETURN(0);
133         }
134
135         if (*lsmp == NULL) {
136                 OBD_ALLOC(*lsmp, lsm_size);
137                 if (unlikely(*lsmp == NULL))
138                         RETURN(-ENOMEM);
139                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
140                 if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {
141                         OBD_FREE(*lsmp, lsm_size);
142                         RETURN(-ENOMEM);
143                 }
144                 loi_init((*lsmp)->lsm_oinfo[0]);
145         } else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) {
146                 RETURN(-EBADF);
147         }
148
149         if (lmm != NULL)
150                 /* XXX zero *lsmp? */
151                 ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi);
152
153         if (imp != NULL &&
154             (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
155                 (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
156         else
157                 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
158
159         RETURN(lsm_size);
160 }
161
162 static inline void osc_pack_capa(struct ptlrpc_request *req,
163                                  struct ost_body *body, void *capa)
164 {
165         struct obd_capa *oc = (struct obd_capa *)capa;
166         struct lustre_capa *c;
167
168         if (!capa)
169                 return;
170
171         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
172         LASSERT(c);
173         capa_cpy(c, oc);
174         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
175         DEBUG_CAPA(D_SEC, c, "pack");
176 }
177
178 static inline void osc_pack_req_body(struct ptlrpc_request *req,
179                                      struct obd_info *oinfo)
180 {
181         struct ost_body *body;
182
183         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
184         LASSERT(body);
185
186         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
187                              oinfo->oi_oa);
188         osc_pack_capa(req, body, oinfo->oi_capa);
189 }
190
191 static inline void osc_set_capa_size(struct ptlrpc_request *req,
192                                      const struct req_msg_field *field,
193                                      struct obd_capa *oc)
194 {
195         if (oc == NULL)
196                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
197         else
198                 /* it is already calculated as sizeof struct obd_capa */
199                 ;
200 }
201
202 static int osc_getattr_interpret(const struct lu_env *env,
203                                  struct ptlrpc_request *req,
204                                  struct osc_async_args *aa, int rc)
205 {
206         struct ost_body *body;
207         ENTRY;
208
209         if (rc != 0)
210                 GOTO(out, rc);
211
212         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
213         if (body) {
214                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
215                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
216                                      aa->aa_oi->oi_oa, &body->oa);
217
218                 /* This should really be sent by the OST */
219                 aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
220                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
221         } else {
222                 CDEBUG(D_INFO, "can't unpack ost_body\n");
223                 rc = -EPROTO;
224                 aa->aa_oi->oi_oa->o_valid = 0;
225         }
226 out:
227         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
228         RETURN(rc);
229 }
230
231 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
232                              struct ptlrpc_request_set *set)
233 {
234         struct ptlrpc_request *req;
235         struct osc_async_args *aa;
236         int                    rc;
237         ENTRY;
238
239         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
240         if (req == NULL)
241                 RETURN(-ENOMEM);
242
243         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
244         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
245         if (rc) {
246                 ptlrpc_request_free(req);
247                 RETURN(rc);
248         }
249
250         osc_pack_req_body(req, oinfo);
251
252         ptlrpc_request_set_replen(req);
253         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
254
255         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
256         aa = ptlrpc_req_async_args(req);
257         aa->aa_oi = oinfo;
258
259         ptlrpc_set_add_req(set, req);
260         RETURN(0);
261 }
262
263 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
264                        struct obd_info *oinfo)
265 {
266         struct ptlrpc_request *req;
267         struct ost_body       *body;
268         int                    rc;
269         ENTRY;
270
271         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
272         if (req == NULL)
273                 RETURN(-ENOMEM);
274
275         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
276         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
277         if (rc) {
278                 ptlrpc_request_free(req);
279                 RETURN(rc);
280         }
281
282         osc_pack_req_body(req, oinfo);
283
284         ptlrpc_request_set_replen(req);
285
286         rc = ptlrpc_queue_wait(req);
287         if (rc)
288                 GOTO(out, rc);
289
290         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
291         if (body == NULL)
292                 GOTO(out, rc = -EPROTO);
293
294         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
295         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
296                              &body->oa);
297
298         oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
299         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
300
301         EXIT;
302  out:
303         ptlrpc_req_finished(req);
304         return rc;
305 }
306
307 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
308                        struct obd_info *oinfo, struct obd_trans_info *oti)
309 {
310         struct ptlrpc_request *req;
311         struct ost_body       *body;
312         int                    rc;
313         ENTRY;
314
315         LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
316
317         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
318         if (req == NULL)
319                 RETURN(-ENOMEM);
320
321         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
322         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
323         if (rc) {
324                 ptlrpc_request_free(req);
325                 RETURN(rc);
326         }
327
328         osc_pack_req_body(req, oinfo);
329
330         ptlrpc_request_set_replen(req);
331
332         rc = ptlrpc_queue_wait(req);
333         if (rc)
334                 GOTO(out, rc);
335
336         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
337         if (body == NULL)
338                 GOTO(out, rc = -EPROTO);
339
340         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
341                              &body->oa);
342
343         EXIT;
344 out:
345         ptlrpc_req_finished(req);
346         RETURN(rc);
347 }
348
349 static int osc_setattr_interpret(const struct lu_env *env,
350                                  struct ptlrpc_request *req,
351                                  struct osc_setattr_args *sa, int rc)
352 {
353         struct ost_body *body;
354         ENTRY;
355
356         if (rc != 0)
357                 GOTO(out, rc);
358
359         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
360         if (body == NULL)
361                 GOTO(out, rc = -EPROTO);
362
363         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
364                              &body->oa);
365 out:
366         rc = sa->sa_upcall(sa->sa_cookie, rc);
367         RETURN(rc);
368 }
369
370 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
371                            struct obd_trans_info *oti,
372                            obd_enqueue_update_f upcall, void *cookie,
373                            struct ptlrpc_request_set *rqset)
374 {
375         struct ptlrpc_request   *req;
376         struct osc_setattr_args *sa;
377         int                      rc;
378         ENTRY;
379
380         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
381         if (req == NULL)
382                 RETURN(-ENOMEM);
383
384         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
385         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
386         if (rc) {
387                 ptlrpc_request_free(req);
388                 RETURN(rc);
389         }
390
391         if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
392                 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
393
394         osc_pack_req_body(req, oinfo);
395
396         ptlrpc_request_set_replen(req);
397
398         /* do mds to ost setattr asynchronously */
399         if (!rqset) {
400                 /* Do not wait for response. */
401                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
402         } else {
403                 req->rq_interpret_reply =
404                         (ptlrpc_interpterer_t)osc_setattr_interpret;
405
406                 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
407                 sa = ptlrpc_req_async_args(req);
408                 sa->sa_oa = oinfo->oi_oa;
409                 sa->sa_upcall = upcall;
410                 sa->sa_cookie = cookie;
411
412                 if (rqset == PTLRPCD_SET)
413                         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
414                 else
415                         ptlrpc_set_add_req(rqset, req);
416         }
417
418         RETURN(0);
419 }
420
421 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
422                              struct obd_trans_info *oti,
423                              struct ptlrpc_request_set *rqset)
424 {
425         return osc_setattr_async_base(exp, oinfo, oti,
426                                       oinfo->oi_cb_up, oinfo, rqset);
427 }
428
429 int osc_real_create(struct obd_export *exp, struct obdo *oa,
430                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
431 {
432         struct ptlrpc_request *req;
433         struct ost_body       *body;
434         struct lov_stripe_md  *lsm;
435         int                    rc;
436         ENTRY;
437
438         LASSERT(oa);
439         LASSERT(ea);
440
441         lsm = *ea;
442         if (!lsm) {
443                 rc = obd_alloc_memmd(exp, &lsm);
444                 if (rc < 0)
445                         RETURN(rc);
446         }
447
448         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
449         if (req == NULL)
450                 GOTO(out, rc = -ENOMEM);
451
452         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
453         if (rc) {
454                 ptlrpc_request_free(req);
455                 GOTO(out, rc);
456         }
457
458         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
459         LASSERT(body);
460
461         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
462
463         ptlrpc_request_set_replen(req);
464
465         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
466             oa->o_flags == OBD_FL_DELORPHAN) {
467                 DEBUG_REQ(D_HA, req,
468                           "delorphan from OST integration");
469                 /* Don't resend the delorphan req */
470                 req->rq_no_resend = req->rq_no_delay = 1;
471         }
472
473         rc = ptlrpc_queue_wait(req);
474         if (rc)
475                 GOTO(out_req, rc);
476
477         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
478         if (body == NULL)
479                 GOTO(out_req, rc = -EPROTO);
480
481         CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
482         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
483
484         oa->o_blksize = cli_brw_size(exp->exp_obd);
485         oa->o_valid |= OBD_MD_FLBLKSZ;
486
487         /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
488          * have valid lsm_oinfo data structs, so don't go touching that.
489          * This needs to be fixed in a big way.
490          */
491         lsm->lsm_oi = oa->o_oi;
492         *ea = lsm;
493
494         if (oti != NULL) {
495                 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
496
497                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
498                         if (!oti->oti_logcookies)
499                                 oti_alloc_cookies(oti, 1);
500                         *oti->oti_logcookies = oa->o_lcookie;
501                 }
502         }
503
504         CDEBUG(D_HA, "transno: "LPD64"\n",
505                lustre_msg_get_transno(req->rq_repmsg));
506 out_req:
507         ptlrpc_req_finished(req);
508 out:
509         if (rc && !*ea)
510                 obd_free_memmd(exp, &lsm);
511         RETURN(rc);
512 }
513
514 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
515                    obd_enqueue_update_f upcall, void *cookie,
516                    struct ptlrpc_request_set *rqset)
517 {
518         struct ptlrpc_request   *req;
519         struct osc_setattr_args *sa;
520         struct ost_body         *body;
521         int                      rc;
522         ENTRY;
523
524         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
525         if (req == NULL)
526                 RETURN(-ENOMEM);
527
528         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
529         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
530         if (rc) {
531                 ptlrpc_request_free(req);
532                 RETURN(rc);
533         }
534         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
535         ptlrpc_at_set_req_timeout(req);
536
537         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
538         LASSERT(body);
539         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
540                              oinfo->oi_oa);
541         osc_pack_capa(req, body, oinfo->oi_capa);
542
543         ptlrpc_request_set_replen(req);
544
545         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
546         CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
547         sa = ptlrpc_req_async_args(req);
548         sa->sa_oa     = oinfo->oi_oa;
549         sa->sa_upcall = upcall;
550         sa->sa_cookie = cookie;
551         if (rqset == PTLRPCD_SET)
552                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
553         else
554                 ptlrpc_set_add_req(rqset, req);
555
556         RETURN(0);
557 }
558
559 static int osc_punch(const struct lu_env *env, struct obd_export *exp,
560                      struct obd_info *oinfo, struct obd_trans_info *oti,
561                      struct ptlrpc_request_set *rqset)
562 {
563         oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
564         oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
565         oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
566         return osc_punch_base(exp, oinfo,
567                               oinfo->oi_cb_up, oinfo, rqset);
568 }
569
570 static int osc_sync_interpret(const struct lu_env *env,
571                               struct ptlrpc_request *req,
572                               void *arg, int rc)
573 {
574         struct osc_fsync_args *fa = arg;
575         struct ost_body *body;
576         ENTRY;
577
578         if (rc)
579                 GOTO(out, rc);
580
581         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
582         if (body == NULL) {
583                 CERROR ("can't unpack ost_body\n");
584                 GOTO(out, rc = -EPROTO);
585         }
586
587         *fa->fa_oi->oi_oa = body->oa;
588 out:
589         rc = fa->fa_upcall(fa->fa_cookie, rc);
590         RETURN(rc);
591 }
592
593 int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
594                   obd_enqueue_update_f upcall, void *cookie,
595                   struct ptlrpc_request_set *rqset)
596 {
597         struct ptlrpc_request *req;
598         struct ost_body       *body;
599         struct osc_fsync_args *fa;
600         int                    rc;
601         ENTRY;
602
603         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
604         if (req == NULL)
605                 RETURN(-ENOMEM);
606
607         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
608         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
609         if (rc) {
610                 ptlrpc_request_free(req);
611                 RETURN(rc);
612         }
613
614         /* overload the size and blocks fields in the oa with start/end */
615         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
616         LASSERT(body);
617         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
618                              oinfo->oi_oa);
619         osc_pack_capa(req, body, oinfo->oi_capa);
620
621         ptlrpc_request_set_replen(req);
622         req->rq_interpret_reply = osc_sync_interpret;
623
624         CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
625         fa = ptlrpc_req_async_args(req);
626         fa->fa_oi = oinfo;
627         fa->fa_upcall = upcall;
628         fa->fa_cookie = cookie;
629
630         if (rqset == PTLRPCD_SET)
631                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
632         else
633                 ptlrpc_set_add_req(rqset, req);
634
635         RETURN (0);
636 }
637
638 static int osc_sync(const struct lu_env *env, struct obd_export *exp,
639                     struct obd_info *oinfo, obd_size start, obd_size end,
640                     struct ptlrpc_request_set *set)
641 {
642         ENTRY;
643
644         if (!oinfo->oi_oa) {
645                 CDEBUG(D_INFO, "oa NULL\n");
646                 RETURN(-EINVAL);
647         }
648
649         oinfo->oi_oa->o_size = start;
650         oinfo->oi_oa->o_blocks = end;
651         oinfo->oi_oa->o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
652
653         RETURN(osc_sync_base(exp, oinfo, oinfo->oi_cb_up, oinfo, set));
654 }
655
656 /* Find and cancel locally locks matched by @mode in the resource found by
657  * @objid. Found locks are added into @cancel list. Returns the amount of
658  * locks added to @cancels list. */
659 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
660                                    cfs_list_t *cancels,
661                                    ldlm_mode_t mode, __u64 lock_flags)
662 {
663         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
664         struct ldlm_res_id res_id;
665         struct ldlm_resource *res;
666         int count;
667         ENTRY;
668
669         /* Return, i.e. cancel nothing, only if ELC is supported (flag in
670          * export) but disabled through procfs (flag in NS).
671          *
672          * This distinguishes from a case when ELC is not supported originally,
673          * when we still want to cancel locks in advance and just cancel them
674          * locally, without sending any RPC. */
675         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
676                 RETURN(0);
677
678         ostid_build_res_name(&oa->o_oi, &res_id);
679         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
680         if (res == NULL)
681                 RETURN(0);
682
683         LDLM_RESOURCE_ADDREF(res);
684         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
685                                            lock_flags, 0, NULL);
686         LDLM_RESOURCE_DELREF(res);
687         ldlm_resource_putref(res);
688         RETURN(count);
689 }
690
691 static int osc_destroy_interpret(const struct lu_env *env,
692                                  struct ptlrpc_request *req, void *data,
693                                  int rc)
694 {
695         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
696
697         atomic_dec(&cli->cl_destroy_in_flight);
698         wake_up(&cli->cl_destroy_waitq);
699         return 0;
700 }
701
702 static int osc_can_send_destroy(struct client_obd *cli)
703 {
704         if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
705             cli->cl_max_rpcs_in_flight) {
706                 /* The destroy request can be sent */
707                 return 1;
708         }
709         if (atomic_dec_return(&cli->cl_destroy_in_flight) <
710             cli->cl_max_rpcs_in_flight) {
711                 /*
712                  * The counter has been modified between the two atomic
713                  * operations.
714                  */
715                 wake_up(&cli->cl_destroy_waitq);
716         }
717         return 0;
718 }
719
720 int osc_create(const struct lu_env *env, struct obd_export *exp,
721                struct obdo *oa, struct lov_stripe_md **ea,
722                struct obd_trans_info *oti)
723 {
724         int rc = 0;
725         ENTRY;
726
727         LASSERT(oa);
728         LASSERT(ea);
729         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
730
731         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
732             oa->o_flags == OBD_FL_RECREATE_OBJS) {
733                 RETURN(osc_real_create(exp, oa, ea, oti));
734         }
735
736         if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi)))
737                 RETURN(osc_real_create(exp, oa, ea, oti));
738
739         /* we should not get here anymore */
740         LBUG();
741
742         RETURN(rc);
743 }
744
745 /* Destroy requests can be async always on the client, and we don't even really
746  * care about the return code since the client cannot do anything at all about
747  * a destroy failure.
748  * When the MDS is unlinking a filename, it saves the file objects into a
749  * recovery llog, and these object records are cancelled when the OST reports
750  * they were destroyed and sync'd to disk (i.e. transaction committed).
751  * If the client dies, or the OST is down when the object should be destroyed,
752  * the records are not cancelled, and when the OST reconnects to the MDS next,
753  * it will retrieve the llog unlink logs and then sends the log cancellation
754  * cookies to the MDS after committing destroy transactions. */
755 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
756                        struct obdo *oa, struct lov_stripe_md *ea,
757                        struct obd_trans_info *oti, struct obd_export *md_export,
758                        void *capa)
759 {
760         struct client_obd     *cli = &exp->exp_obd->u.cli;
761         struct ptlrpc_request *req;
762         struct ost_body       *body;
763         CFS_LIST_HEAD(cancels);
764         int rc, count;
765         ENTRY;
766
767         if (!oa) {
768                 CDEBUG(D_INFO, "oa NULL\n");
769                 RETURN(-EINVAL);
770         }
771
772         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
773                                         LDLM_FL_DISCARD_DATA);
774
775         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
776         if (req == NULL) {
777                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
778                 RETURN(-ENOMEM);
779         }
780
781         osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
782         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
783                                0, &cancels, count);
784         if (rc) {
785                 ptlrpc_request_free(req);
786                 RETURN(rc);
787         }
788
789         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
790         ptlrpc_at_set_req_timeout(req);
791
792         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
793                 oa->o_lcookie = *oti->oti_logcookies;
794         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
795         LASSERT(body);
796         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
797
798         osc_pack_capa(req, body, (struct obd_capa *)capa);
799         ptlrpc_request_set_replen(req);
800
801         /* If osc_destory is for destroying the unlink orphan,
802          * sent from MDT to OST, which should not be blocked here,
803          * because the process might be triggered by ptlrpcd, and
804          * it is not good to block ptlrpcd thread (b=16006)*/
805         if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
806                 req->rq_interpret_reply = osc_destroy_interpret;
807                 if (!osc_can_send_destroy(cli)) {
808                         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
809                                                           NULL);
810
811                         /*
812                          * Wait until the number of on-going destroy RPCs drops
813                          * under max_rpc_in_flight
814                          */
815                         l_wait_event_exclusive(cli->cl_destroy_waitq,
816                                                osc_can_send_destroy(cli), &lwi);
817                 }
818         }
819
820         /* Do not wait for response */
821         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
822         RETURN(0);
823 }
824
825 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
826                                 long writing_bytes)
827 {
828         obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
829
830         LASSERT(!(oa->o_valid & bits));
831
832         oa->o_valid |= bits;
833         client_obd_list_lock(&cli->cl_loi_list_lock);
834         oa->o_dirty = cli->cl_dirty;
835         if (unlikely(cli->cl_dirty - cli->cl_dirty_transit >
836                      cli->cl_dirty_max)) {
837                 CERROR("dirty %lu - %lu > dirty_max %lu\n",
838                        cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
839                 oa->o_undirty = 0;
840         } else if (unlikely(atomic_read(&obd_unstable_pages) +
841                             atomic_read(&obd_dirty_pages) -
842                             atomic_read(&obd_dirty_transit_pages) >
843                             (long)(obd_max_dirty_pages + 1))) {
844                 /* The atomic_read() allowing the atomic_inc() are
845                  * not covered by a lock thus they may safely race and trip
846                  * this CERROR() unless we add in a small fudge factor (+1). */
847                 CERROR("%s: dirty %d + %d - %d > system dirty_max %d\n",
848                        cli->cl_import->imp_obd->obd_name,
849                        atomic_read(&obd_unstable_pages),
850                        atomic_read(&obd_dirty_pages),
851                        atomic_read(&obd_dirty_transit_pages),
852                        obd_max_dirty_pages);
853                 oa->o_undirty = 0;
854         } else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) {
855                 CERROR("dirty %lu - dirty_max %lu too big???\n",
856                        cli->cl_dirty, cli->cl_dirty_max);
857                 oa->o_undirty = 0;
858         } else {
859                 long max_in_flight = (cli->cl_max_pages_per_rpc <<
860                                       PAGE_CACHE_SHIFT) *
861                                      (cli->cl_max_rpcs_in_flight + 1);
862                 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
863         }
864         oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
865         oa->o_dropped = cli->cl_lost_grant;
866         cli->cl_lost_grant = 0;
867         client_obd_list_unlock(&cli->cl_loi_list_lock);
868         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
869                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
870
871 }
872
873 void osc_update_next_shrink(struct client_obd *cli)
874 {
875         cli->cl_next_shrink_grant =
876                 cfs_time_shift(cli->cl_grant_shrink_interval);
877         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
878                cli->cl_next_shrink_grant);
879 }
880
881 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
882 {
883         client_obd_list_lock(&cli->cl_loi_list_lock);
884         cli->cl_avail_grant += grant;
885         client_obd_list_unlock(&cli->cl_loi_list_lock);
886 }
887
888 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
889 {
890         if (body->oa.o_valid & OBD_MD_FLGRANT) {
891                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
892                 __osc_update_grant(cli, body->oa.o_grant);
893         }
894 }
895
896 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
897                               obd_count keylen, void *key, obd_count vallen,
898                               void *val, struct ptlrpc_request_set *set);
899
900 static int osc_shrink_grant_interpret(const struct lu_env *env,
901                                       struct ptlrpc_request *req,
902                                       void *aa, int rc)
903 {
904         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
905         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
906         struct ost_body *body;
907
908         if (rc != 0) {
909                 __osc_update_grant(cli, oa->o_grant);
910                 GOTO(out, rc);
911         }
912
913         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
914         LASSERT(body);
915         osc_update_grant(cli, body);
916 out:
917         OBDO_FREE(oa);
918         return rc;
919 }
920
921 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
922 {
923         client_obd_list_lock(&cli->cl_loi_list_lock);
924         oa->o_grant = cli->cl_avail_grant / 4;
925         cli->cl_avail_grant -= oa->o_grant;
926         client_obd_list_unlock(&cli->cl_loi_list_lock);
927         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
928                 oa->o_valid |= OBD_MD_FLFLAGS;
929                 oa->o_flags = 0;
930         }
931         oa->o_flags |= OBD_FL_SHRINK_GRANT;
932         osc_update_next_shrink(cli);
933 }
934
935 /* Shrink the current grant, either from some large amount to enough for a
936  * full set of in-flight RPCs, or if we have already shrunk to that limit
937  * then to enough for a single RPC.  This avoids keeping more grant than
938  * needed, and avoids shrinking the grant piecemeal. */
939 static int osc_shrink_grant(struct client_obd *cli)
940 {
941         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
942                              (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
943
944         client_obd_list_lock(&cli->cl_loi_list_lock);
945         if (cli->cl_avail_grant <= target_bytes)
946                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
947         client_obd_list_unlock(&cli->cl_loi_list_lock);
948
949         return osc_shrink_grant_to_target(cli, target_bytes);
950 }
951
952 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
953 {
954         int                     rc = 0;
955         struct ost_body        *body;
956         ENTRY;
957
958         client_obd_list_lock(&cli->cl_loi_list_lock);
959         /* Don't shrink if we are already above or below the desired limit
960          * We don't want to shrink below a single RPC, as that will negatively
961          * impact block allocation and long-term performance. */
962         if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
963                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
964
965         if (target_bytes >= cli->cl_avail_grant) {
966                 client_obd_list_unlock(&cli->cl_loi_list_lock);
967                 RETURN(0);
968         }
969         client_obd_list_unlock(&cli->cl_loi_list_lock);
970
971         OBD_ALLOC_PTR(body);
972         if (!body)
973                 RETURN(-ENOMEM);
974
975         osc_announce_cached(cli, &body->oa, 0);
976
977         client_obd_list_lock(&cli->cl_loi_list_lock);
978         body->oa.o_grant = cli->cl_avail_grant - target_bytes;
979         cli->cl_avail_grant = target_bytes;
980         client_obd_list_unlock(&cli->cl_loi_list_lock);
981         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
982                 body->oa.o_valid |= OBD_MD_FLFLAGS;
983                 body->oa.o_flags = 0;
984         }
985         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
986         osc_update_next_shrink(cli);
987
988         rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
989                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
990                                 sizeof(*body), body, NULL);
991         if (rc != 0)
992                 __osc_update_grant(cli, body->oa.o_grant);
993         OBD_FREE_PTR(body);
994         RETURN(rc);
995 }
996
997 static int osc_should_shrink_grant(struct client_obd *client)
998 {
999         cfs_time_t time = cfs_time_current();
1000         cfs_time_t next_shrink = client->cl_next_shrink_grant;
1001
1002         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
1003              OBD_CONNECT_GRANT_SHRINK) == 0)
1004                 return 0;
1005
1006         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
1007                 /* Get the current RPC size directly, instead of going via:
1008                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
1009                  * Keep comment here so that it can be found by searching. */
1010                 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
1011
1012                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
1013                     client->cl_avail_grant > brw_size)
1014                         return 1;
1015                 else
1016                         osc_update_next_shrink(client);
1017         }
1018         return 0;
1019 }
1020
1021 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1022 {
1023         struct client_obd *client;
1024
1025         cfs_list_for_each_entry(client, &item->ti_obd_list,
1026                                 cl_grant_shrink_list) {
1027                 if (osc_should_shrink_grant(client))
1028                         osc_shrink_grant(client);
1029         }
1030         return 0;
1031 }
1032
1033 static int osc_add_shrink_grant(struct client_obd *client)
1034 {
1035         int rc;
1036
1037         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1038                                        TIMEOUT_GRANT,
1039                                        osc_grant_shrink_grant_cb, NULL,
1040                                        &client->cl_grant_shrink_list);
1041         if (rc) {
1042                 CERROR("add grant client %s error %d\n",
1043                         client->cl_import->imp_obd->obd_name, rc);
1044                 return rc;
1045         }
1046         CDEBUG(D_CACHE, "add grant client %s \n",
1047                client->cl_import->imp_obd->obd_name);
1048         osc_update_next_shrink(client);
1049         return 0;
1050 }
1051
1052 static int osc_del_shrink_grant(struct client_obd *client)
1053 {
1054         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1055                                          TIMEOUT_GRANT);
1056 }
1057
1058 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1059 {
1060         /*
1061          * ocd_grant is the total grant amount we're expect to hold: if we've
1062          * been evicted, it's the new avail_grant amount, cl_dirty will drop
1063          * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1064          *
1065          * race is tolerable here: if we're evicted, but imp_state already
1066          * left EVICTED state, then cl_dirty must be 0 already.
1067          */
1068         client_obd_list_lock(&cli->cl_loi_list_lock);
1069         if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1070                 cli->cl_avail_grant = ocd->ocd_grant;
1071         else
1072                 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1073
1074         if (cli->cl_avail_grant < 0) {
1075                 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
1076                       cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
1077                       ocd->ocd_grant, cli->cl_dirty);
1078                 /* workaround for servers which do not have the patch from
1079                  * LU-2679 */
1080                 cli->cl_avail_grant = ocd->ocd_grant;
1081         }
1082
1083         /* determine the appropriate chunk size used by osc_extent. */
1084         cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
1085         client_obd_list_unlock(&cli->cl_loi_list_lock);
1086
1087         CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
1088                 "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
1089                 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
1090
1091         if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1092             cfs_list_empty(&cli->cl_grant_shrink_list))
1093                 osc_add_shrink_grant(cli);
1094 }
1095
1096 /* We assume that the reason this OSC got a short read is because it read
1097  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1098  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1099  * this stripe never got written at or beyond this stripe offset yet. */
1100 static void handle_short_read(int nob_read, obd_count page_count,
1101                               struct brw_page **pga)
1102 {
1103         char *ptr;
1104         int i = 0;
1105
1106         /* skip bytes read OK */
1107         while (nob_read > 0) {
1108                 LASSERT (page_count > 0);
1109
1110                 if (pga[i]->count > nob_read) {
1111                         /* EOF inside this page */
1112                         ptr = kmap(pga[i]->pg) +
1113                                 (pga[i]->off & ~CFS_PAGE_MASK);
1114                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1115                         kunmap(pga[i]->pg);
1116                         page_count--;
1117                         i++;
1118                         break;
1119                 }
1120
1121                 nob_read -= pga[i]->count;
1122                 page_count--;
1123                 i++;
1124         }
1125
1126         /* zero remaining pages */
1127         while (page_count-- > 0) {
1128                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1129                 memset(ptr, 0, pga[i]->count);
1130                 kunmap(pga[i]->pg);
1131                 i++;
1132         }
1133 }
1134
1135 static int check_write_rcs(struct ptlrpc_request *req,
1136                            int requested_nob, int niocount,
1137                            obd_count page_count, struct brw_page **pga)
1138 {
1139         int     i;
1140         __u32   *remote_rcs;
1141
1142         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1143                                                   sizeof(*remote_rcs) *
1144                                                   niocount);
1145         if (remote_rcs == NULL) {
1146                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1147                 return(-EPROTO);
1148         }
1149
1150         /* return error if any niobuf was in error */
1151         for (i = 0; i < niocount; i++) {
1152                 if ((int)remote_rcs[i] < 0)
1153                         return(remote_rcs[i]);
1154
1155                 if (remote_rcs[i] != 0) {
1156                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1157                                 i, remote_rcs[i], req);
1158                         return(-EPROTO);
1159                 }
1160         }
1161
1162         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1163                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1164                        req->rq_bulk->bd_nob_transferred, requested_nob);
1165                 return(-EPROTO);
1166         }
1167
1168         return (0);
1169 }
1170
1171 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1172 {
1173         if (p1->flag != p2->flag) {
1174                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1175                                   OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
1176                                   OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);
1177
1178                 /* warn if we try to combine flags that we don't know to be
1179                  * safe to combine */
1180                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1181                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1182                               "report this at http://bugs.whamcloud.com/\n",
1183                               p1->flag, p2->flag);
1184                 }
1185                 return 0;
1186         }
1187
1188         return (p1->off + p1->count == p2->off);
1189 }
1190
1191 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1192                                    struct brw_page **pga, int opc,
1193                                    cksum_type_t cksum_type)
1194 {
1195         __u32                           cksum;
1196         int                             i = 0;
1197         struct cfs_crypto_hash_desc     *hdesc;
1198         unsigned int                    bufsize;
1199         int                             err;
1200         unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
1201
1202         LASSERT(pg_count > 0);
1203
1204         hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1205         if (IS_ERR(hdesc)) {
1206                 CERROR("Unable to initialize checksum hash %s\n",
1207                        cfs_crypto_hash_name(cfs_alg));
1208                 return PTR_ERR(hdesc);
1209         }
1210
1211         while (nob > 0 && pg_count > 0) {
1212                 int count = pga[i]->count > nob ? nob : pga[i]->count;
1213
1214                 /* corrupt the data before we compute the checksum, to
1215                  * simulate an OST->client data error */
1216                 if (i == 0 && opc == OST_READ &&
1217                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1218                         unsigned char *ptr = kmap(pga[i]->pg);
1219                         int off = pga[i]->off & ~CFS_PAGE_MASK;
1220                         memcpy(ptr + off, "bad1", min(4, nob));
1221                         kunmap(pga[i]->pg);
1222                 }
1223                 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1224                                   pga[i]->off & ~CFS_PAGE_MASK,
1225                                   count);
1226                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1227                                (int)(pga[i]->off & ~CFS_PAGE_MASK));
1228
1229                 nob -= pga[i]->count;
1230                 pg_count--;
1231                 i++;
1232         }
1233
1234         bufsize = 4;
1235         err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1236
1237         if (err)
1238                 cfs_crypto_hash_final(hdesc, NULL, NULL);
1239
1240         /* For sending we only compute the wrong checksum instead
1241          * of corrupting the data so it is still correct on a redo */
1242         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1243                 cksum++;
1244
1245         return cksum;
1246 }
1247
1248 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1249                                 struct lov_stripe_md *lsm, obd_count page_count,
1250                                 struct brw_page **pga,
1251                                 struct ptlrpc_request **reqp,
1252                                 struct obd_capa *ocapa, int reserve,
1253                                 int resend)
1254 {
1255         struct ptlrpc_request   *req;
1256         struct ptlrpc_bulk_desc *desc;
1257         struct ost_body         *body;
1258         struct obd_ioobj        *ioobj;
1259         struct niobuf_remote    *niobuf;
1260         int niocount, i, requested_nob, opc, rc;
1261         struct osc_brw_async_args *aa;
1262         struct req_capsule      *pill;
1263         struct brw_page *pg_prev;
1264
1265         ENTRY;
1266         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1267                 RETURN(-ENOMEM); /* Recoverable */
1268         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1269                 RETURN(-EINVAL); /* Fatal */
1270
1271         if ((cmd & OBD_BRW_WRITE) != 0) {
1272                 opc = OST_WRITE;
1273                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1274                                                 cli->cl_import->imp_rq_pool,
1275                                                 &RQF_OST_BRW_WRITE);
1276         } else {
1277                 opc = OST_READ;
1278                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1279         }
1280         if (req == NULL)
1281                 RETURN(-ENOMEM);
1282
1283         for (niocount = i = 1; i < page_count; i++) {
1284                 if (!can_merge_pages(pga[i - 1], pga[i]))
1285                         niocount++;
1286         }
1287
1288         pill = &req->rq_pill;
1289         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1290                              sizeof(*ioobj));
1291         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1292                              niocount * sizeof(*niobuf));
1293         osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1294
1295         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1296         if (rc) {
1297                 ptlrpc_request_free(req);
1298                 RETURN(rc);
1299         }
1300         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1301         ptlrpc_at_set_req_timeout(req);
1302         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1303          * retry logic */
1304         req->rq_no_retry_einprogress = 1;
1305
1306         desc = ptlrpc_prep_bulk_imp(req, page_count,
1307                 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1308                 opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
1309                 OST_BULK_PORTAL);
1310
1311         if (desc == NULL)
1312                 GOTO(out, rc = -ENOMEM);
1313         /* NB request now owns desc and will free it when it gets freed */
1314
1315         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1316         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1317         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1318         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1319
1320         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1321
1322         obdo_to_ioobj(oa, ioobj);
1323         ioobj->ioo_bufcnt = niocount;
1324         /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1325          * that might be send for this request.  The actual number is decided
1326          * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1327          * "max - 1" for old client compatibility sending "0", and also so the
1328          * the actual maximum is a power-of-two number, not one less. LU-1431 */
1329         ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1330         osc_pack_capa(req, body, ocapa);
1331         LASSERT(page_count > 0);
1332         pg_prev = pga[0];
1333         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1334                 struct brw_page *pg = pga[i];
1335                 int poff = pg->off & ~CFS_PAGE_MASK;
1336
1337                 LASSERT(pg->count > 0);
1338                 /* make sure there is no gap in the middle of page array */
1339                 LASSERTF(page_count == 1 ||
1340                          (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
1341                           ergo(i > 0 && i < page_count - 1,
1342                                poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
1343                           ergo(i == page_count - 1, poff == 0)),
1344                          "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1345                          i, page_count, pg, pg->off, pg->count);
1346 #ifdef __linux__
1347                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1348                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1349                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1350                          i, page_count,
1351                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1352                          pg_prev->pg, page_private(pg_prev->pg),
1353                          pg_prev->pg->index, pg_prev->off);
1354 #else
1355                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1356                          "i %d p_c %u\n", i, page_count);
1357 #endif
1358                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1359                         (pg->flag & OBD_BRW_SRVLOCK));
1360
1361                 ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
1362                 requested_nob += pg->count;
1363
1364                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1365                         niobuf--;
1366                         niobuf->len += pg->count;
1367                 } else {
1368                         niobuf->offset = pg->off;
1369                         niobuf->len    = pg->count;
1370                         niobuf->flags  = pg->flag;
1371                 }
1372                 pg_prev = pg;
1373         }
1374
1375         LASSERTF((void *)(niobuf - niocount) ==
1376                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1377                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1378                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1379
1380         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1381         if (resend) {
1382                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1383                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1384                         body->oa.o_flags = 0;
1385                 }
1386                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1387         }
1388
1389         if (osc_should_shrink_grant(cli))
1390                 osc_shrink_grant_local(cli, &body->oa);
1391
1392         /* size[REQ_REC_OFF] still sizeof (*body) */
1393         if (opc == OST_WRITE) {
1394                 if (cli->cl_checksum &&
1395                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1396                         /* store cl_cksum_type in a local variable since
1397                          * it can be changed via lprocfs */
1398                         cksum_type_t cksum_type = cli->cl_cksum_type;
1399
1400                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1401                                 oa->o_flags &= OBD_FL_LOCAL_MASK;
1402                                 body->oa.o_flags = 0;
1403                         }
1404                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1405                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1406                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1407                                                              page_count, pga,
1408                                                              OST_WRITE,
1409                                                              cksum_type);
1410                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1411                                body->oa.o_cksum);
1412                         /* save this in 'oa', too, for later checking */
1413                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1414                         oa->o_flags |= cksum_type_pack(cksum_type);
1415                 } else {
1416                         /* clear out the checksum flag, in case this is a
1417                          * resend but cl_checksum is no longer set. b=11238 */
1418                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1419                 }
1420                 oa->o_cksum = body->oa.o_cksum;
1421                 /* 1 RC per niobuf */
1422                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1423                                      sizeof(__u32) * niocount);
1424         } else {
1425                 if (cli->cl_checksum &&
1426                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1427                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1428                                 body->oa.o_flags = 0;
1429                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1430                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1431                 }
1432         }
1433         ptlrpc_request_set_replen(req);
1434
1435         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1436         aa = ptlrpc_req_async_args(req);
1437         aa->aa_oa = oa;
1438         aa->aa_requested_nob = requested_nob;
1439         aa->aa_nio_count = niocount;
1440         aa->aa_page_count = page_count;
1441         aa->aa_resends = 0;
1442         aa->aa_ppga = pga;
1443         aa->aa_cli = cli;
1444         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1445         if (ocapa && reserve)
1446                 aa->aa_ocapa = capa_get(ocapa);
1447
1448         *reqp = req;
1449         RETURN(0);
1450
1451  out:
1452         ptlrpc_req_finished(req);
1453         RETURN(rc);
1454 }
1455
1456 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1457                                 __u32 client_cksum, __u32 server_cksum, int nob,
1458                                 obd_count page_count, struct brw_page **pga,
1459                                 cksum_type_t client_cksum_type)
1460 {
1461         __u32 new_cksum;
1462         char *msg;
1463         cksum_type_t cksum_type;
1464
1465         if (server_cksum == client_cksum) {
1466                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1467                 return 0;
1468         }
1469
1470         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1471                                        oa->o_flags : 0);
1472         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1473                                       cksum_type);
1474
1475         if (cksum_type != client_cksum_type)
1476                 msg = "the server did not use the checksum type specified in "
1477                       "the original request - likely a protocol problem";
1478         else if (new_cksum == server_cksum)
1479                 msg = "changed on the client after we checksummed it - "
1480                       "likely false positive due to mmap IO (bug 11742)";
1481         else if (new_cksum == client_cksum)
1482                 msg = "changed in transit before arrival at OST";
1483         else
1484                 msg = "changed in transit AND doesn't match the original - "
1485                       "likely false positive due to mmap IO (bug 11742)";
1486
1487         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1488                            " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
1489                            msg, libcfs_nid2str(peer->nid),
1490                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1491                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1492                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1493                            POSTID(&oa->o_oi), pga[0]->off,
1494                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1495         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1496                "client csum now %x\n", client_cksum, client_cksum_type,
1497                server_cksum, cksum_type, new_cksum);
1498         return 1;
1499 }
1500
1501 /* Note rc enters this function as number of bytes transferred */
1502 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1503 {
1504         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1505         const lnet_process_id_t *peer =
1506                         &req->rq_import->imp_connection->c_peer;
1507         struct client_obd *cli = aa->aa_cli;
1508         struct ost_body *body;
1509         __u32 client_cksum = 0;
1510         ENTRY;
1511
1512         if (rc < 0 && rc != -EDQUOT) {
1513                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1514                 RETURN(rc);
1515         }
1516
1517         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1518         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1519         if (body == NULL) {
1520                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1521                 RETURN(-EPROTO);
1522         }
1523
1524         /* set/clear over quota flag for a uid/gid */
1525         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1526             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1527                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1528
1529                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1530                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1531                        body->oa.o_flags);
1532                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1533         }
1534
1535         osc_update_grant(cli, body);
1536
1537         if (rc < 0)
1538                 RETURN(rc);
1539
1540         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1541                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1542
1543         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1544                 if (rc > 0) {
1545                         CERROR("Unexpected +ve rc %d\n", rc);
1546                         RETURN(-EPROTO);
1547                 }
1548                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1549
1550                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1551                         RETURN(-EAGAIN);
1552
1553                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1554                     check_write_checksum(&body->oa, peer, client_cksum,
1555                                          body->oa.o_cksum, aa->aa_requested_nob,
1556                                          aa->aa_page_count, aa->aa_ppga,
1557                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1558                         RETURN(-EAGAIN);
1559
1560                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1561                                      aa->aa_page_count, aa->aa_ppga);
1562                 GOTO(out, rc);
1563         }
1564
1565         /* The rest of this function executes only for OST_READs */
1566
1567         /* if unwrap_bulk failed, return -EAGAIN to retry */
1568         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1569         if (rc < 0)
1570                 GOTO(out, rc = -EAGAIN);
1571
1572         if (rc > aa->aa_requested_nob) {
1573                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1574                        aa->aa_requested_nob);
1575                 RETURN(-EPROTO);
1576         }
1577
1578         if (rc != req->rq_bulk->bd_nob_transferred) {
1579                 CERROR ("Unexpected rc %d (%d transferred)\n",
1580                         rc, req->rq_bulk->bd_nob_transferred);
1581                 return (-EPROTO);
1582         }
1583
1584         if (rc < aa->aa_requested_nob)
1585                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1586
1587         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1588                 static int cksum_counter;
1589                 __u32      server_cksum = body->oa.o_cksum;
1590                 char      *via;
1591                 char      *router;
1592                 cksum_type_t cksum_type;
1593
1594                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1595                                                body->oa.o_flags : 0);
1596                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1597                                                  aa->aa_ppga, OST_READ,
1598                                                  cksum_type);
1599
1600                 if (peer->nid == req->rq_bulk->bd_sender) {
1601                         via = router = "";
1602                 } else {
1603                         via = " via ";
1604                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1605                 }
1606
1607                 if (server_cksum == ~0 && rc > 0) {
1608                         CERROR("Protocol error: server %s set the 'checksum' "
1609                                "bit, but didn't send a checksum.  Not fatal, "
1610                                "but please notify on http://bugs.whamcloud.com/\n",
1611                                libcfs_nid2str(peer->nid));
1612                 } else if (server_cksum != client_cksum) {
1613                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1614                                            "%s%s%s inode "DFID" object "DOSTID
1615                                            " extent ["LPU64"-"LPU64"]\n",
1616                                            req->rq_import->imp_obd->obd_name,
1617                                            libcfs_nid2str(peer->nid),
1618                                            via, router,
1619                                            body->oa.o_valid & OBD_MD_FLFID ?
1620                                                 body->oa.o_parent_seq : (__u64)0,
1621                                            body->oa.o_valid & OBD_MD_FLFID ?
1622                                                 body->oa.o_parent_oid : 0,
1623                                            body->oa.o_valid & OBD_MD_FLFID ?
1624                                                 body->oa.o_parent_ver : 0,
1625                                            POSTID(&body->oa.o_oi),
1626                                            aa->aa_ppga[0]->off,
1627                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1628                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1629                                                                         1);
1630                         CERROR("client %x, server %x, cksum_type %x\n",
1631                                client_cksum, server_cksum, cksum_type);
1632                         cksum_counter = 0;
1633                         aa->aa_oa->o_cksum = client_cksum;
1634                         rc = -EAGAIN;
1635                 } else {
1636                         cksum_counter++;
1637                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1638                         rc = 0;
1639                 }
1640         } else if (unlikely(client_cksum)) {
1641                 static int cksum_missed;
1642
1643                 cksum_missed++;
1644                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1645                         CERROR("Checksum %u requested from %s but not sent\n",
1646                                cksum_missed, libcfs_nid2str(peer->nid));
1647         } else {
1648                 rc = 0;
1649         }
1650 out:
1651         if (rc >= 0)
1652                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1653                                      aa->aa_oa, &body->oa);
1654
1655         RETURN(rc);
1656 }
1657
1658 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1659                             struct lov_stripe_md *lsm,
1660                             obd_count page_count, struct brw_page **pga,
1661                             struct obd_capa *ocapa)
1662 {
1663         struct ptlrpc_request *req;
1664         int                    rc;
1665         wait_queue_head_t            waitq;
1666         int                    generation, resends = 0;
1667         struct l_wait_info     lwi;
1668
1669         ENTRY;
1670
1671         init_waitqueue_head(&waitq);
1672         generation = exp->exp_obd->u.cli.cl_import->imp_generation;
1673
1674 restart_bulk:
1675         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1676                                   page_count, pga, &req, ocapa, 0, resends);
1677         if (rc != 0)
1678                 return (rc);
1679
1680         if (resends) {
1681                 req->rq_generation_set = 1;
1682                 req->rq_import_generation = generation;
1683                 req->rq_sent = cfs_time_current_sec() + resends;
1684         }
1685
1686         rc = ptlrpc_queue_wait(req);
1687
1688         if (rc == -ETIMEDOUT && req->rq_resend) {
1689                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1690                 ptlrpc_req_finished(req);
1691                 goto restart_bulk;
1692         }
1693
1694         rc = osc_brw_fini_request(req, rc);
1695
1696         ptlrpc_req_finished(req);
1697         /* When server return -EINPROGRESS, client should always retry
1698          * regardless of the number of times the bulk was resent already.*/
1699         if (osc_recoverable_error(rc)) {
1700                 resends++;
1701                 if (rc != -EINPROGRESS &&
1702                     !client_should_resend(resends, &exp->exp_obd->u.cli)) {
1703                         CERROR("%s: too many resend retries for object: "
1704                                ""DOSTID", rc = %d.\n", exp->exp_obd->obd_name,
1705                                POSTID(&oa->o_oi), rc);
1706                         goto out;
1707                 }
1708                 if (generation !=
1709                     exp->exp_obd->u.cli.cl_import->imp_generation) {
1710                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1711                                ""DOSTID", rc = %d.\n", exp->exp_obd->obd_name,
1712                                POSTID(&oa->o_oi), rc);
1713                         goto out;
1714                 }
1715
1716                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL,
1717                                        NULL);
1718                 l_wait_event(waitq, 0, &lwi);
1719
1720                 goto restart_bulk;
1721         }
1722 out:
1723         if (rc == -EAGAIN || rc == -EINPROGRESS)
1724                 rc = -EIO;
1725         RETURN (rc);
1726 }
1727
1728 static int osc_brw_redo_request(struct ptlrpc_request *request,
1729                                 struct osc_brw_async_args *aa, int rc)
1730 {
1731         struct ptlrpc_request *new_req;
1732         struct osc_brw_async_args *new_aa;
1733         struct osc_async_page *oap;
1734         ENTRY;
1735
1736         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1737                   "redo for recoverable error %d", rc);
1738
1739         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1740                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1741                                   aa->aa_cli, aa->aa_oa,
1742                                   NULL /* lsm unused by osc currently */,
1743                                   aa->aa_page_count, aa->aa_ppga,
1744                                   &new_req, aa->aa_ocapa, 0, 1);
1745         if (rc)
1746                 RETURN(rc);
1747
1748         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1749                 if (oap->oap_request != NULL) {
1750                         LASSERTF(request == oap->oap_request,
1751                                  "request %p != oap_request %p\n",
1752                                  request, oap->oap_request);
1753                         if (oap->oap_interrupted) {
1754                                 ptlrpc_req_finished(new_req);
1755                                 RETURN(-EINTR);
1756                         }
1757                 }
1758         }
1759         /* New request takes over pga and oaps from old request.
1760          * Note that copying a list_head doesn't work, need to move it... */
1761         aa->aa_resends++;
1762         new_req->rq_interpret_reply = request->rq_interpret_reply;
1763         new_req->rq_async_args = request->rq_async_args;
1764         new_req->rq_commit_cb = request->rq_commit_cb;
1765         /* cap resend delay to the current request timeout, this is similar to
1766          * what ptlrpc does (see after_reply()) */
1767         if (aa->aa_resends > new_req->rq_timeout)
1768                 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1769         else
1770                 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1771         new_req->rq_generation_set = 1;
1772         new_req->rq_import_generation = request->rq_import_generation;
1773
1774         new_aa = ptlrpc_req_async_args(new_req);
1775
1776         CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1777         cfs_list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1778         CFS_INIT_LIST_HEAD(&new_aa->aa_exts);
1779         cfs_list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1780         new_aa->aa_resends = aa->aa_resends;
1781
1782         cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1783                 if (oap->oap_request) {
1784                         ptlrpc_req_finished(oap->oap_request);
1785                         oap->oap_request = ptlrpc_request_addref(new_req);
1786                 }
1787         }
1788
1789         new_aa->aa_ocapa = aa->aa_ocapa;
1790         aa->aa_ocapa = NULL;
1791
1792         /* XXX: This code will run into problem if we're going to support
1793          * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1794          * and wait for all of them to be finished. We should inherit request
1795          * set from old request. */
1796         ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
1797
1798         DEBUG_REQ(D_INFO, new_req, "new request");
1799         RETURN(0);
1800 }
1801
1802 /*
1803  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1804  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1805  * fine for our small page arrays and doesn't require allocation.  its an
1806  * insertion sort that swaps elements that are strides apart, shrinking the
1807  * stride down until its '1' and the array is sorted.
1808  */
1809 static void sort_brw_pages(struct brw_page **array, int num)
1810 {
1811         int stride, i, j;
1812         struct brw_page *tmp;
1813
1814         if (num == 1)
1815                 return;
1816         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1817                 ;
1818
1819         do {
1820                 stride /= 3;
1821                 for (i = stride ; i < num ; i++) {
1822                         tmp = array[i];
1823                         j = i;
1824                         while (j >= stride && array[j - stride]->off > tmp->off) {
1825                                 array[j] = array[j - stride];
1826                                 j -= stride;
1827                         }
1828                         array[j] = tmp;
1829                 }
1830         } while (stride > 1);
1831 }
1832
1833 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1834 {
1835         int count = 1;
1836         int offset;
1837         int i = 0;
1838
1839         LASSERT (pages > 0);
1840         offset = pg[i]->off & ~CFS_PAGE_MASK;
1841
1842         for (;;) {
1843                 pages--;
1844                 if (pages == 0)         /* that's all */
1845                         return count;
1846
1847                 if (offset + pg[i]->count < PAGE_CACHE_SIZE)
1848                         return count;   /* doesn't end on page boundary */
1849
1850                 i++;
1851                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1852                 if (offset != 0)        /* doesn't start on page boundary */
1853                         return count;
1854
1855                 count++;
1856         }
1857 }
1858
1859 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1860 {
1861         struct brw_page **ppga;
1862         int i;
1863
1864         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1865         if (ppga == NULL)
1866                 return NULL;
1867
1868         for (i = 0; i < count; i++)
1869                 ppga[i] = pga + i;
1870         return ppga;
1871 }
1872
1873 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1874 {
1875         LASSERT(ppga != NULL);
1876         OBD_FREE(ppga, sizeof(*ppga) * count);
1877 }
1878
1879 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1880                    obd_count page_count, struct brw_page *pga,
1881                    struct obd_trans_info *oti)
1882 {
1883         struct obdo *saved_oa = NULL;
1884         struct brw_page **ppga, **orig;
1885         struct obd_import *imp = class_exp2cliimp(exp);
1886         struct client_obd *cli;
1887         int rc, page_count_orig;
1888         ENTRY;
1889
1890         LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1891         cli = &imp->imp_obd->u.cli;
1892
1893         if (cmd & OBD_BRW_CHECK) {
1894                 /* The caller just wants to know if there's a chance that this
1895                  * I/O can succeed */
1896
1897                 if (imp->imp_invalid)
1898                         RETURN(-EIO);
1899                 RETURN(0);
1900         }
1901
1902         /* test_brw with a failed create can trip this, maybe others. */
1903         LASSERT(cli->cl_max_pages_per_rpc);
1904
1905         rc = 0;
1906
1907         orig = ppga = osc_build_ppga(pga, page_count);
1908         if (ppga == NULL)
1909                 RETURN(-ENOMEM);
1910         page_count_orig = page_count;
1911
1912         sort_brw_pages(ppga, page_count);
1913         while (page_count) {
1914                 obd_count pages_per_brw;
1915
1916                 if (page_count > cli->cl_max_pages_per_rpc)
1917                         pages_per_brw = cli->cl_max_pages_per_rpc;
1918                 else
1919                         pages_per_brw = page_count;
1920
1921                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1922
1923                 if (saved_oa != NULL) {
1924                         /* restore previously saved oa */
1925                         *oinfo->oi_oa = *saved_oa;
1926                 } else if (page_count > pages_per_brw) {
1927                         /* save a copy of oa (brw will clobber it) */
1928                         OBDO_ALLOC(saved_oa);
1929                         if (saved_oa == NULL)
1930                                 GOTO(out, rc = -ENOMEM);
1931                         *saved_oa = *oinfo->oi_oa;
1932                 }
1933
1934                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1935                                       pages_per_brw, ppga, oinfo->oi_capa);
1936
1937                 if (rc != 0)
1938                         break;
1939
1940                 page_count -= pages_per_brw;
1941                 ppga += pages_per_brw;
1942         }
1943
1944 out:
1945         osc_release_ppga(orig, page_count_orig);
1946
1947         if (saved_oa != NULL)
1948                 OBDO_FREE(saved_oa);
1949
1950         RETURN(rc);
1951 }
1952
1953 static int brw_interpret(const struct lu_env *env,
1954                          struct ptlrpc_request *req, void *data, int rc)
1955 {
1956         struct osc_brw_async_args *aa = data;
1957         struct osc_extent *ext;
1958         struct osc_extent *tmp;
1959         struct client_obd *cli = aa->aa_cli;
1960         ENTRY;
1961
1962         rc = osc_brw_fini_request(req, rc);
1963         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1964         /* When server return -EINPROGRESS, client should always retry
1965          * regardless of the number of times the bulk was resent already. */
1966         if (osc_recoverable_error(rc)) {
1967                 if (req->rq_import_generation !=
1968                     req->rq_import->imp_generation) {
1969                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1970                                ""DOSTID", rc = %d.\n",
1971                                req->rq_import->imp_obd->obd_name,
1972                                POSTID(&aa->aa_oa->o_oi), rc);
1973                 } else if (rc == -EINPROGRESS ||
1974                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1975                         rc = osc_brw_redo_request(req, aa, rc);
1976                 } else {
1977                         CERROR("%s: too many resent retries for object: "
1978                                ""LPU64":"LPU64", rc = %d.\n",
1979                                req->rq_import->imp_obd->obd_name,
1980                                POSTID(&aa->aa_oa->o_oi), rc);
1981                 }
1982
1983                 if (rc == 0)
1984                         RETURN(0);
1985                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1986                         rc = -EIO;
1987         }
1988
1989         if (aa->aa_ocapa) {
1990                 capa_put(aa->aa_ocapa);
1991                 aa->aa_ocapa = NULL;
1992         }
1993
1994         if (rc == 0) {
1995                 struct obdo *oa = aa->aa_oa;
1996                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1997                 unsigned long valid = 0;
1998                 struct cl_object *obj;
1999                 struct osc_async_page *last;
2000
2001                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
2002                 obj = osc2cl(last->oap_obj);
2003
2004                 cl_object_attr_lock(obj);
2005                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2006                         attr->cat_blocks = oa->o_blocks;
2007                         valid |= CAT_BLOCKS;
2008                 }
2009                 if (oa->o_valid & OBD_MD_FLMTIME) {
2010                         attr->cat_mtime = oa->o_mtime;
2011                         valid |= CAT_MTIME;
2012                 }
2013                 if (oa->o_valid & OBD_MD_FLATIME) {
2014                         attr->cat_atime = oa->o_atime;
2015                         valid |= CAT_ATIME;
2016                 }
2017                 if (oa->o_valid & OBD_MD_FLCTIME) {
2018                         attr->cat_ctime = oa->o_ctime;
2019                         valid |= CAT_CTIME;
2020                 }
2021
2022                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
2023                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
2024                         loff_t last_off = last->oap_count + last->oap_obj_off;
2025
2026                         /* Change file size if this is an out of quota or
2027                          * direct IO write and it extends the file size */
2028                         if (loi->loi_lvb.lvb_size < last_off) {
2029                                 attr->cat_size = last_off;
2030                                 valid |= CAT_SIZE;
2031                         }
2032                         /* Extend KMS if it's not a lockless write */
2033                         if (loi->loi_kms < last_off &&
2034                             oap2osc_page(last)->ops_srvlock == 0) {
2035                                 attr->cat_kms = last_off;
2036                                 valid |= CAT_KMS;
2037                         }
2038                 }
2039
2040                 if (valid != 0)
2041                         cl_object_attr_set(env, obj, attr, valid);
2042                 cl_object_attr_unlock(obj);
2043         }
2044         OBDO_FREE(aa->aa_oa);
2045
2046         cfs_list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
2047                 cfs_list_del_init(&ext->oe_link);
2048                 osc_extent_finish(env, ext, 1, rc);
2049         }
2050         LASSERT(cfs_list_empty(&aa->aa_exts));
2051         LASSERT(cfs_list_empty(&aa->aa_oaps));
2052
2053         cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
2054                           req->rq_bulk->bd_nob_transferred);
2055         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2056         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
2057
2058         client_obd_list_lock(&cli->cl_loi_list_lock);
2059         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2060          * is called so we know whether to go to sync BRWs or wait for more
2061          * RPCs to complete */
2062         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2063                 cli->cl_w_in_flight--;
2064         else
2065                 cli->cl_r_in_flight--;
2066         osc_wake_cache_waiters(cli);
2067         client_obd_list_unlock(&cli->cl_loi_list_lock);
2068
2069         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
2070         RETURN(rc);
2071 }
2072
2073 static void brw_commit(struct ptlrpc_request *req)
2074 {
2075         spin_lock(&req->rq_lock);
2076         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
2077          * this called via the rq_commit_cb, I need to ensure
2078          * osc_dec_unstable_pages is still called. Otherwise unstable
2079          * pages may be leaked. */
2080         if (req->rq_unstable) {
2081                 spin_unlock(&req->rq_lock);
2082                 osc_dec_unstable_pages(req);
2083                 spin_lock(&req->rq_lock);
2084         } else {
2085                 req->rq_committed = 1;
2086         }
2087         spin_unlock(&req->rq_lock);
2088 }
2089
2090 /**
2091  * Build an RPC by the list of extent @ext_list. The caller must ensure
2092  * that the total pages in this list are NOT over max pages per RPC.
2093  * Extents in the list must be in OES_RPC state.
2094  */
2095 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2096                   cfs_list_t *ext_list, int cmd, pdl_policy_t pol)
2097 {
2098         struct ptlrpc_request           *req = NULL;
2099         struct osc_extent               *ext;
2100         struct brw_page                 **pga = NULL;
2101         struct osc_brw_async_args       *aa = NULL;
2102         struct obdo                     *oa = NULL;
2103         struct osc_async_page           *oap;
2104         struct osc_async_page           *tmp;
2105         struct cl_req                   *clerq = NULL;
2106         enum cl_req_type                crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
2107                                                                       CRT_READ;
2108         struct ldlm_lock                *lock = NULL;
2109         struct cl_req_attr              *crattr = NULL;
2110         obd_off                         starting_offset = OBD_OBJECT_EOF;
2111         obd_off                         ending_offset = 0;
2112         int                             mpflag = 0;
2113         int                             mem_tight = 0;
2114         int                             page_count = 0;
2115         int                             i;
2116         int                             rc;
2117         CFS_LIST_HEAD(rpc_list);
2118
2119         ENTRY;
2120         LASSERT(!cfs_list_empty(ext_list));
2121
2122         /* add pages into rpc_list to build BRW rpc */
2123         cfs_list_for_each_entry(ext, ext_list, oe_link) {
2124                 LASSERT(ext->oe_state == OES_RPC);
2125                 mem_tight |= ext->oe_memalloc;
2126                 cfs_list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2127                         ++page_count;
2128                         cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
2129                         if (starting_offset > oap->oap_obj_off)
2130                                 starting_offset = oap->oap_obj_off;
2131                         else
2132                                 LASSERT(oap->oap_page_off == 0);
2133                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
2134                                 ending_offset = oap->oap_obj_off +
2135                                                 oap->oap_count;
2136                         else
2137                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2138                                         PAGE_CACHE_SIZE);
2139                 }
2140         }
2141
2142         if (mem_tight)
2143                 mpflag = cfs_memory_pressure_get_and_set();
2144
2145         OBD_ALLOC(crattr, sizeof(*crattr));
2146         if (crattr == NULL)
2147                 GOTO(out, rc = -ENOMEM);
2148
2149         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2150         if (pga == NULL)
2151                 GOTO(out, rc = -ENOMEM);
2152
2153         OBDO_ALLOC(oa);
2154         if (oa == NULL)
2155                 GOTO(out, rc = -ENOMEM);
2156
2157         i = 0;
2158         cfs_list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
2159                 struct cl_page *page = oap2cl_page(oap);
2160                 if (clerq == NULL) {
2161                         clerq = cl_req_alloc(env, page, crt,
2162                                              1 /* only 1-object rpcs for now */);
2163                         if (IS_ERR(clerq))
2164                                 GOTO(out, rc = PTR_ERR(clerq));
2165                         lock = oap->oap_ldlm_lock;
2166                 }
2167                 if (mem_tight)
2168                         oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2169                 pga[i] = &oap->oap_brw_page;
2170                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2171                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2172                        pga[i]->pg, page_index(oap->oap_page), oap,
2173                        pga[i]->flag);
2174                 i++;
2175                 cl_req_page_add(env, clerq, page);
2176         }
2177
2178         /* always get the data for the obdo for the rpc */
2179         LASSERT(clerq != NULL);
2180         crattr->cra_oa = oa;
2181         cl_req_attr_set(env, clerq, crattr, ~0ULL);
2182         if (lock) {
2183                 oa->o_handle = lock->l_remote_handle;
2184                 oa->o_valid |= OBD_MD_FLHANDLE;
2185         }
2186
2187         rc = cl_req_prep(env, clerq);
2188         if (rc != 0) {
2189                 CERROR("cl_req_prep failed: %d\n", rc);
2190                 GOTO(out, rc);
2191         }
2192
2193         sort_brw_pages(pga, page_count);
2194         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2195                         pga, &req, crattr->cra_capa, 1, 0);
2196         if (rc != 0) {
2197                 CERROR("prep_req failed: %d\n", rc);
2198                 GOTO(out, rc);
2199         }
2200
2201         req->rq_commit_cb = brw_commit;
2202         req->rq_interpret_reply = brw_interpret;
2203
2204         if (mem_tight != 0)
2205                 req->rq_memalloc = 1;
2206
2207         /* Need to update the timestamps after the request is built in case
2208          * we race with setattr (locally or in queue at OST).  If OST gets
2209          * later setattr before earlier BRW (as determined by the request xid),
2210          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2211          * way to do this in a single call.  bug 10150 */
2212         cl_req_attr_set(env, clerq, crattr,
2213                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2214
2215         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2216
2217         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2218         aa = ptlrpc_req_async_args(req);
2219         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2220         cfs_list_splice_init(&rpc_list, &aa->aa_oaps);
2221         CFS_INIT_LIST_HEAD(&aa->aa_exts);
2222         cfs_list_splice_init(ext_list, &aa->aa_exts);
2223         aa->aa_clerq = clerq;
2224
2225         /* queued sync pages can be torn down while the pages
2226          * were between the pending list and the rpc */
2227         tmp = NULL;
2228         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2229                 /* only one oap gets a request reference */
2230                 if (tmp == NULL)
2231                         tmp = oap;
2232                 if (oap->oap_interrupted && !req->rq_intr) {
2233                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2234                                         oap, req);
2235                         ptlrpc_mark_interrupted(req);
2236                 }
2237         }
2238         if (tmp != NULL)
2239                 tmp->oap_request = ptlrpc_request_addref(req);
2240
2241         client_obd_list_lock(&cli->cl_loi_list_lock);
2242         starting_offset >>= PAGE_CACHE_SHIFT;
2243         if (cmd == OBD_BRW_READ) {
2244                 cli->cl_r_in_flight++;
2245                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2246                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2247                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2248                                       starting_offset + 1);
2249         } else {
2250                 cli->cl_w_in_flight++;
2251                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2252                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2253                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2254                                       starting_offset + 1);
2255         }
2256         client_obd_list_unlock(&cli->cl_loi_list_lock);
2257
2258         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2259                   page_count, aa, cli->cl_r_in_flight,
2260                   cli->cl_w_in_flight);
2261
2262         /* XXX: Maybe the caller can check the RPC bulk descriptor to
2263          * see which CPU/NUMA node the majority of pages were allocated
2264          * on, and try to assign the async RPC to the CPU core
2265          * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
2266          *
2267          * But on the other hand, we expect that multiple ptlrpcd
2268          * threads and the initial write sponsor can run in parallel,
2269          * especially when data checksum is enabled, which is CPU-bound
2270          * operation and single ptlrpcd thread cannot process in time.
2271          * So more ptlrpcd threads sharing BRW load
2272          * (with PDL_POLICY_ROUND) seems better.
2273          */
2274         ptlrpcd_add_req(req, pol, -1);
2275         rc = 0;
2276         EXIT;
2277
2278 out:
2279         if (mem_tight != 0)
2280                 cfs_memory_pressure_restore(mpflag);
2281
2282         if (crattr != NULL) {
2283                 capa_put(crattr->cra_capa);
2284                 OBD_FREE(crattr, sizeof(*crattr));
2285         }
2286
2287         if (rc != 0) {
2288                 LASSERT(req == NULL);
2289
2290                 if (oa)
2291                         OBDO_FREE(oa);
2292                 if (pga)
2293                         OBD_FREE(pga, sizeof(*pga) * page_count);
2294                 /* this should happen rarely and is pretty bad, it makes the
2295                  * pending list not follow the dirty order */
2296                 while (!cfs_list_empty(ext_list)) {
2297                         ext = cfs_list_entry(ext_list->next, struct osc_extent,
2298                                              oe_link);
2299                         cfs_list_del_init(&ext->oe_link);
2300                         osc_extent_finish(env, ext, 0, rc);
2301                 }
2302                 if (clerq && !IS_ERR(clerq))
2303                         cl_req_completion(env, clerq, rc);
2304         }
2305         RETURN(rc);
2306 }
2307
2308 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2309                                         struct ldlm_enqueue_info *einfo)
2310 {
2311         void *data = einfo->ei_cbdata;
2312         int set = 0;
2313
2314         LASSERT(lock != NULL);
2315         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2316         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2317         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2318         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2319
2320         lock_res_and_lock(lock);
2321         spin_lock(&osc_ast_guard);
2322
2323         if (lock->l_ast_data == NULL)
2324                 lock->l_ast_data = data;
2325         if (lock->l_ast_data == data)
2326                 set = 1;
2327
2328         spin_unlock(&osc_ast_guard);
2329         unlock_res_and_lock(lock);
2330
2331         return set;
2332 }
2333
2334 static int osc_set_data_with_check(struct lustre_handle *lockh,
2335                                    struct ldlm_enqueue_info *einfo)
2336 {
2337         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2338         int set = 0;
2339
2340         if (lock != NULL) {
2341                 set = osc_set_lock_data_with_check(lock, einfo);
2342                 LDLM_LOCK_PUT(lock);
2343         } else
2344                 CERROR("lockh %p, data %p - client evicted?\n",
2345                        lockh, einfo->ei_cbdata);
2346         return set;
2347 }
2348
2349 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2350                              ldlm_iterator_t replace, void *data)
2351 {
2352         struct ldlm_res_id res_id;
2353         struct obd_device *obd = class_exp2obd(exp);
2354
2355         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2356         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2357         return 0;
2358 }
2359
2360 /* find any ldlm lock of the inode in osc
2361  * return 0    not find
2362  *        1    find one
2363  *      < 0    error */
2364 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2365                            ldlm_iterator_t replace, void *data)
2366 {
2367         struct ldlm_res_id res_id;
2368         struct obd_device *obd = class_exp2obd(exp);
2369         int rc = 0;
2370
2371         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2372         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2373         if (rc == LDLM_ITER_STOP)
2374                 return(1);
2375         if (rc == LDLM_ITER_CONTINUE)
2376                 return(0);
2377         return(rc);
2378 }
2379
2380 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2381                             obd_enqueue_update_f upcall, void *cookie,
2382                             __u64 *flags, int agl, int rc)
2383 {
2384         int intent = *flags & LDLM_FL_HAS_INTENT;
2385         ENTRY;
2386
2387         if (intent) {
2388                 /* The request was created before ldlm_cli_enqueue call. */
2389                 if (rc == ELDLM_LOCK_ABORTED) {
2390                         struct ldlm_reply *rep;
2391                         rep = req_capsule_server_get(&req->rq_pill,
2392                                                      &RMF_DLM_REP);
2393
2394                         LASSERT(rep != NULL);
2395                         rep->lock_policy_res1 =
2396                                 ptlrpc_status_ntoh(rep->lock_policy_res1);
2397                         if (rep->lock_policy_res1)
2398                                 rc = rep->lock_policy_res1;
2399                 }
2400         }
2401
2402         if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
2403             (rc == 0)) {
2404                 *flags |= LDLM_FL_LVB_READY;
2405                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2406                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2407         }
2408
2409         /* Call the update callback. */
2410         rc = (*upcall)(cookie, rc);
2411         RETURN(rc);
2412 }
2413
/*
 * Reply interpreter for asynchronous lock enqueues; osc_enqueue_base()
 * installs it as req->rq_interpret_reply.
 *
 * Sequence:
 *  - copy the lock handle and mode out of @aa and look the lock up;
 *  - take one extra reference in that mode for the duration of the call;
 *  - run ldlm_cli_enqueue_fini() (without an LVB for an aborted AGL
 *    enqueue), then osc_enqueue_fini(), which invokes the caller's upcall;
 *  - if the handle is still in use and the upcall returned ELDLM_OK, drop
 *    the reference taken by ldlm_cli_enqueue();
 *  - drop the extra reference and the reference from the handle lookup.
 *
 * Returns the value produced by osc_enqueue_fini(), i.e. the upcall result.
 */
2414 static int osc_enqueue_interpret(const struct lu_env *env,
2415                                  struct ptlrpc_request *req,
2416                                  struct osc_enqueue_args *aa, int rc)
2417 {
2418         struct ldlm_lock *lock;
2419         struct lustre_handle handle;
2420         __u32 mode;
2421         struct ost_lvb *lvb;
2422         __u32 lvb_len;
2423         __u64 *flags = aa->oa_flags;
2424
2425         /* Make a local copy of a lock handle and a mode, because aa->oa_*
2426          * might be freed anytime after lock upcall has been called. */
2427         lustre_handle_copy(&handle, aa->oa_lockh);
2428         mode = aa->oa_ei->ei_mode;
2429
2430         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2431          * be valid. */
2432         lock = ldlm_handle2lock(&handle);
2433
2434         /* Take an additional reference so that a blocking AST that
2435          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2436          * to arrive after an upcall has been executed by
2437          * osc_enqueue_fini(). */
2438         ldlm_lock_addref(&handle, mode);
2439
2440         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2441         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2442
2443         /* Let CP AST to grant the lock first. */
2444         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2445
        /* An aborted AGL enqueue is finished without passing an LVB buffer. */
2446         if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
2447                 lvb = NULL;
2448                 lvb_len = 0;
2449         } else {
2450                 lvb = aa->oa_lvb;
2451                 lvb_len = sizeof(*aa->oa_lvb);
2452         }
2453
2454         /* Complete obtaining the lock procedure. */
2455         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2456                                    mode, flags, lvb, lvb_len, &handle, rc);
2457         /* Complete osc stuff. */
2458         rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
2459                               flags, aa->oa_agl, rc);
2460
2461         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2462
2463         /* Release the lock for async request. */
2464         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
2465                 /*
2466                  * Releases a reference taken by ldlm_cli_enqueue(), if it is
2467                  * not already released by
2468                  * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
2469                  */
2470                 ldlm_lock_decref(&handle, mode);
2471
        /* NOTE(review): the lookup result is only asserted here, after the
         * handle has already been used above - confirm this ordering is
         * intentional. */
2472         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2473                  aa->oa_lockh, req, aa);
        /* Drop the additional reference taken above, then the reference
         * obtained from ldlm_handle2lock(). */
2474         ldlm_lock_decref(&handle, mode);
2475         LDLM_LOCK_PUT(lock);
2476         return rc;
2477 }
2478
2479 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
2480                         struct lov_oinfo *loi, __u64 flags,
2481                         struct ost_lvb *lvb, __u32 mode, int rc)
2482 {
2483         struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
2484
2485         if (rc == ELDLM_OK) {
2486                 __u64 tmp;
2487
2488                 LASSERT(lock != NULL);
2489                 loi->loi_lvb = *lvb;
2490                 tmp = loi->loi_lvb.lvb_size;
2491                 /* Extend KMS up to the end of this lock and no further
2492                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
2493                 if (tmp > lock->l_policy_data.l_extent.end)
2494                         tmp = lock->l_policy_data.l_extent.end + 1;
2495                 if (tmp >= loi->loi_kms) {
2496                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
2497                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
2498                         loi_kms_set(loi, tmp);
2499                 } else {
2500                         LDLM_DEBUG(lock, "lock acquired, setting rss="
2501                                    LPU64"; leaving kms="LPU64", end="LPU64,
2502                                    loi->loi_lvb.lvb_size, loi->loi_kms,
2503                                    lock->l_policy_data.l_extent.end);
2504                 }
2505                 ldlm_lock_allow_match(lock);
2506         } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
2507                 LASSERT(lock != NULL);
2508                 loi->loi_lvb = *lvb;
2509                 ldlm_lock_allow_match(lock);
2510                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
2511                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
2512                 rc = ELDLM_OK;
2513         }
2514
2515         if (lock != NULL) {
2516                 if (rc != ELDLM_OK)
2517                         ldlm_lock_fail_match(lock);
2518
2519                 LDLM_LOCK_PUT(lock);
2520         }
2521 }
2522 EXPORT_SYMBOL(osc_update_enqueue);
2523
/* Sentinel request-set pointer.  osc_enqueue_base() compares its rqset
 * argument against this value and, when they are equal, queues the request
 * with ptlrpcd_add_req() instead of ptlrpc_set_add_req(). */
2524 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2525
/*
 * Obtain a DLM lock on @res_id, reusing an already cached lock when possible.
 *
 * Flow, as implemented below:
 *  - the extent in @policy is widened to page boundaries (in place);
 *  - when @kms_valid is set, ldlm_lock_match() is tried first; a PR request
 *    may also match a PW lock.  On a usable match @upcall is invoked with
 *    ELDLM_OK and ELDLM_OK is returned without calling ldlm_cli_enqueue().
 *    For AGL (@agl != 0) a matched lock whose LVB is not ready yields
 *    -ECANCELED;
 *  - otherwise ldlm_cli_enqueue() is called (for intent enqueues the request
 *    is allocated and packed here first).  With a non-NULL @rqset a
 *    successfully prepared request is queued (ptlrpcd when
 *    @rqset == PTLRPCD_SET, else @rqset) and osc_enqueue_interpret() finishes
 *    it later; with a NULL @rqset osc_enqueue_fini() is run inline.
 *
 * On the async path @einfo, @flags, @lvb and @lockh are stored by pointer in
 * the request's async args and dereferenced by osc_enqueue_interpret().
 *
 * Returns ELDLM_OK on a cached match, -ECANCELED as described above, -ENOMEM
 * or the ptlrpc_request_pack() error if the intent request cannot be set up,
 * the ldlm_cli_enqueue() result on the async path, or the upcall result (via
 * osc_enqueue_fini()) on the synchronous path.
 */
2526 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2527  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2528  * other synchronous requests, however keeping some locks and trying to obtain
2529  * others may take a considerable amount of time in a case of ost failure; and
2530  * when other sync requests do not get released lock from a client, the client
2531  * is excluded from the cluster -- such scenarios make the life difficult, so
2532  * release locks just after they are obtained. */
2533 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2534                      __u64 *flags, ldlm_policy_data_t *policy,
2535                      struct ost_lvb *lvb, int kms_valid,
2536                      obd_enqueue_update_f upcall, void *cookie,
2537                      struct ldlm_enqueue_info *einfo,
2538                      struct lustre_handle *lockh,
2539                      struct ptlrpc_request_set *rqset, int async, int agl)
2540 {
2541         struct obd_device *obd = exp->exp_obd;
2542         struct ptlrpc_request *req = NULL;
        /* NOTE(review): *flags is __u64 but the result is stored in an int;
         * this is only correct if LDLM_FL_HAS_INTENT fits in an int -
         * confirm against the flag definition. */
2543         int intent = *flags & LDLM_FL_HAS_INTENT;
        /* AGL may match a lock whose LVB is not ready yet; everyone else
         * requires LDLM_FL_LVB_READY for a match. */
2544         __u64 match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
2545         ldlm_mode_t mode;
2546         int rc;
2547         ENTRY;
2548
2549         /* Filesystem lock extents are extended to page boundaries so that
2550          * dealing with the page cache is a little smoother.  */
2551         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2552         policy->l_extent.end |= ~CFS_PAGE_MASK;
2553
2554         /*
2555          * kms is not valid when either object is completely fresh (so that no
2556          * locks are cached), or object was evicted. In the latter case cached
2557          * lock cannot be used, because it would prime inode state with
2558          * potentially stale LVB.
2559          */
2560         if (!kms_valid)
2561                 goto no_match;
2562
2563         /* Next, search for already existing extent locks that will cover us */
2564         /* If we're trying to read, we also search for an existing PW lock.  The
2565          * VFS and page cache already protect us locally, so lots of readers/
2566          * writers can share a single PW lock.
2567          *
2568          * There are problems with conversion deadlocks, so instead of
2569          * converting a read lock to a write lock, we'll just enqueue a new
2570          * one.
2571          *
2572          * At some point we should cancel the read lock instead of making them
2573          * send us a blocking callback, but there are problems with canceling
2574          * locks out from other users right now, too. */
2575         mode = einfo->ei_mode;
2576         if (einfo->ei_mode == LCK_PR)
2577                 mode |= LCK_PW;
2578         mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2579                                einfo->ei_type, policy, mode, lockh, 0);
2580         if (mode) {
2581                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
2582
2583                 if ((agl != 0) && !ldlm_is_lvb_ready(matched)) {
2584                         /* For AGL, if enqueue RPC is sent but the lock is not
2585                          * granted, then skip to process this stripe.
2586                          * Return -ECANCELED to tell the caller. */
2587                         ldlm_lock_decref(lockh, mode);
2588                         LDLM_LOCK_PUT(matched);
2589                         RETURN(-ECANCELED);
2590                 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2591                         *flags |= LDLM_FL_LVB_READY;
2592                         /* addref the lock only if not async requests and PW
2593                          * lock is matched whereas we asked for PR. */
2594                         if (!rqset && einfo->ei_mode != mode)
2595                                 ldlm_lock_addref(lockh, LCK_PR);
2596                         if (intent) {
2597                                 /* I would like to be able to ASSERT here that
2598                                  * rss <= kms, but I can't, for reasons which
2599                                  * are explained in lov_enqueue() */
2600                         }
2601
2602                         /* We already have a lock, and it's referenced.
2603                          *
2604                          * At this point, the cl_lock::cll_state is CLS_QUEUING,
2605                          * AGL upcall may change it to CLS_HELD directly. */
2606                         (*upcall)(cookie, ELDLM_OK);
2607
2608                         if (einfo->ei_mode != mode)
2609                                 ldlm_lock_decref(lockh, LCK_PW);
2610                         else if (rqset)
2611                                 /* For async requests, decref the lock. */
2612                                 ldlm_lock_decref(lockh, einfo->ei_mode);
2613                         LDLM_LOCK_PUT(matched);
2614                         RETURN(ELDLM_OK);
2615                 } else {
                        /* The matched lock already carries different AST
                         * data: release it and enqueue a new lock. */
2616                         ldlm_lock_decref(lockh, mode);
2617                         LDLM_LOCK_PUT(matched);
2618                 }
2619         }
2620
2621  no_match:
2622         if (intent) {
2623                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2624                                            &RQF_LDLM_ENQUEUE_LVB);
2625                 if (req == NULL)
2626                         RETURN(-ENOMEM);
2627
2628                 rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
2629                 if (rc < 0) {
2630                         ptlrpc_request_free(req);
2631                         RETURN(rc);
2632                 }
2633
2634                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2635                                      sizeof *lvb);
2636                 ptlrpc_request_set_replen(req);
2637         }
2638
2639         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2640         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2641
2642         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2643                               sizeof(*lvb), LVB_T_OST, lockh, async);
2644         if (rqset) {
2645                 if (!rc) {
2646                         struct osc_enqueue_args *aa;
2647                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2648                         aa = ptlrpc_req_async_args(req);
2649                         aa->oa_ei = einfo;
2650                         aa->oa_exp = exp;
2651                         aa->oa_flags  = flags;
2652                         aa->oa_upcall = upcall;
2653                         aa->oa_cookie = cookie;
2654                         aa->oa_lvb    = lvb;
2655                         aa->oa_lockh  = lockh;
2656                         aa->oa_agl    = !!agl;
2657
2658                         req->rq_interpret_reply =
2659                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2660                         if (rqset == PTLRPCD_SET)
2661                                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2662                         else
2663                                 ptlrpc_set_add_req(rqset, req);
2664                 } else if (intent) {
                        /* Enqueue failed: release the intent request that
                         * was allocated above. */
2665                         ptlrpc_req_finished(req);
2666                 }
2667                 RETURN(rc);
2668         }
2669
        /* No request set: finish the enqueue inline. */
2670         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
2671         if (intent)
2672                 ptlrpc_req_finished(req);
2673
2674         RETURN(rc);
2675 }
2676
2677 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2678                        struct ldlm_enqueue_info *einfo,
2679                        struct ptlrpc_request_set *rqset)
2680 {
2681         struct ldlm_res_id res_id;
2682         int rc;
2683         ENTRY;
2684
2685         ostid_build_res_name(&oinfo->oi_md->lsm_oi, &res_id);
2686         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
2687                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2688                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
2689                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
2690                               rqset, rqset != NULL, 0);
2691         RETURN(rc);
2692 }
2693
2694 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2695                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2696                    __u64 *flags, void *data, struct lustre_handle *lockh,
2697                    int unref)
2698 {
2699         struct obd_device *obd = exp->exp_obd;
2700         __u64 lflags = *flags;
2701         ldlm_mode_t rc;
2702         ENTRY;
2703
2704         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2705                 RETURN(-EIO);
2706
2707         /* Filesystem lock extents are extended to page boundaries so that
2708          * dealing with the page cache is a little smoother */
2709         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2710         policy->l_extent.end |= ~CFS_PAGE_MASK;
2711
2712         /* Next, search for already existing extent locks that will cover us */
2713         /* If we're trying to read, we also search for an existing PW lock.  The
2714          * VFS and page cache already protect us locally, so lots of readers/
2715          * writers can share a single PW lock. */
2716         rc = mode;
2717         if (mode == LCK_PR)
2718                 rc |= LCK_PW;
2719         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2720                              res_id, type, policy, rc, lockh, unref);
2721         if (rc) {
2722                 if (data != NULL) {
2723                         if (!osc_set_data_with_check(lockh, data)) {
2724                                 if (!(lflags & LDLM_FL_TEST_LOCK))
2725                                         ldlm_lock_decref(lockh, rc);
2726                                 RETURN(0);
2727                         }
2728                 }
2729                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2730                         ldlm_lock_addref(lockh, LCK_PR);
2731                         ldlm_lock_decref(lockh, LCK_PW);
2732                 }
2733                 RETURN(rc);
2734         }
2735         RETURN(rc);
2736 }
2737
2738 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2739 {
2740         ENTRY;
2741
2742         if (unlikely(mode == LCK_GROUP))
2743                 ldlm_lock_decref_and_cancel(lockh, mode);
2744         else
2745                 ldlm_lock_decref(lockh, mode);
2746
2747         RETURN(0);
2748 }
2749
2750 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2751                       __u32 mode, struct lustre_handle *lockh)
2752 {
2753         ENTRY;
2754         RETURN(osc_cancel_base(lockh, mode));
2755 }
2756
2757 static int osc_cancel_unused(struct obd_export *exp,
2758                              struct lov_stripe_md *lsm,
2759                              ldlm_cancel_flags_t flags,
2760                              void *opaque)
2761 {
2762         struct obd_device *obd = class_exp2obd(exp);
2763         struct ldlm_res_id res_id, *resp = NULL;
2764
2765         if (lsm != NULL) {
2766                 ostid_build_res_name(&lsm->lsm_oi, &res_id);
2767                 resp = &res_id;
2768         }
2769
2770         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
2771 }
2772
2773 static int osc_statfs_interpret(const struct lu_env *env,
2774                                 struct ptlrpc_request *req,
2775                                 struct osc_async_args *aa, int rc)
2776 {
2777         struct obd_statfs *msfs;
2778         ENTRY;
2779
2780         if (rc == -EBADR)
2781                 /* The request has in fact never been sent
2782                  * due to issues at a higher level (LOV).
2783                  * Exit immediately since the caller is
2784                  * aware of the problem and takes care
2785                  * of the clean up */
2786                  RETURN(rc);
2787
2788         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2789             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2790                 GOTO(out, rc = 0);
2791
2792         if (rc != 0)
2793                 GOTO(out, rc);
2794
2795         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2796         if (msfs == NULL) {
2797                 GOTO(out, rc = -EPROTO);
2798         }
2799
2800         *aa->aa_oi->oi_osfs = *msfs;
2801 out:
2802         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2803         RETURN(rc);
2804 }
2805
2806 static int osc_statfs_async(struct obd_export *exp,
2807                             struct obd_info *oinfo, __u64 max_age,
2808                             struct ptlrpc_request_set *rqset)
2809 {
2810         struct obd_device     *obd = class_exp2obd(exp);
2811         struct ptlrpc_request *req;
2812         struct osc_async_args *aa;
2813         int                    rc;
2814         ENTRY;
2815
2816         /* We could possibly pass max_age in the request (as an absolute
2817          * timestamp or a "seconds.usec ago") so the target can avoid doing
2818          * extra calls into the filesystem if that isn't necessary (e.g.
2819          * during mount that would help a bit).  Having relative timestamps
2820          * is not so great if request processing is slow, while absolute
2821          * timestamps are not ideal because they need time synchronization. */
2822         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2823         if (req == NULL)
2824                 RETURN(-ENOMEM);
2825
2826         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2827         if (rc) {
2828                 ptlrpc_request_free(req);
2829                 RETURN(rc);
2830         }
2831         ptlrpc_request_set_replen(req);
2832         req->rq_request_portal = OST_CREATE_PORTAL;
2833         ptlrpc_at_set_req_timeout(req);
2834
2835         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2836                 /* procfs requests not want stat in wait for avoid deadlock */
2837                 req->rq_no_resend = 1;
2838                 req->rq_no_delay = 1;
2839         }
2840
2841         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2842         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2843         aa = ptlrpc_req_async_args(req);
2844         aa->aa_oi = oinfo;
2845
2846         ptlrpc_set_add_req(rqset, req);
2847         RETURN(0);
2848 }
2849
2850 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2851                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2852 {
2853         struct obd_device     *obd = class_exp2obd(exp);
2854         struct obd_statfs     *msfs;
2855         struct ptlrpc_request *req;
2856         struct obd_import     *imp = NULL;
2857         int rc;
2858         ENTRY;
2859
2860         /*Since the request might also come from lprocfs, so we need
2861          *sync this with client_disconnect_export Bug15684*/
2862         down_read(&obd->u.cli.cl_sem);
2863         if (obd->u.cli.cl_import)
2864                 imp = class_import_get(obd->u.cli.cl_import);
2865         up_read(&obd->u.cli.cl_sem);
2866         if (!imp)
2867                 RETURN(-ENODEV);
2868
2869         /* We could possibly pass max_age in the request (as an absolute
2870          * timestamp or a "seconds.usec ago") so the target can avoid doing
2871          * extra calls into the filesystem if that isn't necessary (e.g.
2872          * during mount that would help a bit).  Having relative timestamps
2873          * is not so great if request processing is slow, while absolute
2874          * timestamps are not ideal because they need time synchronization. */
2875         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2876
2877         class_import_put(imp);
2878
2879         if (req == NULL)
2880                 RETURN(-ENOMEM);
2881
2882         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2883         if (rc) {
2884                 ptlrpc_request_free(req);
2885                 RETURN(rc);
2886         }
2887         ptlrpc_request_set_replen(req);
2888         req->rq_request_portal = OST_CREATE_PORTAL;
2889         ptlrpc_at_set_req_timeout(req);
2890
2891         if (flags & OBD_STATFS_NODELAY) {
2892                 /* procfs requests not want stat in wait for avoid deadlock */
2893                 req->rq_no_resend = 1;
2894                 req->rq_no_delay = 1;
2895         }
2896
2897         rc = ptlrpc_queue_wait(req);
2898         if (rc)
2899                 GOTO(out, rc);
2900
2901         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2902         if (msfs == NULL) {
2903                 GOTO(out, rc = -EPROTO);
2904         }
2905
2906         *osfs = *msfs;
2907
2908         EXIT;
2909  out:
2910         ptlrpc_req_finished(req);
2911         return rc;
2912 }
2913
2914 /* Retrieve object striping information.
2915  *
2916  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
2917  * the maximum number of OST indices which will fit in the user buffer.
2918  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
2919  */
2920 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
2921 {
2922         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
2923         struct lov_user_md_v3 lum, *lumk;
2924         struct lov_user_ost_data_v1 *lmm_objects;
2925         int rc = 0, lum_size;
2926         ENTRY;
2927
2928         if (!lsm)
2929                 RETURN(-ENODATA);
2930
2931         /* we only need the header part from user space to get lmm_magic and
2932          * lmm_stripe_count, (the header part is common to v1 and v3) */
2933         lum_size = sizeof(struct lov_user_md_v1);
2934         if (copy_from_user(&lum, lump, lum_size))
2935                 RETURN(-EFAULT);
2936
2937         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
2938             (lum.lmm_magic != LOV_USER_MAGIC_V3))
2939                 RETURN(-EINVAL);
2940
2941         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
2942         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
2943         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
2944         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
2945
2946         /* we can use lov_mds_md_size() to compute lum_size
2947          * because lov_user_md_vX and lov_mds_md_vX have the same size */
2948         if (lum.lmm_stripe_count > 0) {
2949                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
2950                 OBD_ALLOC(lumk, lum_size);
2951                 if (!lumk)
2952                         RETURN(-ENOMEM);
2953
2954                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
2955                         lmm_objects =
2956                             &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
2957                 else
2958                         lmm_objects = &(lumk->lmm_objects[0]);
2959                 lmm_objects->l_ost_oi = lsm->lsm_oi;
2960         } else {
2961                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
2962                 lumk = &lum;
2963         }
2964
2965         lumk->lmm_oi = lsm->lsm_oi;
2966         lumk->lmm_stripe_count = 1;
2967
2968         if (copy_to_user(lump, lumk, lum_size))
2969                 rc = -EFAULT;
2970
2971         if (lumk != &lum)
2972                 OBD_FREE(lumk, lum_size);
2973
2974         RETURN(rc);
2975 }
2976
2977
2978 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2979                          void *karg, void *uarg)
2980 {
2981         struct obd_device *obd = exp->exp_obd;
2982         struct obd_ioctl_data *data = karg;
2983         int err = 0;
2984         ENTRY;
2985
2986         if (!try_module_get(THIS_MODULE)) {
2987                 CERROR("Can't get module. Is it alive?");
2988                 return -EINVAL;
2989         }
2990         switch (cmd) {
2991         case OBD_IOC_LOV_GET_CONFIG: {
2992                 char *buf;
2993                 struct lov_desc *desc;
2994                 struct obd_uuid uuid;
2995
2996                 buf = NULL;
2997                 len = 0;
2998                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
2999                         GOTO(out, err = -EINVAL);
3000
3001                 data = (struct obd_ioctl_data *)buf;
3002
3003                 if (sizeof(*desc) > data->ioc_inllen1) {
3004                         obd_ioctl_freedata(buf, len);
3005                         GOTO(out, err = -EINVAL);
3006                 }
3007
3008                 if (data->ioc_inllen2 < sizeof(uuid)) {
3009                         obd_ioctl_freedata(buf, len);
3010                         GOTO(out, err = -EINVAL);
3011                 }
3012
3013                 desc = (struct lov_desc *)data->ioc_inlbuf1;
3014                 desc->ld_tgt_count = 1;
3015                 desc->ld_active_tgt_count = 1;
3016                 desc->ld_default_stripe_count = 1;
3017                 desc->ld_default_stripe_size = 0;
3018                 desc->ld_default_stripe_offset = 0;
3019                 desc->ld_pattern = 0;
3020                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3021
3022                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3023
3024                 err = copy_to_user((void *)uarg, buf, len);
3025                 if (err)
3026                         err = -EFAULT;
3027                 obd_ioctl_freedata(buf, len);
3028                 GOTO(out, err);
3029         }
3030         case LL_IOC_LOV_SETSTRIPE:
3031                 err = obd_alloc_memmd(exp, karg);
3032                 if (err > 0)
3033                         err = 0;
3034                 GOTO(out, err);
3035         case LL_IOC_LOV_GETSTRIPE:
3036                 err = osc_getstripe(karg, uarg);
3037                 GOTO(out, err);
3038         case OBD_IOC_CLIENT_RECOVER:
3039                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3040                                             data->ioc_inlbuf1, 0);
3041                 if (err > 0)
3042                         err = 0;
3043                 GOTO(out, err);
3044         case IOC_OSC_SET_ACTIVE:
3045                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3046                                                data->ioc_offset);
3047                 GOTO(out, err);
3048         case OBD_IOC_POLL_QUOTACHECK:
3049                 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
3050                 GOTO(out, err);
3051         case OBD_IOC_PING_TARGET:
3052                 err = ptlrpc_obd_ping(obd);
3053                 GOTO(out, err);
3054         default:
3055                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3056                        cmd, current_comm());
3057                 GOTO(out, err = -ENOTTY);
3058         }
3059 out:
3060         module_put(THIS_MODULE);
3061         return err;
3062 }
3063
3064 static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
3065                         obd_count keylen, void *key, __u32 *vallen, void *val,
3066                         struct lov_stripe_md *lsm)
3067 {
3068         ENTRY;
3069         if (!vallen || !val)
3070                 RETURN(-EFAULT);
3071
3072         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3073                 __u32 *stripe = val;
3074                 *vallen = sizeof(*stripe);
3075                 *stripe = 0;
3076                 RETURN(0);
3077         } else if (KEY_IS(KEY_LAST_ID)) {
3078                 struct ptlrpc_request *req;
3079                 obd_id                *reply;
3080                 char                  *tmp;
3081                 int                    rc;
3082
3083                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3084                                            &RQF_OST_GET_INFO_LAST_ID);
3085                 if (req == NULL)
3086                         RETURN(-ENOMEM);
3087
3088                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3089                                      RCL_CLIENT, keylen);
3090                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3091                 if (rc) {
3092                         ptlrpc_request_free(req);
3093                         RETURN(rc);
3094                 }
3095
3096                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3097                 memcpy(tmp, key, keylen);
3098
3099                 req->rq_no_delay = req->rq_no_resend = 1;
3100                 ptlrpc_request_set_replen(req);
3101                 rc = ptlrpc_queue_wait(req);
3102                 if (rc)
3103                         GOTO(out, rc);
3104
3105                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3106                 if (reply == NULL)
3107                         GOTO(out, rc = -EPROTO);
3108
3109                 *((obd_id *)val) = *reply;
3110         out:
3111                 ptlrpc_req_finished(req);
3112                 RETURN(rc);
3113         } else if (KEY_IS(KEY_FIEMAP)) {
3114                 struct ll_fiemap_info_key *fm_key =
3115                                 (struct ll_fiemap_info_key *)key;
3116                 struct ldlm_res_id       res_id;
3117                 ldlm_policy_data_t       policy;
3118                 struct lustre_handle     lockh;
3119                 ldlm_mode_t              mode = 0;
3120                 struct ptlrpc_request   *req;
3121                 struct ll_user_fiemap   *reply;
3122                 char                    *tmp;
3123                 int                      rc;
3124
3125                 if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC))
3126                         goto skip_locking;
3127
3128                 policy.l_extent.start = fm_key->fiemap.fm_start &
3129                                                 CFS_PAGE_MASK;
3130
3131                 if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
3132                     fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1)
3133                         policy.l_extent.end = OBD_OBJECT_EOF;
3134                 else
3135                         policy.l_extent.end = (fm_key->fiemap.fm_start +
3136                                 fm_key->fiemap.fm_length +
3137                                 PAGE_CACHE_SIZE - 1) & CFS_PAGE_MASK;
3138
3139                 ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
3140                 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
3141                                        LDLM_FL_BLOCK_GRANTED |
3142                                        LDLM_FL_LVB_READY,
3143                                        &res_id, LDLM_EXTENT, &policy,
3144                                        LCK_PR | LCK_PW, &lockh, 0);
3145                 if (mode) { /* lock is cached on client */
3146                         if (mode != LCK_PR) {
3147                                 ldlm_lock_addref(&lockh, LCK_PR);
3148                                 ldlm_lock_decref(&lockh, LCK_PW);
3149                         }
3150                 } else { /* no cached lock, needs acquire lock on server side */
3151                         fm_key->oa.o_valid |= OBD_MD_FLFLAGS;
3152                         fm_key->oa.o_flags |= OBD_FL_SRVLOCK;
3153                 }
3154
3155 skip_locking:
3156                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3157                                            &RQF_OST_GET_INFO_FIEMAP);
3158                 if (req == NULL)
3159                         GOTO(drop_lock, rc = -ENOMEM);
3160
3161                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3162                                      RCL_CLIENT, keylen);
3163                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3164                                      RCL_CLIENT, *vallen);
3165                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3166                                      RCL_SERVER, *vallen);
3167
3168                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3169                 if (rc) {
3170                         ptlrpc_request_free(req);
3171                         GOTO(drop_lock, rc);
3172                 }
3173
3174                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3175                 memcpy(tmp, key, keylen);
3176                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3177                 memcpy(tmp, val, *vallen);
3178
3179                 ptlrpc_request_set_replen(req);
3180                 rc = ptlrpc_queue_wait(req);
3181                 if (rc)
3182                         GOTO(fini_req, rc);
3183
3184                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3185                 if (reply == NULL)
3186                         GOTO(fini_req, rc = -EPROTO);
3187
3188                 memcpy(val, reply, *vallen);
3189 fini_req:
3190                 ptlrpc_req_finished(req);
3191 drop_lock:
3192                 if (mode)
3193                         ldlm_lock_decref(&lockh, LCK_PR);
3194                 RETURN(rc);
3195         }
3196
3197         RETURN(-EINVAL);
3198 }
3199
3200 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
3201                               obd_count keylen, void *key, obd_count vallen,
3202                               void *val, struct ptlrpc_request_set *set)
3203 {
3204         struct ptlrpc_request *req;
3205         struct obd_device     *obd = exp->exp_obd;
3206         struct obd_import     *imp = class_exp2cliimp(exp);
3207         char                  *tmp;
3208         int                    rc;
3209         ENTRY;
3210
3211         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3212
3213         if (KEY_IS(KEY_CHECKSUM)) {
3214                 if (vallen != sizeof(int))
3215                         RETURN(-EINVAL);
3216                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3217                 RETURN(0);
3218         }
3219
3220         if (KEY_IS(KEY_SPTLRPC_CONF)) {
3221                 sptlrpc_conf_client_adapt(obd);
3222                 RETURN(0);
3223         }
3224
3225         if (KEY_IS(KEY_FLUSH_CTX)) {
3226                 sptlrpc_import_flush_my_ctx(imp);
3227                 RETURN(0);
3228         }
3229
3230         if (KEY_IS(KEY_CACHE_SET)) {
3231                 struct client_obd *cli = &obd->u.cli;
3232
3233                 LASSERT(cli->cl_cache == NULL); /* only once */
3234                 cli->cl_cache = (struct cl_client_cache *)val;
3235                 atomic_inc(&cli->cl_cache->ccc_users);
3236                 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
3237
3238                 /* add this osc into entity list */
3239                 LASSERT(cfs_list_empty(&cli->cl_lru_osc));
3240                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3241                 cfs_list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
3242                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3243
3244                 RETURN(0);
3245         }
3246
3247         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
3248                 struct client_obd *cli = &obd->u.cli;
3249                 int nr = atomic_read(&cli->cl_lru_in_list) >> 1;
3250                 int target = *(int *)val;
3251
3252                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
3253                 *(int *)val -= nr;
3254                 RETURN(0);
3255         }
3256
3257         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3258                 RETURN(-EINVAL);
3259
3260         /* We pass all other commands directly to OST. Since nobody calls osc
3261            methods directly and everybody is supposed to go through LOV, we
3262            assume lov checked invalid values for us.
3263            The only recognised values so far are evict_by_nid and mds_conn.
3264            Even if something bad goes through, we'd get a -EINVAL from OST
3265            anyway. */
3266
3267         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
3268                                                 &RQF_OST_SET_GRANT_INFO :
3269                                                 &RQF_OBD_SET_INFO);
3270         if (req == NULL)
3271                 RETURN(-ENOMEM);
3272
3273         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3274                              RCL_CLIENT, keylen);
3275         if (!KEY_IS(KEY_GRANT_SHRINK))
3276                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3277                                      RCL_CLIENT, vallen);
3278         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3279         if (rc) {
3280                 ptlrpc_request_free(req);
3281                 RETURN(rc);
3282         }
3283
3284         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3285         memcpy(tmp, key, keylen);
3286         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
3287                                                         &RMF_OST_BODY :
3288                                                         &RMF_SETINFO_VAL);
3289         memcpy(tmp, val, vallen);
3290
3291         if (KEY_IS(KEY_GRANT_SHRINK)) {
3292                 struct osc_grant_args *aa;
3293                 struct obdo *oa;
3294
3295                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3296                 aa = ptlrpc_req_async_args(req);
3297                 OBDO_ALLOC(oa);
3298                 if (!oa) {
3299                         ptlrpc_req_finished(req);
3300                         RETURN(-ENOMEM);
3301                 }
3302                 *oa = ((struct ost_body *)val)->oa;
3303                 aa->aa_oa = oa;
3304                 req->rq_interpret_reply = osc_shrink_grant_interpret;
3305         }
3306
3307         ptlrpc_request_set_replen(req);
3308         if (!KEY_IS(KEY_GRANT_SHRINK)) {
3309                 LASSERT(set != NULL);
3310                 ptlrpc_set_add_req(set, req);
3311                 ptlrpc_check_set(NULL, set);
3312         } else
3313                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
3314
3315         RETURN(0);
3316 }
3317
3318
/**
 * llog initialisation hook of the OSC.
 *
 * This path is not supposed to be used together with LOD/OSP and is due
 * for removal, so reaching it trips LBUG().  None of the arguments are
 * examined.
 *
 * \retval 0 nominal return value following the LBUG()
 */
static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
                         struct obd_device *disk_obd, int *index)
{
        LBUG();

        return 0;
}
3327
3328 static int osc_llog_finish(struct obd_device *obd, int count)
3329 {
3330         struct llog_ctxt *ctxt;
3331
3332         ENTRY;
3333
3334         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3335         if (ctxt) {
3336                 llog_cat_close(NULL, ctxt->loc_handle);
3337                 llog_cleanup(NULL, ctxt);
3338         }
3339
3340         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3341         if (ctxt)
3342                 llog_cleanup(NULL, ctxt);
3343         RETURN(0);
3344 }
3345
3346 static int osc_reconnect(const struct lu_env *env,
3347                          struct obd_export *exp, struct obd_device *obd,
3348                          struct obd_uuid *cluuid,
3349                          struct obd_connect_data *data,
3350                          void *localdata)
3351 {
3352         struct client_obd *cli = &obd->u.cli;
3353
3354         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3355                 long lost_grant;
3356
3357                 client_obd_list_lock(&cli->cl_loi_list_lock);
3358                 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
3359                                 2 * cli_brw_size(obd);
3360                 lost_grant = cli->cl_lost_grant;
3361                 cli->cl_lost_grant = 0;
3362                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3363
3364                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3365                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3366                        data->ocd_version, data->ocd_grant, lost_grant);
3367         }
3368
3369         RETURN(0);
3370 }
3371
3372 static int osc_disconnect(struct obd_export *exp)
3373 {
3374         struct obd_device *obd = class_exp2obd(exp);
3375         struct llog_ctxt  *ctxt;
3376         int rc;
3377
3378         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3379         if (ctxt) {
3380                 if (obd->u.cli.cl_conn_count == 1) {
3381                         /* Flush any remaining cancel messages out to the
3382                          * target */
3383                         llog_sync(ctxt, exp, 0);
3384                 }
3385                 llog_ctxt_put(ctxt);
3386         } else {
3387                 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
3388                        obd);
3389         }
3390
3391         rc = client_disconnect_export(exp);
3392         /**
3393          * Initially we put del_shrink_grant before disconnect_export, but it
3394          * causes the following problem if setup (connect) and cleanup
3395          * (disconnect) are tangled together.
3396          *      connect p1                     disconnect p2
3397          *   ptlrpc_connect_import
3398          *     ...............               class_manual_cleanup
3399          *                                     osc_disconnect
3400          *                                     del_shrink_grant
3401          *   ptlrpc_connect_interrupt
3402          *     init_grant_shrink
3403          *   add this client to shrink list
3404          *                                      cleanup_osc
3405          * Bang! pinger trigger the shrink.
3406          * So the osc should be disconnected from the shrink list, after we
3407          * are sure the import has been destroyed. BUG18662
3408          */
3409         if (obd->u.cli.cl_import == NULL)
3410                 osc_del_shrink_grant(&obd->u.cli);
3411         return rc;
3412 }
3413
3414 static int osc_import_event(struct obd_device *obd,
3415                             struct obd_import *imp,
3416                             enum obd_import_event event)
3417 {
3418         struct client_obd *cli;
3419         int rc = 0;
3420
3421         ENTRY;
3422         LASSERT(imp->imp_obd == obd);
3423
3424         switch (event) {
3425         case IMP_EVENT_DISCON: {
3426                 cli = &obd->u.cli;
3427                 client_obd_list_lock(&cli->cl_loi_list_lock);
3428                 cli->cl_avail_grant = 0;
3429                 cli->cl_lost_grant = 0;
3430                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3431                 break;
3432         }
3433         case IMP_EVENT_INACTIVE: {
3434                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3435                 break;
3436         }
3437         case IMP_EVENT_INVALIDATE: {
3438                 struct ldlm_namespace *ns = obd->obd_namespace;
3439                 struct lu_env         *env;
3440                 int                    refcheck;
3441
3442                 env = cl_env_get(&refcheck);
3443                 if (!IS_ERR(env)) {
3444                         /* Reset grants */
3445                         cli = &obd->u.cli;
3446                         /* all pages go to failing rpcs due to the invalid
3447                          * import */
3448                         osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
3449
3450                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3451                         cl_env_put(env, &refcheck);
3452                 } else
3453                         rc = PTR_ERR(env);
3454                 break;
3455         }
3456         case IMP_EVENT_ACTIVE: {
3457                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3458                 break;
3459         }
3460         case IMP_EVENT_OCD: {
3461                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3462
3463                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3464                         osc_init_grant(&obd->u.cli, ocd);
3465
3466                 /* See bug 7198 */
3467                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3468                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3469
3470                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3471                 break;
3472         }
3473         case IMP_EVENT_DEACTIVATE: {
3474                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
3475                 break;
3476         }
3477         case IMP_EVENT_ACTIVATE: {
3478                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
3479                 break;
3480         }
3481         default:
3482                 CERROR("Unknown import event %d\n", event);
3483                 LBUG();
3484         }
3485         RETURN(rc);
3486 }
3487
3488 /**
3489  * Determine whether the lock can be canceled before replaying the lock
3490  * during recovery, see bug16774 for detailed information.
3491  *
3492  * \retval zero the lock can't be canceled
3493  * \retval other ok to cancel
3494  */
3495 static int osc_cancel_weight(struct ldlm_lock *lock)
3496 {
3497         /*
3498          * Cancel all unused and granted extent lock.
3499          */
3500         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3501             lock->l_granted_mode == lock->l_req_mode &&
3502             osc_ldlm_weigh_ast(lock) == 0)
3503                 RETURN(1);
3504
3505         RETURN(0);
3506 }
3507
3508 static int brw_queue_work(const struct lu_env *env, void *data)
3509 {
3510         struct client_obd *cli = data;
3511
3512         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3513
3514         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
3515         RETURN(0);
3516 }
3517
3518 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3519 {
3520         struct client_obd          *cli = &obd->u.cli;
3521         void                       *handler;
3522         int                        rc;
3523         ENTRY;
3524
3525         rc = ptlrpcd_addref();
3526         if (rc)
3527                 RETURN(rc);
3528
3529         rc = client_obd_setup(obd, lcfg);
3530         if (rc)
3531                 GOTO(out_ptlrpcd, rc);
3532
3533         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3534         if (IS_ERR(handler))
3535                 GOTO(out_client_setup, rc = PTR_ERR(handler));
3536         cli->cl_writeback_work = handler;
3537
3538         handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
3539         if (IS_ERR(handler))
3540                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3541         cli->cl_lru_work = handler;
3542
3543         rc = osc_quota_setup(obd);
3544         if (rc)
3545                 GOTO(out_ptlrpcd_work, rc);
3546
3547         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3548 #ifdef LPROCFS
3549         obd->obd_vars = lprocfs_osc_obd_vars;
3550 #endif
3551         if (lprocfs_seq_obd_setup(obd) == 0) {
3552                 lproc_osc_attach_seqstat(obd);
3553                 sptlrpc_lprocfs_cliobd_attach(obd);
3554                 ptlrpc_lprocfs_register_obd(obd);
3555         }
3556
3557         /* We need to allocate a few requests more, because
3558          * brw_interpret tries to create new requests before freeing
3559          * previous ones, Ideally we want to have 2x max_rpcs_in_flight
3560          * reserved, but I'm afraid that might be too much wasted RAM
3561          * in fact, so 2 is just my guess and still should work. */
3562         cli->cl_import->imp_rq_pool =
3563                 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3564                                     OST_MAXREQSIZE,
3565                                     ptlrpc_add_rqs_to_pool);
3566
3567         CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
3568         ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
3569         RETURN(rc);
3570
3571 out_ptlrpcd_work:
3572         if (cli->cl_writeback_work != NULL) {
3573                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3574                 cli->cl_writeback_work = NULL;
3575         }
3576         if (cli->cl_lru_work != NULL) {
3577                 ptlrpcd_destroy_work(cli->cl_lru_work);
3578                 cli->cl_lru_work = NULL;
3579         }
3580 out_client_setup:
3581         client_obd_cleanup(obd);
3582 out_ptlrpcd:
3583         ptlrpcd_decref();
3584         RETURN(rc);
3585 }
3586
3587 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3588 {
3589         int rc = 0;
3590         ENTRY;
3591
3592         switch (stage) {
3593         case OBD_CLEANUP_EARLY: {
3594                 struct obd_import *imp;
3595                 imp = obd->u.cli.cl_import;
3596                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3597                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3598                 ptlrpc_deactivate_import(imp);
3599                 spin_lock(&imp->imp_lock);
3600                 imp->imp_pingable = 0;
3601                 spin_unlock(&imp->imp_lock);
3602                 break;
3603         }
3604         case OBD_CLEANUP_EXPORTS: {
3605                 struct client_obd *cli = &obd->u.cli;
3606                 /* LU-464
3607                  * for echo client, export may be on zombie list, wait for
3608                  * zombie thread to cull it, because cli.cl_import will be
3609                  * cleared in client_disconnect_export():
3610                  *   class_export_destroy() -> obd_cleanup() ->
3611                  *   echo_device_free() -> echo_client_cleanup() ->
3612                  *   obd_disconnect() -> osc_disconnect() ->
3613                  *   client_disconnect_export()
3614                  */
3615                 obd_zombie_barrier();
3616                 if (cli->cl_writeback_work) {
3617                         ptlrpcd_destroy_work(cli->cl_writeback_work);
3618                         cli->cl_writeback_work = NULL;
3619                 }
3620                 if (cli->cl_lru_work) {
3621                         ptlrpcd_destroy_work(cli->cl_lru_work);
3622                         cli->cl_lru_work = NULL;
3623                 }
3624                 obd_cleanup_client_import(obd);
3625                 ptlrpc_lprocfs_unregister_obd(obd);
3626                 lprocfs_obd_cleanup(obd);
3627                 rc = obd_llog_finish(obd, 0);
3628                 if (rc != 0)
3629                         CERROR("failed to cleanup llogging subsystems\n");
3630                 break;
3631                 }
3632         }
3633         RETURN(rc);
3634 }
3635
3636 int osc_cleanup(struct obd_device *obd)
3637 {
3638         struct client_obd *cli = &obd->u.cli;
3639         int rc;
3640
3641         ENTRY;
3642
3643         /* lru cleanup */
3644         if (cli->cl_cache != NULL) {
3645                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3646                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3647                 cfs_list_del_init(&cli->cl_lru_osc);
3648                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3649                 cli->cl_lru_left = NULL;
3650                 atomic_dec(&cli->cl_cache->ccc_users);
3651                 cli->cl_cache = NULL;
3652         }
3653
3654         /* free memory of osc quota cache */
3655         osc_quota_cleanup(obd);
3656
3657         rc = client_obd_cleanup(obd);
3658
3659         ptlrpcd_decref();
3660         RETURN(rc);
3661 }
3662
3663 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3664 {
3665         int rc = class_process_proc_seq_param(PARAM_OSC, obd->obd_vars,
3666                                               lcfg, obd);
3667         return rc > 0 ? 0: rc;
3668 }
3669
3670 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3671 {
3672         return osc_process_config_base(obd, buf);
3673 }
3674
3675 struct obd_ops osc_obd_ops = {
3676         .o_owner                = THIS_MODULE,
3677         .o_setup                = osc_setup,
3678         .o_precleanup           = osc_precleanup,
3679         .o_cleanup              = osc_cleanup,
3680         .o_add_conn             = client_import_add_conn,
3681         .o_del_conn             = client_import_del_conn,
3682         .o_connect              = client_connect_import,
3683         .o_reconnect            = osc_reconnect,
3684         .o_disconnect           = osc_disconnect,
3685         .o_statfs               = osc_statfs,
3686         .o_statfs_async         = osc_statfs_async,
3687         .o_packmd               = osc_packmd,
3688         .o_unpackmd             = osc_unpackmd,
3689         .o_create               = osc_create,
3690         .o_destroy              = osc_destroy,
3691         .o_getattr              = osc_getattr,
3692         .o_getattr_async        = osc_getattr_async,
3693         .o_setattr              = osc_setattr,
3694         .o_setattr_async        = osc_setattr_async,
3695         .o_brw                  = osc_brw,
3696         .o_punch                = osc_punch,
3697         .o_sync                 = osc_sync,
3698         .o_enqueue              = osc_enqueue,
3699         .o_change_cbdata        = osc_change_cbdata,
3700         .o_find_cbdata          = osc_find_cbdata,
3701         .o_cancel               = osc_cancel,
3702         .o_cancel_unused        = osc_cancel_unused,
3703         .o_iocontrol            = osc_iocontrol,
3704         .o_get_info             = osc_get_info,
3705         .o_set_info_async       = osc_set_info_async,
3706         .o_import_event         = osc_import_event,
3707         .o_llog_init            = osc_llog_init,
3708         .o_llog_finish          = osc_llog_finish,
3709         .o_process_config       = osc_process_config,
3710         .o_quotactl             = osc_quotactl,
3711         .o_quotacheck           = osc_quotacheck,
3712 };
3713
3714 extern struct lu_kmem_descr osc_caches[];
3715 extern spinlock_t osc_ast_guard;
3716 extern struct lock_class_key osc_ast_guard_class;
3717
3718 int __init osc_init(void)
3719 {
3720         int rc;
3721         ENTRY;
3722
3723         /* print an address of _any_ initialized kernel symbol from this
3724          * module, to allow debugging with gdb that doesn't support data
3725          * symbols from modules.*/
3726         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3727
3728         rc = lu_kmem_init(osc_caches);
3729         if (rc)
3730                 RETURN(rc);
3731
3732         rc = class_register_type(&osc_obd_ops, NULL, NULL,
3733 #ifndef HAVE_ONLY_PROCFS_SEQ
3734                                 NULL,
3735 #endif
3736                                 LUSTRE_OSC_NAME, &osc_device_type);
3737         if (rc) {
3738                 lu_kmem_fini(osc_caches);
3739                 RETURN(rc);
3740         }
3741
3742         spin_lock_init(&osc_ast_guard);
3743         lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
3744
3745         RETURN(rc);
3746 }
3747
3748 #ifdef __KERNEL__
3749 static void /*__exit*/ osc_exit(void)
3750 {
3751         class_unregister_type(LUSTRE_OSC_NAME);
3752         lu_kmem_fini(osc_caches);
3753 }
3754
3755 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
3756 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3757 MODULE_LICENSE("GPL");
3758
3759 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
3760 #endif