Whamcloud - gitweb
LU-1187 osd: add agent/local inode for remote directory
authorwangdi <di.wang@whamcloud.com>
Sat, 16 Nov 2013 08:41:27 +0000 (00:41 -0800)
committerOleg Drokin <oleg.drokin@intel.com>
Fri, 1 Feb 2013 19:38:31 +0000 (14:38 -0500)
A local inode will be created for remote directory entry, so
to make e2fsck check pass.

The remote directory on the remote MDT will be put under the
agent directory.

Remove LA_TYPE checking during osd_inode_set, because when
initialize the remote object, it needs to get TYPE information
from remote attr get, where we need type information anyway,
(osp_md_attr_get ->out_attr_get->osd_attr_get | LA_TYPE).
So osd_attr_get will always return LA_TYPE valid information
here, then if it do attr_get/modify/attr_set might hit this
ASSERT, So during osd_attr_set, we can just ignore type.

Signed-off-by: wang di <di.wang@intel.com>
Change-Id: I7f177fa8ae2ab450cde78a8c2da9a2770ec6b0cb
Reviewed-on: http://review.whamcloud.com/4931
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
lustre/osd-ldiskfs/osd_compat.c
lustre/osd-ldiskfs/osd_handler.c
lustre/osd-ldiskfs/osd_internal.h

