Whamcloud - gitweb
LU-17705 ptlrpc: replace synchronize_rcu() with rcu_barrier()
[fs/lustre-release.git] / lustre / osc / osc_quota.c
index 69a9306..0acf616 100644 (file)
@@ -1,6 +1,4 @@
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
+/*
  * GPL HEADER START
  *
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  */
 /*
  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
- * Copyright (c) 2011 Whamcloud, Inc.
+ *
+ * Copyright (c) 2012, 2017, Intel Corporation.
  *
  * Code originally extracted from quota directory
  */
-#ifndef __KERNEL__
-# include <liblustre.h>
-#endif
-
-#include <obd_ost.h>
-#include "osc_internal.h"
-
-struct osc_quota_info {
-        cfs_list_t         oqi_hash; /* hash list */
-        struct client_obd *oqi_cli;  /* osc obd */
-        unsigned int       oqi_id;   /* uid/gid of a file */
-        short              oqi_type; /* quota type */
-};
 
-cfs_spinlock_t qinfo_list_lock = CFS_SPIN_LOCK_UNLOCKED;
+#include <obd_class.h>
+#include <lustre_osc.h>
+#include <lustre_quota.h>
 
-static cfs_list_t qinfo_hash[NR_DQHASH];
-/* SLAB cache for client quota context */
-cfs_mem_cache_t *qinfo_cachep = NULL;
-
-static inline int hashfn(struct client_obd *cli, unsigned long id, int type)
-                         __attribute__((__const__));
+#include "osc_internal.h"
 
-static inline int hashfn(struct client_obd *cli, unsigned long id, int type)
+int osc_quota_chkdq(struct client_obd *cli, const unsigned int qid[])
 {
-        unsigned long tmp = ((unsigned long)cli>>6) ^ id;
-        tmp = (tmp * (MAXQUOTAS - type)) % NR_DQHASH;
-        return tmp;
+       int type;
+
+       ENTRY;
+       for (type = 0; type < LL_MAXQUOTAS; type++) {
+               u8 *qtype;
+
+               qtype = xa_load(&cli->cl_quota_exceeded_ids, qid[type]);
+               if (qtype && (xa_to_value(qtype) & BIT(type))) {
+                       /* the slot is busy, the user is about to run out of
+                        * quota space on this OST
+                        */
+                       CDEBUG(D_QUOTA, "chkdq found noquota for %s %d\n",
+                              qtype_name(type), qid[type]);
+                       RETURN(-EDQUOT);
+               }
+       }
+
+       RETURN(0);
 }
 
-/* caller must hold qinfo_list_lock */
-static inline void insert_qinfo_hash(struct osc_quota_info *oqi)
+static inline u32 md_quota_flag(int qtype)
 {
-        cfs_list_t *head = qinfo_hash +
-                hashfn(oqi->oqi_cli, oqi->oqi_id, oqi->oqi_type);
-
-        LASSERT_SPIN_LOCKED(&qinfo_list_lock);
-        cfs_list_add(&oqi->oqi_hash, head);
+       switch (qtype) {
+       case USRQUOTA:
+               return OBD_MD_FLUSRQUOTA;
+       case GRPQUOTA:
+               return OBD_MD_FLGRPQUOTA;
+       case PRJQUOTA:
+               return OBD_MD_FLPRJQUOTA;
+       default:
+               return 0;
+       }
 }
 
-/* caller must hold qinfo_list_lock */
-static inline void remove_qinfo_hash(struct osc_quota_info *oqi)
+static inline u32 fl_quota_flag(int qtype)
 {
-        LASSERT_SPIN_LOCKED(&qinfo_list_lock);
-        cfs_list_del_init(&oqi->oqi_hash);
+       switch (qtype) {
+       case USRQUOTA:
+               return OBD_FL_NO_USRQUOTA;
+       case GRPQUOTA:
+               return OBD_FL_NO_GRPQUOTA;
+       case PRJQUOTA:
+               return OBD_FL_NO_PRJQUOTA;
+       default:
+               return 0;
+       }
 }
 
