Whamcloud - gitweb
LU-2875 mdt: use lu_name for rr_name and rr_tgt
[fs/lustre-release.git] / lustre / mdt / mdt_open.c
index d3c3081..30c80e7 100644 (file)
@@ -664,7 +664,8 @@ void mdt_mfd_set_mode(struct mdt_file_data *mfd, __u64 mode)
 }
 
 static int mdt_mfd_open(struct mdt_thread_info *info, struct mdt_object *p,
-                        struct mdt_object *o, __u64 flags, int created)
+                       struct mdt_object *o, __u64 flags, int created,
+                       struct ldlm_reply *rep)
 {
         struct ptlrpc_request   *req = mdt_info_req(info);
         struct mdt_export_data  *med = &req->rq_export->exp_mdt_data;
@@ -692,6 +693,9 @@ static int mdt_mfd_open(struct mdt_thread_info *info, struct mdt_object *p,
                 rc = mdt_create_data(info, p, o);
                 if (rc)
                         RETURN(rc);
+
+               if (exp_connect_flags(req->rq_export) & OBD_CONNECT_DISP_STRIPE)
+                       mdt_set_disposition(info, rep, DISP_OPEN_STRIPE);
         }
 
         CDEBUG(D_INODE, "after open, ma_valid bit = "LPX64" lmm_size = %d\n",
@@ -979,15 +983,15 @@ int mdt_finish_open(struct mdt_thread_info *info,
                                         repbody->valid |= OBD_MD_FLEASIZE;
                         }
                        mdt_set_disposition(info, rep, DISP_OPEN_OPEN);
-                        RETURN(0);
-                }
-        }
+                       RETURN(0);
+               }
+       }
 
-        rc = mdt_mfd_open(info, p, o, flags, created);
+       rc = mdt_mfd_open(info, p, o, flags, created, rep);
        if (!rc)
                mdt_set_disposition(info, rep, DISP_OPEN_OPEN);
 
-        RETURN(rc);
+       RETURN(rc);
 }
 
 extern void mdt_req_from_lcd(struct ptlrpc_request *req,
@@ -1272,6 +1276,15 @@ static int mdt_object_open_lock(struct mdt_thread_info *info,
                        ", open_flags = "LPO64"\n",
                        PFID(mdt_object_fid(obj)), open_flags);
 
+               /* We cannot enqueue another lock for the same resource we
+                * already have a lock for, due to mechanics of waiting list
+                * iterating in ldlm, see LU-3601.
+                * As such we'll drop the open lock we just got above here,
+                * it's ok not to have this open lock as it's main purpose is to
+                * flush unused cached client open handles. */
+               if (lustre_handle_is_used(&lhc->mlh_reg_lh))
+                       mdt_object_unlock(info, obj, lhc, 1);
+
                LASSERT(!try_layout);
                mdt_lock_handle_init(ll);
                mdt_lock_reg_init(ll, LCK_EX);
@@ -1361,12 +1374,13 @@ static void mdt_object_open_unlock(struct mdt_thread_info *info,
                rc = 1;
        }
 
-       if (rc != 0) {
+       if (rc != 0 || !lustre_handle_is_used(&lhc->mlh_reg_lh)) {
                struct ldlm_reply       *ldlm_rep;
 
                ldlm_rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP);
                mdt_clear_disposition(info, ldlm_rep, DISP_OPEN_LOCK);
-               mdt_object_unlock(info, obj, lhc, 1);
+               if (lustre_handle_is_used(&lhc->mlh_reg_lh))
+                       mdt_object_unlock(info, obj, lhc, 1);
        }
        RETURN_EXIT;
 }
@@ -1555,7 +1569,6 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
         __u64                    create_flags = info->mti_spec.sp_cr_flags;
        __u64                    ibits = 0;
         struct mdt_reint_record *rr = &info->mti_rr;
-        struct lu_name          *lname;
         int                      result, rc;
         int                      created = 0;
         __u32                    msg_flags;
@@ -1589,11 +1602,12 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
             info->mti_spec.u.sp_ea.eadata == NULL)
                 GOTO(out, result = err_serious(-EINVAL));
 
