From 460195d84766cbb423cc56ef6cee9228f528d439 Mon Sep 17 00:00:00 2001 From: shaver Date: Sun, 1 Dec 2002 03:47:10 +0000 Subject: [PATCH] Landing b_lock_replay so that Phil can use my ldlm iterators and whatnot for his upcoming locking work. Highlights: - lock replay infrastructure (needs much more testing, but didn't regress anything outside recovery) - b=421: ldlm iterators - b=348: imports now have service levels, replacing connections' c_level - replace c_delayed_head with imp_delayed_list - split imp_request_list into imp_replay_list for retained requests and imp_sending_list for inflight reqs - as a side-effect, clean up rq_refcount story materially - client-side recovery is now dispatched via a per-import handler function, for better layering and modularity - wire imports up to recovery before attempting mounts, for better handling of mount-time failures --- lustre/include/linux/lustre_dlm.h | 22 +++- lustre/include/linux/lustre_ha.h | 6 +- lustre/include/linux/lustre_import.h | 14 ++- lustre/include/linux/lustre_net.h | 1 - lustre/ldlm/ldlm_lockd.c | 10 +- lustre/ldlm/ldlm_request.c | 154 ++++++++++++++++++++++++++ lustre/lib/client.c | 9 +- lustre/lib/target.c | 4 +- lustre/llite/recover.c | 169 +++-------------------------- lustre/llite/super.c | 2 - lustre/llite/super25.c | 2 - lustre/mdc/mdc_request.c | 58 +++++++++- lustre/osc/osc_request.c | 106 +++++++++++++++++- lustre/ptlrpc/client.c | 159 +++++++++------------------ lustre/ptlrpc/connection.c | 1 - lustre/ptlrpc/recover.c | 205 ++++++++++++++++------------------- lustre/ptlrpc/rpc.c | 2 + 17 files changed, 524 insertions(+), 400 deletions(-) diff --git a/lustre/include/linux/lustre_dlm.h b/lustre/include/linux/lustre_dlm.h index 3c7367f..b52dfca 100644 --- a/lustre/include/linux/lustre_dlm.h +++ b/lustre/include/linux/lustre_dlm.h @@ -150,13 +150,13 @@ struct ldlm_lock { ldlm_mode_t l_req_mode; ldlm_mode_t l_granted_mode; - ldlm_completion_callback l_completion_ast; - ldlm_blocking_callback l_blocking_ast; + ldlm_completion_callback l_completion_ast; + ldlm_blocking_callback l_blocking_ast; struct obd_export *l_export; struct lustre_handle *l_connh; __u32 l_flags; - struct lustre_handle l_remote_handle; + struct lustre_handle l_remote_handle; void *l_data; __u32 l_data_len; struct ldlm_extent l_extent; @@ -280,6 +280,22 @@ do { \ #define LDLM_DEBUG_NOLOCK(format, a...) \ CDEBUG(D_DLMTRACE, "### " format "\n" , ## a) +/* + * Iterators. + */ + +#define LDLM_ITER_CONTINUE 0 /* keep iterating */ +#define LDLM_ITER_STOP 1 /* stop iterating */ + +typedef int (*ldlm_iterator_t)(struct ldlm_lock *, void *); + +int ldlm_resource_foreach(struct ldlm_resource *res, ldlm_iterator_t iter, + void *closure); +int ldlm_namespace_foreach(struct ldlm_namespace *ns, ldlm_iterator_t iter, + void *closure); + +int ldlm_replay_locks(struct obd_import *imp); + /* ldlm_extent.c */ int ldlm_extent_compat(struct ldlm_lock *, struct ldlm_lock *); int ldlm_extent_policy(struct ldlm_lock *, void *, ldlm_mode_t, int flags, diff --git a/lustre/include/linux/lustre_ha.h b/lustre/include/linux/lustre_ha.h index 8611e88..1e6596b 100644 --- a/lustre/include/linux/lustre_ha.h +++ b/lustre/include/linux/lustre_ha.h @@ -54,6 +54,8 @@ extern struct recovd_obd *ptlrpc_recovd; int ptlrpc_run_recovery_upcall(struct ptlrpc_connection *conn); int ptlrpc_reconnect_import(struct obd_import *imp, int rq_opc); -int ptlrpc_replay(struct obd_import *imp, int unreplied_only); - +int ptlrpc_replay(struct obd_import *imp, int send_last_flag); +int ptlrpc_resend(struct obd_import *imp); +void ptlrpc_free_committed(struct obd_import *imp); +void ptlrpc_wake_delayed(struct obd_import *imp); #endif diff --git a/lustre/include/linux/lustre_import.h b/lustre/include/linux/lustre_import.h index 893fd0ae..0f0d67d 100644 --- a/lustre/include/linux/lustre_import.h +++ b/lustre/include/linux/lustre_import.h @@ -15,13 +15,23 @@ #define IMP_INVALID 1 #define IMP_REPLAYABLE 2 +typedef int (*import_recover_t)(struct obd_import *imp, int phase); + #include struct obd_import { + import_recover_t imp_recover; struct ptlrpc_connection *imp_connection; struct ptlrpc_client *imp_client; struct lustre_handle imp_handle; struct list_head imp_chain; - struct list_head imp_request_list; + + /* Lists of requests that are retained for replay, waiting for a reply, + * or waiting for recovery to complete, respectively. + */ + struct list_head imp_replay_list; + struct list_head imp_sending_list; + struct list_head imp_delayed_list; + struct obd_device *imp_obd; int imp_flags; int imp_level; @@ -30,7 +40,7 @@ struct obd_import { __u64 imp_peer_last_xid; __u64 imp_peer_committed_transno; - /* Protects flags, level, *_xid, request_list */ + /* Protects flags, level, *_xid, *_list */ spinlock_t imp_lock; }; diff --git a/lustre/include/linux/lustre_net.h b/lustre/include/linux/lustre_net.h index d3aa4fd..ab8ee9a 100644 --- a/lustre/include/linux/lustre_net.h +++ b/lustre/include/linux/lustre_net.h @@ -72,7 +72,6 @@ struct ptlrpc_connection { __u8 c_local_uuid[37]; /* XXX do we need this? */ __u8 c_remote_uuid[37]; - int c_level; __u32 c_generation; /* changes upon new connection */ __u32 c_epoch; /* changes when peer changes */ __u32 c_bootcount; /* peer's boot count */ diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index 12af650..da675c9 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -157,8 +157,7 @@ static int ldlm_server_blocking_ast(struct ldlm_lock *lock, ldlm_add_waiting_lock(lock); (void)ptl_send_rpc(req); - /* no commit, and no waiting for reply, so 2x decref now */ - ptlrpc_req_finished(req); + /* not waiting for reply */ ptlrpc_req_finished(req); RETURN(rc); @@ -191,8 +190,8 @@ static int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags) req->rq_replen = 0; /* no reply needed */ (void)ptl_send_rpc(req); - /* no commit, and no waiting for reply, so 2x decref now */ - ptlrpc_req_finished(req); + + /* not waiting for reply */ ptlrpc_req_finished(req); RETURN(rc); @@ -778,6 +777,9 @@ EXPORT_SYMBOL(ldlm_namespace_cleanup); EXPORT_SYMBOL(ldlm_namespace_free); EXPORT_SYMBOL(ldlm_namespace_dump); EXPORT_SYMBOL(ldlm_cancel_locks_for_export); +EXPORT_SYMBOL(ldlm_replay_locks); +EXPORT_SYMBOL(ldlm_resource_foreach); +EXPORT_SYMBOL(ldlm_namespace_foreach); EXPORT_SYMBOL(l_lock); EXPORT_SYMBOL(l_unlock); diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index 6672c3e..130f092 100644 --- a/lustre/ldlm/ldlm_request.c +++ b/lustre/ldlm/ldlm_request.c @@ -646,3 +646,157 @@ int ldlm_cli_cancel_unused(struct ldlm_namespace *ns, __u64 *res_id, return ELDLM_OK; } + +/* Lock iterators. */ + +int ldlm_resource_foreach(struct ldlm_resource *res, ldlm_iterator_t iter, + void *closure) +{ + struct list_head *tmp, *next; + struct ldlm_lock *lock; + int rc = LDLM_ITER_CONTINUE; + struct ldlm_namespace *ns = res->lr_namespace; + + ENTRY; + + if (!res) + RETURN(LDLM_ITER_CONTINUE); + + l_lock(&ns->ns_lock); + list_for_each_safe(tmp, next, &res->lr_granted) { + lock = list_entry(tmp, struct ldlm_lock, l_res_link); + + if (iter(lock, closure) == LDLM_ITER_STOP) + GOTO(out, rc = LDLM_ITER_STOP); + } + + list_for_each_safe(tmp, next, &res->lr_converting) { + lock = list_entry(tmp, struct ldlm_lock, l_res_link); + + if (iter(lock, closure) == LDLM_ITER_STOP) + GOTO(out, rc = LDLM_ITER_STOP); + } + + list_for_each_safe(tmp, next, &res->lr_waiting) { + lock = list_entry(tmp, struct ldlm_lock, l_res_link); + + if (iter(lock, closure) == LDLM_ITER_STOP) + GOTO(out, rc = LDLM_ITER_STOP); + } + out: + l_unlock(&ns->ns_lock); + RETURN(rc); +} + +struct iter_helper_data { + ldlm_iterator_t iter; + void *closure; +}; + +static int ldlm_iter_helper(struct ldlm_lock *lock, void *closure) +{ + struct iter_helper_data *helper = closure; + return helper->iter(lock, helper->closure); +} + +int ldlm_namespace_foreach(struct ldlm_namespace *ns, ldlm_iterator_t iter, + void *closure) +{ + int i, rc = LDLM_ITER_CONTINUE; + struct iter_helper_data helper = { iter: iter, closure: closure }; + + l_lock(&ns->ns_lock); + for (i = 0; i < RES_HASH_SIZE; i++) { + struct list_head *tmp, *next; + list_for_each_safe(tmp, next, &(ns->ns_hash[i])) { + struct ldlm_resource *res = + list_entry(tmp, struct ldlm_resource, lr_hash); + + ldlm_resource_getref(res); + rc = ldlm_resource_foreach(res, ldlm_iter_helper, + &helper); + ldlm_resource_put(res); + if (rc == LDLM_ITER_STOP) + GOTO(out, rc); + } + } + out: + l_unlock(&ns->ns_lock); + RETURN(rc); +} + +/* Lock replay */ + +static int ldlm_chain_lock_for_replay(struct ldlm_lock *lock, void *closure) +{ + struct list_head *list = closure; + + /* we use l_pending_chain here, because it's unused on clients. */ + list_add(&lock->l_pending_chain, list); + return LDLM_ITER_CONTINUE; +} + +static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock, + int last) +{ + struct ptlrpc_request *req; + struct ldlm_request *body; + struct ldlm_reply *reply; + int rc, size; + int flags = LDLM_FL_REPLAY; + + flags |= lock->l_flags & + (LDLM_FL_BLOCK_GRANTED|LDLM_FL_BLOCK_CONV|LDLM_FL_BLOCK_WAIT); + + size = sizeof(*body); + req = ptlrpc_prep_req(imp, LDLM_ENQUEUE, 1, &size, NULL); + if (!req) + RETURN(-ENOMEM); + + body = lustre_msg_buf(req->rq_reqmsg, 0); + ldlm_lock2desc(lock, &body->lock_desc); + body->lock_flags = flags; + + ldlm_lock2handle(lock, &body->lock_handle1); + size = sizeof(*reply); + req->rq_replen = lustre_msg_size(1, &size); + + if (last) + req->rq_reqmsg->flags |= MSG_LAST_REPLAY; + + LDLM_DEBUG(lock, "replaying lock:"); + rc = ptlrpc_queue_wait(req); + if (rc != ELDLM_OK) + GOTO(out, rc); + + reply = lustre_msg_buf(req->rq_repmsg, 0); + memcpy(&lock->l_remote_handle, &reply->lock_handle, + sizeof(lock->l_remote_handle)); + LDLM_DEBUG(lock, "replayed lock:"); + out: + ptlrpc_req_finished(req); + RETURN(rc); +} + +int ldlm_replay_locks(struct obd_import *imp) +{ + struct ldlm_namespace *ns = imp->imp_obd->obd_namespace; + struct list_head list, *pos, *next; + struct ldlm_lock *lock; + int rc = 0; + + ENTRY; + INIT_LIST_HEAD(&list); + + l_lock(&ns->ns_lock); + (void)ldlm_namespace_foreach(ns, ldlm_chain_lock_for_replay, &list); + + list_for_each_safe(pos, next, &list) { + lock = list_entry(pos, struct ldlm_lock, l_pending_chain); + rc = replay_one_lock(imp, lock, (next == &list)); + if (rc) + break; /* or try to do the rest? */ + } + l_unlock(&ns->ns_lock); + RETURN(rc); +} diff --git a/lustre/lib/client.c b/lustre/lib/client.c index bf5fac3..00b0d48 100644 --- a/lustre/lib/client.c +++ b/lustre/lib/client.c @@ -90,7 +90,9 @@ int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf) if (!imp->imp_connection) RETURN(-ENOENT); - INIT_LIST_HEAD(&imp->imp_request_list); + INIT_LIST_HEAD(&imp->imp_replay_list); + INIT_LIST_HEAD(&imp->imp_sending_list); + INIT_LIST_HEAD(&imp->imp_delayed_list); spin_lock_init(&imp->imp_lock); ptlrpc_init_client(rq_portal, rp_portal, name, @@ -161,16 +163,17 @@ int client_obd_connect(struct lustre_handle *conn, struct obd_device *obd, request->rq_reqmsg->cookie = conn->cookie; c = class_conn2export(conn)->exp_connection = ptlrpc_connection_addref(request->rq_connection); + list_add(&imp->imp_chain, &c->c_imports); recovd_conn_manage(c, recovd, recover); + imp->imp_level = LUSTRE_CONN_CON; rc = ptlrpc_queue_wait(request); if (rc) GOTO(out_req, rc); if (rq_opc == MDS_CONNECT) imp->imp_flags |= IMP_REPLAYABLE; - list_add(&imp->imp_chain, &c->c_imports); - c->c_level = LUSTRE_CONN_FULL; + imp->imp_level = LUSTRE_CONN_FULL; imp->imp_handle.addr = request->rq_repmsg->addr; imp->imp_handle.cookie = request->rq_repmsg->cookie; diff --git a/lustre/lib/target.c b/lustre/lib/target.c index 7666663..141e155 100644 --- a/lustre/lib/target.c +++ b/lustre/lib/target.c @@ -101,8 +101,7 @@ int target_handle_connect(struct ptlrpc_request *req) dlmimp->imp_handle.cookie = req->rq_reqmsg->cookie; dlmimp->imp_obd = /* LDLM! */ NULL; spin_lock_init(&dlmimp->imp_lock); - - req->rq_connection->c_level = LUSTRE_CONN_FULL; + dlmimp->imp_level = LUSTRE_CONN_FULL; out: req->rq_status = rc; RETURN(rc); @@ -149,7 +148,6 @@ static int target_fence_failed_connection(struct ptlrpc_connection *conn) { ENTRY; - conn->c_level = LUSTRE_CONN_RECOVD; conn->c_recovd_data.rd_phase = RD_PREPARED; RETURN(0); diff --git a/lustre/llite/recover.c b/lustre/llite/recover.c index 8acd1bb..3310c34 100644 --- a/lustre/llite/recover.c +++ b/lustre/llite/recover.c @@ -12,7 +12,6 @@ #include #include #include -#include /* for IOC_LOV_SET_OSC_ACTIVE */ static int ll_retry_recovery(struct ptlrpc_connection *conn) { @@ -20,167 +19,33 @@ static int ll_retry_recovery(struct ptlrpc_connection *conn) RETURN(0); } -/* XXX looks a lot like super.c:invalidate_request_list, don't it? */ -static void abort_inflight_for_import(struct obd_import *imp) -{ - struct list_head *tmp, *n; - - /* Make sure that no new requests get processed for this import. - * ptlrpc_queue_wait must (and does) hold c_lock while testing this - * flags and then putting requests on sending_head or delayed_head. - */ - spin_lock(&imp->imp_connection->c_lock); - imp->imp_flags |= IMP_INVALID; - spin_unlock(&imp->imp_connection->c_lock); - - list_for_each_safe(tmp, n, &imp->imp_request_list) { - struct ptlrpc_request *req = - list_entry(tmp, struct ptlrpc_request, rq_list); - - if (req->rq_flags & PTL_RPC_FL_REPLIED) { - /* no need to replay, just discard */ - DEBUG_REQ(D_ERROR, req, "uncommitted"); - ptlrpc_req_finished(req); - } else { - DEBUG_REQ(D_ERROR, req, "inflight"); - req->rq_flags |= PTL_RPC_FL_ERR; - wake_up(&req->rq_wait_for_rep); - } - } - - list_for_each_safe(tmp, n, &imp->imp_connection->c_delayed_head) { - struct ptlrpc_request *req = - list_entry(tmp, struct ptlrpc_request, rq_list); - - if (req->rq_import != imp) - continue; - - DEBUG_REQ(D_ERROR, req, "aborting waiting req"); - req->rq_flags |= PTL_RPC_FL_ERR; - wake_up(&req->rq_wait_for_rep); - } -} - -static void set_osc_active(struct obd_import *imp, int active) -{ - struct obd_device *notify_obd = imp->imp_obd->u.cli.cl_containing_lov; - - if (notify_obd == NULL) - return; - - /* How gross is _this_? */ - if (!list_empty(¬ify_obd->obd_exports)) { - int rc; - struct lustre_handle fakeconn; - struct obd_ioctl_data ioc_data; - struct obd_export *exp = - list_entry(notify_obd->obd_exports.next, - struct obd_export, exp_obd_chain); - - fakeconn.addr = (__u64)(unsigned long)exp; - fakeconn.cookie = exp->exp_cookie; - ioc_data.ioc_inlbuf1 = imp->imp_obd->obd_uuid; - ioc_data.ioc_offset = active; - rc = obd_iocontrol(IOC_LOV_SET_OSC_ACTIVE, &fakeconn, - sizeof ioc_data, &ioc_data, NULL); - if (rc) - CERROR("disabling %s on LOV %p/%s: %d\n", - imp->imp_obd->obd_uuid, notify_obd, - notify_obd->obd_uuid, rc); - } else { - CDEBUG(D_HA, "No exports for obd %p/%s, can't notify about " - "%p\n", notify_obd, notify_obd->obd_uuid, - imp->imp_obd->obd_uuid); - } -} - -static void prepare_osc(struct obd_import *imp) -{ - struct ldlm_namespace *ns = imp->imp_obd->obd_namespace; - - CDEBUG(D_HA, "invalidating all locks for OST imp %p (to %s):\n", - imp, imp->imp_connection->c_remote_uuid); - ldlm_namespace_dump(ns); - ldlm_namespace_cleanup(ns, 1 /* no network ops */); - - abort_inflight_for_import(imp); - - set_osc_active(imp, 0 /* inactive */); -} - -static void prepare_mdc(struct obd_import *imp) -{ - struct ldlm_namespace *ns = imp->imp_obd->obd_namespace; - ldlm_cli_cancel_unused(ns, NULL, LDLM_FL_LOCAL_ONLY); -} - -static int ll_prepare_recovery(struct ptlrpc_connection *conn) -{ - struct list_head *tmp; - - list_for_each(tmp, &conn->c_imports) { - struct obd_import *imp = list_entry(tmp, struct obd_import, - imp_chain); - - if (imp->imp_obd->obd_type->typ_ops->o_brw) - prepare_osc(imp); - else - prepare_mdc(imp); - } - - return ptlrpc_run_recovery_upcall(conn); -} - -static void reconnect_osc(struct obd_import *imp) -{ - int rc = ptlrpc_reconnect_import(imp, OST_CONNECT); - if (rc == 0) - set_osc_active(imp, 1 /* active */); - else - CDEBUG(D_HA, "reconnection failed, not reactivating OSC %s\n", - imp->imp_obd->obd_uuid); -} - -static void reconnect_mdc(struct obd_import *imp) -{ - int rc = ptlrpc_reconnect_import(imp, MDS_CONNECT); - if (!rc) - ptlrpc_replay(imp, 0 /* all reqs */); - else if (rc == EALREADY) - ptlrpc_replay(imp, 1 /* only unreplied reqs */); -} - -static int ll_reconnect(struct ptlrpc_connection *conn) -{ - struct list_head *tmp; - - ENTRY; - list_for_each(tmp, &conn->c_imports) { - struct obd_import *imp = list_entry(tmp, struct obd_import, - imp_chain); - if (imp->imp_obd->obd_type->typ_ops->o_brw) { - reconnect_osc(imp); - } else { - reconnect_mdc(imp); - } - } - - conn->c_level = LUSTRE_CONN_FULL; - RETURN(0); -} - int ll_recover(struct recovd_data *rd, int phase) { struct ptlrpc_connection *conn = class_rd2conn(rd); + struct list_head *tmp; LASSERT(conn); ENTRY; switch (phase) { case PTLRPC_RECOVD_PHASE_PREPARE: - RETURN(ll_prepare_recovery(conn)); case PTLRPC_RECOVD_PHASE_RECOVER: - RETURN(ll_reconnect(conn)); + list_for_each(tmp, &conn->c_imports) { + struct obd_import *imp = + list_entry(tmp, struct obd_import, imp_chain); + + if (phase == PTLRPC_RECOVD_PHASE_PREPARE) { + spin_lock(&imp->imp_lock); + imp->imp_level = LUSTRE_CONN_RECOVD; + spin_unlock(&imp->imp_lock); + } + imp->imp_recover(imp, phase); + } + + if (phase == PTLRPC_RECOVD_PHASE_PREPARE) + RETURN(ptlrpc_run_recovery_upcall(conn)); + RETURN(0); + case PTLRPC_RECOVD_PHASE_FAILURE: RETURN(ll_retry_recovery(conn)); } diff --git a/lustre/llite/super.c b/lustre/llite/super.c index cb7136c..cb3ae90 100644 --- a/lustre/llite/super.c +++ b/lustre/llite/super.c @@ -152,9 +152,7 @@ static struct super_block * ll_read_super(struct super_block *sb, GOTO(out_free, sb = NULL); } -#warning Mike: is this the right place to raise the connection level? mdc_conn = sbi2mdc(sbi)->cl_import.imp_connection; - mdc_conn->c_level = LUSTRE_CONN_FULL; list_add(&mdc_conn->c_sb_chain, &sbi->ll_conn_chain); obd = class_uuid2obd(osc); diff --git a/lustre/llite/super25.c b/lustre/llite/super25.c index 30b582c..cd6544a 100644 --- a/lustre/llite/super25.c +++ b/lustre/llite/super25.c @@ -153,9 +153,7 @@ static int ll_fill_super(struct super_block *sb, void *data, int silent) GOTO(out_free, sb = NULL); } -#warning Peter: is this the right place to raise the connection level? mdc_conn = sbi2mdc(sbi)->cl_import.imp_connection; - mdc_conn->c_level = LUSTRE_CONN_FULL; list_add(&mdc_conn->c_sb_chain, &sbi->ll_conn_chain); obd = class_uuid2obd(osc); diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c index a9a5d9a..5058ef0 100644 --- a/lustre/mdc/mdc_request.c +++ b/lustre/mdc/mdc_request.c @@ -619,21 +619,73 @@ out: return rc; } -int mdc_attach(struct obd_device *dev, obd_count len, void *data) + +static int mdc_attach(struct obd_device *dev, obd_count len, void *data) { return lprocfs_reg_obd(dev, status_var_nm_1, dev); } -int mdc_detach(struct obd_device *dev) +static int mdc_detach(struct obd_device *dev) { return lprocfs_dereg_obd(dev); } + +static int mdc_recover(struct obd_import *imp, int phase) +{ + int rc; + ENTRY; + + switch(phase) { + case PTLRPC_RECOVD_PHASE_PREPARE: + ldlm_cli_cancel_unused(imp->imp_obd->obd_namespace, + NULL, LDLM_FL_LOCAL_ONLY); + RETURN(0); + case PTLRPC_RECOVD_PHASE_RECOVER: + rc = ptlrpc_reconnect_import(imp, MDS_CONNECT); + if (rc == EALREADY) + RETURN(ptlrpc_replay(imp, 0)); + if (rc) + RETURN(rc); + + rc = ptlrpc_replay(imp, 0 /* no last flag*/); + if (rc) + RETURN(rc); + + rc = ldlm_replay_locks(imp); + if (rc) + RETURN(rc); + + spin_lock(&imp->imp_lock); + imp->imp_level = LUSTRE_CONN_FULL; + spin_unlock(&imp->imp_lock); + + ptlrpc_wake_delayed(imp); + + rc = ptlrpc_resend(imp); + if (rc) + RETURN(rc); + + RETURN(0); + default: + RETURN(-EINVAL); + } +} + +static int mdc_connect(struct lustre_handle *conn, struct obd_device *obd, + obd_uuid_t cluuid, struct recovd_obd *recovd, + ptlrpc_recovery_cb_t recover) +{ + struct obd_import *imp = &obd->u.cli.cl_import; + imp->imp_recover = mdc_recover; + return client_obd_connect(conn, obd, cluuid, recovd, recover); +} + struct obd_ops mdc_obd_ops = { o_attach: mdc_attach, o_detach: mdc_detach, o_setup: client_obd_setup, o_cleanup: client_obd_cleanup, - o_connect: client_obd_connect, + o_connect: mdc_connect, o_disconnect: client_obd_disconnect, o_statfs: mdc_statfs, }; diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index 43ae0ca..532ec14 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -40,6 +40,7 @@ #include #include /* for mds_objid */ #include +#include /* for IOC_LOV_SET_OSC_ACTIVE */ #include #include #include @@ -51,12 +52,12 @@ extern struct lprocfs_vars status_var_nm_1[]; extern struct lprocfs_vars status_class_var[]; -int osc_attach(struct obd_device *dev, obd_count len, void *data) +static int osc_attach(struct obd_device *dev, obd_count len, void *data) { return lprocfs_reg_obd(dev, status_var_nm_1, dev); } -int osc_detach(struct obd_device *dev) +static int osc_detach(struct obd_device *dev) { return lprocfs_dereg_obd(dev); } @@ -878,12 +879,111 @@ out: return err; } +static void set_osc_active(struct obd_import *imp, int active) +{ + struct obd_device *notify_obd = imp->imp_obd->u.cli.cl_containing_lov; + + if (notify_obd == NULL) + return; + + /* How gross is _this_? */ + if (!list_empty(¬ify_obd->obd_exports)) { + int rc; + struct lustre_handle fakeconn; + struct obd_ioctl_data ioc_data; + struct obd_export *exp = + list_entry(notify_obd->obd_exports.next, + struct obd_export, exp_obd_chain); + + fakeconn.addr = (__u64)(unsigned long)exp; + fakeconn.cookie = exp->exp_cookie; + ioc_data.ioc_inlbuf1 = imp->imp_obd->obd_uuid; + ioc_data.ioc_offset = active; + rc = obd_iocontrol(IOC_LOV_SET_OSC_ACTIVE, &fakeconn, + sizeof ioc_data, &ioc_data, NULL); + if (rc) + CERROR("disabling %s on LOV %p/%s: %d\n", + imp->imp_obd->obd_uuid, notify_obd, + notify_obd->obd_uuid, rc); + } else { + CDEBUG(D_HA, "No exports for obd %p/%s, can't notify about " + "%p\n", notify_obd, notify_obd->obd_uuid, + imp->imp_obd->obd_uuid); + } +} + + +/* XXX looks a lot like super.c:invalidate_request_list, don't it? */ +static void abort_inflight_for_import(struct obd_import *imp) +{ + struct list_head *tmp, *n; + + /* Make sure that no new requests get processed for this import. + * ptlrpc_queue_wait must (and does) hold imp_lock while testing this + * flag and then putting requests on sending_list or delayed_list. + */ + spin_lock(&imp->imp_lock); + imp->imp_flags |= IMP_INVALID; + spin_unlock(&imp->imp_lock); + + list_for_each_safe(tmp, n, &imp->imp_sending_list) { + struct ptlrpc_request *req = + list_entry(tmp, struct ptlrpc_request, rq_list); + + DEBUG_REQ(D_HA, req, "inflight"); + req->rq_flags |= PTL_RPC_FL_ERR; + wake_up(&req->rq_wait_for_rep); + } + + list_for_each_safe(tmp, n, &imp->imp_delayed_list) { + struct ptlrpc_request *req = + list_entry(tmp, struct ptlrpc_request, rq_list); + + DEBUG_REQ(D_HA, req, "aborting waiting req"); + req->rq_flags |= PTL_RPC_FL_ERR; + wake_up(&req->rq_wait_for_rep); + } +} + +static int osc_recover(struct obd_import *imp, int phase) +{ + int rc; + ENTRY; + + switch(phase) { + case PTLRPC_RECOVD_PHASE_PREPARE: { + struct ldlm_namespace *ns = imp->imp_obd->obd_namespace; + ldlm_namespace_cleanup(ns, 1 /* no network ops */); + abort_inflight_for_import(imp); + set_osc_active(imp, 0 /* inactive */); + RETURN(0); + } + case PTLRPC_RECOVD_PHASE_RECOVER: + rc = ptlrpc_reconnect_import(imp, OST_CONNECT); + if (rc) + RETURN(rc); + set_osc_active(imp, 1 /* active */); + RETURN(0); + default: + RETURN(-EINVAL); + } +} + +static int osc_connect(struct lustre_handle *conn, struct obd_device *obd, + obd_uuid_t cluuid, struct recovd_obd *recovd, + ptlrpc_recovery_cb_t recover) +{ + struct obd_import *imp = &obd->u.cli.cl_import; + imp->imp_recover = osc_recover; + return client_obd_connect(conn, obd, cluuid, recovd, recover); +} + struct obd_ops osc_obd_ops = { o_attach: osc_attach, o_detach: osc_detach, o_setup: client_obd_setup, o_cleanup: client_obd_cleanup, - o_connect: client_obd_connect, + o_connect: osc_connect, o_disconnect: client_obd_disconnect, o_statfs: osc_statfs, o_packmd: osc_packmd, diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c index 8f4aceb..b60ec1f 100644 --- a/lustre/ptlrpc/client.c +++ b/lustre/ptlrpc/client.c @@ -192,7 +192,6 @@ static int ll_sync_brw_timeout(void *data) CERROR("IO of %d pages to/from %s:%d (conn %p) timed out\n", desc->bd_page_count, desc->bd_connection->c_remote_uuid, desc->bd_portal, desc->bd_connection); - desc->bd_connection->c_level = LUSTRE_CONN_RECOVD; /* This one will "never" arrive, don't wait for it. */ if (atomic_dec_and_test(&set->brw_refcount)) @@ -284,20 +283,7 @@ struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, int opcode, request->rq_connection = ptlrpc_connection_addref(conn); INIT_LIST_HEAD(&request->rq_list); - /* - * This will be reduced once when the sender is finished (waiting for - * reply, f.e.), and once when the request has been committed and is - * removed from the to-be-committed list. - * - * Also, the refcount will be increased in ptl_send_rpc immediately - * before we hand it off to portals, and there will be a corresponding - * decrease in request_out_cb (which is called to indicate that portals - * is finished with the request, and it can be safely freed). - * - * (Except in the DLM server case, where it will be dropped twice - * by the sender, and then the last time by request_out_callback.) - */ - atomic_set(&request->rq_refcount, 2); + atomic_set(&request->rq_refcount, 1); spin_lock(&imp->imp_lock); request->rq_xid = HTON__u32(++imp->imp_last_xid); @@ -382,29 +368,9 @@ static int ptlrpc_check_reply(struct ptlrpc_request *req) { int rc = 0; + ENTRY; if (req->rq_repmsg != NULL) { - struct obd_import *imp = req->rq_import; - struct ptlrpc_connection *conn = imp->imp_connection; - ENTRY; - if (req->rq_level > conn->c_level) { - DEBUG_REQ(D_HA, req, - "recovery started, ignoring (%d > %d)", - req->rq_level, conn->c_level); - req->rq_repmsg = NULL; - GOTO(out, rc = 0); - } req->rq_transno = NTOH__u64(req->rq_repmsg->transno); - spin_lock(&imp->imp_lock); - if (req->rq_transno > imp->imp_max_transno) { - imp->imp_max_transno = req->rq_transno; - } else if (req->rq_transno != 0) { - if (conn->c_level == LUSTRE_CONN_FULL) { - CERROR("got transno "LPD64" after " - LPD64": recovery may not work\n", - req->rq_transno, imp->imp_max_transno); - } - } - spin_unlock(&imp->imp_lock); req->rq_flags |= PTL_RPC_FL_REPLIED; GOTO(out, rc = 1); } @@ -474,7 +440,7 @@ void ptlrpc_free_committed(struct obd_import *imp) struct list_head *tmp, *saved; struct ptlrpc_request *req; - list_for_each_safe(tmp, saved, &imp->imp_request_list) { + list_for_each_safe(tmp, saved, &imp->imp_replay_list) { req = list_entry(tmp, struct ptlrpc_request, rq_list); if (req->rq_flags & PTL_RPC_FL_REPLAY) { @@ -482,20 +448,6 @@ void ptlrpc_free_committed(struct obd_import *imp) continue; } - /* If neither replied-to nor restarted, keep it. */ - if (!(req->rq_flags & - (PTL_RPC_FL_REPLIED | PTL_RPC_FL_RESTART))) { - DEBUG_REQ(D_HA, req, "keeping (in-flight)"); - continue; - } - - /* This needs to match the commit test in ptlrpc_queue_wait() */ - if (!(req->rq_import->imp_flags & IMP_REPLAYABLE) || - req->rq_transno == 0) { - DEBUG_REQ(D_HA, req, "keeping (queue_wait will free)"); - continue; - } - /* not yet committed */ if (req->rq_transno > imp->imp_peer_committed_transno) break; @@ -519,7 +471,7 @@ void ptlrpc_cleanup_client(struct obd_import *imp) LASSERT(conn); spin_lock(&imp->imp_lock); - list_for_each_safe(tmp, saved, &imp->imp_request_list) { + list_for_each_safe(tmp, saved, &imp->imp_replay_list) { req = list_entry(tmp, struct ptlrpc_request, rq_list); /* XXX we should make sure that nobody's sleeping on these! */ @@ -599,14 +551,15 @@ static int expired_request(void *data) RETURN(1); req->rq_timeout = 0; - req->rq_connection->c_level = LUSTRE_CONN_RECOVD; recovd_conn_fail(req->rq_import->imp_connection); +#if 0 /* If this request is for recovery or other primordial tasks, * don't go back to sleep. */ if (req->rq_level < LUSTRE_CONN_FULL) RETURN(1); +#endif RETURN(0); } @@ -618,21 +571,15 @@ static int interrupted_request(void *data) RETURN(1); /* ignored, as of this writing */ } -/* If we're being torn down by umount -f, or the import has been - * invalidated (such as by an OST failure), the request must fail with - * -EIO. +/* If the import has been invalidated (such as by an OST failure), the + * request must fail with -EIO. * - * Must be called with conn->c_lock held, will drop it if it returns -EIO. - * - * XXX this should just be testing the import, and umount_begin shouldn't touch - * XXX the connection. + * Must be called with imp_lock held, will drop it if it returns -EIO. */ -#define EIO_IF_INVALID(conn, req) \ -if ((conn->c_flags & CONN_INVALID) || \ - (req->rq_import->imp_flags & IMP_INVALID)) { \ - DEBUG_REQ(D_ERROR, req, "%s_INVALID:", \ - (conn->c_flags & CONN_INVALID) ? "CONN" : "IMP"); \ - spin_unlock(&conn->c_lock); \ +#define EIO_IF_INVALID(req) \ +if (req->rq_import->imp_flags & IMP_INVALID) { \ + DEBUG_REQ(D_ERROR, req, "IMP_INVALID:"); \ + spin_unlock(&imp->imp_lock); \ RETURN(-EIO); \ } @@ -645,30 +592,30 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req) ENTRY; init_waitqueue_head(&req->rq_wait_for_rep); - req->rq_reqmsg->status = HTON__u32(current->pid); /* for distributed debugging */ + + /* for distributed debugging */ + req->rq_reqmsg->status = HTON__u32(current->pid); CDEBUG(D_RPCTRACE, "Sending RPC pid:xid:nid:opc %d:"LPU64":%x:%d\n", NTOH__u32(req->rq_reqmsg->status), req->rq_xid, conn->c_peer.peer_nid, NTOH__u32(req->rq_reqmsg->opc)); - - /* XXX probably both an import and connection level are needed */ - if (req->rq_level > conn->c_level) { - spin_lock(&conn->c_lock); - EIO_IF_INVALID(conn, req); + if (req->rq_level > imp->imp_level) { + spin_lock(&imp->imp_lock); + EIO_IF_INVALID(req); list_del(&req->rq_list); - list_add_tail(&req->rq_list, &conn->c_delayed_head); - spin_unlock(&conn->c_lock); + list_add_tail(&req->rq_list, &imp->imp_delayed_list); + spin_unlock(&imp->imp_lock); - DEBUG_REQ(D_HA, req, "waiting for recovery: (%d < %d)", - req->rq_level, conn->c_level); + DEBUG_REQ(D_HA, req, "\"%s\" waiting for recovery: (%d < %d)", + current->comm, req->rq_level, imp->imp_level); lwi = LWI_INTR(NULL, NULL); rc = l_wait_event(req->rq_wait_for_rep, - (req->rq_level <= conn->c_level) || + (req->rq_level <= imp->imp_level) || (req->rq_flags & PTL_RPC_FL_ERR), &lwi); - spin_lock(&conn->c_lock); + spin_lock(&imp->imp_lock); list_del_init(&req->rq_list); - spin_unlock(&conn->c_lock); + spin_unlock(&imp->imp_lock); if (req->rq_flags & PTL_RPC_FL_ERR) RETURN(-EIO); @@ -680,12 +627,12 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req) } resend: req->rq_timeout = obd_timeout; - spin_lock(&conn->c_lock); - EIO_IF_INVALID(conn, req); + spin_lock(&imp->imp_lock); + EIO_IF_INVALID(req); - list_del(&req->rq_list); - list_add_tail(&req->rq_list, &imp->imp_request_list); - spin_unlock(&conn->c_lock); + LASSERT(list_empty(&req->rq_list)); + list_add_tail(&req->rq_list, &imp->imp_sending_list); + spin_unlock(&imp->imp_lock); rc = ptl_send_rpc(req); if (rc) { CDEBUG(D_HA, "error %d, opcode %d, need recovery\n", rc, @@ -701,6 +648,10 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req) l_wait_event(req->rq_wait_for_rep, ptlrpc_check_reply(req), &lwi); DEBUG_REQ(D_NET, req, "-- done sleeping"); + spin_lock(&imp->imp_lock); + list_del_init(&req->rq_list); + spin_unlock(&imp->imp_lock); + if (req->rq_flags & PTL_RPC_FL_ERR) { ptlrpc_abort(req); GOTO(out, rc = -EIO); @@ -714,7 +665,6 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req) goto resend; } - // up(&cli->cli_rpc_sem); if (req->rq_flags & PTL_RPC_FL_INTR) { if (!(req->rq_flags & PTL_RPC_FL_TIMEOUT)) LBUG(); /* should only be interrupted if we timed out */ @@ -749,36 +699,33 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req) CDEBUG(D_NET, "--> buf %p len %d status %d\n", req->rq_repmsg, req->rq_replen, req->rq_repmsg->status); - spin_lock(&conn->c_lock); - /* Requests that aren't from replayable imports, or which don't have - * transno information, can be "committed" early. - * - * But don't commit anything that's kept indefinitely for replay (has - * the PTL_RPC_FL_REPLAY flag set), such as open requests. - * - * This needs to match the commit test in ptlrpc_free_committed(). - */ - if (!(req->rq_import->imp_flags & IMP_REPLAYABLE) || - (req->rq_repmsg->transno == 0 && - (req->rq_flags & PTL_RPC_FL_REPLAY) == 0)) { - /* This import doesn't support replay, so we can just "commit" - * this request now. - */ - DEBUG_REQ(D_HA, req, "not replayable, committing:"); - list_del_init(&req->rq_list); - __ptlrpc_req_finished(req, 1); - } if (req->rq_import->imp_flags & IMP_REPLAYABLE) { + spin_lock(&imp->imp_lock); + if (req->rq_flags & PTL_RPC_FL_REPLAY || req->rq_transno != 0) { + /* Balanced in ptlrpc_free_committed, usually. */ + atomic_inc(&req->rq_refcount); + list_add_tail(&req->rq_list, &imp->imp_replay_list); + } + + if (req->rq_transno > imp->imp_max_transno) { + imp->imp_max_transno = req->rq_transno; + } else if (req->rq_transno != 0 && + imp->imp_level == LUSTRE_CONN_FULL) { + CERROR("got transno "LPD64" after "LPD64": recovery " + "may not work\n", req->rq_transno, + imp->imp_max_transno); + } + /* Replay-enabled imports return commit-status information. */ imp->imp_peer_last_xid = req->rq_repmsg->last_xid; - imp->imp_peer_committed_transno = + imp->imp_peer_committed_transno = req->rq_repmsg->last_committed; + spin_unlock(&imp->imp_lock); ptlrpc_free_committed(imp); } rc = ptlrpc_check_status(req); - spin_unlock(&conn->c_lock); EXIT; out: diff --git a/lustre/ptlrpc/connection.c b/lustre/ptlrpc/connection.c index 2458b08..2182591 100644 --- a/lustre/ptlrpc/connection.c +++ b/lustre/ptlrpc/connection.c @@ -79,7 +79,6 @@ struct ptlrpc_connection *ptlrpc_get_connection(struct lustre_peer *peer, if (c == NULL) GOTO(out, c); - c->c_level = LUSTRE_CONN_NEW; c->c_generation = 1; c->c_epoch = 1; c->c_bootcount = 0; diff --git a/lustre/ptlrpc/recover.c b/lustre/ptlrpc/recover.c index 9d955e6..9dddf78 100644 --- a/lustre/ptlrpc/recover.c +++ b/lustre/ptlrpc/recover.c @@ -110,8 +110,6 @@ int ptlrpc_run_recovery_upcall(struct ptlrpc_connection *conn) int rc; ENTRY; - conn->c_level = LUSTRE_CONN_RECOVD; - argv[0] = obd_recovery_upcall; argv[1] = conn->c_remote_uuid; argv[2] = NULL; @@ -138,159 +136,140 @@ int ptlrpc_run_recovery_upcall(struct ptlrpc_connection *conn) RETURN(0); } -#define REPLAY_COMMITTED 0 /* Fully processed (commit + reply). */ -#define REPLAY_REPLAY 1 /* Forced-replay (e.g. open). */ -#define REPLAY_RESEND 2 /* Resend required. */ -#define REPLAY_RESEND_IGNORE 3 /* Resend, ignore the reply (already saw it). */ -#define REPLAY_RESTART 4 /* Have to restart the call, sorry! */ - -static int replay_state(struct ptlrpc_request *req, __u64 committed) +int ptlrpc_replay(struct obd_import *imp, int send_last_flag) { - /* This request must always be replayed. */ - if (req->rq_flags & PTL_RPC_FL_REPLAY) - return REPLAY_REPLAY; + int rc = 0; + struct list_head *tmp, *pos; + struct ptlrpc_request *req; + __u64 committed = imp->imp_peer_committed_transno; + ENTRY; - /* Uncommitted request */ - if (req->rq_transno > committed) { - if (req->rq_flags & PTL_RPC_FL_REPLIED) { - /* Saw reply, so resend and ignore new reply. */ - return REPLAY_RESEND_IGNORE; - } + /* It might have committed some after we last spoke, so make sure we + * get rid of them now. + */ + ptlrpc_free_committed(imp); - /* Didn't see reply either, so resend. */ - return REPLAY_RESEND; + spin_lock(&imp->imp_lock); + + CDEBUG(D_HA, "import %p from %s has committed "LPD64"\n", + imp, imp->imp_obd->u.cli.cl_target_uuid, committed); + + list_for_each(tmp, &imp->imp_replay_list) { + req = list_entry(tmp, struct ptlrpc_request, rq_list); + DEBUG_REQ(D_HA, req, "RETAINED: "); } - /* This request has been committed and we saw the reply. Goodbye! */ - if (req->rq_flags & PTL_RPC_FL_REPLIED) - return REPLAY_COMMITTED; + list_for_each_safe(tmp, pos, &imp->imp_replay_list) { + req = list_entry(tmp, struct ptlrpc_request, rq_list); + + if (req->rq_transno == imp->imp_max_transno && + send_last_flag) { + req->rq_reqmsg->flags |= MSG_LAST_REPLAY; + DEBUG_REQ(D_HA, req, "LAST_REPLAY:"); + } else { + DEBUG_REQ(D_HA, req, "REPLAY:"); + } + + rc = ptlrpc_replay_req(req); + req->rq_reqmsg->flags &= ~MSG_LAST_REPLAY; - /* Request committed, but we didn't see the reply: have to restart. */ - return REPLAY_RESTART; + if (rc) { + CERROR("recovery replay error %d for req %Ld\n", + rc, req->rq_xid); + GOTO(out, rc); + } + } + + out: + spin_unlock(&imp->imp_lock); + return rc; } -static char *replay_state2str(int state) { - static char *state_strings[] = { - "COMMITTED", "REPLAY", "RESEND", "RESEND_IGNORE", "RESTART", - }; - static char *unknown_state = "UNKNOWN"; +#define NO_RESEND 0 /* No action required. */ +#define RESEND 1 /* Resend required. */ +#define RESEND_IGNORE 2 /* Resend, ignore the reply (already saw it). */ +#define RESTART 3 /* Have to restart the call, sorry! */ + +static int resend_type(struct ptlrpc_request *req, __u64 committed) +{ + if (req->rq_transno < committed) { + if (req->rq_flags & PTL_RPC_FL_REPLIED) { + /* Saw the reply and it was committed, no biggie. */ + DEBUG_REQ(D_HA, req, "NO_RESEND"); + return NO_RESEND; + } + /* Request committed, but no reply: have to restart. */ + return RESTART; + } - if (state < 0 || - state > (sizeof(state_strings) / sizeof(state_strings[0]))) { - return unknown_state; + if (req->rq_flags & PTL_RPC_FL_REPLIED) { + /* Saw reply, so resend and ignore new reply. */ + return RESEND_IGNORE; } - return state_strings[state]; + /* Didn't see reply either, so resend. */ + return RESEND; + } -int ptlrpc_replay(struct obd_import *imp, int unreplied_only) +int ptlrpc_resend(struct obd_import *imp) { - int rc = 0, state; + int rc = 0, type; struct list_head *tmp, *pos; struct ptlrpc_request *req; - struct ptlrpc_connection *conn = imp->imp_connection; __u64 committed = imp->imp_peer_committed_transno; + ENTRY; spin_lock(&imp->imp_lock); - - CDEBUG(D_HA, "import %p from %s has committed "LPD64"\n", - imp, imp->imp_obd->u.cli.cl_target_uuid, committed); - - list_for_each(tmp, &imp->imp_request_list) { - req = list_entry(tmp, struct ptlrpc_request, rq_list); - state = replay_state(req, committed); - DEBUG_REQ(D_HA, req, "SENDING: %s: ", replay_state2str(state)); - } - - list_for_each(tmp, &conn->c_delayed_head) { + list_for_each(tmp, &imp->imp_sending_list) { req = list_entry(tmp, struct ptlrpc_request, rq_list); - state = replay_state(req, committed); - DEBUG_REQ(D_HA, req, "DELAYED: %s: ", replay_state2str(state)); + DEBUG_REQ(D_HA, req, "SENDING: "); } - list_for_each_safe(tmp, pos, &imp->imp_request_list) { + list_for_each_safe(tmp, pos, &imp->imp_sending_list) { req = list_entry(tmp, struct ptlrpc_request, rq_list); - - if (unreplied_only) { - if (!(req->rq_flags & PTL_RPC_FL_REPLIED)) { - DEBUG_REQ(D_HA, req, "UNREPLIED:"); - ptlrpc_restart_req(req); - } - continue; - } - - state = replay_state(req, committed); - - if (req->rq_transno == imp->imp_max_transno) { - req->rq_reqmsg->flags |= MSG_LAST_REPLAY; - DEBUG_REQ(D_HA, req, "last for replay"); - LASSERT(state != REPLAY_COMMITTED); - } - - switch (state) { - case REPLAY_REPLAY: - DEBUG_REQ(D_HA, req, "REPLAY:"); - rc = ptlrpc_replay_req(req); -#if 0 -#error We should not hold a spinlock over such a lengthy operation. -#error If necessary, drop spinlock, do operation, re-get spinlock, restart loop. -#error If we need to avoid re-processint items, then delete them from the list -#error as they are replayed and re-add at the tail of this list, so the next -#error item to process will always be at the head of the list. -#endif - if (rc) { - CERROR("recovery replay error %d for req %Ld\n", - rc, req->rq_xid); - GOTO(out, rc); - } + + switch(resend_type(req, committed)) { + case NO_RESEND: break; - case REPLAY_COMMITTED: - DEBUG_REQ(D_ERROR, req, "COMMITTED:"); - /* XXX commit now? */ + case RESTART: + DEBUG_REQ(D_HA, req, "RESTART:"); + ptlrpc_restart_req(req); break; - case REPLAY_RESEND_IGNORE: + case RESEND_IGNORE: DEBUG_REQ(D_HA, req, "RESEND_IGNORE:"); - rc = ptlrpc_replay_req(req); + rc = ptlrpc_replay_req(req); if (rc) { - CERROR("request resend error %d for req %Ld\n", - rc, req->rq_xid); - GOTO(out, rc); + DEBUG_REQ(D_ERROR, req, "error %d resending:", + rc); + ptlrpc_restart_req(req); /* might as well */ } break; - case REPLAY_RESTART: - DEBUG_REQ(D_HA, req, "RESTART:"); - ptlrpc_restart_req(req); - break; - - case REPLAY_RESEND: + case RESEND: DEBUG_REQ(D_HA, req, "RESEND:"); ptlrpc_resend_req(req); break; - + default: LBUG(); } - } +} - conn->c_level = LUSTRE_CONN_FULL; - recovd_conn_fixed(conn); +void ptlrpc_wake_delayed(struct obd_import *imp) +{ + struct list_head *tmp, *pos; + struct ptlrpc_request *req; - CERROR("recovery complete on conn %p(%s), waking delayed reqs\n", - conn, conn->c_remote_uuid); - /* Finally, continue processing requests that blocked for recovery. */ - list_for_each_safe(tmp, pos, &conn->c_delayed_head) { + spin_lock(&imp->imp_lock); + list_for_each_safe(tmp, pos, &imp->imp_delayed_list) { req = list_entry(tmp, struct ptlrpc_request, rq_list); - DEBUG_REQ(D_HA, req, "WAKING: "); - ptlrpc_continue_req(req); + DEBUG_REQ(D_HA, req, "waking:"); + wake_up(&req->rq_wait_for_rep); } - - EXIT; - out: - spin_unlock(&conn->c_lock); - return rc; + spin_unlock(&imp->imp_lock); } diff --git a/lustre/ptlrpc/rpc.c b/lustre/ptlrpc/rpc.c index 7263ac0..c721993 100644 --- a/lustre/ptlrpc/rpc.c +++ b/lustre/ptlrpc/rpc.c @@ -272,6 +272,8 @@ EXPORT_SYMBOL(lustre_msg_buf); EXPORT_SYMBOL(ptlrpc_run_recovery_upcall); EXPORT_SYMBOL(ptlrpc_reconnect_import); EXPORT_SYMBOL(ptlrpc_replay); +EXPORT_SYMBOL(ptlrpc_resend); +EXPORT_SYMBOL(ptlrpc_wake_delayed); MODULE_AUTHOR("Cluster File Systems, Inc "); MODULE_DESCRIPTION("Lustre Request Processor v1.0"); -- 1.8.3.1