Whamcloud - gitweb
LU-3409 llite: silence lockdep warning in ll_md_blocking_ast
[fs/lustre-release.git] / lustre / osd-ldiskfs / osd_compat.c
index 09045b7..4794d78 100644 (file)
@@ -27,7 +27,7 @@
  * Copyright (c) 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2012, Intel Corporation.
+ * Copyright (c) 2012, 2013, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
 #include <linux/types.h>
 /* prerequisite for linux/xattr.h */
 #include <linux/fs.h>
+/* XATTR_{REPLACE,CREATE} */
+#include <linux/xattr.h>
 
 /*
  * struct OBD_{ALLOC,FREE}*()
  * OBD_FAIL_CHECK
  */
 #include <obd_support.h>
-#include <lvfs.h>
 
 #include "osd_internal.h"
 #include "osd_oi.h"
@@ -76,6 +77,54 @@ static void osd_pop_ctxt(const struct osd_device *dev,
        pop_ctxt(save, new, NULL);
 }
 
+/* utility to make a directory */
+static struct dentry *simple_mkdir(struct dentry *dir, struct vfsmount *mnt,
+                                  const char *name, int mode, int fix)
+{
+       struct dentry *dchild;
+       int err = 0;
+       ENTRY;
+
+       // ASSERT_KERNEL_CTXT("kernel doing mkdir outside kernel context\n");
+       CDEBUG(D_INODE, "creating directory %.*s\n", (int)strlen(name), name);
+       dchild = ll_lookup_one_len(name, dir, strlen(name));
+       if (IS_ERR(dchild))
+               GOTO(out_up, dchild);
+
+       if (dchild->d_inode) {
+               int old_mode = dchild->d_inode->i_mode;
+               if (!S_ISDIR(old_mode)) {
+                       CERROR("found %s (%lu/%u) is mode %o\n", name,
+                              dchild->d_inode->i_ino,
+                              dchild->d_inode->i_generation, old_mode);
+                       GOTO(out_err, err = -ENOTDIR);
+               }
+
+               /* Fixup directory permissions if necessary */
+               if (fix && (old_mode & S_IALLUGO) != (mode & S_IALLUGO)) {
+                       CDEBUG(D_CONFIG,
+                              "fixing permissions on %s from %o to %o\n",
+                              name, old_mode, mode);
+                       dchild->d_inode->i_mode = (mode & S_IALLUGO) |
+                                                 (old_mode & ~S_IALLUGO);
+                       mark_inode_dirty(dchild->d_inode);
+               }
+               GOTO(out_up, dchild);
+       }
+
+       err = ll_vfs_mkdir(dir->d_inode, dchild, mnt, mode);
+       if (err)
+               GOTO(out_err, err);
+
+       RETURN(dchild);
+
+out_err:
+       dput(dchild);
+       dchild = ERR_PTR(err);
+out_up:
+       return dchild;
+}
+
 int osd_last_rcvd_subdir_count(struct osd_device *osd)
 {
         struct lr_server_data lsd;
@@ -114,23 +163,17 @@ out:
        return count;
 }
 
