#define DEBUG_SUBSYSTEM S_MDS
+#include <lustre_fid.h>
#include "cmm_internal.h"
#include "mdc_internal.h"
{
int rc;
/* temporary hack for proto mkdir */
- rc = (unsigned long)fid_seq(fid) / LUSTRE_SEQ_RANGE;
+ rc = (unsigned long)fid_seq(fid) / LUSTRE_SEQ_SUPER_CHUNK;
CWARN("Get MDS %d for sequence: "LPU64"\n", rc, fid_seq(fid));
RETURN(rc);
}
{
int rc;
/* temporary hack for proto mkdir */
- rc = (unsigned long)fid_seq(fid) / LUSTRE_SEQ_RANGE;
+ rc = (unsigned long)fid_seq(fid) / LUSTRE_SEQ_SUPER_CHUNK;
CWARN("Get MDS %d for sequence: "LPU64"\n", rc, fid_seq(fid));
RETURN(rc);
}
MODULES := fid
-fid-objs := fid_seq.o fid_misc.o
+fid-objs := fid_handle.o fid_lib.o
@INCLUDE_RULES@
# This code is issued under the GNU General Public License.
# See the file COPYING in this distribution
+if LIBLUSTRE
+noinst_LIBRARIES = libfid.a
+libfid_a_SOURCES = fid_handle.c fid_lib.c fid_internal.h
+libfid_a_CPPFLAGS = $(LLCPPFLAGS)
+libfid_a_CFLAGS = $(LLCFLAGS)
+endif
+
if MODULES
modulefs_DATA = fid$(KMODEXT)
endif
+install-data-hook: $(install_data_hook)
+
MOSTLYCLEANFILES := @MOSTLYCLEANFILES@
-DIST_SOURCES = $(fid-objs:%.o=%.c)
+DIST_SOURCES = $(fid-objs:%.o=%.c) fid_internal.h
--- /dev/null
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * lustre/fid/fid_handle.c
+ * Lustre Sequence Manager
+ *
+ * Copyright (c) 2006 Cluster File Systems, Inc.
+ * Author: Yury Umanets <umka@clusterfs.com>
+ *
+ * This file is part of the Lustre file system, http://www.lustre.org
+ * Lustre is a trademark of Cluster File Systems, Inc.
+ *
+ * You may have signed or agreed to another license before downloading
+ * this software. If so, you are bound by the terms and conditions
+ * of that agreement, and the following does not apply to you. See the
+ * LICENSE file included with this distribution for more information.
+ *
+ * If you did not agree to a different license, then this copy of Lustre
+ * is open source software; you can redistribute it and/or modify it
+ * under the terms of version 2 of the GNU General Public License as
+ * published by the Free Software Foundation.
+ *
+ * In either case, Lustre is distributed in the hope that it will be
+ * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
+ * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * license text for more details.
+ */
+
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+#define DEBUG_SUBSYSTEM S_FID
+
+#ifdef __KERNEL__
+# include <libcfs/libcfs.h>
+# include <linux/module.h>
+#else /* __KERNEL__ */
+# include <liblustre.h>
+#endif
+
+#include <obd.h>
+#include <obd_class.h>
+#include <dt_object.h>
+#include <md_object.h>
+#include <obd_support.h>
+#include <lustre_fid.h>
+#include "fid_internal.h"
+
+/* client seq mgr interface */
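+/* Both super- and meta-sequence allocations travel over a single SEQ_QUERY
+ * RPC: request buffer 0 holds the opcode (SEQ_ALLOC_SUPER or SEQ_ALLOC_META),
+ * buffer 1 the current range; the server packs the newly allocated range
+ * into reply buffer 0. */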
+static int
+seq_client_alloc_common(struct lu_client_seq *seq,
+ struct lu_range *seq_ran,
+ __u32 seq_op)
+{
+ __u32 *op;
+ struct lu_range *range;
+ struct ptlrpc_request *req;
+ int ran_size = sizeof(*range);
+ int rc, size[] = {sizeof(*op), ran_size};
+ int repsize[] = {ran_size};
+ ENTRY;
+
+ req = ptlrpc_prep_req(class_exp2cliimp(seq->seq_exp),
+ LUSTRE_MDS_VERSION, SEQ_QUERY,
+ 2, size, NULL);
+ if (req == NULL)
+ RETURN(-ENOMEM);
+
+ op = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*op));
+ *op = seq_op;
+
+ range = lustre_msg_buf(req->rq_reqmsg, 1, ran_size);
+ *range = *seq_ran;
+
+ req->rq_replen = lustre_msg_size(1, repsize);
+ req->rq_request_portal = MDS_SEQ_PORTAL;
+ rc = ptlrpc_queue_wait(req);
+ if (rc)
+ GOTO(out_req, rc);
+
+ range = lustre_swab_repbuf(req, 0, sizeof(*range),
+ lustre_swab_lu_range);
+
+ LASSERT(range != NULL);
+ *seq_ran = *range;
+out_req:
+ ptlrpc_req_finished(req);
+ RETURN(rc);
+}
+
+/* request sequence-controller node to allocate new super-sequence. */
+int
+seq_client_alloc_super(struct lu_client_seq *seq)
+{
+ int rc;
+ ENTRY;
+
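+	/* only the server-side flavour of the client seq manager (an MDT
+	 * talking to the sequence controller) may request super-sequences. */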
+ LASSERT(seq->seq_flags & LUSTRE_CLI_SEQ_SERVER);
+ rc = seq_client_alloc_common(seq, &seq->seq_cl_range,
+ SEQ_ALLOC_SUPER);
+ if (rc == 0) {
+ CWARN("SEQ-MGR(cli): allocated super-sequence "
+ "["LPX64"-"LPX64"]\n", seq->seq_cl_range.lr_start,
+ seq->seq_cl_range.lr_end);
+ }
+ RETURN(rc);
+}
+EXPORT_SYMBOL(seq_client_alloc_super);
+
+/* request sequence-controller node to allocate new meta-sequence. */
+int
+seq_client_alloc_meta(struct lu_client_seq *seq)
+{
+ int rc;
+ ENTRY;
+
+ LASSERT(seq->seq_flags & LUSTRE_CLI_SEQ_CLIENT);
+ rc = seq_client_alloc_common(seq, &seq->seq_cl_range,
+ SEQ_ALLOC_META);
+ if (rc == 0) {
+ CWARN("SEQ-MGR(cli): allocated meta-sequence "
+ "["LPX64"-"LPX64"]\n", seq->seq_cl_range.lr_start,
+ seq->seq_cl_range.lr_end);
+ }
+ RETURN(rc);
+}
+EXPORT_SYMBOL(seq_client_alloc_meta);
+
+/* allocate new sequence for client (llite or MDC are expected to use this) */
+int
+seq_client_alloc_seq(struct lu_client_seq *seq, __u64 *seqnr)
+{
+ int rc;
+ ENTRY;
+
+ down(&seq->seq_sem);
+
+ LASSERT(seq->seq_flags & LUSTRE_CLI_SEQ_CLIENT);
+ LASSERT(range_is_sane(&seq->seq_cl_range));
+
+	/* if we still have free sequences in the meta-sequence, allocate a
+	 * new seq from the given range. */
+ if (seq->seq_cl_range.lr_end > seq->seq_cl_range.lr_start) {
+ *seqnr = seq->seq_cl_range.lr_start;
+ seq->seq_cl_range.lr_start += 1;
+ rc = 0;
+	} else {
+		/* meta-sequence is exhausted, request the MDT to allocate a
+		 * new meta-sequence for us. */
+		rc = seq_client_alloc_meta(seq);
+		if (rc) {
+			CERROR("can't allocate new meta-sequence, "
+			       "rc %d\n", rc);
+		} else {
+			/* consume a sequence only from the successfully
+			 * refreshed range. */
+			*seqnr = seq->seq_cl_range.lr_start;
+			seq->seq_cl_range.lr_start += 1;
+		}
+	}
+ up(&seq->seq_sem);
+
+ if (rc == 0) {
+ CWARN("SEQ-MGR(cli): allocated sequence "
+ "["LPX64"]\n", *seqnr);
+ }
+ RETURN(rc);
+}
+EXPORT_SYMBOL(seq_client_alloc_seq);
+
+int
+seq_client_alloc_fid(struct lu_client_seq *seq, struct lu_fid *fid)
+{
+ int rc;
+ ENTRY;
+
+ LASSERT(fid != NULL);
+ LASSERT(fid_is_sane(&seq->seq_fid));
+ LASSERT(seq->seq_flags & LUSTRE_CLI_SEQ_CLIENT);
+
+ down(&seq->seq_sem);
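+	/* FIDs are handed out by bumping f_oid within the current sequence;
+	 * once f_oid reaches LUSTRE_SEQ_WIDTH the sequence is exhausted and
+	 * a fresh one is allocated below. */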
+ if (fid_oid(&seq->seq_fid) < LUSTRE_SEQ_WIDTH) {
+ *fid = seq->seq_fid;
+ seq->seq_fid.f_oid += 1;
+ rc = 0;
+ } else {
+ __u64 seqnr = 0;
+
+ rc = seq_client_alloc_seq(seq, &seqnr);
+ if (rc) {
+ CERROR("can't allocate new sequence, "
+ "rc %d\n", rc);
+ GOTO(out, rc);
+ } else {
+ seq->seq_fid.f_oid = LUSTRE_FID_INIT_OID;
+ seq->seq_fid.f_seq = seqnr;
+ seq->seq_fid.f_ver = 0;
+
+ *fid = seq->seq_fid;
+ seq->seq_fid.f_oid += 1;
+ }
+ }
+ LASSERT(fid_is_sane(fid));
+
+ CWARN("SEQ-MGR(cli): allocated FID "DFID3"\n",
+ PFID3(fid));
+
+ EXIT;
+out:
+ up(&seq->seq_sem);
+ return rc;
+}
+EXPORT_SYMBOL(seq_client_alloc_fid);
+
+int
+seq_client_init(struct lu_client_seq *seq,
+ struct obd_export *exp,
+ int flags)
+{
+ int rc;
+ ENTRY;
+
+ LASSERT(flags & (LUSTRE_CLI_SEQ_CLIENT |
+ LUSTRE_CLI_SEQ_SERVER));
+
+ seq->seq_flags = flags;
+ fid_zero(&seq->seq_fid);
+ sema_init(&seq->seq_sem, 1);
+
+ seq->seq_cl_range.lr_end = 0;
+ seq->seq_cl_range.lr_start = 0;
+
+ if (exp != NULL)
+ seq->seq_exp = class_export_get(exp);
+
+ if (seq->seq_flags & LUSTRE_CLI_SEQ_CLIENT) {
+ __u64 seqnr = 0;
+
+		/* client (llite or MDC) init case, we need a new sequence
+		 * from the MDT. This allocates a new meta-sequence first,
+		 * because the seq range is in its initial state and looks
+		 * the same as an exhausted one. */
+ rc = seq_client_alloc_seq(seq, &seqnr);
+ if (rc) {
+ CERROR("can't allocate new sequence, rc %d\n", rc);
+ GOTO(out, rc);
+ } else {
+ seq->seq_fid.f_oid = LUSTRE_FID_INIT_OID;
+ seq->seq_fid.f_seq = seqnr;
+ seq->seq_fid.f_ver = 0;
+ }
+
+ LASSERT(fid_is_sane(&seq->seq_fid));
+ } else {
+		/* check if the controller node is trying to init its client. */
+ if (seq->seq_exp) {
+ /* MDT uses client seq manager to talk to sequence
+ * controller, and thus, we need super-sequence. */
+ rc = seq_client_alloc_super(seq);
+ } else {
+ rc = 0;
+ }
+ }
+
+ EXIT;
+out:
+ if (rc)
+ seq_client_fini(seq);
+ else
+ CWARN("Client Sequence Manager initialized\n");
+ return rc;
+}
+EXPORT_SYMBOL(seq_client_init);
+
+void seq_client_fini(struct lu_client_seq *seq)
+{
+ ENTRY;
+ if (seq->seq_exp != NULL) {
+ class_export_put(seq->seq_exp);
+ seq->seq_exp = NULL;
+ }
+ CWARN("Client Sequence Manager finalized\n");
+ EXIT;
+}
+EXPORT_SYMBOL(seq_client_fini);
+
+#ifdef __KERNEL__
+/* server side seq mgr stuff */
+static const struct lu_range LUSTRE_SEQ_SUPER_INIT = {
+ LUSTRE_SEQ_SPACE_START,
+ LUSTRE_SEQ_SPACE_LIMIT
+};
+
+static const struct lu_range LUSTRE_SEQ_META_INIT = {
+ 0,
+ 0
+};
+
+static int
+seq_server_write_state(struct lu_server_seq *seq,
+ const struct lu_context *ctx)
+{
+ int rc = 0;
+ ENTRY;
+
+	/* XXX: struct dt_device methods should be called here to write the
+	 * sequence state to the backing store. */
+
+ RETURN(rc);
+}
+
+static int
+seq_server_read_state(struct lu_server_seq *seq,
+ const struct lu_context *ctx)
+{
+ int rc = -ENODATA;
+ ENTRY;
+
+	/* XXX: struct dt_device methods should be called here to read the
+	 * sequence state from the backing store. */
+
+ RETURN(rc);
+}
+
+static int
+seq_server_alloc_super(struct lu_server_seq *seq,
+ struct lu_range *range)
+{
+ struct lu_range *ss_range = &seq->seq_ss_range;
+ int rc;
+ ENTRY;
+
+	/* the exhausted case has to be checked first, otherwise an empty
+	 * range would fall into the near-exhaustion branch below. */
+	if (ss_range->lr_start >= ss_range->lr_end) {
+		CERROR("super-sequence is exhausted\n");
+		rc = -ENOSPC;
+	} else if (ss_range->lr_end - ss_range->lr_start < LUSTRE_SEQ_SUPER_CHUNK) {
+		CWARN("super-sequence is about to be exhausted. "
+		      "Only "LPU64" sequences can still be allocated\n",
+		      ss_range->lr_end - ss_range->lr_start);
+		*range = *ss_range;
+		ss_range->lr_start = ss_range->lr_end;
+		rc = 0;
+	} else {
+		range->lr_start = ss_range->lr_start;
+		ss_range->lr_start += LUSTRE_SEQ_SUPER_CHUNK;
+		range->lr_end = ss_range->lr_start;
+		rc = 0;
+	}
+
+ if (rc == 0) {
+ CWARN("SEQ-MGR(srv): allocated super-sequence "
+ "["LPX64"-"LPX64"]\n", range->lr_start,
+ range->lr_end);
+ }
+
+ RETURN(rc);
+}
+
+static int
+seq_server_alloc_meta(struct lu_server_seq *seq,
+ struct lu_range *range)
+{
+ struct lu_range *ms_range = &seq->seq_ms_range;
+ int rc;
+ ENTRY;
+
+ LASSERT(range_is_sane(ms_range));
+
+	/* XXX: cascading RPCs should be avoided here by some kind of async
+	 * preallocation when the meta-sequence is close to exhaustion. */
+	if (ms_range->lr_start == ms_range->lr_end) {
+		if (seq->seq_flags & LUSTRE_SRV_SEQ_CONTROLLER) {
+			/* allocate a new range of meta-sequences to hand
+			 * new meta-sequences out from. */
+			rc = seq_server_alloc_super(seq, ms_range);
+			if (rc) {
+				CERROR("can't allocate new super-sequence, "
+				       "rc %d\n", rc);
+				RETURN(rc);
+			}
+		} else {
+			/* request the controller to allocate a new
+			 * super-sequence for us. */
+			rc = seq_client_alloc_super(seq->seq_cli);
+			if (rc) {
+				CERROR("can't allocate new super-sequence, "
+				       "rc %d\n", rc);
+				RETURN(rc);
+			}
+
+			/* save the new range into the allocation space. */
+			*ms_range = seq->seq_cli->seq_cl_range;
+		}
+
+		LASSERT(ms_range->lr_start != 0);
+		LASSERT(ms_range->lr_end > ms_range->lr_start);
+	} else {
+		rc = 0;
+	}
+	range->lr_start = ms_range->lr_start;
+	ms_range->lr_start += LUSTRE_SEQ_META_CHUNK;
+	range->lr_end = ms_range->lr_start;
+
+ if (rc == 0) {
+ CWARN("SEQ-MGR(srv): allocated meta-sequence "
+ "["LPX64"-"LPX64"]\n", range->lr_start,
+ range->lr_end);
+ }
+
+ RETURN(rc);
+}
+
+static int
+seq_server_handle(struct lu_server_seq *seq,
+ const struct lu_context *ctx,
+ struct lu_range *range,
+ __u32 opc)
+{
+ int rc;
+ ENTRY;
+
+ down(&seq->seq_sem);
+
+ switch (opc) {
+ case SEQ_ALLOC_SUPER:
+ rc = seq_server_alloc_super(seq, range);
+ break;
+ case SEQ_ALLOC_META:
+ rc = seq_server_alloc_meta(seq, range);
+ break;
+ default:
+ rc = -EINVAL;
+ break;
+ }
+
+ if (rc)
+ GOTO(out, rc);
+
+ rc = seq_server_write_state(seq, ctx);
+ if (rc) {
+ CERROR("can't save state, rc = %d\n",
+ rc);
+ }
+
+ EXIT;
+out:
+ up(&seq->seq_sem);
+ return rc;
+}
+
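+/* server-side body of a single SEQ_QUERY request: the unpacked opcode
+ * selects between super- and meta-sequence allocation in
+ * seq_server_handle(). */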
+static int
+seq_req_handle0(const struct lu_context *ctx,
+ struct lu_server_seq *seq,
+ struct ptlrpc_request *req)
+{
+ struct lu_range *in;
+ struct lu_range *out;
+ int size = sizeof(*in);
+ __u32 *opt;
+ int rc;
+ ENTRY;
+
+ rc = lustre_pack_reply(req, 1, &size, NULL);
+ if (rc)
+ RETURN(rc);
+
+ rc = -EPROTO;
+ opt = lustre_swab_reqbuf(req, 0, sizeof(*opt),
+ lustre_swab_generic_32s);
+ if (opt != NULL) {
+ in = lustre_swab_reqbuf(req, 1, sizeof(*in),
+ lustre_swab_lu_range);
+ if (in != NULL) {
+ out = lustre_msg_buf(req->rq_repmsg,
+ 0, sizeof(*out));
+ LASSERT(out != NULL);
+
+ *out = *in;
+ rc = seq_server_handle(seq, ctx, out, *opt);
+ } else {
+ CERROR("Cannot unpack seq range\n");
+ }
+ } else {
+ CERROR("Cannot unpack option\n");
+ }
+ RETURN(rc);
+}
+
+static int
+seq_req_handle(struct ptlrpc_request *req)
+{
+ int fail = OBD_FAIL_SEQ_ALL_REPLY_NET;
+ const struct lu_context *ctx;
+ struct lu_site *site;
+ int rc = -EPROTO;
+ ENTRY;
+
+ OBD_FAIL_RETURN(OBD_FAIL_SEQ_ALL_REPLY_NET | OBD_FAIL_ONCE, 0);
+
+ ctx = req->rq_svc_thread->t_ctx;
+ LASSERT(ctx != NULL);
+ LASSERT(ctx->lc_thread == req->rq_svc_thread);
+ if (req->rq_reqmsg->opc == SEQ_QUERY) {
+ if (req->rq_export != NULL) {
+ struct obd_device *obd;
+
+ obd = req->rq_export->exp_obd;
+ site = obd->obd_lu_dev->ld_site;
+ LASSERT(site != NULL);
+
+ rc = seq_req_handle0(ctx, site->ls_server_seq, req);
+ } else {
+ CERROR("Unconnected request\n");
+ req->rq_status = -ENOTCONN;
+ GOTO(out, rc = -ENOTCONN);
+ }
+ } else {
+ CERROR("Wrong opcode: %d\n",
+ req->rq_reqmsg->opc);
+ req->rq_status = -ENOTSUPP;
+ rc = ptlrpc_error(req);
+ RETURN(rc);
+ }
+
+ EXIT;
+out:
+ target_send_reply(req, rc, fail);
+ return 0;
+}
+
+int
+seq_server_init(struct lu_server_seq *seq,
+ struct lu_client_seq *cli,
+ const struct lu_context *ctx,
+ struct dt_device *dev,
+ int flags)
+{
+	struct ptlrpc_service_conf seq_conf = {
+		.psc_nbufs = MDS_NBUFS,
+		.psc_bufsize = MDS_BUFSIZE,
+		.psc_max_req_size = MDS_MAXREQSIZE,
+		.psc_max_reply_size = MDS_MAXREPSIZE,
+		.psc_req_portal = MDS_SEQ_PORTAL,
+		.psc_rep_portal = MDC_REPLY_PORTAL,
+		.psc_watchdog_timeout = SEQ_SERVICE_WATCHDOG_TIMEOUT,
+		.psc_num_threads = SEQ_NUM_THREADS
+	};
+	int rc;
+	ENTRY;
+
+ LASSERT(dev != NULL);
+ LASSERT(cli != NULL);
+
+ LASSERT(flags & (LUSTRE_SRV_SEQ_CONTROLLER |
+ LUSTRE_SRV_SEQ_REGULAR));
+
+ seq->seq_dev = dev;
+ seq->seq_cli = cli;
+ seq->seq_flags = flags;
+ sema_init(&seq->seq_sem, 1);
+
+ lu_device_get(&seq->seq_dev->dd_lu_dev);
+
+	/* read the saved sequence state from the backing store. */
+ rc = seq_server_read_state(seq, ctx);
+ if (rc == -ENODATA) {
+ /* first run, no state on disk, init all seqs */
+ if (seq->seq_flags & LUSTRE_SRV_SEQ_CONTROLLER) {
+ /* init super seq by start values on sequence-controller
+ * node.*/
+ seq->seq_ss_range = LUSTRE_SEQ_SUPER_INIT;
+ } else {
+ /* take super-seq from client seq mgr */
+ LASSERT(range_is_sane(&cli->seq_cl_range));
+ seq->seq_ss_range = cli->seq_cl_range;
+ }
+
+ /* init meta-sequence by start values and get ready for
+ * allocating it for clients. */
+ seq->seq_ms_range = LUSTRE_SEQ_META_INIT;
+
+ /* save init seq to backing store. */
+ rc = seq_server_write_state(seq, ctx);
+ if (rc) {
+ CERROR("can't write sequence state, "
+ "rc = %d\n", rc);
+ GOTO(out, rc);
+ }
+ } else if (rc) {
+ CERROR("can't read sequence state, rc = %d\n",
+ rc);
+ GOTO(out, rc);
+ }
+
+ seq->seq_service = ptlrpc_init_svc_conf(&seq_conf,
+ seq_req_handle,
+ LUSTRE_SEQ0_NAME,
+ seq->seq_proc_entry,
+ NULL);
+ if (seq->seq_service != NULL)
+ rc = ptlrpc_start_threads(NULL, seq->seq_service,
+ LUSTRE_SEQ0_NAME);
+ else
+ rc = -ENOMEM;
+
+ EXIT;
+
+out:
+ if (rc)
+ seq_server_fini(seq, ctx);
+ else
+ CWARN("Server Sequence Manager initialized\n");
+ return rc;
+}
+EXPORT_SYMBOL(seq_server_init);
+
+void
+seq_server_fini(struct lu_server_seq *seq,
+ const struct lu_context *ctx)
+{
+ int rc;
+
+ if (seq->seq_service != NULL) {
+ ptlrpc_unregister_service(seq->seq_service);
+ seq->seq_service = NULL;
+ }
+
+ if (seq->seq_dev != NULL) {
+ rc = seq_server_write_state(seq, ctx);
+ if (rc) {
+ CERROR("can't save sequence state, "
+ "rc = %d\n", rc);
+ }
+ lu_device_put(&seq->seq_dev->dd_lu_dev);
+ seq->seq_dev = NULL;
+ }
+
+ CWARN("Server Sequence Manager finalized\n");
+}
+EXPORT_SYMBOL(seq_server_fini);
+
+static int fid_init(void)
+{
+ ENTRY;
+ CWARN("Lustre Sequence Manager\n");
+ RETURN(0);
+}
+
+static int fid_fini(void)
+{
+ ENTRY;
+ RETURN(0);
+}
+
+static int
+__init fid_mod_init(void)
+{
+	/* init caches if any */
+	return fid_init();
+}
+
+static void
+__exit fid_mod_exit(void)
+{
+ /* free caches if any */
+ fid_fini();
+ return;
+}
+
+MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
+MODULE_DESCRIPTION("Lustre FID Module");
+MODULE_LICENSE("GPL");
+
+cfs_module(fid, "0.0.3", fid_mod_init, fid_mod_exit);
+#endif
--- /dev/null
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * fid/fid_internal.h
+ *
+ * Copyright (C) 2006 Cluster File Systems, Inc.
+ *
+ * This file is part of the Lustre file system, http://www.lustre.org
+ * Lustre is a trademark of Cluster File Systems, Inc.
+ *
+ * You may have signed or agreed to another license before downloading
+ * this software. If so, you are bound by the terms and conditions
+ * of that agreement, and the following does not apply to you. See the
+ * LICENSE file included with this distribution for more information.
+ *
+ * If you did not agree to a different license, then this copy of Lustre
+ * is open source software; you can redistribute it and/or modify it
+ * under the terms of version 2 of the GNU General Public License as
+ * published by the Free Software Foundation.
+ *
+ * In either case, Lustre is distributed in the hope that it will be
+ * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
+ * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * license text for more details.
+ */
+#ifndef _FID_INTERNAL_H
+#define _FID_INTERNAL_H
+
+#define SEQ_SERVICE_WATCHDOG_TIMEOUT (obd_timeout * 1000)
+
+#endif
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
* vim:expandtab:shiftwidth=8:tabstop=8:
*
- * lustre/fid/fid_misc.c
+ * lustre/fid/fid_lib.c
* Miscellaneous fid functions.
*
* Copyright (c) 2006 Cluster File Systems, Inc.
* license text for more details.
*/
-#include <linux/module.h>
-#include <lustre/lustre_idl.h>
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+#define DEBUG_SUBSYSTEM S_FID
+
+#ifdef __KERNEL__
+# include <libcfs/libcfs.h>
+# include <linux/module.h>
+#else /* __KERNEL__ */
+# include <liblustre.h>
+#endif
-#include <obd.h>
#include <lustre_fid.h>
void fid_to_le(struct lu_fid *dst, const struct lu_fid *src)
+++ /dev/null
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
- * lustre/fid/fid_seq.c
- * Lustre File Id (fid)
- *
- * Copyright (c) 2006 Cluster File Systems, Inc.
- * Author: Yury Umanets <umka@clusterfs.com>
- *
- * This file is part of the Lustre file system, http://www.lustre.org
- * Lustre is a trademark of Cluster File Systems, Inc.
- *
- * You may have signed or agreed to another license before downloading
- * this software. If so, you are bound by the terms and conditions
- * of that agreement, and the following does not apply to you. See the
- * LICENSE file included with this distribution for more information.
- *
- * If you did not agree to a different license, then this copy of Lustre
- * is open source software; you can redistribute it and/or modify it
- * under the terms of version 2 of the GNU General Public License as
- * published by the Free Software Foundation.
- *
- * In either case, Lustre is distributed in the hope that it will be
- * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
- * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * license text for more details.
- */
-
-#include <linux/module.h>
-
-#include <lustre/lustre_idl.h>
-#include <obd.h>
-#include <lustre_fid.h>
-
-/* sequence manager initialization/finalization stuff */
-struct lu_seq_mgr *seq_mgr_init(struct lu_seq_mgr_ops *ops,
- void *opaque)
-{
- struct lu_seq_mgr *mgr;
- ENTRY;
-
- OBD_ALLOC_PTR(mgr);
- if (!mgr)
- RETURN(NULL);
-
- sema_init(&mgr->m_seq_sem, 1);
- mgr->m_opaque = opaque;
- mgr->m_ops = ops;
-
- RETURN(mgr);
-}
-EXPORT_SYMBOL(seq_mgr_init);
-
-void seq_mgr_fini(struct lu_seq_mgr *mgr)
-{
- OBD_FREE_PTR(mgr);
-}
-EXPORT_SYMBOL(seq_mgr_fini);
-
-int seq_mgr_write(const struct lu_context *ctx, struct lu_seq_mgr *mgr)
-{
- ENTRY;
- RETURN(mgr->m_ops->smo_write(ctx, mgr->m_opaque, &mgr->m_seq));
-}
-EXPORT_SYMBOL(seq_mgr_write);
-
-int seq_mgr_read(const struct lu_context *ctx, struct lu_seq_mgr *mgr)
-{
- ENTRY;
- RETURN(mgr->m_ops->smo_read(ctx, mgr->m_opaque, &mgr->m_seq));
-}
-EXPORT_SYMBOL(seq_mgr_read);
-
-/* manager functionality stuff */
-int seq_mgr_alloc(const struct lu_context *ctx,
- struct lu_seq_mgr *mgr,
- __u64 *seq)
-{
- int rc;
- ENTRY;
-
- LASSERT(mgr != NULL);
- LASSERT(seq != NULL);
-
- down(&mgr->m_seq_sem);
- if (mgr->m_seq > mgr->m_seq_last) {
- /* new range of seqs should be got from master */
- rc = -EOPNOTSUPP;
- } else {
- *seq = mgr->m_seq;
- mgr->m_seq++;
-
- rc = seq_mgr_write(ctx, mgr);
- }
- up(&mgr->m_seq_sem);
- RETURN(rc);
-}
-EXPORT_SYMBOL(seq_mgr_alloc);
-
-/* initialize meta-sequence. First of all try to get it from lower layer,
- * falling down to back store one. In the case this is first run and there is
- * not meta-sequence initialized yet - store it to backstore. */
-int seq_mgr_setup(const struct lu_context *ctx, struct lu_seq_mgr *mgr)
-{
- int rc = 0;
- ENTRY;
-
- /* set seq range */
- mgr->m_seq_last = mgr->m_seq + LUSTRE_SEQ_RANGE;
-
- /* allocate next seq after root one */
- mgr->m_seq += LUSTRE_ROOT_FID_SEQ + 1;
-
- rc = seq_mgr_read(ctx, mgr);
- if (rc == -ENODATA) {
- CWARN("initialize sequence by defaut ["LPU64"]\n", mgr->m_seq);
-
- /* initialize new sequence config as it is not yet created. */
- rc = seq_mgr_write(ctx, mgr);
- }
-
- EXIT;
- if (rc == 0)
- CWARN("using start sequence: ["LPU64"]\n", mgr->m_seq);
- return rc;
-}
-EXPORT_SYMBOL(seq_mgr_setup);
-
-static int __init fid_mod_init(void)
-{
- /* some stuff will be here (cache initializing, etc.) */
- return 0;
-}
-
-static void __exit fid_mod_exit(void)
-{
- /* some stuff will be here */
-}
-
-MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
-MODULE_DESCRIPTION("Lustre FID Module");
-MODULE_LICENSE("GPL");
-
-cfs_module(fid, "0.0.2", fid_mod_init, fid_mod_exit);
spinlock_t fld_lock;
int fld_hash_mask;
};
+
/*XXX use linked list temp for fld in this prototype*/
struct fld_list {
struct list_head fld_list;
spinlock_t fld_lock;
};
-struct fld_item{
+
+struct fld_item {
struct list_head fld_list;
__u64 fld_seq;
__u64 fld_mds;
*/
struct lu_fld *ls_fld;
+ /*
+ * Server Seq Manager
+ */
+ struct lu_server_seq *ls_server_seq;
+
+ /*
+	 * Client Seq Manager
+ */
+ struct lu_client_seq *ls_client_seq;
+
+ /* sequence controller node */
+ struct obd_export *ls_controller;
+
/* statistical counters. Protected by nothing, races are accepted. */
struct {
__u32 s_created;
#define MGS_REPLY_PORTAL 27
#define OST_REQUEST_PORTAL 28
#define MDS_FLD_PORTAL 29
+#define MDS_SEQ_PORTAL 30
#define SVC_KILLED 1
#define SVC_EVENT 2
#define LUSTRE_LOG_VERSION 0x00050000
#define LUSTRE_MGS_VERSION 0x00060000
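+/* range of sequences, half-open: lr_start is the first sequence available
+ * for allocation, lr_end the first one past the range. */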
+struct lu_range {
+ __u64 lr_start;
+ __u64 lr_end;
+};
+
+static inline int range_is_sane(const struct lu_range *r)
+{
+	return (r->lr_end >= r->lr_start);
+}
+
struct lu_fid {
__u64 f_seq; /* holds fid sequence. Lustre should support 2 ^ 64
* objects, thus even if one sequence has one object we
LUSTRE_ROOT_FID_SEQ = 1ULL, /* XXX: should go into mkfs. */
LUSTRE_ROOT_FID_OID = 2UL, /* XXX: should go into mkfs. */
- /* maximal objects in sequence */
- LUSTRE_FID_SEQ_WIDTH = 10000,
-
- /* range of seqs for one MDS */
- LUSTRE_SEQ_RANGE = 1000,
-
/* initial fid id value */
LUSTRE_FID_INIT_OID = 1UL
};
return seq != 0;
}
+static inline void fid_zero(struct lu_fid *fid)
+{
+ memset(fid, 0, sizeof(*fid));
+}
+
static inline int fid_is_sane(const struct lu_fid *fid)
{
return
fid_oid(fid), \
fid_ver(fid)
-extern void lustre_swab_lu_fid (struct lu_fid *fid);
+extern void lustre_swab_lu_fid(struct lu_fid *fid);
+extern void lustre_swab_lu_range(struct lu_range *range);
static inline int lu_fid_eq(const struct lu_fid *f0,
const struct lu_fid *f1)
__u32 ocd_index; /* LOV index to connect to */
__u32 ocd_unused;
__u64 ocd_ibits_known; /* inode bits this client understands */
- __u64 ocd_seq; /* sequence info for client */
__u64 padding2; /* also fix lustre_swab_connect */
__u64 padding3; /* also fix lustre_swab_connect */
__u64 padding4; /* also fix lustre_swab_connect */
__u64 padding5; /* also fix lustre_swab_connect */
+ __u64 padding6; /* also fix lustre_swab_connect */
};
extern void lustre_swab_connect(struct obd_connect_data *ocd);
FLD_FIRST_OPC = FLD_QUERY
};
+enum seq_rpc_opc {
+ SEQ_QUERY = 700,
+ SEQ_LAST_OPC,
+ SEQ_FIRST_OPC = SEQ_QUERY
+};
+
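+/* SEQ_ALLOC_SUPER is issued by servers towards the sequence controller,
+ * SEQ_ALLOC_META by clients towards their MDT. */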
+enum seq_op {
+ SEQ_ALLOC_SUPER = 0,
+ SEQ_ALLOC_META = 1
+};
+
/*
* LOV data structures
*/
#include <libcfs/list.h>
#include <libcfs/kp30.h>
+struct lu_site;
struct lu_context;
-struct lu_seq_mgr_ops {
- int (*smo_read) (const struct lu_context *, void *opaque, __u64 *);
- int (*smo_write) (const struct lu_context *, void *opaque, __u64 *);
+
+/* start seq number; lower sequences (e.g. LUSTRE_ROOT_FID_SEQ) lie outside
+ * normal allocation. */
+#define LUSTRE_SEQ_SPACE_START 0x400
+
+/* maximal possible seq number */
+#define LUSTRE_SEQ_SPACE_LIMIT ((__u64)~0ULL)
+
+/* this is how many FIDs may be allocated in one sequence. */
+#define LUSTRE_SEQ_WIDTH 0x0000000000002800
+
+/* how many sequences may be allocated for one meta-sequence (10240
+ * sequences). */
+#define LUSTRE_SEQ_META_CHUNK 0x0000000000002800
+
+/* how many sequences may be allocated for one super-sequence (10240 * 10240
+ * sequences), which means that one super-sequence allocation yields 10240
+ * meta-sequences, each of which may hold 10240 sequences. */
+#define LUSTRE_SEQ_SUPER_CHUNK (LUSTRE_SEQ_META_CHUNK * LUSTRE_SEQ_META_CHUNK)
+
+/* client sequence manager interface */
+struct lu_client_seq {
+ /* sequence-controller export. */
+ struct obd_export *seq_exp;
+ struct semaphore seq_sem;
+
+	/* different flags: LUSTRE_CLI_SEQ_CLIENT or LUSTRE_CLI_SEQ_SERVER. */
+ int seq_flags;
+
+	/* range of sequences allowed for allocation. When lu_client_seq is
+	 * used on clients this holds the meta-sequence range; on servers it
+	 * holds the super-sequence range. */
+ struct lu_range seq_cl_range;
+
+ /* seq related proc */
+ struct proc_dir_entry *seq_proc_entry;
+
+ /* this holds last allocated fid in last obtained seq */
+ struct lu_fid seq_fid;
};
-struct lu_seq_mgr {
- /* seq management fields */
- struct semaphore m_seq_sem;
- /* each MDS has own range of seqs ended with this value
- * if it is overflowed the new one should be got from master node */
- __u64 m_seq_last;
- /* last allocated seq */
- __u64 m_seq;
- /* ops related stuff */
- void *m_opaque;
- struct lu_seq_mgr_ops *m_ops;
+#ifdef __KERNEL__
+/* server sequence manager interface */
+struct lu_server_seq {
+ /* super-sequence range, all super-sequences for other servers are
+ * allocated from it. */
+ struct lu_range seq_ss_range;
+
+ /* meta-sequence range, all meta-sequences for clients are allocated
+ * from it. */
+ struct lu_range seq_ms_range;
+
+ /* device for server side seq manager needs (saving sequences to backing
+ * store). */
+ struct dt_device *seq_dev;
+
+	/* different flags: LUSTRE_SRV_SEQ_CONTROLLER, etc. */
+ int seq_flags;
+
+ /* seq related proc */
+ struct proc_dir_entry *seq_proc_entry;
+
+ /* server side seq service */
+ struct ptlrpc_service *seq_service;
+
+	/* client interface to request controller */
+ struct lu_client_seq *seq_cli;
+
+ /* semaphore for protecting allocation */
+ struct semaphore seq_sem;
};
+#endif
-/* init/fini methods */
-struct lu_seq_mgr *seq_mgr_init(struct lu_seq_mgr_ops *, void *);
-void seq_mgr_fini(struct lu_seq_mgr *);
-
-/* seq management methods */
-int seq_mgr_setup(const struct lu_context *, struct lu_seq_mgr *);
-int seq_mgr_read(const struct lu_context *, struct lu_seq_mgr *);
-int seq_mgr_write(const struct lu_context *, struct lu_seq_mgr *);
-int seq_mgr_alloc(const struct lu_context *, struct lu_seq_mgr *, __u64 *);
-int seq_mgr_range_alloc(const struct lu_context *,
- struct lu_seq_mgr *, __u64 *);
-struct lu_site;
-#if 0
-int fid_is_local(struct lu_site *site, const struct lu_fid *fid);
-#else
-static inline int fid_is_local(struct lu_site *site, const struct lu_fid *fid)
+/* client seq mgr flags */
+#define LUSTRE_CLI_SEQ_CLIENT (1 << 0)
+#define LUSTRE_CLI_SEQ_SERVER (1 << 1)
+
+#ifdef __KERNEL__
+/* server seq mgr flags */
+#define LUSTRE_SRV_SEQ_CONTROLLER (1 << 0)
+#define LUSTRE_SRV_SEQ_REGULAR (1 << 1)
+
+int seq_server_init(struct lu_server_seq *seq,
+ struct lu_client_seq *cli,
+ const struct lu_context *ctx,
+ struct dt_device *dev,
+ int flags);
+
+void seq_server_fini(struct lu_server_seq *seq,
+		     const struct lu_context *ctx);
+#endif
+
+int seq_client_init(struct lu_client_seq *seq,
+ struct obd_export *exp,
+ int flags);
+
+void seq_client_fini(struct lu_client_seq *seq);
+
+int seq_client_alloc_super(struct lu_client_seq *seq);
+int seq_client_alloc_meta(struct lu_client_seq *seq);
+
+int seq_client_alloc_seq(struct lu_client_seq *seq,
+ __u64 *seqnr);
+int seq_client_alloc_fid(struct lu_client_seq *seq,
+ struct lu_fid *fid);
+
+/* Fids common stuff */
+static inline int fid_is_local(struct lu_site *site,
+ const struct lu_fid *fid)
{
+ /* XXX: fix this when fld is ready. */
return 1;
}
-#endif
void fid_to_le(struct lu_fid *dst, const struct lu_fid *src);
-
#endif /* __LINUX_OBD_CLASS_H */
num_physpages >> (25 - PAGE_SHIFT)), 2UL)
#define FLD_NUM_THREADS max(min_t(unsigned long, MDT_MAX_THREADS, \
num_physpages >> (25 - PAGE_SHIFT)), 2UL)
+#define SEQ_NUM_THREADS max(min_t(unsigned long, MDT_MAX_THREADS, \
+ num_physpages >> (25 - PAGE_SHIFT)), 2UL)
#define MDS_MAX_THREADS 512UL
#define MDS_DEF_THREADS max(2UL, min_t(unsigned long, 32, \
/* used by quotacheck */
int cl_qchk_stat; /* quotacheck stat of the peer */
- /* this holds last allocated fid in last obtained seq */
- struct lu_fid cl_fid;
- spinlock_t cl_fid_lock;
+ /* sequence manager */
+ struct lu_client_seq *cl_seq;
};
#define obd2cli_tgt(obd) ((char *)(obd)->u.cli.cl_target_uuid.uuid)
#define LUSTRE_OPC_MKDIR (1 << 0)
#define LUSTRE_OPC_SYMLINK (1 << 1)
-#define LUSTRE_OPC_MKNODE (1 << 2)
+#define LUSTRE_OPC_MKNOD (1 << 2)
#define LUSTRE_OPC_CREATE (1 << 3)
struct lu_placement_hint {
#define LUSTRE_MDD0_NAME "mdd0"
#define LUSTRE_OSD0_NAME "osd0"
#define LUSTRE_FLD0_NAME "fld0"
+#define LUSTRE_SEQ0_NAME "seq0"
#define LUSTRE_MDC0_NAME "mdc0"
#define LUSTRE_MDC_NAME "mdc"
struct obd_connect_data *ocd);
int (*o_disconnect)(struct obd_export *exp);
- /* may be later these should be moved into separate fid_ops */
+ /* maybe later these should be moved into separate fid_ops */
+ int (*o_fid_init)(struct obd_export *exp);
+ int (*o_fid_fini)(struct obd_export *exp);
+
int (*o_fid_alloc)(struct obd_export *exp, struct lu_fid *fid,
struct lu_placement_hint *hint);
RETURN(rc);
}
+static inline int obd_fid_init(struct obd_export *exp)
+{
+ int rc;
+ ENTRY;
+
+ if (OBP(exp->exp_obd, fid_init) == NULL)
+ RETURN(-ENOTSUPP);
+
+ OBD_COUNTER_INCREMENT(exp->exp_obd, fid_init);
+
+ rc = OBP(exp->exp_obd, fid_init)(exp);
+ RETURN(rc);
+}
+
+static inline int obd_fid_fini(struct obd_export *exp)
+{
+ int rc;
+ ENTRY;
+
+ if (OBP(exp->exp_obd, fid_fini) == NULL)
+ RETURN(-ENOTSUPP);
+
+ OBD_COUNTER_INCREMENT(exp->exp_obd, fid_fini);
+
+ rc = OBP(exp->exp_obd, fid_fini)(exp);
+ RETURN(rc);
+}
+
static inline int obd_fid_alloc(struct obd_export *exp,
struct lu_fid *fid,
struct lu_placement_hint *hint)
#define OBD_FAIL_MGS 0x900
#define OBD_FAIL_MGS_ALL_REQUEST_NET 0x901
#define OBD_FAIL_MGS_ALL_REPLY_NET 0x902
+#define OBD_FAIL_SEQ 0x1000
+#define OBD_FAIL_SEQ_ALL_REQUEST_NET 0x1001
+#define OBD_FAIL_SEQ_ALL_REPLY_NET 0x1002
/* preparation for a more advanced failure testbed (not functional yet) */
#define OBD_FAIL_MASK_SYS 0x0000FF00
cli->cl_r_in_flight = 0;
cli->cl_w_in_flight = 0;
- memset(&cli->cl_fid, 0, sizeof(struct lu_fid));
- spin_lock_init(&cli->cl_fid_lock);
-
spin_lock_init(&cli->cl_read_rpc_hist.oh_lock);
spin_lock_init(&cli->cl_write_rpc_hist.oh_lock);
spin_lock_init(&cli->cl_read_page_hist.oh_lock);
imp->imp_connect_flags_orig = data->ocd_connect_flags;
}
- /* zero out sequence to check it later for validness */
- ocd->ocd_seq = 0;
-
rc = ptlrpc_connect_import(imp, NULL);
if (rc != 0) {
LASSERT (imp->imp_state == LUSTRE_IMP_DISCON);
$(top_builddir)/lustre/lov/liblov.a \
$(top_builddir)/lustre/obdecho/libobdecho.a \
$(top_builddir)/lustre/osc/libosc.a \
+ $(top_builddir)/lustre/fid/libfid.a \
$(top_builddir)/lustre/mdc/libmdc.a \
$(top_builddir)/lustre/ptlrpc/libptlrpc.a \
$(top_builddir)/lustre/obdclass/liblustreclass.a \
build_obj_list ../obdecho libobdecho.a
build_obj_list ../osc libosc.a
build_obj_list ../mdc libmdc.a
+build_obj_list ../fid libfid.a
build_obj_list ../ptlrpc libptlrpc.a
build_obj_list ../obdclass liblustreclass.a
build_obj_list ../lvfs liblvfs.a
#include "lutil.h"
#include "llite_lib.h"
#include <lustre_ver.h>
+#include <lustre_fid.h>
+
+static int llu_fid_alloc(struct obd_export *exp, struct lu_fid *fid,
+ struct lu_placement_hint *hint)
+{
+ int rc;
+ ENTRY;
+ rc = obd_fid_alloc(exp, fid, hint);
+ RETURN(rc);
+}
/* allocates passed fid, that is assigns f_num and f_seq to the @fid */
-int llu_fid_md_alloc(struct llu_sb_info *sbi, struct lu_fid *fid)
+int llu_fid_md_alloc(struct llu_sb_info *sbi, struct lu_fid *fid,
+ struct lu_placement_hint *hint)
{
ENTRY;
+ RETURN(llu_fid_alloc(sbi->ll_md_exp, fid, hint));
+}
- if (sbi->ll_fid.f_oid < LUSTRE_FID_SEQ_WIDTH) {
- sbi->ll_fid.f_oid += 1;
- *fid = sbi->ll_fid;
- } else {
- CERROR("sequence is exhausted. Switching to "
- "new one is not yet implemented\n");
- RETURN(-ERANGE);
+/* allocates the passed fid, that is, assigns f_seq and f_oid to the @fid */
+int llu_fid_dt_alloc(struct llu_sb_info *sbi, struct lu_fid *fid,
+ struct lu_placement_hint *hint)
+{
+ ENTRY;
+ RETURN(llu_fid_alloc(sbi->ll_dt_exp, fid, hint));
+}
+
+static int llu_fid_init(struct obd_export *exp)
+{
+ int rc;
+ ENTRY;
+
+ rc = obd_fid_init(exp);
+ if (rc) {
+ CERROR("cannot initialize FIDs framework, "
+ "rc %d\n", rc);
+ RETURN(rc);
}
-
- RETURN(0);
+
+ RETURN(rc);
}
-/* allocates passed fid, that is assigns f_num and f_seq to the @fid */
-int llu_fid_dt_alloc(struct llu_sb_info *sbi, struct lu_fid *fid)
+int llu_fid_md_init(struct llu_sb_info *sbi)
+{
+ ENTRY;
+ RETURN(llu_fid_init(sbi->ll_md_exp));
+}
+
+int llu_fid_dt_init(struct llu_sb_info *sbi)
+{
+ ENTRY;
+ RETURN(llu_fid_init(sbi->ll_dt_exp));
+}
+
+static int llu_fid_fini(struct obd_export *exp)
+{
+ int rc;
+ ENTRY;
+
+ rc = obd_fid_fini(exp);
+ if (rc) {
+ CERROR("cannot finalize FIDs framework, "
+ "rc %d\n", rc);
+ RETURN(rc);
+ }
+
+ RETURN(rc);
+}
+
+int llu_fid_md_fini(struct llu_sb_info *sbi)
+{
+ ENTRY;
+ RETURN(llu_fid_fini(sbi->ll_md_exp));
+}
+
+int llu_fid_dt_fini(struct llu_sb_info *sbi)
{
ENTRY;
- RETURN(-EOPNOTSUPP);
+ RETURN(llu_fid_fini(sbi->ll_dt_exp));
}
/* build inode number on passed @fid */
-unsigned long llu_fid_build_ino(struct llu_sb_info *sbi, struct lu_fid *fid)
+unsigned long llu_fid_build_ino(struct llu_sb_info *sbi,
+ struct lu_fid *fid)
{
unsigned long ino;
ENTRY;
/* very stupid and having many downsides inode allocation algorithm
* based on fid. */
- ino = (fid_seq(fid) - 1) * LUSTRE_FID_SEQ_WIDTH + fid_oid(fid);
+ ino = (fid_seq(fid) - 1) * LUSTRE_SEQ_WIDTH + fid_oid(fid);
RETURN(ino);
}
struct obd_uuid ll_mds_uuid;
struct obd_uuid ll_mds_peer_uuid;
char *ll_instance;
-
- struct lu_fid ll_fid;
};
#define LL_SBI_NOLCK 0x1
char *buf, size_t nbytes);
/* liblustre/llite_fid.c*/
-int llu_fid_md_alloc(struct llu_sb_info *sbi, struct lu_fid *fid);
-int llu_fid_dt_alloc(struct llu_sb_info *sbi, struct lu_fid *fid);
-unsigned long llu_fid_build_ino(struct llu_sb_info *sbi, struct lu_fid *fid);
+int llu_fid_md_init(struct llu_sb_info *sbi);
+int llu_fid_dt_init(struct llu_sb_info *sbi);
+
+int llu_fid_md_fini(struct llu_sb_info *sbi);
+int llu_fid_dt_fini(struct llu_sb_info *sbi);
+
+int llu_fid_md_alloc(struct llu_sb_info *sbi, struct lu_fid *fid,
+ struct lu_placement_hint *hint);
+
+int llu_fid_dt_alloc(struct llu_sb_info *sbi, struct lu_fid *fid,
+ struct lu_placement_hint *hint);
+
+unsigned long llu_fid_build_ino(struct llu_sb_info *sbi,
+ struct lu_fid *fid);
/* ext2 related */
#define EXT2_NAME_LEN (255)
icbd.icbd_parent = parent;
/* allocate new fid for child */
- if (it->it_op == IT_OPEN || it->it_op == IT_CREAT) {
- rc = llu_fid_md_alloc(llu_i2sbi(parent), &op_data.fid2);
+	if ((it->it_op & IT_CREAT) ||
+	    ((it->it_op & IT_OPEN) && (it->it_create_mode & O_CREAT))) {
+ struct lu_placement_hint hint = { .ph_pname = NULL,
+ .ph_cname = &pnode->p_base->pb_name,
+ .ph_opc = LUSTRE_OPC_CREATE };
+
+ rc = llu_fid_md_alloc(llu_i2sbi(parent), &op_data.fid2, &hint);
if (rc) {
CERROR("can't allocate new fid, rc %d\n", rc);
LBUG();
struct ptlrpc_request *request = NULL;
struct llu_sb_info *sbi = llu_i2sbi(dir);
struct md_op_data op_data = { { 0 } };
+ struct lu_placement_hint hint = {
+ .ph_pname = NULL,
+ .ph_cname = qstr,
+ .ph_opc = LUSTRE_OPC_SYMLINK
+ };
int err = -EMLINK;
ENTRY;
RETURN(err);
/* allocate new fid */
- err = llu_fid_md_alloc(sbi, &op_data.fid2);
+ err = llu_fid_md_alloc(sbi, &op_data.fid2, &hint);
if (err) {
CERROR("can't allocate new fid, rc %d\n", err);
RETURN(err);
struct llu_sb_info *sbi = llu_i2sbi(dir);
struct md_op_data op_data = { { 0 } };
int err = -EMLINK;
+ struct lu_placement_hint hint = {
+ .ph_pname = NULL,
+ .ph_cname = &pno->p_base->pb_name,
+ .ph_opc = LUSTRE_OPC_MKNOD
+ };
ENTRY;
liblustre_wait_event(0);
case S_IFIFO:
case S_IFSOCK:
/* allocate new fid */
- err = llu_fid_md_alloc(sbi, &op_data.fid2);
+ err = llu_fid_md_alloc(sbi, &op_data.fid2, &hint);
if (err) {
CERROR("can't allocate new fid, rc %d\n", err);
RETURN(err);
struct ptlrpc_request *request = NULL;
struct intnl_stat *st = llu_i2stat(dir);
struct md_op_data op_data = { { 0 } };
+ struct lu_placement_hint hint = {
+ .ph_pname = NULL,
+ .ph_cname = qstr,
+ .ph_opc = LUSTRE_OPC_MKDIR
+ };
+
int err = -EMLINK;
ENTRY;
RETURN(err);
/* allocate new fid */
- err = llu_fid_md_alloc(llu_i2sbi(dir), &op_data.fid2);
+ err = llu_fid_md_alloc(llu_i2sbi(dir), &op_data.fid2, &hint);
if (err) {
CERROR("can't allocate new fid, rc %d\n", err);
RETURN(err);
if (err)
GOTO(out_mdc, err);
- /* initializing ->ll_fid. It is known that root object has separate
- * sequence, so that we use what MDS returned to us and do not check if
- * f_oid collides with root or not. */
- sbi->ll_fid.f_seq = ocd.ocd_seq;
- sbi->ll_fid.f_oid = LUSTRE_FID_INIT_OID;
+ err = llu_fid_md_init(sbi);
+ if (err)
+ GOTO(out_mdc, err);
/*
* FIXME fill fs stat data into sbi here!!! FIXME
obd = class_name2obd(osc);
if (!obd) {
CERROR("OSC %s: not setup or attached\n", osc);
- GOTO(out_mdc, err = -EINVAL);
+ GOTO(out_md_fid, err = -EINVAL);
}
obd_set_info_async(obd->obd_self_export, strlen("async"), "async",
sizeof(async), &async, NULL);
sbi->ll_dt_exp = class_conn2export(&osc_conn);
sbi->ll_lco.lco_flags = ocd.ocd_connect_flags;
+ err = llu_fid_dt_init(sbi);
+ if (err)
+ GOTO(out_osc, err);
+
llu_init_ea_size(sbi->ll_md_exp, sbi->ll_dt_exp);
err = md_getstatus(sbi->ll_md_exp, &rootfid);
if (err) {
CERROR("cannot mds_connect: rc = %d\n", err);
- GOTO(out_osc, err);
+ GOTO(out_dt_fid, err);
}
CDEBUG(D_SUPER, "rootfid "DFID3"\n", PFID3(&rootfid));
sbi->ll_root_fid = rootfid;
_sysio_i_gone(root);
out_request:
ptlrpc_req_finished(request);
+out_dt_fid:
+ llu_fid_dt_fini(sbi);
out_osc:
obd_disconnect(sbi->ll_dt_exp);
+out_md_fid:
+ llu_fid_md_fini(sbi);
out_mdc:
obd_disconnect(sbi->ll_md_exp);
out_free:
#include <linux/random.h>
#include <linux/version.h>
+#include <lustre_fid.h>
#include <lustre_lite.h>
#include <lustre_ha.h>
#include <lustre_ver.h>
RETURN(ll_fid_alloc(sbi->ll_dt_exp, fid, hint));
}
+static int ll_fid_init(struct obd_export *exp)
+{
+ int rc;
+ ENTRY;
+
+ rc = obd_fid_init(exp);
+ if (rc) {
+ CERROR("cannot initialize FIDs framework, "
+ "rc %d\n", rc);
+ RETURN(rc);
+ }
+
+ RETURN(rc);
+}
+
+int ll_fid_md_init(struct ll_sb_info *sbi)
+{
+ ENTRY;
+ RETURN(ll_fid_init(sbi->ll_md_exp));
+}
+
+int ll_fid_dt_init(struct ll_sb_info *sbi)
+{
+#if 0
+ ENTRY;
+ RETURN(ll_fid_init(sbi->ll_dt_exp));
+#endif
+	/* XXX: enable this again once OSD starts the sequence-management
+	 * service. */
+ ENTRY;
+ RETURN(0);
+}
+
+static int ll_fid_fini(struct obd_export *exp)
+{
+ int rc;
+ ENTRY;
+
+ rc = obd_fid_fini(exp);
+ if (rc) {
+ CERROR("cannot finalize FIDs framework, "
+ "rc %d\n", rc);
+ RETURN(rc);
+ }
+
+ RETURN(rc);
+}
+
+int ll_fid_md_fini(struct ll_sb_info *sbi)
+{
+ ENTRY;
+ RETURN(ll_fid_fini(sbi->ll_md_exp));
+}
+
+int ll_fid_dt_fini(struct ll_sb_info *sbi)
+{
+ ENTRY;
+ RETURN(ll_fid_fini(sbi->ll_dt_exp));
+}
+
/* build inode number on passed @fid */
-ino_t ll_fid_build_ino(struct ll_sb_info *sbi, struct lu_fid *fid)
+ino_t ll_fid_build_ino(struct ll_sb_info *sbi,
+ struct lu_fid *fid)
{
ino_t ino;
ENTRY;
/* very stupid and having many downsides inode allocation algorithm
* based on fid. */
- ino = (fid_seq(fid) - 1) * LUSTRE_FID_SEQ_WIDTH + fid_oid(fid);
+ ino = (fid_seq(fid) - 1) * LUSTRE_SEQ_WIDTH + fid_oid(fid);
RETURN(ino);
}
int ll_removexattr(struct dentry *dentry, const char *name);
/* llite/llite_fid.c*/
+int ll_fid_md_init(struct ll_sb_info *sbi);
+int ll_fid_dt_init(struct ll_sb_info *sbi);
+
+int ll_fid_md_fini(struct ll_sb_info *sbi);
+int ll_fid_dt_fini(struct ll_sb_info *sbi);
+
int ll_fid_md_alloc(struct ll_sb_info *sbi, struct lu_fid *fid,
struct lu_placement_hint *hint);
strlen(sbi2mdc(sbi)->cl_target_uuid.uuid));
#endif
+ /* init FIDs framework */
+ err = ll_fid_md_init(sbi);
+ if (err) {
+ CERROR("can't init FIDs framework, rc %d\n", err);
+ GOTO(out_mdc, err);
+ }
+
obd = class_name2obd(osc);
if (!obd) {
CERROR("OSC %s: not setup or attached\n", osc);
- GOTO(out_mdc, err = -ENODEV);
+ GOTO(out_md_fid, err = -ENODEV);
}
data->ocd_connect_flags =
GOTO(out_osc, -ENOMEM);
}
+ /* init FIDs framework */
+ err = ll_fid_dt_init(sbi);
+ if (err) {
+ CERROR("can't init FIDs framework, rc %d\n", err);
+ GOTO(out_osc, err);
+ }
+
err = md_getstatus(sbi->ll_md_exp, &rootfid);
if (err) {
CERROR("cannot mds_connect: rc = %d\n", err);
- GOTO(out_osc, err);
+ GOTO(out_dt_fid, err);
}
CDEBUG(D_SUPER, "rootfid "DFID3"\n", PFID3(&rootfid));
sbi->ll_root_fid = rootfid;
GOTO(out_osc, err);
}
- LASSERT(fid_oid(&sbi->ll_root_fid) != 0);
+ LASSERT(fid_is_sane(&sbi->ll_root_fid));
root = ll_iget(sb, ll_fid_build_ino(sbi, &sbi->ll_root_fid), &md);
ptlrpc_req_finished(request);
out_root:
if (root)
iput(root);
+out_dt_fid:
+ obd_fid_fini(sbi->ll_dt_exp);
out_osc:
obd_disconnect(sbi->ll_dt_exp);
+out_md_fid:
+ obd_fid_fini(sbi->ll_md_exp);
out_mdc:
obd_disconnect(sbi->ll_md_exp);
out:
prune_deathrow(sbi, 0);
list_del(&sbi->ll_conn_chain);
+ ll_fid_dt_fini(sbi);
obd_disconnect(sbi->ll_dt_exp);
lprocfs_unregister_mountpoint(sbi);
sbi->ll_proc_root = NULL;
}
+ ll_fid_md_fini(sbi);
obd_disconnect(sbi->ll_md_exp);
lustre_throw_orphan_dentries(sb);
struct inode *inode = NULL;
struct ll_sb_info *sbi = ll_i2sbi(dir);
struct md_op_data op_data = { { 0 } };
+ struct lu_placement_hint hint = {
+ .ph_pname = NULL,
+ .ph_cname = name,
+ .ph_opc = LUSTRE_OPC_MKNOD
+ };
int err;
ENTRY;
case S_IFBLK:
case S_IFIFO:
case S_IFSOCK:
+ err = ll_fid_md_alloc(sbi, &op_data.fid2, &hint);
+ if (err)
+ break;
ll_prepare_md_op_data(&op_data, dir, NULL, name->name,
name->len, 0);
err = md_create(sbi->ll_md_exp, &op_data, NULL, 0, mode,
#include <lustre/lustre_idl.h>
#include <obd_support.h>
+#include <lustre_fid.h>
#include <lustre_lib.h>
#include <lustre_net.h>
#include <lustre_dlm.h>
LASSERT(fid_is_sane(fid));
/* temporary hack until fld will works */
- rc = (unsigned long)fid_seq(fid) / LUSTRE_SEQ_RANGE;
+ rc = (unsigned long)fid_seq(fid) / LUSTRE_SEQ_SUPER_CHUNK;
CWARN("LMV: got MDS %d for sequence: "LPU64"\n", rc, fid_seq(fid));
RETURN(rc);
}
RETURN(0);
}
+static int lmv_fid_init(struct obd_export *exp)
+{
+ struct obd_device *obd = class_exp2obd(exp);
+ struct lmv_obd *lmv = &obd->u.lmv;
+ int i, rc = 0;
+ ENTRY;
+
+ for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
+ if (lmv->tgts[i].ltd_exp == NULL)
+ continue;
+
+ rc = obd_fid_init(lmv->tgts[i].ltd_exp);
+ if (rc)
+ RETURN(rc);
+ }
+ RETURN(rc);
+}
+
+static int lmv_fid_fini(struct obd_export *exp)
+{
+ struct obd_device *obd = class_exp2obd(exp);
+ struct lmv_obd *lmv = &obd->u.lmv;
+ int i, rc = 0;
+ ENTRY;
+
+ for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
+ if (lmv->tgts[i].ltd_exp == NULL)
+ continue;
+
+ rc = obd_fid_fini(lmv->tgts[i].ltd_exp);
+ if (rc)
+ break;
+ }
+ RETURN(rc);
+}
+
static int lmv_fid_alloc(struct obd_export *exp, struct lu_fid *fid,
struct lu_placement_hint *hint)
{
.o_packmd = lmv_packmd,
.o_unpackmd = lmv_unpackmd,
.o_notify = lmv_notify,
+ .o_fid_init = lmv_fid_init,
+ .o_fid_fini = lmv_fid_fini,
.o_fid_alloc = lmv_fid_alloc,
.o_fid_delete = lmv_fid_delete,
.o_iocontrol = lmv_iocontrol
#include <obd_class.h>
#include <lustre_dlm.h>
+#include <lustre_fid.h>
#include <lustre_mds.h> /* for LUSTRE_POSIX_ACL_MAX_SIZE */
#include <md_object.h>
#include <lprocfs_status.h>
RETURN(rc);
}
-static int mdc_fid_alloc(struct obd_export *exp, struct lu_fid *fid,
- struct lu_placement_hint *hint)
+static int mdc_fid_init(struct obd_export *exp)
{
struct client_obd *cli = &exp->exp_obd->u.cli;
- int rc = 0;
+ int rc;
ENTRY;
- LASSERT(fid != NULL);
- LASSERT(hint != NULL);
- LASSERT(fid_seq_is_sane(fid_seq(&cli->cl_fid)));
+ OBD_ALLOC_PTR(cli->cl_seq);
+ if (cli->cl_seq == NULL)
+ RETURN(-ENOMEM);
- spin_lock(&cli->cl_fid_lock);
- if (fid_oid(&cli->cl_fid) < LUSTRE_FID_SEQ_WIDTH) {
- cli->cl_fid.f_oid += 1;
- *fid = cli->cl_fid;
- } else {
- CERROR("sequence is exhausted. Switching to "
- "new one is not yet implemented\n");
- rc = -ERANGE;
+ /* init client side of sequence-manager */
+ rc = seq_client_init(cli->cl_seq, exp,
+ LUSTRE_CLI_SEQ_CLIENT);
+ if (rc) {
+ OBD_FREE_PTR(cli->cl_seq);
+ cli->cl_seq = NULL;
}
- spin_unlock(&cli->cl_fid_lock);
RETURN(rc);
}
+static int mdc_fid_fini(struct obd_export *exp)
+{
+ struct client_obd *cli = &exp->exp_obd->u.cli;
+ ENTRY;
+
+ if (cli->cl_seq != NULL) {
+ seq_client_fini(cli->cl_seq);
+ OBD_FREE_PTR(cli->cl_seq);
+ cli->cl_seq = NULL;
+ }
+
+ RETURN(0);
+}
+
+static int mdc_fid_alloc(struct obd_export *exp, struct lu_fid *fid,
+ struct lu_placement_hint *hint)
+{
+ struct client_obd *cli = &exp->exp_obd->u.cli;
+ struct lu_client_seq *seq = cli->cl_seq;
+
+ ENTRY;
+ RETURN(seq_client_alloc_fid(seq, fid));
+}
+
static int mdc_setup(struct obd_device *obd, struct lustre_cfg *cfg)
{
struct client_obd *cli = &obd->u.cli;
.o_statfs = mdc_statfs,
.o_pin = mdc_pin,
.o_unpin = mdc_unpin,
+ .o_fid_init = mdc_fid_init,
+ .o_fid_fini = mdc_fid_fini,
.o_fid_alloc = mdc_fid_alloc,
.o_import_event = mdc_import_event,
.o_llog_init = mdc_llog_init,
req = mdt_info_req(info);
result = target_handle_connect(req, mdt_handle);
if (result == 0) {
- struct obd_connect_data *data;
-
LASSERT(req->rq_export != NULL);
info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
-
- /*
- * XXX: this is incorrect, because target_handle_connect()
- * accessed and *swabbed* connect data bypassing
- * capsule. Correct fix is to switch everything to the new
- * req-layout interface.
- */
- data = req_capsule_server_get(pill, &RMF_CONNECT_DATA);
-
- result = seq_mgr_alloc(info->mti_ctxt,
- info->mti_mdt->mdt_seq_mgr,
- &data->ocd_seq);
}
return result;
}
RETURN(child->md_ops->mdo_config(ctx, child, name, buf, size, mode));
}
-static int mdt_seq_mgr_hpr(const struct lu_context *ctx,
- void *opaque, __u64 *seq, int mode)
+/*
+ * Seq wrappers
+ */
+static int mdt_seq_init(const struct lu_context *ctx,
+ struct mdt_device *m)
{
- struct mdt_device *m = opaque;
+ struct lu_site *ls;
int rc;
ENTRY;
- rc = mdt_config(ctx, m, LUSTRE_CONFIG_METASEQ,
- seq, sizeof(*seq), mode);
+ ls = m->mdt_md_dev.md_lu_dev.ld_site;
+
+ OBD_ALLOC_PTR(ls->ls_client_seq);
+
+ if (ls->ls_client_seq != NULL) {
+ rc = seq_client_init(ls->ls_client_seq,
+ ls->ls_controller,
+ LUSTRE_CLI_SEQ_SERVER);
+ } else
+ rc = -ENOMEM;
+
+ if (rc)
+ RETURN(rc);
+
+ OBD_ALLOC_PTR(ls->ls_server_seq);
+
+ if (ls->ls_server_seq != NULL) {
+ int flags;
+
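+		/* the MDT with node id 0 acts as the sequence controller
+		 * and owns the whole sequence space; all other MDTs run
+		 * regular servers that request chunks from it. */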
+ flags = (ls->ls_node_id == 0) ?
+ LUSTRE_SRV_SEQ_CONTROLLER :
+ LUSTRE_SRV_SEQ_REGULAR;
+
+ rc = seq_server_init(ls->ls_server_seq,
+ ls->ls_client_seq,
+ ctx, m->mdt_bottom,
+ flags);
+ } else
+ rc = -ENOMEM;
+
RETURN(rc);
}
-static int mdt_seq_mgr_read(const struct lu_context *ctx,
- void *opaque, __u64 *seq)
+static int mdt_seq_fini(const struct lu_context *ctx,
+ struct mdt_device *m)
{
+ struct lu_site *ls = m->mdt_md_dev.md_lu_dev.ld_site;
ENTRY;
- RETURN(mdt_seq_mgr_hpr(ctx, opaque, seq, LUSTRE_CONFIG_GET));
+
+ if (ls->ls_server_seq) {
+ seq_server_fini(ls->ls_server_seq, ctx);
+ OBD_FREE_PTR(ls->ls_server_seq);
+ ls->ls_server_seq = NULL;
+ }
+ if (ls->ls_client_seq) {
+ seq_client_fini(ls->ls_client_seq);
+ OBD_FREE_PTR(ls->ls_client_seq);
+ ls->ls_client_seq = NULL;
+ }
+ RETURN(0);
}
-static int mdt_seq_mgr_write(const struct lu_context *ctx,
- void *opaque, __u64 *seq)
+static int mdt_controller_init(struct mdt_device *m,
+ struct lustre_cfg *cfg)
{
+ struct obd_device *mdc;
+ struct obd_uuid uuid;
+ struct lu_site *ls;
+ char *uuid_str;
+ int rc, index;
ENTRY;
- RETURN(mdt_seq_mgr_hpr(ctx, opaque, seq, LUSTRE_CONFIG_SET));
+
+ index = simple_strtol(lustre_cfg_string(cfg, 2), NULL, 10);
+ if (index != 0)
+ RETURN(0);
+
+ uuid_str = lustre_cfg_string(cfg, 1);
+ obd_str2uuid(&uuid, uuid_str);
+ mdc = class_find_client_obd(&uuid, LUSTRE_MDC_NAME, NULL);
+ if (!mdc) {
+ CERROR("can't find controller MDC by uuid %s\n",
+ uuid_str);
+ rc = -ENOENT;
+ } else if (!mdc->obd_set_up) {
+ CERROR("target %s not set up\n", mdc->obd_name);
+ rc = -EINVAL;
+ } else {
+ struct lustre_handle conn = {0, };
+
+ CDEBUG(D_CONFIG, "connect to controller %s(%s)\n",
+ mdc->obd_name, mdc->obd_uuid.uuid);
+
+ rc = obd_connect(&conn, mdc, &mdc->obd_uuid, NULL);
+
+ if (rc) {
+ CERROR("target %s connect error %d\n",
+ mdc->obd_name, rc);
+ } else {
+ ls = m->mdt_md_dev.md_lu_dev.ld_site;
+ ls->ls_controller = class_conn2export(&conn);
+ }
+ }
+
+ RETURN(rc);
}
-struct lu_seq_mgr_ops seq_mgr_ops = {
- .smo_read = mdt_seq_mgr_read,
- .smo_write = mdt_seq_mgr_write
-};
+static int mdt_controller_fini(struct mdt_device *m)
+{
+ struct lu_site *ls;
+	int rc = 0;
+ ENTRY;
+
+ ls = m->mdt_md_dev.md_lu_dev.ld_site;
+ if (ls && ls->ls_controller) {
+ rc = obd_disconnect(ls->ls_controller);
+ ls->ls_controller = NULL;
+ }
+
+ RETURN(rc);
+}
/*
* FLD wrappers
*/
-
static int mdt_fld_init(const struct lu_context *ctx, struct mdt_device *m)
{
struct lu_site *ls;
static int mdt_fld_fini(const struct lu_context *ctx, struct mdt_device *m)
{
struct lu_site *ls = m->mdt_md_dev.md_lu_dev.ld_site;
-
+ ENTRY;
+
if (ls && ls->ls_fld) {
fld_server_fini(ctx, ls->ls_fld);
OBD_FREE_PTR(ls->ls_fld);
}
- return 0;
+ RETURN(0);
}
/* device init/fini methods */
-
static void mdt_stop_ptlrpc_service(struct mdt_device *m)
{
if (m->mdt_service != NULL) {
mdt_stack_fini(&ctx, m, md2lu_dev(m->mdt_child));
mdt_fld_fini(&ctx, m);
+ mdt_seq_fini(&ctx, m);
+ mdt_controller_fini(m);
LASSERT(atomic_read(&d->ld_ref) == 0);
md_device_fini(&m->mdt_md_dev);
m->mdt_namespace = NULL;
}
- if (m->mdt_seq_mgr) {
- seq_mgr_fini(m->mdt_seq_mgr);
- m->mdt_seq_mgr = NULL;
- }
-
if (d->ld_site != NULL) {
lu_site_fini(d->ld_site);
OBD_FREE_PTR(d->ld_site);
LASSERT(num);
s->ls_node_id = simple_strtol(num, NULL, 10);
- m->mdt_seq_mgr = seq_mgr_init(&seq_mgr_ops, m);
- if (!m->mdt_seq_mgr) {
- CERROR("can't initialize sequence manager\n");
- GOTO(err_fini_stack, rc);
- }
- /* set initial sequence by mds index */
- m->mdt_seq_mgr->m_seq = s->ls_node_id * LUSTRE_SEQ_RANGE;
-
- /* init sequence info after device stack is initialized. */
- rc = seq_mgr_setup(&ctx, m->mdt_seq_mgr);
-
lu_context_exit(&ctx);
if (rc)
- GOTO(err_fini_mgr, rc);
+ GOTO(err_fini_stack, rc);
lu_context_enter(&ctx);
rc = mdt_fld_init(&ctx, m);
lu_context_exit(&ctx);
if (rc)
- GOTO(err_free_fld, rc);
+ GOTO(err_fini_fld, rc);
+ lu_context_enter(&ctx);
+ rc = mdt_seq_init(&ctx, m);
+ lu_context_exit(&ctx);
+ if (rc)
+ GOTO(err_fini_seq, rc);
+
lu_context_fini(&ctx);
snprintf(ns_name, sizeof ns_name, LUSTRE_MDT0_NAME"-%p", m);
ldlm_register_intent(m->mdt_namespace, mdt_intent_policy);
-
rc = mdt_start_ptlrpc_service(m);
if (rc)
GOTO(err_free_ns, rc);
err_free_ns:
ldlm_namespace_free(m->mdt_namespace, 0);
m->mdt_namespace = NULL;
-err_free_fld:
+err_fini_seq:
+ mdt_seq_fini(&ctx, m);
+err_fini_fld:
mdt_fld_fini(&ctx, m);
-err_fini_mgr:
- seq_mgr_fini(m->mdt_seq_mgr);
- m->mdt_seq_mgr = NULL;
err_fini_ctx:
lu_context_exit(&ctx);
lu_context_fini(&ctx);
ENTRY;
switch (cfg->lcfg_command) {
+	case LCFG_ADD_MDC:
+		/* add an MDC hook to get the first MDT uuid and connect it
+		 * to ls->ls_controller for use by the seq manager. */
+		err = mdt_controller_init(mdt_dev(d), cfg);
+		if (err) {
+			CERROR("can't initialize controller export, "
+			       "rc %d\n", err);
+		}
+		/* fall through: the config record is also passed to the
+		 * layers below. */
/* all MDT specific commands should be here */
default:
/* others are passed further */
* necessary.
*/
unsigned long mdt_flags;
-
- /* Seq management related stuff */
- struct lu_seq_mgr *mdt_seq_mgr;
-
struct dt_device *mdt_bottom;
/*
* Options bit-fields.
/* Add the ost info to the client/mdt lov */
static int mgs_write_log_osc_to_lov(struct obd_device *obd, struct fs_db *fsdb,
- struct mgs_target_info *mti,
- char *logname, char *lovname, int flags)
+ struct mgs_target_info *mti,
+ char *logname, char *lovname, int flags)
{
struct llog_handle *llh = NULL;
char *nodeuuid, *oscname, *oscuuid, *lovuuid;
}
EXPORT_SYMBOL(lustre_swab_lu_fid);
+void lustre_swab_lu_range(struct lu_range *range)
+{
+ __swab64s (&range->lr_start);
+ __swab64s (&range->lr_end);
+}
+EXPORT_SYMBOL(lustre_swab_lu_range);
+
void lustre_swab_llog_rec(struct llog_rec_hdr *rec, struct llog_rec_tail *tail)
{
__swab32s(&rec->lrh_len);
LPROCFS_OBD_OP_INIT(num_private_stats, stats, connect);
LPROCFS_OBD_OP_INIT(num_private_stats, stats, reconnect);
LPROCFS_OBD_OP_INIT(num_private_stats, stats, disconnect);
+ LPROCFS_OBD_OP_INIT(num_private_stats, stats, fid_init);
+ LPROCFS_OBD_OP_INIT(num_private_stats, stats, fid_fini);
LPROCFS_OBD_OP_INIT(num_private_stats, stats, fid_alloc);
LPROCFS_OBD_OP_INIT(num_private_stats, stats, fid_delete);
LPROCFS_OBD_OP_INIT(num_private_stats, stats, statfs);
#include <linux/module.h>
#include <obd_support.h>
#include <lustre_disk.h>
+#include <lustre_fid.h>
#include <lu_object.h>
#include <libcfs/list.h>
{
/* all objects with same id and different versions will belong to same
* collisions list. */
- return (fid_seq(f) - 1) * LUSTRE_FID_SEQ_WIDTH + fid_oid(f);
+ return (fid_seq(f) - 1) * LUSTRE_SEQ_WIDTH + fid_oid(f);
}
/*
RETURN(0);
}
} else {
- struct client_obd *cli = &imp->imp_obd->u.cli;
struct obd_connect_data *ocd;
struct obd_export *exp;
GOTO(out, rc);
}
- /* get correct start fid from connect data sent by server. */
- spin_lock(&cli->cl_fid_lock);
- cli->cl_fid.f_seq = ocd->ocd_seq;
- cli->cl_fid.f_oid = LUSTRE_FID_INIT_OID;
- spin_unlock(&cli->cl_fid_lock);
-
spin_lock_irqsave(&imp->imp_lock, flags);
/*
__swab32s (&ocd->ocd_index);
__swab32s (&ocd->ocd_unused);
__swab64s (&ocd->ocd_ibits_known);
- __swab64s (&ocd->ocd_seq);
CLASSERT(offsetof(typeof(*ocd), padding2) != 0);
CLASSERT(offsetof(typeof(*ocd), padding3) != 0);
CLASSERT(offsetof(typeof(*ocd), padding4) != 0);
CLASSERT(offsetof(typeof(*ocd), padding5) != 0);
+ CLASSERT(offsetof(typeof(*ocd), padding6) != 0);
}
void lustre_swab_obdo (struct obdo *o)