Whamcloud - gitweb
LU-2240 mds: Assign special fid sequence to root.
[fs/lustre-release.git] / lustre / fld / fld_handler.c
index a5809bc..7fe5d7e 100644 (file)
@@ -1,6 +1,4 @@
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
+/*
  * GPL HEADER START
  *
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  * GPL HEADER END
  */
 /*
- * Copyright  2008 Sun Microsystems, Inc. All rights reserved
+ * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
+ *
+ * Copyright (c) 2011, 2013, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
  *
  * Author: Yury Umanets <umka@clusterfs.com>
  * Author: WangDi <wangdi@clusterfs.com>
+ * Author: Pravin Shelar <pravin.shelar@sun.com>
  */
 
-#ifndef EXPORT_SYMTAB
-# define EXPORT_SYMTAB
-#endif
 #define DEBUG_SUBSYSTEM S_FLD
 
 #ifdef __KERNEL__
@@ -66,6 +64,7 @@
 #include <lustre_fid.h>
 #include <lustre_req_layout.h>
 #include "fld_internal.h"
+#include <lustre_fid.h>
 
 #ifdef __KERNEL__
 
 LU_KEY_INIT_FINI(fld, struct fld_thread_info);
 
 /* context key: fld_thread_key */
-LU_CONTEXT_KEY_DEFINE(fld, LCT_MD_THREAD|LCT_DT_THREAD);
+LU_CONTEXT_KEY_DEFINE(fld, LCT_MD_THREAD | LCT_DT_THREAD | LCT_MG_THREAD);
 
 cfs_proc_dir_entry_t *fld_type_proc_dir = NULL;
 
-static struct lu_local_obj_desc llod_fld_index = {
-        .llod_name      = fld_index_name,
-        .llod_oid       = FLD_INDEX_OID,
-        .llod_is_index  = 1,
-        .llod_feat      = &fld_index_features,
-};
-
 static int __init fld_mod_init(void)
 {
         fld_type_proc_dir = lprocfs_register(LUSTRE_FLD_NAME,
@@ -92,8 +84,6 @@ static int __init fld_mod_init(void)
         if (IS_ERR(fld_type_proc_dir))
                 return PTR_ERR(fld_type_proc_dir);
 
-        llo_local_obj_register(&llod_fld_index);
-
         LU_CONTEXT_KEY_INIT(&fld_thread_key);
         lu_context_key_register(&fld_thread_key);
         return 0;
@@ -108,107 +98,118 @@ static void __exit fld_mod_exit(void)
         }
 }
 
-/* Insert index entry and update cache. */
-int fld_server_create(struct lu_server_fld *fld,
-                      const struct lu_env *env,
-                      seqno_t seq, mdsno_t mds)
+int fld_declare_server_create(const struct lu_env *env,
+                             struct lu_server_fld *fld,
+                             struct lu_seq_range *range,
+                             struct thandle *th)
 {
-        int rc;
-        ENTRY;
-        
-        rc = fld_index_create(fld, env, seq, mds);
-        
-        if (rc == 0) {
-                /*
-                 * Do not return result of calling fld_cache_insert()
-                 * here. First of all because it may return -EEXISTS. Another
-                 * reason is that, we do not want to stop proceeding even after
-                 * cache errors.
-                 */
-                fld_cache_insert(fld->lsf_cache, seq, mds);
-        }
+       int rc;
 
-        RETURN(rc);
+       rc = fld_declare_index_create(env, fld, range, th);
+       RETURN(rc);
 }
-EXPORT_SYMBOL(fld_server_create);
+EXPORT_SYMBOL(fld_declare_server_create);
 
-/* Delete index entry. */
-int fld_server_delete(struct lu_server_fld *fld,
-                      const struct lu_env *env,
-                      seqno_t seq)
+/**
+ * Insert FLD index entry and update FLD cache.
+ *
+ * This function is called from the sequence allocator when a super-sequence
+ * is granted to a server.
+ */
+int fld_server_create(const struct lu_env *env, struct lu_server_fld *fld,
+                     struct lu_seq_range *range, struct thandle *th)
 {
-        int rc;
-        ENTRY;
+       int rc;
 
-        fld_cache_delete(fld->lsf_cache, seq);
-        rc = fld_index_delete(fld, env, seq);
-        
-        RETURN(rc);
+       mutex_lock(&fld->lsf_lock);
+       rc = fld_index_create(env, fld, range, th);
+       mutex_unlock(&fld->lsf_lock);
+
+       RETURN(rc);
 }
