RETURN(rc == 0 ? 1 : (rc == -ENOENT ? -ENODATA : rc));
}
+/*
+ * In DNE environment, the object and its name entry may reside on different
+ * MDTs. Under such case, we will create an agent object on the MDT where the
+ * name entry resides. The agent object is empty, and indicates that the real
+ * object for the name entry resides on another MDT. If without agent object,
+ * related name entry will be skipped when perform MDT side file level backup
+ * and restore via ZPL by userspace tool, such as 'tar'.
+ */
+static int osd_create_agent_object(const struct lu_env *env,
+ struct osd_device *osd,
+ struct luz_direntry *zde,
+ uint64_t parent, dmu_tx_t *tx)
+{
+ struct osd_thread_info *info = osd_oti_get(env);
+ struct lustre_mdt_attrs *lma = &info->oti_mdt_attrs;
+ struct lu_attr *la = &info->oti_la;
+ nvlist_t *nvbuf = NULL;
+ dnode_t *dn = NULL;
+ sa_handle_t *hdl;
+ int rc = 0;
+ ENTRY;
+
+ if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_AGENTOBJ))
+ RETURN(0);
+
+ rc = -nvlist_alloc(&nvbuf, NV_UNIQUE_NAME, KM_SLEEP);
+ if (rc)
+ RETURN(rc);
+
+ lustre_lma_init(lma, &zde->lzd_fid, 0, LMAI_AGENT);
+ lustre_lma_swab(lma);
+ rc = -nvlist_add_byte_array(nvbuf, XATTR_NAME_LMA, (uchar_t *)lma,
+ sizeof(*lma));
+ if (rc)
+ GOTO(out, rc);
+
+ la->la_valid = LA_TYPE | LA_MODE;
+ la->la_mode = (DTTOIF(zde->lzd_reg.zde_type) & S_IFMT) |
+ S_IRUGO | S_IWUSR | S_IXUGO;
+
+ if (S_ISDIR(la->la_mode))
+ rc = __osd_zap_create(env, osd, &dn, tx, la,
+ osd_find_dnsize(osd, OSD_BASE_EA_IN_BONUS), 0);
+ else
+ rc = __osd_object_create(env, osd, NULL, &zde->lzd_fid,
+ &dn, tx, la);
+ if (rc)
+ GOTO(out, rc);
+
+ zde->lzd_reg.zde_dnode = dn->dn_object;
+ rc = -sa_handle_get(osd->od_os, dn->dn_object, NULL,
+ SA_HDL_PRIVATE, &hdl);
+ if (!rc) {
+ rc = __osd_attr_init(env, osd, NULL, hdl, tx,
+ la, parent, nvbuf);
+ sa_handle_destroy(hdl);
+ }
+
+ GOTO(out, rc);
+
+out:
+ if (dn) {
+ if (rc)
+ dmu_object_free(osd->od_os, dn->dn_object, tx);
+ osd_dnode_rele(dn);
+ }
+
+ if (nvbuf)
+ nvlist_free(nvbuf);
+
+ return rc;
+}
+
static int osd_declare_dir_insert(const struct lu_env *env,
struct dt_object *dt,
const struct dt_rec *rec,
const struct lu_fid *fid;
struct osd_thandle *oh;
uint64_t object;
+ struct osd_idmap_cache *idc;
ENTRY;
rec1 = (struct dt_insert_rec *)rec;
LASSERT(th != NULL);
oh = container_of0(th, struct osd_thandle, ot_super);
+ idc = osd_idc_find_or_init(env, osd, fid);
+ if (IS_ERR(idc))
+ RETURN(PTR_ERR(idc));
+
+ if (idc->oic_remote) {
+ const char *name = (const char *)key;
+
+ if (name[0] != '.' || name[1] != '.' || name[2] != 0) {
+ /* Prepare agent object for remote entry that will
+ * be used for operations via ZPL, such as MDT side
+ * file-level backup and restore. */
+ dmu_tx_hold_sa_create(oh->ot_tx,
+ osd_find_dnsize(osd, OSD_BASE_EA_IN_BONUS));
+ if (S_ISDIR(rec1->rec_type))
+ dmu_tx_hold_zap(oh->ot_tx, DMU_NEW_OBJECT,
+ FALSE, NULL);
+ }
+ }
+
/* This is for inserting dot/dotdot for new created dir. */
if (obj->oo_dn == NULL)
object = DMU_NEW_OBJECT;
* before insertion */
osd_tx_hold_zap(oh->ot_tx, object, obj->oo_dn, TRUE, NULL);
- osd_idc_find_or_init(env, osd, fid);
-
RETURN(0);
}
const struct lu_fid *fid = rec1->rec_fid;
struct osd_thandle *oh;
struct osd_idmap_cache *idc;
- char *name = (char *)key;
- int rc;
- int num = sizeof(oti->oti_zde) / 8;
+ const char *name = (const char *)key;
+ struct luz_direntry *zde = &oti->oti_zde;
+ int num = sizeof(*zde) / 8;
+ int rc;
ENTRY;
LASSERT(parent->oo_dn);
RETURN(PTR_ERR(idc));
}
+ CLASSERT(sizeof(zde->lzd_reg) == 8);
+ CLASSERT(sizeof(*zde) % 8 == 0);
+
+ memset(&zde->lzd_reg, 0, sizeof(zde->lzd_reg));
+ zde->lzd_reg.zde_type = IFTODT(rec1->rec_type & S_IFMT);
+ zde->lzd_fid = *fid;
+
if (idc->oic_remote) {
- /* Insert remote entry */
- memset(&oti->oti_zde.lzd_reg, 0, sizeof(oti->oti_zde.lzd_reg));
- oti->oti_zde.lzd_reg.zde_type = IFTODT(rec1->rec_type & S_IFMT);
+ if (name[0] != '.' || name[1] != '.' || name[2] != 0) {
+ /* Create agent inode for remote object that will
+ * be used for MDT file-level backup and restore. */
+ rc = osd_create_agent_object(env, osd, zde,
+ parent->oo_dn->dn_object, oh->ot_tx);
+ if (rc) {
+ CWARN("%s: Fail to create agent object for "
+ DFID": rc = %d\n",
+ osd_name(osd), PFID(fid), rc);
+ /* Ignore the failure since the system can go
+ * ahead if we do not care about the MDT side
+ * file-level backup and restore. */
+ rc = 0;
+ }
+ }
} else {
if (unlikely(idc->oic_dnode == 0)) {
/* for a reason OI cache wasn't filled properly */
GOTO(out, rc);
}
}
- CLASSERT(sizeof(oti->oti_zde.lzd_reg) == 8);
- CLASSERT(sizeof(oti->oti_zde) % 8 == 0);
- oti->oti_zde.lzd_reg.zde_type = IFTODT(rec1->rec_type & S_IFMT);
- oti->oti_zde.lzd_reg.zde_dnode = idc->oic_dnode;
+ zde->lzd_reg.zde_dnode = idc->oic_dnode;
}
- oti->oti_zde.lzd_fid = *fid;
if (OBD_FAIL_CHECK(OBD_FAIL_FID_INDIR))
- oti->oti_zde.lzd_fid.f_ver = ~0;
+ zde->lzd_fid.f_ver = ~0;
if (OBD_FAIL_CHECK(OBD_FAIL_FID_IGIF))
num = 1;
/* Insert (key,oid) into ZAP */
rc = osd_zap_add(osd, parent->oo_dn->dn_object, parent->oo_dn,
- (char *)key, 8, num, (void *)&oti->oti_zde, oh->ot_tx);
+ name, 8, num, (void *)zde, oh->ot_tx);
if (unlikely(rc == -EEXIST &&
name[0] == '.' && name[1] == '.' && name[2] == 0))
/* Update (key,oid) in ZAP */
- rc = -zap_update(osd->od_os, parent->oo_dn->dn_object,
- (char *)key, 8, sizeof(oti->oti_zde) / 8,
- (void *)&oti->oti_zde, oh->ot_tx);
+ rc = -zap_update(osd->od_os, parent->oo_dn->dn_object, name, 8,
+ sizeof(*zde) / 8, (void *)zde, oh->ot_tx);
out:
const struct dt_key *key,
struct thandle *th)
{
- struct osd_object *obj = osd_dt_obj(dt);
+ struct osd_object *obj = osd_dt_obj(dt);
+ dnode_t *zap_dn = obj->oo_dn;
struct osd_thandle *oh;
- uint64_t dnode;
+ const char *name = (const char *)key;
ENTRY;
LASSERT(dt_object_exists(dt));
LASSERT(osd_invariant(obj));
+ LASSERT(zap_dn != NULL);
LASSERT(th != NULL);
oh = container_of0(th, struct osd_thandle, ot_super);
- if (dt_object_exists(dt)) {
- LASSERT(obj->oo_dn);
- dnode = obj->oo_dn->dn_object;
- } else {
- dnode = DMU_NEW_OBJECT;
+ /*
+ * In Orion . and .. were stored in the directory (not generated upon
+ * request as now). We preserve them for backward compatibility.
+ */
+ if (name[0] == '.') {
+ if (name[1] == 0)
+ RETURN(0);
+ else if (name[1] == '.' && name[2] == 0)
+ RETURN(0);
}
/* do not specify the key as then DMU is trying to look it up
* which is very expensive. usually the layers above lookup
* before deletion */
- osd_tx_hold_zap(oh->ot_tx, dnode, obj->oo_dn, FALSE, NULL);
+ osd_tx_hold_zap(oh->ot_tx, zap_dn->dn_object, zap_dn, FALSE, NULL);
+
+ /* For destroying agent object if have. */
+ dmu_tx_hold_bonus(oh->ot_tx, DMU_NEW_OBJECT);
RETURN(0);
}
static int osd_dir_delete(const struct lu_env *env, struct dt_object *dt,
const struct dt_key *key, struct thandle *th)
{
+ struct luz_direntry *zde = &osd_oti_get(env)->oti_zde;
struct osd_object *obj = osd_dt_obj(dt);
struct osd_device *osd = osd_obj2dev(obj);
struct osd_thandle *oh;
}
}
+ /* XXX: We have to say that lookup during delete_declare will affect
+ * performance, but we have to check whether the name entry (to
+ * be deleted) has agent object or not to avoid orphans.
+ *
+ * We will improve that in the future, some possible solutions,
+ * for example:
+ * 1) Some hint from the caller via transaction handle to make
+ * the lookup conditionally.
+ * 2) Enhance the ZFS logic to recognize the OSD lookup result
+ * and delete the given entry directly without lookup again
+ * internally. LU-10295 */
+ memset(&zde->lzd_fid, 0, sizeof(zde->lzd_fid));
+ rc = osd_zap_lookup(osd, zap_dn->dn_object, zap_dn, name, 8, 3, zde);
+ if (unlikely(rc)) {
+ if (rc != -ENOENT)
+ CERROR("%s: failed to locate entry %s: rc = %d\n",
+ osd->od_svname, name, rc);
+ RETURN(rc);
+ }
+
+ if (unlikely(osd_remote_fid(env, osd, &zde->lzd_fid) > 0)) {
+ rc = -dmu_object_free(osd->od_os, zde->lzd_reg.zde_dnode,
+ oh->ot_tx);
+ if (rc)
+ CERROR("%s: failed to destroy agent object (%llu) "
+ "for the entry %s: rc = %d\n", osd->od_svname,
+ (__u64)zde->lzd_reg.zde_dnode, name, rc);
+ }
+
/* Remove key from the ZAP */
rc = osd_zap_remove(osd, zap_dn->dn_object, zap_dn,
(char *)key, oh->ot_tx);
-
- if (unlikely(rc && rc != -ENOENT))
- CERROR("%s: zap_remove failed: rc = %d\n", osd->od_svname, rc);
+ if (unlikely(rc))
+ CERROR("%s: zap_remove %s failed: rc = %d\n",
+ osd->od_svname, name, rc);
RETURN(rc);
}
RETURN(rc);
}
+static int osd_update_entry_for_agent(const struct lu_env *env,
+ struct osd_device *osd,
+ uint64_t zap, const char *name,
+ struct luz_direntry *zde, __u32 attr)
+{
+ dmu_tx_t *tx = NULL;
+ int rc = 0;
+ ENTRY;
+
+ if (attr & LUDA_VERIFY_DRYRUN)
+ GOTO(out, rc = 0);
+
+ tx = dmu_tx_create(osd->od_os);
+ if (!tx)
+ GOTO(out, rc = -ENOMEM);
+
+ dmu_tx_hold_sa_create(tx, osd_find_dnsize(osd, OSD_BASE_EA_IN_BONUS));
+ dmu_tx_hold_zap(tx, zap, FALSE, NULL);
+ rc = -dmu_tx_assign(tx, TXG_WAIT);
+ if (rc) {
+ dmu_tx_abort(tx);
+ GOTO(out, rc);
+ }
+
+ rc = osd_create_agent_object(env, osd, zde, zap, tx);
+ if (!rc)
+ rc = -zap_update(osd->od_os, zap, name, 8, sizeof(*zde) / 8,
+ (const void *)zde, tx);
+ dmu_tx_commit(tx);
+
+ GOTO(out, rc);
+
+out:
+ CDEBUG(D_LFSCK, "%s: Updated (%s) remote entry for "DFID": rc = %d\n",
+ osd_name(osd), (attr & LUDA_VERIFY_DRYRUN) ? "(ro)" : "(rw)",
+ PFID(&zde->lzd_fid), rc);
+ return rc;
+}
+
static int osd_dir_it_rec(const struct lu_env *env, const struct dt_it *di,
struct dt_rec *dtrec, __u32 attr)
{
if (za->za_num_integers >= 3 && fid_is_sane(&zde->lzd_fid)) {
lde->lde_attrs = LUDA_FID;
fid_cpu_to_le(&lde->lde_fid, &zde->lzd_fid);
+ if (unlikely(zde->lzd_reg.zde_dnode == ZFS_NO_OBJECT &&
+ osd_remote_fid(env, osd, &zde->lzd_fid) > 0 &&
+ attr & LUDA_VERIFY)) {
+ /* It is mainly used for handling the MDT
+ * upgraded from old ZFS based backend. */
+ rc = osd_update_entry_for_agent(env, osd,
+ it->ozi_obj->oo_dn->dn_object,
+ za->za_name, zde, attr);
+ if (!rc)
+ lde->lde_attrs |= LUDA_REPAIR;
+ else
+ lde->lde_attrs |= LUDA_UNKNOWN;
+ }
+
GOTO(pack_attr, rc = 0);
}
/* Lustre object */
lma = (struct lustre_mdt_attrs *)v;
lustre_lma_swab(lma);
- it->mit_fid = lma->lma_self_fid;
- nvlist_free(nvbuf);
- break;
- } else {
- /* not a Lustre object, try next one */
- nvlist_free(nvbuf);
+ if (likely(!(lma->lma_compat & LMAC_NOT_IN_OI) &&
+ !(lma->lma_incompat & LMAI_AGENT))) {
+ it->mit_fid = lma->lma_self_fid;
+ nvlist_free(nvbuf);
+ break;
+ }
}
+ /* not a Lustre visible object, try next one */
+ nvlist_free(nvbuf);
} while (1);
LASSERT(oh->ot_tx != NULL);
/* this is the minimum set of EAs on every Lustre object */
- obj->oo_ea_in_bonus = ZFS_SA_BASE_ATTR_SIZE +
- sizeof(__u64) + /* VBR VERSION */
- sizeof(struct lustre_mdt_attrs); /* LMA */
+ obj->oo_ea_in_bonus = OSD_BASE_EA_IN_BONUS;
/* reserve 32 bytes for extra stuff like ACLs */
dnode_size = size_roundup_power2(obj->oo_ea_in_bonus + 32);
}
#ifdef HAVE_DMU_OBJECT_ALLOC_DNSIZE
-static int osd_find_dnsize(struct osd_object *obj)
+int osd_find_dnsize(struct osd_device *osd, int ea_in_bonus)
{
- struct osd_device *osd = osd_obj2dev(obj);
int dnsize;
if (osd->od_dnsize == ZFS_DNSIZE_AUTO) {
dnsize = DNODE_MIN_SIZE;
do {
- if (DN_BONUS_SIZE(dnsize) >= obj->oo_ea_in_bonus + 32)
+ if (DN_BONUS_SIZE(dnsize) >= ea_in_bonus + 32)
break;
dnsize <<= 1;
} while (dnsize < DNODE_MAX_SIZE);
}
return dnsize;
}
-#else
-static int inline osd_find_dnsize(struct osd_object *obj)
-{
- return DN_MAX_BONUSLEN;
-}
#endif
/*
* dmu_tx_hold_bonus(tx, DMU_NEW_OBJECT) called and then assigned
* to a transaction group.
*/
-int __osd_object_create(const struct lu_env *env, struct osd_object *obj,
+int __osd_object_create(const struct lu_env *env, struct osd_device *osd,
+ struct osd_object *obj, const struct lu_fid *fid,
dnode_t **dnp, dmu_tx_t *tx, struct lu_attr *la)
{
- struct osd_device *osd = osd_obj2dev(obj);
- const struct lu_fid *fid = lu_object_fid(&obj->oo_dt.do_lu);
- dmu_object_type_t type = DMU_OT_PLAIN_FILE_CONTENTS;
+ dmu_object_type_t type = DMU_OT_PLAIN_FILE_CONTENTS;
uint64_t oid;
+ int size;
/* Use DMU_OTN_UINT8_METADATA for local objects so their data blocks
* would get an additional ditto copy */
type = DMU_OTN_UINT8_METADATA;
/* Create a new DMU object using the default dnode size. */
+ if (obj)
+ size = obj->oo_ea_in_bonus;
+ else
+ size = OSD_BASE_EA_IN_BONUS;
oid = osd_dmu_object_alloc(osd->od_os, type, 0,
- osd_find_dnsize(obj), tx);
+ osd_find_dnsize(osd, size), tx);
LASSERT(la->la_valid & LA_MODE);
la->la_size = 0;
static dnode_t *osd_mkidx(const struct lu_env *env, struct osd_object *obj,
struct lu_attr *la, struct osd_thandle *oh)
{
+ struct osd_device *osd = osd_obj2dev(obj);
dnode_t *dn;
int rc;
* We set ZAP_FLAG_UINT64_KEY to let ZFS know than we are going to use
* binary keys */
LASSERT(S_ISREG(la->la_mode));
- rc = __osd_zap_create(env, osd_obj2dev(obj), &dn, oh->ot_tx, la,
- osd_find_dnsize(obj), ZAP_FLAG_UINT64_KEY);
+ rc = __osd_zap_create(env, osd, &dn, oh->ot_tx, la,
+ osd_find_dnsize(osd, obj->oo_ea_in_bonus), ZAP_FLAG_UINT64_KEY);
if (rc)
return ERR_PTR(rc);
return dn;
static dnode_t *osd_mkdir(const struct lu_env *env, struct osd_object *obj,
struct lu_attr *la, struct osd_thandle *oh)
{
+ struct osd_device *osd = osd_obj2dev(obj);
dnode_t *dn;
int rc;
LASSERT(S_ISDIR(la->la_mode));
- rc = __osd_zap_create(env, osd_obj2dev(obj), &dn, oh->ot_tx, la,
- osd_find_dnsize(obj), 0);
+ rc = __osd_zap_create(env, osd, &dn, oh->ot_tx, la,
+ osd_find_dnsize(osd, obj->oo_ea_in_bonus), 0);
if (rc)
return ERR_PTR(rc);
return dn;
int rc;
LASSERT(S_ISREG(la->la_mode));
- rc = __osd_object_create(env, obj, &dn, oh->ot_tx, la);
+ rc = __osd_object_create(env, osd, obj, fid, &dn, oh->ot_tx, la);
if (rc)
return ERR_PTR(rc);
int rc;
LASSERT(S_ISLNK(la->la_mode));
- rc = __osd_object_create(env, obj, &dn, oh->ot_tx, la);
+ rc = __osd_object_create(env, osd_obj2dev(obj), obj,
+ lu_object_fid(&obj->oo_dt.do_lu),
+ &dn, oh->ot_tx, la);
if (rc)
return ERR_PTR(rc);
return dn;
if (S_ISCHR(la->la_mode) || S_ISBLK(la->la_mode))
la->la_valid |= LA_RDEV;
- rc = __osd_object_create(env, obj, &dn, oh->ot_tx, la);
+ rc = __osd_object_create(env, osd_obj2dev(obj), obj,
+ lu_object_fid(&obj->oo_dt.do_lu),
+ &dn, oh->ot_tx, la);
if (rc)
return ERR_PTR(rc);
return dn;