From: tappro Date: Mon, 2 Oct 2006 07:30:21 +0000 (+0000) Subject: - merge recovery from cmd2 X-Git-Tag: v1_8_0_110~486^2~736 X-Git-Url: https://git.whamcloud.com/?a=commitdiff_plain;h=b30ea39916e05e24a3518377974ad0d301c12451;p=fs%2Flustre-release.git - merge recovery from cmd2 --- diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h index ce8b8cb..c6fd621 100644 --- a/lustre/include/lustre/lustre_idl.h +++ b/lustre/include/lustre/lustre_idl.h @@ -409,6 +409,8 @@ extern void lustre_swab_ptlrpc_body(struct ptlrpc_body *pb); #define MSG_LAST_REPLAY 1 #define MSG_RESENT 2 #define MSG_REPLAY 4 +#define MSG_REQ_REPLAY_DONE 8 +#define MSG_LOCK_REPLAY_DONE 16 /* * Flags for all connect opcodes (MDS_CONNECT, OST_CONNECT) @@ -421,7 +423,8 @@ extern void lustre_swab_ptlrpc_body(struct ptlrpc_body *pb); #define MSG_CONNECT_LIBCLIENT 0x10 #define MSG_CONNECT_INITIAL 0x20 #define MSG_CONNECT_ASYNC 0x40 -#define MSG_CONNECT_NEXT_VER 0x80 /* use next version of lustre_msg */ +#define MSG_CONNECT_NEXT_VER 0x80 /* use next version of lustre_msg */ +#define MSG_CONNECT_TRANSNO 0x100 /* report transno */ /* Connect flags */ #define OBD_CONNECT_RDONLY 0x1ULL /* client allowed read-only access */ @@ -479,18 +482,18 @@ extern void lustre_swab_ptlrpc_body(struct ptlrpc_body *pb); * almost certainly will, then perhaps we stick a union in here. */ struct obd_connect_data { __u64 ocd_connect_flags; /* OBD_CONNECT_* per above */ + __u64 ocd_transno; /* first transno from client to be replayed */ + __u64 ocd_ibits_known; /* inode bits this client understands */ __u32 ocd_version; /* lustre release version number */ __u32 ocd_grant; /* initial cache grant amount (bytes) */ __u32 ocd_index; /* LOV index to connect to */ __u32 ocd_brw_size; /* Maximum BRW size in bytes */ - __u64 ocd_ibits_known; /* inode bits this client understands */ __u32 ocd_nllu; /* non-local-lustre-user */ __u32 ocd_nllg; /* non-local-lustre-group */ __u32 ocd_group; /* MDS group on OST */ __u32 padding1; /* also fix lustre_swab_connect */ __u64 padding2; /* also fix lustre_swab_connect */ __u64 padding3; /* also fix lustre_swab_connect */ - __u64 padding4; /* also fix lustre_swab_connect */ }; extern void lustre_swab_connect(struct obd_connect_data *ocd); diff --git a/lustre/include/lustre_export.h b/lustre/include/lustre_export.h index 896d3e8..51e8bf4 100644 --- a/lustre/include/lustre_export.h +++ b/lustre/include/lustre_export.h @@ -80,6 +80,7 @@ struct filter_export_data { struct obd_export { struct portals_handle exp_handle; atomic_t exp_refcount; + atomic_t exp_rpc_count; struct obd_uuid exp_client_uuid; struct list_head exp_obd_chain; /* exp_obd_chain_timed fo ping evictor, protected by obd_dev_lock */ @@ -96,9 +97,11 @@ struct obd_export { __u64 exp_connect_flags; int exp_flags; unsigned int exp_failed:1, + exp_connected:1, exp_disconnected:1, exp_connecting:1, - exp_replay_needed:1, + exp_req_replay_needed:1, + exp_lock_replay_needed:1, exp_libclient:1; /* liblustre client? */ union { struct mds_export_data eu_mds_data; diff --git a/lustre/include/lustre_lib.h b/lustre/include/lustre_lib.h index 8e865ca..4233d36 100644 --- a/lustre/include/lustre_lib.h +++ b/lustre/include/lustre_lib.h @@ -51,7 +51,7 @@ struct obd_export; #include #include -int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler); +int target_handle_connect(struct ptlrpc_request *req); int target_handle_disconnect(struct ptlrpc_request *req); void target_destroy_export(struct obd_export *exp); int target_handle_reconnect(struct lustre_handle *conn, struct obd_export *exp, @@ -71,8 +71,10 @@ int target_handle_dqacq_callback(struct ptlrpc_request *req); void target_cancel_recovery_timer(struct obd_device *obd); #define OBD_RECOVERY_TIMEOUT (obd_timeout * 5 / 2) /* *waves hands* */ -void target_start_recovery_timer(struct obd_device *obd, svc_handler_t handler); -void target_abort_recovery(void *data); +void target_start_recovery_timer(struct obd_device *obd); +int target_start_recovery_thread(struct obd_device *obd, + svc_handler_t handler); +void target_stop_recovery_thread(struct obd_device *obd); void target_cleanup_recovery(struct obd_device *obd); int target_queue_recovery_request(struct ptlrpc_request *req, struct obd_device *obd); diff --git a/lustre/include/obd.h b/lustre/include/obd.h index 8515aae..3493fec 100644 --- a/lustre/include/obd.h +++ b/lustre/include/obd.h @@ -848,6 +848,13 @@ struct obd_notify_upcall { void *onu_owner; }; +struct target_recovery_data { + svc_handler_t trd_recovery_handler; + pid_t trd_processing_task; + struct completion trd_starting; + struct completion trd_finishing; +}; + /* corresponds to one of the obd's */ #define MAX_OBD_NAME 128 #define OBD_DEVICE_MAGIC 0XAB5CD6EF @@ -869,6 +876,7 @@ struct obd_device { obd_replayable:1, /* recovery is enabled; inform clients */ obd_no_transno:1, /* no committed-transno notification */ obd_no_recov:1, /* fail instead of retry messages */ + obd_req_replaying:1, /* replaying requests */ obd_stopping:1, /* started cleanup */ obd_starting:1, /* started setup */ obd_force:1, /* cleanup with > 0 obd refcount */ @@ -900,11 +908,13 @@ struct obd_device { /* XXX encapsulate all this recovery data into one struct */ svc_handler_t obd_recovery_handler; + pid_t obd_processing_task; + + /* common recovery fields for b1_x and CMD2 */ int obd_max_recoverable_clients; int obd_connected_clients; int obd_recoverable_clients; spinlock_t obd_processing_task_lock; /* BH lock (timer) */ - pid_t obd_processing_task; __u64 obd_next_recovery_transno; int obd_replayed_requests; int obd_requests_queued_for_recovery; @@ -912,10 +922,18 @@ struct obd_device { struct list_head obd_uncommitted_replies; spinlock_t obd_uncommitted_replies_lock; cfs_timer_t obd_recovery_timer; - struct list_head obd_recovery_queue; - struct list_head obd_delayed_reply_queue; time_t obd_recovery_start; time_t obd_recovery_end; + + /* new recovery stuff from CMD2 */ + struct target_recovery_data obd_recovery_data; + int obd_replayed_locks; + atomic_t obd_req_replay_clients; + atomic_t obd_lock_replay_clients; + struct list_head obd_req_replay_queue; + struct list_head obd_lock_replay_queue; + struct list_head obd_final_req_queue; + int obd_recovery_stage; union { struct obd_device_target obt; diff --git a/lustre/include/obd_class.h b/lustre/include/obd_class.h index b69db25..d4ca3d1 100644 --- a/lustre/include/obd_class.h +++ b/lustre/include/obd_class.h @@ -176,7 +176,8 @@ int class_connect(struct lustre_handle *conn, struct obd_device *obd, int class_disconnect(struct obd_export *exp); void class_fail_export(struct obd_export *exp); void class_disconnect_exports(struct obd_device *obddev); -void class_disconnect_stale_exports(struct obd_device *obddev); +int class_disconnect_stale_exports(struct obd_device *, + int (*test_export)(struct obd_export *)); int class_manual_cleanup(struct obd_device *obd); void obdo_cpy_md(struct obdo *dst, struct obdo *src, obd_flag valid); diff --git a/lustre/ldlm/ldlm_lib.c b/lustre/ldlm/ldlm_lib.c index 0a6cb18..8400b01 100644 --- a/lustre/ldlm/ldlm_lib.c +++ b/lustre/ldlm/ldlm_lib.c @@ -98,7 +98,6 @@ static int import_set_conn(struct obd_import *imp, struct obd_uuid *uuid, } else { spin_unlock(&imp->imp_lock); GOTO(out_free, rc = -ENOENT); - } spin_unlock(&imp->imp_lock); @@ -539,7 +538,7 @@ int target_handle_reconnect(struct lustre_handle *conn, struct obd_export *exp, RETURN(0); } -int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler) +int target_handle_connect(struct ptlrpc_request *req) { struct obd_device *target, *targref = NULL; struct obd_export *export = NULL; @@ -550,7 +549,8 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler) struct obd_uuid remote_uuid; struct list_head *p; char *str, *tmp; - int rc = 0, abort_recovery; + int rc = 0; + int initial_conn = 0; struct obd_connect_data *data; int size[2] = { sizeof(struct ptlrpc_body), sizeof(*data) }; ENTRY; @@ -566,17 +566,9 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler) obd_str2uuid (&tgtuuid, str); target = class_uuid2obd(&tgtuuid); - /* COMPAT_146 */ - /* old (pre 1.6) lustre_process_log tries to connect to mdsname - (eg. mdsA) instead of uuid. */ - if (!target) { - snprintf((char *)tgtuuid.uuid, sizeof(tgtuuid), "%s_UUID", str); - target = class_uuid2obd(&tgtuuid); - } if (!target) target = class_name2obd(str); - /* end COMPAT_146 */ - + if (!target || target->obd_stopping || !target->obd_set_up) { DEBUG_REQ(D_ERROR, req, "UUID '%s' is not available " " for connect (%s)", str, @@ -615,12 +607,6 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler) LBUG(); } - spin_lock_bh(&target->obd_processing_task_lock); - abort_recovery = target->obd_abort_recovery; - spin_unlock_bh(&target->obd_processing_task_lock); - if (abort_recovery) - target_abort_recovery(target); - tmp = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2, sizeof conn); if (tmp == NULL) GOTO(out, rc = -EPROTO); @@ -629,6 +615,10 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler) data = lustre_swab_reqbuf(req, REQ_REC_OFF + 3, sizeof(*data), lustre_swab_connect); + + if (!data) + GOTO(out, rc = -EPROTO); + rc = lustre_pack_reply(req, 2, size, NULL); if (rc) GOTO(out, rc); @@ -658,6 +648,9 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler) } } + if (lustre_msg_get_op_flags(req->rq_reqmsg) & MSG_CONNECT_INITIAL) + initial_conn = 1; + /* lctl gets a backstage, all-access pass. */ if (obd_uuid_equals(&cluuid, &target->obd_uuid)) goto dont_check_exports; @@ -682,10 +675,25 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler) } export = NULL; } + /* If we found an export, we already unlocked. */ if (!export) { spin_unlock(&target->obd_dev_lock); OBD_FAIL_TIMEOUT(OBD_FAIL_TGT_DELAY_CONNECT, 2 * obd_timeout); + } else if (req->rq_export == NULL && + atomic_read(&export->exp_rpc_count) > 0) { + CWARN("%s: refuse connection from %s/%s to 0x%p/%d\n", + target->obd_name, cluuid.uuid, + libcfs_nid2str(req->rq_peer.nid), + export, atomic_read(&export->exp_refcount)); + GOTO(out, rc = -EBUSY); + } else if (req->rq_export != NULL && + atomic_read(&export->exp_rpc_count) > 1) { + CWARN("%s: refuse reconnection from %s@%s to 0x%p/%d\n", + target->obd_name, cluuid.uuid, + libcfs_nid2str(req->rq_peer.nid), + export, atomic_read(&export->exp_rpc_count)); + GOTO(out, rc = -EBUSY); } else if (lustre_msg_get_conn_cnt(req->rq_reqmsg) == 1) { CERROR("%s: NID %s (%s) reconnected with 1 conn_cnt; " "cookies not random?\n", target->obd_name, @@ -697,18 +705,22 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler) /* We want to handle EALREADY but *not* -EALREADY from * target_handle_reconnect(), return reconnection state in a flag */ + //XXX: check this if (rc == EALREADY) { lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_RECONNECT); rc = 0; } else if (rc) { GOTO(out, rc); } - /* Tell the client if we're in recovery. */ /* If this is the first client, start the recovery timer */ + CWARN("%s: connection from %s@%s %st"LPU64"\n", target->obd_name, + cluuid.uuid, libcfs_nid2str(req->rq_peer.nid), + target->obd_recovering ? "recovering/" : "", data->ocd_transno); + if (target->obd_recovering) { lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_RECOVERING); - target_start_recovery_timer(target, handler); + target_start_recovery_timer(target/*, handler*/); } /* Tell the client if we support replayable requests */ @@ -733,17 +745,16 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler) } else { rc = obd_reconnect(export, target, &cluuid, data); } - if (rc) GOTO(out, rc); - /* Return only the parts of obd_connect_data that we understand, so the * client knows that we don't understand the rest. */ - if (data) + if (data) { + //data->ocd_connect_flags &= OBD_CONNECT_SUPPORTED; memcpy(lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, - sizeof(*data)), - data, sizeof(*data)); - + sizeof(*data)), data, sizeof(*data)); + } + /* If all else goes well, this is our RPC return code. */ req->rq_status = 0; @@ -771,7 +782,9 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler) req->rq_export = export; spin_lock(&export->exp_lock); - if (export->exp_conn_cnt >= lustre_msg_get_conn_cnt(req->rq_reqmsg)) { + if (initial_conn) { + lustre_msg_set_conn_cnt(req->rq_repmsg, export->exp_conn_cnt + 1); + } else if (export->exp_conn_cnt >= lustre_msg_get_conn_cnt(req->rq_reqmsg)) { CERROR("%s: %s already connected at higher conn_cnt: %d > %d\n", cluuid.uuid, libcfs_nid2str(req->rq_peer.nid), export->exp_conn_cnt, @@ -804,16 +817,33 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler) GOTO(out, rc = 0); } - if (target->obd_recovering) + spin_lock_bh(&target->obd_processing_task_lock); + if (target->obd_recovering && export->exp_connected == 0) { + export->exp_connected = 1; + if ((lustre_msg_get_op_flags(req->rq_reqmsg) & MSG_CONNECT_TRANSNO) + && data->ocd_transno < target->obd_next_recovery_transno) + target->obd_next_recovery_transno = data->ocd_transno; target->obd_connected_clients++; - + if (target->obd_connected_clients == target->obd_max_recoverable_clients) + wake_up(&target->obd_next_transno_waitq); + } + spin_unlock_bh(&target->obd_processing_task_lock); memcpy(&conn, lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2, sizeof conn), sizeof conn); if (export->exp_imp_reverse != NULL) destroy_import(export->exp_imp_reverse); + + /* for the rest part, we return -ENOTCONN in case of errors + * in order to let client initialize connection again. + */ revimp = export->exp_imp_reverse = class_new_import(target); + if (!revimp) { + CERROR("fail to alloc new reverse import.\n"); + GOTO(out, rc = -ENOTCONN); + } + revimp->imp_connection = ptlrpc_connection_addref(export->exp_connection); revimp->imp_client = &export->exp_obd->obd_ldlm_client; revimp->imp_remote_handle = conn; @@ -875,44 +905,59 @@ void target_destroy_export(struct obd_export *exp) * Recovery functions */ - -static -struct ptlrpc_request *target_save_req(struct ptlrpc_request *src) +struct ptlrpc_request * +ptlrpc_clone_req( struct ptlrpc_request *orig_req) { - struct ptlrpc_request *req; - struct lustre_msg *reqmsg; + struct ptlrpc_request *copy_req; + struct lustre_msg *copy_reqmsg; - OBD_ALLOC_PTR(req); - if (!req) + OBD_ALLOC_PTR(copy_req); + if (!copy_req) return NULL; - - OBD_ALLOC(reqmsg, src->rq_reqlen); - if (!reqmsg) { - OBD_FREE_PTR(req); + OBD_ALLOC(copy_reqmsg, orig_req->rq_reqlen); + if (!copy_reqmsg){ + OBD_FREE_PTR(copy_req); return NULL; } - *req = *src; - memcpy(reqmsg, src->rq_reqmsg, src->rq_reqlen); - req->rq_reqmsg = reqmsg; - - class_export_get(req->rq_export); - CFS_INIT_LIST_HEAD(&req->rq_list); - sptlrpc_svc_ctx_addref(req); - if (req->rq_reply_state) - ptlrpc_rs_addref(req->rq_reply_state); + *copy_req = *orig_req; + memcpy(copy_reqmsg, orig_req->rq_reqmsg, orig_req->rq_reqlen); + orig_req->rq_svc_ctx = NULL; + orig_req->rq_reply_state = NULL; + + copy_req->rq_reqmsg = copy_reqmsg; + class_export_get(copy_req->rq_export); + CFS_INIT_LIST_HEAD(©_req->rq_list); +#if 0 + sptlrpc_svc_ctx_addref(copy_req); + if (copy_req->rq_reply_state) + ptlrpc_rs_addref(copy_req->rq_reply_state); + /* the copied req takes over the reply state and security data */ + if (orig_req->rq_reply_state) { + ptlrpc_rs_decref(orig_req->rq_reply_state); + orig_req->rq_reply_state = NULL; + } + sptlrpc_svc_ctx_decref(orig_req); +#endif + return copy_req; +} - /* repmsg have been taken over, in privacy mode this might point to - * invalid data. prevent further access on it. - */ - src->rq_repmsg = NULL; - src->rq_replen = 0; +void ptlrpc_free_clone( struct ptlrpc_request *req) +{ + if (req->rq_reply_state) { + ptlrpc_rs_decref(req->rq_reply_state); + req->rq_reply_state = NULL; + } - return req; + sptlrpc_svc_ctx_decref(req); + class_export_put(req->rq_export); + list_del(&req->rq_list); + OBD_FREE(req->rq_reqmsg, req->rq_reqlen); + OBD_FREE_PTR(req); } -static -void target_release_saved_req(struct ptlrpc_request *req) + +static void target_release_req(struct ptlrpc_request *req) { if (req->rq_reply_state) { ptlrpc_rs_decref(req->rq_reply_state); @@ -927,8 +972,6 @@ void target_release_saved_req(struct ptlrpc_request *req) static void target_finish_recovery(struct obd_device *obd) { - struct list_head *tmp, *n; - CWARN("%s: sending delayed replies to recovered clients\n", obd->obd_name); @@ -941,37 +984,52 @@ static void target_finish_recovery(struct obd_device *obd) rc < 0 ? "failed" : "complete", rc); } - list_for_each_safe(tmp, n, &obd->obd_delayed_reply_queue) { - struct ptlrpc_request *req; + obd->obd_recovery_end = CURRENT_SECONDS; +} + +static void abort_req_replay_queue(struct obd_device *obd) +{ + struct ptlrpc_request *req; + struct list_head *tmp, *n; + int rc; + + list_for_each_safe(tmp, n, &obd->obd_req_replay_queue) { req = list_entry(tmp, struct ptlrpc_request, rq_list); list_del(&req->rq_list); - DEBUG_REQ(D_WARNING, req, "delayed:"); - ptlrpc_reply(req); - target_release_saved_req(req); + DEBUG_REQ(D_ERROR, req, "aborted:"); + req->rq_status = -ENOTCONN; + req->rq_type = PTL_RPC_MSG_ERR; + rc = lustre_pack_reply(req, 0, NULL, NULL); + if (rc == 0) { + ptlrpc_reply(req); + } else { + DEBUG_REQ(D_ERROR, req, + "packing failed for abort-reply; skipping"); + } + target_release_req(req); } - obd->obd_recovery_end = CURRENT_SECONDS; } -static void abort_recovery_queue(struct obd_device *obd) +static void abort_lock_replay_queue(struct obd_device *obd) { struct ptlrpc_request *req; struct list_head *tmp, *n; int rc; - list_for_each_safe(tmp, n, &obd->obd_recovery_queue) { + list_for_each_safe(tmp, n, &obd->obd_lock_replay_queue) { req = list_entry(tmp, struct ptlrpc_request, rq_list); list_del(&req->rq_list); DEBUG_REQ(D_ERROR, req, "aborted:"); req->rq_status = -ENOTCONN; req->rq_type = PTL_RPC_MSG_ERR; - rc = lustre_pack_reply(req, 1, NULL, NULL); + rc = lustre_pack_reply(req, 0, NULL, NULL); if (rc == 0) { ptlrpc_reply(req); } else { DEBUG_REQ(D_ERROR, req, "packing failed for abort-reply; skipping"); } - target_release_saved_req(req); + target_release_req(req); } } @@ -1002,43 +1060,25 @@ void target_cleanup_recovery(struct obd_device *obd) target_cancel_recovery_timer(obd); spin_unlock_bh(&obd->obd_processing_task_lock); - list_for_each_safe(tmp, n, &obd->obd_delayed_reply_queue) { + list_for_each_safe(tmp, n, &obd->obd_req_replay_queue) { req = list_entry(tmp, struct ptlrpc_request, rq_list); list_del(&req->rq_list); - target_release_saved_req(req); + LASSERT (req->rq_reply_state == 0); + target_release_req(req); } - - list_for_each_safe(tmp, n, &obd->obd_recovery_queue) { + list_for_each_safe(tmp, n, &obd->obd_lock_replay_queue) { req = list_entry(tmp, struct ptlrpc_request, rq_list); list_del(&req->rq_list); - target_release_saved_req(req); + LASSERT (req->rq_reply_state == 0); + target_release_req(req); } - EXIT; -} - -void target_abort_recovery(void *data) -{ - struct obd_device *obd = data; - - ENTRY; - spin_lock_bh(&obd->obd_processing_task_lock); - if (!obd->obd_recovering) { - spin_unlock_bh(&obd->obd_processing_task_lock); - EXIT; - return; + list_for_each_safe(tmp, n, &obd->obd_final_req_queue) { + req = list_entry(tmp, struct ptlrpc_request, rq_list); + list_del(&req->rq_list); + LASSERT (req->rq_reply_state == 0); + target_release_req(req); } - obd->obd_recovering = obd->obd_abort_recovery = 0; - obd->obd_recoverable_clients = 0; - target_cancel_recovery_timer(obd); - spin_unlock_bh(&obd->obd_processing_task_lock); - - LCONSOLE_WARN("%s: recovery period over; disconnecting unfinished " - "clients.\n", obd->obd_name); - class_disconnect_stale_exports(obd); - abort_recovery_queue(obd); - - target_finish_recovery(obd); - CDEBUG(D_HA, "%s: recovery complete\n", obd_uuid2str(&obd->obd_uuid)); + EXIT; } @@ -1079,116 +1119,431 @@ static void reset_recovery_timer(struct obd_device *obd) /* Only start it the first time called */ -void target_start_recovery_timer(struct obd_device *obd, svc_handler_t handler) +void target_start_recovery_timer(struct obd_device *obd) { spin_lock_bh(&obd->obd_processing_task_lock); - if (obd->obd_recovery_handler) { + if (obd->obd_recovery_handler + || timer_pending(&obd->obd_recovery_timer)) { spin_unlock_bh(&obd->obd_processing_task_lock); return; } CWARN("%s: starting recovery timer (%us)\n", obd->obd_name, OBD_RECOVERY_TIMEOUT); - obd->obd_recovery_handler = handler; cfs_timer_init(&obd->obd_recovery_timer, target_recovery_expired, obd); spin_unlock_bh(&obd->obd_processing_task_lock); reset_recovery_timer(obd); } +#ifdef __KERNEL__ static int check_for_next_transno(struct obd_device *obd) { - struct ptlrpc_request *req; + struct ptlrpc_request *req = NULL; int wake_up = 0, connected, completed, queue_len, max; __u64 next_transno, req_transno; - + ENTRY; spin_lock_bh(&obd->obd_processing_task_lock); - req = list_entry(obd->obd_recovery_queue.next, - struct ptlrpc_request, rq_list); + + if (!list_empty(&obd->obd_req_replay_queue)) { + req = list_entry(obd->obd_req_replay_queue.next, + struct ptlrpc_request, rq_list); + req_transno = lustre_msg_get_transno(req->rq_reqmsg); + } else { + req_transno = 0; + } + max = obd->obd_max_recoverable_clients; - req_transno = lustre_msg_get_transno(req->rq_reqmsg); connected = obd->obd_connected_clients; completed = max - obd->obd_recoverable_clients; queue_len = obd->obd_requests_queued_for_recovery; next_transno = obd->obd_next_recovery_transno; - CDEBUG(D_HA,"max: %d, connected: %d, completed: %d, queue_len: %d, " + CWARN("max: %d, connected: %d, completed: %d, queue_len: %d, " "req_transno: "LPU64", next_transno: "LPU64"\n", max, connected, completed, queue_len, req_transno, next_transno); + if (obd->obd_abort_recovery) { CDEBUG(D_HA, "waking for aborted recovery\n"); wake_up = 1; - } else if (!obd->obd_recovering) { - CDEBUG(D_HA, "waking for completed recovery (?)\n"); + } else if (atomic_read(&obd->obd_req_replay_clients) == 0) { + CDEBUG(D_HA, "waking for completed recovery\n"); wake_up = 1; } else if (req_transno == next_transno) { CDEBUG(D_HA, "waking for next ("LPD64")\n", next_transno); wake_up = 1; } else if (queue_len + completed == max) { - CDEBUG(D_ERROR, + LASSERT(lustre_msg_get_transno(req->rq_reqmsg) >= next_transno); + CDEBUG(req_transno > obd->obd_last_committed ? D_ERROR : D_HA, "waking for skipped transno (skip: "LPD64 ", ql: %d, comp: %d, conn: %d, next: "LPD64")\n", next_transno, queue_len, completed, max, req_transno); obd->obd_next_recovery_transno = req_transno; wake_up = 1; + } else if (queue_len == atomic_read(&obd->obd_req_replay_clients)) { + /* some clients haven't connected in time, but we can try + * to replay requests that demand on already committed ones + * also, we can replay first non-committed transation */ + LASSERT(req_transno != 0); + if (req_transno == obd->obd_last_committed + 1) { + obd->obd_next_recovery_transno = req_transno; + } else if (req_transno > obd->obd_last_committed) { + /* can't continue recovery: have no needed transno */ + obd->obd_abort_recovery = 1; + CDEBUG(D_ERROR, "abort due to missed clients. max: %d, " + "connected: %d, completed: %d, queue_len: %d, " + "req_transno: "LPU64", next_transno: "LPU64"\n", + max, connected, completed, queue_len, + req_transno, next_transno); + } + wake_up = 1; } + spin_unlock_bh(&obd->obd_processing_task_lock); - LASSERT(lustre_msg_get_transno(req->rq_reqmsg) >= next_transno); return wake_up; } -static void process_recovery_queue(struct obd_device *obd) +static struct ptlrpc_request *target_next_replay_req(struct obd_device *obd) { + struct l_wait_info lwi = { 0 }; struct ptlrpc_request *req; - int abort_recovery = 0; + + CDEBUG(D_HA, "Waiting for transno "LPD64"\n", + obd->obd_next_recovery_transno); + l_wait_event(obd->obd_next_transno_waitq, + check_for_next_transno(obd), &lwi); + + spin_lock_bh(&obd->obd_processing_task_lock); + if (obd->obd_abort_recovery) { + req = NULL; + } else if (!list_empty(&obd->obd_req_replay_queue)) { + req = list_entry(obd->obd_req_replay_queue.next, + struct ptlrpc_request, rq_list); + list_del_init(&req->rq_list); + obd->obd_requests_queued_for_recovery--; + } else { + req = NULL; + } + spin_unlock_bh(&obd->obd_processing_task_lock); + RETURN(req); +} + +static int check_for_next_lock(struct obd_device *obd) +{ + struct ptlrpc_request *req = NULL; + int wake_up = 0; + + spin_lock_bh(&obd->obd_processing_task_lock); + if (!list_empty(&obd->obd_lock_replay_queue)) { + req = list_entry(obd->obd_lock_replay_queue.next, + struct ptlrpc_request, rq_list); + CDEBUG(D_HA, "waking for next lock\n"); + wake_up = 1; + } else if (atomic_read(&obd->obd_lock_replay_clients) == 0) { + CDEBUG(D_HA, "waking for completed lock replay\n"); + wake_up = 1; + } else if (obd->obd_abort_recovery) { + CDEBUG(D_HA, "waking for aborted recovery\n"); + wake_up = 1; + } + spin_unlock_bh(&obd->obd_processing_task_lock); + + return wake_up; +} + +static struct ptlrpc_request *target_next_replay_lock(struct obd_device *obd) +{ struct l_wait_info lwi = { 0 }; - ENTRY; + struct ptlrpc_request *req; - for (;;) { - spin_lock_bh(&obd->obd_processing_task_lock); - LASSERT(obd->obd_processing_task == cfs_curproc_pid()); - req = list_entry(obd->obd_recovery_queue.next, + CDEBUG(D_HA, "Waiting for lock\n"); + l_wait_event(obd->obd_next_transno_waitq, + check_for_next_lock(obd), &lwi); + + spin_lock_bh(&obd->obd_processing_task_lock); + if (obd->obd_abort_recovery) { + req = NULL; + } else if (!list_empty(&obd->obd_lock_replay_queue)) { + req = list_entry(obd->obd_lock_replay_queue.next, struct ptlrpc_request, rq_list); + list_del_init(&req->rq_list); + } else { + req = NULL; + } + spin_unlock_bh(&obd->obd_processing_task_lock); + return req; +} - if (lustre_msg_get_transno(req->rq_reqmsg) != - obd->obd_next_recovery_transno) { - spin_unlock_bh(&obd->obd_processing_task_lock); - CDEBUG(D_HA, "Waiting for transno "LPD64" (1st is " - LPD64")\n", - obd->obd_next_recovery_transno, - lustre_msg_get_transno(req->rq_reqmsg)); - l_wait_event(obd->obd_next_transno_waitq, - check_for_next_transno(obd), &lwi); - spin_lock_bh(&obd->obd_processing_task_lock); - abort_recovery = obd->obd_abort_recovery; - spin_unlock_bh(&obd->obd_processing_task_lock); - if (abort_recovery) { - target_abort_recovery(obd); - return; - } - continue; - } +static struct ptlrpc_request *target_next_final_ping(struct obd_device *obd) +{ + struct ptlrpc_request *req; + + spin_lock_bh(&obd->obd_processing_task_lock); + if (!list_empty(&obd->obd_final_req_queue)) { + req = list_entry(obd->obd_final_req_queue.next, + struct ptlrpc_request, rq_list); list_del_init(&req->rq_list); - obd->obd_requests_queued_for_recovery--; - spin_unlock_bh(&obd->obd_processing_task_lock); + } else { + req = NULL; + } + spin_unlock_bh(&obd->obd_processing_task_lock); + return req; +} + +static inline int req_replay_done(struct obd_export *exp) +{ + return (exp->exp_req_replay_needed == 0); +} + +static inline int lock_replay_done(struct obd_export *exp) +{ + return (exp->exp_lock_replay_needed == 0); +} + +static inline int connect_done(struct obd_export *exp) +{ + return (exp->exp_connected != 0); +} + +static int check_for_clients(struct obd_device *obd) +{ + if (obd->obd_abort_recovery) + return 1; + LASSERT(obd->obd_connected_clients <= obd->obd_max_recoverable_clients); + if (obd->obd_connected_clients == obd->obd_max_recoverable_clients) + return 1; + return 0; +} + +static int target_recovery_thread(void *arg) +{ + struct obd_device *obd = arg; + struct ptlrpc_request *req; + struct target_recovery_data *trd = &obd->obd_recovery_data; + struct l_wait_info lwi = { 0 }; + unsigned long delta; + unsigned long flags; + struct lu_env env; + struct ptlrpc_thread fake_svc_thread, *thread = &fake_svc_thread; + __u32 recov_ctx_tags = LCT_MD_THREAD; + int rc = 0; + ENTRY; + + cfs_daemonize("tgt_recov"); + + SIGNAL_MASK_LOCK(current, flags); + sigfillset(¤t->blocked); + RECALC_SIGPENDING; + SIGNAL_MASK_UNLOCK(current, flags); + + rc = lu_context_init(&env.le_ctx, recov_ctx_tags); + if (rc) + return rc; + + thread->t_env = &env; + env.le_ctx.lc_thread = thread; + + CERROR("%s: started recovery thread pid %d\n", obd->obd_name, + current->pid); + trd->trd_processing_task = current->pid; + + obd->obd_recovering = 1; + complete(&trd->trd_starting); + + /* first of all, we have to know the first transno to replay */ + obd->obd_abort_recovery = 0; + l_wait_event(obd->obd_next_transno_waitq, + check_for_clients(obd), &lwi); + + spin_lock_bh(&obd->obd_processing_task_lock); + target_cancel_recovery_timer(obd); + spin_unlock_bh(&obd->obd_processing_task_lock); + + /* If some clients haven't connected in time, evict them */ + if (obd->obd_abort_recovery) { + int stale; + CDEBUG(D_ERROR, "few clients haven't connect in time (%d/%d)," + "evict them ...\n", obd->obd_connected_clients, + obd->obd_max_recoverable_clients); + obd->obd_abort_recovery = 0; + stale = class_disconnect_stale_exports(obd, connect_done); + atomic_sub(stale, &obd->obd_req_replay_clients); + atomic_sub(stale, &obd->obd_lock_replay_clients); + } + /* next stage: replay requests */ + delta = jiffies; + obd->obd_req_replaying = 1; + CDEBUG(D_ERROR, "1: request replay stage - %d clients from t"LPU64"\n", + atomic_read(&obd->obd_req_replay_clients), + obd->obd_next_recovery_transno); + while ((req = target_next_replay_req(obd))) { + LASSERT(trd->trd_processing_task == current->pid); + DEBUG_REQ(D_HA, req, "processing t"LPD64" from %s", + lustre_msg_get_transno(req->rq_reqmsg), + libcfs_nid2str(req->rq_peer.nid)); + + rc = lu_context_init(&req->rq_session, LCT_SESSION); + if (rc) { + CERROR("Failure to initialize session: %d\n", rc); + break; + } + req->rq_session.lc_thread = thread; + lu_context_enter(&req->rq_session); + req->rq_svc_thread = thread; + req->rq_svc_thread->t_env->le_ses = &req->rq_session; + + (void)trd->trd_recovery_handler(req); + + lu_context_exit(&req->rq_session); + lu_context_fini(&req->rq_session); - DEBUG_REQ(D_HA, req, "processing: "); - (void)obd->obd_recovery_handler(req); obd->obd_replayed_requests++; reset_recovery_timer(obd); - /* bug 1580: decide how to properly sync() in recovery */ - //mds_fsync_super(obd->u.obt.obt_sb); - target_release_saved_req(req); - + /* bug 1580: decide how to properly sync() in recovery*/ + //mds_fsync_super(mds->mds_sb); + ptlrpc_free_clone(req); spin_lock_bh(&obd->obd_processing_task_lock); obd->obd_next_recovery_transno++; - if (list_empty(&obd->obd_recovery_queue)) { - obd->obd_processing_task = 0; - spin_unlock_bh(&obd->obd_processing_task_lock); - break; + spin_unlock_bh(&obd->obd_processing_task_lock); + } + + spin_lock_bh(&obd->obd_processing_task_lock); + target_cancel_recovery_timer(obd); + spin_unlock_bh(&obd->obd_processing_task_lock); + /* If some clients haven't replayed requests in time, evict them */ + if (obd->obd_abort_recovery) { + int stale; + CDEBUG(D_ERROR, "req replay timed out, aborting ...\n"); + obd->obd_abort_recovery = 0; + stale = class_disconnect_stale_exports(obd, req_replay_done); + atomic_sub(stale, &obd->obd_lock_replay_clients); + abort_req_replay_queue(obd); + /* XXX for debuggin tests 11 and 17 */ + /* LBUG(); */ + } + /* The second stage: replay locks */ + CDEBUG(D_ERROR, "2: lock replay stage - %d clients\n", + atomic_read(&obd->obd_lock_replay_clients)); + while ((req = target_next_replay_lock(obd))) { + LASSERT(trd->trd_processing_task == current->pid); + DEBUG_REQ(D_HA, req, "processing lock from %s: ", + libcfs_nid2str(req->rq_peer.nid)); + (void)trd->trd_recovery_handler(req); + reset_recovery_timer(obd); + ptlrpc_free_clone(req); + obd->obd_replayed_locks++; + } + + spin_lock_bh(&obd->obd_processing_task_lock); + target_cancel_recovery_timer(obd); + spin_unlock_bh(&obd->obd_processing_task_lock); + /* If some clients haven't replayed requests in time, evict them */ + if (obd->obd_abort_recovery) { + int stale; + CERROR("lock replay timed out, aborting ...\n"); + obd->obd_abort_recovery = 0; + stale = class_disconnect_stale_exports(obd, lock_replay_done); + abort_lock_replay_queue(obd); + } + + /* We drop recoverying flag to forward all new requests + * to regular mds_handle() since now */ + spin_lock_bh(&obd->obd_processing_task_lock); + obd->obd_recovering = 0; + spin_unlock_bh(&obd->obd_processing_task_lock); + /* The third stage: reply on final pings */ + CDEBUG(D_ERROR, "3: final stage - process recovery completion pings\n"); + while ((req = target_next_final_ping(obd))) { + LASSERT(trd->trd_processing_task == current->pid); + DEBUG_REQ(D_HA, req, "processing final ping from %s: ", + libcfs_nid2str(req->rq_peer.nid)); + (void)trd->trd_recovery_handler(req); + ptlrpc_free_clone(req); + } + + delta = (jiffies - delta) / HZ; + CDEBUG(D_ERROR,"4: recovery completed in %lus - %d/%d reqs/locks\n", + delta, obd->obd_replayed_requests, obd->obd_replayed_locks); + if (delta > obd_timeout * 2) { + CWARN("too long recovery - read logs\n"); + libcfs_debug_dumplog(); + } + target_finish_recovery(obd); + + lu_env_fini(&env); + trd->trd_processing_task = 0; + complete(&trd->trd_finishing); + return rc; +} + +int target_start_recovery_thread(struct obd_device *obd, svc_handler_t handler) +{ + int rc = 0; + struct target_recovery_data *trd = &obd->obd_recovery_data; + + memset(trd, 0, sizeof(*trd)); + init_completion(&trd->trd_starting); + init_completion(&trd->trd_finishing); + trd->trd_recovery_handler = handler; + + if (kernel_thread(target_recovery_thread, obd, 0) > 0) { + wait_for_completion(&trd->trd_starting); + LASSERT(obd->obd_recovering != 0); + } else + rc = -ECHILD; + + return rc; +} + +void target_stop_recovery_thread(struct obd_device *obd) +{ + spin_lock_bh(&obd->obd_processing_task_lock); + if (obd->obd_recovery_data.trd_processing_task > 0) { + struct target_recovery_data *trd = &obd->obd_recovery_data; + CERROR("%s: aborting recovery\n", obd->obd_name); + obd->obd_abort_recovery = 1; + wake_up(&obd->obd_next_transno_waitq); + spin_unlock_bh(&obd->obd_processing_task_lock); + wait_for_completion(&trd->trd_finishing); + } else { + spin_unlock_bh(&obd->obd_processing_task_lock); + } +} +#endif + +int target_process_req_flags(struct obd_device *obd, struct ptlrpc_request *req) +{ + struct obd_export *exp = req->rq_export; + LASSERT(exp != NULL); + if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REQ_REPLAY_DONE) { + /* client declares he's ready to replay locks */ + spin_lock_bh(&obd->obd_processing_task_lock); + if (exp->exp_req_replay_needed) { + LASSERT(atomic_read(&obd->obd_req_replay_clients) > 0); + exp->exp_req_replay_needed = 0; + atomic_dec(&obd->obd_req_replay_clients); + obd->obd_recoverable_clients--; + if (atomic_read(&obd->obd_req_replay_clients) == 0) + CDEBUG(D_HA, "all clients have replayed reqs\n"); + wake_up(&obd->obd_next_transno_waitq); } spin_unlock_bh(&obd->obd_processing_task_lock); } - EXIT; + if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LOCK_REPLAY_DONE) { + /* client declares he's ready to complete recovery + * so, we put the request on th final queue */ + spin_lock_bh(&obd->obd_processing_task_lock); + if (exp->exp_lock_replay_needed) { + LASSERT(atomic_read(&obd->obd_lock_replay_clients) > 0); + exp->exp_lock_replay_needed = 0; + atomic_dec(&obd->obd_lock_replay_clients); + if (atomic_read(&obd->obd_lock_replay_clients) == 0) + CDEBUG(D_HA, "all clients have replayed locks\n"); + wake_up(&obd->obd_next_transno_waitq); + } + spin_unlock_bh(&obd->obd_processing_task_lock); + } + + return 0; } int target_queue_recovery_request(struct ptlrpc_request *req, @@ -1197,7 +1552,40 @@ int target_queue_recovery_request(struct ptlrpc_request *req, struct list_head *tmp; int inserted = 0; __u64 transno = lustre_msg_get_transno(req->rq_reqmsg); - struct ptlrpc_request *saved_req; + + ENTRY; + + if (obd->obd_recovery_data.trd_processing_task == current->pid) { + /* Processing the queue right now, don't re-add. */ + return 1; + } + + target_process_req_flags(obd, req); + + if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LOCK_REPLAY_DONE) { + /* client declares he's ready to complete recovery + * so, we put the request on th final queue */ + req = ptlrpc_clone_req(req); + if (req == NULL) + return -ENOMEM; + DEBUG_REQ(D_HA, req, "queue final req"); + spin_lock_bh(&obd->obd_processing_task_lock); + list_add_tail(&req->rq_list, &obd->obd_final_req_queue); + spin_unlock_bh(&obd->obd_processing_task_lock); + return 0; + } + if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REQ_REPLAY_DONE) { + /* client declares he's ready to replay locks */ + req = ptlrpc_clone_req(req); + if (req == NULL) + return -ENOMEM; + DEBUG_REQ(D_HA, req, "queue lock replay req"); + spin_lock_bh(&obd->obd_processing_task_lock); + list_add_tail(&req->rq_list, &obd->obd_lock_replay_queue); + spin_unlock_bh(&obd->obd_processing_task_lock); + wake_up(&obd->obd_next_transno_waitq); + return 0; + } /* CAVEAT EMPTOR: The incoming request message has been swabbed * (i.e. buflens etc are in my own byte order), but type-dependent @@ -1209,48 +1597,43 @@ int target_queue_recovery_request(struct ptlrpc_request *req, return 1; } - /* XXX If I were a real man, these LBUGs would be sane cleanups. */ - saved_req = target_save_req(req); - if (!saved_req) - LBUG(); - spin_lock_bh(&obd->obd_processing_task_lock); - /* - * If we're processing the queue, we don't want to queue this message. + /* If we're processing the queue, we want don't want to queue this + * message. * * Also, if this request has a transno less than the one we're waiting * for, we should process it now. It could (and currently always will) * be an open request for a descriptor that was opened some time ago. * - * Also, a resent, replayed request that has already been handled will - * pass through here and be processed immediately. + * Also, a resent, replayed request that has already been + * handled will pass through here and be processed immediately. */ - if (obd->obd_processing_task == cfs_curproc_pid() || - transno < obd->obd_next_recovery_transno) { + CWARN("Next recovery transno: "LPX64", current: "LPX64", replaying: %i\n", + obd->obd_next_recovery_transno, transno, obd->obd_req_replaying); + if (transno <= obd->obd_next_recovery_transno && obd->obd_req_replaying) { /* Processing the queue right now, don't re-add. */ LASSERT(list_empty(&req->rq_list)); spin_unlock_bh(&obd->obd_processing_task_lock); - - target_release_saved_req(saved_req); return 1; } - + spin_unlock_bh(&obd->obd_processing_task_lock); + /* A resent, replayed request that is still on the queue; just drop it. The queued request will handle this. */ - if ((lustre_msg_get_flags(req->rq_reqmsg) & (MSG_RESENT|MSG_REPLAY)) == - (MSG_RESENT | MSG_REPLAY)) { + if ((lustre_msg_get_flags(req->rq_reqmsg) & (MSG_RESENT|MSG_REPLAY)) + == (MSG_RESENT | MSG_REPLAY)) { DEBUG_REQ(D_ERROR, req, "dropping resent queued req"); - spin_unlock_bh(&obd->obd_processing_task_lock); - - target_release_saved_req(saved_req); return 0; } - req = saved_req; + req = ptlrpc_clone_req(req); + if (req == NULL) + return -ENOMEM; + spin_lock_bh(&obd->obd_processing_task_lock); /* XXX O(n^2) */ - list_for_each(tmp, &obd->obd_recovery_queue) { + list_for_each(tmp, &obd->obd_req_replay_queue) { struct ptlrpc_request *reqiter = list_entry(tmp, struct ptlrpc_request, rq_list); @@ -1261,28 +1644,12 @@ int target_queue_recovery_request(struct ptlrpc_request *req, } } - if (!inserted) { - list_add_tail(&req->rq_list, &obd->obd_recovery_queue); - } + if (!inserted) + list_add_tail(&req->rq_list, &obd->obd_req_replay_queue); obd->obd_requests_queued_for_recovery++; - - if (obd->obd_processing_task != 0) { - /* Someone else is processing this queue, we'll leave it to - * them. - */ - cfs_waitq_signal(&obd->obd_next_transno_waitq); - spin_unlock_bh(&obd->obd_processing_task_lock); - return 0; - } - - /* Nobody is processing, and we know there's (at least) one to process - * now, so we'll do the honours. - */ - obd->obd_processing_task = cfs_curproc_pid(); + wake_up(&obd->obd_next_transno_waitq); spin_unlock_bh(&obd->obd_processing_task_lock); - - process_recovery_queue(obd); return 0; } @@ -1291,84 +1658,15 @@ struct obd_device * target_req2obd(struct ptlrpc_request *req) return req->rq_export->exp_obd; } -int target_queue_final_reply(struct ptlrpc_request *req, int rc) -{ - struct obd_device *obd = target_req2obd(req); - struct ptlrpc_request *saved_req; - int recovery_done = 0; - - LASSERT ((rc == 0) == (req->rq_reply_state != NULL)); - - if (rc) { - /* Just like ptlrpc_error, but without the sending. */ - rc = lustre_pack_reply(req, 1, NULL, NULL); - LASSERT(rc == 0); /* XXX handle this */ - req->rq_type = PTL_RPC_MSG_ERR; - } - - LASSERT (!req->rq_reply_state->rs_difficult); - LASSERT(list_empty(&req->rq_list)); - - saved_req = target_save_req(req); - if (!saved_req) - LBUG(); - - /* Don't race cleanup */ - spin_lock_bh(&obd->obd_processing_task_lock); - if (obd->obd_stopping) { - spin_unlock_bh(&obd->obd_processing_task_lock); - target_release_saved_req(saved_req); - req->rq_status = -ENOTCONN; - /* rv is ignored anyhow */ - return -ENOTCONN; - } - - req = saved_req; - list_add(&req->rq_list, &obd->obd_delayed_reply_queue); - - /* only count the first "replay over" request from each - export */ - if (req->rq_export->exp_replay_needed) { - --obd->obd_recoverable_clients; - req->rq_export->exp_replay_needed = 0; - } - recovery_done = (obd->obd_recoverable_clients == 0); - spin_unlock_bh(&obd->obd_processing_task_lock); - - OBD_RACE(OBD_FAIL_LDLM_RECOV_CLIENTS); - if (recovery_done) { - spin_lock_bh(&obd->obd_processing_task_lock); - obd->obd_recovering = obd->obd_abort_recovery = 0; - target_cancel_recovery_timer(obd); - spin_unlock_bh(&obd->obd_processing_task_lock); - - target_finish_recovery(obd); - CDEBUG(D_HA, "%s: recovery complete\n", - obd_uuid2str(&obd->obd_uuid)); - } else { - CWARN("%s: %d recoverable clients remain\n", - obd->obd_name, obd->obd_recoverable_clients); - cfs_waitq_signal(&obd->obd_next_transno_waitq); - } - - return 1; -} - -int -target_send_reply_msg(struct ptlrpc_request *req, int rc, int fail_id) +int target_send_reply_msg(struct ptlrpc_request *req, int rc, int fail_id) { if (OBD_FAIL_CHECK(fail_id | OBD_FAIL_ONCE)) { obd_fail_loc |= OBD_FAIL_ONCE | OBD_FAILED; - DEBUG_REQ(D_ERROR, req, "dropping reply"); + DEBUG_REQ(D_ERROR, req, "dropping reply"); return (-ECOMM); } - if (rc || req->rq_reply_state == NULL) { - if (rc == 0) { - DEBUG_REQ(D_ERROR, req, "no reply message packed"); - rc = -ENOMEM; - } else - DEBUG_REQ(D_ERROR, req, "processing error (%d)", rc); + if (rc) { req->rq_status = rc; return (ptlrpc_error(req)); } @@ -1377,8 +1675,7 @@ target_send_reply_msg(struct ptlrpc_request *req, int rc, int fail_id) return (ptlrpc_send_reply(req, 1)); } -void -target_send_reply(struct ptlrpc_request *req, int rc, int fail_id) +void target_send_reply(struct ptlrpc_request *req, int rc, int fail_id) { int netrc; struct ptlrpc_reply_state *rs; @@ -1486,8 +1783,8 @@ void target_committed_to_req(struct ptlrpc_request *req) else DEBUG_REQ(D_IOCTL, req, "not sending last_committed update"); - CDEBUG(D_INFO, "last_committed "LPU64", xid "LPU64"\n", - obd->obd_last_committed, req->rq_xid); + CDEBUG(D_INFO, "last_committed "LPU64", transno "LPU64", xid "LPU64"\n", + obd->obd_last_committed, req->rq_transno, req->rq_xid); } EXPORT_SYMBOL(target_committed_to_req); diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index 88821ec..0f63667 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -1928,16 +1928,16 @@ EXPORT_SYMBOL(client_obd_setup); EXPORT_SYMBOL(client_obd_cleanup); EXPORT_SYMBOL(client_connect_import); EXPORT_SYMBOL(client_disconnect_export); -EXPORT_SYMBOL(target_abort_recovery); -EXPORT_SYMBOL(target_cleanup_recovery); +EXPORT_SYMBOL(target_start_recovery_thread); +EXPORT_SYMBOL(target_stop_recovery_thread); EXPORT_SYMBOL(target_handle_connect); +EXPORT_SYMBOL(target_cleanup_recovery); EXPORT_SYMBOL(target_destroy_export); EXPORT_SYMBOL(target_cancel_recovery_timer); EXPORT_SYMBOL(target_send_reply); EXPORT_SYMBOL(target_queue_recovery_request); EXPORT_SYMBOL(target_handle_ping); EXPORT_SYMBOL(target_handle_disconnect); -EXPORT_SYMBOL(target_queue_final_reply); /* l_lock.c */ EXPORT_SYMBOL(lock_res_and_lock); diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index 25a319c..a934e5d 100644 --- a/lustre/ldlm/ldlm_request.c +++ b/lustre/ldlm/ldlm_request.c @@ -1282,6 +1282,11 @@ static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock) size[DLM_REPLY_REC_OFF] = lock->l_lvb_len; } ptlrpc_req_set_repsize(req, buffers, size); + /* notify the server we've replayed all requests. + * also, we mark the request to be put on a dedicated + * queue to be processed after all request replayes. + * bug 6063 */ + lustre_msg_set_flags(req->rq_reqmsg, MSG_REQ_REPLAY_DONE); LDLM_DEBUG(lock, "replaying lock:"); diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index 1be79c3..fba849d 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -319,19 +319,12 @@ static int mds_connect(const struct lu_env *env, struct obd_export *exp; struct mds_export_data *med; struct mds_client_data *mcd = NULL; - int rc, abort_recovery; + int rc; ENTRY; if (!conn || !obd || !cluuid) RETURN(-EINVAL); - /* Check for aborted recovery. */ - spin_lock_bh(&obd->obd_processing_task_lock); - abort_recovery = obd->obd_abort_recovery; - spin_unlock_bh(&obd->obd_processing_task_lock); - if (abort_recovery) - target_abort_recovery(obd); - /* XXX There is a small race between checking the list and adding a * new connection for the same UUID, but the real threat (list * corruption when multiple different clients connect) is solved. @@ -1444,7 +1437,7 @@ int mds_handle(struct ptlrpc_request *req) /* XXX identical to OST */ if (lustre_msg_get_opc(req->rq_reqmsg) != MDS_CONNECT) { struct mds_export_data *med; - int recovering, abort_recovery; + int recovering; if (req->rq_export == NULL) { CERROR("operation %d on unconnected MDS from %s\n", @@ -1479,16 +1472,18 @@ int mds_handle(struct ptlrpc_request *req) /* Check for aborted recovery. */ spin_lock_bh(&obd->obd_processing_task_lock); - abort_recovery = obd->obd_abort_recovery; recovering = obd->obd_recovering; spin_unlock_bh(&obd->obd_processing_task_lock); - if (abort_recovery) { - target_abort_recovery(obd); - } else if (recovering) { + if (recovering) { rc = mds_filter_recovery_request(req, obd, &should_process); if (rc || !should_process) RETURN(rc); + else if (should_process < 0) { + req->rq_status = should_process; + rc = ptlrpc_error(req); + RETURN(rc); + } } } @@ -1496,7 +1491,7 @@ int mds_handle(struct ptlrpc_request *req) case MDS_CONNECT: DEBUG_REQ(D_INODE, req, "connect"); OBD_FAIL_RETURN(OBD_FAIL_MDS_CONNECT_NET, 0); - rc = target_handle_connect(req, mds_handle); + rc = target_handle_connect(req); if (!rc) { /* Now that we have an export, set mds. */ /* @@ -1747,15 +1742,6 @@ int mds_handle(struct ptlrpc_request *req) EXIT; out: - if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) { - if (obd && obd->obd_recovering) { - DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply"); - return target_queue_final_reply(req, rc); - } - /* Lost a race with recovery; let the error path DTRT. */ - rc = req->rq_status = -ENOTCONN; - } - target_send_reply(req, rc, fail); return 0; } diff --git a/lustre/mds/mds_fs.c b/lustre/mds/mds_fs.c index b3ce673..198e0cb 100644 --- a/lustre/mds/mds_fs.c +++ b/lustre/mds/mds_fs.c @@ -365,7 +365,7 @@ static int mds_init_server_data(struct obd_device *obd, struct file *file) mcd = NULL; - exp->exp_replay_needed = 1; + exp->exp_req_replay_needed = 1; exp->exp_connecting = 0; obd->obd_recoverable_clients++; obd->obd_max_recoverable_clients++; @@ -384,6 +384,8 @@ static int mds_init_server_data(struct obd_device *obd, struct file *file) obd->obd_last_committed = mds->mds_last_transno; if (obd->obd_recoverable_clients) { + /* shouldn't happen in b_new_cmd */ + LBUG(); CWARN("RECOVERY: service %s, %d recoverable clients, " "last_transno "LPU64"\n", obd->obd_name, obd->obd_recoverable_clients, mds->mds_last_transno); diff --git a/lustre/mds/mds_lov.c b/lustre/mds/mds_lov.c index 892efa1..028d290 100644 --- a/lustre/mds/mds_lov.c +++ b/lustre/mds/mds_lov.c @@ -614,7 +614,7 @@ int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len, case OBD_IOC_ABORT_RECOVERY: CERROR("aborting recovery for device %s\n", obd->obd_name); - target_abort_recovery(obd); + target_stop_recovery_thread(obd); RETURN(0); default: diff --git a/lustre/mds/mds_reint.c b/lustre/mds/mds_reint.c index 82f3a9f..6d13fbe 100644 --- a/lustre/mds/mds_reint.c +++ b/lustre/mds/mds_reint.c @@ -359,7 +359,7 @@ void mds_steal_ack_locks(struct ptlrpc_request *req) } spin_unlock(&exp->exp_lock); } - +EXPORT_SYMBOL(mds_steal_ack_locks); void mds_req_from_mcd(struct ptlrpc_request *req, struct mds_client_data *mcd) { if (lustre_msg_get_opc(req->rq_reqmsg) == MDS_CLOSE) { diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index 54e9b51..db7f3d3 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -125,7 +125,6 @@ static struct mdt_opc_slice mdt_fld_handlers[]; static struct mdt_device *mdt_dev(struct lu_device *d); static int mdt_regular_handle(struct ptlrpc_request *req); -static int mdt_recovery_handle(struct ptlrpc_request *req); static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags); static struct lu_object_operations mdt_obj_ops; @@ -675,7 +674,7 @@ static int mdt_connect(struct mdt_thread_info *info) struct ptlrpc_request *req; req = mdt_info_req(info); - rc = target_handle_connect(req, mdt_recovery_handle); + rc = target_handle_connect(req); if (rc == 0) { LASSERT(req->rq_export != NULL); info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev); @@ -1643,7 +1642,7 @@ static int mdt_req_handle(struct mdt_thread_info *info, } /* If we're DISCONNECTing, the mdt_export_data is already freed */ - if (rc == 0 && h->mh_opc != MDS_DISCONNECT) + if (h->mh_opc != MDS_DISCONNECT) target_committed_to_req(req); RETURN(rc); @@ -1665,6 +1664,7 @@ static void mdt_thread_info_init(struct ptlrpc_request *req, { int i; + LASSERT(info->mti_env != req->rq_svc_thread->t_env); memset(info, 0, sizeof(*info)); info->mti_rep_buf_nr = ARRAY_SIZE(info->mti_rep_buf_size); @@ -1696,6 +1696,7 @@ static void mdt_thread_info_fini(struct mdt_thread_info *info) } for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++) mdt_lock_handle_fini(&info->mti_lh[i]); + info->mti_env = NULL; } /* mds/handler.c */ @@ -1711,7 +1712,6 @@ static int mdt_recovery(struct mdt_thread_info *info) { struct ptlrpc_request *req = mdt_info_req(info); int recovering; - int abort_recovery; struct obd_device *obd; ENTRY; @@ -1730,7 +1730,8 @@ static int mdt_recovery(struct mdt_thread_info *info) lustre_msg_get_opc(req->rq_reqmsg), libcfs_id2str(req->rq_peer)); req->rq_status = -ENOTCONN; - RETURN(-ENOTCONN); + target_send_reply(req, -ENOTCONN, info->mti_fail_id); + RETURN(0); } /* sanity check: if the xid matches, the request must be marked as a @@ -1741,6 +1742,7 @@ static int mdt_recovery(struct mdt_thread_info *info) (MSG_RESENT | MSG_REPLAY))) { CERROR("rq_xid "LPU64" matches last_xid, " "expected RESENT flag\n", req->rq_xid); + LBUG(); req->rq_status = -ENOTCONN; RETURN(-ENOTCONN); } @@ -1756,18 +1758,20 @@ static int mdt_recovery(struct mdt_thread_info *info) /* Check for aborted recovery... */ spin_lock_bh(&obd->obd_processing_task_lock); - abort_recovery = obd->obd_abort_recovery; recovering = obd->obd_recovering; spin_unlock_bh(&obd->obd_processing_task_lock); - if (abort_recovery) { - target_abort_recovery(obd); - } else if (recovering) { + if (recovering) { int rc; int should_process; - + DEBUG_REQ(D_WARNING, req, "Got new replay"); rc = mds_filter_recovery_request(req, obd, &should_process); if (rc != 0 || !should_process) RETURN(rc); + else if (should_process < 0) { + req->rq_status = should_process; + rc = ptlrpc_error(req); + RETURN(rc); + } } RETURN(+1); } @@ -1775,24 +1779,7 @@ static int mdt_recovery(struct mdt_thread_info *info) static int mdt_reply(struct ptlrpc_request *req, int rc, struct mdt_thread_info *info) { - struct obd_device *obd; ENTRY; - - if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) { - if (lustre_msg_get_opc(req->rq_reqmsg) != OBD_PING) - DEBUG_REQ(D_ERROR, req, "Unexpected MSG_LAST_REPLAY"); - - obd = req->rq_export != NULL ? req->rq_export->exp_obd : NULL; - if (obd && obd->obd_recovering) { - DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply"); - RETURN(target_queue_final_reply(req, rc)); - } else { - /* - * Lost a race with recovery; let the error path DTRT. - */ - rc = req->rq_status = -ENOTCONN; - } - } target_send_reply(req, rc, info->mti_fail_id); RETURN(0); } @@ -1868,7 +1855,7 @@ static int mdt_handle_common(struct ptlrpc_request *req, * This is called from recovery code as handler of _all_ RPC types, FLD and SEQ * as well. */ -static int mdt_recovery_handle(struct ptlrpc_request *req) +int mdt_recovery_handle(struct ptlrpc_request *req) { int rc; ENTRY; @@ -3105,9 +3092,15 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m) ENTRY; target_cleanup_recovery(m->mdt_md_dev.md_lu_dev.ld_obd); + mdt_fs_cleanup(env, m); + ping_evictor_stop(); mdt_stop_ptlrpc_service(m); + cleanup_capas(CAPA_SITE_SERVER); + del_timer(&m->mdt_ck_timer); + mdt_ck_thread_stop(m); + upcall_cache_cleanup(m->mdt_rmtacl_cache); m->mdt_rmtacl_cache = NULL; @@ -3128,12 +3121,6 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m) m->mdt_rootsquash_info = NULL; } - cleanup_capas(CAPA_SITE_SERVER); - del_timer(&m->mdt_ck_timer); - mdt_ck_thread_stop(m); - - mdt_fs_cleanup(env, m); - /* finish the stack */ mdt_stack_fini(env, m, md2lu_dev(m->mdt_child)); @@ -3273,6 +3260,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, GOTO(err_capa, rc); ping_evictor_start(); + rc = mdt_fs_setup(env, m, obd); if (rc) GOTO(err_stop_service, rc); @@ -3810,7 +3798,7 @@ static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len, case OBD_IOC_ABORT_RECOVERY: CERROR("aborting recovery for device %s\n", obd->obd_name); - target_abort_recovery(obd); + target_stop_recovery_thread(obd); break; default: diff --git a/lustre/mdt/mdt_internal.h b/lustre/mdt/mdt_internal.h index 447019a..0c35829 100644 --- a/lustre/mdt/mdt_internal.h +++ b/lustre/mdt/mdt_internal.h @@ -434,6 +434,8 @@ int mdt_client_new(const struct lu_env *env, struct mdt_device *mdt, struct mdt_export_data *med); +int mdt_recovery_handle(struct ptlrpc_request *); + int mdt_pin(struct mdt_thread_info* info); int mdt_lock_new_child(struct mdt_thread_info *info, diff --git a/lustre/mdt/mdt_open.c b/lustre/mdt/mdt_open.c index ce7a46a..0e3e404 100644 --- a/lustre/mdt/mdt_open.c +++ b/lustre/mdt/mdt_open.c @@ -286,16 +286,14 @@ static void mdt_open_transno(struct mdt_thread_info* info) struct mdt_device *mdt = info->mti_mdt; struct ptlrpc_request *req = mdt_info_req(info); - if (info->mti_transno != 0) { - CDEBUG(D_INODE, "(open | create) | replay: transno = %llu," - " last_committed = %llu\n", - info->mti_transno, - req->rq_export->exp_obd->obd_last_committed); - return; - } - spin_lock(&mdt->mdt_transno_lock); - info->mti_transno = ++ mdt->mdt_last_transno; + if (info->mti_transno == 0) { + info->mti_transno = ++ mdt->mdt_last_transno; + } else { + /* should be replay */ + if (info->mti_transno > mdt->mdt_last_transno) + mdt->mdt_last_transno = info->mti_transno; + } spin_unlock(&mdt->mdt_transno_lock); CDEBUG(D_INODE, "open only: transno = %llu, last_committed = %llu\n", @@ -304,7 +302,6 @@ static void mdt_open_transno(struct mdt_thread_info* info) req->rq_transno = info->mti_transno; lustre_msg_set_transno(req->rq_repmsg, info->mti_transno); - target_committed_to_req(req); lustre_msg_set_last_xid(req->rq_repmsg, req->rq_xid); } diff --git a/lustre/mdt/mdt_recovery.c b/lustre/mdt/mdt_recovery.c index bd91703..feeba27 100644 --- a/lustre/mdt/mdt_recovery.c +++ b/lustre/mdt/mdt_recovery.c @@ -385,10 +385,14 @@ static int mdt_clients_data_init(const struct lu_env *env, rc = mdt_client_add(env, mdt, med, cl_idx); LASSERTF(rc == 0, "rc = %d\n", rc); /* can't fail existing */ mcd = NULL; - exp->exp_replay_needed = 1; exp->exp_connecting = 0; obd->obd_recoverable_clients++; obd->obd_max_recoverable_clients++; + atomic_inc(&obd->obd_req_replay_clients); + exp->exp_req_replay_needed = 1; + atomic_inc(&obd->obd_lock_replay_clients); + exp->exp_lock_replay_needed = 1; + class_export_put(exp); CDEBUG(D_OTHER, "client at idx %d has last_transno = "LPU64"\n", @@ -537,7 +541,7 @@ static int mdt_server_data_init(const struct lu_env *env, "last_transno "LPU64"\n", obd->obd_name, obd->obd_recoverable_clients, mdt->mdt_last_transno); obd->obd_next_recovery_transno = obd->obd_last_committed + 1; - obd->obd_recovering = 1; + target_start_recovery_thread(obd, mdt_recovery_handle); obd->obd_recovery_start = CURRENT_SECONDS; /* Only used for lprocfs_status */ obd->obd_recovery_end = obd->obd_recovery_start + @@ -780,7 +784,7 @@ static int mdt_last_rcvd_update(struct mdt_thread_info *mti, off = med->med_lr_off; mutex_down(&med->med_mcd_lock); - if(lustre_msg_get_opc(req->rq_reqmsg) == MDS_CLOSE) { + if (lustre_msg_get_opc(req->rq_reqmsg) == MDS_CLOSE) { mcd->mcd_last_close_transno = mti->mti_transno; mcd->mcd_last_close_xid = req->rq_xid; mcd->mcd_last_close_result = rc; @@ -869,7 +873,7 @@ static int mdt_txn_stop_cb(const struct lu_env *env, req->rq_transno = mti->mti_transno; lustre_msg_set_transno(req->rq_repmsg, mti->mti_transno); - target_committed_to_req(req); + //target_committed_to_req(req); lustre_msg_set_last_xid(req->rq_repmsg, req_exp_last_xid(req)); /* save transno for the commit callback */ txi->txi_transno = mti->mti_transno; @@ -980,6 +984,7 @@ void mdt_fs_cleanup(const struct lu_env *env, struct mdt_device *mdt) } /* reconstruction code */ +extern void mds_steal_ack_locks(struct ptlrpc_request *req); void mdt_req_from_mcd(struct ptlrpc_request *req, struct mdt_client_data *mcd) { @@ -997,7 +1002,7 @@ void mdt_req_from_mcd(struct ptlrpc_request *req, lustre_msg_set_transno(req->rq_repmsg, req->rq_transno); lustre_msg_set_status(req->rq_repmsg, req->rq_status); } - //mds_steal_ack_locks(req); + mds_steal_ack_locks(req); } static void mdt_reconstruct_generic(struct mdt_thread_info *mti, diff --git a/lustre/mgs/mgs_handler.c b/lustre/mgs/mgs_handler.c index 4ff9bbd..cf1f9f5 100644 --- a/lustre/mgs/mgs_handler.c +++ b/lustre/mgs/mgs_handler.c @@ -475,7 +475,7 @@ int mgs_handle(struct ptlrpc_request *req) switch (opc) { case MGS_CONNECT: DEBUG_REQ(D_MGS, req, "connect"); - rc = target_handle_connect(req, mgs_handle); + rc = target_handle_connect(req); if (!rc && (lustre_msg_get_conn_cnt(req->rq_reqmsg) > 1)) /* Make clients trying to reconnect after a MGS restart happy; also requires obd_replayable */ diff --git a/lustre/obdclass/genops.c b/lustre/obdclass/genops.c index 86dbd59..0d89aea 100644 --- a/lustre/obdclass/genops.c +++ b/lustre/obdclass/genops.c @@ -942,7 +942,8 @@ EXPORT_SYMBOL(class_disconnect_exports); /* Remove exports that have not completed recovery. */ -void class_disconnect_stale_exports(struct obd_device *obd) +int class_disconnect_stale_exports(struct obd_device *obd, + int (*test_export)(struct obd_export *)) { struct list_head work_list; struct list_head *pos, *n; @@ -954,18 +955,23 @@ void class_disconnect_stale_exports(struct obd_device *obd) spin_lock(&obd->obd_dev_lock); list_for_each_safe(pos, n, &obd->obd_exports) { exp = list_entry(pos, struct obd_export, exp_obd_chain); - if (exp->exp_replay_needed) { - list_del(&exp->exp_obd_chain); - list_add(&exp->exp_obd_chain, &work_list); - cnt++; - } + if (test_export(exp)) + continue; + + list_del(&exp->exp_obd_chain); + list_add(&exp->exp_obd_chain, &work_list); + cnt++; + CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n", + obd->obd_name, exp->exp_client_uuid.uuid, + exp->exp_connection == NULL ? "" : + libcfs_nid2str(exp->exp_connection->c_peer.nid)); } spin_unlock(&obd->obd_dev_lock); CDEBUG(D_ERROR, "%s: disconnecting %d stale clients\n", obd->obd_name, cnt); class_disconnect_export_list(&work_list, get_exp_flags_from_obd(obd)); - EXIT; + RETURN(cnt); } EXPORT_SYMBOL(class_disconnect_stale_exports); diff --git a/lustre/obdclass/obd_config.c b/lustre/obdclass/obd_config.c index 6e8bf9f..ab35ec4 100644 --- a/lustre/obdclass/obd_config.c +++ b/lustre/obdclass/obd_config.c @@ -198,8 +198,9 @@ int class_attach(struct lustre_cfg *lcfg) cfs_init_timer(&obd->obd_recovery_timer); spin_lock_init(&obd->obd_processing_task_lock); cfs_waitq_init(&obd->obd_next_transno_waitq); - CFS_INIT_LIST_HEAD(&obd->obd_recovery_queue); - CFS_INIT_LIST_HEAD(&obd->obd_delayed_reply_queue); + CFS_INIT_LIST_HEAD(&obd->obd_req_replay_queue); + CFS_INIT_LIST_HEAD(&obd->obd_lock_replay_queue); + CFS_INIT_LIST_HEAD(&obd->obd_final_req_queue); spin_lock_init(&obd->obd_uncommitted_replies_lock); CFS_INIT_LIST_HEAD(&obd->obd_uncommitted_replies); diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c index a322154..17446d1 100644 --- a/lustre/obdfilter/filter.c +++ b/lustre/obdfilter/filter.c @@ -541,7 +541,7 @@ int filter_update_last_objid(struct obd_device *obd, obd_gr group, group, rc); RETURN(rc); } - +extern int ost_handle(struct ptlrpc_request *req); /* assumes caller has already in kernel ctxt */ static int filter_init_server_data(struct obd_device *obd, struct file * filp) { @@ -702,8 +702,11 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp) LASSERTF(rc == 0, "rc = %d\n", rc); /* can't fail existing */ fcd = NULL; - exp->exp_replay_needed = 1; + exp->exp_req_replay_needed = 1; + exp->exp_lock_replay_needed = 1; exp->exp_connecting = 0; + atomic_inc(&obd->obd_req_replay_clients); + atomic_inc(&obd->obd_lock_replay_clients); obd->obd_recoverable_clients++; obd->obd_max_recoverable_clients++; class_export_put(exp); @@ -728,7 +731,7 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp) obd->obd_recoverable_clients, le64_to_cpu(fsd->lsd_last_transno)); obd->obd_next_recovery_transno = obd->obd_last_committed + 1; - obd->obd_recovering = 1; + target_start_recovery_thread(obd, ost_handle); obd->obd_recovery_start = CURRENT_SECONDS; /* Only used for lprocfs_status */ obd->obd_recovery_end = obd->obd_recovery_start + @@ -3768,7 +3771,7 @@ int filter_iocontrol(unsigned int cmd, struct obd_export *exp, switch (cmd) { case OBD_IOC_ABORT_RECOVERY: { CERROR("aborting recovery for device %s\n", obd->obd_name); - target_abort_recovery(obd); + target_stop_recovery_thread(obd); RETURN(0); } diff --git a/lustre/ost/ost_handler.c b/lustre/ost/ost_handler.c index 8def3b1..e55c527 100644 --- a/lustre/ost/ost_handler.c +++ b/lustre/ost/ost_handler.c @@ -1313,7 +1313,7 @@ int ost_msg_check_version(struct lustre_msg *msg) return rc; } -static int ost_handle(struct ptlrpc_request *req) +int ost_handle(struct ptlrpc_request *req) { struct obd_trans_info trans_info = { 0, }; struct obd_trans_info *oti = &trans_info; @@ -1333,7 +1333,7 @@ static int ost_handle(struct ptlrpc_request *req) /* XXX identical to MDS */ if (lustre_msg_get_opc(req->rq_reqmsg) != OST_CONNECT) { - int abort_recovery, recovering; + int recovering; if (req->rq_export == NULL) { CDEBUG(D_HA,"operation %d on unconnected OST from %s\n", @@ -1347,16 +1347,18 @@ static int ost_handle(struct ptlrpc_request *req) /* Check for aborted recovery. */ spin_lock_bh(&obd->obd_processing_task_lock); - abort_recovery = obd->obd_abort_recovery; recovering = obd->obd_recovering; spin_unlock_bh(&obd->obd_processing_task_lock); - if (abort_recovery) { - target_abort_recovery(obd); - } else if (recovering) { + if (recovering) { rc = ost_filter_recovery_request(req, obd, &should_process); if (rc || !should_process) RETURN(rc); + else if (should_process < 0) { + req->rq_status = should_process; + rc = ptlrpc_error(req); + RETURN(rc); + } } } @@ -1369,7 +1371,7 @@ static int ost_handle(struct ptlrpc_request *req) case OST_CONNECT: { CDEBUG(D_INODE, "connect\n"); OBD_FAIL_RETURN(OBD_FAIL_OST_CONNECT_NET, 0); - rc = target_handle_connect(req, ost_handle); + rc = target_handle_connect(req); if (!rc) obd = req->rq_export->exp_obd; break; @@ -1519,22 +1521,13 @@ static int ost_handle(struct ptlrpc_request *req) target_committed_to_req(req); out: - if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) { - if (obd && obd->obd_recovering) { - DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply"); - return target_queue_final_reply(req, rc); - } - /* Lost a race with recovery; let the error path DTRT. */ - rc = req->rq_status = -ENOTCONN; - } - if (!rc) oti_to_request(oti, req); target_send_reply(req, rc, fail); return 0; } - +EXPORT_SYMBOL(ost_handle); /* * free per-thread pool created by ost_thread_init(). */ diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c index 4e37dea..371a48a 100644 --- a/lustre/ptlrpc/client.c +++ b/lustre/ptlrpc/client.c @@ -703,9 +703,15 @@ static int after_reply(struct ptlrpc_request *req) } /* Replay-enabled imports return commit-status information. */ - if (lustre_msg_get_last_committed(req->rq_repmsg)) + if (lustre_msg_get_last_committed(req->rq_repmsg)) { + if (imp->imp_peer_committed_transno > + lustre_msg_get_last_committed(req->rq_repmsg)) + CERROR(" Import has "LPU64", receive "LPU64" committed\n", + imp->imp_peer_committed_transno, + lustre_msg_get_last_committed(req->rq_repmsg)); imp->imp_peer_committed_transno = lustre_msg_get_last_committed(req->rq_repmsg); + } ptlrpc_free_committed(imp); spin_unlock(&imp->imp_lock); } @@ -1808,6 +1814,7 @@ static int ptlrpc_replay_interpret(struct ptlrpc_request *req, } spin_lock(&imp->imp_lock); + LASSERT(req->rq_transno <= imp->imp_last_replay_transno); imp->imp_last_replay_transno = req->rq_transno; spin_unlock(&imp->imp_lock); diff --git a/lustre/ptlrpc/import.c b/lustre/ptlrpc/import.c index 740450c..46bb13c 100644 --- a/lustre/ptlrpc/import.c +++ b/lustre/ptlrpc/import.c @@ -327,10 +327,27 @@ static int import_select_connection(struct obd_import *imp) RETURN(0); } +/* + * must be called under imp_lock + */ +int ptlrpc_first_transno(struct obd_import *imp, __u64 *transno) +{ + struct ptlrpc_request *req; + struct list_head *tmp; + + if (list_empty(&imp->imp_replay_list)) + return 0; + tmp = imp->imp_replay_list.next; + req = list_entry(tmp, struct ptlrpc_request, rq_replay_list); + *transno = req->rq_transno; + return 1; +} + int ptlrpc_connect_import(struct obd_import *imp, char *new_uuid) { struct obd_device *obd = imp->imp_obd; int initial_connect = 0; + int set_transno = 0; int rc; __u64 committed_before_reconnect = 0; struct ptlrpc_request *request; @@ -373,6 +390,7 @@ int ptlrpc_connect_import(struct obd_import *imp, char *new_uuid) else committed_before_reconnect = imp->imp_peer_committed_transno; + set_transno = ptlrpc_first_transno(imp, &imp->imp_connect_data.ocd_transno); spin_unlock(&imp->imp_lock); if (new_uuid) { @@ -444,8 +462,15 @@ int ptlrpc_connect_import(struct obd_import *imp, char *new_uuid) /* On an initial connect, we don't know which one of a failover server pair is up. Don't wait long. */ request->rq_timeout = max((int)(obd_timeout / 20), 5); + lustre_msg_add_op_flags(request->rq_reqmsg, + MSG_CONNECT_INITIAL); + } + if (set_transno) + lustre_msg_add_op_flags(request->rq_reqmsg, + MSG_CONNECT_TRANSNO); + DEBUG_REQ(D_RPCTRACE, request, "(re)connect request"); ptlrpcd_add_req(request); rc = 0; @@ -803,7 +828,8 @@ static int signal_completed_replay(struct obd_import *imp) ptlrpc_req_set_repsize(req, 1, NULL); req->rq_send_state = LUSTRE_IMP_REPLAY_WAIT; - lustre_msg_add_flags(req->rq_reqmsg, MSG_LAST_REPLAY); + lustre_msg_add_flags(req->rq_reqmsg, + MSG_LOCK_REPLAY_DONE | MSG_REQ_REPLAY_DONE); req->rq_timeout *= 3; req->rq_interpret_reply = completed_replay_interpret; diff --git a/lustre/ptlrpc/pack_generic.c b/lustre/ptlrpc/pack_generic.c index 9177ae1..2e23555 100644 --- a/lustre/ptlrpc/pack_generic.c +++ b/lustre/ptlrpc/pack_generic.c @@ -1501,18 +1501,18 @@ void lustre_swab_ptlrpc_body(struct ptlrpc_body *b) void lustre_swab_connect(struct obd_connect_data *ocd) { __swab64s(&ocd->ocd_connect_flags); + __swab64s(&ocd->ocd_transno); + __swab64s(&ocd->ocd_ibits_known); __swab32s(&ocd->ocd_version); __swab32s(&ocd->ocd_grant); __swab32s(&ocd->ocd_index); __swab32s(&ocd->ocd_brw_size); - __swab64s(&ocd->ocd_ibits_known); __swab32s(&ocd->ocd_nllu); __swab32s(&ocd->ocd_nllg); __swab32s(&ocd->ocd_group); CLASSERT(offsetof(typeof(*ocd), padding1) != 0); CLASSERT(offsetof(typeof(*ocd), padding2) != 0); CLASSERT(offsetof(typeof(*ocd), padding3) != 0); - CLASSERT(offsetof(typeof(*ocd), padding4) != 0); } void lustre_swab_obdo (struct obdo *o) diff --git a/lustre/ptlrpc/recover.c b/lustre/ptlrpc/recover.c index 1849656..26ac533 100644 --- a/lustre/ptlrpc/recover.c +++ b/lustre/ptlrpc/recover.c @@ -75,7 +75,6 @@ int ptlrpc_replay_next(struct obd_import *imp, int *inflight) ptlrpc_free_committed(imp); last_transno = imp->imp_last_replay_transno; spin_unlock(&imp->imp_lock); - CDEBUG(D_HA, "import %p from %s committed "LPU64" last "LPU64"\n", imp, obd2cli_tgt(imp->imp_obd), imp->imp_peer_committed_transno, last_transno); @@ -97,7 +96,6 @@ int ptlrpc_replay_next(struct obd_import *imp, int *inflight) */ list_for_each_safe(tmp, pos, &imp->imp_replay_list) { req = list_entry(tmp, struct ptlrpc_request, rq_replay_list); - /* If need to resend the last sent transno (because a reconnect has occurred), then stop on the matching req and send it again. If, however, the last sent diff --git a/lustre/utils/wirecheck.c b/lustre/utils/wirecheck.c index b97130f..d4955c3 100644 --- a/lustre/utils/wirecheck.c +++ b/lustre/utils/wirecheck.c @@ -135,17 +135,17 @@ static void check_obd_connect_data(void) BLANK_LINE(); CHECK_STRUCT(obd_connect_data); CHECK_MEMBER(obd_connect_data, ocd_connect_flags); + CHECK_MEMBER(obd_connect_data, ocd_transno); + CHECK_MEMBER(obd_connect_data, ocd_ibits_known); CHECK_MEMBER(obd_connect_data, ocd_version); CHECK_MEMBER(obd_connect_data, ocd_grant); CHECK_MEMBER(obd_connect_data, ocd_index); CHECK_MEMBER(obd_connect_data, ocd_brw_size); - CHECK_MEMBER(obd_connect_data, ocd_ibits_known); CHECK_MEMBER(obd_connect_data, ocd_nllu); CHECK_MEMBER(obd_connect_data, ocd_nllg); CHECK_MEMBER(obd_connect_data, padding1); CHECK_MEMBER(obd_connect_data, padding2); CHECK_MEMBER(obd_connect_data, padding3); - CHECK_MEMBER(obd_connect_data, padding4); CHECK_CDEFINE(OBD_CONNECT_RDONLY); CHECK_CDEFINE(OBD_CONNECT_INDEX);