/*
* Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
- */
-/*
- * Copyright (c) 2012, 2013, Intel Corporation.
- * Use is subject to license terms.
+ *
+ * Copyright (c) 2012, 2015, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
uint64_t dirhash)
{
struct osd_device *d = osd_obj2dev(o);
- zap_cursor_init_serialized(zc, d->od_os, o->oo_db->db_object, dirhash);
+ osd_zap_cursor_init_serialized(zc, d->od_os,
+ o->oo_db->db_object, dirhash);
}
static inline int osd_obj_cursor_init(zap_cursor_t **zc, struct osd_object *o,
static struct dt_it *osd_index_it_init(const struct lu_env *env,
struct dt_object *dt,
- __u32 unused,
- struct lustre_capa *capa)
+ __u32 unused)
{
struct osd_thread_info *info = osd_oti_get(env);
struct osd_zap_it *it;
int rc;
ENTRY;
- /* XXX: check capa ? */
+ if (obj->oo_destroyed)
+ RETURN(ERR_PTR(-ENOENT));
LASSERT(lu_object_exists(lo));
LASSERT(obj->oo_db);
LASSERT(osd_object_is_zap(obj->oo_db));
LASSERT(info);
- it = &info->oti_it_zap;
+ OBD_SLAB_ALLOC_PTR_GFP(it, osd_zapit_cachep, GFP_NOFS);
+ if (it == NULL)
+ RETURN(ERR_PTR(-ENOMEM));
rc = osd_obj_cursor_init(&it->ozi_zc, obj, 0);
- if (rc != 0)
+ if (rc != 0) {
+ OBD_SLAB_FREE_PTR(it, osd_zapit_cachep);
RETURN(ERR_PTR(rc));
+ }
it->ozi_obj = obj;
- it->ozi_capa = capa;
it->ozi_reset = 1;
lu_object_get(lo);
static void osd_index_it_fini(const struct lu_env *env, struct dt_it *di)
{
- struct osd_zap_it *it = (struct osd_zap_it *)di;
- struct osd_object *obj;
+ struct osd_zap_it *it = (struct osd_zap_it *)di;
+ struct osd_object *obj;
ENTRY;
LASSERT(it);
osd_zap_cursor_fini(it->ozi_zc);
lu_object_put(env, &obj->oo_dt.do_lu);
+ OBD_SLAB_FREE_PTR(it, osd_zapit_cachep);
EXIT;
}
ent->lde_attrs = cpu_to_le32(ent->lde_attrs);
}
-/*
- * as we don't know FID, we can't use LU object, so this function
- * partially duplicate __osd_xattr_get() which is built around
- * LU-object and uses it to cache data like regular EA dnode, etc
+/**
+ * Get the object's FID from its LMA EA.
+ *
+ * The LMA is first looked up under XATTR_NAME_LMA in the nvlist loaded
+ * by __osd_xattr_load(). If either the load or the nvlist lookup yields
+ * -ENOENT, the id read from SA_ZPL_XATTR is passed to
+ * __osd_xattr_get_large() to read the LMA instead. In both cases the LMA
+ * ends up in the oti_buf of the osd_thread_info returned by
+ * osd_oti_get(), is byte-swapped with lustre_lma_swab() and has its
+ * lma_incompat flags checked against LMA_INCOMPAT_SUPP before
+ * lma_self_fid is copied to \a fid.
+ *
+ * \param[in] env pointer to the thread context
+ * \param[in] osd pointer to the OSD device
+ * \param[in] oid the object's local identifier
+ * \param[out] fid the buffer to hold the object's FID
+ *
+ * \retval 0 for success
+ * \retval -ERANGE if the LMA found in the nvlist is larger than oti_buf
+ * \retval -EIO if the LMA found is smaller than struct lustre_mdt_attrs
+ * \retval -EOPNOTSUPP if the LMA carries unsupported incompat flags or
+ * the OBD_FAIL_OSD_LMA_INCOMPAT fail check fires
+ * \retval negative error number on failure
+ *
+ * NOTE(review): the function opens with ENTRY but leaves through a plain
+ * return rather than RETURN(); confirm whether that is intended.
 */
