Whamcloud - gitweb
LU-50 ldiskfs: pdirops patch for ldiskfs
[fs/lustre-release.git] / lustre / osd-ldiskfs / osd_handler.c
index 9e4f75b..788eb31 100644 (file)
 /* llo_* api support */
 #include <md_object.h>
 
 /* llo_* api support */
 #include <md_object.h>
 
+#ifdef HAVE_LDISKFS_PDO
+int ldiskfs_pdo = 1;
+CFS_MODULE_PARM(ldiskfs_pdo, "i", int, 0644,
+                "ldiskfs with parallel directory operations");
+#else
+int ldiskfs_pdo = 0;
+#endif
+
 static const char dot[] = ".";
 static const char dotdot[] = "..";
 static const char remote_obj_dir[] = "REM_OBJ_DIR";
 static const char dot[] = ".";
 static const char dotdot[] = "..";
 static const char remote_obj_dir[] = "REM_OBJ_DIR";
@@ -104,6 +112,7 @@ struct osd_object {
         /**
          * to protect index ops.
          */
         /**
          * to protect index ops.
          */
+        struct htree_lock_head *oo_hl_head;
         cfs_rw_semaphore_t     oo_ext_idx_sem;
         cfs_rw_semaphore_t     oo_sem;
         struct osd_directory  *oo_dir;
         cfs_rw_semaphore_t     oo_ext_idx_sem;
         cfs_rw_semaphore_t     oo_sem;
         struct osd_directory  *oo_dir;
@@ -315,8 +324,9 @@ static struct lu_object *osd_object_alloc(const struct lu_env *env,
                 cfs_init_rwsem(&mo->oo_ext_idx_sem);
                 cfs_spin_lock_init(&mo->oo_guard);
                 return l;
                 cfs_init_rwsem(&mo->oo_ext_idx_sem);
                 cfs_spin_lock_init(&mo->oo_guard);
                 return l;
-        } else
+        } else {
                 return NULL;
                 return NULL;
+        }
 }
 
 /*
 }
 
 /*
@@ -398,27 +408,43 @@ static int osd_fid_lookup(const struct lu_env *env,
                 RETURN(-ENOENT);
 
         result = osd_oi_lookup(info, oi, fid, id);
                 RETURN(-ENOENT);
 
         result = osd_oi_lookup(info, oi, fid, id);
-        if (result == 0) {
-                inode = osd_iget(info, dev, id);
-                if (!IS_ERR(inode)) {
-                        obj->oo_inode = inode;
-                        LASSERT(obj->oo_inode->i_sb == osd_sb(dev));
-                        if (dev->od_iop_mode) {
-                                obj->oo_compat_dot_created = 1;
-                                obj->oo_compat_dotdot_created = 1;
-                        }
+        if (result != 0) {
+                if (result == -ENOENT)
                         result = 0;
                         result = 0;
-                } else
-                        /*
-                         * If fid wasn't found in oi, inode-less object is
-                         * created, for which lu_object_exists() returns
-                         * false. This is used in a (frequent) case when
-                         * objects are created as locking anchors or
-                         * place holders for objects yet to be created.
-                         */
-                        result = PTR_ERR(inode);
-        } else if (result == -ENOENT)
-                result = 0;
+                goto out;
+        }
+
+        inode = osd_iget(info, dev, id);
+        if (IS_ERR(inode)) {
+                /*
+                 * If fid wasn't found in oi, inode-less object is
+                 * created, for which lu_object_exists() returns
+                 * false. This is used in a (frequent) case when
+                 * objects are created as locking anchors or
+                 * place holders for objects yet to be created.
+                 */
+                result = PTR_ERR(inode);
+                goto out;
+        }
+
+        obj->oo_inode = inode;
+        LASSERT(obj->oo_inode->i_sb == osd_sb(dev));
+        if (dev->od_iop_mode) {
+                obj->oo_compat_dot_created = 1;
+                obj->oo_compat_dotdot_created = 1;
+        }
+
+        if (!S_ISDIR(inode->i_mode) || !ldiskfs_pdo) /* done */
+                goto out;
+
+        LASSERT(obj->oo_hl_head == NULL);
+        obj->oo_hl_head = ldiskfs_htree_lock_head_alloc(HTREE_HBITS_DEF);
+        if (obj->oo_hl_head == NULL) {
+                obj->oo_inode = NULL;
+                iput(inode);
+                result = -ENOMEM;
+        }
+out:
         LINVRNT(osd_invariant(obj));
 
         RETURN(result);
         LINVRNT(osd_invariant(obj));
 
         RETURN(result);
