From: yury Date: Mon, 12 Jun 2006 16:50:16 +0000 (+0000) Subject: - added working proto of sequences manager with super-, meta- sequence approach.... X-Git-Tag: v1_8_0_110~486^2~1643 X-Git-Url: https://git.whamcloud.com/?a=commitdiff_plain;h=8ec9d1cd5ff3fcaf89ce05023af8c55b9aed214c;p=fs%2Flustre-release.git - added working proto of sequences manager with super-, meta- sequence approach. Should be almost what we wanted to have. The only two things which are not done yet are: * sequence manager state is not saved/read to/from backing store for recovery; * meta-sequence allocation on an MDT may cause super-sequence allocation, that is cascading RPCs are possible. Lustre is mountable and mkdir proto is in working state. --- diff --git a/lustre/cmm/cmm_object.c b/lustre/cmm/cmm_object.c index 52ab3a0..07d608f 100644 --- a/lustre/cmm/cmm_object.c +++ b/lustre/cmm/cmm_object.c @@ -32,6 +32,7 @@ #define DEBUG_SUBSYSTEM S_MDS +#include #include "cmm_internal.h" #include "mdc_internal.h" @@ -40,7 +41,7 @@ static int cmm_fld_lookup(const struct lu_fid *fid) { int rc; /* temporary hack for proto mkdir */ - rc = (unsigned long)fid_seq(fid) / LUSTRE_SEQ_RANGE; + rc = (unsigned long)fid_seq(fid) / LUSTRE_SEQ_SUPER_CHUNK; CWARN("Get MDS %d for sequence: "LPU64"\n", rc, fid_seq(fid)); RETURN(rc); } @@ -620,7 +621,7 @@ static int cmm_fld_lookup(const struct lu_fid *fid) { int rc; /* temporary hack for proto mkdir */ - rc = (unsigned long)fid_seq(fid) / LUSTRE_SEQ_RANGE; + rc = (unsigned long)fid_seq(fid) / LUSTRE_SEQ_SUPER_CHUNK; CWARN("Get MDS %d for sequence: "LPU64"\n", rc, fid_seq(fid)); RETURN(rc); } diff --git a/lustre/fid/Makefile.in b/lustre/fid/Makefile.in index 955d7b5..9afaed1 100644 --- a/lustre/fid/Makefile.in +++ b/lustre/fid/Makefile.in @@ -1,4 +1,4 @@ MODULES := fid -fid-objs := fid_seq.o fid_misc.o +fid-objs := fid_handle.o fid_lib.o @INCLUDE_RULES@ diff --git a/lustre/fid/autoMakefile.am b/lustre/fid/autoMakefile.am index f736429..e84a5a0 100644 --- 
a/lustre/fid/autoMakefile.am +++ b/lustre/fid/autoMakefile.am @@ -3,9 +3,18 @@ # This code is issued under the GNU General Public License. # See the file COPYING in this distribution +if LIBLUSTRE +noinst_LIBRARIES = libfid.a +libfid_a_SOURCES = fid_handle.c fid_lib.c fid_internal.h +libfid_a_CPPFLAGS = $(LLCPPFLAGS) +libfid_a_CFLAGS = $(LLCFLAGS) +endif + if MODULES modulefs_DATA = fid$(KMODEXT) endif +install-data-hook: $(install_data_hook) + MOSTLYCLEANFILES := @MOSTLYCLEANFILES@ -DIST_SOURCES = $(fid-objs:%.o=%.c) +DIST_SOURCES = $(fid-objs:%.o=%.c) fid_internal.h diff --git a/lustre/fid/fid_handle.c b/lustre/fid/fid_handle.c new file mode 100644 index 0000000..e851eba --- /dev/null +++ b/lustre/fid/fid_handle.c @@ -0,0 +1,670 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * lustre/fid/fid_handle.c + * Lustre Sequence Manager + * + * Copyright (c) 2006 Cluster File Systems, Inc. + * Author: Yury Umanets + * + * This file is part of the Lustre file system, http://www.lustre.org + * Lustre is a trademark of Cluster File Systems, Inc. + * + * You may have signed or agreed to another license before downloading + * this software. If so, you are bound by the terms and conditions + * of that agreement, and the following does not apply to you. See the + * LICENSE file included with this distribution for more information. + * + * If you did not agree to a different license, then this copy of Lustre + * is open source software; you can redistribute it and/or modify it + * under the terms of version 2 of the GNU General Public License as + * published by the Free Software Foundation. + * + * In either case, Lustre is distributed in the hope that it will be + * useful, but WITHOUT ANY WARRANTY; without even the implied warranty + * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * license text for more details. 
+ */ + +#ifndef EXPORT_SYMTAB +# define EXPORT_SYMTAB +#endif +#define DEBUG_SUBSYSTEM S_FID + +#ifdef __KERNEL__ +# include +# include +#else /* __KERNEL__ */ +# include +#endif + +#include +#include +#include +#include +#include +#include +#include "fid_internal.h" + +/* client seq mgr interface */ +static int +seq_client_alloc_common(struct lu_client_seq *seq, + struct lu_range *seq_ran, + __u32 seq_op) +{ + __u32 *op; + struct lu_range *range; + struct ptlrpc_request *req; + int ran_size = sizeof(*range); + int rc, size[] = {sizeof(*op), ran_size}; + int repsize[] = {ran_size}; + ENTRY; + + req = ptlrpc_prep_req(class_exp2cliimp(seq->seq_exp), + LUSTRE_MDS_VERSION, SEQ_QUERY, + 2, size, NULL); + if (req == NULL) + RETURN(-ENOMEM); + + op = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*op)); + *op = seq_op; + + range = lustre_msg_buf(req->rq_reqmsg, 1, ran_size); + *range = *seq_ran; + + req->rq_replen = lustre_msg_size(1, repsize); + req->rq_request_portal = MDS_SEQ_PORTAL; + rc = ptlrpc_queue_wait(req); + if (rc) + GOTO(out_req, rc); + + range = lustre_swab_repbuf(req, 0, sizeof(*range), + lustre_swab_lu_range); + + LASSERT(range != NULL); + *seq_ran = *range; +out_req: + ptlrpc_req_finished(req); + RETURN(rc); +} + +/* request sequence-controller node to allocate new super-sequence. */ +int +seq_client_alloc_super(struct lu_client_seq *seq) +{ + int rc; + ENTRY; + + LASSERT(seq->seq_flags & LUSTRE_CLI_SEQ_SERVER); + rc = seq_client_alloc_common(seq, &seq->seq_cl_range, + SEQ_ALLOC_SUPER); + if (rc == 0) { + CWARN("SEQ-MGR(cli): allocated super-sequence " + "["LPX64"-"LPX64"]\n", seq->seq_cl_range.lr_start, + seq->seq_cl_range.lr_end); + } + RETURN(rc); +} +EXPORT_SYMBOL(seq_client_alloc_super); + +/* request sequence-controller node to allocate new meta-sequence. 
*/ +int +seq_client_alloc_meta(struct lu_client_seq *seq) +{ + int rc; + ENTRY; + + LASSERT(seq->seq_flags & LUSTRE_CLI_SEQ_CLIENT); + rc = seq_client_alloc_common(seq, &seq->seq_cl_range, + SEQ_ALLOC_META); + if (rc == 0) { + CWARN("SEQ-MGR(cli): allocated meta-sequence " + "["LPX64"-"LPX64"]\n", seq->seq_cl_range.lr_start, + seq->seq_cl_range.lr_end); + } + RETURN(rc); +} +EXPORT_SYMBOL(seq_client_alloc_meta); + +/* allocate new sequence for client (llite or MDC are expected to use this) */ +int +seq_client_alloc_seq(struct lu_client_seq *seq, __u64 *seqnr) +{ + int rc; + ENTRY; + + down(&seq->seq_sem); + + LASSERT(seq->seq_flags & LUSTRE_CLI_SEQ_CLIENT); + LASSERT(range_is_sane(&seq->seq_cl_range)); + + /* if we still have free sequences in meta-sequence we allocate new seq + * from given range. */ + if (seq->seq_cl_range.lr_end > seq->seq_cl_range.lr_start) { + *seqnr = seq->seq_cl_range.lr_start; + seq->seq_cl_range.lr_start += 1; + rc = 0; + } else { + /* meta-sequence is exhausted, request MDT to allocate new + * meta-sequence for us. 
*/ + rc = seq_client_alloc_meta(seq); + if (rc) { + CERROR("can't allocate new meta-sequence, " + "rc %d\n", rc); + } + + *seqnr = seq->seq_cl_range.lr_start; + seq->seq_cl_range.lr_start += 1; + } + up(&seq->seq_sem); + + if (rc == 0) { + CWARN("SEQ-MGR(cli): allocated sequence " + "["LPX64"]\n", *seqnr); + } + RETURN(rc); +} +EXPORT_SYMBOL(seq_client_alloc_seq); + +int +seq_client_alloc_fid(struct lu_client_seq *seq, struct lu_fid *fid) +{ + int rc; + ENTRY; + + LASSERT(fid != NULL); + LASSERT(fid_is_sane(&seq->seq_fid)); + LASSERT(seq->seq_flags & LUSTRE_CLI_SEQ_CLIENT); + + down(&seq->seq_sem); + if (fid_oid(&seq->seq_fid) < LUSTRE_SEQ_WIDTH) { + *fid = seq->seq_fid; + seq->seq_fid.f_oid += 1; + rc = 0; + } else { + __u64 seqnr = 0; + + rc = seq_client_alloc_seq(seq, &seqnr); + if (rc) { + CERROR("can't allocate new sequence, " + "rc %d\n", rc); + GOTO(out, rc); + } else { + seq->seq_fid.f_oid = LUSTRE_FID_INIT_OID; + seq->seq_fid.f_seq = seqnr; + seq->seq_fid.f_ver = 0; + + *fid = seq->seq_fid; + seq->seq_fid.f_oid += 1; + } + } + LASSERT(fid_is_sane(fid)); + + CWARN("SEQ-MGR(cli): allocated FID "DFID3"\n", + PFID3(fid)); + + EXIT; +out: + up(&seq->seq_sem); + return rc; +} +EXPORT_SYMBOL(seq_client_alloc_fid); + +int +seq_client_init(struct lu_client_seq *seq, + struct obd_export *exp, + int flags) +{ + int rc; + ENTRY; + + LASSERT(flags & (LUSTRE_CLI_SEQ_CLIENT | + LUSTRE_CLI_SEQ_SERVER)); + + seq->seq_flags = flags; + fid_zero(&seq->seq_fid); + sema_init(&seq->seq_sem, 1); + + seq->seq_cl_range.lr_end = 0; + seq->seq_cl_range.lr_start = 0; + + if (exp != NULL) + seq->seq_exp = class_export_get(exp); + + if (seq->seq_flags & LUSTRE_CLI_SEQ_CLIENT) { + __u64 seqnr = 0; + + /* client (llite or MDC) init case, we need new sequence from + * MDT. This will allocate new meta-sequemce first, because seq + * range in init state and looks the same as exhausted. 
*/ + rc = seq_client_alloc_seq(seq, &seqnr); + if (rc) { + CERROR("can't allocate new sequence, rc %d\n", rc); + GOTO(out, rc); + } else { + seq->seq_fid.f_oid = LUSTRE_FID_INIT_OID; + seq->seq_fid.f_seq = seqnr; + seq->seq_fid.f_ver = 0; + } + + LASSERT(fid_is_sane(&seq->seq_fid)); + } else { + /* check if this is controller node is trying to init client. */ + if (seq->seq_exp) { + /* MDT uses client seq manager to talk to sequence + * controller, and thus, we need super-sequence. */ + rc = seq_client_alloc_super(seq); + } else { + rc = 0; + } + } + + EXIT; +out: + if (rc) + seq_client_fini(seq); + else + CWARN("Client Sequence Manager initialized\n"); + return rc; +} +EXPORT_SYMBOL(seq_client_init); + +void seq_client_fini(struct lu_client_seq *seq) +{ + ENTRY; + if (seq->seq_exp != NULL) { + class_export_put(seq->seq_exp); + seq->seq_exp = NULL; + } + CWARN("Client Sequence Manager finalized\n"); + EXIT; +} +EXPORT_SYMBOL(seq_client_fini); + +#ifdef __KERNEL__ +/* server side seq mgr stuff */ +static const struct lu_range LUSTRE_SEQ_SUPER_INIT = { + LUSTRE_SEQ_SPACE_START, + LUSTRE_SEQ_SPACE_LIMIT +}; + +static const struct lu_range LUSTRE_SEQ_META_INIT = { + 0, + 0 +}; + +static int +seq_server_write_state(struct lu_server_seq *seq, + const struct lu_context *ctx) +{ + int rc = 0; + ENTRY; + + /* XXX: here should be calling struct dt_device methods to write + * sequence state to backing store. */ + + RETURN(rc); +} + +static int +seq_server_read_state(struct lu_server_seq *seq, + const struct lu_context *ctx) +{ + int rc = -ENODATA; + ENTRY; + + /* XXX: here should be calling struct dt_device methods to read the + * sequence state from backing store. */ + + RETURN(rc); +} + +static int +seq_server_alloc_super(struct lu_server_seq *seq, + struct lu_range *range) +{ + struct lu_range *ss_range = &seq->seq_ss_range; + int rc; + ENTRY; + + if (ss_range->lr_end - ss_range->lr_start < LUSTRE_SEQ_SUPER_CHUNK) { + CWARN("super-sequence is going to exhauste soon. 
" + "Only can allocate "LPU64" sequences\n", + ss_range->lr_end - ss_range->lr_start); + *range = *ss_range; + ss_range->lr_start = ss_range->lr_end; + rc = 0; + } else if (ss_range->lr_start >= ss_range->lr_end) { + CERROR("super-sequence is exhausted\n"); + rc = -ENOSPC; + } else { + range->lr_start = ss_range->lr_start; + ss_range->lr_start += LUSTRE_SEQ_SUPER_CHUNK; + range->lr_end = ss_range->lr_start; + rc = 0; + } + + if (rc == 0) { + CWARN("SEQ-MGR(srv): allocated super-sequence " + "["LPX64"-"LPX64"]\n", range->lr_start, + range->lr_end); + } + + RETURN(rc); +} + +static int +seq_server_alloc_meta(struct lu_server_seq *seq, + struct lu_range *range) +{ + struct lu_range *ms_range = &seq->seq_ms_range; + int rc; + ENTRY; + + LASSERT(range_is_sane(ms_range)); + + /* XXX: here should avoid cascading RPCs using kind of async + * preallocation when meta-sequence is close to exhausting. */ + if (ms_range->lr_start == ms_range->lr_end) { + if (seq->seq_flags & LUSTRE_SRV_SEQ_CONTROLLER) { + /* allocate new range of meta-sequences to allocate new + * meta-sequence from it. */ + rc = seq_server_alloc_super(seq, ms_range); + } else { + /* request controller to allocate new super-sequence for + * us.*/ + rc = seq_client_alloc_super(seq->seq_cli); + if (rc) { + CERROR("can't allocate new super-sequence, " + "rc %d\n", rc); + RETURN(rc); + } + + /* saving new range into allocation space. 
*/ + *ms_range = seq->seq_cli->seq_cl_range; + } + + LASSERT(ms_range->lr_start != 0); + LASSERT(ms_range->lr_end > ms_range->lr_start); + } else { + rc = 0; + } + range->lr_start = ms_range->lr_start; + ms_range->lr_start += LUSTRE_SEQ_META_CHUNK; + range->lr_end = ms_range->lr_start; + + if (rc == 0) { + CWARN("SEQ-MGR(srv): allocated meta-sequence " + "["LPX64"-"LPX64"]\n", range->lr_start, + range->lr_end); + } + + RETURN(rc); +} + +static int +seq_server_handle(struct lu_server_seq *seq, + const struct lu_context *ctx, + struct lu_range *range, + __u32 opc) +{ + int rc; + ENTRY; + + down(&seq->seq_sem); + + switch (opc) { + case SEQ_ALLOC_SUPER: + rc = seq_server_alloc_super(seq, range); + break; + case SEQ_ALLOC_META: + rc = seq_server_alloc_meta(seq, range); + break; + default: + rc = -EINVAL; + break; + } + + if (rc) + GOTO(out, rc); + + rc = seq_server_write_state(seq, ctx); + if (rc) { + CERROR("can't save state, rc = %d\n", + rc); + } + + EXIT; +out: + up(&seq->seq_sem); + return rc; +} + +static int +seq_req_handle0(const struct lu_context *ctx, + struct lu_server_seq *seq, + struct ptlrpc_request *req) +{ + struct lu_range *in; + struct lu_range *out; + int size = sizeof(*in); + __u32 *opt; + int rc; + ENTRY; + + rc = lustre_pack_reply(req, 1, &size, NULL); + if (rc) + RETURN(rc); + + rc = -EPROTO; + opt = lustre_swab_reqbuf(req, 0, sizeof(*opt), + lustre_swab_generic_32s); + if (opt != NULL) { + in = lustre_swab_reqbuf(req, 1, sizeof(*in), + lustre_swab_lu_range); + if (in != NULL) { + out = lustre_msg_buf(req->rq_repmsg, + 0, sizeof(*out)); + LASSERT(out != NULL); + + *out = *in; + rc = seq_server_handle(seq, ctx, out, *opt); + } else { + CERROR("Cannot unpack seq range\n"); + } + } else { + CERROR("Cannot unpack option\n"); + } + RETURN(rc); +} + +static int +seq_req_handle(struct ptlrpc_request *req) +{ + int fail = OBD_FAIL_SEQ_ALL_REPLY_NET; + const struct lu_context *ctx; + struct lu_site *site; + int rc = -EPROTO; + ENTRY; + + 
OBD_FAIL_RETURN(OBD_FAIL_SEQ_ALL_REPLY_NET | OBD_FAIL_ONCE, 0); + + ctx = req->rq_svc_thread->t_ctx; + LASSERT(ctx != NULL); + LASSERT(ctx->lc_thread == req->rq_svc_thread); + if (req->rq_reqmsg->opc == SEQ_QUERY) { + if (req->rq_export != NULL) { + struct obd_device *obd; + + obd = req->rq_export->exp_obd; + site = obd->obd_lu_dev->ld_site; + LASSERT(site != NULL); + + rc = seq_req_handle0(ctx, site->ls_server_seq, req); + } else { + CERROR("Unconnected request\n"); + req->rq_status = -ENOTCONN; + GOTO(out, rc = -ENOTCONN); + } + } else { + CERROR("Wrong opcode: %d\n", + req->rq_reqmsg->opc); + req->rq_status = -ENOTSUPP; + rc = ptlrpc_error(req); + RETURN(rc); + } + + EXIT; +out: + target_send_reply(req, rc, fail); + return 0; +} + +int +seq_server_init(struct lu_server_seq *seq, + struct lu_client_seq *cli, + const struct lu_context *ctx, + struct dt_device *dev, + int flags) +{ + int rc; + ENTRY; + + struct ptlrpc_service_conf seq_conf = { + .psc_nbufs = MDS_NBUFS, + .psc_bufsize = MDS_BUFSIZE, + .psc_max_req_size = MDS_MAXREQSIZE, + .psc_max_reply_size = MDS_MAXREPSIZE, + .psc_req_portal = MDS_SEQ_PORTAL, + .psc_rep_portal = MDC_REPLY_PORTAL, + .psc_watchdog_timeout = SEQ_SERVICE_WATCHDOG_TIMEOUT, + .psc_num_threads = SEQ_NUM_THREADS + }; + + LASSERT(dev != NULL); + LASSERT(cli != NULL); + + LASSERT(flags & (LUSTRE_SRV_SEQ_CONTROLLER | + LUSTRE_SRV_SEQ_REGULAR)); + + seq->seq_dev = dev; + seq->seq_cli = cli; + seq->seq_flags = flags; + sema_init(&seq->seq_sem, 1); + + lu_device_get(&seq->seq_dev->dd_lu_dev); + + /* request backing store for saved sequence info */ + rc = seq_server_read_state(seq, ctx); + if (rc == -ENODATA) { + /* first run, no state on disk, init all seqs */ + if (seq->seq_flags & LUSTRE_SRV_SEQ_CONTROLLER) { + /* init super seq by start values on sequence-controller + * node.*/ + seq->seq_ss_range = LUSTRE_SEQ_SUPER_INIT; + } else { + /* take super-seq from client seq mgr */ + LASSERT(range_is_sane(&cli->seq_cl_range)); + seq->seq_ss_range = 
cli->seq_cl_range; + } + + /* init meta-sequence by start values and get ready for + * allocating it for clients. */ + seq->seq_ms_range = LUSTRE_SEQ_META_INIT; + + /* save init seq to backing store. */ + rc = seq_server_write_state(seq, ctx); + if (rc) { + CERROR("can't write sequence state, " + "rc = %d\n", rc); + GOTO(out, rc); + } + } else if (rc) { + CERROR("can't read sequence state, rc = %d\n", + rc); + GOTO(out, rc); + } + + seq->seq_service = ptlrpc_init_svc_conf(&seq_conf, + seq_req_handle, + LUSTRE_SEQ0_NAME, + seq->seq_proc_entry, + NULL); + if (seq->seq_service != NULL) + rc = ptlrpc_start_threads(NULL, seq->seq_service, + LUSTRE_SEQ0_NAME); + else + rc = -ENOMEM; + + EXIT; + +out: + if (rc) + seq_server_fini(seq, ctx); + else + CWARN("Server Sequence Manager initialized\n"); + return rc; +} +EXPORT_SYMBOL(seq_server_init); + +void +seq_server_fini(struct lu_server_seq *seq, + const struct lu_context *ctx) +{ + int rc; + + if (seq->seq_service != NULL) { + ptlrpc_unregister_service(seq->seq_service); + seq->seq_service = NULL; + } + + if (seq->seq_dev != NULL) { + rc = seq_server_write_state(seq, ctx); + if (rc) { + CERROR("can't save sequence state, " + "rc = %d\n", rc); + } + lu_device_put(&seq->seq_dev->dd_lu_dev); + seq->seq_dev = NULL; + } + + CWARN("Server Sequence Manager finalized\n"); +} +EXPORT_SYMBOL(seq_server_fini); + +static int fid_init(void) +{ + ENTRY; + CWARN("Lustre Sequence Manager\n"); + RETURN(0); +} + +static int fid_fini(void) +{ + ENTRY; + RETURN(0); +} + +static int +__init fid_mod_init(void) + +{ + /* init caches if any */ + fid_init(); + return 0; +} + +static void +__exit fid_mod_exit(void) +{ + /* free caches if any */ + fid_fini(); + return; +} + +MODULE_AUTHOR("Cluster File Systems, Inc. 
"); +MODULE_DESCRIPTION("Lustre FID Module"); +MODULE_LICENSE("GPL"); + +cfs_module(fid, "0.0.3", fid_mod_init, fid_mod_exit); +#endif diff --git a/lustre/fid/fid_internal.h b/lustre/fid/fid_internal.h new file mode 100644 index 0000000..1d99fe4 --- /dev/null +++ b/lustre/fid/fid_internal.h @@ -0,0 +1,31 @@ +/* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * fid/fid_internal.h + * + * Copyright (C) 2006 Cluster File Systems, Inc. + * + * This file is part of the Lustre file system, http://www.lustre.org + * Lustre is a trademark of Cluster File Systems, Inc. + * + * You may have signed or agreed to another license before downloading + * this software. If so, you are bound by the terms and conditions + * of that agreement, and the following does not apply to you. See the + * LICENSE file included with this distribution for more information. + * + * If you did not agree to a different license, then this copy of Lustre + * is open source software; you can redistribute it and/or modify it + * under the terms of version 2 of the GNU General Public License as + * published by the Free Software Foundation. + * + * In either case, Lustre is distributed in the hope that it will be + * useful, but WITHOUT ANY WARRANTY; without even the implied warranty + * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * license text for more details. + */ +#ifndef _FID_INTERNAL_H +#define _FID_INTERNAL_H + +#define SEQ_SERVICE_WATCHDOG_TIMEOUT (obd_timeout * 1000) + +#endif diff --git a/lustre/fid/fid_misc.c b/lustre/fid/fid_lib.c similarity index 87% rename from lustre/fid/fid_misc.c rename to lustre/fid/fid_lib.c index 0488b30..14bad9e 100644 --- a/lustre/fid/fid_misc.c +++ b/lustre/fid/fid_lib.c @@ -1,7 +1,7 @@ /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- * vim:expandtab:shiftwidth=8:tabstop=8: * - * lustre/fid/fid_misc.c + * lustre/fid/fid_lib.c * Miscellaneous fid functions. 
* * Copyright (c) 2006 Cluster File Systems, Inc. @@ -26,10 +26,18 @@ * license text for more details. */ -#include -#include +#ifndef EXPORT_SYMTAB +# define EXPORT_SYMTAB +#endif +#define DEBUG_SUBSYSTEM S_FID + +#ifdef __KERNEL__ +# include +# include +#else /* __KERNEL__ */ +# include +#endif -#include #include void fid_to_le(struct lu_fid *dst, const struct lu_fid *src) diff --git a/lustre/fid/fid_seq.c b/lustre/fid/fid_seq.c deleted file mode 100644 index 1b6f5d2..0000000 --- a/lustre/fid/fid_seq.c +++ /dev/null @@ -1,144 +0,0 @@ -/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- - * vim:expandtab:shiftwidth=8:tabstop=8: - * - * lustre/fid/fid_seq.c - * Lustre File Id (fid) - * - * Copyright (c) 2006 Cluster File Systems, Inc. - * Author: Yury Umanets - * - * This file is part of the Lustre file system, http://www.lustre.org - * Lustre is a trademark of Cluster File Systems, Inc. - * - * You may have signed or agreed to another license before downloading - * this software. If so, you are bound by the terms and conditions - * of that agreement, and the following does not apply to you. See the - * LICENSE file included with this distribution for more information. - * - * If you did not agree to a different license, then this copy of Lustre - * is open source software; you can redistribute it and/or modify it - * under the terms of version 2 of the GNU General Public License as - * published by the Free Software Foundation. - * - * In either case, Lustre is distributed in the hope that it will be - * useful, but WITHOUT ANY WARRANTY; without even the implied warranty - * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * license text for more details. 
- */ - -#include - -#include -#include -#include - -/* sequence manager initialization/finalization stuff */ -struct lu_seq_mgr *seq_mgr_init(struct lu_seq_mgr_ops *ops, - void *opaque) -{ - struct lu_seq_mgr *mgr; - ENTRY; - - OBD_ALLOC_PTR(mgr); - if (!mgr) - RETURN(NULL); - - sema_init(&mgr->m_seq_sem, 1); - mgr->m_opaque = opaque; - mgr->m_ops = ops; - - RETURN(mgr); -} -EXPORT_SYMBOL(seq_mgr_init); - -void seq_mgr_fini(struct lu_seq_mgr *mgr) -{ - OBD_FREE_PTR(mgr); -} -EXPORT_SYMBOL(seq_mgr_fini); - -int seq_mgr_write(const struct lu_context *ctx, struct lu_seq_mgr *mgr) -{ - ENTRY; - RETURN(mgr->m_ops->smo_write(ctx, mgr->m_opaque, &mgr->m_seq)); -} -EXPORT_SYMBOL(seq_mgr_write); - -int seq_mgr_read(const struct lu_context *ctx, struct lu_seq_mgr *mgr) -{ - ENTRY; - RETURN(mgr->m_ops->smo_read(ctx, mgr->m_opaque, &mgr->m_seq)); -} -EXPORT_SYMBOL(seq_mgr_read); - -/* manager functionality stuff */ -int seq_mgr_alloc(const struct lu_context *ctx, - struct lu_seq_mgr *mgr, - __u64 *seq) -{ - int rc; - ENTRY; - - LASSERT(mgr != NULL); - LASSERT(seq != NULL); - - down(&mgr->m_seq_sem); - if (mgr->m_seq > mgr->m_seq_last) { - /* new range of seqs should be got from master */ - rc = -EOPNOTSUPP; - } else { - *seq = mgr->m_seq; - mgr->m_seq++; - - rc = seq_mgr_write(ctx, mgr); - } - up(&mgr->m_seq_sem); - RETURN(rc); -} -EXPORT_SYMBOL(seq_mgr_alloc); - -/* initialize meta-sequence. First of all try to get it from lower layer, - * falling down to back store one. In the case this is first run and there is - * not meta-sequence initialized yet - store it to backstore. 
*/ -int seq_mgr_setup(const struct lu_context *ctx, struct lu_seq_mgr *mgr) -{ - int rc = 0; - ENTRY; - - /* set seq range */ - mgr->m_seq_last = mgr->m_seq + LUSTRE_SEQ_RANGE; - - /* allocate next seq after root one */ - mgr->m_seq += LUSTRE_ROOT_FID_SEQ + 1; - - rc = seq_mgr_read(ctx, mgr); - if (rc == -ENODATA) { - CWARN("initialize sequence by defaut ["LPU64"]\n", mgr->m_seq); - - /* initialize new sequence config as it is not yet created. */ - rc = seq_mgr_write(ctx, mgr); - } - - EXIT; - if (rc == 0) - CWARN("using start sequence: ["LPU64"]\n", mgr->m_seq); - return rc; -} -EXPORT_SYMBOL(seq_mgr_setup); - -static int __init fid_mod_init(void) -{ - /* some stuff will be here (cache initializing, etc.) */ - return 0; -} - -static void __exit fid_mod_exit(void) -{ - /* some stuff will be here */ -} - -MODULE_AUTHOR("Cluster File Systems, Inc. "); -MODULE_DESCRIPTION("Lustre FID Module"); -MODULE_LICENSE("GPL"); - -cfs_module(fid, "0.0.2", fid_mod_init, fid_mod_exit); diff --git a/lustre/fld/fld_internal.h b/lustre/fld/fld_internal.h index 3c3f71c..6822d91 100644 --- a/lustre/fld/fld_internal.h +++ b/lustre/fld/fld_internal.h @@ -45,12 +45,14 @@ struct fld_cache_info { spinlock_t fld_lock; int fld_hash_mask; }; + /*XXX use linked list temp for fld in this prototype*/ struct fld_list { struct list_head fld_list; spinlock_t fld_lock; }; -struct fld_item{ + +struct fld_item { struct list_head fld_list; __u64 fld_seq; __u64 fld_mds; diff --git a/lustre/include/lu_object.h b/lustre/include/lu_object.h index 2c9172c..5c0eaf6 100644 --- a/lustre/include/lu_object.h +++ b/lustre/include/lu_object.h @@ -470,6 +470,19 @@ struct lu_site { */ struct lu_fld *ls_fld; + /* + * Server Seq Manager + */ + struct lu_server_seq *ls_server_seq; + + /* + * Client Seq Manager + */ + struct lu_client_seq *ls_client_seq; + + /* sequence controller node */ + struct obd_export *ls_controller; + /* statistical counters. Protected by nothing, races are accepted. 
*/ struct { __u32 s_created; diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h index f64b7fb..a79965a 100644 --- a/lustre/include/lustre/lustre_idl.h +++ b/lustre/include/lustre/lustre_idl.h @@ -98,6 +98,7 @@ #define MGS_REPLY_PORTAL 27 #define OST_REQUEST_PORTAL 28 #define MDS_FLD_PORTAL 29 +#define MDS_SEQ_PORTAL 30 #define SVC_KILLED 1 #define SVC_EVENT 2 @@ -123,6 +124,18 @@ #define LUSTRE_LOG_VERSION 0x00050000 #define LUSTRE_MGS_VERSION 0x00060000 +struct lu_range { + __u64 lr_start; + __u64 lr_end; +}; + +static inline int range_is_sane(struct lu_range *r) +{ + if (r->lr_end >= r->lr_start) + return 1; + return 0; +} + struct lu_fid { __u64 f_seq; /* holds fid sequence. Lustre should support 2 ^ 64 * objects, thus even if one sequence has one object we @@ -138,12 +151,6 @@ enum { LUSTRE_ROOT_FID_SEQ = 1ULL, /* XXX: should go into mkfs. */ LUSTRE_ROOT_FID_OID = 2UL, /* XXX: should go into mkfs. */ - /* maximal objects in sequence */ - LUSTRE_FID_SEQ_WIDTH = 10000, - - /* range of seqs for one MDS */ - LUSTRE_SEQ_RANGE = 1000, - /* initial fid id value */ LUSTRE_FID_INIT_OID = 1UL }; @@ -171,6 +178,11 @@ static inline int fid_seq_is_sane(__u64 seq) return seq != 0; } +static inline void fid_zero(struct lu_fid *fid) +{ + memset(fid, 0, sizeof(*fid)); +} + static inline int fid_is_sane(const struct lu_fid *fid) { return @@ -185,7 +197,8 @@ static inline int fid_is_sane(const struct lu_fid *fid) fid_oid(fid), \ fid_ver(fid) -extern void lustre_swab_lu_fid (struct lu_fid *fid); +extern void lustre_swab_lu_fid(struct lu_fid *fid); +extern void lustre_swab_lu_range(struct lu_range *range); static inline int lu_fid_eq(const struct lu_fid *f0, const struct lu_fid *f1) @@ -372,11 +385,11 @@ struct obd_connect_data { __u32 ocd_index; /* LOV index to connect to */ __u32 ocd_unused; __u64 ocd_ibits_known; /* inode bits this client understands */ - __u64 ocd_seq; /* sequence info for client */ __u64 padding2; /* also fix 
lustre_swab_connect */ __u64 padding3; /* also fix lustre_swab_connect */ __u64 padding4; /* also fix lustre_swab_connect */ __u64 padding5; /* also fix lustre_swab_connect */ + __u64 padding6; /* also fix lustre_swab_connect */ }; extern void lustre_swab_connect(struct obd_connect_data *ocd); @@ -1124,6 +1137,17 @@ enum fld_rpc_opc { FLD_FIRST_OPC = FLD_QUERY }; +enum seq_rpc_opc { + SEQ_QUERY = 700, + SEQ_LAST_OPC, + SEQ_FIRST_OPC = SEQ_QUERY +}; + +enum seq_op { + SEQ_ALLOC_SUPER = 0, + SEQ_ALLOC_META = 1 +}; + /* * LOV data structures */ diff --git a/lustre/include/lustre_fid.h b/lustre/include/lustre_fid.h index 0376d3a..69be22c 100644 --- a/lustre/include/lustre_fid.h +++ b/lustre/include/lustre_fid.h @@ -31,47 +31,121 @@ #include #include +struct lu_site; struct lu_context; -struct lu_seq_mgr_ops { - int (*smo_read) (const struct lu_context *, void *opaque, __u64 *); - int (*smo_write) (const struct lu_context *, void *opaque, __u64 *); + +/* start seq number */ +#define LUSTRE_SEQ_SPACE_START 0x400 + +/* maximal possible seq number */ +#define LUSTRE_SEQ_SPACE_LIMIT ((__u64)~0ULL) + +/* this is how many FIDs may be allocated in one sequence. */ +#define LUSTRE_SEQ_WIDTH 0x00000000000002800 + +/* how many sequences may be allocated for meta-sequence (this is 10240 + * sequences). */ +#define LUSTRE_SEQ_META_CHUNK 0x00000000000002800 + +/* how many sequences may be allocated for super-sequence (this is 10240 * 10240 + * sequences), which means that one allocation for super-sequence allows to + * allocate 10240 meta-sequences and each of them may have 10240 sequences. */ +#define LUSTRE_SEQ_SUPER_CHUNK (LUSTRE_SEQ_META_CHUNK * LUSTRE_SEQ_META_CHUNK) + +/* client sequence manager interface */ +struct lu_client_seq { + /* sequence-controller export. */ + struct obd_export *seq_exp; + struct semaphore seq_sem; + + /* different flags. */ + int seq_flags; + + /* range of sequences allowed for allocation. 
When using lu_client_seq on + * clients, this contains meta-sequence range. And for servers this + * contains super-sequence range. */ + struct lu_range seq_cl_range; + + /* seq related proc */ + struct proc_dir_entry *seq_proc_entry; + + /* this holds next fid to be handed out from last obtained seq */ + struct lu_fid seq_fid; }; -struct lu_seq_mgr { - /* seq management fields */ - struct semaphore m_seq_sem; - /* each MDS has own range of seqs ended with this value - * if it is overflowed the new one should be got from master node */ - __u64 m_seq_last; - /* last allocated seq */ - __u64 m_seq; - /* ops related stuff */ - void *m_opaque; - struct lu_seq_mgr_ops *m_ops; +#ifdef __KERNEL__ +/* server sequence manager interface */ +struct lu_server_seq { + /* super-sequence range, all super-sequences for other servers are + * allocated from it. */ + struct lu_range seq_ss_range; + + /* meta-sequence range, all meta-sequences for clients are allocated + * from it. */ + struct lu_range seq_ms_range; + + /* device for server side seq manager needs (saving sequences to backing + * store). */ + struct dt_device *seq_dev; + + /* different flags: LUSTRE_SRV_SEQ_CONTROLLER, etc. 
*/ + int seq_flags; + + /* seq related proc */ + struct proc_dir_entry *seq_proc_entry; + + /* server side seq service */ + struct ptlrpc_service *seq_service; + + /* client interface to request controller */ + struct lu_client_seq *seq_cli; + + /* semaphore for protecting allocation */ + struct semaphore seq_sem; }; +#endif -/* init/fini methods */ -struct lu_seq_mgr *seq_mgr_init(struct lu_seq_mgr_ops *, void *); -void seq_mgr_fini(struct lu_seq_mgr *); - -/* seq management methods */ -int seq_mgr_setup(const struct lu_context *, struct lu_seq_mgr *); -int seq_mgr_read(const struct lu_context *, struct lu_seq_mgr *); -int seq_mgr_write(const struct lu_context *, struct lu_seq_mgr *); -int seq_mgr_alloc(const struct lu_context *, struct lu_seq_mgr *, __u64 *); -int seq_mgr_range_alloc(const struct lu_context *, - struct lu_seq_mgr *, __u64 *); -struct lu_site; -#if 0 -int fid_is_local(struct lu_site *site, const struct lu_fid *fid); -#else -static inline int fid_is_local(struct lu_site *site, const struct lu_fid *fid) +/* client seq mgr flags */ +#define LUSTRE_CLI_SEQ_CLIENT (1 << 0) +#define LUSTRE_CLI_SEQ_SERVER (1 << 1) + +#ifdef __KERNEL__ +/* server seq mgr flags */ +#define LUSTRE_SRV_SEQ_CONTROLLER (1 << 0) +#define LUSTRE_SRV_SEQ_REGULAR (1 << 1) + +int seq_server_init(struct lu_server_seq *seq, + struct lu_client_seq *cli, + const struct lu_context *ctx, + struct dt_device *dev, + int flags); + +void seq_server_fini(struct lu_server_seq *seq, + const struct lu_context *ctx) ; +#endif + +int seq_client_init(struct lu_client_seq *seq, + struct obd_export *exp, + int flags); + +void seq_client_fini(struct lu_client_seq *seq); + +int seq_client_alloc_super(struct lu_client_seq *seq); +int seq_client_alloc_meta(struct lu_client_seq *seq); + +int seq_client_alloc_seq(struct lu_client_seq *seq, + __u64 *seqnr); +int seq_client_alloc_fid(struct lu_client_seq *seq, + struct lu_fid *fid); + +/* Fids common stuff */ +static inline int fid_is_local(struct lu_site 
*site, + const struct lu_fid *fid) { + /* XXX: fix this when fld is ready. */ return 1; } -#endif void fid_to_le(struct lu_fid *dst, const struct lu_fid *src); - #endif /* __LINUX_OBD_CLASS_H */ diff --git a/lustre/include/lustre_net.h b/lustre/include/lustre_net.h index 049c122..c5ed0f9 100644 --- a/lustre/include/lustre_net.h +++ b/lustre/include/lustre_net.h @@ -96,6 +96,8 @@ num_physpages >> (25 - PAGE_SHIFT)), 2UL) #define FLD_NUM_THREADS max(min_t(unsigned long, MDT_MAX_THREADS, \ num_physpages >> (25 - PAGE_SHIFT)), 2UL) +#define SEQ_NUM_THREADS max(min_t(unsigned long, MDT_MAX_THREADS, \ + num_physpages >> (25 - PAGE_SHIFT)), 2UL) #define MDS_MAX_THREADS 512UL #define MDS_DEF_THREADS max(2UL, min_t(unsigned long, 32, \ diff --git a/lustre/include/obd.h b/lustre/include/obd.h index 5d95c0d3..6355d46 100644 --- a/lustre/include/obd.h +++ b/lustre/include/obd.h @@ -385,9 +385,8 @@ struct client_obd { /* used by quotacheck */ int cl_qchk_stat; /* quotacheck stat of the peer */ - /* this holds last allocated fid in last obtained seq */ - struct lu_fid cl_fid; - spinlock_t cl_fid_lock; + /* sequence manager */ + struct lu_client_seq *cl_seq; }; #define obd2cli_tgt(obd) ((char *)(obd)->u.cli.cl_target_uuid.uuid) @@ -540,7 +539,7 @@ struct niobuf_local { #define LUSTRE_OPC_MKDIR (1 << 0) #define LUSTRE_OPC_SYMLINK (1 << 1) -#define LUSTRE_OPC_MKNODE (1 << 2) +#define LUSTRE_OPC_MKNOD (1 << 2) #define LUSTRE_OPC_CREATE (1 << 3) struct lu_placement_hint { @@ -560,6 +559,7 @@ struct lu_placement_hint { #define LUSTRE_MDD0_NAME "mdd0" #define LUSTRE_OSD0_NAME "osd0" #define LUSTRE_FLD0_NAME "fld0" +#define LUSTRE_SEQ0_NAME "seq0" #define LUSTRE_MDC0_NAME "mdc0" #define LUSTRE_MDC_NAME "mdc" @@ -829,7 +829,10 @@ struct obd_ops { struct obd_connect_data *ocd); int (*o_disconnect)(struct obd_export *exp); - /* may be later these should be moved into separate fid_ops */ + /* maybe later these should be moved into separate fid_ops */ + int (*o_fid_init)(struct obd_export 
*exp); + int (*o_fid_fini)(struct obd_export *exp); + int (*o_fid_alloc)(struct obd_export *exp, struct lu_fid *fid, struct lu_placement_hint *hint); diff --git a/lustre/include/obd_class.h b/lustre/include/obd_class.h index 8580a10..5f6b113 100644 --- a/lustre/include/obd_class.h +++ b/lustre/include/obd_class.h @@ -720,6 +720,34 @@ static inline int obd_disconnect(struct obd_export *exp) RETURN(rc); } +static inline int obd_fid_init(struct obd_export *exp) +{ + int rc; + ENTRY; + + if (OBP(exp->exp_obd, fid_init) == NULL) + RETURN(-ENOTSUPP); + + OBD_COUNTER_INCREMENT(exp->exp_obd, fid_init); + + rc = OBP(exp->exp_obd, fid_init)(exp); + RETURN(rc); +} + +static inline int obd_fid_fini(struct obd_export *exp) +{ + int rc; + ENTRY; + + if (OBP(exp->exp_obd, fid_fini) == NULL) + RETURN(-ENOTSUPP); + + OBD_COUNTER_INCREMENT(exp->exp_obd, fid_fini); + + rc = OBP(exp->exp_obd, fid_fini)(exp); + RETURN(rc); +} + static inline int obd_fid_alloc(struct obd_export *exp, struct lu_fid *fid, struct lu_placement_hint *hint) diff --git a/lustre/include/obd_support.h b/lustre/include/obd_support.h index beca205..5c45e85 100644 --- a/lustre/include/obd_support.h +++ b/lustre/include/obd_support.h @@ -169,6 +169,9 @@ extern cfs_waitq_t obd_race_waitq; #define OBD_FAIL_MGS 0x900 #define OBD_FAIL_MGS_ALL_REQUEST_NET 0x901 #define OBD_FAIL_MGS_ALL_REPLY_NET 0x902 +#define OBD_FAIL_SEQ 0x1000 +#define OBD_FAIL_SEQ_ALL_REQUEST_NET 0x1001 +#define OBD_FAIL_SEQ_ALL_REPLY_NET 0x1002 /* preparation for a more advanced failure testbed (not functional yet) */ #define OBD_FAIL_MASK_SYS 0x0000FF00 diff --git a/lustre/ldlm/ldlm_lib.c b/lustre/ldlm/ldlm_lib.c index 9d9f9ab..08c7f64 100644 --- a/lustre/ldlm/ldlm_lib.c +++ b/lustre/ldlm/ldlm_lib.c @@ -257,9 +257,6 @@ int client_obd_setup(struct obd_device *obddev, struct lustre_cfg *lcfg) cli->cl_r_in_flight = 0; cli->cl_w_in_flight = 0; - memset(&cli->cl_fid, 0, sizeof(struct lu_fid)); - spin_lock_init(&cli->cl_fid_lock); - 
spin_lock_init(&cli->cl_read_rpc_hist.oh_lock); spin_lock_init(&cli->cl_write_rpc_hist.oh_lock); spin_lock_init(&cli->cl_read_page_hist.oh_lock); @@ -380,9 +377,6 @@ int client_connect_import(struct lustre_handle *dlm_handle, imp->imp_connect_flags_orig = data->ocd_connect_flags; } - /* zero out sequence to check it later for validness */ - ocd->ocd_seq = 0; - rc = ptlrpc_connect_import(imp, NULL); if (rc != 0) { LASSERT (imp->imp_state == LUSTRE_IMP_DISCON); diff --git a/lustre/liblustre/Makefile.am b/lustre/liblustre/Makefile.am index 1efe1e1..a905c18 100644 --- a/lustre/liblustre/Makefile.am +++ b/lustre/liblustre/Makefile.am @@ -13,6 +13,7 @@ LUSTRE_LIBS = libllite.a \ $(top_builddir)/lustre/lov/liblov.a \ $(top_builddir)/lustre/obdecho/libobdecho.a \ $(top_builddir)/lustre/osc/libosc.a \ + $(top_builddir)/lustre/fid/libfid.a \ $(top_builddir)/lustre/mdc/libmdc.a \ $(top_builddir)/lustre/ptlrpc/libptlrpc.a \ $(top_builddir)/lustre/obdclass/liblustreclass.a \ diff --git a/lustre/liblustre/genlib.sh b/lustre/liblustre/genlib.sh index 6d977b3..2f8f9b4 100755 --- a/lustre/liblustre/genlib.sh +++ b/lustre/liblustre/genlib.sh @@ -63,6 +63,7 @@ build_obj_list ../lov liblov.a build_obj_list ../obdecho libobdecho.a build_obj_list ../osc libosc.a build_obj_list ../mdc libmdc.a +build_obj_list ../fid libfid.a build_obj_list ../ptlrpc libptlrpc.a build_obj_list ../obdclass liblustreclass.a build_obj_list ../lvfs liblvfs.a diff --git a/lustre/liblustre/llite_fid.c b/lustre/liblustre/llite_fid.c index d1c4c01..b2d09b6 100644 --- a/lustre/liblustre/llite_fid.c +++ b/lustre/liblustre/llite_fid.c @@ -49,39 +49,96 @@ #include "lutil.h" #include "llite_lib.h" #include +#include + +static int llu_fid_alloc(struct obd_export *exp, struct lu_fid *fid, + struct lu_placement_hint *hint) +{ + int rc; + ENTRY; + rc = obd_fid_alloc(exp, fid, hint); + RETURN(rc); +} /* allocates passed fid, that is assigns f_num and f_seq to the @fid */ -int llu_fid_md_alloc(struct llu_sb_info *sbi, 
struct lu_fid *fid) +int llu_fid_md_alloc(struct llu_sb_info *sbi, struct lu_fid *fid, + struct lu_placement_hint *hint) { ENTRY; + RETURN(llu_fid_alloc(sbi->ll_md_exp, fid, hint)); +} - if (sbi->ll_fid.f_oid < LUSTRE_FID_SEQ_WIDTH) { - sbi->ll_fid.f_oid += 1; - *fid = sbi->ll_fid; - } else { - CERROR("sequence is exhausted. Switching to " - "new one is not yet implemented\n"); - RETURN(-ERANGE); +/* allocates passed fid, that is assigns f_num and f_seq to the @fid */ +int llu_fid_dt_alloc(struct llu_sb_info *sbi, struct lu_fid *fid, + struct lu_placement_hint *hint) +{ + ENTRY; + RETURN(llu_fid_alloc(sbi->ll_dt_exp, fid, hint)); +} + +static int llu_fid_init(struct obd_export *exp) +{ + int rc; + ENTRY; + + rc = obd_fid_init(exp); + if (rc) { + CERROR("cannot initialize FIDs framework, " + "rc %d\n", rc); + RETURN(rc); } - - RETURN(0); + + RETURN(rc); } -/* allocates passed fid, that is assigns f_num and f_seq to the @fid */ -int llu_fid_dt_alloc(struct llu_sb_info *sbi, struct lu_fid *fid) +int llu_fid_md_init(struct llu_sb_info *sbi) +{ + ENTRY; + RETURN(llu_fid_init(sbi->ll_md_exp)); +} + +int llu_fid_dt_init(struct llu_sb_info *sbi) +{ + ENTRY; + RETURN(llu_fid_init(sbi->ll_dt_exp)); +} + +static int llu_fid_fini(struct obd_export *exp) +{ + int rc; + ENTRY; + + rc = obd_fid_fini(exp); + if (rc) { + CERROR("cannot finalize FIDs framework, " + "rc %d\n", rc); + RETURN(rc); + } + + RETURN(rc); +} + +int llu_fid_md_fini(struct llu_sb_info *sbi) +{ + ENTRY; + RETURN(llu_fid_fini(sbi->ll_md_exp)); +} + +int llu_fid_dt_fini(struct llu_sb_info *sbi) { ENTRY; - RETURN(-EOPNOTSUPP); + RETURN(llu_fid_fini(sbi->ll_dt_exp)); } /* build inode number on passed @fid */ -unsigned long llu_fid_build_ino(struct llu_sb_info *sbi, struct lu_fid *fid) +unsigned long llu_fid_build_ino(struct llu_sb_info *sbi, + struct lu_fid *fid) { unsigned long ino; ENTRY; /* very stupid and having many downsides inode allocation algorithm * based on fid. 
*/ - ino = (fid_seq(fid) - 1) * LUSTRE_FID_SEQ_WIDTH + fid_oid(fid); + ino = (fid_seq(fid) - 1) * LUSTRE_SEQ_WIDTH + fid_oid(fid); RETURN(ino); } diff --git a/lustre/liblustre/llite_lib.h b/lustre/liblustre/llite_lib.h index 10376ce..a9436e4 100644 --- a/lustre/liblustre/llite_lib.h +++ b/lustre/liblustre/llite_lib.h @@ -37,8 +37,6 @@ struct llu_sb_info { struct obd_uuid ll_mds_uuid; struct obd_uuid ll_mds_peer_uuid; char *ll_instance; - - struct lu_fid ll_fid; }; #define LL_SBI_NOLCK 0x1 @@ -241,9 +239,20 @@ ssize_t llu_iop_filldirentries(struct inode *ino, _SYSIO_OFF_T *basep, char *buf, size_t nbytes); /* liblustre/llite_fid.c*/ -int llu_fid_md_alloc(struct llu_sb_info *sbi, struct lu_fid *fid); -int llu_fid_dt_alloc(struct llu_sb_info *sbi, struct lu_fid *fid); -unsigned long llu_fid_build_ino(struct llu_sb_info *sbi, struct lu_fid *fid); +int llu_fid_md_init(struct llu_sb_info *sbi); +int llu_fid_dt_init(struct llu_sb_info *sbi); + +int llu_fid_md_fini(struct llu_sb_info *sbi); +int llu_fid_dt_fini(struct llu_sb_info *sbi); + +int llu_fid_md_alloc(struct llu_sb_info *sbi, struct lu_fid *fid, + struct lu_placement_hint *hint); + +int llu_fid_dt_alloc(struct llu_sb_info *sbi, struct lu_fid *fid, + struct lu_placement_hint *hint); + +unsigned long llu_fid_build_ino(struct llu_sb_info *sbi, + struct lu_fid *fid); /* ext2 related */ #define EXT2_NAME_LEN (255) diff --git a/lustre/liblustre/namei.c b/lustre/liblustre/namei.c index 14b0518..977ecbb 100644 --- a/lustre/liblustre/namei.c +++ b/lustre/liblustre/namei.c @@ -443,8 +443,13 @@ static int llu_lookup_it(struct inode *parent, struct pnode *pnode, icbd.icbd_parent = parent; /* allocate new fid for child */ - if (it->it_op == IT_OPEN || it->it_op == IT_CREAT) { - rc = llu_fid_md_alloc(llu_i2sbi(parent), &op_data.fid2); + if (it->it_op & IT_CREAT || + (it->it_op & IT_OPEN && it->it_create_mode & O_CREAT)) { + struct lu_placement_hint hint = { .ph_pname = NULL, + .ph_cname = &pnode->p_base->pb_name, + .ph_opc = 
LUSTRE_OPC_CREATE }; + + rc = llu_fid_md_alloc(llu_i2sbi(parent), &op_data.fid2, &hint); if (rc) { CERROR("can't allocate new fid, rc %d\n", rc); LBUG(); diff --git a/lustre/liblustre/super.c b/lustre/liblustre/super.c index 09d91c6..f5fa4fc 100644 --- a/lustre/liblustre/super.c +++ b/lustre/liblustre/super.c @@ -850,6 +850,11 @@ static int llu_iop_symlink_raw(struct pnode *pno, const char *tgt) struct ptlrpc_request *request = NULL; struct llu_sb_info *sbi = llu_i2sbi(dir); struct md_op_data op_data = { { 0 } }; + struct lu_placement_hint hint = { + .ph_pname = NULL, + .ph_cname = qstr, + .ph_opc = LUSTRE_OPC_SYMLINK + }; int err = -EMLINK; ENTRY; @@ -858,7 +863,7 @@ static int llu_iop_symlink_raw(struct pnode *pno, const char *tgt) RETURN(err); /* allocate new fid */ - err = llu_fid_md_alloc(sbi, &op_data.fid2); + err = llu_fid_md_alloc(sbi, &op_data.fid2, &hint); if (err) { CERROR("can't allocate new fid, rc %d\n", err); RETURN(err); @@ -968,6 +973,11 @@ static int llu_iop_mknod_raw(struct pnode *pno, struct llu_sb_info *sbi = llu_i2sbi(dir); struct md_op_data op_data = { { 0 } }; int err = -EMLINK; + struct lu_placement_hint hint = { + .ph_pname = NULL, + .ph_cname = &pno->p_base->pb_name, + .ph_opc = LUSTRE_OPC_MKNOD + }; ENTRY; liblustre_wait_event(0); @@ -987,7 +997,7 @@ static int llu_iop_mknod_raw(struct pnode *pno, case S_IFIFO: case S_IFSOCK: /* allocate new fid */ - err = llu_fid_md_alloc(sbi, &op_data.fid2); + err = llu_fid_md_alloc(sbi, &op_data.fid2, &hint); if (err) { CERROR("can't allocate new fid, rc %d\n", err); RETURN(err); @@ -1216,6 +1226,12 @@ static int llu_iop_mkdir_raw(struct pnode *pno, mode_t mode) struct ptlrpc_request *request = NULL; struct intnl_stat *st = llu_i2stat(dir); struct md_op_data op_data = { { 0 } }; + struct lu_placement_hint hint = { + .ph_pname = NULL, + .ph_cname = qstr, + .ph_opc = LUSTRE_OPC_MKDIR + }; + int err = -EMLINK; ENTRY; @@ -1227,7 +1243,7 @@ static int llu_iop_mkdir_raw(struct pnode *pno, mode_t mode) 
RETURN(err); /* allocate new fid */ - err = llu_fid_md_alloc(llu_i2sbi(dir), &op_data.fid2); + err = llu_fid_md_alloc(llu_i2sbi(dir), &op_data.fid2, &hint); if (err) { CERROR("can't allocate new fid, rc %d\n", err); RETURN(err); @@ -1819,11 +1835,9 @@ llu_fsswop_mount(const char *source, if (err) GOTO(out_mdc, err); - /* initializing ->ll_fid. It is known that root object has separate - * sequence, so that we use what MDS returned to us and do not check if - * f_oid collides with root or not. */ - sbi->ll_fid.f_seq = ocd.ocd_seq; - sbi->ll_fid.f_oid = LUSTRE_FID_INIT_OID; + err = llu_fid_md_init(sbi); + if (err) + GOTO(out_mdc, err); /* * FIXME fill fs stat data into sbi here!!! FIXME @@ -1833,7 +1847,7 @@ llu_fsswop_mount(const char *source, obd = class_name2obd(osc); if (!obd) { CERROR("OSC %s: not setup or attached\n", osc); - GOTO(out_mdc, err = -EINVAL); + GOTO(out_md_fid, err = -EINVAL); } obd_set_info_async(obd->obd_self_export, strlen("async"), "async", sizeof(async), &async, NULL); @@ -1852,12 +1866,16 @@ llu_fsswop_mount(const char *source, sbi->ll_dt_exp = class_conn2export(&osc_conn); sbi->ll_lco.lco_flags = ocd.ocd_connect_flags; + err = llu_fid_dt_init(sbi); + if (err) + GOTO(out_osc, err); + llu_init_ea_size(sbi->ll_md_exp, sbi->ll_dt_exp); err = md_getstatus(sbi->ll_md_exp, &rootfid); if (err) { CERROR("cannot mds_connect: rc = %d\n", err); - GOTO(out_osc, err); + GOTO(out_dt_fid, err); } CDEBUG(D_SUPER, "rootfid "DFID3"\n", PFID3(&rootfid)); sbi->ll_root_fid = rootfid; @@ -1911,8 +1929,12 @@ out_inode: _sysio_i_gone(root); out_request: ptlrpc_req_finished(request); +out_dt_fid: + llu_fid_dt_fini(sbi); out_osc: obd_disconnect(sbi->ll_dt_exp); +out_md_fid: + llu_fid_md_fini(sbi); out_mdc: obd_disconnect(sbi->ll_md_exp); out_free: diff --git a/lustre/llite/llite_fid.c b/lustre/llite/llite_fid.c index ee9d8ae..a2335e1 100644 --- a/lustre/llite/llite_fid.c +++ b/lustre/llite/llite_fid.c @@ -28,6 +28,7 @@ #include #include +#include #include #include 
#include @@ -67,14 +68,75 @@ int ll_fid_dt_alloc(struct ll_sb_info *sbi, struct lu_fid *fid, RETURN(ll_fid_alloc(sbi->ll_dt_exp, fid, hint)); } +static int ll_fid_init(struct obd_export *exp) +{ + int rc; + ENTRY; + + rc = obd_fid_init(exp); + if (rc) { + CERROR("cannot initialize FIDs framework, " + "rc %d\n", rc); + RETURN(rc); + } + + RETURN(rc); +} + +int ll_fid_md_init(struct ll_sb_info *sbi) +{ + ENTRY; + RETURN(ll_fid_init(sbi->ll_md_exp)); +} + +int ll_fid_dt_init(struct ll_sb_info *sbi) +{ +#if 0 + ENTRY; + RETURN(ll_fid_init(sbi->ll_dt_exp)); +#endif + /* XXX: enable this again when OSD is starting sequence-management + * service. */ + ENTRY; + RETURN(0); +} + +static int ll_fid_fini(struct obd_export *exp) +{ + int rc; + ENTRY; + + rc = obd_fid_fini(exp); + if (rc) { + CERROR("cannot finalize FIDs framework, " + "rc %d\n", rc); + RETURN(rc); + } + + RETURN(rc); +} + +int ll_fid_md_fini(struct ll_sb_info *sbi) +{ + ENTRY; + RETURN(ll_fid_fini(sbi->ll_md_exp)); +} + +int ll_fid_dt_fini(struct ll_sb_info *sbi) +{ + ENTRY; + RETURN(ll_fid_fini(sbi->ll_dt_exp)); +} + /* build inode number on passed @fid */ -ino_t ll_fid_build_ino(struct ll_sb_info *sbi, struct lu_fid *fid) +ino_t ll_fid_build_ino(struct ll_sb_info *sbi, + struct lu_fid *fid) { ino_t ino; ENTRY; /* very stupid and having many downsides inode allocation algorithm * based on fid. 
*/ - ino = (fid_seq(fid) - 1) * LUSTRE_FID_SEQ_WIDTH + fid_oid(fid); + ino = (fid_seq(fid) - 1) * LUSTRE_SEQ_WIDTH + fid_oid(fid); RETURN(ino); } diff --git a/lustre/llite/llite_internal.h b/lustre/llite/llite_internal.h index 9771b99..4b26a70 100644 --- a/lustre/llite/llite_internal.h +++ b/lustre/llite/llite_internal.h @@ -591,6 +591,12 @@ ssize_t ll_listxattr(struct dentry *dentry, char *buffer, size_t size); int ll_removexattr(struct dentry *dentry, const char *name); /* llite/llite_fid.c*/ +int ll_fid_md_init(struct ll_sb_info *sbi); +int ll_fid_dt_init(struct ll_sb_info *sbi); + +int ll_fid_md_fini(struct ll_sb_info *sbi); +int ll_fid_dt_fini(struct ll_sb_info *sbi); + int ll_fid_md_alloc(struct ll_sb_info *sbi, struct lu_fid *fid, struct lu_placement_hint *hint); diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c index 913a704..908498b 100644 --- a/lustre/llite/llite_lib.c +++ b/lustre/llite/llite_lib.c @@ -244,10 +244,17 @@ int client_common_fill_super(struct super_block *sb, char *mdc, char *osc) strlen(sbi2mdc(sbi)->cl_target_uuid.uuid)); #endif + /* init FIDs framework */ + err = ll_fid_md_init(sbi); + if (err) { + CERROR("can't init FIDs framework, rc %d\n", err); + GOTO(out_mdc, err); + } + obd = class_name2obd(osc); if (!obd) { CERROR("OSC %s: not setup or attached\n", osc); - GOTO(out_mdc, err = -ENODEV); + GOTO(out_md_fid, err = -ENODEV); } data->ocd_connect_flags = @@ -298,10 +305,17 @@ int client_common_fill_super(struct super_block *sb, char *mdc, char *osc) GOTO(out_osc, -ENOMEM); } + /* init FIDs framework */ + err = ll_fid_dt_init(sbi); + if (err) { + CERROR("can't init FIDs framework, rc %d\n", err); + GOTO(out_osc, err); + } + err = md_getstatus(sbi->ll_md_exp, &rootfid); if (err) { CERROR("cannot mds_connect: rc = %d\n", err); - GOTO(out_osc, err); + GOTO(out_dt_fid, err); } CDEBUG(D_SUPER, "rootfid "DFID3"\n", PFID3(&rootfid)); sbi->ll_root_fid = rootfid; @@ -329,7 +343,7 @@ int client_common_fill_super(struct super_block 
*sb, char *mdc, char *osc) GOTO(out_osc, err); } - LASSERT(fid_oid(&sbi->ll_root_fid) != 0); + LASSERT(fid_is_sane(&sbi->ll_root_fid)); root = ll_iget(sb, ll_fid_build_ino(sbi, &sbi->ll_root_fid), &md); ptlrpc_req_finished(request); @@ -363,8 +377,12 @@ int client_common_fill_super(struct super_block *sb, char *mdc, char *osc) out_root: if (root) iput(root); +out_dt_fid: + obd_fid_fini(sbi->ll_dt_exp); out_osc: obd_disconnect(sbi->ll_dt_exp); +out_md_fid: + obd_fid_fini(sbi->ll_md_exp); out_mdc: obd_disconnect(sbi->ll_md_exp); out: @@ -502,6 +520,7 @@ void client_common_put_super(struct super_block *sb) prune_deathrow(sbi, 0); list_del(&sbi->ll_conn_chain); + ll_fid_dt_fini(sbi); obd_disconnect(sbi->ll_dt_exp); lprocfs_unregister_mountpoint(sbi); @@ -510,6 +529,7 @@ void client_common_put_super(struct super_block *sb) sbi->ll_proc_root = NULL; } + ll_fid_md_fini(sbi); obd_disconnect(sbi->ll_md_exp); lustre_throw_orphan_dentries(sb); diff --git a/lustre/llite/namei.c b/lustre/llite/namei.c index b36fde4..922ffba 100644 --- a/lustre/llite/namei.c +++ b/lustre/llite/namei.c @@ -547,6 +547,11 @@ static int ll_mknod_generic(struct inode *dir, struct qstr *name, int mode, struct inode *inode = NULL; struct ll_sb_info *sbi = ll_i2sbi(dir); struct md_op_data op_data = { { 0 } }; + struct lu_placement_hint hint = { + .ph_pname = NULL, + .ph_cname = name, + .ph_opc = LUSTRE_OPC_MKNOD + }; int err; ENTRY; @@ -564,6 +569,9 @@ static int ll_mknod_generic(struct inode *dir, struct qstr *name, int mode, case S_IFBLK: case S_IFIFO: case S_IFSOCK: + err = ll_fid_md_alloc(sbi, &op_data.fid2, &hint); + if (err) + break; ll_prepare_md_op_data(&op_data, dir, NULL, name->name, name->len, 0); err = md_create(sbi->ll_md_exp, &op_data, NULL, 0, mode, diff --git a/lustre/lmv/lmv_fld.c b/lustre/lmv/lmv_fld.c index 99a5cab..6810f09 100644 --- a/lustre/lmv/lmv_fld.c +++ b/lustre/lmv/lmv_fld.c @@ -37,6 +37,7 @@ #include #include +#include #include #include #include @@ -53,7 +54,7 @@ int 
lmv_fld_lookup(struct obd_device *obd, struct lu_fid *fid) LASSERT(fid_is_sane(fid)); /* temporary hack until fld will works */ - rc = (unsigned long)fid_seq(fid) / LUSTRE_SEQ_RANGE; + rc = (unsigned long)fid_seq(fid) / LUSTRE_SEQ_SUPER_CHUNK; CWARN("LMV: got MDS %d for sequence: "LPU64"\n", rc, fid_seq(fid)); RETURN(rc); } diff --git a/lustre/lmv/lmv_obd.c b/lustre/lmv/lmv_obd.c index 33c7125..dd04eca 100644 --- a/lustre/lmv/lmv_obd.c +++ b/lustre/lmv/lmv_obd.c @@ -670,6 +670,42 @@ static int lmv_placment_policy(struct obd_device *obd, RETURN(0); } +static int lmv_fid_init(struct obd_export *exp) +{ + struct obd_device *obd = class_exp2obd(exp); + struct lmv_obd *lmv = &obd->u.lmv; + int i, rc = 0; + ENTRY; + + for (i = 0; i < lmv->desc.ld_tgt_count; i++) { + if (lmv->tgts[i].ltd_exp == NULL) + continue; + + rc = obd_fid_init(lmv->tgts[i].ltd_exp); + if (rc) + RETURN(rc); + } + RETURN(rc); +} + +static int lmv_fid_fini(struct obd_export *exp) +{ + struct obd_device *obd = class_exp2obd(exp); + struct lmv_obd *lmv = &obd->u.lmv; + int i, rc = 0; + ENTRY; + + for (i = 0; i < lmv->desc.ld_tgt_count; i++) { + if (lmv->tgts[i].ltd_exp == NULL) + continue; + + rc = obd_fid_fini(lmv->tgts[i].ltd_exp); + if (rc) + break; + } + RETURN(rc); +} + static int lmv_fid_alloc(struct obd_export *exp, struct lu_fid *fid, struct lu_placement_hint *hint) { @@ -2385,6 +2421,8 @@ struct obd_ops lmv_obd_ops = { .o_packmd = lmv_packmd, .o_unpackmd = lmv_unpackmd, .o_notify = lmv_notify, + .o_fid_init = lmv_fid_init, + .o_fid_fini = lmv_fid_fini, .o_fid_alloc = lmv_fid_alloc, .o_fid_delete = lmv_fid_delete, .o_iocontrol = lmv_iocontrol diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c index 3ea198b..e43081d 100644 --- a/lustre/mdc/mdc_request.c +++ b/lustre/mdc/mdc_request.c @@ -38,6 +38,7 @@ #include #include +#include #include /* for LUSTRE_POSIX_ACL_MAX_SIZE */ #include #include @@ -1087,30 +1088,50 @@ static int mdc_import_event(struct obd_device *obd, struct obd_import 
*imp, RETURN(rc); } -static int mdc_fid_alloc(struct obd_export *exp, struct lu_fid *fid, - struct lu_placement_hint *hint) +static int mdc_fid_init(struct obd_export *exp) { struct client_obd *cli = &exp->exp_obd->u.cli; - int rc = 0; + int rc; ENTRY; - LASSERT(fid != NULL); - LASSERT(hint != NULL); - LASSERT(fid_seq_is_sane(fid_seq(&cli->cl_fid))); + OBD_ALLOC_PTR(cli->cl_seq); + if (cli->cl_seq == NULL) + RETURN(-ENOMEM); - spin_lock(&cli->cl_fid_lock); - if (fid_oid(&cli->cl_fid) < LUSTRE_FID_SEQ_WIDTH) { - cli->cl_fid.f_oid += 1; - *fid = cli->cl_fid; - } else { - CERROR("sequence is exhausted. Switching to " - "new one is not yet implemented\n"); - rc = -ERANGE; + /* init client side of sequence-manager */ + rc = seq_client_init(cli->cl_seq, exp, + LUSTRE_CLI_SEQ_CLIENT); + if (rc) { + OBD_FREE_PTR(cli->cl_seq); + cli->cl_seq = NULL; } - spin_unlock(&cli->cl_fid_lock); RETURN(rc); } +static int mdc_fid_fini(struct obd_export *exp) +{ + struct client_obd *cli = &exp->exp_obd->u.cli; + ENTRY; + + if (cli->cl_seq != NULL) { + seq_client_fini(cli->cl_seq); + OBD_FREE_PTR(cli->cl_seq); + cli->cl_seq = NULL; + } + + RETURN(0); +} + +static int mdc_fid_alloc(struct obd_export *exp, struct lu_fid *fid, + struct lu_placement_hint *hint) +{ + struct client_obd *cli = &exp->exp_obd->u.cli; + struct lu_client_seq *seq = cli->cl_seq; + + ENTRY; + RETURN(seq_client_alloc_fid(seq, fid)); +} + static int mdc_setup(struct obd_device *obd, struct lustre_cfg *cfg) { struct client_obd *cli = &obd->u.cli; @@ -1259,6 +1280,8 @@ struct obd_ops mdc_obd_ops = { .o_statfs = mdc_statfs, .o_pin = mdc_pin, .o_unpin = mdc_unpin, + .o_fid_init = mdc_fid_init, + .o_fid_fini = mdc_fid_fini, .o_fid_alloc = mdc_fid_alloc, .o_import_event = mdc_import_event, .o_llog_init = mdc_llog_init, diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index 33658aa..a742aba 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -293,22 +293,8 @@ static int mdt_connect(struct 
mdt_thread_info *info) req = mdt_info_req(info); result = target_handle_connect(req, mdt_handle); if (result == 0) { - struct obd_connect_data *data; - LASSERT(req->rq_export != NULL); info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev); - - /* - * XXX: this is incorrect, because target_handle_connect() - * accessed and *swabbed* connect data bypassing - * capsule. Correct fix is to switch everything to the new - * req-layout interface. - */ - data = req_capsule_server_get(pill, &RMF_CONNECT_DATA); - - result = seq_mgr_alloc(info->mti_ctxt, - info->mti_mdt->mdt_seq_mgr, - &data->ocd_seq); } return result; } @@ -1469,41 +1455,130 @@ static int mdt_config(const struct lu_context *ctx, struct mdt_device *m, RETURN(child->md_ops->mdo_config(ctx, child, name, buf, size, mode)); } -static int mdt_seq_mgr_hpr(const struct lu_context *ctx, - void *opaque, __u64 *seq, int mode) +/* + * Seq wrappers + */ +static int mdt_seq_init(const struct lu_context *ctx, + struct mdt_device *m) { - struct mdt_device *m = opaque; + struct lu_site *ls; int rc; ENTRY; - rc = mdt_config(ctx, m, LUSTRE_CONFIG_METASEQ, - seq, sizeof(*seq), mode); + ls = m->mdt_md_dev.md_lu_dev.ld_site; + + OBD_ALLOC_PTR(ls->ls_client_seq); + + if (ls->ls_client_seq != NULL) { + rc = seq_client_init(ls->ls_client_seq, + ls->ls_controller, + LUSTRE_CLI_SEQ_SERVER); + } else + rc = -ENOMEM; + + if (rc) + RETURN(rc); + + OBD_ALLOC_PTR(ls->ls_server_seq); + + if (ls->ls_server_seq != NULL) { + int flags; + + flags = (ls->ls_node_id == 0) ? 
+ LUSTRE_SRV_SEQ_CONTROLLER : + LUSTRE_SRV_SEQ_REGULAR; + + rc = seq_server_init(ls->ls_server_seq, + ls->ls_client_seq, + ctx, m->mdt_bottom, + flags); + } else + rc = -ENOMEM; + RETURN(rc); } -static int mdt_seq_mgr_read(const struct lu_context *ctx, - void *opaque, __u64 *seq) +static int mdt_seq_fini(const struct lu_context *ctx, + struct mdt_device *m) { + struct lu_site *ls = m->mdt_md_dev.md_lu_dev.ld_site; ENTRY; - RETURN(mdt_seq_mgr_hpr(ctx, opaque, seq, LUSTRE_CONFIG_GET)); + + if (ls->ls_server_seq) { + seq_server_fini(ls->ls_server_seq, ctx); + OBD_FREE_PTR(ls->ls_server_seq); + ls->ls_server_seq = NULL; + } + if (ls->ls_client_seq) { + seq_client_fini(ls->ls_client_seq); + OBD_FREE_PTR(ls->ls_client_seq); + ls->ls_client_seq = NULL; + } + RETURN(0); } -static int mdt_seq_mgr_write(const struct lu_context *ctx, - void *opaque, __u64 *seq) +static int mdt_controller_init(struct mdt_device *m, + struct lustre_cfg *cfg) { + struct obd_device *mdc; + struct obd_uuid uuid; + struct lu_site *ls; + char *uuid_str; + int rc, index; ENTRY; - RETURN(mdt_seq_mgr_hpr(ctx, opaque, seq, LUSTRE_CONFIG_SET)); + + index = simple_strtol(lustre_cfg_string(cfg, 2), NULL, 10); + if (index != 0) + RETURN(0); + + uuid_str = lustre_cfg_string(cfg, 1); + obd_str2uuid(&uuid, uuid_str); + mdc = class_find_client_obd(&uuid, LUSTRE_MDC_NAME, NULL); + if (!mdc) { + CERROR("can't find controller MDC by uuid %s\n", + uuid_str); + rc = -ENOENT; + } else if (!mdc->obd_set_up) { + CERROR("target %s not set up\n", mdc->obd_name); + rc = -EINVAL; + } else { + struct lustre_handle conn = {0, }; + + CDEBUG(D_CONFIG, "connect to controller %s(%s)\n", + mdc->obd_name, mdc->obd_uuid.uuid); + + rc = obd_connect(&conn, mdc, &mdc->obd_uuid, NULL); + + if (rc) { + CERROR("target %s connect error %d\n", + mdc->obd_name, rc); + } else { + ls = m->mdt_md_dev.md_lu_dev.ld_site; + ls->ls_controller = class_conn2export(&conn); + } + } + + RETURN(rc); } -struct lu_seq_mgr_ops seq_mgr_ops = { - .smo_read = 
mdt_seq_mgr_read, - .smo_write = mdt_seq_mgr_write -}; +static int mdt_controller_fini(struct mdt_device *m) +{ + struct lu_site *ls; + int rc; + ENTRY; + + ls = m->mdt_md_dev.md_lu_dev.ld_site; + if (ls && ls->ls_controller) { + rc = obd_disconnect(ls->ls_controller); + ls->ls_controller = NULL; + } + + RETURN(rc); +} /* * FLD wrappers */ - static int mdt_fld_init(const struct lu_context *ctx, struct mdt_device *m) { struct lu_site *ls; @@ -1525,16 +1600,16 @@ static int mdt_fld_init(const struct lu_context *ctx, struct mdt_device *m) static int mdt_fld_fini(const struct lu_context *ctx, struct mdt_device *m) { struct lu_site *ls = m->mdt_md_dev.md_lu_dev.ld_site; - + ENTRY; + if (ls && ls->ls_fld) { fld_server_fini(ctx, ls->ls_fld); OBD_FREE_PTR(ls->ls_fld); } - return 0; + RETURN(0); } /* device init/fini methods */ - static void mdt_stop_ptlrpc_service(struct mdt_device *m) { if (m->mdt_service != NULL) { @@ -1723,6 +1798,8 @@ static void mdt_fini(struct mdt_device *m) mdt_stack_fini(&ctx, m, md2lu_dev(m->mdt_child)); mdt_fld_fini(&ctx, m); + mdt_seq_fini(&ctx, m); + mdt_controller_fini(m); LASSERT(atomic_read(&d->ld_ref) == 0); md_device_fini(&m->mdt_md_dev); @@ -1732,11 +1809,6 @@ static void mdt_fini(struct mdt_device *m) m->mdt_namespace = NULL; } - if (m->mdt_seq_mgr) { - seq_mgr_fini(m->mdt_seq_mgr); - m->mdt_seq_mgr = NULL; - } - if (d->ld_site != NULL) { lu_site_fini(d->ld_site); OBD_FREE_PTR(d->ld_site); @@ -1794,27 +1866,22 @@ static int mdt_init0(struct mdt_device *m, LASSERT(num); s->ls_node_id = simple_strtol(num, NULL, 10); - m->mdt_seq_mgr = seq_mgr_init(&seq_mgr_ops, m); - if (!m->mdt_seq_mgr) { - CERROR("can't initialize sequence manager\n"); - GOTO(err_fini_stack, rc); - } - /* set initial sequence by mds index */ - m->mdt_seq_mgr->m_seq = s->ls_node_id * LUSTRE_SEQ_RANGE; - - /* init sequence info after device stack is initialized. 
*/ - rc = seq_mgr_setup(&ctx, m->mdt_seq_mgr); - lu_context_exit(&ctx); if (rc) - GOTO(err_fini_mgr, rc); + GOTO(err_fini_stack, rc); lu_context_enter(&ctx); rc = mdt_fld_init(&ctx, m); lu_context_exit(&ctx); if (rc) - GOTO(err_free_fld, rc); + GOTO(err_fini_fld, rc); + lu_context_enter(&ctx); + rc = mdt_seq_init(&ctx, m); + lu_context_exit(&ctx); + if (rc) + GOTO(err_fini_seq, rc); + lu_context_fini(&ctx); snprintf(ns_name, sizeof ns_name, LUSTRE_MDT0_NAME"-%p", m); @@ -1824,7 +1891,6 @@ static int mdt_init0(struct mdt_device *m, ldlm_register_intent(m->mdt_namespace, mdt_intent_policy); - rc = mdt_start_ptlrpc_service(m); if (rc) GOTO(err_free_ns, rc); @@ -1837,11 +1903,10 @@ static int mdt_init0(struct mdt_device *m, err_free_ns: ldlm_namespace_free(m->mdt_namespace, 0); m->mdt_namespace = NULL; -err_free_fld: +err_fini_seq: + mdt_seq_fini(&ctx, m); +err_fini_fld: mdt_fld_fini(&ctx, m); -err_fini_mgr: - seq_mgr_fini(m->mdt_seq_mgr); - m->mdt_seq_mgr = NULL; err_fini_ctx: lu_context_exit(&ctx); lu_context_fini(&ctx); @@ -1862,6 +1927,14 @@ static int mdt_process_config(const struct lu_context *ctx, ENTRY; switch (cfg->lcfg_command) { + case LCFG_ADD_MDC: + /* add mdc hook to get first MDT uuid and connect it to + * ls->controller to use for seq manager. */ + err = mdt_controller_init(mdt_dev(d), cfg); + if (err) { + CERROR("can't initialize controller export, " + "rc %d\n", err); + } /* all MDT specific commands should be here */ default: /* others are passed further */ diff --git a/lustre/mdt/mdt_internal.h b/lustre/mdt/mdt_internal.h index bbbb734..22106e5 100644 --- a/lustre/mdt/mdt_internal.h +++ b/lustre/mdt/mdt_internal.h @@ -65,10 +65,6 @@ struct mdt_device { * necessary. */ unsigned long mdt_flags; - - /* Seq management related stuff */ - struct lu_seq_mgr *mdt_seq_mgr; - struct dt_device *mdt_bottom; /* * Options bit-fields. 
diff --git a/lustre/mgs/mgs_llog.c b/lustre/mgs/mgs_llog.c index 898bba1..afe61fe 100644 --- a/lustre/mgs/mgs_llog.c +++ b/lustre/mgs/mgs_llog.c @@ -1308,8 +1308,8 @@ static int mgs_write_log_mds(struct obd_device *obd, struct fs_db *fsdb, /* Add the ost info to the client/mdt lov */ static int mgs_write_log_osc_to_lov(struct obd_device *obd, struct fs_db *fsdb, - struct mgs_target_info *mti, - char *logname, char *lovname, int flags) + struct mgs_target_info *mti, + char *logname, char *lovname, int flags) { struct llog_handle *llh = NULL; char *nodeuuid, *oscname, *oscuuid, *lovuuid; diff --git a/lustre/obdclass/llog_swab.c b/lustre/obdclass/llog_swab.c index fd138af..91a331d 100644 --- a/lustre/obdclass/llog_swab.c +++ b/lustre/obdclass/llog_swab.c @@ -93,6 +93,13 @@ void lustre_swab_lu_fid(struct lu_fid *fid) } EXPORT_SYMBOL(lustre_swab_lu_fid); +void lustre_swab_lu_range(struct lu_range *range) +{ + __swab64s (&range->lr_start); + __swab64s (&range->lr_end); +} +EXPORT_SYMBOL(lustre_swab_lu_range); + void lustre_swab_llog_rec(struct llog_rec_hdr *rec, struct llog_rec_tail *tail) { __swab32s(&rec->lrh_len); diff --git a/lustre/obdclass/lprocfs_status.c b/lustre/obdclass/lprocfs_status.c index 0c3aeb8..88488e9 100644 --- a/lustre/obdclass/lprocfs_status.c +++ b/lustre/obdclass/lprocfs_status.c @@ -667,6 +667,8 @@ int lprocfs_alloc_obd_stats(struct obd_device *obd, unsigned num_private_stats) LPROCFS_OBD_OP_INIT(num_private_stats, stats, connect); LPROCFS_OBD_OP_INIT(num_private_stats, stats, reconnect); LPROCFS_OBD_OP_INIT(num_private_stats, stats, disconnect); + LPROCFS_OBD_OP_INIT(num_private_stats, stats, fid_init); + LPROCFS_OBD_OP_INIT(num_private_stats, stats, fid_fini); LPROCFS_OBD_OP_INIT(num_private_stats, stats, fid_alloc); LPROCFS_OBD_OP_INIT(num_private_stats, stats, fid_delete); LPROCFS_OBD_OP_INIT(num_private_stats, stats, statfs); diff --git a/lustre/obdclass/lu_object.c b/lustre/obdclass/lu_object.c index a92c3b1..9844e06 100644 --- 
a/lustre/obdclass/lu_object.c +++ b/lustre/obdclass/lu_object.c @@ -37,6 +37,7 @@ #include #include #include +#include #include #include @@ -275,7 +276,7 @@ static __u32 fid_hash(const struct lu_fid *f) { /* all objects with same id and different versions will belong to same * collisions list. */ - return (fid_seq(f) - 1) * LUSTRE_FID_SEQ_WIDTH + fid_oid(f); + return (fid_seq(f) - 1) * LUSTRE_SEQ_WIDTH + fid_oid(f); } /* diff --git a/lustre/ptlrpc/import.c b/lustre/ptlrpc/import.c index bb9fd86..747c70f 100644 --- a/lustre/ptlrpc/import.c +++ b/lustre/ptlrpc/import.c @@ -564,7 +564,6 @@ finish: RETURN(0); } } else { - struct client_obd *cli = &imp->imp_obd->u.cli; struct obd_connect_data *ocd; struct obd_export *exp; @@ -576,12 +575,6 @@ finish: GOTO(out, rc); } - /* get correct start fid from connect data sent by server. */ - spin_lock(&cli->cl_fid_lock); - cli->cl_fid.f_seq = ocd->ocd_seq; - cli->cl_fid.f_oid = LUSTRE_FID_INIT_OID; - spin_unlock(&cli->cl_fid_lock); - spin_lock_irqsave(&imp->imp_lock, flags); /* diff --git a/lustre/ptlrpc/pack_generic.c b/lustre/ptlrpc/pack_generic.c index d2deec3..b30e12f 100644 --- a/lustre/ptlrpc/pack_generic.c +++ b/lustre/ptlrpc/pack_generic.c @@ -509,11 +509,11 @@ void lustre_swab_connect(struct obd_connect_data *ocd) __swab32s (&ocd->ocd_index); __swab32s (&ocd->ocd_unused); __swab64s (&ocd->ocd_ibits_known); - __swab64s (&ocd->ocd_seq); CLASSERT(offsetof(typeof(*ocd), padding2) != 0); CLASSERT(offsetof(typeof(*ocd), padding3) != 0); CLASSERT(offsetof(typeof(*ocd), padding4) != 0); CLASSERT(offsetof(typeof(*ocd), padding5) != 0); + CLASSERT(offsetof(typeof(*ocd), padding6) != 0); } void lustre_swab_obdo (struct obdo *o)