Whamcloud - gitweb
LU-17744 ldiskfs: mballoc stats fixes
[fs/lustre-release.git] / lustre / osc / osc_quota.c
index efc3b48..0acf616 100644 (file)
 /*
  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
  *
- * Copyright (c) 2012, 2013, Intel Corporation.
+ * Copyright (c) 2012, 2017, Intel Corporation.
  *
  * Code originally extracted from quota directory
  */
 
-#include <obd.h>
-#include "osc_internal.h"
-
-static inline struct osc_quota_info *osc_oqi_alloc(obd_uid id)
-{
-       struct osc_quota_info *oqi;
+#include <obd_class.h>
+#include <lustre_osc.h>
+#include <lustre_quota.h>
 
-       OBD_SLAB_ALLOC_PTR(oqi, osc_quota_kmem);
-       if (oqi != NULL)
-               oqi->oqi_id = id;
-
-       return oqi;
-}
+#include "osc_internal.h"
 
 int osc_quota_chkdq(struct client_obd *cli, const unsigned int qid[])
 {
        int type;
-       ENTRY;
-
-       for (type = 0; type < MAXQUOTAS; type++) {
-               struct osc_quota_info *oqi;
 
-               oqi = cfs_hash_lookup(cli->cl_quota_hash[type], &qid[type]);
-               if (oqi) {
-                       /* do not try to access oqi here, it could have been
-                        * freed by osc_quota_setdq() */
+       ENTRY;
+       for (type = 0; type < LL_MAXQUOTAS; type++) {
+               u8 *qtype;
 
+               qtype = xa_load(&cli->cl_quota_exceeded_ids, qid[type]);
+               if (qtype && (xa_to_value(qtype) & BIT(type))) {
                        /* the slot is busy, the user is about to run out of
-                        * quota space on this OST */
+                        * quota space on this OST
+                        */
                        CDEBUG(D_QUOTA, "chkdq found noquota for %s %d\n",
-                              type == USRQUOTA ? "user" : "grout", qid[type]);
-                       RETURN(NO_QUOTA);
+                              qtype_name(type), qid[type]);
+                       RETURN(-EDQUOT);
                }
        }
 
-       RETURN(QUOTA_OK);
+       RETURN(0);
+}
+
+static inline u32 md_quota_flag(int qtype)
+{
+       switch (qtype) {
+       case USRQUOTA:
+               return OBD_MD_FLUSRQUOTA;
+       case GRPQUOTA:
+               return OBD_MD_FLGRPQUOTA;
+       case PRJQUOTA:
+               return OBD_MD_FLPRJQUOTA;
+       default:
+               return 0;
+       }
 }
 
-#define MD_QUOTA_FLAG(type) ((type == USRQUOTA) ? OBD_MD_FLUSRQUOTA \
-                                               : OBD_MD_FLGRPQUOTA)
-#define FL_QUOTA_FLAG(type) ((type == USRQUOTA) ? OBD_FL_NO_USRQUOTA \
-                                               : OBD_FL_NO_GRPQUOTA)
+static inline u32 fl_quota_flag(int qtype)
+{
+       switch (qtype) {
+       case USRQUOTA:
+               return OBD_FL_NO_USRQUOTA;
+       case GRPQUOTA:
+               return OBD_FL_NO_GRPQUOTA;
+       case PRJQUOTA:
+               return OBD_FL_NO_PRJQUOTA;
+       default:
+               return 0;
+       }
+}
 
