* Author: Phil Schwan <phil@clusterfs.com>
*/
-#ifndef EXPORT_SYMTAB
-# define EXPORT_SYMTAB
-#endif
#define DEBUG_SUBSYSTEM S_LDLM
#ifdef __KERNEL__
#include <libcfs/list.h>
#include "ldlm_internal.h"
-#ifdef __KERNEL__
static int ldlm_num_threads;
CFS_MODULE_PARM(ldlm_num_threads, "i", int, 0444,
"number of DLM service threads to start");
-#endif
+
+static char *ldlm_cpts;
+CFS_MODULE_PARM(ldlm_cpts, "s", charp, 0444,
+ "CPU partitions ldlm threads should run on");
extern cfs_mem_cache_t *ldlm_resource_slab;
extern cfs_mem_cache_t *ldlm_lock_slab;
return timeout < 1 ? 1 : timeout;
}
-#ifdef __KERNEL__
-/* w_l_spinlock protects both waiting_locks_list and expired_lock_thread */
-static cfs_spinlock_t waiting_locks_spinlock; /* BH lock (timer) */
-static cfs_list_t waiting_locks_list;
-static cfs_timer_t waiting_locks_timer;
-
-static struct expired_lock_thread {
- cfs_waitq_t elt_waitq;
- int elt_state;
- int elt_dump;
- cfs_list_t elt_expired_locks;
-} expired_lock_thread;
-#endif
-
#define ELT_STOPPED 0
#define ELT_READY 1
#define ELT_TERMINATE 2
int blwi_mem_pressure;
};
-#ifdef __KERNEL__
+#if defined(HAVE_SERVER_SUPPORT) && defined(__KERNEL__)
+
+/* w_l_spinlock protects both waiting_locks_list and expired_lock_thread */
+static cfs_spinlock_t waiting_locks_spinlock; /* BH lock (timer) */
+static cfs_list_t waiting_locks_list;
+static cfs_timer_t waiting_locks_timer;
+
+static struct expired_lock_thread {
+ cfs_waitq_t elt_waitq;
+ int elt_state;
+ int elt_dump;
+ cfs_list_t elt_expired_locks;
+} expired_lock_thread;
static inline int have_expired_locks(void)
{
LDLM_LOCK_RELEASE(lock);
continue;
}
+
+ if (lock->l_destroyed) {
+ /* release the lock refcount where
+ * waiting_locks_callback() founds */
+ LDLM_LOCK_RELEASE(lock);
+ continue;
+ }
export = class_export_lock_get(lock->l_export, lock);
cfs_spin_unlock_bh(&waiting_locks_spinlock);
}
static int ldlm_add_waiting_lock(struct ldlm_lock *lock);
+static int __ldlm_add_waiting_lock(struct ldlm_lock *lock, int seconds);
/**
* Check if there is a request in the export request list
/* This is called from within a timer interrupt and cannot schedule */
static void waiting_locks_callback(unsigned long unused)
{
- struct ldlm_lock *lock;
+ struct ldlm_lock *lock;
+ int need_dump = 0;
-repeat:
cfs_spin_lock_bh(&waiting_locks_spinlock);
while (!cfs_list_empty(&waiting_locks_list)) {
lock = cfs_list_entry(waiting_locks_list.next, struct ldlm_lock,
libcfs_nid2str(lock->l_export->exp_connection->c_peer.nid));
cfs_list_del_init(&lock->l_pending_chain);
- cfs_spin_unlock_bh(&waiting_locks_spinlock);
- ldlm_add_waiting_lock(lock);
- goto repeat;
+ if (lock->l_destroyed) {
+ /* relay the lock refcount decrease to
+ * expired lock thread */
+ cfs_list_add(&lock->l_pending_chain,
+ &expired_lock_thread.elt_expired_locks);
+ } else {
+ __ldlm_add_waiting_lock(lock,
+ ldlm_get_enq_timeout(lock));
+ }
+ continue;
}
/* if timeout overlaps the activation time of suspended timeouts
libcfs_nid2str(lock->l_export->exp_connection->c_peer.nid));
cfs_list_del_init(&lock->l_pending_chain);
- cfs_spin_unlock_bh(&waiting_locks_spinlock);
- ldlm_add_waiting_lock(lock);
- goto repeat;
+ if (lock->l_destroyed) {
+ /* relay the lock refcount decrease to
+ * expired lock thread */
+ cfs_list_add(&lock->l_pending_chain,
+ &expired_lock_thread.elt_expired_locks);
+ } else {
+ __ldlm_add_waiting_lock(lock,
+ ldlm_get_enq_timeout(lock));
+ }
+ continue;
}
/* Check if we need to prolong timeout */
cfs_list_del(&lock->l_pending_chain);
cfs_list_add(&lock->l_pending_chain,
&expired_lock_thread.elt_expired_locks);
- }
+ need_dump = 1;
+ }
- if (!cfs_list_empty(&expired_lock_thread.elt_expired_locks)) {
- if (obd_dump_on_timeout)
- expired_lock_thread.elt_dump = __LINE__;
+ if (!cfs_list_empty(&expired_lock_thread.elt_expired_locks)) {
+ if (obd_dump_on_timeout && need_dump)
+ expired_lock_thread.elt_dump = __LINE__;
- cfs_waitq_signal(&expired_lock_thread.elt_waitq);
- }
+ cfs_waitq_signal(&expired_lock_thread.elt_waitq);
+ }
/*
* Make sure the timer will fire again if we have any locks
static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
{
- int ret;
- int timeout = ldlm_get_enq_timeout(lock);
+ int ret;
+ int timeout = ldlm_get_enq_timeout(lock);
- LASSERT(!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK));
+ /* NB: must be called with hold of lock_res_and_lock() */
+ LASSERT(lock->l_res_locked);
+ lock->l_waited = 1;
+
+ LASSERT(!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK));
cfs_spin_lock_bh(&waiting_locks_spinlock);
if (lock->l_destroyed) {
LDLM_DEBUG(lock, "refreshed");
return 1;
}
-#else /* !__KERNEL__ */
+
+#else /* !HAVE_SERVER_SUPPORT || !__KERNEL__ */
int ldlm_del_waiting_lock(struct ldlm_lock *lock)
{
{
RETURN(0);
}
-#endif /* __KERNEL__ */
-#ifdef HAVE_SERVER_SUPPORT
-# ifndef __KERNEL__
+# ifdef HAVE_SERVER_SUPPORT
static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
{
- LASSERT(!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK));
- RETURN(1);
+ LASSERT(lock->l_res_locked);
+ LASSERT(!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK));
+ RETURN(1);
}
+
# endif
+#endif /* HAVE_SERVER_SUPPORT && __KERNEL__ */
+
+#ifdef HAVE_SERVER_SUPPORT
static void ldlm_failed_ast(struct ldlm_lock *lock, int rc,
const char *ast_type)
ENTRY;
LASSERT(lock != NULL);
- if (rc != 0) {
- rc = ldlm_handle_ast_error(lock, req, rc,
- arg->type == LDLM_BL_CALLBACK
- ? "blocking" : "completion");
- if (rc == -ERESTART)
- cfs_atomic_inc(&arg->restart);
- }
+
+ switch (arg->type) {
+ case LDLM_GL_CALLBACK:
+ /* Update the LVB from disk if the AST failed
+ * (this is a legal race)
+ *
+ * - Glimpse callback of local lock just returns
+ * -ELDLM_NO_LOCK_DATA.
+ * - Glimpse callback of remote lock might return
+ * -ELDLM_NO_LOCK_DATA when inode is cleared. LU-274
+ */
+ if (rc == -ELDLM_NO_LOCK_DATA) {
+ LDLM_DEBUG(lock, "lost race - client has a lock but no "
+ "inode");
+ ldlm_res_lvbo_update(lock->l_resource, NULL, 1);
+ } else if (rc != 0) {
+ rc = ldlm_handle_ast_error(lock, req, rc, "glimpse");
+ } else {
+ rc = ldlm_res_lvbo_update(lock->l_resource, req, 1);
+ }
+ break;
+ case LDLM_BL_CALLBACK:
+ if (rc != 0)
+ rc = ldlm_handle_ast_error(lock, req, rc, "blocking");
+ break;
+ case LDLM_CP_CALLBACK:
+ if (rc != 0)
+ rc = ldlm_handle_ast_error(lock, req, rc, "completion");
+ break;
+ default:
+ LDLM_ERROR(lock, "invalid opcode for lock callback %d",
+ arg->type);
+ LBUG();
+ }
+
+ /* release extra reference taken in ldlm_ast_fini() */
LDLM_LOCK_RELEASE(lock);
- if (cfs_atomic_dec_return(&arg->rpcs) < arg->threshold)
- cfs_waitq_signal(&arg->waitq);
+ if (rc == -ERESTART)
+ cfs_atomic_inc(&arg->restart);
- ldlm_csa_put(arg);
RETURN(0);
}
-static inline int ldlm_bl_and_cp_ast_tail(struct ptlrpc_request *req,
- struct ldlm_cb_set_arg *arg,
- struct ldlm_lock *lock,
- int instant_cancel)
+static inline int ldlm_ast_fini(struct ptlrpc_request *req,
+ struct ldlm_cb_set_arg *arg,
+ struct ldlm_lock *lock,
+ int instant_cancel)
{
- int rc = 0;
- ENTRY;
+ int rc = 0;
+ ENTRY;
- if (unlikely(instant_cancel)) {
- rc = ptl_send_rpc(req, 1);
- ptlrpc_req_finished(req);
- if (rc == 0)
- cfs_atomic_inc(&arg->restart);
- } else {
- LDLM_LOCK_GET(lock);
- cfs_atomic_inc(&arg->rpcs);
- cfs_atomic_inc(&arg->refcount);
- ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
- }
+ if (unlikely(instant_cancel)) {
+ rc = ptl_send_rpc(req, 1);
+ ptlrpc_req_finished(req);
+ if (rc == 0)
+ cfs_atomic_inc(&arg->restart);
+ } else {
+ LDLM_LOCK_GET(lock);
+ ptlrpc_set_add_req(arg->set, req);
+ }
- RETURN(rc);
+ RETURN(rc);
}
/**
LASSERT(lock);
LASSERT(data != NULL);
- if (lock->l_export->exp_obd->obd_recovering != 0) {
+ if (lock->l_export->exp_obd->obd_recovering != 0)
LDLM_ERROR(lock, "BUG 6063: lock collide during recovery");
- ldlm_lock_dump(D_ERROR, lock, 0);
- }
ldlm_lock_reorder_req(lock);
req->rq_interpret_reply = ldlm_cb_interpret;
req->rq_no_resend = 1;
- lock_res(lock->l_resource);
- if (lock->l_granted_mode != lock->l_req_mode) {
- /* this blocking AST will be communicated as part of the
- * completion AST instead */
- unlock_res(lock->l_resource);
- ptlrpc_req_finished(req);
- LDLM_DEBUG(lock, "lock not granted, not sending blocking AST");
- RETURN(0);
- }
+ lock_res_and_lock(lock);
+ if (lock->l_granted_mode != lock->l_req_mode) {
+ /* this blocking AST will be communicated as part of the
+ * completion AST instead */
+ unlock_res_and_lock(lock);
- if (lock->l_destroyed) {
- /* What's the point? */
- unlock_res(lock->l_resource);
- ptlrpc_req_finished(req);
- RETURN(0);
- }
+ ptlrpc_req_finished(req);
+ LDLM_DEBUG(lock, "lock not granted, not sending blocking AST");
+ RETURN(0);
+ }
+
+ if (lock->l_destroyed) {
+ /* What's the point? */
+ unlock_res_and_lock(lock);
+ ptlrpc_req_finished(req);
+ RETURN(0);
+ }
if (lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)
instant_cancel = 1;
LDLM_DEBUG(lock, "server preparing blocking AST");
ptlrpc_request_set_replen(req);
- if (instant_cancel) {
- unlock_res(lock->l_resource);
- ldlm_lock_cancel(lock);
- } else {
- LASSERT(lock->l_granted_mode == lock->l_req_mode);
- ldlm_add_waiting_lock(lock);
- unlock_res(lock->l_resource);
- }
+ if (instant_cancel) {
+ unlock_res_and_lock(lock);
+ ldlm_lock_cancel(lock);
+ } else {
+ LASSERT(lock->l_granted_mode == lock->l_req_mode);
+ ldlm_add_waiting_lock(lock);
+ unlock_res_and_lock(lock);
+ }
req->rq_send_state = LUSTRE_IMP_FULL;
/* ptlrpc_request_alloc_pack already set timeout */
lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats,
LDLM_BL_CALLBACK - LDLM_FIRST_OPC);
- rc = ldlm_bl_and_cp_ast_tail(req, arg, lock, instant_cancel);
+ rc = ldlm_ast_fini(req, arg, lock, instant_cancel);
RETURN(rc);
}
long total_enqueue_wait;
int instant_cancel = 0;
int rc = 0;
+ int lvb_len;
ENTRY;
LASSERT(lock != NULL);
if (req == NULL)
RETURN(-ENOMEM);
- /* server namespace, doesn't need lock */
- if (lock->l_resource->lr_lvb_len) {
+ /* server namespace, doesn't need lock */
+ lvb_len = ldlm_lvbo_size(lock);
+ if (lvb_len > 0)
req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_CLIENT,
- lock->l_resource->lr_lvb_len);
- }
+ lvb_len);
rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_CP_CALLBACK);
if (rc) {
body->lock_handle[0] = lock->l_remote_handle;
body->lock_flags = flags;
ldlm_lock2desc(lock, &body->lock_desc);
- if (lock->l_resource->lr_lvb_len) {
- void *lvb = req_capsule_client_get(&req->rq_pill, &RMF_DLM_LVB);
+ if (lvb_len > 0) {
+ void *lvb = req_capsule_client_get(&req->rq_pill, &RMF_DLM_LVB);
- lock_res(lock->l_resource);
- memcpy(lvb, lock->l_resource->lr_lvb_data,
- lock->l_resource->lr_lvb_len);
- unlock_res(lock->l_resource);
+ lvb_len = ldlm_lvbo_fill(lock, lvb, lvb_len);
+ req_capsule_shrink(&req->rq_pill, &RMF_DLM_LVB,
+ lvb_len, RCL_CLIENT);
}
LDLM_DEBUG(lock, "server preparing completion AST (after %lds wait)",
lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats,
LDLM_CP_CALLBACK - LDLM_FIRST_OPC);
- rc = ldlm_bl_and_cp_ast_tail(req, arg, lock, instant_cancel);
+ rc = ldlm_ast_fini(req, arg, lock, instant_cancel);
RETURN(rc);
}
int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data)
{
- struct ldlm_resource *res = lock->l_resource;
- struct ldlm_request *body;
- struct ptlrpc_request *req;
- int rc;
+ struct ldlm_cb_set_arg *arg = data;
+ struct ldlm_request *body;
+ struct ptlrpc_request *req;
+ struct ldlm_cb_async_args *ca;
+ int rc;
+ struct req_format *req_fmt;
ENTRY;
LASSERT(lock != NULL);
+ if (arg->gl_desc != NULL)
+ /* There is a glimpse descriptor to pack */
+ req_fmt = &RQF_LDLM_GL_DESC_CALLBACK;
+ else
+ req_fmt = &RQF_LDLM_GL_CALLBACK;
+
req = ptlrpc_request_alloc_pack(lock->l_export->exp_imp_reverse,
- &RQF_LDLM_GL_CALLBACK,
- LUSTRE_DLM_VERSION, LDLM_GL_CALLBACK);
+ req_fmt, LUSTRE_DLM_VERSION,
+ LDLM_GL_CALLBACK);
if (req == NULL)
RETURN(-ENOMEM);
+ if (arg->gl_desc != NULL) {
+ /* copy the GL descriptor */
+ union ldlm_gl_desc *desc;
+ desc = req_capsule_client_get(&req->rq_pill, &RMF_DLM_GL_DESC);
+ *desc = *arg->gl_desc;
+ }
+
body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
body->lock_handle[0] = lock->l_remote_handle;
ldlm_lock2desc(lock, &body->lock_desc);
+ CLASSERT(sizeof(*ca) <= sizeof(req->rq_async_args));
+ ca = ptlrpc_req_async_args(req);
+ ca->ca_set_arg = arg;
+ ca->ca_lock = lock;
+
/* server namespace, doesn't need lock */
req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
- lock->l_resource->lr_lvb_len);
- res = lock->l_resource;
+ ldlm_lvbo_size(lock));
ptlrpc_request_set_replen(req);
-
req->rq_send_state = LUSTRE_IMP_FULL;
/* ptlrpc_request_alloc_pack already set timeout */
if (AT_OFF)
req->rq_timeout = ldlm_get_rq_timeout();
+ req->rq_interpret_reply = ldlm_cb_interpret;
+
if (lock->l_export && lock->l_export->exp_nid_stats &&
lock->l_export->exp_nid_stats->nid_ldlm_stats)
lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats,
LDLM_GL_CALLBACK - LDLM_FIRST_OPC);
- rc = ptlrpc_queue_wait(req);
- /* Update the LVB from disk if the AST failed (this is a legal race)
- *
- * - Glimpse callback of local lock just return -ELDLM_NO_LOCK_DATA.
- * - Glimpse callback of remote lock might return -ELDLM_NO_LOCK_DATA
- * when inode is cleared. LU-274
- */
- if (rc == -ELDLM_NO_LOCK_DATA) {
- LDLM_DEBUG(lock, "lost race - client has a lock but no inode");
- ldlm_res_lvbo_update(res, NULL, 1);
- } else if (rc != 0) {
- rc = ldlm_handle_ast_error(lock, req, rc, "glimpse");
- } else {
- rc = ldlm_res_lvbo_update(res, req, 1);
- }
+ rc = ldlm_ast_fini(req, arg, lock, 0);
- ptlrpc_req_finished(req);
- if (rc == -ERESTART)
- ldlm_reprocess_all(res);
+ RETURN(rc);
+}
- RETURN(rc);
+int ldlm_glimpse_locks(struct ldlm_resource *res, cfs_list_t *gl_work_list)
+{
+ int rc;
+ ENTRY;
+
+ rc = ldlm_run_ast_work(ldlm_res_to_ns(res), gl_work_list,
+ LDLM_WORK_GL_AST);
+ if (rc == -ERESTART)
+ ldlm_reprocess_all(res);
+
+ RETURN(rc);
+}
+
+/* return ldlm lock associated with a lock callback request */
+struct ldlm_lock *ldlm_request_lock(struct ptlrpc_request *req)
+{
+ struct ldlm_cb_async_args *ca;
+ struct ldlm_lock *lock;
+ ENTRY;
+
+ ca = ptlrpc_req_async_args(req);
+ lock = ca->ca_lock;
+ if (lock == NULL)
+ RETURN(ERR_PTR(-EFAULT));
+
+ RETURN(lock);
}
+EXPORT_SYMBOL(ldlm_request_lock);
static void ldlm_svc_get_eopc(const struct ldlm_request *dlm_req,
struct lprocfs_stats *srv_stats)
LASSERT(req->rq_export);
- if (req->rq_rqbd->rqbd_service->srv_stats)
- ldlm_svc_get_eopc(dlm_req,
- req->rq_rqbd->rqbd_service->srv_stats);
+ if (ptlrpc_req2svc(req)->srv_stats != NULL)
+ ldlm_svc_get_eopc(dlm_req, ptlrpc_req2svc(req)->srv_stats);
if (req->rq_export && req->rq_export->exp_nid_stats &&
req->rq_export->exp_nid_stats->nid_ldlm_stats)
if (unlikely(flags & LDLM_FL_REPLAY)) {
/* Find an existing lock in the per-export lock hash */
+ /* In the function below, .hs_keycmp resolves to
+ * ldlm_export_lock_keycmp() */
+ /* coverity[overrun-buffer-val] */
lock = cfs_hash_lookup(req->rq_export->exp_lock_hash,
(void *)&dlm_req->lock_handle[0]);
if (lock != NULL) {
/* based on the assumption that lvb size never changes during
* resource life time otherwise it need resource->lr_lock's
* protection */
- if (lock->l_resource->lr_lvb_len) {
- req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB,
- RCL_SERVER,
- lock->l_resource->lr_lvb_len);
- }
+ req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB,
+ RCL_SERVER, ldlm_lvbo_size(lock));
if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE_EXTENT_ERR))
GOTO(out, rc = -ENOMEM);
"(err=%d, rc=%d)", err, rc);
if (rc == 0) {
- if (lock->l_resource->lr_lvb_len > 0) {
- /* MDT path won't handle lr_lvb_data, so
- * lock/unlock better be contained in the
- * if block */
- void *lvb;
-
- lvb = req_capsule_server_get(&req->rq_pill,
- &RMF_DLM_LVB);
- LASSERTF(lvb != NULL, "req %p, lock %p\n",
- req, lock);
- lock_res(lock->l_resource);
- memcpy(lvb, lock->l_resource->lr_lvb_data,
- lock->l_resource->lr_lvb_len);
- unlock_res(lock->l_resource);
- }
+ int lvb_len = ldlm_lvbo_size(lock);
+
+ if (lvb_len > 0) {
+ void *buf;
+ int buflen;
+
+ buf = req_capsule_server_get(&req->rq_pill,
+ &RMF_DLM_LVB);
+ LASSERTF(buf != NULL, "req %p, lock %p\n",
+ req, lock);
+ buflen = req_capsule_get_size(&req->rq_pill,
+ &RMF_DLM_LVB, RCL_SERVER);
+ buflen = ldlm_lvbo_fill(lock, buf, buflen);
+ req_capsule_shrink(&req->rq_pill, &RMF_DLM_LVB,
+ buflen, RCL_SERVER);
+ }
} else {
lock_res_and_lock(lock);
ldlm_resource_unlink_lock(lock);
if (first >= count)
RETURN(0);
+ if (count == 1 && dlm_req->lock_handle[0].cookie == 0)
+ RETURN(0);
+
/* There is no lock on the server at the replay time,
* skip lock cancelling to make replay tests to pass. */
if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
struct ldlm_request *dlm_req,
struct ldlm_lock *lock)
{
+ int lvb_len;
CFS_LIST_HEAD(ast_list);
ENTRY;
}
}
+ lvb_len = req_capsule_get_size(&req->rq_pill, &RMF_DLM_LVB, RCL_CLIENT);
+ if (lvb_len > 0) {
+ if (lock->l_lvb_len > 0) {
+ /* for extent lock, lvb contains ost_lvb{}. */
+ LASSERT(lock->l_lvb_data != NULL);
+ LASSERTF(lock->l_lvb_len == lvb_len,
+ "preallocated %d, actual %d.\n",
+ lock->l_lvb_len, lvb_len);
+ } else { /* for layout lock, lvb has variable length */
+ void *lvb_data;
+
+ OBD_ALLOC(lvb_data, lvb_len);
+ if (lvb_data == NULL)
+ LDLM_ERROR(lock, "no memory.\n");
+
+ lock_res_and_lock(lock);
+ if (lvb_data == NULL) {
+ lock->l_flags |= LDLM_FL_FAILED;
+ } else {
+ LASSERT(lock->l_lvb_data == NULL);
+ lock->l_lvb_data = lvb_data;
+ lock->l_lvb_len = lvb_len;
+ }
+ unlock_res_and_lock(lock);
+ }
+ }
+
lock_res_and_lock(lock);
if (lock->l_destroyed ||
lock->l_granted_mode == lock->l_req_mode) {
if (KEY_IS(KEY_HSM_COPYTOOL_SEND))
/* Pass it on to mdc (the "export" in this case) */
- rc = obd_set_info_async(req->rq_export,
+ rc = obd_set_info_async(req->rq_svc_thread->t_env,
+ req->rq_export,
sizeof(KEY_HSM_COPYTOOL_SEND),
KEY_HSM_COPYTOOL_SEND,
vallen, val, NULL);
LASSERT(req->rq_export != NULL);
LASSERT(req->rq_export->exp_obd != NULL);
- switch (lustre_msg_get_opc(req->rq_reqmsg)) {
- case LDLM_BL_CALLBACK:
- if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_BL_CALLBACK))
- RETURN(0);
- break;
- case LDLM_CP_CALLBACK:
- if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CP_CALLBACK))
- RETURN(0);
- break;
- case LDLM_GL_CALLBACK:
- if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_GL_CALLBACK))
- RETURN(0);
- break;
+ switch (lustre_msg_get_opc(req->rq_reqmsg)) {
+ case LDLM_BL_CALLBACK:
+ if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_BL_CALLBACK_NET))
+ RETURN(0);
+ break;
+ case LDLM_CP_CALLBACK:
+ if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CP_CALLBACK_NET))
+ RETURN(0);
+ break;
+ case LDLM_GL_CALLBACK:
+ if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_GL_CALLBACK_NET))
+ RETURN(0);
+ break;
case LDLM_SET_INFO:
rc = ldlm_handle_setinfo(req);
ldlm_callback_reply(req, rc);
RETURN(0);
ldlm_callback_reply(req, rc);
RETURN(0);
- case OBD_QC_CALLBACK:
- req_capsule_set(&req->rq_pill, &RQF_QC_CALLBACK);
- if (OBD_FAIL_CHECK(OBD_FAIL_OBD_QC_CALLBACK_NET))
- RETURN(0);
- rc = target_handle_qc_callback(req);
- ldlm_callback_reply(req, rc);
- RETURN(0);
- case QUOTA_DQACQ:
- case QUOTA_DQREL:
- /* reply in handler */
- req_capsule_set(&req->rq_pill, &RQF_MDS_QUOTA_DQACQ);
- rc = target_handle_dqacq_callback(req);
- RETURN(0);
case LLOG_ORIGIN_HANDLE_CREATE:
req_capsule_set(&req->rq_pill, &RQF_LLOG_ORIGIN_HANDLE_CREATE);
if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET))
RETURN(0);
- rc = llog_origin_handle_create(req);
+ rc = llog_origin_handle_open(req);
ldlm_callback_reply(req, rc);
RETURN(0);
case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
case LDLM_CANCEL:
req_capsule_set(&req->rq_pill, &RQF_LDLM_CANCEL);
CDEBUG(D_INODE, "cancel\n");
- if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL))
- RETURN(0);
+ if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_NET))
+ RETURN(0);
rc = ldlm_handle_cancel(req);
if (rc)
break;
static struct ptlrpc_hpreq_ops ldlm_cancel_hpreq_ops = {
.hpreq_lock_match = ldlm_cancel_hpreq_lock_match,
- .hpreq_check = ldlm_cancel_hpreq_check
+ .hpreq_check = ldlm_cancel_hpreq_check,
+ .hpreq_fini = NULL,
};
static int ldlm_hpreq_handler(struct ptlrpc_request *req)
LASSERT(!lock->l_blocking_lock);
lock->l_flags |= LDLM_FL_AST_SENT;
- if (lock->l_export && lock->l_export->exp_lock_hash &&
- !cfs_hlist_unhashed(&lock->l_exp_hash))
- cfs_hash_del(lock->l_export->exp_lock_hash,
- &lock->l_remote_handle, &lock->l_exp_hash);
+ if (lock->l_export && lock->l_export->exp_lock_hash) {
+ /* NB: it's safe to call cfs_hash_del() even lock isn't
+ * in exp_lock_hash. */
+ /* In the function below, .hs_keycmp resolves to
+ * ldlm_export_lock_keycmp() */
+ /* coverity[overrun-buffer-val] */
+ cfs_hash_del(lock->l_export->exp_lock_hash,
+ &lock->l_remote_handle, &lock->l_exp_hash);
+ }
+
cfs_list_add_tail(&lock->l_rk_ast, rpc_list);
LDLM_LOCK_GET(lock);
ENTRY;
cfs_hash_putref(exp->exp_lock_hash);
exp->exp_lock_hash = NULL;
+
+ ldlm_destroy_flock_export(exp);
EXIT;
}
EXPORT_SYMBOL(ldlm_destroy_export);
static int ldlm_setup(void)
{
- struct ldlm_bl_pool *blp;
+ static struct ptlrpc_service_conf conf;
+ struct ldlm_bl_pool *blp = NULL;
int rc = 0;
- int ldlm_min_threads = LDLM_THREADS_AUTO_MIN;
- int ldlm_max_threads = LDLM_THREADS_AUTO_MAX;
#ifdef __KERNEL__
int i;
#endif
#ifdef LPROCFS
rc = ldlm_proc_setup();
if (rc != 0)
- GOTO(out_free, rc);
+ GOTO(out, rc);
#endif
-#ifdef __KERNEL__
- if (ldlm_num_threads) {
- /* If ldlm_num_threads is set, it is the min and the max. */
- if (ldlm_num_threads > LDLM_THREADS_AUTO_MAX)
- ldlm_num_threads = LDLM_THREADS_AUTO_MAX;
- if (ldlm_num_threads < LDLM_THREADS_AUTO_MIN)
- ldlm_num_threads = LDLM_THREADS_AUTO_MIN;
- ldlm_min_threads = ldlm_max_threads = ldlm_num_threads;
- }
-#endif
-
- ldlm_state->ldlm_cb_service =
- ptlrpc_init_svc(LDLM_NBUFS, LDLM_BUFSIZE, LDLM_MAXREQSIZE,
- LDLM_MAXREPSIZE, LDLM_CB_REQUEST_PORTAL,
- LDLM_CB_REPLY_PORTAL, 2,
- ldlm_callback_handler, "ldlm_cbd",
- ldlm_svc_proc_dir, NULL,
- ldlm_min_threads, ldlm_max_threads,
- "ldlm_cb",
- LCT_MD_THREAD|LCT_DT_THREAD, NULL);
-
- if (!ldlm_state->ldlm_cb_service) {
- CERROR("failed to start service\n");
- GOTO(out_proc, rc = -ENOMEM);
- }
+ memset(&conf, 0, sizeof(conf));
+ conf = (typeof(conf)) {
+ .psc_name = "ldlm_cbd",
+ .psc_watchdog_factor = 2,
+ .psc_buf = {
+ .bc_nbufs = LDLM_NBUFS,
+ .bc_buf_size = LDLM_BUFSIZE,
+ .bc_req_max_size = LDLM_MAXREQSIZE,
+ .bc_rep_max_size = LDLM_MAXREPSIZE,
+ .bc_req_portal = LDLM_CB_REQUEST_PORTAL,
+ .bc_rep_portal = LDLM_CB_REPLY_PORTAL,
+ },
+ .psc_thr = {
+ .tc_thr_name = "ldlm_cb",
+ .tc_thr_factor = LDLM_THR_FACTOR,
+ .tc_nthrs_init = LDLM_NTHRS_INIT,
+ .tc_nthrs_base = LDLM_NTHRS_BASE,
+ .tc_nthrs_max = LDLM_NTHRS_MAX,
+ .tc_nthrs_user = ldlm_num_threads,
+ .tc_cpu_affinity = 1,
+ .tc_ctx_tags = LCT_MD_THREAD | LCT_DT_THREAD,
+ },
+ .psc_cpt = {
+ .cc_pattern = ldlm_cpts,
+ },
+ .psc_ops = {
+ .so_req_handler = ldlm_callback_handler,
+ },
+ };
+ ldlm_state->ldlm_cb_service = \
+ ptlrpc_register_service(&conf, ldlm_svc_proc_dir);
+ if (IS_ERR(ldlm_state->ldlm_cb_service)) {
+ CERROR("failed to start service\n");
+ rc = PTR_ERR(ldlm_state->ldlm_cb_service);
+ ldlm_state->ldlm_cb_service = NULL;
+ GOTO(out, rc);
+ }
#ifdef HAVE_SERVER_SUPPORT
- ldlm_state->ldlm_cancel_service =
- ptlrpc_init_svc(LDLM_NBUFS, LDLM_BUFSIZE, LDLM_MAXREQSIZE,
- LDLM_MAXREPSIZE, LDLM_CANCEL_REQUEST_PORTAL,
- LDLM_CANCEL_REPLY_PORTAL, 6,
- ldlm_cancel_handler, "ldlm_canceld",
- ldlm_svc_proc_dir, NULL,
- ldlm_min_threads, ldlm_max_threads,
- "ldlm_cn",
- LCT_MD_THREAD|LCT_DT_THREAD|LCT_CL_THREAD,
- ldlm_hpreq_handler);
-
- if (!ldlm_state->ldlm_cancel_service) {
- CERROR("failed to start service\n");
- GOTO(out_proc, rc = -ENOMEM);
- }
+ memset(&conf, 0, sizeof(conf));
+ conf = (typeof(conf)) {
+ .psc_name = "ldlm_canceld",
+ .psc_watchdog_factor = 6,
+ .psc_buf = {
+ .bc_nbufs = LDLM_NBUFS,
+ .bc_buf_size = LDLM_BUFSIZE,
+ .bc_req_max_size = LDLM_MAXREQSIZE,
+ .bc_rep_max_size = LDLM_MAXREPSIZE,
+ .bc_req_portal = LDLM_CANCEL_REQUEST_PORTAL,
+ .bc_rep_portal = LDLM_CANCEL_REPLY_PORTAL,
+
+ },
+ .psc_thr = {
+ .tc_thr_name = "ldlm_cn",
+ .tc_thr_factor = LDLM_THR_FACTOR,
+ .tc_nthrs_init = LDLM_NTHRS_INIT,
+ .tc_nthrs_base = LDLM_NTHRS_BASE,
+ .tc_nthrs_max = LDLM_NTHRS_MAX,
+ .tc_nthrs_user = ldlm_num_threads,
+ .tc_cpu_affinity = 1,
+ .tc_ctx_tags = LCT_MD_THREAD | \
+ LCT_DT_THREAD | \
+ LCT_CL_THREAD,
+ },
+ .psc_cpt = {
+ .cc_pattern = ldlm_cpts,
+ },
+ .psc_ops = {
+ .so_req_handler = ldlm_cancel_handler,
+ .so_hpreq_handler = ldlm_hpreq_handler,
+ },
+ };
+ ldlm_state->ldlm_cancel_service = \
+ ptlrpc_register_service(&conf, ldlm_svc_proc_dir);
+ if (IS_ERR(ldlm_state->ldlm_cancel_service)) {
+ CERROR("failed to start service\n");
+ rc = PTR_ERR(ldlm_state->ldlm_cancel_service);
+ ldlm_state->ldlm_cancel_service = NULL;
+ GOTO(out, rc);
+ }
#endif
- OBD_ALLOC(blp, sizeof(*blp));
- if (blp == NULL)
- GOTO(out_proc, rc = -ENOMEM);
+ OBD_ALLOC(blp, sizeof(*blp));
+ if (blp == NULL)
+ GOTO(out, rc = -ENOMEM);
ldlm_state->ldlm_bl_pool = blp;
cfs_spin_lock_init(&blp->blp_lock);
cfs_waitq_init(&blp->blp_waitq);
cfs_atomic_set(&blp->blp_num_threads, 0);
cfs_atomic_set(&blp->blp_busy_threads, 0);
- blp->blp_min_threads = ldlm_min_threads;
- blp->blp_max_threads = ldlm_max_threads;
#ifdef __KERNEL__
- for (i = 0; i < blp->blp_min_threads; i++) {
- rc = ldlm_bl_thread_start(blp);
- if (rc < 0)
- GOTO(out_thread, rc);
- }
+ if (ldlm_num_threads == 0) {
+ blp->blp_min_threads = LDLM_NTHRS_INIT;
+ blp->blp_max_threads = LDLM_NTHRS_MAX;
+ } else {
+ blp->blp_min_threads = blp->blp_max_threads = \
+ min_t(int, LDLM_NTHRS_MAX, max_t(int, LDLM_NTHRS_INIT,
+ ldlm_num_threads));
+ }
+
+ for (i = 0; i < blp->blp_min_threads; i++) {
+ rc = ldlm_bl_thread_start(blp);
+ if (rc < 0)
+ GOTO(out, rc);
+ }
# ifdef HAVE_SERVER_SUPPORT
- rc = ptlrpc_start_threads(ldlm_state->ldlm_cancel_service);
- if (rc)
- GOTO(out_thread, rc);
-# endif
-
- rc = ptlrpc_start_threads(ldlm_state->ldlm_cb_service);
- if (rc)
- GOTO(out_thread, rc);
-
CFS_INIT_LIST_HEAD(&expired_lock_thread.elt_expired_locks);
expired_lock_thread.elt_state = ELT_STOPPED;
cfs_waitq_init(&expired_lock_thread.elt_waitq);
cfs_timer_init(&waiting_locks_timer, waiting_locks_callback, 0);
rc = cfs_create_thread(expired_lock_main, NULL, CFS_DAEMON_FLAGS);
- if (rc < 0) {
- CERROR("Cannot start ldlm expired-lock thread: %d\n", rc);
- GOTO(out_thread, rc);
- }
+ if (rc < 0) {
+ CERROR("Cannot start ldlm expired-lock thread: %d\n", rc);
+ GOTO(out, rc);
+ }
cfs_wait_event(expired_lock_thread.elt_waitq,
expired_lock_thread.elt_state == ELT_READY);
-#endif
+# endif /* HAVE_SERVER_SUPPORT */
-#ifdef __KERNEL__
- rc = ldlm_pools_init();
- if (rc)
- GOTO(out_thread, rc);
+ rc = ldlm_pools_init();
+ if (rc) {
+ CERROR("Failed to initialize LDLM pools: %d\n", rc);
+ GOTO(out, rc);
+ }
#endif
- RETURN(0);
+ RETURN(0);
-#ifdef __KERNEL__
- out_thread:
-# ifdef HAVE_SERVER_SUPPORT
- ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service);
-# endif
- ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);
-#endif
-
- out_proc:
-#ifdef LPROCFS
- ldlm_proc_cleanup();
- out_free:
-#endif
- OBD_FREE(ldlm_state, sizeof(*ldlm_state));
- ldlm_state = NULL;
- return rc;
+ out:
+ ldlm_cleanup();
+ RETURN(rc);
}
static int ldlm_cleanup(void)
{
-#ifdef __KERNEL__
- struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
-#endif
ENTRY;
if (!cfs_list_empty(ldlm_namespace_list(LDLM_NAMESPACE_SERVER)) ||
#ifdef __KERNEL__
ldlm_pools_fini();
-#endif
-#ifdef __KERNEL__
- while (cfs_atomic_read(&blp->blp_num_threads) > 0) {
- struct ldlm_bl_work_item blwi = { .blwi_ns = NULL };
+ if (ldlm_state->ldlm_bl_pool != NULL) {
+ struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
- cfs_init_completion(&blp->blp_comp);
+ while (cfs_atomic_read(&blp->blp_num_threads) > 0) {
+ struct ldlm_bl_work_item blwi = { .blwi_ns = NULL };
- cfs_spin_lock(&blp->blp_lock);
- cfs_list_add_tail(&blwi.blwi_entry, &blp->blp_list);
- cfs_waitq_signal(&blp->blp_waitq);
- cfs_spin_unlock(&blp->blp_lock);
+ cfs_init_completion(&blp->blp_comp);
- cfs_wait_for_completion(&blp->blp_comp);
- }
- OBD_FREE(blp, sizeof(*blp));
+ cfs_spin_lock(&blp->blp_lock);
+ cfs_list_add_tail(&blwi.blwi_entry, &blp->blp_list);
+ cfs_waitq_signal(&blp->blp_waitq);
+ cfs_spin_unlock(&blp->blp_lock);
+
+ cfs_wait_for_completion(&blp->blp_comp);
+ }
- ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);
+ OBD_FREE(blp, sizeof(*blp));
+ }
+#endif /* __KERNEL__ */
+
+ if (ldlm_state->ldlm_cb_service != NULL)
+ ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);
# ifdef HAVE_SERVER_SUPPORT
- ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service);
+ if (ldlm_state->ldlm_cancel_service != NULL)
+ ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service);
# endif
- ldlm_proc_cleanup();
- expired_lock_thread.elt_state = ELT_TERMINATE;
- cfs_waitq_signal(&expired_lock_thread.elt_waitq);
- cfs_wait_event(expired_lock_thread.elt_waitq,
- expired_lock_thread.elt_state == ELT_STOPPED);
-#else /* !__KERNEL__ */
- ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);
+#ifdef __KERNEL__
+ ldlm_proc_cleanup();
+
# ifdef HAVE_SERVER_SUPPORT
- ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service);
+ if (expired_lock_thread.elt_state != ELT_STOPPED) {
+ expired_lock_thread.elt_state = ELT_TERMINATE;
+ cfs_waitq_signal(&expired_lock_thread.elt_waitq);
+ cfs_wait_event(expired_lock_thread.elt_waitq,
+ expired_lock_thread.elt_state == ELT_STOPPED);
+ }
# endif
#endif /* __KERNEL__ */
if (ldlm_resource_slab == NULL)
return -ENOMEM;
- ldlm_lock_slab = cfs_mem_cache_create("ldlm_locks",
- sizeof(struct ldlm_lock), 0,
- CFS_SLAB_HWCACHE_ALIGN | CFS_SLAB_DESTROY_BY_RCU);
- if (ldlm_lock_slab == NULL) {
- cfs_mem_cache_destroy(ldlm_resource_slab);
- return -ENOMEM;
- }
+ ldlm_lock_slab = cfs_mem_cache_create("ldlm_locks",
+ sizeof(struct ldlm_lock), 0,
+ CFS_SLAB_HWCACHE_ALIGN | SLAB_DESTROY_BY_RCU);
+ if (ldlm_lock_slab == NULL) {
+ cfs_mem_cache_destroy(ldlm_resource_slab);
+ return -ENOMEM;
+ }
ldlm_interval_slab = cfs_mem_cache_create("interval_node",
sizeof(struct ldlm_interval),
EXPORT_SYMBOL(ldlm_lock_decref_and_cancel);
EXPORT_SYMBOL(ldlm_lock_change_resource);
EXPORT_SYMBOL(ldlm_it2str);
-EXPORT_SYMBOL(ldlm_lock_dump);
EXPORT_SYMBOL(ldlm_lock_dump_handle);
EXPORT_SYMBOL(ldlm_reprocess_all_ns);
EXPORT_SYMBOL(ldlm_lock_allow_match_locked);
EXPORT_SYMBOL(ldlm_server_blocking_ast);
EXPORT_SYMBOL(ldlm_server_completion_ast);
EXPORT_SYMBOL(ldlm_server_glimpse_ast);
+EXPORT_SYMBOL(ldlm_glimpse_locks);
EXPORT_SYMBOL(ldlm_handle_enqueue);
EXPORT_SYMBOL(ldlm_handle_enqueue0);
EXPORT_SYMBOL(ldlm_handle_cancel);