Whamcloud - gitweb
LU-10190 osd-zfs: create agent object for remote object 55/28855/25
authorFan Yong <fan.yong@intel.com>
Tue, 5 Dec 2017 14:28:10 +0000 (22:28 +0800)
committerOleg Drokin <oleg.drokin@intel.com>
Sun, 17 Dec 2017 06:19:51 +0000 (06:19 +0000)
In DNE environment, the object and its name entry may reside
on different MDTs. Under such case, we will create an agent
object on the MDT where the name entry resides. The agent
object is empty to indicates that the real object for this
name entry resides on another MDT. If without agent object,
related name entry will be skipped when perform MDT side file
level backup/restore via ZPL by userspace tool, such as 'tar'.

Test-Parameters: envdefinitions=ONLY=803 testlist=sanity mdtfilesystemtype=zfs ostfilesystemtype=zfs mdscount=2 mdtcount=4
Test-Parameters: envdefinitions=ONLY=34 testlist=sanity-lfsck mdtfilesystemtype=zfs ostfilesystemtype=zfs mdscount=2 mdtcount=4
Signed-off-by: Fan Yong <fan.yong@intel.com>
Change-Id: I7fb8c54ca774f3877bcd326bc10d8e99083ac90a
Reviewed-on: https://review.whamcloud.com/28855
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Reviewed-by: Lai Siyao <lai.siyao@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/include/obd_support.h
lustre/osd-zfs/osd_index.c
lustre/osd-zfs/osd_internal.h
lustre/osd-zfs/osd_object.c
lustre/osd-zfs/osd_xattr.c
lustre/tests/sanity-lfsck.sh
lustre/tests/sanity.sh

index 6c34255..f26af0c 100644 (file)
@@ -591,6 +591,7 @@ extern char obd_jobid_var[];
 #define OBD_FAIL_LFSCK_ASSISTANT_DIRECT        0x162d
 #define OBD_FAIL_LFSCK_LOST_MDTOBJ2    0x162e
 #define OBD_FAIL_LFSCK_BAD_PFL_RANGE   0x162f
+#define OBD_FAIL_LFSCK_NO_AGENTOBJ     0x1630
 
 #define OBD_FAIL_LFSCK_NOTIFY_NET      0x16f0
 #define OBD_FAIL_LFSCK_QUERY_NET       0x16f1
index 6d607c8..81a7385 100644 (file)
@@ -458,6 +458,79 @@ static int osd_dir_lookup(const struct lu_env *env, struct dt_object *dt,
        RETURN(rc == 0 ? 1 : (rc == -ENOENT ? -ENODATA : rc));
 }
 
