Whamcloud - gitweb
LU-1222 ldlm: Fix the race in AST sender vs multiple arriving RPCs
[fs/lustre-release.git] / lustre / fld / fld_index.c
index df552f6..e7f2a21 100644 (file)
@@ -16,8 +16,8 @@
  * in the LICENSE file that accompanied this code).
  *
  * You should have received a copy of the GNU General Public License
- * version 2 along with this program; If not, see [sun.com URL with a
- * copy of GPLv2].
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
  *
  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
  * CA 95054 USA or visit www.sun.com if you need additional information or
  * GPL HEADER END
  */
 /*
- * Copyright  2008 Sun Microsystems, Inc. All rights reserved
+ * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
+ *
+ * Copyright (c) 2011, 2012, Whamcloud, Inc.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
 #include <dt_object.h>
 #include <md_object.h>
 #include <lustre_mdc.h>
+#include <lustre_fid.h>
 #include <lustre_fld.h>
 #include "fld_internal.h"
 
 const char fld_index_name[] = "fld";
 
-static const struct dt_index_features fld_index_features = {
+static const struct lu_seq_range IGIF_FLD_RANGE = {
+        .lsr_start = 1,
+        .lsr_end   = FID_SEQ_IDIF,
+        .lsr_index   = 0,
+        .lsr_flags  = LU_SEQ_RANGE_MDT
+};
+
+const struct dt_index_features fld_index_features = {
         .dif_flags       = DT_IND_UPDATE,
         .dif_keysize_min = sizeof(seqno_t),
         .dif_keysize_max = sizeof(seqno_t),
-        .dif_recsize_min = sizeof(mdsno_t),
-        .dif_recsize_max = sizeof(mdsno_t)
-};
-
-/*
- * number of blocks to reserve for particular operations. Should be function of
- * ... something. Stub for now.
- */
-enum {
-        FLD_TXN_INDEX_INSERT_CREDITS  = 20,
-        FLD_TXN_INDEX_DELETE_CREDITS  = 20,
+        .dif_recsize_min = sizeof(struct lu_seq_range),
+        .dif_recsize_max = sizeof(struct lu_seq_range),
+        .dif_ptrsize     = 4
 };
 
 extern struct lu_context_key fld_thread_key;
@@ -98,83 +100,207 @@ static struct dt_key *fld_key(const struct lu_env *env,
 }
 
 static struct dt_rec *fld_rec(const struct lu_env *env,
-                              const mdsno_t mds)
+                              const struct lu_seq_range *range)
 {
         struct fld_thread_info *info;
+        struct lu_seq_range *rec;
         ENTRY;
 
         info = lu_context_key_get(&env->le_ctx, &fld_thread_key);
         LASSERT(info != NULL);
+        rec = &info->fti_rec;
+
+        range_cpu_to_be(rec, range);
+        RETURN((void *)rec);
+}
+
+struct thandle *fld_trans_create(struct lu_server_fld *fld,
+                                const struct lu_env *env)
+{
+        struct dt_device *dt_dev;
+
+        dt_dev = lu2dt_dev(fld->lsf_obj->do_lu.lo_dev);
+
+        return dt_dev->dd_ops->dt_trans_create(env, dt_dev);
+}
+
+int fld_trans_start(struct lu_server_fld *fld,
+                                const struct lu_env *env, struct thandle *th)
+{
+        struct dt_device *dt_dev;
+
+        dt_dev = lu2dt_dev(fld->lsf_obj->do_lu.lo_dev);
 
-        info->fti_rec = cpu_to_be64(mds);
-        RETURN((void *)&info->fti_rec);
+        return dt_dev->dd_ops->dt_trans_start(env, dt_dev, th);
 }
 
