Whamcloud - gitweb
- added working proto of sequences manager with super-, meta- sequence approach....
authoryury <yury>
Mon, 12 Jun 2006 16:50:16 +0000 (16:50 +0000)
committeryury <yury>
Mon, 12 Jun 2006 16:50:16 +0000 (16:50 +0000)
* sequence manager state is not saved/read to/from backing store for recovery;
* meta-sequence allocation on an MDT may cause super-sequence allocation, that is cascading RPCs are possible.

Lustre is mountable and mkdir proto is in working state.

37 files changed:
lustre/cmm/cmm_object.c
lustre/fid/Makefile.in
lustre/fid/autoMakefile.am
lustre/fid/fid_handle.c [new file with mode: 0644]
lustre/fid/fid_internal.h [new file with mode: 0644]
lustre/fid/fid_lib.c [moved from lustre/fid/fid_misc.c with 87% similarity]
lustre/fid/fid_seq.c [deleted file]
lustre/fld/fld_internal.h
lustre/include/lu_object.h
lustre/include/lustre/lustre_idl.h
lustre/include/lustre_fid.h
lustre/include/lustre_net.h
lustre/include/obd.h
lustre/include/obd_class.h
lustre/include/obd_support.h
lustre/ldlm/ldlm_lib.c
lustre/liblustre/Makefile.am
lustre/liblustre/genlib.sh
lustre/liblustre/llite_fid.c
lustre/liblustre/llite_lib.h
lustre/liblustre/namei.c
lustre/liblustre/super.c
lustre/llite/llite_fid.c
lustre/llite/llite_internal.h
lustre/llite/llite_lib.c
lustre/llite/namei.c
lustre/lmv/lmv_fld.c
lustre/lmv/lmv_obd.c
lustre/mdc/mdc_request.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mgs/mgs_llog.c
lustre/obdclass/llog_swab.c
lustre/obdclass/lprocfs_status.c
lustre/obdclass/lu_object.c
lustre/ptlrpc/import.c
lustre/ptlrpc/pack_generic.c

index 52ab3a0..07d608f 100644 (file)
@@ -32,6 +32,7 @@
 
 #define DEBUG_SUBSYSTEM S_MDS
 
+#include <lustre_fid.h>
 #include "cmm_internal.h"
 #include "mdc_internal.h"
 
@@ -40,7 +41,7 @@ static int cmm_fld_lookup(const struct lu_fid *fid)
 {
         int rc;
         /* temporary hack for proto mkdir */
-        rc = (unsigned long)fid_seq(fid) / LUSTRE_SEQ_RANGE;
+        rc = (unsigned long)fid_seq(fid) / LUSTRE_SEQ_SUPER_CHUNK;
         CWARN("Get MDS %d for sequence: "LPU64"\n", rc, fid_seq(fid));
         RETURN(rc);
 }
@@ -620,7 +621,7 @@ static int cmm_fld_lookup(const struct lu_fid *fid)
 {
         int rc;
         /* temporary hack for proto mkdir */
-        rc = (unsigned long)fid_seq(fid) / LUSTRE_SEQ_RANGE;
+        rc = (unsigned long)fid_seq(fid) / LUSTRE_SEQ_SUPER_CHUNK;
         CWARN("Get MDS %d for sequence: "LPU64"\n", rc, fid_seq(fid));
         RETURN(rc);
 }
index 955d7b5..9afaed1 100644 (file)
@@ -1,4 +1,4 @@
 MODULES := fid
-fid-objs := fid_seq.o fid_misc.o
+fid-objs := fid_handle.o fid_lib.o
 
 @INCLUDE_RULES@
index f736429..e84a5a0 100644 (file)
@@ -3,9 +3,18 @@
 # This code is issued under the GNU General Public License.
 # See the file COPYING in this distribution
 
+if LIBLUSTRE
+noinst_LIBRARIES = libfid.a
+libfid_a_SOURCES = fid_handle.c fid_lib.c fid_internal.h
+libfid_a_CPPFLAGS = $(LLCPPFLAGS)
+libfid_a_CFLAGS = $(LLCFLAGS)
+endif
+
 if MODULES
 modulefs_DATA = fid$(KMODEXT)
 endif
 
+install-data-hook: $(install_data_hook)
+
 MOSTLYCLEANFILES := @MOSTLYCLEANFILES@ 
