From b87c143389e9c002c67c62985914cd62fce562a1 Mon Sep 17 00:00:00 2001 From: huanghua Date: Fri, 11 Jul 2008 05:22:16 +0000 Subject: [PATCH] Branch b1_8_gate b=11930 i=adilger i=nikita.danilov i=alex --- lustre/mdc/mdc_fid.c | 472 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 472 insertions(+) create mode 100644 lustre/mdc/mdc_fid.c diff --git a/lustre/mdc/mdc_fid.c b/lustre/mdc/mdc_fid.c new file mode 100644 index 0000000..ea003ef --- /dev/null +++ b/lustre/mdc/mdc_fid.c @@ -0,0 +1,472 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * lustre/mdc/mdc_fid.c + * MDC fid management + * + * Copyright (c) 2006 Cluster File Systems, Inc. + * Author: Yury Umanets + * + * This file is part of the Lustre file system, http://www.lustre.org + * Lustre is a trademark of Cluster File Systems, Inc. + * + * You may have signed or agreed to another license before downloading + * this software. If so, you are bound by the terms and conditions + * of that agreement, and the following does not apply to you. See the + * LICENSE file included with this distribution for more information. + * + * If you did not agree to a different license, then this copy of Lustre + * is open source software; you can redistribute it and/or modify it + * under the terms of version 2 of the GNU General Public License as + * published by the Free Software Foundation. + * + * In either case, Lustre is distributed in the hope that it will be + * useful, but WITHOUT ANY WARRANTY; without even the implied warranty + * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * license text for more details. 
+ */
+
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+#define DEBUG_SUBSYSTEM S_FID
+
+/*
+ * NOTE(review): the header names on the bare include lines below are
+ * missing in this copy of the patch -- restore them from the original
+ * commit before applying.
+ */
+#ifdef __KERNEL__
+# include
+# include
+#else /* __KERNEL__ */
+# include
+#endif
+
+#include
+#include
+#include
+#include "mdc_internal.h"
+
+/* Payload exchanged with the server by fld_client_rpc(). */
+typedef __u64 mdsno_t;
+struct md_fld {
+        seqno_t mf_seq;
+        mdsno_t mf_mds;
+};
+
+/* Opcode placed in the first request buffer of a FLD_QUERY request. */
+enum fld_op {
+        FLD_CREATE = 0,
+        FLD_DELETE = 1,
+        FLD_LOOKUP = 2
+};
+
+
+/*
+ * Send a SEQ_QUERY request with opcode @opc over seq->lcs_exp and hand the
+ * range found in the reply back through @output.
+ *
+ * @input    range copied into the request; when NULL a zeroed range is sent
+ * @output   receives the replied range; written only when 0 is returned, so
+ *           a rejected reply never replaces the caller's current range
+ * @opc      opcode stored in the first request buffer
+ * @opcname  label used in the debug message only
+ *
+ * Returns 0 on success, -ENOMEM when the request cannot be prepared, the
+ * ptlrpc_queue_wait() result when the RPC fails, -EFAULT when the reply has
+ * no range buffer, and -EINVAL when the replied range is not sane or is
+ * already exhausted.
+ */
+static int seq_client_rpc(struct lu_client_seq *seq, struct lu_range *input,
+                          struct lu_range *output, __u32 opc,
+                          const char *opcname)
+{
+        int rc, size[3] = { sizeof(struct ptlrpc_body),
+                            sizeof(__u32),
+                            sizeof(struct lu_range) };
+        struct obd_export *exp = seq->lcs_exp;
+        struct ptlrpc_request *req;
+        struct lu_range *out, *in;
+        __u32 *op;
+        ENTRY;
+
+        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
+                              SEQ_QUERY, 3, size, NULL);
+        if (req == NULL)
+                RETURN(-ENOMEM);
+
+        req->rq_export = class_export_get(exp);
+        op = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(__u32));
+        *op = opc;
+
+        /* Zero out input range, this is not recovery yet. */
+        in = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1,
+                            sizeof(struct lu_range));
+        if (input != NULL)
+                *in = *input;
+        else
+                range_zero(in);
+
+        /* The reply carries a ptlrpc_body followed by one lu_range. */
+        size[1] = sizeof(struct lu_range);
+        ptlrpc_req_set_repsize(req, 2, size);
+
+        LASSERT(seq->lcs_type == LUSTRE_SEQ_METADATA);
+        req->rq_request_portal = SEQ_METADATA_PORTAL;
+
+        mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
+        rc = ptlrpc_queue_wait(req);
+        mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
+
+        if (rc)
+                GOTO(out_req, rc);
+
+        out = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
+                             sizeof(struct lu_range));
+        /* Same guard as fld_client_rpc(): do not dereference a missing buffer. */
+        if (out == NULL)
+                GOTO(out_req, rc = -EFAULT);
+
+        /*
+         * Validate the reply in place before touching @output: the caller
+         * passes &seq->lcs_space, and seq_client_alloc_seq() asserts that
+         * this range is sane on its next invocation.
+         */
+        if (!range_is_sane(out)) {
+                CERROR("%s: Invalid range received from server: "
+                       DRANGE"\n", seq->lcs_name, PRANGE(out));
+                GOTO(out_req, rc = -EINVAL);
+        }
+
+        /*
+         * NOTE(review): the "]" following DRANGE here and in the CDEBUG
+         * below differs from the other range messages in this file --
+         * confirm against the DRANGE definition.
+         */
+        if (range_is_exhausted(out)) {
+                CERROR("%s: Range received from server is exhausted: "
+                       DRANGE"]\n", seq->lcs_name, PRANGE(out));
+                GOTO(out_req, rc = -EINVAL);
+        }
+
+        *output = *out;
+        *in = *out;
+
+        CDEBUG(D_INFO, "%s: Allocated %s-sequence "DRANGE"]\n",
+               seq->lcs_name, opcname, PRANGE(output));
+
+        EXIT;
+out_req:
+        ptlrpc_req_finished(req);
+        return rc;
+}
+
+
+/*
+ * Send a FLD_QUERY request carrying opcode @fld_op and a copy of *@mf over
+ * seq->lcs_exp; on success the md_fld found in the reply is copied back
+ * into *@mf.
+ *
+ * Returns 0 on success, -ENOMEM when the request cannot be prepared, the
+ * ptlrpc_queue_wait() result when the RPC fails, and -EFAULT when the reply
+ * has no md_fld buffer.
+ */
+static int fld_client_rpc(struct lu_client_seq *seq,
+                          struct md_fld *mf, __u32 fld_op)
+{
+        int size[3] = { sizeof(struct ptlrpc_body),
+                        sizeof(__u32),
+                        sizeof(struct md_fld) };
+        struct obd_export *exp = seq->lcs_exp;
+        struct ptlrpc_request *req;
+        struct md_fld *pmf;
+        __u32 *op;
+        int rc;
+        ENTRY;
+
+        LASSERT(exp != NULL);
+
+        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
+                              FLD_QUERY, 3, size, NULL);
+        if (req == NULL)
+                RETURN(-ENOMEM);
+
+        req->rq_export = class_export_get(exp);
+        op = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(__u32));
+        *op = fld_op;
+
+        pmf = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1,
+                             sizeof(struct md_fld));
+        *pmf = *mf;
+
+        /* The reply carries a ptlrpc_body followed by one md_fld. */
+        size[1] = sizeof(struct md_fld);
+        ptlrpc_req_set_repsize(req, 2, size);
+        req->rq_request_portal = FLD_REQUEST_PORTAL;
+
+        mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
+        rc = ptlrpc_queue_wait(req);
+        mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
+        if (rc)
+                GOTO(out_req, rc);
+
+        pmf = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
+                             sizeof(struct md_fld));
+        if (pmf == NULL)
+                GOTO(out_req, rc = -EFAULT);
+        *mf = *pmf;
+        EXIT;
+out_req:
+        ptlrpc_req_finished(req);
+        return rc;
+}
+
+
+/*
+ * Request sequence-controller node to allocate new meta-sequence.
+ * The result is stored in seq->lcs_space; returns what seq_client_rpc()
+ * returns.
+ */
+static int seq_client_alloc_meta(struct lu_client_seq *seq)
+{
+        int rc;
+        ENTRY;
+
+        rc = seq_client_rpc(seq, NULL, &seq->lcs_space,
+                            SEQ_ALLOC_META, "meta");
+        RETURN(rc);
+}
+
+/*
+ * Allocate new sequence for client.
+ *
+ * Takes the first number of seq->lcs_space, refilling that range through
+ * seq_client_alloc_meta() first when it is exhausted. The number is stored
+ * in *@seqnr. Returns 0 on success or the seq_client_alloc_meta() error,
+ * in which case *@seqnr is not written.
+ */
+static int seq_client_alloc_seq(struct lu_client_seq *seq, seqno_t *seqnr)
+{
+        int rc = 0;
+        ENTRY;
+
+        LASSERT(range_is_sane(&seq->lcs_space));
+
+        if (range_is_exhausted(&seq->lcs_space)) {
+                rc = seq_client_alloc_meta(seq);
+                if (rc) {
+                        CERROR("%s: Can't allocate new meta-sequence, "
+                               "rc %d\n", seq->lcs_name, rc);
+                        RETURN(rc);
+                }
+                CDEBUG(D_INFO, "%s: New range - "DRANGE"\n",
+                       seq->lcs_name, PRANGE(&seq->lcs_space));
+        }
+
+        LASSERT(!range_is_exhausted(&seq->lcs_space));
+        /* Consume the lowest number of the range. */
+        *seqnr = seq->lcs_space.lr_start;
+        seq->lcs_space.lr_start += 1;
+
+        CDEBUG(D_INFO, "%s: Allocated sequence ["LPX64"]\n", seq->lcs_name,
+               *seqnr);
+
+        RETURN(rc);
+}
+
+/* Allocate new fid on passed client @seq and save it to @fid.
*/ +static int seq_client_alloc_fid(struct lu_client_seq *seq, struct lu_fid *fid) +{ + int rc; + ENTRY; + + LASSERT(seq != NULL); + LASSERT(fid != NULL); + + down(&seq->lcs_sem); + + if (fid_is_zero(&seq->lcs_fid) || + fid_oid(&seq->lcs_fid) >= seq->lcs_width) + { + seqno_t seqnr; + + rc = seq_client_alloc_seq(seq, &seqnr); + if (rc) { + CERROR("%s: Can't allocate new sequence, " + "rc %d\n", seq->lcs_name, rc); + up(&seq->lcs_sem); + RETURN(rc); + } + + CDEBUG(D_INFO, "%s: Switch to sequence " + "[0x%16.16"LPF64"x]\n", seq->lcs_name, seqnr); + + seq->lcs_fid.f_seq = seqnr; + seq->lcs_fid.f_oid = LUSTRE_FID_INIT_OID; + seq->lcs_fid.f_ver = 0; + + /* + * Inform caller that sequence switch is performed to allow it + * to setup FLD for it. + */ + rc = 1; + } else { + /* Just bump last allocated fid and return to caller. */ + seq->lcs_fid.f_oid += 1; + rc = 0; + } + + *fid = seq->lcs_fid; + up(&seq->lcs_sem); + + CDEBUG(D_INFO, "%s: Allocated FID "DFID"\n", seq->lcs_name, PFID(fid)); + RETURN(rc); +} + +/* + * Finish the current sequence due to disconnect. 
+ * See mdc_import_event()
+ */
+static void seq_client_flush(struct lu_client_seq *seq)
+{
+        LASSERT(seq != NULL);
+
+        /* Reset the last fid and the sequence range under lcs_sem. */
+        down(&seq->lcs_sem);
+        fid_init(&seq->lcs_fid);
+        range_zero(&seq->lcs_space);
+        up(&seq->lcs_sem);
+}
+
+/*
+ * Issue a FLD_CREATE request for the pair (@seq, @mds) through
+ * fld_client_rpc() and return its result.
+ */
+static int fld_client_create(struct lu_client_seq *lcs,
+                             seqno_t seq, mdsno_t mds)
+{
+        struct md_fld entry = { .mf_seq = seq, .mf_mds = mds };
+        int result;
+        ENTRY;
+
+        CDEBUG(D_INFO, "%s: Create fld entry (seq: "LPX64"; mds: "
+               LPU64") on target 0\n", lcs->lcs_name, seq, mds);
+
+        result = fld_client_rpc(lcs, &entry, FLD_CREATE);
+        RETURN(result);
+}
+
+/* Placeholder: does nothing and always reports success. */
+static int seq_client_proc_init(struct lu_client_seq *seq)
+{
+        return 0;
+}
+
+/* Placeholder: does nothing. */
+static void seq_client_proc_fini(struct lu_client_seq *seq)
+{
+}
+
+/*
+ * Set up client sequence manager @seq on top of export @exp.
+ *
+ * Stores @exp, @type and @width, initialises the semaphore, clears the
+ * fid/range state, takes a reference on the export and names the client
+ * "cli-<prefix>". Returns the result of seq_client_proc_init(); when that
+ * is non-zero the client is torn down again with seq_client_fini().
+ */
+int seq_client_init(struct lu_client_seq *seq,
+                    struct obd_export *exp,
+                    enum lu_cli_type type,
+                    __u64 width,
+                    const char *prefix)
+{
+        int result;
+        ENTRY;
+
+        LASSERT(seq != NULL);
+        LASSERT(prefix != NULL);
+
+        seq->lcs_exp = exp;
+        seq->lcs_type = type;
+        sema_init(&seq->lcs_sem, 1);
+        seq->lcs_width = width;
+
+        /* Start from a clean state; needs the semaphore initialised above. */
+        seq_client_flush(seq);
+
+        LASSERT(seq->lcs_exp != NULL);
+        seq->lcs_exp = class_export_get(seq->lcs_exp);
+
+        snprintf(seq->lcs_name, sizeof(seq->lcs_name),
+                 "cli-%s", prefix);
+
+        result = seq_client_proc_init(seq);
+        if (result != 0)
+                seq_client_fini(seq);
+        RETURN(result);
+}
+
+/* Tear down @seq: drop the export reference taken by seq_client_init(). */
+void seq_client_fini(struct lu_client_seq *seq)
+{
+        ENTRY;
+
+        seq_client_proc_fini(seq);
+        LASSERT(seq->lcs_exp != NULL);
+
+        if (seq->lcs_exp != NULL) {
+                class_export_put(seq->lcs_exp);
+                seq->lcs_exp = NULL;
+        }
+
+        EXIT;
+}
+
+/*
+ * Allocate new fid on passed client @seq and save it to @fid.
+ *
+ * When the allocation switched to a new sequence, an FLD entry is created
+ * for it; if that fails the client's sequence state is flushed and the
+ * error is returned. Otherwise returns the seq_client_alloc_fid() result
+ * (0, or its error code).
+ */
+int mdc_fid_alloc(struct lu_client_seq *seq, struct lu_fid *fid)
+{
+        int result;
+        ENTRY;
+
+        result = seq_client_alloc_fid(seq, fid);
+        if (result <= 0)
+                RETURN(result);
+
+        /* Client switches to new sequence, setup FLD. */
+        result = fld_client_create(seq, fid_seq(fid), 0);
+        if (result != 0) {
+                CERROR("Can't create fld entry, rc %d\n", result);
+                /* Delete just allocated fid sequence */
+                seq_client_flush(seq);
+        }
+        RETURN(result);
+}
+
+/* Store @src in @dst with every field converted from CPU to little endian. */
+void fid_cpu_to_le(struct lu_fid *dst, const struct lu_fid *src)
+{
+        /* compile-time check that the three fields cover the whole fid */
+        CLASSERT(sizeof(*src) ==
+                 sizeof(fid_seq(src)) +
+                 sizeof(fid_oid(src)) + sizeof(fid_ver(src)));
+        LASSERTF(fid_is_igif(src) || fid_ver(src) == 0, DFID"\n", PFID(src));
+        dst->f_seq = cpu_to_le64(fid_seq(src));
+        dst->f_oid = cpu_to_le32(fid_oid(src));
+        dst->f_ver = cpu_to_le32(fid_ver(src));
+}
+EXPORT_SYMBOL(fid_cpu_to_le);
+
+/* Store @src in @dst with every field converted from little endian to CPU. */
+void fid_le_to_cpu(struct lu_fid *dst, const struct lu_fid *src)
+{
+        /* compile-time check that the three fields cover the whole fid */
+        CLASSERT(sizeof(*src) ==
+                 sizeof(fid_seq(src)) +
+                 sizeof(fid_oid(src)) + sizeof(fid_ver(src)));
+        dst->f_seq = le64_to_cpu(fid_seq(src));
+        dst->f_oid = le32_to_cpu(fid_oid(src));
+        dst->f_ver = le32_to_cpu(fid_ver(src));
+        /* the version check runs on the converted value */
+        LASSERTF(fid_is_igif(dst) || fid_ver(dst) == 0, DFID"\n", PFID(dst));
+}
+EXPORT_SYMBOL(fid_le_to_cpu);
+
+/* Store @src in @dst with both bounds converted from CPU to little endian. */
+void range_cpu_to_le(struct lu_range *dst, const struct lu_range *src)
+{
+        /* compile-time check that the two bounds cover the whole range */
+        CLASSERT(sizeof(*src) ==
+                 sizeof(src->lr_start) +
+                 sizeof(src->lr_end));
+        dst->lr_start = cpu_to_le64(src->lr_start);
+        dst->lr_end = cpu_to_le64(src->lr_end);
+}
+EXPORT_SYMBOL(range_cpu_to_le);
+
+/* Store @src in @dst with both bounds converted from little endian to CPU. */
+void range_le_to_cpu(struct lu_range *dst, const struct lu_range *src)
+{
+        /* compile-time check that the two bounds cover the whole range */
+        CLASSERT(sizeof(*src) ==
+                 sizeof(src->lr_start) +
+                 sizeof(src->lr_end));
+        dst->lr_start = le64_to_cpu(src->lr_start);
+        dst->lr_end = le64_to_cpu(src->lr_end);
+}
+EXPORT_SYMBOL(range_le_to_cpu);
+
+/* Store @src in @dst with both bounds converted from CPU to big endian. */
+void range_cpu_to_be(struct lu_range *dst, const struct lu_range *src)
+{
+        /* compile-time check that the two bounds cover the whole range */
+        CLASSERT(sizeof(*src) ==
+                 sizeof(src->lr_start) +
+                 sizeof(src->lr_end));
+        dst->lr_start = cpu_to_be64(src->lr_start);
+        dst->lr_end = cpu_to_be64(src->lr_end);
+}
+EXPORT_SYMBOL(range_cpu_to_be);
+
+/* Store @src in @dst with both bounds converted from big endian to CPU. */
+void range_be_to_cpu(struct lu_range *dst, const struct lu_range *src)
+{
+        /* compile-time check that the two bounds cover the whole range */
+        CLASSERT(sizeof(*src) ==
+                 sizeof(src->lr_start) +
+                 sizeof(src->lr_end));
+        dst->lr_start = be64_to_cpu(src->lr_start);
+        dst->lr_end = be64_to_cpu(src->lr_end);
+}
+EXPORT_SYMBOL(range_be_to_cpu);
+
+/**
+ * Build (DLM) resource name from fid.
+ *
+ * @name is cleared, then the sequence and object id of @f are stored in
+ * their slots; the version slot is filled only when @f is not an IGIF.
+ * Returns @name.
+ */
+struct ldlm_res_id *
+fid_build_reg_res_name(const struct lu_fid *f, struct ldlm_res_id *name)
+{
+        memset(name, 0, sizeof(*name));
+        name->name[LUSTRE_RES_ID_SEQ_OFF] = fid_seq(f);
+        name->name[LUSTRE_RES_ID_OID_OFF] = fid_oid(f);
+        if (!fid_is_igif(f))
+                name->name[LUSTRE_RES_ID_VER_OFF] = fid_ver(f);
+        return name;
+}
+EXPORT_SYMBOL(fid_build_reg_res_name);
+
+/**
+ * Return true if resource is for object identified by fid.
+ *
+ * Sequence and object id must always match; the version slot is compared
+ * only when @f is not an IGIF.
+ */
+int fid_res_name_eq(const struct lu_fid *f, const struct ldlm_res_id *name)
+{
+        if (name->name[LUSTRE_RES_ID_SEQ_OFF] != fid_seq(f) ||
+            name->name[LUSTRE_RES_ID_OID_OFF] != fid_oid(f))
+                return 0;
+
+        if (fid_is_igif(f))
+                return 1;
+
+        return name->name[LUSTRE_RES_ID_VER_OFF] == fid_ver(f);
+}
+EXPORT_SYMBOL(fid_res_name_eq);
-- 
1.8.3.1