X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fosd-ldiskfs%2Fosd_handler.c;h=788eb3178197761e71906ca8b4a37168cd6defe8;hp=9e4f75b40ab1dee45d6529830cb4f9fa279a780a;hb=8e858671be59ed53fad2d340cf841b026943cc8a;hpb=d7e86afb0c64b5107a5aff8c45b962ec0104d255 diff --git a/lustre/osd-ldiskfs/osd_handler.c b/lustre/osd-ldiskfs/osd_handler.c index 9e4f75b..788eb31 100644 --- a/lustre/osd-ldiskfs/osd_handler.c +++ b/lustre/osd-ldiskfs/osd_handler.c @@ -82,6 +82,14 @@ /* llo_* api support */ #include +#ifdef HAVE_LDISKFS_PDO +int ldiskfs_pdo = 1; +CFS_MODULE_PARM(ldiskfs_pdo, "i", int, 0644, + "ldiskfs with parallel directory operations"); +#else +int ldiskfs_pdo = 0; +#endif + static const char dot[] = "."; static const char dotdot[] = ".."; static const char remote_obj_dir[] = "REM_OBJ_DIR"; @@ -104,6 +112,7 @@ struct osd_object { /** * to protect index ops. */ + struct htree_lock_head *oo_hl_head; cfs_rw_semaphore_t oo_ext_idx_sem; cfs_rw_semaphore_t oo_sem; struct osd_directory *oo_dir; @@ -315,8 +324,9 @@ static struct lu_object *osd_object_alloc(const struct lu_env *env, cfs_init_rwsem(&mo->oo_ext_idx_sem); cfs_spin_lock_init(&mo->oo_guard); return l; - } else + } else { return NULL; + } } /* @@ -398,27 +408,43 @@ static int osd_fid_lookup(const struct lu_env *env, RETURN(-ENOENT); result = osd_oi_lookup(info, oi, fid, id); - if (result == 0) { - inode = osd_iget(info, dev, id); - if (!IS_ERR(inode)) { - obj->oo_inode = inode; - LASSERT(obj->oo_inode->i_sb == osd_sb(dev)); - if (dev->od_iop_mode) { - obj->oo_compat_dot_created = 1; - obj->oo_compat_dotdot_created = 1; - } + if (result != 0) { + if (result == -ENOENT) result = 0; - } else - /* - * If fid wasn't found in oi, inode-less object is - * created, for which lu_object_exists() returns - * false. This is used in a (frequent) case when - * objects are created as locking anchors or - * place holders for objects yet to be created. - */ - result = PTR_ERR(inode); - } else if (result == -ENOENT) - result = 0; + goto out; + } + + inode = osd_iget(info, dev, id); + if (IS_ERR(inode)) { + /* + * If fid wasn't found in oi, inode-less object is + * created, for which lu_object_exists() returns + * false. This is used in a (frequent) case when + * objects are created as locking anchors or + * place holders for objects yet to be created. + */ + result = PTR_ERR(inode); + goto out; + } + + obj->oo_inode = inode; + LASSERT(obj->oo_inode->i_sb == osd_sb(dev)); + if (dev->od_iop_mode) { + obj->oo_compat_dot_created = 1; + obj->oo_compat_dotdot_created = 1; + } + + if (!S_ISDIR(inode->i_mode) || !ldiskfs_pdo) /* done */ + goto out; + + LASSERT(obj->oo_hl_head == NULL); + obj->oo_hl_head = ldiskfs_htree_lock_head_alloc(HTREE_HBITS_DEF); + if (obj->oo_hl_head == NULL) { + obj->oo_inode = NULL; + iput(inode); + result = -ENOMEM; + } +out: LINVRNT(osd_invariant(obj)); RETURN(result); @@ -467,6 +493,8 @@ static void osd_object_free(const struct lu_env *env, struct lu_object *l) LINVRNT(osd_invariant(obj)); dt_object_fini(&obj->oo_dt); + if (obj->oo_hl_head != NULL) + ldiskfs_htree_lock_head_free(obj->oo_hl_head); OBD_FREE_PTR(obj); } @@ -1536,6 +1564,13 @@ static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj, LINVRNT(osd_invariant(obj)); LASSERT(obj->oo_inode == NULL); + LASSERT(obj->oo_hl_head == NULL); + + if (S_ISDIR(mode) && ldiskfs_pdo) { + obj->oo_hl_head =ldiskfs_htree_lock_head_alloc(HTREE_HBITS_DEF); + if (obj->oo_hl_head == NULL) + return -ENOMEM; + } oth = container_of(th, struct osd_thandle, ot_super); LASSERT(oth->ot_handle->h_transaction != NULL); @@ -1563,8 +1598,13 @@ static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj, inode->i_flags |= S_NOCMTIME; obj->oo_inode = inode; result = 0; - } else + } else { + if (obj->oo_hl_head != NULL) { + ldiskfs_htree_lock_head_free(obj->oo_hl_head); + obj->oo_hl_head = NULL; + } result = PTR_ERR(inode); + } LINVRNT(osd_invariant(obj)); return result; } @@ -2400,10 +2440,12 @@ static int osd_index_try(const struct lu_env *env, struct dt_object *dt, else result = 0; cfs_up_write(&obj->oo_ext_idx_sem); - } else + } else { result = -ENOMEM; - } else + } + } else { result = 0; + } if (result == 0 && ea_dir == 0) { if (!osd_iam_index_probe(env, obj, feat)) @@ -2771,6 +2813,7 @@ static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt, struct osd_thandle *oh; struct ldiskfs_dir_entry_2 *de; struct buffer_head *bh; + struct htree_lock *hlock = NULL; int rc; @@ -2790,16 +2833,27 @@ static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt, dentry = osd_child_dentry_get(env, obj, (char *)key, strlen((char *)key)); - cfs_down_write(&obj->oo_ext_idx_sem); - bh = ll_ldiskfs_find_entry(dir, dentry, &de); + if (obj->oo_hl_head != NULL) { + hlock = osd_oti_get(env)->oti_hlock; + ldiskfs_htree_lock(hlock, obj->oo_hl_head, + dir, LDISKFS_HLOCK_DEL); + } else { + cfs_down_write(&obj->oo_ext_idx_sem); + } + + bh = osd_ldiskfs_find_entry(dir, dentry, &de, hlock); if (bh) { rc = ldiskfs_delete_entry(oh->ot_handle, - dir, de, bh); + dir, de, bh); brelse(bh); - } else + } else { rc = -ENOENT; + } + if (hlock != NULL) + ldiskfs_htree_unlock(hlock); + else + cfs_up_write(&obj->oo_ext_idx_sem); - cfs_up_write(&obj->oo_ext_idx_sem); LASSERT(osd_invariant(obj)); RETURN(rc); } @@ -2940,6 +2994,7 @@ static int __osd_ea_add_rec(struct osd_thread_info *info, struct inode *cinode, const char *name, const struct dt_rec *fid, + struct htree_lock *hlock, struct thandle *th) { struct ldiskfs_dentry_param *ldp; @@ -2960,7 +3015,7 @@ static int __osd_ea_add_rec(struct osd_thread_info *info, child->d_fsdata = (void*) ldp; } else child->d_fsdata = NULL; - rc = ldiskfs_add_entry(oth->ot_handle, child, cinode); + rc = osd_ldiskfs_add_entry(oth->ot_handle, child, cinode, hlock); RETURN(rc); } @@ -3018,11 +3073,11 @@ static int osd_add_dot_dotdot(struct osd_thread_info *info, /* in case of rename, dotdot is already created */ if (dir->oo_compat_dotdot_created) { return __osd_ea_add_rec(info, dir, parent_dir, name, - dot_dot_fid, th); + dot_dot_fid, NULL, th); } - result = ldiskfs_add_dot_dotdot(oth->ot_handle, parent_dir, inode, - dot_ldp, dot_dot_ldp); + result = ldiskfs_add_dot_dotdot(oth->ot_handle, parent_dir, + inode, dot_ldp, dot_dot_ldp); if (result == 0) dir->oo_compat_dotdot_created = 1; } @@ -3043,15 +3098,37 @@ static int osd_ea_add_rec(const struct lu_env *env, struct thandle *th) { struct osd_thread_info *info = osd_oti_get(env); + struct htree_lock *hlock; int rc; + hlock = pobj->oo_hl_head != NULL ? info->oti_hlock : NULL; + if (name[0] == '.' && (name[1] == '\0' || (name[1] == '.' && - name[2] =='\0'))) + name[2] =='\0'))) { + if (hlock != NULL) { + ldiskfs_htree_lock(hlock, pobj->oo_hl_head, + pobj->oo_inode, 0); + } else { + cfs_down_write(&pobj->oo_ext_idx_sem); + } rc = osd_add_dot_dotdot(info, pobj, cinode, name, (struct dt_rec *)lu_object_fid(&pobj->oo_dt.do_lu), fid, th); + } else { + if (hlock != NULL) { + ldiskfs_htree_lock(hlock, pobj->oo_hl_head, + pobj->oo_inode, LDISKFS_HLOCK_ADD); + } else { + cfs_down_write(&pobj->oo_ext_idx_sem); + } + + rc = __osd_ea_add_rec(info, pobj, cinode, name, fid, + hlock, th); + } + if (hlock != NULL) + ldiskfs_htree_unlock(hlock); else - rc = __osd_ea_add_rec(info, pobj, cinode, name, fid, th); + cfs_up_write(&pobj->oo_ext_idx_sem); return rc; } @@ -3072,6 +3149,7 @@ static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj, struct ldiskfs_dir_entry_2 *de; struct buffer_head *bh; struct lu_fid *fid = (struct lu_fid *) rec; + struct htree_lock *hlock = NULL; int ino; int rc; @@ -3080,8 +3158,15 @@ static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj, dentry = osd_child_dentry_get(env, obj, (char *)key, strlen((char *)key)); - cfs_down_read(&obj->oo_ext_idx_sem); - bh = ll_ldiskfs_find_entry(dir, dentry, &de); + if (obj->oo_hl_head != NULL) { + hlock = osd_oti_get(env)->oti_hlock; + ldiskfs_htree_lock(hlock, obj->oo_hl_head, + dir, LDISKFS_HLOCK_LOOKUP); + } else { + cfs_down_read(&obj->oo_ext_idx_sem); + } + + bh = osd_ldiskfs_find_entry(dir, dentry, &de, hlock); if (bh) { ino = le32_to_cpu(de->inode); rc = osd_get_fid_from_dentry(de, rec); @@ -3090,10 +3175,14 @@ static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj, brelse(bh); if (rc != 0) rc = osd_ea_fid_get(env, obj, ino, fid); - } else + } else { rc = -ENOENT; + } - cfs_up_read(&obj->oo_ext_idx_sem); + if (hlock != NULL) + ldiskfs_htree_unlock(hlock); + else + cfs_up_read(&obj->oo_ext_idx_sem); RETURN (rc); } @@ -3195,9 +3284,7 @@ static int osd_index_ea_insert(const struct lu_env *env, struct dt_object *dt, else cfs_cap_lower(CFS_CAP_SYS_RESOURCE); #endif - cfs_down_write(&obj->oo_ext_idx_sem); rc = osd_ea_add_rec(env, obj, child->oo_inode, name, rec, th); - cfs_up_write(&obj->oo_ext_idx_sem); #ifdef HAVE_QUOTA_SUPPORT cfs_curproc_cap_unpack(save); #endif @@ -3616,22 +3703,34 @@ static int osd_ldiskfs_filldir(char *buf, const char *name, int namelen, * \retval 0 on success * \retval -ve on error */ -static int osd_ldiskfs_it_fill(const struct dt_it *di) +static int osd_ldiskfs_it_fill(const struct lu_env *env, + const struct dt_it *di) { struct osd_it_ea *it = (struct osd_it_ea *)di; struct osd_object *obj = it->oie_obj; struct inode *inode = obj->oo_inode; - int result = 0; + struct htree_lock *hlock = NULL; + int result = 0; ENTRY; it->oie_dirent = it->oie_buf; it->oie_rd_dirent = 0; - cfs_down_read(&obj->oo_ext_idx_sem); + if (obj->oo_hl_head != NULL) { + hlock = osd_oti_get(env)->oti_hlock; + ldiskfs_htree_lock(hlock, obj->oo_hl_head, + inode, LDISKFS_HLOCK_READDIR); + } else { + cfs_down_read(&obj->oo_ext_idx_sem); + } + result = inode->i_fop->readdir(&it->oie_file, it, (filldir_t) osd_ldiskfs_filldir); - cfs_up_read(&obj->oo_ext_idx_sem); + if (hlock != NULL) + ldiskfs_htree_unlock(hlock); + else + cfs_up_read(&obj->oo_ext_idx_sem); if (it->oie_rd_dirent == 0) { result = -EIO; @@ -3672,7 +3771,7 @@ static int osd_it_ea_next(const struct lu_env *env, struct dt_it *di) if (it->oie_file.f_pos == LDISKFS_HTREE_EOF) rc = +1; else - rc = osd_ldiskfs_it_fill(di); + rc = osd_ldiskfs_it_fill(env, di); } RETURN(rc); @@ -3777,7 +3876,7 @@ static int osd_it_ea_load(const struct lu_env *env, ENTRY; it->oie_file.f_pos = hash; - rc = osd_ldiskfs_it_fill(di); + rc = osd_ldiskfs_it_fill(env, di); if (rc == 0) rc = +1; @@ -3842,19 +3941,26 @@ static void *osd_key_init(const struct lu_context *ctx, struct osd_thread_info *info; OBD_ALLOC_PTR(info); - if (info != NULL) { - OBD_ALLOC(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE); - if (info->oti_it_ea_buf != NULL) { - info->oti_env = container_of(ctx, struct lu_env, - le_ctx); - } else { - OBD_FREE_PTR(info); - info = ERR_PTR(-ENOMEM); - } - } else { - info = ERR_PTR(-ENOMEM); - } + if (info == NULL) + return ERR_PTR(-ENOMEM); + + OBD_ALLOC(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE); + if (info->oti_it_ea_buf == NULL) + goto out_free_info; + + info->oti_env = container_of(ctx, struct lu_env, le_ctx); + + info->oti_hlock = ldiskfs_htree_lock_alloc(); + if (info->oti_hlock == NULL) + goto out_free_ea; + return info; + + out_free_ea: + OBD_FREE(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE); + out_free_info: + OBD_FREE_PTR(info); + return ERR_PTR(-ENOMEM); } static void osd_key_fini(const struct lu_context *ctx, @@ -3862,6 +3968,8 @@ static void osd_key_fini(const struct lu_context *ctx, { struct osd_thread_info *info = data; + if (info->oti_hlock != NULL) + ldiskfs_htree_lock_free(info->oti_hlock); OBD_FREE(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE); OBD_FREE_PTR(info); }