Whamcloud - gitweb
LU-13170 osd: positive lookup shouldn't return an error
[fs/lustre-release.git] / lustre / osd-zfs / osd_index.c
index 20f86f2..765099a 100644 (file)
 #include <sys/dsl_prop.h>
 #include <sys/sa_impl.h>
 #include <sys/txg.h>
-
-static inline int osd_object_is_zap(dnode_t *dn)
-{
-       return (dn->dn_type == DMU_OT_DIRECTORY_CONTENTS ||
-                       dn->dn_type == DMU_OT_USERGROUP_USED);
-}
+#include <lustre_scrub.h>
 
 /* We don't actually have direct access to the zap_hashbits() function
  * so just pretend like we do for now.  If this ever breaks we can look at
@@ -247,8 +242,8 @@ int __osd_xattr_load_by_oid(struct osd_device *osd, uint64_t oid, nvlist_t **sa)
  * \retval             0 for success
  * \retval             negative error number on failure
  */
-static int osd_get_fid_by_oid(const struct lu_env *env, struct osd_device *osd,
-                             uint64_t oid, struct lu_fid *fid)
+int osd_get_fid_by_oid(const struct lu_env *env, struct osd_device *osd,
+                      uint64_t oid, struct lu_fid *fid)
 {
        struct objset           *os       = osd->od_os;
        struct osd_thread_info  *oti      = osd_oti_get(env);
@@ -484,9 +479,12 @@ static int osd_check_lmv(const struct lu_env *env, struct osd_device *osd,
        if (rc == -ENOENT || rc == -EEXIST || rc == -ENODATA)
                GOTO(out_nvbuf, rc = 0);
 
-       if (rc || le32_to_cpu(lmv->lmv_magic) != LMV_MAGIC_V1)
+       if (rc)
                GOTO(out_nvbuf, rc);
 
+       if (le32_to_cpu(lmv->lmv_magic) != LMV_MAGIC_V1)
+               GOTO(out_nvbuf, rc = -EINVAL);
+
        zap_cursor_init_serialized(zc, osd->od_os, oid, 0);
        rc = -zap_cursor_retrieve(zc, za);
        if (rc == -ENOENT) {
@@ -566,15 +564,15 @@ osd_consistency_check(const struct lu_env *env, struct osd_device *osd,
                        RETURN(rc);
        }
 
-       if (thread_is_running(&scrub->os_thread)) {
+       if (scrub->os_running) {
                if (scrub->os_pos_current > oid)
                        RETURN(0);
        } else if (osd->od_auto_scrub_interval == AS_NEVER) {
                RETURN(0);
        } else {
-               if (cfs_time_before(cfs_time_current_sec(),
-                                   scrub->os_file.sf_time_last_complete +
-                                   osd->od_auto_scrub_interval))
+               if (ktime_get_real_seconds() <
+                   scrub->os_file.sf_time_last_complete +
+                   osd->od_auto_scrub_interval)
                        RETURN(0);
        }
 
@@ -598,7 +596,7 @@ again:
        insert = false;
 
 trigger:
-       if (thread_is_running(&scrub->os_thread)) {
+       if (scrub->os_running) {
                if (!dn) {
                        rc = __osd_obj2dnode(osd->od_os, oid, &dn);
                        /* The object has been removed (by race maybe). */
@@ -684,11 +682,17 @@ static int osd_dir_lookup(const struct lu_env *env, struct dt_object *dt,
 
 out:
        if (!rc && !osd_remote_fid(env, osd, fid)) {
-               rc = osd_consistency_check(env, osd, obj, fid, oid,
+               /*
+                * this should ask the scrubber to check OI given
+                * the mapping we just found in the dir entry.
+                * but result of that check should not affect
+                * result of the lookup in the directory.
+                * otherwise such a direntry becomes hidden
+                * from the layers above, including LFSCK which
+                * is supposed to fix dangling entries.
+                */
+               osd_consistency_check(env, osd, obj, fid, oid,
                                S_ISDIR(DTTOIF(oti->oti_zde.lzd_reg.zde_type)));
-               /* Only -ENOENT error will affect the lookup result. */
-               if (rc != -ENOENT)
-                       rc = 0;
        }
 
        return rc == 0 ? 1 : (rc == -ENOENT ? -ENODATA : rc);
@@ -903,7 +907,7 @@ static int osd_declare_dir_insert(const struct lu_env *env,
        LASSERT(rec1->rec_type != 0);
 
        LASSERT(th != NULL);
-       oh = container_of0(th, struct osd_thandle, ot_super);
+       oh = container_of(th, struct osd_thandle, ot_super);
 
        idc = osd_idc_find_or_init(env, osd, fid);
        if (IS_ERR(idc))
@@ -990,14 +994,13 @@ int osd_remote_fid(const struct lu_env *env, struct osd_device *osd,
  *      \param  key     key for index
  *      \param  rec     record reference
  *      \param  th      transaction handler
- *      \param  ignore_quota update should not affect quota
  *
  *      \retval  0  success
  *      \retval -ve failure
  */
 static int osd_dir_insert(const struct lu_env *env, struct dt_object *dt,
                          const struct dt_rec *rec, const struct dt_key *key,
-                         struct thandle *th, int ignore_quota)
+                         struct thandle *th)
 {
        struct osd_thread_info *oti = osd_oti_get(env);
        struct osd_object   *parent = osd_dt_obj(dt);
@@ -1018,7 +1021,7 @@ static int osd_dir_insert(const struct lu_env *env, struct dt_object *dt,
        LASSERT(osd_invariant(parent));
 
        LASSERT(th != NULL);
-       oh = container_of0(th, struct osd_thandle, ot_super);
+       oh = container_of(th, struct osd_thandle, ot_super);
 
        idc = osd_idc_find(env, osd, fid);
        if (unlikely(idc == NULL)) {
@@ -1027,15 +1030,16 @@ static int osd_dir_insert(const struct lu_env *env, struct dt_object *dt,
                 * lookup FID in FLDB/OI and don't risk to deadlock,
                 * but in some special cases (lfsck testing, etc)
                 * it's much simpler than fixing a caller */
-               CERROR("%s: "DFID" wasn't declared for insert\n",
-                      osd_name(osd), PFID(fid));
                idc = osd_idc_find_or_init(env, osd, fid);
-               if (IS_ERR(idc))
+               if (IS_ERR(idc)) {
+                       CERROR("%s: "DFID" wasn't declared for insert\n",
+                              osd_name(osd), PFID(fid));
                        RETURN(PTR_ERR(idc));
+               }
        }
 
-       CLASSERT(sizeof(zde->lzd_reg) == 8);
-       CLASSERT(sizeof(*zde) % 8 == 0);
+       BUILD_BUG_ON(sizeof(zde->lzd_reg) != 8);
+       BUILD_BUG_ON(sizeof(*zde) % 8 != 0);
 
        memset(&zde->lzd_reg, 0, sizeof(zde->lzd_reg));
        zde->lzd_reg.zde_type = IFTODT(rec1->rec_type & S_IFMT);
@@ -1124,7 +1128,7 @@ static int osd_declare_dir_delete(const struct lu_env *env,
        LASSERT(zap_dn != NULL);
 
        LASSERT(th != NULL);
-       oh = container_of0(th, struct osd_thandle, ot_super);
+       oh = container_of(th, struct osd_thandle, ot_super);
 
        /*
         * In Orion . and .. were stored in the directory (not generated upon
@@ -1163,7 +1167,7 @@ static int osd_dir_delete(const struct lu_env *env, struct dt_object *dt,
        LASSERT(zap_dn);
 
        LASSERT(th != NULL);
-       oh = container_of0(th, struct osd_thandle, ot_super);
+       oh = container_of(th, struct osd_thandle, ot_super);
 
        /*
         * In Orion . and .. were stored in the directory (not generated upon
@@ -1224,7 +1228,7 @@ static struct dt_it *osd_dir_it_init(const struct lu_env *env,
 
        it = (struct osd_zap_it *)osd_index_it_init(env, dt, unused);
        if (!IS_ERR(it))
-               it->ozi_pos = 0;
+               it->ozi_pos = OZI_POS_INIT;
 
        RETURN((struct dt_it *)it);
 }
@@ -1259,22 +1263,22 @@ static int osd_dir_it_get(const struct lu_env *env,
        LASSERT(((const char *)key)[0] == 0);
 
        if (name[0] == 0) {
-               it->ozi_pos = 0;
+               it->ozi_pos = OZI_POS_INIT;
                RETURN(1);
        }
 
        if (name[0] == '.') {
                if (name[1] == 0) {
-                       it->ozi_pos = 1;
+                       it->ozi_pos = OZI_POS_DOT;
                        GOTO(out, rc = 1);
                } else if (name[1] == '.' && name[2] == 0) {
-                       it->ozi_pos = 2;
+                       it->ozi_pos = OZI_POS_DOTDOT;
                        GOTO(out, rc = 1);
                }
        }
 
        /* neither . nor .. - some real record */
-       it->ozi_pos = 3;
+       it->ozi_pos = OZI_POS_REAL;
        rc = +1;
 
 out:
@@ -1335,16 +1339,16 @@ static int osd_dir_it_next(const struct lu_env *env, struct dt_it *di)
        ENTRY;
 
        /* temp. storage should be enough for any key supported by ZFS */
-       CLASSERT(sizeof(za->za_name) <= sizeof(it->ozi_name));
+       BUILD_BUG_ON(sizeof(za->za_name) > sizeof(it->ozi_name));
 
        /*
         * the first ->next() moves the cursor to .
         * the second ->next() moves the cursor to ..
         * then we get to the real records and have to verify any exist
         */
-       if (it->ozi_pos <= 2) {
+       if (it->ozi_pos <= OZI_POS_DOTDOT) {
                it->ozi_pos++;
-               if (it->ozi_pos <=2)
+               if (it->ozi_pos <= OZI_POS_DOTDOT)
                        RETURN(0);
 
        } else {
@@ -1373,10 +1377,10 @@ static struct dt_key *osd_dir_it_key(const struct lu_env *env,
        int                rc = 0;
        ENTRY;
 
-       if (it->ozi_pos <= 1) {
-               it->ozi_pos = 1;
+       if (it->ozi_pos <= OZI_POS_DOT) {
+               it->ozi_pos = OZI_POS_DOT;
                RETURN((struct dt_key *)".");
-       } else if (it->ozi_pos == 2) {
+       } else if (it->ozi_pos == OZI_POS_DOTDOT) {
                RETURN((struct dt_key *)"..");
        }
 
@@ -1395,10 +1399,10 @@ static int osd_dir_it_key_size(const struct lu_env *env, const struct dt_it *di)
        int                rc;
        ENTRY;
 
-       if (it->ozi_pos <= 1) {
-               it->ozi_pos = 1;
+       if (it->ozi_pos <= OZI_POS_DOT) {
+               it->ozi_pos = OZI_POS_DOT;
                RETURN(2);
-       } else if (it->ozi_pos == 2) {
+       } else if (it->ozi_pos == OZI_POS_DOTDOT) {
                RETURN(3);
        }
 
@@ -1486,8 +1490,12 @@ static int osd_dir_it_rec(const struct lu_env *env, const struct dt_it *di,
        ENTRY;
 
        lde->lde_attrs = 0;
-       if (it->ozi_pos <= 1) {
-               lde->lde_hash = cpu_to_le64(1);
+       if (it->ozi_pos <= OZI_POS_DOT) {
+               /* notice hash=0 here, this is needed to avoid
+                * case when some real entry (after ./..) may
+                * have hash=0. in this case the client would
+                * be confused having records out of hash order. */
+               lde->lde_hash = cpu_to_le64(0);
                strcpy(lde->lde_name, ".");
                lde->lde_namelen = cpu_to_le16(1);
                fid_cpu_to_le(&lde->lde_fid,
@@ -1496,10 +1504,11 @@ static int osd_dir_it_rec(const struct lu_env *env, const struct dt_it *di,
                /* append lustre attributes */
                osd_it_append_attrs(lde, attr, 1, IFTODT(S_IFDIR));
                lde->lde_reclen = cpu_to_le16(lu_dirent_calc_size(1, attr));
-               it->ozi_pos = 1;
+               it->ozi_pos = OZI_POS_DOT;
                RETURN(0);
-       } else if (it->ozi_pos == 2) {
-               lde->lde_hash = cpu_to_le64(2);
+       } else if (it->ozi_pos == OZI_POS_DOTDOT) {
+               /* same as for . above */
+               lde->lde_hash = cpu_to_le64(0);
                strcpy(lde->lde_name, "..");
                lde->lde_namelen = cpu_to_le16(2);
                rc = osd_find_parent_fid(env, &it->ozi_obj->oo_dt, fid, NULL);
@@ -1612,9 +1621,9 @@ static int osd_dir_it_rec_size(const struct lu_env *env, const struct dt_it *di,
        int                  rc;
        ENTRY;
 
-       if (it->ozi_pos <= 1)
+       if (it->ozi_pos <= OZI_POS_DOT)
                namelen = 1;
-       else if (it->ozi_pos == 2)
+       else if (it->ozi_pos == OZI_POS_DOTDOT)
                namelen = 2;
 
        if (namelen > 0) {
@@ -1648,8 +1657,8 @@ static __u64 osd_dir_it_store(const struct lu_env *env, const struct dt_it *di)
        __u64              pos;
        ENTRY;
 
-       if (it->ozi_pos <= 2)
-               pos = it->ozi_pos;
+       if (it->ozi_pos <= OZI_POS_DOTDOT)
+               pos = 0;
        else
                pos = osd_zap_cursor_serialize(it->ozi_zc);
 
@@ -1675,11 +1684,11 @@ static int osd_dir_it_load(const struct lu_env *env,
        zap_cursor_fini(it->ozi_zc);
        osd_obj_cursor_init_serialized(it->ozi_zc, obj, hash);
 
-       if (hash <= 2) {
-               it->ozi_pos = hash;
-               rc = +1;
+       if (hash == 0) {
+               it->ozi_pos = OZI_POS_INIT;
+               rc = +1; /* there will be ./.. at least */
        } else {
-               it->ozi_pos = 3;
+               it->ozi_pos = OZI_POS_REAL;
                /* to return whether the end has been reached */
                rc = osd_index_retrieve_skip_dots(it, za);
                if (rc == 0)
@@ -1766,7 +1775,7 @@ static int osd_declare_index_insert(const struct lu_env *env,
        ENTRY;
 
        LASSERT(th != NULL);
-       oh = container_of0(th, struct osd_thandle, ot_super);
+       oh = container_of(th, struct osd_thandle, ot_super);
 
        LASSERT(obj->oo_dn);
 
@@ -1781,7 +1790,7 @@ static int osd_declare_index_insert(const struct lu_env *env,
 
 static int osd_index_insert(const struct lu_env *env, struct dt_object *dt,
                            const struct dt_rec *rec, const struct dt_key *key,
-                           struct thandle *th, int ignore_quota)
+                           struct thandle *th)
 {
        struct osd_object  *obj = osd_dt_obj(dt);
        struct osd_device  *osd = osd_obj2dev(obj);
@@ -1795,7 +1804,7 @@ static int osd_index_insert(const struct lu_env *env, struct dt_object *dt,
        LASSERT(osd_invariant(obj));
        LASSERT(th != NULL);
 
-       oh = container_of0(th, struct osd_thandle, ot_super);
+       oh = container_of(th, struct osd_thandle, ot_super);
 
        rc = osd_prepare_key_uint64(obj, k, key);
 
@@ -1820,7 +1829,7 @@ static int osd_declare_index_delete(const struct lu_env *env,
        LASSERT(th != NULL);
        LASSERT(obj->oo_dn);
 
-       oh = container_of0(th, struct osd_thandle, ot_super);
+       oh = container_of(th, struct osd_thandle, ot_super);
 
        /* do not specify the key as then DMU is trying to look it up
         * which is very expensive. usually the layers above lookup
@@ -1843,7 +1852,7 @@ static int osd_index_delete(const struct lu_env *env, struct dt_object *dt,
 
        LASSERT(obj->oo_dn);
        LASSERT(th != NULL);
-       oh = container_of0(th, struct osd_thandle, ot_super);
+       oh = container_of(th, struct osd_thandle, ot_super);
 
        rc = osd_prepare_key_uint64(obj, k, key);
 
@@ -2012,6 +2021,8 @@ int osd_index_try(const struct lu_env *env, struct dt_object *dt,
                const struct dt_index_features *feat)
 {
        struct osd_object *obj = osd_dt_obj(dt);
+       struct osd_device *osd = osd_obj2dev(obj);
+       const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
        int rc = 0;
        ENTRY;
 
@@ -2036,7 +2047,7 @@ int osd_index_try(const struct lu_env *env, struct dt_object *dt,
                else
                        GOTO(out, rc = -ENOTDIR);
        } else if (unlikely(feat == &dt_acct_features)) {
-               LASSERT(fid_is_acct(lu_object_fid(&dt->do_lu)));
+               LASSERT(fid_is_acct(fid));
                dt->do_index_ops = &osd_acct_index_ops;
        } else if (dt->do_index_ops == NULL) {
                /* For index file, we don't support variable key & record sizes
@@ -2067,6 +2078,24 @@ int osd_index_try(const struct lu_env *env, struct dt_object *dt,
                        obj->oo_recusize = 8;
                }
                dt->do_index_ops = &osd_index_ops;
+
+               if (feat == &dt_lfsck_layout_orphan_features ||
+                   feat == &dt_lfsck_layout_dangling_features ||
+                   feat == &dt_lfsck_namespace_features)
+                       GOTO(out, rc = 0);
+
+               rc = osd_index_register(osd, fid, obj->oo_keysize,
+                                       obj->oo_recusize * obj->oo_recsize);
+               if (rc < 0)
+                       CWARN("%s: failed to register index "DFID": rc = %d\n",
+                             osd_name(osd), PFID(fid), rc);
+               else if (rc > 0)
+                       rc = 0;
+               else
+                       CDEBUG(D_LFSCK, "%s: index object "DFID
+                              " (%u/%u/%u) registered\n",
+                              osd_name(osd), PFID(fid), obj->oo_keysize,
+                              obj->oo_recusize, obj->oo_recsize);
        }
 
 out: