Whamcloud - gitweb
LU-2361 quota: allow upgraded fs to start w/o spc accounting
[fs/lustre-release.git] / lustre / quota / qsd_lib.c
index 277791b..dc4fdae 100644 (file)
@@ -21,7 +21,7 @@
  * GPL HEADER END
  */
 /*
- * Copyright (c) 2012 Intel, Inc.
+ * Copyright (c) 2012, Intel Corporation.
  * Use is subject to license terms.
  *
  * Author: Johann Lombardi <johann.lombardi@intel.com>
@@ -104,16 +104,26 @@ static int lprocfs_qsd_rd_state(char *page, char **start, off_t off,
                      qsd->qsd_is_md ? "md" : "dt", enabled,
                      qsd->qsd_exp_valid ? "setup" : "not setup yet");
 
-       if (qsd->qsd_prepared)
+       if (qsd->qsd_prepared) {
+               memset(enabled, 0, sizeof(enabled));
+               if (qsd->qsd_type_array[USRQUOTA]->qqi_acct_obj != NULL)
+                       strcat(enabled, "u");
+               if (qsd->qsd_type_array[GRPQUOTA]->qqi_acct_obj != NULL)
+                       strcat(enabled, "g");
+               if (strlen(enabled) == 0)
+                       strcat(enabled, "none");
                rc +=  snprintf(page + rc, count - rc,
+                               "space acct:     %s\n"
                                "user uptodate:  glb[%d],slv[%d],reint[%d]\n"
                                "group uptodate: glb[%d],slv[%d],reint[%d]\n",
+                               enabled,
                                qsd->qsd_type_array[USRQUOTA]->qqi_glb_uptodate,
                                qsd->qsd_type_array[USRQUOTA]->qqi_slv_uptodate,
                                qsd->qsd_type_array[USRQUOTA]->qqi_reint,
                                qsd->qsd_type_array[GRPQUOTA]->qqi_glb_uptodate,
                                qsd->qsd_type_array[GRPQUOTA]->qqi_slv_uptodate,
                                qsd->qsd_type_array[GRPQUOTA]->qqi_reint);
+       }
        return rc;
 }
 
@@ -122,6 +132,7 @@ static int lprocfs_qsd_rd_enabled(char *page, char **start, off_t off,
 {
        struct qsd_instance     *qsd = (struct qsd_instance *)data;
        char                     enabled[5];
+
        LASSERT(qsd != NULL);
 
        memset(enabled, 0, sizeof(enabled));
@@ -135,9 +146,75 @@ static int lprocfs_qsd_rd_enabled(char *page, char **start, off_t off,
        return snprintf(page, count, "%s\n", enabled);
 }
 
+/* force reintegration procedure to be executed.
+ * Used for test/debugging purpose */
+static int lprocfs_qsd_wr_force_reint(struct file *file, const char *buffer,
+                                     unsigned long count, void *data)
+{
+       struct qsd_instance     *qsd = (struct qsd_instance *)data;
+       int                      rc = 0, qtype;
+
+       LASSERT(qsd != NULL);
+
+       write_lock(&qsd->qsd_lock);
+       if (qsd->qsd_stopping) {
+               /* don't mess up with shutdown procedure, it is already
+                * complicated enough */
+               rc = -ESHUTDOWN;
+       } else if (!qsd->qsd_prepared) {
+               rc = -EAGAIN;
+       } else {
+               /* mark all indexes as stale */
+               for (qtype = USRQUOTA; qtype < MAXQUOTAS; qtype++) {
+                       qsd->qsd_type_array[qtype]->qqi_glb_uptodate = false;
+                       qsd->qsd_type_array[qtype]->qqi_slv_uptodate = false;
+               }
+       }
+       write_unlock(&qsd->qsd_lock);
+
+       if (rc)
+               return rc;
+
+       /* kick off reintegration */
+       for (qtype = USRQUOTA; qtype < MAXQUOTAS; qtype++) {
+               rc = qsd_start_reint_thread(qsd->qsd_type_array[qtype]);
+               if (rc)
+                       break;
+       }
+       return rc == 0 ? count : rc;
+}
+
+static int lprocfs_qsd_rd_timeout(char *page, char **start, off_t off,
+                                 int count, int *eof, void *data)
+{
+       struct qsd_instance     *qsd = (struct qsd_instance *)data;
+       LASSERT(qsd != NULL);
+
+       return snprintf(page, count, "%d\n", qsd_wait_timeout(qsd));
+}
+
+static int lprocfs_qsd_wr_timeout(struct file *file, const char *buffer,
+                                 unsigned long count, void *data)
+{
+       struct qsd_instance     *qsd = (struct qsd_instance *)data;
+       int                      timeout, rc;
+       LASSERT(qsd != NULL);
+
+       rc = lprocfs_write_helper(buffer, count, &timeout);
+       if (rc)
+               return rc;
+       if (timeout < 0)
+               return -EINVAL;
+
+       qsd->qsd_timeout = timeout;
+       return count;
+}
+
 static struct lprocfs_vars lprocfs_quota_qsd_vars[] = {
        { "info", lprocfs_qsd_rd_state, 0, 0},
        { "enabled", lprocfs_qsd_rd_enabled, 0, 0},
+       { "force_reint", 0, lprocfs_qsd_wr_force_reint, 0},
+       { "timeout", lprocfs_qsd_rd_timeout, lprocfs_qsd_wr_timeout, 0},
        { NULL }
 };
 