-/* caller must hold qinfo_list_lock */
-static inline struct osc_quota_info *find_qinfo(struct client_obd *cli,
-                                                unsigned int id, int type)
+int osc_quota_setdq(struct client_obd *cli, u64 xid, const unsigned int qid[],
+                   u64 valid, u32 flags)
 {
-        struct osc_quota_info *oqi;
-        unsigned int           hashent = hashfn(cli, id, type);
-        ENTRY;
+       int type;
+       int rc = 0;
 
-        LASSERT_SPIN_LOCKED(&qinfo_list_lock);
-        cfs_list_for_each_entry(oqi, &qinfo_hash[hashent], oqi_hash) {
-                if (oqi->oqi_cli == cli &&
-                    oqi->oqi_id == id && oqi->oqi_type == type)
-                        RETURN(oqi);
-        }
-        RETURN(NULL);
-}
-
-static struct osc_quota_info *alloc_qinfo(struct client_obd *cli,
-                                          unsigned int id, int type)
-{
-        struct osc_quota_info *oqi;
         ENTRY;
-
-        OBD_SLAB_ALLOC(oqi, qinfo_cachep, CFS_ALLOC_IO, sizeof(*oqi));
-        if(!oqi)
-                RETURN(NULL);
-
-        CFS_INIT_LIST_HEAD(&oqi->oqi_hash);
-        oqi->oqi_cli = cli;
-        oqi->oqi_id = id;
-        oqi->oqi_type = type;
-
-        RETURN(oqi);
+       if ((valid & (OBD_MD_FLALLQUOTA)) == 0)
+               RETURN(0);
+
+       mutex_lock(&cli->cl_quota_mutex);
+       cli->cl_root_squash = !!(flags & OBD_FL_ROOT_SQUASH);
+       cli->cl_root_prjquota = !!(flags & OBD_FL_ROOT_PRJQUOTA);
+       /* still mark the quots is running out for the old request, because it
+        * could be processed after the new request at OST, the side effect is
+        * the following request will be processed synchronously, but it will
+        * not break the quota enforcement. */
+       if (cli->cl_quota_last_xid > xid && !(flags & OBD_FL_NO_QUOTA_ALL))
+               GOTO(out_unlock, rc);
+
+       if (cli->cl_quota_last_xid < xid)
+               cli->cl_quota_last_xid = xid;
+
+       for (type = 0; type < LL_MAXQUOTAS; type++) {
+               unsigned long bits = 0;
+               u8 *qtypes;
+
+               if ((valid & md_quota_flag(type)) == 0)
+                       continue;
+
+               /* lookup the quota IDs in the ID xarray */
+               qtypes = xa_load(&cli->cl_quota_exceeded_ids, qid[type]);
+               if ((flags & fl_quota_flag(type)) != 0) {
+                       /* This ID is getting close to its quota limit, let's
+                        * switch to sync I/O
+                        */
+                       if (qtypes) { /* ID already cached */
+                               bits = xa_to_value(qtypes);
+                               /* test if ID type already set */
+                               if (!(bits & BIT(type))) {
+                                       bits |= BIT(type);
+                                       rc = xa_err(xa_store(&cli->cl_quota_exceeded_ids,
+                                                            qid[type],
+                                                            xa_mk_value(bits),
+                                                            GFP_KERNEL));
+                                       if (rc < 0)
+                                               GOTO(out_unlock, rc);
+                               }
+                               continue;
+                       }
+
+                       /* Only insert if we see the this ID for the
+                        * very first time.
+                        */
+                       bits |= BIT(type);
+                       rc = ll_xa_insert(&cli->cl_quota_exceeded_ids,
+                                         qid[type], xa_mk_value(bits),
+                                         GFP_KERNEL);
+                       if (rc == -ENOMEM)
+                               break;
+
+                       CDEBUG(D_QUOTA, "%s: setdq to insert for %s %d: rc = %d\n",
+                              cli_name(cli), qtype_name(type), qid[type], rc);
+               } else {
+                       /* This ID is now off the hook, let's remove it from
+                        * the xarray
+                        */
+                       if (!qtypes)
+                               continue;
+
+                       bits = xa_to_value(qtypes);
+                       if (!(bits & BIT(type)))
+                               continue;
+
+                       bits &= ~BIT(type);
+                       if (bits) {
+                               rc = xa_err(xa_store(&cli->cl_quota_exceeded_ids,
+                                                    qid[type],
+                                                    xa_mk_value(bits),
+                                                    GFP_KERNEL));
+                               if (rc < 0)
+                                       GOTO(out_unlock, rc);
+                       } else {
+                               xa_erase(&cli->cl_quota_exceeded_ids, qid[type]);
+                       }
+
+                       CDEBUG(D_QUOTA, "%s: setdq to remove for %s %d\n",
+                              cli_name(cli), qtype_name(type), qid[type]);
+               }
+       }
+
+out_unlock:
+       mutex_unlock(&cli->cl_quota_mutex);
+       RETURN(rc);
 }
 
