Whamcloud - gitweb
LU-4684 mdt: improve directory stripe lock
[fs/lustre-release.git] / lustre / mdt / mdt_reint.c
index f307bd6..602072d 100644 (file)
@@ -290,82 +290,49 @@ static int mdt_remote_permission(struct mdt_thread_info *info)
 }
 
 static int mdt_unlock_slaves(struct mdt_thread_info *mti,
-                            struct mdt_object *obj, __u64 ibits,
-                            struct mdt_lock_handle *s0_lh,
-                            struct mdt_object *s0_obj,
+                            struct mdt_object *obj,
                             struct ldlm_enqueue_info *einfo,
                             int decref)
 {
        union ldlm_policy_data *policy = &mti->mti_policy;
+       struct mdt_lock_handle *lh = &mti->mti_lh[MDT_LH_LOCAL];
        struct lustre_handle_array *slave_locks = einfo->ei_cbdata;
        int i;
-       int rc;
-       ENTRY;
-
-       if (!S_ISDIR(obj->mot_header.loh_attr))
-               RETURN(0);
 
-       /* Unlock stripe 0 */
-       if (s0_lh != NULL && lustre_handle_is_used(&s0_lh->mlh_reg_lh)) {
-               LASSERT(s0_obj != NULL);
-               mdt_object_unlock_put(mti, s0_obj, s0_lh, decref);
-       }
+       LASSERT(S_ISDIR(obj->mot_header.loh_attr));
+       LASSERT(slave_locks);
 
        memset(policy, 0, sizeof(*policy));
-       policy->l_inodebits.bits = ibits;
-
-       if (slave_locks != NULL) {
-               LASSERT(s0_lh != NULL);
-               for (i = 1; i < slave_locks->count; i++) {
-                       /* borrow s0_lh temporarily to do mdt unlock */
-                       mdt_lock_reg_init(s0_lh, einfo->ei_mode);
-                       s0_lh->mlh_rreg_lh = slave_locks->handles[i];
-                       mdt_object_unlock(mti, NULL, s0_lh, decref);
-                       slave_locks->handles[i].cookie = 0ull;
-               }
+       policy->l_inodebits.bits = einfo->ei_inodebits;
+       mdt_lock_handle_init(lh);
+       mdt_lock_reg_init(lh, einfo->ei_mode);
+       for (i = 0; i < slave_locks->ha_count; i++) {
+               if (test_bit(i, (void *)slave_locks->ha_map))
+                       lh->mlh_rreg_lh = slave_locks->ha_handles[i];
+               else
+                       lh->mlh_reg_lh = slave_locks->ha_handles[i];
+               mdt_object_unlock(mti, NULL, lh, decref);
+               slave_locks->ha_handles[i].cookie = 0ull;
        }
 
-       rc = mo_object_unlock(mti->mti_env, mdt_object_child(obj), einfo,
-                             policy);
-       RETURN(rc);
+       return mo_object_unlock(mti->mti_env, mdt_object_child(obj), einfo,
+                               policy);
 }
 
-static int mdt_init_slaves(struct mdt_thread_info *mti, struct mdt_object *obj,
-                          struct lu_fid *fid)
+static inline int mdt_object_striped(struct mdt_thread_info *mti,
+                                    struct mdt_object *obj)
 {
-       struct lu_buf *buf = &mti->mti_buf;
-       struct lmv_mds_md_v1 *lmv;
        int rc;
-       ENTRY;
 
        if (!S_ISDIR(obj->mot_header.loh_attr))
-               RETURN(0);
+               return 0;
 
-       buf->lb_buf = mti->mti_xattr_buf;
-       buf->lb_len = sizeof(mti->mti_xattr_buf);
-       rc = mo_xattr_get(mti->mti_env, mdt_object_child(obj), buf,
+       rc = mo_xattr_get(mti->mti_env, mdt_object_child(obj), &LU_BUF_NULL,
                          XATTR_NAME_LMV);
-       if (rc == -ERANGE) {
-               rc = mdt_big_xattr_get(mti, obj, XATTR_NAME_LMV);
-               if (rc > 0) {
-                       buf->lb_buf = mti->mti_big_lmm;
-                       buf->lb_len = mti->mti_big_lmmsize;
-               }
-       }
-
-       if (rc == -ENODATA || rc == -ENOENT)
-               RETURN(0);
-
        if (rc <= 0)
-               RETURN(rc);
-
-       lmv = buf->lb_buf;
-       if (le32_to_cpu(lmv->lmv_magic) != LMV_MAGIC_V1)
-               RETURN(-EINVAL);
+               return rc == -ENODATA ? 0 : rc;
 
-       fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[0]);
-
-       RETURN(rc);
+       return 1;
 }
 
 /**
@@ -374,40 +341,12 @@ static int mdt_init_slaves(struct mdt_thread_info *mti, struct mdt_object *obj,
  **/
 static int mdt_lock_slaves(struct mdt_thread_info *mti, struct mdt_object *obj,
                           enum ldlm_mode mode, __u64 ibits,
-                          struct lu_fid *s0_fid,
-                          struct mdt_lock_handle *s0_lh,
-                          struct mdt_object **s0_objp,
                           struct ldlm_enqueue_info *einfo)
 {
        union ldlm_policy_data *policy = &mti->mti_policy;
-       int rc;
-       ENTRY;
-
-       memset(einfo, 0, sizeof(*einfo));
-
-       rc = mdt_init_slaves(mti, obj, s0_fid);
-       if (rc <= 0)
-               RETURN(rc);
 
        LASSERT(S_ISDIR(obj->mot_header.loh_attr));
 
-       if (!lu_fid_eq(s0_fid, mdt_object_fid(obj))) {
-               /* Except migrating object, whose 0_stripe and master
-                * object are the same object, 0_stripe and master
-                * object are different, though they are in the same
-                * MDT, to avoid adding osd_object_lock here, so we
-                * will enqueue the stripe0 lock in MDT0 for now */
-               *s0_objp = mdt_object_find(mti->mti_env, mti->mti_mdt, s0_fid);
-               if (IS_ERR(*s0_objp))
-                       RETURN(PTR_ERR(*s0_objp));
-
-               rc = mdt_reint_object_lock(mti, *s0_objp, s0_lh, ibits, true);
-               if (rc < 0) {
-                       mdt_object_put(mti->mti_env, *s0_objp);
-                       RETURN(rc);
-               }
-       }
-
        einfo->ei_type = LDLM_IBITS;
        einfo->ei_mode = mode;
        einfo->ei_cb_bl = mdt_remote_blocking_ast;
@@ -415,12 +354,56 @@ static int mdt_lock_slaves(struct mdt_thread_info *mti, struct mdt_object *obj,
        einfo->ei_cb_cp = ldlm_completion_ast;
        einfo->ei_enq_slave = 1;
        einfo->ei_namespace = mti->mti_mdt->mdt_namespace;
+       einfo->ei_inodebits = ibits;
        memset(policy, 0, sizeof(*policy));
        policy->l_inodebits.bits = ibits;
 
-       rc = mo_object_lock(mti->mti_env, mdt_object_child(obj), NULL, einfo,
-                           policy);
-       RETURN(rc);
+       return mo_object_lock(mti->mti_env, mdt_object_child(obj), NULL, einfo,
+                             policy);
+}
+
+static inline int mdt_reint_striped_lock(struct mdt_thread_info *info,
+                                        struct mdt_object *o,
+                                        struct mdt_lock_handle *lh,
+                                        __u64 ibits,
+                                        struct ldlm_enqueue_info *einfo,
+                                        bool cos_incompat)
+{
+       int rc;
+
+       LASSERT(!mdt_object_remote(o));
+
+       memset(einfo, 0, sizeof(*einfo));
+
+       rc = mdt_reint_object_lock(info, o, lh, ibits, cos_incompat);
+       if (rc)
+               return rc;
+
+       rc = mdt_object_striped(info, o);
+       if (rc != 1) {
+               if (rc < 0)
+                       mdt_object_unlock(info, o, lh, rc);
+               return rc;
+       }
+
+       rc = mdt_lock_slaves(info, o, lh->mlh_reg_mode, ibits, einfo);
+       if (rc) {
+               mdt_object_unlock(info, o, lh, rc);
+               if (rc == -EIO && OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME))
+                       rc = 0;
+       }
+
+       return rc;
+}
+
+static inline void
+mdt_reint_striped_unlock(struct mdt_thread_info *info, struct mdt_object *o,
+                        struct mdt_lock_handle *lh,
+                        struct ldlm_enqueue_info *einfo, int decref)
+{
+       if (einfo->ei_cbdata)
+               mdt_unlock_slaves(info, o, einfo, decref);
+       mdt_object_unlock(info, o, lh, decref);
 }
 
 /*
@@ -536,15 +519,15 @@ static int mdt_create(struct mdt_thread_info *info)
         */
        if (mdt_slc_is_enabled(mdt) && S_ISDIR(ma->ma_attr.la_mode)) {
                struct mdt_lock_handle *lhc;
-               struct mdt_lock_handle *s0_lh;
-               struct mdt_object *s0_obj = NULL;
-               struct ldlm_enqueue_info *einfo;
-               struct lu_fid *s0_fid = &info->mti_tmp_fid1;
-               bool cos_incompat = false;
-
-               rc = mdt_init_slaves(info, child, s0_fid);
-               if (rc > 0) {
-                       cos_incompat = true;
+               struct ldlm_enqueue_info *einfo = &info->mti_einfo;
+               bool cos_incompat;
+
+               rc = mdt_object_striped(info, child);
+               if (rc < 0)
+                       GOTO(put_child, rc);
+
+               cos_incompat = rc;
+               if (cos_incompat) {
                        if (!mdt_object_remote(parent)) {
                                mdt_object_unlock(info, parent, lh, 1);
                                mdt_lock_pdo_init(lh, LCK_PW, &rr->rr_name);
@@ -556,26 +539,16 @@ static int mdt_create(struct mdt_thread_info *info)
                        }
                }
 
-               einfo = &info->mti_einfo;
                lhc = &info->mti_lh[MDT_LH_CHILD];
                mdt_lock_handle_init(lhc);
                mdt_lock_reg_init(lhc, LCK_PW);
-               rc = mdt_reint_object_lock(info, child, lhc,
-                                          MDS_INODELOCK_UPDATE,
-                                          cos_incompat);
+               rc = mdt_reint_striped_lock(info, child, lhc,
+                                           MDS_INODELOCK_UPDATE, einfo,
+                                           cos_incompat);
                if (rc)
                        GOTO(put_child, rc);
-               mdt_object_unlock(info, child, lhc, rc);
-
-               s0_lh = &info->mti_lh[MDT_LH_LOCAL];
-               mdt_lock_handle_init(s0_lh);
-               mdt_lock_reg_init(s0_lh, LCK_PW);
-               rc = mdt_lock_slaves(info, child, LCK_PW, MDS_INODELOCK_UPDATE,
-                                    s0_fid, s0_lh, &s0_obj, einfo);
-               mdt_unlock_slaves(info, child, MDS_INODELOCK_UPDATE, s0_lh,
-                                 s0_obj, einfo, rc);
-               if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME) && rc == -EIO)
-                       rc = 0;
+
+               mdt_reint_striped_unlock(info, child, lhc, einfo, rc);
        }
 
        /* Return fid & attr to client. */
