X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fmdc%2Fmdc_locks.c;h=32587cfa60df4a4e7be3b3e878fbf32787fa4494;hp=0b7a20ffa82e2bf83d82a985fdd855b248e9a2b3;hb=511ea5850f2553d14891aeda7972d0526c67a3af;hpb=72057a3af19ee02d9a686bd7e7d074917e381310 diff --git a/lustre/mdc/mdc_locks.c b/lustre/mdc/mdc_locks.c index 0b7a20f..32587cf 100644 --- a/lustre/mdc/mdc_locks.c +++ b/lustre/mdc/mdc_locks.c @@ -23,7 +23,7 @@ * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2011, 2015, Intel Corporation. + * Copyright (c) 2011, 2017, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -43,6 +43,7 @@ #include #include #include +#include #include "mdc_internal.h" @@ -214,25 +215,37 @@ static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc) * original open if the MDS crashed just when this client also OOM'd) * but this is incredibly unlikely, and questionable whether the client * could do MDS recovery under OOM anyways... */ -static void mdc_realloc_openmsg(struct ptlrpc_request *req, - struct mdt_body *body) +int mdc_save_lovea(struct ptlrpc_request *req, + const struct req_msg_field *field, + void *data, u32 size) { - int rc; + struct req_capsule *pill = &req->rq_pill; + void *lmm; + int rc = 0; - /* FIXME: remove this explicit offset. */ - rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4, - body->mbo_eadatasize); - if (rc) { - CERROR("Can't enlarge segment %d size to %d\n", - DLM_INTENT_REC_OFF + 4, body->mbo_eadatasize); - body->mbo_valid &= ~OBD_MD_FLEASIZE; - body->mbo_eadatasize = 0; + if (req_capsule_get_size(pill, field, RCL_CLIENT) < size) { + rc = sptlrpc_cli_enlarge_reqbuf(req, field, size); + if (rc) { + CERROR("%s: Can't enlarge ea size to %d: rc = %d\n", + req->rq_export->exp_obd->obd_name, + size, rc); + return rc; + } + } else { + req_capsule_shrink(pill, field, size, RCL_CLIENT); } + + req_capsule_set_size(pill, field, RCL_CLIENT, size); + lmm = req_capsule_client_get(pill, field); + if (lmm) + memcpy(lmm, data, size); + + return rc; } static struct ptlrpc_request * mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it, - struct md_op_data *op_data) + struct md_op_data *op_data, __u32 acl_bufsize) { struct ptlrpc_request *req; struct obd_device *obddev = class_exp2obd(exp); @@ -243,6 +256,8 @@ mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it, int count = 0; enum ldlm_mode mode; int rc; + int repsize; + ENTRY; it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG; @@ -251,12 +266,12 @@ mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it, /* If inode is known, cancel conflicting OPEN locks. */ if (fid_is_sane(&op_data->op_fid2)) { if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */ - if (it->it_flags & FMODE_WRITE) + if (it->it_flags & MDS_FMODE_WRITE) mode = LCK_EX; else mode = LCK_PR; } else { - if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC)) + if (it->it_flags & (MDS_FMODE_WRITE | MDS_OPEN_TRUNC)) mode = LCK_CW; #ifdef FMODE_EXEC else if (it->it_flags & FMODE_EXEC) @@ -288,8 +303,14 @@ mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it, req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT, op_data->op_namelen + 1); - req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, + if (cl_is_lov_delay_create(it->it_flags)) { + /* open(O_LOV_DELAY_CREATE) won't pack lmm */ + LASSERT(lmmsize == 0); + req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0); + } else { + req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, max(lmmsize, obddev->u.cli.cl_default_mds_easize)); + } req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME, RCL_CLIENT, op_data->op_file_secctx_name != NULL ? @@ -318,10 +339,41 @@ mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it, req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, obddev->u.cli.cl_max_mds_easize); - ptlrpc_request_set_replen(req); - return req; + req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize); + + /** + * Inline buffer for possible data from Data-on-MDT files. + */ + req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE, RCL_SERVER, + sizeof(struct niobuf_remote)); + ptlrpc_request_set_replen(req); + + /* Get real repbuf allocated size as rounded up power of 2 */ + repsize = size_roundup_power2(req->rq_replen + + lustre_msg_early_size()); + + /* Estimate free space for DoM files in repbuf */ + repsize -= req->rq_replen - obddev->u.cli.cl_max_mds_easize + + sizeof(struct lov_comp_md_v1) + + sizeof(struct lov_comp_md_entry_v1) + + lov_mds_md_size(0, LOV_MAGIC_V3); + + if (repsize < obddev->u.cli.cl_dom_min_inline_repsize) { + repsize = obddev->u.cli.cl_dom_min_inline_repsize - repsize; + req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE, + RCL_SERVER, + sizeof(struct niobuf_remote) + repsize); + ptlrpc_request_set_replen(req); + CDEBUG(D_INFO, "Increase repbuf by %d bytes, total: %d\n", + repsize, req->rq_replen); + } + return req; } +#define GA_DEFAULT_EA_NAME_LEN 20 +#define GA_DEFAULT_EA_VAL_LEN 250 +#define GA_DEFAULT_EA_NUM 10 + static struct ptlrpc_request * mdc_intent_getxattr_pack(struct obd_export *exp, struct lookup_intent *it, @@ -330,8 +382,8 @@ mdc_intent_getxattr_pack(struct obd_export *exp, struct ptlrpc_request *req; struct ldlm_intent *lit; int rc, count = 0; - __u32 maxdata; struct list_head cancels = LIST_HEAD_INIT(cancels); + u32 min_buf_size = 0; ENTRY; @@ -350,66 +402,47 @@ mdc_intent_getxattr_pack(struct obd_export *exp, lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT); lit->opc = IT_GETXATTR; - maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize; +#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(3, 0, 53, 0) + /* If the supplied buffer is too small then the server will + * return -ERANGE and llite will fallback to using non cached + * xattr operations. On servers before 2.10.1 a (non-cached) + * listxattr RPC for an orphan or dead file causes an oops. So + * let's try to avoid sending too small a buffer to too old a + * server. This is effectively undoing the memory conservation + * of LU-9417 when it would be *more* likely to crash the + * server. See LU-9856. */ + if (exp->exp_connect_data.ocd_version < OBD_OCD_VERSION(2, 10, 1, 0)) + min_buf_size = exp->exp_connect_data.ocd_max_easize; +#endif /* pack the intended request */ - mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid, maxdata, -1, - 0); + mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid, + max_t(u32, min_buf_size, + GA_DEFAULT_EA_VAL_LEN * GA_DEFAULT_EA_NUM), + -1, 0); - req_capsule_set_size(&req->rq_pill, &RMF_EADATA, - RCL_SERVER, maxdata); + req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER, + max_t(u32, min_buf_size, + GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM)); - req_capsule_set_size(&req->rq_pill, &RMF_EAVALS, - RCL_SERVER, maxdata); + req_capsule_set_size(&req->rq_pill, &RMF_EAVALS, RCL_SERVER, + max_t(u32, min_buf_size, + GA_DEFAULT_EA_VAL_LEN * GA_DEFAULT_EA_NUM)); - req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS, - RCL_SERVER, maxdata); + req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS, RCL_SERVER, + max_t(u32, min_buf_size, + sizeof(__u32) * GA_DEFAULT_EA_NUM)); - ptlrpc_request_set_replen(req); + req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0); - RETURN(req); -} - -static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp, - struct lookup_intent *it, - struct md_op_data *op_data) -{ - struct ptlrpc_request *req; - struct obd_device *obddev = class_exp2obd(exp); - struct ldlm_intent *lit; - int rc; - ENTRY; - - req = ptlrpc_request_alloc(class_exp2cliimp(exp), - &RQF_LDLM_INTENT_UNLINK); - if (req == NULL) - RETURN(ERR_PTR(-ENOMEM)); - - req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT, - op_data->op_namelen + 1); - - rc = ldlm_prep_enqueue_req(exp, req, NULL, 0); - if (rc) { - ptlrpc_request_free(req); - RETURN(ERR_PTR(rc)); - } - - /* pack the intent */ - lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT); - lit->opc = (__u64)it->it_op; - - /* pack the intended request */ - mdc_unlink_pack(req, op_data); - - req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, - obddev->u.cli.cl_default_mds_easize); ptlrpc_request_set_replen(req); + RETURN(req); } -static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp, - struct lookup_intent *it, - struct md_op_data *op_data) +static struct ptlrpc_request * +mdc_intent_getattr_pack(struct obd_export *exp, struct lookup_intent *it, + struct md_op_data *op_data, __u32 acl_bufsize) { struct ptlrpc_request *req; struct obd_device *obddev = class_exp2obd(exp); @@ -448,13 +481,14 @@ static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp, mdc_getattr_pack(req, valid, it->it_flags, op_data, easize); req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize); + req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize); ptlrpc_request_set_replen(req); RETURN(req); } static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp, struct lookup_intent *it, - struct md_op_data *unused) + struct md_op_data *op_data) { struct obd_device *obd = class_exp2obd(exp); struct ptlrpc_request *req; @@ -481,9 +515,9 @@ static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp, /* pack the layout intent request */ layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT); - /* LAYOUT_INTENT_ACCESS is generic, specific operation will be - * set for replication */ - layout->li_opc = LAYOUT_INTENT_ACCESS; + LASSERT(op_data->op_data != NULL); + LASSERT(op_data->op_data_size == sizeof(*layout)); + memcpy(layout, op_data->op_data, sizeof(*layout)); req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, obd->u.cli.cl_default_mds_easize); @@ -524,8 +558,10 @@ static int mdc_finish_enqueue(struct obd_export *exp, struct ldlm_request *lockreq; struct ldlm_reply *lockrep; struct ldlm_lock *lock; + struct mdt_body *body = NULL; void *lvb_data = NULL; __u32 lvb_len = 0; + ENTRY; LASSERT(rc >= 0); @@ -584,8 +620,6 @@ static int mdc_finish_enqueue(struct obd_export *exp, /* We know what to expect, so we do any byte flipping required here */ if (it_has_reply_body(it)) { - struct mdt_body *body; - body = req_capsule_server_get(pill, &RMF_MDT_BODY); if (body == NULL) { CERROR ("Can't swab mdt_body\n"); @@ -632,27 +666,16 @@ static int mdc_finish_enqueue(struct obd_export *exp, * (for example error one). */ if ((it->it_op & IT_OPEN) && req->rq_replay) { - void *lmm; - if (req_capsule_get_size(pill, &RMF_EADATA, - RCL_CLIENT) < - body->mbo_eadatasize) - mdc_realloc_openmsg(req, body); - else - req_capsule_shrink(pill, &RMF_EADATA, - body->mbo_eadatasize, - RCL_CLIENT); - - req_capsule_set_size(pill, &RMF_EADATA, - RCL_CLIENT, - body->mbo_eadatasize); - - lmm = req_capsule_client_get(pill, &RMF_EADATA); - if (lmm) - memcpy(lmm, eadata, - body->mbo_eadatasize); + rc = mdc_save_lovea(req, &RMF_EADATA, eadata, + body->mbo_eadatasize); + if (rc) { + body->mbo_valid &= ~OBD_MD_FLEASIZE; + body->mbo_eadatasize = 0; + rc = 0; + } } } - } else if (it->it_op & IT_LAYOUT) { + } else if (it->it_op & IT_LAYOUT) { /* maybe the lock was granted right away and layout * is packed into RMF_DLM_LVB of req */ lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER); @@ -661,6 +684,15 @@ static int mdc_finish_enqueue(struct obd_export *exp, &RMF_DLM_LVB, lvb_len); if (lvb_data == NULL) RETURN(-EPROTO); + + /** + * save replied layout data to the request buffer for + * recovery consideration (lest MDS reinitialize + * another set of OST objects). + */ + if (req->rq_transno) + (void)mdc_save_lovea(req, &RMF_EADATA, lvb_data, + lvb_len); } } @@ -670,7 +702,10 @@ static int mdc_finish_enqueue(struct obd_export *exp, * client still does this checking in case it's talking with an old * server. - Jinshan */ lock = ldlm_handle2lock(lockh); - if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL && + if (lock == NULL) + RETURN(rc); + + if (ldlm_has_layout(lock) && lvb_data != NULL && !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) { void *lmm; @@ -678,10 +713,9 @@ static int mdc_finish_enqueue(struct obd_export *exp, ldlm_it2str(it->it_op), lvb_len); OBD_ALLOC_LARGE(lmm, lvb_len); - if (lmm == NULL) { - LDLM_LOCK_PUT(lock); - RETURN(-ENOMEM); - } + if (lmm == NULL) + GOTO(out_lock, rc = -ENOMEM); + memcpy(lmm, lvb_data, lvb_len); /* install lvb_data */ @@ -696,8 +730,24 @@ static int mdc_finish_enqueue(struct obd_export *exp, if (lmm != NULL) OBD_FREE_LARGE(lmm, lvb_len); } - if (lock != NULL) - LDLM_LOCK_PUT(lock); + + if (ldlm_has_dom(lock)) { + LASSERT(lock->l_glimpse_ast == mdc_ldlm_glimpse_ast); + + body = req_capsule_server_get(pill, &RMF_MDT_BODY); + if (!(body->mbo_valid & OBD_MD_DOM_SIZE)) { + LDLM_ERROR(lock, "%s: DoM lock without size.\n", + exp->exp_obd->obd_name); + GOTO(out_lock, rc = -EPROTO); + } + + LDLM_DEBUG(lock, "DoM lock is returned by: %s, size: %llu", + ldlm_it2str(it->it_op), body->mbo_dom_size); + + rc = mdc_fill_lvb(req, &lock->l_ost_lvb); + } +out_lock: + LDLM_LOCK_PUT(lock); RETURN(rc); } @@ -726,6 +776,8 @@ static int mdc_enqueue_base(struct obd_export *exp, .l_inodebits = { MDS_INODELOCK_XATTR } }; int generation, resends = 0; struct ldlm_reply *lockrep; + struct obd_import *imp = class_exp2cliimp(exp); + __u32 acl_bufsize; enum lvb_type lvb_type = 0; int rc; ENTRY; @@ -738,34 +790,37 @@ static int mdc_enqueue_base(struct obd_export *exp, LASSERT(policy == NULL); saved_flags |= LDLM_FL_HAS_INTENT; - if (it->it_op & (IT_OPEN | IT_UNLINK | IT_GETATTR | IT_READDIR)) + if (it->it_op & (IT_GETATTR | IT_READDIR)) policy = &update_policy; else if (it->it_op & IT_LAYOUT) policy = &layout_policy; - else if (it->it_op & (IT_GETXATTR | IT_SETXATTR)) + else if (it->it_op & IT_GETXATTR) policy = &getxattr_policy; else policy = &lookup_policy; } - generation = obddev->u.cli.cl_import->imp_generation; + generation = obddev->u.cli.cl_import->imp_generation; + if (!it || (it->it_op & (IT_OPEN | IT_CREAT))) + acl_bufsize = imp->imp_connect_data.ocd_max_easize; + else + acl_bufsize = LUSTRE_POSIX_ACL_MAX_SIZE_OLD; + resend: - flags = saved_flags; + flags = saved_flags; if (it == NULL) { /* The only way right now is FLOCK. */ LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n", einfo->ei_type); res_id.name[3] = LDLM_FLOCK; } else if (it->it_op & IT_OPEN) { - req = mdc_intent_open_pack(exp, it, op_data); - } else if (it->it_op & IT_UNLINK) { - req = mdc_intent_unlink_pack(exp, it, op_data); + req = mdc_intent_open_pack(exp, it, op_data, acl_bufsize); } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) { - req = mdc_intent_getattr_pack(exp, it, op_data); + req = mdc_intent_getattr_pack(exp, it, op_data, acl_bufsize); } else if (it->it_op & IT_READDIR) { req = mdc_enqueue_pack(exp, 0); } else if (it->it_op & IT_LAYOUT) { - if (!imp_connect_lvb_type(class_exp2cliimp(exp))) + if (!imp_connect_lvb_type(imp)) RETURN(-EOPNOTSUPP); req = mdc_intent_layout_pack(exp, it, op_data); lvb_type = LVB_T_LAYOUT; @@ -782,7 +837,7 @@ resend: if (resends) { req->rq_generation_set = 1; req->rq_import_generation = generation; - req->rq_sent = cfs_time_current_sec() + resends; + req->rq_sent = ktime_get_real_seconds() + resends; } /* It is important to obtain modify RPC slot first (if applicable), so @@ -794,18 +849,25 @@ resend: rc = obd_get_request_slot(&obddev->u.cli); if (rc != 0) { mdc_put_mod_rpc_slot(req, it); - mdc_clear_replay_flag(req, 0); - ptlrpc_req_finished(req); - RETURN(rc); - } - } + mdc_clear_replay_flag(req, 0); + ptlrpc_req_finished(req); + RETURN(rc); + } + } + + /* With Data-on-MDT the glimpse callback is needed too. + * It is set here in advance but not in mdc_finish_enqueue() + * to avoid possible races. It is safe to have glimpse handler + * for non-DOM locks and costs nothing.*/ + if (einfo->ei_cb_gl == NULL) + einfo->ei_cb_gl = mdc_ldlm_glimpse_ast; - rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL, + rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL, 0, lvb_type, lockh, 0); - if (!it) { - /* For flock requests we immediatelly return without further - delay and let caller deal with the rest, since rest of - this function metadata processing makes no sense for flock + if (!it) { + /* For flock requests we immediatelly return without further + delay and let caller deal with the rest, since rest of + this function metadata processing makes no sense for flock requests anyway. But in case of problem during comms with Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we can not rely on caller and this mainly for F_UNLCKs @@ -822,8 +884,10 @@ resend: mdc_put_mod_rpc_slot(req, it); if (rc < 0) { - CDEBUG(D_INFO, "%s: ldlm_cli_enqueue failed: rc = %d\n", - obddev->obd_name, rc); + CDEBUG(D_INFO, + "%s: ldlm_cli_enqueue "DFID":"DFID"=%s failed: rc = %d\n", + obddev->obd_name, PFID(&op_data->op_fid1), + PFID(&op_data->op_fid2), op_data->op_name ?: "", rc); mdc_clear_replay_flag(req, rc); ptlrpc_req_finished(req); @@ -840,21 +904,32 @@ resend: * intent operation, when server returns -EINPROGRESS for acquiring * intent lock, we'll retry in after_reply(). */ if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) { - mdc_clear_replay_flag(req, rc); - ptlrpc_req_finished(req); - resends++; - - CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n", - obddev->obd_name, resends, it->it_op, - PFID(&op_data->op_fid1), PFID(&op_data->op_fid2)); - - if (generation == obddev->u.cli.cl_import->imp_generation) { - goto resend; - } else { + mdc_clear_replay_flag(req, rc); + ptlrpc_req_finished(req); + if (generation == obddev->u.cli.cl_import->imp_generation) { + if (signal_pending(current)) + RETURN(-EINTR); + + resends++; + CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n", + obddev->obd_name, resends, it->it_op, + PFID(&op_data->op_fid1), + PFID(&op_data->op_fid2)); + goto resend; + } else { CDEBUG(D_HA, "resend cross eviction\n"); - RETURN(-EIO); - } - } + RETURN(-EIO); + } + } + + if ((int)lockrep->lock_policy_res2 == -ERANGE && + it->it_op & (IT_OPEN | IT_GETATTR | IT_LOOKUP) && + acl_bufsize != imp->imp_connect_data.ocd_max_easize) { + mdc_clear_replay_flag(req, -ERANGE); + ptlrpc_req_finished(req); + acl_bufsize = imp->imp_connect_data.ocd_max_easize; + goto resend; + } rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc); if (rc < 0) { @@ -1035,13 +1110,13 @@ int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it, case IT_READDIR: policy.l_inodebits.bits = MDS_INODELOCK_UPDATE; break; - case IT_LAYOUT: - policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT; - break; - default: - policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP; - break; - } + case IT_LAYOUT: + policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT; + break; + default: + policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP; + break; + } mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid, LDLM_IBITS, &policy, @@ -1096,6 +1171,7 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data, .ei_mode = it_to_lock_mode(it), .ei_cb_bl = cb_blocking, .ei_cb_cp = ldlm_completion_ast, + .ei_cb_gl = mdc_ldlm_glimpse_ast, }; struct lustre_handle lockh; int rc = 0; @@ -1212,7 +1288,10 @@ int mdc_intent_getattr_async(struct obd_export *exp, PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags); fid_build_reg_res_name(&op_data->op_fid1, &res_id); - req = mdc_intent_getattr_pack(exp, it, op_data); + /* If the MDT return -ERANGE because of large ACL, then the sponsor + * of the async getattr RPC will handle that by itself. */ + req = mdc_intent_getattr_pack(exp, it, op_data, + LUSTRE_POSIX_ACL_MAX_SIZE_OLD); if (IS_ERR(req)) RETURN(PTR_ERR(req)); @@ -1222,6 +1301,13 @@ int mdc_intent_getattr_async(struct obd_export *exp, RETURN(rc); } + /* With Data-on-MDT the glimpse callback is needed too. + * It is set here in advance but not in mdc_finish_enqueue() + * to avoid possible races. It is safe to have glimpse handler + * for non-DOM locks and costs nothing.*/ + if (minfo->mi_einfo.ei_cb_gl == NULL) + minfo->mi_einfo.ei_cb_gl = mdc_ldlm_glimpse_ast; + rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy, &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1); if (rc < 0) {