Whamcloud - gitweb
LU-7713 osd: osd-zfs should serialize destroy vs. others
[fs/lustre-release.git] / lustre / osd-zfs / osd_object.c
index 7935fab..b65a282 100644 (file)
@@ -315,7 +315,7 @@ struct lu_object *osd_object_alloc(const struct lu_env *env,
                INIT_LIST_HEAD(&mo->oo_sa_linkage);
                INIT_LIST_HEAD(&mo->oo_unlinked_linkage);
                init_rwsem(&mo->oo_sem);
                INIT_LIST_HEAD(&mo->oo_sa_linkage);
                INIT_LIST_HEAD(&mo->oo_unlinked_linkage);
                init_rwsem(&mo->oo_sem);
-               sema_init(&mo->oo_guard, 1);
+               init_rwsem(&mo->oo_guard);
                rwlock_init(&mo->oo_attr_lock);
                mo->oo_destroy = OSD_DESTROY_NONE;
                return l;
                rwlock_init(&mo->oo_attr_lock);
                mo->oo_destroy = OSD_DESTROY_NONE;
                return l;
@@ -466,17 +466,16 @@ osd_object_unlinked_add(struct osd_object *obj, struct osd_thandle *oh)
 {
        int rc = -EBUSY;
 
 {
        int rc = -EBUSY;
 
-       down(&obj->oo_guard);
-
        LASSERT(obj->oo_destroy == OSD_DESTROY_ASYNC);
 
        LASSERT(obj->oo_destroy == OSD_DESTROY_ASYNC);
 
+       /* the object is supposed to be exclusively locked by
+        * the caller (osd_object_destroy()), while the transaction
+        * (oh) is per-thread and not shared */
        if (likely(list_empty(&obj->oo_unlinked_linkage))) {
                list_add(&obj->oo_unlinked_linkage, &oh->ot_unlinked_list);
                rc = 0;
        }
 
        if (likely(list_empty(&obj->oo_unlinked_linkage))) {
                list_add(&obj->oo_unlinked_linkage, &oh->ot_unlinked_list);
                rc = 0;
        }
 
-       up(&obj->oo_guard);
-
        return rc;
 }
 
        return rc;
 }
 
@@ -493,14 +492,14 @@ osd_object_set_destroy_type(struct osd_object *obj)
         * Lock-less OST_WRITE can race with OST_DESTROY, so set destroy type
         * only once and use it consistently thereafter.
         */
         * Lock-less OST_WRITE can race with OST_DESTROY, so set destroy type
         * only once and use it consistently thereafter.
         */
-       down(&obj->oo_guard);
+       down_write(&obj->oo_guard);
        if (obj->oo_destroy == OSD_DESTROY_NONE) {
                if (obj->oo_attr.la_size <= osd_sync_destroy_max_size)
                        obj->oo_destroy = OSD_DESTROY_SYNC;
                else /* Larger objects are destroyed asynchronously */
                        obj->oo_destroy = OSD_DESTROY_ASYNC;
        }
        if (obj->oo_destroy == OSD_DESTROY_NONE) {
                if (obj->oo_attr.la_size <= osd_sync_destroy_max_size)
                        obj->oo_destroy = OSD_DESTROY_SYNC;
                else /* Larger objects are destroyed asynchronously */
                        obj->oo_destroy = OSD_DESTROY_ASYNC;
        }
-       up(&obj->oo_guard);
+       up_write(&obj->oo_guard);
 }
 
 static int osd_declare_object_destroy(const struct lu_env *env,
 }
 
 static int osd_declare_object_destroy(const struct lu_env *env,
@@ -569,9 +568,12 @@ static int osd_object_destroy(const struct lu_env *env,
        uint64_t                 oid, zapid;
        ENTRY;
 
        uint64_t                 oid, zapid;
        ENTRY;
 
+       down_write(&obj->oo_guard);
+
+       if (unlikely(!dt_object_exists(dt) || obj->oo_destroyed))
+               GOTO(out, rc = -ENOENT);
+
        LASSERT(obj->oo_db != NULL);
        LASSERT(obj->oo_db != NULL);
-       LASSERT(dt_object_exists(dt));
-       LASSERT(!lu_object_is_dying(dt->do_lu.lo_header));
 
        oh = container_of0(th, struct osd_thandle, ot_super);
        LASSERT(oh != NULL);
 
        oh = container_of0(th, struct osd_thandle, ot_super);
        LASSERT(oh != NULL);
@@ -642,6 +644,7 @@ out:
        set_bit(LU_OBJECT_HEARD_BANSHEE, &dt->do_lu.lo_header->loh_flags);
        if (rc == 0)
                obj->oo_destroyed = 1;
        set_bit(LU_OBJECT_HEARD_BANSHEE, &dt->do_lu.lo_header->loh_flags);
        if (rc == 0)
                obj->oo_destroyed = 1;
+       up_write(&obj->oo_guard);
        RETURN (0);
 }
 
        RETURN (0);
 }
 