@@ -599,16 +572,15 @@ static int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
                        (LA_MODE | LA_UID | LA_GID | LA_PROJID | LA_FLAGS);
        __u64 lockpart = MDS_INODELOCK_UPDATE;
        struct ldlm_enqueue_info *einfo = &info->mti_einfo;
-       struct lu_fid *s0_fid = &info->mti_tmp_fid1;
-       struct mdt_lock_handle *s0_lh = NULL;
-       struct mdt_object *s0_obj = NULL;
-       bool cos_incompat = false;
+       bool cos_incompat;
        int rc;
        ENTRY;
 
-       rc = mdt_init_slaves(info, mo, s0_fid);
-       if (rc > 0)
-               cos_incompat = true;
+       rc = mdt_object_striped(info, mo);
+       if (rc < 0)
+               RETURN(rc);
+
+       cos_incompat = rc;
 
        lh = &info->mti_lh[MDT_LH_PARENT];
        mdt_lock_reg_init(lh, LCK_PW);
@@ -620,17 +592,11 @@ static int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
        if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
                lockpart |= MDS_INODELOCK_LOOKUP | MDS_INODELOCK_PERM;
 
-       rc = mdt_reint_object_lock(info, mo, lh, lockpart, cos_incompat);
+       rc = mdt_reint_striped_lock(info, mo, lh, lockpart, einfo,
+                                   cos_incompat);
        if (rc != 0)
                RETURN(rc);
 