-static int osd_find_parent_by_dnode(const struct lu_env *env,
- struct dt_object *o,
- struct lu_fid *fid)
+static int osd_get_fid_by_oid(const struct lu_env *env, struct osd_device *osd,
+ uint64_t oid, struct lu_fid *fid)
{
- struct osd_device *osd = osd_obj2dev(osd_dt_obj(o));
- struct lustre_mdt_attrs *lma;
+ struct objset *os = osd->od_os;
+ struct osd_thread_info *oti = osd_oti_get(env);
+ struct lustre_mdt_attrs *lma =
+ (struct lustre_mdt_attrs *)oti->oti_buf;
struct lu_buf buf;
- sa_handle_t *sa_hdl;
- nvlist_t *nvbuf = NULL;
- uchar_t *value;
- uint64_t dnode;
- int rc, size;
+ nvlist_t *sa_xattr = NULL;
+ sa_handle_t *sa_hdl = NULL;
+ uchar_t *nv_value = NULL;
+ uint64_t xattr = ZFS_NO_OBJECT;
+ int size = 0;
+ int rc;
ENTRY;
- /* first of all, get parent dnode from own attributes */
- LASSERT(osd_dt_obj(o)->oo_db);
- rc = -sa_handle_get(osd->od_os, osd_dt_obj(o)->oo_db->db_object,
- NULL, SA_HDL_PRIVATE, &sa_hdl);
- if (rc)
- RETURN(rc);
+ rc = __osd_xattr_load(osd, oid, &sa_xattr);
+ if (rc == -ENOENT)
+ goto regular;
- dnode = ZFS_NO_OBJECT;
- rc = -sa_lookup(sa_hdl, SA_ZPL_PARENT(osd), &dnode, 8);
- sa_handle_destroy(sa_hdl);
- if (rc)
- RETURN(rc);
+ if (rc != 0)
+ GOTO(out, rc);
- /* now get EA buffer */
- rc = __osd_xattr_load(osd, dnode, &nvbuf);
- if (rc)
- GOTO(regular, rc);
+ rc = -nvlist_lookup_byte_array(sa_xattr, XATTR_NAME_LMA, &nv_value,
+ &size);
+ if (rc == -ENOENT)
+ goto regular;
- /* XXX: if we get that far.. should we cache the result? */
+ if (rc != 0)
+ GOTO(out, rc);
- /* try to find LMA attribute */
- LASSERT(nvbuf != NULL);
- rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA, &value, &size);
- if (rc == 0 && size >= sizeof(*lma)) {
- lma = (struct lustre_mdt_attrs *)value;
- lustre_lma_swab(lma);
- *fid = lma->lma_self_fid;
- GOTO(out, rc = 0);
- }
+ if (unlikely(size > sizeof(oti->oti_buf)))
+ GOTO(out, rc = -ERANGE);
-regular:
- /* no LMA attribute in SA, let's try regular EA */
+ memcpy(lma, nv_value, size);
- /* first of all, get parent dnode storing regular EA */
- rc = -sa_handle_get(osd->od_os, dnode, NULL, SA_HDL_PRIVATE, &sa_hdl);
- if (rc)
+ goto found;
+
+regular:
+ rc = -sa_handle_get(os, oid, NULL, SA_HDL_PRIVATE, &sa_hdl);
+ if (rc != 0)
GOTO(out, rc);
- dnode = ZFS_NO_OBJECT;
- rc = -sa_lookup(sa_hdl, SA_ZPL_XATTR(osd), &dnode, 8);
+ rc = -sa_lookup(sa_hdl, SA_ZPL_XATTR(osd), &xattr, 8);
sa_handle_destroy(sa_hdl);
- if (rc)
+ if (rc != 0)
GOTO(out, rc);
- CLASSERT(sizeof(*lma) <= sizeof(osd_oti_get(env)->oti_buf));
- buf.lb_buf = osd_oti_get(env)->oti_buf;
- buf.lb_len = sizeof(osd_oti_get(env)->oti_buf);
-
- /* now try to find LMA */
- rc = __osd_xattr_get_large(env, osd, dnode, &buf,
+ buf.lb_buf = lma;
+ buf.lb_len = sizeof(oti->oti_buf);
+ rc = __osd_xattr_get_large(env, osd, xattr, &buf,
XATTR_NAME_LMA, &size);
- if (rc == 0 && size >= sizeof(*lma)) {
- lma = buf.lb_buf;
- lustre_lma_swab(lma);
- *fid = lma->lma_self_fid;
- GOTO(out, rc = 0);
- } else if (rc < 0) {
+ if (rc != 0)
GOTO(out, rc);
- } else {
+
+found:
+ if (size < sizeof(*lma))
GOTO(out, rc = -EIO);
+
+ lustre_lma_swab(lma);
+ if (unlikely((lma->lma_incompat & ~LMA_INCOMPAT_SUPP) ||
+ CFS_FAIL_CHECK(OBD_FAIL_OSD_LMA_INCOMPAT))) {
+ CWARN("%s: unsupported incompat LMA feature(s) %#x for "
+ "oid = "LPX64"\n", osd->od_svname,
+ lma->lma_incompat & ~LMA_INCOMPAT_SUPP, oid);
+ GOTO(out, rc = -EOPNOTSUPP);
+ } else {
+ *fid = lma->lma_self_fid;
+ GOTO(out, rc = 0);
}
out:
- if (nvbuf != NULL)
- nvlist_free(nvbuf);
+ if (sa_xattr != NULL)
+ nvlist_free(sa_xattr);
+ return rc;
+}
+
+/*
+ * Find the FID of the parent of object \a o.
+ *
+ * The parent's dnode number is read from the SA_ZPL_PARENT attribute of
+ * the object's own dnode, then osd_get_fid_by_oid() is called on that
+ * dnode number to fill in \a fid.
+ *
+ * As we don't know the FID, we can't use an LU object, so this path
+ * partially duplicates __osd_xattr_get(), which is built around the
+ * LU object and uses it to cache data like the regular EA dnode, etc.
+ *
+ * Returns 0 on success, otherwise the negated result of sa_handle_get()
+ * or sa_lookup(), or the result of osd_get_fid_by_oid().
+ */
+static int osd_find_parent_by_dnode(const struct lu_env *env,
+ struct dt_object *o,
+ struct lu_fid *fid)
+{
+ struct osd_device *osd = osd_obj2dev(osd_dt_obj(o));
+ sa_handle_t *sa_hdl;
+ uint64_t dnode = ZFS_NO_OBJECT;
+ int rc;
+ ENTRY;
+
+ /* First of all, get the parent dnode number from our own attributes. */
+ LASSERT(osd_dt_obj(o)->oo_db);
+ rc = -sa_handle_get(osd->od_os, osd_dt_obj(o)->oo_db->db_object,
+ NULL, SA_HDL_PRIVATE, &sa_hdl);
+ if (rc != 0)
+ RETURN(rc);
+
+ rc = -sa_lookup(sa_hdl, SA_ZPL_PARENT(osd), &dnode, 8);
+ sa_handle_destroy(sa_hdl);
+ if (rc == 0)
+ rc = osd_get_fid_by_oid(env, osd, dnode, fid);
+
RETURN(rc);
}
buf.lb_buf = osd_oti_get(env)->oti_buf;
buf.lb_len = sizeof(osd_oti_get(env)->oti_buf);
- rc = osd_xattr_get(env, o, &buf, XATTR_NAME_LINK, BYPASS_CAPA);
+ rc = osd_xattr_get(env, o, &buf, XATTR_NAME_LINK);
if (rc == -ERANGE) {
- rc = osd_xattr_get(env, o, &LU_BUF_NULL,
- XATTR_NAME_LINK, BYPASS_CAPA);
+ rc = osd_xattr_get(env, o, &LU_BUF_NULL, XATTR_NAME_LINK);
if (rc < 0)
RETURN(rc);
LASSERT(rc > 0);
if (buf.lb_buf == NULL)
RETURN(-ENOMEM);
buf.lb_len = rc;
- rc = osd_xattr_get(env, o, &buf, XATTR_NAME_LINK, BYPASS_CAPA);
+ rc = osd_xattr_get(env, o, &buf, XATTR_NAME_LINK);
}
if (rc < 0)
GOTO(out, rc);
}
static int osd_dir_lookup(const struct lu_env *env, struct dt_object *dt,
- struct dt_rec *rec, const struct dt_key *key,
- struct lustre_capa *capa)
+ struct dt_rec *rec, const struct dt_key *key)
{
struct osd_thread_info *oti = osd_oti_get(env);
struct osd_object *obj = osd_dt_obj(dt);
}
}
+ memset(&oti->oti_zde.lzd_fid, 0, sizeof(struct lu_fid));
rc = -zap_lookup(osd->od_os, obj->oo_db->db_object,
(char *)key, 8, sizeof(oti->oti_zde) / 8,
(void *)&oti->oti_zde);
- memcpy(rec, &oti->oti_zde.lzd_fid, sizeof(struct lu_fid));
+ if (rc != 0)
+ RETURN(rc);
- RETURN(rc == 0 ? 1 : rc);
+ if (likely(fid_is_sane(&oti->oti_zde.lzd_fid))) {
+ memcpy(rec, &oti->oti_zde.lzd_fid, sizeof(struct lu_fid));
+ RETURN(1);
+ }
+
+ rc = osd_get_fid_by_oid(env, osd, oti->oti_zde.lzd_reg.zde_dnode,
+ (struct lu_fid *)rec);
+
+ RETURN(rc == 0 ? 1 : (rc == -ENOENT ? -ENODATA : rc));
}
static int osd_declare_dir_insert(const struct lu_env *env,
{
struct osd_object *obj = osd_dt_obj(dt);
struct osd_thandle *oh;
+ uint64_t object;
ENTRY;
LASSERT(th != NULL);
oh = container_of0(th, struct osd_thandle, ot_super);
- LASSERT(obj->oo_db);
- LASSERT(osd_object_is_zap(obj->oo_db));
+ /* This is for inserting dot/dotdot into a newly created dir. */
+ if (obj->oo_db == NULL)
+ object = DMU_NEW_OBJECT;
+ else
+ object = obj->oo_db->db_object;
- dmu_tx_hold_bonus(oh->ot_tx, obj->oo_db->db_object);
- dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, TRUE, (char *)key);
+ dmu_tx_hold_bonus(oh->ot_tx, object);
+ dmu_tx_hold_zap(oh->ot_tx, object, TRUE, (char *)key);
RETURN(0);
}
child = osd_obj(lo);
else
LU_OBJECT_DEBUG(D_ERROR, env, luch,
- "%s: object can't be located "DFID"\n",
+ "%s: object can't be located "DFID,
osd_dev(ludev)->od_svname, PFID(fid));
if (child == NULL) {
}
} else {
LU_OBJECT_DEBUG(D_ERROR, env, luch,
- "%s: lu_object does not exists "DFID"\n",
+ "%s: lu_object does not exists "DFID,
osd_dev(ludev)->od_svname, PFID(fid));
lu_object_put(env, luch);
child = ERR_PTR(-ENOENT);
}
static int osd_seq_exists(const struct lu_env *env, struct osd_device *osd,
- obd_seq seq)
+ u64 seq)
{
struct lu_seq_range *range = &osd_oti_get(env)->oti_seq_range;
struct seq_server_site *ss = osd_seq_site(osd);
rc = osd_fld_lookup(env, osd, seq, range);
if (rc != 0) {
- CERROR("%s: Can not lookup fld for "LPX64"\n",
- osd_name(osd), seq);
+ if (rc != -ENOENT)
+ CERROR("%s: Can not lookup fld for "LPX64"\n",
+ osd_name(osd), seq);
RETURN(0);
}
* \param key key for index
* \param rec record reference
* \param th transaction handler
- * \param capa capability descriptor
* \param ignore_quota update should not affect quota
*
* \retval 0 success
*/
static int osd_dir_insert(const struct lu_env *env, struct dt_object *dt,
const struct dt_rec *rec, const struct dt_key *key,
- struct thandle *th, struct lustre_capa *capa,
- int ignore_quota)
+ struct thandle *th, int ignore_quota)
{
struct osd_thread_info *oti = osd_oti_get(env);
struct osd_object *parent = osd_dt_obj(dt);
* during iteration */
GOTO(out, rc = 0);
} else if (name[1] == '.' && name[2] == 0) {
+ if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_PARENT)) {
+ struct lu_fid tfid = *fid;
+
+ osd_object_put(env, child);
+ tfid.f_oid--;
+ child = osd_object_find(env, dt, &tfid);
+ if (IS_ERR(child))
+ RETURN(PTR_ERR(child));
+
+ LASSERT(child->oo_db);
+ }
+
/* update parent dnode in the child.
* later it will be used to generate ".." */
rc = osd_object_sa_update(parent,
SA_ZPL_PARENT(osd),
&child->oo_db->db_object,
8, oh);
+
GOTO(out, rc);
}
}
rc = -zap_add(osd->od_os, parent->oo_db->db_object,
(char *)key, 8, sizeof(oti->oti_zde) / 8,
(void *)&oti->oti_zde, oh->ot_tx);
+ if (unlikely(rc == -EEXIST &&
+ name[0] == '.' && name[1] == '.' && name[2] == 0))
+ /* Update (key,oid) in ZAP */
+ rc = -zap_update(osd->od_os, parent->oo_db->db_object,
+ (char *)key, 8, sizeof(oti->oti_zde) / 8,
+ (void *)&oti->oti_zde, oh->ot_tx);
out:
if (child != NULL)
const struct dt_key *key,
struct thandle *th)
{
- struct osd_object *obj = osd_dt_obj(dt);
+ struct osd_object *obj = osd_dt_obj(dt);
struct osd_thandle *oh;
+ uint64_t dnode;
ENTRY;
LASSERT(dt_object_exists(dt));
LASSERT(th != NULL);
oh = container_of0(th, struct osd_thandle, ot_super);
- LASSERT(obj->oo_db);
- LASSERT(osd_object_is_zap(obj->oo_db));
-
- dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, TRUE, (char *)key);
+ if (dt_object_exists(dt)) {
+ LASSERT(obj->oo_db);
+ LASSERT(osd_object_is_zap(obj->oo_db));
+ dnode = obj->oo_db->db_object;
+ } else {
+ dnode = DMU_NEW_OBJECT;
+ }
+ dmu_tx_hold_zap(oh->ot_tx, dnode, TRUE, (char *)key);
RETURN(0);
}
static int osd_dir_delete(const struct lu_env *env, struct dt_object *dt,
- const struct dt_key *key, struct thandle *th,
- struct lustre_capa *capa)
+ const struct dt_key *key, struct thandle *th)
{
struct osd_object *obj = osd_dt_obj(dt);
struct osd_device *osd = osd_obj2dev(obj);
int rc;
ENTRY;
- LASSERT(obj->oo_db);
- LASSERT(osd_object_is_zap(obj->oo_db));
+ LASSERT(zap_db);
+ LASSERT(osd_object_is_zap(zap_db));
LASSERT(th != NULL);
oh = container_of0(th, struct osd_thandle, ot_super);
static struct dt_it *osd_dir_it_init(const struct lu_env *env,
struct dt_object *dt,
- __u32 unused,
- struct lustre_capa *capa)
+ __u32 unused)
{
struct osd_zap_it *it;
- it = (struct osd_zap_it *)osd_index_it_init(env, dt, unused, capa);
+ it = (struct osd_zap_it *)osd_index_it_init(env, dt, unused);
if (!IS_ERR(it))
it->ozi_pos = 0;
zap_attribute_t *za = &osd_oti_get(env)->oti_za;
int rc;
+ ENTRY;
+
/* temp. storage should be enough for any key supported by ZFS */
CLASSERT(sizeof(za->za_name) <= sizeof(it->ozi_name));
it->ozi_pos++;
if (it->ozi_pos <=2)
RETURN(0);
- }
- zap_cursor_advance(it->ozi_zc);
+ } else {
+ zap_cursor_advance(it->ozi_zc);
+ }
/*
* According to current API we need to return error if its last entry.
osd_it_append_attrs(lde, attr, 2, IFTODT(S_IFDIR));
lde->lde_reclen = cpu_to_le16(lu_dirent_calc_size(2, attr));
rc = osd_find_parent_fid(env, &it->ozi_obj->oo_dt, &lde->lde_fid);
- /*
- * early Orion code was not setting LinkEA, so it's possible
- * some setups still have objects with no LinkEA set.
- * but at that time .. was a real record in the directory
- * so we should try to lookup .. in ZAP
- */
- if (rc != -ENOENT)
- GOTO(out, rc);
+
+ /* ENOENT happens at the root of the filesystem, so ignore it */
+ if (rc == -ENOENT)
+ rc = 0;
+ GOTO(out, rc);
}
LASSERT(lde);
{
struct osd_zap_it *it = (struct osd_zap_it *)di;
zap_attribute_t *za = &osd_oti_get(env)->oti_za;
- int rc, namelen = 0;
+ size_t namelen = 0;
+ int rc;
ENTRY;
if (it->ozi_pos <= 1)
RETURN(rc);
}
-static struct dt_index_operations osd_dir_ops = {
+struct dt_index_operations osd_dir_ops = {
.dio_lookup = osd_dir_lookup,
.dio_declare_insert = osd_declare_dir_insert,
.dio_insert = osd_dir_insert,
}
static int osd_index_lookup(const struct lu_env *env, struct dt_object *dt,
- struct dt_rec *rec, const struct dt_key *key,
- struct lustre_capa *capa)
+ struct dt_rec *rec, const struct dt_key *key)
{
struct osd_object *obj = osd_dt_obj(dt);
struct osd_device *osd = osd_obj2dev(obj);
static int osd_index_insert(const struct lu_env *env, struct dt_object *dt,
const struct dt_rec *rec, const struct dt_key *key,
- struct thandle *th, struct lustre_capa *capa,
- int ignore_quota)
+ struct thandle *th, int ignore_quota)
{
struct osd_object *obj = osd_dt_obj(dt);
struct osd_device *osd = osd_obj2dev(obj);
}
static int osd_index_delete(const struct lu_env *env, struct dt_object *dt,
- const struct dt_key *key, struct thandle *th,
- struct lustre_capa *capa)
+ const struct dt_key *key, struct thandle *th)
{
struct osd_object *obj = osd_dt_obj(dt);
struct osd_device *osd = osd_obj2dev(obj);
};
static struct dt_it *osd_zfs_otable_it_init(const struct lu_env *env,
- struct dt_object *dt, __u32 attr,
- struct lustre_capa *capa)
+ struct dt_object *dt, __u32 attr)
{
struct osd_device *dev = osd_dev(dt->do_lu.lo_dev);
struct osd_metadnode_it *it;
const struct dt_index_features *feat)
{
struct osd_object *obj = osd_dt_obj(dt);
+ int rc = 0;
ENTRY;
- LASSERT(dt_object_exists(dt));
+ down_read(&obj->oo_guard);
/*
* XXX: implement support for fixed-size keys sorted with natural
* numerical way (not using internal hash value)
*/
if (feat->dif_flags & DT_IND_RANGE)
- RETURN(-ERANGE);
+ GOTO(out, rc = -ERANGE);
if (unlikely(feat == &dt_otable_features)) {
dt->do_index_ops = &osd_zfs_otable_ops;
- RETURN(0);
+ GOTO(out, rc = 0);
}
- LASSERT(obj->oo_db != NULL);
+ LASSERT(!dt_object_exists(dt) || obj->oo_db != NULL);
if (likely(feat == &dt_directory_features)) {
- if (osd_object_is_zap(obj->oo_db))
+ if (!dt_object_exists(dt) || osd_object_is_zap(obj->oo_db))
dt->do_index_ops = &osd_dir_ops;
else
- RETURN(-ENOTDIR);
+ GOTO(out, rc = -ENOTDIR);
} else if (unlikely(feat == &dt_acct_features)) {
LASSERT(fid_is_acct(lu_object_fid(&dt->do_lu)));
dt->do_index_ops = &osd_acct_index_ops;
- } else if (osd_object_is_zap(obj->oo_db) &&
- dt->do_index_ops == NULL) {
+ } else if (dt->do_index_ops == NULL) {
/* For index file, we don't support variable key & record sizes
* and the key has to be unique */
if ((feat->dif_flags & ~DT_IND_UPDATE) != 0)
- RETURN(-EINVAL);
+ GOTO(out, rc = -EINVAL);
if (feat->dif_keysize_max > ZAP_MAXNAMELEN)
- RETURN(-E2BIG);
+ GOTO(out, rc = -E2BIG);
if (feat->dif_keysize_max != feat->dif_keysize_min)
- RETURN(-EINVAL);
+ GOTO(out, rc = -EINVAL);
/* As for the record size, it should be a multiple of 8 bytes
* and smaller than the maximum value length supported by ZAP.
*/
if (feat->dif_recsize_max > ZAP_MAXVALUELEN)
- RETURN(-E2BIG);
+ GOTO(out, rc = -E2BIG);
if (feat->dif_recsize_max != feat->dif_recsize_min)
- RETURN(-EINVAL);
+ GOTO(out, rc = -EINVAL);
obj->oo_keysize = feat->dif_keysize_max;
obj->oo_recsize = feat->dif_recsize_max;
dt->do_index_ops = &osd_index_ops;
}
- RETURN(0);
+out:
+ up_read(&obj->oo_guard);
+
+ RETURN(rc);
}