Whamcloud - gitweb
LU-3866 hsm: permission checks on HSM operations
[fs/lustre-release.git] / lustre / mdt / mdt_open.c
index 28eb0ca..76ed060 100644 (file)
@@ -77,7 +77,8 @@ struct mdt_file_data *mdt_mfd_new(const struct mdt_export_data *med)
 /*
  * Find the mfd pointed to by handle in global hash table.
  * In case of replay the handle is obsoleted
- * but mfd can be found in mfd list by that handle
+ * but mfd can be found in mfd list by that handle.
+ * Callers need to be holding med_open_lock.
  */
 struct mdt_file_data *mdt_handle2mfd(struct mdt_export_data *med,
                                     const struct lustre_handle *handle,
@@ -457,7 +458,7 @@ static int mdt_ioepoch_close(struct mdt_thread_info *info, struct mdt_object *o)
                 RETURN(mdt_ioepoch_close_on_eviction(info, o));
         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
                 RETURN(mdt_ioepoch_close_on_replay(info, o));
-        if (info->mti_ioepoch->flags & MF_EPOCH_CLOSE)
+       if (info->mti_ioepoch && (info->mti_ioepoch->flags & MF_EPOCH_CLOSE))
                 RETURN(mdt_ioepoch_close_reg(info, o));
         /* IO epoch is not closed. */
         RETURN(MDT_IOEPOCH_OPENED);
@@ -503,7 +504,8 @@ int mdt_som_au_close(struct mdt_thread_info *info, struct mdt_object *o)
                 ioepoch =  info->mti_ioepoch ?
                         info->mti_ioepoch->ioepoch : o->mot_ioepoch;
 
-                if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY))
+               if (req != NULL
+                   && !(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY))
                         rc = mdt_som_attr_set(info, o, ioepoch, act);
                 mdt_object_som_enable(o, ioepoch);
         }
