Whamcloud - gitweb
LU-3616 mdt: deny write on executed file during restore
[fs/lustre-release.git] / lustre / mdt / mdt_open.c
index 5c42b38..d3c3081 100644 (file)
@@ -77,7 +77,8 @@ struct mdt_file_data *mdt_mfd_new(const struct mdt_export_data *med)
 /*
  * Find the mfd pointed to by handle in global hash table.
  * In case of replay the handle is obsoleted
- * but mfd can be found in mfd list by that handle
+ * but mfd can be found in mfd list by that handle.
+ * Callers need to be holding med_open_lock.
  */
 struct mdt_file_data *mdt_handle2mfd(struct mdt_export_data *med,
                                     const struct lustre_handle *handle,
@@ -457,7 +458,7 @@ static int mdt_ioepoch_close(struct mdt_thread_info *info, struct mdt_object *o)
                 RETURN(mdt_ioepoch_close_on_eviction(info, o));
         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
                 RETURN(mdt_ioepoch_close_on_replay(info, o));
-        if (info->mti_ioepoch->flags & MF_EPOCH_CLOSE)
+       if (info->mti_ioepoch && (info->mti_ioepoch->flags & MF_EPOCH_CLOSE))
                 RETURN(mdt_ioepoch_close_reg(info, o));
         /* IO epoch is not closed. */
         RETURN(MDT_IOEPOCH_OPENED);
@@ -503,7 +504,8 @@ int mdt_som_au_close(struct mdt_thread_info *info, struct mdt_object *o)
                 ioepoch =  info->mti_ioepoch ?
                         info->mti_ioepoch->ioepoch : o->mot_ioepoch;
 
-                if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY))
+               if (req != NULL
+                   && !(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY))
                         rc = mdt_som_attr_set(info, o, ioepoch, act);
                 mdt_object_som_enable(o, ioepoch);
         }
@@ -711,13 +713,7 @@ static int mdt_mfd_open(struct mdt_thread_info *info, struct mdt_object *p,
                         repbody->ioepoch = o->mot_ioepoch;
                 }
         } else if (flags & MDS_FMODE_EXEC) {
-               /* if file is released, we can't deny write because we must
-                * restore (write) it to access it.*/
-               if ((ma->ma_valid & MA_HSM) &&
-                   (ma->ma_hsm.mh_flags & HS_RELEASED))
-                       rc = 0;
-               else
-                       rc = mdt_write_deny(o);
+               rc = mdt_write_deny(o);
         }
         if (rc)
                 RETURN(rc);
