From: wangdi Date: Tue, 24 Sep 2013 09:59:57 +0000 (-0700) Subject: LU-1445 ofd: Add fid support on OFD X-Git-Tag: 2.3.59~24 X-Git-Url: https://git.whamcloud.com/?a=commitdiff_plain;h=00e814ff75c4da15a769f18736c70ab1e6a5b174;p=fs%2Flustre-release.git LU-1445 ofd: Add fid support on OFD 1. Init fid server related structure in OFD, which will become a sequence metaserver, i.e. MDT will request sequence from OST(OFD), OFD will request super sequence from MDT0 2. OST FID sequence width will be 4 billion, which is different with MDS FID. Change-Id: If861cf5814ea77ce72464525a8de71ac182d742c Signed-off-by: Wang Di Reviewed-on: http://review.whamcloud.com/4326 Tested-by: Hudson Tested-by: Maloo Reviewed-by: Mike Pershin Reviewed-by: Andreas Dilger --- diff --git a/lustre/fid/fid_handler.c b/lustre/fid/fid_handler.c index d4c2611..e36935f 100644 --- a/lustre/fid/fid_handler.c +++ b/lustre/fid/fid_handler.c @@ -80,11 +80,11 @@ int seq_server_set_cli(struct lu_server_seq *seq, GOTO(out_up, rc = 0); } - if (seq->lss_cli != NULL) { - CERROR("%s: Sequence controller is already " - "assigned\n", seq->lss_name); - GOTO(out_up, rc = -EINVAL); - } + if (seq->lss_cli != NULL) { + CDEBUG(D_HA, "%s: Sequence controller is already " + "assigned\n", seq->lss_name); + GOTO(out_up, rc = -EEXIST); + } CDEBUG(D_INFO, "%s: Attached sequence controller %s\n", seq->lss_name, cli->lcs_name); @@ -365,7 +365,7 @@ static int seq_req_handle(struct ptlrpc_request *req, LU_KEY_INIT_FINI(seq, struct seq_thread_info); /* context key: seq_thread_key */ -LU_CONTEXT_KEY_DEFINE(seq, LCT_MD_THREAD); +LU_CONTEXT_KEY_DEFINE(seq, LCT_MD_THREAD | LCT_DT_THREAD); static void seq_thread_info_init(struct ptlrpc_request *req, struct seq_thread_info *info) @@ -557,6 +557,33 @@ void seq_server_fini(struct lu_server_seq *seq, } EXPORT_SYMBOL(seq_server_fini); +int seq_site_fini(const struct lu_env *env, struct seq_server_site *ss) +{ + if (ss == NULL) + RETURN(0); + + if (ss->ss_server_seq) { + seq_server_fini(ss->ss_server_seq, env); + OBD_FREE_PTR(ss->ss_server_seq); + ss->ss_server_seq = NULL; + } + + if (ss->ss_control_seq) { + seq_server_fini(ss->ss_control_seq, env); + OBD_FREE_PTR(ss->ss_control_seq); + ss->ss_control_seq = NULL; + } + + if (ss->ss_client_seq) { + seq_client_fini(ss->ss_client_seq); + OBD_FREE_PTR(ss->ss_client_seq); + ss->ss_client_seq = NULL; + } + + RETURN(0); +} +EXPORT_SYMBOL(seq_site_fini); + cfs_proc_dir_entry_t *seq_type_proc_dir = NULL; static int __init fid_mod_init(void) diff --git a/lustre/fid/fid_request.c b/lustre/fid/fid_request.c index 7bfcc57..413d0f5 100644 --- a/lustre/fid/fid_request.c +++ b/lustre/fid/fid_request.c @@ -278,11 +278,11 @@ int seq_client_get_seq(const struct lu_env *env, CDEBUG(D_INFO, "%s: allocate sequence " "[0x%16.16"LPF64"x]\n", seq->lcs_name, *seqnr); - /*Since the caller require the whole seq, - *so marked this seq to be used*/ - seq->lcs_fid.f_oid = LUSTRE_SEQ_MAX_WIDTH; - seq->lcs_fid.f_seq = *seqnr; - seq->lcs_fid.f_ver = 0; + /*Since the caller require the whole seq, + *so marked this seq to be used*/ + seq->lcs_fid.f_oid = LUSTRE_METADATA_SEQ_MAX_WIDTH; + seq->lcs_fid.f_seq = *seqnr; + seq->lcs_fid.f_ver = 0; /* * Inform caller that sequence switch is performed to allow it @@ -460,36 +460,37 @@ int seq_client_init(struct lu_client_seq *seq, const char *prefix, struct lu_server_seq *srv) { - int rc; - ENTRY; + int rc; + ENTRY; - LASSERT(seq != NULL); - LASSERT(prefix != NULL); + LASSERT(seq != NULL); + LASSERT(prefix != NULL); + + seq->lcs_srv = srv; + seq->lcs_type = type; - seq->lcs_exp = exp; - seq->lcs_srv = srv; - seq->lcs_type = type; mutex_init(&seq->lcs_mutex); - seq->lcs_width = LUSTRE_SEQ_MAX_WIDTH; - cfs_waitq_init(&seq->lcs_waitq); + if (type == LUSTRE_SEQ_METADATA) + seq->lcs_width = LUSTRE_METADATA_SEQ_MAX_WIDTH; + else + seq->lcs_width = LUSTRE_DATA_SEQ_MAX_WIDTH; - /* Make sure that things are clear before work is started. */ - seq_client_flush(seq); + cfs_waitq_init(&seq->lcs_waitq); + /* Make sure that things are clear before work is started. */ + seq_client_flush(seq); - if (exp == NULL) { - LASSERT(seq->lcs_srv != NULL); - } else { - LASSERT(seq->lcs_exp != NULL); - seq->lcs_exp = class_export_get(seq->lcs_exp); - } + if (exp != NULL) + seq->lcs_exp = class_export_get(exp); + else if (type == LUSTRE_SEQ_METADATA) + LASSERT(seq->lcs_srv != NULL); - snprintf(seq->lcs_name, sizeof(seq->lcs_name), - "cli-%s", prefix); + snprintf(seq->lcs_name, sizeof(seq->lcs_name), + "cli-%s", prefix); - rc = seq_client_proc_init(seq); - if (rc) - seq_client_fini(seq); - RETURN(rc); + rc = seq_client_proc_init(seq); + if (rc) + seq_client_fini(seq); + RETURN(rc); } EXPORT_SYMBOL(seq_client_init); diff --git a/lustre/fid/lproc_fid.c b/lustre/fid/lproc_fid.c index 664bc94..3bb44f2 100644 --- a/lustre/fid/lproc_fid.c +++ b/lustre/fid/lproc_fid.c @@ -275,14 +275,14 @@ seq_client_proc_write_width(struct file *file, const char *buffer, RETURN(rc); } - if (val <= LUSTRE_SEQ_MAX_WIDTH && val > 0) { - seq->lcs_width = val; + if (val <= LUSTRE_METADATA_SEQ_MAX_WIDTH && val > 0) { + seq->lcs_width = val; - if (rc == 0) { - CDEBUG(D_INFO, "%s: Sequence size: "LPU64"\n", - seq->lcs_name, seq->lcs_width); - } - } + if (rc == 0) { + CDEBUG(D_INFO, "%s: Sequence size: "LPU64"\n", + seq->lcs_name, seq->lcs_width); + } + } mutex_unlock(&seq->lcs_mutex); diff --git a/lustre/include/lustre_fid.h b/lustre/include/lustre_fid.h index 0275400..99b88be 100644 --- a/lustre/include/lustre_fid.h +++ b/lustre/include/lustre_fid.h @@ -169,26 +169,31 @@ extern const struct lu_fid LU_OBF_FID; extern const struct lu_fid LU_DOT_LUSTRE_FID; enum { - /* - * This is how may FIDs may be allocated in one sequence(128k) - */ - LUSTRE_SEQ_MAX_WIDTH = 0x0000000000020000ULL, + /* + * This is how may metadata FIDs may be allocated in one sequence(128k) + */ + LUSTRE_METADATA_SEQ_MAX_WIDTH = 0x0000000000020000ULL, - /* - * How many sequences to allocate to a client at once. - */ - LUSTRE_SEQ_META_WIDTH = 0x0000000000000001ULL, + /* + * This is how many data FIDs could be allocated in one sequence(4B - 1) + */ + LUSTRE_DATA_SEQ_MAX_WIDTH = 0x00000000FFFFFFFFULL, - /* - * seq allocation pool size. - */ - LUSTRE_SEQ_BATCH_WIDTH = LUSTRE_SEQ_META_WIDTH * 1000, + /* + * How many sequences to allocate to a client at once. + */ + LUSTRE_SEQ_META_WIDTH = 0x0000000000000001ULL, - /* - * This is how many sequences may be in one super-sequence allocated to - * MDTs. - */ - LUSTRE_SEQ_SUPER_WIDTH = ((1ULL << 30ULL) * LUSTRE_SEQ_META_WIDTH) + /* + * seq allocation pool size. + */ + LUSTRE_SEQ_BATCH_WIDTH = LUSTRE_SEQ_META_WIDTH * 1000, + + /* + * This is how many sequences may be in one super-sequence allocated to + * MDTs. + */ + LUSTRE_SEQ_SUPER_WIDTH = ((1ULL << 30ULL) * LUSTRE_SEQ_META_WIDTH) }; enum { @@ -427,6 +432,7 @@ int seq_client_alloc_fid(const struct lu_env *env, struct lu_client_seq *seq, int seq_client_get_seq(const struct lu_env *env, struct lu_client_seq *seq, seqno_t *seqnr); +int seq_site_fini(const struct lu_env *env, struct seq_server_site *ss); /* Fids common stuff */ int fid_is_local(const struct lu_env *env, struct lu_site *site, const struct lu_fid *fid); diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index 53cb6ae..a44945c 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -3753,31 +3753,7 @@ static int mdt_intent_policy(struct ldlm_namespace *ns, static int mdt_seq_fini(const struct lu_env *env, struct mdt_device *m) { - struct seq_server_site *ss = mdt_seq_site(m); - ENTRY; - - if (ss == NULL) - RETURN(0); - - if (ss->ss_server_seq) { - seq_server_fini(ss->ss_server_seq, env); - OBD_FREE_PTR(ss->ss_server_seq); - ss->ss_server_seq = NULL; - } - - if (ss->ss_control_seq) { - seq_server_fini(ss->ss_control_seq, env); - OBD_FREE_PTR(ss->ss_control_seq); - ss->ss_control_seq = NULL; - } - - if (ss->ss_client_seq) { - seq_client_fini(ss->ss_client_seq); - OBD_FREE_PTR(ss->ss_client_seq); - ss->ss_client_seq = NULL; - } - - RETURN(0); + return seq_site_fini(env, mdt_seq_site(m)); } static int mdt_seq_init(const struct lu_env *env, diff --git a/lustre/obdecho/echo_client.c b/lustre/obdecho/echo_client.c index f1b4f9e..6250aa6 100644 --- a/lustre/obdecho/echo_client.c +++ b/lustre/obdecho/echo_client.c @@ -2876,11 +2876,11 @@ echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len, if (cfs_copy_to_user(data->ioc_pbuf1, &seq, data->ioc_plen1)) return -EFAULT; - max_count = LUSTRE_SEQ_MAX_WIDTH; - if (cfs_copy_to_user(data->ioc_pbuf2, &max_count, - data->ioc_plen2)) - return -EFAULT; - GOTO(out, rc); + max_count = LUSTRE_METADATA_SEQ_MAX_WIDTH; + if (cfs_copy_to_user(data->ioc_pbuf2, &max_count, + data->ioc_plen2)) + return -EFAULT; + GOTO(out, rc); } case OBD_IOC_DESTROY: if (!cfs_capable(CFS_CAP_SYS_ADMIN)) diff --git a/lustre/ofd/ofd_dev.c b/lustre/ofd/ofd_dev.c index e8c809f..d12729a 100644 --- a/lustre/ofd/ofd_dev.c +++ b/lustre/ofd/ofd_dev.c @@ -44,6 +44,7 @@ #include #include +#include #include "ofd_internal.h" @@ -479,6 +480,82 @@ static int ofd_procfs_fini(struct ofd_device *ofd) extern int ost_handle(struct ptlrpc_request *req); +int ofd_fid_fini(const struct lu_env *env, struct ofd_device *ofd) +{ + return seq_site_fini(env, &ofd->ofd_seq_site); +} + +int ofd_fid_init(const struct lu_env *env, struct ofd_device *ofd) +{ + struct seq_server_site *ss = &ofd->ofd_seq_site; + struct lu_device *lu = &ofd->ofd_dt_dev.dd_lu_dev; + char *obd_name = ofd_name(ofd); + char *name = NULL; + int rc = 0; + + ss = &ofd->ofd_seq_site; + lu->ld_site->ld_seq_site = ss; + ss->ss_lu = lu->ld_site; + ss->ss_node_id = ofd->ofd_lut.lut_lsd.lsd_osd_index; + + OBD_ALLOC_PTR(ss->ss_server_seq); + if (ss->ss_server_seq == NULL) + GOTO(out_free, rc = -ENOMEM); + + OBD_ALLOC(name, strlen(obd_name) + 10); + if (!name) { + OBD_FREE_PTR(ss->ss_server_seq); + ss->ss_server_seq = NULL; + GOTO(out_free, rc = -ENOMEM); + } + + rc = seq_server_init(ss->ss_server_seq, ofd->ofd_osd, obd_name, + LUSTRE_SEQ_SERVER, ss, env); + if (rc) { + CERROR("%s : seq server init error %d\n", obd_name, rc); + GOTO(out_free, rc); + } + ss->ss_server_seq->lss_space.lsr_index = ss->ss_node_id; + + OBD_ALLOC_PTR(ss->ss_client_seq); + if (ss->ss_client_seq == NULL) + GOTO(out_free, -ENOMEM); + + snprintf(name, strlen(obd_name) + 6, "%p-super", obd_name); + rc = seq_client_init(ss->ss_client_seq, NULL, LUSTRE_SEQ_DATA, + name, NULL); + if (rc) { + CERROR("%s : seq client init error %d\n", obd_name, rc); + GOTO(out_free, rc); + } + OBD_FREE(name, strlen(obd_name) + 10); + name = NULL; + + rc = seq_server_set_cli(ss->ss_server_seq, ss->ss_client_seq, env); + +out_free: + if (rc) { + if (ss->ss_server_seq) { + seq_server_fini(ss->ss_server_seq, env); + OBD_FREE_PTR(ss->ss_server_seq); + ss->ss_server_seq = NULL; + } + + if (ss->ss_client_seq) { + seq_client_fini(ss->ss_client_seq); + OBD_FREE_PTR(ss->ss_client_seq); + ss->ss_client_seq = NULL; + } + + if (name) { + OBD_FREE(name, strlen(obd_name) + 10); + name = NULL; + } + } + + return rc; +} + static int ofd_init0(const struct lu_env *env, struct ofd_device *m, struct lu_device_type *ldt, struct lustre_cfg *cfg) { diff --git a/lustre/ofd/ofd_fs.c b/lustre/ofd/ofd_fs.c index e85ba4d..d7fd08d 100644 --- a/lustre/ofd/ofd_fs.c +++ b/lustre/ofd/ofd_fs.c @@ -189,11 +189,28 @@ int ofd_seq_last_oid_write(const struct lu_env *env, struct ofd_device *ofd, RETURN(rc); } +static void ofd_deregister_seq_exp(struct ofd_device *ofd) +{ + struct seq_server_site *ss = &ofd->ofd_seq_site; + + if (ss->ss_client_seq != NULL) { + lustre_deregister_osp_item(&ss->ss_client_seq->lcs_exp); + ss->ss_client_seq->lcs_exp = NULL; + } +} + void ofd_seqs_fini(const struct lu_env *env, struct ofd_device *ofd) { struct ofd_seq *oseq; struct ofd_seq *tmp; cfs_list_t dispose; + int rc; + + ofd_deregister_seq_exp(ofd); + + rc = ofd_fid_fini(env, ofd); + if (rc != 0) + CERROR("%s: fid fini error: rc = %d\n", ofd_name(ofd), rc); CFS_INIT_LIST_HEAD(&dispose); write_lock(&ofd->ofd_seq_list_lock); @@ -288,13 +305,50 @@ cleanup: return ERR_PTR(rc); } +static int ofd_register_seq_exp(struct ofd_device *ofd) +{ + struct seq_server_site *ss = &ofd->ofd_seq_site; + char *osp_name = NULL; + int rc; + + OBD_ALLOC(osp_name, MAX_OBD_NAME); + if (osp_name == NULL) + GOTO(out_free, rc = -ENOMEM); + + rc = tgt_name2ospname(ofd_name(ofd), osp_name); + if (rc != 0) + GOTO(out_free, rc); + + rc = lustre_register_osp_item(osp_name, &ss->ss_client_seq->lcs_exp, + NULL, NULL); +out_free: + if (osp_name != NULL) + OBD_FREE(osp_name, MAX_OBD_NAME); + + return rc; +} + /* object sequence management */ int ofd_seqs_init(const struct lu_env *env, struct ofd_device *ofd) { + int rc; + + rc = ofd_fid_init(env, ofd); + if (rc != 0) { + CERROR("%s: fid init error: rc = %d\n", ofd_name(ofd), rc); + return rc; + } + + rc = ofd_register_seq_exp(ofd); + if (rc) { + CERROR("%s: Can't init seq exp, rc %d\n", ofd_name(ofd), rc); + return rc; + } + rwlock_init(&ofd->ofd_seq_list_lock); CFS_INIT_LIST_HEAD(&ofd->ofd_seq_list); ofd->ofd_seq_count = 0; - return 0; + return rc; } int ofd_clients_data_init(const struct lu_env *env, struct ofd_device *ofd, diff --git a/lustre/ofd/ofd_internal.h b/lustre/ofd/ofd_internal.h index 128bcfc..d346990 100644 --- a/lustre/ofd/ofd_internal.h +++ b/lustre/ofd/ofd_internal.h @@ -185,6 +185,7 @@ struct ofd_device { /* shall we grant space to clients not * supporting OBD_CONNECT_GRANT_PARAM? */ ofd_grant_compat_disable:1; + struct seq_server_site ofd_seq_site; }; static inline struct ofd_device *ofd_dev(struct lu_device *d) @@ -488,6 +489,12 @@ void ofd_fmd_drop(struct obd_export *exp, struct lu_fid *fid); #define ofd_fmd_drop(exp, fid) do {} while (0) #endif +/* ofd_dev.c */ +int ofd_fid_set_index(const struct lu_env *env, struct ofd_device *ofd, + int index); +int ofd_fid_init(const struct lu_env *env, struct ofd_device *ofd); +int ofd_fid_fini(const struct lu_env *env, struct ofd_device *ofd); + /* ofd_lvb.c */ extern struct ldlm_valblock_ops ofd_lvbo; diff --git a/lustre/ofd/ofd_obd.c b/lustre/ofd/ofd_obd.c index 3b16bc9..b4dd026 100644 --- a/lustre/ofd/ofd_obd.c +++ b/lustre/ofd/ofd_obd.c @@ -1186,8 +1186,11 @@ int ofd_create(const struct lu_env *env, struct obd_export *exp, ofd_obd(ofd)->obd_name); GOTO(out, rc = 0); } - /* only precreate if seq == 0 and o_id is specfied */ - if (!fid_seq_is_mdt(oa->o_seq) || oa->o_id == 0) { + /* only precreate if seq is 0, IDIF or normal and also o_id + * must be specfied */ + if ((!fid_seq_is_mdt(oa->o_seq) && + !fid_seq_is_norm(oa->o_seq) && + !fid_seq_is_idif(oa->o_seq)) || oa->o_id == 0) { diff = 1; /* shouldn't we create this right now? */ } else { diff = oa->o_id - ofd_seq_last_oid(oseq);