-        CDEBUG(D_INODE, "I am going to open "DFID"/(%s->"DFID") "
-               "cr_flag="LPO64" mode=0%06o msg_flag=0x%x\n",
-               PFID(rr->rr_fid1), rr->rr_name,
-               PFID(rr->rr_fid2), create_flags,
-               ma->ma_attr.la_mode, msg_flags);
+       CDEBUG(D_INODE, "I am going to open "DFID"/("DNAME"->"DFID") "
+              "cr_flag="LPO64" mode=0%06o msg_flag=0x%x\n",
+              PFID(rr->rr_fid1), PNAME(&rr->rr_name),
+              PFID(rr->rr_fid2), create_flags,
+              ma->ma_attr.la_mode, msg_flags);
+
        if (info->mti_cross_ref) {
                /* This is cross-ref open */
                mdt_set_disposition(info, ldlm_rep,
@@ -1621,7 +1635,8 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
                        GOTO(out, result = -EFAULT);
                }
                CDEBUG(D_INFO, "No object(1), continue as regular open.\n");
-       } else if ((rr->rr_namelen == 0 && create_flags & MDS_OPEN_LOCK) ||
+       } else if ((!lu_name_is_valid(&rr->rr_name) &&
+                   (create_flags & MDS_OPEN_LOCK)) ||
                   (create_flags & MDS_OPEN_BY_FID)) {
                result = mdt_open_by_fid_lock(info, ldlm_rep, lhc);
                /* If result is 0 then open by FID has found the file
@@ -1636,9 +1651,6 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
                    result != -EREMOTE)
                        GOTO(out, result);
 
-               if (unlikely(rr->rr_namelen == 0))
-                       GOTO(out, result = -EINVAL);
-
                CDEBUG(D_INFO, "No object(2), continue as regular open.\n");
        }
 
@@ -1648,9 +1660,13 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
         mdt_set_disposition(info, ldlm_rep,
                             (DISP_IT_EXECD | DISP_LOOKUP_EXECD));
 
+       if (!lu_name_is_valid(&rr->rr_name))
+               GOTO(out, result = -EPROTO);
+
         lh = &info->mti_lh[MDT_LH_PARENT];
-        mdt_lock_pdo_init(lh, (create_flags & MDS_OPEN_CREAT) ?
-                          LCK_PW : LCK_PR, rr->rr_name, rr->rr_namelen);
+       mdt_lock_pdo_init(lh,
+                         (create_flags & MDS_OPEN_CREAT) ? LCK_PW : LCK_PR,
+                         &rr->rr_name);
 
         parent = mdt_object_find_lock(info, rr->rr_fid1, lh,
                                       MDS_INODELOCK_UPDATE);
@@ -1664,12 +1680,13 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
 
         fid_zero(child_fid);
 
-        lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
-        result = mdo_lookup(info->mti_env, mdt_object_child(parent),
-                            lname, child_fid, &info->mti_spec);
-        LASSERTF(ergo(result == 0, fid_is_sane(child_fid)),
-                 "looking for "DFID"/%s, result fid="DFID"\n",
-                 PFID(mdt_object_fid(parent)), rr->rr_name, PFID(child_fid));
+       result = mdo_lookup(info->mti_env, mdt_object_child(parent),
+                           &rr->rr_name, child_fid, &info->mti_spec);
+
+       LASSERTF(ergo(result == 0, fid_is_sane(child_fid)),
+                "looking for "DFID"/"DNAME", found FID = "DFID"\n",
+                PFID(mdt_object_fid(parent)), PNAME(&rr->rr_name),
+                PFID(child_fid));
 
         if (result != 0 && result != -ENOENT && result != -ESTALE)
                 GOTO(out_parent, result);
@@ -1717,12 +1734,12 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
                if (fid_is_obf(rr->rr_fid1) || fid_is_dot_lustre(rr->rr_fid1))
                        GOTO(out_child, result = -EPERM);
 
-                /* save versions in reply */
-                mdt_version_get_save(info, parent, 0);
-                mdt_version_get_save(info, child, 1);
+               /* save versions in reply */
+               mdt_version_get_save(info, parent, 0);
+               mdt_version_get_save(info, child, 1);
 
-                /* version of child will be changed */
-                info->mti_mos = child;
+               /* version of child will be changed */
+               tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(child));
 
                 /* Not found and with MDS_OPEN_CREAT: let's create it. */
                 mdt_set_disposition(info, ldlm_rep, DISP_OPEN_CREATE);
@@ -1738,12 +1755,12 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
                 info->mti_spec.sp_cr_lookup = 0;
                 info->mti_spec.sp_feat = &dt_directory_features;
 