-       s0_lh = &info->mti_lh[MDT_LH_LOCAL];
-       mdt_lock_reg_init(s0_lh, LCK_PW);
-       rc = mdt_lock_slaves(info, mo, LCK_PW, lockpart, s0_fid, s0_lh, &s0_obj,
-                            einfo);
-       if (rc != 0)
-               GOTO(out_unlock, rc);
-
        /* all attrs are packed into mti_attr in unpack_setattr */
        mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
                       OBD_FAIL_MDS_REINT_SETATTR_WRITE);
@@ -659,8 +625,7 @@ static int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
        mdt_dom_obj_lvb_update(info->mti_env, mo, false);
        EXIT;
 out_unlock:
-       mdt_unlock_slaves(info, mo, lockpart, s0_lh, s0_obj, einfo, rc);
-       mdt_object_unlock(info, mo, lh, rc);
+       mdt_reint_striped_unlock(info, mo, lh, einfo, rc);
        return rc;
 }
 
@@ -872,9 +837,6 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
        struct mdt_lock_handle *parent_lh;
        struct mdt_lock_handle *child_lh;
        struct ldlm_enqueue_info *einfo = &info->mti_einfo;
-       struct lu_fid *s0_fid = &info->mti_tmp_fid2;
-       struct mdt_lock_handle *s0_lh = NULL;
-       struct mdt_object *s0_obj = NULL;
        __u64 lock_ibits;
        bool cos_incompat = false;
        int no_name = 0;
