Whamcloud - gitweb
LU-1301 lu: local objects library
authorAlex Zhuravlev <bzzz@whamcloud.com>
Thu, 6 Sep 2012 04:48:16 +0000 (08:48 +0400)
committerOleg Drokin <green@whamcloud.com>
Mon, 17 Sep 2012 19:22:20 +0000 (15:22 -0400)
A set of functions working on top of the OSD API to create/access
local objects by name.
The library maintains its own top device to be able to work in a
multi-service environment (MDS + MGS).

Signed-off-by: Alex Zhuravlev <bzzz@whamcloud.com>
Change-Id: I26cc47b866bb0925be4f4419ac663a1d42520e02
Reviewed-on: http://review.whamcloud.com/3665
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Reviewed-by: Jinshan Xiong <jinshan.xiong@whamcloud.com>
lustre/include/dt_object.h
lustre/include/lu_object.h
lustre/include/lustre/lustre_idl.h
lustre/include/lustre_disk.h
lustre/obdclass/Makefile.in
lustre/obdclass/dt_object.c
lustre/obdclass/local_storage.c [new file with mode: 0644]
lustre/obdclass/local_storage.h [new file with mode: 0644]
lustre/obdclass/lu_object.c
lustre/osd-ldiskfs/osd_handler.c

index 5455194..6cdd013 100644 (file)
@@ -674,6 +674,24 @@ struct dt_object {
         const struct dt_index_operations  *do_index_ops;
 };
 
