* Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2010, 2013, Intel Corporation.
+ * Copyright (c) 2010, 2014, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
*/
#define DEBUG_SUBSYSTEM S_LDLM
-#ifndef __KERNEL__
-#include <signal.h>
-#include <liblustre.h>
-#endif
#include <lustre_dlm.h>
#include <obd_class.h>
#include "ldlm_internal.h"
-int ldlm_enqueue_min = OBD_TIMEOUT_DEFAULT;
-CFS_MODULE_PARM(ldlm_enqueue_min, "i", int, 0644,
+unsigned int ldlm_enqueue_min = OBD_TIMEOUT_DEFAULT;
+CFS_MODULE_PARM(ldlm_enqueue_min, "i", uint, 0644,
"lock enqueue timeout minimum");
/* in client side, whether the cached locks will be canceled before replay */
RETURN(0);
}
-EXPORT_SYMBOL(ldlm_expired_completion_wait);
+
+/**
+ * Calculate the Completion timeout (covering enqueue, BL AST, data flush,
+ * lock cancel, and their replies). Used for lock completion timeout on the
+ * client side.
+ *
+ * \param[in] lock lock which is waiting the completion callback
+ *
+ * \retval timeout in seconds to wait for the server reply
+ */
/* We use the same basis for both server side and client side functions
from a single node. */
-int ldlm_get_enq_timeout(struct ldlm_lock *lock)
+static unsigned int ldlm_cp_timeout(struct ldlm_lock *lock)
{
- int timeout = at_get(ldlm_lock_to_ns_at(lock));
- if (AT_OFF)
- return obd_timeout / 2;
- /* Since these are non-updating timeouts, we should be conservative.
- It would be nice to have some kind of "early reply" mechanism for
- lock callbacks too... */
- timeout = min_t(int, at_max, timeout + (timeout >> 1)); /* 150% */
- return max(timeout, ldlm_enqueue_min);
+ unsigned int timeout;
+
+ if (AT_OFF)
+ return obd_timeout;
+
+ /* Wait a long time for enqueue - server may have to callback a
+ * lock from another client. Server will evict the other client if it
+ * doesn't respond reasonably, and then give us the lock. */
+ timeout = at_get(ldlm_lock_to_ns_at(lock));
+ return max(3 * timeout, ldlm_enqueue_min);
}
-EXPORT_SYMBOL(ldlm_get_enq_timeout);
/**
* Helper function for ldlm_completion_ast(), updating timings when lock is
* actually granted.
*/
-static int ldlm_completion_tail(struct ldlm_lock *lock)
+static int ldlm_completion_tail(struct ldlm_lock *lock, void *data)
{
long delay;
- int result;
+ int result = 0;
if (ldlm_is_destroyed(lock) || ldlm_is_failed(lock)) {
LDLM_DEBUG(lock, "client-side enqueue: destroyed");
result = -EIO;
+ } else if (data == NULL) {
+ LDLM_DEBUG(lock, "client-side enqueue: granted");
} else {
+ /* Take into AT only CP RPC, not immediately granted locks */
delay = cfs_time_sub(cfs_time_current_sec(),
lock->l_last_activity);
LDLM_DEBUG(lock, "client-side enqueue: granted after "
CFS_DURATION_T"s", delay);
/* Update our time estimate */
- at_measured(ldlm_lock_to_ns_at(lock),
- delay);
- result = 0;
+ at_measured(ldlm_lock_to_ns_at(lock), delay);
}
return result;
}
RETURN(0);
}
- if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
- LDLM_FL_BLOCK_CONV))) {
+ if (!(flags & LDLM_FL_BLOCKED_MASK)) {
wake_up(&lock->l_waitq);
- RETURN(ldlm_completion_tail(lock));
+ RETURN(ldlm_completion_tail(lock, data));
}
LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, "
goto noreproc;
}
- if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
- LDLM_FL_BLOCK_CONV))) {
+ if (!(flags & LDLM_FL_BLOCKED_MASK)) {
wake_up(&lock->l_waitq);
RETURN(0);
}
imp = obd->u.cli.cl_import;
}
- /* Wait a long time for enqueue - server may have to callback a
- lock from another client. Server will evict the other client if it
- doesn't respond reasonably, and then give us the lock. */
- timeout = ldlm_get_enq_timeout(lock) * 2;
+ timeout = ldlm_cp_timeout(lock);
- lwd.lwd_lock = lock;
+ lwd.lwd_lock = lock;
+ lock->l_last_activity = cfs_time_current_sec();
if (ldlm_is_no_timeout(lock)) {
LDLM_DEBUG(lock, "waiting indefinitely because of NO_TIMEOUT");
LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
rc);
RETURN(rc);
- }
+ }
- RETURN(ldlm_completion_tail(lock));
+ RETURN(ldlm_completion_tail(lock, data));
}
EXPORT_SYMBOL(ldlm_completion_ast);
{
return -ELDLM_NO_LOCK_DATA;
}
-EXPORT_SYMBOL(ldlm_glimpse_ast);
/**
* Enqueue a local lock (typically on a server).
struct ldlm_lock *lock;
struct ldlm_reply *reply;
int cleanup_phase = 1;
- int size = 0;
ENTRY;
lock = ldlm_handle2lock(lockh);
if (reply == NULL)
GOTO(cleanup, rc = -EPROTO);
- if (lvb_len != 0) {
- LASSERT(lvb != NULL);
+ if (lvb_len > 0) {
+ int size = 0;
size = req_capsule_get_size(&req->rq_pill, &RMF_DLM_LVB,
RCL_SERVER);
lvb_len, size);
GOTO(cleanup, rc = -EINVAL);
}
+ lvb_len = size;
}
if (rc == ELDLM_LOCK_ABORTED) {
- if (lvb_len != 0)
+ if (lvb_len > 0 && lvb != NULL)
rc = ldlm_fill_lvb(lock, &req->rq_pill, RCL_SERVER,
- lvb, size);
- GOTO(cleanup, rc = (rc != 0 ? rc : ELDLM_LOCK_ABORTED));
+ lvb, lvb_len);
+ GOTO(cleanup, rc = rc ? : ELDLM_LOCK_ABORTED);
}
/* lock enqueued on the server */
LDLM_DEBUG(lock,"client-side enqueue, new policy data");
}
- if ((*flags) & LDLM_FL_AST_SENT ||
- /* Cancel extent locks as soon as possible on a liblustre client,
- * because it cannot handle asynchronous ASTs robustly (see
- * bug 7311). */
- (LIBLUSTRE_CLIENT && type == LDLM_EXTENT)) {
+ if ((*flags) & LDLM_FL_AST_SENT) {
lock_res_and_lock(lock);
lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST;
unlock_res_and_lock(lock);
/* If the lock has already been granted by a completion AST, don't
* clobber the LVB with an older one. */
- if (lvb_len != 0) {
+ if (lvb_len > 0) {
/* We must lock or a racing completion might update lvb without
* letting us know and we'll clobber the correct value.
* Cannot unlock after the check either, a that still leaves
lock_res_and_lock(lock);
if (lock->l_req_mode != lock->l_granted_mode)
rc = ldlm_fill_lvb(lock, &req->rq_pill, RCL_SERVER,
- lock->l_lvb_data, size);
+ lock->l_lvb_data, lvb_len);
unlock_res_and_lock(lock);
if (rc < 0) {
cleanup_phase = 1;
}
}
- if (lvb_len && lvb != NULL) {
- /* Copy the LVB here, and not earlier, because the completion
- * AST (if any) can override what we got in the reply */
- memcpy(lvb, lock->l_lvb_data, lvb_len);
- }
+ if (lvb_len > 0 && lvb != NULL) {
+ /* Copy the LVB here, and not earlier, because the completion
+ * AST (if any) can override what we got in the reply */
+ memcpy(lvb, lock->l_lvb_data, lvb_len);
+ }
LDLM_DEBUG(lock, "client-side enqueue END");
EXIT;
enum req_location loc,
int off)
{
- int size = req_capsule_msg_size(pill, loc);
- return ldlm_req_handles_avail(size, off);
+ __u32 size = req_capsule_msg_size(pill, loc);
+ return ldlm_req_handles_avail(size, off);
}
static inline int ldlm_format_handles_avail(struct obd_import *imp,
const struct req_format *fmt,
enum req_location loc, int off)
{
- int size = req_capsule_fmt_size(imp->imp_msg_magic, fmt, loc);
- return ldlm_req_handles_avail(size, off);
+ __u32 size = req_capsule_fmt_size(imp->imp_msg_magic, fmt, loc);
+ return ldlm_req_handles_avail(size, off);
}
/**
dlm = req_capsule_client_get(pill, &RMF_DLM_REQ);
LASSERT(dlm);
/* Skip first lock handler in ldlm_request_pack(),
- * this method will incrment @lock_count according
+ * this method will increment @lock_count according
* to the lock handle amount actually written to
* the buffer. */
dlm->lock_count = canceloff;
lock->l_export = NULL;
lock->l_blocking_ast = einfo->ei_cb_bl;
lock->l_flags |= (*flags & (LDLM_FL_NO_LRU | LDLM_FL_EXCL));
+ lock->l_last_activity = cfs_time_current_sec();
/* lock not sent to server yet */
ptlrpc_request_set_replen(req);
}
- /*
- * Liblustre client doesn't get extent locks, except for O_APPEND case
- * where [0, OBD_OBJECT_EOF] lock is taken, or truncate, where
- * [i_size, OBD_OBJECT_EOF] lock is taken.
- */
- LASSERT(ergo(LIBLUSTRE_CLIENT, einfo->ei_type != LDLM_EXTENT ||
- policy->l_extent.end == OBD_OBJECT_EOF));
-
if (async) {
LASSERT(reqp != NULL);
RETURN(0);
ptlrpc_req_finished(req);
return rc;
}
-EXPORT_SYMBOL(ldlm_cli_convert);
/**
* Cancel locks locally.
out:
return sent ? sent : rc;
}
-EXPORT_SYMBOL(ldlm_cli_cancel_req);
static inline struct ldlm_pool *ldlm_imp2pl(struct obd_import *imp)
{
RETURN(0);
}
-EXPORT_SYMBOL(ldlm_cli_update_pool);
/**
* Client side lock cancel.
RETURN(count);
}
-EXPORT_SYMBOL(ldlm_cli_cancel_list_local);
/**
* Cancel as many locks as possible w/o sending any RPCs (e.g. to write back
if (count && added >= count)
return LDLM_POLICY_KEEP_LOCK;
+ /* Despite of the LV, It doesn't make sense to keep the lock which
+ * is unused for ns_max_age time. */
+ if (cfs_time_after(cfs_time_current(),
+ cfs_time_add(lock->l_last_used, ns->ns_max_age)))
+ return LDLM_POLICY_CANCEL_LOCK;
+
slv = ldlm_pool_get_slv(pl);
lvf = ldlm_pool_get_lvf(pl);
la = cfs_duration_sec(cfs_time_sub(cur,
if (slv == 0 || lv < slv)
return LDLM_POLICY_KEEP_LOCK;
- if (ns->ns_cancel != NULL && ns->ns_cancel(lock) == 0)
- return LDLM_POLICY_KEEP_LOCK;
-
return LDLM_POLICY_CANCEL_LOCK;
}
int unused, int added,
int count)
{
- if (added >= count)
- return LDLM_POLICY_KEEP_LOCK;
-
- if (cfs_time_before(cfs_time_current(),
+ if ((added >= count) &&
+ cfs_time_before(cfs_time_current(),
cfs_time_add(lock->l_last_used, ns->ns_max_age)))
- return LDLM_POLICY_KEEP_LOCK;
-
- if (ns->ns_cancel != NULL && ns->ns_cancel(lock) == 0)
- return LDLM_POLICY_KEEP_LOCK;
+ return LDLM_POLICY_KEEP_LOCK;
return LDLM_POLICY_CANCEL_LOCK;
}
int count, rc;
ENTRY;
-#ifndef __KERNEL__
- cancel_flags &= ~LCF_ASYNC; /* force to be sync in user space */
-#endif
/* Just prepare the list of locks, do not actually cancel them yet.
* Locks are cancelled later in a separate thread. */
count = ldlm_prepare_lru_list(ns, &cancels, nr, 0, flags);
RETURN(ELDLM_OK);
}
}
-EXPORT_SYMBOL(ldlm_cli_cancel_unused);
/* Lock iterators. */
unlock_res(res);
RETURN(rc);
}
-EXPORT_SYMBOL(ldlm_resource_foreach);
struct iter_helper_data {
ldlm_iterator_t iter;
ldlm_res_iter_helper, &helper);
}
-EXPORT_SYMBOL(ldlm_namespace_foreach);
/* non-blocking function to manipulate a lock whose cb_data is being put away.
* return 0: find no resource
RETURN(rc);
}
-EXPORT_SYMBOL(ldlm_replay_locks);