MODULES := fid
-fid-objs := fid_handler.o fid_lib.o
+fid-objs := fid_handler.o fid_request.o fid_lib.o
@INCLUDE_RULES@
if LIBLUSTRE
noinst_LIBRARIES = libfid.a
-libfid_a_SOURCES = fid_handler.c fid_lib.c fid_internal.h
+libfid_a_SOURCES = fid_handler.c fid_request.c fid_lib.c fid_internal.h
libfid_a_CPPFLAGS = $(LLCPPFLAGS)
libfid_a_CFLAGS = $(LLCFLAGS)
endif
#include <lustre_fid.h>
#include "fid_internal.h"
-/* client seq mgr interface */
-static int
-seq_client_rpc(struct lu_client_seq *seq,
- struct lu_range *range,
- __u32 opc)
-{
- int repsize = sizeof(struct lu_range);
- int rc, reqsize = sizeof(__u32);
- struct ptlrpc_request *req;
- struct lu_range *ran;
- __u32 *op;
- ENTRY;
-
- req = ptlrpc_prep_req(class_exp2cliimp(seq->seq_exp),
- LUSTRE_MDS_VERSION, SEQ_QUERY,
- 1, &reqsize, NULL);
- if (req == NULL)
- RETURN(-ENOMEM);
-
- op = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*op));
- *op = opc;
-
- req->rq_replen = lustre_msg_size(1, &repsize);
- req->rq_request_portal = MDS_SEQ_PORTAL;
- rc = ptlrpc_queue_wait(req);
- if (rc)
- GOTO(out_req, rc);
-
- ran = lustre_swab_repbuf(req, 0, sizeof(*ran),
- lustre_swab_lu_range);
-
- if (ran == NULL) {
- CERROR("invalid range is returned\n");
- GOTO(out_req, rc = -EPROTO);
- }
- *range = *ran;
- EXIT;
-out_req:
- ptlrpc_req_finished(req);
- return rc;
-}
-
-/* request sequence-controller node to allocate new super-sequence. */
-int
-seq_client_alloc_super(struct lu_client_seq *seq)
-{
- int rc;
- ENTRY;
-
- LASSERT(seq->seq_flags & LUSTRE_CLI_SEQ_SERVER);
- rc = seq_client_rpc(seq, &seq->seq_cl_range,
- SEQ_ALLOC_SUPER);
- if (rc == 0) {
- CDEBUG(D_INFO|D_WARNING, "SEQ-MGR(cli): allocated super-sequence "
- "["LPX64"-"LPX64"]\n", seq->seq_cl_range.lr_start,
- seq->seq_cl_range.lr_end);
- }
- RETURN(rc);
-}
-EXPORT_SYMBOL(seq_client_alloc_super);
-
-/* request sequence-controller node to allocate new meta-sequence. */
-int
-seq_client_alloc_meta(struct lu_client_seq *seq)
-{
- int rc;
- ENTRY;
-
- LASSERT(seq->seq_flags & LUSTRE_CLI_SEQ_CLIENT);
- rc = seq_client_rpc(seq, &seq->seq_cl_range,
- SEQ_ALLOC_META);
- if (rc == 0) {
- CDEBUG(D_INFO|D_WARNING, "SEQ-MGR(cli): allocated meta-sequence "
- "["LPX64"-"LPX64"]\n", seq->seq_cl_range.lr_start,
- seq->seq_cl_range.lr_end);
- }
- RETURN(rc);
-}
-EXPORT_SYMBOL(seq_client_alloc_meta);
-
-/* allocate new sequence for client (llite or MDC are expected to use this) */
-int
-seq_client_alloc_seq(struct lu_client_seq *seq, __u64 *seqnr)
-{
- int rc;
- ENTRY;
-
- down(&seq->seq_sem);
-
- LASSERT(seq->seq_flags & LUSTRE_CLI_SEQ_CLIENT);
- LASSERT(range_is_sane(&seq->seq_cl_range));
-
- /* if we still have free sequences in meta-sequence we allocate new seq
- * from given range. */
- if (seq->seq_cl_range.lr_end > seq->seq_cl_range.lr_start) {
- *seqnr = seq->seq_cl_range.lr_start;
- seq->seq_cl_range.lr_start += 1;
- rc = 0;
- } else {
- /* meta-sequence is exhausted, request MDT to allocate new
- * meta-sequence for us. */
- rc = seq_client_alloc_meta(seq);
- if (rc) {
- CERROR("can't allocate new meta-sequence, "
- "rc %d\n", rc);
- }
-
- *seqnr = seq->seq_cl_range.lr_start;
- seq->seq_cl_range.lr_start += 1;
- }
- up(&seq->seq_sem);
-
- if (rc == 0) {
- CDEBUG(D_INFO|D_WARNING, "SEQ-MGR(cli): allocated sequence "
- "["LPX64"]\n", *seqnr);
- }
- RETURN(rc);
-}
-EXPORT_SYMBOL(seq_client_alloc_seq);
-
-int
-seq_client_alloc_fid(struct lu_client_seq *seq, struct lu_fid *fid)
-{
- int rc;
- ENTRY;
-
- LASSERT(fid != NULL);
- LASSERT(fid_is_sane(&seq->seq_fid));
- LASSERT(seq->seq_flags & LUSTRE_CLI_SEQ_CLIENT);
-
- down(&seq->seq_sem);
- if (fid_oid(&seq->seq_fid) < LUSTRE_SEQ_WIDTH) {
- *fid = seq->seq_fid;
- seq->seq_fid.f_oid += 1;
- rc = 0;
- } else {
- __u64 seqnr = 0;
-
- rc = seq_client_alloc_seq(seq, &seqnr);
- if (rc) {
- CERROR("can't allocate new sequence, "
- "rc %d\n", rc);
- GOTO(out, rc);
- } else {
- seq->seq_fid.f_oid = LUSTRE_FID_INIT_OID;
- seq->seq_fid.f_seq = seqnr;
- seq->seq_fid.f_ver = 0;
-
- *fid = seq->seq_fid;
- seq->seq_fid.f_oid += 1;
- rc = -ERESTART;
- }
- }
- LASSERT(fid_is_sane(fid));
-
- CDEBUG(D_INFO, "SEQ-MGR(cli): allocated FID "DFID3"\n",
- PFID3(fid));
-
- EXIT;
-out:
- up(&seq->seq_sem);
- return rc;
-}
-EXPORT_SYMBOL(seq_client_alloc_fid);
-
-int
-seq_client_init(struct lu_client_seq *seq,
- struct obd_export *exp,
- int flags)
-{
- int rc;
- ENTRY;
-
- LASSERT(flags & (LUSTRE_CLI_SEQ_CLIENT |
- LUSTRE_CLI_SEQ_SERVER));
-
- seq->seq_flags = flags;
- fid_zero(&seq->seq_fid);
- sema_init(&seq->seq_sem, 1);
-
- seq->seq_cl_range.lr_end = 0;
- seq->seq_cl_range.lr_start = 0;
-
- if (exp != NULL)
- seq->seq_exp = class_export_get(exp);
-
- if (seq->seq_flags & LUSTRE_CLI_SEQ_CLIENT) {
- __u64 seqnr = 0;
-
- /* client (llite or MDC) init case, we need new sequence from
- * MDT. This will allocate new meta-sequemce first, because seq
- * range in init state and looks the same as exhausted. */
- rc = seq_client_alloc_seq(seq, &seqnr);
- if (rc) {
- CERROR("can't allocate new sequence, rc %d\n", rc);
- GOTO(out, rc);
- } else {
- seq->seq_fid.f_oid = LUSTRE_FID_INIT_OID;
- seq->seq_fid.f_seq = seqnr;
- seq->seq_fid.f_ver = 0;
- }
-
- LASSERT(fid_is_sane(&seq->seq_fid));
- } else {
- /* check if this is controller node is trying to init client. */
- if (seq->seq_exp) {
- /* MDT uses client seq manager to talk to sequence
- * controller, and thus, we need super-sequence. */
- rc = seq_client_alloc_super(seq);
- } else {
- rc = 0;
- }
- }
-
- EXIT;
-out:
- if (rc)
- seq_client_fini(seq);
- else
- CDEBUG(D_INFO|D_WARNING, "Client Sequence Manager initialized\n");
- return rc;
-}
-EXPORT_SYMBOL(seq_client_init);
-
-void seq_client_fini(struct lu_client_seq *seq)
-{
- ENTRY;
- if (seq->seq_exp != NULL) {
- class_export_put(seq->seq_exp);
- seq->seq_exp = NULL;
- }
- CDEBUG(D_INFO|D_WARNING, "Client Sequence Manager finalized\n");
- EXIT;
-}
-EXPORT_SYMBOL(seq_client_fini);
-
#ifdef __KERNEL__
/* server side seq mgr stuff */
static const struct lu_range LUSTRE_SEQ_SUPER_INIT = {
--- /dev/null
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * lustre/fid/fid_request.c
+ * Lustre Sequence Manager
+ *
+ * Copyright (c) 2006 Cluster File Systems, Inc.
+ * Author: Yury Umanets <umka@clusterfs.com>
+ *
+ * This file is part of the Lustre file system, http://www.lustre.org
+ * Lustre is a trademark of Cluster File Systems, Inc.
+ *
+ * You may have signed or agreed to another license before downloading
+ * this software. If so, you are bound by the terms and conditions
+ * of that agreement, and the following does not apply to you. See the
+ * LICENSE file included with this distribution for more information.
+ *
+ * If you did not agree to a different license, then this copy of Lustre
+ * is open source software; you can redistribute it and/or modify it
+ * under the terms of version 2 of the GNU General Public License as
+ * published by the Free Software Foundation.
+ *
+ * In either case, Lustre is distributed in the hope that it will be
+ * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
+ * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * license text for more details.
+ */
+
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+#define DEBUG_SUBSYSTEM S_FID
+
+#ifdef __KERNEL__
+# include <libcfs/libcfs.h>
+# include <linux/module.h>
+#else /* __KERNEL__ */
+# include <liblustre.h>
+#endif
+
+#include <obd.h>
+#include <obd_class.h>
+#include <dt_object.h>
+#include <md_object.h>
+#include <obd_support.h>
+#include <lustre_req_layout.h>
+#include <lustre_fid.h>
+#include "fid_internal.h"
+
+/* client seq mgr interface */
+static int
+seq_client_rpc(struct lu_client_seq *seq,
+ struct lu_range *range,
+ __u32 opc)
+{
+ int repsize = sizeof(struct lu_range);
+ int rc, reqsize = sizeof(__u32);
+ struct ptlrpc_request *req;
+ struct lu_range *ran;
+ __u32 *op;
+ ENTRY;
+
+ req = ptlrpc_prep_req(class_exp2cliimp(seq->seq_exp),
+ LUSTRE_MDS_VERSION, SEQ_QUERY,
+ 1, &reqsize, NULL);
+ if (req == NULL)
+ RETURN(-ENOMEM);
+
+ op = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*op));
+ *op = opc;
+
+ req->rq_replen = lustre_msg_size(1, &repsize);
+ req->rq_request_portal = MDS_SEQ_PORTAL;
+ rc = ptlrpc_queue_wait(req);
+ if (rc)
+ GOTO(out_req, rc);
+
+ ran = lustre_swab_repbuf(req, 0, sizeof(*ran),
+ lustre_swab_lu_range);
+
+ if (ran == NULL) {
+ CERROR("invalid range is returned\n");
+ GOTO(out_req, rc = -EPROTO);
+ }
+ *range = *ran;
+ EXIT;
+out_req:
+ ptlrpc_req_finished(req);
+ return rc;
+}
+
+/* request sequence-controller node to allocate new super-sequence. */
+int
+seq_client_alloc_super(struct lu_client_seq *seq)
+{
+ int rc;
+ ENTRY;
+
+ LASSERT(seq->seq_flags & LUSTRE_CLI_SEQ_SERVER);
+ rc = seq_client_rpc(seq, &seq->seq_cl_range,
+ SEQ_ALLOC_SUPER);
+ if (rc == 0) {
+ CDEBUG(D_INFO|D_WARNING, "SEQ-MGR(cli): allocated super-sequence "
+ "["LPX64"-"LPX64"]\n", seq->seq_cl_range.lr_start,
+ seq->seq_cl_range.lr_end);
+ }
+ RETURN(rc);
+}
+EXPORT_SYMBOL(seq_client_alloc_super);
+
+/* request sequence-controller node to allocate new meta-sequence. */
+int
+seq_client_alloc_meta(struct lu_client_seq *seq)
+{
+ int rc;
+ ENTRY;
+
+ LASSERT(seq->seq_flags & LUSTRE_CLI_SEQ_CLIENT);
+ rc = seq_client_rpc(seq, &seq->seq_cl_range,
+ SEQ_ALLOC_META);
+ if (rc == 0) {
+ CDEBUG(D_INFO|D_WARNING, "SEQ-MGR(cli): allocated meta-sequence "
+ "["LPX64"-"LPX64"]\n", seq->seq_cl_range.lr_start,
+ seq->seq_cl_range.lr_end);
+ }
+ RETURN(rc);
+}
+EXPORT_SYMBOL(seq_client_alloc_meta);
+
+/* allocate new sequence for client (llite or MDC are expected to use this) */
+int
+seq_client_alloc_seq(struct lu_client_seq *seq, __u64 *seqnr)
+{
+ int rc;
+ ENTRY;
+
+ down(&seq->seq_sem);
+
+ LASSERT(seq->seq_flags & LUSTRE_CLI_SEQ_CLIENT);
+ LASSERT(range_is_sane(&seq->seq_cl_range));
+
+ /* if we still have free sequences in meta-sequence we allocate new seq
+ * from given range. */
+ if (seq->seq_cl_range.lr_end > seq->seq_cl_range.lr_start) {
+ *seqnr = seq->seq_cl_range.lr_start;
+ seq->seq_cl_range.lr_start += 1;
+ rc = 0;
+ } else {
+ /* meta-sequence is exhausted, request MDT to allocate new
+ * meta-sequence for us. */
+ rc = seq_client_alloc_meta(seq);
+ if (rc) {
+ CERROR("can't allocate new meta-sequence, "
+ "rc %d\n", rc);
+ }
+
+ *seqnr = seq->seq_cl_range.lr_start;
+ seq->seq_cl_range.lr_start += 1;
+ }
+ up(&seq->seq_sem);
+
+ if (rc == 0) {
+ CDEBUG(D_INFO|D_WARNING, "SEQ-MGR(cli): allocated sequence "
+ "["LPX64"]\n", *seqnr);
+ }
+ RETURN(rc);
+}
+EXPORT_SYMBOL(seq_client_alloc_seq);
+
+int
+seq_client_alloc_fid(struct lu_client_seq *seq, struct lu_fid *fid)
+{
+ int rc;
+ ENTRY;
+
+ LASSERT(fid != NULL);
+ LASSERT(fid_is_sane(&seq->seq_fid));
+ LASSERT(seq->seq_flags & LUSTRE_CLI_SEQ_CLIENT);
+
+ down(&seq->seq_sem);
+ if (fid_oid(&seq->seq_fid) < LUSTRE_SEQ_WIDTH) {
+ *fid = seq->seq_fid;
+ seq->seq_fid.f_oid += 1;
+ rc = 0;
+ } else {
+ __u64 seqnr = 0;
+
+ rc = seq_client_alloc_seq(seq, &seqnr);
+ if (rc) {
+ CERROR("can't allocate new sequence, "
+ "rc %d\n", rc);
+ GOTO(out, rc);
+ } else {
+ seq->seq_fid.f_oid = LUSTRE_FID_INIT_OID;
+ seq->seq_fid.f_seq = seqnr;
+ seq->seq_fid.f_ver = 0;
+
+ *fid = seq->seq_fid;
+ seq->seq_fid.f_oid += 1;
+ rc = -ERESTART;
+ }
+ }
+ LASSERT(fid_is_sane(fid));
+
+ CDEBUG(D_INFO, "SEQ-MGR(cli): allocated FID "DFID3"\n",
+ PFID3(fid));
+
+ EXIT;
+out:
+ up(&seq->seq_sem);
+ return rc;
+}
+EXPORT_SYMBOL(seq_client_alloc_fid);
+
+int
+seq_client_init(struct lu_client_seq *seq,
+ struct obd_export *exp,
+ int flags)
+{
+ int rc;
+ ENTRY;
+
+ LASSERT(flags & (LUSTRE_CLI_SEQ_CLIENT |
+ LUSTRE_CLI_SEQ_SERVER));
+
+ seq->seq_flags = flags;
+ fid_zero(&seq->seq_fid);
+ sema_init(&seq->seq_sem, 1);
+
+ seq->seq_cl_range.lr_end = 0;
+ seq->seq_cl_range.lr_start = 0;
+
+ if (exp != NULL)
+ seq->seq_exp = class_export_get(exp);
+
+ if (seq->seq_flags & LUSTRE_CLI_SEQ_CLIENT) {
+ __u64 seqnr = 0;
+
+ /* client (llite or MDC) init case, we need new sequence from
+ * MDT. This will allocate new meta-sequence first, because seq
+ * range in init state and looks the same as exhausted. */
+ rc = seq_client_alloc_seq(seq, &seqnr);
+ if (rc) {
+ CERROR("can't allocate new sequence, rc %d\n", rc);
+ GOTO(out, rc);
+ } else {
+ seq->seq_fid.f_oid = LUSTRE_FID_INIT_OID;
+ seq->seq_fid.f_seq = seqnr;
+ seq->seq_fid.f_ver = 0;
+ }
+
+ LASSERT(fid_is_sane(&seq->seq_fid));
+ } else {
+ /* check if this controller node is trying to init client. */
+ if (seq->seq_exp) {
+ /* MDT uses client seq manager to talk to sequence
+ * controller, and thus, we need super-sequence. */
+ rc = seq_client_alloc_super(seq);
+ } else {
+ rc = 0;
+ }
+ }
+
+ EXIT;
+out:
+ if (rc)
+ seq_client_fini(seq);
+ else
+ CDEBUG(D_INFO|D_WARNING, "Client Sequence Manager initialized\n");
+ return rc;
+}
+EXPORT_SYMBOL(seq_client_init);
+
+void seq_client_fini(struct lu_client_seq *seq)
+{
+ ENTRY;
+ if (seq->seq_exp != NULL) {
+ class_export_put(seq->seq_exp);
+ seq->seq_exp = NULL;
+ }
+ CDEBUG(D_INFO|D_WARNING, "Client Sequence Manager finalized\n");
+ EXIT;
+}
+EXPORT_SYMBOL(seq_client_fini);
MODULES := fld
-fld-objs := fld_handler.o fld_iam.o
+fld-objs := fld_handler.o fld_request.o fld_iam.o
EXTRA_PRE_CFLAGS := -I@LUSTRE@ -I@LUSTRE@/ldiskfs
if LIBLUSTRE
noinst_LIBRARIES = libfld.a
-libfld_a_SOURCES = fld_handler.c fld_iam.c fld_internal.h
+libfld_a_SOURCES = fld_handler.c fld_request.c fld_iam.c fld_internal.h
libfld_a_CPPFLAGS = $(LLCPPFLAGS)
libfld_a_CFLAGS = $(LLCFLAGS)
endif
* vim:expandtab:shiftwidth=8:tabstop=8:
*
* lustre/fld/fld_handler.c
+ * FLD (Fids Location Database)
*
* Copyright (C) 2006 Cluster File Systems, Inc.
* Author: WangDi <wangdi@clusterfs.com>
#ifdef __KERNEL__
struct fld_cache_info *fld_cache = NULL;
-enum {
- FLD_HTABLE_BITS = 8,
- FLD_HTABLE_SIZE = (1 << FLD_HTABLE_BITS),
- FLD_HTABLE_MASK = FLD_HTABLE_SIZE - 1
-};
-
-static __u32 fld_cache_hash(__u64 seq)
-{
- return seq;
-}
-
-static int
-fld_cache_insert(struct fld_cache_info *fld_cache,
- __u64 seq, __u64 mds)
-{
- struct fld_cache *fld;
- struct hlist_head *bucket;
- struct hlist_node *scan;
- int rc = 0;
- ENTRY;
-
- bucket = fld_cache->fld_hash + (fld_cache_hash(seq) &
- fld_cache->fld_hash_mask);
-
- OBD_ALLOC_PTR(fld);
- if (!fld)
- RETURN(-ENOMEM);
-
- INIT_HLIST_NODE(&fld->fld_list);
- fld->fld_mds = mds;
- fld->fld_seq = seq;
-
- spin_lock(&fld_cache->fld_lock);
- hlist_for_each_entry(fld, scan, bucket, fld_list) {
- if (fld->fld_seq == seq)
- GOTO(exit_unlock, rc = -EEXIST);
- }
- hlist_add_head(&fld->fld_list, bucket);
- EXIT;
-exit_unlock:
- spin_unlock(&fld_cache->fld_lock);
- if (rc != 0)
- OBD_FREE(fld, sizeof(*fld));
- return rc;
-}
-
-static struct fld_cache *
-fld_cache_lookup(struct fld_cache_info *fld_cache, __u64 seq)
-{
- struct hlist_head *bucket;
- struct hlist_node *scan;
- struct fld_cache *fld;
- ENTRY;
-
- bucket = fld_cache->fld_hash + (fld_cache_hash(seq) &
- fld_cache->fld_hash_mask);
-
- spin_lock(&fld_cache->fld_lock);
- hlist_for_each_entry(fld, scan, bucket, fld_list) {
- if (fld->fld_seq == seq) {
- spin_unlock(&fld_cache->fld_lock);
- RETURN(fld);
- }
- }
- spin_unlock(&fld_cache->fld_lock);
-
- RETURN(NULL);
-}
-
-static void
-fld_cache_delete(struct fld_cache_info *fld_cache, __u64 seq)
-{
- struct hlist_head *bucket;
- struct hlist_node *scan;
- struct fld_cache *fld;
- ENTRY;
-
- bucket = fld_cache->fld_hash + (fld_cache_hash(seq) &
- fld_cache->fld_hash_mask);
-
- spin_lock(&fld_cache->fld_lock);
- hlist_for_each_entry(fld, scan, bucket, fld_list) {
- if (fld->fld_seq == seq) {
- hlist_del_init(&fld->fld_list);
- GOTO(out_unlock, 0);
- }
- }
-
- EXIT;
-out_unlock:
- spin_unlock(&fld_cache->fld_lock);
- return;
-}
-#endif
-
-static int fld_rrb_hash(struct lu_client_fld *fld, __u64 seq)
-{
- if (fld->fld_count == 0)
- return 0;
-
- return do_div(seq, fld->fld_count);
-}
-
-static int fld_dht_hash(struct lu_client_fld *fld, __u64 seq)
-{
- /* XXX: here should DHT hash */
- return fld_rrb_hash(fld, seq);
-}
-
-static struct lu_fld_hash fld_hash[3] = {
- {
- .fh_name = "DHT",
- .fh_func = fld_dht_hash
- },
- {
- .fh_name = "Round Robin",
- .fh_func = fld_rrb_hash
- },
- {
- 0,
- }
-};
-
-static struct obd_export *
-fld_client_get_export(struct lu_client_fld *fld, __u64 seq)
-{
- struct obd_export *fld_exp;
- int count = 0, hash;
- ENTRY;
-
- LASSERT(fld->fld_hash != NULL);
- hash = fld->fld_hash->fh_func(fld, seq);
-
- spin_lock(&fld->fld_lock);
- list_for_each_entry(fld_exp,
- &fld->fld_exports, exp_fld_chain) {
- if (count == hash) {
- spin_unlock(&fld->fld_lock);
- RETURN(fld_exp);
- }
- count++;
- }
- spin_unlock(&fld->fld_lock);
- RETURN(NULL);
-}
-
-/* add export to FLD. This is usually done by CMM and LMV as they are main users
- * of FLD module. */
-int fld_client_add_export(struct lu_client_fld *fld,
- struct obd_export *exp)
-{
- struct obd_export *fld_exp;
- ENTRY;
-
- LASSERT(exp != NULL);
-
- CWARN("adding export %s\n", exp->exp_client_uuid.uuid);
-
- spin_lock(&fld->fld_lock);
- list_for_each_entry(fld_exp, &fld->fld_exports, exp_fld_chain) {
- if (obd_uuid_equals(&fld_exp->exp_client_uuid,
- &exp->exp_client_uuid))
- {
- spin_unlock(&fld->fld_lock);
- RETURN(-EEXIST);
- }
- }
-
- fld_exp = class_export_get(exp);
- list_add_tail(&fld_exp->exp_fld_chain,
- &fld->fld_exports);
- fld->fld_count++;
-
- spin_unlock(&fld->fld_lock);
-
- RETURN(0);
-}
-EXPORT_SYMBOL(fld_client_add_export);
-
-/* remove export from FLD */
-int fld_client_del_export(struct lu_client_fld *fld,
- struct obd_export *exp)
-{
- struct obd_export *fld_exp;
- struct obd_export *tmp;
- ENTRY;
-
- spin_lock(&fld->fld_lock);
- list_for_each_entry_safe(fld_exp, tmp, &fld->fld_exports, exp_fld_chain) {
- if (obd_uuid_equals(&fld_exp->exp_client_uuid,
- &exp->exp_client_uuid))
- {
- fld->fld_count--;
- list_del(&fld_exp->exp_fld_chain);
- class_export_get(fld_exp);
-
- spin_unlock(&fld->fld_lock);
- RETURN(0);
- }
- }
- spin_unlock(&fld->fld_lock);
- RETURN(-ENOENT);
-}
-EXPORT_SYMBOL(fld_client_del_export);
-
-int fld_client_init(struct lu_client_fld *fld, int hash)
-{
- int rc = 0;
- ENTRY;
-
- LASSERT(fld != NULL);
-
- if (hash < 0 || hash >= LUSTRE_CLI_FLD_HASH_LAST) {
- CERROR("wrong hash function 0x%x\n", hash);
- RETURN(-EINVAL);
- }
-
- INIT_LIST_HEAD(&fld->fld_exports);
- spin_lock_init(&fld->fld_lock);
- fld->fld_hash = &fld_hash[hash];
- fld->fld_count = 0;
-
- CDEBUG(D_INFO, "Client FLD initialized, using %s\n",
- fld->fld_hash->fh_name);
- RETURN(rc);
-}
-EXPORT_SYMBOL(fld_client_init);
-
-void fld_client_fini(struct lu_client_fld *fld)
-{
- struct obd_export *fld_exp;
- struct obd_export *tmp;
- ENTRY;
-
- spin_lock(&fld->fld_lock);
- list_for_each_entry_safe(fld_exp, tmp,
- &fld->fld_exports, exp_fld_chain) {
- fld->fld_count--;
- list_del(&fld_exp->exp_fld_chain);
- class_export_get(fld_exp);
- }
- spin_unlock(&fld->fld_lock);
- CDEBUG(D_INFO, "Client FLD finalized\n");
- EXIT;
-}
-EXPORT_SYMBOL(fld_client_fini);
-
-static int
-fld_client_rpc(struct obd_export *exp,
- struct md_fld *mf, __u32 fld_op)
-{
- int size[2] = {sizeof(__u32), sizeof(struct md_fld)}, rc;
- int mf_size = sizeof(struct md_fld);
- struct ptlrpc_request *req;
- struct md_fld *pmf;
- __u32 *op;
- ENTRY;
-
- LASSERT(exp != NULL);
-
- req = ptlrpc_prep_req(class_exp2cliimp(exp),
- LUSTRE_MDS_VERSION, FLD_QUERY,
- 2, size, NULL);
- if (req == NULL)
- RETURN(-ENOMEM);
-
- op = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*op));
- *op = fld_op;
-
- pmf = lustre_msg_buf(req->rq_reqmsg, 1, sizeof (*pmf));
- memcpy(pmf, mf, sizeof(*mf));
-
- req->rq_replen = lustre_msg_size(1, &mf_size);
- req->rq_request_portal = MDS_FLD_PORTAL;
- rc = ptlrpc_queue_wait(req);
- if (rc)
- GOTO(out_req, rc);
-
- pmf = lustre_swab_repbuf(req, 0, sizeof(*pmf),
- lustre_swab_md_fld);
- *mf = *pmf;
-out_req:
- ptlrpc_req_finished(req);
- RETURN(rc);
-}
-
-int
-fld_client_create(struct lu_client_fld *fld,
- __u64 seq, __u64 mds)
-{
- struct obd_export *fld_exp;
- struct md_fld md_fld;
- __u32 rc;
- ENTRY;
-
- fld_exp = fld_client_get_export(fld, seq);
- if (!fld_exp)
- RETURN(-EINVAL);
- md_fld.mf_seq = seq;
- md_fld.mf_mds = mds;
-
- rc = fld_client_rpc(fld_exp, &md_fld, FLD_CREATE);
-#ifdef __KERNEL__
- fld_cache_insert(fld_cache, seq, mds);
-#endif
-
- RETURN(rc);
-}
-EXPORT_SYMBOL(fld_client_create);
-
-int
-fld_client_delete(struct lu_client_fld *fld,
- __u64 seq, __u64 mds)
-{
- struct obd_export *fld_exp;
- struct md_fld md_fld;
- __u32 rc;
-
-#ifdef __KERNEL__
- fld_cache_delete(fld_cache, seq);
-#endif
-
- fld_exp = fld_client_get_export(fld, seq);
- if (!fld_exp)
- RETURN(-EINVAL);
-
- md_fld.mf_seq = seq;
- md_fld.mf_mds = mds;
-
- rc = fld_client_rpc(fld_exp, &md_fld, FLD_DELETE);
- RETURN(rc);
-}
-EXPORT_SYMBOL(fld_client_delete);
-
-static int
-fld_client_get(struct lu_client_fld *fld,
- __u64 seq, __u64 *mds)
-{
- struct obd_export *fld_exp;
- struct md_fld md_fld;
- int rc;
- ENTRY;
-
- fld_exp = fld_client_get_export(fld, seq);
- if (!fld_exp)
- RETURN(-EINVAL);
-
- md_fld.mf_seq = seq;
- rc = fld_client_rpc(fld_exp,
- &md_fld, FLD_LOOKUP);
- if (rc == 0)
- *mds = md_fld.mf_mds;
-
- RETURN(rc);
-}
-
-/* lookup fid in the namespace of pfid according to the name */
-int
-fld_client_lookup(struct lu_client_fld *fld,
- __u64 seq, __u64 *mds)
-{
-#ifdef __KERNEL__
- struct fld_cache *fld_entry;
-#endif
- int rc;
- ENTRY;
-
-#ifdef __KERNEL__
- /* lookup it in the cache */
- fld_entry = fld_cache_lookup(fld_cache, seq);
- if (fld_entry != NULL) {
- *mds = fld_entry->fld_mds;
- RETURN(0);
- }
-#endif
-
- /* can not find it in the cache */
- rc = fld_client_get(fld, seq, mds);
- if (rc)
- RETURN(rc);
-
-#ifdef __KERNEL__
- rc = fld_cache_insert(fld_cache, seq, *mds);
-#endif
-
- RETURN(rc);
-}
-EXPORT_SYMBOL(fld_client_lookup);
-
-#ifdef __KERNEL__
static int fld_init(void)
{
int i;
for (i = 0; i < FLD_HTABLE_SIZE; i++)
INIT_HLIST_HEAD(&fld_cache->fld_hash[i]);
- CDEBUG(D_INFO, "Client FLD, cache size %d\n",
+ CDEBUG(D_INFO|D_WARNING, "Client FLD, cache size %d\n",
FLD_HTABLE_SIZE);
RETURN(0);
return rc;
}
-
static int fld_req_handle(struct ptlrpc_request *req)
{
int fail = OBD_FAIL_FLD_ALL_REPLY_NET;
FLD_LOOKUP = 2
};
+enum {
+ FLD_HTABLE_BITS = 8,
+ FLD_HTABLE_SIZE = (1 << FLD_HTABLE_BITS),
+ FLD_HTABLE_MASK = FLD_HTABLE_SIZE - 1
+};
+
#define FLD_SERVICE_WATCHDOG_TIMEOUT (obd_timeout * 1000)
int fld_handle_insert(struct lu_server_fld *fld,
--- /dev/null
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * lustre/fld/fld_handler.c
+ * FLD (Fids Location Database)
+ *
+ * Copyright (C) 2006 Cluster File Systems, Inc.
+ * Author: Yury Umanets <umka@clusterfs.com>
+ *
+ * This file is part of the Lustre file system, http://www.lustre.org
+ * Lustre is a trademark of Cluster File Systems, Inc.
+ *
+ * You may have signed or agreed to another license before downloading
+ * this software. If so, you are bound by the terms and conditions
+ * of that agreement, and the following does not apply to you. See the
+ * LICENSE file included with this distribution for more information.
+ *
+ * If you did not agree to a different license, then this copy of Lustre
+ * is open source software; you can redistribute it and/or modify it
+ * under the terms of version 2 of the GNU General Public License as
+ * published by the Free Software Foundation.
+ *
+ * In either case, Lustre is distributed in the hope that it will be
+ * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
+ * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * license text for more details.
+ */
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+#define DEBUG_SUBSYSTEM S_FLD
+
+#ifdef __KERNEL__
+# include <libcfs/libcfs.h>
+# include <linux/module.h>
+# include <linux/jbd.h>
+# include <asm/div64.h>
+#else /* __KERNEL__ */
+# include <liblustre.h>
+# include <libcfs/list.h>
+#endif
+
+#include <obd.h>
+#include <obd_class.h>
+#include <lustre_ver.h>
+#include <obd_support.h>
+#include <lprocfs_status.h>
+
+#include <dt_object.h>
+#include <md_object.h>
+#include <lustre_req_layout.h>
+#include <lustre_fld.h>
+#include "fld_internal.h"
+
+#ifdef __KERNEL__
+extern struct fld_cache_info *fld_cache;
+
+static __u32 fld_cache_hash(__u64 seq)
+{
+ return seq;
+}
+
+static int
+fld_cache_insert(struct fld_cache_info *fld_cache,
+ __u64 seq, __u64 mds)
+{
+ struct fld_cache *fld;
+ struct hlist_head *bucket;
+ struct hlist_node *scan;
+ int rc = 0;
+ ENTRY;
+
+ bucket = fld_cache->fld_hash + (fld_cache_hash(seq) &
+ fld_cache->fld_hash_mask);
+
+ OBD_ALLOC_PTR(fld);
+ if (!fld)
+ RETURN(-ENOMEM);
+
+ INIT_HLIST_NODE(&fld->fld_list);
+ fld->fld_mds = mds;
+ fld->fld_seq = seq;
+
+ spin_lock(&fld_cache->fld_lock);
+ hlist_for_each_entry(fld, scan, bucket, fld_list) {
+ if (fld->fld_seq == seq)
+ GOTO(exit_unlock, rc = -EEXIST);
+ }
+ hlist_add_head(&fld->fld_list, bucket);
+ EXIT;
+exit_unlock:
+ spin_unlock(&fld_cache->fld_lock);
+ if (rc != 0)
+ OBD_FREE(fld, sizeof(*fld));
+ return rc;
+}
+
+static struct fld_cache *
+fld_cache_lookup(struct fld_cache_info *fld_cache, __u64 seq)
+{
+ struct hlist_head *bucket;
+ struct hlist_node *scan;
+ struct fld_cache *fld;
+ ENTRY;
+
+ bucket = fld_cache->fld_hash + (fld_cache_hash(seq) &
+ fld_cache->fld_hash_mask);
+
+ spin_lock(&fld_cache->fld_lock);
+ hlist_for_each_entry(fld, scan, bucket, fld_list) {
+ if (fld->fld_seq == seq) {
+ spin_unlock(&fld_cache->fld_lock);
+ RETURN(fld);
+ }
+ }
+ spin_unlock(&fld_cache->fld_lock);
+
+ RETURN(NULL);
+}
+
+static void
+fld_cache_delete(struct fld_cache_info *fld_cache, __u64 seq)
+{
+ struct hlist_head *bucket;
+ struct hlist_node *scan;
+ struct fld_cache *fld;
+ ENTRY;
+
+ bucket = fld_cache->fld_hash + (fld_cache_hash(seq) &
+ fld_cache->fld_hash_mask);
+
+ spin_lock(&fld_cache->fld_lock);
+ hlist_for_each_entry(fld, scan, bucket, fld_list) {
+ if (fld->fld_seq == seq) {
+ hlist_del_init(&fld->fld_list);
+ GOTO(out_unlock, 0);
+ }
+ }
+
+ EXIT;
+out_unlock:
+ spin_unlock(&fld_cache->fld_lock);
+ return;
+}
+#endif
+
+static int fld_rrb_hash(struct lu_client_fld *fld, __u64 seq)
+{
+ if (fld->fld_count == 0)
+ return 0;
+
+ return do_div(seq, fld->fld_count);
+}
+
+static int fld_dht_hash(struct lu_client_fld *fld, __u64 seq)
+{
+ /* XXX: here should DHT hash */
+ return fld_rrb_hash(fld, seq);
+}
+
+static struct lu_fld_hash fld_hash[3] = {
+ {
+ .fh_name = "DHT",
+ .fh_func = fld_dht_hash
+ },
+ {
+ .fh_name = "Round Robin",
+ .fh_func = fld_rrb_hash
+ },
+ {
+ 0,
+ }
+};
+
+static struct obd_export *
+fld_client_get_export(struct lu_client_fld *fld, __u64 seq)
+{
+ struct obd_export *fld_exp;
+ int count = 0, hash;
+ ENTRY;
+
+ LASSERT(fld->fld_hash != NULL);
+ hash = fld->fld_hash->fh_func(fld, seq);
+
+ spin_lock(&fld->fld_lock);
+ list_for_each_entry(fld_exp,
+ &fld->fld_exports, exp_fld_chain) {
+ if (count == hash) {
+ spin_unlock(&fld->fld_lock);
+ RETURN(fld_exp);
+ }
+ count++;
+ }
+ spin_unlock(&fld->fld_lock);
+ RETURN(NULL);
+}
+
+/* add export to FLD. This is usually done by CMM and LMV as they are main users
+ * of FLD module. */
+int fld_client_add_export(struct lu_client_fld *fld,
+ struct obd_export *exp)
+{
+ struct obd_export *fld_exp;
+ ENTRY;
+
+ LASSERT(exp != NULL);
+
+ CWARN("adding export %s\n", exp->exp_client_uuid.uuid);
+
+ spin_lock(&fld->fld_lock);
+ list_for_each_entry(fld_exp, &fld->fld_exports, exp_fld_chain) {
+ if (obd_uuid_equals(&fld_exp->exp_client_uuid,
+ &exp->exp_client_uuid))
+ {
+ spin_unlock(&fld->fld_lock);
+ RETURN(-EEXIST);
+ }
+ }
+
+ fld_exp = class_export_get(exp);
+ list_add_tail(&fld_exp->exp_fld_chain,
+ &fld->fld_exports);
+ fld->fld_count++;
+
+ spin_unlock(&fld->fld_lock);
+
+ RETURN(0);
+}
+EXPORT_SYMBOL(fld_client_add_export);
+
+/* remove export from FLD */
+int fld_client_del_export(struct lu_client_fld *fld,
+ struct obd_export *exp)
+{
+ struct obd_export *fld_exp;
+ struct obd_export *tmp;
+ ENTRY;
+
+ spin_lock(&fld->fld_lock);
+ list_for_each_entry_safe(fld_exp, tmp, &fld->fld_exports, exp_fld_chain) {
+ if (obd_uuid_equals(&fld_exp->exp_client_uuid,
+ &exp->exp_client_uuid))
+ {
+ fld->fld_count--;
+ list_del(&fld_exp->exp_fld_chain);
+ class_export_get(fld_exp);
+
+ spin_unlock(&fld->fld_lock);
+ RETURN(0);
+ }
+ }
+ spin_unlock(&fld->fld_lock);
+ RETURN(-ENOENT);
+}
+EXPORT_SYMBOL(fld_client_del_export);
+
+int fld_client_init(struct lu_client_fld *fld, int hash)
+{
+ int rc = 0;
+ ENTRY;
+
+ LASSERT(fld != NULL);
+
+ if (hash < 0 || hash >= LUSTRE_CLI_FLD_HASH_LAST) {
+ CERROR("wrong hash function 0x%x\n", hash);
+ RETURN(-EINVAL);
+ }
+
+ INIT_LIST_HEAD(&fld->fld_exports);
+ spin_lock_init(&fld->fld_lock);
+ fld->fld_hash = &fld_hash[hash];
+ fld->fld_count = 0;
+
+ CDEBUG(D_INFO, "Client FLD initialized, using %s\n",
+ fld->fld_hash->fh_name);
+ RETURN(rc);
+}
+EXPORT_SYMBOL(fld_client_init);
+
+void fld_client_fini(struct lu_client_fld *fld)
+{
+ struct obd_export *fld_exp;
+ struct obd_export *tmp;
+ ENTRY;
+
+ spin_lock(&fld->fld_lock);
+ list_for_each_entry_safe(fld_exp, tmp,
+ &fld->fld_exports, exp_fld_chain) {
+ fld->fld_count--;
+ list_del(&fld_exp->exp_fld_chain);
+ class_export_get(fld_exp);
+ }
+ spin_unlock(&fld->fld_lock);
+ CDEBUG(D_INFO, "Client FLD finalized\n");
+ EXIT;
+}
+EXPORT_SYMBOL(fld_client_fini);
+
+static int
+fld_client_rpc(struct obd_export *exp,
+ struct md_fld *mf, __u32 fld_op)
+{
+ int size[2] = {sizeof(__u32), sizeof(struct md_fld)}, rc;
+ int mf_size = sizeof(struct md_fld);
+ struct ptlrpc_request *req;
+ struct md_fld *pmf;
+ __u32 *op;
+ ENTRY;
+
+ LASSERT(exp != NULL);
+
+ req = ptlrpc_prep_req(class_exp2cliimp(exp),
+ LUSTRE_MDS_VERSION, FLD_QUERY,
+ 2, size, NULL);
+ if (req == NULL)
+ RETURN(-ENOMEM);
+
+ op = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*op));
+ *op = fld_op;
+
+ pmf = lustre_msg_buf(req->rq_reqmsg, 1, sizeof (*pmf));
+ memcpy(pmf, mf, sizeof(*mf));
+
+ req->rq_replen = lustre_msg_size(1, &mf_size);
+ req->rq_request_portal = MDS_FLD_PORTAL;
+ rc = ptlrpc_queue_wait(req);
+ if (rc)
+ GOTO(out_req, rc);
+
+ pmf = lustre_swab_repbuf(req, 0, sizeof(*pmf),
+ lustre_swab_md_fld);
+ *mf = *pmf;
+out_req:
+ ptlrpc_req_finished(req);
+ RETURN(rc);
+}
+
+int
+fld_client_create(struct lu_client_fld *fld,
+ __u64 seq, __u64 mds)
+{
+ struct obd_export *fld_exp;
+ struct md_fld md_fld;
+ __u32 rc;
+ ENTRY;
+
+ fld_exp = fld_client_get_export(fld, seq);
+ if (!fld_exp)
+ RETURN(-EINVAL);
+ md_fld.mf_seq = seq;
+ md_fld.mf_mds = mds;
+
+ rc = fld_client_rpc(fld_exp, &md_fld, FLD_CREATE);
+#ifdef __KERNEL__
+ fld_cache_insert(fld_cache, seq, mds);
+#endif
+
+ RETURN(rc);
+}
+EXPORT_SYMBOL(fld_client_create);
+
+int
+fld_client_delete(struct lu_client_fld *fld,
+ __u64 seq, __u64 mds)
+{
+ struct obd_export *fld_exp;
+ struct md_fld md_fld;
+ __u32 rc;
+
+#ifdef __KERNEL__
+ fld_cache_delete(fld_cache, seq);
+#endif
+
+ fld_exp = fld_client_get_export(fld, seq);
+ if (!fld_exp)
+ RETURN(-EINVAL);
+
+ md_fld.mf_seq = seq;
+ md_fld.mf_mds = mds;
+
+ rc = fld_client_rpc(fld_exp, &md_fld, FLD_DELETE);
+ RETURN(rc);
+}
+EXPORT_SYMBOL(fld_client_delete);
+
+static int
+fld_client_get(struct lu_client_fld *fld,
+ __u64 seq, __u64 *mds)
+{
+ struct obd_export *fld_exp;
+ struct md_fld md_fld;
+ int rc;
+ ENTRY;
+
+ fld_exp = fld_client_get_export(fld, seq);
+ if (!fld_exp)
+ RETURN(-EINVAL);
+
+ md_fld.mf_seq = seq;
+ rc = fld_client_rpc(fld_exp,
+ &md_fld, FLD_LOOKUP);
+ if (rc == 0)
+ *mds = md_fld.mf_mds;
+
+ RETURN(rc);
+}
+
+/* lookup fid in the namespace of pfid according to the name */
+int
+fld_client_lookup(struct lu_client_fld *fld,
+ __u64 seq, __u64 *mds)
+{
+#ifdef __KERNEL__
+ struct fld_cache *fld_entry;
+#endif
+ int rc;
+ ENTRY;
+
+#ifdef __KERNEL__
+ /* lookup it in the cache */
+ fld_entry = fld_cache_lookup(fld_cache, seq);
+ if (fld_entry != NULL) {
+ *mds = fld_entry->fld_mds;
+ RETURN(0);
+ }
+#endif
+
+ /* can not find it in the cache */
+ rc = fld_client_get(fld, seq, mds);
+ if (rc)
+ RETURN(rc);
+
+#ifdef __KERNEL__
+ rc = fld_cache_insert(fld_cache, seq, *mds);
+#endif
+
+ RETURN(rc);
+}
+EXPORT_SYMBOL(fld_client_lookup);