if (lock->l_conn_export == NULL) {
static cfs_time_t next_dump = 0, last_dump = 0;
+ if (ptlrpc_check_suspend())
+ RETURN(0);
+
LDLM_ERROR(lock, "lock timed out (enqueued at %lu, %lus ago); "
"not entering recovery in server code, just going "
"back to sleep", lock->l_enqueued_time.tv_sec,
return -ELDLM_NO_LOCK_DATA;
}
-int ldlm_cli_enqueue_local(struct ldlm_namespace *ns, struct ldlm_res_id res_id,
+int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
+ const struct ldlm_res_id *res_id,
ldlm_type_t type, ldlm_policy_data_t *policy,
ldlm_mode_t mode, int *flags,
ldlm_blocking_callback blocking,
ENTRY;
LASSERT(!(*flags & LDLM_FL_REPLAY));
- if (ns->ns_client) {
+ if (unlikely(ns->ns_client)) {
CERROR("Trying to enqueue local lock in a shadow namespace\n");
LBUG();
}
lock = ldlm_lock_create(ns, NULL, res_id, type, mode, blocking,
completion, glimpse, data, lvb_len);
- if (!lock)
+ if (unlikely(!lock))
GOTO(out_nolock, err = -ENOMEM);
LDLM_DEBUG(lock, "client-side local enqueue handler, new lock created");
lock->l_req_extent = policy->l_extent;
err = ldlm_lock_enqueue(ns, &lock, policy, flags);
- if (err != ELDLM_OK)
+ if (unlikely(err != ELDLM_OK))
GOTO(out, err);
if (policy != NULL)
*policy = lock->l_policy_data;
- if ((*flags) & LDLM_FL_LOCK_CHANGED)
- res_id = lock->l_resource->lr_name;
LDLM_DEBUG_NOLOCK("client-side local enqueue handler END (lock %p)",
lock);
lock->l_req_mode = newmode;
}
- if (reply->lock_desc.l_resource.lr_name.name[0] !=
- lock->l_resource->lr_name.name[0]) {
- CDEBUG(D_INFO, "remote intent success, locking %ld "
- "instead of %ld\n",
+ if (memcmp(reply->lock_desc.l_resource.lr_name.name,
+ lock->l_resource->lr_name.name,
+ sizeof(struct ldlm_res_id))) {
+ CDEBUG(D_INFO, "remote intent success, locking "
+ "(%ld,%ld,%ld) instead of "
+ "(%ld,%ld,%ld)\n",
(long)reply->lock_desc.l_resource.lr_name.name[0],
- (long)lock->l_resource->lr_name.name[0]);
+ (long)reply->lock_desc.l_resource.lr_name.name[1],
+ (long)reply->lock_desc.l_resource.lr_name.name[2],
+ (long)lock->l_resource->lr_name.name[0],
+ (long)lock->l_resource->lr_name.name[1],
+ (long)lock->l_resource->lr_name.name[2]);
ldlm_lock_change_resource(ns, lock,
- reply->lock_desc.l_resource.lr_name);
+ &reply->lock_desc.l_resource.lr_name);
if (lock->l_resource == NULL) {
LBUG();
GOTO(cleanup, rc = -ENOMEM);
* request was created in ldlm_cli_enqueue and it is the async request,
* pass it to the caller in @reqp. */
int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
- struct ldlm_res_id res_id, ldlm_type_t type,
- ldlm_policy_data_t *policy, ldlm_mode_t mode, int *flags,
+ const struct ldlm_res_id *res_id,
+ ldlm_type_t type, ldlm_policy_data_t *policy,
+ ldlm_mode_t mode, int *flags,
ldlm_blocking_callback blocking,
ldlm_completion_callback completion,
ldlm_glimpse_callback glimpse,
lock = __ldlm_handle2lock(lockh, LDLM_FL_CANCELING);
if (lock == NULL)
RETURN(0);
-
+
if (lock->l_conn_export) {
int local_only;
struct obd_import *imp;
(LDLM_FL_LOCAL_ONLY|LDLM_FL_CANCEL_ON_BLOCK));
ldlm_cancel_callback(lock);
unlock_res_and_lock(lock);
-
+
if (local_only) {
CDEBUG(D_INFO, "not sending request (at caller's "
"instruction)\n");
}
static int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
- struct ldlm_res_id res_id, int flags,
- void *opaque)
+ const struct ldlm_res_id *res_id,
+ int flags, void *opaque)
{
struct list_head *tmp, *next, list = CFS_LIST_HEAD_INIT(list);
struct ldlm_resource *res;
res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
if (res == NULL) {
/* This is not a problem. */
- CDEBUG(D_INFO, "No resource "LPU64"\n", res_id.name[0]);
+ CDEBUG(D_INFO, "No resource "LPU64"\n", res_id->name[0]);
RETURN(0);
}
* to notify the server.
* If flags & LDLM_FL_WARN, print a warning if some locks are still in use. */
int ldlm_cli_cancel_unused(struct ldlm_namespace *ns,
- struct ldlm_res_id *res_id, int flags, void *opaque)
+ const struct ldlm_res_id *res_id,
+ int flags, void *opaque)
{
int i;
ENTRY;
RETURN(ELDLM_OK);
if (res_id)
- RETURN(ldlm_cli_cancel_unused_resource(ns, *res_id, flags,
+ RETURN(ldlm_cli_cancel_unused_resource(ns, res_id, flags,
opaque));
spin_lock(&ns->ns_hash_lock);
ldlm_resource_getref(res);
spin_unlock(&ns->ns_hash_lock);
- rc = ldlm_cli_cancel_unused_resource(ns, res->lr_name,
+ rc = ldlm_cli_cancel_unused_resource(ns, &res->lr_name,
flags, opaque);
if (rc)
/* join/split resource locks to/from lru list */
int ldlm_cli_join_lru(struct ldlm_namespace *ns,
- struct ldlm_res_id *res_id, int join)
+ const struct ldlm_res_id *res_id, int join)
{
struct ldlm_resource *res;
struct ldlm_lock *lock, *n;
LASSERT(ns->ns_client == LDLM_NAMESPACE_CLIENT);
- res = ldlm_resource_get(ns, NULL, *res_id, LDLM_EXTENT, 0);
+ res = ldlm_resource_get(ns, NULL, res_id, LDLM_EXTENT, 0);
if (res == NULL)
RETURN(count);
LASSERT(res->lr_type == LDLM_EXTENT);
}
/* non-blocking function to manipulate a lock whose cb_data is being put away.*/
-void ldlm_resource_iterate(struct ldlm_namespace *ns, struct ldlm_res_id *res_id,
+void ldlm_resource_iterate(struct ldlm_namespace *ns,
+ const struct ldlm_res_id *res_id,
ldlm_iterator_t iter, void *data)
{
struct ldlm_resource *res;
LBUG();
}
- res = ldlm_resource_get(ns, NULL, *res_id, 0, 0);
+ res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
if (res == NULL) {
EXIT;
return;
size[DLM_REPLY_REC_OFF] = lock->l_lvb_len;
}
ptlrpc_req_set_repsize(req, buffers, size);
+ /* notify the server we've replayed all requests.
+ * also, we mark the request to be put on a dedicated
+ * queue to be processed after all request replayes.
+ * bug 6063 */
+ lustre_msg_set_flags(req->rq_reqmsg, MSG_REQ_REPLAY_DONE);
LDLM_DEBUG(lock, "replaying lock:");