/* llo_* api support */
#include <md_object.h>
+#ifdef HAVE_LDISKFS_PDO
+int ldiskfs_pdo = 1;
+CFS_MODULE_PARM(ldiskfs_pdo, "i", int, 0644,
+ "ldiskfs with parallel directory operations");
+#else
+int ldiskfs_pdo = 0;
+#endif
+
static const char dot[] = ".";
static const char dotdot[] = "..";
static const char remote_obj_dir[] = "REM_OBJ_DIR";
/**
* to protect index ops.
*/
+ struct htree_lock_head *oo_hl_head;
cfs_rw_semaphore_t oo_ext_idx_sem;
cfs_rw_semaphore_t oo_sem;
struct osd_directory *oo_dir;
struct thandle ot_super;
handle_t *ot_handle;
struct journal_callback ot_jcb;
+ cfs_list_t ot_dcb_list;
/* Link to the device, for debugging. */
struct lu_ref_link *ot_dev_link;
cfs_init_rwsem(&mo->oo_ext_idx_sem);
cfs_spin_lock_init(&mo->oo_guard);
return l;
- } else
+ } else {
return NULL;
+ }
}
/*
RETURN(-ENOENT);
result = osd_oi_lookup(info, oi, fid, id);
- if (result == 0) {
- inode = osd_iget(info, dev, id);
- if (!IS_ERR(inode)) {
- obj->oo_inode = inode;
- LASSERT(obj->oo_inode->i_sb == osd_sb(dev));
- if (dev->od_iop_mode) {
- obj->oo_compat_dot_created = 1;
- obj->oo_compat_dotdot_created = 1;
- }
+ if (result != 0) {
+ if (result == -ENOENT)
result = 0;
- } else
- /*
- * If fid wasn't found in oi, inode-less object is
- * created, for which lu_object_exists() returns
- * false. This is used in a (frequent) case when
- * objects are created as locking anchors or
- * place holders for objects yet to be created.
- */
- result = PTR_ERR(inode);
- } else if (result == -ENOENT)
- result = 0;
+ goto out;
+ }
+
+ inode = osd_iget(info, dev, id);
+ if (IS_ERR(inode)) {
+ /*
+ * If fid wasn't found in oi, inode-less object is
+ * created, for which lu_object_exists() returns
+ * false. This is used in a (frequent) case when
+ * objects are created as locking anchors or
+ * place holders for objects yet to be created.
+ */
+ result = PTR_ERR(inode);
+ goto out;
+ }
+
+ obj->oo_inode = inode;
+ LASSERT(obj->oo_inode->i_sb == osd_sb(dev));
+ if (dev->od_iop_mode) {
+ obj->oo_compat_dot_created = 1;
+ obj->oo_compat_dotdot_created = 1;
+ }
+
+ if (!S_ISDIR(inode->i_mode) || !ldiskfs_pdo) /* done */
+ goto out;
+
+ LASSERT(obj->oo_hl_head == NULL);
+ obj->oo_hl_head = ldiskfs_htree_lock_head_alloc(HTREE_HBITS_DEF);
+ if (obj->oo_hl_head == NULL) {
+ obj->oo_inode = NULL;
+ iput(inode);
+ result = -ENOMEM;
+ }
+out:
LINVRNT(osd_invariant(obj));
RETURN(result);
LINVRNT(osd_invariant(obj));
dt_object_fini(&obj->oo_dt);
+ if (obj->oo_hl_head != NULL)
+ ldiskfs_htree_lock_head_free(obj->oo_hl_head);
OBD_FREE_PTR(obj);
}
{
struct osd_thandle *oh = container_of0(jcb, struct osd_thandle, ot_jcb);
struct thandle *th = &oh->ot_super;
- struct dt_device *dev = th->th_dev;
- struct lu_device *lud = &dev->dd_lu_dev;
+ struct lu_device *lud = &th->th_dev->dd_lu_dev;
+ struct dt_txn_commit_cb *dcb, *tmp;
- LASSERT(dev != NULL);
LASSERT(oh->ot_handle == NULL);
- if (error) {
+ if (error)
CERROR("transaction @0x%p commit error: %d\n", th, error);
- } else {
- struct lu_env *env = &osd_dt_dev(dev)->od_env_for_commit;
- /*
- * This od_env_for_commit is only for commit usage. see
- * "struct dt_device"
- */
- lu_context_enter(&env->le_ctx);
- dt_txn_hook_commit(env, th);
- lu_context_exit(&env->le_ctx);
- }
+
+ dt_txn_hook_commit(th);
+
+ /* call per-transaction callbacks if any */
+ cfs_list_for_each_entry_safe(dcb, tmp, &oh->ot_dcb_list, dcb_linkage)
+ dcb->dcb_func(NULL, th, dcb, error);
lu_ref_del_at(&lud->ld_reference, oh->ot_dev_link, "osd-tx", th);
lu_device_put(lud);
* be used.
*/
oti->oti_dev = dev;
+ CFS_INIT_LIST_HEAD(&oh->ot_dcb_list);
osd_th_alloced(oh);
jh = ldiskfs_journal_start_sb(osd_sb(dev), p->tp_credits);
osd_th_started(oh);
th = &oh->ot_super;
th->th_dev = d;
th->th_result = 0;
- jh->h_sync = p->tp_sync;
+ th->th_sync = 0;
lu_device_get(&d->dd_lu_dev);
oh->ot_dev_link = lu_ref_add
(&d->dd_lu_dev.ld_reference,
/* add commit callback */
lu_context_init(&th->th_ctx, LCT_TX_HANDLE);
lu_context_enter(&th->th_ctx);
- osd_journal_callback_set(jh,osd_trans_commit_cb,
- &oh->ot_jcb);
LASSERT(oti->oti_txns == 0);
LASSERT(oti->oti_r_locks == 0);
LASSERT(oti->oti_w_locks == 0);
if (oh->ot_handle != NULL) {
handle_t *hdl = oh->ot_handle;
+ hdl->h_sync = th->th_sync;
+ /*
+ * add commit callback
+ * notice we don't do this in osd_trans_start()
+ * as underlying transaction can change during truncate
+ */
+ osd_journal_callback_set(hdl, osd_trans_commit_cb,
+ &oh->ot_jcb);
+
LASSERT(oti->oti_txns == 1);
oti->oti_txns--;
LASSERT(oti->oti_r_locks == 0);
result = ldiskfs_journal_stop(hdl));
if (result != 0)
CERROR("Failure to stop transaction: %d\n", result);
+ } else {
+ OBD_FREE_PTR(oh);
}
EXIT;
}
+static int osd_trans_cb_add(struct thandle *th, struct dt_txn_commit_cb *dcb)
+{
+ struct osd_thandle *oh = container_of0(th, struct osd_thandle,
+ ot_super);
+
+ cfs_list_add(&dcb->dcb_linkage, &oh->ot_dcb_list);
+
+ return 0;
+}
+
/*
* Concurrency: no concurrent access is possible that late in object
* life-cycle.
const struct dt_device *dev,
struct dt_device_param *param)
{
+ struct super_block *sb = osd_sb(osd_dt_dev(dev));
+
/*
* XXX should be taken from not-yet-existing fs abstraction layer.
*/
- param->ddp_max_name_len = LDISKFS_NAME_LEN;
- param->ddp_max_nlink = LDISKFS_LINK_MAX;
- param->ddp_block_shift = osd_sb(osd_dt_dev(dev))->s_blocksize_bits;
+ param->ddp_max_name_len = LDISKFS_NAME_LEN;
+ param->ddp_max_nlink = LDISKFS_LINK_MAX;
+ param->ddp_block_shift = osd_sb(osd_dt_dev(dev))->s_blocksize_bits;
+ param->ddp_mntopts = 0;
+ if (test_opt(sb, XATTR_USER))
+ param->ddp_mntopts |= MNTOPT_USERXATTR;
+ if (test_opt(sb, POSIX_ACL))
+ param->ddp_mntopts |= MNTOPT_ACL;
}
/**
.dt_statfs = osd_statfs,
.dt_trans_start = osd_trans_start,
.dt_trans_stop = osd_trans_stop,
+ .dt_trans_cb_add = osd_trans_cb_add,
.dt_conf_get = osd_conf_get,
.dt_sync = osd_sync,
.dt_ro = osd_ro,
LINVRNT(osd_invariant(obj));
LASSERT(obj->oo_inode == NULL);
+ LASSERT(obj->oo_hl_head == NULL);
+
+ if (S_ISDIR(mode) && ldiskfs_pdo) {
+ obj->oo_hl_head =ldiskfs_htree_lock_head_alloc(HTREE_HBITS_DEF);
+ if (obj->oo_hl_head == NULL)
+ return -ENOMEM;
+ }
oth = container_of(th, struct osd_thandle, ot_super);
LASSERT(oth->ot_handle->h_transaction != NULL);
inode->i_flags |= S_NOCMTIME;
obj->oo_inode = inode;
result = 0;
- } else
+ } else {
+ if (obj->oo_hl_head != NULL) {
+ ldiskfs_htree_lock_head_free(obj->oo_hl_head);
+ obj->oo_hl_head = NULL;
+ }
result = PTR_ERR(inode);
+ }
LINVRNT(osd_invariant(obj));
return result;
}
else
result = 0;
cfs_up_write(&obj->oo_ext_idx_sem);
- } else
+ } else {
result = -ENOMEM;
- } else
+ }
+ } else {
result = 0;
+ }
if (result == 0 && ea_dir == 0) {
if (!osd_iam_index_probe(env, obj, feat))
struct osd_thandle *oh;
struct ldiskfs_dir_entry_2 *de;
struct buffer_head *bh;
+ struct htree_lock *hlock = NULL;
int rc;
dentry = osd_child_dentry_get(env, obj,
(char *)key, strlen((char *)key));
- cfs_down_write(&obj->oo_ext_idx_sem);
- bh = ll_ldiskfs_find_entry(dir, dentry, &de);
+ if (obj->oo_hl_head != NULL) {
+ hlock = osd_oti_get(env)->oti_hlock;
+ ldiskfs_htree_lock(hlock, obj->oo_hl_head,
+ dir, LDISKFS_HLOCK_DEL);
+ } else {
+ cfs_down_write(&obj->oo_ext_idx_sem);
+ }
+
+ bh = osd_ldiskfs_find_entry(dir, dentry, &de, hlock);
if (bh) {
rc = ldiskfs_delete_entry(oh->ot_handle,
- dir, de, bh);
+ dir, de, bh);
brelse(bh);
- } else
+ } else {
rc = -ENOENT;
+ }
+ if (hlock != NULL)
+ ldiskfs_htree_unlock(hlock);
+ else
+ cfs_up_write(&obj->oo_ext_idx_sem);
- cfs_up_write(&obj->oo_ext_idx_sem);
LASSERT(osd_invariant(obj));
RETURN(rc);
}
struct inode *cinode,
const char *name,
const struct dt_rec *fid,
+ struct htree_lock *hlock,
struct thandle *th)
{
struct ldiskfs_dentry_param *ldp;
child->d_fsdata = (void*) ldp;
} else
child->d_fsdata = NULL;
- rc = ldiskfs_add_entry(oth->ot_handle, child, cinode);
+ rc = osd_ldiskfs_add_entry(oth->ot_handle, child, cinode, hlock);
RETURN(rc);
}
/* in case of rename, dotdot is already created */
if (dir->oo_compat_dotdot_created) {
return __osd_ea_add_rec(info, dir, parent_dir, name,
- dot_dot_fid, th);
+ dot_dot_fid, NULL, th);
}
- result = ldiskfs_add_dot_dotdot(oth->ot_handle, parent_dir, inode,
- dot_ldp, dot_dot_ldp);
+ result = ldiskfs_add_dot_dotdot(oth->ot_handle, parent_dir,
+ inode, dot_ldp, dot_dot_ldp);
if (result == 0)
dir->oo_compat_dotdot_created = 1;
}
struct thandle *th)
{
struct osd_thread_info *info = osd_oti_get(env);
+ struct htree_lock *hlock;
int rc;
+ hlock = pobj->oo_hl_head != NULL ? info->oti_hlock : NULL;
+
if (name[0] == '.' && (name[1] == '\0' || (name[1] == '.' &&
- name[2] =='\0')))
+ name[2] =='\0'))) {
+ if (hlock != NULL) {
+ ldiskfs_htree_lock(hlock, pobj->oo_hl_head,
+ pobj->oo_inode, 0);
+ } else {
+ cfs_down_write(&pobj->oo_ext_idx_sem);
+ }
rc = osd_add_dot_dotdot(info, pobj, cinode, name,
(struct dt_rec *)lu_object_fid(&pobj->oo_dt.do_lu),
fid, th);
+ } else {
+ if (hlock != NULL) {
+ ldiskfs_htree_lock(hlock, pobj->oo_hl_head,
+ pobj->oo_inode, LDISKFS_HLOCK_ADD);
+ } else {
+ cfs_down_write(&pobj->oo_ext_idx_sem);
+ }
+
+ rc = __osd_ea_add_rec(info, pobj, cinode, name, fid,
+ hlock, th);
+ }
+ if (hlock != NULL)
+ ldiskfs_htree_unlock(hlock);
else
- rc = __osd_ea_add_rec(info, pobj, cinode, name, fid, th);
+ cfs_up_write(&pobj->oo_ext_idx_sem);
return rc;
}
struct ldiskfs_dir_entry_2 *de;
struct buffer_head *bh;
struct lu_fid *fid = (struct lu_fid *) rec;
+ struct htree_lock *hlock = NULL;
int ino;
int rc;
dentry = osd_child_dentry_get(env, obj,
(char *)key, strlen((char *)key));
- cfs_down_read(&obj->oo_ext_idx_sem);
- bh = ll_ldiskfs_find_entry(dir, dentry, &de);
+ if (obj->oo_hl_head != NULL) {
+ hlock = osd_oti_get(env)->oti_hlock;
+ ldiskfs_htree_lock(hlock, obj->oo_hl_head,
+ dir, LDISKFS_HLOCK_LOOKUP);
+ } else {
+ cfs_down_read(&obj->oo_ext_idx_sem);
+ }
+
+ bh = osd_ldiskfs_find_entry(dir, dentry, &de, hlock);
if (bh) {
ino = le32_to_cpu(de->inode);
rc = osd_get_fid_from_dentry(de, rec);
brelse(bh);
if (rc != 0)
rc = osd_ea_fid_get(env, obj, ino, fid);
- } else
+ } else {
rc = -ENOENT;
+ }
- cfs_up_read(&obj->oo_ext_idx_sem);
+ if (hlock != NULL)
+ ldiskfs_htree_unlock(hlock);
+ else
+ cfs_up_read(&obj->oo_ext_idx_sem);
RETURN (rc);
}
else
cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
#endif
- cfs_down_write(&obj->oo_ext_idx_sem);
rc = osd_ea_add_rec(env, obj, child->oo_inode, name, rec, th);
- cfs_up_write(&obj->oo_ext_idx_sem);
#ifdef HAVE_QUOTA_SUPPORT
cfs_curproc_cap_unpack(save);
#endif
* \retval 0 on success
* \retval -ve on error
*/
-static int osd_ldiskfs_it_fill(const struct dt_it *di)
+static int osd_ldiskfs_it_fill(const struct lu_env *env,
+ const struct dt_it *di)
{
struct osd_it_ea *it = (struct osd_it_ea *)di;
struct osd_object *obj = it->oie_obj;
struct inode *inode = obj->oo_inode;
- int result = 0;
+ struct htree_lock *hlock = NULL;
+ int result = 0;
ENTRY;
it->oie_dirent = it->oie_buf;
it->oie_rd_dirent = 0;
- cfs_down_read(&obj->oo_ext_idx_sem);
+ if (obj->oo_hl_head != NULL) {
+ hlock = osd_oti_get(env)->oti_hlock;
+ ldiskfs_htree_lock(hlock, obj->oo_hl_head,
+ inode, LDISKFS_HLOCK_READDIR);
+ } else {
+ cfs_down_read(&obj->oo_ext_idx_sem);
+ }
+
result = inode->i_fop->readdir(&it->oie_file, it,
(filldir_t) osd_ldiskfs_filldir);
- cfs_up_read(&obj->oo_ext_idx_sem);
+ if (hlock != NULL)
+ ldiskfs_htree_unlock(hlock);
+ else
+ cfs_up_read(&obj->oo_ext_idx_sem);
if (it->oie_rd_dirent == 0) {
result = -EIO;
if (it->oie_file.f_pos == LDISKFS_HTREE_EOF)
rc = +1;
else
- rc = osd_ldiskfs_it_fill(di);
+ rc = osd_ldiskfs_it_fill(env, di);
}
RETURN(rc);
ENTRY;
it->oie_file.f_pos = hash;
- rc = osd_ldiskfs_it_fill(di);
+ rc = osd_ldiskfs_it_fill(env, di);
if (rc == 0)
rc = +1;
struct osd_thread_info *info;
OBD_ALLOC_PTR(info);
- if (info != NULL) {
- OBD_ALLOC(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE);
- if (info->oti_it_ea_buf != NULL) {
- info->oti_env = container_of(ctx, struct lu_env,
- le_ctx);
- } else {
- OBD_FREE_PTR(info);
- info = ERR_PTR(-ENOMEM);
- }
- } else {
- info = ERR_PTR(-ENOMEM);
- }
+ if (info == NULL)
+ return ERR_PTR(-ENOMEM);
+
+ OBD_ALLOC(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE);
+ if (info->oti_it_ea_buf == NULL)
+ goto out_free_info;
+
+ info->oti_env = container_of(ctx, struct lu_env, le_ctx);
+
+ info->oti_hlock = ldiskfs_htree_lock_alloc();
+ if (info->oti_hlock == NULL)
+ goto out_free_ea;
+
return info;
+
+ out_free_ea:
+ OBD_FREE(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE);
+ out_free_info:
+ OBD_FREE_PTR(info);
+ return ERR_PTR(-ENOMEM);
}
static void osd_key_fini(const struct lu_context *ctx,
{
struct osd_thread_info *info = data;
+ if (info->oti_hlock != NULL)
+ ldiskfs_htree_lock_free(info->oti_hlock);
OBD_FREE(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE);
OBD_FREE_PTR(info);
}
static int osd_device_init(const struct lu_env *env, struct lu_device *d,
const char *name, struct lu_device *next)
{
- int rc;
- struct lu_context *ctx;
-
- /* context for commit hooks */
- ctx = &osd_dev(d)->od_env_for_commit.le_ctx;
- rc = lu_context_init(ctx, LCT_MD_THREAD|LCT_REMEMBER|LCT_NOREF);
- if (rc == 0) {
- rc = osd_procfs_init(osd_dev(d), name);
- ctx->lc_cookie = 0x3;
- }
- return rc;
+ return osd_procfs_init(osd_dev(d), name);
}
static int osd_shutdown(const struct lu_env *env, struct osd_device *o)
osd_dev(d)->od_mount->lmi_mnt);
osd_dev(d)->od_mount = NULL;
- lu_context_fini(&osd_dev(d)->od_env_for_commit.le_ctx);
RETURN(NULL);
}