wake_up(&expired_lock_thread.elt_waitq);
while (1) {
+ struct list_head *tmp, *n, work_list;
l_wait_event(expired_lock_thread.elt_waitq,
have_expired_locks() ||
expired_lock_thread.elt_state == ELT_TERMINATE,
spin_lock_bh(&expired_lock_thread.elt_lock);
while (!list_empty(expired)) {
- struct obd_export *export;
struct ldlm_lock *lock;
- lock = list_entry(expired->next, struct ldlm_lock,
- l_pending_chain);
- if ((void *)lock < LP_POISON + PAGE_SIZE &&
- (void *)lock >= LP_POISON) {
- CERROR("free lock on elt list %p\n", lock);
- LBUG();
- }
- list_del_init(&lock->l_pending_chain);
- if ((void *)lock->l_export < LP_POISON + PAGE_SIZE &&
- (void *)lock->l_export >= LP_POISON + PAGE_SIZE) {
- CERROR("lock with free export on elt list %p\n",
- export);
- lock->l_export = NULL;
- LDLM_ERROR(lock, "free export\n");
- continue;
+ list_add(&work_list, expired);
+ list_del_init(expired);
+
+ list_for_each_entry(lock, &work_list, l_pending_chain) {
+ LDLM_DEBUG(lock, "moving to work list");
}
- export = class_export_get(lock->l_export);
+
spin_unlock_bh(&expired_lock_thread.elt_lock);
- ptlrpc_fail_export(export);
- class_export_put(export);
+
+ list_for_each_safe(tmp, n, &work_list) {
+ lock = list_entry(tmp, struct ldlm_lock,
+ l_pending_chain);
+ ptlrpc_fail_export(lock->l_export);
+ }
+
+
+ if (!list_empty(&work_list)) {
+ list_for_each_entry(lock, &work_list, l_pending_chain) {
+ LDLM_ERROR(lock, "still on work list!");
+ }
+ }
+ LASSERTF (list_empty(&work_list),
+ "some exports not failed properly\n");
+
spin_lock_bh(&expired_lock_thread.elt_lock);
}
spin_unlock_bh(&expired_lock_thread.elt_lock);
l_pending_chain);
if ((lock->l_callback_timeout > jiffies) ||
- (lock->l_req_mode == LCK_GROUP))
+ (lock->l_req_mode == LCK_CW))
break;
LDLM_ERROR(lock, "lock callback timer expired: evicting client "
- "%s@%s nid %s ",
+ "%s@%s nid "LPX64" (%s) ",
lock->l_export->exp_client_uuid.uuid,
lock->l_export->exp_connection->c_remote_uuid.uuid,
- ptlrpc_peernid2str(&lock->l_export->exp_connection->c_peer, str));
+ lock->l_export->exp_connection->c_peer.peer_nid,
+ portals_nid2str(lock->l_export->exp_connection->c_peer.peer_ni->pni_number,
+ lock->l_export->exp_connection->c_peer.peer_nid,
+ str));
spin_lock_bh(&expired_lock_thread.elt_lock);
list_del(&lock->l_pending_chain);
#endif /* __KERNEL__ */
-static void ldlm_failed_ast(struct ldlm_lock *lock, int rc,const char *ast_type)
+static void ldlm_failed_ast(struct ldlm_lock *lock, int rc, char *ast_type)
{
- struct ptlrpc_connection *conn = lock->l_export->exp_connection;
+ const struct ptlrpc_connection *conn = lock->l_export->exp_connection;
char str[PTL_NALFMT_SIZE];
- LDLM_ERROR(lock, "%s AST failed (%d): evicting client %s@%s NID "LPX64
- " (%s)", ast_type, rc, lock->l_export->exp_client_uuid.uuid,
- conn->c_remote_uuid.uuid, conn->c_peer.peer_nid,
- ptlrpc_peernid2str(&conn->c_peer, str));
+ CERROR("%s AST failed (%d) for res "LPU64"/"LPU64
+ ", mode %s: evicting client %s@%s NID "LPX64" (%s)\n",
+ ast_type, rc,
+ lock->l_resource->lr_name.name[0],
+ lock->l_resource->lr_name.name[1],
+ ldlm_lockname[lock->l_granted_mode],
+ lock->l_export->exp_client_uuid.uuid,
+ conn->c_remote_uuid.uuid, conn->c_peer.peer_nid,
+ portals_nid2str(conn->c_peer.peer_ni->pni_number,
+ conn->c_peer.peer_nid, str));
ptlrpc_fail_export(lock->l_export);
}
-static int ldlm_handle_ast_error(struct ldlm_lock *lock,
- struct ptlrpc_request *req, int rc,
- const char *ast_type)
-{
- struct ptlrpc_peer *peer = &req->rq_import->imp_connection->c_peer;
- char str[PTL_NALFMT_SIZE];
-
- if (rc == -ETIMEDOUT || rc == -EINTR || rc == -ENOTCONN) {
- LASSERT(lock->l_export);
- if (lock->l_export->exp_libclient) {
- LDLM_DEBUG(lock, "%s AST to liblustre client (nid %s)"
- " timeout, just cancelling lock", ast_type,
- ptlrpc_peernid2str(peer, str));
- ldlm_lock_cancel(lock);
- rc = -ERESTART;
- } else {
- ldlm_del_waiting_lock(lock);
- ldlm_failed_ast(lock, rc, ast_type);
- }
- } else if (rc) {
- if (rc == -EINVAL)
- LDLM_DEBUG(lock, "client (nid %s) returned %d"
- " from %s AST - normal race",
- ptlrpc_peernid2str(peer, str),
- req->rq_repmsg->status, ast_type);
- else
- LDLM_ERROR(lock, "client (nid %s) returned %d "
- "from %s AST", ptlrpc_peernid2str(peer, str),
- (req->rq_repmsg != NULL) ?
- req->rq_repmsg->status : 0, ast_type);
- ldlm_lock_cancel(lock);
- /* Server-side AST functions are called from ldlm_reprocess_all,
- * which needs to be told to please restart its reprocessing. */
- rc = -ERESTART;
- }
-
- return rc;
-}
-
int ldlm_server_blocking_ast(struct ldlm_lock *lock,
struct ldlm_lock_desc *desc,
void *data, int flag)
req->rq_send_state = LUSTRE_IMP_FULL;
req->rq_timeout = 2; /* 2 second timeout for initial AST reply */
rc = ptlrpc_queue_wait(req);
- if (rc != 0)
- rc = ldlm_handle_ast_error(lock, req, rc, "blocking");
+ if (rc == -ETIMEDOUT || rc == -EINTR || rc == -ENOTCONN) {
+ LASSERT(lock->l_export);
+ if (lock->l_export->exp_libclient) {
+ CDEBUG(D_HA, "BLOCKING AST to liblustre client (nid "
+ LPU64") timeout, simply cancel lock 0x%p\n",
+ req->rq_peer.peer_nid, lock);
+ ldlm_lock_cancel(lock);
+ rc = -ERESTART;
+ } else {
+ ldlm_del_waiting_lock(lock);
+ ldlm_failed_ast(lock, rc, "blocking");
+ }
+ } else if (rc) {
+ if (rc == -EINVAL)
+ CDEBUG(D_DLMTRACE, "client (nid "LPU64") returned %d "
+ "from blocking AST for lock %p--normal race\n",
+ req->rq_peer.peer_nid,
+ req->rq_repmsg->status, lock);
+ else if (rc == -ENOTCONN)
+ CDEBUG(D_DLMTRACE, "client (nid "LPU64") returned %d "
+ "from blocking AST for lock %p--this client was "
+ "probably rebooted while it held a lock, nothing"
+ " serious\n",req->rq_peer.peer_nid,
+ req->rq_repmsg->status, lock);
+ else
+ CDEBUG(D_ERROR, "client (nid "LPU64") returned %d "
+ "from blocking AST for lock %p\n",
+ req->rq_peer.peer_nid,
+ (req->rq_repmsg != NULL)?
+ req->rq_repmsg->status : 0,
+ lock);
+ LDLM_DEBUG(lock, "client sent rc %d rq_status %d from blocking "
+ "AST", rc, req->rq_status);
+ ldlm_lock_cancel(lock);
+ /* Server-side AST functions are called from ldlm_reprocess_all,
+ * which needs to be told to please restart its reprocessing. */
+ rc = -ERESTART;
+ }
ptlrpc_req_finished(req);
l_unlock(&lock->l_resource->lr_namespace->ns_lock);
rc = ptlrpc_queue_wait(req);
- if (rc != 0)
- rc = ldlm_handle_ast_error(lock, req, rc, "completion");
-
+ if ((rc == -ETIMEDOUT || rc == -EINTR || rc == -ENOTCONN) &&
+ !lock->l_export->exp_libclient) {
+ ldlm_del_waiting_lock(lock);
+ ldlm_failed_ast(lock, rc, "completion");
+ } else if (rc == -EINVAL) {
+ LDLM_DEBUG(lock, "lost the race -- client no longer has this "
+ "lock");
+ } else if (rc) {
+ LDLM_ERROR(lock, "client sent rc %d rq_status %d from "
+ "completion AST", rc, req->rq_status);
+ ldlm_lock_cancel(lock);
+ /* Server-side AST functions are called from ldlm_reprocess_all,
+ * which needs to be told to please restart its reprocessing. */
+ rc = -ERESTART;
+ }
ptlrpc_req_finished(req);
RETURN(rc);
req->rq_timeout = 2; /* 2 second timeout for initial AST reply */
rc = ptlrpc_queue_wait(req);
- if (rc == -ELDLM_NO_LOCK_DATA)
- LDLM_DEBUG(lock, "lost race - client has a lock but no inode");
- else if (rc != 0)
- rc = ldlm_handle_ast_error(lock, req, rc, "glimpse");
- else
+ if ((rc == -ETIMEDOUT || rc == -EINTR || rc == -ENOTCONN) &&
+ !lock->l_export->exp_libclient) {
+ ldlm_del_waiting_lock(lock);
+ ldlm_failed_ast(lock, rc, "glimpse");
+ } else if (rc == -EINVAL) {
+ LDLM_DEBUG(lock, "lost the race -- client no longer has this "
+ "lock");
+ } else if (rc == -ELDLM_NO_LOCK_DATA) {
+ LDLM_DEBUG(lock, "lost a race -- client has a lock, but no "
+ "inode");
+ } else if (rc) {
+ LDLM_ERROR(lock, "client sent rc %d rq_status %d from "
+ "glimpse AST", rc, req->rq_status);
+ } else {
rc = res->lr_namespace->ns_lvbo->lvbo_update
(res, req->rq_repmsg, 0, 1);
+ }
ptlrpc_req_finished(req);
RETURN(rc);
}
buffers = 2;
}
- if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_LDLM_ENQUEUE_EXTENT_ERR))
- GOTO(out, rc = -ENOMEM);
-
rc = lustre_pack_reply(req, buffers, size, NULL);
if (rc)
GOTO(out, rc);
err = lustre_pack_reply(req, 0, NULL, NULL);
if (rc == 0)
rc = err;
- req->rq_status = rc;
}
/* The LOCK_CHANGED code in ldlm_lock_enqueue depends on this
LDLM_DEBUG(lock, "server-side enqueue handler, sending reply"
"(err=%d, rc=%d)", err, rc);
- if (lock->l_resource->lr_lvb_len > 0 && rc == 0) {
+ if (lock->l_resource->lr_lvb_len > 0) {
void *lvb = lustre_msg_buf(req->rq_repmsg, 1,
lock->l_resource->lr_lvb_len);
- LASSERT(lvb != NULL);
memcpy(lvb, lock->l_resource->lr_lvb_data,
lock->l_resource->lr_lvb_len);
}
lock = ldlm_handle2lock(&dlm_req->lock_handle1);
if (!lock) {
CERROR("received cancel for unknown lock cookie "LPX64
- " from client %s nid %s\n",
+ " from client %s nid "LPX64" (%s)\n",
dlm_req->lock_handle1.cookie,
req->rq_export->exp_client_uuid.uuid,
- ptlrpc_peernid2str(&req->rq_peer, str));
+ req->rq_peer.peer_nid,
+ portals_nid2str(req->rq_peer.peer_ni->pni_number,
+ req->rq_peer.peer_nid, str));
LDLM_DEBUG_NOLOCK("server-side cancel handler stale lock "
"(cookie "LPU64")",
dlm_req->lock_handle1.cookie);
RETURN(0);
}
-void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
- struct ldlm_lock_desc *ld, struct ldlm_lock *lock)
+static void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
+ struct ldlm_lock_desc *ld,
+ struct ldlm_lock *lock)
{
int do_ast;
ENTRY;
return ptlrpc_reply(req);
}
+#ifdef __KERNEL__
int ldlm_bl_to_thread(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
struct ldlm_lock *lock)
{
-#ifdef __KERNEL__
struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
struct ldlm_bl_work_item *blwi;
ENTRY;
list_add_tail(&blwi->blwi_entry, &blp->blp_list);
wake_up(&blp->blp_waitq);
spin_unlock(&blp->blp_lock);
-#else
- LBUG();
-#endif
RETURN(0);
}
+#else
+/* XXX */
+void liblustre_ldlm_handle_bl_callback(struct ldlm_namespace *ns,
+ struct ldlm_lock_desc *ld,
+ struct ldlm_lock *lock)
+{
+ ldlm_handle_bl_callback(ns, ld, lock);
+}
+#endif
static int ldlm_callback_handler(struct ptlrpc_request *req)
{
if (req->rq_export == NULL) {
struct ldlm_request *dlm_req;
- CDEBUG(D_RPCTRACE, "operation %d from nid %s with bad "
+ CDEBUG(D_RPCTRACE, "operation %d from nid "LPX64" (%s) with bad "
"export cookie "LPX64" (ptl req %d/rep %d); this is "
"normal if this node rebooted with a lock held\n",
- req->rq_reqmsg->opc,
- ptlrpc_peernid2str(&req->rq_peer, str),
+ req->rq_reqmsg->opc, req->rq_peer.peer_nid,
+ portals_nid2str(req->rq_peer.peer_ni->pni_number,
+ req->rq_peer.peer_nid, str),
req->rq_reqmsg->handle.cookie,
req->rq_request_portal, req->rq_reply_portal);
rc = llog_origin_handle_next_block(req);
ldlm_callback_reply(req, rc);
RETURN(0);
- case LLOG_ORIGIN_HANDLE_PREV_BLOCK:
- OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
- rc = llog_origin_handle_prev_block(req);
- ldlm_callback_reply(req, rc);
- RETURN(0);
case LLOG_ORIGIN_HANDLE_READ_HEADER:
OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
rc = llog_origin_handle_read_header(req);
rc = kernel_thread(ldlm_bl_thread_main, &bltd, 0);
if (rc < 0) {
CERROR("cannot start LDLM thread #%d: rc %d\n", i, rc);
+ LBUG();
GOTO(out_thread, rc);
}
wait_for_completion(&blp->blp_comp);
rc = ptlrpc_start_n_threads(NULL, ldlm_state->ldlm_cancel_service,
LDLM_NUM_THREADS, "ldlm_cn");
- if (rc)
+ if (rc) {
+ LBUG();
GOTO(out_thread, rc);
+ }
rc = ptlrpc_start_n_threads(NULL, ldlm_state->ldlm_cb_service,
LDLM_NUM_THREADS, "ldlm_cb");
- if (rc)
+ if (rc) {
+ LBUG();
GOTO(out_thread, rc);
+ }
INIT_LIST_HEAD(&expired_lock_thread.elt_expired_locks);
spin_lock_init(&expired_lock_thread.elt_lock);