- LASSERT(cl_lock_is_mutexed(lock));
- LASSERTF(ols->ols_state == OLS_NEW,
- "Impossible state: %d\n", ols->ols_state);
-
- ols->ols_flags = osc_enq2ldlm_flags(enqflags);
- if (enqflags & CEF_AGL) {
- ols->ols_flags |= LDLM_FL_BLOCK_NOWAIT;
- ols->ols_agl = 1;
- }
- if (ols->ols_flags & LDLM_FL_HAS_INTENT)
- ols->ols_glimpse = 1;
- if (!osc_lock_is_lockless(ols) && !(enqflags & CEF_MUST))
- /* try to convert this lock to a lockless lock */
- osc_lock_to_lockless(env, ols, (enqflags & CEF_NEVER));
-
- result = osc_lock_enqueue_wait(env, ols);
- if (result == 0) {
- if (!osc_lock_is_lockless(ols)) {
- struct osc_object *obj = cl2osc(slice->cls_obj);
- struct osc_thread_info *info = osc_env_info(env);
- struct ldlm_res_id *resname = &info->oti_resname;
- ldlm_policy_data_t *policy = &info->oti_policy;
- struct ldlm_enqueue_info *einfo = &ols->ols_einfo;
-
- if (ols->ols_locklessable)
- ols->ols_flags |= LDLM_FL_DENY_ON_CONTENTION;
-
- /* a reference for lock, passed as an upcall cookie */
- cl_lock_get(lock);
- lu_ref_add(&lock->cll_reference, "upcall", lock);
- /* a user for lock also */
- cl_lock_user_add(env, lock);
- ols->ols_state = OLS_ENQUEUED;
-
- /*
- * XXX: this is possible blocking point as
- * ldlm_lock_match(LDLM_FL_LVB_READY) waits for
- * LDLM_CP_CALLBACK.
- */
- osc_lock_build_res(env, obj, resname);
- osc_lock_build_policy(env, lock, policy);
- result = osc_enqueue_base(osc_export(obj), resname,
- &ols->ols_flags, policy,
- &ols->ols_lvb,
- obj->oo_oinfo->loi_kms_valid,
- osc_lock_upcall,
- ols, einfo, &ols->ols_handle,
- PTLRPCD_SET, 1, ols->ols_agl);
- if (result != 0) {
- cl_lock_user_del(env, lock);
- lu_ref_del(&lock->cll_reference,
- "upcall", lock);
- cl_lock_put(env, lock);
- if (unlikely(result == -ECANCELED)) {
- ols->ols_state = OLS_NEW;
- result = 0;
- }
- }
- } else {
- ols->ols_state = OLS_GRANTED;
- ols->ols_owner = osc_env_io(env);
- }
- }
- LASSERT(ergo(ols->ols_glimpse, !osc_lock_is_lockless(ols)));
- RETURN(result);
+ LASSERTF(ergo(oscl->ols_glimpse, lock->cll_descr.cld_mode <= CLM_READ),
+ "lock = %p, ols = %p\n", lock, oscl);
+
+ if (oscl->ols_state == OLS_GRANTED)
+ RETURN(0);
+
+ if ((oscl->ols_flags & LDLM_FL_NO_EXPANSION) &&
+ !(exp_connect_lockahead_old(exp) || exp_connect_lockahead(exp))) {
+ result = -EOPNOTSUPP;
+ CERROR("%s: server does not support lockahead/locknoexpand:"
+ "rc = %d\n", exp->exp_obd->obd_name, result);
+ RETURN(result);
+ }
+
+ if (oscl->ols_flags & LDLM_FL_TEST_LOCK)
+ GOTO(enqueue_base, 0);
+
+ /* For glimpse and/or speculative locks, do not wait for reply from
+ * server on LDLM request */
+ if (oscl->ols_glimpse || oscl->ols_speculative) {
+ /* Speculative and glimpse locks do not have an anchor */
+ LASSERT(equi(oscl->ols_speculative, anchor == NULL));
+ async = true;
+ GOTO(enqueue_base, 0);
+ }
+
+ result = osc_lock_enqueue_wait(env, osc, oscl);
+ if (result < 0)
+ GOTO(out, result);
+
+ /* we can grant lockless lock right after all conflicting locks
+ * are canceled. */
+ if (osc_lock_is_lockless(oscl)) {
+ oscl->ols_state = OLS_GRANTED;
+ oio->oi_lockless = 1;
+ RETURN(0);
+ }
+
+enqueue_base:
+ oscl->ols_state = OLS_ENQUEUED;
+ if (anchor != NULL) {
+ atomic_inc(&anchor->csi_sync_nr);
+ oscl->ols_owner = anchor;
+ }
+
+ /**
+ * DLM lock's ast data must be osc_object;
+ * if glimpse or speculative lock, async of osc_enqueue_base()
+ * must be true
+ *
+ * For non-speculative locks:
+ * DLM's enqueue callback set to osc_lock_upcall() with cookie as
+ * osc_lock.
+ * For speculative locks:
+ * osc_lock_upcall_speculative & cookie is the osc object, since
+ * there is no osc_lock
+ */
+ ostid_build_res_name(&osc->oo_oinfo->loi_oi, resname);
+ osc_lock_build_policy(env, lock, policy);
+ if (oscl->ols_speculative) {
+ oscl->ols_einfo.ei_cbdata = NULL;
+ /* hold a reference for callback */
+ cl_object_get(osc2cl(osc));
+ upcall = osc_lock_upcall_speculative;
+ cookie = osc;
+ }
+ result = osc_enqueue_base(exp, resname, &oscl->ols_flags,
+ policy, &oscl->ols_lvb,
+ osc->oo_oinfo->loi_kms_valid,
+ upcall, cookie,
+ &oscl->ols_einfo, PTLRPCD_SET, async,
+ oscl->ols_speculative);
+ if (result == 0) {
+ if (osc_lock_is_lockless(oscl)) {
+ oio->oi_lockless = 1;
+ } else if (!async) {
+ LASSERT(oscl->ols_state == OLS_GRANTED);
+ LASSERT(oscl->ols_hold);
+ LASSERT(oscl->ols_dlmlock != NULL);
+ }
+ } else if (oscl->ols_speculative) {
+ cl_object_put(env, osc2cl(osc));
+ if (oscl->ols_glimpse) {
+ /* hide error for AGL request */
+ result = 0;
+ }
+ }
+
+out:
+ if (result < 0) {
+ oscl->ols_state = OLS_CANCELLED;
+ osc_lock_wake_waiters(env, osc, oscl);
+
+ if (anchor != NULL)
+ cl_sync_io_note(env, anchor, result);
+ }
+ RETURN(result);