Whamcloud - gitweb
LU-2240 mds: Assign special fid sequence to root.
[fs/lustre-release.git] / lustre / fld / fld_handler.c
index 01bc8d8..7fe5d7e 100644 (file)
@@ -27,7 +27,7 @@
  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, 2012, Whamcloud, Inc.
+ * Copyright (c) 2011, 2013, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -72,7 +72,7 @@
 LU_KEY_INIT_FINI(fld, struct fld_thread_info);
 
 /* context key: fld_thread_key */
-LU_CONTEXT_KEY_DEFINE(fld, LCT_MD_THREAD|LCT_DT_THREAD);
+LU_CONTEXT_KEY_DEFINE(fld, LCT_MD_THREAD | LCT_DT_THREAD | LCT_MG_THREAD);
 
 cfs_proc_dir_entry_t *fld_type_proc_dir = NULL;
 
@@ -98,29 +98,14 @@ static void __exit fld_mod_exit(void)
         }
 }
 
-int fld_declare_server_create(struct lu_server_fld *fld,
-                              const struct lu_env *env,
-                              struct thandle *th)
+int fld_declare_server_create(const struct lu_env *env,
+                             struct lu_server_fld *fld,
+                             struct lu_seq_range *range,
+                             struct thandle *th)
 {
-        int rc;
-
-        ENTRY;
-
-       if (fld->lsf_no_range_lookup) {
-               /* Stub for underlying FS which can't lookup ranges */
-               return 0;
-       }
+       int rc;
 
-        /* for ldiskfs OSD it's enough to declare operation with any ops
-         * with DMU we'll probably need to specify exact key/value */
-       rc = dt_declare_delete(env, fld->lsf_obj, NULL, th);
-       if (rc)
-               GOTO(out, rc);
-       rc = dt_declare_delete(env, fld->lsf_obj, NULL, th);
-       if (rc)
-               GOTO(out, rc);
-       rc = dt_declare_insert(env, fld->lsf_obj, NULL, NULL, th);
-out:
+       rc = fld_declare_index_create(env, fld, range, th);
        RETURN(rc);
 }
 EXPORT_SYMBOL(fld_declare_server_create);
@@ -128,136 +113,19 @@ EXPORT_SYMBOL(fld_declare_server_create);
 /**
  * Insert FLD index entry and update FLD cache.
  *
- * First it try to merge given range with existing range then update
- * FLD index and FLD cache accordingly. FLD index consistency is maintained
- * by this function.
  * This function is called from the sequence allocator when a super-sequence
  * is granted to a server.
  */
