Whamcloud - gitweb
LU-2927 mdt: Not return -EREMOTE for open lock.
[fs/lustre-release.git] / lustre / mdt / mdt_open.c
index 0e95251..e1717dd 100644 (file)
@@ -108,29 +108,34 @@ void mdt_mfd_free(struct mdt_file_data *mfd)
 static int mdt_create_data(struct mdt_thread_info *info,
                            struct mdt_object *p, struct mdt_object *o)
 {
-        struct md_op_spec     *spec = &info->mti_spec;
-        struct md_attr        *ma   = &info->mti_attr;
-        int                    rc   = 0;
-        ENTRY;
+       struct md_op_spec     *spec = &info->mti_spec;
+       struct md_attr        *ma   = &info->mti_attr;
+       int                    rc   = 0;
+       ENTRY;
 
-        if (!md_should_create(spec->sp_cr_flags))
-                RETURN(0);
+       if (!md_should_create(spec->sp_cr_flags))
+               RETURN(0);
 
-        ma->ma_need = MA_INODE | MA_LOV;
-        ma->ma_valid = 0;
+       ma->ma_need = MA_INODE | MA_LOV;
+       ma->ma_valid = 0;
        mutex_lock(&o->mot_lov_mutex);
-        if (!(o->mot_flags & MOF_LOV_CREATED)) {
-                rc = mdo_create_data(info->mti_env,
-                                     p ? mdt_object_child(p) : NULL,
-                                     mdt_object_child(o), spec, ma);
+       if (!(o->mot_flags & MOF_LOV_CREATED)) {
+               if (p != NULL && (fid_is_obf(mdt_object_fid(p)) ||
+                                 fid_is_dot_lustre(mdt_object_fid(p))))
+                       GOTO(unlock, rc = -EPERM);
+
+               rc = mdo_create_data(info->mti_env,
+                                    p ? mdt_object_child(p) : NULL,
+                                    mdt_object_child(o), spec, ma);
                if (rc == 0)
                        rc = mdt_attr_get_complex(info, o, ma);
 
-                if (rc == 0 && ma->ma_valid & MA_LOV)
-                        o->mot_flags |= MOF_LOV_CREATED;
-        }
+               if (rc == 0 && ma->ma_valid & MA_LOV)
+                       o->mot_flags |= MOF_LOV_CREATED;
+       }
+unlock:
        mutex_unlock(&o->mot_lov_mutex);
-        RETURN(rc);
+       RETURN(rc);
 }
 
 static int mdt_ioepoch_opened(struct mdt_object *mo)
@@ -833,9 +838,13 @@ int mdt_finish_open(struct mdt_thread_info *info,
         * that looks like it was actually almost succesful and a failure at the
         * same time */
        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_NEGATIVE_POSITIVE)) {
-               mdt_set_disposition(info, rep, DISP_OPEN_LOCK | \
-                                   DISP_OPEN_OPEN | DISP_LOOKUP_NEG | \
-                                   DISP_LOOKUP_POS);
+               mdt_set_disposition(info, rep, DISP_OPEN_OPEN |
+                                              DISP_LOOKUP_NEG |
+                                              DISP_LOOKUP_POS);
+
+               if (flags & MDS_OPEN_LOCK)
+                       mdt_set_disposition(info, rep, DISP_OPEN_LOCK);
+
                RETURN(-ENOENT);
        }
 
