Whamcloud - gitweb
LU-8068 osd-zfs: large dnode support
[fs/lustre-release.git] / lustre / osd-zfs / osd_object.c
index 39b31ba..b534906 100644 (file)
@@ -27,7 +27,7 @@
  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2012, 2014, Intel Corporation.
+ * Copyright (c) 2012, 2015, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -231,6 +231,28 @@ int __osd_object_attr_get(const struct lu_env *env, struct osd_device *o,
        la->la_flags = attrs_zfs2fs(osa->flags);
        la->la_size = osa->size;
 
+       /* Try to get extra flags from LMA. Right now, only the LMAI_ORPHAN
+        * flag is stored in LMA, and it is only set for orphan directories */
+       if (S_ISDIR(la->la_mode) && dt_object_exists(&obj->oo_dt)) {
+               struct osd_thread_info *info = osd_oti_get(env);
+               struct lustre_mdt_attrs *lma;
+               struct lu_buf buf;
+
+               lma = (struct lustre_mdt_attrs *)info->oti_buf;
+               buf.lb_buf = lma;
+               buf.lb_len = sizeof(info->oti_buf);
+               rc = osd_xattr_get(env, &obj->oo_dt, &buf, XATTR_NAME_LMA);
+               if (rc > 0) {
+                       rc = 0;
+                       lma->lma_incompat = le32_to_cpu(lma->lma_incompat);
+                       obj->oo_lma_flags =
+                               lma_to_lustre_flags(lma->lma_incompat);
+
+               } else if (rc == -ENODATA) {
+                       rc = 0;
+               }
+       }
+
        if (S_ISCHR(la->la_mode) || S_ISBLK(la->la_mode)) {
                rc = -sa_lookup(sa_hdl, SA_ZPL_RDEV(o), &osa->rdev, 8);
                if (rc)
@@ -293,7 +315,7 @@ struct lu_object *osd_object_alloc(const struct lu_env *env,
                INIT_LIST_HEAD(&mo->oo_sa_linkage);
                INIT_LIST_HEAD(&mo->oo_unlinked_linkage);
                init_rwsem(&mo->oo_sem);
-               sema_init(&mo->oo_guard, 1);
+               init_rwsem(&mo->oo_guard);
                rwlock_init(&mo->oo_attr_lock);
                mo->oo_destroy = OSD_DESTROY_NONE;
                return l;
@@ -395,10 +417,18 @@ static int osd_object_init(const struct lu_env *env, struct lu_object *l,
                RETURN(0);
        }
 
+       if (conf != NULL && conf->loc_flags & LOC_F_NEW)
+               GOTO(out, rc = 0);
+
        rc = osd_fid_lookup(env, osd, lu_object_fid(l), &oid);
        if (rc == 0) {
                LASSERT(obj->oo_db == NULL);
                rc = __osd_obj2dbuf(env, osd->od_os, oid, &obj->oo_db);
+               /* EEXIST will be returned if object is being deleted in ZFS */
+               if (rc == -EEXIST) {
+                       rc = 0;
+                       GOTO(out, rc);
+               }
                if (rc != 0) {
                        CERROR("%s: lookup "DFID"/"LPX64" failed: rc = %d\n",
                               osd->od_svname, PFID(lu_object_fid(l)), oid, rc);
@@ -439,25 +469,24 @@ osd_object_unlinked_add(struct osd_object *obj, struct osd_thandle *oh)
 {
        int rc = -EBUSY;
 
-       down(&obj->oo_guard);
-
        LASSERT(obj->oo_destroy == OSD_DESTROY_ASYNC);
 
+       /* the object is supposed to be exclusively locked by
+        * the caller (osd_object_destroy()), while the transaction
+        * (oh) is per-thread and not shared */
        if (likely(list_empty(&obj->oo_unlinked_linkage))) {
                list_add(&obj->oo_unlinked_linkage, &oh->ot_unlinked_list);
                rc = 0;
        }
 
-       up(&obj->oo_guard);
-
        return rc;
 }
 
 /* Default to max data size covered by a level-1 indirect block */
 static unsigned long osd_sync_destroy_max_size =
        1UL << (DN_MAX_INDBLKSHIFT - SPA_BLKPTRSHIFT + SPA_MAXBLOCKSHIFT);
-CFS_MODULE_PARM(osd_sync_destroy_max_size, "ul", ulong, 0444,
-               "Maximum object size to use synchronous destroy.");
+module_param(osd_sync_destroy_max_size, ulong, 0444);
+MODULE_PARM_DESC(osd_sync_destroy_max_size, "Maximum object size to use synchronous destroy.");
 
 static inline void
 osd_object_set_destroy_type(struct osd_object *obj)
@@ -466,14 +495,14 @@ osd_object_set_destroy_type(struct osd_object *obj)
         * Lock-less OST_WRITE can race with OST_DESTROY, so set destroy type
         * only once and use it consistently thereafter.
         */
-       down(&obj->oo_guard);
+       down_write(&obj->oo_guard);
        if (obj->oo_destroy == OSD_DESTROY_NONE) {
                if (obj->oo_attr.la_size <= osd_sync_destroy_max_size)
                        obj->oo_destroy = OSD_DESTROY_SYNC;
                else /* Larger objects are destroyed asynchronously */
                        obj->oo_destroy = OSD_DESTROY_ASYNC;
        }
-       up(&obj->oo_guard);
+       up_write(&obj->oo_guard);
 }
 
 static int osd_declare_object_destroy(const struct lu_env *env,
@@ -542,9 +571,12 @@ static int osd_object_destroy(const struct lu_env *env,
        uint64_t                 oid, zapid;
        ENTRY;
 
+       down_write(&obj->oo_guard);
+
+       if (unlikely(!dt_object_exists(dt) || obj->oo_destroyed))
+               GOTO(out, rc = -ENOENT);
+
        LASSERT(obj->oo_db != NULL);
-       LASSERT(dt_object_exists(dt));
-       LASSERT(!lu_object_is_dying(dt->do_lu.lo_header));
 
        oh = container_of0(th, struct osd_thandle, ot_super);
        LASSERT(oh != NULL);
@@ -583,7 +615,17 @@ static int osd_object_destroy(const struct lu_env *env,
                       obj->oo_attr.la_gid, rc);
 
        oid = obj->oo_db->db_object;
-       if (obj->oo_destroy == OSD_DESTROY_SYNC) {
+       if (unlikely(obj->oo_destroy == OSD_DESTROY_NONE)) {
+               /* this may happen if the destroy wasn't declared
+                * e.g. when the object is created and then destroyed
+                * in the same transaction - we don't need additional
+                * space for destroy specifically */
+               LASSERT(obj->oo_attr.la_size <= osd_sync_destroy_max_size);
+               rc = -dmu_object_free(osd->od_os, oid, oh->ot_tx);
+               if (rc)
+                       CERROR("%s: failed to free %s "LPU64": rc = %d\n",
+                              osd->od_svname, buf, oid, rc);
+       } else if (obj->oo_destroy == OSD_DESTROY_SYNC) {
                rc = -dmu_object_free(osd->od_os, oid, oh->ot_tx);
                if (rc)
                        CERROR("%s: failed to free %s "LPU64": rc = %d\n",
@@ -605,6 +647,7 @@ out:
        set_bit(LU_OBJECT_HEARD_BANSHEE, &dt->do_lu.lo_header->loh_flags);
        if (rc == 0)
                obj->oo_destroyed = 1;
+       up_write(&obj->oo_guard);
        RETURN (0);
 }
 
@@ -650,7 +693,7 @@ static void osd_object_read_lock(const struct lu_env *env,
 
        LASSERT(osd_invariant(obj));
 
-       down_read(&obj->oo_sem);
+       down_read_nested(&obj->oo_sem, role);
 }
 
 static void osd_object_write_lock(const struct lu_env *env,
@@ -660,7 +703,7 @@ static void osd_object_write_lock(const struct lu_env *env,
 
        LASSERT(osd_invariant(obj));
 
-       down_write(&obj->oo_sem);
+       down_write_nested(&obj->oo_sem, role);
 }
 
 static void osd_object_read_unlock(const struct lu_env *env,
@@ -703,13 +746,20 @@ static int osd_attr_get(const struct lu_env *env,
        struct osd_object       *obj = osd_dt_obj(dt);
        uint64_t                 blocks;
        uint32_t                 blksize;
+       int                      rc = 0;
+
+       down_read(&obj->oo_guard);
+
+       if (unlikely(!dt_object_exists(dt) || obj->oo_destroyed))
+               GOTO(out, rc = -ENOENT);
 
-       LASSERT(dt_object_exists(dt));
        LASSERT(osd_invariant(obj));
        LASSERT(obj->oo_db);
 
        read_lock(&obj->oo_attr_lock);
        *attr = obj->oo_attr;
+       if (obj->oo_lma_flags & LUSTRE_ORPHAN_FL)
+               attr->la_flags |= LUSTRE_ORPHAN_FL;
        read_unlock(&obj->oo_attr_lock);
 
        /* with ZFS_DEBUG zrl_add_debug() called by DB_DNODE_ENTER()
@@ -729,7 +779,9 @@ static int osd_attr_get(const struct lu_env *env,
        attr->la_blocks = blocks;
        attr->la_valid |= LA_BLOCKS | LA_BLKSIZE;
 
-       return 0;
+out:
+       up_read(&obj->oo_guard);
+       return rc;
 }
 
 /* Simple wrapper on top of qsd API which implement quota transfer for osd
@@ -746,7 +798,7 @@ static inline int qsd_transfer(const struct lu_env *env,
        if (unlikely(qsd == NULL))
                return 0;
 
-       LASSERT(qtype >= 0 && qtype < MAXQUOTAS);
+       LASSERT(qtype >= 0 && qtype < LL_MAXQUOTAS);
        qi->lqi_type = qtype;
 
        /* inode accounting */
@@ -805,28 +857,31 @@ static int osd_declare_attr_set(const struct lu_env *env,
        struct osd_thandle      *oh;
        uint64_t                 bspace;
        uint32_t                 blksize;
-       int                      rc;
+       int                      rc = 0;
        ENTRY;
 
-       if (!dt_object_exists(dt)) {
-               /* XXX: sanity check that object creation is declared */
-               RETURN(0);
-       }
 
        LASSERT(handle != NULL);
        LASSERT(osd_invariant(obj));
 
        oh = container_of0(handle, struct osd_thandle, ot_super);
 
+       down_read(&obj->oo_guard);
+       if (unlikely(!dt_object_exists(dt) || obj->oo_destroyed))
+               GOTO(out, rc = 0);
+
        LASSERT(obj->oo_sa_hdl != NULL);
        LASSERT(oh->ot_tx != NULL);
        dmu_tx_hold_sa(oh->ot_tx, obj->oo_sa_hdl, 0);
        if (oh->ot_tx->tx_err != 0)
-               RETURN(-oh->ot_tx->tx_err);
+               GOTO(out, rc = -oh->ot_tx->tx_err);
 
        sa_object_size(obj->oo_sa_hdl, &blksize, &bspace);
        bspace = toqb(bspace * blksize);
 
+       __osd_xattr_declare_set(env, obj, sizeof(struct lustre_mdt_attrs),
+                               XATTR_NAME_LMA, oh);
+
        if (attr && attr->la_valid & LA_UID) {
                /* account for user inode tracking ZAP update */
                dmu_tx_hold_bonus(oh->ot_tx, osd->od_iusr_oid);
@@ -839,7 +894,7 @@ static int osd_declare_attr_set(const struct lu_env *env,
                                          obj->oo_attr.la_uid, attr->la_uid,
                                          bspace, &info->oti_qi);
                        if (rc)
-                               RETURN(rc);
+                               GOTO(out, rc);
                }
        }
        if (attr && attr->la_valid & LA_GID) {
@@ -854,11 +909,13 @@ static int osd_declare_attr_set(const struct lu_env *env,
                                          obj->oo_attr.la_gid, attr->la_gid,
                                          bspace, &info->oti_qi);
                        if (rc)
-                               RETURN(rc);
+                               GOTO(out, rc);
                }
        }
 
-       RETURN(0);
+out:
+       up_read(&obj->oo_guard);
+       RETURN(rc);
 }
 
 /*
@@ -871,18 +928,23 @@ static int osd_declare_attr_set(const struct lu_env *env,
 static int osd_attr_set(const struct lu_env *env, struct dt_object *dt,
                        const struct lu_attr *la, struct thandle *handle)
 {
+       struct osd_thread_info  *info = osd_oti_get(env);
        struct osd_object       *obj = osd_dt_obj(dt);
        struct osd_device       *osd = osd_obj2dev(obj);
        struct osd_thandle      *oh;
-       struct osa_attr         *osa = &osd_oti_get(env)->oti_osa;
+       struct osa_attr         *osa = &info->oti_osa;
        sa_bulk_attr_t          *bulk;
        __u64                    valid = la->la_valid;
        int                      cnt;
        int                      rc = 0;
 
        ENTRY;
+
+       down_read(&obj->oo_guard);
+       if (unlikely(!dt_object_exists(dt) || obj->oo_destroyed))
+               GOTO(out, rc = -ENOENT);
+
        LASSERT(handle != NULL);
-       LASSERT(dt_object_exists(dt));
        LASSERT(osd_invariant(obj));
        LASSERT(obj->oo_sa_hdl);
 
@@ -895,12 +957,54 @@ static int osd_attr_set(const struct lu_env *env, struct dt_object *dt,
        if (!S_ISREG(dt->do_lu.lo_header->loh_attr))
                valid &= ~(LA_SIZE | LA_BLOCKS);
 
+       if (valid & LA_CTIME && la->la_ctime == obj->oo_attr.la_ctime)
+               valid &= ~LA_CTIME;
+
+       if (valid & LA_MTIME && la->la_mtime == obj->oo_attr.la_mtime)
+               valid &= ~LA_MTIME;
+
+       if (valid & LA_ATIME && la->la_atime == obj->oo_attr.la_atime)
+               valid &= ~LA_ATIME;
+
        if (valid == 0)
-               RETURN(0);
+               GOTO(out, rc = 0);
+
+       if (valid & LA_FLAGS) {
+               struct lustre_mdt_attrs *lma;
+               struct lu_buf buf;
+
+               if (la->la_flags & LUSTRE_LMA_FL_MASKS) {
+                       CLASSERT(sizeof(info->oti_buf) >= sizeof(*lma));
+                       lma = (struct lustre_mdt_attrs *)&info->oti_buf;
+                       buf.lb_buf = lma;
+                       buf.lb_len = sizeof(info->oti_buf);
+                       rc = osd_xattr_get(env, &obj->oo_dt, &buf,
+                                          XATTR_NAME_LMA);
+                       if (rc > 0) {
+                               lma->lma_incompat =
+                                       le32_to_cpu(lma->lma_incompat);
+                               lma->lma_incompat |=
+                                       lustre_to_lma_flags(la->la_flags);
+                               lma->lma_incompat =
+                                       cpu_to_le32(lma->lma_incompat);
+                               buf.lb_buf = lma;
+                               buf.lb_len = sizeof(*lma);
+                               rc = osd_xattr_set_internal(env, obj, &buf,
+                                                           XATTR_NAME_LMA,
+                                                           LU_XATTR_REPLACE,
+                                                           oh);
+                       }
+                       if (rc < 0) {
+                               CWARN("%s: failed to set LMA flags: rc = %d\n",
+                                      osd->od_svname, rc);
+                               RETURN(rc);
+                       }
+               }
+       }
 
        OBD_ALLOC(bulk, sizeof(sa_bulk_attr_t) * 10);
        if (bulk == NULL)
-               RETURN(-ENOMEM);
+               GOTO(out, rc = -ENOMEM);
 
        /* do both accounting updates outside oo_attr_lock below */
        if ((valid & LA_UID) && (la->la_uid != obj->oo_attr.la_uid)) {
@@ -998,6 +1102,8 @@ static int osd_attr_set(const struct lu_env *env, struct dt_object *dt,
        rc = osd_object_sa_bulk_update(obj, bulk, cnt, oh);
 
        OBD_FREE(bulk, sizeof(sa_bulk_attr_t) * 10);
+out:
+       up_read(&obj->oo_guard);
        RETURN(rc);
 }
 
@@ -1187,9 +1293,8 @@ int __osd_object_create(const struct lu_env *env, struct osd_object *obj,
                     fid_seq_is_local_file(fid_seq(fid))))
                type = DMU_OTN_UINT8_METADATA;
 
-       /* Create a new DMU object. */
-       oid = dmu_object_alloc(osd->od_os, type, 0,
-                              DMU_OT_SA, DN_MAX_BONUSLEN, tx);
+       /* Create a new DMU object using the default dnode size. */
+       oid = osd_dmu_object_alloc(osd->od_os, type, 0, 0, tx);
        rc = -sa_buf_hold(osd->od_os, oid, osd_obj_tag, dbp);
        LASSERTF(rc == 0, "sa_buf_hold "LPU64" failed: %d\n", oid, rc);
 
@@ -1229,11 +1334,11 @@ int __osd_zap_create(const struct lu_env *env, struct osd_device *osd,
           transaction group. */
        LASSERT(tx->tx_txg != 0);
 
-       oid = zap_create_flags(osd->od_os, 0, flags | ZAP_FLAG_HASH64,
-                              DMU_OT_DIRECTORY_CONTENTS,
-                              14, /* == ZFS fzap_default_block_shift */
-                              DN_MAX_INDBLKSHIFT, /* indirect block shift */
-                              DMU_OT_SA, DN_MAX_BONUSLEN, tx);
+       oid = osd_zap_create_flags(osd->od_os, 0, flags | ZAP_FLAG_HASH64,
+                                  DMU_OT_DIRECTORY_CONTENTS,
+                                  14, /* == ZFS fzap_default_blockshift */
+                                  DN_MAX_INDBLKSHIFT, /* indirect blockshift */
+                                  0, tx);
 
        rc = -sa_buf_hold(osd->od_os, oid, osd_obj_tag, zap_dbp);
        if (rc)
@@ -1284,8 +1389,9 @@ static dmu_buf_t *osd_mkreg(const struct lu_env *env, struct osd_object *obj,
                            struct lu_attr *la, uint64_t parent,
                            struct osd_thandle *oh)
 {
-       dmu_buf_t         *db;
-       int                rc;
+       const struct lu_fid *fid = lu_object_fid(&obj->oo_dt.do_lu);
+       dmu_buf_t           *db;
+       int                  rc;
        struct osd_device *osd = osd_obj2dev(obj);
 
        LASSERT(S_ISREG(la->la_mode));
@@ -1293,11 +1399,16 @@ static dmu_buf_t *osd_mkreg(const struct lu_env *env, struct osd_object *obj,
        if (rc)
                return ERR_PTR(rc);
 
-       if (!lu_device_is_md(osd2lu_dev(osd))) {
-               /* uses 4K as default block size because clients write data
-                * with page size that is 4K at minimum */
+       /*
+        * XXX: This heuristic is non-optimal.  It would be better to
+        * increase the blocksize up to osd->od_max_blksz during the write.
+        * This is exactly how the ZPL behaves and it ensures that the right
+        * blocksize is selected based on the file size rather than
+        * making broad assumptions based on the osd type.
+        */
+       if ((fid_is_idif(fid) || fid_is_norm(fid)) && osd->od_is_ost) {
                rc = -dmu_object_set_blocksize(osd->od_os, db->db_object,
-                                              4096, 0, oh->ot_tx);
+                                              osd->od_max_blksz, 0, oh->ot_tx);
                if (unlikely(rc)) {
                        CERROR("%s: can't change blocksize: %d\n",
                               osd->od_svname, rc);
@@ -1418,10 +1529,12 @@ static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
        /* concurrent create declarations should not see
         * the object inconsistent (db, attr, etc).
         * in regular cases acquisition should be cheap */
-       down(&obj->oo_guard);
+       down_write(&obj->oo_guard);
+
+       if (unlikely(dt_object_exists(dt)))
+               GOTO(out, rc = -EEXIST);
 
        LASSERT(osd_invariant(obj));
-       LASSERT(!dt_object_exists(dt));
        LASSERT(dof != NULL);
 
        LASSERT(th != NULL);
@@ -1476,15 +1589,12 @@ static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
        LASSERT(osd_invariant(obj));
 
        rc = osd_init_lma(env, obj, fid, oh);
-       if (rc) {
+       if (rc != 0)
                CERROR("%s: can not set LMA on "DFID": rc = %d\n",
                       osd->od_svname, PFID(fid), rc);
-               /* ignore errors during LMA initialization */
-               rc = 0;
-       }
 
 out:
-       up(&obj->oo_guard);
+       up_write(&obj->oo_guard);
        RETURN(rc);
 }
 
@@ -1510,8 +1620,11 @@ static int osd_object_ref_add(const struct lu_env *env,
 
        ENTRY;
 
+       down_read(&obj->oo_guard);
+       if (unlikely(!dt_object_exists(dt) || obj->oo_destroyed))
+               GOTO(out, rc = -ENOENT);
+
        LASSERT(osd_invariant(obj));
-       LASSERT(dt_object_exists(dt));
        LASSERT(obj->oo_sa_hdl != NULL);
 
        oh = container_of0(handle, struct osd_thandle, ot_super);
@@ -1521,7 +1634,10 @@ static int osd_object_ref_add(const struct lu_env *env,
        write_unlock(&obj->oo_attr_lock);
 
        rc = osd_object_sa_update(obj, SA_ZPL_LINKS(osd), &nlink, 8, oh);
-       return rc;
+
+out:
+       up_read(&obj->oo_guard);
+       RETURN(rc);
 }
 
 static int osd_declare_object_ref_del(const struct lu_env *env,
@@ -1546,8 +1662,12 @@ static int osd_object_ref_del(const struct lu_env *env,
 
        ENTRY;
 
+       down_read(&obj->oo_guard);
+
+       if (unlikely(!dt_object_exists(dt) || obj->oo_destroyed))
+               GOTO(out, rc = -ENOENT);
+
        LASSERT(osd_invariant(obj));
-       LASSERT(dt_object_exists(dt));
        LASSERT(obj->oo_sa_hdl != NULL);
 
        oh = container_of0(handle, struct osd_thandle, ot_super);
@@ -1558,7 +1678,10 @@ static int osd_object_ref_del(const struct lu_env *env,
        write_unlock(&obj->oo_attr_lock);
 
        rc = osd_object_sa_update(obj, SA_ZPL_LINKS(osd), &nlink, 8, oh);
-       return rc;
+
+out:
+       up_read(&obj->oo_guard);
+       RETURN(rc);
 }
 
 static int osd_object_sync(const struct lu_env *env, struct dt_object *dt,