Whamcloud - gitweb
LU-10308 misc: update Intel copyright messages for 2017
[fs/lustre-release.git] / lustre / mdt / mdt_hsm_cdt_client.c
index 8264070..9d0335c 100644 (file)
@@ -23,6 +23,7 @@
  * (C) Copyright 2012 Commissariat a l'energie atomique et aux energies
  *     alternatives
  *
+ * Copyright (c) 2013, 2017, Intel Corporation.
  */
 /*
  * lustre/mdt/mdt_hsm_cdt_client.c
 #define DEBUG_SUBSYSTEM S_MDS
 
 #include <obd_support.h>
-#include <lustre_net.h>
 #include <lustre_export.h>
 #include <obd.h>
-#include <obd_lov.h>
 #include <lprocfs_status.h>
 #include <lustre_log.h>
 #include "mdt_internal.h"
@@ -155,7 +154,8 @@ static int hsm_find_compatible(const struct lu_env *env, struct mdt_device *mdt,
        hcdcb.cdt = &mdt->mdt_coordinator;
        hcdcb.hal = hal;
 
-       rc = cdt_llog_process(env, mdt, hsm_find_compatible_cb, &hcdcb);
+       rc = cdt_llog_process(env, mdt, hsm_find_compatible_cb, &hcdcb, 0, 0,
+                             READ);
 
        RETURN(rc);
 }
@@ -200,8 +200,8 @@ static bool hsm_action_is_needed(struct hsm_action_item *hai, int hal_an,
                is_needed = true;
                break;
        }
-       CDEBUG(D_HSM, "fid="DFID" action=%s rq_flags="LPX64
-                     " extent="LPX64"-"LPX64" hsm_flags=%X %s\n",
+       CDEBUG(D_HSM, "fid="DFID" action=%s rq_flags=%#llx"
+                     " extent=%#llx-%#llx hsm_flags=%X %s\n",
                      PFID(&hai->hai_fid),
                      hsm_copytool_action2name(hai->hai_action), rq_flags,
                      hai->hai_extent.offset, hai->hai_extent.length,
@@ -245,47 +245,57 @@ static bool hal_is_sane(struct hsm_action_list *hal)
        RETURN(true);
 }
 
-/*
- * Coordinator external API
- */
-
-/**
- * register a list of requests
- * \param mti [IN]
- * \param hal [IN] list of requests
- * \param compound_id [OUT] id of the compound request
- * \retval 0 success
- * \retval -ve failure
- * in case of restore, caller must hold layout lock
- */
-int mdt_hsm_add_actions(struct mdt_thread_info *mti,
-                       struct hsm_action_list *hal, __u64 *compound_id)
+static int
+hsm_action_permission(struct mdt_thread_info *mti,
+                     struct mdt_object *obj,
+                     enum hsm_copytool_action hsma)
 {
-       struct mdt_device       *mdt = mti->mti_mdt;
-       struct coordinator      *cdt = &mdt->mdt_coordinator;
-       struct hsm_action_item  *hai;
-       struct mdt_object       *obj = NULL;
-       int                      rc = 0, i;
-       struct md_hsm            mh;
-       bool                     is_restore = false;
+       struct coordinator *cdt = &mti->mti_mdt->mdt_coordinator;
+       struct lu_ucred *uc = mdt_ucred(mti);
+       struct md_attr *ma = &mti->mti_attr;
+       const __u64 *mask;
+       int rc;
        ENTRY;
 
-       /* no coordinator started, so we cannot serve requests */
-       if (cdt->cdt_state == CDT_STOPPED)
-               RETURN(-EAGAIN);
+       if (hsma != HSMA_RESTORE && mdt_rdonly(mti->mti_exp))
+               RETURN(-EROFS);
 
-       if (!hal_is_sane(hal))
+       if (md_capable(uc, CFS_CAP_SYS_ADMIN))
+               RETURN(0);
+
+       ma->ma_need = MA_INODE;
+       rc = mdt_attr_get_complex(mti, obj, ma);
+       if (rc < 0)
+               RETURN(rc);
+
+       if (uc->uc_fsuid == ma->ma_attr.la_uid)
+               mask = &cdt->cdt_user_request_mask;
+       else if (lustre_in_group_p(uc, ma->ma_attr.la_gid))
+               mask = &cdt->cdt_group_request_mask;
+       else
+               mask = &cdt->cdt_other_request_mask;
+
+       if (!(0 <= hsma && hsma < 8 * sizeof(*mask)))
                RETURN(-EINVAL);
 
-       *compound_id = atomic_inc_return(&cdt->cdt_compound_id);
+       RETURN(*mask & (1UL << hsma) ? 0 : -EPERM);
+}
 
