This patch includes att18982, att18236, att18237,att18983 and att19061 in bz14840.
Slove "quota recovery deadlock during mds failover", it includes:
1. fix osts hang when mds does failover with quotaon
2. prevent watchdog storm when osts threads wait for the
recovery of mds
b=14840
i=johann
i=shadow
i=panda
Details : Apply the MGS_CONNECT_SUPPORTED mask at reconnect time so
the connect flags are properly negotiated.
+Severity : major
+Bugzilla : 14840
+Description: quota recovery deadlock during mds failover
+Details : This patch includes att18982, att18236, att18237 in bz14840.
+ Slove the problems:
+ 1. fix osts hang when mds does failover with quotaon
+ 2. prevent watchdog storm when osts threads wait for the
+ recovery of mds
+
--------------------------------------------------------------------------
2008-08-31 Sun Microsystems, Inc.
__u32 t_flags;
unsigned int t_id; /* service thread index, from ptlrpc_start_threads */
+ struct lc_watchdog *t_watchdog; /* put watchdog in the structure per
+ * thread b=14840 */
cfs_waitq_t t_ctl_waitq;
};
unsigned long lqc_recovery:1, /* Doing recovery */
lqc_switch_qs:1, /* the function of change qunit size
* 0:Off, 1:On */
+ lqc_valid:1, /* this qctxt is valid or not */
lqc_setup:1; /* tell whether of not quota_type has
* been processed, so that the master
* knows when it can start processing
* seconds must be waited between
* enlarging and shinking qunit */
spinlock_t lqc_lock; /* guard lqc_imp_valid now */
+ cfs_waitq_t lqc_wait_for_qmaster; /* when mds isn't connected, threads
+ * on osts who send the quota reqs
+ * with wait==1 will be put here
+ * b=14840 */
struct proc_dir_entry *lqc_proc_dir;
struct lprocfs_stats *lqc_stats; /* lquota statistics */
};
atomic_t *qta_sem; /* obt_quotachecking */
};
-typedef int (*quota_acquire)(struct obd_device *obd,
- unsigned int uid, unsigned int gid);
+struct obd_trans_info;
+typedef int (*quota_acquire)(struct obd_device *obd, unsigned int uid,
+ unsigned int gid, struct obd_trans_info *oti);
typedef struct {
int (*quota_init) (void);
int (*quota_getflag) (struct obd_device *, struct obdo *);
/* For quota slave, acquire/release quota from master if needed */
- int (*quota_acquire) (struct obd_device *, unsigned int, unsigned int);
+ int (*quota_acquire) (struct obd_device *, unsigned int, unsigned int,
+ struct obd_trans_info *);
/* For quota slave, check whether specified uid/gid's remaining quota
* can finish a block_write or inode_create rpc. It updates the pending
* record of block and inode, acquires quota if necessary */
int (*quota_chkquota) (struct obd_device *, unsigned int, unsigned int,
- int, int *, quota_acquire);
+ int, int *, quota_acquire,
+ struct obd_trans_info *);
/* For quota client, poll if the quota check done */
int (*quota_poll_check) (struct obd_export *, struct if_quotacheck *);
static inline int lquota_acquire(quota_interface_t *interface,
struct obd_device *obd,
- unsigned int uid, unsigned int gid)
+ unsigned int uid, unsigned int gid,
+ struct obd_trans_info *oti)
{
int rc;
ENTRY;
QUOTA_CHECK_OP(interface, acquire);
- rc = QUOTA_OP(interface, acquire)(obd, uid, gid);
+ rc = QUOTA_OP(interface, acquire)(obd, uid, gid, oti);
RETURN(rc);
}
static inline int lquota_chkquota(quota_interface_t *interface,
struct obd_device *obd,
- unsigned int uid, unsigned int gid,
- int count, int *flag)
+ unsigned int uid, unsigned int gid, int count,
+ int *flag, struct obd_trans_info *oti)
{
int rc;
ENTRY;
QUOTA_CHECK_OP(interface, chkquota);
QUOTA_CHECK_OP(interface, acquire);
rc = QUOTA_OP(interface, chkquota)(obd, uid, gid, count, flag,
- QUOTA_OP(interface, acquire));
+ QUOTA_OP(interface, acquire), oti);
RETURN(rc);
}
int oti_numcookies;
/* initial thread handling transaction */
- int oti_thread_id;
+ struct ptlrpc_thread * oti_thread;
__u32 oti_conn_cnt;
struct obd_uuid *oti_ost_uuid;
if (req->rq_repmsg != NULL)
oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
- oti->oti_thread_id = req->rq_svc_thread ? req->rq_svc_thread->t_id : -1;
+ oti->oti_thread = req->rq_svc_thread;
if (req->rq_reqmsg != NULL)
oti->oti_conn_cnt = lustre_msg_get_conn_cnt(req->rq_reqmsg);
}
}
/* we use the observer */
- LASSERT(obd->obd_observer && obd->obd_observer->obd_observer);
+ if (!obd->obd_observer || !obd->obd_observer->obd_observer) {
+ CERROR("Can't find the observer, it is recovering\n");
+ req->rq_status = -EIO;
+ GOTO(send_reply, rc = -EIO);
+ }
+
master_obd = obd->obd_observer->obd_observer;
qctxt = &master_obd->u.obt.obt_qctxt;
/* Block the quota req. b=14840 */
OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_BLOCK_QUOTA_REQ, obd_timeout);
-
+ send_reply:
rc = ptlrpc_reply(req);
out:
OBD_FREE(qdata, sizeof(struct qunit_data));
gid = current->fsgid;
/* we try to get enough quota to write here, and let ldiskfs
- * decide if it is out of quota or not b=14783 */
+ * decide if it is out of quota or not b=14783
+ * FIXME: after CMD is used, pointer to obd_trans_info* couldn't
+ * be NULL, b=14840 */
lquota_chkquota(mds_quota_interface_ref, obd,
- current->fsuid, gid, 1, &rec_pending);
+ current->fsuid, gid, 1, &rec_pending, NULL);
intent_set_disposition(rep, DISP_OPEN_CREATE);
handle = fsfilt_start(obd, dparent->d_inode, FSFILT_OP_CREATE,
gid = current->fsgid;
/* we try to get enough quota to write here, and let ldiskfs
- * decide if it is out of quota or not b=14783 */
+ * decide if it is out of quota or not b=14783
+ * FIXME: after CMD is used, pointer to obd_trans_info* couldn't
+ * be NULL, b=14840 */
lquota_chkquota(mds_quota_interface_ref, obd,
- current->fsuid, gid, 1, &rec_pending);
+ current->fsuid, gid, 1, &rec_pending, NULL);
switch (type) {
case S_IFREG:{
{
struct obd_device *obd = class_exp2obd(exp);
struct echo_client_obd *ec = &obd->u.echo_client;
- struct obd_trans_info dummy_oti = { .oti_thread_id = -1 };
+ struct obd_trans_info dummy_oti = { .oti_thread = NULL };
struct ec_object *eco;
int rc;
ENTRY;
* If we haven't allocated a pool entry for this thread before, do so now. */
void *filter_iobuf_get(struct filter_obd *filter, struct obd_trans_info *oti)
{
- int thread_id = oti ? oti->oti_thread_id : -1;
+ int thread_id = (oti && oti->oti_thread) ?
+ oti->oti_thread->t_id : -1;
struct filter_iobuf *pool = NULL;
struct filter_iobuf **pool_place = NULL;
void filter_iobuf_put(struct filter_obd *filter, struct filter_iobuf *iobuf,
struct obd_trans_info *oti)
{
- int thread_id = oti ? oti->oti_thread_id : -1;
+ int thread_id = (oti && oti->oti_thread) ?
+ oti->oti_thread->t_id : -1;
if (unlikely(thread_id < 0)) {
filter_free_iobuf(iobuf);
void filter_iobuf_put(struct filter_obd *filter, struct filter_iobuf *iobuf,
struct obd_trans_info *oti)
{
- int thread_id = oti ? oti->oti_thread_id : -1;
+ int thread_id = (oti && oti->oti_thread) ?
+ oti->oti_thread->t_id : -1;
if (unlikely(thread_id < 0)) {
filter_free_iobuf(iobuf);
/* we try to get enough quota to write here, and let ldiskfs
* decide if it is out of quota or not b=14783 */
lquota_chkquota(filter_quota_interface_ref, obd, oa->o_uid,
- oa->o_gid, niocount, &rec_pending);
+ oa->o_gid, niocount, &rec_pending, oti);
iobuf = filter_iobuf_get(&obd->u.filter, oti);
if (IS_ERR(iobuf))
struct ptlrpc_thread *thread = data->thread;
struct obd_device *dev = data->dev;
struct ptlrpc_reply_state *rs;
- struct lc_watchdog *watchdog;
#ifdef WITH_GROUP_INFO
struct group_info *ginfo = NULL;
#endif
*/
cfs_waitq_signal(&thread->t_ctl_waitq);
- watchdog = lc_watchdog_add(max_t(int, obd_timeout, AT_OFF ? 0 :
- at_get(&svc->srv_at_estimate)) *
- svc->srv_watchdog_factor, NULL, NULL);
+ thread->t_watchdog = lc_watchdog_add(max_t(int, obd_timeout, AT_OFF ? 0 :
+ at_get(&svc->srv_at_estimate))
+ * svc->srv_watchdog_factor,
+ NULL, NULL);
spin_lock(&svc->srv_lock);
svc->srv_threads_running++;
struct l_wait_info lwi = LWI_TIMEOUT(svc->srv_rqbd_timeout,
ptlrpc_retry_rqbds, svc);
- lc_watchdog_disable(watchdog);
+ lc_watchdog_disable(thread->t_watchdog);
cond_resched();
svc->srv_at_check,
&lwi);
- lc_watchdog_touch_ms(watchdog, max_t(int, obd_timeout,
+ lc_watchdog_touch_ms(thread->t_watchdog, max_t(int, obd_timeout,
AT_OFF ? 0 :
at_get(&svc->srv_at_estimate)) *
svc->srv_watchdog_factor);
}
}
- lc_watchdog_delete(watchdog);
+ lc_watchdog_delete(thread->t_watchdog);
+ thread->t_watchdog = NULL;
out_srv_init:
/*
uid = oqaq->qaq_id;
if (rc > 0) {
- rc = qctxt_adjust_qunit(obd, qctxt, uid, gid, 1, 0);
- if (rc == -EDQUOT || rc == -EBUSY) {
+ rc = qctxt_adjust_qunit(obd, qctxt, uid, gid, 1, 0, NULL);
+ if (rc == -EDQUOT || rc == -EBUSY || rc == -EAGAIN) {
CDEBUG(D_QUOTA, "rc: %d.\n", rc);
rc = 0;
}
if (!sb_any_quota_enabled(sb))
RETURN(0);
+ spin_lock(&qctxt->lqc_lock);
+ if (!qctxt->lqc_valid){
+ spin_unlock(&qctxt->lqc_lock);
+ RETURN(0);
+ }
+ spin_unlock(&qctxt->lqc_lock);
+
OBD_ALLOC_PTR(qctl);
if (qctl == NULL)
RETURN(-ENOMEM);
struct list_head *head;
LASSERT(list_empty(&qunit->lq_hash));
+ qunit_get(qunit);
head = qunit_hash + qunit_hashfn(qctxt, &qunit->lq_data);
list_add(&qunit->lq_hash, head);
QUNIT_SET_STATE(qunit, QUNIT_IN_HASH);
list_del_init(&qunit->lq_hash);
QUNIT_SET_STATE(qunit, QUNIT_RM_FROM_HASH);
+ qunit_put(qunit);
}
#define INC_QLIMIT(limit, count) (limit == MIN_QLIMIT) ? \
static int
schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
- struct qunit_data *qdata, int opc, int wait);
+ struct qunit_data *qdata, int opc, int wait,
+ struct obd_trans_info *oti);
static int split_before_schedule_dqacq(struct obd_device *obd,
struct lustre_quota_ctxt *qctxt,
- struct qunit_data *qdata, int opc, int wait)
+ struct qunit_data *qdata, int opc,
+ int wait, struct obd_trans_info *oti)
{
int rc = 0;
unsigned long factor;
tmp_qdata.qd_count = factor;
qdata->qd_count -= tmp_qdata.qd_count;
QDATA_DEBUG((&tmp_qdata), "be split.\n");
- rc = schedule_dqacq(obd, qctxt, &tmp_qdata, opc, wait);
+ rc = schedule_dqacq(obd, qctxt, &tmp_qdata, opc, wait, oti);
} else{
QDATA_DEBUG(qdata, "don't be split.\n");
- rc = schedule_dqacq(obd, qctxt, qdata, opc, wait);
+ rc = schedule_dqacq(obd, qctxt, qdata, opc, wait, oti);
}
do_gettimeofday(&work_end);
timediff = cfs_timeval_sub(&work_end, &work_start, NULL);
if (rc1 > 0) {
int opc;
opc = rc1 == 1 ? QUOTA_DQACQ : QUOTA_DQREL;
- rc1 = split_before_schedule_dqacq(obd, qctxt, qdata, opc, 0);
+ rc1 = split_before_schedule_dqacq(obd, qctxt, qdata, opc,
+ 0, NULL);
QDATA_DEBUG(qdata, "reschedudle opc(%d) rc(%d)\n", opc, rc1);
}
RETURN(err);
struct lustre_qunit *aa_qunit;
};
+#define QUOTA_NOSENT(rc) \
+ (rc == -EIO || rc == -EINTR || rc == -ENOTCONN || \
+ rc == -ETIMEDOUT || rc == -EWOULDBLOCK)
+
static int dqacq_interpret(struct ptlrpc_request *req, void *data, int rc)
{
struct dqacq_async_args *aa = (struct dqacq_async_args *)data;
if (!qdata)
RETURN(-ENOMEM);
- if (rc == -EIO || rc == -EINTR || rc == -ENOTCONN )
- /* if a quota req timeouts or is dropped, we should update quota
- * statistics which will be handled in dqacq_completion. And in
- * this situation we should get qdata from request instead of
- * reply */
- rc1 = quota_get_qdata(req, qdata, QUOTA_REQUEST, QUOTA_IMPORT);
- else
- rc1 = quota_get_qdata(req, qdata, QUOTA_REPLY, QUOTA_IMPORT);
+ /* if a quota req timeouts or is dropped, we should update quota
+ * statistics which will be handled in dqacq_completion. And in
+ * this situation we should get qdata from request instead of
+ * reply */
+ rc1 = quota_get_qdata(req, qdata,
+ QUOTA_NOSENT(rc) ? QUOTA_REQUEST : QUOTA_REPLY,
+ QUOTA_IMPORT);
if (rc1 < 0) {
DEBUG_REQ(D_ERROR, req, "error unpacking qunit_data\n");
GOTO(exit, rc = -EPROTO);
RETURN(rc);
}
+/* check if quota master is online */
+int check_qm(struct lustre_quota_ctxt *qctxt)
+{
+ int rc;
+ ENTRY;
+
+ spin_lock(&qctxt->lqc_lock);
+ /* quit waiting when mds is back or qctxt is cleaned up */
+ rc = qctxt->lqc_import || !qctxt->lqc_valid;
+ spin_unlock(&qctxt->lqc_lock);
+
+ RETURN(rc);
+}
+
static int got_qunit(struct lustre_qunit *qunit)
{
int rc;
static int
schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
- struct qunit_data *qdata, int opc, int wait)
+ struct qunit_data *qdata, int opc, int wait,
+ struct obd_trans_info *oti)
{
struct lustre_qunit *qunit, *empty;
struct l_wait_info lwi = { 0 };
if (wait)
qunit_get(qunit);
spin_unlock(&qunit_hash_lock);
- free_qunit(empty);
+ qunit_put(empty);
goto wait_completion;
}
wake_up(&qunit->lq_waitq);
qunit_put(qunit);
+ spin_lock(&qctxt->lqc_lock);
+ if (wait && !qctxt->lqc_import) {
+ spin_unlock(&qctxt->lqc_lock);
+
+ LASSERT(oti && oti->oti_thread &&
+ oti->oti_thread->t_watchdog);
+
+ lc_watchdog_disable(oti->oti_thread->t_watchdog);
+ CDEBUG(D_QUOTA, "sleep for quota master\n");
+ l_wait_event(qctxt->lqc_wait_for_qmaster,
+ check_qm(qctxt), &lwi);
+ CDEBUG(D_QUOTA, "wake up when quota master is back\n");
+ lc_watchdog_touch(oti->oti_thread->t_watchdog);
+ } else {
+ spin_unlock(&qctxt->lqc_lock);
+ }
+
RETURN(-EAGAIN);
}
imp = class_import_get(qctxt->lqc_import);
rc = quota_copy_qdata(req, qdata, QUOTA_REQUEST, QUOTA_IMPORT);
if (rc < 0) {
CDEBUG(D_ERROR, "Can't pack qunit_data\n");
+ class_import_put(imp);
RETURN(-EPROTO);
}
ptlrpc_req_set_repsize(req, 2, size);
+ req->rq_no_resend = req->rq_no_delay = 1;
class_import_put(imp);
- if (wait && qunit)
+ if (wait && qunit)
qunit_get(qunit);
CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
int
qctxt_adjust_qunit(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
- uid_t uid, gid_t gid, __u32 isblk, int wait)
+ uid_t uid, gid_t gid, __u32 isblk, int wait,
+ struct obd_trans_info *oti)
{
- int ret, rc = 0, i = USRQUOTA;
+ int rc = 0, i = USRQUOTA;
__u32 id[MAXQUOTAS] = { uid, gid };
struct qunit_data qdata[MAXQUOTAS];
ENTRY;
QDATA_SET_BLK(&qdata[i]);
qdata[i].qd_count = 0;
- ret = check_cur_qunit(obd, qctxt, &qdata[i]);
- if (ret > 0) {
+ rc = check_cur_qunit(obd, qctxt, &qdata[i]);
+ if (rc > 0) {
int opc;
/* need acquire or release */
- opc = ret == 1 ? QUOTA_DQACQ : QUOTA_DQREL;
- ret = split_before_schedule_dqacq(obd, qctxt, &qdata[i],
- opc, wait);
- if (!rc)
- rc = ret;
+ opc = rc == 1 ? QUOTA_DQACQ : QUOTA_DQREL;
+ rc = split_before_schedule_dqacq(obd, qctxt, &qdata[i],
+ opc, wait, oti);
+ if (rc < 0)
+ RETURN(rc);
} else if (wait == 1) {
/* when wait equates 1, that means mds_quota_acquire
* or filter_quota_acquire is calling it. */
- qctxt_wait_pending_dqacq(qctxt, id[i], i, isblk);
+ rc = qctxt_wait_pending_dqacq(qctxt, id[i], i, isblk);
+ if (rc < 0)
+ RETURN(rc);
}
}
qctxt->lqc_import = NULL;
qctxt->lqc_recovery = 0;
qctxt->lqc_switch_qs = 1; /* Change qunit size in default setting */
+ qctxt->lqc_valid = 1;
qctxt->lqc_cqs_boundary_factor = 4;
qctxt->lqc_cqs_least_bunit = PTLRPC_MAX_BRW_SIZE;
qctxt->lqc_cqs_least_iunit = 2;
obd->obd_name);
lustre_hash_exit(&LQC_HASH_BODY(qctxt));
}
+ cfs_waitq_init(&qctxt->lqc_wait_for_qmaster);
spin_unlock(&qctxt->lqc_lock);
#ifdef LPROCFS
INIT_LIST_HEAD(&tmp_list);
+ spin_lock(&qctxt->lqc_lock);
+ qctxt->lqc_valid = 0;
+ spin_unlock(&qctxt->lqc_lock);
+
spin_lock(&qunit_hash_lock);
for (i = 0; i < NR_DQHASH; i++) {
list_for_each_entry_safe(qunit, tmp, &qunit_hash[i], lq_hash) {
}
lustre_hash_exit(&LQC_HASH_BODY(qctxt));
+ /* after qctxt_cleanup, qctxt might be freed, then check_qm() is
+ * unpredicted. So we must wait until lqc_wait_for_qmaster is empty */
+ while (cfs_waitq_active(&qctxt->lqc_wait_for_qmaster)) {
+ cfs_waitq_signal(&qctxt->lqc_wait_for_qmaster);
+ cfs_schedule_timeout(CFS_TASK_INTERRUPTIBLE,
+ cfs_time_seconds(1));
+ }
+
ptlrpcd_decref();
#ifdef LPROCFS
opc = ret == 1 ? QUOTA_DQACQ : QUOTA_DQREL;
rc = split_before_schedule_dqacq(obd, qctxt,
&qdata, opc,
- 0);
+ 0, NULL);
if (rc == -EDQUOT)
rc = 0;
} else {
else
gid = oqctl->qc_id;
- rc = qctxt_adjust_qunit(obd, &obd->u.obt.obt_qctxt,
- uid, gid, 1, 0);
+ rc = qctxt_adjust_qunit(obd, &obd->u.obt.obt_qctxt,
+ uid, gid, 1, 0, NULL);
if (rc == -EDQUOT || rc == -EBUSY) {
CDEBUG(D_QUOTA, "rc: %d.\n", rc);
rc = 0;
#include "quota_internal.h"
#ifdef __KERNEL__
+
+static cfs_time_t last_print = 0;
+static spinlock_t last_print_lock = SPIN_LOCK_UNLOCKED;
+
static int filter_quota_setup(struct obd_device *obd)
{
int rc = 0;
static int filter_quota_setinfo(struct obd_export *exp, struct obd_device *obd)
{
struct obd_import *imp;
+ struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
+ ENTRY;
/* setup the quota context import */
spin_lock(&obd->u.obt.obt_qctxt.lqc_lock);
obd->u.obt.obt_qctxt.lqc_import = exp->exp_imp_reverse;
spin_unlock(&obd->u.obt.obt_qctxt.lqc_lock);
+ CDEBUG(D_QUOTA, "%s: lqc_import(%p) of obd(%p) is reactivated now, \n",
+ obd->obd_name,exp->exp_imp_reverse, obd);
/* make imp's connect flags equal relative exp's connect flags
* adding it to avoid the scan export list
(exp->exp_connect_flags &
(OBD_CONNECT_QUOTA64 | OBD_CONNECT_CHANGE_QS));
+ cfs_waitq_signal(&qctxt->lqc_wait_for_qmaster);
/* start quota slave recovery thread. (release high limits) */
qslave_start_recovery(obd, &obd->u.obt.obt_qctxt);
- return 0;
+ RETURN(0);
}
static int filter_quota_clearinfo(struct obd_export *exp, struct obd_device *obd)
{
struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
+ ENTRY;
/* lquota may be not set up before destroying export, b=14896 */
if (!obd->obd_set_up)
if (qctxt->lqc_import == exp->exp_imp_reverse) {
spin_lock(&qctxt->lqc_lock);
qctxt->lqc_import = NULL;
+ CDEBUG(D_QUOTA, "%s: lqc_import of obd(%p) is invalid now.\n",
+ obd->obd_name, obd);
spin_unlock(&qctxt->lqc_lock);
}
- return 0;
+ RETURN(0);
}
static int filter_quota_enforce(struct obd_device *obd, unsigned int ignore)
if (!sb_any_quota_enabled(obd->u.obt.obt_sb))
RETURN(0);
- if (ignore)
+ if (ignore) {
+ CDEBUG(D_QUOTA, "blocks will be written with ignoring quota.\n");
cap_raise(current->cap_effective, CAP_SYS_RESOURCE);
- else
+ } else {
cap_lower(current->cap_effective, CAP_SYS_RESOURCE);
+ }
RETURN(0);
}
}
static int filter_quota_acquire(struct obd_device *obd, unsigned int uid,
- unsigned int gid)
+ unsigned int gid, struct obd_trans_info *oti)
{
struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
int rc;
ENTRY;
- rc = qctxt_adjust_qunit(obd, qctxt, uid, gid, LQUOTA_FLAGS_BLK, 1);
+ rc = qctxt_adjust_qunit(obd, qctxt, uid, gid, LQUOTA_FLAGS_BLK, 1, oti);
RETURN(rc);
}
if (!sb_any_quota_enabled(qctxt->lqc_sb))
RETURN(rc);
+ spin_lock(&qctxt->lqc_lock);
+ if (!qctxt->lqc_valid){
+ spin_unlock(&qctxt->lqc_lock);
+ RETURN(rc);
+ }
+ spin_unlock(&qctxt->lqc_lock);
+
for (i = 0; i < MAXQUOTAS; i++) {
struct lustre_qunit_size *lqs = NULL;
lqs->lqs_iwrite_pending += count;
}
- CDEBUG(D_QUOTA, "write pending: %lu, qd_count: "LPU64".\n",
+ CDEBUG(D_QUOTA, "count: %d, write pending: %lu, qd_count: "LPU64
+ ".\n", count,
isblk ? lqs->lqs_bwrite_pending : lqs->lqs_iwrite_pending,
qdata[i].qd_count);
if (rc2[i] == QUOTA_RET_OK) {
static int quota_chk_acq_common(struct obd_device *obd, unsigned int uid,
unsigned int gid, int count, int *pending,
- int isblk, quota_acquire acquire)
+ int isblk, quota_acquire acquire,
+ struct obd_trans_info *oti)
{
struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
struct timeval work_start;
struct timeval work_end;
long timediff;
- int rc = 0, cycle = 0, count_err = 0;
+ struct l_wait_info lwi = { 0 };
+ int rc = 0, cycle = 0, count_err = 1;
ENTRY;
CDEBUG(D_QUOTA, "check quota for %s\n", obd->obd_name);
while ((rc = quota_check_common(obd, uid, gid, count, cycle, isblk)) &
QUOTA_RET_ACQUOTA) {
+ spin_lock(&qctxt->lqc_lock);
+ if (!qctxt->lqc_import && oti) {
+ spin_unlock(&qctxt->lqc_lock);
+
+ LASSERT(oti && oti->oti_thread &&
+ oti->oti_thread->t_watchdog);
+
+ lc_watchdog_disable(oti->oti_thread->t_watchdog);
+ CDEBUG(D_QUOTA, "sleep for quota master\n");
+ l_wait_event(qctxt->lqc_wait_for_qmaster, check_qm(qctxt),
+ &lwi);
+ CDEBUG(D_QUOTA, "wake up when quota master is back\n");
+ lc_watchdog_touch(oti->oti_thread->t_watchdog);
+ } else {
+ spin_unlock(&qctxt->lqc_lock);
+ }
+
if (rc & QUOTA_RET_INC_PENDING)
*pending = 1;
OBD_FAIL_TIMEOUT(OBD_FAIL_OST_HOLD_WRITE_RPC, 90);
/* after acquire(), we should run quota_check_common again
* so that we confirm there are enough quota to finish write */
- rc = acquire(obd, uid, gid);
+ rc = acquire(obd, uid, gid, oti);
/* please reference to dqacq_completion for the below */
/* a new request is finished, try again */
break;
}
- /* -EBUSY and others, try 10 times */
- if (rc < 0 && count_err < 10) {
- CDEBUG(D_QUOTA, "rc: %d, count_err: %d\n", rc, count_err++);
- cfs_schedule_timeout(CFS_TASK_INTERRUPTIBLE, HZ);
- continue;
+ /* -EBUSY and others, wait a second and try again */
+ if (rc < 0) {
+ cfs_waitq_t waitq;
+ struct l_wait_info lwi;
+
+ if (oti && oti->oti_thread && oti->oti_thread->t_watchdog)
+ lc_watchdog_touch(oti->oti_thread->t_watchdog);
+ CDEBUG(D_QUOTA, "rc: %d, count_err: %d\n", rc,
+ count_err++);
+
+ init_waitqueue_head(&waitq);
+ lwi = LWI_TIMEOUT(cfs_time_seconds(min(cycle, 10)), NULL,
+ NULL);
+ l_wait_event(waitq, 0, &lwi);
}
- if (count_err >= 10 || cycle >= 1000) {
- CDEBUG(D_ERROR, "we meet 10 errors or run too many"
- " cycles when acquiring quota, quit checking with"
- " rc: %d, cycle: %d.\n", rc, cycle);
- break;
+ if (rc < 0 || cycle % 10 == 2) {
+ spin_lock(&last_print_lock);
+ if (last_print == 0 ||
+ cfs_time_before((last_print + cfs_time_seconds(30)),
+ cfs_time_current())) {
+ CWARN("still haven't managed to acquire quota "
+ "space from the quota master after %d "
+ "retries (err=%d, rc=%d)\n",
+ cycle, count_err - 1, rc);
+ last_print = cfs_time_current();
+ }
+ spin_unlock(&last_print_lock);
}
CDEBUG(D_QUOTA, "recheck quota with rc: %d, cycle: %d\n", rc,
static int filter_quota_check(struct obd_device *obd, unsigned int uid,
unsigned int gid, int npage, int *flag,
- quota_acquire acquire)
+ quota_acquire acquire, struct obd_trans_info *oti)
{
return quota_chk_acq_common(obd, uid, gid, npage, flag, LQUOTA_FLAGS_BLK,
- acquire);
+ acquire, oti);
}
/* when a block_write or inode_create rpc is finished, adjust the record for
static int mds_quota_check(struct obd_device *obd, unsigned int uid,
unsigned int gid, int inodes, int *flag,
- quota_acquire acquire)
+ quota_acquire acquire, struct obd_trans_info *oti)
{
- return quota_chk_acq_common(obd, uid, gid, inodes, flag, 0, acquire);
+ return quota_chk_acq_common(obd, uid, gid, inodes, flag, 0,
+ acquire, oti);
}
static int mds_quota_acquire(struct obd_device *obd, unsigned int uid,
- unsigned int gid)
+ unsigned int gid, struct obd_trans_info *oti)
{
struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
int rc;
ENTRY;
- rc = qctxt_adjust_qunit(obd, qctxt, uid, gid, 0, 1);
+ rc = qctxt_adjust_qunit(obd, qctxt, uid, gid, 0, 1, oti);
RETURN(rc);
}
void qunit_cache_cleanup(void);
int qunit_cache_init(void);
int qctxt_adjust_qunit(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
- uid_t uid, gid_t gid, __u32 isblk, int wait);
+ uid_t uid, gid_t gid, __u32 isblk, int wait,
+ struct obd_trans_info *oti);
int qctxt_wait_pending_dqacq(struct lustre_quota_ctxt *qctxt, unsigned int id,
unsigned short type, int isblk);
int qctxt_init(struct obd_device *obd, dqacq_handler_t handler);
int compute_remquota(struct obd_device *obd,
struct lustre_quota_ctxt *qctxt, struct qunit_data *qdata,
int isblk);
+int check_qm(struct lustre_quota_ctxt *qctxt);
/* quota_master.c */
int lustre_dquot_init(void);
void lustre_dquot_exit(void);
up(&dquot->dq_sem);
- rc = qctxt_adjust_qunit(obd, qctxt, uid, gid, is_blk, 0);
+ rc = qctxt_adjust_qunit(obd, qctxt, uid, gid, is_blk, 0, NULL);
if (rc == -EDQUOT || rc == -EBUSY) {
CDEBUG(D_QUOTA, "rc: %d.\n", rc);
rc = 0;
switch (opc) {
case FSFILT_OP_RENAME:
/* acquire/release block quota on owner of original parent */
- rc2 = qctxt_adjust_qunit(obd, qctxt, qpids[2], qpids[3], 1, 0);
+ rc2 = qctxt_adjust_qunit(obd, qctxt, qpids[2], qpids[3], 1, 0,
+ NULL);
/* fall-through */
case FSFILT_OP_SETATTR:
/* acquire/release file quota on original owner */
- rc2 |= qctxt_adjust_qunit(obd, qctxt, qpids[0], qpids[1], 0, 0);
+ rc2 |= qctxt_adjust_qunit(obd, qctxt, qpids[0], qpids[1], 0, 0,
+ NULL);
/* fall-through */
case FSFILT_OP_CREATE:
case FSFILT_OP_UNLINK:
/* acquire/release file/block quota on owner of child
* (or current owner) */
- rc2 |= qctxt_adjust_qunit(obd, qctxt, qcids[0], qcids[1], 0, 0);
- rc2 |= qctxt_adjust_qunit(obd, qctxt, qcids[0], qcids[1], 1, 0);
+ rc2 |= qctxt_adjust_qunit(obd, qctxt, qcids[0], qcids[1], 0, 0,
+ NULL);
+ rc2 |= qctxt_adjust_qunit(obd, qctxt, qcids[0], qcids[1], 1, 0,
+ NULL);
/* acquire/release block quota on owner of parent
* (or original owner) */
- rc2 |= qctxt_adjust_qunit(obd, qctxt, qpids[0], qpids[1], 1, 0);
+ rc2 |= qctxt_adjust_qunit(obd, qctxt, qpids[0], qpids[1], 1, 0,
+ NULL);
break;
default:
LBUG();
switch (opc) {
case FSFILT_OP_SETATTR:
/* acquire/release block quota on original & current owner */
- rc = qctxt_adjust_qunit(obd, qctxt, qcids[0], qcids[1], 1, 0);
- rc2 = qctxt_adjust_qunit(obd, qctxt, qpids[0], qpids[1], 1, 0);
+ rc = qctxt_adjust_qunit(obd, qctxt, qcids[0], qcids[1], 1, 0,
+ NULL);
+ rc2 = qctxt_adjust_qunit(obd, qctxt, qpids[0], qpids[1], 1, 0,
+ NULL);
break;
case FSFILT_OP_UNLINK:
/* release block quota on this owner */
case FSFILT_OP_CREATE: /* XXX for write operation on obdfilter */
/* acquire block quota on this owner */
- rc = qctxt_adjust_qunit(obd, qctxt, qcids[0], qcids[1], 1, 0);
+ rc = qctxt_adjust_qunit(obd, qctxt, qcids[0], qcids[1], 1, 0,
+ NULL);
break;
default:
LBUG();
else
gid = oqctl->qc_id;
- rc = qctxt_adjust_qunit(obd, &obd->u.obt.obt_qctxt, uid, gid, 0, 0);
+ rc = qctxt_adjust_qunit(obd, &obd->u.obt.obt_qctxt, uid, gid, 0, 0,
+ NULL);
if (rc == -EDQUOT || rc == -EBUSY) {
CDEBUG(D_QUOTA, "rc: %d.\n", rc);
rc = 0;
/* initialize all slave's limit */
rc = obd_quotactl(mds->mds_osc_exp, ioqc);
- rc = qctxt_adjust_qunit(obd, &obd->u.obt.obt_qctxt, uid, gid, 1, 0);
+ rc = qctxt_adjust_qunit(obd, &obd->u.obt.obt_qctxt, uid, gid, 1, 0,
+ NULL);
if (rc == -EDQUOT || rc == -EBUSY) {
CDEBUG(D_QUOTA, "rc: %d.\n", rc);
rc = 0;
. ${CONFIG:=$LUSTRE/tests/cfg/$NAME.sh}
DIRECTIO=${DIRECTIO:-$LUSTRE/tests/directio}
-[ "$SLOW" = "no" ] && EXCEPT_SLOW="9 10 11 21"
+[ "$SLOW" = "no" ] && EXCEPT_SLOW="9 10 11 18b 21"
QUOTALOG=${TESTSUITELOG:-$TMP/$(basename $0 .sh).log}
error "(usr) write success, should be EDQUOT"
}
+# test when mds do failover, the ost still could work well without trigger
+# watchdog b=14840
+test_18bc_sub() {
+ type=$1
+
+ LIMIT=$((110 * 1024 )) # 110M
+ TESTFILE="$DIR/$tdir/$tfile"
+ mkdir -p $DIR/$tdir
+
+ wait_delete_completed
+
+ set_blk_tunesz 512
+ set_blk_unitsz 1024
+
+ log " User quota (limit: $LIMIT kbytes)"
+ $LFS setquota -u $TSTUSR -b 0 -B $LIMIT -i 0 -I 0 $MOUNT
+ $SHOW_QUOTA_USER
+
+ $LFS setstripe $TESTFILE -i 0 -c 1
+ chown $TSTUSR.$TSTUSR $TESTFILE
+
+ timeout=$(sysctl -n lustre.timeout)
+
+ if [ $type = "directio" ]; then
+ log " write 100M block(directio) ..."
+ $RUNAS $DIRECTIO write $TESTFILE 0 100 $((BLK_SZ * 1024)) &
+ else
+ log " write 100M block(normal) ..."
+ $RUNAS dd if=/dev/zero of=$TESTFILE bs=$((BLK_SZ * 1024)) count=100 &
+ fi
+
+ DDPID=$!
+ do_facet mds "$LCTL conf_param lustre-MDT0000.mdt.quota_type=ug"
+
+ log "failing mds for $((2 * timeout)) seconds"
+ fail mds $((2 * timeout))
+
+ # check if quotaon successful
+ $LFS quota -u $TSTUSR $MOUNT 2>&1 | grep -q "quotas are not enabled"
+ if [ $? -eq 0 ]; then
+ error "quotaon failed!"
+ rm -rf $TESTFILE
+ return
+ fi
+
+ count=0
+ while [ true ]; do
+ if [ -z `ps -ef | awk '$2 == '${DDPID}' { print $8 }'` ]; then break; fi
+ if [ $((++count % (2 * timeout) )) -eq 0 ]; then
+ log "it took $count second"
+ fi
+ sleep 1
+ done
+ log "(dd_pid=$DDPID, time=$count, timeout=$timeout)"
+ sync; sleep 1; sync
+
+ testfile_size=$(stat -c %s $TESTFILE)
+ [ $testfile_size -ne $((BLK_SZ * 1024 * 100)) ] && \
+ error "verifying file failed!"
+ $SHOW_QUOTA_USER
+ $LFS setquota -u $TSTUSR -b 0 -B 0 -i 0 -I 0 $MOUNT
+ rm -rf $TESTFILE
+ sync; sleep 1; sync
+}
+
+# test when mds does failover, the ost still could work well
+# this test shouldn't trigger watchdog b=14840
+test_18b() {
+ test_18bc_sub normal
+ test_18bc_sub directio
+ # check if watchdog is triggered
+ MSG="test 18b: run for fixing bug14840"
+ do_facet ost1 "dmesg > $TMP/lustre-log-${TESTNAME}.log"
+ do_facet client cat > $TMP/lustre-log-${TESTNAME}.awk <<-EOF
+ /$MSG/ {
+ start = 1;
+ }
+ /Watchdog triggered/ {
+ if (start) {
+ print \$0;
+ }
+ }
+ EOF
+ watchdog=`do_facet ost1 awk -f $TMP/lustre-log-${TESTNAME}.awk $TMP/lustre-log-${TESTNAME}.log`
+ if [ -n "$watchdog" ]; then error "$watchdog"; fi
+}
+run_test_with_stat 18b "run for fixing bug14840(mds failover, no watchdog) ==========="
+
+# test when mds does failover, the ost still could work well
+# this test will prevent OST_DISCONNET from happening b=14840
+test_18c() {
+ # define OBD_FAIL_OST_DISCONNECT_NET 0x202(disable ost_disconnect for osts)
+ lustre_fail ost 0x202
+ test_18bc_sub normal
+ test_18bc_sub directio
+ lustre_fail ost 0
+}
+run_test_with_stat 18c "run for fixing bug14840(mds failover, OST_DISCONNECT is disabled) ==========="
+
test_19() {
# 1 Mb bunit per each MDS/OSS
local TESTFILE="$DIR/$tdir/$tfile"
facet_failover() {
facet=$1
+ sleep_time=$2
echo "Failing $facet on node `facet_active_host $facet`"
shutdown_facet $facet
+ [ -n "$sleep_time" ] && sleep $sleep_time
reboot_facet $facet
client_df &
DFPID=$!