struct quota_body *repbody;
struct obd_uuid *uuid;
struct lquota_lvb *lvb;
+ struct ldlm_resource *res = (*lockp)->l_resource;
int rc;
ENTRY;
uuid = &(*lockp)->l_export->exp_client_uuid;
switch (it->opc) {
- case IT_QUOTA_DQACQ:
- /* XXX: to be added in a next patch */
- GOTO(out, -EOPNOTSUPP);
+ case IT_QUOTA_DQACQ: {
+ struct lquota_entry *lqe;
+ struct ldlm_lock *lock;
+
+ if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] == 0)
+ /* acquire on global lock? something is wrong ... */
+ GOTO(out, rc = -EPROTO);
+
+ /* verify global lock isn't stale */
+ if (!lustre_handle_is_used(&reqbody->qb_glb_lockh))
+ GOTO(out, rc = -ENOLCK);
+
+ lock = ldlm_handle2lock(&reqbody->qb_glb_lockh);
+ if (lock == NULL)
+ GOTO(out, rc = -ENOLCK);
+ LDLM_LOCK_PUT(lock);
+
+ lqe = res->lr_lvb_data;
+ LASSERT(lqe != NULL);
+ lqe_getref(lqe);
+
+ /* acquire quota space */
+ rc = qmt_dqacq0(env, lqe, qmt, uuid, reqbody->qb_flags,
+ reqbody->qb_count, reqbody->qb_usage,
+ repbody);
+ lqe_putref(lqe);
+ if (rc)
+ GOTO(out, rc);
break;
+ }
case IT_QUOTA_CONN:
/* new connection from slave */
+
+ if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0)
+ /* connection on per-ID lock? something is wrong ... */
+ GOTO(out, rc = -EPROTO);
+
rc = qmt_pool_new_conn(env, qmt, &reqbody->qb_fid,
&repbody->qb_slv_fid,
&repbody->qb_slv_ver, uuid);
struct qmt_device *qmt = lu2qmt_dev(ld);
struct lquota_entry *lqe;
struct lquota_lvb *lvb;
+ struct ldlm_lock *lock;
+ struct obd_export *exp;
int rc = 0;
ENTRY;
/* no need to update lvb for global quota locks */
RETURN(0);
+ lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
+ if (lvb == NULL) {
+ CERROR("%s: failed to extract lvb from request\n",
+ qmt->qmt_svname);
+ RETURN(-EFAULT);
+ }
+
lqe = res->lr_lvb_data;
LASSERT(lqe != NULL);
+ lqe_getref(lqe);
+
+ LQUOTA_DEBUG(lqe, "releasing:"LPU64" may release:"LPU64,
+ lvb->lvb_id_rel, lvb->lvb_id_may_rel);
+
+ if (lvb->lvb_id_rel == 0) {
+ /* nothing to release */
+ if (lvb->lvb_id_may_rel != 0)
+ /* but might still release later ... */
+ lqe->lqe_may_rel += lvb->lvb_id_may_rel;
+ GOTO(out_lqe, rc = 0);
+ }
/* allocate environement */
OBD_ALLOC_PTR(env);
if (env == NULL)
- RETURN(-ENOMEM);
+ GOTO(out_lqe, rc = -ENOMEM);
/* initialize environment */
rc = lu_env_init(env, LCT_MD_THREAD);
- if (rc) {
- OBD_FREE_PTR(env);
- RETURN(rc);
- }
+ if (rc)
+ GOTO(out_env, rc);
qti = qmt_info(env);
- lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
- if (lvb == NULL) {
- CERROR("%s: failed to extract lvb from request\n",
+ /* The request is a glimpse callback which was sent via the
+ * reverse import to the slave. What we care about here is the
+ * export associated with the slave and req->rq_export is
+ * definitely not what we are looking for (it is actually set to
+ * NULL here).
+ * Therefore we extract the lock from the request argument
+ * and use lock->l_export. */
+ lock = ldlm_request_lock(req);
+ if (IS_ERR(lock)) {
+ CERROR("%s: failed to get lock from request!\n",
qmt->qmt_svname);
- GOTO(out, rc);
+ GOTO(out_env_init, rc = PTR_ERR(lock));
}
- /* XXX: Space release handling to be added in a next patch */
+ exp = class_export_get(lock->l_export);
+ if (exp == NULL) {
+ CERROR("%s: failed to get export from lock!\n",
+ qmt->qmt_svname);
+ GOTO(out_env_init, rc = -EFAULT);
+ }
+ /* release quota space */
+ rc = qmt_dqacq0(env, lqe, qmt, &exp->exp_client_uuid,
+ QUOTA_DQACQ_FL_REL, lvb->lvb_id_rel, 0, &qti->qti_body);
+ if (rc || qti->qti_body.qb_count != lvb->lvb_id_rel)
+ LQUOTA_ERROR(lqe, "failed to release quota space on glimpse "
+ LPU64"!="LPU64" rc:%d\n", qti->qti_body.qb_count,
+ lvb->lvb_id_rel, rc);
+ class_export_put(exp);
+ if (rc)
+ GOTO(out_env_init, rc);
EXIT;
-out:
+out_env_init:
lu_env_fini(env);
+out_env:
OBD_FREE_PTR(env);
+out_lqe:
+ lqe_putref(lqe);
return rc;
}
* opportunity to enqueue quota lock yet. */
LQUOTA_DEBUG(lqe, "failed to lookup ldlm resource for per-ID "
"lock "DFID, PFID(&qti->qti_fid));
+ lqe_write_lock(lqe);
+ if (lqe->lqe_revoke_time == 0 &&
+ lqe->lqe_qunit == pool->qpi_least_qunit)
+ lqe->lqe_revoke_time = cfs_time_current_64();
+ lqe_write_unlock(lqe);
RETURN_EXIT;
}
- lqe_read_lock(lqe);
+ lqe_write_lock(lqe);
/* The purpose of glimpse callback on per-ID lock is twofold:
* - notify slaves of new qunit value and hope they will release some
* spare quota space in return
else
qti->qti_gl_desc.lquota_desc.gl_flags = 0;
qti->qti_gl_desc.lquota_desc.gl_qunit = lqe->lqe_qunit;
- lqe_read_unlock(lqe);
+
+ if (lqe->lqe_revoke_time == 0 &&
+ qti->qti_gl_desc.lquota_desc.gl_qunit == pool->qpi_least_qunit)
+ /* reset lqe_may_rel, it will be updated on glimpse callback
+ * replies if needed */
+ lqe->lqe_may_rel = 0;
/* The rebalance thread is the only thread which can issue glimpses */
LASSERT(!lqe->lqe_gl);
lqe->lqe_gl = true;
+ lqe_write_unlock(lqe);
/* issue glimpse callback to slaves */
rc = qmt_glimpse_lock(env, qmt, res, &qti->qti_gl_desc,
uuid ? qmt_id_lock_cb : NULL, (void *)uuid);
+ lqe_write_lock(lqe);
+ if (lqe->lqe_revoke_time == 0 &&
+ qti->qti_gl_desc.lquota_desc.gl_qunit == pool->qpi_least_qunit &&
+ lqe->lqe_qunit == pool->qpi_least_qunit) {
+ lqe->lqe_revoke_time = cfs_time_current_64();
+ qmt_adjust_edquot(lqe, cfs_time_current_sec());
+ }
LASSERT(lqe->lqe_gl);
lqe->lqe_gl = false;
+ lqe_write_unlock(lqe);
ldlm_resource_putref(res);
EXIT;