#include "fld_internal.h"
#ifdef __KERNEL__
-/* XXX: maybe these 2 items should go to sbi */
struct fld_cache_info *fld_cache = NULL;
enum {
spin_lock(&fld_cache->fld_lock);
hlist_for_each_entry(fld, scan, bucket, fld_list) {
- if (fld->fld_seq == seq) {
- spin_unlock(&fld_cache->fld_lock);
- GOTO(exit, rc = -EEXIST);
- }
+ if (fld->fld_seq == seq)
+ GOTO(exit_unlock, rc = -EEXIST);
}
hlist_add_head(&fld->fld_list, bucket);
+ EXIT;
+exit_unlock:
spin_unlock(&fld_cache->fld_lock);
-exit:
if (rc != 0)
OBD_FREE(fld, sizeof(*fld));
- RETURN(rc);
+ return rc;
}
static struct fld_cache *
static int fld_rrb_hash(struct lu_client_fld *fld, __u64 seq)
{
+ if (fld->fld_count == 0)
+ return 0;
+
return do_div(seq, fld->fld_count);
}
static int fld_dht_hash(struct lu_client_fld *fld, __u64 seq)
{
- CWARN("using Round Robin hash func for while\n");
- return do_div(seq, fld->fld_count);
+ /* XXX: here should DHT hash */
+ return fld_rrb_hash(fld, seq);
}
static struct lu_fld_hash fld_hash[3] = {
};
static struct obd_export *
-fld_client_get_exp(struct lu_client_fld *fld, __u64 seq)
+fld_client_get_export(struct lu_client_fld *fld, __u64 seq)
{
struct obd_export *fld_exp;
int count = 0, hash;
ENTRY;
+ LASSERT(fld->fld_hash != NULL);
hash = fld->fld_hash->fh_func(fld, seq);
spin_lock(&fld->fld_lock);
- list_for_each_entry(fld_exp, &fld->fld_exports, exp_fld_chain) {
- if (count == hash)
- break;
+ list_for_each_entry(fld_exp,
+ &fld->fld_exports, exp_fld_chain) {
+ if (count == hash) {
+ spin_unlock(&fld->fld_lock);
+ RETURN(fld_exp);
+ }
count++;
}
spin_unlock(&fld->fld_lock);
-
- RETURN(fld_exp);
+ RETURN(NULL);
}
/* add export to FLD. This is usually done by CMM and LMV as they are main users
}
fld_exp = class_export_get(exp);
- list_add_tail(&exp->exp_fld_chain,
+ list_add_tail(&fld_exp->exp_fld_chain,
&fld->fld_exports);
fld->fld_count++;
}
}
spin_unlock(&fld->fld_lock);
-
RETURN(-ENOENT);
}
EXPORT_SYMBOL(fld_client_del_export);
fld_client_rpc(struct obd_export *exp,
struct md_fld *mf, __u32 fld_op)
{
+ int size[2] = {sizeof(__u32), sizeof(struct md_fld)}, rc;
+ int mf_size = sizeof(struct md_fld);
struct ptlrpc_request *req;
struct md_fld *pmf;
- int mf_size = sizeof(*mf);
__u32 *op;
- int size[2] = {sizeof(*op), mf_size}, rc;
ENTRY;
+ LASSERT(exp != NULL);
+
req = ptlrpc_prep_req(class_exp2cliimp(exp),
LUSTRE_MDS_VERSION, FLD_QUERY,
2, size, NULL);
fld_client_create(struct lu_client_fld *fld,
__u64 seq, __u64 mds)
{
-#if 0
struct obd_export *fld_exp;
struct md_fld md_fld;
__u32 rc;
ENTRY;
- fld_exp = fld_client_get_exp(fld, seq);
+ fld_exp = fld_client_get_export(fld, seq);
if (!fld_exp)
RETURN(-EINVAL);
-
md_fld.mf_seq = seq;
md_fld.mf_mds = mds;
-
- rc = fld_client_rpc(fld_exp, &md_fld, FLD_CREATE);
+ rc = fld_client_rpc(fld_exp, &md_fld, FLD_CREATE);
#ifdef __KERNEL__
fld_cache_insert(fld_cache, seq, mds);
#endif
RETURN(rc);
-#endif
- return 0;
}
EXPORT_SYMBOL(fld_client_create);
fld_cache_delete(fld_cache, seq);
#endif
- fld_exp = fld_client_get_exp(fld, seq);
+ fld_exp = fld_client_get_export(fld, seq);
if (!fld_exp)
RETURN(-EINVAL);
}
EXPORT_SYMBOL(fld_client_delete);
-int
+static int
fld_client_get(struct lu_client_fld *fld,
__u64 seq, __u64 *mds)
{
struct obd_export *fld_exp;
- struct md_fld md_fld;
- int vallen, rc;
-
- fld_exp = fld_client_get_exp(fld, seq);
- if (!fld_exp);
- RETURN(-EINVAL);
-
- md_fld.mf_seq = seq;
- vallen = sizeof(struct md_fld);
+ struct md_fld md_fld;
+ int rc;
+ ENTRY;
- rc = fld_client_rpc(fld_exp, &md_fld, FLD_GET);
- if (rc == 0)
- *mds = md_fld.mf_mds;
+ fld_exp = fld_client_get_export(fld, seq);
+ if (!fld_exp) {
+ /* XXX: hack, should be fixed later */
+ rc = 0;
+ *mds = 0;
+ } else {
+ md_fld.mf_seq = seq;
+ rc = fld_client_rpc(fld_exp,
+ &md_fld, FLD_LOOKUP);
+ if (rc == 0)
+ *mds = md_fld.mf_mds;
+ }
RETURN(rc);
}
fld_client_lookup(struct lu_client_fld *fld,
__u64 seq, __u64 *mds)
{
-#if 0
#ifdef __KERNEL__
struct fld_cache *fld_entry;
#endif
#endif
RETURN(rc);
-#endif
- *mds = 0;
- return 0;
}
EXPORT_SYMBOL(fld_client_lookup);
#ifdef __KERNEL__
static int fld_init(void)
{
+ int i;
ENTRY;
OBD_ALLOC_PTR(fld_cache);
if (fld_cache == NULL)
RETURN(-ENOMEM);
+ spin_lock_init(&fld_cache->fld_lock);
+
/* init fld cache info */
fld_cache->fld_hash_mask = FLD_HTABLE_MASK;
OBD_ALLOC(fld_cache->fld_hash, FLD_HTABLE_SIZE *
- sizeof fld_cache->fld_hash[0]);
- spin_lock_init(&fld_cache->fld_lock);
+ sizeof(*fld_cache->fld_hash));
+ if (fld_cache->fld_hash == NULL) {
+ OBD_FREE_PTR(fld_cache);
+ RETURN(-ENOMEM);
+ }
+
+ for (i = 0; i < FLD_HTABLE_SIZE; i++)
+ INIT_HLIST_HEAD(&fld_cache->fld_hash[i]);
CDEBUG(D_INFO, "Client FLD, cache size %d\n",
FLD_HTABLE_SIZE);
static int fld_fini(void)
{
+ ENTRY;
if (fld_cache != NULL) {
OBD_FREE(fld_cache->fld_hash, FLD_HTABLE_SIZE *
- sizeof fld_cache->fld_hash[0]);
+ sizeof(*fld_cache->fld_hash));
OBD_FREE_PTR(fld_cache);
}
- return 0;
+ RETURN(0);
}
static int __init fld_mod_init(void)
case FLD_DELETE:
rc = fld_handle_delete(fld, ctx, mf->mf_seq);
break;
- case FLD_GET:
+ case FLD_LOOKUP:
rc = fld_handle_lookup(fld, ctx, mf->mf_seq, &mf->mf_mds);
break;
default: