Whamcloud - gitweb
LU-15132 hsm: Protect against parallel HSM restore requests
[fs/lustre-release.git] / lustre / mdt / mdt_coordinator.c
index a1fc6b0..b321d94 100644 (file)
@@ -296,7 +296,7 @@ static int mdt_cdt_waiting_cb(const struct lu_env *env,
                break;
        case HSMA_RESTORE:
                hsd->hsd_one_restore = true;
-               /* Intentional fallthrough */
+               fallthrough;
        default:
                cdt_agent_record_hash_add(cdt, hai->hai_cookie,
                                          llh->lgh_hdr->llh_cat_idx,
@@ -488,6 +488,9 @@ static void mdt_hsm_cdt_cleanup(struct mdt_device *mdt)
        mutex_lock(&cdt->cdt_restore_lock);
        list_for_each_entry_safe(crh, tmp3, &cdt->cdt_restore_handle_list,
                                 crh_list) {
+               /* not locked yet, cleanup by cdt_restore_handle_add() */
+               if (crh->crh_lh.mlh_type == MDT_NUL_LOCK)
+                       continue;
                list_del(&crh->crh_list);
                /* give back layout lock */
                mdt_object_unlock(cdt_mti, NULL, &crh->crh_lh, 1);
@@ -549,7 +552,31 @@ static int set_cdt_state(struct coordinator *cdt, enum cdt_states new_state)
        return rc;
 }
 
+static int mdt_hsm_pending_restore(struct mdt_thread_info *mti);
+
+static void cdt_start_pending_restore(struct mdt_device *mdt,
+                                     struct coordinator *cdt)
+{
+       struct mdt_thread_info *cdt_mti;
+       unsigned int i = 0;
+       int rc;
 
+       /* wait for MDD to set up the HSM actions llog; 1s x obd_timeout max */
+       while (!test_bit(MDT_FL_CFGLOG, &mdt->mdt_state) && i < obd_timeout) {
+               schedule_timeout_interruptible(cfs_time_seconds(1));
+               i++;
+       }
+       if (!test_bit(MDT_FL_CFGLOG, &mdt->mdt_state))
+               CWARN("%s: trying to init HSM before MDD\n", mdt_obd_name(mdt));
+
+       /* rebuild the list of started restores; failure is logged, not fatal */
+       cdt_mti = lu_context_key_get(&cdt->cdt_env.le_ctx, &mdt_thread_key);
+       rc = mdt_hsm_pending_restore(cdt_mti);
+       if (rc)
+               CERROR("%s: cannot take the layout locks needed for registered restore: %d\n",
+                      mdt_obd_name(mdt), rc);
+
+}
 
 /**
  * coordinator thread
@@ -579,6 +606,7 @@ static int mdt_coordinator(void *data)
 
        /* Inform mdt_hsm_cdt_start(). */
        wake_up(&cdt->cdt_waitq);
+       cdt_start_pending_restore(mdt, cdt);
 
        while (1) {
                int i;
@@ -741,8 +769,7 @@ static int mdt_coordinator(void *data)
                }
 
                if (update_idx) {
-                       rc = mdt_agent_record_update(mti->mti_env, mdt,
-                                                    updates, update_idx);
+                       rc = mdt_agent_record_update(mti, updates, update_idx);
                        if (rc)
                                CERROR("%s: mdt_agent_record_update() failed, "
                                       "rc=%d, cannot update records "
@@ -777,10 +804,21 @@ clean_cb_alloc:
        RETURN(rc);
 }
 
+/**
+ * register a new HSM restore handle for a file and take EX lock on the layout
+ * \param mti [IN] thread info
+ * \param cdt [IN] coordinator
+ * \param fid [IN] fid of the file to restore
+ * \param he  [IN] HSM extent
+ * \retval 0 success
+ * \retval 1 restore handle already exists for the fid
+ * \retval -ve failure
+ */
 int cdt_restore_handle_add(struct mdt_thread_info *mti, struct coordinator *cdt,
                           const struct lu_fid *fid,
                           const struct hsm_extent *he)
 {
+       struct mdt_lock_handle lh = { 0 };
        struct cdt_restore_handle *crh;
        struct mdt_object *obj;
        int rc;
@@ -797,31 +835,48 @@ int cdt_restore_handle_add(struct mdt_thread_info *mti, struct coordinator *cdt,
         */
        crh->crh_extent.start = 0;
        crh->crh_extent.end = he->length;
+       crh->crh_lh.mlh_type = MDT_NUL_LOCK;
+
+       mutex_lock(&cdt->cdt_restore_lock);
+       if (cdt_restore_handle_find(cdt, fid) != NULL)
+               GOTO(out_crl, rc = 1);
+
+       if (unlikely(cdt->cdt_state == CDT_STOPPED ||
+                    cdt->cdt_state == CDT_STOPPING))
+               GOTO(out_crl, rc = -EAGAIN);
+
+       list_add_tail(&crh->crh_list, &cdt->cdt_restore_handle_list);
+       mutex_unlock(&cdt->cdt_restore_lock);
+
-       /* get the layout lock */
-       mdt_lock_reg_init(&crh->crh_lh, LCK_EX);
-       obj = mdt_object_find_lock(mti, &crh->crh_fid, &crh->crh_lh,
+       /* take the layout lock in local lh; crh_lh is set only on success */
+       mdt_lock_reg_init(&lh, LCK_EX);
+       obj = mdt_object_find_lock(mti, &crh->crh_fid, &lh,
                                   MDS_INODELOCK_LAYOUT);
-       if (IS_ERR(obj))
-               GOTO(out_crh, rc = PTR_ERR(obj));
+       if (IS_ERR(obj)) {
+               mutex_lock(&cdt->cdt_restore_lock);
+               GOTO(out_ldel, rc = PTR_ERR(obj));
+       }
 
        /* We do not keep a reference on the object during the restore
-        * which can be very long. */
+        * which can be very long.
+        */
        mdt_object_put(mti->mti_env, obj);
 
        mutex_lock(&cdt->cdt_restore_lock);
        if (unlikely(cdt->cdt_state == CDT_STOPPED ||
-                    cdt->cdt_state == CDT_STOPPING)) {
-               mutex_unlock(&cdt->cdt_restore_lock);
+                    cdt->cdt_state == CDT_STOPPING))
                GOTO(out_lh, rc = -EAGAIN);
-       }
 
-       list_add_tail(&crh->crh_list, &cdt->cdt_restore_handle_list);
+       crh->crh_lh = lh;
        mutex_unlock(&cdt->cdt_restore_lock);
 
        RETURN(0);
 out_lh:
-       mdt_object_unlock(mti, NULL, &crh->crh_lh, 1);
-out_crh:
+       mdt_object_unlock(mti, NULL, &lh, 1);
+out_ldel:
+       list_del(&crh->crh_list);
+out_crl:
+       mutex_unlock(&cdt->cdt_restore_lock);
        OBD_SLAB_FREE_PTR(crh, mdt_hsm_cdt_kmem);
 
        return rc;
@@ -929,6 +984,8 @@ static int hsm_restore_cb(const struct lu_env *env,
        }
 
        rc = cdt_restore_handle_add(mti, cdt, &hai->hai_fid, &hai->hai_extent);
+       if (rc == 1)
+               rc = 0;
 out:
        RETURN(rc);
 }
@@ -1104,7 +1161,6 @@ static int mdt_hsm_cdt_start(struct mdt_device *mdt)
 {
        struct coordinator *cdt = &mdt->mdt_coordinator;
        struct mdt_thread_info *cdt_mti;
-       unsigned int i = 0;
        int rc;
        void *ptr;
        struct task_struct *task;
@@ -1136,28 +1192,13 @@ static int mdt_hsm_cdt_start(struct mdt_device *mdt)
        cdt->cdt_group_request_mask = (1UL << HSMA_RESTORE);
        cdt->cdt_other_request_mask = (1UL << HSMA_RESTORE);
 
-       /* wait until MDD initialize hsm actions llog */
-       while (!test_bit(MDT_FL_CFGLOG, &mdt->mdt_state) && i < obd_timeout) {
-               schedule_timeout_interruptible(cfs_time_seconds(1));
-               i++;
-       }
-       if (!test_bit(MDT_FL_CFGLOG, &mdt->mdt_state))
-               CWARN("%s: trying to init HSM before MDD\n", mdt_obd_name(mdt));
-
        /* to avoid deadlock when start is made through sysfs
         * sysfs entries are created by the coordinator thread
         */
-       /* set up list of started restore requests */
-       cdt_mti = lu_context_key_get(&cdt->cdt_env.le_ctx, &mdt_thread_key);
-       rc = mdt_hsm_pending_restore(cdt_mti);
-       if (rc)
-               CERROR("%s: cannot take the layout locks needed"
-                      " for registered restore: %d\n",
-                      mdt_obd_name(mdt), rc);
-
        if (mdt->mdt_bottom->dd_rdonly)
                RETURN(0);
 
+       cdt_mti = lu_context_key_get(&cdt->cdt_env.le_ctx, &mdt_thread_key);
        task = kthread_run(mdt_coordinator, cdt_mti, "hsm_cdtr");
        if (IS_ERR(task)) {
                rc = PTR_ERR(task);
@@ -1257,8 +1298,7 @@ int mdt_hsm_add_hal(struct mdt_thread_info *mti,
                                .status = ARS_CANCELED,
                        };
 
-                       rc = mdt_agent_record_update(mti->mti_env, mti->mti_mdt,
-                                                    &update, 1);
+                       rc = mdt_agent_record_update(mti, &update, 1);
                        if (rc) {
                                CERROR("%s: mdt_agent_record_update() failed, "
                                       "rc=%d, cannot update status to %s "
@@ -1682,8 +1722,7 @@ int mdt_hsm_update_request_state(struct mdt_thread_info *mti,
                update.cookie = pgs->hpk_cookie;
                update.status = status;
 
-               rc1 = mdt_agent_record_update(mti->mti_env, mdt,
-                                             &update, 1);
+               rc1 = mdt_agent_record_update(mti, &update, 1);
                if (rc1)
                        CERROR("%s: mdt_agent_record_update() failed,"
                               " rc=%d, cannot update status to %s"
@@ -1717,20 +1756,12 @@ out:
 
 
 /**
- * data passed to llog_cat_process() callback
- * to cancel requests
- */
-struct hsm_cancel_all_data {
-       struct mdt_device       *mdt;
-};
-
-/**
  *  llog_cat_process() callback, used to:
  *  - purge all requests
  * \param env [IN] environment
  * \param llh [IN] llog handle
  * \param hdr [IN] llog record
- * \param data [IN] cb data = struct hsm_cancel_all_data
+ * \param data [IN] cb data = struct mdt_thread_info
  * \retval 0 success
  * \retval -ve failure
  */
@@ -1738,18 +1769,28 @@ static int mdt_cancel_all_cb(const struct lu_env *env,
                             struct llog_handle *llh,
                             struct llog_rec_hdr *hdr, void *data)
 {
-       struct llog_agent_req_rec       *larr;
-       struct hsm_cancel_all_data      *hcad;
-       int                              rc = 0;
+       struct llog_agent_req_rec *larr = (struct llog_agent_req_rec *)hdr;
+       struct hsm_action_item *hai = &larr->arr_hai;
+       struct mdt_thread_info  *mti = data;
+       struct coordinator *cdt = &mti->mti_mdt->mdt_coordinator;
+       int rc;
        ENTRY;
 
-       larr = (struct llog_agent_req_rec *)hdr;
-       hcad = data;
-       if (larr->arr_status == ARS_WAITING ||
-           larr->arr_status == ARS_STARTED) {
-               larr->arr_status = ARS_CANCELED;
-               larr->arr_req_change = ktime_get_real_seconds();
-               rc = llog_write(env, llh, hdr, hdr->lrh_index);
+       if (larr->arr_status != ARS_WAITING &&
+           larr->arr_status != ARS_STARTED)
+               RETURN(0);
+
+       /* Unlock the EX layout lock */
+       if (hai->hai_action == HSMA_RESTORE)
+               cdt_restore_handle_del(mti, cdt, &hai->hai_fid);
+
+       larr->arr_status = ARS_CANCELED;
+       larr->arr_req_change = ktime_get_real_seconds();
+       rc = llog_write(env, llh, hdr, hdr->lrh_index);
+       if (rc < 0) {
+               CERROR("%s: cannot update agent log: rc = %d\n",
+                      mdt_obd_name(mti->mti_mdt), rc);
+               rc = LLOG_DEL_RECORD;
        }
 
        RETURN(rc);
@@ -1768,7 +1809,6 @@ static int hsm_cancel_all_actions(struct mdt_device *mdt)
        struct cdt_agent_req            *car;
        struct hsm_action_list          *hal = NULL;
        struct hsm_action_item          *hai;
-       struct hsm_cancel_all_data       hcad;
        int                              hal_sz = 0, hal_len, rc;
        enum cdt_states                  old_state;
        ENTRY;
@@ -1866,10 +1906,8 @@ static int hsm_cancel_all_actions(struct mdt_device *mdt)
                OBD_FREE(hal, hal_sz);
 
        /* cancel all on-disk records */
-       hcad.mdt = mdt;
-
        rc = cdt_llog_process(mti->mti_env, mti->mti_mdt, mdt_cancel_all_cb,
-                             &hcad, 0, 0, WRITE);
+                             (void *)mti, 0, 0, WRITE);
 out_cdt_state:
        /* Enable coordinator, unless the coordinator was stopping. */
        set_cdt_state_locked(cdt, old_state);
@@ -2290,9 +2328,10 @@ ssize_t hsm_control_store(struct kobject *kobj, struct attribute *attr,
                           strlen(CDT_DISABLE_CMD)) == 0) {
                if ((cdt->cdt_state == CDT_STOPPING) ||
                    (cdt->cdt_state == CDT_STOPPED)) {
-                       CERROR("%s: Coordinator is stopped\n",
-                              mdt_obd_name(mdt));
-                       rc = -EINVAL;
+                       /* exit gracefully if coordinator is being stopped
+                        * or stopped already.
+                        */
+                       rc = 0;
                } else {
                        rc = set_cdt_state(cdt, CDT_DISABLE);
                }