X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fmdc%2Fmdc_locks.c;h=5b11d5d1eda35652d0c8cf98d80a6ab6359d5a19;hp=580a0589d144662fa563a71e8fbc3c488b6125f3;hb=f843facff59226d3788d855d1d6948523ab8d944;hpb=997a9f659e01b807997641a321241369a27c4293 diff --git a/lustre/mdc/mdc_locks.c b/lustre/mdc/mdc_locks.c index 580a058..5b11d5d 100644 --- a/lustre/mdc/mdc_locks.c +++ b/lustre/mdc/mdc_locks.c @@ -15,11 +15,7 @@ * * You should have received a copy of the GNU General Public License * version 2 along with this program; If not, see - * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf - * - * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, - * CA 95054 USA or visit www.sun.com if you need additional information or - * have any questions. + * http://www.gnu.org/licenses/gpl-2.0.html * * GPL HEADER END */ @@ -27,7 +23,7 @@ * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2011, 2013, Intel Corporation. + * Copyright (c) 2011, 2017, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -36,73 +32,72 @@ #define DEBUG_SUBSYSTEM S_MDC -#ifdef __KERNEL__ -# include -# include -#else -# include -#endif +#include #include #include #include -#include /* fid_res_name_eq() */ +#include +#include #include #include #include +#include +#include + #include "mdc_internal.h" struct mdc_getattr_args { - struct obd_export *ga_exp; - struct md_enqueue_info *ga_minfo; - struct ldlm_enqueue_info *ga_einfo; + struct obd_export *ga_exp; + struct md_enqueue_info *ga_minfo; }; int it_open_error(int phase, struct lookup_intent *it) { if (it_disposition(it, DISP_OPEN_LEASE)) { if (phase >= DISP_OPEN_LEASE) - return it->d.lustre.it_status; + return it->it_status; + else + return 0; + } + if (it_disposition(it, DISP_OPEN_OPEN)) { + if (phase >= DISP_OPEN_OPEN) + return it->it_status; else return 0; } - if (it_disposition(it, DISP_OPEN_OPEN)) { - if (phase >= DISP_OPEN_OPEN) - return it->d.lustre.it_status; - else - return 0; - } - if (it_disposition(it, DISP_OPEN_CREATE)) { - if (phase >= DISP_OPEN_CREATE) - return it->d.lustre.it_status; - else - return 0; - } + if (it_disposition(it, DISP_OPEN_CREATE)) { + if (phase >= DISP_OPEN_CREATE) + return it->it_status; + else + return 0; + } - if (it_disposition(it, DISP_LOOKUP_EXECD)) { - if (phase >= DISP_LOOKUP_EXECD) - return it->d.lustre.it_status; - else - return 0; - } + if (it_disposition(it, DISP_LOOKUP_EXECD)) { + if (phase >= DISP_LOOKUP_EXECD) + return it->it_status; + else + return 0; + } + + if (it_disposition(it, DISP_IT_EXECD)) { + if (phase >= DISP_IT_EXECD) + return it->it_status; + else + return 0; + } + + CERROR("it disp: %X, status: %d\n", it->it_disposition, it->it_status); + LBUG(); - if (it_disposition(it, DISP_IT_EXECD)) { - if (phase >= DISP_IT_EXECD) - return it->d.lustre.it_status; - else - return 0; - } - CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition, - it->d.lustre.it_status); - LBUG(); return 0; } EXPORT_SYMBOL(it_open_error); /* this must be called on a lockh that is known to have a referenced lock */ -int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data, - __u64 *bits) +int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh, + void *data, __u64 *bits) { struct ldlm_lock *lock; struct inode *new_inode = data; @@ -111,14 +106,13 @@ int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data, if(bits) *bits = 0; - if (!*lockh) - RETURN(0); + if (!lustre_handle_is_used(lockh)) + RETURN(0); - lock = ldlm_handle2lock((struct lustre_handle *)lockh); + lock = ldlm_handle2lock(lockh); LASSERT(lock != NULL); lock_res_and_lock(lock); -#ifdef __KERNEL__ if (lock->l_resource->lr_lvb_inode && lock->l_resource->lr_lvb_inode != data) { struct inode *old_inode = lock->l_resource->lr_lvb_inode; @@ -129,7 +123,6 @@ int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data, old_inode->i_state, new_inode, new_inode->i_ino, new_inode->i_generation); } -#endif lock->l_resource->lr_lvb_inode = new_inode; if (bits) *bits = lock->l_policy_data.l_inodebits.bits; @@ -140,13 +133,13 @@ int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data, RETURN(0); } -ldlm_mode_t mdc_lock_match(struct obd_export *exp, __u64 flags, - const struct lu_fid *fid, ldlm_type_t type, - ldlm_policy_data_t *policy, ldlm_mode_t mode, - struct lustre_handle *lockh) +enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags, + const struct lu_fid *fid, enum ldlm_type type, + union ldlm_policy_data *policy, + enum ldlm_mode mode, struct lustre_handle *lockh) { struct ldlm_res_id res_id; - ldlm_mode_t rc; + enum ldlm_mode rc; ENTRY; fid_build_reg_res_name(fid, &res_id); @@ -157,23 +150,20 @@ ldlm_mode_t mdc_lock_match(struct obd_export *exp, __u64 flags, RETURN(rc); } -int mdc_cancel_unused(struct obd_export *exp, - const struct lu_fid *fid, - ldlm_policy_data_t *policy, - ldlm_mode_t mode, - ldlm_cancel_flags_t flags, - void *opaque) +int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid, + union ldlm_policy_data *policy, enum ldlm_mode mode, + enum ldlm_cancel_flags flags, void *opaque) { - struct ldlm_res_id res_id; - struct obd_device *obd = class_exp2obd(exp); - int rc; + struct obd_device *obd = class_exp2obd(exp); + struct ldlm_res_id res_id; + int rc; - ENTRY; + ENTRY; - fid_build_reg_res_name(fid, &res_id); - rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id, - policy, mode, flags, opaque); - RETURN(rc); + fid_build_reg_res_name(fid, &res_id); + rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id, + policy, mode, flags, opaque); + RETURN(rc); } int mdc_null_inode(struct obd_export *exp, @@ -200,28 +190,6 @@ int mdc_null_inode(struct obd_export *exp, RETURN(0); } -/* find any ldlm lock of the inode in mdc - * return 0 not find - * 1 find one - * < 0 error */ -int mdc_find_cbdata(struct obd_export *exp, - const struct lu_fid *fid, - ldlm_iterator_t it, void *data) -{ - struct ldlm_res_id res_id; - int rc = 0; - ENTRY; - - fid_build_reg_res_name((struct lu_fid*)fid, &res_id); - rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id, - it, data); - if (rc == LDLM_ITER_STOP) - RETURN(1); - else if (rc == LDLM_ITER_CONTINUE) - RETURN(0); - RETURN(rc); -} - static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc) { /* Don't hold error requests for replay. */ @@ -247,49 +215,70 @@ static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc) * original open if the MDS crashed just when this client also OOM'd) * but this is incredibly unlikely, and questionable whether the client * could do MDS recovery under OOM anyways... */ -static void mdc_realloc_openmsg(struct ptlrpc_request *req, - struct mdt_body *body) +int mdc_save_lovea(struct ptlrpc_request *req, + const struct req_msg_field *field, + void *data, u32 size) { - int rc; + struct req_capsule *pill = &req->rq_pill; + struct lov_user_md *lmm; + int rc = 0; - /* FIXME: remove this explicit offset. */ - rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4, - body->mbo_eadatasize); - if (rc) { - CERROR("Can't enlarge segment %d size to %d\n", - DLM_INTENT_REC_OFF + 4, body->mbo_eadatasize); - body->mbo_valid &= ~OBD_MD_FLEASIZE; - body->mbo_eadatasize = 0; + if (req_capsule_get_size(pill, field, RCL_CLIENT) < size) { + rc = sptlrpc_cli_enlarge_reqbuf(req, field, size); + if (rc) { + CERROR("%s: Can't enlarge ea size to %d: rc = %d\n", + req->rq_export->exp_obd->obd_name, + size, rc); + return rc; + } + } else { + req_capsule_shrink(pill, field, size, RCL_CLIENT); + } + + req_capsule_set_size(pill, field, RCL_CLIENT, size); + lmm = req_capsule_client_get(pill, field); + if (lmm) { + memcpy(lmm, data, size); + /* overwrite layout generation returned from the MDS */ + lmm->lmm_stripe_offset = + (typeof(lmm->lmm_stripe_offset))LOV_OFFSET_DEFAULT; } + + return rc; } static struct ptlrpc_request * mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it, - struct md_op_data *op_data) + struct md_op_data *op_data, __u32 acl_bufsize) { struct ptlrpc_request *req; struct obd_device *obddev = class_exp2obd(exp); struct ldlm_intent *lit; const void *lmm = op_data->op_data; - int lmmsize = op_data->op_data_size; + __u32 lmmsize = op_data->op_data_size; + __u32 mdt_md_capsule_size; struct list_head cancels = LIST_HEAD_INIT(cancels); int count = 0; - int mode; + enum ldlm_mode mode; int rc; - ENTRY; + int repsize, repsize_estimate; + + ENTRY; + + mdt_md_capsule_size = obddev->u.cli.cl_default_mds_easize; - it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG; + it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG; - /* XXX: openlock is not cancelled for cross-refs. */ - /* If inode is known, cancel conflicting OPEN locks. */ + /* XXX: openlock is not cancelled for cross-refs. */ + /* If inode is known, cancel conflicting OPEN locks. */ if (fid_is_sane(&op_data->op_fid2)) { if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */ - if (it->it_flags & FMODE_WRITE) + if (it->it_flags & MDS_FMODE_WRITE) mode = LCK_EX; else mode = LCK_PR; } else { - if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC)) + if (it->it_flags & (MDS_FMODE_WRITE | MDS_OPEN_TRUNC)) mode = LCK_CW; #ifdef FMODE_EXEC else if (it->it_flags & FMODE_EXEC) @@ -319,16 +308,33 @@ mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it, RETURN(ERR_PTR(-ENOMEM)); } - /* parent capability */ - mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1); - /* child capability, reserve the size according to parent capa, it will - * be filled after we get the reply */ - mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1); - req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT, op_data->op_namelen + 1); - req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, + if (cl_is_lov_delay_create(it->it_flags)) { + /* open(O_LOV_DELAY_CREATE) won't pack lmm */ + LASSERT(lmmsize == 0); + req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0); + } else { + req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, max(lmmsize, obddev->u.cli.cl_default_mds_easize)); + } + + req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME, + RCL_CLIENT, op_data->op_file_secctx_name != NULL ? + op_data->op_file_secctx_name_size : 0); + + req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT, + op_data->op_file_secctx_size); + + /* get SELinux policy info if any */ + rc = sptlrpc_get_sepol(req); + if (rc < 0) { + ptlrpc_request_free(req); + RETURN(ERR_PTR(rc)); + } + req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT, + strlen(req->rq_sepol) ? + strlen(req->rq_sepol) + 1 : 0); rc = ldlm_prep_enqueue_req(exp, req, &cancels, count); if (rc < 0) { @@ -349,16 +355,78 @@ mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it, lmmsize); req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, - obddev->u.cli.cl_max_mds_easize); + mdt_md_capsule_size); + req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize); + + if (!(it->it_op & IT_CREAT) && it->it_op & IT_OPEN && + req_capsule_has_field(&req->rq_pill, &RMF_FILE_SECCTX_NAME, + RCL_CLIENT) && + op_data->op_file_secctx_name_size > 0 && + op_data->op_file_secctx_name != NULL) { + char *secctx_name; + + secctx_name = req_capsule_client_get(&req->rq_pill, + &RMF_FILE_SECCTX_NAME); + memcpy(secctx_name, op_data->op_file_secctx_name, + op_data->op_file_secctx_name_size); + req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, + RCL_SERVER, + obddev->u.cli.cl_max_mds_easize); + + CDEBUG(D_SEC, "packed '%.*s' as security xattr name\n", + op_data->op_file_secctx_name_size, + op_data->op_file_secctx_name); - /* for remote client, fetch remote perm for current user */ - if (client_is_remote(exp)) - req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, - sizeof(struct mdt_remote_perm)); - ptlrpc_request_set_replen(req); - return req; + } else { + req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, + RCL_SERVER, 0); + } + + /** + * Inline buffer for possible data from Data-on-MDT files. + */ + req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE, RCL_SERVER, + sizeof(struct niobuf_remote)); + ptlrpc_request_set_replen(req); + + /* Get real repbuf allocated size as rounded up power of 2 */ + repsize = size_roundup_power2(req->rq_replen + + lustre_msg_early_size()); + /* Estimate free space for DoM files in repbuf */ + repsize_estimate = repsize - (req->rq_replen - + mdt_md_capsule_size + + sizeof(struct lov_comp_md_v1) + + sizeof(struct lov_comp_md_entry_v1) + + lov_mds_md_size(0, LOV_MAGIC_V3)); + + if (repsize_estimate < obddev->u.cli.cl_dom_min_inline_repsize) { + repsize = obddev->u.cli.cl_dom_min_inline_repsize - + repsize_estimate + sizeof(struct niobuf_remote); + req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE, + RCL_SERVER, + sizeof(struct niobuf_remote) + repsize); + ptlrpc_request_set_replen(req); + CDEBUG(D_INFO, "Increase repbuf by %d bytes, total: %d\n", + repsize, req->rq_replen); + repsize = size_roundup_power2(req->rq_replen + + lustre_msg_early_size()); + } + /* The only way to report real allocated repbuf size to the server + * is the lm_repsize but it must be set prior buffer allocation itself + * due to security reasons - it is part of buffer used in signature + * calculation (see LU-11414). Therefore the saved size is predicted + * value as rq_replen rounded to the next higher power of 2. + * Such estimation is safe. Though the final allocated buffer might + * be even larger, it is not possible to know that at this point. + */ + req->rq_reqmsg->lm_repsize = repsize; + RETURN(req); } +#define GA_DEFAULT_EA_NAME_LEN 20 +#define GA_DEFAULT_EA_VAL_LEN 250 +#define GA_DEFAULT_EA_NUM 10 + static struct ptlrpc_request * mdc_intent_getxattr_pack(struct obd_export *exp, struct lookup_intent *it, @@ -366,8 +434,9 @@ mdc_intent_getxattr_pack(struct obd_export *exp, { struct ptlrpc_request *req; struct ldlm_intent *lit; - int rc, count = 0, maxdata; + int rc, count = 0; struct list_head cancels = LIST_HEAD_INIT(cancels); + u32 ea_vals_buf_size = GA_DEFAULT_EA_VAL_LEN * GA_DEFAULT_EA_NUM; ENTRY; @@ -376,7 +445,15 @@ mdc_intent_getxattr_pack(struct obd_export *exp, if (req == NULL) RETURN(ERR_PTR(-ENOMEM)); - mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1); + /* get SELinux policy info if any */ + rc = sptlrpc_get_sepol(req); + if (rc < 0) { + ptlrpc_request_free(req); + RETURN(ERR_PTR(rc)); + } + req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT, + strlen(req->rq_sepol) ? + strlen(req->rq_sepol) + 1 : 0); rc = ldlm_prep_enqueue_req(exp, req, &cancels, count); if (rc) { @@ -387,127 +464,135 @@ mdc_intent_getxattr_pack(struct obd_export *exp, /* pack the intent */ lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT); lit->opc = IT_GETXATTR; - - maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize; + CDEBUG(D_INFO, "%s: get xattrs for "DFID"\n", + exp->exp_obd->obd_name, PFID(&op_data->op_fid1)); + +#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(3, 0, 53, 0) + /* If the supplied buffer is too small then the server will + * return -ERANGE and llite will fallback to using non cached + * xattr operations. On servers before 2.10.1 a (non-cached) + * listxattr RPC for an orphan or dead file causes an oops. So + * let's try to avoid sending too small a buffer to too old a + * server. This is effectively undoing the memory conservation + * of LU-9417 when it would be *more* likely to crash the + * server. See LU-9856. */ + if (exp->exp_connect_data.ocd_version < OBD_OCD_VERSION(2, 10, 1, 0)) + ea_vals_buf_size = max_t(u32, ea_vals_buf_size, + exp->exp_connect_data.ocd_max_easize); +#endif /* pack the intended request */ - mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1, - op_data->op_valid, maxdata, -1, 0); - - req_capsule_set_size(&req->rq_pill, &RMF_EADATA, - RCL_SERVER, maxdata); - - req_capsule_set_size(&req->rq_pill, &RMF_EAVALS, - RCL_SERVER, maxdata); - - req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS, - RCL_SERVER, maxdata); - - ptlrpc_request_set_replen(req); - - RETURN(req); -} - -static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp, - struct lookup_intent *it, - struct md_op_data *op_data) -{ - struct ptlrpc_request *req; - struct obd_device *obddev = class_exp2obd(exp); - struct ldlm_intent *lit; - int rc; - ENTRY; + mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid, + ea_vals_buf_size, -1, 0); - req = ptlrpc_request_alloc(class_exp2cliimp(exp), - &RQF_LDLM_INTENT_UNLINK); - if (req == NULL) - RETURN(ERR_PTR(-ENOMEM)); + /* get SELinux policy info if any */ + mdc_file_sepol_pack(req); - mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1); - req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT, - op_data->op_namelen + 1); + req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER, + GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM); - rc = ldlm_prep_enqueue_req(exp, req, NULL, 0); - if (rc) { - ptlrpc_request_free(req); - RETURN(ERR_PTR(rc)); - } + req_capsule_set_size(&req->rq_pill, &RMF_EAVALS, RCL_SERVER, + ea_vals_buf_size); - /* pack the intent */ - lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT); - lit->opc = (__u64)it->it_op; + req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS, RCL_SERVER, + sizeof(u32) * GA_DEFAULT_EA_NUM); - /* pack the intended request */ - mdc_unlink_pack(req, op_data); + req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0); - req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, - obddev->u.cli.cl_default_mds_easize); - req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, - obddev->u.cli.cl_default_mds_cookiesize); ptlrpc_request_set_replen(req); + RETURN(req); } -static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp, - struct lookup_intent *it, - struct md_op_data *op_data) +static struct ptlrpc_request * +mdc_intent_getattr_pack(struct obd_export *exp, struct lookup_intent *it, + struct md_op_data *op_data, __u32 acl_bufsize) { struct ptlrpc_request *req; - struct obd_device *obddev = class_exp2obd(exp); - obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE | - OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA | - OBD_MD_FLMDSCAPA | OBD_MD_MEA | - (client_is_remote(exp) ? - OBD_MD_FLRMTPERM : OBD_MD_FLACL); - struct ldlm_intent *lit; - int rc; - int easize; + struct obd_device *obddev = class_exp2obd(exp); + u64 valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE | + OBD_MD_FLDIREA | OBD_MD_MEA | OBD_MD_FLACL | + OBD_MD_DEFAULT_MEA; + struct ldlm_intent *lit; + __u32 easize; + bool have_secctx = false; + int rc; + ENTRY; - req = ptlrpc_request_alloc(class_exp2cliimp(exp), - &RQF_LDLM_INTENT_GETATTR); - if (req == NULL) - RETURN(ERR_PTR(-ENOMEM)); + req = ptlrpc_request_alloc(class_exp2cliimp(exp), + &RQF_LDLM_INTENT_GETATTR); + if (req == NULL) + RETURN(ERR_PTR(-ENOMEM)); - mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1); - req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT, - op_data->op_namelen + 1); + /* send name of security xattr to get upon intent */ + if (it->it_op & (IT_LOOKUP | IT_GETATTR) && + req_capsule_has_field(&req->rq_pill, &RMF_FILE_SECCTX_NAME, + RCL_CLIENT) && + op_data->op_file_secctx_name_size > 0 && + op_data->op_file_secctx_name != NULL) { + have_secctx = true; + req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME, + RCL_CLIENT, + op_data->op_file_secctx_name_size); + } - rc = ldlm_prep_enqueue_req(exp, req, NULL, 0); - if (rc) { - ptlrpc_request_free(req); - RETURN(ERR_PTR(rc)); - } + req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT, + op_data->op_namelen + 1); + + rc = ldlm_prep_enqueue_req(exp, req, NULL, 0); + if (rc) { + ptlrpc_request_free(req); + RETURN(ERR_PTR(rc)); + } /* pack the intent */ - lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT); - lit->opc = (__u64)it->it_op; + lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT); + lit->opc = (__u64)it->it_op; - if (obddev->u.cli.cl_default_mds_easize > 0) - easize = obddev->u.cli.cl_default_mds_easize; - else - easize = obddev->u.cli.cl_max_mds_easize; + easize = obddev->u.cli.cl_default_mds_easize; /* pack the intended request */ mdc_getattr_pack(req, valid, it->it_flags, op_data, easize); req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize); - if (client_is_remote(exp)) - req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, - sizeof(struct mdt_remote_perm)); + req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize); + req_capsule_set_size(&req->rq_pill, &RMF_DEFAULT_MDT_MD, RCL_SERVER, + sizeof(struct lmv_user_md)); + + if (have_secctx) { + char *secctx_name; + + secctx_name = req_capsule_client_get(&req->rq_pill, + &RMF_FILE_SECCTX_NAME); + memcpy(secctx_name, op_data->op_file_secctx_name, + op_data->op_file_secctx_name_size); + + req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, + RCL_SERVER, easize); + + CDEBUG(D_SEC, "packed '%.*s' as security xattr name\n", + op_data->op_file_secctx_name_size, + op_data->op_file_secctx_name); + } else { + req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, + RCL_SERVER, 0); + } + ptlrpc_request_set_replen(req); RETURN(req); } static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp, struct lookup_intent *it, - struct md_op_data *unused) + struct md_op_data *op_data) { struct obd_device *obd = class_exp2obd(exp); + struct list_head cancels = LIST_HEAD_INIT(cancels); struct ptlrpc_request *req; struct ldlm_intent *lit; struct layout_intent *layout; - int rc; + int count = 0, rc; ENTRY; req = ptlrpc_request_alloc(class_exp2cliimp(exp), @@ -515,8 +600,15 @@ static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp, if (req == NULL) RETURN(ERR_PTR(-ENOMEM)); + if (fid_is_sane(&op_data->op_fid2) && (it->it_op & IT_LAYOUT) && + (it->it_flags & FMODE_WRITE)) { + count = mdc_resource_get_unused(exp, &op_data->op_fid2, + &cancels, LCK_EX, + MDS_INODELOCK_LAYOUT); + } + req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0); - rc = ldlm_prep_enqueue_req(exp, req, NULL, 0); + rc = ldlm_prep_enqueue_req(exp, req, &cancels, count); if (rc) { ptlrpc_request_free(req); RETURN(ERR_PTR(rc)); @@ -528,9 +620,9 @@ static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp, /* pack the layout intent request */ layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT); - /* LAYOUT_INTENT_ACCESS is generic, specific operation will be - * set for replication */ - layout->li_opc = LAYOUT_INTENT_ACCESS; + LASSERT(op_data->op_data != NULL); + LASSERT(op_data->op_data_size == sizeof(*layout)); + memcpy(layout, op_data->op_data, sizeof(*layout)); req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, obd->u.cli.cl_default_mds_easize); @@ -570,10 +662,11 @@ static int mdc_finish_enqueue(struct obd_export *exp, struct req_capsule *pill = &req->rq_pill; struct ldlm_request *lockreq; struct ldlm_reply *lockrep; - struct lustre_intent_data *intent = &it->d.lustre; struct ldlm_lock *lock; + struct mdt_body *body = NULL; void *lvb_data = NULL; - int lvb_len = 0; + __u32 lvb_len = 0; + ENTRY; LASSERT(rc >= 0); @@ -605,16 +698,16 @@ static int mdc_finish_enqueue(struct obd_export *exp, lockrep = req_capsule_server_get(pill, &RMF_DLM_REP); LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */ - intent->it_disposition = (int)lockrep->lock_policy_res1; - intent->it_status = (int)lockrep->lock_policy_res2; - intent->it_lock_mode = einfo->ei_mode; - intent->it_lock_handle = lockh->cookie; - intent->it_data = req; + it->it_disposition = (int)lockrep->lock_policy_res1; + it->it_status = (int)lockrep->lock_policy_res2; + it->it_lock_mode = einfo->ei_mode; + it->it_lock_handle = lockh->cookie; + it->it_request = req; /* Technically speaking rq_transno must already be zero if * it_status is in error, so the check is a bit redundant */ - if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay) - mdc_clear_replay_flag(req, intent->it_status); + if ((!req->rq_transno || it->it_status < 0) && req->rq_replay) + mdc_clear_replay_flag(req, it->it_status); /* If we're doing an IT_OPEN which did not result in an actual * successful open, then we need to remove the bit which saves @@ -623,17 +716,15 @@ static int mdc_finish_enqueue(struct obd_export *exp, * It's important that we do this first! Otherwise we might exit the * function without doing so, and try to replay a failed create * (bug 3440) */ - if (it->it_op & IT_OPEN && req->rq_replay && - (!it_disposition(it, DISP_OPEN_OPEN) ||intent->it_status != 0)) - mdc_clear_replay_flag(req, intent->it_status); + if (it->it_op & IT_OPEN && req->rq_replay && + (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0)) + mdc_clear_replay_flag(req, it->it_status); - DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d", - it->it_op, intent->it_disposition, intent->it_status); - - /* We know what to expect, so we do any byte flipping required here */ - if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) { - struct mdt_body *body; + DEBUG_REQ(D_RPCTRACE, req, "op: %x disposition: %x, status: %d", + it->it_op, it->it_disposition, it->it_status); + /* We know what to expect, so we do any byte flipping required here */ + if (it_has_reply_body(it)) { body = req_capsule_server_get(pill, &RMF_MDT_BODY); if (body == NULL) { CERROR ("Can't swab mdt_body\n"); @@ -680,82 +771,58 @@ static int mdc_finish_enqueue(struct obd_export *exp, * (for example error one). */ if ((it->it_op & IT_OPEN) && req->rq_replay) { - void *lmm; - if (req_capsule_get_size(pill, &RMF_EADATA, - RCL_CLIENT) < - body->mbo_eadatasize) - mdc_realloc_openmsg(req, body); - else - req_capsule_shrink(pill, &RMF_EADATA, - body->mbo_eadatasize, - RCL_CLIENT); - - req_capsule_set_size(pill, &RMF_EADATA, - RCL_CLIENT, - body->mbo_eadatasize); - - lmm = req_capsule_client_get(pill, &RMF_EADATA); - if (lmm) - memcpy(lmm, eadata, - body->mbo_eadatasize); + rc = mdc_save_lovea(req, &RMF_EADATA, eadata, + body->mbo_eadatasize); + if (rc) { + body->mbo_valid &= ~OBD_MD_FLEASIZE; + body->mbo_eadatasize = 0; + rc = 0; + } } } - - if (body->mbo_valid & OBD_MD_FLRMTPERM) { - struct mdt_remote_perm *perm; - - LASSERT(client_is_remote(exp)); - perm = req_capsule_server_swab_get(pill, &RMF_ACL, - lustre_swab_mdt_remote_perm); - if (perm == NULL) - RETURN(-EPROTO); - } - if (body->mbo_valid & OBD_MD_FLMDSCAPA) { - struct lustre_capa *capa, *p; - - capa = req_capsule_server_get(pill, &RMF_CAPA1); - if (capa == NULL) - RETURN(-EPROTO); - - if (it->it_op & IT_OPEN) { - /* client fid capa will be checked in replay */ - p = req_capsule_client_get(pill, &RMF_CAPA2); - LASSERT(p); - *p = *capa; - } - } - if (body->mbo_valid & OBD_MD_FLOSSCAPA) { - struct lustre_capa *capa; - - capa = req_capsule_server_get(pill, &RMF_CAPA2); - if (capa == NULL) - RETURN(-EPROTO); - } - } else if (it->it_op & IT_LAYOUT) { + } else if (it->it_op & IT_LAYOUT) { /* maybe the lock was granted right away and layout * is packed into RMF_DLM_LVB of req */ lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER); + CDEBUG(D_INFO, "%s: layout return lvb %d transno %lld\n", + class_exp2obd(exp)->obd_name, lvb_len, req->rq_transno); if (lvb_len > 0) { lvb_data = req_capsule_server_sized_get(pill, &RMF_DLM_LVB, lvb_len); if (lvb_data == NULL) RETURN(-EPROTO); + + /** + * save replied layout data to the request buffer for + * recovery consideration (lest MDS reinitialize + * another set of OST objects). + */ + if (req->rq_transno) + (void)mdc_save_lovea(req, &RMF_EADATA, lvb_data, + lvb_len); } } - /* fill in stripe data for layout lock */ + /* fill in stripe data for layout lock. + * LU-6581: trust layout data only if layout lock is granted. The MDT + * has stopped sending layout unless the layout lock is granted. The + * client still does this checking in case it's talking with an old + * server. - Jinshan */ lock = ldlm_handle2lock(lockh); - if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL) { + if (lock == NULL) + RETURN(rc); + + if (ldlm_has_layout(lock) && lvb_data != NULL && + !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) { void *lmm; - LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n", + LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d", ldlm_it2str(it->it_op), lvb_len); OBD_ALLOC_LARGE(lmm, lvb_len); - if (lmm == NULL) { - LDLM_LOCK_PUT(lock); - RETURN(-ENOMEM); - } + if (lmm == NULL) + GOTO(out_lock, rc = -ENOMEM); + memcpy(lmm, lvb_data, lvb_len); /* install lvb_data */ @@ -770,74 +837,98 @@ static int mdc_finish_enqueue(struct obd_export *exp, if (lmm != NULL) OBD_FREE_LARGE(lmm, lvb_len); } - if (lock != NULL) - LDLM_LOCK_PUT(lock); + + if (ldlm_has_dom(lock)) { + LASSERT(lock->l_glimpse_ast == mdc_ldlm_glimpse_ast); + + body = req_capsule_server_get(pill, &RMF_MDT_BODY); + if (!(body->mbo_valid & OBD_MD_DOM_SIZE)) { + LDLM_ERROR(lock, "%s: DoM lock without size.", + exp->exp_obd->obd_name); + GOTO(out_lock, rc = -EPROTO); + } + + LDLM_DEBUG(lock, "DoM lock is returned by: %s, size: %llu", + ldlm_it2str(it->it_op), body->mbo_dom_size); + + rc = mdc_fill_lvb(req, &lock->l_ost_lvb); + } +out_lock: + LDLM_LOCK_PUT(lock); RETURN(rc); } /* We always reserve enough space in the reply packet for a stripe MD, because * we don't know in advance the file type. */ -int mdc_enqueue(struct obd_export *exp, - struct ldlm_enqueue_info *einfo, - const union ldlm_policy_data *policy, - struct lookup_intent *it, struct md_op_data *op_data, - struct lustre_handle *lockh, __u64 extra_lock_flags) +static int mdc_enqueue_base(struct obd_export *exp, + struct ldlm_enqueue_info *einfo, + const union ldlm_policy_data *policy, + struct lookup_intent *it, + struct md_op_data *op_data, + struct lustre_handle *lockh, + __u64 extra_lock_flags) { - struct obd_device *obddev = class_exp2obd(exp); - struct ptlrpc_request *req = NULL; - __u64 flags, saved_flags = extra_lock_flags; - int rc; - struct ldlm_res_id res_id; - static const ldlm_policy_data_t lookup_policy = - { .l_inodebits = { MDS_INODELOCK_LOOKUP } }; - static const ldlm_policy_data_t update_policy = - { .l_inodebits = { MDS_INODELOCK_UPDATE } }; - static const ldlm_policy_data_t layout_policy = - { .l_inodebits = { MDS_INODELOCK_LAYOUT } }; - static const ldlm_policy_data_t getxattr_policy = { - .l_inodebits = { MDS_INODELOCK_XATTR } }; - int generation, resends = 0; - struct ldlm_reply *lockrep; - enum lvb_type lvb_type = 0; - ENTRY; + struct obd_device *obddev = class_exp2obd(exp); + struct ptlrpc_request *req = NULL; + __u64 flags, saved_flags = extra_lock_flags; + struct ldlm_res_id res_id; + static const union ldlm_policy_data lookup_policy = { + .l_inodebits = { MDS_INODELOCK_LOOKUP } }; + static const union ldlm_policy_data update_policy = { + .l_inodebits = { MDS_INODELOCK_UPDATE } }; + static const union ldlm_policy_data layout_policy = { + .l_inodebits = { MDS_INODELOCK_LAYOUT } }; + static const union ldlm_policy_data getxattr_policy = { + .l_inodebits = { MDS_INODELOCK_XATTR } }; + int generation, resends = 0; + struct ldlm_reply *lockrep; + struct obd_import *imp = class_exp2cliimp(exp); + __u32 acl_bufsize; + enum lvb_type lvb_type = 0; + int rc; + ENTRY; - LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n", - einfo->ei_type); - fid_build_reg_res_name(&op_data->op_fid1, &res_id); + LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n", + einfo->ei_type); + fid_build_reg_res_name(&op_data->op_fid1, &res_id); if (it != NULL) { LASSERT(policy == NULL); saved_flags |= LDLM_FL_HAS_INTENT; - if (it->it_op & (IT_OPEN | IT_UNLINK | IT_GETATTR | IT_READDIR)) + if (it->it_op & (IT_GETATTR | IT_READDIR)) policy = &update_policy; else if (it->it_op & IT_LAYOUT) policy = &layout_policy; - else if (it->it_op & (IT_GETXATTR | IT_SETXATTR)) + else if (it->it_op & IT_GETXATTR) policy = &getxattr_policy; else policy = &lookup_policy; } - generation = obddev->u.cli.cl_import->imp_generation; + generation = obddev->u.cli.cl_import->imp_generation; + if (!it || (it->it_op & (IT_OPEN | IT_CREAT))) + acl_bufsize = MIN(imp->imp_connect_data.ocd_max_easize, + XATTR_SIZE_MAX); + else + acl_bufsize = LUSTRE_POSIX_ACL_MAX_SIZE_OLD; + resend: - flags = saved_flags; + flags = saved_flags; if (it == NULL) { /* The only way right now is FLOCK. */ LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n", einfo->ei_type); res_id.name[3] = LDLM_FLOCK; } else if (it->it_op & IT_OPEN) { - req = mdc_intent_open_pack(exp, it, op_data); - } else if (it->it_op & IT_UNLINK) { - req = mdc_intent_unlink_pack(exp, it, op_data); + req = mdc_intent_open_pack(exp, it, op_data, acl_bufsize); } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) { - req = mdc_intent_getattr_pack(exp, it, op_data); + req = mdc_intent_getattr_pack(exp, it, op_data, acl_bufsize); } else if (it->it_op & IT_READDIR) { req = mdc_enqueue_pack(exp, 0); } else if (it->it_op & IT_LAYOUT) { - if (!imp_connect_lvb_type(class_exp2cliimp(exp))) + if (!imp_connect_lvb_type(imp)) RETURN(-EOPNOTSUPP); req = mdc_intent_layout_pack(exp, it, op_data); lvb_type = LVB_T_LAYOUT; @@ -851,37 +942,40 @@ resend: if (IS_ERR(req)) RETURN(PTR_ERR(req)); - if (req != NULL && it && it->it_op & IT_CREAT) - /* ask ptlrpc not to resend on EINPROGRESS since we have our own - * retry logic */ - req->rq_no_retry_einprogress = 1; - if (resends) { req->rq_generation_set = 1; req->rq_import_generation = generation; - req->rq_sent = cfs_time_current_sec() + resends; + req->rq_sent = ktime_get_real_seconds() + resends; } - /* It is important to obtain rpc_lock first (if applicable), so that - * threads that are serialised with rpc_lock are not polluting our - * rpcs in flight counter. We do not do flock request limiting, though*/ - if (it) { - mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it); + /* It is important to obtain modify RPC slot first (if applicable), so + * that threads that are waiting for a modify RPC slot are not polluting + * our rpcs in flight counter. + * We do not do flock request limiting, though */ + if (it) { + mdc_get_mod_rpc_slot(req, it); rc = obd_get_request_slot(&obddev->u.cli); - if (rc != 0) { - mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it); - mdc_clear_replay_flag(req, 0); - ptlrpc_req_finished(req); - RETURN(rc); - } - } + if (rc != 0) { + mdc_put_mod_rpc_slot(req, it); + mdc_clear_replay_flag(req, 0); + ptlrpc_req_finished(req); + RETURN(rc); + } + } + + /* With Data-on-MDT the glimpse callback is needed too. + * It is set here in advance but not in mdc_finish_enqueue() + * to avoid possible races. It is safe to have glimpse handler + * for non-DOM locks and costs nothing.*/ + if (einfo->ei_cb_gl == NULL) + einfo->ei_cb_gl = mdc_ldlm_glimpse_ast; - rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL, + rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL, 0, lvb_type, lockh, 0); - if (!it) { - /* For flock requests we immediatelly return without further - delay and let caller deal with the rest, since rest of - this function metadata processing makes no sense for flock + if (!it) { + /* For flock requests we immediatelly return without further + delay and let caller deal with the rest, since rest of + this function metadata processing makes no sense for flock requests anyway. But in case of problem during comms with Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we can not rely on caller and this mainly for F_UNLCKs @@ -895,11 +989,13 @@ resend: } obd_put_request_slot(&obddev->u.cli); - mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it); + mdc_put_mod_rpc_slot(req, it); if (rc < 0) { - CDEBUG(D_INFO, "%s: ldlm_cli_enqueue failed: rc = %d\n", - obddev->obd_name, rc); + CDEBUG(D_INFO, + "%s: ldlm_cli_enqueue "DFID":"DFID"=%s failed: rc = %d\n", + obddev->obd_name, PFID(&op_data->op_fid1), + PFID(&op_data->op_fid2), op_data->op_name ?: "", rc); mdc_clear_replay_flag(req, rc); ptlrpc_req_finished(req); @@ -912,25 +1008,37 @@ resend: lockrep->lock_policy_res2 = ptlrpc_status_ntoh(lockrep->lock_policy_res2); - /* Retry the create infinitely when we get -EINPROGRESS from - * server. This is required by the new quota design. */ - if (it && it->it_op & IT_CREAT && - (int)lockrep->lock_policy_res2 == -EINPROGRESS) { - mdc_clear_replay_flag(req, rc); - ptlrpc_req_finished(req); - resends++; - - CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n", - obddev->obd_name, resends, it->it_op, - PFID(&op_data->op_fid1), PFID(&op_data->op_fid2)); - - if (generation == obddev->u.cli.cl_import->imp_generation) { - goto resend; - } else { + /* Retry infinitely when the server returns -EINPROGRESS for the + * intent operation, when server returns -EINPROGRESS for acquiring + * intent lock, we'll retry in after_reply(). */ + if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) { + mdc_clear_replay_flag(req, rc); + ptlrpc_req_finished(req); + if (generation == obddev->u.cli.cl_import->imp_generation) { + if (signal_pending(current)) + RETURN(-EINTR); + + resends++; + CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n", + obddev->obd_name, resends, it->it_op, + PFID(&op_data->op_fid1), + PFID(&op_data->op_fid2)); + goto resend; + } else { CDEBUG(D_HA, "resend cross eviction\n"); - RETURN(-EIO); - } - } + RETURN(-EIO); + } + } + + if ((int)lockrep->lock_policy_res2 == -ERANGE && + it->it_op & (IT_OPEN | IT_GETATTR | IT_LOOKUP) && + acl_bufsize == LUSTRE_POSIX_ACL_MAX_SIZE_OLD) { + mdc_clear_replay_flag(req, -ERANGE); + ptlrpc_req_finished(req); + acl_bufsize = MIN(imp->imp_connect_data.ocd_max_easize, + XATTR_SIZE_MAX); + goto resend; + } rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc); if (rc < 0) { @@ -939,10 +1047,24 @@ resend: memset(lockh, 0, sizeof(*lockh)); } ptlrpc_req_finished(req); + + it->it_lock_handle = 0; + it->it_lock_mode = 0; + it->it_request = NULL; } + RETURN(rc); } +int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, + const union ldlm_policy_data *policy, + struct md_op_data *op_data, + struct lustre_handle *lockh, __u64 extra_lock_flags) +{ + return mdc_enqueue_base(exp, einfo, policy, NULL, + op_data, lockh, extra_lock_flags); +} + static int mdc_finish_intent_lock(struct obd_export *exp, struct ptlrpc_request *request, struct md_op_data *op_data, @@ -950,9 +1072,8 @@ static int mdc_finish_intent_lock(struct obd_export *exp, struct lustre_handle *lockh) { struct lustre_handle old_lock; - struct mdt_body *mdt_body; struct ldlm_lock *lock; - int rc; + int rc = 0; ENTRY; LASSERT(request != NULL); @@ -962,115 +1083,117 @@ static int mdc_finish_intent_lock(struct obd_export *exp, if (it->it_op & IT_READDIR) RETURN(0); - if (!it_disposition(it, DISP_IT_EXECD)) { - /* The server failed before it even started executing the - * intent, i.e. because it couldn't unpack the request. */ - LASSERT(it->d.lustre.it_status != 0); - RETURN(it->d.lustre.it_status); - } - rc = it_open_error(DISP_IT_EXECD, it); - if (rc) - RETURN(rc); - - mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY); - LASSERT(mdt_body != NULL); /* mdc_enqueue checked */ - - /* If we were revalidating a fid/name pair, mark the intent in - * case we fail and get called again from lookup */ - if (fid_is_sane(&op_data->op_fid2) && - it->it_create_mode & M_CHECK_STALE && - it->it_op != IT_GETATTR) { - /* Also: did we find the same inode? */ - /* sever can return one of two fids: - * op_fid2 - new allocated fid - if file is created. - * op_fid3 - existent fid - if file only open. - * op_fid3 is saved in lmv_intent_open */ - if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->mbo_fid1)) && - (!lu_fid_eq(&op_data->op_fid3, &mdt_body->mbo_fid1))) { - CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID - "\n", PFID(&op_data->op_fid2), - PFID(&op_data->op_fid2), - PFID(&mdt_body->mbo_fid1)); - RETURN(-ESTALE); - } - } - - rc = it_open_error(DISP_LOOKUP_EXECD, it); - if (rc) - RETURN(rc); - - /* keep requests around for the multiple phases of the call - * this shows the DISP_XX must guarantee we make it into the call - */ - if (!it_disposition(it, DISP_ENQ_CREATE_REF) && - it_disposition(it, DISP_OPEN_CREATE) && - !it_open_error(DISP_OPEN_CREATE, it)) { - it_set_disposition(it, DISP_ENQ_CREATE_REF); - ptlrpc_request_addref(request); /* balanced in ll_create_node */ - } - if (!it_disposition(it, DISP_ENQ_OPEN_REF) && - it_disposition(it, DISP_OPEN_OPEN) && - !it_open_error(DISP_OPEN_OPEN, it)) { - it_set_disposition(it, DISP_ENQ_OPEN_REF); - ptlrpc_request_addref(request); /* balanced in ll_file_open */ - /* BUG 11546 - eviction in the middle of open rpc processing */ - OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout); - } + if (it->it_op & (IT_GETXATTR | IT_LAYOUT)) { + if (it->it_status != 0) + GOTO(out, rc = it->it_status); + } else { + if (!it_disposition(it, DISP_IT_EXECD)) { + /* The server failed before it even started executing + * the intent, i.e. because it couldn't unpack the + * request. + */ + LASSERT(it->it_status != 0); + GOTO(out, rc = it->it_status); + } + rc = it_open_error(DISP_IT_EXECD, it); + if (rc) + GOTO(out, rc); + + rc = it_open_error(DISP_LOOKUP_EXECD, it); + if (rc) + GOTO(out, rc); + + /* keep requests around for the multiple phases of the call + * this shows the DISP_XX must guarantee we make it into the + * call + */ + if (!it_disposition(it, DISP_ENQ_CREATE_REF) && + it_disposition(it, DISP_OPEN_CREATE) && + !it_open_error(DISP_OPEN_CREATE, it)) { + it_set_disposition(it, DISP_ENQ_CREATE_REF); + /* balanced in ll_create_node */ + ptlrpc_request_addref(request); + } + if (!it_disposition(it, DISP_ENQ_OPEN_REF) && + it_disposition(it, DISP_OPEN_OPEN) && + !it_open_error(DISP_OPEN_OPEN, it)) { + it_set_disposition(it, DISP_ENQ_OPEN_REF); + /* balanced in ll_file_open */ + ptlrpc_request_addref(request); + /* BUG 11546 - eviction in the middle of open rpc + * processing + */ + OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, + obd_timeout); + } - if (it->it_op & IT_CREAT) { - /* XXX this belongs in ll_create_it */ - } else if (it->it_op == IT_OPEN) { - LASSERT(!it_disposition(it, DISP_OPEN_CREATE)); - } else { - LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT)); - } + if (it->it_op & IT_CREAT) { + /* XXX this belongs in ll_create_it */ + } else if (it->it_op == IT_OPEN) { + LASSERT(!it_disposition(it, DISP_OPEN_CREATE)); + } else { + LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP)); + } + } - /* If we already have a matching lock, then cancel the new - * one. We have to set the data here instead of in - * mdc_enqueue, because we need to use the child's inode as - * the l_ast_data to match, and that's not available until - * intent_finish has performed the iget().) */ - lock = ldlm_handle2lock(lockh); - if (lock) { - ldlm_policy_data_t policy = lock->l_policy_data; - LDLM_DEBUG(lock, "matching against this"); - - LASSERTF(fid_res_name_eq(&mdt_body->mbo_fid1, - &lock->l_resource->lr_name), - "Lock res_id: "DLDLMRES", fid: "DFID"\n", - PLDLMRES(lock->l_resource), PFID(&mdt_body->mbo_fid1)); + /* If we already have a matching lock, then cancel the new + * one. We have to set the data here instead of in + * mdc_enqueue, because we need to use the child's inode as + * the l_ast_data to match, and that's not available until + * intent_finish has performed the iget().) */ + lock = ldlm_handle2lock(lockh); + if (lock) { + union ldlm_policy_data policy = lock->l_policy_data; + LDLM_DEBUG(lock, "matching against this"); + + if (it_has_reply_body(it)) { + struct mdt_body *body; + + body = req_capsule_server_get(&request->rq_pill, + &RMF_MDT_BODY); + /* mdc_enqueue checked */ + LASSERT(body != NULL); + LASSERTF(fid_res_name_eq(&body->mbo_fid1, + &lock->l_resource->lr_name), + "Lock res_id: "DLDLMRES", fid: "DFID"\n", + PLDLMRES(lock->l_resource), + PFID(&body->mbo_fid1)); + } LDLM_LOCK_PUT(lock); memcpy(&old_lock, lockh, sizeof(*lockh)); if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL, LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) { - ldlm_lock_decref_and_cancel(lockh, - it->d.lustre.it_lock_mode); - memcpy(lockh, &old_lock, sizeof(old_lock)); - it->d.lustre.it_lock_handle = lockh->cookie; - } - } - CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n", - op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op), - it->d.lustre.it_status, it->d.lustre.it_disposition, rc); - RETURN(rc); + ldlm_lock_decref_and_cancel(lockh, it->it_lock_mode); + memcpy(lockh, &old_lock, sizeof(old_lock)); + it->it_lock_handle = lockh->cookie; + } + } + + EXIT; +out: + CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n", + (int)op_data->op_namelen, op_data->op_name, + ldlm_it2str(it->it_op), it->it_status, + it->it_disposition, rc); + return rc; } int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it, - struct lu_fid *fid, __u64 *bits) + struct lu_fid *fid, __u64 *bits) { - /* We could just return 1 immediately, but since we should only - * be called in revalidate_it if we already have a lock, let's - * verify that. */ - struct ldlm_res_id res_id; - struct lustre_handle lockh; - ldlm_policy_data_t policy; - ldlm_mode_t mode; - ENTRY; + /* We could just return 1 immediately, but since we should only + * be called in revalidate_it if we already have a lock, let's + * verify that. */ + struct ldlm_res_id res_id; + struct lustre_handle lockh; + union ldlm_policy_data policy; + enum ldlm_mode mode; + ENTRY; - if (it->d.lustre.it_lock_handle) { - lockh.cookie = it->d.lustre.it_lock_handle; - mode = ldlm_revalidate_lock_handle(&lockh, bits); + if (it->it_lock_handle) { + lockh.cookie = it->it_lock_handle; + mode = ldlm_revalidate_lock_handle(&lockh, bits); } else { fid_build_reg_res_name(fid, &res_id); switch (it->it_op) { @@ -1096,13 +1219,13 @@ int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it, case IT_READDIR: policy.l_inodebits.bits = MDS_INODELOCK_UPDATE; break; - case IT_LAYOUT: - policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT; - break; - default: - policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP; - break; - } + case IT_LAYOUT: + policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT; + break; + default: + policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP; + break; + } mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid, LDLM_IBITS, &policy, @@ -1110,15 +1233,15 @@ int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it, &lockh); } - if (mode) { - it->d.lustre.it_lock_handle = lockh.cookie; - it->d.lustre.it_lock_mode = mode; - } else { - it->d.lustre.it_lock_handle = 0; - it->d.lustre.it_lock_mode = 0; - } + if (mode) { + it->it_lock_handle = lockh.cookie; + it->it_lock_mode = mode; + } else { + it->it_lock_handle = 0; + it->it_lock_mode = 0; + } - RETURN(!!mode); + RETURN(!!mode); } /* @@ -1137,15 +1260,15 @@ int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it, * ll_create/ll_open gets called. * * The server will return to us, in it_disposition, an indication of - * exactly what d.lustre.it_status refers to. + * exactly what it_status refers to. * - * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call, + * If DISP_OPEN_OPEN is set, then it_status refers to the open() call, * otherwise if DISP_OPEN_CREATE is set, then it status is the * creation failure mode. In either case, one of DISP_LOOKUP_NEG or * DISP_LOOKUP_POS will be set, indicating whether the child lookup * was successful. * - * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the + * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the * child lookup. */ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data, @@ -1157,6 +1280,7 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data, .ei_mode = it_to_lock_mode(it), .ei_cb_bl = cb_blocking, .ei_cb_cp = ldlm_completion_ast, + .ei_cb_gl = mdc_ldlm_glimpse_ast, }; struct lustre_handle lockh; int rc = 0; @@ -1164,7 +1288,7 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data, LASSERT(it); CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID - ", intent: %s flags %#"LPF64"o\n", op_data->op_namelen, + ", intent: %s flags %#llo\n", (int)op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid2), PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags); @@ -1175,7 +1299,7 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data, /* We could just return 1 immediately, but since we should only * be called in revalidate_it if we already have a lock, let's * verify that. */ - it->d.lustre.it_lock_handle = 0; + it->it_lock_handle = 0; rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL); /* Only return failure if it was not GETATTR by cfid (from inode_revalidate) */ @@ -1192,30 +1316,30 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data, } } - rc = mdc_enqueue(exp, &einfo, NULL, it, op_data, &lockh, - extra_lock_flags); + rc = mdc_enqueue_base(exp, &einfo, NULL, it, op_data, &lockh, + extra_lock_flags); if (rc < 0) RETURN(rc); - *reqp = it->d.lustre.it_data; + *reqp = it->it_request; rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh); RETURN(rc); } static int mdc_intent_getattr_async_interpret(const struct lu_env *env, - struct ptlrpc_request *req, - void *args, int rc) + struct ptlrpc_request *req, + void *args, int rc) { - struct mdc_getattr_args *ga = args; - struct obd_export *exp = ga->ga_exp; - struct md_enqueue_info *minfo = ga->ga_minfo; - struct ldlm_enqueue_info *einfo = ga->ga_einfo; - struct lookup_intent *it; - struct lustre_handle *lockh; - struct obd_device *obddev; - struct ldlm_reply *lockrep; - __u64 flags = LDLM_FL_HAS_INTENT; - ENTRY; + struct mdc_getattr_args *ga = args; + struct obd_export *exp = ga->ga_exp; + struct md_enqueue_info *minfo = ga->ga_minfo; + struct ldlm_enqueue_info *einfo = &minfo->mi_einfo; + struct lookup_intent *it; + struct lustre_handle *lockh; + struct obd_device *obddev; + struct ldlm_reply *lockrep; + __u64 flags = LDLM_FL_HAS_INTENT; + ENTRY; it = &minfo->mi_it; lockh = &minfo->mi_lockh; @@ -1248,64 +1372,65 @@ static int mdc_intent_getattr_async_interpret(const struct lu_env *env, EXIT; out: - OBD_FREE_PTR(einfo); minfo->mi_cb(req, minfo, rc); return 0; } int mdc_intent_getattr_async(struct obd_export *exp, - struct md_enqueue_info *minfo, - struct ldlm_enqueue_info *einfo) + struct md_enqueue_info *minfo) { - struct md_op_data *op_data = &minfo->mi_data; - struct lookup_intent *it = &minfo->mi_it; - struct ptlrpc_request *req; - struct mdc_getattr_args *ga; - struct obd_device *obddev = class_exp2obd(exp); - struct ldlm_res_id res_id; - /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed - * for statahead currently. Consider CMD in future, such two bits - * maybe managed by different MDS, should be adjusted then. */ - ldlm_policy_data_t policy = { - .l_inodebits = { MDS_INODELOCK_LOOKUP | - MDS_INODELOCK_UPDATE } - }; - int rc = 0; - __u64 flags = LDLM_FL_HAS_INTENT; + struct md_op_data *op_data = &minfo->mi_data; + struct lookup_intent *it = &minfo->mi_it; + struct ptlrpc_request *req; + struct mdc_getattr_args *ga; + struct obd_device *obddev = class_exp2obd(exp); + struct ldlm_res_id res_id; + union ldlm_policy_data policy = { + .l_inodebits = { MDS_INODELOCK_LOOKUP | + MDS_INODELOCK_UPDATE } }; + int rc = 0; + __u64 flags = LDLM_FL_HAS_INTENT; ENTRY; - CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#" - LPF64"o\n", - op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1), - ldlm_it2str(it->it_op), it->it_flags); + CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#llo\n", + (int)op_data->op_namelen, op_data->op_name, + PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags); fid_build_reg_res_name(&op_data->op_fid1, &res_id); - req = mdc_intent_getattr_pack(exp, it, op_data); + /* If the MDT return -ERANGE because of large ACL, then the sponsor + * of the async getattr RPC will handle that by itself. */ + req = mdc_intent_getattr_pack(exp, it, op_data, + LUSTRE_POSIX_ACL_MAX_SIZE_OLD); if (IS_ERR(req)) RETURN(PTR_ERR(req)); rc = obd_get_request_slot(&obddev->u.cli); - if (rc != 0) { - ptlrpc_req_finished(req); - RETURN(rc); - } + if (rc != 0) { + ptlrpc_req_finished(req); + RETURN(rc); + } - rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL, - 0, LVB_T_NONE, &minfo->mi_lockh, 1); - if (rc < 0) { + /* With Data-on-MDT the glimpse callback is needed too. + * It is set here in advance but not in mdc_finish_enqueue() + * to avoid possible races. It is safe to have glimpse handler + * for non-DOM locks and costs nothing.*/ + if (minfo->mi_einfo.ei_cb_gl == NULL) + minfo->mi_einfo.ei_cb_gl = mdc_ldlm_glimpse_ast; + + rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy, + &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1); + if (rc < 0) { obd_put_request_slot(&obddev->u.cli); - ptlrpc_req_finished(req); - RETURN(rc); - } + ptlrpc_req_finished(req); + RETURN(rc); + } - CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args)); - ga = ptlrpc_req_async_args(req); - ga->ga_exp = exp; - ga->ga_minfo = minfo; - ga->ga_einfo = einfo; + ga = ptlrpc_req_async_args(ga, req); + ga->ga_exp = exp; + ga->ga_minfo = minfo; - req->rq_interpret_reply = mdc_intent_getattr_async_interpret; - ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1); + req->rq_interpret_reply = mdc_intent_getattr_async_interpret; + ptlrpcd_add_req(req); - RETURN(0); + RETURN(0); }