/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
* vim:expandtab:shiftwidth=8:tabstop=8:
*
- * lustre/fid/fid_handler.c
- * Lustre Sequence Manager
+ * GPL HEADER START
*
- * Copyright (c) 2006 Cluster File Systems, Inc.
- * Author: Yury Umanets <umka@clusterfs.com>
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
- * This file is part of the Lustre file system, http://www.lustre.org
- * Lustre is a trademark of Cluster File Systems, Inc.
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
*
- * You may have signed or agreed to another license before downloading
- * this software. If so, you are bound by the terms and conditions
- * of that agreement, and the following does not apply to you. See the
- * LICENSE file included with this distribution for more information.
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
*
- * If you did not agree to a different license, then this copy of Lustre
- * is open source software; you can redistribute it and/or modify it
- * under the terms of version 2 of the GNU General Public License as
- * published by the Free Software Foundation.
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
*
- * In either case, Lustre is distributed in the hope that it will be
- * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
- * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * license text for more details.
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright 2008 Sun Microsystems, Inc. All rights reserved
+ * Use is subject to license terms.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
+ *
+ * lustre/fid/fid_handler.c
+ *
+ * Lustre Sequence Manager
+ *
+ * Author: Yury Umanets <umka@clusterfs.com>
*/
#ifndef EXPORT_SYMTAB
* Ask client for new range, assign that range to ->seq_space and write
* seq state to backing store should be atomic.
*/
- down(&seq->lss_sem);
+ cfs_down(&seq->lss_sem);
if (cli == NULL) {
CDEBUG(D_INFO, "%s: Detached sequence client %s\n",
seq->lss_name, cli->lcs_name);
seq->lss_cli = cli;
+ cli->lcs_space.lsr_mdt = seq->lss_site->ms_node_id;
EXIT;
out_up:
- up(&seq->lss_sem);
+ cfs_up(&seq->lss_sem);
return rc;
}
EXPORT_SYMBOL(seq_server_set_cli);
-/*
+/**
* On controller node, allocate new super sequence for regular sequence server.
+ * As this super sequence controller, this node suppose to maintain fld
+ * and update index.
+ * \a out range always has currect mds node number of requester.
*/
+
static int __seq_server_alloc_super(struct lu_server_seq *seq,
- struct lu_range *in,
- struct lu_range *out,
+ struct lu_seq_range *in,
+ struct lu_seq_range *out,
const struct lu_env *env)
{
- struct lu_range *space = &seq->lss_space;
- int rc;
+ struct lu_seq_range *space = &seq->lss_space;
+ struct thandle *th;
+ __u64 mdt = out->lsr_mdt;
+ int rc, credit;
ENTRY;
LASSERT(range_is_sane(space));
CDEBUG(D_INFO, "%s: Input seq range: "
DRANGE"\n", seq->lss_name, PRANGE(in));
- if (in->lr_end > space->lr_start)
- space->lr_start = in->lr_end;
+ if (in->lsr_end > space->lsr_start)
+ space->lsr_start = in->lsr_end;
*out = *in;
CDEBUG(D_INFO, "%s: Recovered space: "DRANGE"\n",
"Only "LPU64" sequences left\n", seq->lss_name,
range_space(space));
*out = *space;
- space->lr_start = space->lr_end;
+ space->lsr_start = space->lsr_end;
} else if (range_is_exhausted(space)) {
CERROR("%s: Sequences space is exhausted\n",
seq->lss_name);
range_alloc(out, space, seq->lss_width);
}
}
+ out->lsr_mdt = mdt;
- rc = seq_store_write(seq, env);
+ credit = SEQ_TXN_STORE_CREDITS + FLD_TXN_INDEX_INSERT_CREDITS;
+
+ th = seq_store_trans_start(seq, env, credit);
+ if (IS_ERR(th))
+ RETURN(PTR_ERR(th));
+
+ rc = seq_store_write(seq, env, th);
if (rc) {
CERROR("%s: Can't write space data, rc %d\n",
seq->lss_name, rc);
- RETURN(rc);
+ goto out;
+ }
+
+ rc = fld_server_create(seq->lss_site->ms_server_fld,
+ env, out, th);
+ if (rc) {
+ CERROR("%s: Can't Update fld database, rc %d\n",
+ seq->lss_name, rc);
}
- CDEBUG(D_INFO, "%s: Allocated super-sequence "
- DRANGE"\n", seq->lss_name, PRANGE(out));
+out:
+ seq_store_trans_stop(seq, env, th);
+
+ CDEBUG(D_INFO, "%s: super-sequence allocation rc = %d "
+ DRANGE"\n", seq->lss_name, rc, PRANGE(out));
RETURN(rc);
}
int seq_server_alloc_super(struct lu_server_seq *seq,
- struct lu_range *in,
- struct lu_range *out,
+ struct lu_seq_range *in,
+ struct lu_seq_range *out,
const struct lu_env *env)
{
int rc;
ENTRY;
- down(&seq->lss_sem);
+ cfs_down(&seq->lss_sem);
rc = __seq_server_alloc_super(seq, in, out, env);
- up(&seq->lss_sem);
+ cfs_up(&seq->lss_sem);
RETURN(rc);
}
static int __seq_server_alloc_meta(struct lu_server_seq *seq,
- struct lu_range *in,
- struct lu_range *out,
+ struct lu_seq_range *in,
+ struct lu_seq_range *out,
const struct lu_env *env)
{
- struct lu_range *space = &seq->lss_space;
+ struct lu_seq_range *space = &seq->lss_space;
+ struct thandle *th;
int rc = 0;
+
ENTRY;
LASSERT(range_is_sane(space));
CDEBUG(D_INFO, "%s: Input seq range: "
DRANGE"\n", seq->lss_name, PRANGE(in));
- if (range_is_exhausted(space)) {
+ if (in->lsr_end <= space->lsr_start) {
/*
- * Server cannot send empty range to client, this is why
- * we check here that range from client is "newer" than
- * exhausted super.
+ * Client is replaying a fairly old range, server
+ * don't need to do any allocation.
*/
- LASSERT(in->lr_end > space->lr_start);
-
+ } else if (range_is_exhausted(space)) {
/*
* Start is set to end of last allocated, because it
* *is* already allocated so we take that into account
* and do not use for other allocations.
*/
- space->lr_start = in->lr_end;
+ space->lsr_start = in->lsr_end;
/*
- * End is set to in->lr_start + super sequence
- * allocation unit. That is because in->lr_start is
+ * End is set to in->lsr_start + super sequence
+ * allocation unit. That is because in->lsr_start is
* first seq in new allocated range from controller
* before failure.
*/
- space->lr_end = in->lr_start + LUSTRE_SEQ_SUPER_WIDTH;
+ space->lsr_end = in->lsr_start + LUSTRE_SEQ_SUPER_WIDTH;
if (!seq->lss_cli) {
CERROR("%s: No sequence controller "
* obtained range from it was @space.
*/
rc = seq_client_replay_super(seq->lss_cli, space, env);
+
if (rc) {
CERROR("%s: Can't replay super-sequence, "
"rc %d\n", seq->lss_name, rc);
* Update super start by end from client's range. Super
* end should not be changed if range was not exhausted.
*/
- if (in->lr_end > space->lr_start)
- space->lr_start = in->lr_end;
+ space->lsr_start = in->lsr_end;
+ }
+
+ /* sending replay_super to update fld as only super sequence
+ * server can update fld.
+ * we are sending meta sequence to fld rather than super
+ * sequence, but fld server can handle range merging. */
+
+ in->lsr_mdt = space->lsr_mdt;
+ rc = seq_client_replay_super(seq->lss_cli, in, env);
+
+ if (rc) {
+ CERROR("%s: Can't replay super-sequence, "
+ "rc %d\n", seq->lss_name, rc);
+ RETURN(rc);
}
*out = *in;
range_alloc(out, space, seq->lss_width);
}
- rc = seq_store_write(seq, env);
+ th = seq_store_trans_start(seq, env, SEQ_TXN_STORE_CREDITS);
+ if (IS_ERR(th))
+ RETURN(PTR_ERR(th));
+
+ rc = seq_store_write(seq, env, th);
if (rc) {
CERROR("%s: Can't write space data, rc %d\n",
seq->lss_name, rc);
DRANGE"\n", seq->lss_name, PRANGE(out));
}
+ seq_store_trans_stop(seq, env, th);
RETURN(rc);
}
int seq_server_alloc_meta(struct lu_server_seq *seq,
- struct lu_range *in,
- struct lu_range *out,
+ struct lu_seq_range *in,
+ struct lu_seq_range *out,
const struct lu_env *env)
{
int rc;
ENTRY;
- down(&seq->lss_sem);
+ cfs_down(&seq->lss_sem);
rc = __seq_server_alloc_meta(seq, in, out, env);
- up(&seq->lss_sem);
+ cfs_up(&seq->lss_sem);
RETURN(rc);
}
static int seq_server_handle(struct lu_site *site,
const struct lu_env *env,
- __u32 opc, struct lu_range *in,
- struct lu_range *out)
+ __u32 opc, struct lu_seq_range *in,
+ struct lu_seq_range *out)
{
int rc;
+ struct md_site *mite;
ENTRY;
+ mite = lu_site2md(site);
switch (opc) {
case SEQ_ALLOC_META:
- if (!site->ls_server_seq) {
+ if (!mite->ms_server_seq) {
CERROR("Sequence server is not "
"initialized\n");
RETURN(-EINVAL);
}
- rc = seq_server_alloc_meta(site->ls_server_seq,
+ rc = seq_server_alloc_meta(mite->ms_server_seq,
in, out, env);
break;
case SEQ_ALLOC_SUPER:
- if (!site->ls_control_seq) {
+ if (!mite->ms_control_seq) {
CERROR("Sequence controller is not "
"initialized\n");
RETURN(-EINVAL);
}
- rc = seq_server_alloc_super(site->ls_control_seq,
+ rc = seq_server_alloc_super(mite->ms_control_seq,
in, out, env);
break;
default:
const struct lu_env *env,
struct seq_thread_info *info)
{
- struct lu_range *out, *in = NULL;
+ struct lu_seq_range *out, *in = NULL, *tmp;
struct lu_site *site;
int rc = -EPROTO;
__u32 *opc;
site = req->rq_export->exp_obd->obd_lu_dev->ld_site;
LASSERT(site != NULL);
- rc = req_capsule_pack(&info->sti_pill);
+ rc = req_capsule_server_pack(info->sti_pill);
if (rc)
RETURN(err_serious(rc));
- opc = req_capsule_client_get(&info->sti_pill,
- &RMF_SEQ_OPC);
+ opc = req_capsule_client_get(info->sti_pill, &RMF_SEQ_OPC);
if (opc != NULL) {
- out = req_capsule_server_get(&info->sti_pill,
- &RMF_SEQ_RANGE);
+ out = req_capsule_server_get(info->sti_pill, &RMF_SEQ_RANGE);
if (out == NULL)
RETURN(err_serious(-EPROTO));
+ tmp = req_capsule_client_get(info->sti_pill, &RMF_SEQ_RANGE);
+
if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
- in = req_capsule_client_get(&info->sti_pill,
- &RMF_SEQ_RANGE);
+ in = tmp;
- LASSERT(!range_is_zero(in) && range_is_sane(in));
+ if (range_is_zero(in) || !range_is_sane(in)) {
+ CERROR("Replayed seq range is invalid: "
+ DRANGE"\n", PRANGE(in));
+ RETURN(err_serious(-EINVAL));
+ }
}
+ /* seq client passed mdt id, we need to pass that using out
+ * range parameter */
+ out->lsr_mdt = tmp->lsr_mdt;
rc = seq_server_handle(site, env, *opc, in, out);
} else
rc = err_serious(-EPROTO);
static void seq_thread_info_init(struct ptlrpc_request *req,
struct seq_thread_info *info)
{
- int i;
-
- /* Mark rep buffer as req-layout stuff expects */
- for (i = 0; i < ARRAY_SIZE(info->sti_rep_buf_size); i++)
- info->sti_rep_buf_size[i] = -1;
-
+ info->sti_pill = &req->rq_pill;
/* Init request capsule */
- req_capsule_init(&info->sti_pill, req, RCL_SERVER,
- info->sti_rep_buf_size);
-
- req_capsule_set(&info->sti_pill, &RQF_SEQ_QUERY);
+ req_capsule_init(info->sti_pill, req, RCL_SERVER);
+ req_capsule_set(info->sti_pill, &RQF_SEQ_QUERY);
}
static void seq_thread_info_fini(struct seq_thread_info *info)
{
- req_capsule_fini(&info->sti_pill);
+ req_capsule_fini(info->sti_pill);
}
static int seq_handle(struct ptlrpc_request *req)
*/
int seq_query(struct com_thread_info *info)
{
- return seq_handle(info->cti_pill.rc_req);
+ return seq_handle(info->cti_pill->rc_req);
}
EXPORT_SYMBOL(seq_query);
struct dt_device *dev,
const char *prefix,
enum lu_mgr_type type,
+ struct md_site *ms,
const struct lu_env *env)
{
+ struct thandle *th;
int rc, is_srv = (type == LUSTRE_SEQ_SERVER);
ENTRY;
seq->lss_cli = NULL;
seq->lss_type = type;
- range_zero(&seq->lss_space);
- sema_init(&seq->lss_sem, 1);
+ seq->lss_site = ms;
+ range_init(&seq->lss_space);
+ cfs_sema_init(&seq->lss_sem, 1);
seq->lss_width = is_srv ?
LUSTRE_SEQ_META_WIDTH : LUSTRE_SEQ_SUPER_WIDTH;
rc = seq_store_init(seq, env, dev);
if (rc)
GOTO(out, rc);
-
/* Request backing store for saved sequence info. */
rc = seq_store_read(seq, env);
if (rc == -ENODATA) {
LUSTRE_SEQ_ZERO_RANGE:
LUSTRE_SEQ_SPACE_RANGE;
+ seq->lss_space.lsr_mdt = ms->ms_node_id;
CDEBUG(D_INFO, "%s: No data found "
"on store. Initialize space\n",
seq->lss_name);
+ th = seq_store_trans_start(seq, env, SEQ_TXN_STORE_CREDITS);
+ if (IS_ERR(th))
+ RETURN(PTR_ERR(th));
+
/* Save default controller value to store. */
- rc = seq_store_write(seq, env);
+ rc = seq_store_write(seq, env, th);
if (rc) {
CERROR("%s: Can't write space data, "
"rc %d\n", seq->lss_name, rc);
}
+ seq_store_trans_stop(seq, env, th);
} else if (rc) {
CERROR("%s: Can't read space data, rc %d\n",
seq->lss_name, rc);
cfs_proc_dir_entry_t *seq_type_proc_dir = NULL;
+static struct lu_local_obj_desc llod_seq_srv = {
+ .llod_name = LUSTRE_SEQ_SRV_NAME,
+ .llod_oid = FID_SEQ_SRV_OID,
+ .llod_is_index = 0,
+};
+
+static struct lu_local_obj_desc llod_seq_ctl = {
+ .llod_name = LUSTRE_SEQ_CTL_NAME,
+ .llod_oid = FID_SEQ_CTL_OID,
+ .llod_is_index = 0,
+};
+
static int __init fid_mod_init(void)
{
seq_type_proc_dir = lprocfs_register(LUSTRE_SEQ_NAME,
if (IS_ERR(seq_type_proc_dir))
return PTR_ERR(seq_type_proc_dir);
+ llo_local_obj_register(&llod_seq_srv);
+ llo_local_obj_register(&llod_seq_ctl);
+
LU_CONTEXT_KEY_INIT(&seq_thread_key);
lu_context_key_register(&seq_thread_key);
return 0;
static void __exit fid_mod_exit(void)
{
+ llo_local_obj_unregister(&llod_seq_srv);
+ llo_local_obj_unregister(&llod_seq_ctl);
+
lu_context_key_degister(&seq_thread_key);
if (seq_type_proc_dir != NULL && !IS_ERR(seq_type_proc_dir)) {
lprocfs_remove(&seq_type_proc_dir);
}
}
-MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
+MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre FID Module");
MODULE_LICENSE("GPL");