Whamcloud - gitweb
LU-2886 mdd: create local files using local_storage lib
authorMikhail Pershin <tappro@whamcloud.com>
Tue, 27 Nov 2012 14:00:01 +0000 (18:00 +0400)
committerOleg Drokin <oleg.drokin@intel.com>
Sat, 13 Apr 2013 03:51:48 +0000 (23:51 -0400)
Switch MDD to use local_storage library to create local objects with
generated or pre-defined FIDs. That unifies the way how local objects
are created on both OST/MDT and avoid layering violation with calling
MDD from OSD like md_local_object library does.

two other fixes are included:
- remove DECLARE_LLOG_WRITE/REWRITE and mdd_declare_llog_record()
  which was a temporary solution to declare llog changes and was
  fixed with llog-over-osd implementation
- add .lustre seq range into FLD in fld_insert_special_entries() as
  all other special ranges.

Signed-off-by: Mikhail Pershin <mike.pershin@intel.com>
Change-Id: I7f8452e0a6d9abbe6cd1960a8ea71cbae4abe753
Reviewed-on: http://review.whamcloud.com/4682
Tested-by: Hudson
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Reviewed-by: Fan Yong <fan.yong@intel.com>
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
14 files changed:
lustre/include/dt_object.h
lustre/include/lustre_fid.h
lustre/include/md_object.h
lustre/mdd/mdd_device.c
lustre/mdd/mdd_dir.c
lustre/mdd/mdd_internal.h
lustre/mdd/mdd_lfsck.c
lustre/mdd/mdd_orphans.c
lustre/obdclass/dt_object.c
lustre/obdclass/local_storage.c
lustre/osd-ldiskfs/osd_handler.c
lustre/osd-ldiskfs/osd_io.c
lustre/osd-ldiskfs/osd_scrub.c
lustre/osd-zfs/osd_oi.c

index fafe05d..1b8f71b 100644 (file)
@@ -883,6 +883,8 @@ local_index_find_or_create_with_fid(const struct lu_env *env,
                                    struct dt_object *parent,
                                    const char *name, __u32 mode,
                                    const struct dt_index_features *ft);
