/*
* Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
*
- * Copyright (c) 2012, Whamcloud, Inc.
+ * Copyright (c) 2012, 2017, Intel Corporation.
*
* Code originally extracted from quota directory
*/
-#ifndef __KERNEL__
-# include <liblustre.h>
-#endif
-#include <obd_ost.h>
-#include "osc_internal.h"
-
-struct osc_quota_info {
- cfs_list_t oqi_hash; /* hash list */
- struct client_obd *oqi_cli; /* osc obd */
- unsigned int oqi_id; /* uid/gid of a file */
- short oqi_type; /* quota type */
-};
-
-cfs_spinlock_t qinfo_list_lock = CFS_SPIN_LOCK_UNLOCKED;
-
-static cfs_list_t qinfo_hash[NR_DQHASH];
-/* SLAB cache for client quota context */
-cfs_mem_cache_t *qinfo_cachep = NULL;
-
-static inline int hashfn(struct client_obd *cli, unsigned long id, int type)
- __attribute__((__const__));
-
-static inline int hashfn(struct client_obd *cli, unsigned long id, int type)
-{
- unsigned long tmp = ((unsigned long)cli>>6) ^ id;
- tmp = (tmp * (MAXQUOTAS - type)) % NR_DQHASH;
- return tmp;
-}
-
-/* caller must hold qinfo_list_lock */
-static inline void insert_qinfo_hash(struct osc_quota_info *oqi)
-{
- cfs_list_t *head = qinfo_hash +
- hashfn(oqi->oqi_cli, oqi->oqi_id, oqi->oqi_type);
-
- LASSERT_SPIN_LOCKED(&qinfo_list_lock);
- cfs_list_add(&oqi->oqi_hash, head);
-}
-
-/* caller must hold qinfo_list_lock */
-static inline void remove_qinfo_hash(struct osc_quota_info *oqi)
-{
- LASSERT_SPIN_LOCKED(&qinfo_list_lock);
- cfs_list_del_init(&oqi->oqi_hash);
-}
-
-/* caller must hold qinfo_list_lock */
-static inline struct osc_quota_info *find_qinfo(struct client_obd *cli,
- unsigned int id, int type)
-{
- struct osc_quota_info *oqi;
- unsigned int hashent = hashfn(cli, id, type);
- ENTRY;
+#include <obd_class.h>
+#include <lustre_osc.h>
+#include <lustre_quota.h>
- LASSERT_SPIN_LOCKED(&qinfo_list_lock);
- cfs_list_for_each_entry(oqi, &qinfo_hash[hashent], oqi_hash) {
- if (oqi->oqi_cli == cli &&
- oqi->oqi_id == id && oqi->oqi_type == type)
- RETURN(oqi);
- }
- RETURN(NULL);
-}
+#include "osc_internal.h"
-static struct osc_quota_info *alloc_qinfo(struct client_obd *cli,
- unsigned int id, int type)
+int osc_quota_chkdq(struct client_obd *cli, const unsigned int qid[])
{
- struct osc_quota_info *oqi;
- ENTRY;
-
- OBD_SLAB_ALLOC(oqi, qinfo_cachep, CFS_ALLOC_IO, sizeof(*oqi));
- if(!oqi)
- RETURN(NULL);
-
- CFS_INIT_LIST_HEAD(&oqi->oqi_hash);
- oqi->oqi_cli = cli;
- oqi->oqi_id = id;
- oqi->oqi_type = type;
-
- RETURN(oqi);
+ int type;
+
+ ENTRY;
+ for (type = 0; type < LL_MAXQUOTAS; type++) {
+ u8 *qtype;
+
+ qtype = xa_load(&cli->cl_quota_exceeded_ids, qid[type]);
+ if (qtype && (xa_to_value(qtype) & BIT(type))) {
+ /* the slot is busy, the user is about to run out of
+ * quota space on this OST
+ */
+ CDEBUG(D_QUOTA, "chkdq found noquota for %s %d\n",
+ qtype_name(type), qid[type]);
+ RETURN(-EDQUOT);
+ }
+ }
+
+ RETURN(0);
}
-static void free_qinfo(struct osc_quota_info *oqi)
+static inline u32 md_quota_flag(int qtype)
{
- OBD_SLAB_FREE(oqi, qinfo_cachep, sizeof(*oqi));
+ switch (qtype) {
+ case USRQUOTA:
+ return OBD_MD_FLUSRQUOTA;
+ case GRPQUOTA:
+ return OBD_MD_FLGRPQUOTA;
+ case PRJQUOTA:
+ return OBD_MD_FLPRJQUOTA;
+ default:
+ return 0;
+ }
}
-int osc_quota_chkdq(struct client_obd *cli, const unsigned int qid[])
+static inline u32 fl_quota_flag(int qtype)
{
- unsigned int id;
- int cnt, rc = QUOTA_OK;
- ENTRY;
-
- cfs_spin_lock(&qinfo_list_lock);
- for (cnt = 0; cnt < MAXQUOTAS; cnt++) {
- struct osc_quota_info *oqi = NULL;
-
- id = (cnt == USRQUOTA) ? qid[USRQUOTA] : qid[GRPQUOTA];
- oqi = find_qinfo(cli, id, cnt);
- if (oqi) {
- rc = NO_QUOTA;
- break;
- }
- }
- cfs_spin_unlock(&qinfo_list_lock);
-
- if (rc == NO_QUOTA)
- CDEBUG(D_QUOTA, "chkdq found noquota for %s %d\n",
- cnt == USRQUOTA ? "user" : "group", id);
- RETURN(rc);
+ switch (qtype) {
+ case USRQUOTA:
+ return OBD_FL_NO_USRQUOTA;
+ case GRPQUOTA:
+ return OBD_FL_NO_GRPQUOTA;
+ case PRJQUOTA:
+ return OBD_FL_NO_PRJQUOTA;
+ default:
+ return 0;
+ }
}
-int osc_quota_setdq(struct client_obd *cli, const unsigned int qid[],
- obd_flag valid, obd_flag flags)
+int osc_quota_setdq(struct client_obd *cli, u64 xid, const unsigned int qid[],
+ u64 valid, u32 flags)
{
- unsigned int id;
- obd_flag noquota;
- int cnt, rc = 0;
- ENTRY;
-
- for (cnt = 0; cnt < MAXQUOTAS; cnt++) {
- struct osc_quota_info *oqi = NULL, *old;
-
- if (!(valid & ((cnt == USRQUOTA) ?
- OBD_MD_FLUSRQUOTA : OBD_MD_FLGRPQUOTA)))
- continue;
-
- id = (cnt == USRQUOTA) ? qid[USRQUOTA] : qid[GRPQUOTA];
- noquota = (cnt == USRQUOTA) ?
- (flags & OBD_FL_NO_USRQUOTA) : (flags & OBD_FL_NO_GRPQUOTA);
-
- if (noquota) {
- oqi = alloc_qinfo(cli, id, cnt);
- if (!oqi) {
- rc = -ENOMEM;
- CDEBUG(D_QUOTA, "setdq for %s %d failed, "
- "(rc = %d)\n",
- cnt == USRQUOTA ? "user" : "group",
- id, rc);
- break;
- }
- }
-
- cfs_spin_lock(&qinfo_list_lock);
- old = find_qinfo(cli, id, cnt);
- if (old && !noquota)
- remove_qinfo_hash(old);
- else if (!old && noquota)
- insert_qinfo_hash(oqi);
- cfs_spin_unlock(&qinfo_list_lock);
-
- if (old && !noquota)
- CDEBUG(D_QUOTA, "setdq to remove for %s %d\n",
- cnt == USRQUOTA ? "user" : "group", id);
- else if (!old && noquota)
- CDEBUG(D_QUOTA, "setdq to insert for %s %d\n",
- cnt == USRQUOTA ? "user" : "group", id);
-
- if (old) {
- if (noquota)
- free_qinfo(oqi);
- else
- free_qinfo(old);
- }
- }
- RETURN(rc);
-}
+ int type;
+ int rc = 0;
-int osc_quota_cleanup(struct obd_device *obd)
-{
- struct client_obd *cli = &obd->u.cli;
- struct osc_quota_info *oqi, *n;
- int i;
ENTRY;
-
- cfs_spin_lock(&qinfo_list_lock);
- for (i = 0; i < NR_DQHASH; i++) {
- cfs_list_for_each_entry_safe(oqi, n, &qinfo_hash[i], oqi_hash) {
- if (oqi->oqi_cli != cli)
- continue;
- remove_qinfo_hash(oqi);
- free_qinfo(oqi);
- }
- }
- cfs_spin_unlock(&qinfo_list_lock);
-
- RETURN(0);
+ if ((valid & (OBD_MD_FLALLQUOTA)) == 0)
+ RETURN(0);
+
+ mutex_lock(&cli->cl_quota_mutex);
+ cli->cl_root_squash = !!(flags & OBD_FL_ROOT_SQUASH);
+ cli->cl_root_prjquota = !!(flags & OBD_FL_ROOT_PRJQUOTA);
+ /* still mark the quots is running out for the old request, because it
+ * could be processed after the new request at OST, the side effect is
+ * the following request will be processed synchronously, but it will
+ * not break the quota enforcement. */
+ if (cli->cl_quota_last_xid > xid && !(flags & OBD_FL_NO_QUOTA_ALL))
+ GOTO(out_unlock, rc);
+
+ if (cli->cl_quota_last_xid < xid)
+ cli->cl_quota_last_xid = xid;
+
+ for (type = 0; type < LL_MAXQUOTAS; type++) {
+ unsigned long bits = 0;
+ u8 *qtypes;
+
+ if ((valid & md_quota_flag(type)) == 0)
+ continue;
+
+ /* lookup the quota IDs in the ID xarray */
+ qtypes = xa_load(&cli->cl_quota_exceeded_ids, qid[type]);
+ if ((flags & fl_quota_flag(type)) != 0) {
+ /* This ID is getting close to its quota limit, let's
+ * switch to sync I/O
+ */
+ if (qtypes) { /* ID already cached */
+ bits = xa_to_value(qtypes);
+ /* test if ID type already set */
+ if (!(bits & BIT(type))) {
+ bits |= BIT(type);
+ rc = xa_err(xa_store(&cli->cl_quota_exceeded_ids,
+ qid[type],
+ xa_mk_value(bits),
+ GFP_KERNEL));
+ if (rc < 0)
+ GOTO(out_unlock, rc);
+ }
+ continue;
+ }
+
+ /* Only insert if we see the this ID for the
+ * very first time.
+ */
+ bits |= BIT(type);
+ rc = ll_xa_insert(&cli->cl_quota_exceeded_ids,
+ qid[type], xa_mk_value(bits),
+ GFP_KERNEL);
+ if (rc == -ENOMEM)
+ break;
+
+ CDEBUG(D_QUOTA, "%s: setdq to insert for %s %d: rc = %d\n",
+ cli_name(cli), qtype_name(type), qid[type], rc);
+ } else {
+ /* This ID is now off the hook, let's remove it from
+ * the xarray
+ */
+ if (!qtypes)
+ continue;
+
+ bits = xa_to_value(qtypes);
+ if (!(bits & BIT(type)))
+ continue;
+
+ bits &= ~BIT(type);
+ if (bits) {
+ rc = xa_err(xa_store(&cli->cl_quota_exceeded_ids,
+ qid[type],
+ xa_mk_value(bits),
+ GFP_KERNEL));
+ if (rc < 0)
+ GOTO(out_unlock, rc);
+ } else {
+ xa_erase(&cli->cl_quota_exceeded_ids, qid[type]);
+ }
+
+ CDEBUG(D_QUOTA, "%s: setdq to remove for %s %d\n",
+ cli_name(cli), qtype_name(type), qid[type]);
+ }
+ }
+
+out_unlock:
+ mutex_unlock(&cli->cl_quota_mutex);
+ RETURN(rc);
}
-int osc_quota_init()
+int osc_quota_setup(struct obd_device *obd)
{
- int i;
- ENTRY;
+ struct client_obd *cli = &obd->u.cli;
- LASSERT(qinfo_cachep == NULL);
- qinfo_cachep = cfs_mem_cache_create("osc_quota_info",
- sizeof(struct osc_quota_info),
- 0, 0);
- if (!qinfo_cachep)
- RETURN(-ENOMEM);
+ mutex_init(&cli->cl_quota_mutex);
- for (i = 0; i < NR_DQHASH; i++)
- CFS_INIT_LIST_HEAD(qinfo_hash + i);
+ xa_init(&cli->cl_quota_exceeded_ids);
- RETURN(0);
+ return 0;
}
-int osc_quota_exit()
+void osc_quota_cleanup(struct obd_device *obd)
{
- struct osc_quota_info *oqi, *n;
- int i, rc;
- ENTRY;
+ struct client_obd *cli = &obd->u.cli;
+ unsigned long qid;
+ u8 *qtypes;
- cfs_spin_lock(&qinfo_list_lock);
- for (i = 0; i < NR_DQHASH; i++) {
- cfs_list_for_each_entry_safe(oqi, n, &qinfo_hash[i], oqi_hash) {
- remove_qinfo_hash(oqi);
- free_qinfo(oqi);
- }
- }
- cfs_spin_unlock(&qinfo_list_lock);
+ xa_for_each(&cli->cl_quota_exceeded_ids, qid, qtypes)
+ xa_erase(&cli->cl_quota_exceeded_ids, qid);
- rc = cfs_mem_cache_destroy(qinfo_cachep);
- LASSERTF(rc == 0, "couldn't destory qinfo_cachep slab\n");
- qinfo_cachep = NULL;
-
- RETURN(0);
+ xa_destroy(&cli->cl_quota_exceeded_ids);
}
int osc_quotactl(struct obd_device *unused, struct obd_export *exp,
struct obd_quotactl *oqctl)
{
- struct ptlrpc_request *req;
- struct obd_quotactl *oqc;
- int rc;
- ENTRY;
+ struct ptlrpc_request *req;
+ struct obd_quotactl *oqc;
+ int rc;
+
+ ENTRY;
+
+ req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
+ &RQF_OST_QUOTACTL, LUSTRE_OST_VERSION,
+ OST_QUOTACTL);
+ if (req == NULL)
+ RETURN(-ENOMEM);
- req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
- &RQF_OST_QUOTACTL, LUSTRE_OST_VERSION,
- OST_QUOTACTL);
- if (req == NULL)
- RETURN(-ENOMEM);
+ if (oqctl->qc_cmd == LUSTRE_Q_ITEROQUOTA)
+ req_capsule_set_size(&req->rq_pill, &RMF_OBD_QUOTA_ITER,
+ RCL_SERVER, LQUOTA_ITER_BUFLEN);
+ else
+ req_capsule_set_size(&req->rq_pill, &RMF_OBD_QUOTA_ITER,
+ RCL_SERVER, 0);
oqc = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
*oqc = *oqctl;
if (rc)
CERROR("ptlrpc_queue_wait failed, rc: %d\n", rc);
- if (req->rq_repmsg &&
- (oqc = req_capsule_server_get(&req->rq_pill, &RMF_OBD_QUOTACTL))) {
- *oqctl = *oqc;
- } else if (!rc) {
- CERROR ("Can't unpack obd_quotactl\n");
- rc = -EPROTO;
- }
- ptlrpc_req_finished(req);
-
- RETURN(rc);
-}
-
-int osc_quotacheck(struct obd_device *unused, struct obd_export *exp,
- struct obd_quotactl *oqctl)
-{
- struct client_obd *cli = &exp->exp_obd->u.cli;
- struct ptlrpc_request *req;
- struct obd_quotactl *body;
- int rc;
- ENTRY;
+ if (req->rq_repmsg) {
+ struct list_head *lst = (struct list_head *)oqctl->qc_iter_list;
- req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
- &RQF_OST_QUOTACHECK, LUSTRE_OST_VERSION,
- OST_QUOTACHECK);
- if (req == NULL)
- RETURN(-ENOMEM);
+ oqc = req_capsule_server_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
+ if (!oqc)
+ GOTO(out, rc = -EPROTO);
- body = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
- *body = *oqctl;
-
- ptlrpc_request_set_replen(req);
-
- /* the next poll will find -ENODATA, that means quotacheck is
- * going on */
- cli->cl_qchk_stat = -ENODATA;
- rc = ptlrpc_queue_wait(req);
- if (rc)
- cli->cl_qchk_stat = rc;
- ptlrpc_req_finished(req);
- RETURN(rc);
-}
+ *oqctl = *oqc;
-int osc_quota_poll_check(struct obd_export *exp, struct if_quotacheck *qchk)
-{
- struct client_obd *cli = &exp->exp_obd->u.cli;
- int rc;
- ENTRY;
+ if (oqctl->qc_cmd == LUSTRE_Q_ITEROQUOTA) {
+ void *buffer;
+ struct lquota_iter *iter;
- qchk->obd_uuid = cli->cl_target_uuid;
- memcpy(qchk->obd_type, LUSTRE_OST_NAME, strlen(LUSTRE_OST_NAME));
+ buffer = req_capsule_server_get(&req->rq_pill,
+ &RMF_OBD_QUOTA_ITER);
- rc = cli->cl_qchk_stat;
- /* the client is not the previous one */
- if (rc == CL_NOT_QUOTACHECKED)
- rc = -EINTR;
- RETURN(rc);
-}
+ if (buffer == NULL) {
+ CDEBUG(D_QUOTA, "%s: no buffer in iter req\n",
+ exp->exp_obd->obd_name);
-int osc_quota_adjust_qunit(struct obd_export *exp,
- struct quota_adjust_qunit *oqaq,
- struct lustre_quota_ctxt *qctxt,
- struct ptlrpc_request_set *rqset)
-{
- struct ptlrpc_request *req;
- struct quota_adjust_qunit *oqa;
- int rc = 0;
- ENTRY;
+ rc = -EPROTO;
+ GOTO(out, rc);
+ }
- /* client don't support this kind of operation, abort it */
- if (!(exp->exp_connect_flags & OBD_CONNECT_CHANGE_QS)) {
- CDEBUG(D_QUOTA, "osc: %s don't support change qunit size\n",
- exp->exp_obd->obd_name);
- RETURN(rc);
- }
- if (strcmp(exp->exp_obd->obd_type->typ_name, LUSTRE_OSC_NAME))
- RETURN(-EINVAL);
+ OBD_ALLOC_LARGE(iter,
+ sizeof(struct lquota_iter) + LQUOTA_ITER_BUFLEN);
+ if (iter == NULL)
+ GOTO(out, rc = -ENOMEM);
- LASSERT(rqset);
+ INIT_LIST_HEAD(&iter->li_link);
+ list_add(&iter->li_link, lst);
- req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
- &RQF_OST_QUOTA_ADJUST_QUNIT,
- LUSTRE_OST_VERSION,
- OST_QUOTA_ADJUST_QUNIT);
- if (req == NULL)
- RETURN(-ENOMEM);
+ memcpy(iter->li_buffer, buffer, LQUOTA_ITER_BUFLEN);
+ iter->li_dt_size = oqctl->qc_iter_dt_buflen;
+ oqctl->qc_iter_md_buflen = 0;
+ oqctl->qc_iter_dt_buflen = 0;
+ }
+ } else if (!rc) {
+ CERROR("%s: cannot unpack obd_quotactl: rc = %d\n",
+ exp->exp_obd->obd_name, rc);
- oqa = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_ADJUST_QUNIT);
- *oqa = *oqaq;
+ rc = -EPROTO;
+ }
- ptlrpc_request_set_replen(req);
+out:
+ ptlrpc_req_finished(req);
- ptlrpc_set_add_req(rqset, req);
- RETURN(rc);
+ RETURN(rc);
}