-                result = mdo_create(info->mti_env,
-                                    mdt_object_child(parent),
-                                    lname,
-                                    mdt_object_child(child),
-                                    &info->mti_spec,
-                                    &info->mti_attr);
+               result = mdo_create(info->mti_env,
+                                   mdt_object_child(parent),
+                                   &rr->rr_name,
+                                   mdt_object_child(child),
+                                   &info->mti_spec,
+                                   &info->mti_attr);
                 if (result == -ERESTART) {
                         mdt_clear_disposition(info, ldlm_rep, DISP_OPEN_CREATE);
                         GOTO(out_child, result);
@@ -1799,30 +1816,38 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
                        else
                                result = -EREMOTE;
                         GOTO(out_child, result);
+               } else if (mdt_object_exists(child)) {
+                       /* We have to get attr & LOV EA & HSM for this
+                        * object. */
+                       ma->ma_need |= MA_HSM;
+                       result = mdt_attr_get_complex(info, child, ma);
                } else {
-                       if (mdt_object_exists(child)) {
-                               /* We have to get attr & LOV EA & HSM for this
-                                * object */
-                               ma->ma_need |= MA_HSM;
-                               result = mdt_attr_get_complex(info, child, ma);
-                       } else {
-                               /*object non-exist!!!*/
-                               LBUG();
-                       }
+                       /* Object does not exist. Likely FS corruption. */
+                       CERROR("%s: name '"DNAME"' present, but FID "
+                              DFID" is invalid\n",
+                              mdt_obd_name(info->mti_mdt),
+                              PNAME(&rr->rr_name), PFID(child_fid));
+                       GOTO(out_child, result = -EIO);
                }
         }
 
-        LASSERT(!lustre_handle_is_used(&lhc->mlh_reg_lh));
-
-       /* get openlock if this is not replay and if a client requested it */
-       if (!req_is_replay(req)) {
-               rc = mdt_object_open_lock(info, child, lhc, &ibits);
-               if (rc != 0)
-                       GOTO(out_child_unlock, result = rc);
-               else if (create_flags & MDS_OPEN_LOCK)
+       if (lustre_handle_is_used(&lhc->mlh_reg_lh)) {
+               /* the open lock might already be gotten in
+                * mdt_intent_fixup_resent */
+               LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT);
+               if (create_flags & MDS_OPEN_LOCK)
                        mdt_set_disposition(info, ldlm_rep, DISP_OPEN_LOCK);
+       } else {
+               /* get openlock if this isn't replay and client requested it */
+               if (!req_is_replay(req)) {
+                       rc = mdt_object_open_lock(info, child, lhc, &ibits);
+                       if (rc != 0)
+                               GOTO(out_child_unlock, result = rc);
+                       else if (create_flags & MDS_OPEN_LOCK)
+                               mdt_set_disposition(info, ldlm_rep,
+                                                   DISP_OPEN_LOCK);
+               }
        }
-
        /* Try to open it now. */
        rc = mdt_finish_open(info, parent, child, create_flags,
                             created, ldlm_rep);
@@ -1846,7 +1871,7 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
                        rc = mdo_unlink(info->mti_env,
                                        mdt_object_child(parent),
                                        mdt_object_child(child),
-                                       lname,
+                                       &rr->rr_name,
                                        &info->mti_attr, 0);
                        if (rc != 0)
                                CERROR("%s: "DFID" cleanup of open: rc = %d\n",
@@ -1881,8 +1906,10 @@ static struct mdt_object *mdt_orphan_open(struct mdt_thread_info *info,
        struct lu_fid *local_root_fid = &info->mti_tmp_fid1;
        struct mdt_object *obj = NULL;
        struct mdt_object *local_root;
-       static const char name[] = "i_am_nobody";
-       struct lu_name *lname;
+       static const struct lu_name lname = {
+               .ln_name = "i_am_nobody",
+               .ln_namelen = sizeof("i_am_nobody") - 1,
+       };
        struct lu_ucred *uc;
        cfs_cap_t uc_cap_save;
        int rc;
@@ -1912,12 +1939,10 @@ static struct mdt_object *mdt_orphan_open(struct mdt_thread_info *info,
                spec->sp_cr_flags |= MDS_OPEN_DELAY_CREATE;
        }
 
-       lname = mdt_name(env, (char *)name, sizeof(name) - 1);
-
        uc = lu_ucred(env);
        uc_cap_save = uc->uc_cap;
        uc->uc_cap |= 1 << CFS_CAP_DAC_OVERRIDE;
-       rc = mdo_create(env, mdt_object_child(local_root), lname,
+       rc = mdo_create(env, mdt_object_child(local_root), &lname,
                        mdt_object_child(obj), spec, attr);
        uc->uc_cap = uc_cap_save;
        if (rc < 0) {