Whamcloud - gitweb
LU-11775 obdclass: protect imp_sec using rwlock_t
[fs/lustre-release.git] / lustre / quota / qmt_lock.c
index be80dce..cca59f6 100644 (file)
@@ -21,7 +21,7 @@
  * GPL HEADER END
  */
 /*
- * Copyright (c) 2012, 2014, Intel Corporation.
+ * Copyright (c) 2012, 2017, Intel Corporation.
  * Use is subject to license terms.
  *
  * Author: Johann Lombardi <johann.lombardi@intel.com>
@@ -140,7 +140,7 @@ int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld,
        /* on success, pack lvb in reply */
        lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
        lvb_len = ldlm_lvbo_size(*lockp);
-       lvb_len = ldlm_lvbo_fill(*lockp, lvb, lvb_len);
+       lvb_len = ldlm_lvbo_fill(*lockp, lvb, &lvb_len);
        if (lvb_len < 0)
                GOTO(out, rc = lvb_len);
 
@@ -173,14 +173,8 @@ int qmt_lvbo_init(struct lu_device *ld, struct ldlm_resource *res)
            res->lr_name.name[LUSTRE_RES_ID_SEQ_OFF] != FID_SEQ_QUOTA_GLB)
                RETURN(0);
 
-       OBD_ALLOC_PTR(env);
-       if (env == NULL)
-               RETURN(-ENOMEM);
-
-       /* initialize environment */
-       rc = lu_env_init(env, LCT_MD_THREAD);
-       if (rc != 0)
-               GOTO(out_free, rc);
+       env = lu_env_find();
+       LASSERT(env);
        qti = qmt_info(env);
 
        /* extract global index FID and quota identifier */
@@ -216,7 +210,7 @@ int qmt_lvbo_init(struct lu_device *ld, struct ldlm_resource *res)
                if (IS_ERR(obj))
                        GOTO(out, rc = PTR_ERR(obj));
                if (!dt_object_exists(obj)) {
-                       lu_object_put(env, &obj->do_lu);
+                       dt_object_put(env, obj);
                        GOTO(out, rc = -ENOENT);
                }
 
@@ -228,9 +222,6 @@ int qmt_lvbo_init(struct lu_device *ld, struct ldlm_resource *res)
        res->lr_lvb_len = sizeof(struct lquota_lvb);
        EXIT;
 out:
-       lu_env_fini(env);
-out_free:
-       OBD_FREE_PTR(env);
        return rc;
 }
 
@@ -281,18 +272,12 @@ int qmt_lvbo_update(struct lu_device *ld, struct ldlm_resource *res,
                if (lvb->lvb_id_may_rel != 0)
                        /* but might still release later ... */
                        lqe->lqe_may_rel += lvb->lvb_id_may_rel;
-               GOTO(out_lqe, rc = 0);
+               GOTO(out, rc = 0);
        }
 
        /* allocate environement */
-       OBD_ALLOC_PTR(env);
-       if (env == NULL)
-               GOTO(out_lqe, rc = -ENOMEM);
-
-       /* initialize environment */
-       rc = lu_env_init(env, LCT_MD_THREAD);
-       if (rc)
-               GOTO(out_env, rc);
+       env = lu_env_find();
+       LASSERT(env);
        qti = qmt_info(env);
 
        /* The request is a glimpse callback which was sent via the
@@ -306,14 +291,14 @@ int qmt_lvbo_update(struct lu_device *ld, struct ldlm_resource *res,
        if (IS_ERR(lock)) {
                CERROR("%s: failed to get lock from request!\n",
                       qmt->qmt_svname);
-               GOTO(out_env_init, rc = PTR_ERR(lock));
+               GOTO(out, rc = PTR_ERR(lock));
        }
 
        exp = class_export_get(lock->l_export);
        if (exp == NULL) {
                CERROR("%s: failed to get export from lock!\n",
                       qmt->qmt_svname);
-               GOTO(out_env_init, rc = -EFAULT);
+               GOTO(out, rc = -EFAULT);
        }
 
        /* release quota space */
@@ -325,13 +310,9 @@ int qmt_lvbo_update(struct lu_device *ld, struct ldlm_resource *res,
                             lvb->lvb_id_rel, rc);
        class_export_put(exp);
        if (rc)
-               GOTO(out_env_init, rc);
+               GOTO(out, rc);
        EXIT;
