LU-5458: libcfs: protect kkuc_groups from write access
[fs/lustre-release.git] lustre/quota/qmt_lock.c
index e37d041..9500611 100644
  * GPL HEADER END
  */
 /*
- * Copyright (c) 2012 Intel, Inc.
+ * Copyright (c) 2012, 2013, Intel Corporation.
  * Use is subject to license terms.
  *
  * Author: Johann Lombardi <johann.lombardi@intel.com>
  * Author: Niu    Yawei    <yawei.niu@intel.com>
  */
 
-#ifndef EXPORT_SYMTAB
-# define EXPORT_SYMTAB
-#endif
-
 #define DEBUG_SUBSYSTEM S_LQUOTA
 
 #include <lustre_dlm.h>
@@ -51,10 +47,13 @@ int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld,
        struct quota_body       *repbody;
        struct obd_uuid         *uuid;
        struct lquota_lvb       *lvb;
-       int                      rc;
+       struct ldlm_resource    *res = (*lockp)->l_resource;
+       int                      rc, lvb_len;
        ENTRY;
 
        req_capsule_extend(&req->rq_pill, &RQF_LDLM_INTENT_QUOTA);
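+       /* reserve room for the lvb in the reply buffer; it is shrunk to the
+        * size actually filled once the intent has been processed */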
+       req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
+                            ldlm_lvbo_size(*lockp));
 
        /* extract quota body and intent opc */
        it = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
@@ -79,13 +78,44 @@ int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld,
        uuid = &(*lockp)->l_export->exp_client_uuid;
        switch (it->opc) {
 
-       case IT_QUOTA_DQACQ:
-               /* XXX: to be added in a next patch */
-               GOTO(out, -EOPNOTSUPP);
+       case IT_QUOTA_DQACQ: {
+               struct lquota_entry     *lqe;
+               struct ldlm_lock        *lock;
+
+               if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] == 0)
+                       /* acquire on global lock? something is wrong ... */
+                       GOTO(out, rc = -EPROTO);
+
+               /* verify global lock isn't stale */
+               if (!lustre_handle_is_used(&reqbody->qb_glb_lockh))
+                       GOTO(out, rc = -ENOLCK);
+
+               lock = ldlm_handle2lock(&reqbody->qb_glb_lockh);
+               if (lock == NULL)
+                       GOTO(out, rc = -ENOLCK);
+               LDLM_LOCK_PUT(lock);
+
+               lqe = res->lr_lvb_data;
+               LASSERT(lqe != NULL);
+               lqe_getref(lqe);
+
+               /* acquire quota space */
+               rc = qmt_dqacq0(env, lqe, qmt, uuid, reqbody->qb_flags,
+                               reqbody->qb_count, reqbody->qb_usage,
+                               repbody);
+               lqe_putref(lqe);
+               if (rc)
+                       GOTO(out, rc);
                break;
+       }
 
        case IT_QUOTA_CONN:
                /* new connection from slave */
+
+               if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0)
+                       /* connection on per-ID lock? something is wrong ... */
+                       GOTO(out, rc = -EPROTO);
+
                rc = qmt_pool_new_conn(env, qmt, &reqbody->qb_fid,
                                       &repbody->qb_slv_fid,
                                       &repbody->qb_slv_ver, uuid);
@@ -100,10 +130,13 @@ int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld,
        }
 
        /* on success, pack lvb in reply */
-       req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
-                            ldlm_lvbo_size(*lockp));
        lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
-       ldlm_lvbo_fill(*lockp, lvb, ldlm_lvbo_size(*lockp));
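+       /* ldlm_lvbo_fill() returns the lvb size actually written (or a
+        * negative errno), so the reply buffer can be shrunk accordingly */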
+       lvb_len = ldlm_lvbo_size(*lockp);
+       lvb_len = ldlm_lvbo_fill(*lockp, lvb, lvb_len);
+       if (lvb_len < 0)
+               GOTO(out, rc = lvb_len);
+
+       req_capsule_shrink(&req->rq_pill, &RMF_DLM_LVB, lvb_len, RCL_SERVER);
        EXIT;
 out:
        return rc;
@@ -137,14 +170,12 @@ int qmt_lvbo_init(struct lu_device *ld, struct ldlm_resource *res)
 
        /* initialize environment */
        rc = lu_env_init(env, LCT_MD_THREAD);
-       if (rc) {
-               OBD_FREE_PTR(env);
-               RETURN(rc);
-       }
+       if (rc != 0)
+               GOTO(out_free, rc);
        qti = qmt_info(env);
 
        /* extract global index FID and quota identifier */
-       fid_extract_quota_resid(&res->lr_name, &qti->qti_fid, &qti->qti_id);
+       fid_extract_from_quota_res(&qti->qti_fid, &qti->qti_id, &res->lr_name);
 
        /* sanity check the global index FID */
        rc = lquota_extract_fid(&qti->qti_fid, &pool_id, &pool_type, &qtype);
@@ -185,10 +216,11 @@ int qmt_lvbo_init(struct lu_device *ld, struct ldlm_resource *res)
                CDEBUG(D_QUOTA, DFID" initialized lvb\n", PFID(&qti->qti_fid));
        }
 
-       res->lr_lvb_len  = sizeof(struct lquota_lvb);
+       res->lr_lvb_len = sizeof(struct lquota_lvb);
        EXIT;
 out:
        lu_env_fini(env);
+out_free:
        OBD_FREE_PTR(env);
        return rc;
 }
@@ -206,6 +238,8 @@ int qmt_lvbo_update(struct lu_device *ld, struct ldlm_resource *res,
        struct qmt_device       *qmt = lu2qmt_dev(ld);
        struct lquota_entry     *lqe;
        struct lquota_lvb       *lvb;
+       struct ldlm_lock        *lock;
+       struct obd_export       *exp;
        int                      rc = 0;
        ENTRY;
 
@@ -218,35 +252,78 @@ int qmt_lvbo_update(struct lu_device *ld, struct ldlm_resource *res,
                /* no need to update lvb for global quota locks */
                RETURN(0);
 
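+       /* extract the lvb packed by the slave in its reply to the glimpse
+        * callback sent over the reverse import */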
+       lvb = req_capsule_server_swab_get(&req->rq_pill, &RMF_DLM_LVB,
+                                         lustre_swab_lquota_lvb);
+       if (lvb == NULL) {
+               CERROR("%s: failed to extract lvb from request\n",
+                      qmt->qmt_svname);
+               RETURN(-EFAULT);
+       }
+
        lqe = res->lr_lvb_data;
        LASSERT(lqe != NULL);
+       lqe_getref(lqe);
+
+       LQUOTA_DEBUG(lqe, "releasing:"LPU64" may release:"LPU64,
+                    lvb->lvb_id_rel, lvb->lvb_id_may_rel);
+
+       if (lvb->lvb_id_rel == 0) {
+               /* nothing to release */
+               if (lvb->lvb_id_may_rel != 0)
+                       /* but might still release later ... */
+                       lqe->lqe_may_rel += lvb->lvb_id_may_rel;
+               GOTO(out_lqe, rc = 0);
+       }
 
        /* allocate environment */
        OBD_ALLOC_PTR(env);
        if (env == NULL)
-               RETURN(-ENOMEM);
+               GOTO(out_lqe, rc = -ENOMEM);
 
        /* initialize environment */
        rc = lu_env_init(env, LCT_MD_THREAD);
-       if (rc) {
-               OBD_FREE_PTR(env);
-               RETURN(rc);
-       }
+       if (rc)
+               GOTO(out_env, rc);
        qti = qmt_info(env);
 
-       lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
-       if (lvb == NULL) {
-               CERROR("%s: failed to extract lvb from request\n",
+       /* The request is a glimpse callback which was sent via the
+        * reverse import to the slave. What we care about here is the
+        * export associated with the slave and req->rq_export is
+        * definitely not what we are looking for (it is actually set to
+        * NULL here).
+        * Therefore we extract the lock from the request argument
+        * and use lock->l_export. */
+       lock = ldlm_request_lock(req);
+       if (IS_ERR(lock)) {
+               CERROR("%s: failed to get lock from request!\n",
                       qmt->qmt_svname);
-               GOTO(out, rc);
+               GOTO(out_env_init, rc = PTR_ERR(lock));
        }
 
-       /* XXX: Space release handling to be added in a next patch */
+       exp = class_export_get(lock->l_export);
+       if (exp == NULL) {
+               CERROR("%s: failed to get export from lock!\n",
+                      qmt->qmt_svname);
+               GOTO(out_env_init, rc = -EFAULT);
+       }
 
+       /* release quota space */
+       rc = qmt_dqacq0(env, lqe, qmt, &exp->exp_client_uuid,
+                       QUOTA_DQACQ_FL_REL, lvb->lvb_id_rel, 0, &qti->qti_body);
+       if (rc || qti->qti_body.qb_count != lvb->lvb_id_rel)
+               LQUOTA_ERROR(lqe, "failed to release quota space on glimpse "
+                            LPU64"!="LPU64" rc:%d\n", qti->qti_body.qb_count,
+                            lvb->lvb_id_rel, rc);
+       class_export_put(exp);
+       if (rc)
+               GOTO(out_env_init, rc);
        EXIT;
-out:
+out_env_init:
        lu_env_fini(env);
+out_env:
        OBD_FREE_PTR(env);
+out_lqe:
+       lqe_putref(lqe);
        return rc;
 }
 
@@ -380,19 +457,19 @@ static int qmt_glimpse_lock(const struct lu_env *env, struct qmt_device *qmt,
                            struct ldlm_resource *res, union ldlm_gl_desc *desc,
                            qmt_glimpse_cb_t cb, void *arg)
 {
-       cfs_list_t      *tmp, *pos;
-       CFS_LIST_HEAD(gl_list);
+       struct list_head *tmp, *pos;
+       struct list_head gl_list = LIST_HEAD_INIT(gl_list);
        int              rc = 0;
        ENTRY;
 
        lock_res(res);
        /* scan list of granted locks */
-       cfs_list_for_each(pos, &res->lr_granted) {
+       list_for_each(pos, &res->lr_granted) {
                struct ldlm_glimpse_work        *work;
                struct ldlm_lock                *lock;
                struct obd_uuid                 *uuid;
 
-               lock = cfs_list_entry(pos, struct ldlm_lock, l_res_link);
+               lock = list_entry(pos, struct ldlm_lock, l_res_link);
                LASSERT(lock->l_export);
                uuid = &lock->l_export->exp_client_uuid;
 
@@ -416,7 +493,7 @@ static int qmt_glimpse_lock(const struct lu_env *env, struct qmt_device *qmt,
                        continue;
                }
 
-               cfs_list_add_tail(&work->gl_list, &gl_list);
+               list_add_tail(&work->gl_list, &gl_list);
                work->gl_lock  = LDLM_LOCK_GET(lock);
                work->gl_flags = 0;
                work->gl_desc  = desc;
@@ -424,7 +501,7 @@ static int qmt_glimpse_lock(const struct lu_env *env, struct qmt_device *qmt,
        }
        unlock_res(res);
 
-       if (cfs_list_empty(&gl_list)) {
+       if (list_empty(&gl_list)) {
                CDEBUG(D_QUOTA, "%s: nobody to notify\n", qmt->qmt_svname);
                RETURN(0);
        }
@@ -432,12 +509,12 @@ static int qmt_glimpse_lock(const struct lu_env *env, struct qmt_device *qmt,
        /* issue glimpse callbacks to all connected slaves */
        rc = ldlm_glimpse_locks(res, &gl_list);
 
-       cfs_list_for_each_safe(pos, tmp, &gl_list) {
+       list_for_each_safe(pos, tmp, &gl_list) {
                struct ldlm_glimpse_work *work;
 
-               work = cfs_list_entry(pos, struct ldlm_glimpse_work, gl_list);
+               work = list_entry(pos, struct ldlm_glimpse_work, gl_list);
 
-               cfs_list_del(&work->gl_list);
+               list_del(&work->gl_list);
                CERROR("%s: failed to notify %s of new quota settings\n",
                       qmt->qmt_svname,
                       obd_uuid2str(&work->gl_lock->l_export->exp_client_uuid));
@@ -473,13 +550,14 @@ void qmt_glb_lock_notify(const struct lu_env *env, struct lquota_entry *lqe,
        qti->qti_gl_desc.lquota_desc.gl_flags     = 0;
        qti->qti_gl_desc.lquota_desc.gl_hardlimit = lqe->lqe_hardlimit;
        qti->qti_gl_desc.lquota_desc.gl_softlimit = lqe->lqe_softlimit;
+       qti->qti_gl_desc.lquota_desc.gl_time      = lqe->lqe_gracetime;
        qti->qti_gl_desc.lquota_desc.gl_ver       = ver;
 
        /* look up ldlm resource associated with global index */
        fid_build_reg_res_name(&qti->qti_fid, &qti->qti_resid);
        res = ldlm_resource_get(pool->qpi_qmt->qmt_ns, NULL, &qti->qti_resid,
                                LDLM_PLAIN, 0);
-       if (res == NULL) {
+       if (IS_ERR(res)) {
                /* this might happen if no slaves have enqueued global quota
                 * locks yet */
                LQUOTA_DEBUG(lqe, "failed to lookup ldlm resource associated "
@@ -529,18 +607,23 @@ static void qmt_id_lock_glimpse(const struct lu_env *env,
 
        lquota_generate_fid(&qti->qti_fid, pool->qpi_key & 0x0000ffff,
                            pool->qpi_key >> 16, lqe->lqe_site->lqs_qtype);
-       fid_build_quota_resid(&qti->qti_fid, &lqe->lqe_id, &qti->qti_resid);
+       fid_build_quota_res_name(&qti->qti_fid, &lqe->lqe_id, &qti->qti_resid);
        res = ldlm_resource_get(qmt->qmt_ns, NULL, &qti->qti_resid, LDLM_PLAIN,
                                0);
-       if (res == NULL) {
+       if (IS_ERR(res)) {
        /* this might legitimately happen if slaves haven't had the
                 * opportunity to enqueue quota lock yet. */
                LQUOTA_DEBUG(lqe, "failed to lookup ldlm resource for per-ID "
                             "lock "DFID, PFID(&qti->qti_fid));
+               lqe_write_lock(lqe);
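+               /* no per-ID lock to glimpse; if the least qunit is already in
+                * use, record when this state was first seen */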
+               if (lqe->lqe_revoke_time == 0 &&
+                   lqe->lqe_qunit == pool->qpi_least_qunit)
+                       lqe->lqe_revoke_time = cfs_time_current_64();
+               lqe_write_unlock(lqe);
                RETURN_EXIT;
        }
 
-       lqe_read_lock(lqe);
+       lqe_write_lock(lqe);
        /* The purpose of glimpse callback on per-ID lock is twofold:
         * - notify slaves of new qunit value and hope they will release some
         *   spare quota space in return
@@ -553,18 +636,32 @@ static void qmt_id_lock_glimpse(const struct lu_env *env,
        else
                qti->qti_gl_desc.lquota_desc.gl_flags = 0;
        qti->qti_gl_desc.lquota_desc.gl_qunit = lqe->lqe_qunit;
-       lqe_read_unlock(lqe);
+
+       if (lqe->lqe_revoke_time == 0 &&
+           qti->qti_gl_desc.lquota_desc.gl_qunit == pool->qpi_least_qunit)
+               /* reset lqe_may_rel, it will be updated on glimpse callback
+                * replies if needed */
+               lqe->lqe_may_rel = 0;
 
        /* The rebalance thread is the only thread which can issue glimpses */
        LASSERT(!lqe->lqe_gl);
        lqe->lqe_gl = true;
+       lqe_write_unlock(lqe);
 
        /* issue glimpse callback to slaves */
        rc = qmt_glimpse_lock(env, qmt, res, &qti->qti_gl_desc,
                              uuid ? qmt_id_lock_cb : NULL, (void *)uuid);
 
+       lqe_write_lock(lqe);
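+       /* the glimpse advertised the least qunit and it is still in effect:
+        * set the revoke time and check whether edquot must be adjusted */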
+       if (lqe->lqe_revoke_time == 0 &&
+           qti->qti_gl_desc.lquota_desc.gl_qunit == pool->qpi_least_qunit &&
+           lqe->lqe_qunit == pool->qpi_least_qunit) {
+               lqe->lqe_revoke_time = cfs_time_current_64();
+               qmt_adjust_edquot(lqe, cfs_time_current_sec());
+       }
        LASSERT(lqe->lqe_gl);
        lqe->lqe_gl = false;
+       lqe_write_unlock(lqe);
 
        ldlm_resource_putref(res);
        EXIT;
@@ -583,15 +680,15 @@ void qmt_id_lock_notify(struct qmt_device *qmt, struct lquota_entry *lqe)
        ENTRY;
 
        lqe_getref(lqe);
-       cfs_spin_lock(&qmt->qmt_reba_lock);
-       if (!qmt->qmt_stopping && cfs_list_empty(&lqe->lqe_link)) {
-               cfs_list_add_tail(&lqe->lqe_link, &qmt->qmt_reba_list);
+       spin_lock(&qmt->qmt_reba_lock);
+       if (!qmt->qmt_stopping && list_empty(&lqe->lqe_link)) {
+               list_add_tail(&lqe->lqe_link, &qmt->qmt_reba_list);
                added = true;
        }
-       cfs_spin_unlock(&qmt->qmt_reba_lock);
+       spin_unlock(&qmt->qmt_reba_lock);
 
        if (added)
-               cfs_waitq_signal(&qmt->qmt_reba_thread.t_ctl_waitq);
+               wake_up(&qmt->qmt_reba_thread.t_ctl_waitq);
        else
                lqe_putref(lqe);
        EXIT;
@@ -614,7 +711,6 @@ static int qmt_reba_thread(void *arg)
        struct l_wait_info       lwi = { 0 };
        struct lu_env           *env;
        struct lquota_entry     *lqe, *tmp;
-       char                     pname[MTI_NAME_MAXLEN];
        int                      rc;
        ENTRY;
 
@@ -629,30 +725,27 @@ static int qmt_reba_thread(void *arg)
                RETURN(rc);
        }
 
-       snprintf(pname, MTI_NAME_MAXLEN, "qmt_reba_%s", qmt->qmt_svname);
-       cfs_daemonize(pname);
-
        thread_set_flags(thread, SVC_RUNNING);
-       cfs_waitq_signal(&thread->t_ctl_waitq);
+       wake_up(&thread->t_ctl_waitq);
 
        while (1) {
                l_wait_event(thread->t_ctl_waitq,
-                            !cfs_list_empty(&qmt->qmt_reba_list) ||
+                            !list_empty(&qmt->qmt_reba_list) ||
                             !thread_is_running(thread), &lwi);
 
-               cfs_spin_lock(&qmt->qmt_reba_lock);
-               cfs_list_for_each_entry_safe(lqe, tmp, &qmt->qmt_reba_list,
-                                            lqe_link) {
-                       cfs_list_del_init(&lqe->lqe_link);
-                       cfs_spin_unlock(&qmt->qmt_reba_lock);
+               spin_lock(&qmt->qmt_reba_lock);
+               list_for_each_entry_safe(lqe, tmp, &qmt->qmt_reba_list,
+                                        lqe_link) {
+                       list_del_init(&lqe->lqe_link);
+                       spin_unlock(&qmt->qmt_reba_lock);
 
                        if (thread_is_running(thread))
                                qmt_id_lock_glimpse(env, qmt, lqe, NULL);
 
                        lqe_putref(lqe);
-                       cfs_spin_lock(&qmt->qmt_reba_lock);
+                       spin_lock(&qmt->qmt_reba_lock);
                }
-               cfs_spin_unlock(&qmt->qmt_reba_lock);
+               spin_unlock(&qmt->qmt_reba_lock);
 
                if (!thread_is_running(thread))
                        break;
@@ -660,7 +753,7 @@ static int qmt_reba_thread(void *arg)
        lu_env_fini(env);
        OBD_FREE_PTR(env);
        thread_set_flags(thread, SVC_STOPPED);
-       cfs_waitq_signal(&thread->t_ctl_waitq);
+       wake_up(&thread->t_ctl_waitq);
        RETURN(rc);
 }
 
@@ -671,15 +764,16 @@ int qmt_start_reba_thread(struct qmt_device *qmt)
 {
        struct ptlrpc_thread    *thread = &qmt->qmt_reba_thread;
        struct l_wait_info       lwi    = { 0 };
-       int                      rc;
+       struct task_struct              *task;
        ENTRY;
 
-       rc = cfs_create_thread(qmt_reba_thread, (void *)qmt, 0);
-       if (rc < 0) {
-               CERROR("%s: failed to start rebalance thread (%d)\n",
-                      qmt->qmt_svname, rc);
+       task = kthread_run(qmt_reba_thread, (void *)qmt,
+                              "qmt_reba_%s", qmt->qmt_svname);
+       if (IS_ERR(task)) {
+               CERROR("%s: failed to start rebalance thread (%ld)\n",
+                      qmt->qmt_svname, PTR_ERR(task));
                thread_set_flags(thread, SVC_STOPPED);
-               RETURN(rc);
+               RETURN(PTR_ERR(task));
        }
 
        l_wait_event(thread->t_ctl_waitq,
@@ -700,10 +794,10 @@ void qmt_stop_reba_thread(struct qmt_device *qmt)
                struct l_wait_info lwi = { 0 };
 
                thread_set_flags(thread, SVC_STOPPING);
-               cfs_waitq_signal(&thread->t_ctl_waitq);
+               wake_up(&thread->t_ctl_waitq);
 
                l_wait_event(thread->t_ctl_waitq, thread_is_stopped(thread),
                             &lwi);
        }
-       LASSERT(cfs_list_empty(&qmt->qmt_reba_list));
+       LASSERT(list_empty(&qmt->qmt_reba_list));
 }