+static int osd_mdt_init(const struct lu_env *env, struct osd_device *dev)
+{
+ struct lvfs_run_ctxt new;
+ struct lvfs_run_ctxt save;
+ struct dentry *parent;
+ struct osd_mdobj_map *omm;
+ struct dentry *d;
+ struct osd_thread_info *info = osd_oti_get(env);
+ struct lu_fid *fid = &info->oti_fid3;
+ int rc = 0;
+
+ ENTRY;
+
+ OBD_ALLOC_PTR(dev->od_mdt_map);
+ if (dev->od_mdt_map == NULL)
+ RETURN(-ENOMEM);
+
+ omm = dev->od_mdt_map;
+
+ parent = osd_sb(dev)->s_root;
+ osd_push_ctxt(dev, &new, &save);
+
+ lu_local_obj_fid(fid, REMOTE_PARENT_DIR_OID);
+ d = simple_mkdir(env, dev, parent, fid, REMOTE_PARENT_DIR,
+ LMAC_NOT_IN_OI, 0755, NULL);
+ if (IS_ERR(d))
+ GOTO(cleanup, rc = PTR_ERR(d));
+
+ omm->omm_remote_parent = d;
+
+ GOTO(cleanup, rc = 0);
+
+cleanup:
+ pop_ctxt(&save, &new);
+ if (rc) {
+ if (omm->omm_remote_parent != NULL)
+ dput(omm->omm_remote_parent);
+ OBD_FREE_PTR(omm);
+ dev->od_mdt_map = NULL;
+ }
+ return rc;
+}
+
+static void osd_mdt_fini(struct osd_device *osd)
+{
+ struct osd_mdobj_map *omm = osd->od_mdt_map;
+
+ if (omm == NULL)
+ return;
+
+ if (omm->omm_remote_parent)
+ dput(omm->omm_remote_parent);
+
+ OBD_FREE_PTR(omm);
+ osd->od_ost_map = NULL;
+}
+
+int osd_add_to_remote_parent(const struct lu_env *env, struct osd_device *osd,
+ struct osd_object *obj, struct osd_thandle *oh)
+{
+ struct osd_mdobj_map *omm = osd->od_mdt_map;
+ struct osd_thread_info *oti = osd_oti_get(env);
+ struct lustre_mdt_attrs *lma = &oti->oti_ost_attrs.loa_lma;
+ char *name = oti->oti_name;
+ struct osd_thread_info *info = osd_oti_get(env);
+ struct dentry *dentry;
+ struct dentry *parent;
+ int rc;
+
+ if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_AGENTENT))
+ RETURN(0);
+
+ /*
+ * Set REMOTE_PARENT in lma, so other process like unlink or lfsck
+ * can identify this object quickly
+ */
+ rc = osd_get_lma(oti, obj->oo_inode, &oti->oti_obj_dentry,
+ &oti->oti_ost_attrs);
+ if (rc)
+ RETURN(rc);
+
+ lma->lma_incompat |= LMAI_REMOTE_PARENT;
+ lustre_lma_swab(lma);
+ rc = __osd_xattr_set(oti, obj->oo_inode, XATTR_NAME_LMA, lma,
+ sizeof(*lma), XATTR_REPLACE);
+ if (rc)
+ RETURN(rc);
+
+ parent = omm->omm_remote_parent;
+ sprintf(name, DFID_NOBRACE, PFID(lu_object_fid(&obj->oo_dt.do_lu)));
+ dentry = osd_child_dentry_by_inode(env, parent->d_inode,
+ name, strlen(name));
+ inode_lock(parent->d_inode);
+ rc = osd_ldiskfs_add_entry(info, osd, oh->ot_handle, dentry,
+ obj->oo_inode, NULL);
+ if (!rc && S_ISDIR(obj->oo_inode->i_mode))
+ ldiskfs_inc_count(oh->ot_handle, parent->d_inode);
+ else if (unlikely(rc == -EEXIST))
+ rc = 0;
+ if (!rc)
+ lu_object_set_agent_entry(&obj->oo_dt.do_lu);
+ CDEBUG(D_INODE, "%s: create agent entry for %s: rc = %d\n",
+ osd_name(osd), name, rc);
+ mark_inode_dirty(parent->d_inode);
+ inode_unlock(parent->d_inode);
+ RETURN(rc);
+}
+
+int osd_delete_from_remote_parent(const struct lu_env *env,
+ struct osd_device *osd,
+ struct osd_object *obj,
+ struct osd_thandle *oh, bool destroy)
+{
+ struct osd_mdobj_map *omm = osd->od_mdt_map;
+ struct osd_thread_info *oti = osd_oti_get(env);
+ struct lustre_mdt_attrs *lma = &oti->oti_ost_attrs.loa_lma;
+ char *name = oti->oti_name;
+ struct dentry *dentry;
+ struct dentry *parent;
+ struct ldiskfs_dir_entry_2 *de;
+ struct buffer_head *bh;
+ int rc;
+
+ parent = omm->omm_remote_parent;
+ sprintf(name, DFID_NOBRACE, PFID(lu_object_fid(&obj->oo_dt.do_lu)));
+ dentry = osd_child_dentry_by_inode(env, parent->d_inode,
+ name, strlen(name));
+ inode_lock(parent->d_inode);
+ bh = osd_ldiskfs_find_entry(parent->d_inode, &dentry->d_name, &de,
+ NULL, NULL);
+ if (IS_ERR(bh)) {
+ inode_unlock(parent->d_inode);
+ rc = PTR_ERR(bh);
+ if (unlikely(rc == -ENOENT))
+ rc = 0;
+ } else {
+ rc = ldiskfs_delete_entry(oh->ot_handle, parent->d_inode,
+ de, bh);
+ if (!rc && S_ISDIR(obj->oo_inode->i_mode))
+ ldiskfs_dec_count(oh->ot_handle, parent->d_inode);
+ mark_inode_dirty(parent->d_inode);
+ inode_unlock(parent->d_inode);
+ brelse(bh);
+ CDEBUG(D_INODE, "%s: remove agent entry for %s: rc = %d\n",
+ osd_name(osd), name, rc);
+ }
+
+ if (destroy || rc) {
+ if (!rc)
+ lu_object_clear_agent_entry(&obj->oo_dt.do_lu);
+
+ RETURN(rc);
+ }
+
+ rc = osd_get_lma(oti, obj->oo_inode, &oti->oti_obj_dentry,
+ &oti->oti_ost_attrs);
+ if (rc)
+ RETURN(rc);
+
+ /* Get rid of REMOTE_PARENT flag from incompat */
+ lma->lma_incompat &= ~LMAI_REMOTE_PARENT;
+ lustre_lma_swab(lma);
+ rc = __osd_xattr_set(oti, obj->oo_inode, XATTR_NAME_LMA, lma,
+ sizeof(*lma), XATTR_REPLACE);
+ if (!rc)
+ lu_object_clear_agent_entry(&obj->oo_dt.do_lu);
+ RETURN(rc);
+}
+
+int osd_lookup_in_remote_parent(struct osd_thread_info *oti,
+ struct osd_device *osd,
+ const struct lu_fid *fid,
+ struct osd_inode_id *id)
+{
+ struct osd_mdobj_map *omm = osd->od_mdt_map;
+ char *name = oti->oti_name;
+ struct dentry *parent;
+ struct dentry *dentry;
+ struct ldiskfs_dir_entry_2 *de;
+ struct buffer_head *bh;
+ int rc;
+
+ ENTRY;
+
+ if (unlikely(osd->od_is_ost))
+ RETURN(-ENOENT);
+
+ parent = omm->omm_remote_parent;
+ sprintf(name, DFID_NOBRACE, PFID(fid));
+ dentry = osd_child_dentry_by_inode(oti->oti_env, parent->d_inode,
+ name, strlen(name));
+ inode_lock(parent->d_inode);
+ bh = osd_ldiskfs_find_entry(parent->d_inode, &dentry->d_name, &de,
+ NULL, NULL);
+ if (IS_ERR(bh)) {
+ rc = PTR_ERR(bh);
+ } else {
+ struct inode *inode;
+
+ osd_id_gen(id, le32_to_cpu(de->inode), OSD_OII_NOGEN);
+ brelse(bh);
+ inode = osd_iget(oti, osd, id);
+ if (IS_ERR(inode)) {
+ rc = PTR_ERR(inode);
+ if (rc == -ESTALE)
+ rc = -ENOENT;
+ } else {
+ iput(inode);
+ rc = 0;
+ }
+ }
+ inode_unlock(parent->d_inode);
+ if (rc == 0)
+ osd_add_oi_cache(oti, osd, id, fid);
+ RETURN(rc);
+}
+