Whamcloud - gitweb
LU-4315 docs: Fix Makefile.am to have one man page per line
[fs/lustre-release.git] / lustre / osd-zfs / osd_index.c
index 261ac90..f0b588c 100644 (file)
  *
  * You should have received a copy of the GNU General Public License
  * version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
+ * http://www.gnu.org/licenses/gpl-2.0.html
  *
  * GPL HEADER END
  */
@@ -27,7 +23,7 @@
  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2012, 2014, Intel Corporation.
+ * Copyright (c) 2012, 2016, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -160,9 +156,11 @@ static struct dt_it *osd_index_it_init(const struct lu_env *env,
        int                      rc;
        ENTRY;
 
+       if (obj->oo_destroyed)
+               RETURN(ERR_PTR(-ENOENT));
+
        LASSERT(lu_object_exists(lo));
        LASSERT(obj->oo_db);
-       LASSERT(osd_object_is_zap(obj->oo_db));
        LASSERT(info);
 
        OBD_SLAB_ALLOC_PTR_GFP(it, osd_zapit_cachep, GFP_NOFS);
@@ -225,90 +223,121 @@ static inline void osd_it_append_attrs(struct lu_dirent *ent, __u32 attr,
        ent->lde_attrs = cpu_to_le32(ent->lde_attrs);
 }
 
-/*
- * as we don't know FID, we can't use LU object, so this function
- * partially duplicate __osd_xattr_get() which is built around
- * LU-object and uses it to cache data like regular EA dnode, etc
+/**
+ * Get the object's FID from its LMA EA.
+ *
+ * \param[in] env      pointer to the thread context
+ * \param[in] osd      pointer to the OSD device
+ * \param[in] oid      the object's local identifier
+ * \param[out] fid     the buffer to hold the object's FID
+ *
+ * \retval             0 for success
+ * \retval             negative error number on failure
  */
-static int osd_find_parent_by_dnode(const struct lu_env *env,
-                                   struct dt_object *o,
-                                   struct lu_fid *fid)
+static int osd_get_fid_by_oid(const struct lu_env *env, struct osd_device *osd,
+                             uint64_t oid, struct lu_fid *fid)
 {
-       struct osd_device       *osd = osd_obj2dev(osd_dt_obj(o));
-       struct lustre_mdt_attrs *lma;
+       struct objset           *os       = osd->od_os;
+       struct osd_thread_info  *oti      = osd_oti_get(env);
+       struct lustre_mdt_attrs *lma      =
+                       (struct lustre_mdt_attrs *)oti->oti_buf;
        struct lu_buf            buf;
-       sa_handle_t             *sa_hdl;
-       nvlist_t                *nvbuf = NULL;
-       uchar_t                 *value;
-       uint64_t                 dnode;
-       int                      rc, size;
+       nvlist_t                *sa_xattr = NULL;
+       sa_handle_t             *sa_hdl   = NULL;
+       uchar_t                 *nv_value = NULL;
+       uint64_t                 xattr    = ZFS_NO_OBJECT;
+       int                      size     = 0;
+       int                      rc;
        ENTRY;
 
-       /* first of all, get parent dnode from own attributes */
-       LASSERT(osd_dt_obj(o)->oo_db);
-       rc = -sa_handle_get(osd->od_os, osd_dt_obj(o)->oo_db->db_object,
-                           NULL, SA_HDL_PRIVATE, &sa_hdl);
-       if (rc)
-               RETURN(rc);
+       rc = __osd_xattr_load(osd, oid, &sa_xattr);
+       if (rc == -ENOENT)
+               goto regular;
 
-       dnode = ZFS_NO_OBJECT;
-       rc = -sa_lookup(sa_hdl, SA_ZPL_PARENT(osd), &dnode, 8);
-       sa_handle_destroy(sa_hdl);
-       if (rc)
-               RETURN(rc);
+       if (rc != 0)
+               GOTO(out, rc);
 
-       /* now get EA buffer */
-       rc = __osd_xattr_load(osd, dnode, &nvbuf);
-       if (rc)
-               GOTO(regular, rc);
+       rc = -nvlist_lookup_byte_array(sa_xattr, XATTR_NAME_LMA, &nv_value,
+                                      &size);
+       if (rc == -ENOENT)
+               goto regular;
+
+       if (rc != 0)
+               GOTO(out, rc);
 
-       /* XXX: if we get that far.. should we cache the result? */
+       if (unlikely(size > sizeof(oti->oti_buf)))
+               GOTO(out, rc = -ERANGE);
 
-       /* try to find LMA attribute */
-       LASSERT(nvbuf != NULL);
-       rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA, &value, &size);
-       if (rc == 0 && size >= sizeof(*lma)) {
-               lma = (struct lustre_mdt_attrs *)value;
-               lustre_lma_swab(lma);
-               *fid = lma->lma_self_fid;
-               GOTO(out, rc = 0);
-       }
+       memcpy(lma, nv_value, size);
 
-regular:
-       /* no LMA attribute in SA, let's try regular EA */
+       goto found;
 
-       /* first of all, get parent dnode storing regular EA */
-       rc = -sa_handle_get(osd->od_os, dnode, NULL, SA_HDL_PRIVATE, &sa_hdl);
-       if (rc)
+regular:
+       rc = -sa_handle_get(os, oid, NULL, SA_HDL_PRIVATE, &sa_hdl);
+       if (rc != 0)
                GOTO(out, rc);
 
-       dnode = ZFS_NO_OBJECT;
-       rc = -sa_lookup(sa_hdl, SA_ZPL_XATTR(osd), &dnode, 8);
+       rc = -sa_lookup(sa_hdl, SA_ZPL_XATTR(osd), &xattr, 8);
        sa_handle_destroy(sa_hdl);
-       if (rc)
+       if (rc != 0)
                GOTO(out, rc);
 
-       CLASSERT(sizeof(*lma) <= sizeof(osd_oti_get(env)->oti_buf));
-       buf.lb_buf = osd_oti_get(env)->oti_buf;
-       buf.lb_len = sizeof(osd_oti_get(env)->oti_buf);
-
-       /* now try to find LMA */
-       rc = __osd_xattr_get_large(env, osd, dnode, &buf,
+       buf.lb_buf = lma;
+       buf.lb_len = sizeof(oti->oti_buf);
+       rc = __osd_xattr_get_large(env, osd, xattr, &buf,
                                   XATTR_NAME_LMA, &size);
-       if (rc == 0 && size >= sizeof(*lma)) {
-               lma = buf.lb_buf;
-               lustre_lma_swab(lma);
-               *fid = lma->lma_self_fid;
-               GOTO(out, rc = 0);
-       } else if (rc < 0) {
+       if (rc != 0)
                GOTO(out, rc);
-       } else {
+
+found:
+       if (size < sizeof(*lma))
                GOTO(out, rc = -EIO);
+
+       lustre_lma_swab(lma);
+       if (unlikely((lma->lma_incompat & ~LMA_INCOMPAT_SUPP) ||
+                    CFS_FAIL_CHECK(OBD_FAIL_OSD_LMA_INCOMPAT))) {
+               CWARN("%s: unsupported incompat LMA feature(s) %#x for "
+                     "oid = %#llx\n", osd->od_svname,
+                     lma->lma_incompat & ~LMA_INCOMPAT_SUPP, oid);
+               GOTO(out, rc = -EOPNOTSUPP);
+       } else {
+               *fid = lma->lma_self_fid;
+               GOTO(out, rc = 0);
        }
 
 out:
-       if (nvbuf != NULL)
-               nvlist_free(nvbuf);
+       if (sa_xattr != NULL)
+               nvlist_free(sa_xattr);
+       return rc;
+}
+
+/*
+ * As we don't know FID, we can't use LU object, so this function
+ * partially duplicate __osd_xattr_get() which is built around
+ * LU-object and uses it to cache data like regular EA dnode, etc
+ */
+static int osd_find_parent_by_dnode(const struct lu_env *env,
+                                   struct dt_object *o,
+                                   struct lu_fid *fid)
+{
+       struct osd_device       *osd = osd_obj2dev(osd_dt_obj(o));
+       sa_handle_t             *sa_hdl;
+       uint64_t                 dnode = ZFS_NO_OBJECT;
+       int                      rc;
+       ENTRY;
+
+       /* first of all, get parent dnode from own attributes */
+       LASSERT(osd_dt_obj(o)->oo_db);
+       rc = -sa_handle_get(osd->od_os, osd_dt_obj(o)->oo_db->db_object,
+                           NULL, SA_HDL_PRIVATE, &sa_hdl);
+       if (rc != 0)
+               RETURN(rc);
+
+       rc = -sa_lookup(sa_hdl, SA_ZPL_PARENT(osd), &dnode, 8);
+       sa_handle_destroy(sa_hdl);
+       if (rc == 0)
+               rc = osd_get_fid_by_oid(env, osd, dnode, fid);
+
        RETURN(rc);
 }
 
@@ -391,8 +420,6 @@ static int osd_dir_lookup(const struct lu_env *env, struct dt_object *dt,
        int                 rc;
        ENTRY;
 
-       LASSERT(osd_object_is_zap(obj->oo_db));
-
        if (name[0] == '.') {
                if (name[1] == 0) {
                        const struct lu_fid *f = lu_object_fid(&dt->do_lu);
@@ -404,12 +431,22 @@ static int osd_dir_lookup(const struct lu_env *env, struct dt_object *dt,
                }
        }
 
+       memset(&oti->oti_zde.lzd_fid, 0, sizeof(struct lu_fid));
        rc = -zap_lookup(osd->od_os, obj->oo_db->db_object,
                         (char *)key, 8, sizeof(oti->oti_zde) / 8,
                         (void *)&oti->oti_zde);
-       memcpy(rec, &oti->oti_zde.lzd_fid, sizeof(struct lu_fid));
+       if (rc != 0)
+               RETURN(rc);
 
-       RETURN(rc == 0 ? 1 : rc);
+       if (likely(fid_is_sane(&oti->oti_zde.lzd_fid))) {
+               memcpy(rec, &oti->oti_zde.lzd_fid, sizeof(struct lu_fid));
+               RETURN(1);
+       }
+
+       rc = osd_get_fid_by_oid(env, osd, oti->oti_zde.lzd_reg.zde_dnode,
+                               (struct lu_fid *)rec);
+
+       RETURN(rc == 0 ? 1 : (rc == -ENOENT ? -ENODATA : rc));
 }
 
 static int osd_declare_dir_insert(const struct lu_env *env,
@@ -432,8 +469,10 @@ static int osd_declare_dir_insert(const struct lu_env *env,
        else
                object = obj->oo_db->db_object;
 
-       dmu_tx_hold_bonus(oh->ot_tx, object);
-       dmu_tx_hold_zap(oh->ot_tx, object, TRUE, (char *)key);
+       /* do not specify the key as then DMU is trying to look it up
+        * which is very expensive. usually the layers above lookup
+        * before insertion */
+       dmu_tx_hold_zap(oh->ot_tx, object, TRUE, NULL);
 
        RETURN(0);
 }
@@ -475,7 +514,7 @@ struct osd_object *osd_object_find(const struct lu_env *env,
                        child = osd_obj(lo);
                else
                        LU_OBJECT_DEBUG(D_ERROR, env, luch,
-                                       "%s: object can't be located "DFID"\n",
+                                       "%s: object can't be located "DFID,
                                        osd_dev(ludev)->od_svname, PFID(fid));
 
                if (child == NULL) {
@@ -486,7 +525,7 @@ struct osd_object *osd_object_find(const struct lu_env *env,
                }
        } else {
                LU_OBJECT_DEBUG(D_ERROR, env, luch,
-                               "%s: lu_object does not exists "DFID"\n",
+                               "%s: lu_object does not exists "DFID,
                                osd_dev(ludev)->od_svname, PFID(fid));
                lu_object_put(env, luch);
                child = ERR_PTR(-ENOENT);
@@ -520,7 +559,7 @@ static int osd_seq_exists(const struct lu_env *env, struct osd_device *osd,
        rc = osd_fld_lookup(env, osd, seq, range);
        if (rc != 0) {
                if (rc != -ENOENT)
-                       CERROR("%s: Can not lookup fld for "LPX64"\n",
+                       CERROR("%s: Can not lookup fld for %#llx\n",
                               osd_name(osd), seq);
                RETURN(0);
        }
@@ -580,7 +619,6 @@ static int osd_dir_insert(const struct lu_env *env, struct dt_object *dt,
        ENTRY;
 
        LASSERT(parent->oo_db);
-       LASSERT(osd_object_is_zap(parent->oo_db));
 
        LASSERT(dt_object_exists(dt));
        LASSERT(osd_invariant(parent));
@@ -669,8 +707,9 @@ static int osd_declare_dir_delete(const struct lu_env *env,
                                  const struct dt_key *key,
                                  struct thandle *th)
 {
-       struct osd_object *obj = osd_dt_obj(dt);
+       struct osd_object  *obj = osd_dt_obj(dt);
        struct osd_thandle *oh;
+       uint64_t            dnode;
        ENTRY;
 
        LASSERT(dt_object_exists(dt));
@@ -679,10 +718,17 @@ static int osd_declare_dir_delete(const struct lu_env *env,
        LASSERT(th != NULL);
        oh = container_of0(th, struct osd_thandle, ot_super);
 
-       LASSERT(obj->oo_db);
-       LASSERT(osd_object_is_zap(obj->oo_db));
+       if (dt_object_exists(dt)) {
+               LASSERT(obj->oo_db);
+               dnode = obj->oo_db->db_object;
+       } else {
+               dnode = DMU_NEW_OBJECT;
+       }
 
-       dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, TRUE, (char *)key);
+       /* do not specify the key as then DMU is trying to look it up
+        * which is very expensive. usually the layers above lookup
+        * before deletion */
+       dmu_tx_hold_zap(oh->ot_tx, dnode, FALSE, NULL);
 
        RETURN(0);
 }
@@ -699,7 +745,6 @@ static int osd_dir_delete(const struct lu_env *env, struct dt_object *dt,
        ENTRY;
 
        LASSERT(zap_db);
-       LASSERT(osd_object_is_zap(zap_db));
 
        LASSERT(th != NULL);
        oh = container_of0(th, struct osd_thandle, ot_super);
@@ -1163,9 +1208,9 @@ static int osd_declare_index_insert(const struct lu_env *env,
 
        dmu_tx_hold_bonus(oh->ot_tx, obj->oo_db->db_object);
 
-       /* It is not clear what API should be used for binary keys, so we pass
-        * a null name which has the side effect of over-reserving space,
-        * accounting for the worst case. See zap_count_write() */
+       /* do not specify the key as then DMU is trying to look it up
+        * which is very expensive. usually the layers above lookup
+        * before insertion */
        dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, TRUE, NULL);
 
        RETURN(0);
@@ -1213,7 +1258,11 @@ static int osd_declare_index_delete(const struct lu_env *env,
        LASSERT(obj->oo_db);
 
        oh = container_of0(th, struct osd_thandle, ot_super);
-       dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, TRUE, NULL);
+
+       /* do not specify the key as then DMU is trying to look it up
+        * which is very expensive. usually the layers above lookup
+        * before deletion */
+       dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, FALSE, NULL);
 
        RETURN(0);
 }
@@ -1255,7 +1304,7 @@ static int osd_index_it_get(const struct lu_env *env, struct dt_it *di,
         * XXX: we need a binary version of zap_cursor_move_to_key()
         *      to implement this API */
        if (*((const __u64 *)key) != 0)
-               CERROR("NOT IMPLEMETED YET (move to "LPX64")\n",
+               CERROR("NOT IMPLEMETED YET (move to %#llx)\n",
                       *((__u64 *)key));
 
        zap_cursor_fini(it->ozi_zc);
@@ -1466,10 +1515,8 @@ static void osd_zfs_otable_prefetch(const struct lu_env *env,
                if (unlikely(rc != 0))
                        break;
 
-               /* dmu_prefetch() was exported in 0.6.2, if you use with
-                * an older release, just comment it out - this is an
-                * optimization */
-               dmu_prefetch(dev->od_os, it->mit_prefetched_dnode, 0, 0);
+               osd_dmu_prefetch(dev->od_os, it->mit_prefetched_dnode,
+                                0, 0, 0, ZIO_PRIORITY_ASYNC_READ);
 
                it->mit_prefetched++;
        }
@@ -1606,50 +1653,50 @@ int osd_index_try(const struct lu_env *env, struct dt_object *dt,
                const struct dt_index_features *feat)
 {
        struct osd_object *obj = osd_dt_obj(dt);
+       int rc = 0;
        ENTRY;
 
-       LASSERT(dt_object_exists(dt));
+       down_read(&obj->oo_guard);
 
        /*
         * XXX: implement support for fixed-size keys sorted with natural
         *      numerical way (not using internal hash value)
         */
        if (feat->dif_flags & DT_IND_RANGE)
-               RETURN(-ERANGE);
+               GOTO(out, rc = -ERANGE);
 
        if (unlikely(feat == &dt_otable_features)) {
                dt->do_index_ops = &osd_zfs_otable_ops;
-               RETURN(0);
+               GOTO(out, rc = 0);
        }
 
-       LASSERT(obj->oo_db != NULL);
+       LASSERT(!dt_object_exists(dt) || obj->oo_db != NULL);
        if (likely(feat == &dt_directory_features)) {
-               if (osd_object_is_zap(obj->oo_db))
+               if (!dt_object_exists(dt) || osd_object_is_zap(obj->oo_db))
                        dt->do_index_ops = &osd_dir_ops;
                else
-                       RETURN(-ENOTDIR);
+                       GOTO(out, rc = -ENOTDIR);
        } else if (unlikely(feat == &dt_acct_features)) {
                LASSERT(fid_is_acct(lu_object_fid(&dt->do_lu)));
                dt->do_index_ops = &osd_acct_index_ops;
-       } else if (osd_object_is_zap(obj->oo_db) &&
-                  dt->do_index_ops == NULL) {
+       } else if (dt->do_index_ops == NULL) {
                /* For index file, we don't support variable key & record sizes
                 * and the key has to be unique */
                if ((feat->dif_flags & ~DT_IND_UPDATE) != 0)
-                       RETURN(-EINVAL);
+                       GOTO(out, rc = -EINVAL);
 
                if (feat->dif_keysize_max > ZAP_MAXNAMELEN)
-                       RETURN(-E2BIG);
+                       GOTO(out, rc = -E2BIG);
                if (feat->dif_keysize_max != feat->dif_keysize_min)
-                       RETURN(-EINVAL);
+                       GOTO(out, rc = -EINVAL);
 
                /* As for the record size, it should be a multiple of 8 bytes
                 * and smaller than the maximum value length supported by ZAP.
                 */
                if (feat->dif_recsize_max > ZAP_MAXVALUELEN)
-                       RETURN(-E2BIG);
+                       GOTO(out, rc = -E2BIG);
                if (feat->dif_recsize_max != feat->dif_recsize_min)
-                       RETURN(-EINVAL);
+                       GOTO(out, rc = -EINVAL);
 
                obj->oo_keysize = feat->dif_keysize_max;
                obj->oo_recsize = feat->dif_recsize_max;
@@ -1663,5 +1710,8 @@ int osd_index_try(const struct lu_env *env, struct dt_object *dt,
                dt->do_index_ops = &osd_index_ops;
        }
 
-       RETURN(0);
+out:
+       up_read(&obj->oo_guard);
+
+       RETURN(rc);
 }