Whamcloud - gitweb
LU-1187 ofd: Allocate ofd group dynamically.
authorwangdi <di.wang@whamcloud.com>
Tue, 24 Sep 2013 08:06:42 +0000 (01:06 -0700)
committerOleg Drokin <green@whamcloud.com>
Thu, 10 Jan 2013 13:20:51 +0000 (08:20 -0500)
Allocate the OFD sequence maps dynamically, so that the OST can
handle an arbitrary number of MDTs or WBC clients, and potentially
multiple streams of object allocations from each MDT.

Signed-off-by: Wang Di <di.wang@whamcloud.com>
Change-Id: Id5f8e5bf4f3dcbe3ba228144c7542bdd3cfef20c
Reviewed-on: http://review.whamcloud.com/4322
Reviewed-by: Fan Yong <fan.yong@intel.com>
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Tested-by: Hudson
Reviewed-by: Mike Pershin <mike.pershin@intel.com>
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Alex Zhuravlev <bzzz@whamcloud.com>
lustre/ofd/lproc_ofd.c
lustre/ofd/ofd_dev.c
lustre/ofd/ofd_fs.c
lustre/ofd/ofd_internal.h
lustre/ofd/ofd_io.c
lustre/ofd/ofd_obd.c
lustre/ofd/ofd_objects.c

index 79911e3..5a71de9 100644 (file)
 
 #ifdef LPROCFS
 
 
 #ifdef LPROCFS
 
