/* llo_* api support */
#include <md_object.h>
+#ifdef HAVE_LDISKFS_PDO
+int ldiskfs_pdo = 1;
+CFS_MODULE_PARM(ldiskfs_pdo, "i", int, 0644,
+ "ldiskfs with parallel directory operations");
+#else
+int ldiskfs_pdo = 0;
+#endif
+
static const char dot[] = ".";
static const char dotdot[] = "..";
static const char remote_obj_dir[] = "REM_OBJ_DIR";
/**
* to protect index ops.
*/
+ struct htree_lock_head *oo_hl_head;
cfs_rw_semaphore_t oo_ext_idx_sem;
cfs_rw_semaphore_t oo_sem;
struct osd_directory *oo_dir;
cfs_init_rwsem(&mo->oo_ext_idx_sem);
cfs_spin_lock_init(&mo->oo_guard);
return l;
- } else
+ } else {
return NULL;
+ }
}
/*
RETURN(-ENOENT);
result = osd_oi_lookup(info, oi, fid, id);
- if (result == 0) {
- inode = osd_iget(info, dev, id);
- if (!IS_ERR(inode)) {
- obj->oo_inode = inode;
- LASSERT(obj->oo_inode->i_sb == osd_sb(dev));
- if (dev->od_iop_mode) {
- obj->oo_compat_dot_created = 1;
- obj->oo_compat_dotdot_created = 1;
- }
+ if (result != 0) {
+ if (result == -ENOENT)
result = 0;
- } else
- /*
- * If fid wasn't found in oi, inode-less object is
- * created, for which lu_object_exists() returns
- * false. This is used in a (frequent) case when
- * objects are created as locking anchors or
- * place holders for objects yet to be created.
- */
- result = PTR_ERR(inode);
- } else if (result == -ENOENT)
- result = 0;
+ goto out;
+ }
+
+ inode = osd_iget(info, dev, id);
+ if (IS_ERR(inode)) {
+ /*
+ * If fid wasn't found in oi, inode-less object is
+ * created, for which lu_object_exists() returns
+ * false. This is used in a (frequent) case when
+ * objects are created as locking anchors or
+ * place holders for objects yet to be created.
+ */
+ result = PTR_ERR(inode);
+ goto out;
+ }
+
+ obj->oo_inode = inode;
+ LASSERT(obj->oo_inode->i_sb == osd_sb(dev));
+ if (dev->od_iop_mode) {
+ obj->oo_compat_dot_created = 1;
+ obj->oo_compat_dotdot_created = 1;
+ }
+
+ if (!S_ISDIR(inode->i_mode) || !ldiskfs_pdo) /* done */
+ goto out;
+
+ LASSERT(obj->oo_hl_head == NULL);
+ obj->oo_hl_head = ldiskfs_htree_lock_head_alloc(HTREE_HBITS_DEF);
+ if (obj->oo_hl_head == NULL) {
+ obj->oo_inode = NULL;
+ iput(inode);
+ result = -ENOMEM;
+ }
+out:
LINVRNT(osd_invariant(obj));
RETURN(result);
LINVRNT(osd_invariant(obj));
dt_object_fini(&obj->oo_dt);
+ if (obj->oo_hl_head != NULL)
+ ldiskfs_htree_lock_head_free(obj->oo_hl_head);
OBD_FREE_PTR(obj);
}
LINVRNT(osd_invariant(obj));
LASSERT(obj->oo_inode == NULL);
+ LASSERT(obj->oo_hl_head == NULL);
+
+ if (S_ISDIR(mode) && ldiskfs_pdo) {
+ obj->oo_hl_head =ldiskfs_htree_lock_head_alloc(HTREE_HBITS_DEF);
+ if (obj->oo_hl_head == NULL)
+ return -ENOMEM;
+ }
oth = container_of(th, struct osd_thandle, ot_super);
LASSERT(oth->ot_handle->h_transaction != NULL);
inode->i_flags |= S_NOCMTIME;
obj->oo_inode = inode;
result = 0;
- } else
+ } else {
+ if (obj->oo_hl_head != NULL) {
+ ldiskfs_htree_lock_head_free(obj->oo_hl_head);
+ obj->oo_hl_head = NULL;
+ }
result = PTR_ERR(inode);
+ }
LINVRNT(osd_invariant(obj));
return result;
}
else
result = 0;
cfs_up_write(&obj->oo_ext_idx_sem);
- } else
+ } else {
result = -ENOMEM;
- } else
+ }
+ } else {
result = 0;
+ }
if (result == 0 && ea_dir == 0) {
if (!osd_iam_index_probe(env, obj, feat))
struct osd_thandle *oh;
struct ldiskfs_dir_entry_2 *de;
struct buffer_head *bh;
+ struct htree_lock *hlock = NULL;
int rc;
dentry = osd_child_dentry_get(env, obj,
(char *)key, strlen((char *)key));
- cfs_down_write(&obj->oo_ext_idx_sem);
- bh = ll_ldiskfs_find_entry(dir, dentry, &de);
+ if (obj->oo_hl_head != NULL) {
+ hlock = osd_oti_get(env)->oti_hlock;
+ ldiskfs_htree_lock(hlock, obj->oo_hl_head,
+ dir, LDISKFS_HLOCK_DEL);
+ } else {
+ cfs_down_write(&obj->oo_ext_idx_sem);
+ }
+
+ bh = osd_ldiskfs_find_entry(dir, dentry, &de, hlock);
if (bh) {
rc = ldiskfs_delete_entry(oh->ot_handle,
- dir, de, bh);
+ dir, de, bh);
brelse(bh);
- } else
+ } else {
rc = -ENOENT;
+ }
+ if (hlock != NULL)
+ ldiskfs_htree_unlock(hlock);
+ else
+ cfs_up_write(&obj->oo_ext_idx_sem);
- cfs_up_write(&obj->oo_ext_idx_sem);
LASSERT(osd_invariant(obj));
RETURN(rc);
}
struct inode *cinode,
const char *name,
const struct dt_rec *fid,
+ struct htree_lock *hlock,
struct thandle *th)
{
struct ldiskfs_dentry_param *ldp;
child->d_fsdata = (void*) ldp;
} else
child->d_fsdata = NULL;
- rc = ldiskfs_add_entry(oth->ot_handle, child, cinode);
+ rc = osd_ldiskfs_add_entry(oth->ot_handle, child, cinode, hlock);
RETURN(rc);
}
/* in case of rename, dotdot is already created */
if (dir->oo_compat_dotdot_created) {
return __osd_ea_add_rec(info, dir, parent_dir, name,
- dot_dot_fid, th);
+ dot_dot_fid, NULL, th);
}
- result = ldiskfs_add_dot_dotdot(oth->ot_handle, parent_dir, inode,
- dot_ldp, dot_dot_ldp);
+ result = ldiskfs_add_dot_dotdot(oth->ot_handle, parent_dir,
+ inode, dot_ldp, dot_dot_ldp);
if (result == 0)
dir->oo_compat_dotdot_created = 1;
}
struct thandle *th)
{
struct osd_thread_info *info = osd_oti_get(env);
+ struct htree_lock *hlock;
int rc;
+ hlock = pobj->oo_hl_head != NULL ? info->oti_hlock : NULL;
+
if (name[0] == '.' && (name[1] == '\0' || (name[1] == '.' &&
- name[2] =='\0')))
+ name[2] =='\0'))) {
+ if (hlock != NULL) {
+ ldiskfs_htree_lock(hlock, pobj->oo_hl_head,
+ pobj->oo_inode, 0);
+ } else {
+ cfs_down_write(&pobj->oo_ext_idx_sem);
+ }
rc = osd_add_dot_dotdot(info, pobj, cinode, name,
(struct dt_rec *)lu_object_fid(&pobj->oo_dt.do_lu),
fid, th);
+ } else {
+ if (hlock != NULL) {
+ ldiskfs_htree_lock(hlock, pobj->oo_hl_head,
+ pobj->oo_inode, LDISKFS_HLOCK_ADD);
+ } else {
+ cfs_down_write(&pobj->oo_ext_idx_sem);
+ }
+
+ rc = __osd_ea_add_rec(info, pobj, cinode, name, fid,
+ hlock, th);
+ }
+ if (hlock != NULL)
+ ldiskfs_htree_unlock(hlock);
else
- rc = __osd_ea_add_rec(info, pobj, cinode, name, fid, th);
+ cfs_up_write(&pobj->oo_ext_idx_sem);
return rc;
}
struct ldiskfs_dir_entry_2 *de;
struct buffer_head *bh;
struct lu_fid *fid = (struct lu_fid *) rec;
+ struct htree_lock *hlock = NULL;
int ino;
int rc;
dentry = osd_child_dentry_get(env, obj,
(char *)key, strlen((char *)key));
- cfs_down_read(&obj->oo_ext_idx_sem);
- bh = ll_ldiskfs_find_entry(dir, dentry, &de);
+ if (obj->oo_hl_head != NULL) {
+ hlock = osd_oti_get(env)->oti_hlock;
+ ldiskfs_htree_lock(hlock, obj->oo_hl_head,
+ dir, LDISKFS_HLOCK_LOOKUP);
+ } else {
+ cfs_down_read(&obj->oo_ext_idx_sem);
+ }
+
+ bh = osd_ldiskfs_find_entry(dir, dentry, &de, hlock);
if (bh) {
ino = le32_to_cpu(de->inode);
rc = osd_get_fid_from_dentry(de, rec);
brelse(bh);
if (rc != 0)
rc = osd_ea_fid_get(env, obj, ino, fid);
- } else
+ } else {
rc = -ENOENT;
+ }
- cfs_up_read(&obj->oo_ext_idx_sem);
+ if (hlock != NULL)
+ ldiskfs_htree_unlock(hlock);
+ else
+ cfs_up_read(&obj->oo_ext_idx_sem);
RETURN (rc);
}
else
cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
#endif
- cfs_down_write(&obj->oo_ext_idx_sem);
rc = osd_ea_add_rec(env, obj, child->oo_inode, name, rec, th);
- cfs_up_write(&obj->oo_ext_idx_sem);
#ifdef HAVE_QUOTA_SUPPORT
cfs_curproc_cap_unpack(save);
#endif
* \retval 0 on success
* \retval -ve on error
*/
-static int osd_ldiskfs_it_fill(const struct dt_it *di)
+static int osd_ldiskfs_it_fill(const struct lu_env *env,
+ const struct dt_it *di)
{
struct osd_it_ea *it = (struct osd_it_ea *)di;
struct osd_object *obj = it->oie_obj;
struct inode *inode = obj->oo_inode;
- int result = 0;
+ struct htree_lock *hlock = NULL;
+ int result = 0;
ENTRY;
it->oie_dirent = it->oie_buf;
it->oie_rd_dirent = 0;
- cfs_down_read(&obj->oo_ext_idx_sem);
+ if (obj->oo_hl_head != NULL) {
+ hlock = osd_oti_get(env)->oti_hlock;
+ ldiskfs_htree_lock(hlock, obj->oo_hl_head,
+ inode, LDISKFS_HLOCK_READDIR);
+ } else {
+ cfs_down_read(&obj->oo_ext_idx_sem);
+ }
+
result = inode->i_fop->readdir(&it->oie_file, it,
(filldir_t) osd_ldiskfs_filldir);
- cfs_up_read(&obj->oo_ext_idx_sem);
+ if (hlock != NULL)
+ ldiskfs_htree_unlock(hlock);
+ else
+ cfs_up_read(&obj->oo_ext_idx_sem);
if (it->oie_rd_dirent == 0) {
result = -EIO;
if (it->oie_file.f_pos == LDISKFS_HTREE_EOF)
rc = +1;
else
- rc = osd_ldiskfs_it_fill(di);
+ rc = osd_ldiskfs_it_fill(env, di);
}
RETURN(rc);
ENTRY;
it->oie_file.f_pos = hash;
- rc = osd_ldiskfs_it_fill(di);
+ rc = osd_ldiskfs_it_fill(env, di);
if (rc == 0)
rc = +1;
struct osd_thread_info *info;
OBD_ALLOC_PTR(info);
- if (info != NULL) {
- OBD_ALLOC(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE);
- if (info->oti_it_ea_buf != NULL) {
- info->oti_env = container_of(ctx, struct lu_env,
- le_ctx);
- } else {
- OBD_FREE_PTR(info);
- info = ERR_PTR(-ENOMEM);
- }
- } else {
- info = ERR_PTR(-ENOMEM);
- }
+ if (info == NULL)
+ return ERR_PTR(-ENOMEM);
+
+ OBD_ALLOC(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE);
+ if (info->oti_it_ea_buf == NULL)
+ goto out_free_info;
+
+ info->oti_env = container_of(ctx, struct lu_env, le_ctx);
+
+ info->oti_hlock = ldiskfs_htree_lock_alloc();
+ if (info->oti_hlock == NULL)
+ goto out_free_ea;
+
return info;
+
+ out_free_ea:
+ OBD_FREE(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE);
+ out_free_info:
+ OBD_FREE_PTR(info);
+ return ERR_PTR(-ENOMEM);
}
static void osd_key_fini(const struct lu_context *ctx,
{
struct osd_thread_info *info = data;
+ if (info->oti_hlock != NULL)
+ ldiskfs_htree_lock_free(info->oti_hlock);
OBD_FREE(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE);
OBD_FREE_PTR(info);
}