From: yury Date: Sat, 17 Jun 2006 09:23:15 +0000 (+0000) Subject: - a little reorganization in fid and fld sources, client part is moved to fid_request... X-Git-Tag: v1_8_0_110~486^2~1618 X-Git-Url: https://git.whamcloud.com/?a=commitdiff_plain;h=d3fe16d7d3277538ab4105500af72c608978c36f;p=fs%2Flustre-release.git - a little reorganization in fid and fld sources, client part is moved to fid_request.c and fld_request.c respectively. --- diff --git a/lustre/fid/Makefile.in b/lustre/fid/Makefile.in index 857660d..cb15fe8 100644 --- a/lustre/fid/Makefile.in +++ b/lustre/fid/Makefile.in @@ -1,4 +1,4 @@ MODULES := fid -fid-objs := fid_handler.o fid_lib.o +fid-objs := fid_handler.o fid_request.o fid_lib.o @INCLUDE_RULES@ diff --git a/lustre/fid/autoMakefile.am b/lustre/fid/autoMakefile.am index acce2f4..0c8c5a1 100644 --- a/lustre/fid/autoMakefile.am +++ b/lustre/fid/autoMakefile.am @@ -5,7 +5,7 @@ if LIBLUSTRE noinst_LIBRARIES = libfid.a -libfid_a_SOURCES = fid_handler.c fid_lib.c fid_internal.h +libfid_a_SOURCES = fid_handler.c fid_request.c fid_lib.c fid_internal.h libfid_a_CPPFLAGS = $(LLCPPFLAGS) libfid_a_CFLAGS = $(LLCFLAGS) endif diff --git a/lustre/fid/fid_handler.c b/lustre/fid/fid_handler.c index b7b0eac..2f1ceb0 100644 --- a/lustre/fid/fid_handler.c +++ b/lustre/fid/fid_handler.c @@ -47,242 +47,6 @@ #include #include "fid_internal.h" -/* client seq mgr interface */ -static int -seq_client_rpc(struct lu_client_seq *seq, - struct lu_range *range, - __u32 opc) -{ - int repsize = sizeof(struct lu_range); - int rc, reqsize = sizeof(__u32); - struct ptlrpc_request *req; - struct lu_range *ran; - __u32 *op; - ENTRY; - - req = ptlrpc_prep_req(class_exp2cliimp(seq->seq_exp), - LUSTRE_MDS_VERSION, SEQ_QUERY, - 1, &reqsize, NULL); - if (req == NULL) - RETURN(-ENOMEM); - - op = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*op)); - *op = opc; - - req->rq_replen = lustre_msg_size(1, &repsize); - req->rq_request_portal = MDS_SEQ_PORTAL; - rc = ptlrpc_queue_wait(req); - if (rc) - GOTO(out_req, rc); - - ran = lustre_swab_repbuf(req, 0, sizeof(*ran), - lustre_swab_lu_range); - - if (ran == NULL) { - CERROR("invalid range is returned\n"); - GOTO(out_req, rc = -EPROTO); - } - *range = *ran; - EXIT; -out_req: - ptlrpc_req_finished(req); - return rc; -} - -/* request sequence-controller node to allocate new super-sequence. */ -int -seq_client_alloc_super(struct lu_client_seq *seq) -{ - int rc; - ENTRY; - - LASSERT(seq->seq_flags & LUSTRE_CLI_SEQ_SERVER); - rc = seq_client_rpc(seq, &seq->seq_cl_range, - SEQ_ALLOC_SUPER); - if (rc == 0) { - CDEBUG(D_INFO|D_WARNING, "SEQ-MGR(cli): allocated super-sequence " - "["LPX64"-"LPX64"]\n", seq->seq_cl_range.lr_start, - seq->seq_cl_range.lr_end); - } - RETURN(rc); -} -EXPORT_SYMBOL(seq_client_alloc_super); - -/* request sequence-controller node to allocate new meta-sequence. */ -int -seq_client_alloc_meta(struct lu_client_seq *seq) -{ - int rc; - ENTRY; - - LASSERT(seq->seq_flags & LUSTRE_CLI_SEQ_CLIENT); - rc = seq_client_rpc(seq, &seq->seq_cl_range, - SEQ_ALLOC_META); - if (rc == 0) { - CDEBUG(D_INFO|D_WARNING, "SEQ-MGR(cli): allocated meta-sequence " - "["LPX64"-"LPX64"]\n", seq->seq_cl_range.lr_start, - seq->seq_cl_range.lr_end); - } - RETURN(rc); -} -EXPORT_SYMBOL(seq_client_alloc_meta); - -/* allocate new sequence for client (llite or MDC are expected to use this) */ -int -seq_client_alloc_seq(struct lu_client_seq *seq, __u64 *seqnr) -{ - int rc; - ENTRY; - - down(&seq->seq_sem); - - LASSERT(seq->seq_flags & LUSTRE_CLI_SEQ_CLIENT); - LASSERT(range_is_sane(&seq->seq_cl_range)); - - /* if we still have free sequences in meta-sequence we allocate new seq - * from given range. */ - if (seq->seq_cl_range.lr_end > seq->seq_cl_range.lr_start) { - *seqnr = seq->seq_cl_range.lr_start; - seq->seq_cl_range.lr_start += 1; - rc = 0; - } else { - /* meta-sequence is exhausted, request MDT to allocate new - * meta-sequence for us. */ - rc = seq_client_alloc_meta(seq); - if (rc) { - CERROR("can't allocate new meta-sequence, " - "rc %d\n", rc); - } - - *seqnr = seq->seq_cl_range.lr_start; - seq->seq_cl_range.lr_start += 1; - } - up(&seq->seq_sem); - - if (rc == 0) { - CDEBUG(D_INFO|D_WARNING, "SEQ-MGR(cli): allocated sequence " - "["LPX64"]\n", *seqnr); - } - RETURN(rc); -} -EXPORT_SYMBOL(seq_client_alloc_seq); - -int -seq_client_alloc_fid(struct lu_client_seq *seq, struct lu_fid *fid) -{ - int rc; - ENTRY; - - LASSERT(fid != NULL); - LASSERT(fid_is_sane(&seq->seq_fid)); - LASSERT(seq->seq_flags & LUSTRE_CLI_SEQ_CLIENT); - - down(&seq->seq_sem); - if (fid_oid(&seq->seq_fid) < LUSTRE_SEQ_WIDTH) { - *fid = seq->seq_fid; - seq->seq_fid.f_oid += 1; - rc = 0; - } else { - __u64 seqnr = 0; - - rc = seq_client_alloc_seq(seq, &seqnr); - if (rc) { - CERROR("can't allocate new sequence, " - "rc %d\n", rc); - GOTO(out, rc); - } else { - seq->seq_fid.f_oid = LUSTRE_FID_INIT_OID; - seq->seq_fid.f_seq = seqnr; - seq->seq_fid.f_ver = 0; - - *fid = seq->seq_fid; - seq->seq_fid.f_oid += 1; - rc = -ERESTART; - } - } - LASSERT(fid_is_sane(fid)); - - CDEBUG(D_INFO, "SEQ-MGR(cli): allocated FID "DFID3"\n", - PFID3(fid)); - - EXIT; -out: - up(&seq->seq_sem); - return rc; -} -EXPORT_SYMBOL(seq_client_alloc_fid); - -int -seq_client_init(struct lu_client_seq *seq, - struct obd_export *exp, - int flags) -{ - int rc; - ENTRY; - - LASSERT(flags & (LUSTRE_CLI_SEQ_CLIENT | - LUSTRE_CLI_SEQ_SERVER)); - - seq->seq_flags = flags; - fid_zero(&seq->seq_fid); - sema_init(&seq->seq_sem, 1); - - seq->seq_cl_range.lr_end = 0; - seq->seq_cl_range.lr_start = 0; - - if (exp != NULL) - seq->seq_exp = class_export_get(exp); - - if (seq->seq_flags & LUSTRE_CLI_SEQ_CLIENT) { - __u64 seqnr = 0; - - /* client (llite or MDC) init case, we need new sequence from - * MDT. This will allocate new meta-sequemce first, because seq - * range in init state and looks the same as exhausted. */ - rc = seq_client_alloc_seq(seq, &seqnr); - if (rc) { - CERROR("can't allocate new sequence, rc %d\n", rc); - GOTO(out, rc); - } else { - seq->seq_fid.f_oid = LUSTRE_FID_INIT_OID; - seq->seq_fid.f_seq = seqnr; - seq->seq_fid.f_ver = 0; - } - - LASSERT(fid_is_sane(&seq->seq_fid)); - } else { - /* check if this is controller node is trying to init client. */ - if (seq->seq_exp) { - /* MDT uses client seq manager to talk to sequence - * controller, and thus, we need super-sequence. */ - rc = seq_client_alloc_super(seq); - } else { - rc = 0; - } - } - - EXIT; -out: - if (rc) - seq_client_fini(seq); - else - CDEBUG(D_INFO|D_WARNING, "Client Sequence Manager initialized\n"); - return rc; -} -EXPORT_SYMBOL(seq_client_init); - -void seq_client_fini(struct lu_client_seq *seq) -{ - ENTRY; - if (seq->seq_exp != NULL) { - class_export_put(seq->seq_exp); - seq->seq_exp = NULL; - } - CDEBUG(D_INFO|D_WARNING, "Client Sequence Manager finalized\n"); - EXIT; -} -EXPORT_SYMBOL(seq_client_fini); - #ifdef __KERNEL__ /* server side seq mgr stuff */ static const struct lu_range LUSTRE_SEQ_SUPER_INIT = { diff --git a/lustre/fid/fid_request.c b/lustre/fid/fid_request.c new file mode 100644 index 0000000..964c279 --- /dev/null +++ b/lustre/fid/fid_request.c @@ -0,0 +1,284 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * lustre/fid/fid_request.c + * Lustre Sequence Manager + * + * Copyright (c) 2006 Cluster File Systems, Inc. + * Author: Yury Umanets + * + * This file is part of the Lustre file system, http://www.lustre.org + * Lustre is a trademark of Cluster File Systems, Inc. + * + * You may have signed or agreed to another license before downloading + * this software. If so, you are bound by the terms and conditions + * of that agreement, and the following does not apply to you. See the + * LICENSE file included with this distribution for more information. + * + * If you did not agree to a different license, then this copy of Lustre + * is open source software; you can redistribute it and/or modify it + * under the terms of version 2 of the GNU General Public License as + * published by the Free Software Foundation. + * + * In either case, Lustre is distributed in the hope that it will be + * useful, but WITHOUT ANY WARRANTY; without even the implied warranty + * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * license text for more details. + */ + +#ifndef EXPORT_SYMTAB +# define EXPORT_SYMTAB +#endif +#define DEBUG_SUBSYSTEM S_FID + +#ifdef __KERNEL__ +# include +# include +#else /* __KERNEL__ */ +# include +#endif + +#include +#include +#include +#include +#include +#include +#include +#include "fid_internal.h" + +/* client seq mgr interface */ +static int +seq_client_rpc(struct lu_client_seq *seq, + struct lu_range *range, + __u32 opc) +{ + int repsize = sizeof(struct lu_range); + int rc, reqsize = sizeof(__u32); + struct ptlrpc_request *req; + struct lu_range *ran; + __u32 *op; + ENTRY; + + req = ptlrpc_prep_req(class_exp2cliimp(seq->seq_exp), + LUSTRE_MDS_VERSION, SEQ_QUERY, + 1, &reqsize, NULL); + if (req == NULL) + RETURN(-ENOMEM); + + op = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*op)); + *op = opc; + + req->rq_replen = lustre_msg_size(1, &repsize); + req->rq_request_portal = MDS_SEQ_PORTAL; + rc = ptlrpc_queue_wait(req); + if (rc) + GOTO(out_req, rc); + + ran = lustre_swab_repbuf(req, 0, sizeof(*ran), + lustre_swab_lu_range); + + if (ran == NULL) { + CERROR("invalid range is returned\n"); + GOTO(out_req, rc = -EPROTO); + } + *range = *ran; + EXIT; +out_req: + ptlrpc_req_finished(req); + return rc; +} + +/* request sequence-controller node to allocate new super-sequence. */ +int +seq_client_alloc_super(struct lu_client_seq *seq) +{ + int rc; + ENTRY; + + LASSERT(seq->seq_flags & LUSTRE_CLI_SEQ_SERVER); + rc = seq_client_rpc(seq, &seq->seq_cl_range, + SEQ_ALLOC_SUPER); + if (rc == 0) { + CDEBUG(D_INFO|D_WARNING, "SEQ-MGR(cli): allocated super-sequence " + "["LPX64"-"LPX64"]\n", seq->seq_cl_range.lr_start, + seq->seq_cl_range.lr_end); + } + RETURN(rc); +} +EXPORT_SYMBOL(seq_client_alloc_super); + +/* request sequence-controller node to allocate new meta-sequence. */ +int +seq_client_alloc_meta(struct lu_client_seq *seq) +{ + int rc; + ENTRY; + + LASSERT(seq->seq_flags & LUSTRE_CLI_SEQ_CLIENT); + rc = seq_client_rpc(seq, &seq->seq_cl_range, + SEQ_ALLOC_META); + if (rc == 0) { + CDEBUG(D_INFO|D_WARNING, "SEQ-MGR(cli): allocated meta-sequence " + "["LPX64"-"LPX64"]\n", seq->seq_cl_range.lr_start, + seq->seq_cl_range.lr_end); + } + RETURN(rc); +} +EXPORT_SYMBOL(seq_client_alloc_meta); + +/* allocate new sequence for client (llite or MDC are expected to use this) */ +int +seq_client_alloc_seq(struct lu_client_seq *seq, __u64 *seqnr) +{ + int rc; + ENTRY; + + down(&seq->seq_sem); + + LASSERT(seq->seq_flags & LUSTRE_CLI_SEQ_CLIENT); + LASSERT(range_is_sane(&seq->seq_cl_range)); + + /* if we still have free sequences in meta-sequence we allocate new seq + * from given range. */ + if (seq->seq_cl_range.lr_end > seq->seq_cl_range.lr_start) { + *seqnr = seq->seq_cl_range.lr_start; + seq->seq_cl_range.lr_start += 1; + rc = 0; + } else { + /* meta-sequence is exhausted, request MDT to allocate new + * meta-sequence for us. */ + rc = seq_client_alloc_meta(seq); + if (rc) { + CERROR("can't allocate new meta-sequence, " + "rc %d\n", rc); + } + + *seqnr = seq->seq_cl_range.lr_start; + seq->seq_cl_range.lr_start += 1; + } + up(&seq->seq_sem); + + if (rc == 0) { + CDEBUG(D_INFO|D_WARNING, "SEQ-MGR(cli): allocated sequence " + "["LPX64"]\n", *seqnr); + } + RETURN(rc); +} +EXPORT_SYMBOL(seq_client_alloc_seq); + +int +seq_client_alloc_fid(struct lu_client_seq *seq, struct lu_fid *fid) +{ + int rc; + ENTRY; + + LASSERT(fid != NULL); + LASSERT(fid_is_sane(&seq->seq_fid)); + LASSERT(seq->seq_flags & LUSTRE_CLI_SEQ_CLIENT); + + down(&seq->seq_sem); + if (fid_oid(&seq->seq_fid) < LUSTRE_SEQ_WIDTH) { + *fid = seq->seq_fid; + seq->seq_fid.f_oid += 1; + rc = 0; + } else { + __u64 seqnr = 0; + + rc = seq_client_alloc_seq(seq, &seqnr); + if (rc) { + CERROR("can't allocate new sequence, " + "rc %d\n", rc); + GOTO(out, rc); + } else { + seq->seq_fid.f_oid = LUSTRE_FID_INIT_OID; + seq->seq_fid.f_seq = seqnr; + seq->seq_fid.f_ver = 0; + + *fid = seq->seq_fid; + seq->seq_fid.f_oid += 1; + rc = -ERESTART; + } + } + LASSERT(fid_is_sane(fid)); + + CDEBUG(D_INFO, "SEQ-MGR(cli): allocated FID "DFID3"\n", + PFID3(fid)); + + EXIT; +out: + up(&seq->seq_sem); + return rc; +} +EXPORT_SYMBOL(seq_client_alloc_fid); + +int +seq_client_init(struct lu_client_seq *seq, + struct obd_export *exp, + int flags) +{ + int rc; + ENTRY; + + LASSERT(flags & (LUSTRE_CLI_SEQ_CLIENT | + LUSTRE_CLI_SEQ_SERVER)); + + seq->seq_flags = flags; + fid_zero(&seq->seq_fid); + sema_init(&seq->seq_sem, 1); + + seq->seq_cl_range.lr_end = 0; + seq->seq_cl_range.lr_start = 0; + + if (exp != NULL) + seq->seq_exp = class_export_get(exp); + + if (seq->seq_flags & LUSTRE_CLI_SEQ_CLIENT) { + __u64 seqnr = 0; + + /* client (llite or MDC) init case, we need new sequence from + * MDT. This will allocate new meta-sequemce first, because seq + * range in init state and looks the same as exhausted. */ + rc = seq_client_alloc_seq(seq, &seqnr); + if (rc) { + CERROR("can't allocate new sequence, rc %d\n", rc); + GOTO(out, rc); + } else { + seq->seq_fid.f_oid = LUSTRE_FID_INIT_OID; + seq->seq_fid.f_seq = seqnr; + seq->seq_fid.f_ver = 0; + } + + LASSERT(fid_is_sane(&seq->seq_fid)); + } else { + /* check if this is controller node is trying to init client. */ + if (seq->seq_exp) { + /* MDT uses client seq manager to talk to sequence + * controller, and thus, we need super-sequence. */ + rc = seq_client_alloc_super(seq); + } else { + rc = 0; + } + } + + EXIT; +out: + if (rc) + seq_client_fini(seq); + else + CDEBUG(D_INFO|D_WARNING, "Client Sequence Manager initialized\n"); + return rc; +} +EXPORT_SYMBOL(seq_client_init); + +void seq_client_fini(struct lu_client_seq *seq) +{ + ENTRY; + if (seq->seq_exp != NULL) { + class_export_put(seq->seq_exp); + seq->seq_exp = NULL; + } + CDEBUG(D_INFO|D_WARNING, "Client Sequence Manager finalized\n"); + EXIT; +} +EXPORT_SYMBOL(seq_client_fini); diff --git a/lustre/fld/Makefile.in b/lustre/fld/Makefile.in index a43b009..3cb2017 100644 --- a/lustre/fld/Makefile.in +++ b/lustre/fld/Makefile.in @@ -1,5 +1,5 @@ MODULES := fld -fld-objs := fld_handler.o fld_iam.o +fld-objs := fld_handler.o fld_request.o fld_iam.o EXTRA_PRE_CFLAGS := -I@LUSTRE@ -I@LUSTRE@/ldiskfs diff --git a/lustre/fld/autoMakefile.am b/lustre/fld/autoMakefile.am index cffe901..3c885be 100644 --- a/lustre/fld/autoMakefile.am +++ b/lustre/fld/autoMakefile.am @@ -5,7 +5,7 @@ if LIBLUSTRE noinst_LIBRARIES = libfld.a -libfld_a_SOURCES = fld_handler.c fld_iam.c fld_internal.h +libfld_a_SOURCES = fld_handler.c fld_request.c fld_iam.c fld_internal.h libfld_a_CPPFLAGS = $(LLCPPFLAGS) libfld_a_CFLAGS = $(LLCFLAGS) endif diff --git a/lustre/fld/fld_handler.c b/lustre/fld/fld_handler.c index a896b54..90438e7 100644 --- a/lustre/fld/fld_handler.c +++ b/lustre/fld/fld_handler.c @@ -2,6 +2,7 @@ * vim:expandtab:shiftwidth=8:tabstop=8: * * lustre/fld/fld_handler.c + * FLD (Fids Location Database) * * Copyright (C) 2006 Cluster File Systems, Inc. * Author: WangDi @@ -55,396 +56,6 @@ #ifdef __KERNEL__ struct fld_cache_info *fld_cache = NULL; -enum { - FLD_HTABLE_BITS = 8, - FLD_HTABLE_SIZE = (1 << FLD_HTABLE_BITS), - FLD_HTABLE_MASK = FLD_HTABLE_SIZE - 1 -}; - -static __u32 fld_cache_hash(__u64 seq) -{ - return seq; -} - -static int -fld_cache_insert(struct fld_cache_info *fld_cache, - __u64 seq, __u64 mds) -{ - struct fld_cache *fld; - struct hlist_head *bucket; - struct hlist_node *scan; - int rc = 0; - ENTRY; - - bucket = fld_cache->fld_hash + (fld_cache_hash(seq) & - fld_cache->fld_hash_mask); - - OBD_ALLOC_PTR(fld); - if (!fld) - RETURN(-ENOMEM); - - INIT_HLIST_NODE(&fld->fld_list); - fld->fld_mds = mds; - fld->fld_seq = seq; - - spin_lock(&fld_cache->fld_lock); - hlist_for_each_entry(fld, scan, bucket, fld_list) { - if (fld->fld_seq == seq) - GOTO(exit_unlock, rc = -EEXIST); - } - hlist_add_head(&fld->fld_list, bucket); - EXIT; -exit_unlock: - spin_unlock(&fld_cache->fld_lock); - if (rc != 0) - OBD_FREE(fld, sizeof(*fld)); - return rc; -} - -static struct fld_cache * -fld_cache_lookup(struct fld_cache_info *fld_cache, __u64 seq) -{ - struct hlist_head *bucket; - struct hlist_node *scan; - struct fld_cache *fld; - ENTRY; - - bucket = fld_cache->fld_hash + (fld_cache_hash(seq) & - fld_cache->fld_hash_mask); - - spin_lock(&fld_cache->fld_lock); - hlist_for_each_entry(fld, scan, bucket, fld_list) { - if (fld->fld_seq == seq) { - spin_unlock(&fld_cache->fld_lock); - RETURN(fld); - } - } - spin_unlock(&fld_cache->fld_lock); - - RETURN(NULL); -} - -static void -fld_cache_delete(struct fld_cache_info *fld_cache, __u64 seq) -{ - struct hlist_head *bucket; - struct hlist_node *scan; - struct fld_cache *fld; - ENTRY; - - bucket = fld_cache->fld_hash + (fld_cache_hash(seq) & - fld_cache->fld_hash_mask); - - spin_lock(&fld_cache->fld_lock); - hlist_for_each_entry(fld, scan, bucket, fld_list) { - if (fld->fld_seq == seq) { - hlist_del_init(&fld->fld_list); - GOTO(out_unlock, 0); - } - } - - EXIT; -out_unlock: - spin_unlock(&fld_cache->fld_lock); - return; -} -#endif - -static int fld_rrb_hash(struct lu_client_fld *fld, __u64 seq) -{ - if (fld->fld_count == 0) - return 0; - - return do_div(seq, fld->fld_count); -} - -static int fld_dht_hash(struct lu_client_fld *fld, __u64 seq) -{ - /* XXX: here should DHT hash */ - return fld_rrb_hash(fld, seq); -} - -static struct lu_fld_hash fld_hash[3] = { - { - .fh_name = "DHT", - .fh_func = fld_dht_hash - }, - { - .fh_name = "Round Robin", - .fh_func = fld_rrb_hash - }, - { - 0, - } -}; - -static struct obd_export * -fld_client_get_export(struct lu_client_fld *fld, __u64 seq) -{ - struct obd_export *fld_exp; - int count = 0, hash; - ENTRY; - - LASSERT(fld->fld_hash != NULL); - hash = fld->fld_hash->fh_func(fld, seq); - - spin_lock(&fld->fld_lock); - list_for_each_entry(fld_exp, - &fld->fld_exports, exp_fld_chain) { - if (count == hash) { - spin_unlock(&fld->fld_lock); - RETURN(fld_exp); - } - count++; - } - spin_unlock(&fld->fld_lock); - RETURN(NULL); -} - -/* add export to FLD. This is usually done by CMM and LMV as they are main users - * of FLD module. */ -int fld_client_add_export(struct lu_client_fld *fld, - struct obd_export *exp) -{ - struct obd_export *fld_exp; - ENTRY; - - LASSERT(exp != NULL); - - CWARN("adding export %s\n", exp->exp_client_uuid.uuid); - - spin_lock(&fld->fld_lock); - list_for_each_entry(fld_exp, &fld->fld_exports, exp_fld_chain) { - if (obd_uuid_equals(&fld_exp->exp_client_uuid, - &exp->exp_client_uuid)) - { - spin_unlock(&fld->fld_lock); - RETURN(-EEXIST); - } - } - - fld_exp = class_export_get(exp); - list_add_tail(&fld_exp->exp_fld_chain, - &fld->fld_exports); - fld->fld_count++; - - spin_unlock(&fld->fld_lock); - - RETURN(0); -} -EXPORT_SYMBOL(fld_client_add_export); - -/* remove export from FLD */ -int fld_client_del_export(struct lu_client_fld *fld, - struct obd_export *exp) -{ - struct obd_export *fld_exp; - struct obd_export *tmp; - ENTRY; - - spin_lock(&fld->fld_lock); - list_for_each_entry_safe(fld_exp, tmp, &fld->fld_exports, exp_fld_chain) { - if (obd_uuid_equals(&fld_exp->exp_client_uuid, - &exp->exp_client_uuid)) - { - fld->fld_count--; - list_del(&fld_exp->exp_fld_chain); - class_export_get(fld_exp); - - spin_unlock(&fld->fld_lock); - RETURN(0); - } - } - spin_unlock(&fld->fld_lock); - RETURN(-ENOENT); -} -EXPORT_SYMBOL(fld_client_del_export); - -int fld_client_init(struct lu_client_fld *fld, int hash) -{ - int rc = 0; - ENTRY; - - LASSERT(fld != NULL); - - if (hash < 0 || hash >= LUSTRE_CLI_FLD_HASH_LAST) { - CERROR("wrong hash function 0x%x\n", hash); - RETURN(-EINVAL); - } - - INIT_LIST_HEAD(&fld->fld_exports); - spin_lock_init(&fld->fld_lock); - fld->fld_hash = &fld_hash[hash]; - fld->fld_count = 0; - - CDEBUG(D_INFO, "Client FLD initialized, using %s\n", - fld->fld_hash->fh_name); - RETURN(rc); -} -EXPORT_SYMBOL(fld_client_init); - -void fld_client_fini(struct lu_client_fld *fld) -{ - struct obd_export *fld_exp; - struct obd_export *tmp; - ENTRY; - - spin_lock(&fld->fld_lock); - list_for_each_entry_safe(fld_exp, tmp, - &fld->fld_exports, exp_fld_chain) { - fld->fld_count--; - list_del(&fld_exp->exp_fld_chain); - class_export_get(fld_exp); - } - spin_unlock(&fld->fld_lock); - CDEBUG(D_INFO, "Client FLD finalized\n"); - EXIT; -} -EXPORT_SYMBOL(fld_client_fini); - -static int -fld_client_rpc(struct obd_export *exp, - struct md_fld *mf, __u32 fld_op) -{ - int size[2] = {sizeof(__u32), sizeof(struct md_fld)}, rc; - int mf_size = sizeof(struct md_fld); - struct ptlrpc_request *req; - struct md_fld *pmf; - __u32 *op; - ENTRY; - - LASSERT(exp != NULL); - - req = ptlrpc_prep_req(class_exp2cliimp(exp), - LUSTRE_MDS_VERSION, FLD_QUERY, - 2, size, NULL); - if (req == NULL) - RETURN(-ENOMEM); - - op = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*op)); - *op = fld_op; - - pmf = lustre_msg_buf(req->rq_reqmsg, 1, sizeof (*pmf)); - memcpy(pmf, mf, sizeof(*mf)); - - req->rq_replen = lustre_msg_size(1, &mf_size); - req->rq_request_portal = MDS_FLD_PORTAL; - rc = ptlrpc_queue_wait(req); - if (rc) - GOTO(out_req, rc); - - pmf = lustre_swab_repbuf(req, 0, sizeof(*pmf), - lustre_swab_md_fld); - *mf = *pmf; -out_req: - ptlrpc_req_finished(req); - RETURN(rc); -} - -int -fld_client_create(struct lu_client_fld *fld, - __u64 seq, __u64 mds) -{ - struct obd_export *fld_exp; - struct md_fld md_fld; - __u32 rc; - ENTRY; - - fld_exp = fld_client_get_export(fld, seq); - if (!fld_exp) - RETURN(-EINVAL); - md_fld.mf_seq = seq; - md_fld.mf_mds = mds; - - rc = fld_client_rpc(fld_exp, &md_fld, FLD_CREATE); -#ifdef __KERNEL__ - fld_cache_insert(fld_cache, seq, mds); -#endif - - RETURN(rc); -} -EXPORT_SYMBOL(fld_client_create); - -int -fld_client_delete(struct lu_client_fld *fld, - __u64 seq, __u64 mds) -{ - struct obd_export *fld_exp; - struct md_fld md_fld; - __u32 rc; - -#ifdef __KERNEL__ - fld_cache_delete(fld_cache, seq); -#endif - - fld_exp = fld_client_get_export(fld, seq); - if (!fld_exp) - RETURN(-EINVAL); - - md_fld.mf_seq = seq; - md_fld.mf_mds = mds; - - rc = fld_client_rpc(fld_exp, &md_fld, FLD_DELETE); - RETURN(rc); -} -EXPORT_SYMBOL(fld_client_delete); - -static int -fld_client_get(struct lu_client_fld *fld, - __u64 seq, __u64 *mds) -{ - struct obd_export *fld_exp; - struct md_fld md_fld; - int rc; - ENTRY; - - fld_exp = fld_client_get_export(fld, seq); - if (!fld_exp) - RETURN(-EINVAL); - - md_fld.mf_seq = seq; - rc = fld_client_rpc(fld_exp, - &md_fld, FLD_LOOKUP); - if (rc == 0) - *mds = md_fld.mf_mds; - - RETURN(rc); -} - -/* lookup fid in the namespace of pfid according to the name */ -int -fld_client_lookup(struct lu_client_fld *fld, - __u64 seq, __u64 *mds) -{ -#ifdef __KERNEL__ - struct fld_cache *fld_entry; -#endif - int rc; - ENTRY; - -#ifdef __KERNEL__ - /* lookup it in the cache */ - fld_entry = fld_cache_lookup(fld_cache, seq); - if (fld_entry != NULL) { - *mds = fld_entry->fld_mds; - RETURN(0); - } -#endif - - /* can not find it in the cache */ - rc = fld_client_get(fld, seq, mds); - if (rc) - RETURN(rc); - -#ifdef __KERNEL__ - rc = fld_cache_insert(fld_cache, seq, *mds); -#endif - - RETURN(rc); -} -EXPORT_SYMBOL(fld_client_lookup); - -#ifdef __KERNEL__ static int fld_init(void) { int i; @@ -468,7 +79,7 @@ static int fld_init(void) for (i = 0; i < FLD_HTABLE_SIZE; i++) INIT_HLIST_HEAD(&fld_cache->fld_hash[i]); - CDEBUG(D_INFO, "Client FLD, cache size %d\n", + CDEBUG(D_INFO|D_WARNING, "Client FLD, cache size %d\n", FLD_HTABLE_SIZE); RETURN(0); @@ -569,7 +180,6 @@ out_pill: return rc; } - static int fld_req_handle(struct ptlrpc_request *req) { int fail = OBD_FAIL_FLD_ALL_REPLY_NET; diff --git a/lustre/fld/fld_internal.h b/lustre/fld/fld_internal.h index 90d0255..56cd638 100644 --- a/lustre/fld/fld_internal.h +++ b/lustre/fld/fld_internal.h @@ -65,6 +65,12 @@ enum fld_op { FLD_LOOKUP = 2 }; +enum { + FLD_HTABLE_BITS = 8, + FLD_HTABLE_SIZE = (1 << FLD_HTABLE_BITS), + FLD_HTABLE_MASK = FLD_HTABLE_SIZE - 1 +}; + #define FLD_SERVICE_WATCHDOG_TIMEOUT (obd_timeout * 1000) int fld_handle_insert(struct lu_server_fld *fld, diff --git a/lustre/fld/fld_request.c b/lustre/fld/fld_request.c new file mode 100644 index 0000000..b890e99 --- /dev/null +++ b/lustre/fld/fld_request.c @@ -0,0 +1,439 @@ +/* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * lustre/fld/fld_handler.c + * FLD (Fids Location Database) + * + * Copyright (C) 2006 Cluster File Systems, Inc. + * Author: Yury Umanets + * + * This file is part of the Lustre file system, http://www.lustre.org + * Lustre is a trademark of Cluster File Systems, Inc. + * + * You may have signed or agreed to another license before downloading + * this software. If so, you are bound by the terms and conditions + * of that agreement, and the following does not apply to you. See the + * LICENSE file included with this distribution for more information. + * + * If you did not agree to a different license, then this copy of Lustre + * is open source software; you can redistribute it and/or modify it + * under the terms of version 2 of the GNU General Public License as + * published by the Free Software Foundation. + * + * In either case, Lustre is distributed in the hope that it will be + * useful, but WITHOUT ANY WARRANTY; without even the implied warranty + * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * license text for more details. + */ +#ifndef EXPORT_SYMTAB +# define EXPORT_SYMTAB +#endif +#define DEBUG_SUBSYSTEM S_FLD + +#ifdef __KERNEL__ +# include +# include +# include +# include +#else /* __KERNEL__ */ +# include +# include +#endif + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include "fld_internal.h" + +#ifdef __KERNEL__ +extern struct fld_cache_info *fld_cache; + +static __u32 fld_cache_hash(__u64 seq) +{ + return seq; +} + +static int +fld_cache_insert(struct fld_cache_info *fld_cache, + __u64 seq, __u64 mds) +{ + struct fld_cache *fld; + struct hlist_head *bucket; + struct hlist_node *scan; + int rc = 0; + ENTRY; + + bucket = fld_cache->fld_hash + (fld_cache_hash(seq) & + fld_cache->fld_hash_mask); + + OBD_ALLOC_PTR(fld); + if (!fld) + RETURN(-ENOMEM); + + INIT_HLIST_NODE(&fld->fld_list); + fld->fld_mds = mds; + fld->fld_seq = seq; + + spin_lock(&fld_cache->fld_lock); + hlist_for_each_entry(fld, scan, bucket, fld_list) { + if (fld->fld_seq == seq) + GOTO(exit_unlock, rc = -EEXIST); + } + hlist_add_head(&fld->fld_list, bucket); + EXIT; +exit_unlock: + spin_unlock(&fld_cache->fld_lock); + if (rc != 0) + OBD_FREE(fld, sizeof(*fld)); + return rc; +} + +static struct fld_cache * +fld_cache_lookup(struct fld_cache_info *fld_cache, __u64 seq) +{ + struct hlist_head *bucket; + struct hlist_node *scan; + struct fld_cache *fld; + ENTRY; + + bucket = fld_cache->fld_hash + (fld_cache_hash(seq) & + fld_cache->fld_hash_mask); + + spin_lock(&fld_cache->fld_lock); + hlist_for_each_entry(fld, scan, bucket, fld_list) { + if (fld->fld_seq == seq) { + spin_unlock(&fld_cache->fld_lock); + RETURN(fld); + } + } + spin_unlock(&fld_cache->fld_lock); + + RETURN(NULL); +} + +static void +fld_cache_delete(struct fld_cache_info *fld_cache, __u64 seq) +{ + struct hlist_head *bucket; + struct hlist_node *scan; + struct fld_cache *fld; + ENTRY; + + bucket = fld_cache->fld_hash + (fld_cache_hash(seq) & + fld_cache->fld_hash_mask); + + spin_lock(&fld_cache->fld_lock); + hlist_for_each_entry(fld, scan, bucket, fld_list) { + if (fld->fld_seq == seq) { + hlist_del_init(&fld->fld_list); + GOTO(out_unlock, 0); + } + } + + EXIT; +out_unlock: + spin_unlock(&fld_cache->fld_lock); + return; +} +#endif + +static int fld_rrb_hash(struct lu_client_fld *fld, __u64 seq) +{ + if (fld->fld_count == 0) + return 0; + + return do_div(seq, fld->fld_count); +} + +static int fld_dht_hash(struct lu_client_fld *fld, __u64 seq) +{ + /* XXX: here should DHT hash */ + return fld_rrb_hash(fld, seq); +} + +static struct lu_fld_hash fld_hash[3] = { + { + .fh_name = "DHT", + .fh_func = fld_dht_hash + }, + { + .fh_name = "Round Robin", + .fh_func = fld_rrb_hash + }, + { + 0, + } +}; + +static struct obd_export * +fld_client_get_export(struct lu_client_fld *fld, __u64 seq) +{ + struct obd_export *fld_exp; + int count = 0, hash; + ENTRY; + + LASSERT(fld->fld_hash != NULL); + hash = fld->fld_hash->fh_func(fld, seq); + + spin_lock(&fld->fld_lock); + list_for_each_entry(fld_exp, + &fld->fld_exports, exp_fld_chain) { + if (count == hash) { + spin_unlock(&fld->fld_lock); + RETURN(fld_exp); + } + count++; + } + spin_unlock(&fld->fld_lock); + RETURN(NULL); +} + +/* add export to FLD. This is usually done by CMM and LMV as they are main users + * of FLD module. */ +int fld_client_add_export(struct lu_client_fld *fld, + struct obd_export *exp) +{ + struct obd_export *fld_exp; + ENTRY; + + LASSERT(exp != NULL); + + CWARN("adding export %s\n", exp->exp_client_uuid.uuid); + + spin_lock(&fld->fld_lock); + list_for_each_entry(fld_exp, &fld->fld_exports, exp_fld_chain) { + if (obd_uuid_equals(&fld_exp->exp_client_uuid, + &exp->exp_client_uuid)) + { + spin_unlock(&fld->fld_lock); + RETURN(-EEXIST); + } + } + + fld_exp = class_export_get(exp); + list_add_tail(&fld_exp->exp_fld_chain, + &fld->fld_exports); + fld->fld_count++; + + spin_unlock(&fld->fld_lock); + + RETURN(0); +} +EXPORT_SYMBOL(fld_client_add_export); + +/* remove export from FLD */ +int fld_client_del_export(struct lu_client_fld *fld, + struct obd_export *exp) +{ + struct obd_export *fld_exp; + struct obd_export *tmp; + ENTRY; + + spin_lock(&fld->fld_lock); + list_for_each_entry_safe(fld_exp, tmp, &fld->fld_exports, exp_fld_chain) { + if (obd_uuid_equals(&fld_exp->exp_client_uuid, + &exp->exp_client_uuid)) + { + fld->fld_count--; + list_del(&fld_exp->exp_fld_chain); + class_export_get(fld_exp); + + spin_unlock(&fld->fld_lock); + RETURN(0); + } + } + spin_unlock(&fld->fld_lock); + RETURN(-ENOENT); +} +EXPORT_SYMBOL(fld_client_del_export); + +int fld_client_init(struct lu_client_fld *fld, int hash) +{ + int rc = 0; + ENTRY; + + LASSERT(fld != NULL); + + if (hash < 0 || hash >= LUSTRE_CLI_FLD_HASH_LAST) { + CERROR("wrong hash function 0x%x\n", hash); + RETURN(-EINVAL); + } + + INIT_LIST_HEAD(&fld->fld_exports); + spin_lock_init(&fld->fld_lock); + fld->fld_hash = &fld_hash[hash]; + fld->fld_count = 0; + + CDEBUG(D_INFO, "Client FLD initialized, using %s\n", + fld->fld_hash->fh_name); + RETURN(rc); +} +EXPORT_SYMBOL(fld_client_init); + +void fld_client_fini(struct lu_client_fld *fld) +{ + struct obd_export *fld_exp; + struct obd_export *tmp; + ENTRY; + + spin_lock(&fld->fld_lock); + list_for_each_entry_safe(fld_exp, tmp, + &fld->fld_exports, exp_fld_chain) { + fld->fld_count--; + list_del(&fld_exp->exp_fld_chain); + class_export_get(fld_exp); + } + spin_unlock(&fld->fld_lock); + CDEBUG(D_INFO, "Client FLD finalized\n"); + EXIT; +} +EXPORT_SYMBOL(fld_client_fini); + +static int +fld_client_rpc(struct obd_export *exp, + struct md_fld *mf, __u32 fld_op) +{ + int size[2] = {sizeof(__u32), sizeof(struct md_fld)}, rc; + int mf_size = sizeof(struct md_fld); + struct ptlrpc_request *req; + struct md_fld *pmf; + __u32 *op; + ENTRY; + + LASSERT(exp != NULL); + + req = ptlrpc_prep_req(class_exp2cliimp(exp), + LUSTRE_MDS_VERSION, FLD_QUERY, + 2, size, NULL); + if (req == NULL) + RETURN(-ENOMEM); + + op = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*op)); + *op = fld_op; + + pmf = lustre_msg_buf(req->rq_reqmsg, 1, sizeof (*pmf)); + memcpy(pmf, mf, sizeof(*mf)); + + req->rq_replen = lustre_msg_size(1, &mf_size); + req->rq_request_portal = MDS_FLD_PORTAL; + rc = ptlrpc_queue_wait(req); + if (rc) + GOTO(out_req, rc); + + pmf = lustre_swab_repbuf(req, 0, sizeof(*pmf), + lustre_swab_md_fld); + *mf = *pmf; +out_req: + ptlrpc_req_finished(req); + RETURN(rc); +} + +int +fld_client_create(struct lu_client_fld *fld, + __u64 seq, __u64 mds) +{ + struct obd_export *fld_exp; + struct md_fld md_fld; + __u32 rc; + ENTRY; + + fld_exp = fld_client_get_export(fld, seq); + if (!fld_exp) + RETURN(-EINVAL); + md_fld.mf_seq = seq; + md_fld.mf_mds = mds; + + rc = fld_client_rpc(fld_exp, &md_fld, FLD_CREATE); +#ifdef __KERNEL__ + fld_cache_insert(fld_cache, seq, mds); +#endif + + RETURN(rc); +} +EXPORT_SYMBOL(fld_client_create); + +int +fld_client_delete(struct lu_client_fld *fld, + __u64 seq, __u64 mds) +{ + struct obd_export *fld_exp; + struct md_fld md_fld; + __u32 rc; + +#ifdef __KERNEL__ + fld_cache_delete(fld_cache, seq); +#endif + + fld_exp = fld_client_get_export(fld, seq); + if (!fld_exp) + RETURN(-EINVAL); + + md_fld.mf_seq = seq; + md_fld.mf_mds = mds; + + rc = fld_client_rpc(fld_exp, &md_fld, FLD_DELETE); + RETURN(rc); +} +EXPORT_SYMBOL(fld_client_delete); + +static int +fld_client_get(struct lu_client_fld *fld, + __u64 seq, __u64 *mds) +{ + struct obd_export *fld_exp; + struct md_fld md_fld; + int rc; + ENTRY; + + fld_exp = fld_client_get_export(fld, seq); + if (!fld_exp) + RETURN(-EINVAL); + + md_fld.mf_seq = seq; + rc = fld_client_rpc(fld_exp, + &md_fld, FLD_LOOKUP); + if (rc == 0) + *mds = md_fld.mf_mds; + + RETURN(rc); +} + +/* lookup fid in the namespace of pfid according to the name */ +int +fld_client_lookup(struct lu_client_fld *fld, + __u64 seq, __u64 *mds) +{ +#ifdef __KERNEL__ + struct fld_cache *fld_entry; +#endif + int rc; + ENTRY; + +#ifdef __KERNEL__ + /* lookup it in the cache */ + fld_entry = fld_cache_lookup(fld_cache, seq); + if (fld_entry != NULL) { + *mds = fld_entry->fld_mds; + RETURN(0); + } +#endif + + /* can not find it in the cache */ + rc = fld_client_get(fld, seq, mds); + if (rc) + RETURN(rc); + +#ifdef __KERNEL__ + rc = fld_cache_insert(fld_cache, seq, *mds); +#endif + + RETURN(rc); +} +EXPORT_SYMBOL(fld_client_lookup);