@@ -740,11 +743,12 @@ static int osd_attr_get(const struct lu_env *env,
        struct osd_object       *obj = osd_dt_obj(dt);
        uint64_t                 blocks;
        uint32_t                 blksize;
        struct osd_object       *obj = osd_dt_obj(dt);
        uint64_t                 blocks;
        uint32_t                 blksize;
+       int                      rc = 0;
 
 
-       if (unlikely(!dt_object_exists(dt)))
-               return -ENOENT;
-       if (unlikely(obj->oo_destroyed))
-               return -ENOENT;
+       down_read(&obj->oo_guard);
+
+       if (unlikely(!dt_object_exists(dt) || obj->oo_destroyed))
+               GOTO(out, rc = -ENOENT);
 
        LASSERT(osd_invariant(obj));
        LASSERT(obj->oo_db);
 
        LASSERT(osd_invariant(obj));
        LASSERT(obj->oo_db);
@@ -772,7 +776,9 @@ static int osd_attr_get(const struct lu_env *env,
        attr->la_blocks = blocks;
        attr->la_valid |= LA_BLOCKS | LA_BLKSIZE;
 
        attr->la_blocks = blocks;
        attr->la_valid |= LA_BLOCKS | LA_BLKSIZE;
 
-       return 0;
+out:
+       up_read(&obj->oo_guard);
+       return rc;
 }
 
 /* Simple wrapper on top of qsd API which implement quota transfer for osd
 }
 
 /* Simple wrapper on top of qsd API which implement quota transfer for osd
@@ -848,24 +854,24 @@ static int osd_declare_attr_set(const struct lu_env *env,
        struct osd_thandle      *oh;
        uint64_t                 bspace;
        uint32_t                 blksize;
        struct osd_thandle      *oh;
        uint64_t                 bspace;
        uint32_t                 blksize;
-       int                      rc;
+       int                      rc = 0;
        ENTRY;
 
        ENTRY;
 
-       if (!dt_object_exists(dt)) {
-               /* XXX: sanity check that object creation is declared */
-               RETURN(0);
-       }
 
        LASSERT(handle != NULL);
        LASSERT(osd_invariant(obj));
 
        oh = container_of0(handle, struct osd_thandle, ot_super);
 
 
        LASSERT(handle != NULL);
        LASSERT(osd_invariant(obj));
 
        oh = container_of0(handle, struct osd_thandle, ot_super);
 
+       down_read(&obj->oo_guard);
+       if (unlikely(!dt_object_exists(dt) || obj->oo_destroyed))
+               GOTO(out, rc = 0);
+
        LASSERT(obj->oo_sa_hdl != NULL);
        LASSERT(oh->ot_tx != NULL);
        dmu_tx_hold_sa(oh->ot_tx, obj->oo_sa_hdl, 0);
        if (oh->ot_tx->tx_err != 0)
        LASSERT(obj->oo_sa_hdl != NULL);
        LASSERT(oh->ot_tx != NULL);
        dmu_tx_hold_sa(oh->ot_tx, obj->oo_sa_hdl, 0);
        if (oh->ot_tx->tx_err != 0)