@@ -761,13 +757,13 @@ static int mdt_mfd_open(struct mdt_thread_info *info, struct mdt_object *p,
                 * restart replay, so there maybe some orphan
                 * mfd here, we should remove them */
                LASSERT(info->mti_rr.rr_handle != NULL);
+               spin_lock(&med->med_open_lock);
                old_mfd = mdt_handle2mfd(med, info->mti_rr.rr_handle, true);
                if (old_mfd != NULL) {
                        CDEBUG(D_HA, "delete orphan mfd = %p, fid = "DFID", "
                               "cookie = "LPX64"\n", mfd,
                               PFID(mdt_object_fid(mfd->mfd_object)),
                               info->mti_rr.rr_handle->cookie);
-                       spin_lock(&med->med_open_lock);
                        class_handle_unhash(&old_mfd->mfd_handle);
                        cfs_list_del_init(&old_mfd->mfd_list);
                        spin_unlock(&med->med_open_lock);
@@ -778,6 +774,12 @@ static int mdt_mfd_open(struct mdt_thread_info *info, struct mdt_object *p,
                        mdt_mfd_close(info, old_mfd);
                        ma->ma_attr_flags &= ~MDS_RECOV_OPEN;
                        ma->ma_valid &= ~MA_FLAGS;
+               } else {
+                       spin_unlock(&med->med_open_lock);
+                       CDEBUG(D_HA, "orphan mfd not found, fid = "DFID", "
+                              "cookie = "LPX64"\n",
+                              PFID(mdt_object_fid(mfd->mfd_object)),
+                              info->mti_rr.rr_handle->cookie);
                }
 
                CDEBUG(D_HA, "Store old cookie "LPX64" in new mfd\n",
@@ -897,7 +899,7 @@ int mdt_finish_open(struct mdt_thread_info *info,
         }
 #endif
 
-       if (info->mti_mdt->mdt_opts.mo_mds_capa &&
+       if (info->mti_mdt->mdt_lut.lut_mds_capa &&
            exp_connect_flags(exp) & OBD_CONNECT_MDS_CAPA) {
                 struct lustre_capa *capa;
 
@@ -909,8 +911,7 @@ int mdt_finish_open(struct mdt_thread_info *info,
                         RETURN(rc);
                 repbody->valid |= OBD_MD_FLMDSCAPA;
         }
-
-       if (info->mti_mdt->mdt_opts.mo_oss_capa &&
+       if (info->mti_mdt->mdt_lut.lut_oss_capa &&
            exp_connect_flags(exp) & OBD_CONNECT_OSS_CAPA &&
            S_ISREG(lu_object_attr(&o->mot_obj))) {
                 struct lustre_capa *capa;
@@ -1214,11 +1215,7 @@ static int mdt_object_open_lock(struct mdt_thread_info *info,
                if (open_flags & MDS_OPEN_LOCK) {
                        if (open_flags & FMODE_WRITE)
                                lm = LCK_CW;
-                       /* if file is released, we can't deny write because we must
-                        * restore (write) it to access it. */
-                       else if ((open_flags & MDS_FMODE_EXEC) &&
-                                !((ma->ma_valid & MA_HSM) &&
-                                  (ma->ma_hsm.mh_flags & HS_RELEASED)))
+                       else if (open_flags & MDS_FMODE_EXEC)
                                lm = LCK_PR;
                        else
                                lm = LCK_CR;
@@ -1484,12 +1481,6 @@ out:
        return rc;
 }
 
-int mdt_pin(struct mdt_thread_info* info)
-{
-        ENTRY;
-        RETURN(err_serious(-EOPNOTSUPP));
-}
-
 /* Cross-ref request. Currently it can only be a pure open (w/o create) */
 static int mdt_cross_open(struct mdt_thread_info *info,
                          const struct lu_fid *parent_fid,
@@ -1967,6 +1958,9 @@ static int mdt_hsm_release(struct mdt_thread_info *info, struct mdt_object *o,
        int                     rc2;
        ENTRY;
 
+       if (exp_connect_flags(info->mti_exp) & OBD_CONNECT_RDONLY)
+               RETURN(-EROFS);
+
        data = req_capsule_client_get(info->mti_pill, &RMF_CLOSE_DATA);
        if (data == NULL)
                RETURN(-EPROTO);
@@ -1978,8 +1972,7 @@ static int mdt_hsm_release(struct mdt_thread_info *info, struct mdt_object *o,
        /* try to hold open_sem so that nobody else can open the file */
        if (!down_write_trylock(&o->mot_open_sem)) {
                ldlm_lock_cancel(lease);
-               LDLM_LOCK_PUT(lease);
-               RETURN(-EBUSY);
+               GOTO(out_reprocess, rc = -EBUSY);
        }
 
        /* Check if the lease open lease has already canceled */
@@ -1994,7 +1987,6 @@ static int mdt_hsm_release(struct mdt_thread_info *info, struct mdt_object *o,
         * have been cancelled. It's okay to cancel it now as we've
         * held mot_open_sem. */
        ldlm_lock_cancel(lease);
-       LDLM_LOCK_PUT(lease);
 
        if (lease_broken) /* don't perform release task */
                GOTO(out_unlock, rc = -ESTALE);
@@ -2083,22 +2075,25 @@ static int mdt_hsm_release(struct mdt_thread_info *info, struct mdt_object *o,
        ma->ma_hsm.mh_flags |= HS_RELEASED;
        lustre_hsm2buf(buf->lb_buf, &ma->ma_hsm);
        ma->ma_hsm.mh_flags &= ~HS_RELEASED;
+
+       mdt_lock_reg_init(lh, LCK_EX);
+       rc = mdt_object_lock(info, o, lh, MDS_INODELOCK_LAYOUT |
+                            MDS_INODELOCK_XATTR, MDT_LOCAL_LOCK);
+       if (rc != 0)
+               GOTO(out_close, rc);
+
        rc = mo_xattr_set(info->mti_env, mdt_object_child(orphan), buf,
                          XATTR_NAME_HSM, 0);
-       if (rc < 0)
-               GOTO(out_close, rc);
 
-       mdt_lock_reg_init(lh, LCK_EX);
-       rc = mdt_object_lock(info, o, lh, MDS_INODELOCK_LAYOUT, MDT_LOCAL_LOCK);
-       if (rc == 0) {
+       if (rc == 0)
                /* Swap layout with orphan object */
                rc = mo_swap_layouts(info->mti_env, mdt_object_child(o),
                                     mdt_object_child(orphan),
                                     SWAP_LAYOUTS_MDS_HSM);
 
-               /* Release exclusive LL */
-               mdt_object_unlock(info, o, lh, 1);
-       }
+       /* Release exclusive LL */
+       mdt_object_unlock(info, o, lh, 1);
+
        EXIT;
 
 out_close:
@@ -2122,6 +2117,10 @@ out_unlock:
                repbody->valid |= OBD_MD_FLRELEASED;
        }
 
+out_reprocess:
+       ldlm_reprocess_all(lease->l_resource);
+       LDLM_LOCK_PUT(lease);
+
        ma->ma_valid = 0;
        ma->ma_need = 0;
 
@@ -2227,22 +2226,23 @@ int mdt_mfd_close(struct mdt_thread_info *info, struct mdt_file_data *mfd)
        RETURN(rc ? rc : ret);
 }
 
-int mdt_close(struct mdt_thread_info *info)
+int mdt_close(struct tgt_session_info *tsi)
 {
+       struct mdt_thread_info  *info = tsi2mdt_info(tsi);
+       struct ptlrpc_request   *req = tgt_ses_req(tsi);
         struct mdt_export_data *med;
         struct mdt_file_data   *mfd;
         struct mdt_object      *o;
         struct md_attr         *ma = &info->mti_attr;
         struct mdt_body        *repbody = NULL;
-        struct ptlrpc_request  *req = mdt_info_req(info);
         int rc, ret = 0;
         ENTRY;
 
        mdt_counter_incr(req, LPROC_MDT_CLOSE);
-        /* Close may come with the Size-on-MDS update. Unpack it. */
-        rc = mdt_close_unpack(info);
-        if (rc)
-                RETURN(err_serious(rc));
+       /* Close may come with the Size-on-MDS update. Unpack it. */
+       rc = mdt_close_unpack(info);
+       if (rc)
+               GOTO(out, rc = err_serious(rc));
 
         LASSERT(info->mti_ioepoch);
 
@@ -2256,7 +2256,7 @@ int mdt_close(struct mdt_thread_info *info)
                 if (rc == 0)
                         mdt_fix_reply(info);
                mdt_exit_ucred(info);
-                RETURN(lustre_msg_get_status(req->rq_repmsg));
+               GOTO(out, rc = lustre_msg_get_status(req->rq_repmsg));
         }
 
         /* Continue to close handle even if we can not pack reply */
@@ -2311,13 +2311,15 @@ int mdt_close(struct mdt_thread_info *info)
         }
 
        mdt_exit_ucred(info);
-        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_PACK))
-                RETURN(err_serious(-ENOMEM));
+       if (OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_PACK))
+               GOTO(out, rc = err_serious(-ENOMEM));
 
-        if (OBD_FAIL_CHECK_RESET(OBD_FAIL_MDS_CLOSE_NET_REP,
-                                 OBD_FAIL_MDS_CLOSE_NET_REP))
-                info->mti_fail_id = OBD_FAIL_MDS_CLOSE_NET_REP;
-        RETURN(rc ? rc : ret);
+       if (OBD_FAIL_CHECK_RESET(OBD_FAIL_MDS_CLOSE_NET_REP,
+                                OBD_FAIL_MDS_CLOSE_NET_REP))
+               tsi->tsi_reply_fail_id = OBD_FAIL_MDS_CLOSE_NET_REP;
+out:
+       mdt_thread_info_fini(info);
+       RETURN(rc ? rc : ret);
 }
 
 /**
@@ -2328,35 +2330,35 @@ int mdt_close(struct mdt_thread_info *info)
  * and got a trasid. Waiting for such DONE_WRITING is not reliable, so just
  * skip attributes and reconstruct the reply here.
  */
-int mdt_done_writing(struct mdt_thread_info *info)
+int mdt_done_writing(struct tgt_session_info *tsi)
 {
-        struct ptlrpc_request   *req = mdt_info_req(info);
+       struct ptlrpc_request   *req = tgt_ses_req(tsi);
+       struct mdt_thread_info  *info = tsi2mdt_info(tsi);
         struct mdt_body         *repbody = NULL;
         struct mdt_export_data  *med;
         struct mdt_file_data    *mfd;
         int rc;
         ENTRY;
 
-        rc = req_capsule_server_pack(info->mti_pill);
-        if (rc)
-                RETURN(err_serious(rc));
+       rc = req_capsule_server_pack(tsi->tsi_pill);
+       if (rc)
+               GOTO(out, rc = err_serious(rc));
 
-        repbody = req_capsule_server_get(info->mti_pill,
-                                         &RMF_MDT_BODY);
-        repbody->eadatasize = 0;
-        repbody->aclsize = 0;
+       repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_MDT_BODY);
+       repbody->eadatasize = 0;
+       repbody->aclsize = 0;
 
-        /* Done Writing may come with the Size-on-MDS update. Unpack it. */
-        rc = mdt_close_unpack(info);
-        if (rc)
-                RETURN(err_serious(rc));
+       /* Done Writing may come with the Size-on-MDS update. Unpack it. */
+       rc = mdt_close_unpack(info);
+       if (rc)
+               GOTO(out, rc = err_serious(rc));
 
        if (mdt_check_resent(info, mdt_reconstruct_generic, NULL)) {
                mdt_exit_ucred(info);
-               RETURN(lustre_msg_get_status(req->rq_repmsg));
+               GOTO(out, rc = lustre_msg_get_status(req->rq_repmsg));
        }
 
-        med = &info->mti_exp->exp_mdt_data;
+       med = &info->mti_exp->exp_mdt_data;
        spin_lock(&med->med_open_lock);
        mfd = mdt_handle2mfd(med, &info->mti_ioepoch->handle,
                             req_is_replay(req));
@@ -2398,5 +2400,7 @@ int mdt_done_writing(struct mdt_thread_info *info)
         mdt_empty_transno(info, rc);
 error_ucred:
        mdt_exit_ucred(info);
+out:
+       mdt_thread_info_fini(info);
        RETURN(rc);
 }