@@ -467,6 +493,8 @@ static void osd_object_free(const struct lu_env *env, struct lu_object *l)
         LINVRNT(osd_invariant(obj));
 
         dt_object_fini(&obj->oo_dt);
         LINVRNT(osd_invariant(obj));
 
         dt_object_fini(&obj->oo_dt);
+        if (obj->oo_hl_head != NULL)
+                ldiskfs_htree_lock_head_free(obj->oo_hl_head);
         OBD_FREE_PTR(obj);
 }
 
         OBD_FREE_PTR(obj);
 }
 
@@ -1536,6 +1564,13 @@ static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj,
 
         LINVRNT(osd_invariant(obj));
         LASSERT(obj->oo_inode == NULL);
 
         LINVRNT(osd_invariant(obj));
         LASSERT(obj->oo_inode == NULL);
+        LASSERT(obj->oo_hl_head == NULL);
+
+        if (S_ISDIR(mode) && ldiskfs_pdo) {
+                obj->oo_hl_head =ldiskfs_htree_lock_head_alloc(HTREE_HBITS_DEF);
+                if (obj->oo_hl_head == NULL)
+                        return -ENOMEM;
+        }
 
         oth = container_of(th, struct osd_thandle, ot_super);
         LASSERT(oth->ot_handle->h_transaction != NULL);
 
         oth = container_of(th, struct osd_thandle, ot_super);
         LASSERT(oth->ot_handle->h_transaction != NULL);
@@ -1563,8 +1598,13 @@ static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj,
                 inode->i_flags |= S_NOCMTIME;
                 obj->oo_inode = inode;
                 result = 0;
                 inode->i_flags |= S_NOCMTIME;
                 obj->oo_inode = inode;
                 result = 0;
-        } else
+        } else {
+                if (obj->oo_hl_head != NULL) {
+                        ldiskfs_htree_lock_head_free(obj->oo_hl_head);
+                        obj->oo_hl_head = NULL;
+                }
                 result = PTR_ERR(inode);
                 result = PTR_ERR(inode);
+        }
         LINVRNT(osd_invariant(obj));
         return result;
 }
         LINVRNT(osd_invariant(obj));
         return result;
 }
@@ -2400,10 +2440,12 @@ static int osd_index_try(const struct lu_env *env, struct dt_object *dt,
                         else
                                 result = 0;
                         cfs_up_write(&obj->oo_ext_idx_sem);
                         else
                                 result = 0;
                         cfs_up_write(&obj->oo_ext_idx_sem);
-                } else
+                } else {
                         result = -ENOMEM;
                         result = -ENOMEM;
-        } else
+                }
+        } else {
                 result = 0;
                 result = 0;
+        }
 
         if (result == 0 && ea_dir == 0) {
                 if (!osd_iam_index_probe(env, obj, feat))
 
         if (result == 0 && ea_dir == 0) {
                 if (!osd_iam_index_probe(env, obj, feat))
@@ -2771,6 +2813,7 @@ static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt,
         struct osd_thandle         *oh;
         struct ldiskfs_dir_entry_2 *de;
         struct buffer_head         *bh;
         struct osd_thandle         *oh;
         struct ldiskfs_dir_entry_2 *de;
         struct buffer_head         *bh;
+        struct htree_lock          *hlock = NULL;
 
         int rc;
 
 
         int rc;
 
@@ -2790,16 +2833,27 @@ static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt,
         dentry = osd_child_dentry_get(env, obj,
                                       (char *)key, strlen((char *)key));
 
         dentry = osd_child_dentry_get(env, obj,
                                       (char *)key, strlen((char *)key));
 
-        cfs_down_write(&obj->oo_ext_idx_sem);
-        bh = ll_ldiskfs_find_entry(dir, dentry, &de);
+        if (obj->oo_hl_head != NULL) {
+                hlock = osd_oti_get(env)->oti_hlock;
+                ldiskfs_htree_lock(hlock, obj->oo_hl_head,
+                                   dir, LDISKFS_HLOCK_DEL);
+        } else {
+                cfs_down_write(&obj->oo_ext_idx_sem);
+        }
+
+        bh = osd_ldiskfs_find_entry(dir, dentry, &de, hlock);
         if (bh) {
                 rc = ldiskfs_delete_entry(oh->ot_handle,
         if (bh) {
                 rc = ldiskfs_delete_entry(oh->ot_handle,
-                                dir, de, bh);
+                                          dir, de, bh);
                 brelse(bh);
                 brelse(bh);
-        } else
+        } else {
                 rc = -ENOENT;
                 rc = -ENOENT;
+        }
+        if (hlock != NULL)
+                ldiskfs_htree_unlock(hlock);
+        else
+                cfs_up_write(&obj->oo_ext_idx_sem);
 
 
-        cfs_up_write(&obj->oo_ext_idx_sem);
         LASSERT(osd_invariant(obj));
         RETURN(rc);
 }
         LASSERT(osd_invariant(obj));
         RETURN(rc);
 }
