void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode);
void ldlm_lock_decref_and_cancel(struct lustre_handle *lockh, __u32 mode);
void ldlm_lock_allow_match(struct ldlm_lock *lock);
-int ldlm_lock_match(struct ldlm_namespace *ns, int flags, struct ldlm_res_id *,
- ldlm_type_t type, ldlm_policy_data_t *, ldlm_mode_t mode,
- struct lustre_handle *);
+ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, int flags,
+ struct ldlm_res_id *, ldlm_type_t type,
+ ldlm_policy_data_t *, ldlm_mode_t mode,
+ struct lustre_handle *);
struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
int *flags);
void ldlm_lock_cancel(struct ldlm_lock *lock);
/* newly added fields to handle the RCU issue. -jxiong */
spinlock_t h_lock;
- unsigned int h_size;
void *h_ptr;
void (*h_free_cb)(void *, size_t);
struct rcu_head h_rcu;
+ unsigned int h_size;
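+ /* set once the handle is added to the hash; cleared (under h_lock) on unhash */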
+ __u8 h_in:1;
+ __u8 h_unused[3];
};
#define RCU2HANDLE(rcu) container_of(rcu, struct portals_handle, h_rcu)
/* returns a referenced lock or NULL. See the flag descriptions below, in the
* comment above ldlm_lock_match */
-static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode,
+static struct ldlm_lock *search_queue(struct list_head *queue,
+ ldlm_mode_t *mode,
ldlm_policy_data_t *policy,
struct ldlm_lock *old_lock, int flags)
{
struct list_head *tmp;
list_for_each(tmp, queue) {
+ ldlm_mode_t match;
+
lock = list_entry(tmp, struct ldlm_lock, l_res_link);
if (lock == old_lock)
break;
if (lock->l_flags & LDLM_FL_CBPENDING &&
    lock->l_readers == 0 && lock->l_writers == 0)
continue;
- if (!(lock->l_req_mode & mode))
+ if (!(lock->l_req_mode & *mode))
continue;
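+ /* remember the mode the matched lock carries; ldlm_lock_match()
+ * hands it back to the caller through *mode */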
+ match = lock->l_req_mode;
if (lock->l_resource->lr_type == LDLM_EXTENT &&
(lock->l_policy_data.l_extent.start >
policy->l_extent.start ||
lock->l_policy_data.l_extent.end < policy->l_extent.end))
continue;
- if (unlikely(mode == LCK_GROUP) &&
+ if (unlikely(match == LCK_GROUP) &&
lock->l_resource->lr_type == LDLM_EXTENT &&
lock->l_policy_data.l_extent.gid != policy->l_extent.gid)
continue;
if (flags & LDLM_FL_TEST_LOCK) {
LDLM_LOCK_GET(lock);
ldlm_lock_touch_in_lru(lock);
} else {
- ldlm_lock_addref_internal_nolock(lock, mode);
+ ldlm_lock_addref_internal_nolock(lock, match);
}
+ *mode = match;
return lock;
}
- * Returns 1 if it finds an already-existing lock that is compatible; in this
- * case, lockh is filled in with a addref()ed lock
+ * Returns the matched lock mode if it finds an already-existing lock that is
+ * compatible; in this case, lockh is filled in with an addref()ed lock
*/
-int ldlm_lock_match(struct ldlm_namespace *ns, int flags,
- struct ldlm_res_id *res_id, ldlm_type_t type,
- ldlm_policy_data_t *policy, ldlm_mode_t mode,
- struct lustre_handle *lockh)
+ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, int flags,
+ struct ldlm_res_id *res_id, ldlm_type_t type,
+ ldlm_policy_data_t *policy, ldlm_mode_t mode,
+ struct lustre_handle *lockh)
{
struct ldlm_resource *res;
struct ldlm_lock *lock, *old_lock = NULL;
lock_res(res);
- lock = search_queue(&res->lr_granted, mode, policy, old_lock, flags);
+ lock = search_queue(&res->lr_granted, &mode, policy, old_lock, flags);
if (lock != NULL)
GOTO(out, rc = 1);
if (flags & LDLM_FL_BLOCK_GRANTED)
GOTO(out, rc = 0);
- lock = search_queue(&res->lr_converting, mode, policy, old_lock, flags);
+ lock = search_queue(&res->lr_converting, &mode, policy, old_lock, flags);
if (lock != NULL)
GOTO(out, rc = 1);
- lock = search_queue(&res->lr_waiting, mode, policy, old_lock, flags);
+ lock = search_queue(&res->lr_waiting, &mode, policy, old_lock, flags);
if (lock != NULL)
GOTO(out, rc = 1);
if (flags & LDLM_FL_TEST_LOCK && rc)
LDLM_LOCK_PUT(lock);
- return rc;
+ return rc ? mode : 0;
}
/* Returns a referenced lock */
flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_IBITS,
- &policy, LCK_PW | LCK_PR, &lockh)) {
+ &policy, LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh)) {
RETURN(1);
}
RETURN(0);
flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_IBITS,
- &policy, LCK_CR|LCK_CW|LCK_PR, &lockh)) {
+ &policy, LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh)) {
RETURN(1);
}
ast_flags = LDLM_FL_BLOCK_GRANTED;
rc = obd_match(sbi->ll_osc_exp, lsm, LDLM_EXTENT,
&policy, LCK_PW, &ast_flags, inode, &lockh);
- if (rc == 1) {
+ if (rc > 0) {
local_lock = 2;
rc = 0;
} else if (rc == 0) {
struct ldlm_enqueue_info *einfo,
struct ptlrpc_request_set *rqset)
{
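+ /* the mode as originally requested, kept for lov_fini_enqueue_set() */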
+ ldlm_mode_t mode = einfo->ei_mode;
struct lov_request_set *set;
struct lov_request *req;
struct list_head *pos;
LASSERT(oinfo);
ASSERT_LSM_MAGIC(oinfo->oi_md);
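+ /* (mode & -mode) keeps only the lowest set bit, so the assert below
+ * rejects mode masks with more than one mode set */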
+ LASSERT(mode == (mode & -mode));
/* we should never be asked to replay a lock this way. */
LASSERT((oinfo->oi_flags & LDLM_FL_REPLAY) == 0);
RETURN(rc);
}
out:
- rc = lov_fini_enqueue_set(set, einfo->ei_mode, rc, rqset);
+ rc = lov_fini_enqueue_set(set, mode, rc, rqset);
RETURN(rc);
}
ENTRY;
ASSERT_LSM_MAGIC(lsm);
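+ /* a multi-bit mode mask is only acceptable for LDLM_FL_TEST_LOCK matches */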
+ LASSERT((*flags & LDLM_FL_TEST_LOCK) || mode == (mode & -mode));
if (!exp || !exp->exp_obd)
RETURN(-ENODEV);
req->rq_oi.oi_md, type, &sub_policy,
mode, &lov_flags, data, lov_lockhp);
rc = lov_update_match_set(set, req, rc);
- if (rc != 1)
+ if (rc <= 0)
break;
}
lov_fini_match_set(set, mode, *flags);
int ret = rc;
ENTRY;
- if (rc == 1)
+ if (rc > 0)
ret = 0;
else if (rc == 0)
ret = 1;
}
EXPORT_SYMBOL(mdc_enqueue);
-int mdc_revalidate_lock(struct obd_export *exp,
- struct lookup_intent *it,
+int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
struct ll_fid *fid)
{
/* We could just return 1 immediately, but since we should only
* be called in revalidate_it if we already have a lock, let's
* verify that. */
struct ldlm_res_id res_id = {.name ={fid->id, fid->generation}};
- struct lustre_handle lockh;
- ldlm_policy_data_t policy;
- int mode = LCK_CR;
- int rc;
-
- /* As not all attributes are kept under update lock, e.g.
- owner/group/acls are under lookup lock, we need both
- ibits for GETATTR. */
- policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
- MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP :
- MDS_INODELOCK_LOOKUP;
-
- rc = ldlm_lock_match(exp->exp_obd->obd_namespace, LDLM_FL_BLOCK_GRANTED,
- &res_id, LDLM_IBITS, &policy, LCK_CR, &lockh);
- if (!rc) {
- mode = LCK_CW;
- rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
- LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS,
- &policy, LCK_CW, &lockh);
- }
- if (!rc) {
- mode = LCK_PR;
- rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
- LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS,
- &policy, LCK_PR, &lockh);
- }
- if (rc) {
+ struct lustre_handle lockh;
+ ldlm_policy_data_t policy;
+ ldlm_mode_t mode;
+
+ /* As not all attributes are kept under update lock, e.g.
+ owner/group/acls are under lookup lock, we need both
+ ibits for GETATTR. */
+ policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
+ MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP :
+ MDS_INODELOCK_LOOKUP;
+
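+ /* one match against all relevant modes (now including PW) replaces
+ * the old CR, then CW, then PR cascade */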
+ mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
+ LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS,
+ &policy, LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh);
+ if (mode) {
memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh));
- it->d.lustre.it_lock_mode = mode;
- }
+ it->d.lustre.it_lock_mode = mode;
+ }
- return rc;
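+ /* callers treat the result as a boolean, so fold the matched mode
+ * back down to 0/1 */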
+ return !!mode;
}
EXPORT_SYMBOL(mdc_revalidate_lock);
bucket = &handle_hash[h->h_cookie & HANDLE_HASH_MASK];
spin_lock(&bucket->lock);
list_add_rcu(&h->h_link, &bucket->head);
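+ /* mark the handle as hashed while still holding the bucket lock */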
+ h->h_in = 1;
spin_unlock(&bucket->lock);
CDEBUG(D_INFO, "added object %p with handle "LPX64" to hash\n",
h, h->h_cookie);
spin_lock(&h->h_lock);
- if (h->h_cookie == 0) {
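+ /* the h_in flag, not a zeroed cookie, now says whether the handle is hashed */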
+ if (h->h_in == 0) {
spin_unlock(&h->h_lock);
return;
}
- h->h_cookie = 0;
+ h->h_in = 0;
spin_unlock(&h->h_lock);
list_del_rcu(&h->h_link);
}
return;
}
lock_res_and_lock(lock);
-#ifdef __KERNEL__
-#ifdef __LINUX__
+#if defined (__KERNEL__) && defined (__LINUX__)
/* Liang XXX: Darwin and Winnt checking should be added */
if (lock->l_ast_data && lock->l_ast_data != data) {
struct inode *new_inode = data;
new_inode, new_inode->i_ino, new_inode->i_generation);
}
#endif
-#endif
lock->l_ast_data = data;
lock->l_flags |= (flags & LDLM_FL_NO_LRU);
unlock_res_and_lock(lock);
struct ldlm_reply *rep;
struct ptlrpc_request *req = NULL;
int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
+ ldlm_mode_t mode;
int rc;
ENTRY;
goto no_match;
/* Next, search for already existing extent locks that will cover us */
- rc = ldlm_lock_match(obd->obd_namespace,
- oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
- einfo->ei_type, &oinfo->oi_policy, einfo->ei_mode,
- oinfo->oi_lockh);
- if (rc == 1) {
+ /* If we're trying to read, we also search for an existing PW lock. The
+ * VFS and page cache already protect us locally, so lots of readers/
+ * writers can share a single PW lock.
+ *
+ * There are problems with conversion deadlocks, so instead of
+ * converting a read lock to a write lock, we'll just enqueue a new
+ * one.
+ *
+ * At some point we should cancel the read lock instead of making them
+ * send us a blocking callback, but there are problems with canceling
+ * locks out from other users right now, too. */
+ mode = einfo->ei_mode;
+ if (einfo->ei_mode == LCK_PR)
+ mode |= LCK_PW;
+ mode = ldlm_lock_match(obd->obd_namespace,
+ oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
+ einfo->ei_type, &oinfo->oi_policy, mode,
+ oinfo->oi_lockh);
+ if (mode) {
+ /* addref the lock only for sync requests, and only when a PW lock
+ * was matched although we asked for PR. */
+ if (!rqset && einfo->ei_mode != mode)
+ ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
oinfo->oi_flags);
if (intent) {
oinfo->oi_cb_up(oinfo, ELDLM_OK);
- /* For async requests, decref the lock. */
+ /* Drop the reference taken by ldlm_lock_match(): the PW reference
+ * when a PW lock satisfied a PR request, or the requested-mode
+ * reference for async requests. */
- if (rqset)
+ if (einfo->ei_mode != mode)
+ ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
+ else if (rqset)
ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
RETURN(ELDLM_OK);
}
- /* If we're trying to read, we also search for an existing PW lock. The
- * VFS and page cache already protect us locally, so lots of readers/
- * writers can share a single PW lock.
- *
- * There are problems with conversion deadlocks, so instead of
- * converting a read lock to a write lock, we'll just enqueue a new
- * one.
- *
- * At some point we should cancel the read lock instead of making them
- * send us a blocking callback, but there are problems with canceling
- * locks out from other users right now, too. */
-
- if (einfo->ei_mode == LCK_PR) {
- rc = ldlm_lock_match(obd->obd_namespace,
- oinfo->oi_flags | LDLM_FL_LVB_READY,
- &res_id, einfo->ei_type, &oinfo->oi_policy,
- LCK_PW, oinfo->oi_lockh);
- if (rc == 1) {
- /* FIXME: This is not incredibly elegant, but it might
- * be more elegant than adding another parameter to
- * lock_match. I want a second opinion. */
- /* addref the lock only if not async requests. */
- if (!rqset)
- ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
- osc_set_data_with_check(oinfo->oi_lockh,
- einfo->ei_cbdata,
- oinfo->oi_flags);
- oinfo->oi_cb_up(oinfo, ELDLM_OK);
- ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
- RETURN(ELDLM_OK);
- }
- }
-
no_match:
if (intent) {
int size[3] = {
{
struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
struct obd_device *obd = exp->exp_obd;
- int rc;
int lflags = *flags;
+ ldlm_mode_t rc;
ENTRY;
OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);
policy->l_extent.end |= ~CFS_PAGE_MASK;
/* Next, search for already existing extent locks that will cover us */
- rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY, &res_id, type,
- policy, mode, lockh);
- if (rc) {
- //if (!(*flags & LDLM_FL_TEST_LOCK))
- osc_set_data_with_check(lockh, data, lflags);
- RETURN(rc);
- }
/* If we're trying to read, we also search for an existing PW lock. The
* VFS and page cache already protect us locally, so lots of readers/
* writers can share a single PW lock. */
- if (mode == LCK_PR) {
- rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
- &res_id, type,
- policy, LCK_PW, lockh);
- if (rc == 1 && !(lflags & LDLM_FL_TEST_LOCK)) {
- /* FIXME: This is not incredibly elegant, but it might
- * be more elegant than adding another parameter to
- * lock_match. I want a second opinion. */
- osc_set_data_with_check(lockh, data, lflags);
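+ /* rc first holds the set of modes to match (a PR request can also be
+ * satisfied by PW), then the mode that actually matched */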
+ rc = mode;
+ if (mode == LCK_PR)
+ rc |= LCK_PW;
+ rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
+ &res_id, type, policy, rc, lockh);
+ if (rc) {
+ osc_set_data_with_check(lockh, data, lflags);
+ if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
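+ /* a PW lock satisfied a PR request: give the caller the PR reference
+ * it expects and drop the PW reference from ldlm_lock_match() */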
ldlm_lock_addref(lockh, LCK_PR);
ldlm_lock_decref(lockh, LCK_PW);
}
+ RETURN(rc);
}
+
RETURN(rc);
}