RETURN(rc);
}
+/* lock object for open */
+static int mdt_object_open_lock(struct mdt_thread_info *info,
+ struct mdt_object *obj,
+ struct mdt_lock_handle *lhc,
+ __u64 *ibits)
+{
+ struct md_attr *ma = &info->mti_attr;
+ __u64 open_flags = info->mti_spec.sp_cr_flags;
+ ldlm_mode_t lm = LCK_CR;
+ bool try_layout = false;
+ bool create_layout = false;
+ int rc = 0;
+ ENTRY;
+
+ *ibits = 0;
+ if (open_flags & MDS_OPEN_LOCK) {
+ if (open_flags & FMODE_WRITE)
+ lm = LCK_CW;
+ else if (open_flags & MDS_FMODE_EXEC)
+ lm = LCK_PR;
+ else
+ lm = LCK_CR;
+
+ *ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_OPEN;
+ }
+
+ if (S_ISREG(lu_object_attr(&obj->mot_obj.mo_lu))) {
+ if (ma->ma_need & MA_LOV && !(ma->ma_valid & MA_LOV) &&
+ md_should_create(open_flags))
+ create_layout = true;
+ if (exp_connect_layout(info->mti_exp) && !create_layout &&
+ ma->ma_need & MA_LOV)
+ try_layout = true;
+ }
+
+ mdt_lock_handle_init(lhc);
+ mdt_lock_reg_init(lhc, lm);
+
+ /* one problem to return layout lock on open is that it may result
+ * in too many layout locks cached on the client side. */
+ if (!OBD_FAIL_CHECK(OBD_FAIL_MDS_NO_LL_OPEN) && try_layout) {
+ /* return lookup lock to validate inode at the client side,
+ * this is pretty important otherwise mdt will return layout
+ * lock for each open.
+ * However this is a double-edged sword because changing
+ * permission will revoke huge # of LOOKUP locks. */
+ *ibits |= MDS_INODELOCK_LAYOUT | MDS_INODELOCK_LOOKUP;
+ if (!mdt_object_lock_try(info, obj, lhc, *ibits,
+ MDT_CROSS_LOCK)) {
+ *ibits &= ~(MDS_INODELOCK_LAYOUT|MDS_INODELOCK_LOOKUP);
+ if (*ibits != 0)
+ rc = mdt_object_lock(info, obj, lhc, *ibits,
+ MDT_CROSS_LOCK);
+ }
+ } else if (*ibits != 0) {
+ rc = mdt_object_lock(info, obj, lhc, *ibits, MDT_CROSS_LOCK);
+ }
+
+ CDEBUG(D_INODE, "Requested bits lock:"DFID ", ibits = "LPX64
+ ", open_flags = "LPO64", try_layout = %d, rc = %d\n",
+ PFID(mdt_object_fid(obj)), *ibits, open_flags, try_layout, rc);
+
+ /* will change layout, revoke layout locks by enqueuing EX lock. */
+ if (rc == 0 && create_layout) {
+ struct mdt_lock_handle *ll = &info->mti_lh[MDT_LH_LAYOUT];
+
+ CDEBUG(D_INODE, "Will create layout, get EX layout lock:"DFID
+ ", open_flags = "LPO64"\n",
+ PFID(mdt_object_fid(obj)), open_flags);
+
+ LASSERT(!try_layout);
+ mdt_lock_handle_init(ll);
+ mdt_lock_reg_init(ll, LCK_EX);
+ rc = mdt_object_lock(info, obj, ll, MDS_INODELOCK_LAYOUT,
+ MDT_LOCAL_LOCK);
+
+ OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LL_BLOCK, 2);
+ }
+
+ RETURN(rc);
+}
+
+static void mdt_object_open_unlock(struct mdt_thread_info *info,
+ struct mdt_object *obj,
+ struct mdt_lock_handle *lhc,
+ __u64 ibits, int rc)
+{
+ __u64 open_flags = info->mti_spec.sp_cr_flags;
+ struct mdt_lock_handle *ll = &info->mti_lh[MDT_LH_LAYOUT];
+
+ /* Release local layout lock - the layout lock put in MDT_LH_LAYOUT
+ * will never return to client side. */
+ if (lustre_handle_is_used(&ll->mlh_reg_lh)) {
+ LASSERT(!(ibits & MDS_INODELOCK_LAYOUT));
+ mdt_object_unlock(info, obj, ll, 1);
+ }
+
+ if (ibits == 0)
+ return;
+
+ if (!(open_flags & MDS_OPEN_LOCK) && !(ibits & MDS_INODELOCK_LAYOUT)) {
+ /* for the open request, the lock will only return to client
+ * if open or layout lock is granted. */
+ rc = 1;
+ }
+ if (rc != 0)
+ mdt_object_unlock(info, obj, lhc, 1);
+}
+
int mdt_open_by_fid_lock(struct mdt_thread_info *info, struct ldlm_reply *rep,
struct mdt_lock_handle *lhc)
{
struct mdt_object *parent= NULL;
struct mdt_object *o;
int rc;
- ldlm_mode_t lm;
+ __u64 ibits;
ENTRY;
if (md_should_create(flags) && !(flags & MDS_OPEN_HAS_EA)) {
mdt_set_disposition(info, rep, (DISP_IT_EXECD |
DISP_LOOKUP_EXECD));
- if (flags & FMODE_WRITE)
- lm = LCK_CW;
- else if (flags & MDS_FMODE_EXEC)
- lm = LCK_PR;
- else
- lm = LCK_CR;
-
- mdt_lock_handle_init(lhc);
- mdt_lock_reg_init(lhc, lm);
- rc = mdt_object_lock(info, o, lhc,
- MDS_INODELOCK_LOOKUP | MDS_INODELOCK_OPEN,
- MDT_CROSS_LOCK);
+ rc = mdt_attr_get_complex(info, o, ma);
if (rc)
GOTO(out, rc);
- rc = mdt_attr_get_complex(info, o, ma);
+ rc = mdt_object_open_lock(info, o, lhc, &ibits);
if (rc)
GOTO(out, rc);
}
rc = mdt_finish_open(info, parent, o, flags, 0, rep);
-
- if (!(flags & MDS_OPEN_LOCK) || rc)
- mdt_object_unlock(info, o, lhc, 1);
-
if (!rc) {
mdt_set_disposition(info, rep, DISP_LOOKUP_POS);
if (flags & MDS_OPEN_LOCK)
mdt_set_disposition(info, rep, DISP_OPEN_LOCK);
}
-
GOTO(out, rc);
+
out:
+ mdt_object_open_unlock(info, o, lhc, ibits, rc);
mdt_object_put(env, o);
if (parent != NULL)
mdt_object_put(env, parent);
struct lu_fid *child_fid = &info->mti_tmp_fid1;
struct md_attr *ma = &info->mti_attr;
__u64 create_flags = info->mti_spec.sp_cr_flags;
+ __u64 ibits;
struct mdt_reint_record *rr = &info->mti_rr;
struct lu_name *lname;
int result, rc;
LASSERT(!lustre_handle_is_used(&lhc->mlh_reg_lh));
- /* get openlock if this is not replay and if a client requested it */
- if (!req_is_replay(req) && create_flags & MDS_OPEN_LOCK) {
- ldlm_mode_t lm;
-
- if (create_flags & FMODE_WRITE)
- lm = LCK_CW;
- else if (create_flags & MDS_FMODE_EXEC)
- lm = LCK_PR;
- else
- lm = LCK_CR;
- mdt_lock_handle_init(lhc);
- mdt_lock_reg_init(lhc, lm);
- rc = mdt_object_lock(info, child, lhc,
- MDS_INODELOCK_LOOKUP | MDS_INODELOCK_OPEN,
- MDT_CROSS_LOCK);
- if (rc) {
- result = rc;
- GOTO(out_child, result);
- } else {
- result = -EREMOTE;
- mdt_set_disposition(info, ldlm_rep, DISP_OPEN_LOCK);
- }
- }
+ /* get openlock if this is not replay and if a client requested it */
+ if (!req_is_replay(req)) {
+ rc = mdt_object_open_lock(info, child, lhc, &ibits);
+ if (rc != 0) {
+ GOTO(out_child, result = rc);
+ } else if (create_flags & MDS_OPEN_LOCK) {
+ result = -EREMOTE;
+ mdt_set_disposition(info, ldlm_rep, DISP_OPEN_LOCK);
+ }
+ }
/* Try to open it now. */
rc = mdt_finish_open(info, parent, child, create_flags,
created, ldlm_rep);
if (rc) {
result = rc;
- if (lustre_handle_is_used(&lhc->mlh_reg_lh)) {
- /* openlock was acquired and mdt_finish_open failed -
- drop the openlock */
- mdt_object_unlock(info, child, lhc, 1);
- mdt_clear_disposition(info, ldlm_rep, DISP_OPEN_LOCK);
- }
+ /* openlock will be released if mdt_finish_open failed */
+ mdt_clear_disposition(info, ldlm_rep, DISP_OPEN_LOCK);
if (created) {
ma->ma_need = 0;
ma->ma_valid = 0;
}
EXIT;
out_child:
+ mdt_object_open_unlock(info, child, lhc, ibits,
+ result == -EREMOTE ? 0 : result);
mdt_object_put(info->mti_env, child);
out_parent:
mdt_object_unlock_put(info, parent, lh, result || !created);