-/*
- * directory structure on legacy MDT:
- *
- * REM_OBJ_DIR/ per mdt
- * AGENT_OBJ_DIR/ per mdt
- *
- */
-static const char remote_obj_dir[] = "REM_OBJ_DIR";
-static const char agent_obj_dir[] = "AGENT_OBJ_DIR";
-int osd_mdt_init(struct osd_device *dev)
+static const char remote_parent_dir[] = "REMOTE_PARENT_DIR";
+static int osd_mdt_init(const struct lu_env *env, struct osd_device *dev)
 {
-       struct lvfs_run_ctxt  new;
-       struct lvfs_run_ctxt  save;
-       struct dentry   *parent;
-       struct osd_mdobj_map *omm;
-       struct dentry   *d;
-       int                rc = 0;
+       struct lvfs_run_ctxt    new;
+       struct lvfs_run_ctxt    save;
+       struct dentry           *parent;
+       struct osd_mdobj_map    *omm;
+       struct dentry           *d;
+       struct osd_thread_info  *info = osd_oti_get(env);
+       struct lu_fid           *fid = &info->oti_fid;
+       int                     rc = 0;
        ENTRY;
 
        OBD_ALLOC_PTR(dev->od_mdt_map);
@@ -144,18 +187,24 @@ int osd_mdt_init(struct osd_device *dev)
        parent = osd_sb(dev)->s_root;
        osd_push_ctxt(dev, &new, &save);
 
-       d = simple_mkdir(parent, dev->od_mnt, agent_obj_dir,
+       d = simple_mkdir(parent, dev->od_mnt, remote_parent_dir,
                         0755, 1);
        if (IS_ERR(d))
                GOTO(cleanup, rc = PTR_ERR(d));
 
-       omm->omm_agent_dentry = d;
+       ldiskfs_set_inode_state(d->d_inode, LDISKFS_STATE_LUSTRE_NO_OI);
+       omm->omm_remote_parent = d;
 
+       /* Set LMA for remote parent inode */
+       lu_local_obj_fid(fid, REMOTE_PARENT_DIR_OID);
+       rc = osd_ea_fid_set(info, d->d_inode, fid, 0);
+       if (rc != 0)
+               GOTO(cleanup, rc);
 cleanup:
        pop_ctxt(&save, &new, NULL);
        if (rc) {
-               if (omm->omm_agent_dentry != NULL)
-                       dput(omm->omm_agent_dentry);
+               if (omm->omm_remote_parent != NULL)
+                       dput(omm->omm_remote_parent);
                OBD_FREE_PTR(omm);
                dev->od_mdt_map = NULL;
        }
@@ -169,65 +218,104 @@ static void osd_mdt_fini(struct osd_device *osd)
        if (omm == NULL)
                return;
 
-       if (omm->omm_agent_dentry)
-               dput(omm->omm_agent_dentry);
+       if (omm->omm_remote_parent)
+               dput(omm->omm_remote_parent);
 
        OBD_FREE_PTR(omm);
        osd->od_ost_map = NULL;
 }
 
-int osd_create_agent_inode(const struct lu_env *env, struct osd_device *osd,
-                          struct osd_object *obj, struct osd_thandle *oh)
+int osd_add_to_remote_parent(const struct lu_env *env, struct osd_device *osd,
+                            struct osd_object *obj, struct osd_thandle *oh)
 {
        struct osd_mdobj_map    *omm = osd->od_mdt_map;
        struct osd_thread_info  *oti = osd_oti_get(env);
-       char                    *name_buf = oti->oti_name;
-       struct dentry           *agent;
+       struct lustre_mdt_attrs *lma = &oti->oti_mdt_attrs;
+       char                    *name = oti->oti_name;
+       struct dentry           *dentry;
        struct dentry           *parent;
        int                     rc;
 
-       parent = omm->omm_agent_dentry;
-       sprintf(name_buf, DFID_NOBRACE, PFID(lu_object_fid(&obj->oo_dt.do_lu)));
-       agent = osd_child_dentry_by_inode(env, parent->d_inode,
-                                         name_buf, strlen(name_buf));
+       /* Set REMOTE_PARENT in lma, so other process like unlink or lfsck
+        * can identify this object quickly */
+       rc = osd_get_lma(oti, obj->oo_inode, &oti->oti_obj_dentry, lma);
+       if (rc != 0)
+               RETURN(rc);
+
+       lma->lma_incompat |= LMAI_REMOTE_PARENT;
+       lustre_lma_swab(lma);
+       rc = __osd_xattr_set(oti, obj->oo_inode, XATTR_NAME_LMA, lma,
+                            sizeof(*lma), XATTR_REPLACE);
+       if (rc != 0)
+               RETURN(rc);
+
+       parent = omm->omm_remote_parent;
+       sprintf(name, DFID_NOBRACE, PFID(lu_object_fid(&obj->oo_dt.do_lu)));
+       dentry = osd_child_dentry_by_inode(env, parent->d_inode,
+                                          name, strlen(name));
        mutex_lock(&parent->d_inode->i_mutex);
-       rc = osd_ldiskfs_add_entry(oh->ot_handle, agent, obj->oo_inode, NULL);
+       rc = osd_ldiskfs_add_entry(oh->ot_handle, dentry, obj->oo_inode,
+                                  NULL);
+       CDEBUG(D_INODE, "%s: add %s:%lu to remote parent %lu.\n", osd_name(osd),
+              name, obj->oo_inode->i_ino, parent->d_inode->i_ino);
+       LASSERTF(parent->d_inode->i_nlink > 1, "%s: %lu nlink %d",
+                osd_name(osd), parent->d_inode->i_ino,
+                parent->d_inode->i_nlink);
        parent->d_inode->i_nlink++;
        mark_inode_dirty(parent->d_inode);
        mutex_unlock(&parent->d_inode->i_mutex);
-       if (rc != 0)
-               CERROR("%s: "DFID" add agent error: rc = %d\n", osd_name(osd),
-                      PFID(lu_object_fid(&obj->oo_dt.do_lu)), rc);
        RETURN(rc);
 }
 
-int osd_delete_agent_inode(const struct lu_env *env, struct osd_device *osd,
-                          struct osd_object *obj, struct osd_thandle *oh)
+int osd_delete_from_remote_parent(const struct lu_env *env,
+                                 struct osd_device *osd,
+                                 struct osd_object *obj,
+                                 struct osd_thandle *oh)
 {
        struct osd_mdobj_map       *omm = osd->od_mdt_map;
        struct osd_thread_info     *oti = osd_oti_get(env);
+       struct lustre_mdt_attrs    *lma = &oti->oti_mdt_attrs;
        char                       *name = oti->oti_name;
-       struct dentry              *agent;
+       struct dentry              *dentry;
        struct dentry              *parent;
        struct ldiskfs_dir_entry_2 *de;
        struct buffer_head         *bh;
        int                        rc;
 
-       parent = omm->omm_agent_dentry;
-       sprintf(name, DFID, PFID(lu_object_fid(&obj->oo_dt.do_lu)));
-       agent = osd_child_dentry_by_inode(env, parent->d_inode,
-                                         name, strlen(name));
+       /* Check lma to see whether it is remote object */
+       rc = osd_get_lma(oti, obj->oo_inode, &oti->oti_obj_dentry, lma);
+       if (rc != 0)
+               RETURN(rc);
+
+       if (likely(!(lma->lma_incompat & LMAI_REMOTE_PARENT)))
+               RETURN(0);
+
+       parent = omm->omm_remote_parent;
+       sprintf(name, DFID_NOBRACE, PFID(lu_object_fid(&obj->oo_dt.do_lu)));
+       dentry = osd_child_dentry_by_inode(env, parent->d_inode,
+                                          name, strlen(name));
        mutex_lock(&parent->d_inode->i_mutex);
-       bh = osd_ldiskfs_find_entry(parent->d_inode, agent, &de, NULL);
+       bh = osd_ldiskfs_find_entry(parent->d_inode, dentry, &de, NULL);
        if (bh == NULL) {
                mutex_unlock(&parent->d_inode->i_mutex);
                RETURN(-ENOENT);
        }
+       CDEBUG(D_INODE, "%s: el %s:%lu to remote parent %lu.\n", osd_name(osd),
+              name, obj->oo_inode->i_ino, parent->d_inode->i_ino);
        rc = ldiskfs_delete_entry(oh->ot_handle, parent->d_inode, de, bh);
+       LASSERTF(parent->d_inode->i_nlink > 1, "%s: %lu nlink %d",
+                osd_name(osd), parent->d_inode->i_ino,
+                parent->d_inode->i_nlink);
        parent->d_inode->i_nlink--;
        mark_inode_dirty(parent->d_inode);
        mutex_unlock(&parent->d_inode->i_mutex);
        brelse(bh);
+
+       /* Get rid of REMOTE_PARENT flag from incompat */
+       lma->lma_incompat &= ~LMAI_REMOTE_PARENT;
+       lustre_lma_swab(lma);
+       rc = __osd_xattr_set(oti, obj->oo_inode, XATTR_NAME_LMA, lma,
+                            sizeof(*lma), XATTR_REPLACE);
        RETURN(rc);
 }
 
@@ -241,7 +329,7 @@ int osd_delete_agent_inode(const struct lu_env *env, struct osd_device *osd,
  * CONFIGS
  *
  */
-int osd_ost_init(struct osd_device *dev)
+static int osd_ost_init(struct osd_device *dev)
 {
        struct lvfs_run_ctxt  new;
        struct lvfs_run_ctxt  save;
@@ -334,7 +422,7 @@ static void osd_ost_fini(struct osd_device *osd)
        EXIT;
 }
 
-int osd_obj_map_init(struct osd_device *dev)
+int osd_obj_map_init(const struct lu_env *env, struct osd_device *dev)
 {
        int rc;
        ENTRY;
@@ -345,7 +433,7 @@ int osd_obj_map_init(struct osd_device *dev)
                RETURN(rc);
 
        /* prepare structures for MDS */
-       rc = osd_mdt_init(dev);
+       rc = osd_mdt_init(env, dev);
 
         RETURN(rc);
 }
@@ -614,16 +702,16 @@ int osd_obj_map_lookup(struct osd_thread_info *info, struct osd_device *dev,
         LASSERT(map);
        LASSERT(map->om_root);
 
-        fid_ostid_pack(fid, ostid);
-       osd_seq = osd_seq_load(dev, ostid->oi_seq);
+        fid_to_ostid(fid, ostid);
+       osd_seq = osd_seq_load(dev, ostid_seq(ostid));
        if (IS_ERR(osd_seq))
                RETURN(PTR_ERR(osd_seq));
 
-       dirn = ostid->oi_id & (osd_seq->oos_subdir_count - 1);
+       dirn = ostid_id(ostid) & (osd_seq->oos_subdir_count - 1);
        d_seq = osd_seq->oos_dirs[dirn];
        LASSERT(d_seq);
 
-       osd_oid_name(name, fid, ostid->oi_id);
+       osd_oid_name(name, fid, ostid_id(ostid));
 
        child = &info->oti_child_dentry;
        child->d_parent = d_seq;
@@ -669,17 +757,17 @@ int osd_obj_map_insert(struct osd_thread_info *info,
         LASSERT(map);
 
        /* map fid to seq:objid */
-        fid_ostid_pack(fid, ostid);
+        fid_to_ostid(fid, ostid);
 
-       osd_seq = osd_seq_load(osd, ostid->oi_seq);
+       osd_seq = osd_seq_load(osd, ostid_seq(ostid));
        if (IS_ERR(osd_seq))
                RETURN(PTR_ERR(osd_seq));
 
-       dirn = ostid->oi_id & (osd_seq->oos_subdir_count - 1);
+       dirn = ostid_id(ostid) & (osd_seq->oos_subdir_count - 1);
        d = osd_seq->oos_dirs[dirn];
         LASSERT(d);
 
-       osd_oid_name(name, fid, ostid->oi_id);
+       osd_oid_name(name, fid, ostid_id(ostid));
        rc = osd_obj_add_entry(info, osd, d, name, id, th);
 
         RETURN(rc);
@@ -700,17 +788,17 @@ int osd_obj_map_delete(struct osd_thread_info *info, struct osd_device *osd,
         LASSERT(map);
 
        /* map fid to seq:objid */
-        fid_ostid_pack(fid, ostid);
+        fid_to_ostid(fid, ostid);
 
-       osd_seq = osd_seq_load(osd, ostid->oi_seq);
+       osd_seq = osd_seq_load(osd, ostid_seq(ostid));
        if (IS_ERR(osd_seq))
                GOTO(cleanup, rc = PTR_ERR(osd_seq));
 
-       dirn = ostid->oi_id & (osd_seq->oos_subdir_count - 1);
+       dirn = ostid_id(ostid) & (osd_seq->oos_subdir_count - 1);
        d = osd_seq->oos_dirs[dirn];
        LASSERT(d);
 
-       osd_oid_name(name, fid, ostid->oi_id);
+       osd_oid_name(name, fid, ostid_id(ostid));
        rc = osd_obj_del_entry(info, osd, d, name, th);
 cleanup:
         RETURN(rc);
@@ -752,10 +840,10 @@ int osd_obj_spec_lookup(struct osd_thread_info *info, struct osd_device *osd,
                        const struct lu_fid *fid, struct osd_inode_id *id)
 {
        struct dentry   *root;
-       struct dentry *dentry;
-       struct inode  *inode;
-       char          *name;
-       int            rc = -ENOENT;
+       struct dentry   *dentry;
+       struct inode    *inode;
+       char            *name;
+       int             rc = -ENOENT;
        ENTRY;
 
        if (fid_is_last_id(fid)) {