/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
* vim:expandtab:shiftwidth=8:tabstop=8:
*
- * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
+ * Copyright (C) 2001-2003 Cluster File Systems, Inc.
* Author: Peter J. Braam <braam@clusterfs.com>
* Author: Phil Schwan <phil@clusterfs.com>
* Author: Mike Shaver <shaver@clusterfs.com>
#include <linux/lustre_net.h>
#include <linux/lustre_dlm.h>
-int target_handle_connect(struct ptlrpc_request *req)
+/*
+ * Fill in @conn for client @cluuid, which already owns export @exp.
+ *
+ * Returns 0 when the export has no connection attached yet, EALREADY
+ * (positive) when @conn matches the handle stored in the export's LDLM
+ * import, and -EALREADY (after zeroing @conn) when the handles differ.
+ */
+int target_handle_reconnect(struct lustre_handle *conn, struct obd_export *exp,
+                            struct obd_uuid *cluuid)
+{
+        struct lustre_handle *stored;
+
+        if (!exp->exp_connection) {
+                /* Export exists but is not connected: hand out its handle. */
+                conn->addr = (__u64) (unsigned long)exp;
+                conn->cookie = exp->exp_cookie;
+                CDEBUG(D_INFO, "existing export for UUID '%s' at %p\n",
+                       cluuid->uuid, exp);
+                CDEBUG(D_IOCTL,"connect: addr %Lx cookie %Lx\n",
+                       (long long)conn->addr, (long long)conn->cookie);
+                RETURN(0);
+        }
+
+        stored = &exp->exp_ldlm_data.led_import.imp_handle;
+
+        if (memcmp(conn, stored, sizeof *conn) != 0) {
+                CERROR("%s reconnecting from %s, "
+                       "handle mismatch (ours "LPX64"/"LPX64", "
+                       "theirs "LPX64"/"LPX64")\n", cluuid->uuid,
+                       exp->exp_connection->c_remote_uuid.uuid,
+                       stored->addr,
+                       stored->cookie, conn->addr, conn->cookie);
+                /* XXX disconnect them here? */
+                memset(conn, 0, sizeof *conn);
+                /* This is a little scary, but right now we build this
+                 * file separately into each server module, so I won't
+                 * go _immediately_ to hell.
+                 */
+                RETURN(-EALREADY);
+        }
+
+        /* Same handle as before: might be a re-connect after a partition. */
+        CERROR("%s reconnecting\n", cluuid->uuid);
+        conn->addr = (__u64) (unsigned long)exp;
+        conn->cookie = exp->exp_cookie;
+        RETURN(EALREADY);
+}
+
+
+int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
{
struct obd_device *target;
- struct obd_export *export;
+ struct obd_export *export = NULL;
struct obd_import *dlmimp;
struct lustre_handle conn;
- char *tgtuuid, *cluuid;
+ struct obd_uuid tgtuuid;
+ struct obd_uuid cluuid;
+ struct list_head *p;
int rc, i;
ENTRY;
- tgtuuid = lustre_msg_buf(req->rq_reqmsg, 0);
if (req->rq_reqmsg->buflens[0] > 37) {
CERROR("bad target UUID for connect\n");
GOTO(out, rc = -EINVAL);
}
+ obd_str2uuid(&tgtuuid, lustre_msg_buf(req->rq_reqmsg, 0));
- cluuid = lustre_msg_buf(req->rq_reqmsg, 1);
if (req->rq_reqmsg->buflens[1] > 37) {
CERROR("bad client UUID for connect\n");
GOTO(out, rc = -EINVAL);
}
+ obd_str2uuid(&cluuid, lustre_msg_buf(req->rq_reqmsg, 1));
- i = class_uuid2dev(tgtuuid);
+ i = class_uuid2dev(&tgtuuid);
if (i == -1) {
- CERROR("UUID '%s' not found for connect\n", tgtuuid);
+ CERROR("UUID '%s' not found for connect\n", tgtuuid.uuid);
GOTO(out, rc = -ENODEV);
}
if (!target)
GOTO(out, rc = -ENODEV);
+ spin_lock_bh(&target->obd_processing_task_lock);
+ if (target->obd_flags & OBD_ABORT_RECOVERY)
+ target_abort_recovery(target);
+ spin_unlock_bh(&target->obd_processing_task_lock);
+
conn.addr = req->rq_reqmsg->addr;
conn.cookie = req->rq_reqmsg->cookie;
- rc = obd_connect(&conn, target, cluuid, ptlrpc_recovd,
- target_revoke_connection);
+ rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
if (rc)
GOTO(out, rc);
- rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
- if (rc)
+ /* lctl gets a backstage, all-access pass. */
+ if (!strcmp(cluuid.uuid, "OBD_CLASS_UUID"))
+ goto dont_check_exports;
+
+ spin_lock(&target->obd_dev_lock);
+ list_for_each(p, &target->obd_exports) {
+ export = list_entry(p, struct obd_export, exp_obd_chain);
+ if (!memcmp(&cluuid, &export->exp_client_uuid,
+ sizeof(export->exp_client_uuid))) {
+ spin_unlock(&target->obd_dev_lock);
+ LASSERT(export->exp_obd == target);
+
+ rc = target_handle_reconnect(&conn, export, &cluuid);
+ break;
+ }
+ export = NULL;
+ }
+ /* If we found an export, we already unlocked. */
+ if (!export)
+ spin_unlock(&target->obd_dev_lock);
+
+ /* Tell the client if we're in recovery. */
+ /* If this is the first client, start the recovery timer */
+ if (target->obd_flags & OBD_RECOVERING) {
+ lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_RECOVERING);
+ target_start_recovery_timer(target, handler);
+ }
+
+ /* Tell the client if we support replayable requests */
+ if (target->obd_flags & OBD_REPLAYABLE)
+ lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_REPLAYABLE);
+
+ if (!export) {
+ if (target->obd_flags & OBD_RECOVERING) {
+ CERROR("denying connection for new client %s: "
+ "in recovery\n", cluuid.uuid);
+ rc = -EBUSY;
+ } else {
+ dont_check_exports:
+ rc = obd_connect(&conn, target, &cluuid, ptlrpc_recovd,
+ target_revoke_connection);
+ }
+ }
+
+ /* If all else goes well, this is our RPC return code. */
+ req->rq_status = 0;
+
+ if (rc && rc != EALREADY)
GOTO(out, rc);
+
req->rq_repmsg->addr = conn.addr;
req->rq_repmsg->cookie = conn.cookie;
LASSERT(export);
req->rq_export = export;
- export->exp_connection = ptlrpc_get_connection(&req->rq_peer, cluuid);
+ export->exp_connection = ptlrpc_get_connection(&req->rq_peer, &cluuid);
if (req->rq_connection != NULL)
ptlrpc_put_connection(req->rq_connection);
req->rq_connection = ptlrpc_connection_addref(export->exp_connection);
+ if (rc == EALREADY) {
+ /* We indicate the reconnection in a flag, not an error code. */
+ lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_RECONNECT);
+ GOTO(out, rc = 0);
+ }
+
spin_lock(&export->exp_connection->c_lock);
list_add(&export->exp_conn_chain, &export->exp_connection->c_exports);
spin_unlock(&export->exp_connection->c_lock);
dlmimp = &export->exp_ldlm_data.led_import;
dlmimp->imp_connection = req->rq_connection;
- dlmimp->imp_client = &export->exp_obd->obd_ldlm_cb_client;
+ dlmimp->imp_client = &export->exp_obd->obd_ldlm_client;
dlmimp->imp_handle.addr = req->rq_reqmsg->addr;
dlmimp->imp_handle.cookie = req->rq_reqmsg->cookie;
dlmimp->imp_obd = /* LDLM! */ NULL;
-
- req->rq_connection->c_level = LUSTRE_CONN_FULL;
+ dlmimp->imp_recover = NULL;
+ INIT_LIST_HEAD(&dlmimp->imp_replay_list);
+ INIT_LIST_HEAD(&dlmimp->imp_sending_list);
+ INIT_LIST_HEAD(&dlmimp->imp_delayed_list);
+ spin_lock_init(&dlmimp->imp_lock);
+ dlmimp->imp_level = LUSTRE_CONN_FULL;
out:
- req->rq_status = rc;
+ if (rc)
+ req->rq_status = rc;
RETURN(rc);
}
RETURN(rc);
req->rq_status = obd_disconnect(conn);
-
+ req->rq_export = NULL;
RETURN(0);
}
list_for_each_safe(expiter, n, &conn->c_exports) {
exp = list_entry(expiter, struct obd_export, exp_conn_chain);
+ CDEBUG(D_HA, "disconnecting export %p/%s\n",
+ exp, exp->exp_client_uuid.uuid);
hdl.addr = (__u64)(unsigned long)exp;
hdl.cookie = exp->exp_cookie;
rc = obd_disconnect(&hdl);
{
ENTRY;
- conn->c_level = LUSTRE_CONN_RECOVD;
conn->c_recovd_data.rd_phase = RD_PREPARED;
RETURN(0);
int target_revoke_connection(struct recovd_data *rd, int phase)
{
struct ptlrpc_connection *conn = class_rd2conn(rd);
-
+
LASSERT(conn);
ENTRY;
LBUG();
RETURN(-ENOSYS);
}
+
+/*
+ * Recovery functions
+ */
+
+/*
+ * Fail every request parked on obd->obd_delayed_reply_queue: each one is
+ * answered with status -ENOTCONN as a PTL_RPC_MSG_ERR reply, then unlinked
+ * from the queue and freed.
+ */
+static void abort_delayed_replies(struct obd_device *obd)
+{
+        struct list_head *pos, *next;
+
+        list_for_each_safe(pos, next, &obd->obd_delayed_reply_queue) {
+                struct ptlrpc_request *req =
+                        list_entry(pos, struct ptlrpc_request, rq_list);
+
+                DEBUG_REQ(D_ERROR, req, "aborted:");
+                req->rq_type = PTL_RPC_MSG_ERR;
+                req->rq_status = -ENOTCONN;
+                ptlrpc_reply(req->rq_svc, req);
+                list_del(&req->rq_list);
+                OBD_FREE(req, sizeof *req);
+        }
+}
+
+/*
+ * Abort recovery on the obd_device passed as @data: zero the count of
+ * recoverable clients, clear OBD_RECOVERING and OBD_ABORT_RECOVERY, fail
+ * all delayed replies and disconnect every client.
+ *
+ * The function releases obd_processing_task_lock around
+ * class_disconnect_all() and re-takes it before returning, so it expects
+ * to be entered with that lock held via spin_lock_bh().
+ */
+void target_abort_recovery(void *data)
+{
+ struct obd_device *obd = data;
+ CERROR("disconnecting clients and aborting recovery\n");
+ obd->obd_recoverable_clients = 0;
+ obd->obd_flags &= ~(OBD_RECOVERING | OBD_ABORT_RECOVERY);
+ abort_delayed_replies(obd);
+ /* Drop the lock for the disconnect, then restore the caller's lock state. */
+ spin_unlock_bh(&obd->obd_processing_task_lock);
+ class_disconnect_all(obd);
+ spin_lock_bh(&obd->obd_processing_task_lock);
+}
+
+/*
+ * Expiry callback of obd_recovery_timer (installed by
+ * target_start_recovery_timer()).  @castmeharder is the obd_device pointer
+ * stored in the timer's data field.
+ *
+ * Only requests the abort: it sets OBD_ABORT_RECOVERY under
+ * obd_processing_task_lock and wakes obd_next_transno_waitq; the abort
+ * itself is not carried out here.
+ */
+static void target_recovery_expired(unsigned long castmeharder)
+{
+ struct obd_device *obd = (struct obd_device *)castmeharder;
+ CERROR("recovery timed out, aborting\n");
+ spin_lock_bh(&obd->obd_processing_task_lock);
+ obd->obd_flags |= OBD_ABORT_RECOVERY;
+ wake_up(&obd->obd_next_transno_waitq);
+ spin_unlock_bh(&obd->obd_processing_task_lock);
+}
+
+/*
+ * Push the expiry of obd_recovery_timer out to OBD_RECOVERY_TIMEOUT jiffies
+ * from now and log the new timeout in seconds.
+ */
+static void reset_recovery_timer(struct obd_device *obd)
+{
+ CDEBUG(D_ERROR, "timer will expire in %ld seconds\n",
+ OBD_RECOVERY_TIMEOUT / HZ);
+ mod_timer(&obd->obd_recovery_timer, jiffies + OBD_RECOVERY_TIMEOUT);
+}
+
+
+/*
+ * Record @handler as the recovery handler for @obd and arm the recovery
+ * timer.  Only the first call has any effect: once obd_recovery_handler is
+ * set, later calls return without touching the timer.
+ */
+void target_start_recovery_timer(struct obd_device *obd, svc_handler_t handler)
+{
+        int first_call;
+
+        spin_lock_bh(&obd->obd_processing_task_lock);
+        first_call = !obd->obd_recovery_handler;
+        if (first_call) {
+                CERROR("%s: starting recovery timer\n", obd->obd_name);
+                obd->obd_recovery_handler = handler;
+                obd->obd_recovery_timer.function = target_recovery_expired;
+                obd->obd_recovery_timer.data = (unsigned long)obd;
+                init_timer(&obd->obd_recovery_timer);
+        }
+        spin_unlock_bh(&obd->obd_processing_task_lock);
+
+        /* The timer is armed outside the lock, and only by the first caller. */
+        if (first_call)
+                reset_recovery_timer(obd);
+}
+
+/* Remove obd_recovery_timer so that it no longer fires. */
+static void cancel_recovery_timer(struct obd_device *obd)
+{
+ del_timer(&obd->obd_recovery_timer);
+}
+
+/*
+ * Wake-up condition for the task sleeping in process_recovery_queue().
+ *
+ * Returns non-zero when the waiter should stop sleeping:
+ *  - an abort has been requested (OBD_ABORT_RECOVERY is set), or
+ *  - the request at the head of obd_recovery_queue carries the transno
+ *    recovery is waiting for, or
+ *  - recovery is no longer in progress (OBD_RECOVERING is clear).
+ *
+ * The abort test is required: target_recovery_expired() sets
+ * OBD_ABORT_RECOVERY and wakes the waitqueue while OBD_RECOVERING is still
+ * set.  Without it, wait_event() would re-evaluate this condition to false
+ * and go straight back to sleep, so the recovery timeout would never
+ * actually abort the waiting processor.
+ */
+static int check_for_next_transno(struct obd_device *obd)
+{
+        struct ptlrpc_request *req;
+
+        /* Honour a pending abort regardless of what is queued. */
+        if (obd->obd_flags & OBD_ABORT_RECOVERY)
+                return 1;
+
+        req = list_entry(obd->obd_recovery_queue.next,
+                         struct ptlrpc_request, rq_list);
+        LASSERT(req->rq_reqmsg->transno >= obd->obd_next_recovery_transno);
+
+        return req->rq_reqmsg->transno == obd->obd_next_recovery_transno ||
+               (obd->obd_flags & OBD_RECOVERING) == 0;
+}
+
+/*
+ * Drain obd_recovery_queue in transno order.  Must run in the task whose
+ * pid is recorded in obd_processing_task (asserted on every iteration).
+ *
+ * Each pass looks at the head of the queue.  If its transno equals
+ * obd_next_recovery_transno the request is unlinked, handed to
+ * obd_recovery_handler, the recovery timer is pushed out, the request is
+ * freed and obd_next_recovery_transno is incremented.  Otherwise the task
+ * sleeps on obd_next_transno_waitq until check_for_next_transno() is true.
+ *
+ * The loop ends when the queue becomes empty (obd_processing_task is then
+ * reset to 0) or when OBD_ABORT_RECOVERY is found set after a wait.
+ *
+ * NOTE(review): the abort path returns without clearing
+ * obd_processing_task, without the EXIT trace macro, and leaves any
+ * remaining entries on obd_recovery_queue - confirm this is intended.
+ */
+static void process_recovery_queue(struct obd_device *obd)
+{
+ struct ptlrpc_request *req;
+ int aborted = 0;
+ ENTRY;
+
+ for (;;) {
+ spin_lock_bh(&obd->obd_processing_task_lock);
+ LASSERT(obd->obd_processing_task == current->pid);
+ req = list_entry(obd->obd_recovery_queue.next,
+ struct ptlrpc_request, rq_list);
+
+ /* Head of the queue is not the next transno yet: wait for it. */
+ if (req->rq_reqmsg->transno != obd->obd_next_recovery_transno) {
+ spin_unlock_bh(&obd->obd_processing_task_lock);
+ CDEBUG(D_HA, "Waiting for transno "LPD64" (1st is "
+ LPD64")\n",
+ obd->obd_next_recovery_transno,
+ req->rq_reqmsg->transno);
+ wait_event(obd->obd_next_transno_waitq,
+ check_for_next_transno(obd));
+ /* target_abort_recovery() is called with the lock held. */
+ spin_lock_bh(&obd->obd_processing_task_lock);
+ if (obd->obd_flags & OBD_ABORT_RECOVERY) {
+ target_abort_recovery(obd);
+ aborted = 1;
+ }
+ spin_unlock_bh(&obd->obd_processing_task_lock);
+ if (aborted)
+ return;
+ /* Re-read the head of the queue from the top of the loop. */
+ continue;
+ }
+ list_del_init(&req->rq_list);
+ spin_unlock_bh(&obd->obd_processing_task_lock);
+
+ /* The handler runs without the lock; its return value is ignored. */
+ DEBUG_REQ(D_ERROR, req, "processing: ");
+ (void)obd->obd_recovery_handler(req);
+ reset_recovery_timer(obd);
+#warning FIXME: mds_fsync_super(mds->mds_sb);
+ OBD_FREE(req, sizeof *req);
+ spin_lock_bh(&obd->obd_processing_task_lock);
+ obd->obd_next_recovery_transno++;
+ /* Nothing left: give up the processing role before leaving. */
+ if (list_empty(&obd->obd_recovery_queue)) {
+ obd->obd_processing_task = 0;
+ spin_unlock_bh(&obd->obd_processing_task_lock);
+ break;
+ }
+ spin_unlock_bh(&obd->obd_processing_task_lock);
+ }
+ EXIT;
+}
+
+/*
+ * Queue a copy of @req on obd->obd_recovery_queue, keeping the queue sorted
+ * by ascending transno, and process the queue if nobody else is doing so.
+ *
+ * Returns 1 without queueing when the request has no transno or when the
+ * calling task is the one currently processing the queue (presumably
+ * telling the caller to handle the request itself - confirm against the
+ * callers).  Returns 0 once a copy has been queued.
+ *
+ * NOTE(review): the copy is allocated while obd_processing_task_lock is
+ * held with bottom halves disabled, and an allocation failure ends in
+ * LBUG() - confirm OBD_ALLOC cannot sleep here.
+ */
+int target_queue_recovery_request(struct ptlrpc_request *req,
+ struct obd_device *obd)
+{
+ struct list_head *tmp;
+ int inserted = 0;
+ __u64 transno = req->rq_reqmsg->transno;
+ struct ptlrpc_request *saved_req;
+
+ /* Requests without a transno are never queued. */
+ if (!transno) {
+ INIT_LIST_HEAD(&req->rq_list);
+ DEBUG_REQ(D_HA, req, "not queueing");
+ return 1;
+ }
+
+ spin_lock_bh(&obd->obd_processing_task_lock);
+
+ if (obd->obd_processing_task == current->pid) {
+ /* Processing the queue right now, don't re-add. */
+ LASSERT(list_empty(&req->rq_list));
+ spin_unlock_bh(&obd->obd_processing_task_lock);
+ return 1;
+ }
+
+ /* From here on "req" refers to the private copy that gets queued. */
+ OBD_ALLOC(saved_req, sizeof *saved_req);
+ if (!saved_req)
+ LBUG();
+ memcpy(saved_req, req, sizeof *req);
+ req = saved_req;
+ INIT_LIST_HEAD(&req->rq_list);
+
+ /* XXX O(n^2) */
+ /* Insert in front of the first queued request with a larger transno. */
+ list_for_each(tmp, &obd->obd_recovery_queue) {
+ struct ptlrpc_request *reqiter =
+ list_entry(tmp, struct ptlrpc_request, rq_list);
+
+ if (reqiter->rq_reqmsg->transno > transno) {
+ list_add_tail(&req->rq_list, &reqiter->rq_list);
+ inserted = 1;
+ break;
+ }
+ }
+
+ /* No larger transno found: the copy goes at the end of the queue. */
+ if (!inserted) {
+ list_add_tail(&req->rq_list, &obd->obd_recovery_queue);
+ }
+
+ if (obd->obd_processing_task != 0) {
+ /* Someone else is processing this queue, we'll leave it to
+ * them.
+ */
+ if (transno == obd->obd_next_recovery_transno)
+ wake_up(&obd->obd_next_transno_waitq);
+ spin_unlock_bh(&obd->obd_processing_task_lock);
+ return 0;
+ }
+
+ /* Nobody is processing, and we know there's (at least) one to process
+ * now, so we'll do the honours.
+ */
+ obd->obd_processing_task = current->pid;
+ spin_unlock_bh(&obd->obd_processing_task_lock);
+
+ process_recovery_queue(obd);
+ return 0;
+}
+
+/* Return the obd_device referenced by the export attached to @req. */
+struct obd_device * target_req2obd(struct ptlrpc_request *req)
+{
+        struct obd_export *exp = req->rq_export;
+
+        return exp->exp_obd;
+}
+
+/*
+ * Park the final reply for @req until every recoverable client has
+ * finished, then send all parked replies in one go.
+ *
+ * A private copy of @req is put on obd_delayed_reply_queue.  When @rc is
+ * non-zero an empty reply is packed first and the request is marked
+ * PTL_RPC_MSG_ERR.  Each call decrements obd_recoverable_clients; when it
+ * reaches zero the namespace is reprocessed, OBD_RECOVERING is cleared,
+ * every queued reply is sent and freed, and the recovery timer is
+ * cancelled.
+ *
+ * Always returns 1.
+ */
+int target_queue_final_reply(struct ptlrpc_request *req, int rc)
+{
+        struct obd_device *obd = target_req2obd(req);
+        struct ptlrpc_request *saved_req;
+
+        /* Allocate the copy before entering the bottom-half-disabled
+         * critical section, and never memcpy() into a failed allocation;
+         * failure is handled the same way as in
+         * target_queue_recovery_request().
+         */
+        OBD_ALLOC(saved_req, sizeof *saved_req);
+        if (!saved_req)
+                LBUG();
+
+        spin_lock_bh(&obd->obd_processing_task_lock);
+        if (rc) {
+                /* Just like ptlrpc_error, but without the sending. */
+                lustre_pack_msg(0, NULL, NULL, &req->rq_replen,
+                                &req->rq_repmsg);
+                req->rq_type = PTL_RPC_MSG_ERR;
+        }
+
+        LASSERT(list_empty(&req->rq_list));
+        /* From here on "req" refers to the queued private copy. */
+        memcpy(saved_req, req, sizeof *saved_req);
+        req = saved_req;
+        list_add(&req->rq_list, &obd->obd_delayed_reply_queue);
+        if (--obd->obd_recoverable_clients == 0) {
+                struct list_head *tmp, *n;
+                ldlm_reprocess_all_ns(req->rq_export->exp_obd->obd_namespace);
+                CDEBUG(D_ERROR,
+                       "all clients recovered, sending delayed replies\n");
+                obd->obd_flags &= ~OBD_RECOVERING;
+                list_for_each_safe(tmp, n, &obd->obd_delayed_reply_queue) {
+                        req = list_entry(tmp, struct ptlrpc_request, rq_list);
+                        DEBUG_REQ(D_ERROR, req, "delayed:");
+                        ptlrpc_reply(req->rq_svc, req);
+                        list_del(&req->rq_list);
+                        OBD_FREE(req, sizeof *req);
+                }
+                cancel_recovery_timer(obd);
+        } else {
+                CERROR("%d recoverable clients remain\n",
+                       obd->obd_recoverable_clients);
+        }
+
+        spin_unlock_bh(&obd->obd_processing_task_lock);
+        return 1;
+}