+int local_object_unlink(const struct lu_env *env, struct dt_device *dt,
+                       struct dt_object *parent, const char *name);
 
 static inline int dt_object_lock(const struct lu_env *env,
                                 struct dt_object *o, struct lustre_handle *lh,
index 3e5b73e..1b6d0ff 100644 (file)
@@ -212,8 +212,8 @@ enum local_oid {
        FID_SEQ_CTL_OID         = 4UL,
        FID_SEQ_SRV_OID         = 5UL,
        /** \see mdd_mod_init */
-       MDD_ROOT_INDEX_OID      = 6UL,
-       MDD_ORPHAN_OID          = 7UL,
+       MDD_ROOT_INDEX_OID      = 6UL, /* deprecated in 2.4 */
+       MDD_ORPHAN_OID          = 7UL, /* deprecated in 2.4 */
        MDD_LOV_OBJ_OID         = 8UL,
        MDD_CAPA_KEYS_OID       = 9UL,
        /** \see mdt_mod_init */
index 383831b..9d76354 100644 (file)
  * super-class definitions.
  */
 #include <dt_object.h>
-#include <lvfs.h>
-
-/* LU-1051, temperary solution to reduce llog credits */
-#define DECLARE_LLOG_REWRITE  0
-#define DECLARE_LLOG_WRITE    INT_MAX
 
 struct md_device;
 struct md_device_operations;
index 88d3b56..bd1e80a 100644 (file)
@@ -160,32 +160,6 @@ static struct lu_device *mdd_device_fini(const struct lu_env *env,
        return NULL;
 }
 
-static void mdd_changelog_fini(const struct lu_env *env,
-                               struct mdd_device *mdd);
-
-static void mdd_device_shutdown(const struct lu_env *env,
-                                struct mdd_device *m, struct lustre_cfg *cfg)
-{
-        ENTRY;
-        if (m->mdd_dot_lustre_objs.mdd_obf)
-                mdd_object_put(env, m->mdd_dot_lustre_objs.mdd_obf);
-        if (m->mdd_dot_lustre)
-                mdd_object_put(env, m->mdd_dot_lustre);
-        orph_index_fini(env, m);
-        if (m->mdd_capa != NULL) {
-                lu_object_put(env, &m->mdd_capa->do_lu);
-                m->mdd_capa = NULL;
-        }
-       lu_site_purge(env, m->mdd_md_dev.md_lu_dev.ld_site, -1);
-        /* remove upcall device*/
-        md_upcall_fini(&m->mdd_md_dev);
-
-       if (m->mdd_child_exp)
-               obd_disconnect(m->mdd_child_exp);
-
-        EXIT;
-}
-
 static int changelog_init_cb(const struct lu_env *env, struct llog_handle *llh,
                             struct llog_rec_hdr *hdr, void *data)
 {
@@ -564,34 +538,6 @@ int mdd_changelog_write_header(const struct lu_env *env,
        RETURN(rc);
 }
 
-/**
- * Create ".lustre" directory.
- */
-static int create_dot_lustre_dir(const struct lu_env *env, struct mdd_device *m)
-{
-        struct lu_fid *fid = &mdd_env_info(env)->mti_fid;
-        struct md_object *mdo;
-        int rc;
-
-        memcpy(fid, &LU_DOT_LUSTRE_FID, sizeof(struct lu_fid));
-        mdo = llo_store_create_index(env, &m->mdd_md_dev, m->mdd_child,
-                                     mdd_root_dir_name, dot_lustre_name,
-                                     fid, &dt_directory_features);
-        /* .lustre dir may be already present */
-        if (IS_ERR(mdo) && PTR_ERR(mdo) != -EEXIST) {
-                rc = PTR_ERR(mdo);
-                CERROR("creating obj [%s] fid = "DFID" rc = %d\n",
-                        dot_lustre_name, PFID(fid), rc);
-               return rc;
-        }
-
-        if (!IS_ERR(mdo))
-                lu_object_put(env, &mdo->mo_lu);
-
-        return 0;
-}
-
-
 static int dot_lustre_mdd_permission(const struct lu_env *env,
                                      struct md_object *pobj,
                                      struct md_object *cobj,
@@ -1037,61 +983,94 @@ static struct md_dir_operations mdd_obf_dir_ops = {
  */
 static int mdd_obf_setup(const struct lu_env *env, struct mdd_device *m)
 {
-        struct mdd_object *mdd_obf;
-        struct lu_object *obf_lu_obj;
-        int rc = 0;
+       struct mdd_object *mdd_obf;
 
-        m->mdd_dot_lustre_objs.mdd_obf = mdd_object_find(env, m,
-                                                         &LU_OBF_FID);
-        if (m->mdd_dot_lustre_objs.mdd_obf == NULL ||
-            IS_ERR(m->mdd_dot_lustre_objs.mdd_obf))
-                GOTO(out, rc = -ENOENT);
+       mdd_obf = mdd_object_find(env, m, &LU_OBF_FID);
+       if (mdd_obf == NULL || IS_ERR(mdd_obf))
+               return -ENOENT;
 
-        mdd_obf = m->mdd_dot_lustre_objs.mdd_obf;
-        mdd_obf->mod_obj.mo_dir_ops = &mdd_obf_dir_ops;
-        mdd_obf->mod_obj.mo_ops = &mdd_obf_obj_ops;
-        /* Don't allow objects to be created in "fid" dir */
-        mdd_obf->mod_flags |= IMMUTE_OBJ;
+       m->mdd_dot_lustre_objs.mdd_obf = mdd_obf;
+       mdd_obf->mod_obj.mo_dir_ops = &mdd_obf_dir_ops;
+       mdd_obf->mod_obj.mo_ops = &mdd_obf_obj_ops;
+       /* Don't allow objects to be created in "fid" dir */
+       mdd_obf->mod_flags |= IMMUTE_OBJ;
 
-        obf_lu_obj = mdd2lu_obj(mdd_obf);
-        obf_lu_obj->lo_header->loh_attr |= (LOHA_EXISTS | S_IFDIR);
+       mdd2lu_obj(mdd_obf)->lo_header->loh_attr |= LOHA_EXISTS | S_IFDIR;
+       return 0;
+}
 
-out:
-        return rc;
+static struct md_object *mdo_locate(const struct lu_env *env,
+                                   struct md_device *md,
+                                   const struct lu_fid *fid)
+{
+       struct lu_object *obj;
+       struct md_object *mdo;
+
+       obj = lu_object_find(env, &md->md_lu_dev, fid, NULL);
+       if (!IS_ERR(obj)) {
+               obj = lu_object_locate(obj->lo_header, md->md_lu_dev.ld_type);
+               LASSERT(obj != NULL);
+               mdo = lu2md(obj);
+       } else {
+               mdo = ERR_PTR(PTR_ERR(obj));
+       }
+       return mdo;
 }
 
 /** Setup ".lustre" directory object */
 static int mdd_dot_lustre_setup(const struct lu_env *env, struct mdd_device *m)
 {
-        struct dt_object *dt_dot_lustre;
-        struct lu_fid *fid = &mdd_env_info(env)->mti_fid;
-        int rc;
-       ENTRY;
-
-        rc = create_dot_lustre_dir(env, m);
-        if (rc)
-                return rc;
+       struct md_object        *mdo;
+       struct lu_fid            fid;
+       int                      rc;
 
-        dt_dot_lustre = dt_store_open(env, m->mdd_child, mdd_root_dir_name,
-                                      dot_lustre_name, fid);
-        if (IS_ERR(dt_dot_lustre)) {
-                rc = PTR_ERR(dt_dot_lustre);
-                GOTO(out, rc);
-        }
+       ENTRY;
+       /* Create ".lustre" directory in ROOT. */
+       fid = LU_DOT_LUSTRE_FID;
+       rc = mdd_local_file_create(env, m, &m->mdd_root_fid,
+                                  dot_lustre_name,
+                                  S_IFDIR | S_IRUGO | S_IWUSR | S_IXUGO,
+                                  &fid);
+       if (rc < 0)
+               RETURN(rc);
+       mdo = mdo_locate(env, &m->mdd_md_dev, &fid);
+       if (IS_ERR(mdo))
+               RETURN(PTR_ERR(mdo));
+       LASSERT(lu_object_exists(&mdo->mo_lu));
 
-        /* references are released in mdd_device_shutdown() */
-        m->mdd_dot_lustre = lu2mdd_obj(lu_object_locate(dt_dot_lustre->do_lu.lo_header,
-                                                        &mdd_device_type));
+       m->mdd_dot_lustre = md2mdd_obj(mdo);
+       m->mdd_dot_lustre->mod_obj.mo_dir_ops = &mdd_dot_lustre_dir_ops;
+       m->mdd_dot_lustre->mod_obj.mo_ops = &mdd_dot_lustre_obj_ops;
 
-        m->mdd_dot_lustre->mod_obj.mo_dir_ops = &mdd_dot_lustre_dir_ops;
-        m->mdd_dot_lustre->mod_obj.mo_ops = &mdd_dot_lustre_obj_ops;
+       rc = mdd_obf_setup(env, m);
+       if (rc) {
+               CERROR("%s: error initializing \"fid\" object: rc = %d.\n",
+                      mdd2obd_dev(m)->obd_name, rc);
+               GOTO(out, rc);
+       }
+       RETURN(0);
+out:
+       mdd_object_put(env, m->mdd_dot_lustre);
+       m->mdd_dot_lustre = NULL;
+       return rc;
+}
 
-        rc = mdd_obf_setup(env, m);
-        if (rc)
-                CERROR("Error initializing \"fid\" object - %d.\n", rc);
+static void mdd_device_shutdown(const struct lu_env *env, struct mdd_device *m,
+                               struct lustre_cfg *cfg)
+{
+       mdd_lfsck_cleanup(env, m);
+       mdd_changelog_fini(env, m);
+       orph_index_fini(env, m);
+       if (m->mdd_dot_lustre_objs.mdd_obf)
+               mdd_object_put(env, m->mdd_dot_lustre_objs.mdd_obf);
+       if (m->mdd_dot_lustre)
+               mdd_object_put(env, m->mdd_dot_lustre);
+       if (m->mdd_los != NULL)
+               local_oid_storage_fini(env, m->mdd_los);
+       lu_site_purge(env, mdd2lu_dev(m)->ld_site, ~0);
 
-out:
-        RETURN(rc);
+       if (m->mdd_child_exp)
+               obd_disconnect(m->mdd_child_exp);
 }
 
 static int mdd_process_config(const struct lu_env *env,
@@ -1118,14 +1097,12 @@ static int mdd_process_config(const struct lu_env *env,
                 rc = next->ld_ops->ldo_process_config(env, next, cfg);
                 if (rc)
                         GOTO(out, rc);
-                dt->dd_ops->dt_conf_get(env, dt, &m->mdd_dt_conf);
+               dt_conf_get(env, dt, &m->mdd_dt_conf);
                 break;
         case LCFG_CLEANUP:
-               mdd_lfsck_cleanup(env, m);
-               mdd_changelog_fini(env, m);
                rc = next->ld_ops->ldo_process_config(env, next, cfg);
                lu_dev_del_linkage(d->ld_site, d);
-                mdd_device_shutdown(env, m, cfg);
+               mdd_device_shutdown(env, m, cfg);
                break;
         default:
                 rc = next->ld_ops->ldo_process_config(env, next, cfg);
@@ -1153,38 +1130,38 @@ static int mdd_recovery_complete(const struct lu_env *env,
         RETURN(rc);
 }
 
-static int mdd_find_or_create_root(const struct lu_env *env,
-                                  struct mdd_device *mdd)
+int mdd_local_file_create(const struct lu_env *env, struct mdd_device *mdd,
+                         const struct lu_fid *pfid, const char *name, __u32 mode,
+                         struct lu_fid *fid)
 {
-       struct dt_object        *root;
-       struct md_object        *mroot;
-       struct lu_fid           fid;
-       int                     rc = 0;
+       struct dt_object        *parent, *dto;
+       int                      rc;
 
        ENTRY;
 
-       /* Check if the "ROOT" entry exists already */
-       root = dt_store_open(env, mdd->mdd_child, "", mdd_root_dir_name,
-                            &fid);
-       if (!IS_ERR(root)) {
-               lu_object_put(env, &root->do_lu);
-               GOTO(out, rc = 0);
-       }
-
-       lu_root_fid(&fid);
-       /* New Filesystem, create /ROOT */
-       mroot = llo_store_create_index(env, &mdd->mdd_md_dev, mdd->mdd_bottom,
-                                      "", mdd_root_dir_name, &fid,
-                                      &dt_directory_features);
-       if (IS_ERR(mroot))
-               GOTO(out, rc = PTR_ERR(mroot));
-
-       lu_object_put(env, &mroot->mo_lu);
-out:
-       if (rc == 0)
-               mdd->mdd_root_fid = fid;
+       LASSERT(!fid_is_zero(pfid));
+       parent = dt_locate(env, mdd->mdd_bottom, pfid);
+       if (unlikely(IS_ERR(parent)))
+               RETURN(PTR_ERR(parent));
 
-       RETURN(rc);
+       /* create local file/dir, if @fid is passed then try to use it */
+       if (fid_is_zero(fid))
+               dto = local_file_find_or_create(env, mdd->mdd_los, parent,
+                                               name, mode);
+       else
+               dto = local_file_find_or_create_with_fid(env, mdd->mdd_bottom,
+                                                        fid, parent, name,
+                                                        mode);
+       if (IS_ERR(dto))
+               GOTO(out_put, rc = PTR_ERR(dto));
+       *fid = *lu_object_fid(&dto->do_lu);
+       /* since stack is not fully set up the local_storage uses own stack
+        * and we should drop its object from cache */
+       lu_object_put_nocache(env, &dto->do_lu);
+       EXIT;
+out_put:
+       lu_object_put(env, &parent->do_lu);
+       return 0;
 }
 
 static int mdd_prepare(const struct lu_env *env,
@@ -1193,62 +1170,85 @@ static int mdd_prepare(const struct lu_env *env,
 {
        struct mdd_device       *mdd = lu2mdd_dev(cdev);
        struct lu_device        *next = &mdd->mdd_child->dd_lu_dev;
-       struct dt_object        *root;
        struct lu_fid            fid;
-       int                     rc;
+       int                      rc;
 
-        ENTRY;
+       ENTRY;
 
        rc = next->ld_ops->ldo_prepare(env, cdev, next);
        if (rc)
-               GOTO(out, rc);
+               RETURN(rc);
+
+       /* Setup local dirs */
+       fid.f_seq = FID_SEQ_LOCAL_NAME;
+       fid.f_oid = 1;
+       fid.f_ver = 0;
+       rc = local_oid_storage_init(env, mdd->mdd_bottom, &fid,
+                                   &mdd->mdd_los);
+       if (rc)
+               RETURN(rc);
 
        rc = dt_root_get(env, mdd->mdd_child, &mdd->mdd_local_root_fid);
-       if (rc != 0)
-               GOTO(out, rc);
+       if (rc < 0)
+               GOTO(out_los, rc);
 
        if (mdd_seq_site(mdd)->ss_node_id == 0) {
-               rc = mdd_find_or_create_root(env, mdd);
+               lu_root_fid(&fid);
+               rc = mdd_local_file_create(env, mdd, &mdd->mdd_local_root_fid,
+                                          mdd_root_dir_name, S_IFDIR |
+                                          S_IRUGO | S_IWUSR | S_IXUGO, &fid);
                if (rc != 0) {
                        CERROR("%s: create root fid failed: rc = %d\n",
                               mdd2obd_dev(mdd)->obd_name, rc);
-                       GOTO(out, rc);
+                       GOTO(out_los, rc);
                }
+               mdd->mdd_root_fid = fid;
 
                rc = mdd_dot_lustre_setup(env, mdd);
                if (rc != 0) {
                        CERROR("%s: initializing .lustre failed: rc = %d\n",
                               mdd2obd_dev(mdd)->obd_name, rc);
-                       GOTO(out, rc);
+                       GOTO(out_los, rc);
                }
 
                rc = mdd_compat_fixes(env, mdd);
                if (rc)
-                       GOTO(out, rc);
+                       GOTO(out_los, rc);
 
        }
 
        rc = orph_index_init(env, mdd);
-       if (rc != 0)
-                GOTO(out, rc);
-
-        /* we use capa file to declare llog changes,
-         * will be fixed with new llog in 2.3 */
-       root = dt_store_open(env, mdd->mdd_child, "", CAPA_KEYS, &fid);
-       if (IS_ERR(root))
-               GOTO(out, rc = PTR_ERR(root));
-
-       mdd->mdd_capa = root;
+       if (rc < 0)
+               GOTO(out_dot, rc);
 
        rc = mdd_changelog_init(env, mdd);
-       if (rc != 0)
-               GOTO(out, rc);
+       if (rc != 0) {
+               CERROR("%s: failed to initialize changelog: rc = %d\n",
+                      mdd2obd_dev(mdd)->obd_name, rc);
+               GOTO(out_orph, rc);
+       }
 
        rc = mdd_lfsck_setup(env, mdd);
-       if (rc != 0)
+       if (rc != 0) {
                CERROR("%s: failed to initialize lfsck: rc = %d\n",
                       mdd2obd_dev(mdd)->obd_name, rc);
-out:
+               GOTO(out_changelog, rc);
+       }
+       RETURN(0);
+out_changelog:
+       mdd_changelog_fini(env, mdd);
+out_orph:
+       orph_index_fini(env, mdd);
+out_dot:
+       if (mdd_seq_site(mdd)->ss_node_id == 0) {
+               mdd_object_put(env, mdd->mdd_dot_lustre);
+               mdd->mdd_dot_lustre = NULL;
+               mdd_object_put(env, mdd->mdd_dot_lustre_objs.mdd_obf);
+               mdd->mdd_dot_lustre_objs.mdd_obf = NULL;
+       }
+out_los:
+       local_oid_storage_fini(env, mdd->mdd_los);
+       mdd->mdd_los = NULL;
        return rc;
 }
 
@@ -1736,10 +1736,7 @@ static void mdd_key_fini(const struct lu_context *ctx,
                          struct lu_context_key *key, void *data)
 {
         struct mdd_thread_info *info = data;
-        if (info->mti_max_lmm != NULL)
-                OBD_FREE(info->mti_max_lmm, info->mti_max_lmm_size);
-        if (info->mti_max_cookie != NULL)
-                OBD_FREE(info->mti_max_cookie, info->mti_max_cookie_size);
+
        lu_buf_free(&info->mti_big_buf);
        lu_buf_free(&info->mti_link_buf);
 
@@ -1749,32 +1746,6 @@ static void mdd_key_fini(const struct lu_context *ctx,
 /* context key: mdd_thread_key */
 LU_CONTEXT_KEY_DEFINE(mdd, LCT_MD_THREAD);
 
-static struct lu_local_obj_desc llod_capa_key = {
-        .llod_name      = CAPA_KEYS,
-        .llod_oid       = MDD_CAPA_KEYS_OID,
-        .llod_is_index  = 0,
-};
-
-static struct lu_local_obj_desc llod_mdd_orphan = {
-        .llod_name      = orph_index_name,
-        .llod_oid       = MDD_ORPHAN_OID,
-        .llod_is_index  = 1,
-        .llod_feat      = &dt_directory_features,
-};
-
-static struct lu_local_obj_desc llod_lfsck_bookmark = {
-       .llod_name      = lfsck_bookmark_name,
-       .llod_oid       = LFSCK_BOOKMARK_OID,
-       .llod_is_index  = 0,
-};
-
-static struct lu_local_obj_desc llod_lfsck_namespace = {
-       .llod_name      = lfsck_namespace_name,
-       .llod_oid       = LFSCK_NAMESPACE_OID,
-       .llod_is_index  = 1,
-       .llod_feat      = &dt_lfsck_features,
-};
-
 static int __init mdd_mod_init(void)
 {
        struct lprocfs_static_vars lvars;
@@ -1791,11 +1762,6 @@ static int __init mdd_mod_init(void)
        changelog_orig_logops.lop_add = llog_cat_add_rec;
        changelog_orig_logops.lop_declare_add = llog_cat_declare_add_rec;
 
-       llo_local_obj_register(&llod_capa_key);
-       llo_local_obj_register(&llod_mdd_orphan);
-       llo_local_obj_register(&llod_lfsck_bookmark);
-       llo_local_obj_register(&llod_lfsck_namespace);
-
        rc = class_register_type(&mdd_obd_device_ops, NULL, lvars.module_vars,
                                 LUSTRE_MDD_NAME, &mdd_device_type);
        if (rc)
@@ -1805,11 +1771,6 @@ static int __init mdd_mod_init(void)
 
 static void __exit mdd_mod_exit(void)
 {
-       llo_local_obj_unregister(&llod_capa_key);
-       llo_local_obj_unregister(&llod_mdd_orphan);
-       llo_local_obj_unregister(&llod_lfsck_bookmark);
-       llo_local_obj_unregister(&llod_lfsck_namespace);
-
        class_unregister_type(LUSTRE_MDD_NAME);
        lu_kmem_fini(mdd_caches);
 }
index 2a8fd65..b9fcaac 100644 (file)
@@ -554,60 +554,6 @@ static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj,
         RETURN(rc);
 }
 
-int mdd_declare_llog_record(const struct lu_env *env, struct mdd_device *mdd,
-                            int reclen, struct thandle *handle)
-{
-        int rc;
-
-        /* XXX: this is a temporary solution to declare llog changes
-         *      will be fixed in 2.3 with new llog implementation */
-
-        LASSERT(mdd->mdd_capa);
-
-        /* XXX: Since we use the 'mdd_capa' as fake llog object here, we
-         *      have to set the parameter 'size' as INT_MAX or 0 to inform
-         *      OSD that this record write is for a llog write or catalog
-         *      header update, and osd declare function will reserve less
-         *      credits for optimization purpose.
-         *
-         *      Reserve 6 blocks for a llog write, since the llog file is
-         *      usually small, reserve 2 blocks for catalog header update,
-         *      because we know for sure that catalog header is already
-         *      allocated.
-         *
-         *      This hack should be removed in 2.3.
-         */
-
-        /* record itself */
-        rc = dt_declare_record_write(env, mdd->mdd_capa,
-                                     DECLARE_LLOG_WRITE, 0, handle);
-        if (rc)
-                return rc;
-
-        /* header will be updated as well */
-        rc = dt_declare_record_write(env, mdd->mdd_capa,
-                                     DECLARE_LLOG_WRITE, 0, handle);
-        if (rc)
-                return rc;
-
-        /* also we should be able to create new plain log */
-        rc = dt_declare_create(env, mdd->mdd_capa, NULL, NULL, NULL, handle);
-        if (rc)
-                return rc;
-
-        /* new record referencing new plain llog */
-        rc = dt_declare_record_write(env, mdd->mdd_capa,
-                                     DECLARE_LLOG_WRITE, 0, handle);
-        if (rc)
-                return rc;
-
-        /* catalog's header will be updated as well */
-        rc = dt_declare_record_write(env, mdd->mdd_capa,
-                                     DECLARE_LLOG_REWRITE, 0, handle);
-
-        return rc;
-}
-
 int mdd_declare_changelog_store(const struct lu_env *env,
                                struct mdd_device *mdd,
                                const struct lu_name *fname,
index b3691c2..ee987f1 100644 (file)
@@ -106,7 +106,6 @@ struct mdd_device {
        struct lu_fid                    mdd_local_root_fid;
         struct dt_device_param           mdd_dt_conf;
         struct dt_object                *mdd_orphans; /* PENDING directory */
-        struct dt_object                *mdd_capa;
         cfs_proc_dir_entry_t            *mdd_proc_entry;
         struct mdd_changelog             mdd_cl;
         unsigned long                    mdd_atime_diff;
@@ -115,6 +114,7 @@ struct mdd_device {
        struct md_lfsck                  mdd_lfsck;
        unsigned int                     mdd_sync_permission;
        int                              mdd_connects;
+       struct local_oid_storage        *mdd_los;
 };
 
 enum mod_flags {
@@ -384,8 +384,6 @@ struct mdd_object *mdd_object_find(const struct lu_env *env,
 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm);
 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
                  const struct lu_rdpg *rdpg);
-int mdd_declare_llog_record(const struct lu_env *env, struct mdd_device *mdd,
-                            int reclen, struct thandle *handle);
 int mdd_declare_changelog_store(const struct lu_env *env,
                                struct mdd_device *mdd,
                                const struct lu_name *fname,
@@ -448,6 +446,9 @@ int mdd_lfsck_dump(const struct lu_env *env, struct md_lfsck *lfsck,
 struct lu_object *mdd_object_alloc(const struct lu_env *env,
                                    const struct lu_object_header *hdr,
                                    struct lu_device *d);
+int mdd_local_file_create(const struct lu_env *env, struct mdd_device *mdd,
+                         const struct lu_fid *pfid, const char *name,
+                         __u32 mode, struct lu_fid *fid);
 
 int mdd_acl_chmod(const struct lu_env *env, struct mdd_object *o, __u32 mode,
                   struct thandle *handle);
index dd8979e..9d7b54c 100644 (file)
@@ -683,99 +683,6 @@ static int mdd_lfsck_namespace_init(const struct lu_env *env,
        return rc;
 }
 
-static int mdd_declare_lfsck_namespace_unlink(const struct lu_env *env,
-                                             struct mdd_device *mdd,
-                                             struct dt_object *p,
-                                             struct dt_object *c,
-                                             const char *name,
-                                             struct thandle *handle)
-{
-       int rc;
-
-       rc = dt_declare_delete(env, p, (const struct dt_key *)name, handle);
-       if (rc != 0)
-               return rc;
-
-       rc = dt_declare_ref_del(env, c, handle);
-       if (rc != 0)
-               return rc;
-
-       rc = dt_declare_destroy(env, c, handle);
-       return rc;
-}
-
-static int mdd_lfsck_namespace_unlink(const struct lu_env *env,
-                                     struct mdd_device *mdd,
-                                     struct lfsck_component *com)
-{
-       struct mdd_thread_info  *info   = mdd_env_info(env);
-       struct lu_fid           *fid    = &info->mti_fid;
-       struct dt_object        *child  = com->lc_obj;
-       struct dt_object        *parent;
-       struct thandle          *handle;
-       bool                     locked = false;
-       int                      rc;
-       ENTRY;
-
-       parent = dt_store_resolve(env, mdd->mdd_bottom, "", fid);
-       if (IS_ERR(parent))
-               RETURN(rc = PTR_ERR(parent));
-
-       if (!dt_try_as_dir(env, parent))
-               GOTO(out, rc = -ENOTDIR);
-
-       handle = dt_trans_create(env, mdd->mdd_bottom);
-       if (IS_ERR(handle))
-               GOTO(out, rc = PTR_ERR(handle));
-
-       rc = mdd_declare_lfsck_namespace_unlink(env, mdd, parent, child,
-                                               lfsck_namespace_name, handle);
-       if (rc != 0)
-               GOTO(stop, rc);
-
-       rc = dt_trans_start_local(env, mdd->mdd_bottom, handle);
-       if (rc != 0)
-               GOTO(stop, rc);
-
-       dt_write_lock(env, child, MOR_TGT_CHILD);
-       locked = true;
-       rc = dt_delete(env, parent, (struct dt_key *)lfsck_namespace_name,
-                      handle, BYPASS_CAPA);
-       if (rc != 0)
-               GOTO(stop, rc);
-
-       rc = child->do_ops->do_ref_del(env, child, handle);
-       if (rc != 0) {
-               lu_local_obj_fid(fid, LFSCK_NAMESPACE_OID);
-               rc = dt_insert(env, parent,
-                              (const struct dt_rec*)fid,
-                              (const struct dt_key *)lfsck_namespace_name,
-                              handle, BYPASS_CAPA, 1);
-
-               GOTO(stop, rc);
-       }
-
-
-       rc = dt_destroy(env, child, handle);
-
-       GOTO(stop, rc);
-
-stop:
-       if (locked)
-               dt_write_unlock(env, child);
-
-       if (rc == 0) {
-               lu_object_put(env, &child->do_lu);
-               com->lc_obj = NULL;
-       }
-
-       dt_trans_stop(env, mdd->mdd_bottom, handle);
-
-out:
-       lu_object_put(env, &parent->do_lu);
-       return rc;
-}
-
 static int mdd_lfsck_namespace_lookup(const struct lu_env *env,
                                      struct lfsck_component *com,
                                      const struct lu_fid *fid,
@@ -1067,12 +974,9 @@ stop:
 static int mdd_lfsck_namespace_reset(const struct lu_env *env,
                                     struct lfsck_component *com, bool init)
 {
-       struct mdd_thread_info  *info = mdd_env_info(env);
-       struct lu_fid           *fid  = &info->mti_fid;
        struct lfsck_namespace  *ns   = (struct lfsck_namespace *)com->lc_file_ram;
        struct mdd_device       *mdd  = mdd_lfsck2mdd(com->lc_lfsck);
-       struct md_object        *mdo;
-       struct dt_object        *dto;
+       struct dt_object        *dto, *root;
        int                      rc;
        ENTRY;
 
@@ -1090,32 +994,32 @@ static int mdd_lfsck_namespace_reset(const struct lu_env *env,
        ns->ln_magic = LFSCK_NAMESPACE_MAGIC;
        ns->ln_status = LS_INIT;
 
-       rc = mdd_lfsck_namespace_unlink(env, mdd, com);
+       root = dt_locate(env, mdd->mdd_bottom, &mdd->mdd_local_root_fid);
+       if (unlikely(IS_ERR(root)))
+               GOTO(out, rc = PTR_ERR(root));
+
+       rc = local_object_unlink(env, mdd->mdd_bottom, root,
+                                lfsck_namespace_name);
        if (rc != 0)
                GOTO(out, rc);
 
-       lu_local_obj_fid(fid, LFSCK_NAMESPACE_OID);
-       mdo = llo_store_create_index(env, &mdd->mdd_md_dev, mdd->mdd_bottom, "",
-                                    lfsck_namespace_name, fid,
-                                    &dt_lfsck_features);
-       if (IS_ERR(mdo))
-               GOTO(out, rc = PTR_ERR(mdo));
-
-       lu_object_put(env, &mdo->mo_lu);
-       dto = dt_store_open(env, mdd->mdd_bottom, "", lfsck_namespace_name, fid);
+       dto = local_index_find_or_create(env, mdd->mdd_los, root,
+                                        lfsck_namespace_name,
+                                        S_IFREG | S_IRUGO | S_IWUSR,
+                                        &dt_lfsck_features);
        if (IS_ERR(dto))
                GOTO(out, rc = PTR_ERR(dto));
 
-       com->lc_obj = dto;
        rc = dto->do_ops->do_index_try(env, dto, &dt_lfsck_features);
        if (rc != 0)
                GOTO(out, rc);
+       com->lc_obj = dto;
 
        rc = mdd_lfsck_namespace_store(env, com, true);
 
        GOTO(out, rc);
-
 out:
+       lu_object_put(env, &root->do_lu);
        up_write(&com->lc_sem);
        return rc;
 }
@@ -1980,11 +1884,11 @@ static struct lfsck_operations mdd_lfsck_namespace_ops = {
 static int mdd_lfsck_namespace_setup(const struct lu_env *env,
                                     struct md_lfsck *lfsck)
 {
-       struct mdd_device      *mdd = mdd_lfsck2mdd(lfsck);
-       struct lfsck_component *com;
-       struct lfsck_namespace *ns;
-       struct dt_object       *obj;
-       int                     rc;
+       struct mdd_device       *mdd = mdd_lfsck2mdd(lfsck);
+       struct lfsck_component  *com;
+       struct lfsck_namespace  *ns;
+       struct dt_object        *obj, *root;
+       int                      rc;
        ENTRY;
 
        OBD_ALLOC_PTR(com);
@@ -2007,8 +1911,15 @@ static int mdd_lfsck_namespace_setup(const struct lu_env *env,
        if (com->lc_file_disk == NULL)
                GOTO(out, rc = -ENOMEM);
 
-       obj = dt_store_open(env, mdd->mdd_bottom, "", lfsck_namespace_name,
-                           &mdd_env_info(env)->mti_fid);
+       root = dt_locate(env, mdd->mdd_bottom, &mdd->mdd_local_root_fid);
+       if (unlikely(IS_ERR(root)))
+               GOTO(out, rc = PTR_ERR(root));
+
+       obj = local_index_find_or_create(env, mdd->mdd_los, root,
+                                        lfsck_namespace_name,
+                                        S_IFREG | S_IRUGO | S_IWUSR,
+                                        &dt_lfsck_features);
+       lu_object_put(env, &root->do_lu);
        if (IS_ERR(obj))
                GOTO(out, rc = PTR_ERR(obj));
 
@@ -2615,7 +2526,7 @@ static int mdd_lfsck_main(void *args)
        ENTRY;
 
        cfs_daemonize("lfsck");
-       rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
+       rc = lu_env_init(&env, LCT_MD_THREAD);
        if (rc != 0) {
                CERROR("%s: LFSCK, fail to init env, rc = %d\n",
                       mdd_lfsck2name(lfsck), rc);
@@ -2938,9 +2849,11 @@ static const struct lu_fid lfsck_it_fid = { .f_seq = FID_SEQ_LOCAL_FILE,
 
 int mdd_lfsck_setup(const struct lu_env *env, struct mdd_device *mdd)
 {
-       struct md_lfsck  *lfsck = &mdd->mdd_lfsck;
-       struct dt_object *obj;
-       int               rc;
+       struct md_lfsck         *lfsck = &mdd->mdd_lfsck;
+       struct dt_object        *obj;
+       struct lu_fid            fid;
+       int                      rc;
+
        ENTRY;
 
        LASSERT(!lfsck->ml_initialized);
@@ -2962,27 +2875,40 @@ int mdd_lfsck_setup(const struct lu_env *env, struct mdd_device *mdd)
        rc = obj->do_ops->do_index_try(env, obj, &dt_otable_features);
        if (rc != 0) {
                if (rc == -ENOTSUPP)
-                       rc = 0;
-
-               RETURN(rc);
+                       RETURN(0);
+               GOTO(out, rc);
        }
 
-       obj = dt_store_open(env, mdd->mdd_bottom, "", lfsck_bookmark_name,
-                           &mdd_env_info(env)->mti_fid);
+       /* LFSCK bookmark */
+       fid_zero(&fid);
+       rc = mdd_local_file_create(env, mdd, &mdd->mdd_local_root_fid,
+                                  lfsck_bookmark_name,
+                                  S_IFREG | S_IRUGO | S_IWUSR, &fid);
+       if (rc < 0)
+               GOTO(out, rc);
+
+       obj = dt_locate(env, mdd->mdd_bottom, &fid);
        if (IS_ERR(obj))
-               RETURN(PTR_ERR(obj));
+               GOTO(out, rc = PTR_ERR(obj));
 
+       LASSERT(lu_object_exists(&obj->do_lu));
        lfsck->ml_bookmark_obj = obj;
+
        rc = mdd_lfsck_bookmark_load(env, lfsck);
        if (rc == -ENODATA)
                rc = mdd_lfsck_bookmark_init(env, lfsck);
        if (rc != 0)
-               RETURN(rc);
+               GOTO(out, rc);
 
        rc = mdd_lfsck_namespace_setup(env, lfsck);
+       if (rc < 0)
+               GOTO(out, rc);
        /* XXX: LFSCK components initialization to be added here. */
-
-       RETURN(rc);
+       RETURN(0);
+out:
+       lu_object_put(env, &lfsck->ml_obj_oit->do_lu);
+       lfsck->ml_obj_oit = NULL;
+       return 0;
 }
 
 void mdd_lfsck_cleanup(const struct lu_env *env, struct mdd_device *mdd)
index e364903..38e9849 100644 (file)
@@ -518,26 +518,32 @@ out:
  */
 int orph_index_init(const struct lu_env *env, struct mdd_device *mdd)
 {
-        struct lu_fid fid;
-        struct dt_object *d;
-        int rc = 0;
-        ENTRY;
-
-        d = dt_store_open(env, mdd->mdd_child, "", orph_index_name, &fid);
-        if (!IS_ERR(d)) {
-                mdd->mdd_orphans = d;
-                if (!dt_try_as_dir(env, d)) {
-                        rc = -ENOTDIR;
-                        CERROR("\"%s\" is not an index! : rc = %d\n",
-                                        orph_index_name, rc);
-                }
-        } else {
-                CERROR("cannot find \"%s\" obj %d\n",
-                       orph_index_name, (int)PTR_ERR(d));
-                rc = PTR_ERR(d);
-        }
-
-        RETURN(rc);
+       struct lu_fid            fid;
+       struct dt_object        *d;
+       int                      rc = 0;
+
+       ENTRY;
+
+       /* create PENDING dir */
+       fid_zero(&fid);
+       rc = mdd_local_file_create(env, mdd, &mdd->mdd_local_root_fid,
+                                  orph_index_name, S_IFDIR | S_IRUGO |
+                                  S_IWUSR | S_IXUGO, &fid);
+       if (rc < 0)
+               RETURN(rc);
+
+       d = dt_locate(env, mdd->mdd_child, &fid);
+       if (IS_ERR(d))
+               RETURN(PTR_ERR(d));
+       LASSERT(lu_object_exists(&d->do_lu));
+       if (!dt_try_as_dir(env, d)) {
+               CERROR("%s: \"%s\" is not an index: rc = %d\n",
+                      mdd2obd_dev(mdd)->obd_name, orph_index_name, rc);
+               lu_object_put(env, &d->do_lu);
+               RETURN(-ENOTDIR);
+       }
+       mdd->mdd_orphans = d;
+       RETURN(0);
 }
 
 void orph_index_fini(const struct lu_env *env, struct mdd_device *mdd)
index 55b2be3..6389e03 100644 (file)
@@ -850,7 +850,7 @@ EXPORT_SYMBOL(dt_index_walk);
  * \retval appropriate error otherwise.
  */
 int dt_index_read(const struct lu_env *env, struct dt_device *dev,
-                  struct idx_info *ii, const struct lu_rdpg *rdpg)
+                 struct idx_info *ii, const struct lu_rdpg *rdpg)
 {
        const struct dt_index_features  *feat;
        struct dt_object                *obj;
@@ -862,15 +862,15 @@ int dt_index_read(const struct lu_env *env, struct dt_device *dev,
        if (rdpg->rp_count <= 0 && (rdpg->rp_count & (LU_PAGE_SIZE - 1)) != 0)
                RETURN(-EFAULT);
 
-       if (fid_seq(&ii->ii_fid) < FID_SEQ_SPECIAL)
-               /* block access to local files */
-               RETURN(-EPERM);
-
        if (fid_seq(&ii->ii_fid) >= FID_SEQ_NORMAL)
                /* we don't support directory transfer via OBD_IDX_READ for the
                 * time being */
                RETURN(-EOPNOTSUPP);
 
+       if (!fid_is_quota(&ii->ii_fid))
+               /* block access to all local files except quota files */
+               RETURN(-EPERM);
+
        /* lookup index object subject to the transfer */
        obj = dt_locate(env, dev, &ii->ii_fid);
        if (IS_ERR(obj))
index 7b36943..aefcb8e 100644 (file)
@@ -436,11 +436,7 @@ struct dt_object *local_file_find_or_create_with_fid(const struct lu_env *env,
 
        rc = dt_lookup_dir(env, parent, name, &dti->dti_fid);
        if (rc == 0) {
-               /* name is found, get the object */
-               if (!lu_fid_eq(fid, &dti->dti_fid))
-                       dto = ERR_PTR(-EINVAL);
-               else
-                       dto = dt_locate(env, dt, fid);
+               dto = dt_locate(env, dt, &dti->dti_fid);
        } else if (rc != -ENOENT) {
                dto = ERR_PTR(rc);
        } else {
@@ -566,6 +562,81 @@ local_index_find_or_create_with_fid(const struct lu_env *env,
 }
 EXPORT_SYMBOL(local_index_find_or_create_with_fid);
 
+static int local_object_declare_unlink(const struct lu_env *env,
+                                      struct dt_device *dt,
+                                      struct dt_object *p,
+                                      struct dt_object *c, const char *name,
+                                      struct thandle *th)
+{
+       int rc;
+
+       rc = dt_declare_delete(env, p, (const struct dt_key *)name, th);
+       if (rc < 0)
+               return rc;
+
+       rc = dt_declare_ref_del(env, c, th);
+       if (rc < 0)
+               return rc;
+
+       return dt_declare_destroy(env, c, th);
+}
+
+int local_object_unlink(const struct lu_env *env, struct dt_device *dt,
+                       struct dt_object *parent, const char *name)
+{
+       struct dt_thread_info   *dti = dt_info(env);
+       struct dt_object        *dto;
+       struct thandle          *th;
+       int                      rc;
+
+       ENTRY;
+
+       rc = dt_lookup_dir(env, parent, name, &dti->dti_fid);
+       if (rc == -ENOENT)
+               RETURN(0);
+       else if (rc < 0)
+               RETURN(rc);
+
+       dto = dt_locate(env, dt, &dti->dti_fid);
+       if (unlikely(IS_ERR(dto)))
+               RETURN(PTR_ERR(dto));
+
+       th = dt_trans_create(env, dt);
+       if (IS_ERR(th))
+               GOTO(out, rc = PTR_ERR(th));
+
+       rc = local_object_declare_unlink(env, dt, parent, dto, name, th);
+       if (rc < 0)
+               GOTO(stop, rc);
+
+       rc = dt_trans_start_local(env, dt, th);
+       if (rc < 0)
+               GOTO(stop, rc);
+
+       dt_write_lock(env, dto, 0);
+       rc = dt_delete(env, parent, (struct dt_key *)name, th, BYPASS_CAPA);
+       if (rc < 0)
+               GOTO(unlock, rc);
+
+       rc = dt_ref_del(env, dto, th);
+       if (rc < 0) {
+               rc = dt_insert(env, parent,
+                              (const struct dt_rec *)&dti->dti_fid,
+                              (const struct dt_key *)name, th, BYPASS_CAPA, 1);
+               GOTO(unlock, rc);
+       }
+
+       rc = dt_destroy(env, dto, th);
+unlock:
+       dt_write_unlock(env, dto);
+stop:
+       dt_trans_stop(env, dt, th);
+out:
+       lu_object_put_nocache(env, &dto->do_lu);
+       return rc;
+}
+EXPORT_SYMBOL(local_object_unlink);
+
 struct local_oid_storage *dt_los_find(struct ls_device *ls, __u64 seq)
 {
        struct local_oid_storage *los, *ret = NULL;
@@ -648,22 +719,20 @@ int local_oid_storage_init(const struct lu_env *env, struct dt_device *dev,
        if (IS_ERR(root))
                GOTO(out_los, rc = PTR_ERR(root));
 
+       /* initialize data allowing to generate new fids,
+        * literally we need a sequence */
        snprintf(dti->dti_buf, sizeof(dti->dti_buf), "seq-%Lx-lastid",
                 fid_seq(first_fid));
        rc = dt_lookup_dir(env, root, dti->dti_buf, &dti->dti_fid);
-       if (rc != 0 && rc != -ENOENT)
+       if (rc == -ENOENT)
+               dti->dti_fid = *first_fid;
+       else if (rc < 0)
                GOTO(out_los, rc);
 
-       /* initialize data allowing to generate new fids,
-        * literally we need a sequence */
-       if (rc == 0)
-               o = ls_locate(env, ls, &dti->dti_fid);
-       else
-               o = ls_locate(env, ls, first_fid);
+       o = ls_locate(env, ls, &dti->dti_fid);
        if (IS_ERR(o))
                GOTO(out_los, rc = PTR_ERR(o));
-
-       dt_write_lock(env, o, 0);
+       LASSERT(fid_seq(&dti->dti_fid) == fid_seq(first_fid));
        if (!dt_object_exists(o)) {
                LASSERT(rc == -ENOENT);
 
@@ -681,19 +750,12 @@ int local_oid_storage_init(const struct lu_env *env, struct dt_device *dev,
                        GOTO(out_trans, rc);
 
                rc = dt_declare_insert(env, root,
-                                      (const struct dt_rec *)lu_object_fid(&o->do_lu),
+                                      (const struct dt_rec *)&dti->dti_fid,
                                       (const struct dt_key *)dti->dti_buf,
                                       th);
                if (rc)
                        GOTO(out_trans, rc);
 
-               dti->dti_lb.lb_buf = NULL;
-               dti->dti_lb.lb_len = sizeof(dti->dti_lma);
-               rc = dt_declare_xattr_set(env, o, &dti->dti_lb, XATTR_NAME_LMA,
-                                         0, th);
-               if (rc)
-                       GOTO(out_trans, rc);
-
                rc = dt_declare_record_write(env, o, sizeof(losd), 0, th);
                if (rc)
                        GOTO(out_trans, rc);
@@ -702,11 +764,15 @@ int local_oid_storage_init(const struct lu_env *env, struct dt_device *dev,
                if (rc)
                        GOTO(out_trans, rc);
 
-               LASSERT(!dt_object_exists(o));
-               rc = dt_create(env, o, &dti->dti_attr, NULL, &dti->dti_dof, th);
+               dt_write_lock(env, root, 0);
+               dt_write_lock(env, o, 0);
+               if (dt_object_exists(o))
+                       GOTO(out_lock, rc = 0);
+
+               rc = dt_create(env, o, &dti->dti_attr, NULL, &dti->dti_dof,
+                              th);
                if (rc)
-                       GOTO(out_trans, rc);
-               LASSERT(dt_object_exists(o));
+                       GOTO(out_lock, rc);
 
                losd.lso_magic = cpu_to_le32(LOS_MAGIC);
                losd.lso_next_oid = cpu_to_le32(fid_oid(first_fid) + 1);
@@ -716,7 +782,7 @@ int local_oid_storage_init(const struct lu_env *env, struct dt_device *dev,
                dti->dti_lb.lb_len = sizeof(losd);
                rc = dt_record_write(env, o, &dti->dti_lb, &dti->dti_off, th);
                if (rc)
-                       GOTO(out_trans, rc);
+                       GOTO(out_lock, rc);
 #if LUSTRE_VERSION_CODE >= OBD_OCD_VERSION(2, 3, 90, 0)
 #error "fix this before release"
 #endif
@@ -725,29 +791,32 @@ int local_oid_storage_init(const struct lu_env *env, struct dt_device *dev,
                 * proper hanlding of named vs no-name objects.
                 * Llog objects have name always as they are placed in O/d/...
                 */
-               if (fid_seq(lu_object_fid(&o->do_lu)) != FID_SEQ_LLOG) {
+               if (fid_seq(&dti->dti_fid) != FID_SEQ_LLOG) {
                        rc = dt_insert(env, root,
-                                      (const struct dt_rec *)first_fid,
+                                      (const struct dt_rec *)&dti->dti_fid,
                                       (const struct dt_key *)dti->dti_buf,
                                       th, BYPASS_CAPA, 1);
                        if (rc)
-                               GOTO(out_trans, rc);
+                               GOTO(out_lock, rc);
                }
+out_lock:
+               dt_write_unlock(env, o);
+               dt_write_unlock(env, root);
 out_trans:
                dt_trans_stop(env, dev, th);
        } else {
                dti->dti_off = 0;
                dti->dti_lb.lb_buf = &losd;
                dti->dti_lb.lb_len = sizeof(losd);
+               dt_read_lock(env, o, 0);
                rc = dt_record_read(env, o, &dti->dti_lb, &dti->dti_off);
+               dt_read_unlock(env, o);
                if (rc == 0 && le32_to_cpu(losd.lso_magic) != LOS_MAGIC) {
                        CERROR("local storage file "DFID" is corrupted\n",
                               PFID(first_fid));
                        rc = -EINVAL;
                }
        }
-out_lock:
-       dt_write_unlock(env, o);
 out_los:
        if (root != NULL && !IS_ERR(root))
                lu_object_put_nocache(env, &root->do_lu);
index 7981fd5..7c52ddc 100644 (file)
@@ -2143,13 +2143,12 @@ static int osd_declare_object_destroy(const struct lu_env *env,
        LASSERT(oh->ot_handle == NULL);
        LASSERT(inode);
 
-       osd_trans_declare_op(env, oh, OSD_OT_DELETE,
+       osd_trans_declare_op(env, oh, OSD_OT_DESTROY,
                             osd_dto_credits_noquota[DTO_OBJECT_DELETE]);
        /* Recycle idle OI leaf may cause additional three OI blocks
         * to be changed. */
-       osd_trans_declare_op(env, oh, OSD_OT_DESTROY,
+       osd_trans_declare_op(env, oh, OSD_OT_DELETE,
                             osd_dto_credits_noquota[DTO_INDEX_DELETE] + 3);
-
        /* one less inode */
        rc = osd_declare_inode_qid(env, inode->i_uid, inode->i_gid, -1, oh,
                                   false, true, NULL, false);
index 695e883..091feb6 100644 (file)
@@ -1048,17 +1048,7 @@ static ssize_t osd_declare_write(const struct lu_env *env, struct dt_object *dt,
         oh = container_of0(handle, struct osd_thandle, ot_super);
         LASSERT(oh->ot_handle == NULL);
 
-        /* XXX: size == 0 or INT_MAX indicating a catalog header update or
-         *      llog write, see comment in mdd_declare_llog_record().
-         *
-         *      This hack will be removed with llog over OSD landing
-         */
-        if (size == DECLARE_LLOG_REWRITE)
-                credits = 2;
-        else if (size == DECLARE_LLOG_WRITE)
-                credits = 6;
-        else
-                credits = osd_dto_credits_noquota[DTO_WRITE_BLOCK];
+       credits = osd_dto_credits_noquota[DTO_WRITE_BLOCK];
 
        osd_trans_declare_op(env, oh, OSD_OT_WRITE, credits);
 
index 2661d97..77acd9e 100644 (file)
@@ -1096,10 +1096,6 @@ static const struct osd_lf_map osd_lf_maps[] = {
        { "ROOT", { FID_SEQ_ROOT, 1, 0 },
                OLF_SCAN_SUBITEMS | OLF_HIDE_FID, osd_ios_ROOT_scan, NULL },
 
-       /* capa_keys */
-       { CAPA_KEYS, { FID_SEQ_LOCAL_FILE, MDD_CAPA_KEYS_OID, 0 }, 0,
-               NULL, NULL },
-
        /* changelog_catalog */
        { CHANGELOG_CATALOG, { 0, 0, 0 }, 0, NULL, NULL },
 
@@ -1155,7 +1151,7 @@ static const struct osd_lf_map osd_lf_maps[] = {
                OLF_SHOW_NAME, NULL, NULL },
 
        /* lfsck_namespace */
-       { "lfsck_namespace", { FID_SEQ_LOCAL_FILE, LFSCK_NAMESPACE_OID, 0 }, 0,
+       { "lfsck_namespace", { FID_SEQ_LOCAL_FILE, LFSCK_BOOKMARK_OID, 0 }, 0,
                NULL, NULL },
 
        /* OBJECTS, upgrade from old device */
index e059cf7..e6a02a7 100644 (file)
@@ -96,13 +96,11 @@ static const struct named_oid oids[] = {
        { MGS_CONFIGS_OID,              NULL /*MOUNT_CONFIGS_DIR*/ },
        { FID_SEQ_SRV_OID,              "seq_srv" },
        { FID_SEQ_CTL_OID,              "seq_ctl" },
-       { MDD_CAPA_KEYS_OID,            NULL /*CAPA_KEYS*/ },
        { FLD_INDEX_OID,                "fld" },
        { MDD_LOV_OBJ_OID,              LOV_OBJID },
        { OFD_HEALTH_CHECK_OID,         HEALTH_CHECK },
        { ACCT_USER_OID,                "acct_usr_inode" },
        { ACCT_GROUP_OID,               "acct_grp_inode" },
-       { MDD_ORPHAN_OID,               NULL },
        { 0,                            NULL }
 };