/* --- cmm_lu_operations --- */
/* add new MDC to the CMM, create MDC lu_device and connect it to mdc_obd */
static int cmm_add_mdc(const struct lu_context *ctx,
- struct cmm_device * cm, struct lustre_cfg *cfg)
+ struct cmm_device *cm, struct lustre_cfg *cfg)
{
struct lu_device_type *ldt = &mdc_device_type;
struct lu_device *ld;
spin_unlock(&cm->cmm_tgt_guard);
#endif
lu_device_get(cmm2lu_dev(cm));
+
+ fld_client_add_export(&cm->cmm_fld,
+ mc->mc_desc.cl_exp);
}
RETURN(rc);
}
m->cmm_tgt_count = 0;
m->cmm_child = lu2md_dev(next);
+ err = fld_client_init(&m->cmm_fld, LUSTRE_CLI_FLD_HASH_RRB);
+ if (err) {
+ CERROR("can't init FLD, err %d\n",
+ err);
+ }
+
RETURN(err);
}
struct mdc_device *mc, *tmp;
ENTRY;
+ fld_client_fini(&cm->cmm_fld);
+
/* finish all mdc devices */
list_for_each_entry_safe(mc, tmp, &cm->cmm_targets, mc_linkage) {
struct lu_device *ld_m = mdc2lu_dev(mc);
#if defined(__KERNEL__)
#include <obd.h>
+#include <lustre_fld.h>
#include <md_object.h>
#ifdef CMM_CODE
struct cmm_device {
- struct md_device cmm_md_dev;
+ struct md_device cmm_md_dev;
/* underlaying device in MDS stack, usually MDD */
- struct md_device *cmm_child;
+ struct md_device *cmm_child;
/* other MD servers in cluster */
- __u32 cmm_local_num;
- __u32 cmm_tgt_count;
- struct list_head cmm_targets;
- spinlock_t cmm_tgt_guard;
+ __u32 cmm_local_num;
+ __u32 cmm_tgt_count;
+ struct list_head cmm_targets;
+ spinlock_t cmm_tgt_guard;
+
+ /* client FLD interface */
+ struct lu_client_fld cmm_fld;
};
static inline struct md_device_operations *cmm_child_ops(struct cmm_device *d)
#else
struct cmm_device {
- struct md_device cmm_md_dev;
+ struct md_device cmm_md_dev;
/* underlaying device in MDS stack, usually MDD */
- struct md_device *cmm_child;
+ struct md_device *cmm_child;
/* other MD servers in cluster */
- __u32 cmm_local_num;
- __u32 cmm_tgt_count;
- struct list_head cmm_targets;
+ __u32 cmm_local_num;
+ __u32 cmm_tgt_count;
+ struct list_head cmm_targets;
+ /* client FLD interface */
+ struct lu_client_fld cmm_fld;
};
static inline struct md_device_operations *cmm_child_ops(struct cmm_device *d)
#include "mdc_internal.h"
#ifdef CMM_CODE
-static int cmm_fld_lookup(const struct lu_fid *fid)
+static int cmm_fld_lookup(struct cmm_device *cm,
+ const struct lu_fid *fid)
{
+ __u64 mds;
int rc;
- /* temporary hack for proto mkdir */
- rc = (unsigned long)fid_seq(fid) / LUSTRE_SEQ_SUPER_CHUNK;
- CWARN("Get MDS %d for sequence: "LPU64"\n", rc, fid_seq(fid));
- RETURN(rc);
+ ENTRY;
+
+ LASSERT(fid_is_sane(fid));
+ rc = fld_client_lookup(&cm->cmm_fld, fid_seq(fid), &mds);
+ if (rc) {
+ CERROR("can't find mds by seq "LPU64", rc %d\n",
+ fid_seq(fid), rc);
+ RETURN(rc);
+ }
+ CWARN("CMM: got MDS "LPU64" for sequence: "LPU64"\n",
+ mds, fid_seq(fid));
+ RETURN((int)mds);
}
static struct md_object_operations cml_mo_ops;
ENTRY;
/* get object location */
- mdsnum = cmm_fld_lookup(fid);
+ mdsnum = cmm_fld_lookup(lu2cmm_dev(ld), fid);
/* select the proper set of operations based on object location */
if (mdsnum == lu2cmm_dev(ld)->cmm_local_num) {
static struct md_dir_operations cmm_dir_ops;
static struct lu_object_operations cmm_obj_ops;
-static int cmm_fld_lookup(const struct lu_fid *fid)
+static int cmm_fld_lookup(struct cmm_device *cm,
+ const struct lu_fid *fid)
{
+ __u64 mds;
int rc;
- /* temporary hack for proto mkdir */
- rc = (unsigned long)fid_seq(fid) / LUSTRE_SEQ_SUPER_CHUNK;
- CWARN("Get MDS %d for sequence: "LPU64"\n", rc, fid_seq(fid));
- RETURN(rc);
+ ENTRY;
+
+ LASSERT(fid_is_sane(fid));
+ rc = fld_client_lookup(&cm->cmm_fld, fid_seq(fid), &mds);
+ if (rc) {
+ CERROR("can't find mds by seq "LPU64", rc %d\n",
+ fid_seq(fid), rc);
+ RETURN(rc);
+ }
+ CWARN("CMM: got MDS "LPU64" for sequence: "LPU64"\n",
+ mds, fid_seq(fid));
+ RETURN((int)mds);
}
-/* get child device by mdsnum*/
+/* get child device by mdsnum */
static struct lu_device *cmm_get_child(struct cmm_device *d, __u32 num)
{
struct lu_device *next = NULL;
ENTRY;
/* under device can be MDD or MDC */
- mdsnum = cmm_fld_lookup(fid);
+ mdsnum = cmm_fld_lookup(cd, fid);
c_dev = cmm_get_child(cd, mdsnum);
if (c_dev == NULL) {
rc = -ENOENT;
*fid = seq->seq_fid;
seq->seq_fid.f_oid += 1;
+ rc = -ERESTART;
}
}
LASSERT(fid_is_sane(fid));
FLD_HTABLE_MASK = FLD_HTABLE_SIZE - 1
};
-static __u32 fld_cache_hash(__u64 lu_seq)
+static __u32 fld_cache_hash(__u64 seq)
{
- return lu_seq;
+ return seq;
}
static int
fld_cache_insert(struct fld_cache_info *fld_cache,
- __u64 lu_seq, __u64 mds_num)
+ __u64 seq, __u64 mds)
{
struct fld_cache *fld;
struct hlist_head *bucket;
int rc = 0;
ENTRY;
- bucket = fld_cache->fld_hash + (fld_cache_hash(lu_seq) &
+ bucket = fld_cache->fld_hash + (fld_cache_hash(seq) &
fld_cache->fld_hash_mask);
OBD_ALLOC_PTR(fld);
RETURN(-ENOMEM);
INIT_HLIST_NODE(&fld->fld_list);
- fld->fld_mds = mds_num;
- fld->fld_seq = lu_seq;
+ fld->fld_mds = mds;
+ fld->fld_seq = seq;
spin_lock(&fld_cache->fld_lock);
hlist_for_each_entry(fld, scan, bucket, fld_list) {
- if (fld->fld_seq == lu_seq) {
+ if (fld->fld_seq == seq) {
spin_unlock(&fld_cache->fld_lock);
GOTO(exit, rc = -EEXIST);
}
}
static struct fld_cache *
-fld_cache_lookup(struct fld_cache_info *fld_cache, __u64 lu_seq)
+fld_cache_lookup(struct fld_cache_info *fld_cache, __u64 seq)
{
struct hlist_head *bucket;
struct hlist_node *scan;
struct fld_cache *fld;
ENTRY;
- bucket = fld_cache->fld_hash + (fld_cache_hash(lu_seq) &
+ bucket = fld_cache->fld_hash + (fld_cache_hash(seq) &
fld_cache->fld_hash_mask);
spin_lock(&fld_cache->fld_lock);
hlist_for_each_entry(fld, scan, bucket, fld_list) {
- if (fld->fld_seq == lu_seq) {
+ if (fld->fld_seq == seq) {
spin_unlock(&fld_cache->fld_lock);
RETURN(fld);
}
}
static void
-fld_cache_delete(struct fld_cache_info *fld_cache, __u64 lu_seq)
+fld_cache_delete(struct fld_cache_info *fld_cache, __u64 seq)
{
struct hlist_head *bucket;
struct hlist_node *scan;
struct fld_cache *fld;
ENTRY;
- bucket = fld_cache->fld_hash + (fld_cache_hash(lu_seq) &
+ bucket = fld_cache->fld_hash + (fld_cache_hash(seq) &
fld_cache->fld_hash_mask);
spin_lock(&fld_cache->fld_lock);
hlist_for_each_entry(fld, scan, bucket, fld_list) {
- if (fld->fld_seq == lu_seq) {
+ if (fld->fld_seq == seq) {
hlist_del_init(&fld->fld_list);
- spin_unlock(&fld_cache->fld_lock);
- EXIT;
- return;
+ GOTO(out_unlock, 0);
}
}
+
+ EXIT;
+out_unlock:
spin_unlock(&fld_cache->fld_lock);
return;
}
return do_div(seq, fld->fld_count);
}
-static struct lu_fld_hash fld_hash[2] = {
+static struct lu_fld_hash fld_hash[3] = {
{
.fh_name = "DHT",
.fh_func = fld_dht_hash
{
.fh_name = "Round Robin",
.fh_func = fld_rrb_hash
- }
+ },
+ {
+ 0,
+ }
};
static struct obd_export *
hash = fld->fld_hash->fh_func(fld, seq);
spin_lock(&fld->fld_lock);
- list_for_each_entry(fld_exp, &fld->fld_exports, exp_obd_chain) {
+ list_for_each_entry(fld_exp, &fld->fld_exports, exp_fld_chain) {
if (count == hash)
break;
count++;
struct obd_export *fld_exp;
ENTRY;
+ LASSERT(exp != NULL);
+
spin_lock(&fld->fld_lock);
- list_for_each_entry(fld_exp, &fld->fld_exports, exp_obd_chain) {
+ list_for_each_entry(fld_exp, &fld->fld_exports, exp_fld_chain) {
if (obd_uuid_equals(&fld_exp->exp_client_uuid,
&exp->exp_client_uuid))
{
}
fld_exp = class_export_get(exp);
- list_add_tail(&exp->exp_obd_chain,
+ list_add_tail(&exp->exp_fld_chain,
&fld->fld_exports);
fld->fld_count++;
RETURN(0);
}
+EXPORT_SYMBOL(fld_client_add_export);
/* remove export from FLD */
int fld_client_del_export(struct lu_client_fld *fld,
ENTRY;
spin_lock(&fld->fld_lock);
- list_for_each_entry_safe(fld_exp, tmp, &fld->fld_exports, exp_obd_chain) {
+ list_for_each_entry_safe(fld_exp, tmp, &fld->fld_exports, exp_fld_chain) {
if (obd_uuid_equals(&fld_exp->exp_client_uuid,
&exp->exp_client_uuid))
{
fld->fld_count--;
- list_del(&fld_exp->exp_obd_chain);
+ list_del(&fld_exp->exp_fld_chain);
class_export_get(fld_exp);
spin_unlock(&fld->fld_lock);
RETURN(-ENOENT);
}
+EXPORT_SYMBOL(fld_client_del_export);
int fld_client_init(struct lu_client_fld *fld, int hash)
{
fld->fld_hash->fh_name);
RETURN(rc);
}
+EXPORT_SYMBOL(fld_client_init);
void fld_client_fini(struct lu_client_fld *fld)
{
ENTRY;
spin_lock(&fld->fld_lock);
- list_for_each_entry_safe(fld_exp, tmp, &fld->fld_exports, exp_obd_chain) {
+ list_for_each_entry_safe(fld_exp, tmp,
+ &fld->fld_exports, exp_fld_chain) {
fld->fld_count--;
- list_del(&fld_exp->exp_obd_chain);
+ list_del(&fld_exp->exp_fld_chain);
class_export_get(fld_exp);
}
spin_unlock(&fld->fld_lock);
CDEBUG(D_INFO, "Client FLD finalized\n");
EXIT;
}
+EXPORT_SYMBOL(fld_client_fini);
static int
fld_client_rpc(struct obd_export *exp,
int
fld_client_create(struct lu_client_fld *fld,
- __u64 seq, __u64 mds_num)
+ __u64 seq, __u64 mds)
{
+#if 0
struct obd_export *fld_exp;
struct md_fld md_fld;
__u32 rc;
RETURN(-EINVAL);
md_fld.mf_seq = seq;
- md_fld.mf_mds = mds_num;
+ md_fld.mf_mds = mds;
rc = fld_client_rpc(fld_exp, &md_fld, FLD_CREATE);
#ifdef __KERNEL__
- fld_cache_insert(fld_cache, seq, mds_num);
+ fld_cache_insert(fld_cache, seq, mds);
#endif
RETURN(rc);
+#endif
+ return 0;
}
+EXPORT_SYMBOL(fld_client_create);
int
fld_client_delete(struct lu_client_fld *fld,
- __u64 seq, __u64 mds_num)
+ __u64 seq, __u64 mds)
{
struct obd_export *fld_exp;
struct md_fld md_fld;
RETURN(-EINVAL);
md_fld.mf_seq = seq;
- md_fld.mf_mds = mds_num;
+ md_fld.mf_mds = mds;
rc = fld_client_rpc(fld_exp, &md_fld, FLD_DELETE);
RETURN(rc);
}
+EXPORT_SYMBOL(fld_client_delete);
int
fld_client_get(struct lu_client_fld *fld,
- __u64 lu_seq, __u64 *mds_num)
+ __u64 seq, __u64 *mds)
{
struct obd_export *fld_exp;
struct md_fld md_fld;
int vallen, rc;
- fld_exp = fld_client_get_exp(fld, lu_seq);
+ fld_exp = fld_client_get_exp(fld, seq);
if (!fld_exp);
RETURN(-EINVAL);
- md_fld.mf_seq = lu_seq;
+ md_fld.mf_seq = seq;
vallen = sizeof(struct md_fld);
rc = fld_client_rpc(fld_exp, &md_fld, FLD_GET);
if (rc == 0)
- *mds_num = md_fld.mf_mds;
+ *mds = md_fld.mf_mds;
RETURN(rc);
}
/* lookup fid in the namespace of pfid according to the name */
int
fld_client_lookup(struct lu_client_fld *fld,
- __u64 lu_seq, __u64 *mds_num)
+ __u64 seq, __u64 *mds)
{
+#if 0
+#ifdef __KERNEL__
struct fld_cache *fld_entry;
+#endif
int rc;
ENTRY;
#ifdef __KERNEL__
/* lookup it in the cache */
- fld_entry = fld_cache_lookup(fld_cache, lu_seq);
+ fld_entry = fld_cache_lookup(fld_cache, seq);
if (fld_entry != NULL) {
- *mds_num = fld_entry->fld_mds;
+ *mds = fld_entry->fld_mds;
RETURN(0);
}
#endif
/* can not find it in the cache */
- rc = fld_client_get(fld, lu_seq, mds_num);
+ rc = fld_client_get(fld, seq, mds);
if (rc)
RETURN(rc);
#ifdef __KERNEL__
- rc = fld_cache_insert(fld_cache, lu_seq, *mds_num);
+ rc = fld_cache_insert(fld_cache, seq, *mds);
#endif
RETURN(rc);
+#endif
+ *mds = 0;
+ return 0;
}
+EXPORT_SYMBOL(fld_client_lookup);
#ifdef __KERNEL__
static int fld_init(void)
sizeof fld_cache->fld_hash[0]);
spin_lock_init(&fld_cache->fld_lock);
+ CDEBUG(D_INFO, "Client FLD, cache size %d\n",
+ FLD_HTABLE_SIZE);
+
RETURN(0);
}
};
static struct dt_key *fld_key(const struct lu_context *ctx,
- const fidseq_t seq_num)
+ const fidseq_t seq)
{
struct fld_thread_info *info;
ENTRY;
info = lu_context_key_get(ctx, &fld_thread_key);
LASSERT(info != NULL);
- info->fti_key = cpu_to_be64(seq_num);
+ info->fti_key = cpu_to_be64(seq);
RETURN((void *)&info->fti_key);
}
static struct dt_rec *fld_rec(const struct lu_context *ctx,
- const mdsno_t mds_num)
+ const mdsno_t mds)
{
struct fld_thread_info *info;
ENTRY;
info = lu_context_key_get(ctx, &fld_thread_key);
LASSERT(info != NULL);
- info->fti_rec = cpu_to_be64(mds_num);
+ info->fti_rec = cpu_to_be64(mds);
RETURN((void *)&info->fti_rec);
}
int fld_handle_insert(struct lu_server_fld *fld,
const struct lu_context *ctx,
- fidseq_t seq_num, mdsno_t mds_num)
+ fidseq_t seq, mdsno_t mds)
{
struct dt_device *dt = fld->fld_dt;
struct dt_object *dt_obj = fld->fld_obj;
th = dt->dd_ops->dt_trans_start(ctx, dt, &txn);
rc = dt_obj->do_index_ops->dio_insert(ctx, dt_obj,
- fld_rec(ctx, mds_num),
- fld_key(ctx, seq_num), th);
+ fld_rec(ctx, mds),
+ fld_key(ctx, seq), th);
dt->dd_ops->dt_trans_stop(ctx, th);
RETURN(rc);
int fld_handle_delete(struct lu_server_fld *fld,
const struct lu_context *ctx,
- fidseq_t seq_num)
+ fidseq_t seq)
{
struct dt_device *dt = fld->fld_dt;
struct dt_object *dt_obj = fld->fld_obj;
txn.tp_credits = FLD_TXN_INDEX_DELETE_CREDITS;
th = dt->dd_ops->dt_trans_start(ctx, dt, &txn);
rc = dt_obj->do_index_ops->dio_delete(ctx, dt_obj,
- fld_key(ctx, seq_num), th);
+ fld_key(ctx, seq), th);
dt->dd_ops->dt_trans_stop(ctx, th);
RETURN(rc);
int fld_handle_lookup(struct lu_server_fld *fld,
const struct lu_context *ctx,
- fidseq_t seq_num, mdsno_t *mds_num)
+ fidseq_t seq, mdsno_t *mds)
{
struct dt_object *dt_obj = fld->fld_obj;
struct dt_rec *rec = fld_rec(ctx, 0);
ENTRY;
rc = dt_obj->do_index_ops->dio_lookup(ctx, dt_obj, rec,
- fld_key(ctx, seq_num));
+ fld_key(ctx, seq));
if (rc == 0)
- *mds_num = be64_to_cpu(*(__u64 *)rec);
+ *mds = be64_to_cpu(*(__u64 *)rec);
RETURN(rc);
}
int fld_handle_insert(struct lu_server_fld *fld,
const struct lu_context *ctx,
- fidseq_t seq_num, mdsno_t mdsno);
+ fidseq_t seq, mdsno_t mds);
int fld_handle_delete(struct lu_server_fld *fld,
const struct lu_context *ctx,
- fidseq_t seq_num);
+ fidseq_t seq);
int fld_handle_lookup(struct lu_server_fld *fld,
const struct lu_context *ctx,
- fidseq_t seq_num, mdsno_t *mds);
+ fidseq_t seq, mdsno_t *mds);
int fld_iam_init(struct lu_server_fld *fld,
const struct lu_context *ctx);
struct portals_handle exp_handle;
atomic_t exp_refcount;
struct obd_uuid exp_client_uuid;
+ struct list_head exp_fld_chain;
struct list_head exp_obd_chain;
/* exp_obd_chain_timed fo ping evictor, protected by obd_dev_lock */
struct list_head exp_obd_chain_timed;
struct obd_export *exp);
int fld_client_create(struct lu_client_fld *fld,
- __u64 seq, __u64 mds_num);
+ __u64 seq, __u64 mds);
int fld_client_delete(struct lu_client_fld *fld,
- __u64 seq, __u64 mds_num);
+ __u64 seq, __u64 mds);
int fld_client_get(struct lu_client_fld *fld,
- __u64 lu_seq, __u64 *mds_num);
+ __u64 seq, __u64 *mds);
int fld_client_lookup(struct lu_client_fld *fld,
- __u64 lu_seq, __u64 *mds_num);
+ __u64 seq, __u64 *mds);
#endif
#include <lustre_lib.h>
#include <lustre_export.h>
#include <lustre_quota.h>
+#include <lustre_fld.h>
#include <lu_object.h>
/* this is really local to the OSC */
struct lmv_obd {
int refcount;
+ struct lu_client_fld lmv_fld;
spinlock_t lmv_lock;
struct lmv_desc desc;
struct obd_uuid cluuid;
#include <lprocfs_status.h>
#include "lmv_internal.h"
-/* dummy function for a while */
-int lmv_fld_lookup(struct obd_device *obd, struct lu_fid *fid)
+int lmv_fld_lookup(struct obd_device *obd, const struct lu_fid *fid)
{
+ struct lmv_obd *lmv = &obd->u.lmv;
+ __u64 mds;
int rc;
ENTRY;
LASSERT(fid_is_sane(fid));
-
- /* temporary hack until fld will works */
- rc = (unsigned long)fid_seq(fid) / LUSTRE_SEQ_SUPER_CHUNK;
- CWARN("LMV: got MDS %d for sequence: "LPU64"\n", rc, fid_seq(fid));
- RETURN(rc);
+ rc = fld_client_lookup(&lmv->lmv_fld, fid_seq(fid), &mds);
+ if (rc) {
+ CERROR("can't find mds by seq "LPU64", rc %d\n",
+ fid_seq(fid), rc);
+ RETURN(rc);
+ }
+ CWARN("LMV: got MDS "LPU64" for sequence: "LPU64"\n",
+ mds, fid_seq(fid));
+ RETURN((int)mds);
}
int lmv_handle_split(struct obd_export *, struct lu_fid *);
int lmv_blocking_ast(struct ldlm_lock *, struct ldlm_lock_desc *,
void *, int);
-int lmv_fld_lookup(struct obd_device *obd, struct lu_fid *fid);
+int lmv_fld_lookup(struct obd_device *obd, const struct lu_fid *fid);
static inline struct lmv_stripe_md *
lmv_get_mea(struct ptlrpc_request *req, int offset)
}
mdc_exp = class_conn2export(&conn);
+ fld_client_add_export(&lmv->lmv_fld, mdc_exp);
+
mdc_data = &class_exp2cliimp(mdc_exp)->imp_connect_data;
rc = obd_register_observer(mdc_obd, obd);
/* asking underlaying tgt layer to allocate new fid */
rc = obd_fid_alloc(lmv->tgts[mds].ltd_exp, fid, hint);
+
+ /* client switches to new sequence, setup fld */
+ if (rc == -ERESTART) {
+ rc = fld_client_create(&lmv->lmv_fld,
+ fid_seq(fid),
+ mds);
+ if (rc) {
+ CERROR("can't create fld entry, "
+ "rc %d\n", rc);
+ }
+ }
+
RETURN(rc);
}
CERROR("Can't setup LMV object manager, "
"error %d.\n", rc);
GOTO(out_free_datas, rc);
- RETURN(rc);
}
lprocfs_init_vars(lmv, &lvars);
}
}
#endif
+ rc = fld_client_init(&lmv->lmv_fld, LUSTRE_CLI_FLD_HASH_RRB);
+ if (rc) {
+ CERROR("can't init FLD, err %d\n",
+ rc);
+ GOTO(out_free_datas, rc);
+ }
+
RETURN(0);
out_free_datas:
lprocfs_obd_cleanup(obd);
lmv_mgr_cleanup(obd);
+ fld_client_fini(&lmv->lmv_fld);
OBD_FREE(lmv->datas, lmv->datas_size);
OBD_FREE(lmv->tgts, lmv->tgts_size);
RETURN(0);
}
+/* XXX: this is ugly, should be something else */
static int mdt_controller_init(struct mdt_device *m,
struct lustre_cfg *cfg)
{