Whamcloud - gitweb
LU-1842 quota: add quota disk operations
authorJohann Lombardi <johann@whamcloud.com>
Tue, 18 Sep 2012 20:18:14 +0000 (22:18 +0200)
committerOleg Drokin <green@whamcloud.com>
Tue, 25 Sep 2012 18:16:29 +0000 (14:16 -0400)
Add low level quota disk operations that will be used by both the
QMT (quota master) and QSD (quota slave) to manipulate on-disk
structures.

Signed-off-by: Johann Lombardi <johann@whamcloud.com>
Signed-off-by: Niu Yawei <niu@whamcloud.com>
Change-Id: I783bf9519cf37fdfa35a04cff8207825a7ab1b49
Reviewed-on: http://review.whamcloud.com/4042
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Mike Pershin <tappro@whamcloud.com>
lustre/include/dt_object.h
lustre/include/lquota.h
lustre/include/lustre_fid.h
lustre/obdclass/local_storage.c
lustre/osd-ldiskfs/osd_handler.c
lustre/quota/Makefile.in
lustre/quota/lquota_disk.c [new file with mode: 0644]
lustre/quota/lquota_internal.h
lustre/quota/lquota_lib.c
lustre/quota/qsd_internal.h
lustre/quota/qsd_lib.c

index a199087..7d2b97e 100644 (file)
@@ -843,6 +843,19 @@ struct dt_object *local_file_find_or_create_with_fid(const struct lu_env *env,
                                                     struct dt_object *parent,
                                                     const char *name,
                                                     __u32 mode);
                                                     struct dt_object *parent,
                                                     const char *name,
                                                     __u32 mode);
+struct dt_object *
+local_index_find_or_create(const struct lu_env *env,
+                          struct local_oid_storage *los,
+                          struct dt_object *parent,
+                          const char *name, __u32 mode,
+                          const struct dt_index_features *ft);
+struct dt_object *
+local_index_find_or_create_with_fid(const struct lu_env *env,
+                                   struct dt_device *dt,
+                                   const struct lu_fid *fid,
+                                   struct dt_object *parent,
+                                   const char *name, __u32 mode,
+                                   const struct dt_index_features *ft);
 
 int dt_lookup_dir(const struct lu_env *env, struct dt_object *dir,
                  const char *name, struct lu_fid *fid);
 
 int dt_lookup_dir(const struct lu_env *env, struct dt_object *dir,
                  const char *name, struct lu_fid *fid);
index 947e2bc..3052467 100644 (file)
@@ -41,6 +41,14 @@ union lquota_rec {
        struct lquota_acct_rec  lqr_acct_rec;
 };
 
        struct lquota_acct_rec  lqr_acct_rec;
 };
 
+/* Index features supported by the global index objects
+ * Only used for migration purpose and should be removed once on-disk migration
+ * is no longer needed */
+extern struct dt_index_features dt_quota_iusr_features;
+extern struct dt_index_features dt_quota_busr_features;
+extern struct dt_index_features dt_quota_igrp_features;
+extern struct dt_index_features dt_quota_bgrp_features;
+
 /* Name used in the configuration logs to identify the default metadata pool
  * (composed of all the MDTs, with pool ID 0) and the default data pool (all
  * the OSTs, with pool ID 0 too). */
 /* Name used in the configuration logs to identify the default metadata pool
  * (composed of all the MDTs, with pool ID 0) and the default data pool (all
  * the OSTs, with pool ID 0 too). */
index 56e2186..96ac595 100644 (file)
@@ -130,6 +130,13 @@ static inline void lu_local_obj_fid(struct lu_fid *fid, __u32 oid)
         fid->f_ver = 0;
 }
 
         fid->f_ver = 0;
 }
 
