*
* You should have received a copy of the GNU General Public License
* version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
+ * http://www.gnu.org/licenses/gpl-2.0.html
*
* GPL HEADER END
*/
#define DEBUG_SUBSYSTEM S_LDLM
#include <linux/kthread.h>
+#include <linux/list.h>
#include <libcfs/libcfs.h>
#include <lustre_dlm.h>
#include <obd_class.h>
-#include <libcfs/list.h>
#include "ldlm_internal.h"
static int ldlm_num_threads;
-CFS_MODULE_PARM(ldlm_num_threads, "i", int, 0444,
- "number of DLM service threads to start");
+module_param(ldlm_num_threads, int, 0444);
+MODULE_PARM_DESC(ldlm_num_threads, "number of DLM service threads to start");
static char *ldlm_cpts;
-CFS_MODULE_PARM(ldlm_cpts, "s", charp, 0444,
- "CPU partitions ldlm threads should run on");
+module_param(ldlm_cpts, charp, 0444);
+MODULE_PARM_DESC(ldlm_cpts, "CPU partitions ldlm threads should run on");
static struct mutex ldlm_ref_mutex;
static int ldlm_refcount;
static struct ldlm_state *ldlm_state;
-inline cfs_time_t round_timeout(cfs_time_t timeout)
+static inline cfs_time_t round_timeout(cfs_time_t timeout)
{
return cfs_time_seconds((int)cfs_duration_sec(cfs_time_sub(timeout, 0)) + 1);
}
lock = list_entry(expired->next, struct ldlm_lock,
l_pending_chain);
- if ((void *)lock < LP_POISON + PAGE_CACHE_SIZE &&
+ if ((void *)lock < LP_POISON + PAGE_SIZE &&
(void *)lock >= LP_POISON) {
spin_unlock_bh(&waiting_locks_spinlock);
CERROR("free lock on elt list %p\n", lock);
}
list_del_init(&lock->l_pending_chain);
if ((void *)lock->l_export <
- LP_POISON + PAGE_CACHE_SIZE &&
+ LP_POISON + PAGE_SIZE &&
(void *)lock->l_export >= LP_POISON) {
CERROR("lock with free export on elt list %p\n",
lock->l_export);
* Perform lock cleanup if AST reply came with error.
*/
static int ldlm_handle_ast_error(struct ldlm_lock *lock,
- struct ptlrpc_request *req, int rc,
- const char *ast_type)
+ struct ptlrpc_request *req, int rc,
+ const char *ast_type)
{
lnet_process_id_t peer = req->rq_import->imp_connection->c_peer;
if (!req->rq_replied || (rc && rc != -EINVAL)) {
if (lock->l_export && lock->l_export->exp_libclient) {
- LDLM_DEBUG(lock, "%s AST to liblustre client (nid %s)"
- " timeout, just cancelling lock", ast_type,
+ LDLM_DEBUG(lock,
+ "%s AST (req@%p x%llu) to liblustre client (nid %s) timeout, just cancelling lock",
+ ast_type, req, req->rq_xid,
libcfs_nid2str(peer.nid));
ldlm_lock_cancel(lock);
rc = -ERESTART;
} else if (ldlm_is_cancel(lock)) {
- LDLM_DEBUG(lock, "%s AST timeout from nid %s, but "
- "cancel was received (AST reply lost?)",
- ast_type, libcfs_nid2str(peer.nid));
+ LDLM_DEBUG(lock,
+ "%s AST (req@%p x%llu) timeout from nid %s, but cancel was received (AST reply lost?)",
+ ast_type, req, req->rq_xid,
+ libcfs_nid2str(peer.nid));
ldlm_lock_cancel(lock);
rc = -ERESTART;
} else {
- LDLM_ERROR(lock, "client (nid %s) %s %s AST "
- "(req status %d rc %d), evict it",
+ LDLM_ERROR(lock,
+ "client (nid %s) %s %s AST (req@%p x%llu status %d rc %d), evict it",
libcfs_nid2str(peer.nid),
req->rq_replied ? "returned error from" :
"failed to reply to",
- ast_type,
+ ast_type, req, req->rq_xid,
(req->rq_repmsg != NULL) ?
lustre_msg_get_status(req->rq_repmsg) : 0,
rc);
if (rc == -EINVAL) {
struct ldlm_resource *res = lock->l_resource;
- LDLM_DEBUG(lock, "client (nid %s) returned %d"
- " from %s AST - normal race",
+ LDLM_DEBUG(lock,
+ "client (nid %s) returned %d from %s AST (req@%p x%llu) - normal race",
libcfs_nid2str(peer.nid),
req->rq_repmsg ?
lustre_msg_get_status(req->rq_repmsg) : -1,
- ast_type);
+ ast_type, req, req->rq_xid);
if (res) {
/* update lvbo to return proper attributes.
* see bug 23174 */
/* Don't need to do anything here. */
RETURN(0);
+ if (OBD_FAIL_PRECHECK(OBD_FAIL_LDLM_SRV_BL_AST)) {
+ LDLM_DEBUG(lock, "dropping BL AST");
+ RETURN(0);
+ }
+
LASSERT(lock);
LASSERT(data != NULL);
if (lock->l_export->exp_obd->obd_recovering != 0)
LDLM_DEBUG(lock, "server preparing blocking AST");
ptlrpc_request_set_replen(req);
+ ldlm_set_cbpending(lock);
if (instant_cancel) {
unlock_res_and_lock(lock);
ldlm_lock_cancel(lock);
GOTO(out, rc = -EPROTO);
}
-#if 0
- /* FIXME this makes it impossible to use LDLM_PLAIN locks -- check
- against server's _CONNECT_SUPPORTED flags? (I don't want to use
- ibits for mgc/mgs) */
-
- /* INODEBITS_INTEROP: Perform conversion from plain lock to
- * inodebits lock if client does not support them. */
- if (!(exp_connect_flags(req->rq_export) & OBD_CONNECT_IBITS) &&
- (dlm_req->lock_desc.l_resource.lr_type == LDLM_PLAIN)) {
- dlm_req->lock_desc.l_resource.lr_type = LDLM_IBITS;
- dlm_req->lock_desc.l_policy_data.l_inodebits.bits =
- MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
- if (dlm_req->lock_desc.l_req_mode == LCK_PR)
- dlm_req->lock_desc.l_req_mode = LCK_CR;
- }
-#endif
-
if (unlikely((flags & LDLM_FL_REPLAY) ||
(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))) {
/* Find an existing lock in the per-export lock hash */
lock = cfs_hash_lookup(req->rq_export->exp_lock_hash,
(void *)&dlm_req->lock_handle[0]);
if (lock != NULL) {
- DEBUG_REQ(D_DLMTRACE, req, "found existing lock cookie "
- LPX64, lock->l_handle.h_cookie);
+ DEBUG_REQ(D_DLMTRACE, req, "found existing lock cookie %#llx",
+ lock->l_handle.h_cookie);
flags |= LDLM_FL_RESENT;
GOTO(existing_lock, rc = 0);
}
* without them. */
lock->l_flags |= ldlm_flags_from_wire(dlm_req->lock_flags &
LDLM_FL_INHERIT_MASK);
+
+ ldlm_convert_policy_to_local(req->rq_export,
+ dlm_req->lock_desc.l_resource.lr_type,
+ &dlm_req->lock_desc.l_policy_data,
+ &lock->l_policy_data);
+ if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
+ lock->l_req_extent = lock->l_policy_data.l_extent;
+
existing_lock:
if (flags & LDLM_FL_HAS_INTENT) {
GOTO(out, rc);
}
- if (dlm_req->lock_desc.l_resource.lr_type != LDLM_PLAIN)
- ldlm_convert_policy_to_local(req->rq_export,
- dlm_req->lock_desc.l_resource.lr_type,
- &dlm_req->lock_desc.l_policy_data,
- &lock->l_policy_data);
- if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
- lock->l_req_extent = lock->l_policy_data.l_extent;
-
err = ldlm_lock_enqueue(ns, &lock, cookie, &flags);
if (err) {
if ((int)err < 0)
if (unlikely(!ldlm_is_cancel_on_block(lock) ||
!(dlm_rep->lock_flags & LDLM_FL_CANCEL_ON_BLOCK))){
CERROR("Granting sync lock to libclient. "
- "req fl %d, rep fl %d, lock fl "LPX64"\n",
+ "req fl %d, rep fl %d, lock fl %#llx\n",
dlm_req->lock_flags, dlm_rep->lock_flags,
lock->l_flags);
LDLM_ERROR(lock, "sync lock");
it = req_capsule_client_get(&req->rq_pill,
&RMF_LDLM_INTENT);
if (it != NULL) {
- CERROR("This is intent %s ("LPU64")\n",
+ CERROR("This is intent %s (%llu)\n",
ldlm_it2str(it->opc), it->opc);
}
}
lock = ldlm_handle2lock(&dlm_req->lock_handle[i]);
if (!lock) {
LDLM_DEBUG_NOLOCK("server-side cancel handler stale "
- "lock (cookie "LPU64")",
+ "lock (cookie %llu)",
dlm_req->lock_handle[i].cookie);
continue;
}
}
static inline void ldlm_callback_errmsg(struct ptlrpc_request *req,
- const char *msg, int rc,
- struct lustre_handle *handle)
+ const char *msg, int rc,
+ const struct lustre_handle *handle)
{
DEBUG_REQ((req->rq_no_reply || rc) ? D_WARNING : D_DLMTRACE, req,
- "%s: [nid %s] [rc %d] [lock "LPX64"]",
+ "%s: [nid %s] [rc %d] [lock %#llx]",
msg, libcfs_id2str(req->rq_peer), rc,
handle ? handle->cookie : 0);
if (req->rq_no_reply)
lock = ldlm_handle2lock_long(&dlm_req->lock_handle[0], 0);
if (!lock) {
- CDEBUG(D_DLMTRACE, "callback on lock "LPX64" - lock "
+ CDEBUG(D_DLMTRACE, "callback on lock %#llx - lock "
"disappeared\n", dlm_req->lock_handle[0].cookie);
rc = ldlm_callback_reply(req, -EINVAL);
ldlm_callback_errmsg(req, "Operate with invalid parameter", rc,
lock_res_and_lock(lock);
lock->l_flags |= ldlm_flags_from_wire(dlm_req->lock_flags &
LDLM_FL_AST_MASK);
- if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
- /* If somebody cancels lock and cache is already dropped,
- * or lock is failed before cp_ast received on client,
- * we can tell the server we have no lock. Otherwise, we
- * should send cancel after dropping the cache. */
+ if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
+ /* If somebody cancels lock and cache is already dropped,
+ * or lock is failed before cp_ast received on client,
+ * we can tell the server we have no lock. Otherwise, we
+ * should send cancel after dropping the cache. */
if ((ldlm_is_canceling(lock) && ldlm_is_bl_done(lock)) ||
- ldlm_is_failed(lock)) {
- LDLM_DEBUG(lock, "callback on lock "
- LPX64" - lock disappeared\n",
- dlm_req->lock_handle[0].cookie);
- unlock_res_and_lock(lock);
- LDLM_LOCK_RELEASE(lock);
- rc = ldlm_callback_reply(req, -EINVAL);
- ldlm_callback_errmsg(req, "Operate on stale lock", rc,
- &dlm_req->lock_handle[0]);
- RETURN(0);
- }
+ ldlm_is_failed(lock)) {
+ LDLM_DEBUG(lock, "callback on lock %llx - lock disappeared",
+ dlm_req->lock_handle[0].cookie);
+ unlock_res_and_lock(lock);
+ LDLM_LOCK_RELEASE(lock);
+ rc = ldlm_callback_reply(req, -EINVAL);
+ ldlm_callback_errmsg(req, "Operate on stale lock", rc,
+ &dlm_req->lock_handle[0]);
+ RETURN(0);
+ }
/* BL_AST locks are not needed in LRU.
* Let ldlm_cancel_lru() be fast. */
- ldlm_lock_remove_from_lru(lock);
+ ldlm_lock_remove_from_lru(lock);
ldlm_set_bl_ast(lock);
- }
+ }
unlock_res_and_lock(lock);
/* We want the ost thread to get this reply so that it can respond
struct ldlm_request *dlm_req;
CERROR("%s from %s arrived at %lu with bad export cookie "
- LPU64"\n",
+ "%llu\n",
ll_opcode2str(lustre_msg_get_opc(req->rq_reqmsg)),
libcfs_nid2str(req->rq_peer.nid),
req->rq_arrival_time.tv_sec,
if (lustre_handle_equal(&dlm_req->lock_handle[i],
&lockh)) {
DEBUG_REQ(D_RPCTRACE, req,
- "Prio raised by lock "LPX64".", lockh.cookie);
+ "Prio raised by lock %#llx.", lockh.cookie);
rc = 1;
break;
void ldlm_revoke_export_locks(struct obd_export *exp)
{
struct list_head rpc_list;
- ENTRY;
+ ENTRY;
INIT_LIST_HEAD(&rpc_list);
- cfs_hash_for_each_empty(exp->exp_lock_hash,
- ldlm_revoke_lock_cb, &rpc_list);
- ldlm_run_ast_work(exp->exp_obd->obd_namespace, &rpc_list,
- LDLM_WORK_REVOKE_AST);
+ cfs_hash_for_each_nolock(exp->exp_lock_hash,
+ ldlm_revoke_lock_cb, &rpc_list, 0);
+ ldlm_run_ast_work(exp->exp_obd->obd_namespace, &rpc_list,
+ LDLM_WORK_REVOKE_AST);
- EXIT;
+ EXIT;
}
EXPORT_SYMBOL(ldlm_revoke_export_locks);
#endif /* HAVE_SERVER_SUPPORT */