+static int osd_xattr_set_pfid(const struct lu_env *env, struct osd_object *obj,
+ const struct lu_buf *buf, int fl,
+ struct thandle *handle)
+{
+ struct osd_thread_info *info = osd_oti_get(env);
+ struct dentry *dentry = &info->oti_obj_dentry;
+ struct lustre_ost_attrs *loa = &info->oti_ost_attrs;
+ struct lustre_mdt_attrs *lma = &loa->loa_lma;
+ struct inode *inode = obj->oo_inode;
+ struct filter_fid *ff = buf->lb_buf;
+ struct ost_layout *ol = &ff->ff_layout;
+ int flags = XATTR_REPLACE;
+ int rc;
+
+ ENTRY;
+
+ if (buf->lb_len != sizeof(*ff) && buf->lb_len != sizeof(struct lu_fid))
+ RETURN(-EINVAL);
+
+ rc = osd_get_lma(info, inode, dentry, loa);
+ if (rc == -ENODATA) {
+ /* Usually for upgarding from old device */
+ lustre_loa_init(loa, lu_object_fid(&obj->oo_dt.do_lu),
+ LMAC_FID_ON_OST, 0);
+ flags = XATTR_CREATE;
+ } else if (rc) {
+ RETURN(rc);
+ }
+
+ if (!rc && lma->lma_compat & LMAC_STRIPE_INFO) {
+ if ((fl & LU_XATTR_CREATE) && !(fl & LU_XATTR_REPLACE))
+ RETURN(-EEXIST);
+
+ if (LDISKFS_INODE_SIZE(inode->i_sb) > 256) {
+ /* Separate PFID EA from LMA */
+ lma->lma_compat &= ~(LMAC_STRIPE_INFO | LMAC_COMP_INFO);
+ lustre_lma_swab(lma);
+ rc = __osd_xattr_set(info, inode, XATTR_NAME_LMA, lma,
+ sizeof(*lma), XATTR_REPLACE);
+ if (!rc) {
+ obj->oo_pfid_in_lma = 0;
+ rc = LU_XATTR_CREATE;
+ }
+
+ RETURN(rc);
+ }
+ } else {
+ if (LDISKFS_INODE_SIZE(inode->i_sb) > 256)
+ RETURN(fl);
+
+ /*
+ * Old client does not send stripe information,
+ * then store the PFID EA on disk separatedly.
+ */
+ if (unlikely(buf->lb_len == sizeof(struct lu_fid) ||
+ ol->ol_stripe_size == 0))
+ RETURN(fl);
+
+ /* Remove old PFID EA entry firstly. */
+ dquot_initialize(inode);
+ rc = osd_removexattr(dentry, inode, XATTR_NAME_FID);
+ if (rc == -ENODATA) {
+ if ((fl & LU_XATTR_REPLACE) && !(fl & LU_XATTR_CREATE))
+ RETURN(rc);
+ } else if (rc) {
+ RETURN(rc);
+ }
+ }
+
+ fid_le_to_cpu(&loa->loa_parent_fid, &ff->ff_parent);
+ if (likely(ol->ol_stripe_size != 0)) {
+ loa->loa_parent_fid.f_ver |= le32_to_cpu(ol->ol_stripe_count) <<
+ PFID_STRIPE_IDX_BITS;
+ loa->loa_stripe_size = le32_to_cpu(ol->ol_stripe_size);
+ lma->lma_compat |= LMAC_STRIPE_INFO;
+ if (ol->ol_comp_id != 0) {
+ loa->loa_comp_id = le32_to_cpu(ol->ol_comp_id);
+ loa->loa_comp_start = le64_to_cpu(ol->ol_comp_start);
+ loa->loa_comp_end = le64_to_cpu(ol->ol_comp_end);
+ lma->lma_compat |= LMAC_COMP_INFO;
+ }
+ }
+
+ lustre_loa_swab(loa, false);
+
+ /* Store the PFID EA inside LMA. */
+ rc = __osd_xattr_set(info, inode, XATTR_NAME_LMA, loa, sizeof(*loa),
+ flags);
+ if (!rc)
+ obj->oo_pfid_in_lma = 1;
+
+ RETURN(rc);
+}
+
+/*
+ * In DNE environment, the object (in spite of regular file or directory)
+ * and its name entry may reside on different MDTs. Under such case, we will
+ * create an agent entry on the MDT where the object resides. The agent entry
+ * references the object locally, that makes the object to be visible to the
+ * userspace when mounted as 'ldiskfs' directly. Then the userspace tools,
+ * such as 'tar' can handle the object properly.
+ *
+ * We handle the agent entry during set linkEA that is the common interface
+ * for both regular file and directroy, can handle kinds of cases, such as
+ * create/link/unlink/rename, and so on.
+ *
+ * NOTE: we can NOT do that when ea_{insert,delete} that is only for directory.
+ *
+ * XXX: There are two known issues:
+ * 1. For one object, we will create at most one agent entry even if there
+ * may be more than one cross-MDTs hard links on the object. So the local
+ * e2fsck may claim that the object's nlink is larger than the name entries
+ * that reference such inode. And in further, the e2fsck will fix the nlink
+ * attribute to match the local references. Then it will cause the object's
+ * nlink attribute to be inconsistent with the global references. it is bad
+ * but not fatal. The ref_del() can handle the zero-referenced case. On the
+ * other hand, the global namespace LFSCK can repair the object's attribute
+ * according to the linkEA.
+ * 2. There may be too many hard links on the object as to its linkEA overflow,
+ * then the linkEA entry for cross-MDTs reference may be discarded. If such
+ * case happened, then at this point, we do not know whether there are some
+ * cross-MDTs reference. But there are local references, it guarantees that
+ * object is visible to userspace when mounted as 'ldiskfs'. That is enough.
+ */
+static int osd_xattr_handle_linkea(const struct lu_env *env,
+ struct osd_device *osd,
+ struct osd_object *obj,
+ const struct lu_buf *buf,
+ struct thandle *handle)
+{
+ const struct lu_fid *fid = lu_object_fid(&obj->oo_dt.do_lu);
+ struct lu_fid *tfid = &osd_oti_get(env)->oti_fid3;
+ struct linkea_data ldata = { .ld_buf = (struct lu_buf *)buf };
+ struct lu_name tmpname;
+ struct osd_thandle *oh;
+ int rc;
+ bool remote = false;
+
+ ENTRY;
+
+ oh = container_of0(handle, struct osd_thandle, ot_super);
+ LASSERT(oh->ot_handle != NULL);
+
+ rc = linkea_init_with_rec(&ldata);
+ if (!rc) {
+ linkea_first_entry(&ldata);
+ while (ldata.ld_lee != NULL && !remote) {
+ linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen,
+ &tmpname, tfid);
+ if (osd_remote_fid(env, osd, tfid) > 0)
+ remote = true;
+ else
+ linkea_next_entry(&ldata);
+ }
+ } else if (rc == -ENODATA) {
+ rc = 0;
+ } else {
+ RETURN(rc);
+ }
+
+ if (lu_object_has_agent_entry(&obj->oo_dt.do_lu) && !remote) {
+ rc = osd_delete_from_remote_parent(env, osd, obj, oh, false);
+ if (rc)
+ CERROR("%s: failed to remove agent entry for "DFID
+ ": rc = %d\n", osd_name(osd), PFID(fid), rc);
+ } else if (!lu_object_has_agent_entry(&obj->oo_dt.do_lu) && remote) {
+ rc = osd_add_to_remote_parent(env, osd, obj, oh);
+ if (rc)
+ CERROR("%s: failed to create agent entry for "DFID
+ ": rc = %d\n", osd_name(osd), PFID(fid), rc);
+ }
+
+ RETURN(rc);
+}
+