Whamcloud - gitweb
- many fixes in fld, adding it to LMv and CMM
authoryury <yury>
Fri, 16 Jun 2006 14:28:15 +0000 (14:28 +0000)
committeryury <yury>
Fri, 16 Jun 2006 14:28:15 +0000 (14:28 +0000)
14 files changed:
lustre/cmm/cmm_device.c
lustre/cmm/cmm_internal.h
lustre/cmm/cmm_object.c
lustre/fid/fid_handler.c
lustre/fld/fld_handler.c
lustre/fld/fld_iam.c
lustre/fld/fld_internal.h
lustre/include/lustre_export.h
lustre/include/lustre_fld.h
lustre/include/obd.h
lustre/lmv/lmv_fld.c
lustre/lmv/lmv_internal.h
lustre/lmv/lmv_obd.c
lustre/mdt/mdt_handler.c

index 3550c5b..ffbdadf 100644 (file)
@@ -96,7 +96,7 @@ extern struct lu_device_type mdc_device_type;
 /* --- cmm_lu_operations --- */
 /* add new MDC to the CMM, create MDC lu_device and connect it to mdc_obd */
 static int cmm_add_mdc(const struct lu_context *ctx,
-                       struct cmm_device * cm, struct lustre_cfg *cfg)
+                       struct cmm_device *cm, struct lustre_cfg *cfg)
 {
         struct lu_device_type *ldt = &mdc_device_type;
         struct lu_device *ld;
@@ -140,6 +140,9 @@ static int cmm_add_mdc(const struct lu_context *ctx,
                 spin_unlock(&cm->cmm_tgt_guard);
 #endif                
                 lu_device_get(cmm2lu_dev(cm));
+
+                fld_client_add_export(&cm->cmm_fld,
+                                      mc->mc_desc.cl_exp);
         }
         RETURN(rc);
 }
@@ -233,6 +236,12 @@ static int cmm_device_init(const struct lu_context *ctx,
         m->cmm_tgt_count = 0;
         m->cmm_child = lu2md_dev(next);
 
+        err = fld_client_init(&m->cmm_fld, LUSTRE_CLI_FLD_HASH_RRB);
+        if (err) {
+                CERROR("can't init FLD, err %d\n",
+                       err);
+        }
+
         RETURN(err);
 }
 
