GOTO(out_up, rc = 0);
}
- if (seq->lss_cli != NULL) {
- CERROR("%s: Sequence controller is already "
- "assigned\n", seq->lss_name);
- GOTO(out_up, rc = -EINVAL);
- }
+ if (seq->lss_cli != NULL) {
+ CDEBUG(D_HA, "%s: Sequence controller is already "
+ "assigned\n", seq->lss_name);
+ GOTO(out_up, rc = -EEXIST);
+ }
CDEBUG(D_INFO, "%s: Attached sequence controller %s\n",
seq->lss_name, cli->lcs_name);
LU_KEY_INIT_FINI(seq, struct seq_thread_info);
/* context key: seq_thread_key */
-LU_CONTEXT_KEY_DEFINE(seq, LCT_MD_THREAD);
+LU_CONTEXT_KEY_DEFINE(seq, LCT_MD_THREAD | LCT_DT_THREAD);
static void seq_thread_info_init(struct ptlrpc_request *req,
struct seq_thread_info *info)
}
EXPORT_SYMBOL(seq_server_fini);
+int seq_site_fini(const struct lu_env *env, struct seq_server_site *ss)
+{
+ if (ss == NULL)
+ RETURN(0);
+
+ if (ss->ss_server_seq) {
+ seq_server_fini(ss->ss_server_seq, env);
+ OBD_FREE_PTR(ss->ss_server_seq);
+ ss->ss_server_seq = NULL;
+ }
+
+ if (ss->ss_control_seq) {
+ seq_server_fini(ss->ss_control_seq, env);
+ OBD_FREE_PTR(ss->ss_control_seq);
+ ss->ss_control_seq = NULL;
+ }
+
+ if (ss->ss_client_seq) {
+ seq_client_fini(ss->ss_client_seq);
+ OBD_FREE_PTR(ss->ss_client_seq);
+ ss->ss_client_seq = NULL;
+ }
+
+ RETURN(0);
+}
+EXPORT_SYMBOL(seq_site_fini);
+
cfs_proc_dir_entry_t *seq_type_proc_dir = NULL;
static int __init fid_mod_init(void)
CDEBUG(D_INFO, "%s: allocate sequence "
"[0x%16.16"LPF64"x]\n", seq->lcs_name, *seqnr);
- /*Since the caller require the whole seq,
- *so marked this seq to be used*/
- seq->lcs_fid.f_oid = LUSTRE_SEQ_MAX_WIDTH;
- seq->lcs_fid.f_seq = *seqnr;
- seq->lcs_fid.f_ver = 0;
+ /*Since the caller require the whole seq,
+ *so marked this seq to be used*/
+ seq->lcs_fid.f_oid = LUSTRE_METADATA_SEQ_MAX_WIDTH;
+ seq->lcs_fid.f_seq = *seqnr;
+ seq->lcs_fid.f_ver = 0;
/*
* Inform caller that sequence switch is performed to allow it
const char *prefix,
struct lu_server_seq *srv)
{
- int rc;
- ENTRY;
+ int rc;
+ ENTRY;
- LASSERT(seq != NULL);
- LASSERT(prefix != NULL);
+ LASSERT(seq != NULL);
+ LASSERT(prefix != NULL);
+
+ seq->lcs_srv = srv;
+ seq->lcs_type = type;
- seq->lcs_exp = exp;
- seq->lcs_srv = srv;
- seq->lcs_type = type;
mutex_init(&seq->lcs_mutex);
- seq->lcs_width = LUSTRE_SEQ_MAX_WIDTH;
- cfs_waitq_init(&seq->lcs_waitq);
+ if (type == LUSTRE_SEQ_METADATA)
+ seq->lcs_width = LUSTRE_METADATA_SEQ_MAX_WIDTH;
+ else
+ seq->lcs_width = LUSTRE_DATA_SEQ_MAX_WIDTH;
- /* Make sure that things are clear before work is started. */
- seq_client_flush(seq);
+ cfs_waitq_init(&seq->lcs_waitq);
+ /* Make sure that things are clear before work is started. */
+ seq_client_flush(seq);
- if (exp == NULL) {
- LASSERT(seq->lcs_srv != NULL);
- } else {
- LASSERT(seq->lcs_exp != NULL);
- seq->lcs_exp = class_export_get(seq->lcs_exp);
- }
+ if (exp != NULL)
+ seq->lcs_exp = class_export_get(exp);
+ else if (type == LUSTRE_SEQ_METADATA)
+ LASSERT(seq->lcs_srv != NULL);
- snprintf(seq->lcs_name, sizeof(seq->lcs_name),
- "cli-%s", prefix);
+ snprintf(seq->lcs_name, sizeof(seq->lcs_name),
+ "cli-%s", prefix);
- rc = seq_client_proc_init(seq);
- if (rc)
- seq_client_fini(seq);
- RETURN(rc);
+ rc = seq_client_proc_init(seq);
+ if (rc)
+ seq_client_fini(seq);
+ RETURN(rc);
}
EXPORT_SYMBOL(seq_client_init);
RETURN(rc);
}
- if (val <= LUSTRE_SEQ_MAX_WIDTH && val > 0) {
- seq->lcs_width = val;
+ if (val <= LUSTRE_METADATA_SEQ_MAX_WIDTH && val > 0) {
+ seq->lcs_width = val;
- if (rc == 0) {
- CDEBUG(D_INFO, "%s: Sequence size: "LPU64"\n",
- seq->lcs_name, seq->lcs_width);
- }
- }
+ if (rc == 0) {
+ CDEBUG(D_INFO, "%s: Sequence size: "LPU64"\n",
+ seq->lcs_name, seq->lcs_width);
+ }
+ }
mutex_unlock(&seq->lcs_mutex);
extern const struct lu_fid LU_DOT_LUSTRE_FID;
enum {
- /*
- * This is how may FIDs may be allocated in one sequence(128k)
- */
- LUSTRE_SEQ_MAX_WIDTH = 0x0000000000020000ULL,
+ /*
+ * This is how may metadata FIDs may be allocated in one sequence(128k)
+ */
+ LUSTRE_METADATA_SEQ_MAX_WIDTH = 0x0000000000020000ULL,
- /*
- * How many sequences to allocate to a client at once.
- */
- LUSTRE_SEQ_META_WIDTH = 0x0000000000000001ULL,
+ /*
+ * This is how many data FIDs could be allocated in one sequence(4B - 1)
+ */
+ LUSTRE_DATA_SEQ_MAX_WIDTH = 0x00000000FFFFFFFFULL,
- /*
- * seq allocation pool size.
- */
- LUSTRE_SEQ_BATCH_WIDTH = LUSTRE_SEQ_META_WIDTH * 1000,
+ /*
+ * How many sequences to allocate to a client at once.
+ */
+ LUSTRE_SEQ_META_WIDTH = 0x0000000000000001ULL,
- /*
- * This is how many sequences may be in one super-sequence allocated to
- * MDTs.
- */
- LUSTRE_SEQ_SUPER_WIDTH = ((1ULL << 30ULL) * LUSTRE_SEQ_META_WIDTH)
+ /*
+ * seq allocation pool size.
+ */
+ LUSTRE_SEQ_BATCH_WIDTH = LUSTRE_SEQ_META_WIDTH * 1000,
+
+ /*
+ * This is how many sequences may be in one super-sequence allocated to
+ * MDTs.
+ */
+ LUSTRE_SEQ_SUPER_WIDTH = ((1ULL << 30ULL) * LUSTRE_SEQ_META_WIDTH)
};
enum {
int seq_client_get_seq(const struct lu_env *env, struct lu_client_seq *seq,
seqno_t *seqnr);
+int seq_site_fini(const struct lu_env *env, struct seq_server_site *ss);
/* Fids common stuff */
int fid_is_local(const struct lu_env *env,
struct lu_site *site, const struct lu_fid *fid);
static int mdt_seq_fini(const struct lu_env *env,
struct mdt_device *m)
{
- struct seq_server_site *ss = mdt_seq_site(m);
- ENTRY;
-
- if (ss == NULL)
- RETURN(0);
-
- if (ss->ss_server_seq) {
- seq_server_fini(ss->ss_server_seq, env);
- OBD_FREE_PTR(ss->ss_server_seq);
- ss->ss_server_seq = NULL;
- }
-
- if (ss->ss_control_seq) {
- seq_server_fini(ss->ss_control_seq, env);
- OBD_FREE_PTR(ss->ss_control_seq);
- ss->ss_control_seq = NULL;
- }
-
- if (ss->ss_client_seq) {
- seq_client_fini(ss->ss_client_seq);
- OBD_FREE_PTR(ss->ss_client_seq);
- ss->ss_client_seq = NULL;
- }
-
- RETURN(0);
+ return seq_site_fini(env, mdt_seq_site(m));
}
static int mdt_seq_init(const struct lu_env *env,
if (cfs_copy_to_user(data->ioc_pbuf1, &seq, data->ioc_plen1))
return -EFAULT;
- max_count = LUSTRE_SEQ_MAX_WIDTH;
- if (cfs_copy_to_user(data->ioc_pbuf2, &max_count,
- data->ioc_plen2))
- return -EFAULT;
- GOTO(out, rc);
+ max_count = LUSTRE_METADATA_SEQ_MAX_WIDTH;
+ if (cfs_copy_to_user(data->ioc_pbuf2, &max_count,
+ data->ioc_plen2))
+ return -EFAULT;
+ GOTO(out, rc);
}
case OBD_IOC_DESTROY:
if (!cfs_capable(CFS_CAP_SYS_ADMIN))
#include <obd_class.h>
#include <lustre_param.h>
+#include <lustre_fid.h>
#include "ofd_internal.h"
extern int ost_handle(struct ptlrpc_request *req);
+int ofd_fid_fini(const struct lu_env *env, struct ofd_device *ofd)
+{
+ return seq_site_fini(env, &ofd->ofd_seq_site);
+}
+
+int ofd_fid_init(const struct lu_env *env, struct ofd_device *ofd)
+{
+ struct seq_server_site *ss = &ofd->ofd_seq_site;
+ struct lu_device *lu = &ofd->ofd_dt_dev.dd_lu_dev;
+ char *obd_name = ofd_name(ofd);
+ char *name = NULL;
+ int rc = 0;
+
+ ss = &ofd->ofd_seq_site;
+ lu->ld_site->ld_seq_site = ss;
+ ss->ss_lu = lu->ld_site;
+ ss->ss_node_id = ofd->ofd_lut.lut_lsd.lsd_osd_index;
+
+ OBD_ALLOC_PTR(ss->ss_server_seq);
+ if (ss->ss_server_seq == NULL)
+ GOTO(out_free, rc = -ENOMEM);
+
+ OBD_ALLOC(name, strlen(obd_name) + 10);
+ if (!name) {
+ OBD_FREE_PTR(ss->ss_server_seq);
+ ss->ss_server_seq = NULL;
+ GOTO(out_free, rc = -ENOMEM);
+ }
+
+ rc = seq_server_init(ss->ss_server_seq, ofd->ofd_osd, obd_name,
+ LUSTRE_SEQ_SERVER, ss, env);
+ if (rc) {
+ CERROR("%s : seq server init error %d\n", obd_name, rc);
+ GOTO(out_free, rc);
+ }
+ ss->ss_server_seq->lss_space.lsr_index = ss->ss_node_id;
+
+ OBD_ALLOC_PTR(ss->ss_client_seq);
+ if (ss->ss_client_seq == NULL)
+ GOTO(out_free, -ENOMEM);
+
+ snprintf(name, strlen(obd_name) + 6, "%p-super", obd_name);
+ rc = seq_client_init(ss->ss_client_seq, NULL, LUSTRE_SEQ_DATA,
+ name, NULL);
+ if (rc) {
+ CERROR("%s : seq client init error %d\n", obd_name, rc);
+ GOTO(out_free, rc);
+ }
+ OBD_FREE(name, strlen(obd_name) + 10);
+ name = NULL;
+
+ rc = seq_server_set_cli(ss->ss_server_seq, ss->ss_client_seq, env);
+
+out_free:
+ if (rc) {
+ if (ss->ss_server_seq) {
+ seq_server_fini(ss->ss_server_seq, env);
+ OBD_FREE_PTR(ss->ss_server_seq);
+ ss->ss_server_seq = NULL;
+ }
+
+ if (ss->ss_client_seq) {
+ seq_client_fini(ss->ss_client_seq);
+ OBD_FREE_PTR(ss->ss_client_seq);
+ ss->ss_client_seq = NULL;
+ }
+
+ if (name) {
+ OBD_FREE(name, strlen(obd_name) + 10);
+ name = NULL;
+ }
+ }
+
+ return rc;
+}
+
static int ofd_init0(const struct lu_env *env, struct ofd_device *m,
struct lu_device_type *ldt, struct lustre_cfg *cfg)
{
RETURN(rc);
}
+static void ofd_deregister_seq_exp(struct ofd_device *ofd)
+{
+ struct seq_server_site *ss = &ofd->ofd_seq_site;
+
+ if (ss->ss_client_seq != NULL) {
+ lustre_deregister_osp_item(&ss->ss_client_seq->lcs_exp);
+ ss->ss_client_seq->lcs_exp = NULL;
+ }
+}
+
void ofd_seqs_fini(const struct lu_env *env, struct ofd_device *ofd)
{
struct ofd_seq *oseq;
struct ofd_seq *tmp;
cfs_list_t dispose;
+ int rc;
+
+ ofd_deregister_seq_exp(ofd);
+
+ rc = ofd_fid_fini(env, ofd);
+ if (rc != 0)
+ CERROR("%s: fid fini error: rc = %d\n", ofd_name(ofd), rc);
CFS_INIT_LIST_HEAD(&dispose);
write_lock(&ofd->ofd_seq_list_lock);
return ERR_PTR(rc);
}
+static int ofd_register_seq_exp(struct ofd_device *ofd)
+{
+ struct seq_server_site *ss = &ofd->ofd_seq_site;
+ char *osp_name = NULL;
+ int rc;
+
+ OBD_ALLOC(osp_name, MAX_OBD_NAME);
+ if (osp_name == NULL)
+ GOTO(out_free, rc = -ENOMEM);
+
+ rc = tgt_name2ospname(ofd_name(ofd), osp_name);
+ if (rc != 0)
+ GOTO(out_free, rc);
+
+ rc = lustre_register_osp_item(osp_name, &ss->ss_client_seq->lcs_exp,
+ NULL, NULL);
+out_free:
+ if (osp_name != NULL)
+ OBD_FREE(osp_name, MAX_OBD_NAME);
+
+ return rc;
+}
+
/* object sequence management */
int ofd_seqs_init(const struct lu_env *env, struct ofd_device *ofd)
{
+ int rc;
+
+ rc = ofd_fid_init(env, ofd);
+ if (rc != 0) {
+ CERROR("%s: fid init error: rc = %d\n", ofd_name(ofd), rc);
+ return rc;
+ }
+
+ rc = ofd_register_seq_exp(ofd);
+ if (rc) {
+ CERROR("%s: Can't init seq exp, rc %d\n", ofd_name(ofd), rc);
+ return rc;
+ }
+
rwlock_init(&ofd->ofd_seq_list_lock);
CFS_INIT_LIST_HEAD(&ofd->ofd_seq_list);
ofd->ofd_seq_count = 0;
- return 0;
+ return rc;
}
int ofd_clients_data_init(const struct lu_env *env, struct ofd_device *ofd,
/* shall we grant space to clients not
* supporting OBD_CONNECT_GRANT_PARAM? */
ofd_grant_compat_disable:1;
+ struct seq_server_site ofd_seq_site;
};
static inline struct ofd_device *ofd_dev(struct lu_device *d)
#define ofd_fmd_drop(exp, fid) do {} while (0)
#endif
+/* ofd_dev.c */
+int ofd_fid_set_index(const struct lu_env *env, struct ofd_device *ofd,
+ int index);
+int ofd_fid_init(const struct lu_env *env, struct ofd_device *ofd);
+int ofd_fid_fini(const struct lu_env *env, struct ofd_device *ofd);
+
/* ofd_lvb.c */
extern struct ldlm_valblock_ops ofd_lvbo;
ofd_obd(ofd)->obd_name);
GOTO(out, rc = 0);
}
- /* only precreate if seq == 0 and o_id is specfied */
- if (!fid_seq_is_mdt(oa->o_seq) || oa->o_id == 0) {
+ /* only precreate if seq is 0, IDIF or normal and also o_id
+ * must be specfied */
+ if ((!fid_seq_is_mdt(oa->o_seq) &&
+ !fid_seq_is_norm(oa->o_seq) &&
+ !fid_seq_is_idif(oa->o_seq)) || oa->o_id == 0) {
diff = 1; /* shouldn't we create this right now? */
} else {
diff = oa->o_id - ofd_seq_last_oid(oseq);