+void fld_trans_stop(struct lu_server_fld *fld,
+                    const struct lu_env *env, struct thandle* th)
+{
+        struct dt_device *dt_dev;
+
+        dt_dev = lu2dt_dev(fld->lsf_obj->do_lu.lo_dev);
+        dt_dev->dd_ops->dt_trans_stop(env, th);
+}
+
+int fld_declare_index_create(struct lu_server_fld *fld,
+                             const struct lu_env *env,
+                             const struct lu_seq_range *range,
+                             struct thandle *th)
+{
+        struct dt_object *dt_obj = fld->lsf_obj;
+        seqno_t start;
+        int rc;
+
+        ENTRY;
+
+        start = range->lsr_start;
+        LASSERT(range_is_sane(range));
+
+        rc = dt_obj->do_index_ops->dio_declare_insert(env, dt_obj,
+                                                      fld_rec(env, range),
+                                                      fld_key(env, start), th);
+        RETURN(rc);
+}
+
+/**
+ * insert range in fld store.
+ *
+ *      \param  range  range to be inserted
+ *      \param  th     transaction for this operation as it could compound
+ *                     transaction.
+ *
+ *      \retval  0  success
+ *      \retval  -ve error
+ */
+
 int fld_index_create(struct lu_server_fld *fld,
                      const struct lu_env *env,
-                     seqno_t seq, mdsno_t mds)
+                     const struct lu_seq_range *range,
+                     struct thandle *th)
 {
         struct dt_object *dt_obj = fld->lsf_obj;
-        struct dt_device *dt_dev;
-        struct txn_param txn;
-        struct thandle *th;
+        seqno_t start;
         int rc;
+
         ENTRY;
 
-        dt_dev = lu2dt_dev(fld->lsf_obj->do_lu.lo_dev);
+        start = range->lsr_start;
+        LASSERT(range_is_sane(range));
 
-        /* stub here, will fix it later */
-        txn_param_init(&txn, FLD_TXN_INDEX_INSERT_CREDITS);
-
-        th = dt_dev->dd_ops->dt_trans_start(env, dt_dev, &txn);
-        if (!IS_ERR(th)) {
-                rc = dt_obj->do_index_ops->dio_insert(env, dt_obj,
-                                                      fld_rec(env, mds),
-                                                      fld_key(env, seq),
-                                                      th, BYPASS_CAPA);
-                dt_dev->dd_ops->dt_trans_stop(env, th);
-        } else
-                rc = PTR_ERR(th);
+        rc = dt_obj->do_index_ops->dio_insert(env, dt_obj,
+                                              fld_rec(env, range),
+                                              fld_key(env, start),
+                                              th, BYPASS_CAPA, 1);
+
+        CDEBUG(D_INFO, "%s: insert given range : "DRANGE" rc = %d\n",
+               fld->lsf_name, PRANGE(range), rc);
         RETURN(rc);
 }
 
+/**
+ * delete range in fld store.
+ *
+ *      \param  range range to be deleted
+ *      \param  th     transaction
+ *
+ *      \retval  0  success
+ *      \retval  -ve error
+ */
+
 int fld_index_delete(struct lu_server_fld *fld,
                      const struct lu_env *env,
-                     seqno_t seq)
+                     struct lu_seq_range *range,
+                     struct thandle   *th)
 {
         struct dt_object *dt_obj = fld->lsf_obj;
-        struct dt_device *dt_dev;
-        struct txn_param txn;
-        struct thandle *th;
+        seqno_t seq = range->lsr_start;
         int rc;
+
         ENTRY;
 
-        dt_dev = lu2dt_dev(fld->lsf_obj->do_lu.lo_dev);
-        txn_param_init(&txn, FLD_TXN_INDEX_DELETE_CREDITS);
-        th = dt_dev->dd_ops->dt_trans_start(env, dt_dev, &txn);
-        if (!IS_ERR(th)) {
-                rc = dt_obj->do_index_ops->dio_delete(env, dt_obj,
-                                                      fld_key(env, seq), th,
-                                                      BYPASS_CAPA);
-                dt_dev->dd_ops->dt_trans_stop(env, th);
-        } else
-                rc = PTR_ERR(th);
+        rc = dt_obj->do_index_ops->dio_delete(env, dt_obj, fld_key(env, seq),
+                                              th, BYPASS_CAPA);
+
+        CDEBUG(D_INFO, "%s: delete given range : "DRANGE" rc = %d\n",
+               fld->lsf_name, PRANGE(range), rc);
+
         RETURN(rc);
 }
 