-DIST_SOURCES = $(fid-objs:%.o=%.c)
+DIST_SOURCES = $(fid-objs:%.o=%.c) fid_internal.h
diff --git a/lustre/fid/fid_handle.c b/lustre/fid/fid_handle.c
new file mode 100644 (file)
index 0000000..e851eba
--- /dev/null
@@ -0,0 +1,670 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ *  lustre/fid/fid_handle.c
+ *  Lustre Sequence Manager
+ *
+ *  Copyright (c) 2006 Cluster File Systems, Inc.
+ *   Author: Yury Umanets <umka@clusterfs.com>
+ *
+ *   This file is part of the Lustre file system, http://www.lustre.org
+ *   Lustre is a trademark of Cluster File Systems, Inc.
+ *
+ *   You may have signed or agreed to another license before downloading
+ *   this software.  If so, you are bound by the terms and conditions
+ *   of that agreement, and the following does not apply to you.  See the
+ *   LICENSE file included with this distribution for more information.
+ *
+ *   If you did not agree to a different license, then this copy of Lustre
+ *   is open source software; you can redistribute it and/or modify it
+ *   under the terms of version 2 of the GNU General Public License as
+ *   published by the Free Software Foundation.
+ *
+ *   In either case, Lustre is distributed in the hope that it will be
+ *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
+ *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   license text for more details.
+ */
+
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+#define DEBUG_SUBSYSTEM S_FID
+
+#ifdef __KERNEL__
+# include <libcfs/libcfs.h>
+# include <linux/module.h>
+#else /* __KERNEL__ */
+# include <liblustre.h>
+#endif
+
+#include <obd.h>
+#include <obd_class.h>
+#include <dt_object.h>
+#include <md_object.h>
+#include <obd_support.h>
+#include <lustre_fid.h>
+#include "fid_internal.h"
+
+/* client seq mgr interface */
+static int 
+seq_client_alloc_common(struct lu_client_seq *seq, 
+                        struct lu_range *seq_ran,
+                        __u32 seq_op)
+{
+        __u32 *op;
+        struct lu_range *range;
+        struct ptlrpc_request *req;
+        int ran_size = sizeof(*range);
+        int rc, size[] = {sizeof(*op), ran_size};
+        int repsize[] = {ran_size};
+        ENTRY;
+
+        req = ptlrpc_prep_req(class_exp2cliimp(seq->seq_exp), 
+                             LUSTRE_MDS_VERSION, SEQ_QUERY,
+                             2, size, NULL);
+        if (req == NULL)
+                RETURN(-ENOMEM);
+
+        op = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*op));
+        *op = seq_op;
+
+        range = lustre_msg_buf(req->rq_reqmsg, 1, ran_size);
+        *range = *seq_ran;
+
+        req->rq_replen = lustre_msg_size(1, repsize);
+        req->rq_request_portal = MDS_SEQ_PORTAL;
+        rc = ptlrpc_queue_wait(req);
+        if (rc)
+                GOTO(out_req, rc);
+
+        range = lustre_swab_repbuf(req, 0, sizeof(*range),
+                                   lustre_swab_lu_range);
+
+        LASSERT(range != NULL);
+        *seq_ran = *range;
+out_req:
+        ptlrpc_req_finished(req); 
+        RETURN(rc);
+}
+
+/* request sequence-controller node to allocate new super-sequence. */
+int
+seq_client_alloc_super(struct lu_client_seq *seq)
+{
+        int rc;
+        ENTRY;
+
+        LASSERT(seq->seq_flags & LUSTRE_CLI_SEQ_SERVER);
+        rc = seq_client_alloc_common(seq, &seq->seq_cl_range,
+                                     SEQ_ALLOC_SUPER);
+        if (rc == 0) {
+                CWARN("SEQ-MGR(cli): allocated super-sequence "
+                      "["LPX64"-"LPX64"]\n", seq->seq_cl_range.lr_start,
+                      seq->seq_cl_range.lr_end);
+        }
+        RETURN(rc);
+}
+EXPORT_SYMBOL(seq_client_alloc_super);
+
+/* request sequence-controller node to allocate new meta-sequence. */
+int
+seq_client_alloc_meta(struct lu_client_seq *seq)
+{
+        int rc;
+        ENTRY;
+
+        LASSERT(seq->seq_flags & LUSTRE_CLI_SEQ_CLIENT);
+        rc = seq_client_alloc_common(seq, &seq->seq_cl_range,
+                                     SEQ_ALLOC_META);
+        if (rc == 0) {
+                CWARN("SEQ-MGR(cli): allocated meta-sequence "
+                      "["LPX64"-"LPX64"]\n", seq->seq_cl_range.lr_start,
+                      seq->seq_cl_range.lr_end);
+        }
+        RETURN(rc);
+}
+EXPORT_SYMBOL(seq_client_alloc_meta);
+
+/* allocate new sequence for client (llite or MDC are expected to use this) */
+int
+seq_client_alloc_seq(struct lu_client_seq *seq, __u64 *seqnr)
+{
+        int rc;
+        ENTRY;
+
+        down(&seq->seq_sem);
+
+        LASSERT(seq->seq_flags & LUSTRE_CLI_SEQ_CLIENT);
+        LASSERT(range_is_sane(&seq->seq_cl_range));
+
+        /* if we still have free sequences in meta-sequence we allocate new seq
+         * from given range. */
+        if (seq->seq_cl_range.lr_end > seq->seq_cl_range.lr_start) {
+                *seqnr = seq->seq_cl_range.lr_start;
+                seq->seq_cl_range.lr_start += 1;
+                rc = 0;
+        } else {
+                /* meta-sequence is exhausted, request MDT to allocate new
+                 * meta-sequence for us. */
+                rc = seq_client_alloc_meta(seq);
+                if (rc) {
+                        CERROR("can't allocate new meta-sequence, "
+                               "rc %d\n", rc);
+                }
+                
+                *seqnr = seq->seq_cl_range.lr_start;
+                seq->seq_cl_range.lr_start += 1;
+        }
+        up(&seq->seq_sem);
+
+        if (rc == 0) {
+                CWARN("SEQ-MGR(cli): allocated sequence "
+                      "["LPX64"]\n", *seqnr);
+        }
+        RETURN(rc);
+}
+EXPORT_SYMBOL(seq_client_alloc_seq);
+
+int
+seq_client_alloc_fid(struct lu_client_seq *seq, struct lu_fid *fid)
+{
+        int rc;
+        ENTRY;
+
+        LASSERT(fid != NULL);
+        LASSERT(fid_is_sane(&seq->seq_fid));
+        LASSERT(seq->seq_flags & LUSTRE_CLI_SEQ_CLIENT);
+
+        down(&seq->seq_sem);
+        if (fid_oid(&seq->seq_fid) < LUSTRE_SEQ_WIDTH) {
+                *fid = seq->seq_fid;
+                seq->seq_fid.f_oid += 1;
+                rc = 0;
+        } else {
+                __u64 seqnr = 0;
+                
+                rc = seq_client_alloc_seq(seq, &seqnr);
+                if (rc) {
+                        CERROR("can't allocate new sequence, "
+                               "rc %d\n", rc);
+                        GOTO(out, rc);
+                } else {
+                        seq->seq_fid.f_oid = LUSTRE_FID_INIT_OID;
+                        seq->seq_fid.f_seq = seqnr;
+                        seq->seq_fid.f_ver = 0;
+                        
+                        *fid = seq->seq_fid;
+                        seq->seq_fid.f_oid += 1;
+                }
+        }
+        LASSERT(fid_is_sane(fid));
+        
+        CWARN("SEQ-MGR(cli): allocated FID "DFID3"\n",
+              PFID3(fid));
+
+        EXIT;
+out:
+        up(&seq->seq_sem);
+        return rc;
+}
+EXPORT_SYMBOL(seq_client_alloc_fid);
+
+int 
+seq_client_init(struct lu_client_seq *seq, 
+                struct obd_export *exp,
+                int flags)
+{
+        int rc;
+        ENTRY;
+
+        LASSERT(flags & (LUSTRE_CLI_SEQ_CLIENT |
+                         LUSTRE_CLI_SEQ_SERVER));
+
+        seq->seq_flags = flags;
+        fid_zero(&seq->seq_fid);
+        sema_init(&seq->seq_sem, 1);
+        
+        seq->seq_cl_range.lr_end = 0;
+        seq->seq_cl_range.lr_start = 0;
+       
+        if (exp != NULL)
+                seq->seq_exp = class_export_get(exp);
+
+        if (seq->seq_flags & LUSTRE_CLI_SEQ_CLIENT) {
+                __u64 seqnr = 0;
+                
+                /* client (llite or MDC) init case, we need new sequence from
+                 * MDT. This will allocate new meta-sequemce first, because seq
+                 * range in init state and looks the same as exhausted. */
+                rc = seq_client_alloc_seq(seq, &seqnr);
+                if (rc) {
+                        CERROR("can't allocate new sequence, rc %d\n", rc);
+                        GOTO(out, rc);
+                } else {
+                        seq->seq_fid.f_oid = LUSTRE_FID_INIT_OID;
+                        seq->seq_fid.f_seq = seqnr;
+                        seq->seq_fid.f_ver = 0;
+                }
+
+                LASSERT(fid_is_sane(&seq->seq_fid));
+        } else {
+                /* check if this is controller node is trying to init client. */
+                if (seq->seq_exp) {
+                        /* MDT uses client seq manager to talk to sequence
+                         * controller, and thus, we need super-sequence. */
+                        rc = seq_client_alloc_super(seq);
+                } else {
+                        rc = 0;
+                }
+        }
+
+        EXIT;
+out:
+        if (rc)
+                seq_client_fini(seq);
+        else
+                CWARN("Client Sequence Manager initialized\n");
+        return rc;
+}
+EXPORT_SYMBOL(seq_client_init);
+
+void seq_client_fini(struct lu_client_seq *seq)
+{
+        ENTRY;
+        if (seq->seq_exp != NULL) {
+                class_export_put(seq->seq_exp);
+                seq->seq_exp = NULL;
+        }
+        CWARN("Client Sequence Manager finalized\n");
+        EXIT;
+}
+EXPORT_SYMBOL(seq_client_fini);
+
+#ifdef __KERNEL__
+/* server side seq mgr stuff */
+static const struct lu_range LUSTRE_SEQ_SUPER_INIT = {
+        LUSTRE_SEQ_SPACE_START,
+        LUSTRE_SEQ_SPACE_LIMIT
+};
+
+static const struct lu_range LUSTRE_SEQ_META_INIT = {
+        0,
+        0
+};
+
+static int
+seq_server_write_state(struct lu_server_seq *seq,
+                       const struct lu_context *ctx)
+{
+       int rc = 0;
+       ENTRY;
+
+       /* XXX: here should be calling struct dt_device methods to write
+        * sequence state to backing store. */
+
+       RETURN(rc);
+}
+
+static int
+seq_server_read_state(struct lu_server_seq *seq,
+                      const struct lu_context *ctx)
+{
+       int rc = -ENODATA;
+       ENTRY;
+       
+       /* XXX: here should be calling struct dt_device methods to read the
+        * sequence state from backing store. */
+
+       RETURN(rc);
+}
+
+static int
+seq_server_alloc_super(struct lu_server_seq *seq,
+                       struct lu_range *range)
+{
+        struct lu_range *ss_range = &seq->seq_ss_range;
+        int rc;
+        ENTRY;
+
+        if (ss_range->lr_end - ss_range->lr_start < LUSTRE_SEQ_SUPER_CHUNK) {
+                CWARN("super-sequence is going to exhauste soon. "
+                      "Only can allocate "LPU64" sequences\n",
+                      ss_range->lr_end - ss_range->lr_start);
+                *range = *ss_range;
+                ss_range->lr_start = ss_range->lr_end;
+                rc = 0;
+        } else if (ss_range->lr_start >= ss_range->lr_end) {
+                CERROR("super-sequence is exhausted\n");
+                rc = -ENOSPC;
+        } else {
+                range->lr_start = ss_range->lr_start;
+                ss_range->lr_start += LUSTRE_SEQ_SUPER_CHUNK;
+                range->lr_end = ss_range->lr_start;
+                rc = 0;
+        }
+
+        if (rc == 0) {
+                CWARN("SEQ-MGR(srv): allocated super-sequence "
+                      "["LPX64"-"LPX64"]\n", range->lr_start,
+                      range->lr_end);
+        }
+        
+        RETURN(rc);
+}
+
+static int
+seq_server_alloc_meta(struct lu_server_seq *seq,
+                      struct lu_range *range)
+{
+        struct lu_range *ms_range = &seq->seq_ms_range;
+        int rc;
+        ENTRY;
+
+        LASSERT(range_is_sane(ms_range));
+        
+        /* XXX: here should avoid cascading RPCs using kind of async
+         * preallocation when meta-sequence is close to exhausting. */
+        if (ms_range->lr_start == ms_range->lr_end) {
+                if (seq->seq_flags & LUSTRE_SRV_SEQ_CONTROLLER) {
+                        /* allocate new range of meta-sequences to allocate new
+                         * meta-sequence from it. */
+                        rc = seq_server_alloc_super(seq, ms_range);
+                } else {
+                        /* request controller to allocate new super-sequence for
+                         * us.*/
+                        rc = seq_client_alloc_super(seq->seq_cli);
+                        if (rc) {
+                                CERROR("can't allocate new super-sequence, "
+                                       "rc %d\n", rc);
+                                RETURN(rc);
+                        }
+
+                        /* saving new range into allocation space. */
+                        *ms_range = seq->seq_cli->seq_cl_range;
+                }
+
+                LASSERT(ms_range->lr_start != 0);
+                LASSERT(ms_range->lr_end > ms_range->lr_start);
+        } else {
+                rc = 0;
+        }
+        range->lr_start = ms_range->lr_start;
+        ms_range->lr_start += LUSTRE_SEQ_META_CHUNK;
+        range->lr_end = ms_range->lr_start;
+
+        if (rc == 0) {
+                CWARN("SEQ-MGR(srv): allocated meta-sequence "
+                      "["LPX64"-"LPX64"]\n", range->lr_start,
+                      range->lr_end);
+        }
+
+        RETURN(rc);
+}
+
+static int
+seq_server_handle(struct lu_server_seq *seq,
+                  const struct lu_context *ctx, 
+                  struct lu_range *range,
+                  __u32 opc)
+{
+        int rc;
+        ENTRY;
+
+        down(&seq->seq_sem);
+
+        switch (opc) {
+        case SEQ_ALLOC_SUPER:
+                rc = seq_server_alloc_super(seq, range);
+                break;
+        case SEQ_ALLOC_META:
+                rc = seq_server_alloc_meta(seq, range);
+                break;
+        default:
+                rc = -EINVAL;
+                break;
+        }
+
+        if (rc)
+                GOTO(out, rc);
+
+        rc = seq_server_write_state(seq, ctx);
+        if (rc) {
+                CERROR("can't save state, rc = %d\n",
+                      rc);
+        }
+
+        EXIT;
+out:
+        up(&seq->seq_sem);
+        return rc;
+}
+
+static int
+seq_req_handle0(const struct lu_context *ctx,
+                struct lu_server_seq *seq, 
+                struct ptlrpc_request *req) 
+{
+        struct lu_range *in;
+        struct lu_range *out;
+        int size = sizeof(*in);
+        __u32 *opt;
+        int rc;
+        ENTRY;
+
+        rc = lustre_pack_reply(req, 1, &size, NULL);
+        if (rc)
+                RETURN(rc);
+
+        rc = -EPROTO;
+        opt = lustre_swab_reqbuf(req, 0, sizeof(*opt),
+                                 lustre_swab_generic_32s);
+        if (opt != NULL) {
+                in = lustre_swab_reqbuf(req, 1, sizeof(*in),
+                                        lustre_swab_lu_range);
+                if (in != NULL) {
+                        out = lustre_msg_buf(req->rq_repmsg, 
+                                             0, sizeof(*out));
+                        LASSERT(out != NULL);
+
+                        *out = *in;
+                        rc = seq_server_handle(seq, ctx, out, *opt);
+                } else {
+                        CERROR("Cannot unpack seq range\n");
+                }
+        } else {
+                CERROR("Cannot unpack option\n");
+        }
+        RETURN(rc);
+}
+
+static int 
+seq_req_handle(struct ptlrpc_request *req) 
+{
+        int fail = OBD_FAIL_SEQ_ALL_REPLY_NET;
+        const struct lu_context *ctx;
+        struct lu_site    *site;
+        int rc = -EPROTO;
+        ENTRY;
+
+        OBD_FAIL_RETURN(OBD_FAIL_SEQ_ALL_REPLY_NET | OBD_FAIL_ONCE, 0);
+        
+        ctx = req->rq_svc_thread->t_ctx;
+        LASSERT(ctx != NULL);
+        LASSERT(ctx->lc_thread == req->rq_svc_thread);
+        if (req->rq_reqmsg->opc == SEQ_QUERY) {
+                if (req->rq_export != NULL) {
+                        struct obd_device *obd;
+
+                        obd = req->rq_export->exp_obd;
+                        site = obd->obd_lu_dev->ld_site;
+                        LASSERT(site != NULL);
+                       
+                        rc = seq_req_handle0(ctx, site->ls_server_seq, req);
+                } else {
+                        CERROR("Unconnected request\n");
+                        req->rq_status = -ENOTCONN;
+                        GOTO(out, rc = -ENOTCONN);
+                }
+        } else {
+                CERROR("Wrong opcode: %d\n",
+                       req->rq_reqmsg->opc);
+                req->rq_status = -ENOTSUPP;
+                rc = ptlrpc_error(req);
+                RETURN(rc);
+        }
+
+        EXIT;
+out:
+        target_send_reply(req, rc, fail);
+        return 0;
+} 
+
+int
+seq_server_init(struct lu_server_seq *seq,
+                struct lu_client_seq *cli,
+                const struct lu_context  *ctx,
+                struct dt_device *dev,
+                int flags) 
+{
+        int rc; 
+        ENTRY;
+
+        struct ptlrpc_service_conf seq_conf = { 
+                .psc_nbufs = MDS_NBUFS, 
+                .psc_bufsize = MDS_BUFSIZE, 
+                .psc_max_req_size = MDS_MAXREQSIZE,
+                .psc_max_reply_size = MDS_MAXREPSIZE,
+                .psc_req_portal = MDS_SEQ_PORTAL,
+                .psc_rep_portal = MDC_REPLY_PORTAL,
+                .psc_watchdog_timeout = SEQ_SERVICE_WATCHDOG_TIMEOUT, 
+                .psc_num_threads = SEQ_NUM_THREADS
+        };
+
+       LASSERT(dev != NULL);
+       LASSERT(cli != NULL);
+        
+        LASSERT(flags & (LUSTRE_SRV_SEQ_CONTROLLER |
+                         LUSTRE_SRV_SEQ_REGULAR));
+
+        seq->seq_dev = dev;
+        seq->seq_cli = cli;
+        seq->seq_flags = flags;
+        sema_init(&seq->seq_sem, 1);
+
+        lu_device_get(&seq->seq_dev->dd_lu_dev);
+
+        /* request backing store for saved sequence info */
+        rc = seq_server_read_state(seq, ctx);
+        if (rc == -ENODATA) {
+                /* first run, no state on disk, init all seqs */
+                if (seq->seq_flags & LUSTRE_SRV_SEQ_CONTROLLER) {
+                        /* init super seq by start values on sequence-controller
+                         * node.*/
+                        seq->seq_ss_range = LUSTRE_SEQ_SUPER_INIT;
+                } else {
+                        /* take super-seq from client seq mgr */
+                        LASSERT(range_is_sane(&cli->seq_cl_range));
+                        seq->seq_ss_range = cli->seq_cl_range;
+                }
+
+                /* init meta-sequence by start values and get ready for
+                 * allocating it for clients. */
+                seq->seq_ms_range = LUSTRE_SEQ_META_INIT;
+
+                /* save init seq to backing store. */
+                rc = seq_server_write_state(seq, ctx);
+                if (rc) {
+                        CERROR("can't write sequence state, "
+                               "rc = %d\n", rc);
+                       GOTO(out, rc);
+                }
+        } else if (rc) {
+               CERROR("can't read sequence state, rc = %d\n",
+                      rc);
+               GOTO(out, rc);
+       }
+
+       seq->seq_service =  ptlrpc_init_svc_conf(&seq_conf,
+                                                seq_req_handle,
+                                                LUSTRE_SEQ0_NAME,
+                                                seq->seq_proc_entry, 
+                                                NULL); 
+       if (seq->seq_service != NULL)
+               rc = ptlrpc_start_threads(NULL, seq->seq_service,
+                                         LUSTRE_SEQ0_NAME); 
+       else 
+               rc = -ENOMEM; 
+
+       EXIT;
+
+out:
+       if (rc)
+               seq_server_fini(seq, ctx);
+        else
+                CWARN("Server Sequence Manager initialized\n");
+       return rc;
+} 
+EXPORT_SYMBOL(seq_server_init);
+
+void
+seq_server_fini(struct lu_server_seq *seq,
+                const struct lu_context *ctx) 
+{
+       int rc;
+
+        if (seq->seq_service != NULL) {
+                ptlrpc_unregister_service(seq->seq_service);
+                seq->seq_service = NULL;
+        }
+
+        if (seq->seq_dev != NULL) {
+                rc = seq_server_write_state(seq, ctx);
+               if (rc) {
+                       CERROR("can't save sequence state, "
+                              "rc = %d\n", rc);
+               }
+                lu_device_put(&seq->seq_dev->dd_lu_dev);
+                seq->seq_dev = NULL;
+        }
+        
+        CWARN("Server Sequence Manager finalized\n");
+}
+EXPORT_SYMBOL(seq_server_fini);
+
+static int fid_init(void)
+{
+       ENTRY;
+        CWARN("Lustre Sequence Manager\n");
+       RETURN(0);
+}
+
+static int fid_fini(void)
+{
+       ENTRY;
+       RETURN(0);
+}
+
+static int 
+__init fid_mod_init(void) 
+
+{
+        /* init caches if any */
+        fid_init();
+        return 0;
+}
+
+static void 
+__exit fid_mod_exit(void) 
+{
+        /* free caches if any */
+        fid_fini();
+        return;
+}
+
+MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
+MODULE_DESCRIPTION("Lustre FID Module");
+MODULE_LICENSE("GPL");
+
+cfs_module(fid, "0.0.3", fid_mod_init, fid_mod_exit);
+#endif
diff --git a/lustre/fid/fid_internal.h b/lustre/fid/fid_internal.h
new file mode 100644 (file)
index 0000000..1d99fe4
--- /dev/null
@@ -0,0 +1,31 @@
+/* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ *  fid/fid_internal.h
+ *
+ *  Copyright (C) 2006 Cluster File Systems, Inc.
+ *
+ *   This file is part of the Lustre file system, http://www.lustre.org
+ *   Lustre is a trademark of Cluster File Systems, Inc.
+ *
+ *   You may have signed or agreed to another license before downloading
+ *   this software.  If so, you are bound by the terms and conditions
+ *   of that agreement, and the following does not apply to you.  See the
+ *   LICENSE file included with this distribution for more information.
+ *
+ *   If you did not agree to a different license, then this copy of Lustre
+ *   is open source software; you can redistribute it and/or modify it
+ *   under the terms of version 2 of the GNU General Public License as
+ *   published by the Free Software Foundation.
+ *
+ *   In either case, Lustre is distributed in the hope that it will be
+ *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
+ *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   license text for more details.
+ */
+#ifndef _FID_INTERNAL_H
+#define _FID_INTERNAL_H
+
+#define SEQ_SERVICE_WATCHDOG_TIMEOUT (obd_timeout * 1000)
+
+#endif
similarity index 87%
rename from lustre/fid/fid_misc.c
rename to lustre/fid/fid_lib.c
index 0488b30..14bad9e 100644 (file)
@@ -1,7 +1,7 @@
 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
