From 848d709605948e32b2dcfa2a4ffdfa829f83565c Mon Sep 17 00:00:00 2001 From: Mikhail Pershin Date: Sat, 21 Aug 2010 14:01:48 +0400 Subject: [PATCH] b=17760 Separate locking for obd bitfield and recovery i=adilger i=zam --- lustre/include/obd.h | 16 +++- lustre/ldlm/ldlm_lib.c | 190 ++++++++++++++++++------------------- lustre/mdd/mdd_device.c | 2 + lustre/mdd/mdd_lov.c | 2 + lustre/mdt/mdt_handler.c | 4 +- lustre/obdclass/genops.c | 4 +- lustre/obdclass/obd_config.c | 2 +- lustre/obdfilter/filter.c | 7 +- lustre/obdfilter/filter_log.c | 4 +- lustre/obdfilter/lproc_obdfilter.c | 16 ++-- lustre/ost/ost_handler.c | 10 +- lustre/ptlrpc/target.c | 16 ++-- 12 files changed, 135 insertions(+), 138 deletions(-) diff --git a/lustre/include/obd.h b/lustre/include/obd.h index bb56150..e9a70be 100644 --- a/lustre/include/obd.h +++ b/lustre/include/obd.h @@ -300,6 +300,7 @@ struct filter_obd { int fo_tot_granted_clients; obd_size fo_readcache_max_filesize; + cfs_spinlock_t fo_flags_lock; int fo_read_cache:1, /**< enable read-only cache */ fo_writethrough_cache:1,/**< read cache writes */ fo_mds_ost_sync:1, /**< MDS-OST orphan recovery*/ @@ -977,6 +978,7 @@ struct obd_llog_group { #define MAX_OBD_NAME 128 #define OBD_DEVICE_MAGIC 0XAB5CD6EF #define OBD_DEV_BY_DEVNAME 0xffffd0de + struct obd_device { struct obd_type *obd_type; __u32 obd_magic; @@ -988,16 +990,15 @@ struct obd_device { struct lu_device *obd_lu_dev; int obd_minor; + /* bitfield modification is protected by obd_dev_lock */ unsigned long obd_attached:1, /* finished attach */ obd_set_up:1, /* finished setup */ obd_recovering:1, /* there are recoverable clients */ obd_abort_recovery:1,/* recovery expired */ obd_version_recov:1, /* obd uses version checking */ - obd_recovery_expired:1, obd_replayable:1, /* recovery is enabled; inform clients */ obd_no_transno:1, /* no committed-transno notification */ obd_no_recov:1, /* fail instead of retry messages */ - obd_req_replaying:1, /* replaying requests */ obd_stopping:1, /* 
started cleanup */ obd_starting:1, /* started setup */ obd_force:1, /* cleanup with > 0 obd refcount */ @@ -1007,6 +1008,9 @@ struct obd_device { obd_inactive:1, /* device active/inactive * (for /proc/status only!!) */ obd_process_conf:1; /* device is processing mgs config */ + /* use a separate field as it is set in interrupt context, to avoid + * having to protect the other bits with a _bh lock */ + unsigned long obd_recovery_expired:1; /* uuid-export hash body */ cfs_hash_t *obd_uuid_hash; /* nid-export hash body */ @@ -1024,7 +1028,7 @@ struct obd_device { struct ldlm_namespace *obd_namespace; struct ptlrpc_client obd_ldlm_client; /* XXX OST/MDS only */ /* a spinlock is OK for what we do now, may need a semaphore later */ - cfs_spinlock_t obd_dev_lock; + cfs_spinlock_t obd_dev_lock; /* protects obd bitfield above */ cfs_semaphore_t obd_dev_sem; __u64 obd_last_committed; struct fsfilt_operations *obd_fsops; @@ -1045,11 +1049,14 @@ struct obd_device { int obd_connected_clients; int obd_stale_clients; int obd_delayed_clients; - cfs_spinlock_t obd_processing_task_lock; /* BH lock (timer) */ + /* this lock protects all recovery list_heads, timer and + * obd_next_recovery_transno value */ + cfs_spinlock_t obd_recovery_task_lock; __u64 obd_next_recovery_transno; int obd_replayed_requests; int obd_requests_queued_for_recovery; cfs_waitq_t obd_next_transno_waitq; + /* protected by obd_recovery_task_lock */ cfs_timer_t obd_recovery_timer; time_t obd_recovery_start; /* seconds */ time_t obd_recovery_end; /* seconds, for lprocfs_status */ @@ -1061,6 +1068,7 @@ struct obd_device { int obd_replayed_locks; cfs_atomic_t obd_req_replay_clients; cfs_atomic_t obd_lock_replay_clients; + /* all lists are protected by obd_recovery_task_lock */ cfs_list_t obd_req_replay_queue; cfs_list_t obd_lock_replay_queue; cfs_list_t obd_final_req_queue; diff --git a/lustre/ldlm/ldlm_lib.c b/lustre/ldlm/ldlm_lib.c index f1dbcd2..1cc8f9f 100644 --- a/lustre/ldlm/ldlm_lib.c +++ b/lustre/ldlm/ldlm_lib.c @@ 
-992,7 +992,7 @@ dont_check_exports: &export->exp_nid_hash); } - cfs_spin_lock_bh(&target->obd_processing_task_lock); + cfs_spin_lock(&target->obd_recovery_task_lock); if (target->obd_recovering && !export->exp_in_recovery) { cfs_spin_lock(&export->exp_lock); export->exp_in_recovery = 1; @@ -1013,7 +1013,7 @@ dont_check_exports: target->obd_max_recoverable_clients) cfs_waitq_signal(&target->obd_next_transno_waitq); } - cfs_spin_unlock_bh(&target->obd_processing_task_lock); + cfs_spin_unlock(&target->obd_recovery_task_lock); tmp = req_capsule_client_get(&req->rq_pill, &RMF_CONN); conn = *tmp; @@ -1186,7 +1186,7 @@ static void target_finish_recovery(struct obd_device *obd) obd->obd_name); ldlm_reprocess_all_ns(obd->obd_namespace); - cfs_spin_lock_bh(&obd->obd_processing_task_lock); + cfs_spin_lock(&obd->obd_recovery_task_lock); if (!cfs_list_empty(&obd->obd_req_replay_queue) || !cfs_list_empty(&obd->obd_lock_replay_queue) || !cfs_list_empty(&obd->obd_final_req_queue)) { @@ -1197,10 +1197,10 @@ static void target_finish_recovery(struct obd_device *obd) "" : "lock ", cfs_list_empty(&obd->obd_final_req_queue) ? 
\ "" : "final "); - cfs_spin_unlock_bh(&obd->obd_processing_task_lock); + cfs_spin_unlock(&obd->obd_recovery_task_lock); LBUG(); } - cfs_spin_unlock_bh(&obd->obd_processing_task_lock); + cfs_spin_unlock(&obd->obd_recovery_task_lock); obd->obd_recovery_end = cfs_time_current_sec(); @@ -1220,9 +1220,9 @@ static void abort_req_replay_queue(struct obd_device *obd) cfs_list_t abort_list; CFS_INIT_LIST_HEAD(&abort_list); - cfs_spin_lock_bh(&obd->obd_processing_task_lock); + cfs_spin_lock(&obd->obd_recovery_task_lock); cfs_list_splice_init(&obd->obd_req_replay_queue, &abort_list); - cfs_spin_unlock_bh(&obd->obd_processing_task_lock); + cfs_spin_unlock(&obd->obd_recovery_task_lock); cfs_list_for_each_entry_safe(req, n, &abort_list, rq_list) { DEBUG_REQ(D_WARNING, req, "aborted:"); req->rq_status = -ENOTCONN; @@ -1241,9 +1241,9 @@ static void abort_lock_replay_queue(struct obd_device *obd) cfs_list_t abort_list; CFS_INIT_LIST_HEAD(&abort_list); - cfs_spin_lock_bh(&obd->obd_processing_task_lock); + cfs_spin_lock(&obd->obd_recovery_task_lock); cfs_list_splice_init(&obd->obd_lock_replay_queue, &abort_list); - cfs_spin_unlock_bh(&obd->obd_processing_task_lock); + cfs_spin_unlock(&obd->obd_recovery_task_lock); cfs_list_for_each_entry_safe(req, n, &abort_list, rq_list){ DEBUG_REQ(D_ERROR, req, "aborted:"); req->rq_status = -ENOTCONN; @@ -1272,17 +1272,19 @@ void target_cleanup_recovery(struct obd_device *obd) ENTRY; CFS_INIT_LIST_HEAD(&clean_list); - cfs_spin_lock_bh(&obd->obd_processing_task_lock); + cfs_spin_lock(&obd->obd_dev_lock); if (!obd->obd_recovering) { - cfs_spin_unlock_bh(&obd->obd_processing_task_lock); + cfs_spin_unlock(&obd->obd_dev_lock); EXIT; return; } obd->obd_recovering = obd->obd_abort_recovery = 0; - target_cancel_recovery_timer(obd); + cfs_spin_unlock(&obd->obd_dev_lock); + cfs_spin_lock(&obd->obd_recovery_task_lock); + target_cancel_recovery_timer(obd); cfs_list_splice_init(&obd->obd_req_replay_queue, &clean_list); - 
cfs_spin_unlock_bh(&obd->obd_processing_task_lock); + cfs_spin_unlock(&obd->obd_recovery_task_lock); cfs_list_for_each_entry_safe(req, n, &clean_list, rq_list) { LASSERT(req->rq_reply_state == 0); @@ -1290,10 +1292,10 @@ void target_cleanup_recovery(struct obd_device *obd) target_request_copy_put(req); } - cfs_spin_lock_bh(&obd->obd_processing_task_lock); + cfs_spin_lock(&obd->obd_recovery_task_lock); cfs_list_splice_init(&obd->obd_lock_replay_queue, &clean_list); cfs_list_splice_init(&obd->obd_final_req_queue, &clean_list); - cfs_spin_unlock_bh(&obd->obd_processing_task_lock); + cfs_spin_unlock(&obd->obd_recovery_task_lock); cfs_list_for_each_entry_safe(req, n, &clean_list, rq_list){ LASSERT(req->rq_reply_state == 0); @@ -1303,7 +1305,7 @@ void target_cleanup_recovery(struct obd_device *obd) EXIT; } -/* obd_processing_task_lock should be held */ +/* obd_recovery_task_lock should be held */ void target_cancel_recovery_timer(struct obd_device *obd) { CDEBUG(D_HA, "%s: cancel recovery timer\n", obd->obd_name); @@ -1318,9 +1320,9 @@ static void reset_recovery_timer(struct obd_device *obd, int duration, cfs_time_t now = cfs_time_current_sec(); cfs_duration_t left; - cfs_spin_lock_bh(&obd->obd_processing_task_lock); + cfs_spin_lock(&obd->obd_recovery_task_lock); if (!obd->obd_recovering || obd->obd_abort_recovery) { - cfs_spin_unlock_bh(&obd->obd_processing_task_lock); + cfs_spin_unlock(&obd->obd_recovery_task_lock); return; } @@ -1343,21 +1345,21 @@ static void reset_recovery_timer(struct obd_device *obd, int duration, left = cfs_time_sub(obd->obd_recovery_end, now); cfs_timer_arm(&obd->obd_recovery_timer, cfs_time_shift(left)); } - cfs_spin_unlock_bh(&obd->obd_processing_task_lock); + cfs_spin_unlock(&obd->obd_recovery_task_lock); CDEBUG(D_HA, "%s: recovery timer will expire in %u seconds\n", obd->obd_name, (unsigned)left); } static void check_and_start_recovery_timer(struct obd_device *obd) { - cfs_spin_lock_bh(&obd->obd_processing_task_lock); + 
cfs_spin_lock(&obd->obd_recovery_task_lock); if (cfs_timer_is_armed(&obd->obd_recovery_timer)) { - cfs_spin_unlock_bh(&obd->obd_processing_task_lock); + cfs_spin_unlock(&obd->obd_recovery_task_lock); return; } CDEBUG(D_HA, "%s: starting recovery timer\n", obd->obd_name); obd->obd_recovery_start = cfs_time_current_sec(); - cfs_spin_unlock_bh(&obd->obd_processing_task_lock); + cfs_spin_unlock(&obd->obd_recovery_task_lock); reset_recovery_timer(obd, obd->obd_recovery_timeout, 0); } @@ -1451,8 +1453,8 @@ static int check_for_next_transno(struct obd_device *obd) int wake_up = 0, connected, completed, queue_len; __u64 next_transno, req_transno; ENTRY; - cfs_spin_lock_bh(&obd->obd_processing_task_lock); + cfs_spin_lock(&obd->obd_recovery_task_lock); if (!cfs_list_empty(&obd->obd_req_replay_queue)) { req = cfs_list_entry(obd->obd_req_replay_queue.next, struct ptlrpc_request, rq_list); @@ -1507,7 +1509,7 @@ static int check_for_next_transno(struct obd_device *obd) obd->obd_next_recovery_transno = req_transno; wake_up = 1; } - cfs_spin_unlock_bh(&obd->obd_processing_task_lock); + cfs_spin_unlock(&obd->obd_recovery_task_lock); return wake_up; } @@ -1515,7 +1517,7 @@ static int check_for_next_lock(struct obd_device *obd) { int wake_up = 0; - cfs_spin_lock_bh(&obd->obd_processing_task_lock); + cfs_spin_lock(&obd->obd_recovery_task_lock); if (!cfs_list_empty(&obd->obd_lock_replay_queue)) { CDEBUG(D_HA, "waking for next lock\n"); wake_up = 1; @@ -1529,7 +1531,7 @@ static int check_for_next_lock(struct obd_device *obd) CDEBUG(D_HA, "waking for expired recovery\n"); wake_up = 1; } - cfs_spin_unlock_bh(&obd->obd_processing_task_lock); + cfs_spin_unlock(&obd->obd_recovery_task_lock); return wake_up; } @@ -1543,38 +1545,33 @@ static int target_recovery_overseer(struct obd_device *obd, int (*check_routine)(struct obd_device *), int (*health_check)(struct obd_export *)) { - int abort = 0, expired = 1; - - do { - cfs_wait_event(obd->obd_next_transno_waitq, check_routine(obd)); - 
cfs_spin_lock_bh(&obd->obd_processing_task_lock); - abort = obd->obd_abort_recovery; - expired = obd->obd_recovery_expired; +repeat: + cfs_wait_event(obd->obd_next_transno_waitq, check_routine(obd)); + if (obd->obd_abort_recovery) { + CWARN("recovery is aborted, evict exports in recovery\n"); + /** evict exports which didn't finish recovery yet */ + class_disconnect_stale_exports(obd, exp_finished); + return 1; + } else if (obd->obd_recovery_expired) { obd->obd_recovery_expired = 0; - cfs_spin_unlock_bh(&obd->obd_processing_task_lock); - if (abort) { - CWARN("recovery is aborted, evict exports in recovery\n"); - /** evict exports which didn't finish recovery yet */ - class_disconnect_stale_exports(obd, exp_finished); - } else if (expired) { - /** If some clients died being recovered, evict them */ - CDEBUG(D_WARNING, "recovery is timed out, evict stale exports\n"); - /** evict cexports with no replay in queue, they are stalled */ - class_disconnect_stale_exports(obd, health_check); - /** continue with VBR */ - cfs_spin_lock_bh(&obd->obd_processing_task_lock); - obd->obd_version_recov = 1; - cfs_spin_unlock_bh(&obd->obd_processing_task_lock); - /** - * reset timer, recovery will proceed with versions now, - * timeout is set just to handle reconnection delays - */ - reset_recovery_timer(obd, RECONNECT_DELAY_MAX, 1); - /** Wait for recovery events again, after evicting bad clients */ - } - } while (!abort && expired); - - return abort; + /** If some clients died being recovered, evict them */ + CDEBUG(D_WARNING, + "recovery is timed out, evict stale exports\n"); + /** evict cexports with no replay in queue, they are stalled */ + class_disconnect_stale_exports(obd, health_check); + /** continue with VBR */ + cfs_spin_lock(&obd->obd_dev_lock); + obd->obd_version_recov = 1; + cfs_spin_unlock(&obd->obd_dev_lock); + /** + * reset timer, recovery will proceed with versions now, + * timeout is set just to handle reconnection delays + */ + reset_recovery_timer(obd, 
RECONNECT_DELAY_MAX, 1); + /** Wait for recovery events again, after evicting bad clients */ + goto repeat; + } + return 0; } static struct ptlrpc_request *target_next_replay_req(struct obd_device *obd) @@ -1591,15 +1588,15 @@ static struct ptlrpc_request *target_next_replay_req(struct obd_device *obd) abort_lock_replay_queue(obd); } - cfs_spin_lock_bh(&obd->obd_processing_task_lock); + cfs_spin_lock(&obd->obd_recovery_task_lock); if (!cfs_list_empty(&obd->obd_req_replay_queue)) { req = cfs_list_entry(obd->obd_req_replay_queue.next, struct ptlrpc_request, rq_list); cfs_list_del_init(&req->rq_list); obd->obd_requests_queued_for_recovery--; - cfs_spin_unlock_bh(&obd->obd_processing_task_lock); + cfs_spin_unlock(&obd->obd_recovery_task_lock); } else { - cfs_spin_unlock_bh(&obd->obd_processing_task_lock); + cfs_spin_unlock(&obd->obd_recovery_task_lock); LASSERT(cfs_list_empty(&obd->obd_req_replay_queue)); LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients) == 0); /** evict exports failed VBR */ @@ -1617,14 +1614,14 @@ static struct ptlrpc_request *target_next_replay_lock(struct obd_device *obd) exp_lock_replay_healthy)) abort_lock_replay_queue(obd); - cfs_spin_lock_bh(&obd->obd_processing_task_lock); + cfs_spin_lock(&obd->obd_recovery_task_lock); if (!cfs_list_empty(&obd->obd_lock_replay_queue)) { req = cfs_list_entry(obd->obd_lock_replay_queue.next, struct ptlrpc_request, rq_list); cfs_list_del_init(&req->rq_list); - cfs_spin_unlock_bh(&obd->obd_processing_task_lock); + cfs_spin_unlock(&obd->obd_recovery_task_lock); } else { - cfs_spin_unlock_bh(&obd->obd_processing_task_lock); + cfs_spin_unlock(&obd->obd_recovery_task_lock); LASSERT(cfs_list_empty(&obd->obd_lock_replay_queue)); LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients) == 0); /** evict exports failed VBR */ @@ -1637,18 +1634,20 @@ static struct ptlrpc_request *target_next_final_ping(struct obd_device *obd) { struct ptlrpc_request *req = NULL; - cfs_spin_lock_bh(&obd->obd_processing_task_lock); + 
cfs_spin_lock(&obd->obd_recovery_task_lock); if (!cfs_list_empty(&obd->obd_final_req_queue)) { req = cfs_list_entry(obd->obd_final_req_queue.next, struct ptlrpc_request, rq_list); cfs_list_del_init(&req->rq_list); + cfs_spin_unlock(&obd->obd_recovery_task_lock); if (req->rq_export->exp_in_recovery) { cfs_spin_lock(&req->rq_export->exp_lock); req->rq_export->exp_in_recovery = 0; cfs_spin_unlock(&req->rq_export->exp_lock); } + } else { + cfs_spin_unlock(&obd->obd_recovery_task_lock); } - cfs_spin_unlock_bh(&obd->obd_processing_task_lock); return req; } @@ -1731,7 +1730,9 @@ static int target_recovery_thread(void *arg) cfs_curproc_pid()); trd->trd_processing_task = cfs_curproc_pid(); + cfs_spin_lock(&obd->obd_dev_lock); obd->obd_recovering = 1; + cfs_spin_unlock(&obd->obd_dev_lock); cfs_complete(&trd->trd_starting); /* first of all, we have to know the first transno to replay */ @@ -1743,7 +1744,6 @@ static int target_recovery_thread(void *arg) /* next stage: replay requests */ delta = jiffies; - obd->obd_req_replaying = 1; CDEBUG(D_INFO, "1: request replay stage - %d clients from t"LPU64"\n", cfs_atomic_read(&obd->obd_req_replay_clients), obd->obd_next_recovery_transno); @@ -1758,9 +1758,9 @@ static int target_recovery_thread(void *arg) * bz18031: increase next_recovery_transno before * target_request_copy_put() will drop exp_rpc reference */ - cfs_spin_lock_bh(&obd->obd_processing_task_lock); + cfs_spin_lock(&obd->obd_recovery_task_lock); obd->obd_next_recovery_transno++; - cfs_spin_unlock_bh(&obd->obd_processing_task_lock); + cfs_spin_unlock(&obd->obd_recovery_task_lock); target_exp_dequeue_req_replay(req); target_request_copy_put(req); obd->obd_replayed_requests++; @@ -1790,10 +1790,12 @@ static int target_recovery_thread(void *arg) lut_boot_epoch_update(lut); /* We drop recoverying flag to forward all new requests * to regular mds_handle() since now */ - cfs_spin_lock_bh(&obd->obd_processing_task_lock); + cfs_spin_lock(&obd->obd_dev_lock); obd->obd_recovering = 
obd->obd_abort_recovery = 0; + cfs_spin_unlock(&obd->obd_dev_lock); + cfs_spin_lock(&obd->obd_recovery_task_lock); target_cancel_recovery_timer(obd); - cfs_spin_unlock_bh(&obd->obd_processing_task_lock); + cfs_spin_unlock(&obd->obd_recovery_task_lock); while ((req = target_next_final_ping(obd))) { LASSERT(trd->trd_processing_task == cfs_curproc_pid()); DEBUG_REQ(D_HA, req, "processing final ping from %s: ", @@ -1842,19 +1844,17 @@ static int target_start_recovery_thread(struct lu_target *lut, void target_stop_recovery_thread(struct obd_device *obd) { - cfs_spin_lock_bh(&obd->obd_processing_task_lock); if (obd->obd_recovery_data.trd_processing_task > 0) { struct target_recovery_data *trd = &obd->obd_recovery_data; /** recovery can be done but postrecovery is not yet */ + cfs_spin_lock(&obd->obd_dev_lock); if (obd->obd_recovering) { CERROR("%s: Aborting recovery\n", obd->obd_name); obd->obd_abort_recovery = 1; cfs_waitq_signal(&obd->obd_next_transno_waitq); } - cfs_spin_unlock_bh(&obd->obd_processing_task_lock); + cfs_spin_unlock(&obd->obd_dev_lock); cfs_wait_for_completion(&trd->trd_finishing); - } else { - cfs_spin_unlock_bh(&obd->obd_processing_task_lock); } } @@ -1875,10 +1875,8 @@ static void target_recovery_expired(unsigned long castmeharder) cfs_time_current_sec()- obd->obd_recovery_start, obd->obd_connected_clients); - cfs_spin_lock_bh(&obd->obd_processing_task_lock); obd->obd_recovery_expired = 1; cfs_waitq_signal(&obd->obd_next_transno_waitq); - cfs_spin_unlock_bh(&obd->obd_processing_task_lock); } void target_recovery_init(struct lu_target *lut, svc_handler_t handler) @@ -1917,7 +1915,6 @@ static int target_process_req_flags(struct obd_device *obd, LASSERT(exp != NULL); if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REQ_REPLAY_DONE) { /* client declares he's ready to replay locks */ - cfs_spin_lock_bh(&obd->obd_processing_task_lock); if (exp->exp_req_replay_needed) { LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients) > 0); @@ -1926,12 +1923,10 @@ static 
int target_process_req_flags(struct obd_device *obd, cfs_spin_unlock(&exp->exp_lock); cfs_atomic_dec(&obd->obd_req_replay_clients); } - cfs_spin_unlock_bh(&obd->obd_processing_task_lock); } if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LOCK_REPLAY_DONE) { /* client declares he's ready to complete recovery * so, we put the request on th final queue */ - cfs_spin_lock_bh(&obd->obd_processing_task_lock); if (exp->exp_lock_replay_needed) { LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients) > 0); @@ -1940,9 +1935,7 @@ static int target_process_req_flags(struct obd_device *obd, cfs_spin_unlock(&exp->exp_lock); cfs_atomic_dec(&obd->obd_lock_replay_clients); } - cfs_spin_unlock_bh(&obd->obd_processing_task_lock); } - return 0; } @@ -1966,35 +1959,35 @@ int target_queue_recovery_request(struct ptlrpc_request *req, * so, we put the request on th final queue */ target_request_copy_get(req); DEBUG_REQ(D_HA, req, "queue final req"); - cfs_spin_lock_bh(&obd->obd_processing_task_lock); cfs_waitq_signal(&obd->obd_next_transno_waitq); + cfs_spin_lock(&obd->obd_recovery_task_lock); if (obd->obd_recovering) { cfs_list_add_tail(&req->rq_list, &obd->obd_final_req_queue); } else { - cfs_spin_unlock_bh(&obd->obd_processing_task_lock); + cfs_spin_unlock(&obd->obd_recovery_task_lock); target_request_copy_put(req); RETURN(obd->obd_stopping ? 
-ENOTCONN : 1); } - cfs_spin_unlock_bh(&obd->obd_processing_task_lock); + cfs_spin_unlock(&obd->obd_recovery_task_lock); RETURN(0); } if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REQ_REPLAY_DONE) { /* client declares he's ready to replay locks */ target_request_copy_get(req); DEBUG_REQ(D_HA, req, "queue lock replay req"); - cfs_spin_lock_bh(&obd->obd_processing_task_lock); cfs_waitq_signal(&obd->obd_next_transno_waitq); + cfs_spin_lock(&obd->obd_recovery_task_lock); LASSERT(obd->obd_recovering); /* usually due to recovery abort */ if (!req->rq_export->exp_in_recovery) { - cfs_spin_unlock_bh(&obd->obd_processing_task_lock); + cfs_spin_unlock(&obd->obd_recovery_task_lock); target_request_copy_put(req); RETURN(-ENOTCONN); } LASSERT(req->rq_export->exp_lock_replay_needed); cfs_list_add_tail(&req->rq_list, &obd->obd_lock_replay_queue); - cfs_spin_unlock_bh(&obd->obd_processing_task_lock); + cfs_spin_unlock(&obd->obd_recovery_task_lock); RETURN(0); } @@ -2008,8 +2001,6 @@ int target_queue_recovery_request(struct ptlrpc_request *req, RETURN(1); } - cfs_spin_lock_bh(&obd->obd_processing_task_lock); - /* If we're processing the queue, we want don't want to queue this * message. * @@ -2020,37 +2011,36 @@ int target_queue_recovery_request(struct ptlrpc_request *req, * Also, a resent, replayed request that has already been * handled will pass through here and be processed immediately. */ - CWARN("Next recovery transno: "LPU64", current: "LPU64", replaying: %i\n", - obd->obd_next_recovery_transno, transno, obd->obd_req_replaying); - if (transno < obd->obd_next_recovery_transno && obd->obd_req_replaying) { + CWARN("Next recovery transno: "LPU64", current: "LPU64", replaying\n", + obd->obd_next_recovery_transno, transno); + cfs_spin_lock(&obd->obd_recovery_task_lock); + if (transno < obd->obd_next_recovery_transno) { /* Processing the queue right now, don't re-add. 
*/ LASSERT(cfs_list_empty(&req->rq_list)); - cfs_spin_unlock_bh(&obd->obd_processing_task_lock); + cfs_spin_unlock(&obd->obd_recovery_task_lock); RETURN(1); } - cfs_spin_unlock_bh(&obd->obd_processing_task_lock); + cfs_spin_unlock(&obd->obd_recovery_task_lock); if (OBD_FAIL_CHECK(OBD_FAIL_TGT_REPLAY_DROP)) RETURN(0); target_request_copy_get(req); - cfs_spin_lock_bh(&obd->obd_processing_task_lock); - LASSERT(obd->obd_recovering); if (!req->rq_export->exp_in_recovery) { - cfs_spin_unlock_bh(&obd->obd_processing_task_lock); target_request_copy_put(req); RETURN(-ENOTCONN); } LASSERT(req->rq_export->exp_req_replay_needed); if (target_exp_enqueue_req_replay(req)) { - cfs_spin_unlock_bh(&obd->obd_processing_task_lock); DEBUG_REQ(D_ERROR, req, "dropping resent queued req"); target_request_copy_put(req); RETURN(0); } /* XXX O(n^2) */ + cfs_spin_lock(&obd->obd_recovery_task_lock); + LASSERT(obd->obd_recovering); cfs_list_for_each(tmp, &obd->obd_req_replay_queue) { struct ptlrpc_request *reqiter = cfs_list_entry(tmp, struct ptlrpc_request, rq_list); @@ -2065,7 +2055,7 @@ int target_queue_recovery_request(struct ptlrpc_request *req, transno)) { DEBUG_REQ(D_ERROR, req, "dropping replay: transno " "has been claimed by another client"); - cfs_spin_unlock_bh(&obd->obd_processing_task_lock); + cfs_spin_unlock(&obd->obd_recovery_task_lock); target_exp_dequeue_req_replay(req); target_request_copy_put(req); RETURN(0); @@ -2076,8 +2066,8 @@ int target_queue_recovery_request(struct ptlrpc_request *req, cfs_list_add_tail(&req->rq_list, &obd->obd_req_replay_queue); obd->obd_requests_queued_for_recovery++; + cfs_spin_unlock(&obd->obd_recovery_task_lock); cfs_waitq_signal(&obd->obd_next_transno_waitq); - cfs_spin_unlock_bh(&obd->obd_processing_task_lock); RETURN(0); } diff --git a/lustre/mdd/mdd_device.c b/lustre/mdd/mdd_device.c index 2c9ad6d..8b665d5 100644 --- a/lustre/mdd/mdd_device.c +++ b/lustre/mdd/mdd_device.c @@ -1058,7 +1058,9 @@ static int mdd_recovery_complete(const struct 
lu_env *env, OBD_NOTIFY_SYNC_NONBLOCK : OBD_NOTIFY_SYNC), NULL); /* Drop obd_recovering to 0 and call o_postrecov to recover mds_lov */ + cfs_spin_lock(&obd->obd_dev_lock); obd->obd_recovering = 0; + cfs_spin_unlock(&obd->obd_dev_lock); obd->obd_type->typ_dt_ops->o_postrecov(obd); /* XXX: orphans handling. */ diff --git a/lustre/mdd/mdd_lov.c b/lustre/mdd/mdd_lov.c index 9638327..1e16f38 100644 --- a/lustre/mdd/mdd_lov.c +++ b/lustre/mdd/mdd_lov.c @@ -145,7 +145,9 @@ int mdd_init_obd(const struct lu_env *env, struct mdd_device *mdd, LBUG(); } + cfs_spin_lock(&obd->obd_dev_lock); obd->obd_recovering = 1; + cfs_spin_unlock(&obd->obd_dev_lock); obd->u.mds.mds_id = mds_id; rc = class_setup(obd, lcfg); if (rc) diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index c47af49..d748f76 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -5232,9 +5232,9 @@ static void mdt_allow_cli(struct mdt_device *m, unsigned int flag) /* Open for clients */ if (obd->obd_no_conn) { - cfs_spin_lock_bh(&obd->obd_processing_task_lock); + cfs_spin_lock(&obd->obd_dev_lock); obd->obd_no_conn = 0; - cfs_spin_unlock_bh(&obd->obd_processing_task_lock); + cfs_spin_unlock(&obd->obd_dev_lock); } } } diff --git a/lustre/obdclass/genops.c b/lustre/obdclass/genops.c index d61104e..d41576c 100644 --- a/lustre/obdclass/genops.c +++ b/lustre/obdclass/genops.c @@ -1079,7 +1079,7 @@ void class_export_recovery_cleanup(struct obd_export *exp) { struct obd_device *obd = exp->exp_obd; - cfs_spin_lock_bh(&obd->obd_processing_task_lock); + cfs_spin_lock(&obd->obd_recovery_task_lock); if (exp->exp_delayed) obd->obd_delayed_clients--; if (obd->obd_recovering && exp->exp_in_recovery) { @@ -1089,6 +1089,7 @@ void class_export_recovery_cleanup(struct obd_export *exp) LASSERT(obd->obd_connected_clients); obd->obd_connected_clients--; } + cfs_spin_unlock(&obd->obd_recovery_task_lock); /** Cleanup req replay fields */ if (exp->exp_req_replay_needed) { cfs_spin_lock(&exp->exp_lock); @@ 
-1105,7 +1106,6 @@ void class_export_recovery_cleanup(struct obd_export *exp) LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients)); cfs_atomic_dec(&obd->obd_lock_replay_clients); } - cfs_spin_unlock_bh(&obd->obd_processing_task_lock); } /* This function removes 1-3 references from the export: diff --git a/lustre/obdclass/obd_config.c b/lustre/obdclass/obd_config.c index 9ebd6a5..f967cd2 100644 --- a/lustre/obdclass/obd_config.c +++ b/lustre/obdclass/obd_config.c @@ -329,7 +329,7 @@ int class_attach(struct lustre_cfg *lcfg) cfs_init_rwsem(&obd->obd_observer_link_sem); /* recovery data */ cfs_init_timer(&obd->obd_recovery_timer); - cfs_spin_lock_init(&obd->obd_processing_task_lock); + cfs_spin_lock_init(&obd->obd_recovery_task_lock); cfs_waitq_init(&obd->obd_next_transno_waitq); cfs_waitq_init(&obd->obd_evict_inprogress_waitq); CFS_INIT_LIST_HEAD(&obd->obd_req_replay_queue); diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c index 561872a..69e06be 100644 --- a/lustre/obdfilter/filter.c +++ b/lustre/obdfilter/filter.c @@ -881,9 +881,7 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp) exp->exp_connecting = 0; exp->exp_in_recovery = 0; cfs_spin_unlock(&exp->exp_lock); - cfs_spin_lock_bh(&obd->obd_processing_task_lock); obd->obd_max_recoverable_clients++; - cfs_spin_unlock_bh(&obd->obd_processing_task_lock); class_export_put(exp); if (last_rcvd > le64_to_cpu(lsd->lsd_last_transno)) @@ -2042,6 +2040,7 @@ int filter_common_setup(struct obd_device *obd, struct lustre_cfg* lcfg, CFS_INIT_LIST_HEAD(&filter->fo_export_list); cfs_sema_init(&filter->fo_alloc_lock, 1); init_brw_stats(&filter->fo_filter_stats); + cfs_spin_lock_init(&filter->fo_flags_lock); filter->fo_read_cache = 1; /* enable read-only cache by default */ filter->fo_writethrough_cache = 1; /* enable writethrough cache */ filter->fo_readcache_max_filesize = FILTER_MAX_CACHE_SIZE; @@ -2511,9 +2510,9 @@ static int filter_llog_connect(struct obd_export *exp, 
obd->obd_name, body->lgdc_logid.lgl_oid, body->lgdc_logid.lgl_oseq, body->lgdc_logid.lgl_ogen); - cfs_spin_lock_bh(&obd->obd_processing_task_lock); + cfs_spin_lock(&obd->u.filter.fo_flags_lock); obd->u.filter.fo_mds_ost_sync = 1; - cfs_spin_unlock_bh(&obd->obd_processing_task_lock); + cfs_spin_unlock(&obd->u.filter.fo_flags_lock); rc = llog_connect(ctxt, &body->lgdc_logid, &body->lgdc_gen, NULL); llog_ctxt_put(ctxt); diff --git a/lustre/obdfilter/filter_log.c b/lustre/obdfilter/filter_log.c index f62ee93..7670b39 100644 --- a/lustre/obdfilter/filter_log.c +++ b/lustre/obdfilter/filter_log.c @@ -267,9 +267,9 @@ int filter_recov_log_mds_ost_cb(struct llog_handle *llh, RETURN(LLOG_PROC_BREAK); if (rec == NULL) { - cfs_spin_lock_bh(&ctxt->loc_obd->obd_processing_task_lock); + cfs_spin_lock(&ctxt->loc_obd->u.filter.fo_flags_lock); ctxt->loc_obd->u.filter.fo_mds_ost_sync = 0; - cfs_spin_unlock_bh(&ctxt->loc_obd->obd_processing_task_lock); + cfs_spin_unlock(&ctxt->loc_obd->u.filter.fo_flags_lock); RETURN(0); } diff --git a/lustre/obdfilter/lproc_obdfilter.c b/lustre/obdfilter/lproc_obdfilter.c index 99bc28c..b13fa91 100644 --- a/lustre/obdfilter/lproc_obdfilter.c +++ b/lustre/obdfilter/lproc_obdfilter.c @@ -288,9 +288,9 @@ static int lprocfs_filter_wr_cache(struct file *file, const char *buffer, if (rc) return rc; - cfs_spin_lock_bh(&obd->obd_processing_task_lock); - obd->u.filter.fo_read_cache = val; - cfs_spin_unlock_bh(&obd->obd_processing_task_lock); + cfs_spin_lock_bh(&obd->u.filter.fo_flags_lock); + obd->u.filter.fo_read_cache = !!val; + cfs_spin_unlock_bh(&obd->u.filter.fo_flags_lock); return count; } @@ -315,9 +315,9 @@ static int lprocfs_filter_wr_wcache(struct file *file, const char *buffer, if (rc) return rc; - cfs_spin_lock_bh(&obd->obd_processing_task_lock); - obd->u.filter.fo_writethrough_cache = val; - cfs_spin_unlock_bh(&obd->obd_processing_task_lock); + cfs_spin_lock(&obd->u.filter.fo_flags_lock); + obd->u.filter.fo_writethrough_cache = !!val; + 
cfs_spin_unlock(&obd->u.filter.fo_flags_lock); return count; } @@ -348,9 +348,9 @@ int lprocfs_filter_wr_degraded(struct file *file, const char *buffer, if (rc) return rc; - cfs_spin_lock(&obd->obd_osfs_lock); + cfs_spin_lock(&obd->u.filter.fo_flags_lock); obd->u.filter.fo_raid_degraded = !!val; - cfs_spin_unlock(&obd->obd_osfs_lock); + cfs_spin_unlock(&obd->u.filter.fo_flags_lock); return count; } diff --git a/lustre/ost/ost_handler.c b/lustre/ost/ost_handler.c index b549391..36319d8 100644 --- a/lustre/ost/ost_handler.c +++ b/lustre/ost/ost_handler.c @@ -2622,13 +2622,9 @@ static int ost_cleanup(struct obd_device *obd) ping_evictor_stop(); - cfs_spin_lock_bh(&obd->obd_processing_task_lock); - if (obd->obd_recovering) { - target_cancel_recovery_timer(obd); - obd->obd_recovering = 0; - } - cfs_spin_unlock_bh(&obd->obd_processing_task_lock); - + /* there is no recovery for OST OBD, all recovery is controlled by + * obdfilter OBD */ + LASSERT(obd->obd_recovering == 0); cfs_down(&ost->ost_health_sem); ptlrpc_unregister_service(ost->ost_service); ptlrpc_unregister_service(ost->ost_create_service); diff --git a/lustre/ptlrpc/target.c b/lustre/ptlrpc/target.c index 245c9dc..d402923 100644 --- a/lustre/ptlrpc/target.c +++ b/lustre/ptlrpc/target.c @@ -122,9 +122,9 @@ static void obt_boot_epoch_update(struct lu_target *lut) cfs_spin_unlock(&lut->lut_translock); CFS_INIT_LIST_HEAD(&client_list); - cfs_spin_lock_bh(&obd->obd_processing_task_lock); + cfs_spin_lock(&obd->obd_recovery_task_lock); cfs_list_splice_init(&obd->obd_final_req_queue, &client_list); - cfs_spin_unlock_bh(&obd->obd_processing_task_lock); + cfs_spin_unlock(&obd->obd_recovery_task_lock); /** * go through list of exports participated in recovery and @@ -135,9 +135,9 @@ static void obt_boot_epoch_update(struct lu_target *lut) obt_client_epoch_update(req->rq_export); } /** return list back at once */ - cfs_spin_lock_bh(&obd->obd_processing_task_lock); + cfs_spin_lock(&obd->obd_recovery_task_lock); 
cfs_list_splice_init(&client_list, &obd->obd_final_req_queue); - cfs_spin_unlock_bh(&obd->obd_processing_task_lock); + cfs_spin_unlock(&obd->obd_recovery_task_lock); obt_server_data_update(lut, 1); } @@ -308,9 +308,9 @@ void lut_boot_epoch_update(struct lu_target *lut) * The recovery is not yet finished and final queue can still be updated * with resend requests. Move final list to separate one for processing */ - cfs_spin_lock_bh(&lut->lut_obd->obd_processing_task_lock); + cfs_spin_lock(&lut->lut_obd->obd_recovery_task_lock); cfs_list_splice_init(&lut->lut_obd->obd_final_req_queue, &client_list); - cfs_spin_unlock_bh(&lut->lut_obd->obd_processing_task_lock); + cfs_spin_unlock(&lut->lut_obd->obd_recovery_task_lock); /** * go through list of exports participated in recovery and @@ -322,9 +322,9 @@ void lut_boot_epoch_update(struct lu_target *lut) lut_client_epoch_update(&env, req->rq_export); } /** return list back at once */ - cfs_spin_lock_bh(&lut->lut_obd->obd_processing_task_lock); + cfs_spin_lock(&lut->lut_obd->obd_recovery_task_lock); cfs_list_splice_init(&client_list, &lut->lut_obd->obd_final_req_queue); - cfs_spin_unlock_bh(&lut->lut_obd->obd_processing_task_lock); + cfs_spin_unlock(&lut->lut_obd->obd_recovery_task_lock); /** update server epoch */ lut_server_data_update(&env, lut, 1); lu_env_fini(&env); -- 1.8.3.1