#define QDATA_CLR_CHANGE_QS(qdata) ((qdata)->qd_flags &= ~LQUOTA_FLAGS_CHG_QS)
extern void lustre_swab_qdata(struct qunit_data *d);
-extern int quota_get_qdata(void*req, struct qunit_data *qdata,
- int is_req, int is_exp);
+extern struct qunit_data *quota_get_qdata(void*req, int is_req, int is_exp);
extern int quota_copy_qdata(void *request, struct qunit_data *qdata,
int is_req, int is_exp);
revimp->imp_remote_handle = conn;
revimp->imp_dlm_fake = 1;
revimp->imp_state = LUSTRE_IMP_FULL;
- /* this is a bit of a layering violation, but much less risk than
- * changing this very complex and race-prone code. bug=16839 */
- if (data->ocd_connect_flags & OBD_CONNECT_MDS)
- obd_set_info_async(export, sizeof(KEY_MDS_CONN), KEY_MDS_CONN,
- 0, NULL, NULL);
/* unknown versions will be caught in
* ptlrpc_handle_server_req_in->lustre_unpack_msg() */
LASSERT(req->rq_export);
- OBD_ALLOC(qdata, sizeof(struct qunit_data));
- if (!qdata)
- RETURN(-ENOMEM);
- rc = quota_get_qdata(req, qdata, QUOTA_REQUEST, QUOTA_EXPORT);
- if (rc < 0) {
+ qdata = quota_get_qdata(req, QUOTA_REQUEST, QUOTA_EXPORT);
+ if (IS_ERR(qdata)) {
+ rc = PTR_ERR(qdata);
CDEBUG(D_ERROR, "Can't unpack qunit_data(rc: %d)\n", rc);
+ req->rq_status = rc;
GOTO(out, rc);
}
if (!obd->obd_observer || !obd->obd_observer->obd_observer) {
CERROR("Can't find the observer, it is recovering\n");
req->rq_status = -EAGAIN;
- GOTO(send_reply, rc = -EAGAIN);
+ GOTO(out, rc);
}
master_obd = obd->obd_observer->obd_observer;
CDEBUG(D_QUOTA, "quota_type not processed yet, return "
"-EAGAIN\n");
req->rq_status = -EAGAIN;
- rc = ptlrpc_reply(req);
GOTO(out, rc);
}
CDEBUG(D_QUOTA, "quota_ctxt is not ready yet, return "
"-EAGAIN\n");
req->rq_status = -EAGAIN;
- rc = ptlrpc_reply(req);
GOTO(out, rc);
}
up_read(&obt->obt_rwsem);
if (rc && rc != -EDQUOT)
CDEBUG(rc == -EBUSY ? D_QUOTA : D_ERROR,
- "dqacq failed! (rc:%d)\n", rc);
+ "dqacq/dqrel failed! (rc:%d)\n", rc);
req->rq_status = rc;
- /* there are three forms of qunit(historic causes), so we need to
- * adjust the same form to different forms slaves needed */
rc = quota_copy_qdata(req, qdata, QUOTA_REPLY, QUOTA_EXPORT);
if (rc < 0) {
- CDEBUG(D_ERROR, "Can't pack qunit_data(rc: %d)\n", rc);
+ CERROR("Can't pack qunit_data(rc: %d)\n", rc);
GOTO(out, rc);
}
/* Block the quota req. b=14840 */
OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_BLOCK_QUOTA_REQ, obd_timeout);
-send_reply:
- rc = ptlrpc_reply(req);
+ EXIT;
+
out:
- OBD_FREE(qdata, sizeof(struct qunit_data));
- RETURN(rc);
+ rc = ptlrpc_reply(req);
+ return rc;
#else
return 0;
#endif /* !__KERNEL__ */
if (rc)
goto out;
- lquota_setinfo(filter_quota_interface_ref, obd, exp);
-
if (group == FILTER_GROUP_MDS0) {
/* setup llog group 1 for interop */
filter_setup_llog_group(exp, obd, FILTER_GROUP_LLOG);
}
+
+ lquota_setinfo(filter_quota_interface_ref, obd, exp);
out:
RETURN(rc);
}
if (!set && !KEY_IS(KEY_GRANT_SHRINK))
RETURN(-EINVAL);
- /* If OST understood OBD_CONNECT_MDS we don't need to tell it we
- * are the MDS again. Just do the local setup. b=16839 */
- if (KEY_IS(KEY_MDS_CONN) &&
- (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MDS))
- RETURN(osc_setinfo_mds_connect_import(imp));
-
/* We pass all other commands directly to OST. Since nobody calls osc
methods directly and everybody is supposed to go through LOV, we
assume lov checked invalid values for us.
/**
* got qdata from request(req/rep)
*/
-int quota_get_qdata(void *request, struct qunit_data *qdata,
- int is_req, int is_exp)
+struct qunit_data *quota_get_qdata(void *request, int is_req, int is_exp)
{
struct ptlrpc_request *req = (struct ptlrpc_request *)request;
- struct qunit_data *new;
+ struct qunit_data *qdata;
__u64 flags = is_exp ? req->rq_export->exp_connect_flags :
req->rq_import->imp_connect_data.ocd_connect_flags;
- int rc = 0;
LASSERT(req);
- LASSERT(qdata);
-
- /* support for quota64 and change_qs */
- if (flags & OBD_CONNECT_CHANGE_QS) {
- if (!(flags & OBD_CONNECT_QUOTA64)) {
- CDEBUG(D_ERROR, "Wire protocol for qunit is broken!\n");
- return -EINVAL;
- }
- if (is_req == QUOTA_REQUEST)
- new = lustre_swab_reqbuf(req, REQ_REC_OFF,
- sizeof(struct qunit_data),
- lustre_swab_qdata);
- else
- new = lustre_swab_repbuf(req, REPLY_REC_OFF,
- sizeof(struct qunit_data),
- lustre_swab_qdata);
- if (new == NULL)
- GOTO(out, rc = -EPROTO);
- *qdata = *new;
- QDATA_SET_CHANGE_QS(qdata);
- return 0;
- } else {
- QDATA_CLR_CHANGE_QS(qdata);
- }
-
-out:
- return rc;
+ /* support for quota64 */
+ LASSERT(flags & OBD_CONNECT_QUOTA64);
+ /* support for change_qs */
+ LASSERT(flags & OBD_CONNECT_CHANGE_QS);
+
+ if (is_req == QUOTA_REQUEST)
+ qdata = lustre_swab_reqbuf(req, REQ_REC_OFF,
+ sizeof(struct qunit_data),
+ lustre_swab_qdata);
+ else
+ qdata = lustre_swab_repbuf(req, REPLY_REC_OFF,
+ sizeof(struct qunit_data),
+ lustre_swab_qdata);
+ if (qdata == NULL)
+ return ERR_PTR(-EPROTO);
+
+ QDATA_SET_CHANGE_QS(qdata);
+ return qdata;
}
EXPORT_SYMBOL(quota_get_qdata);
void *target;
__u64 flags = is_exp ? req->rq_export->exp_connect_flags :
req->rq_import->imp_connect_data.ocd_connect_flags;
- int rc = 0;
LASSERT(req);
LASSERT(qdata);
-
- /* support for quota64 and change_qs */
- if (flags & OBD_CONNECT_CHANGE_QS) {
- if (!(flags & OBD_CONNECT_QUOTA64)) {
- CERROR("Wire protocol for qunit is broken!\n");
- return -EINVAL;
- }
- if (is_req == QUOTA_REQUEST)
- target = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF,
- sizeof(struct qunit_data));
- else
- target = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
- sizeof(struct qunit_data));
- if (!target)
- GOTO(out, rc = -EPROTO);
- memcpy(target, qdata, sizeof(*qdata));
- return 0;
- }
-
-out:
- return rc;
+ /* support for quota64 */
+ LASSERT(flags & OBD_CONNECT_QUOTA64);
+ /* support for change_qs */
+ LASSERT(flags & OBD_CONNECT_CHANGE_QS);
+
+ if (is_req == QUOTA_REQUEST)
+ target = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF,
+ sizeof(struct qunit_data));
+ else
+ target = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
+ sizeof(struct qunit_data));
+ if (target == NULL)
+ return -EPROTO;
+
+ memcpy(target, qdata, sizeof(*qdata));
+ return 0;
}
EXPORT_SYMBOL(quota_copy_qdata);
#endif /* __KERNEL__ */
spinlock_t lq_lock; /** Protect the whole structure */
enum qunit_state lq_state; /** Present the status of qunit */
int lq_rc; /** The rc of lq_data */
+ pid_t lq_owner;
};
#define QUNIT_SET_STATE(qunit, state) \
do { \
spin_lock(&qunit->lq_lock); \
QDATA_DEBUG((&qunit->lq_data), "qunit(%p) lq_state(%s->%s), " \
- "lq_rc(%d)\n", \
+ "lq_rc(%d), lq_owner(%d)\n", \
qunit, qunit_state_names[qunit->lq_state], \
- qunit_state_names[state], qunit->lq_rc); \
+ qunit_state_names[state], qunit->lq_rc, \
+ qunit->lq_owner); \
qunit->lq_state = state; \
spin_unlock(&qunit->lq_lock); \
} while(0)
spin_lock(&qunit->lq_lock); \
qunit->lq_rc = rc; \
QDATA_DEBUG((&qunit->lq_data), "qunit(%p) lq_state(%s->%s), " \
- "lq_rc(%d)\n", \
+ "lq_rc(%d), lq_owner(%d)\n", \
qunit, qunit_state_names[qunit->lq_state], \
- qunit_state_names[state], qunit->lq_rc); \
+ qunit_state_names[state], qunit->lq_rc, \
+ qunit->lq_owner); \
qunit->lq_state = state; \
spin_unlock(&qunit->lq_lock); \
} while(0)
qunit->lq_opc = opc;
qunit->lq_lock = SPIN_LOCK_UNLOCKED;
QUNIT_SET_STATE_AND_RC(qunit, QUNIT_CREATED, 0);
+ qunit->lq_owner = cfs_curproc_pid();
RETURN(qunit);
}
struct lustre_qunit *qunit = aa->aa_qunit;
struct obd_device *obd = req->rq_import->imp_obd;
struct qunit_data *qdata = NULL;
- int rc1 = 0;
ENTRY;
LASSERT(req);
LASSERT(req->rq_import);
- /* there are several forms of qunit(historic causes), so we need to
- * adjust qunit from slaves to the same form here */
- OBD_ALLOC(qdata, sizeof(struct qunit_data));
- if (!qdata)
- RETURN(-ENOMEM);
-
down_read(&obt->obt_rwsem);
/* if a quota req timeouts or is dropped, we should update quota
* statistics which will be handled in dqacq_completion. And in
* this situation we should get qdata from request instead of
* reply */
- rc1 = quota_get_qdata(req, qdata,
- (rc != 0) ? QUOTA_REQUEST : QUOTA_REPLY,
- QUOTA_IMPORT);
- if (rc1 < 0) {
+ qdata = quota_get_qdata(req, (rc != 0) ? QUOTA_REQUEST : QUOTA_REPLY,
+ QUOTA_IMPORT);
+ if (IS_ERR(qdata)) {
+ rc = PTR_ERR(qdata);
DEBUG_REQ(D_ERROR, req,
- "error unpacking qunit_data(rc: %d)\n", rc1);
- GOTO(exit, rc = rc1);
+ "error unpacking qunit_data(rc: %ld)\n",
+ PTR_ERR(qdata));
+ RETURN(PTR_ERR(qdata));
}
QDATA_DEBUG(qdata, "qdata: interpret rc(%d).\n", rc);
rc = dqacq_completion(obd, qctxt, qdata, rc,
lustre_msg_get_opc(req->rq_reqmsg));
-exit:
up_read(&obt->obt_rwsem);
- OBD_FREE(qdata, sizeof(struct qunit_data));
-
RETURN(rc);
}
spin_lock(&qunit->lq_lock);
rc = qunit->lq_rc;
spin_unlock(&qunit->lq_lock);
- CDEBUG(D_QUOTA, "qunit(%p) finishes waiting. (rc:%d)\n",
- qunit, rc);
+ CDEBUG(D_QUOTA, "qunit(%p) finishes waiting: id(%u) flag(%u) "
+ "rc(%d) owner(%d)\n", qunit, qunit->lq_data.qd_id,
+ qunit->lq_data.qd_flags, rc, qunit->lq_owner);
}
qunit_put(qunit);
QDATA_DEBUG(p, "qunit(%p) is waiting for dqacq.\n", qunit);
l_wait_event(qunit->lq_waitq, got_qunit(qunit), &lwi);
- CDEBUG(D_QUOTA, "qunit(%p) finishes waiting. (rc:%d)\n",
- qunit, qunit->lq_rc);
+ CDEBUG(D_QUOTA, "qunit(%p) finishes waiting: rc(%d) "
+ "owner(%d)\n", qunit, qunit->lq_rc, qunit->lq_owner);
/* keep same as schedule_dqacq() b=17030 */
spin_lock(&qunit->lq_lock);
rc = qunit->lq_rc;
complete(&data->comp);
+ spin_lock(&qctxt->lqc_lock);
if (qctxt->lqc_recovery) {
+ spin_unlock(&qctxt->lqc_lock);
class_decref(obd, "qslave_recovd_filter", obd);
RETURN(0);
+ } else {
+ qctxt->lqc_recovery = 1;
+ spin_unlock(&qctxt->lqc_lock);
}
- qctxt->lqc_recovery = 1;
for (type = USRQUOTA; type < MAXQUOTAS; type++) {
struct qunit_data qdata;
}
}
- class_decref(obd, "qslave_recovd_filter", obd);
+ spin_lock(&qctxt->lqc_lock);
qctxt->lqc_recovery = 0;
+ spin_unlock(&qctxt->lqc_lock);
+ class_decref(obd, "qslave_recovd_filter", obd);
RETURN(rc);
}
* dqacq/dqrel done then return the correct limits to master */
if (oqctl->qc_stat == QUOTA_RECOVERING)
qctxt_wait_pending_dqacq(&obd->u.obt.obt_qctxt,
- oqctl->qc_id, oqctl->qc_type,
+ oqctl->qc_id, oqctl->qc_type,
1);
push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
{
struct obd_export *exp = data;
struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
- struct obd_import *imp;
+ struct obd_import *imp = exp->exp_imp_reverse;
ENTRY;
+ LASSERT(imp != NULL);
+
/* setup the quota context import */
spin_lock(&qctxt->lqc_lock);
- qctxt->lqc_import = exp->exp_imp_reverse;
- spin_unlock(&qctxt->lqc_lock);
- CDEBUG(D_QUOTA, "%s: lqc_import(%p) of obd(%p) is reactivated now, \n",
- obd->obd_name,exp->exp_imp_reverse, obd);
-
- /* make imp's connect flags equal relative exp's connect flags
- * adding it to avoid the scan export list
- */
- imp = qctxt->lqc_import;
- if (likely(imp))
+ if (qctxt->lqc_import != NULL) {
+ spin_unlock(&qctxt->lqc_lock);
+ if (qctxt->lqc_import == imp)
+ CDEBUG(D_WARNING, "%s: lqc_import(%p) of obd(%p) was "
+ "activated already.\n", obd->obd_name, imp, obd);
+ else
+ CDEBUG(D_ERROR, "%s: lqc_import(%p:%p) of obd(%p) was "
+ "activated by others.\n", obd->obd_name,
+ qctxt->lqc_import, imp, obd);
+ } else {
+ qctxt->lqc_import = imp;
+ /* make imp's connect flags equal relative exp's connect flags
+ * adding it to avoid the scan export list */
imp->imp_connect_data.ocd_connect_flags |=
- (exp->exp_connect_flags &
- (OBD_CONNECT_QUOTA64 | OBD_CONNECT_CHANGE_QS));
+ (exp->exp_connect_flags &
+ (OBD_CONNECT_QUOTA64 | OBD_CONNECT_CHANGE_QS));
+ spin_unlock(&qctxt->lqc_lock);
+ CDEBUG(D_QUOTA, "%s: lqc_import(%p) of obd(%p) is reactivated "
+ "now.\n", obd->obd_name, imp, obd);
- cfs_waitq_signal(&qctxt->lqc_wait_for_qmaster);
- /* start quota slave recovery thread. (release high limits) */
- qslave_start_recovery(obd, qctxt);
+ cfs_waitq_signal(&qctxt->lqc_wait_for_qmaster);
+ /* start quota slave recovery thread. (release high limits) */
+ qslave_start_recovery(obd, qctxt);
+ }
RETURN(0);
}
static int filter_quota_clearinfo(struct obd_export *exp, struct obd_device *obd)
{
struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
+ struct obd_import *imp = exp->exp_imp_reverse;
ENTRY;
/* lquota may be not set up before destroying export, b=14896 */
if (!obd->obd_set_up)
RETURN(0);
+ if (unlikely(imp == NULL))
+ RETURN(0);
+
/* when exp->exp_imp_reverse is destroyed, the corresponding lqc_import
* should be invalid b=12374 */
- if (qctxt->lqc_import && qctxt->lqc_import == exp->exp_imp_reverse) {
- spin_lock(&qctxt->lqc_lock);
+ spin_lock(&qctxt->lqc_lock);
+ if (qctxt->lqc_import == imp) {
qctxt->lqc_import = NULL;
spin_unlock(&qctxt->lqc_lock);
- ptlrpc_cleanup_imp(exp->exp_imp_reverse);
+ CDEBUG(D_QUOTA, "%s: lqc_import(%p) of obd(%p) is invalid now.\n",
+ obd->obd_name, imp, obd);
+ ptlrpc_cleanup_imp(imp);
dqacq_interrupt(qctxt);
- CDEBUG(D_QUOTA, "%s: lqc_import of obd(%p) is invalid now.\n",
- obd->obd_name, obd);
+ } else {
+ spin_unlock(&qctxt->lqc_lock);
}
RETURN(0);
}