Whamcloud - gitweb
LU-15132 hsm: Protect against parallel HSM restore requests
[fs/lustre-release.git] / lustre / mdt / mdt_coordinator.c
index 87296cb..b321d94 100644 (file)
@@ -296,7 +296,7 @@ static int mdt_cdt_waiting_cb(const struct lu_env *env,
                break;
        case HSMA_RESTORE:
                hsd->hsd_one_restore = true;
-               /* Intentional fallthrough */
+               fallthrough;
        default:
                cdt_agent_record_hash_add(cdt, hai->hai_cookie,
                                          llh->lgh_hdr->llh_cat_idx,
@@ -488,6 +488,9 @@ static void mdt_hsm_cdt_cleanup(struct mdt_device *mdt)
        mutex_lock(&cdt->cdt_restore_lock);
        list_for_each_entry_safe(crh, tmp3, &cdt->cdt_restore_handle_list,
                                 crh_list) {
+               /* not locked yet, cleanup by cdt_restore_handle_add() */
+               if (crh->crh_lh.mlh_type == MDT_NUL_LOCK)
+                       continue;
                list_del(&crh->crh_list);
                /* give back layout lock */
                mdt_object_unlock(cdt_mti, NULL, &crh->crh_lh, 1);
@@ -549,7 +552,31 @@ static int set_cdt_state(struct coordinator *cdt, enum cdt_states new_state)
        return rc;
 }
 
+static int mdt_hsm_pending_restore(struct mdt_thread_info *mti);
+
+static void cdt_start_pending_restore(struct mdt_device *mdt,
+                                     struct coordinator *cdt)
+{
+       struct mdt_thread_info *cdt_mti;
+       unsigned int i = 0;
+       int rc;
 
+       /* wait until MDD initialize hsm actions llog */
+       while (!test_bit(MDT_FL_CFGLOG, &mdt->mdt_state) && i < obd_timeout) {
+               schedule_timeout_interruptible(cfs_time_seconds(1));
+               i++;
+       }
+       if (!test_bit(MDT_FL_CFGLOG, &mdt->mdt_state))
+               CWARN("%s: trying to init HSM before MDD\n", mdt_obd_name(mdt));
+
+       /* set up list of started restore requests */
+       cdt_mti = lu_context_key_get(&cdt->cdt_env.le_ctx, &mdt_thread_key);
+       rc = mdt_hsm_pending_restore(cdt_mti);
+       if (rc)
+               CERROR("%s: cannot take the layout locks needed for registered restore: %d\n",
+                      mdt_obd_name(mdt), rc);
+
+}
 
 /**
  * coordinator thread
@@ -579,6 +606,7 @@ static int mdt_coordinator(void *data)
 
        /* Inform mdt_hsm_cdt_start(). */
        wake_up(&cdt->cdt_waitq);
+       cdt_start_pending_restore(mdt, cdt);
 
        while (1) {
                int i;
@@ -776,10 +804,21 @@ clean_cb_alloc:
        RETURN(rc);
 }
 
+/**
+ * register a new HSM restore handle for a file and take EX lock on the layout
+ * \param mti [IN] thread info
+ * \param cdt [IN] coordinator
+ * \param fid [IN] fid of the file to restore
+ * \param he  [IN] HSM extent
+ * \retval 0 success
+ * \retval 1 restore handle already exists for the fid
+ * \retval -ve failure
+ */
 int cdt_restore_handle_add(struct mdt_thread_info *mti, struct coordinator *cdt,
                           const struct lu_fid *fid,
                           const struct hsm_extent *he)
 {
+       struct mdt_lock_handle lh = { 0 };
        struct cdt_restore_handle *crh;
        struct mdt_object *obj;
        int rc;
@@ -796,31 +835,48 @@ int cdt_restore_handle_add(struct mdt_thread_info *mti, struct coordinator *cdt,
         */
        crh->crh_extent.start = 0;
        crh->crh_extent.end = he->length;
+       crh->crh_lh.mlh_type = MDT_NUL_LOCK;
+
+       mutex_lock(&cdt->cdt_restore_lock);
+       if (cdt_restore_handle_find(cdt, fid) != NULL)
+               GOTO(out_crl, rc = 1);
+
+       if (unlikely(cdt->cdt_state == CDT_STOPPED ||
+                    cdt->cdt_state == CDT_STOPPING))
+               GOTO(out_crl, rc = -EAGAIN);
+
+       list_add_tail(&crh->crh_list, &cdt->cdt_restore_handle_list);
+       mutex_unlock(&cdt->cdt_restore_lock);
+
        /* get the layout lock */
-       mdt_lock_reg_init(&crh->crh_lh, LCK_EX);
-       obj = mdt_object_find_lock(mti, &crh->crh_fid, &crh->crh_lh,
+       mdt_lock_reg_init(&lh, LCK_EX);
+       obj = mdt_object_find_lock(mti, &crh->crh_fid, &lh,
                                   MDS_INODELOCK_LAYOUT);
-       if (IS_ERR(obj))
-               GOTO(out_crh, rc = PTR_ERR(obj));
+       if (IS_ERR(obj)) {
+               mutex_lock(&cdt->cdt_restore_lock);
+               GOTO(out_ldel, rc = PTR_ERR(obj));
+       }
 
        /* We do not keep a reference on the object during the restore
-        * which can be very long. */
+        * which can be very long.
+        */
        mdt_object_put(mti->mti_env, obj);
 
        mutex_lock(&cdt->cdt_restore_lock);
        if (unlikely(cdt->cdt_state == CDT_STOPPED ||
-                    cdt->cdt_state == CDT_STOPPING)) {
-               mutex_unlock(&cdt->cdt_restore_lock);
+                    cdt->cdt_state == CDT_STOPPING))
                GOTO(out_lh, rc = -EAGAIN);
-       }
 
-       list_add_tail(&crh->crh_list, &cdt->cdt_restore_handle_list);
+       crh->crh_lh = lh;
        mutex_unlock(&cdt->cdt_restore_lock);
 
        RETURN(0);
 out_lh:
        mdt_object_unlock(mti, NULL, &crh->crh_lh, 1);
-out_crh:
+out_ldel:
+       list_del(&crh->crh_list);
+out_crl:
+       mutex_unlock(&cdt->cdt_restore_lock);
        OBD_SLAB_FREE_PTR(crh, mdt_hsm_cdt_kmem);
 
        return rc;
@@ -928,6 +984,8 @@ static int hsm_restore_cb(const struct lu_env *env,
        }
 
        rc = cdt_restore_handle_add(mti, cdt, &hai->hai_fid, &hai->hai_extent);
+       if (rc == 1)
+               rc = 0;
 out:
        RETURN(rc);
 }
@@ -1103,7 +1161,6 @@ static int mdt_hsm_cdt_start(struct mdt_device *mdt)
 {
        struct coordinator *cdt = &mdt->mdt_coordinator;
        struct mdt_thread_info *cdt_mti;
-       unsigned int i = 0;
        int rc;
        void *ptr;
        struct task_struct *task;
@@ -1135,28 +1192,13 @@ static int mdt_hsm_cdt_start(struct mdt_device *mdt)
        cdt->cdt_group_request_mask = (1UL << HSMA_RESTORE);
        cdt->cdt_other_request_mask = (1UL << HSMA_RESTORE);
 
-       /* wait until MDD initialize hsm actions llog */
-       while (!test_bit(MDT_FL_CFGLOG, &mdt->mdt_state) && i < obd_timeout) {
-               schedule_timeout_interruptible(cfs_time_seconds(1));
-               i++;
-       }
-       if (!test_bit(MDT_FL_CFGLOG, &mdt->mdt_state))
-               CWARN("%s: trying to init HSM before MDD\n", mdt_obd_name(mdt));
-
        /* to avoid deadlock when start is made through sysfs
         * sysfs entries are created by the coordinator thread
         */
-       /* set up list of started restore requests */
-       cdt_mti = lu_context_key_get(&cdt->cdt_env.le_ctx, &mdt_thread_key);
-       rc = mdt_hsm_pending_restore(cdt_mti);
-       if (rc)
-               CERROR("%s: cannot take the layout locks needed"
-                      " for registered restore: %d\n",
-                      mdt_obd_name(mdt), rc);
-
        if (mdt->mdt_bottom->dd_rdonly)
                RETURN(0);
 
+       cdt_mti = lu_context_key_get(&cdt->cdt_env.le_ctx, &mdt_thread_key);
        task = kthread_run(mdt_coordinator, cdt_mti, "hsm_cdtr");
        if (IS_ERR(task)) {
                rc = PTR_ERR(task);
@@ -2286,9 +2328,10 @@ ssize_t hsm_control_store(struct kobject *kobj, struct attribute *attr,
                           strlen(CDT_DISABLE_CMD)) == 0) {
                if ((cdt->cdt_state == CDT_STOPPING) ||
                    (cdt->cdt_state == CDT_STOPPED)) {
-                       CERROR("%s: Coordinator is stopped\n",
-                              mdt_obd_name(mdt));
-                       rc = -EINVAL;
+                       /* exit gracefully if coordinator is being stopped
+                        * or stopped already.
+                        */
+                       rc = 0;
                } else {
                        rc = set_cdt_state(cdt, CDT_DISABLE);
                }