@@ -2940,6 +2994,7 @@ static int __osd_ea_add_rec(struct osd_thread_info *info,
                             struct inode  *cinode,
                             const char *name,
                             const struct dt_rec *fid,
                             struct inode  *cinode,
                             const char *name,
                             const struct dt_rec *fid,
+                            struct htree_lock *hlock,
                             struct thandle *th)
 {
         struct ldiskfs_dentry_param *ldp;
                             struct thandle *th)
 {
         struct ldiskfs_dentry_param *ldp;
@@ -2960,7 +3015,7 @@ static int __osd_ea_add_rec(struct osd_thread_info *info,
                 child->d_fsdata = (void*) ldp;
         } else
                 child->d_fsdata = NULL;
                 child->d_fsdata = (void*) ldp;
         } else
                 child->d_fsdata = NULL;
-        rc = ldiskfs_add_entry(oth->ot_handle, child, cinode);
+        rc = osd_ldiskfs_add_entry(oth->ot_handle, child, cinode, hlock);
 
         RETURN(rc);
 }
 
         RETURN(rc);
 }
@@ -3018,11 +3073,11 @@ static int osd_add_dot_dotdot(struct osd_thread_info *info,
                 /* in case of rename, dotdot is already created */
                 if (dir->oo_compat_dotdot_created) {
                         return __osd_ea_add_rec(info, dir, parent_dir, name,
                 /* in case of rename, dotdot is already created */
                 if (dir->oo_compat_dotdot_created) {
                         return __osd_ea_add_rec(info, dir, parent_dir, name,
-                                                dot_dot_fid, th);
+                                                dot_dot_fid, NULL, th);
                 }
 
                 }
 
-                result = ldiskfs_add_dot_dotdot(oth->ot_handle, parent_dir, inode,
-                                                dot_ldp, dot_dot_ldp);
+                result = ldiskfs_add_dot_dotdot(oth->ot_handle, parent_dir,
+                                                inode, dot_ldp, dot_dot_ldp);
                 if (result == 0)
                        dir->oo_compat_dotdot_created = 1;
         }
                 if (result == 0)
                        dir->oo_compat_dotdot_created = 1;
         }
@@ -3043,15 +3098,37 @@ static int osd_ea_add_rec(const struct lu_env *env,
                           struct thandle *th)
 {
         struct osd_thread_info    *info   = osd_oti_get(env);
                           struct thandle *th)
 {
         struct osd_thread_info    *info   = osd_oti_get(env);
+        struct htree_lock         *hlock;
         int rc;
 
         int rc;
 
+        hlock = pobj->oo_hl_head != NULL ? info->oti_hlock : NULL;
+
         if (name[0] == '.' && (name[1] == '\0' || (name[1] == '.' &&
         if (name[0] == '.' && (name[1] == '\0' || (name[1] == '.' &&
-                                                   name[2] =='\0')))
+                                                   name[2] =='\0'))) {
+                if (hlock != NULL) {
+                        ldiskfs_htree_lock(hlock, pobj->oo_hl_head,
+                                           pobj->oo_inode, 0);
+                } else {
+                        cfs_down_write(&pobj->oo_ext_idx_sem);
+                }
                 rc = osd_add_dot_dotdot(info, pobj, cinode, name,
                      (struct dt_rec *)lu_object_fid(&pobj->oo_dt.do_lu),
                                         fid, th);
                 rc = osd_add_dot_dotdot(info, pobj, cinode, name,
                      (struct dt_rec *)lu_object_fid(&pobj->oo_dt.do_lu),
                                         fid, th);
+        } else {
+                if (hlock != NULL) {
+                        ldiskfs_htree_lock(hlock, pobj->oo_hl_head,
+                                           pobj->oo_inode, LDISKFS_HLOCK_ADD);
+                } else {
+                        cfs_down_write(&pobj->oo_ext_idx_sem);
+                }
+
+                rc = __osd_ea_add_rec(info, pobj, cinode, name, fid,
+                                      hlock, th);
+        }
+        if (hlock != NULL)
+                ldiskfs_htree_unlock(hlock);
         else
         else
-                rc = __osd_ea_add_rec(info, pobj, cinode, name, fid, th);
+                cfs_up_write(&pobj->oo_ext_idx_sem);
 
         return rc;
 }
 
         return rc;
 }
