+
+/* Look up, for quota id \a qid, the lquota entries from every pool of
+ * resource type \a rtype that contains slave index \a idx, and collect
+ * them in the per-thread lqes array (qti_lqes_*).
+ *
+ * NOTE(review): \a pool_name is unused in the visible code -- confirm
+ * against callers. On failure inside the loop qti_lqes_fini() is called
+ * here; on success the caller is expected to release the collected
+ * lqes via qti_lqes_fini() -- TODO confirm against callers. */
+int qmt_pool_lqes_lookup(const struct lu_env *env,
+ struct qmt_device *qmt,
+ int rtype, int stype,
+ int qtype, union lquota_id *qid,
+ char *pool_name, int idx)
+{
+ struct qmt_pool_info *pool;
+ struct lquota_entry *lqe;
+ int rc, i;
+ ENTRY;
+
+ /* Until MDT pools are implemented, all MDTs belong to the
+ * global pool, thus lookup lqes only from the global pool. */
+ if (rtype == LQUOTA_RES_DT && stype == QMT_STYPE_MDT)
+ idx = -1;
+
+ qti_pools_init(env);
+ rc = 0;
+ /* look-up pool responsible for this global index FID */
+ pool = qmt_pool_lookup_arr(env, qmt, rtype, idx);
+ if (IS_ERR(pool)) {
+ qti_pools_fini(env);
+ RETURN(PTR_ERR(pool));
+ }
+
+ /* now that we have the pool, let's look-up the quota entry in the
+ * right quota site */
+ qti_lqes_init(env);
+ for (i = 0; i < qti_pools_cnt(env); i++) {
+ pool = qti_pools_env(env)[i];
+ lqe = lqe_locate(env, pool->qpi_site[qtype], qid);
+ if (IS_ERR(lqe)) {
+ /* drop refs on lqes already collected, then bail */
+ qti_lqes_fini(env);
+ GOTO(out, rc = PTR_ERR(lqe));
+ }
+ /* Only release could be done for not enforced lqe
+ * (see qmt_dqacq0). However slave could request to
+ * release more than not global lqe had granted before
+ * lqe_enforced was cleared. It is legal case,
+ * because even if current lqe is not enforced,
+ * lqes from other pools are still active and available
+ * for acquiring. Furthermore, skip not enforced lqe
+ * to don't make extra allocations. */
+ /*if (!lqe_is_glbl(lqe) && !lqe->lqe_enforced) {
+ lqe_putref(lqe);
+ continue;
+ }*/
+ qti_lqes_add(env, lqe);
+ }
+ /* the global lqe must always be present on the success path;
+ * the error GOTO above jumps past this assertion */
+ LASSERT(qti_lqes_glbl(env)->lqe_is_global);
+
+out:
+ qti_pools_fini(env);
+ RETURN(rc);
+}
+
+/* Comparator for sort(): orders lquota entry pointers by ascending
+ * lqe_qunit. Returns <0, 0 or >0 in the usual qsort convention. */
+static int lqes_cmp(const void *arg1, const void *arg2)
+{
+	const struct lquota_entry *a = *(const struct lquota_entry **)arg1;
+	const struct lquota_entry *b = *(const struct lquota_entry **)arg2;
+
+	/* branch-free three-way compare on the unsigned qunit values */
+	return (a->lqe_qunit > b->lqe_qunit) - (a->lqe_qunit < b->lqe_qunit);
+}
+
+/* Sort the per-thread lqes array by ascending qunit (see lqes_cmp).
+ * sort() may relocate the global lqe inside the array, so re-find it
+ * and refresh the cached index used by qti_lqes_glbl(). */
+void qmt_lqes_sort(const struct lu_env *env)
+{
+ sort(qti_lqes(env), qti_lqes_cnt(env), sizeof(void *), lqes_cmp, NULL);
+ /* global lqe was moved during sorting */
+ if (!qti_lqes_glbl(env)->lqe_is_global) {
+ int i;
+ for (i = 0; i < qti_lqes_cnt(env); i++) {
+ if (qti_lqes(env)[i]->lqe_is_global) {
+ qti_glbl_lqe_idx(env) = i;
+ break;
+ }
+ }
+ }
+}
+
+/* Collect in the per-thread lqes array the enforced lqes for \a qid from
+ * every pool of type \a rtype that has at least one slave of \a qtype.
+ *
+ * Returns 0 on success or the last lookup error seen; some lqes may have
+ * been added even when an error is returned, so the caller can still
+ * handle the entries that were found (and must qti_lqes_fini()).
+ * NOTE(review): the empty-list path returns without qti_lqes_fini()
+ * after qti_lqes_init() -- presumably harmless as nothing was added,
+ * but confirm against qti_lqes_init() semantics. */
+int qmt_pool_lqes_lookup_spec(const struct lu_env *env, struct qmt_device *qmt,
+ int rtype, int qtype, union lquota_id *qid)
+{
+ struct qmt_pool_info *pos;
+ struct lquota_entry *lqe;
+ int rc = 0;
+
+ qti_lqes_init(env);
+ down_read(&qmt->qmt_pool_lock);
+ if (list_empty(&qmt->qmt_pool_list)) {
+ up_read(&qmt->qmt_pool_lock);
+ RETURN(-ENOENT);
+ }
+
+ list_for_each_entry(pos, &qmt->qmt_pool_list, qpi_linkage) {
+ if (pos->qpi_rtype != rtype)
+ continue;
+ /* Don't take into account pools without slaves */
+ if (!qpi_slv_nr(pos, qtype))
+ continue;
+ lqe = lqe_find(env, pos->qpi_site[qtype], qid);
+ /* ENOENT is valid case for lqe from non global pool
+ * that hasn't limits, i.e. not enforced. Continue even
+ * in case of error - we can handle already found lqes */
+ if (IS_ERR_OR_NULL(lqe)) {
+ /* let know that something went wrong */
+ rc = lqe ? PTR_ERR(lqe) : -ENOENT;
+ continue;
+ }
+ if (!lqe->lqe_enforced) {
+ /* no settings for this qid_uid */
+ lqe_putref(lqe);
+ continue;
+ }
+ qti_lqes_add(env, lqe);
+ CDEBUG(D_QUOTA, "adding lqe %p from pool %s\n",
+ lqe, pos->qpi_name);
+ }
+ up_read(&qmt->qmt_pool_lock);
+ RETURN(rc);
+}
+
+/**
+ * Allocate a new quota pool for the specified device.
+ *
+ * Allocate a new qmt_pool_info structure on the \a obd device to
+ * create a pool with the given \a poolname. The new pool structure
+ * is created with a single reference, and is freed when the
+ * reference count drops to zero.
+ *
+ * \param[in] obd Lustre OBD device on which to add the pool
+ * \param[in] poolname the name of the pool to be created
+ *
+ * \retval 0 in case of success
+ * \retval negative error code in case of error
+ */
+int qmt_pool_new(struct obd_device *obd, char *poolname)
+{
+ struct qmt_device *qmt = lu2qmt_dev(obd->obd_lu_dev);
+ struct qmt_pool_info *qpi;
+ struct lu_env env;
+ int rc;
+ ENTRY;
+
+ if (strnlen(poolname, LOV_MAXPOOLNAME + 1) > LOV_MAXPOOLNAME)
+ RETURN(-ENAMETOOLONG);
+
+ rc = lu_env_init(&env, LCT_MD_THREAD);
+ if (rc) {
+ CERROR("%s: can't init env: rc = %d\n", obd->obd_name, rc);
+ RETURN(rc);
+ }
+
+ /* reject duplicates: creation is legal only when lookup fails
+ * with -ENOENT */
+ qpi = qmt_pool_lookup_name(&env, qmt, LQUOTA_RES_DT, poolname);
+ if (!IS_ERR(qpi)) {
+ /* Valid case when several MDTs are mounted
+ * at the same node. */
+ CDEBUG(D_QUOTA, "pool %s already exists\n", poolname);
+ qpi_putref(&env, qpi);
+ GOTO(out_env, rc = -EEXIST);
+ }
+ if (PTR_ERR(qpi) != -ENOENT) {
+ CWARN("%s: pool %s lookup failed: rc = %ld\n",
+ obd->obd_name, poolname, PTR_ERR(qpi));
+ GOTO(out_env, rc = PTR_ERR(qpi));
+ }
+
+ /* Now allocate and prepare only DATA pool.
+ * Further when MDT pools will be ready we need to add
+ * a cycle here and setup pools of both types. Another
+ * approach is to find out pool of which type should be
+ * created. */
+ rc = qmt_pool_alloc(&env, qmt, poolname, LQUOTA_RES_DT);
+ if (rc) {
+ CERROR("%s: can't alloc pool %s: rc = %d\n",
+ obd->obd_name, poolname, rc);
+ GOTO(out_env, rc);
+ }
+
+ rc = qmt_pool_prepare(&env, qmt, qmt->qmt_root, poolname);
+ if (rc) {
+ CERROR("%s: can't prepare pool for %s: rc = %d\n",
+ obd->obd_name, poolname, rc);
+ GOTO(out_err, rc);
+ }
+
+ CDEBUG(D_QUOTA, "Quota pool "LOV_POOLNAMEF" added\n",
+ poolname);
+
+ GOTO(out_env, rc);
+out_err:
+ /* undo qmt_pool_alloc(): one putref drops the lookup reference
+ * taken just above, the second drops the allocation reference so
+ * the half-prepared pool is freed */
+ qpi = qmt_pool_lookup_name(&env, qmt, LQUOTA_RES_DT, poolname);
+ if (!IS_ERR(qpi)) {
+ qpi_putref(&env, qpi);
+ qpi_putref(&env, qpi);
+ }
+out_env:
+ lu_env_fini(&env);
+ return rc;
+}
+
+/* Iterate over the slave index object \a obj and, for every non-root
+ * quota ID that has space granted, accumulate the granted amount into
+ * the matching lqe's lqe_recalc_granted in \a site.
+ * The totals are later reconciled by qmt_site_recalc_cb(). */
+static int
+qmt_obj_recalc(const struct lu_env *env, struct dt_object *obj,
+ struct lquota_site *site)
+{
+ struct qmt_thread_info *qti = qmt_info(env);
+ union lquota_id *qid = &qti->qti_id;
+ const struct dt_it_ops *iops;
+ struct dt_key *key;
+ struct dt_it *it;
+ __u64 granted;
+ int rc;
+ ENTRY;
+
+ iops = &obj->do_index_ops->dio_it;
+
+ it = iops->init(env, obj, 0);
+ if (IS_ERR(it)) {
+ CWARN("quota: initialize it for "DFID" failed: rc = %ld\n",
+ PFID(&qti->qti_fid), PTR_ERR(it));
+ RETURN(PTR_ERR(it));
+ }
+
+ rc = iops->load(env, it, 0);
+ if (rc < 0) {
+ CWARN("quota: load first entry for "DFID" failed: rc = %d\n",
+ PFID(&qti->qti_fid), rc);
+ GOTO(out, rc);
+ } else if (rc == 0) {
+ /* load() positioned before the first record; advance once */
+ rc = iops->next(env, it);
+ if (rc != 0)
+ GOTO(out, rc = (rc < 0) ? rc : 0);
+ }
+
+ do {
+ struct lquota_entry *lqe;
+
+ key = iops->key(env, it);
+ if (IS_ERR(key)) {
+ CWARN("quota: error key for "DFID": rc = %ld\n",
+ PFID(&qti->qti_fid), PTR_ERR(key));
+ GOTO(out, rc = PTR_ERR(key));
+ }
+
+ /* skip the root user/group */
+ if (*((__u64 *)key) == 0)
+ goto next;
+
+ qid->qid_uid = *((__u64 *)key);
+
+ /* NOTE(review): rc is not checked here before testing
+ * 'granted'; presumably qmt_slv_read() always sets it on
+ * success and a failure leaves granted unset -- confirm */
+ rc = qmt_slv_read(env, qid, obj, &granted);
+ if (!granted)
+ goto next;
+
+ lqe = lqe_locate(env, site, qid);
+ if (IS_ERR(lqe))
+ GOTO(out, rc = PTR_ERR(lqe));
+ lqe_write_lock(lqe);
+ lqe->lqe_recalc_granted += granted;
+ lqe_write_unlock(lqe);
+ lqe_putref(lqe);
+next:
+ rc = iops->next(env, it);
+ if (rc < 0)
+ CWARN("quota: failed to parse index "DFID
+ ", ->next error: rc = %d\n",
+ PFID(&qti->qti_fid), rc);
+ } while (rc == 0 && !kthread_should_stop());
+
+out:
+ iops->put(env, it);
+ iops->fini(env, it);
+ RETURN(rc);
+}
+
+/* cfs_hash iterator callback run after per-slave recalculation: when the
+ * accumulated lqe_recalc_granted differs from lqe_granted, install the
+ * new value, re-adjust qunit/edquot, notify slaves if needed, and write
+ * the updated entry to the global index.
+ * The disk transaction runs with lqe_write_lock held; transaction
+ * errors are logged implicitly and swallowed -- iteration continues.
+ * Always returns 0 so cfs_hash_for_each() visits every entry. */
+static int qmt_site_recalc_cb(struct cfs_hash *hs, struct cfs_hash_bd *bd,
+ struct hlist_node *hnode, void *data)
+{
+ struct lquota_entry *lqe;
+ struct lu_env *env = data;
+
+ lqe = hlist_entry(hnode, struct lquota_entry, lqe_hash);
+ LASSERT(atomic_read(&lqe->lqe_ref) > 0);
+
+ lqe_write_lock(lqe);
+ if (lqe->lqe_granted != lqe->lqe_recalc_granted) {
+ struct qmt_device *qmt = lqe2qpi(lqe)->qpi_qmt;
+ struct thandle *th;
+ bool need_notify = false;
+ int rc;
+
+ LQUOTA_DEBUG(lqe, "lqe_recalc_granted %llu\n",
+ lqe->lqe_recalc_granted);
+ lqe->lqe_granted = lqe->lqe_recalc_granted;
+ /* Always returns true, if there is no slaves in a pool */
+ need_notify |= qmt_adjust_qunit(env, lqe);
+ need_notify |= qmt_adjust_edquot(lqe, ktime_get_real_seconds());
+ if (need_notify) {
+ /* Find all lqes with lqe_id to reseed lgd array */
+ rc = qmt_pool_lqes_lookup_spec(env, qmt, lqe_rtype(lqe),
+ lqe_qtype(lqe), &lqe->lqe_id);
+ if (!rc && qti_lqes_glbl(env)->lqe_glbl_data) {
+ qmt_seed_glbe(env,
+ qti_lqes_glbl(env)->lqe_glbl_data);
+ qmt_id_lock_notify(qmt, qti_lqes_glbl(env));
+ }
+ qti_lqes_fini(env);
+ }
+ th = dt_trans_create(env, qmt->qmt_child);
+ if (IS_ERR(th))
+ goto out;
+
+ rc = lquota_disk_declare_write(env, th,
+ LQE_GLB_OBJ(lqe),
+ &lqe->lqe_id);
+ if (rc)
+ GOTO(out_stop, rc);
+
+ rc = dt_trans_start_local(env, qmt->qmt_child, th);
+ if (rc)
+ GOTO(out_stop, rc);
+
+ qmt_glb_write(env, th, lqe, 0, NULL);
+out_stop:
+ dt_trans_stop(env, qmt->qmt_child, th);
+ }
+out:
+ /* reset the accumulator for the next recalculation pass */
+ lqe->lqe_recalc_granted = 0;
+ lqe_write_unlock(lqe);
+
+ return 0;
+}
+
+#define MDT_DEV_NAME_LEN (LUSTRE_MAXFSNAME + sizeof("-MDT0000"))
+/* Build "<fsname>-MDT0000" from the QMT server name and return the MGC
+ * obd device of the mount holding it, or ERR_PTR() on failure.
+ * Used by the recalc thread to wait for MGS config processing. */
+static struct obd_device *qmt_get_mgc(struct qmt_device *qmt)
+{
+ char mdt_name[MDT_DEV_NAME_LEN];
+ struct lustre_mount_info *lmi;
+ struct obd_device *obd;
+ int rc;
+ ENTRY;
+
+ rc = server_name2fsname(qmt->qmt_svname, mdt_name, NULL);
+ if (rc) {
+ CERROR("quota: cannot get server name from %s: rc = %d\n",
+ qmt->qmt_svname, rc);
+ RETURN(ERR_PTR(rc));
+ }
+
+ strlcat(mdt_name, "-MDT0000", MDT_DEV_NAME_LEN);
+ lmi = server_get_mount(mdt_name);
+ if (lmi == NULL) {
+ rc = -ENOENT;
+ CERROR("%s: cannot get mount info from %s: rc = %d\n",
+ qmt->qmt_svname, mdt_name, rc);
+ RETURN(ERR_PTR(rc));
+ }
+ obd = s2lsi(lmi->lmi_sb)->lsi_mgc;
+ /* drop the reference taken by server_get_mount() */
+ lustre_put_lsi(lmi->lmi_sb);
+
+ RETURN(obd);
+}
+
+/* Pool recalculation thread body (started by qmt_start_pool_recalc()).
+ * Walks every slave index of the pool, accumulates granted space per
+ * quota ID (qmt_obj_recalc) and reconciles lqe_granted with the result
+ * (qmt_site_recalc_cb). Holds the slave-array read semaphore and
+ * qpi_recalc_sem for the whole pass to keep the pool stable. */
+static int qmt_pool_recalc(void *args)
+{
+ struct qmt_pool_info *pool, *glbl_pool;
+ struct rw_semaphore *sem = NULL;
+ struct obd_device *obd;
+ struct lu_env env;
+ int i, rc, qtype, slaves_cnt;
+ ENTRY;
+
+ pool = args;
+
+ obd = qmt_get_mgc(pool->qpi_qmt);
+ if (IS_ERR(obd))
+ GOTO(out, rc = PTR_ERR(obd));
+ else
+ /* Waiting for the end of processing mgs config.
+ * It is needed to be sure all pools are configured. */
+ while (obd->obd_process_conf)
+ schedule_timeout_uninterruptible(cfs_time_seconds(1));
+
+ sem = qmt_sarr_rwsem(pool);
+ LASSERT(sem);
+ down_read(sem);
+ /* Hold this to be sure that OSTs from this pool
+ * can't do acquire/release.
+ *
+ * I guess below write semaphore could be a bottleneck
+ * as qmt_dqacq would be blocked trying to hold
+ * read_lock at qmt_pool_lookup->qti_pools_add.
+ * But on the other hand adding/removing OSTs to the pool is
+ * a rare operation. If finally this would be a problem,
+ * we can consider another approach. For example we can
+ * iterate through the POOL's lqes. Take lqe, hold lqe_write_lock
+ * and go through appropriate OSTs. I don't use this approach now
+ * as newly created pool hasn't lqes entries. So firstly we need
+ * to get this lqes from the global pool index file. This
+ * solution looks more complex, so leave it as it is. */
+ down_write(&pool->qpi_recalc_sem);
+
+ rc = lu_env_init(&env, LCT_MD_THREAD);
+ if (rc) {
+ CERROR("%s: cannot init env: rc = %d\n", obd->obd_name, rc);
+ GOTO(out, rc);
+ }
+
+ glbl_pool = qmt_pool_lookup_glb(&env, pool->qpi_qmt, pool->qpi_rtype);
+ if (IS_ERR(glbl_pool))
+ GOTO(out_env, rc = PTR_ERR(glbl_pool));
+
+ slaves_cnt = qmt_sarr_count(pool);
+ CDEBUG(D_QUOTA, "Starting pool recalculation for %d slaves in %s\n",
+ slaves_cnt, pool->qpi_name);
+
+ for (qtype = 0; qtype < LL_MAXQUOTAS; qtype++) {
+ for (i = 0; i < slaves_cnt; i++) {
+ struct qmt_thread_info *qti = qmt_info(&env);
+ struct dt_object *slv_obj;
+ struct obd_uuid uuid;
+ int idx;
+
+ if (kthread_should_stop())
+ GOTO(out_stop, rc = 0);
+ idx = qmt_sarr_get_idx(pool, i);
+ LASSERT(idx >= 0);
+
+ /* We don't need fsname here - anyway
+ * lquota_disk_slv_filename ignores it. */
+ snprintf(uuid.uuid, UUID_MAX, "-OST%04x_UUID", idx);
+ lquota_generate_fid(&qti->qti_fid, pool->qpi_rtype,
+ qtype);
+ /* look-up index file associated with acquiring slave */
+ slv_obj = lquota_disk_slv_find(&env,
+ glbl_pool->qpi_qmt->qmt_child,
+ glbl_pool->qpi_root,
+ &qti->qti_fid,
+ &uuid);
+ if (IS_ERR(slv_obj))
+ GOTO(out_stop, rc = PTR_ERR(slv_obj));
+
+ CDEBUG(D_QUOTA, "slv_obj is found %p for uuid %s\n",
+ slv_obj, uuid.uuid);
+ qmt_obj_recalc(&env, slv_obj, pool->qpi_site[qtype]);
+ dt_object_put(&env, slv_obj);
+ }
+ /* Now go through the site hash and compare lqe_granted
+ * with lqe_calc_granted. Write new value if disagree */
+
+ cfs_hash_for_each(pool->qpi_site[qtype]->lqs_hash,
+ qmt_site_recalc_cb, &env);
+ }
+ GOTO(out_stop, rc);
+out_stop:
+ qpi_putref(&env, glbl_pool);
+out_env:
+ lu_env_fini(&env);
+out:
+ if (xchg(&pool->qpi_recalc_task, NULL) == NULL)
+ /*
+ * Someone is waiting for us to stop - be sure not to exit
+ * before kthread_stop() gets a ref on the task. No event
+ * will happen on 'pool', this is just a convenient way to
+ * wait.
+ */
+ wait_var_event(pool, kthread_should_stop());
+
+ clear_bit(QPI_FLAG_RECALC_OFFSET, &pool->qpi_flags);
+ /* Pool can't be changed, since sem has been down.
+ * Thus until up_read, no one can restart recalc thread. */
+ if (sem) {
+ up_read(sem);
+ up_write(&pool->qpi_recalc_sem);
+ }
+ /* NOTE(review): on the early error paths (mgc lookup failure,
+ * lu_env_init failure) 'env' was never successfully initialized,
+ * yet it is passed to qpi_putref() here -- verify this is safe
+ * or that qpi_putref() ignores the env on these paths */
+ qpi_putref(&env, pool);
+
+ return rc;
+}
+
+/* Start the recalc thread for \a qpi unless one is already running.
+ * QPI_FLAG_RECALC_OFFSET guards against concurrent starts; the thread
+ * holds one qpi reference which it drops itself on exit. Returns 0 on
+ * success (or if a thread was already running), negative errno on
+ * kthread creation failure. */
+static int qmt_start_pool_recalc(struct lu_env *env, struct qmt_pool_info *qpi)
+{
+ struct task_struct *task;
+ int rc = 0;
+
+ if (!test_and_set_bit(QPI_FLAG_RECALC_OFFSET, &qpi->qpi_flags)) {
+ LASSERT(!qpi->qpi_recalc_task);
+
+ qpi_getref(qpi);
+ task = kthread_create(qmt_pool_recalc, qpi,
+ "qsd_reint_%s", qpi->qpi_name);
+ if (IS_ERR(task)) {
+ clear_bit(QPI_FLAG_RECALC_OFFSET, &qpi->qpi_flags);
+ rc = PTR_ERR(task);
+ /* thread never ran; drop the ref taken for it */
+ qpi_putref(env, qpi);
+ } else {
+ qpi->qpi_recalc_task = task;
+ /* Using park/unpark to start the thread ensures that
+ * the thread function does get calls, so the
+ * ref on qpi will be dropped
+ */
+ kthread_park(task);
+ kthread_unpark(task);
+ }
+ }
+
+ RETURN(rc);
+}
+
+/* Stop the pool recalc thread if one is running. xchg() atomically
+ * claims the task pointer, so stop is requested at most once even
+ * when racing with the thread's own exit path. */
+static inline void qmt_stop_pool_recalc(struct qmt_pool_info *qpi)
+{
+	struct task_struct *recalc = xchg(&qpi->qpi_recalc_task, NULL);
+
+	if (recalc != NULL)
+		kthread_stop(recalc);
+}
+
+/* Adjust the per-qtype OST slave counters of \a pool after slave \a idx
+ * was added (\a add == true) or removed. A slave is only counted for a
+ * qtype if its index file exists in the global pool (i.e. it has
+ * already connected for that quota type).
+ * NOTE(review): hard-coded to LQUOTA_RES_DT / QMT_STYPE_OST -- MDT
+ * pools are not handled here. Always returns 0. */
+static int qmt_pool_slv_nr_change(const struct lu_env *env,
+ struct qmt_pool_info *pool,
+ int idx, bool add)
+{
+ struct qmt_pool_info *glbl_pool;
+ int qtype;
+
+ glbl_pool = qmt_pool_lookup_glb(env, pool->qpi_qmt, LQUOTA_RES_DT);
+ if (IS_ERR(glbl_pool))
+ RETURN(PTR_ERR(glbl_pool));
+
+ for (qtype = 0; qtype < LL_MAXQUOTAS; qtype++) {
+ struct qmt_thread_info *qti = qmt_info(env);
+ struct dt_object *slv_obj;
+ struct obd_uuid uuid;
+
+ /* We don't need fsname here - anyway
+ * lquota_disk_slv_filename ignores it. */
+ snprintf(uuid.uuid, UUID_MAX, "-OST%04x_UUID", idx);
+ lquota_generate_fid(&qti->qti_fid, pool->qpi_rtype,
+ qtype);
+ /* look-up index file associated with acquiring slave */
+ slv_obj = lquota_disk_slv_find(env,
+ glbl_pool->qpi_qmt->qmt_child,
+ glbl_pool->qpi_root,
+ &qti->qti_fid,
+ &uuid);
+ if (IS_ERR(slv_obj))
+ continue;
+
+ if (add)
+ pool->qpi_slv_nr[QMT_STYPE_OST][qtype]++;
+ else
+ pool->qpi_slv_nr[QMT_STYPE_OST][qtype]--;
+ dt_object_put(env, slv_obj);
+ }
+ qpi_putref(env, glbl_pool);
+
+ return 0;
+}
+
+/*
+ * Add or remove OST target \a slavename to/from pool \a poolname,
+ * update the per-qtype slave counters and kick off a pool
+ * recalculation thread.
+ *
+ * \param[in] obd	OBD device holding the QMT
+ * \param[in] poolname	name of the pool to modify
+ * \param[in] slavename	name of the OST target ("fsname-OSTxxxx")
+ * \param[in] add	true to add the slave, false to remove it
+ *
+ * \retval 0		on success
+ * \retval negative	error code on failure
+ */
+static int qmt_pool_add_rem(struct obd_device *obd, char *poolname,
+			    char *slavename, bool add)
+{
+	struct qmt_device *qmt = lu2qmt_dev(obd->obd_lu_dev);
+	struct qmt_pool_info *qpi;
+	struct lu_env env;
+	int rc, idx;
+	ENTRY;
+
+	if (strnlen(poolname, LOV_MAXPOOLNAME + 1) > LOV_MAXPOOLNAME)
+		RETURN(-ENAMETOOLONG);
+
+	CDEBUG(D_QUOTA, add ? "%s: pool %s, adding %s\n" :
+	       "%s: pool %s, removing %s\n",
+	       obd->obd_name, poolname, slavename);
+
+	/* only OST targets may be pool members */
+	rc = server_name2index(slavename, &idx, NULL);
+	if (rc != LDD_F_SV_TYPE_OST)
+		RETURN(-EINVAL);
+
+	rc = lu_env_init(&env, LCT_MD_THREAD);
+	if (rc) {
+		CERROR("%s: cannot init env: rc = %d\n", obd->obd_name, rc);
+		RETURN(rc);
+	}
+
+	qpi = qmt_pool_lookup_name(&env, qmt, LQUOTA_RES_DT, poolname);
+	if (IS_ERR(qpi)) {
+		/* fix: "%long" was no valid conversion (parsed as "%lo"
+		 * followed by literal "ng"); PTR_ERR() needs %ld */
+		CWARN("%s: can't find pool %s: rc = %ld\n",
+		      obd->obd_name, poolname, PTR_ERR(qpi));
+		GOTO(out, rc = PTR_ERR(qpi));
+	}
+
+	rc = add ? qmt_sarr_pool_add(qpi, idx, 32) :
+		   qmt_sarr_pool_rem(qpi, idx);
+	if (rc) {
+		/* fix: arguments were swapped against the format string --
+		 * the device name belongs to the first %s */
+		CERROR("%s: can't %s %s pool %s: rc = %d\n",
+		       obd->obd_name, add ? "add to" : "remove",
+		       slavename, poolname, rc);
+		GOTO(out_putref, rc);
+	}
+	/* both helpers report failures internally; results are advisory */
+	qmt_pool_slv_nr_change(&env, qpi, idx, add);
+	qmt_start_pool_recalc(&env, qpi);
+
+out_putref:
+	qpi_putref(&env, qpi);
+out:
+	lu_env_fini(&env);
+	RETURN(rc);
+}
+
+
+
+/**
+ * Add a single target device to the named pool.
+ *
+ * Thin wrapper over qmt_pool_add_rem() with \c add = true.
+ *
+ * \param[in] obd OBD device on which to add the pool
+ * \param[in] poolname name of the pool to which to add the target \a slavename
+ * \param[in] slavename name of the target device to be added
+ *
+ * \retval 0 if \a slavename was (previously) added to the pool
+ * \retval negative error number on failure
+ */
+int qmt_pool_add(struct obd_device *obd, char *poolname, char *slavename)
+{
+ return qmt_pool_add_rem(obd, poolname, slavename, true);
+}
+
+/**
+ * Remove the named target from the specified pool.
+ *
+ * Thin wrapper over qmt_pool_add_rem() with \c add = false.
+ *
+ * \param[in] obd OBD device from which to remove \a poolname
+ * \param[in] poolname name of the pool to be changed
+ * \param[in] slavename name of the target to remove from \a poolname
+ *
+ * \retval 0 on successfully removing \a slavename from the pool
+ * \retval negative number on error (e.g. \a slavename not in pool)
+ */
+int qmt_pool_rem(struct obd_device *obd, char *poolname, char *slavename)
+{
+ return qmt_pool_add_rem(obd, poolname, slavename, false);
+}
+
+/**
+ * Remove the named pool from the QMT device.
+ *
+ * Unlinks the per-qtype global index files and the pool directory,
+ * then drops the pool's references so the qmt_pool_info is freed.
+ * NOTE(review): unlink failures are only warned about and the function
+ * returns 0 unconditionally -- presumably best-effort cleanup by
+ * design; confirm callers do not rely on the error code.
+ *
+ * \param[in] obd OBD device on which pool was previously created
+ * \param[in] poolname name of pool to remove from \a obd
+ *
+ * \retval 0 on successfully removing the pool
+ * \retval negative error numbers for failures
+ */
+int qmt_pool_del(struct obd_device *obd, char *poolname)
+{
+ struct qmt_device *qmt = lu2qmt_dev(obd->obd_lu_dev);
+ struct qmt_pool_info *qpi;
+ struct lu_fid fid;
+ char buf[LQUOTA_NAME_MAX];
+ struct lu_env env;
+ int rc;
+ int qtype;
+ ENTRY;
+
+ if (strnlen(poolname, LOV_MAXPOOLNAME + 1) > LOV_MAXPOOLNAME)
+ RETURN(-ENAMETOOLONG);
+
+ CDEBUG(D_QUOTA, "Removing quota pool "LOV_POOLNAMEF"\n",
+ poolname);
+
+ rc = lu_env_init(&env, LCT_MD_THREAD);
+ if (rc) {
+ CERROR("%s: cannot init env: rc = %d\n", obd->obd_name, rc);
+ RETURN(rc);
+ }
+
+ /* look-up pool in charge of this global index FID */
+ qpi = qmt_pool_lookup_name(&env, qmt, LQUOTA_RES_DT, poolname);
+ if (IS_ERR(qpi)) {
+ /* Valid case for several MDTs at the same node -
+ * pool removed by the 1st MDT in config */
+ CDEBUG(D_QUOTA, "Cannot find pool %s\n", poolname);
+ lu_env_fini(&env);
+ RETURN(PTR_ERR(qpi));
+ }
+
+ /* unlink the per-qtype global index files inside the pool dir */
+ for (qtype = 0; qtype < LL_MAXQUOTAS; qtype++) {
+ lquota_generate_fid(&fid, LQUOTA_RES_DT, qtype);
+ snprintf(buf, LQUOTA_NAME_MAX, "0x%x", fid.f_oid);
+ rc = local_object_unlink(&env, qmt->qmt_child,
+ qpi->qpi_root, buf);
+ if (rc)
+ CWARN("%s: cannot unlink %s from pool %s: rc = %d\n",
+ obd->obd_name, buf, poolname, rc);
+ }
+
+ /* put ref from look-up */
+ qpi_putref(&env, qpi);
+ /* put last ref to free qpi */
+ qpi_putref(&env, qpi);
+
+ snprintf(buf, LQUOTA_NAME_MAX, "%s-%s",
+ RES_NAME(LQUOTA_RES_DT), poolname);
+ rc = local_object_unlink(&env, qmt->qmt_child, qmt->qmt_root, buf);
+ if (rc)
+ CWARN("%s: cannot unlink dir %s: rc = %d\n",
+ obd->obd_name, poolname, rc);
+
+ lu_env_fini(&env);
+ RETURN(0);
+}
+
+/* Initialize the slave-index array of \a qpi. The global pool tracks
+ * all slaves implicitly and needs no array; only DT (OST) pools carry
+ * one today. Returns 0 or the lu_tgt_pool_init() error. */
+static inline int qmt_sarr_pool_init(struct qmt_pool_info *qpi)
+{
+	if (qmt_pool_global(qpi))
+		return 0;
+
+	if (qpi->qpi_rtype == LQUOTA_RES_DT)
+		return lu_tgt_pool_init(&qpi->qpi_sarr.osts, 0);
+
+	/* LQUOTA_RES_MD and anything else: nothing to set up */
+	return 0;
+}
+
+/* Record slave index \a idx in the pool's slave array, growing it in
+ * steps of \a min. No-op (success) for non-DT resource types. */
+static inline int qmt_sarr_pool_add(struct qmt_pool_info *qpi, int idx, int min)
+{
+	if (qpi->qpi_rtype == LQUOTA_RES_DT)
+		return lu_tgt_pool_add(&qpi->qpi_sarr.osts, idx, min);
+
+	return 0;
+}
+
+/* Drop slave index \a idx from the pool's slave array.
+ * No-op (success) for non-DT resource types. */
+static inline int qmt_sarr_pool_rem(struct qmt_pool_info *qpi, int idx)
+{
+	if (qpi->qpi_rtype == LQUOTA_RES_DT)
+		return lu_tgt_pool_remove(&qpi->qpi_sarr.osts, idx);
+
+	return 0;
+}
+
+/* Release the pool's slave array. The global pool never allocated one,
+ * and a DT pool whose array was never populated is a no-op too. */
+static inline int qmt_sarr_pool_free(struct qmt_pool_info *qpi)
+{
+	if (qmt_pool_global(qpi))
+		return 0;
+
+	if (qpi->qpi_rtype == LQUOTA_RES_DT && qpi->qpi_sarr.osts.op_array)
+		return lu_tgt_pool_free(&qpi->qpi_sarr.osts);
+
+	return 0;
+}
+
+/* Check whether slave index \a idx belongs to \a qpi. The global pool
+ * contains every slave, so any index passes. */
+static inline int qmt_sarr_check_idx(struct qmt_pool_info *qpi, int idx)
+{
+	if (qmt_pool_global(qpi))
+		return 0;
+
+	if (qpi->qpi_rtype == LQUOTA_RES_DT)
+		return lu_tgt_check_index(idx, &qpi->qpi_sarr.osts);
+
+	return 0;
+}
+
+/* Return the rw_semaphore protecting the pool's slave array, or NULL
+ * when the resource type carries no such array (e.g. MD pools). */
+struct rw_semaphore *qmt_sarr_rwsem(struct qmt_pool_info *qpi)
+{
+	if (qpi->qpi_rtype == LQUOTA_RES_DT)
+		return &qpi->qpi_sarr.osts.op_rw_sem;	/* guards ost_pool */
+
+	return NULL;
+}
+
+/* Map array position \a arr_idx to the slave (OST) index stored there.
+ * For the global pool positions map 1:1 to slave indices, so \a arr_idx
+ * is returned unchanged. Returns -EINVAL for resource types without a
+ * slave array. Asserts \a arr_idx is within bounds for DT pools. */
+int qmt_sarr_get_idx(struct qmt_pool_info *qpi, int arr_idx)
+{
+
+ if (qmt_pool_global(qpi))
+ return arr_idx;
+
+ switch (qpi->qpi_rtype) {
+ case LQUOTA_RES_DT:
+ LASSERTF(arr_idx < qpi->qpi_sarr.osts.op_count && arr_idx >= 0,
+ "idx invalid %d op_count %d\n", arr_idx,
+ qpi->qpi_sarr.osts.op_count);
+ return qpi->qpi_sarr.osts.op_array[arr_idx];
+ case LQUOTA_RES_MD:
+ default:
+ return -EINVAL;
+ }
+}
+
+/* Number of slaves in a pool.
+ * NOTE(review): the default case returns -EINVAL through an
+ * 'unsigned int' return type, which a caller sees as a huge positive
+ * count -- confirm callers only invoke this for DT pools. */
+unsigned int qmt_sarr_count(struct qmt_pool_info *qpi)
+{
+ switch (qpi->qpi_rtype) {
+ case LQUOTA_RES_DT:
+ return qpi->qpi_sarr.osts.op_count;
+ case LQUOTA_RES_MD:
+ default:
+ return -EINVAL;
+ }
+}