-
-int fld_server_create(struct lu_server_fld *fld,
-                      const struct lu_env *env,
-                      struct lu_seq_range *add_range,
-                      struct thandle *th)
+int fld_server_create(const struct lu_env *env, struct lu_server_fld *fld,
+                     struct lu_seq_range *range, struct thandle *th)
 {
-        struct lu_seq_range *erange;
-        struct lu_seq_range *new;
-        struct fld_thread_info *info;
-        int rc = 0;
-        int do_merge=0;
-
-        ENTRY;
+       int rc;
 
-        info = lu_context_key_get(&env->le_ctx, &fld_thread_key);
        mutex_lock(&fld->lsf_lock);
-
-        erange = &info->fti_lrange;
-        new = &info->fti_irange;
-        *new = *add_range;
-
-        /* STEP 1: try to merge with previous range */
-        rc = fld_index_lookup(fld, env, new->lsr_start, erange);
-        if (rc == 0) {
-                /* in case of range overlap, the location must be same */
-                if (range_compare_loc(new, erange) != 0) {
-                        CERROR("the start of given range "DRANGE" conflict to"
-                               "an existing range "DRANGE"\n",
-                               PRANGE(new), PRANGE(erange));
-                        GOTO(out, rc = -EIO);
-                }
-
-                if (new->lsr_end < erange->lsr_end)
-                        GOTO(out, rc);
-                do_merge = 1;
-        } else if (rc == -ENOENT) {
-                /* check for merge case: optimizes for single mds lustre.
-                 * As entry does not exist, returned entry must be left side
-                 * entry compared to start of new range (ref dio_lookup()).
-                 * So try to merge from left.
-                 */
-                if (new->lsr_start == erange->lsr_end &&
-                    range_compare_loc(new, erange) == 0)
-                        do_merge = 1;
-        } else {
-                /* no overlap allowed in fld, so failure in lookup is error */
-                GOTO(out, rc);
-        }
-
-        if (do_merge) {
-                /* new range will be merged with the existing one.
-                 * delete this range at first. */
-                rc = fld_index_delete(fld, env, erange, th);
-                if (rc != 0)
-                        GOTO(out, rc);
-
-                new->lsr_start = min(erange->lsr_start, new->lsr_start);
-                new->lsr_end = max(erange->lsr_end, new->lsr_end);
-                do_merge = 0;
-        }
-
-        /* STEP 2: try to merge with next range */
-        rc = fld_index_lookup(fld, env, new->lsr_end, erange);
-        if (rc == 0) {
-                /* found a matched range, meaning we're either
-                 * overlapping or ajacent, must merge with it. */
-                do_merge = 1;
-        } else if (rc == -ENOENT) {
-                /* this range is left of new range end point */
-                LASSERT(erange->lsr_end <= new->lsr_end);
-                /*
-                 * the found left range must be either:
-                 *  1. withing new range.
-                 *  2. left of new range (no overlapping).
-                 * because if they're partly overlapping, the STEP 1 must have
-                 * been removed this range.
-                 */
-                LASSERTF(erange->lsr_start > new->lsr_start ||
-                         erange->lsr_end < new->lsr_start ||
-                         (erange->lsr_end == new->lsr_start &&
-                          range_compare_loc(new, erange) != 0),
-                         "left "DRANGE", new "DRANGE"\n",
-                         PRANGE(erange), PRANGE(new));
-
-                /* if it's within the new range, merge it */
-                if (erange->lsr_start > new->lsr_start)
-                        do_merge = 1;
-        } else {
-               GOTO(out, rc);
-        }
-
-        if (do_merge) {
-                if (range_compare_loc(new, erange) != 0) {
-                        CERROR("the end of given range "DRANGE" overlaps "
-                               "with an existing range "DRANGE"\n",
-                               PRANGE(new), PRANGE(erange));
-                        GOTO(out, rc = -EIO);
-                }
-
-                /* merge with next range */
-                rc = fld_index_delete(fld, env, erange, th);
-                if (rc != 0)
-                        GOTO(out, rc);
-
-                new->lsr_start = min(erange->lsr_start, new->lsr_start);
-                new->lsr_end = max(erange->lsr_end, new->lsr_end);
-        }
-
-        /* now update fld entry. */
-        rc = fld_index_create(fld, env, new, th);
-
-        LASSERT(rc != -EEXIST);
-out:
-        if (rc == 0)
-                fld_cache_insert(fld->lsf_cache, new);
-
+       rc = fld_index_create(env, fld, range, th);
        mutex_unlock(&fld->lsf_lock);
 
-        CDEBUG((rc != 0 ? D_ERROR : D_INFO),
-               "%s: FLD create: given range : "DRANGE
-               "after merge "DRANGE" rc = %d \n", fld->lsf_name,
-                PRANGE(add_range), PRANGE(new), rc);
-
-        RETURN(rc);
+       RETURN(rc);
 }
 EXPORT_SYMBOL(fld_server_create);
 
@@ -269,22 +137,23 @@ EXPORT_SYMBOL(fld_server_create);
  *  cache fld entries, but this cache is not persistent.
  */
 