-out_env_init:
-       lu_env_fini(env);
-out_env:
-       OBD_FREE_PTR(env);
-out_lqe:
+out:
        lqe_putref(lqe);
        return rc;
 }
@@ -376,26 +357,10 @@ int qmt_lvbo_fill(struct lu_device *ld, struct ldlm_lock *lock, void *lvb,
                lqe_putref(lqe);
        } else {
                /* global quota lock */
-               struct lu_env           *env;
-               int                      rc;
-               struct dt_object        *obj = res->lr_lvb_data;
-
-               OBD_ALLOC_PTR(env);
-               if (env == NULL)
-                       RETURN(-ENOMEM);
-
-               /* initialize environment */
-               rc = lu_env_init(env, LCT_LOCAL);
-               if (rc) {
-                       OBD_FREE_PTR(env);
-                       RETURN(rc);
-               }
+               struct dt_object *obj = res->lr_lvb_data;
 
                /* return current version of global index */
-               qlvb->lvb_glb_ver = dt_version_get(env, obj);
-
-               lu_env_fini(env);
-               OBD_FREE_PTR(env);
+               qlvb->lvb_glb_ver = dt_version_get(lu_env_find(), obj);
        }
 
        RETURN(sizeof(struct lquota_lvb));
@@ -419,36 +384,103 @@ int qmt_lvbo_free(struct lu_device *ld, struct ldlm_resource *res)
                /* release lqe reference */
                lqe_putref(lqe);
        } else {
-               struct dt_object        *obj = res->lr_lvb_data;
-               struct lu_env           *env;
-               int                      rc;
+               struct dt_object *obj = res->lr_lvb_data;
+               /* release object reference */
+               dt_object_put(lu_env_find(), obj);
+       }
 
