* Copyright (c) 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2012, Intel Corporation.
+ * Copyright (c) 2012, 2013, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
#include <linux/types.h>
/* prerequisite for linux/xattr.h */
#include <linux/fs.h>
+/* XATTR_{REPLACE,CREATE} */
+#include <linux/xattr.h>
/*
* struct OBD_{ALLOC,FREE}*()
* OBD_FAIL_CHECK
*/
#include <obd_support.h>
-#include <lvfs.h>
#include "osd_internal.h"
#include "osd_oi.h"
pop_ctxt(save, new, NULL);
}
+/* utility to make a directory */
+static struct dentry *simple_mkdir(struct dentry *dir, struct vfsmount *mnt,
+ const char *name, int mode, int fix)
+{
+ struct dentry *dchild;
+ int err = 0;
+ ENTRY;
+
+ // ASSERT_KERNEL_CTXT("kernel doing mkdir outside kernel context\n");
+ CDEBUG(D_INODE, "creating directory %.*s\n", (int)strlen(name), name);
+ dchild = ll_lookup_one_len(name, dir, strlen(name));
+ if (IS_ERR(dchild))
+ GOTO(out_up, dchild);
+
+ if (dchild->d_inode) {
+ int old_mode = dchild->d_inode->i_mode;
+ if (!S_ISDIR(old_mode)) {
+ CERROR("found %s (%lu/%u) is mode %o\n", name,
+ dchild->d_inode->i_ino,
+ dchild->d_inode->i_generation, old_mode);
+ GOTO(out_err, err = -ENOTDIR);
+ }
+
+ /* Fixup directory permissions if necessary */
+ if (fix && (old_mode & S_IALLUGO) != (mode & S_IALLUGO)) {
+ CDEBUG(D_CONFIG,
+ "fixing permissions on %s from %o to %o\n",
+ name, old_mode, mode);
+ dchild->d_inode->i_mode = (mode & S_IALLUGO) |
+ (old_mode & ~S_IALLUGO);
+ mark_inode_dirty(dchild->d_inode);
+ }
+ GOTO(out_up, dchild);
+ }
+
+ err = ll_vfs_mkdir(dir->d_inode, dchild, mnt, mode);
+ if (err)
+ GOTO(out_err, err);
+
+ RETURN(dchild);
+
+out_err:
+ dput(dchild);
+ dchild = ERR_PTR(err);
+out_up:
+ return dchild;
+}
+
int osd_last_rcvd_subdir_count(struct osd_device *osd)
{
struct lr_server_data lsd;
return count;
}
-/*
- * directory structure on legacy MDT:
- *
- * REM_OBJ_DIR/ per mdt
- * AGENT_OBJ_DIR/ per mdt
- *
- */
-static const char remote_obj_dir[] = "REM_OBJ_DIR";
-static const char agent_obj_dir[] = "AGENT_OBJ_DIR";
-int osd_mdt_init(struct osd_device *dev)
+static const char remote_parent_dir[] = "REMOTE_PARENT_DIR";
+static int osd_mdt_init(const struct lu_env *env, struct osd_device *dev)
{
- struct lvfs_run_ctxt new;
- struct lvfs_run_ctxt save;
- struct dentry *parent;
- struct osd_mdobj_map *omm;
- struct dentry *d;
- int rc = 0;
+ struct lvfs_run_ctxt new;
+ struct lvfs_run_ctxt save;
+ struct dentry *parent;
+ struct osd_mdobj_map *omm;
+ struct dentry *d;
+ struct osd_thread_info *info = osd_oti_get(env);
+ struct lu_fid *fid = &info->oti_fid;
+ int rc = 0;
ENTRY;
OBD_ALLOC_PTR(dev->od_mdt_map);
parent = osd_sb(dev)->s_root;
osd_push_ctxt(dev, &new, &save);
- d = simple_mkdir(parent, dev->od_mnt, agent_obj_dir,
+ d = simple_mkdir(parent, dev->od_mnt, remote_parent_dir,
0755, 1);
if (IS_ERR(d))
GOTO(cleanup, rc = PTR_ERR(d));
- omm->omm_agent_dentry = d;
+ ldiskfs_set_inode_state(d->d_inode, LDISKFS_STATE_LUSTRE_NO_OI);
+ omm->omm_remote_parent = d;
+ /* Set LMA for remote parent inode */
+ lu_local_obj_fid(fid, REMOTE_PARENT_DIR_OID);
+ rc = osd_ea_fid_set(info, d->d_inode, fid, 0);
+ if (rc != 0)
+ GOTO(cleanup, rc);
cleanup:
pop_ctxt(&save, &new, NULL);
if (rc) {
- if (omm->omm_agent_dentry != NULL)
- dput(omm->omm_agent_dentry);
+ if (omm->omm_remote_parent != NULL)
+ dput(omm->omm_remote_parent);
OBD_FREE_PTR(omm);
dev->od_mdt_map = NULL;
}
if (omm == NULL)
return;
- if (omm->omm_agent_dentry)
- dput(omm->omm_agent_dentry);
+ if (omm->omm_remote_parent)
+ dput(omm->omm_remote_parent);
OBD_FREE_PTR(omm);
osd->od_ost_map = NULL;
}
-int osd_create_agent_inode(const struct lu_env *env, struct osd_device *osd,
- struct osd_object *obj, struct osd_thandle *oh)
+int osd_add_to_remote_parent(const struct lu_env *env, struct osd_device *osd,
+ struct osd_object *obj, struct osd_thandle *oh)
{
struct osd_mdobj_map *omm = osd->od_mdt_map;
struct osd_thread_info *oti = osd_oti_get(env);
- char *name_buf = oti->oti_name;
- struct dentry *agent;
+ struct lustre_mdt_attrs *lma = &oti->oti_mdt_attrs;
+ char *name = oti->oti_name;
+ struct dentry *dentry;
struct dentry *parent;
int rc;
- parent = omm->omm_agent_dentry;
- sprintf(name_buf, DFID_NOBRACE, PFID(lu_object_fid(&obj->oo_dt.do_lu)));
- agent = osd_child_dentry_by_inode(env, parent->d_inode,
- name_buf, strlen(name_buf));
+ /* Set REMOTE_PARENT in lma, so other process like unlink or lfsck
+ * can identify this object quickly */
+ rc = osd_get_lma(oti, obj->oo_inode, &oti->oti_obj_dentry, lma);
+ if (rc != 0)
+ RETURN(rc);
+
+ lma->lma_incompat |= LMAI_REMOTE_PARENT;
+ lustre_lma_swab(lma);
+ rc = __osd_xattr_set(oti, obj->oo_inode, XATTR_NAME_LMA, lma,
+ sizeof(*lma), XATTR_REPLACE);
+ if (rc != 0)
+ RETURN(rc);
+
+ parent = omm->omm_remote_parent;
+ sprintf(name, DFID_NOBRACE, PFID(lu_object_fid(&obj->oo_dt.do_lu)));
+ dentry = osd_child_dentry_by_inode(env, parent->d_inode,
+ name, strlen(name));
mutex_lock(&parent->d_inode->i_mutex);
- rc = osd_ldiskfs_add_entry(oh->ot_handle, agent, obj->oo_inode, NULL);
+ rc = osd_ldiskfs_add_entry(oh->ot_handle, dentry, obj->oo_inode,
+ NULL);
+ CDEBUG(D_INODE, "%s: add %s:%lu to remote parent %lu.\n", osd_name(osd),
+ name, obj->oo_inode->i_ino, parent->d_inode->i_ino);
+ LASSERTF(parent->d_inode->i_nlink > 1, "%s: %lu nlink %d",
+ osd_name(osd), parent->d_inode->i_ino,
+ parent->d_inode->i_nlink);
parent->d_inode->i_nlink++;
mark_inode_dirty(parent->d_inode);
mutex_unlock(&parent->d_inode->i_mutex);
- if (rc != 0)
- CERROR("%s: "DFID" add agent error: rc = %d\n", osd_name(osd),
- PFID(lu_object_fid(&obj->oo_dt.do_lu)), rc);
RETURN(rc);
}
-int osd_delete_agent_inode(const struct lu_env *env, struct osd_device *osd,
- struct osd_object *obj, struct osd_thandle *oh)
+int osd_delete_from_remote_parent(const struct lu_env *env,
+ struct osd_device *osd,
+ struct osd_object *obj,
+ struct osd_thandle *oh)
{
struct osd_mdobj_map *omm = osd->od_mdt_map;
struct osd_thread_info *oti = osd_oti_get(env);
+ struct lustre_mdt_attrs *lma = &oti->oti_mdt_attrs;
char *name = oti->oti_name;
- struct dentry *agent;
+ struct dentry *dentry;
struct dentry *parent;
struct ldiskfs_dir_entry_2 *de;
struct buffer_head *bh;
int rc;
- parent = omm->omm_agent_dentry;
- sprintf(name, DFID, PFID(lu_object_fid(&obj->oo_dt.do_lu)));
- agent = osd_child_dentry_by_inode(env, parent->d_inode,
- name, strlen(name));
+ /* Check lma to see whether it is remote object */
+ rc = osd_get_lma(oti, obj->oo_inode, &oti->oti_obj_dentry, lma);
+ if (rc != 0)
+ RETURN(rc);
+
+ if (likely(!(lma->lma_incompat & LMAI_REMOTE_PARENT)))
+ RETURN(0);
+
+ parent = omm->omm_remote_parent;
+ sprintf(name, DFID_NOBRACE, PFID(lu_object_fid(&obj->oo_dt.do_lu)));
+ dentry = osd_child_dentry_by_inode(env, parent->d_inode,
+ name, strlen(name));
mutex_lock(&parent->d_inode->i_mutex);
- bh = osd_ldiskfs_find_entry(parent->d_inode, agent, &de, NULL);
+ bh = osd_ldiskfs_find_entry(parent->d_inode, dentry, &de, NULL);
if (bh == NULL) {
mutex_unlock(&parent->d_inode->i_mutex);
RETURN(-ENOENT);
}
+ CDEBUG(D_INODE, "%s: el %s:%lu to remote parent %lu.\n", osd_name(osd),
+ name, obj->oo_inode->i_ino, parent->d_inode->i_ino);
rc = ldiskfs_delete_entry(oh->ot_handle, parent->d_inode, de, bh);
+ LASSERTF(parent->d_inode->i_nlink > 1, "%s: %lu nlink %d",
+ osd_name(osd), parent->d_inode->i_ino,
+ parent->d_inode->i_nlink);
parent->d_inode->i_nlink--;
mark_inode_dirty(parent->d_inode);
mutex_unlock(&parent->d_inode->i_mutex);
brelse(bh);
+
+ /* Get rid of REMOTE_PARENT flag from incompat */
+ lma->lma_incompat &= ~LMAI_REMOTE_PARENT;
+ lustre_lma_swab(lma);
+ rc = __osd_xattr_set(oti, obj->oo_inode, XATTR_NAME_LMA, lma,
+ sizeof(*lma), XATTR_REPLACE);
RETURN(rc);
}
* CONFIGS
*
*/
-int osd_ost_init(struct osd_device *dev)
+static int osd_ost_init(struct osd_device *dev)
{
struct lvfs_run_ctxt new;
struct lvfs_run_ctxt save;
EXIT;
}
-int osd_obj_map_init(struct osd_device *dev)
+int osd_obj_map_init(const struct lu_env *env, struct osd_device *dev)
{
int rc;
ENTRY;
RETURN(rc);
/* prepare structures for MDS */
- rc = osd_mdt_init(dev);
+ rc = osd_mdt_init(env, dev);
RETURN(rc);
}
LASSERT(map);
LASSERT(map->om_root);
- fid_ostid_pack(fid, ostid);
- osd_seq = osd_seq_load(dev, ostid->oi_seq);
+ fid_to_ostid(fid, ostid);
+ osd_seq = osd_seq_load(dev, ostid_seq(ostid));
if (IS_ERR(osd_seq))
RETURN(PTR_ERR(osd_seq));
- dirn = ostid->oi_id & (osd_seq->oos_subdir_count - 1);
+ dirn = ostid_id(ostid) & (osd_seq->oos_subdir_count - 1);
d_seq = osd_seq->oos_dirs[dirn];
LASSERT(d_seq);
- osd_oid_name(name, fid, ostid->oi_id);
+ osd_oid_name(name, fid, ostid_id(ostid));
child = &info->oti_child_dentry;
child->d_parent = d_seq;
LASSERT(map);
/* map fid to seq:objid */
- fid_ostid_pack(fid, ostid);
+ fid_to_ostid(fid, ostid);
- osd_seq = osd_seq_load(osd, ostid->oi_seq);
+ osd_seq = osd_seq_load(osd, ostid_seq(ostid));
if (IS_ERR(osd_seq))
RETURN(PTR_ERR(osd_seq));
- dirn = ostid->oi_id & (osd_seq->oos_subdir_count - 1);
+ dirn = ostid_id(ostid) & (osd_seq->oos_subdir_count - 1);
d = osd_seq->oos_dirs[dirn];
LASSERT(d);
- osd_oid_name(name, fid, ostid->oi_id);
+ osd_oid_name(name, fid, ostid_id(ostid));
rc = osd_obj_add_entry(info, osd, d, name, id, th);
RETURN(rc);
LASSERT(map);
/* map fid to seq:objid */
- fid_ostid_pack(fid, ostid);
+ fid_to_ostid(fid, ostid);
- osd_seq = osd_seq_load(osd, ostid->oi_seq);
+ osd_seq = osd_seq_load(osd, ostid_seq(ostid));
if (IS_ERR(osd_seq))
GOTO(cleanup, rc = PTR_ERR(osd_seq));
- dirn = ostid->oi_id & (osd_seq->oos_subdir_count - 1);
+ dirn = ostid_id(ostid) & (osd_seq->oos_subdir_count - 1);
d = osd_seq->oos_dirs[dirn];
LASSERT(d);
- osd_oid_name(name, fid, ostid->oi_id);
+ osd_oid_name(name, fid, ostid_id(ostid));
rc = osd_obj_del_entry(info, osd, d, name, th);
cleanup:
RETURN(rc);
const struct lu_fid *fid, struct osd_inode_id *id)
{
struct dentry *root;
- struct dentry *dentry;
- struct inode *inode;
- char *name;
- int rc = -ENOENT;
+ struct dentry *dentry;
+ struct inode *inode;
+ char *name;
+ int rc = -ENOENT;
ENTRY;
if (fid_is_last_id(fid)) {