Whamcloud - gitweb
LU-7117 osp: set ptlrpc_request::rq_allow_replay properly
[fs/lustre-release.git] / lustre / osd-zfs / osd_index.c
index b28f65f..8c1ffbf 100644 (file)
@@ -27,7 +27,7 @@
  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2012, 2014, Intel Corporation.
+ * Copyright (c) 2012, 2015, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -151,8 +151,7 @@ static inline int osd_obj_cursor_init(zap_cursor_t **zc, struct osd_object *o,
 
 static struct dt_it *osd_index_it_init(const struct lu_env *env,
                                       struct dt_object *dt,
-                                      __u32 unused,
-                                      struct lustre_capa *capa)
+                                      __u32 unused)
 {
        struct osd_thread_info  *info = osd_oti_get(env);
        struct osd_zap_it       *it;
@@ -161,7 +160,8 @@ static struct dt_it *osd_index_it_init(const struct lu_env *env,
        int                      rc;
        ENTRY;
 
-       /* XXX: check capa ? */
+       if (obj->oo_destroyed)
+               RETURN(ERR_PTR(-ENOENT));
 
        LASSERT(lu_object_exists(lo));
        LASSERT(obj->oo_db);
@@ -179,7 +179,6 @@ static struct dt_it *osd_index_it_init(const struct lu_env *env,
        }
 
        it->ozi_obj   = obj;
-       it->ozi_capa  = capa;
        it->ozi_reset = 1;
        lu_object_get(lo);
 
@@ -229,90 +228,121 @@ static inline void osd_it_append_attrs(struct lu_dirent *ent, __u32 attr,
        ent->lde_attrs = cpu_to_le32(ent->lde_attrs);
 }
 
-/*
- * as we don't know FID, we can't use LU object, so this function
- * partially duplicate __osd_xattr_get() which is built around
- * LU-object and uses it to cache data like regular EA dnode, etc
+/**
+ * Get the object's FID from its LMA EA.
+ *
+ * \param[in] env      pointer to the thread context
+ * \param[in] osd      pointer to the OSD device
+ * \param[in] oid      the object's local identifier
+ * \param[out] fid     the buffer to hold the object's FID
+ *
+ * \retval             0 for success
+ * \retval             negative error number on failure
  */
-static int osd_find_parent_by_dnode(const struct lu_env *env,
-                                   struct dt_object *o,
-                                   struct lu_fid *fid)
+static int osd_get_fid_by_oid(const struct lu_env *env, struct osd_device *osd,
+                             uint64_t oid, struct lu_fid *fid)
 {
-       struct osd_device       *osd = osd_obj2dev(osd_dt_obj(o));
-       struct lustre_mdt_attrs *lma;
+       struct objset           *os       = osd->od_os;
+       struct osd_thread_info  *oti      = osd_oti_get(env);
+       struct lustre_mdt_attrs *lma      =
+                       (struct lustre_mdt_attrs *)oti->oti_buf;
        struct lu_buf            buf;
-       sa_handle_t             *sa_hdl;
-       nvlist_t                *nvbuf = NULL;
-       uchar_t                 *value;
-       uint64_t                 dnode;
-       int                      rc, size;
+       nvlist_t                *sa_xattr = NULL;
+       sa_handle_t             *sa_hdl   = NULL;
+       uchar_t                 *nv_value = NULL;
+       uint64_t                 xattr    = ZFS_NO_OBJECT;
+       int                      size     = 0;
+       int                      rc;
        ENTRY;
 
-       /* first of all, get parent dnode from own attributes */
-       LASSERT(osd_dt_obj(o)->oo_db);
-       rc = -sa_handle_get(osd->od_os, osd_dt_obj(o)->oo_db->db_object,
-                           NULL, SA_HDL_PRIVATE, &sa_hdl);
-       if (rc)
-               RETURN(rc);
+       rc = __osd_xattr_load(osd, oid, &sa_xattr);
+       if (rc == -ENOENT)
+               goto regular;
 
-       dnode = ZFS_NO_OBJECT;
-       rc = -sa_lookup(sa_hdl, SA_ZPL_PARENT(osd), &dnode, 8);
-       sa_handle_destroy(sa_hdl);
-       if (rc)
-               RETURN(rc);
+       if (rc != 0)
+               GOTO(out, rc);
 
-       /* now get EA buffer */
-       rc = __osd_xattr_load(osd, dnode, &nvbuf);
-       if (rc)
-               GOTO(regular, rc);
+       rc = -nvlist_lookup_byte_array(sa_xattr, XATTR_NAME_LMA, &nv_value,
+                                      &size);
+       if (rc == -ENOENT)
+               goto regular;
 
-       /* XXX: if we get that far.. should we cache the result? */
+       if (rc != 0)
+               GOTO(out, rc);
 
-       /* try to find LMA attribute */
-       LASSERT(nvbuf != NULL);
-       rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA, &value, &size);
-       if (rc == 0 && size >= sizeof(*lma)) {
-               lma = (struct lustre_mdt_attrs *)value;
-               lustre_lma_swab(lma);
-               *fid = lma->lma_self_fid;
-               GOTO(out, rc = 0);
-       }
+       if (unlikely(size > sizeof(oti->oti_buf)))
+               GOTO(out, rc = -ERANGE);
 
-regular:
-       /* no LMA attribute in SA, let's try regular EA */
+       memcpy(lma, nv_value, size);
 
-       /* first of all, get parent dnode storing regular EA */
-       rc = -sa_handle_get(osd->od_os, dnode, NULL, SA_HDL_PRIVATE, &sa_hdl);
-       if (rc)
+       goto found;
+
+regular:
+       rc = -sa_handle_get(os, oid, NULL, SA_HDL_PRIVATE, &sa_hdl);
+       if (rc != 0)
                GOTO(out, rc);
 
-       dnode = ZFS_NO_OBJECT;
-       rc = -sa_lookup(sa_hdl, SA_ZPL_XATTR(osd), &dnode, 8);
+       rc = -sa_lookup(sa_hdl, SA_ZPL_XATTR(osd), &xattr, 8);
        sa_handle_destroy(sa_hdl);
-       if (rc)
+       if (rc != 0)
                GOTO(out, rc);
 
-       CLASSERT(sizeof(*lma) <= sizeof(osd_oti_get(env)->oti_buf));
-       buf.lb_buf = osd_oti_get(env)->oti_buf;
-       buf.lb_len = sizeof(osd_oti_get(env)->oti_buf);
-
-       /* now try to find LMA */
-       rc = __osd_xattr_get_large(env, osd, dnode, &buf,
+       buf.lb_buf = lma;
+       buf.lb_len = sizeof(oti->oti_buf);
+       rc = __osd_xattr_get_large(env, osd, xattr, &buf,
                                   XATTR_NAME_LMA, &size);
-       if (rc == 0 && size >= sizeof(*lma)) {
-               lma = buf.lb_buf;
-               lustre_lma_swab(lma);
-               *fid = lma->lma_self_fid;
-               GOTO(out, rc = 0);
-       } else if (rc < 0) {
+       if (rc != 0)
                GOTO(out, rc);
-       } else {
+
+found:
+       if (size < sizeof(*lma))
                GOTO(out, rc = -EIO);
+
+       lustre_lma_swab(lma);
+       if (unlikely((lma->lma_incompat & ~LMA_INCOMPAT_SUPP) ||
+                    CFS_FAIL_CHECK(OBD_FAIL_OSD_LMA_INCOMPAT))) {
+               CWARN("%s: unsupported incompat LMA feature(s) %#x for "
+                     "oid = "LPX64"\n", osd->od_svname,
+                     lma->lma_incompat & ~LMA_INCOMPAT_SUPP, oid);
+               GOTO(out, rc = -EOPNOTSUPP);
+       } else {
+               *fid = lma->lma_self_fid;
+               GOTO(out, rc = 0);
        }
 
 out:
-       if (nvbuf != NULL)
-               nvlist_free(nvbuf);
+       if (sa_xattr != NULL)
+               nvlist_free(sa_xattr);
+       return rc;
+}
+
+/*
+ * As we don't know the FID, we can't use the LU object, so this function
+ * partially duplicates __osd_xattr_get(), which is built around the
+ * LU object and uses it to cache data such as the regular EA dnode, etc.
+ */
+static int osd_find_parent_by_dnode(const struct lu_env *env,
+                                   struct dt_object *o,
+                                   struct lu_fid *fid)
+{
+       struct osd_device       *osd = osd_obj2dev(osd_dt_obj(o));
+       sa_handle_t             *sa_hdl;
+       uint64_t                 dnode = ZFS_NO_OBJECT;
+       int                      rc;
+       ENTRY;
+
+       /* first of all, get parent dnode from own attributes */
+       LASSERT(osd_dt_obj(o)->oo_db);
+       rc = -sa_handle_get(osd->od_os, osd_dt_obj(o)->oo_db->db_object,
+                           NULL, SA_HDL_PRIVATE, &sa_hdl);
+       if (rc != 0)
+               RETURN(rc);
+
+       rc = -sa_lookup(sa_hdl, SA_ZPL_PARENT(osd), &dnode, 8);
+       sa_handle_destroy(sa_hdl);
+       if (rc == 0)
+               rc = osd_get_fid_by_oid(env, osd, dnode, fid);
+
        RETURN(rc);
 }
 
@@ -328,10 +358,9 @@ static int osd_find_parent_fid(const struct lu_env *env, struct dt_object *o,
        buf.lb_buf = osd_oti_get(env)->oti_buf;
        buf.lb_len = sizeof(osd_oti_get(env)->oti_buf);
 
-       rc = osd_xattr_get(env, o, &buf, XATTR_NAME_LINK, BYPASS_CAPA);
+       rc = osd_xattr_get(env, o, &buf, XATTR_NAME_LINK);
        if (rc == -ERANGE) {
-               rc = osd_xattr_get(env, o, &LU_BUF_NULL,
-                                  XATTR_NAME_LINK, BYPASS_CAPA);
+               rc = osd_xattr_get(env, o, &LU_BUF_NULL, XATTR_NAME_LINK);
                if (rc < 0)
                        RETURN(rc);
                LASSERT(rc > 0);
@@ -339,7 +368,7 @@ static int osd_find_parent_fid(const struct lu_env *env, struct dt_object *o,
                if (buf.lb_buf == NULL)
                        RETURN(-ENOMEM);
                buf.lb_len = rc;
-               rc = osd_xattr_get(env, o, &buf, XATTR_NAME_LINK, BYPASS_CAPA);
+               rc = osd_xattr_get(env, o, &buf, XATTR_NAME_LINK);
        }
        if (rc < 0)
                GOTO(out, rc);
@@ -387,8 +416,7 @@ out:
 }
 
 static int osd_dir_lookup(const struct lu_env *env, struct dt_object *dt,
-                         struct dt_rec *rec, const struct dt_key *key,
-                         struct lustre_capa *capa)
+                         struct dt_rec *rec, const struct dt_key *key)
 {
        struct osd_thread_info *oti = osd_oti_get(env);
        struct osd_object  *obj = osd_dt_obj(dt);
@@ -410,12 +438,22 @@ static int osd_dir_lookup(const struct lu_env *env, struct dt_object *dt,
                }
        }
 
+       memset(&oti->oti_zde.lzd_fid, 0, sizeof(struct lu_fid));
        rc = -zap_lookup(osd->od_os, obj->oo_db->db_object,
                         (char *)key, 8, sizeof(oti->oti_zde) / 8,
                         (void *)&oti->oti_zde);
-       memcpy(rec, &oti->oti_zde.lzd_fid, sizeof(struct lu_fid));
+       if (rc != 0)
+               RETURN(rc);
 
-       RETURN(rc == 0 ? 1 : rc);
+       if (likely(fid_is_sane(&oti->oti_zde.lzd_fid))) {
+               memcpy(rec, &oti->oti_zde.lzd_fid, sizeof(struct lu_fid));
+               RETURN(1);
+       }
+
+       rc = osd_get_fid_by_oid(env, osd, oti->oti_zde.lzd_reg.zde_dnode,
+                               (struct lu_fid *)rec);
+
+       RETURN(rc == 0 ? 1 : (rc == -ENOENT ? -ENODATA : rc));
 }
 
 static int osd_declare_dir_insert(const struct lu_env *env,
@@ -481,7 +519,7 @@ struct osd_object *osd_object_find(const struct lu_env *env,
                        child = osd_obj(lo);
                else
                        LU_OBJECT_DEBUG(D_ERROR, env, luch,
-                                       "%s: object can't be located "DFID"\n",
+                                       "%s: object can't be located "DFID,
                                        osd_dev(ludev)->od_svname, PFID(fid));
 
                if (child == NULL) {
@@ -492,7 +530,7 @@ struct osd_object *osd_object_find(const struct lu_env *env,
                }
        } else {
                LU_OBJECT_DEBUG(D_ERROR, env, luch,
-                               "%s: lu_object does not exists "DFID"\n",
+                               "%s: lu_object does not exists "DFID,
                                osd_dev(ludev)->od_svname, PFID(fid));
                lu_object_put(env, luch);
                child = ERR_PTR(-ENOENT);
@@ -525,8 +563,9 @@ static int osd_seq_exists(const struct lu_env *env, struct osd_device *osd,
 
        rc = osd_fld_lookup(env, osd, seq, range);
        if (rc != 0) {
-               CERROR("%s: Can not lookup fld for "LPX64"\n",
-                      osd_name(osd), seq);
+               if (rc != -ENOENT)
+                       CERROR("%s: Can not lookup fld for "LPX64"\n",
+                              osd_name(osd), seq);
                RETURN(0);
        }
 
@@ -563,7 +602,6 @@ static int osd_remote_fid(const struct lu_env *env, struct osd_device *osd,
  *      \param  key     key for index
  *      \param  rec     record reference
  *      \param  th      transaction handler
- *      \param  capa    capability descriptor
  *      \param  ignore_quota update should not affect quota
  *
  *      \retval  0  success
@@ -571,8 +609,7 @@ static int osd_remote_fid(const struct lu_env *env, struct osd_device *osd,
  */
 static int osd_dir_insert(const struct lu_env *env, struct dt_object *dt,
                          const struct dt_rec *rec, const struct dt_key *key,
-                         struct thandle *th, struct lustre_capa *capa,
-                         int ignore_quota)
+                         struct thandle *th, int ignore_quota)
 {
        struct osd_thread_info *oti = osd_oti_get(env);
        struct osd_object   *parent = osd_dt_obj(dt);
@@ -676,8 +713,9 @@ static int osd_declare_dir_delete(const struct lu_env *env,
                                  const struct dt_key *key,
                                  struct thandle *th)
 {
-       struct osd_object *obj = osd_dt_obj(dt);
+       struct osd_object  *obj = osd_dt_obj(dt);
        struct osd_thandle *oh;
+       uint64_t            dnode;
        ENTRY;
 
        LASSERT(dt_object_exists(dt));
@@ -686,17 +724,20 @@ static int osd_declare_dir_delete(const struct lu_env *env,
        LASSERT(th != NULL);
        oh = container_of0(th, struct osd_thandle, ot_super);
 
-       LASSERT(obj->oo_db);
-       LASSERT(osd_object_is_zap(obj->oo_db));
-
-       dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, TRUE, (char *)key);
+       if (dt_object_exists(dt)) {
+               LASSERT(obj->oo_db);
+               LASSERT(osd_object_is_zap(obj->oo_db));
+               dnode = obj->oo_db->db_object;
+       } else {
+               dnode = DMU_NEW_OBJECT;
+       }
+       dmu_tx_hold_zap(oh->ot_tx, dnode, TRUE, (char *)key);
 
        RETURN(0);
 }
 
 static int osd_dir_delete(const struct lu_env *env, struct dt_object *dt,
-                         const struct dt_key *key, struct thandle *th,
-                         struct lustre_capa *capa)
+                         const struct dt_key *key, struct thandle *th)
 {
        struct osd_object *obj = osd_dt_obj(dt);
        struct osd_device *osd = osd_obj2dev(obj);
@@ -706,8 +747,8 @@ static int osd_dir_delete(const struct lu_env *env, struct dt_object *dt,
        int rc;
        ENTRY;
 
-       LASSERT(obj->oo_db);
-       LASSERT(osd_object_is_zap(obj->oo_db));
+       LASSERT(zap_db);
+       LASSERT(osd_object_is_zap(zap_db));
 
        LASSERT(th != NULL);
        oh = container_of0(th, struct osd_thandle, ot_super);
@@ -736,12 +777,11 @@ static int osd_dir_delete(const struct lu_env *env, struct dt_object *dt,
 
 static struct dt_it *osd_dir_it_init(const struct lu_env *env,
                                     struct dt_object *dt,
-                                    __u32 unused,
-                                    struct lustre_capa *capa)
+                                    __u32 unused)
 {
        struct osd_zap_it *it;
 
-       it = (struct osd_zap_it *)osd_index_it_init(env, dt, unused, capa);
+       it = (struct osd_zap_it *)osd_index_it_init(env, dt, unused);
        if (!IS_ERR(it))
                it->ozi_pos = 0;
 
@@ -1139,8 +1179,7 @@ static int osd_prepare_key_uint64(struct osd_object *o, __u64 *dst,
 }
 
 static int osd_index_lookup(const struct lu_env *env, struct dt_object *dt,
-                       struct dt_rec *rec, const struct dt_key *key,
-                       struct lustre_capa *capa)
+                       struct dt_rec *rec, const struct dt_key *key)
 {
        struct osd_object *obj = osd_dt_obj(dt);
        struct osd_device *osd = osd_obj2dev(obj);
@@ -1183,8 +1222,7 @@ static int osd_declare_index_insert(const struct lu_env *env,
 
 static int osd_index_insert(const struct lu_env *env, struct dt_object *dt,
                            const struct dt_rec *rec, const struct dt_key *key,
-                           struct thandle *th, struct lustre_capa *capa,
-                           int ignore_quota)
+                           struct thandle *th, int ignore_quota)
 {
        struct osd_object  *obj = osd_dt_obj(dt);
        struct osd_device  *osd = osd_obj2dev(obj);
@@ -1230,8 +1268,7 @@ static int osd_declare_index_delete(const struct lu_env *env,
 }
 
 static int osd_index_delete(const struct lu_env *env, struct dt_object *dt,
-                           const struct dt_key *key, struct thandle *th,
-                           struct lustre_capa *capa)
+                           const struct dt_key *key, struct thandle *th)
 {
        struct osd_object  *obj = osd_dt_obj(dt);
        struct osd_device  *osd = osd_obj2dev(obj);
@@ -1416,8 +1453,7 @@ struct osd_metadnode_it {
 };
 
 static struct dt_it *osd_zfs_otable_it_init(const struct lu_env *env,
-                                           struct dt_object *dt, __u32 attr,
-                                           struct lustre_capa *capa)
+                                           struct dt_object *dt, __u32 attr)
 {
        struct osd_device       *dev   = osd_dev(dt->do_lu.lo_dev);
        struct osd_metadnode_it *it;
@@ -1479,10 +1515,8 @@ static void osd_zfs_otable_prefetch(const struct lu_env *env,
                if (unlikely(rc != 0))
                        break;
 
-               /* dmu_prefetch() was exported in 0.6.2, if you use with
-                * an older release, just comment it out - this is an
-                * optimization */
-               dmu_prefetch(dev->od_os, it->mit_prefetched_dnode, 0, 0);
+               osd_dmu_prefetch(dev->od_os, it->mit_prefetched_dnode,
+                                0, 0, 0, ZIO_PRIORITY_ASYNC_READ);
 
                it->mit_prefetched++;
        }
@@ -1619,50 +1653,50 @@ int osd_index_try(const struct lu_env *env, struct dt_object *dt,
                const struct dt_index_features *feat)
 {
        struct osd_object *obj = osd_dt_obj(dt);
+       int rc = 0;
        ENTRY;
 
-       LASSERT(dt_object_exists(dt));
+       down_read(&obj->oo_guard);
 
        /*
         * XXX: implement support for fixed-size keys sorted with natural
         *      numerical way (not using internal hash value)
         */
        if (feat->dif_flags & DT_IND_RANGE)
-               RETURN(-ERANGE);
+               GOTO(out, rc = -ERANGE);
 
        if (unlikely(feat == &dt_otable_features)) {
                dt->do_index_ops = &osd_zfs_otable_ops;
-               RETURN(0);
+               GOTO(out, rc = 0);
        }
 
-       LASSERT(obj->oo_db != NULL);
+       LASSERT(!dt_object_exists(dt) || obj->oo_db != NULL);
        if (likely(feat == &dt_directory_features)) {
-               if (osd_object_is_zap(obj->oo_db))
+               if (!dt_object_exists(dt) || osd_object_is_zap(obj->oo_db))
                        dt->do_index_ops = &osd_dir_ops;
                else
-                       RETURN(-ENOTDIR);
+                       GOTO(out, rc = -ENOTDIR);
        } else if (unlikely(feat == &dt_acct_features)) {
                LASSERT(fid_is_acct(lu_object_fid(&dt->do_lu)));
                dt->do_index_ops = &osd_acct_index_ops;
-       } else if (osd_object_is_zap(obj->oo_db) &&
-                  dt->do_index_ops == NULL) {
+       } else if (dt->do_index_ops == NULL) {
                /* For index file, we don't support variable key & record sizes
                 * and the key has to be unique */
                if ((feat->dif_flags & ~DT_IND_UPDATE) != 0)
-                       RETURN(-EINVAL);
+                       GOTO(out, rc = -EINVAL);
 
                if (feat->dif_keysize_max > ZAP_MAXNAMELEN)
-                       RETURN(-E2BIG);
+                       GOTO(out, rc = -E2BIG);
                if (feat->dif_keysize_max != feat->dif_keysize_min)
-                       RETURN(-EINVAL);
+                       GOTO(out, rc = -EINVAL);
 
                /* As for the record size, it should be a multiple of 8 bytes
                 * and smaller than the maximum value length supported by ZAP.
                 */
                if (feat->dif_recsize_max > ZAP_MAXVALUELEN)
-                       RETURN(-E2BIG);
+                       GOTO(out, rc = -E2BIG);
                if (feat->dif_recsize_max != feat->dif_recsize_min)
-                       RETURN(-EINVAL);
+                       GOTO(out, rc = -EINVAL);
 
                obj->oo_keysize = feat->dif_keysize_max;
                obj->oo_recsize = feat->dif_recsize_max;
@@ -1676,5 +1710,8 @@ int osd_index_try(const struct lu_env *env, struct dt_object *dt,
                dt->do_index_ops = &osd_index_ops;
        }
 
-       RETURN(0);
+out:
+       up_read(&obj->oo_guard);
+
+       RETURN(rc);
 }