LDLM_MATCH_AST = BIT(1),
LDLM_MATCH_AST_ANY = BIT(2),
LDLM_MATCH_RIGHT = BIT(3),
+ LDLM_MATCH_GROUP = BIT(4),
};
/**
return 0;
}
ldlm_set_destroyed(lock);
+ wake_up(&lock->l_waitq);
if (lock->l_export && lock->l_export->exp_lock_hash) {
/* NB: it's safe to call cfs_hash_del() even lock isn't
* whose parents already hold a lock so forward progress
* can still happen. */
if (ldlm_is_cbpending(lock) &&
- !(data->lmd_flags & LDLM_FL_CBPENDING))
+ !(data->lmd_flags & LDLM_FL_CBPENDING) &&
+ !(data->lmd_match & LDLM_MATCH_GROUP))
return false;
- if (!(data->lmd_match & LDLM_MATCH_UNREF) && ldlm_is_cbpending(lock) &&
+ if (!(data->lmd_match & (LDLM_MATCH_UNREF | LDLM_MATCH_GROUP)) &&
+ ldlm_is_cbpending(lock) &&
lock->l_readers == 0 && lock->l_writers == 0)
return false;
return false;
matched:
- if (data->lmd_flags & LDLM_FL_TEST_LOCK) {
+ /**
+ * In case the lock is a CBPENDING grouplock, just pin it and return,
+ * we need to wait until it gets to DESTROYED.
+ */
+ if ((data->lmd_flags & LDLM_FL_TEST_LOCK) ||
+ (ldlm_is_cbpending(lock) && (data->lmd_match & LDLM_MATCH_GROUP))) {
LDLM_LOCK_GET(lock);
ldlm_lock_touch_in_lru(lock);
} else {
};
struct ldlm_resource *res;
struct ldlm_lock *lock;
+ struct ldlm_lock *group_lock;
int matched;
ENTRY;
RETURN(0);
}
+repeat:
+ group_lock = NULL;
LDLM_RESOURCE_ADDREF(res);
lock_res(res);
if (res->lr_type == LDLM_EXTENT)
if (!lock && !(flags & LDLM_FL_BLOCK_GRANTED))
lock = search_queue(&res->lr_waiting, &data);
matched = lock ? mode : 0;
+
+ if (lock && ldlm_is_cbpending(lock) &&
+ (data.lmd_match & LDLM_MATCH_GROUP))
+ group_lock = lock;
unlock_res(res);
LDLM_RESOURCE_DELREF(res);
+
+ if (group_lock) {
+ l_wait_event_abortable(group_lock->l_waitq,
+ ldlm_is_destroyed(lock));
+ LDLM_LOCK_RELEASE(lock);
+ goto repeat;
+ }
ldlm_resource_putref(res);
if (lock) {
lock->l_conn_export = exp;
lock->l_export = NULL;
lock->l_blocking_ast = einfo->ei_cb_bl;
- lock->l_flags |= (*flags & (LDLM_FL_NO_LRU | LDLM_FL_EXCL |
- LDLM_FL_ATOMIC_CB));
+ lock->l_flags |= (*flags & (LDLM_FL_NO_LRU | LDLM_FL_EXCL));
lock->l_activity = ktime_get_real_seconds();
/* lock not sent to server yet */
struct ldlm_intent *lit;
enum ldlm_mode mode;
bool glimpse = *flags & LDLM_FL_HAS_INTENT;
- __u64 match_flags = *flags;
+ __u64 search_flags = *flags;
+ __u64 match_flags = 0;
LIST_HEAD(cancels);
int rc, count;
int lvb_size;
if (einfo->ei_mode == LCK_PR)
mode |= LCK_PW;
- match_flags |= LDLM_FL_LVB_READY;
+ search_flags |= LDLM_FL_LVB_READY;
if (glimpse)
- match_flags |= LDLM_FL_BLOCK_GRANTED;
- mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
- einfo->ei_type, policy, mode, &lockh);
+ search_flags |= LDLM_FL_BLOCK_GRANTED;
+ if (mode == LCK_GROUP)
+ match_flags = LDLM_MATCH_GROUP;
+ mode = ldlm_lock_match_with_skip(obd->obd_namespace, search_flags, 0,
+ res_id, einfo->ei_type, policy, mode,
+ &lockh, match_flags);
if (mode) {
struct ldlm_lock *matched;
ols->ols_flags = flags;
ols->ols_speculative = !!(enqflags & CEF_SPECULATIVE);
- if (lock->cll_descr.cld_mode == CLM_GROUP)
- ols->ols_flags |= LDLM_FL_ATOMIC_CB;
if (ols->ols_flags & LDLM_FL_HAS_INTENT) {
ols->ols_flags |= LDLM_FL_BLOCK_GRANTED;
oscl->ols_flags = osc_enq2ldlm_flags(enqflags);
oscl->ols_speculative = !!(enqflags & CEF_SPECULATIVE);
- if (lock->cll_descr.cld_mode == CLM_GROUP)
- oscl->ols_flags |= LDLM_FL_ATOMIC_CB;
if (oscl->ols_flags & LDLM_FL_HAS_INTENT) {
oscl->ols_flags |= LDLM_FL_BLOCK_GRANTED;
struct lustre_handle lockh = { 0 };
struct ptlrpc_request *req = NULL;
int intent = *flags & LDLM_FL_HAS_INTENT;
- __u64 match_flags = *flags;
+ __u64 search_flags = *flags;
+ __u64 match_flags = 0;
enum ldlm_mode mode;
int rc;
ENTRY;
* matching a lock; speculative lock requests do not need to,
* because they will not actually use the lock. */
if (!speculative)
- match_flags |= LDLM_FL_LVB_READY;
+ search_flags |= LDLM_FL_LVB_READY;
if (intent != 0)
- match_flags |= LDLM_FL_BLOCK_GRANTED;
- mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
- einfo->ei_type, policy, mode, &lockh);
+ search_flags |= LDLM_FL_BLOCK_GRANTED;
+ if (mode == LCK_GROUP)
+ match_flags = LDLM_MATCH_GROUP;
+ mode = ldlm_lock_match_with_skip(obd->obd_namespace, search_flags, 0,
+ res_id, einfo->ei_type, policy, mode,
+ &lockh, match_flags);
if (mode) {
struct ldlm_lock *matched;