__u32 lwd_conn_cnt;
};
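+/* Async args stashed in a ptlrpc request; only the lock handle is
+ * stored so the interpret callback can re-lookup the lock safely
+ * once the reply arrives. */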
+struct ldlm_async_args {
+ struct lustre_handle lock_handle;
+};
+
int ldlm_expired_completion_wait(void *data)
{
struct lock_wait_data *lwd = data;
ENTRY;
LASSERT(!(*flags & LDLM_FL_REPLAY));
- if (unlikely(ns->ns_client)) {
+ if (unlikely(ns_is_client(ns))) {
CERROR("Trying to enqueue local lock in a shadow namespace\n");
LBUG();
}
lock = ldlm_handle2lock(lockh);
/* ldlm_cli_enqueue is holding a reference on this lock. */
- LASSERT(lock != NULL);
+ if (!lock) {
+ LASSERT(type == LDLM_FLOCK);
+ RETURN(-ENOLCK);
+ }
+
if (rc != ELDLM_OK) {
LASSERT(!is_replay);
LDLM_DEBUG(lock, "client-side enqueue END (%s)",
/* PAGE_SIZE-512 is to allow TCP/IP and LNET headers to fit into
* a single page on the send/receive side. XXX: 512 should be changed
* to a more adequate value. */
-#define ldlm_req_handles_avail(exp, size, bufcount, off) \
-({ \
- int _avail = min_t(int, LDLM_MAXREQSIZE, PAGE_SIZE - 512); \
- int _s = size[DLM_LOCKREQ_OFF]; \
- size[DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request); \
- _avail -= lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic, \
- bufcount, size); \
- _avail /= sizeof(struct lustre_handle); \
- _avail += LDLM_LOCKREQ_HANDLES - off; \
- size[DLM_LOCKREQ_OFF] = _s; \
- _avail; \
-})
+static inline int ldlm_req_handles_avail(struct obd_export *exp,
+ int *size, int bufcount, int off)
+{
+ int avail = min_t(int, LDLM_MAXREQSIZE, PAGE_SIZE - 512);
+ int old_size = size[DLM_LOCKREQ_OFF];
+
+ size[DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request);
+ avail -= lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic,
+ bufcount, size);
+ avail /= sizeof(struct lustre_handle);
+ avail += LDLM_LOCKREQ_HANDLES - off;
+ size[DLM_LOCKREQ_OFF] = old_size;
+
+ return avail;
+}
+
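+/* How many lock handles fit into a minimal LDLM_CANCEL request,
+ * i.e. one carrying only the ptlrpc_body and ldlm_request buffers. */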
+static inline int ldlm_cancel_handles_avail(struct obd_export *exp)
+{
+ int size[2] = { sizeof(struct ptlrpc_body),
+ sizeof(struct ldlm_request) };
+ return ldlm_req_handles_avail(exp, size, 2, 0);
+}
/* Cancel lru locks and pack them into the enqueue request. Pack there the given
- * @count locks in @cancel. */
+ * @count locks in @cancels. */
struct ptlrpc_request *ldlm_prep_enqueue_req(struct obd_export *exp,
int bufcount, int *size,
struct list_head *cancels,
* EARLY_CANCEL. Otherwise we have to send extra CANCEL
rpc right on enqueue, which will make it slower, vs.
* asynchronous rpc in blocking thread. */
- count += ldlm_cancel_lru_local(ns, cancels, 1, avail - count,
- LDLM_CANCEL_AGED);
+ count += ldlm_cancel_lru_local(ns, cancels,
+ exp_connect_lru_resize(exp) ? 0 : 1,
+ avail - count, LDLM_CANCEL_AGED);
size[DLM_LOCKREQ_OFF] =
ldlm_request_bufsize(count, LDLM_ENQUEUE);
}
* will increment @lock_count according to the number of lock handles
* actually written to the buffer. */
dlm->lock_count = LDLM_ENQUEUE_CANCEL_OFF;
- }
- if (req)
ldlm_cli_cancel_list(cancels, count, req, DLM_LOCKREQ_OFF, 0);
- else
+ } else {
ldlm_lock_list_put(cancels, l_bl_ast, count);
+ }
RETURN(req);
}
* request was created in ldlm_cli_enqueue and it is the async request,
* pass it to the caller in @reqp. */
int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
+ struct ldlm_enqueue_info *einfo,
const struct ldlm_res_id *res_id,
- ldlm_type_t type, ldlm_policy_data_t *policy,
- ldlm_mode_t mode, int *flags,
- ldlm_blocking_callback blocking,
- ldlm_completion_callback completion,
- ldlm_glimpse_callback glimpse,
- void *data, void *lvb, __u32 lvb_len, void *lvb_swabber,
+ ldlm_policy_data_t *policy, int *flags,
+ void *lvb, __u32 lvb_len, void *lvb_swabber,
struct lustre_handle *lockh, int async)
{
struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
[DLM_LOCKREQ_OFF] = sizeof(*body),
[DLM_REPLY_REC_OFF] = lvb_len };
int is_replay = *flags & LDLM_FL_REPLAY;
- int req_passed_in = 1, rc;
+ int req_passed_in = 1, rc, err;
struct ptlrpc_request *req;
ENTRY;
LDLM_DEBUG(lock, "client-side enqueue START");
LASSERT(exp == lock->l_conn_export);
} else {
- lock = ldlm_lock_create(ns, res_id, type, mode, blocking,
- completion, glimpse, data, lvb_len);
+ lock = ldlm_lock_create(ns, res_id, einfo->ei_type,
+ einfo->ei_mode, einfo->ei_cb_bl,
+ einfo->ei_cb_cp, einfo->ei_cb_gl,
+ einfo->ei_cbdata, lvb_len);
if (lock == NULL)
RETURN(-ENOMEM);
/* for the local lock, add the reference */
- ldlm_lock_addref_internal(lock, mode);
+ ldlm_lock_addref_internal(lock, einfo->ei_mode);
ldlm_lock2handle(lock, lockh);
lock->l_lvb_swabber = lvb_swabber;
if (policy != NULL) {
* descriptor (ldlm_lock2desc() below) but use an
* inodebits lock internally with both bits set.
*/
- if (type == LDLM_IBITS && !(exp->exp_connect_flags &
- OBD_CONNECT_IBITS))
+ if (einfo->ei_type == LDLM_IBITS &&
+ !(exp->exp_connect_flags & OBD_CONNECT_IBITS))
lock->l_policy_data.l_inodebits.bits =
MDS_INODELOCK_LOOKUP |
MDS_INODELOCK_UPDATE;
lock->l_policy_data = *policy;
}
- if (type == LDLM_EXTENT)
+ if (einfo->ei_type == LDLM_EXTENT)
lock->l_req_extent = policy->l_extent;
LDLM_DEBUG(lock, "client-side enqueue START");
}
if (reqp == NULL || *reqp == NULL) {
req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
if (req == NULL) {
- failed_lock_cleanup(ns, lock, lockh, mode);
+ failed_lock_cleanup(ns, lock, lockh, einfo->ei_mode);
LDLM_LOCK_PUT(lock);
RETURN(-ENOMEM);
}
lock->l_conn_export = exp;
lock->l_export = NULL;
- lock->l_blocking_ast = blocking;
+ lock->l_blocking_ast = einfo->ei_cb_bl;
/* Dump lock data into the request buffer */
body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body));
* where [0, OBD_OBJECT_EOF] lock is taken, or truncate, where
* [i_size, OBD_OBJECT_EOF] lock is taken.
*/
- LASSERT(ergo(LIBLUSTRE_CLIENT, type != LDLM_EXTENT ||
+ LASSERT(ergo(LIBLUSTRE_CLIENT, einfo->ei_type != LDLM_EXTENT ||
policy->l_extent.end == OBD_OBJECT_EOF));
if (async) {
LDLM_DEBUG(lock, "sending request");
rc = ptlrpc_queue_wait(req);
- rc = ldlm_cli_enqueue_fini(exp, req, type, policy ? 1 : 0,
- mode, flags, lvb, lvb_len, lvb_swabber,
- lockh, rc);
+ err = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, policy ? 1 : 0,
+ einfo->ei_mode, flags, lvb, lvb_len,
+ lvb_swabber, lockh, rc);
+
+ /* If ldlm_cli_enqueue_fini did not find the lock, drop the
+ * reference that we took. */
+ if (err == -ENOLCK)
+ LDLM_LOCK_PUT(lock);
+ else
+ rc = err;
if (!req_passed_in && req != NULL) {
ptlrpc_req_finished(req);
struct ldlm_resource *res;
int rc;
ENTRY;
- if (lock->l_resource->lr_namespace->ns_client) {
+ if (ns_is_client(lock->l_resource->lr_namespace)) {
CERROR("Trying to cancel local lock\n");
LBUG();
}
}
/* Cancel locks locally.
- * Returns: 1 if there is a need to send a cancel RPC to server. 0 otherwise. */
+ * Returns:
+ * LDLM_FL_LOCAL_ONLY if there is no need to send a CANCEL rpc to the server;
+ * LDLM_FL_CANCELING otherwise;
+ * LDLM_FL_BL_AST if a separate CANCEL rpc is needed. */
static int ldlm_cli_cancel_local(struct ldlm_lock *lock)
{
- int rc = 0;
+ int rc = LDLM_FL_LOCAL_ONLY;
ENTRY;
if (lock->l_conn_export) {
local_only = (lock->l_flags &
(LDLM_FL_LOCAL_ONLY|LDLM_FL_CANCEL_ON_BLOCK));
ldlm_cancel_callback(lock);
+ rc = (lock->l_flags & LDLM_FL_BL_AST) ?
+ LDLM_FL_BL_AST : LDLM_FL_CANCELING;
unlock_res_and_lock(lock);
- if (local_only)
- CDEBUG(D_INFO, "not sending request (at caller's "
+ if (local_only) {
+ CDEBUG(D_DLMTRACE, "not sending request (at caller's "
"instruction)\n");
- else
- rc = 1;
-
+ rc = LDLM_FL_LOCAL_ONLY;
+ }
ldlm_lock_cancel(lock);
} else {
- if (lock->l_resource->lr_namespace->ns_client) {
+ if (ns_is_client(lock->l_resource->lr_namespace)) {
LDLM_ERROR(lock, "Trying to cancel local lock");
LBUG();
}
{
struct ldlm_request *dlm;
struct ldlm_lock *lock;
- int max;
+ int max, packed = 0;
ENTRY;
dlm = lustre_msg_buf(req->rq_reqmsg, off, sizeof(*dlm));
list_for_each_entry(lock, head, l_bl_ast) {
if (!count--)
break;
- /* Pack the lock handle to the given request buffer. */
LASSERT(lock->l_conn_export);
- /* Cannot be set on a lock in a resource granted list.*/
- LASSERT(!(lock->l_flags &
- (LDLM_FL_LOCAL_ONLY|LDLM_FL_CANCEL_ON_BLOCK)));
- /* If @lock is marked CANCEL_ON_BLOCK, cancel
- * will not be sent in ldlm_cli_cancel(). It
- * is used for liblustre clients, no cancel on
- * block requests. However, even for liblustre
- * clients, when the flag is set, batched cancel
- * should be sent (what if no block rpc has
- * come). To not send another separated rpc in
- * this case, the caller pass CANCEL_ON_BLOCK
- * flag to ldlm_cli_cancel_unused_resource(). */
+ /* Pack the lock handle to the given request buffer. */
+ LDLM_DEBUG(lock, "packing");
dlm->lock_handle[dlm->lock_count++] = lock->l_remote_handle;
+ packed++;
}
+ CDEBUG(D_DLMTRACE, "%d locks packed\n", packed);
EXIT;
}
LASSERT(exp != NULL);
LASSERT(count > 0);
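+ /* Fault-injection point: behave as if all @count handles were
+ * sent for cancel without issuing the CANCEL RPC. */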
+ if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_RACE))
+ RETURN(count);
+
free = ldlm_req_handles_avail(exp, size, 2, 0);
if (count > free)
count = free;
while (1) {
imp = class_exp2cliimp(exp);
if (imp == NULL || imp->imp_invalid) {
- CDEBUG(D_HA, "skipping cancel on invalid import %p\n",
- imp);
- break;
+ CDEBUG(D_DLMTRACE,
+ "skipping cancel on invalid import %p\n", imp);
+ RETURN(count);
}
req = ptlrpc_prep_req(imp, LUSTRE_DLM_VERSION, LDLM_CANCEL, 2,
GOTO(out, rc = -ENOMEM);
req->rq_no_resend = 1;
+ req->rq_no_delay = 1;
/* XXX FIXME bug 249 */
req->rq_request_portal = LDLM_CANCEL_REQUEST_PORTAL;
"out of sync -- not fatal\n",
libcfs_nid2str(req->rq_import->
imp_connection->c_peer.nid));
- } else if (rc == -ETIMEDOUT) {
+ rc = 0;
+ } else if (rc == -ETIMEDOUT && /* check there was no reconnect */
+ req->rq_import_generation == imp->imp_generation) {
ptlrpc_req_finished(req);
continue;
} else if (rc != ELDLM_OK) {
return sent ? sent : rc;
}
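+/* Map an import to the LDLM pool of its obd's namespace. */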
+static inline struct ldlm_pool *ldlm_imp2pl(struct obd_import *imp)
+{
+ LASSERT(imp != NULL);
+ return &imp->imp_obd->obd_namespace->ns_pool;
+}
+
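+/* Update the client-side pool with the server lock volume (SLV) and
+ * lock limit carried in the RPC reply; a no-op unless LRU resize was
+ * negotiated on this connection. */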
+int ldlm_cli_update_pool(struct ptlrpc_request *req)
+{
+ struct ldlm_pool *pl;
+ ENTRY;
+
+ if (!imp_connect_lru_resize(req->rq_import))
+ RETURN(0);
+
+ if (lustre_msg_get_slv(req->rq_repmsg) == 0 ||
+ lustre_msg_get_limit(req->rq_repmsg) == 0)
+ RETURN(0);
+
+ pl = ldlm_imp2pl(req->rq_import);
+
+ spin_lock(&pl->pl_lock);
+#ifdef __KERNEL__
+ {
+ __u64 old_slv, fast_slv_change;
+
+ old_slv = ldlm_pool_get_slv(pl);
+ fast_slv_change = old_slv * LDLM_POOLS_FAST_SLV_CHANGE;
+ do_div(fast_slv_change, 100);
+#endif
+ pl->pl_update_time = cfs_time_current();
+ ldlm_pool_set_slv(pl, lustre_msg_get_slv(req->rq_repmsg));
+ ldlm_pool_set_limit(pl, lustre_msg_get_limit(req->rq_repmsg));
+#ifdef __KERNEL__
+ /* Wake up the pools thread only if SLV has changed by more than
+ * 5% since the last update. In this case we want to react asap.
+ * Otherwise there is no point in waking up the pools thread, as
+ * they are re-calculated every 1s anyway. */
+ if (old_slv > ldlm_pool_get_slv(pl) &&
+ old_slv - ldlm_pool_get_slv(pl) > fast_slv_change)
+ ldlm_pools_wakeup();
+ }
+#endif
+ spin_unlock(&pl->pl_lock);
+
+ RETURN(0);
+}
+EXPORT_SYMBOL(ldlm_cli_update_pool);
+
int ldlm_cli_cancel(struct lustre_handle *lockh)
{
struct ldlm_lock *lock;
- CFS_LIST_HEAD(head);
+ CFS_LIST_HEAD(cancels);
int rc = 0;
ENTRY;
/* concurrent cancels on the same handle can happen */
lock = __ldlm_handle2lock(lockh, LDLM_FL_CANCELING);
- if (lock == NULL)
+ if (lock == NULL) {
+ LDLM_DEBUG_NOLOCK("lock is already being destroyed\n");
RETURN(0);
-
+ }
+
rc = ldlm_cli_cancel_local(lock);
- if (rc <= 0)
- GOTO(out, rc);
-
- list_add(&lock->l_bl_ast, &head);
- rc = ldlm_cli_cancel_req(lock->l_conn_export, &head, 1, 0);
- EXIT;
-out:
- LDLM_LOCK_PUT(lock);
- return rc < 0 ? rc : 0;
+ list_add(&lock->l_bl_ast, &cancels);
+
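+ /* LDLM_FL_BL_AST means this lock needs its own CANCEL RPC right
+ * away; otherwise batch aged LRU locks into the same RPC so one
+ * request carries as many handles as fit. */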
+ if (rc == LDLM_FL_BL_AST) {
+ rc = ldlm_cli_cancel_req(lock->l_conn_export, &cancels, 1, 0);
+ } else if (rc == LDLM_FL_CANCELING) {
+ int avail = ldlm_cancel_handles_avail(lock->l_conn_export);
+ int count = 1;
+ LASSERT(avail > 0);
+ count += ldlm_cancel_lru_local(lock->l_resource->lr_namespace,
+ &cancels, 0, avail - 1,
+ LDLM_CANCEL_AGED);
+ ldlm_cli_cancel_list(&cancels, count, NULL, 0, 0);
+ }
+ if (rc != LDLM_FL_CANCELING)
+ LDLM_LOCK_PUT(lock);
+ RETURN(rc < 0 ? rc : 0);
}
/* - Free space in lru for @count new locks,
int count, int max, int flags)
{
cfs_time_t cur = cfs_time_current();
+ int rc, added = 0, left, unused;
struct ldlm_lock *lock, *next;
- int rc, added = 0, left;
+ __u64 slv, lvf, lv;
ENTRY;
spin_lock(&ns->ns_unused_lock);
- count += ns->ns_nr_unused - ns->ns_max_unused;
+ unused = ns->ns_nr_unused;
+
+ if (!ns_connect_lru_resize(ns))
+ count += unused - ns->ns_max_unused;
+
while (!list_empty(&ns->ns_unused_list)) {
- struct list_head *tmp = ns->ns_unused_list.next;
- lock = list_entry(tmp, struct ldlm_lock, l_lru);
+ struct ldlm_pool *pl = &ns->ns_pool;
+
+ LASSERT(unused >= 0);
if (max && added >= max)
break;
- if ((added >= count) &&
- (!(flags & LDLM_CANCEL_AGED) ||
- cfs_time_before_64(cur, ns->ns_max_age +
- lock->l_last_used)))
+ list_for_each_entry(lock, &ns->ns_unused_list, l_lru) {
+ /* Skip locks on which somebody is already doing CANCEL, or for
+ * which a blocking request will send the cancel itself. */
+ if (!(lock->l_flags & LDLM_FL_CANCELING) &&
+ !(lock->l_flags & LDLM_FL_BL_AST))
+ break;
+ }
+ if (&lock->l_lru == &ns->ns_unused_list)
break;
+ if (ns_connect_lru_resize(ns)) {
+ cfs_time_t la;
+
+ /* Take into account SLV only if count == 0. */
+ if (count == 0) {
+ /* Calculate lv for every lock. */
+ spin_lock(&pl->pl_lock);
+ slv = ldlm_pool_get_slv(pl);
+ lvf = atomic_read(&pl->pl_lock_volume_factor);
+ spin_unlock(&pl->pl_lock);
+
+ la = cfs_duration_sec(cfs_time_sub(cur,
+ lock->l_last_used));
+ if (la == 0)
+ la = 1;
+
+ /* Stop when the SLV has not yet arrived from the server,
+ * or when lv is smaller than the SLV. */
+ lv = lvf * la * unused;
+ if (slv == 1 || lv < slv)
+ break;
+ } else {
+ if (added >= count)
+ break;
+ }
+ } else {
+ if ((added >= count) &&
+ (!(flags & LDLM_CANCEL_AGED) ||
+ cfs_time_before_64(cur, ns->ns_max_age +
+ lock->l_last_used)))
+ break;
+ }
+
LDLM_LOCK_GET(lock); /* dropped by bl thread */
spin_unlock(&ns->ns_unused_lock);
lock_res_and_lock(lock);
- if ((ldlm_lock_remove_from_lru(lock) == 0) ||
- (lock->l_flags & LDLM_FL_CANCELING)) {
+ /* Check flags again under the lock. */
+ if ((lock->l_flags & LDLM_FL_CANCELING) ||
+ (lock->l_flags & LDLM_FL_BL_AST) ||
+ (ldlm_lock_remove_from_lru(lock) == 0)) {
/* other thread is removing lock from lru or
- * somebody is already doing CANCEL. */
+ * somebody is already doing CANCEL or
+ * there is a blocking request which will send
+ * the cancel by itself. */
unlock_res_and_lock(lock);
LDLM_LOCK_PUT(lock);
spin_lock(&ns->ns_unused_lock);
unlock_res_and_lock(lock);
spin_lock(&ns->ns_unused_lock);
added++;
+ unused--;
}
spin_unlock(&ns->ns_unused_lock);
list_for_each_entry_safe(lock, next, cancels, l_bl_ast) {
if (left-- == 0)
break;
+
rc = ldlm_cli_cancel_local(lock);
- if (rc == 0) {
+ if (rc == LDLM_FL_BL_AST) {
+ CFS_LIST_HEAD(head);
+
+ LDLM_DEBUG(lock, "Cancel lock separately");
+ list_del_init(&lock->l_bl_ast);
+ list_add(&lock->l_bl_ast, &head);
+ ldlm_cli_cancel_req(lock->l_conn_export, &head, 1, 0);
+ rc = LDLM_FL_LOCAL_ONLY;
+ }
+ if (rc == LDLM_FL_LOCAL_ONLY) {
/* CANCEL RPC should not be sent to server. */
list_del_init(&lock->l_bl_ast);
LDLM_LOCK_PUT(lock);
added--;
}
+
}
RETURN(added);
}
* in a thread and this function will return after the thread has been
* asked to call the callback. When called with LDLM_SYNC the blocking
* callback will be performed in this function. */
-int ldlm_cancel_lru(struct ldlm_namespace *ns, ldlm_sync_t sync)
+int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr, ldlm_sync_t sync)
{
CFS_LIST_HEAD(cancels);
int count, rc;
#ifndef __KERNEL__
sync = LDLM_SYNC; /* force to be sync in user space */
#endif
- count = ldlm_cancel_lru_local(ns, &cancels, 0, 0, 0);
+ count = ldlm_cancel_lru_local(ns, &cancels, nr, 0, 0);
if (sync == LDLM_ASYNC) {
- struct ldlm_lock *lock, *next;
- list_for_each_entry_safe(lock, next, &cancels, l_bl_ast) {
- /* Remove from the list to allow blocking thread to
- * re-use l_bl_ast. */
- list_del_init(&lock->l_bl_ast);
- rc = ldlm_bl_to_thread(ns, NULL, lock,
- LDLM_FL_CANCELING);
- if (rc)
- list_add_tail(&lock->l_bl_ast, &next->l_bl_ast);
- }
+ rc = ldlm_bl_to_thread_list(ns, NULL, &cancels, count);
+ if (rc == 0)
+ RETURN(count);
}
- /* If some locks are left in the list in ASYNC mode, or
+ /* If an error occurred in ASYNC mode, or
* this is SYNC mode, cancel the list. */
- ldlm_cli_cancel_list(&cancels, count, NULL, DLM_LOCKREQ_OFF, 0);
- RETURN(0);
+ ldlm_cli_cancel_list(&cancels, count, NULL, 0, 0);
+ RETURN(count);
}
/* Find and cancel locally unused locks found on resource, matched to the
continue;
}
+ /* If somebody is already doing CANCEL, or a blocking AST came,
+ * skip this lock. */
+ if (lock->l_flags & LDLM_FL_BL_AST ||
+ lock->l_flags & LDLM_FL_CANCELING)
+ continue;
+
if (lockmode_compat(lock->l_granted_mode, mode))
continue;
policy->l_inodebits.bits))
continue;
- /* If somebody is already doing CANCEL, skip it. */
- if (lock->l_flags & LDLM_FL_CANCELING)
- continue;
-
/* See CBPENDING comment in ldlm_cancel_lru */
lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_CANCELING |
- lock_flags;
+ lock_flags;
LASSERT(list_empty(&lock->l_bl_ast));
list_add(&lock->l_bl_ast, cancels);
/* Handle only @count inserted locks. */
left = count;
list_for_each_entry_safe(lock, next, cancels, l_bl_ast) {
- int rc = 0;
+ int rc = LDLM_FL_LOCAL_ONLY;
if (left-- == 0)
break;
else
rc = ldlm_cli_cancel_local(lock);
- if (rc == 0) {
+ if (rc == LDLM_FL_BL_AST) {
+ CFS_LIST_HEAD(head);
+
+ LDLM_DEBUG(lock, "Cancel lock separately");
+ list_del_init(&lock->l_bl_ast);
+ list_add(&lock->l_bl_ast, &head);
+ ldlm_cli_cancel_req(lock->l_conn_export, &head, 1, 0);
+ rc = LDLM_FL_LOCAL_ONLY;
+ }
+ if (rc == LDLM_FL_LOCAL_ONLY) {
/* CANCEL RPC should not be sent to server. */
list_del_init(&lock->l_bl_ast);
LDLM_LOCK_PUT(lock);
ldlm_cancel_pack(req, off, cancels, count);
else
res = ldlm_cli_cancel_req(lock->l_conn_export,
- cancels, count, flags);
+ cancels, count,
+ flags);
} else {
res = ldlm_cli_cancel_req(lock->l_conn_export,
cancels, 1, flags);
int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
const struct ldlm_res_id *res_id,
ldlm_policy_data_t *policy,
- int mode, int flags, void *opaque)
+ ldlm_mode_t mode, int flags, void *opaque)
{
struct ldlm_resource *res;
CFS_LIST_HEAD(cancels);
count = ldlm_cancel_resource_local(res, &cancels, policy, mode,
0, flags, opaque);
- rc = ldlm_cli_cancel_list(&cancels, count, NULL,
- DLM_LOCKREQ_OFF, flags);
+ rc = ldlm_cli_cancel_list(&cancels, count, NULL, 0, flags);
if (rc != ELDLM_OK)
CERROR("ldlm_cli_cancel_unused_resource: %d\n", rc);
int count = 0;
ENTRY;
- LASSERT(ns->ns_client == LDLM_NAMESPACE_CLIENT);
+ LASSERT(ns_is_client(ns));
res = ldlm_resource_get(ns, NULL, res_id, LDLM_EXTENT, 0);
if (res == NULL)
!lock->l_readers && !lock->l_writers &&
!(lock->l_flags & LDLM_FL_LOCAL) &&
!(lock->l_flags & LDLM_FL_CBPENDING)) {
- lock->l_last_used = cfs_time_current();
- spin_lock(&ns->ns_unused_lock);
- LASSERT(ns->ns_nr_unused >= 0);
- list_add_tail(&lock->l_lru, &ns->ns_unused_list);
- ns->ns_nr_unused++;
- spin_unlock(&ns->ns_unused_lock);
+ ldlm_lock_add_to_lru(lock);
lock->l_flags &= ~LDLM_FL_NO_LRU;
LDLM_DEBUG(lock, "join lock to lru");
count++;
}
static int replay_lock_interpret(struct ptlrpc_request *req,
- void * data, int rc)
+ struct ldlm_async_args *aa, int rc)
{
struct ldlm_lock *lock;
struct ldlm_reply *reply;
if (rc != ELDLM_OK)
GOTO(out, rc);
- lock = req->rq_async_args.pointer_arg[0];
- LASSERT(lock != NULL);
reply = lustre_swab_repbuf(req, DLM_LOCKREPLY_OFF, sizeof(*reply),
lustre_swab_ldlm_reply);
GOTO (out, rc = -EPROTO);
}
+ lock = ldlm_handle2lock(&aa->lock_handle);
+ if (!lock) {
+ CERROR("received replay ack for unknown local cookie "LPX64
+ " remote cookie "LPX64 " from server %s id %s\n",
+ aa->lock_handle.cookie, reply->lock_handle.cookie,
+ req->rq_export->exp_client_uuid.uuid,
+ libcfs_id2str(req->rq_peer));
+ GOTO(out, rc = -ESTALE);
+ }
+
lock->l_remote_handle = reply->lock_handle;
LDLM_DEBUG(lock, "replayed lock:");
ptlrpc_import_recovery_state_machine(req->rq_import);
- out:
+ LDLM_LOCK_PUT(lock);
+out:
if (rc != ELDLM_OK)
ptlrpc_connect_import(req->rq_import, NULL);
struct ptlrpc_request *req;
struct ldlm_request *body;
struct ldlm_reply *reply;
+ struct ldlm_async_args *aa;
int buffers = 2;
int size[3] = { sizeof(struct ptlrpc_body) };
int flags;
ENTRY;
+
+ /* Bug 11974: Do not replay a lock which is actively being canceled */
+ if (lock->l_flags & LDLM_FL_CANCELING) {
+ LDLM_DEBUG(lock, "Not replaying canceled lock:");
+ RETURN(0);
+ }
+
/* If this is reply-less callback lock, we cannot replay it, since
* server might have long dropped it, but notification of that event was
* lost by network. (and server granted conflicting lock already) */
LDLM_DEBUG(lock, "replaying lock:");
atomic_inc(&req->rq_import->imp_replay_inflight);
- req->rq_async_args.pointer_arg[0] = lock;
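+ /* Keep only the lock handle in the async args; the interpret
+ * callback re-looks the lock up by handle when the reply arrives. */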
+ CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
+ aa = (struct ldlm_async_args *)&req->rq_async_args;
+ aa->lock_handle = body->lock_handle[0];
req->rq_interpret_reply = replay_lock_interpret;
ptlrpcd_add_req(req);