+static inline void lu_local_name_obj_fid(struct lu_fid *fid, __u32 oid)
+{
+        fid->f_seq = FID_SEQ_LOCAL_NAME;
+        fid->f_oid = oid;
+        fid->f_ver = 0;
+}
+
 static inline int fid_is_otable_it(const struct lu_fid *fid)
 {
        return unlikely(fid_seq(fid) == FID_SEQ_LOCAL_FILE &&
 static inline int fid_is_otable_it(const struct lu_fid *fid)
 {
        return unlikely(fid_seq(fid) == FID_SEQ_LOCAL_FILE &&
index ca47645..26d6e18 100644 (file)
@@ -303,7 +303,8 @@ struct dt_object *__local_file_create(const struct lu_env *env,
                                      struct local_oid_storage *los,
                                      struct ls_device *ls,
                                      struct dt_object *parent,
                                      struct local_oid_storage *los,
                                      struct ls_device *ls,
                                      struct dt_object *parent,
-                                     const char *name, __u32 mode)
+                                     const char *name, struct lu_attr *attr,
+                                     struct dt_object_format *dof)
 {
        struct dt_thread_info   *dti = dt_info(env);
        struct dt_object        *dto;
 {
        struct dt_thread_info   *dti = dt_info(env);
        struct dt_object        *dto;
@@ -318,17 +319,11 @@ struct dt_object *__local_file_create(const struct lu_env *env,
        if (dt_object_exists(dto))
                GOTO(out, rc = -EEXIST);
 
        if (dt_object_exists(dto))
                GOTO(out, rc = -EEXIST);
 
-       /* create the object */
-       dti->dti_attr.la_valid = LA_MODE | LA_TYPE;
-       dti->dti_attr.la_mode = mode;
-       dti->dti_dof.dof_type = dt_mode_to_dft(mode & S_IFMT);
-
        th = dt_trans_create(env, ls->ls_osd);
        if (IS_ERR(th))
                GOTO(out, rc = PTR_ERR(th));
 
        th = dt_trans_create(env, ls->ls_osd);
        if (IS_ERR(th))
                GOTO(out, rc = PTR_ERR(th));
 
-       rc = local_object_declare_create(env, los, dto, &dti->dti_attr,
-                                        &dti->dti_dof, th);
+       rc = local_object_declare_create(env, los, dto, attr, dof, th);
        if (rc)
                GOTO(trans_stop, rc);
 
        if (rc)
                GOTO(trans_stop, rc);
 
@@ -351,8 +346,7 @@ struct dt_object *__local_file_create(const struct lu_env *env,
 
        CDEBUG(D_OTHER, "create new object "DFID"\n",
               PFID(lu_object_fid(&dto->do_lu)));
 
        CDEBUG(D_OTHER, "create new object "DFID"\n",
               PFID(lu_object_fid(&dto->do_lu)));
-       rc = local_object_create(env, los, dto, &dti->dti_attr,
-                                &dti->dti_dof, th);
+       rc = local_object_create(env, los, dto, attr, dof, th);
        if (rc)
                GOTO(unlock, rc);
        LASSERT(dt_object_exists(dto));
        if (rc)
                GOTO(unlock, rc);
        LASSERT(dt_object_exists(dto));
@@ -425,12 +419,18 @@ struct dt_object *local_file_find_or_create(const struct lu_env *env,
                dto = ERR_PTR(rc);
        else {
                rc = local_object_fid_generate(env, los, &dti->dti_fid);
                dto = ERR_PTR(rc);
        else {
                rc = local_object_fid_generate(env, los, &dti->dti_fid);
-               if (rc < 0)
+               if (rc < 0) {
                        dto = ERR_PTR(rc);
                        dto = ERR_PTR(rc);
-               else
+               } else {
+                       /* create the object */
+                       dti->dti_attr.la_valid  = LA_MODE;
+                       dti->dti_attr.la_mode   = mode;
+                       dti->dti_dof.dof_type   = dt_mode_to_dft(mode & S_IFMT);
                        dto = __local_file_create(env, &dti->dti_fid, los,
                                                  dt2ls_dev(los->los_dev),
                        dto = __local_file_create(env, &dti->dti_fid, los,
                                                  dt2ls_dev(los->los_dev),
-                                                 parent, name, mode);
+                                                 parent, name, &dti->dti_attr,
+                                                 &dti->dti_dof);
+               }
        }
        return dto;
 }
        }
        return dto;
 }
@@ -462,17 +462,125 @@ struct dt_object *local_file_find_or_create_with_fid(const struct lu_env *env,
                struct ls_device *ls;
 
                ls = ls_device_get(env, dt);
                struct ls_device *ls;
 
                ls = ls_device_get(env, dt);
-               if (IS_ERR(ls))
+               if (IS_ERR(ls)) {
                        dto = ERR_PTR(PTR_ERR(ls));
                        dto = ERR_PTR(PTR_ERR(ls));
-               else
+               } else {
+                       /* create the object */
+                       dti->dti_attr.la_valid  = LA_MODE;
+                       dti->dti_attr.la_mode   = mode;
+                       dti->dti_dof.dof_type   = dt_mode_to_dft(mode & S_IFMT);
                        dto = __local_file_create(env, fid, NULL, ls, parent,
                        dto = __local_file_create(env, fid, NULL, ls, parent,
-                                                 name, mode);
-               ls_device_put(env, ls);
+                                                 name, &dti->dti_attr,
+                                                 &dti->dti_dof);
+                       /* ls_device_put() will finalize the ls device, we
+                        * have to open the object in other device stack */
+                       if (!IS_ERR(dto)) {
+                               dti->dti_fid = dto->do_lu.lo_header->loh_fid;
+                               lu_object_put_nocache(env, &dto->do_lu);
+                               dto = dt_locate(env, dt, &dti->dti_fid);
+                       }
+                       ls_device_put(env, ls);
+               }
        }
        return dto;
 }
 EXPORT_SYMBOL(local_file_find_or_create_with_fid);
 
        }
        return dto;
 }
 EXPORT_SYMBOL(local_file_find_or_create_with_fid);
 
+/*
+ * Look up and create (if it does not exist) a local named index file in parent
+ * directory.
+ */
+struct dt_object *local_index_find_or_create(const struct lu_env *env,
+                                            struct local_oid_storage *los,
+                                            struct dt_object *parent,
+                                            const char *name, __u32 mode,
+                                            const struct dt_index_features *ft)
+{
+       struct dt_thread_info   *dti = dt_info(env);
+       struct dt_object        *dto;
+       int                      rc;
+
+       LASSERT(parent);
+
+       rc = dt_lookup_dir(env, parent, name, &dti->dti_fid);
+       if (rc == 0) {
+               /* name is found, get the object */
+               dto = ls_locate(env, dt2ls_dev(los->los_dev), &dti->dti_fid);
+       } else if (rc != -ENOENT) {
+               dto = ERR_PTR(rc);
+       } else {
+               rc = local_object_fid_generate(env, los, &dti->dti_fid);
+               if (rc < 0) {
+                       dto = ERR_PTR(rc);
+               } else {
+                       /* create the object */
+                       dti->dti_attr.la_valid          = LA_MODE;
+                       dti->dti_attr.la_mode           = mode;
+                       dti->dti_dof.dof_type           = DFT_INDEX;
+                       dti->dti_dof.u.dof_idx.di_feat  = ft;
+                       dto = __local_file_create(env, &dti->dti_fid, los,
+                                                 dt2ls_dev(los->los_dev),
+                                                 parent, name, &dti->dti_attr,
+                                                 &dti->dti_dof);
+               }
+       }
+       return dto;
+
+}
+EXPORT_SYMBOL(local_index_find_or_create);
+
+struct dt_object *
+local_index_find_or_create_with_fid(const struct lu_env *env,
+                                   struct dt_device *dt,
+                                   const struct lu_fid *fid,
+                                   struct dt_object *parent,
+                                   const char *name, __u32 mode,
+                                   const struct dt_index_features *ft)
+{
+       struct dt_thread_info   *dti = dt_info(env);
+       struct dt_object        *dto;
+       int                      rc;
+
+       LASSERT(parent);
+
+       rc = dt_lookup_dir(env, parent, name, &dti->dti_fid);
+       if (rc == 0) {
+               /* name is found, get the object */
+               if (!lu_fid_eq(fid, &dti->dti_fid))
+                       dto = ERR_PTR(-EINVAL);
+               else
+                       dto = dt_locate(env, dt, fid);
+       } else if (rc != -ENOENT) {
+               dto = ERR_PTR(rc);
+       } else {
+               struct ls_device *ls;
+
+               ls = ls_device_get(env, dt);
+               if (IS_ERR(ls)) {
+                       dto = ERR_PTR(PTR_ERR(ls));
+               } else {
+                       /* create the object */
+                       dti->dti_attr.la_valid          = LA_MODE;
+                       dti->dti_attr.la_mode           = mode;
+                       dti->dti_dof.dof_type           = DFT_INDEX;
+                       dti->dti_dof.u.dof_idx.di_feat  = ft;
+                       dto = __local_file_create(env, fid, NULL, ls, parent,
+                                                 name, &dti->dti_attr,
+                                                 &dti->dti_dof);
+                       /* ls_device_put() will finalize the ls device, we
+                        * have to open the object in other device stack */
+                       if (!IS_ERR(dto)) {
+                               dti->dti_fid = dto->do_lu.lo_header->loh_fid;
+                               lu_object_put_nocache(env, &dto->do_lu);
+                               dto = dt_locate(env, dt, &dti->dti_fid);
+                       }
+                       ls_device_put(env, ls);
+               }
+       }
+       return dto;
+}
+EXPORT_SYMBOL(local_index_find_or_create_with_fid);
+
 static struct local_oid_storage *dt_los_find(struct ls_device *ls, __u64 seq)
 {
        struct local_oid_storage *los, *ret = NULL;
 static struct local_oid_storage *dt_los_find(struct ls_device *ls, __u64 seq)
 {
        struct local_oid_storage *los, *ret = NULL;
@@ -682,4 +790,3 @@ void local_oid_storage_fini(const struct lu_env *env,
        ls_device_put(env, ls);
 }
 EXPORT_SYMBOL(local_oid_storage_fini);
        ls_device_put(env, ls);
 }
 EXPORT_SYMBOL(local_oid_storage_fini);
-
index b1a2f17..32ca46e 100644 (file)
@@ -778,22 +778,6 @@ int osd_trans_start(const struct lu_env *env, struct dt_device *d,
                 lu_device_get(&d->dd_lu_dev);
                 oh->ot_dev_link = lu_ref_add(&d->dd_lu_dev.ld_reference,
                                              "osd-tx", th);
                 lu_device_get(&d->dd_lu_dev);
                 oh->ot_dev_link = lu_ref_add(&d->dd_lu_dev.ld_reference,
                                              "osd-tx", th);
-
-                /*
-                 * XXX: current rule is that we first start tx,
-                 *      then lock object(s), but we can't use
-                 *      this rule for data (due to locking specifics
-                 *      in ldiskfs). also in long-term we'd like to
-                 *      use usually-used (locks;tx) ordering. so,
-                 *      UGLY thing is that we'll use one ordering for
-                 *      data (ofd) and reverse ordering for metadata
-                 *      (mdd). then at some point we'll fix the latter
-                 */
-               if (dev->od_is_md) {
-                        LASSERT(oti->oti_r_locks == 0);
-                        LASSERT(oti->oti_w_locks == 0);
-                }
-
                 oti->oti_txns++;
                 rc = 0;
         } else {
                 oti->oti_txns++;
                 rc = 0;
         } else {
@@ -836,20 +820,6 @@ static int osd_trans_stop(const struct lu_env *env, struct thandle *th)
 
                 LASSERT(oti->oti_txns == 1);
                 oti->oti_txns--;
 
                 LASSERT(oti->oti_txns == 1);
                 oti->oti_txns--;
-                /*
-                 * XXX: current rule is that we first start tx,
-                 *      then lock object(s), but we can't use
-                 *      this rule for data (due to locking specifics
-                 *      in ldiskfs). also in long-term we'd like to
-                 *      use usually-used (locks;tx) ordering. so,
-                 *      UGLY thing is that we'll use one ordering for
-                 *      data (ofd) and reverse ordering for metadata
-                 *      (mdd). then at some point we'll fix the latter
-                 */
-               if (osd_dt_dev(th->th_dev)->od_is_md) {
-                        LASSERT(oti->oti_r_locks == 0);
-                        LASSERT(oti->oti_w_locks == 0);
-                }
                 rc = dt_txn_hook_stop(env, th);
                 if (rc != 0)
                         CERROR("Failure in transaction hook: %d\n", rc);
                 rc = dt_txn_hook_stop(env, th);
                 if (rc != 0)
                         CERROR("Failure in transaction hook: %d\n", rc);
@@ -4741,8 +4711,8 @@ static int osd_process_config(const struct lu_env *env,
                 break;
         case LCFG_CLEANUP:
                lu_dev_del_linkage(d->ld_site, d);
                 break;
         case LCFG_CLEANUP:
                lu_dev_del_linkage(d->ld_site, d);
-               err = 0;
-                break;
+               err = osd_shutdown(env, o);
+               break;
         default:
                 err = -ENOSYS;
         }
         default:
                 err = -ENOSYS;
         }
@@ -4812,18 +4782,9 @@ static int osd_prepare(const struct lu_env *env, struct lu_device *pdev,
                        struct lu_device *dev)
 {
        struct osd_device *osd = osd_dev(dev);
                        struct lu_device *dev)
 {
        struct osd_device *osd = osd_dev(dev);
-       int                result;
+       int                result = 0;
        ENTRY;
 
        ENTRY;
 
-       /* 2. setup quota slave instance */
-       osd->od_quota_slave = qsd_init(env, osd->od_svname, &osd->od_dt_dev,
-                                      osd->od_proc_entry);
-       if (IS_ERR(osd->od_quota_slave)) {
-               result = PTR_ERR(osd->od_quota_slave);
-               osd->od_quota_slave = NULL;
-               RETURN(result);
-       }
-
 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 3, 55, 0)
        /* Unfortunately, the current MDD implementation relies on some specific
         * code to be executed in the OSD layer. Since OFD now also uses the OSD
 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 3, 55, 0)
        /* Unfortunately, the current MDD implementation relies on some specific
         * code to be executed in the OSD layer. Since OFD now also uses the OSD
@@ -4841,12 +4802,23 @@ static int osd_prepare(const struct lu_env *env, struct lu_device *pdev,
 #warning "all is_md checks must be removed from osd-ldiskfs"
 #endif
 
 #warning "all is_md checks must be removed from osd-ldiskfs"
 #endif
 
-        if (!osd->od_is_md)
-                RETURN(0);
+        if (osd->od_is_md) {
+               /* 1. setup local objects */
+               result = llo_local_objects_setup(env, lu2md_dev(pdev),
+                                                lu2dt_dev(dev));
+               if (result)
+                       RETURN(result);
+       }
 
 
-        /* 3. setup local objects */
-        result = llo_local_objects_setup(env, lu2md_dev(pdev), lu2dt_dev(dev));
-        RETURN(result);
+       /* 2. setup quota slave instance */
+       osd->od_quota_slave = qsd_init(env, osd->od_svname, &osd->od_dt_dev,
+                                      osd->od_proc_entry);
+       if (IS_ERR(osd->od_quota_slave)) {
+               result = PTR_ERR(osd->od_quota_slave);
+               osd->od_quota_slave = NULL;
+       }
+
+       RETURN(result);
 }
 
 static const struct lu_object_operations osd_lu_obj_ops = {
 }
 
 static const struct lu_object_operations osd_lu_obj_ops = {
index 52c2e22..309f941 100644 (file)
@@ -1,7 +1,8 @@
 MODULES := lquota
 
 MODULES := lquota
 
-quota-objs := quota_check.o quota_context.o quota_ctl.o quota_interface.o
-quota-objs += quota_master.o quota_adjust_qunit.o lproc_quota.o lquota_lib.o
+quota-objs := lquota_lib.o lquota_disk.o lproc_quota.o
+quota-objs += quota_check.o quota_context.o quota_ctl.o quota_interface.o
+quota-objs += quota_master.o quota_adjust_qunit.o
 
 qsd-objs := qsd_lib.o
 
 
 qsd-objs := qsd_lib.o
 
diff --git a/lustre/quota/lquota_disk.c b/lustre/quota/lquota_disk.c
new file mode 100644 (file)
index 0000000..6421bc4
--- /dev/null
@@ -0,0 +1,723 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 021110-1307, USA
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2012 Intel, Inc.
+ * Use is subject to license terms.
+ *
+ * Author: Johann Lombardi <johann.lombardi@intel.com>
+ * Author: Niu    Yawei    <yawei.niu@intel.com>
+ */
+
+/*
+ * The disk API is used by both the QMT and QSD to access/update on-disk index
+ * files. The API consists of the following functions:
+ *
+ * - lquota_disk_dir_find_create: look-up quota directory, create it if not
+ *                               found.
+ * - lquota_disk_glb_find_create: look-up global index file, create it if not
+ *                               found.
+ * - lquota_disk_slv_find:       look-up a slave index file.
+ * - lquota_disk_slv_find_create: look-up a slave index file. Allocate a FID if
+ *                               required and create the index file on disk if
+ *                               it does not exist.
+ * - lquota_disk_for_each_slv:   iterate over all existing slave index files
+ * - lquota_disk_read:           read quota settings from a index file
+ * - lquota_disk_declare_write:          reserve credits to update a record in an index
+ *                               file
+ * - lquota_disk_write:                  update a record in an index file
+ * - lquota_disk_update_ver:     update version of an index file
+ */
+
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+
+#define DEBUG_SUBSYSTEM S_LQUOTA
+
+#include "lquota_internal.h"
+
+#define LQUOTA_MODE (S_IFREG | S_IRUGO | S_IWUSR)
+
+/*
+ * Helper function looking up & creating if not found an index file with a
+ * dynamic fid.
+ */
+static struct dt_object *
+lquota_disk_find_create(const struct lu_env *env, struct dt_device *dev,
+                       struct dt_object *parent, struct lu_fid *fid,
+                       const struct dt_index_features *idx_feat,
+                       char *name)
+{
+       struct lquota_thread_info       *qti = lquota_info(env);
+       struct dt_object                *obj;
+       struct local_oid_storage        *los;
+       int                              rc;
+       ENTRY;
+
+       /* Set up local storage */
+       rc = local_oid_storage_init(env, dev, fid, &los);
+       if (rc)
+               RETURN(ERR_PTR(rc));
+
+       /* lookup/create slave index file */
+       obj = local_index_find_or_create(env, los, parent, name, LQUOTA_MODE,
+                                        idx_feat);
+       if (IS_ERR(obj))
+               GOTO(out, obj);
+
+       /* local_oid_storage_fini() will finalize the local storage device,
+        * we have to open the object in another device stack */
+       qti->qti_fid = obj->do_lu.lo_header->loh_fid;
+       lu_object_put_nocache(env, &obj->do_lu);
+       obj = dt_locate(env, dev, &qti->qti_fid);
+       if (IS_ERR(obj))
+               GOTO(out, obj);
+
+       /* install index operation vector */
+       if (obj->do_index_ops == NULL) {
+               rc = obj->do_ops->do_index_try(env, obj, idx_feat);
+               if (rc) {
+                       CERROR("%s: fail to setup index operations for "DFID
+                              " rc:%d\n", dev->dd_lu_dev.ld_obd->obd_name,
+                              PFID(lu_object_fid(&obj->do_lu)), rc);
+                       lu_object_put(env, &obj->do_lu);
+                       obj = ERR_PTR(rc);
+               }
+       }
+out:
+       local_oid_storage_fini(env, los);
+       RETURN(obj);
+}
+
+/*
+ * helper function to generate the filename associated with a slave index file
+ */
+static inline int lquota_disk_slv_filename(struct lu_fid *glb_fid,
+                                          struct obd_uuid *uuid,
+                                          char *filename)
+{
+       char    *name, *uuid_str;
+
+       /* In most case, the uuid is NULL terminated */
+       if (uuid->uuid[sizeof(*uuid) - 1] != '\0') {
+               OBD_ALLOC(uuid_str, sizeof(*uuid));
+               if (uuid_str == NULL)
+                       RETURN(-ENOMEM);
+               memcpy(uuid_str, uuid->uuid, sizeof(*uuid) - 1);
+       } else {
+               uuid_str = (char *)uuid->uuid;
+       }
+
+       /* we strip the slave's UUID (in the form of fsname-OST0001_UUID) of
+        * the filesystem name in case this one is changed in the future */
+       name = strrchr(uuid_str, '-');
+       if (name == NULL) {
+               name = strrchr(uuid_str, ':');
+               if (name == NULL) {
+                       CERROR("Failed to extract extract filesystem "
+                              "name from UUID %s\n", uuid_str);
+                       if (uuid_str != uuid->uuid)
+                               OBD_FREE(uuid_str, sizeof(*uuid));
+                       return -EINVAL;
+               }
+       }
+       name++;
+
+       /* the filename is composed of the most signicant bits of the global
+        * FID, that's to say the oid which encodes the pool id, pool type and
+        * quota type, followed by the export UUID */
+       sprintf(filename, "0x%x-%s", glb_fid->f_oid, name);
+
+       if (uuid_str != uuid->uuid)
+               OBD_FREE(uuid_str, sizeof(*uuid));
+
+       return 0;
+}
+
+/*
+ * Set up quota directory (either "quota_master" or "quota_slave") for a QMT or
+ * QSD instance. This function is also used to create per-pool directory on
+ * the quota master.
+ * The directory is created with a local sequence if it does not exist already.
+ * This function is called at ->ldo_prepare time when the full device stack is
+ * configured.
+ *
+ * \param env  - is the environment passed by the caller
+ * \param dev  - is the dt_device where to create the quota directory
+ * \param parent  - is the parent directory. If not specified, the directory
+ *                  will be created under the root directory
+ * \param name - is the name of quota directory to be created
+ *
+ * \retval     - pointer to quota root dt_object on success, appropriate error
+ *               on failure
+ */
+struct dt_object *lquota_disk_dir_find_create(const struct lu_env *env,
+                                             struct dt_device *dev,
+                                             struct dt_object *parent,
+                                             const char *name)
+{
+       struct lquota_thread_info       *qti = lquota_info(env);
+       struct dt_object                *qt_dir = NULL;
+       struct local_oid_storage        *los = NULL;
+       int                              rc;
+       ENTRY;
+
+       /* Set up local storage to create the quota directory.
+        * We use the sequence reserved for local named objects */
+       lu_local_name_obj_fid(&qti->qti_fid, 1);
+       rc = local_oid_storage_init(env, dev, &qti->qti_fid, &los);
+       if (rc)
+               RETURN(ERR_PTR(rc));
+
+       if (parent == NULL) {
+               /* Fetch dt object associated with root directory */
+               rc = dt_root_get(env, dev, &qti->qti_fid);
+               if (rc)
+                       GOTO(out, rc);
+
+               parent = dt_locate_at(env, dev, &qti->qti_fid,
+                                     dev->dd_lu_dev.ld_site->ls_top_dev);
+               if (IS_ERR(parent))
+                       GOTO(out, rc = PTR_ERR(parent));
+       } else {
+               lu_object_get(&parent->do_lu);
+       }
+
+       /* create quota directory to be used for all quota index files */
+       qt_dir = local_file_find_or_create(env, los, parent, name, S_IFDIR |
+                                          S_IRUGO | S_IWUSR | S_IXUGO);
+       if (IS_ERR(qt_dir))
+               GOTO(out, rc = PTR_ERR(qt_dir));
+
+       /* local_oid_storage_fini() will finalize the local storage device,
+        * we have to open the object in another device stack */
+       qti->qti_fid = qt_dir->do_lu.lo_header->loh_fid;
+       lu_object_put_nocache(env, &qt_dir->do_lu);
+       qt_dir = dt_locate(env, dev, &qti->qti_fid);
+       if (IS_ERR(qt_dir))
+               GOTO(out, rc = PTR_ERR(qt_dir));
+
+       if (!dt_try_as_dir(env, qt_dir))
+               GOTO(out, rc = -ENOTDIR);
+       EXIT;
+out:
+       if (parent != NULL && !IS_ERR(parent))
+               lu_object_put(env, &parent->do_lu);
+       if (los != NULL)
+               local_oid_storage_fini(env, los);
+       if (rc) {
+               if (qt_dir != NULL && !IS_ERR(qt_dir))
+                       lu_object_put(env, &qt_dir->do_lu);
+               qt_dir = ERR_PTR(rc);
+       }
+       return qt_dir;
+}
+
+/*
+ * Look-up/create a global index file.
+ *
+ * \param env - is the environment passed by the caller
+ * \parap dev - is the dt_device where to lookup/create the global index file
+ * \param parent - is the parent directory where to create the global index if
+ *                 not found
+ * \param fid - is the fid of the global index to be looked up/created
+ * \parap local - indicates whether the index should be created with a local
+ *                generated fid or with \fid
+ *
+ * \retval     - pointer to the dt_object of the global index on success,
+ *               appropriate error on failure
+ */
+struct dt_object *lquota_disk_glb_find_create(const struct lu_env *env,
+                                             struct dt_device *dev,
+                                             struct dt_object *parent,
+                                             struct lu_fid *fid, bool local)
+{
+       struct lquota_thread_info       *qti = lquota_info(env);
+       struct dt_object                *glb_idx;
+       const struct dt_index_features  *idx_feat;
+       ENTRY;
+
+       CDEBUG(D_QUOTA, "look-up/create %sglobal idx file ("DFID")\n",
+              local ? "local " : "", PFID(fid));
+
+#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2,7,50,0)
+       /* we use different index feature for each quota type and target type
+        * for the time being. This is done for on-disk conversion from the old
+        * quota format. Once this is no longer required, we should just be
+        * using dt_quota_glb_features for all global index file */
+       idx_feat = glb_idx_feature(fid);
+#else
+#warning "remove old quota compatibility code"
+       idx_feat = &dt_quota_glb_features;
+#endif
+
+       /* the filename is composed of the most signicant bits of the FID,
+        * that's to say the oid which encodes the pool id, pool type and quota
+        * type */
+       sprintf(qti->qti_buf, "0x%x", fid->f_oid);
+
+       if (local) {
+               /* We use the sequence reserved for local named objects */
+               lu_local_name_obj_fid(&qti->qti_fid, 1);
+               glb_idx = lquota_disk_find_create(env, dev, parent,
+                                                 &qti->qti_fid, idx_feat,
+                                                 qti->qti_buf);
+       } else {
+               /* look-up/create global index on disk */
+               glb_idx = local_index_find_or_create_with_fid(env, dev, fid,
+                                                             parent,
+                                                             qti->qti_buf,
+                                                             LQUOTA_MODE,
+                                                             idx_feat);
+       }
+
+       if (IS_ERR(glb_idx))
+               CERROR("%s: failed to look-up/create idx file "DFID" rc:%ld "
+                      "local:%d\n", dev->dd_lu_dev.ld_obd->obd_name,
+                      PFID(fid), PTR_ERR(glb_idx), local);
+
+       RETURN(glb_idx);
+}
+
+/*
+ * Look-up a slave index file.
+ *
+ * \param env - is the environment passed by the caller
+ * \param dev - is the backend dt_device where to look-up/create the slave index
+ * \param parent - is the parent directory where to lookup the slave index
+ * \param glb_fid - is the fid of the global index file associated with this
+ *                  slave index.
+ * \param uuid    - is the uuid of slave which is (re)connecting to the master
+ *                  target
+ *
+ * \retval     - pointer to the dt_object of the slave index on success,
+ *               appropriate error on failure
+ */
+struct dt_object *lquota_disk_slv_find(const struct lu_env *env,
+                                      struct dt_device *dev,
+                                      struct dt_object *parent,
+                                      struct lu_fid *glb_fid,
+                                      struct obd_uuid *uuid)
+{
+       struct lquota_thread_info       *qti = lquota_info(env);
+       struct dt_object                *slv_idx;
+       int                              rc;
+       ENTRY;
+
+       LASSERT(uuid != NULL);
+
+       CDEBUG(D_QUOTA, "lookup slave index file for %s\n",
+              obd_uuid2str(uuid));
+
+       /* generate filename associated with the slave */
+       rc = lquota_disk_slv_filename(glb_fid, uuid, qti->qti_buf);
+       if (rc)
+               RETURN(ERR_PTR(rc));
+
+       /* lookup slave index file */
+       rc = dt_lookup_dir(env, parent, qti->qti_buf, &qti->qti_fid);
+       if (rc)
+                RETURN(ERR_PTR(rc));
+
+       /* name is found, get the object */
+       slv_idx = dt_locate(env, dev, &qti->qti_fid);
+       if (IS_ERR(slv_idx))
+               RETURN(slv_idx);
+
+       if (slv_idx->do_index_ops == NULL) {
+               rc = slv_idx->do_ops->do_index_try(env, slv_idx,
+                                                  &dt_quota_slv_features);
+               if (rc) {
+                       CERROR("%s: failed to setup slave index operations for "
+                              "%s, rc:%d\n", dev->dd_lu_dev.ld_obd->obd_name,
+                              obd_uuid2str(uuid), rc);
+                       lu_object_put(env, &slv_idx->do_lu);
+                       slv_idx = ERR_PTR(rc);
+               }
+       }
+
+       RETURN(slv_idx);
+}
+
+/*
+ * Look-up a slave index file. If the slave index isn't found:
+ * - if local is set to false, we allocate a FID from FID_SEQ_QUOTA sequence and
+ *   create the index.
+ * - otherwise, we create the index file with a local reserved FID (see
+ *   lquota_local_oid)
+ *
+ * \param env - is the environment passed by the caller
+ * \param dev - is the backend dt_device where to look-up/create the slave index
+ * \param parent - is the parent directory where to create the slave index if
+ *                 it does not exist already
+ * \param glb_fid - is the fid of the global index file associated with this
+ *                  slave index.
+ * \param uuid    - is the uuid of slave which is (re)connecting to the master
+ *                  target
+ * \param local   - indicate whether to use local reserved FID (LQUOTA_USR_OID
+ *                  & LQUOTA_GRP_OID) for the slave index creation or to
+ *                  allocate a new fid from sequence FID_SEQ_QUOTA
+ *
+ * \retval     - pointer to the dt_object of the slave index on success,
+ *               appropriate error on failure
+ */
+struct dt_object *lquota_disk_slv_find_create(const struct lu_env *env,
+                                             struct dt_device *dev,
+                                             struct dt_object *parent,
+                                             struct lu_fid *glb_fid,
+                                             struct obd_uuid *uuid,
+                                             bool local)
+{
+       struct lquota_thread_info       *qti = lquota_info(env);
+       struct dt_object                *slv_idx;
+       int                              rc;
+       ENTRY;
+
+       LASSERT(uuid != NULL);
+
+       CDEBUG(D_QUOTA, "lookup/create slave index file for %s\n",
+              obd_uuid2str(uuid));
+
+       /* generate filename associated with the slave */
+       rc = lquota_disk_slv_filename(glb_fid, uuid, qti->qti_buf);
+       if (rc)
+               RETURN(ERR_PTR(rc));
+
+       /* Slave indexes uses the FID_SEQ_QUOTA sequence since they can be read
+        * through the network */
+       qti->qti_fid.f_seq = FID_SEQ_QUOTA;
+       qti->qti_fid.f_ver = 0;
+       if (local) {
+               int type;
+
+               rc = lquota_extract_fid(glb_fid, NULL, NULL, &type);
+               if (rc)
+                       RETURN(ERR_PTR(rc));
+
+               /* use predefined fid in the reserved oid list */
+               qti->qti_fid.f_oid = (type == USRQUOTA) ? LQUOTA_USR_OID
+                                                       : LQUOTA_GRP_OID;
+
+               slv_idx = local_index_find_or_create_with_fid(env, dev,
+                                                             &qti->qti_fid,
+                                                             parent,
+                                                             qti->qti_buf,
+                                                             LQUOTA_MODE,
+                                                       &dt_quota_slv_features);
+       } else {
+               /* allocate fid dynamically if index does not exist already */
+               qti->qti_fid.f_oid = LQUOTA_GENERATED_OID;
+
+               /* lookup/create slave index file */
+               slv_idx = lquota_disk_find_create(env, dev, parent,
+                                                 &qti->qti_fid,
+                                                 &dt_quota_slv_features,
+                                                 qti->qti_buf);
+       }
+
+       RETURN(slv_idx);
+}
+
+/*
+ * Iterate over all slave index files associated with global index \glb_fid and
+ * invoke a callback function for each slave index file.
+ *
+ * \param env     - is the environment passed by the caller
+ * \param parent  - is the parent directory where the slave index files are
+ *                  stored
+ * \param glb_fid - is the fid of the global index file associated with the
+ *                  slave indexes to scan
+ * \param func    - is the callback function to call each time a slave index
+ *                  file is found
+ * \param arg     - is an opaq argument passed to the callback function \func
+ */
+int lquota_disk_for_each_slv(const struct lu_env *env, struct dt_object *parent,
+                            struct lu_fid *glb_fid, lquota_disk_slv_cb_t func,
+                            void *arg)
+{
+       struct lquota_thread_info       *qti = lquota_info(env);
+       struct dt_it                    *it;
+       const struct dt_it_ops          *iops;
+       char                            *name;
+       int                              rc;
+       ENTRY;
+
+       OBD_ALLOC(name, sizeof("0x00000000-"));
+       if (name == NULL)
+               RETURN(-ENOMEM);
+
+       /* filename associated with slave index files are prefixed with the most
+        * signicant bits of the global FID */
+       sprintf(name, "0x%x-", glb_fid->f_oid);
+
+       iops = &parent->do_index_ops->dio_it;
+       it = iops->init(env, parent, 0, BYPASS_CAPA);
+       if (IS_ERR(it)) {
+               OBD_FREE(name, sizeof("0x00000000-"));
+               RETURN(PTR_ERR(it));
+       }
+
+       rc = iops->load(env, it, 0);
+       if (rc == 0) {
+               /*
+                * Iterator didn't find record with exactly the key requested.
+                *
+                * It is currently either
+                *
+                *     - positioned above record with key less than
+                *     requested---skip it.
+                *
+                *     - or not positioned at all (is in IAM_IT_SKEWED
+                *     state)---position it on the next item.
+                */
+               rc = iops->next(env, it);
+       } else if (rc > 0)
+               rc = 0;
+
+       while (rc == 0) {
+               struct dt_key   *key;
+               int              len;
+
+               len = iops->key_size(env, it);
+               /* IAM iterator can return record with zero len. */
+               if (len == 0 || len <= strlen(name) || len >= LQUOTA_NAME_MAX)
+                       goto next;
+
+               key = iops->key(env, it);
+               if (IS_ERR(key)) {
+                       rc = PTR_ERR(key);
+                       break;
+               }
+
+               if (strncmp((char *)key, name, strlen(name)) != 0)
+                       goto next;
+
+               /* ldiskfs OSD returns filename as stored in directory entry
+                * which does not end up with '\0' */
+               memcpy(&qti->qti_buf, key, len);
+               qti->qti_buf[len] = '\0';
+
+               /* lookup fid associated with this slave index file */
+               rc = dt_lookup_dir(env, parent, qti->qti_buf, &qti->qti_fid);
+               if (rc)
+                       break;
+
+               if (qti->qti_fid.f_seq != FID_SEQ_QUOTA)
+                       goto next;
+
+               rc = func(env, glb_fid, (char *)key, &qti->qti_fid, arg);
+               if (rc)
+                       break;
+next:
+               do {
+                       rc = iops->next(env, it);
+               } while (rc == -ESTALE);
+       }
+
+       iops->put(env, it);
+       iops->fini(env, it);
+       OBD_FREE(name, sizeof("0x00000000-"));
+       if (rc > 0)
+               rc = 0;
+       RETURN(rc);
+}
+
+/*
+ * Retrieve quota settings from disk for a particular identifier.
+ *
+ * \param env - is the environment passed by the caller
+ * \param obj - is the on-disk index where quota settings are stored.
+ * \param id  - is the key to be updated
+ * \param rec - is the output record where to store quota settings.
+ *
+ * \retval    - 0 on success, appropriate error on failure
+ */
+int lquota_disk_read(const struct lu_env *env, struct dt_object *obj,
+                    union lquota_id *id, struct dt_rec *rec)
+{
+       int     rc;
+       ENTRY;
+
+       LASSERT(dt_object_exists(obj));
+       LASSERT(obj->do_index_ops != NULL);
+
+       /* lookup on-disk record from index file */
+       dt_read_lock(env, obj, 0);
+       rc = dt_lookup(env, obj, rec, (struct dt_key *)&id->qid_uid,
+                      BYPASS_CAPA);
+       dt_read_unlock(env, obj);
+
+       RETURN(rc);
+}
+
+/*
+ * Reserve enough credits to update a record in a quota index file.
+ *
+ * \param env - is the environment passed by the caller
+ * \param th  - is the transaction to use for disk writes
+ * \param obj - is the on-disk index where quota settings are stored.
+ * \param id  - is the key to be updated
+ *
+ * \retval    - 0 on success, appropriate error on failure
+ */
+int lquota_disk_declare_write(const struct lu_env *env, struct thandle *th,
+                             struct dt_object *obj, union lquota_id *id)
+{
+       struct lquota_thread_info       *qti = lquota_info(env);
+       struct dt_key                   *key = (struct dt_key *)&id->qid_uid;
+       int                              rc;
+       ENTRY;
+
+       LASSERT(dt_object_exists(obj));
+       LASSERT(obj->do_index_ops != NULL);
+
+       /* speculative delete declaration in case there is already an existing
+        * record in the index */
+       rc = dt_declare_delete(env, obj, key, th);
+       if (rc)
+               RETURN(rc);
+
+       /* declare insertion of updated record */
+       rc = dt_declare_insert(env, obj, (struct dt_rec *)&qti->qti_rec, key,
+                              th);
+       if (rc)
+               RETURN(rc);
+
+       /* we might have to update the version of the global index too */
+       rc = dt_declare_version_set(env, obj, th);
+
+       RETURN(rc);
+}
+
+/*
+ * Update a record in a quota index file.
+ *
+ * \param env - is the environment passed by the caller
+ * \param th  - is the transaction to use for disk writes
+ * \param obj - is the on-disk index to be updated.
+ * \param id  - is the key to be updated
+ * \param rec - is the input record containing the new quota settings.
+ * \param flags - can be LQUOTA_BUMP_VER or LQUOTA_SET_VER.
+ * \param ver   - is the new version of the index if LQUOTA_SET_VER is set or is
+ *                used to return the new version of the index when
+ *                LQUOTA_BUMP_VER is set.
+ *
+ * \retval    - 0 on success, appropriate error on failure
+ */
+int lquota_disk_write(const struct lu_env *env, struct thandle *th,
+                     struct dt_object *obj, union lquota_id *id,
+                     struct dt_rec *rec, __u32 flags, __u64 *ver)
+{
+       struct lquota_thread_info       *qti = lquota_info(env);
+       struct dt_key                   *key = (struct dt_key *)&id->qid_uid;
+       int                              rc;
+       ENTRY;
+
+       LASSERT(dt_object_exists(obj));
+       LASSERT(obj->do_index_ops != NULL);
+
+       /* lock index */
+       dt_write_lock(env, obj, 0);
+
+       /* check whether there is already an existing record for this ID */
+       rc = dt_lookup(env, obj, (struct dt_rec *)&qti->qti_rec, key,
+                      BYPASS_CAPA);
+       if (rc == 0) {
+               /* delete existing record in order to replace it */
+               rc = dt_delete(env, obj, key, th, BYPASS_CAPA);
+               if (rc)
+                       GOTO(out, rc);
+       } else if (rc == -ENOENT) {
+               /* probably first insert */
+               rc = 0;
+       } else {
+               GOTO(out, rc);
+       }
+
+       if (rec != NULL) {
+               /* insert record with updated quota settings */
+               rc = dt_insert(env, obj, rec, key, th, BYPASS_CAPA, 1);
+               if (rc) {
+                       /* try to insert the old one */
+                       rc = dt_insert(env, obj, (struct dt_rec *)&qti->qti_rec,
+                                      key, th, BYPASS_CAPA, 1);
+                       LASSERTF(rc == 0, "failed to insert record in quota "
+                                "index "DFID,
+                                PFID(lu_object_fid(&obj->do_lu)));
+                       GOTO(out, rc);
+               }
+       }
+
+       if (flags != 0) {
+               LASSERT(ver);
+               if (flags & LQUOTA_BUMP_VER) {
+                       /* caller wants to bump the version, let's first read
+                        * it */
+                       *ver = dt_version_get(env, obj);
+                       (*ver)++;
+               } else {
+                       LASSERT(flags & LQUOTA_SET_VER);
+               }
+               dt_version_set(env, obj, *ver, th);
+       }
+
+       EXIT;
+out:
+       dt_write_unlock(env, obj);
+       return rc;
+}
+
+/*
+ * Update version of an index file
+ *
+ * \param env - is the environment passed by the caller
+ * \param dev - is the backend dt device storing the index file
+ * \param obj - is the on-disk index that should be updated
+ * \param ver - is the new version
+ */
+int lquota_disk_update_ver(const struct lu_env *env, struct dt_device *dev,
+                          struct dt_object *obj, __u64 ver)
+{
+       struct thandle  *th;
+       int              rc;
+       ENTRY;
+
+       th = dt_trans_create(env, dev);
+       if (IS_ERR(th))
+               RETURN(PTR_ERR(th));
+
+       rc = dt_declare_version_set(env, obj, th);
+       if (rc)
+               GOTO(out, rc);
+
+       rc = dt_trans_start_local(env, dev, th);
+       if (rc)
+               GOTO(out, rc);
+       th->th_sync = 1;
+
+       dt_version_set(env, obj, ver, th);
+       EXIT;
+out:
+       dt_trans_stop(env, dev, th);
+       return rc;
+}
index 4e916f5..3db38e0 100644 (file)
@@ -65,6 +65,9 @@ struct lquota_thread_info {
 #define qti_acct_rec   qti_rec.lqr_acct_rec
 #define qti_slv_rec    qti_rec.lqr_slv_rec
 
 #define qti_acct_rec   qti_rec.lqr_acct_rec
 #define qti_slv_rec    qti_rec.lqr_slv_rec
 
+#define LQUOTA_BUMP_VER        0x1
+#define LQUOTA_SET_VER 0x2
+
 extern struct lu_context_key lquota_thread_key;
 
 /* extract lquota_threa_info context from environment */
 extern struct lu_context_key lquota_thread_key;
 
 /* extract lquota_threa_info context from environment */
@@ -74,6 +77,10 @@ struct lquota_thread_info *lquota_info(const struct lu_env *env)
        struct lquota_thread_info       *info;
 
        info = lu_context_key_get(&env->le_ctx, &lquota_thread_key);
        struct lquota_thread_info       *info;
 
        info = lu_context_key_get(&env->le_ctx, &lquota_thread_key);
+       if (info == NULL) {
+               lu_env_refill((struct lu_env *)env);
+               info = lu_context_key_get(&env->le_ctx, &lquota_thread_key);
+       }
        LASSERT(info);
        return info;
 }
        LASSERT(info);
        return info;
 }
@@ -83,6 +90,37 @@ struct dt_object *acct_obj_lookup(const struct lu_env *, struct dt_device *,
                                  int);
 void lquota_generate_fid(struct lu_fid *, int, int, int);
 int lquota_extract_fid(struct lu_fid *, int *, int *, int *);
                                  int);
 void lquota_generate_fid(struct lu_fid *, int, int, int);
 int lquota_extract_fid(struct lu_fid *, int *, int *, int *);
+const struct dt_index_features *glb_idx_feature(struct lu_fid *);
+
+/* lquota_disk.c */
+struct dt_object *lquota_disk_dir_find_create(const struct lu_env *,
+                                             struct dt_device *,
+                                             struct dt_object *, const char *);
+struct dt_object *lquota_disk_glb_find_create(const struct lu_env *,
+                                             struct dt_device *,
+                                             struct dt_object *,
+                                             struct lu_fid *, bool);
+struct dt_object *lquota_disk_slv_find_create(const struct lu_env *,
+                                             struct dt_device *,
+                                             struct dt_object *,
+                                             struct lu_fid *,
+                                             struct obd_uuid *, bool);
+typedef int (*lquota_disk_slv_cb_t) (const struct lu_env *, struct lu_fid *,
+                                    char *, struct lu_fid *, void *);
+int lquota_disk_for_each_slv(const struct lu_env *, struct dt_object *,
+                            struct lu_fid *, lquota_disk_slv_cb_t, void *);
+struct dt_object *lquota_disk_slv_find(const struct lu_env *,
+                                      struct dt_device *, struct dt_object *,
+                                      struct lu_fid *, struct obd_uuid *);
+int lquota_disk_read(const struct lu_env *, struct dt_object *,
+                    union lquota_id *, struct dt_rec *);
+int lquota_disk_declare_write(const struct lu_env *, struct thandle *,
+                             struct dt_object *, union lquota_id *);
+int lquota_disk_write(const struct lu_env *, struct thandle *,
+                     struct dt_object *, union lquota_id *, struct dt_rec *,
+                     __u32, __u64 *);
+int lquota_disk_update_ver(const struct lu_env *, struct dt_device *,
+                          struct dt_object *, __u64);
 
 /* lproc_quota.c */
 extern struct file_operations lprocfs_quota_seq_fops;
 
 /* lproc_quota.c */
 extern struct file_operations lprocfs_quota_seq_fops;
index 5278818..9d51049 100644 (file)
@@ -280,6 +280,49 @@ int lquota_extract_fid(struct lu_fid *fid, int *pool_id, int *pool_type,
        RETURN(0);
 }
 
        RETURN(0);
 }
 
