* For the time being, we only support fixed-size key & record. */
char lip_entries[0];
};
+extern void lustre_swab_lip_header(struct lu_idxpage *lip);
#define LIP_HDR_SIZE (offsetof(struct lu_idxpage, lip_entries))
/* disconnect the osp-on-ost first to drain off the inflight request */
if (IS_OST(lsi) || IS_MDT(lsi)) {
- if (lustre_disconnect_osp(sb) < 0)
- CERROR("%s: Fail to disconnect osp-on-ost!\n", tmpname);
+ int rc;
+
+ rc = lustre_disconnect_osp(sb);
+ if (rc && rc != ETIMEDOUT)
+ CERROR("%s: failed to disconnect osp-on-ost (rc=%d)!\n",
+ tmpname, rc);
}
/* Stop the target */
(void)ptlrpc_pinger_del_import(imp);
rc = ptlrpc_disconnect_import(imp, 0);
- if (rc)
+ if (rc && rc != -ETIMEDOUT)
CERROR("%s: can't disconnect: rc = %d\n",
d->opd_obd->obd_name, rc);
if (req->rq_status != -ENOSPC && req->rq_status != -EACCES &&
req->rq_status != -EPERM && req->rq_status != -ENOENT &&
- req->rq_status != -EINPROGRESS)
+ req->rq_status != -EINPROGRESS && req->rq_status != -EDQUOT)
req->rq_type = PTL_RPC_MSG_ERR;
rc = ptlrpc_send_reply(req, may_be_difficult);
__swab16s(&ii->ii_recsize);
}
+void lustre_swab_lip_header(struct lu_idxpage *lip)
+{
+ /* swab header */
+ __swab32s(&lip->lip_magic);
+ __swab16s(&lip->lip_flags);
+ __swab16s(&lip->lip_nr);
+}
+EXPORT_SYMBOL(lustre_swab_lip_header);
+
void lustre_swab_mdt_rec_reint (struct mdt_rec_reint *rr)
{
__swab32s (&rr->rr_opcode);
libcfs_debug_vmsg2(msgdata, fmt, args,
"qmt:%s pool:%d-%s id:"LPU64" enforced:%d hard:"LPU64
" soft:"LPU64" granted:"LPU64" time:"LPU64" qunit:"
- LPU64" edquot:%d revoke:"LPU64"\n",
+ LPU64" edquot:%d may_rel:"LPU64" revoke:"LPU64"\n",
pool->qpi_qmt->qmt_svname,
pool->qpi_key & 0x0000ffff,
RES_NAME(pool->qpi_key >> 16),
lqe->lqe_id.qid_uid, lqe->lqe_enforced,
lqe->lqe_hardlimit, lqe->lqe_softlimit,
lqe->lqe_granted, lqe->lqe_gracetime,
- lqe->lqe_qunit, lqe->lqe_edquot,
+ lqe->lqe_qunit, lqe->lqe_edquot, lqe->lqe_may_rel,
lqe->lqe_revoke_time);
}
void qmt_adjust_edquot(struct lquota_entry *lqe, __u64 now)
{
struct qmt_pool_info *pool = lqe2qpi(lqe);
+ ENTRY;
if (!lqe->lqe_enforced)
RETURN_EXIT;
* some quota space */
RETURN_EXIT;
+ if (lqe->lqe_revoke_time == 0)
+ /* least qunit value not sent to all slaves yet */
+ RETURN_EXIT;
+
if (lqe->lqe_may_rel != 0 &&
- cfs_time_beforeq_64(lqe->lqe_revoke_time,
- cfs_time_shift_64(-QMT_REBA_TIMEOUT)))
+ cfs_time_before_64(cfs_time_shift_64(-QMT_REBA_TIMEOUT),
+ lqe->lqe_revoke_time))
/* Let's give more time to slave to release space */
RETURN_EXIT;
/* let's notify slave by issuing glimpse on per-ID lock.
* the rebalance thread will take care of this */
qmt_id_lock_notify(pool->qpi_qmt, lqe);
+ EXIT;
}
/*
glb_rec->qbr_softlimit) ? true : false;
LQUOTA_DEBUG(lqe, "updating global index hardlimit: "LPU64", "
- "softlimit: "LPU64"\n", glb_rec->qbr_hardlimit,
+ "softlimit: "LPU64, glb_rec->qbr_hardlimit,
glb_rec->qbr_softlimit);
} else {
struct lquota_slv_rec *slv_rec = (struct lquota_slv_rec *)rec;
lqe_write_lock(lqe);
- if (ret != 0 && ret != -EDQUOT && ret != -EINPROGRESS) {
- LQUOTA_ERROR(lqe, "DQACQ failed with %d, op:%x", ret,
- reqbody->qb_flags);
- GOTO(out, ret);
- }
+ LQUOTA_DEBUG(lqe, "DQACQ returned %d, flags:%x", ret,
+ reqbody->qb_flags);
/* despite -EDQUOT & -EINPROGRESS errors, the master might still
* grant us back quota space to adjust quota overrun */
-
- LQUOTA_DEBUG(lqe, "DQACQ returned %d", ret);
+ if (ret != 0 && ret != -EDQUOT && ret != -EINPROGRESS) {
+ if (ret != -ETIMEDOUT && ret != -ENOTCONN &&
+ ret != -ESHUTDOWN && ret != -EAGAIN)
+ /* print errors only if return code is unexpected */
+ LQUOTA_ERROR(lqe, "DQACQ failed with %d, flags:%x", ret,
+ reqbody->qb_flags);
+ GOTO(out, ret);
+ }
/* Set the lqe_lockh */
if (lustre_handle_is_used(lockh) &&
struct lquota_trans *trans, struct lquota_id_info *qi,
int *flags)
{
- struct qsd_qtype_info *qqi;
- int i, rc;
- bool found = false;
+ int i, rc;
+ bool found = false;
ENTRY;
if (unlikely(qsd == NULL))
(qsd->qsd_is_md && qi->lqi_is_blk))
RETURN(0);
- qqi = qsd->qsd_type_array[qi->lqi_type];
-
/* ignore quota enforcement request when:
* - quota isn't enforced for this quota type
- * or - we failed to access the accounting object for this quota type
- * or - the space to acquire is null
* or - the user/group is root */
- if (!qsd_type_enabled(qsd, qi->lqi_type) || qqi->qqi_acct_obj == NULL ||
- qi->lqi_id.qid_uid == 0)
+ if (!qsd_type_enabled(qsd, qi->lqi_type) || qi->lqi_id.qid_uid == 0)
RETURN(0);
LASSERTF(trans->lqt_id_cnt <= QUOTA_MAX_TRANSIDS, "id_cnt=%d",
}
/* manage quota enforcement for this ID */
- rc = qsd_op_begin0(env, qqi, &trans->lqt_ids[i], qi->lqi_space, flags);
-
+ rc = qsd_op_begin0(env, qsd->qsd_type_array[qi->lqi_type],
+ &trans->lqt_ids[i], qi->lqi_space, flags);
RETURN(rc);
}
EXPORT_SYMBOL(qsd_op_begin);
lqe->lqe_nopreacq = false;
}
-#define QSD_WB_INTERVAL 15 /* 15 seconds */
+#define QSD_WB_INTERVAL 60 /* 60 seconds */
/* qsd_entry.c */
extern struct lquota_entry_operations qsd_lqe_ops;
{
struct qsd_instance *qsd = (struct qsd_instance *)data;
char enabled[5];
+
LASSERT(qsd != NULL);
memset(enabled, 0, sizeof(enabled));
return snprintf(page, count, "%s\n", enabled);
}
+/* force reintegration procedure to be executed.
+ * Used for test/debugging purpose */
+static int lprocfs_qsd_wr_force_reint(struct file *file, const char *buffer,
+ unsigned long count, void *data)
+{
+ struct qsd_instance *qsd = (struct qsd_instance *)data;
+ int rc = 0, qtype;
+
+ LASSERT(qsd != NULL);
+
+ cfs_write_lock(&qsd->qsd_lock);
+ if (qsd->qsd_stopping) {
+ /* don't mess up with shutdown procedure, it is already
+ * complicated enough */
+ rc = -ESHUTDOWN;
+ } else if (!qsd->qsd_prepared) {
+ rc = -EAGAIN;
+ } else {
+ /* mark all indexes as stale */
+ for (qtype = USRQUOTA; qtype < MAXQUOTAS; qtype++) {
+ qsd->qsd_type_array[qtype]->qqi_glb_uptodate = false;
+ qsd->qsd_type_array[qtype]->qqi_slv_uptodate = false;
+ }
+ }
+ cfs_write_unlock(&qsd->qsd_lock);
+
+ if (rc)
+ return rc;
+
+ /* kick off reintegration */
+ for (qtype = USRQUOTA; qtype < MAXQUOTAS; qtype++) {
+ rc = qsd_start_reint_thread(qsd->qsd_type_array[qtype]);
+ if (rc)
+ break;
+ }
+ return rc == 0 ? count : rc;
+}
+
static struct lprocfs_vars lprocfs_quota_qsd_vars[] = {
{ "info", lprocfs_qsd_rd_state, 0, 0},
{ "enabled", lprocfs_qsd_rd_enabled, 0, 0},
+ { "force_reint", 0, lprocfs_qsd_wr_force_reint, 0},
{ NULL }
};
int qtype;
ENTRY;
+ if (unlikely(qsd == NULL))
+ RETURN_EXIT;
+
CDEBUG(D_QUOTA, "%s: initiating QSD shutdown\n", qsd->qsd_svname);
cfs_write_lock(&qsd->qsd_lock);
qsd->qsd_stopping = true;
int qtype, rc = 0;
ENTRY;
- LASSERT(qsd != NULL);
+ if (unlikely(qsd == NULL))
+ RETURN(0);
cfs_read_lock(&qsd->qsd_lock);
if (qsd->qsd_prepared) {
int type, rc = 0;
ENTRY;
+ if (unlikely(qsd == NULL))
+ RETURN(0);
+
cfs_write_lock(&qsd->qsd_lock);
if (!qsd->qsd_prepared) {
CERROR("%s: can't start qsd instance since it was properly "
ENTRY;
if (rc) {
- CERROR("%s: failed to enqueue global quota lock, glb "
- "fid:"DFID", rc:%d\n", qsd->qsd_svname,
- PFID(&req_qbody->qb_fid), rc);
+ CDEBUG_LIMIT(rc != -EAGAIN ? D_ERROR : D_QUOTA,
+ "%s: failed to enqueue global quota lock, glb fid:"
+ DFID", rc:%d\n", qsd->qsd_svname,
+ PFID(&req_qbody->qb_fid), rc);
RETURN_EXIT;
}
if (IS_ERR(lqe))
RETURN(PTR_ERR(lqe));
+ LQUOTA_DEBUG(lqe, "reintegrating entry");
+
rc = qsd_update_lqe(env, lqe, global, rec);
if (rc)
GOTO(out, rc);
unsigned int npages, bool need_swab)
{
struct qsd_thread_info *qti = qsd_info(env);
+ struct qsd_instance *qsd = qqi->qqi_qsd;
union lquota_id *qid = &qti->qti_id;
int i, j, k, size;
int rc = 0;
ENTRY;
+ CDEBUG(D_QUOTA, "%s: processing %d pages for %s index\n",
+ qsd->qsd_svname, npages, global ? "global" : "slave");
+
/* sanity check on the record size */
if ((global && ii->ii_recsize != sizeof(struct lquota_glb_rec)) ||
(!global && ii->ii_recsize != sizeof(struct lquota_slv_rec))) {
- CERROR("Invalid record size:%d, global:%s\n",
- ii->ii_recsize, global ? "true" : "false");
+ CERROR("%s: invalid record size (%d) for %s index\n",
+ qsd->qsd_svname, ii->ii_recsize,
+ global ? "global" : "slave");
RETURN(-EINVAL);
}
- size = ii->ii_recsize + ii->ii_keysize + sizeof(__u64);
+ size = ii->ii_recsize + ii->ii_keysize;
for (i = 0; i < npages; i++) {
union lu_page *lip = cfs_kmap(pages[i]);
for (j = 0; j < LU_PAGE_COUNT; j++) {
+ if (need_swab)
+ /* swab header */
+ lustre_swab_lip_header(&lip->lp_idx);
+
+ if (lip->lp_idx.lip_magic != LIP_MAGIC) {
+ CERROR("%s: invalid magic (%x != %x) for page "
+ "%d/%d while transferring %s index\n",
+ qsd->qsd_svname, lip->lp_idx.lip_magic,
+ LIP_MAGIC, i + 1, npages,
+ global ? "global" : "slave");
+ GOTO(out, rc = -EINVAL);
+ }
+
+ CDEBUG(D_QUOTA, "%s: processing page %d/%d with %d "
+ "entries for %s index\n", qsd->qsd_svname, i + 1,
+ npages, lip->lp_idx.lip_nr,
+ global ? "global" : "slave");
+
for (k = 0; k < lip->lp_idx.lip_nr; k++) {
char *entry;
if (req == NULL)
GOTO(out, rc = -ENOMEM);
- req->rq_no_resend = req->rq_no_delay = 1;
rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
if (rc) {
ptlrpc_request_free(req);
case IT_QUOTA_DQACQ:
/* grab reference on lqe for new lock */
lqe_getref((struct lquota_entry *)arg);
+ /* all acquire/release request are sent with no_resend and
+ * no_delay flag */
+ req->rq_no_resend = req->rq_no_delay = 1;
break;
default:
break;
if (!qsd_type_enabled(qsd, qtype))
continue;
- if (!qqi->qqi_glb_uptodate || !qqi->qqi_slv_uptodate)
+ if ((!qqi->qqi_glb_uptodate || !qqi->qqi_slv_uptodate) &&
+ !qqi->qqi_reint)
+ /* global or slave index not up to date and reint
+ * thread not running */
*uptodate = false;
}
run_test 22 "Simultaneous manipulation of a pool"
test_23a() {
- # XXX remove this once all quota code landed
- skip_env "quota isn't functional" && return
-
set_cleanup_trap
local POOL_ROOT=${POOL_ROOT:-$DIR/$tdir}
[[ $OSTCOUNT -le 1 ]] && skip_env "Need at least 2 OSTs" && return
run_test 23a "OST pools and quota"
test_23b() {
- # XXX remove this once all quota code landed
- skip_env "quota isn't functional" && return
-
set_cleanup_trap
local POOL_ROOT=${POOL_ROOT:-$DIR/$tdir}
[[ $OSTCOUNT -le 1 ]] && skip_env "Need at least 2 OSTs" && return 0
# test dropping acquire request on master
test_6() {
+ local LIMIT=3 # 3M
+
setup_quota_test
trap cleanup_quota_test EXIT
chown $TSTUSR2.$TSTUSR2 $TESTFILE2
# cache per-ID lock for $TSTUSR on slave
- $LFS setquota -u $TSTUSR -b 0 -B 2M -i 0 -I 0 $DIR
+ $LFS setquota -u $TSTUSR -b 0 -B ${LIMIT}M -i 0 -I 0 $DIR
$RUNAS $DD of=$TESTFILE count=1 ||
error "write $TESTFILE failure, expect success"
$RUNAS2 $DD of=$TESTFILE2 count=1 ||
lustre_fail mds 0x513 601
# write to un-enforced ID ($TSTUSR2) should succeed
- $RUNAS2 $DD of=$TESTFILE2 count=1 seek=1 oflag=sync conv=notrunc ||
+ $RUNAS2 $DD of=$TESTFILE2 count=$LIMIT seek=1 oflag=sync conv=notrunc ||
error "write failure, expect success"
# write to enforced ID ($TSTUSR) in background, exceeding limit
# to make sure DQACQ is sent
- $RUNAS $DD of=$TESTFILE count=2 seek=1 oflag=sync conv=notrunc &
+ $RUNAS $DD of=$TESTFILE count=$LIMIT seek=1 oflag=sync conv=notrunc &
DDPID=$!
echo "Sleep for $TIMEOUT"
return
fi
- # XXX remove it once all quota code landed
- echo "skip quota setup"
- return
-
local mntpt=$1
# save old quota type & set new quota type