* Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2011, Whamcloud, Inc.
+ * Copyright (c) 2011, 2012, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
#include <lustre_fid.h>
#include "fid_internal.h"
+int client_fid_init(struct obd_export *exp, enum lu_cli_type type)
+{
+ struct client_obd *cli = &exp->exp_obd->u.cli;
+ char *prefix;
+ int rc;
+ ENTRY;
+
+ OBD_ALLOC_PTR(cli->cl_seq);
+ if (cli->cl_seq == NULL)
+ RETURN(-ENOMEM);
+
+ OBD_ALLOC(prefix, MAX_OBD_NAME + 5);
+ if (prefix == NULL)
+ GOTO(out_free_seq, rc = -ENOMEM);
+
+ snprintf(prefix, MAX_OBD_NAME + 5, "cli-%s",
+ exp->exp_obd->obd_name);
+
+ /* Init client side sequence-manager */
+ rc = seq_client_init(cli->cl_seq, exp, type, prefix, NULL);
+ OBD_FREE(prefix, MAX_OBD_NAME + 5);
+ if (rc)
+ GOTO(out_free_seq, rc);
+
+ RETURN(rc);
+out_free_seq:
+ OBD_FREE_PTR(cli->cl_seq);
+ cli->cl_seq = NULL;
+ return rc;
+}
+EXPORT_SYMBOL(client_fid_init);
+
+int client_fid_fini(struct obd_export *exp)
+{
+ struct client_obd *cli = &exp->exp_obd->u.cli;
+ ENTRY;
+
+ if (cli->cl_seq != NULL) {
+ seq_client_fini(cli->cl_seq);
+ OBD_FREE_PTR(cli->cl_seq);
+ cli->cl_seq = NULL;
+ }
+
+ RETURN(0);
+}
+EXPORT_SYMBOL(client_fid_fini);
+
#ifdef __KERNEL__
+static void seq_server_proc_fini(struct lu_server_seq *seq);
+
/* Assigns client to sequence controller node. */
int seq_server_set_cli(struct lu_server_seq *seq,
struct lu_client_seq *cli,
* Ask client for new range, assign that range to ->seq_space and write
* seq state to backing store should be atomic.
*/
- cfs_mutex_lock(&seq->lss_mutex);
+ mutex_lock(&seq->lss_mutex);
if (cli == NULL) {
CDEBUG(D_INFO, "%s: Detached sequence client %s\n",
GOTO(out_up, rc = 0);
}
- if (seq->lss_cli != NULL) {
- CERROR("%s: Sequence controller is already "
- "assigned\n", seq->lss_name);
- GOTO(out_up, rc = -EINVAL);
- }
+ if (seq->lss_cli != NULL) {
+ CDEBUG(D_HA, "%s: Sequence controller is already "
+ "assigned\n", seq->lss_name);
+ GOTO(out_up, rc = -EEXIST);
+ }
CDEBUG(D_INFO, "%s: Attached sequence controller %s\n",
seq->lss_name, cli->lcs_name);
- seq->lss_cli = cli;
- cli->lcs_space.lsr_index = seq->lss_site->ms_node_id;
- EXIT;
+ seq->lss_cli = cli;
+ cli->lcs_space.lsr_index = seq->lss_site->ss_node_id;
+ EXIT;
out_up:
- cfs_mutex_unlock(&seq->lss_mutex);
+ mutex_unlock(&seq->lss_mutex);
return rc;
}
EXPORT_SYMBOL(seq_server_set_cli);
int rc;
ENTRY;
- cfs_mutex_lock(&seq->lss_mutex);
+ mutex_lock(&seq->lss_mutex);
rc = __seq_server_alloc_super(seq, out, env);
- cfs_mutex_unlock(&seq->lss_mutex);
+ mutex_unlock(&seq->lss_mutex);
RETURN(rc);
}
if (range_is_exhausted(loset)) {
/* reached high water mark. */
- struct lu_device *dev = seq->lss_site->ms_lu->ls_top_dev;
+ struct lu_device *dev = seq->lss_site->ss_lu->ls_top_dev;
int obd_num_clients = dev->ld_obd->obd_num_exports;
__u64 set_sz;
int rc;
ENTRY;
- cfs_mutex_lock(&seq->lss_mutex);
+ mutex_lock(&seq->lss_mutex);
rc = __seq_server_alloc_meta(seq, out, env);
- cfs_mutex_unlock(&seq->lss_mutex);
+ mutex_unlock(&seq->lss_mutex);
RETURN(rc);
}
const struct lu_env *env,
__u32 opc, struct lu_seq_range *out)
{
- int rc;
- struct md_site *mite;
- ENTRY;
+ int rc;
+ struct seq_server_site *ss_site;
+ ENTRY;
- mite = lu_site2md(site);
- switch (opc) {
- case SEQ_ALLOC_META:
- if (!mite->ms_server_seq) {
- CERROR("Sequence server is not "
- "initialized\n");
- RETURN(-EINVAL);
- }
- rc = seq_server_alloc_meta(mite->ms_server_seq, out, env);
- break;
- case SEQ_ALLOC_SUPER:
- if (!mite->ms_control_seq) {
- CERROR("Sequence controller is not "
- "initialized\n");
- RETURN(-EINVAL);
- }
- rc = seq_server_alloc_super(mite->ms_control_seq, out, env);
- break;
- default:
- rc = -EINVAL;
- break;
- }
+ ss_site = lu_site2seq(site);
- RETURN(rc);
+ switch (opc) {
+ case SEQ_ALLOC_META:
+ if (!ss_site->ss_server_seq) {
+ CERROR("Sequence server is not "
+ "initialized\n");
+ RETURN(-EINVAL);
+ }
+ rc = seq_server_alloc_meta(ss_site->ss_server_seq, out, env);
+ break;
+ case SEQ_ALLOC_SUPER:
+ if (!ss_site->ss_control_seq) {
+ CERROR("Sequence controller is not "
+ "initialized\n");
+ RETURN(-EINVAL);
+ }
+ rc = seq_server_alloc_super(ss_site->ss_control_seq, out, env);
+ break;
+ default:
+ rc = -EINVAL;
+ break;
+ }
+
+ RETURN(rc);
}
static int seq_req_handle(struct ptlrpc_request *req,
LU_KEY_INIT_FINI(seq, struct seq_thread_info);
/* context key: seq_thread_key */
-LU_CONTEXT_KEY_DEFINE(seq, LCT_MD_THREAD);
+LU_CONTEXT_KEY_DEFINE(seq, LCT_MD_THREAD | LCT_DT_THREAD);
static void seq_thread_info_init(struct ptlrpc_request *req,
struct seq_thread_info *info)
req_capsule_fini(info->sti_pill);
}
-static int seq_handle(struct ptlrpc_request *req)
+int seq_handle(struct ptlrpc_request *req)
{
const struct lu_env *env;
struct seq_thread_info *info;
return rc;
}
+EXPORT_SYMBOL(seq_handle);
/*
* Entry point for handling FLD RPCs called from MDT.
}
EXPORT_SYMBOL(seq_query);
-static void seq_server_proc_fini(struct lu_server_seq *seq);
#ifdef LPROCFS
static int seq_server_proc_init(struct lu_server_seq *seq)
int seq_server_init(struct lu_server_seq *seq,
- struct dt_device *dev,
- const char *prefix,
- enum lu_mgr_type type,
- struct md_site *ms,
- const struct lu_env *env)
+ struct dt_device *dev,
+ const char *prefix,
+ enum lu_mgr_type type,
+ struct seq_server_site *ss,
+ const struct lu_env *env)
{
int rc, is_srv = (type == LUSTRE_SEQ_SERVER);
ENTRY;
LASSERT(dev != NULL);
LASSERT(prefix != NULL);
+ LASSERT(ss != NULL);
+ LASSERT(ss->ss_lu != NULL);
- seq->lss_cli = NULL;
- seq->lss_type = type;
- seq->lss_site = ms;
- range_init(&seq->lss_space);
+ seq->lss_cli = NULL;
+ seq->lss_type = type;
+ seq->lss_site = ss;
+ range_init(&seq->lss_space);
range_init(&seq->lss_lowater_set);
range_init(&seq->lss_hiwater_set);
seq->lss_set_width = LUSTRE_SEQ_BATCH_WIDTH;
- cfs_mutex_init(&seq->lss_mutex);
+ mutex_init(&seq->lss_mutex);
seq->lss_width = is_srv ?
LUSTRE_SEQ_META_WIDTH : LUSTRE_SEQ_SUPER_WIDTH;
LUSTRE_SEQ_ZERO_RANGE:
LUSTRE_SEQ_SPACE_RANGE;
- seq->lss_space.lsr_index = ms->ms_node_id;
+ LASSERT(ss != NULL);
+ seq->lss_space.lsr_index = ss->ss_node_id;
LCONSOLE_INFO("%s: No data found "
"on store. Initialize space\n",
seq->lss_name);
}
EXPORT_SYMBOL(seq_server_fini);
+int seq_site_fini(const struct lu_env *env, struct seq_server_site *ss)
+{
+ if (ss == NULL)
+ RETURN(0);
+
+ if (ss->ss_server_seq) {
+ seq_server_fini(ss->ss_server_seq, env);
+ OBD_FREE_PTR(ss->ss_server_seq);
+ ss->ss_server_seq = NULL;
+ }
+
+ if (ss->ss_control_seq) {
+ seq_server_fini(ss->ss_control_seq, env);
+ OBD_FREE_PTR(ss->ss_control_seq);
+ ss->ss_control_seq = NULL;
+ }
+
+ if (ss->ss_client_seq) {
+ seq_client_fini(ss->ss_client_seq);
+ OBD_FREE_PTR(ss->ss_client_seq);
+ ss->ss_client_seq = NULL;
+ }
+
+ RETURN(0);
+}
+EXPORT_SYMBOL(seq_site_fini);
+
cfs_proc_dir_entry_t *seq_type_proc_dir = NULL;
static int __init fid_mod_init(void)