Whamcloud - gitweb
b=19669
[fs/lustre-release.git] / lustre / osd / osd_handler.c
index 6b53bc9..c584859 100644 (file)
@@ -92,7 +92,6 @@ static const char remote_obj_dir[] = "REM_OBJ_DIR";
 struct osd_directory {
         struct iam_container od_container;
         struct iam_descr     od_descr;
-        struct semaphore     od_sem;
 };
 
 struct osd_object {
@@ -105,6 +104,10 @@ struct osd_object {
          * creation, or assigned by osd_object_create() under write lock).
          */
         struct inode          *oo_inode;
+        /**
+         * to protect index ops.
+         */
+        struct rw_semaphore    oo_ext_idx_sem;
         struct rw_semaphore    oo_sem;
         struct osd_directory  *oo_dir;
         /** protects inode attributes. */
@@ -402,6 +405,7 @@ static struct lu_object *osd_object_alloc(const struct lu_env *env,
 
                 l->lo_ops = &osd_lu_obj_ops;
                 init_rwsem(&mo->oo_sem);
+                init_rwsem(&mo->oo_ext_idx_sem);
                 spin_lock_init(&mo->oo_guard);
                 return l;
         } else
@@ -519,17 +523,22 @@ static int osd_inode_remove(const struct lu_env *env, struct osd_object *obj)
         struct osd_device      *osd = osd_obj2dev(obj);
         struct osd_thread_info *oti = osd_oti_get(env);
         struct txn_param       *prm = &oti->oti_txn;
+        struct lu_env          *env_del_obj = &oti->oti_obj_delete_tx_env;
         struct thandle         *th;
         int result;
 
+        lu_env_init(env_del_obj, LCT_DT_THREAD);
         txn_param_init(prm, OSD_TXN_OI_DELETE_CREDITS +
                             OSD_TXN_INODE_DELETE_CREDITS);
-        th = osd_trans_start(env, &osd->od_dt_dev, prm);
+        th = osd_trans_start(env_del_obj, &osd->od_dt_dev, prm);
         if (!IS_ERR(th)) {
-                result = osd_oi_delete(oti, &osd->od_oi, fid, th);
-                osd_trans_stop(env, th);
+                result = osd_oi_delete(osd_oti_get(env_del_obj),
+                                       &osd->od_oi, fid, th);
+                osd_trans_stop(env_del_obj, th);
         } else
                 result = PTR_ERR(th);
+
+        lu_env_fini(env_del_obj);
         return result;
 }
 
