* GPL HEADER END
*/
/*
- * Copyright (c) 2012 Intel, Inc.
+ * Copyright (c) 2012, Intel Corporation.
* Use is subject to license terms.
*
* Author: Johann Lombardi <johann.lombardi@intel.com>
struct quota_body *repbody;
struct obd_uuid *uuid;
struct lquota_lvb *lvb;
- int rc;
+ struct ldlm_resource *res = (*lockp)->l_resource;
+ int rc, lvb_len;
ENTRY;
req_capsule_extend(&req->rq_pill, &RQF_LDLM_INTENT_QUOTA);
+ req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
+ ldlm_lvbo_size(*lockp));
/* extract quota body and intent opc */
it = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
uuid = &(*lockp)->l_export->exp_client_uuid;
switch (it->opc) {
- case IT_QUOTA_DQACQ:
- /* XXX: to be added in a next patch */
- GOTO(out, -EOPNOTSUPP);
+ case IT_QUOTA_DQACQ: {
+ struct lquota_entry *lqe;
+ struct ldlm_lock *lock;
+
+ if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] == 0)
+ /* acquire on global lock? something is wrong ... */
+ GOTO(out, rc = -EPROTO);
+
+ /* verify global lock isn't stale */
+ if (!lustre_handle_is_used(&reqbody->qb_glb_lockh))
+ GOTO(out, rc = -ENOLCK);
+
+ lock = ldlm_handle2lock(&reqbody->qb_glb_lockh);
+ if (lock == NULL)
+ GOTO(out, rc = -ENOLCK);
+ LDLM_LOCK_PUT(lock);
+
+ lqe = res->lr_lvb_data;
+ LASSERT(lqe != NULL);
+ lqe_getref(lqe);
+
+ /* acquire quota space */
+ rc = qmt_dqacq0(env, lqe, qmt, uuid, reqbody->qb_flags,
+ reqbody->qb_count, reqbody->qb_usage,
+ repbody);
+ lqe_putref(lqe);
+ if (rc)
+ GOTO(out, rc);
break;
+ }
case IT_QUOTA_CONN:
/* new connection from slave */
+
+ if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0)
+ /* connection on per-ID lock? something is wrong ... */
+ GOTO(out, rc = -EPROTO);
+
rc = qmt_pool_new_conn(env, qmt, &reqbody->qb_fid,
&repbody->qb_slv_fid,
&repbody->qb_slv_ver, uuid);
}
/* on success, pack lvb in reply */
- req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
- ldlm_lvbo_size(*lockp));
lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
- ldlm_lvbo_fill(*lockp, lvb, ldlm_lvbo_size(*lockp));
+ lvb_len = ldlm_lvbo_size(*lockp);
+ lvb_len = ldlm_lvbo_fill(*lockp, lvb, lvb_len);
+ req_capsule_shrink(&req->rq_pill, &RMF_DLM_LVB, lvb_len, RCL_SERVER);
EXIT;
out:
return rc;
struct qmt_device *qmt = lu2qmt_dev(ld);
struct lquota_entry *lqe;
struct lquota_lvb *lvb;
+ struct ldlm_lock *lock;
+ struct obd_export *exp;
int rc = 0;
ENTRY;
/* no need to update lvb for global quota locks */
RETURN(0);
+ lvb = req_capsule_server_swab_get(&req->rq_pill, &RMF_DLM_LVB,
+ lustre_swab_lquota_lvb);
+ if (lvb == NULL) {
+ CERROR("%s: failed to extract lvb from request\n",
+ qmt->qmt_svname);
+ RETURN(-EFAULT);
+ }
+
lqe = res->lr_lvb_data;
LASSERT(lqe != NULL);
+ lqe_getref(lqe);
+
+ LQUOTA_DEBUG(lqe, "releasing:"LPU64" may release:"LPU64,
+ lvb->lvb_id_rel, lvb->lvb_id_may_rel);
+
+ if (lvb->lvb_id_rel == 0) {
+ /* nothing to release */
+ if (lvb->lvb_id_may_rel != 0)
+ /* but might still release later ... */
+ lqe->lqe_may_rel += lvb->lvb_id_may_rel;
+ GOTO(out_lqe, rc = 0);
+ }
/* allocate environement */
OBD_ALLOC_PTR(env);
if (env == NULL)
- RETURN(-ENOMEM);
+ GOTO(out_lqe, rc = -ENOMEM);
/* initialize environment */
rc = lu_env_init(env, LCT_MD_THREAD);
- if (rc) {
- OBD_FREE_PTR(env);
- RETURN(rc);
- }
+ if (rc)
+ GOTO(out_env, rc);
qti = qmt_info(env);
- lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
- if (lvb == NULL) {
- CERROR("%s: failed to extract lvb from request\n",
+ /* The request is a glimpse callback which was sent via the
+ * reverse import to the slave. What we care about here is the
+ * export associated with the slave and req->rq_export is
+ * definitely not what we are looking for (it is actually set to
+ * NULL here).
+ * Therefore we extract the lock from the request argument
+ * and use lock->l_export. */
+ lock = ldlm_request_lock(req);
+ if (IS_ERR(lock)) {
+ CERROR("%s: failed to get lock from request!\n",
qmt->qmt_svname);
- GOTO(out, rc);
+ GOTO(out_env_init, rc = PTR_ERR(lock));
}
- /* XXX: Space release handling to be added in a next patch */
+ exp = class_export_get(lock->l_export);
+ if (exp == NULL) {
+ CERROR("%s: failed to get export from lock!\n",
+ qmt->qmt_svname);
+ GOTO(out_env_init, rc = -EFAULT);
+ }
+ /* release quota space */
+ rc = qmt_dqacq0(env, lqe, qmt, &exp->exp_client_uuid,
+ QUOTA_DQACQ_FL_REL, lvb->lvb_id_rel, 0, &qti->qti_body);
+ if (rc || qti->qti_body.qb_count != lvb->lvb_id_rel)
+ LQUOTA_ERROR(lqe, "failed to release quota space on glimpse "
+ LPU64"!="LPU64" rc:%d\n", qti->qti_body.qb_count,
+ lvb->lvb_id_rel, rc);
+ class_export_put(exp);
+ if (rc)
+ GOTO(out_env_init, rc);
EXIT;
-out:
+out_env_init:
lu_env_fini(env);
+out_env:
OBD_FREE_PTR(env);
+out_lqe:
+ lqe_putref(lqe);
return rc;
}
qti->qti_gl_desc.lquota_desc.gl_flags = 0;
qti->qti_gl_desc.lquota_desc.gl_hardlimit = lqe->lqe_hardlimit;
qti->qti_gl_desc.lquota_desc.gl_softlimit = lqe->lqe_softlimit;
+ qti->qti_gl_desc.lquota_desc.gl_time = lqe->lqe_gracetime;
qti->qti_gl_desc.lquota_desc.gl_ver = ver;
/* look up ldlm resource associated with global index */
* opportunity to enqueue quota lock yet. */
LQUOTA_DEBUG(lqe, "failed to lookup ldlm resource for per-ID "
"lock "DFID, PFID(&qti->qti_fid));
+ lqe_write_lock(lqe);
+ if (lqe->lqe_revoke_time == 0 &&
+ lqe->lqe_qunit == pool->qpi_least_qunit)
+ lqe->lqe_revoke_time = cfs_time_current_64();
+ lqe_write_unlock(lqe);
RETURN_EXIT;
}
- lqe_read_lock(lqe);
+ lqe_write_lock(lqe);
/* The purpose of glimpse callback on per-ID lock is twofold:
* - notify slaves of new qunit value and hope they will release some
* spare quota space in return
else
qti->qti_gl_desc.lquota_desc.gl_flags = 0;
qti->qti_gl_desc.lquota_desc.gl_qunit = lqe->lqe_qunit;
- lqe_read_unlock(lqe);
+
+ if (lqe->lqe_revoke_time == 0 &&
+ qti->qti_gl_desc.lquota_desc.gl_qunit == pool->qpi_least_qunit)
+ /* reset lqe_may_rel, it will be updated on glimpse callback
+ * replies if needed */
+ lqe->lqe_may_rel = 0;
/* The rebalance thread is the only thread which can issue glimpses */
LASSERT(!lqe->lqe_gl);
lqe->lqe_gl = true;
+ lqe_write_unlock(lqe);
/* issue glimpse callback to slaves */
rc = qmt_glimpse_lock(env, qmt, res, &qti->qti_gl_desc,
uuid ? qmt_id_lock_cb : NULL, (void *)uuid);
+ lqe_write_lock(lqe);
+ if (lqe->lqe_revoke_time == 0 &&
+ qti->qti_gl_desc.lquota_desc.gl_qunit == pool->qpi_least_qunit &&
+ lqe->lqe_qunit == pool->qpi_least_qunit) {
+ lqe->lqe_revoke_time = cfs_time_current_64();
+ qmt_adjust_edquot(lqe, cfs_time_current_sec());
+ }
LASSERT(lqe->lqe_gl);
lqe->lqe_gl = false;
+ lqe_write_unlock(lqe);
ldlm_resource_putref(res);
EXIT;
ENTRY;
lqe_getref(lqe);
- cfs_spin_lock(&qmt->qmt_reba_lock);
+ spin_lock(&qmt->qmt_reba_lock);
if (!qmt->qmt_stopping && cfs_list_empty(&lqe->lqe_link)) {
cfs_list_add_tail(&lqe->lqe_link, &qmt->qmt_reba_list);
added = true;
}
- cfs_spin_unlock(&qmt->qmt_reba_lock);
+ spin_unlock(&qmt->qmt_reba_lock);
if (added)
cfs_waitq_signal(&qmt->qmt_reba_thread.t_ctl_waitq);
!cfs_list_empty(&qmt->qmt_reba_list) ||
!thread_is_running(thread), &lwi);
- cfs_spin_lock(&qmt->qmt_reba_lock);
+ spin_lock(&qmt->qmt_reba_lock);
cfs_list_for_each_entry_safe(lqe, tmp, &qmt->qmt_reba_list,
lqe_link) {
cfs_list_del_init(&lqe->lqe_link);
- cfs_spin_unlock(&qmt->qmt_reba_lock);
+ spin_unlock(&qmt->qmt_reba_lock);
if (thread_is_running(thread))
qmt_id_lock_glimpse(env, qmt, lqe, NULL);
lqe_putref(lqe);
- cfs_spin_lock(&qmt->qmt_reba_lock);
+ spin_lock(&qmt->qmt_reba_lock);
}
- cfs_spin_unlock(&qmt->qmt_reba_lock);
+ spin_unlock(&qmt->qmt_reba_lock);
if (!thread_is_running(thread))
break;