Whamcloud - gitweb
- cleanups and small fixes accordingly to Nikita's DLDINSP.
authoryury <yury>
Tue, 4 Jul 2006 17:14:50 +0000 (17:14 +0000)
committeryury <yury>
Tue, 4 Jul 2006 17:14:50 +0000 (17:14 +0000)
lustre/fld/fld_cache.c
lustre/fld/fld_handler.c
lustre/fld/fld_request.c
lustre/include/lustre_fld.h
lustre/include/lustre_net.h

index 372fb92..84863ae 100644 (file)
@@ -178,12 +178,12 @@ fld_cache_delete(struct fld_cache_info *cache, seqno_t seq)
         EXIT;
 out_unlock:
         spin_unlock(&cache->fci_lock);
-        return;
 }
 EXPORT_SYMBOL(fld_cache_delete);
 
-struct fld_cache_entry *
-fld_cache_lookup(struct fld_cache_info *cache, seqno_t seq)
+int
+fld_cache_lookup(struct fld_cache_info *cache,
+                 seqno_t seq, mdsno_t *mds)
 {
         struct fld_cache_entry *flde;
         struct hlist_head *bucket;
@@ -196,12 +196,36 @@ fld_cache_lookup(struct fld_cache_info *cache, seqno_t seq)
         spin_lock(&cache->fci_lock);
         hlist_for_each_entry(flde, scan, bucket, fce_list) {
                 if (flde->fce_seq == seq) {
+                        *mds = flde->fce_mds;
                         spin_unlock(&cache->fci_lock);
-                        RETURN(flde);
+                        RETURN(0);
                 }
         }
         spin_unlock(&cache->fci_lock);
-        RETURN(NULL);
+        RETURN(-ENOENT);
+}
+EXPORT_SYMBOL(fld_cache_lookup);
+#else
+int
+fld_cache_insert(struct fld_cache_info *cache,
+                 seqno_t seq, mdsno_t mds)
+{
+        return -ENOTSUPP;
+}
+EXPORT_SYMBOL(fld_cache_insert);
+
+void
+fld_cache_delete(struct fld_cache_info *cache, seqno_t seq)
+{
+        return;
+}
+EXPORT_SYMBOL(fld_cache_delete);
+
+int
+fld_cache_lookup(struct fld_cache_info *cache,
+                 seqno_t seq, mdsno_t *mds)
+{
+        return -ENOTSUPP;
 }
 EXPORT_SYMBOL(fld_cache_lookup);
 #endif
index e734802..32d3631 100644 (file)
@@ -220,12 +220,13 @@ int fid_is_local(struct lu_site *site, const struct lu_fid *fid)
 
         result = 1; /* conservatively assume fid is local */
         if (site->ls_client_fld != NULL) {
-                struct fld_cache_entry *entry;
+                mdsno_t mds;
+                int rc;
 
-                entry = fld_cache_lookup(site->ls_client_fld->fld_cache,
-                                         fid_seq(fid));
-                if (entry != NULL)
-                        result = (entry->fce_mds == site->ls_node_id);
+                rc = fld_cache_lookup(site->ls_client_fld->fld_cache,
+                                      fid_seq(fid), &mds);
+                if (rc == 0)
+                        result = (mds == site->ls_node_id);
         }
         return result;
 }