@@ -3072,6 +3149,7 @@ static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj,
         struct ldiskfs_dir_entry_2 *de;
         struct buffer_head         *bh;
         struct lu_fid              *fid = (struct lu_fid *) rec;
         struct ldiskfs_dir_entry_2 *de;
         struct buffer_head         *bh;
         struct lu_fid              *fid = (struct lu_fid *) rec;
+        struct htree_lock          *hlock = NULL;
         int ino;
         int rc;
 
         int ino;
         int rc;
 
@@ -3080,8 +3158,15 @@ static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj,
         dentry = osd_child_dentry_get(env, obj,
                                       (char *)key, strlen((char *)key));
 
         dentry = osd_child_dentry_get(env, obj,
                                       (char *)key, strlen((char *)key));
 
-        cfs_down_read(&obj->oo_ext_idx_sem);
-        bh = ll_ldiskfs_find_entry(dir, dentry, &de);
+        if (obj->oo_hl_head != NULL) {
+                hlock = osd_oti_get(env)->oti_hlock;
+                ldiskfs_htree_lock(hlock, obj->oo_hl_head,
+                                   dir, LDISKFS_HLOCK_LOOKUP);
+        } else {
+                cfs_down_read(&obj->oo_ext_idx_sem);
+        }
+
+        bh = osd_ldiskfs_find_entry(dir, dentry, &de, hlock);
         if (bh) {
                 ino = le32_to_cpu(de->inode);
                 rc = osd_get_fid_from_dentry(de, rec);
         if (bh) {
                 ino = le32_to_cpu(de->inode);
                 rc = osd_get_fid_from_dentry(de, rec);
@@ -3090,10 +3175,14 @@ static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj,
                 brelse(bh);
                 if (rc != 0)
                         rc = osd_ea_fid_get(env, obj, ino, fid);
                 brelse(bh);
                 if (rc != 0)
                         rc = osd_ea_fid_get(env, obj, ino, fid);
-        } else
+        } else {
                 rc = -ENOENT;
                 rc = -ENOENT;
+        }
 
 
-        cfs_up_read(&obj->oo_ext_idx_sem);
+        if (hlock != NULL)
+                ldiskfs_htree_unlock(hlock);
+        else
+                cfs_up_read(&obj->oo_ext_idx_sem);
         RETURN (rc);
 }
 
         RETURN (rc);
 }
 
