#include <linux/kthread.h>
#include <linux/list.h>
#include <libcfs/libcfs.h>
+#include <libcfs/linux/linux-mem.h>
#include <lustre_errno.h>
#include <lustre_dlm.h>
#include <obd_class.h>
/*
* timeout for initial callback (AST) reply (bz10399)
* Due to having to send a 32 bit time value over the
- * wire return it as time_t instead of time64_t
+ * wire return it as timeout_t instead of time64_t
*/
-static inline time_t ldlm_get_rq_timeout(void)
+static inline timeout_t ldlm_get_rq_timeout(void)
{
/* Non-AT value */
- time_t timeout = min(ldlm_timeout, obd_timeout / 3);
+ timeout_t timeout = min(ldlm_timeout, obd_timeout / 3);
return timeout < 1 ? 1 : timeout;
}
static LIST_HEAD(expired_lock_list);
static int ldlm_lock_busy(struct ldlm_lock *lock);
-static int ldlm_add_waiting_lock(struct ldlm_lock *lock, time64_t timeout);
-static int __ldlm_add_waiting_lock(struct ldlm_lock *lock, time64_t seconds);
+static int ldlm_add_waiting_lock(struct ldlm_lock *lock, timeout_t timeout);
+static int __ldlm_add_waiting_lock(struct ldlm_lock *lock, timeout_t timeout);
static inline int have_expired_locks(void)
{
static int expired_lock_main(void *arg)
{
struct list_head *expired = &expired_lock_list;
- struct l_wait_info lwi = { 0 };
int do_dump;
ENTRY;
wake_up(&expired_lock_wait_queue);
while (1) {
- l_wait_event(expired_lock_wait_queue,
- have_expired_locks() ||
- expired_lock_thread_state == ELT_TERMINATE,
- &lwi);
+ wait_event_idle(expired_lock_wait_queue,
+ have_expired_locks() ||
+ expired_lock_thread_state == ELT_TERMINATE);
spin_lock_bh(&waiting_locks_spinlock);
if (expired_lock_dump) {
/* Check if we need to prolong timeout */
if (!OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT) &&
- lock->l_callback_timeout != 0 && /* not AST error */
+ lock->l_callback_timestamp != 0 && /* not AST error */
ldlm_lock_busy(lock)) {
LDLM_DEBUG(lock, "prolong the busy lock");
lock_res_and_lock(lock);
while (!list_empty(&waiting_locks_list)) {
lock = list_entry(waiting_locks_list.next, struct ldlm_lock,
l_pending_chain);
- if (lock->l_callback_timeout > ktime_get_seconds() ||
+ if (lock->l_callback_timestamp > ktime_get_seconds() ||
lock->l_req_mode == LCK_GROUP)
break;
* the waiting_locks_list and ldlm_add_waiting_lock()
* already grabbed a ref
*/
- list_del(&lock->l_pending_chain);
- list_add(&lock->l_pending_chain, &expired_lock_list);
+ list_move(&lock->l_pending_chain, &expired_lock_list);
need_dump = 1;
}
* left.
*/
if (!list_empty(&waiting_locks_list)) {
- unsigned long timeout_jiffies;
+ time64_t now = ktime_get_seconds();
+ timeout_t delta = 0;
lock = list_entry(waiting_locks_list.next, struct ldlm_lock,
l_pending_chain);
- timeout_jiffies = cfs_time_seconds(lock->l_callback_timeout);
- mod_timer(&waiting_locks_timer, timeout_jiffies);
+ if (lock->l_callback_timestamp - now > 0)
+ delta = lock->l_callback_timestamp - now;
+ mod_timer(&waiting_locks_timer,
+ jiffies + cfs_time_seconds(delta));
}
spin_unlock_bh(&waiting_locks_spinlock);
}
*
* Called with the namespace lock held.
*/
-static int __ldlm_add_waiting_lock(struct ldlm_lock *lock, time64_t seconds)
+static int __ldlm_add_waiting_lock(struct ldlm_lock *lock, timeout_t delay)
{
- unsigned long timeout_jiffies;
- time64_t timeout;
+ unsigned long timeout_jiffies = jiffies;
+ time64_t now = ktime_get_seconds();
+ time64_t deadline;
+ timeout_t timeout;
if (!list_empty(&lock->l_pending_chain))
return 0;
if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT) ||
OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT))
- seconds = 1;
+ delay = 1;
- timeout = ktime_get_seconds() + seconds;
- if (likely(timeout > lock->l_callback_timeout))
- lock->l_callback_timeout = timeout;
+ deadline = now + delay;
+ if (likely(deadline > lock->l_callback_timestamp))
+ lock->l_callback_timestamp = deadline;
- timeout_jiffies = cfs_time_seconds(lock->l_callback_timeout);
+ timeout = clamp_t(timeout_t, lock->l_callback_timestamp - now,
+ 0, delay);
+ timeout_jiffies += cfs_time_seconds(timeout);
if (time_before(timeout_jiffies, waiting_locks_timer.expires) ||
!timer_pending(&waiting_locks_timer))
obd_stale_export_adjust(lock->l_export);
}
-static int ldlm_add_waiting_lock(struct ldlm_lock *lock, time64_t timeout)
+static int ldlm_add_waiting_lock(struct ldlm_lock *lock, timeout_t timeout)
{
int ret;
if (ret)
ldlm_add_blocked_lock(lock);
- LDLM_DEBUG(lock, "%sadding to wait list(timeout: %lld, AT: %s)",
+ LDLM_DEBUG(lock, "%sadding to wait list(timeout: %d, AT: %s)",
ret == 0 ? "not re-" : "", timeout,
AT_OFF ? "off" : "on");
return ret;
/* No more, just cancel. */
del_timer(&waiting_locks_timer);
} else {
+ time64_t now = ktime_get_seconds();
struct ldlm_lock *next;
+ timeout_t delta = 0;
next = list_entry(list_next, struct ldlm_lock,
l_pending_chain);
+ if (next->l_callback_timestamp - now > 0)
+ delta = next->l_callback_timestamp - now;
+
mod_timer(&waiting_locks_timer,
- cfs_time_seconds(next->l_callback_timeout));
+ jiffies + cfs_time_seconds(delta));
}
}
list_del_init(&lock->l_pending_chain);
*
* Called with namespace lock held.
*/
-int ldlm_refresh_waiting_lock(struct ldlm_lock *lock, time64_t timeout)
+int ldlm_refresh_waiting_lock(struct ldlm_lock *lock, timeout_t timeout)
{
if (lock->l_export == NULL) {
/* We don't have a "waiting locks list" on clients. */
RETURN(0);
}
-int ldlm_refresh_waiting_lock(struct ldlm_lock *lock, time64_t timeout)
+int ldlm_refresh_waiting_lock(struct ldlm_lock *lock, timeout_t timeout)
{
RETURN(0);
}
*
* \retval timeout in seconds to wait for the client reply
*/
-time64_t ldlm_bl_timeout(struct ldlm_lock *lock)
+timeout_t ldlm_bl_timeout(struct ldlm_lock *lock)
{
- time64_t timeout;
+ timeout_t timeout;
if (AT_OFF)
return obd_timeout / 2;
* lock callbacks too...
*/
timeout = at_get(&lock->l_export->exp_bl_lock_at);
- return max(timeout + (timeout >> 1), (time64_t)ldlm_enqueue_min);
+ return max_t(timeout_t, timeout + (timeout >> 1),
+ (timeout_t)ldlm_enqueue_min);
}
EXPORT_SYMBOL(ldlm_bl_timeout);
* the lock to the expired list
*/
LDLM_LOCK_GET(lock);
- lock->l_callback_timeout = 0; /* differentiate it from expired locks */
+ /* differentiate it from expired locks */
+ lock->l_callback_timestamp = 0;
list_add(&lock->l_pending_chain, &expired_lock_list);
wake_up(&expired_lock_wait_queue);
spin_unlock_bh(&waiting_locks_spinlock);
if (req == NULL)
RETURN(-ENOMEM);
- CLASSERT(sizeof(*ca) <= sizeof(req->rq_async_args));
- ca = ptlrpc_req_async_args(req);
+ ca = ptlrpc_req_async_args(ca, req);
ca->ca_set_arg = arg;
ca->ca_lock = lock;
RETURN(rc);
}
- CLASSERT(sizeof(*ca) <= sizeof(req->rq_async_args));
- ca = ptlrpc_req_async_args(req);
+ ca = ptlrpc_req_async_args(ca, req);
ca->ca_set_arg = arg;
ca->ca_lock = lock;
body->lock_handle[0] = lock->l_remote_handle;
ldlm_lock2desc(lock, &body->lock_desc);
- CLASSERT(sizeof(*ca) <= sizeof(req->rq_async_args));
- ca = ptlrpc_req_async_args(req);
+ ca = ptlrpc_req_async_args(ca, req);
ca->ca_set_arg = arg;
ca->ca_lock = lock;
rc = ldlm_run_ast_work(ldlm_res_to_ns(res), gl_work_list,
LDLM_WORK_GL_AST);
if (rc == -ERESTART)
- ldlm_reprocess_all(res);
+ ldlm_reprocess_all(res, NULL);
RETURN(rc);
}
ENTRY;
- ca = ptlrpc_req_async_args(req);
+ ca = ptlrpc_req_async_args(ca, req);
lock = ca->ca_lock;
if (lock == NULL)
RETURN(ERR_PTR(-EFAULT));
} else {
if (ldlm_reclaim_full()) {
DEBUG_REQ(D_DLMTRACE, req,
- "Too many granted locks, reject current enqueue request and let the client retry later.\n");
+ "Too many granted locks, reject current enqueue request and let the client retry later");
GOTO(out, rc = -EINPROGRESS);
}
}
&lock->l_policy_data);
if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
lock->l_req_extent = lock->l_policy_data.l_extent;
+ else if (dlm_req->lock_desc.l_resource.lr_type == LDLM_IBITS)
+ lock->l_policy_data.l_inodebits.try_bits =
+ dlm_req->lock_desc.l_policy_data.l_inodebits.try_bits;
existing_lock:
- if (flags & LDLM_FL_HAS_INTENT) {
- /*
- * In this case, the reply buffer is allocated deep in
- * local_lock_enqueue by the policy function.
- */
- cookie = req;
- } else {
- /*
- * based on the assumption that lvb size never changes during
+ cookie = req;
+ if (!(flags & LDLM_FL_HAS_INTENT)) {
+ /* based on the assumption that lvb size never changes during
* resource life time otherwise it need resource->lr_lock's
- * protection
- */
+ * protection */
req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB,
RCL_SERVER, ldlm_lvbo_size(lock));
if (!err && !ldlm_is_cbpending(lock) &&
dlm_req->lock_desc.l_resource.lr_type != LDLM_FLOCK)
- ldlm_reprocess_all(lock->l_resource);
+ ldlm_reprocess_all(lock->l_resource, lock);
LDLM_LOCK_RELEASE(lock);
}
struct obd_export *exp = req->rq_export;
struct ldlm_reply *dlm_rep;
struct ldlm_lock *lock;
+ __u64 bits;
+ __u64 new_bits;
int rc;
ENTRY;
dlm_rep->lock_flags = dlm_req->lock_flags;
lock = ldlm_handle2lock(&dlm_req->lock_handle[0]);
- if (lock) {
- __u64 bits;
- __u64 new;
+ if (!lock) {
+ LDLM_DEBUG_NOLOCK("server lock is canceled already");
+ req->rq_status = ELDLM_NO_LOCK_DATA;
+ RETURN(0);
+ }
- bits = lock->l_policy_data.l_inodebits.bits;
- new = dlm_req->lock_desc.l_policy_data.l_inodebits.bits;
- LDLM_DEBUG(lock, "server-side convert handler START");
+ LDLM_DEBUG(lock, "server-side convert handler START");
- if (ldlm_is_cancel(lock)) {
- LDLM_ERROR(lock, "convert on canceled lock!");
- rc = ELDLM_NO_LOCK_DATA;
- } else if (dlm_req->lock_desc.l_req_mode !=
- lock->l_granted_mode) {
- LDLM_ERROR(lock, "lock mode differs!");
- rc = ELDLM_NO_LOCK_DATA;
- } else if (bits == new) {
- /*
- * This can be valid situation if CONVERT RPCs are
- * re-ordered. Just finish silently
- */
- LDLM_DEBUG(lock, "lock is converted already!");
- rc = ELDLM_OK;
- } else {
- lock_res_and_lock(lock);
- if (ldlm_is_waited(lock))
- ldlm_del_waiting_lock(lock);
+ lock_res_and_lock(lock);
+ bits = lock->l_policy_data.l_inodebits.bits;
+ new_bits = dlm_req->lock_desc.l_policy_data.l_inodebits.bits;
- ldlm_clear_cbpending(lock);
- lock->l_policy_data.l_inodebits.cancel_bits = 0;
- ldlm_inodebits_drop(lock, bits & ~new);
+ if (ldlm_is_cancel(lock)) {
+ LDLM_DEBUG(lock, "convert on canceled lock!");
+ unlock_res_and_lock(lock);
+ GOTO(out_put, rc = ELDLM_NO_LOCK_DATA);
+ }
- ldlm_clear_blocking_data(lock);
- unlock_res_and_lock(lock);
+ if (dlm_req->lock_desc.l_req_mode != lock->l_granted_mode) {
+ LDLM_ERROR(lock, "lock mode differs!");
+ unlock_res_and_lock(lock);
+ GOTO(out_put, rc = -EPROTO);
+ }
- ldlm_reprocess_all(lock->l_resource);
- rc = ELDLM_OK;
- }
+ if (bits == new_bits) {
+ /*
+ * This can be valid situation if CONVERT RPCs are
+ * re-ordered. Just finish silently
+ */
+ LDLM_DEBUG(lock, "lock is converted already!");
+ unlock_res_and_lock(lock);
+ } else {
+ if (ldlm_is_waited(lock))
+ ldlm_del_waiting_lock(lock);
- if (rc == ELDLM_OK) {
- dlm_rep->lock_handle = lock->l_remote_handle;
- ldlm_ibits_policy_local_to_wire(&lock->l_policy_data,
- &dlm_rep->lock_desc.l_policy_data);
- }
+ ldlm_clear_cbpending(lock);
+ lock->l_policy_data.l_inodebits.cancel_bits = 0;
+ ldlm_inodebits_drop(lock, bits & ~new_bits);
- LDLM_DEBUG(lock, "server-side convert handler END, rc = %d",
- rc);
- LDLM_LOCK_PUT(lock);
- } else {
- rc = ELDLM_NO_LOCK_DATA;
- LDLM_DEBUG_NOLOCK("server-side convert handler END, rc = %d",
- rc);
+ ldlm_clear_blocking_data(lock);
+ unlock_res_and_lock(lock);
+
+ ldlm_reprocess_all(lock->l_resource, NULL);
}
+ dlm_rep->lock_handle = lock->l_remote_handle;
+ ldlm_ibits_policy_local_to_wire(&lock->l_policy_data,
+ &dlm_rep->lock_desc.l_policy_data);
+ rc = ELDLM_OK;
+ EXIT;
+out_put:
+ LDLM_DEBUG(lock, "server-side convert handler END, rc = %d", rc);
+ LDLM_LOCK_PUT(lock);
req->rq_status = rc;
-
- RETURN(0);
+ return 0;
}
/**
struct ldlm_resource *res, *pres = NULL;
struct ldlm_lock *lock;
int i, count, done = 0;
+ unsigned int size;
ENTRY;
+ size = req_capsule_get_size(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT);
+ if (size <= offsetof(struct ldlm_request, lock_handle) ||
+ (size - offsetof(struct ldlm_request, lock_handle)) /
+ sizeof(struct lustre_handle) < dlm_req->lock_count)
+ RETURN(0);
+
count = dlm_req->lock_count ? dlm_req->lock_count : 1;
if (first >= count)
RETURN(0);
for (i = first; i < count; i++) {
lock = ldlm_handle2lock(&dlm_req->lock_handle[i]);
if (!lock) {
+ /* below message checked in replay-single.sh test_36 */
LDLM_DEBUG_NOLOCK("server-side cancel handler stale lock (cookie %llu)",
dlm_req->lock_handle[i].cookie);
continue;
*/
if (res != pres) {
if (pres != NULL) {
- ldlm_reprocess_all(pres);
+ ldlm_reprocess_all(pres, NULL);
LDLM_RESOURCE_DELREF(pres);
ldlm_resource_putref(pres);
}
if ((flags & LATF_STATS) && ldlm_is_ast_sent(lock) &&
lock->l_blast_sent != 0) {
- time64_t delay = ktime_get_real_seconds() -
- lock->l_blast_sent;
+ timeout_t delay = 0;
+
+ if (ktime_get_real_seconds() > lock->l_blast_sent)
+ delay = ktime_get_real_seconds() -
+ lock->l_blast_sent;
LDLM_DEBUG(lock,
- "server cancels blocked lock after %llds",
- (s64)delay);
+ "server cancels blocked lock after %ds",
+ delay);
at_measured(&lock->l_export->exp_bl_lock_at, delay);
}
ldlm_lock_cancel(lock);
LDLM_LOCK_PUT(lock);
}
if (pres != NULL) {
- ldlm_reprocess_all(pres);
+ ldlm_reprocess_all(pres, NULL);
LDLM_RESOURCE_DELREF(pres);
ldlm_resource_putref(pres);
}
RETURN(-EFAULT);
}
+ if (req_capsule_get_size(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT) <
+ offsetof(struct ldlm_request, lock_handle[1]))
+ RETURN(-EPROTO);
+
if (req->rq_export && req->rq_export->exp_nid_stats &&
req->rq_export->exp_nid_stats->nid_ldlm_stats)
lprocfs_counter_incr(req->rq_export->exp_nid_stats->nid_ldlm_stats,
#endif /* HAVE_SERVER_SUPPORT */
/**
- * Callback handler for receiving incoming blocking ASTs.
- *
- * This can only happen on client side.
+ * Server may pass additional information about blocking lock.
+ * For IBITS locks it is conflicting bits which can be used for
+ * lock convert instead of cancel.
*/
-void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
- struct ldlm_lock_desc *ld, struct ldlm_lock *lock)
+void ldlm_bl_desc2lock(const struct ldlm_lock_desc *ld, struct ldlm_lock *lock)
{
- int do_ast;
-
- ENTRY;
-
- LDLM_DEBUG(lock, "client blocking AST callback handler");
-
- lock_res_and_lock(lock);
+ struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
- /* set bits to cancel for this lock for possible lock convert */
- if (ns_is_client(ns) && (lock->l_resource->lr_type == LDLM_IBITS)) {
+ check_res_locked(lock->l_resource);
+ if (ns_is_client(ns) && ld &&
+ (lock->l_resource->lr_type == LDLM_IBITS)) {
/*
* Lock description contains policy of blocking lock,
* and its cancel_bits is used to pass conflicting bits.
* never use cancel bits from different resource, full cancel
* is to be used.
*/
- if (ld && ld->l_policy_data.l_inodebits.bits &&
+ if (ld->l_policy_data.l_inodebits.cancel_bits &&
ldlm_res_eq(&ld->l_resource.lr_name,
- &lock->l_resource->lr_name))
- lock->l_policy_data.l_inodebits.cancel_bits =
+ &lock->l_resource->lr_name) &&
+ !(ldlm_is_cbpending(lock) &&
+ lock->l_policy_data.l_inodebits.cancel_bits == 0)) {
+ /* always combine conflicting ibits */
+ lock->l_policy_data.l_inodebits.cancel_bits |=
ld->l_policy_data.l_inodebits.cancel_bits;
- /*
- * if there is no valid ld and lock is cbpending already
- * then cancel_bits should be kept, otherwise it is zeroed.
- */
- else if (!ldlm_is_cbpending(lock))
+ } else {
+ /* If cancel_bits are not obtained or
+ * if the lock is already CBPENDING and
+ * has no cancel_bits set
+ * - the full lock is to be cancelled
+ */
lock->l_policy_data.l_inodebits.cancel_bits = 0;
+ }
}
+}
+
+/**
+ * Callback handler for receiving incoming blocking ASTs.
+ *
+ * This can only happen on client side.
+ */
+void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
+ struct ldlm_lock_desc *ld, struct ldlm_lock *lock)
+{
+ int do_ast;
+
+ ENTRY;
+
+ LDLM_DEBUG(lock, "client blocking AST callback handler");
+
+ lock_res_and_lock(lock);
+
+ /* get extra information from desc if any */
+ ldlm_bl_desc2lock(ld, lock);
ldlm_set_cbpending(lock);
do_ast = (!lock->l_readers && !lock->l_writers);
EXIT;
}
+/*
+ * Send (packing it first if necessary) the reply for an incoming LDLM
+ * callback request.  Returns 0 immediately when no reply is wanted
+ * (rq_no_reply), the pack error if lustre_pack_reply() fails, otherwise
+ * the result of ptlrpc_reply().
+ */
+static int ldlm_callback_reply(struct ptlrpc_request *req, int rc)
+{
+ if (req->rq_no_reply)
+ return 0;
+
+ req->rq_status = rc;
+ if (!req->rq_packed_final) {
+ rc = lustre_pack_reply(req, 1, NULL, NULL);
+ if (rc)
+ return rc;
+ }
+ return ptlrpc_reply(req);
+}
+
/**
* Callback handler for receiving incoming completion ASTs.
*
* This only can happen on client side.
*/
-static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
- struct ldlm_namespace *ns,
- struct ldlm_request *dlm_req,
- struct ldlm_lock *lock)
+static int ldlm_handle_cp_callback(struct ptlrpc_request *req,
+ struct ldlm_namespace *ns,
+ struct ldlm_request *dlm_req,
+ struct ldlm_lock *lock)
{
- struct list_head ast_list;
+ LIST_HEAD(ast_list);
int lvb_len;
int rc = 0;
LDLM_DEBUG(lock, "client completion callback handler START");
- INIT_LIST_HEAD(&ast_list);
if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE)) {
long to = cfs_time_seconds(1);
+ ldlm_callback_reply(req, 0);
+
while (to > 0) {
- set_current_state(TASK_INTERRUPTIBLE);
- schedule_timeout(to);
+ schedule_timeout_interruptible(to);
if (ldlm_is_granted(lock) ||
ldlm_is_destroyed(lock))
break;
lock_res_and_lock(lock);
}
+ if (ldlm_is_failed(lock)) {
+ unlock_res_and_lock(lock);
+ LDLM_LOCK_RELEASE(lock);
+ RETURN(-EINVAL);
+ }
+
if (ldlm_is_destroyed(lock) ||
ldlm_is_granted(lock)) {
/* b=11300: the lock has already been granted */
* Let ldlm_cancel_lru() be fast.
*/
ldlm_lock_remove_from_lru(lock);
+ ldlm_bl_desc2lock(&dlm_req->lock_desc, lock);
lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST;
LDLM_DEBUG(lock, "completion AST includes blocking AST");
}
wake_up(&lock->l_waitq);
}
LDLM_LOCK_RELEASE(lock);
+
+ return 0;
}
/**
struct ldlm_request *dlm_req,
struct ldlm_lock *lock)
{
+ struct ldlm_lock_desc *ld = &dlm_req->lock_desc;
int rc = -ENOSYS;
ENTRY;
if (lock->l_granted_mode == LCK_PW &&
!lock->l_readers && !lock->l_writers &&
ktime_after(ktime_get(),
- ktime_add(lock->l_last_used,
- ktime_set(ns->ns_dirty_age_limit, 0)))) {
+ ktime_add(lock->l_last_used, ns->ns_dirty_age_limit))) {
unlock_res_and_lock(lock);
- if (ldlm_bl_to_thread_lock(ns, NULL, lock))
- ldlm_handle_bl_callback(ns, NULL, lock);
+
+ /* For MDS glimpse it is always DOM lock, set corresponding
+ * cancel_bits to perform lock convert if needed
+ */
+ if (lock->l_resource->lr_type == LDLM_IBITS)
+ ld->l_policy_data.l_inodebits.cancel_bits =
+ MDS_INODELOCK_DOM;
+ if (ldlm_bl_to_thread_lock(ns, ld, lock))
+ ldlm_handle_bl_callback(ns, ld, lock);
EXIT;
return;
EXIT;
}
-static int ldlm_callback_reply(struct ptlrpc_request *req, int rc)
-{
- if (req->rq_no_reply)
- return 0;
-
- req->rq_status = rc;
- if (!req->rq_packed_final) {
- rc = lustre_pack_reply(req, 1, NULL, NULL);
- if (rc)
- return rc;
- }
- return ptlrpc_reply(req);
-}
-
static int __ldlm_bl_to_thread(struct ldlm_bl_work_item *blwi,
enum ldlm_cancel_flags cancel_flags)
{
init_completion(&blwi->blwi_comp);
INIT_LIST_HEAD(&blwi->blwi_head);
- if (memory_pressure_get())
+ if (current->flags & PF_MEMALLOC)
blwi->blwi_mem_pressure = 1;
blwi->blwi_ns = ns;
if (ld != NULL)
blwi->blwi_ld = *ld;
if (count) {
- list_add(&blwi->blwi_head, cancels);
- list_del_init(cancels);
+ list_splice_init(cancels, &blwi->blwi_head);
blwi->blwi_count = count;
} else {
blwi->blwi_lock = lock;
ENTRY;
- DEBUG_REQ(D_HSM, req, "%s: handle setinfo\n", obd->obd_name);
+ DEBUG_REQ(D_HSM, req, "%s: handle setinfo", obd->obd_name);
req_capsule_set(&req->rq_pill, &RQF_OBD_SET_INFO);
KEY_HSM_COPYTOOL_SEND,
vallen, val, NULL);
else
- DEBUG_REQ(D_WARNING, req, "ignoring unknown key %s", key);
+ DEBUG_REQ(D_WARNING, req, "ignoring unknown key '%s'", key);
return rc;
}
const struct lustre_handle *handle)
{
DEBUG_REQ((req->rq_no_reply || rc) ? D_WARNING : D_DLMTRACE, req,
- "%s: [nid %s] [rc %d] [lock %#llx]",
- msg, libcfs_id2str(req->rq_peer), rc,
- handle ? handle->cookie : 0);
+ "%s, NID=%s lock=%#llx: rc = %d",
+ msg, libcfs_id2str(req->rq_peer),
+ handle ? handle->cookie : 0, rc);
if (req->rq_no_reply)
CWARN("No reply was sent, maybe cause b=21636.\n");
else if (rc)
case LDLM_CP_CALLBACK:
CDEBUG(D_INODE, "completion ast\n");
req_capsule_extend(&req->rq_pill, &RQF_LDLM_CP_CALLBACK);
- ldlm_callback_reply(req, 0);
- ldlm_handle_cp_callback(req, ns, dlm_req, lock);
+ rc = ldlm_handle_cp_callback(req, ns, dlm_req, lock);
+ if (!OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE))
+ ldlm_callback_reply(req, rc);
break;
case LDLM_GL_CALLBACK:
CDEBUG(D_INODE, "glimpse ast\n");
if (lustre_handle_equal(&dlm_req->lock_handle[i],
&lockh)) {
DEBUG_REQ(D_RPCTRACE, req,
- "Prio raised by lock %#llx.", lockh.cookie);
+ "Prio raised by lock %#llx", lockh.cookie);
rc = 1;
break;
}
struct ldlm_request *dlm_req;
int rc = 0;
int i;
+ unsigned int size;
ENTRY;
if (dlm_req == NULL)
RETURN(-EFAULT);
+ size = req_capsule_get_size(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT);
+ if (size <= offsetof(struct ldlm_request, lock_handle) ||
+ (size - offsetof(struct ldlm_request, lock_handle)) /
+ sizeof(struct lustre_handle) < dlm_req->lock_count)
+ RETURN(-EPROTO);
+
for (i = 0; i < dlm_req->lock_count; i++) {
struct ldlm_lock *lock;
void ldlm_revoke_export_locks(struct obd_export *exp)
{
- struct list_head rpc_list;
+ LIST_HEAD(rpc_list);
ENTRY;
- INIT_LIST_HEAD(&rpc_list);
cfs_hash_for_each_nolock(exp->exp_lock_hash,
ldlm_revoke_lock_cb, &rpc_list, 0);
ldlm_run_ast_work(exp->exp_obd->obd_namespace, &rpc_list,
static int ldlm_bl_thread_blwi(struct ldlm_bl_pool *blp,
struct ldlm_bl_work_item *blwi)
{
+ /* '1' for consistency with code that checks !mpflag to restore */
+ unsigned int mpflags = 1;
+
ENTRY;
if (blwi->blwi_ns == NULL)
RETURN(LDLM_ITER_STOP);
if (blwi->blwi_mem_pressure)
- memory_pressure_set();
+ mpflags = memalloc_noreclaim_save();
OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_PAUSE_CANCEL2, 4);
blwi->blwi_lock);
}
if (blwi->blwi_mem_pressure)
- memory_pressure_clr();
+ memalloc_noreclaim_restore(mpflags);
if (blwi->blwi_flags & LCF_ASYNC)
OBD_FREE(blwi, sizeof(*blwi));
/* cannot use bltd after this, it is only on caller's stack */
while (1) {
- struct l_wait_info lwi = { 0 };
struct ldlm_bl_work_item *blwi = NULL;
struct obd_export *exp = NULL;
int rc;
rc = ldlm_bl_get_work(blp, &blwi, &exp);
if (rc == 0)
- l_wait_event_exclusive(blp->blp_waitq,
- ldlm_bl_get_work(blp, &blwi,
- &exp),
- &lwi);
+ wait_event_idle_exclusive(blp->blp_waitq,
+ ldlm_bl_get_work(blp, &blwi,
+ &exp));
atomic_inc(&blp->blp_busy_threads);
if (ldlm_bl_thread_need_create(blp, blwi))
goto out_interval;
#ifdef HAVE_SERVER_SUPPORT
+ ldlm_inodebits_slab = kmem_cache_create("ldlm_ibits_node",
+ sizeof(struct ldlm_ibits_node),
+ 0, SLAB_HWCACHE_ALIGN, NULL);
+ if (ldlm_inodebits_slab == NULL)
+ goto out_interval_tree;
+
ldlm_glimpse_work_kmem = kmem_cache_create("ldlm_glimpse_work_kmem",
sizeof(struct ldlm_glimpse_work),
0, 0, NULL);
if (ldlm_glimpse_work_kmem == NULL)
- goto out_interval_tree;
+ goto out_inodebits;
#endif
#if LUSTRE_TRACKS_LOCK_EXP_REFS
#endif
return 0;
#ifdef HAVE_SERVER_SUPPORT
+out_inodebits:
+ kmem_cache_destroy(ldlm_inodebits_slab);
out_interval_tree:
kmem_cache_destroy(ldlm_interval_tree_slab);
#endif
kmem_cache_destroy(ldlm_interval_slab);
kmem_cache_destroy(ldlm_interval_tree_slab);
#ifdef HAVE_SERVER_SUPPORT
+ kmem_cache_destroy(ldlm_inodebits_slab);
kmem_cache_destroy(ldlm_glimpse_work_kmem);
#endif
}