struct osd_directory {
struct iam_container od_container;
struct iam_descr od_descr;
- struct semaphore od_sem;
};
struct osd_object {
* creation, or assigned by osd_object_create() under write lock).
*/
struct inode *oo_inode;
+ /**
+ * to protect index ops.
+ */
+ struct rw_semaphore oo_ext_idx_sem;
struct rw_semaphore oo_sem;
struct osd_directory *oo_dir;
/** protects inode attributes. */
l->lo_ops = &osd_lu_obj_ops;
init_rwsem(&mo->oo_sem);
+ init_rwsem(&mo->oo_ext_idx_sem);
spin_lock_init(&mo->oo_guard);
return l;
} else
struct osd_device *osd = osd_obj2dev(obj);
struct osd_thread_info *oti = osd_oti_get(env);
struct txn_param *prm = &oti->oti_txn;
+ struct lu_env *env_del_obj = &oti->oti_obj_delete_tx_env;
struct thandle *th;
int result;
+ lu_env_init(env_del_obj, LCT_DT_THREAD);
txn_param_init(prm, OSD_TXN_OI_DELETE_CREDITS +
OSD_TXN_INODE_DELETE_CREDITS);
- th = osd_trans_start(env, &osd->od_dt_dev, prm);
+ th = osd_trans_start(env_del_obj, &osd->od_dt_dev, prm);
if (!IS_ERR(th)) {
- result = osd_oi_delete(oti, &osd->od_oi, fid, th);
- osd_trans_stop(env, th);
+ result = osd_oi_delete(osd_oti_get(env_del_obj),
+ &osd->od_oi, fid, th);
+ osd_trans_stop(env_del_obj, th);
} else
result = PTR_ERR(th);
+
+ lu_env_fini(env_del_obj);
return result;
}
[DTO_INDEX_DELETE] = 20,
/**
* Unused now.
- */
+ */
[DTO_IDNEX_UPDATE] = 16,
/*
* Create a object. Same as create object in EXT3 filesystem.
* INDEX_EXTRA_BLOCKS(8) +
* 3(inode bits, groups, GDT) +
* QUOTA(?)
- */
+ */
[DTO_OBJECT_DELETE] = 27,
/**
* Attr set credits.
LDISKFS_I(inode)->i_disksize = attr->la_size;
i_size_write(inode, attr->la_size);
}
-# if 0
- /*
- * OSD should not change "i_blocks" which is used by quota.
+
+ /* OSD should not change "i_blocks" which is used by quota.
* "i_blocks" should be changed by ldiskfs only.
- * Disable this assignment until SOM to fix some EA field. */
+ * Enable this assignment for SOM purpose now, until it is
+ * stored in SOM EA. */
if (bits & LA_BLOCKS)
inode->i_blocks = attr->la_blocks;
-#endif
+
if (bits & LA_MODE)
inode->i_mode = (inode->i_mode & S_IFMT) |
(attr->la_mode & ~S_IFMT);
static int osd_create_post(struct osd_thread_info *info, struct osd_object *obj,
struct lu_attr *attr, struct thandle *th)
{
- LASSERT(obj->oo_inode != NULL);
-
osd_object_init0(obj);
return 0;
}
rc = inode->i_op->getxattr(dentry, XATTR_NAME_LMA, (void *)mdt_attrs,
sizeof *mdt_attrs);
+ /* Check LMA compatibility */
+ if (rc > 0 &&
+ (mdt_attrs->lma_incompat & ~cpu_to_be32(LMA_INCOMPAT_SUPP))) {
+ CWARN("Inode %lx: Unsupported incompat LMA feature(s) %#x\n",
+ inode->i_ino, be32_to_cpu(mdt_attrs->lma_incompat) &
+ ~LMA_INCOMPAT_SUPP);
+ return -ENOSYS;
+ }
+
if (rc > 0) {
fid_be_to_cpu(fid, &mdt_attrs->lma_self_fid);
rc = 0;
RETURN(rc);
}
+/*
+ * Get the 64-bit version for an inode.
+ *
+ * Reads i_fs_version from the ldiskfs-private part of the inode attached
+ * to \a dt (oo_inode) and logs it, together with the inode number, at
+ * D_INFO level. The inode pointer is dereferenced without a NULL check,
+ * so \a dt must already have an inode attached when this is called.
+ * \a env is not used.
+ *
+ * \retval the current i_fs_version value of the inode
+ */
+static dt_obj_version_t osd_object_version_get(const struct lu_env *env,
+ struct dt_object *dt)
+{
+ struct inode *inode = osd_dt_obj(dt)->oo_inode;
+
+ CDEBUG(D_INFO, "Get version "LPX64" for inode %lu\n",
+ LDISKFS_I(inode)->i_fs_version, inode->i_ino);
+ return LDISKFS_I(inode)->i_fs_version;
+}
+
+/*
+ * Set the 64-bit version of an inode.
+ *
+ * Overwrites i_fs_version of the inode attached to \a dt (oo_inode) with
+ * \a new_version, then invokes the superblock's dirty_inode() method on
+ * that inode. The previous version is only written to the D_INFO debug
+ * log; nothing is returned to the caller. The inode pointer is
+ * dereferenced without a NULL check. \a env is not used.
+ */
+static void osd_object_version_set(const struct lu_env *env, struct dt_object *dt,
+ dt_obj_version_t new_version)
+{
+ struct inode *inode = osd_dt_obj(dt)->oo_inode;
+
+ CDEBUG(D_INFO, "Set version "LPX64" (old "LPX64") for inode %lu\n",
+ new_version, LDISKFS_I(inode)->i_fs_version, inode->i_ino);
+ LDISKFS_I(inode)->i_fs_version = new_version;
+ /** Version is set after all inode operations are finished,
+ * so we should mark it dirty here */
+ inode->i_sb->s_op->dirty_inode(inode);
+}
+
static int osd_data_get(const struct lu_env *env, struct dt_object *dt,
void **data)
{
.do_xattr_list = osd_xattr_list,
.do_capa_get = osd_capa_get,
.do_object_sync = osd_object_sync,
+ .do_version_get = osd_object_version_get,
+ .do_version_set = osd_object_version_set,
.do_data_get = osd_data_get,
};
.do_xattr_list = osd_xattr_list,
.do_capa_get = osd_capa_get,
.do_object_sync = osd_object_sync,
+ .do_version_get = osd_object_version_get,
+ .do_version_set = osd_object_version_set,
.do_data_get = osd_data_get,
};
OBD_ALLOC_PTR(dir);
if (dir != NULL) {
- sema_init(&dir->od_sem, 1);
spin_lock(&obj->oo_guard);
if (obj->oo_dir == NULL)
* Now, that we have container data, serialize its
* initialization.
*/
- down(&obj->oo_dir->od_sem);
+ down_write(&obj->oo_ext_idx_sem);
/*
* recheck under lock.
*/
result = osd_iam_container_init(env, obj, dir);
else
result = 0;
- up(&obj->oo_dir->od_sem);
+ up_write(&obj->oo_ext_idx_sem);
} else
result = -ENOMEM;
} else
dentry = osd_child_dentry_get(env, obj,
(char *)key, strlen((char *)key));
+
+ down_write(&obj->oo_ext_idx_sem);
bh = ldiskfs_find_entry(dentry, &de);
if (bh) {
struct osd_thread_info *oti = osd_oti_get(env);
} else
rc = -ENOENT;
+ up_write(&obj->oo_ext_idx_sem);
LASSERT(osd_invariant(obj));
RETURN(rc);
}
dentry = osd_child_dentry_get(env, obj,
(char *)key, strlen((char *)key));
+
+ down_read(&obj->oo_ext_idx_sem);
bh = ldiskfs_find_entry(dentry, &de);
if (bh) {
ino = le32_to_cpu(de->inode);
rc = osd_ea_fid_get(env, dentry, rec);
iput(inode);
} else
- rc = -ENOENT;
+ rc = PTR_ERR(inode);
} else
rc = -ENOENT;
+ up_read(&obj->oo_ext_idx_sem);
RETURN (rc);
}
else
current->cap_effective &= ~CFS_CAP_SYS_RESOURCE_MASK;
#endif
+ down_write(&obj->oo_ext_idx_sem);
rc = osd_ea_add_rec(env, obj, child, name, th);
-
+ up_write(&obj->oo_ext_idx_sem);
#ifdef HAVE_QUOTA_SUPPORT
current->cap_effective = save;
#endif
obj_dentry->d_sb = osd_sb(osd_obj2dev(obj));
obj_dentry->d_name.hash = 0;
- it->oie_namelen = 0;
+ it->oie_rd_dirent = 0;
+ it->oie_it_dirent = 0;
it->oie_curr_pos = 0;
it->oie_next_pos = 0;
+ it->oie_dirent = NULL;
+ it->oie_buf = info->oti_it_ea_buf;
it->oie_obj = obj;
it->oie_file.f_dentry = obj_dentry;
it->oie_file.f_mapping = obj->oo_inode->i_mapping;
it->oie_file.f_op = obj->oo_inode->i_fop;
it->oie_file.private_data = NULL;
lu_object_get(lo);
-
RETURN((struct dt_it*) it);
}
ENTRY;
LASSERT(((const char *)key)[0] == '\0');
- it->oie_namelen = 0;
it->oie_curr_pos = 0;
it->oie_next_pos = 0;
+ it->oie_rd_dirent = 0;
+ it->oie_it_dirent = 0;
+ it->oie_dirent = NULL;
RETURN(+1);
}
loff_t offset, ino_t ino,
unsigned int d_type)
{
- struct osd_it_ea *it = (struct osd_it_ea *)buf;
- struct dirent64 *dirent = &it->oie_dirent64;
-
+ struct osd_it_ea *it = (struct osd_it_ea *)buf;
+ struct osd_it_ea_dirent *ent = it->oie_dirent;
ENTRY;
- if (it->oie_namelen)
- RETURN(-ENOENT);
- if (namelen == 0 || namelen > LDISKFS_NAME_LEN)
+ /* this should never happen */
+ if (unlikely(namelen == 0 || namelen > LDISKFS_NAME_LEN)) {
+ CERROR("ldiskfs return invalid namelen %d\n", namelen);
RETURN(-EIO);
+ }
+
+ if ((void *) ent - it->oie_buf + sizeof(*ent) + namelen >
+ OSD_IT_EA_BUFSIZE)
+ RETURN(1);
- strncpy(dirent->d_name, name, LDISKFS_NAME_LEN);
- dirent->d_name[namelen] = 0;
- dirent->d_ino = ino;
- it->oie_namelen = namelen;
- it->oie_curr_pos = offset;
+ ent->oied_ino = ino;
+ ent->oied_off = offset;
+ ent->oied_namelen = namelen;
+ memcpy(ent->oied_name, name, namelen);
+ it->oie_rd_dirent++;
+ it->oie_dirent = (void *) ent + size_round(sizeof(*ent) + namelen);
RETURN(0);
}
* \retval 0, on success
* \retval -ve, on error
*/
-int osd_ldiskfs_it_fill(const struct dt_it *di)
+static int osd_ldiskfs_it_fill(const struct dt_it *di)
{
struct osd_it_ea *it = (struct osd_it_ea *)di;
struct osd_object *obj = it->oie_obj;
int result = 0;
ENTRY;
- it->oie_namelen = 0;
+ it->oie_dirent = it->oie_buf;
+ it->oie_rd_dirent = 0;
it->oie_file.f_pos = it->oie_curr_pos;
+ down_read(&obj->oo_ext_idx_sem);
result = inode->i_fop->readdir(&it->oie_file, it,
(filldir_t) osd_ldiskfs_filldir);
+ up_read(&obj->oo_ext_idx_sem);
it->oie_next_pos = it->oie_file.f_pos;
- if (it->oie_namelen == 0)
+ if (it->oie_rd_dirent == 0) {
result = -EIO;
+ } else {
+ it->oie_dirent = it->oie_buf;
+ it->oie_it_dirent = 1;
+ }
RETURN(result);
}
int rc;
ENTRY;
- it->oie_curr_pos = it->oie_next_pos;
- if (it->oie_curr_pos == LDISKFS_HTREE_EOF)
- rc = +1;
- else
- rc = osd_ldiskfs_it_fill(di);
+ if (it->oie_it_dirent < it->oie_rd_dirent) {
+ it->oie_dirent = (void *) it->oie_dirent +
+ size_round(sizeof(struct osd_it_ea_dirent) +
+ it->oie_dirent->oied_namelen);
+ it->oie_it_dirent++;
+ RETURN(0);
+ } else {
+ it->oie_curr_pos = it->oie_next_pos;
+
+ if (it->oie_curr_pos == LDISKFS_HTREE_EOF)
+ rc = +1;
+ else
+ rc = osd_ldiskfs_it_fill(di);
+ }
RETURN(rc);
}
{
struct osd_it_ea *it = (struct osd_it_ea *)di;
ENTRY;
- RETURN((struct dt_key *)it->oie_dirent64.d_name);
+ RETURN((struct dt_key *)it->oie_dirent->oied_name);
}
/**
{
struct osd_it_ea *it = (struct osd_it_ea *)di;
ENTRY;
- RETURN(it->oie_namelen);
+ RETURN(it->oie_dirent->oied_namelen);
}
/**
ENTRY;
dev = osd_dev(ldev);
- id->oii_ino = it->oie_dirent64.d_ino;
+ id->oii_ino = it->oie_dirent->oied_ino;
id->oii_gen = OSD_OII_NOGEN;
inode = osd_iget(info, dev, id);
if (!IS_ERR(inode)) {
dentry->d_inode = inode;
LASSERT(dentry->d_inode->i_sb == osd_sb(dev));
} else {
- CERROR("Error getting inode for ino =%d", id->oii_ino);
RETURN((struct dt_rec *) PTR_ERR(inode));
}
{
struct osd_it_ea *it = (struct osd_it_ea *)di;
ENTRY;
- RETURN(it->oie_curr_pos);
+ RETURN(it->oie_dirent->oied_off);
}
/**
struct osd_thread_info *info;
OBD_ALLOC_PTR(info);
- if (info != NULL)
- info->oti_env = container_of(ctx, struct lu_env, le_ctx);
- else
+ if (info != NULL) {
+ OBD_ALLOC(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE);
+ if (info->oti_it_ea_buf != NULL) {
+ info->oti_env = container_of(ctx, struct lu_env,
+ le_ctx);
+ } else {
+ OBD_FREE_PTR(info);
+ info = ERR_PTR(-ENOMEM);
+ }
+ } else {
info = ERR_PTR(-ENOMEM);
+ }
return info;
}
-/* context key destructor: osd_key_fini */
-LU_KEY_FINI(osd, struct osd_thread_info);
+static void osd_key_fini(const struct lu_context *ctx,
+ struct lu_context_key *key, void* data)
+{
+ struct osd_thread_info *info = data;
+ /* Context key destructor: \a data is the per-thread osd_thread_info.
+ * Release the iterator dirent buffer first, while info (which holds
+ * the pointer to it) is still valid; the size must match the
+ * OSD_IT_EA_BUFSIZE used when the buffer was allocated. \a ctx and
+ * \a key are not used. */
+ OBD_FREE(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE);
+ OBD_FREE_PTR(info);
+}
static void osd_key_exit(const struct lu_context *ctx,
struct lu_context_key *key, void *data)
if (inode == NULL) {
CERROR("no inode\n");
inode = ERR_PTR(-EACCES);
- } else if (is_bad_inode(inode)) {
- CERROR("bad inode\n");
- iput(inode);
- inode = ERR_PTR(-ENOENT);
} else if (id->oii_gen != OSD_OII_NOGEN &&
inode->i_generation != id->oii_gen) {
- CERROR("stale inode\n");
iput(inode);
inode = ERR_PTR(-ESTALE);
+ } else if (inode->i_nlink == 0) {
+ /* due to parallel readdir and unlink,
+ * we can have dead inode here. */
+ CWARN("stale inode\n");
+ make_bad_inode(inode);
+ iput(inode);
+ inode = ERR_PTR(-ESTALE);
+ } else if (is_bad_inode(inode)) {
+ CERROR("bad inode %lx\n",inode->i_ino);
+ iput(inode);
+ inode = ERR_PTR(-ENOENT);
}
-
return inode;
}