Whamcloud - gitweb
- cleanups in seq-mgr, move cache statistics to lu_cache;
authoryury <yury>
Tue, 17 Oct 2006 14:12:21 +0000 (14:12 +0000)
committeryury <yury>
Tue, 17 Oct 2006 14:12:21 +0000 (14:12 +0000)
- remove needless locks in fld server and client.

lustre/fid/fid_internal.h
lustre/fid/fid_request.c
lustre/fld/fld_cache.c
lustre/fld/fld_handler.c
lustre/fld/fld_request.c
lustre/include/lustre/lustre_idl.h
lustre/include/lustre_fid.h
lustre/include/lustre_fld.h
lustre/llite/llite_fid.c
lustre/mdd/mdd_handler.c

index b88445a..73d5959 100644 (file)
@@ -46,6 +46,15 @@ struct seq_thread_info {
 
 extern struct lu_context_key seq_thread_key;
 
+/* Functions used internally in module. */
+int seq_client_alloc_super(struct lu_client_seq *seq,
+                           const struct lu_env *env);
+
+int seq_client_replay_super(struct lu_client_seq *seq,
+                            struct lu_range *range,
+                            const struct lu_env *env);
+
+/* Store API functions. */
 int seq_store_init(struct lu_server_seq *seq,
                    const struct lu_env *env,
                    struct dt_device *dt);
index 180d8fe..c5a79e9 100644 (file)
 #include <lustre_mdc.h>
 #include "fid_internal.h"
 
-static int seq_client_rpc(struct lu_client_seq *seq,
-                          struct lu_range *input,
-                          struct lu_range *output,
-                          __u32 opc, const char *opcname)
+static int seq_client_rpc(struct lu_client_seq *seq, struct lu_range *input,
+                          struct lu_range *output, __u32 opc,
+                          const char *opcname)
 {
         int rc, size[3] = { sizeof(struct ptlrpc_body),
                             sizeof(__u32),
@@ -64,10 +63,8 @@ static int seq_client_rpc(struct lu_client_seq *seq,
         __u32 *op;
         ENTRY;
 
-        req = ptlrpc_prep_req(class_exp2cliimp(exp),
-                             LUSTRE_MDS_VERSION,
-                              SEQ_QUERY, 3, size,
-                              NULL);
+        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
+                              SEQ_QUERY, 3, size, NULL);
         if (req == NULL)
                 RETURN(-ENOMEM);
 
@@ -117,11 +114,6 @@ static int seq_client_rpc(struct lu_client_seq *seq,
                        DRANGE"]\n", seq->lcs_name, PRANGE(output));
                 GOTO(out_req, rc = -EINVAL);
         }
-
-        /* 
-         * Save server response to request for recovery case, it will be sent to
-         * server later if needed.
-         */
         *in = *out;
 
         CDEBUG(D_INFO, "%s: Allocated %s-sequence "DRANGE"]\n",
@@ -135,47 +127,15 @@ out_req:
 }
 
 /* Request sequence-controller node to allocate new super-sequence. */
-static int __seq_client_alloc_super(struct lu_client_seq *seq,
-                                    const struct lu_env *env)
-{
-        int rc;
-
-#ifdef __KERNEL__
-        if (seq->lcs_srv) {
-                LASSERT(env != NULL);
-                rc = seq_server_alloc_super(seq->lcs_srv, NULL,
-                                            &seq->lcs_space, env);
-        } else {
-#endif
-                rc = seq_client_rpc(seq, NULL, &seq->lcs_space,
-                                    SEQ_ALLOC_SUPER, "super");
-#ifdef __KERNEL__
-        }
-#endif
-        return rc;
-}
-
-int seq_client_alloc_super(struct lu_client_seq *seq,
-                           const struct lu_env *env)
+int seq_client_replay_super(struct lu_client_seq *seq,
+                            struct lu_range *range,
+                            const struct lu_env *env)
 {
         int rc;
         ENTRY;
 
         down(&seq->lcs_sem);
-        rc = __seq_client_alloc_super(seq, env);
-        up(&seq->lcs_sem);
-
-        RETURN(rc);
-}
-EXPORT_SYMBOL(seq_client_alloc_super);
-
-/* Request sequence-controller node to allocate new super-sequence. */
-static int __seq_client_replay_super(struct lu_client_seq *seq,
-                                     struct lu_range *range,
-                                     const struct lu_env *env)
-{
-        int rc = 0;
-
+        
 #ifdef __KERNEL__
         if (seq->lcs_srv) {
                 LASSERT(env != NULL);
@@ -183,47 +143,35 @@ static int __seq_client_replay_super(struct lu_client_seq *seq,
                                             &seq->lcs_space, env);
         } else {
 #endif
-#if 0
-                /* 
-                 * XXX: Seems we do not need to replay in case of remote
-                 * controller. Lustre anyway supports onlu signle failure
-                 * recovery.
-                 */
                 rc = seq_client_rpc(seq, range, &seq->lcs_space,
                                     SEQ_ALLOC_SUPER, "super");
-#endif
 #ifdef __KERNEL__
         }
 #endif
-        return rc;
+        up(&seq->lcs_sem);
+        RETURN(rc);
 }
 
-int seq_client_replay_super(struct lu_client_seq *seq,
-                            struct lu_range *range,
-                            const struct lu_env *env)
+/* Request sequence-controller node to allocate new super-sequence. */
+int seq_client_alloc_super(struct lu_client_seq *seq,
+                           const struct lu_env *env)
 {
-        int rc;
         ENTRY;
-
-        down(&seq->lcs_sem);
-        rc = __seq_client_replay_super(seq, range, env);
-        up(&seq->lcs_sem);
-
-        RETURN(rc);
+        RETURN(seq_client_replay_super(seq, NULL, env));
 }
 
 /* Request sequence-controller node to allocate new meta-sequence. */
-static int __seq_client_alloc_meta(struct lu_client_seq *seq,
-                                   const struct lu_env *env)
+static int seq_client_alloc_meta(struct lu_client_seq *seq,
+                                 const struct lu_env *env)
 {
         int rc;
+        ENTRY;
 
 #ifdef __KERNEL__
         if (seq->lcs_srv) {
                 LASSERT(env != NULL);
                 rc = seq_server_alloc_meta(seq->lcs_srv, NULL,
-                                           &seq->lcs_space,
-                                           env);
+                                           &seq->lcs_space, env);
         } else {
 #endif
                 rc = seq_client_rpc(seq, NULL, &seq->lcs_space,
@@ -231,37 +179,19 @@ static int __seq_client_alloc_meta(struct lu_client_seq *seq,
 #ifdef __KERNEL__
         }
 #endif
-        return rc;
-}
-
-int seq_client_alloc_meta(struct lu_client_seq *seq,
-                          const struct lu_env *env)
-{
-        int rc;
-        ENTRY;
-
-        down(&seq->lcs_sem);
-        rc = __seq_client_alloc_meta(seq, env);
-        up(&seq->lcs_sem);
-
         RETURN(rc);
 }
-EXPORT_SYMBOL(seq_client_alloc_meta);
 
-/* allocate new sequence for client (llite or MDC are expected to use this) */
-static int __seq_client_alloc_seq(struct lu_client_seq *seq, seqno_t *seqnr)
+/* Allocate new sequence for client. */
+static int seq_client_alloc_seq(struct lu_client_seq *seq, seqno_t *seqnr)
 {
-        int rc = 0;
+        int rc;
         ENTRY;
 
         LASSERT(range_is_sane(&seq->lcs_space));
 
-        /*
-         * If we still have free sequences in meta-sequence we allocate new seq
-         * from given range, if not - allocate new meta-sequence.
-         */
-        if (range_space(&seq->lcs_space) == 0) {
-                rc = __seq_client_alloc_meta(seq, NULL);
+        if (range_is_exhausted(&seq->lcs_space)) {
+                rc = seq_client_alloc_meta(seq, NULL);
                 if (rc) {
                         CERROR("%s: Can't allocate new meta-sequence, "
                                "rc %d\n", seq->lcs_name, rc);
@@ -270,55 +200,47 @@ static int __seq_client_alloc_seq(struct lu_client_seq *seq, seqno_t *seqnr)
                         CDEBUG(D_INFO, "%s: New range - "DRANGE"\n",
                                seq->lcs_name, PRANGE(&seq->lcs_space));
                 }
+        } else {
+                rc = 0;
         }
 
-        LASSERT(range_space(&seq->lcs_space) > 0);
+        LASSERT(!range_is_exhausted(&seq->lcs_space));
         *seqnr = seq->lcs_space.lr_start;
-        seq->lcs_space.lr_start++;
-
-        CDEBUG(D_INFO, "%s: Allocated sequence ["LPX64"]\n",
-               seq->lcs_name, *seqnr);
-        RETURN(rc);
-}
-
-int seq_client_alloc_seq(struct lu_client_seq *seq, seqno_t *seqnr)
-{
-        int rc = 0;
-        ENTRY;
-
-        down(&seq->lcs_sem);
-        rc = __seq_client_alloc_seq(seq, seqnr);
-        up(&seq->lcs_sem);
+        seq->lcs_space.lr_start += 1;
 
+        CDEBUG(D_INFO, "%s: Allocated sequence ["LPX64"]\n", seq->lcs_name,
+               *seqnr);
+        
         RETURN(rc);
 }
-EXPORT_SYMBOL(seq_client_alloc_seq);
 
+/* Allocate new fid on passed client @seq and save it to @fid. */
 int seq_client_alloc_fid(struct lu_client_seq *seq, struct lu_fid *fid)
 {
         int rc;
         ENTRY;
 
+        LASSERT(seq != NULL);
         LASSERT(fid != NULL);
 
         down(&seq->lcs_sem);
 
-        if (!fid_is_sane(&seq->lcs_fid) ||
+        if (fid_is_zero(&seq->lcs_fid) ||
             fid_oid(&seq->lcs_fid) >= seq->lcs_width)
         {
                 seqno_t seqnr;
 
-                /*
-                 * Allocate new sequence for case client has no sequence at all
-                 * or sequence is exhausted and should be switched.
-                 */
-                rc = __seq_client_alloc_seq(seq, &seqnr);
+                rc = seq_client_alloc_seq(seq, &seqnr);
                 if (rc) {
                         CERROR("%s: Can't allocate new sequence, "
                                "rc %d\n", seq->lcs_name, rc);
-                        GOTO(out, rc);
+                        up(&seq->lcs_sem);
+                        RETURN(rc);
                 }
 
+                CDEBUG(D_INFO|D_WARNING, "%s: Switch to sequence "
+                       "[0x%16.16"LPF64"x]\n", seq->lcs_name, seqnr);
+
                 seq->lcs_fid.f_oid = LUSTRE_FID_INIT_OID;
                 seq->lcs_fid.f_seq = seqnr;
                 seq->lcs_fid.f_ver = 0;
@@ -328,24 +250,17 @@ int seq_client_alloc_fid(struct lu_client_seq *seq, struct lu_fid *fid)
                  * to setup FLD for it.
                  */
                 rc = 1;
-
-                CDEBUG(D_INFO|D_WARNING, "%s: Switch to sequence "
-                       "[0x%16.16"LPF64"x]\n", seq->lcs_name, seqnr);
         } else {
-                seq->lcs_fid.f_oid++;
+                /* Just bump last allocated fid and return to caller. */
+                seq->lcs_fid.f_oid += 1;
                 rc = 0;
         }
-
+        
         *fid = seq->lcs_fid;
-        LASSERT(fid_is_sane(fid));
-
-        CDEBUG(D_INFO, "%s: Allocated FID "DFID"\n", seq->lcs_name,
-               PFID(fid));
-
-        EXIT;
-out:
         up(&seq->lcs_sem);
-        return rc;
+
+        CDEBUG(D_INFO, "%s: Allocated FID "DFID"\n", seq->lcs_name,  PFID(fid));
+        RETURN(rc);
 }
 EXPORT_SYMBOL(seq_client_alloc_fid);
 
index 16eaab6..5d4d347 100644 (file)
@@ -120,7 +120,8 @@ struct fld_cache *fld_cache_init(const char *name, int hash_size,
 
         for (i = 0; i < hash_size; i++)
                 INIT_HLIST_HEAD(&cache->fci_hash_table[i]);
-
+        memset(&cache->fci_stat, 0, sizeof(cache->fci_stat));
+        
         CDEBUG(D_INFO|D_WARNING, "%s: FLD cache - Size: %d, Threshold: %d\n", 
                cache->fci_name, cache_size, cache_threshold);
 
@@ -130,11 +131,24 @@ EXPORT_SYMBOL(fld_cache_init);
 
 void fld_cache_fini(struct fld_cache *cache)
 {
+        __u64 pct;
         ENTRY;
 
         LASSERT(cache != NULL);
         fld_cache_flush(cache);
 
+        if (cache->fci_stat.fst_count > 0) {
+                pct = cache->fci_stat.fst_cache * 100;
+                do_div(pct, cache->fci_stat.fst_count);
+        } else {
+                pct = 0;
+        }
+
+        printk("FLD cache statistics (%s):\n", cache->fci_name);
+        printk("  Total reqs: "LPU64"\n", cache->fci_stat.fst_count);
+        printk("  Cache reqs: "LPU64"\n", cache->fci_stat.fst_cache);
+        printk("  Cache hits: "LPU64"%%\n", pct);
+        
        OBD_FREE(cache->fci_hash_table, cache->fci_hash_size *
                 sizeof(*cache->fci_hash_table));
        OBD_FREE_PTR(cache);
@@ -286,11 +300,13 @@ int fld_cache_lookup(struct fld_cache *cache,
         bucket = fld_cache_bucket(cache, seq);
 
         spin_lock(&cache->fci_lock);
+        cache->fci_stat.fst_count++;
         hlist_for_each_entry_safe(flde, scan, n, bucket, fce_list) {
                 if (flde->fce_seq == seq) {
                         *mds = flde->fce_mds;
                         list_del(&flde->fce_lru);
                         list_add(&flde->fce_lru, &cache->fci_lru);
+                        cache->fci_stat.fst_cache++;
                         spin_unlock(&cache->fci_lock);
                         RETURN(0);
                 }
index 0956bd0..7fafc6e 100644 (file)
@@ -152,14 +152,10 @@ int fld_server_lookup(struct lu_server_fld *fld,
         int rc;
         ENTRY;
         
-        fld->lsf_stat.fst_count++;
-        
         /* Lookup it in the cache. */
         rc = fld_cache_lookup(fld->lsf_cache, seq, mds);
-        if (rc == 0) {
-                fld->lsf_stat.fst_cache++;
+        if (rc == 0)
                 RETURN(0);
-        }
 
         rc = fld_index_lookup(fld, env, seq, mds);
         if (rc == 0) {
@@ -181,8 +177,6 @@ static int fld_server_handle(struct lu_server_fld *fld,
         int rc;
         ENTRY;
 
-        down(&fld->lsf_sem);
-        
         switch (opc) {
         case FLD_CREATE:
                 rc = fld_server_create(fld, env,
@@ -208,8 +202,6 @@ static int fld_server_handle(struct lu_server_fld *fld,
                 break;
         }
 
-        up(&fld->lsf_sem);
-
         CDEBUG(D_INFO, "%s: FLD req handle: error %d (opc: %d, seq: "
                LPX64", mds: "LPU64")\n", fld->lsf_name, rc, opc,
                mf->mf_seq, mf->mf_mds);
@@ -378,12 +370,9 @@ int fld_server_init(struct lu_server_fld *fld, struct dt_device *dt,
         int rc;
         ENTRY;
 
-        memset(&fld->lsf_stat, 0, sizeof(fld->lsf_stat));
         snprintf(fld->lsf_name, sizeof(fld->lsf_name),
                  "srv-%s", prefix);
 
-        sema_init(&fld->lsf_sem, 1);
-        
         cache_size = FLD_SERVER_CACHE_SIZE /
                 sizeof(struct fld_cache_entry);
 
@@ -418,21 +407,8 @@ EXPORT_SYMBOL(fld_server_init);
 void fld_server_fini(struct lu_server_fld *fld,
                      const struct lu_env *env)
 {
-        __u64 pct;
         ENTRY;
 
-        if (fld->lsf_stat.fst_count > 0) {
-                pct = fld->lsf_stat.fst_cache * 100;
-                do_div(pct, fld->lsf_stat.fst_count);
-        } else {
-                pct = 0;
-        }
-
-        printk("FLD cache statistics (%s):\n", fld->lsf_name);
-        printk("  Total reqs: "LPU64"\n", fld->lsf_stat.fst_count);
-        printk("  Cache reqs: "LPU64"\n", fld->lsf_stat.fst_cache);
-        printk("  Cache hits: "LPU64"%%\n", pct);
-
         fld_server_proc_fini(fld);
         fld_index_fini(fld, env);
 
index d1e0bff..2c6a3ed 100644 (file)
@@ -298,7 +298,6 @@ int fld_client_init(struct lu_client_fld *fld,
 
         LASSERT(fld != NULL);
 
-        memset(&fld->lcf_stat, 0, sizeof(fld->lcf_stat));
         snprintf(fld->lcf_name, sizeof(fld->lcf_name),
                  "cli-%s", prefix);
 
@@ -310,7 +309,6 @@ int fld_client_init(struct lu_client_fld *fld,
 
         fld->lcf_count = 0;
         spin_lock_init(&fld->lcf_lock);
-        sema_init(&fld->lcf_sem, 1);
         fld->lcf_hash = &fld_hash[hash];
         INIT_LIST_HEAD(&fld->lcf_targets);
 
@@ -349,21 +347,8 @@ EXPORT_SYMBOL(fld_client_init);
 void fld_client_fini(struct lu_client_fld *fld)
 {
         struct lu_fld_target *target, *tmp;
-        __u64 pct;
         ENTRY;
 
-        if (fld->lcf_stat.fst_count > 0) {
-                pct = fld->lcf_stat.fst_cache * 100;
-                do_div(pct, fld->lcf_stat.fst_count);
-        } else {
-                pct = 0;
-        }
-
-        printk("FLD cache statistics (%s):\n", fld->lcf_name);
-        printk("  Total reqs: "LPU64"\n", fld->lcf_stat.fst_count);
-        printk("  Cache reqs: "LPU64"\n", fld->lcf_stat.fst_cache);
-        printk("  Cache hits: "LPU64"%%\n", pct);
-        
         fld_client_proc_fini(fld);
 
         spin_lock(&fld->lcf_lock);
@@ -452,8 +437,6 @@ int fld_client_create(struct lu_client_fld *fld,
         int rc;
         ENTRY;
 
-        down(&fld->lcf_sem);
-        
         target = fld_client_get_target(fld, seq);
         LASSERT(target != NULL);
 
@@ -486,7 +469,6 @@ int fld_client_create(struct lu_client_fld *fld,
                 CERROR("%s: Can't create FLD entry, rc %d\n",
                        fld->lcf_name, rc);
         }
-        up(&fld->lcf_sem);
         
         RETURN(rc);
 }
@@ -500,8 +482,6 @@ int fld_client_delete(struct lu_client_fld *fld, seqno_t seq,
         int rc;
         ENTRY;
 
-        down(&fld->lcf_sem);
-        
         fld_cache_delete(fld->lcf_cache, seq);
 
         target = fld_client_get_target(fld, seq);
@@ -524,7 +504,6 @@ int fld_client_delete(struct lu_client_fld *fld, seqno_t seq,
         }
 #endif
 
-        up(&fld->lcf_sem);
         RETURN(rc);
 }
 EXPORT_SYMBOL(fld_client_delete);
@@ -538,17 +517,10 @@ int fld_client_lookup(struct lu_client_fld *fld,
         int rc;
         ENTRY;
 
-        down(&fld->lcf_sem);
-
-        fld->lcf_stat.fst_count++;
-                
         /* Lookup it in the cache */
         rc = fld_cache_lookup(fld->lcf_cache, seq, mds);
-        if (rc == 0) {
-                fld->lcf_stat.fst_cache++;
-                up(&fld->lcf_sem);
+        if (rc == 0)
                 RETURN(0);
-        }
 
         /* Can not find it in the cache */
         target = fld_client_get_target(fld, seq);
@@ -579,7 +551,6 @@ int fld_client_lookup(struct lu_client_fld *fld,
                  */
                 fld_cache_insert(fld->lcf_cache, seq, *mds);
         }
-        up(&fld->lcf_sem);
         RETURN(rc);
 }
 EXPORT_SYMBOL(fld_client_lookup);
index c872f9f..1fddc38 100644 (file)
@@ -241,6 +241,11 @@ static inline int fid_is_sane(const struct lu_fid *fid)
                 fid_seq_is_sane(fid_seq(fid)) && fid_oid(fid) != 0;
 }
 
+static inline int fid_is_zero(const struct lu_fid *fid)
+{
+        return fid_seq(fid) == 0 && fid_oid(fid) == 0;
+}
+
 #define DFID "[0x%16.16"LPF64"x/0x%8.8x:0x%8.8x]"
 
 #define PFID(fid)     \
index 9a70219..febb8b9 100644 (file)
@@ -76,7 +76,7 @@ struct lu_server_seq;
 
 /* Client sequence manager interface. */
 struct lu_client_seq {
-        /* sequence-controller export. */
+        /* Sequence-controller export. */
         struct obd_export      *lcs_exp;
         struct semaphore        lcs_sem;
 
@@ -108,7 +108,7 @@ struct lu_client_seq {
          */
         __u64                   lcs_width;
 
-        /* seq-server for direct talking */
+        /* Seq-server for direct talking */
         struct lu_server_seq   *lcs_srv;
 };
 
@@ -186,25 +186,11 @@ int seq_client_init(struct lu_client_seq *seq,
 
 void seq_client_fini(struct lu_client_seq *seq);
 
-int seq_client_alloc_super(struct lu_client_seq *seq,
-                           const struct lu_env *env);
-
-int seq_client_replay_super(struct lu_client_seq *seq,
-                            struct lu_range *range,
-                            const struct lu_env *env);
-
-int seq_client_alloc_meta(struct lu_client_seq *seq,
-                          const struct lu_env *env);
-
-int seq_client_alloc_seq(struct lu_client_seq *seq,
-                         seqno_t *seqnr);
-
 int seq_client_alloc_fid(struct lu_client_seq *seq,
                          struct lu_fid *fid);
 
 /* Fids common stuff */
 int fid_is_local(struct lu_site *site, const struct lu_fid *fid);
-
 void fid_cpu_to_le(struct lu_fid *dst, const struct lu_fid *src);
 void fid_cpu_to_be(struct lu_fid *dst, const struct lu_fid *src);
 void fid_le_to_cpu(struct lu_fid *dst, const struct lu_fid *src);
index 0ca8205..d3780ee 100644 (file)
@@ -102,6 +102,9 @@ struct fld_cache {
         /* Lru list */
         struct list_head         fci_lru;
 
+        /* Cache statistics. */
+        struct fld_stats         fci_stat;
+        
         /* Cache name used for debug and messages. */
         char                     fci_name[80];
 };
@@ -119,9 +122,6 @@ struct lu_server_fld {
         /* Protect index modifications */
         struct semaphore         lsf_sem;
 
-        /* Server cache statistics. */
-        struct fld_stats         lsf_stat;
-        
         /* Fld service name in form "fld-srv-lustre-MDTXXX" */
         char                     lsf_name[80];
 };
@@ -142,9 +142,6 @@ struct lu_client_fld {
         /* Lock protecting exports list and fld_hash. */
         spinlock_t               lcf_lock;
 
-        /* Client cache statistics. */
-        struct fld_stats         lcf_stat;
-        
         /* Protect fld req + cache modification. */
         struct semaphore         lcf_sem;
 
index e919e92..db63a3c 100644 (file)
@@ -145,8 +145,10 @@ ino_t ll_fid_build_ino(struct ll_sb_info *sbi,
         ino_t ino;
         ENTRY;
 
-        /* very stupid and having many downsides inode allocation algorithm
-         * based on fid. */
+        /*
+         * Very stupid and having many downsides inode allocation algorithm
+         * based on fid.
+         */
         ino = (fid_seq(fid) - 1) * LUSTRE_SEQ_MAX_WIDTH + fid_oid(fid);
         RETURN(ino & 0x7fffffff);
 }
index 969d367..587a954 100644 (file)
@@ -2397,8 +2397,8 @@ static int mdd_create_sanity_check(const struct lu_env *env,
         struct lu_fid     *fid       = &info->mti_fid;
         struct mdd_object *obj       = md2mdd_obj(pobj);
         int rc;
-
         ENTRY;
+
         /* EEXIST check */
         if (mdd_is_dead_obj(obj))
                 RETURN(-ENOENT);