-static void free_qinfo(struct osc_quota_info *oqi)
+int osc_quota_setup(struct obd_device *obd)
 {
-        OBD_SLAB_FREE(oqi, qinfo_cachep, sizeof(*oqi));
-}
+       struct client_obd *cli = &obd->u.cli;
 
-int osc_quota_chkdq(struct client_obd *cli, const unsigned int qid[])
-{
-        unsigned int id;
-        int          cnt, rc = QUOTA_OK;
-        ENTRY;
+       mutex_init(&cli->cl_quota_mutex);
 
-        cfs_spin_lock(&qinfo_list_lock);
-        for (cnt = 0; cnt < MAXQUOTAS; cnt++) {
-                struct osc_quota_info *oqi = NULL;
-
-                id = (cnt == USRQUOTA) ? qid[USRQUOTA] : qid[GRPQUOTA];
-                oqi = find_qinfo(cli, id, cnt);
-                if (oqi) {
-                        rc = NO_QUOTA;
-                        break;
-                }
-        }
-        cfs_spin_unlock(&qinfo_list_lock);
-
-        if (rc == NO_QUOTA)
-                CDEBUG(D_QUOTA, "chkdq found noquota for %s %d\n",
-                       cnt == USRQUOTA ? "user" : "group", id);
-        RETURN(rc);
-}
+       xa_init(&cli->cl_quota_exceeded_ids);
 
-int osc_quota_setdq(struct client_obd *cli, const unsigned int qid[],
-                    obd_flag valid, obd_flag flags)
-{
-        unsigned int id;
-        obd_flag     noquota;
-        int          cnt, rc = 0;
-        ENTRY;
-
-        for (cnt = 0; cnt < MAXQUOTAS; cnt++) {
-                struct osc_quota_info *oqi = NULL, *old;
-
-                if (!(valid & ((cnt == USRQUOTA) ?
-                    OBD_MD_FLUSRQUOTA : OBD_MD_FLGRPQUOTA)))
-                        continue;
-
-                id = (cnt == USRQUOTA) ? qid[USRQUOTA] : qid[GRPQUOTA];
-                noquota = (cnt == USRQUOTA) ?
-                    (flags & OBD_FL_NO_USRQUOTA) : (flags & OBD_FL_NO_GRPQUOTA);
-
-                if (noquota) {
-                        oqi = alloc_qinfo(cli, id, cnt);
-                        if (!oqi) {
-                                rc = -ENOMEM;
-                                CDEBUG(D_QUOTA, "setdq for %s %d failed, "
-                                       "(rc = %d)\n",
-                                       cnt == USRQUOTA ? "user" : "group",
-                                       id, rc);
-                                break;
-                        }
-                }
-
-                cfs_spin_lock(&qinfo_list_lock);
-                old = find_qinfo(cli, id, cnt);
-                if (old && !noquota)
-                        remove_qinfo_hash(old);
-                else if (!old && noquota)
-                        insert_qinfo_hash(oqi);
-                cfs_spin_unlock(&qinfo_list_lock);
-
-                if (old && !noquota)
-                        CDEBUG(D_QUOTA, "setdq to remove for %s %d\n",
-                               cnt == USRQUOTA ? "user" : "group", id);
-                else if (!old && noquota)
-                        CDEBUG(D_QUOTA, "setdq to insert for %s %d\n",
-                               cnt == USRQUOTA ? "user" : "group", id);
-
-                if (old) {
-                        if (noquota)
-                                free_qinfo(oqi);
-                        else
-                                free_qinfo(old);
-                }
-        }
-        RETURN(rc);
+       return 0;
 }
 
