Whamcloud - gitweb
b=20748
[fs/lustre-release.git] / lustre / fld / fld_handler.c
index ec52b5c..5092ac1 100644 (file)
@@ -1,31 +1,47 @@
-/* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
- *  lustre/fld/fld_handler.c
- *  FLD (Fids Location Database)
+ * GPL HEADER START
  *
- *  Copyright (C) 2006 Cluster File Systems, Inc.
- *   Author: Yury Umanets <umka@clusterfs.com>
- *           WangDi <wangdi@clusterfs.com>
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
- *   This file is part of the Lustre file system, http://www.lustre.org
- *   Lustre is a trademark of Cluster File Systems, Inc.
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
  *
- *   You may have signed or agreed to another license before downloading
- *   this software.  If so, you are bound by the terms and conditions
- *   of that agreement, and the following does not apply to you.  See the
- *   LICENSE file included with this distribution for more information.
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
  *
- *   If you did not agree to a different license, then this copy of Lustre
- *   is open source software; you can redistribute it and/or modify it
- *   under the terms of version 2 of the GNU General Public License as
- *   published by the Free Software Foundation.
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
  *
- *   In either case, Lustre is distributed in the hope that it will be
- *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
- *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- *   license text for more details.
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright  2008 Sun Microsystems, Inc. All rights reserved
+ * Use is subject to license terms.
  */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
+ *
+ * lustre/fld/fld_handler.c
+ *
+ * FLD (Fids Location Database)
+ *
+ * Author: Yury Umanets <umka@clusterfs.com>
+ * Author: WangDi <wangdi@clusterfs.com>
+ * Author: Pravin Shelar <pravin.shelar@sun.com>
+ */
+
 #ifndef EXPORT_SYMTAB
 # define EXPORT_SYMTAB
 #endif
 #include <lprocfs_status.h>
 
 #include <md_object.h>
+#include <lustre_fid.h>
 #include <lustre_req_layout.h>
 #include "fld_internal.h"
+#include <lustre_fid.h>
 
 #ifdef __KERNEL__
 
+/* context key constructor/destructor: fld_key_init, fld_key_fini */
 LU_KEY_INIT_FINI(fld, struct fld_thread_info);
 
+/* context key: fld_thread_key */
 LU_CONTEXT_KEY_DEFINE(fld, LCT_MD_THREAD|LCT_DT_THREAD);
 
 cfs_proc_dir_entry_t *fld_type_proc_dir = NULL;
 
