extern kmem_cache_t *ldlm_lock_slab;
extern struct lustre_lock ldlm_handle_lock;
extern struct list_head ldlm_namespace_list;
-extern int (*mds_reint_p)(int offset, struct ptlrpc_request *req);
-extern int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req);
static DECLARE_MUTEX(ldlm_ref_sem);
-static int ldlm_refcount = 0;
-
+static int ldlm_refcount;
/* LDLM state */
static struct ldlm_state *ldlm_state;
} expired_lock_thread;
#endif
+#if !defined(ENOTSUPP)
+# define ENOTSUPP 524
+#endif
+
#define ELT_STOPPED 0
#define ELT_READY 1
#define ELT_TERMINATE 2
/* this blocking AST will be communicated as part of the
* completion AST instead */
l_unlock(&lock->l_resource->lr_namespace->ns_lock);
- LDLM_DEBUG(lock, "lock not granted, not sending blocking AST"); RETURN(0);
+ LDLM_DEBUG(lock, "lock not granted, not sending blocking AST");
+ RETURN(0);
}
if (lock->l_destroyed) {
l_unlock(&lock->l_resource->lr_namespace->ns_lock);
req->rq_send_state = LUSTRE_IMP_FULL;
- req->rq_timeout = 2; /* 2 second timeout for initial AST reply */
+ req->rq_timeout = ldlm_timeout; /* timeout for initial AST reply */
rc = ptlrpc_queue_wait(req);
if (rc != 0)
rc = ldlm_handle_ast_error(lock, req, rc, "blocking");
if (total_enqueue_wait / 1000000 > obd_timeout)
LDLM_ERROR(lock, "enqueue wait took %ldus", total_enqueue_wait);
+ down(&lock->l_resource->lr_lvb_sem);
if (lock->l_resource->lr_lvb_len) {
buffers = 2;
size[1] = lock->l_resource->lr_lvb_len;
}
-
+ up(&lock->l_resource->lr_lvb_sem);
+
req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse,
LUSTRE_DLM_VERSION, LDLM_CP_CALLBACK,
buffers, size, NULL);
ldlm_lock2desc(lock, &body->lock_desc);
if (buffers == 2) {
- void *lvb = lustre_msg_buf(req->rq_reqmsg, 1,
- lock->l_resource->lr_lvb_len);
+ void *lvb;
+
+ down(&lock->l_resource->lr_lvb_sem);
+ lvb = lustre_msg_buf(req->rq_reqmsg, 1,
+ lock->l_resource->lr_lvb_len);
+
memcpy(lvb, lock->l_resource->lr_lvb_data,
lock->l_resource->lr_lvb_len);
+ up(&lock->l_resource->lr_lvb_sem);
}
LDLM_DEBUG(lock, "server preparing completion AST (after %ldus wait)",
req->rq_replen = lustre_msg_size(0, NULL);
req->rq_send_state = LUSTRE_IMP_FULL;
- req->rq_timeout = 2; /* 2 second timeout for initial AST reply */
+ req->rq_timeout = ldlm_timeout; /* timeout for initial AST reply */
/* We only send real blocking ASTs after the lock is granted */
l_lock(&lock->l_resource->lr_namespace->ns_lock);
LDLM_DEBUG(lock, "server-side enqueue handler, new lock created");
LASSERT(req->rq_export);
- lock->l_export = class_export_get(req->rq_export);
+ OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_BLOCKED, obd_timeout * 2);
l_lock(&lock->l_resource->lr_namespace->ns_lock);
+ if (req->rq_export->exp_failed) {
+ LDLM_ERROR(lock,"lock on destroyed export %p\n",req->rq_export);
+ l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+ GOTO(out, err = -ENOTCONN);
+ }
+ lock->l_export = class_export_get(req->rq_export);
+
list_add(&lock->l_export_chain,
&lock->l_export->exp_ldlm_data.led_held_locks);
l_unlock(&lock->l_resource->lr_namespace->ns_lock);
cookie = req;
} else {
int buffers = 1;
+ down(&lock->l_resource->lr_lvb_sem);
if (lock->l_resource->lr_lvb_len) {
size[1] = lock->l_resource->lr_lvb_len;
buffers = 2;
}
-
+ up(&lock->l_resource->lr_lvb_sem);
if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_LDLM_ENQUEUE_EXTENT_ERR))
GOTO(out, rc = -ENOMEM);
LDLM_DEBUG(lock, "server-side enqueue handler, sending reply"
"(err=%d, rc=%d)", err, rc);
- if (lock->l_resource->lr_lvb_len > 0 && rc == 0) {
- void *lvb = lustre_msg_buf(req->rq_repmsg, 1,
- lock->l_resource->lr_lvb_len);
- LASSERT(lvb != NULL);
- memcpy(lvb, lock->l_resource->lr_lvb_data,
- lock->l_resource->lr_lvb_len);
+ if (rc == 0) {
+ down(&lock->l_resource->lr_lvb_sem);
+ size[1] = lock->l_resource->lr_lvb_len;
+ if (size[1] > 0) {
+ void *lvb = lustre_msg_buf(req->rq_repmsg,
+ 1, size[1]);
+ LASSERTF(lvb != NULL, "req %p, lock %p\n",
+ req, lock);
+
+ memcpy(lvb, lock->l_resource->lr_lvb_data,
+ size[1]);
+ }
+ up(&lock->l_resource->lr_lvb_sem);
+ } else {
+ ldlm_lock_destroy(lock);
+
}
if (!err && dlm_req->lock_desc.l_resource.lr_type != LDLM_FLOCK)
ptlrpc_error(req);
}
+ l_unlock(&ns->ns_lock);
if (lock->l_granted_mode == LCK_PW &&
!lock->l_readers && !lock->l_writers &&
time_after(jiffies, lock->l_last_used + 10 * HZ)) {
-#ifdef __KERNEL__
- ldlm_bl_to_thread(ns, NULL, lock);
- l_unlock(&ns->ns_lock);
-#else
- l_unlock(&ns->ns_lock);
- ldlm_handle_bl_callback(ns, NULL, lock);
-#endif
+ if (ldlm_bl_to_thread(ns, NULL, lock))
+ ldlm_handle_bl_callback(ns, NULL, lock);
+
EXIT;
return;
}
-
- l_unlock(&ns->ns_lock);
LDLM_LOCK_PUT(lock);
EXIT;
}
list_add_tail(&blwi->blwi_entry, &blp->blp_list);
wake_up(&blp->blp_waitq);
spin_unlock(&blp->blp_lock);
+
+ RETURN(0);
#else
- LBUG();
+ RETURN(-ENOSYS);
#endif
- RETURN(0);
}
static int ldlm_msg_check_version(struct lustre_msg *msg)
struct ldlm_request *dlm_req;
CDEBUG(D_RPCTRACE, "operation %d from nid %s with bad "
- "export cookie "LPX64" (ptl req %d/rep %d); this is "
+ "export cookie "LPX64"; this is "
"normal if this node rebooted with a lock held\n",
req->rq_reqmsg->opc,
ptlrpc_peernid2str(&req->rq_peer, str),
- req->rq_reqmsg->handle.cookie,
- req->rq_request_portal, req->rq_reply_portal);
-
+ req->rq_reqmsg->handle.cookie);
dlm_req = lustre_swab_reqbuf(req, 0, sizeof (*dlm_req),
lustre_swab_ldlm_request);
if (dlm_req != NULL)
switch (req->rq_reqmsg->opc) {
case LDLM_BL_CALLBACK:
CDEBUG(D_INODE, "blocking ast\n");
-#ifdef __KERNEL__
- rc = ldlm_bl_to_thread(ns, &dlm_req->lock_desc, lock);
- ldlm_callback_reply(req, rc);
-#else
- rc = 0;
- ldlm_callback_reply(req, rc);
- ldlm_handle_bl_callback(ns, &dlm_req->lock_desc, lock);
-#endif
+ ldlm_callback_reply(req, 0);
+ if (ldlm_bl_to_thread(ns, &dlm_req->lock_desc, lock))
+ ldlm_handle_bl_callback(ns, &dlm_req->lock_desc, lock);
+
break;
case LDLM_CP_CALLBACK:
CDEBUG(D_INODE, "completion ast\n");
if (req->rq_export == NULL) {
struct ldlm_request *dlm_req;
- CERROR("operation %d with bad export (ptl req %d/rep %d)\n",
- req->rq_reqmsg->opc, req->rq_request_portal,
- req->rq_reply_portal);
+ char str[PTL_NALFMT_SIZE];
+ CERROR("operation %d with bad export from NID %s\n",
+ req->rq_reqmsg->opc,
+ ptlrpc_peernid2str(&req->rq_peer, str));
+
CERROR("--> export cookie: "LPX64"\n",
req->rq_reqmsg->handle.cookie);
dlm_req = lustre_swab_reqbuf(req, 0, sizeof (*dlm_req),
wake_up(&expired_lock_thread.elt_waitq);
wait_event(expired_lock_thread.elt_waitq,
expired_lock_thread.elt_state == ELT_STOPPED);
-
+#else
+ ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);
+ ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service);
#endif
OBD_FREE(ldlm_state, sizeof(*ldlm_state));
{
if ( ldlm_refcount )
CERROR("ldlm_refcount is %d in ldlm_exit!\n", ldlm_refcount);
- if (kmem_cache_destroy(ldlm_resource_slab) != 0)
- CERROR("couldn't free ldlm resource slab\n");
- if (kmem_cache_destroy(ldlm_lock_slab) != 0)
- CERROR("couldn't free ldlm lock slab\n");
+ LASSERTF(kmem_cache_destroy(ldlm_resource_slab) == 0,
+ "couldn't free ldlm resource slab\n");
+ LASSERTF(kmem_cache_destroy(ldlm_lock_slab) == 0,
+ "couldn't free ldlm lock slab\n");
+
}
/* ldlm_flock.c */
EXPORT_SYMBOL(l_unlock);
/* ldlm_lib.c */
+EXPORT_SYMBOL(client_import_add_conn);
+EXPORT_SYMBOL(client_import_del_conn);
EXPORT_SYMBOL(client_obd_setup);
EXPORT_SYMBOL(client_obd_cleanup);
EXPORT_SYMBOL(client_connect_import);
EXPORT_SYMBOL(target_start_recovery_thread);
EXPORT_SYMBOL(target_stop_recovery_thread);
EXPORT_SYMBOL(target_handle_connect);
+EXPORT_SYMBOL(target_cleanup_recovery);
EXPORT_SYMBOL(target_destroy_export);
EXPORT_SYMBOL(target_cancel_recovery_timer);
EXPORT_SYMBOL(target_send_reply);