* Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2010, 2012, Intel Corporation.
+ * Copyright (c) 2010, 2013, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
*/
static int ldlm_completion_tail(struct ldlm_lock *lock)
{
- long delay;
- int result;
-
- if (lock->l_destroyed || lock->l_flags & LDLM_FL_FAILED) {
- LDLM_DEBUG(lock, "client-side enqueue: destroyed");
- result = -EIO;
- } else {
- delay = cfs_time_sub(cfs_time_current_sec(),
- lock->l_last_activity);
- LDLM_DEBUG(lock, "client-side enqueue: granted after "
- CFS_DURATION_T"s", delay);
-
- /* Update our time estimate */
- at_measured(ldlm_lock_to_ns_at(lock),
- delay);
- result = 0;
- }
- return result;
+ long delay;
+ int result;
+
+ if (lock->l_flags & (LDLM_FL_DESTROYED | LDLM_FL_FAILED)) {
+ LDLM_DEBUG(lock, "client-side enqueue: destroyed");
+ result = -EIO;
+ } else {
+ delay = cfs_time_sub(cfs_time_current_sec(),
+ lock->l_last_activity);
+ LDLM_DEBUG(lock, "client-side enqueue: granted after "
+ CFS_DURATION_T"s", delay);
+
+ /* Update our time estimate */
+ at_measured(ldlm_lock_to_ns_at(lock),
+ delay);
+ result = 0;
+ }
+ return result;
}
/**
LDLM_DEBUG(lock, "already unused, calling ldlm_cli_cancel");
ldlm_lock2handle(lock, &lockh);
- rc = ldlm_cli_cancel(&lockh);
+ rc = ldlm_cli_cancel(&lockh, LCF_ASYNC);
if (rc < 0)
CERROR("ldlm_cli_cancel: %d\n", rc);
} else {
lock->l_req_mode = newmode;
}
- if (memcmp(reply->lock_desc.l_resource.lr_name.name,
- lock->l_resource->lr_name.name,
- sizeof(struct ldlm_res_id))) {
- CDEBUG(D_INFO, "remote intent success, locking "
- "(%ld,%ld,%ld) instead of "
- "(%ld,%ld,%ld)\n",
- (long)reply->lock_desc.l_resource.lr_name.name[0],
- (long)reply->lock_desc.l_resource.lr_name.name[1],
- (long)reply->lock_desc.l_resource.lr_name.name[2],
- (long)lock->l_resource->lr_name.name[0],
- (long)lock->l_resource->lr_name.name[1],
- (long)lock->l_resource->lr_name.name[2]);
-
- rc = ldlm_lock_change_resource(ns, lock,
- &reply->lock_desc.l_resource.lr_name);
- if (rc || lock->l_resource == NULL)
- GOTO(cleanup, rc = -ENOMEM);
- LDLM_DEBUG(lock, "client-side enqueue, new resource");
- }
+ if (!ldlm_res_eq(&reply->lock_desc.l_resource.lr_name,
+ &lock->l_resource->lr_name)) {
+ CDEBUG(D_INFO, "remote intent success, locking "DLDLMRES
+ " instead of "DLDLMRES"\n",
+ PLDLMRES(&reply->lock_desc.l_resource),
+ PLDLMRES(lock->l_resource));
+
+ rc = ldlm_lock_change_resource(ns, lock,
+ &reply->lock_desc.l_resource.lr_name);
+ if (rc || lock->l_resource == NULL)
+ GOTO(cleanup, rc = -ENOMEM);
+ LDLM_DEBUG(lock, "client-side enqueue, new resource");
+ }
if (with_policy)
if (!(type == LDLM_IBITS &&
!(exp_connect_flags(exp) & OBD_CONNECT_IBITS)))
rc = ldlm_fill_lvb(lock, &req->rq_pill, RCL_SERVER,
lock->l_lvb_data, size);
unlock_res_and_lock(lock);
- if (rc < 0)
+ if (rc < 0) {
+ cleanup_phase = 1;
GOTO(cleanup, rc);
+ }
}
if (!is_replay) {
{
int avail;
- avail = min_t(int, LDLM_MAXREQSIZE, CFS_PAGE_SIZE - 512) - req_size;
+ avail = min_t(int, LDLM_MAXREQSIZE, PAGE_CACHE_SIZE - 512) - req_size;
if (likely(avail >= 0))
avail /= (int)sizeof(struct lustre_handle);
else
LDLM_DEBUG(lock, "client-side enqueue START");
LASSERT(exp == lock->l_conn_export);
} else {
- const struct ldlm_callback_suite cbs = {
- .lcs_completion = einfo->ei_cb_cp,
- .lcs_blocking = einfo->ei_cb_bl,
- .lcs_glimpse = einfo->ei_cb_gl,
- .lcs_weigh = einfo->ei_cb_wg
- };
- lock = ldlm_lock_create(ns, res_id, einfo->ei_type,
- einfo->ei_mode, &cbs, einfo->ei_cbdata,
+ const struct ldlm_callback_suite cbs = {
+ .lcs_completion = einfo->ei_cb_cp,
+ .lcs_blocking = einfo->ei_cb_bl,
+ .lcs_glimpse = einfo->ei_cb_gl
+ };
+ lock = ldlm_lock_create(ns, res_id, einfo->ei_type,
+ einfo->ei_mode, &cbs, einfo->ei_cbdata,
lvb_len, lvb_type);
- if (lock == NULL)
- RETURN(-ENOMEM);
+ if (lock == NULL)
+ RETURN(-ENOMEM);
/* for the local lock, add the reference */
ldlm_lock_addref_internal(lock, einfo->ei_mode);
ldlm_lock2handle(lock, lockh);
lock->l_conn_export = exp;
lock->l_export = NULL;
lock->l_blocking_ast = einfo->ei_cb_bl;
- lock->l_flags |= (*flags & LDLM_FL_NO_LRU);
+ lock->l_flags |= (*flags & (LDLM_FL_NO_LRU | LDLM_FL_EXCL));
/* lock not sent to server yet */
ldlm_reprocess_all(res);
rc = 0;
} else {
- rc = EDEADLOCK;
+ rc = LUSTRE_EDEADLK;
}
LDLM_DEBUG(lock, "client-side local convert handler END");
LDLM_LOCK_PUT(lock);
GOTO(out, rc);
}
} else {
- rc = EDEADLOCK;
+ rc = LUSTRE_EDEADLK;
}
EXIT;
out:
} else {
rc = ptlrpc_queue_wait(req);
}
- if (rc == ESTALE) {
+ if (rc == LUSTRE_ESTALE) {
CDEBUG(D_DLMTRACE, "client/server (nid %s) "
"out of sync -- not fatal\n",
libcfs_nid2str(req->rq_import->
*
* Lock must not have any readers or writers by this time.
*/
-int ldlm_cli_cancel(struct lustre_handle *lockh)
+int ldlm_cli_cancel(struct lustre_handle *lockh,
+ ldlm_cancel_flags_t cancel_flags)
{
struct obd_export *exp;
int avail, flags, count = 1;
}
rc = ldlm_cli_cancel_local(lock);
- if (rc == LDLM_FL_LOCAL_ONLY) {
- LDLM_LOCK_RELEASE(lock);
+ if (rc == LDLM_FL_LOCAL_ONLY || cancel_flags & LCF_LOCAL) {
+ LDLM_LOCK_RELEASE(lock);
RETURN(0);
- }
+ }
/* Even if the lock is marked as LDLM_FL_BL_AST, this is a LDLM_CANCEL
* RPC which goes to canceld portal, so we can cancel other LRU locks
* here and send them all as one LDLM_CANCEL RPC. */
count += ldlm_cancel_lru_local(ns, &cancels, 0, avail - 1,
LCF_BL_AST, flags);
}
- ldlm_cli_cancel_list(&cancels, count, NULL, 0);
+ ldlm_cli_cancel_list(&cancels, count, NULL, cancel_flags);
RETURN(0);
}
EXPORT_SYMBOL(ldlm_cli_cancel);
/**
* Cancel at least \a nr locks from given namespace LRU.
*
- * When called with LDLM_ASYNC the blocking callback will be handled
+ * When called with LCF_ASYNC the blocking callback will be handled
* in a thread and this function will return after the thread has been
- * asked to call the callback. When called with LDLM_SYNC the blocking
+ * asked to call the callback. When called with LCF_ASYNC the blocking
* callback will be performed in this function.
*/
-int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr, ldlm_sync_t mode,
- int flags)
+int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr,
+ ldlm_cancel_flags_t cancel_flags,
+ int flags)
{
- CFS_LIST_HEAD(cancels);
- int count, rc;
- ENTRY;
+ CFS_LIST_HEAD(cancels);
+ int count, rc;
+ ENTRY;
#ifndef __KERNEL__
- mode = LDLM_SYNC; /* force to be sync in user space */
+ cancel_flags &= ~LCF_ASYNC; /* force to be sync in user space */
#endif
- /* Just prepare the list of locks, do not actually cancel them yet.
- * Locks are cancelled later in a separate thread. */
- count = ldlm_prepare_lru_list(ns, &cancels, nr, 0, flags);
- rc = ldlm_bl_to_thread_list(ns, NULL, &cancels, count, mode);
- if (rc == 0)
- RETURN(count);
-
- RETURN(0);
+ /* Just prepare the list of locks, do not actually cancel them yet.
+ * Locks are cancelled later in a separate thread. */
+ count = ldlm_prepare_lru_list(ns, &cancels, nr, 0, flags);
+ rc = ldlm_bl_to_thread_list(ns, NULL, &cancels, count, cancel_flags);
+ if (rc == 0)
+ RETURN(count);
+
+ RETURN(0);
}
/**
0, flags | LCF_BL_AST, opaque);
rc = ldlm_cli_cancel_list(&cancels, count, NULL, flags);
if (rc != ELDLM_OK)
- CERROR("ldlm_cli_cancel_unused_resource: %d\n", rc);
+ CERROR("canceling unused lock "DLDLMRES": rc = %d\n",
+ PLDLMRES(res), rc);
LDLM_RESOURCE_DELREF(res);
ldlm_resource_putref(res);
};
static int ldlm_cli_hash_cancel_unused(cfs_hash_t *hs, cfs_hash_bd_t *bd,
- cfs_hlist_node_t *hnode, void *arg)
+ cfs_hlist_node_t *hnode, void *arg)
{
- struct ldlm_resource *res = cfs_hash_object(hs, hnode);
- struct ldlm_cli_cancel_arg *lc = arg;
- int rc;
-
- rc = ldlm_cli_cancel_unused_resource(ldlm_res_to_ns(res), &res->lr_name,
- NULL, LCK_MINMODE,
- lc->lc_flags, lc->lc_opaque);
- if (rc != 0) {
- CERROR("ldlm_cli_cancel_unused ("LPU64"): %d\n",
- res->lr_name.name[0], rc);
- }
- /* must return 0 for hash iteration */
- return 0;
+ struct ldlm_resource *res = cfs_hash_object(hs, hnode);
+ struct ldlm_cli_cancel_arg *lc = arg;
+
+ ldlm_cli_cancel_unused_resource(ldlm_res_to_ns(res), &res->lr_name,
+ NULL, LCK_MINMODE, lc->lc_flags,
+ lc->lc_opaque);
+ /* must return 0 for hash iteration */
+ return 0;
}
/**