@@ -950,11 +912,17 @@ relock:
        if (IS_ERR(mc))
                GOTO(unlock_parent, rc = PTR_ERR(mc));
 
-       if (!cos_incompat && mdt_init_slaves(info, mc, s0_fid) > 0) {
-               cos_incompat = true;
-               mdt_object_put(info->mti_env, mc);
-               mdt_object_unlock(info, mp, parent_lh, -EAGAIN);
-               goto relock;
+       if (!cos_incompat) {
+               rc = mdt_object_striped(info, mc);
+               if (rc < 0)
+                       GOTO(unlock_parent, rc = PTR_ERR(mc));
+
+               cos_incompat = rc;
+               if (cos_incompat) {
+                       mdt_object_put(info->mti_env, mc);
+                       mdt_object_unlock(info, mp, parent_lh, -EAGAIN);
+                       goto relock;
+               }
        }
 
        child_lh = &info->mti_lh[MDT_LH_CHILD];
@@ -1021,10 +989,10 @@ relock:
                lock_ibits &= ~MDS_INODELOCK_LOOKUP;
        }
 
-       rc = mdt_reint_object_lock(info, mc, child_lh, lock_ibits,
-                                  cos_incompat);
+       rc = mdt_reint_striped_lock(info, mc, child_lh, lock_ibits, einfo,
+                                   cos_incompat);
        if (rc != 0)
-               GOTO(unlock_child, rc);
+               GOTO(put_child, rc);
 
        /*
         * Now we can only make sure we need MA_INODE, in mdd layer, will check
@@ -1033,13 +1001,6 @@ relock:
        ma->ma_need = MA_INODE;
        ma->ma_valid = 0;
 
-       s0_lh = &info->mti_lh[MDT_LH_LOCAL];
-       mdt_lock_reg_init(s0_lh, LCK_EX);
-       rc = mdt_lock_slaves(info, mc, LCK_EX, MDS_INODELOCK_UPDATE, s0_fid,
-                            s0_lh, &s0_obj, einfo);
-       if (rc != 0)
-               GOTO(unlock_child, rc);
-
        mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
                       OBD_FAIL_MDS_REINT_UNLINK_WRITE);
        /* save version when object is locked */
@@ -1086,9 +1047,7 @@ out_stat:
        EXIT;
 
 unlock_child:
-       mdt_unlock_slaves(info, mc, MDS_INODELOCK_UPDATE, s0_lh, s0_obj, einfo,
-                         rc);
-       mdt_object_unlock(info, mc, child_lh, rc);
+       mdt_reint_striped_unlock(info, mc, child_lh, einfo, rc);
 put_child:
        mdt_object_put(info->mti_env, mc);
 unlock_parent: