}
/**
- * Allocate the whole seq to the caller, currently it would be
- * only used by echo client to access MDT
+ * Allocate the whole seq to the caller.
**/
int seq_client_get_seq(const struct lu_env *env,
struct lu_client_seq *seq, seqno_t *seqnr)
/* Since the caller require the whole seq,
* so marked this seq to be used */
- LASSERT(seq->lcs_type == LUSTRE_SEQ_METADATA);
- seq->lcs_fid.f_oid = LUSTRE_METADATA_SEQ_MAX_WIDTH;
+ if (seq->lcs_type == LUSTRE_SEQ_METADATA)
+ seq->lcs_fid.f_oid = LUSTRE_METADATA_SEQ_MAX_WIDTH;
+ else
+ seq->lcs_fid.f_oid = LUSTRE_DATA_SEQ_MAX_WIDTH;
+
seq->lcs_fid.f_seq = *seqnr;
seq->lcs_fid.f_ver = 0;
-
/*
* Inform caller that sequence switch is performed to allow it
* to setup FLD for it.
struct lu_fid *fid);
int seq_client_get_seq(const struct lu_env *env, struct lu_client_seq *seq,
seqno_t *seqnr);
-
int seq_site_fini(const struct lu_env *env, struct seq_server_site *ss);
/* Fids common stuff */
int fid_is_local(const struct lu_env *env,
RETURN(ino ? ino : fid_oid(fid));
}
+static inline int lu_fid_diff(struct lu_fid *fid1, struct lu_fid *fid2)
+{
+ LASSERTF(fid_seq(fid1) == fid_seq(fid2), "fid1:"DFID", fid2:"DFID"\n",
+ PFID(fid1), PFID(fid2));
+
+ if (fid_is_idif(fid1) && fid_is_idif(fid2))
+ return fid_idif_id(fid1->f_seq, fid1->f_oid, fid1->f_ver) -
+ fid_idif_id(fid2->f_seq, fid2->f_oid, fid2->f_ver);
+
+ return fid_oid(fid1) - fid_oid(fid2);
+}
+
#define LUSTRE_SEQ_SRV_NAME "seq_srv"
#define LUSTRE_SEQ_CTL_NAME "seq_ctl"
extern struct req_format RQF_OST_SET_GRANT_INFO;
extern struct req_format RQF_OST_GET_INFO_GENERIC;
extern struct req_format RQF_OST_GET_INFO_LAST_ID;
+extern struct req_format RQF_OST_GET_INFO_LAST_FID;
+extern struct req_format RQF_OST_SET_INFO_LAST_FID;
extern struct req_format RQF_OST_GET_INFO_FIEMAP;
/* LDLM req_format */
/* seq-mgr fields */
extern struct req_msg_field RMF_SEQ_OPC;
extern struct req_msg_field RMF_SEQ_RANGE;
+extern struct req_msg_field RMF_FID_SPACE;
/* FLD fields */
extern struct req_msg_field RMF_FLD_OPC;
extern struct req_msg_field RMF_OST_BODY;
extern struct req_msg_field RMF_OBD_IOOBJ;
extern struct req_msg_field RMF_OBD_ID;
+extern struct req_msg_field RMF_FID;
extern struct req_msg_field RMF_NIOBUF_REMOTE;
extern struct req_msg_field RMF_RCS;
extern struct req_msg_field RMF_FIEMAP_KEY;
#define KEY_INIT_RECOV "initial_recov"
#define KEY_INTERMDS "inter_mds"
#define KEY_LAST_ID "last_id"
+#define KEY_LAST_FID "last_fid"
#define KEY_LOCK_TO_STRIPE "lock_to_stripe"
#define KEY_LOVDESC "lovdesc"
#define KEY_LOV_IDX "lov_idx"
RETURN(0);
CDEBUG(D_RPCTRACE, "%s: cli %s/%p ocd_connect_flags: "LPX64
- " ocd_version: %x ocd_grant: %d ocd_index: %u\n",
+ " ocd_version: %x ocd_grant: %d ocd_index: %u"
+ " ocd_group %u\n",
exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp,
data->ocd_connect_flags, data->ocd_version,
- data->ocd_grant, data->ocd_index);
+ data->ocd_grant, data->ocd_index, data->ocd_group);
if (fed->fed_group != 0 && fed->fed_group != data->ocd_group) {
CWARN("!!! This export (nid %s) used object group %d "
} else if (KEY_IS(KEY_SYNC_LOCK_CANCEL)) {
*((__u32 *) val) = ofd->ofd_sync_lock_cancel;
*vallen = sizeof(__u32);
+ } else if (KEY_IS(KEY_LAST_FID)) {
+ struct lu_env env;
+ struct ofd_device *ofd = ofd_exp(exp);
+ struct ofd_seq *oseq;
+ struct lu_fid *last_fid = val;
+ int rc;
+
+ if (last_fid == NULL) {
+ *vallen = sizeof(struct lu_fid);
+ RETURN(0);
+ }
+
+ if (*vallen < sizeof(*last_fid))
+ RETURN(-EOVERFLOW);
+
+ rc = lu_env_init(&env, LCT_DT_THREAD);
+ if (rc != 0)
+ RETURN(rc);
+ ofd_info_init(&env, exp);
+ fid_le_to_cpu(last_fid, last_fid);
+ oseq = ofd_seq_load(&env, ofd, fid_seq(last_fid));
+ if (IS_ERR(oseq))
+ GOTO(out_fid, rc = PTR_ERR(oseq));
+
+ last_fid->f_seq = oseq->os_seq;
+ last_fid->f_oid = oseq->os_last_oid;
+ fid_cpu_to_le(last_fid, last_fid);
+
+ *vallen = sizeof(*last_fid);
+ ofd_seq_put(&env, oseq);
+out_fid:
+ lu_env_fini(&env);
} else {
CERROR("Not supported key %s\n", (char*)key);
rc = -EOPNOTSUPP;
if (osp == NULL)
return 0;
- return snprintf(page, count, LPU64"\n", osp->opd_pre_used_id + 1);
+ return snprintf(page, count, "%u\n",
+ fid_oid(&osp->opd_pre_used_fid) + 1);
}
static int osp_rd_prealloc_last_id(char *page, char **start, off_t off,
if (osp == NULL)
return 0;
- return snprintf(page, count, LPU64"\n", osp->opd_pre_last_created);
+ return snprintf(page, count, "%u\n",
+ fid_oid(&osp->opd_pre_last_created_fid));
+}
+
+static int osp_rd_prealloc_next_seq(char *page, char **start, off_t off,
+ int count, int *eof, void *data)
+{
+ struct obd_device *obd = data;
+ struct osp_device *osp = lu2osp_dev(obd->obd_lu_dev);
+
+ if (osp == NULL)
+ return 0;
+
+ return snprintf(page, count, LPX64"\n",
+ fid_seq(&osp->opd_pre_used_fid));
+}
+
+static int osp_rd_prealloc_last_seq(char *page, char **start, off_t off,
+ int count, int *eof, void *data)
+{
+ struct obd_device *obd = data;
+ struct osp_device *osp = lu2osp_dev(obd->obd_lu_dev);
+
+ if (osp == NULL)
+ return 0;
+
+ return snprintf(page, count, LPX64"\n",
+ fid_seq(&osp->opd_pre_last_created_fid));
}
static int osp_rd_prealloc_reserved(char *page, char **start, off_t off,
{ "max_create_count", osp_rd_max_create_count,
osp_wr_max_create_count, 0 },
{ "prealloc_next_id", osp_rd_prealloc_next_id, 0, 0 },
- { "prealloc_last_id", osp_rd_prealloc_last_id, 0, 0 },
+ { "prealloc_next_seq", osp_rd_prealloc_next_seq, 0, 0 },
+ { "prealloc_last_id", osp_rd_prealloc_last_id, 0, 0 },
+ { "prealloc_last_seq", osp_rd_prealloc_last_seq, 0, 0 },
{ "prealloc_reserved", osp_rd_prealloc_reserved, 0, 0 },
{ "timeouts", lprocfs_rd_timeouts, 0, 0 },
{ "import", lprocfs_rd_import, lprocfs_wr_import, 0 },
*
* Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
* Author: Mikhail Pershin <mike.pershin@intel.com>
+ * Author: Di Wang <di.wang@intel.com>
*/
#ifndef EXPORT_SYMTAB
}
}
-/* Update opd_last_used_id along with checking for gap in objid sequence */
-void osp_update_last_id(struct osp_device *d, obd_id objid)
+static struct dt_object
+*osp_find_or_create(const struct lu_env *env, struct osp_device *osp,
+ struct lu_attr *attr, __u32 reg_id)
{
- /*
- * we might have lost precreated objects due to VBR and precreate
- * orphans, the gap in objid can be calculated properly only here
- */
- if (objid > le64_to_cpu(d->opd_last_used_id)) {
- if (objid - le64_to_cpu(d->opd_last_used_id) > 1) {
- d->opd_gap_start = le64_to_cpu(d->opd_last_used_id) + 1;
- d->opd_gap_count = objid - d->opd_gap_start;
- CDEBUG(D_HA, "Gap in objids: %d, start = %llu\n",
- d->opd_gap_count, d->opd_gap_start);
- }
- d->opd_last_used_id = cpu_to_le64(objid);
+ struct osp_thread_info *osi = osp_env_info(env);
+ struct dt_object_format dof = { 0 };
+ struct dt_object *dto;
+ int rc;
+ ENTRY;
+
+ lu_local_obj_fid(&osi->osi_fid, reg_id);
+ attr->la_valid = LA_MODE;
+ attr->la_mode = S_IFREG | 0644;
+ dof.dof_type = DFT_REGULAR;
+ dto = dt_find_or_create(env, osp->opd_storage, &osi->osi_fid,
+ &dof, attr);
+ if (IS_ERR(dto))
+ RETURN(dto);
+
+ rc = dt_attr_get(env, dto, attr, NULL);
+ if (rc) {
+ CERROR("%s: can't be initialized: rc = %d\n",
+ osp->opd_obd->obd_name, rc);
+ lu_object_put(env, &dto->do_lu);
+ RETURN(ERR_PTR(rc));
}
+ RETURN(dto);
}
-static int osp_last_used_init(const struct lu_env *env, struct osp_device *m)
+static int osp_write_local_file(const struct lu_env *env,
+ struct osp_device *osp,
+ struct dt_object *dt_obj,
+ struct lu_buf *buf,
+ loff_t offset)
{
- struct osp_thread_info *osi = osp_env_info(env);
- struct dt_object_format dof = { 0 };
- struct dt_object *o;
- int rc;
+ struct thandle *th;
+ int rc;
- ENTRY;
+ th = dt_trans_create(env, osp->opd_storage);
+ if (IS_ERR(th))
+ RETURN(PTR_ERR(th));
- osi->osi_attr.la_valid = LA_MODE;
- osi->osi_attr.la_mode = S_IFREG | 0644;
- lu_local_obj_fid(&osi->osi_fid, MDD_LOV_OBJ_OID);
- dof.dof_type = DFT_REGULAR;
- o = dt_find_or_create(env, m->opd_storage, &osi->osi_fid, &dof,
- &osi->osi_attr);
- if (IS_ERR(o))
- RETURN(PTR_ERR(o));
-
- rc = dt_attr_get(env, o, &osi->osi_attr, NULL);
+ rc = dt_declare_record_write(env, dt_obj, buf->lb_len, offset, th);
+ if (rc)
+ GOTO(out, rc);
+ rc = dt_trans_start_local(env, osp->opd_storage, th);
if (rc)
GOTO(out, rc);
- /* object will be released in device cleanup path */
- m->opd_last_used_file = o;
+ rc = dt_record_write(env, dt_obj, buf, &offset, th);
+out:
+ dt_trans_stop(env, osp->opd_storage, th);
+ RETURN(rc);
+}
+
+static int osp_init_last_objid(const struct lu_env *env, struct osp_device *osp)
+{
+ struct osp_thread_info *osi = osp_env_info(env);
+ struct lu_fid *fid = &osp->opd_last_used_fid;
+ struct dt_object *dto;
+ int rc;
+ ENTRY;
- if (osi->osi_attr.la_size >= sizeof(osi->osi_id) *
- (m->opd_index + 1)) {
- osp_objid_buf_prep(osi, m, m->opd_index);
- rc = dt_record_read(env, o, &osi->osi_lb, &osi->osi_off);
+ dto = osp_find_or_create(env, osp, &osi->osi_attr, MDD_LOV_OBJ_OID);
+ if (IS_ERR(dto))
+ RETURN(PTR_ERR(dto));
+ /* object will be released in device cleanup path */
+ if (osi->osi_attr.la_size >=
+ sizeof(osi->osi_id) * (osp->opd_index + 1)) {
+ osp_objid_buf_prep(&osi->osi_lb, &osi->osi_off, &fid->f_oid,
+ osp->opd_index);
+ rc = dt_record_read(env, dto, &osi->osi_lb, &osi->osi_off);
if (rc != 0)
GOTO(out, rc);
} else {
- /* reset value to 0, just to make sure and change file's size */
- struct thandle *th;
+ fid->f_oid = 0;
+ osp_objid_buf_prep(&osi->osi_lb, &osi->osi_off, &fid->f_oid,
+ osp->opd_index);
+ rc = osp_write_local_file(env, osp, dto, &osi->osi_lb,
+ osi->osi_off);
+ }
+ osp->opd_last_used_oid_file = dto;
+ RETURN(0);
+out:
+ /* object will be released in device cleanup path */
+ CERROR("%s: can't initialize lov_objid: rc = %d\n",
+ osp->opd_obd->obd_name, rc);
+ lu_object_put(env, &dto->do_lu);
+ osp->opd_last_used_oid_file = NULL;
+ RETURN(rc);
+}
- m->opd_last_used_id = 0;
- osp_objid_buf_prep(osi, m, m->opd_index);
+static int osp_init_last_seq(const struct lu_env *env, struct osp_device *osp)
+{
+ struct osp_thread_info *osi = osp_env_info(env);
+ struct lu_fid *fid = &osp->opd_last_used_fid;
+ struct dt_object *dto;
+ int rc;
+ ENTRY;
- th = dt_trans_create(env, m->opd_storage);
- if (IS_ERR(th))
- GOTO(out, rc = PTR_ERR(th));
+ dto = osp_find_or_create(env, osp, &osi->osi_attr, MDD_LOV_OBJ_OSEQ);
+ if (IS_ERR(dto))
+ RETURN(PTR_ERR(dto));
- rc = dt_declare_record_write(env, m->opd_last_used_file,
- osi->osi_lb.lb_len, osi->osi_off,
- th);
- if (rc) {
- dt_trans_stop(env, m->opd_storage, th);
+ /* object will be released in device cleanup path */
+ if (osi->osi_attr.la_size >=
+ sizeof(osi->osi_id) * (osp->opd_index + 1)) {
+ osp_objseq_buf_prep(&osi->osi_lb, &osi->osi_off, &fid->f_seq,
+ osp->opd_index);
+ rc = dt_record_read(env, dto, &osi->osi_lb, &osi->osi_off);
+ if (rc != 0)
GOTO(out, rc);
- }
+ } else {
+ fid->f_seq = 0;
+ osp_objseq_buf_prep(&osi->osi_lb, &osi->osi_off, &fid->f_seq,
+ osp->opd_index);
+ rc = osp_write_local_file(env, osp, dto, &osi->osi_lb,
+ osi->osi_off);
+ }
+ osp->opd_last_used_seq_file = dto;
+ RETURN(0);
+out:
+ /* object will be released in device cleanup path */
+ CERROR("%s: can't initialize lov_seq: rc = %d\n",
+ osp->opd_obd->obd_name, rc);
+ lu_object_put(env, &dto->do_lu);
+ osp->opd_last_used_seq_file = NULL;
+ RETURN(rc);
+}
+
+static int osp_last_used_init(const struct lu_env *env, struct osp_device *osp)
+{
+ struct osp_thread_info *osi = osp_env_info(env);
+ int rc;
+ ENTRY;
+
+ fid_zero(&osp->opd_last_used_fid);
+ rc = osp_init_last_objid(env, osp);
+ if (rc < 0) {
+ CERROR("%s: Can not get ids %d from old objid!\n",
+ osp->opd_obd->obd_name, rc);
+ RETURN(rc);
+ }
+
+ rc = osp_init_last_seq(env, osp);
+ if (rc < 0) {
+ CERROR("%s: Can not get ids %d from old objid!\n",
+ osp->opd_obd->obd_name, rc);
+ GOTO(out, rc);
+ }
- rc = dt_trans_start_local(env, m->opd_storage, th);
+ if (fid_oid(&osp->opd_last_used_fid) != 0 &&
+ fid_seq(&osp->opd_last_used_fid) == 0) {
+ /* Just upgrade from the old version,
+ * set the seq to be IDIF */
+ osp->opd_last_used_fid.f_seq =
+ fid_idif_seq(fid_oid(&osp->opd_last_used_fid),
+ osp->opd_index);
+ osp_objseq_buf_prep(&osi->osi_lb, &osi->osi_off,
+ &osp->opd_last_used_fid.f_seq,
+ osp->opd_index);
+ rc = osp_write_local_file(env, osp, osp->opd_last_used_seq_file,
+ &osi->osi_lb, osi->osi_off);
if (rc) {
- dt_trans_stop(env, m->opd_storage, th);
+ CERROR("%s : Can not write seq file: rc = %d\n",
+ osp->opd_obd->obd_name, rc);
GOTO(out, rc);
}
+ }
- rc = dt_record_write(env, m->opd_last_used_file, &osi->osi_lb,
- &osi->osi_off, th);
- dt_trans_stop(env, m->opd_storage, th);
- if (rc)
- GOTO(out, rc);
+ if (!fid_is_zero(&osp->opd_last_used_fid) &&
+ !fid_is_sane(&osp->opd_last_used_fid)) {
+ CERROR("%s: Got invalid FID "DFID"\n", osp->opd_obd->obd_name,
+ PFID(&osp->opd_last_used_fid));
+ GOTO(out, rc = -EINVAL);
}
- CDEBUG(D_HA, "%s: Read last used ID: "LPU64"\n", m->opd_obd->obd_name,
- le64_to_cpu(m->opd_last_used_id));
- RETURN(0);
+
+ CDEBUG(D_INFO, "%s: Init last used fid "DFID"\n",
+ osp->opd_obd->obd_name, PFID(&osp->opd_last_used_fid));
out:
- CERROR("%s: can't initialize lov_objid: %d\n",
- m->opd_obd->obd_name, rc);
- lu_object_put(env, &o->do_lu);
- m->opd_last_used_file = NULL;
- return rc;
+ if (rc != 0) {
+ if (osp->opd_last_used_oid_file != NULL) {
+ lu_object_put(env, &osp->opd_last_used_oid_file->do_lu);
+ osp->opd_last_used_oid_file = NULL;
+ }
+ if (osp->opd_last_used_seq_file != NULL) {
+ lu_object_put(env, &osp->opd_last_used_seq_file->do_lu);
+ osp->opd_last_used_seq_file = NULL;
+ }
+ }
+
+ RETURN(rc);
}
static void osp_last_used_fini(const struct lu_env *env, struct osp_device *d)
{
- if (d->opd_last_used_file != NULL) {
- lu_object_put(env, &d->opd_last_used_file->do_lu);
- d->opd_last_used_file = NULL;
+ /* release last_used file */
+ if (d->opd_last_used_oid_file != NULL) {
+ lu_object_put(env, &d->opd_last_used_oid_file->do_lu);
+ d->opd_last_used_oid_file = NULL;
+ }
+
+ if (d->opd_last_used_seq_file != NULL) {
+ lu_object_put(env, &d->opd_last_used_seq_file->do_lu);
+ d->opd_last_used_seq_file = NULL;
}
}
* layer above osp (usually lod) can use ffree to estimate
* how many objects are available for immediate creation
*/
+
spin_lock(&d->opd_pre_lock);
- sfs->os_fprecreated = d->opd_pre_last_created - d->opd_pre_used_id;
+ LASSERTF(fid_seq(&d->opd_pre_last_created_fid) ==
+ fid_seq(&d->opd_pre_used_fid),
+ "last_created "DFID", next_fid "DFID"\n",
+ PFID(&d->opd_pre_last_created_fid),
+ PFID(&d->opd_pre_used_fid));
+ sfs->os_fprecreated = fid_oid(&d->opd_pre_last_created_fid) -
+ fid_oid(&d->opd_pre_used_fid);
sfs->os_fprecreated -= d->opd_pre_reserved;
spin_unlock(&d->opd_pre_lock);
ptlrpc_pinger_add_import(imp);
- /* set seq controller export for MDC0 if exists */
- if (osp->opd_connect_mdt && !is_osp_for_connection(obd->obd_name) &&
- data->ocd_index == 0) {
+ if (osp->opd_connect_mdt && data->ocd_index == 0 &&
+ !is_osp_for_connection(obd->obd_name)) {
struct seq_server_site *ss;
ss = lu_site2seq(osp2lu_dev(osp)->ld_site);
d->opd_imp_seen_connected = 1;
if (d->opd_connect_mdt)
break;
- if (d->opd_obd->u.cli.cl_seq->lcs_exp == NULL)
- d->opd_obd->u.cli.cl_seq->lcs_exp =
- class_export_get(d->opd_exp);
cfs_waitq_signal(&d->opd_pre_waitq);
__osp_sync_check_for_work(d);
CDEBUG(D_HA, "got connected\n");
/* device used to store persistent state (llogs, last ids) */
struct obd_export *opd_storage_exp;
struct dt_device *opd_storage;
- struct dt_object *opd_last_used_file;
+ struct dt_object *opd_last_used_oid_file;
+ struct dt_object *opd_last_used_seq_file;
/* stored persistently in LE format, updated directly to/from disk
* and required le64_to_cpu() conversion before use.
* Protected by opd_pre_lock */
- volatile obd_id opd_last_used_id;
-
- obd_id opd_gap_start;
+ struct lu_fid opd_last_used_fid;
+ struct lu_fid opd_gap_start_fid;
int opd_gap_count;
/* connection to OST */
struct obd_device *opd_obd;
* Precreation pool
*/
spinlock_t opd_pre_lock;
- /* last id assigned in creation */
- __u64 opd_pre_used_id;
+
+ /* last fid to assign in creation */
+ struct lu_fid opd_pre_used_fid;
/* last created id OST reported, next-created - available id's */
- __u64 opd_pre_last_created;
+ struct lu_fid opd_pre_last_created_fid;
/* how many ids are reserved in declare, we shouldn't block in create */
__u64 opd_pre_reserved;
/* dedicate precreate thread */
struct osp_thread_info {
struct lu_buf osi_lb;
+ struct lu_buf osi_lb2;
struct lu_fid osi_fid;
struct lu_attr osi_attr;
struct ost_id osi_oi;
+ struct ost_id osi_oi2;
obd_id osi_id;
loff_t osi_off;
union {
struct lu_seq_range osi_seq;
};
-static inline void osp_objid_buf_prep(struct osp_thread_info *osi,
- struct osp_device *d, int index)
+static inline void osp_objid_buf_prep(struct lu_buf *buf, loff_t *off,
+ __u32 *id, int index)
+{
+ buf->lb_buf = (void *)id;
+ buf->lb_len = sizeof(obd_id);
+ *off = sizeof(obd_id) * index;
+}
+
+static inline void osp_objseq_buf_prep(struct lu_buf *buf, loff_t *off,
+ __u64 *seq, int index)
{
- osi->osi_lb.lb_buf = (void *)&d->opd_last_used_id;
- osi->osi_lb.lb_len = sizeof(d->opd_last_used_id);
- osi->osi_off = sizeof(d->opd_last_used_id) * index;
+ buf->lb_buf = (void *)seq;
+ buf->lb_len = sizeof(obd_id);
+ *off = sizeof(obd_id) * index;
+}
+
+static inline void osp_buf_prep(struct lu_buf *lb, void *buf, int buf_len)
+{
+ lb->lb_buf = buf;
+ lb->lb_len = buf_len;
}
extern struct lu_context_key osp_thread_key;
#define osp_get_rpc_lock(lck, it) mdc_get_rpc_lock(lck, it)
#define osp_put_rpc_lock(lck, it) mdc_put_rpc_lock(lck, it)
+static inline void osp_update_last_fid(struct osp_device *d, struct lu_fid *fid)
+{
+ int diff = lu_fid_diff(fid, &d->opd_last_used_fid);
+ /*
+ * we might have lost precreated objects due to VBR and precreate
+ * orphans, the gap in objid can be calculated properly only here
+ */
+ if (diff > 0) {
+ if (diff > 1) {
+ d->opd_gap_start_fid = d->opd_last_used_fid;
+ d->opd_gap_start_fid.f_oid++;
+ d->opd_gap_count = diff - 1;
+ CDEBUG(D_HA, "Gap in objids: start="DFID", count =%d\n",
+ PFID(&d->opd_gap_start_fid), d->opd_gap_count);
+ }
+ d->opd_last_used_fid = *fid;
+ }
+}
+
+static inline int osp_is_fid_client(struct osp_device *osp)
+{
+ struct obd_import *imp = osp->opd_obd->u.cli.cl_import;
+
+ return imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_FID;
+}
+
/* osp_dev.c */
void osp_update_last_id(struct osp_device *d, obd_id objid);
extern struct llog_operations osp_mds_ost_orig_logops;
int osp_init_precreate(struct osp_device *d);
int osp_precreate_reserve(const struct lu_env *env, struct osp_device *d);
__u64 osp_precreate_get_id(struct osp_device *d);
+int osp_precreate_get_fid(const struct lu_env *env, struct osp_device *d,
+ struct lu_fid *fid);
void osp_precreate_fini(struct osp_device *d);
int osp_object_truncate(const struct lu_env *env, struct dt_object *dt, __u64);
void osp_pre_update_status(struct osp_device *d, int rc);
void osp_statfs_need_now(struct osp_device *d);
+int osp_reset_last_used(const struct lu_env *env, struct osp_device *osp);
+int osp_write_last_oid_seq_files(struct lu_env *env, struct osp_device *osp,
+ struct lu_fid *fid, int sync);
/* lproc_osp.c */
void lprocfs_osp_init_vars(struct lprocfs_static_vars *lvars);
#include "osp_internal.h"
-static __u64 osp_object_assign_id(const struct lu_env *env,
- struct osp_device *d, struct osp_object *o)
+static void osp_object_assign_fid(const struct lu_env *env,
+ struct osp_device *d, struct osp_object *o)
{
- struct osp_thread_info *osi = osp_env_info(env);
- const struct lu_fid *f = lu_object_fid(&o->opo_obj.do_lu);
+ struct osp_thread_info *osi = osp_env_info(env);
- LASSERT(fid_is_zero(f));
+ LASSERT(fid_is_zero(lu_object_fid(&o->opo_obj.do_lu)));
LASSERT(o->opo_reserved);
o->opo_reserved = 0;
- /* assign fid to anonymous object */
- osi->osi_oi.oi_id = osp_precreate_get_id(d);
- osi->osi_oi.oi_seq = FID_SEQ_OST_MDT0;
- fid_ostid_unpack(&osi->osi_fid, &osi->osi_oi, d->opd_index);
- lu_object_assign_fid(env, &o->opo_obj.do_lu, &osi->osi_fid);
+ osp_precreate_get_fid(env, d, &osi->osi_fid);
- return osi->osi_oi.oi_id;
+ lu_object_assign_fid(env, &o->opo_obj.do_lu, &osi->osi_fid);
}
static int osp_declare_attr_set(const struct lu_env *env, struct dt_object *dt,
if (attr->la_valid & LA_SIZE && attr->la_size > 0) {
LASSERT(!dt_object_exists(dt));
- osp_object_assign_id(env, d, o);
+ osp_object_assign_fid(env, d, o);
rc = osp_object_truncate(env, dt, attr->la_size);
if (rc)
RETURN(rc);
if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OSC_CREATE_FAIL) && d->opd_index == 1)
RETURN(-ENOSPC);
- LASSERT(d->opd_last_used_file);
+ LASSERT(d->opd_last_used_oid_file);
fid = lu_object_fid(&dt->do_lu);
/*
if (unlikely(!fid_is_zero(fid))) {
/* replay case: caller knows fid */
osi->osi_off = sizeof(osi->osi_id) * d->opd_index;
- rc = dt_declare_record_write(env, d->opd_last_used_file,
+ rc = dt_declare_record_write(env, d->opd_last_used_oid_file,
sizeof(osi->osi_id), osi->osi_off,
th);
RETURN(rc);
/* common for all OSPs file hystorically */
osi->osi_off = sizeof(osi->osi_id) * d->opd_index;
- rc = dt_declare_record_write(env, d->opd_last_used_file,
+ rc = dt_declare_record_write(env, d->opd_last_used_oid_file,
sizeof(osi->osi_id), osi->osi_off,
th);
} else {
struct osp_thread_info *osi = osp_env_info(env);
struct osp_device *d = lu2osp_dev(dt->do_lu.lo_dev);
struct osp_object *o = dt2osp_obj(dt);
- int rc = 0;
-
+ int rc = 0;
+ struct lu_fid *fid = &osi->osi_fid;
ENTRY;
if (o->opo_reserved) {
- /* regular case, id is assigned holding transaction open */
- osi->osi_id = osp_object_assign_id(env, d, o);
- } else {
+ /* regular case, fid is assigned holding trunsaction open */
+ osp_object_assign_fid(env, d, o);
+ }
+
+ memcpy(fid, lu_object_fid(&dt->do_lu), sizeof(*fid));
+
+ LASSERTF(fid_is_sane(fid), "fid for osp_obj %p is insane"DFID"!\n",
+ osp_obj, PFID(fid));
+
+ if (!o->opo_reserved) {
/* special case, id was assigned outside of transaction
* see comments in osp_declare_attr_set */
- rc = fid_ostid_pack(lu_object_fid(&dt->do_lu), &osi->osi_oi);
- LASSERT(rc == 0);
- osi->osi_id = ostid_id(&osi->osi_oi);
spin_lock(&d->opd_pre_lock);
- osp_update_last_id(d, osi->osi_id);
+ osp_update_last_fid(d, fid);
spin_unlock(&d->opd_pre_lock);
}
- LASSERT(osi->osi_id);
+ CDEBUG(D_INODE, "fid for osp_obj %p is "DFID"!\n", osp_obj, PFID(fid));
/*
* it's OK if the import is inactive by this moment - id was created
spin_lock(&d->opd_pre_lock);
if (d->opd_gap_count > 0) {
int count = d->opd_gap_count;
-
- osi->osi_oi.oi_id = d->opd_gap_start;
+ osi->osi_oi.oi_id = fid_oid(&d->opd_gap_start_fid);
d->opd_gap_count = 0;
spin_unlock(&d->opd_pre_lock);
- CDEBUG(D_HA, "Found gap "LPU64"+%d in objids\n",
- d->opd_gap_start, count);
+ CDEBUG(D_HA, "Writting gap "DFID"+%d in llog\n",
+ PFID(&d->opd_gap_start_fid), count);
/* real gap handling is disabled intil ORI-692 will be
* fixed, now we only report gaps */
} else {
* initializing attributes needs no logging */
o->opo_new = 1;
- osp_objid_buf_prep(osi, d, d->opd_index);
- rc = dt_record_write(env, d->opd_last_used_file, &osi->osi_lb,
+ /* Only need update last_used oid file, seq file will only be update
+ * during seq rollover */
+ osp_objid_buf_prep(&osi->osi_lb, &osi->osi_off,
+ &d->opd_last_used_fid.f_oid, d->opd_index);
+
+ rc = dt_record_write(env, d->opd_last_used_oid_file, &osi->osi_lb,
&osi->osi_off, th);
- CDEBUG(D_HA, "%s: Wrote last used ID: "LPU64": %d\n",
- d->opd_obd->obd_name, le64_to_cpu(d->opd_last_used_id), rc);
+ CDEBUG(D_HA, "%s: Wrote last used FID: "DFID", index %d: %d\n",
+ d->opd_obd->obd_name, PFID(fid), d->opd_index, rc);
RETURN(rc);
}
*
* Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
* Author: Mikhail Pershin <mike.pershin@intel.com>
+ * Author: Di Wang <di.wang@intel.com>
*/
#ifndef EXPORT_SYMTAB
return !!(d->opd_pre_thread.t_flags & SVC_STOPPED);
}
-static inline int osp_precreate_near_empty_nolock(struct osp_device *d)
+static inline int osp_objs_precreated(const struct lu_env *env,
+ struct osp_device *osp)
{
- int window = d->opd_pre_last_created - d->opd_pre_used_id;
+ struct lu_fid *fid1 = &osp->opd_pre_last_created_fid;
+ struct lu_fid *fid2 = &osp->opd_pre_used_fid;
+
+ LASSERTF(fid_seq(fid1) == fid_seq(fid2),
+ "Created fid"DFID" Next fid "DFID"\n", PFID(fid1), PFID(fid2));
+
+ if (fid_is_idif(fid1)) {
+ struct ost_id *oi1 = &osp_env_info(env)->osi_oi;
+ struct ost_id *oi2 = &osp_env_info(env)->osi_oi2;
+
+ LASSERT(fid_is_idif(fid1) && fid_is_idif(fid2));
+ ostid_idif_pack(fid1, oi1);
+ ostid_idif_pack(fid2, oi2);
+ LASSERT(oi1->oi_id >= oi2->oi_id);
+
+ return oi1->oi_id - oi2->oi_id;
+ }
+
+ return fid_oid(fid1) - fid_oid(fid2);
+}
+
+static inline int osp_precreate_near_empty_nolock(const struct lu_env *env,
+ struct osp_device *d)
+{
+ int window = osp_objs_precreated(env, d);
/* don't consider new precreation till OST is healty and
* has free space */
(d->opd_pre_status == 0));
}
-static inline int osp_precreate_near_empty(struct osp_device *d)
+static inline int osp_precreate_near_empty(const struct lu_env *env,
+ struct osp_device *d)
{
int rc;
/* XXX: do we really need locking here? */
spin_lock(&d->opd_pre_lock);
- rc = osp_precreate_near_empty_nolock(d);
+ rc = osp_precreate_near_empty_nolock(env, d);
spin_unlock(&d->opd_pre_lock);
return rc;
}
-static int osp_precreate_send(struct osp_device *d)
+/**
+ * Write fid into last_oid/last_seq file.
+ **/
+int osp_write_last_oid_seq_files(struct lu_env *env, struct osp_device *osp,
+ struct lu_fid *fid, int sync)
{
+ struct osp_thread_info *oti = osp_env_info(env);
+ struct lu_buf *lb_oid = &oti->osi_lb;
+ struct lu_buf *lb_oseq = &oti->osi_lb2;
+ loff_t oid_off;
+ loff_t oseq_off;
+ struct thandle *th;
+ int rc;
+ ENTRY;
+
+ /* Note: through f_oid is only 32bits, it will also write
+ * 64 bits for oid to keep compatiblity with the previous
+ * version. */
+ lb_oid->lb_buf = &fid->f_oid;
+ lb_oid->lb_len = sizeof(obd_id);
+ oid_off = sizeof(obd_id) * osp->opd_index;
+
+ lb_oseq->lb_buf = &fid->f_seq;
+ lb_oseq->lb_len = sizeof(obd_id);
+ oseq_off = sizeof(obd_id) * osp->opd_index;
+
+ th = dt_trans_create(env, osp->opd_storage);
+ if (IS_ERR(th))
+ RETURN(PTR_ERR(th));
+
+ th->th_sync |= sync;
+ rc = dt_declare_record_write(env, osp->opd_last_used_oid_file,
+ lb_oid->lb_len, oid_off, th);
+ if (rc != 0)
+ GOTO(out, rc);
+
+ rc = dt_declare_record_write(env, osp->opd_last_used_seq_file,
+ lb_oseq->lb_len, oseq_off, th);
+ if (rc != 0)
+ GOTO(out, rc);
+
+ rc = dt_trans_start_local(env, osp->opd_storage, th);
+ if (rc != 0)
+ GOTO(out, rc);
+
+ rc = dt_record_write(env, osp->opd_last_used_oid_file, lb_oid,
+ &oid_off, th);
+ if (rc != 0) {
+ CERROR("%s: can not write to last seq file: rc = %d\n",
+ osp->opd_obd->obd_name, rc);
+ GOTO(out, rc);
+ }
+ rc = dt_record_write(env, osp->opd_last_used_seq_file, lb_oseq,
+ &oseq_off, th);
+ if (rc) {
+ CERROR("%s: can not write to last seq file: rc = %d\n",
+ osp->opd_obd->obd_name, rc);
+ GOTO(out, rc);
+ }
+out:
+ dt_trans_stop(env, osp->opd_storage, th);
+ RETURN(rc);
+}
+
+/**
+ * alloc fids for precreation.
+ * rc = 0 Success, @grow is the count of real allocation.
+ * rc = 1 Current seq is used up.
+ * rc < 0 Other error.
+ **/
+static int osp_precreate_fids(const struct lu_env *env, struct osp_device *osp,
+ struct lu_fid *fid, int *grow)
+{
+ struct osp_thread_info *osi = osp_env_info(env);
+ __u64 end;
+ int i = 0;
+
+ if (fid_is_idif(fid)) {
+ struct lu_fid *last_fid;
+ struct ost_id *oi = &osi->osi_oi;
+
+ spin_lock(&osp->opd_pre_lock);
+ last_fid = &osp->opd_pre_last_created_fid;
+ ostid_idif_pack(last_fid, oi);
+ end = min(oi->oi_id + *grow, IDIF_MAX_OID);
+ *grow = end - oi->oi_id;
+ oi->oi_id += *grow;
+ spin_unlock(&osp->opd_pre_lock);
+
+ if (*grow == 0)
+ return 1;
+
+ ostid_idif_unpack(oi, fid, osp->opd_index);
+ return 0;
+ }
+
+ spin_lock(&osp->opd_pre_lock);
+ *fid = osp->opd_pre_last_created_fid;
+ end = fid->f_oid;
+ end = min((end + *grow), (__u64)LUSTRE_DATA_SEQ_MAX_WIDTH);
+ *grow = end - fid->f_oid;
+ fid->f_oid += end - fid->f_oid;
+ spin_unlock(&osp->opd_pre_lock);
+
+ CDEBUG(D_INFO, "Expect %d, actual %d ["DFID" -- "DFID"]\n",
+ *grow, i, PFID(fid), PFID(&osp->opd_pre_last_created_fid));
+
+ return *grow > 0 ? 0 : 1;
+}
+
+static int osp_precreate_send(const struct lu_env *env, struct osp_device *d)
+{
+ struct osp_thread_info *oti = osp_env_info(env);
struct ptlrpc_request *req;
struct obd_import *imp;
struct ost_body *body;
int rc, grow, diff;
-
+ struct lu_fid *fid = &oti->osi_fid;
ENTRY;
/* don't precreate new objects till OST healthy and has free space */
body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
LASSERT(body);
- body->oa.o_id = d->opd_pre_last_created + grow;
- body->oa.o_seq = FID_SEQ_OST_MDT0; /* XXX: support for CMD? */
+
+ rc = osp_precreate_fids(env, d, fid, &grow);
+ if (rc == 1) {
+ /* Current seq has been used up*/
+ if (!osp_is_fid_client(d)) {
+ osp_pre_update_status(d, -ENOSPC);
+ rc = -ENOSPC;
+ }
+ cfs_waitq_signal(&d->opd_pre_waitq);
+ GOTO(out_req, rc);
+ }
+
+ if (!osp_is_fid_client(d)) {
+ /* Non-FID client will always send seq 0 because of
+ * compatiblity */
+ LASSERTF(fid_is_idif(fid), "Invalid fid "DFID"\n", PFID(fid));
+ fid->f_seq = 0;
+ }
+
+ ostid_fid_pack(fid, &body->oa.o_oi);
body->oa.o_valid = OBD_MD_FLGROUP;
ptlrpc_request_set_replen(req);
rc = ptlrpc_queue_wait(req);
if (rc) {
- CERROR("%s: can't precreate: rc = %d\n",
- d->opd_obd->obd_name, rc);
+ CERROR("%s: can't precreate: rc = %d\n", d->opd_obd->obd_name,
+ rc);
GOTO(out_req, rc);
}
LASSERT(req->rq_transno == 0);
if (body == NULL)
GOTO(out_req, rc = -EPROTO);
- CDEBUG(D_HA, "%s: new last_created "LPU64"\n", d->opd_obd->obd_name,
- body->oa.o_id);
- LASSERT(body->oa.o_id > d->opd_pre_used_id);
+ fid_ostid_unpack(fid, &body->oa.o_oi, d->opd_index);
+ LASSERTF(lu_fid_diff(fid, &d->opd_pre_used_fid) > 0,
+ "reply fid "DFID" pre used fid "DFID"\n", PFID(fid),
+ PFID(&d->opd_pre_used_fid));
+
+ CDEBUG(D_HA, "%s: new last_created "DFID"\n", d->opd_obd->obd_name,
+ PFID(fid));
- diff = body->oa.o_id - d->opd_pre_last_created;
+ diff = lu_fid_diff(fid, &d->opd_pre_last_created_fid);
spin_lock(&d->opd_pre_lock);
if (diff < grow) {
* next time if needed */
d->opd_pre_grow_slow = 0;
}
- d->opd_pre_last_created = body->oa.o_id;
+
+ d->opd_pre_last_created_fid = *fid;
spin_unlock(&d->opd_pre_lock);
- CDEBUG(D_OTHER, "current precreated pool: %llu-%llu\n",
- d->opd_pre_used_id, d->opd_pre_last_created);
+ CDEBUG(D_OTHER, "current precreated pool: "DFID"-"DFID"\n",
+ PFID(&d->opd_pre_used_fid), PFID(&d->opd_pre_last_created_fid));
out_req:
/* now we can wakeup all users awaiting for objects */
osp_pre_update_status(d, rc);
RETURN(rc);
}
-
-static int osp_get_lastid_from_ost(struct osp_device *d)
+static int osp_get_lastfid_from_ost(struct osp_device *d)
{
- struct ptlrpc_request *req;
+ struct ptlrpc_request *req = NULL;
struct obd_import *imp;
- obd_id *reply;
+ struct lu_fid *last_fid = &d->opd_last_used_fid;
char *tmp;
- int rc;
+ int rc;
+ ENTRY;
imp = d->opd_obd->u.cli.cl_import;
LASSERT(imp);
- req = ptlrpc_request_alloc(imp, &RQF_OST_GET_INFO_LAST_ID);
+ req = ptlrpc_request_alloc(imp, &RQF_OST_GET_INFO_LAST_FID);
if (req == NULL)
RETURN(-ENOMEM);
req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
- RCL_CLIENT, sizeof(KEY_LAST_ID));
+ RCL_CLIENT, sizeof(KEY_LAST_FID));
+
+ req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
+ RCL_CLIENT, sizeof(*last_fid));
+
rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
if (rc) {
ptlrpc_request_free(req);
}
tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
- memcpy(tmp, KEY_LAST_ID, sizeof(KEY_LAST_ID));
+ memcpy(tmp, KEY_LAST_FID, sizeof(KEY_LAST_FID));
req->rq_no_delay = req->rq_no_resend = 1;
+ fid_cpu_to_le(last_fid, last_fid);
+ tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
+ memcpy(tmp, last_fid, sizeof(*last_fid));
ptlrpc_request_set_replen(req);
+
rc = ptlrpc_queue_wait(req);
if (rc) {
/* bad-bad OST.. let sysadm sort this out */
+ if (rc == -ENOTSUPP) {
+ CERROR("%s: server does not support FID: rc = %d\n",
+ d->opd_obd->obd_name, -ENOTSUPP);
+ }
ptlrpc_set_import_active(imp, 0);
GOTO(out, rc);
}
- reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
- if (reply == NULL)
+ last_fid = req_capsule_server_get(&req->rq_pill, &RMF_FID);
+ if (last_fid == NULL || !fid_is_sane(last_fid)) {
+ CERROR("%s: Got insane last_fid "DFID"\n",
+ d->opd_obd->obd_name, PFID(last_fid));
GOTO(out, rc = -EPROTO);
+ }
- d->opd_last_used_id = *reply;
- CDEBUG(D_HA, "%s: got last_id "LPU64" from OST\n",
- d->opd_obd->obd_name, d->opd_last_used_id);
+ /* Only update the last used fid, if the OST has objects for
+ * this sequence, i.e. fid_oid > 0 */
+ if (fid_oid(last_fid) > 0)
+ d->opd_last_used_fid = *last_fid;
+
+ CDEBUG(D_HA, "%s: Got insane last_fid "DFID"\n", d->opd_obd->obd_name,
+ PFID(last_fid));
out:
ptlrpc_req_finished(req);
RETURN(rc);
-
}
/**
* asks OST to clean precreate orphans
* and gets next id for new objects
*/
-static int osp_precreate_cleanup_orphans(struct osp_device *d)
+static int osp_precreate_cleanup_orphans(struct lu_env *env,
+ struct osp_device *d)
{
+ struct osp_thread_info *osi = osp_env_info(env);
+ struct lu_fid *last_fid = &osi->osi_fid;
struct ptlrpc_request *req = NULL;
struct obd_import *imp;
struct ost_body *body;
struct l_wait_info lwi = { 0 };
int update_status = 0;
int rc;
+ int diff;
ENTRY;
if (!osp_precreate_running(d) || d->opd_got_disconnected)
GOTO(out, rc = -EAGAIN);
- CDEBUG(D_HA, "%s: going to cleanup orphans since "LPU64"\n",
- d->opd_obd->obd_name, d->opd_last_used_id);
+ CDEBUG(D_HA, "%s: going to cleanup orphans since "DFID"\n",
+ d->opd_obd->obd_name, PFID(&d->opd_last_used_fid));
- if (d->opd_last_used_id < 2) {
- /* lastid looks strange... ask OST */
- rc = osp_get_lastid_from_ost(d);
+ *last_fid = d->opd_last_used_fid;
+ /* The OSP should already get the valid seq now */
+ LASSERT(!fid_is_zero(last_fid));
+ if (fid_oid(&d->opd_last_used_fid) < 2) {
+ /* lastfid looks strange... ask OST */
+ rc = osp_get_lastfid_from_ost(d);
if (rc)
GOTO(out, rc);
}
rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
if (rc) {
ptlrpc_request_free(req);
+ req = NULL;
GOTO(out, rc);
}
body->oa.o_flags = OBD_FL_DELORPHAN;
body->oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
- body->oa.o_seq = FID_SEQ_OST_MDT0;
-
- body->oa.o_id = d->opd_last_used_id;
+ if (osp_is_fid_client(d))
+ body->oa.o_seq = fid_seq(&d->opd_last_used_fid);
+ else
+ body->oa.o_seq = 0;
+ /* remove from NEXT after used one */
+ body->oa.o_id = fid_oid(&d->opd_last_used_fid);
ptlrpc_request_set_replen(req);
/*
* OST provides us with id new pool starts from in body->oa.o_id
*/
+ fid_ostid_unpack(last_fid, &body->oa.o_oi, d->opd_index);
+ CDEBUG(D_INFO, "%s: last_fid "DFID" server last fid "DFID"\n",
+ d->opd_obd->obd_name, PFID(&d->opd_last_used_fid),
+ PFID(last_fid));
+
spin_lock(&d->opd_pre_lock);
- if (le64_to_cpu(d->opd_last_used_id) > body->oa.o_id) {
- d->opd_pre_grow_count = OST_MIN_PRECREATE +
- le64_to_cpu(d->opd_last_used_id) -
- body->oa.o_id;
- d->opd_pre_last_created = le64_to_cpu(d->opd_last_used_id);
+ diff = lu_fid_diff(&d->opd_last_used_fid, last_fid);
+ if (diff > 0) {
+ d->opd_pre_grow_count = OST_MIN_PRECREATE + diff;
+ d->opd_pre_last_created_fid = d->opd_last_used_fid;
} else {
d->opd_pre_grow_count = OST_MIN_PRECREATE;
- d->opd_pre_last_created = body->oa.o_id;
+ d->opd_pre_last_created_fid = *last_fid;
}
/*
* This empties the pre-creation pool and effectively blocks any new
* reservations.
*/
- d->opd_pre_used_id = d->opd_pre_last_created;
+ LASSERT(fid_oid(&d->opd_pre_last_created_fid) <=
+ LUSTRE_DATA_SEQ_MAX_WIDTH);
+ d->opd_pre_used_fid = d->opd_pre_last_created_fid;
d->opd_pre_grow_slow = 0;
spin_unlock(&d->opd_pre_lock);
- CDEBUG(D_HA, "%s: Got last_id "LPU64" from OST, last_used is "LPU64
- ", pre_used "LPU64"\n", d->opd_obd->obd_name, body->oa.o_id,
- le64_to_cpu(d->opd_last_used_id), d->opd_pre_used_id);
-
+ CDEBUG(D_HA, "%s: Got last_id "DFID" from OST, last_created "DFID
+ "last_used is "DFID"\n", d->opd_obd->obd_name, PFID(last_fid),
+ PFID(&d->opd_pre_last_created_fid), PFID(&d->opd_last_used_fid));
out:
if (req)
ptlrpc_req_finished(req);
cfs_waitq_signal(&d->opd_pre_user_waitq);
}
+static int osp_init_pre_fid(struct osp_device *osp)
+{
+ struct lu_env env;
+ struct osp_thread_info *osi;
+ struct lu_client_seq *cli_seq;
+ struct lu_fid *last_fid;
+ int rc;
+ ENTRY;
+
+ /* Return if last_used fid has been initialized */
+ if (!fid_is_zero(&osp->opd_last_used_fid))
+ RETURN(0);
+
+ rc = lu_env_init(&env, osp->opd_dt_dev.dd_lu_dev.ld_type->ldt_ctx_tags);
+ if (rc) {
+ CERROR("%s: init env error: rc = %d\n",
+ osp->opd_obd->obd_name, rc);
+ RETURN(rc);
+ }
+
+ osi = osp_env_info(&env);
+ last_fid = &osi->osi_fid;
+ fid_zero(last_fid);
+ /* For a freshed fs, it will allocate a new sequence first */
+ if (osp_is_fid_client(osp)) {
+ cli_seq = osp->opd_obd->u.cli.cl_seq;
+ rc = seq_client_get_seq(&env, cli_seq, &last_fid->f_seq);
+ if (rc != 0) {
+ CERROR("%s: alloc fid error: rc = %d\n",
+ osp->opd_obd->obd_name, rc);
+ GOTO(out, rc);
+ }
+ } else {
+ last_fid->f_seq = fid_idif_seq(1, osp->opd_index);
+ }
+ last_fid->f_oid = 1;
+ last_fid->f_ver = 0;
+
+ spin_lock(&osp->opd_pre_lock);
+ osp->opd_last_used_fid = *last_fid;
+ osp->opd_pre_used_fid = *last_fid;
+ osp->opd_pre_last_created_fid = *last_fid;
+ spin_unlock(&osp->opd_pre_lock);
+ rc = osp_write_last_oid_seq_files(&env, osp, last_fid, 1);
+ if (rc != 0) {
+ CERROR("%s: write fid error: rc = %d\n",
+ osp->opd_obd->obd_name, rc);
+ GOTO(out, rc);
+ }
+out:
+ lu_env_fini(&env);
+ RETURN(rc);
+}
+
static int osp_precreate_thread(void *_arg)
{
struct osp_device *d = _arg;
struct ptlrpc_thread *thread = &d->opd_pre_thread;
struct l_wait_info lwi = { 0 };
char pname[16];
+ struct lu_env env;
int rc;
ENTRY;
sprintf(pname, "osp-pre-%u\n", d->opd_index);
cfs_daemonize(pname);
+ rc = lu_env_init(&env, d->opd_dt_dev.dd_lu_dev.ld_type->ldt_ctx_tags);
+ if (rc) {
+ CERROR("%s: init env error: rc = %d\n", d->opd_obd->obd_name,
+ rc);
+ RETURN(rc);
+ }
+
spin_lock(&d->opd_pre_lock);
thread->t_flags = SVC_RUNNING;
spin_unlock(&d->opd_pre_lock);
while (osp_precreate_running(d)) {
l_wait_event(d->opd_pre_waitq,
!osp_precreate_running(d) ||
- d->opd_new_connection, &lwi);
+ d->opd_new_connection,
+ &lwi);
if (!osp_precreate_running(d))
break;
if (!d->opd_new_connection)
continue;
- /* got connected */
d->opd_new_connection = 0;
d->opd_got_disconnected = 0;
break;
}
+ if (d->opd_obd->u.cli.cl_seq->lcs_exp == NULL) {
+ /* Get new sequence for client first */
+ LASSERT(d->opd_exp != NULL);
+ d->opd_obd->u.cli.cl_seq->lcs_exp =
+ class_export_get(d->opd_exp);
+ rc = osp_init_pre_fid(d);
+ if (rc != 0) {
+ class_export_put(d->opd_exp);
+ d->opd_obd->u.cli.cl_seq->lcs_exp = NULL;
+ CERROR("%s: init pre fid error: rc = %d\n",
+ d->opd_obd->obd_name, rc);
+ continue;
+ }
+ }
+
osp_statfs_update(d);
/*
* Clean up orphans or recreate missing objects.
*/
- rc = osp_precreate_cleanup_orphans(d);
+ rc = osp_precreate_cleanup_orphans(&env, d);
if (rc != 0)
continue;
-
/*
* connected, can handle precreates now
*/
while (osp_precreate_running(d)) {
l_wait_event(d->opd_pre_waitq,
!osp_precreate_running(d) ||
- osp_precreate_near_empty(d) ||
+ osp_precreate_near_empty(&env, d) ||
osp_statfs_need_update(d) ||
d->opd_got_disconnected, &lwi);
if (osp_statfs_need_update(d))
osp_statfs_update(d);
- if (osp_precreate_near_empty(d)) {
- rc = osp_precreate_send(d);
+ if (osp_precreate_near_empty(&env, d)) {
+ rc = osp_precreate_send(&env, d);
/* osp_precreate_send() sets opd_pre_status
* in case of error, that prevent the using of
* failed device. */
- if (rc != 0 && rc != -ENOSPC &&
+ if (rc < 0 && rc != -ENOSPC &&
rc != -ETIMEDOUT && rc != -ENOTCONN)
CERROR("%s: cannot precreate objects:"
" rc = %d\n",
}
thread->t_flags = SVC_STOPPED;
+ lu_env_fini(&env);
cfs_waitq_signal(&thread->t_ctl_waitq);
RETURN(0);
}
-static int osp_precreate_ready_condition(struct osp_device *d)
+static int osp_precreate_ready_condition(const struct lu_env *env,
+ struct osp_device *d)
{
- __u64 next;
-
if (d->opd_pre_recovering)
return 0;
/* ready if got enough precreated objects */
/* we need to wait for others (opd_pre_reserved) and our object (+1) */
- next = d->opd_pre_used_id + d->opd_pre_reserved + 1;
- if (next <= d->opd_pre_last_created)
+ if (d->opd_pre_reserved + 1 < osp_objs_precreated(env, d))
return 1;
/* ready if OST reported no space and no destoys in progress */
{
struct osp_device *d = data;
- LCONSOLE_WARN("%s: slow creates, last="LPU64", next="LPU64", "
+ LCONSOLE_WARN("%s: slow creates, last="DFID", next="DFID", "
"reserved="LPU64", syn_changes=%lu, "
"syn_rpc_in_progress=%d, status=%d\n",
- d->opd_obd->obd_name, d->opd_pre_last_created,
- d->opd_pre_used_id, d->opd_pre_reserved,
+ d->opd_obd->obd_name, PFID(&d->opd_pre_last_created_fid),
+ PFID(&d->opd_pre_used_fid), d->opd_pre_reserved,
d->opd_syn_changes, d->opd_syn_rpc_in_progress,
d->opd_pre_status);
ENTRY;
- LASSERT(d->opd_pre_last_created >= d->opd_pre_used_id);
+ LASSERTF(osp_objs_precreated(env, d) >= 0, "Last created FID "DFID
+ "Next FID "DFID"\n", PFID(&d->opd_pre_last_created_fid),
+ PFID(&d->opd_pre_used_fid));
/*
* wait till:
* - preallocation is done
* - no free space expected soon
* - can't connect to OST for too long (obd_timeout)
+ * - OST can allocate fid sequence.
*/
while ((rc = d->opd_pre_status) == 0 || rc == -ENOSPC ||
- rc == -ENODEV) {
+ rc == -ENODEV || rc == -EAGAIN) {
#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 3, 90, 0)
/*
/*
* increase number of precreations
*/
+ precreated = osp_objs_precreated(env, d);
if (d->opd_pre_grow_count < d->opd_pre_max_grow_count &&
d->opd_pre_grow_slow == 0 &&
- (d->opd_pre_last_created - d->opd_pre_used_id <=
- d->opd_pre_grow_count / 4 + 1)) {
+ precreated <= (d->opd_pre_grow_count / 4 + 1)) {
spin_lock(&d->opd_pre_lock);
d->opd_pre_grow_slow = 1;
d->opd_pre_grow_count *= 2;
}
spin_lock(&d->opd_pre_lock);
- precreated = d->opd_pre_last_created - d->opd_pre_used_id;
+ precreated = osp_objs_precreated(env, d);
if (precreated > d->opd_pre_reserved &&
!d->opd_pre_recovering) {
d->opd_pre_reserved++;
rc = 0;
/* XXX: don't wake up if precreation is in progress */
- if (osp_precreate_near_empty_nolock(d))
+ if (osp_precreate_near_empty_nolock(env, d))
cfs_waitq_signal(&d->opd_pre_waitq);
break;
break;
l_wait_event(d->opd_pre_user_waitq,
- osp_precreate_ready_condition(d), &lwi);
+ osp_precreate_ready_condition(env, d), &lwi);
}
RETURN(rc);
/*
* this function relies on reservation made before
*/
-__u64 osp_precreate_get_id(struct osp_device *d)
+int osp_precreate_get_fid(const struct lu_env *env, struct osp_device *d,
+ struct lu_fid *fid)
{
- obd_id objid;
-
/* grab next id from the pool */
spin_lock(&d->opd_pre_lock);
- LASSERT(d->opd_pre_used_id < d->opd_pre_last_created);
- objid = ++d->opd_pre_used_id;
+
+ LASSERTF(lu_fid_diff(&d->opd_pre_used_fid,
+ &d->opd_pre_last_created_fid) < 0,
+ "next fid "DFID" last created fid "DFID"\n",
+ PFID(&d->opd_pre_used_fid),
+ PFID(&d->opd_pre_last_created_fid));
+
+ d->opd_pre_used_fid.f_oid++;
+ memcpy(fid, &d->opd_pre_used_fid, sizeof(*fid));
d->opd_pre_reserved--;
/*
* last_used_id must be changed along with getting new id otherwise
* we might miscalculate gap causing object loss or leak
*/
- osp_update_last_id(d, objid);
+ osp_update_last_fid(d, fid);
spin_unlock(&d->opd_pre_lock);
/*
if (unlikely(d->opd_pre_reserved == 0 && d->opd_pre_status))
cfs_waitq_signal(&d->opd_pre_waitq);
- return objid;
+ return 0;
}
/*
/* initially precreation isn't ready */
d->opd_pre_status = -EAGAIN;
- d->opd_pre_used_id = 0;
- d->opd_pre_last_created = 0;
+ fid_zero(&d->opd_pre_used_fid);
+ d->opd_pre_used_fid.f_oid = 1;
+ fid_zero(&d->opd_pre_last_created_fid);
+ d->opd_pre_last_created_fid.f_oid = 1;
d->opd_pre_reserved = 0;
d->opd_got_disconnected = 1;
d->opd_pre_grow_slow = 0;
#include <lustre_dlm.h>
#include <lustre_export.h>
#include <lustre_debug.h>
+#include <lustre_fid.h>
+#include <lustre_fld.h>
#include <linux/init.h>
#include <lprocfs_status.h>
#include <libcfs/list.h>
if (ioobj)
ioobj->ioo_seq = FID_SEQ_OST_MDT0;
/* remove fid_seq_is_rsvd() after FID-on-OST allows SEQ > 9 */
- } else if (oa == NULL || !(fid_seq_is_rsvd(oa->o_seq) ||
- fid_seq_is_mdt0(oa->o_seq))) {
+ } else if (oa == NULL ||
+ !(fid_seq_is_norm(oa->o_seq) || fid_seq_is_mdt(oa->o_seq) ||
+ fid_seq_is_echo(oa->o_seq))) {
CERROR("%s: client %s sent invalid object "POSTID"\n",
exp->exp_obd->obd_name, obd_export_nid2str(exp),
oa ? oa->o_id : -1, oa ? oa->o_seq : -1);
if (reply == NULL)
RETURN(-ENOMEM);
+ if (KEY_IS(KEY_LAST_FID)) {
+ void *val;
+ int vallen;
+
+ req_capsule_extend(pill, &RQF_OST_GET_INFO_LAST_FID);
+ val = req_capsule_client_get(pill, &RMF_SETINFO_VAL);
+ vallen = req_capsule_get_size(pill, &RMF_SETINFO_VAL,
+ RCL_CLIENT);
+ if (val != NULL && vallen > 0 && replylen >= vallen) {
+ memcpy(reply, val, vallen);
+ } else {
+ CERROR("%s: invalid req val %p vallen %d replylen %d\n",
+ exp->exp_obd->obd_name, val, vallen, replylen);
+ GOTO(out, rc = -EINVAL);
+ }
+ }
+
/* call again to fill in the reply buffer */
rc = obd_get_info(req->rq_svc_thread->t_env, exp, keylen, key,
&replylen, reply, NULL);
-
+out:
lustre_msg_set_status(req->rq_repmsg, 0);
RETURN(rc);
}
&RMF_OBD_ID
};
+static const struct req_msg_field *ost_get_last_fid_server[] = {
+ &RMF_PTLRPC_BODY,
+ &RMF_FID
+};
+
static const struct req_msg_field *ost_get_fiemap_client[] = {
&RMF_PTLRPC_BODY,
&RMF_FIEMAP_KEY,
&RQF_OST_SET_GRANT_INFO,
&RQF_OST_GET_INFO_GENERIC,
&RQF_OST_GET_INFO_LAST_ID,
+ &RQF_OST_GET_INFO_LAST_FID,
+ &RQF_OST_SET_INFO_LAST_FID,
&RQF_OST_GET_INFO_FIEMAP,
&RQF_LDLM_ENQUEUE,
&RQF_LDLM_ENQUEUE_LVB,
sizeof(obd_id), lustre_swab_ost_last_id, NULL);
EXPORT_SYMBOL(RMF_OBD_ID);
+struct req_msg_field RMF_FID =
+ DEFINE_MSGF("fid", 0,
+ sizeof(struct lu_fid), lustre_swab_lu_fid, NULL);
+EXPORT_SYMBOL(RMF_FID);
+
struct req_msg_field RMF_FIEMAP_KEY =
DEFINE_MSGF("fiemap", 0, sizeof(struct ll_fiemap_info_key),
lustre_swab_fiemap, NULL);
ost_get_last_id_server);
EXPORT_SYMBOL(RQF_OST_GET_INFO_LAST_ID);
+struct req_format RQF_OST_GET_INFO_LAST_FID =
+ DEFINE_REQ_FMT0("OST_GET_INFO_LAST_FID", obd_set_info_client,
+ ost_get_last_fid_server);
+EXPORT_SYMBOL(RQF_OST_GET_INFO_LAST_FID);
+
+struct req_format RQF_OST_SET_INFO_LAST_FID =
+ DEFINE_REQ_FMT0("OST_SET_INFO_LAST_FID", obd_set_info_client,
+ empty);
+EXPORT_SYMBOL(RQF_OST_SET_INFO_LAST_FID);
+
struct req_format RQF_OST_GET_INFO_FIEMAP =
DEFINE_REQ_FMT0("OST_GET_INFO_FIEMAP", ost_get_fiemap_client,
ost_get_fiemap_server);
# { error "mounting $dev as $FSTYPE failed"; return 3; }
#local obj_file=$(do_facet ost$ost find $dir/O/$seq -name $oid)
#local ff=$(do_facet ost$ost $LL_DECODE_FILTER_FID $obj_file)
-
- local obj_file="O/$seq/d$((oid %32))/$oid"
+ seq=$(echo $seq | sed -e "s/^0x//g")
+ if [ $seq == 0 ]; then
+ oid_hex=$(echo $oid)
+ else
+ oid_hex=$(echo $hex | sed -e "s/^0x//g")
+ fi
+ local obj_file="O/$seq/d$((oid %32))/$oid_hex"
local ff=$(do_facet ost$ost "$DEBUGFS -c -R 'stat $obj_file' \
$dev 2>/dev/null" | grep "parent=")
if (!is_dir && (header & VERBOSE_OBJID)) {
if (obdstripe == 1)
llapi_printf(LLAPI_MSG_NORMAL,
- "\tobdidx\t\t objid\t\tobjid\t\t group\n");
+ "\tobdidx\t\t objid\t\t objid\t\t group\n");
for (i = 0; i < lum->lmm_stripe_count; i++) {
int idx = objects[i].l_ost_idx;
long long oid = objects[i].l_object_id;
long long gr = objects[i].l_object_seq;
- if ((obdindex == OBD_NOT_FOUND) || (obdindex == idx))
- llapi_printf(LLAPI_MSG_NORMAL,
- "\t%6u\t%14llu\t%#13llx\t%14llu%s\n",
- idx, oid, oid, gr,
- obdindex == idx ? " *" : "");
+ if ((obdindex == OBD_NOT_FOUND) || (obdindex == idx)) {
+ char fmt[48];
+ sprintf(fmt, "%s%s%s\n",
+ "\t%6u\t%14llu\t%#13llx\t",
+ (fid_seq_is_rsvd(gr) ||
+ fid_seq_is_mdt0(gr)) ?
+ "%14llu" : "%#14llx", "%s");
+ llapi_printf(LLAPI_MSG_NORMAL, fmt, idx, oid,
+ oid, gr,
+ obdindex == idx ? " *" : "");
+ }
+
}
llapi_printf(LLAPI_MSG_NORMAL, "\n");
}
#define lustre_swab_gl_desc NULL
#define lustre_swab_mgs_config_body NULL
#define lustre_swab_mgs_config_res NULL
+#define lustre_swab_lu_fid NULL
#define dump_rniobuf NULL
#define dump_ioo NULL
#define dump_obdo NULL