* Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2010, 2013, Intel Corporation.
+ * Copyright (c) 2010, 2014, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
*/
#define DEBUG_SUBSYSTEM S_LDLM
-#ifndef __KERNEL__
-#include <signal.h>
-#include <liblustre.h>
-#endif
#include <lustre_dlm.h>
#include <obd_class.h>
#include "ldlm_internal.h"
-int ldlm_enqueue_min = OBD_TIMEOUT_DEFAULT;
-CFS_MODULE_PARM(ldlm_enqueue_min, "i", int, 0644,
+unsigned int ldlm_enqueue_min = OBD_TIMEOUT_DEFAULT;
+CFS_MODULE_PARM(ldlm_enqueue_min, "i", uint, 0644,
"lock enqueue timeout minimum");
/* on the client side: whether cached locks will be canceled before replay */
RETURN(0);
}
-EXPORT_SYMBOL(ldlm_expired_completion_wait);
+
+/**
+ * Calculate the Completion timeout (covering enqueue, BL AST, data flush,
+ * lock cancel, and their replies). Used for lock completion timeout on the
+ * client side.
+ *
+ * \param[in] lock lock which is waiting the completion callback
+ *
+ * \retval timeout in seconds to wait for the server reply
+ */
/* We use the same basis for both server side and client side functions
from a single node. */
-int ldlm_get_enq_timeout(struct ldlm_lock *lock)
+static unsigned int ldlm_cp_timeout(struct ldlm_lock *lock)
{
- int timeout = at_get(ldlm_lock_to_ns_at(lock));
- if (AT_OFF)
- return obd_timeout / 2;
- /* Since these are non-updating timeouts, we should be conservative.
- It would be nice to have some kind of "early reply" mechanism for
- lock callbacks too... */
- timeout = min_t(int, at_max, timeout + (timeout >> 1)); /* 150% */
- return max(timeout, ldlm_enqueue_min);
+ unsigned int timeout;
+
+ if (AT_OFF)
+ return obd_timeout;
+
+ /* Wait a long time for enqueue - server may have to callback a
+ * lock from another client. Server will evict the other client if it
+ * doesn't respond reasonably, and then give us the lock. */
+ timeout = at_get(ldlm_lock_to_ns_at(lock));
+ return max(3 * timeout, ldlm_enqueue_min);
}
-EXPORT_SYMBOL(ldlm_get_enq_timeout);
/**
* Helper function for ldlm_completion_ast(), updating timings when lock is
* actually granted.
*/
-static int ldlm_completion_tail(struct ldlm_lock *lock)
+static int ldlm_completion_tail(struct ldlm_lock *lock, void *data)
{
long delay;
- int result;
+ int result = 0;
if (ldlm_is_destroyed(lock) || ldlm_is_failed(lock)) {
LDLM_DEBUG(lock, "client-side enqueue: destroyed");
result = -EIO;
+ } else if (data == NULL) {
+ LDLM_DEBUG(lock, "client-side enqueue: granted");
} else {
+ /* Take into AT only CP RPC, not immediately granted locks */
delay = cfs_time_sub(cfs_time_current_sec(),
lock->l_last_activity);
LDLM_DEBUG(lock, "client-side enqueue: granted after "
CFS_DURATION_T"s", delay);
/* Update our time estimate */
- at_measured(ldlm_lock_to_ns_at(lock),
- delay);
- result = 0;
+ at_measured(ldlm_lock_to_ns_at(lock), delay);
}
return result;
}
RETURN(0);
}
- if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
- LDLM_FL_BLOCK_CONV))) {
+ if (!(flags & LDLM_FL_BLOCKED_MASK)) {
wake_up(&lock->l_waitq);
- RETURN(ldlm_completion_tail(lock));
+ RETURN(ldlm_completion_tail(lock, data));
}
LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, "
goto noreproc;
}
- if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
- LDLM_FL_BLOCK_CONV))) {
+ if (!(flags & LDLM_FL_BLOCKED_MASK)) {
wake_up(&lock->l_waitq);
RETURN(0);
}
imp = obd->u.cli.cl_import;
}
- /* Wait a long time for enqueue - server may have to callback a
- lock from another client. Server will evict the other client if it
- doesn't respond reasonably, and then give us the lock. */
- timeout = ldlm_get_enq_timeout(lock) * 2;
+ timeout = ldlm_cp_timeout(lock);
- lwd.lwd_lock = lock;
+ lwd.lwd_lock = lock;
+ lock->l_last_activity = cfs_time_current_sec();
if (ldlm_is_no_timeout(lock)) {
LDLM_DEBUG(lock, "waiting indefinitely because of NO_TIMEOUT");
LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
rc);
RETURN(rc);
- }
+ }
- RETURN(ldlm_completion_tail(lock));
+ RETURN(ldlm_completion_tail(lock, data));
}
EXPORT_SYMBOL(ldlm_completion_ast);
EXPORT_SYMBOL(ldlm_blocking_ast);
/**
- * ->l_glimpse_ast() for DLM extent locks acquired on the server-side. See
- * comment in filter_intent_policy() on why you may need this.
+ * Implements ldlm_lock::l_glimpse_ast for extent locks acquired on the server.
+ *
+ * Returning -ELDLM_NO_LOCK_DATA actually works, but the reason for that is
+ * rather subtle: with OST-side locking, it may so happen that _all_ extent
+ * locks are held by the OST. If client wants to obtain the current file size
+ * it calls ll_glimpse_size(), and (as all locks are held only on the server),
+ * this dummy glimpse callback fires and does nothing. The client still
+ * receives the correct file size due to the following fragment of code in
+ * ldlm_cb_interpret():
+ *
+ * if (rc == -ELDLM_NO_LOCK_DATA) {
+ * LDLM_DEBUG(lock, "lost race - client has a lock but no"
+ * "inode");
+ * ldlm_res_lvbo_update(lock->l_resource, NULL, 1);
+ * }
+ *
+ * That is, after the glimpse returns this error, ofd_lvbo_update() is called
+ * and returns the updated file attributes from the inode to the client.
+ *
+ * See also comment in ofd_intent_policy() on why servers must set a non-NULL
+ * l_glimpse_ast when grabbing DLM locks. Otherwise, the server will assume
+ * that the object is in the process of being destroyed.
+ *
+ * \param[in] lock DLM lock being glimpsed, unused
+ * \param[in] reqp pointer to ptlrpc_request, unused
+ *
+ * \retval -ELDLM_NO_LOCK_DATA to get attributes from disk object
*/
int ldlm_glimpse_ast(struct ldlm_lock *lock, void *reqp)
{
- /*
- * Returning -ELDLM_NO_LOCK_DATA actually works, but the reason for
- * that is rather subtle: with OST-side locking, it may so happen that
- * _all_ extent locks are held by the OST. If client wants to obtain
- * current file size it calls ll{,u}_glimpse_size(), and (as locks are
- * on the server), dummy glimpse callback fires and does
- * nothing. Client still receives correct file size due to the
- * following fragment in filter_intent_policy():
- *
- * rc = l->l_glimpse_ast(l, NULL); // this will update the LVB
- * if (rc != 0 && res->lr_namespace->ns_lvbo &&
- * res->lr_namespace->ns_lvbo->lvbo_update) {
- * res->lr_namespace->ns_lvbo->lvbo_update(res, NULL, 0, 1);
- * }
- *
- * that is, after glimpse_ast() fails, filter_lvbo_update() runs, and
- * returns correct file size to the client.
- */
return -ELDLM_NO_LOCK_DATA;
}
-EXPORT_SYMBOL(ldlm_glimpse_ast);
/**
* Enqueue a local lock (typically on a server).
lock = ldlm_lock_create(ns, res_id, type, mode, &cbs, data, lvb_len,
lvb_type);
- if (unlikely(!lock))
- GOTO(out_nolock, err = -ENOMEM);
+ if (IS_ERR(lock))
+ GOTO(out_nolock, err = PTR_ERR(lock));
+
+ err = ldlm_lvbo_init(lock->l_resource);
+ if (err < 0) {
+ LDLM_ERROR(lock, "delayed lvb init failed (rc %d)", err);
+ GOTO(out, err);
+ }
ldlm_lock2handle(lock, lockh);
else
LDLM_DEBUG(lock, "lock was granted or failed in race");
- ldlm_lock_decref_internal(lock, mode);
-
- /* XXX - HACK because we shouldn't call ldlm_lock_destroy()
- * from llite/file.c/ll_file_flock(). */
- /* This code makes for the fact that we do not have blocking handler on
- * a client for flock locks. As such this is the place where we must
- * completely kill failed locks. (interrupted and those that
- * were waiting to be granted when server evicted us. */
- if (lock->l_resource->lr_type == LDLM_FLOCK) {
- lock_res_and_lock(lock);
- ldlm_resource_unlink_lock(lock);
- ldlm_lock_destroy_nolock(lock);
- unlock_res_and_lock(lock);
- }
+ /* XXX - HACK because we shouldn't call ldlm_lock_destroy()
+ * from llite/file.c/ll_file_flock(). */
+	/* This code compensates for the fact that we do not have a blocking
+	 * handler on a client for flock locks. As such, this is the place
+	 * where we must completely kill failed locks (both interrupted ones
+	 * and those that were waiting to be granted when the server evicted
+	 * us). */
+ if (lock->l_resource->lr_type == LDLM_FLOCK) {
+ lock_res_and_lock(lock);
+ if (!ldlm_is_destroyed(lock)) {
+ ldlm_resource_unlink_lock(lock);
+ ldlm_lock_decref_internal_nolock(lock, mode);
+ ldlm_lock_destroy_nolock(lock);
+ }
+ unlock_res_and_lock(lock);
+ } else {
+ ldlm_lock_decref_internal(lock, mode);
+ }
}
/**
struct ldlm_lock *lock;
struct ldlm_reply *reply;
int cleanup_phase = 1;
- int size = 0;
ENTRY;
lock = ldlm_handle2lock(lockh);
if (reply == NULL)
GOTO(cleanup, rc = -EPROTO);
- if (lvb_len != 0) {
- LASSERT(lvb != NULL);
+ if (lvb_len > 0) {
+ int size = 0;
size = req_capsule_get_size(&req->rq_pill, &RMF_DLM_LVB,
RCL_SERVER);
lvb_len, size);
GOTO(cleanup, rc = -EINVAL);
}
+ lvb_len = size;
}
if (rc == ELDLM_LOCK_ABORTED) {
- if (lvb_len != 0)
+ if (lvb_len > 0 && lvb != NULL)
rc = ldlm_fill_lvb(lock, &req->rq_pill, RCL_SERVER,
- lvb, size);
- GOTO(cleanup, rc = (rc != 0 ? rc : ELDLM_LOCK_ABORTED));
+ lvb, lvb_len);
+ GOTO(cleanup, rc = rc ? : ELDLM_LOCK_ABORTED);
}
/* lock enqueued on the server */
*flags = ldlm_flags_from_wire(reply->lock_flags);
lock->l_flags |= ldlm_flags_from_wire(reply->lock_flags &
LDLM_FL_INHERIT_MASK);
- /* move NO_TIMEOUT flag to the lock to force ldlm_lock_match()
- * to wait with no timeout as well */
- lock->l_flags |= ldlm_flags_from_wire(reply->lock_flags &
- LDLM_FL_NO_TIMEOUT);
unlock_res_and_lock(lock);
- CDEBUG(D_INFO, "local: %p, remote cookie: "LPX64", flags: 0x%llx\n",
- lock, reply->lock_handle.cookie, *flags);
-
- /* If enqueue returned a blocked lock but the completion handler has
- * already run, then it fixed up the resource and we don't need to do it
- * again. */
- if ((*flags) & LDLM_FL_LOCK_CHANGED) {
- int newmode = reply->lock_desc.l_req_mode;
- LASSERT(!is_replay);
- if (newmode && newmode != lock->l_req_mode) {
- LDLM_DEBUG(lock, "server returned different mode %s",
- ldlm_lockname[newmode]);
- lock->l_req_mode = newmode;
- }
+ CDEBUG(D_INFO, "local: %p, remote cookie: "LPX64", flags: "LPX64"\n",
+ lock, reply->lock_handle.cookie, *flags);
+
+ /* If enqueue returned a blocked lock but the completion handler has
+ * already run, then it fixed up the resource and we don't need to do it
+ * again. */
+ if ((*flags) & LDLM_FL_LOCK_CHANGED) {
+ int newmode = reply->lock_desc.l_req_mode;
+ LASSERT(!is_replay);
+ if (newmode && newmode != lock->l_req_mode) {
+ LDLM_DEBUG(lock, "server returned different mode %s",
+ ldlm_lockname[newmode]);
+ lock->l_req_mode = newmode;
+ }
if (!ldlm_res_eq(&reply->lock_desc.l_resource.lr_name,
&lock->l_resource->lr_name)) {
LDLM_DEBUG(lock,"client-side enqueue, new policy data");
}
- if ((*flags) & LDLM_FL_AST_SENT ||
- /* Cancel extent locks as soon as possible on a liblustre client,
- * because it cannot handle asynchronous ASTs robustly (see
- * bug 7311). */
- (LIBLUSTRE_CLIENT && type == LDLM_EXTENT)) {
+ if ((*flags) & LDLM_FL_AST_SENT) {
lock_res_and_lock(lock);
lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST;
unlock_res_and_lock(lock);
/* If the lock has already been granted by a completion AST, don't
* clobber the LVB with an older one. */
- if (lvb_len != 0) {
+ if (lvb_len > 0) {
/* We must lock or a racing completion might update lvb without
* letting us know and we'll clobber the correct value.
* Cannot unlock after the check either, a that still leaves
lock_res_and_lock(lock);
if (lock->l_req_mode != lock->l_granted_mode)
rc = ldlm_fill_lvb(lock, &req->rq_pill, RCL_SERVER,
- lock->l_lvb_data, size);
+ lock->l_lvb_data, lvb_len);
unlock_res_and_lock(lock);
if (rc < 0) {
cleanup_phase = 1;
}
}
- if (lvb_len && lvb != NULL) {
- /* Copy the LVB here, and not earlier, because the completion
- * AST (if any) can override what we got in the reply */
- memcpy(lvb, lock->l_lvb_data, lvb_len);
- }
+ if (lvb_len > 0 && lvb != NULL) {
+ /* Copy the LVB here, and not earlier, because the completion
+ * AST (if any) can override what we got in the reply */
+ memcpy(lvb, lock->l_lvb_data, lvb_len);
+ }
LDLM_DEBUG(lock, "client-side enqueue END");
EXIT;
enum req_location loc,
int off)
{
- int size = req_capsule_msg_size(pill, loc);
- return ldlm_req_handles_avail(size, off);
+ __u32 size = req_capsule_msg_size(pill, loc);
+ return ldlm_req_handles_avail(size, off);
}
static inline int ldlm_format_handles_avail(struct obd_import *imp,
const struct req_format *fmt,
enum req_location loc, int off)
{
- int size = req_capsule_fmt_size(imp->imp_msg_magic, fmt, loc);
- return ldlm_req_handles_avail(size, off);
+ __u32 size = req_capsule_fmt_size(imp->imp_msg_magic, fmt, loc);
+ return ldlm_req_handles_avail(size, off);
}
/**
* that needs to be performed.
*/
int ldlm_prep_elc_req(struct obd_export *exp, struct ptlrpc_request *req,
- int version, int opc, int canceloff,
- cfs_list_t *cancels, int count)
-{
- struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
- struct req_capsule *pill = &req->rq_pill;
- struct ldlm_request *dlm = NULL;
- int flags, avail, to_free, pack = 0;
- CFS_LIST_HEAD(head);
- int rc;
- ENTRY;
+ int version, int opc, int canceloff,
+ struct list_head *cancels, int count)
+ {
+ struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
+ struct req_capsule *pill = &req->rq_pill;
+ struct ldlm_request *dlm = NULL;
+ struct list_head head = LIST_HEAD_INIT(head);
+ int flags, avail, to_free, pack = 0;
+ int rc;
+ ENTRY;
- if (cancels == NULL)
- cancels = &head;
+ if (cancels == NULL)
+ cancels = &head;
if (ns_connect_cancelset(ns)) {
/* Estimate the amount of available space in the request. */
req_capsule_filled_sizes(pill, RCL_CLIENT);
dlm = req_capsule_client_get(pill, &RMF_DLM_REQ);
LASSERT(dlm);
/* Skip first lock handler in ldlm_request_pack(),
- * this method will incrment @lock_count according
+ * this method will increment @lock_count according
* to the lock handle amount actually written to
* the buffer. */
dlm->lock_count = canceloff;
EXPORT_SYMBOL(ldlm_prep_elc_req);
int ldlm_prep_enqueue_req(struct obd_export *exp, struct ptlrpc_request *req,
- cfs_list_t *cancels, int count)
+ struct list_head *cancels, int count)
{
return ldlm_prep_elc_req(exp, req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE,
LDLM_ENQUEUE_CANCEL_OFF, cancels, count);
lock = ldlm_lock_create(ns, res_id, einfo->ei_type,
einfo->ei_mode, &cbs, einfo->ei_cbdata,
lvb_len, lvb_type);
- if (lock == NULL)
- RETURN(-ENOMEM);
+ if (IS_ERR(lock))
+ RETURN(PTR_ERR(lock));
/* for the local lock, add the reference */
ldlm_lock_addref_internal(lock, einfo->ei_mode);
ldlm_lock2handle(lock, lockh);
- if (policy != NULL) {
- /* INODEBITS_INTEROP: If the server does not support
- * inodebits, we will request a plain lock in the
- * descriptor (ldlm_lock2desc() below) but use an
- * inodebits lock internally with both bits set.
- */
- if (einfo->ei_type == LDLM_IBITS &&
- !(exp_connect_flags(exp) &
- OBD_CONNECT_IBITS))
- lock->l_policy_data.l_inodebits.bits =
- MDS_INODELOCK_LOOKUP |
- MDS_INODELOCK_UPDATE;
- else
- lock->l_policy_data = *policy;
- }
+ if (policy != NULL)
+ lock->l_policy_data = *policy;
if (einfo->ei_type == LDLM_EXTENT) {
/* extent lock without policy is a bug */
lock->l_req_extent = policy->l_extent;
}
- LDLM_DEBUG(lock, "client-side enqueue START, flags %llx\n",
+ LDLM_DEBUG(lock, "client-side enqueue START, flags "LPX64"\n",
*flags);
- }
+ }
lock->l_conn_export = exp;
lock->l_export = NULL;
lock->l_blocking_ast = einfo->ei_cb_bl;
lock->l_flags |= (*flags & (LDLM_FL_NO_LRU | LDLM_FL_EXCL));
+ lock->l_last_activity = cfs_time_current_sec();
/* lock not sent to server yet */
ptlrpc_request_set_replen(req);
}
- /*
- * Liblustre client doesn't get extent locks, except for O_APPEND case
- * where [0, OBD_OBJECT_EOF] lock is taken, or truncate, where
- * [i_size, OBD_OBJECT_EOF] lock is taken.
- */
- LASSERT(ergo(LIBLUSTRE_CLIENT, einfo->ei_type != LDLM_EXTENT ||
- policy->l_extent.end == OBD_OBJECT_EOF));
-
if (async) {
LASSERT(reqp != NULL);
RETURN(0);
ptlrpc_req_finished(req);
return rc;
}
-EXPORT_SYMBOL(ldlm_cli_convert);
/**
* Cancel locks locally.
* Pack \a count locks in \a head into ldlm_request buffer of request \a req.
*/
static void ldlm_cancel_pack(struct ptlrpc_request *req,
- cfs_list_t *head, int count)
+ struct list_head *head, int count)
{
struct ldlm_request *dlm;
struct ldlm_lock *lock;
/* XXX: it would be better to pack lock handles grouped by resource.
* so that the server cancel would call filter_lvbo_update() less
* frequently. */
- cfs_list_for_each_entry(lock, head, l_bl_ast) {
+ list_for_each_entry(lock, head, l_bl_ast) {
if (!count--)
break;
LASSERT(lock->l_conn_export);
/**
* Prepare and send a batched cancel RPC. It will include \a count lock
* handles of locks given in \a cancels list. */
-int ldlm_cli_cancel_req(struct obd_export *exp, cfs_list_t *cancels,
+int ldlm_cli_cancel_req(struct obd_export *exp, struct list_head *cancels,
int count, ldlm_cancel_flags_t flags)
{
struct ptlrpc_request *req = NULL;
out:
return sent ? sent : rc;
}
-EXPORT_SYMBOL(ldlm_cli_cancel_req);
static inline struct ldlm_pool *ldlm_imp2pl(struct obd_import *imp)
{
RETURN(0);
}
-EXPORT_SYMBOL(ldlm_cli_update_pool);
/**
* Client side lock cancel.
int ldlm_cli_cancel(struct lustre_handle *lockh,
ldlm_cancel_flags_t cancel_flags)
{
- struct obd_export *exp;
+ struct obd_export *exp;
int avail, flags, count = 1;
__u64 rc = 0;
- struct ldlm_namespace *ns;
- struct ldlm_lock *lock;
- CFS_LIST_HEAD(cancels);
- ENTRY;
+ struct ldlm_namespace *ns;
+ struct ldlm_lock *lock;
+ struct list_head cancels = LIST_HEAD_INIT(cancels);
+ ENTRY;
/* concurrent cancels on the same handle can happen */
lock = ldlm_handle2lock_long(lockh, LDLM_FL_CANCELING);
- if (lock == NULL) {
- LDLM_DEBUG_NOLOCK("lock is already being destroyed\n");
- RETURN(0);
- }
+ if (lock == NULL) {
+ LDLM_DEBUG_NOLOCK("lock is already being destroyed");
+ RETURN(0);
+ }
- rc = ldlm_cli_cancel_local(lock);
+ rc = ldlm_cli_cancel_local(lock);
if (rc == LDLM_FL_LOCAL_ONLY || cancel_flags & LCF_LOCAL) {
LDLM_LOCK_RELEASE(lock);
RETURN(0);
/* Even if the lock is marked as LDLM_FL_BL_AST, this is a LDLM_CANCEL
* RPC which goes to canceld portal, so we can cancel other LRU locks
* here and send them all as one LDLM_CANCEL RPC. */
- LASSERT(cfs_list_empty(&lock->l_bl_ast));
- cfs_list_add(&lock->l_bl_ast, &cancels);
+ LASSERT(list_empty(&lock->l_bl_ast));
+ list_add(&lock->l_bl_ast, &cancels);
exp = lock->l_conn_export;
if (exp_connect_cancelset(exp)) {
* Locally cancel up to \a count locks in list \a cancels.
* Return the number of cancelled locks.
*/
-int ldlm_cli_cancel_list_local(cfs_list_t *cancels, int count,
- ldlm_cancel_flags_t flags)
+int ldlm_cli_cancel_list_local(struct list_head *cancels, int count,
+ ldlm_cancel_flags_t flags)
{
- CFS_LIST_HEAD(head);
- struct ldlm_lock *lock, *next;
+ struct list_head head = LIST_HEAD_INIT(head);
+ struct ldlm_lock *lock, *next;
int left = 0, bl_ast = 0;
__u64 rc;
left = count;
- cfs_list_for_each_entry_safe(lock, next, cancels, l_bl_ast) {
+ list_for_each_entry_safe(lock, next, cancels, l_bl_ast) {
if (left-- == 0)
break;
* the one being generated now. */
if (!(flags & LCF_BL_AST) && (rc == LDLM_FL_BL_AST)) {
LDLM_DEBUG(lock, "Cancel lock separately");
- cfs_list_del_init(&lock->l_bl_ast);
- cfs_list_add(&lock->l_bl_ast, &head);
+ list_del_init(&lock->l_bl_ast);
+ list_add(&lock->l_bl_ast, &head);
bl_ast++;
continue;
}
if (rc == LDLM_FL_LOCAL_ONLY) {
/* CANCEL RPC should not be sent to server. */
- cfs_list_del_init(&lock->l_bl_ast);
+ list_del_init(&lock->l_bl_ast);
LDLM_LOCK_RELEASE(lock);
count--;
}
RETURN(count);
}
-EXPORT_SYMBOL(ldlm_cli_cancel_list_local);
/**
* Cancel as many locks as possible w/o sending any RPCs (e.g. to write back
if (slv == 0 || lv < slv)
return LDLM_POLICY_KEEP_LOCK;
- if (ns->ns_cancel != NULL && ns->ns_cancel(lock) == 0)
- return LDLM_POLICY_KEEP_LOCK;
-
return LDLM_POLICY_CANCEL_LOCK;
}
int unused, int added,
int count)
{
- if (added >= count)
- return LDLM_POLICY_KEEP_LOCK;
-
- if (cfs_time_before(cfs_time_current(),
+ if ((added >= count) &&
+ cfs_time_before(cfs_time_current(),
cfs_time_add(lock->l_last_used, ns->ns_max_age)))
- return LDLM_POLICY_KEEP_LOCK;
-
- if (ns->ns_cancel != NULL && ns->ns_cancel(lock) == 0)
- return LDLM_POLICY_KEEP_LOCK;
+ return LDLM_POLICY_KEEP_LOCK;
return LDLM_POLICY_CANCEL_LOCK;
}
* sending any RPCs or waiting for any
* outstanding RPC to complete.
*/
-static int ldlm_prepare_lru_list(struct ldlm_namespace *ns, cfs_list_t *cancels,
- int count, int max, int flags)
+static int ldlm_prepare_lru_list(struct ldlm_namespace *ns,
+ struct list_head *cancels, int count, int max,
+ int flags)
{
ldlm_cancel_lru_policy_t pf;
struct ldlm_lock *lock, *next;
pf = ldlm_cancel_lru_policy(ns, flags);
LASSERT(pf != NULL);
- while (!cfs_list_empty(&ns->ns_unused_list)) {
+ while (!list_empty(&ns->ns_unused_list)) {
ldlm_policy_res_t result;
/* all unused locks */
if (max && added >= max)
break;
- cfs_list_for_each_entry_safe(lock, next, &ns->ns_unused_list,
+ list_for_each_entry_safe(lock, next, &ns->ns_unused_list,
l_lru) {
/* No locks which got blocking requests. */
LASSERT(!ldlm_is_bl_ast(lock));
* and can't use l_pending_chain as it is used both on
* server and client nevertheless bug 5666 says it is
* used only on server */
- LASSERT(cfs_list_empty(&lock->l_bl_ast));
- cfs_list_add(&lock->l_bl_ast, cancels);
+ LASSERT(list_empty(&lock->l_bl_ast));
+ list_add(&lock->l_bl_ast, cancels);
unlock_res_and_lock(lock);
lu_ref_del(&lock->l_reference, __FUNCTION__, current);
spin_lock(&ns->ns_lock);
RETURN(added);
}
-int ldlm_cancel_lru_local(struct ldlm_namespace *ns, cfs_list_t *cancels,
+int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels,
int count, int max, ldlm_cancel_flags_t cancel_flags,
int flags)
{
ldlm_cancel_flags_t cancel_flags,
int flags)
{
- CFS_LIST_HEAD(cancels);
+ struct list_head cancels = LIST_HEAD_INIT(cancels);
int count, rc;
ENTRY;
-#ifndef __KERNEL__
- cancel_flags &= ~LCF_ASYNC; /* force to be sync in user space */
-#endif
/* Just prepare the list of locks, do not actually cancel them yet.
* Locks are cancelled later in a separate thread. */
count = ldlm_prepare_lru_list(ns, &cancels, nr, 0, flags);
* list.
*/
int ldlm_cancel_resource_local(struct ldlm_resource *res,
- cfs_list_t *cancels,
+ struct list_head *cancels,
ldlm_policy_data_t *policy,
ldlm_mode_t mode, __u64 lock_flags,
ldlm_cancel_flags_t cancel_flags, void *opaque)
ENTRY;
lock_res(res);
- cfs_list_for_each_entry(lock, &res->lr_granted, l_res_link) {
+ list_for_each_entry(lock, &res->lr_granted, l_res_link) {
if (opaque != NULL && lock->l_ast_data != opaque) {
LDLM_ERROR(lock, "data %p doesn't match opaque %p",
lock->l_ast_data, opaque);
lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_CANCELING |
lock_flags;
- LASSERT(cfs_list_empty(&lock->l_bl_ast));
- cfs_list_add(&lock->l_bl_ast, cancels);
+ LASSERT(list_empty(&lock->l_bl_ast));
+ list_add(&lock->l_bl_ast, cancels);
LDLM_LOCK_GET(lock);
count++;
}
* buffer at the offset \a off.
* Destroy \a cancels at the end.
*/
-int ldlm_cli_cancel_list(cfs_list_t *cancels, int count,
+int ldlm_cli_cancel_list(struct list_head *cancels, int count,
struct ptlrpc_request *req, ldlm_cancel_flags_t flags)
{
struct ldlm_lock *lock;
int res = 0;
ENTRY;
- if (cfs_list_empty(cancels) || count == 0)
+ if (list_empty(cancels) || count == 0)
RETURN(0);
/* XXX: requests (both batched and not) could be sent in parallel.
* It would also speed up the case when the server does not support
* the feature. */
while (count > 0) {
- LASSERT(!cfs_list_empty(cancels));
- lock = cfs_list_entry(cancels->next, struct ldlm_lock,
+ LASSERT(!list_empty(cancels));
+ lock = list_entry(cancels->next, struct ldlm_lock,
l_bl_ast);
LASSERT(lock->l_conn_export);
* If flags & LDLM_FL_LOCAL_ONLY, throw the locks away without trying
* to notify the server. */
int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
- const struct ldlm_res_id *res_id,
- ldlm_policy_data_t *policy,
- ldlm_mode_t mode,
- ldlm_cancel_flags_t flags,
- void *opaque)
+ const struct ldlm_res_id *res_id,
+ ldlm_policy_data_t *policy,
+ ldlm_mode_t mode,
+ ldlm_cancel_flags_t flags,
+ void *opaque)
{
- struct ldlm_resource *res;
- CFS_LIST_HEAD(cancels);
- int count;
- int rc;
- ENTRY;
+ struct ldlm_resource *res;
+ struct list_head cancels = LIST_HEAD_INIT(cancels);
+ int count;
+ int rc;
+ ENTRY;
- res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
- if (res == NULL) {
- /* This is not a problem. */
- CDEBUG(D_INFO, "No resource "LPU64"\n", res_id->name[0]);
- RETURN(0);
- }
+ res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
+ if (IS_ERR(res)) {
+ /* This is not a problem. */
+ CDEBUG(D_INFO, "No resource "LPU64"\n", res_id->name[0]);
+ RETURN(0);
+ }
- LDLM_RESOURCE_ADDREF(res);
- count = ldlm_cancel_resource_local(res, &cancels, policy, mode,
- 0, flags | LCF_BL_AST, opaque);
- rc = ldlm_cli_cancel_list(&cancels, count, NULL, flags);
- if (rc != ELDLM_OK)
+ LDLM_RESOURCE_ADDREF(res);
+ count = ldlm_cancel_resource_local(res, &cancels, policy, mode,
+ 0, flags | LCF_BL_AST, opaque);
+ rc = ldlm_cli_cancel_list(&cancels, count, NULL, flags);
+ if (rc != ELDLM_OK)
CERROR("canceling unused lock "DLDLMRES": rc = %d\n",
PLDLMRES(res), rc);
- LDLM_RESOURCE_DELREF(res);
- ldlm_resource_putref(res);
- RETURN(0);
+ LDLM_RESOURCE_DELREF(res);
+ ldlm_resource_putref(res);
+ RETURN(0);
}
EXPORT_SYMBOL(ldlm_cli_cancel_unused_resource);
};
static int ldlm_cli_hash_cancel_unused(cfs_hash_t *hs, cfs_hash_bd_t *bd,
- cfs_hlist_node_t *hnode, void *arg)
+ struct hlist_node *hnode, void *arg)
{
struct ldlm_resource *res = cfs_hash_object(hs, hnode);
struct ldlm_cli_cancel_arg *lc = arg;
RETURN(ELDLM_OK);
}
}
-EXPORT_SYMBOL(ldlm_cli_cancel_unused);
/* Lock iterators. */
int ldlm_resource_foreach(struct ldlm_resource *res, ldlm_iterator_t iter,
void *closure)
{
- cfs_list_t *tmp, *next;
+ struct list_head *tmp, *next;
struct ldlm_lock *lock;
int rc = LDLM_ITER_CONTINUE;
RETURN(LDLM_ITER_CONTINUE);
lock_res(res);
- cfs_list_for_each_safe(tmp, next, &res->lr_granted) {
- lock = cfs_list_entry(tmp, struct ldlm_lock, l_res_link);
+ list_for_each_safe(tmp, next, &res->lr_granted) {
+ lock = list_entry(tmp, struct ldlm_lock, l_res_link);
if (iter(lock, closure) == LDLM_ITER_STOP)
GOTO(out, rc = LDLM_ITER_STOP);
}
- cfs_list_for_each_safe(tmp, next, &res->lr_converting) {
- lock = cfs_list_entry(tmp, struct ldlm_lock, l_res_link);
+ list_for_each_safe(tmp, next, &res->lr_converting) {
+ lock = list_entry(tmp, struct ldlm_lock, l_res_link);
if (iter(lock, closure) == LDLM_ITER_STOP)
GOTO(out, rc = LDLM_ITER_STOP);
}
- cfs_list_for_each_safe(tmp, next, &res->lr_waiting) {
- lock = cfs_list_entry(tmp, struct ldlm_lock, l_res_link);
+ list_for_each_safe(tmp, next, &res->lr_waiting) {
+ lock = list_entry(tmp, struct ldlm_lock, l_res_link);
if (iter(lock, closure) == LDLM_ITER_STOP)
GOTO(out, rc = LDLM_ITER_STOP);
unlock_res(res);
RETURN(rc);
}
-EXPORT_SYMBOL(ldlm_resource_foreach);
struct iter_helper_data {
ldlm_iterator_t iter;
}
static int ldlm_res_iter_helper(cfs_hash_t *hs, cfs_hash_bd_t *bd,
- cfs_hlist_node_t *hnode, void *arg)
+ struct hlist_node *hnode, void *arg)
{
struct ldlm_resource *res = cfs_hash_object(hs, hnode);
ldlm_iterator_t iter, void *closure)
{
- struct iter_helper_data helper = { iter: iter, closure: closure };
+ struct iter_helper_data helper = { .iter = iter, .closure = closure };
cfs_hash_for_each_nolock(ns->ns_rs_hash,
ldlm_res_iter_helper, &helper);
}
-EXPORT_SYMBOL(ldlm_namespace_foreach);
/* non-blocking function to manipulate a lock whose cb_data is being put away.
* return 0: find no resource
* < 0: errors
*/
int ldlm_resource_iterate(struct ldlm_namespace *ns,
- const struct ldlm_res_id *res_id,
- ldlm_iterator_t iter, void *data)
+ const struct ldlm_res_id *res_id,
+ ldlm_iterator_t iter, void *data)
{
- struct ldlm_resource *res;
- int rc;
- ENTRY;
+ struct ldlm_resource *res;
+ int rc;
+ ENTRY;
- if (ns == NULL) {
- CERROR("must pass in namespace\n");
- LBUG();
- }
+ LASSERTF(ns != NULL, "must pass in namespace\n");
- res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
- if (res == NULL)
- RETURN(0);
+ res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
+ if (IS_ERR(res))
+ RETURN(0);
- LDLM_RESOURCE_ADDREF(res);
- rc = ldlm_resource_foreach(res, iter, data);
- LDLM_RESOURCE_DELREF(res);
- ldlm_resource_putref(res);
- RETURN(rc);
+ LDLM_RESOURCE_ADDREF(res);
+ rc = ldlm_resource_foreach(res, iter, data);
+ LDLM_RESOURCE_DELREF(res);
+ ldlm_resource_putref(res);
+ RETURN(rc);
}
EXPORT_SYMBOL(ldlm_resource_iterate);
static int ldlm_chain_lock_for_replay(struct ldlm_lock *lock, void *closure)
{
- cfs_list_t *list = closure;
+ struct list_head *list = closure;
/* we use l_pending_chain here, because it's unused on clients. */
- LASSERTF(cfs_list_empty(&lock->l_pending_chain),
+ LASSERTF(list_empty(&lock->l_pending_chain),
"lock %p next %p prev %p\n",
lock, &lock->l_pending_chain.next,&lock->l_pending_chain.prev);
/* bug 9573: don't replay locks left after eviction, or
 * on a lock so that it does not disappear under us (e.g. due to cancel)
*/
if (!(lock->l_flags & (LDLM_FL_FAILED|LDLM_FL_CANCELING))) {
- cfs_list_add(&lock->l_pending_chain, list);
+ list_add(&lock->l_pending_chain, list);
LDLM_LOCK_GET(lock);
}
flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_GRANTED;
else if (lock->l_granted_mode)
flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_CONV;
- else if (!cfs_list_empty(&lock->l_res_link))
+ else if (!list_empty(&lock->l_res_link))
flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_WAIT;
else
flags = LDLM_FL_REPLAY;
*/
static void ldlm_cancel_unused_locks_for_replay(struct ldlm_namespace *ns)
{
- int canceled;
- CFS_LIST_HEAD(cancels);
+ int canceled;
+ struct list_head cancels = LIST_HEAD_INIT(cancels);
- CDEBUG(D_DLMTRACE, "Dropping as many unused locks as possible before"
- "replay for namespace %s (%d)\n",
- ldlm_ns_name(ns), ns->ns_nr_unused);
+ CDEBUG(D_DLMTRACE, "Dropping as many unused locks as possible before"
+ "replay for namespace %s (%d)\n",
+ ldlm_ns_name(ns), ns->ns_nr_unused);
- /* We don't need to care whether or not LRU resize is enabled
- * because the LDLM_CANCEL_NO_WAIT policy doesn't use the
- * count parameter */
- canceled = ldlm_cancel_lru_local(ns, &cancels, ns->ns_nr_unused, 0,
- LCF_LOCAL, LDLM_CANCEL_NO_WAIT);
+ /* We don't need to care whether or not LRU resize is enabled
+ * because the LDLM_CANCEL_NO_WAIT policy doesn't use the
+ * count parameter */
+ canceled = ldlm_cancel_lru_local(ns, &cancels, ns->ns_nr_unused, 0,
+ LCF_LOCAL, LDLM_CANCEL_NO_WAIT);
- CDEBUG(D_DLMTRACE, "Canceled %d unused locks from namespace %s\n",
- canceled, ldlm_ns_name(ns));
+ CDEBUG(D_DLMTRACE, "Canceled %d unused locks from namespace %s\n",
+ canceled, ldlm_ns_name(ns));
}
int ldlm_replay_locks(struct obd_import *imp)
{
struct ldlm_namespace *ns = imp->imp_obd->obd_namespace;
- CFS_LIST_HEAD(list);
+ struct list_head list = LIST_HEAD_INIT(list);
struct ldlm_lock *lock, *next;
int rc = 0;
ldlm_namespace_foreach(ns, ldlm_chain_lock_for_replay, &list);
- cfs_list_for_each_entry_safe(lock, next, &list, l_pending_chain) {
- cfs_list_del_init(&lock->l_pending_chain);
+ list_for_each_entry_safe(lock, next, &list, l_pending_chain) {
+ list_del_init(&lock->l_pending_chain);
if (rc) {
LDLM_LOCK_RELEASE(lock);
continue; /* or try to do the rest? */
RETURN(rc);
}
-EXPORT_SYMBOL(ldlm_replay_locks);