* RHEL 4 and RHEL 5/SLES 10 clients behaves differently on 'cd' to a
removed cwd "./" (refer to Bugzilla 14399).
* File join has been disabled in this release, refer to Bugzilla 16929.
-
+
+Severity : enhancement
+Bugzilla : 15957
+Description: compact fld format with extents
+Details    : Store a range of sequences rather than every seq in the FLD.
+	     The seq controller updates the FLD rather than the clients. In
+	     case of CMD, mdt0 holds the FLD; all other metadata servers act
+	     as non-persistent proxies for FLD queries and cache fld entries.
+
Severity : normal
Frequency : rare
Bugzilla : 16081
struct lu_device *ld;
struct lu_device *cmm_lu = cmm2lu_dev(cm);
mdsno_t mdc_num;
+ struct lu_site *site = cmm2lu_dev(cm)->ld_site;
int rc;
ENTRY;
if (IS_ERR(ld))
RETURN(PTR_ERR(ld));
- ld->ld_site = cmm2lu_dev(cm)->ld_site;
+ ld->ld_site = site;
rc = ldt->ldt_ops->ldto_device_init(env, ld, NULL, NULL);
if (rc) {
target.ft_exp = mc->mc_desc.cl_exp;
fld_client_add_target(cm->cmm_fld, &target);
+ if (mc->mc_num == 0) {
+                /* this is the mdt0 -> mc export; fld lookup needs this
+                   export to forward fld lookup requests. */
+ LASSERT(!lu_site2md(site)->ms_server_fld->lsf_control_exp);
+ lu_site2md(site)->ms_server_fld->lsf_control_exp =
+ mc->mc_desc.cl_exp;
+ }
/* Set max md size for the mdc. */
rc = cmm_post_init_mdc(env, cm);
RETURN(rc);
}
if (*mds > cm->cmm_tgt_count) {
- CERROR("Got invalid mdsno: "LPU64" (max: %u)\n",
+ CERROR("Got invalid mdsno: %x (max: %x)\n",
*mds, cm->cmm_tgt_count);
rc = -EINVAL;
} else {
- CDEBUG(D_INFO, "CMM: got MDS "LPU64" for sequence: "
- LPU64"\n", *mds, fid_seq(fid));
+ CDEBUG(D_INFO, "CMM: got MDS %x for sequence: "
+ LPX64"\n", *mds, fid_seq(fid));
}
RETURN (rc);
/* Alloc new fid on @mc. */
rc = obd_fid_alloc(mc->mc_desc.cl_exp, fid, NULL);
- if (rc > 0) {
- /* Setup FLD for new sequenceif needed. */
- rc = fld_client_create(cmm->cmm_fld, fid_seq(fid),
- mc->mc_num, env);
- if (rc)
- CERROR("Can't create fld entry, rc %d\n", rc);
- }
+ if (rc > 0)
+ rc = 0;
up(&mc->mc_fid_sem);
RETURN(rc);
seq->lss_name, cli->lcs_name);
seq->lss_cli = cli;
+ cli->lcs_space.lsr_mdt = seq->lss_site->ms_node_id;
EXIT;
out_up:
up(&seq->lss_sem);
}
EXPORT_SYMBOL(seq_server_set_cli);
-/*
+/**
* On controller node, allocate new super sequence for regular sequence server.
+ * As the super sequence controller, this node is supposed to maintain
+ * the fld and update the index.
+ * \a out range always has the correct mds node number of the requester.
*/
+
static int __seq_server_alloc_super(struct lu_server_seq *seq,
- struct lu_range *in,
- struct lu_range *out,
+ struct lu_seq_range *in,
+ struct lu_seq_range *out,
const struct lu_env *env)
{
- struct lu_range *space = &seq->lss_space;
- int rc;
+ struct lu_seq_range *space = &seq->lss_space;
+ struct thandle *th;
+ __u64 mdt = out->lsr_mdt;
+ int rc, credit;
ENTRY;
LASSERT(range_is_sane(space));
CDEBUG(D_INFO, "%s: Input seq range: "
DRANGE"\n", seq->lss_name, PRANGE(in));
- if (in->lr_end > space->lr_start)
- space->lr_start = in->lr_end;
+ if (in->lsr_end > space->lsr_start)
+ space->lsr_start = in->lsr_end;
*out = *in;
CDEBUG(D_INFO, "%s: Recovered space: "DRANGE"\n",
"Only "LPU64" sequences left\n", seq->lss_name,
range_space(space));
*out = *space;
- space->lr_start = space->lr_end;
+ space->lsr_start = space->lsr_end;
} else if (range_is_exhausted(space)) {
CERROR("%s: Sequences space is exhausted\n",
seq->lss_name);
range_alloc(out, space, seq->lss_width);
}
}
+ out->lsr_mdt = mdt;
+
+ credit = SEQ_TXN_STORE_CREDITS + FLD_TXN_INDEX_INSERT_CREDITS;
+
+ th = seq_store_trans_start(seq, env, credit);
+ if (IS_ERR(th))
+ RETURN(PTR_ERR(th));
- rc = seq_store_write(seq, env);
+ rc = seq_store_write(seq, env, th);
if (rc) {
CERROR("%s: Can't write space data, rc %d\n",
seq->lss_name, rc);
- RETURN(rc);
+ goto out;
}
- CDEBUG(D_INFO, "%s: Allocated super-sequence "
- DRANGE"\n", seq->lss_name, PRANGE(out));
+ rc = fld_server_create(seq->lss_site->ms_server_fld,
+ env, out, th);
+ if (rc) {
+ CERROR("%s: Can't Update fld database, rc %d\n",
+ seq->lss_name, rc);
+ }
+
+out:
+ seq_store_trans_stop(seq, env, th);
+
+ CDEBUG(D_INFO, "%s: super-sequence allocation rc = %d "
+ DRANGE"\n", seq->lss_name, rc, PRANGE(out));
RETURN(rc);
}
int seq_server_alloc_super(struct lu_server_seq *seq,
- struct lu_range *in,
- struct lu_range *out,
+ struct lu_seq_range *in,
+ struct lu_seq_range *out,
const struct lu_env *env)
{
int rc;
}
static int __seq_server_alloc_meta(struct lu_server_seq *seq,
- struct lu_range *in,
- struct lu_range *out,
+ struct lu_seq_range *in,
+ struct lu_seq_range *out,
const struct lu_env *env)
{
- struct lu_range *space = &seq->lss_space;
+ struct lu_seq_range *space = &seq->lss_space;
+ struct thandle *th;
int rc = 0;
+
ENTRY;
LASSERT(range_is_sane(space));
* we check here that range from client is "newer" than
* exhausted super.
*/
- LASSERT(in->lr_end > space->lr_start);
+ LASSERT(in->lsr_end > space->lsr_start);
/*
* Start is set to end of last allocated, because it
* *is* already allocated so we take that into account
* and do not use for other allocations.
*/
- space->lr_start = in->lr_end;
+ space->lsr_start = in->lsr_end;
/*
- * End is set to in->lr_start + super sequence
- * allocation unit. That is because in->lr_start is
+ * End is set to in->lsr_start + super sequence
+ * allocation unit. That is because in->lsr_start is
* first seq in new allocated range from controller
* before failure.
*/
- space->lr_end = in->lr_start + LUSTRE_SEQ_SUPER_WIDTH;
+ space->lsr_end = in->lsr_start + LUSTRE_SEQ_SUPER_WIDTH;
if (!seq->lss_cli) {
CERROR("%s: No sequence controller "
* obtained range from it was @space.
*/
rc = seq_client_replay_super(seq->lss_cli, space, env);
+
if (rc) {
CERROR("%s: Can't replay super-sequence, "
"rc %d\n", seq->lss_name, rc);
* Update super start by end from client's range. Super
* end should not be changed if range was not exhausted.
*/
- if (in->lr_end > space->lr_start)
- space->lr_start = in->lr_end;
+ if (in->lsr_end > space->lsr_start)
+ space->lsr_start = in->lsr_end;
}
*out = *in;
range_alloc(out, space, seq->lss_width);
}
- rc = seq_store_write(seq, env);
+ th = seq_store_trans_start(seq, env, SEQ_TXN_STORE_CREDITS);
+ if (IS_ERR(th))
+ RETURN(PTR_ERR(th));
+
+ rc = seq_store_write(seq, env, th);
if (rc) {
CERROR("%s: Can't write space data, rc %d\n",
seq->lss_name, rc);
DRANGE"\n", seq->lss_name, PRANGE(out));
}
+ seq_store_trans_stop(seq, env, th);
RETURN(rc);
}
int seq_server_alloc_meta(struct lu_server_seq *seq,
- struct lu_range *in,
- struct lu_range *out,
+ struct lu_seq_range *in,
+ struct lu_seq_range *out,
const struct lu_env *env)
{
int rc;
static int seq_server_handle(struct lu_site *site,
const struct lu_env *env,
- __u32 opc, struct lu_range *in,
- struct lu_range *out)
+ __u32 opc, struct lu_seq_range *in,
+ struct lu_seq_range *out)
{
int rc;
struct md_site *mite;
const struct lu_env *env,
struct seq_thread_info *info)
{
- struct lu_range *out, *in = NULL;
+ struct lu_seq_range *out, *in = NULL, *tmp;
struct lu_site *site;
int rc = -EPROTO;
__u32 *opc;
if (out == NULL)
RETURN(err_serious(-EPROTO));
- if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
- in = req_capsule_client_get(info->sti_pill,
- &RMF_SEQ_RANGE);
+ tmp = req_capsule_client_get(info->sti_pill, &RMF_SEQ_RANGE);
+ if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
+ in = tmp;
LASSERT(!range_is_zero(in) && range_is_sane(in));
}
+                /* The seq client passed the mdt id; we need to pass it
+                 * back using the out range parameter. */
+ out->lsr_mdt = tmp->lsr_mdt;
rc = seq_server_handle(site, env, *opc, in, out);
} else
rc = err_serious(-EPROTO);
struct dt_device *dev,
const char *prefix,
enum lu_mgr_type type,
+ struct md_site *ms,
const struct lu_env *env)
{
+ struct thandle *th;
int rc, is_srv = (type == LUSTRE_SEQ_SERVER);
ENTRY;
seq->lss_cli = NULL;
seq->lss_type = type;
+ seq->lss_site = ms;
range_init(&seq->lss_space);
sema_init(&seq->lss_sem, 1);
rc = seq_store_init(seq, env, dev);
if (rc)
GOTO(out, rc);
-
/* Request backing store for saved sequence info. */
rc = seq_store_read(seq, env);
if (rc == -ENODATA) {
LUSTRE_SEQ_ZERO_RANGE:
LUSTRE_SEQ_SPACE_RANGE;
+ seq->lss_space.lsr_mdt = ms->ms_node_id;
CDEBUG(D_INFO, "%s: No data found "
"on store. Initialize space\n",
seq->lss_name);
+ th = seq_store_trans_start(seq, env, SEQ_TXN_STORE_CREDITS);
+ if (IS_ERR(th))
+ RETURN(PTR_ERR(th));
+
/* Save default controller value to store. */
- rc = seq_store_write(seq, env);
+ rc = seq_store_write(seq, env, th);
if (rc) {
CERROR("%s: Can't write space data, "
"rc %d\n", seq->lss_name, rc);
}
+ seq_store_trans_stop(seq, env, th);
} else if (rc) {
CERROR("%s: Can't read space data, rc %d\n",
seq->lss_name, rc);
struct seq_thread_info {
struct req_capsule *sti_pill;
struct txn_param sti_txn;
- struct lu_range sti_space;
+ struct lu_seq_range sti_space;
struct lu_buf sti_buf;
};
+enum {
+ SEQ_TXN_STORE_CREDITS = 20
+};
+
extern struct lu_context_key seq_thread_key;
/* Functions used internally in module. */
const struct lu_env *env);
int seq_client_replay_super(struct lu_client_seq *seq,
- struct lu_range *range,
+ struct lu_seq_range *range,
const struct lu_env *env);
/* Store API functions. */
const struct lu_env *env);
int seq_store_write(struct lu_server_seq *seq,
- const struct lu_env *env);
+ const struct lu_env *env,
+ struct thandle *th);
int seq_store_read(struct lu_server_seq *seq,
const struct lu_env *env);
+struct thandle * seq_store_trans_start(struct lu_server_seq *seq,
+ const struct lu_env *env,
+ int credits);
+void seq_store_trans_stop(struct lu_server_seq *seq,
+ const struct lu_env *env,
+ struct thandle *th);
+
#ifdef LPROCFS
extern struct lprocfs_vars seq_server_proc_list[];
extern struct lprocfs_vars seq_client_proc_list[];
* The first 0x400 sequences of normal FID are reserved for special purpose.
* FID_SEQ_START + 1 is for local file id generation.
*/
-const struct lu_range LUSTRE_SEQ_SPACE_RANGE = {
+const struct lu_seq_range LUSTRE_SEQ_SPACE_RANGE = {
FID_SEQ_START + 0x400ULL,
(__u64)~0ULL
};
EXPORT_SYMBOL(LUSTRE_SEQ_SPACE_RANGE);
/* Zero range, used for init and other purposes. */
-const struct lu_range LUSTRE_SEQ_ZERO_RANGE = {
+const struct lu_seq_range LUSTRE_SEQ_ZERO_RANGE = {
0,
0
};
.f_oid = 0x0000000000000001,
.f_ver = 0x0000000000000000 };
EXPORT_SYMBOL(LUSTRE_BFL_FID);
-
-void range_cpu_to_le(struct lu_range *dst, const struct lu_range *src)
-{
- /* check that all fields are converted */
- CLASSERT(sizeof(*src) ==
- sizeof(src->lr_start) +
- sizeof(src->lr_end) +
- sizeof(src->lr_padding));
- dst->lr_start = cpu_to_le64(src->lr_start);
- dst->lr_end = cpu_to_le64(src->lr_end);
-}
-EXPORT_SYMBOL(range_cpu_to_le);
-
-void range_le_to_cpu(struct lu_range *dst, const struct lu_range *src)
-{
- /* check that all fields are converted */
- CLASSERT(sizeof(*src) ==
- sizeof(src->lr_start) +
- sizeof(src->lr_end) +
- sizeof(src->lr_padding));
- dst->lr_start = le64_to_cpu(src->lr_start);
- dst->lr_end = le64_to_cpu(src->lr_end);
-}
-EXPORT_SYMBOL(range_le_to_cpu);
-
-#ifdef __KERNEL__
-void range_cpu_to_be(struct lu_range *dst, const struct lu_range *src)
-{
- /* check that all fields are converted */
- CLASSERT(sizeof(*src) ==
- sizeof(src->lr_start) +
- sizeof(src->lr_end) +
- sizeof(src->lr_padding));
- dst->lr_start = cpu_to_be64(src->lr_start);
- dst->lr_end = cpu_to_be64(src->lr_end);
-}
-EXPORT_SYMBOL(range_cpu_to_be);
-
-void range_be_to_cpu(struct lu_range *dst, const struct lu_range *src)
-{
- /* check that all fields are converted */
- CLASSERT(sizeof(*src) ==
- sizeof(src->lr_start) +
- sizeof(src->lr_end) +
- sizeof(src->lr_padding));
- dst->lr_start = be64_to_cpu(src->lr_start);
- dst->lr_end = be64_to_cpu(src->lr_end);
-}
-EXPORT_SYMBOL(range_be_to_cpu);
-
-#endif
#include <lustre_mdc.h>
#include "fid_internal.h"
-static int seq_client_rpc(struct lu_client_seq *seq, struct lu_range *input,
- struct lu_range *output, __u32 opc,
+static int seq_client_rpc(struct lu_client_seq *seq, struct lu_seq_range *input,
+ struct lu_seq_range *output, __u32 opc,
const char *opcname)
{
struct obd_export *exp = seq->lcs_exp;
struct ptlrpc_request *req;
- struct lu_range *out, *in;
+ struct lu_seq_range *out, *in;
__u32 *op;
int rc;
ENTRY;
if (seq->lcs_type == LUSTRE_SEQ_METADATA) {
req->rq_request_portal = (opc == SEQ_ALLOC_SUPER) ?
SEQ_CONTROLLER_PORTAL : SEQ_METADATA_PORTAL;
+ /* update mdt field of *in, it is required for fld update
+ * on super sequence allocator node. */
+ if (opc == SEQ_ALLOC_SUPER)
+ in->lsr_mdt = seq->lcs_space.lsr_mdt;
} else {
- req->rq_request_portal = (opc == SEQ_ALLOC_SUPER) ?
- SEQ_CONTROLLER_PORTAL : SEQ_DATA_PORTAL;
+ LASSERT(opc == SEQ_ALLOC_META);
+ req->rq_request_portal = SEQ_DATA_PORTAL;
}
ptlrpc_at_set_req_timeout(req);
/* Request sequence-controller node to allocate new super-sequence. */
int seq_client_replay_super(struct lu_client_seq *seq,
- struct lu_range *range,
+ struct lu_seq_range *range,
const struct lu_env *env)
{
int rc;
}
LASSERT(!range_is_exhausted(&seq->lcs_space));
- *seqnr = seq->lcs_space.lr_start;
- seq->lcs_space.lr_start += 1;
+ *seqnr = seq->lcs_space.lsr_start;
+ seq->lcs_space.lsr_start += 1;
CDEBUG(D_INFO, "%s: Allocated sequence ["LPX64"]\n", seq->lcs_name,
*seqnr);
LASSERT(seq != NULL);
down(&seq->lcs_sem);
fid_zero(&seq->lcs_fid);
+ /**
+         * This id should not be used for seq range allocation.
+         * Set to -1 for debug checks.
+ */
+
+ seq->lcs_space.lsr_mdt = -1;
+
range_init(&seq->lcs_space);
up(&seq->lcs_sem);
}
#include "fid_internal.h"
#ifdef __KERNEL__
-enum {
- SEQ_TXN_STORE_CREDITS = 20
-};
static struct lu_buf *seq_store_buf(struct seq_thread_info *info)
{
return buf;
}
+struct thandle * seq_store_trans_start(struct lu_server_seq *seq,
+ const struct lu_env *env, int credit)
+{
+ struct seq_thread_info *info;
+ struct dt_device *dt_dev;
+ struct thandle *th;
+ ENTRY;
+
+ dt_dev = lu2dt_dev(seq->lss_obj->do_lu.lo_dev);
+ info = lu_context_key_get(&env->le_ctx, &seq_thread_key);
+ LASSERT(info != NULL);
+
+ txn_param_init(&info->sti_txn, credit);
+
+ th = dt_dev->dd_ops->dt_trans_start(env, dt_dev, &info->sti_txn);
+ return th;
+}
+
+void seq_store_trans_stop(struct lu_server_seq *seq,
+ const struct lu_env *env,
+ struct thandle *th)
+{
+ struct dt_device *dt_dev;
+ ENTRY;
+
+ dt_dev = lu2dt_dev(seq->lss_obj->do_lu.lo_dev);
+
+ dt_dev->dd_ops->dt_trans_stop(env, th);
+}
+
/* This function implies that caller takes care about locking. */
int seq_store_write(struct lu_server_seq *seq,
- const struct lu_env *env)
+ const struct lu_env *env,
+ struct thandle *th)
{
struct dt_object *dt_obj = seq->lss_obj;
struct seq_thread_info *info;
struct dt_device *dt_dev;
- struct thandle *th;
loff_t pos = 0;
- int rc;
- ENTRY;
+ int rc;
+ ENTRY;
dt_dev = lu2dt_dev(seq->lss_obj->do_lu.lo_dev);
info = lu_context_key_get(&env->le_ctx, &seq_thread_key);
LASSERT(info != NULL);
- /* Stub here, will fix it later. */
- txn_param_init(&info->sti_txn, SEQ_TXN_STORE_CREDITS);
+ /* Store ranges in le format. */
+ range_cpu_to_le(&info->sti_space, &seq->lss_space);
- th = dt_dev->dd_ops->dt_trans_start(env, dt_dev, &info->sti_txn);
- if (!IS_ERR(th)) {
- /* Store ranges in le format. */
- range_cpu_to_le(&info->sti_space, &seq->lss_space);
-
- rc = dt_obj->do_body_ops->dbo_write(env, dt_obj,
- seq_store_buf(info),
- &pos, th, BYPASS_CAPA, 1);
- if (rc == sizeof(info->sti_space)) {
- CDEBUG(D_INFO, "%s: Space - "DRANGE"\n",
- seq->lss_name, PRANGE(&seq->lss_space));
- rc = 0;
- } else if (rc >= 0) {
- rc = -EIO;
- }
-
- dt_dev->dd_ops->dt_trans_stop(env, th);
- } else {
- rc = PTR_ERR(th);
+ rc = dt_obj->do_body_ops->dbo_write(env, dt_obj,
+ seq_store_buf(info),
+ &pos, th, BYPASS_CAPA, 1);
+ if (rc == sizeof(info->sti_space)) {
+ CDEBUG(D_INFO, "%s: Space - "DRANGE"\n",
+ seq->lss_name, PRANGE(&seq->lss_space));
+ rc = 0;
+ } else if (rc >= 0) {
+ rc = -EIO;
}
-
- RETURN(rc);
+
+
+ RETURN(rc);
}
/*
static int
seq_proc_write_common(struct file *file, const char *buffer,
unsigned long count, void *data,
- struct lu_range *range)
+ struct lu_seq_range *range)
{
- struct lu_range tmp;
+ struct lu_seq_range tmp;
int rc;
ENTRY;
LASSERT(range != NULL);
- rc = sscanf(buffer, "[%Lx - %Lx]\n",(long long unsigned *)&tmp.lr_start,
- (long long unsigned *)&tmp.lr_end);
+ rc = sscanf(buffer, "[%Lx - %Lx]\n",(long long unsigned *)&tmp.lsr_start,
+ (long long unsigned *)&tmp.lsr_end);
if (rc != 2 || !range_is_sane(&tmp) || range_is_zero(&tmp))
RETURN(-EINVAL);
*range = tmp;
static int
seq_proc_read_common(char *page, char **start, off_t off,
int count, int *eof, void *data,
- struct lu_range *range)
+ struct lu_seq_range *range)
{
int rc;
ENTRY;
*eof = 1;
- rc = snprintf(page, count, "["LPX64" - "LPX64"]\n",
+ rc = snprintf(page, count, "["LPX64" - "LPX64"]:%x\n",
PRANGE(range));
RETURN(rc);
}
*
* FLD (Fids Location Database)
*
+ * Author: Pravin Shelar <pravin.shelar@sun.com>
* Author: Yury Umanets <umka@clusterfs.com>
*/
#include <lustre_fld.h>
#include "fld_internal.h"
-#ifdef __KERNEL__
-static inline __u32 fld_cache_hash(seqno_t seq)
-{
- return (__u32)seq;
-}
-
-void fld_cache_flush(struct fld_cache *cache)
-{
- struct fld_cache_entry *flde;
- struct hlist_head *bucket;
- struct hlist_node *scan;
- struct hlist_node *next;
- int i;
- ENTRY;
-
- /* Free all cache entries. */
- spin_lock(&cache->fci_lock);
- for (i = 0; i < cache->fci_hash_size; i++) {
- bucket = cache->fci_hash_table + i;
- hlist_for_each_entry_safe(flde, scan, next, bucket, fce_list) {
- hlist_del_init(&flde->fce_list);
- list_del_init(&flde->fce_lru);
- cache->fci_cache_count--;
- OBD_FREE_PTR(flde);
- }
- }
- spin_unlock(&cache->fci_lock);
- EXIT;
-}
-
-struct fld_cache *fld_cache_init(const char *name, int hash_size,
+/**
+ * create fld cache.
+ */
+struct fld_cache *fld_cache_init(const char *name,
int cache_size, int cache_threshold)
{
- struct fld_cache *cache;
- int i;
+ struct fld_cache *cache;
ENTRY;
LASSERT(name != NULL);
- LASSERT(IS_PO2(hash_size));
LASSERT(cache_threshold < cache_size);
OBD_ALLOC_PTR(cache);
if (cache == NULL)
RETURN(ERR_PTR(-ENOMEM));
- INIT_LIST_HEAD(&cache->fci_lru);
+ CFS_INIT_LIST_HEAD(&cache->fci_entries_head);
+ CFS_INIT_LIST_HEAD(&cache->fci_lru);
- cache->fci_cache_count = 0;
+ cache->fci_cache_count = 0;
spin_lock_init(&cache->fci_lock);
strncpy(cache->fci_name, name,
sizeof(cache->fci_name));
- cache->fci_hash_size = hash_size;
- cache->fci_cache_size = cache_size;
+ cache->fci_cache_size = cache_size;
cache->fci_threshold = cache_threshold;
/* Init fld cache info. */
- cache->fci_hash_mask = hash_size - 1;
- OBD_ALLOC(cache->fci_hash_table,
- hash_size * sizeof(*cache->fci_hash_table));
- if (cache->fci_hash_table == NULL) {
- OBD_FREE_PTR(cache);
- RETURN(ERR_PTR(-ENOMEM));
- }
-
- for (i = 0; i < hash_size; i++)
- INIT_HLIST_HEAD(&cache->fci_hash_table[i]);
memset(&cache->fci_stat, 0, sizeof(cache->fci_stat));
CDEBUG(D_INFO, "%s: FLD cache - Size: %d, Threshold: %d\n",
RETURN(cache);
}
-EXPORT_SYMBOL(fld_cache_init);
+/**
+ * destroy fld cache.
+ */
void fld_cache_fini(struct fld_cache *cache)
{
__u64 pct;
CDEBUG(D_INFO, "FLD cache statistics (%s):\n", cache->fci_name);
CDEBUG(D_INFO, " Total reqs: "LPU64"\n", cache->fci_stat.fst_count);
CDEBUG(D_INFO, " Cache reqs: "LPU64"\n", cache->fci_stat.fst_cache);
- CDEBUG(D_INFO, " Saved RPCs: "LPU64"\n", cache->fci_stat.fst_inflight);
CDEBUG(D_INFO, " Cache hits: "LPU64"%%\n", pct);
- OBD_FREE(cache->fci_hash_table, cache->fci_hash_size *
- sizeof(*cache->fci_hash_table));
- OBD_FREE_PTR(cache);
-
+ OBD_FREE_PTR(cache);
+
+ EXIT;
+}
+
+static inline void fld_cache_entry_delete(struct fld_cache *cache,
+ struct fld_cache_entry *node);
+
+/**
+ * fix list by checking new entry with NEXT entry in order.
+ */
+static void fld_fix_new_list(struct fld_cache *cache)
+{
+ struct fld_cache_entry *f_curr;
+ struct fld_cache_entry *f_next;
+ struct lu_seq_range *c_range;
+ struct lu_seq_range *n_range;
+ struct list_head *head = &cache->fci_entries_head;
+ ENTRY;
+
+restart_fixup:
+
+ list_for_each_entry_safe(f_curr, f_next, head, fce_list) {
+ c_range = &f_curr->fce_range;
+ n_range = &f_next->fce_range;
+
+ LASSERT(range_is_sane(c_range));
+ if (&f_next->fce_list == head)
+ break;
+
+ LASSERT(c_range->lsr_start <= n_range->lsr_start);
+
+ /* check merge possibility with next range */
+ if (c_range->lsr_end == n_range->lsr_start) {
+ if (c_range->lsr_mdt != n_range->lsr_mdt)
+ continue;
+ n_range->lsr_start = c_range->lsr_start;
+ fld_cache_entry_delete(cache, f_curr);
+ continue;
+ }
+
+ /* check if current range overlaps with next range. */
+ if (n_range->lsr_start < c_range->lsr_end) {
+
+ if (c_range->lsr_mdt == n_range->lsr_mdt) {
+ n_range->lsr_start = c_range->lsr_start;
+ n_range->lsr_end = max(c_range->lsr_end,
+ n_range->lsr_end);
+
+ fld_cache_entry_delete(cache, f_curr);
+ } else {
+ if (n_range->lsr_end <= c_range->lsr_end) {
+ *n_range = *c_range;
+ fld_cache_entry_delete(cache, f_curr);
+ } else
+ n_range->lsr_start = c_range->lsr_end;
+ }
+
+ /* we could have overlap over next
+ * range too. better restart. */
+ goto restart_fixup;
+ }
+
+ /* kill duplicates */
+ if (c_range->lsr_start == n_range->lsr_start &&
+ c_range->lsr_end == n_range->lsr_end)
+ fld_cache_entry_delete(cache, f_curr);
+ }
+
EXIT;
}
-EXPORT_SYMBOL(fld_cache_fini);
-static inline struct hlist_head *
-fld_cache_bucket(struct fld_cache *cache, seqno_t seq)
+/**
+ * add node to fld cache
+ */
+static inline void fld_cache_entry_add(struct fld_cache *cache,
+ struct fld_cache_entry *f_new,
+ struct list_head *pos)
{
- return cache->fci_hash_table + (fld_cache_hash(seq) &
- cache->fci_hash_mask);
+ list_add(&f_new->fce_list, pos);
+ list_add(&f_new->fce_lru, &cache->fci_lru);
+
+ cache->fci_cache_count++;
+ fld_fix_new_list(cache);
}
-/*
- * Check if cache needs to be shrinked. If so - do it. Tries to keep all
- * collision lists well balanced. That is, check all of them and remove one
- * entry in list and so on until cache is shrinked enough.
+/**
+ * delete given node from list.
+ */
+static inline void fld_cache_entry_delete(struct fld_cache *cache,
+ struct fld_cache_entry *node)
+{
+ list_del(&node->fce_list);
+ list_del(&node->fce_lru);
+ cache->fci_cache_count--;
+ OBD_FREE_PTR(node);
+}
+
+/**
+ * Check if cache needs to be shrunk. If so - do it.
+ * Remove one entry in list and so on until cache is shrunk enough.
*/
static int fld_cache_shrink(struct fld_cache *cache)
{
curr = cache->fci_lru.prev;
while (cache->fci_cache_count + cache->fci_threshold >
- cache->fci_cache_size && curr != &cache->fci_lru)
- {
+ cache->fci_cache_size && curr != &cache->fci_lru) {
+
flde = list_entry(curr, struct fld_cache_entry, fce_lru);
curr = curr->prev;
-
- /* keep inflights */
- if (flde->fce_inflight)
- continue;
-
- hlist_del_init(&flde->fce_list);
- list_del_init(&flde->fce_lru);
- cache->fci_cache_count--;
- OBD_FREE_PTR(flde);
+ fld_cache_entry_delete(cache, flde);
num++;
}
- CDEBUG(D_INFO, "%s: FLD cache - Shrinked by "
+ CDEBUG(D_INFO, "%s: FLD cache - Shrunk by "
"%d entries\n", cache->fci_name, num);
RETURN(0);
}
-int fld_cache_insert_inflight(struct fld_cache *cache, seqno_t seq)
+/**
+ * kill all fld cache entries.
+ */
+void fld_cache_flush(struct fld_cache *cache)
{
- struct fld_cache_entry *flde, *fldt;
- struct hlist_head *bucket;
- struct hlist_node *scan;
ENTRY;
spin_lock(&cache->fci_lock);
-
- /* Check if cache already has the entry with such a seq. */
- bucket = fld_cache_bucket(cache, seq);
- hlist_for_each_entry(fldt, scan, bucket, fce_list) {
- if (fldt->fce_seq == seq) {
- spin_unlock(&cache->fci_lock);
- RETURN(-EEXIST);
- }
- }
+ cache->fci_cache_size = 0;
+ fld_cache_shrink(cache);
spin_unlock(&cache->fci_lock);
- /* Allocate new entry. */
- OBD_ALLOC_PTR(flde);
- if (!flde)
- RETURN(-ENOMEM);
+ EXIT;
+}
- /*
- * Check if cache has the entry with such a seq again. It could be added
- * while we were allocating new entry.
- */
- spin_lock(&cache->fci_lock);
- hlist_for_each_entry(fldt, scan, bucket, fce_list) {
- if (fldt->fce_seq == seq) {
- spin_unlock(&cache->fci_lock);
- OBD_FREE_PTR(flde);
- RETURN(0);
- }
+/**
+ * punch hole in existing range. divide this range and add new
+ * entry accordingly.
+ */
+
+void fld_cache_punch_hole(struct fld_cache *cache,
+ struct fld_cache_entry *f_curr,
+ struct fld_cache_entry *f_new)
+{
+ const struct lu_seq_range *range = &f_new->fce_range;
+ const seqno_t new_start = range->lsr_start;
+ const seqno_t new_end = range->lsr_end;
+ struct fld_cache_entry *fldt;
+
+ ENTRY;
+ OBD_ALLOC_GFP(fldt, sizeof *fldt, CFS_ALLOC_ATOMIC);
+ if (!fldt) {
+ OBD_FREE_PTR(f_new);
+ EXIT;
+                /* overlap is not allowed, so don't mess up the list. */
+ return;
}
+ /* break f_curr RANGE into three RANGES:
+         *        f_curr, f_new, fldt
+ */
- /* Add new entry to cache and lru list. */
- INIT_HLIST_NODE(&flde->fce_list);
- flde->fce_inflight = 1;
- flde->fce_invalid = 1;
- cfs_waitq_init(&flde->fce_waitq);
- flde->fce_seq = seq;
-
- hlist_add_head(&flde->fce_list, bucket);
- list_add(&flde->fce_lru, &cache->fci_lru);
- cache->fci_cache_count++;
+ /* f_new = *range */
- spin_unlock(&cache->fci_lock);
+ /* fldt */
+ fldt->fce_range.lsr_start = new_end;
+ fldt->fce_range.lsr_end = f_curr->fce_range.lsr_end;
+ fldt->fce_range.lsr_mdt = f_curr->fce_range.lsr_mdt;
- RETURN(0);
+ /* f_curr */
+ f_curr->fce_range.lsr_end = new_start;
+
+ /* add these two entries to list */
+ fld_cache_entry_add(cache, f_new, &f_curr->fce_list);
+ fld_cache_entry_add(cache, fldt, &f_new->fce_list);
+
+ /* no need to fixup */
+ EXIT;
}
-EXPORT_SYMBOL(fld_cache_insert_inflight);
-int fld_cache_insert(struct fld_cache *cache,
- seqno_t seq, mdsno_t mds)
+/**
+ * handle range overlap in fld cache.
+ */
+void fld_cache_overlap_handle(struct fld_cache *cache,
+ struct fld_cache_entry *f_curr,
+ struct fld_cache_entry *f_new)
{
- struct fld_cache_entry *flde, *fldt;
- struct hlist_head *bucket;
- struct hlist_node *scan;
- int rc;
- ENTRY;
+ const struct lu_seq_range *range = &f_new->fce_range;
+ const seqno_t new_start = range->lsr_start;
+ const seqno_t new_end = range->lsr_end;
+ const mdsno_t mdt = range->lsr_mdt;
- spin_lock(&cache->fci_lock);
+        /* this is the overlap case; these cases check overlap with the
+         * prev range only. fixup will handle overlap with the next range. */
- /* Check if need to shrink cache. */
- rc = fld_cache_shrink(cache);
- if (rc) {
- spin_unlock(&cache->fci_lock);
- RETURN(rc);
- }
+ if (f_curr->fce_range.lsr_mdt == mdt) {
+ f_curr->fce_range.lsr_start = min(f_curr->fce_range.lsr_start,
+ new_start);
- /* Check if cache already has the entry with such a seq. */
- bucket = fld_cache_bucket(cache, seq);
- hlist_for_each_entry(fldt, scan, bucket, fce_list) {
- if (fldt->fce_seq == seq) {
- if (fldt->fce_inflight) {
- /* set mds for inflight entry */
- fldt->fce_mds = mds;
- fldt->fce_inflight = 0;
- fldt->fce_invalid = 0;
- cfs_waitq_signal(&fldt->fce_waitq);
- rc = 0;
- } else
- rc = -EEXIST;
- spin_unlock(&cache->fci_lock);
- RETURN(rc);
- }
- }
- spin_unlock(&cache->fci_lock);
+ f_curr->fce_range.lsr_end = max(f_curr->fce_range.lsr_end,
+ new_end);
- /* Allocate new entry. */
- OBD_ALLOC_PTR(flde);
- if (!flde)
- RETURN(-ENOMEM);
+ OBD_FREE_PTR(f_new);
+ fld_fix_new_list(cache);
- /*
- * Check if cache has the entry with such a seq again. It could be added
- * while we were allocating new entry.
- */
- spin_lock(&cache->fci_lock);
- hlist_for_each_entry(fldt, scan, bucket, fce_list) {
- if (fldt->fce_seq == seq) {
- spin_unlock(&cache->fci_lock);
- OBD_FREE_PTR(flde);
- RETURN(0);
- }
- }
+ } else if (new_start <= f_curr->fce_range.lsr_start &&
+ f_curr->fce_range.lsr_end <= new_end) {
+ /* case 1: new range completely overshadowed existing range.
+ * e.g. whole range migrated. update fld cache entry */
- /* Add new entry to cache and lru list. */
- INIT_HLIST_NODE(&flde->fce_list);
- flde->fce_mds = mds;
- flde->fce_seq = seq;
- flde->fce_inflight = 0;
- flde->fce_invalid = 0;
-
- hlist_add_head(&flde->fce_list, bucket);
- list_add(&flde->fce_lru, &cache->fci_lru);
- cache->fci_cache_count++;
+ f_curr->fce_range = *range;
+ OBD_FREE_PTR(f_new);
+ fld_fix_new_list(cache);
- spin_unlock(&cache->fci_lock);
+ } else if (f_curr->fce_range.lsr_start < new_start &&
+ new_end < f_curr->fce_range.lsr_end) {
+ /* case 2: new range fit within existing range. */
- RETURN(0);
+ fld_cache_punch_hole(cache, f_curr, f_new);
+
+ } else if (new_end <= f_curr->fce_range.lsr_end) {
+ /* case 3: overlap:
+ * [new_start [c_start new_end) c_end)
+ */
+
+ LASSERT(new_start <= f_curr->fce_range.lsr_start);
+
+ f_curr->fce_range.lsr_start = new_end;
+ fld_cache_entry_add(cache, f_new, f_curr->fce_list.prev);
+
+ } else if (f_curr->fce_range.lsr_start <= new_start) {
+ /* case 4: overlap:
+ * [c_start [new_start c_end) new_end)
+ */
+
+ LASSERT(f_curr->fce_range.lsr_end <= new_end);
+
+ f_curr->fce_range.lsr_end = new_start;
+ fld_cache_entry_add(cache, f_new, &f_curr->fce_list);
+ } else
+ CERROR("NEW range ="DRANGE" curr = "DRANGE"\n",
+ PRANGE(range),PRANGE(&f_curr->fce_range));
}
-EXPORT_SYMBOL(fld_cache_insert);
-void fld_cache_delete(struct fld_cache *cache, seqno_t seq)
+/**
+ * Insert FLD entry in FLD cache.
+ *
+ * This function handles all cases of merging and breaking up of
+ * ranges.
+ */
+void fld_cache_insert(struct fld_cache *cache,
+ const struct lu_seq_range *range)
{
- struct fld_cache_entry *flde;
- struct hlist_node *scan, *n;
- struct hlist_head *bucket;
+ struct fld_cache_entry *f_new;
+ struct fld_cache_entry *f_curr;
+ struct fld_cache_entry *n;
+ struct list_head *head;
+ struct list_head *prev = NULL;
+ const seqno_t new_start = range->lsr_start;
+ const seqno_t new_end = range->lsr_end;
ENTRY;
- bucket = fld_cache_bucket(cache, seq);
-
+ LASSERT(range_is_sane(range));
+
+ /* Allocate new entry. */
+ OBD_ALLOC_PTR(f_new);
+ if (!f_new) {
+ EXIT;
+ return;
+ }
+
+ f_new->fce_range = *range;
+
+ /*
+         * Duplicate entries are eliminated in the insert op,
+         * so we don't need to search for the new entry before the insertion loop.
+ */
+
spin_lock(&cache->fci_lock);
- hlist_for_each_entry_safe(flde, scan, n, bucket, fce_list) {
- if (flde->fce_seq == seq) {
- hlist_del_init(&flde->fce_list);
- list_del_init(&flde->fce_lru);
- if (flde->fce_inflight) {
- flde->fce_inflight = 0;
- flde->fce_invalid = 1;
- cfs_waitq_signal(&flde->fce_waitq);
- }
- cache->fci_cache_count--;
- OBD_FREE_PTR(flde);
- GOTO(out_unlock, 0);
+ fld_cache_shrink(cache);
+
+ head = &cache->fci_entries_head;
+
+ list_for_each_entry_safe(f_curr, n, head, fce_list) {
+ /* add list if next is end of list */
+ if (new_end < f_curr->fce_range.lsr_start)
+ break;
+
+ prev = &f_curr->fce_list;
+ /* check if this range is to left of new range. */
+ if (new_start < f_curr->fce_range.lsr_end) {
+ fld_cache_overlap_handle(cache, f_curr, f_new);
+ goto out;
}
}
- EXIT;
-out_unlock:
- spin_unlock(&cache->fci_lock);
-}
-EXPORT_SYMBOL(fld_cache_delete);
+ if (prev == NULL)
+ prev = head;
-static int fld_check_inflight(struct fld_cache_entry *flde)
-{
- return (flde->fce_inflight);
+ /* Add new entry to cache and lru list. */
+ fld_cache_entry_add(cache, f_new, prev);
+out:
+ spin_unlock(&cache->fci_lock);
+ EXIT;
}
+/**
+ * lookup \a seq sequence for range in fld cache.
+ */
int fld_cache_lookup(struct fld_cache *cache,
- seqno_t seq, mdsno_t *mds)
+ const seqno_t seq, struct lu_seq_range *range)
{
struct fld_cache_entry *flde;
- struct hlist_node *scan, *n;
- struct hlist_head *bucket;
+ struct list_head *head;
ENTRY;
- bucket = fld_cache_bucket(cache, seq);
spin_lock(&cache->fci_lock);
+ head = &cache->fci_entries_head;
+
cache->fci_stat.fst_count++;
- hlist_for_each_entry_safe(flde, scan, n, bucket, fce_list) {
- if (flde->fce_seq == seq) {
- if (flde->fce_inflight) {
- /* lookup RPC is inflight need to wait */
- struct l_wait_info lwi;
- spin_unlock(&cache->fci_lock);
- lwi = LWI_TIMEOUT(0, NULL, NULL);
- l_wait_event(flde->fce_waitq,
- !fld_check_inflight(flde), &lwi);
- LASSERT(!flde->fce_inflight);
- if (flde->fce_invalid)
- RETURN(-ENOENT);
-
- *mds = flde->fce_mds;
- cache->fci_stat.fst_inflight++;
- } else {
- LASSERT(!flde->fce_invalid);
- *mds = flde->fce_mds;
- list_del(&flde->fce_lru);
- list_add(&flde->fce_lru, &cache->fci_lru);
- cache->fci_stat.fst_cache++;
- spin_unlock(&cache->fci_lock);
- }
+ list_for_each_entry(flde, head, fce_list) {
+ if (flde->fce_range.lsr_start > seq)
+ break;
+
+ if (range_within(&flde->fce_range, seq)) {
+ *range = flde->fce_range;
+
+ /* update position of this entry in lru list. */
+ list_move(&flde->fce_lru, &cache->fci_lru);
+ cache->fci_stat.fst_cache++;
+ spin_unlock(&cache->fci_lock);
RETURN(0);
}
}
spin_unlock(&cache->fci_lock);
RETURN(-ENOENT);
}
-EXPORT_SYMBOL(fld_cache_lookup);
-#else
-int fld_cache_insert_inflight(struct fld_cache *cache, seqno_t seq)
-{
- return -ENOTSUPP;
-}
-EXPORT_SYMBOL(fld_cache_insert_inflight);
-
-int fld_cache_insert(struct fld_cache *cache,
- seqno_t seq, mdsno_t mds)
-{
- return -ENOTSUPP;
-}
-EXPORT_SYMBOL(fld_cache_insert);
-
-void fld_cache_delete(struct fld_cache *cache,
- seqno_t seq)
-{
- return;
-}
-EXPORT_SYMBOL(fld_cache_delete);
-
-int fld_cache_lookup(struct fld_cache *cache,
- seqno_t seq, mdsno_t *mds)
-{
- return -ENOTSUPP;
-}
-EXPORT_SYMBOL(fld_cache_lookup);
-#endif
*
* Author: Yury Umanets <umka@clusterfs.com>
* Author: WangDi <wangdi@clusterfs.com>
+ * Author: Pravin Shelar <pravin.shelar@sun.com>
*/
#ifndef EXPORT_SYMTAB
}
}
-/* Insert index entry and update cache. */
+/**
+ * Insert FLD index entry and update FLD cache.
+ *
+ * First it tries to merge the given range with an existing range, then
+ * updates the FLD index and FLD cache accordingly. FLD index consistency
+ * is maintained by this function.
+ * This function is called from the sequence allocator when a super-sequence
+ * is granted to a server.
+ */
+
int fld_server_create(struct lu_server_fld *fld,
const struct lu_env *env,
- seqno_t seq, mdsno_t mds)
+ struct lu_seq_range *add_range,
+ struct thandle *th)
{
- int rc;
+ struct lu_seq_range *erange;
+ struct lu_seq_range *new;
+ struct fld_thread_info *info;
+ int rc = 0;
+ int do_merge=0;
+
ENTRY;
-
- rc = fld_index_create(fld, env, seq, mds);
-
- if (rc == 0) {
- /*
- * Do not return result of calling fld_cache_insert()
- * here. First of all because it may return -EEXISTS. Another
- * reason is that, we do not want to stop proceeding even after
- * cache errors.
+
+ info = lu_context_key_get(&env->le_ctx, &fld_thread_key);
+ mutex_lock(&fld->lsf_lock);
+
+ erange = &info->fti_lrange;
+ new = &info->fti_irange;
+ *new = *add_range;
+
+ /* STEP 1: try to merge with previous range */
+ rc = fld_index_lookup(fld, env, new->lsr_start, erange);
+ if (!rc) {
+ /* in case of range overlap, mdt ID must be same for both ranges */
+ if (new->lsr_mdt != erange->lsr_mdt) {
+                        CERROR("mdt[%x] for given range is different from "
+                               "existing overlapping range mdt[%x]\n",
+ new->lsr_mdt, erange->lsr_mdt);
+ rc = -EIO;
+ GOTO(out, rc);
+ }
+
+ if (new->lsr_end < erange->lsr_end)
+ GOTO(out, rc);
+ do_merge = 1;
+
+ } else if (rc == -ENOENT) {
+ /* check for merge case: optimizes for single mds lustre.
+ * As entry does not exist, returned entry must be left side
+ * entry compared to start of new range (ref dio_lookup()).
+ * So try to merge from left.
*/
- fld_cache_insert(fld->lsf_cache, seq, mds);
+ if (new->lsr_start == erange->lsr_end &&
+ new->lsr_mdt == erange->lsr_mdt)
+ do_merge = 1;
+ } else {
+ /* no overlap allowed in fld, so failure in lookup is error */
+ GOTO(out, rc);
}
- RETURN(rc);
-}
-EXPORT_SYMBOL(fld_server_create);
+ if (do_merge) {
+ /* new range can be combined with existing one.
+ * So delete existing range.
+ */
-/* Delete index entry. */
-int fld_server_delete(struct lu_server_fld *fld,
- const struct lu_env *env,
- seqno_t seq)
-{
- int rc;
- ENTRY;
+ rc = fld_index_delete(fld, env, erange, th);
+ if (rc == 0) {
+ new->lsr_start = min(erange->lsr_start, new->lsr_start);
+ new->lsr_end = max(erange->lsr_end, new->lsr_end);
+ } else
+ GOTO(out, rc);
+
+ do_merge = 0;
+ }
- fld_cache_delete(fld->lsf_cache, seq);
- rc = fld_index_delete(fld, env, seq);
+ /* STEP 2: try to merge with next range */
+ rc = fld_index_lookup(fld, env, new->lsr_end, erange);
+ if (!rc) {
+ /* case range overlap: with right side entry. */
+ if (new->lsr_mdt == erange->lsr_mdt)
+ do_merge = 1;
+ } else if (rc == -ENOENT) {
+ /* this range is left of new range end point */
+ LASSERT(erange->lsr_end <= new->lsr_end);
+
+ if (new->lsr_end == erange->lsr_end)
+ do_merge = 1;
+ if (new->lsr_start <= erange->lsr_start)
+ do_merge = 1;
+ } else
+ GOTO(out, rc);
+
+ if (do_merge) {
+ if (new->lsr_mdt != erange->lsr_mdt) {
+                        CERROR("mdt[%x] for given range is different from "
+                               "existing overlapping range mdt[%x]\n",
+ new->lsr_mdt, erange->lsr_mdt);
+ rc = -EIO;
+ GOTO(out, rc);
+ }
+ /* merge with next range */
+ rc = fld_index_delete(fld, env, erange, th);
+ if (rc == 0) {
+ new->lsr_start = min(erange->lsr_start, new->lsr_start);
+ new->lsr_end = max(erange->lsr_end, new->lsr_end);
+ } else
+ GOTO(out, rc);
+ }
+
+ /* now update fld entry. */
+ rc = fld_index_create(fld, env, new, th);
+
+ LASSERT(rc != -EEXIST);
+out:
+ if (rc == 0)
+ fld_cache_insert(fld->lsf_cache, new);
+
+ mutex_unlock(&fld->lsf_lock);
+
+ CDEBUG((rc != 0 ? D_ERROR : D_INFO),
+ "%s: FLD create: given range : "DRANGE
+               " after merge "DRANGE" rc = %d\n", fld->lsf_name,
+ PRANGE(add_range), PRANGE(new), rc);
+
RETURN(rc);
}
-EXPORT_SYMBOL(fld_server_delete);
-/* Lookup mds by seq. */
+EXPORT_SYMBOL(fld_server_create);
+
+/**
+ * Lookup mds by seq, returns a range for given seq.
+ *
+ * If the entry is not cached in the fld cache, the request is sent to the
+ * super sequence controller node (MDT0). All other MDT[1...N] and clients
+ * cache fld entries, but this cache is not persistent.
+ */
+
int fld_server_lookup(struct lu_server_fld *fld,
const struct lu_env *env,
- seqno_t seq, mdsno_t *mds)
+ seqno_t seq, struct lu_seq_range *range)
{
int rc;
ENTRY;
-
+
/* Lookup it in the cache. */
- rc = fld_cache_lookup(fld->lsf_cache, seq, mds);
+ rc = fld_cache_lookup(fld->lsf_cache, seq, range);
if (rc == 0)
RETURN(0);
- rc = fld_index_lookup(fld, env, seq, mds);
- if (rc == 0) {
- /*
- * Do not return error here as well. See previous comment in
- * same situation in function fld_server_create().
+ if (fld->lsf_obj)
+ rc = fld_index_lookup(fld, env, seq, range);
+ else {
+ LASSERT(fld->lsf_control_exp);
+ /* send request to mdt0 i.e. super seq. controller.
+                 * This is a temporary solution; the long-term solution is fld
+ * replication on all mdt servers.
*/
- fld_cache_insert(fld->lsf_cache, seq, *mds);
+ rc = fld_client_rpc(fld->lsf_control_exp,
+ range, FLD_LOOKUP);
}
+
+ if (rc == 0)
+ fld_cache_insert(fld->lsf_cache, range);
+
RETURN(rc);
}
EXPORT_SYMBOL(fld_server_lookup);
+/**
+ * All MDT servers handle the fld lookup operation, but only MDT0 has the
+ * fld index. If an entry is not found in the cache, the lookup request is
+ * forwarded to MDT0.
+ */
+
static int fld_server_handle(struct lu_server_fld *fld,
const struct lu_env *env,
- __u32 opc, struct md_fld *mf,
+ __u32 opc, struct lu_seq_range *range,
struct fld_thread_info *info)
{
int rc;
ENTRY;
switch (opc) {
- case FLD_CREATE:
- rc = fld_server_create(fld, env,
- mf->mf_seq, mf->mf_mds);
-
- /* Do not return -EEXIST error for resent case */
- if ((info->fti_flags & MSG_RESENT) && rc == -EEXIST)
- rc = 0;
- break;
- case FLD_DELETE:
- rc = fld_server_delete(fld, env, mf->mf_seq);
-
- /* Do not return -ENOENT error for resent case */
- if ((info->fti_flags & MSG_RESENT) && rc == -ENOENT)
- rc = 0;
- break;
case FLD_LOOKUP:
rc = fld_server_lookup(fld, env,
- mf->mf_seq, &mf->mf_mds);
+ range->lsr_start, range);
break;
default:
rc = -EINVAL;
break;
}
- CDEBUG(D_INFO, "%s: FLD req handle: error %d (opc: %d, seq: "
- LPX64", mds: "LPU64")\n", fld->lsf_name, rc, opc,
- mf->mf_seq, mf->mf_mds);
+ CDEBUG(D_INFO, "%s: FLD req handle: error %d (opc: %d, range: "
+               DRANGE")\n", fld->lsf_name, rc, opc, PRANGE(range));
RETURN(rc);
struct fld_thread_info *info)
{
struct lu_site *site;
- struct md_fld *in;
- struct md_fld *out;
+ struct lu_seq_range *in;
+ struct lu_seq_range *out;
int rc;
__u32 *opc;
ENTRY;
static void fld_thread_info_init(struct ptlrpc_request *req,
struct fld_thread_info *info)
{
- info->fti_flags = lustre_msg_get_flags(req->rq_reqmsg);
-
info->fti_pill = &req->rq_pill;
/* Init request capsule. */
req_capsule_init(info->fti_pill, req, RCL_SERVER);
*
* fid_is_local() is supposed to be used in assertion checks only.
*/
-int fid_is_local(struct lu_site *site, const struct lu_fid *fid)
+int fid_is_local(const struct lu_env *env,
+ struct lu_site *site, const struct lu_fid *fid)
{
int result;
struct md_site *msite;
+ struct lu_seq_range *range;
+ struct fld_thread_info *info;
+ ENTRY;
+
+ info = lu_context_key_get(&env->le_ctx, &fld_thread_key);
+ range = &info->fti_lrange;
result = 1; /* conservatively assume fid is local */
msite = lu_site2md(site);
if (msite->ms_client_fld != NULL) {
- mdsno_t mds;
int rc;
rc = fld_cache_lookup(msite->ms_client_fld->lcf_cache,
- fid_seq(fid), &mds);
+ fid_seq(fid), range);
if (rc == 0)
- result = (mds == msite->ms_node_id);
+ result = (range->lsr_mdt == msite->ms_node_id);
}
return result;
}
#endif
int fld_server_init(struct lu_server_fld *fld, struct dt_device *dt,
- const char *prefix, const struct lu_env *env)
+ const char *prefix, const struct lu_env *env,
+ int mds_node_id)
{
int cache_size, cache_threshold;
int rc;
cache_threshold = cache_size *
FLD_SERVER_CACHE_THRESHOLD / 100;
+ mutex_init(&fld->lsf_lock);
fld->lsf_cache = fld_cache_init(fld->lsf_name,
- FLD_SERVER_HTABLE_SIZE,
cache_size, cache_threshold);
if (IS_ERR(fld->lsf_cache)) {
rc = PTR_ERR(fld->lsf_cache);
GOTO(out, rc);
}
- rc = fld_index_init(fld, env, dt);
- if (rc)
- GOTO(out, rc);
+ if (!mds_node_id) {
+ rc = fld_index_init(fld, env, dt);
+ if (rc)
+ GOTO(out, rc);
+ } else
+ fld->lsf_obj = NULL;
rc = fld_server_proc_init(fld);
if (rc)
GOTO(out, rc);
+ fld->lsf_control_exp = NULL;
EXIT;
out:
if (rc)
#include <dt_object.h>
#include <md_object.h>
#include <lustre_mdc.h>
+#include <lustre_fid.h>
#include <lustre_fld.h>
#include "fld_internal.h"
const char fld_index_name[] = "fld";
-EXPORT_SYMBOL(fld_index_name);
+
+static const struct lu_seq_range IGIF_FLD_RANGE = {
+ .lsr_start = 1,
+ .lsr_end = IDIF_SEQ_START,
+ .lsr_mdt = 0
+};
const struct dt_index_features fld_index_features = {
.dif_flags = DT_IND_UPDATE,
.dif_keysize_min = sizeof(seqno_t),
.dif_keysize_max = sizeof(seqno_t),
- .dif_recsize_min = sizeof(mdsno_t),
- .dif_recsize_max = sizeof(mdsno_t),
+ .dif_recsize_min = sizeof(struct lu_seq_range),
+ .dif_recsize_max = sizeof(struct lu_seq_range),
.dif_ptrsize = 4
};
-EXPORT_SYMBOL(fld_index_features);
-
-/*
- * number of blocks to reserve for particular operations. Should be function of
- * ... something. Stub for now.
- */
-enum {
- FLD_TXN_INDEX_INSERT_CREDITS = 20,
- FLD_TXN_INDEX_DELETE_CREDITS = 20,
-};
-
extern struct lu_context_key fld_thread_key;
static struct dt_key *fld_key(const struct lu_env *env,
}
static struct dt_rec *fld_rec(const struct lu_env *env,
- const mdsno_t mds)
+ const struct lu_seq_range *range)
{
struct fld_thread_info *info;
+ struct lu_seq_range *rec;
ENTRY;
info = lu_context_key_get(&env->le_ctx, &fld_thread_key);
LASSERT(info != NULL);
+ rec = &info->fti_rec;
+
+ range_cpu_to_be(rec, range);
+ RETURN((void *)rec);
+}
+
+struct thandle* fld_trans_start(struct lu_server_fld *fld,
+ const struct lu_env *env, int credit)
+{
+ struct fld_thread_info *info;
+ struct dt_device *dt_dev;
+ struct txn_param *p;
+
+ dt_dev = lu2dt_dev(fld->lsf_obj->do_lu.lo_dev);
+ info = lu_context_key_get(&env->le_ctx, &fld_thread_key);
+ p = &info->fti_txn_param;
+ txn_param_init(p, credit);
- info->fti_rec = cpu_to_be64(mds);
- RETURN((void *)&info->fti_rec);
+ return dt_dev->dd_ops->dt_trans_start(env, dt_dev, p);
}
+void fld_trans_stop(struct lu_server_fld *fld,
+ const struct lu_env *env, struct thandle* th)
+{
+ struct dt_device *dt_dev;
+
+ dt_dev = lu2dt_dev(fld->lsf_obj->do_lu.lo_dev);
+ dt_dev->dd_ops->dt_trans_stop(env, th);
+}
+
+/**
+ * insert range in fld store.
+ *
+ * \param range range to be inserted
+ * \param  th     transaction handle; the insert may be part of a
+ *                compound transaction.
+ *
+ * \retval 0 success
+ * \retval -ve error
+ */
+
int fld_index_create(struct lu_server_fld *fld,
const struct lu_env *env,
- seqno_t seq, mdsno_t mds)
+ const struct lu_seq_range *range,
+ struct thandle *th)
{
struct dt_object *dt_obj = fld->lsf_obj;
struct dt_device *dt_dev;
- struct txn_param txn;
- struct thandle *th;
+ seqno_t start;
int rc;
+
ENTRY;
+ start = range->lsr_start;
+ LASSERT(range_is_sane(range));
dt_dev = lu2dt_dev(fld->lsf_obj->do_lu.lo_dev);
- /* stub here, will fix it later */
- txn_param_init(&txn, FLD_TXN_INDEX_INSERT_CREDITS);
-
- th = dt_dev->dd_ops->dt_trans_start(env, dt_dev, &txn);
- if (!IS_ERR(th)) {
- rc = dt_obj->do_index_ops->dio_insert(env, dt_obj,
- fld_rec(env, mds),
- fld_key(env, seq),
- th, BYPASS_CAPA, 1);
- dt_dev->dd_ops->dt_trans_stop(env, th);
- } else
- rc = PTR_ERR(th);
+ rc = dt_obj->do_index_ops->dio_insert(env, dt_obj,
+ fld_rec(env, range),
+ fld_key(env, start),
+ th, BYPASS_CAPA, 1);
+
+ CDEBUG(D_INFO, "%s: insert given range : "DRANGE" rc = %d\n",
+ fld->lsf_name, PRANGE(range), rc);
RETURN(rc);
}
+/**
+ * delete range in fld store.
+ *
+ * \param range range to be deleted
+ * \param th transaction
+ *
+ * \retval 0 success
+ * \retval -ve error
+ */
+
int fld_index_delete(struct lu_server_fld *fld,
const struct lu_env *env,
- seqno_t seq)
+ struct lu_seq_range *range,
+ struct thandle *th)
{
struct dt_object *dt_obj = fld->lsf_obj;
struct dt_device *dt_dev;
- struct txn_param txn;
- struct thandle *th;
+ seqno_t seq = range->lsr_start;
int rc;
+
ENTRY;
dt_dev = lu2dt_dev(fld->lsf_obj->do_lu.lo_dev);
- txn_param_init(&txn, FLD_TXN_INDEX_DELETE_CREDITS);
- th = dt_dev->dd_ops->dt_trans_start(env, dt_dev, &txn);
- if (!IS_ERR(th)) {
- rc = dt_obj->do_index_ops->dio_delete(env, dt_obj,
- fld_key(env, seq), th,
- BYPASS_CAPA);
- dt_dev->dd_ops->dt_trans_stop(env, th);
- } else
- rc = PTR_ERR(th);
+ rc = dt_obj->do_index_ops->dio_delete(env, dt_obj,
+ fld_key(env, seq), th,
+ BYPASS_CAPA);
+
+ CDEBUG(D_INFO, "%s: delete given range : "DRANGE" rc = %d\n",
+ fld->lsf_name, PRANGE(range), rc);
+
RETURN(rc);
}
+/**
+ * lookup range for a seq passed
+ *
+ * \param seq seq for lookup.
+ * \param range result of lookup.
+ *
+ * \retval 0 success
+ * \retval -ve error
+ */
+
int fld_index_lookup(struct lu_server_fld *fld,
const struct lu_env *env,
- seqno_t seq, mdsno_t *mds)
+ seqno_t seq,
+ struct lu_seq_range *range)
{
- struct dt_object *dt_obj = fld->lsf_obj;
- struct dt_rec *rec = fld_rec(env, 0);
+ struct dt_object *dt_obj = fld->lsf_obj;
+ struct lu_seq_range *fld_rec;
+ struct dt_key *key = fld_key(env, seq);
+ struct fld_thread_info *info;
int rc;
+
ENTRY;
- rc = dt_obj->do_index_ops->dio_lookup(env, dt_obj, rec,
- fld_key(env, seq), BYPASS_CAPA);
- if (rc > 0) {
- *mds = be64_to_cpu(*(__u64 *)rec);
+ info = lu_context_key_get(&env->le_ctx, &fld_thread_key);
+ fld_rec = &info->fti_rec;
+
+ rc = dt_obj->do_index_ops->dio_lookup(env, dt_obj,
+ (struct dt_rec*) fld_rec,
+ key, BYPASS_CAPA);
+
+ if (rc >= 0) {
+ range_be_to_cpu(fld_rec, fld_rec);
+ *range = *fld_rec;
+ if (range_within(range, seq))
+ rc = 0;
+ else
+ rc = -ENOENT;
+ }
+
+        CDEBUG(D_INFO, "%s: lookup seq = "LPX64" range : "DRANGE" rc = %d\n",
+ fld->lsf_name, seq, PRANGE(range), rc);
+
+ RETURN(rc);
+}
+
+static int fld_insert_igif_fld(struct lu_server_fld *fld,
+ const struct lu_env *env)
+{
+ struct thandle *th;
+ int rc;
+
+ ENTRY;
+ th = fld_trans_start(fld, env, FLD_TXN_INDEX_INSERT_CREDITS);
+ if (IS_ERR(th))
+ RETURN(PTR_ERR(th));
+
+ rc = fld_index_create(fld, env, &IGIF_FLD_RANGE, th);
+ fld_trans_stop(fld, env, th);
+ if (rc == -EEXIST)
rc = 0;
- } else
- rc = -ENOENT;
RETURN(rc);
}
fld->lsf_obj = dt_obj;
rc = dt_obj->do_ops->do_index_try(env, dt_obj,
&fld_index_features);
- if (rc == 0)
+ if (rc == 0) {
LASSERT(dt_obj->do_index_ops != NULL);
- else
+ rc = fld_insert_igif_fld(fld, env);
+
+ if (rc != 0) {
+                        CERROR("insert igif in fld failed: rc = %d\n", rc);
+ lu_object_put(env, &dt_obj->do_lu);
+ fld->lsf_obj = NULL;
+ }
+ } else
CERROR("%s: File \"%s\" is not an index!\n",
fld->lsf_name, fld_index_name);
+
+
} else {
CERROR("%s: Can't find \"%s\" obj %d\n",
fld->lsf_name, fld_index_name, (int)PTR_ERR(dt_obj));
#include <dt_object.h>
#include <libcfs/libcfs.h>
-
#include <lustre_req_layout.h>
#include <lustre_fld.h>
+enum {
+ LUSTRE_FLD_INIT = 1 << 0,
+ LUSTRE_FLD_RUN = 1 << 1
+};
+
+struct fld_stats {
+ __u64 fst_count;
+ __u64 fst_cache;
+ __u64 fst_inflight;
+};
+
+typedef int (*fld_hash_func_t) (struct lu_client_fld *, __u64);
+
+typedef struct lu_fld_target *
+(*fld_scan_func_t) (struct lu_client_fld *, __u64);
+
+struct lu_fld_hash {
+ const char *fh_name;
+ fld_hash_func_t fh_hash_func;
+ fld_scan_func_t fh_scan_func;
+};
+
+struct fld_cache_entry {
+ struct list_head fce_lru;
+ struct list_head fce_list;
+ /**
+ * fld cache entries are sorted on range->lsr_start field. */
+ struct lu_seq_range fce_range;
+};
+
+struct fld_cache {
+ /**
+         * Cache guard, protects fci_entries_head and fci_lru; other
+         * fields are immutable after init is finished.
+ */
+ spinlock_t fci_lock;
+
+ /**
+ * Cache shrink threshold */
+ int fci_threshold;
+
+ /**
+         * Preferred number of cached entries */
+ int fci_cache_size;
+
+ /**
+ * Current number of cached entries. Protected by @fci_lock */
+ int fci_cache_count;
+
+ /**
+ * LRU list fld entries. */
+ struct list_head fci_lru;
+
+ /**
+ * sorted fld entries. */
+ struct list_head fci_entries_head;
+
+ /**
+ * Cache statistics. */
+ struct fld_stats fci_stat;
+
+ /**
+ * Cache name used for debug and messages. */
+ char fci_name[80];
+};
+
enum fld_op {
FLD_CREATE = 0,
FLD_DELETE = 1,
FLD_CLIENT_CACHE_THRESHOLD = 10
};
-enum {
- /*
- * One page is used for hashtable. That is sizeof(struct hlist_head) *
- * 1024.
- */
- FLD_CLIENT_HTABLE_SIZE = (1024 * 1),
-
- /*
- * Here 4 pages are used for hashtable of server cache. This is is
- * because cache it self is 4 times bugger.
- */
- FLD_SERVER_HTABLE_SIZE = (1024 * 4)
-};
-
extern struct lu_fld_hash fld_hash[];
#ifdef __KERNEL__
+
struct fld_thread_info {
struct req_capsule *fti_pill;
__u64 fti_key;
- __u64 fti_rec;
- __u32 fti_flags;
+ struct lu_seq_range fti_rec;
+ struct lu_seq_range fti_lrange;
+ struct lu_seq_range fti_irange;
+ struct txn_param fti_txn_param;
};
+
+struct thandle* fld_trans_start(struct lu_server_fld *fld,
+ const struct lu_env *env, int credit);
+
+void fld_trans_stop(struct lu_server_fld *fld,
+ const struct lu_env *env, struct thandle* th);
+
int fld_index_init(struct lu_server_fld *fld,
const struct lu_env *env,
struct dt_device *dt);
int fld_index_create(struct lu_server_fld *fld,
const struct lu_env *env,
- seqno_t seq, mdsno_t mds);
+ const struct lu_seq_range *range,
+ struct thandle *th);
int fld_index_delete(struct lu_server_fld *fld,
const struct lu_env *env,
- seqno_t seq);
+ struct lu_seq_range *range,
+ struct thandle *th);
int fld_index_lookup(struct lu_server_fld *fld,
const struct lu_env *env,
- seqno_t seq, mdsno_t *mds);
+ seqno_t seq, struct lu_seq_range *range);
+
+int fld_client_rpc(struct obd_export *exp,
+ struct lu_seq_range *range, __u32 fld_op);
#ifdef LPROCFS
extern struct lprocfs_vars fld_server_proc_list[];
#endif
+struct fld_cache *fld_cache_init(const char *name,
+ int cache_size, int cache_threshold);
+
+void fld_cache_fini(struct fld_cache *cache);
+
+void fld_cache_flush(struct fld_cache *cache);
+
+void fld_cache_insert(struct fld_cache *cache,
+ const struct lu_seq_range *range);
+
+void fld_cache_delete(struct fld_cache *cache,
+ const struct lu_seq_range *range);
+
+int fld_cache_lookup(struct fld_cache *cache,
+ const seqno_t seq, struct lu_seq_range *range);
+
static inline const char *
fld_target_name(struct lu_fld_target *tar)
{
RETURN(NULL);
}
-static int fld_dht_hash(struct lu_client_fld *fld,
- seqno_t seq)
-{
- /* XXX: here should be DHT hash */
- return fld_rrb_hash(fld, seq);
-}
-
-static struct lu_fld_target *
-fld_dht_scan(struct lu_client_fld *fld, seqno_t seq)
-{
- /* XXX: here should be DHT scan code */
- return fld_rrb_scan(fld, seq);
-}
-
-struct lu_fld_hash fld_hash[3] = {
- {
- .fh_name = "DHT",
- .fh_hash_func = fld_dht_hash,
- .fh_scan_func = fld_dht_scan
- },
+struct lu_fld_hash fld_hash[] = {
{
.fh_name = "RRB",
.fh_hash_func = fld_rrb_hash,
FLD_CLIENT_CACHE_THRESHOLD / 100;
fld->lcf_cache = fld_cache_init(fld->lcf_name,
- FLD_CLIENT_HTABLE_SIZE,
cache_size, cache_threshold);
if (IS_ERR(fld->lcf_cache)) {
rc = PTR_ERR(fld->lcf_cache);
}
EXPORT_SYMBOL(fld_client_fini);
-static int fld_client_rpc(struct obd_export *exp,
- struct md_fld *mf, __u32 fld_op)
+int fld_client_rpc(struct obd_export *exp,
+ struct lu_seq_range *range, __u32 fld_op)
{
struct ptlrpc_request *req;
- struct md_fld *pmf;
+ struct lu_seq_range *prange;
__u32 *op;
int rc;
ENTRY;
op = req_capsule_client_get(&req->rq_pill, &RMF_FLD_OPC);
*op = fld_op;
- pmf = req_capsule_client_get(&req->rq_pill, &RMF_FLD_MDFLD);
- *pmf = *mf;
+ prange = req_capsule_client_get(&req->rq_pill, &RMF_FLD_MDFLD);
+ *prange = *range;
ptlrpc_request_set_replen(req);
req->rq_request_portal = FLD_REQUEST_PORTAL;
if (rc)
GOTO(out_req, rc);
- pmf = req_capsule_server_get(&req->rq_pill, &RMF_FLD_MDFLD);
- if (pmf == NULL)
+ prange = req_capsule_server_get(&req->rq_pill, &RMF_FLD_MDFLD);
+ if (prange == NULL)
GOTO(out_req, rc = -EFAULT);
- *mf = *pmf;
+ *range = *prange;
EXIT;
out_req:
ptlrpc_req_finished(req);
return rc;
}
-int fld_client_create(struct lu_client_fld *fld,
- seqno_t seq, mdsno_t mds,
- const struct lu_env *env)
-{
- struct md_fld md_fld = { .mf_seq = seq, .mf_mds = mds };
- struct lu_fld_target *target;
- int rc;
- ENTRY;
-
- fld->lcf_flags |= LUSTRE_FLD_RUN;
- target = fld_client_get_target(fld, seq);
- LASSERT(target != NULL);
-
- CDEBUG(D_INFO, "%s: Create fld entry (seq: "LPX64"; mds: "
- LPU64") on target %s (idx "LPU64")\n", fld->lcf_name,
- seq, mds, fld_target_name(target), target->ft_idx);
-
-#ifdef __KERNEL__
- if (target->ft_srv != NULL) {
- LASSERT(env != NULL);
- rc = fld_server_create(target->ft_srv, env, seq, mds);
- } else {
-#endif
- rc = fld_client_rpc(target->ft_exp, &md_fld, FLD_CREATE);
-#ifdef __KERNEL__
- }
-#endif
-
- if (rc == 0) {
- /*
- * Do not return result of calling fld_cache_insert()
- * here. First of all because it may return -EEXIST. Another
- * reason is that, we do not want to stop proceeding because of
- * cache errors.
- */
- fld_cache_insert(fld->lcf_cache, seq, mds);
- } else {
- CERROR("%s: Can't create FLD entry, rc %d\n",
- fld->lcf_name, rc);
- }
-
- RETURN(rc);
-}
-EXPORT_SYMBOL(fld_client_create);
-
-int fld_client_delete(struct lu_client_fld *fld, seqno_t seq,
- const struct lu_env *env)
-{
- struct md_fld md_fld = { .mf_seq = seq, .mf_mds = 0 };
- struct lu_fld_target *target;
- int rc;
- ENTRY;
-
- fld->lcf_flags |= LUSTRE_FLD_RUN;
- fld_cache_delete(fld->lcf_cache, seq);
-
- target = fld_client_get_target(fld, seq);
- LASSERT(target != NULL);
-
- CDEBUG(D_INFO, "%s: Delete fld entry (seq: "LPX64") on "
- "target %s (idx "LPU64")\n", fld->lcf_name, seq,
- fld_target_name(target), target->ft_idx);
-
-#ifdef __KERNEL__
- if (target->ft_srv != NULL) {
- LASSERT(env != NULL);
- rc = fld_server_delete(target->ft_srv,
- env, seq);
- } else {
-#endif
- rc = fld_client_rpc(target->ft_exp,
- &md_fld, FLD_DELETE);
-#ifdef __KERNEL__
- }
-#endif
-
- RETURN(rc);
-}
-EXPORT_SYMBOL(fld_client_delete);
-
int fld_client_lookup(struct lu_client_fld *fld,
seqno_t seq, mdsno_t *mds,
const struct lu_env *env)
{
- struct md_fld md_fld = { .mf_seq = seq, .mf_mds = 0 };
+ struct lu_seq_range res;
struct lu_fld_target *target;
int rc;
ENTRY;
fld->lcf_flags |= LUSTRE_FLD_RUN;
- rc = fld_cache_lookup(fld->lcf_cache, seq, mds);
- if (rc == 0)
+ rc = fld_cache_lookup(fld->lcf_cache, seq, &res);
+ if (rc == 0) {
+ *mds = res.lsr_mdt;
RETURN(0);
+ }
/* Can not find it in the cache */
target = fld_client_get_target(fld, seq);
"target %s (idx "LPU64")\n", fld->lcf_name, seq,
fld_target_name(target), target->ft_idx);
+ res.lsr_start = seq;
#ifdef __KERNEL__
if (target->ft_srv != NULL) {
LASSERT(env != NULL);
rc = fld_server_lookup(target->ft_srv,
- env, seq, &md_fld.mf_mds);
+ env, seq, &res);
} else {
#endif
- /*
- * insert the 'inflight' sequence. No need to protect that,
- * we are trying to reduce numbers of RPC but not restrict
- * to them exactly one
- */
- fld_cache_insert_inflight(fld->lcf_cache, seq);
rc = fld_client_rpc(target->ft_exp,
- &md_fld, FLD_LOOKUP);
+ &res, FLD_LOOKUP);
#ifdef __KERNEL__
}
#endif
- if (seq < FID_SEQ_START) {
- /*
- * The current solution for IGIF is to bind it to mds0.
- * In the future, this should be fixed once IGIF can be found
- * in FLD.
- */
- md_fld.mf_mds = 0;
- rc = 0;
- }
if (rc == 0) {
- *mds = md_fld.mf_mds;
+ *mds = res.lsr_mdt;
- /*
- * Do not return error here as well. See previous comment in
- * same situation in function fld_client_create().
- */
- fld_cache_insert(fld->lcf_cache, seq, *mds);
- } else {
- /* remove 'inflight' seq if it exists */
- fld_cache_delete(fld->lcf_cache, seq);
+ fld_cache_insert(fld->lcf_cache, &res);
}
RETURN(rc);
}
#define LUSTRE_LOG_VERSION 0x00050000
#define LUSTRE_MGS_VERSION 0x00060000
-typedef __u64 mdsno_t;
+typedef __u32 mdsno_t;
typedef __u64 seqno_t;
-struct lu_range {
- __u64 lr_start;
- __u64 lr_end;
- /** stub for compact fld work. */
- __u64 lr_padding;
+/**
+ * Describes a range of sequence numbers; lsr_start is included in the
+ * range but lsr_end is not.
+ * The same structure is used in the fld module, where the lsr_mdt field
+ * holds the mdt id of the home mdt.
+ */
+
+struct lu_seq_range {
+ __u64 lsr_start;
+ __u64 lsr_end;
+ __u32 lsr_mdt;
+ __u32 lsr_padding;
};
/**
* returns width of given range \a r
*/
-static inline __u64 range_space(const struct lu_range *range)
+static inline __u64 range_space(const struct lu_seq_range *range)
{
- return range->lr_end - range->lr_start;
+ return range->lsr_end - range->lsr_start;
}
/**
* initialize range to zero
*/
-static inline void range_init(struct lu_range *range)
+
+static inline void range_init(struct lu_seq_range *range)
{
- range->lr_start = range->lr_end = 0;
+ range->lsr_start = range->lsr_end = range->lsr_mdt = 0;
}
/**
* check if given seq id \a s is within given range \a r
*/
-static inline int range_within(struct lu_range *range,
+
+static inline int range_within(const struct lu_seq_range *range,
__u64 s)
{
- return s >= range->lr_start && s < range->lr_end;
+ return s >= range->lsr_start && s < range->lsr_end;
}
/**
* allocate \a w units of sequence from range \a from.
*/
-static inline void range_alloc(struct lu_range *to,
- struct lu_range *from,
+static inline void range_alloc(struct lu_seq_range *to,
+ struct lu_seq_range *from,
__u64 width)
{
- to->lr_start = from->lr_start;
- to->lr_end = from->lr_start + width;
- from->lr_start += width;
+ to->lsr_start = from->lsr_start;
+ to->lsr_end = from->lsr_start + width;
+ from->lsr_start += width;
}
-static inline int range_is_sane(const struct lu_range *range)
+static inline int range_is_sane(const struct lu_seq_range *range)
{
- return (range->lr_end >= range->lr_start);
+ return (range->lsr_end >= range->lsr_start);
}
-static inline int range_is_zero(const struct lu_range *range)
+static inline int range_is_zero(const struct lu_seq_range *range)
{
- return (range->lr_start == 0 && range->lr_end == 0);
+ return (range->lsr_start == 0 && range->lsr_end == 0);
}
-static inline int range_is_exhausted(const struct lu_range *range)
+static inline int range_is_exhausted(const struct lu_seq_range *range)
+
{
return range_space(range) == 0;
}
-#define DRANGE "[%#16.16"LPF64"x-%#16.16"LPF64"x]"
+#define DRANGE "[%#16.16"LPF64"x-%#16.16"LPF64"x):%x"
#define PRANGE(range) \
- (range)->lr_start, \
- (range)->lr_end
+ (range)->lsr_start, \
+ (range)->lsr_end, \
+ (range)->lsr_mdt
/** \defgroup lu_fid lu_fid
* @{ */
}
extern void lustre_swab_lu_fid(struct lu_fid *fid);
-extern void lustre_swab_lu_range(struct lu_range *range);
+extern void lustre_swab_lu_seq_range(struct lu_seq_range *range);
static inline int lu_fid_eq(const struct lu_fid *f0,
const struct lu_fid *f1)
extern void lustre_swab_lmv_desc (struct lmv_desc *ld);
-struct md_fld {
- seqno_t mf_seq;
- mdsno_t mf_mds;
-};
-
-extern void lustre_swab_md_fld (struct md_fld *mf);
-
enum fld_rpc_opc {
FLD_QUERY = 600,
FLD_LAST_OPC,
struct lu_context;
/* Whole sequences space range and zero range definitions */
-extern const struct lu_range LUSTRE_SEQ_SPACE_RANGE;
-extern const struct lu_range LUSTRE_SEQ_ZERO_RANGE;
+extern const struct lu_seq_range LUSTRE_SEQ_SPACE_RANGE;
+extern const struct lu_seq_range LUSTRE_SEQ_ZERO_RANGE;
extern const struct lu_fid LUSTRE_BFL_FID;
enum {
* This is how may FIDs may be allocated in one sequence. 16384 for
* now.
*/
- LUSTRE_SEQ_MAX_WIDTH = 0x0000000000004000ULL,
+ LUSTRE_SEQ_MAX_WIDTH = 0x0000000000000400ULL,
/*
* How many sequences may be allocate for meta-sequence (this is 128
* clients, this contains meta-sequence range. And for servers this
* contains super-sequence range.
*/
- struct lu_range lcs_space;
+ struct lu_seq_range lcs_space;
/* Seq related proc */
cfs_proc_dir_entry_t *lcs_proc_dir;
/* server sequence manager interface */
struct lu_server_seq {
/* Available sequences space */
- struct lu_range lss_space;
+ struct lu_seq_range lss_space;
/*
* Device for server side seq manager needs (saving sequences to backing
* LUSTRE_SEQ_SUPER_WIDTH and LUSTRE_SEQ_META_WIDTH.
*/
__u64 lss_width;
+
+ /**
+ * Pointer to site object, required to access site fld.
+ */
+ struct md_site *lss_site;
};
int seq_query(struct com_thread_info *info);
struct dt_device *dev,
const char *prefix,
enum lu_mgr_type type,
+ struct md_site *ls,
const struct lu_env *env);
void seq_server_fini(struct lu_server_seq *seq,
const struct lu_env *env);
int seq_server_alloc_super(struct lu_server_seq *seq,
- struct lu_range *in,
- struct lu_range *out,
+ struct lu_seq_range *in,
+ struct lu_seq_range *out,
const struct lu_env *env);
int seq_server_alloc_meta(struct lu_server_seq *seq,
- struct lu_range *in,
- struct lu_range *out,
+ struct lu_seq_range *in,
+ struct lu_seq_range *out,
const struct lu_env *env);
int seq_server_set_cli(struct lu_server_seq *seq,
struct lu_fid *fid);
/* Fids common stuff */
-int fid_is_local(struct lu_site *site, const struct lu_fid *fid);
+int fid_is_local(const struct lu_env *env,
+ struct lu_site *site, const struct lu_fid *fid);
/* fid locking */
#define LUSTRE_SEQ_CTL_NAME "seq_ctl"
/* Range common stuff */
-void range_cpu_to_le(struct lu_range *dst, const struct lu_range *src);
-void range_cpu_to_be(struct lu_range *dst, const struct lu_range *src);
-void range_le_to_cpu(struct lu_range *dst, const struct lu_range *src);
-void range_be_to_cpu(struct lu_range *dst, const struct lu_range *src);
+static inline void range_cpu_to_le(struct lu_seq_range *dst, const struct lu_seq_range *src)
+{
+ dst->lsr_start = cpu_to_le64(src->lsr_start);
+ dst->lsr_end = cpu_to_le64(src->lsr_end);
+ dst->lsr_mdt = cpu_to_le32(src->lsr_mdt);
+}
+
+static inline void range_le_to_cpu(struct lu_seq_range *dst, const struct lu_seq_range *src)
+{
+ dst->lsr_start = le64_to_cpu(src->lsr_start);
+ dst->lsr_end = le64_to_cpu(src->lsr_end);
+ dst->lsr_mdt = le32_to_cpu(src->lsr_mdt);
+}
+
+static inline void range_cpu_to_be(struct lu_seq_range *dst, const struct lu_seq_range *src)
+{
+ dst->lsr_start = cpu_to_be64(src->lsr_start);
+ dst->lsr_end = cpu_to_be64(src->lsr_end);
+ dst->lsr_mdt = cpu_to_be32(src->lsr_mdt);
+}
+
+static inline void range_be_to_cpu(struct lu_seq_range *dst, const struct lu_seq_range *src)
+{
+ dst->lsr_start = be64_to_cpu(src->lsr_start);
+ dst->lsr_end = be64_to_cpu(src->lsr_end);
+ dst->lsr_mdt = be32_to_cpu(src->lsr_mdt);
+}
#endif /* __LINUX_FID_H */
struct lu_client_fld;
struct lu_server_fld;
+struct lu_fld_hash;
+struct fld_cache;
extern const struct dt_index_features fld_index_features;
extern const char fld_index_name[];
-
-struct fld_stats {
- __u64 fst_count;
- __u64 fst_cache;
- __u64 fst_inflight;
-};
-
/*
* FLD (Fid Location Database) interface.
*/
LUSTRE_CLI_FLD_HASH_RRB
};
-struct lu_server_fld;
struct lu_fld_target {
struct list_head ft_chain;
__u64 ft_idx;
};
-typedef int
-(*fld_hash_func_t) (struct lu_client_fld *, __u64);
-
-typedef struct lu_fld_target *
-(*fld_scan_func_t) (struct lu_client_fld *, __u64);
-
-struct lu_fld_hash {
- const char *fh_name;
- fld_hash_func_t fh_hash_func;
- fld_scan_func_t fh_scan_func;
-};
-
-struct fld_cache_entry {
- struct hlist_node fce_list;
- struct list_head fce_lru;
- mdsno_t fce_mds;
- seqno_t fce_seq;
- cfs_waitq_t fce_waitq;
- __u32 fce_inflight:1,
- fce_invalid:1;
-};
-
-struct fld_cache {
- /*
- * Cache guard, protects fci_hash mostly because others immutable after
- * init is finished.
- */
- spinlock_t fci_lock;
-
- /* Cache shrink threshold */
- int fci_threshold;
-
- /* Prefered number of cached entries */
- int fci_cache_size;
-
- /* Current number of cached entries. Protected by @fci_lock */
- int fci_cache_count;
-
- /* Hash table size (number of collision lists) */
- int fci_hash_size;
-
- /* Hash table mask */
- int fci_hash_mask;
-
- /* Hash table for all collision lists */
- struct hlist_head *fci_hash_table;
-
- /* Lru list */
- struct list_head fci_lru;
-
- /* Cache statistics. */
- struct fld_stats fci_stat;
-
- /* Cache name used for debug and messages. */
- char fci_name[80];
-};
-
struct lu_server_fld {
- /* Fld dir proc entry. */
+ /**
+ * Fld dir proc entry. */
cfs_proc_dir_entry_t *lsf_proc_dir;
- /* /fld file object device */
+ /**
+ * /fld file object device */
struct dt_object *lsf_obj;
- /* Client FLD cache. */
+        /**
+         * super sequence controller export, needed to forward fld
+         * lookup requests. */
+ struct obd_export *lsf_control_exp;
+
+ /**
+ * Client FLD cache. */
struct fld_cache *lsf_cache;
- /* Protect index modifications */
- struct semaphore lsf_sem;
+ /**
+ * Protect index modifications */
+ struct mutex lsf_lock;
- /* Fld service name in form "fld-srv-lustre-MDTXXX" */
+ /**
+ * Fld service name in form "fld-srv-lustre-MDTXXX" */
char lsf_name[80];
};
-enum {
- LUSTRE_FLD_INIT = 1 << 0,
- LUSTRE_FLD_RUN = 1 << 1
-};
-
struct lu_client_fld {
- /* Client side proc entry. */
+ /**
+ * Client side proc entry. */
cfs_proc_dir_entry_t *lcf_proc_dir;
- /* List of exports client FLD knows about. */
+ /**
+ * List of exports client FLD knows about. */
struct list_head lcf_targets;
- /* Current hash to be used to chose an export. */
+ /**
+ * Current hash to be used to chose an export. */
struct lu_fld_hash *lcf_hash;
- /* Exports count. */
+ /**
+ * Exports count. */
int lcf_count;
- /* Lock protecting exports list and fld_hash. */
+ /**
+ * Lock protecting exports list and fld_hash. */
spinlock_t lcf_lock;
- /* Client FLD cache. */
+ /**
+ * Client FLD cache. */
struct fld_cache *lcf_cache;
- /* Client fld proc entry name. */
+ /**
+ * Client fld proc entry name. */
char lcf_name[80];
const struct lu_context *lcf_ctx;
-
+
int lcf_flags;
};
+/**
+ * number of blocks to reserve for particular operations. Should be a function of
+ * ... something. Stub for now.
+ */
+enum {
+        /* one insert operation can involve two deletes and one insert */
+ FLD_TXN_INDEX_INSERT_CREDITS = 60,
+ FLD_TXN_INDEX_DELETE_CREDITS = 20,
+};
+
int fld_query(struct com_thread_info *info);
/* Server methods */
int fld_server_init(struct lu_server_fld *fld,
struct dt_device *dt,
const char *prefix,
- const struct lu_env *env);
+ const struct lu_env *env,
+ int mds_node_id);
void fld_server_fini(struct lu_server_fld *fld,
const struct lu_env *env);
int fld_server_create(struct lu_server_fld *fld,
const struct lu_env *env,
- seqno_t seq, mdsno_t mds);
+ struct lu_seq_range *add_range,
+ struct thandle *th);
int fld_server_delete(struct lu_server_fld *fld,
const struct lu_env *env,
- seqno_t seq);
+ struct lu_seq_range *range);
int fld_server_lookup(struct lu_server_fld *fld,
const struct lu_env *env,
- seqno_t seq, mdsno_t *mds);
+ seqno_t seq, struct lu_seq_range *range);
/* Client methods */
int fld_client_init(struct lu_client_fld *fld,
const struct lu_env *env);
int fld_client_create(struct lu_client_fld *fld,
- seqno_t seq, mdsno_t mds,
+ struct lu_seq_range *range,
const struct lu_env *env);
int fld_client_delete(struct lu_client_fld *fld,
int fld_client_del_target(struct lu_client_fld *fld,
__u64 idx);
-/* Cache methods */
-struct fld_cache *fld_cache_init(const char *name,
- int hash_size,
- int cache_size,
- int cache_threshold);
-
-void fld_cache_fini(struct fld_cache *cache);
-
-void fld_cache_flush(struct fld_cache *cache);
-
-int fld_cache_insert(struct fld_cache *cache,
- seqno_t seq, mdsno_t mds);
-
-int fld_cache_insert_inflight(struct fld_cache *cache,
- seqno_t seq);
-
-void fld_cache_delete(struct fld_cache *cache,
- seqno_t seq);
-
-int
-fld_cache_lookup(struct fld_cache *cache,
- seqno_t seq, mdsno_t *mds);
-
#endif
RETURN(rc);
}
- CDEBUG(D_INODE, "FLD lookup got mds #"LPU64" for fid="DFID"\n",
+ CDEBUG(D_INODE, "FLD lookup got mds #%x for fid="DFID"\n",
*mds, PFID(fid));
if (*mds >= lmv->desc.ld_tgt_count) {
- CERROR("FLD lookup got invalid mds #"LPU64" (max: %d) "
+ CERROR("FLD lookup got invalid mds #%x (max: %x) "
"for fid="DFID"\n", *mds, lmv->desc.ld_tgt_count,
PFID(fid));
rc = -EINVAL;
}
CDEBUG(D_INODE, "Allocate new fid "DFID" for slave "
- "obj -> mds #"LPU64"\n", PFID(fid), mds);
+ "obj -> mds #%x\n", PFID(fid), mds);
RETURN(rc);
}
rc = obd_fid_alloc(tgt->ltd_exp, fid, NULL);
if (rc > 0) {
LASSERT(fid_is_sane(fid));
-
- /*
- * Client switches to new sequence, setup FLD.
- */
- rc = fld_client_create(&lmv->lmv_fld, fid_seq(fid),
- mds, NULL);
- if (rc) {
- /*
- * Delete just allocated fid sequence in case
- * of fail back.
- */
- CERROR("Can't create fld entry, rc %d\n", rc);
- obd_fid_delete(tgt->ltd_exp, NULL);
- }
+ rc = 0;
}
EXIT;
else if (rc)
RETURN(rc);
- CDEBUG(D_INODE, "CREATE '%*s' on "DFID" -> mds #"LPU64"\n",
+ CDEBUG(D_INODE, "CREATE '%*s' on "DFID" -> mds #%x\n",
op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
op_data->op_mds);
RETURN(rc);
}
- CDEBUG(D_INODE, "Forward to mds #"LPU64" ("DFID")\n",
+ CDEBUG(D_INODE, "Forward to mds #%x ("DFID")\n",
mds, PFID(&op_data->op_fid1));
op_data->op_fsuid = current->fsuid;
struct mdt_device *m, int lost)
{
struct md_site *ms = mdt_md_site(m);
- struct lu_range out;
+ struct lu_seq_range out;
ENTRY;
LASSERT(ms && ms->ms_server_seq);
rc = seq_server_init(ms->ms_control_seq,
m->mdt_bottom, uuid,
LUSTRE_SEQ_CONTROLLER,
+ ms,
env);
if (rc)
rc = seq_server_init(ms->ms_server_seq,
m->mdt_bottom, uuid,
LUSTRE_SEQ_SERVER,
+ ms,
env);
if (rc)
GOTO(out_seq_fini, rc = -ENOMEM);
RETURN(rc = -ENOMEM);
rc = fld_server_init(ms->ms_server_fld,
- m->mdt_bottom, uuid, env);
+ m->mdt_bottom, uuid,
+ env, ms->ms_node_id);
if (rc) {
OBD_FREE_PTR(ms->ms_server_fld);
ms->ms_server_fld = NULL;
}
EXPORT_SYMBOL(lustre_swab_lu_fid);
-void lustre_swab_lu_range(struct lu_range *range)
+void lustre_swab_lu_seq_range(struct lu_seq_range *range)
{
- __swab64s (&range->lr_start);
- __swab64s (&range->lr_end);
+ __swab64s (&range->lsr_start);
+ __swab64s (&range->lsr_end);
+ __swab32s (&range->lsr_mdt);
}
-EXPORT_SYMBOL(lustre_swab_lu_range);
+EXPORT_SYMBOL(lustre_swab_lu_seq_range);
void lustre_swab_llog_rec(struct llog_rec_hdr *rec, struct llog_rec_tail *tail)
{
* fids. Unfortunately it is somewhat expensive (does a
* cache-lookup). Disabling it for production/acceptance-testing.
*/
- LASSERT(1 || fid_is_local(ldev->ld_site, fid));
+ LASSERT(1 || fid_is_local(env, ldev->ld_site, fid));
ENTRY;
const struct req_msg_field RMF_SEQ_RANGE =
DEFINE_MSGF("seq_query_range", 0,
- sizeof(struct lu_range), lustre_swab_lu_range);
+ sizeof(struct lu_seq_range), lustre_swab_lu_seq_range);
EXPORT_SYMBOL(RMF_SEQ_RANGE);
const struct req_msg_field RMF_FLD_OPC =
const struct req_msg_field RMF_FLD_MDFLD =
DEFINE_MSGF("fld_query_mdfld", 0,
- sizeof(struct md_fld), lustre_swab_md_fld);
+ sizeof(struct lu_seq_range), lustre_swab_lu_seq_range);
EXPORT_SYMBOL(RMF_FLD_MDFLD);
const struct req_msg_field RMF_MDT_BODY =
__swab32s (&ld->ld_active_tgt_count);
/* uuid endian insensitive */
}
-/*end adding MDT by huanghua@clusterfs.com*/
-void lustre_swab_md_fld (struct md_fld *mf)
-{
- __swab64s(&mf->mf_seq);
- __swab64s(&mf->mf_mds);
-}
static void print_lum (struct lov_user_md *lum)
{
EXPORT_SYMBOL(lustre_msg_set_status);
EXPORT_SYMBOL(lustre_msg_set_conn_cnt);
EXPORT_SYMBOL(lustre_swab_mgs_target_info);
-EXPORT_SYMBOL(lustre_swab_md_fld);
EXPORT_SYMBOL(lustre_swab_generic_32s);
EXPORT_SYMBOL(lustre_swab_lustre_capa);
EXPORT_SYMBOL(lustre_swab_lustre_capa_key);
load_module ptlrpc/ptlrpc
load_module ptlrpc/gss/ptlrpc_gss
[ "$USE_QUOTA" = "yes" -a "$LQUOTA" != "no" ] && load_module quota/lquota
- load_module fid/fid
load_module fld/fld
+ load_module fid/fid
load_module lmv/lmv
load_module mdc/mdc
load_module osc/osc
#define __REQ_LAYOUT_USER__ (1)
#define lustre_swab_generic_32s NULL
-#define lustre_swab_lu_range NULL
+#define lustre_swab_lu_seq_range NULL
#define lustre_swab_md_fld NULL
#define lustre_swab_mdt_body NULL
#define lustre_swab_mdt_epoch NULL
LASSERTF((int)sizeof(((xattr_acl_header *)0)->a_entries) == 0, " found %lld\n",
(long long)(int)sizeof(((xattr_acl_header *)0)->a_entries));
#endif
+
+ /* check fid range */
+ LASSERTF((int)sizeof(struct lu_seq_range) == 24, " found %lld\n",
+ (long long)(int)sizeof(struct lu_seq_range));
+ LASSERTF((int)offsetof(struct lu_seq_range, lsr_start) == 0, " found %lld\n",
+ (long long)(int)offsetof(struct lu_seq_range, lsr_start));
+ LASSERTF((int)offsetof(struct lu_seq_range, lsr_end) == 8, " found %lld\n",
+ (long long)(int)offsetof(struct lu_seq_range, lsr_end));
+ LASSERTF((int)offsetof(struct lu_seq_range, lsr_mdt) == 16, " found %lld\n",
+ (long long)(int)offsetof(struct lu_seq_range, lsr_mdt));
+ LASSERTF((int)offsetof(struct lu_seq_range, lsr_padding) == 20, " found %lld\n",
+ (long long)(int)offsetof(struct lu_seq_range, lsr_padding));
+
}