Whamcloud - gitweb
LU-1303 fld: verify support for range lookups
[fs/lustre-release.git] / lustre / fld / fld_handler.c
index 2b6ab12..1d28b21 100644 (file)
@@ -1,6 +1,4 @@
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
+/*
  * GPL HEADER START
  *
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  * GPL HEADER END
  */
 /*
- * Copyright  2008 Sun Microsystems, Inc. All rights reserved
+ * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
+ *
+ * Copyright (c) 2011, 2012, Whamcloud, Inc.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -42,9 +42,6 @@
  * Author: Pravin Shelar <pravin.shelar@sun.com>
  */
 
-#ifndef EXPORT_SYMTAB
-# define EXPORT_SYMTAB
-#endif
 #define DEBUG_SUBSYSTEM S_FLD
 
 #ifdef __KERNEL__
@@ -67,6 +64,7 @@
 #include <lustre_fid.h>
 #include <lustre_req_layout.h>
 #include "fld_internal.h"
+#include <lustre_fid.h>
 
 #ifdef __KERNEL__
 
@@ -110,6 +108,35 @@ static void __exit fld_mod_exit(void)
         }
 }
 
+/**
+ * Declare the FLD index updates that fld_server_create() may perform.
+ *
+ * Declares, in transaction \a th, two index deletes and one index insert
+ * on the FLD index object. Two deletes are declared because
+ * fld_server_create() may remove one existing range when merging with the
+ * previous range and another when merging with the next range, before it
+ * updates the resulting entry.
+ *
+ * \param[in] fld       FLD server instance; \a lsf_obj is the index object
+ * \param[in] env       execution environment passed to the index methods
+ * \param[in] th        transaction handle the operations are declared in
+ *
+ * \retval 0            all declarations succeeded, or nothing was declared
+ *                      because \a lsf_no_range_lookup is set
+ * \retval non-zero     value returned by the first failing declaration
+ */
+int fld_declare_server_create(struct lu_server_fld *fld,
+                              const struct lu_env *env,
+                              struct thandle *th)
+{
+        struct dt_object *dt_obj = fld->lsf_obj;
+        int rc;
+
+        ENTRY;
+
+        if (fld->lsf_no_range_lookup) {
+                /* Stub for underlying FS which can't lookup ranges.
+                 * Leave through RETURN() so that the exit trace stays
+                 * paired with the ENTRY above. */
+                RETURN(0);
+        }
+
+        /* For the ldiskfs OSD it is enough to declare the operations
+         * without a key/value (NULL is passed here); with DMU we'll
+         * probably need to specify the exact key/value. */
+
+        /* first delete: range merged in from the left (previous range) */
+        rc = dt_obj->do_index_ops->dio_declare_delete(env, dt_obj, NULL, th);
+        if (rc)
+                GOTO(out, rc);
+
+        /* second delete: range merged in from the right (next range) */
+        rc = dt_obj->do_index_ops->dio_declare_delete(env, dt_obj, NULL, th);
+        if (rc)
+                GOTO(out, rc);
+
+        /* insert of the resulting (possibly merged) range */
+        rc = dt_obj->do_index_ops->dio_declare_insert(env, dt_obj,
+                                                      NULL, NULL, th);
+out:
+        RETURN(rc);
+}
+EXPORT_SYMBOL(fld_declare_server_create);
+
 /**
  * Insert FLD index entry and update FLD cache.
  *
@@ -134,7 +161,7 @@ int fld_server_create(struct lu_server_fld *fld,
         ENTRY;
 
         info = lu_context_key_get(&env->le_ctx, &fld_thread_key);
-        mutex_lock(&fld->lsf_lock);
+        cfs_mutex_lock(&fld->lsf_lock);
 
         erange = &info->fti_lrange;
         new = &info->fti_irange;
@@ -142,20 +169,18 @@ int fld_server_create(struct lu_server_fld *fld,
 
         /* STEP 1: try to merge with previous range */
         rc = fld_index_lookup(fld, env, new->lsr_start, erange);