@@ -3195,9 +3284,7 @@ static int osd_index_ea_insert(const struct lu_env *env, struct dt_object *dt,
                 else
                         cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
 #endif
                 else
                         cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
 #endif
-                cfs_down_write(&obj->oo_ext_idx_sem);
                 rc = osd_ea_add_rec(env, obj, child->oo_inode, name, rec, th);
                 rc = osd_ea_add_rec(env, obj, child->oo_inode, name, rec, th);
-                cfs_up_write(&obj->oo_ext_idx_sem);
 #ifdef HAVE_QUOTA_SUPPORT
                 cfs_curproc_cap_unpack(save);
 #endif
 #ifdef HAVE_QUOTA_SUPPORT
                 cfs_curproc_cap_unpack(save);
 #endif
@@ -3616,22 +3703,34 @@ static int osd_ldiskfs_filldir(char *buf, const char *name, int namelen,
  * \retval   0 on success
  * \retval -ve on error
  */
  * \retval   0 on success
  * \retval -ve on error
  */
-static int osd_ldiskfs_it_fill(const struct dt_it *di)
+static int osd_ldiskfs_it_fill(const struct lu_env *env,
+                               const struct dt_it *di)
 {
         struct osd_it_ea   *it    = (struct osd_it_ea *)di;
         struct osd_object  *obj   = it->oie_obj;
         struct inode       *inode = obj->oo_inode;
 {
         struct osd_it_ea   *it    = (struct osd_it_ea *)di;
         struct osd_object  *obj   = it->oie_obj;
         struct inode       *inode = obj->oo_inode;
-        int                result = 0;
+        struct htree_lock  *hlock = NULL;
+        int                 result = 0;
 
         ENTRY;
         it->oie_dirent = it->oie_buf;
         it->oie_rd_dirent = 0;
 
 
         ENTRY;
         it->oie_dirent = it->oie_buf;
         it->oie_rd_dirent = 0;
 
-        cfs_down_read(&obj->oo_ext_idx_sem);
+        if (obj->oo_hl_head != NULL) {
+                hlock = osd_oti_get(env)->oti_hlock;
+                ldiskfs_htree_lock(hlock, obj->oo_hl_head,
+                                   inode, LDISKFS_HLOCK_READDIR);
+        } else {
+                cfs_down_read(&obj->oo_ext_idx_sem);
+        }
+
         result = inode->i_fop->readdir(&it->oie_file, it,
                                        (filldir_t) osd_ldiskfs_filldir);
 
         result = inode->i_fop->readdir(&it->oie_file, it,
                                        (filldir_t) osd_ldiskfs_filldir);
 
-        cfs_up_read(&obj->oo_ext_idx_sem);
+        if (hlock != NULL)
+                ldiskfs_htree_unlock(hlock);
+        else
+                cfs_up_read(&obj->oo_ext_idx_sem);
 
         if (it->oie_rd_dirent == 0) {
                 result = -EIO;
 
         if (it->oie_rd_dirent == 0) {
                 result = -EIO;
@@ -3672,7 +3771,7 @@ static int osd_it_ea_next(const struct lu_env *env, struct dt_it *di)
                 if (it->oie_file.f_pos == LDISKFS_HTREE_EOF)
                         rc = +1;
                 else
                 if (it->oie_file.f_pos == LDISKFS_HTREE_EOF)
                         rc = +1;
                 else
-                        rc = osd_ldiskfs_it_fill(di);
+                        rc = osd_ldiskfs_it_fill(env, di);
         }
 
         RETURN(rc);
         }
 
         RETURN(rc);
@@ -3777,7 +3876,7 @@ static int osd_it_ea_load(const struct lu_env *env,
         ENTRY;
         it->oie_file.f_pos = hash;
 
         ENTRY;
         it->oie_file.f_pos = hash;
 
-        rc =  osd_ldiskfs_it_fill(di);
+        rc =  osd_ldiskfs_it_fill(env, di);
         if (rc == 0)
                 rc = +1;
 
         if (rc == 0)
                 rc = +1;
 
@@ -3842,19 +3941,26 @@ static void *osd_key_init(const struct lu_context *ctx,
         struct osd_thread_info *info;
 
         OBD_ALLOC_PTR(info);
         struct osd_thread_info *info;
 
         OBD_ALLOC_PTR(info);
-        if (info != NULL) {
-                OBD_ALLOC(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE);
-                if (info->oti_it_ea_buf != NULL) {
-                        info->oti_env = container_of(ctx, struct lu_env,
-                                                     le_ctx);
-                } else {
-                        OBD_FREE_PTR(info);
-                        info = ERR_PTR(-ENOMEM);
-                }
-        } else {
-                info = ERR_PTR(-ENOMEM);
-        }
+        if (info == NULL)
+                return ERR_PTR(-ENOMEM);
+
+        OBD_ALLOC(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE);
+        if (info->oti_it_ea_buf == NULL)
+                goto out_free_info;
+
+        info->oti_env = container_of(ctx, struct lu_env, le_ctx);
+
+        info->oti_hlock = ldiskfs_htree_lock_alloc();
+        if (info->oti_hlock == NULL)
+                goto out_free_ea;
+
         return info;
         return info;
+
+ out_free_ea:
+        OBD_FREE(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE);
+ out_free_info:
+        OBD_FREE_PTR(info);
+        return ERR_PTR(-ENOMEM);
 }
 
 static void osd_key_fini(const struct lu_context *ctx,
 }
 
 static void osd_key_fini(const struct lu_context *ctx,
@@ -3862,6 +3968,8 @@ static void osd_key_fini(const struct lu_context *ctx,
 {
         struct osd_thread_info *info = data;
 
 {
         struct osd_thread_info *info = data;
 
+        if (info->oti_hlock != NULL)
+                ldiskfs_htree_lock_free(info->oti_hlock);
         OBD_FREE(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE);
         OBD_FREE_PTR(info);
 }
         OBD_FREE(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE);
         OBD_FREE_PTR(info);
 }