int rc;
ENTRY;
- if (ss == NULL)
- RETURN(1);
+ LASSERT(ss != NULL);
+ LASSERT(ss->ss_server_fld != NULL);
rc = osd_fld_lookup(env, osd, seq, range);
if (rc != 0) {
}
static int osd_remote_fid(const struct lu_env *env, struct osd_device *osd,
- struct lu_fid *fid)
+ const struct lu_fid *fid)
{
+ struct seq_server_site *ss = osd_seq_site(osd);
ENTRY;
- if (!fid_is_norm(fid) && !fid_is_root(fid))
+ /* FID seqs not in FLDB, must be local seq */
+ if (unlikely(!fid_seq_in_fldb(fid_seq(fid))))
+ RETURN(0);
+
+ /* If FLD is not being initialized yet, it only happens during the
+ * initialization, likely during mgs initialization, and we assume
+ * this is local FID. */
+ if (ss == NULL || ss->ss_server_fld == NULL)
RETURN(0);
+ /* Only check the local FLDB here */
if (osd_seq_exists(env, osd, fid_seq(fid)))
RETURN(0);
struct osd_thread_info *oti = osd_oti_get(env);
struct osd_object *parent = osd_dt_obj(dt);
struct osd_device *osd = osd_obj2dev(parent);
- struct lu_fid *fid = (struct lu_fid *)rec;
+ struct dt_insert_rec *rec1 = (struct dt_insert_rec *)rec;
+ const struct lu_fid *fid = rec1->rec_fid;
struct osd_thandle *oh;
struct osd_object *child = NULL;
__u32 attr;
if (unlikely(rc == 1)) {
/* Insert remote entry */
memset(&oti->oti_zde.lzd_reg, 0, sizeof(oti->oti_zde.lzd_reg));
- oti->oti_zde.lzd_reg.zde_type = IFTODT(S_IFDIR & S_IFMT);
+ oti->oti_zde.lzd_reg.zde_type = IFTODT(rec1->rec_type & S_IFMT);
} else {
/*
* To simulate old Orion setups with ./.. stored in the
LASSERT(lde);
- lde->lde_hash = cpu_to_le64(udmu_zap_cursor_serialize(it->ozi_zc));
-
- if ((rc = -zap_cursor_retrieve(it->ozi_zc, za)))
+ rc = -zap_cursor_retrieve(it->ozi_zc, za);
+ if (unlikely(rc != 0))
GOTO(out, rc);
+ lde->lde_hash = cpu_to_le64(udmu_zap_cursor_serialize(it->ozi_zc));
namelen = strlen(za->za_name);
if (namelen > NAME_MAX)
GOTO(out, rc = -EOVERFLOW);
RETURN(rc);
}
+/* Return the size in bytes of the lu_dirent record for the entry at the
+ * current iterator position, without advancing the ZAP cursor.
+ *
+ * Positions <= 1 and == 2 are the synthetic "." and ".." entries (name
+ * lengths 1 and 2); all other entries are fetched from the ZAP cursor.
+ *
+ * \retval >0	record size computed by lu_dirent_calc_size()
+ * \retval -ve	error (-EOVERFLOW on over-long name, -EIO on bad format)
+ */
+static int osd_dir_it_rec_size(const struct lu_env *env, const struct dt_it *di,
+				__u32 attr)
+{
+	struct osd_zap_it *it = (struct osd_zap_it *)di;
+	zap_attribute_t	 *za = &osd_oti_get(env)->oti_za;
+	int		  rc, namelen = 0;
+	ENTRY;
+
+	/* ozi_pos <= 1 -> "." entry; ozi_pos == 2 -> ".." entry */
+	if (it->ozi_pos <= 1)
+		namelen = 1;
+	else if (it->ozi_pos == 2)
+		namelen = 2;
+
+	if (namelen > 0) {
+		rc = lu_dirent_calc_size(namelen, attr);
+		RETURN(rc);
+	}
+
+	rc = -zap_cursor_retrieve(it->ozi_zc, za);
+	if (unlikely(rc != 0))
+		RETURN(rc);
+
+	/* entries must be arrays of 8-byte integers with >= 3 elements,
+	 * matching the format checked on the lookup path */
+	if (za->za_integer_length != 8 || za->za_num_integers < 3) {
+		CERROR("%s: unsupported direntry format: %d %d\n",
+		       osd_obj2dev(it->ozi_obj)->od_svname,
+		       za->za_integer_length, (int)za->za_num_integers);
+		RETURN(-EIO);
+	}
+
+	namelen = strlen(za->za_name);
+	if (namelen > NAME_MAX)
+		RETURN(-EOVERFLOW);
+
+	rc = lu_dirent_calc_size(namelen, attr);
+
+	RETURN(rc);
+}
+
static __u64 osd_dir_it_store(const struct lu_env *env, const struct dt_it *di)
{
struct osd_zap_it *it = (struct osd_zap_it *)di;
.key = osd_dir_it_key,
.key_size = osd_dir_it_key_size,
.rec = osd_dir_it_rec,
+ .rec_size = osd_dir_it_rec_size,
.store = osd_dir_it_store,
.load = osd_dir_it_load
}
* Primitives for index files using binary keys.
*/
-static int osd_prepare_key(struct osd_object *o, __u64 *dst,
- const struct dt_key *src)
+/* key integer_size is 8 */
+static int osd_prepare_key_uint64(struct osd_object *o, __u64 *dst,
+ const struct dt_key *src)
{
int size;
memset(dst + o->oo_keysize, 0, size - o->oo_keysize);
memcpy(dst, (const char *)src, o->oo_keysize);
- return size;
+ return (size/sizeof(__u64));
}
static int osd_index_lookup(const struct lu_env *env, struct dt_object *dt,
int rc;
ENTRY;
- rc = osd_prepare_key(obj, k, key);
+ rc = osd_prepare_key_uint64(obj, k, key);
rc = -zap_lookup_uint64(osd->od_objset.os, obj->oo_db->db_object,
k, rc, obj->oo_recusize, obj->oo_recsize,
oh = container_of0(th, struct osd_thandle, ot_super);
- rc = osd_prepare_key(obj, k, key);
+ rc = osd_prepare_key_uint64(obj, k, key);
/* Insert (key,oid) into ZAP */
rc = -zap_add_uint64(osd->od_objset.os, obj->oo_db->db_object,
LASSERT(th != NULL);
oh = container_of0(th, struct osd_thandle, ot_super);
- rc = osd_prepare_key(obj, k, key);
+ rc = osd_prepare_key_uint64(obj, k, key);
/* Remove binary key from the ZAP */
rc = -zap_remove_uint64(osd->od_objset.os, obj->oo_db->db_object,
* XXX: we need a binary version of zap_cursor_move_to_key()
* to implement this API */
if (*((const __u64 *)key) != 0)
- CERROR("NOT IMPLEMETED YET (move to %Lx)\n", *((__u64 *)key));
+ CERROR("NOT IMPLEMETED YET (move to "LPX64")\n",
+ *((__u64 *)key));
zap_cursor_fini(it->ozi_zc);
memset(it->ozi_zc, 0, sizeof(*it->ozi_zc));
if (rc)
RETURN(rc);
- rc = osd_prepare_key(obj, k, (const struct dt_key *)za->za_name);
+ rc = osd_prepare_key_uint64(obj, k, (const struct dt_key *)za->za_name);
rc = -zap_lookup_uint64(osd->od_objset.os, obj->oo_db->db_object,
k, rc, obj->oo_recusize, obj->oo_recsize,
}
};
+/* Iterator state for walking all allocated dnodes of the device via the
+ * ZFS meta-dnode; backs the dt_otable_features index (OI scrub-style
+ * whole-device object iteration). */
+struct osd_metadnode_it {
+	struct osd_device	*mit_dev;	/* device being iterated */
+	__u64			 mit_pos;	/* current dnode number */
+	struct lu_fid		 mit_fid;	/* FID of the current object,
+						 * taken from its LMA xattr */
+	int			 mit_prefetched; /* dnodes prefetched ahead */
+	__u64			 mit_prefetched_dnode; /* last dnode for which
+							* prefetch was issued */
+};
+
+/* Allocate a meta-dnode iterator for @dt's device.
+ *
+ * Forces a TXG sync first: dmu_object_next() does not find dnodes
+ * allocated in the current non-committed TXG, so without the sync
+ * freshly-created objects would be missed.
+ *
+ * \retval iterator handle, or ERR_PTR(-ENOMEM) on allocation failure
+ */
+static struct dt_it *osd_zfs_otable_it_init(const struct lu_env *env,
+					struct dt_object *dt, __u32 attr,
+					struct lustre_capa *capa)
+{
+	struct osd_device	*dev  = osd_dev(dt->do_lu.lo_dev);
+	struct osd_metadnode_it *it;
+	ENTRY;
+
+	OBD_ALLOC_PTR(it);
+	if (unlikely(it == NULL))
+		RETURN(ERR_PTR(-ENOMEM));
+
+	it->mit_dev = dev;
+
+	/* XXX: dmu_object_next() does NOT find dnodes allocated
+	 *      in the current non-committed txg, so we force txg
+	 *      commit to find all existing dnodes ... */
+	txg_wait_synced(dmu_objset_pool(dev->od_objset.os), 0ULL);
+
+	RETURN((struct dt_it *)it);
+}
+
+/* Release the iterator allocated by osd_zfs_otable_it_init(). */
+static void osd_zfs_otable_it_fini(const struct lu_env *env, struct dt_it *di)
+{
+	struct osd_metadnode_it *it  = (struct osd_metadnode_it *)di;
+
+	OBD_FREE_PTR(it);
+}
+
+/* No-op: positioning by key is not supported for the meta-dnode iterator;
+ * the position is restored through osd_zfs_otable_it_load() instead. */
+static int osd_zfs_otable_it_get(const struct lu_env *env,
+				 struct dt_it *di, const struct dt_key *key)
+{
+	return 0;
+}
+
+/* No-op: ->get() takes no reference, so there is nothing to release. */
+static void osd_zfs_otable_it_put(const struct lu_env *env, struct dt_it *di)
+{
+}
+
+/* size of the dnode prefetch window kept ahead of the iterator */
+#define OTABLE_PREFETCH		256
+
+/* Keep up to OTABLE_PREFETCH dnodes prefetched ahead of the current
+ * iterator position; the window is refilled only once it has drained
+ * below half to amortize the dmu_object_next() walks. */
+static void osd_zfs_otable_prefetch(const struct lu_env *env,
+				    struct osd_metadnode_it *it)
+{
+	struct osd_device	*dev = it->mit_dev;
+	udmu_objset_t		*uos = &dev->od_objset;
+	int			 rc;
+
+	/* can go negative on the very first access to the iterator
+	 * or if some non-Lustre objects were found */
+	if (unlikely(it->mit_prefetched < 0))
+		it->mit_prefetched = 0;
+
+	if (it->mit_prefetched >= (OTABLE_PREFETCH >> 1))
+		return;
+
+	if (it->mit_prefetched_dnode == 0)
+		it->mit_prefetched_dnode = it->mit_pos;
+
+	while (it->mit_prefetched < OTABLE_PREFETCH) {
+		rc = -dmu_object_next(uos->os, &it->mit_prefetched_dnode,
+				      B_FALSE, 0);
+		if (unlikely(rc != 0))
+			break;
+
+		/* dmu_prefetch() was exported in 0.6.2, if you use with
+		 * an older release, just comment it out - this is an
+		 * optimization */
+		dmu_prefetch(uos->os, it->mit_prefetched_dnode, 0, 0);
+
+		it->mit_prefetched++;
+	}
+}
+
+/* Advance the iterator to the next Lustre-owned dnode.
+ *
+ * Walks the meta-dnode with dmu_object_next(), skipping dnodes that carry
+ * no LMA xattr (i.e. not Lustre objects), and caches the object's self
+ * FID (from the LMA) in it->mit_fid for ->rec() to return.
+ *
+ * \retval 0	positioned on the next Lustre object (more items follow)
+ * \retval 1	no more objects - end of iteration
+ */
+static int osd_zfs_otable_it_next(const struct lu_env *env, struct dt_it *di)
+{
+	struct osd_metadnode_it *it = (struct osd_metadnode_it *)di;
+	struct lustre_mdt_attrs *lma;
+	struct osd_device	*dev = it->mit_dev;
+	udmu_objset_t		*uos = &dev->od_objset;
+	nvlist_t		*nvbuf = NULL;
+	uchar_t			*v;
+	__u64			 dnode;
+	int			 rc, s;
+
+	memset(&it->mit_fid, 0, sizeof(it->mit_fid));
+
+	dnode = it->mit_pos;
+	do {
+		rc = -dmu_object_next(uos->os, &it->mit_pos, B_FALSE, 0);
+		if (unlikely(rc != 0))
+			GOTO(out, rc = 1);
+		it->mit_prefetched--;
+
+		/* LMA is required for this to be a Lustre object.
+		 * If there is no xattr skip it. */
+		rc = __osd_xattr_load(uos, it->mit_pos, &nvbuf);
+		if (unlikely(rc != 0))
+			continue;
+
+		LASSERT(nvbuf != NULL);
+		/* NOTE(review): nvlist_lookup_byte_array() takes uint_t *
+		 * for the size while 's' is int - presumably same width on
+		 * all supported platforms, but verify */
+		rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA, &v, &s);
+		if (likely(rc == 0)) {
+			/* Lustre object */
+			lma = (struct lustre_mdt_attrs *)v;
+			lustre_lma_swab(lma);
+			it->mit_fid = lma->lma_self_fid;
+			nvlist_free(nvbuf);
+			break;
+		} else {
+			/* not a Lustre object, try next one */
+			nvlist_free(nvbuf);
+		}
+
+	} while (1);
+
+	/* we aren't prefetching in the above loop because the number of
+	 * non-Lustre objects is very small and we will be repeating very
+	 * rare. in case we want to use this to iterate over non-Lustre
+	 * objects (i.e. when we convert regular ZFS in Lustre) it makes
+	 * sense to initiate prefetching in the loop */
+
+	/* 0 - there are more items, +1 - the end */
+	if (likely(rc == 0))
+		osd_zfs_otable_prefetch(env, it);
+
+	/* use LPU64 for __u64, matching the LPX64 convention used
+	 * elsewhere in this file, instead of a raw %llu */
+	CDEBUG(D_OTHER, "advance: "LPU64" -> "LPU64" "DFID": %d\n", dnode,
+	       it->mit_pos, PFID(&it->mit_fid), rc);
+
+out:
+	return rc;
+}
+
+/* Keys are not exposed by the meta-dnode iterator; callers use
+ * ->store()/->load() cookies instead. */
+static struct dt_key *osd_zfs_otable_it_key(const struct lu_env *env,
+					    const struct dt_it *di)
+{
+	return NULL;
+}
+
+/* Nominal key size: the iterator position is a dnode number (__u64),
+ * the same value returned by ->store(). */
+static int osd_zfs_otable_it_key_size(const struct lu_env *env,
+				      const struct dt_it *di)
+{
+	return sizeof(__u64);
+}
+
+/* Copy the FID of the current object (cached by ->next() from the LMA
+ * xattr) into @rec, which is interpreted as a struct lu_fid. */
+static int osd_zfs_otable_it_rec(const struct lu_env *env,
+				 const struct dt_it *di,
+				 struct dt_rec *rec, __u32 attr)
+{
+	struct osd_metadnode_it *it = (struct osd_metadnode_it *)di;
+	struct lu_fid *fid = (struct lu_fid *)rec;
+	ENTRY;
+
+	*fid = it->mit_fid;
+
+	RETURN(0);
+}
+
+
+/* Return the current dnode number as a resumption cookie for ->load(). */
+static __u64 osd_zfs_otable_it_store(const struct lu_env *env,
+				     const struct dt_it *di)
+{
+	struct osd_metadnode_it *it = (struct osd_metadnode_it *)di;
+
+	return it->mit_pos;
+}
+
+/* Restore the iterator to the position saved by ->store(): reset the
+ * prefetch window and advance to the first Lustre object after @hash.
+ * Returns the result of osd_zfs_otable_it_next(): 0 if positioned on an
+ * object, +1 if the end was reached. */
+static int osd_zfs_otable_it_load(const struct lu_env *env,
+				  const struct dt_it *di, __u64 hash)
+{
+	struct osd_metadnode_it *it = (struct osd_metadnode_it *)di;
+
+	it->mit_pos = hash;
+	it->mit_prefetched = 0;
+	it->mit_prefetched_dnode = 0;
+
+	return osd_zfs_otable_it_next(env, (struct dt_it *)di);
+}
+
+/* No-op: combined key+rec retrieval is not provided for this iterator. */
+static int osd_zfs_otable_it_key_rec(const struct lu_env *env,
+				     const struct dt_it *di, void *key_rec)
+{
+	return 0;
+}
+
+/* Index operations installed by osd_index_try() for dt_otable_features:
+ * iterate every Lustre object on the device via the ZFS meta-dnode. */
+const struct dt_index_operations osd_zfs_otable_ops = {
+	.dio_it = {
+		.init     = osd_zfs_otable_it_init,
+		.fini     = osd_zfs_otable_it_fini,
+		.get      = osd_zfs_otable_it_get,
+		.put      = osd_zfs_otable_it_put,
+		.next     = osd_zfs_otable_it_next,
+		.key      = osd_zfs_otable_it_key,
+		.key_size = osd_zfs_otable_it_key_size,
+		.rec      = osd_zfs_otable_it_rec,
+		.store    = osd_zfs_otable_it_store,
+		.load     = osd_zfs_otable_it_load,
+		.key_rec  = osd_zfs_otable_it_key_rec,
+	}
+};
+
int osd_index_try(const struct lu_env *env, struct dt_object *dt,
const struct dt_index_features *feat)
{
if (feat->dif_flags & DT_IND_RANGE)
RETURN(-ERANGE);
- if (unlikely(feat == &dt_otable_features))
- /* do not support oi scrub yet. */
- RETURN(-ENOTSUPP);
+ if (unlikely(feat == &dt_otable_features)) {
+ dt->do_index_ops = &osd_zfs_otable_ops;
+ RETURN(0);
+ }
LASSERT(obj->oo_db != NULL);
if (likely(feat == &dt_directory_features)) {
RETURN(0);
}
-