Whamcloud - gitweb
LU-7713 osd: osd-zfs should serialize destroy vs. others
[fs/lustre-release.git] / lustre / osd-zfs / osd_index.c
index 13fae10..705c86d 100644 (file)
 /*
  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
- */
-/*
- * Copyright (c) 2012, 2013, Intel Corporation.
- * Use is subject to license terms.
+ *
+ * Copyright (c) 2012, 2015, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -140,7 +138,8 @@ static inline void osd_obj_cursor_init_serialized(zap_cursor_t *zc,
                                                 uint64_t dirhash)
 {
        struct osd_device *d = osd_obj2dev(o);
-       zap_cursor_init_serialized(zc, d->od_os, o->oo_db->db_object, dirhash);
+       osd_zap_cursor_init_serialized(zc, d->od_os,
+                                      o->oo_db->db_object, dirhash);
 }
 
 static inline int osd_obj_cursor_init(zap_cursor_t **zc, struct osd_object *o,
@@ -152,8 +151,7 @@ static inline int osd_obj_cursor_init(zap_cursor_t **zc, struct osd_object *o,
 
 static struct dt_it *osd_index_it_init(const struct lu_env *env,
                                       struct dt_object *dt,
-                                      __u32 unused,
-                                      struct lustre_capa *capa)
+                                      __u32 unused)
 {
        struct osd_thread_info  *info = osd_oti_get(env);
        struct osd_zap_it       *it;
@@ -162,34 +160,25 @@ static struct dt_it *osd_index_it_init(const struct lu_env *env,
        int                      rc;
        ENTRY;
 
-       /* XXX: check capa ? */
+       if (obj->oo_destroyed)
+               RETURN(ERR_PTR(-ENOENT));
 
        LASSERT(lu_object_exists(lo));
        LASSERT(obj->oo_db);
        LASSERT(osd_object_is_zap(obj->oo_db));
        LASSERT(info);
 
-       if (info->oti_it_inline) {
-               OBD_ALLOC_PTR(it);
-               if (it == NULL)
-                       RETURN(ERR_PTR(-ENOMEM));
-       } else {
-               it = &info->oti_it_zap;
-               info->oti_it_inline = 1;
-       }
+       OBD_SLAB_ALLOC_PTR_GFP(it, osd_zapit_cachep, GFP_NOFS);
+       if (it == NULL)
+               RETURN(ERR_PTR(-ENOMEM));
 
        rc = osd_obj_cursor_init(&it->ozi_zc, obj, 0);
        if (rc != 0) {
-               if (it != &info->oti_it_zap)
-                       OBD_FREE_PTR(it);
-               else
-                       info->oti_it_inline = 0;
-
+               OBD_SLAB_FREE_PTR(it, osd_zapit_cachep);
                RETURN(ERR_PTR(rc));
        }
 
        it->ozi_obj   = obj;
-       it->ozi_capa  = capa;
        it->ozi_reset = 1;
        lu_object_get(lo);
 