+/**
+ * lookup range for a seq passed. note here we only care about the start/end,
+ * caller should handle the attached location data (flags, index).
+ *
+ * \param  seq     seq for lookup.
+ * \param  range   result of lookup.
+ *
+ * \retval  0           found, \a range is the matched range;
+ * \retval -ENOENT      not found, \a range is the left-side range;
+ * \retval  -ve         other error;
+ */
+
 int fld_index_lookup(struct lu_server_fld *fld,
                      const struct lu_env *env,
-                     seqno_t seq, mdsno_t *mds)
+                     seqno_t seq,
+                     struct lu_seq_range *range)
 {
-        struct dt_object *dt_obj = fld->lsf_obj;
-        struct dt_rec    *rec = fld_rec(env, 0);
+        struct dt_object        *dt_obj = fld->lsf_obj;
+        struct lu_seq_range     *fld_rec;
+        struct dt_key           *key = fld_key(env, seq);
+        struct fld_thread_info  *info;
+        int rc;
+
+        ENTRY;
+
+        info = lu_context_key_get(&env->le_ctx, &fld_thread_key);
+        fld_rec = &info->fti_rec;
+
+        rc = dt_obj->do_index_ops->dio_lookup(env, dt_obj,
+                                              (struct dt_rec*) fld_rec,
+                                              key, BYPASS_CAPA);
+
+        if (rc >= 0) {
+                range_be_to_cpu(fld_rec, fld_rec);
+                *range = *fld_rec;
+                if (range_within(range, seq))
+                        rc = 0;
+                else
+                        rc = -ENOENT;
+        }
+
+        CDEBUG(D_INFO, "%s: lookup seq = "LPX64" range : "DRANGE" rc = %d\n",
+               fld->lsf_name, seq, PRANGE(range), rc);
+
+        RETURN(rc);
+}
+
+static int fld_insert_igif_fld(struct lu_server_fld *fld,
+                               const struct lu_env *env)
+{
+        struct thandle *th;
         int rc;
         ENTRY;
 
-        rc = dt_obj->do_index_ops->dio_lookup(env, dt_obj, rec,
-                                              fld_key(env, seq), BYPASS_CAPA);
-        if (rc == 0)
-                *mds = be64_to_cpu(*(__u64 *)rec);
+        /* FLD_TXN_INDEX_INSERT_CREDITS */
+        th = fld_trans_create(fld, env);
+        if (IS_ERR(th))
+                RETURN(PTR_ERR(th));
+        rc = fld_declare_index_create(fld, env, &IGIF_FLD_RANGE, th);
+        if (rc) {
+                fld_trans_stop(fld, env, th);
+                RETURN(rc);
+        }
+        rc = fld_trans_start(fld, env, th);
+        if (rc) {
+                fld_trans_stop(fld, env, th);
+                RETURN(rc);
+        }
+
+        rc = fld_index_create(fld, env, &IGIF_FLD_RANGE, th);
+        fld_trans_stop(fld, env, th);
+        if (rc == -EEXIST)
+                rc = 0;
         RETURN(rc);
 }
 
@@ -187,16 +313,25 @@ int fld_index_init(struct lu_server_fld *fld,
         int rc;
         ENTRY;
 
-        dt_obj = dt_store_open(env, dt, fld_index_name, &fid);
+        dt_obj = dt_store_open(env, dt, "", fld_index_name, &fid);
         if (!IS_ERR(dt_obj)) {
                 fld->lsf_obj = dt_obj;
                 rc = dt_obj->do_ops->do_index_try(env, dt_obj,
                                                   &fld_index_features);
-                if (rc == 0)
+                if (rc == 0) {
                         LASSERT(dt_obj->do_index_ops != NULL);
-                else
+                        rc = fld_insert_igif_fld(fld, env);
+
+                        if (rc != 0) {
+                                CERROR("insert igif in fld! = %d\n", rc);
+                                lu_object_put(env, &dt_obj->do_lu);
+                                fld->lsf_obj = NULL;
+                        }
+                } else
                         CERROR("%s: File \"%s\" is not an index!\n",
                                fld->lsf_name, fld_index_name);
+
+
         } else {
                 CERROR("%s: Can't find \"%s\" obj %d\n",
                        fld->lsf_name, fld_index_name, (int)PTR_ERR(dt_obj));