X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fosd%2Fosd_handler.c;h=454b9e3cfb01b3d044fb94d9389d8111186a7f83;hp=6b53bc9a472407e19a98d3621c9094aef6e6b50e;hb=d57062541b587184daab88d544065e04e09ede0d;hpb=b64d843a984a5a715de4dd1df7edf6f7f9e52bcb diff --git a/lustre/osd/osd_handler.c b/lustre/osd/osd_handler.c index 6b53bc9..454b9e3 100644 --- a/lustre/osd/osd_handler.c +++ b/lustre/osd/osd_handler.c @@ -92,7 +92,6 @@ static const char remote_obj_dir[] = "REM_OBJ_DIR"; struct osd_directory { struct iam_container od_container; struct iam_descr od_descr; - struct semaphore od_sem; }; struct osd_object { @@ -105,6 +104,10 @@ struct osd_object { * creation, or assigned by osd_object_create() under write lock). */ struct inode *oo_inode; + /** + * to protect index ops. + */ + struct rw_semaphore oo_ext_idx_sem; struct rw_semaphore oo_sem; struct osd_directory *oo_dir; /** protects inode attributes. */ @@ -402,6 +405,7 @@ static struct lu_object *osd_object_alloc(const struct lu_env *env, l->lo_ops = &osd_lu_obj_ops; init_rwsem(&mo->oo_sem); + init_rwsem(&mo->oo_ext_idx_sem); spin_lock_init(&mo->oo_guard); return l; } else @@ -1331,8 +1335,6 @@ static int osd_create_pre(struct osd_thread_info *info, struct osd_object *obj, static int osd_create_post(struct osd_thread_info *info, struct osd_object *obj, struct lu_attr *attr, struct thandle *th) { - LASSERT(obj->oo_inode != NULL); - osd_object_init0(obj); return 0; } @@ -2076,6 +2078,35 @@ static int osd_object_sync(const struct lu_env *env, struct dt_object *dt) RETURN(rc); } +/* + * Get the 64-bit version for an inode. + */ +static dt_obj_version_t osd_object_version_get(const struct lu_env *env, + struct dt_object *dt) +{ + struct inode *inode = osd_dt_obj(dt)->oo_inode; + + CDEBUG(D_INFO, "Get version "LPX64" for inode %lu\n", + LDISKFS_I(inode)->i_fs_version, inode->i_ino); + return LDISKFS_I(inode)->i_fs_version; +} + +/* + * Set the 64-bit version and return the old version. + */ +static void osd_object_version_set(const struct lu_env *env, struct dt_object *dt, + dt_obj_version_t new_version) +{ + struct inode *inode = osd_dt_obj(dt)->oo_inode; + + CDEBUG(D_INFO, "Set version "LPX64" (old "LPX64") for inode %lu\n", + new_version, LDISKFS_I(inode)->i_fs_version, inode->i_ino); + LDISKFS_I(inode)->i_fs_version = new_version; + /** Version is set after all inode operations are finished, + * so we should mark it dirty here */ + inode->i_sb->s_op->dirty_inode(inode); +} + static int osd_data_get(const struct lu_env *env, struct dt_object *dt, void **data) { @@ -2104,6 +2135,8 @@ static const struct dt_object_operations osd_obj_ops = { .do_xattr_list = osd_xattr_list, .do_capa_get = osd_capa_get, .do_object_sync = osd_object_sync, + .do_version_get = osd_object_version_get, + .do_version_set = osd_object_version_set, .do_data_get = osd_data_get, }; @@ -2129,6 +2162,8 @@ static const struct dt_object_operations osd_obj_ea_ops = { .do_xattr_list = osd_xattr_list, .do_capa_get = osd_capa_get, .do_object_sync = osd_object_sync, + .do_version_get = osd_object_version_get, + .do_version_set = osd_object_version_set, .do_data_get = osd_data_get, }; @@ -2302,7 +2337,6 @@ static int osd_index_try(const struct lu_env *env, struct dt_object *dt, OBD_ALLOC_PTR(dir); if (dir != NULL) { - sema_init(&dir->od_sem, 1); spin_lock(&obj->oo_guard); if (obj->oo_dir == NULL) @@ -2317,7 +2351,7 @@ static int osd_index_try(const struct lu_env *env, struct dt_object *dt, * Now, that we have container data, serialize its * initialization. */ - down(&obj->oo_dir->od_sem); + down_write(&obj->oo_ext_idx_sem); /* * recheck under lock. */ @@ -2325,7 +2359,7 @@ static int osd_index_try(const struct lu_env *env, struct dt_object *dt, result = osd_iam_container_init(env, obj, dir); else result = 0; - up(&obj->oo_dir->od_sem); + up_write(&obj->oo_ext_idx_sem); } else result = -ENOMEM; } else @@ -2424,6 +2458,8 @@ static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt, dentry = osd_child_dentry_get(env, obj, (char *)key, strlen((char *)key)); + + down_write(&obj->oo_ext_idx_sem); bh = ldiskfs_find_entry(dentry, &de); if (bh) { struct osd_thread_info *oti = osd_oti_get(env); @@ -2444,6 +2480,7 @@ static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt, } else rc = -ENOENT; + up_write(&obj->oo_ext_idx_sem); LASSERT(osd_invariant(obj)); RETURN(rc); } @@ -2681,6 +2718,8 @@ static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj, dentry = osd_child_dentry_get(env, obj, (char *)key, strlen((char *)key)); + + down_read(&obj->oo_ext_idx_sem); bh = ldiskfs_find_entry(dentry, &de); if (bh) { ino = le32_to_cpu(de->inode); @@ -2695,10 +2734,11 @@ static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj, rc = osd_ea_fid_get(env, dentry, rec); iput(inode); } else - rc = -ENOENT; + rc = PTR_ERR(inode); } else rc = -ENOENT; + up_read(&obj->oo_ext_idx_sem); RETURN (rc); } @@ -2811,8 +2851,9 @@ static int osd_index_ea_insert(const struct lu_env *env, struct dt_object *dt, else current->cap_effective &= ~CFS_CAP_SYS_RESOURCE_MASK; #endif + down_write(&obj->oo_ext_idx_sem); rc = osd_ea_add_rec(env, obj, child, name, th); - + up_write(&obj->oo_ext_idx_sem); #ifdef HAVE_QUOTA_SUPPORT current->cap_effective = save; #endif @@ -3031,16 +3072,18 @@ static struct dt_it *osd_it_ea_init(const struct lu_env *env, obj_dentry->d_sb = osd_sb(osd_obj2dev(obj)); obj_dentry->d_name.hash = 0; - it->oie_namelen = 0; + it->oie_rd_dirent = 0; + it->oie_it_dirent = 0; it->oie_curr_pos = 0; it->oie_next_pos = 0; + it->oie_dirent = NULL; + it->oie_buf = info->oti_it_ea_buf; it->oie_obj = obj; it->oie_file.f_dentry = obj_dentry; it->oie_file.f_mapping = obj->oo_inode->i_mapping; it->oie_file.f_op = obj->oo_inode->i_fop; it->oie_file.private_data = NULL; lu_object_get(lo); - RETURN((struct dt_it*) it); } @@ -3078,9 +3121,11 @@ static int osd_it_ea_get(const struct lu_env *env, ENTRY; LASSERT(((const char *)key)[0] == '\0'); - it->oie_namelen = 0; it->oie_curr_pos = 0; it->oie_next_pos = 0; + it->oie_rd_dirent = 0; + it->oie_it_dirent = 0; + it->oie_dirent = NULL; RETURN(+1); } @@ -3107,22 +3152,27 @@ static int osd_ldiskfs_filldir(char *buf, const char *name, int namelen, loff_t offset, ino_t ino, unsigned int d_type) { - struct osd_it_ea *it = (struct osd_it_ea *)buf; - struct dirent64 *dirent = &it->oie_dirent64; - + struct osd_it_ea *it = (struct osd_it_ea *)buf; + struct osd_it_ea_dirent *ent = it->oie_dirent; ENTRY; - if (it->oie_namelen) - RETURN(-ENOENT); - if (namelen == 0 || namelen > LDISKFS_NAME_LEN) + /* this should never happen */ + if (unlikely(namelen == 0 || namelen > LDISKFS_NAME_LEN)) { + CERROR("ldiskfs return invalid namelen %d\n", namelen); RETURN(-EIO); + } + + if ((void *) ent - it->oie_buf + sizeof(*ent) + namelen > + OSD_IT_EA_BUFSIZE) + RETURN(1); - strncpy(dirent->d_name, name, LDISKFS_NAME_LEN); - dirent->d_name[namelen] = 0; - dirent->d_ino = ino; - it->oie_namelen = namelen; - it->oie_curr_pos = offset; + ent->oied_ino = ino; + ent->oied_off = offset; + ent->oied_namelen = namelen; + memcpy(ent->oied_name, name, namelen); + it->oie_rd_dirent++; + it->oie_dirent = (void *) ent + size_round(sizeof(*ent) + namelen); RETURN(0); } @@ -3135,7 +3185,7 @@ static int osd_ldiskfs_filldir(char *buf, const char *name, int namelen, * \retval 0, on success * \retval -ve, on error */ -int osd_ldiskfs_it_fill(const struct dt_it *di) +static int osd_ldiskfs_it_fill(const struct dt_it *di) { struct osd_it_ea *it = (struct osd_it_ea *)di; struct osd_object *obj = it->oie_obj; @@ -3143,16 +3193,23 @@ int osd_ldiskfs_it_fill(const struct dt_it *di) int result = 0; ENTRY; - it->oie_namelen = 0; + it->oie_dirent = it->oie_buf; + it->oie_rd_dirent = 0; it->oie_file.f_pos = it->oie_curr_pos; + down_read(&obj->oo_ext_idx_sem); result = inode->i_fop->readdir(&it->oie_file, it, (filldir_t) osd_ldiskfs_filldir); + up_read(&obj->oo_ext_idx_sem); it->oie_next_pos = it->oie_file.f_pos; - if (it->oie_namelen == 0) + if (it->oie_rd_dirent == 0) { result = -EIO; + } else { + it->oie_dirent = it->oie_buf; + it->oie_it_dirent = 1; + } RETURN(result); } @@ -3174,12 +3231,21 @@ static int osd_it_ea_next(const struct lu_env *env, struct dt_it *di) int rc; ENTRY; - it->oie_curr_pos = it->oie_next_pos; - if (it->oie_curr_pos == LDISKFS_HTREE_EOF) - rc = +1; - else - rc = osd_ldiskfs_it_fill(di); + if (it->oie_it_dirent < it->oie_rd_dirent) { + it->oie_dirent = (void *) it->oie_dirent + + size_round(sizeof(struct osd_it_ea_dirent) + + it->oie_dirent->oied_namelen); + it->oie_it_dirent++; + RETURN(0); + } else { + it->oie_curr_pos = it->oie_next_pos; + + if (it->oie_curr_pos == LDISKFS_HTREE_EOF) + rc = +1; + else + rc = osd_ldiskfs_it_fill(di); + } RETURN(rc); } @@ -3196,7 +3262,7 @@ static struct dt_key *osd_it_ea_key(const struct lu_env *env, { struct osd_it_ea *it = (struct osd_it_ea *)di; ENTRY; - RETURN((struct dt_key *)it->oie_dirent64.d_name); + RETURN((struct dt_key *)it->oie_dirent->oied_name); } /** @@ -3210,7 +3276,7 @@ static int osd_it_ea_key_size(const struct lu_env *env, const struct dt_it *di) { struct osd_it_ea *it = (struct osd_it_ea *)di; ENTRY; - RETURN(it->oie_namelen); + RETURN(it->oie_dirent->oied_namelen); } /** @@ -3237,14 +3303,13 @@ static struct dt_rec *osd_it_ea_rec(const struct lu_env *env, ENTRY; dev = osd_dev(ldev); - id->oii_ino = it->oie_dirent64.d_ino; + id->oii_ino = it->oie_dirent->oied_ino; id->oii_gen = OSD_OII_NOGEN; inode = osd_iget(info, dev, id); if (!IS_ERR(inode)) { dentry->d_inode = inode; LASSERT(dentry->d_inode->i_sb == osd_sb(dev)); } else { - CERROR("Error getting inode for ino =%d", id->oii_ino); RETURN((struct dt_rec *) PTR_ERR(inode)); } @@ -3269,7 +3334,7 @@ static __u64 osd_it_ea_store(const struct lu_env *env, const struct dt_it *di) { struct osd_it_ea *it = (struct osd_it_ea *)di; ENTRY; - RETURN(it->oie_curr_pos); + RETURN(it->oie_dirent->oied_off); } /** @@ -3365,15 +3430,29 @@ static void *osd_key_init(const struct lu_context *ctx, struct osd_thread_info *info; OBD_ALLOC_PTR(info); - if (info != NULL) - info->oti_env = container_of(ctx, struct lu_env, le_ctx); - else + if (info != NULL) { + OBD_ALLOC(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE); + if (info->oti_it_ea_buf != NULL) { + info->oti_env = container_of(ctx, struct lu_env, + le_ctx); + } else { + OBD_FREE_PTR(info); + info = ERR_PTR(-ENOMEM); + } + } else { info = ERR_PTR(-ENOMEM); + } return info; } -/* context key destructor: osd_key_fini */ -LU_KEY_FINI(osd, struct osd_thread_info); +static void osd_key_fini(const struct lu_context *ctx, + struct lu_context_key *key, void* data) +{ + struct osd_thread_info *info = data; + + OBD_FREE(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE); + OBD_FREE_PTR(info); +} static void osd_key_exit(const struct lu_context *ctx, struct lu_context_key *key, void *data) @@ -3609,17 +3688,22 @@ static struct inode *osd_iget(struct osd_thread_info *info, if (inode == NULL) { CERROR("no inode\n"); inode = ERR_PTR(-EACCES); - } else if (is_bad_inode(inode)) { - CERROR("bad inode\n"); - iput(inode); - inode = ERR_PTR(-ENOENT); } else if (id->oii_gen != OSD_OII_NOGEN && inode->i_generation != id->oii_gen) { - CERROR("stale inode\n"); iput(inode); inode = ERR_PTR(-ESTALE); + } else if (inode->i_nlink == 0) { + /* due to parallel readdir and unlink, + * we can have dead inode here. */ + CWARN("stale inode\n"); + make_bad_inode(inode); + iput(inode); + inode = ERR_PTR(-ESTALE); + } else if (is_bad_inode(inode)) { + CERROR("bad inode %lx\n",inode->i_ino); + iput(inode); + inode = ERR_PTR(-ENOENT); } - return inode; }