@@ -198,7 +187,6 @@ static struct dt_it *osd_index_it_init(const struct lu_env *env,
 
 static void osd_index_it_fini(const struct lu_env *env, struct dt_it *di)
 {
-       struct osd_thread_info  *info   = osd_oti_get(env);
        struct osd_zap_it       *it     = (struct osd_zap_it *)di;
        struct osd_object       *obj;
        ENTRY;
@@ -210,10 +198,7 @@ static void osd_index_it_fini(const struct lu_env *env, struct dt_it *di)
 
        osd_zap_cursor_fini(it->ozi_zc);
        lu_object_put(env, &obj->oo_dt.do_lu);
-       if (it != &info->oti_it_zap)
-               OBD_FREE_PTR(it);
-       else
-               info->oti_it_inline = 0;
+       OBD_SLAB_FREE_PTR(it, osd_zapit_cachep);
 
        EXIT;
 }
@@ -342,10 +327,9 @@ static int osd_find_parent_fid(const struct lu_env *env, struct dt_object *o,
        buf.lb_buf = osd_oti_get(env)->oti_buf;
        buf.lb_len = sizeof(osd_oti_get(env)->oti_buf);
 
-       rc = osd_xattr_get(env, o, &buf, XATTR_NAME_LINK, BYPASS_CAPA);
+       rc = osd_xattr_get(env, o, &buf, XATTR_NAME_LINK);
        if (rc == -ERANGE) {
-               rc = osd_xattr_get(env, o, &LU_BUF_NULL,
-                                  XATTR_NAME_LINK, BYPASS_CAPA);
+               rc = osd_xattr_get(env, o, &LU_BUF_NULL, XATTR_NAME_LINK);
                if (rc < 0)
                        RETURN(rc);
                LASSERT(rc > 0);
@@ -353,7 +337,7 @@ static int osd_find_parent_fid(const struct lu_env *env, struct dt_object *o,
                if (buf.lb_buf == NULL)
                        RETURN(-ENOMEM);
                buf.lb_len = rc;
-               rc = osd_xattr_get(env, o, &buf, XATTR_NAME_LINK, BYPASS_CAPA);
+               rc = osd_xattr_get(env, o, &buf, XATTR_NAME_LINK);
        }
        if (rc < 0)
                GOTO(out, rc);
@@ -401,8 +385,7 @@ out:
 }
 
 static int osd_dir_lookup(const struct lu_env *env, struct dt_object *dt,
-                         struct dt_rec *rec, const struct dt_key *key,
-                         struct lustre_capa *capa)
+                         struct dt_rec *rec, const struct dt_key *key)
 {
        struct osd_thread_info *oti = osd_oti_get(env);
        struct osd_object  *obj = osd_dt_obj(dt);
@@ -539,8 +522,9 @@ static int osd_seq_exists(const struct lu_env *env, struct osd_device *osd,
 
        rc = osd_fld_lookup(env, osd, seq, range);
        if (rc != 0) {
-               CERROR("%s: Can not lookup fld for "LPX64"\n",
-                      osd_name(osd), seq);
+               if (rc != -ENOENT)
+                       CERROR("%s: Can not lookup fld for "LPX64"\n",
+                              osd_name(osd), seq);
                RETURN(0);
        }
 
@@ -577,7 +561,6 @@ static int osd_remote_fid(const struct lu_env *env, struct osd_device *osd,
  *      \param  key     key for index
  *      \param  rec     record reference
  *      \param  th      transaction handler
- *      \param  capa    capability descriptor
  *      \param  ignore_quota update should not affect quota
  *
  *      \retval  0  success
@@ -585,8 +568,7 @@ static int osd_remote_fid(const struct lu_env *env, struct osd_device *osd,
  */
 static int osd_dir_insert(const struct lu_env *env, struct dt_object *dt,
                          const struct dt_rec *rec, const struct dt_key *key,
-                         struct thandle *th, struct lustre_capa *capa,
-                         int ignore_quota)
+                         struct thandle *th, int ignore_quota)
 {
        struct osd_thread_info *oti = osd_oti_get(env);
        struct osd_object   *parent = osd_dt_obj(dt);
@@ -690,8 +672,9 @@ static int osd_declare_dir_delete(const struct lu_env *env,
                                  const struct dt_key *key,
                                  struct thandle *th)
 {
-       struct osd_object *obj = osd_dt_obj(dt);
+       struct osd_object  *obj = osd_dt_obj(dt);
        struct osd_thandle *oh;
+       uint64_t            dnode;
        ENTRY;
 
        LASSERT(dt_object_exists(dt));
@@ -700,17 +683,20 @@ static int osd_declare_dir_delete(const struct lu_env *env,
        LASSERT(th != NULL);
        oh = container_of0(th, struct osd_thandle, ot_super);
 
-       LASSERT(obj->oo_db);
-       LASSERT(osd_object_is_zap(obj->oo_db));
-
-       dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, TRUE, (char *)key);
+       if (dt_object_exists(dt)) {
+               LASSERT(obj->oo_db);
+               LASSERT(osd_object_is_zap(obj->oo_db));
+               dnode = obj->oo_db->db_object;
+       } else {
+               dnode = DMU_NEW_OBJECT;
+       }
+       dmu_tx_hold_zap(oh->ot_tx, dnode, TRUE, (char *)key);
 
        RETURN(0);
 }
 
 static int osd_dir_delete(const struct lu_env *env, struct dt_object *dt,
-                         const struct dt_key *key, struct thandle *th,
-                         struct lustre_capa *capa)
+                         const struct dt_key *key, struct thandle *th)
 {
        struct osd_object *obj = osd_dt_obj(dt);
        struct osd_device *osd = osd_obj2dev(obj);
@@ -720,8 +706,8 @@ static int osd_dir_delete(const struct lu_env *env, struct dt_object *dt,
        int rc;
        ENTRY;
 
-       LASSERT(obj->oo_db);
-       LASSERT(osd_object_is_zap(obj->oo_db));
+       LASSERT(zap_db);
+       LASSERT(osd_object_is_zap(zap_db));
 
        LASSERT(th != NULL);
        oh = container_of0(th, struct osd_thandle, ot_super);
@@ -750,12 +736,11 @@ static int osd_dir_delete(const struct lu_env *env, struct dt_object *dt,
 
 static struct dt_it *osd_dir_it_init(const struct lu_env *env,
                                     struct dt_object *dt,
-                                    __u32 unused,
-                                    struct lustre_capa *capa)
+                                    __u32 unused)
 {
        struct osd_zap_it *it;
 
-       it = (struct osd_zap_it *)osd_index_it_init(env, dt, unused, capa);
+       it = (struct osd_zap_it *)osd_index_it_init(env, dt, unused);
        if (!IS_ERR(it))
                it->ozi_pos = 0;
 
@@ -1153,8 +1138,7 @@ static int osd_prepare_key_uint64(struct osd_object *o, __u64 *dst,
 }
 
 static int osd_index_lookup(const struct lu_env *env, struct dt_object *dt,
-                       struct dt_rec *rec, const struct dt_key *key,
-                       struct lustre_capa *capa)
+                       struct dt_rec *rec, const struct dt_key *key)
 {
        struct osd_object *obj = osd_dt_obj(dt);
        struct osd_device *osd = osd_obj2dev(obj);
@@ -1197,8 +1181,7 @@ static int osd_declare_index_insert(const struct lu_env *env,
 
 static int osd_index_insert(const struct lu_env *env, struct dt_object *dt,
                            const struct dt_rec *rec, const struct dt_key *key,
-                           struct thandle *th, struct lustre_capa *capa,
-                           int ignore_quota)
+                           struct thandle *th, int ignore_quota)
 {
        struct osd_object  *obj = osd_dt_obj(dt);
        struct osd_device  *osd = osd_obj2dev(obj);
@@ -1244,8 +1227,7 @@ static int osd_declare_index_delete(const struct lu_env *env,
 }
 
 static int osd_index_delete(const struct lu_env *env, struct dt_object *dt,
-                           const struct dt_key *key, struct thandle *th,
-                           struct lustre_capa *capa)
+                           const struct dt_key *key, struct thandle *th)
 {
        struct osd_object  *obj = osd_dt_obj(dt);
        struct osd_device  *osd = osd_obj2dev(obj);
@@ -1430,8 +1412,7 @@ struct osd_metadnode_it {
 };
 
 static struct dt_it *osd_zfs_otable_it_init(const struct lu_env *env,
-                                           struct dt_object *dt, __u32 attr,
-                                           struct lustre_capa *capa)
+                                           struct dt_object *dt, __u32 attr)
 {
        struct osd_device       *dev   = osd_dev(dt->do_lu.lo_dev);
        struct osd_metadnode_it *it;
@@ -1633,50 +1614,50 @@ int osd_index_try(const struct lu_env *env, struct dt_object *dt,
                const struct dt_index_features *feat)
 {
        struct osd_object *obj = osd_dt_obj(dt);
+       int rc = 0;
        ENTRY;
 
-       LASSERT(dt_object_exists(dt));
+       down_read(&obj->oo_guard);
 
        /*
         * XXX: implement support for fixed-size keys sorted with natural
         *      numerical way (not using internal hash value)
         */
        if (feat->dif_flags & DT_IND_RANGE)
-               RETURN(-ERANGE);
+               GOTO(out, rc = -ERANGE);
 
        if (unlikely(feat == &dt_otable_features)) {
                dt->do_index_ops = &osd_zfs_otable_ops;
-               RETURN(0);
+               GOTO(out, rc = 0);
        }
 
-       LASSERT(obj->oo_db != NULL);
+       LASSERT(!dt_object_exists(dt) || obj->oo_db != NULL);
        if (likely(feat == &dt_directory_features)) {
-               if (osd_object_is_zap(obj->oo_db))
+               if (!dt_object_exists(dt) || osd_object_is_zap(obj->oo_db))
                        dt->do_index_ops = &osd_dir_ops;
                else
-                       RETURN(-ENOTDIR);
+                       GOTO(out, rc = -ENOTDIR);
        } else if (unlikely(feat == &dt_acct_features)) {
                LASSERT(fid_is_acct(lu_object_fid(&dt->do_lu)));
                dt->do_index_ops = &osd_acct_index_ops;
-       } else if (osd_object_is_zap(obj->oo_db) &&
-                  dt->do_index_ops == NULL) {
+       } else if (dt->do_index_ops == NULL) {
                /* For index file, we don't support variable key & record sizes
                 * and the key has to be unique */
                if ((feat->dif_flags & ~DT_IND_UPDATE) != 0)
-                       RETURN(-EINVAL);
+                       GOTO(out, rc = -EINVAL);
 
                if (feat->dif_keysize_max > ZAP_MAXNAMELEN)
-                       RETURN(-E2BIG);
+                       GOTO(out, rc = -E2BIG);
                if (feat->dif_keysize_max != feat->dif_keysize_min)
-                       RETURN(-EINVAL);
+                       GOTO(out, rc = -EINVAL);
 
                /* As for the record size, it should be a multiple of 8 bytes
                 * and smaller than the maximum value length supported by ZAP.
                 */
                if (feat->dif_recsize_max > ZAP_MAXVALUELEN)
-                       RETURN(-E2BIG);
+                       GOTO(out, rc = -E2BIG);
                if (feat->dif_recsize_max != feat->dif_recsize_min)
-                       RETURN(-EINVAL);
+                       GOTO(out, rc = -EINVAL);
 
                obj->oo_keysize = feat->dif_keysize_max;
                obj->oo_recsize = feat->dif_recsize_max;
@@ -1690,5 +1671,8 @@ int osd_index_try(const struct lu_env *env, struct dt_object *dt,
                dt->do_index_ops = &osd_index_ops;
        }
 
-       RETURN(0);
+out:
+       up_read(&obj->oo_guard);
+
+       RETURN(rc);
 }