for use in other places (such as llite).
- Replace many dangerous ptlrpc_free_req calls with its refcount-friendly
ptlrpc_req_finished counterpart.
- Remove excess and harmful ptlrpc_req_finished in ll_file_open.
- Move recovd_conn_manage calls from client_obd_connect to ll_read_super,
since the recovery function is now llite-specific.
- Resurrect llite/recover.c for said function (coming soon).
- Rationalize the portals-owned refcount (added in ptl_send_rpc, removed in
request_out_callback).
- Free repmsg from free_req, not req_finished, in case one of the remaining
ref-holders has plans for it.
- Diagnostics for freed reqs with outstanding refcounts, to help track down
any remaining cases.
- Initialize c_recovd_data.rd_managed_chain at allocation time, for better karma.
- Marginally nicer dump_connection_list output, and an LBUG check to catch
infinite looping over the connection list.
- Add diagnostics for double-managing of connections, which turned out not to be
the bug I was chasing earlier, but might well be in the future.
- Set rd_phase and rd_next_phase before calling the recovery state-machine hooks,
so that adjustments within those hooks (such as calls to recovd_conn_fixed)
don't result in an inconsistent state.
- Failure to run the upcall no longer aborts recovery, so that an administrator
can manually salvage things.
- More complete diagnostics for request replay and sleeping.
extern struct recovd_obd *ptlrpc_recovd;
-int ll_recover(struct recovd_data *rd, int phase);
+int ptlrpc_run_recovery_upcall(struct ptlrpc_connection *conn);
+int ptlrpc_reconnect_and_replay(struct ptlrpc_connection *conn);
#endif
}
if (!req_passed_in)
- ptlrpc_free_req(req);
+ ptlrpc_req_finished(req);
rc = ldlm_lock_enqueue(lock, cookie, cookielen, flags, completion,
blocking);
EXIT;
out:
LDLM_LOCK_PUT(lock);
- ptlrpc_free_req(req);
+ ptlrpc_req_finished(req);
return rc;
}
rc = ptlrpc_queue_wait(req);
rc = ptlrpc_check_status(req, rc);
- ptlrpc_free_req(req);
+ ptlrpc_req_finished(req);
if (rc != ELDLM_OK)
GOTO(out, rc);
cli->cl_import.imp_handle.addr = request->rq_repmsg->addr;
cli->cl_import.imp_handle.cookie = request->rq_repmsg->cookie;
- recovd_conn_manage(c, ptlrpc_recovd, ll_recover);
-
EXIT;
out_req:
- ptlrpc_free_req(request);
+ ptlrpc_req_finished(request);
if (rc) {
out_ldlm:
ldlm_namespace_free(obd->obd_namespace);
EXIT;
out_req:
if (request)
- ptlrpc_free_req(request);
+ ptlrpc_req_finished(request);
out_disco:
err = class_disconnect(conn);
if (!rc && err)
llite_SOURCES = dcache.c commit_callback.c super.c rw.c
llite_SOURCES += file.c dir.c sysctl.c namei.c symlink.c
-llite_SOURCES += lov_pack.c
+llite_SOURCES += lov_pack.c recover.c
lov_pack.c:
test -e lov_pack.c || ln -sf $(top_srcdir)/lib/lov_pack.c .
rc = ll_lock(inode, NULL, &it, &lockh);
request = (struct ptlrpc_request *)it.it_data;
- ptlrpc_free_req(request);
+ ptlrpc_req_finished(request);
if (rc != ELDLM_OK) {
CERROR("lock enqueue: err: %d\n", rc);
UnlockPage(page);
else
inode->i_size = body->size;
}
- ptlrpc_free_req(request);
+ ptlrpc_req_finished(request);
EXIT;
readpage_out:
rc = mdc_open(&sbi->ll_mdc_conn, inode->i_ino, S_IFREG | inode->i_mode,
file->f_flags, lsm, &fd->fd_mdshandle, &req);
fd->fd_req = req;
- ptlrpc_req_finished(req);
+
+ /* We don't call ptlrpc_req_finished here, because the request is
+ * preserved until we see a matching close, at which point it is
+ * released (and likely freed). (See ll_file_release.)
+ */
if (rc)
GOTO(out_req, -abs(rc));
if (!fd->fd_mdshandle.addr ||
valid |= OBD_MD_LINKNAME;
symlen = lic.lic_body->size;
}
- ptlrpc_free_req(request);
+ ptlrpc_req_finished(request);
request = NULL;
err = mdc_getattr(&sbi->ll_mdc_conn, ino, mode,
valid, symlen, &request);
out:
if (lmm)
OBD_FREE(lmm, mds_md_size);
- ptlrpc_free_req(request);
+ ptlrpc_req_finished(request);
return inode;
}
err = mdc_unlink(&sbi->ll_mdc_conn, dir, child, mode, name, len,
&request);
- ptlrpc_free_req(request);
+ ptlrpc_req_finished(request);
RETURN(err);
}
err = mdc_link(&sbi->ll_mdc_conn, src, dir, name,
len, &request);
- ptlrpc_free_req(request);
+ ptlrpc_req_finished(request);
RETURN(err);
}
err = mdc_rename(&sbi->ll_mdc_conn, src, tgt,
old->d_name.name, old->d_name.len,
new->d_name.name, new->d_name.len, &request);
- ptlrpc_free_req(request);
+ ptlrpc_req_finished(request);
RETURN(err);
}
--- /dev/null
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Lustre Lite recovery infrastructure.
+ *
+ * Copyright (C) 2002 Cluster File Systems Inc.
+ */
+
+#define DEBUG_SUBSYSTEM S_LLITE
+
+#include <linux/lustre_lite.h>
+#include <linux/lustre_ha.h>
+
+static int ll_retry_recovery(struct ptlrpc_connection *conn)
+{
+ ENTRY;
+ RETURN(0);
+}
+
+int ll_recover(struct recovd_data *rd, int phase)
+{
+ struct ptlrpc_connection *conn = class_rd2conn(rd);
+
+ LASSERT(conn);
+ ENTRY;
+
+ switch (phase) {
+ case PTLRPC_RECOVD_PHASE_PREPARE:
+ RETURN(ptlrpc_run_recovery_upcall(conn));
+ case PTLRPC_RECOVD_PHASE_RECOVER:
+ RETURN(ptlrpc_reconnect_and_replay(conn));
+ case PTLRPC_RECOVD_PHASE_FAILURE:
+ RETURN(ll_retry_recovery(conn));
+ }
+
+ LBUG();
+ RETURN(-ENOSYS);
+}
mdc_conn = sbi2mdc(sbi)->cl_import.imp_connection;
mdc_conn->c_level = LUSTRE_CONN_FULL;
list_add(&mdc_conn->c_sb_chain, &sbi->ll_conn_chain);
+ spin_lock(&ptlrpc_recovd->recovd_lock);
+ recovd_conn_manage(class_conn2export(&sbi->ll_mdc_conn)->exp_connection,
+ ptlrpc_recovd, ll_recover);
+ spin_unlock(&ptlrpc_recovd->recovd_lock);
obd = class_uuid2obd(osc);
if (!obd) {
CERROR("cannot connect to %s: rc = %d\n", osc, err);
GOTO(out_mdc, sb = NULL);
}
+ spin_lock(&ptlrpc_recovd->recovd_lock);
+ recovd_conn_manage(class_conn2export(&sbi->ll_osc_conn)->exp_connection,
+ ptlrpc_recovd, ll_recover);
+ spin_unlock(&ptlrpc_recovd->recovd_lock);
/* XXX: need to store the last_* values somewhere */
err = mdc_getstatus(&sbi->ll_mdc_conn, &rootfid, &last_committed,
rc = vfs_readlink(dentry, buffer, buflen, symname);
out:
up(&lli->lli_open_sem);
- ptlrpc_free_req(request);
+ ptlrpc_req_finished(request);
RETURN(rc);
}
rc = vfs_follow_link(nd, symname);
out:
up(&lli->lli_open_sem);
- ptlrpc_free_req(request);
+ ptlrpc_req_finished(request);
RETURN(rc);
}
}
out:
- ptlrpc_free_req(req);
+ ptlrpc_req_finished(req);
return rc;
out_disc:
EXIT;
out:
- ptlrpc_free_req(req);
+ ptlrpc_req_finished(req);
return rc;
}
EXIT;
out:
- ptlrpc_free_req(request);
+ ptlrpc_req_finished(request);
return rc;
}
EXIT;
out:
- ptlrpc_free_req(request);
+ ptlrpc_req_finished(request);
return rc;
}
EXIT;
out:
- ptlrpc_free_req(request);
+ ptlrpc_req_finished(request);
return rc;
}
rc = ptlrpc_queue_wait(request);
rc = ptlrpc_check_status(request, rc);
- ptlrpc_free_req(request);
+ ptlrpc_req_finished(request);
return rc;
}
*ea = lsm;
EXIT;
out_req:
- ptlrpc_free_req(request);
+ ptlrpc_req_finished(request);
out:
if (rc && !*ea)
OBD_FREE(lsm, oa->o_easize);
EXIT;
out:
- ptlrpc_free_req(request);
+ ptlrpc_req_finished(request);
return rc;
}
EXIT;
out:
- ptlrpc_free_req(request);
+ ptlrpc_req_finished(request);
return rc;
}
EXIT;
out:
- ptlrpc_free_req(request);
+ ptlrpc_req_finished(request);
return rc;
}
INIT_LIST_HEAD(&request->rq_list);
/*
* This will be reduced once when the sender is finished (waiting for
- * reply, f.e.), once when the request has been committed and is
- * removed from the to-be-committed list, and once when portals is
- * finished with it and has called request_out_callback.
+ * reply, f.e.), and once when the request has been committed and is
+ * removed from the to-be-committed list.
+ *
+ * Also, the refcount will be increased in ptl_send_rpc immediately
+ * before we hand it off to portals, and there will be a corresponding
+ * decrease in request_out_cb (which is called to indicate that portals
+ * is finished with the request, and it can be safely freed).
*
* (Except in the DLM server case, where it will be dropped twice
* by the sender, and then the last time by request_out_callback.)
*/
- atomic_set(&request->rq_refcount, 3);
+ atomic_set(&request->rq_refcount, 2);
spin_lock(&conn->c_lock);
request->rq_xid = HTON__u32(++conn->c_xid_out);
if (request == NULL)
return;
- if (request->rq_repmsg != NULL) {
- OBD_FREE(request->rq_repmsg, request->rq_replen);
- request->rq_repmsg = NULL;
- request->rq_reply_md.start = NULL;
- }
-
if (atomic_dec_and_test(&request->rq_refcount))
ptlrpc_free_req(request);
}
return;
}
- if (request->rq_repmsg != NULL)
+ if (atomic_read(&request->rq_refcount) != 0) {
+ CERROR("freeing request %p (%d->%s:%d) with refcount %d\n",
+ request, request->rq_reqmsg->opc,
+ request->rq_connection->c_remote_uuid,
+ request->rq_import->imp_client->cli_request_portal,
+ request->rq_refcount);
+ /* LBUG(); */
+ }
+
+ if (request->rq_repmsg != NULL) {
OBD_FREE(request->rq_repmsg, request->rq_replen);
- if (request->rq_reqmsg != NULL)
+ request->rq_repmsg = NULL;
+ request->rq_reply_md.start = NULL;
+ }
+ if (request->rq_reqmsg != NULL) {
OBD_FREE(request->rq_reqmsg, request->rq_reqlen);
+ request->rq_reqmsg = NULL;
+ }
if (request->rq_connection) {
spin_lock(&request->rq_connection->c_lock);
(long long)req->rq_xid, (long long)req->rq_transno,
(long long)conn->c_last_committed);
if (atomic_dec_and_test(&req->rq_refcount)) {
- req->rq_import = NULL;
-
- /* We do this to prevent free_req deadlock. Restarting
- * after each removal is not so bad, as we are almost
- * always deleting the first item in the list.
+ /* We do this to prevent free_req deadlock.
+ * Restarting after each removal is not so bad, as we are
+ * almost always deleting the first item in the list.
+ *
+ * If we use a recursive lock here, we can skip the
+ * unlock/lock/restart sequence.
*/
spin_unlock(&conn->c_lock);
ptlrpc_free_req(req);
list_del_init(&req->rq_list);
req->rq_import = NULL;
spin_unlock(&conn->c_lock);
- ptlrpc_free_req(req);
+ ptlrpc_req_finished(req);
goto restart1;
}
restart2:
list_del_init(&req->rq_list);
req->rq_import = NULL;
spin_unlock(&conn->c_lock);
- ptlrpc_free_req(req);
+ ptlrpc_req_finished(req);
spin_lock(&conn->c_lock);
goto restart2;
}
req->rq_connection->c_remote_uuid,
req->rq_import->imp_client->cli_request_portal);
/* we'll get sent again, so balance 2nd request_out_callback */
- atomic_inc(&req->rq_refcount);
goto resend;
}
req->rq_reqmsg->addr = req->rq_import->imp_handle.addr;
req->rq_reqmsg->cookie = req->rq_import->imp_handle.cookie;
- /* add a ref, which will again be balanced in request_out_callback */
- atomic_inc(&req->rq_refcount);
rc = ptl_send_rpc(req);
if (rc) {
CERROR("error %d, opcode %d\n", rc, req->rq_reqmsg->opc);
INIT_LIST_HEAD(&c->c_imports);
INIT_LIST_HEAD(&c->c_exports);
INIT_LIST_HEAD(&c->c_sb_chain);
+ INIT_LIST_HEAD(&c->c_recovd_data.rd_managed_chain);
atomic_set(&c->c_refcount, 0);
ptlrpc_connection_addref(c);
spin_lock_init(&c->c_lock);
LBUG();
}
+ /* this balances the atomic_inc in ptl_send_rpc */
ptlrpc_req_finished(req);
RETURN(1);
}
source_id.nid = request->rq_connection->c_peer.peer_nid;
source_id.pid = PTL_PID_ANY;
+ /* add a ref, which will be balanced in request_out_callback */
+ atomic_inc(&request->rq_refcount);
if (request->rq_replen != 0) {
/* request->rq_repmsg is set only when the reply comes in, in
#include <linux/lustre_ha.h>
#include <linux/obd_support.h>
+/* dump_connection_list, but shorter for nicer debugging logs */
+static void d_c_l(struct list_head *head)
+{
+ int sanity = 0;
+ struct list_head *tmp;
+
+ list_for_each(tmp, head) {
+ struct ptlrpc_connection *conn =
+ list_entry(tmp, struct ptlrpc_connection,
+ c_recovd_data.rd_managed_chain);
+ CDEBUG(D_HA, " %p = %s (%d/%d)\n", conn, conn->c_remote_uuid,
+ conn->c_recovd_data.rd_phase,
+ conn->c_recovd_data.rd_next_phase);
+ if (sanity++ > 50)
+ LBUG();
+ }
+}
+
+static void dump_lists(struct recovd_obd *recovd)
+{
+ CDEBUG(D_HA, "managed: \n");
+ d_c_l(&recovd->recovd_managed_items);
+ CDEBUG(D_HA, "troubled: \n");
+ d_c_l(&recovd->recovd_troubled_items);
+}
+
void recovd_conn_manage(struct ptlrpc_connection *conn,
struct recovd_obd *recovd, ptlrpc_recovery_cb_t recover)
{
struct recovd_data *rd = &conn->c_recovd_data;
ENTRY;
+ if (!list_empty(&rd->rd_managed_chain)) {
+ if (rd->rd_recovd == recovd && rd->rd_recover == recover) {
+ CDEBUG(D_HA, "conn %p/%s already setup for recovery\n",
+ conn, conn->c_remote_uuid);
+ EXIT;
+ return;
+ }
+ CDEBUG(D_HA,
+ "conn %p/%s has recovery items %p/%p, making %p/%p\n",
+ conn, conn->c_remote_uuid, rd->rd_recovd, rd->rd_recover,
+ recovd, recover);
+ spin_lock(&rd->rd_recovd->recovd_lock);
+ list_del(&rd->rd_managed_chain);
+ spin_unlock(&rd->rd_recovd->recovd_lock);
+ }
+
rd->rd_recovd = recovd;
rd->rd_recover = recover;
rd->rd_phase = RD_IDLE;
rd->rd_next_phase = RD_TROUBLED;
spin_lock(&recovd->recovd_lock);
- INIT_LIST_HEAD(&rd->rd_managed_chain);
- list_add(&recovd->recovd_managed_items, &rd->rd_managed_chain);
+ list_add(&rd->rd_managed_chain, &recovd->recovd_managed_items);
+ dump_lists(recovd);
spin_unlock(&recovd->recovd_lock);
EXIT;
return;
}
-
spin_lock(&recovd->recovd_lock);
if (rd->rd_phase != RD_IDLE) {
CERROR("connection %p to %s already in recovery\n",
list_del(&rd->rd_managed_chain);
list_add_tail(&rd->rd_managed_chain, &recovd->recovd_troubled_items);
rd->rd_phase = RD_TROUBLED;
+ dump_lists(recovd);
spin_unlock(&recovd->recovd_lock);
wake_up(&recovd->recovd_waitq);
rd->rd_phase = RD_IDLE;
rd->rd_next_phase = RD_TROUBLED;
list_add(&rd->rd_managed_chain, &rd->rd_recovd->recovd_managed_items);
+ dump_lists(rd->rd_recovd);
spin_unlock(&rd->rd_recovd->recovd_lock);
EXIT;
RETURN(rc);
}
-static void dump_connection_list(struct list_head *head)
-{
- struct list_head *tmp;
-
- list_for_each(tmp, head) {
- struct ptlrpc_connection *conn =
- list_entry(tmp, struct ptlrpc_connection,
- c_recovd_data.rd_managed_chain);
- CDEBUG(D_HA, " %p = %s (%d/%d)\n", conn, conn->c_remote_uuid,
- conn->c_recovd_data.rd_phase,
- conn->c_recovd_data.rd_next_phase);
- }
-}
-
static int recovd_handle_event(struct recovd_obd *recovd)
{
struct list_head *tmp, *n;
spin_lock(&recovd->recovd_lock);
- CERROR("managed: \n");
- dump_connection_list(&recovd->recovd_managed_items);
- CERROR("troubled: \n");
- dump_connection_list(&recovd->recovd_troubled_items);
+ dump_lists(recovd);
/*
* We use _safe here because one of the callbacks, expecially
CERROR("starting recovery for rd %p (conn %p)\n",
rd, class_rd2conn(rd));
rd->rd_phase = RD_PREPARING;
+ rd->rd_next_phase = RD_PREPARED;
spin_unlock(&recovd->recovd_lock);
rc = rd->rd_recover(rd, PTLRPC_RECOVD_PHASE_PREPARE);
if (rc)
goto cb_failed;
- rd->rd_next_phase = RD_PREPARED;
break;
case RD_PREPARED:
- rd->rd_phase = RD_RECOVERING;
CERROR("recovery prepared for rd %p (conn %p)\n",
rd, class_rd2conn(rd));
+ rd->rd_phase = RD_RECOVERING;
+ rd->rd_next_phase = RD_RECOVERED;
spin_unlock(&recovd->recovd_lock);
rc = rd->rd_recover(rd, PTLRPC_RECOVD_PHASE_RECOVER);
if (rc)
goto cb_failed;
- rd->rd_next_phase = RD_RECOVERED;
break;
case RD_RECOVERED:
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
* vim:expandtab:shiftwidth=8:tabstop=8:
*
- * Lustre Light Super operations
+ * Portal-RPC reconnection and replay operations, for use in recovery.
*
* This code is issued under the GNU General Public License.
* See the file COPYING in this distribution
#include <linux/module.h>
#include <linux/kmod.h>
-#define DEBUG_SUBSYSTEM S_LLITE
+#define DEBUG_SUBSYSTEM S_RPC
-#include <linux/lustre_lite.h>
#include <linux/lustre_ha.h>
+#include <linux/lustre_net.h>
+#include <linux/obd.h>
-int ll_reconnect(struct ptlrpc_connection *conn)
+static int ptlrpc_reconnect(struct ptlrpc_connection *conn)
{
struct list_head *tmp;
int rc = -EINVAL;
return rc;
}
-static int ll_recover_upcall(struct ptlrpc_connection *conn)
+int ptlrpc_run_recovery_upcall(struct ptlrpc_connection *conn)
{
char *argv[3];
char *envp[3];
rc = call_usermodehelper(argv[0], argv, envp);
if (rc < 0) {
- /*
- * Tragically, this will never be run, because call_umh doesn't
- * report errors like -ENOENT to its caller.
- */
- CERROR("Error invoking recovery upcall (%s): %d\n",
- obd_recovery_upcall, rc);
+ CERROR("Error invoking recovery upcall %s for %s: %d\n",
+ argv[0], argv[1], rc);
CERROR("Check /proc/sys/lustre/recovery_upcall?\n");
} else {
CERROR("Invoked upcall %s for connection %s\n",
argv[0], argv[1]);
}
- RETURN(rc);
+
+ /*
+ * We don't want to make this a "failed" recovery, because the system
+ * administrator -- or, perhaps, tester -- may well be able to rescue
+ * things by running the correct upcall.
+ */
+ RETURN(0);
}
-static int ll_recover_reconnect(struct ptlrpc_connection *conn)
+int ptlrpc_reconnect_and_replay(struct ptlrpc_connection *conn)
{
int rc = 0;
struct list_head *tmp, *pos;
ENTRY;
/* 1. reconnect */
- rc = ll_reconnect(conn);
+ rc = ptlrpc_reconnect(conn);
if (rc)
RETURN(rc);
/* replay what needs to be replayed */
if (req->rq_flags & PTL_RPC_FL_REPLAY) {
- CDEBUG(D_HA, "FL_REPLAY: xid "LPD64" op %d @ %d\n",
- req->rq_xid, req->rq_reqmsg->opc,
+ CDEBUG(D_HA, "FL_REPLAY: xid "LPD64" transno "LPD64" op %d @ %d\n",
+ req->rq_xid, req->rq_repmsg->transno, req->rq_reqmsg->opc,
req->rq_import->imp_client->cli_request_portal);
rc = ptlrpc_replay_req(req);
#if 0
/* server has seen req, we have reply: skip */
if ((req->rq_flags & PTL_RPC_FL_REPLIED) &&
req->rq_xid <= conn->c_last_xid) {
- CDEBUG(D_HA, "REPLIED SKIP: xid "LPD64" op %d @ %d\n",
- req->rq_xid, req->rq_reqmsg->opc,
+ CDEBUG(D_HA, "REPLIED SKIP: xid "LPD64" transno "LPD64" op %d @ %d\n",
+ req->rq_xid, req->rq_repmsg->transno, req->rq_reqmsg->opc,
req->rq_import->imp_client->cli_request_portal);
continue;
}
/* server has lost req, we have reply: resend, ign reply */
if ((req->rq_flags & PTL_RPC_FL_REPLIED) &&
req->rq_xid > conn->c_last_xid) {
- CDEBUG(D_HA, "REPLIED RESEND: xid "LPD64" op %d @ %d\n",
- req->rq_xid, req->rq_reqmsg->opc,
+ CDEBUG(D_HA, "REPLIED RESEND: xid "LPD64" transno "LPD64" op %d @ %d\n",
+ req->rq_xid, req->rq_repmsg->transno, req->rq_reqmsg->opc,
req->rq_import->imp_client->cli_request_portal);
rc = ptlrpc_replay_req(req);
if (rc) {
/* service has not seen req, no reply: resend */
if ( !(req->rq_flags & PTL_RPC_FL_REPLIED) &&
req->rq_xid > conn->c_last_xid) {
- CDEBUG(D_HA, "RESEND: xid "LPD64" op %d @ %d\n",
- req->rq_xid, req->rq_reqmsg->opc,
+ CDEBUG(D_HA, "RESEND: xid "LPD64" transno "LPD64" op %d @ %d\n",
+ req->rq_xid, req->rq_repmsg->transno, req->rq_reqmsg->opc,
req->rq_import->imp_client->cli_request_portal);
ptlrpc_resend_req(req);
}
spin_unlock(&conn->c_lock);
return rc;
}
-
-static int ll_retry_recovery(struct ptlrpc_connection *conn)
-{
- CERROR("Recovery has failed on conn %p\n", conn);
-#if 0
- /* XXX use a timer, sideshow bob */
- recovd_conn_fail(conn);
- /* XXX this is disabled until I fix it so that we don't just keep
- * XXX retrying in the case of a missing upcall.
- */
-#endif
- return 0;
-}
-
-int ll_recover(struct recovd_data *rd, int phase)
-{
- struct ptlrpc_connection *conn = class_rd2conn(rd);
-
- LASSERT(conn);
- ENTRY;
-
- switch (phase) {
- case PTLRPC_RECOVD_PHASE_PREPARE:
- RETURN(ll_recover_upcall(conn));
- case PTLRPC_RECOVD_PHASE_RECOVER:
- RETURN(ll_recover_reconnect(conn));
- case PTLRPC_RECOVD_PHASE_FAILURE:
- RETURN(ll_retry_recovery(conn));
- }
-
- LBUG();
- RETURN(-ENOSYS);
-}
EXPORT_SYMBOL(lustre_unpack_msg);
EXPORT_SYMBOL(lustre_msg_buf);
-EXPORT_SYMBOL(ll_recover);
-
+/* recover.c */
+EXPORT_SYMBOL(ptlrpc_run_recovery_upcall);
+EXPORT_SYMBOL(ptlrpc_reconnect_and_replay);
MODULE_AUTHOR("Cluster File Systems, Inc <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Request Processor v1.0");