* GPL HEADER END
*/
/*
- * Copyright (c) 2012, Intel Corporation.
+ * Copyright (c) 2012, 2013, Intel Corporation.
* Use is subject to license terms.
*
* Author: Johann Lombardi <johann.lombardi@intel.com>
* Author: Niu Yawei <yawei.niu@intel.com>
*/
-#ifndef EXPORT_SYMTAB
-# define EXPORT_SYMTAB
-#endif
-
#define DEBUG_SUBSYSTEM S_LQUOTA
#include <lustre_dlm.h>
#include "qsd_internal.h"
+typedef int (enqi_bl_cb_t)(struct ldlm_lock *lock,
+ struct ldlm_lock_desc *desc, void *data,
+ int flag);
+static enqi_bl_cb_t qsd_glb_blocking_ast, qsd_id_blocking_ast;
+
+typedef int (enqi_gl_cb_t)(struct ldlm_lock *lock, void *data);
+static enqi_gl_cb_t qsd_glb_glimpse_ast, qsd_id_glimpse_ast;
+
+struct ldlm_enqueue_info qsd_glb_einfo = {
+ .ei_type = LDLM_PLAIN,
+ .ei_mode = LCK_CR,
+ .ei_cb_bl = qsd_glb_blocking_ast,
+ .ei_cb_cp = ldlm_completion_ast,
+ .ei_cb_gl = qsd_glb_glimpse_ast,
+};
+
+struct ldlm_enqueue_info qsd_id_einfo = {
+ .ei_type = LDLM_PLAIN,
+ .ei_mode = LCK_CR,
+ .ei_cb_bl = qsd_id_blocking_ast,
+ .ei_cb_cp = ldlm_completion_ast,
+ .ei_cb_gl = qsd_id_glimpse_ast,
+};
+
/*
* Return qsd_qtype_info structure associated with a global lock
*
LDLM_DEBUG(lock, "blocking AST on global quota lock");
ldlm_lock2handle(lock, &lockh);
- rc = ldlm_cli_cancel(&lockh);
+ rc = ldlm_cli_cancel(&lockh, LCF_ASYNC);
break;
}
case LDLM_CB_CANCELING: {
/* kick off reintegration thread if not running already, if
* it's just local cancel (for stack clean up or eviction),
* don't re-trigger the reintegration. */
- if ((lock->l_flags & LDLM_FL_LOCAL_ONLY) == 0)
+ if (!ldlm_is_local_only(lock))
qsd_start_reint_thread(qqi);
lu_ref_del(&qqi->qqi_reference, "ast_data_get", lock);
break;
}
default:
- LASSERTF(0, "invalid flags for blocking ast %d", flag);
+ LASSERTF(0, "invalid flags for blocking ast %d\n", flag);
}
RETURN(rc);
return rc;
}
-struct ldlm_enqueue_info qsd_glb_einfo = { LDLM_PLAIN,
- LCK_CR,
- qsd_glb_blocking_ast,
- ldlm_completion_ast,
- qsd_glb_glimpse_ast,
- NULL, NULL };
-/*
+/**
* Blocking callback handler for per-ID lock
*
* \param lock - is the lock for which ast occurred.
LDLM_DEBUG(lock, "blocking AST on ID quota lock");
ldlm_lock2handle(lock, &lockh);
- rc = ldlm_cli_cancel(&lockh);
+ rc = ldlm_cli_cancel(&lockh, LCF_ASYNC);
break;
}
case LDLM_CB_CANCELING: {
struct lu_env *env;
struct lquota_entry *lqe;
- bool rel = false;
- LDLM_DEBUG(lock, "canceling global quota lock");
+ LDLM_DEBUG(lock, "canceling ID quota lock");
lqe = qsd_id_ast_data_get(lock, true);
if (lqe == NULL)
break;
LQUOTA_DEBUG(lqe, "losing ID lock");
- /* just local cancel (for stack clean up or eviction), don't
- * release quota space in this case */
- if ((lock->l_flags & LDLM_FL_LOCAL_ONLY) != 0) {
- lqe_putref(lqe);
- break;
- }
-
- /* allocate environment */
- OBD_ALLOC_PTR(env);
- if (env == NULL) {
- lqe_putref(lqe);
- rc = -ENOMEM;
- break;
- }
-
- /* initialize environment */
- rc = lu_env_init(env, LCT_DT_THREAD);
- if (rc) {
- OBD_FREE_PTR(env);
- lqe_putref(lqe);
- break;
- }
-
ldlm_lock2handle(lock, &lockh);
lqe_write_lock(lqe);
if (lustre_handle_equal(&lockh, &lqe->lqe_lockh)) {
/* Clear lqe_lockh & reset qunit to 0 */
qsd_set_qunit(lqe, 0);
memset(&lqe->lqe_lockh, 0, sizeof(lqe->lqe_lockh));
- lqe->lqe_edquot = false;
- rel = true;
+ qsd_set_edquot(lqe, false);
}
lqe_write_unlock(lqe);
- /* If there is qqacq inflight, the release will be skipped
+ /* If there is dqacq inflight, the release will be skipped
* at this time, and triggered on dqacq completion later,
* which means there could be a short window that slave is
* holding spare grant wihtout per-ID lock. */
- if (rel)
+
+ /* don't release quota space for local cancel (stack clean
+ * up or eviction) */
+ if (!ldlm_is_local_only(lock)) {
+ /* allocate environment */
+ OBD_ALLOC_PTR(env);
+ if (env == NULL) {
+ lqe_putref(lqe);
+ rc = -ENOMEM;
+ break;
+ }
+
+ /* initialize environment */
+ rc = lu_env_init(env, LCT_DT_THREAD);
+ if (rc) {
+ OBD_FREE_PTR(env);
+ lqe_putref(lqe);
+ break;
+ }
+
rc = qsd_adjust(env, lqe);
+ lu_env_fini(env);
+ OBD_FREE_PTR(env);
+ }
+
/* release lqe reference grabbed by qsd_id_ast_data_get() */
lqe_putref(lqe);
- lu_env_fini(env);
- OBD_FREE_PTR(env);
break;
}
default:
- LASSERTF(0, "invalid flags for blocking ast %d", flag);
+ LASSERTF(0, "invalid flags for blocking ast %d\n", flag);
}
RETURN(rc);
}
}
- lqe->lqe_edquot = !!(desc->gl_flags & LQUOTA_FL_EDQUOT);
+ qsd_set_edquot(lqe, !!(desc->gl_flags & LQUOTA_FL_EDQUOT));
lqe_write_unlock(lqe);
if (wakeup)
- cfs_waitq_broadcast(&lqe->lqe_waiters);
+ wake_up_all(&lqe->lqe_waiters);
lqe_putref(lqe);
out:
req->rq_status = rc;
RETURN(rc);
}
-struct ldlm_enqueue_info qsd_id_einfo = { LDLM_PLAIN,
- LCK_CR,
- qsd_id_blocking_ast,
- ldlm_completion_ast,
- qsd_id_glimpse_ast,
- NULL, NULL };
-
-/*
+/**
* Check whether a slave already own a ldlm lock for the quota identifier \qid.
*
* \param lockh - is the local lock handle from lquota entry.
if (lustre_handle_is_used(&qti->qti_lockh)) {
memset(&lqe->lqe_lockh, 0, sizeof(lqe->lqe_lockh));
qsd_set_qunit(lqe, 0);
- lqe->lqe_edquot = false;
+ qsd_set_edquot(lqe, false);
}
lqe_write_unlock(lqe);