LCF_ASYNC = 0x1, /* Cancel locks asynchronously. */
LCF_LOCAL = 0x2, /* Cancel locks locally, not notifing server */
LCF_BL_AST = 0x4, /* Cancel LDLM_FL_BL_AST locks in the same RPC */
- LCF_CONVERT = 0x8, /* Try to convert IBITS lock before cancel */
};
struct ldlm_flock {
void *data, __u32 lvb_len, enum lvb_type lvb_type,
const __u64 *client_cookie,
struct lustre_handle *lockh);
-int ldlm_cli_convert(struct ldlm_lock *lock, __u32 *flags);
+int ldlm_cli_convert_req(struct ldlm_lock *lock, __u32 *flags, __u64 new_bits);
+int ldlm_cli_convert(struct ldlm_lock *lock,
+ enum ldlm_cancel_flags cancel_flags);
int ldlm_cli_update_pool(struct ptlrpc_request *req);
int ldlm_cli_cancel(const struct lustre_handle *lockh,
enum ldlm_cancel_flags cancel_flags);
enum ldlm_cancel_flags flags);
int ldlm_inodebits_drop(struct ldlm_lock *lock, __u64 to_drop);
-int ldlm_cli_dropbits(struct ldlm_lock *lock, __u64 drop_bits);
-int ldlm_cli_dropbits_list(struct list_head *converts, __u64 drop_bits);
+int ldlm_cli_inodebits_convert(struct ldlm_lock *lock,
+ enum ldlm_cancel_flags cancel_flags);
/** @} ldlm_cli_api */
union ldlm_policy_data *lpolicy)
{
lpolicy->l_inodebits.bits = wpolicy->l_inodebits.bits;
- lpolicy->l_inodebits.try_bits = wpolicy->l_inodebits.try_bits;
+ /**
+ * try_bits are to be handled outside of generic write_to_local due
+ * to different behavior on a server and client.
+ */
}
void ldlm_ibits_policy_local_to_wire(const union ldlm_policy_data *lpolicy,
EXPORT_SYMBOL(ldlm_inodebits_drop);
/* convert single lock */
-int ldlm_cli_dropbits(struct ldlm_lock *lock, __u64 drop_bits)
+int ldlm_cli_inodebits_convert(struct ldlm_lock *lock,
+ enum ldlm_cancel_flags cancel_flags)
{
- struct lustre_handle lockh;
+ struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
+ struct ldlm_lock_desc ld = { { 0 } };
+ __u64 drop_bits, new_bits;
__u32 flags = 0;
int rc;
ENTRY;
- LASSERT(drop_bits);
- LASSERT(!lock->l_readers && !lock->l_writers);
+ check_res_locked(lock->l_resource);
- LDLM_DEBUG(lock, "client lock convert START");
+ /* Lock is being converted already */
+ if (ldlm_is_converting(lock)) {
+ if (!(cancel_flags & LCF_ASYNC)) {
+ struct l_wait_info lwi = { 0 };
- ldlm_lock2handle(lock, &lockh);
- lock_res_and_lock(lock);
- /* check if all bits are blocked */
- if (!(lock->l_policy_data.l_inodebits.bits & ~drop_bits)) {
- unlock_res_and_lock(lock);
- /* return error to continue with cancel */
- GOTO(exit, rc = -EINVAL);
+ unlock_res_and_lock(lock);
+ l_wait_event(lock->l_waitq,
+ is_lock_converted(lock), &lwi);
+ lock_res_and_lock(lock);
+ }
+ RETURN(0);
}
- /* check if no common bits, consider this as successful convert */
- if (!(lock->l_policy_data.l_inodebits.bits & drop_bits)) {
- unlock_res_and_lock(lock);
- GOTO(exit, rc = 0);
- }
+ /* lru_cancel may happen in parallel and call ldlm_cli_cancel_list()
+ * independently.
+ */
+ if (ldlm_is_canceling(lock))
+ RETURN(-EINVAL);
- /* check if there is race with cancel */
- if (ldlm_is_canceling(lock) || ldlm_is_cancel(lock)) {
- unlock_res_and_lock(lock);
- GOTO(exit, rc = -EINVAL);
- }
+ /* no need in only local convert */
+ if (lock->l_flags & (LDLM_FL_LOCAL_ONLY | LDLM_FL_CANCEL_ON_BLOCK))
+ RETURN(-EINVAL);
- /* clear cbpending flag early, it is safe to match lock right after
- * client convert because it is downgrade always.
- */
- ldlm_clear_cbpending(lock);
- ldlm_clear_bl_ast(lock);
+ drop_bits = lock->l_policy_data.l_inodebits.cancel_bits;
+ /* no cancel bits - means that caller needs full cancel */
+ if (drop_bits == 0)
+ RETURN(-EINVAL);
+
+ new_bits = lock->l_policy_data.l_inodebits.bits & ~drop_bits;
+ /* check if all lock bits are dropped, proceed with cancel */
+ if (!new_bits)
+ RETURN(-EINVAL);
+
+ /* check if no dropped bits, consider this as successful convert */
+ if (lock->l_policy_data.l_inodebits.bits == new_bits)
+ RETURN(0);
- /* If lock is being converted already, check drop bits first */
- if (ldlm_is_converting(lock)) {
- /* raced lock convert, lock inodebits are remaining bits
- * so check if they are conflicting with new convert or not.
- */
- if (!(lock->l_policy_data.l_inodebits.bits & drop_bits)) {
- unlock_res_and_lock(lock);
- GOTO(exit, rc = 0);
- }
- /* Otherwise drop new conflicting bits in new convert */
- }
ldlm_set_converting(lock);
- /* from all bits of blocking lock leave only conflicting */
- drop_bits &= lock->l_policy_data.l_inodebits.bits;
- /* save them in cancel_bits, so l_blocking_ast will know
- * which bits from the current lock were dropped. */
- lock->l_policy_data.l_inodebits.cancel_bits = drop_bits;
- /* Finally clear these bits in lock ibits */
- ldlm_inodebits_drop(lock, drop_bits);
- unlock_res_and_lock(lock);
/* Finally call cancel callback for remaining bits only.
* It is important to have converting flag during that
* so blocking_ast callback can distinguish convert from
* cancels.
*/
- if (lock->l_blocking_ast)
- lock->l_blocking_ast(lock, NULL, lock->l_ast_data,
- LDLM_CB_CANCELING);
-
+ ld.l_policy_data.l_inodebits.cancel_bits = drop_bits;
+ unlock_res_and_lock(lock);
+ lock->l_blocking_ast(lock, &ld, lock->l_ast_data, LDLM_CB_CANCELING);
/* now notify server about convert */
- rc = ldlm_cli_convert(lock, &flags);
- if (rc) {
- lock_res_and_lock(lock);
- if (ldlm_is_converting(lock)) {
- ldlm_clear_converting(lock);
- ldlm_set_cbpending(lock);
- ldlm_set_bl_ast(lock);
- }
- unlock_res_and_lock(lock);
- GOTO(exit, rc);
- }
- EXIT;
-exit:
- LDLM_DEBUG(lock, "client lock convert END");
- return rc;
-}
+ rc = ldlm_cli_convert_req(lock, &flags, new_bits);
+ lock_res_and_lock(lock);
+ if (rc)
+ GOTO(full_cancel, rc);
+
+ /* Finally clear these bits in lock ibits */
+ ldlm_inodebits_drop(lock, drop_bits);
+ /* Being locked again check if lock was canceled, it is important
+ * to do and don't drop cbpending below
+ */
+ if (ldlm_is_canceling(lock))
+ GOTO(full_cancel, rc = -EINVAL);
+
+ /* also check again if more bits to be cancelled appeared */
+ if (drop_bits != lock->l_policy_data.l_inodebits.cancel_bits)
+ GOTO(clear_converting, rc = -EAGAIN);
+
+ /* clear cbpending flag early, it is safe to match lock right after
+ * client convert because it is downgrade always.
+ */
+ ldlm_clear_cbpending(lock);
+ ldlm_clear_bl_ast(lock);
+ spin_lock(&ns->ns_lock);
+ if (list_empty(&lock->l_lru))
+ ldlm_lock_add_to_lru_nolock(lock);
+ spin_unlock(&ns->ns_lock);
+
+ /* the job is done, zero the cancel_bits. If more conflicts appear,
+ * it will result in another cycle of ldlm_cli_inodebits_convert().
+ */
+full_cancel:
+ lock->l_policy_data.l_inodebits.cancel_bits = 0;
+clear_converting:
+ ldlm_clear_converting(lock);
+ RETURN(rc);
+}
int ldlm_inodebits_alloc_lock(struct ldlm_lock *lock)
{
void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
struct ldlm_lock_desc *ld, struct ldlm_lock *lock);
+void ldlm_bl_desc2lock(const struct ldlm_lock_desc *ld, struct ldlm_lock *lock);
#ifdef HAVE_SERVER_SUPPORT
/* ldlm_plain.c */
return bl_done;
}
+static inline bool is_lock_converted(struct ldlm_lock *lock)
+{
+ bool ret = 0;
+
+ lock_res_and_lock(lock);
+ ret = (lock->l_policy_data.l_inodebits.cancel_bits == 0);
+ unlock_res_and_lock(lock);
+
+ return ret;
+}
+
typedef void (*ldlm_policy_wire_to_local_t)(const union ldlm_wire_policy_data *,
union ldlm_policy_data *);
typedef void (*ldlm_policy_local_to_wire_t)(const union ldlm_policy_data *,
&lock->l_policy_data);
if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
lock->l_req_extent = lock->l_policy_data.l_extent;
+ else if (dlm_req->lock_desc.l_resource.lr_type == LDLM_IBITS)
+ lock->l_policy_data.l_inodebits.try_bits =
+ dlm_req->lock_desc.l_policy_data.l_inodebits.try_bits;
existing_lock:
if (flags & LDLM_FL_HAS_INTENT) {
struct obd_export *exp = req->rq_export;
struct ldlm_reply *dlm_rep;
struct ldlm_lock *lock;
+ __u64 bits;
+ __u64 new_bits;
int rc;
ENTRY;
dlm_rep->lock_flags = dlm_req->lock_flags;
lock = ldlm_handle2lock(&dlm_req->lock_handle[0]);
- if (lock) {
- __u64 bits;
- __u64 new;
+ if (!lock) {
+ LDLM_DEBUG_NOLOCK("server lock is canceled already");
+ req->rq_status = ELDLM_NO_LOCK_DATA;
+ RETURN(0);
+ }
- bits = lock->l_policy_data.l_inodebits.bits;
- new = dlm_req->lock_desc.l_policy_data.l_inodebits.bits;
- LDLM_DEBUG(lock, "server-side convert handler START");
+ LDLM_DEBUG(lock, "server-side convert handler START");
- if (ldlm_is_cancel(lock)) {
- LDLM_ERROR(lock, "convert on canceled lock!");
- rc = ELDLM_NO_LOCK_DATA;
- } else if (dlm_req->lock_desc.l_req_mode !=
- lock->l_granted_mode) {
- LDLM_ERROR(lock, "lock mode differs!");
- rc = ELDLM_NO_LOCK_DATA;
- } else if (bits == new) {
- /* This can be valid situation if CONVERT RPCs are
- * re-ordered. Just finish silently */
- LDLM_DEBUG(lock, "lock is converted already!");
- rc = ELDLM_OK;
- } else {
- lock_res_and_lock(lock);
- if (ldlm_is_waited(lock))
- ldlm_del_waiting_lock(lock);
+ lock_res_and_lock(lock);
+ bits = lock->l_policy_data.l_inodebits.bits;
+ new_bits = dlm_req->lock_desc.l_policy_data.l_inodebits.bits;
- ldlm_clear_cbpending(lock);
- lock->l_policy_data.l_inodebits.cancel_bits = 0;
- ldlm_inodebits_drop(lock, bits & ~new);
+ if (ldlm_is_cancel(lock)) {
+ LDLM_DEBUG(lock, "convert on canceled lock!");
+ unlock_res_and_lock(lock);
+ GOTO(out_put, rc = ELDLM_NO_LOCK_DATA);
+ }
- ldlm_clear_blocking_data(lock);
- unlock_res_and_lock(lock);
+ if (dlm_req->lock_desc.l_req_mode != lock->l_granted_mode) {
+ LDLM_ERROR(lock, "lock mode differs!");
+ unlock_res_and_lock(lock);
+ GOTO(out_put, rc = -EPROTO);
+ }
- ldlm_reprocess_all(lock->l_resource, NULL);
- rc = ELDLM_OK;
- }
+ if (bits == new_bits) {
+ /*
+ * This can be valid situation if CONVERT RPCs are
+ * re-ordered. Just finish silently
+ */
+ LDLM_DEBUG(lock, "lock is converted already!");
+ unlock_res_and_lock(lock);
+ } else {
+ if (ldlm_is_waited(lock))
+ ldlm_del_waiting_lock(lock);
- if (rc == ELDLM_OK) {
- dlm_rep->lock_handle = lock->l_remote_handle;
- ldlm_ibits_policy_local_to_wire(&lock->l_policy_data,
- &dlm_rep->lock_desc.l_policy_data);
- }
+ ldlm_clear_cbpending(lock);
+ lock->l_policy_data.l_inodebits.cancel_bits = 0;
+ ldlm_inodebits_drop(lock, bits & ~new_bits);
- LDLM_DEBUG(lock, "server-side convert handler END, rc = %d",
- rc);
- LDLM_LOCK_PUT(lock);
- } else {
- rc = ELDLM_NO_LOCK_DATA;
- LDLM_DEBUG_NOLOCK("server-side convert handler END, rc = %d",
- rc);
+ ldlm_clear_blocking_data(lock);
+ unlock_res_and_lock(lock);
+
+ ldlm_reprocess_all(lock->l_resource, NULL);
}
+ dlm_rep->lock_handle = lock->l_remote_handle;
+ ldlm_ibits_policy_local_to_wire(&lock->l_policy_data,
+ &dlm_rep->lock_desc.l_policy_data);
+ rc = ELDLM_OK;
+ EXIT;
+out_put:
+ LDLM_DEBUG(lock, "server-side convert handler END, rc = %d", rc);
+ LDLM_LOCK_PUT(lock);
req->rq_status = rc;
-
- RETURN(0);
+ return 0;
}
/**
#endif /* HAVE_SERVER_SUPPORT */
/**
- * Callback handler for receiving incoming blocking ASTs.
- *
- * This can only happen on client side.
+ * Server may pass additional information about blocking lock.
+ * For IBITS locks it is conflicting bits which can be used for
+ * lock convert instead of cancel.
*/
-void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
- struct ldlm_lock_desc *ld, struct ldlm_lock *lock)
+void ldlm_bl_desc2lock(const struct ldlm_lock_desc *ld, struct ldlm_lock *lock)
{
- int do_ast;
- ENTRY;
-
- LDLM_DEBUG(lock, "client blocking AST callback handler");
+ struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
- lock_res_and_lock(lock);
-
- /* set bits to cancel for this lock for possible lock convert */
- if (ns_is_client(ns) && (lock->l_resource->lr_type == LDLM_IBITS)) {
- /* Lock description contains policy of blocking lock,
+ check_res_locked(lock->l_resource);
+ if (ns_is_client(ns) && ld &&
+ (lock->l_resource->lr_type == LDLM_IBITS)) {
+ /*
+ * Lock description contains policy of blocking lock,
* and its cancel_bits is used to pass conflicting bits.
* NOTE: ld can be NULL or can be not NULL but zeroed if
* passed from ldlm_bl_thread_blwi(), check below used bits
* in ld to make sure it is valid description.
*/
- if (ld && ld->l_policy_data.l_inodebits.bits)
- lock->l_policy_data.l_inodebits.cancel_bits =
+ if (ld->l_policy_data.l_inodebits.cancel_bits &&
+ ldlm_res_eq(&ld->l_resource.lr_name,
+ &lock->l_resource->lr_name) &&
+ !(ldlm_is_cbpending(lock) &&
+ lock->l_policy_data.l_inodebits.cancel_bits == 0)) {
+ /* always combine conflicting ibits */
+ lock->l_policy_data.l_inodebits.cancel_bits |=
ld->l_policy_data.l_inodebits.cancel_bits;
- /* if there is no valid ld and lock is cbpending already
- * then cancel_bits should be kept, otherwise it is zeroed.
- */
- else if (!ldlm_is_cbpending(lock))
+ } else {
+ /* If cancel_bits are not obtained or
+ * if the lock is already CBPENDING and
+ * has no cancel_bits set
+ * - the full lock is to be cancelled
+ */
lock->l_policy_data.l_inodebits.cancel_bits = 0;
+ }
}
+}
+
+/**
+ * Callback handler for receiving incoming blocking ASTs.
+ *
+ * This can only happen on client side.
+ */
+void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
+ struct ldlm_lock_desc *ld, struct ldlm_lock *lock)
+{
+ int do_ast;
+
+ ENTRY;
+
+ LDLM_DEBUG(lock, "client blocking AST callback handler");
+
+ lock_res_and_lock(lock);
+
+ /* get extra information from desc if any */
+ ldlm_bl_desc2lock(ld, lock);
ldlm_set_cbpending(lock);
if (ldlm_is_cancel_on_block(lock))
ldlm_resource_unlink_lock(lock);
- if (dlm_req->lock_flags & LDLM_FL_AST_SENT) {
- /* BL_AST locks are not needed in LRU.
- * Let ldlm_cancel_lru() be fast. */
- ldlm_lock_remove_from_lru(lock);
+ if (dlm_req->lock_flags & LDLM_FL_AST_SENT) {
+ /*
+ * BL_AST locks are not needed in LRU.
+ * Let ldlm_cancel_lru() be fast.
+ */
+ ldlm_lock_remove_from_lru(lock);
+ ldlm_bl_desc2lock(&dlm_req->lock_desc, lock);
lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST;
LDLM_DEBUG(lock, "completion AST includes blocking AST");
}
struct ldlm_request *dlm_req,
struct ldlm_lock *lock)
{
- int rc = -ENOSYS;
- ENTRY;
+ struct ldlm_lock_desc *ld = &dlm_req->lock_desc;
+ int rc = -ENOSYS;
+
+ ENTRY;
- LDLM_DEBUG(lock, "client glimpse AST callback handler");
+ LDLM_DEBUG(lock, "client glimpse AST callback handler");
if (lock->l_glimpse_ast != NULL)
rc = lock->l_glimpse_ast(lock, req);
ktime_after(ktime_get(),
ktime_add(lock->l_last_used,
ktime_set(ns->ns_dirty_age_limit, 0)))) {
- unlock_res_and_lock(lock);
- if (ldlm_bl_to_thread_lock(ns, NULL, lock))
- ldlm_handle_bl_callback(ns, NULL, lock);
+ unlock_res_and_lock(lock);
+
+ /* For MDS glimpse it is always DOM lock, set corresponding
+ * cancel_bits to perform lock convert if needed
+ */
+ if (lock->l_resource->lr_type == LDLM_IBITS)
+ ld->l_policy_data.l_inodebits.cancel_bits =
+ MDS_INODELOCK_DOM;
+ if (ldlm_bl_to_thread_lock(ns, ld, lock))
+ ldlm_handle_bl_callback(ns, ld, lock);
EXIT;
return;
if ((*flags) & LDLM_FL_AST_SENT) {
lock_res_and_lock(lock);
+ ldlm_bl_desc2lock(&reply->lock_desc, lock);
lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST;
unlock_res_and_lock(lock);
LDLM_DEBUG(lock, "enqueue reply includes blocking AST");
EXPORT_SYMBOL(ldlm_cli_enqueue);
/**
- * Client-side lock convert reply handling.
- *
- * Finish client lock converting, checks for concurrent converts
- * and clear 'converting' flag so lock can be placed back into LRU.
- */
-static int lock_convert_interpret(const struct lu_env *env,
- struct ptlrpc_request *req,
- struct ldlm_async_args *aa, int rc)
-{
- struct ldlm_lock *lock;
- struct ldlm_reply *reply;
-
- ENTRY;
-
- lock = ldlm_handle2lock(&aa->lock_handle);
- if (!lock) {
- LDLM_DEBUG_NOLOCK("convert ACK for unknown local cookie %#llx",
- aa->lock_handle.cookie);
- RETURN(-ESTALE);
- }
-
- LDLM_DEBUG(lock, "CONVERTED lock:");
-
- if (rc != ELDLM_OK)
- GOTO(out, rc);
-
- reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
- if (reply == NULL)
- GOTO(out, rc = -EPROTO);
-
- if (reply->lock_handle.cookie != aa->lock_handle.cookie) {
- LDLM_ERROR(lock, "convert ACK with wrong lock cookie %#llx"
- " but cookie %#llx from server %s id %s\n",
- aa->lock_handle.cookie, reply->lock_handle.cookie,
- req->rq_export->exp_client_uuid.uuid,
- libcfs_id2str(req->rq_peer));
- GOTO(out, rc = ELDLM_NO_LOCK_DATA);
- }
-
- lock_res_and_lock(lock);
- /* Lock convert is sent for any new bits to drop, the converting flag
- * is dropped when ibits on server are the same as on client. Meanwhile
- * that can be so that more later convert will be replied first with
- * and clear converting flag, so in case of such race just exit here.
- * if lock has no converting bits then */
- if (!ldlm_is_converting(lock)) {
- LDLM_DEBUG(lock, "convert ACK for lock without converting flag,"
- " reply ibits %#llx",
- reply->lock_desc.l_policy_data.l_inodebits.bits);
- } else if (reply->lock_desc.l_policy_data.l_inodebits.bits !=
- lock->l_policy_data.l_inodebits.bits) {
- /* Compare server returned lock ibits and local lock ibits
- * if they are the same we consider convertion is done,
- * otherwise we have more converts inflight and keep
- * converting flag.
- */
- LDLM_DEBUG(lock, "convert ACK with ibits %#llx\n",
- reply->lock_desc.l_policy_data.l_inodebits.bits);
- } else {
- ldlm_clear_converting(lock);
-
- /* Concurrent BL AST may arrive and cause another convert
- * or cancel so just do nothing here if bl_ast is set,
- * finish with convert otherwise.
- */
- if (!ldlm_is_bl_ast(lock)) {
- struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
-
- /* Drop cancel_bits since there are no more converts
- * and put lock into LRU if it is still not used and
- * is not there yet.
- */
- lock->l_policy_data.l_inodebits.cancel_bits = 0;
- if (!lock->l_readers && !lock->l_writers &&
- !ldlm_is_canceling(lock)) {
- spin_lock(&ns->ns_lock);
- /* there is check for list_empty() inside */
- ldlm_lock_remove_from_lru_nolock(lock);
- ldlm_lock_add_to_lru_nolock(lock);
- spin_unlock(&ns->ns_lock);
- }
- }
- }
- unlock_res_and_lock(lock);
-out:
- if (rc) {
- int flag;
-
- lock_res_and_lock(lock);
- if (ldlm_is_converting(lock)) {
- ldlm_clear_converting(lock);
- ldlm_set_cbpending(lock);
- ldlm_set_bl_ast(lock);
- lock->l_policy_data.l_inodebits.cancel_bits = 0;
- }
- unlock_res_and_lock(lock);
-
- /* fallback to normal lock cancel. If rc means there is no
- * valid lock on server, do only local cancel */
- if (rc == ELDLM_NO_LOCK_DATA)
- flag = LCF_LOCAL;
- else
- flag = LCF_ASYNC;
-
- rc = ldlm_cli_cancel(&aa->lock_handle, flag);
- if (rc < 0)
- LDLM_DEBUG(lock, "failed to cancel lock: rc = %d\n",
- rc);
- }
- LDLM_LOCK_PUT(lock);
- RETURN(rc);
-}
-
-/**
* Client-side IBITS lock convert.
*
* Inform server that lock has been converted instead of canceling.
* is made asynchronous.
*
*/
-int ldlm_cli_convert(struct ldlm_lock *lock, __u32 *flags)
+int ldlm_cli_convert_req(struct ldlm_lock *lock, __u32 *flags, __u64 new_bits)
{
struct ldlm_request *body;
struct ptlrpc_request *req;
- struct ldlm_async_args *aa;
struct obd_export *exp = lock->l_conn_export;
ENTRY;
- if (exp == NULL) {
- LDLM_ERROR(lock, "convert must not be called on local locks.");
- RETURN(-EINVAL);
- }
+ LASSERT(exp != NULL);
/* this is better to check earlier and it is done so already,
* but this check is kept too as final one to issue an error
body->lock_desc.l_req_mode = lock->l_req_mode;
body->lock_desc.l_granted_mode = lock->l_granted_mode;
- body->lock_desc.l_policy_data.l_inodebits.bits =
- lock->l_policy_data.l_inodebits.bits;
+ body->lock_desc.l_policy_data.l_inodebits.bits = new_bits;
body->lock_desc.l_policy_data.l_inodebits.cancel_bits = 0;
body->lock_flags = ldlm_flags_to_wire(*flags);
lprocfs_counter_incr(exp->exp_obd->obd_svc_stats,
LDLM_CONVERT - LDLM_FIRST_OPC);
- aa = ptlrpc_req_async_args(req);
- ldlm_lock2handle(lock, &aa->lock_handle);
- req->rq_interpret_reply = (ptlrpc_interpterer_t)lock_convert_interpret;
-
ptlrpcd_add_req(req);
RETURN(0);
}
RETURN(0);
}
+int ldlm_cli_convert(struct ldlm_lock *lock,
+ enum ldlm_cancel_flags cancel_flags)
+{
+ int rc = -EINVAL;
+
+ LASSERT(!lock->l_readers && !lock->l_writers);
+ LDLM_DEBUG(lock, "client lock convert START");
+
+ if (lock->l_resource->lr_type == LDLM_IBITS) {
+ lock_res_and_lock(lock);
+ do {
+ rc = ldlm_cli_inodebits_convert(lock, cancel_flags);
+ } while (rc == -EAGAIN);
+ unlock_res_and_lock(lock);
+ }
+
+ LDLM_DEBUG(lock, "client lock convert END");
+ RETURN(rc);
+}
+EXPORT_SYMBOL(ldlm_cli_convert);
+
/**
* Client side lock cancel.
*
RETURN(0);
}
- /* Convert lock bits instead of cancel for IBITS locks */
- if (cancel_flags & LCF_CONVERT) {
- LASSERT(lock->l_resource->lr_type == LDLM_IBITS);
- LASSERT(lock->l_policy_data.l_inodebits.cancel_bits != 0);
-
- rc = ldlm_cli_dropbits(lock,
- lock->l_policy_data.l_inodebits.cancel_bits);
- if (rc == 0) {
- LDLM_LOCK_RELEASE(lock);
- RETURN(0);
- }
- }
-
lock_res_and_lock(lock);
+ LASSERT(!ldlm_is_converting(lock));
+
/* Lock is being canceled and the caller doesn't want to wait */
if (ldlm_is_canceling(lock)) {
if (cancel_flags & LCF_ASYNC) {
RETURN(0);
}
- /* Lock is being converted, cancel it immediately.
- * When convert will end, it releases lock and it will be gone.
- */
- if (ldlm_is_converting(lock)) {
- /* set back flags removed by convert */
- ldlm_set_cbpending(lock);
- ldlm_set_bl_ast(lock);
- }
-
ldlm_set_canceling(lock);
unlock_res_and_lock(lock);
/* No locks which got blocking requests. */
LASSERT(!ldlm_is_bl_ast(lock));
- if (!ldlm_is_canceling(lock) &&
- !ldlm_is_converting(lock))
+ if (!ldlm_is_canceling(lock))
break;
/* Somebody is already doing CANCEL. No need for this
lock_res_and_lock(lock);
/* Check flags again under the lock. */
- if (ldlm_is_canceling(lock) || ldlm_is_converting(lock) ||
+ if (ldlm_is_canceling(lock) ||
ldlm_lock_remove_from_lru_check(lock, last_use) == 0) {
/* Another thread is removing lock from LRU, or
* somebody is already doing CANCEL, or there
if (lock->l_readers || lock->l_writers)
continue;
- /* If somebody is already doing CANCEL, or blocking AST came,
- * or lock is being converted then skip this lock. */
- if (ldlm_is_bl_ast(lock) || ldlm_is_canceling(lock) ||
- ldlm_is_converting(lock))
+ /*
+ * If somebody is already doing CANCEL, or blocking AST came
+ * then skip this lock.
+ */
+ if (ldlm_is_bl_ast(lock) || ldlm_is_canceling(lock))
continue;
if (lockmode_compat(lock->l_granted_mode, mode))
/* See CBPENDING comment in ldlm_cancel_lru */
lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_CANCELING |
lock_flags;
-
LASSERT(list_empty(&lock->l_bl_ast));
list_add(&lock->l_bl_ast, cancels);
LDLM_LOCK_GET(lock);
return !!(bits);
}
-int ll_md_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
+int ll_md_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *ld,
void *data, int flag)
{
struct lustre_handle lockh;
- __u64 bits = lock->l_policy_data.l_inodebits.bits;
int rc;
ENTRY;
{
__u64 cancel_flags = LCF_ASYNC;
- if (ll_md_need_convert(lock)) {
- cancel_flags |= LCF_CONVERT;
- /* For lock convert some cancel actions may require
- * this lock with non-dropped canceled bits, e.g. page
- * flush for DOM lock. So call ll_lock_cancel_bits()
- * here while canceled bits are still set.
- */
- bits = lock->l_policy_data.l_inodebits.cancel_bits;
- if (bits & MDS_INODELOCK_DOM)
- ll_lock_cancel_bits(lock, MDS_INODELOCK_DOM);
+ /* if lock convert is not needed then still have to
+ * pass lock via ldlm_cli_convert() to keep all states
+ * correct, set cancel_bits to full lock bits to cause
+ * full cancel to happen.
+ */
+ if (!ll_md_need_convert(lock)) {
+ lock_res_and_lock(lock);
+ lock->l_policy_data.l_inodebits.cancel_bits =
+ lock->l_policy_data.l_inodebits.bits;
+ unlock_res_and_lock(lock);
}
+ rc = ldlm_cli_convert(lock, cancel_flags);
+ if (!rc)
+ RETURN(0);
+ /* continue with cancel otherwise */
ldlm_lock2handle(lock, &lockh);
rc = ldlm_cli_cancel(&lockh, cancel_flags);
if (rc < 0) {
break;
}
case LDLM_CB_CANCELING:
+ {
+ __u64 to_cancel = lock->l_policy_data.l_inodebits.bits;
+
/* Nothing to do for non-granted locks */
if (!ldlm_is_granted(lock))
break;
- if (ldlm_is_converting(lock)) {
- /* this is called on already converted lock, so
- * ibits has remained bits only and cancel_bits
- * are bits that were dropped.
- * Note that DOM lock is handled prior lock convert
- * and is excluded here.
+ /* If 'ld' is supplied then bits to be cancelled are passed
+ * implicitly by lock converting and cancel_bits from 'ld'
+ * should be used. Otherwise full cancel is being performed
+ * and lock inodebits are used.
+ *
+ * Note: we cannot rely on cancel_bits in lock itself at this
+ * moment because they can be changed by concurrent thread,
+ * so ldlm_cli_inodebits_convert() pass cancel bits implicitly
+ * in 'ld' parameter.
+ */
+ if (ld) {
+ /* partial bits cancel allowed only during convert */
+ LASSERT(ldlm_is_converting(lock));
+ /* mask cancel bits by lock bits so only no any unused
+ * bits are passed to ll_lock_cancel_bits()
*/
- bits = lock->l_policy_data.l_inodebits.cancel_bits &
- ~MDS_INODELOCK_DOM;
- } else {
- LASSERT(ldlm_is_canceling(lock));
+ to_cancel &= ld->l_policy_data.l_inodebits.cancel_bits;
}
- ll_lock_cancel_bits(lock, bits);
+ ll_lock_cancel_bits(lock, to_cancel);
break;
+ }
default:
LBUG();
}