-int osc_quota_cleanup(struct obd_device *obd)
+void osc_quota_cleanup(struct obd_device *obd)
 {
-        struct client_obd     *cli = &obd->u.cli;
-        struct osc_quota_info *oqi, *n;
-        int i;
-        ENTRY;
+       struct client_obd *cli = &obd->u.cli;
+       unsigned long qid;
+       u8 *qtypes;
 
-        cfs_spin_lock(&qinfo_list_lock);
-        for (i = 0; i < NR_DQHASH; i++) {
-                cfs_list_for_each_entry_safe(oqi, n, &qinfo_hash[i], oqi_hash) {
-                        if (oqi->oqi_cli != cli)
-                                continue;
-                        remove_qinfo_hash(oqi);
-                        free_qinfo(oqi);
-                }
-        }
-        cfs_spin_unlock(&qinfo_list_lock);
-
-        RETURN(0);
-}
-
-int osc_quota_init()
-{
-        int i;
-        ENTRY;
+       xa_for_each(&cli->cl_quota_exceeded_ids, qid, qtypes)
+               xa_erase(&cli->cl_quota_exceeded_ids, qid);
 
-        LASSERT(qinfo_cachep == NULL);
-        qinfo_cachep = cfs_mem_cache_create("osc_quota_info",
-                                            sizeof(struct osc_quota_info),
-                                            0, 0);
-        if (!qinfo_cachep)
-                RETURN(-ENOMEM);
-
-        for (i = 0; i < NR_DQHASH; i++)
-                CFS_INIT_LIST_HEAD(qinfo_hash + i);
-
-        RETURN(0);
-}
-
-int osc_quota_exit()
-{
-        struct osc_quota_info *oqi, *n;
-        int                    i, rc;
-        ENTRY;
-
-        cfs_spin_lock(&qinfo_list_lock);
-        for (i = 0; i < NR_DQHASH; i++) {
-                cfs_list_for_each_entry_safe(oqi, n, &qinfo_hash[i], oqi_hash) {
-                        remove_qinfo_hash(oqi);
-                        free_qinfo(oqi);
-                }
-        }
-        cfs_spin_unlock(&qinfo_list_lock);
-
-        rc = cfs_mem_cache_destroy(qinfo_cachep);
-        LASSERTF(rc == 0, "couldn't destory qinfo_cachep slab\n");
-        qinfo_cachep = NULL;
-
-        RETURN(0);
+       xa_destroy(&cli->cl_quota_exceeded_ids);
 }
 
 int osc_quotactl(struct obd_device *unused, struct obd_export *exp,
                  struct obd_quotactl *oqctl)
 {
-        struct ptlrpc_request *req;
-        struct obd_quotactl   *oqc;
-        int                    rc;
-        ENTRY;
+       struct ptlrpc_request   *req;
+       struct obd_quotactl     *oqc;
+       int                      rc;
+
+       ENTRY;
+
+       req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
+                                       &RQF_OST_QUOTACTL, LUSTRE_OST_VERSION,
+                                       OST_QUOTACTL);
+       if (req == NULL)
+               RETURN(-ENOMEM);
 
-        req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
-                                        &RQF_OST_QUOTACTL, LUSTRE_OST_VERSION,
-                                        OST_QUOTACTL);
-        if (req == NULL)
-                RETURN(-ENOMEM);
+       if (oqctl->qc_cmd == LUSTRE_Q_ITEROQUOTA)
+               req_capsule_set_size(&req->rq_pill, &RMF_OBD_QUOTA_ITER,
+                                    RCL_SERVER, LQUOTA_ITER_BUFLEN);
+       else
+               req_capsule_set_size(&req->rq_pill, &RMF_OBD_QUOTA_ITER,
+                                    RCL_SERVER, 0);
 
         oqc = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
         *oqc = *oqctl;