-EXPORT_SYMBOL(fld_server_delete);
+EXPORT_SYMBOL(fld_server_create);
 
-/* Lookup mds by seq. */
-int fld_server_lookup(struct lu_server_fld *fld,
-                      const struct lu_env *env,
-                      seqno_t seq, mdsno_t *mds)
+/**
+ *  Lookup mds by seq, returns a range for given seq.
+ *
+ *  If that entry is not cached in fld cache, request is sent to super
+ *  sequence controller node (MDT0). All other MDT[1...N] and client
+ *  cache fld entries, but this cache is not persistent.
+ */
+
+int fld_server_lookup(const struct lu_env *env, struct lu_server_fld *fld,
+                     seqno_t seq, struct lu_seq_range *range)
 {
+        struct lu_seq_range *erange;
+        struct fld_thread_info *info;
         int rc;
         ENTRY;
-        
-        /* Lookup it in the cache. */
-        rc = fld_cache_lookup(fld->lsf_cache, seq, mds);
-        if (rc == 0)
-                RETURN(0);
 
-        rc = fld_index_lookup(fld, env, seq, mds);
+       info = lu_context_key_get(&env->le_ctx, &fld_thread_key);
+       LASSERT(info != NULL);
+       erange = &info->fti_lrange;
+
+        /* Lookup it in the cache. */
+        rc = fld_cache_lookup(fld->lsf_cache, seq, erange);
         if (rc == 0) {
-                /*
-                 * Do not return error here as well. See previous comment in
-                 * same situation in function fld_server_create().
-                 */
-                fld_cache_insert(fld->lsf_cache, seq, *mds);
+               if (unlikely(erange->lsr_flags != range->lsr_flags) &&
+                   range->lsr_flags != -1) {
+                        CERROR("FLD cache found a range "DRANGE" doesn't "
+                               "match the requested flag %x\n",
+                               PRANGE(erange), range->lsr_flags);
+                        RETURN(-EIO);
+                }
+                *range = *erange;
+                RETURN(0);
         }
-        RETURN(rc);
+
+       if (fld->lsf_obj) {
+               /* On server side, all entries should be in cache.
+                * If we can not find it in cache, just return error */
+               CERROR("%s: Can not found the seq "LPX64"\n",
+                       fld->lsf_name, seq);
+               RETURN(-EIO);
+       } else {
+               LASSERT(fld->lsf_control_exp);
+               /* send request to mdt0 i.e. super seq. controller.
+                * This is temporary solution, long term solution is fld
+                * replication on all mdt servers.
+                */
+               range->lsr_start = seq;
+               rc = fld_client_rpc(fld->lsf_control_exp,
+                                   range, FLD_LOOKUP);
+               if (rc == 0)
+                       fld_cache_insert(fld->lsf_cache, range);
+       }
+       RETURN(rc);
 }
 EXPORT_SYMBOL(fld_server_lookup);
 
+/**
+ * All MDT server handle fld lookup operation. But only MDT0 has fld index.
+ * if entry is not found in cache we need to forward lookup request to MDT0
+ */
+
 static int fld_server_handle(struct lu_server_fld *fld,
                              const struct lu_env *env,
-                             __u32 opc, struct md_fld *mf,
+                             __u32 opc, struct lu_seq_range *range,
                              struct fld_thread_info *info)
 {
         int rc;
         ENTRY;
 
         switch (opc) {
-        case FLD_CREATE:
-                rc = fld_server_create(fld, env,
-                                       mf->mf_seq, mf->mf_mds);
-
-                /* Do not return -EEXIST error for resent case */
-                if ((info->fti_flags & MSG_RESENT) && rc == -EEXIST)
-                        rc = 0;
-                break;
-        case FLD_DELETE:
-                rc = fld_server_delete(fld, env, mf->mf_seq);
-
-                /* Do not return -ENOENT error for resent case */
-                if ((info->fti_flags & MSG_RESENT) && rc == -ENOENT)
-                        rc = 0;
-                break;
         case FLD_LOOKUP:
-                rc = fld_server_lookup(fld, env,
-                                       mf->mf_seq, &mf->mf_mds);
+               rc = fld_server_lookup(env, fld, range->lsr_start, range);
                 break;
         default:
                 rc = -EINVAL;
                 break;
         }
 
-        CDEBUG(D_INFO, "%s: FLD req handle: error %d (opc: %d, seq: "
-               LPX64", mds: "LPU64")\n", fld->lsf_name, rc, opc,
-               mf->mf_seq, mf->mf_mds);
-        
+        CDEBUG(D_INFO, "%s: FLD req handle: error %d (opc: %d, range: "
+               DRANGE"\n", fld->lsf_name, rc, opc, PRANGE(range));
+
         RETURN(rc);
 
 }