-               RETURN(-oh->ot_tx->tx_err);
+               GOTO(out, rc = -oh->ot_tx->tx_err);
 
        sa_object_size(obj->oo_sa_hdl, &blksize, &bspace);
        bspace = toqb(bspace * blksize);
 
        sa_object_size(obj->oo_sa_hdl, &blksize, &bspace);
        bspace = toqb(bspace * blksize);
@@ -885,7 +891,7 @@ static int osd_declare_attr_set(const struct lu_env *env,
                                          obj->oo_attr.la_uid, attr->la_uid,
                                          bspace, &info->oti_qi);
                        if (rc)
                                          obj->oo_attr.la_uid, attr->la_uid,
                                          bspace, &info->oti_qi);
                        if (rc)
-                               RETURN(rc);
+                               GOTO(out, rc);
                }
        }
        if (attr && attr->la_valid & LA_GID) {
                }
        }
        if (attr && attr->la_valid & LA_GID) {
@@ -900,11 +906,13 @@ static int osd_declare_attr_set(const struct lu_env *env,
                                          obj->oo_attr.la_gid, attr->la_gid,
                                          bspace, &info->oti_qi);
                        if (rc)
                                          obj->oo_attr.la_gid, attr->la_gid,
                                          bspace, &info->oti_qi);
                        if (rc)
-                               RETURN(rc);
+                               GOTO(out, rc);
                }
        }
 
                }
        }
 
-       RETURN(0);
+out:
+       up_read(&obj->oo_guard);
+       RETURN(rc);
 }
 
 /*
 }
 
 /*
@@ -928,8 +936,12 @@ static int osd_attr_set(const struct lu_env *env, struct dt_object *dt,
        int                      rc = 0;
 
        ENTRY;
        int                      rc = 0;
 
        ENTRY;
+
+       down_read(&obj->oo_guard);
+       if (unlikely(!dt_object_exists(dt) || obj->oo_destroyed))
+               GOTO(out, rc = -ENOENT);
+
        LASSERT(handle != NULL);
        LASSERT(handle != NULL);
-       LASSERT(dt_object_exists(dt));
        LASSERT(osd_invariant(obj));
        LASSERT(obj->oo_sa_hdl);
 
        LASSERT(osd_invariant(obj));
        LASSERT(obj->oo_sa_hdl);
 
@@ -943,7 +955,7 @@ static int osd_attr_set(const struct lu_env *env, struct dt_object *dt,
                valid &= ~(LA_SIZE | LA_BLOCKS);
 
        if (valid == 0)
                valid &= ~(LA_SIZE | LA_BLOCKS);
 
        if (valid == 0)
-               RETURN(0);
+               GOTO(out, rc = 0);
 
        if (valid & LA_FLAGS) {
                struct lustre_mdt_attrs *lma;
 
        if (valid & LA_FLAGS) {
                struct lustre_mdt_attrs *lma;
@@ -980,7 +992,7 @@ static int osd_attr_set(const struct lu_env *env, struct dt_object *dt,
 
        OBD_ALLOC(bulk, sizeof(sa_bulk_attr_t) * 10);
        if (bulk == NULL)
 
        OBD_ALLOC(bulk, sizeof(sa_bulk_attr_t) * 10);
        if (bulk == NULL)
-               RETURN(-ENOMEM);
+               GOTO(out, rc = -ENOMEM);
 
        /* do both accounting updates outside oo_attr_lock below */
        if ((valid & LA_UID) && (la->la_uid != obj->oo_attr.la_uid)) {
 
        /* do both accounting updates outside oo_attr_lock below */
        if ((valid & LA_UID) && (la->la_uid != obj->oo_attr.la_uid)) {
@@ -1078,6 +1090,8 @@ static int osd_attr_set(const struct lu_env *env, struct dt_object *dt,
        rc = osd_object_sa_bulk_update(obj, bulk, cnt, oh);
 
        OBD_FREE(bulk, sizeof(sa_bulk_attr_t) * 10);
        rc = osd_object_sa_bulk_update(obj, bulk, cnt, oh);
 
        OBD_FREE(bulk, sizeof(sa_bulk_attr_t) * 10);
+out:
+       up_read(&obj->oo_guard);
        RETURN(rc);
 }
 
        RETURN(rc);
 }
 
@@ -1503,10 +1517,12 @@ static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
        /* concurrent create declarations should not see
         * the object inconsistent (db, attr, etc).
         * in regular cases acquisition should be cheap */
        /* concurrent create declarations should not see
         * the object inconsistent (db, attr, etc).
         * in regular cases acquisition should be cheap */
-       down(&obj->oo_guard);
+       down_write(&obj->oo_guard);
+
+       if (unlikely(dt_object_exists(dt)))
+               GOTO(out, rc = -EEXIST);
 
        LASSERT(osd_invariant(obj));
 
        LASSERT(osd_invariant(obj));
-       LASSERT(!dt_object_exists(dt));
        LASSERT(dof != NULL);
 
        LASSERT(th != NULL);
        LASSERT(dof != NULL);
 
        LASSERT(th != NULL);
@@ -1569,7 +1585,7 @@ static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
        }
 
 out:
        }
 
 out:
