*
* You should have received a copy of the GNU General Public License
* version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
+ * http://www.gnu.org/licenses/gpl-2.0.html
*
* GPL HEADER END
*/
* Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2012, 2014, Intel Corporation.
+ * Copyright (c) 2012, 2015, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
int rc;
ENTRY;
+ if (obj->oo_destroyed)
+ RETURN(ERR_PTR(-ENOENT));
+
LASSERT(lu_object_exists(lo));
LASSERT(obj->oo_db);
- LASSERT(osd_object_is_zap(obj->oo_db));
LASSERT(info);
OBD_SLAB_ALLOC_PTR_GFP(it, osd_zapit_cachep, GFP_NOFS);
ent->lde_attrs = cpu_to_le32(ent->lde_attrs);
}
-/*
- * as we don't know FID, we can't use LU object, so this function
- * partially duplicate __osd_xattr_get() which is built around
- * LU-object and uses it to cache data like regular EA dnode, etc
+/**
+ * Get the object's FID from its LMA EA.
+ *
+ * \param[in] env pointer to the thread context
+ * \param[in] osd pointer to the OSD device
+ * \param[in] oid the object's local identifier
+ * \param[out] fid the buffer to hold the object's FID
+ *
+ * \retval 0 for success
+ * \retval negative error number on failure
*/
-static int osd_find_parent_by_dnode(const struct lu_env *env,
- struct dt_object *o,
- struct lu_fid *fid)
+static int osd_get_fid_by_oid(const struct lu_env *env, struct osd_device *osd,
+ uint64_t oid, struct lu_fid *fid)
{
- struct osd_device *osd = osd_obj2dev(osd_dt_obj(o));
- struct lustre_mdt_attrs *lma;
+ struct objset *os = osd->od_os;
+ struct osd_thread_info *oti = osd_oti_get(env);
+ struct lustre_mdt_attrs *lma =
+ (struct lustre_mdt_attrs *)oti->oti_buf;
struct lu_buf buf;
- sa_handle_t *sa_hdl;
- nvlist_t *nvbuf = NULL;
- uchar_t *value;
- uint64_t dnode;
- int rc, size;
+ nvlist_t *sa_xattr = NULL;
+ sa_handle_t *sa_hdl = NULL;
+ uchar_t *nv_value = NULL;
+ uint64_t xattr = ZFS_NO_OBJECT;
+ int size = 0;
+ int rc;
ENTRY;
- /* first of all, get parent dnode from own attributes */
- LASSERT(osd_dt_obj(o)->oo_db);
- rc = -sa_handle_get(osd->od_os, osd_dt_obj(o)->oo_db->db_object,
- NULL, SA_HDL_PRIVATE, &sa_hdl);
- if (rc)
- RETURN(rc);
+ rc = __osd_xattr_load(osd, oid, &sa_xattr);
+ if (rc == -ENOENT)
+ goto regular;
- dnode = ZFS_NO_OBJECT;
- rc = -sa_lookup(sa_hdl, SA_ZPL_PARENT(osd), &dnode, 8);
- sa_handle_destroy(sa_hdl);
- if (rc)
- RETURN(rc);
+ if (rc != 0)
+ GOTO(out, rc);
- /* now get EA buffer */
- rc = __osd_xattr_load(osd, dnode, &nvbuf);
- if (rc)
- GOTO(regular, rc);
+ rc = -nvlist_lookup_byte_array(sa_xattr, XATTR_NAME_LMA, &nv_value,
+ &size);
+ if (rc == -ENOENT)
+ goto regular;
+
+ if (rc != 0)
+ GOTO(out, rc);
- /* XXX: if we get that far.. should we cache the result? */
+ if (unlikely(size > sizeof(oti->oti_buf)))
+ GOTO(out, rc = -ERANGE);
- /* try to find LMA attribute */
- LASSERT(nvbuf != NULL);
- rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA, &value, &size);
- if (rc == 0 && size >= sizeof(*lma)) {
- lma = (struct lustre_mdt_attrs *)value;
- lustre_lma_swab(lma);
- *fid = lma->lma_self_fid;
- GOTO(out, rc = 0);
- }
+ memcpy(lma, nv_value, size);
-regular:
- /* no LMA attribute in SA, let's try regular EA */
+ goto found;
- /* first of all, get parent dnode storing regular EA */
- rc = -sa_handle_get(osd->od_os, dnode, NULL, SA_HDL_PRIVATE, &sa_hdl);
- if (rc)
+regular:
+ rc = -sa_handle_get(os, oid, NULL, SA_HDL_PRIVATE, &sa_hdl);
+ if (rc != 0)
GOTO(out, rc);
- dnode = ZFS_NO_OBJECT;
- rc = -sa_lookup(sa_hdl, SA_ZPL_XATTR(osd), &dnode, 8);
+ rc = -sa_lookup(sa_hdl, SA_ZPL_XATTR(osd), &xattr, 8);
sa_handle_destroy(sa_hdl);
- if (rc)
+ if (rc != 0)
GOTO(out, rc);
- CLASSERT(sizeof(*lma) <= sizeof(osd_oti_get(env)->oti_buf));
- buf.lb_buf = osd_oti_get(env)->oti_buf;
- buf.lb_len = sizeof(osd_oti_get(env)->oti_buf);
-
- /* now try to find LMA */
- rc = __osd_xattr_get_large(env, osd, dnode, &buf,
+ buf.lb_buf = lma;
+ buf.lb_len = sizeof(oti->oti_buf);
+ rc = __osd_xattr_get_large(env, osd, xattr, &buf,
XATTR_NAME_LMA, &size);
- if (rc == 0 && size >= sizeof(*lma)) {
- lma = buf.lb_buf;
- lustre_lma_swab(lma);
- *fid = lma->lma_self_fid;
- GOTO(out, rc = 0);
- } else if (rc < 0) {
+ if (rc != 0)
GOTO(out, rc);
- } else {
+
+found:
+ if (size < sizeof(*lma))
GOTO(out, rc = -EIO);
+
+ lustre_lma_swab(lma);
+ if (unlikely((lma->lma_incompat & ~LMA_INCOMPAT_SUPP) ||
+ CFS_FAIL_CHECK(OBD_FAIL_OSD_LMA_INCOMPAT))) {
+ CWARN("%s: unsupported incompat LMA feature(s) %#x for "
+ "oid = %#llx\n", osd->od_svname,
+ lma->lma_incompat & ~LMA_INCOMPAT_SUPP, oid);
+ GOTO(out, rc = -EOPNOTSUPP);
+ } else {
+ *fid = lma->lma_self_fid;
+ GOTO(out, rc = 0);
}
out:
- if (nvbuf != NULL)
- nvlist_free(nvbuf);
+ if (sa_xattr != NULL)
+ nvlist_free(sa_xattr);
+ return rc;
+}
+
+/*
+ * As we don't know FID, we can't use LU object, so this function
+ * partially duplicate __osd_xattr_get() which is built around
+ * LU-object and uses it to cache data like regular EA dnode, etc
+ */
+static int osd_find_parent_by_dnode(const struct lu_env *env,
+ struct dt_object *o,
+ struct lu_fid *fid)
+{
+ struct osd_device *osd = osd_obj2dev(osd_dt_obj(o));
+ sa_handle_t *sa_hdl;
+ uint64_t dnode = ZFS_NO_OBJECT;
+ int rc;
+ ENTRY;
+
+ /* first of all, get parent dnode from own attributes */
+ LASSERT(osd_dt_obj(o)->oo_db);
+ rc = -sa_handle_get(osd->od_os, osd_dt_obj(o)->oo_db->db_object,
+ NULL, SA_HDL_PRIVATE, &sa_hdl);
+ if (rc != 0)
+ RETURN(rc);
+
+ rc = -sa_lookup(sa_hdl, SA_ZPL_PARENT(osd), &dnode, 8);
+ sa_handle_destroy(sa_hdl);
+ if (rc == 0)
+ rc = osd_get_fid_by_oid(env, osd, dnode, fid);
+
RETURN(rc);
}
int rc;
ENTRY;
- LASSERT(osd_object_is_zap(obj->oo_db));
-
if (name[0] == '.') {
if (name[1] == 0) {
const struct lu_fid *f = lu_object_fid(&dt->do_lu);
}
}
+ memset(&oti->oti_zde.lzd_fid, 0, sizeof(struct lu_fid));
rc = -zap_lookup(osd->od_os, obj->oo_db->db_object,
(char *)key, 8, sizeof(oti->oti_zde) / 8,
(void *)&oti->oti_zde);
- memcpy(rec, &oti->oti_zde.lzd_fid, sizeof(struct lu_fid));
+ if (rc != 0)
+ RETURN(rc);
- RETURN(rc == 0 ? 1 : rc);
+ if (likely(fid_is_sane(&oti->oti_zde.lzd_fid))) {
+ memcpy(rec, &oti->oti_zde.lzd_fid, sizeof(struct lu_fid));
+ RETURN(1);
+ }
+
+ rc = osd_get_fid_by_oid(env, osd, oti->oti_zde.lzd_reg.zde_dnode,
+ (struct lu_fid *)rec);
+
+ RETURN(rc == 0 ? 1 : (rc == -ENOENT ? -ENODATA : rc));
}
static int osd_declare_dir_insert(const struct lu_env *env,
else
object = obj->oo_db->db_object;
- dmu_tx_hold_bonus(oh->ot_tx, object);
- dmu_tx_hold_zap(oh->ot_tx, object, TRUE, (char *)key);
+ /* do not specify the key as then DMU is trying to look it up
+ * which is very expensive. usually the layers above lookup
+ * before insertion */
+ dmu_tx_hold_zap(oh->ot_tx, object, TRUE, NULL);
RETURN(0);
}
child = osd_obj(lo);
else
LU_OBJECT_DEBUG(D_ERROR, env, luch,
- "%s: object can't be located "DFID"\n",
+ "%s: object can't be located "DFID,
osd_dev(ludev)->od_svname, PFID(fid));
if (child == NULL) {
}
} else {
LU_OBJECT_DEBUG(D_ERROR, env, luch,
- "%s: lu_object does not exists "DFID"\n",
+ "%s: lu_object does not exists "DFID,
osd_dev(ludev)->od_svname, PFID(fid));
lu_object_put(env, luch);
child = ERR_PTR(-ENOENT);
rc = osd_fld_lookup(env, osd, seq, range);
if (rc != 0) {
if (rc != -ENOENT)
- CERROR("%s: Can not lookup fld for "LPX64"\n",
+ CERROR("%s: Can not lookup fld for %#llx\n",
osd_name(osd), seq);
RETURN(0);
}
ENTRY;
LASSERT(parent->oo_db);
- LASSERT(osd_object_is_zap(parent->oo_db));
LASSERT(dt_object_exists(dt));
LASSERT(osd_invariant(parent));
const struct dt_key *key,
struct thandle *th)
{
- struct osd_object *obj = osd_dt_obj(dt);
+ struct osd_object *obj = osd_dt_obj(dt);
struct osd_thandle *oh;
+ uint64_t dnode;
ENTRY;
LASSERT(dt_object_exists(dt));
LASSERT(th != NULL);
oh = container_of0(th, struct osd_thandle, ot_super);
- LASSERT(obj->oo_db);
- LASSERT(osd_object_is_zap(obj->oo_db));
+ if (dt_object_exists(dt)) {
+ LASSERT(obj->oo_db);
+ dnode = obj->oo_db->db_object;
+ } else {
+ dnode = DMU_NEW_OBJECT;
+ }
- dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, TRUE, (char *)key);
+ /* do not specify the key as then DMU is trying to look it up
+ * which is very expensive. usually the layers above lookup
+ * before deletion */
+ dmu_tx_hold_zap(oh->ot_tx, dnode, FALSE, NULL);
RETURN(0);
}
ENTRY;
LASSERT(zap_db);
- LASSERT(osd_object_is_zap(zap_db));
LASSERT(th != NULL);
oh = container_of0(th, struct osd_thandle, ot_super);
dmu_tx_hold_bonus(oh->ot_tx, obj->oo_db->db_object);
- /* It is not clear what API should be used for binary keys, so we pass
- * a null name which has the side effect of over-reserving space,
- * accounting for the worst case. See zap_count_write() */
+ /* do not specify the key as then DMU is trying to look it up
+ * which is very expensive. usually the layers above lookup
+ * before insertion */
dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, TRUE, NULL);
RETURN(0);
LASSERT(obj->oo_db);
oh = container_of0(th, struct osd_thandle, ot_super);
- dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, TRUE, NULL);
+
+ /* do not specify the key as then DMU is trying to look it up
+ * which is very expensive. usually the layers above lookup
+ * before deletion */
+ dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, FALSE, NULL);
RETURN(0);
}
* XXX: we need a binary version of zap_cursor_move_to_key()
* to implement this API */
if (*((const __u64 *)key) != 0)
- CERROR("NOT IMPLEMETED YET (move to "LPX64")\n",
+ CERROR("NOT IMPLEMETED YET (move to %#llx)\n",
*((__u64 *)key));
zap_cursor_fini(it->ozi_zc);
if (unlikely(rc != 0))
break;
- /* dmu_prefetch() was exported in 0.6.2, if you use with
- * an older release, just comment it out - this is an
- * optimization */
- dmu_prefetch(dev->od_os, it->mit_prefetched_dnode, 0, 0);
+ osd_dmu_prefetch(dev->od_os, it->mit_prefetched_dnode,
+ 0, 0, 0, ZIO_PRIORITY_ASYNC_READ);
it->mit_prefetched++;
}
const struct dt_index_features *feat)
{
struct osd_object *obj = osd_dt_obj(dt);
+ int rc = 0;
ENTRY;
- LASSERT(dt_object_exists(dt));
+ down_read(&obj->oo_guard);
/*
* XXX: implement support for fixed-size keys sorted with natural
* numerical way (not using internal hash value)
*/
if (feat->dif_flags & DT_IND_RANGE)
- RETURN(-ERANGE);
+ GOTO(out, rc = -ERANGE);
if (unlikely(feat == &dt_otable_features)) {
dt->do_index_ops = &osd_zfs_otable_ops;
- RETURN(0);
+ GOTO(out, rc = 0);
}
- LASSERT(obj->oo_db != NULL);
+ LASSERT(!dt_object_exists(dt) || obj->oo_db != NULL);
if (likely(feat == &dt_directory_features)) {
- if (osd_object_is_zap(obj->oo_db))
+ if (!dt_object_exists(dt) || osd_object_is_zap(obj->oo_db))
dt->do_index_ops = &osd_dir_ops;
else
- RETURN(-ENOTDIR);
+ GOTO(out, rc = -ENOTDIR);
} else if (unlikely(feat == &dt_acct_features)) {
LASSERT(fid_is_acct(lu_object_fid(&dt->do_lu)));
dt->do_index_ops = &osd_acct_index_ops;
- } else if (osd_object_is_zap(obj->oo_db) &&
- dt->do_index_ops == NULL) {
+ } else if (dt->do_index_ops == NULL) {
/* For index file, we don't support variable key & record sizes
* and the key has to be unique */
if ((feat->dif_flags & ~DT_IND_UPDATE) != 0)
- RETURN(-EINVAL);
+ GOTO(out, rc = -EINVAL);
if (feat->dif_keysize_max > ZAP_MAXNAMELEN)
- RETURN(-E2BIG);
+ GOTO(out, rc = -E2BIG);
if (feat->dif_keysize_max != feat->dif_keysize_min)
- RETURN(-EINVAL);
+ GOTO(out, rc = -EINVAL);
/* As for the record size, it should be a multiple of 8 bytes
* and smaller than the maximum value length supported by ZAP.
*/
if (feat->dif_recsize_max > ZAP_MAXVALUELEN)
- RETURN(-E2BIG);
+ GOTO(out, rc = -E2BIG);
if (feat->dif_recsize_max != feat->dif_recsize_min)
- RETURN(-EINVAL);
+ GOTO(out, rc = -EINVAL);
obj->oo_keysize = feat->dif_keysize_max;
obj->oo_recsize = feat->dif_recsize_max;
dt->do_index_ops = &osd_index_ops;
}
- RETURN(0);
+out:
+ up_read(&obj->oo_guard);
+
+ RETURN(rc);
}