@@ -761,13 +763,13 @@ static int mdt_mfd_open(struct mdt_thread_info *info, struct mdt_object *p,
                 * restart replay, so there maybe some orphan
                 * mfd here, we should remove them */
                LASSERT(info->mti_rr.rr_handle != NULL);
+               spin_lock(&med->med_open_lock);
                old_mfd = mdt_handle2mfd(med, info->mti_rr.rr_handle, true);
                if (old_mfd != NULL) {
                        CDEBUG(D_HA, "delete orphan mfd = %p, fid = "DFID", "
                               "cookie = "LPX64"\n", mfd,
                               PFID(mdt_object_fid(mfd->mfd_object)),
                               info->mti_rr.rr_handle->cookie);
-                       spin_lock(&med->med_open_lock);
                        class_handle_unhash(&old_mfd->mfd_handle);
                        cfs_list_del_init(&old_mfd->mfd_list);
                        spin_unlock(&med->med_open_lock);
@@ -778,6 +780,12 @@ static int mdt_mfd_open(struct mdt_thread_info *info, struct mdt_object *p,
                        mdt_mfd_close(info, old_mfd);
                        ma->ma_attr_flags &= ~MDS_RECOV_OPEN;
                        ma->ma_valid &= ~MA_FLAGS;
+               } else {
+                       spin_unlock(&med->med_open_lock);
+                       CDEBUG(D_HA, "orphan mfd not found, fid = "DFID", "
+                              "cookie = "LPX64"\n",
+                              PFID(mdt_object_fid(mfd->mfd_object)),
+                              info->mti_rr.rr_handle->cookie);
                }
 
                CDEBUG(D_HA, "Store old cookie "LPX64" in new mfd\n",
@@ -1003,7 +1011,7 @@ void mdt_reconstruct_open(struct mdt_thread_info *info,
         struct lsd_client_data  *lcd  = ted->ted_lcd;
         struct md_attr          *ma   = &info->mti_attr;
         struct mdt_reint_record *rr   = &info->mti_rr;
-        __u32                   flags = info->mti_spec.sp_cr_flags;
+       __u64                   flags = info->mti_spec.sp_cr_flags;
         struct ldlm_reply       *ldlm_rep;
         struct mdt_object       *parent;
         struct mdt_object       *child;
@@ -1109,10 +1117,9 @@ out:
         LASSERT(ergo(rc < 0, lustre_msg_get_transno(req->rq_repmsg) == 0));
 }
 
-int mdt_open_by_fid(struct mdt_thread_info* info,
-                    struct ldlm_reply *rep)
+int mdt_open_by_fid(struct mdt_thread_info *info, struct ldlm_reply *rep)
 {
-        __u32                    flags = info->mti_spec.sp_cr_flags;
+       __u64                    flags = info->mti_spec.sp_cr_flags;
         struct mdt_reint_record *rr = &info->mti_rr;
         struct md_attr          *ma = &info->mti_attr;
         struct mdt_object       *o;
@@ -1378,7 +1385,7 @@ static void mdt_object_open_unlock(struct mdt_thread_info *info,
 /**
  * Check release is permitted for the current HSM flags.
  */
-static bool mdt_hsm_release_allow(struct md_attr *ma)
+static bool mdt_hsm_release_allow(const struct md_attr *ma)
 {
        if (!(ma->ma_valid & MA_HSM))
                return false;
@@ -1563,7 +1570,7 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
         struct lu_fid           *child_fid = &info->mti_tmp_fid1;
         struct md_attr          *ma = &info->mti_attr;
         __u64                    create_flags = info->mti_spec.sp_cr_flags;
-       __u64                    ibits;
+       __u64                    ibits = 0;
         struct mdt_reint_record *rr = &info->mti_rr;
         struct lu_name          *lname;
         int                      result, rc;
@@ -1696,6 +1703,8 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
                 }
                 if (!(create_flags & MDS_OPEN_CREAT))
                         GOTO(out_parent, result);
+               if (exp_connect_flags(req->rq_export) & OBD_CONNECT_RDONLY)
+                       GOTO(out_parent, result = -EROFS);
                 *child_fid = *info->mti_rr.rr_fid2;
                 LASSERTF(fid_is_sane(child_fid), "fid="DFID"\n",
                          PFID(child_fid));
@@ -1886,19 +1895,21 @@ static struct mdt_object *mdt_orphan_open(struct mdt_thread_info *info,
 {
        const struct lu_env *env = info->mti_env;
        struct md_op_spec *spec = &info->mti_spec;
-       struct lu_fid *rootfid = &info->mti_tmp_fid1;
+       struct lu_fid *local_root_fid = &info->mti_tmp_fid1;
        struct mdt_object *obj = NULL;
        struct mdt_object *local_root;
        static const char name[] = "i_am_nobody";
        struct lu_name *lname;
+       struct lu_ucred *uc;
+       cfs_cap_t uc_cap_save;
        int rc;
        ENTRY;
 
-       rc = dt_root_get(env, mdt->mdt_bottom, rootfid);
+       rc = dt_root_get(env, mdt->mdt_bottom, local_root_fid);
        if (rc != 0)
                RETURN(ERR_PTR(rc));
 
-       local_root = mdt_object_find(env, mdt, rootfid);
+       local_root = mdt_object_find(env, mdt, local_root_fid);
        if (IS_ERR(local_root))
                RETURN(local_root);
 
@@ -1919,17 +1930,26 @@ static struct mdt_object *mdt_orphan_open(struct mdt_thread_info *info,
        }
 
        lname = mdt_name(env, (char *)name, sizeof(name) - 1);
+
+       uc = lu_ucred(env);
+       uc_cap_save = uc->uc_cap;
+       uc->uc_cap |= 1 << CFS_CAP_DAC_OVERRIDE;
        rc = mdo_create(env, mdt_object_child(local_root), lname,
                        mdt_object_child(obj), spec, attr);
-       if (rc == 0) {
-               rc = mo_open(env, mdt_object_child(obj), MDS_OPEN_CREATED);
-               if (rc < 0)
-                       CERROR("%s: cannot open volatile file "DFID", orphan "
-                              "file will be left in PENDING directory until "
-                              "next reboot, rc = %d\n", mdt_obd_name(mdt),
-                              PFID(fid), rc);
+       uc->uc_cap = uc_cap_save;
+       if (rc < 0) {
+               CERROR("%s: cannot create volatile file "DFID": rc = %d\n",
+                      mdt_obd_name(mdt), PFID(fid), rc);
+               GOTO(out, rc);
        }
-       EXIT;
+
+       rc = mo_open(env, mdt_object_child(obj), MDS_OPEN_CREATED);
+       if (rc < 0)
+               CERROR("%s: cannot open volatile file "DFID", orphan "
+                      "file will be left in PENDING directory until "
+                      "next reboot, rc = %d\n", mdt_obd_name(mdt),
+                      PFID(fid), rc);
+       GOTO(out, rc);
 
 out:
        if (rc < 0) {
@@ -1955,6 +1975,9 @@ static int mdt_hsm_release(struct mdt_thread_info *info, struct mdt_object *o,
        int                     rc2;
        ENTRY;
 
+       if (exp_connect_flags(info->mti_exp) & OBD_CONNECT_RDONLY)
+               RETURN(-EROFS);
+
        data = req_capsule_client_get(info->mti_pill, &RMF_CLOSE_DATA);
        if (data == NULL)
                RETURN(-EPROTO);
@@ -1966,8 +1989,7 @@ static int mdt_hsm_release(struct mdt_thread_info *info, struct mdt_object *o,
        /* try to hold open_sem so that nobody else can open the file */
        if (!down_write_trylock(&o->mot_open_sem)) {
                ldlm_lock_cancel(lease);
-               LDLM_LOCK_PUT(lease);
-               RETURN(-EBUSY);
+               GOTO(out_reprocess, rc = -EBUSY);
        }
 
        /* Check if the lease open lease has already canceled */
@@ -1982,7 +2004,6 @@ static int mdt_hsm_release(struct mdt_thread_info *info, struct mdt_object *o,
         * have been cancelled. It's okay to cancel it now as we've
         * held mot_open_sem. */
        ldlm_lock_cancel(lease);
-       LDLM_LOCK_PUT(lease);
 
        if (lease_broken) /* don't perform release task */
                GOTO(out_unlock, rc = -ESTALE);
@@ -1992,7 +2013,7 @@ static int mdt_hsm_release(struct mdt_thread_info *info, struct mdt_object *o,
 
        /* ma_need was set before but it seems fine to change it in order to
         * avoid modifying the one from RPC */
-       ma->ma_need = MA_HSM | MA_LOV;
+       ma->ma_need = MA_HSM;
        rc = mdt_attr_get_complex(info, o, ma);
        if (rc != 0)
                GOTO(out_unlock, rc);
@@ -2009,24 +2030,27 @@ static int mdt_hsm_release(struct mdt_thread_info *info, struct mdt_object *o,
                CDEBUG(D_HSM, DFID" data_version mismatches: packed="LPU64
                       " and on-disk="LPU64"\n", PFID(mdt_object_fid(o)),
                       data->cd_data_version, ma->ma_hsm.mh_arch_ver);
-               /* XXX: Enable this line when hsm_archive is operational!
                GOTO(out_unlock, rc = -EPERM);
-               */
        }
 
        ma->ma_valid = MA_INODE;
-       ma->ma_attr.la_valid &= LA_SIZE | LA_MTIME | LA_ATIME;
+       ma->ma_attr.la_valid &= LA_ATIME | LA_MTIME | LA_CTIME | LA_SIZE;
        rc = mo_attr_set(info->mti_env, mdt_object_child(o), ma);
        if (rc < 0)
                GOTO(out_unlock, rc);
 
+       ma->ma_need = MA_INODE | MA_LOV;
+       rc = mdt_attr_get_complex(info, o, ma);
+       if (rc < 0)
+               GOTO(out_unlock, rc);
+
        if (!(ma->ma_valid & MA_LOV)) {
                /* Even empty file are released */
                memset(ma->ma_lmm, 0, sizeof(*ma->ma_lmm));
                ma->ma_lmm->lmm_magic = cpu_to_le32(LOV_MAGIC_V1_DEF);
                ma->ma_lmm->lmm_pattern = cpu_to_le32(LOV_PATTERN_RAID0);
                ma->ma_lmm->lmm_stripe_size = cpu_to_le32(LOV_MIN_STRIPE_SIZE);
-               ma->ma_valid |= MA_LOV;
+               ma->ma_lmm_size = sizeof(*ma->ma_lmm);
        } else {
                /* Magic must be LOV_MAGIC_Vx_DEF otherwise LOD will interpret
                 * ma_lmm as lov_user_md, then it will be confused by union of
@@ -2044,11 +2068,13 @@ static int mdt_hsm_release(struct mdt_thread_info *info, struct mdt_object *o,
 
        /* Hopefully it's not used in this call path */
        orp_ma = &info->mti_u.som.attr;
-       orp_ma->ma_valid = MA_INODE | MA_LOV;
-       orp_ma->ma_attr.la_mode = S_IFREG;
-       orp_ma->ma_attr.la_valid = LA_MODE;
+       orp_ma->ma_attr.la_mode = S_IFREG | S_IWUSR;
+       orp_ma->ma_attr.la_uid = ma->ma_attr.la_uid;
+       orp_ma->ma_attr.la_gid = ma->ma_attr.la_gid;
+       orp_ma->ma_attr.la_valid = LA_MODE | LA_UID | LA_GID;
        orp_ma->ma_lmm = ma->ma_lmm;
        orp_ma->ma_lmm_size = ma->ma_lmm_size;
+       orp_ma->ma_valid = MA_INODE | MA_LOV;
        orphan = mdt_orphan_open(info, info->mti_mdt, &data->cd_fid, orp_ma,
                                 FMODE_WRITE);
        if (IS_ERR(orphan)) {
@@ -2066,22 +2092,25 @@ static int mdt_hsm_release(struct mdt_thread_info *info, struct mdt_object *o,
        ma->ma_hsm.mh_flags |= HS_RELEASED;
        lustre_hsm2buf(buf->lb_buf, &ma->ma_hsm);
        ma->ma_hsm.mh_flags &= ~HS_RELEASED;
+
+       mdt_lock_reg_init(lh, LCK_EX);
+       rc = mdt_object_lock(info, o, lh, MDS_INODELOCK_LAYOUT |
+                            MDS_INODELOCK_XATTR, MDT_LOCAL_LOCK);
+       if (rc != 0)
+               GOTO(out_close, rc);
+
        rc = mo_xattr_set(info->mti_env, mdt_object_child(orphan), buf,
                          XATTR_NAME_HSM, 0);
-       if (rc < 0)
-               GOTO(out_close, rc);
 
-       mdt_lock_reg_init(lh, LCK_EX);
-       rc = mdt_object_lock(info, o, lh, MDS_INODELOCK_LAYOUT, MDT_LOCAL_LOCK);
-       if (rc == 0) {
+       if (rc == 0)
                /* Swap layout with orphan object */
                rc = mo_swap_layouts(info->mti_env, mdt_object_child(o),
                                     mdt_object_child(orphan),
                                     SWAP_LAYOUTS_MDS_HSM);
 
-               /* Release exclusive LL */
-               mdt_object_unlock(info, o, lh, 1);
-       }
+       /* Release exclusive LL */
+       mdt_object_unlock(info, o, lh, 1);
+
        EXIT;
 
 out_close:
@@ -2105,8 +2134,13 @@ out_unlock:
                repbody->valid |= OBD_MD_FLRELEASED;
        }
 
+out_reprocess:
+       ldlm_reprocess_all(lease->l_resource);
+       LDLM_LOCK_PUT(lease);
+
        ma->ma_valid = 0;
        ma->ma_need = 0;
+
        return rc;
 }