LDLM_LOCK_PUT(lock);
rc = 0;
} else {
+ bool try_layout = false;
+
relock:
OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RESEND, obd_timeout*2);
mdt_lock_handle_init(lhc);
- if (child_bits == MDS_INODELOCK_LAYOUT)
- mdt_lock_reg_init(lhc, LCK_CR);
- else
- mdt_lock_reg_init(lhc, LCK_PR);
+ mdt_lock_reg_init(lhc, LCK_PR);
if (mdt_object_exists(child) == 0) {
LU_OBJECT_DEBUG(D_INODE, info->mti_env,
if (unlikely(rc != 0))
GOTO(out_child, rc);
- /* layout lock is used only on regular files */
- if ((ma->ma_valid & MA_INODE) &&
- (ma->ma_attr.la_valid & LA_MODE) &&
- !S_ISREG(ma->ma_attr.la_mode))
- child_bits &= ~MDS_INODELOCK_LAYOUT;
-
/* If the file has not been changed for some time, we
* return not only a LOOKUP lock, but also an UPDATE
* lock and this might save us RPC on later STAT. For
child_bits |= MDS_INODELOCK_UPDATE;
}
- rc = mdt_object_lock(info, child, lhc, child_bits,
- MDT_CROSS_LOCK);
+ /* layout lock must be granted in a best-effort way
+ * for IT operations */
+ LASSERT(!(child_bits & MDS_INODELOCK_LAYOUT));
+ if (!OBD_FAIL_CHECK(OBD_FAIL_MDS_NO_LL_GETATTR) &&
+ exp_connect_layout(info->mti_exp) &&
+ S_ISREG(lu_object_attr(&child->mot_obj.mo_lu)) &&
+ ldlm_rep != NULL) {
+ /* try to grant layout lock for regular file. */
+ try_layout = true;
+ }
+ rc = 0;
+ if (try_layout) {
+ child_bits |= MDS_INODELOCK_LAYOUT;
+ /* try layout lock, it may fail to be granted due to
+ * contention at LOOKUP or UPDATE */
+ if (!mdt_object_lock_try(info, child, lhc, child_bits,
+ MDT_CROSS_LOCK)) {
+ child_bits &= ~MDS_INODELOCK_LAYOUT;
+ LASSERT(child_bits != 0);
+ rc = mdt_object_lock(info, child, lhc,
+ child_bits, MDT_CROSS_LOCK);
+ } else {
+ ma_need |= MA_LOV;
+ }
+ } else {
+ rc = mdt_object_lock(info, child, lhc, child_bits,
+ MDT_CROSS_LOCK);
+ }
if (unlikely(rc != 0))
GOTO(out_child, rc);
}
if (lock &&
lock->l_policy_data.l_inodebits.bits & MDS_INODELOCK_UPDATE &&
S_ISREG(lu_object_attr(&mdt_object_child(child)->mo_lu)))
- ma_need = MA_SOM;
+ ma_need |= MA_SOM;
/* finally, we can get attr for child. */
mdt_set_capainfo(info, 1, child_fid, BYPASS_CAPA);
/*
* DLM handlers.
*/
+
static struct ldlm_callback_suite cbs = {
.lcs_completion = ldlm_server_completion_ast,
.lcs_blocking = ldlm_server_blocking_ast,
RETURN(rc);
}
-int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
- struct mdt_lock_handle *lh, __u64 ibits, int locality)
+static int mdt_object_lock0(struct mdt_thread_info *info, struct mdt_object *o,
+ struct mdt_lock_handle *lh, __u64 ibits,
+ bool nonblock, int locality)
{
struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
ldlm_policy_data_t *policy = &info->mti_policy;
struct ldlm_res_id *res_id = &info->mti_res_id;
+ __u64 dlmflags;
int rc;
ENTRY;
memset(policy, 0, sizeof(*policy));
fid_build_reg_res_name(mdt_object_fid(o), res_id);
+ dlmflags = LDLM_FL_ATOMIC_CB;
+ if (nonblock)
+ dlmflags |= LDLM_FL_BLOCK_NOWAIT;
+
/*
* Take PDO lock on whole directory and build correct @res_id for lock
* on part of directory.
*/
policy->l_inodebits.bits = MDS_INODELOCK_UPDATE;
rc = mdt_fid_lock(ns, &lh->mlh_pdo_lh, lh->mlh_pdo_mode,
- policy, res_id, LDLM_FL_ATOMIC_CB,
+ policy, res_id, dlmflags,
&info->mti_exp->exp_handle.h_cookie);
if (unlikely(rc))
RETURN(rc);
* fix it up and turn FL_LOCAL flag off.
*/
rc = mdt_fid_lock(ns, &lh->mlh_reg_lh, lh->mlh_reg_mode, policy,
- res_id, LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB,
+ res_id, LDLM_FL_LOCAL_ONLY | dlmflags,
&info->mti_exp->exp_handle.h_cookie);
if (rc)
mdt_object_unlock(info, o, lh, 1);
RETURN(rc);
}
+int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
+ struct mdt_lock_handle *lh, __u64 ibits, int locality)
+{
+ return mdt_object_lock0(info, o, lh, ibits, false, locality);
+}
+
+int mdt_object_lock_try(struct mdt_thread_info *info, struct mdt_object *o,
+ struct mdt_lock_handle *lh, __u64 ibits, int locality)
+{
+ struct mdt_lock_handle tmp = *lh;
+ int rc;
+
+ rc = mdt_object_lock0(info, o, &tmp, ibits, true, locality);
+ if (rc == 0)
+ *lh = tmp;
+
+ return rc == 0;
+}
+
/**
* Save a lock within request object.
*
struct mdt_thread_info *info,
struct ldlm_lock **,
__u64);
+static int mdt_intent_layout(enum mdt_it_code opcode,
+ struct mdt_thread_info *info,
+ struct ldlm_lock **,
+ __u64);
static int mdt_intent_reint(enum mdt_it_code opcode,
struct mdt_thread_info *info,
struct ldlm_lock **,
.it_flags = 0,
.it_act = NULL
},
- [MDT_IT_LAYOUT] = {
- .it_fmt = &RQF_LDLM_INTENT_GETATTR,
- .it_flags = HABEO_REFERO,
- .it_act = mdt_intent_getattr
- }
+ [MDT_IT_LAYOUT] = {
+ .it_fmt = &RQF_LDLM_INTENT_LAYOUT,
+ .it_flags = 0,
+ .it_act = mdt_intent_layout
+ }
};
int mdt_intent_lock_replace(struct mdt_thread_info *info,
case MDT_IT_GETATTR:
child_bits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
break;
- case MDT_IT_LAYOUT: {
- static int printed = 0;
-
- if (!printed) {
- CERROR("layout lock not supported by this version\n");
- printed = 1;
- }
- GOTO(out_shrink, rc = -EINVAL);
- break;
- }
default:
CERROR("Unsupported intent (%d)\n", opcode);
GOTO(out_shrink, rc = -EINVAL);
return rc;
}
+static int mdt_intent_layout(enum mdt_it_code opcode,
+ struct mdt_thread_info *info,
+ struct ldlm_lock **lockp,
+ __u64 flags)
+{
+ struct layout_intent *layout;
+ int rc;
+ ENTRY;
+
+ if (opcode != MDT_IT_LAYOUT) {
+ CERROR("%s: Unknown intent (%d)\n",
+ info->mti_exp->exp_obd->obd_name, opcode);
+ RETURN(-EINVAL);
+ }
+
+ (*lockp)->l_lvb_type = LVB_T_LAYOUT;
+ req_capsule_set_size(info->mti_pill, &RMF_DLM_LVB, RCL_SERVER,
+ ldlm_lvbo_size(*lockp));
+ rc = req_capsule_server_pack(info->mti_pill);
+ if (rc != 0)
+ RETURN(-EINVAL);
+
+ layout = req_capsule_client_get(info->mti_pill, &RMF_LAYOUT_INTENT);
+ LASSERT(layout != NULL);
+ if (layout->li_opc == LAYOUT_INTENT_ACCESS)
+ /* return to normal ldlm handling */
+ RETURN(0);
+
+ CERROR("%s: Unsupported layout intent (%d)\n",
+ info->mti_exp->exp_obd->obd_name, layout->li_opc);
+ RETURN(-EINVAL);
+}
+
static int mdt_intent_reint(enum mdt_it_code opcode,
struct mdt_thread_info *info,
struct ldlm_lock **lockp,
*/
if (lustre_handle_is_used(&lhc->mlh_reg_lh)) {
LASSERTF(rc == 0, "Error occurred but lock handle "
- "is still in use\n");
+ "is still in use, rc = %d\n", rc);
rep->lock_policy_res2 = 0;
rc = mdt_intent_lock_replace(info, lockp, NULL, lhc, flags);
RETURN(rc);
RETURN(rc);
}
- if (opc == MDT_IT_LAYOUT) {
- (*lockp)->l_lvb_type = LVB_T_LAYOUT;
- /* XXX: set replay RMF_DLM_LVB as the real EA size when LAYOUT
- * lock enabled. */
- } else if (opc == MDT_IT_READDIR) {
- req_capsule_set_size(pill, &RMF_DLM_LVB, RCL_SERVER, 0);
- }
-
flv = &mdt_it_flavor[opc];
if (flv->it_fmt != NULL)
req_capsule_extend(pill, flv->it_fmt);