Whamcloud - gitweb
LU-7988 hsm: run HSM coordinator once per second at most
[fs/lustre-release.git] / lustre / mdt / mdt_coordinator.c
index 3e43a16..bf04033 100644 (file)
@@ -144,6 +144,10 @@ struct hsm_scan_data {
        struct hsm_scan_request         *request;
 };
 
+struct hsm_thread_data {
+       struct mdt_thread_info  *cdt_mti;
+       struct hsm_scan_request *request;
+};
 /**
  *  llog_cat_process() callback, used to:
  *  - find waiting request and start action
@@ -457,10 +461,10 @@ static void mdt_hsm_cdt_cleanup(struct mdt_device *mdt)
 static bool cdt_transition[CDT_STATES_COUNT][CDT_STATES_COUNT] = {
        /* from -> to:    stopped init   running disable stopping */
        /* stopped */   { true,   true,  false,  false,  false },
-       /* init */      { true,   false, true,   false,  true },
+       /* init */      { true,   false, true,   false,  false },
        /* running */   { false,  false, true,   true,   true },
        /* disable */   { false,  false, true,   true,   true },
-       /* stopping */  { true,   false, false,  false,  true }
+       /* stopping */  { true,   false, false,  false,  false }
 };
 
 /**
@@ -505,26 +509,24 @@ static int set_cdt_state(struct coordinator *cdt, enum cdt_states new_state,
  */
 static int mdt_coordinator(void *data)
 {
-       struct mdt_thread_info  *mti = data;
+       struct hsm_thread_data  *thread_data = data;
+       struct mdt_thread_info  *mti = thread_data->cdt_mti;
        struct mdt_device       *mdt = mti->mti_mdt;
        struct coordinator      *cdt = &mdt->mdt_coordinator;
        struct hsm_scan_data     hsd = { NULL };
+       time64_t                 wait_event_time = 1 * HZ;
+       time64_t                 last_housekeeping = 0;
        int                      rc = 0;
        int                      request_sz;
        ENTRY;
 
-       CDEBUG(D_HSM, "%s: coordinator thread starting, pid=%d\n",
-              mdt_obd_name(mdt), current_pid());
-
-       /* we use a copy of cdt_max_requests in the cb, so if cdt_max_requests
-        * increases due to a change from /proc we do not overflow the
-        * hsd.request[] vector
-        */
+       /* set up hsd->request and max_requests */
        hsd.max_requests = cdt->cdt_max_requests;
        request_sz = hsd.max_requests * sizeof(*hsd.request);
-       OBD_ALLOC_LARGE(hsd.request, request_sz);
-       if (!hsd.request)
-               GOTO(out, rc = -ENOMEM);
+       hsd.request = thread_data->request;
+
+       CDEBUG(D_HSM, "%s: coordinator thread starting, pid=%d\n",
+              mdt_obd_name(mdt), current_pid());
 
        hsd.mti = mti;
        obd_uuid2fsname(hsd.fs_name, mdt_obd_name(mdt), MTI_NAME_MAXLEN);
@@ -535,43 +537,60 @@ static int mdt_coordinator(void *data)
        wake_up_all(&cdt->cdt_waitq);
 
        while (1) {
-               struct l_wait_info lwi;
                int i;
 
-               lwi = LWI_TIMEOUT(cfs_time_seconds(cdt->cdt_loop_period),
-                                 NULL, NULL);
-               l_wait_event(cdt->cdt_waitq,
-                            cdt->cdt_event || (cdt->cdt_state == CDT_STOPPING),
-                            &lwi);
+               /* Limit execution of the expensive requests traversal
+                * to at most every "wait_event_time" jiffies. This prevents
+                * repeatedly locking/unlocking the catalog for each request
+                * and preventing other HSM operations from happening */
+               wait_event_interruptible_timeout(cdt->cdt_waitq,
+                                                kthread_should_stop(),
+                                                wait_event_time);
 
                CDEBUG(D_HSM, "coordinator resumes\n");
 
-               if (cdt->cdt_state == CDT_STOPPING) {
+               if (kthread_should_stop()) {
+                       CDEBUG(D_HSM, "Coordinator stops\n");
                        rc = 0;
                        break;
                }
 
-               cdt->cdt_event = false;
-
                /* if coordinator is suspended continue to wait */
                if (cdt->cdt_state == CDT_DISABLE) {
                        CDEBUG(D_HSM, "disable state, coordinator sleeps\n");
                        continue;
                }
 
+               /* If no event, and no housekeeping to do, continue to
+                * wait. */
+               if (last_housekeeping + cdt->cdt_loop_period <= get_seconds())
+                       last_housekeeping = get_seconds();
+               else if (!cdt->cdt_event)
+                       continue;
+
+               cdt->cdt_event = false;
+
                CDEBUG(D_HSM, "coordinator starts reading llog\n");
 
                if (hsd.max_requests != cdt->cdt_max_requests) {
                        /* cdt_max_requests has changed,
                         * we need to allocate a new buffer
                         */
-                       OBD_FREE_LARGE(hsd.request, request_sz);
-                       hsd.max_requests = cdt->cdt_max_requests;
-                       request_sz = hsd.max_requests * sizeof(*hsd.request);
-                       OBD_ALLOC_LARGE(hsd.request, request_sz);
-                       if (!hsd.request) {
-                               rc = -ENOMEM;
-                               break;
+                       struct hsm_scan_request *tmp = NULL;
+                       int max_requests = cdt->cdt_max_requests;
+                       OBD_ALLOC_LARGE(tmp, max_requests *
+                                       sizeof(struct hsm_scan_request));
+                       if (!tmp) {
+                               CERROR("Failed to resize request buffer, "
+                                      "keeping it at %d\n",
+                                      hsd.max_requests);
+                               cdt->cdt_max_requests = hsd.max_requests;
+                       } else {
+                               OBD_FREE_LARGE(hsd.request, request_sz);
+                               hsd.max_requests = max_requests;
+                               request_sz = hsd.max_requests *
+                                       sizeof(struct hsm_scan_request);
+                               hsd.request = tmp;
                        }
                }
 
@@ -645,18 +664,12 @@ clean_cb_alloc:
                        OBD_FREE(request->hal, request->hal_sz);
                }
        }
-       EXIT;
-out:
-       set_cdt_state(cdt, CDT_STOPPING, NULL);
 
        if (hsd.request)
                OBD_FREE_LARGE(hsd.request, request_sz);
 
        mdt_hsm_cdt_cleanup(mdt);
 
-       set_cdt_state(cdt, CDT_STOPPED, NULL);
-       wake_up_all(&cdt->cdt_waitq);
-
        if (rc != 0)
                CERROR("%s: coordinator thread exiting, process=%d, rc=%d\n",
                       mdt_obd_name(mdt), current_pid(), rc);
@@ -665,7 +678,7 @@ out:
                              " no error\n",
                       mdt_obd_name(mdt), current_pid());
 
-       return rc;
+       RETURN(rc);
 }
 
 /**
@@ -793,7 +806,7 @@ static int mdt_hsm_pending_restore(struct mdt_thread_info *mti)
        hrd.hrd_mti = mti;
 
        rc = cdt_llog_process(mti->mti_env, mti->mti_mdt, hsm_restore_cb, &hrd,
-                             0, 0, READ);
+                             0, 0, WRITE);
 
        RETURN(rc);
 }
@@ -822,27 +835,6 @@ static int hsm_init_ucred(struct lu_ucred *uc)
 }
 
 /**
- * wake up coordinator thread
- * \param mdt [IN] device
- * \retval 0 success
- * \retval -ve failure
- */
-int mdt_hsm_cdt_wakeup(struct mdt_device *mdt)
-{
-       struct coordinator      *cdt = &mdt->mdt_coordinator;
-       ENTRY;
-
-       if (cdt->cdt_state == CDT_STOPPED)
-               RETURN(-ESRCH);
-
-       /* wake up coordinator */
-       cdt->cdt_event = true;
-       wake_up_all(&cdt->cdt_waitq);
-
-       RETURN(0);
-}
-
-/**
  * initialize coordinator struct
  * \param mdt [IN] device
  * \retval 0 success
@@ -861,7 +853,7 @@ int mdt_hsm_cdt_init(struct mdt_device *mdt)
        init_rwsem(&cdt->cdt_request_lock);
        mutex_init(&cdt->cdt_restore_lock);
        spin_lock_init(&cdt->cdt_state_lock);
-       cdt->cdt_state = CDT_STOPPED;
+       set_cdt_state(cdt, CDT_STOPPED, NULL);
 
        INIT_LIST_HEAD(&cdt->cdt_request_list);
        INIT_LIST_HEAD(&cdt->cdt_agents);
@@ -920,6 +912,13 @@ int mdt_hsm_cdt_init(struct mdt_device *mdt)
        cdt->cdt_policy = CDT_DEFAULT_POLICY;
        cdt->cdt_active_req_timeout = 3600;
 
+       /* Initialize cdt_compound_id here to allow its usage for
+        * delayed requests from RAoLU policy */
+       atomic_set(&cdt->cdt_compound_id, cfs_time_current_sec());
+
+       /* by default do not remove archives on last unlink */
+       cdt->cdt_remove_archive_on_last_unlink = false;
+
        RETURN(0);
 
 out_env:
