return rc;
}
+/**
+ * Flag an object as having a remote parent and add its agent entry.
+ *
+ * The LMA xattr is loaded, LMAI_REMOTE_PARENT is OR-ed into lma_incompat
+ * and the xattr is written back with LU_XATTR_REPLACE. A name entry keyed
+ * by the FID string of the object is then inserted into the ZAP referenced
+ * by osd->od_remote_parent_dir. An entry that already exists (-EEXIST) is
+ * treated as success. On success the object is marked through
+ * lu_object_set_agent_entry().
+ *
+ * \param[in] env	execution environment, source of the per-thread buffers
+ * \param[in] osd	device that owns the remote parent directory
+ * \param[in] obj	object to be registered
+ * \param[in] oh	transaction handle used for the xattr and ZAP updates
+ *
+ * \retval		0 on success
+ * \retval		error returned by the step that failed
+ */
+int osd_add_to_remote_parent(const struct lu_env *env,
+			     struct osd_device *osd,
+			     struct osd_object *obj,
+			     struct osd_thandle *oh)
+{
+	struct osd_thread_info *oti = osd_oti_get(env);
+	const struct lu_fid *fid = lu_object_fid(&obj->oo_dt.do_lu);
+	struct lustre_mdt_attrs *attrs =
+		(struct lustre_mdt_attrs *)oti->oti_buf;
+	struct lu_buf lmabuf = {
+		.lb_buf	= attrs,
+		.lb_len	= sizeof(oti->oti_buf),
+	};
+	struct luz_direntry *dirent = &oti->oti_zde;
+	char *fidstr = oti->oti_str;
+	int lmalen = 0;
+	int rc;
+	ENTRY;
+
+	/* load the current LMA into the per-thread scratch buffer */
+	rc = osd_xattr_get_internal(env, obj, &lmabuf, XATTR_NAME_LMA,
+				    &lmalen);
+	if (rc != 0) {
+		CWARN("%s: fail to load LMA for adding "
+		      DFID" to remote parent: rc = %d\n",
+		      osd_name(osd), PFID(fid), rc);
+		RETURN(rc);
+	}
+
+	/* The flag is modified between two lustre_lma_swab() calls
+	 * (presumably on-disk <-> CPU byte order, to be confirmed), then
+	 * exactly the number of bytes that were read is written back. */
+	lustre_lma_swab(attrs);
+	attrs->lma_incompat |= LMAI_REMOTE_PARENT;
+	lustre_lma_swab(attrs);
+	lmabuf.lb_len = lmalen;
+	rc = osd_xattr_set_internal(env, obj, &lmabuf, XATTR_NAME_LMA,
+				    LU_XATTR_REPLACE, oh);
+	if (rc != 0) {
+		CWARN("%s: fail to update LMA for adding "
+		      DFID" to remote parent: rc = %d\n",
+		      osd_name(osd), PFID(fid), rc);
+		RETURN(rc);
+	}
+
+	/* build the agent entry: name is the FID string, value is the
+	 * directory entry describing the local dnode */
+	osd_fid2str(fidstr, fid, sizeof(oti->oti_str));
+	dirent->lzd_reg.zde_dnode = obj->oo_dn->dn_object;
+	dirent->lzd_reg.zde_type = IFTODT(S_IFDIR);
+	dirent->lzd_fid = *fid;
+
+	rc = osd_zap_add(osd, osd->od_remote_parent_dir, NULL,
+			 fidstr, 8, sizeof(*dirent) / 8, dirent, oh->ot_tx);
+	if (rc == 0 || rc == -EEXIST) {
+		/* an entry that is already present counts as success */
+		lu_object_set_agent_entry(&obj->oo_dt.do_lu);
+		RETURN(0);
+	}
+
+	CWARN("%s: fail to add name entry for "
+	      DFID" to remote parent: rc = %d\n",
+	      osd_name(osd), PFID(fid), rc);
+	RETURN(rc);
+}
+
+/**
+ * Remove the agent entry of an object from the remote parent directory.
+ *
+ * The name entry keyed by the FID string of the object is removed from the
+ * ZAP referenced by osd->od_remote_parent_dir; a missing entry (-ENOENT) is
+ * not an error. Unless \a destroy is set, LMAI_REMOTE_PARENT is then
+ * cleared from the LMA xattr and the agent-entry mark is dropped through
+ * lu_object_clear_agent_entry().
+ *
+ * \param[in] env	execution environment, source of the per-thread buffers
+ * \param[in] osd	device that owns the remote parent directory
+ * \param[in] obj	object whose agent entry is removed
+ * \param[in] oh	transaction handle used for the ZAP and xattr updates
+ * \param[in] destroy	true to stop after the name entry removal and leave
+ *			the LMA untouched
+ *
+ * \retval		0 on success
+ * \retval		error returned by the step that failed
+ */
+int osd_delete_from_remote_parent(const struct lu_env *env,
+				  struct osd_device *osd,
+				  struct osd_object *obj,
+				  struct osd_thandle *oh, bool destroy)
+{
+	struct osd_thread_info *oti = osd_oti_get(env);
+	const struct lu_fid *fid = lu_object_fid(&obj->oo_dt.do_lu);
+	struct lustre_mdt_attrs *attrs =
+		(struct lustre_mdt_attrs *)oti->oti_buf;
+	struct lu_buf lmabuf = {
+		.lb_buf	= attrs,
+		.lb_len	= sizeof(oti->oti_buf),
+	};
+	char *fidstr = oti->oti_str;
+	int lmalen = 0;
+	int rc;
+	ENTRY;
+
+	/* drop the name entry; it is fine if it was never there */
+	osd_fid2str(fidstr, fid, sizeof(oti->oti_str));
+	rc = osd_zap_remove(osd, osd->od_remote_parent_dir, NULL,
+			    fidstr, oh->ot_tx);
+	if (rc == -ENOENT)
+		rc = 0;
+	if (rc != 0) {
+		CERROR("%s: fail to remove entry under remote "
+		       "parent for "DFID": rc = %d\n",
+		       osd_name(osd), PFID(fid), rc);
+		RETURN(rc);
+	}
+
+	/* on destroy the LMA is not updated */
+	if (destroy)
+		RETURN(0);
+
+	rc = osd_xattr_get_internal(env, obj, &lmabuf, XATTR_NAME_LMA,
+				    &lmalen);
+	if (rc != 0) {
+		CERROR("%s: fail to load LMA for removing "
+		       DFID" from remote parent: rc = %d\n",
+		       osd_name(osd), PFID(fid), rc);
+		RETURN(rc);
+	}
+
+	/* The flag is cleared between two lustre_lma_swab() calls
+	 * (presumably on-disk <-> CPU byte order, to be confirmed), then
+	 * exactly the number of bytes that were read is written back. */
+	lustre_lma_swab(attrs);
+	attrs->lma_incompat &= ~LMAI_REMOTE_PARENT;
+	lustre_lma_swab(attrs);
+	lmabuf.lb_len = lmalen;
+	rc = osd_xattr_set_internal(env, obj, &lmabuf, XATTR_NAME_LMA,
+				    LU_XATTR_REPLACE, oh);
+	if (rc != 0) {
+		CERROR("%s: fail to update LMA for removing "
+		       DFID" from remote parent: rc = %d\n",
+		       osd_name(osd), PFID(fid), rc);
+		RETURN(rc);
+	}
+
+	lu_object_clear_agent_entry(&obj->oo_dt.do_lu);
+	RETURN(0);
+}
+
static int osd_declare_dir_insert(const struct lu_env *env,
struct dt_object *dt,
const struct dt_rec *rec,
if (OBD_FAIL_CHECK(OBD_FAIL_FID_INDIR))
zde->lzd_fid.f_ver = ~0;
+
+ /* This logic is unrelated to IGIF; the fail_loc value is simply reused
+ * to stay consistent with the ldiskfs case and share the same test logic */
if (OBD_FAIL_CHECK(OBD_FAIL_FID_IGIF))
num = 1;
+
/* Insert (key,oid) into ZAP */
rc = osd_zap_add(osd, parent->oo_dn->dn_object, parent->oo_dn,
name, 8, num, (void *)zde, oh->ot_tx);
* the lookup conditionally.
* 2) Enhance the ZFS logic to recognize the OSD lookup result
* and delete the given entry directly without lookup again
- * internally. LU-10295 */
+ * internally. LU-10190 */
memset(&zde->lzd_fid, 0, sizeof(zde->lzd_fid));
rc = osd_zap_lookup(osd, zap_dn->dn_object, zap_dn, name, 8, 3, zde);
if (unlikely(rc)) {
ot_assigned:1;
};
-#define OSD_OI_NAME_SIZE 16
+#define OSD_OI_NAME_SIZE 24
/*
* Object Index (OI) instance.
*/
struct osd_oi {
- char oi_name[OSD_OI_NAME_SIZE]; /* unused */
+ char oi_name[OSD_OI_NAME_SIZE];
uint64_t oi_zapid;
dnode_t *oi_dn;
};
struct proc_dir_entry *od_proc_entry;
struct lprocfs_stats *od_stats;
+ uint64_t od_remote_parent_dir;
uint64_t od_max_blksz;
uint64_t od_root;
uint64_t od_O_id;
uint64_t osd_zap_cursor_serialize(zap_cursor_t *zc);
int osd_remote_fid(const struct lu_env *env, struct osd_device *osd,
const struct lu_fid *fid);
+int osd_add_to_remote_parent(const struct lu_env *env,
+ struct osd_device *osd,
+ struct osd_object *obj,
+ struct osd_thandle *oh);
+int osd_delete_from_remote_parent(const struct lu_env *env,
+ struct osd_device *osd,
+ struct osd_object *obj,
+ struct osd_thandle *oh, bool destroy);
/* osd_xattr.c */
int __osd_sa_xattr_schedule_update(const struct lu_env *env,
int __osd_xattr_get_large(const struct lu_env *env, struct osd_device *osd,
uint64_t xattr, struct lu_buf *buf,
const char *name, int *sizep);
+int osd_xattr_get_internal(const struct lu_env *env, struct osd_object *obj,
+ struct lu_buf *buf, const char *name, int *sizep);
int osd_xattr_get(const struct lu_env *env, struct dt_object *dt,
struct lu_buf *buf, const char *name);
int osd_declare_xattr_set(const struct lu_env *env, struct dt_object *dt,
}
#endif
+/*
+ * Format \a fid into \a buf using the DFID_NOBRACE layout. The output is
+ * bounded by \a len, the size of \a buf in bytes.
+ *
+ * XXX: f_ver is not counted, but may differ too
+ */
+static inline void osd_fid2str(char *buf, const struct lu_fid *fid, int len)
+{
+	(void)snprintf(buf, len, DFID_NOBRACE, PFID(fid));
+}
+
static inline int
osd_xattr_set_internal(const struct lu_env *env, struct osd_object *obj,
const struct lu_buf *buf, const char *name, int fl,
PFID(lu_object_fid(&obj->oo_dt.do_lu)));
rc = -EOPNOTSUPP;
} else {
+ struct osd_device *osd = osd_obj2dev(obj);
+
if (lma->lma_compat & LMAC_STRIPE_INFO &&
- osd_obj2dev(obj)->od_is_ost)
+ osd->od_is_ost)
obj->oo_pfid_in_lma = 1;
+ if (unlikely(lma->lma_incompat & LMAI_REMOTE_PARENT) &&
+ osd->od_remote_parent_dir != ZFS_NO_OBJECT)
+ lu_object_set_agent_entry(&obj->oo_dt.do_lu);
}
} else if (rc == -ENODATA) {
/* haven't initialize LMA xattr */
osd_tx_hold_zap(oh->ot_tx, osd->od_unlinked->dn_object,
osd->od_unlinked, TRUE, NULL);
+ /* remove agent entry (if have) from remote parent */
+ if (lu_object_has_agent_entry(&obj->oo_dt.do_lu))
+ osd_tx_hold_zap(oh->ot_tx, osd->od_remote_parent_dir,
+ NULL, FALSE, NULL);
+
/* will help to find FID->ino when this object is being
* added to PENDING/ */
osd_idc_find_and_init(env, osd, obj);
GOTO(out, rc);
}
+ if (lu_object_has_agent_entry(&obj->oo_dt.do_lu)) {
+ rc = osd_delete_from_remote_parent(env, osd, obj, oh, true);
+ if (rc)
+ GOTO(out, rc);
+ }
+
oid = obj->oo_dn->dn_object;
if (unlikely(obj->oo_destroy == OSD_DESTROY_NONE)) {
/* this may happen if the destroy wasn't declared
return osd_seq->os_compat_dirs[b];
}
-/* XXX: f_ver is not counted, but may differ too */
-static void osd_fid2str(char *buf, const struct lu_fid *fid)
-{
- sprintf(buf, DFID_NOBRACE, PFID(fid));
-}
-
/*
* Determine the zap object id which is being used as the OI for the
* given fid. The lowest N bits in the sequence ID are used as the
*/
static uint64_t
osd_get_idx_for_fid(struct osd_device *osd, const struct lu_fid *fid,
- char *buf, dnode_t **zdn)
+ char *buf, dnode_t **zdn, int bufsize)
{
struct osd_oi *oi;
LASSERT(osd->od_oi_table != NULL);
oi = osd->od_oi_table[fid_seq(fid) & (osd->od_oi_count - 1)];
if (buf)
- osd_fid2str(buf, fid);
+ osd_fid2str(buf, fid, bufsize);
if (zdn)
*zdn = oi->oi_dn;
if (buf)
strncpy(buf, name, bufsize);
} else {
- zapid = osd_get_idx_for_fid(osd, fid, buf, NULL);
+ zapid = osd_get_idx_for_fid(osd, fid, buf, NULL,
+ bufsize);
}
} else {
- zapid = osd_get_idx_for_fid(osd, fid, buf, zdn);
+ zapid = osd_get_idx_for_fid(osd, fid, buf, zdn, bufsize);
}
return zapid;
if (buf)
strncpy(buf, name, bufsize);
} else {
- zapid = osd_get_idx_for_fid(osd, fid, buf, NULL);
+ zapid = osd_get_idx_for_fid(osd, fid, buf, NULL,
+ bufsize);
}
} else {
- zapid = osd_get_idx_for_fid(osd, fid, buf, zdn);
+ zapid = osd_get_idx_for_fid(osd, fid, buf, zdn, bufsize);
}
return zapid;
8, 1, &info->oti_zde);
} else if (fid_is_objseq(fid) || fid_is_batchid(fid)) {
zapid = osd_get_idx_for_fid(dev, fid,
- buf, NULL);
+ buf, NULL, sizeof(info->oti_buf));
rc = osd_zap_lookup(dev, zapid, zdn, buf,
8, 1, &info->oti_zde);
}
RETURN(rc);
}
+/*
+ * Set up o->od_remote_parent_dir.
+ *
+ * On an OST the field is set to ZFS_NO_OBJECT. Otherwise the
+ * REMOTE_PARENT_DIR object is looked up (or created) under o->od_root and
+ * its id is stored; if that fails the field is set to ZFS_NO_OBJECT and the
+ * error is deliberately not propagated.
+ */
+static void
+osd_oi_init_remote_parent(const struct lu_env *env, struct osd_device *o)
+{
+	uint64_t sdb;
+	int rc;
+	ENTRY;
+
+	if (o->od_is_ost) {
+		o->od_remote_parent_dir = ZFS_NO_OBJECT;
+	} else {
+		/* Remote parent is only used for cross-MDT objects,
+		 * it is useless for the single MDT case or under read
+		 * only mode. So ignore the failure. */
+		rc = osd_oi_find_or_create(env, o, o->od_root,
+					   REMOTE_PARENT_DIR, &sdb);
+		if (!rc)
+			o->od_remote_parent_dir = sdb;
+		else
+			o->od_remote_parent_dir = ZFS_NO_OBJECT;
+	}
+
+	/* balance the ENTRY above so the entry/exit trace stays paired */
+	EXIT;
+}
+
/**
* Initialize the OIs by either opening or creating them as needed.
*/
int i, rc, count = 0;
ENTRY;
+ osd_oi_init_remote_parent(env, o);
+
rc = osd_oi_probe(env, o, &count);
if (rc)
RETURN(rc);
#include <obd_class.h>
#include <lustre_disk.h>
#include <lustre_fid.h>
+#include <lustre_linkea.h>
#include "osd_internal.h"
int vallen, const char *name,
struct osd_thandle *oh)
{
+ struct osd_device *osd = osd_obj2dev(obj);
dmu_tx_t *tx = oh->ot_tx;
int bonuslen;
if (unlikely(obj->oo_destroyed))
return;
+ if (strcmp(name, XATTR_NAME_LINK) == 0 &&
+ osd->od_remote_parent_dir != ZFS_NO_OBJECT) {
+ /* If some name entry resides on a remote MDT, an agent entry will
+ * be created under the remote parent. Conversely, if the remote
+ * entry is removed, the related agent entry may need to be
+ * removed from the remote parent. Since either case may happen,
+ * declare enough credits: the credits for creating an agent entry
+ * also cover the removal case. */
+ osd_tx_hold_zap(tx, osd->od_remote_parent_dir,
+ NULL, TRUE, NULL);
+ }
+
if (unlikely(!osd_obj2dev(obj)->od_xattr_in_sa)) {
__osd_xattr_declare_legacy(env, obj, vallen, name, oh);
return;
RETURN(rc);
}
+/*
+ * In a DNE environment, an object (regular file or directory) and its name
+ * entry may reside on different MDTs. In that case an agent entry is created
+ * on the MDT where the object resides. The agent entry references the object
+ * locally, which makes the object visible to userspace when the target is
+ * mounted directly as 'zfs', so that userspace tools such as 'tar' can
+ * handle the object properly.
+ *
+ * The agent entry is maintained while the linkEA is being set, because that
+ * is the common interface for both regular files and directories and covers
+ * create/link/unlink/rename and so on.
+ *
+ * NOTE: this is needed for both directories and regular files, so it can NOT
+ * be done in ea_{insert,delete}, which are directory based operations.
+ *
+ * The linkEA in \a buf is scanned until the first entry whose parent FID is
+ * reported as remote by osd_remote_fid(). If the object carries an agent
+ * entry but no remote parent was found, the agent entry is removed; if it
+ * carries none but a remote parent was found, one is added. An empty linkEA
+ * (-ENODATA) is handled as "no remote parent".
+ *
+ * Returns 0 on success, or the error from parsing the linkEA or from
+ * updating the agent entry.
+ */
+static int osd_xattr_handle_linkea(const struct lu_env *env,
+				   struct osd_device *osd,
+				   struct osd_object *obj,
+				   const struct lu_buf *buf,
+				   struct osd_thandle *oh)
+{
+	const struct lu_fid *fid = lu_object_fid(&obj->oo_dt.do_lu);
+	struct lu_fid *pfid = &osd_oti_get(env)->oti_fid;
+	struct linkea_data ldata = { .ld_buf = (struct lu_buf *)buf };
+	struct lu_name lname;
+	bool has_remote = false;
+	bool has_agent;
+	int rc;
+	ENTRY;
+
+	rc = linkea_init_with_rec(&ldata);
+	if (rc != 0 && rc != -ENODATA)
+		RETURN(rc);
+
+	if (rc == 0) {
+		/* stop at the first link whose parent is remote */
+		for (linkea_first_entry(&ldata); ldata.ld_lee != NULL;
+		     linkea_next_entry(&ldata)) {
+			linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen,
+					    &lname, pfid);
+			if (osd_remote_fid(env, osd, pfid) > 0) {
+				has_remote = true;
+				break;
+			}
+		}
+	}
+
+	/* nothing to do when the agent entry already matches the links */
+	has_agent = lu_object_has_agent_entry(&obj->oo_dt.do_lu);
+	if (has_agent == has_remote)
+		RETURN(0);
+
+	if (has_agent) {
+		rc = osd_delete_from_remote_parent(env, osd, obj, oh, false);
+		if (rc != 0)
+			CERROR("%s: failed to remove agent entry for "DFID
+			       ": rc = %d\n", osd_name(osd), PFID(fid), rc);
+	} else {
+		rc = osd_add_to_remote_parent(env, osd, obj, oh);
+		if (rc != 0)
+			CWARN("%s: failed to create agent entry for "DFID
+			      ": rc = %d\n", osd_name(osd), PFID(fid), rc);
+	}
+
+	RETURN(rc);
+}
+
int osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
const struct lu_buf *buf, const char *name, int fl,
struct thandle *handle)
{
- struct osd_object *obj = osd_dt_obj(dt);
+ struct osd_object *obj = osd_dt_obj(dt);
+ struct osd_device *osd = osd_obj2dev(obj);
struct osd_thandle *oh;
int rc = 0;
ENTRY;
rc = osd_xattr_split_pfid(env, obj, oh);
if (!rc)
fl = LU_XATTR_CREATE;
+ } else if (strcmp(name, XATTR_NAME_LINK) == 0 &&
+ osd->od_remote_parent_dir != ZFS_NO_OBJECT) {
+ rc = osd_xattr_handle_linkea(env, osd, obj, buf, oh);
}
if (!rc)