--- /dev/null
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * lustre/mdc/mdc_fid.c
+ * MDC fid management
+ *
+ * Copyright (c) 2006 Cluster File Systems, Inc.
+ * Author: Yury Umanets <umka@clusterfs.com>
+ *
+ * This file is part of the Lustre file system, http://www.lustre.org
+ * Lustre is a trademark of Cluster File Systems, Inc.
+ *
+ * You may have signed or agreed to another license before downloading
+ * this software. If so, you are bound by the terms and conditions
+ * of that agreement, and the following does not apply to you. See the
+ * LICENSE file included with this distribution for more information.
+ *
+ * If you did not agree to a different license, then this copy of Lustre
+ * is open source software; you can redistribute it and/or modify it
+ * under the terms of version 2 of the GNU General Public License as
+ * published by the Free Software Foundation.
+ *
+ * In either case, Lustre is distributed in the hope that it will be
+ * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
+ * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * license text for more details.
+ */
+
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+#define DEBUG_SUBSYSTEM S_FID
+
+#ifdef __KERNEL__
+# include <libcfs/libcfs.h>
+# include <linux/module.h>
+#else /* __KERNEL__ */
+# include <liblustre.h>
+#endif
+
+#include <obd.h>
+#include <obd_class.h>
+#include <obd_support.h>
+#include "mdc_internal.h"
+
+typedef __u64 mdsno_t;
+struct md_fld {
+ seqno_t mf_seq;
+ mdsno_t mf_mds;
+};
+
+enum fld_op {
+ FLD_CREATE = 0,
+ FLD_DELETE = 1,
+ FLD_LOOKUP = 2
+};
+
+
+static int seq_client_rpc(struct lu_client_seq *seq, struct lu_range *input,
+ struct lu_range *output, __u32 opc,
+ const char *opcname)
+{
+ int rc, size[3] = { sizeof(struct ptlrpc_body),
+ sizeof(__u32),
+ sizeof(struct lu_range) };
+ struct obd_export *exp = seq->lcs_exp;
+ struct ptlrpc_request *req;
+ struct lu_range *out, *in;
+ __u32 *op;
+ ENTRY;
+
+ req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
+ SEQ_QUERY, 3, size, NULL);
+ if (req == NULL)
+ RETURN(-ENOMEM);
+
+ req->rq_export = class_export_get(exp);
+ op = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(__u32));
+ *op = opc;
+
+ /* Zero out input range, this is not recovery yet. */
+ in = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1,
+ sizeof(struct lu_range));
+ if (input != NULL)
+ *in = *input;
+ else
+ range_zero(in);
+
+ size[1] = sizeof(struct lu_range);
+ ptlrpc_req_set_repsize(req, 2, size);
+
+ LASSERT(seq->lcs_type == LUSTRE_SEQ_METADATA);
+ req->rq_request_portal = SEQ_METADATA_PORTAL;
+
+ mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
+ rc = ptlrpc_queue_wait(req);
+ mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
+
+ if (rc)
+ GOTO(out_req, rc);
+
+ out = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
+ sizeof(struct lu_range));
+ *output = *out;
+
+ if (!range_is_sane(output)) {
+ CERROR("%s: Invalid range received from server: "
+ DRANGE"\n", seq->lcs_name, PRANGE(output));
+ GOTO(out_req, rc = -EINVAL);
+ }
+
+ if (range_is_exhausted(output)) {
+ CERROR("%s: Range received from server is exhausted: "
+ DRANGE"]\n", seq->lcs_name, PRANGE(output));
+ GOTO(out_req, rc = -EINVAL);
+ }
+ *in = *out;
+
+ CDEBUG(D_INFO, "%s: Allocated %s-sequence "DRANGE"]\n",
+ seq->lcs_name, opcname, PRANGE(output));
+
+ EXIT;
+out_req:
+ ptlrpc_req_finished(req);
+ return rc;
+}
+
+
+static int fld_client_rpc(struct lu_client_seq *seq,
+ struct md_fld *mf, __u32 fld_op)
+{
+ int size[3] = { sizeof(struct ptlrpc_body),
+ sizeof(__u32),
+ sizeof(struct md_fld) };
+ struct obd_export *exp = seq->lcs_exp;
+ struct ptlrpc_request *req;
+ struct md_fld *pmf;
+ __u32 *op;
+ int rc;
+ ENTRY;
+
+ LASSERT(exp != NULL);
+
+ req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
+ FLD_QUERY, 3, size, NULL);
+ if (req == NULL)
+ RETURN(-ENOMEM);
+
+ req->rq_export = class_export_get(exp);
+ op = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(__u32));
+ *op = fld_op;
+
+ pmf = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1,
+ sizeof(struct md_fld));
+ *pmf = *mf;
+
+ size[1] = sizeof(struct md_fld);
+ ptlrpc_req_set_repsize(req, 2, size);
+ req->rq_request_portal = FLD_REQUEST_PORTAL;
+
+ mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
+ rc = ptlrpc_queue_wait(req);
+ mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
+ if (rc)
+ GOTO(out_req, rc);
+
+ pmf = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
+ sizeof(struct md_fld));
+ if (pmf == NULL)
+ GOTO(out_req, rc = -EFAULT);
+ *mf = *pmf;
+ EXIT;
+out_req:
+ ptlrpc_req_finished(req);
+ return rc;
+}
+
+
+/* Request sequence-controller node to allocate new meta-sequence. */
+static int seq_client_alloc_meta(struct lu_client_seq *seq)
+{
+ int rc;
+ ENTRY;
+
+ rc = seq_client_rpc(seq, NULL, &seq->lcs_space,
+ SEQ_ALLOC_META, "meta");
+ RETURN(rc);
+}
+
+/* Allocate new sequence for client. */
+static int seq_client_alloc_seq(struct lu_client_seq *seq, seqno_t *seqnr)
+{
+ int rc;
+ ENTRY;
+
+ LASSERT(range_is_sane(&seq->lcs_space));
+
+ if (range_is_exhausted(&seq->lcs_space)) {
+ rc = seq_client_alloc_meta(seq);
+ if (rc) {
+ CERROR("%s: Can't allocate new meta-sequence, "
+ "rc %d\n", seq->lcs_name, rc);
+ RETURN(rc);
+ } else {
+ CDEBUG(D_INFO, "%s: New range - "DRANGE"\n",
+ seq->lcs_name, PRANGE(&seq->lcs_space));
+ }
+ } else {
+ rc = 0;
+ }
+
+ LASSERT(!range_is_exhausted(&seq->lcs_space));
+ *seqnr = seq->lcs_space.lr_start;
+ seq->lcs_space.lr_start += 1;
+
+ CDEBUG(D_INFO, "%s: Allocated sequence ["LPX64"]\n", seq->lcs_name,
+ *seqnr);
+
+ RETURN(rc);
+}
+
+/* Allocate new fid on passed client @seq and save it to @fid. */
+static int seq_client_alloc_fid(struct lu_client_seq *seq, struct lu_fid *fid)
+{
+ int rc;
+ ENTRY;
+
+ LASSERT(seq != NULL);
+ LASSERT(fid != NULL);
+
+ down(&seq->lcs_sem);
+
+ if (fid_is_zero(&seq->lcs_fid) ||
+ fid_oid(&seq->lcs_fid) >= seq->lcs_width)
+ {
+ seqno_t seqnr;
+
+ rc = seq_client_alloc_seq(seq, &seqnr);
+ if (rc) {
+ CERROR("%s: Can't allocate new sequence, "
+ "rc %d\n", seq->lcs_name, rc);
+ up(&seq->lcs_sem);
+ RETURN(rc);
+ }
+
+ CDEBUG(D_INFO, "%s: Switch to sequence "
+ "[0x%16.16"LPF64"x]\n", seq->lcs_name, seqnr);
+
+ seq->lcs_fid.f_seq = seqnr;
+ seq->lcs_fid.f_oid = LUSTRE_FID_INIT_OID;
+ seq->lcs_fid.f_ver = 0;
+
+ /*
+ * Inform caller that sequence switch is performed to allow it
+ * to setup FLD for it.
+ */
+ rc = 1;
+ } else {
+ /* Just bump last allocated fid and return to caller. */
+ seq->lcs_fid.f_oid += 1;
+ rc = 0;
+ }
+
+ *fid = seq->lcs_fid;
+ up(&seq->lcs_sem);
+
+ CDEBUG(D_INFO, "%s: Allocated FID "DFID"\n", seq->lcs_name, PFID(fid));
+ RETURN(rc);
+}
+
+/*
+ * Finish the current sequence due to disconnect.
+ * See mdc_import_event()
+ */
+static void seq_client_flush(struct lu_client_seq *seq)
+{
+ LASSERT(seq != NULL);
+ down(&seq->lcs_sem);
+ fid_init(&seq->lcs_fid);
+ range_zero(&seq->lcs_space);
+ up(&seq->lcs_sem);
+}
+
+static int fld_client_create(struct lu_client_seq *lcs,
+ seqno_t seq, mdsno_t mds)
+{
+ struct md_fld md_fld = { .mf_seq = seq, .mf_mds = mds };
+ int rc;
+ ENTRY;
+
+ CDEBUG(D_INFO, "%s: Create fld entry (seq: "LPX64"; mds: "
+ LPU64") on target 0\n", lcs->lcs_name, seq, mds);
+
+ rc = fld_client_rpc(lcs, &md_fld, FLD_CREATE);
+ RETURN(rc);
+}
+
+static int seq_client_proc_init(struct lu_client_seq *seq)
+{
+ return 0;
+}
+
+static void seq_client_proc_fini(struct lu_client_seq *seq)
+{
+ return;
+}
+
+int seq_client_init(struct lu_client_seq *seq,
+ struct obd_export *exp,
+ enum lu_cli_type type,
+ __u64 width,
+ const char *prefix)
+{
+ int rc;
+ ENTRY;
+
+ LASSERT(seq != NULL);
+ LASSERT(prefix != NULL);
+
+ seq->lcs_exp = exp;
+ seq->lcs_type = type;
+ sema_init(&seq->lcs_sem, 1);
+ seq->lcs_width = width;
+
+ /* Make sure that things are clear before work is started. */
+ seq_client_flush(seq);
+
+ LASSERT(seq->lcs_exp != NULL);
+ seq->lcs_exp = class_export_get(seq->lcs_exp);
+
+ snprintf(seq->lcs_name, sizeof(seq->lcs_name),
+ "cli-%s", prefix);
+
+ rc = seq_client_proc_init(seq);
+ if (rc)
+ seq_client_fini(seq);
+ RETURN(rc);
+}
+
+void seq_client_fini(struct lu_client_seq *seq)
+{
+ ENTRY;
+
+ seq_client_proc_fini(seq);
+ LASSERT(seq->lcs_exp != NULL);
+
+ if (seq->lcs_exp != NULL) {
+ class_export_put(seq->lcs_exp);
+ seq->lcs_exp = NULL;
+ }
+
+ EXIT;
+}
+
+/* Allocate new fid on passed client @seq and save it to @fid. */
+int mdc_fid_alloc(struct lu_client_seq *seq, struct lu_fid *fid)
+{
+ int rc;
+ ENTRY;
+
+ rc = seq_client_alloc_fid(seq, fid);
+ if (rc > 0) {
+ /* Client switches to new sequence, setup FLD. */
+ rc = fld_client_create(seq, fid_seq(fid), 0);
+ if (rc) {
+ CERROR("Can't create fld entry, rc %d\n", rc);
+ /* Delete just allocated fid sequence */
+ seq_client_flush(seq);
+ }
+ }
+ RETURN(rc);
+}
+
+void fid_cpu_to_le(struct lu_fid *dst, const struct lu_fid *src)
+{
+ /* check that all fields are converted */
+ CLASSERT(sizeof *src ==
+ sizeof fid_seq(src) +
+ sizeof fid_oid(src) + sizeof fid_ver(src));
+ LASSERTF(fid_is_igif(src) || fid_ver(src) == 0, DFID"\n", PFID(src));
+ dst->f_seq = cpu_to_le64(fid_seq(src));
+ dst->f_oid = cpu_to_le32(fid_oid(src));
+ dst->f_ver = cpu_to_le32(fid_ver(src));
+}
+EXPORT_SYMBOL(fid_cpu_to_le);
+
+void fid_le_to_cpu(struct lu_fid *dst, const struct lu_fid *src)
+{
+ /* check that all fields are converted */
+ CLASSERT(sizeof *src ==
+ sizeof fid_seq(src) +
+ sizeof fid_oid(src) + sizeof fid_ver(src));
+ dst->f_seq = le64_to_cpu(fid_seq(src));
+ dst->f_oid = le32_to_cpu(fid_oid(src));
+ dst->f_ver = le32_to_cpu(fid_ver(src));
+ LASSERTF(fid_is_igif(dst) || fid_ver(dst) == 0, DFID"\n", PFID(dst));
+}
+EXPORT_SYMBOL(fid_le_to_cpu);
+
+void range_cpu_to_le(struct lu_range *dst, const struct lu_range *src)
+{
+ /* check that all fields are converted */
+ CLASSERT(sizeof *src ==
+ sizeof src->lr_start +
+ sizeof src->lr_end);
+ dst->lr_start = cpu_to_le64(src->lr_start);
+ dst->lr_end = cpu_to_le64(src->lr_end);
+}
+EXPORT_SYMBOL(range_cpu_to_le);
+
+void range_le_to_cpu(struct lu_range *dst, const struct lu_range *src)
+{
+ /* check that all fields are converted */
+ CLASSERT(sizeof *src ==
+ sizeof src->lr_start +
+ sizeof src->lr_end);
+ dst->lr_start = le64_to_cpu(src->lr_start);
+ dst->lr_end = le64_to_cpu(src->lr_end);
+}
+EXPORT_SYMBOL(range_le_to_cpu);
+
+void range_cpu_to_be(struct lu_range *dst, const struct lu_range *src)
+{
+ /* check that all fields are converted */
+ CLASSERT(sizeof *src ==
+ sizeof src->lr_start +
+ sizeof src->lr_end);
+ dst->lr_start = cpu_to_be64(src->lr_start);
+ dst->lr_end = cpu_to_be64(src->lr_end);
+}
+EXPORT_SYMBOL(range_cpu_to_be);
+
+void range_be_to_cpu(struct lu_range *dst, const struct lu_range *src)
+{
+ /* check that all fields are converted */
+ CLASSERT(sizeof *src ==
+ sizeof src->lr_start +
+ sizeof src->lr_end);
+ dst->lr_start = be64_to_cpu(src->lr_start);
+ dst->lr_end = be64_to_cpu(src->lr_end);
+}
+EXPORT_SYMBOL(range_be_to_cpu);
+
+/**
+ * Build (DLM) resource name from fid.
+ */
+struct ldlm_res_id *
+fid_build_reg_res_name(const struct lu_fid *f, struct ldlm_res_id *name)
+{
+ memset(name, 0, sizeof *name);
+ name->name[LUSTRE_RES_ID_SEQ_OFF] = fid_seq(f);
+ name->name[LUSTRE_RES_ID_OID_OFF] = fid_oid(f);
+ if (!fid_is_igif(f))
+ name->name[LUSTRE_RES_ID_VER_OFF] = fid_ver(f);
+ return name;
+}
+EXPORT_SYMBOL(fid_build_reg_res_name);
+
+/**
+ * Return true if resource is for object identified by fid.
+ */
+int fid_res_name_eq(const struct lu_fid *f, const struct ldlm_res_id *name)
+{
+ int ret;
+
+ ret = name->name[LUSTRE_RES_ID_SEQ_OFF] == fid_seq(f) &&
+ name->name[LUSTRE_RES_ID_OID_OFF] == fid_oid(f);
+ if (!fid_is_igif(f))
+ ret = ret && name->name[LUSTRE_RES_ID_VER_OFF] == fid_ver(f);
+ return ret;
+}
+EXPORT_SYMBOL(fid_res_name_eq);