Whamcloud - gitweb
b=19486 add server identifier into lu_seq_range.
[fs/lustre-release.git] / lustre / fld / fld_handler.c
index 0f6e7cc..489bd2c 100644 (file)
@@ -26,7 +26,7 @@
  * GPL HEADER END
  */
 /*
- * Copyright  2008 Sun Microsystems, Inc. All rights reserved
+ * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  */
 /*
@@ -39,6 +39,7 @@
  *
  * Author: Yury Umanets <umka@clusterfs.com>
  * Author: WangDi <wangdi@clusterfs.com>
+ * Author: Pravin Shelar <pravin.shelar@sun.com>
  */
 
 #ifndef EXPORT_SYMTAB
@@ -66,6 +67,7 @@
 #include <lustre_fid.h>
 #include <lustre_req_layout.h>
 #include "fld_internal.h"
+#include <lustre_fid.h>
 
 #ifdef __KERNEL__
 
@@ -109,107 +111,230 @@ static void __exit fld_mod_exit(void)
         }
 }
 
-/* Insert index entry and update cache. */
+/**
+ * Insert FLD index entry and update FLD cache.
+ *
+ * First it try to merge given range with existing range then update
+ * FLD index and FLD cache accordingly. FLD index consistency is maintained
+ * by this function.
+ * This function is called from the sequence allocator when a super-sequence
+ * is granted to a server.
+ */
+
 int fld_server_create(struct lu_server_fld *fld,
                       const struct lu_env *env,
-                      seqno_t seq, mdsno_t mds)
+                      struct lu_seq_range *add_range,
+                      struct thandle *th)
 {
-        int rc;
+        struct lu_seq_range *erange;
+        struct lu_seq_range *new;
+        struct fld_thread_info *info;
+        int rc = 0;
+        int do_merge=0;
+
         ENTRY;
-        
-        rc = fld_index_create(fld, env, seq, mds);
-        
+
+        info = lu_context_key_get(&env->le_ctx, &fld_thread_key);
+        cfs_mutex_lock(&fld->lsf_lock);
+
+        erange = &info->fti_lrange;
+        new = &info->fti_irange;
+        *new = *add_range;
+
+        /* STEP 1: try to merge with previous range */
+        rc = fld_index_lookup(fld, env, new->lsr_start, erange);
+        if (rc == 0) {
+                /* in case of range overlap, the location must be same */
+                if (range_compare_loc(new, erange) != 0) {
+                        CERROR("the start of given range "DRANGE" conflict to"
+                               "an existing range "DRANGE"\n",
+                               PRANGE(new), PRANGE(erange));
+                        GOTO(out, rc = -EIO);
+                }
+
+                if (new->lsr_end < erange->lsr_end)
+                        GOTO(out, rc);
+                do_merge = 1;
+        } else if (rc == -ENOENT) {
+                /* check for merge case: optimizes for single mds lustre.
+                 * As entry does not exist, returned entry must be left side
+                 * entry compared to start of new range (ref dio_lookup()).
+                 * So try to merge from left.
+                 */
+                if (new->lsr_start == erange->lsr_end &&
+                    range_compare_loc(new, erange) == 0)
+                        do_merge = 1;
+        } else {
+                /* no overlap allowed in fld, so failure in lookup is error */
+                GOTO(out, rc);
+        }
+
+        if (do_merge) {
+                /* new range will be merged with the existing one.
+                 * delete this range at first. */
+                rc = fld_index_delete(fld, env, erange, th);
+                if (rc != 0)
+                        GOTO(out, rc);
+
+                new->lsr_start = min(erange->lsr_start, new->lsr_start);
+                new->lsr_end = max(erange->lsr_end, new->lsr_end);
+                do_merge = 0;
+        }
+
+        /* STEP 2: try to merge with next range */
+        rc = fld_index_lookup(fld, env, new->lsr_end, erange);
         if (rc == 0) {
+                /* found a matched range, meaning we're either
+                 * overlapping or ajacent, must merge with it. */
+                do_merge = 1;
+        } else if (rc == -ENOENT) {
+                /* this range is left of new range end point */
+                LASSERT(erange->lsr_end <= new->lsr_end);
                 /*
-                 * Do not return result of calling fld_cache_insert()
-                 * here. First of all because it may return -EEXISTS. Another
-                 * reason is that, we do not want to stop proceeding even after
-                 * cache errors.
+                 * the found left range must be either:
+                 *  1. withing new range.
+                 *  2. left of new range (no overlapping).
+                 * because if they're partly overlapping, the STEP 1 must have
+                 * been removed this range.
                  */
-                fld_cache_insert(fld->lsf_cache, seq, mds);
+                LASSERTF(erange->lsr_start > new->lsr_start ||
+                         erange->lsr_end < new->lsr_start ||
+                         (erange->lsr_end == new->lsr_end &&
+                          range_compare_loc(new, erange) != 0),
+                         "left "DRANGE", new "DRANGE"\n",
+                         PRANGE(erange), PRANGE(new));
+
+                /* if it's within the new range, merge it */
+                if (erange->lsr_start > new->lsr_start)
+                        do_merge = 1;
+        } else {
+               GOTO(out, rc);
         }
 
-        RETURN(rc);
-}
-EXPORT_SYMBOL(fld_server_create);
+        if (do_merge) {
+                if (range_compare_loc(new, erange) != 0) {
+                        CERROR("the end of given range "DRANGE" overlaps "
+                               "with an existing range "DRANGE"\n",
+                               PRANGE(new), PRANGE(erange));
+                        GOTO(out, rc = -EIO);
+                }
+
+                /* merge with next range */
+                rc = fld_index_delete(fld, env, erange, th);
+                if (rc != 0)
+                        GOTO(out, rc);
+
+                new->lsr_start = min(erange->lsr_start, new->lsr_start);
+                new->lsr_end = max(erange->lsr_end, new->lsr_end);
+        }
 