@@ -300,8 +301,8 @@ fld_server_init(struct lu_server_fld *fld,
         struct ptlrpc_service_conf fld_conf = {
                 .psc_nbufs            = MDS_NBUFS,
                 .psc_bufsize          = MDS_BUFSIZE,
-                .psc_max_req_size     = MDS_MAXREQSIZE,
-                .psc_max_reply_size   = MDS_MAXREPSIZE,
+                .psc_max_req_size     = FLD_MAXREQSIZE,
+                .psc_max_reply_size   = FLD_MAXREPSIZE,
                 .psc_req_portal       = FLD_REQUEST_PORTAL,
                 .psc_rep_portal       = MDC_REPLY_PORTAL,
                 .psc_watchdog_timeout = FLD_SERVICE_WATCHDOG_TIMEOUT,
index 0b1f781..50ce950 100644 (file)
@@ -82,6 +82,17 @@ struct lu_fld_hash fld_hash[3] = {
         }
 };
 
+/* this function makes decision if passed @target appropriate acoordingly to
+ * passed @hash. In case of usual round-robin hash, this is decided by comparing
+ * hash and target's index. In the case of DHT, algorithm is a bit more
+ * complicated. */
+static int
+fld_client_apt_target(struct fld_target *target, int hash)
+{
+        /* XXX: DHT case should be worked out. */
+        return (target->fldt_idx == hash);
+}
+
 static struct fld_target *
 fld_client_get_target(struct lu_client_fld *fld, seqno_t seq)
 {
@@ -96,12 +107,16 @@ fld_client_get_target(struct lu_client_fld *fld, seqno_t seq)
 
         list_for_each_entry(target,
                             &fld->fld_targets, fldt_chain) {
-                if (target->fldt_idx == hash) {
+                if (fld_client_apt_target(target, hash)) {
                         spin_unlock(&fld->fld_lock);
                         RETURN(target);
                 }
         }
         spin_unlock(&fld->fld_lock);
+
+        /* if target is not found, there is logical error anyway, so here is
+         * LBUG() to catch that situation. */
+        LBUG();
         RETURN(NULL);
 }
 
@@ -163,8 +178,8 @@ fld_client_del_target(struct lu_client_fld *fld,
                 {
                         fld->fld_count--;
                         list_del(&target->fldt_chain);
-                        class_export_put(target->fldt_exp);
                         spin_unlock(&fld->fld_lock);
+                        class_export_put(target->fldt_exp);
                         OBD_FREE_PTR(target);
                         RETURN(0);
                 }
@@ -220,7 +235,7 @@ fld_client_proc_fini(struct lu_client_fld *fld)
 
 static inline int hash_is_sane(int hash)
 {
-        return (hash >= 0 && hash < LUSTRE_CLI_FLD_HASH_LAST);
+        return (hash >= 0 && hash < ARRAY_SIZE(fld_hash));
 }
 
 int
@@ -245,12 +260,14 @@ fld_client_init(struct lu_client_fld *fld,
         snprintf(fld->fld_name, sizeof(fld->fld_name),
                  "%s-%s", LUSTRE_FLD_NAME, uuid);
         
+#ifdef __KERNEL__
         fld->fld_cache = fld_cache_init(FLD_HTABLE_SIZE);
         if (IS_ERR(fld->fld_cache)) {
                 rc = PTR_ERR(fld->fld_cache);
                 fld->fld_cache = NULL;
                 GOTO(out, rc);
         }
+#endif
 
 #ifdef LPROCFS
         rc = fld_client_proc_init(fld);
@@ -289,11 +306,13 @@ fld_client_fini(struct lu_client_fld *fld)
         }
         spin_unlock(&fld->fld_lock);
 
+#ifdef __KERNEL__
         if (fld->fld_cache != NULL) {
                 fld_cache_fini(fld->fld_cache);
                 fld->fld_cache = NULL;
         }
-        
+#endif
+
         CDEBUG(D_INFO|D_WARNING, "Client FLD finalized\n");
         EXIT;
 }
@@ -322,7 +341,7 @@ fld_client_rpc(struct obd_export *exp,
         *op = fld_op;
 
         pmf = lustre_msg_buf(req->rq_reqmsg, 1, sizeof (*pmf));
-        memcpy(pmf, mf, sizeof(*mf));
+        *pmf = *mf;
 
         req->rq_replen = lustre_msg_size(1, &mf_size);
         req->rq_request_portal = FLD_REQUEST_PORTAL;
@@ -339,24 +358,21 @@ out_req:
         RETURN(rc);
 }
 
-int
-fld_client_create(struct lu_client_fld *fld,
-                  seqno_t seq, mdsno_t mds)
+static int
+__fld_client_create(struct lu_client_fld *fld,
+                    seqno_t seq, mdsno_t mds,
+                    struct md_fld *md_fld)
 {
         struct fld_target *target;
-        struct md_fld      md_fld;
         __u32 rc;
         ENTRY;
 
         target = fld_client_get_target(fld, seq);
         if (!target)
                 RETURN(-EINVAL);
-        md_fld.mf_seq = seq;
-        md_fld.mf_mds = mds;
         
-        rc = fld_client_rpc(target->fldt_exp, &md_fld, FLD_CREATE);
+        rc = fld_client_rpc(target->fldt_exp, md_fld, FLD_CREATE);
         
-#ifdef __KERNEL__
         if (rc  == 0) {
                 /* do not return result of calling fld_cache_insert()
                  * here. First of all because it may return -EEXISTS. Another
@@ -364,90 +380,93 @@ fld_client_create(struct lu_client_fld *fld,
                  * cache errors. --umka */
                 fld_cache_insert(fld->fld_cache, seq, mds);
         }
-#endif
         
         RETURN(rc);
 }
-EXPORT_SYMBOL(fld_client_create);
 
 int
-fld_client_delete(struct lu_client_fld *fld,
-                  seqno_t seq)
+fld_client_create(struct lu_client_fld *fld,
+                  seqno_t seq, mdsno_t mds)
+{
+        struct md_fld md_fld = { .mf_seq = seq, .mf_mds = mds };
+        __u32 rc;
+        ENTRY;
+
+        rc = __fld_client_create(fld, seq, mds, &md_fld);
+        RETURN(rc);
+}
+EXPORT_SYMBOL(fld_client_create);
+
+static int
+__fld_client_delete(struct lu_client_fld *fld,
+                    seqno_t seq, struct md_fld *md_fld)
 {
         struct fld_target *target;
-        struct md_fld      md_fld;
         __u32 rc;
 
-#ifdef __KERNEL__
         fld_cache_delete(fld->fld_cache, seq);
-#endif
         
         target = fld_client_get_target(fld, seq);
         if (!target)
                 RETURN(-EINVAL);
 
-        md_fld.mf_seq = seq;
-        md_fld.mf_mds = 0;
-
         rc = fld_client_rpc(target->fldt_exp,
-                            &md_fld, FLD_DELETE);
+                            md_fld, FLD_DELETE);
+        RETURN(rc);
+}
+
+int
+fld_client_delete(struct lu_client_fld *fld,
+                  seqno_t seq)
+{
+        struct md_fld md_fld = { .mf_seq = seq, .mf_mds = 0 };
+        __u32 rc;
+
+        rc = __fld_client_delete(fld, seq, &md_fld);
         RETURN(rc);
 }
 EXPORT_SYMBOL(fld_client_delete);
 
 static int
-fld_client_get(struct lu_client_fld *fld,
-               seqno_t seq, mdsno_t *mds)
+__fld_client_lookup(struct lu_client_fld *fld,
+                    seqno_t seq, mdsno_t *mds,
+                    struct md_fld *md_fld)
 {
         struct fld_target *target;
-        struct md_fld md_fld;
         int rc;
         ENTRY;
 
+        /* lookup it in the cache */
+        rc = fld_cache_lookup(fld->fld_cache, seq, mds);
+        if (rc == 0)
+                RETURN(0);
+        
+        /* can not find it in the cache */
         target = fld_client_get_target(fld, seq);
         if (!target)
                 RETURN(-EINVAL);
                 
-        md_fld.mf_seq = seq;
         rc = fld_client_rpc(target->fldt_exp,
-                            &md_fld, FLD_LOOKUP);
+                            md_fld, FLD_LOOKUP);
         if (rc == 0)
-                *mds = md_fld.mf_mds;
+                *mds = md_fld->mf_mds;
 
+        /* do not return error here as well. See previous comment in same
+         * situation in function fld_client_create(). --umka */
+        fld_cache_insert(fld->fld_cache, seq, *mds);
+        
         RETURN(rc);
 }
 