@@ -1051,6 +1060,12 @@ void mdt_reconstruct_open(struct mdt_thread_info *info,
 
                if (unlikely(mdt_object_remote(child))) {
                        /* the child object was created on remote server */
+                       if (!mdt_is_dne_client(exp)) {
+                               /* Return -EIO for old client */
+                               mdt_object_put(env, parent);
+                               mdt_object_put(env, child);
+                               GOTO(out, rc = -EIO);
+                       }
                        repbody->fid1 = *rr->rr_fid2;
                        repbody->valid |= (OBD_MD_FLID | OBD_MD_MDS);
                        rc = 0;
@@ -1241,8 +1256,14 @@ static void mdt_object_open_unlock(struct mdt_thread_info *info,
                 * if open or layout lock is granted. */
                rc = 1;
        }
-       if (rc != 0)
+
+       if (rc != 0) {
+               struct ldlm_reply       *ldlm_rep;
+
+               ldlm_rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP);
+               mdt_clear_disposition(info, ldlm_rep, DISP_OPEN_LOCK);
                mdt_object_unlock(info, obj, lhc, 1);
+       }
 }
 
 int mdt_open_by_fid_lock(struct mdt_thread_info *info, struct ldlm_reply *rep,
@@ -1284,8 +1305,10 @@ int mdt_open_by_fid_lock(struct mdt_thread_info *info, struct ldlm_reply *rep,
                       PFID(rr->rr_fid2));
                GOTO(out, rc = -EREMOTE);
        } else if (!mdt_object_exists(o)) {
-               mdt_set_disposition(info, rep, (DISP_LOOKUP_EXECD |
-                                   DISP_LOOKUP_NEG));
+               mdt_set_disposition(info, rep,
+                                   DISP_IT_EXECD |
+                                   DISP_LOOKUP_EXECD |
+                                   DISP_LOOKUP_NEG);
                GOTO(out, rc = -ENOENT);
        }
 
@@ -1333,9 +1356,10 @@ int mdt_pin(struct mdt_thread_info* info)
 }
 
 /* Cross-ref request. Currently it can only be a pure open (w/o create) */