-       up(&obj->oo_guard);
+       up_write(&obj->oo_guard);
        RETURN(rc);
 }
 
        RETURN(rc);
 }
 
@@ -1595,8 +1611,9 @@ static int osd_object_ref_add(const struct lu_env *env,
 
        ENTRY;
 
 
        ENTRY;
 
-       if (!dt_object_exists(dt))
-               RETURN(-ENOENT);
+       down_read(&obj->oo_guard);
+       if (unlikely(!dt_object_exists(dt) || obj->oo_destroyed))
+               GOTO(out, rc = -ENOENT);
 
        LASSERT(osd_invariant(obj));
        LASSERT(obj->oo_sa_hdl != NULL);
 
        LASSERT(osd_invariant(obj));
        LASSERT(obj->oo_sa_hdl != NULL);
@@ -1608,7 +1625,10 @@ static int osd_object_ref_add(const struct lu_env *env,
        write_unlock(&obj->oo_attr_lock);
 
        rc = osd_object_sa_update(obj, SA_ZPL_LINKS(osd), &nlink, 8, oh);
        write_unlock(&obj->oo_attr_lock);
 
        rc = osd_object_sa_update(obj, SA_ZPL_LINKS(osd), &nlink, 8, oh);
-       return rc;
+
+out:
+       up_read(&obj->oo_guard);
+       RETURN(rc);
 }
 
 static int osd_declare_object_ref_del(const struct lu_env *env,
 }
 
 static int osd_declare_object_ref_del(const struct lu_env *env,
@@ -1633,8 +1653,12 @@ static int osd_object_ref_del(const struct lu_env *env,
 
        ENTRY;
 
 
        ENTRY;
 
+       down_read(&obj->oo_guard);
+
+       if (unlikely(!dt_object_exists(dt) || obj->oo_destroyed))
+               GOTO(out, rc = -ENOENT);
+
        LASSERT(osd_invariant(obj));
        LASSERT(osd_invariant(obj));
-       LASSERT(dt_object_exists(dt));
        LASSERT(obj->oo_sa_hdl != NULL);
 
        oh = container_of0(handle, struct osd_thandle, ot_super);
        LASSERT(obj->oo_sa_hdl != NULL);
 
        oh = container_of0(handle, struct osd_thandle, ot_super);
@@ -1645,6 +1669,9 @@ static int osd_object_ref_del(const struct lu_env *env,
        write_unlock(&obj->oo_attr_lock);
 
        rc = osd_object_sa_update(obj, SA_ZPL_LINKS(osd), &nlink, 8, oh);
        write_unlock(&obj->oo_attr_lock);
 
        rc = osd_object_sa_update(obj, SA_ZPL_LINKS(osd), &nlink, 8, oh);
+
+out:
+       up_read(&obj->oo_guard);
        RETURN(rc);
 }
 
        RETURN(rc);
 }