-/* Delete index entry. */
-int fld_server_delete(struct lu_server_fld *fld,
-                      const struct lu_env *env,
-                      seqno_t seq)
-{
-        int rc;
-        ENTRY;
+        /* now update fld entry. */
+        rc = fld_index_create(fld, env, new, th);
+
+        LASSERT(rc != -EEXIST);
+out:
+        if (rc == 0)
+                fld_cache_insert(fld->lsf_cache, new);
+
+        cfs_mutex_unlock(&fld->lsf_lock);
+
+        CDEBUG((rc != 0 ? D_ERROR : D_INFO),
+               "%s: FLD create: given range : "DRANGE
+               "after merge "DRANGE" rc = %d \n", fld->lsf_name,
+                PRANGE(add_range), PRANGE(new), rc);
 
-        fld_cache_delete(fld->lsf_cache, seq);
-        rc = fld_index_delete(fld, env, seq);
-        
         RETURN(rc);
 }
-EXPORT_SYMBOL(fld_server_delete);
 
-/* Lookup mds by seq. */
+EXPORT_SYMBOL(fld_server_create);
+
+/**
+ *  Lookup mds by seq, returns a range for given seq.
+ *
+ *  If that entry is not cached in fld cache, request is sent to super
+ *  sequence controller node (MDT0). All other MDT[1...N] and client
+ *  cache fld entries, but this cache is not persistent.
+ */
+
 int fld_server_lookup(struct lu_server_fld *fld,
                       const struct lu_env *env,
-                      seqno_t seq, mdsno_t *mds)
+                      seqno_t seq, struct lu_seq_range *range)
 {
+        struct lu_seq_range *erange;
+        struct fld_thread_info *info;
         int rc;
         ENTRY;
-        
+
+        info = lu_context_key_get(&env->le_ctx, &fld_thread_key);
+        erange = &info->fti_lrange;
+
         /* Lookup it in the cache. */
-        rc = fld_cache_lookup(fld->lsf_cache, seq, mds);
-        if (rc == 0)
+        rc = fld_cache_lookup(fld->lsf_cache, seq, erange);
+        if (rc == 0) {
+                if (unlikely(erange->lsr_flags != range->lsr_flags)) {
+                        CERROR("FLD cache found a range "DRANGE" doesn't "
+                               "match the requested flag %x\n",
+                               PRANGE(erange), range->lsr_flags);
+                        RETURN(-EIO);
+                }
+                *range = *erange;
                 RETURN(0);
+        }
 
-        rc = fld_index_lookup(fld, env, seq, mds);
-        if (rc == 0) {
-                /*
-                 * Do not return error here as well. See previous comment in
-                 * same situation in function fld_server_create().
+        if (fld->lsf_obj) {
+                rc = fld_index_lookup(fld, env, seq, erange);
+                if (rc == 0) {
+                        if (unlikely(erange->lsr_flags != range->lsr_flags)) {
+                                CERROR("FLD found a range "DRANGE" doesn't "
+                                       "match the requested flag %x\n",
+                                       PRANGE(erange), range->lsr_flags);
+                                RETURN(-EIO);
+                        }
+                        *range = *erange;
+                }
+        } else {
+                LASSERT(fld->lsf_control_exp);
+                /* send request to mdt0 i.e. super seq. controller.
+                 * This is temporary solution, long term solution is fld
+                 * replication on all mdt servers.
                  */
-                fld_cache_insert(fld->lsf_cache, seq, *mds);
+                rc = fld_client_rpc(fld->lsf_control_exp,
+                                    range, FLD_LOOKUP);
         }
+
+        if (rc == 0)
+                fld_cache_insert(fld->lsf_cache, range);
+
         RETURN(rc);
 }
 EXPORT_SYMBOL(fld_server_lookup);
 
