Whamcloud - gitweb
LU-5070 osd-zfs: get object's FID from its LMA EA 95/18595/2
authorFan Yong <fan.yong@intel.com>
Tue, 23 Feb 2016 03:33:55 +0000 (11:33 +0800)
committerOleg Drokin <oleg.drokin@intel.com>
Fri, 22 Apr 2016 15:47:22 +0000 (15:47 +0000)
Do not assume that the FID-in-dirent is always valid. If the
FID-in-dirent is absent or invalid, then the osd_dir_lookup
needs to find out the FID from the target object's LMA EA.

It is the similar logic as osd-ldiskfs does.

Signed-off-by: Fan Yong <fan.yong@intel.com>
Change-Id: If75ff4ed03bd75a3385666842e94838ece3f4344
Reviewed-on: http://review.whamcloud.com/18595
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Reviewed-by: Niu Yawei <yawei.niu@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/osd-zfs/osd_index.c
lustre/osd-zfs/osd_object.c

index 705c86d..8b6c8fe 100644 (file)
@@ -228,90 +228,121 @@ static inline void osd_it_append_attrs(struct lu_dirent *ent, __u32 attr,
        ent->lde_attrs = cpu_to_le32(ent->lde_attrs);
 }
 
-/*
- * as we don't know FID, we can't use LU object, so this function
- * partially duplicate __osd_xattr_get() which is built around
- * LU-object and uses it to cache data like regular EA dnode, etc
+/**
+ * Get the object's FID from its LMA EA.
+ *
+ * \param[in] env      pointer to the thread context
+ * \param[in] osd      pointer to the OSD device
+ * \param[in] oid      the object's local identifier
+ * \param[out] fid     the buffer to hold the object's FID
+ *
+ * \retval             0 for success
+ * \retval             negative error number on failure
  */
-static int osd_find_parent_by_dnode(const struct lu_env *env,
-                                   struct dt_object *o,
-                                   struct lu_fid *fid)
+static int osd_get_fid_by_oid(const struct lu_env *env, struct osd_device *osd,
+                             uint64_t oid, struct lu_fid *fid)
 {
-       struct osd_device       *osd = osd_obj2dev(osd_dt_obj(o));
-       struct lustre_mdt_attrs *lma;
+       struct objset           *os       = osd->od_os;
+       struct osd_thread_info  *oti      = osd_oti_get(env);
+       struct lustre_mdt_attrs *lma      =
+                       (struct lustre_mdt_attrs *)oti->oti_buf;
        struct lu_buf            buf;
-       sa_handle_t             *sa_hdl;
-       nvlist_t                *nvbuf = NULL;
-       uchar_t                 *value;
-       uint64_t                 dnode;
-       int                      rc, size;
+       nvlist_t                *sa_xattr = NULL;
+       sa_handle_t             *sa_hdl   = NULL;
+       uchar_t                 *nv_value = NULL;
+       uint64_t                 xattr    = ZFS_NO_OBJECT;
+       int                      size     = 0;
+       int                      rc;
        ENTRY;
 
-       /* first of all, get parent dnode from own attributes */
-       LASSERT(osd_dt_obj(o)->oo_db);
-       rc = -sa_handle_get(osd->od_os, osd_dt_obj(o)->oo_db->db_object,
-                           NULL, SA_HDL_PRIVATE, &sa_hdl);
-       if (rc)
-               RETURN(rc);
+       rc = __osd_xattr_load(osd, oid, &sa_xattr);
+       if (rc == -ENOENT)
+               goto regular;
 
-       dnode = ZFS_NO_OBJECT;
-       rc = -sa_lookup(sa_hdl, SA_ZPL_PARENT(osd), &dnode, 8);
-       sa_handle_destroy(sa_hdl);
-       if (rc)
-               RETURN(rc);
+       if (rc != 0)
+               GOTO(out, rc);
 
-       /* now get EA buffer */
-       rc = __osd_xattr_load(osd, dnode, &nvbuf);
-       if (rc)
-               GOTO(regular, rc);
+       rc = -nvlist_lookup_byte_array(sa_xattr, XATTR_NAME_LMA, &nv_value,
+                                      &size);
+       if (rc == -ENOENT)
+               goto regular;
 
-       /* XXX: if we get that far.. should we cache the result? */
+       if (rc != 0)
+               GOTO(out, rc);
 
-       /* try to find LMA attribute */
-       LASSERT(nvbuf != NULL);
-       rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA, &value, &size);
-       if (rc == 0 && size >= sizeof(*lma)) {
-               lma = (struct lustre_mdt_attrs *)value;
-               lustre_lma_swab(lma);
-               *fid = lma->lma_self_fid;
-               GOTO(out, rc = 0);
-       }
+       if (unlikely(size > sizeof(oti->oti_buf)))
+               GOTO(out, rc = -ERANGE);
 
-regular:
-       /* no LMA attribute in SA, let's try regular EA */
+       memcpy(lma, nv_value, size);
 
-       /* first of all, get parent dnode storing regular EA */
-       rc = -sa_handle_get(osd->od_os, dnode, NULL, SA_HDL_PRIVATE, &sa_hdl);
-       if (rc)
+       goto found;
+
+regular:
+       rc = -sa_handle_get(os, oid, NULL, SA_HDL_PRIVATE, &sa_hdl);
+       if (rc != 0)
                GOTO(out, rc);
 
