*
* This can be optimized to not update attributes when lock is a result of a
* local match.
+ *
+ * Called under lock and resource spin-locks.
*/
static void osc_lock_lvb_update(const struct lu_env *env, struct osc_lock *olck,
int rc)
dlmlock = olck->ols_lock;
LASSERT(dlmlock != NULL);
+ /* re-grab LVB from a dlm lock under DLM spin-locks. */
+ *lvb = *(struct ost_lvb *)dlmlock->l_lvb_data;
size = lvb->lvb_size;
/* Extend KMS up to the end of this lock and no further.
* A lock on [x,y] means a KMS of up to y + 1 bytes! */
lvb->lvb_size, oinfo->loi_kms,
dlmlock->l_policy_data.l_extent.end);
}
- ldlm_lock_allow_match(dlmlock);
+ ldlm_lock_allow_match_locked(dlmlock);
} else if (rc == -ENAVAIL && olck->ols_glimpse) {
CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
" kms="LPU64"\n", lvb->lvb_size, oinfo->loi_kms);
EXIT;
}
+/**
+ * Called when a lock is granted, from an upcall (when server returned a
+ * granted lock), or from completion AST, when server returned a blocked lock.
+ *
+ * Called under lock and resource spin-locks, which are released temporarily
+ * here.
+ */
static void osc_lock_granted(const struct lu_env *env, struct osc_lock *olck,
struct ldlm_lock *dlmlock, int rc)
{
* tell upper layers the extent of the lock that was actually
* granted
*/
- cl_lock_modify(env, lock, descr);
LINVRNT(osc_lock_invariant(olck));
olck->ols_state = OLS_GRANTED;
osc_lock_lvb_update(env, olck, rc);
+
+ /* release DLM spin-locks to allow cl_lock_modify() to take a
+ * semaphore on a parent lock. This is safe, because
+ * spin-locks are needed to protect consistency of
+ * dlmlock->l_*_mode and LVB, and we have finished processing
+ * them. */
+ unlock_res_and_lock(dlmlock);
+ cl_lock_modify(env, lock, descr);
+ lock_res_and_lock(dlmlock);
+
cl_lock_signal(env, lock);
}
EXIT;
LASSERT(olck->ols_lock == NULL);
olck->ols_lock = dlmlock;
spin_unlock(&osc_ast_guard);
- unlock_res_and_lock(dlmlock);
/*
* Lock might not be granted yet. In this case, completion ast
*/
if (dlmlock->l_granted_mode == dlmlock->l_req_mode)
osc_lock_granted(env, olck, dlmlock, 0);
+ unlock_res_and_lock(dlmlock);
+
/*
* osc_enqueue_interpret() decrefs asynchronous locks, counter
* this.
* to lock->l_lvb_data, store it in osc_lock.
*/
LASSERT(dlmlock->l_lvb_data != NULL);
+ lock_res_and_lock(dlmlock);
olck->ols_lvb = *(struct ost_lvb *)dlmlock->l_lvb_data;
if (olck->ols_lock == NULL)
/*
osc_lock_granted(env, olck, dlmlock, dlmrc);
if (dlmrc != 0)
cl_lock_error(env, lock, dlmrc);
+ unlock_res_and_lock(dlmlock);
cl_lock_mutex_put(env, lock);
osc_ast_data_put(env, olck);
result = 0;