From 2d19fdc6b4e4c7968fe0f89579bacd08ed97299d Mon Sep 17 00:00:00 2001 From: tianzy Date: Thu, 11 Sep 2008 14:51:38 +0000 Subject: [PATCH] Branch b1_8_gate This patch includes att18982, att18236, att18237,att18983 and att19061 in bz14840. Slove "quota recovery deadlock during mds failover", it includes: 1. fix osts hang when mds does failover with quotaon 2. prevent watchdog storm when osts threads wait for the recovery of mds b=14840 i=johann i=shadow i=panda --- lustre/quota/quota_context.c | 110 ++++++++++++++++++++++++++++++++++--------- 1 file changed, 88 insertions(+), 22 deletions(-) diff --git a/lustre/quota/quota_context.c b/lustre/quota/quota_context.c index aefddf1..6d2bbaf 100644 --- a/lustre/quota/quota_context.c +++ b/lustre/quota/quota_context.c @@ -236,6 +236,13 @@ check_cur_qunit(struct obd_device *obd, if (!sb_any_quota_enabled(sb)) RETURN(0); + spin_lock(&qctxt->lqc_lock); + if (!qctxt->lqc_valid){ + spin_unlock(&qctxt->lqc_lock); + RETURN(0); + } + spin_unlock(&qctxt->lqc_lock); + OBD_ALLOC_PTR(qctl); if (qctl == NULL) RETURN(-ENOMEM); @@ -453,6 +460,7 @@ insert_qunit_nolock(struct lustre_quota_ctxt *qctxt, struct lustre_qunit *qunit) struct list_head *head; LASSERT(list_empty(&qunit->lq_hash)); + qunit_get(qunit); head = qunit_hash + qunit_hashfn(qctxt, &qunit->lq_data); list_add(&qunit->lq_hash, head); QUNIT_SET_STATE(qunit, QUNIT_IN_HASH); @@ -485,6 +493,7 @@ static void remove_qunit_nolock(struct lustre_qunit *qunit) list_del_init(&qunit->lq_hash); QUNIT_SET_STATE(qunit, QUNIT_RM_FROM_HASH); + qunit_put(qunit); } #define INC_QLIMIT(limit, count) (limit == MIN_QLIMIT) ? \ @@ -501,7 +510,8 @@ is_master(struct obd_device *obd, struct lustre_quota_ctxt *qctxt, static int schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt, - struct qunit_data *qdata, int opc, int wait); + struct qunit_data *qdata, int opc, int wait, + struct obd_trans_info *oti); static int dqacq_completion(struct obd_device *obd, struct lustre_quota_ctxt *qctxt, @@ -649,7 +659,7 @@ out: if (rc1 > 0) { int opc; opc = rc1 == 1 ? QUOTA_DQACQ : QUOTA_DQREL; - rc1 = schedule_dqacq(obd, qctxt, qdata, opc, 0); + rc1 = schedule_dqacq(obd, qctxt, qdata, opc, 0, NULL); QDATA_DEBUG(qdata, "reschedudle opc(%d) rc(%d)\n", opc, rc1); } RETURN(err); @@ -660,6 +670,10 @@ struct dqacq_async_args { struct lustre_qunit *aa_qunit; }; +#define QUOTA_NOSENT(rc) \ + (rc == -EIO || rc == -EINTR || rc == -ENOTCONN || \ + rc == -ETIMEDOUT || rc == -EWOULDBLOCK) + static int dqacq_interpret(struct ptlrpc_request *req, void *data, int rc) { struct dqacq_async_args *aa = (struct dqacq_async_args *)data; @@ -679,14 +693,13 @@ static int dqacq_interpret(struct ptlrpc_request *req, void *data, int rc) if (!qdata) RETURN(-ENOMEM); - if (rc == -EIO || rc == -EINTR || rc == -ENOTCONN ) - /* if a quota req timeouts or is dropped, we should update quota - * statistics which will be handled in dqacq_completion. And in - * this situation we should get qdata from request instead of - * reply */ - rc1 = quota_get_qdata(req, qdata, QUOTA_REQUEST, QUOTA_IMPORT); - else - rc1 = quota_get_qdata(req, qdata, QUOTA_REPLY, QUOTA_IMPORT); + /* if a quota req timeouts or is dropped, we should update quota + * statistics which will be handled in dqacq_completion. And in + * this situation we should get qdata from request instead of + * reply */ + rc1 = quota_get_qdata(req, qdata, + QUOTA_NOSENT(rc) ? QUOTA_REQUEST : QUOTA_REPLY, + QUOTA_IMPORT); if (rc1 < 0) { DEBUG_REQ(D_ERROR, req, "error unpacking qunit_data\n"); GOTO(exit, rc = -EPROTO); @@ -729,6 +742,20 @@ exit: RETURN(rc); } +/* check if quota master is online */ +int check_qm(struct lustre_quota_ctxt *qctxt) +{ + int rc; + ENTRY; + + spin_lock(&qctxt->lqc_lock); + /* quit waiting when mds is back or qctxt is cleaned up */ + rc = qctxt->lqc_import || !qctxt->lqc_valid; + spin_unlock(&qctxt->lqc_lock); + + RETURN(rc); +} + static int got_qunit(struct lustre_qunit *qunit) { int rc; @@ -753,7 +780,8 @@ static int got_qunit(struct lustre_qunit *qunit) static int schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt, - struct qunit_data *qdata, int opc, int wait) + struct qunit_data *qdata, int opc, int wait, + struct obd_trans_info *oti) { struct lustre_qunit *qunit, *empty; struct l_wait_info lwi = { 0 }; @@ -779,7 +807,7 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt, if (wait) qunit_get(qunit); spin_unlock(&qunit_hash_lock); - free_qunit(empty); + qunit_put(empty); goto wait_completion; } @@ -840,6 +868,23 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt, wake_up(&qunit->lq_waitq); qunit_put(qunit); + spin_lock(&qctxt->lqc_lock); + if (wait && !qctxt->lqc_import) { + spin_unlock(&qctxt->lqc_lock); + + LASSERT(oti && oti->oti_thread && + oti->oti_thread->t_watchdog); + + lc_watchdog_disable(oti->oti_thread->t_watchdog); + CDEBUG(D_QUOTA, "sleep for quota master\n"); + l_wait_event(qctxt->lqc_wait_for_qmaster, + check_qm(qctxt), &lwi); + CDEBUG(D_QUOTA, "wake up when quota master is back\n"); + lc_watchdog_touch(oti->oti_thread->t_watchdog); + } else { + spin_unlock(&qctxt->lqc_lock); + } + RETURN(-EAGAIN); } imp = class_import_get(qctxt->lqc_import); @@ -861,9 +906,11 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt, rc = quota_copy_qdata(req, qdata, QUOTA_REQUEST, QUOTA_IMPORT); if (rc < 0) { CDEBUG(D_ERROR, "Can't pack qunit_data\n"); + class_import_put(imp); RETURN(-EPROTO); } ptlrpc_req_set_repsize(req, 2, size); + req->rq_no_resend = req->rq_no_delay = 1; class_import_put(imp); if (wait && qunit) @@ -917,9 +964,10 @@ wait_completion: int qctxt_adjust_qunit(struct obd_device *obd, struct lustre_quota_ctxt *qctxt, - uid_t uid, gid_t gid, __u32 isblk, int wait) + uid_t uid, gid_t gid, __u32 isblk, int wait, + struct obd_trans_info *oti) { - int ret, rc = 0, i = USRQUOTA; + int rc = 0, i = USRQUOTA; __u32 id[MAXQUOTAS] = { uid, gid }; struct qunit_data qdata[MAXQUOTAS]; ENTRY; @@ -935,18 +983,21 @@ qctxt_adjust_qunit(struct obd_device *obd, struct lustre_quota_ctxt *qctxt, QDATA_SET_BLK(&qdata[i]); qdata[i].qd_count = 0; - ret = check_cur_qunit(obd, qctxt, &qdata[i]); - if (ret > 0) { + rc = check_cur_qunit(obd, qctxt, &qdata[i]); + if (rc > 0) { int opc; /* need acquire or release */ - opc = ret == 1 ? QUOTA_DQACQ : QUOTA_DQREL; - ret = schedule_dqacq(obd, qctxt, &qdata[i], opc, wait); - if (!rc) - rc = ret; + opc = rc == 1 ? QUOTA_DQACQ : QUOTA_DQREL; + rc = schedule_dqacq(obd, qctxt, &qdata[i], opc, + wait,oti); + if (rc < 0) + RETURN(rc); } else if (wait == 1) { /* when wait equates 1, that means mds_quota_acquire * or filter_quota_acquire is calling it. */ - qctxt_wait_pending_dqacq(qctxt, id[i], i, isblk); + rc = qctxt_wait_pending_dqacq(qctxt, id[i], i, isblk); + if (rc < 0) + RETURN(rc); } } @@ -1028,6 +1079,7 @@ qctxt_init(struct obd_device *obd, dqacq_handler_t handler) qctxt->lqc_import = NULL; qctxt->lqc_recovery = 0; qctxt->lqc_switch_qs = 1; /* Change qunit size in default setting */ + qctxt->lqc_valid = 1; qctxt->lqc_cqs_boundary_factor = 4; qctxt->lqc_cqs_least_bunit = PTLRPC_MAX_BRW_SIZE; qctxt->lqc_cqs_least_iunit = 2; @@ -1047,6 +1099,7 @@ qctxt_init(struct obd_device *obd, dqacq_handler_t handler) obd->obd_name); lustre_hash_exit(&LQC_HASH_BODY(qctxt)); } + cfs_waitq_init(&qctxt->lqc_wait_for_qmaster); spin_unlock(&qctxt->lqc_lock); #ifdef LPROCFS @@ -1066,6 +1119,10 @@ void qctxt_cleanup(struct lustre_quota_ctxt *qctxt, int force) INIT_LIST_HEAD(&tmp_list); + spin_lock(&qctxt->lqc_lock); + qctxt->lqc_valid = 0; + spin_unlock(&qctxt->lqc_lock); + spin_lock(&qunit_hash_lock); for (i = 0; i < NR_DQHASH; i++) { list_for_each_entry_safe(qunit, tmp, &qunit_hash[i], lq_hash) { @@ -1088,6 +1145,14 @@ void qctxt_cleanup(struct lustre_quota_ctxt *qctxt, int force) } lustre_hash_exit(&LQC_HASH_BODY(qctxt)); + /* after qctxt_cleanup, qctxt might be freed, then check_qm() is + * unpredicted. So we must wait until lqc_wait_for_qmaster is empty */ + while (cfs_waitq_active(&qctxt->lqc_wait_for_qmaster)) { + cfs_waitq_signal(&qctxt->lqc_wait_for_qmaster); + cfs_schedule_timeout(CFS_TASK_INTERRUPTIBLE, + cfs_time_seconds(1)); + } + ptlrpcd_decref(); #ifdef LPROCFS @@ -1163,7 +1228,8 @@ static int qslave_recovery_main(void *arg) if (ret > 0) { int opc; opc = ret == 1 ? QUOTA_DQACQ : QUOTA_DQREL; - rc = schedule_dqacq(obd, qctxt, &qdata, opc, 0); + rc = schedule_dqacq(obd, qctxt, &qdata, opc, + 0, NULL); if (rc == -EDQUOT) rc = 0; } else { -- 1.8.3.1