@@ -282,97 +236,52 @@ int osc_quotactl(struct obd_device *unused, struct obd_export *exp,
         if (rc)
                 CERROR("ptlrpc_queue_wait failed, rc: %d\n", rc);
 
-        if (req->rq_repmsg &&
-            (oqc = req_capsule_server_get(&req->rq_pill, &RMF_OBD_QUOTACTL))) {
-                *oqctl = *oqc;
-        } else if (!rc) {
-                CERROR ("Can't unpack obd_quotactl\n");
-                rc = -EPROTO;
-        }
-        ptlrpc_req_finished(req);
+       if (req->rq_repmsg) {
+               struct list_head *lst = (struct list_head *)oqctl->qc_iter_list;
 
-        RETURN(rc);
-}
+               oqc = req_capsule_server_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
+               if (!oqc)
+                       GOTO(out, rc = -EPROTO);
 
-int osc_quotacheck(struct obd_device *unused, struct obd_export *exp,
-                   struct obd_quotactl *oqctl)
-{
-        struct client_obd       *cli = &exp->exp_obd->u.cli;
-        struct ptlrpc_request   *req;
-        struct obd_quotactl     *body;
-        int                      rc;
-        ENTRY;
+               *oqctl = *oqc;
 
-        req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
-                                        &RQF_OST_QUOTACHECK, LUSTRE_OST_VERSION,
-                                        OST_QUOTACHECK);
-        if (req == NULL)
-                RETURN(-ENOMEM);
+               if (oqctl->qc_cmd == LUSTRE_Q_ITEROQUOTA) {
+                       void *buffer;
+                       struct lquota_iter *iter;
 
-        body = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
-        *body = *oqctl;
+                       buffer = req_capsule_server_get(&req->rq_pill,
+                                                       &RMF_OBD_QUOTA_ITER);
 
-        ptlrpc_request_set_replen(req);
-
-        /* the next poll will find -ENODATA, that means quotacheck is
-         * going on */
-        cli->cl_qchk_stat = -ENODATA;
-        rc = ptlrpc_queue_wait(req);
-        if (rc)
-                cli->cl_qchk_stat = rc;
-        ptlrpc_req_finished(req);
-        RETURN(rc);
-}
-
-int osc_quota_poll_check(struct obd_export *exp, struct if_quotacheck *qchk)
-{
-        struct client_obd *cli = &exp->exp_obd->u.cli;
-        int rc;
-        ENTRY;
+                       if (buffer == NULL) {
+                               CDEBUG(D_QUOTA, "%s: no buffer in iter req\n",
+                                      exp->exp_obd->obd_name);
 
-        qchk->obd_uuid = cli->cl_target_uuid;
-        memcpy(qchk->obd_type, LUSTRE_OST_NAME, strlen(LUSTRE_OST_NAME));
+                               rc = -EPROTO;
+                               GOTO(out, rc);
+                       }
 
-        rc = cli->cl_qchk_stat;
-        /* the client is not the previous one */
-        if (rc == CL_NOT_QUOTACHECKED)
-                rc = -EINTR;
-        RETURN(rc);
-}
+                       OBD_ALLOC_LARGE(iter,
+                              sizeof(struct lquota_iter) + LQUOTA_ITER_BUFLEN);
+                       if (iter == NULL)
+                               GOTO(out, rc = -ENOMEM);
 
-int osc_quota_adjust_qunit(struct obd_export *exp,
-                           struct quota_adjust_qunit *oqaq,
-                           struct lustre_quota_ctxt *qctxt,
-                           struct ptlrpc_request_set *rqset)
-{
-        struct ptlrpc_request     *req;
-        struct quota_adjust_qunit *oqa;
-        int                        rc = 0;
-        ENTRY;
+                       INIT_LIST_HEAD(&iter->li_link);
+                       list_add(&iter->li_link, lst);
 
-        /* client don't support this kind of operation, abort it */
-        if (!(exp->exp_connect_flags & OBD_CONNECT_CHANGE_QS)) {
-                CDEBUG(D_QUOTA, "osc: %s don't support change qunit size\n",
-                       exp->exp_obd->obd_name);
-                RETURN(rc);
-        }
-        if (strcmp(exp->exp_obd->obd_type->typ_name, LUSTRE_OSC_NAME))
-                RETURN(-EINVAL);
+                       memcpy(iter->li_buffer, buffer, LQUOTA_ITER_BUFLEN);
+                       iter->li_dt_size = oqctl->qc_iter_dt_buflen;
+                       oqctl->qc_iter_md_buflen = 0;
+                       oqctl->qc_iter_dt_buflen = 0;
+               }
+       } else if (!rc) {
+               CERROR("%s: cannot unpack obd_quotactl: rc = %d\n",
+                      exp->exp_obd->obd_name, rc);
 
-        LASSERT(rqset);
+               rc = -EPROTO;
+       }
 
-        req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
-                                        &RQF_OST_QUOTA_ADJUST_QUNIT,
-                                        LUSTRE_OST_VERSION,
-                                        OST_QUOTA_ADJUST_QUNIT);
-        if (req == NULL)
-                RETURN(-ENOMEM);
-
-        oqa = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_ADJUST_QUNIT);
-        *oqa = *oqaq;
-
-        ptlrpc_request_set_replen(req);
+out:
+       ptlrpc_req_finished(req);
 
-        ptlrpc_set_add_req(rqset, req);
-        RETURN(rc);
+       RETURN(rc);
 }