index 62837d8..09045b7 100644 (file)
@@ -115,6 +115,123 @@ out:
 }
 
 /*
+ * directory structure on legacy MDT:
+ *
+ * REM_OBJ_DIR/ per mdt
+ * AGENT_OBJ_DIR/ per mdt
+ *
+ */
+static const char remote_obj_dir[] = "REM_OBJ_DIR";
+static const char agent_obj_dir[] = "AGENT_OBJ_DIR";
+int osd_mdt_init(struct osd_device *dev)
+{
+       struct lvfs_run_ctxt  new;
+       struct lvfs_run_ctxt  save;
+       struct dentry   *parent;
+       struct osd_mdobj_map *omm;
+       struct dentry   *d;
+       int                rc = 0;
+       ENTRY;
+
+       OBD_ALLOC_PTR(dev->od_mdt_map);
+       if (dev->od_mdt_map == NULL)
+               RETURN(-ENOMEM);
+
+       omm = dev->od_mdt_map;
+
+       LASSERT(dev->od_fsops);
+
+       parent = osd_sb(dev)->s_root;
+       osd_push_ctxt(dev, &new, &save);
+
+       d = simple_mkdir(parent, dev->od_mnt, agent_obj_dir,
+                        0755, 1);
+       if (IS_ERR(d))
+               GOTO(cleanup, rc = PTR_ERR(d));
+
+       omm->omm_agent_dentry = d;
+
+cleanup:
+       pop_ctxt(&save, &new, NULL);
+       if (rc) {
+               if (omm->omm_agent_dentry != NULL)
+                       dput(omm->omm_agent_dentry);
+               OBD_FREE_PTR(omm);
+               dev->od_mdt_map = NULL;
+       }
+       RETURN(rc);
+}
+
+static void osd_mdt_fini(struct osd_device *osd)
+{
+       struct osd_mdobj_map *omm = osd->od_mdt_map;
+
+       if (omm == NULL)
+               return;
+
+       if (omm->omm_agent_dentry)
+               dput(omm->omm_agent_dentry);
+
+       OBD_FREE_PTR(omm);
+       osd->od_ost_map = NULL;
+}
+
+int osd_create_agent_inode(const struct lu_env *env, struct osd_device *osd,
+                          struct osd_object *obj, struct osd_thandle *oh)
+{
+       struct osd_mdobj_map    *omm = osd->od_mdt_map;
+       struct osd_thread_info  *oti = osd_oti_get(env);
+       char                    *name_buf = oti->oti_name;
+       struct dentry           *agent;
+       struct dentry           *parent;
+       int                     rc;
+
+       parent = omm->omm_agent_dentry;
+       sprintf(name_buf, DFID_NOBRACE, PFID(lu_object_fid(&obj->oo_dt.do_lu)));
+       agent = osd_child_dentry_by_inode(env, parent->d_inode,
+                                         name_buf, strlen(name_buf));
+       mutex_lock(&parent->d_inode->i_mutex);
+       rc = osd_ldiskfs_add_entry(oh->ot_handle, agent, obj->oo_inode, NULL);
+       parent->d_inode->i_nlink++;
+       mark_inode_dirty(parent->d_inode);
+       mutex_unlock(&parent->d_inode->i_mutex);
+       if (rc != 0)
+               CERROR("%s: "DFID" add agent error: rc = %d\n", osd_name(osd),
+                      PFID(lu_object_fid(&obj->oo_dt.do_lu)), rc);
+       RETURN(rc);
+}
+
+int osd_delete_agent_inode(const struct lu_env *env, struct osd_device *osd,
+                          struct osd_object *obj, struct osd_thandle *oh)
+{
+       struct osd_mdobj_map       *omm = osd->od_mdt_map;
+       struct osd_thread_info     *oti = osd_oti_get(env);
+       char                       *name = oti->oti_name;
+       struct dentry              *agent;
+       struct dentry              *parent;
+       struct ldiskfs_dir_entry_2 *de;
+       struct buffer_head         *bh;
+       int                        rc;
+
+       parent = omm->omm_agent_dentry;
+       sprintf(name, DFID, PFID(lu_object_fid(&obj->oo_dt.do_lu)));
+       agent = osd_child_dentry_by_inode(env, parent->d_inode,
+                                         name, strlen(name));
+       mutex_lock(&parent->d_inode->i_mutex);
+       bh = osd_ldiskfs_find_entry(parent->d_inode, agent, &de, NULL);
+       if (bh == NULL) {
+               mutex_unlock(&parent->d_inode->i_mutex);
+               RETURN(-ENOENT);
+       }
+       rc = ldiskfs_delete_entry(oh->ot_handle, parent->d_inode, de, bh);
+       parent->d_inode->i_nlink--;
+       mark_inode_dirty(parent->d_inode);
+       mutex_unlock(&parent->d_inode->i_mutex);
+       brelse(bh);
+       RETURN(rc);
+}
+
+/*
  * directory structure on legacy OST:
  *
  * O/<seq>/d0-31/<objid>
@@ -193,6 +310,30 @@ static void osd_seq_free(struct osd_obj_map *map,
        OBD_FREE_PTR(osd_seq);
 }
 
+static void osd_ost_fini(struct osd_device *osd)
+{
+       struct osd_obj_seq    *osd_seq;
+       struct osd_obj_seq    *tmp;
+       struct osd_obj_map    *map = osd->od_ost_map;
+       ENTRY;
+
+       if (map == NULL)
+               return;
+
+       write_lock(&map->om_seq_list_lock);
+       cfs_list_for_each_entry_safe(osd_seq, tmp,
+                                    &map->om_seq_list,
+                                    oos_seq_list) {
+               osd_seq_free(map, osd_seq);
+       }
+       write_unlock(&map->om_seq_list_lock);
+       if (map->om_root)
+               dput(map->om_root);
+       OBD_FREE_PTR(map);
+       osd->od_ost_map = NULL;
+       EXIT;
+}
+
 int osd_obj_map_init(struct osd_device *dev)
 {
        int rc;
@@ -200,8 +341,11 @@ int osd_obj_map_init(struct osd_device *dev)
 
        /* prepare structures for OST */
        rc = osd_ost_init(dev);
+       if (rc)
+               RETURN(rc);
 
        /* prepare structures for MDS */
+       rc = osd_mdt_init(dev);
 
         RETURN(rc);
 }
@@ -229,27 +373,8 @@ struct osd_obj_seq *osd_seq_find(struct osd_obj_map *map, obd_seq seq)
 
 void osd_obj_map_fini(struct osd_device *dev)
 {
-       struct osd_obj_seq    *osd_seq;
-       struct osd_obj_seq    *tmp;
-       struct osd_obj_map    *map = dev->od_ost_map;
-       ENTRY;
-
-       map = dev->od_ost_map;
-       if (map == NULL)
-               return;
-
-       write_lock(&dev->od_ost_map->om_seq_list_lock);
-       cfs_list_for_each_entry_safe(osd_seq, tmp,
-                                    &dev->od_ost_map->om_seq_list,
-                                    oos_seq_list) {
-               osd_seq_free(map, osd_seq);
-       }
-       write_unlock(&dev->od_ost_map->om_seq_list_lock);
-       if (map->om_root)
-               dput(map->om_root);
-       OBD_FREE_PTR(dev->od_ost_map);
-       dev->od_ost_map = NULL;
-       EXIT;
+       osd_ost_fini(dev);
+       osd_mdt_fini(dev);
 }
 
 static int osd_obj_del_entry(struct osd_thread_info *info,
@@ -257,26 +382,25 @@ static int osd_obj_del_entry(struct osd_thread_info *info,
                             struct dentry *dird, char *name,
                             struct thandle *th)
 {
-        struct ldiskfs_dir_entry_2 *de;
-        struct buffer_head         *bh;
-        struct osd_thandle         *oh;
-        struct dentry              *child;
-        struct inode               *dir = dird->d_inode;
-        int                         rc;
-
-        ENTRY;
+       struct ldiskfs_dir_entry_2 *de;
+       struct buffer_head         *bh;
+       struct osd_thandle         *oh;
+       struct dentry              *child;
+       struct inode               *dir = dird->d_inode;
+       int                         rc;
+       ENTRY;
 
-        oh = container_of(th, struct osd_thandle, ot_super);
-        LASSERT(oh->ot_handle != NULL);
-        LASSERT(oh->ot_handle->h_transaction != NULL);
+       oh = container_of(th, struct osd_thandle, ot_super);
+       LASSERT(oh->ot_handle != NULL);
+       LASSERT(oh->ot_handle->h_transaction != NULL);
 
 
-        child = &info->oti_child_dentry;
-        child->d_name.hash = 0;
-        child->d_name.name = name;
-        child->d_name.len = strlen(name);
-        child->d_parent = dird;
-        child->d_inode = NULL;
+       child = &info->oti_child_dentry;
+       child->d_name.hash = 0;
+       child->d_name.name = name;
+       child->d_name.len = strlen(name);
+       child->d_parent = dird;
+       child->d_inode = NULL;
 
        ll_vfs_dq_init(dir);
        mutex_lock(&dir->i_mutex);
index af3f674..b322eeb 100644 (file)
@@ -1546,8 +1546,6 @@ static int osd_inode_setattr(const struct lu_env *env,
 
         bits = attr->la_valid;
 
-        LASSERT(!(bits & LA_TYPE)); /* Huh? You want too much. */
-
         if (bits & LA_ATIME)
                 inode->i_atime  = *osd_inode_time(env, inode, attr->la_atime);
         if (bits & LA_CTIME)
@@ -1976,7 +1974,7 @@ int osd_fld_lookup(const struct lu_env *env, struct osd_device *osd,
        rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(fid), range);
        if (rc != 0) {
                CERROR("%s can not find "DFID": rc = %d\n",
-                      osd2lu_dev(osd)->ld_obd->obd_name, PFID(fid), rc);
+                      osd_name(osd), PFID(fid), rc);
        }
        return rc;
 }
@@ -2142,8 +2140,15 @@ static int osd_object_destroy(const struct lu_env *env,
         * lock contention. So it will not affect unlink performance. */
        mutex_lock(&inode->i_mutex);
        if (S_ISDIR(inode->i_mode)) {
-               LASSERT(osd_inode_unlinked(inode) ||
-                       inode->i_nlink == 1);
+               LASSERT(osd_inode_unlinked(inode) || inode->i_nlink == 1);
+               /* it will check/delete the agent inode for every dir
+                * destory, how to optimize it? unlink performance
+                * impaction XXX */
+               result = osd_delete_agent_inode(env, osd, obj, oh);
+               if (result != 0 && result != -ENOENT) {
+                       CERROR("%s: delete agent inode "DFID": rc = %d\n",
+                              osd_name(osd), PFID(fid), result);
+               }
                spin_lock(&obj->oo_guard);
                clear_nlink(inode);
                spin_unlock(&obj->oo_guard);
@@ -2251,6 +2256,101 @@ static int osd_ea_fid_get(const struct lu_env *env, struct osd_object *obj,
        RETURN(0);
 }
 
+static int osd_add_dot_dotdot_internal(struct osd_thread_info *info,
+                                       struct inode *dir,
+                                       struct inode  *parent_dir,
+                                       const struct dt_rec *dot_fid,
+                                       const struct dt_rec *dot_dot_fid,
+                                       struct osd_thandle *oth)
+{
+       struct ldiskfs_dentry_param *dot_ldp;
+       struct ldiskfs_dentry_param *dot_dot_ldp;
+
+       dot_dot_ldp = (struct ldiskfs_dentry_param *)info->oti_ldp2;
+       osd_get_ldiskfs_dirent_param(dot_dot_ldp, dot_dot_fid);
+
+       dot_ldp = (struct ldiskfs_dentry_param *)info->oti_ldp;
+       dot_ldp->edp_magic = 0;
+       return ldiskfs_add_dot_dotdot(oth->ot_handle, parent_dir,
+                                       dir, dot_ldp, dot_dot_ldp);
+}
+
+/**
+ * Create an local inode for remote entry
+ */
+static struct inode *osd_create_remote_inode(const struct lu_env *env,
+                                            struct osd_device *osd,
+                                            struct osd_object *pobj,
+                                            const struct lu_fid *fid,
+                                            struct thandle *th)
+{
+       struct osd_thread_info  *info = osd_oti_get(env);
+       struct inode            *local;
+       struct osd_thandle      *oh;
+       int                     rc;
+       ENTRY;
+
+       LASSERT(th);
+       oh = container_of(th, struct osd_thandle, ot_super);
+       LASSERT(oh->ot_handle->h_transaction != NULL);
+
+#ifdef HAVE_QUOTA_SUPPORT
+       osd_push_ctxt(info->oti_env, save);
+#endif
+       /* FIXME: Insert index api needs to know the mode of
+        * the remote object. Just use S_IFDIR for now */
+       local = ldiskfs_create_inode(oh->ot_handle, pobj->oo_inode, S_IFDIR);
+#ifdef HAVE_QUOTA_SUPPORT
+       osd_pop_ctxt(info->oti_env, save);
+#endif
+       if (IS_ERR(local)) {
+               CERROR("%s: create local error %d\n", osd_name(osd),
+                      (int)PTR_ERR(local));
+               RETURN(local);
+       }
+
+       rc = osd_add_dot_dotdot_internal(info, local, pobj->oo_inode,
+               (const struct dt_rec *)lu_object_fid(&pobj->oo_dt.do_lu),
+               (const struct dt_rec *)fid, oh);
+       if (rc != 0) {
+               CERROR("%s: "DFID" add dot dotdot error: rc = %d\n",
+                       osd_name(osd), PFID(fid), rc);
+               RETURN(ERR_PTR(rc));
+       }
+
+       RETURN(local);
+}
+
+/**
+ * Delete local inode for remote entry
+ */
+static int osd_delete_remote_inode(const struct lu_env *env,
+                                  struct osd_device *osd,
+                                  const struct lu_fid *fid,
+                                   __u32 ino, struct osd_thandle *oh)
+{
+       struct osd_thread_info  *oti = osd_oti_get(env);
+       struct osd_inode_id     *id = &oti->oti_id;
+       struct inode            *inode;
+       ENTRY;
+
+       id->oii_ino = le32_to_cpu(ino);
+       id->oii_gen = OSD_OII_NOGEN;
+       inode = osd_iget(oti, osd, id);
+       if (IS_ERR(inode)) {
+               CERROR("%s: iget error "DFID" id %u:%u\n", osd_name(osd),
+                      PFID(fid), id->oii_ino, id->oii_gen);
+               RETURN(PTR_ERR(inode));
+       }
+
+       clear_nlink(inode);
+       mark_inode_dirty(inode);
+       CDEBUG(D_INODE, "%s: delete remote inode "DFID" %lu\n",
+               osd_name(osd), PFID(fid), inode->i_ino);
+       iput(inode);
+       RETURN(0);
+}
+
 /**
  * OSD layer object create function for interoperability mode (b11826).
  * This is mostly similar to osd_object_create(). Only difference being, fid is
@@ -3040,6 +3140,27 @@ static inline int osd_get_fid_from_dentry(struct ldiskfs_dir_entry_2 *de,
         RETURN(rc);
 }
 
+static int osd_remote_fid(const struct lu_env *env, struct osd_device *osd,
+                         struct lu_fid *fid)
+{
+       struct lu_seq_range     *range = &osd_oti_get(env)->oti_seq_range;
+       struct seq_server_site  *ss = osd_seq_site(osd);
+       int                     rc;
+       ENTRY;
+
+       if ((!fid_is_norm(fid) && !fid_is_igif(fid)) || ss == NULL)
+               RETURN(0);
+
+       rc = osd_fld_lookup(env, osd, fid, range);
+       if (rc != 0) {
+               CERROR("%s: Can not lookup fld for "DFID"\n",
+                      osd_name(osd), PFID(fid));
+               RETURN(rc);
+       }
+
+       RETURN(ss->ss_node_id != range->lsr_index);
+}
+
 /**
  * Index delete function for interoperability mode (b11826).
  * It will remove the directory entry added by osd_index_ea_insert().
@@ -3059,11 +3180,12 @@ static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt,
         struct inode               *dir    = obj->oo_inode;
         struct dentry              *dentry;
         struct osd_thandle         *oh;
-        struct ldiskfs_dir_entry_2 *de;
+       struct ldiskfs_dir_entry_2 *de = NULL;
         struct buffer_head         *bh;
         struct htree_lock          *hlock = NULL;
-        int                         rc;
-
+       struct lu_fid              *fid = &osd_oti_get(env)->oti_fid;
+       struct osd_device          *osd = osd_dev(dt->do_lu.lo_dev);
+       int                        rc;
         ENTRY;
 
         LINVRNT(osd_invariant(obj));
@@ -3104,6 +3226,24 @@ static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt,
         else
                up_write(&obj->oo_ext_idx_sem);
 
+       if (rc != 0)
+               GOTO(out, rc);
+
+       LASSERT(de != NULL);
+       rc = osd_get_fid_from_dentry(de, (struct dt_rec *)fid);
+       if (rc == 0 && osd_remote_fid(env, osd, fid)) {
+               __u32 ino = le32_to_cpu(de->inode);
+
+               rc = osd_delete_remote_inode(env, osd, fid, ino, oh);
+               if (rc != 0)
+                       CERROR("%s: del local inode "DFID": rc = %d\n",
+                               osd_name(osd), PFID(fid), rc);
+       } else {
+               if (rc == -ENODATA)
+                       rc = 0;
+       }
+out:
+
         LASSERT(osd_invariant(obj));
         RETURN(rc);
 }
@@ -3319,8 +3459,6 @@ static int osd_add_dot_dotdot(struct osd_thread_info *info,
                               struct thandle *th)
 {
         struct inode                *inode = dir->oo_inode;
-        struct ldiskfs_dentry_param *dot_ldp;
-        struct ldiskfs_dentry_param *dot_dot_ldp;
         struct osd_thandle          *oth;
         int result = 0;
 
@@ -3336,21 +3474,18 @@ static int osd_add_dot_dotdot(struct osd_thread_info *info,
                         dir->oo_compat_dot_created = 1;
                         result = 0;
                 }
-        } else if(strcmp(name, dotdot) == 0) {
+       } else if (strcmp(name, dotdot) == 0) {
                if (!dir->oo_compat_dot_created)
                        return -EINVAL;
-
-               dot_dot_ldp = (struct ldiskfs_dentry_param *)info->oti_ldp2;
-               osd_get_ldiskfs_dirent_param(dot_dot_ldp, dot_dot_fid);
                /* in case of rename, dotdot is already created */
-               if (dir->oo_compat_dotdot_created)
+               if (dir->oo_compat_dotdot_created) {
                        return __osd_ea_add_rec(info, dir, parent_dir, name,
                                                dot_dot_fid, NULL, th);
+               }
 
-               dot_ldp = (struct ldiskfs_dentry_param *)info->oti_ldp;
-               dot_ldp->edp_magic = 0;
-               result = ldiskfs_add_dot_dotdot(oth->ot_handle, parent_dir,
-                                               inode, dot_ldp, dot_dot_ldp);
+               result = osd_add_dot_dotdot_internal(info, dir->oo_inode,
+                                               parent_dir, dot_fid,
+                                               dot_dot_fid, oth);
                if (result == 0)
                        dir->oo_compat_dotdot_created = 1;
        }
@@ -3489,6 +3624,7 @@ static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj,
         bh = osd_ldiskfs_find_entry(dir, dentry, &de, hlock);
         if (bh) {
                struct osd_thread_info *oti = osd_oti_get(env);
+               struct osd_inode_id *id = &oti->oti_id;
                struct osd_idmap_cache *oic = &oti->oti_cache;
                struct osd_device *dev = osd_obj2dev(obj);
                struct osd_scrub *scrub = &dev->od_scrub;
@@ -3500,14 +3636,15 @@ static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj,
                /* done with de, release bh */
                brelse(bh);
                if (rc != 0)
-                       rc = osd_ea_fid_get(env, obj, ino, fid, &oic->oic_lid);
+                       rc = osd_ea_fid_get(env, obj, ino, fid, id);
                else
-                       osd_id_gen(&oic->oic_lid, ino, OSD_OII_NOGEN);
-               if (rc != 0) {
+                       osd_id_gen(id, ino, OSD_OII_NOGEN);
+               if (rc != 0 || osd_remote_fid(env, dev, fid)) {
                        fid_zero(&oic->oic_fid);
                        GOTO(out, rc);
                }
 
+               oic->oic_lid = *id;
                oic->oic_fid = *fid;
                if ((scrub->os_pos_current <= ino) &&
                    (sf->sf_flags & SF_INCONSISTENT ||
@@ -3602,7 +3739,7 @@ static int osd_index_declare_ea_insert(const struct lu_env *env,
                                       struct thandle *handle)
 {
        struct osd_thandle      *oh;
-       struct inode            *inode;
+       struct osd_device       *osd   = osd_dev(dt->do_lu.lo_dev);
        struct lu_fid           *fid = (struct lu_fid *)rec;
        int                     rc;
        ENTRY;
@@ -3616,24 +3753,42 @@ static int osd_index_declare_ea_insert(const struct lu_env *env,
        osd_trans_declare_op(env, oh, OSD_OT_INSERT,
                             osd_dto_credits_noquota[DTO_INDEX_INSERT]);
 
-       inode = osd_dt_obj(dt)->oo_inode;
-       LASSERT(inode);
+       if (osd_dt_obj(dt)->oo_inode == NULL) {
+               const char *name  = (const char *)key;
+               /* Object is not being created yet. Only happens when
+                *     1. declare directory create
+                *     2. declare insert .
+                *     3. declare insert ..
+                */
+               LASSERT(strcmp(name, dotdot) == 0 || strcmp(name, dot) == 0);
+       } else {
+               struct inode *inode = osd_dt_obj(dt)->oo_inode;
+
+               /* We ignore block quota on meta pool (MDTs), so needn't
+                * calculate how many blocks will be consumed by this index
+                * insert */
+               rc = osd_declare_inode_qid(env, inode->i_uid, inode->i_gid, 0,
+                                          oh, true, true, NULL, false);
+       }
 
-       /* We ignore block quota on meta pool (MDTs), so needn't
-        * calculate how many blocks will be consumed by this index
-        * insert */
-       rc = osd_declare_inode_qid(env, inode->i_uid, inode->i_gid, 0, oh,
-                                  true, true, NULL, false);
        if (fid == NULL)
                RETURN(0);
 
-       /* It does fld look up inside declare, and the result will be
-       * added to fld cache, so the following fld lookup inside insert
-       * does not need send RPC anymore, so avoid send rpc with holding
-       * transaction */
-       LASSERTF(fid_is_sane(fid), "fid is insane"DFID"\n", PFID(fid));
-       osd_fld_lookup(env, osd_dt_dev(handle->th_dev), fid,
-                       &osd_oti_get(env)->oti_seq_range);
+       rc = osd_remote_fid(env, osd, fid);
+       if (rc <= 0)
+               RETURN(rc);
+
+#ifdef OSD_DECLARE_OP
+#define OSD_OT_CREATE create
+#define OSD_OT_INSERT insert
+#define osd_trans_declare_op(env, oh, op, cred) OSD_DECLARE_OP(oh, op, cred)
+#endif
+       osd_trans_declare_op(env, oh, OSD_OT_CREATE,
+                            osd_dto_credits_noquota[DTO_OBJECT_CREATE]);
+       osd_trans_declare_op(env, oh, OSD_OT_INSERT,
+                            osd_dto_credits_noquota[DTO_INDEX_INSERT] + 1);
+       osd_trans_declare_op(env, oh, OSD_OT_INSERT,
+                            osd_dto_credits_noquota[DTO_INDEX_INSERT] + 1);
 
        RETURN(rc);
 }
@@ -3654,13 +3809,16 @@ static int osd_index_ea_insert(const struct lu_env *env, struct dt_object *dt,
                                const struct dt_key *key, struct thandle *th,
                                struct lustre_capa *capa, int ignore_quota)
 {
-        struct osd_object *obj   = osd_dt_obj(dt);
-        struct lu_fid     *fid   = (struct lu_fid *) rec;
-        const char        *name  = (const char *)key;
-        struct osd_object *child;
-        int                rc;
-
-        ENTRY;
+       struct osd_object       *obj = osd_dt_obj(dt);
+       struct osd_device       *osd = osd_dev(dt->do_lu.lo_dev);
+       struct lu_fid           *fid = (struct lu_fid *) rec;
+       const char              *name = (const char *)key;
+       struct osd_thread_info  *oti   = osd_oti_get(env);
+       struct osd_inode_id     *id    = &oti->oti_id;
+       struct inode            *child_inode = NULL;
+       struct osd_object       *child = NULL;
+       int                     rc;
+       ENTRY;
 
         LASSERT(osd_invariant(obj));
         LASSERT(dt_object_exists(dt));
@@ -3671,16 +3829,59 @@ static int osd_index_ea_insert(const struct lu_env *env, struct dt_object *dt,
         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
                 RETURN(-EACCES);
 
-        child = osd_object_find(env, dt, fid);
-        if (!IS_ERR(child)) {
-                rc = osd_ea_add_rec(env, obj, child->oo_inode, name, rec, th);
-                osd_object_put(env, child);
-        } else {
-                rc = PTR_ERR(child);
-        }
+       LASSERTF(fid_is_sane(fid), "fid"DFID" is insane!", PFID(fid));
 
-        LASSERT(osd_invariant(obj));
-        RETURN(rc);
+       rc = osd_remote_fid(env, osd, fid);
+       if (rc < 0) {
+               CERROR("%s: Can not find object "DFID" rc %d\n",
+                      osd_name(osd), PFID(fid), rc);
+               RETURN(rc);
+       }
+
+       if (rc == 1) {
+               /* Insert remote entry */
+               if (strcmp(name, dotdot) == 0 && strlen(name) == 2) {
+                       struct osd_mdobj_map    *omm = osd->od_mdt_map;
+                       struct osd_thandle      *oh;
+
+                       /* If parent on remote MDT, we need put this object
+                        * under AGENT */
+                       oh = container_of(th, typeof(*oh), ot_super);
+                       rc = osd_create_agent_inode(env, osd, obj, oh);
+                       if (rc != 0) {
+                               CERROR("%s: add agent "DFID" error: rc = %d\n",
+                                      osd_name(osd),
+                                      PFID(lu_object_fid(&dt->do_lu)), rc);
+                               RETURN(rc);
+                       }
+
+                       child_inode = igrab(omm->omm_agent_dentry->d_inode);
+               } else {
+                       child_inode = osd_create_remote_inode(env, osd, obj,
+                                                             fid, th);
+                       if (IS_ERR(child_inode))
+                               RETURN(PTR_ERR(child_inode));
+               }
+       } else {
+               /* Insert local entry */
+               child = osd_object_find(env, dt, fid);
+               if (IS_ERR(child)) {
+                       CERROR("%s: Can not find object "DFID"%u:%u: rc = %d\n",
+                              osd_name(osd), PFID(fid),
+                              id->oii_ino, id->oii_gen,
+                              (int)PTR_ERR(child_inode));
+                       RETURN(PTR_ERR(child_inode));
+               }
+               child_inode = igrab(child->oo_inode);
+       }
+
+       rc = osd_ea_add_rec(env, obj, child_inode, name, rec, th);
+
+       iput(child_inode);
+       if (child != NULL)
+               osd_object_put(env, child);
+       LASSERT(osd_invariant(obj));
+       RETURN(rc);
 }
 
 /**
@@ -3763,7 +3964,6 @@ static int osd_it_iam_get(const struct lu_env *env,
  *
  *  \param  di      osd iterator
  */
-
 static void osd_it_iam_put(const struct lu_env *env, struct dt_it *di)
 {
         struct osd_it_iam *it = (struct osd_it_iam *)di;
@@ -4249,6 +4449,7 @@ static inline int osd_it_ea_rec(const struct lu_env *env,
        struct osd_scrub       *scrub = &dev->od_scrub;
        struct scrub_file      *sf    = &scrub->os_file;
        struct osd_thread_info *oti   = osd_oti_get(env);
+       struct osd_inode_id    *id    = &oti->oti_id;
        struct osd_idmap_cache *oic   = &oti->oti_cache;
        struct lu_fid          *fid   = &it->oie_dirent->oied_fid;
        struct lu_dirent       *lde   = (struct lu_dirent *)dtrec;
@@ -4257,19 +4458,24 @@ static inline int osd_it_ea_rec(const struct lu_env *env,
        ENTRY;
 
        if (!fid_is_sane(fid)) {
-               rc = osd_ea_fid_get(env, obj, ino, fid, &oic->oic_lid);
+               rc = osd_ea_fid_get(env, obj, ino, fid, id);
                if (rc != 0) {
                        fid_zero(&oic->oic_fid);
                        RETURN(rc);
                }
        } else {
-               osd_id_gen(&oic->oic_lid, ino, OSD_OII_NOGEN);
+               osd_id_gen(id, ino, OSD_OII_NOGEN);
        }
 
        osd_it_pack_dirent(lde, fid, it->oie_dirent->oied_off,
                           it->oie_dirent->oied_name,
                           it->oie_dirent->oied_namelen,
                           it->oie_dirent->oied_type, attr);
+
+       if (osd_remote_fid(env, dev, fid))
+               RETURN(0);
+
+       oic->oic_lid = *id;
        oic->oic_fid = *fid;
        if ((scrub->os_pos_current <= ino) &&
            (sf->sf_flags & SF_INCONSISTENT ||
index e210f74..01ffd80 100644 (file)
@@ -169,6 +169,16 @@ struct osd_obj_map {
        struct semaphore om_dir_init_sem;
 };
 
+struct osd_mdobj {
+       struct dentry   *om_root;      /* AGENT/<index> */
+       obd_seq         om_index;     /* mdt index */
+       cfs_list_t      om_list;      /* list to omm_list */
+};
+
+struct osd_mdobj_map {
+       struct dentry   *omm_agent_dentry;
+};
+
 #define osd_ldiskfs_find_entry(dir, dentry, de, lock)   \
         ll_ldiskfs_find_entry(dir, dentry, de, lock)
 #define osd_ldiskfs_add_entry(handle, child, cinode, hlock) \
@@ -265,6 +275,7 @@ struct osd_device {
        struct lu_site            od_site;
 
        struct osd_obj_map      *od_ost_map;
+       struct osd_mdobj_map    *od_mdt_map;
 
         unsigned long long        od_readcache_max_filesize;
         int                       od_read_cache;
@@ -591,6 +602,7 @@ struct osd_thread_info {
        bool                    oti_rollback;
 #endif
 
+       char                    oti_name[48];
 };
 
 extern int ldiskfs_pdo;
@@ -643,6 +655,15 @@ int osd_scrub_dump(struct osd_device *dev, char *buf, int len);
 
 int osd_fld_lookup(const struct lu_env *env, struct osd_device *osd,
                   const struct lu_fid *fid, struct lu_seq_range *range);
+struct dentry *osd_agent_lookup(struct osd_mdobj_map *omm, int index);
+struct dentry *osd_agent_load(const struct osd_device *osd, int mdt_index,
+                             int create);
+
+int osd_delete_agent_inode(const struct lu_env *env, struct osd_device *osd,
+                          struct osd_object *obj, struct osd_thandle *oh);
+int osd_create_agent_inode(const struct lu_env *env, struct osd_device *osd,
+                          struct osd_object *obj, struct osd_thandle *oh);
+
 /* osd_quota_fmt.c */
 int walk_tree_dqentry(const struct lu_env *env, struct osd_object *obj,
                       int type, uint blk, int depth, uint index,