Whamcloud - gitweb
LU-10192 osd-zfs: create agent entry for remote entry 17/29617/21
authorFan Yong <fan.yong@intel.com>
Wed, 6 Dec 2017 13:54:45 +0000 (21:54 +0800)
committerOleg Drokin <oleg.drokin@intel.com>
Fri, 22 Dec 2017 06:49:03 +0000 (06:49 +0000)
In a DNE environment, an object (whether regular file
or directory) and its name entry may reside on different
MDTs. In that case, we create an agent entry on
the MDT where the object resides. The agent entry references
the object locally, which makes the object visible to
userspace when the target is mounted directly as 'zfs'. Userspace
tools such as 'tar' can then handle the object properly.
This behavior is consistent between the ldiskfs and ZFS backends.

We handle the agent entry while setting the linkEA, which is the
common interface for both regular files and directories and covers
the various cases such as create/link/unlink/rename.

NOTE: we can NOT do this in ea_{insert,delete}, because those
are only for directories.

Signed-off-by: Fan Yong <fan.yong@intel.com>
Change-Id: Icc4a63027221edf279994fbecda4d47cc121b799
Reviewed-on: https://review.whamcloud.com/29617
Tested-by: Jenkins
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Lai Siyao <lai.siyao@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/osd-zfs/osd_index.c
lustre/osd-zfs/osd_internal.h
lustre/osd-zfs/osd_object.c
lustre/osd-zfs/osd_oi.c
lustre/osd-zfs/osd_xattr.c

index 81a7385..c7e87af 100644 (file)
@@ -531,6 +531,118 @@ out:
        return rc;
 }
 