-       dnode = ZFS_NO_OBJECT;
-       rc = -sa_lookup(sa_hdl, SA_ZPL_XATTR(osd), &dnode, 8);
+       rc = -sa_lookup(sa_hdl, SA_ZPL_XATTR(osd), &xattr, 8);
        sa_handle_destroy(sa_hdl);
-       if (rc)
+       if (rc != 0)
                GOTO(out, rc);
 
-       CLASSERT(sizeof(*lma) <= sizeof(osd_oti_get(env)->oti_buf));
-       buf.lb_buf = osd_oti_get(env)->oti_buf;
-       buf.lb_len = sizeof(osd_oti_get(env)->oti_buf);
-
-       /* now try to find LMA */
-       rc = __osd_xattr_get_large(env, osd, dnode, &buf,
+       buf.lb_buf = lma;
+       buf.lb_len = sizeof(oti->oti_buf);
+       rc = __osd_xattr_get_large(env, osd, xattr, &buf,
                                   XATTR_NAME_LMA, &size);
-       if (rc == 0 && size >= sizeof(*lma)) {
-               lma = buf.lb_buf;
-               lustre_lma_swab(lma);
-               *fid = lma->lma_self_fid;
-               GOTO(out, rc = 0);
-       } else if (rc < 0) {
+       if (rc != 0)
                GOTO(out, rc);
-       } else {
+
+found:
+       if (size < sizeof(*lma))
                GOTO(out, rc = -EIO);
+
+       lustre_lma_swab(lma);
+       if (unlikely((lma->lma_incompat & ~LMA_INCOMPAT_SUPP) ||
+                    CFS_FAIL_CHECK(OBD_FAIL_OSD_LMA_INCOMPAT))) {
+               CWARN("%s: unsupported incompat LMA feature(s) %#x for "
+                     "oid = "LPX64"\n", osd->od_svname,
+                     lma->lma_incompat & ~LMA_INCOMPAT_SUPP, oid);
+               GOTO(out, rc = -EOPNOTSUPP);
+       } else {
+               *fid = lma->lma_self_fid;
+               GOTO(out, rc = 0);
        }
 
 out:
-       if (nvbuf != NULL)
-               nvlist_free(nvbuf);
+       if (sa_xattr != NULL)
+               nvlist_free(sa_xattr);
+       return rc;
+}
+
+/*
+ * As we don't know FID, we can't use LU object, so this function
+ * partially duplicate __osd_xattr_get() which is built around
+ * LU-object and uses it to cache data like regular EA dnode, etc
+ */
+static int osd_find_parent_by_dnode(const struct lu_env *env,
+                                   struct dt_object *o,
+                                   struct lu_fid *fid)
+{
+       struct osd_device       *osd = osd_obj2dev(osd_dt_obj(o));
+       sa_handle_t             *sa_hdl;
+       uint64_t                 dnode = ZFS_NO_OBJECT;
+       int                      rc;
+       ENTRY;
+
+       /* first of all, get parent dnode from own attributes */
+       LASSERT(osd_dt_obj(o)->oo_db);
+       rc = -sa_handle_get(osd->od_os, osd_dt_obj(o)->oo_db->db_object,
+                           NULL, SA_HDL_PRIVATE, &sa_hdl);
+       if (rc != 0)
+               RETURN(rc);
+
+       rc = -sa_lookup(sa_hdl, SA_ZPL_PARENT(osd), &dnode, 8);
+       sa_handle_destroy(sa_hdl);
+       if (rc == 0)
+               rc = osd_get_fid_by_oid(env, osd, dnode, fid);
+
        RETURN(rc);
 }
 
@@ -407,12 +438,22 @@ static int osd_dir_lookup(const struct lu_env *env, struct dt_object *dt,
                }
        }
 
+       memset(&oti->oti_zde.lzd_fid, 0, sizeof(struct lu_fid));
        rc = -zap_lookup(osd->od_os, obj->oo_db->db_object,
                         (char *)key, 8, sizeof(oti->oti_zde) / 8,
                         (void *)&oti->oti_zde);
-       memcpy(rec, &oti->oti_zde.lzd_fid, sizeof(struct lu_fid));
+       if (rc != 0)
+               RETURN(rc);
 
-       RETURN(rc == 0 ? 1 : rc);
+       if (likely(fid_is_sane(&oti->oti_zde.lzd_fid))) {
+               memcpy(rec, &oti->oti_zde.lzd_fid, sizeof(struct lu_fid));
+               RETURN(1);
+       }
+
+       rc = osd_get_fid_by_oid(env, osd, oti->oti_zde.lzd_reg.zde_dnode,
+                               (struct lu_fid *)rec);
+
+       RETURN(rc == 0 ? 1 : (rc == -ENOENT ? -ENODATA : rc));
 }
 
 static int osd_declare_dir_insert(const struct lu_env *env,
index 5c63388..b875be2 100644 (file)
@@ -1577,12 +1577,9 @@ static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
        LASSERT(osd_invariant(obj));
 
        rc = osd_init_lma(env, obj, fid, oh);
-       if (rc) {
+       if (rc != 0)
                CERROR("%s: can not set LMA on "DFID": rc = %d\n",
                       osd->od_svname, PFID(fid), rc);
-               /* ignore errors during LMA initialization */
-               rc = 0;
-       }
 
 out:
        up_write(&obj->oo_guard);