- *  lustre/fid/fid_misc.c
+ *  lustre/fid/fid_lib.c
  *  Miscellaneous fid functions.
  *
  *  Copyright (c) 2006 Cluster File Systems, Inc.
  *   license text for more details.
  */
 
-#include <linux/module.h>
-#include <lustre/lustre_idl.h>
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+#define DEBUG_SUBSYSTEM S_FID
+
+#ifdef __KERNEL__
+# include <libcfs/libcfs.h>
+# include <linux/module.h>
+#else /* __KERNEL__ */
+# include <liblustre.h>
+#endif
 
-#include <obd.h>
 #include <lustre_fid.h>
 
 void fid_to_le(struct lu_fid *dst, const struct lu_fid *src)
diff --git a/lustre/fid/fid_seq.c b/lustre/fid/fid_seq.c
deleted file mode 100644 (file)
index 1b6f5d2..0000000
+++ /dev/null
@@ -1,144 +0,0 @@
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
- *  lustre/fid/fid_seq.c
- *  Lustre File Id (fid)
- *
- *  Copyright (c) 2006 Cluster File Systems, Inc.
- *   Author: Yury Umanets <umka@clusterfs.com>
- *
- *   This file is part of the Lustre file system, http://www.lustre.org
- *   Lustre is a trademark of Cluster File Systems, Inc.
- *
- *   You may have signed or agreed to another license before downloading
- *   this software.  If so, you are bound by the terms and conditions
- *   of that agreement, and the following does not apply to you.  See the
- *   LICENSE file included with this distribution for more information.
- *
- *   If you did not agree to a different license, then this copy of Lustre
- *   is open source software; you can redistribute it and/or modify it
- *   under the terms of version 2 of the GNU General Public License as
- *   published by the Free Software Foundation.
- *
- *   In either case, Lustre is distributed in the hope that it will be
- *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
- *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- *   license text for more details.
- */
-
-#include <linux/module.h>
-
-#include <lustre/lustre_idl.h>
-#include <obd.h>
-#include <lustre_fid.h>
-
-/* sequence manager initialization/finalization stuff */
-struct lu_seq_mgr *seq_mgr_init(struct lu_seq_mgr_ops *ops,
-                                void *opaque)
-{
-        struct lu_seq_mgr *mgr;
-        ENTRY;
-
-        OBD_ALLOC_PTR(mgr);
-        if (!mgr)
-                RETURN(NULL);
-
-        sema_init(&mgr->m_seq_sem, 1);
-        mgr->m_opaque = opaque;
-        mgr->m_ops = ops;
-
-        RETURN(mgr);
-}
-EXPORT_SYMBOL(seq_mgr_init);
-
-void seq_mgr_fini(struct lu_seq_mgr *mgr)
-{
-        OBD_FREE_PTR(mgr);
-}
-EXPORT_SYMBOL(seq_mgr_fini);
-
-int seq_mgr_write(const struct lu_context *ctx, struct lu_seq_mgr *mgr)
-{
-        ENTRY;
-        RETURN(mgr->m_ops->smo_write(ctx, mgr->m_opaque, &mgr->m_seq));
-}
-EXPORT_SYMBOL(seq_mgr_write);
-
-int seq_mgr_read(const struct lu_context *ctx, struct lu_seq_mgr *mgr)
-{
-        ENTRY;
-        RETURN(mgr->m_ops->smo_read(ctx, mgr->m_opaque, &mgr->m_seq));
-}
-EXPORT_SYMBOL(seq_mgr_read);
-
-/* manager functionality stuff */
-int seq_mgr_alloc(const struct lu_context *ctx,
-                  struct lu_seq_mgr *mgr,
-                  __u64 *seq)
-{
-        int rc;
-        ENTRY;
-
-        LASSERT(mgr != NULL);
-        LASSERT(seq != NULL);
-
-        down(&mgr->m_seq_sem);
-        if (mgr->m_seq > mgr->m_seq_last) {
-                /* new range of seqs should be got from master */
-                rc = -EOPNOTSUPP;
-        } else {
-                *seq = mgr->m_seq;
-                mgr->m_seq++;
-
-                rc = seq_mgr_write(ctx, mgr);
-        }
-        up(&mgr->m_seq_sem);
-        RETURN(rc);
-}
-EXPORT_SYMBOL(seq_mgr_alloc);
-
-/* initialize meta-sequence. First of all try to get it from lower layer,
- * falling down to back store one. In the case this is first run and there is
- * not meta-sequence initialized yet - store it to backstore. */
-int seq_mgr_setup(const struct lu_context *ctx, struct lu_seq_mgr *mgr)
-{
-        int rc = 0;
-        ENTRY;
-
-        /* set seq range */
-        mgr->m_seq_last = mgr->m_seq + LUSTRE_SEQ_RANGE;
-        
-        /* allocate next seq after root one */
-        mgr->m_seq += LUSTRE_ROOT_FID_SEQ + 1;
-
-        rc = seq_mgr_read(ctx, mgr);
-        if (rc == -ENODATA) {
-                CWARN("initialize sequence by defaut ["LPU64"]\n", mgr->m_seq);
-
-                /* initialize new sequence config as it is not yet created. */
-                rc = seq_mgr_write(ctx, mgr);
-        }
-
-        EXIT;
-        if (rc == 0)
-                CWARN("using start sequence: ["LPU64"]\n", mgr->m_seq);
-        return rc;
-}
-EXPORT_SYMBOL(seq_mgr_setup);
-
-static int __init fid_mod_init(void)
-{
-        /* some stuff will be here (cache initializing, etc.) */
-       return 0;
-}
-
-static void __exit fid_mod_exit(void)
-{
-        /* some stuff will be here */
-}
-
-MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
-MODULE_DESCRIPTION("Lustre FID Module");
-MODULE_LICENSE("GPL");
-
-cfs_module(fid, "0.0.2", fid_mod_init, fid_mod_exit);
index 3c3f71c..6822d91 100644 (file)
@@ -45,12 +45,14 @@ struct fld_cache_info {
         spinlock_t fld_lock;
         int fld_hash_mask;
 };
+
 /*XXX use linked list temp for fld in this prototype*/
 struct fld_list {
         struct list_head fld_list;
         spinlock_t       fld_lock;
 };
