Whamcloud - gitweb
LU-15132 hsm: Protect against parallel HSM restore requests
[fs/lustre-release.git] / lustre / mdt / mdt_coordinator.c
index d19a592..b321d94 100644 (file)
@@ -47,8 +47,6 @@
 #include <lustre_kernelcomm.h>
 #include "mdt_internal.h"
 
-static struct lprocfs_vars lprocfs_mdt_hsm_vars[];
-
 /**
  * get obj and HSM attributes on a fid
  * \param mti [IN] context
@@ -143,6 +141,8 @@ struct hsm_scan_data {
         */
        bool                     hsd_housekeeping;
        bool                     hsd_one_restore;
+       u32                      hsd_start_cat_idx;
+       u32                      hsd_start_rec_idx;
        int                      hsd_action_count;
        int                      hsd_request_len; /* array alloc len */
        int                      hsd_request_count; /* array used count */
@@ -160,6 +160,7 @@ static int mdt_cdt_waiting_cb(const struct lu_env *env,
        struct hsm_action_item *hai;
        size_t hai_size;
        u32 archive_id;
+       bool wrapped;
        int i;
 
        /* Are agents full? */
@@ -295,13 +296,23 @@ static int mdt_cdt_waiting_cb(const struct lu_env *env,
                break;
        case HSMA_RESTORE:
                hsd->hsd_one_restore = true;
-               /* Intentional fallthrough */
+               fallthrough;
        default:
                cdt_agent_record_hash_add(cdt, hai->hai_cookie,
                                          llh->lgh_hdr->llh_cat_idx,
                                          larr->arr_hdr.lrh_index);
        }
 
+       wrapped = llh->lgh_hdr->llh_cat_idx >= llh->lgh_last_idx &&
+                 llh->lgh_hdr->llh_count > 1;
+       if ((!wrapped && llh->lgh_hdr->llh_cat_idx > hsd->hsd_start_cat_idx) ||
+           (wrapped && llh->lgh_hdr->llh_cat_idx < hsd->hsd_start_cat_idx) ||
+           (llh->lgh_hdr->llh_cat_idx == hsd->hsd_start_cat_idx &&
+            larr->arr_hdr.lrh_index > hsd->hsd_start_rec_idx)) {
+               hsd->hsd_start_cat_idx = llh->lgh_hdr->llh_cat_idx;
+               hsd->hsd_start_rec_idx = larr->arr_hdr.lrh_index;
+       }
+
        RETURN(0);
 }
 
@@ -477,6 +488,9 @@ static void mdt_hsm_cdt_cleanup(struct mdt_device *mdt)
        mutex_lock(&cdt->cdt_restore_lock);
        list_for_each_entry_safe(crh, tmp3, &cdt->cdt_restore_handle_list,
                                 crh_list) {
+               /* not locked yet; cleaned up by cdt_restore_handle_add() */
+               if (crh->crh_lh.mlh_type == MDT_NUL_LOCK)
+                       continue;
                list_del(&crh->crh_list);
                /* give back layout lock */
                mdt_object_unlock(cdt_mti, NULL, &crh->crh_lh, 1);
@@ -538,7 +552,31 @@ static int set_cdt_state(struct coordinator *cdt, enum cdt_states new_state)
        return rc;
 }
 
+static int mdt_hsm_pending_restore(struct mdt_thread_info *mti);
 
+static void cdt_start_pending_restore(struct mdt_device *mdt,
+                                     struct coordinator *cdt)
+{
+       struct mdt_thread_info *cdt_mti;
+       unsigned int i = 0;
+       int rc;
+
+       /* wait until MDD has initialized the HSM actions llog */
+       while (!test_bit(MDT_FL_CFGLOG, &mdt->mdt_state) && i < obd_timeout) {
+               schedule_timeout_interruptible(cfs_time_seconds(1));
+               i++;
+       }
+       if (!test_bit(MDT_FL_CFGLOG, &mdt->mdt_state))
+               CWARN("%s: trying to init HSM before MDD\n", mdt_obd_name(mdt));
+
+       /* set up list of started restore requests */
+       cdt_mti = lu_context_key_get(&cdt->cdt_env.le_ctx, &mdt_thread_key);
+       rc = mdt_hsm_pending_restore(cdt_mti);
+       if (rc)
+               CERROR("%s: cannot take the layout locks needed for registered restore: %d\n",
+                      mdt_obd_name(mdt), rc);
+
+}
 
 /**
  * coordinator thread
@@ -567,13 +605,16 @@ static int mdt_coordinator(void *data)
        set_cdt_state(cdt, CDT_RUNNING);
 
        /* Inform mdt_hsm_cdt_start(). */