-int osc_quota_setdq(struct client_obd *cli, const unsigned int qid[],
-                    obd_flag valid, obd_flag flags)
+int osc_quota_setdq(struct client_obd *cli, u64 xid, const unsigned int qid[],
+                   u64 valid, u32 flags)
 {
        int type;
        int rc = 0;
-        ENTRY;
 
-       if ((valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) == 0)
+        ENTRY;
+       if ((valid & (OBD_MD_FLALLQUOTA)) == 0)
                RETURN(0);
 
-       for (type = 0; type < MAXQUOTAS; type++) {
-               struct osc_quota_info *oqi;
+       mutex_lock(&cli->cl_quota_mutex);
+       cli->cl_root_squash = !!(flags & OBD_FL_ROOT_SQUASH);
+       cli->cl_root_prjquota = !!(flags & OBD_FL_ROOT_PRJQUOTA);
+       /* still mark the quots is running out for the old request, because it
+        * could be processed after the new request at OST, the side effect is
+        * the following request will be processed synchronously, but it will
+        * not break the quota enforcement. */
+       if (cli->cl_quota_last_xid > xid && !(flags & OBD_FL_NO_QUOTA_ALL))
+               GOTO(out_unlock, rc);
+
+       if (cli->cl_quota_last_xid < xid)
+               cli->cl_quota_last_xid = xid;
 
-               if ((valid & MD_QUOTA_FLAG(type)) == 0)
+       for (type = 0; type < LL_MAXQUOTAS; type++) {
+               unsigned long bits = 0;
+               u8 *qtypes;
+
+               if ((valid & md_quota_flag(type)) == 0)
                        continue;
 
-               /* lookup the ID in the per-type hash table */
-               oqi = cfs_hash_lookup(cli->cl_quota_hash[type], &qid[type]);
-               if ((flags & FL_QUOTA_FLAG(type)) != 0) {
+               /* lookup the quota IDs in the ID xarray */
+               qtypes = xa_load(&cli->cl_quota_exceeded_ids, qid[type]);
+               if ((flags & fl_quota_flag(type)) != 0) {
                        /* This ID is getting close to its quota limit, let's
-                        * switch to sync I/O */
-                       if (oqi != NULL)
+                        * switch to sync I/O
+                        */
+                       if (qtypes) { /* ID already cached */
+                               bits = xa_to_value(qtypes);
+                               /* test if ID type already set */
+                               if (!(bits & BIT(type))) {
+                                       bits |= BIT(type);
+                                       rc = xa_err(xa_store(&cli->cl_quota_exceeded_ids,
+                                                            qid[type],
+                                                            xa_mk_value(bits),
+                                                            GFP_KERNEL));
+                                       if (rc < 0)
+                                               GOTO(out_unlock, rc);
+                               }
                                continue;
-
-                       oqi = osc_oqi_alloc(qid[type]);
-                       if (oqi == NULL) {
-                               rc = -ENOMEM;
-                               break;
                        }
 
-                       rc = cfs_hash_add_unique(cli->cl_quota_hash[type],
-                                                &qid[type], &oqi->oqi_hash);
-                       /* race with others? */
-                       if (rc == -EALREADY) {
-                               rc = 0;
-                               OBD_SLAB_FREE_PTR(oqi, osc_quota_kmem);
-                       }
+                       /* Only insert if we see the this ID for the
+                        * very first time.
+                        */
+                       bits |= BIT(type);
+                       rc = ll_xa_insert(&cli->cl_quota_exceeded_ids,
+                                         qid[type], xa_mk_value(bits),
+                                         GFP_KERNEL);
+                       if (rc == -ENOMEM)
+                               break;
 
-                       CDEBUG(D_QUOTA, "%s: setdq to insert for %s %d (%d)\n",
-                              cli->cl_import->imp_obd->obd_name,
-                              type == USRQUOTA ? "user" : "group",
-                              qid[type], rc);
+                       CDEBUG(D_QUOTA, "%s: setdq to insert for %s %d: rc = %d\n",
+                              cli_name(cli), qtype_name(type), qid[type], rc);
                } else {
                        /* This ID is now off the hook, let's remove it from
-                        * the hash table */
-                       if (oqi == NULL)
+                        * the xarray
+                        */
+                       if (!qtypes)
                                continue;
 
-                       oqi = cfs_hash_del_key(cli->cl_quota_hash[type],
-                                              &qid[type]);
-                       if (oqi)
-                               OBD_SLAB_FREE_PTR(oqi, osc_quota_kmem);
+                       bits = xa_to_value(qtypes);
+                       if (!(bits & BIT(type)))
+                               continue;
+
+                       bits &= ~BIT(type);
+                       if (bits) {
+                               rc = xa_err(xa_store(&cli->cl_quota_exceeded_ids,
+                                                    qid[type],
+                                                    xa_mk_value(bits),
+                                                    GFP_KERNEL));
+                               if (rc < 0)
+                                       GOTO(out_unlock, rc);
+                       } else {
+                               xa_erase(&cli->cl_quota_exceeded_ids, qid[type]);
+                       }
 
-                       CDEBUG(D_QUOTA, "%s: setdq to remove for %s %d (%p)\n",
-                              cli->cl_import->imp_obd->obd_name,
-                              type == USRQUOTA ? "user" : "group",
-                              qid[type], oqi);
+                       CDEBUG(D_QUOTA, "%s: setdq to remove for %s %d\n",
+                              cli_name(cli), qtype_name(type), qid[type]);
                }
        }
 
+out_unlock:
+       mutex_unlock(&cli->cl_quota_mutex);
        RETURN(rc);
 }
 
-/*
- * Hash operations for uid/gid <-> osc_quota_info
- */
-static unsigned
-oqi_hashfn(cfs_hash_t *hs, const void *key, unsigned mask)
-{
-       return cfs_hash_u32_hash(*((__u32*)key), mask);
-}
-
-static int
-oqi_keycmp(const void *key, struct hlist_node *hnode)
-{
-       struct osc_quota_info *oqi;
-       obd_uid uid;
-
-       LASSERT(key != NULL);
-       uid = *((obd_uid*)key);
-       oqi = hlist_entry(hnode, struct osc_quota_info, oqi_hash);
-
-       return uid == oqi->oqi_id;
-}
-
-static void *
-oqi_key(struct hlist_node *hnode)
-{
-       struct osc_quota_info *oqi;
-       oqi = hlist_entry(hnode, struct osc_quota_info, oqi_hash);
-       return &oqi->oqi_id;
-}
-
-static void *
-oqi_object(struct hlist_node *hnode)
-{
-       return hlist_entry(hnode, struct osc_quota_info, oqi_hash);
-}
-
-static void
-oqi_get(cfs_hash_t *hs, struct hlist_node *hnode)
-{
-}
-
-static void
-oqi_put_locked(cfs_hash_t *hs, struct hlist_node *hnode)
-{
-}
-
-static void
-oqi_exit(cfs_hash_t *hs, struct hlist_node *hnode)
-{
-       struct osc_quota_info *oqi;
-
-       oqi = hlist_entry(hnode, struct osc_quota_info, oqi_hash);
-
-        OBD_SLAB_FREE_PTR(oqi, osc_quota_kmem);
-}
-
-#define HASH_QUOTA_BKT_BITS 5
-#define HASH_QUOTA_CUR_BITS 5
-#define HASH_QUOTA_MAX_BITS 15
-
-static cfs_hash_ops_t quota_hash_ops = {
-       .hs_hash        = oqi_hashfn,
-       .hs_keycmp      = oqi_keycmp,
-       .hs_key         = oqi_key,
-       .hs_object      = oqi_object,
-       .hs_get         = oqi_get,
-       .hs_put_locked  = oqi_put_locked,
-       .hs_exit        = oqi_exit,
-};
-
 int osc_quota_setup(struct obd_device *obd)
 {
        struct client_obd *cli = &obd->u.cli;
-       int i, type;
-       ENTRY;
-
-       for (type = 0; type < MAXQUOTAS; type++) {
-               cli->cl_quota_hash[type] = cfs_hash_create("QUOTA_HASH",
-                                                          HASH_QUOTA_CUR_BITS,
-                                                          HASH_QUOTA_MAX_BITS,
-                                                          HASH_QUOTA_BKT_BITS,
-                                                          0,
-                                                          CFS_HASH_MIN_THETA,
-                                                          CFS_HASH_MAX_THETA,
-                                                          &quota_hash_ops,
-                                                          CFS_HASH_DEFAULT);
-               if (cli->cl_quota_hash[type] == NULL)
-                       break;
-       }
 
-       if (type == MAXQUOTAS)
-               RETURN(0);
+       mutex_init(&cli->cl_quota_mutex);
 
-       for (i = 0; i < type; i++)
-               cfs_hash_putref(cli->cl_quota_hash[i]);
+       xa_init(&cli->cl_quota_exceeded_ids);
 
-       RETURN(-ENOMEM);
+       return 0;
 }
 
-int osc_quota_cleanup(struct obd_device *obd)
+void osc_quota_cleanup(struct obd_device *obd)
 {
-       struct client_obd     *cli = &obd->u.cli;
-       int type;
-       ENTRY;
+       struct client_obd *cli = &obd->u.cli;
+       unsigned long qid;
+       u8 *qtypes;
 
-       for (type = 0; type < MAXQUOTAS; type++)
-               cfs_hash_putref(cli->cl_quota_hash[type]);
+       xa_for_each(&cli->cl_quota_exceeded_ids, qid, qtypes)
+               xa_erase(&cli->cl_quota_exceeded_ids, qid);
 
-       RETURN(0);
+       xa_destroy(&cli->cl_quota_exceeded_ids);
 }
 
 int osc_quotactl(struct obd_device *unused, struct obd_export *exp,
                  struct obd_quotactl *oqctl)
 {
-        struct ptlrpc_request *req;
-        struct obd_quotactl   *oqc;
-        int                    rc;
-        ENTRY;
+       struct ptlrpc_request   *req;
+       struct obd_quotactl     *oqc;
+       int                      rc;
+
+       ENTRY;
+
+       req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
+                                       &RQF_OST_QUOTACTL, LUSTRE_OST_VERSION,
+                                       OST_QUOTACTL);
+       if (req == NULL)
+               RETURN(-ENOMEM);
 
-        req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
-                                        &RQF_OST_QUOTACTL, LUSTRE_OST_VERSION,
-                                        OST_QUOTACTL);
-        if (req == NULL)
-                RETURN(-ENOMEM);
+       if (oqctl->qc_cmd == LUSTRE_Q_ITEROQUOTA)
+               req_capsule_set_size(&req->rq_pill, &RMF_OBD_QUOTA_ITER,
+                                    RCL_SERVER, LQUOTA_ITER_BUFLEN);
+       else
+               req_capsule_set_size(&req->rq_pill, &RMF_OBD_QUOTA_ITER,
+                                    RCL_SERVER, 0);
 
         oqc = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
         *oqc = *oqctl;
@@ -270,60 +236,52 @@ int osc_quotactl(struct obd_device *unused, struct obd_export *exp,
         if (rc)
                 CERROR("ptlrpc_queue_wait failed, rc: %d\n", rc);
 
-        if (req->rq_repmsg &&
-            (oqc = req_capsule_server_get(&req->rq_pill, &RMF_OBD_QUOTACTL))) {
-                *oqctl = *oqc;
-        } else if (!rc) {
-                CERROR ("Can't unpack obd_quotactl\n");
-                rc = -EPROTO;
-        }
-        ptlrpc_req_finished(req);
+       if (req->rq_repmsg) {
+               struct list_head *lst = (struct list_head *)oqctl->qc_iter_list;
 
-        RETURN(rc);
-}
+               oqc = req_capsule_server_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
+               if (!oqc)
+                       GOTO(out, rc = -EPROTO);
 
-int osc_quotacheck(struct obd_device *unused, struct obd_export *exp,
-                   struct obd_quotactl *oqctl)
-{
-        struct client_obd       *cli = &exp->exp_obd->u.cli;
-        struct ptlrpc_request   *req;
-        struct obd_quotactl     *body;
-        int                      rc;
-        ENTRY;
+               *oqctl = *oqc;
 
-        req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
-                                        &RQF_OST_QUOTACHECK, LUSTRE_OST_VERSION,
-                                        OST_QUOTACHECK);
-        if (req == NULL)
-                RETURN(-ENOMEM);
+               if (oqctl->qc_cmd == LUSTRE_Q_ITEROQUOTA) {
+                       void *buffer;
+                       struct lquota_iter *iter;
 
-        body = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
-        *body = *oqctl;
+                       buffer = req_capsule_server_get(&req->rq_pill,
+                                                       &RMF_OBD_QUOTA_ITER);
 
-        ptlrpc_request_set_replen(req);
+                       if (buffer == NULL) {
+                               CDEBUG(D_QUOTA, "%s: no buffer in iter req\n",
+                                      exp->exp_obd->obd_name);
 
-        /* the next poll will find -ENODATA, that means quotacheck is
-         * going on */
-        cli->cl_qchk_stat = -ENODATA;
-        rc = ptlrpc_queue_wait(req);
-        if (rc)
-                cli->cl_qchk_stat = rc;
-        ptlrpc_req_finished(req);
-        RETURN(rc);
-}
+                               rc = -EPROTO;
+                               GOTO(out, rc);
+                       }
 
-int osc_quota_poll_check(struct obd_export *exp, struct if_quotacheck *qchk)
-{
-        struct client_obd *cli = &exp->exp_obd->u.cli;
-        int rc;
-        ENTRY;
+                       OBD_ALLOC_LARGE(iter,
+                              sizeof(struct lquota_iter) + LQUOTA_ITER_BUFLEN);
+                       if (iter == NULL)
+                               GOTO(out, rc = -ENOMEM);
+
+                       INIT_LIST_HEAD(&iter->li_link);
+                       list_add(&iter->li_link, lst);
 
-        qchk->obd_uuid = cli->cl_target_uuid;
-        memcpy(qchk->obd_type, LUSTRE_OST_NAME, strlen(LUSTRE_OST_NAME));
+                       memcpy(iter->li_buffer, buffer, LQUOTA_ITER_BUFLEN);
+                       iter->li_dt_size = oqctl->qc_iter_dt_buflen;
+                       oqctl->qc_iter_md_buflen = 0;
+                       oqctl->qc_iter_dt_buflen = 0;
+               }
+       } else if (!rc) {
+               CERROR("%s: cannot unpack obd_quotactl: rc = %d\n",
+                      exp->exp_obd->obd_name, rc);
+
+               rc = -EPROTO;
+       }
 
-        rc = cli->cl_qchk_stat;
-        /* the client is not the previous one */
-        if (rc == CL_NOT_QUOTACHECKED)
-                rc = -EINTR;
-        RETURN(rc);
+out:
+       ptlrpc_req_finished(req);
+
+       RETURN(rc);
 }