@@ -243,6 +252,8 @@ static struct lu_device *cmm_device_fini(const struct lu_context *ctx,
         struct mdc_device *mc, *tmp;
         ENTRY;
 
+        fld_client_fini(&cm->cmm_fld);
+        
         /* finish all mdc devices */
         list_for_each_entry_safe(mc, tmp, &cm->cmm_targets, mc_linkage) {
                 struct lu_device *ld_m = mdc2lu_dev(mc);
index 2e19e5a..b85901b 100644 (file)
 #if defined(__KERNEL__)
 
 #include <obd.h>
+#include <lustre_fld.h>
 #include <md_object.h>
 
 #ifdef CMM_CODE
 struct cmm_device {
-        struct md_device cmm_md_dev;
+        struct md_device      cmm_md_dev;
         /* underlaying device in MDS stack, usually MDD */
-        struct md_device *cmm_child;
+        struct md_device     *cmm_child;
         /* other MD servers in cluster */
-        __u32            cmm_local_num;
-        __u32            cmm_tgt_count;
-        struct list_head cmm_targets;
-        spinlock_t       cmm_tgt_guard;
+        __u32                 cmm_local_num;
+        __u32                 cmm_tgt_count;
+        struct list_head      cmm_targets;
+        spinlock_t            cmm_tgt_guard;
+
+        /* client FLD interface */
+        struct lu_client_fld  cmm_fld;
 };
 
 static inline struct md_device_operations *cmm_child_ops(struct cmm_device *d)
@@ -112,13 +116,15 @@ struct lu_object *cmm_object_alloc(const struct lu_context *ctx,
 #else
 
 struct cmm_device {
-        struct md_device cmm_md_dev;
+        struct md_device      cmm_md_dev;
         /* underlaying device in MDS stack, usually MDD */
-        struct md_device *cmm_child;
+        struct md_device     *cmm_child;
         /* other MD servers in cluster */
-        __u32            cmm_local_num;
-        __u32            cmm_tgt_count;
-        struct list_head cmm_targets;
+        __u32                 cmm_local_num;
+        __u32                 cmm_tgt_count;
+        struct list_head      cmm_targets;
+        /* client FLD interface */
+        struct lu_client_fld  cmm_fld;
 };
 
 static inline struct md_device_operations *cmm_child_ops(struct cmm_device *d)
index 07d608f..09bea48 100644 (file)
 #include "mdc_internal.h"
 
 #ifdef CMM_CODE
-static int cmm_fld_lookup(const struct lu_fid *fid)
+static int cmm_fld_lookup(struct cmm_device *cm,
+                          const struct lu_fid *fid)
 {
+        __u64 mds;
         int rc;
-        /* temporary hack for proto mkdir */
-        rc = (unsigned long)fid_seq(fid) / LUSTRE_SEQ_SUPER_CHUNK;
-        CWARN("Get MDS %d for sequence: "LPU64"\n", rc, fid_seq(fid));
-        RETURN(rc);
+        ENTRY;
+        
+        LASSERT(fid_is_sane(fid));
+        rc = fld_client_lookup(&cm->cmm_fld, fid_seq(fid), &mds);
+        if (rc) {
+                CERROR("can't find mds by seq "LPU64", rc %d\n",
+                       fid_seq(fid), rc);
+                RETURN(rc);
+        }
+        CWARN("CMM: got MDS "LPU64" for sequence: "LPU64"\n",
+              mds, fid_seq(fid));
+        RETURN((int)mds);
 }
 
 static struct md_object_operations cml_mo_ops;
@@ -64,7 +74,7 @@ struct lu_object *cmm_object_alloc(const struct lu_context *ctx,
         ENTRY;
 
         /* get object location */
-        mdsnum = cmm_fld_lookup(fid);
+        mdsnum = cmm_fld_lookup(lu2cmm_dev(ld), fid);
 
         /* select the proper set of operations based on object location */
         if (mdsnum == lu2cmm_dev(ld)->cmm_local_num) {
@@ -617,16 +627,26 @@ static struct md_object_operations cmm_mo_ops;
 static struct md_dir_operations    cmm_dir_ops;
 static struct lu_object_operations cmm_obj_ops;
 
-static int cmm_fld_lookup(const struct lu_fid *fid)
+static int cmm_fld_lookup(struct cmm_device *cm,
+                          const struct lu_fid *fid)
 {
+        __u64 mds;
         int rc;
-        /* temporary hack for proto mkdir */
-        rc = (unsigned long)fid_seq(fid) / LUSTRE_SEQ_SUPER_CHUNK;
-        CWARN("Get MDS %d for sequence: "LPU64"\n", rc, fid_seq(fid));
-        RETURN(rc);
+        ENTRY;
+        
+        LASSERT(fid_is_sane(fid));
+        rc = fld_client_lookup(&cm->cmm_fld, fid_seq(fid), &mds);
+        if (rc) {
+                CERROR("can't find mds by seq "LPU64", rc %d\n",
+                       fid_seq(fid), rc);
+                RETURN(rc);
+        }
+        CWARN("CMM: got MDS "LPU64" for sequence: "LPU64"\n",
+              mds, fid_seq(fid));
+        RETURN((int)mds);
 }
 
-/* get child device by mdsnum*/
+/* get child device by mdsnum */
 static struct lu_device *cmm_get_child(struct cmm_device *d, __u32 num)
 {
         struct lu_device *next = NULL;
@@ -684,7 +704,7 @@ static int cmm_object_init(const struct lu_context *ctx, struct lu_object *lo)
         ENTRY;
 
         /* under device can be MDD or MDC */
-        mdsnum = cmm_fld_lookup(fid);
+        mdsnum = cmm_fld_lookup(cd, fid);
         c_dev = cmm_get_child(cd, mdsnum);
         if (c_dev == NULL) {
                 rc = -ENOENT;
index a2fdad0..d7f357e 100644 (file)
@@ -197,6 +197,7 @@ seq_client_alloc_fid(struct lu_client_seq *seq, struct lu_fid *fid)
                         
                         *fid = seq->seq_fid;
                         seq->seq_fid.f_oid += 1;
+                        rc = -ERESTART;
                 }
         }
         LASSERT(fid_is_sane(fid));
index 325cb54..52f550f 100644 (file)
@@ -62,14 +62,14 @@ enum {
         FLD_HTABLE_MASK = FLD_HTABLE_SIZE - 1
 };
 
-static __u32 fld_cache_hash(__u64 lu_seq)
+static __u32 fld_cache_hash(__u64 seq)
 {
-        return lu_seq;
+        return seq;
 }
 
 static int
 fld_cache_insert(struct fld_cache_info *fld_cache,
-                 __u64 lu_seq, __u64 mds_num)
+                 __u64 seq, __u64 mds)
 {
         struct fld_cache *fld;
         struct hlist_head *bucket;
@@ -77,7 +77,7 @@ fld_cache_insert(struct fld_cache_info *fld_cache,
         int rc = 0;
         ENTRY;
 
-        bucket = fld_cache->fld_hash + (fld_cache_hash(lu_seq) &
+        bucket = fld_cache->fld_hash + (fld_cache_hash(seq) &
                                         fld_cache->fld_hash_mask);
 
         OBD_ALLOC_PTR(fld);
@@ -85,12 +85,12 @@ fld_cache_insert(struct fld_cache_info *fld_cache,
                 RETURN(-ENOMEM);
 
         INIT_HLIST_NODE(&fld->fld_list);
-        fld->fld_mds = mds_num;
-        fld->fld_seq = lu_seq;
+        fld->fld_mds = mds;
+        fld->fld_seq = seq;
 
         spin_lock(&fld_cache->fld_lock);
         hlist_for_each_entry(fld, scan, bucket, fld_list) {
-                if (fld->fld_seq == lu_seq) {
+                if (fld->fld_seq == seq) {
                         spin_unlock(&fld_cache->fld_lock);
                         GOTO(exit, rc = -EEXIST);
                 }
@@ -104,19 +104,19 @@ exit:
 }
 
 static struct fld_cache *
-fld_cache_lookup(struct fld_cache_info *fld_cache, __u64 lu_seq)
+fld_cache_lookup(struct fld_cache_info *fld_cache, __u64 seq)
 {
         struct hlist_head *bucket;
         struct hlist_node *scan;
         struct fld_cache *fld;
         ENTRY;
 
-        bucket = fld_cache->fld_hash + (fld_cache_hash(lu_seq) &
+        bucket = fld_cache->fld_hash + (fld_cache_hash(seq) &
                                         fld_cache->fld_hash_mask);
 
         spin_lock(&fld_cache->fld_lock);
         hlist_for_each_entry(fld, scan, bucket, fld_list) {
-                if (fld->fld_seq == lu_seq) {
+                if (fld->fld_seq == seq) {
                         spin_unlock(&fld_cache->fld_lock);
                         RETURN(fld);
                 }
@@ -127,25 +127,26 @@ fld_cache_lookup(struct fld_cache_info *fld_cache, __u64 lu_seq)
 }
 
 static void
-fld_cache_delete(struct fld_cache_info *fld_cache, __u64 lu_seq)
+fld_cache_delete(struct fld_cache_info *fld_cache, __u64 seq)
 {
         struct hlist_head *bucket;
         struct hlist_node *scan;
         struct fld_cache *fld;
         ENTRY;
 
-        bucket = fld_cache->fld_hash + (fld_cache_hash(lu_seq) &
+        bucket = fld_cache->fld_hash + (fld_cache_hash(seq) &
                                         fld_cache->fld_hash_mask);
 
         spin_lock(&fld_cache->fld_lock);
         hlist_for_each_entry(fld, scan, bucket, fld_list) {
-                if (fld->fld_seq == lu_seq) {
+                if (fld->fld_seq == seq) {
                         hlist_del_init(&fld->fld_list);
-                        spin_unlock(&fld_cache->fld_lock);
-                        EXIT;
-                        return;
+                        GOTO(out_unlock, 0);
                 }
         }
+
+        EXIT;
+out_unlock:
         spin_unlock(&fld_cache->fld_lock);
         return;
 }
@@ -162,7 +163,7 @@ static int fld_dht_hash(struct lu_client_fld *fld, __u64 seq)
         return do_div(seq, fld->fld_count);
 }
 
-static struct lu_fld_hash fld_hash[2] = {
+static struct lu_fld_hash fld_hash[3] = {
         {
                 .fh_name = "DHT",
                 .fh_func = fld_dht_hash
@@ -170,7 +171,10 @@ static struct lu_fld_hash fld_hash[2] = {
         {
                 .fh_name = "Round Robin",
                 .fh_func = fld_rrb_hash
-        }        
+        },
+        {
+                0,
+        }
 };
 
 static struct obd_export *
@@ -183,7 +187,7 @@ fld_client_get_exp(struct lu_client_fld *fld, __u64 seq)
         hash = fld->fld_hash->fh_func(fld, seq);
 
         spin_lock(&fld->fld_lock);
-        list_for_each_entry(fld_exp, &fld->fld_exports, exp_obd_chain) {
+        list_for_each_entry(fld_exp, &fld->fld_exports, exp_fld_chain) {
                 if (count == hash)
                         break;
                 count++;
@@ -201,8 +205,10 @@ int fld_client_add_export(struct lu_client_fld *fld,
         struct obd_export *fld_exp;
         ENTRY;
 
+        LASSERT(exp != NULL);
+
         spin_lock(&fld->fld_lock);
-        list_for_each_entry(fld_exp, &fld->fld_exports, exp_obd_chain) {
+        list_for_each_entry(fld_exp, &fld->fld_exports, exp_fld_chain) {
                 if (obd_uuid_equals(&fld_exp->exp_client_uuid,
                                     &exp->exp_client_uuid))
                 {
@@ -212,7 +218,7 @@ int fld_client_add_export(struct lu_client_fld *fld,
         }
         
         fld_exp = class_export_get(exp);
-        list_add_tail(&exp->exp_obd_chain,
+        list_add_tail(&exp->exp_fld_chain,
                       &fld->fld_exports);
         fld->fld_count++;
         
@@ -220,6 +226,7 @@ int fld_client_add_export(struct lu_client_fld *fld,
         
         RETURN(0);
 }
+EXPORT_SYMBOL(fld_client_add_export);
 
 /* remove export from FLD */
 int fld_client_del_export(struct lu_client_fld *fld,
@@ -230,12 +237,12 @@ int fld_client_del_export(struct lu_client_fld *fld,
         ENTRY;
 
         spin_lock(&fld->fld_lock);
-        list_for_each_entry_safe(fld_exp, tmp, &fld->fld_exports, exp_obd_chain) {
+        list_for_each_entry_safe(fld_exp, tmp, &fld->fld_exports, exp_fld_chain) {
                 if (obd_uuid_equals(&fld_exp->exp_client_uuid,
                                     &exp->exp_client_uuid))
                 {
                         fld->fld_count--;
-                        list_del(&fld_exp->exp_obd_chain);
+                        list_del(&fld_exp->exp_fld_chain);
                         class_export_get(fld_exp);
 
                         spin_unlock(&fld->fld_lock);
@@ -246,6 +253,7 @@ int fld_client_del_export(struct lu_client_fld *fld,
         
         RETURN(-ENOENT);
 }
+EXPORT_SYMBOL(fld_client_del_export);
 
 int fld_client_init(struct lu_client_fld *fld, int hash)
 {
@@ -268,6 +276,7 @@ int fld_client_init(struct lu_client_fld *fld, int hash)
                fld->fld_hash->fh_name);
         RETURN(rc);
 }
+EXPORT_SYMBOL(fld_client_init);
 
 void fld_client_fini(struct lu_client_fld *fld)
 {
@@ -276,15 +285,17 @@ void fld_client_fini(struct lu_client_fld *fld)
         ENTRY;
 
         spin_lock(&fld->fld_lock);
-        list_for_each_entry_safe(fld_exp, tmp, &fld->fld_exports, exp_obd_chain) {
+        list_for_each_entry_safe(fld_exp, tmp,
+                                 &fld->fld_exports, exp_fld_chain) {
                 fld->fld_count--;
-                list_del(&fld_exp->exp_obd_chain);
+                list_del(&fld_exp->exp_fld_chain);
                 class_export_get(fld_exp);
         }
         spin_unlock(&fld->fld_lock);
         CDEBUG(D_INFO, "Client FLD finalized\n");
         EXIT;
 }
+EXPORT_SYMBOL(fld_client_fini);
 
 static int
 fld_client_rpc(struct obd_export *exp,
@@ -325,8 +336,9 @@ out_req:
 
 int
 fld_client_create(struct lu_client_fld *fld,
-                  __u64 seq, __u64 mds_num)
+                  __u64 seq, __u64 mds)
 {
+#if 0
         struct obd_export *fld_exp;
         struct md_fld      md_fld;
         __u32 rc;
@@ -337,20 +349,23 @@ fld_client_create(struct lu_client_fld *fld,
                 RETURN(-EINVAL);
         
         md_fld.mf_seq = seq;
-        md_fld.mf_mds = mds_num;
+        md_fld.mf_mds = mds;
 
         rc = fld_client_rpc(fld_exp, &md_fld, FLD_CREATE);
         
 #ifdef __KERNEL__
-        fld_cache_insert(fld_cache, seq, mds_num);
+        fld_cache_insert(fld_cache, seq, mds);
 #endif
         
         RETURN(rc);
+#endif
+        return 0;
 }
+EXPORT_SYMBOL(fld_client_create);
 
 int
 fld_client_delete(struct lu_client_fld *fld,
-                  __u64 seq, __u64 mds_num)
+                  __u64 seq, __u64 mds)
 {
         struct obd_export *fld_exp;
         struct md_fld      md_fld;
@@ -365,30 +380,31 @@ fld_client_delete(struct lu_client_fld *fld,
                 RETURN(-EINVAL);
 
         md_fld.mf_seq = seq;
-        md_fld.mf_mds = mds_num;
+        md_fld.mf_mds = mds;
 
         rc = fld_client_rpc(fld_exp, &md_fld, FLD_DELETE);
         RETURN(rc);
 }
+EXPORT_SYMBOL(fld_client_delete);
 
 int
 fld_client_get(struct lu_client_fld *fld,
-               __u64 lu_seq, __u64 *mds_num)
+               __u64 seq, __u64 *mds)
 {
         struct obd_export *fld_exp;
         struct md_fld      md_fld;
         int    vallen, rc;
 
-        fld_exp = fld_client_get_exp(fld, lu_seq);
+        fld_exp = fld_client_get_exp(fld, seq);
         if (!fld_exp);
                 RETURN(-EINVAL);
 
-        md_fld.mf_seq = lu_seq;
+        md_fld.mf_seq = seq;
         vallen = sizeof(struct md_fld);
 
         rc = fld_client_rpc(fld_exp, &md_fld, FLD_GET);
         if (rc == 0)
-                *mds_num = md_fld.mf_mds;
+                *mds = md_fld.mf_mds;
 
         RETURN(rc);
 }
@@ -396,32 +412,39 @@ fld_client_get(struct lu_client_fld *fld,
 /* lookup fid in the namespace of pfid according to the name */
 int
 fld_client_lookup(struct lu_client_fld *fld,
-                  __u64 lu_seq, __u64 *mds_num)
+                  __u64 seq, __u64 *mds)
 {
+#if 0
+#ifdef __KERNEL__
         struct fld_cache *fld_entry;
+#endif
         int rc;
         ENTRY;
 
 #ifdef __KERNEL__
         /* lookup it in the cache */
-        fld_entry = fld_cache_lookup(fld_cache, lu_seq);
+        fld_entry = fld_cache_lookup(fld_cache, seq);
         if (fld_entry != NULL) {
-                *mds_num = fld_entry->fld_mds;
+                *mds = fld_entry->fld_mds;
                 RETURN(0);
         }
 #endif
         
         /* can not find it in the cache */
-        rc = fld_client_get(fld, lu_seq, mds_num);
+        rc = fld_client_get(fld, seq, mds);
         if (rc)
                 RETURN(rc);
 
 #ifdef __KERNEL__
-        rc = fld_cache_insert(fld_cache, lu_seq, *mds_num);
+        rc = fld_cache_insert(fld_cache, seq, *mds);
 #endif
         
         RETURN(rc);
+#endif
+        *mds = 0;
+        return 0;
 }
+EXPORT_SYMBOL(fld_client_lookup);
 
 #ifdef __KERNEL__
 static int fld_init(void)
@@ -438,6 +461,9 @@ static int fld_init(void)
                   sizeof fld_cache->fld_hash[0]);
         spin_lock_init(&fld_cache->fld_lock);
 
+        CDEBUG(D_INFO, "Client FLD, cache size %d\n",
+               FLD_HTABLE_SIZE);
+        
         RETURN(0);
 }
 
index 2248046..6c62fa4 100644 (file)
@@ -102,7 +102,7 @@ static struct lu_context_key fld_thread_key = {
 };
 
 static struct dt_key *fld_key(const struct lu_context *ctx,
-                              const fidseq_t seq_num)
+                              const fidseq_t seq)
 {
         struct fld_thread_info *info;
         ENTRY;
@@ -110,12 +110,12 @@ static struct dt_key *fld_key(const struct lu_context *ctx,
         info = lu_context_key_get(ctx, &fld_thread_key);
         LASSERT(info != NULL);
 
-        info->fti_key = cpu_to_be64(seq_num);
+        info->fti_key = cpu_to_be64(seq);
         RETURN((void *)&info->fti_key);
 }
 
 static struct dt_rec *fld_rec(const struct lu_context *ctx,
-                              const mdsno_t mds_num)
+                              const mdsno_t mds)
 {
         struct fld_thread_info *info;
         ENTRY;
@@ -123,13 +123,13 @@ static struct dt_rec *fld_rec(const struct lu_context *ctx,
         info = lu_context_key_get(ctx, &fld_thread_key);
         LASSERT(info != NULL);
 
-        info->fti_rec = cpu_to_be64(mds_num);
+        info->fti_rec = cpu_to_be64(mds);
         RETURN((void *)&info->fti_rec);
 }
 
 int fld_handle_insert(struct lu_server_fld *fld,
                       const struct lu_context *ctx,
-                      fidseq_t seq_num, mdsno_t mds_num)
+                      fidseq_t seq, mdsno_t mds)
 {
         struct dt_device *dt = fld->fld_dt;
         struct dt_object *dt_obj = fld->fld_obj;
@@ -144,8 +144,8 @@ int fld_handle_insert(struct lu_server_fld *fld,
         th = dt->dd_ops->dt_trans_start(ctx, dt, &txn);
 
         rc = dt_obj->do_index_ops->dio_insert(ctx, dt_obj,
-                                              fld_rec(ctx, mds_num),
-                                              fld_key(ctx, seq_num), th);
+                                              fld_rec(ctx, mds),
+                                              fld_key(ctx, seq), th);
         dt->dd_ops->dt_trans_stop(ctx, th);
 
         RETURN(rc);
@@ -153,7 +153,7 @@ int fld_handle_insert(struct lu_server_fld *fld,
 
 int fld_handle_delete(struct lu_server_fld *fld,
                       const struct lu_context *ctx,
-                      fidseq_t seq_num)
+                      fidseq_t seq)
 {
         struct dt_device *dt = fld->fld_dt;
         struct dt_object *dt_obj = fld->fld_obj;
@@ -165,7 +165,7 @@ int fld_handle_delete(struct lu_server_fld *fld,
         txn.tp_credits = FLD_TXN_INDEX_DELETE_CREDITS;
         th = dt->dd_ops->dt_trans_start(ctx, dt, &txn);
         rc = dt_obj->do_index_ops->dio_delete(ctx, dt_obj,
-                                              fld_key(ctx, seq_num), th);
+                                              fld_key(ctx, seq), th);
         dt->dd_ops->dt_trans_stop(ctx, th);
 
         RETURN(rc);
@@ -173,7 +173,7 @@ int fld_handle_delete(struct lu_server_fld *fld,
 
 int fld_handle_lookup(struct lu_server_fld *fld,
                       const struct lu_context *ctx,
-                      fidseq_t seq_num, mdsno_t *mds_num)
+                      fidseq_t seq, mdsno_t *mds)
 {
         struct dt_object *dt_obj = fld->fld_obj;
         struct dt_rec    *rec = fld_rec(ctx, 0);
@@ -181,9 +181,9 @@ int fld_handle_lookup(struct lu_server_fld *fld,
         ENTRY;
 
         rc = dt_obj->do_index_ops->dio_lookup(ctx, dt_obj, rec,
-                                              fld_key(ctx, seq_num));
+                                              fld_key(ctx, seq));
         if (rc == 0)
-                *mds_num = be64_to_cpu(*(__u64 *)rec);
+                *mds = be64_to_cpu(*(__u64 *)rec);
         RETURN(rc);
 }
 
index aa30b85..6c414f9 100644 (file)
@@ -68,15 +68,15 @@ enum fld_op {
 
 int fld_handle_insert(struct lu_server_fld *fld,
                       const struct lu_context *ctx,
-                      fidseq_t seq_num, mdsno_t mdsno);
+                      fidseq_t seq, mdsno_t mds);
 
 int fld_handle_delete(struct lu_server_fld *fld,
                       const struct lu_context *ctx,
-                      fidseq_t seq_num);
+                      fidseq_t seq);
 
 int fld_handle_lookup(struct lu_server_fld *fld,
                       const struct lu_context *ctx,
-                      fidseq_t seq_num, mdsno_t *mds);
+                      fidseq_t seq, mdsno_t *mds);
 
 int fld_iam_init(struct lu_server_fld *fld,
                  const struct lu_context *ctx);
index f6e3f36..1f80d42 100644 (file)
@@ -56,6 +56,7 @@ struct obd_export {
         struct portals_handle     exp_handle;
         atomic_t                  exp_refcount;
         struct obd_uuid           exp_client_uuid;
+        struct list_head          exp_fld_chain;
         struct list_head          exp_obd_chain;
         /* exp_obd_chain_timed fo ping evictor, protected by obd_dev_lock */
         struct list_head          exp_obd_chain_timed;
index 7be9143..b9a8f06 100644 (file)
@@ -80,15 +80,15 @@ int fld_client_del_export(struct lu_client_fld *fld,
                           struct obd_export *exp);
 
 int fld_client_create(struct lu_client_fld *fld,
-                      __u64 seq, __u64 mds_num);
+                      __u64 seq, __u64 mds);
 
 int fld_client_delete(struct lu_client_fld *fld,
-                      __u64 seq, __u64 mds_num);
+                      __u64 seq, __u64 mds);
 
 int fld_client_get(struct lu_client_fld *fld,
-                   __u64 lu_seq, __u64 *mds_num);
+                   __u64 seq, __u64 *mds);
 
 int fld_client_lookup(struct lu_client_fld *fld,
-                      __u64 lu_seq, __u64 *mds_num);
+                      __u64 seq, __u64 *mds);
 
 #endif
index 6355d46..ddfa9d8 100644 (file)
@@ -31,6 +31,7 @@
 #include <lustre_lib.h>
 #include <lustre_export.h>
 #include <lustre_quota.h>
+#include <lustre_fld.h>
 #include <lu_object.h>
 
 /* this is really local to the OSC */
@@ -506,6 +507,7 @@ struct lmv_tgt_desc {
 
 struct lmv_obd {
         int                     refcount;
+        struct lu_client_fld    lmv_fld;
         spinlock_t              lmv_lock;
         struct lmv_desc         desc;
         struct obd_uuid         cluuid;
index 6810f09..a4e909b 100644 (file)
 #include <lprocfs_status.h>
 #include "lmv_internal.h"
 
-/* dummy function for a while */
-int lmv_fld_lookup(struct obd_device *obd, struct lu_fid *fid)
+int lmv_fld_lookup(struct obd_device *obd, const struct lu_fid *fid)
 {
+        struct lmv_obd *lmv = &obd->u.lmv;
+        __u64 mds;
         int rc;
         ENTRY;
 
         LASSERT(fid_is_sane(fid));
-
-        /* temporary hack until fld will works */
-        rc = (unsigned long)fid_seq(fid) / LUSTRE_SEQ_SUPER_CHUNK;
-        CWARN("LMV: got MDS %d for sequence: "LPU64"\n", rc, fid_seq(fid));
-        RETURN(rc);
+        rc = fld_client_lookup(&lmv->lmv_fld, fid_seq(fid), &mds);
+        if (rc) {
+                CERROR("can't find mds by seq "LPU64", rc %d\n",
+                       fid_seq(fid), rc);
+                RETURN(rc);
+        }
+        CWARN("LMV: got MDS "LPU64" for sequence: "LPU64"\n",
+              mds, fid_seq(fid));
+        RETURN((int)mds);
 }
index 094ae72..9ba41e8 100644 (file)
@@ -127,7 +127,7 @@ int lmv_revalidate_slaves(struct obd_export *, struct ptlrpc_request **,
 int lmv_handle_split(struct obd_export *, struct lu_fid *);
 int lmv_blocking_ast(struct ldlm_lock *, struct ldlm_lock_desc *,
                     void *, int);
-int lmv_fld_lookup(struct obd_device *obd, struct lu_fid *fid);
+int lmv_fld_lookup(struct obd_device *obd, const struct lu_fid *fid);
 
 static inline struct lmv_stripe_md * 
 lmv_get_mea(struct ptlrpc_request *req, int offset)
index dd04eca..b612a93 100644 (file)
@@ -337,6 +337,8 @@ int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
         }
 
         mdc_exp = class_conn2export(&conn);
+        fld_client_add_export(&lmv->lmv_fld, mdc_exp);
+
         mdc_data = &class_exp2cliimp(mdc_exp)->imp_connect_data;
 
         rc = obd_register_observer(mdc_obd, obd);
@@ -725,6 +727,18 @@ static int lmv_fid_alloc(struct obd_export *exp, struct lu_fid *fid,
 
         /* asking underlaying tgt layer to allocate new fid */
         rc = obd_fid_alloc(lmv->tgts[mds].ltd_exp, fid, hint);
+
+        /* client switches to new sequence, setup fld */
+        if (rc == -ERESTART) {
+                rc = fld_client_create(&lmv->lmv_fld,
+                                       fid_seq(fid),
+                                       mds);
+                if (rc) {
+                        CERROR("can't create fld entry, "
+                               "rc %d\n", rc);
+                }
+        }
+
         RETURN(rc);
 }
 
@@ -790,7 +804,6 @@ static int lmv_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
                 CERROR("Can't setup LMV object manager, "
                        "error %d.\n", rc);
                 GOTO(out_free_datas, rc);
-                RETURN(rc);
         }
 
         lprocfs_init_vars(lmv, &lvars);
@@ -807,6 +820,13 @@ static int lmv_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
                 }
        }
 #endif
+        rc = fld_client_init(&lmv->lmv_fld, LUSTRE_CLI_FLD_HASH_RRB);
+        if (rc) {
+                CERROR("can't init FLD, err %d\n",
+                       rc);
+                GOTO(out_free_datas, rc);
+        }
+
         RETURN(0);
 
 out_free_datas:        
@@ -825,6 +845,7 @@ static int lmv_cleanup(struct obd_device *obd)
 
         lprocfs_obd_cleanup(obd);
         lmv_mgr_cleanup(obd);
+        fld_client_fini(&lmv->lmv_fld);
         OBD_FREE(lmv->datas, lmv->datas_size);
         OBD_FREE(lmv->tgts, lmv->tgts_size);
         
index 74af075..37abf0f 100644 (file)
@@ -1508,6 +1508,7 @@ static int mdt_seq_fini(const struct lu_context *ctx,
         RETURN(0);
 }
 
+/* XXX: this is ugly, should be something else */
 static int mdt_controller_init(struct mdt_device *m,
                                struct lustre_cfg *cfg)
 {