@@ -955,7 +964,7 @@ static const int osd_dto_credits_quota[DTO_NR] = {
         [DTO_INDEX_DELETE]  = 20,
         /**
          * Unused now.
-         */ 
+         */
         [DTO_IDNEX_UPDATE]  = 16,
         /*
          * Create a object. Same as create object in EXT3 filesystem.
@@ -971,7 +980,7 @@ static const int osd_dto_credits_quota[DTO_NR] = {
          * INDEX_EXTRA_BLOCKS(8) +
          * 3(inode bits, groups, GDT) +
          * QUOTA(?)
-         */ 
+         */
         [DTO_OBJECT_DELETE] = 27,
         /**
          * Attr set credits.
@@ -1287,14 +1296,14 @@ static int osd_inode_setattr(const struct lu_env *env,
                 LDISKFS_I(inode)->i_disksize = attr->la_size;
                 i_size_write(inode, attr->la_size);
         }
-# if 0
-        /*
-         * OSD should not change "i_blocks" which is used by quota.
+
+        /* OSD should not change "i_blocks" which is used by quota.
          * "i_blocks" should be changed by ldiskfs only.
-         * Disable this assignment until SOM to fix some EA field. */
+         * Enable this assignment for SOM purpose now, until it is
+         * stored in SOM EA. */
         if (bits & LA_BLOCKS)
                 inode->i_blocks = attr->la_blocks;
-#endif
+
         if (bits & LA_MODE)
                 inode->i_mode   = (inode->i_mode & S_IFMT) |
                         (attr->la_mode & ~S_IFMT);
@@ -1331,8 +1340,6 @@ static int osd_create_pre(struct osd_thread_info *info, struct osd_object *obj,
 static int osd_create_post(struct osd_thread_info *info, struct osd_object *obj,
                            struct lu_attr *attr, struct thandle *th)
 {
-        LASSERT(obj->oo_inode != NULL);
-
         osd_object_init0(obj);
         return 0;
 }
@@ -1765,6 +1772,15 @@ static int osd_ea_fid_get(const struct lu_env *env, struct dentry *dentry,
         rc = inode->i_op->getxattr(dentry, XATTR_NAME_LMA, (void *)mdt_attrs,
                                    sizeof *mdt_attrs);
 
+        /* Check LMA compatibility */
+        if (rc > 0 &&
+            (mdt_attrs->lma_incompat & ~cpu_to_be32(LMA_INCOMPAT_SUPP))) {
+                CWARN("Inode %lx: Unsupported incompat LMA feature(s) %#x\n",
+                      inode->i_ino, be32_to_cpu(mdt_attrs->lma_incompat) &
+                      ~LMA_INCOMPAT_SUPP);
+                return -ENOSYS;
+        }
+
         if (rc > 0) {
                 fid_be_to_cpu(fid, &mdt_attrs->lma_self_fid);
                 rc = 0;
@@ -2076,6 +2092,35 @@ static int osd_object_sync(const struct lu_env *env, struct dt_object *dt)
         RETURN(rc);
 }
 
+/*
+ * Get the 64-bit version for an inode.
+ */
+static dt_obj_version_t osd_object_version_get(const struct lu_env *env,
+                                               struct dt_object *dt)
+{
+        struct inode *inode = osd_dt_obj(dt)->oo_inode;
+
+        CDEBUG(D_INFO, "Get version "LPX64" for inode %lu\n",
+               LDISKFS_I(inode)->i_fs_version, inode->i_ino);
+        return LDISKFS_I(inode)->i_fs_version;
+}
+
+/*
+ * Set the 64-bit version and return the old version.
+ */
+static void osd_object_version_set(const struct lu_env *env, struct dt_object *dt,
+                                   dt_obj_version_t new_version)
+{
+        struct inode *inode = osd_dt_obj(dt)->oo_inode;
+
+        CDEBUG(D_INFO, "Set version "LPX64" (old "LPX64") for inode %lu\n",
+               new_version, LDISKFS_I(inode)->i_fs_version, inode->i_ino);
+        LDISKFS_I(inode)->i_fs_version = new_version;
+        /** Version is set after all inode operations are finished,
+         *  so we should mark it dirty here */
+        inode->i_sb->s_op->dirty_inode(inode);
+}
+
 static int osd_data_get(const struct lu_env *env, struct dt_object *dt,
                         void **data)
 {
@@ -2104,6 +2149,8 @@ static const struct dt_object_operations osd_obj_ops = {
         .do_xattr_list   = osd_xattr_list,
         .do_capa_get     = osd_capa_get,
         .do_object_sync  = osd_object_sync,
+        .do_version_get  = osd_object_version_get,
+        .do_version_set  = osd_object_version_set,
         .do_data_get     = osd_data_get,
 };
 
@@ -2129,6 +2176,8 @@ static const struct dt_object_operations osd_obj_ea_ops = {
         .do_xattr_list   = osd_xattr_list,
         .do_capa_get     = osd_capa_get,
         .do_object_sync  = osd_object_sync,
+        .do_version_get  = osd_object_version_get,
+        .do_version_set  = osd_object_version_set,
         .do_data_get     = osd_data_get,
 };
 
@@ -2302,7 +2351,6 @@ static int osd_index_try(const struct lu_env *env, struct dt_object *dt,
 
                 OBD_ALLOC_PTR(dir);
                 if (dir != NULL) {
-                        sema_init(&dir->od_sem, 1);
 
                         spin_lock(&obj->oo_guard);
                         if (obj->oo_dir == NULL)
@@ -2317,7 +2365,7 @@ static int osd_index_try(const struct lu_env *env, struct dt_object *dt,
                          * Now, that we have container data, serialize its
                          * initialization.
                          */
-                        down(&obj->oo_dir->od_sem);
+                        down_write(&obj->oo_ext_idx_sem);
                         /*
                          * recheck under lock.
                          */
@@ -2325,7 +2373,7 @@ static int osd_index_try(const struct lu_env *env, struct dt_object *dt,
                                 result = osd_iam_container_init(env, obj, dir);
                         else
                                 result = 0;
-                        up(&obj->oo_dir->od_sem);
+                        up_write(&obj->oo_ext_idx_sem);
                 } else
                         result = -ENOMEM;
         } else
@@ -2424,6 +2472,8 @@ static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt,
 
         dentry = osd_child_dentry_get(env, obj,
                                       (char *)key, strlen((char *)key));
+
+        down_write(&obj->oo_ext_idx_sem);
         bh = ldiskfs_find_entry(dentry, &de);
         if (bh) {
                 struct osd_thread_info *oti = osd_oti_get(env);
@@ -2444,6 +2494,7 @@ static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt,
         } else
                 rc = -ENOENT;
 
+        up_write(&obj->oo_ext_idx_sem);
         LASSERT(osd_invariant(obj));
         RETURN(rc);
 }
@@ -2681,6 +2732,8 @@ static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj,
 
         dentry = osd_child_dentry_get(env, obj,
                                       (char *)key, strlen((char *)key));
+
+        down_read(&obj->oo_ext_idx_sem);
         bh = ldiskfs_find_entry(dentry, &de);
         if (bh) {
                 ino = le32_to_cpu(de->inode);
@@ -2695,10 +2748,11 @@ static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj,
                         rc = osd_ea_fid_get(env, dentry, rec);
                         iput(inode);
                 } else
-                        rc = -ENOENT;
+                        rc = PTR_ERR(inode);
         } else
                 rc = -ENOENT;
 
+        up_read(&obj->oo_ext_idx_sem);
         RETURN (rc);
 }
 
@@ -2811,8 +2865,9 @@ static int osd_index_ea_insert(const struct lu_env *env, struct dt_object *dt,
                 else
                         current->cap_effective &= ~CFS_CAP_SYS_RESOURCE_MASK;
 #endif
+                down_write(&obj->oo_ext_idx_sem);
                 rc = osd_ea_add_rec(env, obj, child, name, th);
-
+                up_write(&obj->oo_ext_idx_sem);
 #ifdef HAVE_QUOTA_SUPPORT
                 current->cap_effective = save;
 #endif
@@ -3031,16 +3086,18 @@ static struct dt_it *osd_it_ea_init(const struct lu_env *env,
         obj_dentry->d_sb = osd_sb(osd_obj2dev(obj));
         obj_dentry->d_name.hash = 0;
 
-        it->oie_namelen         = 0;
+        it->oie_rd_dirent       = 0;
+        it->oie_it_dirent       = 0;
         it->oie_curr_pos        = 0;
         it->oie_next_pos        = 0;
+        it->oie_dirent          = NULL;
+        it->oie_buf             = info->oti_it_ea_buf;
         it->oie_obj             = obj;
         it->oie_file.f_dentry   = obj_dentry;
         it->oie_file.f_mapping    = obj->oo_inode->i_mapping;
         it->oie_file.f_op         = obj->oo_inode->i_fop;
         it->oie_file.private_data = NULL;
         lu_object_get(lo);
-
         RETURN((struct dt_it*) it);
 }
 
@@ -3078,9 +3135,11 @@ static int osd_it_ea_get(const struct lu_env *env,
 
         ENTRY;
         LASSERT(((const char *)key)[0] == '\0');
-        it->oie_namelen         = 0;
         it->oie_curr_pos        = 0;
         it->oie_next_pos        = 0;
+        it->oie_rd_dirent       = 0;
+        it->oie_it_dirent       = 0;
+        it->oie_dirent          = NULL;
 
         RETURN(+1);
 }
@@ -3107,22 +3166,27 @@ static int osd_ldiskfs_filldir(char *buf, const char *name, int namelen,
                                loff_t offset, ino_t ino,
                                unsigned int d_type)
 {
-        struct osd_it_ea   *it     = (struct osd_it_ea *)buf;
-        struct dirent64    *dirent = &it->oie_dirent64;
-
+        struct osd_it_ea        *it = (struct osd_it_ea *)buf;
+        struct osd_it_ea_dirent *ent = it->oie_dirent;
         ENTRY;
-        if (it->oie_namelen)
-                RETURN(-ENOENT);
 
-        if (namelen == 0 || namelen > LDISKFS_NAME_LEN)
+        /* this should never happen */
+        if (unlikely(namelen == 0 || namelen > LDISKFS_NAME_LEN)) {
+                CERROR("ldiskfs return invalid namelen %d\n", namelen);
                 RETURN(-EIO);
+        }
+
+        if ((void *) ent - it->oie_buf + sizeof(*ent) + namelen >
+            OSD_IT_EA_BUFSIZE)
+                RETURN(1);
 
-        strncpy(dirent->d_name, name, LDISKFS_NAME_LEN);
-        dirent->d_name[namelen] = 0;
-        dirent->d_ino           = ino;
-        it->oie_namelen         = namelen;
-        it->oie_curr_pos        = offset;
+        ent->oied_ino     = ino;
+        ent->oied_off     = offset;
+        ent->oied_namelen = namelen;
+        memcpy(ent->oied_name, name, namelen);
 
+        it->oie_rd_dirent++;
+        it->oie_dirent = (void *) ent + size_round(sizeof(*ent) + namelen);
         RETURN(0);
 }
 
@@ -3135,7 +3199,7 @@ static int osd_ldiskfs_filldir(char *buf, const char *name, int namelen,
  * \retval   0, on success
  * \retval -ve, on error
  */
-int osd_ldiskfs_it_fill(const struct dt_it *di)
+static int osd_ldiskfs_it_fill(const struct dt_it *di)
 {
         struct osd_it_ea   *it    = (struct osd_it_ea *)di;
         struct osd_object  *obj   = it->oie_obj;
@@ -3143,16 +3207,23 @@ int osd_ldiskfs_it_fill(const struct dt_it *di)
         int                result = 0;
 
         ENTRY;
-        it->oie_namelen    = 0;
+        it->oie_dirent = it->oie_buf;
+        it->oie_rd_dirent = 0;
         it->oie_file.f_pos = it->oie_curr_pos;
 
+        down_read(&obj->oo_ext_idx_sem);
         result = inode->i_fop->readdir(&it->oie_file, it,
                                        (filldir_t) osd_ldiskfs_filldir);
 
+        up_read(&obj->oo_ext_idx_sem);
         it->oie_next_pos = it->oie_file.f_pos;
 
-        if (it->oie_namelen == 0)
+        if (it->oie_rd_dirent == 0) {
                 result = -EIO;
+        } else {
+                it->oie_dirent = it->oie_buf;
+                it->oie_it_dirent = 1;
+        }
 
         RETURN(result);
 }
@@ -3174,12 +3245,21 @@ static int osd_it_ea_next(const struct lu_env *env, struct dt_it *di)
         int rc;
 
         ENTRY;
-        it->oie_curr_pos = it->oie_next_pos;
 
-        if (it->oie_curr_pos == LDISKFS_HTREE_EOF)
-                rc = +1;
-        else
-                rc = osd_ldiskfs_it_fill(di);
+        if (it->oie_it_dirent < it->oie_rd_dirent) {
+                it->oie_dirent = (void *) it->oie_dirent +
+                                 size_round(sizeof(struct osd_it_ea_dirent) +
+                                            it->oie_dirent->oied_namelen);
+                it->oie_it_dirent++;
+                RETURN(0);
+        } else {
+                it->oie_curr_pos = it->oie_next_pos;
+
+                if (it->oie_curr_pos == LDISKFS_HTREE_EOF)
+                        rc = +1;
+                else
+                        rc = osd_ldiskfs_it_fill(di);
+        }
 
         RETURN(rc);
 }
@@ -3196,7 +3276,7 @@ static struct dt_key *osd_it_ea_key(const struct lu_env *env,
 {
         struct osd_it_ea *it = (struct osd_it_ea *)di;
         ENTRY;
-        RETURN((struct dt_key *)it->oie_dirent64.d_name);
+        RETURN((struct dt_key *)it->oie_dirent->oied_name);
 }
 
 /**
@@ -3210,7 +3290,7 @@ static int osd_it_ea_key_size(const struct lu_env *env, const struct dt_it *di)
 {
         struct osd_it_ea *it = (struct osd_it_ea *)di;
         ENTRY;
-        RETURN(it->oie_namelen);
+        RETURN(it->oie_dirent->oied_namelen);
 }
 
 /**
@@ -3237,14 +3317,13 @@ static struct dt_rec *osd_it_ea_rec(const struct lu_env *env,
 
         ENTRY;
         dev  = osd_dev(ldev);
-        id->oii_ino = it->oie_dirent64.d_ino;
+        id->oii_ino = it->oie_dirent->oied_ino;
         id->oii_gen = OSD_OII_NOGEN;
         inode = osd_iget(info, dev, id);
         if (!IS_ERR(inode)) {
                 dentry->d_inode = inode;
                 LASSERT(dentry->d_inode->i_sb == osd_sb(dev));
         } else {
-                CERROR("Error getting inode for ino =%d", id->oii_ino);
                 RETURN((struct dt_rec *) PTR_ERR(inode));
         }
 
@@ -3269,7 +3348,7 @@ static __u64 osd_it_ea_store(const struct lu_env *env, const struct dt_it *di)
 {
         struct osd_it_ea *it = (struct osd_it_ea *)di;
         ENTRY;
-        RETURN(it->oie_curr_pos);
+        RETURN(it->oie_dirent->oied_off);
 }
 
 /**
@@ -3365,15 +3444,29 @@ static void *osd_key_init(const struct lu_context *ctx,
         struct osd_thread_info *info;
 
         OBD_ALLOC_PTR(info);
-        if (info != NULL)
-                info->oti_env = container_of(ctx, struct lu_env, le_ctx);
-        else
+        if (info != NULL) {
+                OBD_ALLOC(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE);
+                if (info->oti_it_ea_buf != NULL) {
+                        info->oti_env = container_of(ctx, struct lu_env,
+                                                     le_ctx);
+                } else {
+                        OBD_FREE_PTR(info);
+                        info = ERR_PTR(-ENOMEM);
+                }
+        } else {
                 info = ERR_PTR(-ENOMEM);
+        }
         return info;
 }
 
-/* context key destructor: osd_key_fini */
-LU_KEY_FINI(osd, struct osd_thread_info);
+static void osd_key_fini(const struct lu_context *ctx,
+                         struct lu_context_key *key, void* data)
+{
+        struct osd_thread_info *info = data;
+
+        OBD_FREE(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE);
+        OBD_FREE_PTR(info);
+}
 
 static void osd_key_exit(const struct lu_context *ctx,
                          struct lu_context_key *key, void *data)
@@ -3609,17 +3702,22 @@ static struct inode *osd_iget(struct osd_thread_info *info,
         if (inode == NULL) {
                 CERROR("no inode\n");
                 inode = ERR_PTR(-EACCES);
-        } else if (is_bad_inode(inode)) {
-                CERROR("bad inode\n");
-                iput(inode);
-                inode = ERR_PTR(-ENOENT);
         } else if (id->oii_gen != OSD_OII_NOGEN &&
                    inode->i_generation != id->oii_gen) {
-                CERROR("stale inode\n");
                 iput(inode);
                 inode = ERR_PTR(-ESTALE);
+        } else if (inode->i_nlink == 0) {
+                /* due to parallel readdir and unlink,
+                * we can have dead inode here. */
+                CWARN("stale inode\n");
+                make_bad_inode(inode);
+                iput(inode);
+                inode = ERR_PTR(-ESTALE);
+        } else if (is_bad_inode(inode)) {
+                CERROR("bad inode %lx\n",inode->i_ino);
+                iput(inode);
+                inode = ERR_PTR(-ENOENT);
         }
-
         return inode;
 
 }