spinlock_t lq_lock; /** Protect the whole structure */
enum qunit_state lq_state; /** Present the status of qunit */
int lq_rc; /** The rc of lq_data */
+ pid_t lq_owner;
};
#define QUNIT_SET_STATE(qunit, state) \
do { \
spin_lock(&qunit->lq_lock); \
QDATA_DEBUG((&qunit->lq_data), "qunit(%p) lq_state(%s->%s), " \
- "lq_rc(%d)\n", \
+ "lq_rc(%d), lq_owner(%d)\n", \
qunit, qunit_state_names[qunit->lq_state], \
- qunit_state_names[state], qunit->lq_rc); \
+ qunit_state_names[state], qunit->lq_rc, \
+ qunit->lq_owner); \
qunit->lq_state = state; \
spin_unlock(&qunit->lq_lock); \
} while(0)
spin_lock(&qunit->lq_lock); \
qunit->lq_rc = rc; \
QDATA_DEBUG((&qunit->lq_data), "qunit(%p) lq_state(%s->%s), " \
- "lq_rc(%d)\n", \
+ "lq_rc(%d), lq_owner(%d)\n", \
qunit, qunit_state_names[qunit->lq_state], \
- qunit_state_names[state], qunit->lq_rc); \
+ qunit_state_names[state], qunit->lq_rc, \
+ qunit->lq_owner); \
qunit->lq_state = state; \
spin_unlock(&qunit->lq_lock); \
} while(0)
qunit->lq_opc = opc;
qunit->lq_lock = SPIN_LOCK_UNLOCKED;
QUNIT_SET_STATE_AND_RC(qunit, QUNIT_CREATED, 0);
+ qunit->lq_owner = cfs_curproc_pid();
RETURN(qunit);
}
struct lustre_qunit *qunit = aa->aa_qunit;
struct obd_device *obd = req->rq_import->imp_obd;
struct qunit_data *qdata = NULL;
- int rc1 = 0;
ENTRY;
LASSERT(req);
LASSERT(req->rq_import);
- /* there are several forms of qunit(historic causes), so we need to
- * adjust qunit from slaves to the same form here */
- OBD_ALLOC(qdata, sizeof(struct qunit_data));
- if (!qdata)
- RETURN(-ENOMEM);
-
down_read(&obt->obt_rwsem);
/* if a quota req timeouts or is dropped, we should update quota
* statistics which will be handled in dqacq_completion. And in
* this situation we should get qdata from request instead of
* reply */
- rc1 = quota_get_qdata(req, qdata,
- (rc != 0) ? QUOTA_REQUEST : QUOTA_REPLY,
- QUOTA_IMPORT);
- if (rc1 < 0) {
+ qdata = quota_get_qdata(req, (rc != 0) ? QUOTA_REQUEST : QUOTA_REPLY,
+ QUOTA_IMPORT);
+ if (IS_ERR(qdata)) {
+ rc = PTR_ERR(qdata);
DEBUG_REQ(D_ERROR, req,
- "error unpacking qunit_data(rc: %d)\n", rc1);
- GOTO(exit, rc = rc1);
+ "error unpacking qunit_data(rc: %ld)\n",
+ PTR_ERR(qdata));
+ RETURN(PTR_ERR(qdata));
}
QDATA_DEBUG(qdata, "qdata: interpret rc(%d).\n", rc);
rc = dqacq_completion(obd, qctxt, qdata, rc,
lustre_msg_get_opc(req->rq_reqmsg));
-exit:
up_read(&obt->obt_rwsem);
- OBD_FREE(qdata, sizeof(struct qunit_data));
-
RETURN(rc);
}
EXIT;
}
-static int got_qunit(struct lustre_qunit *qunit)
+static int got_qunit(struct lustre_qunit *qunit, int is_master)
{
struct lustre_quota_ctxt *qctxt = qunit->lq_ctxt;
int rc = 0;
if (!rc) {
spin_lock(&qctxt->lqc_lock);
- rc = !qctxt->lqc_import || !qctxt->lqc_valid;
+ rc = !qctxt->lqc_valid;
+ if (!is_master)
+ rc |= !qctxt->lqc_import;
spin_unlock(&qctxt->lqc_lock);
}
l_wait_event(qctxt->lqc_wait_for_qmaster,
check_qm(qctxt), &lwi);
CDEBUG(D_QUOTA, "wake up when quota master is back\n");
- lc_watchdog_touch(oti->oti_thread->t_watchdog);
+ lc_watchdog_touch(oti->oti_thread->t_watchdog,
+ GET_TIMEOUT(oti->oti_thread->t_svc));
} else {
spin_unlock(&qctxt->lqc_lock);
}
struct qunit_data *p = &qunit->lq_data;
QDATA_DEBUG(p, "qunit(%p) is waiting for dqacq.\n", qunit);
- l_wait_event(qunit->lq_waitq, got_qunit(qunit), &lwi);
+ l_wait_event(qunit->lq_waitq, got_qunit(qunit, is_master(qctxt)),
+ &lwi);
/* rc = -EAGAIN, it means the quota master isn't ready yet
* rc = QUOTA_REQ_RETURNED, it means a quota req is finished;
* rc = -EDQUOT, it means out of quota
spin_lock(&qunit->lq_lock);
rc = qunit->lq_rc;
spin_unlock(&qunit->lq_lock);
- CDEBUG(D_QUOTA, "qunit(%p) finishes waiting. (rc:%d)\n",
- qunit, rc);
+ CDEBUG(D_QUOTA, "qunit(%p) finishes waiting: id(%u) flag(%u) "
+ "rc(%d) owner(%d)\n", qunit, qunit->lq_data.qd_id,
+ qunit->lq_data.qd_flags, rc, qunit->lq_owner);
}
qunit_put(qunit);
int
qctxt_adjust_qunit(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
- uid_t uid, gid_t gid, __u32 isblk, int wait,
+ const unsigned int id[], __u32 isblk, int wait,
struct obd_trans_info *oti)
{
int rc = 0, i = USRQUOTA;
- __u32 id[MAXQUOTAS] = { uid, gid };
struct qunit_data qdata[MAXQUOTAS];
ENTRY;
struct qunit_data *p = &qunit->lq_data;
QDATA_DEBUG(p, "qunit(%p) is waiting for dqacq.\n", qunit);
- l_wait_event(qunit->lq_waitq, got_qunit(qunit), &lwi);
- CDEBUG(D_QUOTA, "qunit(%p) finishes waiting. (rc:%d)\n",
- qunit, qunit->lq_rc);
+ l_wait_event(qunit->lq_waitq, got_qunit(qunit, is_master(qctxt)),
+ &lwi);
+ CDEBUG(D_QUOTA, "qunit(%p) finishes waiting: rc(%d) "
+ "owner(%d)\n", qunit, qunit->lq_rc, qunit->lq_owner);
/* keep same as schedule_dqacq() b=17030 */
spin_lock(&qunit->lq_lock);
rc = qunit->lq_rc;
qctxt->lqc_sync_blk = 0;
spin_unlock(&qctxt->lqc_lock);
- qctxt->lqc_lqs_hash = lustre_hash_init("LQS_HASH", 7, 7,
+ qctxt->lqc_lqs_hash = lustre_hash_init("LQS_HASH",
+ HASH_LQS_CUR_BITS,
+ HASH_LQS_MAX_BITS,
&lqs_hash_ops, 0);
if (!qctxt->lqc_lqs_hash) {
CERROR("initialize hash lqs for %s error!\n", obd->obd_name);
ptlrpc_daemonize("qslave_recovd");
+ /* for obdfilter */
+ class_incref(obd, "qslave_recovd_filter", obd);
+
complete(&data->comp);
- if (qctxt->lqc_recovery)
+ spin_lock(&qctxt->lqc_lock);
+ if (qctxt->lqc_recovery) {
+ spin_unlock(&qctxt->lqc_lock);
+ class_decref(obd, "qslave_recovd_filter", obd);
RETURN(0);
- qctxt->lqc_recovery = 1;
+ } else {
+ qctxt->lqc_recovery = 1;
+ spin_unlock(&qctxt->lqc_lock);
+ }
for (type = USRQUOTA; type < MAXQUOTAS; type++) {
struct qunit_data qdata;
}
}
+ spin_lock(&qctxt->lqc_lock);
qctxt->lqc_recovery = 0;
+ spin_unlock(&qctxt->lqc_lock);
+ class_decref(obd, "qslave_recovd_filter", obd);
RETURN(rc);
}
static void *
lqs_get(struct hlist_node *hnode)
{
- struct lustre_qunit_size *q =
+ struct lustre_qunit_size *q =
hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
ENTRY;
static void *
lqs_put(struct hlist_node *hnode)
{
- struct lustre_qunit_size *q =
+ struct lustre_qunit_size *q =
hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
ENTRY;
ENTRY;
q = hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
- /*
+ /*
* Nothing should be left. User of lqs put it and
* lqs also was deleted from table by this time
* so we should have 0 refs.
*/
- LASSERTF(atomic_read(&q->lqs_refcount) == 0,
+ LASSERTF(atomic_read(&q->lqs_refcount) == 0,
"Busy lqs %p with %d refs\n", q,
atomic_read(&q->lqs_refcount));
OBD_FREE_PTR(q);