+int osd_add_to_remote_parent(const struct lu_env *env,
+                            struct osd_device *osd,
+                            struct osd_object *obj,
+                            struct osd_thandle *oh)
+{
+       struct osd_thread_info *info = osd_oti_get(env);
+       struct luz_direntry *zde = &info->oti_zde;
+       char *name = info->oti_str;
+       const struct lu_fid *fid = lu_object_fid(&obj->oo_dt.do_lu);
+       struct lustre_mdt_attrs *lma = (struct lustre_mdt_attrs *)info->oti_buf;
+       struct lu_buf buf = {
+               .lb_buf = lma,
+               .lb_len = sizeof(info->oti_buf),
+       };
+       int size = 0;
+       int rc;
+       ENTRY;
+       /* Fetch the object's current LMA xattr into info->oti_buf. */
+       rc = osd_xattr_get_internal(env, obj, &buf, XATTR_NAME_LMA, &size);
+       if (rc) {
+               CWARN("%s: fail to load LMA for adding "
+                     DFID" to remote parent: rc = %d\n",
+                     osd_name(osd), PFID(fid), rc);
+               RETURN(rc);
+       }
+       /* Set LMAI_REMOTE_PARENT and write the LMA back at its loaded size. */
+       lustre_lma_swab(lma);
+       lma->lma_incompat |= LMAI_REMOTE_PARENT;
+       lustre_lma_swab(lma);
+       buf.lb_len = size;
+       rc = osd_xattr_set_internal(env, obj, &buf, XATTR_NAME_LMA,
+                                   LU_XATTR_REPLACE, oh);
+       if (rc) {
+               CWARN("%s: fail to update LMA for adding "
+                     DFID" to remote parent: rc = %d\n",
+                     osd_name(osd), PFID(fid), rc);
+               RETURN(rc);
+       }
+       /* Agent entry: keyed by FID string, referencing this object's dnode. */
+       osd_fid2str(name, fid, sizeof(info->oti_str));
+       zde->lzd_reg.zde_dnode = obj->oo_dn->dn_object;
+       zde->lzd_reg.zde_type = IFTODT(S_IFDIR); /* always typed as a dir */
+       zde->lzd_fid = *fid;
+       /* An already-present entry (-EEXIST) is treated as success. */
+       rc = osd_zap_add(osd, osd->od_remote_parent_dir, NULL,
+                        name, 8, sizeof(*zde) / 8, zde, oh->ot_tx);
+       if (unlikely(rc == -EEXIST))
+               rc = 0;
+       if (rc)
+               CWARN("%s: fail to add name entry for "
+                     DFID" to remote parent: rc = %d\n",
+                     osd_name(osd), PFID(fid), rc);
+       else
+               lu_object_set_agent_entry(&obj->oo_dt.do_lu);
+
+       RETURN(rc);
+}
+
+int osd_delete_from_remote_parent(const struct lu_env *env,
+                                 struct osd_device *osd,
+                                 struct osd_object *obj,
+                                 struct osd_thandle *oh, bool destroy)
+{
+       struct osd_thread_info *info = osd_oti_get(env);
+       char *name = info->oti_str;
+       const struct lu_fid *fid = lu_object_fid(&obj->oo_dt.do_lu);
+       struct lustre_mdt_attrs *lma = (struct lustre_mdt_attrs *)info->oti_buf;
+       struct lu_buf buf = {
+               .lb_buf = lma,
+               .lb_len = sizeof(info->oti_buf),
+       };
+       int size = 0;
+       int rc;
+       ENTRY;
+       /* Remove the FID-named entry; a missing entry (-ENOENT) is fine. */
+       osd_fid2str(name, fid, sizeof(info->oti_str));
+       rc = osd_zap_remove(osd, osd->od_remote_parent_dir, NULL,
+                           name, oh->ot_tx);
+       if (unlikely(rc == -ENOENT))
+               rc = 0;
+       if (rc)
+               CERROR("%s: fail to remove entry under remote "
+                      "parent for "DFID": rc = %d\n",
+                      osd_name(osd), PFID(fid), rc);
+       /* On destroy (or on failure) return without touching the LMA. */
+       if (destroy || rc)
+               RETURN(rc);
+       /* Otherwise clear LMAI_REMOTE_PARENT in the LMA and write it back. */
+       rc = osd_xattr_get_internal(env, obj, &buf, XATTR_NAME_LMA, &size);
+       if (rc) {
+               CERROR("%s: fail to load LMA for removing "
+                      DFID" from remote parent: rc = %d\n",
+                      osd_name(osd), PFID(fid), rc);
+               RETURN(rc);
+       }
+
+       lustre_lma_swab(lma);
+       lma->lma_incompat &= ~LMAI_REMOTE_PARENT;
+       lustre_lma_swab(lma);
+       buf.lb_len = size;
+       rc = osd_xattr_set_internal(env, obj, &buf, XATTR_NAME_LMA,
+                                   LU_XATTR_REPLACE, oh);
+       if (rc)
+               CERROR("%s: fail to update LMA for removing "
+                      DFID" from remote parent: rc = %d\n",
+                      osd_name(osd), PFID(fid), rc);
+       else
+               lu_object_clear_agent_entry(&obj->oo_dt.do_lu);
+
+       RETURN(rc);
+}
+
 static int osd_declare_dir_insert(const struct lu_env *env,
                                  struct dt_object *dt,
                                  const struct dt_rec *rec,
@@ -737,8 +849,12 @@ static int osd_dir_insert(const struct lu_env *env, struct dt_object *dt,
 
        if (OBD_FAIL_CHECK(OBD_FAIL_FID_INDIR))
                zde->lzd_fid.f_ver = ~0;
+
+       /* This logic is unrelated to IGIF; it just reuses the fail_loc value
+        * for consistency with ldiskfs, so both share the same test logic. */
        if (OBD_FAIL_CHECK(OBD_FAIL_FID_IGIF))
                num = 1;
+
        /* Insert (key,oid) into ZAP */
        rc = osd_zap_add(osd, parent->oo_dn->dn_object, parent->oo_dn,
                         name, 8, num, (void *)zde, oh->ot_tx);
@@ -832,7 +948,7 @@ static int osd_dir_delete(const struct lu_env *env, struct dt_object *dt,
         *         the lookup conditionally.
         *      2) Enhance the ZFS logic to recognize the OSD lookup result
         *         and delete the given entry directly without lookup again
-        *         internally. LU-10295 */
+        *         internally. LU-10190 */
        memset(&zde->lzd_fid, 0, sizeof(zde->lzd_fid));
        rc = osd_zap_lookup(osd, zap_dn->dn_object, zap_dn, name, 8, 3, zde);
        if (unlikely(rc)) {
index d00bd47..d03ff00 100644 (file)
@@ -239,13 +239,13 @@ struct osd_thandle {
                                 ot_assigned:1;
 };
 
-#define OSD_OI_NAME_SIZE        16
+#define OSD_OI_NAME_SIZE        24
 
 /*
  * Object Index (OI) instance.
  */
 struct osd_oi {
-       char                    oi_name[OSD_OI_NAME_SIZE]; /* unused */
+       char                    oi_name[OSD_OI_NAME_SIZE];
        uint64_t                oi_zapid;
        dnode_t *oi_dn;
 };
@@ -283,6 +283,7 @@ struct osd_device {
        struct proc_dir_entry   *od_proc_entry;
        struct lprocfs_stats    *od_stats;
 
+       uint64_t                 od_remote_parent_dir;
        uint64_t                 od_max_blksz;
        uint64_t                 od_root;
        uint64_t                 od_O_id;
@@ -556,6 +557,14 @@ void osd_zap_cursor_fini(zap_cursor_t *zc);
 uint64_t osd_zap_cursor_serialize(zap_cursor_t *zc);
 int osd_remote_fid(const struct lu_env *env, struct osd_device *osd,
                   const struct lu_fid *fid);
+int osd_add_to_remote_parent(const struct lu_env *env,
+                            struct osd_device *osd,
+                            struct osd_object *obj,
+                            struct osd_thandle *oh);
+int osd_delete_from_remote_parent(const struct lu_env *env,
+                                 struct osd_device *osd,
+                                 struct osd_object *obj,
+                                 struct osd_thandle *oh, bool destroy);
 
 /* osd_xattr.c */
 int __osd_sa_xattr_schedule_update(const struct lu_env *env,
@@ -570,6 +579,8 @@ int __osd_xattr_load(struct osd_device *osd, sa_handle_t *hdl,
 int __osd_xattr_get_large(const struct lu_env *env, struct osd_device *osd,
                          uint64_t xattr, struct lu_buf *buf,
                          const char *name, int *sizep);
+int osd_xattr_get_internal(const struct lu_env *env, struct osd_object *obj,
+                          struct lu_buf *buf, const char *name, int *sizep);
 int osd_xattr_get(const struct lu_env *env, struct dt_object *dt,
                  struct lu_buf *buf, const char *name);
 int osd_declare_xattr_set(const struct lu_env *env, struct dt_object *dt,
@@ -614,6 +625,12 @@ osd_find_dnsize(struct osd_device *osd, int ea_in_bonus)
 }
 #endif
 
+/* Format @fid into @buf (bounded by @len) using DFID_NOBRACE. XXX: f_ver is not counted, but may differ too */
+static inline void osd_fid2str(char *buf, const struct lu_fid *fid, int len)
+{
+       snprintf(buf, len, DFID_NOBRACE, PFID(fid));
+}
+
 static inline int
 osd_xattr_set_internal(const struct lu_env *env, struct osd_object *obj,
                       const struct lu_buf *buf, const char *name, int fl,
index f57bcf9..da446da 100644 (file)
@@ -455,9 +455,14 @@ static int osd_check_lma(const struct lu_env *env, struct osd_object *obj)
                              PFID(lu_object_fid(&obj->oo_dt.do_lu)));
                        rc = -EOPNOTSUPP;
                } else {
+                       struct osd_device *osd = osd_obj2dev(obj);
+
                        if (lma->lma_compat & LMAC_STRIPE_INFO &&
-                           osd_obj2dev(obj)->od_is_ost)
+                           osd->od_is_ost)
                                obj->oo_pfid_in_lma = 1;
+                       if (unlikely(lma->lma_incompat & LMAI_REMOTE_PARENT) &&
+                           osd->od_remote_parent_dir != ZFS_NO_OBJECT)
+                               lu_object_set_agent_entry(&obj->oo_dt.do_lu);
                }
        } else if (rc == -ENODATA) {
                /* haven't initialize LMA xattr */
@@ -661,6 +666,11 @@ static int osd_declare_destroy(const struct lu_env *env, struct dt_object *dt,
                osd_tx_hold_zap(oh->ot_tx, osd->od_unlinked->dn_object,
                                osd->od_unlinked, TRUE, NULL);
 
+       /* remove agent entry (if any) from remote parent */
+       if (lu_object_has_agent_entry(&obj->oo_dt.do_lu))
+               osd_tx_hold_zap(oh->ot_tx, osd->od_remote_parent_dir,
+                               NULL, FALSE, NULL);
+
        /* will help to find FID->ino when this object is being
         * added to PENDING/ */
        osd_idc_find_and_init(env, osd, obj);
@@ -710,6 +720,12 @@ static int osd_destroy(const struct lu_env *env, struct dt_object *dt,
                GOTO(out, rc);
        }
 
+       if (lu_object_has_agent_entry(&obj->oo_dt.do_lu)) {
+               rc = osd_delete_from_remote_parent(env, osd, obj, oh, true);
+               if (rc)
+                       GOTO(out, rc);
+       }
+
        oid = obj->oo_dn->dn_object;
        if (unlikely(obj->oo_destroy == OSD_DESTROY_NONE)) {
                /* this may happen if the destroy wasn't declared
index c266b7e..480eaa3 100644 (file)
@@ -467,12 +467,6 @@ osd_get_idx_for_ost_obj(const struct lu_env *env, struct osd_device *osd,
        return osd_seq->os_compat_dirs[b];
 }
 
-/* XXX: f_ver is not counted, but may differ too */
-static void osd_fid2str(char *buf, const struct lu_fid *fid)
-{
-       sprintf(buf, DFID_NOBRACE, PFID(fid));
-}
-
 /*
  * Determine the zap object id which is being used as the OI for the
  * given fid.  The lowest N bits in the sequence ID are used as the
@@ -481,14 +475,14 @@ static void osd_fid2str(char *buf, const struct lu_fid *fid)
  */
 static uint64_t
 osd_get_idx_for_fid(struct osd_device *osd, const struct lu_fid *fid,
-                   char *buf, dnode_t **zdn)
+                   char *buf, dnode_t **zdn, int bufsize)
 {
        struct osd_oi *oi;
 
        LASSERT(osd->od_oi_table != NULL);
        oi = osd->od_oi_table[fid_seq(fid) & (osd->od_oi_count - 1)];
        if (buf)
-               osd_fid2str(buf, fid);
+               osd_fid2str(buf, fid, bufsize);
        if (zdn)
                *zdn = oi->oi_dn;
 
@@ -520,10 +514,11 @@ osd_get_name_n_idx_compat(const struct lu_env *env, struct osd_device *osd,
                        if (buf)
                                strncpy(buf, name, bufsize);
                } else {
-                       zapid = osd_get_idx_for_fid(osd, fid, buf, NULL);
+                       zapid = osd_get_idx_for_fid(osd, fid, buf, NULL,
+                                                   bufsize);
                }
        } else {
-               zapid = osd_get_idx_for_fid(osd, fid, buf, zdn);
+               zapid = osd_get_idx_for_fid(osd, fid, buf, zdn, bufsize);
        }
 
        return zapid;
@@ -553,10 +548,11 @@ uint64_t osd_get_name_n_idx(const struct lu_env *env, struct osd_device *osd,
                        if (buf)
                                strncpy(buf, name, bufsize);
                } else {
-                       zapid = osd_get_idx_for_fid(osd, fid, buf, NULL);
+                       zapid = osd_get_idx_for_fid(osd, fid, buf, NULL,
+                                                   bufsize);
                }
        } else {
-               zapid = osd_get_idx_for_fid(osd, fid, buf, zdn);
+               zapid = osd_get_idx_for_fid(osd, fid, buf, zdn, bufsize);
        }
 
        return zapid;
@@ -599,7 +595,7 @@ int osd_fid_lookup(const struct lu_env *env, struct osd_device *dev,
                                                    8, 1, &info->oti_zde);
                        } else if (fid_is_objseq(fid) || fid_is_batchid(fid)) {
                                zapid = osd_get_idx_for_fid(dev, fid,
-                                                           buf, NULL);
+                                       buf, NULL, sizeof(info->oti_buf));
                                rc = osd_zap_lookup(dev, zapid, zdn, buf,
                                                    8, 1, &info->oti_zde);
                        }
@@ -779,6 +775,28 @@ osd_oi_init_compat(const struct lu_env *env, struct osd_device *o)
        RETURN(rc);
 }
 
+static void
+osd_oi_init_remote_parent(const struct lu_env *env, struct osd_device *o)
+{
+       uint64_t sdb;
+       int rc;
+       ENTRY;
+       /* Sets o->od_remote_parent_dir; ZFS_NO_OBJECT means "not available". */
+       if (o->od_is_ost) {
+               o->od_remote_parent_dir = ZFS_NO_OBJECT;
+       } else {
+               /* The remote parent is only used for cross-MDT objects;
+                * it is useless for the single-MDT case or in read-only
+                * mode. So ignore the failure. */
+               rc = osd_oi_find_or_create(env, o, o->od_root,
+                                          REMOTE_PARENT_DIR, &sdb);
+               if (!rc)
+                       o->od_remote_parent_dir = sdb;
+               else
+                       o->od_remote_parent_dir = ZFS_NO_OBJECT;
+       }
+}
+
 /**
  * Initialize the OIs by either opening or creating them as needed.
  */
@@ -788,6 +806,8 @@ int osd_oi_init(const struct lu_env *env, struct osd_device *o)
        int      i, rc, count = 0;
        ENTRY;
 
+       osd_oi_init_remote_parent(env, o);
+
        rc = osd_oi_probe(env, o, &count);
        if (rc)
                RETURN(rc);
index 3832028..181dd6b 100644 (file)
@@ -45,6 +45,7 @@
 #include <obd_class.h>
 #include <lustre_disk.h>
 #include <lustre_fid.h>
+#include <lustre_linkea.h>
 
 #include "osd_internal.h"
 
@@ -352,12 +353,25 @@ void __osd_xattr_declare_set(const struct lu_env *env, struct osd_object *obj,
                             int vallen, const char *name,
                             struct osd_thandle *oh)
 {
+       struct osd_device *osd = osd_obj2dev(obj);
        dmu_tx_t *tx = oh->ot_tx;
        int bonuslen;
 
        if (unlikely(obj->oo_destroyed))
                return;
 
+       if (strcmp(name, XATTR_NAME_LINK) == 0 &&
+           osd->od_remote_parent_dir != ZFS_NO_OBJECT) {
+               /* If some name entry resides on a remote MDT, an agent entry
+                * will be created under the remote parent. Conversely, if the
+                * remote entry is removed, the related agent entry may need
+                * to be removed from the remote parent. Since several cases
+                * are possible, declare enough credits for all of them: the
+                * credits for creating an agent entry also cover removal. */
+               osd_tx_hold_zap(tx, osd->od_remote_parent_dir,
+                               NULL, TRUE, NULL);
+       }
+
        if (unlikely(!osd_obj2dev(obj)->od_xattr_in_sa)) {
                __osd_xattr_declare_legacy(env, obj, vallen, name, oh);
                return;
@@ -779,11 +793,73 @@ static int osd_xattr_split_pfid(const struct lu_env *env,
        RETURN(rc);
 }
 
+/*
+ * In a DNE environment, an object (whether regular file or directory) and
+ * its name entry may reside on different MDTs. In that case, we create an
+ * agent entry on the MDT where the object resides. The agent entry
+ * references the object locally, which makes the object visible to
+ * userspace when the target is mounted directly as 'zfs'. Userspace tools
+ * such as 'tar' can then handle the object properly.
+ *
+ * We handle the agent entry while setting the linkEA, which is the common
+ * interface for both regular files and directories and covers the various
+ * cases such as create/link/unlink/rename.
+ *
+ * NOTE: this is needed for both directories and regular files, so it can NOT
+ *      be done in ea_{insert,delete}, which are directory-based operations.
+ */
+static int osd_xattr_handle_linkea(const struct lu_env *env,
+                                  struct osd_device *osd,
+                                  struct osd_object *obj,
+                                  const struct lu_buf *buf,
+                                  struct osd_thandle *oh)
+{
+       const struct lu_fid *fid = lu_object_fid(&obj->oo_dt.do_lu);
+       struct lu_fid *tfid = &osd_oti_get(env)->oti_fid;
+       struct linkea_data ldata = { .ld_buf = (struct lu_buf *)buf };
+       struct lu_name tmpname;
+       int rc;
+       bool remote = false;
+       ENTRY;
+       /* Stop at the first entry whose FID osd_remote_fid() reports (> 0). */
+       rc = linkea_init_with_rec(&ldata);
+       if (!rc) {
+               linkea_first_entry(&ldata);
+               while (ldata.ld_lee != NULL && !remote) {
+                       linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen,
+                                           &tmpname, tfid);
+                       if (osd_remote_fid(env, osd, tfid) > 0)
+                               remote = true;
+                       else
+                               linkea_next_entry(&ldata);
+               }
+       } else if (rc == -ENODATA) {
+               rc = 0;
+       } else {
+               RETURN(rc);
+       }
+       /* Add or remove the agent entry so it matches the 'remote' result. */
+       if (lu_object_has_agent_entry(&obj->oo_dt.do_lu) && !remote) {
+               rc = osd_delete_from_remote_parent(env, osd, obj, oh, false);
+               if (rc)
+                       CERROR("%s: failed to remove agent entry for "DFID
+                              ": rc = %d\n", osd_name(osd), PFID(fid), rc);
+       } else if (!lu_object_has_agent_entry(&obj->oo_dt.do_lu) && remote) {
+               rc = osd_add_to_remote_parent(env, osd, obj, oh);
+               if (rc)
+                       CWARN("%s: failed to create agent entry for "DFID
+                             ": rc = %d\n", osd_name(osd), PFID(fid), rc);
+       }
+
+       RETURN(rc);
+}
+
 int osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
                  const struct lu_buf *buf, const char *name, int fl,
                  struct thandle *handle)
 {
-       struct osd_object  *obj = osd_dt_obj(dt);
+       struct osd_object *obj = osd_dt_obj(dt);
+       struct osd_device *osd = osd_obj2dev(obj);
        struct osd_thandle *oh;
        int rc = 0;
        ENTRY;
@@ -808,6 +884,9 @@ int osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
                rc = osd_xattr_split_pfid(env, obj, oh);
                if (!rc)
                        fl = LU_XATTR_CREATE;
+       } else if (strcmp(name, XATTR_NAME_LINK) == 0 &&
+                  osd->od_remote_parent_dir != ZFS_NO_OBJECT) {
+               rc = osd_xattr_handle_linkea(env, osd, obj, buf, oh);
        }
 
        if (!rc)