+/*
+ * In-core representation of per-device local object OID storage
+ */
+struct local_oid_storage {
+       /* linkage into ls_los_list of the ls_device this los belongs to */
+       cfs_list_t        los_list;
+
+       /* how many handles reference this los */
+       cfs_atomic_t      los_refcount;
+       /* top device of the local storage stack (ls_device::ls_top_dev) */
+       struct dt_device *los_dev;
+       /* object holding the on-disk OID counter (struct los_ondisk) */
+       struct dt_object *los_obj;
+
+       /* data used to generate new fids */
+       cfs_mutex_t       los_id_lock;
+       __u64             los_seq;
+       __u32             los_last_oid;
+};
+
 static inline struct dt_object *lu2dt(struct lu_object *l)
 {
         LASSERT(l == NULL || IS_ERR(l) || lu_device_is_dt(l->lo_dev));
@@ -783,9 +801,50 @@ struct dt_object *dt_find_or_create(const struct lu_env *env,
                                     struct dt_object_format *dof,
                                     struct lu_attr *attr);
 
-struct dt_object *dt_locate(const struct lu_env *env,
-                            struct dt_device *dev,
-                            const struct lu_fid *fid);
+/* find object \a fid through \a top_dev and return its slice on \a dev */
+struct dt_object *dt_locate_at(const struct lu_env *env,
+                              struct dt_device *dev,
+                              const struct lu_fid *fid,
+                              struct lu_device *top_dev);
+
+/**
+ * get object for given \a fid, starting from the top device registered
+ * in the site of \a dev.
+ */
+static inline struct dt_object *dt_locate(const struct lu_env *env,
+                                         struct dt_device *dev,
+                                         const struct lu_fid *fid)
+{
+       struct lu_device *top = dev->dd_lu_dev.ld_site->ls_top_dev;
+
+       return dt_locate_at(env, dev, fid, top);
+}
+
+
+int local_oid_storage_init(const struct lu_env *env, struct dt_device *dev,
+                          const struct lu_fid *first_fid,
+                          struct local_oid_storage **los);
+void local_oid_storage_fini(const struct lu_env *env,
+                           struct local_oid_storage *los);
+int local_object_fid_generate(const struct lu_env *env,
+                             struct local_oid_storage *los,
+                             struct lu_fid *fid);
+int local_object_declare_create(const struct lu_env *env,
+                               struct local_oid_storage *los,
+                               struct dt_object *o,
+                               struct lu_attr *attr,
+                               struct dt_object_format *dof,
+                               struct thandle *th);
+int local_object_create(const struct lu_env *env,
+                       struct local_oid_storage *los,
+                       struct dt_object *o,
+                       struct lu_attr *attr, struct dt_object_format *dof,
+                       struct thandle *th);
+struct dt_object *local_file_find_or_create(const struct lu_env *env,
+                                           struct local_oid_storage *los,
+                                           struct dt_object *parent,
+                                           const char *name, __u32 mode);
+struct dt_object *local_file_find_or_create_with_fid(const struct lu_env *env,
+                                                    struct dt_device *dt,
+                                                    const struct lu_fid *fid,
+                                                    struct dt_object *parent,
+                                                    const char *name,
+                                                    __u32 mode);
+
+int dt_lookup_dir(const struct lu_env *env, struct dt_object *dir,
+                 const char *name, struct lu_fid *fid);
 
 static inline int dt_object_sync(const struct lu_env *env,
                                  struct dt_object *o)
@@ -1315,4 +1374,32 @@ static inline int dt_lookup(const struct lu_env *env,
 
 #define LU221_BAD_TIME (0x80000000U + 24 * 3600)
 
+/* arguments shared by the path resolution helpers in dt_object.c */
+struct dt_find_hint {
+       /* fid passed to dt_store_resolve() by its caller */
+       struct lu_fid        *dfh_fid;
+       /* device passed to dt_store_resolve() by its caller */
+       struct dt_device     *dfh_dt;
+       /* directory the next entry is looked up in */
+       struct dt_object     *dfh_o;
+};
+
+/* per-thread scratch area registered with dt_key, see dt_info() */
+struct dt_thread_info {
+       /* buffer for path and entry names */
+       char                     dti_buf[DT_MAX_PATH];
+       /* hint used by dt_store_resolve() */
+       struct dt_find_hint      dti_dfh;
+       /* attributes and format of an object being created */
+       struct lu_attr           dti_attr;
+       /* fid found by a lookup or generated for a new object */
+       struct lu_fid            dti_fid;
+       struct dt_object_format  dti_dof;
+       /* image of the LMA xattr written to new objects */
+       struct lustre_mdt_attrs  dti_lma;
+       /* buffer descriptor for xattr and record reads/writes */
+       struct lu_buf            dti_lb;
+       /* offset for record reads/writes */
+       loff_t                   dti_off;
+};
+
+extern struct lu_context_key dt_key;
+
+/* return the dt_thread_info stored in \a env under dt_key; it must exist */
+static inline struct dt_thread_info *dt_info(const struct lu_env *env)
+{
+       struct dt_thread_info *dti = lu_context_key_get(&env->le_ctx,
+                                                       &dt_key);
+
+       LASSERT(dti);
+       return dti;
+}
+
 #endif /* __LUSTRE_DT_OBJECT_H */
index fb7d0b3..a2d31ae 100644 (file)
@@ -719,6 +719,7 @@ static inline int lu_object_is_dying(const struct lu_object_header *h)
 }
 
 void lu_object_put(const struct lu_env *env, struct lu_object *o);
+void lu_object_put_nocache(const struct lu_env *env, struct lu_object *o);
 
 int lu_site_purge(const struct lu_env *env, struct lu_site *s, int nr);
 
index d051631..0622b44 100644 (file)
@@ -418,12 +418,16 @@ enum fid_seq {
         FID_SEQ_IDIF_MAX   = 0x1ffffffffULL,
         /* Normal FID sequence starts from this value, i.e. 1<<33 */
         FID_SEQ_START      = 0x200000000ULL,
+       /* sequence for local pre-defined FIDs listed in local_oid */
         FID_SEQ_LOCAL_FILE = 0x200000001ULL,
         FID_SEQ_DOT_LUSTRE = 0x200000002ULL,
         /* XXX 0x200000003ULL is reserved for FID_SEQ_LLOG_OBJ */
+       /* FID_SEQ_LOCAL_NAME (below) is the sequence used for FIDs of local
+        * named objects generated by the local_object_storage library */
         FID_SEQ_SPECIAL    = 0x200000004ULL,
         FID_SEQ_QUOTA      = 0x200000005ULL,
         FID_SEQ_QUOTA_GLB  = 0x200000006ULL,
+       FID_SEQ_LOCAL_NAME = 0x200000007ULL,
         FID_SEQ_NORMAL     = 0x200000400ULL,
         FID_SEQ_LOV_DEFAULT= 0xffffffffffffffffULL
 };
index f91b21c..487fdaf 100644 (file)
@@ -523,6 +523,16 @@ struct lustre_mount_info {
         cfs_list_t            lmi_list_chain;
 };
 
+/* on-disk structure describing local object OIDs storage
+ * the structure to be used with any sequence managed by
+ * local object library
+ *
+ * both fields are stored little-endian: lso_magic holds LOS_MAGIC and
+ * lso_next_oid the OID counter restored by local_oid_storage_init() */
+struct los_ondisk {
+       __u32 lso_magic;
+       __u32 lso_next_oid;
+};
+
+#define LOS_MAGIC      0xdecafbee
+
 /****************** prototypes *********************/
 
 #ifdef __KERNEL__
index 3a6944c..f303eae 100644 (file)
@@ -10,6 +10,7 @@ sources:
 obdclass-all-objs := llog.o llog_cat.o llog_lvfs.o llog_obd.o llog_swab.o
 obdclass-all-objs += class_obd.o debug.o genops.o uuid.o llog_ioctl.o
 obdclass-all-objs += lprocfs_status.o lprocfs_jobstats.o lustre_handles.o lustre_peer.o
+obdclass-all-objs += local_storage.o
 obdclass-all-objs += statfs_pack.o obdo.o obd_config.o obd_mount.o mea.o
 obdclass-all-objs += lu_object.o dt_object.o capa.o lu_time.o
 obdclass-all-objs += cl_object.o cl_page.o cl_lock.o cl_io.o lu_ref.o
@@ -26,6 +27,6 @@ $(obj)/llog-test.c: $(obj)/llog_test.c
        ln -sf $< $@
 
 EXTRA_DIST  = $(filter-out llog-test.c,$(obdclass-all-objs:.o=.c)) $(llog-test-objs:.o=.c) llog_test.c llog_internal.h
-EXTRA_DIST += cl_internal.h
+EXTRA_DIST += cl_internal.h local_storage.h
 
 @INCLUDE_RULES@
index c231814..11a8984 100644 (file)
 
 #include <lquota.h>
 
-struct dt_find_hint {
-        struct lu_fid        *dfh_fid;
-        struct dt_device     *dfh_dt;
-        struct dt_object     *dfh_o;
-};
-
-struct dt_thread_info {
-        char                    dti_buf[DT_MAX_PATH];
-        struct dt_find_hint     dti_dfh;
-};
-
 /* context key constructor/destructor: dt_global_key_init, dt_global_key_fini */
 LU_KEY_INIT(dt_global, struct dt_thread_info);
 LU_KEY_FINI(dt_global, struct dt_thread_info);
 
-static struct lu_context_key dt_key = {
+/* context key of struct dt_thread_info; no longer static and exported
+ * because dt_info() in dt_object.h refers to it */
+struct lu_context_key dt_key = {
         .lct_tags = LCT_MD_THREAD | LCT_DT_THREAD | LCT_MG_THREAD | LCT_LOCAL,
         .lct_init = dt_global_key_init,
         .lct_fini = dt_global_key_fini
 };
+EXPORT_SYMBOL(dt_key);
 
 /* no lock is necessary to protect the list, because call-backs
  * are added during system startup. Please refer to "struct dt_device".
@@ -221,26 +211,29 @@ int dt_lookup_dir(const struct lu_env *env, struct dt_object *dir,
         return -ENOTDIR;
 }
 EXPORT_SYMBOL(dt_lookup_dir);
-/**
- * get object for given \a fid.
- */
-struct dt_object *dt_locate(const struct lu_env *env,
-                            struct dt_device *dev,
-                            const struct lu_fid *fid)
+
+/**
+ * get object for given \a fid and return its slice belonging to \a dev.
+ *
+ * this differs from dt_locate by top_dev as parameter
+ * but not one from lu_site
+ *
+ * \retval the dt_object of \a dev on success
+ * \retval ERR_PTR(-ENOENT) if no layer of the object belongs to \a dev
+ * \retval ERR_PTR() value returned by lu_object_find_at() otherwise
+ */
+struct dt_object *dt_locate_at(const struct lu_env *env,
+                              struct dt_device *dev, const struct lu_fid *fid,
+                              struct lu_device *top_dev)
 {
-        struct lu_object *obj;
-        struct dt_object *dt;
-
-        obj = lu_object_find(env, &dev->dd_lu_dev, fid, NULL);
-        if (!IS_ERR(obj)) {
-                obj = lu_object_locate(obj->lo_header, dev->dd_lu_dev.ld_type);
-                LASSERT(obj != NULL);
-                dt = container_of(obj, struct dt_object, do_lu);
-        } else
-                dt = (struct dt_object *)obj;
-        return dt;
+       struct lu_object *lo, *n;
+       ENTRY;
+
+       lo = lu_object_find_at(env, top_dev, fid, NULL);
+       if (IS_ERR(lo))
+               RETURN((void *)lo);
+
+       LASSERT(lo != NULL);
+
+       cfs_list_for_each_entry(n, &lo->lo_header->loh_layers, lo_linkage) {
+               if (n->lo_dev == &dev->dd_lu_dev)
+                       RETURN(container_of0(n, struct dt_object, do_lu));
+       }
+
+       /* nothing is handed back to the caller, so the object found
+        * above has to be released here or it stays pinned */
+       lu_object_put(env, lo);
+       RETURN(ERR_PTR(-ENOENT));
 }
-EXPORT_SYMBOL(dt_locate);
+EXPORT_SYMBOL(dt_locate_at);
 
 /**
  * find a object named \a entry in given \a dfh->dfh_o directory.
@@ -298,12 +291,12 @@ static struct dt_object *dt_store_resolve(const struct lu_env *env,
                                           const char *path,
                                           struct lu_fid *fid)
 {
-        struct dt_thread_info *info = lu_context_key_get(&env->le_ctx,
-                                                         &dt_key);
-        struct dt_find_hint *dfh = &info->dti_dfh;
-        struct dt_object     *obj;
-        char *local = info->dti_buf;
-        int result;
+       struct dt_thread_info *info = dt_info(env);
+       struct dt_find_hint   *dfh = &info->dti_dfh;
+       struct dt_object      *obj;
+       char                  *local = info->dti_buf;
+       int                    result;
+
 
         dfh->dfh_dt = dt;
         dfh->dfh_fid = fid;
diff --git a/lustre/obdclass/local_storage.c b/lustre/obdclass/local_storage.c
new file mode 100644 (file)
index 0000000..ca47645
--- /dev/null
@@ -0,0 +1,685 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License version 2 for more details.  A copy is
+ * included in the COPYING file that accompanied this code.
+
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2012 Whamcloud, Inc.
+ */
+/*
+ * lustre/obdclass/local_storage.c
+ *
+ * Local storage for file/objects with fid generation. Works on top of OSD.
+ *
+ * Author: Mikhail Pershin <mike.pershin@intel.com>
+ */
+
+#define DEBUG_SUBSYSTEM S_CLASS
+
+#include "local_storage.h"
+
+/* all initialized local storages on this node are linked on this */
+static CFS_LIST_HEAD(ls_list_head);
+static CFS_DEFINE_MUTEX(ls_list_mutex);
+
+/*
+ * loo_object_init of the local storage top object: allocate the slice of
+ * the underlying OSD device and stack it below \a o.
+ */
+static int ls_object_init(const struct lu_env *env, struct lu_object *o,
+                         const struct lu_object_conf *unused)
+{
+       struct ls_device        *ls;
+       struct lu_device        *osd_lu;
+       struct lu_object        *slice;
+
+       ENTRY;
+
+       ls = container_of0(o->lo_dev, struct ls_device, ls_top_dev.dd_lu_dev);
+       osd_lu = &ls->ls_osd->dd_lu_dev;
+
+       slice = osd_lu->ld_ops->ldo_object_alloc(env, o->lo_header, osd_lu);
+       if (slice != NULL) {
+               lu_object_add(o, slice);
+               RETURN(0);
+       }
+
+       RETURN(-ENOMEM);
+}
+
+/* loo_object_free: finalize the slice and the object header, then free
+ * the ls_object */
+static void ls_object_free(const struct lu_env *env, struct lu_object *o)
+{
+       struct ls_object        *obj = lu2ls_obj(o);
+       struct lu_object_header *hdr = o->lo_header;
+
+       dt_object_fini(&obj->ls_obj);
+       lu_object_header_fini(hdr);
+
+       OBD_FREE_PTR(obj);
+}
+
+/* operations of the objects allocated by ls_object_alloc() */
+struct lu_object_operations ls_lu_obj_ops = {
+       .loo_object_init  = ls_object_init,
+       .loo_object_free  = ls_object_free,
+};
+
+/*
+ * ldo_object_alloc of the local storage device: allocate a top object
+ * together with its own header, hence \a _h must be NULL.
+ *
+ * Returns the new lu_object or NULL if the allocation failed.
+ */
+struct lu_object *ls_object_alloc(const struct lu_env *env,
+                                 const struct lu_object_header *_h,
+                                 struct lu_device *d)
+{
+       struct ls_object        *o;
+       struct lu_object        *top;
+
+       LASSERT(_h == NULL);
+
+       OBD_ALLOC_PTR(o);
+       if (o == NULL)
+               return NULL;
+
+       top = &o->ls_obj.do_lu;
+
+       lu_object_header_init(&o->ls_header);
+       dt_object_init(&o->ls_obj, &o->ls_header, d);
+       lu_object_add_top(&o->ls_header, top);
+
+       top->lo_ops = &ls_lu_obj_ops;
+
+       return top;
+}
+
+/* device operations installed on ls_top_dev by ls_device_get() */
+static struct lu_device_operations ls_lu_dev_ops = {
+       .ldo_object_alloc =     ls_object_alloc
+};
+
+/*
+ * Find the ls_device built on top of \a dev and take a reference on it.
+ * The list is walked without locking here; the callers in this file hold
+ * ls_list_mutex.
+ *
+ * Returns NULL if there is no such device.
+ */
+static struct ls_device *__ls_find_dev(struct dt_device *dev)
+{
+       struct ls_device *ls;
+
+       cfs_list_for_each_entry(ls, &ls_list_head, ls_linkage) {
+               if (ls->ls_osd != dev)
+                       continue;
+
+               cfs_atomic_inc(&ls->ls_refcount);
+               return ls;
+       }
+       return NULL;
+}
+
+/* __ls_find_dev() done under ls_list_mutex */
+struct ls_device *ls_find_dev(struct dt_device *dev)
+{
+       struct ls_device *found;
+
+       cfs_mutex_lock(&ls_list_mutex);
+       found = __ls_find_dev(dev);
+       cfs_mutex_unlock(&ls_list_mutex);
+
+       return found;
+}
+
+/* the local storage device type has no start/stop handlers */
+static struct lu_device_type_operations ls_device_type_ops = {
+       .ldto_start = NULL,
+       .ldto_stop  = NULL,
+};
+
+/* device type passed to lu_device_init() for every ls_top_dev */
+static struct lu_device_type ls_lu_type = {
+       .ldt_name = "local_storage",
+       .ldt_ops  = &ls_device_type_ops,
+};
+
+/*
+ * Get a referenced ls_device for the OSD \a dev, creating and registering
+ * it in ls_list_head if it does not exist yet.
+ *
+ * Returns the device or ERR_PTR(-ENOMEM).
+ */
+static struct ls_device *ls_device_get(const struct lu_env *env,
+                                      struct dt_device *dev)
+{
+       struct ls_device *ls;
+       struct lu_device *top;
+
+       ENTRY;
+
+       cfs_mutex_lock(&ls_list_mutex);
+       ls = __ls_find_dev(dev);
+       if (ls)
+               GOTO(out_ls, ls);
+
+       /* first user of this OSD, set up a new device */
+       OBD_ALLOC_PTR(ls);
+       if (ls == NULL)
+               GOTO(out_ls, ls = ERR_PTR(-ENOMEM));
+
+       cfs_atomic_set(&ls->ls_refcount, 1);
+       CFS_INIT_LIST_HEAD(&ls->ls_los_list);
+       cfs_mutex_init(&ls->ls_los_mutex);
+
+       ls->ls_osd = dev;
+
+       LASSERT(dev->dd_lu_dev.ld_site);
+       /* the top device shares the site of the OSD */
+       top = &ls->ls_top_dev.dd_lu_dev;
+       lu_device_init(top, &ls_lu_type);
+       top->ld_ops = &ls_lu_dev_ops;
+       top->ld_site = dev->dd_lu_dev.ld_site;
+
+       /* make the new device visible to __ls_find_dev() */
+       cfs_list_add(&ls->ls_linkage, &ls_list_head);
+out_ls:
+       cfs_mutex_unlock(&ls_list_mutex);
+       RETURN(ls);
+}
+
+/*
+ * Drop a reference on \a ls. When the count is still zero once
+ * ls_list_mutex is taken, the device is unlinked, the site is purged and
+ * the device is freed.
+ */
+static void ls_device_put(const struct lu_env *env, struct ls_device *ls)
+{
+       struct lu_device *top;
+
+       LASSERT(env);
+       if (!cfs_atomic_dec_and_test(&ls->ls_refcount))
+               return;
+
+       cfs_mutex_lock(&ls_list_mutex);
+       /* __ls_find_dev() may have taken a new reference meanwhile */
+       if (cfs_atomic_read(&ls->ls_refcount) != 0)
+               goto unlock;
+
+       LASSERT(cfs_list_empty(&ls->ls_los_list));
+       cfs_list_del(&ls->ls_linkage);
+
+       top = &ls->ls_top_dev.dd_lu_dev;
+       lu_site_purge(env, top->ld_site, ~0);
+       lu_device_fini(top);
+       OBD_FREE_PTR(ls);
+unlock:
+       cfs_mutex_unlock(&ls_list_mutex);
+}
+
+/**
+ * local file fid generation
+ *
+ * Fill \a fid with the sequence of \a los and its current OID, then
+ * advance the in-memory OID counter. Always returns 0.
+ */
+int local_object_fid_generate(const struct lu_env *env,
+                             struct local_oid_storage *los,
+                             struct lu_fid *fid)
+{
+       LASSERT(los->los_dev);
+       LASSERT(los->los_obj);
+
+       /* the counter is written to disk together with the object
+        * creation, see local_object_create(); this is what keeps the
+        * generated fids unique after reboot */
+
+       cfs_mutex_lock(&los->los_id_lock);
+       fid->f_seq = los->los_seq;
+       fid->f_oid = los->los_last_oid;
+       fid->f_ver = 0;
+       /* take next OID */
+       los->los_last_oid++;
+       cfs_mutex_unlock(&los->los_id_lock);
+
+       return 0;
+}
+
+/*
+ * Declare in \a th everything local_object_create() is going to do: the
+ * update of the OID counter file (only if \a los is given), the creation
+ * of \a o and the setting of its LMA xattr.
+ */
+int local_object_declare_create(const struct lu_env *env,
+                               struct local_oid_storage *los,
+                               struct dt_object *o, struct lu_attr *attr,
+                               struct dt_object_format *dof,
+                               struct thandle *th)
+{
+       struct dt_thread_info   *dti = dt_info(env);
+       struct lu_buf           *lma_buf = &dti->dti_lb;
+       int                      rc;
+
+       ENTRY;
+
+       if (los != NULL) {
+               /* update fid generation file */
+               LASSERT(dt_object_exists(los->los_obj));
+               rc = dt_declare_record_write(env, los->los_obj,
+                                            sizeof(struct los_ondisk), 0, th);
+               if (rc)
+                       RETURN(rc);
+       }
+
+       rc = dt_declare_create(env, o, attr, NULL, dof, th);
+       if (rc == 0) {
+               /* only the size of the xattr is needed to declare it */
+               lma_buf->lb_buf = NULL;
+               lma_buf->lb_len = sizeof(dti->dti_lma);
+               rc = dt_declare_xattr_set(env, o, lma_buf, XATTR_NAME_LMA,
+                                         0, th);
+       }
+
+       RETURN(rc);
+}
+
+/**
+ * Create object \a o with its LMA xattr and, if \a los is given, store the
+ * OID counter of \a los on disk within the same transaction \a th.
+ *
+ * \param los  OID storage to update, or NULL when no counter has to be
+ *             written (see local_file_find_or_create_with_fid())
+ *
+ * \retval 0 on success
+ * \retval non-zero value returned by dt_create(), dt_xattr_set() or
+ *         dt_record_write() on failure
+ */
+int local_object_create(const struct lu_env *env,
+                       struct local_oid_storage *los,
+                       struct dt_object *o, struct lu_attr *attr,
+                       struct dt_object_format *dof, struct thandle *th)
+{
+       struct dt_thread_info   *dti = dt_info(env);
+       struct los_ondisk        losd;
+       int                      rc;
+
+       ENTRY;
+
+       rc = dt_create(env, o, attr, NULL, dof, th);
+       if (rc)
+               RETURN(rc);
+
+       lustre_lma_init(&dti->dti_lma, lu_object_fid(&o->do_lu));
+       lustre_lma_swab(&dti->dti_lma);
+       dti->dti_lb.lb_buf = &dti->dti_lma;
+       dti->dti_lb.lb_len = sizeof(dti->dti_lma);
+       rc = dt_xattr_set(env, o, &dti->dti_lb, XATTR_NAME_LMA, 0, th,
+                         BYPASS_CAPA);
+       /* report the failure now, rc is reused for the counter update
+        * below and the error would be lost otherwise */
+       if (rc)
+               RETURN(rc);
+
+       if (los == NULL)
+               RETURN(0);
+
+       LASSERT(los->los_obj);
+       LASSERT(dt_object_exists(los->los_obj));
+
+       /* many threads can update this, serialize
+        * them here to avoid the race where one thread
+        * takes the value first, but writes it last */
+       cfs_mutex_lock(&los->los_id_lock);
+
+       /* update local oid number on disk so that
+        * we know the last one used after reboot */
+       losd.lso_magic = cpu_to_le32(LOS_MAGIC);
+       losd.lso_next_oid = cpu_to_le32(los->los_last_oid);
+
+       dti->dti_off = 0;
+       dti->dti_lb.lb_buf = &losd;
+       dti->dti_lb.lb_len = sizeof(losd);
+       rc = dt_record_write(env, los->los_obj, &dti->dti_lb, &dti->dti_off,
+                            th);
+       cfs_mutex_unlock(&los->los_id_lock);
+
+       RETURN(rc);
+}
+
+/*
+ * Create local named object (file, directory or index) in parent directory.
+ *
+ * \param fid     FID of the object to create
+ * \param los     OID storage updated along with the creation, may be NULL
+ * \param ls      local storage device used to locate the object; the
+ *                transaction runs on its OSD
+ * \param parent  directory the \a name entry is inserted into
+ * \param mode    file mode, its S_IFMT bits select the object type
+ *
+ * \retval the object on success, also when it turned out to exist once
+ *         the transaction was started
+ * \retval ERR_PTR(-EEXIST) if the object exists already
+ * \retval ERR_PTR() with the failure code otherwise
+ */
+struct dt_object *__local_file_create(const struct lu_env *env,
+                                     const struct lu_fid *fid,
+                                     struct local_oid_storage *los,
+                                     struct ls_device *ls,
+                                     struct dt_object *parent,
+                                     const char *name, __u32 mode)
+{
+       struct dt_thread_info   *dti = dt_info(env);
+       struct dt_object        *dto;
+       struct thandle          *th;
+       int                      rc;
+
+       ENTRY;
+
+       dto = ls_locate(env, ls, fid);
+       if (unlikely(IS_ERR(dto)))
+               RETURN(dto);
+
+       LASSERT(dto != NULL);
+       if (dt_object_exists(dto))
+               GOTO(out, rc = -EEXIST);
+
+       /* create the object */
+       dti->dti_attr.la_valid = LA_MODE | LA_TYPE;
+       dti->dti_attr.la_mode = mode;
+       dti->dti_dof.dof_type = dt_mode_to_dft(mode & S_IFMT);
+
+       th = dt_trans_create(env, ls->ls_osd);
+       if (IS_ERR(th))
+               GOTO(out, rc = PTR_ERR(th));
+
+       rc = local_object_declare_create(env, los, dto, &dti->dti_attr,
+                                        &dti->dti_dof, th);
+       if (rc)
+               GOTO(trans_stop, rc);
+
+       if (dti->dti_dof.dof_type == DFT_DIR) {
+               dt_declare_ref_add(env, dto, th);
+               dt_declare_ref_add(env, parent, th);
+       }
+
+       rc = dt_declare_insert(env, parent, (void *)fid, (void *)name, th);
+       if (rc)
+               GOTO(trans_stop, rc);
+
+       rc = dt_trans_start_local(env, ls->ls_osd, th);
+       if (rc)
+               GOTO(trans_stop, rc);
+
+       dt_write_lock(env, dto, 0);
+       /* the object may have been created since the check above */
+       if (dt_object_exists(dto))
+               GOTO(unlock, rc = 0);
+
+       CDEBUG(D_OTHER, "create new object "DFID"\n",
+              PFID(lu_object_fid(&dto->do_lu)));
+       rc = local_object_create(env, los, dto, &dti->dti_attr,
+                                &dti->dti_dof, th);
+       if (rc)
+               GOTO(unlock, rc);
+       LASSERT(dt_object_exists(dto));
+
+       if (dti->dti_dof.dof_type == DFT_DIR) {
+               if (!dt_try_as_dir(env, dto))
+                       GOTO(destroy, rc = -ENOTDIR);
+               /* Add "." and ".." for newly created dir */
+               rc = dt_insert(env, dto, (void *)fid, (void *)".", th,
+                              BYPASS_CAPA, 1);
+               if (rc)
+                       GOTO(destroy, rc);
+               dt_ref_add(env, dto, th);
+               rc = dt_insert(env, dto, (void *)lu_object_fid(&parent->do_lu),
+                              (void *)"..", th, BYPASS_CAPA, 1);
+               if (rc)
+                       GOTO(destroy, rc);
+       }
+
+       dt_write_lock(env, parent, 0);
+       rc = dt_insert(env, parent, (const struct dt_rec *)fid,
+                      (const struct dt_key *)name, th, BYPASS_CAPA, 1);
+       /* the parent gets the link of the new subdirectory only if the
+        * entry was really inserted, otherwise its link count would stay
+        * raised after the object is destroyed below */
+       if (rc == 0 && dti->dti_dof.dof_type == DFT_DIR)
+               dt_ref_add(env, parent, th);
+       dt_write_unlock(env, parent);
+destroy:
+       if (rc)
+               dt_destroy(env, dto, th);
+unlock:
+       dt_write_unlock(env, dto);
+trans_stop:
+       dt_trans_stop(env, ls->ls_osd, th);
+out:
+       if (rc) {
+               lu_object_put_nocache(env, &dto->do_lu);
+               dto = ERR_PTR(rc);
+       } else {
+               struct lu_fid dti_fid;
+               /* since local files FIDs are not in OI the directory entry
+                * is used to get inode number/generation, we need to do lookup
+                * again to cache this data after create */
+               rc = dt_lookup_dir(env, parent, name, &dti_fid);
+               LASSERT(rc == 0);
+       }
+       RETURN(dto);
+}
+
+/*
+ * Return the object \a name refers to in \a parent. If there is no such
+ * entry, a FID is generated from \a los and the file or directory is
+ * created with \a mode.
+ *
+ * Returns the object or an ERR_PTR() value.
+ */
+struct dt_object *local_file_find_or_create(const struct lu_env *env,
+                                           struct local_oid_storage *los,
+                                           struct dt_object *parent,
+                                           const char *name, __u32 mode)
+{
+       struct dt_thread_info   *dti = dt_info(env);
+       struct lu_fid           *fid = &dti->dti_fid;
+       int                      rc;
+
+       LASSERT(parent);
+
+       rc = dt_lookup_dir(env, parent, name, fid);
+       if (rc == 0)
+               /* name is found, get the object */
+               return ls_locate(env, dt2ls_dev(los->los_dev), fid);
+
+       /* anything but a missing entry is a failure */
+       if (rc != -ENOENT)
+               return ERR_PTR(rc);
+
+       rc = local_object_fid_generate(env, los, fid);
+       if (rc < 0)
+               return ERR_PTR(rc);
+
+       return __local_file_create(env, fid, los, dt2ls_dev(los->los_dev),
+                                  parent, name, mode);
+}
+EXPORT_SYMBOL(local_file_find_or_create);
+EXPORT_SYMBOL(local_file_find_or_create);
+
+/*
+ * Same as local_file_find_or_create(), but the object has the pre-defined
+ * \a fid instead of a generated one, so no OID storage is involved.
+ *
+ * \retval the object on success
+ * \retval ERR_PTR(-EINVAL) if \a name exists but refers to another FID
+ * \retval ERR_PTR() with the failure code otherwise
+ */
+struct dt_object *local_file_find_or_create_with_fid(const struct lu_env *env,
+                                                    struct dt_device *dt,
+                                                    const struct lu_fid *fid,
+                                                    struct dt_object *parent,
+                                                    const char *name,
+                                                    __u32 mode)
+{
+       struct dt_thread_info   *dti = dt_info(env);
+       struct dt_object        *dto;
+       int                      rc;
+
+       LASSERT(parent);
+
+       rc = dt_lookup_dir(env, parent, name, &dti->dti_fid);
+       if (rc == 0) {
+               /* name is found, get the object */
+               if (!lu_fid_eq(fid, &dti->dti_fid))
+                       dto = ERR_PTR(-EINVAL);
+               else
+                       dto = dt_locate(env, dt, fid);
+       } else if (rc != -ENOENT) {
+               dto = ERR_PTR(rc);
+       } else {
+               struct ls_device *ls;
+
+               ls = ls_device_get(env, dt);
+               if (IS_ERR(ls)) {
+                       /* there is no device to put in this case */
+                       dto = ERR_PTR(PTR_ERR(ls));
+               } else {
+                       dto = __local_file_create(env, fid, NULL, ls, parent,
+                                                 name, mode);
+                       ls_device_put(env, ls);
+               }
+       }
+       return dto;
+}
+EXPORT_SYMBOL(local_file_find_or_create_with_fid);
+EXPORT_SYMBOL(local_file_find_or_create_with_fid);
+
+/*
+ * Find the OID storage of sequence \a seq on \a ls and take a reference
+ * on it. Returns NULL if the sequence has no storage yet.
+ */
+static struct local_oid_storage *dt_los_find(struct ls_device *ls, __u64 seq)
+{
+       struct local_oid_storage *los;
+
+       cfs_list_for_each_entry(los, &ls->ls_los_list, los_list) {
+               if (los->los_seq != seq)
+                       continue;
+
+               cfs_atomic_inc(&los->los_refcount);
+               return los;
+       }
+       return NULL;
+}
+
+/**
+ * Initialize local OID storage for required sequence.
+ * That may be needed for services that uses local files and requires
+ * dynamic OID allocation for them.
+ *
+ * Per each sequence we have an object with 'first_fid' identificator
+ * containing the counter for OIDs of locally created files with that
+ * sequence.
+ *
+ * It is used now by llog subsystem and MGS for NID tables
+ *
+ * Function gets first_fid to create counter object.
+ * All dynamic fids will be generated with the same sequence and incremented
+ * OIDs
+ *
+ * Returned local_oid_storage is in-memory representation of OID storage
+ *
+ * \param dev        OSD device the storage lives on
+ * \param first_fid  FID of the counter object, its sequence identifies
+ *                   the storage
+ * \param los        where to return the storage; set to NULL on failure
+ *
+ * \retval 0 on success, the storage is to be released with
+ *         local_oid_storage_fini()
+ * \retval negative errno on failure
+ */
+int local_oid_storage_init(const struct lu_env *env, struct dt_device *dev,
+                          const struct lu_fid *first_fid,
+                          struct local_oid_storage **los)
+{
+       struct dt_thread_info   *dti = dt_info(env);
+       struct ls_device        *ls;
+       struct los_ondisk        losd;
+       struct dt_object        *o = NULL;
+       struct dt_object        *root = NULL;
+       struct thandle          *th;
+       int                      rc;
+
+       ENTRY;
+
+       ls = ls_device_get(env, dev);
+       if (IS_ERR(ls))
+               RETURN(PTR_ERR(ls));
+
+       cfs_mutex_lock(&ls->ls_los_mutex);
+       *los = dt_los_find(ls, fid_seq(first_fid));
+       if (*los != NULL)
+               GOTO(out, rc = 0);
+
+       /* not found, then create */
+       OBD_ALLOC_PTR(*los);
+       if (*los == NULL)
+               GOTO(out, rc = -ENOMEM);
+
+       cfs_atomic_set(&(*los)->los_refcount, 1);
+       cfs_mutex_init(&(*los)->los_id_lock);
+       (*los)->los_dev = &ls->ls_top_dev;
+       /* the new los is linked to ls_los_list and gets its reference on
+        * the ls_device at the end of the function, once nothing can fail
+        * anymore; ls_los_mutex is held all the way so nobody can notice
+        * the difference, and a failure does not leave a freed los on the
+        * list nor a leaked ls_refcount */
+
+       /* initialize data allowing to generate new fids,
+        * literally we need a sequence */
+       o = ls_locate(env, ls, first_fid);
+       if (IS_ERR(o))
+               GOTO(out_los, rc = PTR_ERR(o));
+
+       rc = dt_root_get(env, dev, &dti->dti_fid);
+       if (rc)
+               GOTO(out_los, rc);
+
+       root = ls_locate(env, ls, &dti->dti_fid);
+       if (IS_ERR(root))
+               GOTO(out_los, rc = PTR_ERR(root));
+
+       if (dt_try_as_dir(env, root) == 0)
+               GOTO(out_los, rc = -ENOTDIR);
+
+       dt_write_lock(env, o, 0);
+       if (!dt_object_exists(o)) {
+               /* first use of the sequence: create the counter object and
+                * give it a name in the root directory */
+               th = dt_trans_create(env, dev);
+               if (IS_ERR(th))
+                       GOTO(out_lock, rc = PTR_ERR(th));
+
+               dti->dti_attr.la_valid = LA_MODE | LA_TYPE;
+               dti->dti_attr.la_mode = S_IFREG | S_IRUGO | S_IWUSR;
+               dti->dti_dof.dof_type = dt_mode_to_dft(S_IFREG);
+
+               rc = dt_declare_create(env, o, &dti->dti_attr, NULL,
+                                      &dti->dti_dof, th);
+               if (rc)
+                       GOTO(out_trans, rc);
+
+               snprintf(dti->dti_buf, sizeof(dti->dti_buf),
+                       "seq-%Lx-lastid", fid_seq(first_fid));
+               rc = dt_declare_insert(env, root,
+                                      (const struct dt_rec *)lu_object_fid(&o->do_lu),
+                                      (const struct dt_key *)dti->dti_buf,
+                                      th);
+               if (rc)
+                       GOTO(out_trans, rc);
+
+               dti->dti_lb.lb_buf = NULL;
+               dti->dti_lb.lb_len = sizeof(dti->dti_lma);
+               rc = dt_declare_xattr_set(env, o, &dti->dti_lb, XATTR_NAME_LMA,
+                                         0, th);
+               if (rc)
+                       GOTO(out_trans, rc);
+
+               rc = dt_declare_record_write(env, o, sizeof(losd), 0, th);
+               if (rc)
+                       GOTO(out_trans, rc);
+
+               rc = dt_trans_start_local(env, dev, th);
+               if (rc)
+                       GOTO(out_trans, rc);
+
+               LASSERT(!dt_object_exists(o));
+               rc = dt_create(env, o, &dti->dti_attr, NULL, &dti->dti_dof, th);
+               if (rc)
+                       GOTO(out_trans, rc);
+               LASSERT(dt_object_exists(o));
+
+               lustre_lma_init(&dti->dti_lma, lu_object_fid(&o->do_lu));
+               lustre_lma_swab(&dti->dti_lma);
+               dti->dti_lb.lb_buf = &dti->dti_lma;
+               dti->dti_lb.lb_len = sizeof(dti->dti_lma);
+               rc = dt_xattr_set(env, o, &dti->dti_lb, XATTR_NAME_LMA, 0,
+                                 th, BYPASS_CAPA);
+               if (rc)
+                       GOTO(out_trans, rc);
+
+               /* the counter starts right after the OID of first_fid */
+               losd.lso_magic = cpu_to_le32(LOS_MAGIC);
+               losd.lso_next_oid = cpu_to_le32(fid_oid(first_fid) + 1);
+
+               dti->dti_off = 0;
+               dti->dti_lb.lb_buf = &losd;
+               dti->dti_lb.lb_len = sizeof(losd);
+               rc = dt_record_write(env, o, &dti->dti_lb, &dti->dti_off, th);
+               if (rc)
+                       GOTO(out_trans, rc);
+               rc = dt_insert(env, root,
+                              (const struct dt_rec *)lu_object_fid(&o->do_lu),
+                              (const struct dt_key *)dti->dti_buf, th,
+                              BYPASS_CAPA, 1);
+               if (rc)
+                       GOTO(out_trans, rc);
+out_trans:
+               dt_trans_stop(env, dev, th);
+       } else {
+               /* the counter object exists, load and verify it */
+               dti->dti_off = 0;
+               dti->dti_lb.lb_buf = &losd;
+               dti->dti_lb.lb_len = sizeof(losd);
+               rc = dt_record_read(env, o, &dti->dti_lb, &dti->dti_off);
+               if (rc == 0 && le32_to_cpu(losd.lso_magic) != LOS_MAGIC) {
+                       CERROR("local storage file "DFID" is corrupted\n",
+                              PFID(first_fid));
+                       rc = -EINVAL;
+               }
+       }
+out_lock:
+       dt_write_unlock(env, o);
+out_los:
+       /* both o and root can hold an ERR_PTR() value here */
+       if (root != NULL && !IS_ERR(root))
+               lu_object_put_nocache(env, &root->do_lu);
+       if (rc) {
+               OBD_FREE_PTR(*los);
+               *los = NULL;
+               if (o != NULL && !IS_ERR(o))
+                       lu_object_put_nocache(env, &o->do_lu);
+       } else {
+               (*los)->los_seq = fid_seq(first_fid);
+               (*los)->los_last_oid = le32_to_cpu(losd.lso_next_oid);
+               (*los)->los_obj = o;
+               /* the los is complete: it pins the ls_device until
+                * local_oid_storage_fini() and becomes visible to
+                * dt_los_find() */
+               cfs_atomic_inc(&ls->ls_refcount);
+               cfs_list_add(&(*los)->los_list, &ls->ls_los_list);
+       }
+out:
+       cfs_mutex_unlock(&ls->ls_los_mutex);
+       ls_device_put(env, ls);
+       RETURN(rc);
+}
+EXPORT_SYMBOL(local_oid_storage_init);
+EXPORT_SYMBOL(local_oid_storage_init);
+
+/*
+ * Drop a reference on \a los. When the count is still zero once
+ * ls_los_mutex is taken, the counter object is released and the storage is
+ * unlinked from its ls_device and freed. The reference the storage holds
+ * on the ls_device is dropped as well.
+ */
+void local_oid_storage_fini(const struct lu_env *env,
+                            struct local_oid_storage *los)
+{
+       struct ls_device *ls;
+
+       if (!cfs_atomic_dec_and_test(&los->los_refcount))
+               return;
+
+       LASSERT(env);
+       LASSERT(los->los_dev);
+       ls = dt2ls_dev(los->los_dev);
+
+       cfs_mutex_lock(&ls->ls_los_mutex);
+       /* dt_los_find() may have taken a new reference meanwhile */
+       if (cfs_atomic_read(&los->los_refcount) != 0)
+               goto unlock;
+
+       if (los->los_obj)
+               lu_object_put_nocache(env, &los->los_obj->do_lu);
+       cfs_list_del(&los->los_list);
+       OBD_FREE_PTR(los);
+unlock:
+       cfs_mutex_unlock(&ls->ls_los_mutex);
+       ls_device_put(env, ls);
+}
+EXPORT_SYMBOL(local_oid_storage_fini);
+
diff --git a/lustre/obdclass/local_storage.h b/lustre/obdclass/local_storage.h
new file mode 100644 (file)
index 0000000..6f801a7
--- /dev/null
@@ -0,0 +1,75 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License version 2 for more details.  A copy is
+ * included in the COPYING file that accompanied this code.
+
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2012 Whamcloud, Inc.
+ */
+/*
+ * lustre/obdclass/local_storage.h
+ *
+ * Local storage for file/objects with fid generation. Works on top of OSD.
+ *
+ * Author: Mikhail Pershin <mike.pershin@intel.com>
+ */
+
+#define DEBUG_SUBSYSTEM S_CLASS
+
+#include <dt_object.h>
+#include <obd.h>
+#include <lustre_fid.h>
+#include <lustre_disk.h>
+
+/*
+ * Local storage device: a dt_device of its own that sits on top of an
+ * OSD device and tracks the local OID storages created through it.
+ */
+struct ls_device {
+       /* embedded dt_device; dt2ls_dev() maps it back to the ls_device */
+       struct dt_device         ls_top_dev;
+       /* all initialized ls_devices on this node are linked by this */
+       cfs_list_t               ls_linkage;
+       /* number of handles referencing this local storage */
+       cfs_atomic_t             ls_refcount;
+       /* underlying OSD device */
+       struct dt_device        *ls_osd;
+       /* list of all local OID storages */
+       cfs_list_t               ls_los_list;
+       /* held while a local OID storage is unlinked and freed */
+       cfs_mutex_t              ls_los_mutex;
+};
+
+/* Map the dt_device embedded as ls_top_dev back to its ls_device. */
+static inline struct ls_device *dt2ls_dev(struct dt_device *d)
+{
+       struct ls_device *ls;
+
+       ls = container_of0(d, struct ls_device, ls_top_dev);
+       return ls;
+}
+
+/*
+ * Object of the local storage layer: carries its own object header
+ * together with the dt_object; lu2ls_obj() maps ls_obj.do_lu back here.
+ */
+struct ls_object {
+       /* object header embedded in the local storage object */
+       struct lu_object_header  ls_header;
+       /* dt_object part of the local storage object */
+       struct dt_object         ls_obj;
+};
+
+/* Map the lu_object embedded as ls_obj.do_lu back to its ls_object. */
+static inline struct ls_object *lu2ls_obj(struct lu_object *o)
+{
+       struct ls_object *obj;
+
+       obj = container_of0(o, struct ls_object, ls_obj.do_lu);
+       return obj;
+}
+
+/*
+ * Locate the object identified by \a fid on the underlying OSD device,
+ * passing the local storage device as the top device.
+ */
+static inline struct dt_object *ls_locate(const struct lu_env *env,
+                                         struct ls_device *ls,
+                                         const struct lu_fid *fid)
+{
+       struct dt_device *osd = ls->ls_osd;
+
+       return dt_locate_at(env, osd, fid, &ls->ls_top_dev.dd_lu_dev);
+}
+
+
index a91b4b6..40bf6ec 100644 (file)
@@ -135,6 +135,18 @@ void lu_object_put(const struct lu_env *env, struct lu_object *o)
 EXPORT_SYMBOL(lu_object_put);
 
 /**
+ * Put object and don't keep in cache. This is temporary solution for
+ * multi-site objects when its layering is not constant.
+ */
+void lu_object_put_nocache(const struct lu_env *env, struct lu_object *o)
+{
+       cfs_set_bit(LU_OBJECT_HEARD_BANSHEE,
+                   &o->lo_header->loh_flags);
+       return lu_object_put(env, o);
+}
+EXPORT_SYMBOL(lu_object_put_nocache);
+
+/**
  * Allocate new object.
  *
  * This follows object creation protocol, described in the comment within
index cd74171..76a2d19 100644 (file)
@@ -3467,7 +3467,17 @@ struct osd_object *osd_object_find(const struct lu_env *env,
         struct lu_object  *luch;
         struct lu_object  *lo;
 
-        luch = lu_object_find(env, ludev, fid, NULL);
+       /*
+        * at this point topdev might not exist yet
+        * (i.e. MGS is preparing profiles). so we can
+        * not rely on topdev and instead lookup with
+        * our device passed as topdev. this can't work
+        * if the object isn't cached yet (as osd doesn't
+        * allocate lu_header). IOW, the object must be
+        * in the cache, otherwise lu_object_alloc() crashes
+        * -bzzz
+        */
+       luch = lu_object_find_at(env, ludev, fid, NULL);
         if (!IS_ERR(luch)) {
                 if (lu_object_exists(luch)) {
                         lo = lu_object_locate(luch->lo_header, ludev->ld_type);