struct lu_device *ldev = obj->oo_dt.do_lu.lo_dev;
struct osd_device *dev;
struct osd_inode_id *id;
- struct osd_oi *oi;
struct inode *inode;
int result;
info = osd_oti_get(env);
dev = osd_dev(ldev);
id = &info->oti_id;
- oi = &dev->od_oi;
if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOENT))
RETURN(-ENOENT);
- result = osd_oi_lookup(info, oi, fid, id);
+ result = osd_oi_lookup(info, osd_fid2oi(dev, fid), fid, id);
if (result != 0) {
if (result == -ENOENT)
result = 0;
id->oii_ino = obj->oo_inode->i_ino;
id->oii_gen = obj->oo_inode->i_generation;
- return osd_oi_insert(info, &osd->od_oi, fid, id, th,
+ return osd_oi_insert(info, osd_fid2oi(osd, fid), fid, id, th,
uc->mu_cap & CFS_CAP_SYS_RESOURCE_MASK);
}
OSD_EXEC_OP(th, destroy);
- result = osd_oi_delete(osd_oti_get(env), &osd->od_oi, fid, th);
+ result = osd_oi_delete(osd_oti_get(env),
+ osd_fid2oi(osd, fid), fid, th);
/* XXX: add to ext3 orphan list */
/* rc = ext3_orphan_add(handle_t *handle, struct inode *inode) */
lu_object_put(env, &o->od_obj_area->do_lu);
o->od_obj_area = NULL;
}
- osd_oi_fini(info, &o->od_oi);
+ if (o->od_oi_table != NULL)
+ osd_oi_fini(info, &o->od_oi_table, o->od_oi_count);
RETURN(0);
}
ENTRY;
/* 1. initialize oi before any file create or file open */
- result = osd_oi_init(oti, &osd->od_oi,
+ result = osd_oi_init(oti, &osd->od_oi_table,
&osd->od_dt_dev, lu2md_dev(pdev));
- if (result != 0)
+ if (result < 0)
RETURN(result);
+ LASSERT(result > 0);
+ osd->od_oi_count = result;
+
lmi = osd->od_mount;
lsi = s2lsi(lmi->lmi_sb);
ldd = lsi->lsi_ldd;
#include "osd_igif.h"
#include "dt_object.h"
-struct oi_descr {
- int fid_size;
- char *name;
- __u32 oid;
-};
+#define OSD_OI_FID_NR (1UL << OSD_OI_FID_OID_BITS)
+#define OSD_OI_FID_NR_MAX (1UL << OSD_OI_FID_OID_BITS_MAX)
+
+static unsigned int osd_oi_num = OSD_OI_FID_NR;
+CFS_MODULE_PARM(osd_oi_num, "i", int, 0444,
+ "Number of Object Index containers to be created, "
+ "it's only valid for new filesystem.");
/** to serialize concurrent OI index initialization */
static cfs_mutex_t oi_init_lock;
.dif_ptrsize = 4
};
-static const struct oi_descr oi_descr[OSD_OI_FID_NR] = {
- [OSD_OI_FID_16] = {
- .fid_size = sizeof(struct lu_fid),
- .name = "oi.16",
- .oid = OSD_OI_FID_16_OID
+#define OSD_OI_NAME_BASE "oi.16"
+
+/**
+ * Open an OI(Ojbect Index) container.
+ *
+ * \param name Name of OI container
+ * \param objp Pointer of returned OI
+ *
+ * \retval 0 success
+ * \retval -ve failure
+ */
+static int
+osd_oi_open(struct osd_thread_info *info,
+ struct dt_device *dev, char *name, struct dt_object **objp)
+{
+ const struct lu_env *env = info->oti_env;
+ struct dt_object *obj;
+ int rc;
+
+ obj = dt_store_open(env, dev, "", name, &info->oti_fid);
+ if (IS_ERR(obj))
+ return PTR_ERR(obj);
+
+ oi_feat.dif_keysize_min = sizeof(info->oti_fid);
+ oi_feat.dif_keysize_max = sizeof(info->oti_fid);
+
+ rc = obj->do_ops->do_index_try(env, obj, &oi_feat);
+ if (rc != 0) {
+ lu_object_put(info->oti_env, &obj->do_lu);
+ CERROR("%s: wrong index %s: rc = %d\n",
+ dev->dd_lu_dev.ld_obd->obd_name, name, rc);
+ return rc;
}
-};
-static int osd_oi_index_create(struct osd_thread_info *info,
+ *objp = obj;
+ return 0;
+}
+
+
+static void
+osd_oi_table_put(struct osd_thread_info *info,
+ struct osd_oi *oi_table, unsigned oi_count)
+{
+ int i;
+
+ for (i = 0; i < oi_count; i++) {
+ LASSERT(oi_table[i].oi_dir != NULL);
+
+ lu_object_put(info->oti_env, &oi_table[i].oi_dir->do_lu);
+ oi_table[i].oi_dir = NULL;
+ }
+}
+
+/**
+ * Open OI(Object Index) table.
+ * If \a oi_count is zero, which means caller doesn't know how many OIs there
+ * will be, this function can either return 0 for new filesystem, or number
+ * of OIs on existed filesystem.
+ *
+ * If \a oi_count is non-zero, which means caller does know number of OIs on
+ * filesystem, this function should return the exactly same number on
+ * success, or error code in failure.
+ *
+ * \param oi_count Number of expected OI containers
+ * \param try_all Try to open all OIs even see failures
+ *
+ * \retval +ve number of opened OI containers
+ * \retval 0 no OI containers found
+ * \retval -ve failure
+ */
+static int
+osd_oi_table_open(struct osd_thread_info *info, struct dt_device *dev,
+ struct osd_oi *oi_table, unsigned oi_count, int try_all)
+{
+ int count = 0;
+ int rc = 0;
+ int i;
+
+ /* NB: oi_count != 0 means that we have already created/known all OIs
+ * and have known exact number of OIs. */
+ LASSERT(oi_count <= OSD_OI_FID_NR_MAX);
+
+ for (i = 0; i < (oi_count != 0 ? oi_count : OSD_OI_FID_NR_MAX); i++) {
+ char name[12];
+
+ sprintf(name, "%s.%d", OSD_OI_NAME_BASE, i);
+ rc = osd_oi_open(info, dev, name, &oi_table[i].oi_dir);
+ if (rc == 0) {
+ count++;
+ continue;
+ }
+
+ if (try_all)
+ continue;
+
+ if (rc == -ENOENT && oi_count == 0)
+ return count;
+
+ CERROR("%s: can't open %s: rc = %d\n",
+ dev->dd_lu_dev.ld_obd->obd_name, name, rc);
+
+ if (oi_count > 0) {
+ CERROR("%s: expect to open total %d OI files.\n",
+ dev->dd_lu_dev.ld_obd->obd_name, oi_count);
+ }
+
+ break;
+ }
+
+ if (try_all)
+ return count;
+
+ if (rc < 0) {
+ osd_oi_table_put(info, oi_table, count);
+ return rc;
+ }
+
+ return count;
+}
+
+static int osd_oi_table_create(struct osd_thread_info *info,
struct dt_device *dev,
- struct md_device *mdev)
+ struct md_device *mdev, int oi_count)
{
const struct lu_env *env;
- struct lu_fid *oi_fid = &info->oti_fid;
struct md_object *mdo;
int i;
- int rc;
env = info->oti_env;
+ for (i = 0; i < oi_count; ++i) {
+ char name[12];
- for (i = rc = 0; i < OSD_OI_FID_NR && rc == 0; ++i) {
- char *name;
- name = oi_descr[i].name;
- lu_local_obj_fid(oi_fid, oi_descr[i].oid);
- oi_feat.dif_keysize_min = oi_descr[i].fid_size,
- oi_feat.dif_keysize_max = oi_descr[i].fid_size,
+ sprintf(name, "%s.%d", OSD_OI_NAME_BASE, i);
- mdo = llo_store_create_index(env, mdev, dev,
- "", name,
- oi_fid, &oi_feat);
+ lu_local_obj_fid(&info->oti_fid, OSD_OI_FID_OID_FIRST + i);
+ oi_feat.dif_keysize_min = sizeof(info->oti_fid);
+ oi_feat.dif_keysize_max = sizeof(info->oti_fid);
- if (IS_ERR(mdo))
+ mdo = llo_store_create_index(env, mdev, dev, "", name,
+ &info->oti_fid, &oi_feat);
+ if (IS_ERR(mdo)) {
+ CERROR("Failed to create OI[%d] on %s: %d\n",
+ i, dev->dd_lu_dev.ld_obd->obd_name,
+ (int)PTR_ERR(mdo));
RETURN(PTR_ERR(mdo));
+ }
lu_object_put(env, &mdo->mo_lu);
}
}
int osd_oi_init(struct osd_thread_info *info,
- struct osd_oi *oi,
+ struct osd_oi **oi_table,
struct dt_device *dev,
struct md_device *mdev)
{
- const struct lu_env *env;
+ struct osd_oi *oi;
int rc;
- int i;
- env = info->oti_env;
+ OBD_ALLOC(oi, sizeof(*oi) * OSD_OI_FID_NR_MAX);
+ if (oi == NULL)
+ return -ENOMEM;
+
cfs_mutex_lock(&oi_init_lock);
- memset(oi, 0, sizeof *oi);
-retry:
- for (i = rc = 0; i < OSD_OI_FID_NR && rc == 0; ++i) {
- const char *name;
- struct dt_object *obj;
-
- name = oi_descr[i].name;
- oi_feat.dif_keysize_min = oi_descr[i].fid_size,
- oi_feat.dif_keysize_max = oi_descr[i].fid_size,
-
- obj = dt_store_open(env, dev, "", name, &info->oti_fid);
- if (!IS_ERR(obj)) {
- rc = obj->do_ops->do_index_try(env, obj, &oi_feat);
- if (rc == 0) {
- LASSERT(obj->do_index_ops != NULL);
- oi->oi_dir = obj;
- } else {
- CERROR("Wrong index \"%s\": %d\n", name, rc);
- lu_object_put(env, &obj->do_lu);
- }
- } else {
- rc = PTR_ERR(obj);
- if (rc == -ENOENT) {
- rc = osd_oi_index_create(info, dev, mdev);
- if (!rc)
- goto retry;
- }
- CERROR("Cannot open \"%s\": %d\n", name, rc);
- }
+
+ rc = osd_oi_table_open(info, dev, oi, 0, 0);
+ if (rc != 0)
+ goto out;
+
+ rc = osd_oi_open(info, dev, OSD_OI_NAME_BASE, &oi[0].oi_dir);
+ if (rc == 0) { /* found single OI from old filesystem */
+ rc = 1;
+ goto out;
+ }
+
+ if (rc != -ENOENT) {
+ CERROR("%s: can't open %s: rc = %d\n",
+ dev->dd_lu_dev.ld_obd->obd_name, OSD_OI_NAME_BASE, rc);
+ goto out;
}
+
+ /* create OI objects */
+ rc = osd_oi_table_create(info, dev, mdev, osd_oi_num);
if (rc != 0)
- osd_oi_fini(info, oi);
+ goto out;
+
+ rc = osd_oi_table_open(info, dev, oi, osd_oi_num, 0);
+ LASSERT(rc == osd_oi_num || rc < 0);
+
+ out:
+ if (rc < 0)
+ OBD_FREE(oi, sizeof(*oi) * OSD_OI_FID_NR_MAX);
+ else
+ *oi_table = oi;
cfs_mutex_unlock(&oi_init_lock);
return rc;
}
-void osd_oi_fini(struct osd_thread_info *info, struct osd_oi *oi)
+void osd_oi_fini(struct osd_thread_info *info,
+ struct osd_oi **oi_table, unsigned oi_count)
{
- if (oi->oi_dir != NULL) {
- lu_object_put(info->oti_env, &oi->oi_dir->do_lu);
- oi->oi_dir = NULL;
- }
-}
+ struct osd_oi *oi = *oi_table;
-static inline int fid_is_oi_fid(const struct lu_fid *fid)
-{
- /* We need to filter-out oi obj's fid. As we can not store it, while
- * oi-index create operation.
- */
- return (unlikely(fid_seq(fid) == FID_SEQ_LOCAL_FILE &&
- fid_oid(fid) == OSD_OI_FID_16_OID));
+ osd_oi_table_put(info, oi, oi_count);
+
+ OBD_FREE(oi, sizeof(*oi) * OSD_OI_FID_NR_MAX);
+ *oi_table = NULL;
}
int osd_oi_lookup(struct osd_thread_info *info, struct osd_oi *oi,
struct dt_object *idx;
const struct dt_key *key;
- if (fid_is_oi_fid(fid))
+ if (!fid_is_norm(fid))
return -ENOENT;
idx = oi->oi_dir;
struct osd_inode_id *id;
const struct dt_key *key;
- if (osd_fid_is_igif(fid))
- return 0;
-
- if (fid_is_oi_fid(fid))
+ if (!fid_is_norm(fid))
return 0;
idx = oi->oi_dir;
struct dt_object *idx;
const struct dt_key *key;
- if (osd_fid_is_igif(fid))
+ if (!fid_is_norm(fid))
return 0;
idx = oi->oi_dir;
int osd_oi_mod_init()
{
+ if (osd_oi_num == 0 || osd_oi_num > OSD_OI_FID_NR_MAX)
+ osd_oi_num = OSD_OI_FID_NR;
+
cfs_mutex_init(&oi_init_lock);
return 0;
}