-static int lprocfs_ofd_rd_groups(char *page, char **start, off_t off,
-                                int count, int *eof, void *data)
+static int lprocfs_ofd_rd_seqs(char *page, char **start, off_t off,
+                               int count, int *eof, void *data)
 {
        struct obd_device *obd = (struct obd_device *)data;
        struct ofd_device *ofd = ofd_dev(obd->obd_lu_dev);
 
        *eof = 1;
 {
        struct obd_device *obd = (struct obd_device *)data;
        struct ofd_device *ofd = ofd_dev(obd->obd_lu_dev);
 
        *eof = 1;
-       return snprintf(page, count, "%u\n", ofd->ofd_max_group);
+       return snprintf(page, count, "%u\n", ofd->ofd_seq_count);
 }
 
 static int lprocfs_ofd_rd_tot_dirty(char *page, char **start, off_t off,
 }
 
 static int lprocfs_ofd_rd_tot_dirty(char *page, char **start, off_t off,
@@ -166,9 +166,9 @@ static int lprocfs_ofd_wr_precreate_batch(struct file *file, const char *buffer,
        if (val < 1)
                return -EINVAL;
 
        if (val < 1)
                return -EINVAL;
 
-       spin_lock(&ofd->ofd_objid_lock);
+       spin_lock(&ofd->ofd_batch_lock);
        ofd->ofd_precreate_batch = val;
        ofd->ofd_precreate_batch = val;
-       spin_unlock(&ofd->ofd_objid_lock);
+       spin_unlock(&ofd->ofd_batch_lock);
        return count;
 }
 
        return count;
 }
 
@@ -177,13 +177,16 @@ static int lprocfs_ofd_rd_last_id(char *page, char **start, off_t off,
 {
        struct obd_device       *obd = data;
        struct ofd_device       *ofd = ofd_dev(obd->obd_lu_dev);
 {
        struct obd_device       *obd = data;
        struct ofd_device       *ofd = ofd_dev(obd->obd_lu_dev);
-       int                      retval = 0, rc, i;
+       struct ofd_seq          *oseq = NULL;
+       int                     retval = 0, rc;
 
        if (obd == NULL)
                return 0;
 
 
        if (obd == NULL)
                return 0;
 
-       for (i = FID_SEQ_OST_MDT0; i <= ofd->ofd_max_group; i++) {
-               rc = snprintf(page, count, LPU64"\n", ofd_last_id(ofd, i));
+       read_lock(&ofd->ofd_seq_list_lock);
+       cfs_list_for_each_entry(oseq, &ofd->ofd_seq_list, os_list) {
+               rc = snprintf(page, count, LPX64": "LPX64"\n",
+                             oseq->os_seq, ofd_seq_last_oid(oseq));
                if (rc < 0) {
                        retval = rc;
                        break;
                if (rc < 0) {
                        retval = rc;
                        break;
@@ -192,6 +195,7 @@ static int lprocfs_ofd_rd_last_id(char *page, char **start, off_t off,
                count -= rc;
                retval += rc;
        }
                count -= rc;
                retval += rc;
        }
+       read_unlock(&ofd->ofd_seq_list_lock);
        return retval;
 }
 
        return retval;
 }
 
@@ -460,7 +464,7 @@ static struct lprocfs_vars lprocfs_ofd_obd_vars[] = {
        { "kbytesavail",         lprocfs_rd_kbytesavail, 0, 0 },
        { "filestotal",          lprocfs_rd_filestotal, 0, 0 },
        { "filesfree",           lprocfs_rd_filesfree, 0, 0 },
        { "kbytesavail",         lprocfs_rd_kbytesavail, 0, 0 },
        { "filestotal",          lprocfs_rd_filestotal, 0, 0 },
        { "filesfree",           lprocfs_rd_filesfree, 0, 0 },
-       { "filegroups",          lprocfs_ofd_rd_groups, 0, 0 },
+       { "seqs_allocated",      lprocfs_ofd_rd_seqs, 0, 0 },
        { "fstype",              lprocfs_ofd_rd_fstype, 0, 0 },
        { "last_id",             lprocfs_ofd_rd_last_id, 0, 0 },
        { "tot_dirty",           lprocfs_ofd_rd_tot_dirty,   0, 0 },
        { "fstype",              lprocfs_ofd_rd_fstype, 0, 0 },
        { "last_id",             lprocfs_ofd_rd_last_id, 0, 0 },
        { "tot_dirty",           lprocfs_ofd_rd_tot_dirty,   0, 0 },
index 7dce425..e8c809f 100644 (file)
@@ -523,8 +523,9 @@ static int ofd_init0(const struct lu_env *env, struct ofd_device *m,
        m->ofd_tot_dirty = 0;
        m->ofd_tot_granted = 0;
        m->ofd_tot_pending = 0;
        m->ofd_tot_dirty = 0;
        m->ofd_tot_granted = 0;
        m->ofd_tot_pending = 0;
-       m->ofd_max_group = 0;
+       m->ofd_seq_count = 0;
 
 
+       spin_lock_init(&m->ofd_batch_lock);
        rwlock_init(&obd->u.filter.fo_sptlrpc_lock);
        sptlrpc_rule_set_init(&obd->u.filter.fo_sptlrpc_rset);
 
        rwlock_init(&obd->u.filter.fo_sptlrpc_lock);
        sptlrpc_rule_set_init(&obd->u.filter.fo_sptlrpc_rset);
 
index 39a2dc3..c022af7 100644 (file)
@@ -72,37 +72,101 @@ int ofd_precreate_batch(struct ofd_device *ofd, int batch)
 {
        int count;
 
 {
        int count;
 
-       spin_lock(&ofd->ofd_objid_lock);
+       spin_lock(&ofd->ofd_batch_lock);
        count = min(ofd->ofd_precreate_batch, batch);
        count = min(ofd->ofd_precreate_batch, batch);
-       spin_unlock(&ofd->ofd_objid_lock);
+       spin_unlock(&ofd->ofd_batch_lock);
 
        return count;
 }
 
 
        return count;
 }
 
-obd_id ofd_last_id(struct ofd_device *ofd, obd_seq group)
+struct ofd_seq *ofd_seq_get(struct ofd_device *ofd, obd_seq seq)
 {
 {
-       obd_id id;
+       struct ofd_seq *oseq;
+
+       read_lock(&ofd->ofd_seq_list_lock);
+       cfs_list_for_each_entry(oseq, &ofd->ofd_seq_list, os_list) {
+               if (oseq->os_seq == seq) {
+                       cfs_atomic_inc(&oseq->os_refc);
+                       read_unlock(&ofd->ofd_seq_list_lock);
+                       return oseq;
+               }
+       }
+       read_unlock(&ofd->ofd_seq_list_lock);
+       return NULL;
+}
+
+static void ofd_seq_destroy(const struct lu_env *env,
+                           struct ofd_seq *oseq)
+{
+       LASSERT(cfs_list_empty(&oseq->os_list));
+       LASSERT(oseq->os_lastid_obj != NULL);
+       lu_object_put(env, &oseq->os_lastid_obj->do_lu);
+       OBD_FREE_PTR(oseq);
+}
+
+void ofd_seq_put(const struct lu_env *env, struct ofd_seq *oseq)
+{
+       if (cfs_atomic_dec_and_test(&oseq->os_refc))
+               ofd_seq_destroy(env, oseq);
+}
 
 
-       LASSERT(group <= ofd->ofd_max_group);
+static void ofd_seq_delete(const struct lu_env *env, struct ofd_seq *oseq)
+{
+       cfs_list_del_init(&oseq->os_list);
+       ofd_seq_put(env, oseq);
+}
 
 
-       spin_lock(&ofd->ofd_objid_lock);
-       id = ofd->ofd_last_objids[group];
-       spin_unlock(&ofd->ofd_objid_lock);
+/**
+ * Add a new sequence to the OFD device.
+ *
+ * \param ofd OFD device
+ * \param new_seq new sequence to be added
+ *
+ * \retval the seq to be added or the existing seq
+ **/
+static struct ofd_seq *ofd_seq_add(const struct lu_env *env,
+                                  struct ofd_device *ofd,
+                                  struct ofd_seq *new_seq)
+{
+       struct ofd_seq *os = NULL;
+
+       write_lock(&ofd->ofd_seq_list_lock);
+       cfs_list_for_each_entry(os, &ofd->ofd_seq_list, os_list) {
+               if (os->os_seq == new_seq->os_seq) {
+                       cfs_atomic_inc(&os->os_refc);
+                       write_unlock(&ofd->ofd_seq_list_lock);
+                       /* The seq has not been added to the list */
+                       ofd_seq_put(env, new_seq);
+                       return os;
+               }
+       }
+       cfs_atomic_inc(&new_seq->os_refc);
+       cfs_list_add_tail(&new_seq->os_list, &ofd->ofd_seq_list);
+       write_unlock(&ofd->ofd_seq_list_lock);
+       return new_seq;
+}
+
+obd_id ofd_seq_last_oid(struct ofd_seq *oseq)
+{
+       obd_id id;
+
+       spin_lock(&oseq->os_last_oid_lock);
+       id = oseq->os_last_oid;
+       spin_unlock(&oseq->os_last_oid_lock);
 
        return id;
 }
 
 
        return id;
 }
 
-void ofd_last_id_set(struct ofd_device *ofd, obd_id id, obd_seq group)
+void ofd_seq_last_oid_set(struct ofd_seq *oseq, obd_id id)
 {
 {
-       LASSERT(group <= ofd->ofd_max_group);
-       spin_lock(&ofd->ofd_objid_lock);
-       if (ofd->ofd_last_objids[group] < id)
-               ofd->ofd_last_objids[group] = id;
-       spin_unlock(&ofd->ofd_objid_lock);
+       spin_lock(&oseq->os_last_oid_lock);
+       if (likely(oseq->os_last_oid < id))
+               oseq->os_last_oid = id;
+       spin_unlock(&oseq->os_last_oid_lock);
 }
 
 }
 
-int ofd_last_id_write(const struct lu_env *env, struct ofd_device *ofd,
-                     obd_seq group)
+int ofd_seq_last_oid_write(const struct lu_env *env, struct ofd_device *ofd,
+                          struct ofd_seq *oseq)
 {
        struct ofd_thread_info  *info = ofd_info(env);
        obd_id                   tmp;
 {
        struct ofd_thread_info  *info = ofd_info(env);
        obd_id                   tmp;
@@ -114,17 +178,17 @@ int ofd_last_id_write(const struct lu_env *env, struct ofd_device *ofd,
        info->fti_buf.lb_len = sizeof(tmp);
        info->fti_off = 0;
 
        info->fti_buf.lb_len = sizeof(tmp);
        info->fti_off = 0;
 
-       CDEBUG(D_INODE, "%s: write last_objid for group "LPU64": "LPU64"\n",
-              ofd_obd(ofd)->obd_name, group, ofd_last_id(ofd, group));
+       CDEBUG(D_INODE, "%s: write last_objid for seq "LPX64" : "LPX64"\n",
+              ofd_name(ofd), oseq->os_seq, ofd_seq_last_oid(oseq));
 
 
-       tmp = cpu_to_le64(ofd_last_id(ofd, group));
+       tmp = cpu_to_le64(ofd_seq_last_oid(oseq));
 
 
-       rc = ofd_record_write(env, ofd, ofd->ofd_lastid_obj[group],
-                             &info->fti_buf, &info->fti_off);
+       rc = ofd_record_write(env, ofd, oseq->os_lastid_obj, &info->fti_buf,
+                             &info->fti_off);
        RETURN(rc);
 }
 
        RETURN(rc);
 }
 
-int ofd_last_group_write(const struct lu_env *env, struct ofd_device *ofd)
+static int ofd_seq_count_write(const struct lu_env *env, struct ofd_device *ofd)
 {
        struct ofd_thread_info  *info = ofd_info(env);
        obd_seq                  tmp;
 {
        struct ofd_thread_info  *info = ofd_info(env);
        obd_seq                  tmp;
@@ -136,50 +200,80 @@ int ofd_last_group_write(const struct lu_env *env, struct ofd_device *ofd)
        info->fti_buf.lb_len = sizeof(tmp);
        info->fti_off = 0;
 
        info->fti_buf.lb_len = sizeof(tmp);
        info->fti_off = 0;
 
-       tmp = cpu_to_le32(ofd->ofd_max_group);
+       tmp = cpu_to_le32(ofd->ofd_seq_count);
 
 
-       rc = ofd_record_write(env, ofd, ofd->ofd_last_group_file,
+       rc = ofd_record_write(env, ofd, ofd->ofd_seq_count_file,
                              &info->fti_buf, &info->fti_off);
 
        RETURN(rc);
 }
 
                              &info->fti_buf, &info->fti_off);
 
        RETURN(rc);
 }
 
-void ofd_group_fini(const struct lu_env *env, struct ofd_device *ofd,
-                   int group)
+void ofd_seqs_fini(const struct lu_env *env, struct ofd_device *ofd)
 {
 {
-       LASSERT(ofd->ofd_lastid_obj[group]);
-       lu_object_put(env, &ofd->ofd_lastid_obj[group]->do_lu);
-       ofd->ofd_lastid_obj[group] = NULL;
+       struct ofd_seq  *oseq;
+       struct ofd_seq  *tmp;
+       cfs_list_t       dispose;
+
+       CFS_INIT_LIST_HEAD(&dispose);
+       write_lock(&ofd->ofd_seq_list_lock);
+       cfs_list_for_each_entry_safe(oseq, tmp, &ofd->ofd_seq_list, os_list) {
+               cfs_list_move(&oseq->os_list, &dispose);
+       }
+       write_unlock(&ofd->ofd_seq_list_lock);
+
+       while (!cfs_list_empty(&dispose)) {
+               oseq = container_of0(dispose.next, struct ofd_seq, os_list);
+               ofd_seq_delete(env, oseq);
+       }
+
+       LASSERT(cfs_list_empty(&ofd->ofd_seq_list));
+       return;
 }
 
 }
 
-int ofd_group_load(const struct lu_env *env, struct ofd_device *ofd, int group)
+struct ofd_seq *ofd_seq_load(const struct lu_env *env, struct ofd_device *ofd,
+                            obd_seq seq)
 {
        struct ofd_thread_info  *info = ofd_info(env);
 {
        struct ofd_thread_info  *info = ofd_info(env);
+       struct ofd_seq          *oseq = NULL;
        struct dt_object        *dob;
        obd_id                   lastid;
        int                      rc;
 
        ENTRY;
 
        struct dt_object        *dob;
        obd_id                   lastid;
        int                      rc;
 
        ENTRY;
 
-       /* if group is already initialized */
-       if (ofd->ofd_lastid_obj[group])
-               RETURN(0);
+       /* if seq is already initialized */
+       oseq = ofd_seq_get(ofd, seq);
+       if (oseq != NULL)
+               RETURN(oseq);
+
+       OBD_ALLOC_PTR(oseq);
+       if (oseq == NULL)
+               RETURN(ERR_PTR(-ENOMEM));
+
+       lu_local_obj_fid(&info->fti_fid, OFD_GROUP0_LAST_OID + seq);
 
 
-       lu_local_obj_fid(&info->fti_fid, OFD_GROUP0_LAST_OID + group);
        memset(&info->fti_attr, 0, sizeof(info->fti_attr));
        info->fti_attr.la_valid = LA_MODE;
        info->fti_attr.la_mode = S_IFREG |  S_IRUGO | S_IWUSR;
        info->fti_dof.dof_type = dt_mode_to_dft(S_IFREG);
 
        memset(&info->fti_attr, 0, sizeof(info->fti_attr));
        info->fti_attr.la_valid = LA_MODE;
        info->fti_attr.la_mode = S_IFREG |  S_IRUGO | S_IWUSR;
        info->fti_dof.dof_type = dt_mode_to_dft(S_IFREG);
 
-       /* create object tracking per-group last created
+       /* create object tracking per-seq last created
         * id to be used by orphan recovery mechanism */
        dob = dt_find_or_create(env, ofd->ofd_osd, &info->fti_fid,
                                &info->fti_dof, &info->fti_attr);
         * id to be used by orphan recovery mechanism */
        dob = dt_find_or_create(env, ofd->ofd_osd, &info->fti_fid,
                                &info->fti_dof, &info->fti_attr);
-       if (IS_ERR(dob))
-               RETURN(PTR_ERR(dob));
+       if (IS_ERR(dob)) {
+               OBD_FREE_PTR(oseq);
+               RETURN((void *)dob);
+       }
 
 
-       ofd->ofd_lastid_obj[group] = dob;
-       mutex_init(&ofd->ofd_create_locks[group]);
+       oseq->os_lastid_obj = dob;
+
+       CFS_INIT_LIST_HEAD(&oseq->os_list);
+       mutex_init(&oseq->os_create_lock);
+       spin_lock_init(&oseq->os_last_oid_lock);
+       oseq->os_seq = seq;
+
+       cfs_atomic_set(&oseq->os_refc, 1);
 
        rc = dt_attr_get(env, dob, &info->fti_attr, BYPASS_CAPA);
        if (rc)
 
        rc = dt_attr_get(env, dob, &info->fti_attr, BYPASS_CAPA);
        if (rc)
@@ -187,10 +281,9 @@ int ofd_group_load(const struct lu_env *env, struct ofd_device *ofd, int group)
 
        if (info->fti_attr.la_size == 0) {
                /* object is just created, initialize last id */
 
        if (info->fti_attr.la_size == 0) {
                /* object is just created, initialize last id */
-               ofd->ofd_last_objids[group] = OFD_INIT_OBJID;
-               ofd_last_id_set(ofd, OFD_INIT_OBJID, group);
-               ofd_last_id_write(env, ofd, group);
-               ofd_last_group_write(env, ofd);
+               oseq->os_last_oid = OFD_INIT_OBJID;
+               ofd_seq_last_oid_write(env, ofd, oseq);
+               ofd_seq_count_write(env, ofd);
        } else if (info->fti_attr.la_size == sizeof(lastid)) {
                info->fti_off = 0;
                info->fti_buf.lb_buf = &lastid;
        } else if (info->fti_attr.la_size == sizeof(lastid)) {
                info->fti_off = 0;
                info->fti_buf.lb_buf = &lastid;
@@ -198,76 +291,84 @@ int ofd_group_load(const struct lu_env *env, struct ofd_device *ofd, int group)
 
                rc = dt_record_read(env, dob, &info->fti_buf, &info->fti_off);
                if (rc) {
 
                rc = dt_record_read(env, dob, &info->fti_buf, &info->fti_off);
                if (rc) {
-                       CERROR("can't read last_id: %d\n", rc);
+                       CERROR("%s: can't read last_id: rc = %d\n",
+                               ofd_name(ofd), rc);
                        GOTO(cleanup, rc);
                }
                        GOTO(cleanup, rc);
                }
-               ofd->ofd_last_objids[group] = le64_to_cpu(lastid);
+               oseq->os_last_oid = le64_to_cpu(lastid);
        } else {
        } else {
-               CERROR("corrupted size %Lu LAST_ID of group %u\n",
-                      (unsigned long long)info->fti_attr.la_size, group);
-               rc = -EINVAL;
+               CERROR("%s: corrupted size "LPU64" LAST_ID of seq "LPX64"\n",
+                       ofd_name(ofd), (__u64)info->fti_attr.la_size, seq);
+               GOTO(cleanup, rc = -EINVAL);
        }
 
        }
 
-       RETURN(0);
+       oseq = ofd_seq_add(env, ofd, oseq);
+       RETURN(oseq);
 cleanup:
 cleanup:
-       ofd_group_fini(env, ofd, group);
-       RETURN(rc);
+       ofd_seq_put(env, oseq);
+       return ERR_PTR(rc);
 }
 
 }
 
-/* ofd groups managements */
-int ofd_groups_init(const struct lu_env *env, struct ofd_device *ofd)
+/* object sequence management */
+int ofd_seqs_init(const struct lu_env *env, struct ofd_device *ofd)
 {
        struct ofd_thread_info  *info = ofd_info(env);
 {
        struct ofd_thread_info  *info = ofd_info(env);
-       unsigned long            groups_size;
-       obd_seq                  last_group;
-       int                      rc = 0;
-       int                      i;
+       unsigned long           seq_count_size;
+       obd_seq                 seq_count;
+       int                     rc = 0;
+       int                     i;
 
        ENTRY;
 
 
        ENTRY;
 
-       spin_lock_init(&ofd->ofd_objid_lock);
+       rwlock_init(&ofd->ofd_seq_list_lock);
+       CFS_INIT_LIST_HEAD(&ofd->ofd_seq_list);
 
 
-       rc = dt_attr_get(env, ofd->ofd_last_group_file,
+       rc = dt_attr_get(env, ofd->ofd_seq_count_file,
                         &info->fti_attr, BYPASS_CAPA);
        if (rc)
                GOTO(cleanup, rc);
 
                         &info->fti_attr, BYPASS_CAPA);
        if (rc)
                GOTO(cleanup, rc);
 
-       groups_size = (unsigned long)info->fti_attr.la_size;
+       seq_count_size = (unsigned long)info->fti_attr.la_size;
 
 
-       if (groups_size == sizeof(last_group)) {
+       if (seq_count_size == sizeof(seq_count)) {
                info->fti_off = 0;
                info->fti_off = 0;
-               info->fti_buf.lb_buf = &last_group;
-               info->fti_buf.lb_len = sizeof(last_group);
+               info->fti_buf.lb_buf = &seq_count;
+               info->fti_buf.lb_len = sizeof(seq_count);
 
 
-               rc = dt_record_read(env, ofd->ofd_last_group_file,
+               rc = dt_record_read(env, ofd->ofd_seq_count_file,
                                    &info->fti_buf, &info->fti_off);
                if (rc) {
                                    &info->fti_buf, &info->fti_off);
                if (rc) {
-                       CERROR("can't read LAST_GROUP: %d\n", rc);
+                       CERROR("%s: can't read LAST_GROUP: rc = %d\n",
+                              ofd_name(ofd), rc);
                        GOTO(cleanup, rc);
                }
 
                        GOTO(cleanup, rc);
                }
 
-               ofd->ofd_max_group = le32_to_cpu(last_group);
-               LASSERT(ofd->ofd_max_group <= OFD_MAX_GROUPS);
-       } else if (groups_size == 0) {
-               ofd->ofd_max_group = 0;
+               ofd->ofd_seq_count = le64_to_cpu(seq_count);
+       } else if (seq_count_size == 0) {
+               ofd->ofd_seq_count = 0;
        } else {
        } else {
-               CERROR("groups file is corrupted? size = %lu\n", groups_size);
+               CERROR("%s: seqs file is corrupted? size = %lu\n",
+                      ofd_name(ofd), seq_count_size);
                GOTO(cleanup, rc = -EIO);
        }
 
                GOTO(cleanup, rc = -EIO);
        }
 
-       for (i = 0; i <= ofd->ofd_max_group; i++) {
-               rc = ofd_group_load(env, ofd, i);
-               if (rc) {
-                       CERROR("can't load group %d: %d\n", i, rc);
-                       /* Clean all previously set groups */
-                       while (i > 0)
-                               ofd_group_fini(env, ofd, --i);
+       for (i = 0; i <= ofd->ofd_seq_count; i++) {
+               struct ofd_seq *oseq;
+
+               oseq = ofd_seq_load(env, ofd, i);
+               if (IS_ERR(oseq)) {
+                       CERROR("%s: can't load seq %d: rc = %d\n",
+                              ofd_name(ofd), i, rc);
+                       /* Clean all previously set seqs */
+                       ofd_seqs_fini(env, ofd);
                        GOTO(cleanup, rc);
                        GOTO(cleanup, rc);
+               } else {
+                       ofd_seq_put(env, oseq);
                }
        }
 
                }
        }
 
-       CDEBUG(D_OTHER, "%s: %u groups initialized\n",
-             ofd_obd(ofd)->obd_name, ofd->ofd_max_group + 1);
+       CDEBUG(D_OTHER, "%s: %u seqs initialized\n", ofd_name(ofd),
+              ofd->ofd_seq_count + 1);
 cleanup:
        RETURN(rc);
 }
 cleanup:
        RETURN(rc);
 }
@@ -300,8 +401,9 @@ int ofd_clients_data_init(const struct lu_env *env, struct ofd_device *ofd,
                off = lsd->lsd_client_start + cl_idx * lsd->lsd_client_size;
                rc = tgt_client_data_read(env, &ofd->ofd_lut, lcd, &off, cl_idx);
                if (rc) {
                off = lsd->lsd_client_start + cl_idx * lsd->lsd_client_size;
                rc = tgt_client_data_read(env, &ofd->ofd_lut, lcd, &off, cl_idx);
                if (rc) {
-                       CERROR("error reading FILT %s idx %d off %llu: rc %d\n",
-                              LAST_RCVD, cl_idx, off, rc);
+                       CERROR("%s: error reading FILT %s idx %d off %llu: "
+                              "rc = %d\n", ofd_name(ofd), LAST_RCVD, cl_idx,
+                              off, rc);
                        rc = 0;
                        break; /* read error shouldn't cause startup to fail */
                }
                        rc = 0;
                        break; /* read error shouldn't cause startup to fail */
                }
@@ -326,7 +428,8 @@ int ofd_clients_data_init(const struct lu_env *env, struct ofd_device *ofd,
                if (IS_ERR(exp)) {
                        if (PTR_ERR(exp) == -EALREADY) {
                                /* export already exists, zero out this one */
                if (IS_ERR(exp)) {
                        if (PTR_ERR(exp) == -EALREADY) {
                                /* export already exists, zero out this one */
-                               CERROR("Duplicate export %s!\n", lcd->lcd_uuid);
+                               CERROR("%s: Duplicate export %s!\n",
+                                      ofd_name(ofd), lcd->lcd_uuid);
                                continue;
                        }
                        GOTO(err_out, rc = PTR_ERR(exp));
                                continue;
                        }
                        GOTO(err_out, rc = PTR_ERR(exp));
@@ -517,15 +620,15 @@ int ofd_fs_setup(const struct lu_env *env, struct ofd_device *ofd,
        if (IS_ERR(fo))
                GOTO(out_hc, rc = PTR_ERR(fo));
 
        if (IS_ERR(fo))
                GOTO(out_hc, rc = PTR_ERR(fo));
 
-       ofd->ofd_last_group_file = fo;
+       ofd->ofd_seq_count_file = fo;
 
 
-       rc = ofd_groups_init(env, ofd);
+       rc = ofd_seqs_init(env, ofd);
        if (rc)
                GOTO(out_lg, rc);
 
        RETURN(0);
 out_lg:
        if (rc)
                GOTO(out_lg, rc);
 
        RETURN(0);
 out_lg:
-       lu_object_put(env, &ofd->ofd_last_group_file->do_lu);
+       lu_object_put(env, &ofd->ofd_seq_count_file->do_lu);
 out_hc:
        lu_object_put(env, &ofd->ofd_health_check_file->do_lu);
 out:
 out_hc:
        lu_object_put(env, &ofd->ofd_health_check_file->do_lu);
 out:
@@ -541,12 +644,7 @@ void ofd_fs_cleanup(const struct lu_env *env, struct ofd_device *ofd)
 
        ofd_info_init(env, NULL);
 
 
        ofd_info_init(env, NULL);
 
-       for (i = 0; i <= ofd->ofd_max_group; i++) {
-               if (ofd->ofd_lastid_obj[i]) {
-                       ofd_last_id_write(env, ofd, i);
-                       ofd_group_fini(env, ofd, i);
-               }
-       }
+       ofd_seqs_fini(env, ofd);
 
        i = dt_sync(env, ofd->ofd_osd);
        if (i)
 
        i = dt_sync(env, ofd->ofd_osd);
        if (i)
@@ -555,9 +653,9 @@ void ofd_fs_cleanup(const struct lu_env *env, struct ofd_device *ofd)
        /* Remove transaction callback */
        dt_txn_callback_del(ofd->ofd_osd, &ofd->ofd_txn_cb);
 
        /* Remove transaction callback */
        dt_txn_callback_del(ofd->ofd_osd, &ofd->ofd_txn_cb);
 
-       if (ofd->ofd_last_group_file) {
-               lu_object_put(env, &ofd->ofd_last_group_file->do_lu);
-               ofd->ofd_last_group_file = NULL;
+       if (ofd->ofd_seq_count_file) {
+               lu_object_put(env, &ofd->ofd_seq_count_file->do_lu);
+               ofd->ofd_seq_count_file = NULL;
        }
 
        if (ofd->ofd_health_check_file) {
        }
 
        if (ofd->ofd_health_check_file) {
index 5b8cd32..5617008 100644 (file)
@@ -48,7 +48,6 @@
 #define OFD_ROCOMPAT_SUPP (0)
 #define OFD_INCOMPAT_SUPP (OBD_INCOMPAT_GROUPS | OBD_INCOMPAT_OST | \
                           OBD_INCOMPAT_COMMON_LR)
 #define OFD_ROCOMPAT_SUPP (0)
 #define OFD_INCOMPAT_SUPP (OBD_INCOMPAT_GROUPS | OBD_INCOMPAT_OST | \
                           OBD_INCOMPAT_COMMON_LR)
-#define OFD_MAX_GROUPS 256
 #define OFD_PRECREATE_BATCH_DEFAULT (FILTER_SUBDIR_COUNT * 4)
 
 /* on small filesystems we should not precreate too many objects in
 #define OFD_PRECREATE_BATCH_DEFAULT (FILTER_SUBDIR_COUNT * 4)
 
 /* on small filesystems we should not precreate too many objects in
@@ -106,6 +105,17 @@ static inline void ofd_counter_incr(struct obd_export *exp, int opcode,
        }
 }
 
        }
 }
 
+struct ofd_seq {
+       cfs_list_t              os_list;
+       obd_id                  os_last_oid;
+       obd_seq                 os_seq;
+       spinlock_t              os_last_oid_lock;
+       struct mutex            os_create_lock;
+       cfs_atomic_t            os_refc;
+       struct dt_object        *os_lastid_obj;
+       unsigned long           os_destroys_in_progress:1;
+};
+
 struct ofd_device {
        struct dt_device         ofd_dt_dev;
        struct dt_device        *ofd_osd;
 struct ofd_device {
        struct dt_device         ofd_dt_dev;
        struct dt_device        *ofd_osd;
@@ -119,18 +129,16 @@ struct ofd_device {
 
        /* last_rcvd file */
        struct lu_target         ofd_lut;
 
        /* last_rcvd file */
        struct lu_target         ofd_lut;
-       struct dt_object        *ofd_last_group_file;
+       struct dt_object        *ofd_seq_count_file;
        struct dt_object        *ofd_health_check_file;
 
        int                      ofd_subdir_count;
 
        struct dt_object        *ofd_health_check_file;
 
        int                      ofd_subdir_count;
 
-       int                      ofd_max_group;
-       obd_id                   ofd_last_objids[OFD_MAX_GROUPS];
-       struct mutex             ofd_create_locks[OFD_MAX_GROUPS];
-       struct dt_object        *ofd_lastid_obj[OFD_MAX_GROUPS];
-       spinlock_t               ofd_objid_lock;
-       unsigned long            ofd_destroys_in_progress;
-       int                      ofd_precreate_batch;
+       cfs_list_t              ofd_seq_list;
+       rwlock_t                ofd_seq_list_lock;
+       int                     ofd_seq_count;
+       int                     ofd_precreate_batch;
+       spinlock_t              ofd_batch_lock;
 
        /* protect all statfs-related counters */
        spinlock_t               ofd_osfs_lock;
 
        /* protect all statfs-related counters */
        spinlock_t               ofd_osfs_lock;
@@ -333,15 +341,21 @@ int ofd_statfs_internal(const struct lu_env *env, struct ofd_device *ofd,
                        int *from_cache);
 
 /* ofd_fs.c */
                        int *from_cache);
 
 /* ofd_fs.c */
-obd_id ofd_last_id(struct ofd_device *ofd, obd_seq seq);
-void ofd_last_id_set(struct ofd_device *ofd, obd_id id, obd_seq seq);
-int ofd_last_id_write(const struct lu_env *env, struct ofd_device *ofd,
-                     obd_seq seq);
-int ofd_group_load(const struct lu_env *env, struct ofd_device *ofd, int);
+obd_id ofd_seq_last_oid(struct ofd_seq *oseq);
+void ofd_seq_last_oid_set(struct ofd_seq *oseq, obd_id id);
+int ofd_seq_last_oid_write(const struct lu_env *env, struct ofd_device *ofd,
+                          struct ofd_seq *oseq);
+int ofd_seqs_init(const struct lu_env *env, struct ofd_device *ofd);
+struct ofd_seq *ofd_seq_get(struct ofd_device *ofd, obd_seq seq);
+void ofd_seq_put(const struct lu_env *env, struct ofd_seq *oseq);
+
 int ofd_fs_setup(const struct lu_env *env, struct ofd_device *ofd,
                 struct obd_device *obd);
 void ofd_fs_cleanup(const struct lu_env *env, struct ofd_device *ofd);
 int ofd_precreate_batch(struct ofd_device *ofd, int batch);
 int ofd_fs_setup(const struct lu_env *env, struct ofd_device *ofd,
                 struct obd_device *obd);
 void ofd_fs_cleanup(const struct lu_env *env, struct ofd_device *ofd);
 int ofd_precreate_batch(struct ofd_device *ofd, int batch);
+struct ofd_seq *ofd_seq_load(const struct lu_env *env, struct ofd_device *ofd,
+                            obd_seq seq);
+void ofd_seqs_fini(const struct lu_env *env, struct ofd_device *ofd);
 
 /* ofd_io.c */
 int ofd_preprw(const struct lu_env *env,int cmd, struct obd_export *exp,
 
 /* ofd_io.c */
 int ofd_preprw(const struct lu_env *env,int cmd, struct obd_export *exp,
@@ -391,7 +405,7 @@ struct ofd_object *ofd_object_find_or_create(const struct lu_env *env,
                                             struct lu_attr *attr);
 int ofd_object_ff_check(const struct lu_env *env, struct ofd_object *fo);
 int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd,
                                             struct lu_attr *attr);
 int ofd_object_ff_check(const struct lu_env *env, struct ofd_object *fo);
 int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd,
-                         obd_id id, obd_seq group, int nr);
+                         obd_id id, struct ofd_seq *oseq, int nr);
 
 void ofd_object_put(const struct lu_env *env, struct ofd_object *fo);
 int ofd_attr_set(const struct lu_env *env, struct ofd_object *fo,
 
 void ofd_object_put(const struct lu_env *env, struct ofd_object *fo);
 int ofd_attr_set(const struct lu_env *env, struct ofd_object *fo,
index 489256a..83f58a4 100644 (file)
@@ -214,16 +214,29 @@ int ofd_preprw(const struct lu_env* env, int cmd, struct obd_export *exp,
        struct ofd_thread_info  *info;
        int                      rc = 0;
 
        struct ofd_thread_info  *info;
        int                      rc = 0;
 
-       if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOENT) &&
-           ofd->ofd_destroys_in_progress == 0) {
-               /* don't fail lookups for orphan recovery, it causes
-                * later LBUGs when objects still exist during precreate */
-               CDEBUG(D_INFO, "*** obd_fail_loc=%x ***\n",OBD_FAIL_OST_ENOENT);
-               RETURN(-ENOENT);
-       }
-
+       rc = lu_env_refill((struct lu_env *)env);
+       LASSERT(rc == 0);
        info = ofd_info_init(env, exp);
 
        info = ofd_info_init(env, exp);
 
+       if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOENT)) {
+               struct ofd_seq          *oseq;
+               oseq = ofd_seq_load(env, ofd, oa->o_seq);
+               if (oseq == NULL) {
+                       CERROR("%s: Can not find seq for "LPU64":"LPU64"\n",
+                               ofd_name(ofd), oa->o_seq, oa->o_id);
+                       RETURN(-EINVAL);
+               }
+
+               if (oseq->os_destroys_in_progress == 0) {
+                       /* don't fail lookups for orphan recovery, it causes
+                        * later LBUGs when objects still exist during
+                        * precreate */
+                       ofd_seq_put(env, oseq);
+                       RETURN(-ENOENT);
+               }
+               ofd_seq_put(env, oseq);
+       }
+
        LASSERT(objcount == 1);
        LASSERT(obj->ioo_bufcnt > 0);
 
        LASSERT(objcount == 1);
        LASSERT(obj->ioo_bufcnt > 0);
 
index ba34412..af49a50 100644 (file)
@@ -273,8 +273,8 @@ static int ofd_obd_connect(const struct lu_env *env, struct obd_export **_exp,
        struct obd_export       *exp;
        struct ofd_device       *ofd;
        struct lustre_handle     conn = { 0 };
        struct obd_export       *exp;
        struct ofd_device       *ofd;
        struct lustre_handle     conn = { 0 };
-       int                      rc, group;
-
+       int                      rc;
+       obd_seq                 seq;
        ENTRY;
 
        if (_exp == NULL || obd == NULL || cluuid == NULL)
        ENTRY;
 
        if (_exp == NULL || obd == NULL || cluuid == NULL)
@@ -301,7 +301,7 @@ static int ofd_obd_connect(const struct lu_env *env, struct obd_export **_exp,
        if (rc)
                GOTO(out, rc);
 
        if (rc)
                GOTO(out, rc);
 
-       group = data->ocd_group;
+       seq = data->ocd_group;
        if (obd->obd_replayable) {
                struct tg_export_data *ted = &exp->exp_target_data;
 
        if (obd->obd_replayable) {
                struct tg_export_data *ted = &exp->exp_target_data;
 
@@ -312,14 +312,24 @@ static int ofd_obd_connect(const struct lu_env *env, struct obd_export **_exp,
                        GOTO(out, rc);
                ofd_export_stats_init(ofd, exp, localdata);
        }
                        GOTO(out, rc);
                ofd_export_stats_init(ofd, exp, localdata);
        }
-       if (group == 0)
+       if (seq == 0)
                GOTO(out, rc = 0);
 
                GOTO(out, rc = 0);
 
-       /* init new group */
-       if (group > ofd->ofd_max_group) {
-               ofd->ofd_max_group = group;
-               rc = ofd_group_load(env, ofd, group);
+       /* init new seq */
+       if (seq > ofd->ofd_seq_count) {
+               struct ofd_seq *oseq;
+
+               ofd->ofd_seq_count = seq;
+               oseq = ofd_seq_load(env, ofd, seq);
+               if (IS_ERR(oseq)) {
+                       CERROR("%s: load oseq "LPX64" error: rc = %ld\n",
+                              ofd_name(ofd), oseq->os_seq, PTR_ERR(oseq));
+                       GOTO(out, rc = PTR_ERR(oseq));
+               } else {
+                       ofd_seq_put(env, oseq);
+               }
        }
        }
+
 out:
        if (rc != 0) {
                class_disconnect(exp);
 out:
        if (rc != 0) {
                class_disconnect(exp);
@@ -558,12 +568,18 @@ static int ofd_get_info(const struct lu_env *env, struct obd_export *exp,
                *vallen = sizeof(*blocksize_bits);
        } else if (KEY_IS(KEY_LAST_ID)) {
                obd_id *last_id = val;
                *vallen = sizeof(*blocksize_bits);
        } else if (KEY_IS(KEY_LAST_ID)) {
                obd_id *last_id = val;
+               struct ofd_seq *oseq;
+
+               oseq = ofd_seq_get(ofd, exp->exp_filter_data.fed_group);
+               LASSERT(oseq != NULL);
                if (last_id) {
                if (last_id) {
-                       if (*vallen < sizeof(*last_id))
+                       if (*vallen < sizeof(*last_id)) {
+                               ofd_seq_put(env, oseq);
                                RETURN(-EOVERFLOW);
                                RETURN(-EOVERFLOW);
-                       *last_id = ofd_last_id(ofd,
-                                              exp->exp_filter_data.fed_group);
+                       }
+                       *last_id = ofd_seq_last_oid(oseq);
                }
                }
+               ofd_seq_put(env, oseq);
                *vallen = sizeof(*last_id);
        } else if (KEY_IS(KEY_FIEMAP)) {
                struct ofd_thread_info          *info;
                *vallen = sizeof(*last_id);
        } else if (KEY_IS(KEY_FIEMAP)) {
                struct ofd_thread_info          *info;
@@ -1059,15 +1075,24 @@ static int ofd_orphans_destroy(const struct lu_env *env,
        int                      skip_orphan;
        int                      rc = 0;
        struct ost_id            oi = oa->o_oi;
        int                      skip_orphan;
        int                      rc = 0;
        struct ost_id            oi = oa->o_oi;
+       struct ofd_seq          *oseq;
 
        ENTRY;
 
 
        ENTRY;
 
+       oseq = ofd_seq_get(ofd, oa->o_seq);
+       if (oseq == NULL) {
+               CERROR("%s: Can not find seq for "LPU64":"LPU64"\n",
+                      ofd_name(ofd), oa->o_seq, oa->o_id);
+               RETURN(-EINVAL);
+       }
+
        LASSERT(exp != NULL);
        skip_orphan = !!(exp->exp_connect_flags & OBD_CONNECT_SKIP_ORPHAN);
 
        LASSERT(exp != NULL);
        skip_orphan = !!(exp->exp_connect_flags & OBD_CONNECT_SKIP_ORPHAN);
 
-       last = ofd_last_id(ofd, oa->o_seq);
-       LCONSOLE_INFO("%s: deleting orphan objects from "LPU64" to "LPU64"\n",
-                     ofd_obd(ofd)->obd_name, oa->o_id + 1, last);
+       last = ofd_seq_last_oid(oseq);
+       LCONSOLE_INFO("%s: deleting orphan objects from "LPX64":"LPU64
+                     " to "LPU64"\n", ofd_name(ofd), oa->o_seq,
+                     oa->o_id + 1, last);
 
        for (oi.oi_id = last; oi.oi_id > oa->o_id; oi.oi_id--) {
                fid_ostid_unpack(&info->fti_fid, &oi, 0);
 
        for (oi.oi_id = last; oi.oi_id > oa->o_id; oi.oi_id--) {
                fid_ostid_unpack(&info->fti_fid, &oi, 0);
@@ -1076,23 +1101,24 @@ static int ofd_orphans_destroy(const struct lu_env *env,
                        CEMERG("error destroying precreated id "LPU64": %d\n",
                               oi.oi_id, rc);
                if (!skip_orphan) {
                        CEMERG("error destroying precreated id "LPU64": %d\n",
                               oi.oi_id, rc);
                if (!skip_orphan) {
-                       ofd_last_id_set(ofd, oi.oi_id - 1, oa->o_seq);
+                       ofd_seq_last_oid_set(oseq, oi.oi_id - 1);
                        /* update last_id on disk periodically so that if we
                         * restart * we don't need to re-scan all of the just
                         * deleted objects. */
                        if ((oi.oi_id & 511) == 0)
                        /* update last_id on disk periodically so that if we
                         * restart * we don't need to re-scan all of the just
                         * deleted objects. */
                        if ((oi.oi_id & 511) == 0)
-                               ofd_last_id_write(env, ofd, oa->o_seq);
+                               ofd_seq_last_oid_write(env, ofd, oseq);
                }
        }
        CDEBUG(D_HA, "%s: after destroy: set last_objids["LPU64"] = "LPU64"\n",
               ofd_obd(ofd)->obd_name, oa->o_seq, oa->o_id);
        if (!skip_orphan) {
                }
        }
        CDEBUG(D_HA, "%s: after destroy: set last_objids["LPU64"] = "LPU64"\n",
               ofd_obd(ofd)->obd_name, oa->o_seq, oa->o_id);
        if (!skip_orphan) {
-               rc = ofd_last_id_write(env, ofd, oa->o_seq);
+               rc = ofd_seq_last_oid_write(env, ofd, oseq);
        } else {
                /* don't reuse orphan object, return last used objid */
                oa->o_id = last;
                rc = 0;
        }
        } else {
                /* don't reuse orphan object, return last used objid */
                oa->o_id = last;
                rc = 0;
        }
+       ofd_seq_put(env, oseq);
        RETURN(rc);
 }
 
        RETURN(rc);
 }
 
@@ -1102,6 +1128,8 @@ int ofd_create(const struct lu_env *env, struct obd_export *exp,
 {
        struct ofd_device       *ofd = ofd_exp(exp);
        struct ofd_thread_info  *info;
 {
        struct ofd_device       *ofd = ofd_exp(exp);
        struct ofd_thread_info  *info;
+       obd_seq                 seq = oa->o_seq;
+       struct ofd_seq          *oseq;
        int                      rc = 0, diff;
 
        ENTRY;
        int                      rc = 0, diff;
 
        ENTRY;
@@ -1113,14 +1141,20 @@ int ofd_create(const struct lu_env *env, struct obd_export *exp,
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
 
        CDEBUG(D_INFO, "ofd_create(oa->o_seq="LPU64",oa->o_id="LPU64")\n",
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
 
        CDEBUG(D_INFO, "ofd_create(oa->o_seq="LPU64",oa->o_id="LPU64")\n",
-              oa->o_seq, oa->o_id);
+              seq, oa->o_id);
+
+       oseq = ofd_seq_get(ofd, seq);
+       if (oseq == NULL) {
+               CERROR("%s: Can't find oseq "LPX64"\n", ofd_name(ofd), seq);
+               RETURN(-EINVAL);
+       }
 
        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            (oa->o_flags & OBD_FL_RECREATE_OBJS)) {
                if (!ofd_obd(ofd)->obd_recovering ||
 
        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            (oa->o_flags & OBD_FL_RECREATE_OBJS)) {
                if (!ofd_obd(ofd)->obd_recovering ||
-                   oa->o_id > ofd_last_id(ofd, oa->o_seq)) {
+                   oa->o_id > ofd_seq_last_oid(oseq)) {
                        CERROR("recreate objid "LPU64" > last id "LPU64"\n",
                        CERROR("recreate objid "LPU64" > last id "LPU64"\n",
-                                       oa->o_id, ofd_last_id(ofd, oa->o_seq));
+                                       oa->o_id, ofd_seq_last_oid(oseq));
                        GOTO(out_nolock, rc = -EINVAL);
                }
                /* do nothing because we create objects during first write */
                        GOTO(out_nolock, rc = -EINVAL);
                }
                /* do nothing because we create objects during first write */
@@ -1132,42 +1166,42 @@ int ofd_create(const struct lu_env *env, struct obd_export *exp,
                /* destroy orphans */
                if (oti->oti_conn_cnt < exp->exp_conn_cnt) {
                        CERROR("%s: dropping old orphan cleanup request\n",
                /* destroy orphans */
                if (oti->oti_conn_cnt < exp->exp_conn_cnt) {
                        CERROR("%s: dropping old orphan cleanup request\n",
-                              ofd_obd(ofd)->obd_name);
+                              ofd_name(ofd));
                        GOTO(out_nolock, rc = 0);
                }
                /* This causes inflight precreates to abort and drop lock */
                        GOTO(out_nolock, rc = 0);
                }
                /* This causes inflight precreates to abort and drop lock */
-               set_bit(oa->o_seq, &ofd->ofd_destroys_in_progress);
-               mutex_lock(&ofd->ofd_create_locks[oa->o_seq]);
-               if (!test_bit(oa->o_seq, &ofd->ofd_destroys_in_progress)) {
+               oseq->os_destroys_in_progress = 1;
+               mutex_lock(&oseq->os_create_lock);
+               if (!oseq->os_destroys_in_progress) {
                        CERROR("%s:["LPU64"] destroys_in_progress already cleared\n",
                               exp->exp_obd->obd_name, oa->o_seq);
                        GOTO(out, rc = 0);
                }
                        CERROR("%s:["LPU64"] destroys_in_progress already cleared\n",
                               exp->exp_obd->obd_name, oa->o_seq);
                        GOTO(out, rc = 0);
                }
-               diff = oa->o_id - ofd_last_id(ofd, oa->o_seq);
+               diff = oa->o_id - ofd_seq_last_oid(oseq);
                CDEBUG(D_HA, "ofd_last_id() = "LPU64" -> diff = %d\n",
                CDEBUG(D_HA, "ofd_last_id() = "LPU64" -> diff = %d\n",
-                      ofd_last_id(ofd, oa->o_seq), diff);
+                       ofd_seq_last_oid(oseq), diff);
                if (-diff > OST_MAX_PRECREATE) {
                        /* FIXME: should reset precreate_next_id on MDS */
                        rc = 0;
                } else if (diff < 0) {
                        rc = ofd_orphans_destroy(env, exp, ofd, oa);
                if (-diff > OST_MAX_PRECREATE) {
                        /* FIXME: should reset precreate_next_id on MDS */
                        rc = 0;
                } else if (diff < 0) {
                        rc = ofd_orphans_destroy(env, exp, ofd, oa);
-                       clear_bit(oa->o_seq, &ofd->ofd_destroys_in_progress);
+                       oseq->os_destroys_in_progress = 0;
                } else {
                        /* XXX: Used by MDS for the first time! */
                } else {
                        /* XXX: Used by MDS for the first time! */
-                       clear_bit(oa->o_seq, &ofd->ofd_destroys_in_progress);
+                       oseq->os_destroys_in_progress = 0;
                }
        } else {
                }
        } else {
-               mutex_lock(&ofd->ofd_create_locks[oa->o_seq]);
+               mutex_lock(&oseq->os_create_lock);
                if (oti->oti_conn_cnt < exp->exp_conn_cnt) {
                        CERROR("%s: dropping old precreate request\n",
                if (oti->oti_conn_cnt < exp->exp_conn_cnt) {
                        CERROR("%s: dropping old precreate request\n",
-                              ofd_obd(ofd)->obd_name);
+                               ofd_obd(ofd)->obd_name);
                        GOTO(out, rc = 0);
                }
                        GOTO(out, rc = 0);
                }
-               /* only precreate if group == 0 and o_id is specfied */
+               /* only precreate if seq == 0 and o_id is specfied */
                if (!fid_seq_is_mdt(oa->o_seq) || oa->o_id == 0) {
                        diff = 1; /* shouldn't we create this right now? */
                } else {
                if (!fid_seq_is_mdt(oa->o_seq) || oa->o_id == 0) {
                        diff = 1; /* shouldn't we create this right now? */
                } else {
-                       diff = oa->o_id - ofd_last_id(ofd, oa->o_seq);
+                       diff = oa->o_id - ofd_seq_last_oid(oseq);
                }
        }
        if (diff > 0) {
                }
        }
        if (diff > 0) {
@@ -1185,13 +1219,13 @@ int ofd_create(const struct lu_env *env, struct obd_export *exp,
                        if (rc) {
                                CDEBUG(D_HA, "%s: failed to acquire grant space"
                                       "for precreate (%d)\n",
                        if (rc) {
                                CDEBUG(D_HA, "%s: failed to acquire grant space"
                                       "for precreate (%d)\n",
-                                      ofd_obd(ofd)->obd_name, diff);
+                                      ofd_name(ofd), diff);
                                diff = 0;
                        }
                }
 
                while (diff > 0) {
                                diff = 0;
                        }
                }
 
                while (diff > 0) {
-                       next_id = ofd_last_id(ofd, oa->o_seq) + 1;
+                       next_id = ofd_seq_last_oid(oseq) + 1;
                        count = ofd_precreate_batch(ofd, diff);
 
                        CDEBUG(D_HA, "%s: reserve %d objects in group "LPU64
                        count = ofd_precreate_batch(ofd, diff);
 
                        CDEBUG(D_HA, "%s: reserve %d objects in group "LPU64
@@ -1208,7 +1242,7 @@ int ofd_create(const struct lu_env *env, struct obd_export *exp,
                        }
 
                        rc = ofd_precreate_objects(env, ofd, next_id,
                        }
 
                        rc = ofd_precreate_objects(env, ofd, next_id,
-                                                  oa->o_seq, count);
+                                                  oseq, count);
                        if (rc > 0) {
                                created += rc;
                                diff -= rc;
                        if (rc > 0) {
                                created += rc;
                                diff -= rc;
@@ -1216,16 +1250,14 @@ int ofd_create(const struct lu_env *env, struct obd_export *exp,
                                break;
                        }
                }
                                break;
                        }
                }
-               if (created > 0) {
+               if (created > 0)
                        /* some objects got created, we can return
                         * them, even if last creation failed */
                        /* some objects got created, we can return
                         * them, even if last creation failed */
-                       oa->o_id = ofd_last_id(ofd, oa->o_seq);
                        rc = 0;
                        rc = 0;
-               } else {
+               else
                        CERROR("unable to precreate: %d\n", rc);
                        CERROR("unable to precreate: %d\n", rc);
-                       oa->o_id = ofd_last_id(ofd, oa->o_seq);
-               }
 
 
+               oa->o_id = ofd_seq_last_oid(oseq);
                oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP;
 
                if (!(oa->o_valid & OBD_MD_FLFLAGS) ||
                oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP;
 
                if (!(oa->o_valid & OBD_MD_FLFLAGS) ||
@@ -1236,14 +1268,15 @@ int ofd_create(const struct lu_env *env, struct obd_export *exp,
 
        ofd_info2oti(info, oti);
 out:
 
        ofd_info2oti(info, oti);
 out:
-       mutex_unlock(&ofd->ofd_create_locks[oa->o_seq]);
+       mutex_unlock(&oseq->os_create_lock);
 out_nolock:
        if (rc == 0 && ea != NULL) {
                struct lov_stripe_md *lsm = *ea;
 
                lsm->lsm_object_id = oa->o_id;
        }
 out_nolock:
        if (rc == 0 && ea != NULL) {
                struct lov_stripe_md *lsm = *ea;
 
                lsm->lsm_object_id = oa->o_id;
        }
-       return rc;
+       ofd_seq_put(env, oseq);
+       RETURN(rc);
 }
 
 int ofd_getattr(const struct lu_env *env, struct obd_export *exp,
 }
 
 int ofd_getattr(const struct lu_env *env, struct obd_export *exp,
index c8fe60c..1dd1abb 100644 (file)
@@ -146,7 +146,7 @@ void ofd_object_put(const struct lu_env *env, struct ofd_object *fo)
 }
 
 int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd,
 }
 
 int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd,
-                         obd_id id, obd_seq seq, int nr)
+                         obd_id id, struct ofd_seq *oseq, int nr)
 {
        struct ofd_thread_info  *info = ofd_info(env);
        struct ofd_object       *fo = NULL;
 {
        struct ofd_thread_info  *info = ofd_info(env);
        struct ofd_object       *fo = NULL;
@@ -162,14 +162,14 @@ int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd,
        ENTRY;
 
        /* Don't create objects beyond the valid range for this SEQ */
        ENTRY;
 
        /* Don't create objects beyond the valid range for this SEQ */
-       if (unlikely(fid_seq_is_mdt0(seq) && (id + nr) >= IDIF_MAX_OID)) {
+       if (unlikely(fid_seq_is_mdt0(oseq->os_seq) && (id + nr) >= IDIF_MAX_OID)) {
                CERROR("%s:"POSTID" hit the IDIF_MAX_OID (1<<48)!\n",
                CERROR("%s:"POSTID" hit the IDIF_MAX_OID (1<<48)!\n",
-                      ofd_name(ofd), id, seq);
+                      ofd_name(ofd), id, oseq->os_seq);
                RETURN(rc = -ENOSPC);
                RETURN(rc = -ENOSPC);
-       } else if (unlikely(!fid_seq_is_mdt0(seq) &&
-                  (id + nr) >= OBIF_MAX_OID)) {
+       } else if (unlikely(!fid_seq_is_mdt0(oseq->os_seq) &&
+                           (id + nr) >= OBIF_MAX_OID)) {
                CERROR("%s:"POSTID" hit the OBIF_MAX_OID (1<<32)!\n",
                CERROR("%s:"POSTID" hit the OBIF_MAX_OID (1<<32)!\n",
-                      ofd_name(ofd), id, seq);
+                      ofd_name(ofd), id, oseq->os_seq);
                RETURN(rc = -ENOSPC);
        }
 
                RETURN(rc = -ENOSPC);
        }
 
@@ -197,7 +197,7 @@ int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd,
        /* prepare objects */
        for (i = 0; i < nr; i++) {
                info->fti_ostid.oi_id = id + i;
        /* prepare objects */
        for (i = 0; i < nr; i++) {
                info->fti_ostid.oi_id = id + i;
-               info->fti_ostid.oi_seq = seq;
+               info->fti_ostid.oi_seq = oseq->os_seq;
 
                rc = fid_ostid_unpack(&info->fti_fid, &info->fti_ostid, 0);
                if (rc) {
 
                rc = fid_ostid_unpack(&info->fti_fid, &info->fti_ostid, 0);
                if (rc) {
@@ -228,8 +228,8 @@ int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd,
        if (IS_ERR(th))
                GOTO(out, rc = PTR_ERR(th));
 
        if (IS_ERR(th))
                GOTO(out, rc = PTR_ERR(th));
 
-       rc = dt_declare_record_write(env, ofd->ofd_lastid_obj[seq],
-                                    sizeof(tmp), info->fti_off, th);
+       rc = dt_declare_record_write(env, oseq->os_lastid_obj, sizeof(tmp),
+                                    info->fti_off, th);
        if (rc)
                GOTO(trans_stop, rc);
 
        if (rc)
                GOTO(trans_stop, rc);
 
@@ -239,8 +239,9 @@ int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd,
 
                if (unlikely(ofd_object_exists(fo))) {
                        /* object may exist being re-created by write replay */
 
                if (unlikely(ofd_object_exists(fo))) {
                        /* object may exist being re-created by write replay */
-                       CDEBUG(D_INODE, "object "LPD64"/"LPD64" exists: "
-                              DFID"\n", seq, id, PFID(&info->fti_fid));
+                       CDEBUG(D_INODE, "object "LPX64"/"LPX64" exists: "
+                              DFID"\n", oseq->os_seq, id,
+                              PFID(&info->fti_fid));
                        continue;
                }
 
                        continue;
                }
 
@@ -275,13 +276,13 @@ int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd,
                                break;
                        LASSERT(ofd_object_exists(fo));
                }
                                break;
                        LASSERT(ofd_object_exists(fo));
                }
-               ofd_last_id_set(ofd, id + i, seq);
+               ofd_seq_last_oid_set(oseq, id + i);
        }
 
        objects = i;
        if (objects > 0) {
        }
 
        objects = i;
        if (objects > 0) {
-               tmp = cpu_to_le64(ofd_last_id(ofd, seq));
-               rc = dt_record_write(env, ofd->ofd_lastid_obj[seq],
+               tmp = cpu_to_le64(ofd_seq_last_oid(oseq));
+               rc = dt_record_write(env, oseq->os_lastid_obj,
                                     &info->fti_buf, &info->fti_off, th);
        }
 trans_stop:
                                     &info->fti_buf, &info->fti_off, th);
        }
 trans_stop: