Whamcloud - gitweb
LU-1187 osd: add agent/local inode for remote directory
[fs/lustre-release.git] / lustre / osd-ldiskfs / osd_handler.c
index af3f674..b322eeb 100644 (file)
@@ -1546,8 +1546,6 @@ static int osd_inode_setattr(const struct lu_env *env,
 
         bits = attr->la_valid;
 
-        LASSERT(!(bits & LA_TYPE)); /* Huh? You want too much. */
-
         if (bits & LA_ATIME)
                 inode->i_atime  = *osd_inode_time(env, inode, attr->la_atime);
         if (bits & LA_CTIME)
@@ -1976,7 +1974,7 @@ int osd_fld_lookup(const struct lu_env *env, struct osd_device *osd,
        rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(fid), range);
        if (rc != 0) {
                CERROR("%s can not find "DFID": rc = %d\n",
-                      osd2lu_dev(osd)->ld_obd->obd_name, PFID(fid), rc);
+                      osd_name(osd), PFID(fid), rc);
        }
        return rc;
 }
@@ -2142,8 +2140,15 @@ static int osd_object_destroy(const struct lu_env *env,
         * lock contention. So it will not affect unlink performance. */
        mutex_lock(&inode->i_mutex);
        if (S_ISDIR(inode->i_mode)) {
-               LASSERT(osd_inode_unlinked(inode) ||
-                       inode->i_nlink == 1);
+               LASSERT(osd_inode_unlinked(inode) || inode->i_nlink == 1);
+               /* it will check/delete the agent inode for every dir
+                * destory, how to optimize it? unlink performance
+                * impaction XXX */
+               result = osd_delete_agent_inode(env, osd, obj, oh);
+               if (result != 0 && result != -ENOENT) {
+                       CERROR("%s: delete agent inode "DFID": rc = %d\n",
+                              osd_name(osd), PFID(fid), result);
+               }
                spin_lock(&obj->oo_guard);
                clear_nlink(inode);
                spin_unlock(&obj->oo_guard);
@@ -2251,6 +2256,101 @@ static int osd_ea_fid_get(const struct lu_env *env, struct osd_object *obj,
        RETURN(0);
 }
 
+static int osd_add_dot_dotdot_internal(struct osd_thread_info *info,
+                                       struct inode *dir,
+                                       struct inode  *parent_dir,
+                                       const struct dt_rec *dot_fid,
+                                       const struct dt_rec *dot_dot_fid,
+                                       struct osd_thandle *oth)
+{
+       struct ldiskfs_dentry_param *dot_ldp;
+       struct ldiskfs_dentry_param *dot_dot_ldp;
+
+       dot_dot_ldp = (struct ldiskfs_dentry_param *)info->oti_ldp2;
+       osd_get_ldiskfs_dirent_param(dot_dot_ldp, dot_dot_fid);
+
+       dot_ldp = (struct ldiskfs_dentry_param *)info->oti_ldp;
+       dot_ldp->edp_magic = 0;
+       return ldiskfs_add_dot_dotdot(oth->ot_handle, parent_dir,
+                                       dir, dot_ldp, dot_dot_ldp);
+}
+
+/**
+ * Create an local inode for remote entry
+ */
+static struct inode *osd_create_remote_inode(const struct lu_env *env,
+                                            struct osd_device *osd,
+                                            struct osd_object *pobj,
+                                            const struct lu_fid *fid,
+                                            struct thandle *th)
+{
+       struct osd_thread_info  *info = osd_oti_get(env);
+       struct inode            *local;
+       struct osd_thandle      *oh;
+       int                     rc;
+       ENTRY;
+
+       LASSERT(th);
+       oh = container_of(th, struct osd_thandle, ot_super);
+       LASSERT(oh->ot_handle->h_transaction != NULL);
+
+#ifdef HAVE_QUOTA_SUPPORT
+       osd_push_ctxt(info->oti_env, save);
+#endif
+       /* FIXME: Insert index api needs to know the mode of
+        * the remote object. Just use S_IFDIR for now */
+       local = ldiskfs_create_inode(oh->ot_handle, pobj->oo_inode, S_IFDIR);
+#ifdef HAVE_QUOTA_SUPPORT
+       osd_pop_ctxt(info->oti_env, save);
+#endif
+       if (IS_ERR(local)) {
+               CERROR("%s: create local error %d\n", osd_name(osd),
+                      (int)PTR_ERR(local));
+               RETURN(local);
+       }
+
+       rc = osd_add_dot_dotdot_internal(info, local, pobj->oo_inode,
+               (const struct dt_rec *)lu_object_fid(&pobj->oo_dt.do_lu),
+               (const struct dt_rec *)fid, oh);
+       if (rc != 0) {
+               CERROR("%s: "DFID" add dot dotdot error: rc = %d\n",
+                       osd_name(osd), PFID(fid), rc);
+               RETURN(ERR_PTR(rc));
+       }
+
+       RETURN(local);
+}
+
+/**
+ * Delete local inode for remote entry
+ */
+static int osd_delete_remote_inode(const struct lu_env *env,
+                                  struct osd_device *osd,
+                                  const struct lu_fid *fid,
+                                   __u32 ino, struct osd_thandle *oh)
+{
+       struct osd_thread_info  *oti = osd_oti_get(env);
+       struct osd_inode_id     *id = &oti->oti_id;
+       struct inode            *inode;
+       ENTRY;
+
+       id->oii_ino = le32_to_cpu(ino);
+       id->oii_gen = OSD_OII_NOGEN;
+       inode = osd_iget(oti, osd, id);
+       if (IS_ERR(inode)) {
+               CERROR("%s: iget error "DFID" id %u:%u\n", osd_name(osd),
+                      PFID(fid), id->oii_ino, id->oii_gen);
+               RETURN(PTR_ERR(inode));
+       }
+
+       clear_nlink(inode);
+       mark_inode_dirty(inode);
+       CDEBUG(D_INODE, "%s: delete remote inode "DFID" %lu\n",
+               osd_name(osd), PFID(fid), inode->i_ino);
+       iput(inode);
+       RETURN(0);
+}
+
 /**
  * OSD layer object create function for interoperability mode (b11826).
  * This is mostly similar to osd_object_create(). Only difference being, fid is
@@ -3040,6 +3140,27 @@ static inline int osd_get_fid_from_dentry(struct ldiskfs_dir_entry_2 *de,
         RETURN(rc);
 }
 
+static int osd_remote_fid(const struct lu_env *env, struct osd_device *osd,
+                         struct lu_fid *fid)
+{
+       struct lu_seq_range     *range = &osd_oti_get(env)->oti_seq_range;
+       struct seq_server_site  *ss = osd_seq_site(osd);
+       int                     rc;
+       ENTRY;
+
+       if ((!fid_is_norm(fid) && !fid_is_igif(fid)) || ss == NULL)
+               RETURN(0);
+
+       rc = osd_fld_lookup(env, osd, fid, range);
+       if (rc != 0) {
+               CERROR("%s: Can not lookup fld for "DFID"\n",
+                      osd_name(osd), PFID(fid));
+               RETURN(rc);
+       }
+
+       RETURN(ss->ss_node_id != range->lsr_index);
+}
+
 /**
  * Index delete function for interoperability mode (b11826).
  * It will remove the directory entry added by osd_index_ea_insert().
@@ -3059,11 +3180,12 @@ static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt,
         struct inode               *dir    = obj->oo_inode;
         struct dentry              *dentry;
         struct osd_thandle         *oh;
-        struct ldiskfs_dir_entry_2 *de;
+       struct ldiskfs_dir_entry_2 *de = NULL;
         struct buffer_head         *bh;
         struct htree_lock          *hlock = NULL;
-        int                         rc;
-
+       struct lu_fid              *fid = &osd_oti_get(env)->oti_fid;
+       struct osd_device          *osd = osd_dev(dt->do_lu.lo_dev);
+       int                        rc;
         ENTRY;
 
         LINVRNT(osd_invariant(obj));
@@ -3104,6 +3226,24 @@ static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt,
         else
                up_write(&obj->oo_ext_idx_sem);
 
+       if (rc != 0)
+               GOTO(out, rc);
+
+       LASSERT(de != NULL);
+       rc = osd_get_fid_from_dentry(de, (struct dt_rec *)fid);
+       if (rc == 0 && osd_remote_fid(env, osd, fid)) {
+               __u32 ino = le32_to_cpu(de->inode);
+
+               rc = osd_delete_remote_inode(env, osd, fid, ino, oh);
+               if (rc != 0)
+                       CERROR("%s: del local inode "DFID": rc = %d\n",
+                               osd_name(osd), PFID(fid), rc);
+       } else {
+               if (rc == -ENODATA)
+                       rc = 0;
+       }
+out:
+
         LASSERT(osd_invariant(obj));
         RETURN(rc);
 }
@@ -3319,8 +3459,6 @@ static int osd_add_dot_dotdot(struct osd_thread_info *info,
                               struct thandle *th)
 {
         struct inode                *inode = dir->oo_inode;
-        struct ldiskfs_dentry_param *dot_ldp;
-        struct ldiskfs_dentry_param *dot_dot_ldp;
         struct osd_thandle          *oth;
         int result = 0;
 
@@ -3336,21 +3474,18 @@ static int osd_add_dot_dotdot(struct osd_thread_info *info,
                         dir->oo_compat_dot_created = 1;
                         result = 0;
                 }
-        } else if(strcmp(name, dotdot) == 0) {
+       } else if (strcmp(name, dotdot) == 0) {
                if (!dir->oo_compat_dot_created)
                        return -EINVAL;
-
-               dot_dot_ldp = (struct ldiskfs_dentry_param *)info->oti_ldp2;
-               osd_get_ldiskfs_dirent_param(dot_dot_ldp, dot_dot_fid);
                /* in case of rename, dotdot is already created */
-               if (dir->oo_compat_dotdot_created)
+               if (dir->oo_compat_dotdot_created) {
                        return __osd_ea_add_rec(info, dir, parent_dir, name,
                                                dot_dot_fid, NULL, th);
+               }
 
-               dot_ldp = (struct ldiskfs_dentry_param *)info->oti_ldp;
-               dot_ldp->edp_magic = 0;
-               result = ldiskfs_add_dot_dotdot(oth->ot_handle, parent_dir,
-                                               inode, dot_ldp, dot_dot_ldp);
+               result = osd_add_dot_dotdot_internal(info, dir->oo_inode,
+                                               parent_dir, dot_fid,
+                                               dot_dot_fid, oth);
                if (result == 0)
                        dir->oo_compat_dotdot_created = 1;
        }
@@ -3489,6 +3624,7 @@ static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj,
         bh = osd_ldiskfs_find_entry(dir, dentry, &de, hlock);
         if (bh) {
                struct osd_thread_info *oti = osd_oti_get(env);
+               struct osd_inode_id *id = &oti->oti_id;
                struct osd_idmap_cache *oic = &oti->oti_cache;
                struct osd_device *dev = osd_obj2dev(obj);
                struct osd_scrub *scrub = &dev->od_scrub;
@@ -3500,14 +3636,15 @@ static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj,
                /* done with de, release bh */
                brelse(bh);
                if (rc != 0)
-                       rc = osd_ea_fid_get(env, obj, ino, fid, &oic->oic_lid);
+                       rc = osd_ea_fid_get(env, obj, ino, fid, id);
                else
-                       osd_id_gen(&oic->oic_lid, ino, OSD_OII_NOGEN);
-               if (rc != 0) {
+                       osd_id_gen(id, ino, OSD_OII_NOGEN);
+               if (rc != 0 || osd_remote_fid(env, dev, fid)) {
                        fid_zero(&oic->oic_fid);
                        GOTO(out, rc);
                }
 
+               oic->oic_lid = *id;
                oic->oic_fid = *fid;
                if ((scrub->os_pos_current <= ino) &&
                    (sf->sf_flags & SF_INCONSISTENT ||
@@ -3602,7 +3739,7 @@ static int osd_index_declare_ea_insert(const struct lu_env *env,
                                       struct thandle *handle)
 {
        struct osd_thandle      *oh;
-       struct inode            *inode;
+       struct osd_device       *osd   = osd_dev(dt->do_lu.lo_dev);
        struct lu_fid           *fid = (struct lu_fid *)rec;
        int                     rc;
        ENTRY;
@@ -3616,24 +3753,42 @@ static int osd_index_declare_ea_insert(const struct lu_env *env,
        osd_trans_declare_op(env, oh, OSD_OT_INSERT,
                             osd_dto_credits_noquota[DTO_INDEX_INSERT]);
 
-       inode = osd_dt_obj(dt)->oo_inode;
-       LASSERT(inode);
+       if (osd_dt_obj(dt)->oo_inode == NULL) {
+               const char *name  = (const char *)key;
+               /* Object is not being created yet. Only happens when
+                *     1. declare directory create
+                *     2. declare insert .
+                *     3. declare insert ..
+                */
+               LASSERT(strcmp(name, dotdot) == 0 || strcmp(name, dot) == 0);
+       } else {
+               struct inode *inode = osd_dt_obj(dt)->oo_inode;
+
+               /* We ignore block quota on meta pool (MDTs), so needn't
+                * calculate how many blocks will be consumed by this index
+                * insert */
+               rc = osd_declare_inode_qid(env, inode->i_uid, inode->i_gid, 0,
+                                          oh, true, true, NULL, false);
+       }
 
-       /* We ignore block quota on meta pool (MDTs), so needn't
-        * calculate how many blocks will be consumed by this index
-        * insert */
-       rc = osd_declare_inode_qid(env, inode->i_uid, inode->i_gid, 0, oh,
-                                  true, true, NULL, false);
        if (fid == NULL)
                RETURN(0);
 
-       /* It does fld look up inside declare, and the result will be
-       * added to fld cache, so the following fld lookup inside insert
-       * does not need send RPC anymore, so avoid send rpc with holding
-       * transaction */
-       LASSERTF(fid_is_sane(fid), "fid is insane"DFID"\n", PFID(fid));
-       osd_fld_lookup(env, osd_dt_dev(handle->th_dev), fid,
-                       &osd_oti_get(env)->oti_seq_range);
+       rc = osd_remote_fid(env, osd, fid);
+       if (rc <= 0)
+               RETURN(rc);
+
+#ifdef OSD_DECLARE_OP
+#define OSD_OT_CREATE create
+#define OSD_OT_INSERT insert
+#define osd_trans_declare_op(env, oh, op, cred) OSD_DECLARE_OP(oh, op, cred)
+#endif
+       osd_trans_declare_op(env, oh, OSD_OT_CREATE,
+                            osd_dto_credits_noquota[DTO_OBJECT_CREATE]);
+       osd_trans_declare_op(env, oh, OSD_OT_INSERT,
+                            osd_dto_credits_noquota[DTO_INDEX_INSERT] + 1);
+       osd_trans_declare_op(env, oh, OSD_OT_INSERT,
+                            osd_dto_credits_noquota[DTO_INDEX_INSERT] + 1);
 
        RETURN(rc);
 }
@@ -3654,13 +3809,16 @@ static int osd_index_ea_insert(const struct lu_env *env, struct dt_object *dt,
                                const struct dt_key *key, struct thandle *th,
                                struct lustre_capa *capa, int ignore_quota)
 {
-        struct osd_object *obj   = osd_dt_obj(dt);
-        struct lu_fid     *fid   = (struct lu_fid *) rec;
-        const char        *name  = (const char *)key;
-        struct osd_object *child;
-        int                rc;
-
-        ENTRY;
+       struct osd_object       *obj = osd_dt_obj(dt);
+       struct osd_device       *osd = osd_dev(dt->do_lu.lo_dev);
+       struct lu_fid           *fid = (struct lu_fid *) rec;
+       const char              *name = (const char *)key;
+       struct osd_thread_info  *oti   = osd_oti_get(env);
+       struct osd_inode_id     *id    = &oti->oti_id;
+       struct inode            *child_inode = NULL;
+       struct osd_object       *child = NULL;
+       int                     rc;
+       ENTRY;
 
         LASSERT(osd_invariant(obj));
         LASSERT(dt_object_exists(dt));
@@ -3671,16 +3829,59 @@ static int osd_index_ea_insert(const struct lu_env *env, struct dt_object *dt,
         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
                 RETURN(-EACCES);
 
-        child = osd_object_find(env, dt, fid);
-        if (!IS_ERR(child)) {
-                rc = osd_ea_add_rec(env, obj, child->oo_inode, name, rec, th);
-                osd_object_put(env, child);
-        } else {
-                rc = PTR_ERR(child);
-        }
+       LASSERTF(fid_is_sane(fid), "fid"DFID" is insane!", PFID(fid));
 
-        LASSERT(osd_invariant(obj));
-        RETURN(rc);
+       rc = osd_remote_fid(env, osd, fid);
+       if (rc < 0) {
+               CERROR("%s: Can not find object "DFID" rc %d\n",
+                      osd_name(osd), PFID(fid), rc);
+               RETURN(rc);
+       }
+
+       if (rc == 1) {
+               /* Insert remote entry */
+               if (strcmp(name, dotdot) == 0 && strlen(name) == 2) {
+                       struct osd_mdobj_map    *omm = osd->od_mdt_map;
+                       struct osd_thandle      *oh;
+
+                       /* If parent on remote MDT, we need put this object
+                        * under AGENT */
+                       oh = container_of(th, typeof(*oh), ot_super);
+                       rc = osd_create_agent_inode(env, osd, obj, oh);
+                       if (rc != 0) {
+                               CERROR("%s: add agent "DFID" error: rc = %d\n",
+                                      osd_name(osd),
+                                      PFID(lu_object_fid(&dt->do_lu)), rc);
+                               RETURN(rc);
+                       }
+
+                       child_inode = igrab(omm->omm_agent_dentry->d_inode);
+               } else {
+                       child_inode = osd_create_remote_inode(env, osd, obj,
+                                                             fid, th);
+                       if (IS_ERR(child_inode))
+                               RETURN(PTR_ERR(child_inode));
+               }
+       } else {
+               /* Insert local entry */
+               child = osd_object_find(env, dt, fid);
+               if (IS_ERR(child)) {
+                       CERROR("%s: Can not find object "DFID"%u:%u: rc = %d\n",
+                              osd_name(osd), PFID(fid),
+                              id->oii_ino, id->oii_gen,
+                              (int)PTR_ERR(child_inode));
+                       RETURN(PTR_ERR(child_inode));
+               }
+               child_inode = igrab(child->oo_inode);
+       }
+
+       rc = osd_ea_add_rec(env, obj, child_inode, name, rec, th);
+
+       iput(child_inode);
+       if (child != NULL)
+               osd_object_put(env, child);
+       LASSERT(osd_invariant(obj));
+       RETURN(rc);
 }
 
 /**
@@ -3763,7 +3964,6 @@ static int osd_it_iam_get(const struct lu_env *env,
  *
  *  \param  di      osd iterator
  */
-
 static void osd_it_iam_put(const struct lu_env *env, struct dt_it *di)
 {
         struct osd_it_iam *it = (struct osd_it_iam *)di;
@@ -4249,6 +4449,7 @@ static inline int osd_it_ea_rec(const struct lu_env *env,
        struct osd_scrub       *scrub = &dev->od_scrub;
        struct scrub_file      *sf    = &scrub->os_file;
        struct osd_thread_info *oti   = osd_oti_get(env);
+       struct osd_inode_id    *id    = &oti->oti_id;
        struct osd_idmap_cache *oic   = &oti->oti_cache;
        struct lu_fid          *fid   = &it->oie_dirent->oied_fid;
        struct lu_dirent       *lde   = (struct lu_dirent *)dtrec;
@@ -4257,19 +4458,24 @@ static inline int osd_it_ea_rec(const struct lu_env *env,
        ENTRY;
 
        if (!fid_is_sane(fid)) {
-               rc = osd_ea_fid_get(env, obj, ino, fid, &oic->oic_lid);
+               rc = osd_ea_fid_get(env, obj, ino, fid, id);
                if (rc != 0) {
                        fid_zero(&oic->oic_fid);
                        RETURN(rc);
                }
        } else {
-               osd_id_gen(&oic->oic_lid, ino, OSD_OII_NOGEN);
+               osd_id_gen(id, ino, OSD_OII_NOGEN);
        }
 
        osd_it_pack_dirent(lde, fid, it->oie_dirent->oied_off,
                           it->oie_dirent->oied_name,
                           it->oie_dirent->oied_namelen,
                           it->oie_dirent->oied_type, attr);
+
+       if (osd_remote_fid(env, dev, fid))
+               RETURN(0);
+
+       oic->oic_lid = *id;
        oic->oic_fid = *fid;
        if ((scrub->os_pos_current <= ino) &&
            (sf->sf_flags & SF_INCONSISTENT ||