Whamcloud - gitweb
LU-3677 mdt: Set HSM dirty open-for-write file when evicted.
[fs/lustre-release.git] / lustre / mdt / mdt_open.c
index f408eba..a967c44 100644 (file)
@@ -81,7 +81,7 @@ struct mdt_file_data *mdt_mfd_new(const struct mdt_export_data *med)
  */
 struct mdt_file_data *mdt_handle2mfd(struct mdt_export_data *med,
                                     const struct lustre_handle *handle,
-                                    bool is_replay)
+                                    bool is_replay_or_resent)
 {
        struct mdt_file_data   *mfd;
        ENTRY;
@@ -89,7 +89,7 @@ struct mdt_file_data *mdt_handle2mfd(struct mdt_export_data *med,
        LASSERT(handle != NULL);
        mfd = class_handle2object(handle->cookie, med);
        /* during dw/setattr replay the mfd can be found by old handle */
-       if (mfd == NULL && is_replay) {
+       if (mfd == NULL && is_replay_or_resent) {
                cfs_list_for_each_entry(mfd, &med->med_open_head, mfd_list) {
                        if (mfd->mfd_old_handle.cookie == handle->cookie)
                                RETURN(mfd);
@@ -651,14 +651,14 @@ static void mdt_empty_transno(struct mdt_thread_info *info, int rc)
         EXIT;
 }
 
-void mdt_mfd_set_mode(struct mdt_file_data *mfd, int mode)
+void mdt_mfd_set_mode(struct mdt_file_data *mfd, __u64 mode)
 {
-        LASSERT(mfd != NULL);
+       LASSERT(mfd != NULL);
 
-        CDEBUG(D_HA, "Change mfd %p mode 0x%x->0x%x\n",
-               mfd, (unsigned int)mfd->mfd_mode, (unsigned int)mode);
+       CDEBUG(D_HA, DFID " Change mfd mode "LPO64" -> "LPO64".\n",
+              PFID(mdt_object_fid(mfd->mfd_object)), mfd->mfd_mode, mode);
 
-        mfd->mfd_mode = mode;
+       mfd->mfd_mode = mode;
 }
 
 static int mdt_mfd_open(struct mdt_thread_info *info, struct mdt_object *p,
@@ -677,7 +677,7 @@ static int mdt_mfd_open(struct mdt_thread_info *info, struct mdt_object *p,
 
         isreg = S_ISREG(la->la_mode);
         isdir = S_ISDIR(la->la_mode);
-        if (isreg && !(ma->ma_valid & MA_LOV)) {
+       if (isreg && !(ma->ma_valid & MA_LOV) && !(flags & MDS_OPEN_RELEASE)) {
                 /*
                  * No EA, check whether it is will set regEA and dirEA since in
                  * above attr get, these size might be zero, so reset it, to
@@ -736,6 +736,8 @@ static int mdt_mfd_open(struct mdt_thread_info *info, struct mdt_object *p,
         * released by mdt_mfd_close().
         */
        mdt_object_get(info->mti_env, o);
+       mfd->mfd_object = o;
+       mfd->mfd_xid = req->rq_xid;
 
        /*
         * @flags is always not zero. At least it should be FMODE_READ,
@@ -746,8 +748,9 @@ static int mdt_mfd_open(struct mdt_thread_info *info, struct mdt_object *p,
        /* Open handling. */
        mdt_mfd_set_mode(mfd, flags);
 
-       mfd->mfd_object = o;
-       mfd->mfd_xid = req->rq_xid;
+       atomic_inc(&o->mot_open_count);
+       if (flags & MDS_OPEN_LEASE)
+               atomic_inc(&o->mot_lease_count);
 
        /* replay handle */
        if (req_is_replay(req)) {
@@ -1159,26 +1162,17 @@ static int mdt_object_open_lock(struct mdt_thread_info *info,
        struct md_attr  *ma = &info->mti_attr;
        __u64            open_flags = info->mti_spec.sp_cr_flags;
        ldlm_mode_t      lm = LCK_CR;
+       bool             acq_lease = !!(open_flags & MDS_OPEN_LEASE);
        bool             try_layout = false;
        bool             create_layout = false;
        int              rc = 0;
        ENTRY;
 
        *ibits = 0;
-       if (open_flags & MDS_OPEN_LOCK) {
-               if (open_flags & FMODE_WRITE)
-                       lm = LCK_CW;
-               /* if file is released, we can't deny write because we must
-                * restore (write) it to access it. */
-               else if ((open_flags & MDS_FMODE_EXEC) &&
-                        !((ma->ma_valid & MA_HSM) &&
-                          (ma->ma_hsm.mh_flags & HS_RELEASED)))
-                       lm = LCK_PR;
-               else
-                       lm = LCK_CR;
+       mdt_lock_handle_init(lhc);
 
-               *ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_OPEN;
-       }
+       if (req_is_replay(mdt_info_req(info)))
+               RETURN(0);
 
        if (S_ISREG(lu_object_attr(&obj->mot_obj))) {
                if (ma->ma_need & MA_LOV && !(ma->ma_valid & MA_LOV) &&
@@ -1189,7 +1183,65 @@ static int mdt_object_open_lock(struct mdt_thread_info *info,
                        try_layout = true;
        }
 
-       mdt_lock_handle_init(lhc);
+       if (acq_lease) {
+               /* lease open, acquire write mode of open sem */
+               down_write(&obj->mot_open_sem);
+
+               /* Lease exists and ask for new lease */
+               if (atomic_read(&obj->mot_lease_count) > 0) {
+                       /* only exclusive open is supported, so lease
+                        * are conflicted to each other */
+                       GOTO(out, rc = -EBUSY);
+               }
+
+               /* Lease must be with open lock */
+               if (!(open_flags & MDS_OPEN_LOCK)) {
+                       CERROR("Request lease for file:"DFID ", but open lock "
+                               "is missed, open_flags = "LPO64".\n",
+                               PFID(mdt_object_fid(obj)), open_flags);
+                       GOTO(out, rc = -EPROTO);
+               }
+
+               /* XXX: only exclusive open is supported. */
+               lm = LCK_EX;
+               *ibits = MDS_INODELOCK_OPEN;
+
+               /* never grant LCK_EX layout lock to client */
+               try_layout = false;
+       } else { /* normal open */
+               /* normal open holds read mode of open sem */
+               down_read(&obj->mot_open_sem);
+
+               if (open_flags & MDS_OPEN_LOCK) {
+                       if (open_flags & FMODE_WRITE)
+                               lm = LCK_CW;
+                       /* if file is released, we can't deny write because we must
+                        * restore (write) it to access it. */
+                       else if ((open_flags & MDS_FMODE_EXEC) &&
+                                !((ma->ma_valid & MA_HSM) &&
+                                  (ma->ma_hsm.mh_flags & HS_RELEASED)))
+                               lm = LCK_PR;
+                       else
+                               lm = LCK_CR;
+
+                       *ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_OPEN;
+               } else if (atomic_read(&obj->mot_lease_count) > 0) {
+                       if (open_flags & FMODE_WRITE)
+                               lm = LCK_CW;
+                       else
+                               lm = LCK_CR;
+
+                       /* revoke lease */
+                       *ibits = MDS_INODELOCK_OPEN;
+                       try_layout = false;
+
+                       lhc = &info->mti_lh[MDT_LH_LOCAL];
+               }
+               CDEBUG(D_INODE, "normal open:"DFID" lease count: %d, lm: %d\n",
+                       PFID(mdt_object_fid(obj)),
+                       atomic_read(&obj->mot_open_count), lm);
+       }
+
        mdt_lock_reg_init(lhc, lm);
 
        /* one problem to return layout lock on open is that it may result
@@ -1233,6 +1285,44 @@ static int mdt_object_open_lock(struct mdt_thread_info *info,
                OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LL_BLOCK, 2);
        }
 
+       /* Check if there is any other open handles after acquiring
+        * open lock. At this point, caching open handles have been revoked
+        * by open lock.
+        * XXX: Now only exclusive open is supported. Need to check the
+        * type of open for generic lease support. */
+       if (rc == 0 && acq_lease) {
+               struct ptlrpc_request *req = mdt_info_req(info);
+               struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
+               struct mdt_file_data *mfd;
+               bool is_replay_or_resent;
+               int open_count = 0;
+
+               /* For lease: application can open a file and then apply lease,
+                * @handle contains original open handle in that case.
+                * In recovery, open REQ will be replayed and the lease REQ may
+                * be resent that means the open handle is already stale, so we
+                * need to fix it up here by finding new handle. */
+               is_replay_or_resent = req_is_replay(req) ||
+                       lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT;
+
+               /* if the request is _not_ a replay request, rr_handle
+                * may be used to hold an openhandle which is issuing the
+                * lease request, so that this openhandle doesn't count. */
+               mfd = mdt_handle2mfd(med, info->mti_rr.rr_handle,
+                                    is_replay_or_resent);
+               if (mfd != NULL)
+                       ++open_count;
+
+               CDEBUG(D_INODE, "acq_lease "DFID": openers: %d, want: %d\n",
+                       PFID(mdt_object_fid(obj)),
+                       atomic_read(&obj->mot_open_count), open_count);
+
+               if (atomic_read(&obj->mot_open_count) > open_count)
+                       GOTO(out, rc = -EBUSY);
+       }
+       GOTO(out, rc);
+
+out:
        RETURN(rc);
 }
 
@@ -1242,18 +1332,32 @@ static void mdt_object_open_unlock(struct mdt_thread_info *info,
                                   __u64 ibits, int rc)
 {
        __u64 open_flags = info->mti_spec.sp_cr_flags;
-       struct mdt_lock_handle *ll = &info->mti_lh[MDT_LH_LAYOUT];
+       struct mdt_lock_handle *ll = &info->mti_lh[MDT_LH_LOCAL];
+       ENTRY;
 
-       /* Release local layout lock - the layout lock put in MDT_LH_LAYOUT
-        * will never return to client side. */
+       if (req_is_replay(mdt_info_req(info)))
+               RETURN_EXIT;
+
+       /* Release local lock - the lock put in MDT_LH_LOCAL will never
+        * return to client side. */
+       if (lustre_handle_is_used(&ll->mlh_reg_lh))
+               mdt_object_unlock(info, obj, ll, 1);
+
+       ll = &info->mti_lh[MDT_LH_LAYOUT];
+       /* Release local layout lock, layout was created */
        if (lustre_handle_is_used(&ll->mlh_reg_lh)) {
                LASSERT(!(ibits & MDS_INODELOCK_LAYOUT));
                mdt_object_unlock(info, obj, ll, 1);
        }
 
+       if (open_flags & MDS_OPEN_LEASE)
+               up_write(&obj->mot_open_sem);
+       else
+               up_read(&obj->mot_open_sem);
+
        /* Cross-ref case, the lock should be returned to the client */
        if (ibits == 0 || rc == -EREMOTE)
-               return;
+               RETURN_EXIT;
 
        if (!(open_flags & MDS_OPEN_LOCK) && !(ibits & MDS_INODELOCK_LAYOUT)) {
                /* for the open request, the lock will only return to client
@@ -1268,6 +1372,24 @@ static void mdt_object_open_unlock(struct mdt_thread_info *info,
                mdt_clear_disposition(info, ldlm_rep, DISP_OPEN_LOCK);
                mdt_object_unlock(info, obj, lhc, 1);
        }
+       RETURN_EXIT;
+}
+
+/**
+ * Check release is permitted for the current HSM flags.
+ */
+static bool mdt_hsm_release_allow(struct md_attr *ma)
+{
+       if (!(ma->ma_valid & MA_HSM))
+               return false;
+
+       if (ma->ma_hsm.mh_flags & (HS_DIRTY|HS_NORELEASE|HS_LOST))
+               return false;
+
+       if (!(ma->ma_hsm.mh_flags & HS_ARCHIVED))
+               return false;
+
+       return true;
 }
 
 int mdt_open_by_fid_lock(struct mdt_thread_info *info, struct ldlm_reply *rep,
@@ -1275,7 +1397,7 @@ int mdt_open_by_fid_lock(struct mdt_thread_info *info, struct ldlm_reply *rep,
 {
         const struct lu_env     *env   = info->mti_env;
         struct mdt_device       *mdt   = info->mti_mdt;
-        __u32                    flags = info->mti_spec.sp_cr_flags;
+        __u64                    flags = info->mti_spec.sp_cr_flags;
         struct mdt_reint_record *rr    = &info->mti_rr;
         struct md_attr          *ma    = &info->mti_attr;
         struct mdt_object       *parent= NULL;
@@ -1318,13 +1440,20 @@ int mdt_open_by_fid_lock(struct mdt_thread_info *info, struct ldlm_reply *rep,
 
        mdt_set_disposition(info, rep, (DISP_IT_EXECD | DISP_LOOKUP_EXECD));
 
+       if (flags & MDS_OPEN_RELEASE)
+               ma->ma_need |= MA_HSM;
        rc = mdt_attr_get_complex(info, o, ma);
-        if (rc)
-                GOTO(out, rc);
+       if (rc)
+               GOTO(out, rc);
+
+       /* If a release request, check file flags are fine and ask for an
+        * exclusive open access. */
+       if (flags & MDS_OPEN_RELEASE && !mdt_hsm_release_allow(ma))
+               GOTO(out, rc = -EPERM);
 
        rc = mdt_object_open_lock(info, o, lhc, &ibits);
         if (rc)
-                GOTO(out, rc);
+                GOTO(out_unlock, rc);
 
         if (ma->ma_valid & MA_PFID) {
                 parent = mdt_object_find(env, mdt, &ma->ma_pfid);
@@ -1342,15 +1471,18 @@ int mdt_open_by_fid_lock(struct mdt_thread_info *info, struct ldlm_reply *rep,
                mdt_set_disposition(info, rep, DISP_LOOKUP_POS);
                if (flags & MDS_OPEN_LOCK)
                        mdt_set_disposition(info, rep, DISP_OPEN_LOCK);
+               if (flags & MDS_OPEN_LEASE)
+                       mdt_set_disposition(info, rep, DISP_OPEN_LEASE);
        }
-        GOTO(out, rc);
+       GOTO(out_unlock, rc);
 
-out:
+out_unlock:
        mdt_object_open_unlock(info, o, lhc, ibits, rc);
-        mdt_object_put(env, o);
-        if (parent != NULL)
-                mdt_object_put(env, parent);
-        return rc;
+out:
+       mdt_object_put(env, o);
+       if (parent != NULL)
+               mdt_object_put(env, parent);
+       return rc;
 }
 
 int mdt_pin(struct mdt_thread_info* info)
@@ -1694,7 +1826,7 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
        if (!req_is_replay(req)) {
                rc = mdt_object_open_lock(info, child, lhc, &ibits);
                if (rc != 0)
-                       GOTO(out_child, result = rc);
+                       GOTO(out_child_unlock, result = rc);
                else if (create_flags & MDS_OPEN_LOCK)
                        mdt_set_disposition(info, ldlm_rep, DISP_OPEN_LOCK);
        }
@@ -1706,6 +1838,15 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
                result = rc;
                /* openlock will be released if mdt_finish_open failed */
                mdt_clear_disposition(info, ldlm_rep, DISP_OPEN_LOCK);
+
+               if (created && create_flags & MDS_OPEN_VOLATILE) {
+                       CERROR("%s: cannot open volatile file "DFID", orphan "
+                              "file will be left in PENDING directory until "
+                              "next reboot, rc = %d\n", mdt_obd_name(mdt),
+                              PFID(mdt_object_fid(child)), rc);
+                       GOTO(out_child_unlock, result);
+               }
+
                if (created) {
                        ma->ma_need = 0;
                        ma->ma_valid = 0;
@@ -1722,18 +1863,253 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
                        mdt_clear_disposition(info, ldlm_rep, DISP_OPEN_CREATE);
                }
        }
-        EXIT;
-out_child:
+       EXIT;
+out_child_unlock:
        mdt_object_open_unlock(info, child, lhc, ibits, result);
-        mdt_object_put(info->mti_env, child);
+out_child:
+       mdt_object_put(info->mti_env, child);
 out_parent:
-        mdt_object_unlock_put(info, parent, lh, result || !created);
+       mdt_object_unlock_put(info, parent, lh, result || !created);
 out:
        if (result)
                lustre_msg_set_transno(req->rq_repmsg, 0);
        return result;
 }
 
+/**
+ * Create an orphan object use local root.
+ */
+static struct mdt_object *mdt_orphan_open(struct mdt_thread_info *info,
+                                         struct mdt_device *mdt,
+                                         const struct lu_fid *fid,
+                                         struct md_attr *attr, fmode_t fmode)
+{
+       const struct lu_env *env = info->mti_env;
+       struct md_op_spec *spec = &info->mti_spec;
+       struct lu_fid *rootfid = &info->mti_tmp_fid1;
+       struct mdt_object *obj = NULL;
+       struct mdt_object *local_root;
+       static const char name[] = "i_am_nobody";
+       struct lu_name *lname;
+       int rc;
+       ENTRY;
+
+       rc = dt_root_get(env, mdt->mdt_bottom, rootfid);
+       if (rc != 0)
+               RETURN(ERR_PTR(rc));
+
+       local_root = mdt_object_find(env, mdt, rootfid);
+       if (IS_ERR(local_root))
+               RETURN(local_root);
+
+       obj = mdt_object_new(env, mdt, fid);
+       if (IS_ERR(obj))
+               GOTO(out, rc = PTR_ERR(obj));
+
+       spec->sp_cr_lookup = 0;
+       spec->sp_feat = &dt_directory_features;
+       spec->sp_cr_mode = MDL_MINMODE; /* no lock */
+       spec->sp_cr_flags = MDS_OPEN_VOLATILE | fmode;
+       if (attr->ma_valid & MA_LOV) {
+               spec->u.sp_ea.eadata = attr->ma_lmm;
+               spec->u.sp_ea.eadatalen = attr->ma_lmm_size;
+               spec->sp_cr_flags |= MDS_OPEN_HAS_EA;
+       } else {
+               spec->sp_cr_flags |= MDS_OPEN_DELAY_CREATE;
+       }
+
+       lname = mdt_name(env, (char *)name, sizeof(name) - 1);
+       rc = mdo_create(env, mdt_object_child(local_root), lname,
+                       mdt_object_child(obj), spec, attr);
+       if (rc == 0) {
+               rc = mo_open(env, mdt_object_child(obj), MDS_OPEN_CREATED);
+               if (rc < 0)
+                       CERROR("%s: cannot open volatile file "DFID", orphan "
+                              "file will be left in PENDING directory until "
+                              "next reboot, rc = %d\n", mdt_obd_name(mdt),
+                              PFID(fid), rc);
+       }
+       EXIT;
+
+out:
+       if (rc < 0) {
+               if (!IS_ERR(obj))
+                       mdt_object_put(env, obj);
+               obj = ERR_PTR(rc);
+       }
+       mdt_object_put(env, local_root);
+       return obj;
+}
+
+static int mdt_hsm_release(struct mdt_thread_info *info, struct mdt_object *o,
+                          struct md_attr *ma)
+{
+       struct mdt_lock_handle *lh = &info->mti_lh[MDT_LH_LAYOUT];
+       struct close_data      *data;
+       struct ldlm_lock       *lease;
+       struct mdt_object      *orphan;
+       struct md_attr         *orp_ma;
+       struct lu_buf          *buf;
+       bool                    lease_broken;
+       int                     rc;
+       int                     rc2;
+       ENTRY;
+
+       data = req_capsule_client_get(info->mti_pill, &RMF_CLOSE_DATA);
+       if (data == NULL)
+               RETURN(-EPROTO);
+
+       lease = ldlm_handle2lock(&data->cd_handle);
+       if (lease == NULL)
+               RETURN(-ESTALE);
+
+       /* try to hold open_sem so that nobody else can open the file */
+       if (!down_write_trylock(&o->mot_open_sem)) {
+               ldlm_lock_cancel(lease);
+               ldlm_lock_put(lease);
+               RETURN(-EBUSY);
+       }
+
+       /* Check if the lease open lease has already canceled */
+       lock_res_and_lock(lease);
+       lease_broken = ldlm_is_cancel(lease);
+       unlock_res_and_lock(lease);
+
+       LDLM_DEBUG(lease, DFID " lease broken? %d\n",
+                  PFID(mdt_object_fid(o)), lease_broken);
+
+       /* Cancel server side lease. Client side counterpart should
+        * have been cancelled. It's okay to cancel it now as we've
+        * held mot_open_sem. */
+       ldlm_lock_cancel(lease);
+       ldlm_lock_put(lease);
+
+       if (lease_broken) /* don't perform release task */
+               GOTO(out_unlock, rc = -ESTALE);
+
+       if (fid_is_zero(&data->cd_fid) || !fid_is_sane(&data->cd_fid))
+               GOTO(out_unlock, rc = -EINVAL);
+
+       /* ma_need was set before but it seems fine to change it in order to
+        * avoid modifying the one from RPC */
+       ma->ma_need = MA_HSM | MA_LOV;
+       rc = mdt_attr_get_complex(info, o, ma);
+       if (rc != 0)
+               GOTO(out_unlock, rc);
+
+       if (!mdt_hsm_release_allow(ma))
+               GOTO(out_unlock, rc = -EPERM);
+
+       /* already released? */
+       if (ma->ma_hsm.mh_flags & HS_RELEASED)
+               GOTO(out_unlock, rc = 0);
+
+       /* Compare on-disk and packed data_version */
+       if (data->cd_data_version != ma->ma_hsm.mh_arch_ver) {
+               CDEBUG(D_HSM, DFID" data_version mismatches: packed="LPU64
+                      " and on-disk="LPU64"\n", PFID(mdt_object_fid(o)),
+                      data->cd_data_version, ma->ma_hsm.mh_arch_ver);
+               /* XXX: Enable this line when hsm_archive is operational!
+               GOTO(out_unlock, rc = -EPERM);
+               */
+       }
+
+       ma->ma_valid = MA_INODE;
+       ma->ma_attr.la_valid &= LA_SIZE | LA_MTIME | LA_ATIME;
+       rc = mo_attr_set(info->mti_env, mdt_object_child(o), ma);
+       if (rc < 0)
+               GOTO(out_unlock, rc);
+
+       if (!(ma->ma_valid & MA_LOV)) {
+               /* Even empty file are released */
+               memset(ma->ma_lmm, 0, sizeof(*ma->ma_lmm));
+               ma->ma_lmm->lmm_magic = cpu_to_le32(LOV_MAGIC_V1_DEF);
+               ma->ma_lmm->lmm_pattern = cpu_to_le32(LOV_PATTERN_RAID0);
+               ma->ma_lmm->lmm_stripe_size = cpu_to_le32(LOV_MIN_STRIPE_SIZE);
+               ma->ma_valid |= MA_LOV;
+       } else {
+               /* Magic must be LOV_MAGIC_Vx_DEF otherwise LOD will interpret
+                * ma_lmm as lov_user_md, then it will be confused by union of
+                * layout_gen and stripe_offset. */
+               if (le32_to_cpu(ma->ma_lmm->lmm_magic) == LOV_MAGIC_V1)
+                       ma->ma_lmm->lmm_magic = cpu_to_le32(LOV_MAGIC_V1_DEF);
+               else if (le32_to_cpu(ma->ma_lmm->lmm_magic) == LOV_MAGIC_V3)
+                       ma->ma_lmm->lmm_magic = cpu_to_le32(LOV_MAGIC_V3_DEF);
+               else
+                       GOTO(out_unlock, rc = -EINVAL);
+       }
+
+       /* Set file as released */
+       ma->ma_lmm->lmm_pattern |= cpu_to_le32(LOV_PATTERN_F_RELEASED);
+
+       /* Hopefully it's not used in this call path */
+       orp_ma = &info->mti_u.som.attr;
+       orp_ma->ma_valid = MA_INODE | MA_LOV;
+       orp_ma->ma_attr.la_mode = S_IFREG;
+       orp_ma->ma_attr.la_valid = LA_MODE;
+       orp_ma->ma_lmm = ma->ma_lmm;
+       orp_ma->ma_lmm_size = ma->ma_lmm_size;
+       orphan = mdt_orphan_open(info, info->mti_mdt, &data->cd_fid, orp_ma,
+                                FMODE_WRITE);
+       if (IS_ERR(orphan)) {
+               CERROR("%s: cannot open orphan file "DFID": rc = %ld\n",
+                      mdt_obd_name(info->mti_mdt), PFID(&data->cd_fid),
+                      PTR_ERR(orphan));
+               GOTO(out_unlock, rc = PTR_ERR(orphan));
+       }
+
+       /* Set up HSM attribute for orphan object */
+       CLASSERT(sizeof(struct hsm_attrs) <= sizeof(info->mti_xattr_buf));
+       buf = &info->mti_buf;
+       buf->lb_buf = info->mti_xattr_buf;
+       buf->lb_len = sizeof(struct hsm_attrs);
+       ma->ma_hsm.mh_flags |= HS_RELEASED;
+       lustre_hsm2buf(buf->lb_buf, &ma->ma_hsm);
+       ma->ma_hsm.mh_flags &= ~HS_RELEASED;
+       rc = mo_xattr_set(info->mti_env, mdt_object_child(orphan), buf,
+                         XATTR_NAME_HSM, 0);
+       if (rc < 0)
+               GOTO(out_close, rc);
+
+       mdt_lock_reg_init(lh, LCK_EX);
+       rc = mdt_object_lock(info, o, lh, MDS_INODELOCK_LAYOUT, MDT_LOCAL_LOCK);
+       if (rc == 0) {
+               /* Swap layout with orphan object */
+               rc = mo_swap_layouts(info->mti_env, mdt_object_child(o),
+                                    mdt_object_child(orphan),
+                                    SWAP_LAYOUTS_MDS_HSM);
+
+               /* Release exclusive LL */
+               mdt_object_unlock(info, o, lh, 1);
+       }
+       EXIT;
+
+out_close:
+       /* Close orphan object anyway */
+       rc2 = mo_close(info->mti_env, mdt_object_child(orphan), orp_ma,
+                      FMODE_WRITE);
+       if (rc2 < 0)
+               CERROR("%s: error closing volatile file "DFID": rc = %d\n",
+                      mdt_obd_name(info->mti_mdt), PFID(&data->cd_fid), rc2);
+       LU_OBJECT_DEBUG(D_HSM, info->mti_env, &orphan->mot_obj,
+                       "object closed\n");
+       mdt_object_put(info->mti_env, orphan);
+
+out_unlock:
+       up_write(&o->mot_open_sem);
+
+       if (rc == 0) { /* already released */
+               struct mdt_body *repbody;
+               repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
+               LASSERT(repbody != NULL);
+               repbody->valid |= OBD_MD_FLRELEASED;
+       }
+
+       ma->ma_valid = 0;
+       ma->ma_need = 0;
+       return rc;
+}
+
 #define MFD_CLOSED(mode) (((mode) & ~(MDS_FMODE_EPOCH | MDS_FMODE_SOM | \
                                       MDS_FMODE_TRUNC)) == MDS_FMODE_CLOSED)
 
@@ -1749,11 +2125,21 @@ int mdt_mfd_close(struct mdt_thread_info *info, struct mdt_file_data *mfd)
         struct md_attr *ma = &info->mti_attr;
         int ret = MDT_IOEPOCH_CLOSED;
         int rc = 0;
-        int mode;
+       __u64 mode;
         ENTRY;
 
         mode = mfd->mfd_mode;
 
+       if (ma->ma_attr_flags & MDS_HSM_RELEASE) {
+               rc = mdt_hsm_release(info, o, ma);
+               if (rc < 0) {
+                       CDEBUG(D_HSM, "%s: File " DFID " release failed: %d\n",
+                               mdt_obd_name(info->mti_mdt),
+                               PFID(mdt_object_fid(o)), rc);
+                       /* continue to close even error occurred. */
+               }
+       }
+
         if ((mode & FMODE_WRITE) || (mode & MDS_FMODE_TRUNC)) {
                 mdt_write_put(o);
                 ret = mdt_ioepoch_close(info, o);
@@ -1774,22 +2160,8 @@ int mdt_mfd_close(struct mdt_thread_info *info, struct mdt_file_data *mfd)
                 rc = mo_attr_set(info->mti_env, next, ma);
         }
 
-       /* If file data is modified, add the dirty flag.
-        *
-        * If MDS_CLOSE_CLEANUP is set, this file is being closed due to an
-        * eviction, file could have been modified and now dirty
-        * regarding to HSM archive, check this!
-        * The logic here is to mark a file dirty if there's a chance it was
-        * dirtied before the client was evicted, so that we don't have to wait
-        * for a release attempt before finding out the file was actually dirty
-        * and fail the release. Aggressively marking it dirty here will cause
-        * the policy engine to attempt to re-archive it; when rearchiving, we
-        * can compare the current version to the LMA data_version and make the
-        * archive request into a noop if it's not actually dirty.
-        */
-       if ((ma->ma_attr_flags & MDS_DATA_MODIFIED) ||
-           ((ma->ma_attr_flags & MDS_CLOSE_CLEANUP) &&
-            (mode & (FMODE_WRITE|MDS_FMODE_TRUNC))))
+       /* If file data is modified, add the dirty flag. */
+       if (ma->ma_attr_flags & MDS_DATA_MODIFIED)
                rc = mdt_add_dirty_flag(info, o, ma);
 
         ma->ma_need |= MA_INODE;
@@ -1822,11 +2194,19 @@ int mdt_mfd_close(struct mdt_thread_info *info, struct mdt_file_data *mfd)
                                "needed on "DFID"\n", PFID(mdt_object_fid(o)));
                 }
         } else {
-                mdt_mfd_free(mfd);
-                mdt_object_put(info->mti_env, o);
-        }
+               /* adjust open and lease count */
+               if (mode & MDS_OPEN_LEASE) {
+                       LASSERT(atomic_read(&o->mot_lease_count) > 0);
+                       atomic_dec(&o->mot_lease_count);
+               }
+               LASSERT(atomic_read(&o->mot_open_count) > 0);
+               atomic_dec(&o->mot_open_count);
 
-        RETURN(rc ? rc : ret);
+               mdt_mfd_free(mfd);
+               mdt_object_put(info->mti_env, o);
+       }
+
+       RETURN(rc ? rc : ret);
 }
 
 int mdt_close(struct mdt_thread_info *info)