+#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2,7,50,0)
+/* Index features supported by the global index objects.
+ * We actually use one dt_index_features structure for each quota combination
+ * of quota type x [inode, block] to allow the ldiskfs OSD to recognize those
+ * objects and to handle the conversion from the old administrative quota file
+ * format */
+struct dt_index_features dt_quota_iusr_features;
+EXPORT_SYMBOL(dt_quota_iusr_features);
+struct dt_index_features dt_quota_busr_features;
+EXPORT_SYMBOL(dt_quota_busr_features);
+struct dt_index_features dt_quota_igrp_features;
+EXPORT_SYMBOL(dt_quota_igrp_features);
+struct dt_index_features dt_quota_bgrp_features;
+EXPORT_SYMBOL(dt_quota_bgrp_features);
+
+/**
+ * Helper routine returning the right index feature structure to be used
+ * depending on the FID of the global index.
+ */
+const struct dt_index_features *glb_idx_feature(struct lu_fid *fid)
+{
+       int     res_type, quota_type, rc;
+
+       rc = lquota_extract_fid(fid, NULL, &res_type, &quota_type);
+       if (rc)
+               return ERR_PTR(rc);
+
+       if (quota_type == USRQUOTA) {
+               if (res_type == LQUOTA_RES_MD)
+                       return &dt_quota_iusr_features;
+               else
+                       return &dt_quota_busr_features;
+       } else {
+               if (res_type == LQUOTA_RES_MD)
+                       return &dt_quota_igrp_features;
+               else
+                       return &dt_quota_bgrp_features;
+       }
+}
+#else
+#warning "remove old quota compatibility code"
+#endif
+
 static int __init init_lquota(void)
 {
        int     rc;
 static int __init init_lquota(void)
 {
        int     rc;
@@ -293,6 +336,13 @@ static int __init init_lquota(void)
        lquota_key_init_generic(&lquota_thread_key, NULL);
        lu_context_key_register(&lquota_thread_key);
 
        lquota_key_init_generic(&lquota_thread_key, NULL);
        lu_context_key_register(&lquota_thread_key);
 
+#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2,7,50,0)
+       dt_quota_iusr_features = dt_quota_busr_features = dt_quota_glb_features;
+       dt_quota_igrp_features = dt_quota_bgrp_features = dt_quota_glb_features;
+#else
+#warning "remove old quota compatibility code"
+#endif
+
        return 0;
 }
 
        return 0;
 }
 
index 6a4aa6e..2ffeb3f 100644 (file)
@@ -44,6 +44,9 @@ struct qsd_instance {
        /* name of service which created this qsd instance */
        char                     qsd_svname[MAX_OBD_NAME];
 
        /* name of service which created this qsd instance */
        char                     qsd_svname[MAX_OBD_NAME];
 
+       /* pool ID is always 0 for now */
+       int                      qsd_pool_id;
+
        /* dt_device associated with this qsd instance */
        struct dt_device        *qsd_dev;
 
        /* dt_device associated with this qsd instance */
        struct dt_device        *qsd_dev;
 
@@ -51,6 +54,9 @@ struct qsd_instance {
         * are exported */
        cfs_proc_dir_entry_t    *qsd_proc;
 
         * are exported */
        cfs_proc_dir_entry_t    *qsd_proc;
 
+       /* on-disk directory where to store index files for this qsd instance */
+       struct dt_object        *qsd_root;
+
        /* We create 2 quota slave instances:
         * - one for user quota
         * - one for group quota
        /* We create 2 quota slave instances:
         * - one for user quota
         * - one for group quota
@@ -58,6 +64,9 @@ struct qsd_instance {
         * This will have to be revisited if new quota types are added in the
         * future. For the time being, we can just use an array. */
        struct qsd_qtype_info   *qsd_type_array[MAXQUOTAS];
         * This will have to be revisited if new quota types are added in the
         * future. For the time being, we can just use an array. */
        struct qsd_qtype_info   *qsd_type_array[MAXQUOTAS];
+
+       unsigned long            qsd_is_md:1,    /* managing quota for mdt */
+                                qsd_stopping:1; /* qsd_instance is stopping */
 };
 
 /*
 };
 
 /*
@@ -70,11 +79,22 @@ struct qsd_qtype_info {
         * immutable after creation. */
        int                      qqi_qtype;
 
         * immutable after creation. */
        int                      qqi_qtype;
 
+       /* Global index FID to use for this quota type */
+       struct lu_fid            qqi_fid;
+
        /* back pointer to qsd device
         * immutable after creation. */
        struct qsd_instance     *qqi_qsd;
 
        /* Local index files storing quota settings for this quota type */
        struct dt_object        *qqi_acct_obj; /* accounting object */
        /* back pointer to qsd device
         * immutable after creation. */
        struct qsd_instance     *qqi_qsd;
 
        /* Local index files storing quota settings for this quota type */
        struct dt_object        *qqi_acct_obj; /* accounting object */
+       struct dt_object        *qqi_slv_obj;  /* slave index copy */
+       struct dt_object        *qqi_glb_obj;  /* global index copy */
+
+       /* Current object versions */
+       __u64                    qqi_slv_ver; /* slave index version */
+       __u64                    qqi_glb_ver; /* global index version */
 };
 };