@@ -216,15 +217,14 @@ static int fld_server_handle(struct lu_server_fld *fld,
 static int fld_req_handle(struct ptlrpc_request *req,
                           struct fld_thread_info *info)
 {
-        struct lu_site *site;
-        struct md_fld *in;
-        struct md_fld *out;
+        struct obd_export *exp = req->rq_export;
+        struct lu_site *site = exp->exp_obd->obd_lu_dev->ld_site;
+        struct lu_seq_range *in;
+        struct lu_seq_range *out;
         int rc;
         __u32 *opc;
         ENTRY;
 
-        site = req->rq_export->exp_obd->obd_lu_dev->ld_site;
-
         rc = req_capsule_server_pack(info->fti_pill);
         if (rc)
                 RETURN(err_serious(rc));
@@ -239,9 +239,17 @@ static int fld_req_handle(struct ptlrpc_request *req,
                         RETURN(err_serious(-EPROTO));
                 *out = *in;
 
-                rc = fld_server_handle(lu_site2md(site)->ms_server_fld,
-                                       req->rq_svc_thread->t_env,
-                                       *opc, out, info);
+               /* For old 2.0 client, the 'lsr_flags' is uninitialized.
+                * Set it as 'LU_SEQ_RANGE_MDT' by default. */
+               if (!(exp_connect_flags(exp) & OBD_CONNECT_64BITHASH) &&
+                   !(exp_connect_flags(exp) & OBD_CONNECT_MDS_MDS) &&
+                   !(exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) &&
+                   !exp->exp_libclient)
+                       out->lsr_flags = LU_SEQ_RANGE_MDT;
+
+               rc = fld_server_handle(lu_site2seq(site)->ss_server_fld,
+                                      req->rq_svc_thread->t_env,
+                                      *opc, out, info);
         } else
                 rc = err_serious(-EPROTO);
 
@@ -251,8 +259,6 @@ static int fld_req_handle(struct ptlrpc_request *req,
 static void fld_thread_info_init(struct ptlrpc_request *req,
                                  struct fld_thread_info *info)
 {
-        info->fti_flags = lustre_msg_get_flags(req->rq_reqmsg);
-
         info->fti_pill = &req->rq_pill;
         /* Init request capsule. */
         req_capsule_init(info->fti_pill, req, RCL_SERVER);
@@ -300,23 +306,29 @@ EXPORT_SYMBOL(fld_query);
  *
  * fid_is_local() is supposed to be used in assertion checks only.
  */
-int fid_is_local(struct lu_site *site, const struct lu_fid *fid)
+int fid_is_local(const struct lu_env *env,
+                 struct lu_site *site, const struct lu_fid *fid)
 {
-        int result;
-        struct md_site *msite;
-
-        result = 1; /* conservatively assume fid is local */
-        msite = lu_site2md(site);
-        if (msite->ms_client_fld != NULL) {
-                mdsno_t mds;
-                int rc;
-
-                rc = fld_cache_lookup(msite->ms_client_fld->lcf_cache,
-                                      fid_seq(fid), &mds);
-                if (rc == 0)
-                        result = (mds == msite->ms_node_id);
-        }
-        return result;
+       int result;
+       struct seq_server_site *ss_site;
+       struct lu_seq_range *range;
+       struct fld_thread_info *info;
+       ENTRY;
+
+       info = lu_context_key_get(&env->le_ctx, &fld_thread_key);
+       range = &info->fti_lrange;
+
+       result = 1; /* conservatively assume fid is local */
+       ss_site = lu_site2seq(site);
+       if (ss_site->ss_client_fld != NULL) {
+               int rc;
+
+               rc = fld_cache_lookup(ss_site->ss_client_fld->lcf_cache,
+                                     fid_seq(fid), range);
+               if (rc == 0)
+                       result = (range->lsr_index == ss_site->ss_node_id);
+       }
+       return result;
 }
 EXPORT_SYMBOL(fid_is_local);
 
@@ -336,7 +348,14 @@ static int fld_server_proc_init(struct lu_server_fld *fld)
                 RETURN(rc);
         }
 
-        RETURN(rc);
+       rc = lprocfs_seq_create(fld->lsf_proc_dir, "fldb", 0444,
+                               &fld_proc_seq_fops, fld);
+       if (rc) {
+               lprocfs_remove(&fld->lsf_proc_dir);
+               fld->lsf_proc_dir = NULL;
+       }
+
+       RETURN(rc);
 }
 
 static void fld_server_proc_fini(struct lu_server_fld *fld)
@@ -361,10 +380,12 @@ static void fld_server_proc_fini(struct lu_server_fld *fld)
 }
 #endif
 
-int fld_server_init(struct lu_server_fld *fld, struct dt_device *dt,
-                    const char *prefix, const struct lu_env *env)
+int fld_server_init(const struct lu_env *env, struct lu_server_fld *fld,
+                   struct dt_device *dt, const char *prefix, int mds_node_id,
+                   __u32 lsr_flags)
 {
         int cache_size, cache_threshold;
+        struct lu_seq_range range;
         int rc;
         ENTRY;
 
@@ -377,8 +398,8 @@ int fld_server_init(struct lu_server_fld *fld, struct dt_device *dt,
         cache_threshold = cache_size *
                 FLD_SERVER_CACHE_THRESHOLD / 100;
 
+       mutex_init(&fld->lsf_lock);
         fld->lsf_cache = fld_cache_init(fld->lsf_name,
-                                        FLD_SERVER_HTABLE_SIZE,
                                         cache_size, cache_threshold);
         if (IS_ERR(fld->lsf_cache)) {
                 rc = PTR_ERR(fld->lsf_cache);
@@ -386,37 +407,50 @@ int fld_server_init(struct lu_server_fld *fld, struct dt_device *dt,
                 GOTO(out, rc);
         }
 
-        rc = fld_index_init(fld, env, dt);
-        if (rc)
-                GOTO(out, rc);
+       if (!mds_node_id && lsr_flags == LU_SEQ_RANGE_MDT) {
+               rc = fld_index_init(env, fld, dt);
+                if (rc)
+                        GOTO(out, rc);
+        } else
+                fld->lsf_obj = NULL;
 
         rc = fld_server_proc_init(fld);
         if (rc)
                 GOTO(out, rc);
 
+        fld->lsf_control_exp = NULL;
+
+       if (lsr_flags == LU_SEQ_RANGE_MDT) {
+               /* Insert reserved sequence of "ROOT" and ".lustre"
+                * into fld cache. */
+               range.lsr_start = FID_SEQ_LOCAL_FILE;
+               range.lsr_end = FID_SEQ_DOT_LUSTRE + 1;
+               range.lsr_index = 0;
+               range.lsr_flags = lsr_flags;
+               fld_cache_insert(fld->lsf_cache, &range);
+       }
         EXIT;
 out:
-        if (rc)
-                fld_server_fini(fld, env);
-        return rc;
+       if (rc)
+               fld_server_fini(env, fld);
+       return rc;
 }
 EXPORT_SYMBOL(fld_server_init);
 
-void fld_server_fini(struct lu_server_fld *fld,
-                     const struct lu_env *env)
+void fld_server_fini(const struct lu_env *env, struct lu_server_fld *fld)
 {
-        ENTRY;
+       ENTRY;
 
-        fld_server_proc_fini(fld);
-        fld_index_fini(fld, env);
+       fld_server_proc_fini(fld);
+       fld_index_fini(env, fld);
 
-        if (fld->lsf_cache != NULL) {
-                if (!IS_ERR(fld->lsf_cache))
-                        fld_cache_fini(fld->lsf_cache);
-                fld->lsf_cache = NULL;
-        }
+       if (fld->lsf_cache != NULL) {
+               if (!IS_ERR(fld->lsf_cache))
+                       fld_cache_fini(fld->lsf_cache);
+               fld->lsf_cache = NULL;
+       }
 
-        EXIT;
+       EXIT;
 }
 EXPORT_SYMBOL(fld_server_fini);