+/**
+ * All MDT server handle fld lookup operation. But only MDT0 has fld index.
+ * if entry is not found in cache we need to forward lookup request to MDT0
+ */
+
 static int fld_server_handle(struct lu_server_fld *fld,
                              const struct lu_env *env,
-                             __u32 opc, struct md_fld *mf,
+                             __u32 opc, struct lu_seq_range *range,
                              struct fld_thread_info *info)
 {
         int rc;
         ENTRY;
 
         switch (opc) {
-        case FLD_CREATE:
-                rc = fld_server_create(fld, env,
-                                       mf->mf_seq, mf->mf_mds);
-
-                /* Do not return -EEXIST error for resent case */
-                if ((info->fti_flags & MSG_RESENT) && rc == -EEXIST)
-                        rc = 0;
-                break;
-        case FLD_DELETE:
-                rc = fld_server_delete(fld, env, mf->mf_seq);
-
-                /* Do not return -ENOENT error for resent case */
-                if ((info->fti_flags & MSG_RESENT) && rc == -ENOENT)
-                        rc = 0;
-                break;
         case FLD_LOOKUP:
                 rc = fld_server_lookup(fld, env,
-                                       mf->mf_seq, &mf->mf_mds);
+                                       range->lsr_start, range);
                 break;
         default:
                 rc = -EINVAL;
                 break;
         }
 
-        CDEBUG(D_INFO, "%s: FLD req handle: error %d (opc: %d, seq: "
-               LPX64", mds: "LPU64")\n", fld->lsf_name, rc, opc,
-               mf->mf_seq, mf->mf_mds);
-        
+        CDEBUG(D_INFO, "%s: FLD req handle: error %d (opc: %d, range: "
+               DRANGE"\n", fld->lsf_name, rc, opc, PRANGE(range));
+
         RETURN(rc);
 
 }