+
+#define QSD_RES_TYPE(qsd) ((qsd)->qsd_is_md ? LQUOTA_RES_MD : LQUOTA_RES_DT)
 #endif /* _QSD_INTERNAL_H */
 #endif /* _QSD_INTERNAL_H */
index 0ba8622..09efa51 100644 (file)
@@ -50,8 +50,11 @@ static int lprocfs_qsd_rd_state(char *page, char **start, off_t off,
 
        return snprintf(page, count,
                        "target name:    %s\n"
 
        return snprintf(page, count,
                        "target name:    %s\n"
+                       "pool ID:        %d\n"
+                       "type:           %s\n"
                        "quota enabled:  none\n",
                        "quota enabled:  none\n",
-                       qsd->qsd_svname);
+                       qsd->qsd_svname, qsd->qsd_pool_id,
+                       qsd->qsd_is_md ? "md" : "dt");
 }
 
 static struct lprocfs_vars lprocfs_quota_qsd_vars[] = {
 }
 
 static struct lprocfs_vars lprocfs_quota_qsd_vars[] = {
@@ -86,6 +89,20 @@ static void qsd_qtype_fini(const struct lu_env *env, struct qsd_instance *qsd,
                qqi->qqi_acct_obj = NULL;
        }
 
                qqi->qqi_acct_obj = NULL;
        }
 
