*/
enum lma_incompat {
LMAI_RELEASED = 0x0000001, /* file is released */
+ LMAI_AGENT = 0x00000002, /* agent inode */
+ LMAI_REMOTE_PARENT = 0x00000004, /* the parent of the object
+ is on the remote MDT */
};
#define LMA_INCOMPAT_SUPP 0x0
extern void lustre_lma_swab(struct lustre_mdt_attrs *lma);
extern void lustre_lma_init(struct lustre_mdt_attrs *lma,
- const struct lu_fid *fid);
+ const struct lu_fid *fid, __u32 incompat);
/**
* SOM on-disk attributes stored in a separate xattr.
*/
OFD_HEALTH_CHECK_OID = 4120UL,
MDD_LOV_OBJ_OSEQ = 4121UL,
LFSCK_NAMESPACE_OID = 4122UL,
+ REMOTE_PARENT_DIR_OID = 4123UL,
};
static inline void lu_local_obj_fid(struct lu_fid *fid, __u32 oid)
lu_root_fid(&fid);
lma = (struct lustre_mdt_attrs *)&mdd_env_info(env)->mti_xattr_buf;
- lustre_lma_init(lma, &fid);
+ lustre_lma_init(lma, &fid, 0);
lustre_lma_swab(lma);
buf.lb_buf = lma;
buf.lb_len = sizeof(*lma);
*
* \param lma - is the new LMA structure to be initialized
* \param fid - is the FID of the object this LMA belongs to
+ * \param incompat - features that MDS must understand to access object
*/
-void lustre_lma_init(struct lustre_mdt_attrs *lma, const struct lu_fid *fid)
+void lustre_lma_init(struct lustre_mdt_attrs *lma, const struct lu_fid *fid,
+ __u32 incompat)
{
lma->lma_compat = 0;
- lma->lma_incompat = 0;
+ lma->lma_incompat = incompat;
lma->lma_self_fid = *fid;
- lma->lma_flags = 0;
+ lma->lma_flags = 0;
/* If a field is added in struct lustre_mdt_attrs, zero it explicitly
* and change the test below. */
#include <linux/types.h>
/* prerequisite for linux/xattr.h */
#include <linux/fs.h>
+/* XATTR_{REPLACE,CREATE} */
+#include <linux/xattr.h>
/*
* struct OBD_{ALLOC,FREE}*()
return count;
}
-/*
- * directory structure on legacy MDT:
- *
- * REM_OBJ_DIR/ per mdt
- * AGENT_OBJ_DIR/ per mdt
- *
- */
-static const char remote_obj_dir[] = "REM_OBJ_DIR";
-static const char agent_obj_dir[] = "AGENT_OBJ_DIR";
-int osd_mdt_init(struct osd_device *dev)
+static const char remote_parent_dir[] = "REMOTE_PARENT_DIR";
+static int osd_mdt_init(const struct lu_env *env, struct osd_device *dev)
{
- struct lvfs_run_ctxt new;
- struct lvfs_run_ctxt save;
- struct dentry *parent;
- struct osd_mdobj_map *omm;
- struct dentry *d;
- int rc = 0;
+ struct lvfs_run_ctxt new;
+ struct lvfs_run_ctxt save;
+ struct dentry *parent;
+ struct osd_mdobj_map *omm;
+ struct dentry *d;
+ struct osd_thread_info *info = osd_oti_get(env);
+ struct lu_fid *fid = &info->oti_fid;
+ int rc = 0;
ENTRY;
OBD_ALLOC_PTR(dev->od_mdt_map);
parent = osd_sb(dev)->s_root;
osd_push_ctxt(dev, &new, &save);
- d = simple_mkdir(parent, dev->od_mnt, agent_obj_dir,
+ d = simple_mkdir(parent, dev->od_mnt, remote_parent_dir,
0755, 1);
if (IS_ERR(d))
GOTO(cleanup, rc = PTR_ERR(d));
- omm->omm_agent_dentry = d;
+ omm->omm_remote_parent = d;
+ /* Set LMA for remote parent inode */
+ lu_local_obj_fid(fid, REMOTE_PARENT_DIR_OID);
+ rc = osd_ea_fid_set(info, d->d_inode, fid, 0);
+ if (rc != 0)
+ GOTO(cleanup, rc);
cleanup:
pop_ctxt(&save, &new, NULL);
if (rc) {
- if (omm->omm_agent_dentry != NULL)
- dput(omm->omm_agent_dentry);
+ if (omm->omm_remote_parent != NULL)
+ dput(omm->omm_remote_parent);
OBD_FREE_PTR(omm);
dev->od_mdt_map = NULL;
}
if (omm == NULL)
return;
- if (omm->omm_agent_dentry)
- dput(omm->omm_agent_dentry);
+ if (omm->omm_remote_parent)
+ dput(omm->omm_remote_parent);
OBD_FREE_PTR(omm);
osd->od_ost_map = NULL;
}
-int osd_add_to_agent(const struct lu_env *env, struct osd_device *osd,
- struct osd_object *obj, struct osd_thandle *oh)
+int osd_add_to_remote_parent(const struct lu_env *env, struct osd_device *osd,
+ struct osd_object *obj, struct osd_thandle *oh)
{
struct osd_mdobj_map *omm = osd->od_mdt_map;
struct osd_thread_info *oti = osd_oti_get(env);
+ struct lustre_mdt_attrs *lma = &oti->oti_mdt_attrs;
char *name = oti->oti_name;
- struct dentry *agent;
+ struct dentry *dentry;
struct dentry *parent;
int rc;
- parent = omm->omm_agent_dentry;
+ /* Set REMOTE_PARENT in lma, so other process like unlink or lfsck
+ * can identify this object quickly */
+ rc = osd_get_lma(oti, obj->oo_inode, &oti->oti_obj_dentry, lma);
+ if (rc != 0)
+ RETURN(rc);
+
+ lma->lma_incompat |= LMAI_REMOTE_PARENT;
+ lustre_lma_swab(lma);
+ rc = __osd_xattr_set(oti, obj->oo_inode, XATTR_NAME_LMA, lma,
+ sizeof(*lma), XATTR_REPLACE);
+ if (rc != 0)
+ RETURN(rc);
+
+ parent = omm->omm_remote_parent;
sprintf(name, DFID_NOBRACE, PFID(lu_object_fid(&obj->oo_dt.do_lu)));
- agent = osd_child_dentry_by_inode(env, parent->d_inode,
- name, strlen(name));
+ dentry = osd_child_dentry_by_inode(env, parent->d_inode,
+ name, strlen(name));
mutex_lock(&parent->d_inode->i_mutex);
- rc = osd_ldiskfs_add_entry(oh->ot_handle, agent, obj->oo_inode, NULL);
- LASSERTF(parent->d_inode->i_nlink > 1, "%s: agent inode nlink %d",
- osd_name(osd), parent->d_inode->i_nlink);
+ rc = osd_ldiskfs_add_entry(oh->ot_handle, dentry, obj->oo_inode,
+ NULL);
+ LASSERTF(parent->d_inode->i_nlink > 1, "%s: %lu nlink %d",
+ osd_name(osd), parent->d_inode->i_ino,
+ parent->d_inode->i_nlink);
parent->d_inode->i_nlink++;
mark_inode_dirty(parent->d_inode);
mutex_unlock(&parent->d_inode->i_mutex);
RETURN(rc);
}
-int osd_delete_from_agent(const struct lu_env *env, struct osd_device *osd,
- struct osd_object *obj, struct osd_thandle *oh)
+int osd_delete_from_remote_parent(const struct lu_env *env,
+ struct osd_device *osd,
+ struct osd_object *obj,
+ struct osd_thandle *oh)
{
struct osd_mdobj_map *omm = osd->od_mdt_map;
struct osd_thread_info *oti = osd_oti_get(env);
+ struct lustre_mdt_attrs *lma = &oti->oti_mdt_attrs;
char *name = oti->oti_name;
- struct dentry *agent;
+ struct dentry *dentry;
struct dentry *parent;
struct ldiskfs_dir_entry_2 *de;
struct buffer_head *bh;
int rc;
- parent = omm->omm_agent_dentry;
+ /* Check lma to see whether it is remote object */
+ rc = osd_get_lma(oti, obj->oo_inode, &oti->oti_obj_dentry, lma);
+ if (rc != 0)
+ RETURN(rc);
+
+ if (likely(!(lma->lma_incompat & LMAI_REMOTE_PARENT)))
+ RETURN(0);
+
+ parent = omm->omm_remote_parent;
sprintf(name, DFID_NOBRACE, PFID(lu_object_fid(&obj->oo_dt.do_lu)));
- agent = osd_child_dentry_by_inode(env, parent->d_inode,
- name, strlen(name));
+ dentry = osd_child_dentry_by_inode(env, parent->d_inode,
+ name, strlen(name));
mutex_lock(&parent->d_inode->i_mutex);
- bh = osd_ldiskfs_find_entry(parent->d_inode, agent, &de, NULL);
+ bh = osd_ldiskfs_find_entry(parent->d_inode, dentry, &de, NULL);
if (bh == NULL) {
mutex_unlock(&parent->d_inode->i_mutex);
RETURN(-ENOENT);
}
rc = ldiskfs_delete_entry(oh->ot_handle, parent->d_inode, de, bh);
- LASSERTF(parent->d_inode->i_nlink > 1, "%s: agent inode nlink %d",
- osd_name(osd), parent->d_inode->i_nlink);
+ LASSERTF(parent->d_inode->i_nlink > 1, "%s: %lu nlink %d",
+ osd_name(osd), parent->d_inode->i_ino,
+ parent->d_inode->i_nlink);
parent->d_inode->i_nlink--;
mark_inode_dirty(parent->d_inode);
mutex_unlock(&parent->d_inode->i_mutex);
* CONFIGS
*
*/
-int osd_ost_init(struct osd_device *dev)
+static int osd_ost_init(struct osd_device *dev)
{
struct lvfs_run_ctxt new;
struct lvfs_run_ctxt save;
EXIT;
}
-int osd_obj_map_init(struct osd_device *dev)
+int osd_obj_map_init(const struct lu_env *env, struct osd_device *dev)
{
int rc;
ENTRY;
RETURN(rc);
/* prepare structures for MDS */
- rc = osd_mdt_init(dev);
+ rc = osd_mdt_init(env, dev);
RETURN(rc);
}
mutex_lock(&inode->i_mutex);
if (S_ISDIR(inode->i_mode)) {
LASSERT(osd_inode_unlinked(inode) || inode->i_nlink == 1);
- /* it will check/delete the agent inode for every dir
- * destory, how to optimize it? unlink performance
- * impaction XXX */
- result = osd_delete_from_agent(env, osd, obj, oh);
+ /* it will check/delete the inode from remote parent,
+ * how to optimize it? unlink performance impaction XXX */
+ result = osd_delete_from_remote_parent(env, osd, obj, oh);
if (result != 0 && result != -ENOENT) {
- CERROR("%s: delete agent inode "DFID": rc = %d\n",
+ CERROR("%s: delete inode "DFID": rc = %d\n",
osd_name(osd), PFID(fid), result);
}
spin_lock(&obj->oo_guard);
RETURN(0);
}
-static inline int __osd_xattr_set(struct osd_thread_info *info,
- struct inode *inode, const char *name,
- const void *buf, int buflen, int fl)
-{
- struct dentry *dentry = &info->oti_child_dentry;
-
- ll_vfs_dq_init(inode);
- dentry->d_inode = inode;
- return inode->i_op->setxattr(dentry, name, buf, buflen, fl);
-}
-
/**
* Put the fid into lustre_mdt_attrs, and then place the structure
* inode's ea. This fid should not be altered during the life time
* FIXME: It is good to have/use ldiskfs_xattr_set_handle() here
*/
int osd_ea_fid_set(struct osd_thread_info *info, struct inode *inode,
- const struct lu_fid *fid)
+ const struct lu_fid *fid, __u64 flags)
{
struct lustre_mdt_attrs *lma = &info->oti_mdt_attrs;
int rc;
if (OBD_FAIL_CHECK(OBD_FAIL_FID_IGIF) && fid_is_client_visible(fid))
return 0;
- lustre_lma_init(lma, fid);
+ lustre_lma_init(lma, fid, flags);
lustre_lma_swab(lma);
rc = __osd_xattr_set(info, inode, XATTR_NAME_LMA, lma, sizeof(*lma),
}
/**
- * Create an local inode for remote entry
+ * Create an local agent inode for remote entry
*/
-static struct inode *osd_create_remote_inode(const struct lu_env *env,
- struct osd_device *osd,
- struct osd_object *pobj,
- const struct lu_fid *fid,
- struct thandle *th)
+static struct inode *osd_create_local_agent_inode(const struct lu_env *env,
+ struct osd_device *osd,
+ struct osd_object *pobj,
+ const struct lu_fid *fid,
+ struct thandle *th)
{
struct osd_thread_info *info = osd_oti_get(env);
struct inode *local;
RETURN(local);
}
+ /* Set special LMA flag for local agent inode */
+ rc = osd_ea_fid_set(info, local, fid, LMAI_AGENT);
+ if (rc != 0) {
+ CERROR("%s: set LMA for "DFID" remote inode failed: rc = %d\n",
+ osd_name(osd), PFID(fid), rc);
+ RETURN(ERR_PTR(rc));
+ }
+
rc = osd_add_dot_dotdot_internal(info, local, pobj->oo_inode,
(const struct dt_rec *)lu_object_fid(&pobj->oo_dt.do_lu),
(const struct dt_rec *)fid, oh);
if ((result == 0) &&
(fid_is_last_id(fid) ||
!fid_is_on_ost(info, osd_dt_dev(th->th_dev), fid)))
- result = osd_ea_fid_set(info, obj->oo_inode, fid);
+ result = osd_ea_fid_set(info, obj->oo_inode, fid, 0);
if (result == 0)
result = __osd_oi_insert(env, obj, fid, th);
/* If parent on remote MDT, we need put this object
* under AGENT */
oh = container_of(th, typeof(*oh), ot_super);
- rc = osd_add_to_agent(env, osd, obj, oh);
+ rc = osd_add_to_remote_parent(env, osd, obj, oh);
if (rc != 0) {
- CERROR("%s: add agent "DFID" error: rc = %d\n",
+ CERROR("%s: add "DFID" error: rc = %d\n",
osd_name(osd),
PFID(lu_object_fid(&dt->do_lu)), rc);
RETURN(rc);
}
- child_inode = igrab(omm->omm_agent_dentry->d_inode);
+ child_inode = igrab(omm->omm_remote_parent->d_inode);
} else {
- child_inode = osd_create_remote_inode(env, osd, obj,
- fid, th);
+ child_inode = osd_create_local_agent_inode(env, osd,
+ obj, fid,
+ th);
if (IS_ERR(child_inode))
RETURN(PTR_ERR(child_inode));
}
if (unlikely(fid_is_sane(fid))) {
/* FID-in-dirent exists, but FID-in-LMA is lost.
* Trust the FID-in-dirent, and add FID-in-LMA. */
- rc = osd_ea_fid_set(info, inode, fid);
+ rc = osd_ea_fid_set(info, inode, fid, 0);
if (rc == 0)
*attr |= LUDA_REPAIR;
} else {
GOTO(out_mnt, rc);
}
- rc = osd_obj_map_init(o);
+ rc = osd_obj_map_init(env, o);
if (rc != 0)
GOTO(out_scrub, rc);
};
struct osd_mdobj_map {
- struct dentry *omm_agent_dentry;
+ struct dentry *omm_remote_parent;
};
#define osd_ldiskfs_find_entry(dir, dentry, de, lock) \
extern int ldiskfs_pdo;
+static inline int __osd_xattr_set(struct osd_thread_info *info,
+ struct inode *inode, const char *name,
+ const void *buf, int buflen, int fl)
+{
+ struct dentry *dentry = &info->oti_child_dentry;
+
+ ll_vfs_dq_init(inode);
+ dentry->d_inode = inode;
+ return inode->i_op->setxattr(dentry, name, buf, buflen, fl);
+}
+
#ifdef LPROCFS
/* osd_lproc.c */
void lprocfs_osd_init_vars(struct lprocfs_static_vars *lvars);
struct inode *osd_iget(struct osd_thread_info *info, struct osd_device *dev,
struct osd_inode_id *id);
int osd_ea_fid_set(struct osd_thread_info *info, struct inode *inode,
- const struct lu_fid *fid);
+ const struct lu_fid *fid, __u64 flags);
int osd_get_lma(struct osd_thread_info *info, struct inode *inode,
struct dentry *dentry, struct lustre_mdt_attrs *lma);
-int osd_obj_map_init(struct osd_device *osd);
+int osd_obj_map_init(const struct lu_env *env, struct osd_device *osd);
void osd_obj_map_fini(struct osd_device *dev);
int osd_obj_map_lookup(struct osd_thread_info *info, struct osd_device *osd,
const struct lu_fid *fid, struct osd_inode_id *id);
int osd_fld_lookup(const struct lu_env *env, struct osd_device *osd,
const struct lu_fid *fid, struct lu_seq_range *range);
-struct dentry *osd_agent_lookup(struct osd_mdobj_map *omm, int index);
-struct dentry *osd_agent_load(const struct osd_device *osd, int mdt_index,
- int create);
-
-int osd_delete_from_agent(const struct lu_env *env, struct osd_device *osd,
- struct osd_object *obj, struct osd_thandle *oh);
-int osd_add_to_agent(const struct lu_env *env, struct osd_device *osd,
- struct osd_object *obj, struct osd_thandle *oh);
+
+int osd_delete_from_remote_parent(const struct lu_env *env,
+ struct osd_device *osd,
+ struct osd_object *obj,
+ struct osd_thandle *oh);
+int osd_add_to_remote_parent(const struct lu_env *env, struct osd_device *osd,
+ struct osd_object *obj, struct osd_thandle *oh);
/* osd_quota_fmt.c */
int walk_tree_dqentry(const struct lu_env *env, struct osd_object *obj,
ops = DTO_INDEX_INSERT;
idx = osd_oi_fid2idx(dev, fid);
if (val == SCRUB_NEXT_NOLMA) {
- rc = osd_ea_fid_set(info, inode, fid);
+ rc = osd_ea_fid_set(info, inode, fid, 0);
if (rc != 0)
GOTO(out, rc);
} else {
lu_igif_build(&tfid, inode->i_ino, inode->i_generation);
else
tfid = *fid;
- rc = osd_ea_fid_set(info, inode, &tfid);
+ rc = osd_ea_fid_set(info, inode, &tfid, 0);
if (rc != 0)
RETURN(rc);
} else {
struct lu_buf buf;
int rc;
- lustre_lma_init(lma, fid);
+ lustre_lma_init(lma, fid, 0);
lustre_lma_swab(lma);
buf.lb_buf = lma;
buf.lb_len = sizeof(*lma);