- fid_is_local() uses client FLD cache, site contains ls_client_fld which is initialized and used by cmm;
- in fld_client_del_target() use class_export_put() instead of class_export_get(). The same in fini path;
- added struct fld_target which wraps obd_export and target index to not use silly counter while getting correct target by hash. This prevents it from wrong behavios if some target will be removed in alive cluster;
- all __u64 seq are replaced by seqno_t seq and all __u64 mds replaced by mdsno_t mds.
{
struct lu_device_type *ldt = &mdc_device_type;
struct lu_device *ld;
+ struct lu_site *ls;
struct mdc_device *mc, *tmp;
char *p, *num = lustre_cfg_string(cfg, 2);
mdsno_t mdc_num;
lu_device_get(cmm2lu_dev(cm));
- fld_client_add_target(&cm->cmm_fld,
+ ls = cm->cmm_md_dev.md_lu_dev.ld_site;
+ fld_client_add_target(ls->ls_client_fld,
mc->mc_desc.cl_exp);
}
RETURN(rc);
struct lu_device *d, struct lu_device *next)
{
struct cmm_device *m = lu2cmm_dev(d);
+ struct lu_site *ls;
int err = 0;
-
ENTRY;
spin_lock_init(&m->cmm_tgt_guard);
m->cmm_tgt_count = 0;
m->cmm_child = lu2md_dev(next);
- err = fld_client_init(&m->cmm_fld, "CMM_UUID",
+ ls = m->cmm_md_dev.md_lu_dev.ld_site;
+ OBD_ALLOC_PTR(ls->ls_client_fld);
+ if (!ls->ls_client_fld)
+ RETURN(-ENOMEM);
+
+ err = fld_client_init(ls->ls_client_fld, "CMM_UUID",
LUSTRE_CLI_FLD_HASH_RRB);
if (err) {
CERROR("can't init FLD, err %d\n", err);
{
struct cmm_device *cm = lu2cmm_dev(ld);
struct mdc_device *mc, *tmp;
+ struct lu_site *ls;
ENTRY;
- fld_client_fini(&cm->cmm_fld);
+ ls = cm->cmm_md_dev.md_lu_dev.ld_site;
+ fld_client_fini(ls->ls_client_fld);
+ OBD_FREE_PTR(ls->ls_client_fld);
+ ls->ls_client_fld = NULL;
+
/* finish all mdc devices */
spin_lock(&cm->cmm_tgt_guard);
list_for_each_entry_safe(mc, tmp, &cm->cmm_targets, mc_linkage) {
__u32 cmm_tgt_count;
struct list_head cmm_targets;
spinlock_t cmm_tgt_guard;
-
- /* client FLD interface */
- struct lu_client_fld cmm_fld;
};
enum cmm_flags {
static int cmm_fld_lookup(struct cmm_device *cm,
const struct lu_fid *fid, mdsno_t *mds)
{
+ struct lu_site *ls;
int rc = 0;
ENTRY;
LASSERT(fid_is_sane(fid));
- rc = fld_client_lookup(&cm->cmm_fld, fid_seq(fid), mds);
+ ls = cm->cmm_md_dev.md_lu_dev.ld_site;
+
+ rc = fld_client_lookup(ls->ls_client_fld,
+ fid_seq(fid), mds);
if (rc) {
CERROR("can't find mds by seq "LPX64", rc %d\n",
fid_seq(fid), rc);
# include <liblustre.h>
#endif
+#include <obd.h>
+#include <lu_object.h>
#include <lustre_fid.h>
void fid_to_le(struct lu_fid *dst, const struct lu_fid *src)
dst->f_ver = le32_to_cpu(fid_ver(src));
}
EXPORT_SYMBOL(fid_to_le);
+
+/*
+ * Returns true, if fid is local to this server node.
+ *
+ * WARNING: this function is *not* guaranteed to return false if fid is
+ * remote: it makes an educated conservative guess only.
+ *
+ * fid_is_local() is supposed to be used in assertion checks only.
+ */
+int fid_is_local(struct lu_site *site, const struct lu_fid *fid)
+{
+ int result;
+
+ result = 1; /* conservatively assume fid is local */
+ if (site->ls_client_fld != NULL) {
+ struct fld_cache_entry *entry;
+
+ entry = fld_cache_lookup(site->ls_client_fld->fld_cache,
+ fid_seq(fid));
+ if (entry != NULL)
+ result = (entry->fce_mds == site->ls_node_id);
+ }
+ return result;
+}
+EXPORT_SYMBOL(fid_is_local);
/* allocate new sequence for client (llite or MDC are expected to use this) */
static int
-__seq_client_alloc_seq(struct lu_client_seq *seq, __u64 *seqnr)
+__seq_client_alloc_seq(struct lu_client_seq *seq, seqno_t *seqnr)
{
int rc = 0;
ENTRY;
}
int
-seq_client_alloc_seq(struct lu_client_seq *seq, __u64 *seqnr)
+seq_client_alloc_seq(struct lu_client_seq *seq, seqno_t *seqnr)
{
int rc = 0;
ENTRY;
int
seq_client_alloc_fid(struct lu_client_seq *seq, struct lu_fid *fid)
{
- __u64 seqnr = 0;
+ seqno_t seqnr = 0;
int rc;
ENTRY;
RETURN(cache);
}
+EXPORT_SYMBOL(fld_cache_init);
void fld_cache_fini(struct fld_cache_info *cache)
{
EXIT;
}
+EXPORT_SYMBOL(fld_cache_fini);
int
fld_cache_insert(struct fld_cache_info *cache,
- __u64 seq, __u64 mds)
+ seqno_t seq, mdsno_t mds)
{
struct fld_cache_entry *flde, *fldt;
struct hlist_head *bucket;
OBD_FREE_PTR(flde);
return rc;
}
+EXPORT_SYMBOL(fld_cache_insert);
void
-fld_cache_delete(struct fld_cache_info *cache, __u64 seq)
+fld_cache_delete(struct fld_cache_info *cache, seqno_t seq)
{
struct fld_cache_entry *flde;
struct hlist_head *bucket;
spin_unlock(&cache->fci_lock);
return;
}
+EXPORT_SYMBOL(fld_cache_delete);
struct fld_cache_entry *
-fld_cache_lookup(struct fld_cache_info *cache, __u64 seq)
+fld_cache_lookup(struct fld_cache_info *cache, seqno_t seq)
{
struct fld_cache_entry *flde;
struct hlist_head *bucket;
spin_unlock(&cache->fci_lock);
RETURN(NULL);
}
+EXPORT_SYMBOL(fld_cache_lookup);
#endif
#include "fld_internal.h"
#ifdef __KERNEL__
-struct fld_cache_info *fld_cache = NULL;
-
-static int fld_init(void)
-{
- int rc = 0;
- ENTRY;
-
- fld_cache = fld_cache_init(FLD_HTABLE_SIZE);
- if (IS_ERR(fld_cache))
- rc = PTR_ERR(fld_cache);
-
- if (rc != 0)
- fld_cache = NULL;
-
- RETURN(rc);
-}
-
-static void fld_fini(void)
-{
- ENTRY;
- if (fld_cache != NULL) {
- fld_cache_fini(fld_cache);
- fld_cache = NULL;
- }
- EXIT;
-}
-
static int __init fld_mod_init(void)
{
- fld_init();
+ /* nothing to init seems */
return 0;
}
static void __exit fld_mod_exit(void)
{
- fld_fini();
+ /* nothing to fini seems */
return;
}
int
fld_server_create(struct lu_server_fld *fld,
const struct lu_context *ctx,
- __u64 seq, mdsno_t mds)
+ seqno_t seq, mdsno_t mds)
{
- int rc;
ENTRY;
-
- rc = fld_index_create(fld, ctx, seq, mds);
- if (rc == 0) {
- /* do not return result of calling fld_cache_insert()
- * here. First of all because it may return -EEXISTS. Another
- * reason is that, we do not want to stop proceeding because of
- * cache errors. --umka */
- fld_cache_insert(fld_cache, seq, mds);
- }
- RETURN(rc);
+ RETURN(fld_index_create(fld, ctx, seq, mds));
}
EXPORT_SYMBOL(fld_server_create);
-/* delete index entry and update cache */
+/* delete index entry */
int
fld_server_delete(struct lu_server_fld *fld,
const struct lu_context *ctx,
- __u64 seq)
+ seqno_t seq)
{
ENTRY;
- fld_cache_delete(fld_cache, seq);
RETURN(fld_index_delete(fld, ctx, seq));
}
EXPORT_SYMBOL(fld_server_delete);
-/* lookup in cache first and then issue index lookup */
+/* issue on-disk index lookup */
int
fld_server_lookup(struct lu_server_fld *fld,
const struct lu_context *ctx,
- __u64 seq, mdsno_t *mds)
+ seqno_t seq, mdsno_t *mds)
{
- struct fld_cache_entry *flde;
- int rc;
ENTRY;
-
- /* lookup it in the cache first */
- flde = fld_cache_lookup(fld_cache, seq);
- if (flde != NULL) {
- *mds = flde->fce_mds;
- RETURN(0);
- }
-
- rc = fld_index_lookup(fld, ctx, seq, mds);
- RETURN(rc);
+ RETURN(fld_index_lookup(fld, ctx, seq, mds));
}
EXPORT_SYMBOL(fld_server_lookup);
if (req->rq_export != NULL) {
site = req->rq_export->exp_obd->obd_lu_dev->ld_site;
LASSERT(site != NULL);
- rc = fld_req_handle0(ctx, site->ls_fld, req);
+ rc = fld_req_handle0(ctx, site->ls_server_fld, req);
} else {
CERROR("Unconnected request\n");
req->rq_status = -ENOTCONN;
return 0;
}
-/*
- * Returns true, if fid is local to this server node.
- *
- * WARNING: this function is *not* guaranteed to return false if fid is
- * remote: it makes an educated conservative guess only.
- *
- * fid_is_local() is supposed to be used in assertion checks only.
- */
-int fid_is_local(struct lu_site *site, const struct lu_fid *fid)
-{
- int result;
-
- result = 1; /* conservatively assume fid is local */
- if (site->ls_fld != NULL) {
- struct fld_cache_entry *entry;
-
- entry = fld_cache_lookup(fld_cache, fid_seq(fid));
- if (entry != NULL)
- result = (entry->fce_mds == site->ls_node_id);
- }
- return result;
-}
-EXPORT_SYMBOL(fid_is_local);
-
#ifdef LPROCFS
static int
fld_server_proc_init(struct lu_server_fld *fld)
static const struct dt_index_features fld_index_features = {
.dif_flags = DT_IND_UPDATE,
- .dif_keysize_min = sizeof(fidseq_t),
- .dif_keysize_max = sizeof(fidseq_t),
+ .dif_keysize_min = sizeof(seqno_t),
+ .dif_keysize_max = sizeof(seqno_t),
.dif_recsize_min = sizeof(mdsno_t),
.dif_recsize_max = sizeof(mdsno_t)
};
};
static struct dt_key *fld_key(const struct lu_context *ctx,
- const fidseq_t seq)
+ const seqno_t seq)
{
struct fld_thread_info *info;
ENTRY;
int fld_index_create(struct lu_server_fld *fld,
const struct lu_context *ctx,
- fidseq_t seq, mdsno_t mds)
+ seqno_t seq, mdsno_t mds)
{
struct dt_device *dt = fld->fld_dt;
struct dt_object *dt_obj = fld->fld_obj;
int fld_index_delete(struct lu_server_fld *fld,
const struct lu_context *ctx,
- fidseq_t seq)
+ seqno_t seq)
{
struct dt_device *dt = fld->fld_dt;
struct dt_object *dt_obj = fld->fld_obj;
int fld_index_lookup(struct lu_server_fld *fld,
const struct lu_context *ctx,
- fidseq_t seq, mdsno_t *mds)
+ seqno_t seq, mdsno_t *mds)
{
struct dt_object *dt_obj = fld->fld_obj;
struct dt_rec *rec = fld_rec(ctx, 0);
#include <linux/types.h>
-typedef __u64 fidseq_t;
-
-struct fld_cache_entry {
- struct hlist_node fce_list;
- mdsno_t fce_mds;
- fidseq_t fce_seq;
-};
-
-struct fld_cache_info {
- struct hlist_head *fci_hash;
- spinlock_t fci_lock;
- int fci_size;
- int fci_hash_mask;
+struct fld_target {
+ struct list_head fldt_chain;
+ struct obd_export *fldt_exp;
+ __u64 fldt_idx;
};
enum fld_op {
#define FLD_HTABLE_SIZE 256
extern struct lu_fld_hash fld_hash[3];
-extern struct fld_cache_info *fld_cache;
#ifdef __KERNEL__
#define FLD_SERVICE_WATCHDOG_TIMEOUT (obd_timeout * 1000)
int fld_index_create(struct lu_server_fld *fld,
const struct lu_context *ctx,
- fidseq_t seq, mdsno_t mds);
+ seqno_t seq, mdsno_t mds);
int fld_index_delete(struct lu_server_fld *fld,
const struct lu_context *ctx,
- fidseq_t seq);
+ seqno_t seq);
int fld_index_lookup(struct lu_server_fld *fld,
const struct lu_context *ctx,
- fidseq_t seq, mdsno_t *mds);
-
-struct fld_cache_info *fld_cache_init(int size);
-
-void fld_cache_fini(struct fld_cache_info *cache);
+ seqno_t seq, mdsno_t *mds);
-int fld_cache_insert(struct fld_cache_info *cache,
- __u64 seq, __u64 mds);
-
-void fld_cache_delete(struct fld_cache_info *cache,
- __u64 seq);
-
-struct fld_cache_entry *
-fld_cache_lookup(struct fld_cache_info *cache,
- __u64 seq);
-
-static inline __u32 fld_cache_hash(__u64 seq)
+static inline __u32 fld_cache_hash(seqno_t seq)
{
return (__u32)seq;
}
-
#endif
#ifdef LPROCFS
#include "fld_internal.h"
static int
-fld_rrb_hash(struct lu_client_fld *fld, __u64 seq)
+fld_rrb_hash(struct lu_client_fld *fld, seqno_t seq)
{
if (fld->fld_count == 0)
return 0;
}
static int
-fld_dht_hash(struct lu_client_fld *fld, __u64 seq)
+fld_dht_hash(struct lu_client_fld *fld, seqno_t seq)
{
/* XXX: here should DHT hash */
return fld_rrb_hash(fld, seq);
}
};
-static struct obd_export *
-fld_client_get_target(struct lu_client_fld *fld, __u64 seq)
+static struct fld_target *
+fld_client_get_target(struct lu_client_fld *fld, seqno_t seq)
{
- struct obd_export *fld_exp;
- int count = 0, hash;
+ struct fld_target *target;
+ int hash;
ENTRY;
LASSERT(fld->fld_hash != NULL);
spin_unlock(&fld->fld_lock);
spin_lock(&fld->fld_lock);
- list_for_each_entry(fld_exp,
- &fld->fld_exports, exp_fld_chain) {
- if (count == hash) {
+ list_for_each_entry(target,
+ &fld->fld_targets, fldt_chain) {
+ if (target->fldt_idx == hash) {
spin_unlock(&fld->fld_lock);
- RETURN(fld_exp);
+ RETURN(target);
}
- count++;
}
spin_unlock(&fld->fld_lock);
RETURN(NULL);
struct obd_export *exp)
{
struct client_obd *cli = &exp->exp_obd->u.cli;
- struct obd_export *fld_exp;
+ struct fld_target *target, *tmp;
ENTRY;
LASSERT(exp != NULL);
CDEBUG(D_INFO|D_WARNING, "FLD(cli): adding export %s\n",
cli->cl_target_uuid.uuid);
+ OBD_ALLOC_PTR(target);
+ if (target == NULL)
+ RETURN(-ENOMEM);
+
spin_lock(&fld->fld_lock);
- list_for_each_entry(fld_exp, &fld->fld_exports, exp_fld_chain) {
- if (obd_uuid_equals(&fld_exp->exp_client_uuid,
+ list_for_each_entry(tmp, &fld->fld_targets, fldt_chain) {
+ if (obd_uuid_equals(&tmp->fldt_exp->exp_client_uuid,
&exp->exp_client_uuid))
{
spin_unlock(&fld->fld_lock);
+ OBD_FREE_PTR(target);
RETURN(-EEXIST);
}
}
+
+ target->fldt_exp = class_export_get(exp);
+ target->fldt_idx = fld->fld_count;
- fld_exp = class_export_get(exp);
- list_add_tail(&fld_exp->exp_fld_chain,
- &fld->fld_exports);
+ list_add_tail(&target->fldt_chain,
+ &fld->fld_targets);
fld->fld_count++;
-
spin_unlock(&fld->fld_lock);
RETURN(0);
/* remove export from FLD */
int
fld_client_del_target(struct lu_client_fld *fld,
- struct obd_export *exp)
+ struct obd_export *exp)
{
- struct obd_export *fld_exp;
- struct obd_export *tmp;
+ struct fld_target *target, *tmp;
ENTRY;
spin_lock(&fld->fld_lock);
- list_for_each_entry_safe(fld_exp, tmp, &fld->fld_exports, exp_fld_chain) {
- if (obd_uuid_equals(&fld_exp->exp_client_uuid,
+ list_for_each_entry_safe(target, tmp,
+ &fld->fld_targets, fldt_chain) {
+ if (obd_uuid_equals(&target->fldt_exp->exp_client_uuid,
&exp->exp_client_uuid))
{
fld->fld_count--;
- list_del(&fld_exp->exp_fld_chain);
- class_export_get(fld_exp);
-
+ list_del(&target->fldt_chain);
+ class_export_put(target->fldt_exp);
spin_unlock(&fld->fld_lock);
+ OBD_FREE_PTR(target);
RETURN(0);
}
}
RETURN(-EINVAL);
}
- INIT_LIST_HEAD(&fld->fld_exports);
+ INIT_LIST_HEAD(&fld->fld_targets);
spin_lock_init(&fld->fld_lock);
fld->fld_hash = &fld_hash[hash];
fld->fld_count = 0;
snprintf(fld->fld_name, sizeof(fld->fld_name),
"%s-%s", LUSTRE_FLD_NAME, uuid);
+ fld->fld_cache = fld_cache_init(FLD_HTABLE_SIZE);
+ if (IS_ERR(fld->fld_cache)) {
+ rc = PTR_ERR(fld->fld_cache);
+ fld->fld_cache = NULL;
+ GOTO(out, rc);
+ }
+
#ifdef LPROCFS
rc = fld_client_proc_init(fld);
if (rc)
void
fld_client_fini(struct lu_client_fld *fld)
{
- struct obd_export *fld_exp;
- struct obd_export *tmp;
+ struct fld_target *target, *tmp;
ENTRY;
#ifdef LPROCFS
#endif
spin_lock(&fld->fld_lock);
- list_for_each_entry_safe(fld_exp, tmp,
- &fld->fld_exports, exp_fld_chain) {
+ list_for_each_entry_safe(target, tmp,
+ &fld->fld_targets, fldt_chain) {
fld->fld_count--;
- list_del(&fld_exp->exp_fld_chain);
- class_export_get(fld_exp);
+ list_del(&target->fldt_chain);
+ class_export_put(target->fldt_exp);
+ OBD_FREE_PTR(target);
}
spin_unlock(&fld->fld_lock);
+
+ if (fld->fld_cache != NULL) {
+ fld_cache_fini(fld->fld_cache);
+ fld->fld_cache = NULL;
+ }
+
CDEBUG(D_INFO|D_WARNING, "Client FLD finalized\n");
EXIT;
}
int
fld_client_create(struct lu_client_fld *fld,
- __u64 seq, mdsno_t mds)
+ seqno_t seq, mdsno_t mds)
{
- struct obd_export *fld_exp;
+ struct fld_target *target;
struct md_fld md_fld;
__u32 rc;
ENTRY;
- fld_exp = fld_client_get_target(fld, seq);
- if (!fld_exp)
+ target = fld_client_get_target(fld, seq);
+ if (!target)
RETURN(-EINVAL);
md_fld.mf_seq = seq;
md_fld.mf_mds = mds;
- rc = fld_client_rpc(fld_exp, &md_fld, FLD_CREATE);
+ rc = fld_client_rpc(target->fldt_exp, &md_fld, FLD_CREATE);
#ifdef __KERNEL__
if (rc == 0) {
* here. First of all because it may return -EEXISTS. Another
* reason is that, we do not want to stop proceeding because of
* cache errors. --umka */
- fld_cache_insert(fld_cache, seq, mds);
+ fld_cache_insert(fld->fld_cache, seq, mds);
}
#endif
int
fld_client_delete(struct lu_client_fld *fld,
- __u64 seq)
+ seqno_t seq)
{
- struct obd_export *fld_exp;
+ struct fld_target *target;
struct md_fld md_fld;
__u32 rc;
#ifdef __KERNEL__
- fld_cache_delete(fld_cache, seq);
+ fld_cache_delete(fld->fld_cache, seq);
#endif
- fld_exp = fld_client_get_target(fld, seq);
- if (!fld_exp)
+ target = fld_client_get_target(fld, seq);
+ if (!target)
RETURN(-EINVAL);
md_fld.mf_seq = seq;
md_fld.mf_mds = 0;
- rc = fld_client_rpc(fld_exp, &md_fld, FLD_DELETE);
+ rc = fld_client_rpc(target->fldt_exp,
+ &md_fld, FLD_DELETE);
RETURN(rc);
}
EXPORT_SYMBOL(fld_client_delete);
static int
fld_client_get(struct lu_client_fld *fld,
- __u64 seq, mdsno_t *mds)
+ seqno_t seq, mdsno_t *mds)
{
- struct obd_export *fld_exp;
+ struct fld_target *target;
struct md_fld md_fld;
int rc;
ENTRY;
- fld_exp = fld_client_get_target(fld, seq);
- if (!fld_exp)
+ target = fld_client_get_target(fld, seq);
+ if (!target)
RETURN(-EINVAL);
md_fld.mf_seq = seq;
- rc = fld_client_rpc(fld_exp,
+ rc = fld_client_rpc(target->fldt_exp,
&md_fld, FLD_LOOKUP);
if (rc == 0)
*mds = md_fld.mf_mds;
/* lookup fid in the namespace of pfid according to the name */
int
fld_client_lookup(struct lu_client_fld *fld,
- __u64 seq, mdsno_t *mds)
+ seqno_t seq, mdsno_t *mds)
{
#ifdef __KERNEL__
struct fld_cache_entry *flde;
#ifdef __KERNEL__
/* lookup it in the cache */
- flde = fld_cache_lookup(fld_cache, seq);
+ flde = fld_cache_lookup(fld->fld_cache, seq);
if (flde != NULL) {
*mds = flde->fce_mds;
RETURN(0);
#ifdef __KERNEL__
/* do not return error here as well. See previous comment in same
* situation in function fld_client_create(). --umka */
- fld_cache_insert(fld_cache, seq, *mds);
+ fld_cache_insert(fld->fld_cache, seq, *mds);
#endif
RETURN(rc);
int count, int *eof, void *data)
{
struct lu_client_fld *fld = (struct lu_client_fld *)data;
- struct obd_export *fld_exp;
+ struct fld_target *target;
int total = 0, rc;
ENTRY;
LASSERT(fld != NULL);
spin_lock(&fld->fld_lock);
- list_for_each_entry(fld_exp,
- &fld->fld_exports, exp_fld_chain)
+ list_for_each_entry(target,
+ &fld->fld_targets, fldt_chain)
{
- struct client_obd *cli = &fld_exp->exp_obd->u.cli;
+ struct client_obd *cli = &target->fldt_exp->exp_obd->u.cli;
rc = snprintf(page, count, "%s\n",
cli->cl_target_uuid.uuid);
/*
* Fid location database
*/
- struct lu_server_fld *ls_fld;
+ struct lu_server_fld *ls_server_fld;
+ struct lu_client_fld *ls_client_fld;
/*
* Server Seq Manager
#define LUSTRE_MGS_VERSION 0x00060000
typedef __u64 mdsno_t;
+typedef __u64 seqno_t;
struct lu_range {
__u64 lr_start;
/* end adding MDT by huanghua@clusterfs.com */
struct md_fld {
- __u64 mf_seq;
+ seqno_t mf_seq;
mdsno_t mf_mds;
};
struct portals_handle exp_handle;
atomic_t exp_refcount;
struct obd_uuid exp_client_uuid;
- struct list_head exp_fld_chain;
struct list_head exp_obd_chain;
/* exp_obd_chain_timed fo ping evictor, protected by obd_dev_lock */
struct list_head exp_obd_chain_timed;
int seq_client_alloc_meta(struct lu_client_seq *seq);
int seq_client_alloc_seq(struct lu_client_seq *seq,
- __u64 *seqnr);
+ seqno_t *seqnr);
int seq_client_alloc_fid(struct lu_client_seq *seq,
struct lu_fid *fid);
char fld_name[80];
};
+struct fld_cache_entry {
+ struct hlist_node fce_list;
+ mdsno_t fce_mds;
+ seqno_t fce_seq;
+};
+
+struct fld_cache_info {
+ struct hlist_head *fci_hash;
+ spinlock_t fci_lock;
+ int fci_size;
+ int fci_hash_mask;
+};
+
struct lu_client_fld {
/* client side proc entry */
cfs_proc_dir_entry_t *fld_proc_dir;
/* list of exports client FLD knows about */
- struct list_head fld_exports;
+ struct list_head fld_targets;
/* current hash to be used to chose an export */
struct lu_fld_hash *fld_hash;
/* lock protecting exports list and fld_hash */
spinlock_t fld_lock;
+ /* client FLD cache */
+ struct fld_cache_info *fld_cache;
+
/* client fld proc entry name */
char fld_name[80];
};
int fld_server_lookup(struct lu_server_fld *fld,
const struct lu_context *ctx,
- __u64 seq, mdsno_t *mds);
+ seqno_t seq, mdsno_t *mds);
int fld_server_create(struct lu_server_fld *fld,
const struct lu_context *ctx,
- __u64 seq, mdsno_t mds);
+ seqno_t seq, mdsno_t mds);
int fld_server_delete(struct lu_server_fld *fld,
const struct lu_context *ctx,
- __u64 seq);
+ seqno_t seq);
/* client methods */
int fld_client_init(struct lu_client_fld *fld,
void fld_client_fini(struct lu_client_fld *fld);
int fld_client_lookup(struct lu_client_fld *fld,
- __u64 seq, mdsno_t *mds);
+ seqno_t seq, mdsno_t *mds);
int fld_client_create(struct lu_client_fld *fld,
- __u64 seq, mdsno_t mds);
+ seqno_t seq, mdsno_t mds);
int fld_client_delete(struct lu_client_fld *fld,
- __u64 seq);
+ seqno_t seq);
int fld_client_add_target(struct lu_client_fld *fld,
struct obd_export *exp);
int fld_client_del_target(struct lu_client_fld *fld,
struct obd_export *exp);
+/* cache methods */
+struct fld_cache_info *fld_cache_init(int size);
+
+void fld_cache_fini(struct fld_cache_info *cache);
+
+int fld_cache_insert(struct fld_cache_info *cache,
+ seqno_t seq, mdsno_t mds);
+
+void fld_cache_delete(struct fld_cache_info *cache,
+ seqno_t seq);
+
+struct fld_cache_entry *
+fld_cache_lookup(struct fld_cache_info *cache,
+ seqno_t seq);
+
#endif
ls = m->mdt_md_dev.md_lu_dev.ld_site;
- OBD_ALLOC_PTR(ls->ls_fld);
+ OBD_ALLOC_PTR(ls->ls_server_fld);
- if (ls->ls_fld != NULL) {
- rc = fld_server_init(ls->ls_fld, ctx,
+ if (ls->ls_server_fld != NULL) {
+ rc = fld_server_init(ls->ls_server_fld, ctx,
uuid, m->mdt_bottom);
if (rc) {
- OBD_FREE_PTR(ls->ls_fld);
- ls->ls_fld = NULL;
+ OBD_FREE_PTR(ls->ls_server_fld);
+ ls->ls_server_fld = NULL;
}
} else
rc = -ENOMEM;
struct lu_site *ls = m->mdt_md_dev.md_lu_dev.ld_site;
ENTRY;
- if (ls && ls->ls_fld) {
- fld_server_fini(ls->ls_fld, ctx);
- OBD_FREE_PTR(ls->ls_fld);
- ls->ls_fld = NULL;
+ if (ls && ls->ls_server_fld) {
+ fld_server_fini(ls->ls_server_fld, ctx);
+ OBD_FREE_PTR(ls->ls_server_fld);
+ ls->ls_server_fld = NULL;
}
RETURN(0);
}