 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2013, Intel Corporation.
 *
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */
#define DEBUG_SUBSYSTEM S_OSC

#include <libcfs/libcfs.h>

# include <liblustre.h>

#include <lustre_dlm.h>
#include <lustre_net.h>
#include <lustre/lustre_user.h>
#include <obd_cksum.h>

#include <lustre_ha.h>
#include <lprocfs_status.h>
#include <lustre_log.h>
#include <lustre_debug.h>
#include <lustre_param.h>
#include <lustre_fid.h>
#include "osc_internal.h"
#include "osc_cl_internal.h"
static void osc_release_ppga(struct brw_page **ppga, obd_count count);
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc);
int osc_cleanup(struct obd_device *obd);
/* Pack OSC object metadata for disk storage (LE byte order). */
static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
                      struct lov_stripe_md *lsm)
        lmm_size = sizeof(**lmmp);

        if (*lmmp != NULL && lsm == NULL) {
                OBD_FREE(*lmmp, lmm_size);
        } else if (unlikely(lsm != NULL && ostid_id(&lsm->lsm_oi) == 0)) {
        OBD_ALLOC(*lmmp, lmm_size);
        ostid_cpu_to_le(&lsm->lsm_oi, &(*lmmp)->lmm_oi);
/* Unpack OSC object metadata from disk storage (LE byte order). */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
        struct obd_import *imp = class_exp2cliimp(exp);

        if (lmm_bytes < sizeof(*lmm)) {
                CERROR("%s: lov_mds_md too small: %d, need %d\n",
                       exp->exp_obd->obd_name, lmm_bytes,
                /* XXX LOV_MAGIC etc check? */
        if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) {
                CERROR("%s: zero lmm_object_id: rc = %d\n",
                       exp->exp_obd->obd_name, -EINVAL);
        lsm_size = lov_stripe_md_size(1);

        if (*lsmp != NULL && lmm == NULL) {
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);
        OBD_ALLOC(*lsmp, lsm_size);
        if (unlikely(*lsmp == NULL))
        OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
        if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {
                OBD_FREE(*lsmp, lsm_size);
        loi_init((*lsmp)->lsm_oinfo[0]);
        } else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) {
        /* XXX zero *lsmp? */
        ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi);
            (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
                (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
                (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
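/* Pack the capability @capa into the RMF_CAPA1 request buffer and mark
 * OBD_MD_FLOSSCAPA valid in @body. */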
static inline void osc_pack_capa(struct ptlrpc_request *req,
                                 struct ost_body *body, void *capa)
        struct obd_capa *oc = (struct obd_capa *)capa;
        struct lustre_capa *c;

        c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
        body->oa.o_valid |= OBD_MD_FLOSSCAPA;
        DEBUG_CAPA(D_SEC, c, "pack");
static inline void osc_pack_req_body(struct ptlrpc_request *req,
                                     struct obd_info *oinfo)
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
        osc_pack_capa(req, body, oinfo->oi_capa);
static inline void osc_set_capa_size(struct ptlrpc_request *req,
                                     const struct req_msg_field *field,
        req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
        /* the size is already calculated as sizeof(struct obd_capa) */
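/* Interpret an OST_GETATTR reply: copy the wire obdo from the reply body
 * into oi_oa (faking o_blksize, which should really come from the OST) and
 * run the caller's oi_cb_up upcall with the result. */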
static int osc_getattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
        struct ost_body *body;

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
                             aa->aa_oi->oi_oa, &body->oa);

        /* This should really be sent by the OST */
        aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
        aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
        CDEBUG(D_INFO, "can't unpack ost_body\n");
        aa->aa_oi->oi_oa->o_valid = 0;

        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
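/* Issue an OST_GETATTR RPC without waiting: the request is added to @set
 * and the reply is handled by osc_getattr_interpret(). */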
static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct ptlrpc_request_set *set)
        struct ptlrpc_request *req;
        struct osc_async_args *aa;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
                ptlrpc_request_free(req);
        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);

        ptlrpc_set_add_req(set, req);
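/* Synchronous getattr: send OST_GETATTR and wait for the reply, copying
 * the returned attributes back into oinfo->oi_oa. */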
static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obd_info *oinfo)
        struct ptlrpc_request *req;
        struct ost_body *body;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
                ptlrpc_request_free(req);
        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
        oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
        ptlrpc_req_finished(req);
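/* Synchronous setattr: OBD_MD_FLGROUP must be valid in oi_oa (asserted
 * below); the updated attributes from the reply are copied back into
 * oi_oa. */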
static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
                       struct obd_info *oinfo, struct obd_trans_info *oti)
        struct ptlrpc_request *req;
        struct ost_body *body;

        LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
                ptlrpc_request_free(req);
        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
        ptlrpc_req_finished(req);
static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_setattr_args *sa, int rc)
        struct ost_body *body;

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
        rc = sa->sa_upcall(sa->sa_cookie, rc);
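/* Asynchronous setattr: fire the OST_SETATTR RPC and invoke @upcall with
 * @cookie once the reply (or failure) is interpreted; if @rqset is
 * PTLRPCD_SET the request is handed straight to ptlrpcd. */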
int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
                           struct obd_trans_info *oti,
                           obd_enqueue_update_f upcall, void *cookie,
                           struct ptlrpc_request_set *rqset)
        struct ptlrpc_request *req;
        struct osc_setattr_args *sa;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
                ptlrpc_request_free(req);
        if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
                oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        /* Do MDS-to-OST setattr asynchronously. */
                /* Do not wait for response. */
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        req->rq_interpret_reply =
                (ptlrpc_interpterer_t)osc_setattr_interpret;

        CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(req);
        sa->sa_oa = oinfo->oi_oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
                ptlrpc_set_add_req(rqset, req);
static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct obd_trans_info *oti,
                             struct ptlrpc_request_set *rqset)
        return osc_setattr_async_base(exp, oinfo, oti,
                                      oinfo->oi_cb_up, oinfo, rqset);
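/* Synchronously create an object on the OST: allocate a temporary lsm,
 * send OST_CREATE, and on success record the new object id in the lsm and
 * the transno (and any llog cookie) in @oti for the caller. */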
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
        struct ptlrpc_request *req;
        struct ost_body *body;
        struct lov_stripe_md *lsm;

        rc = obd_alloc_memmd(exp, &lsm);
        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
                ptlrpc_request_free(req);
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_DELORPHAN) {
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;

        rc = ptlrpc_queue_wait(req);
        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
                GOTO(out_req, rc = -EPROTO);

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way. */
        lsm->lsm_oi = oa->o_oi;
        oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

        if (oa->o_valid & OBD_MD_FLCOOKIE) {
                if (!oti->oti_logcookies)
                        oti_alloc_cookies(oti, 1);
                *oti->oti_logcookies = oa->o_lcookie;

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
        ptlrpc_req_finished(req);
        obd_free_memmd(exp, &lsm);
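/* Truncate ("punch") an object: the start and end of the punched extent
 * travel in the o_size/o_blocks fields of the oa (see osc_punch() below);
 * @upcall is invoked with @cookie once the OST_PUNCH reply is
 * interpreted. */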
int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
                   obd_enqueue_update_f upcall, void *cookie,
                   struct ptlrpc_request_set *rqset)
        struct ptlrpc_request *req;
        struct osc_setattr_args *sa;
        struct ost_body *body;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
                ptlrpc_request_free(req);
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
        CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(req);
        sa->sa_oa = oinfo->oi_oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;
        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
                ptlrpc_set_add_req(rqset, req);
static int osc_punch(const struct lu_env *env, struct obd_export *exp,
                     struct obd_info *oinfo, struct obd_trans_info *oti,
                     struct ptlrpc_request_set *rqset)
        oinfo->oi_oa->o_size = oinfo->oi_policy.l_extent.start;
        oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
        oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
        return osc_punch_base(exp, oinfo,
                              oinfo->oi_cb_up, oinfo, rqset);
static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req,
        struct osc_fsync_args *fa = arg;
        struct ost_body *body;

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);

        *fa->fa_oi->oi_oa = body->oa;
        rc = fa->fa_upcall(fa->fa_cookie, rc);
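/* Send an OST_SYNC RPC: the sync range travels in the o_size/o_blocks
 * fields of the oa (see osc_sync() below); @upcall is invoked with
 * @cookie once the reply is interpreted. */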
int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
        struct ptlrpc_request *req;
        struct ost_body *body;
        struct osc_fsync_args *fa;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
                ptlrpc_request_free(req);
        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
        fa = ptlrpc_req_async_args(req);
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
                ptlrpc_set_add_req(rqset, req);
static int osc_sync(const struct lu_env *env, struct obd_export *exp,
                    struct obd_info *oinfo, obd_size start, obd_size end,
                    struct ptlrpc_request_set *set)
                CDEBUG(D_INFO, "oa NULL\n");
        oinfo->oi_oa->o_size = start;
        oinfo->oi_oa->o_blocks = end;
        oinfo->oi_oa->o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);

        RETURN(osc_sync_base(exp, oinfo, oinfo->oi_cb_up, oinfo, set));
/* Locally find and cancel the locks matching @mode on the resource found
 * by @objid. Found locks are added to the @cancels list. Returns the
 * number of locks added to @cancels. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   ldlm_mode_t mode, __u64 lock_flags)
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;

        /* Return (i.e. cancel nothing) only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This is distinct from the case where ELC is not supported at all:
         * there we still want to cancel locks in advance, we just cancel
         * them locally, without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *data,
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        cfs_atomic_dec(&cli->cl_destroy_in_flight);
        wake_up(&cli->cl_destroy_waitq);
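/* Check whether another destroy RPC may be sent without exceeding
 * cl_max_rpcs_in_flight; the inc/dec pair below is careful to wake up
 * waiters if the in-flight count changed between the two atomic ops. */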
static int osc_can_send_destroy(struct client_obd *cli)
        if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
        if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /* The counter has been modified between the two atomic
                 * operations. */
                wake_up(&cli->cl_destroy_waitq);
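/* Create entry point: recreations (OBD_FL_RECREATE_OBJS) and objects
 * outside an MDT FID sequence go through a real OST_CREATE RPC via
 * osc_real_create(); any other call is unexpected here. */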
int osc_create(const struct lu_env *env, struct obd_export *exp,
               struct obdo *oa, struct lov_stripe_md **ea,
               struct obd_trans_info *oti)
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);

        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_RECREATE_OBJS) {
                RETURN(osc_real_create(exp, oa, ea, oti));

        if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi)))
                RETURN(osc_real_create(exp, oa, ea, oti));

        /* we should not get here anymore */
/* Destroy requests can always be async on the client, and we don't even
 * really care about the return code since the client cannot do anything
 * at all about a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST
 * reports they were destroyed and sync'd to disk (i.e. transaction
 * committed). If the client dies, or the OST is down when the object
 * should be destroyed, the records are not cancelled, and when the OST
 * reconnects to the MDS next, it will retrieve the llog unlink logs and
 * then send the log cancellation cookies to the MDS after committing
 * destroy transactions. */
static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa, struct lov_stripe_md *ea,
                       struct obd_trans_info *oti, struct obd_export *md_export,
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body *body;
        CFS_LIST_HEAD(cancels);

                CDEBUG(D_INFO, "oa NULL\n");
        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
        osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                ptlrpc_request_free(req);
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                oa->o_lcookie = *oti->oti_logcookies;
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
        osc_pack_capa(req, body, (struct obd_capa *)capa);
        ptlrpc_request_set_replen(req);

        /* If osc_destroy is for destroying an unlink orphan, it was sent
         * from the MDT to the OST and should not be blocked here, because
         * the process might be triggered by ptlrpcd, and it is not good to
         * block a ptlrpcd thread (b=16006). */
        if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
                req->rq_interpret_reply = osc_destroy_interpret;
                if (!osc_can_send_destroy(cli)) {
                        struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
                        /*
                         * Wait until the number of on-going destroy RPCs
                         * drops under cl_max_rpcs_in_flight.
                         */
                        l_wait_event_exclusive(cli->cl_destroy_waitq,
                                               osc_can_send_destroy(cli), &lwi);

        /* Do not wait for response */
        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
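/* Fill the dirty/grant accounting fields of @oa so the server learns how
 * much cache the client holds; also sanity-checks the per-client and
 * global dirty page counters. */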
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (unlikely(cli->cl_dirty - cli->cl_dirty_transit >
                     cli->cl_dirty_max)) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
        } else if (unlikely(cfs_atomic_read(&obd_unstable_pages) +
                            cfs_atomic_read(&obd_dirty_pages) -
                            cfs_atomic_read(&obd_dirty_transit_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The cfs_atomic_read() and cfs_atomic_inc() calls are not
                 * covered by a lock, so they may safely race and trip this
                 * CERROR() unless we add in a small fudge factor (+1). */
                CERROR("%s: dirty %d + %d - %d > system dirty_max %d\n",
                       cli->cl_import->imp_obd->obd_name,
                       cfs_atomic_read(&obd_unstable_pages),
                       cfs_atomic_read(&obd_dirty_pages),
                       cfs_atomic_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
        } else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                long max_in_flight = (cli->cl_max_pages_per_rpc <<
                                      PAGE_CACHE_SHIFT) *
                                     (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);

        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
void osc_update_next_shrink(struct client_obd *cli)
        cli->cl_next_shrink_grant =
                cfs_time_shift(cli->cl_grant_shrink_interval);
        CDEBUG(D_CACHE, "next time %ld to shrink grant\n",
               cli->cl_next_shrink_grant);
static void __osc_update_grant(struct client_obd *cli, obd_size grant)
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);

static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                              obd_count keylen, void *key, obd_count vallen,
                              void *val, struct ptlrpc_request_set *set);
static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
        struct ost_body *body;

        __osc_update_grant(cli, oa->o_grant);
        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        osc_update_grant(cli, body);

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC. This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
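/* For example (illustrative numbers only): with cl_max_pages_per_rpc = 256
 * (1MB RPCs on 4KB pages) and cl_max_rpcs_in_flight = 8, the first shrink
 * targets (8 + 1) * 1MB = 9MB of grant; once avail_grant is at or below
 * that, a further shrink targets a single RPC's worth, i.e. 1MB. */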
static int osc_shrink_grant(struct client_obd *cli)
        __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
                             (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);

        client_obd_list_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target_bytes)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target_bytes);
int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
        struct ost_body *body;

        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit.
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                client_obd_list_unlock(&cli->cl_loi_list_lock);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        osc_announce_cached(cli, &body->oa, 0);

        client_obd_list_lock(&cli->cl_loi_list_lock);
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
                __osc_update_grant(cli, body->oa.o_grant);
static int osc_should_shrink_grant(struct client_obd *client)
        cfs_time_t time = cfs_time_current();
        cfs_time_t next_shrink = client->cl_next_shrink_grant;

        if ((client->cl_import->imp_connect_data.ocd_connect_flags &
             OBD_CONNECT_GRANT_SHRINK) == 0)
        if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
                /* Get the current RPC size directly, instead of going via:
                 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
                 * Keep comment here so that it can be found by searching. */
                int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > brw_size)
                        osc_update_next_shrink(client);
static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
        struct client_obd *client;

        cfs_list_for_each_entry(client, &item->ti_obd_list,
                                cl_grant_shrink_list) {
                if (osc_should_shrink_grant(client))
                        osc_shrink_grant(client);

static int osc_add_shrink_grant(struct client_obd *client)
        rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
                                       osc_grant_shrink_grant_cb, NULL,
                                       &client->cl_grant_shrink_list);
                CERROR("add grant client %s error %d\n",
                       client->cl_import->imp_obd->obd_name, rc);
        CDEBUG(D_CACHE, "add grant client %s\n",
               client->cl_import->imp_obd->obd_name);
        osc_update_next_shrink(client);

static int osc_del_shrink_grant(struct client_obd *client)
        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
        /*
         * ocd_grant is the total grant amount we expect to hold: if we've
         * been evicted, it's the new avail_grant amount, and cl_dirty will
         * drop to 0 as in-flight RPCs fail out; otherwise, it's
         * avail_grant + dirty.
         *
         * A race is tolerable here: if we're evicted, but imp_state has
         * already left the EVICTED state, then cl_dirty must be 0 already.
         */
        client_obd_list_lock(&cli->cl_loi_list_lock);
        if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
                cli->cl_avail_grant = ocd->ocd_grant;
                cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;

        if (cli->cl_avail_grant < 0) {
                CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
                      cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
                      ocd->ocd_grant, cli->cl_dirty);
                /* workaround for servers which do not have the patch from
                cli->cl_avail_grant = ocd->ocd_grant;

        /* determine the appropriate chunk size used by osc_extent. */
        cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld. "
               "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
               cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);

        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            cfs_list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. Lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, obd_count page_count,
                              struct brw_page **pga)
        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = kmap(pga[i]->pg) +
                              (pga[i]->off & ~CFS_PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                nob_read -= pga[i]->count;

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
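/* Check the per-niobuf return codes of a BRW_WRITE reply: fail on the
 * first negative rc, reject invalid non-zero codes, and verify that the
 * number of bytes transferred matches what was requested. */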
static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           obd_count page_count, struct brw_page **pga)
        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
                                                  sizeof(*remote_rcs) *
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
        /* return error if any niobuf was in error */
        for (i = 0; i < niocount; i++) {
                if ((int)remote_rcs[i] < 0)
                        return remote_rcs[i];

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                               i, remote_rcs[i], req);
        if (req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
                                  OBD_BRW_SYNC | OBD_BRW_ASYNC |
                                  OBD_BRW_NOQUOTA | OBD_BRW_SOFT_SYNC);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
                        CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
                              "report this at http://bugs.whamcloud.com/\n",
                              p1->flag, p2->flag);

        return (p1->off + p1->count == p2->off);
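/* Compute the bulk checksum over @nob bytes of the @pga page array using
 * the hash algorithm selected by @cksum_type; the OBD_FAIL hooks below
 * allow deliberately corrupting the data (reads) or returning a wrong
 * checksum (writes) for testing. */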
static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga, int opc,
                                   cksum_type_t cksum_type)
        struct cfs_crypto_hash_desc *hdesc;
        unsigned int bufsize;
        unsigned char cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(hdesc)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(hdesc);

        while (nob > 0 && pg_count > 0) {
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~CFS_PAGE_MASK;

                        memcpy(ptr + off, "bad1", min(4, nob));
                cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
                                            pga[i]->off & ~CFS_PAGE_MASK,
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
                               (int)(pga[i]->off & ~CFS_PAGE_MASK));
                nob -= pga[i]->count;

        err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
                cfs_crypto_hash_final(hdesc, NULL, NULL);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
static int osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa, int reserve,
        struct ptlrpc_request *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body *body;
        struct obd_ioobj *ioobj;
        struct niobuf_remote *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule *pill;
        struct brw_page *pg_prev;

        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                cli->cl_import->imp_rq_pool,
                                                &RQF_OST_BRW_WRITE);
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));
        osc_set_capa_size(req, &RMF_CAPA1, ocapa);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
                ptlrpc_request_free(req);
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);
        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
         * resend logic */
        req->rq_no_retry_einprogress = 1;

        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
                opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */
        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        /* The high bits of ioo_max_brw tell the server the _maximum_ number
         * of bulks that might be sent for this request. The actual number
         * is decided when the RPC is finally sent in ptlrpc_register_bulk().
         * It sends "max - 1" for compatibility with old clients sending "0",
         * and also so the actual maximum is a power-of-two number, not one
         * less. LU-1431 */
        ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
        osc_pack_capa(req, body, ocapa);
        LASSERT(page_count > 0);

        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                int poff = pg->off & ~CFS_PAGE_MASK;

                LASSERT(pg->count > 0);
                /* make sure there is no gap in the middle of the page array */
                LASSERTF(page_count == 1 ||
                         (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
                          ergo(i > 0 && i < page_count - 1,
                               poff == 0 && pg->count == PAGE_CACHE_SIZE) &&
                          ergo(i == page_count - 1, poff == 0)),
                         "i: %d/%d pg: %p off: "LPU64", count: %u\n",
                         i, page_count, pg, pg->off, pg->count);
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u\n", i, page_count);
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf->len += pg->count;
                        niobuf->offset = pg->off;
                        niobuf->len = pg->count;
                        niobuf->flags = pg->flag;

        LASSERTF((void *)(niobuf - niocount) ==
                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob : 0);

        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        body->oa.o_flags |= OBD_FL_RECOV_RESEND;

        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                                oa->o_flags &= OBD_FL_LOCAL_MASK;
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                     sizeof(__u32) * niocount);
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
        ptlrpc_request_set_replen(req);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);
        if (ocapa && reserve)
                aa->aa_ocapa = capa_get(ocapa);
        ptlrpc_req_finished(req);
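/* A write checksum mismatch was reported by the server: recompute the
 * checksum over the pages still held by the client and use it to classify
 * where the data changed (on the client, in transit, or a server-side
 * checksum-type mismatch), then log a detailed error. */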
static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum, int nob,
                                obd_count page_count, struct brw_page **pga,
                                cksum_type_t client_cksum_type)
        cksum_type_t cksum_type;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
        cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
        new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,

        if (cksum_type != client_cksum_type)
                msg = "the server did not use the checksum type specified in "
                      "the original request - likely a protocol problem";
        else if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it - "
                      "likely false positive due to mmap IO (bug 11742)";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
                msg = "changed in transit AND doesn't match the original - "
                      "likely false positive due to mmap IO (bug 11742)";

        LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
                           " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
                           msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
                           POSTID(&oa->o_oi), pga[0]->off,
                           pga[page_count-1]->off + pga[page_count-1]->count - 1);
        CERROR("original client csum %x (type %x), server csum %x (type %x), "
               "client csum now %x\n", client_cksum, client_cksum_type,
               server_cksum, cksum_type, new_cksum);
/* Note rc enters this function as number of bytes transferred */
static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
        struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
        const lnet_process_id_t *peer =
                &req->rq_import->imp_connection->c_peer;
        struct client_obd *cli = aa->aa_cli;
        struct ost_body *body;
        __u32 client_cksum = 0;

        if (rc < 0 && rc != -EDQUOT) {
                DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);

        LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
                DEBUG_REQ(D_INFO, req, "Can't unpack body\n");

        /* set/clear over quota flag for a uid/gid */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
            body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
                unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };

                CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
                       body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
                osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);

        osc_update_grant(cli, body);

        if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
                client_cksum = aa->aa_oa->o_cksum; /* save for later */

        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
                        CERROR("Unexpected +ve rc %d\n", rc);
                LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);

                if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
                if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
                    check_write_checksum(&body->oa, peer, client_cksum,
                                         body->oa.o_cksum, aa->aa_requested_nob,
                                         aa->aa_page_count, aa->aa_ppga,
                                         cksum_type_unpack(aa->aa_oa->o_flags)))
                rc = check_write_rcs(req, aa->aa_requested_nob, aa->aa_nio_count,
                                     aa->aa_page_count, aa->aa_ppga);
        /* The rest of this function executes only for OST_READs */

        /* if unwrap_bulk failed, return -EAGAIN to retry */
        rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
                GOTO(out, rc = -EAGAIN);

        if (rc > aa->aa_requested_nob) {
                CERROR("Unexpected rc %d (%d requested)\n", rc,
                       aa->aa_requested_nob);

        if (rc != req->rq_bulk->bd_nob_transferred) {
                CERROR("Unexpected rc %d (%d transferred)\n",
                       rc, req->rq_bulk->bd_nob_transferred);

        if (rc < aa->aa_requested_nob)
                handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);

        if (body->oa.o_valid & OBD_MD_FLCKSUM) {
                static int cksum_counter;
                __u32 server_cksum = body->oa.o_cksum;
                cksum_type_t cksum_type;

                cksum_type = cksum_type_unpack(body->oa.o_valid & OBD_MD_FLFLAGS ?
                                               body->oa.o_flags : 0);
                client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
                                                 aa->aa_ppga, OST_READ,
                if (peer->nid == req->rq_bulk->bd_sender) {
                        router = libcfs_nid2str(req->rq_bulk->bd_sender);

                if (server_cksum == ~0 && rc > 0) {
                        CERROR("Protocol error: server %s set the 'checksum' "
                               "bit, but didn't send a checksum. Not fatal, "
                               "but please notify on http://bugs.whamcloud.com/\n",
                               libcfs_nid2str(peer->nid));
                } else if (server_cksum != client_cksum) {
                        LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
                                           "%s%s%s inode "DFID" object "DOSTID
                                           " extent ["LPU64"-"LPU64"]\n",
                                           req->rq_import->imp_obd->obd_name,
                                           libcfs_nid2str(peer->nid),
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                           body->oa.o_parent_seq : (__u64)0,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                           body->oa.o_parent_oid : 0,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                           body->oa.o_parent_ver : 0,
                                           POSTID(&body->oa.o_oi),
                                           aa->aa_ppga[0]->off,
                                           aa->aa_ppga[aa->aa_page_count-1]->off +
                                           aa->aa_ppga[aa->aa_page_count-1]->count -
                        CERROR("client %x, server %x, cksum_type %x\n",
                               client_cksum, server_cksum, cksum_type);
                        aa->aa_oa->o_cksum = client_cksum;
                        CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
        } else if (unlikely(client_cksum)) {
                static int cksum_missed;

                if ((cksum_missed & (-cksum_missed)) == cksum_missed)
                        CERROR("Checksum %u requested from %s but not sent\n",
                               cksum_missed, libcfs_nid2str(peer->nid));

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
                             aa->aa_oa, &body->oa);
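/* Synchronous bulk I/O: build the BRW request, queue it and wait, retrying
 * on recoverable errors with a delay that grows with the resend count
 * (resend limits do not apply when the server returns -EINPROGRESS). */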
static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
                            struct lov_stripe_md *lsm,
                            obd_count page_count, struct brw_page **pga,
                            struct obd_capa *ocapa)
        struct ptlrpc_request *req;
        wait_queue_head_t waitq;
        int generation, resends = 0;
        struct l_wait_info lwi;

        init_waitqueue_head(&waitq);
        generation = exp->exp_obd->u.cli.cl_import->imp_generation;

        rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
                                  page_count, pga, &req, ocapa, 0, resends);
        req->rq_generation_set = 1;
        req->rq_import_generation = generation;
        req->rq_sent = cfs_time_current_sec() + resends;

        rc = ptlrpc_queue_wait(req);
        if (rc == -ETIMEDOUT && req->rq_resend) {
                DEBUG_REQ(D_HA, req, "BULK TIMEOUT");
                ptlrpc_req_finished(req);

        rc = osc_brw_fini_request(req, rc);
        ptlrpc_req_finished(req);
        /* When the server returns -EINPROGRESS, the client should always
         * retry regardless of how many times the bulk was already resent. */
        if (osc_recoverable_error(rc)) {
                if (rc != -EINPROGRESS &&
                    !client_should_resend(resends, &exp->exp_obd->u.cli)) {
                        CERROR("%s: too many resend retries for object: "
                               ""DOSTID", rc = %d.\n", exp->exp_obd->obd_name,
                               POSTID(&oa->o_oi), rc);
                    exp->exp_obd->u.cli.cl_import->imp_generation) {
                        CDEBUG(D_HA, "%s: resend cross eviction for object: "
                               ""DOSTID", rc = %d.\n", exp->exp_obd->obd_name,
                               POSTID(&oa->o_oi), rc);
                lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL,
                l_wait_event(waitq, 0, &lwi);

        if (rc == -EAGAIN || rc == -EINPROGRESS)
static int osc_brw_redo_request(struct ptlrpc_request *request,
                                struct osc_brw_async_args *aa, int rc)
        struct ptlrpc_request *new_req;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;

        DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
                  "redo for recoverable error %d", rc);

        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                  OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa,
                                  NULL /* lsm unused by osc currently */,
                                  aa->aa_page_count, aa->aa_ppga,
                                  &new_req, aa->aa_ocapa, 0, 1);
        cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(request == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 request, oap->oap_request);
                        if (oap->oap_interrupted) {
                                ptlrpc_req_finished(new_req);

        /* New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it... */
        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
        new_req->rq_commit_cb = request->rq_commit_cb;
        /* cap resend delay to the current request timeout, this is similar
         * to what ptlrpc does (see after_reply()) */
        if (aa->aa_resends > new_req->rq_timeout)
                new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
                new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
        new_req->rq_generation_set = 1;
        new_req->rq_import_generation = request->rq_import_generation;

        new_aa = ptlrpc_req_async_args(new_req);

        CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
        cfs_list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
        CFS_INIT_LIST_HEAD(&new_aa->aa_exts);
        cfs_list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
        new_aa->aa_resends = aa->aa_resends;

        cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);

        new_aa->aa_ocapa = aa->aa_ocapa;
        aa->aa_ocapa = NULL;

        /* XXX: This code will run into problems if we ever want to add a
         * series of BRW RPCs into a self-defined ptlrpc_request_set and
         * wait for all of them to finish. We should inherit the request
         * set from the old request. */
        ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);

        DEBUG_REQ(D_INFO, new_req, "new request");
/*
 * We want disk allocation on the target to happen in offset order, so
 * we'll follow Sedgewick's advice and stick to the dead-simple shellsort:
 * it'll do fine for our small page arrays and doesn't require allocation.
 * It's an insertion sort that swaps elements that are strides apart,
 * shrinking the stride down until it's 1 and the array is sorted.
 */
static void sort_brw_pages(struct brw_page **array, int num)
        struct brw_page *tmp;

        for (stride = 1; stride < num; stride = (stride * 3) + 1)
                for (i = stride; i < num; i++) {
                        while (j >= stride && array[j - stride]->off > tmp->off) {
                                array[j] = array[j - stride];
        } while (stride > 1);
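/* Return the number of leading pages in @pg that can go into a single
 * unfragmented brw: stop counting at the first page that does not end,
 * or does not start, on a page boundary. */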
static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
        LASSERT(pages > 0);
        offset = pg[i]->off & ~CFS_PAGE_MASK;
        if (pages == 0) /* that's all */
        if (offset + pg[i]->count < PAGE_CACHE_SIZE)
                return count; /* doesn't end on page boundary */
        offset = pg[i]->off & ~CFS_PAGE_MASK;
        if (offset != 0) /* doesn't start on page boundary */

static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
        struct brw_page **ppga;

        OBD_ALLOC(ppga, sizeof(*ppga) * count);
        for (i = 0; i < count; i++)

static void osc_release_ppga(struct brw_page **ppga, obd_count count)
        LASSERT(ppga != NULL);
        OBD_FREE(ppga, sizeof(*ppga) * count);
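/* Top-level synchronous brw: build a sorted pointer array over @pga and
 * issue it in chunks of at most cl_max_pages_per_rpc unfragmented pages,
 * saving and restoring the oa between chunks since brw clobbers it. */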
static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
                   obd_count page_count, struct brw_page *pga,
                   struct obd_trans_info *oti)
        struct obdo *saved_oa = NULL;
        struct brw_page **ppga, **orig;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct client_obd *cli;
        int rc, page_count_orig;

        LASSERT((imp != NULL) && (imp->imp_obd != NULL));
        cli = &imp->imp_obd->u.cli;

        if (cmd & OBD_BRW_CHECK) {
                /* The caller just wants to know if there's a chance that this
                 * I/O can succeed */
                if (imp->imp_invalid)
        /* test_brw with a failed create can trip this, maybe others. */
        LASSERT(cli->cl_max_pages_per_rpc);

        orig = ppga = osc_build_ppga(pga, page_count);
        page_count_orig = page_count;

        sort_brw_pages(ppga, page_count);
        while (page_count) {
                obd_count pages_per_brw;

                if (page_count > cli->cl_max_pages_per_rpc)
                        pages_per_brw = cli->cl_max_pages_per_rpc;
                        pages_per_brw = page_count;

                pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);

                if (saved_oa != NULL) {
                        /* restore previously saved oa */
                        *oinfo->oi_oa = *saved_oa;
                } else if (page_count > pages_per_brw) {
                        /* save a copy of oa (brw will clobber it) */
                        OBDO_ALLOC(saved_oa);
                        if (saved_oa == NULL)
                                GOTO(out, rc = -ENOMEM);
                        *saved_oa = *oinfo->oi_oa;

                rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
                                      pages_per_brw, ppga, oinfo->oi_capa);

                page_count -= pages_per_brw;
                ppga += pages_per_brw;

        osc_release_ppga(orig, page_count_orig);

        if (saved_oa != NULL)
                OBDO_FREE(saved_oa);
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc)
        struct osc_brw_async_args *aa = data;
        struct osc_extent *ext;
        struct osc_extent *tmp;
        struct cl_object *obj = NULL;
        struct client_obd *cli = aa->aa_cli;

        rc = osc_brw_fini_request(req, rc);
        CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
        /* When the server returns -EINPROGRESS, the client should always
         * retry regardless of how many times the bulk was already resent. */
        if (osc_recoverable_error(rc)) {
                if (req->rq_import_generation !=
                    req->rq_import->imp_generation) {
                        CDEBUG(D_HA, "%s: resend cross eviction for object: "
                               ""DOSTID", rc = %d.\n",
                               req->rq_import->imp_obd->obd_name,
                               POSTID(&aa->aa_oa->o_oi), rc);
                } else if (rc == -EINPROGRESS ||
                           client_should_resend(aa->aa_resends, aa->aa_cli)) {
                        rc = osc_brw_redo_request(req, aa, rc);
                        CERROR("%s: too many resend retries for object: "
                               ""LPU64":"LPU64", rc = %d.\n",
                               req->rq_import->imp_obd->obd_name,
                               POSTID(&aa->aa_oa->o_oi), rc);
        else if (rc == -EAGAIN || rc == -EINPROGRESS)
        capa_put(aa->aa_ocapa);
        aa->aa_ocapa = NULL;

        cfs_list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
                if (obj == NULL && rc == 0) {
                        obj = osc2cl(ext->oe_obj);

                cfs_list_del_init(&ext->oe_link);
                osc_extent_finish(env, ext, 1, rc);
        LASSERT(cfs_list_empty(&aa->aa_exts));
        LASSERT(cfs_list_empty(&aa->aa_oaps));

                struct obdo *oa = aa->aa_oa;
                struct cl_attr *attr = &osc_env_info(env)->oti_attr;
                unsigned long valid = 0;

                if (oa->o_valid & OBD_MD_FLBLOCKS) {
                        attr->cat_blocks = oa->o_blocks;
                        valid |= CAT_BLOCKS;
                if (oa->o_valid & OBD_MD_FLMTIME) {
                        attr->cat_mtime = oa->o_mtime;
                if (oa->o_valid & OBD_MD_FLATIME) {
                        attr->cat_atime = oa->o_atime;
                if (oa->o_valid & OBD_MD_FLCTIME) {
                        attr->cat_ctime = oa->o_ctime;
                cl_object_attr_lock(obj);
                cl_object_attr_set(env, obj, attr, valid);
                cl_object_attr_unlock(obj);

                cl_object_put(env, obj);
        OBDO_FREE(aa->aa_oa);

        cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
                          req->rq_bulk->bd_nob_transferred);
        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
        ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);

        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
                cli->cl_w_in_flight--;
                cli->cl_r_in_flight--;
        osc_wake_cache_waiters(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
static void brw_commit(struct ptlrpc_request *req)
        spin_lock(&req->rq_lock);
        /* If osc_inc_unstable_pages (via osc_extent_finish) races with
         * this callback (invoked via rq_commit_cb), we need to ensure
         * osc_dec_unstable_pages is still called. Otherwise unstable
         * pages may be leaked. */
        if (req->rq_unstable) {
                spin_unlock(&req->rq_lock);
                osc_dec_unstable_pages(req);
                spin_lock(&req->rq_lock);
        req->rq_committed = 1;
        spin_unlock(&req->rq_lock);
/**
 * Build an RPC from the list of extents @ext_list. The caller must ensure
 * that the total pages in this list are NOT over the max pages per RPC.
 * Extents in the list must be in the OES_RPC state.
 */
2084 struct ptlrpc_request *req = NULL;
2085 struct osc_extent *ext;
2086 struct brw_page **pga = NULL;
2087 struct osc_brw_async_args *aa = NULL;
2088 struct obdo *oa = NULL;
2089 struct osc_async_page *oap;
2090 struct osc_async_page *tmp;
2091 struct cl_req *clerq = NULL;
2092 enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
2094 struct ldlm_lock *lock = NULL;
2095 struct cl_req_attr *crattr = NULL;
2096 obd_off starting_offset = OBD_OBJECT_EOF;
2097 obd_off ending_offset = 0;
2103 CFS_LIST_HEAD(rpc_list);
2106 LASSERT(!cfs_list_empty(ext_list));
2108 /* add pages into rpc_list to build BRW rpc */
2109 cfs_list_for_each_entry(ext, ext_list, oe_link) {
2110 LASSERT(ext->oe_state == OES_RPC);
2111 mem_tight |= ext->oe_memalloc;
2112 cfs_list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2114 cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
2115 if (starting_offset > oap->oap_obj_off)
2116 starting_offset = oap->oap_obj_off;
2118 LASSERT(oap->oap_page_off == 0);
2119 if (ending_offset < oap->oap_obj_off + oap->oap_count)
2120 ending_offset = oap->oap_obj_off +
2123 LASSERT(oap->oap_page_off + oap->oap_count ==
2129 mpflag = cfs_memory_pressure_get_and_set();
2131 OBD_ALLOC(crattr, sizeof(*crattr));
2133 GOTO(out, rc = -ENOMEM);
2135 OBD_ALLOC(pga, sizeof(*pga) * page_count);
2137 GOTO(out, rc = -ENOMEM);
2141 GOTO(out, rc = -ENOMEM);
        cfs_list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
                struct cl_page *page = oap2cl_page(oap);

                if (clerq == NULL) {
                        clerq = cl_req_alloc(env, page, crt,
                                             1 /* only 1-object rpcs for now */);
                                GOTO(out, rc = PTR_ERR(clerq));
                        lock = oap->oap_ldlm_lock;
                        oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
                pga[i] = &oap->oap_brw_page;
                pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
                CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
                       pga[i]->pg, page_index(oap->oap_page), oap,
                cl_req_page_add(env, clerq, page);

        /* always get the data for the obdo for the rpc */
        LASSERT(clerq != NULL);
        crattr->cra_oa = oa;
        cl_req_attr_set(env, clerq, crattr, ~0ULL);
                oa->o_handle = lock->l_remote_handle;
                oa->o_valid |= OBD_MD_FLHANDLE;

        rc = cl_req_prep(env, clerq);
                CERROR("cl_req_prep failed: %d\n", rc);
2179 sort_brw_pages(pga, page_count);
2180 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2181 pga, &req, crattr->cra_capa, 1, 0);
2183 CERROR("prep_req failed: %d\n", rc);
	req->rq_commit_cb = brw_commit;
	req->rq_interpret_reply = brw_interpret;

	if (mem_tight != 0)
		req->rq_memalloc = 1;
	/* Need to update the timestamps after the request is built in case
	 * we race with setattr (locally or in queue at OST). If OST gets
	 * later setattr before earlier BRW (as determined by the request xid),
	 * the OST will not use BRW timestamps. Sadly, there is no obvious
	 * way to do this in a single call. bug 10150 */
	cl_req_attr_set(env, clerq, crattr,
			OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);

	lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);

	CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
	aa = ptlrpc_req_async_args(req);
	CFS_INIT_LIST_HEAD(&aa->aa_oaps);
	cfs_list_splice_init(&rpc_list, &aa->aa_oaps);
	CFS_INIT_LIST_HEAD(&aa->aa_exts);
	cfs_list_splice_init(ext_list, &aa->aa_exts);
	aa->aa_clerq = clerq;
	/* queued sync pages can be torn down while the pages
	 * were between the pending list and the rpc */
	tmp = NULL;
	cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
		/* only one oap gets a request reference */
		if (tmp == NULL)
			tmp = oap;
		if (oap->oap_interrupted && !req->rq_intr) {
			CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
			       oap, req);
			ptlrpc_mark_interrupted(req);
		}
	}
	if (tmp != NULL)
		tmp->oap_request = ptlrpc_request_addref(req);
	client_obd_list_lock(&cli->cl_loi_list_lock);
	starting_offset >>= PAGE_CACHE_SHIFT;
	if (cmd == OBD_BRW_READ) {
		cli->cl_r_in_flight++;
		lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
		lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
		lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
				      starting_offset + 1);
	} else {
		cli->cl_w_in_flight++;
		lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
		lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
		lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
				      starting_offset + 1);
	}
	client_obd_list_unlock(&cli->cl_loi_list_lock);

	DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
		  page_count, aa, cli->cl_r_in_flight,
		  cli->cl_w_in_flight);
	/* XXX: Maybe the caller can check the RPC bulk descriptor to
	 * see which CPU/NUMA node the majority of pages were allocated
	 * on, and try to assign the async RPC to that CPU core
	 * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
	 *
	 * On the other hand, we expect multiple ptlrpcd threads and the
	 * initial write sponsor to run in parallel, especially when data
	 * checksumming is enabled, which is a CPU-bound operation that a
	 * single ptlrpcd thread cannot process in time. So sharing the
	 * BRW load across more ptlrpcd threads (with PDL_POLICY_ROUND)
	 * seems better.
	 */
	ptlrpcd_add_req(req, pol, -1);
	rc = 0;
	EXIT;

out:
	if (mem_tight != 0)
		cfs_memory_pressure_restore(mpflag);

	if (crattr != NULL) {
		capa_put(crattr->cra_capa);
		OBD_FREE(crattr, sizeof(*crattr));
	}

	if (rc != 0) {
		LASSERT(req == NULL);

		if (oa != NULL)
			OBDO_FREE(oa);
		if (pga != NULL)
			OBD_FREE(pga, sizeof(*pga) * page_count);
		/* this should happen rarely and is pretty bad, it makes the
		 * pending list not follow the dirty order */
		while (!cfs_list_empty(ext_list)) {
			ext = cfs_list_entry(ext_list->next, struct osc_extent,
					     oe_link);
			cfs_list_del_init(&ext->oe_link);
			osc_extent_finish(env, ext, 0, rc);
		}
		if (clerq && !IS_ERR(clerq))
			cl_req_completion(env, clerq, rc);
	}
	RETURN(rc);
}
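
/* Note: the helpers below manage ldlm_lock::l_ast_data, which the OSC uses
 * to point a DLM lock at its client-side object. The LASSERTs verify that a
 * matched lock really carries the OSC callbacks from the enqueue info before
 * its l_ast_data is (re)used; osc_ast_guard serializes those updates. */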
static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
					struct ldlm_enqueue_info *einfo)
{
	void *data = einfo->ei_cbdata;
	int set = 0;

	LASSERT(lock != NULL);
	LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
	LASSERT(lock->l_resource->lr_type == einfo->ei_type);
	LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
	LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);

	lock_res_and_lock(lock);
	spin_lock(&osc_ast_guard);

	if (lock->l_ast_data == NULL)
		lock->l_ast_data = data;
	if (lock->l_ast_data == data)
		set = 1;

	spin_unlock(&osc_ast_guard);
	unlock_res_and_lock(lock);

	return set;
}
static int osc_set_data_with_check(struct lustre_handle *lockh,
				   struct ldlm_enqueue_info *einfo)
{
	struct ldlm_lock *lock = ldlm_handle2lock(lockh);
	int set = 0;

	if (lock != NULL) {
		set = osc_set_lock_data_with_check(lock, einfo);
		LDLM_LOCK_PUT(lock);
	} else {
		CERROR("lockh %p, data %p - client evicted?\n",
		       lockh, einfo->ei_cbdata);
	}
	return set;
}
static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
			     ldlm_iterator_t replace, void *data)
{
	struct ldlm_res_id res_id;
	struct obd_device *obd = class_exp2obd(exp);

	ostid_build_res_name(&lsm->lsm_oi, &res_id);
	ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
	return 0;
}
/* Find any ldlm lock of the inode in osc.
 * Return 0 if no lock is found, 1 if one is found, and < 0 on error. */
static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
			   ldlm_iterator_t replace, void *data)
{
	struct ldlm_res_id res_id;
	struct obd_device *obd = class_exp2obd(exp);
	int rc = 0;

	ostid_build_res_name(&lsm->lsm_oi, &res_id);
	rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
	if (rc == LDLM_ITER_STOP)
		return 1;
	if (rc == LDLM_ITER_CONTINUE)
		return 0;
	return rc;
}
static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
			    obd_enqueue_update_f upcall, void *cookie,
			    __u64 *flags, int agl, int rc)
{
	int intent = *flags & LDLM_FL_HAS_INTENT;
	ENTRY;

	/* The request was created before the ldlm_cli_enqueue call. */
	if (rc == ELDLM_LOCK_ABORTED) {
		struct ldlm_reply *rep;
		rep = req_capsule_server_get(&req->rq_pill,
					     &RMF_DLM_REP);

		LASSERT(rep != NULL);
		rep->lock_policy_res1 =
			ptlrpc_status_ntoh(rep->lock_policy_res1);
		if (rep->lock_policy_res1)
			rc = rep->lock_policy_res1;
	}

	if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
	    (rc == 0)) {
		*flags |= LDLM_FL_LVB_READY;
		CDEBUG(D_INODE, "got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
		       lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
	}

	/* Call the update callback. */
	rc = (*upcall)(cookie, rc);
	RETURN(rc);
}
static int osc_enqueue_interpret(const struct lu_env *env,
				 struct ptlrpc_request *req,
				 struct osc_enqueue_args *aa, int rc)
{
	struct ldlm_lock *lock;
	struct lustre_handle handle;
	__u32 mode;
	struct ost_lvb *lvb;
	__u32 lvb_len;
	__u64 *flags = aa->oa_flags;

	/* Make a local copy of the lock handle and mode, because aa->oa_*
	 * might be freed anytime after the lock upcall has been called. */
	lustre_handle_copy(&handle, aa->oa_lockh);
	mode = aa->oa_ei->ei_mode;

	/* ldlm_cli_enqueue is holding a reference on the lock, so it must
	 * be valid. */
	lock = ldlm_handle2lock(&handle);

	/* Take an additional reference so that a blocking AST that
	 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
	 * to arrive after an upcall has been executed by
	 * osc_enqueue_fini(). */
	ldlm_lock_addref(&handle, mode);

	/* Let the CP AST grant the lock first. */
	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);

	if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
		lvb = NULL;
		lvb_len = 0;
	} else {
		lvb = aa->oa_lvb;
		lvb_len = sizeof(*aa->oa_lvb);
	}

	/* Complete obtaining the lock procedure. */
	rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
				   mode, flags, lvb, lvb_len, &handle, rc);
	/* Complete osc stuff. */
	rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
			      flags, aa->oa_agl, rc);

	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);

	/* Release the lock for async request. */
	if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
		/*
		 * Releases a reference taken by ldlm_cli_enqueue(), if it is
		 * not already released by
		 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
		 */
		ldlm_lock_decref(&handle, mode);

	LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
		 aa->oa_lockh, req, aa);
	ldlm_lock_decref(&handle, mode);
	LDLM_LOCK_PUT(lock);
	RETURN(rc);
}
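
/* A note on the rss/kms terms in the debug messages below: rss is the object
 * size carried in the lock value block (lvb_size), while kms is the client's
 * "known minimum size". kms may only be extended to the end of the extent
 * actually covered by the lock, hence the clamp against l_extent.end + 1 in
 * osc_update_enqueue(). */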
void osc_update_enqueue(struct lustre_handle *lov_lockhp,
			struct lov_oinfo *loi, __u64 flags,
			struct ost_lvb *lvb, __u32 mode, int rc)
{
	struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);

	if (rc == ELDLM_OK) {
		__u64 tmp;

		LASSERT(lock != NULL);
		loi->loi_lvb = *lvb;
		tmp = loi->loi_lvb.lvb_size;
		/* Extend KMS up to the end of this lock and no further.
		 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
		if (tmp > lock->l_policy_data.l_extent.end)
			tmp = lock->l_policy_data.l_extent.end + 1;
		if (tmp >= loi->loi_kms) {
			LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
				   ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
			loi_kms_set(loi, tmp);
		} else {
			LDLM_DEBUG(lock, "lock acquired, setting rss="
				   LPU64"; leaving kms="LPU64", end="LPU64,
				   loi->loi_lvb.lvb_size, loi->loi_kms,
				   lock->l_policy_data.l_extent.end);
		}
		ldlm_lock_allow_match(lock);
	} else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
		LASSERT(lock != NULL);
		loi->loi_lvb = *lvb;
		ldlm_lock_allow_match(lock);
		CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
		       " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
		rc = ELDLM_OK;
	}

	if (lock != NULL) {
		if (rc != ELDLM_OK)
			ldlm_lock_fail_match(lock);

		LDLM_LOCK_PUT(lock);
	}
}
EXPORT_SYMBOL(osc_update_enqueue);
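
/* PTLRPCD_SET below is a sentinel, not a real request set: callers of
 * osc_enqueue_base() pass it as @rqset to ask that the enqueue request be
 * handed straight to the ptlrpcd daemons instead of a caller-owned set.
 * A rough sketch of a hypothetical async caller (the real ones live in the
 * osc_lock code):
 *
 *	rc = osc_enqueue_base(exp, &res_id, &flags, &policy, &lvb, kms_valid,
 *			      upcall, cookie, einfo, &lockh,
 *			      PTLRPCD_SET, 1, 0);	- async = 1, agl = 0
 */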
struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;

/* When enqueuing asynchronously, locks are not ordered: we can obtain a lock
 * from the 2nd OSC before a lock from the 1st one. This does not deadlock
 * with other synchronous requests, but holding some locks while trying to
 * obtain others may take a considerable amount of time in case of OST
 * failure; and a client that does not release locks that other sync requests
 * are waiting on is excluded from the cluster -- such scenarios make life
 * difficult, so release locks just after they are obtained. */
int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
		     __u64 *flags, ldlm_policy_data_t *policy,
		     struct ost_lvb *lvb, int kms_valid,
		     obd_enqueue_update_f upcall, void *cookie,
		     struct ldlm_enqueue_info *einfo,
		     struct lustre_handle *lockh,
		     struct ptlrpc_request_set *rqset, int async, int agl)
{
	struct obd_device *obd = exp->exp_obd;
	struct ptlrpc_request *req = NULL;
	int intent = *flags & LDLM_FL_HAS_INTENT;
	__u64 match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
	ldlm_mode_t mode;
	int rc;
	ENTRY;

	/* Filesystem lock extents are extended to page boundaries so that
	 * dealing with the page cache is a little smoother. */
	policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
	policy->l_extent.end |= ~CFS_PAGE_MASK;

	/*
	 * kms is not valid when either object is completely fresh (so that no
	 * locks are cached), or object was evicted. In the latter case cached
	 * lock cannot be used, because it would prime inode state with
	 * potentially stale LVB.
	 */
	if (!kms_valid)
		goto no_match;
	/* Next, search for already existing extent locks that will cover us */
	/* If we're trying to read, we also search for an existing PW lock. The
	 * VFS and page cache already protect us locally, so lots of readers/
	 * writers can share a single PW lock.
	 *
	 * There are problems with conversion deadlocks, so instead of
	 * converting a read lock to a write lock, we'll just enqueue a new
	 * one.
	 *
	 * At some point we should cancel the read lock instead of making them
	 * send us a blocking callback, but there are problems with canceling
	 * locks out from other users right now, too. */
	mode = einfo->ei_mode;
	if (einfo->ei_mode == LCK_PR)
		mode |= LCK_PW;
	mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
			       einfo->ei_type, policy, mode, lockh, 0);
	if (mode) {
		struct ldlm_lock *matched = ldlm_handle2lock(lockh);

		if ((agl != 0) && !ldlm_is_lvb_ready(matched)) {
			/* For AGL, if an enqueue RPC was sent but the lock
			 * was not granted, then skip processing this stripe.
			 * Return -ECANCELED to tell the caller. */
			ldlm_lock_decref(lockh, mode);
			LDLM_LOCK_PUT(matched);
			RETURN(-ECANCELED);
		} else if (osc_set_lock_data_with_check(matched, einfo)) {
			*flags |= LDLM_FL_LVB_READY;
			/* addref the lock only if not async requests and PW
			 * lock is matched whereas we asked for PR. */
			if (!rqset && einfo->ei_mode != mode)
				ldlm_lock_addref(lockh, LCK_PR);
			if (intent) {
				/* I would like to be able to ASSERT here that
				 * rss <= kms, but I can't, for reasons which
				 * are explained in lov_enqueue() */
			}

			/* We already have a lock, and it's referenced.
			 *
			 * At this point, the cl_lock::cll_state is CLS_QUEUING,
			 * AGL upcall may change it to CLS_HELD directly. */
			(*upcall)(cookie, ELDLM_OK);

			if (einfo->ei_mode != mode)
				ldlm_lock_decref(lockh, LCK_PW);
			else if (rqset)
				/* For async requests, decref the lock. */
				ldlm_lock_decref(lockh, einfo->ei_mode);
			LDLM_LOCK_PUT(matched);
			RETURN(ELDLM_OK);
		} else {
			ldlm_lock_decref(lockh, mode);
			LDLM_LOCK_PUT(matched);
		}
	}

no_match:
	if (intent) {
		CFS_LIST_HEAD(cancels);
		req = ptlrpc_request_alloc(class_exp2cliimp(exp),
					   &RQF_LDLM_ENQUEUE_LVB);
		if (req == NULL)
			RETURN(-ENOMEM);

		rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
		if (rc) {
			ptlrpc_request_free(req);
			RETURN(rc);
		}

		req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
				     sizeof(*lvb));
		ptlrpc_request_set_replen(req);
	}
	/* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
	*flags &= ~LDLM_FL_BLOCK_GRANTED;

	rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
			      sizeof(*lvb), LVB_T_OST, lockh, async);
	if (rqset) {
		if (!rc) {
			struct osc_enqueue_args *aa;
			CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
			aa = ptlrpc_req_async_args(req);
			aa->oa_ei = einfo;
			aa->oa_exp = exp;
			aa->oa_flags = flags;
			aa->oa_upcall = upcall;
			aa->oa_cookie = cookie;
			aa->oa_lvb = lvb;
			aa->oa_lockh = lockh;
			aa->oa_agl = !!agl;

			req->rq_interpret_reply =
				(ptlrpc_interpterer_t)osc_enqueue_interpret;
			if (rqset == PTLRPCD_SET)
				ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
			else
				ptlrpc_set_add_req(rqset, req);
		} else if (intent) {
			ptlrpc_req_finished(req);
		}
		RETURN(rc);
	}

	rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
	if (intent)
		ptlrpc_req_finished(req);

	RETURN(rc);
}
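
/* osc_enqueue() below is the obd_ops wrapper: it unpacks struct obd_info
 * into the long argument list of osc_enqueue_base() and treats a non-NULL
 * @rqset as a request for asynchronous enqueue. */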
static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
		       struct ldlm_enqueue_info *einfo,
		       struct ptlrpc_request_set *rqset)
{
	struct ldlm_res_id res_id;
	int rc;
	ENTRY;

	ostid_build_res_name(&oinfo->oi_md->lsm_oi, &res_id);
	rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
			      &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
			      oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
			      oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
			      rqset, rqset != NULL, 0);
	RETURN(rc);
}
int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
		   __u32 type, ldlm_policy_data_t *policy, __u32 mode,
		   __u64 *flags, void *data, struct lustre_handle *lockh,
		   int unref)
{
	struct obd_device *obd = exp->exp_obd;
	__u64 lflags = *flags;
	ldlm_mode_t rc;
	ENTRY;

	if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
		RETURN(-EIO);

	/* Filesystem lock extents are extended to page boundaries so that
	 * dealing with the page cache is a little smoother */
	policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
	policy->l_extent.end |= ~CFS_PAGE_MASK;

	/* Next, search for already existing extent locks that will cover us */
	/* If we're trying to read, we also search for an existing PW lock. The
	 * VFS and page cache already protect us locally, so lots of readers/
	 * writers can share a single PW lock. */
	rc = mode;
	if (mode == LCK_PR)
		rc |= LCK_PW;
	rc = ldlm_lock_match(obd->obd_namespace, lflags,
			     res_id, type, policy, rc, lockh, unref);
	if (rc) {
		if (data != NULL) {
			if (!osc_set_data_with_check(lockh, data)) {
				if (!(lflags & LDLM_FL_TEST_LOCK))
					ldlm_lock_decref(lockh, rc);
				RETURN(0);
			}
		}
		if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
			ldlm_lock_addref(lockh, LCK_PR);
			ldlm_lock_decref(lockh, LCK_PW);
		}
		RETURN(rc);
	}
	RETURN(rc);
}
int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
{
	ENTRY;

	if (unlikely(mode == LCK_GROUP))
		ldlm_lock_decref_and_cancel(lockh, mode);
	else
		ldlm_lock_decref(lockh, mode);

	RETURN(0);
}

static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
		      __u32 mode, struct lustre_handle *lockh)
{
	ENTRY;
	RETURN(osc_cancel_base(lockh, mode));
}
static int osc_cancel_unused(struct obd_export *exp,
			     struct lov_stripe_md *lsm,
			     ldlm_cancel_flags_t flags,
			     void *opaque)
{
	struct obd_device *obd = class_exp2obd(exp);
	struct ldlm_res_id res_id, *resp = NULL;

	if (lsm != NULL) {
		ostid_build_res_name(&lsm->lsm_oi, &res_id);
		resp = &res_id;
	}

	return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
}
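
/* The two statfs paths below (async, for request sets, and sync) build the
 * same OST_STATFS RPC. OBD_STATFS_NODELAY marks requests originating from
 * procfs reads, which must not block on recovery or resend: they are set up
 * to fail fast instead of deadlocking. */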
static int osc_statfs_interpret(const struct lu_env *env,
				struct ptlrpc_request *req,
				struct osc_async_args *aa, int rc)
{
	struct obd_statfs *msfs;
	ENTRY;

	if (rc == -EBADR)
		/* The request has in fact never been sent due to issues at
		 * a higher level (LOV). Exit immediately since the caller
		 * is aware of the problem and takes care of the cleanup. */
		RETURN(rc);

	if ((rc == -ENOTCONN || rc == -EAGAIN) &&
	    (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
		GOTO(out, rc = 0);

	if (rc != 0)
		GOTO(out, rc);

	msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
	if (msfs == NULL)
		GOTO(out, rc = -EPROTO);

	*aa->aa_oi->oi_osfs = *msfs;
out:
	rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
	RETURN(rc);
}
static int osc_statfs_async(struct obd_export *exp,
			    struct obd_info *oinfo, __u64 max_age,
			    struct ptlrpc_request_set *rqset)
{
	struct obd_device *obd = class_exp2obd(exp);
	struct ptlrpc_request *req;
	struct osc_async_args *aa;
	int rc;
	ENTRY;

	/* We could possibly pass max_age in the request (as an absolute
	 * timestamp or a "seconds.usec ago") so the target can avoid doing
	 * extra calls into the filesystem if that isn't necessary (e.g.
	 * during mount that would help a bit). Having relative timestamps
	 * is not so great if request processing is slow, while absolute
	 * timestamps are not ideal because they need time synchronization. */
	req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
	if (req == NULL)
		RETURN(-ENOMEM);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}
	ptlrpc_request_set_replen(req);
	req->rq_request_portal = OST_CREATE_PORTAL;
	ptlrpc_at_set_req_timeout(req);

	if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
		/* procfs requests must not block waiting for recovery,
		 * to avoid a deadlock */
		req->rq_no_resend = 1;
		req->rq_no_delay = 1;
	}

	req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
	CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
	aa = ptlrpc_req_async_args(req);
	aa->aa_oi = oinfo;

	ptlrpc_set_add_req(rqset, req);
	RETURN(0);
}
static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
		      struct obd_statfs *osfs, __u64 max_age, __u32 flags)
{
	struct obd_device *obd = class_exp2obd(exp);
	struct obd_statfs *msfs;
	struct ptlrpc_request *req;
	struct obd_import *imp = NULL;
	int rc;
	ENTRY;

	/* Since the request might also come from lprocfs, we need to sync
	 * this with client_disconnect_export() (bug 15684). */
	down_read(&obd->u.cli.cl_sem);
	if (obd->u.cli.cl_import)
		imp = class_import_get(obd->u.cli.cl_import);
	up_read(&obd->u.cli.cl_sem);
	if (!imp)
		RETURN(-ENODEV);

	/* We could possibly pass max_age in the request (as an absolute
	 * timestamp or a "seconds.usec ago") so the target can avoid doing
	 * extra calls into the filesystem if that isn't necessary (e.g.
	 * during mount that would help a bit). Having relative timestamps
	 * is not so great if request processing is slow, while absolute
	 * timestamps are not ideal because they need time synchronization. */
	req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);

	class_import_put(imp);

	if (req == NULL)
		RETURN(-ENOMEM);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}
	ptlrpc_request_set_replen(req);
	req->rq_request_portal = OST_CREATE_PORTAL;
	ptlrpc_at_set_req_timeout(req);

	if (flags & OBD_STATFS_NODELAY) {
		/* procfs requests must not block waiting for recovery,
		 * to avoid a deadlock */
		req->rq_no_resend = 1;
		req->rq_no_delay = 1;
	}

	rc = ptlrpc_queue_wait(req);
	if (rc)
		GOTO(out, rc);

	msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
	if (msfs == NULL)
		GOTO(out, rc = -EPROTO);

	*osfs = *msfs;

	EXIT;
out:
	ptlrpc_req_finished(req);
	return rc;
}
/* Retrieve object striping information.
 *
 * @lump is a pointer to an in-core struct with lmm_ost_count indicating
 * the maximum number of OST indices which will fit in the user buffer.
 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
 */
static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
{
	/* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
	struct lov_user_md_v3 lum, *lumk;
	struct lov_user_ost_data_v1 *lmm_objects;
	int rc = 0, lum_size;
	ENTRY;

	if (!lsm)
		RETURN(-ENODATA);

	/* we only need the header part from user space to get lmm_magic and
	 * lmm_stripe_count (the header part is common to v1 and v3) */
	lum_size = sizeof(struct lov_user_md_v1);
	if (copy_from_user(&lum, lump, lum_size))
		RETURN(-EFAULT);

	if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
	    (lum.lmm_magic != LOV_USER_MAGIC_V3))
		RETURN(-EINVAL);

	/* lov_user_md_vX and lov_mds_md_vX must have the same size */
	LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
	LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
	LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));

	/* we can use lov_mds_md_size() to compute lum_size
	 * because lov_user_md_vX and lov_mds_md_vX have the same size */
	if (lum.lmm_stripe_count > 0) {
		lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
		OBD_ALLOC(lumk, lum_size);
		if (!lumk)
			RETURN(-ENOMEM);

		if (lum.lmm_magic == LOV_USER_MAGIC_V1)
			lmm_objects =
			    &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
		else
			lmm_objects = &(lumk->lmm_objects[0]);
		lmm_objects->l_ost_oi = lsm->lsm_oi;
	} else {
		lum_size = lov_mds_md_size(0, lum.lmm_magic);
		lumk = &lum;
	}

	lumk->lmm_oi = lsm->lsm_oi;
	lumk->lmm_stripe_count = 1;

	if (copy_to_user(lump, lumk, lum_size))
		rc = -EFAULT;

	if (lumk != &lum)
		OBD_FREE(lumk, lum_size);

	RETURN(rc);
}
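
/* osc_getstripe() above is reached through the LL_IOC_LOV_GETSTRIPE case of
 * osc_iocontrol() below, with @karg as the in-kernel lov_stripe_md and
 * @uarg as the user-space lov_user_md buffer. */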
static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
			 void *karg, void *uarg)
{
	struct obd_device *obd = exp->exp_obd;
	struct obd_ioctl_data *data = karg;
	int err = 0;
	ENTRY;

	if (!try_module_get(THIS_MODULE)) {
		CERROR("Can't get module. Is it alive?");
		return -EINVAL;
	}
	switch (cmd) {
	case OBD_IOC_LOV_GET_CONFIG: {
		char *buf;
		struct lov_desc *desc;
		struct obd_uuid uuid;

		buf = NULL;
		len = 0;
		if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
			GOTO(out, err = -EINVAL);

		data = (struct obd_ioctl_data *)buf;

		if (sizeof(*desc) > data->ioc_inllen1) {
			obd_ioctl_freedata(buf, len);
			GOTO(out, err = -EINVAL);
		}

		if (data->ioc_inllen2 < sizeof(uuid)) {
			obd_ioctl_freedata(buf, len);
			GOTO(out, err = -EINVAL);
		}

		desc = (struct lov_desc *)data->ioc_inlbuf1;
		desc->ld_tgt_count = 1;
		desc->ld_active_tgt_count = 1;
		desc->ld_default_stripe_count = 1;
		desc->ld_default_stripe_size = 0;
		desc->ld_default_stripe_offset = 0;
		desc->ld_pattern = 0;
		memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));

		memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));

		err = copy_to_user((void *)uarg, buf, len);
		if (err)
			err = -EFAULT;
		obd_ioctl_freedata(buf, len);
		GOTO(out, err);
	}
	case LL_IOC_LOV_SETSTRIPE:
		err = obd_alloc_memmd(exp, karg);
		if (err > 0)
			err = 0;
		GOTO(out, err);
	case LL_IOC_LOV_GETSTRIPE:
		err = osc_getstripe(karg, uarg);
		GOTO(out, err);
	case OBD_IOC_CLIENT_RECOVER:
		err = ptlrpc_recover_import(obd->u.cli.cl_import,
					    data->ioc_inlbuf1, 0);
		if (err > 0)
			err = 0;
		GOTO(out, err);
	case IOC_OSC_SET_ACTIVE:
		err = ptlrpc_set_import_active(obd->u.cli.cl_import,
					       data->ioc_offset);
		GOTO(out, err);
	case OBD_IOC_POLL_QUOTACHECK:
		err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
		GOTO(out, err);
	case OBD_IOC_PING_TARGET:
		err = ptlrpc_obd_ping(obd);
		GOTO(out, err);
	default:
		CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
		       cmd, current_comm());
		GOTO(out, err = -ENOTTY);
	}
out:
	module_put(THIS_MODULE);
	return err;
}
static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
			obd_count keylen, void *key, __u32 *vallen, void *val,
			struct lov_stripe_md *lsm)
{
	ENTRY;
	if (!vallen || !val)
		RETURN(-EFAULT);

	if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
		__u32 *stripe = val;
		*vallen = sizeof(*stripe);
		*stripe = 0;
		RETURN(0);
	} else if (KEY_IS(KEY_LAST_ID)) {
		struct ptlrpc_request *req;
		obd_id *reply;
		char *tmp;
		int rc;

		req = ptlrpc_request_alloc(class_exp2cliimp(exp),
					   &RQF_OST_GET_INFO_LAST_ID);
		if (req == NULL)
			RETURN(-ENOMEM);

		req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
				     RCL_CLIENT, keylen);
		rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
		if (rc) {
			ptlrpc_request_free(req);
			RETURN(rc);
		}

		tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
		memcpy(tmp, key, keylen);

		req->rq_no_delay = req->rq_no_resend = 1;
		ptlrpc_request_set_replen(req);
		rc = ptlrpc_queue_wait(req);
		if (rc)
			GOTO(out, rc);

		reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
		if (reply == NULL)
			GOTO(out, rc = -EPROTO);

		*((obd_id *)val) = *reply;
out:
		ptlrpc_req_finished(req);
		RETURN(rc);
	} else if (KEY_IS(KEY_FIEMAP)) {
		struct ll_fiemap_info_key *fm_key =
				(struct ll_fiemap_info_key *)key;
		struct ldlm_res_id res_id;
		ldlm_policy_data_t policy;
		struct lustre_handle lockh;
		ldlm_mode_t mode = 0;
		struct ptlrpc_request *req;
		struct ll_user_fiemap *reply;
		char *tmp;
		int rc;

		if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC))
			goto skip_locking;

		policy.l_extent.start = fm_key->fiemap.fm_start &
					CFS_PAGE_MASK;

		if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
		    fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1)
			policy.l_extent.end = OBD_OBJECT_EOF;
		else
			policy.l_extent.end = (fm_key->fiemap.fm_start +
					       fm_key->fiemap.fm_length +
					       PAGE_CACHE_SIZE - 1) &
					      CFS_PAGE_MASK;

		ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
		mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
				       LDLM_FL_BLOCK_GRANTED |
				       LDLM_FL_LVB_READY,
				       &res_id, LDLM_EXTENT, &policy,
				       LCK_PR | LCK_PW, &lockh, 0);
		if (mode) { /* lock is cached on client */
			if (mode != LCK_PR) {
				ldlm_lock_addref(&lockh, LCK_PR);
				ldlm_lock_decref(&lockh, LCK_PW);
			}
		} else { /* no cached lock, need to acquire it on the server */
			fm_key->oa.o_valid |= OBD_MD_FLFLAGS;
			fm_key->oa.o_flags |= OBD_FL_SRVLOCK;
		}

skip_locking:
		req = ptlrpc_request_alloc(class_exp2cliimp(exp),
					   &RQF_OST_GET_INFO_FIEMAP);
		if (req == NULL)
			GOTO(drop_lock, rc = -ENOMEM);

		req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
				     RCL_CLIENT, keylen);
		req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
				     RCL_CLIENT, *vallen);
		req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
				     RCL_SERVER, *vallen);

		rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
		if (rc) {
			ptlrpc_request_free(req);
			GOTO(drop_lock, rc);
		}

		tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
		memcpy(tmp, key, keylen);