-int mdt_cross_open(struct mdt_thread_info* info,
-                   const struct lu_fid *fid,
-                   struct ldlm_reply *rep, __u32 flags)
+static int mdt_cross_open(struct mdt_thread_info *info,
+                         const struct lu_fid *parent_fid,
+                         const struct lu_fid *fid,
+                         struct ldlm_reply *rep, __u32 flags)
 {
         struct md_attr    *ma = &info->mti_attr;
         struct mdt_object *o;
@@ -1365,9 +1389,17 @@ int mdt_cross_open(struct mdt_thread_info* info,
 
                        mdt_set_capainfo(info, 0, fid, BYPASS_CAPA);
                        rc = mdt_attr_get_complex(info, o, ma);
-                       if (rc == 0)
-                               rc = mdt_finish_open(info, NULL, o, flags, 0,
-                                                    rep);
+                       if (rc != 0)
+                               GOTO(out, rc);
+
+                       /* Do not create lov object if the fid is opened
+                        * under OBF */
+                       if (S_ISREG(ma->ma_attr.la_mode) &&
+                           !(ma->ma_valid & MA_LOV) && (flags & FMODE_WRITE) &&
+                           fid_is_obf(parent_fid))
+                               GOTO(out, rc = -EPERM);
+
+                       rc = mdt_finish_open(info, NULL, o, flags, 0, rep);
                } else {
                        /*
                         * Something is wrong here. lookup was positive but
@@ -1441,8 +1473,8 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
                mdt_set_disposition(info, ldlm_rep,
                            (DISP_IT_EXECD | DISP_LOOKUP_EXECD |
                             DISP_LOOKUP_POS));
-               result = mdt_cross_open(info, rr->rr_fid1, ldlm_rep,
-                                       create_flags);
+               result = mdt_cross_open(info, rr->rr_fid2, rr->rr_fid1,
+                                       ldlm_rep, create_flags);
                GOTO(out, result);
        } else if (req_is_replay(req) ||
            (req->rq_export->exp_libclient && create_flags & MDS_OPEN_HAS_EA)) {
@@ -1466,11 +1498,21 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
        } else if ((rr->rr_namelen == 0 && create_flags & MDS_OPEN_LOCK) ||
                   (create_flags & MDS_OPEN_BY_FID)) {
                result = mdt_open_by_fid_lock(info, ldlm_rep, lhc);
-               if ((result != -ENOENT && !(create_flags & MDS_OPEN_CREAT)) &&
-                    result != -EREMOTE)
+               /* If result is 0 then open by FID has found the file
+                * and there is nothing left for us to do here.  More
+                * generally if it is anything other than -ENOENT or
+                * -EREMOTE then we return that now.  If -ENOENT and
+                * MDS_OPEN_CREAT is set then we must create the file
+                * below.  If -EREMOTE then we need to return a LOOKUP
+                * lock to the client, which we do below.  Hence this
+                * odd looking condition.  See LU-2523. */
+               if (!(result == -ENOENT && (create_flags & MDS_OPEN_CREAT)) &&
+                   result != -EREMOTE)
                        GOTO(out, result);
+
                if (unlikely(rr->rr_namelen == 0))
                        GOTO(out, result = -EINVAL);
+
                CDEBUG(D_INFO, "No object(2), continue as regular open.\n");
        }
 
@@ -1543,8 +1585,9 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
 
         mdt_set_capainfo(info, 1, child_fid, BYPASS_CAPA);
         if (result == -ENOENT) {
-                if (mdt_object_obf(parent))
-                        GOTO(out_child, result = -EPERM);
+               /* Create under OBF and .lustre is not permitted */
+               if (fid_is_obf(rr->rr_fid1) || fid_is_dot_lustre(rr->rr_fid1))
+                       GOTO(out_child, result = -EPERM);
 
                 /* save versions in reply */
                 mdt_version_get_save(info, parent, 0);
@@ -1646,35 +1689,35 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
        /* get openlock if this is not replay and if a client requested it */
        if (!req_is_replay(req)) {
                rc = mdt_object_open_lock(info, child, lhc, &ibits);
-               if (rc != 0) {
+               if (rc != 0)
                        GOTO(out_child, result = rc);
-               } else if (create_flags & MDS_OPEN_LOCK) {
-                       result = -EREMOTE;
+               else if (create_flags & MDS_OPEN_LOCK)
                        mdt_set_disposition(info, ldlm_rep, DISP_OPEN_LOCK);
-               }
        }
 
-        /* Try to open it now. */
-        rc = mdt_finish_open(info, parent, child, create_flags,
-                             created, ldlm_rep);
-        if (rc) {
-                result = rc;
+       /* Try to open it now. */
+       rc = mdt_finish_open(info, parent, child, create_flags,
+                            created, ldlm_rep);
+       if (rc) {
+               result = rc;
                /* openlock will be released if mdt_finish_open failed */
                mdt_clear_disposition(info, ldlm_rep, DISP_OPEN_LOCK);
-                if (created) {
-                        ma->ma_need = 0;
-                        ma->ma_valid = 0;
-                        ma->ma_cookie_size = 0;
-                        rc = mdo_unlink(info->mti_env,
-                                        mdt_object_child(parent),
-                                        mdt_object_child(child),
-                                        lname,
-                                        &info->mti_attr);
-                        if (rc != 0)
-                                CERROR("Error in cleanup of open\n");
+               if (created) {
+                       ma->ma_need = 0;
+                       ma->ma_valid = 0;
+                       ma->ma_cookie_size = 0;
+                       rc = mdo_unlink(info->mti_env,
+                                       mdt_object_child(parent),
+                                       mdt_object_child(child),
+                                       lname,
+                                       &info->mti_attr, 0);
+                       if (rc != 0)
+                               CERROR("%s: "DFID" cleanup of open: rc = %d\n",
+                                      mdt_obd_name(info->mti_mdt),
+                                      PFID(mdt_object_fid(child)), rc);
                        mdt_clear_disposition(info, ldlm_rep, DISP_OPEN_CREATE);
-                }
-        }
+               }
+       }
         EXIT;
 out_child:
        mdt_object_open_unlock(info, child, lhc, ibits, result);
@@ -1682,9 +1725,9 @@ out_child:
 out_parent:
         mdt_object_unlock_put(info, parent, lh, result || !created);
 out:
-        if (result && result != -EREMOTE)
-                lustre_msg_set_transno(req->rq_repmsg, 0);
-        return result;
+       if (result)
+               lustre_msg_set_transno(req->rq_repmsg, 0);
+       return result;
 }
 
 #define MFD_CLOSED(mode) (((mode) & ~(MDS_FMODE_EPOCH | MDS_FMODE_SOM | \