-       wake_up_all(&cdt->cdt_waitq);
+       wake_up(&cdt->cdt_waitq);
+       cdt_start_pending_restore(mdt, cdt);
 
        while (1) {
                int i;
                int update_idx = 0;
                int updates_sz;
                int updates_cnt;
+               u32 start_cat_idx;
+               u32 start_rec_idx;
                struct hsm_record_update *updates;
 
                /* Limit execution of the expensive requests traversal
@@ -607,8 +648,12 @@ static int mdt_coordinator(void *data)
                    ktime_get_real_seconds()) {
                        last_housekeeping = ktime_get_real_seconds();
                        hsd.hsd_housekeeping = true;
+                       start_cat_idx = 0;
+                       start_rec_idx = 0;
                } else if (cdt->cdt_event) {
                        hsd.hsd_housekeeping = false;
+                       start_cat_idx = hsd.hsd_start_cat_idx;
+                       start_rec_idx = hsd.hsd_start_rec_idx;
                } else {
                        continue;
                }
@@ -646,7 +691,8 @@ static int mdt_coordinator(void *data)
                hsd.hsd_one_restore = false;
 
                rc = cdt_llog_process(mti->mti_env, mdt, mdt_coordinator_cb,
-                                     &hsd, 0, 0, WRITE);
+                                     &hsd, start_cat_idx, start_rec_idx,
+                                     WRITE);
                if (rc < 0)
                        goto clean_cb_alloc;
 
@@ -656,6 +702,9 @@ static int mdt_coordinator(void *data)
                if (list_empty(&cdt->cdt_agents)) {
                        CDEBUG(D_HSM, "no agent available, "
                                      "coordinator sleeps\n");
+                       /* reset HSM scanning index range. */
+                       hsd.hsd_start_cat_idx = start_cat_idx;
+                       hsd.hsd_start_rec_idx = start_rec_idx;
                        goto clean_cb_alloc;
                }
 
@@ -708,11 +757,19 @@ static int mdt_coordinator(void *data)
                                hai = hai_next(hai);
                                update_idx++;
                        }