-       /* search for compatible request, if found hai_cookie is set
-        * to the request cookie
-        * it is also used to set the cookie for cancel request by FID
-        */
-       rc = hsm_find_compatible(mti->mti_env, mdt, hal);
-       if (rc)
-               GOTO(out, rc);
+/* Process a single HAL. hsm_find_compatible has already been called
+ * on it. */
+static int mdt_hsm_register_hal(struct mdt_thread_info *mti,
+                               struct mdt_device *mdt,
+                               struct coordinator *cdt,
+                               struct hsm_action_list *hal)
+{
+       struct hsm_action_item  *hai;
+       struct mdt_object       *obj = NULL;
+       int                      rc, i;
+       struct md_hsm            mh;
+       bool                     is_restore = false;
+       __u64                    compound_id;
+
+       compound_id = atomic_inc_return(&cdt->cdt_compound_id);
 
        hai = hai_first(hal);
        for (i = 0; i < hal->hal_count; i++, hai = hai_next(hai)) {
@@ -321,23 +331,30 @@ int mdt_hsm_add_actions(struct mdt_thread_info *mti,
                 * if restore, we take the layout lock
                 */
 
-               /* if action is cancel, also no need to check */
-               if (hai->hai_action == HSMA_CANCEL)
-                       goto record;
-
-               /* get HSM attributes */
+               /* Get HSM attributes and check permissions. */
                obj = mdt_hsm_get_md_hsm(mti, &hai->hai_fid, &mh);
-               if (IS_ERR(obj) || obj == NULL) {
-                       /* in case of archive remove, Lustre file
-                        * is not mandatory */
-                       if (hai->hai_action == HSMA_REMOVE)
+               if (IS_ERR(obj)) {
+                       /* In case of REMOVE and CANCEL a Lustre file
+                        * is not mandatory, but restrict this
+                        * exception to admins. */
+                       if (md_capable(mdt_ucred(mti), CFS_CAP_SYS_ADMIN) &&
+                           (hai->hai_action == HSMA_REMOVE ||
+                            hai->hai_action == HSMA_CANCEL))
                                goto record;
-                       if (obj == NULL)
-                               GOTO(out, rc = -ENOENT);
-                       GOTO(out, rc = PTR_ERR(obj));
+                       else
+                               GOTO(out, rc = PTR_ERR(obj));
                }
+
+               rc = hsm_action_permission(mti, obj, hai->hai_action);
                mdt_object_put(mti->mti_env, obj);
 
+               if (rc < 0)
+                       GOTO(out, rc);
+
+               /* if action is cancel, also no need to check */
+               if (hai->hai_action == HSMA_CANCEL)
+                       goto record;
+
                /* Check if an action is needed, compare request
                 * and HSM flags status */
                if (!hsm_action_is_needed(hai, archive_id, flags, &mh))
@@ -351,6 +368,7 @@ int mdt_hsm_add_actions(struct mdt_thread_info *mti,
 
                /* for cancel archive number is taken from canceled request
                 * for other request, we take from lma if not specified,
+                * or we use the default if none found in lma
                 * this works also for archive because the default value is 0
                 * /!\ there is a side effect: in case of restore on multiple
                 * files which are in different backend, the initial compound
@@ -358,51 +376,71 @@ int mdt_hsm_add_actions(struct mdt_thread_info *mti,
                 * warranty an agent can serve any combinaison of archive
                 * backend
                 */
-               if (hai->hai_action != HSMA_CANCEL && archive_id == 0)
-                       archive_id = mh.mh_arch_id;
+               if (hai->hai_action != HSMA_CANCEL && archive_id == 0) {
+                       if (mh.mh_arch_id != 0)
+                               archive_id = mh.mh_arch_id;
+                       else
+                               archive_id = cdt->cdt_default_archive_id;
+               }
 
                /* if restore, take an exclusive lock on layout */
                if (hai->hai_action == HSMA_RESTORE) {
-                       struct cdt_restore_handle       *crh;
-                       struct mdt_object               *child;
+                       struct cdt_restore_handle *crh;
+
+                       /* in V1 only whole file is supported. */
+                       if (hai->hai_extent.offset != 0)
+                               GOTO(out, rc = -EPROTO);
 
                        OBD_SLAB_ALLOC_PTR(crh, mdt_hsm_cdt_kmem);
                        if (crh == NULL)
                                GOTO(out, rc = -ENOMEM);
 
                        crh->crh_fid = hai->hai_fid;
-                       /* in V1 only whole file is supported
-                       crh->extent.start = hai->hai_extent.offset;
-                       crh->extent.end = hai->hai_extent.offset +
-                                           hai->hai_extent.length;
-                        */
+                       /* in V1 only whole file is supported. However the
+                        * restore may be due to truncate. */
                        crh->crh_extent.start = 0;
-                       crh->crh_extent.end = OBD_OBJECT_EOF;
+                       crh->crh_extent.end = hai->hai_extent.length;
 
                        mdt_lock_reg_init(&crh->crh_lh, LCK_EX);
-                       child = mdt_object_find_lock(mti, &crh->crh_fid,
-                                                    &crh->crh_lh,
-                                                    MDS_INODELOCK_LAYOUT);
-                       if (IS_ERR(child)) {
-                               rc = PTR_ERR(child);
+                       obj = mdt_object_find_lock(mti, &crh->crh_fid,
+                                                  &crh->crh_lh,
+                                                  MDS_INODELOCK_LAYOUT);
+                       if (IS_ERR(obj)) {
+                               rc = PTR_ERR(obj);
                                CERROR("%s: cannot take layout lock for "
                                       DFID": rc = %d\n", mdt_obd_name(mdt),
                                       PFID(&crh->crh_fid), rc);
                                OBD_SLAB_FREE_PTR(crh, mdt_hsm_cdt_kmem);
                                GOTO(out, rc);
                        }
+
                        /* we choose to not keep a keep a reference
                         * on the object during the restore time which can be
                         * very long */
-                       mdt_object_put(mti->mti_env, child);
+                       mdt_object_put(mti->mti_env, obj);
 
                        mutex_lock(&cdt->cdt_restore_lock);
+                       if (unlikely((cdt->cdt_state == CDT_STOPPED) ||
+                                    (cdt->cdt_state == CDT_STOPPING))) {
+                               mutex_unlock(&cdt->cdt_restore_lock);
+                               mdt_object_unlock(mti, NULL, &crh->crh_lh, 1);
+                               OBD_SLAB_FREE_PTR(crh, mdt_hsm_cdt_kmem);
+                               GOTO(out, rc = -EAGAIN);
+                       }
                        list_add_tail(&crh->crh_list, &cdt->cdt_restore_hdl);
                        mutex_unlock(&cdt->cdt_restore_lock);
                }
 record:
+               /*
+                * Wait here to catch the 2nd RESTORE request to the same FID.
+                * Normally layout lock protects against adding such request.
+                * But when cdt is stopping it cancel all locks via
+                * ldlm_resource_clean and protections may not work.
+                * See LU-9266 and sanity-hsm_407 for details.
+                */
+               OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_HSM_CDT_DELAY, cfs_fail_val);
                /* record request */
-               rc = mdt_agent_record_add(mti->mti_env, mdt, *compound_id,
+               rc = mdt_agent_record_add(mti->mti_env, mdt, compound_id,
                                          archive_id, flags, hai);
                if (rc)
                        GOTO(out, rc);
@@ -414,47 +452,55 @@ record:
                rc = 0;
 
        GOTO(out, rc);
-out:
-       /* if work has been added, wake up coordinator */
-       if (rc == 0 || rc == -ENODATA)
-               mdt_hsm_cdt_wakeup(mdt);
 
+out:
        return rc;
 }
 
+/*
+ * Coordinator external API
+ */
+
 /**
- * get running action on a FID list or from cookie
+ * register a list of requests
  * \param mti [IN]
- * \param hal [IN/OUT] requests
+ * \param hal [IN] list of requests
  * \retval 0 success
  * \retval -ve failure
+ * in case of restore, caller must hold layout lock
  */
-int mdt_hsm_get_running(struct mdt_thread_info *mti,
+int mdt_hsm_add_actions(struct mdt_thread_info *mti,
                        struct hsm_action_list *hal)
 {
        struct mdt_device       *mdt = mti->mti_mdt;
        struct coordinator      *cdt = &mdt->mdt_coordinator;
-       struct hsm_action_item  *hai;
-       int                      i;
+       int                      rc;
        ENTRY;
 
-       hai = hai_first(hal);
-       for (i = 0; i < hal->hal_count; i++, hai = hai_next(hai)) {
-               struct cdt_agent_req *car;
+       /* no coordinator started, so we cannot serve requests */
+       if (cdt->cdt_state == CDT_STOPPED)
+               RETURN(-EAGAIN);
 
-               if (!fid_is_sane(&hai->hai_fid))
-                       RETURN(-EINVAL);
+       if (!hal_is_sane(hal))
+               RETURN(-EINVAL);
 
-               car = mdt_cdt_find_request(cdt, 0, &hai->hai_fid);
-               if (car == NULL) {
-                       hai->hai_cookie = 0;
-                       hai->hai_action = HSMA_NONE;
-               } else {
-                       *hai = *car->car_hai;
-                       mdt_cdt_put_request(car);
-               }
-       }
-       RETURN(0);
+       /* search for compatible request, if found hai_cookie is set
+        * to the request cookie
+        * it is also used to set the cookie for cancel request by FID
+        */
+       rc = hsm_find_compatible(mti->mti_env, mdt, hal);
+       if (rc)
+               GOTO(out, rc);
+
+       rc = mdt_hsm_register_hal(mti, mdt, cdt, hal);
+
+       GOTO(out, rc);
+out:
+       /* if work has been added, signal the coordinator */
+       if (rc == 0 || rc == -ENODATA)
+               mdt_hsm_cdt_event(cdt);
+
+       return rc;
 }
 
 /**
@@ -532,7 +578,7 @@ int mdt_hsm_get_actions(struct mdt_thread_info *mti,
        for (i = 0; i < hal->hal_count; i++, hai = hai_next(hai)) {
                struct cdt_agent_req *car;
 
-               car = mdt_cdt_find_request(cdt, hai->hai_cookie, NULL);
+               car = mdt_cdt_find_request(cdt, hai->hai_cookie);
                if (car == NULL) {
                        hai->hai_cookie = 0;
                } else {
@@ -550,4 +596,3 @@ int mdt_hsm_get_actions(struct mdt_thread_info *mti,
 
        RETURN(0);
 }
-