X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fmdc%2Fmdc_locks.c;h=81bfebc720241c6620c399f84f150ced7be8ed2b;hp=d5f054caab75ef2831bfd08edf527bfe156a943f;hb=4b8518ee4fa542f45fcdaeaec580d858dfcaf137;hpb=3a0a50f44f68066642a23365fa4a6f1924dd1108;ds=sidebyside diff --git a/lustre/mdc/mdc_locks.c b/lustre/mdc/mdc_locks.c index d5f054c..81bfebc 100644 --- a/lustre/mdc/mdc_locks.c +++ b/lustre/mdc/mdc_locks.c @@ -43,6 +43,7 @@ #include #include #include +#include #include "mdc_internal.h" @@ -196,11 +197,12 @@ static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc) spin_lock(&req->rq_lock); req->rq_replay = 0; spin_unlock(&req->rq_lock); - } - if (rc && req->rq_transno != 0) { - DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc); - LBUG(); - } + } + if (rc && req->rq_transno != 0) { + DEBUG_REQ(D_ERROR, req, "transno returned on error: rc = %d", + rc); + LBUG(); + } } /* Save a large LOV EA into the request buffer so that it is available @@ -219,7 +221,7 @@ int mdc_save_lovea(struct ptlrpc_request *req, void *data, u32 size) { struct req_capsule *pill = &req->rq_pill; - void *lmm; + struct lov_user_md *lmm; int rc = 0; if (req_capsule_get_size(pill, field, RCL_CLIENT) < size) { @@ -236,39 +238,48 @@ int mdc_save_lovea(struct ptlrpc_request *req, req_capsule_set_size(pill, field, RCL_CLIENT, size); lmm = req_capsule_client_get(pill, field); - if (lmm) + if (lmm) { memcpy(lmm, data, size); + /* overwrite layout generation returned from the MDS */ + lmm->lmm_stripe_offset = + (typeof(lmm->lmm_stripe_offset))LOV_OFFSET_DEFAULT; + } return rc; } static struct ptlrpc_request * mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it, - struct md_op_data *op_data) + struct md_op_data *op_data, __u32 acl_bufsize) { struct ptlrpc_request *req; struct obd_device *obddev = class_exp2obd(exp); struct ldlm_intent *lit; const void *lmm = op_data->op_data; __u32 lmmsize = op_data->op_data_size; - struct list_head cancels = LIST_HEAD_INIT(cancels); + __u32 mdt_md_capsule_size; + LIST_HEAD(cancels); int count = 0; enum ldlm_mode mode; int rc; + int repsize, repsize_estimate; + ENTRY; + mdt_md_capsule_size = obddev->u.cli.cl_default_mds_easize; + it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG; /* XXX: openlock is not cancelled for cross-refs. */ /* If inode is known, cancel conflicting OPEN locks. */ if (fid_is_sane(&op_data->op_fid2)) { if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */ - if (it->it_flags & FMODE_WRITE) + if (it->it_flags & MDS_FMODE_WRITE) mode = LCK_EX; else mode = LCK_PR; } else { - if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC)) + if (it->it_flags & (MDS_FMODE_WRITE | MDS_OPEN_TRUNC)) mode = LCK_CW; #ifdef FMODE_EXEC else if (it->it_flags & FMODE_EXEC) @@ -311,11 +322,21 @@ mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it, req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME, RCL_CLIENT, op_data->op_file_secctx_name != NULL ? - strlen(op_data->op_file_secctx_name) + 1 : 0); + op_data->op_file_secctx_name_size : 0); req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT, op_data->op_file_secctx_size); + /* get SELinux policy info if any */ + rc = sptlrpc_get_sepol(req); + if (rc < 0) { + ptlrpc_request_free(req); + RETURN(ERR_PTR(rc)); + } + req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT, + strlen(req->rq_sepol) ? + strlen(req->rq_sepol) + 1 : 0); + rc = ldlm_prep_enqueue_req(exp, req, &cancels, count); if (rc < 0) { ptlrpc_request_free(req); @@ -335,11 +356,72 @@ mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it, lmmsize); req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, - obddev->u.cli.cl_max_mds_easize); - req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, - req->rq_import->imp_connect_data.ocd_max_easize); - ptlrpc_request_set_replen(req); - return req; + mdt_md_capsule_size); + req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize); + + if (!(it->it_op & IT_CREAT) && it->it_op & IT_OPEN && + req_capsule_has_field(&req->rq_pill, &RMF_FILE_SECCTX_NAME, + RCL_CLIENT) && + op_data->op_file_secctx_name_size > 0 && + op_data->op_file_secctx_name != NULL) { + char *secctx_name; + + secctx_name = req_capsule_client_get(&req->rq_pill, + &RMF_FILE_SECCTX_NAME); + memcpy(secctx_name, op_data->op_file_secctx_name, + op_data->op_file_secctx_name_size); + req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, + RCL_SERVER, + obddev->u.cli.cl_max_mds_easize); + + CDEBUG(D_SEC, "packed '%.*s' as security xattr name\n", + op_data->op_file_secctx_name_size, + op_data->op_file_secctx_name); + + } else { + req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, + RCL_SERVER, 0); + } + + /** + * Inline buffer for possible data from Data-on-MDT files. + */ + req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE, RCL_SERVER, + sizeof(struct niobuf_remote)); + ptlrpc_request_set_replen(req); + + /* Get real repbuf allocated size as rounded up power of 2 */ + repsize = size_roundup_power2(req->rq_replen + + lustre_msg_early_size()); + /* Estimate free space for DoM files in repbuf */ + repsize_estimate = repsize - (req->rq_replen - + mdt_md_capsule_size + + sizeof(struct lov_comp_md_v1) + + sizeof(struct lov_comp_md_entry_v1) + + lov_mds_md_size(0, LOV_MAGIC_V3)); + + if (repsize_estimate < obddev->u.cli.cl_dom_min_inline_repsize) { + repsize = obddev->u.cli.cl_dom_min_inline_repsize - + repsize_estimate + sizeof(struct niobuf_remote); + req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE, + RCL_SERVER, + sizeof(struct niobuf_remote) + repsize); + ptlrpc_request_set_replen(req); + CDEBUG(D_INFO, "Increase repbuf by %d bytes, total: %d\n", + repsize, req->rq_replen); + repsize = size_roundup_power2(req->rq_replen + + lustre_msg_early_size()); + } + /* The only way to report real allocated repbuf size to the server + * is the lm_repsize but it must be set prior buffer allocation itself + * due to security reasons - it is part of buffer used in signature + * calculation (see LU-11414). Therefore the saved size is predicted + * value as rq_replen rounded to the next higher power of 2. + * Such estimation is safe. Though the final allocated buffer might + * be even larger, it is not possible to know that at this point. + */ + req->rq_reqmsg->lm_repsize = repsize; + RETURN(req); } #define GA_DEFAULT_EA_NAME_LEN 20 @@ -354,7 +436,8 @@ mdc_intent_getxattr_pack(struct obd_export *exp, struct ptlrpc_request *req; struct ldlm_intent *lit; int rc, count = 0; - struct list_head cancels = LIST_HEAD_INIT(cancels); + LIST_HEAD(cancels); + u32 ea_vals_buf_size = GA_DEFAULT_EA_VAL_LEN * GA_DEFAULT_EA_NUM; ENTRY; @@ -363,6 +446,16 @@ mdc_intent_getxattr_pack(struct obd_export *exp, if (req == NULL) RETURN(ERR_PTR(-ENOMEM)); + /* get SELinux policy info if any */ + rc = sptlrpc_get_sepol(req); + if (rc < 0) { + ptlrpc_request_free(req); + RETURN(ERR_PTR(rc)); + } + req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT, + strlen(req->rq_sepol) ? + strlen(req->rq_sepol) + 1 : 0); + rc = ldlm_prep_enqueue_req(exp, req, &cancels, count); if (rc) { ptlrpc_request_free(req); @@ -372,20 +465,38 @@ mdc_intent_getxattr_pack(struct obd_export *exp, /* pack the intent */ lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT); lit->opc = IT_GETXATTR; + CDEBUG(D_INFO, "%s: get xattrs for "DFID"\n", + exp->exp_obd->obd_name, PFID(&op_data->op_fid1)); + +#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(3, 0, 53, 0) + /* If the supplied buffer is too small then the server will + * return -ERANGE and llite will fallback to using non cached + * xattr operations. On servers before 2.10.1 a (non-cached) + * listxattr RPC for an orphan or dead file causes an oops. So + * let's try to avoid sending too small a buffer to too old a + * server. This is effectively undoing the memory conservation + * of LU-9417 when it would be *more* likely to crash the + * server. See LU-9856. */ + if (exp->exp_connect_data.ocd_version < OBD_OCD_VERSION(2, 10, 1, 0)) + ea_vals_buf_size = max_t(u32, ea_vals_buf_size, + exp->exp_connect_data.ocd_max_easize); +#endif /* pack the intended request */ mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid, - GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM, - -1, 0); + ea_vals_buf_size, -1, 0); + + /* get SELinux policy info if any */ + mdc_file_sepol_pack(req); req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER, GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM); req_capsule_set_size(&req->rq_pill, &RMF_EAVALS, RCL_SERVER, - GA_DEFAULT_EA_VAL_LEN * GA_DEFAULT_EA_NUM); + ea_vals_buf_size); req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS, RCL_SERVER, - sizeof(__u32) * GA_DEFAULT_EA_NUM); + sizeof(u32) * GA_DEFAULT_EA_NUM); req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0); @@ -394,86 +505,81 @@ mdc_intent_getxattr_pack(struct obd_export *exp, RETURN(req); } -static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp, - struct lookup_intent *it, - struct md_op_data *op_data) +static struct ptlrpc_request * +mdc_intent_getattr_pack(struct obd_export *exp, struct lookup_intent *it, + struct md_op_data *op_data, __u32 acl_bufsize) { - struct ptlrpc_request *req; - struct obd_device *obddev = class_exp2obd(exp); - struct ldlm_intent *lit; - int rc; - ENTRY; + struct ptlrpc_request *req; + struct obd_device *obddev = class_exp2obd(exp); + u64 valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE | + OBD_MD_FLDIREA | OBD_MD_MEA | OBD_MD_FLACL | + OBD_MD_DEFAULT_MEA; + struct ldlm_intent *lit; + __u32 easize; + bool have_secctx = false; + int rc; - req = ptlrpc_request_alloc(class_exp2cliimp(exp), - &RQF_LDLM_INTENT_UNLINK); - if (req == NULL) - RETURN(ERR_PTR(-ENOMEM)); + ENTRY; - req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT, - op_data->op_namelen + 1); + req = ptlrpc_request_alloc(class_exp2cliimp(exp), + &RQF_LDLM_INTENT_GETATTR); + if (req == NULL) + RETURN(ERR_PTR(-ENOMEM)); - rc = ldlm_prep_enqueue_req(exp, req, NULL, 0); - if (rc) { - ptlrpc_request_free(req); - RETURN(ERR_PTR(rc)); - } + /* send name of security xattr to get upon intent */ + if (it->it_op & (IT_LOOKUP | IT_GETATTR) && + req_capsule_has_field(&req->rq_pill, &RMF_FILE_SECCTX_NAME, + RCL_CLIENT) && + op_data->op_file_secctx_name_size > 0 && + op_data->op_file_secctx_name != NULL) { + have_secctx = true; + req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME, + RCL_CLIENT, + op_data->op_file_secctx_name_size); + } - /* pack the intent */ - lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT); - lit->opc = (__u64)it->it_op; + req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT, + op_data->op_namelen + 1); - /* pack the intended request */ - mdc_unlink_pack(req, op_data); + rc = ldlm_prep_enqueue_req(exp, req, NULL, 0); + if (rc) { + ptlrpc_request_free(req); + RETURN(ERR_PTR(rc)); + } - req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, - obddev->u.cli.cl_default_mds_easize); - ptlrpc_request_set_replen(req); - RETURN(req); -} + /* pack the intent */ + lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT); + lit->opc = (__u64)it->it_op; -static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp, - struct lookup_intent *it, - struct md_op_data *op_data) -{ - struct ptlrpc_request *req; - struct obd_device *obddev = class_exp2obd(exp); - u64 valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE | - OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA | - OBD_MD_MEA | OBD_MD_FLACL; - struct ldlm_intent *lit; - int rc; - __u32 easize; - ENTRY; + easize = obddev->u.cli.cl_default_mds_easize; - req = ptlrpc_request_alloc(class_exp2cliimp(exp), - &RQF_LDLM_INTENT_GETATTR); - if (req == NULL) - RETURN(ERR_PTR(-ENOMEM)); + /* pack the intended request */ + mdc_getattr_pack(req, valid, it->it_flags, op_data, easize); - req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT, - op_data->op_namelen + 1); + req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize); + req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize); + req_capsule_set_size(&req->rq_pill, &RMF_DEFAULT_MDT_MD, RCL_SERVER, + sizeof(struct lmv_user_md)); - rc = ldlm_prep_enqueue_req(exp, req, NULL, 0); - if (rc) { - ptlrpc_request_free(req); - RETURN(ERR_PTR(rc)); - } + if (have_secctx) { + char *secctx_name; - /* pack the intent */ - lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT); - lit->opc = (__u64)it->it_op; + secctx_name = req_capsule_client_get(&req->rq_pill, + &RMF_FILE_SECCTX_NAME); + memcpy(secctx_name, op_data->op_file_secctx_name, + op_data->op_file_secctx_name_size); - if (obddev->u.cli.cl_default_mds_easize > 0) - easize = obddev->u.cli.cl_default_mds_easize; - else - easize = obddev->u.cli.cl_max_mds_easize; + req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, + RCL_SERVER, easize); - /* pack the intended request */ - mdc_getattr_pack(req, valid, it->it_flags, op_data, easize); + CDEBUG(D_SEC, "packed '%.*s' as security xattr name\n", + op_data->op_file_secctx_name_size, + op_data->op_file_secctx_name); + } else { + req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, + RCL_SERVER, 0); + } - req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize); - req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, - req->rq_import->imp_connect_data.ocd_max_easize); ptlrpc_request_set_replen(req); RETURN(req); } @@ -483,10 +589,11 @@ static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp, struct md_op_data *op_data) { struct obd_device *obd = class_exp2obd(exp); + LIST_HEAD(cancels); struct ptlrpc_request *req; struct ldlm_intent *lit; struct layout_intent *layout; - int rc; + int count = 0, rc; ENTRY; req = ptlrpc_request_alloc(class_exp2cliimp(exp), @@ -494,8 +601,15 @@ static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp, if (req == NULL) RETURN(ERR_PTR(-ENOMEM)); + if (fid_is_sane(&op_data->op_fid2) && (it->it_op & IT_LAYOUT) && + (it->it_flags & FMODE_WRITE)) { + count = mdc_resource_get_unused(exp, &op_data->op_fid2, + &cancels, LCK_EX, + MDS_INODELOCK_LAYOUT); + } + req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0); - rc = ldlm_prep_enqueue_req(exp, req, NULL, 0); + rc = ldlm_prep_enqueue_req(exp, req, &cancels, count); if (rc) { ptlrpc_request_free(req); RETURN(ERR_PTR(rc)); @@ -603,14 +717,14 @@ static int mdc_finish_enqueue(struct obd_export *exp, * It's important that we do this first! Otherwise we might exit the * function without doing so, and try to replay a failed create * (bug 3440) */ - if (it->it_op & IT_OPEN && req->rq_replay && + if (it->it_op & IT_OPEN && req->rq_replay && (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0)) mdc_clear_replay_flag(req, it->it_status); - DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d", + DEBUG_REQ(D_RPCTRACE, req, "op=%x disposition=%x, status=%d", it->it_op, it->it_disposition, it->it_status); - /* We know what to expect, so we do any byte flipping required here */ + /* We know what to expect, so we do any byte flipping required here */ if (it_has_reply_body(it)) { body = req_capsule_server_get(pill, &RMF_MDT_BODY); if (body == NULL) { @@ -629,6 +743,12 @@ static int mdc_finish_enqueue(struct obd_export *exp, mdc_set_open_replay_data(NULL, NULL, it); } + if (it_disposition(it, DISP_OPEN_CREATE) && + !it_open_error(DISP_OPEN_CREATE, it)) { + lprocfs_counter_incr(exp->exp_obd->obd_md_stats, + LPROC_MD_CREATE); + } + if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) { void *eadata; @@ -671,6 +791,8 @@ static int mdc_finish_enqueue(struct obd_export *exp, /* maybe the lock was granted right away and layout * is packed into RMF_DLM_LVB of req */ lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER); + CDEBUG(D_INFO, "%s: layout return lvb %d transno %lld\n", + class_exp2obd(exp)->obd_name, lvb_len, req->rq_transno); if (lvb_len > 0) { lvb_data = req_capsule_server_sized_get(pill, &RMF_DLM_LVB, lvb_len); @@ -728,7 +850,7 @@ static int mdc_finish_enqueue(struct obd_export *exp, body = req_capsule_server_get(pill, &RMF_MDT_BODY); if (!(body->mbo_valid & OBD_MD_DOM_SIZE)) { - LDLM_ERROR(lock, "%s: DoM lock without size.\n", + LDLM_ERROR(lock, "%s: DoM lock without size.", exp->exp_obd->obd_name); GOTO(out_lock, rc = -EPROTO); } @@ -744,6 +866,16 @@ out_lock: RETURN(rc); } +static inline bool mdc_skip_mod_rpc_slot(const struct lookup_intent *it) +{ + if (it != NULL && + (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP || + it->it_op == IT_READDIR || + (it->it_op == IT_LAYOUT && !(it->it_flags & MDS_FMODE_WRITE)))) + return true; + return false; +} + /* We always reserve enough space in the reply packet for a stripe MD, because * we don't know in advance the file type. */ static int mdc_enqueue_base(struct obd_export *exp, @@ -755,7 +887,7 @@ static int mdc_enqueue_base(struct obd_export *exp, __u64 extra_lock_flags) { struct obd_device *obddev = class_exp2obd(exp); - struct ptlrpc_request *req = NULL; + struct ptlrpc_request *req; __u64 flags, saved_flags = extra_lock_flags; struct ldlm_res_id res_id; static const union ldlm_policy_data lookup_policy = { @@ -768,6 +900,8 @@ static int mdc_enqueue_base(struct obd_export *exp, .l_inodebits = { MDS_INODELOCK_XATTR } }; int generation, resends = 0; struct ldlm_reply *lockrep; + struct obd_import *imp = class_exp2cliimp(exp); + __u32 acl_bufsize; enum lvb_type lvb_type = 0; int rc; ENTRY; @@ -780,34 +914,39 @@ static int mdc_enqueue_base(struct obd_export *exp, LASSERT(policy == NULL); saved_flags |= LDLM_FL_HAS_INTENT; - if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR)) + if (it->it_op & (IT_GETATTR | IT_READDIR)) policy = &update_policy; else if (it->it_op & IT_LAYOUT) policy = &layout_policy; - else if (it->it_op & (IT_GETXATTR | IT_SETXATTR)) + else if (it->it_op & IT_GETXATTR) policy = &getxattr_policy; else policy = &lookup_policy; } - generation = obddev->u.cli.cl_import->imp_generation; + generation = obddev->u.cli.cl_import->imp_generation; + if (!it || (it->it_op & (IT_OPEN | IT_CREAT))) + acl_bufsize = MIN(imp->imp_connect_data.ocd_max_easize, + XATTR_SIZE_MAX); + else + acl_bufsize = LUSTRE_POSIX_ACL_MAX_SIZE_OLD; + resend: - flags = saved_flags; + flags = saved_flags; if (it == NULL) { /* The only way right now is FLOCK. */ LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n", einfo->ei_type); res_id.name[3] = LDLM_FLOCK; + req = ldlm_enqueue_pack(exp, 0); } else if (it->it_op & IT_OPEN) { - req = mdc_intent_open_pack(exp, it, op_data); - } else if (it->it_op & IT_UNLINK) { - req = mdc_intent_unlink_pack(exp, it, op_data); + req = mdc_intent_open_pack(exp, it, op_data, acl_bufsize); } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) { - req = mdc_intent_getattr_pack(exp, it, op_data); + req = mdc_intent_getattr_pack(exp, it, op_data, acl_bufsize); } else if (it->it_op & IT_READDIR) { req = mdc_enqueue_pack(exp, 0); } else if (it->it_op & IT_LAYOUT) { - if (!imp_connect_lvb_type(class_exp2cliimp(exp))) + if (!imp_connect_lvb_type(imp)) RETURN(-EOPNOTSUPP); req = mdc_intent_layout_pack(exp, it, op_data); lvb_type = LVB_T_LAYOUT; @@ -827,20 +966,7 @@ resend: req->rq_sent = ktime_get_real_seconds() + resends; } - /* It is important to obtain modify RPC slot first (if applicable), so - * that threads that are waiting for a modify RPC slot are not polluting - * our rpcs in flight counter. - * We do not do flock request limiting, though */ - if (it) { - mdc_get_mod_rpc_slot(req, it); - rc = obd_get_request_slot(&obddev->u.cli); - if (rc != 0) { - mdc_put_mod_rpc_slot(req, it); - mdc_clear_replay_flag(req, 0); - ptlrpc_req_finished(req); - RETURN(rc); - } - } + einfo->ei_enq_slot = !mdc_skip_mod_rpc_slot(it); /* With Data-on-MDT the glimpse callback is needed too. * It is set here in advance but not in mdc_finish_enqueue() @@ -851,6 +977,7 @@ resend: rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL, 0, lvb_type, lockh, 0); + if (!it) { /* For flock requests we immediatelly return without further delay and let caller deal with the rest, since rest of @@ -864,12 +991,10 @@ resend: (einfo->ei_type == LDLM_FLOCK) && (einfo->ei_mode == LCK_NL)) goto resend; + ptlrpc_req_finished(req); RETURN(rc); } - obd_put_request_slot(&obddev->u.cli); - mdc_put_mod_rpc_slot(req, it); - if (rc < 0) { CDEBUG(D_INFO, "%s: ldlm_cli_enqueue "DFID":"DFID"=%s failed: rc = %d\n", @@ -909,6 +1034,16 @@ resend: } } + if ((int)lockrep->lock_policy_res2 == -ERANGE && + it->it_op & (IT_OPEN | IT_GETATTR | IT_LOOKUP) && + acl_bufsize == LUSTRE_POSIX_ACL_MAX_SIZE_OLD) { + mdc_clear_replay_flag(req, -ERANGE); + ptlrpc_req_finished(req); + acl_bufsize = MIN(imp->imp_connect_data.ocd_max_easize, + XATTR_SIZE_MAX); + goto resend; + } + rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc); if (rc < 0) { if (lustre_handle_is_used(lockh)) { @@ -1196,16 +1331,15 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data, } static int mdc_intent_getattr_async_interpret(const struct lu_env *env, - struct ptlrpc_request *req, - void *args, int rc) + struct ptlrpc_request *req, + void *args, int rc) { struct mdc_getattr_args *ga = args; - struct obd_export *exp = ga->ga_exp; - struct md_enqueue_info *minfo = ga->ga_minfo; + struct obd_export *exp = ga->ga_exp; + struct md_enqueue_info *minfo = ga->ga_minfo; struct ldlm_enqueue_info *einfo = &minfo->mi_einfo; struct lookup_intent *it; struct lustre_handle *lockh; - struct obd_device *obddev; struct ldlm_reply *lockrep; __u64 flags = LDLM_FL_HAS_INTENT; ENTRY; @@ -1213,9 +1347,6 @@ static int mdc_intent_getattr_async_interpret(const struct lu_env *env, it = &minfo->mi_it; lockh = &minfo->mi_lockh; - obddev = class_exp2obd(exp); - - obd_put_request_slot(&obddev->u.cli); if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE)) rc = -ETIMEDOUT; @@ -1252,7 +1383,6 @@ int mdc_intent_getattr_async(struct obd_export *exp, struct lookup_intent *it = &minfo->mi_it; struct ptlrpc_request *req; struct mdc_getattr_args *ga; - struct obd_device *obddev = class_exp2obd(exp); struct ldlm_res_id res_id; union ldlm_policy_data policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP | @@ -1266,16 +1396,13 @@ int mdc_intent_getattr_async(struct obd_export *exp, PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags); fid_build_reg_res_name(&op_data->op_fid1, &res_id); - req = mdc_intent_getattr_pack(exp, it, op_data); + /* If the MDT return -ERANGE because of large ACL, then the sponsor + * of the async getattr RPC will handle that by itself. */ + req = mdc_intent_getattr_pack(exp, it, op_data, + LUSTRE_POSIX_ACL_MAX_SIZE_OLD); if (IS_ERR(req)) RETURN(PTR_ERR(req)); - rc = obd_get_request_slot(&obddev->u.cli); - if (rc != 0) { - ptlrpc_req_finished(req); - RETURN(rc); - } - /* With Data-on-MDT the glimpse callback is needed too. * It is set here in advance but not in mdc_finish_enqueue() * to avoid possible races. It is safe to have glimpse handler @@ -1286,13 +1413,11 @@ int mdc_intent_getattr_async(struct obd_export *exp, rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy, &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1); if (rc < 0) { - obd_put_request_slot(&obddev->u.cli); ptlrpc_req_finished(req); RETURN(rc); } - CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args)); - ga = ptlrpc_req_async_args(req); + ga = ptlrpc_req_async_args(ga, req); ga->ga_exp = exp; ga->ga_minfo = minfo;