-/* lookup fid in the namespace of pfid according to the name */
 int
 fld_client_lookup(struct lu_client_fld *fld,
                   seqno_t seq, mdsno_t *mds)
 {
-#ifdef __KERNEL__
-        struct fld_cache_entry *flde;
-#endif
+        struct md_fld md_fld = { .mf_seq = seq, .mf_mds = 0 };
         int rc;
         ENTRY;
 
-#ifdef __KERNEL__
-        /* lookup it in the cache */
-        flde = fld_cache_lookup(fld->fld_cache, seq);
-        if (flde != NULL) {
-                *mds = flde->fce_mds;
-                RETURN(0);
-        }
-#endif
-        
-        /* can not find it in the cache */
-        rc = fld_client_get(fld, seq, mds);
-        if (rc)
-                RETURN(rc);
-
-#ifdef __KERNEL__
-        /* do not return error here as well. See previous comment in same
-         * situation in function fld_client_create(). --umka */
-        fld_cache_insert(fld->fld_cache, seq, *mds);
-#endif
-        
+        rc = __fld_client_lookup(fld, seq, mds, &md_fld);
         RETURN(rc);
 }
 EXPORT_SYMBOL(fld_client_lookup);
index 757b4a4..6faf493 100644 (file)
@@ -31,8 +31,7 @@ struct lu_server_fld;
  */
 enum {
         LUSTRE_CLI_FLD_HASH_DHT = 0,
-        LUSTRE_CLI_FLD_HASH_RRB,
-        LUSTRE_CLI_FLD_HASH_LAST
+        LUSTRE_CLI_FLD_HASH_RRB
 };
 
 typedef int (*fld_hash_func_t) (struct lu_client_fld *, __u64);
@@ -155,8 +154,8 @@ int fld_cache_insert(struct fld_cache_info *cache,
 void fld_cache_delete(struct fld_cache_info *cache,
                       seqno_t seq);
 
-struct fld_cache_entry *
+int
 fld_cache_lookup(struct fld_cache_info *cache,
-                 seqno_t seq);
+                 seqno_t seq, mdsno_t *mds);
 
 #endif
index c5ed0f9..22adf17 100644 (file)
 #define MDS_MAXREQSIZE  (5 * 1024)
 #define MDS_MAXREPSIZE  max(9 * 1024, 280 + LOV_MAX_STRIPE_COUNT * 56)
 
+#define FLD_MAXREQSIZE  (8192)
+#define FLD_MAXREPSIZE  (16)
+
 /* FIXME fix all constants here.  Andreas suggests dyamically adding threads. */
 #define MGS_MAX_THREADS 8UL
 #define MGS_NUM_THREADS max(2UL, min_t(unsigned long, MGS_MAX_THREADS, \