@@ -218,8 +343,8 @@ static int fld_req_handle(struct ptlrpc_request *req,
                           struct fld_thread_info *info)
 {
         struct lu_site *site;
-        struct md_fld *in;
-        struct md_fld *out;
+        struct lu_seq_range *in;
+        struct lu_seq_range *out;
         int rc;
         __u32 *opc;
         ENTRY;
@@ -252,8 +377,6 @@ static int fld_req_handle(struct ptlrpc_request *req,
 static void fld_thread_info_init(struct ptlrpc_request *req,
                                  struct fld_thread_info *info)
 {
-        info->fti_flags = lustre_msg_get_flags(req->rq_reqmsg);
-
         info->fti_pill = &req->rq_pill;
         /* Init request capsule. */
         req_capsule_init(info->fti_pill, req, RCL_SERVER);
@@ -301,21 +424,27 @@ EXPORT_SYMBOL(fld_query);
  *
  * fid_is_local() is supposed to be used in assertion checks only.
  */
-int fid_is_local(struct lu_site *site, const struct lu_fid *fid)
+int fid_is_local(const struct lu_env *env,
+                 struct lu_site *site, const struct lu_fid *fid)
 {
         int result;
         struct md_site *msite;
+        struct lu_seq_range *range;
+        struct fld_thread_info *info;
+        ENTRY;
+
+        info = lu_context_key_get(&env->le_ctx, &fld_thread_key);
+        range = &info->fti_lrange;
 
         result = 1; /* conservatively assume fid is local */
         msite = lu_site2md(site);
         if (msite->ms_client_fld != NULL) {
-                mdsno_t mds;
                 int rc;
 
                 rc = fld_cache_lookup(msite->ms_client_fld->lcf_cache,
-                                      fid_seq(fid), &mds);
+                                      fid_seq(fid), range);
                 if (rc == 0)
-                        result = (mds == msite->ms_node_id);
+                        result = (range->lsr_index == msite->ms_node_id);
         }
         return result;
 }
@@ -363,9 +492,11 @@ static void fld_server_proc_fini(struct lu_server_fld *fld)
 #endif
 
 int fld_server_init(struct lu_server_fld *fld, struct dt_device *dt,
-                    const char *prefix, const struct lu_env *env)
+                    const char *prefix, const struct lu_env *env,
+                    int mds_node_id)
 {
         int cache_size, cache_threshold;
+        struct lu_seq_range range;
         int rc;
         ENTRY;
 
@@ -378,8 +509,8 @@ int fld_server_init(struct lu_server_fld *fld, struct dt_device *dt,
         cache_threshold = cache_size *
                 FLD_SERVER_CACHE_THRESHOLD / 100;
 
+        cfs_mutex_init(&fld->lsf_lock);
         fld->lsf_cache = fld_cache_init(fld->lsf_name,
-                                        FLD_SERVER_HTABLE_SIZE,
                                         cache_size, cache_threshold);
         if (IS_ERR(fld->lsf_cache)) {
                 rc = PTR_ERR(fld->lsf_cache);
@@ -387,14 +518,26 @@ int fld_server_init(struct lu_server_fld *fld, struct dt_device *dt,
                 GOTO(out, rc);
         }
 
-        rc = fld_index_init(fld, env, dt);
-        if (rc)
-                GOTO(out, rc);
+        if (!mds_node_id) {
+                rc = fld_index_init(fld, env, dt);
+                if (rc)
+                        GOTO(out, rc);
+        } else
+                fld->lsf_obj = NULL;
 
         rc = fld_server_proc_init(fld);
         if (rc)
                 GOTO(out, rc);
 
+        fld->lsf_control_exp = NULL;
+
+        /* Insert reserved sequence number of ".lustre" into fld cache. */
+        range.lsr_start = FID_SEQ_DOT_LUSTRE;
+        range.lsr_end = FID_SEQ_DOT_LUSTRE + 1;
+        range.lsr_index = 0;
+        range.lsr_flags = LU_SEQ_RANGE_MDT;
+        fld_cache_insert(fld->lsf_cache, &range);
+
         EXIT;
 out:
         if (rc)