-struct fld_item{
+
+struct fld_item {
         struct list_head fld_list;
         __u64 fld_seq;
         __u64 fld_mds;
index 2c9172c..5c0eaf6 100644 (file)
@@ -470,6 +470,19 @@ struct lu_site {
          */
         struct lu_fld        *ls_fld;
 
+        /*
+         * Server Seq Manager
+         */
+        struct lu_server_seq *ls_server_seq;
+
+        /*
+         * Clienbt Seq Manager
+         */
+        struct lu_client_seq *ls_client_seq;
+
+        /* sequence controller node */
+        struct obd_export    *ls_controller;
+
         /* statistical counters. Protected by nothing, races are accepted. */
         struct {
                 __u32 s_created;
index f64b7fb..a79965a 100644 (file)
@@ -98,6 +98,7 @@
 #define MGS_REPLY_PORTAL               27
 #define OST_REQUEST_PORTAL             28
 #define MDS_FLD_PORTAL                 29
+#define MDS_SEQ_PORTAL                 30
 
 #define SVC_KILLED               1
 #define SVC_EVENT                2
 #define LUSTRE_LOG_VERSION  0x00050000
 #define LUSTRE_MGS_VERSION  0x00060000
 
+struct lu_range {
+        __u64 lr_start;
+        __u64 lr_end;
+};
+
+static inline int range_is_sane(struct lu_range *r)
+{
+        if (r->lr_end >= r->lr_start)
+                return 1;
+        return 0;
+}
+
 struct lu_fid {
         __u64 f_seq;  /* holds fid sequence. Lustre should support 2 ^ 64
                        * objects, thus even if one sequence has one object we
@@ -138,12 +151,6 @@ enum {
         LUSTRE_ROOT_FID_SEQ  = 1ULL, /* XXX: should go into mkfs. */
         LUSTRE_ROOT_FID_OID  = 2UL,  /* XXX: should go into mkfs. */
 
-        /* maximal objects in sequence */
-        LUSTRE_FID_SEQ_WIDTH = 10000,
-
-        /* range of seqs for one MDS */
-        LUSTRE_SEQ_RANGE = 1000,
-
         /* initial fid id value */
         LUSTRE_FID_INIT_OID  = 1UL
 };
@@ -171,6 +178,11 @@ static inline int fid_seq_is_sane(__u64 seq)
         return seq != 0;
 }
 
+static inline void fid_zero(struct lu_fid *fid)
+{
+        memset(fid, 0, sizeof(*fid));
+}
+
 static inline int fid_is_sane(const struct lu_fid *fid)
 {
         return
@@ -185,7 +197,8 @@ static inline int fid_is_sane(const struct lu_fid *fid)
         fid_oid(fid), \
         fid_ver(fid)
 
-extern void lustre_swab_lu_fid (struct lu_fid *fid);
+extern void lustre_swab_lu_fid(struct lu_fid *fid);
+extern void lustre_swab_lu_range(struct lu_range *range);
 
 static inline int lu_fid_eq(const struct lu_fid *f0,
                             const struct lu_fid *f1)
@@ -372,11 +385,11 @@ struct obd_connect_data {
         __u32 ocd_index;                /* LOV index to connect to */
         __u32 ocd_unused;
         __u64 ocd_ibits_known;          /* inode bits this client understands */
-        __u64 ocd_seq;                  /* sequence info for client */
         __u64 padding2;                 /* also fix lustre_swab_connect */
         __u64 padding3;                 /* also fix lustre_swab_connect */
         __u64 padding4;                 /* also fix lustre_swab_connect */
         __u64 padding5;                 /* also fix lustre_swab_connect */
+        __u64 padding6;                 /* also fix lustre_swab_connect */
 };
 
 extern void lustre_swab_connect(struct obd_connect_data *ocd);
@@ -1124,6 +1137,17 @@ enum fld_rpc_opc {
         FLD_FIRST_OPC                   = FLD_QUERY
 };
 
+enum seq_rpc_opc {
+        SEQ_QUERY                       = 700,
+        SEQ_LAST_OPC,
+        SEQ_FIRST_OPC                   = SEQ_QUERY
+};
+
+enum seq_op {
+        SEQ_ALLOC_SUPER = 0,
+        SEQ_ALLOC_META = 1
+};
+
 /*
  *  LOV data structures
  */
index 0376d3a..69be22c 100644 (file)
 #include <libcfs/list.h>
 #include <libcfs/kp30.h>
 
+struct lu_site;
 struct lu_context;
-struct lu_seq_mgr_ops {
-        int (*smo_read) (const struct lu_context *, void *opaque, __u64 *);
-        int (*smo_write) (const struct lu_context *, void *opaque, __u64 *);
+
+/* start seq number */
+#define LUSTRE_SEQ_SPACE_START  0x400
+
+/* maximal posible seq number */
+#define LUSTRE_SEQ_SPACE_LIMIT  ((__u64)~0ULL)
+
+/* this is how may FIDs may be allocated in one sequence. */
+#define LUSTRE_SEQ_WIDTH 0x00000000000002800
+
+/* how many sequences may be allocate for meta-sequence (this is 10240
+ * sequences). */
+#define LUSTRE_SEQ_META_CHUNK 0x00000000000002800
+
+/* how many sequences may be allocate for super-sequence (this is 10240 * 10240
+ * sequences), what means that one alloaction for super-sequence allows to
+ * allocate 10240 meta-sequences and each of them may have 10240 sequences. */
+#define LUSTRE_SEQ_SUPER_CHUNK (LUSTRE_SEQ_META_CHUNK * LUSTRE_SEQ_META_CHUNK)
+
+/* client sequence manager interface */
+struct lu_client_seq {
+        /* sequence-controller export. */
+        struct obd_export      *seq_exp;
+        struct semaphore        seq_sem;
+
+        /* different flags. */
+        int                     seq_flags;
+
+        /* range of allowed for allocation sequeces. When using lu_client_seq on
+         * clients, this contains meta-sequence range. And for servers this
+         * contains super-sequence range. */
+        struct lu_range         seq_cl_range;
+
+        /* seq related proc */
+        struct proc_dir_entry  *seq_proc_entry;
+
+        /* this holds last allocated fid in last obtained seq */
+        struct lu_fid           seq_fid;
 };
 
-struct lu_seq_mgr {
-        /* seq management fields */
-        struct semaphore       m_seq_sem;
-        /* each MDS has own range of seqs ended with this value
-         * if it is overflowed the new one should be got from master node */
-        __u64                  m_seq_last;
-        /* last allocated seq */
-        __u64                  m_seq;
-        /* ops related stuff */
-        void                  *m_opaque;
-        struct lu_seq_mgr_ops *m_ops;
+#ifdef __KERNEL__
+/* server sequence manager interface */
+struct lu_server_seq {
+        /* super-sequence range, all super-sequences for other servers are
+         * allocated from it. */
+        struct lu_range         seq_ss_range;
+        
+        /* meta-sequence range, all meta-sequences for clients are allocated
+         * from it. */
+        struct lu_range         seq_ms_range;
+
+        /* device for server side seq manager needs (saving sequences to backing
+         * store). */
+        struct dt_device       *seq_dev;
+
+        /* different flags: LUSTRE_SEQ_CONTROLLER, etc. */
+        int                     seq_flags;
+
+        /* seq related proc */
+        struct proc_dir_entry  *seq_proc_entry;
+
+        /* server side seq service */
+        struct ptlrpc_service  *seq_service;
+
+        /* client interafce to request controller */
+        struct lu_client_seq   *seq_cli;
+
+        /* semaphore for protecting allocation */
+        struct semaphore        seq_sem;
 };
+#endif
 
-/* init/fini methods */
-struct lu_seq_mgr *seq_mgr_init(struct lu_seq_mgr_ops *, void *);
-void seq_mgr_fini(struct lu_seq_mgr *);
-
-/* seq management methods */
-int seq_mgr_setup(const struct lu_context *, struct lu_seq_mgr *);
-int seq_mgr_read(const struct lu_context *, struct lu_seq_mgr *);
-int seq_mgr_write(const struct lu_context *, struct lu_seq_mgr *);
-int seq_mgr_alloc(const struct lu_context *, struct lu_seq_mgr *, __u64 *);
-int seq_mgr_range_alloc(const struct lu_context *,
-                        struct lu_seq_mgr *, __u64 *);
-struct lu_site;
-#if 0
-int fid_is_local(struct lu_site *site, const struct lu_fid *fid);
-#else
-static inline int fid_is_local(struct lu_site *site, const struct lu_fid *fid)
+/* client seq mgr flags */
+#define LUSTRE_CLI_SEQ_CLIENT     (1 << 0)
+#define LUSTRE_CLI_SEQ_SERVER     (1 << 1)
+
+#ifdef __KERNEL__
+/* server seq mgr flags */
+#define LUSTRE_SRV_SEQ_CONTROLLER (1 << 0)
+#define LUSTRE_SRV_SEQ_REGULAR    (1 << 1)
+
+int seq_server_init(struct lu_server_seq *seq,
+                    struct lu_client_seq *cli,
+                    const struct lu_context  *ctx,
+                    struct dt_device *dev,
+                    int flags);
+
+void seq_server_fini(struct lu_server_seq *seq,
+                     const struct lu_context *ctx) ;
+#endif
+
+int seq_client_init(struct lu_client_seq *seq, 
+                    struct obd_export *exp,
+                    int flags);
+
+void seq_client_fini(struct lu_client_seq *seq);
+
+int seq_client_alloc_super(struct lu_client_seq *seq);
+int seq_client_alloc_meta(struct lu_client_seq *seq);
+
+int seq_client_alloc_seq(struct lu_client_seq *seq,
+                         __u64 *seqnr);
+int seq_client_alloc_fid(struct lu_client_seq *seq,
+                         struct lu_fid *fid);
+
+/* Fids common stuff */
+static inline int fid_is_local(struct lu_site *site,
+                               const struct lu_fid *fid)
 {
+        /* XXX: fix this when fld is ready. */
         return 1;
 }
-#endif
 
 void fid_to_le(struct lu_fid *dst, const struct lu_fid *src);
 
-
 #endif /* __LINUX_OBD_CLASS_H */
index 049c122..c5ed0f9 100644 (file)
@@ -96,6 +96,8 @@
                                   num_physpages >> (25 - PAGE_SHIFT)), 2UL)
 #define FLD_NUM_THREADS max(min_t(unsigned long, MDT_MAX_THREADS, \
                                   num_physpages >> (25 - PAGE_SHIFT)), 2UL)
+#define SEQ_NUM_THREADS max(min_t(unsigned long, MDT_MAX_THREADS, \
+                                  num_physpages >> (25 - PAGE_SHIFT)), 2UL)
 
 #define MDS_MAX_THREADS 512UL
 #define MDS_DEF_THREADS max(2UL, min_t(unsigned long, 32, \
index 5d95c0d..6355d46 100644 (file)
@@ -385,9 +385,8 @@ struct client_obd {
         /* used by quotacheck */
         int                      cl_qchk_stat; /* quotacheck stat of the peer */
 
-        /* this holds last allocated fid in last obtained seq */
-        struct lu_fid            cl_fid;
-        spinlock_t               cl_fid_lock;
+        /* sequence manager */
+        struct lu_client_seq    *cl_seq;
 };
 #define obd2cli_tgt(obd) ((char *)(obd)->u.cli.cl_target_uuid.uuid)
 
