From 4d477d1468cf4be4c37681610b3d726fd27f229f Mon Sep 17 00:00:00 2001 From: shaver Date: Thu, 28 Nov 2002 20:21:24 +0000 Subject: [PATCH] Landing of b_recovery (at last). Highlights: - b=324: MDS recovery must replay transactions in strict transno sequence - b=325: getattr after OST failure returns -EIO - b=326: unlink after OST failure returns -EIO - b=400: new client can't join cluster after OST failure - b=403: multi-client access failure when OST fails - b=410: After an OST failure, lfind incorrectly displays file information - b=417: Freeing unreplayable requests twice (aed's fix from b_md) - b=402: (partial) give error for lstripe request that exceeds configured OSTs - much better support for reconnecting to MDS after network partition (still some lock-repeating issues to be resolved for some requests) - better support for connecting to multiple MDSes on one host (xid and transno and request_list are all per-import now) - track disconnecting clients in last_rcvd, for more reliable recovery - also, sync last_rcvd after connect/disconnect - reduced syslog/CERROR output for recovery (hi, Terry!) - server (DLM) timeout is half the system-wide timeout, to avoid cascading failure in the face of a dead client - don't wait for recovery to finish in order to send disconnect messages - removal of c_dying_head - don't wait for timeout to trigger recovery after ptl_send_rpc error - strict MDS transno ordering via mds_transno_sem (non-optimal, but correct) - many !handle -> IS_ERR(handle) fixes around mds_fs_start callers. - turn on client-eviction for bulk-timeouts in OST and MDS --- lustre/include/linux/lustre_export.h | 7 +- lustre/include/linux/lustre_ha.h | 2 +- lustre/include/linux/lustre_idl.h | 1 + lustre/include/linux/lustre_import.h | 10 +- lustre/include/linux/lustre_lib.h | 3 - lustre/include/linux/lustre_mds.h | 5 +- lustre/include/linux/lustre_net.h | 15 +- lustre/include/linux/obd.h | 45 +++--- lustre/ldlm/ldlm_lockd.c | 2 +- lustre/ldlm/ldlm_request.c | 3 - lustre/lib/client.c | 52 ++++-- lustre/lib/target.c | 4 +- lustre/llite/file.c | 9 +- lustre/llite/recover.c | 63 ++------ lustre/llite/super.c | 2 +- lustre/lov/lov_obd.c | 174 +++++++++++++------- lustre/mdc/mdc_reint.c | 1 - lustre/mdc/mdc_request.c | 9 +- lustre/mds/handler.c | 303 ++++++++++++++++++++++++++++++----- lustre/mds/mds_fs.c | 89 +++++++--- lustre/mds/mds_reint.c | 147 +++++++++++------ lustre/obdclass/class_obd.c | 42 ++--- lustre/obdclass/genops.c | 2 +- lustre/osc/osc_request.c | 23 +-- lustre/ost/ost_handler.c | 2 +- lustre/ptlrpc/client.c | 244 +++++++++++++--------------- lustre/ptlrpc/connection.c | 10 +- lustre/ptlrpc/recovd.c | 4 +- lustre/ptlrpc/recover.c | 132 +++++++++------ lustre/ptlrpc/rpc.c | 1 - lustre/utils/lconf | 27 ++-- 31 files changed, 900 insertions(+), 533 deletions(-) diff --git a/lustre/include/linux/lustre_export.h b/lustre/include/linux/lustre_export.h index 38551ac..dc2c0b5 100644 --- a/lustre/include/linux/lustre_export.h +++ b/lustre/include/linux/lustre_export.h @@ -22,21 +22,16 @@ struct lov_export_data { struct obd_export { __u64 exp_cookie; - struct lustre_handle exp_impconnh; struct list_head exp_obd_chain; struct list_head exp_conn_chain; struct obd_device *exp_obd; struct ptlrpc_connection *exp_connection; - struct ldlm_export_data exp_ldlm_data; /* can this go inside u? */ + struct ldlm_export_data exp_ldlm_data; union { struct mds_export_data eu_mds_data; struct filter_export_data eu_filter_data; struct lov_export_data eu_lov_data; } u; - void *exp_data; /* device specific data */ - int exp_desclen; - char *exp_desc; - obd_uuid_t exp_uuid; }; #define exp_mds_data u.eu_mds_data diff --git a/lustre/include/linux/lustre_ha.h b/lustre/include/linux/lustre_ha.h index f989e6d..8611e88 100644 --- a/lustre/include/linux/lustre_ha.h +++ b/lustre/include/linux/lustre_ha.h @@ -54,6 +54,6 @@ extern struct recovd_obd *ptlrpc_recovd; int ptlrpc_run_recovery_upcall(struct ptlrpc_connection *conn); int ptlrpc_reconnect_import(struct obd_import *imp, int rq_opc); -int ptlrpc_replay(struct ptlrpc_connection *conn); +int ptlrpc_replay(struct obd_import *imp, int unreplied_only); #endif diff --git a/lustre/include/linux/lustre_idl.h b/lustre/include/linux/lustre_idl.h index 2e01bde..ea75f08 100644 --- a/lustre/include/linux/lustre_idl.h +++ b/lustre/include/linux/lustre_idl.h @@ -127,6 +127,7 @@ struct lustre_msg { /* Flags that apply to all requests are in the bottom 16 bits */ #define MSG_GEN_FLAG_MASK 0x0000ffff +#define MSG_LAST_REPLAY 1 static inline int lustre_msg_get_flags(struct lustre_msg *msg) { diff --git a/lustre/include/linux/lustre_import.h b/lustre/include/linux/lustre_import.h index 13b39b7..893fd0a 100644 --- a/lustre/include/linux/lustre_import.h +++ b/lustre/include/linux/lustre_import.h @@ -21,9 +21,17 @@ struct obd_import { struct ptlrpc_client *imp_client; struct lustre_handle imp_handle; struct list_head imp_chain; + struct list_head imp_request_list; struct obd_device *imp_obd; int imp_flags; - /* XXX need a UUID here, I think, unless we just use the OBD's UUID */ + int imp_level; + __u64 imp_last_xid; + __u64 imp_max_transno; + __u64 imp_peer_last_xid; + __u64 imp_peer_committed_transno; + + /* Protects flags, level, *_xid, request_list */ + spinlock_t imp_lock; }; extern struct obd_import *class_conn2cliimp(struct lustre_handle *); diff --git a/lustre/include/linux/lustre_lib.h b/lustre/include/linux/lustre_lib.h index a1e325b..1c6e0fd 100644 --- a/lustre/include/linux/lustre_lib.h +++ b/lustre/include/linux/lustre_lib.h @@ -548,7 +548,6 @@ do { \ if (condition) \ break; \ if (__state == TASK_INTERRUPTIBLE && l_killable_pending(current)) {\ - CERROR("lwe: interrupt\n"); \ if (info->lwi_on_signal) \ info->lwi_on_signal(info->lwi_cb_data); \ ret = -EINTR; \ @@ -556,7 +555,6 @@ do { \ } \ if (info->lwi_timeout && !__timed_out) { \ if (schedule_timeout(info->lwi_timeout) == 0) { \ - CERROR("lwe: timeout\n"); \ __timed_out = 1; \ if (!info->lwi_on_timeout || \ info->lwi_on_timeout(info->lwi_cb_data)) { \ @@ -568,7 +566,6 @@ do { \ __state = TASK_INTERRUPTIBLE; \ /* Check for a pending interrupt. */ \ if (info->lwi_signals && l_killable_pending(current)) {\ - CERROR("lwe: pending interrupt\n"); \ if (info->lwi_on_signal) \ info->lwi_on_signal(info->lwi_cb_data); \ ret = -EINTR; \ diff --git a/lustre/include/linux/lustre_mds.h b/lustre/include/linux/lustre_mds.h index 67d8542..936ce99 100644 --- a/lustre/include/linux/lustre_mds.h +++ b/lustre/include/linux/lustre_mds.h @@ -194,8 +194,9 @@ int mdc_create_client(obd_uuid_t uuid, struct ptlrpc_client *cl); void mdc_store_inode_generation(struct ptlrpc_request *req, int reqoff, int repoff); -extern int mds_client_add(struct mds_export_data *med, int cl_off); -extern int mds_client_free(struct obd_export *exp); +int mds_client_add(struct mds_obd *mds, struct mds_export_data *med, + int cl_off); +int mds_client_free(struct obd_export *exp); /* mds/mds_fs.c */ struct mds_fs_operations { diff --git a/lustre/include/linux/lustre_net.h b/lustre/include/linux/lustre_net.h index f1a0870..d3aa4fd 100644 --- a/lustre/include/linux/lustre_net.h +++ b/lustre/include/linux/lustre_net.h @@ -78,19 +78,13 @@ struct ptlrpc_connection { __u32 c_bootcount; /* peer's boot count */ spinlock_t c_lock; /* also protects req->rq_list */ - __u32 c_xid_in; - __u32 c_xid_out; atomic_t c_refcount; __u64 c_token; __u64 c_remote_conn; __u64 c_remote_token; - __u64 c_last_xid; /* protected by c_lock */ - __u64 c_last_committed;/* protected by c_lock */ - struct list_head c_delayed_head;/* delayed until post-recovery */ - struct list_head c_sending_head;/* protected by c_lock */ - struct list_head c_dying_head; /* protected by c_lock */ + struct list_head c_delayed_head;/* delayed until post-recovery XXX imp? */ struct recovd_data c_recovd_data; struct list_head c_imports; @@ -120,7 +114,7 @@ struct ptlrpc_client { #define PTL_RPC_FL_ERR (1 << 5) #define PTL_RPC_FL_TIMEOUT (1 << 6) #define PTL_RPC_FL_RESEND (1 << 7) -#define PTL_RPC_FL_RECOVERY (1 << 8) /* retransmission for recovery */ +#define PTL_RPC_FL_RESTART (1 << 8) /* operation must be restarted */ #define PTL_RPC_FL_FINISHED (1 << 9) #define PTL_RPC_FL_RETAIN (1 << 10) /* retain for replay after reply */ #define PTL_RPC_FL_REPLAY (1 << 11) /* replay upon recovery */ @@ -169,13 +163,13 @@ struct ptlrpc_request { #define DEBUG_REQ(level, req, fmt, args...) \ do { \ CDEBUG(level, \ - "@@@ " fmt " req x"LPD64"/t"LPD64" o%d->%s:%d lens %d/%d fl " \ + "@@@ " fmt " req x"LPD64"/t"LPD64" o%d->%s:%d lens %d/%d ref %d fl " \ "%x\n" , ## args, req->rq_xid, req->rq_transno, \ req->rq_reqmsg ? req->rq_reqmsg->opc : -1, \ req->rq_connection ? (char *)req->rq_connection->c_remote_uuid : "", \ (req->rq_import && req->rq_import->imp_client) ? \ req->rq_import->imp_client->cli_request_portal : -1, \ - req->rq_reqlen, req->rq_replen, req->rq_flags); \ + req->rq_reqlen, req->rq_replen, req->rq_refcount, req->rq_flags); \ } while (0) struct ptlrpc_bulk_page { @@ -326,7 +320,6 @@ struct ptlrpc_bulk_desc *ptlrpc_prep_bulk(struct ptlrpc_connection *); void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *bulk); struct ptlrpc_bulk_page *ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc); void ptlrpc_free_bulk_page(struct ptlrpc_bulk_page *page); -int ptlrpc_check_status(struct ptlrpc_request *req, int err); /* rpc/service.c */ struct ptlrpc_service * diff --git a/lustre/include/linux/obd.h b/lustre/include/linux/obd.h index 70e1369..2235a1e 100644 --- a/lustre/include/linux/obd.h +++ b/lustre/include/linux/obd.h @@ -119,25 +119,34 @@ struct client_obd { #define IOC_OSC_MAX_NR 50 struct mds_obd { - struct ptlrpc_service *mds_service; - - char *mds_fstype; - struct super_block *mds_sb; - struct super_operations *mds_sop; - struct vfsmount *mds_vfsmnt; - struct obd_run_ctxt mds_ctxt; - struct file_operations *mds_fop; - struct inode_operations *mds_iop; + struct ptlrpc_service *mds_service; + + char *mds_fstype; + struct super_block *mds_sb; + struct super_operations *mds_sop; + struct vfsmount *mds_vfsmnt; + struct obd_run_ctxt mds_ctxt; + struct file_operations *mds_fop; + struct inode_operations *mds_iop; struct address_space_operations *mds_aops; - struct mds_fs_operations *mds_fsops; - int mds_max_mdsize; - struct file *mds_rcvd_filp; - spinlock_t mds_last_lock; - __u64 mds_last_committed; - __u64 mds_last_rcvd; - __u64 mds_mount_count; - struct ll_fid mds_rootfid; - struct mds_server_data *mds_server_data; + struct mds_fs_operations *mds_fsops; + + int mds_max_mdsize; + struct file *mds_rcvd_filp; + struct semaphore mds_transno_sem; + __u64 mds_last_committed; + __u64 mds_last_rcvd; + __u64 mds_mount_count; + struct ll_fid mds_rootfid; + struct mds_server_data *mds_server_data; + + wait_queue_head_t mds_next_transno_waitq; + __u64 mds_next_recovery_transno; + int mds_recoverable_clients; + struct list_head mds_recovery_queue; + struct list_head mds_delayed_reply_queue; + spinlock_t mds_processing_task_lock; + pid_t mds_processing_task; }; struct ldlm_obd { diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index a1524bf..12af650 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -77,7 +77,7 @@ static int ldlm_add_waiting_lock(struct ldlm_lock *lock) LASSERT(list_empty(&lock->l_pending_chain)); spin_lock_bh(&waiting_locks_spinlock); - lock->l_callback_timeout = jiffies + (obd_timeout * HZ); + lock->l_callback_timeout = jiffies + (obd_timeout * HZ / 2); timeout_rounded = round_timeout(lock->l_callback_timeout); diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index c23fd85..6672c3e 100644 --- a/lustre/ldlm/ldlm_request.c +++ b/lustre/ldlm/ldlm_request.c @@ -230,7 +230,6 @@ int ldlm_cli_enqueue(struct lustre_handle *connh, LDLM_DEBUG(lock, "sending request"); rc = ptlrpc_queue_wait(req); - rc = ptlrpc_check_status(req, rc); if (rc != ELDLM_OK) { LASSERT(!is_replay); @@ -407,7 +406,6 @@ int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, int *flags) req->rq_replen = lustre_msg_size(1, &size); rc = ptlrpc_queue_wait(req); - rc = ptlrpc_check_status(req, rc); if (rc != ELDLM_OK) GOTO(out, rc); @@ -464,7 +462,6 @@ int ldlm_cli_cancel(struct lustre_handle *lockh) req->rq_replen = lustre_msg_size(0, NULL); rc = ptlrpc_queue_wait(req); - rc = ptlrpc_check_status(req, rc); ptlrpc_req_finished(req); if (rc != ELDLM_OK) GOTO(out, rc); diff --git a/lustre/lib/client.c b/lustre/lib/client.c index c5590fa..bf5fac3 100644 --- a/lustre/lib/client.c +++ b/lustre/lib/client.c @@ -46,6 +46,7 @@ int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf) int rq_portal, rp_portal; char *name; struct client_obd *cli = &obddev->u.cli; + struct obd_import *imp = &cli->cl_import; obd_uuid_t server_uuid; ENTRY; @@ -85,14 +86,17 @@ int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf) memcpy(server_uuid, data->ioc_inlbuf2, MIN(data->ioc_inllen2, sizeof(server_uuid))); - cli->cl_import.imp_connection = ptlrpc_uuid_to_connection(server_uuid); - if (!cli->cl_import.imp_connection) + imp->imp_connection = ptlrpc_uuid_to_connection(server_uuid); + if (!imp->imp_connection) RETURN(-ENOENT); + + INIT_LIST_HEAD(&imp->imp_request_list); + spin_lock_init(&imp->imp_lock); ptlrpc_init_client(rq_portal, rp_portal, name, &obddev->obd_ldlm_client); - cli->cl_import.imp_client = &obddev->obd_ldlm_client; - cli->cl_import.imp_obd = obddev; + imp->imp_client = &obddev->obd_ldlm_client; + imp->imp_obd = obddev; cli->cl_max_mds_easize = sizeof(struct lov_mds_md); @@ -122,6 +126,7 @@ int client_obd_connect(struct lustre_handle *conn, struct obd_device *obd, char *tmp[] = {cli->cl_target_uuid, obd->obd_uuid}; int rq_opc = (obd->obd_type->typ_ops->o_brw) ? OST_CONNECT :MDS_CONNECT; struct ptlrpc_connection *c; + struct obd_import *imp = &cli->cl_import; ENTRY; down(&cli->cl_sem); @@ -140,6 +145,12 @@ int client_obd_connect(struct lustre_handle *conn, struct obd_device *obd, if (obd->obd_namespace == NULL) GOTO(out_disco, rc = -ENOMEM); + INIT_LIST_HEAD(&imp->imp_chain); + imp->imp_last_xid = 0; + imp->imp_max_transno = 0; + imp->imp_peer_last_xid = 0; + imp->imp_peer_committed_transno = 0; + request = ptlrpc_prep_req(&cli->cl_import, rq_opc, 2, size, tmp); if (!request) GOTO(out_ldlm, rc = -ENOMEM); @@ -153,16 +164,15 @@ int client_obd_connect(struct lustre_handle *conn, struct obd_device *obd, recovd_conn_manage(c, recovd, recover); rc = ptlrpc_queue_wait(request); - rc = ptlrpc_check_status(request, rc); if (rc) GOTO(out_req, rc); if (rq_opc == MDS_CONNECT) - cli->cl_import.imp_flags |= IMP_REPLAYABLE; - list_add(&cli->cl_import.imp_chain, &c->c_imports); + imp->imp_flags |= IMP_REPLAYABLE; + list_add(&imp->imp_chain, &c->c_imports); c->c_level = LUSTRE_CONN_FULL; - cli->cl_import.imp_handle.addr = request->rq_repmsg->addr; - cli->cl_import.imp_handle.cookie = request->rq_repmsg->cookie; + imp->imp_handle.addr = request->rq_repmsg->addr; + imp->imp_handle.cookie = request->rq_repmsg->cookie; EXIT; out_req: @@ -171,9 +181,21 @@ out_req: out_ldlm: ldlm_namespace_free(obd->obd_namespace); obd->obd_namespace = NULL; + if (rq_opc == MDS_CONNECT) { + /* Don't class_disconnect OSCs, because the LOV + * cares about them even if they can't connect to the + * OST. + * + * This is leak-bait, but without either a way to + * operate on the osc without an export or separate + * methods for connect-to-osc and connect-osc-to-ost + * it's not clear what else to do. + */ out_disco: - class_disconnect(conn); - MOD_DEC_USE_COUNT; + cli->cl_conn_count--; + class_disconnect(conn); + MOD_DEC_USE_COUNT; + } } out_sem: up(&cli->cl_sem); @@ -210,12 +232,16 @@ int client_obd_disconnect(struct lustre_handle *conn) ldlm_namespace_free(obd->obd_namespace); obd->obd_namespace = NULL; - request = ptlrpc_prep_req(&cli->cl_import, rq_opc, 0, NULL, NULL); + request = ptlrpc_prep_req(&cli->cl_import, rq_opc, 0, NULL, + NULL); if (!request) GOTO(out_disco, rc = -ENOMEM); - + request->rq_replen = lustre_msg_size(0, NULL); + /* Process disconnects even if we're waiting for recovery. */ + request->rq_level = LUSTRE_CONN_RECOVD; + rc = ptlrpc_queue_wait(request); if (rc) GOTO(out_req, rc); diff --git a/lustre/lib/target.c b/lustre/lib/target.c index 8786ee8..7666663 100644 --- a/lustre/lib/target.c +++ b/lustre/lib/target.c @@ -69,7 +69,8 @@ int target_handle_connect(struct ptlrpc_request *req) rc = obd_connect(&conn, target, cluuid, ptlrpc_recovd, target_revoke_connection); - if (rc) + /* EALREADY indicates a reconnection, send the reply normally. */ + if (rc && rc != EALREADY) GOTO(out, rc); rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg); @@ -99,6 +100,7 @@ int target_handle_connect(struct ptlrpc_request *req) dlmimp->imp_handle.addr = req->rq_reqmsg->addr; dlmimp->imp_handle.cookie = req->rq_reqmsg->cookie; dlmimp->imp_obd = /* LDLM! */ NULL; + spin_lock_init(&dlmimp->imp_lock); req->rq_connection->c_level = LUSTRE_CONN_FULL; out: diff --git a/lustre/llite/file.c b/lustre/llite/file.c index dbc0485..a67b023 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -158,7 +158,8 @@ out_mdc: mdc_close(&sbi->ll_mdc_conn, inode->i_ino, S_IFREG, &fd->fd_mdshandle, &req); out_req: - ptlrpc_free_req(req); + ptlrpc_req_finished(req); /* once for reply */ + ptlrpc_req_finished(req); /* once for an early "commit" */ //out_fd: fd->fd_mdshandle.cookie = DEAD_HANDLE_MAGIC; kmem_cache_free(ll_file_data_slab, fd); @@ -344,11 +345,7 @@ out_mdc: rc = -abs(rc2); GOTO(out_fd, rc); } - CDEBUG(D_HA, "matched req %p xid "LPD64" transno "LPD64" op " - "%d->%s:%d\n", fd->fd_req, fd->fd_req->rq_xid, - fd->fd_req->rq_repmsg->transno, fd->fd_req->rq_reqmsg->opc, - fd->fd_req->rq_import->imp_connection->c_remote_uuid, - fd->fd_req->rq_import->imp_client->cli_request_portal); + DEBUG_REQ(D_HA, fd->fd_req, "matched open for this close: "); ptlrpc_req_finished(fd->fd_req); if (atomic_dec_and_test(&lli->lli_open_count)) { diff --git a/lustre/llite/recover.c b/lustre/llite/recover.c index b688fb9..8acd1bb 100644 --- a/lustre/llite/recover.c +++ b/lustre/llite/recover.c @@ -33,26 +33,16 @@ static void abort_inflight_for_import(struct obd_import *imp) imp->imp_flags |= IMP_INVALID; spin_unlock(&imp->imp_connection->c_lock); - list_for_each_safe(tmp, n, &imp->imp_connection->c_sending_head) { + list_for_each_safe(tmp, n, &imp->imp_request_list) { struct ptlrpc_request *req = list_entry(tmp, struct ptlrpc_request, rq_list); - if (req->rq_import != imp) - continue; - if (req->rq_flags & PTL_RPC_FL_REPLIED) { /* no need to replay, just discard */ - CERROR("uncommitted req xid "LPD64" op %d to OST %s\n", - (unsigned long long)req->rq_xid, - req->rq_reqmsg->opc, - imp->imp_obd->u.cli.cl_target_uuid); + DEBUG_REQ(D_ERROR, req, "uncommitted"); ptlrpc_req_finished(req); } else { - CERROR("inflight req xid "LPD64" op %d to OST %s\n", - (unsigned long long)req->rq_xid, - req->rq_reqmsg->opc, - imp->imp_obd->u.cli.cl_target_uuid); - + DEBUG_REQ(D_ERROR, req, "inflight"); req->rq_flags |= PTL_RPC_FL_ERR; wake_up(&req->rq_wait_for_rep); } @@ -61,9 +51,11 @@ static void abort_inflight_for_import(struct obd_import *imp) list_for_each_safe(tmp, n, &imp->imp_connection->c_delayed_head) { struct ptlrpc_request *req = list_entry(tmp, struct ptlrpc_request, rq_list); - CERROR("aborting waiting req xid "LPD64" op %d to OST %s\n", - (unsigned long long)req->rq_xid, req->rq_reqmsg->opc, - imp->imp_obd->u.cli.cl_target_uuid); + + if (req->rq_import != imp) + continue; + + DEBUG_REQ(D_ERROR, req, "aborting waiting req"); req->rq_flags |= PTL_RPC_FL_ERR; wake_up(&req->rq_wait_for_rep); } @@ -149,53 +141,32 @@ static void reconnect_osc(struct obd_import *imp) imp->imp_obd->obd_uuid); } -static int reconnect_mdc(struct obd_import *imp) +static void reconnect_mdc(struct obd_import *imp) { - return ptlrpc_reconnect_import(imp, MDS_CONNECT); + int rc = ptlrpc_reconnect_import(imp, MDS_CONNECT); + if (!rc) + ptlrpc_replay(imp, 0 /* all reqs */); + else if (rc == EALREADY) + ptlrpc_replay(imp, 1 /* only unreplied reqs */); } static int ll_reconnect(struct ptlrpc_connection *conn) { struct list_head *tmp; - int need_replay = 0; ENTRY; - - /* XXX c_lock semantics! */ - conn->c_level = LUSTRE_CONN_CON; - - /* XXX this code MUST be shared with class_obd_connect! */ list_for_each(tmp, &conn->c_imports) { struct obd_import *imp = list_entry(tmp, struct obd_import, imp_chain); if (imp->imp_obd->obd_type->typ_ops->o_brw) { - /* XXX what to do if we fail? */ reconnect_osc(imp); } else { - int rc = reconnect_mdc(imp); - if (!rc) { - need_replay = 1; - } - /* make sure we don't try to replay for dead imps? - * - * else imp->imp_connection = NULL; - * - */ - + reconnect_mdc(imp); } } - if (!need_replay) { - /* all done! */ - conn->c_level = LUSTRE_CONN_FULL; - RETURN(0); - } - - conn->c_level = LUSTRE_CONN_RECOVD; - /* this will replay, up the c_level, recovd_conn_fixed and continue - * reqs. also, makes a mean cup of coffee. - */ - RETURN(ptlrpc_replay(conn)); + conn->c_level = LUSTRE_CONN_FULL; + RETURN(0); } int ll_recover(struct recovd_data *rd, int phase) diff --git a/lustre/llite/super.c b/lustre/llite/super.c index 01e00ce..cb7136c 100644 --- a/lustre/llite/super.c +++ b/lustre/llite/super.c @@ -574,7 +574,7 @@ void ll_umount_begin(struct super_block *sb) /* XXX should just be dealing with imports, probably through * XXX iocontrol, need next-gen recovery! */ conn->c_flags |= CONN_INVALID; - invalidate_request_list(&conn->c_sending_head); + /* invalidate_request_list(&conn->c_sending_head); */ invalidate_request_list(&conn->c_delayed_head); spin_unlock(&conn->c_lock); } diff --git a/lustre/lov/lov_obd.c b/lustre/lov/lov_obd.c index c840368..15fe873 100644 --- a/lustre/lov/lov_obd.c +++ b/lustre/lov/lov_obd.c @@ -158,6 +158,7 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd, for (i = 0; i < desc->ld_tgt_count; i++) { struct obd_device *tgt = class_uuid2obd(uuidarray[i]); + int rc2; if (!tgt) { CERROR("Target %s not attached\n", uuidarray[i]); @@ -171,18 +172,27 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd, rc = obd_connect(&lov->tgts[i].conn, tgt, NULL, recovd, recover); - if (rc) { - CERROR("Target %s connect error %d\n", - uuidarray[i], rc); - GOTO(out_disc, rc); + + /* Register even if connect failed, so that we get reactivation + * notices. + */ + rc2 = obd_iocontrol(IOC_OSC_REGISTER_LOV, &lov->tgts[i].conn, + sizeof(struct obd_device *), obd, NULL); + if (rc2) { + CERROR("Target %s REGISTER_LOV error %d\n", + uuidarray[i], rc2); + GOTO(out_disc, rc2); } - rc = obd_iocontrol(IOC_OSC_REGISTER_LOV, &lov->tgts[i].conn, - sizeof(struct obd_device *), obd, NULL); + + /* But mark failed-connect OSCs as inactive! */ if (rc) { - CERROR("Target %s REGISTER_LOV error %d\n", + CDEBUG(D_INFO, "Target %s connect error %d\n", uuidarray[i], rc); - GOTO(out_disc, rc); + LASSERT(lov->tgts[i].active == 0); + rc = 0; + continue; } + desc->ld_active_tgt_count++; lov->tgts[i].active = 1; } @@ -227,19 +237,17 @@ static int lov_disconnect(struct lustre_handle *conn) goto out_local; for (i = 0; i < lov->desc.ld_tgt_count; i++) { - if (!lov->tgts[i].active) { - CERROR("Skipping disconnect for inactive OSC %s\n", - lov->tgts[i].uuid); - continue; - } - - lov->desc.ld_active_tgt_count--; - lov->tgts[i].active = 0; rc = obd_disconnect(&lov->tgts[i].conn); if (rc) { - CERROR("Target %s disconnect error %d\n", - lov->tgts[i].uuid, rc); - RETURN(rc); + if (lov->tgts[i].active) { + CERROR("Target %s disconnect error %d\n", + lov->tgts[i].uuid, rc); + } + rc = 0; + } + if (lov->tgts[i].active) { + lov->desc.ld_active_tgt_count--; + lov->tgts[i].active = 0; } } OBD_FREE(lov->tgts, lov->bufsize); @@ -313,10 +321,24 @@ static int lov_set_osc_active(struct lov_obd *lov, obd_uuid_t uuid, CDEBUG(D_INFO, "Marking OBD %p %sactive\n", obd, activate ? "" : "in"); lov->tgts[i].active = activate; - if (activate) + if (activate) { + /* + * foreach(export) + * foreach(open_file) + * if (file_handle uses this_osc) + * if (has_no_filehandle) + * open(file_handle, this_osc); + */ + /* XXX reconnect? */ lov->desc.ld_active_tgt_count++; - else + } else { + /* + * Should I invalidate filehandles that refer to this OSC, so + * that I reopen them during reactivation? + */ + /* XXX disconnect from OSC? */ lov->desc.ld_active_tgt_count--; + } EXIT; out: @@ -332,7 +354,7 @@ static int lov_setup(struct obd_device *obd, obd_count len, void *buf) ENTRY; if (data->ioc_inllen1 < 1) { - CERROR("osc setup requires an MDC UUID\n"); + CERROR("LOV setup requires an MDC UUID\n"); RETURN(-EINVAL); } @@ -400,6 +422,10 @@ static int lov_create(struct lustre_handle *conn, struct obdo *oa, lsm = *ea; + /* Can't create more stripes than we have targets (incl inactive). */ + if (lsm && lsm->lsm_stripe_count > lov->desc.ld_tgt_count) + GOTO(out_tmp, rc = -EINVAL); + /* Free the user lsm if it needs to be changed, to avoid memory leaks */ if (!lsm || (lsm && lsm->lsm_stripe_count > lov->desc.ld_active_tgt_count)) { @@ -494,7 +520,7 @@ static int lov_create(struct lustre_handle *conn, struct obdo *oa, out_tmp: obdo_free(tmp); - return rc; + RETURN(rc); out_cleanup: while (i-- > 0) { @@ -547,6 +573,12 @@ static int lov_destroy(struct lustre_handle *conn, struct obdo *oa, lov = &export->exp_obd->u.lov; for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) { + int err; + if (lov->tgts[loi->loi_ost_idx].active == 0) { + /* Orphan clean up will (someday) fix this up. */ + continue; + } + memcpy(&tmp, oa, sizeof(tmp)); tmp.o_id = loi->loi_id; if (lfh) @@ -554,11 +586,15 @@ static int lov_destroy(struct lustre_handle *conn, struct obdo *oa, sizeof(lfh->lfh_handles[i])); else tmp.o_valid &= ~OBD_MD_FLHANDLE; - rc = obd_destroy(&lov->tgts[loi->loi_ost_idx].conn, &tmp, NULL); - if (rc) - CERROR("Error destroying objid "LPX64" subobj "LPX64 - " on OST idx %d\n: rc = %d", - oa->o_id, loi->loi_id, loi->loi_ost_idx, rc); + err = obd_destroy(&lov->tgts[loi->loi_ost_idx].conn, &tmp, + NULL); + if (err && lov->tgts[loi->loi_ost_idx].active) { + CERROR("Error destroying objid "LPX64" subobj " + LPX64" on OST idx %d\n: rc = %d", + oa->o_id, loi->loi_id, loi->loi_ost_idx, err); + if (!rc) + rc = err; + } } RETURN(rc); } @@ -620,7 +656,7 @@ static int lov_getattr(struct lustre_handle *conn, struct obdo *oa, struct lov_obd *lov; struct lov_oinfo *loi; struct lov_file_handles *lfh = NULL; - int rc = 0, i; + int i; int new = 1; ENTRY; @@ -649,6 +685,9 @@ static int lov_getattr(struct lustre_handle *conn, struct obdo *oa, if (loi->loi_id == 0) continue; + if (lov->tgts[loi->loi_ost_idx].active == 0) + continue; + CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx " "%u\n", oa->o_id, i, loi->loi_id, loi->loi_ost_idx); /* create data objects with "parent" OA */ @@ -661,17 +700,16 @@ static int lov_getattr(struct lustre_handle *conn, struct obdo *oa, tmp.o_valid &= ~OBD_MD_FLHANDLE; err = obd_getattr(&lov->tgts[loi->loi_ost_idx].conn, &tmp,NULL); - if (err) { + if (err && lov->tgts[loi->loi_ost_idx].active) { CERROR("Error getattr objid "LPX64" subobj "LPX64 " on OST idx %d: rc = %d\n", oa->o_id, loi->loi_id, loi->loi_ost_idx, err); - if (!rc) - rc = err; - continue; /* XXX or break? */ + RETURN(err); } lov_merge_attrs(oa, &tmp, tmp.o_valid, lsm, i, &new); } - RETURN(rc); + + RETURN(0); } static int lov_setattr(struct lustre_handle *conn, struct obdo *oa, @@ -744,11 +782,12 @@ static int lov_setattr(struct lustre_handle *conn, struct obdo *oa, static int lov_open(struct lustre_handle *conn, struct obdo *oa, struct lov_stripe_md *lsm) { - struct obdo *tmp; + struct obdo *tmp; /* on the heap here, on the stack in lov_close? */ struct obd_export *export = class_conn2export(conn); struct lov_obd *lov; struct lov_oinfo *loi; struct lov_file_handles *lfh = NULL; + struct lustre_handle *handle; int new = 1; int rc = 0, i; ENTRY; @@ -783,20 +822,22 @@ static int lov_open(struct lustre_handle *conn, struct obdo *oa, oa->o_size = 0; oa->o_blocks = 0; for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) { - int err; + + if (lov->tgts[loi->loi_ost_idx].active == 0) { + continue; + } /* create data objects with "parent" OA */ memcpy(tmp, oa, sizeof(*tmp)); tmp->o_id = loi->loi_id; - err = obd_open(&lov->tgts[loi->loi_ost_idx].conn, tmp, NULL); - if (err) { + rc = obd_open(&lov->tgts[loi->loi_ost_idx].conn, tmp, NULL); + if (rc && lov->tgts[loi->loi_ost_idx].active) { CERROR("Error open objid "LPX64" subobj "LPX64 " on OST idx %d: rc = %d\n", oa->o_id, lsm->lsm_oinfo[i].loi_id, loi->loi_ost_idx, rc); - if (!rc) - rc = err; + goto out_handles; } lov_merge_attrs(oa, tmp, tmp->o_valid, lsm, i, &new); @@ -806,31 +847,40 @@ static int lov_open(struct lustre_handle *conn, struct obdo *oa, sizeof(lfh->lfh_handles[i])); } - if (tmp->o_valid & OBD_MD_FLHANDLE) { - struct lustre_handle *handle = obdo_handle(oa); + handle = obdo_handle(oa); + + lfh->lfh_count = lsm->lsm_stripe_count; + get_random_bytes(&lfh->lfh_cookie, sizeof(lfh->lfh_cookie)); + + handle->addr = (__u64)(unsigned long)lfh; + handle->cookie = lfh->lfh_cookie; + oa->o_valid |= OBD_MD_FLHANDLE; + list_add(&lfh->lfh_list, &export->exp_lov_data.led_open_head); - lfh->lfh_count = lsm->lsm_stripe_count; - get_random_bytes(&lfh->lfh_cookie, sizeof(lfh->lfh_cookie)); - - handle->addr = (__u64)(unsigned long)lfh; - handle->cookie = lfh->lfh_cookie; - oa->o_valid |= OBD_MD_FLHANDLE; - list_add(&lfh->lfh_list, &export->exp_lov_data.led_open_head); - } else - goto out_handles; - - /* FIXME: returning an error, but having opened some objects is a bad - * idea, since they will likely never be closed. We either - * need to not return an error if _some_ objects could be - * opened, and leave it to read/write to return -EIO (with - * hopefully partial error status) or close all opened objects - * and return an error. I think the former is preferred. - */ out_tmp: obdo_free(tmp); RETURN(rc); out_handles: + for (i--, loi = &lsm->lsm_oinfo[i]; i >= 0; i--, loi--) { + int err; + + if (lov->tgts[loi->loi_ost_idx].active == 0) + continue; + + memcpy(tmp, oa, sizeof(*tmp)); + tmp->o_id = loi->loi_id; + memcpy(obdo_handle(tmp), &lfh->lfh_handles[i], + sizeof(lfh->lfh_handles[i])); + + err = obd_close(&lov->tgts[loi->loi_ost_idx].conn, tmp, NULL); + if (err) { + CERROR("Error closing objid "LPX64" subobj "LPX64 + " on OST idx %d after open error: rc = %d\n", + oa->o_id, loi->loi_id, loi->loi_ost_idx, err); + } + } + OBD_FREE(lfh->lfh_handles, lsm->lsm_stripe_count * sizeof(*lfh->lfh_handles)); out_lfh: @@ -870,6 +920,9 @@ static int lov_close(struct lustre_handle *conn, struct obdo *oa, lov = &export->exp_obd->u.lov; for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) { int err; + + if (lov->tgts[loi->loi_ost_idx].active == 0) + continue; /* create data objects with "parent" OA */ memcpy(&tmp, oa, sizeof(tmp)); @@ -1119,6 +1172,8 @@ static int lov_enqueue(struct lustre_handle *conn, struct lov_stripe_md *lsm, RETURN(-EINVAL); } + /* XXX assert that we're not in recovery */ + if (!export || !export->exp_obd) RETURN(-ENODEV); @@ -1128,6 +1183,7 @@ static int lov_enqueue(struct lustre_handle *conn, struct lov_stripe_md *lsm, struct ldlm_extent sub_ext; struct lov_stripe_md submd; + *flags = 0; sub_ext.start = lov_stripe_offset(lsm, extent->start, i); sub_ext.end = lov_stripe_offset(lsm, extent->end, i); if (sub_ext.start == sub_ext.end) diff --git a/lustre/mdc/mdc_reint.c b/lustre/mdc/mdc_reint.c index 433d365..63c1ef0 100644 --- a/lustre/mdc/mdc_reint.c +++ b/lustre/mdc/mdc_reint.c @@ -37,7 +37,6 @@ static int mdc_reint(struct ptlrpc_request *request, int level) request->rq_level = level; rc = ptlrpc_queue_wait(request); - rc = ptlrpc_check_status(request, rc); if (rc) { CERROR("error in handling %d\n", rc); diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c index c409b3d..a9a5d9a 100644 --- a/lustre/mdc/mdc_request.c +++ b/lustre/mdc/mdc_request.c @@ -56,7 +56,6 @@ int mdc_getstatus(struct lustre_handle *conn, struct ll_fid *rootfid) mds_pack_req_body(req); rc = ptlrpc_queue_wait(req); - rc = ptlrpc_check_status(req, rc); if (!rc) { body = lustre_msg_buf(req->rq_repmsg, 0); @@ -100,7 +99,6 @@ int mdc_getlovinfo(struct obd_device *obd, struct lustre_handle *mdc_connh, req->rq_replen = lustre_msg_size(2, size); rc = ptlrpc_queue_wait(req); - rc = ptlrpc_check_status(req, rc); out: RETURN(rc); @@ -136,7 +134,6 @@ int mdc_getattr(struct lustre_handle *conn, mds_pack_req_body(req); rc = ptlrpc_queue_wait(req); - rc = ptlrpc_check_status(req, rc); if (!rc) { body = lustre_msg_buf(req->rq_repmsg, 0); @@ -225,6 +222,8 @@ void mdc_store_inode_generation(struct ptlrpc_request *req, int reqoff, struct mds_rec_create *rec = lustre_msg_buf(req->rq_reqmsg, reqoff); struct mds_body *body = lustre_msg_buf(req->rq_repmsg, repoff); + DEBUG_REQ(D_HA, req, "storing generation %x for ino "LPD64, + body->fid1.generation, body->fid1.id); memcpy(&rec->cr_replayfid, &body->fid1, sizeof rec->cr_replayfid); } @@ -496,7 +495,6 @@ int mdc_open(struct lustre_handle *conn, obd_id ino, int type, int flags, req->rq_replen = lustre_msg_size(1, size); rc = ptlrpc_queue_wait(req); - rc = ptlrpc_check_status(req, rc); if (!rc) { body = lustre_msg_buf(req->rq_repmsg, 0); mds_unpack_body(body); @@ -533,7 +531,6 @@ int mdc_close(struct lustre_handle *conn, obd_id ino, int type, req->rq_replen = lustre_msg_size(0, NULL); rc = ptlrpc_queue_wait(req); - rc = ptlrpc_check_status(req, rc); EXIT; out: @@ -580,7 +577,6 @@ int mdc_readpage(struct lustre_handle *conn, obd_id ino, int type, __u64 offset, req->rq_replen = lustre_msg_size(1, &size); rc = ptlrpc_queue_wait(req); - rc = ptlrpc_check_status(req, rc); if (rc) { ptlrpc_abort_bulk(desc); GOTO(out2, rc); @@ -611,7 +607,6 @@ static int mdc_statfs(struct lustre_handle *conn, struct obd_statfs *osfs) req->rq_replen = lustre_msg_size(1, &size); rc = ptlrpc_queue_wait(req); - rc = ptlrpc_check_status(req, rc); if (rc) GOTO(out, rc); diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index d78ad53..4f5f6e3 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -8,6 +8,7 @@ * Author: Peter Braam * Author: Andreas Dilger * Author: Phil Schwan + * Author: Mike Shaver * * This file is part of Lustre, http://www.lustre.org. * @@ -46,8 +47,9 @@ static kmem_cache_t *mds_file_cache; extern int mds_get_lovtgts(struct mds_obd *obd, int tgt_count, obd_uuid_t *uuidarray); extern int mds_get_lovdesc(struct mds_obd *obd, struct lov_desc *desc); -extern int mds_update_last_rcvd(struct mds_obd *mds, void *handle, - struct ptlrpc_request *req); +extern void mds_start_transno(struct mds_obd *mds); +extern int mds_finish_transno(struct mds_obd *mds, void *handle, + struct ptlrpc_request *req, int rc); static int mds_cleanup(struct obd_device * obddev); extern struct lprocfs_vars status_var_nm_1[]; @@ -63,7 +65,7 @@ static int mds_bulk_timeout(void *data) struct ptlrpc_bulk_desc *desc = data; ENTRY; - CERROR("(not yet) starting recovery of client %p\n", desc->bd_client); + recovd_conn_fail(desc->bd_connection); RETURN(1); } @@ -113,7 +115,8 @@ static int mds_sendpage(struct ptlrpc_request *req, struct file *file, } lwi = LWI_TIMEOUT(obd_timeout * HZ, mds_bulk_timeout, desc); - rc = l_wait_event(desc->bd_waitq, desc->bd_flags & PTL_BULK_FL_SENT, &lwi); + rc = l_wait_event(desc->bd_waitq, desc->bd_flags & PTL_BULK_FL_SENT, + &lwi); if (rc) { if (rc != -ETIMEDOUT) LBUG(); @@ -301,27 +304,53 @@ static int mds_connect(struct lustre_handle *conn, struct obd_device *obd, CERROR("FYI: NULL mcd - simultaneous connects\n"); continue; } - if (!memcmp(cluuid, mcd->mcd_uuid, sizeof(mcd->mcd_uuid))) { + if (!memcmp(cluuid, mcd->mcd_uuid, sizeof mcd->mcd_uuid)) { + /* XXX make handle-found-export a subroutine */ LASSERT(exp->exp_obd == obd); - if (!list_empty(&exp->exp_conn_chain)) { - CERROR("existing uuid/export, list not empty!\n"); - spin_unlock(&obd->obd_dev_lock); + spin_unlock(&obd->obd_dev_lock); + if (exp->exp_connection) { + struct lustre_handle *hdl; + hdl = &exp->exp_ldlm_data.led_import.imp_handle; + /* Might be a re-connect after a partition. */ + if (!memcmp(conn, hdl, sizeof *conn)) { + CERROR("%s reconnecting\n", cluuid); + conn->addr = (__u64) (unsigned long)exp; + conn->cookie = exp->exp_cookie; + rc = EALREADY; + } else { + CERROR("%s reconnecting from %s, " + "handle mismatch (ours %Lx/%Lx, " + "theirs %Lx/%Lx)\n", cluuid, + exp->exp_connection-> + c_remote_uuid, hdl->addr, + hdl->cookie, conn->addr, + conn->cookie); + /* XXX disconnect them here? */ + memset(conn, 0, sizeof *conn); + rc = -EALREADY; + } MOD_DEC_USE_COUNT; - RETURN(-EALREADY); + RETURN(rc); } conn->addr = (__u64) (unsigned long)exp; conn->cookie = exp->exp_cookie; - spin_unlock(&obd->obd_dev_lock); CDEBUG(D_INFO, "existing export for UUID '%s' at %p\n", cluuid, exp); CDEBUG(D_IOCTL,"connect: addr %Lx cookie %Lx\n", (long long)conn->addr, (long long)conn->cookie); - MOD_DEC_USE_COUNT; RETURN(0); } } spin_unlock(&obd->obd_dev_lock); + + if (obd->u.mds.mds_recoverable_clients != 0) { + CERROR("denying connection for new client %s: in recovery\n", + cluuid); + MOD_DEC_USE_COUNT; + RETURN(-EBUSY); + } + /* XXX There is a small race between checking the list and adding a * new connection for the same UUID, but the real threat (list * corruption when multiple different clients connect) is solved. @@ -351,7 +380,7 @@ static int mds_connect(struct lustre_handle *conn, struct obd_device *obd, INIT_LIST_HEAD(&med->med_open_head); spin_lock_init(&med->med_open_lock); - rc = mds_client_add(med, -1); + rc = mds_client_add(&obd->u.mds, med, -1); if (rc) GOTO(out_mcd, rc); @@ -836,13 +865,16 @@ static int mds_store_md(struct mds_obd *mds, struct ptlrpc_request *req, uc.ouc_fsgid = body->fsgid; uc.ouc_cap = body->capability; push_ctxt(&saved, &mds->mds_ctxt, &uc); + mds_start_transno(mds); handle = mds_fs_start(mds, inode, MDS_FSOP_SETATTR); - if (!handle) - GOTO(out_ea, rc = -ENOMEM); + if (IS_ERR(handle)) { + rc = PTR_ERR(handle); + mds_finish_transno(mds, handle, req, rc); + GOTO(out_ea, rc); + } rc = mds_fs_set_md(mds, inode, handle, lmm, lmm_size); - if (!rc) - rc = mds_update_last_rcvd(mds, handle, req); + rc = mds_finish_transno(mds, handle, req, rc); rc2 = mds_fs_commit(mds, inode, handle); if (rc2 && !rc) @@ -1058,9 +1090,162 @@ int mds_reint(struct ptlrpc_request *req, int offset) return rc; } +/* forward declaration */ +int mds_handle(struct ptlrpc_request *req); + +static int check_for_next_transno(struct mds_obd *mds) +{ + struct ptlrpc_request *req; + req = list_entry(mds->mds_recovery_queue.next, + struct ptlrpc_request, rq_list); + return req->rq_reqmsg->transno == mds->mds_next_recovery_transno; +} + +static void process_recovery_queue(struct mds_obd *mds) +{ + struct ptlrpc_request *req; + + for (;;) { + spin_lock(&mds->mds_processing_task_lock); + req = list_entry(mds->mds_recovery_queue.next, + struct ptlrpc_request, rq_list); + + if (req->rq_reqmsg->transno != mds->mds_next_recovery_transno) { + spin_unlock(&mds->mds_processing_task_lock); + wait_event(mds->mds_next_transno_waitq, + check_for_next_transno(mds)); + continue; + } + list_del(&req->rq_list); + spin_unlock(&mds->mds_processing_task_lock); + + DEBUG_REQ(D_HA, req, ""); + mds_handle(req); + + if (list_empty(&mds->mds_recovery_queue)) + break; + } +} + +static int queue_recovery_request(struct ptlrpc_request *req, + struct mds_obd *mds) +{ + struct list_head *tmp; + int inserted = 0, transno = req->rq_reqmsg->transno; + + if (!transno) { + DEBUG_REQ(D_HA, req, "not queueing"); + return 1; + } + + spin_lock(&mds->mds_processing_task_lock); + + if (mds->mds_processing_task == current->pid) { + /* Processing the queue right now, don't re-add. */ + spin_unlock(&mds->mds_processing_task_lock); + return 1; + } + + /* XXX O(n^2) */ + list_for_each(tmp, &mds->mds_recovery_queue) { + struct ptlrpc_request *reqiter = + list_entry(tmp, struct ptlrpc_request, rq_list); + if (reqiter->rq_reqmsg->transno > transno) { + list_add_tail(&req->rq_list, &reqiter->rq_list); + inserted = 1; + break; + } + } + + if (!inserted) + list_add_tail(&req->rq_list, &mds->mds_recovery_queue); + + if (mds->mds_processing_task != 0) { + /* Someone else is processing this queue, we'll leave it to + * them. + */ + spin_unlock(&mds->mds_processing_task_lock); + if (transno == mds->mds_next_recovery_transno) + wake_up(&mds->mds_next_transno_waitq); + return 0; + } + + /* Nobody is processing, and we know there's (at least) one to process + * now, so we'll do the honours. + */ + mds->mds_processing_task = current->pid; + spin_unlock(&mds->mds_processing_task_lock); + + process_recovery_queue(mds); + return 0; +} + +static int filter_recovery_request(struct ptlrpc_request *req, + struct mds_obd *mds, int *process) +{ + switch (req->rq_reqmsg->opc) { + case MDS_CONNECT: + case MDS_DISCONNECT: + case MDS_OPEN: + *process = 1; + RETURN(0); + + case MDS_GETSTATUS: /* used in unmounting */ + case MDS_REINT: + case LDLM_ENQUEUE: + *process = queue_recovery_request(req, mds); + RETURN(0); + + default: + DEBUG_REQ(D_ERROR, req, "not permitted during recovery"); + *process = 0; + RETURN(ptlrpc_error(req->rq_svc, req)); + } +} + +static int mds_queue_final_reply(struct ptlrpc_request *req, int rc) +{ + struct mds_obd *mds = mds_req2mds(req); + + if (rc) { + /* Just like ptlrpc_error, but without the sending. */ + lustre_pack_msg(0, NULL, NULL, &req->rq_replen, + &req->rq_repmsg); + req->rq_type = PTL_RPC_MSG_ERR; + } + + list_add(&req->rq_list, &mds->mds_delayed_reply_queue); + if (--mds->mds_recoverable_clients == 0) { + struct list_head *tmp, *n; + + CDEBUG(D_HA, + "all clients recovered, sending delayed replies\n"); + list_for_each_safe(tmp, n, &mds->mds_delayed_reply_queue) { + req = list_entry(tmp, struct ptlrpc_request, rq_list); + DEBUG_REQ(D_HA, req, "delayed:"); + ptlrpc_reply(req->rq_svc, req); + } + } else { + CDEBUG(D_HA, "%d recoverable clients remain\n", + mds->mds_recoverable_clients); + } + + return 1; +} + +static char *reint_names[] = { + [REINT_SETATTR] "setattr", + [REINT_CREATE] "create", + [REINT_LINK] "link", + [REINT_UNLINK] "unlink", + [REINT_RENAME] "rename" +}; + int mds_handle(struct ptlrpc_request *req) { int rc; + int should_process; + struct mds_obd *mds = NULL; /* quell gcc overwarning */ ENTRY; rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen); @@ -1069,49 +1254,67 @@ int mds_handle(struct ptlrpc_request *req) GOTO(out, rc); } - if (req->rq_reqmsg->opc != MDS_CONNECT && req->rq_export == NULL) - GOTO(out, rc = -ENOTCONN); - LASSERT(!strcmp(req->rq_obd->obd_type->typ_name, LUSTRE_MDT_NAME)); + if (req->rq_reqmsg->opc != MDS_CONNECT) { + if (req->rq_export == NULL) + GOTO(out, rc = -ENOTCONN); + + mds = mds_req2mds(req); + if (mds->mds_recoverable_clients != 0) { + rc = filter_recovery_request(req, mds, &should_process); + if (rc || !should_process) + RETURN(rc); + } + } + switch (req->rq_reqmsg->opc) { case MDS_CONNECT: - CDEBUG(D_INODE, "connect\n"); + DEBUG_REQ(D_INODE, req, "connect"); OBD_FAIL_RETURN(OBD_FAIL_MDS_CONNECT_NET, 0); rc = target_handle_connect(req); + /* Make sure that last_rcvd is correct. */ + if (!rc) { + /* Now that we have an export, set mds. */ + mds = mds_req2mds(req); + mds_fsync_super(mds->mds_sb); + } break; case MDS_DISCONNECT: - CDEBUG(D_INODE, "disconnect\n"); + DEBUG_REQ(D_INODE, req, "disconnect"); OBD_FAIL_RETURN(OBD_FAIL_MDS_DISCONNECT_NET, 0); rc = target_handle_disconnect(req); + /* Make sure that last_rcvd is correct. */ + if (!rc) + mds_fsync_super(mds->mds_sb); goto out; case MDS_GETSTATUS: - CDEBUG(D_INODE, "getstatus\n"); + DEBUG_REQ(D_INODE, req, "getstatus"); OBD_FAIL_RETURN(OBD_FAIL_MDS_GETSTATUS_NET, 0); rc = mds_getstatus(req); break; case MDS_GETLOVINFO: - CDEBUG(D_INODE, "getlovinfo\n"); + DEBUG_REQ(D_INODE, req, "getlovinfo"); rc = mds_getlovinfo(req); break; case MDS_GETATTR: - CDEBUG(D_INODE, "getattr\n"); + DEBUG_REQ(D_INODE, req, "getattr"); OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NET, 0); rc = mds_getattr(0, req); break; case MDS_STATFS: - CDEBUG(D_INODE, "statfs\n"); + DEBUG_REQ(D_INODE, req, "statfs"); OBD_FAIL_RETURN(OBD_FAIL_MDS_STATFS_NET, 0); rc = mds_statfs(req); break; case MDS_READPAGE: - CDEBUG(D_INODE, "readpage\n"); + DEBUG_REQ(D_INODE, req, "readpage\n"); OBD_FAIL_RETURN(OBD_FAIL_MDS_READPAGE_NET, 0); rc = mds_readpage(req); @@ -1121,7 +1324,13 @@ int mds_handle(struct ptlrpc_request *req) case MDS_REINT: { int size = sizeof(struct mds_body); - CDEBUG(D_INODE, "reint\n"); + int opc = *(u32 *)lustre_msg_buf(req->rq_reqmsg, 0), + realopc = opc & REINT_OPCODE_MASK; + + DEBUG_REQ(D_INODE, req, "reint (%s%s)", + reint_names[realopc], + opc & REINT_REPLAYING ? "|REPLAYING" : ""); + OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET, 0); rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, @@ -1136,30 +1345,30 @@ int mds_handle(struct ptlrpc_request *req) } case MDS_OPEN: - CDEBUG(D_INODE, "open\n"); + DEBUG_REQ(D_INODE, req, "open"); OBD_FAIL_RETURN(OBD_FAIL_MDS_OPEN_NET, 0); rc = mds_open(req); break; case MDS_CLOSE: - CDEBUG(D_INODE, "close\n"); + DEBUG_REQ(D_INODE, req, "close"); OBD_FAIL_RETURN(OBD_FAIL_MDS_CLOSE_NET, 0); rc = mds_close(req); break; case LDLM_ENQUEUE: - CDEBUG(D_INODE, "enqueue\n"); + DEBUG_REQ(D_INODE, req, "enqueue"); OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0); rc = ldlm_handle_enqueue(req); break; case LDLM_CONVERT: - CDEBUG(D_INODE, "convert\n"); + DEBUG_REQ(D_INODE, req, "convert"); OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0); rc = ldlm_handle_convert(req); break; case LDLM_BL_CALLBACK: case LDLM_CP_CALLBACK: - CDEBUG(D_INODE, "callback\n"); + DEBUG_REQ(D_INODE, req, "callback"); CERROR("callbacks should not happen on MDS\n"); LBUG(); OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0); @@ -1173,7 +1382,6 @@ int mds_handle(struct ptlrpc_request *req) if (!rc) { struct mds_export_data *med = &req->rq_export->exp_mds_data; - struct mds_obd *mds = mds_req2mds(req); req->rq_repmsg->last_xid = HTON__u64(le64_to_cpu(med->med_mcd->mcd_last_xid)); @@ -1185,7 +1393,17 @@ int mds_handle(struct ptlrpc_request *req) cpu_to_le32(req->rq_xid)); } out: - if (rc) { + + if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) { + struct mds_obd *mds = mds_req2mds(req); + LASSERT(mds->mds_recoverable_clients); + DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply"); + return mds_queue_final_reply(req, rc); + } + + /* MDS_CONNECT / EALREADY (note: not -EALREADY!) isn't an error */ + if (rc && (req->rq_reqmsg->opc != MDS_CONNECT || + rc != EALREADY)) { CERROR("mds: processing error (opcode %d): %d\n", req->rq_reqmsg->opc, rc); ptlrpc_error(req->rq_svc, req); @@ -1205,7 +1423,6 @@ int mds_handle(struct ptlrpc_request *req) * * Also assumes for mds_last_rcvd that we are not modifying it (no locking). */ -static int mds_update_server_data(struct mds_obd *mds) { struct mds_server_data *msd = mds->mds_server_data; @@ -1238,12 +1455,14 @@ int mds_update_server_data(struct mds_obd *mds) } /* Do recovery actions for the MDS */ -static int mds_recover(struct obd_device *obddev) +static int mds_recovery_complete(struct obd_device *obddev) { struct mds_obd *mds = &obddev->u.mds; struct obd_run_ctxt saved; int rc; + LASSERT(mds->mds_recoverable_clients == 0); + /* This happens at the end when recovery is complete */ ++mds->mds_mount_count; push_ctxt(&saved, &mds->mds_ctxt, NULL); @@ -1283,7 +1502,7 @@ static int mds_setup(struct obd_device *obddev, obd_count len, void *buf) if (!mds->mds_sb) GOTO(err_put, rc = -ENODEV); - spin_lock_init(&mds->mds_last_lock); + init_MUTEX(&mds->mds_transno_sem); mds->mds_max_mdsize = sizeof(struct lov_mds_md); rc = mds_fs_setup(obddev, mnt); if (rc) { @@ -1298,14 +1517,14 @@ static int mds_setup(struct obd_device *obddev, obd_count len, void *buf) GOTO(err_fs, rc = -ENOMEM); } - - rc = mds_recover(obddev); - if (rc) - GOTO(err_fs, rc); - ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL, "mds_ldlm_client", &obddev->obd_ldlm_client); + spin_lock_init(&mds->mds_processing_task_lock); + mds->mds_processing_task = 0; + INIT_LIST_HEAD(&mds->mds_recovery_queue); + INIT_LIST_HEAD(&mds->mds_delayed_reply_queue); + RETURN(0); err_fs: diff --git a/lustre/mds/mds_fs.c b/lustre/mds/mds_fs.c index 3975f4f..9bba857 100644 --- a/lustre/mds/mds_fs.c +++ b/lustre/mds/mds_fs.c @@ -37,17 +37,21 @@ struct mds_fs_type { static unsigned long last_rcvd_slots[MDS_MAX_CLIENT_WORDS]; +#define LAST_RCVD "last_rcvd" + /* Add client data to the MDS. We use a bitmap to locate a free space * in the last_rcvd file if cl_off is -1 (i.e. a new client). * Otherwise, we have just read the data from the last_rcvd file and * we know its offset. */ -int mds_client_add(struct mds_export_data *med, int cl_off) +int mds_client_add(struct mds_obd *mds, struct mds_export_data *med, int cl_off) { + int new_client = (cl_off == -1); + /* the bitmap operations can handle cl_off > sizeof(long) * 8, so * there's no need for extra complication here */ - if (cl_off == -1) { + if (new_client) { cl_off = find_first_zero_bit(last_rcvd_slots, MDS_MAX_CLIENTS); repeat: if (cl_off >= MDS_MAX_CLIENTS) { @@ -73,12 +77,35 @@ int mds_client_add(struct mds_export_data *med, int cl_off) cl_off, med->med_mcd->mcd_uuid); med->med_off = cl_off; + + if (new_client) { + struct obd_run_ctxt saved; + loff_t off = MDS_LR_CLIENT + (cl_off * MDS_LR_SIZE); + ssize_t written; + + push_ctxt(&saved, &mds->mds_ctxt, NULL); + written = lustre_fwrite(mds->mds_rcvd_filp, + (char *)med->med_mcd, + sizeof(*med->med_mcd), &off); + pop_ctxt(&saved); + + if (written != sizeof(*med->med_mcd)) { + if (written < 0) + RETURN(written); + RETURN(-EIO); + } + } return 0; } int mds_client_free(struct obd_export *exp) { struct mds_export_data *med = &exp->exp_mds_data; + struct mds_obd *mds = &exp->exp_obd->u.mds; + struct mds_client_data zero_mcd; + struct obd_run_ctxt saved; + int written; + loff_t off; if (!med->med_mcd) RETURN(0); @@ -92,6 +119,24 @@ int mds_client_free(struct obd_export *exp) LBUG(); } + off = med->med_off; + + memset(&zero_mcd, 0, sizeof zero_mcd); + push_ctxt(&saved, &mds->mds_ctxt, NULL); + written = lustre_fwrite(mds->mds_rcvd_filp, (const char *)&zero_mcd, + sizeof zero_mcd, &off); + pop_ctxt(&saved); + + if (written != sizeof zero_mcd) { + CERROR("error zeroing out client %s off %d in %s: %d\n", + med->med_mcd->mcd_uuid, med->med_off, LAST_RCVD, + written); + LBUG(); + } else { + CDEBUG(D_INFO, "zeroed out disconnecting client %s at off %d\n", + med->med_mcd->mcd_uuid, med->med_off); + } + OBD_FREE(med->med_mcd, sizeof(*med->med_mcd)); return 0; @@ -105,19 +150,16 @@ static int mds_server_free_data(struct mds_obd *mds) return 0; } -#define LAST_RCVD "last_rcvd" - static int mds_read_last_rcvd(struct obd_device *obddev, struct file *f) { struct mds_obd *mds = &obddev->u.mds; struct mds_server_data *msd; struct mds_client_data *mcd = NULL; - loff_t fsize = f->f_dentry->d_inode->i_size; loff_t off = 0; int cl_off; + int max_off = f->f_dentry->d_inode->i_size / sizeof(*mcd); __u64 last_rcvd = 0; __u64 last_mount; - int clients = 0; int rc = 0; OBD_ALLOC(msd, sizeof(*msd)); @@ -154,9 +196,11 @@ static int mds_read_last_rcvd(struct obd_device *obddev, struct file *f) CDEBUG(D_INODE, "got %Lu for server last_mount value\n", (unsigned long long)last_mount); - for (off = MDS_LR_CLIENT, cl_off = 0, rc = sizeof(*mcd); - off <= fsize - sizeof(*mcd) && rc == sizeof(*mcd); - off = MDS_LR_CLIENT + ++cl_off * MDS_LR_SIZE) { + for (off = MDS_LR_CLIENT, cl_off = 0; + off < max_off; + off += MDS_LR_SIZE, cl_off++) { + int mount_age; + if (!mcd) { OBD_ALLOC(mcd, sizeof(*mcd)); if (!mcd) @@ -172,13 +216,19 @@ static int mds_read_last_rcvd(struct obd_device *obddev, struct file *f) break; } + if (mcd->mcd_uuid[0] == '\0') { + CDEBUG(D_INFO, "skipping zeroed client at offset %d\n", + cl_off); + continue; + } + last_rcvd = le64_to_cpu(mcd->mcd_last_rcvd); /* The exports are cleaned up by mds_disconnect, so they * need to be set up like real exports also. */ - if (last_rcvd && (last_mount - le64_to_cpu(mcd->mcd_mount_count) - < MDS_MOUNT_RECOV)) { + mount_age = last_mount - le64_to_cpu(mcd->mcd_mount_count); + if (last_rcvd && mount_age < MDS_MOUNT_RECOV) { struct obd_export *exp = class_new_export(obddev); struct mds_export_data *med; @@ -189,17 +239,17 @@ static int mds_read_last_rcvd(struct obd_device *obddev, struct file *f) med = &exp->exp_mds_data; med->med_mcd = mcd; - mds_client_add(med, cl_off); + mds_client_add(mds, med, cl_off); /* XXX put this in a helper if it gets more complex */ INIT_LIST_HEAD(&med->med_open_head); spin_lock_init(&med->med_open_lock); mcd = NULL; - clients++; + mds->mds_recoverable_clients++; MOD_INC_USE_COUNT; } else { CDEBUG(D_INFO, - "ignored client %d, UUID '%s', last_mount %Ld\n", + "discarded client %d, UUID '%s', count %Ld\n", cl_off, mcd->mcd_uuid, (long long)le64_to_cpu(mcd->mcd_mount_count)); } @@ -211,15 +261,16 @@ static int mds_read_last_rcvd(struct obd_device *obddev, struct file *f) mds->mds_last_rcvd = last_rcvd; } } - CDEBUG(D_INODE, "got %Lu for highest last_rcvd value, %d/%d clients\n", - (unsigned long long)mds->mds_last_rcvd, clients, cl_off); + + mds->mds_last_committed = mds->mds_last_rcvd; + if (mds->mds_recoverable_clients) { + CERROR("need recovery: %d recoverable clients, last_rcvd %Lu\n", + mds->mds_recoverable_clients, mds->mds_last_rcvd); + } if (mcd) OBD_FREE(mcd, sizeof(*mcd)); - /* After recovery, there can be no local uncommitted transactions */ - mds->mds_last_committed = mds->mds_last_rcvd; - return 0; err_msd: diff --git a/lustre/mds/mds_reint.c b/lustre/mds/mds_reint.c index 854b357..f158bc2 100644 --- a/lustre/mds/mds_reint.c +++ b/lustre/mds/mds_reint.c @@ -39,43 +39,57 @@ extern inline struct mds_obd *mds_req2mds(struct ptlrpc_request *req); +void mds_start_transno(struct mds_obd *mds) +{ + ENTRY; + down(&mds->mds_transno_sem); +} + /* Assumes caller has already pushed us into the kernel context. */ -int mds_update_last_rcvd(struct mds_obd *mds, void *handle, - struct ptlrpc_request *req) +int mds_finish_transno(struct mds_obd *mds, void *handle, + struct ptlrpc_request *req, int rc) { struct mds_export_data *med = &req->rq_export->exp_mds_data; struct mds_client_data *mcd = med->med_mcd; __u64 last_rcvd; loff_t off; - int rc; + ssize_t written; + + /* Propagate error code. */ + if (rc) + goto out; /* we don't allocate new transnos for replayed requests */ - if (req->rq_level == LUSTRE_CONN_RECOVD) - RETURN(0); + if (req->rq_level == LUSTRE_CONN_RECOVD) { + rc = 0; + goto out; + } off = MDS_LR_CLIENT + med->med_off * MDS_LR_SIZE; - spin_lock(&mds->mds_last_lock); last_rcvd = ++mds->mds_last_rcvd; - spin_unlock(&mds->mds_last_lock); req->rq_repmsg->transno = HTON__u64(last_rcvd); mcd->mcd_last_rcvd = cpu_to_le64(last_rcvd); mcd->mcd_mount_count = cpu_to_le64(mds->mds_mount_count); mcd->mcd_last_xid = cpu_to_le64(req->rq_xid); mds_fs_set_last_rcvd(mds, handle); - rc = lustre_fwrite(mds->mds_rcvd_filp, (char *)mcd, sizeof(*mcd), &off); - CDEBUG(D_INODE, "wrote trans #"LPD64" for client '%s' at #%d: rc = " - "%d\n", last_rcvd, mcd->mcd_uuid, med->med_off, rc); + written = lustre_fwrite(mds->mds_rcvd_filp, (char *)mcd, sizeof(*mcd), + &off); + CDEBUG(D_INODE, "wrote trans #"LPD64" for client %s at #%d: written = " + "%d\n", last_rcvd, mcd->mcd_uuid, med->med_off, written); - if (rc == sizeof(*mcd)) - rc = 0; - else { - CERROR("error writing to last_rcvd file: rc = %d\n", rc); - if (rc >= 0) - rc = -EIO; - } + if (written == sizeof(*mcd)) + GOTO(out, rc = 0); + CERROR("error writing to last_rcvd file: rc = %d\n", rc); + if (written >= 0) + GOTO(out, rc = -EIO); + + rc = 0; + out: + EXIT; + up(&mds->mds_transno_sem); return rc; } @@ -129,9 +143,13 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset, OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_SETATTR_WRITE, to_kdev_t(inode->i_sb->s_dev)); + mds_start_transno(mds); handle = mds_fs_start(mds, inode, MDS_FSOP_SETATTR); - if (!handle) - GOTO(out_setattr_de, rc = PTR_ERR(handle)); + if (IS_ERR(handle)) { + rc = PTR_ERR(handle); + (void)mds_finish_transno(mds, handle, req, rc); + GOTO(out_setattr_de, rc); + } rc = mds_fs_setattr(mds, de, handle, &rec->ur_iattr); @@ -141,8 +159,7 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset, mds_pack_inode2body(body, inode); } - if (!rc) - rc = mds_update_last_rcvd(mds, handle, req); + rc = mds_finish_transno(mds, handle, req, rc); err = mds_fs_commit(mds, de->d_inode, handle); if (err) { @@ -238,27 +255,34 @@ static int mds_reint_create(struct mds_update_record *rec, int offset, rec->ur_mode |= S_ISGID; } + /* From here on, we must exit via a path that calls mds_finish_transno, + * so that we release the mds_transno_sem (and, in the case of success, + * update the transno correctly). out_create_commit and + * out_transno_dchild are good candidates. + */ + mds_start_transno(mds); + switch (type) { case S_IFREG:{ handle = mds_fs_start(mds, dir, MDS_FSOP_CREATE); - if (!handle) - GOTO(out_create_dchild, PTR_ERR(handle)); + if (IS_ERR(handle)) + GOTO(out_transno_dchild, rc = PTR_ERR(handle)); rc = vfs_create(dir, dchild, rec->ur_mode); EXIT; break; } case S_IFDIR:{ handle = mds_fs_start(mds, dir, MDS_FSOP_MKDIR); - if (!handle) - GOTO(out_create_dchild, PTR_ERR(handle)); + if (IS_ERR(handle)) + GOTO(out_transno_dchild, rc = PTR_ERR(handle)); rc = vfs_mkdir(dir, dchild, rec->ur_mode); EXIT; break; } case S_IFLNK:{ handle = mds_fs_start(mds, dir, MDS_FSOP_SYMLINK); - if (!handle) - GOTO(out_create_dchild, PTR_ERR(handle)); + if (IS_ERR(handle)) + GOTO(out_transno_dchild, rc = PTR_ERR(handle)); rc = vfs_symlink(dir, dchild, rec->ur_name); EXIT; break; @@ -269,15 +293,16 @@ static int mds_reint_create(struct mds_update_record *rec, int offset, case S_IFSOCK:{ int rdev = rec->ur_rdev; handle = mds_fs_start(mds, dir, MDS_FSOP_MKNOD); - if (!handle) - GOTO(out_create_dchild, PTR_ERR(handle)); + if (IS_ERR(handle)) + GOTO(out_transno_dchild, rc = PTR_ERR(handle)); rc = vfs_mknod(dir, dchild, rec->ur_mode, rdev); EXIT; break; } default: CERROR("bad file type %o creating %s\n", type, rec->ur_name); - GOTO(out_create_dchild, rc = -EINVAL); + handle = NULL; /* quell uninitialized warning */ + GOTO(out_transno_dchild, rc = -EINVAL); } if (rc) { @@ -299,7 +324,7 @@ static int mds_reint_create(struct mds_update_record *rec, int offset, if (rec->ur_fid2->id) { LASSERT(rec->ur_opcode & REINT_REPLAYING); inode->i_generation = rec->ur_fid2->generation; - /* Dirtied and committed by this setattr: */ + /* Dirtied and committed by the upcoming setattr. */ CDEBUG(D_INODE, "recreated ino %ld with gen %ld\n", inode->i_ino, inode->i_generation); } else { @@ -312,18 +337,19 @@ static int mds_reint_create(struct mds_update_record *rec, int offset, /* XXX should we abort here in case of error? */ } - rc = mds_update_last_rcvd(mds, handle, req); - if (rc) { - CERROR("error on mds_update_last_rcvd: rc = %d\n", rc); - GOTO(out_create_unlink, rc); - } - body = lustre_msg_buf(req->rq_repmsg, offset); mds_pack_inode2fid(&body->fid1, inode); mds_pack_inode2body(body, inode); } EXIT; out_create_commit: + if (rc) { + rc = mds_finish_transno(mds, handle, req, rc); + } else { + rc = mds_finish_transno(mds, handle, req, rc); + if (rc) + GOTO(out_create_unlink, rc); + } err = mds_fs_commit(mds, dir, handle); if (err) { CERROR("error on commit: err = %d\n", err); @@ -340,6 +366,12 @@ out_create: req->rq_status = rc; return 0; +out_transno_dchild: + /* Need to release the transno lock, and then put the dchild. */ + LASSERT(rc); + mds_finish_transno(mds, handle, req, rc); + goto out_create_dchild; + out_create_unlink: /* Destroy the file we just created. This should not need extra * journal credits, as we have already modified all of the blocks @@ -431,11 +463,12 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset, OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_UNLINK_WRITE, to_kdev_t(dir->i_sb->s_dev)); + mds_start_transno(mds); switch (rec->ur_mode /* & S_IFMT ? */) { case S_IFDIR: handle = mds_fs_start(mds, dir, MDS_FSOP_RMDIR); - if (!handle) - GOTO(out_unlink_cancel, rc = PTR_ERR(handle)); + if (IS_ERR(handle)) + GOTO(out_unlink_cancel_transno, rc = PTR_ERR(handle)); rc = vfs_rmdir(dir, dchild); break; case S_IFREG: @@ -449,19 +482,18 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset, case S_IFIFO: case S_IFSOCK: handle = mds_fs_start(mds, dir, MDS_FSOP_UNLINK); - if (!handle) - GOTO(out_unlink_cancel, rc = PTR_ERR(handle)); + if (IS_ERR(handle)) + GOTO(out_unlink_cancel_transno, rc = PTR_ERR(handle)); rc = vfs_unlink(dir, dchild); break; default: CERROR("bad file type %o unlinking %s\n", rec->ur_mode, name); handle = NULL; LBUG(); - GOTO(out_unlink_cancel, rc = -EINVAL); + GOTO(out_unlink_cancel_transno, rc = -EINVAL); } - if (!rc) - rc = mds_update_last_rcvd(mds, handle, req); + rc = mds_finish_transno(mds, handle, req, rc); err = mds_fs_commit(mds, dir, handle); if (err) { CERROR("error on commit: err = %d\n", err); @@ -487,6 +519,10 @@ out_unlink: l_dput(de); req->rq_status = rc; return 0; + +out_unlink_cancel_transno: + rc = mds_finish_transno(mds, handle, req, rc); + goto out_unlink_cancel; } static int mds_reint_link(struct mds_update_record *rec, int offset, @@ -589,15 +625,18 @@ static int mds_reint_link(struct mds_update_record *rec, int offset, OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_LINK_WRITE, to_kdev_t(de_src->d_inode->i_sb->s_dev)); + mds_start_transno(mds); handle = mds_fs_start(mds, de_tgt_dir->d_inode, MDS_FSOP_LINK); - if (!handle) - GOTO(out_link_dchild, rc = PTR_ERR(handle)); + if (IS_ERR(handle)) { + rc = PTR_ERR(handle); + mds_finish_transno(mds, handle, req, rc); + GOTO(out_link_dchild, rc); + } rc = vfs_link(de_src, de_tgt_dir->d_inode, dchild); if (rc) CERROR("link error %d\n", rc); - if (!rc) - rc = mds_update_last_rcvd(mds, handle, req); + rc = mds_finish_transno(mds, handle, req, rc); err = mds_fs_commit(mds, de_tgt_dir->d_inode, handle); if (err) { @@ -720,16 +759,20 @@ static int mds_reint_rename(struct mds_update_record *rec, int offset, OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_RENAME_WRITE, to_kdev_t(de_srcdir->d_inode->i_sb->s_dev)); + mds_start_transno(mds); handle = mds_fs_start(mds, de_tgtdir->d_inode, MDS_FSOP_RENAME); - if (!handle) - GOTO(out_rename_denew, rc = PTR_ERR(handle)); + if (IS_ERR(handle)) { + rc = PTR_ERR(handle); + mds_finish_transno(mds, handle, req, rc); + GOTO(out_rename_denew, rc); + } + lock_kernel(); rc = vfs_rename(de_srcdir->d_inode, de_old, de_tgtdir->d_inode, de_new, NULL); unlock_kernel(); - if (!rc) - rc = mds_update_last_rcvd(mds, handle, req); + rc = mds_finish_transno(mds, handle, req, rc); err = mds_fs_commit(mds, de_tgtdir->d_inode, handle); if (err) { diff --git a/lustre/obdclass/class_obd.c b/lustre/obdclass/class_obd.c index cc10150..60816e7 100644 --- a/lustre/obdclass/class_obd.c +++ b/lustre/obdclass/class_obd.c @@ -332,29 +332,29 @@ static int obd_class_ioctl (struct inode * inode, struct file * filp, INIT_LIST_HEAD(&obd->obd_imports); spin_lock_init(&obd->obd_dev_lock); - if (data->ioc_inlbuf2) { - int len = strlen(data->ioc_inlbuf2) + 1; - OBD_ALLOC(obd->obd_name, len); - if (!obd->obd_name) { - CERROR("no memory\n"); - LBUG(); - } - memcpy(obd->obd_name, data->ioc_inlbuf2, len); - } else { - CERROR("WARNING: unnamed obd device\n"); + if (data->ioc_inlbuf2) { + int len = strlen(data->ioc_inlbuf2) + 1; + OBD_ALLOC(obd->obd_name, len); + if (!obd->obd_name) { + CERROR("no memory\n"); + LBUG(); } - if (data->ioc_inlbuf3) { - int len = strlen(data->ioc_inlbuf3); - if (len >= sizeof(obd->obd_uuid)) { - CERROR("uuid must be < %d bytes long\n", - sizeof(obd->obd_uuid)); - if (obd->obd_name) - OBD_FREE(obd->obd_name, - strlen(obd->obd_name) + 1); - GOTO(out, err=-EINVAL); - } - memcpy(obd->obd_uuid, data->ioc_inlbuf3, len); + memcpy(obd->obd_name, data->ioc_inlbuf2, len); + } else { + CERROR("WARNING: unnamed obd device\n"); + } + if (data->ioc_inlbuf3) { + int len = strlen(data->ioc_inlbuf3); + if (len >= sizeof(obd->obd_uuid)) { + CERROR("uuid must be < %d bytes long\n", + sizeof(obd->obd_uuid)); + if (obd->obd_name) + OBD_FREE(obd->obd_name, + strlen(obd->obd_name) + 1); + GOTO(out, err=-EINVAL); } + memcpy(obd->obd_uuid, data->ioc_inlbuf3, len); + } /* do the attach */ if (OBP(obd, attach)) err = OBP(obd,attach)(obd, sizeof(*data), data); diff --git a/lustre/obdclass/genops.c b/lustre/obdclass/genops.c index 8d6fa4c..f31a97a 100644 --- a/lustre/obdclass/genops.c +++ b/lustre/obdclass/genops.c @@ -408,7 +408,7 @@ void class_disconnect_all(struct obd_device *obddev) spin_unlock(&obddev->obd_dev_lock); CERROR("force disconnecting %s:%s export %p\n", export->exp_obd->obd_type->typ_name, - export->exp_uuid, export); + export->exp_connection->c_remote_uuid, export); rc = obd_disconnect(&conn); if (rc < 0) { /* AED: not so sure about this... We can't diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index 68918cc..43ae0ca 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -136,7 +136,6 @@ static int osc_getattr(struct lustre_handle *conn, struct obdo *oa, request->rq_replen = lustre_msg_size(1, &size); rc = ptlrpc_queue_wait(request); - rc = ptlrpc_check_status(request, rc); if (rc) { CERROR("%s failed: rc = %d\n", __FUNCTION__, rc); GOTO(out, rc); @@ -173,7 +172,6 @@ static int osc_open(struct lustre_handle *conn, struct obdo *oa, request->rq_replen = lustre_msg_size(1, &size); rc = ptlrpc_queue_wait(request); - rc = ptlrpc_check_status(request, rc); if (rc) GOTO(out, rc); @@ -208,7 +206,6 @@ static int osc_close(struct lustre_handle *conn, struct obdo *oa, request->rq_replen = lustre_msg_size(1, &size); rc = ptlrpc_queue_wait(request); - rc = ptlrpc_check_status(request, rc); if (rc) GOTO(out, rc); @@ -242,7 +239,6 @@ static int osc_setattr(struct lustre_handle *conn, struct obdo *oa, request->rq_replen = lustre_msg_size(1, &size); rc = ptlrpc_queue_wait(request); - rc = ptlrpc_check_status(request, rc); ptlrpc_req_finished(request); return rc; @@ -278,7 +274,6 @@ static int osc_create(struct lustre_handle *conn, struct obdo *oa, request->rq_replen = lustre_msg_size(1, &size); rc = ptlrpc_queue_wait(request); - rc = ptlrpc_check_status(request, rc); if (rc) GOTO(out_req, rc); @@ -328,7 +323,6 @@ static int osc_punch(struct lustre_handle *conn, struct obdo *oa, request->rq_replen = lustre_msg_size(1, &size); rc = ptlrpc_queue_wait(request); - rc = ptlrpc_check_status(request, rc); if (rc) GOTO(out, rc); @@ -365,7 +359,6 @@ static int osc_destroy(struct lustre_handle *conn, struct obdo *oa, request->rq_replen = lustre_msg_size(1, &size); rc = ptlrpc_queue_wait(request); - rc = ptlrpc_check_status(request, rc); if (rc) GOTO(out, rc); @@ -422,8 +415,8 @@ static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *lsm, obd_count page_count, struct brw_page *pga, struct obd_brw_set *set) { - struct ptlrpc_connection *connection = - client_conn2cli(conn)->cl_import.imp_connection; + struct obd_import *imp = class_conn2cliimp(conn); + struct ptlrpc_connection *connection = imp->imp_connection; struct ptlrpc_request *request = NULL; struct ptlrpc_bulk_desc *desc = NULL; struct ost_body *body; @@ -435,8 +428,7 @@ static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *lsm, size[1] = sizeof(struct obd_ioobj); size[2] = page_count * sizeof(struct niobuf_remote); - request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_READ, 3, size, - NULL); + request = ptlrpc_prep_req(imp, OST_READ, 3, size, NULL); if (!request) RETURN(-ENOMEM); @@ -454,9 +446,9 @@ static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *lsm, ost_pack_ioo(&iooptr, lsm, page_count); /* end almost identical to brw_write case */ - spin_lock(&connection->c_lock); - xid = ++connection->c_xid_out; /* single xid for all pages */ - spin_unlock(&connection->c_lock); + spin_lock(&imp->imp_lock); + xid = ++imp->imp_last_xid; /* single xid for all pages */ + spin_unlock(&imp->imp_lock); obd_kmap_get(page_count, 0); @@ -495,7 +487,6 @@ static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *lsm, request->rq_replen = lustre_msg_size(1, size); rc = ptlrpc_queue_wait(request); - rc = ptlrpc_check_status(request, rc); /* * XXX: If there is an error during the processing of the callback, @@ -584,7 +575,6 @@ static int osc_brw_write(struct lustre_handle *conn, struct lov_stripe_md *md, size[1] = page_count * sizeof(*remote); request->rq_replen = lustre_msg_size(2, size); rc = ptlrpc_queue_wait(request); - rc = ptlrpc_check_status(request, rc); if (rc) GOTO(out_unmap, rc); @@ -766,7 +756,6 @@ static int osc_statfs(struct lustre_handle *conn, struct obd_statfs *osfs) request->rq_replen = lustre_msg_size(1, &size); rc = ptlrpc_queue_wait(request); - rc = ptlrpc_check_status(request, rc); if (rc) { CERROR("%s failed: rc = %d\n", __FUNCTION__, rc); GOTO(out, rc); diff --git a/lustre/ost/ost_handler.c b/lustre/ost/ost_handler.c index 228115f..ced4655 100644 --- a/lustre/ost/ost_handler.c +++ b/lustre/ost/ost_handler.c @@ -215,7 +215,7 @@ static int ost_bulk_timeout(void *data) struct ptlrpc_bulk_desc *desc = data; ENTRY; - CERROR("(not yet) starting recovery of client %p\n", desc->bd_client); + recovd_conn_fail(desc->bd_connection); RETURN(1); } diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c index 28f1a5c..8f4aceb 100644 --- a/lustre/ptlrpc/client.c +++ b/lustre/ptlrpc/client.c @@ -299,9 +299,9 @@ struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, int opcode, */ atomic_set(&request->rq_refcount, 2); - spin_lock(&conn->c_lock); - request->rq_xid = HTON__u32(++conn->c_xid_out); - spin_unlock(&conn->c_lock); + spin_lock(&imp->imp_lock); + request->rq_xid = HTON__u32(++imp->imp_last_xid); + spin_unlock(&imp->imp_lock); request->rq_reqmsg->magic = PTLRPC_MSG_MAGIC; request->rq_reqmsg->version = PTLRPC_MSG_VERSION; @@ -312,19 +312,7 @@ struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, int opcode, RETURN(request); } -void ptlrpc_req_finished(struct ptlrpc_request *request) -{ - if (request == NULL) - return; - - if (atomic_dec_and_test(&request->rq_refcount)) - ptlrpc_free_req(request); - else - DEBUG_REQ(D_INFO, request, "refcount now %u", - atomic_read(&request->rq_refcount)); -} - -void ptlrpc_free_req(struct ptlrpc_request *request) +static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked) { ENTRY; if (request == NULL) { @@ -351,10 +339,12 @@ void ptlrpc_free_req(struct ptlrpc_request *request) request->rq_reqmsg = NULL; } - if (request->rq_connection) { - spin_lock(&request->rq_connection->c_lock); + if (request->rq_import) { + if (!locked) + spin_lock(&request->rq_import->imp_lock); list_del_init(&request->rq_list); - spin_unlock(&request->rq_connection->c_lock); + if (!locked) + spin_unlock(&request->rq_import->imp_lock); } ptlrpc_put_connection(request->rq_connection); @@ -362,62 +352,87 @@ void ptlrpc_free_req(struct ptlrpc_request *request) EXIT; } +void ptlrpc_free_req(struct ptlrpc_request *request) +{ + __ptlrpc_free_req(request, 0); +} + +static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked) +{ + ENTRY; + if (request == NULL) + RETURN(1); + + if (atomic_dec_and_test(&request->rq_refcount)) { + __ptlrpc_free_req(request, locked); + RETURN(1); + } + + DEBUG_REQ(D_INFO, request, "refcount now %u", + atomic_read(&request->rq_refcount)); + RETURN(0); +} + +void ptlrpc_req_finished(struct ptlrpc_request *request) +{ + __ptlrpc_req_finished(request, 0); +} + static int ptlrpc_check_reply(struct ptlrpc_request *req) { int rc = 0; if (req->rq_repmsg != NULL) { - struct ptlrpc_connection *conn = req->rq_import->imp_connection; + struct obd_import *imp = req->rq_import; + struct ptlrpc_connection *conn = imp->imp_connection; + ENTRY; if (req->rq_level > conn->c_level) { - CDEBUG(D_HA, - "rep to xid "LPD64" op %d to %s:%d: " - "recovery started, ignoring (%d > %d)\n", - (unsigned long long)req->rq_xid, - req->rq_reqmsg->opc, conn->c_remote_uuid, - req->rq_import->imp_client->cli_request_portal, + DEBUG_REQ(D_HA, req, + "recovery started, ignoring (%d > %d)", req->rq_level, conn->c_level); req->rq_repmsg = NULL; GOTO(out, rc = 0); } req->rq_transno = NTOH__u64(req->rq_repmsg->transno); + spin_lock(&imp->imp_lock); + if (req->rq_transno > imp->imp_max_transno) { + imp->imp_max_transno = req->rq_transno; + } else if (req->rq_transno != 0) { + if (conn->c_level == LUSTRE_CONN_FULL) { + CERROR("got transno "LPD64" after " + LPD64": recovery may not work\n", + req->rq_transno, imp->imp_max_transno); + } + } + spin_unlock(&imp->imp_lock); req->rq_flags |= PTL_RPC_FL_REPLIED; GOTO(out, rc = 1); } if (req->rq_flags & PTL_RPC_FL_RESEND) { - CERROR("-- RESTART --\n"); + DEBUG_REQ(D_ERROR, req, "RESEND:"); GOTO(out, rc = 1); } if (req->rq_flags & PTL_RPC_FL_ERR) { - CERROR("-- ABORTED --\n"); + DEBUG_REQ(D_ERROR, req, "ABORTED:"); GOTO(out, rc = 1); } + if (req->rq_flags & PTL_RPC_FL_RESTART) { + DEBUG_REQ(D_ERROR, req, "RESTART:"); + GOTO(out, rc = 1); + } out: - CDEBUG(D_NET, "req = %p, rc = %d\n", req, rc); + DEBUG_REQ(D_NET, req, "rc = %d for", rc); return rc; } -int ptlrpc_check_status(struct ptlrpc_request *req, int err) +static int ptlrpc_check_status(struct ptlrpc_request *req) { + int err; ENTRY; - if (err != 0) { - CERROR("err is %d\n", err); - RETURN(err); - } - - if (req == NULL) { - CERROR("req == NULL\n"); - RETURN(-ENOMEM); - } - - if (req->rq_repmsg == NULL) { - CERROR("req->rq_repmsg == NULL\n"); - RETURN(-ENOMEM); - } - err = req->rq_repmsg->status; if (req->rq_repmsg->type == NTOH__u32(PTL_RPC_MSG_ERR)) { CERROR("req->rq_repmsg->type == PTL_RPC_MSG_ERR\n"); @@ -426,14 +441,12 @@ int ptlrpc_check_status(struct ptlrpc_request *req, int err) if (err != 0) { if (err < 0) - CERROR("req->rq_repmsg->status is %d\n", err); + CDEBUG(D_INFO, "req->rq_repmsg->status is %d\n", err); else CDEBUG(D_INFO, "req->rq_repmsg->status is %d\n", err); - /* XXX: translate this error from net to host */ - RETURN(err); } - RETURN(0); + RETURN(err); } static void ptlrpc_cleanup_request_buf(struct ptlrpc_request *request) @@ -455,14 +468,13 @@ static int ptlrpc_abort(struct ptlrpc_request *request) return 0; } -/* caller must hold conn->c_lock */ -void ptlrpc_free_committed(struct ptlrpc_connection *conn) +/* caller must hold imp->imp_lock */ +void ptlrpc_free_committed(struct obd_import *imp) { struct list_head *tmp, *saved; struct ptlrpc_request *req; -restart: - list_for_each_safe(tmp, saved, &conn->c_sending_head) { + list_for_each_safe(tmp, saved, &imp->imp_request_list) { req = list_entry(tmp, struct ptlrpc_request, rq_list); if (req->rq_flags & PTL_RPC_FL_REPLAY) { @@ -470,33 +482,27 @@ restart: continue; } - if (!(req->rq_flags & PTL_RPC_FL_REPLIED)) { + /* If neither replied-to nor restarted, keep it. */ + if (!(req->rq_flags & + (PTL_RPC_FL_REPLIED | PTL_RPC_FL_RESTART))) { DEBUG_REQ(D_HA, req, "keeping (in-flight)"); continue; } + /* This needs to match the commit test in ptlrpc_queue_wait() */ + if (!(req->rq_import->imp_flags & IMP_REPLAYABLE) || + req->rq_transno == 0) { + DEBUG_REQ(D_HA, req, "keeping (queue_wait will free)"); + continue; + } + /* not yet committed */ - if (req->rq_transno > conn->c_last_committed) + if (req->rq_transno > imp->imp_peer_committed_transno) break; DEBUG_REQ(D_HA, req, "committing (last_committed %Lu)", - (long long)conn->c_last_committed); - if (atomic_dec_and_test(&req->rq_refcount)) { - /* We do this to prevent free_req deadlock. Restarting - * after each removal is not so bad, as we are almost - * always deleting the first item in the list. - * - * If we use a recursive lock here, we can skip the - * unlock/lock/restart sequence. - */ - spin_unlock(&conn->c_lock); - ptlrpc_free_req(req); - spin_lock(&conn->c_lock); - goto restart; - } else { - list_del(&req->rq_list); - list_add(&req->rq_list, &conn->c_dying_head); - } + imp->imp_peer_committed_transno); + __ptlrpc_req_finished(req, 1); } EXIT; @@ -512,35 +518,18 @@ void ptlrpc_cleanup_client(struct obd_import *imp) LASSERT(conn); -restart1: - spin_lock(&conn->c_lock); - list_for_each_safe(tmp, saved, &conn->c_sending_head) { + spin_lock(&imp->imp_lock); + list_for_each_safe(tmp, saved, &imp->imp_request_list) { req = list_entry(tmp, struct ptlrpc_request, rq_list); - if (req->rq_import != imp) - continue; + /* XXX we should make sure that nobody's sleeping on these! */ DEBUG_REQ(D_HA, req, "cleaning up from sending list"); list_del_init(&req->rq_list); req->rq_import = NULL; - spin_unlock(&conn->c_lock); - ptlrpc_req_finished(req); - goto restart1; + __ptlrpc_req_finished(req, 0); } -restart2: - list_for_each_safe(tmp, saved, &conn->c_dying_head) { - req = list_entry(tmp, struct ptlrpc_request, rq_list); - if (req->rq_import != imp) - continue; - DEBUG_REQ(D_ERROR, req, "on dying list at cleanup"); - list_del_init(&req->rq_list); - req->rq_import = NULL; - spin_unlock(&conn->c_lock); - ptlrpc_req_finished(req); - spin_lock(&conn->c_lock); - goto restart2; - } - spin_unlock(&conn->c_lock); - + spin_unlock(&imp->imp_lock); + EXIT; return; } @@ -548,8 +537,7 @@ restart2: void ptlrpc_continue_req(struct ptlrpc_request *req) { ENTRY; - CDEBUG(D_HA, "continue delayed request "LPD64" opc %d\n", - req->rq_xid, req->rq_reqmsg->opc); + DEBUG_REQ(D_HA, req, "continuing delayed request"); req->rq_reqmsg->addr = req->rq_import->imp_handle.addr; req->rq_reqmsg->cookie = req->rq_import->imp_handle.cookie; wake_up(&req->rq_wait_for_rep); @@ -559,8 +547,7 @@ void ptlrpc_continue_req(struct ptlrpc_request *req) void ptlrpc_resend_req(struct ptlrpc_request *req) { ENTRY; - CDEBUG(D_HA, "resend request "LPD64", opc %d\n", - req->rq_xid, req->rq_reqmsg->opc); + DEBUG_REQ(D_HA, req, "resending"); req->rq_reqmsg->addr = req->rq_import->imp_handle.addr; req->rq_reqmsg->cookie = req->rq_import->imp_handle.cookie; req->rq_status = -EAGAIN; @@ -574,10 +561,9 @@ void ptlrpc_resend_req(struct ptlrpc_request *req) void ptlrpc_restart_req(struct ptlrpc_request *req) { ENTRY; - CDEBUG(D_HA, "restart completed request "LPD64", opc %d\n", - req->rq_xid, req->rq_reqmsg->opc); + DEBUG_REQ(D_HA, req, "restarting (possibly-)completed request"); req->rq_status = -ERESTARTSYS; - req->rq_flags |= PTL_RPC_FL_RECOVERY; + req->rq_flags |= PTL_RPC_FL_RESTART; req->rq_flags &= ~PTL_RPC_FL_TIMEOUT; wake_up(&req->rq_wait_for_rep); EXIT; @@ -654,21 +640,16 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req) { int rc = 0; struct l_wait_info lwi; - //struct ptlrpc_client *cli = req->rq_import->imp_client; - struct ptlrpc_connection *conn = req->rq_import->imp_connection; + struct obd_import *imp = req->rq_import; + struct ptlrpc_connection *conn = imp->imp_connection; ENTRY; init_waitqueue_head(&req->rq_wait_for_rep); req->rq_reqmsg->status = HTON__u32(current->pid); /* for distributed debugging */ - CDEBUG(D_RPCTRACE, "Sending RPC pid:xid:nid:opc %d:" - LPX64":%x:%d\n", - NTOH__u32(req->rq_reqmsg->status), - req->rq_xid, - conn->c_peer.peer_nid, - NTOH__u32(req->rq_reqmsg->opc) - ); + CDEBUG(D_RPCTRACE, "Sending RPC pid:xid:nid:opc %d:"LPU64":%x:%d\n", + NTOH__u32(req->rq_reqmsg->status), req->rq_xid, + conn->c_peer.peer_nid, NTOH__u32(req->rq_reqmsg->opc)); - //DEBUG_REQ(D_HA, req, "subsys: %s:", cli->cli_name); /* XXX probably both an import and connection level are needed */ if (req->rq_level > conn->c_level) { @@ -703,18 +684,20 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req) EIO_IF_INVALID(conn, req); list_del(&req->rq_list); - list_add_tail(&req->rq_list, &conn->c_sending_head); + list_add_tail(&req->rq_list, &imp->imp_request_list); spin_unlock(&conn->c_lock); rc = ptl_send_rpc(req); if (rc) { CDEBUG(D_HA, "error %d, opcode %d, need recovery\n", rc, req->rq_reqmsg->opc); - /* the sleep below will time out, triggering recovery */ + /* sleep for a jiffy, then trigger recovery */ + lwi = LWI_TIMEOUT_INTR(1, expired_request, + interrupted_request, req); + } else { + DEBUG_REQ(D_NET, req, "-- sleeping"); + lwi = LWI_TIMEOUT_INTR(req->rq_timeout * HZ, expired_request, + interrupted_request, req); } - - DEBUG_REQ(D_NET, req, "-- sleeping"); - lwi = LWI_TIMEOUT_INTR(req->rq_timeout * HZ, expired_request, - interrupted_request, req); l_wait_event(req->rq_wait_for_rep, ptlrpc_check_reply(req), &lwi); DEBUG_REQ(D_NET, req, "-- done sleeping"); @@ -761,7 +744,7 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req) GOTO(out, rc = -EINVAL); } #endif - CDEBUG(D_NET, "got rep "LPD64"\n", req->rq_xid); + CDEBUG(D_NET, "got rep "LPU64"\n", req->rq_xid); if (req->rq_repmsg->status == 0) CDEBUG(D_NET, "--> buf %p len %d status %d\n", req->rq_repmsg, req->rq_replen, req->rq_repmsg->status); @@ -773,8 +756,10 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req) * * But don't commit anything that's kept indefinitely for replay (has * the PTL_RPC_FL_REPLAY flag set), such as open requests. + * + * This needs to match the commit test in ptlrpc_free_committed(). */ - if ((req->rq_import->imp_flags & IMP_REPLAYABLE) == 0 || + if (!(req->rq_import->imp_flags & IMP_REPLAYABLE) || (req->rq_repmsg->transno == 0 && (req->rq_flags & PTL_RPC_FL_REPLAY) == 0)) { /* This import doesn't support replay, so we can just "commit" @@ -782,20 +767,17 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req) */ DEBUG_REQ(D_HA, req, "not replayable, committing:"); list_del_init(&req->rq_list); - spin_unlock(&conn->c_lock); - ptlrpc_req_finished(req); /* Must be called unlocked. */ - spin_lock(&conn->c_lock); - } else /* if (req->rq_import->imp_flags & IMP_REPLAYABLE) */ { + __ptlrpc_req_finished(req, 1); + } + if (req->rq_import->imp_flags & IMP_REPLAYABLE) { /* Replay-enabled imports return commit-status information. */ - /* XXX this needs to be per-import, or multiple MDS services on - * XXX the same system are going to interfere messily with each - * XXX others' transno spaces. - */ - conn->c_last_xid = req->rq_repmsg->last_xid; - conn->c_last_committed = req->rq_repmsg->last_committed; - ptlrpc_free_committed(conn); + imp->imp_peer_last_xid = req->rq_repmsg->last_xid; + imp->imp_peer_committed_transno = + req->rq_repmsg->last_committed; + ptlrpc_free_committed(imp); } + rc = ptlrpc_check_status(req); spin_unlock(&conn->c_lock); EXIT; diff --git a/lustre/ptlrpc/connection.c b/lustre/ptlrpc/connection.c index df2a2c2..2458b08 100644 --- a/lustre/ptlrpc/connection.c +++ b/lustre/ptlrpc/connection.c @@ -80,21 +80,17 @@ struct ptlrpc_connection *ptlrpc_get_connection(struct lustre_peer *peer, GOTO(out, c); c->c_level = LUSTRE_CONN_NEW; - c->c_xid_in = 1; - c->c_xid_out = 1; c->c_generation = 1; c->c_epoch = 1; c->c_bootcount = 0; c->c_flags = 0; if (uuid) strcpy(c->c_remote_uuid, uuid); - INIT_LIST_HEAD(&c->c_delayed_head); - INIT_LIST_HEAD(&c->c_sending_head); - INIT_LIST_HEAD(&c->c_dying_head); INIT_LIST_HEAD(&c->c_imports); INIT_LIST_HEAD(&c->c_exports); INIT_LIST_HEAD(&c->c_sb_chain); INIT_LIST_HEAD(&c->c_recovd_data.rd_managed_chain); + INIT_LIST_HEAD(&c->c_delayed_head); atomic_set(&c->c_refcount, 0); ptlrpc_connection_addref(c); spin_lock_init(&c->c_lock); @@ -164,8 +160,8 @@ void ptlrpc_cleanup_connection(void) } list_for_each_safe(tmp, pos, &conn_list) { c = list_entry(tmp, struct ptlrpc_connection, c_link); - CERROR("Connection %p has refcount %d at cleanup (nid=%lu)!\n", - c, atomic_read(&c->c_refcount), + CERROR("Connection %p/%s has refcount %d (nid=%lu)\n", + c, c->c_remote_uuid, atomic_read(&c->c_refcount), (unsigned long)c->c_peer.peer_nid); list_del(&c->c_link); OBD_FREE(c, sizeof(*c)); diff --git a/lustre/ptlrpc/recovd.c b/lustre/ptlrpc/recovd.c index 1520cf9..0bbc4b0 100644 --- a/lustre/ptlrpc/recovd.c +++ b/lustre/ptlrpc/recovd.c @@ -124,8 +124,8 @@ void recovd_conn_fail(struct ptlrpc_connection *conn) return; } - CERROR("connection %p to %s failed\n", conn, conn->c_remote_uuid); - CERROR("peer is %08x %08lx %08lx\n", conn->c_peer.peer_nid, + CERROR("connection %p to %s (%08x %08lx %08lx) failed\n", conn, + conn->c_remote_uuid, conn->c_peer.peer_nid, conn->c_peer.peer_ni.nal_idx, conn->c_peer.peer_ni.handle_idx); list_del(&rd->rd_managed_chain); list_add_tail(&rd->rd_managed_chain, &recovd->recovd_troubled_items); diff --git a/lustre/ptlrpc/recover.c b/lustre/ptlrpc/recover.c index 060258f..9d955e6 100644 --- a/lustre/ptlrpc/recover.c +++ b/lustre/ptlrpc/recover.c @@ -6,11 +6,11 @@ * This code is issued under the GNU General Public License. * See the file COPYING in this distribution * - * Copryright (C) 1996 Peter J. Braam - * Copryright (C) 1999 Stelias Computing Inc. - * Copryright (C) 1999 Seagate Technology Inc. - * Copryright (C) 2001 Mountain View Data, Inc. - * Copryright (C) 2002 Cluster File Systems, Inc. + * Copyright (C) 1996 Peter J. Braam + * Copyright (C) 1999 Stelias Computing Inc. + * Copyright (C) 1999 Seagate Technology Inc. + * Copyright (C) 2001 Mountain View Data, Inc. + * Copyright (C) 2002 Cluster File Systems, Inc. * */ @@ -40,7 +40,6 @@ int ptlrpc_reconnect_import(struct obd_import *imp, int rq_opc) request->rq_level = LUSTRE_CONN_NEW; request->rq_replen = lustre_msg_size(0, NULL); /* - * This address is the export that represents our client-side LDLM * service (for ASTs). We should only have one on this list, so we * just grab the first one. @@ -52,24 +51,55 @@ int ptlrpc_reconnect_import(struct obd_import *imp, int rq_opc) request->rq_reqmsg->addr = (__u64)(unsigned long)ldlmexp; request->rq_reqmsg->cookie = ldlmexp->exp_cookie; rc = ptlrpc_queue_wait(request); - rc = ptlrpc_check_status(request, rc); - if (rc) { + switch (rc) { + case EALREADY: + case -EALREADY: + /* already connected! */ + memset(&old_hdl, 0, sizeof(old_hdl)); + if (!memcmp(&old_hdl.addr, &request->rq_repmsg->addr, + sizeof (old_hdl.addr)) && + !memcmp(&old_hdl.cookie, &request->rq_repmsg->cookie, + sizeof (old_hdl.cookie))) { + CERROR("%s@%s didn't like our handle %Lx/%Lx, failed\n", + cli->cl_target_uuid, conn->c_remote_uuid, + (__u64)(unsigned long)ldlmexp, + ldlmexp->exp_cookie); + GOTO(out_disc, rc = -ENOTCONN); + } + + old_hdl.addr = request->rq_repmsg->addr; + old_hdl.cookie = request->rq_repmsg->cookie; + if (memcmp(&imp->imp_handle, &old_hdl, sizeof(old_hdl))) { + CERROR("%s@%s changed handle from %Lx/%Lx to %Lx/%Lx; " + "copying, but this may foreshadow disaster\n", + cli->cl_target_uuid, conn->c_remote_uuid, + old_hdl.addr, old_hdl.cookie, + imp->imp_handle.addr, imp->imp_handle.cookie); + imp->imp_handle.addr = request->rq_repmsg->addr; + imp->imp_handle.cookie = request->rq_repmsg->cookie; + GOTO(out_disc, rc = EALREADY); + } + + CERROR("reconnected to %s@%s after partition\n", + cli->cl_target_uuid, conn->c_remote_uuid); + GOTO(out_disc, rc = EALREADY); + case 0: + old_hdl = imp->imp_handle; + imp->imp_handle.addr = request->rq_repmsg->addr; + imp->imp_handle.cookie = request->rq_repmsg->cookie; + CERROR("now connected to %s@%s (%Lx/%Lx, was %Lx/%Lx)!\n", + cli->cl_target_uuid, conn->c_remote_uuid, + imp->imp_handle.addr, imp->imp_handle.cookie, + old_hdl.addr, old_hdl.cookie); + GOTO(out_disc, rc = 0); + default: CERROR("cannot connect to %s@%s: rc = %d\n", cli->cl_target_uuid, conn->c_remote_uuid, rc); - ptlrpc_free_req(request); - GOTO(out_disc, rc = -ENOTCONN); + GOTO(out_disc, rc = -ENOTCONN); /* XXX preserve rc? */ } - - old_hdl = imp->imp_handle; - imp->imp_handle.addr = request->rq_repmsg->addr; - imp->imp_handle.cookie = request->rq_repmsg->cookie; - CERROR("reconnected to %s@%s (%Lx/%Lx, was %Lx/%Lx)!\n", - cli->cl_target_uuid, conn->c_remote_uuid, - imp->imp_handle.addr, imp->imp_handle.cookie, - old_hdl.addr, old_hdl.cookie); - ptlrpc_req_finished(request); out_disc: + ptlrpc_req_finished(request); return rc; } @@ -113,23 +143,16 @@ int ptlrpc_run_recovery_upcall(struct ptlrpc_connection *conn) #define REPLAY_RESEND 2 /* Resend required. */ #define REPLAY_RESEND_IGNORE 3 /* Resend, ignore the reply (already saw it). */ #define REPLAY_RESTART 4 /* Have to restart the call, sorry! */ -#define REPLAY_NO_STATE 5 /* Request doesn't change MDS state: skip. */ -static int replay_state(struct ptlrpc_request *req, __u64 last_xid) +static int replay_state(struct ptlrpc_request *req, __u64 committed) { /* This request must always be replayed. */ if (req->rq_flags & PTL_RPC_FL_REPLAY) return REPLAY_REPLAY; /* Uncommitted request */ - if (req->rq_xid > last_xid) { + if (req->rq_transno > committed) { if (req->rq_flags & PTL_RPC_FL_REPLIED) { - if (req->rq_transno == 0) { - /* If no transno was returned, no state was - altered on the MDS. */ - return REPLAY_NO_STATE; - } - /* Saw reply, so resend and ignore new reply. */ return REPLAY_RESEND_IGNORE; } @@ -149,7 +172,6 @@ static int replay_state(struct ptlrpc_request *req, __u64 last_xid) static char *replay_state2str(int state) { static char *state_strings[] = { "COMMITTED", "REPLAY", "RESEND", "RESEND_IGNORE", "RESTART", - "NO_STATE" }; static char *unknown_state = "UNKNOWN"; @@ -161,36 +183,52 @@ static char *replay_state2str(int state) { return state_strings[state]; } -int ptlrpc_replay(struct ptlrpc_connection *conn) +int ptlrpc_replay(struct obd_import *imp, int unreplied_only) { - int rc = 0; + int rc = 0, state; struct list_head *tmp, *pos; struct ptlrpc_request *req; + struct ptlrpc_connection *conn = imp->imp_connection; + __u64 committed = imp->imp_peer_committed_transno; ENTRY; - spin_lock(&conn->c_lock); + spin_lock(&imp->imp_lock); - CDEBUG(D_HA, "connection %p to %s has last_xid "LPD64"\n", - conn, conn->c_remote_uuid, conn->c_last_xid); + CDEBUG(D_HA, "import %p from %s has committed "LPD64"\n", + imp, imp->imp_obd->u.cli.cl_target_uuid, committed); - list_for_each(tmp, &conn->c_sending_head) { - int state; + list_for_each(tmp, &imp->imp_request_list) { req = list_entry(tmp, struct ptlrpc_request, rq_list); - state = replay_state(req, conn->c_last_xid); + state = replay_state(req, committed); DEBUG_REQ(D_HA, req, "SENDING: %s: ", replay_state2str(state)); } list_for_each(tmp, &conn->c_delayed_head) { - int state; req = list_entry(tmp, struct ptlrpc_request, rq_list); - state = replay_state(req, conn->c_last_xid); - DEBUG_REQ(D_HA, req, "DELAYED: "); + state = replay_state(req, committed); + DEBUG_REQ(D_HA, req, "DELAYED: %s: ", replay_state2str(state)); } - list_for_each_safe(tmp, pos, &conn->c_sending_head) { + list_for_each_safe(tmp, pos, &imp->imp_request_list) { req = list_entry(tmp, struct ptlrpc_request, rq_list); - - switch (replay_state(req, conn->c_last_xid)) { + + if (unreplied_only) { + if (!(req->rq_flags & PTL_RPC_FL_REPLIED)) { + DEBUG_REQ(D_HA, req, "UNREPLIED:"); + ptlrpc_restart_req(req); + } + continue; + } + + state = replay_state(req, committed); + + if (req->rq_transno == imp->imp_max_transno) { + req->rq_reqmsg->flags |= MSG_LAST_REPLAY; + DEBUG_REQ(D_HA, req, "last for replay"); + LASSERT(state != REPLAY_COMMITTED); + } + + switch (state) { case REPLAY_REPLAY: DEBUG_REQ(D_HA, req, "REPLAY:"); rc = ptlrpc_replay_req(req); @@ -208,14 +246,8 @@ int ptlrpc_replay(struct ptlrpc_connection *conn) } break; - case REPLAY_COMMITTED: - DEBUG_REQ(D_HA, req, "COMMITTED:"); - /* XXX commit now? */ - break; - - case REPLAY_NO_STATE: - DEBUG_REQ(D_HA, req, "NO_STATE:"); + DEBUG_REQ(D_ERROR, req, "COMMITTED:"); /* XXX commit now? */ break; diff --git a/lustre/ptlrpc/rpc.c b/lustre/ptlrpc/rpc.c index f33fa17..7263ac0 100644 --- a/lustre/ptlrpc/rpc.c +++ b/lustre/ptlrpc/rpc.c @@ -254,7 +254,6 @@ EXPORT_SYMBOL(ptlrpc_prep_bulk); EXPORT_SYMBOL(ptlrpc_free_bulk); EXPORT_SYMBOL(ptlrpc_prep_bulk_page); EXPORT_SYMBOL(ptlrpc_free_bulk_page); -EXPORT_SYMBOL(ptlrpc_check_status); EXPORT_SYMBOL(ll_brw_sync_wait); /* service.c */ diff --git a/lustre/utils/lconf b/lustre/utils/lconf index 9e7e42c..d74ba89 100755 --- a/lustre/utils/lconf +++ b/lustre/utils/lconf @@ -888,7 +888,12 @@ class LOV(Module): osc = lookup(self.dom_node.parentNode, osc_uuid) if osc: n = OSC(osc) - n.prepare() + try: + # Ignore connection failures, because the LOV will DTRT with + # an unconnected OSC. + n.prepare(ignore_connect_failure=1) + except CommandError: + print "Error preparing OSC %s (inactive)\n" % osc_uuid else: panic('osc not found:', osc_uuid) mdc_uuid = prepare_mdc(self.dom_node.parentNode, self.mds_uuid) @@ -1089,19 +1094,23 @@ class OSC(Module): self.lookup_server(self.ost_uuid) self.add_module('lustre/osc', 'osc') - def prepare(self): + def prepare(self, ignore_connect_failure = 0): if is_prepared(self.uuid): return self.info(self.obd_uuid, self.ost_uuid) srv = self.get_server() - if local_net(srv): - lctl.connect(srv.net_type, srv.nid, srv.port, srv.uuid, srv.send_mem, srv.recv_mem) - else: - r = find_route(srv) - if r: - lctl.add_route_host(r[0], srv.uuid, r[1], r[2]) + try: + if local_net(srv): + lctl.connect(srv.net_type, srv.nid, srv.port, srv.uuid, srv.send_mem, srv.recv_mem) else: - panic ("no route to", srv.nid) + r = find_route(srv) + if r: + lctl.add_route_host(r[0], srv.uuid, r[1], r[2]) + else: + panic ("no route to", srv.nid) + except CommandError: + if (ignore_connect_failure == 0): + pass lctl.newdev(attach="osc %s %s" % (self.name, self.uuid), setup ="%s %s" %(self.obd_uuid, srv.uuid)) -- 1.8.3.1