-int fld_server_lookup(struct lu_server_fld *fld,
-                      const struct lu_env *env,
-                      seqno_t seq, struct lu_seq_range *range)
+int fld_server_lookup(const struct lu_env *env, struct lu_server_fld *fld,
+                     seqno_t seq, struct lu_seq_range *range)
 {
         struct lu_seq_range *erange;
         struct fld_thread_info *info;
         int rc;
         ENTRY;
 
-        info = lu_context_key_get(&env->le_ctx, &fld_thread_key);
-        erange = &info->fti_lrange;
+       info = lu_context_key_get(&env->le_ctx, &fld_thread_key);
+       LASSERT(info != NULL);
+       erange = &info->fti_lrange;
 
         /* Lookup it in the cache. */
         rc = fld_cache_lookup(fld->lsf_cache, seq, erange);
         if (rc == 0) {
-                if (unlikely(erange->lsr_flags != range->lsr_flags)) {
+               if (unlikely(erange->lsr_flags != range->lsr_flags) &&
+                   range->lsr_flags != -1) {
                         CERROR("FLD cache found a range "DRANGE" doesn't "
                                "match the requested flag %x\n",
                                PRANGE(erange), range->lsr_flags);
@@ -294,31 +163,25 @@ int fld_server_lookup(struct lu_server_fld *fld,
                 RETURN(0);
         }
 
-        if (fld->lsf_obj) {
-                rc = fld_index_lookup(fld, env, seq, erange);
-                if (rc == 0) {
-                        if (unlikely(erange->lsr_flags != range->lsr_flags)) {
-                                CERROR("FLD found a range "DRANGE" doesn't "
-                                       "match the requested flag %x\n",
-                                       PRANGE(erange), range->lsr_flags);
-                                RETURN(-EIO);
-                        }
-                        *range = *erange;
-                }
-        } else {
-                LASSERT(fld->lsf_control_exp);
-                /* send request to mdt0 i.e. super seq. controller.
-                 * This is temporary solution, long term solution is fld
-                 * replication on all mdt servers.
-                 */
-                rc = fld_client_rpc(fld->lsf_control_exp,
-                                    range, FLD_LOOKUP);
-        }
-
-        if (rc == 0)
-                fld_cache_insert(fld->lsf_cache, range);
-
-        RETURN(rc);
+       if (fld->lsf_obj) {
+               /* On server side, all entries should be in cache.
+                * If we can not find it in cache, just return error */
+               CERROR("%s: Can not found the seq "LPX64"\n",
+                       fld->lsf_name, seq);
+               RETURN(-EIO);
+       } else {
+               LASSERT(fld->lsf_control_exp);
+               /* send request to mdt0 i.e. super seq. controller.
+                * This is temporary solution, long term solution is fld
+                * replication on all mdt servers.
+                */
+               range->lsr_start = seq;
+               rc = fld_client_rpc(fld->lsf_control_exp,
+                                   range, FLD_LOOKUP);
+               if (rc == 0)
+                       fld_cache_insert(fld->lsf_cache, range);
+       }
+       RETURN(rc);
 }
 EXPORT_SYMBOL(fld_server_lookup);
 
@@ -337,8 +200,7 @@ static int fld_server_handle(struct lu_server_fld *fld,
 
         switch (opc) {
         case FLD_LOOKUP:
-                rc = fld_server_lookup(fld, env,
-                                       range->lsr_start, range);
+               rc = fld_server_lookup(env, fld, range->lsr_start, range);
                 break;
         default:
                 rc = -EINVAL;
@@ -377,16 +239,17 @@ static int fld_req_handle(struct ptlrpc_request *req,
                         RETURN(err_serious(-EPROTO));
                 *out = *in;
 
-                /* For old 2.0 client, the 'lsr_flags' is uninitialized.
-                 * Set it as 'LU_SEQ_RANGE_MDT' by default.
-                 * Old 2.0 liblustre client cannot talk with new 2.1 server. */
-                if (!(exp->exp_connect_flags & OBD_CONNECT_64BITHASH) &&
-                    !exp->exp_libclient)
-                        out->lsr_flags = LU_SEQ_RANGE_MDT;
-
-                rc = fld_server_handle(lu_site2md(site)->ms_server_fld,
-                                       req->rq_svc_thread->t_env,
-                                       *opc, out, info);
+               /* For old 2.0 client, the 'lsr_flags' is uninitialized.
+                * Set it as 'LU_SEQ_RANGE_MDT' by default. */
+               if (!(exp_connect_flags(exp) & OBD_CONNECT_64BITHASH) &&
+                   !(exp_connect_flags(exp) & OBD_CONNECT_MDS_MDS) &&
+                   !(exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) &&
+                   !exp->exp_libclient)
+                       out->lsr_flags = LU_SEQ_RANGE_MDT;
+
+               rc = fld_server_handle(lu_site2seq(site)->ss_server_fld,
+                                      req->rq_svc_thread->t_env,
+                                      *opc, out, info);
         } else
                 rc = err_serious(-EPROTO);
 
@@ -446,26 +309,26 @@ EXPORT_SYMBOL(fld_query);
 int fid_is_local(const struct lu_env *env,
                  struct lu_site *site, const struct lu_fid *fid)
 {
-        int result;
-        struct md_site *msite;
-        struct lu_seq_range *range;
-        struct fld_thread_info *info;
-        ENTRY;
-
-        info = lu_context_key_get(&env->le_ctx, &fld_thread_key);
-        range = &info->fti_lrange;
-
-        result = 1; /* conservatively assume fid is local */
-        msite = lu_site2md(site);
-        if (msite->ms_client_fld != NULL) {
-                int rc;
-
-                rc = fld_cache_lookup(msite->ms_client_fld->lcf_cache,
-                                      fid_seq(fid), range);
-                if (rc == 0)
-                        result = (range->lsr_index == msite->ms_node_id);
-        }
-        return result;
+       int result;
+       struct seq_server_site *ss_site;
+       struct lu_seq_range *range;
+       struct fld_thread_info *info;
+       ENTRY;
+
+       info = lu_context_key_get(&env->le_ctx, &fld_thread_key);
+       range = &info->fti_lrange;
+
+       result = 1; /* conservatively assume fid is local */
+       ss_site = lu_site2seq(site);
+       if (ss_site->ss_client_fld != NULL) {
+               int rc;
+
+               rc = fld_cache_lookup(ss_site->ss_client_fld->lcf_cache,
+                                     fid_seq(fid), range);
+               if (rc == 0)
+                       result = (range->lsr_index == ss_site->ss_node_id);
+       }
+       return result;
 }
 EXPORT_SYMBOL(fid_is_local);
 
@@ -517,9 +380,9 @@ static void fld_server_proc_fini(struct lu_server_fld *fld)
 }
 #endif
 
-int fld_server_init(struct lu_server_fld *fld, struct dt_device *dt,
-                    const char *prefix, const struct lu_env *env,
-                    int mds_node_id)
+int fld_server_init(const struct lu_env *env, struct lu_server_fld *fld,
+                   struct dt_device *dt, const char *prefix, int mds_node_id,
+                   __u32 lsr_flags)
 {
         int cache_size, cache_threshold;
         struct lu_seq_range range;
@@ -544,8 +407,8 @@ int fld_server_init(struct lu_server_fld *fld, struct dt_device *dt,
                 GOTO(out, rc);
         }
 
-        if (!mds_node_id) {
-                rc = fld_index_init(fld, env, dt);
+       if (!mds_node_id && lsr_flags == LU_SEQ_RANGE_MDT) {
+               rc = fld_index_init(env, fld, dt);
                 if (rc)
                         GOTO(out, rc);
         } else
@@ -557,36 +420,37 @@ int fld_server_init(struct lu_server_fld *fld, struct dt_device *dt,
 
         fld->lsf_control_exp = NULL;
 
-        /* Insert reserved sequence number of ".lustre" into fld cache. */
-        range.lsr_start = FID_SEQ_DOT_LUSTRE;
-        range.lsr_end = FID_SEQ_DOT_LUSTRE + 1;
-        range.lsr_index = 0;
-        range.lsr_flags = LU_SEQ_RANGE_MDT;
-        fld_cache_insert(fld->lsf_cache, &range);
-
+       if (lsr_flags == LU_SEQ_RANGE_MDT) {
+               /* Insert reserved sequence of "ROOT" and ".lustre"
+                * into fld cache. */
+               range.lsr_start = FID_SEQ_LOCAL_FILE;
+               range.lsr_end = FID_SEQ_DOT_LUSTRE + 1;
+               range.lsr_index = 0;
+               range.lsr_flags = lsr_flags;
+               fld_cache_insert(fld->lsf_cache, &range);
+       }
         EXIT;
 out:
-        if (rc)
-                fld_server_fini(fld, env);
-        return rc;
+       if (rc)
+               fld_server_fini(env, fld);
+       return rc;
 }
 EXPORT_SYMBOL(fld_server_init);
 
-void fld_server_fini(struct lu_server_fld *fld,
-                     const struct lu_env *env)
+void fld_server_fini(const struct lu_env *env, struct lu_server_fld *fld)
 {
-        ENTRY;
+       ENTRY;
 
-        fld_server_proc_fini(fld);
-        fld_index_fini(fld, env);
+       fld_server_proc_fini(fld);
+       fld_index_fini(env, fld);
 
-        if (fld->lsf_cache != NULL) {
-                if (!IS_ERR(fld->lsf_cache))
-                        fld_cache_fini(fld->lsf_cache);
-                fld->lsf_cache = NULL;
-        }
+       if (fld->lsf_cache != NULL) {
+               if (!IS_ERR(fld->lsf_cache))
+                       fld_cache_fini(fld->lsf_cache);
+               fld->lsf_cache = NULL;
+       }
 
-        EXIT;
+       EXIT;
 }
 EXPORT_SYMBOL(fld_server_fini);