@@ -540,7 +539,7 @@ struct niobuf_local {
 
 #define LUSTRE_OPC_MKDIR     (1 << 0)
 #define LUSTRE_OPC_SYMLINK   (1 << 1)
-#define LUSTRE_OPC_MKNODE    (1 << 2)
+#define LUSTRE_OPC_MKNOD     (1 << 2)
 #define LUSTRE_OPC_CREATE    (1 << 3)
         
 struct lu_placement_hint {
@@ -560,6 +559,7 @@ struct lu_placement_hint {
 #define LUSTRE_MDD0_NAME "mdd0"
 #define LUSTRE_OSD0_NAME "osd0"
 #define LUSTRE_FLD0_NAME "fld0"
+#define LUSTRE_SEQ0_NAME "seq0"
 #define LUSTRE_MDC0_NAME "mdc0"
 
 #define LUSTRE_MDC_NAME  "mdc"
@@ -829,7 +829,10 @@ struct obd_ops {
                            struct obd_connect_data *ocd);
         int (*o_disconnect)(struct obd_export *exp);
 
-        /* may be later these should be moved into separate fid_ops */
+        /* maybe later these should be moved into separate fid_ops */
+        int (*o_fid_init)(struct obd_export *exp);
+        int (*o_fid_fini)(struct obd_export *exp);
+
         int (*o_fid_alloc)(struct obd_export *exp, struct lu_fid *fid,
                            struct lu_placement_hint *hint);
         
index 8580a10..5f6b113 100644 (file)
@@ -720,6 +720,34 @@ static inline int obd_disconnect(struct obd_export *exp)
         RETURN(rc);
 }
 
+static inline int obd_fid_init(struct obd_export *exp)
+{
+        int rc;
+        ENTRY;
+
+        if (OBP(exp->exp_obd, fid_init) == NULL)
+                RETURN(-ENOTSUPP);
+
+        OBD_COUNTER_INCREMENT(exp->exp_obd, fid_init);
+
+        rc = OBP(exp->exp_obd, fid_init)(exp);
+        RETURN(rc);
+}
+
+static inline int obd_fid_fini(struct obd_export *exp)
+{
+        int rc;
+        ENTRY;
+
+        if (OBP(exp->exp_obd, fid_fini) == NULL)
+                RETURN(-ENOTSUPP);
+
+        OBD_COUNTER_INCREMENT(exp->exp_obd, fid_fini);
+
+        rc = OBP(exp->exp_obd, fid_fini)(exp);
+        RETURN(rc);
+}
+
 static inline int obd_fid_alloc(struct obd_export *exp,
                                 struct lu_fid *fid,
                                 struct lu_placement_hint *hint)
index beca205..5c45e85 100644 (file)
@@ -169,6 +169,9 @@ extern cfs_waitq_t obd_race_waitq;
 #define OBD_FAIL_MGS                     0x900
 #define OBD_FAIL_MGS_ALL_REQUEST_NET     0x901
 #define OBD_FAIL_MGS_ALL_REPLY_NET       0x902
+#define OBD_FAIL_SEQ                     0x1000
+#define OBD_FAIL_SEQ_ALL_REQUEST_NET     0x1001
+#define OBD_FAIL_SEQ_ALL_REPLY_NET       0x1002
 
 /* preparation for a more advanced failure testbed (not functional yet) */
 #define OBD_FAIL_MASK_SYS    0x0000FF00
index 9d9f9ab..08c7f64 100644 (file)
@@ -257,9 +257,6 @@ int client_obd_setup(struct obd_device *obddev, struct lustre_cfg *lcfg)
         cli->cl_r_in_flight = 0;
         cli->cl_w_in_flight = 0;
 
-        memset(&cli->cl_fid, 0, sizeof(struct lu_fid));
-        spin_lock_init(&cli->cl_fid_lock);
-
         spin_lock_init(&cli->cl_read_rpc_hist.oh_lock);
         spin_lock_init(&cli->cl_write_rpc_hist.oh_lock);
         spin_lock_init(&cli->cl_read_page_hist.oh_lock);
@@ -380,9 +377,6 @@ int client_connect_import(struct lustre_handle *dlm_handle,
                 imp->imp_connect_flags_orig = data->ocd_connect_flags;
         }
 
-        /* zero out sequence to check it later for validness */
-        ocd->ocd_seq = 0;
-
         rc = ptlrpc_connect_import(imp, NULL);
         if (rc != 0) {
                 LASSERT (imp->imp_state == LUSTRE_IMP_DISCON);
index 1efe1e1..a905c18 100644 (file)
@@ -13,6 +13,7 @@ LUSTRE_LIBS = libllite.a \
               $(top_builddir)/lustre/lov/liblov.a \
               $(top_builddir)/lustre/obdecho/libobdecho.a \
               $(top_builddir)/lustre/osc/libosc.a \
+              $(top_builddir)/lustre/fid/libfid.a \
               $(top_builddir)/lustre/mdc/libmdc.a \
               $(top_builddir)/lustre/ptlrpc/libptlrpc.a \
               $(top_builddir)/lustre/obdclass/liblustreclass.a \
index 6d977b3..2f8f9b4 100755 (executable)
@@ -63,6 +63,7 @@ build_obj_list ../lov liblov.a
 build_obj_list ../obdecho libobdecho.a
 build_obj_list ../osc libosc.a
 build_obj_list ../mdc libmdc.a
+build_obj_list ../fid libfid.a
 build_obj_list ../ptlrpc libptlrpc.a
 build_obj_list ../obdclass liblustreclass.a
 build_obj_list ../lvfs liblvfs.a
index d1c4c01..b2d09b6 100644 (file)
 #include "lutil.h"
 #include "llite_lib.h"
 #include <lustre_ver.h>
+#include <lustre_fid.h>
+
+static int llu_fid_alloc(struct obd_export *exp, struct lu_fid *fid,
+                         struct lu_placement_hint *hint)
+{
+        int rc;
+        ENTRY;
+        rc = obd_fid_alloc(exp, fid, hint);
+        RETURN(rc);
+}
 
 /* allocates passed fid, that is assigns f_num and f_seq to the @fid */
-int llu_fid_md_alloc(struct llu_sb_info *sbi, struct lu_fid *fid)
+int llu_fid_md_alloc(struct llu_sb_info *sbi, struct lu_fid *fid,
+                     struct lu_placement_hint *hint)
 {
         ENTRY;
+        RETURN(llu_fid_alloc(sbi->ll_md_exp, fid, hint));
+}
 
-        if (sbi->ll_fid.f_oid < LUSTRE_FID_SEQ_WIDTH) {
-                sbi->ll_fid.f_oid += 1;
-                *fid = sbi->ll_fid;
-        } else {
-                CERROR("sequence is exhausted. Switching to "
-                       "new one is not yet implemented\n");
-                RETURN(-ERANGE);
+/* allocates passed fid, that is assigns f_num and f_seq to the @fid */
+int llu_fid_dt_alloc(struct llu_sb_info *sbi, struct lu_fid *fid,
+                     struct lu_placement_hint *hint)
+{
+        ENTRY;
+        RETURN(llu_fid_alloc(sbi->ll_dt_exp, fid, hint));
+}
+
+static int llu_fid_init(struct obd_export *exp)
+{
+        int rc;
+        ENTRY;
+
+        rc = obd_fid_init(exp);
+        if (rc) {
+                CERROR("cannot initialize FIDs framework, "
+                       "rc %d\n", rc);
+                RETURN(rc);
         }
-        
-        RETURN(0);
+
+        RETURN(rc);
 }
 
-/* allocates passed fid, that is assigns f_num and f_seq to the @fid */
-int llu_fid_dt_alloc(struct llu_sb_info *sbi, struct lu_fid *fid)
+int llu_fid_md_init(struct llu_sb_info *sbi)
+{
+        ENTRY;
+        RETURN(llu_fid_init(sbi->ll_md_exp));
+}
+
+int llu_fid_dt_init(struct llu_sb_info *sbi)
+{
+        ENTRY;
+        RETURN(llu_fid_init(sbi->ll_dt_exp));
+}
+
+static int llu_fid_fini(struct obd_export *exp)
+{
+        int rc;
+        ENTRY;
+
+        rc = obd_fid_fini(exp);
+        if (rc) {
+                CERROR("cannot finalize FIDs framework, "
+                       "rc %d\n", rc);
+                RETURN(rc);
+        }
+
+        RETURN(rc);
+}
+
+int llu_fid_md_fini(struct llu_sb_info *sbi)
+{
+        ENTRY;
+        RETURN(llu_fid_fini(sbi->ll_md_exp));
+}
+
+int llu_fid_dt_fini(struct llu_sb_info *sbi)
 {
         ENTRY;
-        RETURN(-EOPNOTSUPP);
+        RETURN(llu_fid_fini(sbi->ll_dt_exp));
 }
 
 /* build inode number on passed @fid */
-unsigned long llu_fid_build_ino(struct llu_sb_info *sbi, struct lu_fid *fid)
+unsigned long llu_fid_build_ino(struct llu_sb_info *sbi,
+                                struct lu_fid *fid)
 {
         unsigned long ino;
         ENTRY;
 
         /* very stupid and having many downsides inode allocation algorithm
          * based on fid. */
-        ino = (fid_seq(fid) - 1) * LUSTRE_FID_SEQ_WIDTH + fid_oid(fid);
+        ino = (fid_seq(fid) - 1) * LUSTRE_SEQ_WIDTH + fid_oid(fid);
         RETURN(ino);
 }
index 10376ce..a9436e4 100644 (file)
@@ -37,8 +37,6 @@ struct llu_sb_info {
         struct obd_uuid          ll_mds_uuid;
         struct obd_uuid          ll_mds_peer_uuid;
         char                    *ll_instance;
-
-        struct lu_fid            ll_fid;
 };
 
 #define LL_SBI_NOLCK            0x1
@@ -241,9 +239,20 @@ ssize_t llu_iop_filldirentries(struct inode *ino, _SYSIO_OFF_T *basep,
                               char *buf, size_t nbytes);
 
 /* liblustre/llite_fid.c*/
-int llu_fid_md_alloc(struct llu_sb_info *sbi, struct lu_fid *fid);
-int llu_fid_dt_alloc(struct llu_sb_info *sbi, struct lu_fid *fid);
-unsigned long llu_fid_build_ino(struct llu_sb_info *sbi, struct lu_fid *fid);
+int llu_fid_md_init(struct llu_sb_info *sbi);
+int llu_fid_dt_init(struct llu_sb_info *sbi);
+
+int llu_fid_md_fini(struct llu_sb_info *sbi);
+int llu_fid_dt_fini(struct llu_sb_info *sbi);
+
+int llu_fid_md_alloc(struct llu_sb_info *sbi, struct lu_fid *fid, 
+                     struct lu_placement_hint *hint);
+
+int llu_fid_dt_alloc(struct llu_sb_info *sbi, struct lu_fid *fid,
+                     struct lu_placement_hint *hint);
+
+unsigned long llu_fid_build_ino(struct llu_sb_info *sbi, 
+                                struct lu_fid *fid);
 
 /* ext2 related */
 #define EXT2_NAME_LEN (255)
index 14b0518..977ecbb 100644 (file)
@@ -443,8 +443,13 @@ static int llu_lookup_it(struct inode *parent, struct pnode *pnode,
         icbd.icbd_parent = parent;
 
         /* allocate new fid for child */
-        if (it->it_op == IT_OPEN || it->it_op == IT_CREAT) {
-                rc = llu_fid_md_alloc(llu_i2sbi(parent), &op_data.fid2);
+        if (it->it_op & IT_CREAT || 
+            (it->it_op & IT_OPEN && it->it_create_mode & O_CREAT)) {
+                struct lu_placement_hint hint = { .ph_pname = NULL,
+                                                  .ph_cname = &pnode->p_base->pb_name,
+                                                  .ph_opc = LUSTRE_OPC_CREATE };
+                
+                rc = llu_fid_md_alloc(llu_i2sbi(parent), &op_data.fid2, &hint);
                 if (rc) {
                         CERROR("can't allocate new fid, rc %d\n", rc);
                         LBUG();
index 09d91c6..f5fa4fc 100644 (file)
@@ -850,6 +850,11 @@ static int llu_iop_symlink_raw(struct pnode *pno, const char *tgt)
         struct ptlrpc_request *request = NULL;
         struct llu_sb_info *sbi = llu_i2sbi(dir);
         struct md_op_data op_data = { { 0 } };
+        struct lu_placement_hint hint = {
+                .ph_pname = NULL,
+                .ph_cname = qstr,
+                .ph_opc = LUSTRE_OPC_SYMLINK
+        };
         int err = -EMLINK;
         ENTRY;
 
@@ -858,7 +863,7 @@ static int llu_iop_symlink_raw(struct pnode *pno, const char *tgt)
                 RETURN(err);
 
         /* allocate new fid */
-        err = llu_fid_md_alloc(sbi, &op_data.fid2);
+        err = llu_fid_md_alloc(sbi, &op_data.fid2, &hint);
         if (err) {
                 CERROR("can't allocate new fid, rc %d\n", err);
                 RETURN(err);
@@ -968,6 +973,11 @@ static int llu_iop_mknod_raw(struct pnode *pno,
         struct llu_sb_info *sbi = llu_i2sbi(dir);
         struct md_op_data op_data = { { 0 } };
         int err = -EMLINK;
+        struct lu_placement_hint hint = {
+                .ph_pname = NULL,
+                .ph_cname = &pno->p_base->pb_name,
+                .ph_opc = LUSTRE_OPC_MKNOD
+        };
         ENTRY;
 
         liblustre_wait_event(0);
@@ -987,7 +997,7 @@ static int llu_iop_mknod_raw(struct pnode *pno,
         case S_IFIFO:
         case S_IFSOCK:
                 /* allocate new fid */
-                err = llu_fid_md_alloc(sbi, &op_data.fid2);
+                err = llu_fid_md_alloc(sbi, &op_data.fid2, &hint);
                 if (err) {
                         CERROR("can't allocate new fid, rc %d\n", err);
                         RETURN(err);
@@ -1216,6 +1226,12 @@ static int llu_iop_mkdir_raw(struct pnode *pno, mode_t mode)
         struct ptlrpc_request *request = NULL;
         struct intnl_stat *st = llu_i2stat(dir);
         struct md_op_data op_data = { { 0 } };
+        struct lu_placement_hint hint = {
+                .ph_pname = NULL,
+                .ph_cname = qstr,
+                .ph_opc = LUSTRE_OPC_MKDIR
+        };
+        
         int err = -EMLINK;
         ENTRY;
 
@@ -1227,7 +1243,7 @@ static int llu_iop_mkdir_raw(struct pnode *pno, mode_t mode)
                 RETURN(err);
 
         /* allocate new fid */
-        err = llu_fid_md_alloc(llu_i2sbi(dir), &op_data.fid2);
+        err = llu_fid_md_alloc(llu_i2sbi(dir), &op_data.fid2, &hint);
         if (err) {
                 CERROR("can't allocate new fid, rc %d\n", err);
                 RETURN(err);
@@ -1819,11 +1835,9 @@ llu_fsswop_mount(const char *source,
         if (err)
                 GOTO(out_mdc, err);
 
-        /* initializing ->ll_fid. It is known that root object has separate
-         * sequence, so that we use what MDS returned to us and do not check if
-         * f_oid collides with root or not. */
-        sbi->ll_fid.f_seq = ocd.ocd_seq;
-        sbi->ll_fid.f_oid = LUSTRE_FID_INIT_OID;
+        err = llu_fid_md_init(sbi);
+        if (err)
+                GOTO(out_mdc, err);
 
         /*
          * FIXME fill fs stat data into sbi here!!! FIXME
@@ -1833,7 +1847,7 @@ llu_fsswop_mount(const char *source,
         obd = class_name2obd(osc);
         if (!obd) {
                 CERROR("OSC %s: not setup or attached\n", osc);
-                GOTO(out_mdc, err = -EINVAL);
+                GOTO(out_md_fid, err = -EINVAL);
         }
         obd_set_info_async(obd->obd_self_export, strlen("async"), "async",
                            sizeof(async), &async, NULL);
@@ -1852,12 +1866,16 @@ llu_fsswop_mount(const char *source,
         sbi->ll_dt_exp = class_conn2export(&osc_conn);
         sbi->ll_lco.lco_flags = ocd.ocd_connect_flags;
 
+        err = llu_fid_dt_init(sbi);
+        if (err)
+                GOTO(out_osc, err);
+
         llu_init_ea_size(sbi->ll_md_exp, sbi->ll_dt_exp);
 
         err = md_getstatus(sbi->ll_md_exp, &rootfid);
         if (err) {
                 CERROR("cannot mds_connect: rc = %d\n", err);
-                GOTO(out_osc, err);
+                GOTO(out_dt_fid, err);
         }
         CDEBUG(D_SUPER, "rootfid "DFID3"\n", PFID3(&rootfid));
         sbi->ll_root_fid = rootfid;
@@ -1911,8 +1929,12 @@ out_inode:
         _sysio_i_gone(root);
 out_request:
         ptlrpc_req_finished(request);
+out_dt_fid:
+        llu_fid_dt_fini(sbi);
 out_osc:
         obd_disconnect(sbi->ll_dt_exp);
+out_md_fid:
+        llu_fid_md_fini(sbi);
 out_mdc:
         obd_disconnect(sbi->ll_md_exp);
 out_free:
index ee9d8ae..a2335e1 100644 (file)
@@ -28,6 +28,7 @@
 #include <linux/random.h>
 #include <linux/version.h>
 
+#include <lustre_fid.h>
 #include <lustre_lite.h>
 #include <lustre_ha.h>
 #include <lustre_ver.h>
@@ -67,14 +68,75 @@ int ll_fid_dt_alloc(struct ll_sb_info *sbi, struct lu_fid *fid,
         RETURN(ll_fid_alloc(sbi->ll_dt_exp, fid, hint));
 }
 
+static int ll_fid_init(struct obd_export *exp)
+{
+        int rc;
+        ENTRY;
+
+        rc = obd_fid_init(exp);
+        if (rc) {
+                CERROR("cannot initialize FIDs framework, "
+                       "rc %d\n", rc);
+                RETURN(rc);
+        }
+
+        RETURN(rc);
+}
+
+int ll_fid_md_init(struct ll_sb_info *sbi)
+{
+        ENTRY;
+        RETURN(ll_fid_init(sbi->ll_md_exp));
+}
+
+int ll_fid_dt_init(struct ll_sb_info *sbi)
+{
+#if 0
+        ENTRY;
+        RETURN(ll_fid_init(sbi->ll_dt_exp));
+#endif
+        /* XXX: enable this again when OSD is starting sequence-management
+         * service. */
+        ENTRY;
+        RETURN(0);
+}
+
+static int ll_fid_fini(struct obd_export *exp)
+{
+        int rc;
+        ENTRY;
+
+        rc = obd_fid_fini(exp);
+        if (rc) {
+                CERROR("cannot finalize FIDs framework, "
+                       "rc %d\n", rc);
+                RETURN(rc);
+        }
+
+        RETURN(rc);
+}
+
+int ll_fid_md_fini(struct ll_sb_info *sbi)
+{
+        ENTRY;
+        RETURN(ll_fid_fini(sbi->ll_md_exp));
+}
+
+int ll_fid_dt_fini(struct ll_sb_info *sbi)
+{
+        ENTRY;
+        RETURN(ll_fid_fini(sbi->ll_dt_exp));
+}
+
 /* build inode number on passed @fid */
-ino_t ll_fid_build_ino(struct ll_sb_info *sbi, struct lu_fid *fid)
+ino_t ll_fid_build_ino(struct ll_sb_info *sbi,
+                       struct lu_fid *fid)
 {
         ino_t ino;
         ENTRY;
 
         /* very stupid and having many downsides inode allocation algorithm
          * based on fid. */
-        ino = (fid_seq(fid) - 1) * LUSTRE_FID_SEQ_WIDTH + fid_oid(fid);
+        ino = (fid_seq(fid) - 1) * LUSTRE_SEQ_WIDTH + fid_oid(fid);
         RETURN(ino);
 }
index 9771b99..4b26a70 100644 (file)
@@ -591,6 +591,12 @@ ssize_t ll_listxattr(struct dentry *dentry, char *buffer, size_t size);
 int ll_removexattr(struct dentry *dentry, const char *name);
 
 /* llite/llite_fid.c*/
+int ll_fid_md_init(struct ll_sb_info *sbi);
+int ll_fid_dt_init(struct ll_sb_info *sbi);
+
+int ll_fid_md_fini(struct ll_sb_info *sbi);
+int ll_fid_dt_fini(struct ll_sb_info *sbi);
+
 int ll_fid_md_alloc(struct ll_sb_info *sbi, struct lu_fid *fid,
                     struct lu_placement_hint *hint);
 
index 913a704..908498b 100644 (file)
@@ -244,10 +244,17 @@ int client_common_fill_super(struct super_block *sb, char *mdc, char *osc)
                                  strlen(sbi2mdc(sbi)->cl_target_uuid.uuid));
 #endif
 
+        /* init FIDs framework */
+        err = ll_fid_md_init(sbi);
+        if (err) {
+                CERROR("can't init FIDs framework, rc %d\n", err);
+                GOTO(out_mdc, err);
+        }
+        
         obd = class_name2obd(osc);
         if (!obd) {
                 CERROR("OSC %s: not setup or attached\n", osc);
-                GOTO(out_mdc, err = -ENODEV);
+                GOTO(out_md_fid, err = -ENODEV);
         }
 
         data->ocd_connect_flags =
@@ -298,10 +305,17 @@ int client_common_fill_super(struct super_block *sb, char *mdc, char *osc)
                         GOTO(out_osc, -ENOMEM);
         }
 
+        /* init FIDs framework */
+        err = ll_fid_dt_init(sbi);
+        if (err) {
+                CERROR("can't init FIDs framework, rc %d\n", err);
+                GOTO(out_osc, err);
+        }
+        
         err = md_getstatus(sbi->ll_md_exp, &rootfid);
         if (err) {
                 CERROR("cannot mds_connect: rc = %d\n", err);
-                GOTO(out_osc, err);
+                GOTO(out_dt_fid, err);
         }
         CDEBUG(D_SUPER, "rootfid "DFID3"\n", PFID3(&rootfid));
         sbi->ll_root_fid = rootfid;
@@ -329,7 +343,7 @@ int client_common_fill_super(struct super_block *sb, char *mdc, char *osc)
                 GOTO(out_osc, err);
         }
 
-        LASSERT(fid_oid(&sbi->ll_root_fid) != 0);
+        LASSERT(fid_is_sane(&sbi->ll_root_fid));
         root = ll_iget(sb, ll_fid_build_ino(sbi, &sbi->ll_root_fid), &md);
         ptlrpc_req_finished(request);
 
@@ -363,8 +377,12 @@ int client_common_fill_super(struct super_block *sb, char *mdc, char *osc)
 out_root:
         if (root)
                 iput(root);
+out_dt_fid:
+        obd_fid_fini(sbi->ll_dt_exp);
 out_osc:
         obd_disconnect(sbi->ll_dt_exp);
+out_md_fid:
+        obd_fid_fini(sbi->ll_md_exp);
 out_mdc:
         obd_disconnect(sbi->ll_md_exp);
 out:
@@ -502,6 +520,7 @@ void client_common_put_super(struct super_block *sb)
         prune_deathrow(sbi, 0);
 
         list_del(&sbi->ll_conn_chain);
+        ll_fid_dt_fini(sbi);
         obd_disconnect(sbi->ll_dt_exp);
 
         lprocfs_unregister_mountpoint(sbi);
@@ -510,6 +529,7 @@ void client_common_put_super(struct super_block *sb)
                 sbi->ll_proc_root = NULL;
         }
 
+        ll_fid_md_fini(sbi);
         obd_disconnect(sbi->ll_md_exp);
 
         lustre_throw_orphan_dentries(sb);
index b36fde4..922ffba 100644 (file)
@@ -547,6 +547,11 @@ static int ll_mknod_generic(struct inode *dir, struct qstr *name, int mode,
         struct inode *inode = NULL;
         struct ll_sb_info *sbi = ll_i2sbi(dir);
         struct md_op_data op_data = { { 0 } };
+        struct lu_placement_hint hint = {
+                .ph_pname = NULL,
+                .ph_cname = name,
+                .ph_opc = LUSTRE_OPC_MKNOD
+        };
         int err;
         ENTRY;
 
@@ -564,6 +569,9 @@ static int ll_mknod_generic(struct inode *dir, struct qstr *name, int mode,
         case S_IFBLK:
         case S_IFIFO:
         case S_IFSOCK:
+                err = ll_fid_md_alloc(sbi, &op_data.fid2, &hint);
+                if (err)
+                        break;
                 ll_prepare_md_op_data(&op_data, dir, NULL, name->name,
                                       name->len, 0);
                 err = md_create(sbi->ll_md_exp, &op_data, NULL, 0, mode,
index 99a5cab..6810f09 100644 (file)
@@ -37,6 +37,7 @@
 
 #include <lustre/lustre_idl.h>
 #include <obd_support.h>
+#include <lustre_fid.h>
 #include <lustre_lib.h>
 #include <lustre_net.h>
 #include <lustre_dlm.h>
@@ -53,7 +54,7 @@ int lmv_fld_lookup(struct obd_device *obd, struct lu_fid *fid)
         LASSERT(fid_is_sane(fid));
 
         /* temporary hack until fld will works */
-        rc = (unsigned long)fid_seq(fid) / LUSTRE_SEQ_RANGE;
+        rc = (unsigned long)fid_seq(fid) / LUSTRE_SEQ_SUPER_CHUNK;
         CWARN("LMV: got MDS %d for sequence: "LPU64"\n", rc, fid_seq(fid));
         RETURN(rc);
 }
index 33c7125..dd04eca 100644 (file)
@@ -670,6 +670,42 @@ static int lmv_placment_policy(struct obd_device *obd,
         RETURN(0);
 }
 
+static int lmv_fid_init(struct obd_export *exp)
+{
+        struct obd_device *obd = class_exp2obd(exp);
+        struct lmv_obd *lmv = &obd->u.lmv;
+        int i, rc = 0;
+        ENTRY;
+        
+        for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
+                if (lmv->tgts[i].ltd_exp == NULL)
+                        continue;
+                
+                rc = obd_fid_init(lmv->tgts[i].ltd_exp);
+                if (rc)
+                        RETURN(rc);
+        }
+        RETURN(rc);
+}
+
+static int lmv_fid_fini(struct obd_export *exp)
+{
+        struct obd_device *obd = class_exp2obd(exp);
+        struct lmv_obd *lmv = &obd->u.lmv;
+        int i, rc = 0;
+        ENTRY;
+
+        for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
+                if (lmv->tgts[i].ltd_exp == NULL)
+                        continue;
+                
+                rc = obd_fid_fini(lmv->tgts[i].ltd_exp);
+                if (rc)
+                        break;
+        }
+        RETURN(rc);
+}
+
 static int lmv_fid_alloc(struct obd_export *exp, struct lu_fid *fid,
                          struct lu_placement_hint *hint)
 {
@@ -2385,6 +2421,8 @@ struct obd_ops lmv_obd_ops = {
         .o_packmd               = lmv_packmd,
         .o_unpackmd             = lmv_unpackmd,
         .o_notify               = lmv_notify,
+        .o_fid_init             = lmv_fid_init,
+        .o_fid_fini             = lmv_fid_fini,
         .o_fid_alloc            = lmv_fid_alloc,
         .o_fid_delete           = lmv_fid_delete,
         .o_iocontrol            = lmv_iocontrol
index 3ea198b..e43081d 100644 (file)
@@ -38,6 +38,7 @@
 
 #include <obd_class.h>
 #include <lustre_dlm.h>
+#include <lustre_fid.h>
 #include <lustre_mds.h> /* for LUSTRE_POSIX_ACL_MAX_SIZE */
 #include <md_object.h>
 #include <lprocfs_status.h>
@@ -1087,30 +1088,50 @@ static int mdc_import_event(struct obd_device *obd, struct obd_import *imp,
         RETURN(rc);
 }
 
-static int mdc_fid_alloc(struct obd_export *exp, struct lu_fid *fid,
-                         struct lu_placement_hint *hint)
+static int mdc_fid_init(struct obd_export *exp)
 {
         struct client_obd *cli = &exp->exp_obd->u.cli;
-        int rc = 0;
+        int rc;
         ENTRY;
 
-        LASSERT(fid != NULL);
-        LASSERT(hint != NULL);
-        LASSERT(fid_seq_is_sane(fid_seq(&cli->cl_fid)));
+        OBD_ALLOC_PTR(cli->cl_seq);
+        if (cli->cl_seq == NULL)
+                RETURN(-ENOMEM);
 
-        spin_lock(&cli->cl_fid_lock);
-        if (fid_oid(&cli->cl_fid) < LUSTRE_FID_SEQ_WIDTH) {
-                cli->cl_fid.f_oid += 1;
-                *fid = cli->cl_fid;
-        } else {
-                CERROR("sequence is exhausted. Switching to "
-                       "new one is not yet implemented\n");
-                rc = -ERANGE;
+        /* init client side of sequence-manager */
+        rc = seq_client_init(cli->cl_seq, exp,
+                             LUSTRE_CLI_SEQ_CLIENT);
+        if (rc) {
+                OBD_FREE_PTR(cli->cl_seq);
+                cli->cl_seq = NULL;
         }
-        spin_unlock(&cli->cl_fid_lock);
         RETURN(rc);
 }
 
+static int mdc_fid_fini(struct obd_export *exp)
+{
+        struct client_obd *cli = &exp->exp_obd->u.cli;
+        ENTRY;
+
+        if (cli->cl_seq != NULL) {
+                seq_client_fini(cli->cl_seq);
+                OBD_FREE_PTR(cli->cl_seq);
+                cli->cl_seq = NULL;
+        }
+        
+        RETURN(0);
+}
+
+static int mdc_fid_alloc(struct obd_export *exp, struct lu_fid *fid,
+                         struct lu_placement_hint *hint)
+{
+        struct client_obd *cli = &exp->exp_obd->u.cli;
+        struct lu_client_seq *seq = cli->cl_seq;
+
+        ENTRY;
+        RETURN(seq_client_alloc_fid(seq, fid));
+}
+
 static int mdc_setup(struct obd_device *obd, struct lustre_cfg *cfg)
 {
         struct client_obd *cli = &obd->u.cli;
@@ -1259,6 +1280,8 @@ struct obd_ops mdc_obd_ops = {
         .o_statfs           = mdc_statfs,
         .o_pin              = mdc_pin,
         .o_unpin            = mdc_unpin,
+        .o_fid_init         = mdc_fid_init,
+        .o_fid_fini         = mdc_fid_fini,
         .o_fid_alloc        = mdc_fid_alloc,
         .o_import_event     = mdc_import_event,
         .o_llog_init        = mdc_llog_init,
index 33658aa..a742aba 100644 (file)
@@ -293,22 +293,8 @@ static int mdt_connect(struct mdt_thread_info *info)
         req = mdt_info_req(info);
         result = target_handle_connect(req, mdt_handle);
         if (result == 0) {
-                struct obd_connect_data *data;
-
                 LASSERT(req->rq_export != NULL);
                 info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
-
-                /*
-                 * XXX: this is incorrect, because target_handle_connect()
-                 * accessed and *swabbed* connect data bypassing
-                 * capsule. Correct fix is to switch everything to the new
-                 * req-layout interface.
-                 */
-                data = req_capsule_server_get(pill, &RMF_CONNECT_DATA);
-
-                result = seq_mgr_alloc(info->mti_ctxt,
-                                       info->mti_mdt->mdt_seq_mgr,
-                                       &data->ocd_seq);
         }
         return result;
 }
@@ -1469,41 +1455,130 @@ static int mdt_config(const struct lu_context *ctx, struct mdt_device *m,
         RETURN(child->md_ops->mdo_config(ctx, child, name, buf, size, mode));
 }
 
-static int mdt_seq_mgr_hpr(const struct lu_context *ctx,
-                           void *opaque, __u64 *seq, int mode)
+/*
+ * Seq wrappers
+ */
+static int mdt_seq_init(const struct lu_context *ctx,
+                        struct mdt_device *m)
 {
-        struct mdt_device *m = opaque;
+        struct lu_site *ls;
         int rc;
         ENTRY;
 
-        rc = mdt_config(ctx, m, LUSTRE_CONFIG_METASEQ,
-                        seq, sizeof(*seq), mode);
+        ls = m->mdt_md_dev.md_lu_dev.ld_site;
+
+        OBD_ALLOC_PTR(ls->ls_client_seq);
+
+        if (ls->ls_client_seq != NULL) {
+                rc = seq_client_init(ls->ls_client_seq,
+                                     ls->ls_controller,
+                                     LUSTRE_CLI_SEQ_SERVER);
+        } else
+                rc = -ENOMEM;
+
+        if (rc)
+                RETURN(rc);
+        
+        OBD_ALLOC_PTR(ls->ls_server_seq);
+
+        if (ls->ls_server_seq != NULL) {
+                int flags;
+
+                flags = (ls->ls_node_id == 0) ?
+                        LUSTRE_SRV_SEQ_CONTROLLER :
+                        LUSTRE_SRV_SEQ_REGULAR;
+                
+                rc = seq_server_init(ls->ls_server_seq,
+                                     ls->ls_client_seq,
+                                     ctx, m->mdt_bottom,
+                                     flags);
+        } else
+                rc = -ENOMEM;
+
         RETURN(rc);
 }
 
-static int mdt_seq_mgr_read(const struct lu_context *ctx,
-                            void *opaque, __u64 *seq)
+static int mdt_seq_fini(const struct lu_context *ctx,
+                        struct mdt_device *m)
 {
+        struct lu_site *ls = m->mdt_md_dev.md_lu_dev.ld_site;
         ENTRY;
-        RETURN(mdt_seq_mgr_hpr(ctx, opaque, seq, LUSTRE_CONFIG_GET));
+        
+        if (ls->ls_server_seq) {
+                seq_server_fini(ls->ls_server_seq, ctx);
+                OBD_FREE_PTR(ls->ls_server_seq);
+                ls->ls_server_seq = NULL;
+        }
+        if (ls->ls_client_seq) {
+                seq_client_fini(ls->ls_client_seq);
+                OBD_FREE_PTR(ls->ls_client_seq);
+                ls->ls_client_seq = NULL;
+        }
+        RETURN(0);
 }
 
-static int mdt_seq_mgr_write(const struct lu_context *ctx,
-                             void *opaque, __u64 *seq)
+static int mdt_controller_init(struct mdt_device *m,
+                               struct lustre_cfg *cfg)
 {
+        struct obd_device *mdc;
+        struct obd_uuid uuid;
+        struct lu_site *ls;
+        char *uuid_str;
+        int rc, index;
         ENTRY;
-        RETURN(mdt_seq_mgr_hpr(ctx, opaque, seq, LUSTRE_CONFIG_SET));
+        
+        index = simple_strtol(lustre_cfg_string(cfg, 2), NULL, 10);
+        if (index != 0)
+                RETURN(0);
+
+        uuid_str = lustre_cfg_string(cfg, 1);
+        obd_str2uuid(&uuid, uuid_str);
+        mdc = class_find_client_obd(&uuid, LUSTRE_MDC_NAME, NULL);
+        if (!mdc) {
+                CERROR("can't find controller MDC by uuid %s\n",
+                       uuid_str);
+                rc = -ENOENT;
+        } else if (!mdc->obd_set_up) {
+                CERROR("target %s not set up\n", mdc->obd_name);
+                rc = -EINVAL;
+        } else {
+                struct lustre_handle conn = {0, };
+
+                CDEBUG(D_CONFIG, "connect to controller %s(%s)\n",
+                       mdc->obd_name, mdc->obd_uuid.uuid);
+
+                rc = obd_connect(&conn, mdc, &mdc->obd_uuid, NULL);
+
+                if (rc) {
+                        CERROR("target %s connect error %d\n",
+                               mdc->obd_name, rc);
+                } else {
+                        ls = m->mdt_md_dev.md_lu_dev.ld_site;
+                        ls->ls_controller = class_conn2export(&conn);
+                }
+        }
+                
+        RETURN(rc);
 }
 
-struct lu_seq_mgr_ops seq_mgr_ops = {
-        .smo_read  = mdt_seq_mgr_read,
-        .smo_write = mdt_seq_mgr_write
-};
+static int mdt_controller_fini(struct mdt_device *m)
+{
+        struct lu_site *ls;
+        int rc;
+        ENTRY;
+        
+        ls = m->mdt_md_dev.md_lu_dev.ld_site;
+        if (ls && ls->ls_controller) {
+                rc = obd_disconnect(ls->ls_controller);
+                ls->ls_controller = NULL;
+        }
+
+        RETURN(rc);
+}
 
 /*
  * FLD wrappers
  */
-
 static int mdt_fld_init(const struct lu_context *ctx, struct mdt_device *m)
 {
         struct lu_site *ls;
@@ -1525,16 +1600,16 @@ static int mdt_fld_init(const struct lu_context *ctx, struct mdt_device *m)
 static int mdt_fld_fini(const struct lu_context *ctx, struct mdt_device *m)
 {
         struct lu_site *ls = m->mdt_md_dev.md_lu_dev.ld_site;
-
+        ENTRY;
+        
         if (ls && ls->ls_fld) {
                 fld_server_fini(ctx, ls->ls_fld);
                 OBD_FREE_PTR(ls->ls_fld);
         }
-        return 0;
+        RETURN(0);
 }
 
 /* device init/fini methods */
-
 static void mdt_stop_ptlrpc_service(struct mdt_device *m)
 {
         if (m->mdt_service != NULL) {
@@ -1723,6 +1798,8 @@ static void mdt_fini(struct mdt_device *m)
         mdt_stack_fini(&ctx, m, md2lu_dev(m->mdt_child));
 
         mdt_fld_fini(&ctx, m);
+        mdt_seq_fini(&ctx, m);
+        mdt_controller_fini(m);
 
         LASSERT(atomic_read(&d->ld_ref) == 0);
         md_device_fini(&m->mdt_md_dev);
@@ -1732,11 +1809,6 @@ static void mdt_fini(struct mdt_device *m)
                 m->mdt_namespace = NULL;
         }
 
-        if (m->mdt_seq_mgr) {
-                seq_mgr_fini(m->mdt_seq_mgr);
-                m->mdt_seq_mgr = NULL;
-        }
-
         if (d->ld_site != NULL) {
                 lu_site_fini(d->ld_site);
                 OBD_FREE_PTR(d->ld_site);
@@ -1794,27 +1866,22 @@ static int mdt_init0(struct mdt_device *m,
         LASSERT(num);
         s->ls_node_id = simple_strtol(num, NULL, 10);
 
-        m->mdt_seq_mgr = seq_mgr_init(&seq_mgr_ops, m);
-        if (!m->mdt_seq_mgr) {
-                CERROR("can't initialize sequence manager\n");
-                GOTO(err_fini_stack, rc);
-        }
-        /* set initial sequence by mds index */
-        m->mdt_seq_mgr->m_seq = s->ls_node_id * LUSTRE_SEQ_RANGE;
-
-        /* init sequence info after device stack is initialized. */
-        rc = seq_mgr_setup(&ctx, m->mdt_seq_mgr);
-
         lu_context_exit(&ctx);
         if (rc)
-                GOTO(err_fini_mgr, rc);
+                GOTO(err_fini_stack, rc);
 
         lu_context_enter(&ctx);
         rc = mdt_fld_init(&ctx, m);
         lu_context_exit(&ctx);
         if (rc)
-                GOTO(err_free_fld, rc);
+                GOTO(err_fini_fld, rc);
 
+        lu_context_enter(&ctx);
+        rc = mdt_seq_init(&ctx, m);
+        lu_context_exit(&ctx);
+        if (rc)
+                GOTO(err_fini_seq, rc);
+        
         lu_context_fini(&ctx);
 
         snprintf(ns_name, sizeof ns_name, LUSTRE_MDT0_NAME"-%p", m);
@@ -1824,7 +1891,6 @@ static int mdt_init0(struct mdt_device *m,
 
         ldlm_register_intent(m->mdt_namespace, mdt_intent_policy);
 
-
         rc = mdt_start_ptlrpc_service(m);
         if (rc)
                 GOTO(err_free_ns, rc);
@@ -1837,11 +1903,10 @@ static int mdt_init0(struct mdt_device *m,
 err_free_ns:
         ldlm_namespace_free(m->mdt_namespace, 0);
         m->mdt_namespace = NULL;
-err_free_fld:
+err_fini_seq:
+        mdt_seq_fini(&ctx, m);
+err_fini_fld:
         mdt_fld_fini(&ctx, m);
-err_fini_mgr:
-        seq_mgr_fini(m->mdt_seq_mgr);
-        m->mdt_seq_mgr = NULL;
 err_fini_ctx:
         lu_context_exit(&ctx);
         lu_context_fini(&ctx);
@@ -1862,6 +1927,14 @@ static int mdt_process_config(const struct lu_context *ctx,
         ENTRY;
 
         switch (cfg->lcfg_command) {
+        case LCFG_ADD_MDC:
+                /* add mdc hook to get first MDT uuid and connect it to
+                 * ls->controller to use for seq manager. */
+                err = mdt_controller_init(mdt_dev(d), cfg);
+                if (err) {
+                        CERROR("can't initialize controller export, "
+                               "rc %d\n", err);
+                }
                 /* all MDT specific commands should be here */
         default:
                 /* others are passed further */
index bbbb734..22106e5 100644 (file)
@@ -65,10 +65,6 @@ struct mdt_device {
          * necessary.
          */
         unsigned long              mdt_flags;
-
-        /* Seq management related stuff */
-        struct lu_seq_mgr         *mdt_seq_mgr;
-
         struct dt_device          *mdt_bottom;
         /*
          * Options bit-fields.
index 898bba1..afe61fe 100644 (file)
@@ -1308,8 +1308,8 @@ static int mgs_write_log_mds(struct obd_device *obd, struct fs_db *fsdb,
 
 /* Add the ost info to the client/mdt lov */
 static int mgs_write_log_osc_to_lov(struct obd_device *obd, struct fs_db *fsdb,
-                             struct mgs_target_info *mti,
-                             char *logname, char *lovname, int flags)
+                                    struct mgs_target_info *mti,
+                                    char *logname, char *lovname, int flags)
 {
         struct llog_handle *llh = NULL;
         char *nodeuuid, *oscname, *oscuuid, *lovuuid;
index fd138af..91a331d 100644 (file)
@@ -93,6 +93,13 @@ void lustre_swab_lu_fid(struct lu_fid *fid)
 }
 EXPORT_SYMBOL(lustre_swab_lu_fid);
 
+void lustre_swab_lu_range(struct lu_range *range)
+{
+        __swab64s (&range->lr_start);
+        __swab64s (&range->lr_end);
+}
+EXPORT_SYMBOL(lustre_swab_lu_range);
+
 void lustre_swab_llog_rec(struct llog_rec_hdr *rec, struct llog_rec_tail *tail)
 {
         __swab32s(&rec->lrh_len);
index 0c3aeb8..88488e9 100644 (file)
@@ -667,6 +667,8 @@ int lprocfs_alloc_obd_stats(struct obd_device *obd, unsigned num_private_stats)
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, connect);
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, reconnect);
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, disconnect);
+        LPROCFS_OBD_OP_INIT(num_private_stats, stats, fid_init);
+        LPROCFS_OBD_OP_INIT(num_private_stats, stats, fid_fini);
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, fid_alloc);
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, fid_delete);
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, statfs);
index a92c3b1..9844e06 100644 (file)
@@ -37,6 +37,7 @@
 #include <linux/module.h>
 #include <obd_support.h>
 #include <lustre_disk.h>
+#include <lustre_fid.h>
 #include <lu_object.h>
 #include <libcfs/list.h>
 
@@ -275,7 +276,7 @@ static __u32 fid_hash(const struct lu_fid *f)
 {
         /* all objects with same id and different versions will belong to same
          * collisions list. */
-        return (fid_seq(f) - 1) * LUSTRE_FID_SEQ_WIDTH + fid_oid(f);
+        return (fid_seq(f) - 1) * LUSTRE_SEQ_WIDTH + fid_oid(f);
 }
 
 /*
index bb9fd86..747c70f 100644 (file)
@@ -564,7 +564,6 @@ finish:
                         RETURN(0);
                 }
         } else {
-                struct client_obd *cli = &imp->imp_obd->u.cli;
                 struct obd_connect_data *ocd;
                 struct obd_export *exp;
 
@@ -576,12 +575,6 @@ finish:
                         GOTO(out, rc);
                 }
 
-                /* get correct start fid from connect data sent by server. */
-                spin_lock(&cli->cl_fid_lock);
-                cli->cl_fid.f_seq = ocd->ocd_seq;
-                cli->cl_fid.f_oid = LUSTRE_FID_INIT_OID;
-                spin_unlock(&cli->cl_fid_lock);
-
                 spin_lock_irqsave(&imp->imp_lock, flags);
                 
                 /*
index d2deec3..b30e12f 100644 (file)
@@ -509,11 +509,11 @@ void lustre_swab_connect(struct obd_connect_data *ocd)
         __swab32s (&ocd->ocd_index);
         __swab32s (&ocd->ocd_unused);
         __swab64s (&ocd->ocd_ibits_known);
-        __swab64s (&ocd->ocd_seq);
         CLASSERT(offsetof(typeof(*ocd), padding2) != 0);
         CLASSERT(offsetof(typeof(*ocd), padding3) != 0);
         CLASSERT(offsetof(typeof(*ocd), padding4) != 0);
         CLASSERT(offsetof(typeof(*ocd), padding5) != 0);
+        CLASSERT(offsetof(typeof(*ocd), padding6) != 0);
 }
 
 void lustre_swab_obdo (struct obdo  *o)