struct local_oid_storage *los,
struct ls_device *ls,
struct dt_object *parent,
- const char *name, __u32 mode)
+ const char *name, struct lu_attr *attr,
+ struct dt_object_format *dof)
{
struct dt_thread_info *dti = dt_info(env);
struct dt_object *dto;
if (dt_object_exists(dto))
GOTO(out, rc = -EEXIST);
- /* create the object */
- dti->dti_attr.la_valid = LA_MODE | LA_TYPE;
- dti->dti_attr.la_mode = mode;
- dti->dti_dof.dof_type = dt_mode_to_dft(mode & S_IFMT);
-
th = dt_trans_create(env, ls->ls_osd);
if (IS_ERR(th))
GOTO(out, rc = PTR_ERR(th));
- rc = local_object_declare_create(env, los, dto, &dti->dti_attr,
- &dti->dti_dof, th);
+ rc = local_object_declare_create(env, los, dto, attr, dof, th);
if (rc)
GOTO(trans_stop, rc);
CDEBUG(D_OTHER, "create new object "DFID"\n",
PFID(lu_object_fid(&dto->do_lu)));
- rc = local_object_create(env, los, dto, &dti->dti_attr,
- &dti->dti_dof, th);
+ rc = local_object_create(env, los, dto, attr, dof, th);
if (rc)
GOTO(unlock, rc);
LASSERT(dt_object_exists(dto));
dto = ERR_PTR(rc);
else {
rc = local_object_fid_generate(env, los, &dti->dti_fid);
- if (rc < 0)
+ if (rc < 0) {
dto = ERR_PTR(rc);
- else
+ } else {
+ /* create the object */
+ dti->dti_attr.la_valid = LA_MODE;
+ dti->dti_attr.la_mode = mode;
+ dti->dti_dof.dof_type = dt_mode_to_dft(mode & S_IFMT);
dto = __local_file_create(env, &dti->dti_fid, los,
dt2ls_dev(los->los_dev),
- parent, name, mode);
+ parent, name, &dti->dti_attr,
+ &dti->dti_dof);
+ }
}
return dto;
}
struct ls_device *ls;
ls = ls_device_get(env, dt);
- if (IS_ERR(ls))
+ if (IS_ERR(ls)) {
dto = ERR_PTR(PTR_ERR(ls));
- else
+ } else {
+ /* create the object */
+ dti->dti_attr.la_valid = LA_MODE;
+ dti->dti_attr.la_mode = mode;
+ dti->dti_dof.dof_type = dt_mode_to_dft(mode & S_IFMT);
dto = __local_file_create(env, fid, NULL, ls, parent,
- name, mode);
- ls_device_put(env, ls);
+ name, &dti->dti_attr,
+ &dti->dti_dof);
+ /* ls_device_put() will finalize the ls device, we
+ * have to open the object in other device stack */
+ if (!IS_ERR(dto)) {
+ dti->dti_fid = dto->do_lu.lo_header->loh_fid;
+ lu_object_put_nocache(env, &dto->do_lu);
+ dto = dt_locate(env, dt, &dti->dti_fid);
+ }
+ ls_device_put(env, ls);
+ }
}
return dto;
}
EXPORT_SYMBOL(local_file_find_or_create_with_fid);
+/*
+ * Look up and create (if it does not exist) a local named index file in parent
+ * directory.
+ */
+struct dt_object *local_index_find_or_create(const struct lu_env *env,
+ struct local_oid_storage *los,
+ struct dt_object *parent,
+ const char *name, __u32 mode,
+ const struct dt_index_features *ft)
+{
+ struct dt_thread_info *dti = dt_info(env);
+ struct dt_object *dto;
+ int rc;
+
+ LASSERT(parent);
+
+ rc = dt_lookup_dir(env, parent, name, &dti->dti_fid);
+ if (rc == 0) {
+ /* name is found, get the object */
+ dto = ls_locate(env, dt2ls_dev(los->los_dev), &dti->dti_fid);
+ } else if (rc != -ENOENT) {
+ dto = ERR_PTR(rc);
+ } else {
+ rc = local_object_fid_generate(env, los, &dti->dti_fid);
+ if (rc < 0) {
+ dto = ERR_PTR(rc);
+ } else {
+ /* create the object */
+ dti->dti_attr.la_valid = LA_MODE;
+ dti->dti_attr.la_mode = mode;
+ dti->dti_dof.dof_type = DFT_INDEX;
+ dti->dti_dof.u.dof_idx.di_feat = ft;
+ dto = __local_file_create(env, &dti->dti_fid, los,
+ dt2ls_dev(los->los_dev),
+ parent, name, &dti->dti_attr,
+ &dti->dti_dof);
+ }
+ }
+ return dto;
+
+}
+EXPORT_SYMBOL(local_index_find_or_create);
+
+struct dt_object *
+local_index_find_or_create_with_fid(const struct lu_env *env,
+ struct dt_device *dt,
+ const struct lu_fid *fid,
+ struct dt_object *parent,
+ const char *name, __u32 mode,
+ const struct dt_index_features *ft)
+{
+ struct dt_thread_info *dti = dt_info(env);
+ struct dt_object *dto;
+ int rc;
+
+ LASSERT(parent);
+
+ rc = dt_lookup_dir(env, parent, name, &dti->dti_fid);
+ if (rc == 0) {
+ /* name is found, get the object */
+ if (!lu_fid_eq(fid, &dti->dti_fid))
+ dto = ERR_PTR(-EINVAL);
+ else
+ dto = dt_locate(env, dt, fid);
+ } else if (rc != -ENOENT) {
+ dto = ERR_PTR(rc);
+ } else {
+ struct ls_device *ls;
+
+ ls = ls_device_get(env, dt);
+ if (IS_ERR(ls)) {
+ dto = ERR_PTR(PTR_ERR(ls));
+ } else {
+ /* create the object */
+ dti->dti_attr.la_valid = LA_MODE;
+ dti->dti_attr.la_mode = mode;
+ dti->dti_dof.dof_type = DFT_INDEX;
+ dti->dti_dof.u.dof_idx.di_feat = ft;
+ dto = __local_file_create(env, fid, NULL, ls, parent,
+ name, &dti->dti_attr,
+ &dti->dti_dof);
+ /* ls_device_put() will finalize the ls device, we
+ * have to open the object in other device stack */
+ if (!IS_ERR(dto)) {
+ dti->dti_fid = dto->do_lu.lo_header->loh_fid;
+ lu_object_put_nocache(env, &dto->do_lu);
+ dto = dt_locate(env, dt, &dti->dti_fid);
+ }
+ ls_device_put(env, ls);
+ }
+ }
+ return dto;
+}
+EXPORT_SYMBOL(local_index_find_or_create_with_fid);
+
static struct local_oid_storage *dt_los_find(struct ls_device *ls, __u64 seq)
{
struct local_oid_storage *los, *ret = NULL;
ls_device_put(env, ls);
}
EXPORT_SYMBOL(local_oid_storage_fini);
-
--- /dev/null
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 021110-1307, USA
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2012 Intel, Inc.
+ * Use is subject to license terms.
+ *
+ * Author: Johann Lombardi <johann.lombardi@intel.com>
+ * Author: Niu Yawei <yawei.niu@intel.com>
+ */
+
+/*
+ * The disk API is used by both the QMT and QSD to access/update on-disk index
+ * files. The API consists of the following functions:
+ *
+ * - lquota_disk_dir_find_create: look-up quota directory, create it if not
+ * found.
+ * - lquota_disk_glb_find_create: look-up global index file, create it if not
+ * found.
+ * - lquota_disk_slv_find: look-up a slave index file.
+ * - lquota_disk_slv_find_create: look-up a slave index file. Allocate a FID if
+ * required and create the index file on disk if
+ * it does not exist.
+ * - lquota_disk_for_each_slv: iterate over all existing slave index files
+ * - lquota_disk_read: read quota settings from a index file
+ * - lquota_disk_declare_write: reserve credits to update a record in an index
+ * file
+ * - lquota_disk_write: update a record in an index file
+ * - lquota_disk_update_ver: update version of an index file
+ */
+
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+
+#define DEBUG_SUBSYSTEM S_LQUOTA
+
+#include "lquota_internal.h"
+
+#define LQUOTA_MODE (S_IFREG | S_IRUGO | S_IWUSR)
+
+/*
+ * Helper function looking up & creating if not found an index file with a
+ * dynamic fid.
+ */
+static struct dt_object *
+lquota_disk_find_create(const struct lu_env *env, struct dt_device *dev,
+ struct dt_object *parent, struct lu_fid *fid,
+ const struct dt_index_features *idx_feat,
+ char *name)
+{
+ struct lquota_thread_info *qti = lquota_info(env);
+ struct dt_object *obj;
+ struct local_oid_storage *los;
+ int rc;
+ ENTRY;
+
+ /* Set up local storage */
+ rc = local_oid_storage_init(env, dev, fid, &los);
+ if (rc)
+ RETURN(ERR_PTR(rc));
+
+ /* lookup/create slave index file */
+ obj = local_index_find_or_create(env, los, parent, name, LQUOTA_MODE,
+ idx_feat);
+ if (IS_ERR(obj))
+ GOTO(out, obj);
+
+ /* local_oid_storage_fini() will finalize the local storage device,
+ * we have to open the object in another device stack */
+ qti->qti_fid = obj->do_lu.lo_header->loh_fid;
+ lu_object_put_nocache(env, &obj->do_lu);
+ obj = dt_locate(env, dev, &qti->qti_fid);
+ if (IS_ERR(obj))
+ GOTO(out, obj);
+
+ /* install index operation vector */
+ if (obj->do_index_ops == NULL) {
+ rc = obj->do_ops->do_index_try(env, obj, idx_feat);
+ if (rc) {
+ CERROR("%s: fail to setup index operations for "DFID
+ " rc:%d\n", dev->dd_lu_dev.ld_obd->obd_name,
+ PFID(lu_object_fid(&obj->do_lu)), rc);
+ lu_object_put(env, &obj->do_lu);
+ obj = ERR_PTR(rc);
+ }
+ }
+out:
+ local_oid_storage_fini(env, los);
+ RETURN(obj);
+}
+
+/*
+ * helper function to generate the filename associated with a slave index file
+ */
+static inline int lquota_disk_slv_filename(struct lu_fid *glb_fid,
+ struct obd_uuid *uuid,
+ char *filename)
+{
+ char *name, *uuid_str;
+
+ /* In most case, the uuid is NULL terminated */
+ if (uuid->uuid[sizeof(*uuid) - 1] != '\0') {
+ OBD_ALLOC(uuid_str, sizeof(*uuid));
+ if (uuid_str == NULL)
+ RETURN(-ENOMEM);
+ memcpy(uuid_str, uuid->uuid, sizeof(*uuid) - 1);
+ } else {
+ uuid_str = (char *)uuid->uuid;
+ }
+
+ /* we strip the slave's UUID (in the form of fsname-OST0001_UUID) of
+ * the filesystem name in case this one is changed in the future */
+ name = strrchr(uuid_str, '-');
+ if (name == NULL) {
+ name = strrchr(uuid_str, ':');
+ if (name == NULL) {
+ CERROR("Failed to extract extract filesystem "
+ "name from UUID %s\n", uuid_str);
+ if (uuid_str != uuid->uuid)
+ OBD_FREE(uuid_str, sizeof(*uuid));
+ return -EINVAL;
+ }
+ }
+ name++;
+
+ /* the filename is composed of the most signicant bits of the global
+ * FID, that's to say the oid which encodes the pool id, pool type and
+ * quota type, followed by the export UUID */
+ sprintf(filename, "0x%x-%s", glb_fid->f_oid, name);
+
+ if (uuid_str != uuid->uuid)
+ OBD_FREE(uuid_str, sizeof(*uuid));
+
+ return 0;
+}
+
+/*
+ * Set up quota directory (either "quota_master" or "quota_slave") for a QMT or
+ * QSD instance. This function is also used to create per-pool directory on
+ * the quota master.
+ * The directory is created with a local sequence if it does not exist already.
+ * This function is called at ->ldo_prepare time when the full device stack is
+ * configured.
+ *
+ * \param env - is the environment passed by the caller
+ * \param dev - is the dt_device where to create the quota directory
+ * \param parent - is the parent directory. If not specified, the directory
+ * will be created under the root directory
+ * \param name - is the name of quota directory to be created
+ *
+ * \retval - pointer to quota root dt_object on success, appropriate error
+ * on failure
+ */
+struct dt_object *lquota_disk_dir_find_create(const struct lu_env *env,
+ struct dt_device *dev,
+ struct dt_object *parent,
+ const char *name)
+{
+ struct lquota_thread_info *qti = lquota_info(env);
+ struct dt_object *qt_dir = NULL;
+ struct local_oid_storage *los = NULL;
+ int rc;
+ ENTRY;
+
+ /* Set up local storage to create the quota directory.
+ * We use the sequence reserved for local named objects */
+ lu_local_name_obj_fid(&qti->qti_fid, 1);
+ rc = local_oid_storage_init(env, dev, &qti->qti_fid, &los);
+ if (rc)
+ RETURN(ERR_PTR(rc));
+
+ if (parent == NULL) {
+ /* Fetch dt object associated with root directory */
+ rc = dt_root_get(env, dev, &qti->qti_fid);
+ if (rc)
+ GOTO(out, rc);
+
+ parent = dt_locate_at(env, dev, &qti->qti_fid,
+ dev->dd_lu_dev.ld_site->ls_top_dev);
+ if (IS_ERR(parent))
+ GOTO(out, rc = PTR_ERR(parent));
+ } else {
+ lu_object_get(&parent->do_lu);
+ }
+
+ /* create quota directory to be used for all quota index files */
+ qt_dir = local_file_find_or_create(env, los, parent, name, S_IFDIR |
+ S_IRUGO | S_IWUSR | S_IXUGO);
+ if (IS_ERR(qt_dir))
+ GOTO(out, rc = PTR_ERR(qt_dir));
+
+ /* local_oid_storage_fini() will finalize the local storage device,
+ * we have to open the object in another device stack */
+ qti->qti_fid = qt_dir->do_lu.lo_header->loh_fid;
+ lu_object_put_nocache(env, &qt_dir->do_lu);
+ qt_dir = dt_locate(env, dev, &qti->qti_fid);
+ if (IS_ERR(qt_dir))
+ GOTO(out, rc = PTR_ERR(qt_dir));
+
+ if (!dt_try_as_dir(env, qt_dir))
+ GOTO(out, rc = -ENOTDIR);
+ EXIT;
+out:
+ if (parent != NULL && !IS_ERR(parent))
+ lu_object_put(env, &parent->do_lu);
+ if (los != NULL)
+ local_oid_storage_fini(env, los);
+ if (rc) {
+ if (qt_dir != NULL && !IS_ERR(qt_dir))
+ lu_object_put(env, &qt_dir->do_lu);
+ qt_dir = ERR_PTR(rc);
+ }
+ return qt_dir;
+}
+
+/*
+ * Look-up/create a global index file.
+ *
+ * \param env - is the environment passed by the caller
+ * \parap dev - is the dt_device where to lookup/create the global index file
+ * \param parent - is the parent directory where to create the global index if
+ * not found
+ * \param fid - is the fid of the global index to be looked up/created
+ * \parap local - indicates whether the index should be created with a local
+ * generated fid or with \fid
+ *
+ * \retval - pointer to the dt_object of the global index on success,
+ * appropriate error on failure
+ */
+struct dt_object *lquota_disk_glb_find_create(const struct lu_env *env,
+ struct dt_device *dev,
+ struct dt_object *parent,
+ struct lu_fid *fid, bool local)
+{
+ struct lquota_thread_info *qti = lquota_info(env);
+ struct dt_object *glb_idx;
+ const struct dt_index_features *idx_feat;
+ ENTRY;
+
+ CDEBUG(D_QUOTA, "look-up/create %sglobal idx file ("DFID")\n",
+ local ? "local " : "", PFID(fid));
+
+#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2,7,50,0)
+ /* we use different index feature for each quota type and target type
+ * for the time being. This is done for on-disk conversion from the old
+ * quota format. Once this is no longer required, we should just be
+ * using dt_quota_glb_features for all global index file */
+ idx_feat = glb_idx_feature(fid);
+#else
+#warning "remove old quota compatibility code"
+ idx_feat = &dt_quota_glb_features;
+#endif
+
+ /* the filename is composed of the most signicant bits of the FID,
+ * that's to say the oid which encodes the pool id, pool type and quota
+ * type */
+ sprintf(qti->qti_buf, "0x%x", fid->f_oid);
+
+ if (local) {
+ /* We use the sequence reserved for local named objects */
+ lu_local_name_obj_fid(&qti->qti_fid, 1);
+ glb_idx = lquota_disk_find_create(env, dev, parent,
+ &qti->qti_fid, idx_feat,
+ qti->qti_buf);
+ } else {
+ /* look-up/create global index on disk */
+ glb_idx = local_index_find_or_create_with_fid(env, dev, fid,
+ parent,
+ qti->qti_buf,
+ LQUOTA_MODE,
+ idx_feat);
+ }
+
+ if (IS_ERR(glb_idx))
+ CERROR("%s: failed to look-up/create idx file "DFID" rc:%ld "
+ "local:%d\n", dev->dd_lu_dev.ld_obd->obd_name,
+ PFID(fid), PTR_ERR(glb_idx), local);
+
+ RETURN(glb_idx);
+}
+
+/*
+ * Look-up a slave index file.
+ *
+ * \param env - is the environment passed by the caller
+ * \param dev - is the backend dt_device where to look-up/create the slave index
+ * \param parent - is the parent directory where to lookup the slave index
+ * \param glb_fid - is the fid of the global index file associated with this
+ * slave index.
+ * \param uuid - is the uuid of slave which is (re)connecting to the master
+ * target
+ *
+ * \retval - pointer to the dt_object of the slave index on success,
+ * appropriate error on failure
+ */
+struct dt_object *lquota_disk_slv_find(const struct lu_env *env,
+ struct dt_device *dev,
+ struct dt_object *parent,
+ struct lu_fid *glb_fid,
+ struct obd_uuid *uuid)
+{
+ struct lquota_thread_info *qti = lquota_info(env);
+ struct dt_object *slv_idx;
+ int rc;
+ ENTRY;
+
+ LASSERT(uuid != NULL);
+
+ CDEBUG(D_QUOTA, "lookup slave index file for %s\n",
+ obd_uuid2str(uuid));
+
+ /* generate filename associated with the slave */
+ rc = lquota_disk_slv_filename(glb_fid, uuid, qti->qti_buf);
+ if (rc)
+ RETURN(ERR_PTR(rc));
+
+ /* lookup slave index file */
+ rc = dt_lookup_dir(env, parent, qti->qti_buf, &qti->qti_fid);
+ if (rc)
+ RETURN(ERR_PTR(rc));
+
+ /* name is found, get the object */
+ slv_idx = dt_locate(env, dev, &qti->qti_fid);
+ if (IS_ERR(slv_idx))
+ RETURN(slv_idx);
+
+ if (slv_idx->do_index_ops == NULL) {
+ rc = slv_idx->do_ops->do_index_try(env, slv_idx,
+ &dt_quota_slv_features);
+ if (rc) {
+ CERROR("%s: failed to setup slave index operations for "
+ "%s, rc:%d\n", dev->dd_lu_dev.ld_obd->obd_name,
+ obd_uuid2str(uuid), rc);
+ lu_object_put(env, &slv_idx->do_lu);
+ slv_idx = ERR_PTR(rc);
+ }
+ }
+
+ RETURN(slv_idx);
+}
+
+/*
+ * Look-up a slave index file. If the slave index isn't found:
+ * - if local is set to false, we allocate a FID from FID_SEQ_QUOTA sequence and
+ * create the index.
+ * - otherwise, we create the index file with a local reserved FID (see
+ * lquota_local_oid)
+ *
+ * \param env - is the environment passed by the caller
+ * \param dev - is the backend dt_device where to look-up/create the slave index
+ * \param parent - is the parent directory where to create the slave index if
+ * it does not exist already
+ * \param glb_fid - is the fid of the global index file associated with this
+ * slave index.
+ * \param uuid - is the uuid of slave which is (re)connecting to the master
+ * target
+ * \param local - indicate whether to use local reserved FID (LQUOTA_USR_OID
+ * & LQUOTA_GRP_OID) for the slave index creation or to
+ * allocate a new fid from sequence FID_SEQ_QUOTA
+ *
+ * \retval - pointer to the dt_object of the slave index on success,
+ * appropriate error on failure
+ */
+struct dt_object *lquota_disk_slv_find_create(const struct lu_env *env,
+ struct dt_device *dev,
+ struct dt_object *parent,
+ struct lu_fid *glb_fid,
+ struct obd_uuid *uuid,
+ bool local)
+{
+ struct lquota_thread_info *qti = lquota_info(env);
+ struct dt_object *slv_idx;
+ int rc;
+ ENTRY;
+
+ LASSERT(uuid != NULL);
+
+ CDEBUG(D_QUOTA, "lookup/create slave index file for %s\n",
+ obd_uuid2str(uuid));
+
+ /* generate filename associated with the slave */
+ rc = lquota_disk_slv_filename(glb_fid, uuid, qti->qti_buf);
+ if (rc)
+ RETURN(ERR_PTR(rc));
+
+ /* Slave indexes uses the FID_SEQ_QUOTA sequence since they can be read
+ * through the network */
+ qti->qti_fid.f_seq = FID_SEQ_QUOTA;
+ qti->qti_fid.f_ver = 0;
+ if (local) {
+ int type;
+
+ rc = lquota_extract_fid(glb_fid, NULL, NULL, &type);
+ if (rc)
+ RETURN(ERR_PTR(rc));
+
+ /* use predefined fid in the reserved oid list */
+ qti->qti_fid.f_oid = (type == USRQUOTA) ? LQUOTA_USR_OID
+ : LQUOTA_GRP_OID;
+
+ slv_idx = local_index_find_or_create_with_fid(env, dev,
+ &qti->qti_fid,
+ parent,
+ qti->qti_buf,
+ LQUOTA_MODE,
+ &dt_quota_slv_features);
+ } else {
+ /* allocate fid dynamically if index does not exist already */
+ qti->qti_fid.f_oid = LQUOTA_GENERATED_OID;
+
+ /* lookup/create slave index file */
+ slv_idx = lquota_disk_find_create(env, dev, parent,
+ &qti->qti_fid,
+ &dt_quota_slv_features,
+ qti->qti_buf);
+ }
+
+ RETURN(slv_idx);
+}
+
+/*
+ * Iterate over all slave index files associated with global index \glb_fid and
+ * invoke a callback function for each slave index file.
+ *
+ * \param env - is the environment passed by the caller
+ * \param parent - is the parent directory where the slave index files are
+ * stored
+ * \param glb_fid - is the fid of the global index file associated with the
+ * slave indexes to scan
+ * \param func - is the callback function to call each time a slave index
+ * file is found
+ * \param arg - is an opaq argument passed to the callback function \func
+ */
+int lquota_disk_for_each_slv(const struct lu_env *env, struct dt_object *parent,
+ struct lu_fid *glb_fid, lquota_disk_slv_cb_t func,
+ void *arg)
+{
+ struct lquota_thread_info *qti = lquota_info(env);
+ struct dt_it *it;
+ const struct dt_it_ops *iops;
+ char *name;
+ int rc;
+ ENTRY;
+
+ OBD_ALLOC(name, sizeof("0x00000000-"));
+ if (name == NULL)
+ RETURN(-ENOMEM);
+
+ /* filename associated with slave index files are prefixed with the most
+ * signicant bits of the global FID */
+ sprintf(name, "0x%x-", glb_fid->f_oid);
+
+ iops = &parent->do_index_ops->dio_it;
+ it = iops->init(env, parent, 0, BYPASS_CAPA);
+ if (IS_ERR(it)) {
+ OBD_FREE(name, sizeof("0x00000000-"));
+ RETURN(PTR_ERR(it));
+ }
+
+ rc = iops->load(env, it, 0);
+ if (rc == 0) {
+ /*
+ * Iterator didn't find record with exactly the key requested.
+ *
+ * It is currently either
+ *
+ * - positioned above record with key less than
+ * requested---skip it.
+ *
+ * - or not positioned at all (is in IAM_IT_SKEWED
+ * state)---position it on the next item.
+ */
+ rc = iops->next(env, it);
+ } else if (rc > 0)
+ rc = 0;
+
+ while (rc == 0) {
+ struct dt_key *key;
+ int len;
+
+ len = iops->key_size(env, it);
+ /* IAM iterator can return record with zero len. */
+ if (len == 0 || len <= strlen(name) || len >= LQUOTA_NAME_MAX)
+ goto next;
+
+ key = iops->key(env, it);
+ if (IS_ERR(key)) {
+ rc = PTR_ERR(key);
+ break;
+ }
+
+ if (strncmp((char *)key, name, strlen(name)) != 0)
+ goto next;
+
+ /* ldiskfs OSD returns filename as stored in directory entry
+ * which does not end up with '\0' */
+ memcpy(&qti->qti_buf, key, len);
+ qti->qti_buf[len] = '\0';
+
+ /* lookup fid associated with this slave index file */
+ rc = dt_lookup_dir(env, parent, qti->qti_buf, &qti->qti_fid);
+ if (rc)
+ break;
+
+ if (qti->qti_fid.f_seq != FID_SEQ_QUOTA)
+ goto next;
+
+ rc = func(env, glb_fid, (char *)key, &qti->qti_fid, arg);
+ if (rc)
+ break;
+next:
+ do {
+ rc = iops->next(env, it);
+ } while (rc == -ESTALE);
+ }
+
+ iops->put(env, it);
+ iops->fini(env, it);
+ OBD_FREE(name, sizeof("0x00000000-"));
+ if (rc > 0)
+ rc = 0;
+ RETURN(rc);
+}
+
+/*
+ * Retrieve quota settings from disk for a particular identifier.
+ *
+ * \param env - is the environment passed by the caller
+ * \param obj - is the on-disk index where quota settings are stored.
+ * \param id - is the key to be updated
+ * \param rec - is the output record where to store quota settings.
+ *
+ * \retval - 0 on success, appropriate error on failure
+ */
+int lquota_disk_read(const struct lu_env *env, struct dt_object *obj,
+ union lquota_id *id, struct dt_rec *rec)
+{
+ int rc;
+ ENTRY;
+
+ LASSERT(dt_object_exists(obj));
+ LASSERT(obj->do_index_ops != NULL);
+
+ /* lookup on-disk record from index file */
+ dt_read_lock(env, obj, 0);
+ rc = dt_lookup(env, obj, rec, (struct dt_key *)&id->qid_uid,
+ BYPASS_CAPA);
+ dt_read_unlock(env, obj);
+
+ RETURN(rc);
+}
+
+/*
+ * Reserve enough credits to update a record in a quota index file.
+ *
+ * \param env - is the environment passed by the caller
+ * \param th - is the transaction to use for disk writes
+ * \param obj - is the on-disk index where quota settings are stored.
+ * \param id - is the key to be updated
+ *
+ * \retval - 0 on success, appropriate error on failure
+ */
+int lquota_disk_declare_write(const struct lu_env *env, struct thandle *th,
+ struct dt_object *obj, union lquota_id *id)
+{
+ struct lquota_thread_info *qti = lquota_info(env);
+ struct dt_key *key = (struct dt_key *)&id->qid_uid;
+ int rc;
+ ENTRY;
+
+ LASSERT(dt_object_exists(obj));
+ LASSERT(obj->do_index_ops != NULL);
+
+ /* speculative delete declaration in case there is already an existing
+ * record in the index */
+ rc = dt_declare_delete(env, obj, key, th);
+ if (rc)
+ RETURN(rc);
+
+ /* declare insertion of updated record */
+ rc = dt_declare_insert(env, obj, (struct dt_rec *)&qti->qti_rec, key,
+ th);
+ if (rc)
+ RETURN(rc);
+
+ /* we might have to update the version of the global index too */
+ rc = dt_declare_version_set(env, obj, th);
+
+ RETURN(rc);
+}
+
+/*
+ * Update a record in a quota index file.
+ *
+ * \param env - is the environment passed by the caller
+ * \param th - is the transaction to use for disk writes
+ * \param obj - is the on-disk index to be updated.
+ * \param id - is the key to be updated
+ * \param rec - is the input record containing the new quota settings.
+ * \param flags - can be LQUOTA_BUMP_VER or LQUOTA_SET_VER.
+ * \param ver - is the new version of the index if LQUOTA_SET_VER is set or is
+ * used to return the new version of the index when
+ * LQUOTA_BUMP_VER is set.
+ *
+ * \retval - 0 on success, appropriate error on failure
+ */
+int lquota_disk_write(const struct lu_env *env, struct thandle *th,
+ struct dt_object *obj, union lquota_id *id,
+ struct dt_rec *rec, __u32 flags, __u64 *ver)
+{
+ struct lquota_thread_info *qti = lquota_info(env);
+ struct dt_key *key = (struct dt_key *)&id->qid_uid;
+ int rc;
+ ENTRY;
+
+ LASSERT(dt_object_exists(obj));
+ LASSERT(obj->do_index_ops != NULL);
+
+ /* lock index */
+ dt_write_lock(env, obj, 0);
+
+ /* check whether there is already an existing record for this ID */
+ rc = dt_lookup(env, obj, (struct dt_rec *)&qti->qti_rec, key,
+ BYPASS_CAPA);
+ if (rc == 0) {
+ /* delete existing record in order to replace it */
+ rc = dt_delete(env, obj, key, th, BYPASS_CAPA);
+ if (rc)
+ GOTO(out, rc);
+ } else if (rc == -ENOENT) {
+ /* probably first insert */
+ rc = 0;
+ } else {
+ GOTO(out, rc);
+ }
+
+ if (rec != NULL) {
+ /* insert record with updated quota settings */
+ rc = dt_insert(env, obj, rec, key, th, BYPASS_CAPA, 1);
+ if (rc) {
+ /* try to insert the old one */
+ rc = dt_insert(env, obj, (struct dt_rec *)&qti->qti_rec,
+ key, th, BYPASS_CAPA, 1);
+ LASSERTF(rc == 0, "failed to insert record in quota "
+ "index "DFID,
+ PFID(lu_object_fid(&obj->do_lu)));
+ GOTO(out, rc);
+ }
+ }
+
+ if (flags != 0) {
+ LASSERT(ver);
+ if (flags & LQUOTA_BUMP_VER) {
+ /* caller wants to bump the version, let's first read
+ * it */
+ *ver = dt_version_get(env, obj);
+ (*ver)++;
+ } else {
+ LASSERT(flags & LQUOTA_SET_VER);
+ }
+ dt_version_set(env, obj, *ver, th);
+ }
+
+ EXIT;
+out:
+ dt_write_unlock(env, obj);
+ return rc;
+}
+
+/*
+ * Update version of an index file
+ *
+ * \param env - is the environment passed by the caller
+ * \param dev - is the backend dt device storing the index file
+ * \param obj - is the on-disk index that should be updated
+ * \param ver - is the new version
+ */
+int lquota_disk_update_ver(const struct lu_env *env, struct dt_device *dev,
+ struct dt_object *obj, __u64 ver)
+{
+ struct thandle *th;
+ int rc;
+ ENTRY;
+
+ th = dt_trans_create(env, dev);
+ if (IS_ERR(th))
+ RETURN(PTR_ERR(th));
+
+ rc = dt_declare_version_set(env, obj, th);
+ if (rc)
+ GOTO(out, rc);
+
+ rc = dt_trans_start_local(env, dev, th);
+ if (rc)
+ GOTO(out, rc);
+ th->th_sync = 1;
+
+ dt_version_set(env, obj, ver, th);
+ EXIT;
+out:
+ dt_trans_stop(env, dev, th);
+ return rc;
+}