Whamcloud - gitweb
LU-2449 osd: lookup(..) to fetch fid from parent's LMA
authorAlex Zhuravlev <alexey.zhuravlev@intel.com>
Thu, 7 Mar 2013 11:09:52 +0000 (15:09 +0400)
committerOleg Drokin <oleg.drokin@intel.com>
Thu, 14 Mar 2013 04:18:11 +0000 (00:18 -0400)
in case LinkEA is not accessible for a reason, osd-zfs
will be trying to get fid for ".." using parent's dnode
which is stored in regular ZFS attributes. parent's
LMA can be used to get fid for ".."

Signed-off-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Change-Id: I34e9f884eb60f036c2f941013bf22e154efc2ff4
Reviewed-on: http://review.whamcloud.com/5629
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Mike Pershin <mike.pershin@intel.com>
lustre/osd-zfs/osd_index.c
lustre/osd-zfs/osd_internal.h
lustre/osd-zfs/osd_xattr.c

index 92a2723..f597e02 100644 (file)
@@ -197,6 +197,93 @@ static inline void osd_it_append_attrs(struct lu_dirent *ent, __u32 attr,
        ent->lde_attrs = cpu_to_le32(ent->lde_attrs);
 }
 
+/*
+ * as we don't know FID, we can't use LU object, so this function
+ * partially duplicate __osd_xattr_get() which is built around
+ * LU-object and uses it to cache data like regular EA dnode, etc
+ */
+static int osd_find_parent_by_dnode(const struct lu_env *env,
+                                   struct dt_object *o,
+                                   struct lu_fid *fid)
+{
+       struct lustre_mdt_attrs *lma;
+       udmu_objset_t           *uos = &osd_obj2dev(osd_dt_obj(o))->od_objset;
+       struct lu_buf            buf;
+       sa_handle_t             *sa_hdl;
+       nvlist_t                *nvbuf = NULL;
+       uchar_t                 *value;
+       uint64_t                 dnode;
+       int                      rc, size;
+       ENTRY;
+
+       /* first of all, get parent dnode from own attributes */
+       LASSERT(osd_dt_obj(o)->oo_db);
+       rc = -sa_handle_get(uos->os, osd_dt_obj(o)->oo_db->db_object,
+                           NULL, SA_HDL_PRIVATE, &sa_hdl);
+       if (rc)
+               RETURN(rc);
+
+       dnode = ZFS_NO_OBJECT;
+       rc = -sa_lookup(sa_hdl, SA_ZPL_PARENT(uos), &dnode, 8);
+       sa_handle_destroy(sa_hdl);
+       if (rc)
+               RETURN(rc);
+
+       /* now get EA buffer */
+       rc = __osd_xattr_load(uos, dnode, &nvbuf);
+       if (rc)
+               GOTO(regular, rc);
+
+       /* XXX: if we get that far.. should we cache the result? */
+
+       /* try to find LMA attribute */
+       LASSERT(nvbuf != NULL);
+       rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA, &value, &size);
+       if (rc == 0 && size >= sizeof(*lma)) {
+               lma = (struct lustre_mdt_attrs *)value;
+               lustre_lma_swab(lma);
+               *fid = lma->lma_self_fid;
+               GOTO(out, rc = 0);
+       }
+
+regular:
+       /* no LMA attribute in SA, let's try regular EA */
+
+       /* first of all, get parent dnode storing regular EA */
+       rc = -sa_handle_get(uos->os, dnode, NULL, SA_HDL_PRIVATE, &sa_hdl);
+       if (rc)
+               GOTO(out, rc);
+
+       dnode = ZFS_NO_OBJECT;
+       rc = -sa_lookup(sa_hdl, SA_ZPL_XATTR(uos), &dnode, 8);
+       sa_handle_destroy(sa_hdl);
+       if (rc)
+               GOTO(out, rc);
+
+       CLASSERT(sizeof(*lma) <= sizeof(osd_oti_get(env)->oti_buf));
+       buf.lb_buf = osd_oti_get(env)->oti_buf;
+       buf.lb_len = sizeof(osd_oti_get(env)->oti_buf);
+
+       /* now try to find LMA */
+       rc = __osd_xattr_get_large(env, uos, dnode, &buf,
+                                  XATTR_NAME_LMA, &size);
+       if (rc == 0 && size >= sizeof(*lma)) {
+               lma = buf.lb_buf;
+               lustre_lma_swab(lma);
+               *fid = lma->lma_self_fid;
+               GOTO(out, rc = 0);
+       } else if (rc < 0) {
+               GOTO(out, rc);
+       } else {
+               GOTO(out, rc = -EIO);
+       }
+
+out:
+       if (nvbuf != NULL)
+               nvlist_free(nvbuf);
+       RETURN(rc);
+}
+
 static int osd_find_parent_fid(const struct lu_env *env, struct dt_object *o,
                               struct lu_fid *fid)
 {
@@ -245,6 +332,25 @@ static int osd_find_parent_fid(const struct lu_env *env, struct dt_object *o,
 out:
        if (buf.lb_buf != osd_oti_get(env)->oti_buf)
                OBD_FREE(buf.lb_buf, buf.lb_len);
+
+#if 0
+       /* this block can be enabled for additional verification
+        * it's trying to match FID from LinkEA vs. FID from LMA */
+       if (rc == 0) {
+               struct lu_fid fid2;
+               int rc2;
+               rc2 = osd_find_parent_by_dnode(env, o, &fid2);
+               if (rc2 == 0)
+                       if (lu_fid_eq(fid, &fid2) == 0)
+                               CERROR("wrong parent: "DFID" != "DFID"\n",
+                                      PFID(fid), PFID(&fid2));
+       }
+#endif
+
+       /* no LinkEA is found, let's try to find the fid in parent's LMA */
+       if (unlikely(rc != 0))
+               rc = osd_find_parent_by_dnode(env, o, fid);
+
        RETURN(rc);
 }
 
@@ -431,9 +537,8 @@ static int osd_dir_insert(const struct lu_env *env, struct dt_object *dt,
                        /* update parent dnode in the child.
                         * later it will be used to generate ".." */
                        udmu_objset_t *uos = &osd->od_objset;
-                       rc = osd_object_sa_update(child,
-                                                 SA_ZPL_PARENT(uos),
-                                                 &parent->oo_db->db_object,
+                       rc = osd_object_sa_update(parent, SA_ZPL_PARENT(uos),
+                                                 &child->oo_db->db_object,
                                                  8, oh);
 
 #ifndef OSD_ZFS_INSERT_DOTS_FOR_TESTING
index 6ea83ba..55759c3 100644 (file)
@@ -428,6 +428,10 @@ int osd_index_try(const struct lu_env *env, struct dt_object *dt,
 
 
 /* osd_xattr.c */
+int __osd_xattr_load(udmu_objset_t *uos, uint64_t dnode, nvlist_t **sa_xattr);
+int __osd_xattr_get_large(const struct lu_env *env, udmu_objset_t *uos,
+                         uint64_t xattr, struct lu_buf *buf,
+                         const char *name, int *sizep);
 int osd_xattr_get(const struct lu_env *env, struct dt_object *dt,
                  struct lu_buf *buf, const char *name,
                  struct lustre_capa *capa);
index 56d1352..463ea28 100644 (file)
  *
  * No locking is done here.
  */
-int __osd_xattr_cache(const struct lu_env *env, struct osd_object *obj)
+int __osd_xattr_load(udmu_objset_t *uos, uint64_t dnode, nvlist_t **sa_xattr)
 {
-       struct osd_device *osd = osd_obj2dev(obj);
-       udmu_objset_t     *uos = &osd->od_objset;
-       sa_handle_t       *sa_hdl;
-       char              *buf;
-       int                size;
-       int                rc;
+       sa_handle_t *sa_hdl;
+       char        *buf;
+       int          rc, size;
 
-       LASSERT(obj->oo_sa_xattr == NULL);
-       LASSERT(obj->oo_db != NULL);
+       if (unlikely(dnode == ZFS_NO_OBJECT))
+               return -ENOENT;
 
-       rc = -sa_handle_get(uos->os, obj->oo_db->db_object, NULL,
-                       SA_HDL_PRIVATE, &sa_hdl);
+       rc = -sa_handle_get(uos->os, dnode, NULL, SA_HDL_PRIVATE, &sa_hdl);
        if (rc)
                return rc;
 
        rc = -sa_size(sa_hdl, SA_ZPL_DXATTR(uos), &size);
        if (rc) {
                if (rc == -ENOENT)
-                       rc = -nvlist_alloc(&obj->oo_sa_xattr,
-                                       NV_UNIQUE_NAME, KM_SLEEP);
+                       rc = -nvlist_alloc(sa_xattr, NV_UNIQUE_NAME, KM_SLEEP);
                goto out_sa;
        }
 
@@ -116,7 +111,7 @@ int __osd_xattr_cache(const struct lu_env *env, struct osd_object *obj)
        }
        rc = -sa_lookup(sa_hdl, SA_ZPL_DXATTR(uos), buf, size);
        if (rc == 0)
-               rc = -nvlist_unpack(buf, size, &obj->oo_sa_xattr, KM_SLEEP);
+               rc = -nvlist_unpack(buf, size, sa_xattr, KM_SLEEP);
        sa_spill_free(buf);
 out_sa:
        sa_handle_destroy(sa_hdl);
@@ -124,6 +119,16 @@ out_sa:
        return rc;
 }
 
+static inline int __osd_xattr_cache(const struct lu_env *env,
+                                   struct osd_object *obj)
+{
+       LASSERT(obj->oo_sa_xattr == NULL);
+       LASSERT(obj->oo_db != NULL);
+
+       return __osd_xattr_load(&osd_obj2dev(obj)->od_objset,
+                               obj->oo_db->db_object, &obj->oo_sa_xattr);
+}
+
 int __osd_sa_xattr_get(const struct lu_env *env, struct osd_object *obj,
                const struct lu_buf *buf, const char *name, int *sizep)
 {
@@ -156,28 +161,21 @@ int __osd_sa_xattr_get(const struct lu_env *env, struct osd_object *obj,
        return 0;
 }
 
-int __osd_xattr_get(const struct lu_env *env, struct osd_object *obj,
-               struct lu_buf *buf, const char *name, int *sizep)
+int __osd_xattr_get_large(const struct lu_env *env, udmu_objset_t *uos,
+                         uint64_t xattr, struct lu_buf *buf,
+                         const char *name, int *sizep)
 {
-       struct osd_device *osd = osd_obj2dev(obj);
-       udmu_objset_t     *uos = &osd->od_objset;
-       uint64_t           xa_data_obj;
-       dmu_buf_t         *xa_data_db;
-       sa_handle_t       *sa_hdl = NULL;
-       uint64_t           size;
-       int                rc;
-
-       /* check SA_ZPL_DXATTR first then fallback to directory xattr */
-       rc = __osd_sa_xattr_get(env, obj, buf, name, sizep);
-       if (rc != -ENOENT)
-               return rc;
+       dmu_buf_t       *xa_data_db;
+       sa_handle_t     *sa_hdl = NULL;
+       uint64_t         xa_data_obj, size;
+       int              rc;
 
        /* are there any extended attributes? */
-       if (obj->oo_xattr == ZFS_NO_OBJECT)
+       if (xattr == ZFS_NO_OBJECT)
                return -ENOENT;
 
        /* Lookup the object number containing the xattr data */
-       rc = -zap_lookup(uos->os, obj->oo_xattr, name, sizeof(uint64_t), 1,
+       rc = -zap_lookup(uos->os, xattr, name, sizeof(uint64_t), 1,
                        &xa_data_obj);
        if (rc)
                return rc;
@@ -219,6 +217,23 @@ out:
        sa_handle_destroy(sa_hdl);
 out_rele:
        dmu_buf_rele(xa_data_db, FTAG);
+
+       return rc;
+}
+
+int __osd_xattr_get(const struct lu_env *env, struct osd_object *obj,
+               struct lu_buf *buf, const char *name, int *sizep)
+{
+       int rc;
+
+       /* check SA_ZPL_DXATTR first then fallback to directory xattr */
+       rc = __osd_sa_xattr_get(env, obj, buf, name, sizep);
+       if (rc != -ENOENT)
+               return rc;
+
+       rc = __osd_xattr_get_large(env, &osd_obj2dev(obj)->od_objset,
+                                  obj->oo_xattr, buf, name, sizep);
+
        return rc;
 }
 
@@ -790,7 +805,9 @@ int osd_xattr_list(const struct lu_env *env, struct dt_object *dt,
 
                zap_cursor_advance(zc);
        }
-       if (rc < 0)
+       if (rc == -ENOENT) /* no more kes in the index */
+               rc = 0;
+       else if (unlikely(rc < 0))
                GOTO(out_fini, rc);
        rc = counted;