X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fmdc%2Fmdc_locks.c;h=32587cfa60df4a4e7be3b3e878fbf32787fa4494;hp=8c3cacdfa8299f9bc7e58500f246fb27ca88cc29;hb=511ea5850f2553d14891aeda7972d0526c67a3af;hpb=40b5a643d51b678b7a0e9970c80740cea04b4837 diff --git a/lustre/mdc/mdc_locks.c b/lustre/mdc/mdc_locks.c index 8c3cacd..32587cf 100644 --- a/lustre/mdc/mdc_locks.c +++ b/lustre/mdc/mdc_locks.c @@ -15,11 +15,7 @@ * * You should have received a copy of the GNU General Public License * version 2 along with this program; If not, see - * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf - * - * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, - * CA 95054 USA or visit www.sun.com if you need additional information or - * have any questions. + * http://www.gnu.org/licenses/gpl-2.0.html * * GPL HEADER END */ @@ -27,7 +23,7 @@ * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2011, 2015, Intel Corporation. + * Copyright (c) 2011, 2017, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -47,6 +43,7 @@ #include #include #include +#include #include "mdc_internal.h" @@ -99,8 +96,8 @@ int it_open_error(int phase, struct lookup_intent *it) EXPORT_SYMBOL(it_open_error); /* this must be called on a lockh that is known to have a referenced lock */ -int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data, - __u64 *bits) +int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh, + void *data, __u64 *bits) { struct ldlm_lock *lock; struct inode *new_inode = data; @@ -109,10 +106,10 @@ int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data, if(bits) *bits = 0; - if (!*lockh) - RETURN(0); + if (!lustre_handle_is_used(lockh)) + RETURN(0); - lock = ldlm_handle2lock((struct lustre_handle *)lockh); + lock = ldlm_handle2lock(lockh); LASSERT(lock != NULL); lock_res_and_lock(lock); @@ -218,25 +215,37 @@ static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc) * original open if the MDS crashed just when this client also OOM'd) * but this is incredibly unlikely, and questionable whether the client * could do MDS recovery under OOM anyways... */ -static void mdc_realloc_openmsg(struct ptlrpc_request *req, - struct mdt_body *body) +int mdc_save_lovea(struct ptlrpc_request *req, + const struct req_msg_field *field, + void *data, u32 size) { - int rc; + struct req_capsule *pill = &req->rq_pill; + void *lmm; + int rc = 0; - /* FIXME: remove this explicit offset. */ - rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4, - body->mbo_eadatasize); - if (rc) { - CERROR("Can't enlarge segment %d size to %d\n", - DLM_INTENT_REC_OFF + 4, body->mbo_eadatasize); - body->mbo_valid &= ~OBD_MD_FLEASIZE; - body->mbo_eadatasize = 0; + if (req_capsule_get_size(pill, field, RCL_CLIENT) < size) { + rc = sptlrpc_cli_enlarge_reqbuf(req, field, size); + if (rc) { + CERROR("%s: Can't enlarge ea size to %d: rc = %d\n", + req->rq_export->exp_obd->obd_name, + size, rc); + return rc; + } + } else { + req_capsule_shrink(pill, field, size, RCL_CLIENT); } + + req_capsule_set_size(pill, field, RCL_CLIENT, size); + lmm = req_capsule_client_get(pill, field); + if (lmm) + memcpy(lmm, data, size); + + return rc; } static struct ptlrpc_request * mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it, - struct md_op_data *op_data) + struct md_op_data *op_data, __u32 acl_bufsize) { struct ptlrpc_request *req; struct obd_device *obddev = class_exp2obd(exp); @@ -247,6 +256,8 @@ mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it, int count = 0; enum ldlm_mode mode; int rc; + int repsize; + ENTRY; it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG; @@ -255,12 +266,12 @@ mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it, /* If inode is known, cancel conflicting OPEN locks. */ if (fid_is_sane(&op_data->op_fid2)) { if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */ - if (it->it_flags & FMODE_WRITE) + if (it->it_flags & MDS_FMODE_WRITE) mode = LCK_EX; else mode = LCK_PR; } else { - if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC)) + if (it->it_flags & (MDS_FMODE_WRITE | MDS_OPEN_TRUNC)) mode = LCK_CW; #ifdef FMODE_EXEC else if (it->it_flags & FMODE_EXEC) @@ -292,8 +303,21 @@ mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it, req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT, op_data->op_namelen + 1); - req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, + if (cl_is_lov_delay_create(it->it_flags)) { + /* open(O_LOV_DELAY_CREATE) won't pack lmm */ + LASSERT(lmmsize == 0); + req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0); + } else { + req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, max(lmmsize, obddev->u.cli.cl_default_mds_easize)); + } + + req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME, + RCL_CLIENT, op_data->op_file_secctx_name != NULL ? + strlen(op_data->op_file_secctx_name) + 1 : 0); + + req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT, + op_data->op_file_secctx_size); rc = ldlm_prep_enqueue_req(exp, req, &cancels, count); if (rc < 0) { @@ -315,15 +339,41 @@ mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it, req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, obddev->u.cli.cl_max_mds_easize); + req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize); - /* for remote client, fetch remote perm for current user */ - if (client_is_remote(exp)) - req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, - sizeof(struct mdt_remote_perm)); - ptlrpc_request_set_replen(req); - return req; + /** + * Inline buffer for possible data from Data-on-MDT files. + */ + req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE, RCL_SERVER, + sizeof(struct niobuf_remote)); + ptlrpc_request_set_replen(req); + + /* Get real repbuf allocated size as rounded up power of 2 */ + repsize = size_roundup_power2(req->rq_replen + + lustre_msg_early_size()); + + /* Estimate free space for DoM files in repbuf */ + repsize -= req->rq_replen - obddev->u.cli.cl_max_mds_easize + + sizeof(struct lov_comp_md_v1) + + sizeof(struct lov_comp_md_entry_v1) + + lov_mds_md_size(0, LOV_MAGIC_V3); + + if (repsize < obddev->u.cli.cl_dom_min_inline_repsize) { + repsize = obddev->u.cli.cl_dom_min_inline_repsize - repsize; + req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE, + RCL_SERVER, + sizeof(struct niobuf_remote) + repsize); + ptlrpc_request_set_replen(req); + CDEBUG(D_INFO, "Increase repbuf by %d bytes, total: %d\n", + repsize, req->rq_replen); + } + return req; } +#define GA_DEFAULT_EA_NAME_LEN 20 +#define GA_DEFAULT_EA_VAL_LEN 250 +#define GA_DEFAULT_EA_NUM 10 + static struct ptlrpc_request * mdc_intent_getxattr_pack(struct obd_export *exp, struct lookup_intent *it, @@ -332,8 +382,8 @@ mdc_intent_getxattr_pack(struct obd_export *exp, struct ptlrpc_request *req; struct ldlm_intent *lit; int rc, count = 0; - __u32 maxdata; struct list_head cancels = LIST_HEAD_INIT(cancels); + u32 min_buf_size = 0; ENTRY; @@ -352,74 +402,53 @@ mdc_intent_getxattr_pack(struct obd_export *exp, lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT); lit->opc = IT_GETXATTR; - maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize; +#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(3, 0, 53, 0) + /* If the supplied buffer is too small then the server will + * return -ERANGE and llite will fallback to using non cached + * xattr operations. On servers before 2.10.1 a (non-cached) + * listxattr RPC for an orphan or dead file causes an oops. So + * let's try to avoid sending too small a buffer to too old a + * server. This is effectively undoing the memory conservation + * of LU-9417 when it would be *more* likely to crash the + * server. See LU-9856. */ + if (exp->exp_connect_data.ocd_version < OBD_OCD_VERSION(2, 10, 1, 0)) + min_buf_size = exp->exp_connect_data.ocd_max_easize; +#endif /* pack the intended request */ - mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid, maxdata, -1, - 0); - - req_capsule_set_size(&req->rq_pill, &RMF_EADATA, - RCL_SERVER, maxdata); - - req_capsule_set_size(&req->rq_pill, &RMF_EAVALS, - RCL_SERVER, maxdata); - - req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS, - RCL_SERVER, maxdata); - - ptlrpc_request_set_replen(req); - - RETURN(req); -} - -static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp, - struct lookup_intent *it, - struct md_op_data *op_data) -{ - struct ptlrpc_request *req; - struct obd_device *obddev = class_exp2obd(exp); - struct ldlm_intent *lit; - int rc; - ENTRY; - - req = ptlrpc_request_alloc(class_exp2cliimp(exp), - &RQF_LDLM_INTENT_UNLINK); - if (req == NULL) - RETURN(ERR_PTR(-ENOMEM)); + mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid, + max_t(u32, min_buf_size, + GA_DEFAULT_EA_VAL_LEN * GA_DEFAULT_EA_NUM), + -1, 0); - req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT, - op_data->op_namelen + 1); + req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER, + max_t(u32, min_buf_size, + GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM)); - rc = ldlm_prep_enqueue_req(exp, req, NULL, 0); - if (rc) { - ptlrpc_request_free(req); - RETURN(ERR_PTR(rc)); - } + req_capsule_set_size(&req->rq_pill, &RMF_EAVALS, RCL_SERVER, + max_t(u32, min_buf_size, + GA_DEFAULT_EA_VAL_LEN * GA_DEFAULT_EA_NUM)); - /* pack the intent */ - lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT); - lit->opc = (__u64)it->it_op; + req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS, RCL_SERVER, + max_t(u32, min_buf_size, + sizeof(__u32) * GA_DEFAULT_EA_NUM)); - /* pack the intended request */ - mdc_unlink_pack(req, op_data); + req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0); - req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, - obddev->u.cli.cl_default_mds_easize); ptlrpc_request_set_replen(req); + RETURN(req); } -static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp, - struct lookup_intent *it, - struct md_op_data *op_data) +static struct ptlrpc_request * +mdc_intent_getattr_pack(struct obd_export *exp, struct lookup_intent *it, + struct md_op_data *op_data, __u32 acl_bufsize) { struct ptlrpc_request *req; struct obd_device *obddev = class_exp2obd(exp); u64 valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA | - OBD_MD_MEA | - (client_is_remote(exp) ? - OBD_MD_FLRMTPERM : OBD_MD_FLACL); + OBD_MD_MEA | OBD_MD_FLACL; struct ldlm_intent *lit; int rc; __u32 easize; @@ -452,16 +481,14 @@ static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp, mdc_getattr_pack(req, valid, it->it_flags, op_data, easize); req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize); - if (client_is_remote(exp)) - req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, - sizeof(struct mdt_remote_perm)); + req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize); ptlrpc_request_set_replen(req); RETURN(req); } static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp, struct lookup_intent *it, - struct md_op_data *unused) + struct md_op_data *op_data) { struct obd_device *obd = class_exp2obd(exp); struct ptlrpc_request *req; @@ -488,9 +515,9 @@ static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp, /* pack the layout intent request */ layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT); - /* LAYOUT_INTENT_ACCESS is generic, specific operation will be - * set for replication */ - layout->li_opc = LAYOUT_INTENT_ACCESS; + LASSERT(op_data->op_data != NULL); + LASSERT(op_data->op_data_size == sizeof(*layout)); + memcpy(layout, op_data->op_data, sizeof(*layout)); req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, obd->u.cli.cl_default_mds_easize); @@ -531,8 +558,10 @@ static int mdc_finish_enqueue(struct obd_export *exp, struct ldlm_request *lockreq; struct ldlm_reply *lockrep; struct ldlm_lock *lock; + struct mdt_body *body = NULL; void *lvb_data = NULL; __u32 lvb_len = 0; + ENTRY; LASSERT(rc >= 0); @@ -590,9 +619,7 @@ static int mdc_finish_enqueue(struct obd_export *exp, it->it_op, it->it_disposition, it->it_status); /* We know what to expect, so we do any byte flipping required here */ - if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) { - struct mdt_body *body; - + if (it_has_reply_body(it)) { body = req_capsule_server_get(pill, &RMF_MDT_BODY); if (body == NULL) { CERROR ("Can't swab mdt_body\n"); @@ -639,37 +666,16 @@ static int mdc_finish_enqueue(struct obd_export *exp, * (for example error one). */ if ((it->it_op & IT_OPEN) && req->rq_replay) { - void *lmm; - if (req_capsule_get_size(pill, &RMF_EADATA, - RCL_CLIENT) < - body->mbo_eadatasize) - mdc_realloc_openmsg(req, body); - else - req_capsule_shrink(pill, &RMF_EADATA, - body->mbo_eadatasize, - RCL_CLIENT); - - req_capsule_set_size(pill, &RMF_EADATA, - RCL_CLIENT, - body->mbo_eadatasize); - - lmm = req_capsule_client_get(pill, &RMF_EADATA); - if (lmm) - memcpy(lmm, eadata, - body->mbo_eadatasize); + rc = mdc_save_lovea(req, &RMF_EADATA, eadata, + body->mbo_eadatasize); + if (rc) { + body->mbo_valid &= ~OBD_MD_FLEASIZE; + body->mbo_eadatasize = 0; + rc = 0; + } } } - - if (body->mbo_valid & OBD_MD_FLRMTPERM) { - struct mdt_remote_perm *perm; - - LASSERT(client_is_remote(exp)); - perm = req_capsule_server_swab_get(pill, &RMF_ACL, - lustre_swab_mdt_remote_perm); - if (perm == NULL) - RETURN(-EPROTO); - } - } else if (it->it_op & IT_LAYOUT) { + } else if (it->it_op & IT_LAYOUT) { /* maybe the lock was granted right away and layout * is packed into RMF_DLM_LVB of req */ lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER); @@ -678,6 +684,15 @@ static int mdc_finish_enqueue(struct obd_export *exp, &RMF_DLM_LVB, lvb_len); if (lvb_data == NULL) RETURN(-EPROTO); + + /** + * save replied layout data to the request buffer for + * recovery consideration (lest MDS reinitialize + * another set of OST objects). + */ + if (req->rq_transno) + (void)mdc_save_lovea(req, &RMF_EADATA, lvb_data, + lvb_len); } } @@ -687,7 +702,10 @@ static int mdc_finish_enqueue(struct obd_export *exp, * client still does this checking in case it's talking with an old * server. - Jinshan */ lock = ldlm_handle2lock(lockh); - if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL && + if (lock == NULL) + RETURN(rc); + + if (ldlm_has_layout(lock) && lvb_data != NULL && !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) { void *lmm; @@ -695,10 +713,9 @@ static int mdc_finish_enqueue(struct obd_export *exp, ldlm_it2str(it->it_op), lvb_len); OBD_ALLOC_LARGE(lmm, lvb_len); - if (lmm == NULL) { - LDLM_LOCK_PUT(lock); - RETURN(-ENOMEM); - } + if (lmm == NULL) + GOTO(out_lock, rc = -ENOMEM); + memcpy(lmm, lvb_data, lvb_len); /* install lvb_data */ @@ -713,19 +730,37 @@ static int mdc_finish_enqueue(struct obd_export *exp, if (lmm != NULL) OBD_FREE_LARGE(lmm, lvb_len); } - if (lock != NULL) - LDLM_LOCK_PUT(lock); + + if (ldlm_has_dom(lock)) { + LASSERT(lock->l_glimpse_ast == mdc_ldlm_glimpse_ast); + + body = req_capsule_server_get(pill, &RMF_MDT_BODY); + if (!(body->mbo_valid & OBD_MD_DOM_SIZE)) { + LDLM_ERROR(lock, "%s: DoM lock without size.\n", + exp->exp_obd->obd_name); + GOTO(out_lock, rc = -EPROTO); + } + + LDLM_DEBUG(lock, "DoM lock is returned by: %s, size: %llu", + ldlm_it2str(it->it_op), body->mbo_dom_size); + + rc = mdc_fill_lvb(req, &lock->l_ost_lvb); + } +out_lock: + LDLM_LOCK_PUT(lock); RETURN(rc); } /* We always reserve enough space in the reply packet for a stripe MD, because * we don't know in advance the file type. */ -int mdc_enqueue(struct obd_export *exp, - struct ldlm_enqueue_info *einfo, - const union ldlm_policy_data *policy, - struct lookup_intent *it, struct md_op_data *op_data, - struct lustre_handle *lockh, __u64 extra_lock_flags) +static int mdc_enqueue_base(struct obd_export *exp, + struct ldlm_enqueue_info *einfo, + const union ldlm_policy_data *policy, + struct lookup_intent *it, + struct md_op_data *op_data, + struct lustre_handle *lockh, + __u64 extra_lock_flags) { struct obd_device *obddev = class_exp2obd(exp); struct ptlrpc_request *req = NULL; @@ -741,6 +776,8 @@ int mdc_enqueue(struct obd_export *exp, .l_inodebits = { MDS_INODELOCK_XATTR } }; int generation, resends = 0; struct ldlm_reply *lockrep; + struct obd_import *imp = class_exp2cliimp(exp); + __u32 acl_bufsize; enum lvb_type lvb_type = 0; int rc; ENTRY; @@ -753,34 +790,37 @@ int mdc_enqueue(struct obd_export *exp, LASSERT(policy == NULL); saved_flags |= LDLM_FL_HAS_INTENT; - if (it->it_op & (IT_OPEN | IT_UNLINK | IT_GETATTR | IT_READDIR)) + if (it->it_op & (IT_GETATTR | IT_READDIR)) policy = &update_policy; else if (it->it_op & IT_LAYOUT) policy = &layout_policy; - else if (it->it_op & (IT_GETXATTR | IT_SETXATTR)) + else if (it->it_op & IT_GETXATTR) policy = &getxattr_policy; else policy = &lookup_policy; } - generation = obddev->u.cli.cl_import->imp_generation; + generation = obddev->u.cli.cl_import->imp_generation; + if (!it || (it->it_op & (IT_OPEN | IT_CREAT))) + acl_bufsize = imp->imp_connect_data.ocd_max_easize; + else + acl_bufsize = LUSTRE_POSIX_ACL_MAX_SIZE_OLD; + resend: - flags = saved_flags; + flags = saved_flags; if (it == NULL) { /* The only way right now is FLOCK. */ LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n", einfo->ei_type); res_id.name[3] = LDLM_FLOCK; } else if (it->it_op & IT_OPEN) { - req = mdc_intent_open_pack(exp, it, op_data); - } else if (it->it_op & IT_UNLINK) { - req = mdc_intent_unlink_pack(exp, it, op_data); + req = mdc_intent_open_pack(exp, it, op_data, acl_bufsize); } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) { - req = mdc_intent_getattr_pack(exp, it, op_data); + req = mdc_intent_getattr_pack(exp, it, op_data, acl_bufsize); } else if (it->it_op & IT_READDIR) { req = mdc_enqueue_pack(exp, 0); } else if (it->it_op & IT_LAYOUT) { - if (!imp_connect_lvb_type(class_exp2cliimp(exp))) + if (!imp_connect_lvb_type(imp)) RETURN(-EOPNOTSUPP); req = mdc_intent_layout_pack(exp, it, op_data); lvb_type = LVB_T_LAYOUT; @@ -797,7 +837,7 @@ resend: if (resends) { req->rq_generation_set = 1; req->rq_import_generation = generation; - req->rq_sent = cfs_time_current_sec() + resends; + req->rq_sent = ktime_get_real_seconds() + resends; } /* It is important to obtain modify RPC slot first (if applicable), so @@ -809,18 +849,25 @@ resend: rc = obd_get_request_slot(&obddev->u.cli); if (rc != 0) { mdc_put_mod_rpc_slot(req, it); - mdc_clear_replay_flag(req, 0); - ptlrpc_req_finished(req); - RETURN(rc); - } - } + mdc_clear_replay_flag(req, 0); + ptlrpc_req_finished(req); + RETURN(rc); + } + } + + /* With Data-on-MDT the glimpse callback is needed too. + * It is set here in advance but not in mdc_finish_enqueue() + * to avoid possible races. It is safe to have glimpse handler + * for non-DOM locks and costs nothing.*/ + if (einfo->ei_cb_gl == NULL) + einfo->ei_cb_gl = mdc_ldlm_glimpse_ast; - rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL, + rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL, 0, lvb_type, lockh, 0); - if (!it) { - /* For flock requests we immediatelly return without further - delay and let caller deal with the rest, since rest of - this function metadata processing makes no sense for flock + if (!it) { + /* For flock requests we immediatelly return without further + delay and let caller deal with the rest, since rest of + this function metadata processing makes no sense for flock requests anyway. But in case of problem during comms with Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we can not rely on caller and this mainly for F_UNLCKs @@ -837,8 +884,10 @@ resend: mdc_put_mod_rpc_slot(req, it); if (rc < 0) { - CDEBUG(D_INFO, "%s: ldlm_cli_enqueue failed: rc = %d\n", - obddev->obd_name, rc); + CDEBUG(D_INFO, + "%s: ldlm_cli_enqueue "DFID":"DFID"=%s failed: rc = %d\n", + obddev->obd_name, PFID(&op_data->op_fid1), + PFID(&op_data->op_fid2), op_data->op_name ?: "", rc); mdc_clear_replay_flag(req, rc); ptlrpc_req_finished(req); @@ -855,21 +904,32 @@ resend: * intent operation, when server returns -EINPROGRESS for acquiring * intent lock, we'll retry in after_reply(). */ if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) { - mdc_clear_replay_flag(req, rc); - ptlrpc_req_finished(req); - resends++; - - CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n", - obddev->obd_name, resends, it->it_op, - PFID(&op_data->op_fid1), PFID(&op_data->op_fid2)); - - if (generation == obddev->u.cli.cl_import->imp_generation) { - goto resend; - } else { + mdc_clear_replay_flag(req, rc); + ptlrpc_req_finished(req); + if (generation == obddev->u.cli.cl_import->imp_generation) { + if (signal_pending(current)) + RETURN(-EINTR); + + resends++; + CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n", + obddev->obd_name, resends, it->it_op, + PFID(&op_data->op_fid1), + PFID(&op_data->op_fid2)); + goto resend; + } else { CDEBUG(D_HA, "resend cross eviction\n"); - RETURN(-EIO); - } - } + RETURN(-EIO); + } + } + + if ((int)lockrep->lock_policy_res2 == -ERANGE && + it->it_op & (IT_OPEN | IT_GETATTR | IT_LOOKUP) && + acl_bufsize != imp->imp_connect_data.ocd_max_easize) { + mdc_clear_replay_flag(req, -ERANGE); + ptlrpc_req_finished(req); + acl_bufsize = imp->imp_connect_data.ocd_max_easize; + goto resend; + } rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc); if (rc < 0) { @@ -887,6 +947,15 @@ resend: RETURN(rc); } +int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, + const union ldlm_policy_data *policy, + struct md_op_data *op_data, + struct lustre_handle *lockh, __u64 extra_lock_flags) +{ + return mdc_enqueue_base(exp, einfo, policy, NULL, + op_data, lockh, extra_lock_flags); +} + static int mdc_finish_intent_lock(struct obd_export *exp, struct ptlrpc_request *request, struct md_op_data *op_data, @@ -894,9 +963,8 @@ static int mdc_finish_intent_lock(struct obd_export *exp, struct lustre_handle *lockh) { struct lustre_handle old_lock; - struct mdt_body *mdt_body; struct ldlm_lock *lock; - int rc; + int rc = 0; ENTRY; LASSERT(request != NULL); @@ -906,48 +974,58 @@ static int mdc_finish_intent_lock(struct obd_export *exp, if (it->it_op & IT_READDIR) RETURN(0); - if (!it_disposition(it, DISP_IT_EXECD)) { - /* The server failed before it even started executing the - * intent, i.e. because it couldn't unpack the request. */ - LASSERT(it->it_status != 0); - RETURN(it->it_status); - } - rc = it_open_error(DISP_IT_EXECD, it); - if (rc) - RETURN(rc); - - mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY); - LASSERT(mdt_body != NULL); /* mdc_enqueue checked */ - - rc = it_open_error(DISP_LOOKUP_EXECD, it); - if (rc) - RETURN(rc); - - /* keep requests around for the multiple phases of the call - * this shows the DISP_XX must guarantee we make it into the call - */ - if (!it_disposition(it, DISP_ENQ_CREATE_REF) && - it_disposition(it, DISP_OPEN_CREATE) && - !it_open_error(DISP_OPEN_CREATE, it)) { - it_set_disposition(it, DISP_ENQ_CREATE_REF); - ptlrpc_request_addref(request); /* balanced in ll_create_node */ - } - if (!it_disposition(it, DISP_ENQ_OPEN_REF) && - it_disposition(it, DISP_OPEN_OPEN) && - !it_open_error(DISP_OPEN_OPEN, it)) { - it_set_disposition(it, DISP_ENQ_OPEN_REF); - ptlrpc_request_addref(request); /* balanced in ll_file_open */ - /* BUG 11546 - eviction in the middle of open rpc processing */ - OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout); - } + if (it->it_op & (IT_GETXATTR | IT_LAYOUT)) { + if (it->it_status != 0) + GOTO(out, rc = it->it_status); + } else { + if (!it_disposition(it, DISP_IT_EXECD)) { + /* The server failed before it even started executing + * the intent, i.e. because it couldn't unpack the + * request. + */ + LASSERT(it->it_status != 0); + GOTO(out, rc = it->it_status); + } + rc = it_open_error(DISP_IT_EXECD, it); + if (rc) + GOTO(out, rc); + + rc = it_open_error(DISP_LOOKUP_EXECD, it); + if (rc) + GOTO(out, rc); + + /* keep requests around for the multiple phases of the call + * this shows the DISP_XX must guarantee we make it into the + * call + */ + if (!it_disposition(it, DISP_ENQ_CREATE_REF) && + it_disposition(it, DISP_OPEN_CREATE) && + !it_open_error(DISP_OPEN_CREATE, it)) { + it_set_disposition(it, DISP_ENQ_CREATE_REF); + /* balanced in ll_create_node */ + ptlrpc_request_addref(request); + } + if (!it_disposition(it, DISP_ENQ_OPEN_REF) && + it_disposition(it, DISP_OPEN_OPEN) && + !it_open_error(DISP_OPEN_OPEN, it)) { + it_set_disposition(it, DISP_ENQ_OPEN_REF); + /* balanced in ll_file_open */ + ptlrpc_request_addref(request); + /* BUG 11546 - eviction in the middle of open rpc + * processing + */ + OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, + obd_timeout); + } - if (it->it_op & IT_CREAT) { - /* XXX this belongs in ll_create_it */ - } else if (it->it_op == IT_OPEN) { - LASSERT(!it_disposition(it, DISP_OPEN_CREATE)); - } else { - LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT)); - } + if (it->it_op & IT_CREAT) { + /* XXX this belongs in ll_create_it */ + } else if (it->it_op == IT_OPEN) { + LASSERT(!it_disposition(it, DISP_OPEN_CREATE)); + } else { + LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP)); + } + } /* If we already have a matching lock, then cancel the new * one. We have to set the data here instead of in @@ -959,10 +1037,19 @@ static int mdc_finish_intent_lock(struct obd_export *exp, union ldlm_policy_data policy = lock->l_policy_data; LDLM_DEBUG(lock, "matching against this"); - LASSERTF(fid_res_name_eq(&mdt_body->mbo_fid1, - &lock->l_resource->lr_name), - "Lock res_id: "DLDLMRES", fid: "DFID"\n", - PLDLMRES(lock->l_resource), PFID(&mdt_body->mbo_fid1)); + if (it_has_reply_body(it)) { + struct mdt_body *body; + + body = req_capsule_server_get(&request->rq_pill, + &RMF_MDT_BODY); + /* mdc_enqueue checked */ + LASSERT(body != NULL); + LASSERTF(fid_res_name_eq(&body->mbo_fid1, + &lock->l_resource->lr_name), + "Lock res_id: "DLDLMRES", fid: "DFID"\n", + PLDLMRES(lock->l_resource), + PFID(&body->mbo_fid1)); + } LDLM_LOCK_PUT(lock); memcpy(&old_lock, lockh, sizeof(*lockh)); @@ -974,12 +1061,13 @@ static int mdc_finish_intent_lock(struct obd_export *exp, } } + EXIT; +out: CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n", (int)op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op), it->it_status, it->it_disposition, rc); - - RETURN(rc); + return rc; } int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it, @@ -1022,13 +1110,13 @@ int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it, case IT_READDIR: policy.l_inodebits.bits = MDS_INODELOCK_UPDATE; break; - case IT_LAYOUT: - policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT; - break; - default: - policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP; - break; - } + case IT_LAYOUT: + policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT; + break; + default: + policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP; + break; + } mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid, LDLM_IBITS, &policy, @@ -1083,6 +1171,7 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data, .ei_mode = it_to_lock_mode(it), .ei_cb_bl = cb_blocking, .ei_cb_cp = ldlm_completion_ast, + .ei_cb_gl = mdc_ldlm_glimpse_ast, }; struct lustre_handle lockh; int rc = 0; @@ -1090,7 +1179,7 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data, LASSERT(it); CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID - ", intent: %s flags %#"LPF64"o\n", (int)op_data->op_namelen, + ", intent: %s flags %#llo\n", (int)op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid2), PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags); @@ -1118,8 +1207,8 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data, } } - rc = mdc_enqueue(exp, &einfo, NULL, it, op_data, &lockh, - extra_lock_flags); + rc = mdc_enqueue_base(exp, &einfo, NULL, it, op_data, &lockh, + extra_lock_flags); if (rc < 0) RETURN(rc); @@ -1194,13 +1283,15 @@ int mdc_intent_getattr_async(struct obd_export *exp, __u64 flags = LDLM_FL_HAS_INTENT; ENTRY; - CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#" - LPF64"o\n", + CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#llo\n", (int)op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags); fid_build_reg_res_name(&op_data->op_fid1, &res_id); - req = mdc_intent_getattr_pack(exp, it, op_data); + /* If the MDT return -ERANGE because of large ACL, then the sponsor + * of the async getattr RPC will handle that by itself. */ + req = mdc_intent_getattr_pack(exp, it, op_data, + LUSTRE_POSIX_ACL_MAX_SIZE_OLD); if (IS_ERR(req)) RETURN(PTR_ERR(req)); @@ -1210,6 +1301,13 @@ int mdc_intent_getattr_async(struct obd_export *exp, RETURN(rc); } + /* With Data-on-MDT the glimpse callback is needed too. + * It is set here in advance but not in mdc_finish_enqueue() + * to avoid possible races. It is safe to have glimpse handler + * for non-DOM locks and costs nothing.*/ + if (minfo->mi_einfo.ei_cb_gl == NULL) + minfo->mi_einfo.ei_cb_gl = mdc_ldlm_glimpse_ast; + rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy, &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1); if (rc < 0) {