+/*
+ * Local object descriptor for the FLD index: carries its name
+ * (fld_index_name), its object id (FLD_INDEX_OID), the "is an index" flag
+ * and a pointer to its index features.  It is registered with
+ * llo_local_obj_register() in fld_mod_init() and unregistered in
+ * fld_mod_exit().
+ */
+static struct lu_local_obj_desc llod_fld_index = {
+        .llod_name      = fld_index_name,
+        .llod_oid       = FLD_INDEX_OID,
+        .llod_is_index  = 1,
+        .llod_feat      = &fld_index_features,
+};
+
 static int __init fld_mod_init(void)
 {
         fld_type_proc_dir = lprocfs_register(LUSTRE_FLD_NAME,
@@ -67,6 +94,8 @@ static int __init fld_mod_init(void)
         if (IS_ERR(fld_type_proc_dir))
                 return PTR_ERR(fld_type_proc_dir);
 
+        llo_local_obj_register(&llod_fld_index);
+
         LU_CONTEXT_KEY_INIT(&fld_thread_key);
         lu_context_key_register(&fld_thread_key);
         return 0;
@@ -74,6 +103,7 @@ static int __init fld_mod_init(void)
 
 static void __exit fld_mod_exit(void)
 {
+        llo_local_obj_unregister(&llod_fld_index);
         lu_context_key_degister(&fld_thread_key);
         if (fld_type_proc_dir != NULL && !IS_ERR(fld_type_proc_dir)) {
                 lprocfs_remove(&fld_type_proc_dir);
@@ -81,106 +111,200 @@ static void __exit fld_mod_exit(void)
         }
 }
 
-/* Insert index entry and update cache. */
+/**
+ * Insert FLD index entry and update FLD cache.
+ *
+ * First it tries to merge the given range with the existing ranges found at
+ * its start and at its end, then updates the FLD index and the FLD cache
+ * accordingly. FLD index consistency is maintained by this function;
+ * fld->lsf_lock is held for the whole operation.
+ * This function is called from the sequence allocator when a super-sequence
+ * is granted to a server.
+ *
+ * \param fld        server FLD whose index and cache are updated
+ * \param env        execution environment; the per-thread fld_thread_info
+ *                   taken from it provides the scratch ranges used here
+ * \param add_range  range to add; it is copied and not modified
+ * \param th         transaction handle handed to the index delete/create calls
+ *
+ * \retval 0         success; the resulting range is also inserted in the cache
+ * \retval -EIO      an MDT id mismatch was detected against an existing range
+ * \retval negative  any other error returned by index lookup/delete/create
+ */
+
 int fld_server_create(struct lu_server_fld *fld,
                       const struct lu_env *env,
-                      seqno_t seq, mdsno_t mds)
+                      struct lu_seq_range *add_range,
+                      struct thandle *th)
 {
-        int rc;
+        struct lu_seq_range *erange;
+        struct lu_seq_range *new;
+        struct fld_thread_info *info;
+        int rc = 0;
+        int do_merge = 0;
+
         ENTRY;
-        
-        rc = fld_index_create(fld, env, seq, mds);
-        
-        if (rc == 0) {
-                /*
-                 * Do not return result of calling fld_cache_insert()
-                 * here. First of all because it may return -EEXISTS. Another
-                 * reason is that, we do not want to stop proceeding even after
-                 * cache errors.
+
+        info = lu_context_key_get(&env->le_ctx, &fld_thread_key);
+        mutex_lock(&fld->lsf_lock);
+
+        /* work on per-thread scratch copies so add_range stays untouched */
+        erange = &info->fti_lrange;
+        new = &info->fti_irange;
+        *new = *add_range;
+
+        /* STEP 1: try to merge with previous range */
+        rc = fld_index_lookup(fld, env, new->lsr_start, erange);
+        if (!rc) {
+                /* in case of range overlap, mdt ID must be same for both ranges */
+                if (new->lsr_mdt != erange->lsr_mdt) {
+                        CERROR("mdt[%x] for given range is different from "
+                               "existing overlapping range mdt[%x]\n",
+                                new->lsr_mdt, erange->lsr_mdt);
+                        rc = -EIO;
+                        GOTO(out, rc);
+                }
+
+                /* new range ends inside the existing one: nothing to add,
+                 * leave with rc == 0 */
+                if (new->lsr_end < erange->lsr_end)
+                        GOTO(out, rc);
+                do_merge = 1;
+
+        } else if (rc == -ENOENT) {
+                /* check for merge case: optimizes for single mds lustre.
+                 * As entry does not exist, returned entry must be left side
+                 * entry compared to start of new range (ref dio_lookup()).
+                 * So try to merge from left.
                  */
-                fld_cache_insert(fld->lsf_cache, seq, mds);
+                if (new->lsr_start == erange->lsr_end &&
+                    new->lsr_mdt == erange->lsr_mdt)
+                        do_merge = 1;
+        } else {
+                /* no overlap allowed in fld, so failure in lookup is error */
+                GOTO(out, rc);
         }
 
-        RETURN(rc);
-}
-EXPORT_SYMBOL(fld_server_create);
+        if (do_merge) {
+                /* new range can be combined with existing one.
+                 * So delete existing range.
+                 */
 
-/* Delete index entry. */
-int fld_server_delete(struct lu_server_fld *fld,
-                      const struct lu_env *env,
-                      seqno_t seq)
-{
-        int rc;
-        ENTRY;
+                rc = fld_index_delete(fld, env, erange, th);
+                if (rc == 0) {
+                        new->lsr_start = min(erange->lsr_start, new->lsr_start);
+                        new->lsr_end = max(erange->lsr_end, new->lsr_end);
+                } else
+                        GOTO(out, rc);
 
-        fld_cache_delete(fld->lsf_cache, seq);
-        rc = fld_index_delete(fld, env, seq);
+                do_merge = 0;
+        }
+
+        /* STEP 2: try to merge with next range */
+        rc = fld_index_lookup(fld, env, new->lsr_end, erange);
+        if (!rc) {
+                /* case range overlap: with right side entry. */
+                if (new->lsr_mdt == erange->lsr_mdt)
+                        do_merge = 1;
+        } else if (rc == -ENOENT) {
+                /* this range is left of new range end point */
+                LASSERT(erange->lsr_end <= new->lsr_end);
+
+                if (new->lsr_end == erange->lsr_end)
+                        do_merge = 1;
+                if (new->lsr_start <= erange->lsr_start)
+                        do_merge = 1;
+        } else
+                GOTO(out, rc);
+
+        if (do_merge) {
+                if (new->lsr_mdt != erange->lsr_mdt) {
+                        CERROR("mdt[%x] for given range is different from "
+                               "existing overlapping range mdt[%x]\n",
+                                new->lsr_mdt, erange->lsr_mdt);
+                        rc = -EIO;
+                        GOTO(out, rc);
+                }
         
+                /* merge with next range */
+                rc = fld_index_delete(fld, env, erange, th);
+                if (rc == 0) {
+                        new->lsr_start = min(erange->lsr_start, new->lsr_start);
+                        new->lsr_end = max(erange->lsr_end, new->lsr_end);
+                } else
+                        GOTO(out, rc);
+        }
+
+        /* now update fld entry. */
+        rc = fld_index_create(fld, env, new, th);
+
+        LASSERT(rc != -EEXIST);
+out:
+        if (rc == 0)
+                fld_cache_insert(fld->lsf_cache, new);
+
+        mutex_unlock(&fld->lsf_lock);
+
+        /* adjacent string literals are concatenated as-is, so the separator
+         * before "after merge" has to be spelled out */
+        CDEBUG((rc != 0 ? D_ERROR : D_INFO),
+               "%s: FLD create: given range : "DRANGE
+               " after merge "DRANGE" rc = %d \n", fld->lsf_name,
+                PRANGE(add_range), PRANGE(new), rc);
+
         RETURN(rc);
 }
-EXPORT_SYMBOL(fld_server_delete);
 
-/* Lookup mds by seq. */
+EXPORT_SYMBOL(fld_server_create);
+
+/**
+ *  Lookup mds by seq, returns a range for given seq.
+ *
+ *  The FLD cache is consulted first. On a miss, the local FLD index is used
+ *  when this server has one (fld->lsf_obj is set); otherwise the request is
+ *  sent to the super sequence controller node (MDT0). All other MDT[1...N]
+ *  and clients cache fld entries, but this cache is not persistent.
+ *  A range obtained from the index or from the RPC is inserted in the cache.
+ *
+ *  \param fld    server FLD to search
+ *  \param env    execution environment passed to the index lookup
+ *  \param seq    sequence number to look up
+ *  \param range  filled with the range found for \a seq
+ *
+ *  \retval 0         success
+ *  \retval negative  error returned by the index lookup or by the RPC
+ */
+
 int fld_server_lookup(struct lu_server_fld *fld,
                       const struct lu_env *env,
-                      seqno_t seq, mdsno_t *mds)
+                      seqno_t seq, struct lu_seq_range *range)
 {
         int rc;
         ENTRY;
-        
+
         /* Lookup it in the cache. */
-        rc = fld_cache_lookup(fld->lsf_cache, seq, mds);
+        rc = fld_cache_lookup(fld->lsf_cache, seq, range);
         if (rc == 0)
                 RETURN(0);
 
-        rc = fld_index_lookup(fld, env, seq, mds);
-        if (rc == 0) {
-                /*
-                 * Do not return error here as well. See previous comment in
-                 * same situation in function fld_server_create().
+        if (fld->lsf_obj)
+                rc = fld_index_lookup(fld, env, seq, range);
+        else {
+                LASSERT(fld->lsf_control_exp);
+                /* send request to mdt0 i.e. super seq. controller.
+                 * This is temporary solution, long term solution is fld
+                 * replication on all mdt servers.
                  */
-                fld_cache_insert(fld->lsf_cache, seq, *mds);
+                /* fld_client_rpc() takes no seq argument; the FLD_LOOKUP
+                 * handler in this file reads the sequence from lsr_start,
+                 * so put it there before sending.
+                 */
+                range->lsr_start = seq;
+                rc = fld_client_rpc(fld->lsf_control_exp,
+                                    range, FLD_LOOKUP);
         }
+
+        if (rc == 0)
+                fld_cache_insert(fld->lsf_cache, range);
+
         RETURN(rc);
 }
 EXPORT_SYMBOL(fld_server_lookup);
 
+/**
+ * All MDT servers handle the fld lookup operation, but only MDT0 has the fld
+ * index. If an entry is not found in the cache, the lookup request is
+ * forwarded to MDT0.
+ */
+
 static int fld_server_handle(struct lu_server_fld *fld,
                              const struct lu_env *env,
-                             __u32 opc, struct md_fld *mf,
+                             __u32 opc, struct lu_seq_range *range,
                              struct fld_thread_info *info)
 {
         int rc;
         ENTRY;
 
         switch (opc) {
-        case FLD_CREATE:
-                rc = fld_server_create(fld, env,
-                                       mf->mf_seq, mf->mf_mds);
-
-                /* Do not return -EEXIST error for resent case */
-                if ((info->fti_flags & MSG_RESENT) && rc == -EEXIST)
-                        rc = 0;
-                break;
-        case FLD_DELETE:
-                rc = fld_server_delete(fld, env, mf->mf_seq);
-
-                /* Do not return -ENOENT error for resent case */
-                if ((info->fti_flags & MSG_RESENT) && rc == -ENOENT)
-                        rc = 0;
-                break;
         case FLD_LOOKUP:
                 rc = fld_server_lookup(fld, env,
-                                       mf->mf_seq, &mf->mf_mds);
+                                       range->lsr_start, range);
                 break;
         default:
                 rc = -EINVAL;
                 break;
         }
 
-        CDEBUG(D_INFO, "%s: FLD req handle: error %d (opc: %d, seq: "
-               LPX64", mds: "LPU64")\n", fld->lsf_name, rc, opc,
-               mf->mf_seq, mf->mf_mds);
+        CDEBUG(D_INFO, "%s: FLD req handle: error %d (opc: %d, range: "
+               DRANGE"\n", fld->lsf_name, rc, opc, PRANGE(range));
         
         RETURN(rc);
 
@@ -190,29 +314,29 @@ static int fld_req_handle(struct ptlrpc_request *req,
                           struct fld_thread_info *info)
 {
         struct lu_site *site;
-        struct md_fld *in;
-        struct md_fld *out;
+        struct lu_seq_range *in;
+        struct lu_seq_range *out;
         int rc;
         __u32 *opc;
         ENTRY;
 
         site = req->rq_export->exp_obd->obd_lu_dev->ld_site;
 
-        rc = req_capsule_pack(&info->fti_pill);
+        rc = req_capsule_server_pack(info->fti_pill);
         if (rc)
                 RETURN(err_serious(rc));
 
-        opc = req_capsule_client_get(&info->fti_pill, &RMF_FLD_OPC);
+        opc = req_capsule_client_get(info->fti_pill, &RMF_FLD_OPC);
         if (opc != NULL) {
-                in = req_capsule_client_get(&info->fti_pill, &RMF_FLD_MDFLD);
+                in = req_capsule_client_get(info->fti_pill, &RMF_FLD_MDFLD);
                 if (in == NULL)
                         RETURN(err_serious(-EPROTO));
-                out = req_capsule_server_get(&info->fti_pill, &RMF_FLD_MDFLD);
+                out = req_capsule_server_get(info->fti_pill, &RMF_FLD_MDFLD);
                 if (out == NULL)
                         RETURN(err_serious(-EPROTO));
                 *out = *in;
 
-                rc = fld_server_handle(site->ls_server_fld,
+                rc = fld_server_handle(lu_site2md(site)->ms_server_fld,
                                        req->rq_svc_thread->t_env,
                                        *opc, out, info);
         } else
@@ -224,24 +348,15 @@ static int fld_req_handle(struct ptlrpc_request *req,
 static void fld_thread_info_init(struct ptlrpc_request *req,
                                  struct fld_thread_info *info)
 {
-        int i;
-
-        info->fti_flags = lustre_msg_get_flags(req->rq_reqmsg);
-
-        /* Mark rep buffer as req-layout stuff expects. */
-        for (i = 0; i < ARRAY_SIZE(info->fti_rep_buf_size); i++)
-                info->fti_rep_buf_size[i] = -1;
-
+        /* Point the thread info at the capsule embedded in the request
+         * (req->rq_pill); fti_pill is a pointer after this change.
+         */
+        info->fti_pill = &req->rq_pill;
         /* Init request capsule. */
-        req_capsule_init(&info->fti_pill, req, RCL_SERVER,
-                         info->fti_rep_buf_size);
-
-        req_capsule_set(&info->fti_pill, &RQF_FLD_QUERY);
+        req_capsule_init(info->fti_pill, req, RCL_SERVER);
+        /* Select the FLD query request format for this capsule. */
+        req_capsule_set(info->fti_pill, &RQF_FLD_QUERY);
 }
 
 static void fld_thread_info_fini(struct fld_thread_info *info)
 {
-        req_capsule_fini(&info->fti_pill);
+        /* fti_pill is the pointer set up by fld_thread_info_init() */
+        req_capsule_fini(info->fti_pill);
 }
 
 static int fld_handle(struct ptlrpc_request *req)
@@ -268,7 +383,7 @@ static int fld_handle(struct ptlrpc_request *req)
  */
 int fld_query(struct com_thread_info *info)
 {
-        return fld_handle(info->cti_pill.rc_req);
+        /* cti_pill is dereferenced as a pointer here; the request it wraps
+         * is handed to fld_handle() and that result is returned unchanged.
+         */
+        return fld_handle(info->cti_pill->rc_req);
 }
 EXPORT_SYMBOL(fld_query);
 
@@ -280,19 +395,27 @@ EXPORT_SYMBOL(fld_query);
  *
  * fid_is_local() is supposed to be used in assertion checks only.
  */
-int fid_is_local(struct lu_site *site, const struct lu_fid *fid)
+int fid_is_local(const struct lu_env *env,
+                 struct lu_site *site, const struct lu_fid *fid)
 {
         int result;
+        struct md_site *msite;
+        struct lu_seq_range *range;
+        struct fld_thread_info *info;
+        ENTRY;
+
+        /* per-thread scratch range that receives the cache lookup result */
+        info = lu_context_key_get(&env->le_ctx, &fld_thread_key);
+        range = &info->fti_lrange;
 
         result = 1; /* conservatively assume fid is local */
-        if (site->ls_client_fld != NULL) {
-                mdsno_t mds;
+        msite = lu_site2md(site);
+        if (msite->ms_client_fld != NULL) {
                 int rc;
 
-                rc = fld_cache_lookup(site->ls_client_fld->lcf_cache,
-                                      fid_seq(fid), &mds);
+                rc = fld_cache_lookup(msite->ms_client_fld->lcf_cache,
+                                      fid_seq(fid), range);
                 if (rc == 0)
-                        result = (mds == site->ls_node_id);
+                        result = (range->lsr_mdt == msite->ms_node_id);
         }
-        return result;
+        /* leave through RETURN so the ENTRY above has a matching exit */
+        RETURN(result);
 }
@@ -340,9 +463,11 @@ static void fld_server_proc_fini(struct lu_server_fld *fld)
 #endif
 
 int fld_server_init(struct lu_server_fld *fld, struct dt_device *dt,
-                    const char *prefix, const struct lu_env *env)
+                    const char *prefix, const struct lu_env *env,
+                    int mds_node_id)
 {
         int cache_size, cache_threshold;
+        struct lu_seq_range range;
         int rc;
         ENTRY;
 
@@ -355,8 +480,8 @@ int fld_server_init(struct lu_server_fld *fld, struct dt_device *dt,
         cache_threshold = cache_size *
                 FLD_SERVER_CACHE_THRESHOLD / 100;
 
+        mutex_init(&fld->lsf_lock);
         fld->lsf_cache = fld_cache_init(fld->lsf_name,
-                                        FLD_SERVER_HTABLE_SIZE,
                                         cache_size, cache_threshold);
         if (IS_ERR(fld->lsf_cache)) {
                 rc = PTR_ERR(fld->lsf_cache);
@@ -364,14 +489,25 @@ int fld_server_init(struct lu_server_fld *fld, struct dt_device *dt,
                 GOTO(out, rc);
         }
 
-        rc = fld_index_init(fld, env, dt);
-        if (rc)
-                GOTO(out, rc);
+        if (!mds_node_id) {
+                rc = fld_index_init(fld, env, dt);
+                if (rc)
+                        GOTO(out, rc);
+        } else
+                fld->lsf_obj = NULL;
 
         rc = fld_server_proc_init(fld);
         if (rc)
                 GOTO(out, rc);
 
+        fld->lsf_control_exp = NULL;
+
+        /* Insert reserved sequence number of ".lustre" into fld cache. */
+        range.lsr_start = LU_DOT_LUSTRE_SEQ;
+        range.lsr_end = LU_DOT_LUSTRE_SEQ + 1;
+        range.lsr_mdt = 0;
+        fld_cache_insert(fld->lsf_cache, &range);
+
         EXIT;
 out:
         if (rc)
@@ -398,7 +534,7 @@ void fld_server_fini(struct lu_server_fld *fld,
 }
 EXPORT_SYMBOL(fld_server_fini);
 
-MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
+MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
 MODULE_DESCRIPTION("Lustre FLD");
 MODULE_LICENSE("GPL");