Whamcloud - gitweb
LU-5223 lmv: build master LMV EA dynamically build via readdir
[fs/lustre-release.git] / lustre / mdt / mdt_handler.c
index d01af8b..f64795f 100644 (file)
@@ -1345,15 +1345,10 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                       "ldlm_rep = %p\n",
                       PFID(mdt_object_fid(child)), ldlm_rep);
 
-               if (is_resent) {
-                       /* Do not take lock for resent case. */
-                       lock = ldlm_handle2lock(&lhc->mlh_reg_lh);
-                       LASSERTF(lock != NULL, "Invalid lock handle "LPX64"\n",
-                                lhc->mlh_reg_lh.cookie);
-                       LASSERT(fid_res_name_eq(mdt_object_fid(child),
-                                               &lock->l_resource->lr_name));
-                       LDLM_LOCK_PUT(lock);
-               } else {
+               rc = mdt_check_resent_lock(info, child, lhc);
+               if (rc < 0) {
+                       RETURN(-EPROTO);
+               } else if (rc > 0) {
                        mdt_lock_handle_init(lhc);
                        mdt_lock_reg_init(lhc, LCK_PR);
 
@@ -1478,30 +1473,11 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
 
        if (unlikely(IS_ERR(child)))
                GOTO(out_parent, rc = PTR_ERR(child));
-       if (is_resent) {
-               /* Do not take lock for resent case. */
-               lock = ldlm_handle2lock(&lhc->mlh_reg_lh);
-               LASSERTF(lock != NULL, "Invalid lock handle "LPX64"\n",
-                        lhc->mlh_reg_lh.cookie);
 
-               if (!fid_res_name_eq(mdt_object_fid(child),
-                                    &lock->l_resource->lr_name)) {
-                       LASSERTF(fid_res_name_eq(mdt_object_fid(parent),
-                                                &lock->l_resource->lr_name),
-                                "Lock res_id: "DLDLMRES", fid: "DFID"\n",
-                                PLDLMRES(lock->l_resource),
-                                PFID(mdt_object_fid(parent)));
-                       CWARN("Although resent, but still not get child lock"
-                             "parent:"DFID" child:"DFID"\n",
-                             PFID(mdt_object_fid(parent)),
-                             PFID(mdt_object_fid(child)));
-                       lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
-                       LDLM_LOCK_PUT(lock);
-                       GOTO(relock, 0);
-               }
-               LDLM_LOCK_PUT(lock);
-       } else {
-relock:
+       rc = mdt_check_resent_lock(info, child, lhc);
+       if (rc < 0) {
+               GOTO(out_child, rc);
+       } else if (rc > 0) {
                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RESEND, obd_timeout*2);
                 mdt_lock_handle_init(lhc);
                mdt_lock_reg_init(lhc, LCK_PR);
@@ -2335,6 +2311,35 @@ int mdt_remote_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
        RETURN(0);
 }
 
+int mdt_check_resent_lock(struct mdt_thread_info *info,
+                         struct mdt_object *mo,
+                         struct mdt_lock_handle *lhc)
+{
+       /* the lock might already be gotten in ldlm_handle_enqueue() */
+       if (lustre_handle_is_used(&lhc->mlh_reg_lh)) {
+               struct ptlrpc_request *req = mdt_info_req(info);
+               struct ldlm_lock      *lock;
+
+               lock = ldlm_handle2lock(&lhc->mlh_reg_lh);
+               LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT);
+               LASSERTF(lock != NULL, "Invalid lock handle "LPX64"\n",
+                        lhc->mlh_reg_lh.cookie);
+
+               if (!fid_res_name_eq(mdt_object_fid(mo),
+                                    &lock->l_resource->lr_name)) {
+                       CWARN("%s: Although resent, but still not "
+                             "get child lock:"DFID"\n",
+                             info->mti_exp->exp_obd->obd_name,
+                             PFID(mdt_object_fid(mo)));
+                       LDLM_LOCK_PUT(lock);
+                       RETURN(-EPROTO);
+               }
+               LDLM_LOCK_PUT(lock);
+               return 0;
+       }
+       return 1;
+}
+
 int mdt_remote_object_lock(struct mdt_thread_info *mti,
                           struct mdt_object *o, const struct lu_fid *fid,
                           struct lustre_handle *lh, ldlm_mode_t mode,
@@ -2991,19 +2996,17 @@ static struct mdt_it_flavor {
 };
 
 static int
-mdt_intent_lock_replace(struct mdt_thread_info *info, struct ldlm_lock **lockp,
-                       struct ldlm_lock *new_lock, struct mdt_lock_handle *lh,
+mdt_intent_lock_replace(struct mdt_thread_info *info,
+                       struct ldlm_lock **lockp,
+                       struct mdt_lock_handle *lh,
                        __u64 flags)
 {
         struct ptlrpc_request  *req = mdt_info_req(info);
         struct ldlm_lock       *lock = *lockp;
+       struct ldlm_lock       *new_lock;
 
-        /*
-         * Get new lock only for cases when possible resent did not find any
-         * lock.
-         */
-        if (new_lock == NULL)
-                new_lock = ldlm_handle2lock_long(&lh->mlh_reg_lh, 0);
+       /* If possible resent found a lock, @lh is set to its handle */
+       new_lock = ldlm_handle2lock_long(&lh->mlh_reg_lh, 0);
 
         if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY)) {
                 lh->mlh_reg_lh.cookie = 0;
@@ -3035,6 +3038,8 @@ mdt_intent_lock_replace(struct mdt_thread_info *info, struct ldlm_lock **lockp,
                  */
                 LASSERT(lustre_msg_get_flags(req->rq_reqmsg) &
                         MSG_RESENT);
+
+               LDLM_LOCK_RELEASE(new_lock);
                 lh->mlh_reg_lh.cookie = 0;
                 RETURN(ELDLM_LOCK_REPLACED);
         }
@@ -3076,52 +3081,30 @@ mdt_intent_lock_replace(struct mdt_thread_info *info, struct ldlm_lock **lockp,
 
 static void mdt_intent_fixup_resent(struct mdt_thread_info *info,
                                    struct ldlm_lock *new_lock,
-                                   struct ldlm_lock **old_lock,
                                    struct mdt_lock_handle *lh,
-                                   enum mdt_it_code opcode)
+                                   __u64 flags)
 {
         struct ptlrpc_request  *req = mdt_info_req(info);
-        struct obd_export      *exp = req->rq_export;
-        struct lustre_handle    remote_hdl;
         struct ldlm_request    *dlmreq;
-        struct ldlm_lock       *lock;
 
         if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))
                 return;
 
         dlmreq = req_capsule_client_get(info->mti_pill, &RMF_DLM_REQ);
-        remote_hdl = dlmreq->lock_handle[0];
-       /* If the client does not require open lock, it does not need to
-        * search lock in exp_lock_hash, since the server thread will
-        * make sure the lock will be released, and the resend request
-        * can always re-enqueue the lock */
-       if ((opcode != MDT_IT_OPEN) || (opcode == MDT_IT_OPEN &&
-           info->mti_spec.sp_cr_flags & MDS_OPEN_LOCK)) {
-               /* In the function below, .hs_keycmp resolves to
-                * ldlm_export_lock_keycmp() */
-               /* coverity[overrun-buffer-val] */
-               lock = cfs_hash_lookup(exp->exp_lock_hash, &remote_hdl);
-               if (lock) {
-                       lock_res_and_lock(lock);
-                       if (lock != new_lock) {
-                               lh->mlh_reg_lh.cookie = lock->l_handle.h_cookie;
-                               lh->mlh_reg_mode = lock->l_granted_mode;
-
-                               LDLM_DEBUG(lock, "Restoring lock cookie");
-                               DEBUG_REQ(D_DLMTRACE, req,
-                                         "restoring lock cookie "LPX64,
-                                         lh->mlh_reg_lh.cookie);
-                               if (old_lock)
-                                       *old_lock = LDLM_LOCK_GET(lock);
-                               cfs_hash_put(exp->exp_lock_hash,
-                                            &lock->l_exp_hash);
-                               unlock_res_and_lock(lock);
-                               return;
-                       }
-                       cfs_hash_put(exp->exp_lock_hash, &lock->l_exp_hash);
-                       unlock_res_and_lock(lock);
-               }
+
+       /* Check if this is a resend case (MSG_RESENT is set on RPC) and a
+        * lock was found by ldlm_handle_enqueue(); if so @lh must be
+        * initialized. */
+       if (flags & LDLM_FL_RESENT) {
+               lh->mlh_reg_lh.cookie = new_lock->l_handle.h_cookie;
+               lh->mlh_reg_mode = new_lock->l_granted_mode;
+
+               LDLM_DEBUG(new_lock, "Restoring lock cookie");
+               DEBUG_REQ(D_DLMTRACE, req, "restoring lock cookie "LPX64,
+                         lh->mlh_reg_lh.cookie);
+               return;
        }
+
         /*
          * If the xid matches, then we know this is a resent request, and allow
          * it. (It's probably an OPEN, for which we don't send a lock.
@@ -3136,8 +3119,8 @@ static void mdt_intent_fixup_resent(struct mdt_thread_info *info,
          */
         lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
 
-        DEBUG_REQ(D_DLMTRACE, req, "no existing lock with rhandle "LPX64,
-                  remote_hdl.cookie);
+       DEBUG_REQ(D_DLMTRACE, req, "no existing lock with rhandle "LPX64,
+                 dlmreq->lock_handle[0].cookie);
 }
 
 static int mdt_intent_getxattr(enum mdt_it_code opcode,
@@ -3154,7 +3137,7 @@ static int mdt_intent_getxattr(enum mdt_it_code opcode,
         * (for the resend case) or a new lock. Below we will use it to
         * replace the original lock.
         */
-       mdt_intent_fixup_resent(info, *lockp, NULL, lhc, opcode);
+       mdt_intent_fixup_resent(info, *lockp, lhc, flags);
        if (!lustre_handle_is_used(&lhc->mlh_reg_lh)) {
                mdt_lock_reg_init(lhc, (*lockp)->l_req_mode);
                rc = mdt_object_lock(info, info->mti_object, lhc,
@@ -3166,7 +3149,7 @@ static int mdt_intent_getxattr(enum mdt_it_code opcode,
 
        grc = mdt_getxattr(info);
 
-       rc = mdt_intent_lock_replace(info, lockp, NULL, lhc, flags);
+       rc = mdt_intent_lock_replace(info, lockp, lhc, flags);
 
        if (mdt_info_req(info)->rq_repmsg != NULL)
                ldlm_rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP);
@@ -3184,7 +3167,6 @@ static int mdt_intent_getattr(enum mdt_it_code opcode,
                              __u64 flags)
 {
         struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_RMT];
-        struct ldlm_lock       *new_lock = NULL;
         __u64                   child_bits;
         struct ldlm_reply      *ldlm_rep;
         struct mdt_body        *reqbody;
@@ -3223,7 +3205,7 @@ static int mdt_intent_getattr(enum mdt_it_code opcode,
         mdt_set_disposition(info, ldlm_rep, DISP_IT_EXECD);
 
        /* Get lock from request for possible resent case. */
-       mdt_intent_fixup_resent(info, *lockp, &new_lock, lhc, opcode);
+       mdt_intent_fixup_resent(info, *lockp, lhc, flags);
 
        rc = mdt_getattr_name_lock(info, lhc, child_bits, ldlm_rep);
        ldlm_rep->lock_policy_res2 = clear_serious(rc);
@@ -3236,7 +3218,7 @@ static int mdt_intent_getattr(enum mdt_it_code opcode,
                 GOTO(out_ucred, rc = ELDLM_LOCK_ABORTED);
         }
 
-        rc = mdt_intent_lock_replace(info, lockp, new_lock, lhc, flags);
+       rc = mdt_intent_lock_replace(info, lockp, lhc, flags);
         EXIT;
 out_ucred:
         mdt_exit_ucred(info);
@@ -3253,6 +3235,7 @@ static int mdt_intent_layout(enum mdt_it_code opcode,
                             struct ldlm_lock **lockp,
                             __u64 flags)
 {
+       struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_LAYOUT];
        struct layout_intent *layout;
        struct lu_fid *fid;
        struct mdt_object *obj = NULL;
@@ -3268,6 +3251,9 @@ static int mdt_intent_layout(enum mdt_it_code opcode,
        fid = &info->mti_tmp_fid2;
        fid_extract_from_res_name(fid, &(*lockp)->l_resource->lr_name);
 
+       /* Get lock from request for possible resent case. */
+       mdt_intent_fixup_resent(info, *lockp, lhc, flags);
+
        obj = mdt_object_find(info->mti_env, info->mti_mdt, fid);
        if (IS_ERR(obj))
                RETURN(PTR_ERR(obj));
@@ -3275,8 +3261,10 @@ static int mdt_intent_layout(enum mdt_it_code opcode,
        if (mdt_object_exists(obj) && !mdt_object_remote(obj)) {
                /* get the length of lsm */
                rc = mdt_attr_get_eabuf_size(info, obj);
-               if (rc < 0)
+               if (rc < 0) {
+                       mdt_object_put(info->mti_env, obj);
                        RETURN(rc);
+               }
 
                if (rc > info->mti_mdt->mdt_max_mdsize)
                        info->mti_mdt->mdt_max_mdsize = rc;
@@ -3290,11 +3278,14 @@ static int mdt_intent_layout(enum mdt_it_code opcode,
        if (rc != 0)
                RETURN(-EINVAL);
 
+       if (lustre_handle_is_used(&lhc->mlh_reg_lh))
+               rc = mdt_intent_lock_replace(info, lockp, lhc, flags);
+
        layout = req_capsule_client_get(info->mti_pill, &RMF_LAYOUT_INTENT);
        LASSERT(layout != NULL);
        if (layout->li_opc == LAYOUT_INTENT_ACCESS)
-               /* return to normal ldlm handling */
-               RETURN(0);
+               /* return to normal/resent ldlm handling */
+               RETURN(rc);
 
        CERROR("%s: Unsupported layout intent (%d)\n",
                mdt_obd_name(info->mti_mdt), layout->li_opc);
@@ -3329,7 +3320,7 @@ static int mdt_intent_reint(enum mdt_it_code opcode,
         }
 
        /* Get lock from request for possible resent case. */
-       mdt_intent_fixup_resent(info, *lockp, NULL, lhc, opcode);
+       mdt_intent_fixup_resent(info, *lockp, lhc, flags);
 
         rc = mdt_reint_internal(info, lhc, opc);
 
@@ -3348,7 +3339,7 @@ static int mdt_intent_reint(enum mdt_it_code opcode,
        if (rc == -EREMOTE || mdt_get_disposition(rep, DISP_OPEN_LOCK)) {
                LASSERT(lustre_handle_is_used(&lhc->mlh_reg_lh));
                rep->lock_policy_res2 = 0;
-               rc = mdt_intent_lock_replace(info, lockp, NULL, lhc, flags);
+               rc = mdt_intent_lock_replace(info, lockp, lhc, flags);
                RETURN(rc);
        }
 
@@ -3381,7 +3372,7 @@ static int mdt_intent_reint(enum mdt_it_code opcode,
                         LASSERTF(rc == 0, "Error occurred but lock handle "
                                  "is still in use, rc = %d\n", rc);
                         rep->lock_policy_res2 = 0;
-                        rc = mdt_intent_lock_replace(info, lockp, NULL, lhc, flags);
+                       rc = mdt_intent_lock_replace(info, lockp, lhc, flags);
                         RETURN(rc);
                 } else {
                         lhc->mlh_reg_lh.cookie = 0ull;
@@ -5340,44 +5331,21 @@ static int mdt_path_current(struct mdt_thread_info *info,
                if (IS_ERR(mdt_obj))
                        GOTO(out, rc = PTR_ERR(mdt_obj));
 
-               if (mdt_object_remote(mdt_obj)) {
+               if (!mdt_object_exists(mdt_obj)) {
                        mdt_object_put(info->mti_env, mdt_obj);
-                       GOTO(remote_out, rc = -EREMOTE);
-               }
-
-               lmv_buf.lb_buf = info->mti_xattr_buf;
-               lmv_buf.lb_len = sizeof(info->mti_xattr_buf);
-
-               /* Check if it is slave stripes */
-               rc = mo_xattr_get(info->mti_env, mdt_object_child(mdt_obj),
-                                 &lmv_buf, XATTR_NAME_LMV);
-               if (rc > 0) {
-                       union lmv_mds_md *lmm = lmv_buf.lb_buf;
-
-                       /* For slave stripes, get its master */
-                       if (le32_to_cpu(lmm->lmv_magic) == LMV_MAGIC_STRIPE) {
-                               struct lmv_mds_md_v1 *lmm1 = &lmm->lmv_md_v1;
-
-                               fid_le_to_cpu(tmpfid, &lmm1->lmv_master_fid);
-                               if (!fid_is_sane(tmpfid)) {
-                                       mdt_object_put(info->mti_env, mdt_obj);
-                                       GOTO(out, rc = -EINVAL);
-                               }
-                               mdt_object_put(info->mti_env, mdt_obj);
-                               pli->pli_fids[pli->pli_fidcount] = *tmpfid;
-                               continue;
-                       }
+                       GOTO(out, rc = -ENOENT);
                }
 
-               if (!mdt_object_exists(mdt_obj)) {
+               if (mdt_object_remote(mdt_obj)) {
                        mdt_object_put(info->mti_env, mdt_obj);
-                       GOTO(out, rc = -ENOENT);
+                       GOTO(remote_out, rc = -EREMOTE);
                }
 
                rc = mdt_links_read(info, mdt_obj, &ldata);
-               mdt_object_put(info->mti_env, mdt_obj);
-               if (rc != 0)
+               if (rc != 0) {
+                       mdt_object_put(info->mti_env, mdt_obj);
                        GOTO(out, rc);
+               }
 
                leh = buf->lb_buf;
                lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
@@ -5398,6 +5366,26 @@ static int mdt_path_current(struct mdt_thread_info *info,
                                pli->pli_linkno++;
                }
 
+               lmv_buf.lb_buf = info->mti_xattr_buf;
+               lmv_buf.lb_len = sizeof(info->mti_xattr_buf);
+               /* Check if it is slave stripes */
+               rc = mo_xattr_get(info->mti_env, mdt_object_child(mdt_obj),
+                                 &lmv_buf, XATTR_NAME_LMV);
+               mdt_object_put(info->mti_env, mdt_obj);
+               if (rc > 0) {
+                       union lmv_mds_md *lmm = lmv_buf.lb_buf;
+
+                       /* For slave stripes, get its master */
+                       if (le32_to_cpu(lmm->lmv_magic) == LMV_MAGIC_STRIPE) {
+                               pli->pli_fids[pli->pli_fidcount] = *tmpfid;
+                               continue;
+                       }
+               } else if (rc < 0 && rc != -ENODATA) {
+                       GOTO(out, rc);
+               }
+
+               rc = 0;
+
                /* Pack the name in the end of the buffer */
                ptr -= tmpname->ln_namelen;
                if (ptr - 1 <= pli->pli_path)