From: pravins
Date: Fri, 28 Nov 2008 09:17:15 +0000 (+0000)
Subject: b=15957
X-Git-Tag: v1_9_120~50
X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=4201416b775b14d6e4cd89b7c68bb1c1bc950144

b=15957
i=Nikita
i=umka

compact FLD feature.
---

diff --git a/lustre/ChangeLog b/lustre/ChangeLog
index 3fb9d7d..3df013c 100644
--- a/lustre/ChangeLog
+++ b/lustre/ChangeLog
@@ -12,7 +12,15 @@ tbd Sun Microsystems, Inc.
        * RHEL 4 and RHEL 5/SLES 10 clients behave differently on 'cd' to a
          removed cwd "./" (refer to Bugzilla 14399).
        * File join has been disabled in this release, refer to Bugzilla 16929.
-
+
+Severity   : enhancement
+Bugzilla   : 15957
+Description: compact FLD format with extents
+Details    : Store ranges of sequences rather than every sequence in the FLD.
+             The sequence controller updates the FLD rather than the clients.
+             In the case of CMD, mdt0 holds the FLD; all other metadata
+             servers act as non-persistent proxies for FLD queries and keep
+             FLD entries in their fld cache.
+
 Severity   : normal
 Frequency  : rare
 Bugzilla   : 16081

diff --git a/lustre/cmm/cmm_device.c b/lustre/cmm/cmm_device.c
index a74fdf4..08a435e 100644
--- a/lustre/cmm/cmm_device.c
+++ b/lustre/cmm/cmm_device.c
@@ -447,6 +447,7 @@ static int cmm_add_mdc(const struct lu_env *env,
        struct lu_device *ld;
        struct lu_device *cmm_lu = cmm2lu_dev(cm);
        mdsno_t mdc_num;
+       struct lu_site *site = cmm2lu_dev(cm)->ld_site;
        int rc;
        ENTRY;
@@ -471,7 +472,7 @@ static int cmm_add_mdc(const struct lu_env *env,
        if (IS_ERR(ld))
                RETURN(PTR_ERR(ld));

-       ld->ld_site = cmm2lu_dev(cm)->ld_site;
+       ld->ld_site = site;

        rc = ldt->ldt_ops->ldto_device_init(env, ld, NULL, NULL);
        if (rc) {
@@ -509,6 +510,13 @@ static int cmm_add_mdc(const struct lu_env *env,
        target.ft_exp = mc->mc_desc.cl_exp;
        fld_client_add_target(cm->cmm_fld, &target);

+       if (mc->mc_num == 0) {
+               /* this is the mdt0 -> mc export; fld lookup needs this export
+                * to forward fld lookup requests. */
+               LASSERT(!lu_site2md(site)->ms_server_fld->lsf_control_exp);
+               lu_site2md(site)->ms_server_fld->lsf_control_exp =
+                                       mc->mc_desc.cl_exp;
+       }
        /* Set max md size for the mdc. */
        rc = cmm_post_init_mdc(env, cm);
        RETURN(rc);

diff --git a/lustre/cmm/cmm_object.c b/lustre/cmm/cmm_object.c
index 73c9dad..7cbf87d 100644
--- a/lustre/cmm/cmm_object.c
+++ b/lustre/cmm/cmm_object.c
@@ -66,12 +66,12 @@ int cmm_fld_lookup(struct cmm_device *cm, const struct lu_fid *fid,
        }

        if (*mds > cm->cmm_tgt_count) {
-               CERROR("Got invalid mdsno: "LPU64" (max: %u)\n",
+               CERROR("Got invalid mdsno: %x (max: %x)\n",
                       *mds, cm->cmm_tgt_count);
                rc = -EINVAL;
        } else {
-               CDEBUG(D_INFO, "CMM: got MDS "LPU64" for sequence: "
-                      LPU64"\n", *mds, fid_seq(fid));
+               CDEBUG(D_INFO, "CMM: got MDS %x for sequence: "
+                      LPX64"\n", *mds, fid_seq(fid));
        }

        RETURN (rc);

diff --git a/lustre/cmm/cmm_split.c b/lustre/cmm/cmm_split.c
index 361b38d..8cb4cd9 100644
--- a/lustre/cmm/cmm_split.c
+++ b/lustre/cmm/cmm_split.c
@@ -268,13 +268,8 @@ static int cmm_split_fid_alloc(const struct lu_env *env,
        /* Alloc new fid on @mc. */
        rc = obd_fid_alloc(mc->mc_desc.cl_exp, fid, NULL);
-       if (rc > 0) {
-               /* Setup FLD for new sequence if needed.
 */
-               rc = fld_client_create(cmm->cmm_fld, fid_seq(fid),
-                                      mc->mc_num, env);
-               if (rc)
-                       CERROR("Can't create fld entry, rc %d\n", rc);
-       }
+       if (rc > 0)
+               rc = 0;
        up(&mc->mc_fid_sem);
        RETURN(rc);

diff --git a/lustre/fid/fid_handler.c b/lustre/fid/fid_handler.c
index 9e3d7ad..5af9343 100644
--- a/lustre/fid/fid_handler.c
+++ b/lustre/fid/fid_handler.c
@@ -93,6 +93,7 @@ int seq_server_set_cli(struct lu_server_seq *seq,
               seq->lss_name, cli->lcs_name);

        seq->lss_cli = cli;
+       cli->lcs_space.lsr_mdt = seq->lss_site->ms_node_id;
        EXIT;
out_up:
        up(&seq->lss_sem);
@@ -100,16 +101,22 @@ out_up:
 }
 EXPORT_SYMBOL(seq_server_set_cli);

-/*
+/**
  * On controller node, allocate new super sequence for regular sequence server.
+ * As the super sequence controller, this node is supposed to maintain the fld
+ * and update its index.
+ * \a out range always has the correct mds node number of the requester.
  */
+
 static int __seq_server_alloc_super(struct lu_server_seq *seq,
-                                    struct lu_range *in,
-                                    struct lu_range *out,
+                                    struct lu_seq_range *in,
+                                    struct lu_seq_range *out,
                                     const struct lu_env *env)
 {
-        struct lu_range *space = &seq->lss_space;
-        int rc;
+        struct lu_seq_range *space = &seq->lss_space;
+        struct thandle *th;
+        __u64 mdt = out->lsr_mdt;
+        int rc, credit;
        ENTRY;

        LASSERT(range_is_sane(space));
@@ -118,8 +125,8 @@ static int __seq_server_alloc_super(struct lu_server_seq *seq,
                CDEBUG(D_INFO, "%s: Input seq range: "
                       DRANGE"\n", seq->lss_name, PRANGE(in));

-                if (in->lr_end > space->lr_start)
-                        space->lr_start = in->lr_end;
+                if (in->lsr_end > space->lsr_start)
+                        space->lsr_start = in->lsr_end;
                *out = *in;

                CDEBUG(D_INFO, "%s: Recovered space: "DRANGE"\n",
@@ -130,7 +137,7 @@
                               "Only "LPU64" sequences left\n", seq->lss_name,
                               range_space(space));
                        *out = *space;
-                        space->lr_start = space->lr_end;
+                        space->lsr_start = space->lsr_end;
                } else if (range_is_exhausted(space)) {
                        CERROR("%s: Sequences space is exhausted\n",
                               seq->lss_name);
@@ -139,23 +146,40 @@
                        range_alloc(out, space, seq->lss_width);
                }
        }
+        out->lsr_mdt = mdt;
+
+        credit = SEQ_TXN_STORE_CREDITS + FLD_TXN_INDEX_INSERT_CREDITS;
+
+        th = seq_store_trans_start(seq, env, credit);
+        if (IS_ERR(th))
+                RETURN(PTR_ERR(th));

-        rc = seq_store_write(seq, env);
+        rc = seq_store_write(seq, env, th);
        if (rc) {
                CERROR("%s: Can't write space data, rc %d\n",
                       seq->lss_name, rc);
-                RETURN(rc);
+                goto out;
        }

-        CDEBUG(D_INFO, "%s: Allocated super-sequence "
-               DRANGE"\n", seq->lss_name, PRANGE(out));
+        rc = fld_server_create(seq->lss_site->ms_server_fld,
+                               env, out, th);
+        if (rc) {
+                CERROR("%s: Can't update fld database, rc %d\n",
+                       seq->lss_name, rc);
+        }
+
+out:
+        seq_store_trans_stop(seq, env, th);
+
+        CDEBUG(D_INFO, "%s: super-sequence allocation rc = %d "
+               DRANGE"\n", seq->lss_name, rc, PRANGE(out));

        RETURN(rc);
 }

 int seq_server_alloc_super(struct lu_server_seq *seq,
-                           struct lu_range *in,
-                           struct lu_range *out,
+                           struct lu_seq_range *in,
+                           struct lu_seq_range *out,
                            const struct lu_env *env)
 {
        int rc;
@@ -169,12 +193,14 @@ int seq_server_alloc_super(struct lu_server_seq *seq,
 }

 static int __seq_server_alloc_meta(struct lu_server_seq *seq,
-                                   struct lu_range *in,
-                                   struct lu_range *out,
+                                   struct lu_seq_range *in,
+                                   struct lu_seq_range *out,
                                    const struct lu_env *env)
 {
-        struct lu_range *space = &seq->lss_space;
+        struct lu_seq_range *space = &seq->lss_space;
+        struct thandle *th;
        int rc = 0;
+
        ENTRY;

        LASSERT(range_is_sane(space));

@@ -193,22 +219,22 @@ static int
__seq_server_alloc_meta(struct lu_server_seq *seq, * we check here that range from client is "newer" than * exhausted super. */ - LASSERT(in->lr_end > space->lr_start); + LASSERT(in->lsr_end > space->lsr_start); /* * Start is set to end of last allocated, because it * *is* already allocated so we take that into account * and do not use for other allocations. */ - space->lr_start = in->lr_end; + space->lsr_start = in->lsr_end; /* - * End is set to in->lr_start + super sequence - * allocation unit. That is because in->lr_start is + * End is set to in->lsr_start + super sequence + * allocation unit. That is because in->lsr_start is * first seq in new allocated range from controller * before failure. */ - space->lr_end = in->lr_start + LUSTRE_SEQ_SUPER_WIDTH; + space->lsr_end = in->lsr_start + LUSTRE_SEQ_SUPER_WIDTH; if (!seq->lss_cli) { CERROR("%s: No sequence controller " @@ -221,6 +247,7 @@ static int __seq_server_alloc_meta(struct lu_server_seq *seq, * obtained range from it was @space. */ rc = seq_client_replay_super(seq->lss_cli, space, env); + if (rc) { CERROR("%s: Can't replay super-sequence, " "rc %d\n", seq->lss_name, rc); @@ -231,8 +258,8 @@ static int __seq_server_alloc_meta(struct lu_server_seq *seq, * Update super start by end from client's range. Super * end should not be changed if range was not exhausted. */ - if (in->lr_end > space->lr_start) - space->lr_start = in->lr_end; + if (in->lsr_end > space->lsr_start) + space->lsr_start = in->lsr_end; } *out = *in; @@ -266,7 +293,11 @@ static int __seq_server_alloc_meta(struct lu_server_seq *seq, range_alloc(out, space, seq->lss_width); } - rc = seq_store_write(seq, env); + th = seq_store_trans_start(seq, env, SEQ_TXN_STORE_CREDITS); + if (IS_ERR(th)) + RETURN(PTR_ERR(th)); + + rc = seq_store_write(seq, env, th); if (rc) { CERROR("%s: Can't write space data, rc %d\n", seq->lss_name, rc); @@ -277,12 +308,13 @@ static int __seq_server_alloc_meta(struct lu_server_seq *seq, DRANGE"\n", seq->lss_name, PRANGE(out)); } + seq_store_trans_stop(seq, env, th); RETURN(rc); } int seq_server_alloc_meta(struct lu_server_seq *seq, - struct lu_range *in, - struct lu_range *out, + struct lu_seq_range *in, + struct lu_seq_range *out, const struct lu_env *env) { int rc; @@ -298,8 +330,8 @@ EXPORT_SYMBOL(seq_server_alloc_meta); static int seq_server_handle(struct lu_site *site, const struct lu_env *env, - __u32 opc, struct lu_range *in, - struct lu_range *out) + __u32 opc, struct lu_seq_range *in, + struct lu_seq_range *out) { int rc; struct md_site *mite; @@ -337,7 +369,7 @@ static int seq_req_handle(struct ptlrpc_request *req, const struct lu_env *env, struct seq_thread_info *info) { - struct lu_range *out, *in = NULL; + struct lu_seq_range *out, *in = NULL, *tmp; struct lu_site *site; int rc = -EPROTO; __u32 *opc; @@ -356,13 +388,16 @@ static int seq_req_handle(struct ptlrpc_request *req, if (out == NULL) RETURN(err_serious(-EPROTO)); - if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) { - in = req_capsule_client_get(info->sti_pill, - &RMF_SEQ_RANGE); + tmp = req_capsule_client_get(info->sti_pill, &RMF_SEQ_RANGE); + if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) { + in = tmp; LASSERT(!range_is_zero(in) && range_is_sane(in)); } + /* seq client passed mdt id, we need to pass that using out + * range parameter */ + out->lsr_mdt = tmp->lsr_mdt; rc = seq_server_handle(site, env, *opc, in, out); } else rc = err_serious(-EPROTO); @@ -475,8 +510,10 @@ int seq_server_init(struct lu_server_seq *seq, struct dt_device *dev, const char *prefix, enum 
lu_mgr_type type, + struct md_site *ms, const struct lu_env *env) { + struct thandle *th; int rc, is_srv = (type == LUSTRE_SEQ_SERVER); ENTRY; @@ -485,6 +522,7 @@ int seq_server_init(struct lu_server_seq *seq, seq->lss_cli = NULL; seq->lss_type = type; + seq->lss_site = ms; range_init(&seq->lss_space); sema_init(&seq->lss_sem, 1); @@ -497,7 +535,6 @@ int seq_server_init(struct lu_server_seq *seq, rc = seq_store_init(seq, env, dev); if (rc) GOTO(out, rc); - /* Request backing store for saved sequence info. */ rc = seq_store_read(seq, env); if (rc == -ENODATA) { @@ -507,16 +544,22 @@ int seq_server_init(struct lu_server_seq *seq, LUSTRE_SEQ_ZERO_RANGE: LUSTRE_SEQ_SPACE_RANGE; + seq->lss_space.lsr_mdt = ms->ms_node_id; CDEBUG(D_INFO, "%s: No data found " "on store. Initialize space\n", seq->lss_name); + th = seq_store_trans_start(seq, env, SEQ_TXN_STORE_CREDITS); + if (IS_ERR(th)) + RETURN(PTR_ERR(th)); + /* Save default controller value to store. */ - rc = seq_store_write(seq, env); + rc = seq_store_write(seq, env, th); if (rc) { CERROR("%s: Can't write space data, " "rc %d\n", seq->lss_name, rc); } + seq_store_trans_stop(seq, env, th); } else if (rc) { CERROR("%s: Can't read space data, rc %d\n", seq->lss_name, rc); diff --git a/lustre/fid/fid_internal.h b/lustre/fid/fid_internal.h index 9137656..03c5227 100644 --- a/lustre/fid/fid_internal.h +++ b/lustre/fid/fid_internal.h @@ -49,10 +49,14 @@ struct seq_thread_info { struct req_capsule *sti_pill; struct txn_param sti_txn; - struct lu_range sti_space; + struct lu_seq_range sti_space; struct lu_buf sti_buf; }; +enum { + SEQ_TXN_STORE_CREDITS = 20 +}; + extern struct lu_context_key seq_thread_key; /* Functions used internally in module. */ @@ -60,7 +64,7 @@ int seq_client_alloc_super(struct lu_client_seq *seq, const struct lu_env *env); int seq_client_replay_super(struct lu_client_seq *seq, - struct lu_range *range, + struct lu_seq_range *range, const struct lu_env *env); /* Store API functions. */ @@ -72,11 +76,19 @@ void seq_store_fini(struct lu_server_seq *seq, const struct lu_env *env); int seq_store_write(struct lu_server_seq *seq, - const struct lu_env *env); + const struct lu_env *env, + struct thandle *th); int seq_store_read(struct lu_server_seq *seq, const struct lu_env *env); +struct thandle * seq_store_trans_start(struct lu_server_seq *seq, + const struct lu_env *env, + int credits); +void seq_store_trans_stop(struct lu_server_seq *seq, + const struct lu_env *env, + struct thandle *th); + #ifdef LPROCFS extern struct lprocfs_vars seq_server_proc_list[]; extern struct lprocfs_vars seq_client_proc_list[]; diff --git a/lustre/fid/fid_lib.c b/lustre/fid/fid_lib.c index 694ee78..76e779a 100644 --- a/lustre/fid/fid_lib.c +++ b/lustre/fid/fid_lib.c @@ -71,14 +71,14 @@ * The first 0x400 sequences of normal FID are reserved for special purpose. * FID_SEQ_START + 1 is for local file id generation. */ -const struct lu_range LUSTRE_SEQ_SPACE_RANGE = { +const struct lu_seq_range LUSTRE_SEQ_SPACE_RANGE = { FID_SEQ_START + 0x400ULL, (__u64)~0ULL }; EXPORT_SYMBOL(LUSTRE_SEQ_SPACE_RANGE); /* Zero range, used for init and other purposes. 
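 * Both bounds are zero, so range_is_zero() holds for it and
 * range_within() is false for every sequence number.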
*/ -const struct lu_range LUSTRE_SEQ_ZERO_RANGE = { +const struct lu_seq_range LUSTRE_SEQ_ZERO_RANGE = { 0, 0 }; @@ -89,54 +89,3 @@ const struct lu_fid LUSTRE_BFL_FID = { .f_seq = 0x0000000000000003, .f_oid = 0x0000000000000001, .f_ver = 0x0000000000000000 }; EXPORT_SYMBOL(LUSTRE_BFL_FID); - -void range_cpu_to_le(struct lu_range *dst, const struct lu_range *src) -{ - /* check that all fields are converted */ - CLASSERT(sizeof(*src) == - sizeof(src->lr_start) + - sizeof(src->lr_end) + - sizeof(src->lr_padding)); - dst->lr_start = cpu_to_le64(src->lr_start); - dst->lr_end = cpu_to_le64(src->lr_end); -} -EXPORT_SYMBOL(range_cpu_to_le); - -void range_le_to_cpu(struct lu_range *dst, const struct lu_range *src) -{ - /* check that all fields are converted */ - CLASSERT(sizeof(*src) == - sizeof(src->lr_start) + - sizeof(src->lr_end) + - sizeof(src->lr_padding)); - dst->lr_start = le64_to_cpu(src->lr_start); - dst->lr_end = le64_to_cpu(src->lr_end); -} -EXPORT_SYMBOL(range_le_to_cpu); - -#ifdef __KERNEL__ -void range_cpu_to_be(struct lu_range *dst, const struct lu_range *src) -{ - /* check that all fields are converted */ - CLASSERT(sizeof(*src) == - sizeof(src->lr_start) + - sizeof(src->lr_end) + - sizeof(src->lr_padding)); - dst->lr_start = cpu_to_be64(src->lr_start); - dst->lr_end = cpu_to_be64(src->lr_end); -} -EXPORT_SYMBOL(range_cpu_to_be); - -void range_be_to_cpu(struct lu_range *dst, const struct lu_range *src) -{ - /* check that all fields are converted */ - CLASSERT(sizeof(*src) == - sizeof(src->lr_start) + - sizeof(src->lr_end) + - sizeof(src->lr_padding)); - dst->lr_start = be64_to_cpu(src->lr_start); - dst->lr_end = be64_to_cpu(src->lr_end); -} -EXPORT_SYMBOL(range_be_to_cpu); - -#endif diff --git a/lustre/fid/fid_request.c b/lustre/fid/fid_request.c index c6c3881..9939c82 100644 --- a/lustre/fid/fid_request.c +++ b/lustre/fid/fid_request.c @@ -63,13 +63,13 @@ #include #include "fid_internal.h" -static int seq_client_rpc(struct lu_client_seq *seq, struct lu_range *input, - struct lu_range *output, __u32 opc, +static int seq_client_rpc(struct lu_client_seq *seq, struct lu_seq_range *input, + struct lu_seq_range *output, __u32 opc, const char *opcname) { struct obd_export *exp = seq->lcs_exp; struct ptlrpc_request *req; - struct lu_range *out, *in; + struct lu_seq_range *out, *in; __u32 *op; int rc; ENTRY; @@ -95,9 +95,13 @@ static int seq_client_rpc(struct lu_client_seq *seq, struct lu_range *input, if (seq->lcs_type == LUSTRE_SEQ_METADATA) { req->rq_request_portal = (opc == SEQ_ALLOC_SUPER) ? SEQ_CONTROLLER_PORTAL : SEQ_METADATA_PORTAL; + /* update mdt field of *in, it is required for fld update + * on super sequence allocator node. */ + if (opc == SEQ_ALLOC_SUPER) + in->lsr_mdt = seq->lcs_space.lsr_mdt; } else { - req->rq_request_portal = (opc == SEQ_ALLOC_SUPER) ? - SEQ_CONTROLLER_PORTAL : SEQ_DATA_PORTAL; + LASSERT(opc == SEQ_ALLOC_META); + req->rq_request_portal = SEQ_DATA_PORTAL; } ptlrpc_at_set_req_timeout(req); @@ -135,7 +139,7 @@ out_req: /* Request sequence-controller node to allocate new super-sequence. 
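 * A super-sequence is a large extent of sequence numbers
 * (LUSTRE_SEQ_SUPER_WIDTH wide) granted by the controller node to a
 * sequence server; the server then carves smaller meta-sequences
 * out of it for its clients.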
 */
 int seq_client_replay_super(struct lu_client_seq *seq,
-                            struct lu_range *range,
+                            struct lu_seq_range *range,
                             const struct lu_env *env)
 {
        int rc;
@@ -212,8 +216,8 @@ static int seq_client_alloc_seq(struct lu_client_seq *seq, seqno_t *seqnr)
        }

        LASSERT(!range_is_exhausted(&seq->lcs_space));
-        *seqnr = seq->lcs_space.lr_start;
-        seq->lcs_space.lr_start += 1;
+        *seqnr = seq->lcs_space.lsr_start;
+        seq->lcs_space.lsr_start += 1;

        CDEBUG(D_INFO, "%s: Allocated sequence ["LPX64"]\n", seq->lcs_name,
               *seqnr);
@@ -280,6 +284,13 @@ void seq_client_flush(struct lu_client_seq *seq)
        LASSERT(seq != NULL);
        down(&seq->lcs_sem);
        fid_zero(&seq->lcs_fid);
+        /**
+         * this id should not be used for seq range allocation;
+         * it is set to -1 for debug checks.
+         */
+
+        seq->lcs_space.lsr_mdt = -1;
+
+        range_init(&seq->lcs_space);
        up(&seq->lcs_sem);
 }

diff --git a/lustre/fid/fid_store.c b/lustre/fid/fid_store.c
index de4bec3..56b950c 100644
--- a/lustre/fid/fid_store.c
+++ b/lustre/fid/fid_store.c
@@ -62,9 +62,6 @@
 #include "fid_internal.h"

 #ifdef __KERNEL__
-enum {
-        SEQ_TXN_STORE_CREDITS = 20
-};

 static struct lu_buf *seq_store_buf(struct seq_thread_info *info)
 {
@@ -76,47 +73,68 @@ static struct lu_buf *seq_store_buf(struct seq_thread_info *info)
        return buf;
 }

+struct thandle * seq_store_trans_start(struct lu_server_seq *seq,
+                                       const struct lu_env *env, int credit)
+{
+        struct seq_thread_info *info;
+        struct dt_device *dt_dev;
+        struct thandle *th;
+        ENTRY;
+
+        dt_dev = lu2dt_dev(seq->lss_obj->do_lu.lo_dev);
+        info = lu_context_key_get(&env->le_ctx, &seq_thread_key);
+        LASSERT(info != NULL);
+
+        txn_param_init(&info->sti_txn, credit);
+
+        th = dt_dev->dd_ops->dt_trans_start(env, dt_dev, &info->sti_txn);
+        return th;
+}
+
+void seq_store_trans_stop(struct lu_server_seq *seq,
+                          const struct lu_env *env,
+                          struct thandle *th)
+{
+        struct dt_device *dt_dev;
+        ENTRY;
+
+        dt_dev = lu2dt_dev(seq->lss_obj->do_lu.lo_dev);
+
+        dt_dev->dd_ops->dt_trans_stop(env, th);
+}
+
 /* This function implies that caller takes care about locking. */
 int seq_store_write(struct lu_server_seq *seq,
-                    const struct lu_env *env)
+                    const struct lu_env *env,
+                    struct thandle *th)
 {
        struct dt_object *dt_obj = seq->lss_obj;
        struct seq_thread_info *info;
        struct dt_device *dt_dev;
-        struct thandle *th;
        loff_t pos = 0;
-        int rc;
-        ENTRY;
+        int rc;
+        ENTRY;

        dt_dev = lu2dt_dev(seq->lss_obj->do_lu.lo_dev);
        info = lu_context_key_get(&env->le_ctx, &seq_thread_key);
        LASSERT(info != NULL);

-        /* Stub here, will fix it later. */
-        txn_param_init(&info->sti_txn, SEQ_TXN_STORE_CREDITS);
+        /* Store ranges in le format. */
+        range_cpu_to_le(&info->sti_space, &seq->lss_space);

-        th = dt_dev->dd_ops->dt_trans_start(env, dt_dev, &info->sti_txn);
-        if (!IS_ERR(th)) {
-                /* Store ranges in le format.
*/ - range_cpu_to_le(&info->sti_space, &seq->lss_space); - - rc = dt_obj->do_body_ops->dbo_write(env, dt_obj, - seq_store_buf(info), - &pos, th, BYPASS_CAPA, 1); - if (rc == sizeof(info->sti_space)) { - CDEBUG(D_INFO, "%s: Space - "DRANGE"\n", - seq->lss_name, PRANGE(&seq->lss_space)); - rc = 0; - } else if (rc >= 0) { - rc = -EIO; - } - - dt_dev->dd_ops->dt_trans_stop(env, th); - } else { - rc = PTR_ERR(th); + rc = dt_obj->do_body_ops->dbo_write(env, dt_obj, + seq_store_buf(info), + &pos, th, BYPASS_CAPA, 1); + if (rc == sizeof(info->sti_space)) { + CDEBUG(D_INFO, "%s: Space - "DRANGE"\n", + seq->lss_name, PRANGE(&seq->lss_space)); + rc = 0; + } else if (rc >= 0) { + rc = -EIO; } - - RETURN(rc); + + + RETURN(rc); } /* diff --git a/lustre/fid/lproc_fid.c b/lustre/fid/lproc_fid.c index 63fbacd..e9976f9 100644 --- a/lustre/fid/lproc_fid.c +++ b/lustre/fid/lproc_fid.c @@ -69,16 +69,16 @@ static int seq_proc_write_common(struct file *file, const char *buffer, unsigned long count, void *data, - struct lu_range *range) + struct lu_seq_range *range) { - struct lu_range tmp; + struct lu_seq_range tmp; int rc; ENTRY; LASSERT(range != NULL); - rc = sscanf(buffer, "[%Lx - %Lx]\n",(long long unsigned *)&tmp.lr_start, - (long long unsigned *)&tmp.lr_end); + rc = sscanf(buffer, "[%Lx - %Lx]\n",(long long unsigned *)&tmp.lsr_start, + (long long unsigned *)&tmp.lsr_end); if (rc != 2 || !range_is_sane(&tmp) || range_is_zero(&tmp)) RETURN(-EINVAL); *range = tmp; @@ -88,13 +88,13 @@ seq_proc_write_common(struct file *file, const char *buffer, static int seq_proc_read_common(char *page, char **start, off_t off, int count, int *eof, void *data, - struct lu_range *range) + struct lu_seq_range *range) { int rc; ENTRY; *eof = 1; - rc = snprintf(page, count, "["LPX64" - "LPX64"]\n", + rc = snprintf(page, count, "["LPX64" - "LPX64"]:%x\n", PRANGE(range)); RETURN(rc); } diff --git a/lustre/fld/fld_cache.c b/lustre/fld/fld_cache.c index 9ec1f1a..695fc21 100644 --- a/lustre/fld/fld_cache.c +++ b/lustre/fld/fld_cache.c @@ -37,6 +37,7 @@ * * FLD (Fids Location Database) * + * Author: Pravin Shelar * Author: Yury Umanets */ @@ -67,74 +68,35 @@ #include #include "fld_internal.h" -#ifdef __KERNEL__ -static inline __u32 fld_cache_hash(seqno_t seq) -{ - return (__u32)seq; -} - -void fld_cache_flush(struct fld_cache *cache) -{ - struct fld_cache_entry *flde; - struct hlist_head *bucket; - struct hlist_node *scan; - struct hlist_node *next; - int i; - ENTRY; - - /* Free all cache entries. */ - spin_lock(&cache->fci_lock); - for (i = 0; i < cache->fci_hash_size; i++) { - bucket = cache->fci_hash_table + i; - hlist_for_each_entry_safe(flde, scan, next, bucket, fce_list) { - hlist_del_init(&flde->fce_list); - list_del_init(&flde->fce_lru); - cache->fci_cache_count--; - OBD_FREE_PTR(flde); - } - } - spin_unlock(&cache->fci_lock); - EXIT; -} - -struct fld_cache *fld_cache_init(const char *name, int hash_size, +/** + * create fld cache. 
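+ *
+ * \param cache_size maximum number of entries kept in the cache
+ * \param cache_threshold shrink slack; a shrink pass stops once at most
+ * cache_size - cache_threshold entries remain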
+ */ +struct fld_cache *fld_cache_init(const char *name, int cache_size, int cache_threshold) { - struct fld_cache *cache; - int i; + struct fld_cache *cache; ENTRY; LASSERT(name != NULL); - LASSERT(IS_PO2(hash_size)); LASSERT(cache_threshold < cache_size); OBD_ALLOC_PTR(cache); if (cache == NULL) RETURN(ERR_PTR(-ENOMEM)); - INIT_LIST_HEAD(&cache->fci_lru); + CFS_INIT_LIST_HEAD(&cache->fci_entries_head); + CFS_INIT_LIST_HEAD(&cache->fci_lru); - cache->fci_cache_count = 0; + cache->fci_cache_count = 0; spin_lock_init(&cache->fci_lock); strncpy(cache->fci_name, name, sizeof(cache->fci_name)); - cache->fci_hash_size = hash_size; - cache->fci_cache_size = cache_size; + cache->fci_cache_size = cache_size; cache->fci_threshold = cache_threshold; /* Init fld cache info. */ - cache->fci_hash_mask = hash_size - 1; - OBD_ALLOC(cache->fci_hash_table, - hash_size * sizeof(*cache->fci_hash_table)); - if (cache->fci_hash_table == NULL) { - OBD_FREE_PTR(cache); - RETURN(ERR_PTR(-ENOMEM)); - } - - for (i = 0; i < hash_size; i++) - INIT_HLIST_HEAD(&cache->fci_hash_table[i]); memset(&cache->fci_stat, 0, sizeof(cache->fci_stat)); CDEBUG(D_INFO, "%s: FLD cache - Size: %d, Threshold: %d\n", @@ -142,8 +104,10 @@ struct fld_cache *fld_cache_init(const char *name, int hash_size, RETURN(cache); } -EXPORT_SYMBOL(fld_cache_init); +/** + * destroy fld cache. + */ void fld_cache_fini(struct fld_cache *cache) { __u64 pct; @@ -162,28 +126,109 @@ void fld_cache_fini(struct fld_cache *cache) CDEBUG(D_INFO, "FLD cache statistics (%s):\n", cache->fci_name); CDEBUG(D_INFO, " Total reqs: "LPU64"\n", cache->fci_stat.fst_count); CDEBUG(D_INFO, " Cache reqs: "LPU64"\n", cache->fci_stat.fst_cache); - CDEBUG(D_INFO, " Saved RPCs: "LPU64"\n", cache->fci_stat.fst_inflight); CDEBUG(D_INFO, " Cache hits: "LPU64"%%\n", pct); - OBD_FREE(cache->fci_hash_table, cache->fci_hash_size * - sizeof(*cache->fci_hash_table)); - OBD_FREE_PTR(cache); - + OBD_FREE_PTR(cache); + + EXIT; +} + +static inline void fld_cache_entry_delete(struct fld_cache *cache, + struct fld_cache_entry *node); + +/** + * fix list by checking new entry with NEXT entry in order. + */ +static void fld_fix_new_list(struct fld_cache *cache) +{ + struct fld_cache_entry *f_curr; + struct fld_cache_entry *f_next; + struct lu_seq_range *c_range; + struct lu_seq_range *n_range; + struct list_head *head = &cache->fci_entries_head; + ENTRY; + +restart_fixup: + + list_for_each_entry_safe(f_curr, f_next, head, fce_list) { + c_range = &f_curr->fce_range; + n_range = &f_next->fce_range; + + LASSERT(range_is_sane(c_range)); + if (&f_next->fce_list == head) + break; + + LASSERT(c_range->lsr_start <= n_range->lsr_start); + + /* check merge possibility with next range */ + if (c_range->lsr_end == n_range->lsr_start) { + if (c_range->lsr_mdt != n_range->lsr_mdt) + continue; + n_range->lsr_start = c_range->lsr_start; + fld_cache_entry_delete(cache, f_curr); + continue; + } + + /* check if current range overlaps with next range. */ + if (n_range->lsr_start < c_range->lsr_end) { + + if (c_range->lsr_mdt == n_range->lsr_mdt) { + n_range->lsr_start = c_range->lsr_start; + n_range->lsr_end = max(c_range->lsr_end, + n_range->lsr_end); + + fld_cache_entry_delete(cache, f_curr); + } else { + if (n_range->lsr_end <= c_range->lsr_end) { + *n_range = *c_range; + fld_cache_entry_delete(cache, f_curr); + } else + n_range->lsr_start = c_range->lsr_end; + } + + /* we could have overlap over next + * range too. better restart. 
*/ + goto restart_fixup; + } + + /* kill duplicates */ + if (c_range->lsr_start == n_range->lsr_start && + c_range->lsr_end == n_range->lsr_end) + fld_cache_entry_delete(cache, f_curr); + } + EXIT; } -EXPORT_SYMBOL(fld_cache_fini); -static inline struct hlist_head * -fld_cache_bucket(struct fld_cache *cache, seqno_t seq) +/** + * add node to fld cache + */ +static inline void fld_cache_entry_add(struct fld_cache *cache, + struct fld_cache_entry *f_new, + struct list_head *pos) { - return cache->fci_hash_table + (fld_cache_hash(seq) & - cache->fci_hash_mask); + list_add(&f_new->fce_list, pos); + list_add(&f_new->fce_lru, &cache->fci_lru); + + cache->fci_cache_count++; + fld_fix_new_list(cache); } -/* - * Check if cache needs to be shrinked. If so - do it. Tries to keep all - * collision lists well balanced. That is, check all of them and remove one - * entry in list and so on until cache is shrinked enough. +/** + * delete given node from list. + */ +static inline void fld_cache_entry_delete(struct fld_cache *cache, + struct fld_cache_entry *node) +{ + list_del(&node->fce_list); + list_del(&node->fce_lru); + cache->fci_cache_count--; + OBD_FREE_PTR(node); +} + +/** + * Check if cache needs to be shrunk. If so - do it. + * Remove one entry in list and so on until cache is shrunk enough. */ static int fld_cache_shrink(struct fld_cache *cache) { @@ -200,257 +245,234 @@ static int fld_cache_shrink(struct fld_cache *cache) curr = cache->fci_lru.prev; while (cache->fci_cache_count + cache->fci_threshold > - cache->fci_cache_size && curr != &cache->fci_lru) - { + cache->fci_cache_size && curr != &cache->fci_lru) { + flde = list_entry(curr, struct fld_cache_entry, fce_lru); curr = curr->prev; - - /* keep inflights */ - if (flde->fce_inflight) - continue; - - hlist_del_init(&flde->fce_list); - list_del_init(&flde->fce_lru); - cache->fci_cache_count--; - OBD_FREE_PTR(flde); + fld_cache_entry_delete(cache, flde); num++; } - CDEBUG(D_INFO, "%s: FLD cache - Shrinked by " + CDEBUG(D_INFO, "%s: FLD cache - Shrunk by " "%d entries\n", cache->fci_name, num); RETURN(0); } -int fld_cache_insert_inflight(struct fld_cache *cache, seqno_t seq) +/** + * kill all fld cache entries. + */ +void fld_cache_flush(struct fld_cache *cache) { - struct fld_cache_entry *flde, *fldt; - struct hlist_head *bucket; - struct hlist_node *scan; ENTRY; spin_lock(&cache->fci_lock); - - /* Check if cache already has the entry with such a seq. */ - bucket = fld_cache_bucket(cache, seq); - hlist_for_each_entry(fldt, scan, bucket, fce_list) { - if (fldt->fce_seq == seq) { - spin_unlock(&cache->fci_lock); - RETURN(-EEXIST); - } - } + cache->fci_cache_size = 0; + fld_cache_shrink(cache); spin_unlock(&cache->fci_lock); - /* Allocate new entry. */ - OBD_ALLOC_PTR(flde); - if (!flde) - RETURN(-ENOMEM); + EXIT; +} - /* - * Check if cache has the entry with such a seq again. It could be added - * while we were allocating new entry. - */ - spin_lock(&cache->fci_lock); - hlist_for_each_entry(fldt, scan, bucket, fce_list) { - if (fldt->fce_seq == seq) { - spin_unlock(&cache->fci_lock); - OBD_FREE_PTR(flde); - RETURN(0); - } +/** + * punch hole in existing range. divide this range and add new + * entry accordingly. 
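+ * e.g. inserting [b, c) into an existing [a, d) (a < b < c < d) leaves
+ * three entries: [a, b), [b, c) and [c, d), where only the middle one
+ * carries the new mdt.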
 */
+
+void fld_cache_punch_hole(struct fld_cache *cache,
+                          struct fld_cache_entry *f_curr,
+                          struct fld_cache_entry *f_new)
+{
+        const struct lu_seq_range *range = &f_new->fce_range;
+        const seqno_t new_start = range->lsr_start;
+        const seqno_t new_end = range->lsr_end;
+        struct fld_cache_entry *fldt;
+
+        ENTRY;
+        OBD_ALLOC_GFP(fldt, sizeof *fldt, CFS_ALLOC_ATOMIC);
+        if (!fldt) {
+                OBD_FREE_PTR(f_new);
+                EXIT;
+                /* overlap is not allowed, so don't mess up the list. */
+                return;
+        }
+        /* break f_curr RANGE into three RANGES:
+         * f_curr, f_new, fldt
+         */
-        /* Add new entry to cache and lru list. */
-        INIT_HLIST_NODE(&flde->fce_list);
-        flde->fce_inflight = 1;
-        flde->fce_invalid = 1;
-        cfs_waitq_init(&flde->fce_waitq);
-        flde->fce_seq = seq;
-
-        hlist_add_head(&flde->fce_list, bucket);
-        list_add(&flde->fce_lru, &cache->fci_lru);
-        cache->fci_cache_count++;
+        /* f_new = *range */

-        spin_unlock(&cache->fci_lock);
+        /* fldt */
+        fldt->fce_range.lsr_start = new_end;
+        fldt->fce_range.lsr_end = f_curr->fce_range.lsr_end;
+        fldt->fce_range.lsr_mdt = f_curr->fce_range.lsr_mdt;

-        RETURN(0);
+        /* f_curr */
+        f_curr->fce_range.lsr_end = new_start;
+
+        /* add these two entries to list */
+        fld_cache_entry_add(cache, f_new, &f_curr->fce_list);
+        fld_cache_entry_add(cache, fldt, &f_new->fce_list);
+
+        /* no need to fixup */
+        EXIT;
 }
-EXPORT_SYMBOL(fld_cache_insert_inflight);

-int fld_cache_insert(struct fld_cache *cache,
-                     seqno_t seq, mdsno_t mds)
+/**
+ * handle range overlap in fld cache.
+ */
+void fld_cache_overlap_handle(struct fld_cache *cache,
+                              struct fld_cache_entry *f_curr,
+                              struct fld_cache_entry *f_new)
 {
-        struct fld_cache_entry *flde, *fldt;
-        struct hlist_head *bucket;
-        struct hlist_node *scan;
-        int rc;
-        ENTRY;
+        const struct lu_seq_range *range = &f_new->fce_range;
+        const seqno_t new_start = range->lsr_start;
+        const seqno_t new_end = range->lsr_end;
+        const mdsno_t mdt = range->lsr_mdt;

-        spin_lock(&cache->fci_lock);
+        /* this is the overlap case; only overlap with the previous range is
+         * checked here, the fixup pass handles overlap with the next
+         * range. */

-        /* Check if need to shrink cache. */
-        rc = fld_cache_shrink(cache);
-        if (rc) {
-                spin_unlock(&cache->fci_lock);
-                RETURN(rc);
-        }
+        if (f_curr->fce_range.lsr_mdt == mdt) {
+                f_curr->fce_range.lsr_start = min(f_curr->fce_range.lsr_start,
-        /* Check if cache already has the entry with such a seq. */
-        bucket = fld_cache_bucket(cache, seq);
-        hlist_for_each_entry(fldt, scan, bucket, fce_list) {
-                if (fldt->fce_seq == seq) {
+                                                  new_start);
-                        if (fldt->fce_inflight) {
-                                /* set mds for inflight entry */
-                                fldt->fce_mds = mds;
-                                fldt->fce_inflight = 0;
-                                fldt->fce_invalid = 0;
-                                cfs_waitq_signal(&fldt->fce_waitq);
-                                rc = 0;
-                        } else
-                                rc = -EEXIST;
-                        spin_unlock(&cache->fci_lock);
-                        RETURN(rc);
-                }
-        }
-        spin_unlock(&cache->fci_lock);
+                f_curr->fce_range.lsr_end = max(f_curr->fce_range.lsr_end,
+                                                new_end);

-        /* Allocate new entry. */
-        OBD_ALLOC_PTR(flde);
-        if (!flde)
-                RETURN(-ENOMEM);
+                OBD_FREE_PTR(f_new);
+                fld_fix_new_list(cache);

-        /*
-         * Check if cache has the entry with such a seq again. It could be added
-         * while we were allocating new entry.
-         */
-        spin_lock(&cache->fci_lock);
-        hlist_for_each_entry(fldt, scan, bucket, fce_list) {
-                if (fldt->fce_seq == seq) {
-                        spin_unlock(&cache->fci_lock);
-                        OBD_FREE_PTR(flde);
-                        RETURN(0);
-                }
-        }
+        } else if (new_start <= f_curr->fce_range.lsr_start &&
+                   f_curr->fce_range.lsr_end <= new_end) {
+                /* case 1: the new range completely overshadows the existing
+                 *         range, e.g. the whole range migrated;
+                 *         update the fld cache entry. */

-        /* Add new entry to cache and lru list. */
-        INIT_HLIST_NODE(&flde->fce_list);
-        flde->fce_mds = mds;
-        flde->fce_seq = seq;
-        flde->fce_inflight = 0;
-        flde->fce_invalid = 0;
-
-        hlist_add_head(&flde->fce_list, bucket);
-        list_add(&flde->fce_lru, &cache->fci_lru);
-        cache->fci_cache_count++;
+                f_curr->fce_range = *range;
+                OBD_FREE_PTR(f_new);
+                fld_fix_new_list(cache);

-        spin_unlock(&cache->fci_lock);
+        } else if (f_curr->fce_range.lsr_start < new_start &&
+                   new_end < f_curr->fce_range.lsr_end) {
+                /* case 2: the new range fits within the existing range. */

-        RETURN(0);
+                fld_cache_punch_hole(cache, f_curr, f_new);
+
+        } else if (new_end <= f_curr->fce_range.lsr_end) {
+                /* case 3: overlap:
+                 *         [new_start [c_start new_end) c_end)
+                 */
+
+                LASSERT(new_start <= f_curr->fce_range.lsr_start);
+
+                f_curr->fce_range.lsr_start = new_end;
+                fld_cache_entry_add(cache, f_new, f_curr->fce_list.prev);
+
+        } else if (f_curr->fce_range.lsr_start <= new_start) {
+                /* case 4: overlap:
+                 *         [c_start [new_start c_end) new_end)
+                 */
+
+                LASSERT(f_curr->fce_range.lsr_end <= new_end);
+
+                f_curr->fce_range.lsr_end = new_start;
+                fld_cache_entry_add(cache, f_new, &f_curr->fce_list);
+        } else
+                CERROR("NEW range ="DRANGE" curr = "DRANGE"\n",
+                       PRANGE(range), PRANGE(&f_curr->fce_range));
 }
-EXPORT_SYMBOL(fld_cache_insert);

-void fld_cache_delete(struct fld_cache *cache, seqno_t seq)
+/**
+ * Insert FLD entry in FLD cache.
+ *
+ * This function handles all cases of merging and breaking up of
+ * ranges.
+ */
+void fld_cache_insert(struct fld_cache *cache,
+                      const struct lu_seq_range *range)
 {
-        struct fld_cache_entry *flde;
-        struct hlist_node *scan, *n;
-        struct hlist_head *bucket;
+        struct fld_cache_entry *f_new;
+        struct fld_cache_entry *f_curr;
+        struct fld_cache_entry *n;
+        struct list_head *head;
+        struct list_head *prev = NULL;
+        const seqno_t new_start = range->lsr_start;
+        const seqno_t new_end = range->lsr_end;
        ENTRY;

-        bucket = fld_cache_bucket(cache, seq);
-
+        LASSERT(range_is_sane(range));
+
+        /* Allocate new entry. */
+        OBD_ALLOC_PTR(f_new);
+        if (!f_new) {
+                EXIT;
+                return;
+        }
+
+        f_new->fce_range = *range;
+
+        /*
+         * Duplicate entries are eliminated during the insert op,
+         * so we don't need to search for the new entry before the
+         * insertion loop.
+         */
+
        spin_lock(&cache->fci_lock);
-        hlist_for_each_entry_safe(flde, scan, n, bucket, fce_list) {
-                if (flde->fce_seq == seq) {
-                        hlist_del_init(&flde->fce_list);
-                        list_del_init(&flde->fce_lru);
-                        if (flde->fce_inflight) {
-                                flde->fce_inflight = 0;
-                                flde->fce_invalid = 1;
-                                cfs_waitq_signal(&flde->fce_waitq);
-                        }
-                        cache->fci_cache_count--;
-                        OBD_FREE_PTR(flde);
-                        GOTO(out_unlock, 0);
+        fld_cache_shrink(cache);
+
+        head = &cache->fci_entries_head;
+
+        list_for_each_entry_safe(f_curr, n, head, fce_list) {
+                /* the new range ends before this entry: add it in front */
+                if (new_end < f_curr->fce_range.lsr_start)
+                        break;
+
+                prev = &f_curr->fce_list;
+                /* the new range overlaps with this entry */
+                if (new_start < f_curr->fce_range.lsr_end) {
+                        fld_cache_overlap_handle(cache, f_curr, f_new);
+                        goto out;
                }
        }
-        EXIT;
-out_unlock:
-        spin_unlock(&cache->fci_lock);
-}
-EXPORT_SYMBOL(fld_cache_delete);
+        if (prev == NULL)
+                prev = head;

-static int fld_check_inflight(struct fld_cache_entry *flde)
-{
-        return (flde->fce_inflight);
+        /* Add new entry to cache and lru list. */
+        fld_cache_entry_add(cache, f_new, prev);
+out:
+        spin_unlock(&cache->fci_lock);
+        EXIT;
 }

+/**
+ * look up the range containing sequence \a seq in the fld cache.
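+ *
+ * Entries are sorted on lsr_start, so the scan stops at the first entry
+ * whose lsr_start lies beyond \a seq; on a hit the entry is also moved
+ * to the head of the LRU list.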
 */
 int fld_cache_lookup(struct fld_cache *cache,
-                     seqno_t seq, mdsno_t *mds)
+                     const seqno_t seq, struct lu_seq_range *range)
 {
        struct fld_cache_entry *flde;
-        struct hlist_node *scan, *n;
-        struct hlist_head *bucket;
+        struct list_head *head;
        ENTRY;

-        bucket = fld_cache_bucket(cache, seq);
        spin_lock(&cache->fci_lock);
+        head = &cache->fci_entries_head;
+
        cache->fci_stat.fst_count++;
-        hlist_for_each_entry_safe(flde, scan, n, bucket, fce_list) {
-                if (flde->fce_seq == seq) {
-                        if (flde->fce_inflight) {
-                                /* lookup RPC is inflight need to wait */
-                                struct l_wait_info lwi;
-                                spin_unlock(&cache->fci_lock);
-                                lwi = LWI_TIMEOUT(0, NULL, NULL);
-                                l_wait_event(flde->fce_waitq,
-                                             !fld_check_inflight(flde), &lwi);
-                                LASSERT(!flde->fce_inflight);
-                                if (flde->fce_invalid)
-                                        RETURN(-ENOENT);
-
-                                *mds = flde->fce_mds;
-                                cache->fci_stat.fst_inflight++;
-                        } else {
-                                LASSERT(!flde->fce_invalid);
-                                *mds = flde->fce_mds;
-                                list_del(&flde->fce_lru);
-                                list_add(&flde->fce_lru, &cache->fci_lru);
-                                cache->fci_stat.fst_cache++;
-                                spin_unlock(&cache->fci_lock);
-                        }
+        list_for_each_entry(flde, head, fce_list) {
+                if (flde->fce_range.lsr_start > seq)
+                        break;
+
+                if (range_within(&flde->fce_range, seq)) {
+                        *range = flde->fce_range;
+
+                        /* update position of this entry in lru list. */
+                        list_move(&flde->fce_lru, &cache->fci_lru);
+                        cache->fci_stat.fst_cache++;
+                        spin_unlock(&cache->fci_lock);
                        RETURN(0);
                }
        }
        spin_unlock(&cache->fci_lock);
        RETURN(-ENOENT);
 }
-EXPORT_SYMBOL(fld_cache_lookup);

-#else
-int fld_cache_insert_inflight(struct fld_cache *cache, seqno_t seq)
-{
-        return -ENOTSUPP;
-}
-EXPORT_SYMBOL(fld_cache_insert_inflight);
-
-int fld_cache_insert(struct fld_cache *cache,
-                     seqno_t seq, mdsno_t mds)
-{
-        return -ENOTSUPP;
-}
-EXPORT_SYMBOL(fld_cache_insert);
-
-void fld_cache_delete(struct fld_cache *cache,
-                      seqno_t seq)
-{
-        return;
-}
-EXPORT_SYMBOL(fld_cache_delete);
-
-int fld_cache_lookup(struct fld_cache *cache,
-                     seqno_t seq, mdsno_t *mds)
-{
-        return -ENOTSUPP;
-}
-EXPORT_SYMBOL(fld_cache_lookup);
-#endif

diff --git a/lustre/fld/fld_handler.c b/lustre/fld/fld_handler.c
index 0f6e7cc..2b6ab12 100644
--- a/lustre/fld/fld_handler.c
+++ b/lustre/fld/fld_handler.c
@@ -39,6 +39,7 @@
  *
  * Author: Yury Umanets
  * Author: WangDi
+ * Author: Pravin Shelar
  */

 #ifndef EXPORT_SYMTAB
@@ -109,106 +110,200 @@ static void __exit fld_mod_exit(void)
        }
 }

-/* Insert index entry and update cache. */
+/**
+ * Insert FLD index entry and update FLD cache.
+ *
+ * It first tries to merge the given range with an existing range, then
+ * updates the FLD index and FLD cache accordingly. FLD index consistency
+ * is maintained by this function.
+ * This function is called from the sequence allocator when a super-sequence
+ * is granted to a server.
+ */
+
 int fld_server_create(struct lu_server_fld *fld,
                       const struct lu_env *env,
-                      seqno_t seq, mdsno_t mds)
+                      struct lu_seq_range *add_range,
+                      struct thandle *th)
 {
-        int rc;
+        struct lu_seq_range *erange;
+        struct lu_seq_range *new;
+        struct fld_thread_info *info;
+        int rc = 0;
+        int do_merge = 0;
+
        ENTRY;
-
-        rc = fld_index_create(fld, env, seq, mds);
-
-        if (rc == 0) {
-                /*
-                 * Do not return result of calling fld_cache_insert()
-                 * here. First of all because it may return -EEXISTS. Another
-                 * reason is that, we do not want to stop proceeding even after
-                 * cache errors.
-                 */
-                fld_cache_insert(fld->lsf_cache, seq, mds);
-        }
+
+        info = lu_context_key_get(&env->le_ctx, &fld_thread_key);
+        mutex_lock(&fld->lsf_lock);
+
+        erange = &info->fti_lrange;
+        new = &info->fti_irange;
+        *new = *add_range;
+
+        /* STEP 1: try to merge with previous range */
+        rc = fld_index_lookup(fld, env, new->lsr_start, erange);
+        if (!rc) {
+                /* in case of range overlap, the mdt ID must be the same for
+                 * both ranges */
+                if (new->lsr_mdt != erange->lsr_mdt) {
+                        CERROR("mdt[%x] for given range is different from "
+                               "existing overlapping range mdt[%x]\n",
+                               new->lsr_mdt, erange->lsr_mdt);
+                        rc = -EIO;
+                        GOTO(out, rc);
+                }
+
+                if (new->lsr_end < erange->lsr_end)
+                        GOTO(out, rc);
+                do_merge = 1;
+
+        } else if (rc == -ENOENT) {
+                /* check for the merge case: optimizes for single-mds lustre.
+                 * As the entry does not exist, the returned entry must be
+                 * the left-side entry relative to the start of the new range
+                 * (ref dio_lookup()), so try to merge from the left.
+                 */
+                if (new->lsr_start == erange->lsr_end &&
+                    new->lsr_mdt == erange->lsr_mdt)
+                        do_merge = 1;
+        } else {
+                /* no overlap is allowed in the fld, so any other lookup
+                 * failure is an error */
+                GOTO(out, rc);
        }

-        RETURN(rc);
-}
-EXPORT_SYMBOL(fld_server_create);
+        if (do_merge) {
+                /* the new range can be combined with the existing one,
+                 * so delete the existing range.
+                 */

-/* Delete index entry. */
-int fld_server_delete(struct lu_server_fld *fld,
-                      const struct lu_env *env,
-                      seqno_t seq)
-{
-        int rc;
-        ENTRY;
+                rc = fld_index_delete(fld, env, erange, th);
+                if (rc == 0) {
+                        new->lsr_start = min(erange->lsr_start, new->lsr_start);
+                        new->lsr_end = max(erange->lsr_end, new->lsr_end);
+                } else
+                        GOTO(out, rc);
+
+                do_merge = 0;
+        }

-        fld_cache_delete(fld->lsf_cache, seq);
-        rc = fld_index_delete(fld, env, seq);
+        /* STEP 2: try to merge with next range */
+        rc = fld_index_lookup(fld, env, new->lsr_end, erange);
+        if (!rc) {
+                /* the range overlaps with the entry on the right. */
+                if (new->lsr_mdt == erange->lsr_mdt)
+                        do_merge = 1;
+        } else if (rc == -ENOENT) {
+                /* this range lies left of the new range end point */
+                LASSERT(erange->lsr_end <= new->lsr_end);
+
+                if (new->lsr_end == erange->lsr_end)
+                        do_merge = 1;
+                if (new->lsr_start <= erange->lsr_start)
+                        do_merge = 1;
+        } else
+                GOTO(out, rc);
+
+        if (do_merge) {
+                if (new->lsr_mdt != erange->lsr_mdt) {
+                        CERROR("mdt[%x] for given range is different from "
+                               "existing overlapping range mdt[%x]\n",
+                               new->lsr_mdt, erange->lsr_mdt);
+                        rc = -EIO;
+                        GOTO(out, rc);
+                }
+                /* merge with the next range */
+                rc = fld_index_delete(fld, env, erange, th);
+                if (rc == 0) {
+                        new->lsr_start = min(erange->lsr_start, new->lsr_start);
+                        new->lsr_end = max(erange->lsr_end, new->lsr_end);
+                } else
+                        GOTO(out, rc);
+        }
+
+        /* now update the fld entry. */
+        rc = fld_index_create(fld, env, new, th);
+
+        LASSERT(rc != -EEXIST);
+out:
+        if (rc == 0)
+                fld_cache_insert(fld->lsf_cache, new);
+
+        mutex_unlock(&fld->lsf_lock);
+
+        CDEBUG((rc != 0 ? D_ERROR : D_INFO),
+               "%s: FLD create: given range "DRANGE", after merge "
+               DRANGE", rc = %d\n", fld->lsf_name,
+               PRANGE(add_range), PRANGE(new), rc);

        RETURN(rc);
 }
-EXPORT_SYMBOL(fld_server_delete);

-/* Lookup mds by seq. */
+EXPORT_SYMBOL(fld_server_create);
+
+/**
+ * Look up the mds by seq; returns the range containing the given seq.
+ *
+ * If the entry is not cached in the fld cache, a request is sent to the
+ * super sequence controller node (MDT0). All other MDTs [1...N] and
+ * clients cache fld entries, but these caches are not persistent.
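+ * A cache miss on such a server therefore turns into an FLD_LOOKUP RPC
+ * to MDT0 through the lsf_control_exp export.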
+ */ + int fld_server_lookup(struct lu_server_fld *fld, const struct lu_env *env, - seqno_t seq, mdsno_t *mds) + seqno_t seq, struct lu_seq_range *range) { int rc; ENTRY; - + /* Lookup it in the cache. */ - rc = fld_cache_lookup(fld->lsf_cache, seq, mds); + rc = fld_cache_lookup(fld->lsf_cache, seq, range); if (rc == 0) RETURN(0); - rc = fld_index_lookup(fld, env, seq, mds); - if (rc == 0) { - /* - * Do not return error here as well. See previous comment in - * same situation in function fld_server_create(). + if (fld->lsf_obj) + rc = fld_index_lookup(fld, env, seq, range); + else { + LASSERT(fld->lsf_control_exp); + /* send request to mdt0 i.e. super seq. controller. + * This is temporary solution, long term solution is fld + * replication on all mdt servers. */ - fld_cache_insert(fld->lsf_cache, seq, *mds); + rc = fld_client_rpc(fld->lsf_control_exp, + range, FLD_LOOKUP); } + + if (rc == 0) + fld_cache_insert(fld->lsf_cache, range); + RETURN(rc); } EXPORT_SYMBOL(fld_server_lookup); +/** + * All MDT server handle fld lookup operation. But only MDT0 has fld index. + * if entry is not found in cache we need to forward lookup request to MDT0 + */ + static int fld_server_handle(struct lu_server_fld *fld, const struct lu_env *env, - __u32 opc, struct md_fld *mf, + __u32 opc, struct lu_seq_range *range, struct fld_thread_info *info) { int rc; ENTRY; switch (opc) { - case FLD_CREATE: - rc = fld_server_create(fld, env, - mf->mf_seq, mf->mf_mds); - - /* Do not return -EEXIST error for resent case */ - if ((info->fti_flags & MSG_RESENT) && rc == -EEXIST) - rc = 0; - break; - case FLD_DELETE: - rc = fld_server_delete(fld, env, mf->mf_seq); - - /* Do not return -ENOENT error for resent case */ - if ((info->fti_flags & MSG_RESENT) && rc == -ENOENT) - rc = 0; - break; case FLD_LOOKUP: rc = fld_server_lookup(fld, env, - mf->mf_seq, &mf->mf_mds); + range->lsr_start, range); break; default: rc = -EINVAL; break; } - CDEBUG(D_INFO, "%s: FLD req handle: error %d (opc: %d, seq: " - LPX64", mds: "LPU64")\n", fld->lsf_name, rc, opc, - mf->mf_seq, mf->mf_mds); + CDEBUG(D_INFO, "%s: FLD req handle: error %d (opc: %d, range: " + DRANGE"\n", fld->lsf_name, rc, opc, PRANGE(range)); RETURN(rc); @@ -218,8 +313,8 @@ static int fld_req_handle(struct ptlrpc_request *req, struct fld_thread_info *info) { struct lu_site *site; - struct md_fld *in; - struct md_fld *out; + struct lu_seq_range *in; + struct lu_seq_range *out; int rc; __u32 *opc; ENTRY; @@ -252,8 +347,6 @@ static int fld_req_handle(struct ptlrpc_request *req, static void fld_thread_info_init(struct ptlrpc_request *req, struct fld_thread_info *info) { - info->fti_flags = lustre_msg_get_flags(req->rq_reqmsg); - info->fti_pill = &req->rq_pill; /* Init request capsule. */ req_capsule_init(info->fti_pill, req, RCL_SERVER); @@ -301,21 +394,27 @@ EXPORT_SYMBOL(fld_query); * * fid_is_local() is supposed to be used in assertion checks only. 
*/ -int fid_is_local(struct lu_site *site, const struct lu_fid *fid) +int fid_is_local(const struct lu_env *env, + struct lu_site *site, const struct lu_fid *fid) { int result; struct md_site *msite; + struct lu_seq_range *range; + struct fld_thread_info *info; + ENTRY; + + info = lu_context_key_get(&env->le_ctx, &fld_thread_key); + range = &info->fti_lrange; result = 1; /* conservatively assume fid is local */ msite = lu_site2md(site); if (msite->ms_client_fld != NULL) { - mdsno_t mds; int rc; rc = fld_cache_lookup(msite->ms_client_fld->lcf_cache, - fid_seq(fid), &mds); + fid_seq(fid), range); if (rc == 0) - result = (mds == msite->ms_node_id); + result = (range->lsr_mdt == msite->ms_node_id); } return result; } @@ -363,7 +462,8 @@ static void fld_server_proc_fini(struct lu_server_fld *fld) #endif int fld_server_init(struct lu_server_fld *fld, struct dt_device *dt, - const char *prefix, const struct lu_env *env) + const char *prefix, const struct lu_env *env, + int mds_node_id) { int cache_size, cache_threshold; int rc; @@ -378,8 +478,8 @@ int fld_server_init(struct lu_server_fld *fld, struct dt_device *dt, cache_threshold = cache_size * FLD_SERVER_CACHE_THRESHOLD / 100; + mutex_init(&fld->lsf_lock); fld->lsf_cache = fld_cache_init(fld->lsf_name, - FLD_SERVER_HTABLE_SIZE, cache_size, cache_threshold); if (IS_ERR(fld->lsf_cache)) { rc = PTR_ERR(fld->lsf_cache); @@ -387,14 +487,18 @@ int fld_server_init(struct lu_server_fld *fld, struct dt_device *dt, GOTO(out, rc); } - rc = fld_index_init(fld, env, dt); - if (rc) - GOTO(out, rc); + if (!mds_node_id) { + rc = fld_index_init(fld, env, dt); + if (rc) + GOTO(out, rc); + } else + fld->lsf_obj = NULL; rc = fld_server_proc_init(fld); if (rc) GOTO(out, rc); + fld->lsf_control_exp = NULL; EXIT; out: if (rc) diff --git a/lustre/fld/fld_index.c b/lustre/fld/fld_index.c index 1b927ea..03da47e 100644 --- a/lustre/fld/fld_index.c +++ b/lustre/fld/fld_index.c @@ -60,32 +60,27 @@ #include #include #include +#include #include #include "fld_internal.h" const char fld_index_name[] = "fld"; -EXPORT_SYMBOL(fld_index_name); + +static const struct lu_seq_range IGIF_FLD_RANGE = { + .lsr_start = 1, + .lsr_end = IDIF_SEQ_START, + .lsr_mdt = 0 +}; const struct dt_index_features fld_index_features = { .dif_flags = DT_IND_UPDATE, .dif_keysize_min = sizeof(seqno_t), .dif_keysize_max = sizeof(seqno_t), - .dif_recsize_min = sizeof(mdsno_t), - .dif_recsize_max = sizeof(mdsno_t), + .dif_recsize_min = sizeof(struct lu_seq_range), + .dif_recsize_max = sizeof(struct lu_seq_range), .dif_ptrsize = 4 }; -EXPORT_SYMBOL(fld_index_features); - -/* - * number of blocks to reserve for particular operations. Should be function of - * ... something. Stub for now. 
- */ -enum { - FLD_TXN_INDEX_INSERT_CREDITS = 20, - FLD_TXN_INDEX_DELETE_CREDITS = 20, -}; - extern struct lu_context_key fld_thread_key; static struct dt_key *fld_key(const struct lu_env *env, @@ -102,86 +97,174 @@ static struct dt_key *fld_key(const struct lu_env *env, } static struct dt_rec *fld_rec(const struct lu_env *env, - const mdsno_t mds) + const struct lu_seq_range *range) { struct fld_thread_info *info; + struct lu_seq_range *rec; ENTRY; info = lu_context_key_get(&env->le_ctx, &fld_thread_key); LASSERT(info != NULL); + rec = &info->fti_rec; + + range_cpu_to_be(rec, range); + RETURN((void *)rec); +} + +struct thandle* fld_trans_start(struct lu_server_fld *fld, + const struct lu_env *env, int credit) +{ + struct fld_thread_info *info; + struct dt_device *dt_dev; + struct txn_param *p; + + dt_dev = lu2dt_dev(fld->lsf_obj->do_lu.lo_dev); + info = lu_context_key_get(&env->le_ctx, &fld_thread_key); + p = &info->fti_txn_param; + txn_param_init(p, credit); - info->fti_rec = cpu_to_be64(mds); - RETURN((void *)&info->fti_rec); + return dt_dev->dd_ops->dt_trans_start(env, dt_dev, p); } +void fld_trans_stop(struct lu_server_fld *fld, + const struct lu_env *env, struct thandle* th) +{ + struct dt_device *dt_dev; + + dt_dev = lu2dt_dev(fld->lsf_obj->do_lu.lo_dev); + dt_dev->dd_ops->dt_trans_stop(env, th); +} + +/** + * insert range in fld store. + * + * \param range range to be inserted + * \param th transaction for this operation as it could compound + * transaction. + * + * \retval 0 success + * \retval -ve error + */ + int fld_index_create(struct lu_server_fld *fld, const struct lu_env *env, - seqno_t seq, mdsno_t mds) + const struct lu_seq_range *range, + struct thandle *th) { struct dt_object *dt_obj = fld->lsf_obj; struct dt_device *dt_dev; - struct txn_param txn; - struct thandle *th; + seqno_t start; int rc; + ENTRY; + start = range->lsr_start; + LASSERT(range_is_sane(range)); dt_dev = lu2dt_dev(fld->lsf_obj->do_lu.lo_dev); - /* stub here, will fix it later */ - txn_param_init(&txn, FLD_TXN_INDEX_INSERT_CREDITS); - - th = dt_dev->dd_ops->dt_trans_start(env, dt_dev, &txn); - if (!IS_ERR(th)) { - rc = dt_obj->do_index_ops->dio_insert(env, dt_obj, - fld_rec(env, mds), - fld_key(env, seq), - th, BYPASS_CAPA, 1); - dt_dev->dd_ops->dt_trans_stop(env, th); - } else - rc = PTR_ERR(th); + rc = dt_obj->do_index_ops->dio_insert(env, dt_obj, + fld_rec(env, range), + fld_key(env, start), + th, BYPASS_CAPA, 1); + + CDEBUG(D_INFO, "%s: insert given range : "DRANGE" rc = %d\n", + fld->lsf_name, PRANGE(range), rc); RETURN(rc); } +/** + * delete range in fld store. 
+ *
+ * \param range range to be deleted
+ * \param th transaction
+ *
+ * \retval 0 success
+ * \retval -ve error
+ */
+
 int fld_index_delete(struct lu_server_fld *fld,
                      const struct lu_env *env,
-                     seqno_t seq)
+                     struct lu_seq_range *range,
+                     struct thandle *th)
 {
        struct dt_object *dt_obj = fld->lsf_obj;
        struct dt_device *dt_dev;
-        struct txn_param txn;
-        struct thandle *th;
+        seqno_t seq = range->lsr_start;
        int rc;
+
        ENTRY;

        dt_dev = lu2dt_dev(fld->lsf_obj->do_lu.lo_dev);
-        txn_param_init(&txn, FLD_TXN_INDEX_DELETE_CREDITS);

-        th = dt_dev->dd_ops->dt_trans_start(env, dt_dev, &txn);
-        if (!IS_ERR(th)) {
-                rc = dt_obj->do_index_ops->dio_delete(env, dt_obj,
-                                                      fld_key(env, seq), th,
-                                                      BYPASS_CAPA);
-                dt_dev->dd_ops->dt_trans_stop(env, th);
-        } else
-                rc = PTR_ERR(th);
+        rc = dt_obj->do_index_ops->dio_delete(env, dt_obj,
+                                              fld_key(env, seq), th,
+                                              BYPASS_CAPA);
+
+        CDEBUG(D_INFO, "%s: delete given range : "DRANGE" rc = %d\n",
+               fld->lsf_name, PRANGE(range), rc);
+
        RETURN(rc);
 }

+/**
+ * look up the range for a given seq.
+ *
+ * \param seq seq to look up.
+ * \param range result of the lookup.
+ *
+ * \retval 0 success
+ * \retval -ve error
+ */
+
 int fld_index_lookup(struct lu_server_fld *fld,
                      const struct lu_env *env,
-                     seqno_t seq, mdsno_t *mds)
+                     seqno_t seq,
+                     struct lu_seq_range *range)
 {
-        struct dt_object *dt_obj = fld->lsf_obj;
-        struct dt_rec *rec = fld_rec(env, 0);
+        struct dt_object *dt_obj = fld->lsf_obj;
+        struct lu_seq_range *fld_rec;
+        struct dt_key *key = fld_key(env, seq);
+        struct fld_thread_info *info;
        int rc;
+
        ENTRY;

-        rc = dt_obj->do_index_ops->dio_lookup(env, dt_obj, rec,
-                                              fld_key(env, seq), BYPASS_CAPA);
-        if (rc > 0) {
-                *mds = be64_to_cpu(*(__u64 *)rec);
+        info = lu_context_key_get(&env->le_ctx, &fld_thread_key);
+        fld_rec = &info->fti_rec;
+
+        rc = dt_obj->do_index_ops->dio_lookup(env, dt_obj,
+                                              (struct dt_rec*) fld_rec,
+                                              key, BYPASS_CAPA);
+
+        if (rc >= 0) {
+                range_be_to_cpu(fld_rec, fld_rec);
+                *range = *fld_rec;
+                if (range_within(range, seq))
+                        rc = 0;
+                else
+                        rc = -ENOENT;
+        }
+
+        CDEBUG(D_INFO, "%s: lookup seq = %llx range : "DRANGE" rc = %d\n",
+               fld->lsf_name, seq, PRANGE(range), rc);
+
+        RETURN(rc);
+}
+
+static int fld_insert_igif_fld(struct lu_server_fld *fld,
+                               const struct lu_env *env)
+{
+        struct thandle *th;
+        int rc;
+
+        ENTRY;
+        th = fld_trans_start(fld, env, FLD_TXN_INDEX_INSERT_CREDITS);
+        if (IS_ERR(th))
+                RETURN(PTR_ERR(th));
+
+        rc = fld_index_create(fld, env, &IGIF_FLD_RANGE, th);
+        fld_trans_stop(fld, env, th);
+        if (rc == -EEXIST)
                rc = 0;
-        } else
-                rc = -ENOENT;

        RETURN(rc);
 }
@@ -199,11 +282,20 @@ int fld_index_init(struct lu_server_fld *fld,
                fld->lsf_obj = dt_obj;
                rc = dt_obj->do_ops->do_index_try(env, dt_obj,
                                                  &fld_index_features);
-                if (rc == 0)
+                if (rc == 0) {
                        LASSERT(dt_obj->do_index_ops != NULL);
-                else
+                        rc = fld_insert_igif_fld(fld, env);
+
+                        if (rc != 0) {
+                                CERROR("insert igif in fld! rc = %d\n", rc);
+                                lu_object_put(env, &dt_obj->do_lu);
+                                fld->lsf_obj = NULL;
+                        }
+                } else
                        CERROR("%s: File \"%s\" is not an index!\n",
                               fld->lsf_name, fld_index_name);
+
+
        } else {
                CERROR("%s: Can't find \"%s\" obj %d\n",
                       fld->lsf_name, fld_index_name, (int)PTR_ERR(dt_obj));

diff --git a/lustre/fld/fld_internal.h b/lustre/fld/fld_internal.h
index 7a86e2e..6b50b16 100644
--- a/lustre/fld/fld_internal.h
+++ b/lustre/fld/fld_internal.h
@@ -45,10 +45,75 @@
 #include
 #include
-
 #include
 #include

+enum {
+        LUSTRE_FLD_INIT = 1 << 0,
+        LUSTRE_FLD_RUN  = 1 << 1
+};
+
+struct fld_stats {
+        __u64   fst_count;
+        __u64   fst_cache;
+        __u64   fst_inflight;
+};
+
+typedef int (*fld_hash_func_t) (struct lu_client_fld *, __u64);
+
+typedef struct lu_fld_target *
+(*fld_scan_func_t) (struct lu_client_fld *, __u64);
+
+struct lu_fld_hash {
+        const char              *fh_name;
+        fld_hash_func_t          fh_hash_func;
+        fld_scan_func_t          fh_scan_func;
+};
+
+struct fld_cache_entry {
+        struct list_head         fce_lru;
+        struct list_head         fce_list;
+        /**
+         * fld cache entries are sorted on range->lsr_start field. */
+        struct lu_seq_range      fce_range;
+};
+
+struct fld_cache {
+        /**
+         * Cache guard, protects fci_entries_head and fci_lru; most other
+         * fields are immutable after init is finished.
+         */
+        spinlock_t               fci_lock;
+
+        /**
+         * Cache shrink threshold */
+        int                      fci_threshold;
+
+        /**
+         * Preferred number of cached entries */
+        int                      fci_cache_size;
+
+        /**
+         * Current number of cached entries. Protected by @fci_lock */
+        int                      fci_cache_count;
+
+        /**
+         * LRU list of fld entries. */
+        struct list_head         fci_lru;
+
+        /**
+         * sorted fld entries. */
+        struct list_head         fci_entries_head;
+
+        /**
+         * Cache statistics. */
+        struct fld_stats         fci_stat;
+
+        /**
+         * Cache name used for debug and messages. */
+        char                     fci_name[80];
+};
+
 enum fld_op {
        FLD_CREATE = 0,
        FLD_DELETE = 1,
@@ -71,30 +136,26 @@ enum {
        FLD_CLIENT_CACHE_THRESHOLD = 10
 };

-enum {
-        /*
-         * One page is used for hashtable. That is sizeof(struct hlist_head) *
-         * 1024.
-         */
-        FLD_CLIENT_HTABLE_SIZE = (1024 * 1),
-
-        /*
-         * Here 4 pages are used for hashtable of server cache. This is is
-         * because cache it self is 4 times bugger.
- */ - FLD_SERVER_HTABLE_SIZE = (1024 * 4) -}; - extern struct lu_fld_hash fld_hash[]; #ifdef __KERNEL__ + struct fld_thread_info { struct req_capsule *fti_pill; __u64 fti_key; - __u64 fti_rec; - __u32 fti_flags; + struct lu_seq_range fti_rec; + struct lu_seq_range fti_lrange; + struct lu_seq_range fti_irange; + struct txn_param fti_txn_param; }; + +struct thandle* fld_trans_start(struct lu_server_fld *fld, + const struct lu_env *env, int credit); + +void fld_trans_stop(struct lu_server_fld *fld, + const struct lu_env *env, struct thandle* th); + int fld_index_init(struct lu_server_fld *fld, const struct lu_env *env, struct dt_device *dt); @@ -104,15 +165,20 @@ void fld_index_fini(struct lu_server_fld *fld, int fld_index_create(struct lu_server_fld *fld, const struct lu_env *env, - seqno_t seq, mdsno_t mds); + const struct lu_seq_range *range, + struct thandle *th); int fld_index_delete(struct lu_server_fld *fld, const struct lu_env *env, - seqno_t seq); + struct lu_seq_range *range, + struct thandle *th); int fld_index_lookup(struct lu_server_fld *fld, const struct lu_env *env, - seqno_t seq, mdsno_t *mds); + seqno_t seq, struct lu_seq_range *range); + +int fld_client_rpc(struct obd_export *exp, + struct lu_seq_range *range, __u32 fld_op); #ifdef LPROCFS extern struct lprocfs_vars fld_server_proc_list[]; @@ -121,6 +187,22 @@ extern struct lprocfs_vars fld_client_proc_list[]; #endif +struct fld_cache *fld_cache_init(const char *name, + int cache_size, int cache_threshold); + +void fld_cache_fini(struct fld_cache *cache); + +void fld_cache_flush(struct fld_cache *cache); + +void fld_cache_insert(struct fld_cache *cache, + const struct lu_seq_range *range); + +void fld_cache_delete(struct fld_cache *cache, + const struct lu_seq_range *range); + +int fld_cache_lookup(struct fld_cache *cache, + const seqno_t seq, struct lu_seq_range *range); + static inline const char * fld_target_name(struct lu_fld_target *tar) { diff --git a/lustre/fld/fld_request.c b/lustre/fld/fld_request.c index 39fb13b..dff5498 100644 --- a/lustre/fld/fld_request.c +++ b/lustre/fld/fld_request.c @@ -164,26 +164,7 @@ fld_rrb_scan(struct lu_client_fld *fld, seqno_t seq) RETURN(NULL); } -static int fld_dht_hash(struct lu_client_fld *fld, - seqno_t seq) -{ - /* XXX: here should be DHT hash */ - return fld_rrb_hash(fld, seq); -} - -static struct lu_fld_target * -fld_dht_scan(struct lu_client_fld *fld, seqno_t seq) -{ - /* XXX: here should be DHT scan code */ - return fld_rrb_scan(fld, seq); -} - -struct lu_fld_hash fld_hash[3] = { - { - .fh_name = "DHT", - .fh_hash_func = fld_dht_hash, - .fh_scan_func = fld_dht_scan - }, +struct lu_fld_hash fld_hash[] = { { .fh_name = "RRB", .fh_hash_func = fld_rrb_hash, @@ -394,7 +375,6 @@ int fld_client_init(struct lu_client_fld *fld, FLD_CLIENT_CACHE_THRESHOLD / 100; fld->lcf_cache = fld_cache_init(fld->lcf_name, - FLD_CLIENT_HTABLE_SIZE, cache_size, cache_threshold); if (IS_ERR(fld->lcf_cache)) { rc = PTR_ERR(fld->lcf_cache); @@ -447,11 +427,11 @@ void fld_client_fini(struct lu_client_fld *fld) } EXPORT_SYMBOL(fld_client_fini); -static int fld_client_rpc(struct obd_export *exp, - struct md_fld *mf, __u32 fld_op) +int fld_client_rpc(struct obd_export *exp, + struct lu_seq_range *range, __u32 fld_op) { struct ptlrpc_request *req; - struct md_fld *pmf; + struct lu_seq_range *prange; __u32 *op; int rc; ENTRY; @@ -466,8 +446,8 @@ static int fld_client_rpc(struct obd_export *exp, op = req_capsule_client_get(&req->rq_pill, &RMF_FLD_OPC); *op = fld_op; - pmf = 
diff --git a/lustre/fld/fld_request.c b/lustre/fld/fld_request.c
index 39fb13b..dff5498 100644
--- a/lustre/fld/fld_request.c
+++ b/lustre/fld/fld_request.c
@@ -164,26 +164,7 @@ fld_rrb_scan(struct lu_client_fld *fld, seqno_t seq)
         RETURN(NULL);
 }
 
-static int fld_dht_hash(struct lu_client_fld *fld,
-                        seqno_t seq)
-{
-        /* XXX: here should be DHT hash */
-        return fld_rrb_hash(fld, seq);
-}
-
-static struct lu_fld_target *
-fld_dht_scan(struct lu_client_fld *fld, seqno_t seq)
-{
-        /* XXX: here should be DHT scan code */
-        return fld_rrb_scan(fld, seq);
-}
-
-struct lu_fld_hash fld_hash[3] = {
-        {
-                .fh_name      = "DHT",
-                .fh_hash_func = fld_dht_hash,
-                .fh_scan_func = fld_dht_scan
-        },
+struct lu_fld_hash fld_hash[] = {
         {
                 .fh_name      = "RRB",
                 .fh_hash_func = fld_rrb_hash,
@@ -394,7 +375,6 @@ int fld_client_init(struct lu_client_fld *fld,
                 FLD_CLIENT_CACHE_THRESHOLD / 100;
 
         fld->lcf_cache = fld_cache_init(fld->lcf_name,
-                                        FLD_CLIENT_HTABLE_SIZE,
                                         cache_size, cache_threshold);
         if (IS_ERR(fld->lcf_cache)) {
                 rc = PTR_ERR(fld->lcf_cache);
@@ -447,11 +427,11 @@ void fld_client_fini(struct lu_client_fld *fld)
 }
 EXPORT_SYMBOL(fld_client_fini);
 
-static int fld_client_rpc(struct obd_export *exp,
-                          struct md_fld *mf, __u32 fld_op)
+int fld_client_rpc(struct obd_export *exp,
+                   struct lu_seq_range *range, __u32 fld_op)
 {
         struct ptlrpc_request *req;
-        struct md_fld         *pmf;
+        struct lu_seq_range   *prange;
         __u32                 *op;
         int                    rc;
         ENTRY;
@@ -466,8 +446,8 @@ static int fld_client_rpc(struct obd_export *exp,
         op = req_capsule_client_get(&req->rq_pill, &RMF_FLD_OPC);
         *op = fld_op;
 
-        pmf = req_capsule_client_get(&req->rq_pill, &RMF_FLD_MDFLD);
-        *pmf = *mf;
+        prange = req_capsule_client_get(&req->rq_pill, &RMF_FLD_MDFLD);
+        *prange = *range;
 
         ptlrpc_request_set_replen(req);
         req->rq_request_portal = FLD_REQUEST_PORTAL;
@@ -483,110 +463,32 @@ static int fld_client_rpc(struct obd_export *exp,
         if (rc)
                 GOTO(out_req, rc);
 
-        pmf = req_capsule_server_get(&req->rq_pill, &RMF_FLD_MDFLD);
-        if (pmf == NULL)
+        prange = req_capsule_server_get(&req->rq_pill, &RMF_FLD_MDFLD);
+        if (prange == NULL)
                 GOTO(out_req, rc = -EFAULT);
-        *mf = *pmf;
+        *range = *prange;
         EXIT;
 out_req:
         ptlrpc_req_finished(req);
         return rc;
 }
 
-int fld_client_create(struct lu_client_fld *fld,
-                      seqno_t seq, mdsno_t mds,
-                      const struct lu_env *env)
-{
-        struct md_fld md_fld = { .mf_seq = seq, .mf_mds = mds };
-        struct lu_fld_target *target;
-        int rc;
-        ENTRY;
-
-        fld->lcf_flags |= LUSTRE_FLD_RUN;
-        target = fld_client_get_target(fld, seq);
-        LASSERT(target != NULL);
-
-        CDEBUG(D_INFO, "%s: Create fld entry (seq: "LPX64"; mds: "
-               LPU64") on target %s (idx "LPU64")\n", fld->lcf_name,
-               seq, mds, fld_target_name(target), target->ft_idx);
-
-#ifdef __KERNEL__
-        if (target->ft_srv != NULL) {
-                LASSERT(env != NULL);
-                rc = fld_server_create(target->ft_srv, env, seq, mds);
-        } else {
-#endif
-                rc = fld_client_rpc(target->ft_exp, &md_fld, FLD_CREATE);
-#ifdef __KERNEL__
-        }
-#endif
-
-        if (rc == 0) {
-                /*
-                 * Do not return result of calling fld_cache_insert()
-                 * here. First of all because it may return -EEXIST. Another
-                 * reason is that, we do not want to stop proceeding because of
-                 * cache errors.
-                 */
-                fld_cache_insert(fld->lcf_cache, seq, mds);
-        } else {
-                CERROR("%s: Can't create FLD entry, rc %d\n",
-                       fld->lcf_name, rc);
-        }
-
-        RETURN(rc);
-}
-EXPORT_SYMBOL(fld_client_create);
-
-int fld_client_delete(struct lu_client_fld *fld, seqno_t seq,
-                      const struct lu_env *env)
-{
-        struct md_fld md_fld = { .mf_seq = seq, .mf_mds = 0 };
-        struct lu_fld_target *target;
-        int rc;
-        ENTRY;
-
-        fld->lcf_flags |= LUSTRE_FLD_RUN;
-        fld_cache_delete(fld->lcf_cache, seq);
-
-        target = fld_client_get_target(fld, seq);
-        LASSERT(target != NULL);
-
-        CDEBUG(D_INFO, "%s: Delete fld entry (seq: "LPX64") on "
-               "target %s (idx "LPU64")\n", fld->lcf_name, seq,
-               fld_target_name(target), target->ft_idx);
-
-#ifdef __KERNEL__
-        if (target->ft_srv != NULL) {
-                LASSERT(env != NULL);
-                rc = fld_server_delete(target->ft_srv,
-                                       env, seq);
-        } else {
-#endif
-                rc = fld_client_rpc(target->ft_exp,
-                                    &md_fld, FLD_DELETE);
-#ifdef __KERNEL__
-        }
-#endif
-
-        RETURN(rc);
-}
-EXPORT_SYMBOL(fld_client_delete);
-
 int fld_client_lookup(struct lu_client_fld *fld,
                       seqno_t seq, mdsno_t *mds,
                       const struct lu_env *env)
 {
-        struct md_fld md_fld = { .mf_seq = seq, .mf_mds = 0 };
+        struct lu_seq_range res;
         struct lu_fld_target *target;
         int rc;
         ENTRY;
 
         fld->lcf_flags |= LUSTRE_FLD_RUN;
 
-        rc = fld_cache_lookup(fld->lcf_cache, seq, mds);
-        if (rc == 0)
+        rc = fld_cache_lookup(fld->lcf_cache, seq, &res);
+        if (rc == 0) {
+                *mds = res.lsr_mdt;
                 RETURN(0);
+        }
 
         /* Can not find it in the cache */
         target = fld_client_get_target(fld, seq);
@@ -596,45 +498,24 @@ int fld_client_lookup(struct lu_client_fld *fld,
                "target %s (idx "LPU64")\n", fld->lcf_name, seq,
                fld_target_name(target), target->ft_idx);
 
+        res.lsr_start = seq;
#ifdef __KERNEL__
         if (target->ft_srv != NULL) {
                 LASSERT(env != NULL);
                 rc = fld_server_lookup(target->ft_srv,
-                                       env, seq, &md_fld.mf_mds);
+                                       env, seq, &res);
         } else {
 #endif
-                /*
-                 * insert the 'inflight' sequence. No need to protect that,
-                 * we are trying to reduce numbers of RPC but not restrict
-                 * to them exactly one
-                 */
-                fld_cache_insert_inflight(fld->lcf_cache, seq);
                 rc = fld_client_rpc(target->ft_exp,
-                                    &md_fld, FLD_LOOKUP);
+                                    &res, FLD_LOOKUP);
 #ifdef __KERNEL__
         }
 #endif
 
-        if (seq < FID_SEQ_START) {
-                /*
-                 * The current solution for IGIF is to bind it to mds0.
-                 * In the future, this should be fixed once IGIF can be found
-                 * in FLD.
-                 */
-                md_fld.mf_mds = 0;
-                rc = 0;
-        }
-
         if (rc == 0) {
-                *mds = md_fld.mf_mds;
+                *mds = res.lsr_mdt;
 
-                /*
-                 * Do not return error here as well. See previous comment in
-                 * same situation in function fld_client_create().
-                 */
-                fld_cache_insert(fld->lcf_cache, seq, *mds);
-        } else {
-                /* remove 'inflight' seq if it exists */
-                fld_cache_delete(fld->lcf_cache, seq);
+                fld_cache_insert(fld->lcf_cache, &res);
         }
         RETURN(rc);
 }
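With extents in the cache, one FLD_LOOKUP answer now covers every sequence in the returned range: the client sends the sequence of interest in lsr_start, the server fills in the whole owning extent, and that extent is cached for later lookups. A rough standalone sketch of that cache-then-RPC shape follows; server_lookup() and its extent arithmetic are invented stand-ins for fld_client_rpc() and the real server, not the actual API.

#include <stdio.h>

struct seq_range { unsigned long long start, end; unsigned int mdt; };

static struct seq_range cached = { 0x400, 0x800, 2 };
static int cache_valid = 1;

static int server_lookup(unsigned long long seq, struct seq_range *out)
{
        /* Pretend the server resolved the whole extent owning 'seq'. */
        out->start = seq & ~0x3ffULL;
        out->end   = out->start + 0x400;
        out->mdt   = 0;
        return 0;
}

/* Shape of the new lookup path: try the cache, fall back to the
 * server, then cache the returned extent for future sequences. */
static int lookup_mdt(unsigned long long seq, unsigned int *mdt)
{
        struct seq_range res;

        if (cache_valid && seq >= cached.start && seq < cached.end) {
                *mdt = cached.mdt;
                return 0;
        }
        if (server_lookup(seq, &res))
                return -1;
        cached = res;              /* one entry now answers 0x400 seqs */
        cache_valid = 1;
        *mdt = res.mdt;
        return 0;
}

int main(void)
{
        unsigned int mdt;

        if (lookup_mdt(0x7ff, &mdt) == 0)
                printf("seq 0x7ff -> mdt %u\n", mdt);
        return 0;
}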
diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h
index 59fcca8..48bbc4b 100644
--- a/lustre/include/lustre/lustre_idl.h
+++ b/lustre/include/lustre/lustre_idl.h
@@ -168,74 +168,85 @@
 #define LUSTRE_LOG_VERSION  0x00050000
 #define LUSTRE_MGS_VERSION  0x00060000
 
-typedef __u64 mdsno_t;
+typedef __u32 mdsno_t;
 typedef __u64 seqno_t;
 
-struct lu_range {
-        __u64 lr_start;
-        __u64 lr_end;
-        /** stub for compact fld work. */
-        __u64 lr_padding;
+/**
+ * Describes a range of sequences: lsr_start is included in the range,
+ * but lsr_end is not.
+ * The same structure is used in the fld module, where the lsr_mdt field
+ * holds the mdt id of the home mdt.
+ */
+struct lu_seq_range {
+        __u64 lsr_start;
+        __u64 lsr_end;
+        __u32 lsr_mdt;
+        __u32 lsr_padding;
 };
 
 /**
  * returns width of given range \a r
  */
-static inline __u64 range_space(const struct lu_range *range)
+static inline __u64 range_space(const struct lu_seq_range *range)
 {
-        return range->lr_end - range->lr_start;
+        return range->lsr_end - range->lsr_start;
 }
 
 /**
  * initialize range to zero
  */
-static inline void range_init(struct lu_range *range)
+static inline void range_init(struct lu_seq_range *range)
 {
-        range->lr_start = range->lr_end = 0;
+        range->lsr_start = range->lsr_end = range->lsr_mdt = 0;
 }
 
 /**
  * check if given seq id \a s is within given range \a r
  */
-static inline int range_within(struct lu_range *range,
+static inline int range_within(const struct lu_seq_range *range,
                                __u64 s)
 {
-        return s >= range->lr_start && s < range->lr_end;
+        return s >= range->lsr_start && s < range->lsr_end;
 }
 
 /**
  * allocate \a w units of sequence from range \a from.
  */
-static inline void range_alloc(struct lu_range *to,
-                               struct lu_range *from,
+static inline void range_alloc(struct lu_seq_range *to,
+                               struct lu_seq_range *from,
                                __u64 width)
 {
-        to->lr_start = from->lr_start;
-        to->lr_end = from->lr_start + width;
-        from->lr_start += width;
+        to->lsr_start = from->lsr_start;
+        to->lsr_end = from->lsr_start + width;
+        from->lsr_start += width;
 }
 
-static inline int range_is_sane(const struct lu_range *range)
+static inline int range_is_sane(const struct lu_seq_range *range)
 {
-        return (range->lr_end >= range->lr_start);
+        return (range->lsr_end >= range->lsr_start);
 }
 
-static inline int range_is_zero(const struct lu_range *range)
+static inline int range_is_zero(const struct lu_seq_range *range)
 {
-        return (range->lr_start == 0 && range->lr_end == 0);
+        return (range->lsr_start == 0 && range->lsr_end == 0);
 }
 
-static inline int range_is_exhausted(const struct lu_range *range)
+static inline int range_is_exhausted(const struct lu_seq_range *range)
 {
         return range_space(range) == 0;
 }
 
-#define DRANGE "[%#16.16"LPF64"x-%#16.16"LPF64"x]"
+#define DRANGE "[%#16.16"LPF64"x-%#16.16"LPF64"x):%x"
 
 #define PRANGE(range)        \
-        (range)->lr_start,   \
-        (range)->lr_end
+        (range)->lsr_start,  \
+        (range)->lsr_end,    \
+        (range)->lsr_mdt
 
 /** \defgroup lu_fid lu_fid
  * @{ */
@@ -443,7 +454,7 @@ static inline int fid_is_zero(const struct lu_fid *fid)
 }
 
 extern void lustre_swab_lu_fid(struct lu_fid *fid);
-extern void lustre_swab_lu_range(struct lu_range *range);
+extern void lustre_swab_lu_seq_range(struct lu_seq_range *range);
 
 static inline int lu_fid_eq(const struct lu_fid *f0,
                             const struct lu_fid *f1)
@@ -1745,13 +1756,6 @@ struct lmv_desc {
 
 extern void lustre_swab_lmv_desc (struct lmv_desc *ld);
 
-struct md_fld {
-        seqno_t mf_seq;
-        mdsno_t mf_mds;
-};
-
-extern void lustre_swab_md_fld (struct md_fld *mf);
-
 enum fld_rpc_opc {
         FLD_QUERY = 600,
         FLD_LAST_OPC,
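Because lsr_end is exclusive, a range carved out by range_alloc() never overlaps what remains in the space; DRANGE prints the interval as [start-end) for the same reason. A quick userspace check of the arithmetic, with fixed-width types replacing __u64 and lsr_mdt dropped for brevity:

#include <assert.h>
#include <stdio.h>

/* Userspace copy of the lu_seq_range allocation arithmetic. */
struct seq_range { unsigned long long start, end; };

static void range_alloc(struct seq_range *to, struct seq_range *from,
                        unsigned long long width)
{
        to->start    = from->start;
        to->end      = from->start + width;
        from->start += width;
}

int main(void)
{
        struct seq_range space = { 0x400, 0x10000 };  /* [0x400, 0x10000) */
        struct seq_range got;

        range_alloc(&got, &space, 0x400);

        /* The carved-out range is half-open: 0x400..0x7ff inclusive. */
        assert(got.start == 0x400 && got.end == 0x800);
        /* The remaining space starts exactly where the grant ended. */
        assert(space.start == got.end);

        printf("allocated [%#llx, %#llx), space now starts at %#llx\n",
               got.start, got.end, space.start);
        return 0;
}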
diff --git a/lustre/include/lustre_fid.h b/lustre/include/lustre_fid.h
index 470feae..7c8085f 100644
--- a/lustre/include/lustre_fid.h
+++ b/lustre/include/lustre_fid.h
@@ -54,8 +54,8 @@ struct lu_site;
 struct lu_context;
 
 /* Whole sequences space range and zero range definitions */
-extern const struct lu_range LUSTRE_SEQ_SPACE_RANGE;
-extern const struct lu_range LUSTRE_SEQ_ZERO_RANGE;
+extern const struct lu_seq_range LUSTRE_SEQ_SPACE_RANGE;
+extern const struct lu_seq_range LUSTRE_SEQ_ZERO_RANGE;
 extern const struct lu_fid LUSTRE_BFL_FID;
 
@@ -63,7 +63,7 @@ enum {
         /*
          * This is how may FIDs may be allocated in one sequence. 16384 for
          * now.
          */
-        LUSTRE_SEQ_MAX_WIDTH = 0x0000000000004000ULL,
+        LUSTRE_SEQ_MAX_WIDTH = 0x0000000000000400ULL,
 
         /*
          * How many sequences may be allocate for meta-sequence (this is 128
@@ -134,7 +134,7 @@ struct lu_client_seq {
          * clients, this contains meta-sequence range. And for servers this
          * contains super-sequence range.
          */
-        struct lu_range         lcs_space;
+        struct lu_seq_range     lcs_space;
 
         /* Seq related proc */
         cfs_proc_dir_entry_t   *lcs_proc_dir;
@@ -164,7 +164,7 @@ struct lu_client_seq {
 /* server sequence manager interface */
 struct lu_server_seq {
         /* Available sequences space */
-        struct lu_range         lss_space;
+        struct lu_seq_range     lss_space;
 
         /*
          * Device for server side seq manager needs (saving sequences to backing
@@ -198,6 +198,11 @@ struct lu_server_seq {
          * LUSTRE_SEQ_SUPER_WIDTH and LUSTRE_SEQ_META_WIDTH.
          */
         __u64                   lss_width;
+
+        /**
+         * Pointer to site object, required to access site fld.
+         */
+        struct md_site         *lss_site;
 };
 
 int seq_query(struct com_thread_info *info);
@@ -207,19 +212,20 @@ int seq_server_init(struct lu_server_seq *seq,
                     struct dt_device *dev,
                     const char *prefix,
                     enum lu_mgr_type type,
+                    struct md_site *ls,
                     const struct lu_env *env);
 
 void seq_server_fini(struct lu_server_seq *seq,
                      const struct lu_env *env);
 
 int seq_server_alloc_super(struct lu_server_seq *seq,
-                           struct lu_range *in,
-                           struct lu_range *out,
+                           struct lu_seq_range *in,
+                           struct lu_seq_range *out,
                            const struct lu_env *env);
 
 int seq_server_alloc_meta(struct lu_server_seq *seq,
-                          struct lu_range *in,
-                          struct lu_range *out,
+                          struct lu_seq_range *in,
+                          struct lu_seq_range *out,
                           const struct lu_env *env);
 
 int seq_server_set_cli(struct lu_server_seq *seq,
@@ -241,7 +247,8 @@ int seq_client_alloc_fid(struct lu_client_seq *seq,
                          struct lu_fid *fid);
 
 /* Fids common stuff */
-int fid_is_local(struct lu_site *site, const struct lu_fid *fid);
+int fid_is_local(const struct lu_env *env,
+                 struct lu_site *site, const struct lu_fid *fid);
 
 /* fid locking */
 
@@ -300,9 +307,32 @@ static inline __u64 fid_flatten(const struct lu_fid *fid)
 #define LUSTRE_SEQ_CTL_NAME "seq_ctl"
 
 /* Range common stuff */
-void range_cpu_to_le(struct lu_range *dst, const struct lu_range *src);
-void range_cpu_to_be(struct lu_range *dst, const struct lu_range *src);
-void range_le_to_cpu(struct lu_range *dst, const struct lu_range *src);
-void range_be_to_cpu(struct lu_range *dst, const struct lu_range *src);
+static inline void range_cpu_to_le(struct lu_seq_range *dst, const struct lu_seq_range *src)
+{
+        dst->lsr_start = cpu_to_le64(src->lsr_start);
+        dst->lsr_end = cpu_to_le64(src->lsr_end);
+        dst->lsr_mdt = cpu_to_le32(src->lsr_mdt);
+}
+
+static inline void range_le_to_cpu(struct lu_seq_range *dst, const struct lu_seq_range *src)
+{
+        dst->lsr_start = le64_to_cpu(src->lsr_start);
+        dst->lsr_end = le64_to_cpu(src->lsr_end);
+        dst->lsr_mdt = le32_to_cpu(src->lsr_mdt);
+}
+
+static inline void range_cpu_to_be(struct lu_seq_range *dst, const struct lu_seq_range *src)
+{
+        dst->lsr_start = cpu_to_be64(src->lsr_start);
+        dst->lsr_end = cpu_to_be64(src->lsr_end);
+        dst->lsr_mdt = cpu_to_be32(src->lsr_mdt);
+}
+
+static inline void range_be_to_cpu(struct lu_seq_range *dst, const struct lu_seq_range *src)
+{
+        dst->lsr_start = be64_to_cpu(src->lsr_start);
+        dst->lsr_end = be64_to_cpu(src->lsr_end);
+        dst->lsr_mdt = be32_to_cpu(src->lsr_mdt);
+}
 
 #endif /* __LINUX_FID_H */
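The conversion helpers above are now inline and field-by-field, so a lu_seq_range can be stored or shipped in a fixed byte order regardless of host endianness. The same pattern in portable userspace form, with glibc's htole64/le64toh standing in for the kernel's cpu_to_le64 family:

#include <assert.h>
#include <endian.h>   /* htole64/le64toh; glibc-specific, for illustration */
#include <stdint.h>

struct seq_range { uint64_t start, end; uint32_t mdt, padding; };

/* Same shape as range_cpu_to_le(): convert field by field so the
 * stored bytes are little-endian on every host. */
static void range_to_disk(struct seq_range *dst, const struct seq_range *src)
{
        dst->start = htole64(src->start);
        dst->end   = htole64(src->end);
        dst->mdt   = htole32(src->mdt);
}

/* Inverse, matching range_le_to_cpu(). */
static void range_from_disk(struct seq_range *dst, const struct seq_range *src)
{
        dst->start = le64toh(src->start);
        dst->end   = le64toh(src->end);
        dst->mdt   = le32toh(src->mdt);
}

int main(void)
{
        struct seq_range cpu = { 0x400, 0x800, 7, 0 }, disk, back;

        range_to_disk(&disk, &cpu);
        range_from_disk(&back, &disk);
        assert(back.start == cpu.start && back.end == cpu.end &&
               back.mdt == cpu.mdt);
        return 0;
}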
diff --git a/lustre/include/lustre_fld.h b/lustre/include/lustre_fld.h
index ec8be4f..ec65b99 100644
--- a/lustre/include/lustre_fld.h
+++ b/lustre/include/lustre_fld.h
@@ -45,17 +45,12 @@
 
 struct lu_client_fld;
 struct lu_server_fld;
+struct lu_fld_hash;
+struct fld_cache;
 
 extern const struct dt_index_features fld_index_features;
 extern const char fld_index_name[];
-
-struct fld_stats {
-        __u64   fst_count;
-        __u64   fst_cache;
-        __u64   fst_inflight;
-};
-
 /*
  * FLD (Fid Location Database) interface.
  */
@@ -64,7 +59,6 @@
 enum {
         LUSTRE_CLI_FLD_HASH_DHT = 0,
         LUSTRE_CLI_FLD_HASH_RRB
 };
 
-struct lu_server_fld;
 
 struct lu_fld_target {
         struct list_head         ft_chain;
         struct obd_export       *ft_exp;
         __u64                    ft_idx;
 };
 
@@ -73,134 +67,101 @@ struct lu_fld_target {
-typedef int
-(*fld_hash_func_t) (struct lu_client_fld *, __u64);
-
-typedef struct lu_fld_target *
-(*fld_scan_func_t) (struct lu_client_fld *, __u64);
-
-struct lu_fld_hash {
-        const char              *fh_name;
-        fld_hash_func_t          fh_hash_func;
-        fld_scan_func_t          fh_scan_func;
-};
-
-struct fld_cache_entry {
-        struct hlist_node        fce_list;
-        struct list_head         fce_lru;
-        mdsno_t                  fce_mds;
-        seqno_t                  fce_seq;
-        cfs_waitq_t              fce_waitq;
-        __u32                    fce_inflight:1,
-                                 fce_invalid:1;
-};
-
-struct fld_cache {
-        /*
-         * Cache guard, protects fci_hash mostly because others immutable after
-         * init is finished.
-         */
-        spinlock_t               fci_lock;
-
-        /* Cache shrink threshold */
-        int                      fci_threshold;
-
-        /* Prefered number of cached entries */
-        int                      fci_cache_size;
-
-        /* Current number of cached entries. Protected by @fci_lock */
-        int                      fci_cache_count;
-
-        /* Hash table size (number of collision lists) */
-        int                      fci_hash_size;
-
-        /* Hash table mask */
-        int                      fci_hash_mask;
-
-        /* Hash table for all collision lists */
-        struct hlist_head       *fci_hash_table;
-
-        /* Lru list */
-        struct list_head         fci_lru;
-
-        /* Cache statistics. */
-        struct fld_stats         fci_stat;
-
-        /* Cache name used for debug and messages. */
-        char                     fci_name[80];
-};
-
 struct lu_server_fld {
-        /* Fld dir proc entry. */
+        /**
+         * Fld dir proc entry. */
         cfs_proc_dir_entry_t    *lsf_proc_dir;
 
-        /* /fld file object device */
+        /**
+         * /fld file object device */
         struct dt_object        *lsf_obj;
 
-        /* Client FLD cache. */
+        /**
+         * super sequence controller export, needed to forward fld
+         * lookup request. */
+        struct obd_export       *lsf_control_exp;
+
+        /**
+         * Client FLD cache. */
         struct fld_cache        *lsf_cache;
 
-        /* Protect index modifications */
-        struct semaphore         lsf_sem;
+        /**
+         * Protect index modifications */
+        struct mutex             lsf_lock;
 
-        /* Fld service name in form "fld-srv-lustre-MDTXXX" */
+        /**
+         * Fld service name in form "fld-srv-lustre-MDTXXX" */
        char                     lsf_name[80];
 };
 
-enum {
-        LUSTRE_FLD_INIT = 1 << 0,
-        LUSTRE_FLD_RUN  = 1 << 1
-};
-
 struct lu_client_fld {
-        /* Client side proc entry. */
+        /**
+         * Client side proc entry. */
         cfs_proc_dir_entry_t    *lcf_proc_dir;
 
-        /* List of exports client FLD knows about. */
+        /**
+         * List of exports client FLD knows about. */
         struct list_head         lcf_targets;
 
-        /* Current hash to be used to chose an export. */
+        /**
+         * Current hash to be used to choose an export. */
         struct lu_fld_hash      *lcf_hash;
 
-        /* Exports count. */
+        /**
+         * Exports count. */
         int                      lcf_count;
 
-        /* Lock protecting exports list and fld_hash. */
+        /**
+         * Lock protecting exports list and fld_hash. */
         spinlock_t               lcf_lock;
 
-        /* Client FLD cache. */
+        /**
+         * Client FLD cache. */
         struct fld_cache        *lcf_cache;
 
-        /* Client fld proc entry name. */
+        /**
+         * Client fld proc entry name. */
         char                     lcf_name[80];
 
         const struct lu_context *lcf_ctx;
-
+
         int                      lcf_flags;
 };
 
+/**
+ * Number of blocks to reserve for particular operations. Should be a
+ * function of ... something. Stub for now.
+ */
+enum {
+        /* one insert operation can involve two delete and one insert */
+        FLD_TXN_INDEX_INSERT_CREDITS  = 60,
+        FLD_TXN_INDEX_DELETE_CREDITS  = 20,
+};
+
 int fld_query(struct com_thread_info *info);
 
 /* Server methods */
 int fld_server_init(struct lu_server_fld *fld,
                     struct dt_device *dt,
                     const char *prefix,
-                    const struct lu_env *env);
+                    const struct lu_env *env,
+                    int mds_node_id);
 
 void fld_server_fini(struct lu_server_fld *fld,
                      const struct lu_env *env);
 
 int fld_server_create(struct lu_server_fld *fld,
                       const struct lu_env *env,
-                      seqno_t seq, mdsno_t mds);
+                      struct lu_seq_range *add_range,
+                      struct thandle *th);
 
 int fld_server_delete(struct lu_server_fld *fld,
                       const struct lu_env *env,
-                      seqno_t seq);
+                      struct lu_seq_range *range);
 
 int fld_server_lookup(struct lu_server_fld *fld,
                       const struct lu_env *env,
-                      seqno_t seq, mdsno_t *mds);
+                      seqno_t seq, struct lu_seq_range *range);
 
 /* Client methods */
 int fld_client_init(struct lu_client_fld *fld,
@@ -215,7 +176,7 @@ int fld_client_lookup(struct lu_client_fld *fld,
                       const struct lu_env *env);
 
 int fld_client_create(struct lu_client_fld *fld,
-                      seqno_t seq, mdsno_t mds,
+                      struct lu_seq_range *range,
                       const struct lu_env *env);
 
 int fld_client_delete(struct lu_client_fld *fld,
@@ -228,27 +189,4 @@ int fld_client_add_target(struct lu_client_fld *fld,
 int fld_client_del_target(struct lu_client_fld *fld,
                           __u64 idx);
 
-/* Cache methods */
-struct fld_cache *fld_cache_init(const char *name,
-                                 int hash_size,
-                                 int cache_size,
-                                 int cache_threshold);
-
-void fld_cache_fini(struct fld_cache *cache);
-
-void fld_cache_flush(struct fld_cache *cache);
-
-int fld_cache_insert(struct fld_cache *cache,
-                     seqno_t seq, mdsno_t mds);
-
-int fld_cache_insert_inflight(struct fld_cache *cache,
-                              seqno_t seq);
-
-void fld_cache_delete(struct fld_cache *cache,
-                      seqno_t seq);
-
-int
-fld_cache_lookup(struct fld_cache *cache,
-                 seqno_t seq, mdsno_t *mds);
-
 #endif
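fld_server_create() now runs inside a caller-supplied thandle, with credits such as FLD_TXN_INDEX_INSERT_CREDITS reserved when the transaction is opened by fld_trans_start(), so the sequence-store write and the FLD index insert commit together. A toy model of that reserve-then-spend pattern follows; struct thandle, trans_start() and index_insert() here are simplified stand-ins, not the dt_device API.

#include <stdio.h>

/* Hypothetical miniature of the thandle pattern: reserve credits when
 * the transaction opens, spend them as updates are made. */
struct thandle { int credits; };

enum { TXN_INDEX_INSERT_CREDITS = 60 };   /* stand-in for FLD_TXN_* */

static struct thandle trans_start(int credits)
{
        struct thandle th = { credits };

        printf("transaction opened with %d credits\n", credits);
        return th;
}

static int index_insert(struct thandle *th, unsigned long long start,
                        unsigned long long end, unsigned int mdt)
{
        if (th->credits < TXN_INDEX_INSERT_CREDITS)
                return -1;                /* would overrun the reservation */
        th->credits -= TXN_INDEX_INSERT_CREDITS;
        printf("insert [%#llx, %#llx) -> mdt %u\n", start, end, mdt);
        return 0;
}

static void trans_stop(struct thandle *th)
{
        printf("transaction closed, %d credits unused\n", th->credits);
}

int main(void)
{
        /* Mirrors the server path: one handle covers both the seq-store
         * update and the FLD index insert, so they land atomically. */
        struct thandle th = trans_start(TXN_INDEX_INSERT_CREDITS + 20);

        index_insert(&th, 0x400, 0x800, 0);
        trans_stop(&th);
        return 0;
}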
diff --git a/lustre/lmv/lmv_fld.c b/lustre/lmv/lmv_fld.c
index c2b9757..8f4f94c 100644
--- a/lustre/lmv/lmv_fld.c
+++ b/lustre/lmv/lmv_fld.c
@@ -75,11 +75,11 @@ int lmv_fld_lookup(struct lmv_obd *lmv,
                 RETURN(rc);
         }
 
-        CDEBUG(D_INODE, "FLD lookup got mds #"LPU64" for fid="DFID"\n",
+        CDEBUG(D_INODE, "FLD lookup got mds #%x for fid="DFID"\n",
                *mds, PFID(fid));
 
         if (*mds >= lmv->desc.ld_tgt_count) {
-                CERROR("FLD lookup got invalid mds #"LPU64" (max: %d) "
+                CERROR("FLD lookup got invalid mds #%x (max: %x) "
                        "for fid="DFID"\n", *mds, lmv->desc.ld_tgt_count,
                        PFID(fid));
                 rc = -EINVAL;
diff --git a/lustre/lmv/lmv_intent.c b/lustre/lmv/lmv_intent.c
index 7e35465..cb1273b 100644
--- a/lustre/lmv/lmv_intent.c
+++ b/lustre/lmv/lmv_intent.c
@@ -768,7 +768,7 @@ int lmv_allocate_slaves(struct obd_device *obd, struct lu_fid *pid,
         }
 
         CDEBUG(D_INODE, "Allocate new fid "DFID" for slave "
-               "obj -> mds #"LPU64"\n", PFID(fid), mds);
+               "obj -> mds #%x\n", PFID(fid), mds);
 
         RETURN(rc);
 }
diff --git a/lustre/lmv/lmv_obd.c b/lustre/lmv/lmv_obd.c
index 004218e..7543a8c 100644
--- a/lustre/lmv/lmv_obd.c
+++ b/lustre/lmv/lmv_obd.c
@@ -978,20 +978,7 @@ int __lmv_fid_alloc(struct lmv_obd *lmv, struct lu_fid *fid,
         rc = obd_fid_alloc(tgt->ltd_exp, fid, NULL);
         if (rc > 0) {
                 LASSERT(fid_is_sane(fid));
-
-                /*
-                 * Client switches to new sequence, setup FLD.
-                 */
-                rc = fld_client_create(&lmv->lmv_fld, fid_seq(fid),
-                                       mds, NULL);
-                if (rc) {
-                        /*
-                         * Delete just allocated fid sequence in case
-                         * of fail back.
-                         */
-                        CERROR("Can't create fld entry, rc %d\n", rc);
-                        obd_fid_delete(tgt->ltd_exp, NULL);
-                }
+                rc = 0;
         }
 
         EXIT;
@@ -1498,7 +1485,7 @@ repeat:
         else if (rc)
                 RETURN(rc);
 
-        CDEBUG(D_INODE, "CREATE '%*s' on "DFID" -> mds #"LPU64"\n",
+        CDEBUG(D_INODE, "CREATE '%*s' on "DFID" -> mds #%x\n",
                op_data->op_namelen, op_data->op_name,
                PFID(&op_data->op_fid1), op_data->op_mds);
 
@@ -1970,7 +1957,7 @@ repeat:
                 RETURN(rc);
         }
 
-        CDEBUG(D_INODE, "Forward to mds #"LPU64" ("DFID")\n",
+        CDEBUG(D_INODE, "Forward to mds #%x ("DFID")\n",
                mds, PFID(&op_data->op_fid1));
 
         op_data->op_fsuid = current->fsuid;
diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c
index 1a5b7f7..c7932bf 100644
--- a/lustre/mdt/mdt_handler.c
+++ b/lustre/mdt/mdt_handler.c
@@ -3373,7 +3373,7 @@ static void mdt_seq_adjust(const struct lu_env *env,
                            struct mdt_device *m, int lost)
 {
         struct md_site *ms = mdt_md_site(m);
-        struct lu_range out;
+        struct lu_seq_range out;
         ENTRY;
 
         LASSERT(ms && ms->ms_server_seq);
@@ -3439,6 +3439,7 @@ static int mdt_seq_init(const struct lu_env *env,
                 rc = seq_server_init(ms->ms_control_seq,
                                      m->mdt_bottom, uuid,
                                      LUSTRE_SEQ_CONTROLLER,
+                                     ms,
                                      env);
 
                 if (rc)
@@ -3480,6 +3481,7 @@ static int mdt_seq_init(const struct lu_env *env,
         rc = seq_server_init(ms->ms_server_seq,
                              m->mdt_bottom, uuid,
                              LUSTRE_SEQ_SERVER,
+                             ms,
                              env);
         if (rc)
                 GOTO(out_seq_fini, rc = -ENOMEM);
@@ -3634,7 +3636,8 @@ static int mdt_fld_init(const struct lu_env *env,
                 RETURN(rc = -ENOMEM);
 
         rc = fld_server_init(ms->ms_server_fld,
-                             m->mdt_bottom, uuid, env);
+                             m->mdt_bottom, uuid,
+                             env, ms->ms_node_id);
         if (rc) {
                 OBD_FREE_PTR(ms->ms_server_fld);
                 ms->ms_server_fld = NULL;
diff --git a/lustre/obdclass/llog_swab.c b/lustre/obdclass/llog_swab.c
index df515ee..b76cca8 100644
--- a/lustre/obdclass/llog_swab.c
+++ b/lustre/obdclass/llog_swab.c
@@ -107,12 +107,13 @@ void lustre_swab_lu_fid(struct lu_fid *fid)
 }
 EXPORT_SYMBOL(lustre_swab_lu_fid);
 
-void lustre_swab_lu_range(struct lu_range *range)
+void lustre_swab_lu_seq_range(struct lu_seq_range *range)
 {
-        __swab64s (&range->lr_start);
-        __swab64s (&range->lr_end);
+        __swab64s (&range->lsr_start);
+        __swab64s (&range->lsr_end);
+        __swab32s (&range->lsr_mdt);
 }
-EXPORT_SYMBOL(lustre_swab_lu_range);
+EXPORT_SYMBOL(lustre_swab_lu_seq_range);
 
 void lustre_swab_llog_rec(struct llog_rec_hdr *rec, struct llog_rec_tail *tail)
 {
diff --git a/lustre/osd/osd_handler.c b/lustre/osd/osd_handler.c
index bd5bb5b..0d4b6be 100644
--- a/lustre/osd/osd_handler.c
+++ b/lustre/osd/osd_handler.c
@@ -3609,7 +3609,7 @@ static int osd_fid_lookup(const struct lu_env *env,
          * fids. Unfortunately it is somewhat expensive (does a
          * cache-lookup). Disabling it for production/acceptance-testing.
          */
-        LASSERT(1 || fid_is_local(ldev->ld_site, fid));
+        LASSERT(1 || fid_is_local(env, ldev->ld_site, fid));
 
         ENTRY;
diff --git a/lustre/ptlrpc/layout.c b/lustre/ptlrpc/layout.c
index e663cea..764957e 100644
--- a/lustre/ptlrpc/layout.c
+++ b/lustre/ptlrpc/layout.c
@@ -656,7 +656,7 @@ EXPORT_SYMBOL(RMF_SEQ_OPC);
 
 const struct req_msg_field RMF_SEQ_RANGE =
         DEFINE_MSGF("seq_query_range", 0,
-                    sizeof(struct lu_range), lustre_swab_lu_range);
+                    sizeof(struct lu_seq_range), lustre_swab_lu_seq_range);
 EXPORT_SYMBOL(RMF_SEQ_RANGE);
 
 const struct req_msg_field RMF_FLD_OPC =
@@ -666,7 +666,7 @@ EXPORT_SYMBOL(RMF_FLD_OPC);
 
 const struct req_msg_field RMF_FLD_MDFLD =
         DEFINE_MSGF("fld_query_mdfld", 0,
-                    sizeof(struct md_fld), lustre_swab_md_fld);
+                    sizeof(struct lu_seq_range), lustre_swab_lu_seq_range);
 EXPORT_SYMBOL(RMF_FLD_MDFLD);
 
 const struct req_msg_field RMF_MDT_BODY =
diff --git a/lustre/ptlrpc/pack_generic.c b/lustre/ptlrpc/pack_generic.c
index 8739098..1cd90f0 100644
--- a/lustre/ptlrpc/pack_generic.c
+++ b/lustre/ptlrpc/pack_generic.c
@@ -1956,12 +1956,6 @@ void lustre_swab_lmv_desc (struct lmv_desc *ld)
         __swab32s (&ld->ld_active_tgt_count);
         /* uuid endian insensitive */
 }
-/*end adding MDT by huanghua@clusterfs.com*/
-void lustre_swab_md_fld (struct md_fld *mf)
-{
-        __swab64s(&mf->mf_seq);
-        __swab64s(&mf->mf_mds);
-}
 
 static void print_lum (struct lov_user_md *lum)
 {
diff --git a/lustre/ptlrpc/ptlrpc_module.c b/lustre/ptlrpc/ptlrpc_module.c
index 03adc07..363f399 100644
--- a/lustre/ptlrpc/ptlrpc_module.c
+++ b/lustre/ptlrpc/ptlrpc_module.c
@@ -298,7 +298,6 @@ EXPORT_SYMBOL(lustre_msg_set_transno);
 EXPORT_SYMBOL(lustre_msg_set_status);
 EXPORT_SYMBOL(lustre_msg_set_conn_cnt);
 EXPORT_SYMBOL(lustre_swab_mgs_target_info);
-EXPORT_SYMBOL(lustre_swab_md_fld);
 EXPORT_SYMBOL(lustre_swab_generic_32s);
 EXPORT_SYMBOL(lustre_swab_lustre_capa);
 EXPORT_SYMBOL(lustre_swab_lustre_capa_key);
diff --git a/lustre/tests/test-framework.sh b/lustre/tests/test-framework.sh
index dd596d6..617f2bc 100644
--- a/lustre/tests/test-framework.sh
+++ b/lustre/tests/test-framework.sh
@@ -235,8 +235,8 @@ load_modules() {
     load_module ptlrpc/ptlrpc
    load_module ptlrpc/gss/ptlrpc_gss
     [ "$USE_QUOTA" = "yes" -a "$LQUOTA" != "no" ] && load_module quota/lquota
-    load_module fid/fid
     load_module fld/fld
+    load_module fid/fid
     load_module lmv/lmv
     load_module mdc/mdc
     load_module osc/osc
diff --git a/lustre/utils/req-layout.c b/lustre/utils/req-layout.c
index 2611093..e5fd0f8 100644
--- a/lustre/utils/req-layout.c
+++ b/lustre/utils/req-layout.c
@@ -50,7 +50,7 @@
 #define __REQ_LAYOUT_USER__ (1)
 
 #define lustre_swab_generic_32s NULL
-#define lustre_swab_lu_range NULL
+#define lustre_swab_lu_seq_range NULL
 #define lustre_swab_md_fld NULL
 #define lustre_swab_mdt_body NULL
 #define lustre_swab_mdt_epoch NULL
diff --git a/lustre/utils/wiretest.c b/lustre/utils/wiretest.c
index 02e5746..f881c82 100644
--- a/lustre/utils/wiretest.c
+++ b/lustre/utils/wiretest.c
@@ -2396,5 +2396,18 @@ void lustre_assert_wire_constants(void)
         LASSERTF((int)sizeof(((xattr_acl_header *)0)->a_entries) == 0, " found %lld\n",
                  (long long)(int)sizeof(((xattr_acl_header *)0)->a_entries));
 #endif
+
+        /* check fid range */
+        LASSERTF((int)sizeof(struct lu_seq_range) == 24, " found %lld\n",
+                 (long long)(int)sizeof(struct lu_seq_range));
+        LASSERTF((int)offsetof(struct lu_seq_range, lsr_start) == 0, " found %lld\n",
+                 (long long)(int)offsetof(struct lu_seq_range, lsr_start));
+        LASSERTF((int)offsetof(struct lu_seq_range, lsr_end) == 8, " found %lld\n",
+                 (long long)(int)offsetof(struct lu_seq_range, lsr_end));
+        LASSERTF((int)offsetof(struct lu_seq_range, lsr_mdt) == 16, " found %lld\n",
+                 (long long)(int)offsetof(struct lu_seq_range, lsr_mdt));
+        LASSERTF((int)offsetof(struct lu_seq_range, lsr_padding) == 20, " found %lld\n",
+                 (long long)(int)offsetof(struct lu_seq_range, lsr_padding));
+
 }
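The new wiretest checks pin struct lu_seq_range to 24 bytes with fixed field offsets; lsr_padding exists precisely so the compiler has no tail padding to choose. The same invariants can be expressed at compile time in a standalone mirror of the struct; the C11 _Static_assert form and the mirror type name below are illustrative, not part of the patch.

#include <stddef.h>
#include <stdint.h>

/* Userspace mirror of the on-wire struct; __u64/__u32 replaced with
 * fixed-width types. */
struct lu_seq_range_mirror {
        uint64_t lsr_start;
        uint64_t lsr_end;
        uint32_t lsr_mdt;
        uint32_t lsr_padding;
};

/* Compile-time equivalents of the wiretest.c LASSERTF checks. */
_Static_assert(sizeof(struct lu_seq_range_mirror) == 24, "size");
_Static_assert(offsetof(struct lu_seq_range_mirror, lsr_start) == 0, "start");
_Static_assert(offsetof(struct lu_seq_range_mirror, lsr_end) == 8, "end");
_Static_assert(offsetof(struct lu_seq_range_mirror, lsr_mdt) == 16, "mdt");
_Static_assert(offsetof(struct lu_seq_range_mirror, lsr_padding) == 20, "padding");

int main(void) { return 0; }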