@@ -968,8 +967,9 @@ static int mdt_hsm_cdt_start(struct mdt_device *mdt)
        struct coordinator      *cdt = &mdt->mdt_coordinator;
        int                      rc;
        void                    *ptr;
-       struct mdt_thread_info  *cdt_mti;
        struct task_struct      *task;
+       int                      request_sz;
+       struct hsm_thread_data   thread_data;
        ENTRY;
 
        /* functions defined but not yet used
@@ -987,7 +987,6 @@ static int mdt_hsm_cdt_start(struct mdt_device *mdt)
        CLASSERT(1 << (CDT_POLICY_SHIFT_COUNT - 1) == CDT_POLICY_LAST);
        cdt->cdt_policy = CDT_DEFAULT_POLICY;
 
-       atomic_set(&cdt->cdt_compound_id, cfs_time_current_sec());
        /* just need to be larger than previous one */
        /* cdt_last_cookie is protected by cdt_llog_lock */
        cdt->cdt_last_cookie = cfs_time_current_sec();
@@ -1000,8 +999,9 @@ static int mdt_hsm_cdt_start(struct mdt_device *mdt)
         * /proc entries are created by the coordinator thread */
 
        /* set up list of started restore requests */
-       cdt_mti = lu_context_key_get(&cdt->cdt_env.le_ctx, &mdt_thread_key);
-       rc = mdt_hsm_pending_restore(cdt_mti);
+       thread_data.cdt_mti =
+               lu_context_key_get(&cdt->cdt_env.le_ctx, &mdt_thread_key);
+       rc = mdt_hsm_pending_restore(thread_data.cdt_mti);
        if (rc)
                CERROR("%s: cannot take the layout locks needed"
                       " for registered restore: %d\n",
@@ -1010,25 +1010,28 @@ static int mdt_hsm_cdt_start(struct mdt_device *mdt)
        if (mdt->mdt_bottom->dd_rdonly)
                RETURN(0);
 
-       task = kthread_run(mdt_coordinator, cdt_mti, "hsm_cdtr");
+       /* Allocate the initial hsd.request[] vector*/
+       request_sz = cdt->cdt_max_requests * sizeof(struct hsm_scan_request);
+       OBD_ALLOC_LARGE(thread_data.request, request_sz);
+       if (!thread_data.request) {
+               set_cdt_state(cdt, CDT_STOPPED, NULL);
+               RETURN(-ENOMEM);
+       }
+
+       task = kthread_run(mdt_coordinator, &thread_data, "hsm_cdtr");
        if (IS_ERR(task)) {
                rc = PTR_ERR(task);
                set_cdt_state(cdt, CDT_STOPPED, NULL);
+               OBD_FREE(thread_data.request, request_sz);
                CERROR("%s: error starting coordinator thread: %d\n",
                       mdt_obd_name(mdt), rc);
        } else {
+               cdt->cdt_task = task;
                wait_event(cdt->cdt_waitq,
                           cdt->cdt_state != CDT_INIT);
-               if (cdt->cdt_state == CDT_RUNNING) {
-                       CDEBUG(D_HSM, "%s: coordinator thread started\n",
-                              mdt_obd_name(mdt));
-                       rc = 0;
-               } else {
-                       CDEBUG(D_HSM,
-                              "%s: coordinator thread failed to start\n",
-                              mdt_obd_name(mdt));
-                       rc = -EINVAL;
-               }
+               CDEBUG(D_HSM, "%s: coordinator thread started\n",
+                      mdt_obd_name(mdt));
+               rc = 0;
        }
 
        RETURN(rc);
@@ -1046,11 +1049,11 @@ int mdt_hsm_cdt_stop(struct mdt_device *mdt)
        ENTRY;
        /* stop coordinator thread */
        rc = set_cdt_state(cdt, CDT_STOPPING, NULL);
-       if (rc != 0)
-               RETURN(rc);
-
-       wake_up_all(&cdt->cdt_waitq);
-       wait_event(cdt->cdt_waitq, cdt->cdt_state != CDT_STOPPING);
+       if (rc == 0) {
+               kthread_stop(cdt->cdt_task);
+               cdt->cdt_task = NULL;
+               set_cdt_state(cdt, CDT_STOPPED, NULL);
+       }
 
        RETURN(rc);
 }