+
+                       /* TODO: narrow down the HSM action range that was
+                        * already scanned according to the cookies when a failure
+                        * occurs.
+                        */
+                       if (rc) {
+                               hsd.hsd_start_cat_idx = start_cat_idx;
+                               hsd.hsd_start_rec_idx = start_rec_idx;
+                       }
                }
 
                if (update_idx) {
-                       rc = mdt_agent_record_update(mti->mti_env, mdt,
-                                                    updates, update_idx);
+                       rc = mdt_agent_record_update(mti, updates, update_idx);
                        if (rc)
                                CERROR("%s: mdt_agent_record_update() failed, "
                                       "rc=%d, cannot update records "
@@ -747,10 +804,21 @@ clean_cb_alloc:
        RETURN(rc);
 }
 
+/**
+ * register a new HSM restore handle for a file and take EX lock on the layout
+ * \param mti [IN] thread info
+ * \param cdt [IN] coordinator
+ * \param fid [IN] fid of the file to restore
+ * \param he  [IN] HSM extent
+ * \retval 0 success
+ * \retval 1 restore handle already exists for the fid
+ * \retval -ve failure
+ */
 int cdt_restore_handle_add(struct mdt_thread_info *mti, struct coordinator *cdt,
                           const struct lu_fid *fid,
                           const struct hsm_extent *he)
 {
+       struct mdt_lock_handle lh = { 0 };
        struct cdt_restore_handle *crh;
        struct mdt_object *obj;
        int rc;
@@ -767,31 +835,48 @@ int cdt_restore_handle_add(struct mdt_thread_info *mti, struct coordinator *cdt,
         */
        crh->crh_extent.start = 0;
        crh->crh_extent.end = he->length;
+       crh->crh_lh.mlh_type = MDT_NUL_LOCK;
+
+       mutex_lock(&cdt->cdt_restore_lock);
+       if (cdt_restore_handle_find(cdt, fid) != NULL)
+               GOTO(out_crl, rc = 1);
+
+       if (unlikely(cdt->cdt_state == CDT_STOPPED ||
+                    cdt->cdt_state == CDT_STOPPING))
+               GOTO(out_crl, rc = -EAGAIN);
+
+       list_add_tail(&crh->crh_list, &cdt->cdt_restore_handle_list);
+       mutex_unlock(&cdt->cdt_restore_lock);
+
        /* get the layout lock */
-       mdt_lock_reg_init(&crh->crh_lh, LCK_EX);
-       obj = mdt_object_find_lock(mti, &crh->crh_fid, &crh->crh_lh,
+       mdt_lock_reg_init(&lh, LCK_EX);
+       obj = mdt_object_find_lock(mti, &crh->crh_fid, &lh,
                                   MDS_INODELOCK_LAYOUT);
-       if (IS_ERR(obj))
-               GOTO(out_crh, rc = PTR_ERR(obj));
+       if (IS_ERR(obj)) {
+               mutex_lock(&cdt->cdt_restore_lock);
+               GOTO(out_ldel, rc = PTR_ERR(obj));
+       }
 
        /* We do not keep a reference on the object during the restore
-        * which can be very long. */
+        * which can be very long.
+        */
        mdt_object_put(mti->mti_env, obj);
 
        mutex_lock(&cdt->cdt_restore_lock);
        if (unlikely(cdt->cdt_state == CDT_STOPPED ||
-                    cdt->cdt_state == CDT_STOPPING)) {
-               mutex_unlock(&cdt->cdt_restore_lock);
+                    cdt->cdt_state == CDT_STOPPING))
                GOTO(out_lh, rc = -EAGAIN);
-       }
 
-       list_add_tail(&crh->crh_list, &cdt->cdt_restore_handle_list);
+       crh->crh_lh = lh;
        mutex_unlock(&cdt->cdt_restore_lock);
 
        RETURN(0);
 out_lh:
        mdt_object_unlock(mti, NULL, &crh->crh_lh, 1);
-out_crh:
+out_ldel:
+       list_del(&crh->crh_list);
+out_crl:
+       mutex_unlock(&cdt->cdt_restore_lock);
        OBD_SLAB_FREE_PTR(crh, mdt_hsm_cdt_kmem);
 
        return rc;
@@ -899,6 +984,8 @@ static int hsm_restore_cb(const struct lu_env *env,
        }
 
        rc = cdt_restore_handle_add(mti, cdt, &hai->hai_fid, &hai->hai_extent);
+       if (rc == 1)
+               rc = 0;
 out:
        RETURN(rc);
 }
@@ -925,7 +1012,6 @@ static int mdt_hsm_pending_restore(struct mdt_thread_info *mti)
 int hsm_init_ucred(struct lu_ucred *uc)
 {
        ENTRY;
-
        uc->uc_valid = UCRED_OLD;
        uc->uc_o_uid = 0;
        uc->uc_o_gid = 0;
@@ -937,7 +1023,7 @@ int hsm_init_ucred(struct lu_ucred *uc)
        uc->uc_fsgid = 0;
        uc->uc_suppgids[0] = -1;
        uc->uc_suppgids[1] = -1;
-       uc->uc_cap = CFS_CAP_FS_MASK;
+       uc->uc_cap = cap_combine(CAP_FS_SET, CAP_NFSD_SET);
        uc->uc_umask = 0777;
        uc->uc_ginfo = NULL;
        uc->uc_identity = NULL;
@@ -1073,11 +1159,11 @@ int  mdt_hsm_cdt_fini(struct mdt_device *mdt)
  */
 static int mdt_hsm_cdt_start(struct mdt_device *mdt)
 {
-       struct coordinator      *cdt = &mdt->mdt_coordinator;
+       struct coordinator *cdt = &mdt->mdt_coordinator;
        struct mdt_thread_info *cdt_mti;
-       int                      rc;
-       void                    *ptr;
-       struct task_struct      *task;
+       int rc;
+       void *ptr;
+       struct task_struct *task;
        ENTRY;
 
        /* functions defined but not yet used
@@ -1109,17 +1195,10 @@ static int mdt_hsm_cdt_start(struct mdt_device *mdt)
        /* to avoid deadlock when start is made through sysfs
         * sysfs entries are created by the coordinator thread
         */
-       /* set up list of started restore requests */
-       cdt_mti = lu_context_key_get(&cdt->cdt_env.le_ctx, &mdt_thread_key);
-       rc = mdt_hsm_pending_restore(cdt_mti);
-       if (rc)
-               CERROR("%s: cannot take the layout locks needed"
-                      " for registered restore: %d\n",
-                      mdt_obd_name(mdt), rc);
-
        if (mdt->mdt_bottom->dd_rdonly)
                RETURN(0);
 
+       cdt_mti = lu_context_key_get(&cdt->cdt_env.le_ctx, &mdt_thread_key);
        task = kthread_run(mdt_coordinator, cdt_mti, "hsm_cdtr");
        if (IS_ERR(task)) {
                rc = PTR_ERR(task);
@@ -1219,8 +1298,7 @@ int mdt_hsm_add_hal(struct mdt_thread_info *mti,
                                .status = ARS_CANCELED,
                        };
 
-                       rc = mdt_agent_record_update(mti->mti_env, mti->mti_mdt,
-                                                    &update, 1);
+                       rc = mdt_agent_record_update(mti, &update, 1);
                        if (rc) {
                                CERROR("%s: mdt_agent_record_update() failed, "
                                       "rc=%d, cannot update status to %s "
@@ -1644,8 +1722,7 @@ int mdt_hsm_update_request_state(struct mdt_thread_info *mti,
                update.cookie = pgs->hpk_cookie;
                update.status = status;
 
-               rc1 = mdt_agent_record_update(mti->mti_env, mdt,
-                                             &update, 1);
+               rc1 = mdt_agent_record_update(mti, &update, 1);
                if (rc1)
                        CERROR("%s: mdt_agent_record_update() failed,"
                               " rc=%d, cannot update status to %s"
@@ -1679,20 +1756,12 @@ out:
 
 
 /**
- * data passed to llog_cat_process() callback
- * to cancel requests
- */
-struct hsm_cancel_all_data {
-       struct mdt_device       *mdt;
-};
-
-/**
  *  llog_cat_process() callback, used to:
  *  - purge all requests
  * \param env [IN] environment
  * \param llh [IN] llog handle
  * \param hdr [IN] llog record
- * \param data [IN] cb data = struct hsm_cancel_all_data
+ * \param data [IN] cb data = struct mdt_thread_info
  * \retval 0 success
  * \retval -ve failure
  */
@@ -1700,18 +1769,28 @@ static int mdt_cancel_all_cb(const struct lu_env *env,
                             struct llog_handle *llh,
                             struct llog_rec_hdr *hdr, void *data)
 {
-       struct llog_agent_req_rec       *larr;
-       struct hsm_cancel_all_data      *hcad;
-       int                              rc = 0;
+       struct llog_agent_req_rec *larr = (struct llog_agent_req_rec *)hdr;
+       struct hsm_action_item *hai = &larr->arr_hai;
+       struct mdt_thread_info  *mti = data;
+       struct coordinator *cdt = &mti->mti_mdt->mdt_coordinator;
+       int rc;
        ENTRY;
 
-       larr = (struct llog_agent_req_rec *)hdr;
-       hcad = data;
-       if (larr->arr_status == ARS_WAITING ||
-           larr->arr_status == ARS_STARTED) {
-               larr->arr_status = ARS_CANCELED;
-               larr->arr_req_change = ktime_get_real_seconds();
-               rc = llog_write(env, llh, hdr, hdr->lrh_index);
+       if (larr->arr_status != ARS_WAITING &&
+           larr->arr_status != ARS_STARTED)
+               RETURN(0);
+
+       /* Unlock the EX layout lock */
+       if (hai->hai_action == HSMA_RESTORE)
+               cdt_restore_handle_del(mti, cdt, &hai->hai_fid);
+
+       larr->arr_status = ARS_CANCELED;
+       larr->arr_req_change = ktime_get_real_seconds();
+       rc = llog_write(env, llh, hdr, hdr->lrh_index);
+       if (rc < 0) {
+               CERROR("%s: cannot update agent log: rc = %d\n",
+                      mdt_obd_name(mti->mti_mdt), rc);
+               rc = LLOG_DEL_RECORD;
        }
 
        RETURN(rc);
@@ -1730,7 +1809,6 @@ static int hsm_cancel_all_actions(struct mdt_device *mdt)
        struct cdt_agent_req            *car;
        struct hsm_action_list          *hal = NULL;
        struct hsm_action_item          *hai;
-       struct hsm_cancel_all_data       hcad;
        int                              hal_sz = 0, hal_len, rc;
        enum cdt_states                  old_state;
        ENTRY;
@@ -1828,10 +1906,8 @@ static int hsm_cancel_all_actions(struct mdt_device *mdt)
                OBD_FREE(hal, hal_sz);
 
        /* cancel all on-disk records */
-       hcad.mdt = mdt;
-
        rc = cdt_llog_process(mti->mti_env, mti->mti_mdt, mdt_cancel_all_cb,
-                             &hcad, 0, 0, WRITE);
+                             (void *)mti, 0, 0, WRITE);
 out_cdt_state:
        /* Enable coordinator, unless the coordinator was stopping. */
        set_cdt_state_locked(cdt, old_state);
@@ -2233,15 +2309,18 @@ ssize_t hsm_control_store(struct kobject *kobj, struct attribute *attr,
                        rc = set_cdt_state(cdt, CDT_RUNNING);
                        mdt_hsm_cdt_event(cdt);
                        wake_up(&cdt->cdt_waitq);
+               } else if (cdt->cdt_state == CDT_RUNNING) {
+                       rc = 0;
                } else {
                        rc = mdt_hsm_cdt_start(mdt);
                }
        } else if (strncmp(buffer, CDT_STOP_CMD, strlen(CDT_STOP_CMD)) == 0) {
-               if ((cdt->cdt_state == CDT_STOPPING) ||
-                   (cdt->cdt_state == CDT_STOPPED)) {
-                       CERROR("%s: Coordinator already stopped\n",
+               if (cdt->cdt_state == CDT_STOPPING) {
+                       CERROR("%s: Coordinator is already stopping\n",
                               mdt_obd_name(mdt));
                        rc = -EALREADY;
+               } else if (cdt->cdt_state == CDT_STOPPED) {
+                       rc = 0;
                } else {
                        rc = mdt_hsm_cdt_stop(mdt);
                }
@@ -2249,9 +2328,10 @@ ssize_t hsm_control_store(struct kobject *kobj, struct attribute *attr,
                           strlen(CDT_DISABLE_CMD)) == 0) {
                if ((cdt->cdt_state == CDT_STOPPING) ||
                    (cdt->cdt_state == CDT_STOPPED)) {
-                       CERROR("%s: Coordinator is stopped\n",
-                              mdt_obd_name(mdt));
-                       rc = -EINVAL;
+                       /* exit gracefully if coordinator is being stopped
+                        * or stopped already.
+                        */
+                       rc = 0;
                } else {
                        rc = set_cdt_state(cdt, CDT_DISABLE);
                }
@@ -2504,7 +2584,7 @@ static ssize_t remove_count_show(struct kobject *kobj, struct attribute *attr,
 }
 LUSTRE_RO_ATTR(remove_count);
 
-static struct lprocfs_vars lprocfs_mdt_hsm_vars[] = {
+static struct ldebugfs_vars ldebugfs_mdt_hsm_vars[] = {
        { .name =       "agents",
          .fops =       &mdt_hsm_agent_fops                     },
        { .name =       "actions",
@@ -2576,7 +2656,7 @@ int hsm_cdt_tunables_init(struct mdt_device *mdt)
        /* init debugfs entries, failure is not critical */
        cdt->cdt_debugfs_dir = debugfs_create_dir("hsm",
                                                  obd->obd_debugfs_entry);
-       ldebugfs_add_vars(cdt->cdt_debugfs_dir, lprocfs_mdt_hsm_vars, mdt);
+       ldebugfs_add_vars(cdt->cdt_debugfs_dir, ldebugfs_mdt_hsm_vars, mdt);
 
        return 0;
 }