-               OBD_ALLOC_PTR(env);
-               if (env == NULL)
-                       RETURN(-ENOMEM);
+       res->lr_lvb_data = NULL;
+       res->lr_lvb_len  = 0;
+
+       RETURN(0);
+}
+
+typedef int (*qmt_glimpse_cb_t)(struct ldlm_lock *, void *);
+
+struct qmt_gl_lock_array {
+       unsigned long             q_max;
+       unsigned long             q_cnt;
+       struct ldlm_lock        **q_locks;
+};
+
+static void qmt_free_lock_array(struct qmt_gl_lock_array *array)
+{
+       int i;
+
+       if (array->q_max == 0) {
+               LASSERT(array->q_locks == NULL);
+               return;
+       }
+
+       for (i = 0; i < array->q_cnt; i++) {
+               LASSERT(array->q_locks[i]);
+               LDLM_LOCK_RELEASE(array->q_locks[i]);
+               array->q_locks[i] = NULL;
+       }
+       array->q_cnt = 0;
+       OBD_FREE(array->q_locks, array->q_max * sizeof(*array->q_locks));
+       array->q_locks = NULL;
+       array->q_max = 0;
+}
+
+static int qmt_alloc_lock_array(struct ldlm_resource *res,
+                               struct qmt_gl_lock_array *array,
+                               qmt_glimpse_cb_t cb, void *arg)
+{
+       struct list_head *pos;
+       unsigned long count = 0;
+       int fail_cnt = 0;
+       ENTRY;
+
+       LASSERT(!array->q_max && !array->q_cnt && !array->q_locks);
+again:
+       lock_res(res);
+       /* scan list of granted locks */
+       list_for_each(pos, &res->lr_granted) {
+               struct ldlm_lock *lock;
+               int rc;
+
+               lock = list_entry(pos, struct ldlm_lock, l_res_link);
+               LASSERT(lock->l_export);
 
-               /* initialize environment */
-               rc = lu_env_init(env, LCT_LOCAL);
-               if (rc) {
-                       OBD_FREE_PTR(env);
-                       RETURN(rc);
+               if (cb != NULL) {
+                       rc = cb(lock, arg);
+                       /* slave should not be notified */
+                       if (rc == 0)
+                               continue;
                }
 
-               /* release object reference */
-               lu_object_put(env, &obj->do_lu);
-               lu_env_fini(env);
-               OBD_FREE_PTR(env);
+               count++;
+               if (array->q_max != 0 && array->q_cnt < array->q_max) {
+                       array->q_locks[array->q_cnt] = LDLM_LOCK_GET(lock);
+                       array->q_cnt++;
+               }
        }
+       unlock_res(res);
 
-       res->lr_lvb_data = NULL;
-       res->lr_lvb_len  = 0;
+       if (count > array->q_max) {
+               qmt_free_lock_array(array);
+               if (++fail_cnt > 5)
+                       RETURN(-EAGAIN);
+               /*
+                * allocate more slots in case of more qualified locks are
+                * found during next loop
+                */
+               array->q_max = count + count / 2 + 10;
+               count = 0;
+               LASSERT(array->q_locks == NULL && array->q_cnt == 0);
+               OBD_ALLOC(array->q_locks,
+                         sizeof(*array->q_locks) * array->q_max);
+               if (array->q_locks == NULL) {
+                       array->q_max = 0;
+                       RETURN(-ENOMEM);
+               }
 
+               goto again;
+       }
        RETURN(0);
 }
 
-typedef int (*qmt_glimpse_cb_t)(const struct lu_env *, struct qmt_device *,
-                               struct obd_uuid *, union ldlm_gl_desc *,
-                               void *);
 /*
  * Send glimpse callback to slaves holding a lock on resource \res.
  * This is used to notify slaves of new quota settings or to claim quota space
@@ -468,47 +500,39 @@ static int qmt_glimpse_lock(const struct lu_env *env, struct qmt_device *qmt,
 {
        struct list_head *tmp, *pos;
        struct list_head gl_list = LIST_HEAD_INIT(gl_list);
-       int              rc = 0;
+       struct qmt_gl_lock_array locks;
+       unsigned long i;
+       int rc = 0;
        ENTRY;
 
-       lock_res(res);
-       /* scan list of granted locks */
-       list_for_each(pos, &res->lr_granted) {
-               struct ldlm_glimpse_work        *work;
-               struct ldlm_lock                *lock;
-               struct obd_uuid                 *uuid;
-
-               lock = list_entry(pos, struct ldlm_lock, l_res_link);
-               LASSERT(lock->l_export);
-               uuid = &lock->l_export->exp_client_uuid;
+       memset(&locks, 0, sizeof(locks));
+       rc = qmt_alloc_lock_array(res, &locks, cb, arg);
+       if (rc) {
+               CERROR("%s: failed to allocate glimpse lock array (%d)\n",
+                      qmt->qmt_svname, rc);
+               RETURN(rc);
+       }
 
-               if (cb != NULL) {
-                       rc = cb(env, qmt, uuid, desc, arg);
-                       if (rc == 0)
-                               /* slave should not be notified */
-                               continue;
-                       if (rc < 0)
-                               /* something wrong happened, we still notify */
-                               CERROR("%s: callback function failed to "
-                                      "determine whether slave %s should be "
-                                      "notified (%d)\n", qmt->qmt_svname,
-                                      obd_uuid2str(uuid), rc);
-               }
+       for (i = locks.q_cnt; i > 0; i--) {
+               struct ldlm_glimpse_work *work;
 
                OBD_ALLOC_PTR(work);
                if (work == NULL) {
-                       CERROR("%s: failed to notify %s\n", qmt->qmt_svname,
-                              obd_uuid2str(uuid));
+                       CERROR("%s: failed to notify a lock.\n",
+                              qmt->qmt_svname);
                        continue;
                }
 
                list_add_tail(&work->gl_list, &gl_list);
-               work->gl_lock  = LDLM_LOCK_GET(lock);
+               work->gl_lock  = locks.q_locks[i - 1];
                work->gl_flags = 0;
                work->gl_desc  = desc;
 
+               locks.q_locks[i - 1] = NULL;
+               locks.q_cnt--;
        }
-       unlock_res(res);
+
+       qmt_free_lock_array(&locks);
 
        if (list_empty(&gl_list)) {
                CDEBUG(D_QUOTA, "%s: nobody to notify\n", qmt->qmt_svname);
@@ -556,9 +580,17 @@ void qmt_glb_lock_notify(const struct lu_env *env, struct lquota_entry *lqe,
        /* send glimpse callback to notify slaves of new quota settings */
        qti->qti_gl_desc.lquota_desc.gl_id        = lqe->lqe_id;
        qti->qti_gl_desc.lquota_desc.gl_flags     = 0;
-       qti->qti_gl_desc.lquota_desc.gl_hardlimit = lqe->lqe_hardlimit;
-       qti->qti_gl_desc.lquota_desc.gl_softlimit = lqe->lqe_softlimit;
-       qti->qti_gl_desc.lquota_desc.gl_time      = lqe->lqe_gracetime;
+       if (lqe->lqe_is_default) {
+               qti->qti_gl_desc.lquota_desc.gl_hardlimit = 0;
+               qti->qti_gl_desc.lquota_desc.gl_softlimit = 0;
+               qti->qti_gl_desc.lquota_desc.gl_time = LQUOTA_GRACE_FLAG(0,
+                                                       LQUOTA_FLAG_DEFAULT);
+
+       } else {
+               qti->qti_gl_desc.lquota_desc.gl_hardlimit = lqe->lqe_hardlimit;
+               qti->qti_gl_desc.lquota_desc.gl_softlimit = lqe->lqe_softlimit;
+               qti->qti_gl_desc.lquota_desc.gl_time = lqe->lqe_gracetime;
+       }
        qti->qti_gl_desc.lquota_desc.gl_ver       = ver;
 
        /* look up ldlm resource associated with global index */
@@ -581,11 +613,10 @@ void qmt_glb_lock_notify(const struct lu_env *env, struct lquota_entry *lqe,
 
 /* Callback function used to select locks that should be glimpsed when
  * broadcasting the new qunit value */
-static int qmt_id_lock_cb(const struct lu_env *env, struct qmt_device *qmt,
-                         struct obd_uuid *uuid, union ldlm_gl_desc *desc,
-                         void *arg)
+static int qmt_id_lock_cb(struct ldlm_lock *lock, void *arg)
 {
        struct obd_uuid *slv_uuid = arg;
+       struct obd_uuid *uuid = &lock->l_export->exp_client_uuid;
 
        if (slv_uuid != NULL && obd_uuid_equals(uuid, slv_uuid))
                RETURN(0);
@@ -625,7 +656,7 @@ static void qmt_id_lock_glimpse(const struct lu_env *env,
                lqe_write_lock(lqe);
                if (lqe->lqe_revoke_time == 0 &&
                    lqe->lqe_qunit == pool->qpi_least_qunit)
-                       lqe->lqe_revoke_time = cfs_time_current_64();
+                       lqe->lqe_revoke_time = ktime_get_seconds();
                lqe_write_unlock(lqe);
                RETURN_EXIT;
        }
@@ -663,8 +694,8 @@ static void qmt_id_lock_glimpse(const struct lu_env *env,
        if (lqe->lqe_revoke_time == 0 &&
            qti->qti_gl_desc.lquota_desc.gl_qunit == pool->qpi_least_qunit &&
            lqe->lqe_qunit == pool->qpi_least_qunit) {
-               lqe->lqe_revoke_time = cfs_time_current_64();
-               qmt_adjust_edquot(lqe, cfs_time_current_sec());
+               lqe->lqe_revoke_time = ktime_get_seconds();
+               qmt_adjust_edquot(lqe, ktime_get_real_seconds());
        }
        LASSERT(lqe->lqe_gl);
        lqe->lqe_gl = false;
@@ -722,15 +753,19 @@ static int qmt_reba_thread(void *arg)
        ENTRY;
 
        OBD_ALLOC_PTR(env);
-       if (env == NULL)
+       if (env == NULL) {
+               thread_set_flags(thread, SVC_STOPPED);
                RETURN(-ENOMEM);
+       }
 
        rc = lu_env_init(env, LCT_MD_THREAD);
        if (rc) {
                CERROR("%s: failed to init env.", qmt->qmt_svname);
-               OBD_FREE_PTR(env);
-               RETURN(rc);
+               GOTO(out_env, rc);
        }
+       rc = lu_env_add(env);
+       if (rc)
+               GOTO(out_env_fini, rc);
 
        thread_set_flags(thread, SVC_RUNNING);
        wake_up(&thread->t_ctl_waitq);
@@ -757,7 +792,10 @@ static int qmt_reba_thread(void *arg)
                if (!thread_is_running(thread))
                        break;
        }
+       lu_env_remove(env);
+out_env_fini:
        lu_env_fini(env);
+out_env:
        OBD_FREE_PTR(env);
        thread_set_flags(thread, SVC_STOPPED);
        wake_up(&thread->t_ctl_waitq);