-        if (!rc) {
-                /* in case of range overlap, mdt ID must be same for both ranges */
-                if (new->lsr_mdt != erange->lsr_mdt) {
-                        CERROR("mdt[%x] for given range is different from"
-                               "existing overlapping range mdt[%x]\n",
-                                new->lsr_mdt, erange->lsr_mdt);
-                        rc = -EIO;
-                        GOTO(out, rc);
+        if (rc == 0) {
+                /* in case of range overlap, the location must be same */
+                if (range_compare_loc(new, erange) != 0) {
+                        CERROR("the start of given range "DRANGE" conflict to"
+                               "an existing range "DRANGE"\n",
+                               PRANGE(new), PRANGE(erange));
+                        GOTO(out, rc = -EIO);
                 }
 
                 if (new->lsr_end < erange->lsr_end)
                         GOTO(out, rc);
                 do_merge = 1;
-
         } else if (rc == -ENOENT) {
                 /* check for merge case: optimizes for single mds lustre.
                  * As entry does not exist, returned entry must be left side
@@ -163,7 +188,7 @@ int fld_server_create(struct lu_server_fld *fld,
                  * So try to merge from left.
                  */
                 if (new->lsr_start == erange->lsr_end &&
-                    new->lsr_mdt == erange->lsr_mdt)
+                    range_compare_loc(new, erange) == 0)
                         do_merge = 1;
         } else {
                 /* no overlap allowed in fld, so failure in lookup is error */
@@ -171,53 +196,62 @@ int fld_server_create(struct lu_server_fld *fld,
         }
 
         if (do_merge) {
-                /* new range can be combined with existing one.
-                 * So delete existing range.
-                 */
-
+                /* new range will be merged with the existing one.
+                 * delete this range at first. */
                 rc = fld_index_delete(fld, env, erange, th);
-                if (rc == 0) {
-                        new->lsr_start = min(erange->lsr_start, new->lsr_start);
-                        new->lsr_end = max(erange->lsr_end, new->lsr_end);
-                } else
+                if (rc != 0)
                         GOTO(out, rc);
 
+                new->lsr_start = min(erange->lsr_start, new->lsr_start);
+                new->lsr_end = max(erange->lsr_end, new->lsr_end);
                 do_merge = 0;
         }
 
         /* STEP 2: try to merge with next range */
         rc = fld_index_lookup(fld, env, new->lsr_end, erange);
-        if (!rc) {
-                /* case range overlap: with right side entry. */
-                if (new->lsr_mdt == erange->lsr_mdt)
-                        do_merge = 1;
+        if (rc == 0) {
+                /* found a matched range, meaning we're either
+                 * overlapping or adjacent, must merge with it. */
+                do_merge = 1;
         } else if (rc == -ENOENT) {
                 /* this range is left of new range end point */
                 LASSERT(erange->lsr_end <= new->lsr_end);
-
-                if (new->lsr_end == erange->lsr_end)
-                        do_merge = 1;
-                if (new->lsr_start <= erange->lsr_start)
+                /*
+                 * the found left range must be either:
+                 *  1. within the new range.
+                 *  2. left of the new range (no overlap).
+                 * because if they were partly overlapping, STEP 1 must
+                 * have already removed this range.
+                 */
+                LASSERTF(erange->lsr_start > new->lsr_start ||
+                         erange->lsr_end < new->lsr_start ||
+                         (erange->lsr_end == new->lsr_start &&
+                          range_compare_loc(new, erange) != 0),
+                         "left "DRANGE", new "DRANGE"\n",
+                         PRANGE(erange), PRANGE(new));
+
+                /* if it's within the new range, merge it */
+                if (erange->lsr_start > new->lsr_start)
                         do_merge = 1;
-        } else
+        } else {
                GOTO(out, rc);
+        }
 
         if (do_merge) {
-                if (new->lsr_mdt != erange->lsr_mdt) {
-                        CERROR("mdt[%x] for given range is different from"
-                               "existing overlapping range mdt[%x]\n",
-                                new->lsr_mdt, erange->lsr_mdt);
-                        rc = -EIO;
-                        GOTO(out, rc);
+                if (range_compare_loc(new, erange) != 0) {
+                        CERROR("the end of given range "DRANGE" overlaps "
+                               "with an existing range "DRANGE"\n",
+                               PRANGE(new), PRANGE(erange));
+                        GOTO(out, rc = -EIO);
                 }
-        
+
                 /* merge with next range */
                 rc = fld_index_delete(fld, env, erange, th);
-                if (rc == 0) {
-                        new->lsr_start = min(erange->lsr_start, new->lsr_start);
-                        new->lsr_end = max(erange->lsr_end, new->lsr_end);
-                } else
+                if (rc != 0)
                         GOTO(out, rc);
+
+                new->lsr_start = min(erange->lsr_start, new->lsr_start);
+                new->lsr_end = max(erange->lsr_end, new->lsr_end);
         }
 
         /* now update fld entry. */
@@ -228,7 +262,7 @@ out:
         if (rc == 0)
                 fld_cache_insert(fld->lsf_cache, new);
 
-        mutex_unlock(&fld->lsf_lock);
+        cfs_mutex_unlock(&fld->lsf_lock);
 
         CDEBUG((rc != 0 ? D_ERROR : D_INFO),
                "%s: FLD create: given range : "DRANGE
@@ -237,7 +271,6 @@ out:
 
         RETURN(rc);
 }
-
 EXPORT_SYMBOL(fld_server_create);
 
 /**
@@ -252,17 +285,39 @@ int fld_server_lookup(struct lu_server_fld *fld,
                       const struct lu_env *env,
                       seqno_t seq, struct lu_seq_range *range)
 {
+        struct lu_seq_range *erange;
+        struct fld_thread_info *info;
         int rc;
         ENTRY;
 
+        info = lu_context_key_get(&env->le_ctx, &fld_thread_key);
+        erange = &info->fti_lrange;
+
         /* Lookup it in the cache. */
-        rc = fld_cache_lookup(fld->lsf_cache, seq, range);
-        if (rc == 0)
+        rc = fld_cache_lookup(fld->lsf_cache, seq, erange);
+        if (rc == 0) {
+                if (unlikely(erange->lsr_flags != range->lsr_flags)) {
+                        CERROR("FLD cache found a range "DRANGE" doesn't "
+                               "match the requested flag %x\n",
+                               PRANGE(erange), range->lsr_flags);
+                        RETURN(-EIO);
+                }
+                *range = *erange;
                 RETURN(0);
+        }
 
-        if (fld->lsf_obj)
-                rc = fld_index_lookup(fld, env, seq, range);
-        else {
+        if (fld->lsf_obj) {
+                rc = fld_index_lookup(fld, env, seq, erange);
+                if (rc == 0) {
+                        if (unlikely(erange->lsr_flags != range->lsr_flags)) {
+                                CERROR("FLD found a range "DRANGE" doesn't "
+                                       "match the requested flag %x\n",
+                                       PRANGE(erange), range->lsr_flags);
+                                RETURN(-EIO);
+                        }
+                        *range = *erange;
+                }
+        } else {
                 LASSERT(fld->lsf_control_exp);
                 /* send request to mdt0 i.e. super seq. controller.
                  * This is temporary solution, long term solution is fld
@@ -304,7 +359,7 @@ static int fld_server_handle(struct lu_server_fld *fld,
 
         CDEBUG(D_INFO, "%s: FLD req handle: error %d (opc: %d, range: "
                DRANGE"\n", fld->lsf_name, rc, opc, PRANGE(range));
-        
+
         RETURN(rc);
 
 }
@@ -312,15 +367,14 @@ static int fld_server_handle(struct lu_server_fld *fld,
 static int fld_req_handle(struct ptlrpc_request *req,
                           struct fld_thread_info *info)
 {
-        struct lu_site *site;
+        struct obd_export *exp = req->rq_export;
+        struct lu_site *site = exp->exp_obd->obd_lu_dev->ld_site;
         struct lu_seq_range *in;
         struct lu_seq_range *out;
         int rc;
         __u32 *opc;
         ENTRY;
 
-        site = req->rq_export->exp_obd->obd_lu_dev->ld_site;
-
         rc = req_capsule_server_pack(info->fti_pill);
         if (rc)
                 RETURN(err_serious(rc));
@@ -335,6 +389,13 @@ static int fld_req_handle(struct ptlrpc_request *req,
                         RETURN(err_serious(-EPROTO));
                 *out = *in;
 
+                /* For old 2.0 client, the 'lsr_flags' is uninitialized.
+                 * Set it as 'LU_SEQ_RANGE_MDT' by default.
+                 * Old 2.0 liblustre client cannot talk with new 2.1 server. */
+                if (!(exp->exp_connect_flags & OBD_CONNECT_64BITHASH) &&
+                    !exp->exp_libclient)
+                        out->lsr_flags = LU_SEQ_RANGE_MDT;
+
                 rc = fld_server_handle(lu_site2md(site)->ms_server_fld,
                                        req->rq_svc_thread->t_env,
                                        *opc, out, info);
@@ -414,7 +475,7 @@ int fid_is_local(const struct lu_env *env,
                 rc = fld_cache_lookup(msite->ms_client_fld->lcf_cache,
                                       fid_seq(fid), range);
                 if (rc == 0)
-                        result = (range->lsr_mdt == msite->ms_node_id);
+                        result = (range->lsr_index == msite->ms_node_id);
         }
         return result;
 }
@@ -436,7 +497,14 @@ static int fld_server_proc_init(struct lu_server_fld *fld)
                 RETURN(rc);
         }
 
-        RETURN(rc);
+       rc = lprocfs_seq_create(fld->lsf_proc_dir, "fldb", 0444,
+                               &fld_proc_seq_fops, fld);
+       if (rc) {
+               lprocfs_remove(&fld->lsf_proc_dir);
+               fld->lsf_proc_dir = NULL;
+       }
+
+       RETURN(rc);
 }
 
 static void fld_server_proc_fini(struct lu_server_fld *fld)
@@ -466,6 +534,7 @@ int fld_server_init(struct lu_server_fld *fld, struct dt_device *dt,
                     int mds_node_id)
 {
         int cache_size, cache_threshold;
+        struct lu_seq_range range;
         int rc;
         ENTRY;
 
@@ -478,7 +547,7 @@ int fld_server_init(struct lu_server_fld *fld, struct dt_device *dt,
         cache_threshold = cache_size *
                 FLD_SERVER_CACHE_THRESHOLD / 100;
 
-        mutex_init(&fld->lsf_lock);
+        cfs_mutex_init(&fld->lsf_lock);
         fld->lsf_cache = fld_cache_init(fld->lsf_name,
                                         cache_size, cache_threshold);
         if (IS_ERR(fld->lsf_cache)) {
@@ -499,6 +568,14 @@ int fld_server_init(struct lu_server_fld *fld, struct dt_device *dt,
                 GOTO(out, rc);
 
         fld->lsf_control_exp = NULL;
+
+        /* Insert reserved sequence number of ".lustre" into fld cache. */
+        range.lsr_start = FID_SEQ_DOT_LUSTRE;
+        range.lsr_end = FID_SEQ_DOT_LUSTRE + 1;
+        range.lsr_index = 0;
+        range.lsr_flags = LU_SEQ_RANGE_MDT;
+        fld_cache_insert(fld->lsf_cache, &range);
+
         EXIT;
 out:
         if (rc)