Whamcloud - gitweb
LU-2886 obdclass: remove obsoleted md_local_file.c
[fs/lustre-release.git] / lustre / osd-ldiskfs / osd_handler.c
index 688a968..95538e5 100644 (file)
@@ -27,7 +27,7 @@
  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, 2012, Intel Corporation.
+ * Copyright (c) 2011, 2013, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -463,13 +463,25 @@ static int osd_check_lma(const struct lu_env *env, struct osd_object *obj)
                lustre_lma_swab(lma);
                if (unlikely((lma->lma_incompat & ~LMA_INCOMPAT_SUPP) ||
                             CFS_FAIL_CHECK(OBD_FAIL_OSD_LMA_INCOMPAT))) {
+                       rc = -EOPNOTSUPP;
                        CWARN("%s: unsupported incompat LMA feature(s) %#x for "
-                             "fid = "DFID", ino = %lu\n",
+                             "fid = "DFID", ino = %lu: rc = %d\n",
                              osd_obj2dev(obj)->od_svname,
                              lma->lma_incompat & ~LMA_INCOMPAT_SUPP,
                              PFID(lu_object_fid(&obj->oo_dt.do_lu)),
-                             obj->oo_inode->i_ino);
-                       rc = -EOPNOTSUPP;
+                             obj->oo_inode->i_ino, rc);
+               }
+               if (unlikely(!lu_fid_eq(lu_object_fid(&obj->oo_dt.do_lu),
+                                       &lma->lma_self_fid))) {
+                       CDEBUG(D_INODE, "%s: FID "DFID" != self_fid "DFID"\n",
+                              osd_obj2dev(obj)->od_svname,
+                              PFID(lu_object_fid(&obj->oo_dt.do_lu)),
+                              PFID(&lma->lma_self_fid));
+                       if (obj->oo_inode != NULL) {
+                               iput(obj->oo_inode);
+                               obj->oo_inode = NULL;
+                       }
+                       rc = -ESTALE;
                }
        } else if (rc == -ENODATA) {
                /* haven't initialize LMA xattr */
@@ -500,8 +512,11 @@ static int osd_object_init(const struct lu_env *env, struct lu_object *l,
        result = osd_fid_lookup(env, obj, lu_object_fid(l), conf);
        obj->oo_dt.do_body_ops = &osd_body_ops_new;
        if (result == 0 && obj->oo_inode != NULL) {
-               osd_object_init0(obj);
                result = osd_check_lma(env, obj);
+               if (result != 0)
+                       return result;
+
+               osd_object_init0(obj);
        }
 
        LINVRNT(osd_invariant(obj));
@@ -2476,8 +2491,10 @@ static int osd_declare_object_ref_add(const struct lu_env *env,
 static int osd_object_ref_add(const struct lu_env *env,
                               struct dt_object *dt, struct thandle *th)
 {
-        struct osd_object *obj = osd_dt_obj(dt);
-        struct inode      *inode = obj->oo_inode;
+       struct osd_object *obj = osd_dt_obj(dt);
+       struct inode      *inode = obj->oo_inode;
+       bool               need_dirty = false;
+       int                rc = 0;
 
         LINVRNT(osd_invariant(obj));
        LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
@@ -2486,33 +2503,44 @@ static int osd_object_ref_add(const struct lu_env *env,
 
        osd_trans_exec_op(env, th, OSD_OT_REF_ADD);
 
-       /*
-        * DIR_NLINK feature is set for compatibility reasons if:
-        * 1) nlinks > LDISKFS_LINK_MAX, or
-        * 2) nlinks == 2, since this indicates i_nlink was previously 1.
+       /* This based on ldiskfs_inc_count(), which is not exported.
+        *
+        * The DIR_NLINK feature allows directories to exceed LDISKFS_LINK_MAX
+        * (65000) subdirectories by storing "1" in i_nlink if the link count
+        * would otherwise overflow. Directory tranversal tools understand
+        * that (st_nlink == 1) indicates that the filesystem dose not track
+        * hard links count on the directory, and will not abort subdirectory
+        * scanning early once (st_nlink - 2) subdirs have been found.
         *
-        * It is easier to always set this flag (rather than check and set),
-        * since it has less overhead, and the superblock will be dirtied
-        * at some point. Both e2fsprogs and any Lustre-supported ldiskfs
-        * do not actually care whether this flag is set or not.
+        * This also has to properly handle the case of inodes with nlink == 0
+        * in case they are being linked into the PENDING directory
         */
        spin_lock(&obj->oo_guard);
-       /* inc_nlink from 0 may cause WARN_ON */
-       if(inode->i_nlink == 0)
+       if (unlikely(!S_ISDIR(inode->i_mode) &&
+                    inode->i_nlink >= LDISKFS_LINK_MAX)) {
+               /* MDD should have checked this, but good to be safe */
+               rc = -EMLINK;
+       } else if (unlikely(inode->i_nlink == 0 ||
+                           (S_ISDIR(inode->i_mode) &&
+                            inode->i_nlink >= LDISKFS_LINK_MAX))) {
+               /* inc_nlink from 0 may cause WARN_ON */
                set_nlink(inode, 1);
-       else
+               need_dirty = true;
+       } else if (!S_ISDIR(inode->i_mode) ||
+                  (S_ISDIR(inode->i_mode) && inode->i_nlink >= 2)) {
                inc_nlink(inode);
-       if (S_ISDIR(inode->i_mode) && inode->i_nlink > 1) {
-               if (inode->i_nlink >= LDISKFS_LINK_MAX ||
-                   inode->i_nlink == 2)
-                       set_nlink(inode, 1);
-       }
+               need_dirty = true;
+       } /* else (S_ISDIR(inode->i_mode) && inode->i_nlink == 1) { ; } */
+
        LASSERT(inode->i_nlink <= LDISKFS_LINK_MAX);
        spin_unlock(&obj->oo_guard);
-       ll_dirty_inode(inode, I_DIRTY_DATASYNC);
+
+       if (need_dirty)
+               ll_dirty_inode(inode, I_DIRTY_DATASYNC);
+
        LINVRNT(osd_invariant(obj));
 
-       return 0;
+       return rc;
 }
 
 static int osd_declare_object_ref_del(const struct lu_env *env,
@@ -2551,15 +2579,24 @@ static int osd_object_ref_del(const struct lu_env *env, struct dt_object *dt,
 
        spin_lock(&obj->oo_guard);
        LASSERT(inode->i_nlink > 0);
-       drop_nlink(inode);
-       /* If this is/was a many-subdir directory (nlink > LDISKFS_LINK_MAX)
-        * then the nlink count is 1. Don't let it be set to 0 or the directory
-        * inode will be deleted incorrectly. */
-       if (S_ISDIR(inode->i_mode) && inode->i_nlink == 0)
-               set_nlink(inode, 1);
-       spin_unlock(&obj->oo_guard);
-       ll_dirty_inode(inode, I_DIRTY_DATASYNC);
-       LINVRNT(osd_invariant(obj));
+
+       /* This based on ldiskfs_dec_count(), which is not exported.
+        *
+        * If a directory already has nlink == 1, then do not drop the nlink
+        * count to 0, even temporarily, to avoid race conditions with other
+        * threads not holding oo_guard seeing i_nlink == 0 in rare cases.
+        *
+        * nlink == 1 means the directory has/had > EXT4_LINK_MAX subdirs.
+        * */
+       if (!S_ISDIR(inode->i_mode) || inode->i_nlink > 1) {
+               drop_nlink(inode);
+
+               spin_unlock(&obj->oo_guard);
+               ll_dirty_inode(inode, I_DIRTY_DATASYNC);
+               LINVRNT(osd_invariant(obj));
+       } else {
+               spin_unlock(&obj->oo_guard);
+       }
 
        return 0;
 }
@@ -3751,6 +3788,19 @@ static int osd_fail_fid_lookup(struct osd_thread_info *oti,
        return rc;
 }
 
+static int osd_add_oi_cache(struct osd_thread_info *info,
+                           struct osd_device *osd,
+                           struct osd_inode_id *id,
+                           struct lu_fid *fid)
+{
+       CDEBUG(D_INODE, "add "DFID" %u:%u to info %p\n", PFID(fid),
+              id->oii_ino, id->oii_gen, info);
+       info->oti_cache.oic_lid = *id;
+       info->oti_cache.oic_fid = *fid;
+
+       return 0;
+}
+
 /**
  * Calls ->lookup() to find dentry. From dentry get inode and
  * read inode's ea to get fid. This is required for  interoperability
@@ -3814,8 +3864,10 @@ static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj,
                        GOTO(out, rc);
                }
 
-               oic->oic_lid = *id;
-               oic->oic_fid = *fid;
+               rc = osd_add_oi_cache(osd_oti_get(env), osd_obj2dev(obj), id,
+                                     fid);
+               if (rc != 0)
+                       GOTO(out, rc);
                if ((scrub->os_pos_current <= ino) &&
                    ((sf->sf_flags & SF_INCONSISTENT) ||
                     (sf->sf_flags & SF_UPGRADE && fid_is_igif(fid)) ||
@@ -5016,10 +5068,8 @@ pack:
        if (osd_remote_fid(env, dev, fid))
                RETURN(0);
 
-       if (likely(!(attr & LUDA_IGNORE))) {
-               oic->oic_lid = *id;
-               oic->oic_fid = *fid;
-       }
+       if (likely(!(attr & LUDA_IGNORE)))
+               rc = osd_add_oi_cache(oti, dev, id, fid);
 
        if (!(attr & LUDA_VERIFY) &&
            (scrub->os_pos_current <= ino) &&
@@ -5202,20 +5252,33 @@ static int osd_shutdown(const struct lu_env *env, struct osd_device *o)
 {
        ENTRY;
 
-       osd_scrub_cleanup(env, o);
+       /* shutdown quota slave instance associated with the device */
+       if (o->od_quota_slave != NULL) {
+               qsd_fini(env, o->od_quota_slave);
+               o->od_quota_slave = NULL;
+       }
+
+       RETURN(0);
+}
+
+static void osd_umount(const struct lu_env *env, struct osd_device *o)
+{
+       ENTRY;
 
        if (o->od_fsops) {
                fsfilt_put_ops(o->od_fsops);
                o->od_fsops = NULL;
        }
 
-       /* shutdown quota slave instance associated with the device */
-       if (o->od_quota_slave != NULL) {
-               qsd_fini(env, o->od_quota_slave);
-               o->od_quota_slave = NULL;
+       if (o->od_mnt != NULL) {
+               shrink_dcache_sb(osd_sb(o));
+               osd_sync(env, &o->od_dt_dev);
+
+               mntput(o->od_mnt);
+               o->od_mnt = NULL;
        }
 
-       RETURN(0);
+       EXIT;
 }
 
 static int osd_mount(const struct lu_env *env,
@@ -5317,30 +5380,18 @@ out:
 }
 
 static struct lu_device *osd_device_fini(const struct lu_env *env,
-                                         struct lu_device *d)
+                                        struct lu_device *d)
 {
-        int rc;
-        ENTRY;
-
-       rc = osd_shutdown(env, osd_dev(d));
-
-       osd_obj_map_fini(osd_dev(d));
-
-        shrink_dcache_sb(osd_sb(osd_dev(d)));
-        osd_sync(env, lu2dt_dev(d));
-
-        rc = osd_procfs_fini(osd_dev(d));
-        if (rc) {
-                CERROR("proc fini error %d \n", rc);
-                RETURN (ERR_PTR(rc));
-        }
+       struct osd_device *o = osd_dev(d);
+       ENTRY;
 
-       if (osd_dev(d)->od_mnt) {
-               mntput(osd_dev(d)->od_mnt);
-               osd_dev(d)->od_mnt = NULL;
-       }
+       osd_procfs_fini(o);
+       osd_shutdown(env, o);
+       osd_scrub_cleanup(env, o);
+       osd_obj_map_fini(o);
+       osd_umount(env, o);
 
-        RETURN(NULL);
+       RETURN(NULL);
 }
 
 static int osd_device_init0(const struct lu_env *env,
@@ -5378,12 +5429,6 @@ static int osd_device_init0(const struct lu_env *env,
        if (rc)
                GOTO(out_capa, rc);
 
-       CFS_INIT_LIST_HEAD(&o->od_ios_list);
-       /* setup scrub, including OI files initialization */
-       rc = osd_scrub_setup(env, o);
-       if (rc < 0)
-               GOTO(out_mnt, rc);
-
        cplen = strlcpy(o->od_svname, lustre_cfg_string(cfg, 4),
                        sizeof(o->od_svname));
        if (cplen >= sizeof(o->od_svname)) {
@@ -5393,22 +5438,28 @@ static int osd_device_init0(const struct lu_env *env,
 
        rc = osd_obj_map_init(env, o);
        if (rc != 0)
-               GOTO(out_scrub, rc);
+               GOTO(out_mnt, rc);
 
        rc = lu_site_init(&o->od_site, l);
-       if (rc)
+       if (rc != 0)
                GOTO(out_compat, rc);
        o->od_site.ls_bottom_dev = l;
 
        rc = lu_site_init_finish(&o->od_site);
-       if (rc)
+       if (rc != 0)
+               GOTO(out_site, rc);
+
+       CFS_INIT_LIST_HEAD(&o->od_ios_list);
+       /* setup scrub, including OI files initialization */
+       rc = osd_scrub_setup(env, o);
+       if (rc < 0)
                GOTO(out_site, rc);
 
        rc = osd_procfs_init(o, o->od_svname);
        if (rc != 0) {
                CERROR("%s: can't initialize procfs: rc = %d\n",
                       o->od_svname, rc);
-               GOTO(out_site, rc);
+               GOTO(out_scrub, rc);
        }
 
        LASSERT(l->ld_site->ls_linkage.next && l->ld_site->ls_linkage.prev);
@@ -5423,23 +5474,21 @@ static int osd_device_init0(const struct lu_env *env,
        }
 
        RETURN(0);
+
 out_procfs:
        osd_procfs_fini(o);
+out_scrub:
+       osd_scrub_cleanup(env, o);
 out_site:
        lu_site_fini(&o->od_site);
 out_compat:
        osd_obj_map_fini(o);
-out_scrub:
-       osd_scrub_cleanup(env, o);
 out_mnt:
-       osd_oi_fini(info, o);
-       osd_shutdown(env, o);
-       mntput(o->od_mnt);
-       o->od_mnt = NULL;
+       osd_umount(env, o);
 out_capa:
        cleanup_capa_hash(o->od_capa_hash);
 out:
-       RETURN(rc);
+       return rc;
 }
 
 static struct lu_device *osd_device_alloc(const struct lu_env *env,
@@ -5586,15 +5635,6 @@ static int osd_prepare(const struct lu_env *env, struct lu_device *pdev,
        int                result = 0;
        ENTRY;
 
-       if (dev->ld_site && lu_device_is_md(dev->ld_site->ls_top_dev)) {
-               /* MDT/MDD still use old infrastructure to create
-                * special files */
-               result = llo_local_objects_setup(env, lu2md_dev(pdev),
-                                                lu2dt_dev(dev));
-               if (result)
-                       RETURN(result);
-       }
-
        if (osd->od_quota_slave != NULL)
                /* set up quota slave objects */
                result = qsd_prepare(env, osd->od_quota_slave);