+/*
+ * In DNE environment, the object and its name entry may reside on different
+ * MDTs. Under such case, we will create an agent object on the MDT where the
+ * name entry resides. The agent object is empty, and indicates that the real
+ * object for the name entry resides on another MDT. If without agent object,
+ * related name entry will be skipped when perform MDT side file level backup
+ * and restore via ZPL by userspace tool, such as 'tar'.
+ */
+static int osd_create_agent_object(const struct lu_env *env,
+                                  struct osd_device *osd,
+                                  struct luz_direntry *zde,
+                                  uint64_t parent, dmu_tx_t *tx)
+{
+       struct osd_thread_info *info = osd_oti_get(env);
+       struct lustre_mdt_attrs *lma = &info->oti_mdt_attrs;
+       struct lu_attr *la = &info->oti_la;
+       nvlist_t *nvbuf = NULL;
+       dnode_t *dn = NULL;
+       sa_handle_t *hdl;
+       int rc = 0;
+       ENTRY;
+
+       if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_AGENTOBJ))
+               RETURN(0);
+
+       rc = -nvlist_alloc(&nvbuf, NV_UNIQUE_NAME, KM_SLEEP);
+       if (rc)
+               RETURN(rc);
+
+       lustre_lma_init(lma, &zde->lzd_fid, 0, LMAI_AGENT);
+       lustre_lma_swab(lma);
+       rc = -nvlist_add_byte_array(nvbuf, XATTR_NAME_LMA, (uchar_t *)lma,
+                                   sizeof(*lma));
+       if (rc)
+               GOTO(out, rc);
+
+       la->la_valid = LA_TYPE | LA_MODE;
+       la->la_mode = (DTTOIF(zde->lzd_reg.zde_type) & S_IFMT) |
+                       S_IRUGO | S_IWUSR | S_IXUGO;
+
+       if (S_ISDIR(la->la_mode))
+               rc = __osd_zap_create(env, osd, &dn, tx, la,
+                               osd_find_dnsize(osd, OSD_BASE_EA_IN_BONUS), 0);
+       else
+               rc = __osd_object_create(env, osd, NULL, &zde->lzd_fid,
+                                        &dn, tx, la);
+       if (rc)
+               GOTO(out, rc);
+
+       zde->lzd_reg.zde_dnode = dn->dn_object;
+       rc = -sa_handle_get(osd->od_os, dn->dn_object, NULL,
+                           SA_HDL_PRIVATE, &hdl);
+       if (!rc) {
+               rc = __osd_attr_init(env, osd, NULL, hdl, tx,
+                                    la, parent, nvbuf);
+               sa_handle_destroy(hdl);
+       }
+
+       GOTO(out, rc);
+
+out:
+       if (dn) {
+               if (rc)
+                       dmu_object_free(osd->od_os, dn->dn_object, tx);
+               osd_dnode_rele(dn);
+       }
+
+       if (nvbuf)
+               nvlist_free(nvbuf);
+
+       return rc;
+}
+
 static int osd_declare_dir_insert(const struct lu_env *env,
                                  struct dt_object *dt,
                                  const struct dt_rec *rec,
@@ -470,6 +543,7 @@ static int osd_declare_dir_insert(const struct lu_env *env,
        const struct lu_fid     *fid;
        struct osd_thandle      *oh;
        uint64_t                 object;
+       struct osd_idmap_cache *idc;
        ENTRY;
 
        rec1 = (struct dt_insert_rec *)rec;
@@ -480,6 +554,25 @@ static int osd_declare_dir_insert(const struct lu_env *env,
        LASSERT(th != NULL);
        oh = container_of0(th, struct osd_thandle, ot_super);
 
+       idc = osd_idc_find_or_init(env, osd, fid);
+       if (IS_ERR(idc))
+               RETURN(PTR_ERR(idc));
+
+       if (idc->oic_remote) {
+               const char *name = (const char *)key;
+
+               if (name[0] != '.' || name[1] != '.' || name[2] != 0) {
+                       /* Prepare agent object for remote entry that will
+                        * be used for operations via ZPL, such as MDT side
+                        * file-level backup and restore. */
+                       dmu_tx_hold_sa_create(oh->ot_tx,
+                               osd_find_dnsize(osd, OSD_BASE_EA_IN_BONUS));
+                       if (S_ISDIR(rec1->rec_type))
+                               dmu_tx_hold_zap(oh->ot_tx, DMU_NEW_OBJECT,
+                                               FALSE, NULL);
+               }
+       }
+
        /* This is for inserting dot/dotdot for new created dir. */
        if (obj->oo_dn == NULL)
                object = DMU_NEW_OBJECT;
@@ -491,8 +584,6 @@ static int osd_declare_dir_insert(const struct lu_env *env,
         * before insertion */
        osd_tx_hold_zap(oh->ot_tx, object, obj->oo_dn, TRUE, NULL);
 
-       osd_idc_find_or_init(env, osd, fid);
-
        RETURN(0);
 }
 
@@ -564,9 +655,10 @@ static int osd_dir_insert(const struct lu_env *env, struct dt_object *dt,
        const struct lu_fid *fid = rec1->rec_fid;
        struct osd_thandle *oh;
        struct osd_idmap_cache *idc;
-       char                *name = (char *)key;
-       int                  rc;
-       int num = sizeof(oti->oti_zde) / 8;
+       const char *name = (const char *)key;
+       struct luz_direntry *zde = &oti->oti_zde;
+       int num = sizeof(*zde) / 8;
+       int rc;
        ENTRY;
 
        LASSERT(parent->oo_dn);
@@ -591,10 +683,29 @@ static int osd_dir_insert(const struct lu_env *env, struct dt_object *dt,
                        RETURN(PTR_ERR(idc));
        }
 
+       CLASSERT(sizeof(zde->lzd_reg) == 8);
+       CLASSERT(sizeof(*zde) % 8 == 0);
+
+       memset(&zde->lzd_reg, 0, sizeof(zde->lzd_reg));
+       zde->lzd_reg.zde_type = IFTODT(rec1->rec_type & S_IFMT);
+       zde->lzd_fid = *fid;
+
        if (idc->oic_remote) {
-               /* Insert remote entry */
-               memset(&oti->oti_zde.lzd_reg, 0, sizeof(oti->oti_zde.lzd_reg));
-               oti->oti_zde.lzd_reg.zde_type = IFTODT(rec1->rec_type & S_IFMT);
+               if (name[0] != '.' || name[1] != '.' || name[2] != 0) {
+                       /* Create agent inode for remote object that will
+                        * be used for MDT file-level backup and restore. */
+                       rc = osd_create_agent_object(env, osd, zde,
+                                       parent->oo_dn->dn_object, oh->ot_tx);
+                       if (rc) {
+                               CWARN("%s: Fail to create agent object for "
+                                     DFID": rc = %d\n",
+                                     osd_name(osd), PFID(fid), rc);
+                               /* Ignore the failure since the system can go
+                                * ahead if we do not care about the MDT side
+                                * file-level backup and restore. */
+                               rc = 0;
+                       }
+               }
        } else {
                if (unlikely(idc->oic_dnode == 0)) {
                        /* for a reason OI cache wasn't filled properly */
@@ -621,26 +732,21 @@ static int osd_dir_insert(const struct lu_env *env, struct dt_object *dt,
                                GOTO(out, rc);
                        }
                }
-               CLASSERT(sizeof(oti->oti_zde.lzd_reg) == 8);
-               CLASSERT(sizeof(oti->oti_zde) % 8 == 0);
-               oti->oti_zde.lzd_reg.zde_type = IFTODT(rec1->rec_type & S_IFMT);
-               oti->oti_zde.lzd_reg.zde_dnode = idc->oic_dnode;
+               zde->lzd_reg.zde_dnode = idc->oic_dnode;
        }
 
-       oti->oti_zde.lzd_fid = *fid;
        if (OBD_FAIL_CHECK(OBD_FAIL_FID_INDIR))
-               oti->oti_zde.lzd_fid.f_ver = ~0;
+               zde->lzd_fid.f_ver = ~0;
        if (OBD_FAIL_CHECK(OBD_FAIL_FID_IGIF))
                num = 1;
        /* Insert (key,oid) into ZAP */
        rc = osd_zap_add(osd, parent->oo_dn->dn_object, parent->oo_dn,
-                        (char *)key, 8, num, (void *)&oti->oti_zde, oh->ot_tx);
+                        name, 8, num, (void *)zde, oh->ot_tx);
        if (unlikely(rc == -EEXIST &&
                     name[0] == '.' && name[1] == '.' && name[2] == 0))
                /* Update (key,oid) in ZAP */
-               rc = -zap_update(osd->od_os, parent->oo_dn->dn_object,
-                               (char *)key, 8, sizeof(oti->oti_zde) / 8,
-                               (void *)&oti->oti_zde, oh->ot_tx);
+               rc = -zap_update(osd->od_os, parent->oo_dn->dn_object, name, 8,
+                                sizeof(*zde) / 8, (void *)zde, oh->ot_tx);
 
 out:
 
@@ -652,28 +758,37 @@ static int osd_declare_dir_delete(const struct lu_env *env,
                                  const struct dt_key *key,
                                  struct thandle *th)
 {
-       struct osd_object  *obj = osd_dt_obj(dt);
+       struct osd_object *obj = osd_dt_obj(dt);
+       dnode_t *zap_dn = obj->oo_dn;
        struct osd_thandle *oh;
-       uint64_t            dnode;
+       const char *name = (const char *)key;
        ENTRY;
 
        LASSERT(dt_object_exists(dt));
        LASSERT(osd_invariant(obj));
+       LASSERT(zap_dn != NULL);
 
        LASSERT(th != NULL);
        oh = container_of0(th, struct osd_thandle, ot_super);
 
-       if (dt_object_exists(dt)) {
-               LASSERT(obj->oo_dn);
-               dnode = obj->oo_dn->dn_object;
-       } else {
-               dnode = DMU_NEW_OBJECT;
+       /*
+        * In Orion . and .. were stored in the directory (not generated upon
+        * request as now). We preserve them for backward compatibility.
+        */
+       if (name[0] == '.') {
+               if (name[1] == 0)
+                       RETURN(0);
+               else if (name[1] == '.' && name[2] == 0)
+                       RETURN(0);
        }
 
        /* do not specify the key as then DMU is trying to look it up
         * which is very expensive. usually the layers above lookup
         * before deletion */
-       osd_tx_hold_zap(oh->ot_tx, dnode, obj->oo_dn, FALSE, NULL);
+       osd_tx_hold_zap(oh->ot_tx, zap_dn->dn_object, zap_dn, FALSE, NULL);
+
+       /* For destroying agent object if have. */
+       dmu_tx_hold_bonus(oh->ot_tx, DMU_NEW_OBJECT);
 
        RETURN(0);
 }
@@ -681,6 +796,7 @@ static int osd_declare_dir_delete(const struct lu_env *env,
 static int osd_dir_delete(const struct lu_env *env, struct dt_object *dt,
                          const struct dt_key *key, struct thandle *th)
 {
+       struct luz_direntry *zde = &osd_oti_get(env)->oti_zde;
        struct osd_object *obj = osd_dt_obj(dt);
        struct osd_device *osd = osd_obj2dev(obj);
        struct osd_thandle *oh;
@@ -706,12 +822,41 @@ static int osd_dir_delete(const struct lu_env *env, struct dt_object *dt,
                }
        }
 
+       /* XXX: We have to say that lookup during delete_declare will affect
+        *      performance, but we have to check whether the name entry (to
+        *      be deleted) has agent object or not to avoid orphans.
+        *
+        *      We will improve that in the future, some possible solutions,
+        *      for example:
+        *      1) Some hint from the caller via transaction handle to make
+        *         the lookup conditionally.
+        *      2) Enhance the ZFS logic to recognize the OSD lookup result
+        *         and delete the given entry directly without lookup again
+        *         internally. LU-10295 */
+       memset(&zde->lzd_fid, 0, sizeof(zde->lzd_fid));
+       rc = osd_zap_lookup(osd, zap_dn->dn_object, zap_dn, name, 8, 3, zde);
+       if (unlikely(rc)) {
+               if (rc != -ENOENT)
+                       CERROR("%s: failed to locate entry  %s: rc = %d\n",
+                              osd->od_svname, name, rc);
+               RETURN(rc);
+       }
+
+       if (unlikely(osd_remote_fid(env, osd, &zde->lzd_fid) > 0)) {
+               rc = -dmu_object_free(osd->od_os, zde->lzd_reg.zde_dnode,
+                                     oh->ot_tx);
+               if (rc)
+                       CERROR("%s: failed to destroy agent object (%llu) "
+                              "for the entry %s: rc = %d\n", osd->od_svname,
+                              (__u64)zde->lzd_reg.zde_dnode, name, rc);
+       }
+
        /* Remove key from the ZAP */
        rc = osd_zap_remove(osd, zap_dn->dn_object, zap_dn,
                            (char *)key, oh->ot_tx);
-
-       if (unlikely(rc && rc != -ENOENT))
-               CERROR("%s: zap_remove failed: rc = %d\n", osd->od_svname, rc);
+       if (unlikely(rc))
+               CERROR("%s: zap_remove %s failed: rc = %d\n",
+                      osd->od_svname, name, rc);
 
        RETURN(rc);
 }
@@ -933,6 +1078,45 @@ osd_dirent_update(const struct lu_env *env, struct osd_device *dev,
        RETURN(rc);
 }
 
+static int osd_update_entry_for_agent(const struct lu_env *env,
+                                     struct osd_device *osd,
+                                     uint64_t zap, const char *name,
+                                     struct luz_direntry *zde, __u32 attr)
+{
+       dmu_tx_t *tx = NULL;
+       int rc = 0;
+       ENTRY;
+
+       if (attr & LUDA_VERIFY_DRYRUN)
+               GOTO(out, rc = 0);
+
+       tx = dmu_tx_create(osd->od_os);
+       if (!tx)
+               GOTO(out, rc = -ENOMEM);
+
+       dmu_tx_hold_sa_create(tx, osd_find_dnsize(osd, OSD_BASE_EA_IN_BONUS));
+       dmu_tx_hold_zap(tx, zap, FALSE, NULL);
+       rc = -dmu_tx_assign(tx, TXG_WAIT);
+       if (rc) {
+               dmu_tx_abort(tx);
+               GOTO(out, rc);
+       }
+
+       rc = osd_create_agent_object(env, osd, zde, zap, tx);
+       if (!rc)
+               rc = -zap_update(osd->od_os, zap, name, 8, sizeof(*zde) / 8,
+                                (const void *)zde, tx);
+       dmu_tx_commit(tx);
+
+       GOTO(out, rc);
+
+out:
+       CDEBUG(D_LFSCK, "%s: Updated (%s) remote entry for "DFID": rc = %d\n",
+              osd_name(osd), (attr & LUDA_VERIFY_DRYRUN) ? "(ro)" : "(rw)",
+              PFID(&zde->lzd_fid), rc);
+       return rc;
+}
+
 static int osd_dir_it_rec(const struct lu_env *env, const struct dt_it *di,
                          struct dt_rec *dtrec, __u32 attr)
 {
@@ -1006,6 +1190,20 @@ static int osd_dir_it_rec(const struct lu_env *env, const struct dt_it *di,
        if (za->za_num_integers >= 3 && fid_is_sane(&zde->lzd_fid)) {
                lde->lde_attrs = LUDA_FID;
                fid_cpu_to_le(&lde->lde_fid, &zde->lzd_fid);
+               if (unlikely(zde->lzd_reg.zde_dnode == ZFS_NO_OBJECT &&
+                            osd_remote_fid(env, osd, &zde->lzd_fid) > 0 &&
+                            attr & LUDA_VERIFY)) {
+                       /* It is mainly used for handling the MDT
+                        * upgraded from old ZFS based backend. */
+                       rc = osd_update_entry_for_agent(env, osd,
+                                       it->ozi_obj->oo_dn->dn_object,
+                                       za->za_name, zde, attr);
+                       if (!rc)
+                               lde->lde_attrs |= LUDA_REPAIR;
+                       else
+                               lde->lde_attrs |= LUDA_UNKNOWN;
+               }
+
                GOTO(pack_attr, rc = 0);
        }
 
@@ -1565,14 +1763,16 @@ static int osd_zfs_otable_it_next(const struct lu_env *env, struct dt_it *di)
                        /* Lustre object */
                        lma = (struct lustre_mdt_attrs *)v;
                        lustre_lma_swab(lma);
-                       it->mit_fid = lma->lma_self_fid;
-                       nvlist_free(nvbuf);
-                       break;
-               } else {
-                       /* not a Lustre object, try next one */
-                       nvlist_free(nvbuf);
+                       if (likely(!(lma->lma_compat & LMAC_NOT_IN_OI) &&
+                                  !(lma->lma_incompat & LMAI_AGENT))) {
+                               it->mit_fid = lma->lma_self_fid;
+                               nvlist_free(nvbuf);
+                               break;
+                       }
                }
 
+               /* not a Lustre visible object, try next one */
+               nvlist_free(nvbuf);
        } while (1);
 
 
index fbc7c99..d00bd47 100644 (file)
@@ -516,7 +516,8 @@ int osd_object_sa_update(struct osd_object *obj, sa_attr_type_t type,
 int __osd_zap_create(const struct lu_env *env, struct osd_device *osd,
                     dnode_t **zap_dnp, dmu_tx_t *tx, struct lu_attr *la,
                     unsigned dnsize, zap_flags_t flags);
-int __osd_object_create(const struct lu_env *env, struct osd_object *obj,
+int __osd_object_create(const struct lu_env *env, struct osd_device *osd,
+                       struct osd_object *obj, const struct lu_fid *fid,
                        dnode_t **dnp, dmu_tx_t *tx, struct lu_attr *la);
 int __osd_attr_init(const struct lu_env *env, struct osd_device *osd,
                    struct osd_object *obj, sa_handle_t *sa_hdl, dmu_tx_t *tx,
@@ -598,6 +599,21 @@ int __osd_xattr_set(const struct lu_env *env, struct osd_object *obj,
                    struct osd_thandle *oh);
 int __osd_sa_xattr_update(const struct lu_env *env, struct osd_object *obj,
                          struct osd_thandle *oh);
+
+#define OSD_BASE_EA_IN_BONUS   (ZFS_SA_BASE_ATTR_SIZE + \
+                                sizeof(__u64) /* VBR VERSION */ + \
+                                sizeof(struct lustre_mdt_attrs) /* LMA */)
+
+#ifdef HAVE_DMU_OBJECT_ALLOC_DNSIZE
+int osd_find_dnsize(struct osd_device *osd, int ea_in_bonus);
+#else
+static inline int
+osd_find_dnsize(struct osd_device *osd, int ea_in_bonus)
+{
+       return DN_MAX_BONUSLEN;
+}
+#endif
+
 static inline int
 osd_xattr_set_internal(const struct lu_env *env, struct osd_object *obj,
                       const struct lu_buf *buf, const char *name, int fl,
index d3f7363..f57bcf9 100644 (file)
@@ -1295,9 +1295,7 @@ static int osd_declare_create(const struct lu_env *env, struct dt_object *dt,
        LASSERT(oh->ot_tx != NULL);
 
        /* this is the minimum set of EAs on every Lustre object */
-       obj->oo_ea_in_bonus = ZFS_SA_BASE_ATTR_SIZE +
-                               sizeof(__u64) + /* VBR VERSION */
-                               sizeof(struct lustre_mdt_attrs); /* LMA */
+       obj->oo_ea_in_bonus = OSD_BASE_EA_IN_BONUS;
        /* reserve 32 bytes for extra stuff like ACLs */
        dnode_size = size_roundup_power2(obj->oo_ea_in_bonus + 32);
 
@@ -1479,15 +1477,14 @@ static int osd_find_new_dnode(const struct lu_env *env, dmu_tx_t *tx,
 }
 
 #ifdef HAVE_DMU_OBJECT_ALLOC_DNSIZE
-static int osd_find_dnsize(struct osd_object *obj)
+int osd_find_dnsize(struct osd_device *osd, int ea_in_bonus)
 {
-       struct osd_device *osd = osd_obj2dev(obj);
        int dnsize;
 
        if (osd->od_dnsize == ZFS_DNSIZE_AUTO) {
                dnsize = DNODE_MIN_SIZE;
                do {
-                       if (DN_BONUS_SIZE(dnsize) >= obj->oo_ea_in_bonus + 32)
+                       if (DN_BONUS_SIZE(dnsize) >= ea_in_bonus + 32)
                                break;
                        dnsize <<= 1;
                } while (dnsize < DNODE_MAX_SIZE);
@@ -1508,11 +1505,6 @@ static int osd_find_dnsize(struct osd_object *obj)
        }
        return dnsize;
 }
-#else
-static int inline osd_find_dnsize(struct osd_object *obj)
-{
-       return DN_MAX_BONUSLEN;
-}
 #endif
 
 /*
@@ -1520,13 +1512,13 @@ static int inline osd_find_dnsize(struct osd_object *obj)
  * dmu_tx_hold_bonus(tx, DMU_NEW_OBJECT) called and then assigned
  * to a transaction group.
  */
-int __osd_object_create(const struct lu_env *env, struct osd_object *obj,
+int __osd_object_create(const struct lu_env *env, struct osd_device *osd,
+                       struct osd_object *obj, const struct lu_fid *fid,
                        dnode_t **dnp, dmu_tx_t *tx, struct lu_attr *la)
 {
-       struct osd_device   *osd = osd_obj2dev(obj);
-       const struct lu_fid *fid = lu_object_fid(&obj->oo_dt.do_lu);
-       dmu_object_type_t    type = DMU_OT_PLAIN_FILE_CONTENTS;
+       dmu_object_type_t type = DMU_OT_PLAIN_FILE_CONTENTS;
        uint64_t oid;
+       int size;
 
        /* Use DMU_OTN_UINT8_METADATA for local objects so their data blocks
         * would get an additional ditto copy */
@@ -1535,8 +1527,12 @@ int __osd_object_create(const struct lu_env *env, struct osd_object *obj,
                type = DMU_OTN_UINT8_METADATA;
 
        /* Create a new DMU object using the default dnode size. */
+       if (obj)
+               size = obj->oo_ea_in_bonus;
+       else
+               size = OSD_BASE_EA_IN_BONUS;
        oid = osd_dmu_object_alloc(osd->od_os, type, 0,
-                                  osd_find_dnsize(obj), tx);
+                                  osd_find_dnsize(osd, size), tx);
 
        LASSERT(la->la_valid & LA_MODE);
        la->la_size = 0;
@@ -1581,6 +1577,7 @@ int __osd_zap_create(const struct lu_env *env, struct osd_device *osd,
 static dnode_t *osd_mkidx(const struct lu_env *env, struct osd_object *obj,
                          struct lu_attr *la, struct osd_thandle *oh)
 {
+       struct osd_device *osd = osd_obj2dev(obj);
        dnode_t *dn;
        int rc;
 
@@ -1589,8 +1586,8 @@ static dnode_t *osd_mkidx(const struct lu_env *env, struct osd_object *obj,
         * We set ZAP_FLAG_UINT64_KEY to let ZFS know than we are going to use
         * binary keys */
        LASSERT(S_ISREG(la->la_mode));
-       rc = __osd_zap_create(env, osd_obj2dev(obj), &dn, oh->ot_tx, la,
-                             osd_find_dnsize(obj), ZAP_FLAG_UINT64_KEY);
+       rc = __osd_zap_create(env, osd, &dn, oh->ot_tx, la,
+               osd_find_dnsize(osd, obj->oo_ea_in_bonus), ZAP_FLAG_UINT64_KEY);
        if (rc)
                return ERR_PTR(rc);
        return dn;
@@ -1599,12 +1596,13 @@ static dnode_t *osd_mkidx(const struct lu_env *env, struct osd_object *obj,
 static dnode_t *osd_mkdir(const struct lu_env *env, struct osd_object *obj,
                          struct lu_attr *la, struct osd_thandle *oh)
 {
+       struct osd_device *osd = osd_obj2dev(obj);
        dnode_t *dn;
        int rc;
 
        LASSERT(S_ISDIR(la->la_mode));
-       rc = __osd_zap_create(env, osd_obj2dev(obj), &dn, oh->ot_tx, la,
-                             osd_find_dnsize(obj), 0);
+       rc = __osd_zap_create(env, osd, &dn, oh->ot_tx, la,
+                             osd_find_dnsize(osd, obj->oo_ea_in_bonus), 0);
        if (rc)
                return ERR_PTR(rc);
        return dn;
@@ -1619,7 +1617,7 @@ static dnode_t *osd_mkreg(const struct lu_env *env, struct osd_object *obj,
        int rc;
 
        LASSERT(S_ISREG(la->la_mode));
-       rc = __osd_object_create(env, obj, &dn, oh->ot_tx, la);
+       rc = __osd_object_create(env, osd, obj, fid, &dn, oh->ot_tx, la);
        if (rc)
                return ERR_PTR(rc);
 
@@ -1647,7 +1645,9 @@ static dnode_t *osd_mksym(const struct lu_env *env, struct osd_object *obj,
        int rc;
 
        LASSERT(S_ISLNK(la->la_mode));
-       rc = __osd_object_create(env, obj, &dn, oh->ot_tx, la);
+       rc = __osd_object_create(env, osd_obj2dev(obj), obj,
+                                lu_object_fid(&obj->oo_dt.do_lu),
+                                &dn, oh->ot_tx, la);
        if (rc)
                return ERR_PTR(rc);
        return dn;
@@ -1662,7 +1662,9 @@ static dnode_t *osd_mknod(const struct lu_env *env, struct osd_object *obj,
        if (S_ISCHR(la->la_mode) || S_ISBLK(la->la_mode))
                la->la_valid |= LA_RDEV;
 
-       rc = __osd_object_create(env, obj, &dn, oh->ot_tx, la);
+       rc = __osd_object_create(env, osd_obj2dev(obj), obj,
+                                lu_object_fid(&obj->oo_dt.do_lu),
+                                &dn, oh->ot_tx, la);
        if (rc)
                return ERR_PTR(rc);
        return dn;
index d6bed9a..3832028 100644 (file)
@@ -706,7 +706,9 @@ __osd_xattr_set(const struct lu_env *env, struct osd_object *obj,
 
                la->la_valid = LA_MODE;
                la->la_mode = S_IFREG | S_IRUGO | S_IWUSR;
-               rc = __osd_object_create(env, obj, &xa_data_dn, tx, la);
+               rc = __osd_object_create(env, osd, obj,
+                                        lu_object_fid(&obj->oo_dt.do_lu),
+                                        &xa_data_dn, tx, la);
                if (rc)
                        goto out;
                xa_data_obj = xa_data_dn->dn_object;
index 4e24e1c..8bdddf7 100644 (file)
@@ -5066,6 +5066,47 @@ test_33()
 }
 run_test 33 "check LFSCK paramters"
 
+test_34()
+{
+       [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return
+       [ $(facet_fstype $SINGLEMDS) != zfs ] &&
+               skip "Only valid for ZFS backend" && return
+
+       lfsck_prep 1 1
+
+       #define OBD_FAIL_LFSCK_NO_AGENTOBJ      0x1630
+       do_facet $SINGLEMDS $LCTL set_param fail_loc=0x1630
+       $LFS mkdir -i 1 $DIR/$tdir/dummy ||
+               error "(1) Fail to create $DIR/$tdir/dummy"
+
+       do_facet $SINGLEMDS $LCTL set_param fail_loc=0
+       $START_NAMESPACE -r || error "(2) Fail to start LFSCK for namespace!"
+       wait_update_facet $SINGLEMDS "$LCTL get_param -n \
+               mdd.${MDT_DEV}.lfsck_namespace |
+               awk '/^status/ { print \\\$2 }'" "completed" 32 || {
+               $SHOW_NAMESPACE
+               error "(3) unexpected status"
+       }
+
+       local repaired=$($SHOW_NAMESPACE |
+                        awk '/^dirent_repaired/ { print $2 }')
+       [ $repaired -eq 1 ] ||
+               error "(4) Fail to repair the lost agent object: $repaired"
+
+       $START_NAMESPACE -r || error "(5) Fail to start LFSCK for namespace!"
+       wait_update_facet $SINGLEMDS "$LCTL get_param -n \
+               mdd.${MDT_DEV}.lfsck_namespace |
+               awk '/^status/ { print \\\$2 }'" "completed" 32 || {
+               $SHOW_NAMESPACE
+               error "(6) unexpected status"
+       }
+
+       repaired=$($SHOW_NAMESPACE | awk '/^dirent_repaired/ { print $2 }')
+       [ $repaired -eq 0 ] ||
+               error "(7) Unexpected repairing: $repaired"
+}
+run_test 34 "LFSCK can rebuild the lost agent object"
+
 # restore MDS/OST size
 MDSSIZE=${SAVED_MDSSIZE}
 OSTSIZE=${SAVED_OSTSIZE}
index 91167df..ab22ed6 100755 (executable)
@@ -17522,6 +17522,53 @@ test_802() {
 }
 run_test 802 "simulate readonly device"
 
+test_803() {
+       [[ $MDSCOUNT -lt 2 ]] && skip "needs >= 2 MDTs" && return
+       [ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.10.54) ] &&
+               skip "MDS needs to be newer than 2.10.54" && return
+
+       mkdir -p $DIR/$tdir
+       # Create some objects on all MDTs to trigger related logs objects
+       for idx in $(seq $MDSCOUNT); do
+               $LFS mkdir -c $MDSCOUNT -i $((idx % $MDSCOUNT)) \
+                       $DIR/$tdir/dir${idx} ||
+                       error "Fail to create $DIR/$tdir/dir${idx}"
+       done
+
+       sync; sleep 5
+       echo "before create:"
+       $LFS df -i $MOUNT
+       local before_used=$($LFS df -i | grep MDT0000_UUID | awk '{print $3}')
+
+       for ((i=0; i<10; i++)); do
+               $LFS mkdir -c 1 -i 1 $DIR/$tdir/foo$i ||
+                       error "Fail to create $DIR/$tdir/foo$i"
+       done
+
+       sync; sleep 5
+       echo "after create:"
+       $LFS df -i $MOUNT
+       local after_used=$($LFS df -i | grep MDT0000_UUID | awk '{print $3}')
+
+       [ $after_used -ge $((before_used + 10)) ] ||
+               error "before ($before_used) + 10 > after ($after_used)"
+
+       for ((i=0; i<10; i++)); do
+               rm -rf $DIR/$tdir/foo$i ||
+                       error "Fail to remove $DIR/$tdir/foo$i"
+       done
+
+       wait_delete_completed
+       echo "after unlink:"
+       $LFS df -i $MOUNT
+       before_used=$after_used
+       after_used=$($LFS df -i | grep MDT0000_UUID | awk '{print $3}')
+
+       [ $after_used -le $((before_used - 8)) ] ||
+               error "before ($before_used) - 8 < after ($after_used)"
+}
+run_test 803 "verify agent object for remote object"
+
 #
 # tests that do cleanup/setup should be run at the end
 #