struct list_head l_lru;
/**
* Linkage to resource's lock queues according to current lock state.
- * (could be granted, waiting or converting)
+ * (could be granted or waiting)
* Protected by lr_lock in struct ldlm_resource.
*/
struct list_head l_res_link;
* @{ */
/** List of locks in granted state */
struct list_head lr_granted;
- /** List of locks waiting to change their granted mode (converted) */
- struct list_head lr_converting;
/**
* List of locks that could not be granted due to conflicts and
* that are waiting for conflicts to go away */
* LDLM_PROCESS_RESCAN:
*
* It's used when policy functions are called from ldlm_reprocess_queue() to
- * reprocess the wait & convert list and try to grant locks, blocking ASTs
+ * reprocess the wait list and try to grant locks, blocking ASTs
* have already been sent in this situation, completion ASTs need be sent for
* the locks being granted.
*
* LDLM_PROCESS_ENQUEUE:
*
* It's used when policy functions are called from ldlm_lock_enqueue() to
- * process the wait & convert list for handling an enqueue request, blocking
+ * process the wait list for handling an enqueue request, blocking
* ASTs have not been sent yet, so list of conflicting locks would be
* collected and ASTs sent.
*
* LDLM_PROCESS_RECOVERY:
*
* It's used when policy functions are called from ldlm_reprocess_queue() to
- * reprocess the wait & convert list when recovery done. In case of blocking
+ * reprocess the wait list when recovery done. In case of blocking
* ASTs are lost before recovery, it needs not only to grant locks if
* available, but also send blocking ASTs to the locks doesn't have AST sent
* flag. Completion ASTs need be sent for the locks being granted.
* MDT or OST to pass through LDLM requests to LDLM for handling
* @{
*/
-int ldlm_handle_enqueue(struct ptlrpc_request *req, ldlm_completion_callback,
- ldlm_blocking_callback, ldlm_glimpse_callback);
int ldlm_handle_enqueue0(struct ldlm_namespace *ns, struct ptlrpc_request *req,
- const struct ldlm_request *dlm_req,
- const struct ldlm_callback_suite *cbs);
-int ldlm_handle_convert(struct ptlrpc_request *req);
+ const struct ldlm_request *dlm_req,
+ const struct ldlm_callback_suite *cbs);
int ldlm_handle_convert0(struct ptlrpc_request *req,
- const struct ldlm_request *dlm_req);
+ const struct ldlm_request *dlm_req);
int ldlm_handle_cancel(struct ptlrpc_request *req);
int ldlm_request_cancel(struct ptlrpc_request *req,
const struct ldlm_request *dlm_req,
struct lustre_handle *, int unref);
enum ldlm_mode ldlm_revalidate_lock_handle(const struct lustre_handle *lockh,
__u64 *bits);
-struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock,
- enum ldlm_mode new_mode, __u32 *flags);
-void ldlm_lock_downgrade(struct ldlm_lock *lock, enum ldlm_mode new_mode);
+void ldlm_lock_mode_downgrade(struct ldlm_lock *lock, enum ldlm_mode new_mode);
void ldlm_lock_cancel(struct ldlm_lock *lock);
void ldlm_reprocess_all(struct ldlm_resource *res);
void ldlm_reprocess_recovery_done(struct ldlm_namespace *ns);
void *data, __u32 lvb_len, enum lvb_type lvb_type,
const __u64 *client_cookie,
struct lustre_handle *lockh);
-int ldlm_cli_convert(const struct lustre_handle *lockh, int new_mode,
- __u32 *flags);
int ldlm_cli_update_pool(struct ptlrpc_request *req);
int ldlm_cli_cancel(const struct lustre_handle *lockh,
enum ldlm_cancel_flags cancel_flags);
/**
* Server placed lock on conv list, or a recovering client wants the lock
- * added to the conv list, no questions asked. */
+ * added to the conv list, no questions asked. (obsoleted) */
#define LDLM_FL_BLOCK_CONV 0x0000000000000004ULL // bit 2
#define ldlm_is_block_conv(_l) LDLM_TEST_FLAG(( _l), 1ULL << 2)
#define ldlm_set_block_conv(_l) LDLM_SET_FLAG(( _l), 1ULL << 2)
/** l_flags bits marked as "blocked" bits */
#define LDLM_FL_BLOCKED_MASK (LDLM_FL_BLOCK_GRANTED |\
- LDLM_FL_BLOCK_CONV |\
LDLM_FL_BLOCK_WAIT)
/** l_flags bits marked as "gone" bits */
ENTRY;
LASSERT(lock->l_granted_mode != lock->l_req_mode);
- LASSERT(list_empty(&res->lr_converting));
LASSERT(!(*flags & LDLM_FL_DENY_ON_CONTENTION) ||
!ldlm_is_ast_discard_data(lock));
INIT_LIST_HEAD(&rpc_list);
ENTRY;
LASSERT(lock->l_granted_mode != lock->l_req_mode);
- LASSERT(list_empty(&res->lr_converting));
INIT_LIST_HEAD(&rpc_list);
check_res_locked(res);
* NOTE: called by
* - ldlm_lock_enqueue
* - ldlm_reprocess_queue
- * - ldlm_lock_convert
*
* must be called with lr_lock held
*/
GOTO(out, rc = 1);
if (flags & LDLM_FL_BLOCK_GRANTED)
GOTO(out, rc = 0);
- lock = search_queue(&res->lr_converting, &data);
- if (lock != NULL)
- GOTO(out, rc = 1);
lock = search_queue(&res->lr_waiting, &data);
if (lock != NULL)
GOTO(out, rc = 1);
* more or less trusting the clients not to lie.
*
* FIXME (bug 268): Detect obvious lies by checking compatibility in
- * granted/converting queues. */
+ * granted queue. */
if (local) {
- if (*flags & LDLM_FL_BLOCK_CONV)
- ldlm_resource_add_lock(res, &res->lr_converting, lock);
- else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
- ldlm_resource_add_lock(res, &res->lr_waiting, lock);
- else
- ldlm_grant_lock(lock, NULL);
+ if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
+ ldlm_resource_add_lock(res, &res->lr_waiting, lock);
+ else
+ ldlm_grant_lock(lock, NULL);
GOTO(out, rc = ELDLM_OK);
#ifdef HAVE_SERVER_SUPPORT
- } else if (*flags & LDLM_FL_REPLAY) {
- if (*flags & LDLM_FL_BLOCK_CONV) {
- ldlm_resource_add_lock(res, &res->lr_converting, lock);
+ } else if (*flags & LDLM_FL_REPLAY) {
+ if (*flags & LDLM_FL_BLOCK_WAIT) {
+ ldlm_resource_add_lock(res, &res->lr_waiting, lock);
GOTO(out, rc = ELDLM_OK);
- } else if (*flags & LDLM_FL_BLOCK_WAIT) {
- ldlm_resource_add_lock(res, &res->lr_waiting, lock);
+ } else if (*flags & LDLM_FL_BLOCK_GRANTED) {
+ ldlm_grant_lock(lock, NULL);
GOTO(out, rc = ELDLM_OK);
- } else if (*flags & LDLM_FL_BLOCK_GRANTED) {
- ldlm_grant_lock(lock, NULL);
- GOTO(out, rc = ELDLM_OK);
- }
- /* If no flags, fall through to normal enqueue path. */
- }
+ }
+ /* If no flags, fall through to normal enqueue path. */
+ }
- policy = ldlm_processing_policy_table[res->lr_type];
+ policy = ldlm_processing_policy_table[res->lr_type];
policy(lock, flags, LDLM_PROCESS_ENQUEUE, &rc, NULL);
- GOTO(out, rc);
+ GOTO(out, rc);
#else
} else {
CERROR("This is client-side-only module, cannot handle "
/**
* Try to grant all waiting locks on a resource.
*
- * Calls ldlm_reprocess_queue on converting and waiting queues.
+ * Calls ldlm_reprocess_queue on waiting queue.
*
* Typically called after some resource locks are cancelled to see
* if anything could be granted as a result of the cancellation.
RETURN_EXIT;
restart:
lock_res(res);
- rc = ldlm_reprocess_queue(res, &res->lr_converting, &rpc_list,
- intention);
- if (rc == LDLM_ITER_CONTINUE)
- ldlm_reprocess_queue(res, &res->lr_waiting, &rpc_list,
- intention);
+ ldlm_reprocess_queue(res, &res->lr_waiting, &rpc_list, intention);
unlock_res(res);
- rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list,
- LDLM_WORK_CP_AST);
- if (rc == -ERESTART) {
+ rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list,
+ LDLM_WORK_CP_AST);
+ if (rc == -ERESTART) {
LASSERT(list_empty(&rpc_list));
- goto restart;
- }
+ goto restart;
+ }
#else
ENTRY;
}
/**
- * Downgrade an exclusive lock.
+ * Downgrade an PW/EX lock to COS mode.
*
- * A fast variant of ldlm_lock_convert for convertion of exclusive locks. The
+ * A lock mode convertion from PW/EX mode to less conflict mode. The
* convertion may fail if lock was canceled before downgrade, but it doesn't
* indicate any problem, because such lock has no reader or writer, and will
* be released soon.
- * Used by Commit on Sharing (COS) code.
+ * Used by Commit on Sharing (COS) code only for now.
*
* \param lock A lock to convert
* \param new_mode new lock mode
*/
-void ldlm_lock_downgrade(struct ldlm_lock *lock, enum ldlm_mode new_mode)
+void ldlm_lock_mode_downgrade(struct ldlm_lock *lock, enum ldlm_mode new_mode)
{
ENTRY;
EXIT;
}
-EXPORT_SYMBOL(ldlm_lock_downgrade);
-
-/**
- * Attempt to convert already granted lock to a different mode.
- *
- * While lock conversion is not currently used, future client-side
- * optimizations could take advantage of it to avoid discarding cached
- * pages on a file.
- */
-struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock,
- enum ldlm_mode new_mode, __u32 *flags)
-{
- struct list_head rpc_list;
- struct ldlm_resource *res;
- struct ldlm_namespace *ns;
- int granted = 0;
-#ifdef HAVE_SERVER_SUPPORT
- int old_mode;
- struct sl_insert_point prev;
-#endif
- struct ldlm_interval *node;
- ENTRY;
-
- INIT_LIST_HEAD(&rpc_list);
- /* Just return if mode is unchanged. */
- if (new_mode == lock->l_granted_mode) {
- *flags |= LDLM_FL_BLOCK_GRANTED;
- RETURN(lock->l_resource);
- }
-
- /* I can't check the type of lock here because the bitlock of lock
- * is not held here, so do the allocation blindly. -jay */
- OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, GFP_NOFS);
- if (node == NULL) /* Actually, this causes EDEADLOCK to be returned */
- RETURN(NULL);
-
- LASSERTF((new_mode == LCK_PW && lock->l_granted_mode == LCK_PR),
- "new_mode %u, granted %u\n", new_mode, lock->l_granted_mode);
-
- lock_res_and_lock(lock);
-
- res = lock->l_resource;
- ns = ldlm_res_to_ns(res);
-
-#ifdef HAVE_SERVER_SUPPORT
- old_mode = lock->l_req_mode;
-#endif
- lock->l_req_mode = new_mode;
- if (res->lr_type == LDLM_PLAIN || res->lr_type == LDLM_IBITS) {
-#ifdef HAVE_SERVER_SUPPORT
- /* remember the lock position where the lock might be
- * added back to the granted list later and also
- * remember the join mode for skiplist fixing. */
- prev.res_link = lock->l_res_link.prev;
- prev.mode_link = lock->l_sl_mode.prev;
- prev.policy_link = lock->l_sl_policy.prev;
-#endif
- ldlm_resource_unlink_lock(lock);
- } else {
- ldlm_resource_unlink_lock(lock);
- if (res->lr_type == LDLM_EXTENT) {
- /* FIXME: ugly code, I have to attach the lock to a
- * interval node again since perhaps it will be granted
- * soon */
- INIT_LIST_HEAD(&node->li_group);
- ldlm_interval_attach(node, lock);
- node = NULL;
- }
- }
-
- /*
- * Remove old lock from the pool before adding the lock with new
- * mode below in ->policy()
- */
- ldlm_pool_del(&ns->ns_pool, lock);
-
- /* If this is a local resource, put it on the appropriate list. */
- if (ns_is_client(ldlm_res_to_ns(res))) {
- if (*flags & (LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_GRANTED)) {
- ldlm_resource_add_lock(res, &res->lr_converting, lock);
- } else {
- /* This should never happen, because of the way the
- * server handles conversions. */
- LDLM_ERROR(lock, "Erroneous flags %x on local lock\n",
- *flags);
- LBUG();
-
- ldlm_grant_lock(lock, &rpc_list);
- granted = 1;
- /* FIXME: completion handling not with lr_lock held ! */
- if (lock->l_completion_ast)
- lock->l_completion_ast(lock, 0, NULL);
- }
-#ifdef HAVE_SERVER_SUPPORT
- } else {
- int rc;
- enum ldlm_error err;
- __u64 pflags = 0;
- ldlm_processing_policy policy;
-
- policy = ldlm_processing_policy_table[res->lr_type];
- rc = policy(lock, &pflags, LDLM_PROCESS_RESCAN, &err,
- &rpc_list);
- if (rc == LDLM_ITER_STOP) {
- lock->l_req_mode = old_mode;
- if (res->lr_type == LDLM_EXTENT)
- ldlm_extent_add_lock(res, lock);
- else
- ldlm_granted_list_add_lock(lock, &prev);
-
- res = NULL;
- } else {
- *flags |= LDLM_FL_BLOCK_GRANTED;
- granted = 1;
- }
- }
-#else
- } else {
- CERROR("This is client-side-only module, cannot handle "
- "LDLM_NAMESPACE_SERVER resource type lock.\n");
- LBUG();
- }
-#endif
- unlock_res_and_lock(lock);
-
- if (granted)
- ldlm_run_ast_work(ns, &rpc_list, LDLM_WORK_CP_AST);
- if (node)
- OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
- RETURN(res);
-}
+EXPORT_SYMBOL(ldlm_lock_mode_downgrade);
/**
* Print lock with lock handle \a lockh description into debug log.
}
/**
- * Old-style LDLM main entry point for server code enqueue.
- */
-int ldlm_handle_enqueue(struct ptlrpc_request *req,
- ldlm_completion_callback completion_callback,
- ldlm_blocking_callback blocking_callback,
- ldlm_glimpse_callback glimpse_callback)
-{
- struct ldlm_request *dlm_req;
- struct ldlm_callback_suite cbs = {
- .lcs_completion = completion_callback,
- .lcs_blocking = blocking_callback,
- .lcs_glimpse = glimpse_callback
- };
- int rc;
-
- dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
- if (dlm_req != NULL) {
- rc = ldlm_handle_enqueue0(req->rq_export->exp_obd->obd_namespace,
- req, dlm_req, &cbs);
- } else {
- rc = -EFAULT;
- }
- return rc;
-}
-
-/**
* Main LDLM entry point for server code to process lock conversion requests.
*/
int ldlm_handle_convert0(struct ptlrpc_request *req,
const struct ldlm_request *dlm_req)
{
- struct ldlm_reply *dlm_rep;
- struct ldlm_lock *lock;
- int rc;
- ENTRY;
-
- if (req->rq_export && req->rq_export->exp_nid_stats &&
- req->rq_export->exp_nid_stats->nid_ldlm_stats)
- lprocfs_counter_incr(req->rq_export->exp_nid_stats->nid_ldlm_stats,
- LDLM_CONVERT - LDLM_FIRST_OPC);
-
- rc = req_capsule_server_pack(&req->rq_pill);
- if (rc)
- RETURN(rc);
-
- dlm_rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
- dlm_rep->lock_flags = dlm_req->lock_flags;
-
- lock = ldlm_handle2lock(&dlm_req->lock_handle[0]);
- if (!lock) {
- req->rq_status = LUSTRE_EINVAL;
- } else {
- void *res = NULL;
-
- LDLM_DEBUG(lock, "server-side convert handler START");
-
- res = ldlm_lock_convert(lock, dlm_req->lock_desc.l_req_mode,
- &dlm_rep->lock_flags);
- if (res) {
- if (ldlm_del_waiting_lock(lock))
- LDLM_DEBUG(lock, "converted waiting lock");
- req->rq_status = 0;
- } else {
- req->rq_status = LUSTRE_EDEADLK;
- }
- }
-
- if (lock) {
- if (!req->rq_status)
- ldlm_reprocess_all(lock->l_resource);
- LDLM_DEBUG(lock, "server-side convert handler END");
- LDLM_LOCK_PUT(lock);
- } else
- LDLM_DEBUG_NOLOCK("server-side convert handler END");
-
- RETURN(0);
-}
-
-/**
- * Old-style main LDLM entry point for server code to process lock conversion
- * requests.
- */
-int ldlm_handle_convert(struct ptlrpc_request *req)
-{
- int rc;
- struct ldlm_request *dlm_req;
-
- dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
- if (dlm_req != NULL) {
- rc = ldlm_handle_convert0(req, dlm_req);
- } else {
- CERROR ("Can't unpack dlm_req\n");
- rc = -EFAULT;
- }
- return rc;
+ RETURN(-ENOTSUPP);
}
/**
LASSERT(lock->l_granted_mode != lock->l_req_mode);
check_res_locked(res);
- LASSERT(list_empty(&res->lr_converting));
INIT_LIST_HEAD(&rpc_list);
if (intention == LDLM_PROCESS_RESCAN) {
*
* - to force all locks when resource is destroyed (cleanup_resource());
*
- * - during lock conversion (not used currently).
- *
* If lock is not granted in the first case, this function waits until second
* or penultimate cases happen in some other thread.
*
}
EXPORT_SYMBOL(ldlm_cli_enqueue);
-static int ldlm_cli_convert_local(struct ldlm_lock *lock, int new_mode,
- __u32 *flags)
-{
- struct ldlm_resource *res;
- int rc;
- ENTRY;
- if (ns_is_client(ldlm_lock_to_ns(lock))) {
- CERROR("Trying to cancel local lock\n");
- LBUG();
- }
- LDLM_DEBUG(lock, "client-side local convert");
-
- res = ldlm_lock_convert(lock, new_mode, flags);
- if (res) {
- ldlm_reprocess_all(res);
- rc = 0;
- } else {
- rc = LUSTRE_EDEADLK;
- }
- LDLM_DEBUG(lock, "client-side local convert handler END");
- LDLM_LOCK_PUT(lock);
- RETURN(rc);
-}
-
-/* FIXME: one of ldlm_cli_convert or the server side should reject attempted
- * conversion of locks which are on the waiting or converting queue */
-/* Caller of this code is supposed to take care of lock readers/writers
- accounting */
-int ldlm_cli_convert(const struct lustre_handle *lockh, int new_mode,
- __u32 *flags)
-{
- struct ldlm_request *body;
- struct ldlm_reply *reply;
- struct ldlm_lock *lock;
- struct ldlm_resource *res;
- struct ptlrpc_request *req;
- int rc;
- ENTRY;
-
- lock = ldlm_handle2lock(lockh);
- if (!lock) {
- LBUG();
- RETURN(-EINVAL);
- }
- *flags = 0;
-
- if (lock->l_conn_export == NULL)
- RETURN(ldlm_cli_convert_local(lock, new_mode, flags));
-
- LDLM_DEBUG(lock, "client-side convert");
-
- req = ptlrpc_request_alloc_pack(class_exp2cliimp(lock->l_conn_export),
- &RQF_LDLM_CONVERT, LUSTRE_DLM_VERSION,
- LDLM_CONVERT);
- if (req == NULL) {
- LDLM_LOCK_PUT(lock);
- RETURN(-ENOMEM);
- }
-
- body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
- body->lock_handle[0] = lock->l_remote_handle;
-
- body->lock_desc.l_req_mode = new_mode;
- body->lock_flags = ldlm_flags_to_wire(*flags);
-
-
- ptlrpc_request_set_replen(req);
- rc = ptlrpc_queue_wait(req);
- if (rc != ELDLM_OK)
- GOTO(out, rc);
-
- reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
- if (reply == NULL)
- GOTO(out, rc = -EPROTO);
-
- if (req->rq_status)
- GOTO(out, rc = req->rq_status);
-
- res = ldlm_lock_convert(lock, new_mode, &reply->lock_flags);
- if (res != NULL) {
- ldlm_reprocess_all(res);
- /* Go to sleep until the lock is granted. */
- /* FIXME: or cancelled. */
- if (lock->l_completion_ast) {
- rc = lock->l_completion_ast(lock, LDLM_FL_WAIT_NOREPROC,
- NULL);
- if (rc)
- GOTO(out, rc);
- }
- } else {
- rc = LUSTRE_EDEADLK;
- }
- EXIT;
- out:
- LDLM_LOCK_PUT(lock);
- ptlrpc_req_finished(req);
- return rc;
-}
-
/**
* Cancel locks locally.
* Returns:
/* Lock iterators. */
int ldlm_resource_foreach(struct ldlm_resource *res, ldlm_iterator_t iter,
- void *closure)
+ void *closure)
{
struct list_head *tmp, *next;
- struct ldlm_lock *lock;
- int rc = LDLM_ITER_CONTINUE;
+ struct ldlm_lock *lock;
+ int rc = LDLM_ITER_CONTINUE;
- ENTRY;
+ ENTRY;
- if (!res)
- RETURN(LDLM_ITER_CONTINUE);
+ if (!res)
+ RETURN(LDLM_ITER_CONTINUE);
- lock_res(res);
+ lock_res(res);
list_for_each_safe(tmp, next, &res->lr_granted) {
lock = list_entry(tmp, struct ldlm_lock, l_res_link);
- if (iter(lock, closure) == LDLM_ITER_STOP)
- GOTO(out, rc = LDLM_ITER_STOP);
- }
-
- list_for_each_safe(tmp, next, &res->lr_converting) {
- lock = list_entry(tmp, struct ldlm_lock, l_res_link);
-
- if (iter(lock, closure) == LDLM_ITER_STOP)
- GOTO(out, rc = LDLM_ITER_STOP);
- }
+ if (iter(lock, closure) == LDLM_ITER_STOP)
+ GOTO(out, rc = LDLM_ITER_STOP);
+ }
list_for_each_safe(tmp, next, &res->lr_waiting) {
lock = list_entry(tmp, struct ldlm_lock, l_res_link);
- if (iter(lock, closure) == LDLM_ITER_STOP)
- GOTO(out, rc = LDLM_ITER_STOP);
- }
- out:
- unlock_res(res);
- RETURN(rc);
+ if (iter(lock, closure) == LDLM_ITER_STOP)
+ GOTO(out, rc = LDLM_ITER_STOP);
+ }
+out:
+ unlock_res(res);
+ RETURN(rc);
}
struct iter_helper_data {
RETURN(0);
}
- /*
- * If granted mode matches the requested mode, this lock is granted.
- *
- * If they differ, but we have a granted mode, then we were granted
- * one mode and now want another: ergo, converting.
- *
- * If we haven't been granted anything and are on a resource list,
- * then we're blocked/waiting.
- *
- * If we haven't been granted anything and we're NOT on a resource list,
- * then we haven't got a reply yet and don't have a known disposition.
- * This happens whenever a lock enqueue is the request that triggers
- * recovery.
- */
- if (lock->l_granted_mode == lock->l_req_mode)
- flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_GRANTED;
- else if (lock->l_granted_mode)
- flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_CONV;
+ /*
+ * If granted mode matches the requested mode, this lock is granted.
+ *
+ * If we haven't been granted anything and are on a resource list,
+ * then we're blocked/waiting.
+ *
+ * If we haven't been granted anything and we're NOT on a resource list,
+ * then we haven't got a reply yet and don't have a known disposition.
+ * This happens whenever a lock enqueue is the request that triggers
+ * recovery.
+ */
+ if (lock->l_granted_mode == lock->l_req_mode)
+ flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_GRANTED;
else if (!list_empty(&lock->l_res_link))
- flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_WAIT;
- else
- flags = LDLM_FL_REPLAY;
+ flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_WAIT;
+ else
+ flags = LDLM_FL_REPLAY;
req = ptlrpc_request_alloc_pack(imp, &RQF_LDLM_ENQUEUE,
LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
static int ldlm_resource_clean(struct cfs_hash *hs, struct cfs_hash_bd *bd,
struct hlist_node *hnode, void *arg)
{
- struct ldlm_resource *res = cfs_hash_object(hs, hnode);
+ struct ldlm_resource *res = cfs_hash_object(hs, hnode);
__u64 flags = *(__u64 *)arg;
- cleanup_resource(res, &res->lr_granted, flags);
- cleanup_resource(res, &res->lr_converting, flags);
- cleanup_resource(res, &res->lr_waiting, flags);
+ cleanup_resource(res, &res->lr_granted, flags);
+ cleanup_resource(res, &res->lr_waiting, flags);
- return 0;
+ return 0;
}
static int ldlm_resource_complain(struct cfs_hash *hs, struct cfs_hash_bd *bd,
}
INIT_LIST_HEAD(&res->lr_granted);
- INIT_LIST_HEAD(&res->lr_converting);
INIT_LIST_HEAD(&res->lr_waiting);
atomic_set(&res->lr_refcount, 1);
static void __ldlm_resource_putref_final(struct cfs_hash_bd *bd,
struct ldlm_resource *res)
{
- struct ldlm_ns_bucket *nsb = res->lr_ns_bucket;
+ struct ldlm_ns_bucket *nsb = res->lr_ns_bucket;
if (!list_empty(&res->lr_granted)) {
- ldlm_resource_dump(D_ERROR, res);
- LBUG();
- }
-
- if (!list_empty(&res->lr_converting)) {
- ldlm_resource_dump(D_ERROR, res);
- LBUG();
- }
+ ldlm_resource_dump(D_ERROR, res);
+ LBUG();
+ }
if (!list_empty(&res->lr_waiting)) {
- ldlm_resource_dump(D_ERROR, res);
- LBUG();
- }
+ ldlm_resource_dump(D_ERROR, res);
+ LBUG();
+ }
- cfs_hash_bd_del_locked(nsb->nsb_namespace->ns_rs_hash,
- bd, &res->lr_hash);
- lu_ref_fini(&res->lr_reference);
- if (cfs_hash_bd_count_get(bd) == 0)
- ldlm_namespace_put(nsb->nsb_namespace);
+ cfs_hash_bd_del_locked(nsb->nsb_namespace->ns_rs_hash,
+ bd, &res->lr_hash);
+ lu_ref_fini(&res->lr_reference);
+ if (cfs_hash_bd_count_get(bd) == 0)
+ ldlm_namespace_put(nsb->nsb_namespace);
}
/* Returns 1 if the resource was freed, 0 if it remains. */
}
}
}
- if (!list_empty(&res->lr_converting)) {
- CDEBUG(level, "Converting locks:\n");
- list_for_each_entry(lock, &res->lr_converting, l_res_link)
- LDLM_DEBUG_LIMIT(level, lock, "###");
- }
+
if (!list_empty(&res->lr_waiting)) {
- CDEBUG(level, "Waiting locks:\n");
+ CDEBUG(level, "Waiting locks:\n");
list_for_each_entry(lock, &res->lr_waiting, l_res_link)
- LDLM_DEBUG_LIMIT(level, lock, "###");
- }
+ LDLM_DEBUG_LIMIT(level, lock, "###");
+ }
}
EXPORT_SYMBOL(ldlm_resource_dump);
"state %p transno %lld\n", req,
req->rq_reply_state, req->rq_transno);
if (cos) {
- ldlm_lock_downgrade(lock, LCK_COS);
+ ldlm_lock_mode_downgrade(lock, LCK_COS);
mode = LCK_COS;
}
if (req->rq_export->exp_disconnected)
while (nlocks-- > 0) {
lock = ack_locks[nlocks];
- ldlm_lock_downgrade(lock, LCK_COS);
+ ldlm_lock_mode_downgrade(lock, LCK_COS);
LDLM_LOCK_PUT(lock);
}
RETURN(0);