struct dt_object *parent,
const char *name, __u32 mode,
const struct dt_index_features *ft);
+int local_object_unlink(const struct lu_env *env, struct dt_device *dt,
+ struct dt_object *parent, const char *name);
static inline int dt_object_lock(const struct lu_env *env,
struct dt_object *o, struct lustre_handle *lh,
FID_SEQ_CTL_OID = 4UL,
FID_SEQ_SRV_OID = 5UL,
/** \see mdd_mod_init */
- MDD_ROOT_INDEX_OID = 6UL,
- MDD_ORPHAN_OID = 7UL,
+ MDD_ROOT_INDEX_OID = 6UL, /* deprecated in 2.4 */
+ MDD_ORPHAN_OID = 7UL, /* deprecated in 2.4 */
MDD_LOV_OBJ_OID = 8UL,
MDD_CAPA_KEYS_OID = 9UL,
/** \see mdt_mod_init */
* super-class definitions.
*/
#include <dt_object.h>
-#include <lvfs.h>
-
-/* LU-1051, temperary solution to reduce llog credits */
-#define DECLARE_LLOG_REWRITE 0
-#define DECLARE_LLOG_WRITE INT_MAX
struct md_device;
struct md_device_operations;
return NULL;
}
-static void mdd_changelog_fini(const struct lu_env *env,
- struct mdd_device *mdd);
-
-static void mdd_device_shutdown(const struct lu_env *env,
- struct mdd_device *m, struct lustre_cfg *cfg)
-{
- ENTRY;
- if (m->mdd_dot_lustre_objs.mdd_obf)
- mdd_object_put(env, m->mdd_dot_lustre_objs.mdd_obf);
- if (m->mdd_dot_lustre)
- mdd_object_put(env, m->mdd_dot_lustre);
- orph_index_fini(env, m);
- if (m->mdd_capa != NULL) {
- lu_object_put(env, &m->mdd_capa->do_lu);
- m->mdd_capa = NULL;
- }
- lu_site_purge(env, m->mdd_md_dev.md_lu_dev.ld_site, -1);
- /* remove upcall device*/
- md_upcall_fini(&m->mdd_md_dev);
-
- if (m->mdd_child_exp)
- obd_disconnect(m->mdd_child_exp);
-
- EXIT;
-}
-
static int changelog_init_cb(const struct lu_env *env, struct llog_handle *llh,
struct llog_rec_hdr *hdr, void *data)
{
RETURN(rc);
}
-/**
- * Create ".lustre" directory.
- */
-static int create_dot_lustre_dir(const struct lu_env *env, struct mdd_device *m)
-{
- struct lu_fid *fid = &mdd_env_info(env)->mti_fid;
- struct md_object *mdo;
- int rc;
-
- memcpy(fid, &LU_DOT_LUSTRE_FID, sizeof(struct lu_fid));
- mdo = llo_store_create_index(env, &m->mdd_md_dev, m->mdd_child,
- mdd_root_dir_name, dot_lustre_name,
- fid, &dt_directory_features);
- /* .lustre dir may be already present */
- if (IS_ERR(mdo) && PTR_ERR(mdo) != -EEXIST) {
- rc = PTR_ERR(mdo);
- CERROR("creating obj [%s] fid = "DFID" rc = %d\n",
- dot_lustre_name, PFID(fid), rc);
- return rc;
- }
-
- if (!IS_ERR(mdo))
- lu_object_put(env, &mdo->mo_lu);
-
- return 0;
-}
-
-
static int dot_lustre_mdd_permission(const struct lu_env *env,
struct md_object *pobj,
struct md_object *cobj,
*/
static int mdd_obf_setup(const struct lu_env *env, struct mdd_device *m)
{
- struct mdd_object *mdd_obf;
- struct lu_object *obf_lu_obj;
- int rc = 0;
+ struct mdd_object *mdd_obf;
- m->mdd_dot_lustre_objs.mdd_obf = mdd_object_find(env, m,
- &LU_OBF_FID);
- if (m->mdd_dot_lustre_objs.mdd_obf == NULL ||
- IS_ERR(m->mdd_dot_lustre_objs.mdd_obf))
- GOTO(out, rc = -ENOENT);
+ mdd_obf = mdd_object_find(env, m, &LU_OBF_FID);
+ if (mdd_obf == NULL || IS_ERR(mdd_obf))
+ return -ENOENT;
- mdd_obf = m->mdd_dot_lustre_objs.mdd_obf;
- mdd_obf->mod_obj.mo_dir_ops = &mdd_obf_dir_ops;
- mdd_obf->mod_obj.mo_ops = &mdd_obf_obj_ops;
- /* Don't allow objects to be created in "fid" dir */
- mdd_obf->mod_flags |= IMMUTE_OBJ;
+ m->mdd_dot_lustre_objs.mdd_obf = mdd_obf;
+ mdd_obf->mod_obj.mo_dir_ops = &mdd_obf_dir_ops;
+ mdd_obf->mod_obj.mo_ops = &mdd_obf_obj_ops;
+ /* Don't allow objects to be created in "fid" dir */
+ mdd_obf->mod_flags |= IMMUTE_OBJ;
- obf_lu_obj = mdd2lu_obj(mdd_obf);
- obf_lu_obj->lo_header->loh_attr |= (LOHA_EXISTS | S_IFDIR);
+ mdd2lu_obj(mdd_obf)->lo_header->loh_attr |= LOHA_EXISTS | S_IFDIR;
+ return 0;
+}
-out:
- return rc;
+static struct md_object *mdo_locate(const struct lu_env *env,
+ struct md_device *md,
+ const struct lu_fid *fid)
+{
+ struct lu_object *obj;
+ struct md_object *mdo;
+
+ obj = lu_object_find(env, &md->md_lu_dev, fid, NULL);
+ if (!IS_ERR(obj)) {
+ obj = lu_object_locate(obj->lo_header, md->md_lu_dev.ld_type);
+ LASSERT(obj != NULL);
+ mdo = lu2md(obj);
+ } else {
+ mdo = ERR_PTR(PTR_ERR(obj));
+ }
+ return mdo;
}
/** Setup ".lustre" directory object */
static int mdd_dot_lustre_setup(const struct lu_env *env, struct mdd_device *m)
{
- struct dt_object *dt_dot_lustre;
- struct lu_fid *fid = &mdd_env_info(env)->mti_fid;
- int rc;
- ENTRY;
-
- rc = create_dot_lustre_dir(env, m);
- if (rc)
- return rc;
+ struct md_object *mdo;
+ struct lu_fid fid;
+ int rc;
- dt_dot_lustre = dt_store_open(env, m->mdd_child, mdd_root_dir_name,
- dot_lustre_name, fid);
- if (IS_ERR(dt_dot_lustre)) {
- rc = PTR_ERR(dt_dot_lustre);
- GOTO(out, rc);
- }
+ ENTRY;
+ /* Create ".lustre" directory in ROOT. */
+ fid = LU_DOT_LUSTRE_FID;
+ rc = mdd_local_file_create(env, m, &m->mdd_root_fid,
+ dot_lustre_name,
+ S_IFDIR | S_IRUGO | S_IWUSR | S_IXUGO,
+ &fid);
+ if (rc < 0)
+ RETURN(rc);
+ mdo = mdo_locate(env, &m->mdd_md_dev, &fid);
+ if (IS_ERR(mdo))
+ RETURN(PTR_ERR(mdo));
+ LASSERT(lu_object_exists(&mdo->mo_lu));
- /* references are released in mdd_device_shutdown() */
- m->mdd_dot_lustre = lu2mdd_obj(lu_object_locate(dt_dot_lustre->do_lu.lo_header,
- &mdd_device_type));
+ m->mdd_dot_lustre = md2mdd_obj(mdo);
+ m->mdd_dot_lustre->mod_obj.mo_dir_ops = &mdd_dot_lustre_dir_ops;
+ m->mdd_dot_lustre->mod_obj.mo_ops = &mdd_dot_lustre_obj_ops;
- m->mdd_dot_lustre->mod_obj.mo_dir_ops = &mdd_dot_lustre_dir_ops;
- m->mdd_dot_lustre->mod_obj.mo_ops = &mdd_dot_lustre_obj_ops;
+ rc = mdd_obf_setup(env, m);
+ if (rc) {
+ CERROR("%s: error initializing \"fid\" object: rc = %d.\n",
+ mdd2obd_dev(m)->obd_name, rc);
+ GOTO(out, rc);
+ }
+ RETURN(0);
+out:
+ mdd_object_put(env, m->mdd_dot_lustre);
+ m->mdd_dot_lustre = NULL;
+ return rc;
+}
- rc = mdd_obf_setup(env, m);
- if (rc)
- CERROR("Error initializing \"fid\" object - %d.\n", rc);
+static void mdd_device_shutdown(const struct lu_env *env, struct mdd_device *m,
+ struct lustre_cfg *cfg)
+{
+ mdd_lfsck_cleanup(env, m);
+ mdd_changelog_fini(env, m);
+ orph_index_fini(env, m);
+ if (m->mdd_dot_lustre_objs.mdd_obf)
+ mdd_object_put(env, m->mdd_dot_lustre_objs.mdd_obf);
+ if (m->mdd_dot_lustre)
+ mdd_object_put(env, m->mdd_dot_lustre);
+ if (m->mdd_los != NULL)
+ local_oid_storage_fini(env, m->mdd_los);
+ lu_site_purge(env, mdd2lu_dev(m)->ld_site, ~0);
-out:
- RETURN(rc);
+ if (m->mdd_child_exp)
+ obd_disconnect(m->mdd_child_exp);
}
static int mdd_process_config(const struct lu_env *env,
rc = next->ld_ops->ldo_process_config(env, next, cfg);
if (rc)
GOTO(out, rc);
- dt->dd_ops->dt_conf_get(env, dt, &m->mdd_dt_conf);
+ dt_conf_get(env, dt, &m->mdd_dt_conf);
break;
case LCFG_CLEANUP:
- mdd_lfsck_cleanup(env, m);
- mdd_changelog_fini(env, m);
rc = next->ld_ops->ldo_process_config(env, next, cfg);
lu_dev_del_linkage(d->ld_site, d);
- mdd_device_shutdown(env, m, cfg);
+ mdd_device_shutdown(env, m, cfg);
break;
default:
rc = next->ld_ops->ldo_process_config(env, next, cfg);
RETURN(rc);
}
-static int mdd_find_or_create_root(const struct lu_env *env,
- struct mdd_device *mdd)
+int mdd_local_file_create(const struct lu_env *env, struct mdd_device *mdd,
+ const struct lu_fid *pfid, const char *name, __u32 mode,
+ struct lu_fid *fid)
{
- struct dt_object *root;
- struct md_object *mroot;
- struct lu_fid fid;
- int rc = 0;
+ struct dt_object *parent, *dto;
+ int rc;
ENTRY;
- /* Check if the "ROOT" entry exists already */
- root = dt_store_open(env, mdd->mdd_child, "", mdd_root_dir_name,
- &fid);
- if (!IS_ERR(root)) {
- lu_object_put(env, &root->do_lu);
- GOTO(out, rc = 0);
- }
-
- lu_root_fid(&fid);
- /* New Filesystem, create /ROOT */
- mroot = llo_store_create_index(env, &mdd->mdd_md_dev, mdd->mdd_bottom,
- "", mdd_root_dir_name, &fid,
- &dt_directory_features);
- if (IS_ERR(mroot))
- GOTO(out, rc = PTR_ERR(mroot));
-
- lu_object_put(env, &mroot->mo_lu);
-out:
- if (rc == 0)
- mdd->mdd_root_fid = fid;
+ LASSERT(!fid_is_zero(pfid));
+ parent = dt_locate(env, mdd->mdd_bottom, pfid);
+ if (unlikely(IS_ERR(parent)))
+ RETURN(PTR_ERR(parent));
- RETURN(rc);
+ /* create local file/dir, if @fid is passed then try to use it */
+ if (fid_is_zero(fid))
+ dto = local_file_find_or_create(env, mdd->mdd_los, parent,
+ name, mode);
+ else
+ dto = local_file_find_or_create_with_fid(env, mdd->mdd_bottom,
+ fid, parent, name,
+ mode);
+ if (IS_ERR(dto))
+ GOTO(out_put, rc = PTR_ERR(dto));
+ *fid = *lu_object_fid(&dto->do_lu);
+ /* since stack is not fully set up the local_storage uses own stack
+ * and we should drop its object from cache */
+ lu_object_put_nocache(env, &dto->do_lu);
+ EXIT;
+out_put:
+ lu_object_put(env, &parent->do_lu);
+ return 0;
}
static int mdd_prepare(const struct lu_env *env,
{
struct mdd_device *mdd = lu2mdd_dev(cdev);
struct lu_device *next = &mdd->mdd_child->dd_lu_dev;
- struct dt_object *root;
struct lu_fid fid;
- int rc;
+ int rc;
- ENTRY;
+ ENTRY;
rc = next->ld_ops->ldo_prepare(env, cdev, next);
if (rc)
- GOTO(out, rc);
+ RETURN(rc);
+
+ /* Setup local dirs */
+ fid.f_seq = FID_SEQ_LOCAL_NAME;
+ fid.f_oid = 1;
+ fid.f_ver = 0;
+ rc = local_oid_storage_init(env, mdd->mdd_bottom, &fid,
+ &mdd->mdd_los);
+ if (rc)
+ RETURN(rc);
rc = dt_root_get(env, mdd->mdd_child, &mdd->mdd_local_root_fid);
- if (rc != 0)
- GOTO(out, rc);
+ if (rc < 0)
+ GOTO(out_los, rc);
if (mdd_seq_site(mdd)->ss_node_id == 0) {
- rc = mdd_find_or_create_root(env, mdd);
+ lu_root_fid(&fid);
+ rc = mdd_local_file_create(env, mdd, &mdd->mdd_local_root_fid,
+ mdd_root_dir_name, S_IFDIR |
+ S_IRUGO | S_IWUSR | S_IXUGO, &fid);
if (rc != 0) {
CERROR("%s: create root fid failed: rc = %d\n",
mdd2obd_dev(mdd)->obd_name, rc);
- GOTO(out, rc);
+ GOTO(out_los, rc);
}
+ mdd->mdd_root_fid = fid;
rc = mdd_dot_lustre_setup(env, mdd);
if (rc != 0) {
CERROR("%s: initializing .lustre failed: rc = %d\n",
mdd2obd_dev(mdd)->obd_name, rc);
- GOTO(out, rc);
+ GOTO(out_los, rc);
}
rc = mdd_compat_fixes(env, mdd);
if (rc)
- GOTO(out, rc);
+ GOTO(out_los, rc);
}
rc = orph_index_init(env, mdd);
- if (rc != 0)
- GOTO(out, rc);
-
- /* we use capa file to declare llog changes,
- * will be fixed with new llog in 2.3 */
- root = dt_store_open(env, mdd->mdd_child, "", CAPA_KEYS, &fid);
- if (IS_ERR(root))
- GOTO(out, rc = PTR_ERR(root));
-
- mdd->mdd_capa = root;
+ if (rc < 0)
+ GOTO(out_dot, rc);
rc = mdd_changelog_init(env, mdd);
- if (rc != 0)
- GOTO(out, rc);
+ if (rc != 0) {
+ CERROR("%s: failed to initialize changelog: rc = %d\n",
+ mdd2obd_dev(mdd)->obd_name, rc);
+ GOTO(out_orph, rc);
+ }
rc = mdd_lfsck_setup(env, mdd);
- if (rc != 0)
+ if (rc != 0) {
CERROR("%s: failed to initialize lfsck: rc = %d\n",
mdd2obd_dev(mdd)->obd_name, rc);
-out:
+ GOTO(out_changelog, rc);
+ }
+ RETURN(0);
+out_changelog:
+ mdd_changelog_fini(env, mdd);
+out_orph:
+ orph_index_fini(env, mdd);
+out_dot:
+ if (mdd_seq_site(mdd)->ss_node_id == 0) {
+ mdd_object_put(env, mdd->mdd_dot_lustre);
+ mdd->mdd_dot_lustre = NULL;
+ mdd_object_put(env, mdd->mdd_dot_lustre_objs.mdd_obf);
+ mdd->mdd_dot_lustre_objs.mdd_obf = NULL;
+ }
+out_los:
+ local_oid_storage_fini(env, mdd->mdd_los);
+ mdd->mdd_los = NULL;
return rc;
}
struct lu_context_key *key, void *data)
{
struct mdd_thread_info *info = data;
- if (info->mti_max_lmm != NULL)
- OBD_FREE(info->mti_max_lmm, info->mti_max_lmm_size);
- if (info->mti_max_cookie != NULL)
- OBD_FREE(info->mti_max_cookie, info->mti_max_cookie_size);
+
lu_buf_free(&info->mti_big_buf);
lu_buf_free(&info->mti_link_buf);
/* context key: mdd_thread_key */
LU_CONTEXT_KEY_DEFINE(mdd, LCT_MD_THREAD);
-static struct lu_local_obj_desc llod_capa_key = {
- .llod_name = CAPA_KEYS,
- .llod_oid = MDD_CAPA_KEYS_OID,
- .llod_is_index = 0,
-};
-
-static struct lu_local_obj_desc llod_mdd_orphan = {
- .llod_name = orph_index_name,
- .llod_oid = MDD_ORPHAN_OID,
- .llod_is_index = 1,
- .llod_feat = &dt_directory_features,
-};
-
-static struct lu_local_obj_desc llod_lfsck_bookmark = {
- .llod_name = lfsck_bookmark_name,
- .llod_oid = LFSCK_BOOKMARK_OID,
- .llod_is_index = 0,
-};
-
-static struct lu_local_obj_desc llod_lfsck_namespace = {
- .llod_name = lfsck_namespace_name,
- .llod_oid = LFSCK_NAMESPACE_OID,
- .llod_is_index = 1,
- .llod_feat = &dt_lfsck_features,
-};
-
static int __init mdd_mod_init(void)
{
struct lprocfs_static_vars lvars;
changelog_orig_logops.lop_add = llog_cat_add_rec;
changelog_orig_logops.lop_declare_add = llog_cat_declare_add_rec;
- llo_local_obj_register(&llod_capa_key);
- llo_local_obj_register(&llod_mdd_orphan);
- llo_local_obj_register(&llod_lfsck_bookmark);
- llo_local_obj_register(&llod_lfsck_namespace);
-
rc = class_register_type(&mdd_obd_device_ops, NULL, lvars.module_vars,
LUSTRE_MDD_NAME, &mdd_device_type);
if (rc)
static void __exit mdd_mod_exit(void)
{
- llo_local_obj_unregister(&llod_capa_key);
- llo_local_obj_unregister(&llod_mdd_orphan);
- llo_local_obj_unregister(&llod_lfsck_bookmark);
- llo_local_obj_unregister(&llod_lfsck_namespace);
-
class_unregister_type(LUSTRE_MDD_NAME);
lu_kmem_fini(mdd_caches);
}
RETURN(rc);
}
-int mdd_declare_llog_record(const struct lu_env *env, struct mdd_device *mdd,
- int reclen, struct thandle *handle)
-{
- int rc;
-
- /* XXX: this is a temporary solution to declare llog changes
- * will be fixed in 2.3 with new llog implementation */
-
- LASSERT(mdd->mdd_capa);
-
- /* XXX: Since we use the 'mdd_capa' as fake llog object here, we
- * have to set the parameter 'size' as INT_MAX or 0 to inform
- * OSD that this record write is for a llog write or catalog
- * header update, and osd declare function will reserve less
- * credits for optimization purpose.
- *
- * Reserve 6 blocks for a llog write, since the llog file is
- * usually small, reserve 2 blocks for catalog header update,
- * because we know for sure that catalog header is already
- * allocated.
- *
- * This hack should be removed in 2.3.
- */
-
- /* record itself */
- rc = dt_declare_record_write(env, mdd->mdd_capa,
- DECLARE_LLOG_WRITE, 0, handle);
- if (rc)
- return rc;
-
- /* header will be updated as well */
- rc = dt_declare_record_write(env, mdd->mdd_capa,
- DECLARE_LLOG_WRITE, 0, handle);
- if (rc)
- return rc;
-
- /* also we should be able to create new plain log */
- rc = dt_declare_create(env, mdd->mdd_capa, NULL, NULL, NULL, handle);
- if (rc)
- return rc;
-
- /* new record referencing new plain llog */
- rc = dt_declare_record_write(env, mdd->mdd_capa,
- DECLARE_LLOG_WRITE, 0, handle);
- if (rc)
- return rc;
-
- /* catalog's header will be updated as well */
- rc = dt_declare_record_write(env, mdd->mdd_capa,
- DECLARE_LLOG_REWRITE, 0, handle);
-
- return rc;
-}
-
int mdd_declare_changelog_store(const struct lu_env *env,
struct mdd_device *mdd,
const struct lu_name *fname,
struct lu_fid mdd_local_root_fid;
struct dt_device_param mdd_dt_conf;
struct dt_object *mdd_orphans; /* PENDING directory */
- struct dt_object *mdd_capa;
cfs_proc_dir_entry_t *mdd_proc_entry;
struct mdd_changelog mdd_cl;
unsigned long mdd_atime_diff;
struct md_lfsck mdd_lfsck;
unsigned int mdd_sync_permission;
int mdd_connects;
+ struct local_oid_storage *mdd_los;
};
enum mod_flags {
int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm);
int mdd_readpage(const struct lu_env *env, struct md_object *obj,
const struct lu_rdpg *rdpg);
-int mdd_declare_llog_record(const struct lu_env *env, struct mdd_device *mdd,
- int reclen, struct thandle *handle);
int mdd_declare_changelog_store(const struct lu_env *env,
struct mdd_device *mdd,
const struct lu_name *fname,
struct lu_object *mdd_object_alloc(const struct lu_env *env,
const struct lu_object_header *hdr,
struct lu_device *d);
+int mdd_local_file_create(const struct lu_env *env, struct mdd_device *mdd,
+ const struct lu_fid *pfid, const char *name,
+ __u32 mode, struct lu_fid *fid);
int mdd_acl_chmod(const struct lu_env *env, struct mdd_object *o, __u32 mode,
struct thandle *handle);
return rc;
}
-static int mdd_declare_lfsck_namespace_unlink(const struct lu_env *env,
- struct mdd_device *mdd,
- struct dt_object *p,
- struct dt_object *c,
- const char *name,
- struct thandle *handle)
-{
- int rc;
-
- rc = dt_declare_delete(env, p, (const struct dt_key *)name, handle);
- if (rc != 0)
- return rc;
-
- rc = dt_declare_ref_del(env, c, handle);
- if (rc != 0)
- return rc;
-
- rc = dt_declare_destroy(env, c, handle);
- return rc;
-}
-
-static int mdd_lfsck_namespace_unlink(const struct lu_env *env,
- struct mdd_device *mdd,
- struct lfsck_component *com)
-{
- struct mdd_thread_info *info = mdd_env_info(env);
- struct lu_fid *fid = &info->mti_fid;
- struct dt_object *child = com->lc_obj;
- struct dt_object *parent;
- struct thandle *handle;
- bool locked = false;
- int rc;
- ENTRY;
-
- parent = dt_store_resolve(env, mdd->mdd_bottom, "", fid);
- if (IS_ERR(parent))
- RETURN(rc = PTR_ERR(parent));
-
- if (!dt_try_as_dir(env, parent))
- GOTO(out, rc = -ENOTDIR);
-
- handle = dt_trans_create(env, mdd->mdd_bottom);
- if (IS_ERR(handle))
- GOTO(out, rc = PTR_ERR(handle));
-
- rc = mdd_declare_lfsck_namespace_unlink(env, mdd, parent, child,
- lfsck_namespace_name, handle);
- if (rc != 0)
- GOTO(stop, rc);
-
- rc = dt_trans_start_local(env, mdd->mdd_bottom, handle);
- if (rc != 0)
- GOTO(stop, rc);
-
- dt_write_lock(env, child, MOR_TGT_CHILD);
- locked = true;
- rc = dt_delete(env, parent, (struct dt_key *)lfsck_namespace_name,
- handle, BYPASS_CAPA);
- if (rc != 0)
- GOTO(stop, rc);
-
- rc = child->do_ops->do_ref_del(env, child, handle);
- if (rc != 0) {
- lu_local_obj_fid(fid, LFSCK_NAMESPACE_OID);
- rc = dt_insert(env, parent,
- (const struct dt_rec*)fid,
- (const struct dt_key *)lfsck_namespace_name,
- handle, BYPASS_CAPA, 1);
-
- GOTO(stop, rc);
- }
-
-
- rc = dt_destroy(env, child, handle);
-
- GOTO(stop, rc);
-
-stop:
- if (locked)
- dt_write_unlock(env, child);
-
- if (rc == 0) {
- lu_object_put(env, &child->do_lu);
- com->lc_obj = NULL;
- }
-
- dt_trans_stop(env, mdd->mdd_bottom, handle);
-
-out:
- lu_object_put(env, &parent->do_lu);
- return rc;
-}
-
static int mdd_lfsck_namespace_lookup(const struct lu_env *env,
struct lfsck_component *com,
const struct lu_fid *fid,
static int mdd_lfsck_namespace_reset(const struct lu_env *env,
struct lfsck_component *com, bool init)
{
- struct mdd_thread_info *info = mdd_env_info(env);
- struct lu_fid *fid = &info->mti_fid;
struct lfsck_namespace *ns = (struct lfsck_namespace *)com->lc_file_ram;
struct mdd_device *mdd = mdd_lfsck2mdd(com->lc_lfsck);
- struct md_object *mdo;
- struct dt_object *dto;
+ struct dt_object *dto, *root;
int rc;
ENTRY;
ns->ln_magic = LFSCK_NAMESPACE_MAGIC;
ns->ln_status = LS_INIT;
- rc = mdd_lfsck_namespace_unlink(env, mdd, com);
+ root = dt_locate(env, mdd->mdd_bottom, &mdd->mdd_local_root_fid);
+ if (unlikely(IS_ERR(root)))
+ GOTO(out, rc = PTR_ERR(root));
+
+ rc = local_object_unlink(env, mdd->mdd_bottom, root,
+ lfsck_namespace_name);
if (rc != 0)
GOTO(out, rc);
- lu_local_obj_fid(fid, LFSCK_NAMESPACE_OID);
- mdo = llo_store_create_index(env, &mdd->mdd_md_dev, mdd->mdd_bottom, "",
- lfsck_namespace_name, fid,
- &dt_lfsck_features);
- if (IS_ERR(mdo))
- GOTO(out, rc = PTR_ERR(mdo));
-
- lu_object_put(env, &mdo->mo_lu);
- dto = dt_store_open(env, mdd->mdd_bottom, "", lfsck_namespace_name, fid);
+ dto = local_index_find_or_create(env, mdd->mdd_los, root,
+ lfsck_namespace_name,
+ S_IFREG | S_IRUGO | S_IWUSR,
+ &dt_lfsck_features);
if (IS_ERR(dto))
GOTO(out, rc = PTR_ERR(dto));
- com->lc_obj = dto;
rc = dto->do_ops->do_index_try(env, dto, &dt_lfsck_features);
if (rc != 0)
GOTO(out, rc);
+ com->lc_obj = dto;
rc = mdd_lfsck_namespace_store(env, com, true);
GOTO(out, rc);
-
out:
+ lu_object_put(env, &root->do_lu);
up_write(&com->lc_sem);
return rc;
}
static int mdd_lfsck_namespace_setup(const struct lu_env *env,
struct md_lfsck *lfsck)
{
- struct mdd_device *mdd = mdd_lfsck2mdd(lfsck);
- struct lfsck_component *com;
- struct lfsck_namespace *ns;
- struct dt_object *obj;
- int rc;
+ struct mdd_device *mdd = mdd_lfsck2mdd(lfsck);
+ struct lfsck_component *com;
+ struct lfsck_namespace *ns;
+ struct dt_object *obj, *root;
+ int rc;
ENTRY;
OBD_ALLOC_PTR(com);
if (com->lc_file_disk == NULL)
GOTO(out, rc = -ENOMEM);
- obj = dt_store_open(env, mdd->mdd_bottom, "", lfsck_namespace_name,
- &mdd_env_info(env)->mti_fid);
+ root = dt_locate(env, mdd->mdd_bottom, &mdd->mdd_local_root_fid);
+ if (unlikely(IS_ERR(root)))
+ GOTO(out, rc = PTR_ERR(root));
+
+ obj = local_index_find_or_create(env, mdd->mdd_los, root,
+ lfsck_namespace_name,
+ S_IFREG | S_IRUGO | S_IWUSR,
+ &dt_lfsck_features);
+ lu_object_put(env, &root->do_lu);
if (IS_ERR(obj))
GOTO(out, rc = PTR_ERR(obj));
ENTRY;
cfs_daemonize("lfsck");
- rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
+ rc = lu_env_init(&env, LCT_MD_THREAD);
if (rc != 0) {
CERROR("%s: LFSCK, fail to init env, rc = %d\n",
mdd_lfsck2name(lfsck), rc);
int mdd_lfsck_setup(const struct lu_env *env, struct mdd_device *mdd)
{
- struct md_lfsck *lfsck = &mdd->mdd_lfsck;
- struct dt_object *obj;
- int rc;
+ struct md_lfsck *lfsck = &mdd->mdd_lfsck;
+ struct dt_object *obj;
+ struct lu_fid fid;
+ int rc;
+
ENTRY;
LASSERT(!lfsck->ml_initialized);
rc = obj->do_ops->do_index_try(env, obj, &dt_otable_features);
if (rc != 0) {
if (rc == -ENOTSUPP)
- rc = 0;
-
- RETURN(rc);
+ RETURN(0);
+ GOTO(out, rc);
}
- obj = dt_store_open(env, mdd->mdd_bottom, "", lfsck_bookmark_name,
- &mdd_env_info(env)->mti_fid);
+ /* LFSCK bookmark */
+ fid_zero(&fid);
+ rc = mdd_local_file_create(env, mdd, &mdd->mdd_local_root_fid,
+ lfsck_bookmark_name,
+ S_IFREG | S_IRUGO | S_IWUSR, &fid);
+ if (rc < 0)
+ GOTO(out, rc);
+
+ obj = dt_locate(env, mdd->mdd_bottom, &fid);
if (IS_ERR(obj))
- RETURN(PTR_ERR(obj));
+ GOTO(out, rc = PTR_ERR(obj));
+ LASSERT(lu_object_exists(&obj->do_lu));
lfsck->ml_bookmark_obj = obj;
+
rc = mdd_lfsck_bookmark_load(env, lfsck);
if (rc == -ENODATA)
rc = mdd_lfsck_bookmark_init(env, lfsck);
if (rc != 0)
- RETURN(rc);
+ GOTO(out, rc);
rc = mdd_lfsck_namespace_setup(env, lfsck);
+ if (rc < 0)
+ GOTO(out, rc);
/* XXX: LFSCK components initialization to be added here. */
-
- RETURN(rc);
+ RETURN(0);
+out:
+ lu_object_put(env, &lfsck->ml_obj_oit->do_lu);
+ lfsck->ml_obj_oit = NULL;
+ return 0;
}
void mdd_lfsck_cleanup(const struct lu_env *env, struct mdd_device *mdd)
*/
int orph_index_init(const struct lu_env *env, struct mdd_device *mdd)
{
- struct lu_fid fid;
- struct dt_object *d;
- int rc = 0;
- ENTRY;
-
- d = dt_store_open(env, mdd->mdd_child, "", orph_index_name, &fid);
- if (!IS_ERR(d)) {
- mdd->mdd_orphans = d;
- if (!dt_try_as_dir(env, d)) {
- rc = -ENOTDIR;
- CERROR("\"%s\" is not an index! : rc = %d\n",
- orph_index_name, rc);
- }
- } else {
- CERROR("cannot find \"%s\" obj %d\n",
- orph_index_name, (int)PTR_ERR(d));
- rc = PTR_ERR(d);
- }
-
- RETURN(rc);
+ struct lu_fid fid;
+ struct dt_object *d;
+ int rc = 0;
+
+ ENTRY;
+
+ /* create PENDING dir */
+ fid_zero(&fid);
+ rc = mdd_local_file_create(env, mdd, &mdd->mdd_local_root_fid,
+ orph_index_name, S_IFDIR | S_IRUGO |
+ S_IWUSR | S_IXUGO, &fid);
+ if (rc < 0)
+ RETURN(rc);
+
+ d = dt_locate(env, mdd->mdd_child, &fid);
+ if (IS_ERR(d))
+ RETURN(PTR_ERR(d));
+ LASSERT(lu_object_exists(&d->do_lu));
+ if (!dt_try_as_dir(env, d)) {
+ CERROR("%s: \"%s\" is not an index: rc = %d\n",
+ mdd2obd_dev(mdd)->obd_name, orph_index_name, rc);
+ lu_object_put(env, &d->do_lu);
+ RETURN(-ENOTDIR);
+ }
+ mdd->mdd_orphans = d;
+ RETURN(0);
}
void orph_index_fini(const struct lu_env *env, struct mdd_device *mdd)
* \retval appropriate error otherwise.
*/
int dt_index_read(const struct lu_env *env, struct dt_device *dev,
- struct idx_info *ii, const struct lu_rdpg *rdpg)
+ struct idx_info *ii, const struct lu_rdpg *rdpg)
{
const struct dt_index_features *feat;
struct dt_object *obj;
if (rdpg->rp_count <= 0 && (rdpg->rp_count & (LU_PAGE_SIZE - 1)) != 0)
RETURN(-EFAULT);
- if (fid_seq(&ii->ii_fid) < FID_SEQ_SPECIAL)
- /* block access to local files */
- RETURN(-EPERM);
-
if (fid_seq(&ii->ii_fid) >= FID_SEQ_NORMAL)
/* we don't support directory transfer via OBD_IDX_READ for the
* time being */
RETURN(-EOPNOTSUPP);
+ if (!fid_is_quota(&ii->ii_fid))
+ /* block access to all local files except quota files */
+ RETURN(-EPERM);
+
/* lookup index object subject to the transfer */
obj = dt_locate(env, dev, &ii->ii_fid);
if (IS_ERR(obj))
rc = dt_lookup_dir(env, parent, name, &dti->dti_fid);
if (rc == 0) {
- /* name is found, get the object */
- if (!lu_fid_eq(fid, &dti->dti_fid))
- dto = ERR_PTR(-EINVAL);
- else
- dto = dt_locate(env, dt, fid);
+ dto = dt_locate(env, dt, &dti->dti_fid);
} else if (rc != -ENOENT) {
dto = ERR_PTR(rc);
} else {
}
EXPORT_SYMBOL(local_index_find_or_create_with_fid);
+static int local_object_declare_unlink(const struct lu_env *env,
+ struct dt_device *dt,
+ struct dt_object *p,
+ struct dt_object *c, const char *name,
+ struct thandle *th)
+{
+ int rc;
+
+ rc = dt_declare_delete(env, p, (const struct dt_key *)name, th);
+ if (rc < 0)
+ return rc;
+
+ rc = dt_declare_ref_del(env, c, th);
+ if (rc < 0)
+ return rc;
+
+ return dt_declare_destroy(env, c, th);
+}
+
+int local_object_unlink(const struct lu_env *env, struct dt_device *dt,
+ struct dt_object *parent, const char *name)
+{
+ struct dt_thread_info *dti = dt_info(env);
+ struct dt_object *dto;
+ struct thandle *th;
+ int rc;
+
+ ENTRY;
+
+ rc = dt_lookup_dir(env, parent, name, &dti->dti_fid);
+ if (rc == -ENOENT)
+ RETURN(0);
+ else if (rc < 0)
+ RETURN(rc);
+
+ dto = dt_locate(env, dt, &dti->dti_fid);
+ if (unlikely(IS_ERR(dto)))
+ RETURN(PTR_ERR(dto));
+
+ th = dt_trans_create(env, dt);
+ if (IS_ERR(th))
+ GOTO(out, rc = PTR_ERR(th));
+
+ rc = local_object_declare_unlink(env, dt, parent, dto, name, th);
+ if (rc < 0)
+ GOTO(stop, rc);
+
+ rc = dt_trans_start_local(env, dt, th);
+ if (rc < 0)
+ GOTO(stop, rc);
+
+ dt_write_lock(env, dto, 0);
+ rc = dt_delete(env, parent, (struct dt_key *)name, th, BYPASS_CAPA);
+ if (rc < 0)
+ GOTO(unlock, rc);
+
+ rc = dt_ref_del(env, dto, th);
+ if (rc < 0) {
+ rc = dt_insert(env, parent,
+ (const struct dt_rec *)&dti->dti_fid,
+ (const struct dt_key *)name, th, BYPASS_CAPA, 1);
+ GOTO(unlock, rc);
+ }
+
+ rc = dt_destroy(env, dto, th);
+unlock:
+ dt_write_unlock(env, dto);
+stop:
+ dt_trans_stop(env, dt, th);
+out:
+ lu_object_put_nocache(env, &dto->do_lu);
+ return rc;
+}
+EXPORT_SYMBOL(local_object_unlink);
+
struct local_oid_storage *dt_los_find(struct ls_device *ls, __u64 seq)
{
struct local_oid_storage *los, *ret = NULL;
if (IS_ERR(root))
GOTO(out_los, rc = PTR_ERR(root));
+ /* initialize data allowing to generate new fids,
+ * literally we need a sequence */
snprintf(dti->dti_buf, sizeof(dti->dti_buf), "seq-%Lx-lastid",
fid_seq(first_fid));
rc = dt_lookup_dir(env, root, dti->dti_buf, &dti->dti_fid);
- if (rc != 0 && rc != -ENOENT)
+ if (rc == -ENOENT)
+ dti->dti_fid = *first_fid;
+ else if (rc < 0)
GOTO(out_los, rc);
- /* initialize data allowing to generate new fids,
- * literally we need a sequence */
- if (rc == 0)
- o = ls_locate(env, ls, &dti->dti_fid);
- else
- o = ls_locate(env, ls, first_fid);
+ o = ls_locate(env, ls, &dti->dti_fid);
if (IS_ERR(o))
GOTO(out_los, rc = PTR_ERR(o));
-
- dt_write_lock(env, o, 0);
+ LASSERT(fid_seq(&dti->dti_fid) == fid_seq(first_fid));
if (!dt_object_exists(o)) {
LASSERT(rc == -ENOENT);
GOTO(out_trans, rc);
rc = dt_declare_insert(env, root,
- (const struct dt_rec *)lu_object_fid(&o->do_lu),
+ (const struct dt_rec *)&dti->dti_fid,
(const struct dt_key *)dti->dti_buf,
th);
if (rc)
GOTO(out_trans, rc);
- dti->dti_lb.lb_buf = NULL;
- dti->dti_lb.lb_len = sizeof(dti->dti_lma);
- rc = dt_declare_xattr_set(env, o, &dti->dti_lb, XATTR_NAME_LMA,
- 0, th);
- if (rc)
- GOTO(out_trans, rc);
-
rc = dt_declare_record_write(env, o, sizeof(losd), 0, th);
if (rc)
GOTO(out_trans, rc);
if (rc)
GOTO(out_trans, rc);
- LASSERT(!dt_object_exists(o));
- rc = dt_create(env, o, &dti->dti_attr, NULL, &dti->dti_dof, th);
+ dt_write_lock(env, root, 0);
+ dt_write_lock(env, o, 0);
+ if (dt_object_exists(o))
+ GOTO(out_lock, rc = 0);
+
+ rc = dt_create(env, o, &dti->dti_attr, NULL, &dti->dti_dof,
+ th);
if (rc)
- GOTO(out_trans, rc);
- LASSERT(dt_object_exists(o));
+ GOTO(out_lock, rc);
losd.lso_magic = cpu_to_le32(LOS_MAGIC);
losd.lso_next_oid = cpu_to_le32(fid_oid(first_fid) + 1);
dti->dti_lb.lb_len = sizeof(losd);
rc = dt_record_write(env, o, &dti->dti_lb, &dti->dti_off, th);
if (rc)
- GOTO(out_trans, rc);
+ GOTO(out_lock, rc);
#if LUSTRE_VERSION_CODE >= OBD_OCD_VERSION(2, 3, 90, 0)
#error "fix this before release"
#endif
* proper hanlding of named vs no-name objects.
* Llog objects have name always as they are placed in O/d/...
*/
- if (fid_seq(lu_object_fid(&o->do_lu)) != FID_SEQ_LLOG) {
+ if (fid_seq(&dti->dti_fid) != FID_SEQ_LLOG) {
rc = dt_insert(env, root,
- (const struct dt_rec *)first_fid,
+ (const struct dt_rec *)&dti->dti_fid,
(const struct dt_key *)dti->dti_buf,
th, BYPASS_CAPA, 1);
if (rc)
- GOTO(out_trans, rc);
+ GOTO(out_lock, rc);
}
+out_lock:
+ dt_write_unlock(env, o);
+ dt_write_unlock(env, root);
out_trans:
dt_trans_stop(env, dev, th);
} else {
dti->dti_off = 0;
dti->dti_lb.lb_buf = &losd;
dti->dti_lb.lb_len = sizeof(losd);
+ dt_read_lock(env, o, 0);
rc = dt_record_read(env, o, &dti->dti_lb, &dti->dti_off);
+ dt_read_unlock(env, o);
if (rc == 0 && le32_to_cpu(losd.lso_magic) != LOS_MAGIC) {
CERROR("local storage file "DFID" is corrupted\n",
PFID(first_fid));
rc = -EINVAL;
}
}
-out_lock:
- dt_write_unlock(env, o);
out_los:
if (root != NULL && !IS_ERR(root))
lu_object_put_nocache(env, &root->do_lu);
LASSERT(oh->ot_handle == NULL);
LASSERT(inode);
- osd_trans_declare_op(env, oh, OSD_OT_DELETE,
+ osd_trans_declare_op(env, oh, OSD_OT_DESTROY,
osd_dto_credits_noquota[DTO_OBJECT_DELETE]);
/* Recycle idle OI leaf may cause additional three OI blocks
* to be changed. */
- osd_trans_declare_op(env, oh, OSD_OT_DESTROY,
+ osd_trans_declare_op(env, oh, OSD_OT_DELETE,
osd_dto_credits_noquota[DTO_INDEX_DELETE] + 3);
-
/* one less inode */
rc = osd_declare_inode_qid(env, inode->i_uid, inode->i_gid, -1, oh,
false, true, NULL, false);
oh = container_of0(handle, struct osd_thandle, ot_super);
LASSERT(oh->ot_handle == NULL);
- /* XXX: size == 0 or INT_MAX indicating a catalog header update or
- * llog write, see comment in mdd_declare_llog_record().
- *
- * This hack will be removed with llog over OSD landing
- */
- if (size == DECLARE_LLOG_REWRITE)
- credits = 2;
- else if (size == DECLARE_LLOG_WRITE)
- credits = 6;
- else
- credits = osd_dto_credits_noquota[DTO_WRITE_BLOCK];
+ credits = osd_dto_credits_noquota[DTO_WRITE_BLOCK];
osd_trans_declare_op(env, oh, OSD_OT_WRITE, credits);
{ "ROOT", { FID_SEQ_ROOT, 1, 0 },
OLF_SCAN_SUBITEMS | OLF_HIDE_FID, osd_ios_ROOT_scan, NULL },
- /* capa_keys */
- { CAPA_KEYS, { FID_SEQ_LOCAL_FILE, MDD_CAPA_KEYS_OID, 0 }, 0,
- NULL, NULL },
-
/* changelog_catalog */
{ CHANGELOG_CATALOG, { 0, 0, 0 }, 0, NULL, NULL },
OLF_SHOW_NAME, NULL, NULL },
/* lfsck_namespace */
- { "lfsck_namespace", { FID_SEQ_LOCAL_FILE, LFSCK_NAMESPACE_OID, 0 }, 0,
+ { "lfsck_namespace", { FID_SEQ_LOCAL_FILE, LFSCK_BOOKMARK_OID, 0 }, 0,
NULL, NULL },
/* OBJECTS, upgrade from old device */
{ MGS_CONFIGS_OID, NULL /*MOUNT_CONFIGS_DIR*/ },
{ FID_SEQ_SRV_OID, "seq_srv" },
{ FID_SEQ_CTL_OID, "seq_ctl" },
- { MDD_CAPA_KEYS_OID, NULL /*CAPA_KEYS*/ },
{ FLD_INDEX_OID, "fld" },
{ MDD_LOV_OBJ_OID, LOV_OBJID },
{ OFD_HEALTH_CHECK_OID, HEALTH_CHECK },
{ ACCT_USER_OID, "acct_usr_inode" },
{ ACCT_GROUP_OID, "acct_grp_inode" },
- { MDD_ORPHAN_OID, NULL },
{ 0, NULL }
};