@@ -162,10 +239,10 @@ static int qsd_conn_callback(void *data)
        ldlm_namespace_get(class_exp2obd(qsd->qsd_exp)->obd_namespace);
        qsd->qsd_ns = class_exp2obd(qsd->qsd_exp)->obd_namespace;
 
-       cfs_write_lock(&qsd->qsd_lock);
+       write_lock(&qsd->qsd_lock);
        /* notify that qsd_exp is now valid */
        qsd->qsd_exp_valid = true;
-       cfs_write_unlock(&qsd->qsd_lock);
+       write_unlock(&qsd->qsd_lock);
 
        /* Now that the connection to master is setup, we can initiate the
         * reintegration procedure for quota types which are enabled.
@@ -290,12 +367,14 @@ static int qsd_qtype_init(const struct lu_env *env, struct qsd_instance *qsd,
        /* open accounting object */
        LASSERT(qqi->qqi_acct_obj == NULL);
        qqi->qqi_acct_obj = acct_obj_lookup(env, qsd->qsd_dev, qtype);
-       if (qqi->qqi_acct_obj == NULL) {
-               LCONSOLE_ERROR("%s: No %s space accounting support. Please use "
-                              "tunefs.lustre --quota option to enable quota "
-                              "accounting.\n",
-                              qsd->qsd_svname, QTYPE_NAME(qtype));
-               GOTO(out, rc = -ENOENT);
+       if (IS_ERR(qqi->qqi_acct_obj)) {
+               LCONSOLE_WARN("%s: No %s space accounting support. Please "
+                             "consider running tunefs.lustre --quota on an "
+                             "unmounted filesystem to enable quota accounting."
+                             "\n", qsd->qsd_svname,
+                             QTYPE_NAME(qtype));
+               qqi->qqi_acct_obj = NULL;
+               qsd->qsd_acct_failed = true;
        }
 
        /* open global index copy */
@@ -334,7 +413,7 @@ static int qsd_qtype_init(const struct lu_env *env, struct qsd_instance *qsd,
                GOTO(out, rc = PTR_ERR(qqi->qqi_site));
        }
 
-       /* register proc entry for accounting object */
+       /* register proc entry for accounting & global index copy objects */
        rc = lprocfs_seq_create(qsd->qsd_proc,
                                qtype == USRQUOTA ? "acct_user" : "acct_group",
                                0444, &lprocfs_quota_seq_fops,
@@ -344,6 +423,16 @@ static int qsd_qtype_init(const struct lu_env *env, struct qsd_instance *qsd,
                       qsd->qsd_svname, rc);
                GOTO(out, rc);
        }
+
+       rc = lprocfs_seq_create(qsd->qsd_proc,
+                               qtype == USRQUOTA ? "limit_user" : "limit_group",
+                               0444, &lprocfs_quota_seq_fops,
+                               qqi->qqi_glb_obj);
+       if (rc) {
+               CERROR("%s: can't add procfs entry for global index copy %d\n",
+                      qsd->qsd_svname, rc);
+               GOTO(out, rc);
+       }
        EXIT;
 out:
        if (rc)
@@ -365,17 +454,20 @@ void qsd_fini(const struct lu_env *env, struct qsd_instance *qsd)
        int     qtype;
        ENTRY;
 
+       if (unlikely(qsd == NULL))
+               RETURN_EXIT;
+
        CDEBUG(D_QUOTA, "%s: initiating QSD shutdown\n", qsd->qsd_svname);
-       cfs_write_lock(&qsd->qsd_lock);
+       write_lock(&qsd->qsd_lock);
        qsd->qsd_stopping = true;
-       cfs_write_unlock(&qsd->qsd_lock);
+       write_unlock(&qsd->qsd_lock);
 
        /* remove from the list of fsinfo */
        if (!cfs_list_empty(&qsd->qsd_link)) {
                LASSERT(qsd->qsd_fsinfo != NULL);
-               cfs_down(&qsd->qsd_fsinfo->qfs_sem);
+               down(&qsd->qsd_fsinfo->qfs_sem);
                cfs_list_del_init(&qsd->qsd_link);
-               cfs_up(&qsd->qsd_fsinfo->qfs_sem);
+               up(&qsd->qsd_fsinfo->qfs_sem);
        }
 
        /* remove qsd proc entry */
@@ -464,12 +556,12 @@ struct qsd_instance *qsd_init(const struct lu_env *env, char *svname,
                RETURN(ERR_PTR(-ENOMEM));
 
        /* generic initializations */
-       cfs_rwlock_init(&qsd->qsd_lock);
+       rwlock_init(&qsd->qsd_lock);
        CFS_INIT_LIST_HEAD(&qsd->qsd_link);
        thread_set_flags(&qsd->qsd_upd_thread, SVC_STOPPED);
        cfs_waitq_init(&qsd->qsd_upd_thread.t_ctl_waitq);
        CFS_INIT_LIST_HEAD(&qsd->qsd_upd_list);
-       cfs_spin_lock_init(&qsd->qsd_adjust_lock);
+       spin_lock_init(&qsd->qsd_adjust_lock);
        CFS_INIT_LIST_HEAD(&qsd->qsd_adjust_list);
        qsd->qsd_prepared = false;
        qsd->qsd_started = false;
@@ -502,9 +594,9 @@ struct qsd_instance *qsd_init(const struct lu_env *env, char *svname,
        }
 
        /* add in the list of lquota_fsinfo */
-       cfs_down(&qsd->qsd_fsinfo->qfs_sem);
+       down(&qsd->qsd_fsinfo->qfs_sem);
        list_add_tail(&qsd->qsd_link, &qsd->qsd_fsinfo->qfs_qsd_list);
-       cfs_up(&qsd->qsd_fsinfo->qfs_sem);
+       up(&qsd->qsd_fsinfo->qfs_sem);
 
        /* register procfs directory */
        qsd->qsd_proc = lprocfs_register(QSD_DIR, osd_proc,
@@ -547,14 +639,15 @@ int qsd_prepare(const struct lu_env *env, struct qsd_instance *qsd)
        int                      qtype, rc = 0;
        ENTRY;
 
-       LASSERT(qsd != NULL);
+       if (unlikely(qsd == NULL))
+               RETURN(0);
 
-       cfs_read_lock(&qsd->qsd_lock);
+       read_lock(&qsd->qsd_lock);
        if (qsd->qsd_prepared) {
                CERROR("%s: qsd instance already prepared\n", qsd->qsd_svname);
                rc = -EALREADY;
        }
-       cfs_read_unlock(&qsd->qsd_lock);
+       read_unlock(&qsd->qsd_lock);
        if (rc)
                RETURN(rc);
 
@@ -586,14 +679,23 @@ int qsd_prepare(const struct lu_env *env, struct qsd_instance *qsd)
        }
 
        /* pools successfully setup, mark the qsd as prepared */
-       cfs_write_lock(&qsd->qsd_lock);
+       write_lock(&qsd->qsd_lock);
        qsd->qsd_prepared = true;
-       cfs_write_unlock(&qsd->qsd_lock);
+       write_unlock(&qsd->qsd_lock);
 
        /* start reintegration thread for each type, if required */
        for (qtype = USRQUOTA; qtype < MAXQUOTAS; qtype++) {
                struct qsd_qtype_info   *qqi = qsd->qsd_type_array[qtype];
 
+               if (qsd_type_enabled(qsd, qtype) && qsd->qsd_acct_failed) {
+                       LCONSOLE_ERROR("%s: can't enable quota enforcement "
+                                      "since space accounting isn't functional"
+                                      ". Please run tunefs.lustre --quota on "
+                                      "an unmounted filesystem if not done "
+                                      "already\n", qsd->qsd_svname);
+                       break;
+               }
+
                rc = qsd_start_reint_thread(qqi);
                if (rc) {
                        CERROR("%s: failed to start reint thread for type %s "
@@ -647,9 +749,12 @@ int qsd_start(const struct lu_env *env, struct qsd_instance *qsd)
        int     type, rc = 0;
        ENTRY;
 
-       cfs_write_lock(&qsd->qsd_lock);
+       if (unlikely(qsd == NULL))
+               RETURN(0);
+
+       write_lock(&qsd->qsd_lock);
        if (!qsd->qsd_prepared) {
-               CERROR("%s: can't start qsd instance since it was properly "
+               CERROR("%s: can't start qsd instance since it wasn't properly "
                       "initialized\n", qsd->qsd_svname);
                rc = -EFAULT;
        } else if (qsd->qsd_started) {
@@ -659,7 +764,7 @@ int qsd_start(const struct lu_env *env, struct qsd_instance *qsd)
                /* notify that the qsd_instance is now started */
                qsd->qsd_started = true;
        }
-       cfs_write_unlock(&qsd->qsd_lock);
+       write_unlock(&qsd->qsd_lock);
 
        if (rc)
                RETURN(rc);