@@ -1552,9 +1555,9 @@ int mdt_hsm_update_request_state(struct mdt_thread_info *mti,
                /* then remove request from memory list (LU-9075) */
                mdt_cdt_remove_request(cdt, pgs->hpk_cookie);
 
-               /* ct has completed a request, so a slot is available, wakeup
-                * cdt to find new work */
-               mdt_hsm_cdt_wakeup(mdt);
+               /* ct has completed a request, so a slot is available,
+                * signal the coordinator to find new work */
+               mdt_hsm_cdt_event(cdt);
        } else {
                /* if copytool send a progress on a canceled request
                 * we inform copytool it should stop
@@ -2026,7 +2029,8 @@ mdt_hsm_cdt_control_seq_write(struct file *file, const char __user *buffer,
        if (strcmp(kernbuf, CDT_ENABLE_CMD) == 0) {
                if (cdt->cdt_state == CDT_DISABLE) {
                        rc = set_cdt_state(cdt, CDT_RUNNING, NULL);
-                       mdt_hsm_cdt_wakeup(mdt);
+                       mdt_hsm_cdt_event(cdt);
+                       wake_up(&cdt->cdt_waitq);
                } else {
                        rc = mdt_hsm_cdt_start(mdt);
                }
@@ -2038,7 +2042,6 @@ mdt_hsm_cdt_control_seq_write(struct file *file, const char __user *buffer,
                        rc = -EALREADY;
                } else {
                        rc = mdt_hsm_cdt_stop(mdt);
-                       mdt_hsm_cdt_wakeup(mdt);
                }
        } else if (strcmp(kernbuf, CDT_DISABLE_CMD) == 0) {
                if ((cdt->cdt_state == CDT_STOPPING) ||
@@ -2228,6 +2231,36 @@ mdt_hsm_other_request_mask_seq_write(struct file *file, const char __user *buf,
                                           &cdt->cdt_other_request_mask);
 }
 
+static int mdt_hsm_cdt_raolu_seq_show(struct seq_file *m, void *data)
+{
+       struct mdt_device *mdt = m->private;
+       struct coordinator *cdt = &mdt->mdt_coordinator;
+       ENTRY;
+
+       seq_printf(m, "%d\n", (int)cdt->cdt_remove_archive_on_last_unlink);
+       RETURN(0);
+}
+
+static ssize_t
+mdt_hsm_cdt_raolu_seq_write(struct file *file, const char __user *buffer,
+                           size_t count, loff_t *off)
+
+{
+       struct seq_file *m = file->private_data;
+       struct mdt_device *mdt = m->private;
+       struct coordinator *cdt = &mdt->mdt_coordinator;
+       __s64 val;
+       int rc;
+       ENTRY;
+
+       rc = lprocfs_str_to_s64(buffer, count, &val);
+       if (rc < 0)
+               RETURN(rc);
+
+       cdt->cdt_remove_archive_on_last_unlink = val;
+       RETURN(count);
+}
+
 LPROC_SEQ_FOPS(mdt_hsm_cdt_loop_period);
 LPROC_SEQ_FOPS(mdt_hsm_cdt_grace_delay);
 LPROC_SEQ_FOPS(mdt_hsm_cdt_active_req_timeout);
@@ -2236,6 +2269,7 @@ LPROC_SEQ_FOPS(mdt_hsm_cdt_default_archive_id);
 LPROC_SEQ_FOPS(mdt_hsm_user_request_mask);
 LPROC_SEQ_FOPS(mdt_hsm_group_request_mask);
 LPROC_SEQ_FOPS(mdt_hsm_other_request_mask);
+LPROC_SEQ_FOPS(mdt_hsm_cdt_raolu);
 
 static struct lprocfs_vars lprocfs_mdt_hsm_vars[] = {
        { .name =       "agents",
@@ -2263,5 +2297,7 @@ static struct lprocfs_vars lprocfs_mdt_hsm_vars[] = {
          .fops =       &mdt_hsm_group_request_mask_fops,       },
        { .name =       "other_request_mask",
          .fops =       &mdt_hsm_other_request_mask_fops,       },
+       { .name =       "remove_archive_on_last_unlink",
+         .fops =       &mdt_hsm_cdt_raolu_fops,                },
        { 0 }
 };