Whamcloud - gitweb
LU-2886 mdd: create local files using local_storage lib
[fs/lustre-release.git] / lustre / mdd / mdd_device.c
index 88d3b56..bd1e80a 100644 (file)
@@ -160,32 +160,6 @@ static struct lu_device *mdd_device_fini(const struct lu_env *env,
        return NULL;
 }
 
-static void mdd_changelog_fini(const struct lu_env *env,
-                               struct mdd_device *mdd);
-
-static void mdd_device_shutdown(const struct lu_env *env,
-                                struct mdd_device *m, struct lustre_cfg *cfg)
-{
-        ENTRY;
-        if (m->mdd_dot_lustre_objs.mdd_obf)
-                mdd_object_put(env, m->mdd_dot_lustre_objs.mdd_obf);
-        if (m->mdd_dot_lustre)
-                mdd_object_put(env, m->mdd_dot_lustre);
-        orph_index_fini(env, m);
-        if (m->mdd_capa != NULL) {
-                lu_object_put(env, &m->mdd_capa->do_lu);
-                m->mdd_capa = NULL;
-        }
-       lu_site_purge(env, m->mdd_md_dev.md_lu_dev.ld_site, -1);
-        /* remove upcall device*/
-        md_upcall_fini(&m->mdd_md_dev);
-
-       if (m->mdd_child_exp)
-               obd_disconnect(m->mdd_child_exp);
-
-        EXIT;
-}
-
 static int changelog_init_cb(const struct lu_env *env, struct llog_handle *llh,
                             struct llog_rec_hdr *hdr, void *data)
 {
@@ -564,34 +538,6 @@ int mdd_changelog_write_header(const struct lu_env *env,
        RETURN(rc);
 }
 
-/**
- * Create ".lustre" directory.
- */
-static int create_dot_lustre_dir(const struct lu_env *env, struct mdd_device *m)
-{
-        struct lu_fid *fid = &mdd_env_info(env)->mti_fid;
-        struct md_object *mdo;
-        int rc;
-
-        memcpy(fid, &LU_DOT_LUSTRE_FID, sizeof(struct lu_fid));
-        mdo = llo_store_create_index(env, &m->mdd_md_dev, m->mdd_child,
-                                     mdd_root_dir_name, dot_lustre_name,
-                                     fid, &dt_directory_features);
-        /* .lustre dir may be already present */
-        if (IS_ERR(mdo) && PTR_ERR(mdo) != -EEXIST) {
-                rc = PTR_ERR(mdo);
-                CERROR("creating obj [%s] fid = "DFID" rc = %d\n",
-                        dot_lustre_name, PFID(fid), rc);
-               return rc;
-        }
-
-        if (!IS_ERR(mdo))
-                lu_object_put(env, &mdo->mo_lu);
-
-        return 0;
-}
-
-
 static int dot_lustre_mdd_permission(const struct lu_env *env,
                                      struct md_object *pobj,
                                      struct md_object *cobj,
@@ -1037,61 +983,94 @@ static struct md_dir_operations mdd_obf_dir_ops = {
  */
 static int mdd_obf_setup(const struct lu_env *env, struct mdd_device *m)
 {
-        struct mdd_object *mdd_obf;
-        struct lu_object *obf_lu_obj;
-        int rc = 0;
+       struct mdd_object *mdd_obf;
 
-        m->mdd_dot_lustre_objs.mdd_obf = mdd_object_find(env, m,
-                                                         &LU_OBF_FID);
-        if (m->mdd_dot_lustre_objs.mdd_obf == NULL ||
-            IS_ERR(m->mdd_dot_lustre_objs.mdd_obf))
-                GOTO(out, rc = -ENOENT);
+       mdd_obf = mdd_object_find(env, m, &LU_OBF_FID);
+       if (mdd_obf == NULL || IS_ERR(mdd_obf))
+               return -ENOENT;
 
-        mdd_obf = m->mdd_dot_lustre_objs.mdd_obf;
-        mdd_obf->mod_obj.mo_dir_ops = &mdd_obf_dir_ops;
-        mdd_obf->mod_obj.mo_ops = &mdd_obf_obj_ops;
-        /* Don't allow objects to be created in "fid" dir */
-        mdd_obf->mod_flags |= IMMUTE_OBJ;
+       m->mdd_dot_lustre_objs.mdd_obf = mdd_obf;
+       mdd_obf->mod_obj.mo_dir_ops = &mdd_obf_dir_ops;
+       mdd_obf->mod_obj.mo_ops = &mdd_obf_obj_ops;
+       /* Don't allow objects to be created in "fid" dir */
+       mdd_obf->mod_flags |= IMMUTE_OBJ;
 
-        obf_lu_obj = mdd2lu_obj(mdd_obf);
-        obf_lu_obj->lo_header->loh_attr |= (LOHA_EXISTS | S_IFDIR);
+       mdd2lu_obj(mdd_obf)->lo_header->loh_attr |= LOHA_EXISTS | S_IFDIR;
+       return 0;
+}
 
-out:
-        return rc;
+static struct md_object *mdo_locate(const struct lu_env *env,
+                                   struct md_device *md,
+                                   const struct lu_fid *fid)
+{
+       struct lu_object *obj;
+       struct md_object *mdo;
+
+       obj = lu_object_find(env, &md->md_lu_dev, fid, NULL);
+       if (!IS_ERR(obj)) {
+               obj = lu_object_locate(obj->lo_header, md->md_lu_dev.ld_type);
+               LASSERT(obj != NULL);
+               mdo = lu2md(obj);
+       } else {
+               mdo = ERR_PTR(PTR_ERR(obj));
+       }
+       return mdo;
 }
 
 /** Setup ".lustre" directory object */
 static int mdd_dot_lustre_setup(const struct lu_env *env, struct mdd_device *m)
 {
-        struct dt_object *dt_dot_lustre;
-        struct lu_fid *fid = &mdd_env_info(env)->mti_fid;
-        int rc;
-       ENTRY;
-
-        rc = create_dot_lustre_dir(env, m);
-        if (rc)
-                return rc;
+       struct md_object        *mdo;
+       struct lu_fid            fid;
+       int                      rc;
 
-        dt_dot_lustre = dt_store_open(env, m->mdd_child, mdd_root_dir_name,
-                                      dot_lustre_name, fid);
-        if (IS_ERR(dt_dot_lustre)) {
-                rc = PTR_ERR(dt_dot_lustre);
-                GOTO(out, rc);
-        }
+       ENTRY;
+       /* Create ".lustre" directory in ROOT. */
+       fid = LU_DOT_LUSTRE_FID;
+       rc = mdd_local_file_create(env, m, &m->mdd_root_fid,
+                                  dot_lustre_name,
+                                  S_IFDIR | S_IRUGO | S_IWUSR | S_IXUGO,
+                                  &fid);
+       if (rc < 0)
+               RETURN(rc);
+       mdo = mdo_locate(env, &m->mdd_md_dev, &fid);
+       if (IS_ERR(mdo))
+               RETURN(PTR_ERR(mdo));
+       LASSERT(lu_object_exists(&mdo->mo_lu));
 
-        /* references are released in mdd_device_shutdown() */
-        m->mdd_dot_lustre = lu2mdd_obj(lu_object_locate(dt_dot_lustre->do_lu.lo_header,
-                                                        &mdd_device_type));
+       m->mdd_dot_lustre = md2mdd_obj(mdo);
+       m->mdd_dot_lustre->mod_obj.mo_dir_ops = &mdd_dot_lustre_dir_ops;
+       m->mdd_dot_lustre->mod_obj.mo_ops = &mdd_dot_lustre_obj_ops;
 
-        m->mdd_dot_lustre->mod_obj.mo_dir_ops = &mdd_dot_lustre_dir_ops;
-        m->mdd_dot_lustre->mod_obj.mo_ops = &mdd_dot_lustre_obj_ops;
+       rc = mdd_obf_setup(env, m);
+       if (rc) {
+               CERROR("%s: error initializing \"fid\" object: rc = %d.\n",
+                      mdd2obd_dev(m)->obd_name, rc);
+               GOTO(out, rc);
+       }
+       RETURN(0);
+out:
+       mdd_object_put(env, m->mdd_dot_lustre);
+       m->mdd_dot_lustre = NULL;
+       return rc;
+}
 
-        rc = mdd_obf_setup(env, m);
-        if (rc)
-                CERROR("Error initializing \"fid\" object - %d.\n", rc);
+static void mdd_device_shutdown(const struct lu_env *env, struct mdd_device *m,
+                               struct lustre_cfg *cfg)
+{
+       mdd_lfsck_cleanup(env, m);
+       mdd_changelog_fini(env, m);
+       orph_index_fini(env, m);
+       if (m->mdd_dot_lustre_objs.mdd_obf)
+               mdd_object_put(env, m->mdd_dot_lustre_objs.mdd_obf);
+       if (m->mdd_dot_lustre)
+               mdd_object_put(env, m->mdd_dot_lustre);
+       if (m->mdd_los != NULL)
+               local_oid_storage_fini(env, m->mdd_los);
+       lu_site_purge(env, mdd2lu_dev(m)->ld_site, ~0);
 
-out:
-        RETURN(rc);
+       if (m->mdd_child_exp)
+               obd_disconnect(m->mdd_child_exp);
 }
 
 static int mdd_process_config(const struct lu_env *env,
@@ -1118,14 +1097,12 @@ static int mdd_process_config(const struct lu_env *env,
                 rc = next->ld_ops->ldo_process_config(env, next, cfg);
                 if (rc)
                         GOTO(out, rc);
-                dt->dd_ops->dt_conf_get(env, dt, &m->mdd_dt_conf);
+               dt_conf_get(env, dt, &m->mdd_dt_conf);
                 break;
         case LCFG_CLEANUP:
-               mdd_lfsck_cleanup(env, m);
-               mdd_changelog_fini(env, m);
                rc = next->ld_ops->ldo_process_config(env, next, cfg);
                lu_dev_del_linkage(d->ld_site, d);
-                mdd_device_shutdown(env, m, cfg);
+               mdd_device_shutdown(env, m, cfg);
                break;
         default:
                 rc = next->ld_ops->ldo_process_config(env, next, cfg);
@@ -1153,38 +1130,38 @@ static int mdd_recovery_complete(const struct lu_env *env,
         RETURN(rc);
 }
 
-static int mdd_find_or_create_root(const struct lu_env *env,
-                                  struct mdd_device *mdd)
+int mdd_local_file_create(const struct lu_env *env, struct mdd_device *mdd,
+                         const struct lu_fid *pfid, const char *name, __u32 mode,
+                         struct lu_fid *fid)
 {
-       struct dt_object        *root;
-       struct md_object        *mroot;
-       struct lu_fid           fid;
-       int                     rc = 0;
+       struct dt_object        *parent, *dto;
+       int                      rc;
 
        ENTRY;
 
-       /* Check if the "ROOT" entry exists already */
-       root = dt_store_open(env, mdd->mdd_child, "", mdd_root_dir_name,
-                            &fid);
-       if (!IS_ERR(root)) {
-               lu_object_put(env, &root->do_lu);
-               GOTO(out, rc = 0);
-       }
-
-       lu_root_fid(&fid);
-       /* New Filesystem, create /ROOT */
-       mroot = llo_store_create_index(env, &mdd->mdd_md_dev, mdd->mdd_bottom,
-                                      "", mdd_root_dir_name, &fid,
-                                      &dt_directory_features);
-       if (IS_ERR(mroot))
-               GOTO(out, rc = PTR_ERR(mroot));
-
-       lu_object_put(env, &mroot->mo_lu);
-out:
-       if (rc == 0)
-               mdd->mdd_root_fid = fid;
+       LASSERT(!fid_is_zero(pfid));
+       parent = dt_locate(env, mdd->mdd_bottom, pfid);
+       if (unlikely(IS_ERR(parent)))
+               RETURN(PTR_ERR(parent));
 
-       RETURN(rc);
+       /* create local file/dir, if @fid is passed then try to use it */
+       if (fid_is_zero(fid))
+               dto = local_file_find_or_create(env, mdd->mdd_los, parent,
+                                               name, mode);
+       else
+               dto = local_file_find_or_create_with_fid(env, mdd->mdd_bottom,
+                                                        fid, parent, name,
+                                                        mode);
+       if (IS_ERR(dto))
+               GOTO(out_put, rc = PTR_ERR(dto));
+       *fid = *lu_object_fid(&dto->do_lu);
+       /* since stack is not fully set up the local_storage uses own stack
+        * and we should drop its object from cache */
+       lu_object_put_nocache(env, &dto->do_lu);
+       EXIT;
+out_put:
+       lu_object_put(env, &parent->do_lu);
+       return 0;
 }
 
 static int mdd_prepare(const struct lu_env *env,
@@ -1193,62 +1170,85 @@ static int mdd_prepare(const struct lu_env *env,
 {
        struct mdd_device       *mdd = lu2mdd_dev(cdev);
        struct lu_device        *next = &mdd->mdd_child->dd_lu_dev;
-       struct dt_object        *root;
        struct lu_fid            fid;
-       int                     rc;
+       int                      rc;
 
-        ENTRY;
+       ENTRY;
 
        rc = next->ld_ops->ldo_prepare(env, cdev, next);
        if (rc)
-               GOTO(out, rc);
+               RETURN(rc);
+
+       /* Setup local dirs */
+       fid.f_seq = FID_SEQ_LOCAL_NAME;
+       fid.f_oid = 1;
+       fid.f_ver = 0;
+       rc = local_oid_storage_init(env, mdd->mdd_bottom, &fid,
+                                   &mdd->mdd_los);
+       if (rc)
+               RETURN(rc);
 
        rc = dt_root_get(env, mdd->mdd_child, &mdd->mdd_local_root_fid);
-       if (rc != 0)
-               GOTO(out, rc);
+       if (rc < 0)
+               GOTO(out_los, rc);
 
        if (mdd_seq_site(mdd)->ss_node_id == 0) {
-               rc = mdd_find_or_create_root(env, mdd);
+               lu_root_fid(&fid);
+               rc = mdd_local_file_create(env, mdd, &mdd->mdd_local_root_fid,
+                                          mdd_root_dir_name, S_IFDIR |
+                                          S_IRUGO | S_IWUSR | S_IXUGO, &fid);
                if (rc != 0) {
                        CERROR("%s: create root fid failed: rc = %d\n",
                               mdd2obd_dev(mdd)->obd_name, rc);
-                       GOTO(out, rc);
+                       GOTO(out_los, rc);
                }
+               mdd->mdd_root_fid = fid;
 
                rc = mdd_dot_lustre_setup(env, mdd);
                if (rc != 0) {
                        CERROR("%s: initializing .lustre failed: rc = %d\n",
                               mdd2obd_dev(mdd)->obd_name, rc);
-                       GOTO(out, rc);
+                       GOTO(out_los, rc);
                }
 
                rc = mdd_compat_fixes(env, mdd);
                if (rc)
-                       GOTO(out, rc);
+                       GOTO(out_los, rc);
 
        }
 
        rc = orph_index_init(env, mdd);
-       if (rc != 0)
-                GOTO(out, rc);
-
-        /* we use capa file to declare llog changes,
-         * will be fixed with new llog in 2.3 */
-       root = dt_store_open(env, mdd->mdd_child, "", CAPA_KEYS, &fid);
-       if (IS_ERR(root))
-               GOTO(out, rc = PTR_ERR(root));
-
-       mdd->mdd_capa = root;
+       if (rc < 0)
+               GOTO(out_dot, rc);
 
        rc = mdd_changelog_init(env, mdd);
-       if (rc != 0)
-               GOTO(out, rc);
+       if (rc != 0) {
+               CERROR("%s: failed to initialize changelog: rc = %d\n",
+                      mdd2obd_dev(mdd)->obd_name, rc);
+               GOTO(out_orph, rc);
+       }
 
        rc = mdd_lfsck_setup(env, mdd);
-       if (rc != 0)
+       if (rc != 0) {
                CERROR("%s: failed to initialize lfsck: rc = %d\n",
                       mdd2obd_dev(mdd)->obd_name, rc);
-out:
+               GOTO(out_changelog, rc);
+       }
+       RETURN(0);
+out_changelog:
+       mdd_changelog_fini(env, mdd);
+out_orph:
+       orph_index_fini(env, mdd);
+out_dot:
+       if (mdd_seq_site(mdd)->ss_node_id == 0) {
+               mdd_object_put(env, mdd->mdd_dot_lustre);
+               mdd->mdd_dot_lustre = NULL;
+               mdd_object_put(env, mdd->mdd_dot_lustre_objs.mdd_obf);
+               mdd->mdd_dot_lustre_objs.mdd_obf = NULL;
+       }
+out_los:
+       local_oid_storage_fini(env, mdd->mdd_los);
+       mdd->mdd_los = NULL;
        return rc;
 }
 
@@ -1736,10 +1736,7 @@ static void mdd_key_fini(const struct lu_context *ctx,
                          struct lu_context_key *key, void *data)
 {
         struct mdd_thread_info *info = data;
-        if (info->mti_max_lmm != NULL)
-                OBD_FREE(info->mti_max_lmm, info->mti_max_lmm_size);
-        if (info->mti_max_cookie != NULL)
-                OBD_FREE(info->mti_max_cookie, info->mti_max_cookie_size);
+
        lu_buf_free(&info->mti_big_buf);
        lu_buf_free(&info->mti_link_buf);
 
@@ -1749,32 +1746,6 @@ static void mdd_key_fini(const struct lu_context *ctx,
 /* context key: mdd_thread_key */
 LU_CONTEXT_KEY_DEFINE(mdd, LCT_MD_THREAD);
 
-static struct lu_local_obj_desc llod_capa_key = {
-        .llod_name      = CAPA_KEYS,
-        .llod_oid       = MDD_CAPA_KEYS_OID,
-        .llod_is_index  = 0,
-};
-
-static struct lu_local_obj_desc llod_mdd_orphan = {
-        .llod_name      = orph_index_name,
-        .llod_oid       = MDD_ORPHAN_OID,
-        .llod_is_index  = 1,
-        .llod_feat      = &dt_directory_features,
-};
-
-static struct lu_local_obj_desc llod_lfsck_bookmark = {
-       .llod_name      = lfsck_bookmark_name,
-       .llod_oid       = LFSCK_BOOKMARK_OID,
-       .llod_is_index  = 0,
-};
-
-static struct lu_local_obj_desc llod_lfsck_namespace = {
-       .llod_name      = lfsck_namespace_name,
-       .llod_oid       = LFSCK_NAMESPACE_OID,
-       .llod_is_index  = 1,
-       .llod_feat      = &dt_lfsck_features,
-};
-
 static int __init mdd_mod_init(void)
 {
        struct lprocfs_static_vars lvars;
@@ -1791,11 +1762,6 @@ static int __init mdd_mod_init(void)
        changelog_orig_logops.lop_add = llog_cat_add_rec;
        changelog_orig_logops.lop_declare_add = llog_cat_declare_add_rec;
 
-       llo_local_obj_register(&llod_capa_key);
-       llo_local_obj_register(&llod_mdd_orphan);
-       llo_local_obj_register(&llod_lfsck_bookmark);
-       llo_local_obj_register(&llod_lfsck_namespace);
-
        rc = class_register_type(&mdd_obd_device_ops, NULL, lvars.module_vars,
                                 LUSTRE_MDD_NAME, &mdd_device_type);
        if (rc)
@@ -1805,11 +1771,6 @@ static int __init mdd_mod_init(void)
 
 static void __exit mdd_mod_exit(void)
 {
-       llo_local_obj_unregister(&llod_capa_key);
-       llo_local_obj_unregister(&llod_mdd_orphan);
-       llo_local_obj_unregister(&llod_lfsck_bookmark);
-       llo_local_obj_unregister(&llod_lfsck_namespace);
-
        class_unregister_type(LUSTRE_MDD_NAME);
        lu_kmem_fini(mdd_caches);
 }