+       /* release slv index */
+       if (qqi->qqi_slv_obj != NULL && !IS_ERR(qqi->qqi_slv_obj)) {
+               lu_object_put(env, &qqi->qqi_slv_obj->do_lu);
+               qqi->qqi_slv_obj = NULL;
+               qqi->qqi_slv_ver = 0;
+       }
+
+       /* release global index */
+       if (qqi->qqi_glb_obj != NULL && !IS_ERR(qqi->qqi_glb_obj)) {
+               lu_object_put(env, &qqi->qqi_glb_obj->do_lu);
+               qqi->qqi_glb_obj = NULL;
+               qqi->qqi_glb_ver = 0;
+       }
+
        OBD_FREE_PTR(qqi);
        EXIT;
 }
        OBD_FREE_PTR(qqi);
        EXIT;
 }
@@ -108,6 +125,7 @@ static int qsd_qtype_init(const struct lu_env *env, struct qsd_instance *qsd,
 {
        struct qsd_qtype_info   *qqi;
        int                      rc;
 {
        struct qsd_qtype_info   *qqi;
        int                      rc;
+       struct obd_uuid          uuid;
        ENTRY;
 
        LASSERT(qsd->qsd_type_array[qtype] == NULL);
        ENTRY;
 
        LASSERT(qsd->qsd_type_array[qtype] == NULL);
@@ -121,6 +139,8 @@ static int qsd_qtype_init(const struct lu_env *env, struct qsd_instance *qsd,
        /* set backpointer and other parameters */
        qqi->qqi_qsd   = qsd;
        qqi->qqi_qtype = qtype;
        /* set backpointer and other parameters */
        qqi->qqi_qsd   = qsd;
        qqi->qqi_qtype = qtype;
+       lquota_generate_fid(&qqi->qqi_fid, qsd->qsd_pool_id, QSD_RES_TYPE(qsd),
+                           qtype);
 
         /* open accounting object */
         LASSERT(qqi->qqi_acct_obj == NULL);
 
         /* open accounting object */
         LASSERT(qqi->qqi_acct_obj == NULL);
@@ -132,6 +152,34 @@ static int qsd_qtype_init(const struct lu_env *env, struct qsd_instance *qsd,
        if (IS_ERR(qqi->qqi_acct_obj))
                qqi->qqi_acct_obj = NULL;
 
        if (IS_ERR(qqi->qqi_acct_obj))
                qqi->qqi_acct_obj = NULL;
 
+       /* open global index copy */
+       LASSERT(qqi->qqi_glb_obj == NULL);
+       qqi->qqi_glb_obj = lquota_disk_glb_find_create(env, qsd->qsd_dev,
+                                                      qsd->qsd_root,
+                                                      &qqi->qqi_fid, true);
+       if (IS_ERR(qqi->qqi_glb_obj)) {
+               CERROR("%s: can't open global index copy "DFID" %ld\n",
+                      qsd->qsd_svname, PFID(&qqi->qqi_fid),
+                      PTR_ERR(qqi->qqi_glb_obj));
+               GOTO(out, rc = PTR_ERR(qqi->qqi_glb_obj));
+       }
+       qqi->qqi_glb_ver = dt_version_get(env, qqi->qqi_glb_obj);
+
+       /* open slave index copy */
+       LASSERT(qqi->qqi_slv_obj == NULL);
+       obd_str2uuid(&uuid, qsd->qsd_svname);
+       qqi->qqi_slv_obj = lquota_disk_slv_find_create(env, qsd->qsd_dev,
+                                                      qsd->qsd_root,
+                                                      &qqi->qqi_fid, &uuid,
+                                                      true);
+       if (IS_ERR(qqi->qqi_slv_obj)) {
+               CERROR("%s: can't open slave index copy "DFID" %ld\n",
+                      qsd->qsd_svname, PFID(&qqi->qqi_fid),
+                      PTR_ERR(qqi->qqi_slv_obj));
+               GOTO(out, rc = PTR_ERR(qqi->qqi_slv_obj));
+       }
+       qqi->qqi_slv_ver = dt_version_get(env, qqi->qqi_slv_obj);
+
        /* register proc entry for accounting object */
        rc = lprocfs_seq_create(qsd->qsd_proc,
                                qtype == USRQUOTA ? "acct_user" : "acct_group",
        /* register proc entry for accounting object */
        rc = lprocfs_seq_create(qsd->qsd_proc,
                                qtype == USRQUOTA ? "acct_user" : "acct_group",
@@ -163,6 +211,9 @@ void qsd_fini(const struct lu_env *env, struct qsd_instance *qsd)
        int     qtype;
        ENTRY;
 
        int     qtype;
        ENTRY;
 
+       CDEBUG(D_QUOTA, "%s: initiating QSD shutdown\n", qsd->qsd_svname);
+       qsd->qsd_stopping = true;
+
        /* remove qsd proc entry */
        if (qsd->qsd_proc != NULL && !IS_ERR(qsd->qsd_proc)) {
                lprocfs_remove(&qsd->qsd_proc);
        /* remove qsd proc entry */
        if (qsd->qsd_proc != NULL && !IS_ERR(qsd->qsd_proc)) {
                lprocfs_remove(&qsd->qsd_proc);
@@ -173,6 +224,12 @@ void qsd_fini(const struct lu_env *env, struct qsd_instance *qsd)
        for (qtype = USRQUOTA; qtype < MAXQUOTAS; qtype++)
                qsd_qtype_fini(env, qsd, qtype);
 
        for (qtype = USRQUOTA; qtype < MAXQUOTAS; qtype++)
                qsd_qtype_fini(env, qsd, qtype);
 
+       /* release quota root directory */
+       if (qsd->qsd_root != NULL && !IS_ERR(qsd->qsd_root)) {
+               lu_object_put(env, &qsd->qsd_root->do_lu);
+               qsd->qsd_root = NULL;
+       }
+
        /* release reference on dt_device */
        if (qsd->qsd_dev != NULL) {
                lu_ref_del(&qsd->qsd_dev->dd_lu_dev.ld_reference, "qsd", qsd);
        /* release reference on dt_device */
        if (qsd->qsd_dev != NULL) {
                lu_ref_del(&qsd->qsd_dev->dd_lu_dev.ld_reference, "qsd", qsd);
@@ -220,6 +277,24 @@ struct qsd_instance *qsd_init(const struct lu_env *env, char *svname,
        lu_ref_add(&dev->dd_lu_dev.ld_reference, "qsd", qsd);
        qsd->qsd_dev = dev;
 
        lu_ref_add(&dev->dd_lu_dev.ld_reference, "qsd", qsd);
        qsd->qsd_dev = dev;
 
+       /* we only support pool ID 0 (default data or metadata pool) for the
+        * time being. A different pool ID could be assigned to this target via
+        * the configuration log in the future */
+       qsd->qsd_pool_id  = 0;
+
+       /* Record whether this qsd instance is managing quota enforcement for a
+        * MDT (i.e. inode quota) or OST (block quota) */
+       qsd->qsd_is_md = lu_device_is_md(dev->dd_lu_dev.ld_site->ls_top_dev);
+
+       /* look-up on-disk directory for the quota slave */
+       qsd->qsd_root = lquota_disk_dir_find_create(env, dev, NULL, QSD_DIR);
+       if (IS_ERR(qsd->qsd_root)) {
+               rc = PTR_ERR(qsd->qsd_root);
+               CERROR("%s: failed to create quota slave root dir (%d)\n",
+                      svname, rc);
+               GOTO(out, rc);
+       }
+
        /* register procfs directory */
        qsd->qsd_proc = lprocfs_register(QSD_DIR, osd_proc,
                                         lprocfs_quota_qsd_vars, qsd);
        /* register procfs directory */
        qsd->qsd_proc = lprocfs_register(QSD_DIR, osd_proc,
                                         lprocfs_quota_qsd_vars, qsd);