*
* Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
* Author: Mikhail Pershin <mike.pershin@intel.com>
+ * Author: Di Wang <di.wang@intel.com>
*/
#ifndef EXPORT_SYMTAB
return !!(d->opd_pre_thread.t_flags & SVC_STOPPED);
}
-static inline int osp_precreate_near_empty_nolock(struct osp_device *d)
+static inline int osp_objs_precreated(const struct lu_env *env,
+ struct osp_device *osp)
{
- int window = d->opd_pre_last_created - d->opd_pre_used_id;
+ struct lu_fid *fid1 = &osp->opd_pre_last_created_fid;
+ struct lu_fid *fid2 = &osp->opd_pre_used_fid;
+
+ LASSERTF(fid_seq(fid1) == fid_seq(fid2),
+ "Created fid"DFID" Next fid "DFID"\n", PFID(fid1), PFID(fid2));
+
+ if (fid_is_idif(fid1)) {
+ struct ost_id *oi1 = &osp_env_info(env)->osi_oi;
+ struct ost_id *oi2 = &osp_env_info(env)->osi_oi2;
+
+ LASSERT(fid_is_idif(fid1) && fid_is_idif(fid2));
+ ostid_idif_pack(fid1, oi1);
+ ostid_idif_pack(fid2, oi2);
+ LASSERT(oi1->oi_id >= oi2->oi_id);
+
+ return oi1->oi_id - oi2->oi_id;
+ }
+
+ return fid_oid(fid1) - fid_oid(fid2);
+}
+
+static inline int osp_precreate_near_empty_nolock(const struct lu_env *env,
+ struct osp_device *d)
+{
+ int window = osp_objs_precreated(env, d);
/* don't consider new precreation till OST is healty and
* has free space */
(d->opd_pre_status == 0));
}
-static inline int osp_precreate_near_empty(struct osp_device *d)
+static inline int osp_precreate_near_empty(const struct lu_env *env,
+ struct osp_device *d)
{
int rc;
/* XXX: do we really need locking here? */
spin_lock(&d->opd_pre_lock);
- rc = osp_precreate_near_empty_nolock(d);
+ rc = osp_precreate_near_empty_nolock(env, d);
spin_unlock(&d->opd_pre_lock);
return rc;
}
-static int osp_precreate_send(struct osp_device *d)
+static inline int osp_create_end_seq(const struct lu_env *env,
+ struct osp_device *osp)
+{
+ struct lu_fid *fid = &osp->opd_pre_used_fid;
+ int rc;
+
+ spin_lock(&osp->opd_pre_lock);
+ rc = osp_fid_end_seq(env, fid);
+ spin_unlock(&osp->opd_pre_lock);
+ return rc;
+}
+
+/**
+ * Write fid into last_oid/last_seq file.
+ **/
+int osp_write_last_oid_seq_files(struct lu_env *env, struct osp_device *osp,
+ struct lu_fid *fid, int sync)
+{
+ struct osp_thread_info *oti = osp_env_info(env);
+ struct lu_buf *lb_oid = &oti->osi_lb;
+ struct lu_buf *lb_oseq = &oti->osi_lb2;
+ loff_t oid_off;
+ loff_t oseq_off;
+ struct thandle *th;
+ int rc;
+ ENTRY;
+
+ /* Note: through f_oid is only 32bits, it will also write
+ * 64 bits for oid to keep compatiblity with the previous
+ * version. */
+ lb_oid->lb_buf = &fid->f_oid;
+ lb_oid->lb_len = sizeof(obd_id);
+ oid_off = sizeof(obd_id) * osp->opd_index;
+
+ lb_oseq->lb_buf = &fid->f_seq;
+ lb_oseq->lb_len = sizeof(obd_id);
+ oseq_off = sizeof(obd_id) * osp->opd_index;
+
+ th = dt_trans_create(env, osp->opd_storage);
+ if (IS_ERR(th))
+ RETURN(PTR_ERR(th));
+
+ th->th_sync |= sync;
+ rc = dt_declare_record_write(env, osp->opd_last_used_oid_file,
+ lb_oid->lb_len, oid_off, th);
+ if (rc != 0)
+ GOTO(out, rc);
+
+ rc = dt_declare_record_write(env, osp->opd_last_used_seq_file,
+ lb_oseq->lb_len, oseq_off, th);
+ if (rc != 0)
+ GOTO(out, rc);
+
+ rc = dt_trans_start_local(env, osp->opd_storage, th);
+ if (rc != 0)
+ GOTO(out, rc);
+
+ rc = dt_record_write(env, osp->opd_last_used_oid_file, lb_oid,
+ &oid_off, th);
+ if (rc != 0) {
+ CERROR("%s: can not write to last seq file: rc = %d\n",
+ osp->opd_obd->obd_name, rc);
+ GOTO(out, rc);
+ }
+ rc = dt_record_write(env, osp->opd_last_used_seq_file, lb_oseq,
+ &oseq_off, th);
+ if (rc) {
+ CERROR("%s: can not write to last seq file: rc = %d\n",
+ osp->opd_obd->obd_name, rc);
+ GOTO(out, rc);
+ }
+out:
+ dt_trans_stop(env, osp->opd_storage, th);
+ RETURN(rc);
+}
+
+int osp_precreate_rollover_new_seq(struct lu_env *env, struct osp_device *osp)
+{
+ struct lu_fid *fid = &osp_env_info(env)->osi_fid;
+ struct lu_fid *last_fid = &osp->opd_last_used_fid;
+ int rc;
+ ENTRY;
+
+ rc = seq_client_get_seq(env, osp->opd_obd->u.cli.cl_seq, &fid->f_seq);
+ if (rc != 0) {
+ CERROR("%s: alloc fid error: rc = %d\n",
+ osp->opd_obd->obd_name, rc);
+ RETURN(rc);
+ }
+
+ fid->f_oid = 1;
+ fid->f_ver = 0;
+ LASSERTF(fid_seq(fid) != fid_seq(last_fid),
+ "fid "DFID", last_fid "DFID"\n", PFID(fid),
+ PFID(last_fid));
+
+ rc = osp_write_last_oid_seq_files(env, osp, fid, 1);
+ if (rc != 0) {
+ CERROR("%s: Can not update oid/seq file: rc = %d\n",
+ osp->opd_obd->obd_name, rc);
+ RETURN(rc);
+ }
+
+ LCONSOLE_INFO("%s: update sequence from "LPX64" to "LPX64"\n",
+ osp->opd_obd->obd_name, fid_seq(last_fid),
+ fid_seq(fid));
+ /* Update last_xxx to the new seq */
+ spin_lock(&osp->opd_pre_lock);
+ osp->opd_last_used_fid = *fid;
+ osp->opd_gap_start_fid = *fid;
+ osp->opd_pre_used_fid = *fid;
+ osp->opd_pre_last_created_fid = *fid;
+ spin_unlock(&osp->opd_pre_lock);
+
+ RETURN(rc);
+}
+
+/**
+ * alloc fids for precreation.
+ * rc = 0 Success, @grow is the count of real allocation.
+ * rc = 1 Current seq is used up.
+ * rc < 0 Other error.
+ **/
+static int osp_precreate_fids(const struct lu_env *env, struct osp_device *osp,
+ struct lu_fid *fid, int *grow)
+{
+ struct osp_thread_info *osi = osp_env_info(env);
+ __u64 end;
+ int i = 0;
+
+ if (fid_is_idif(fid)) {
+ struct lu_fid *last_fid;
+ struct ost_id *oi = &osi->osi_oi;
+
+ spin_lock(&osp->opd_pre_lock);
+ last_fid = &osp->opd_pre_last_created_fid;
+ ostid_idif_pack(last_fid, oi);
+ end = min(oi->oi_id + *grow, IDIF_MAX_OID);
+ *grow = end - oi->oi_id;
+ oi->oi_id += *grow;
+ spin_unlock(&osp->opd_pre_lock);
+
+ if (*grow == 0)
+ return 1;
+
+ ostid_idif_unpack(oi, fid, osp->opd_index);
+ return 0;
+ }
+
+ spin_lock(&osp->opd_pre_lock);
+ *fid = osp->opd_pre_last_created_fid;
+ end = fid->f_oid;
+ end = min((end + *grow), (__u64)LUSTRE_DATA_SEQ_MAX_WIDTH);
+ *grow = end - fid->f_oid;
+ fid->f_oid += end - fid->f_oid;
+ spin_unlock(&osp->opd_pre_lock);
+
+ CDEBUG(D_INFO, "Expect %d, actual %d ["DFID" -- "DFID"]\n",
+ *grow, i, PFID(fid), PFID(&osp->opd_pre_last_created_fid));
+
+ return *grow > 0 ? 0 : 1;
+}
+
+static int osp_precreate_send(const struct lu_env *env, struct osp_device *d)
{
+ struct osp_thread_info *oti = osp_env_info(env);
struct ptlrpc_request *req;
struct obd_import *imp;
struct ost_body *body;
int rc, grow, diff;
-
+ struct lu_fid *fid = &oti->osi_fid;
ENTRY;
/* don't precreate new objects till OST healthy and has free space */
body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
LASSERT(body);
- body->oa.o_id = d->opd_pre_last_created + grow;
- body->oa.o_seq = FID_SEQ_OST_MDT0; /* XXX: support for CMD? */
+
+ rc = osp_precreate_fids(env, d, fid, &grow);
+ if (rc == 1) {
+ /* Current seq has been used up*/
+ if (!osp_is_fid_client(d)) {
+ osp_pre_update_status(d, -ENOSPC);
+ rc = -ENOSPC;
+ }
+ cfs_waitq_signal(&d->opd_pre_waitq);
+ GOTO(out_req, rc);
+ }
+
+ if (!osp_is_fid_client(d)) {
+ /* Non-FID client will always send seq 0 because of
+ * compatiblity */
+ LASSERTF(fid_is_idif(fid), "Invalid fid "DFID"\n", PFID(fid));
+ fid->f_seq = 0;
+ }
+
+ ostid_fid_pack(fid, &body->oa.o_oi);
body->oa.o_valid = OBD_MD_FLGROUP;
ptlrpc_request_set_replen(req);
rc = ptlrpc_queue_wait(req);
if (rc) {
- CERROR("%s: can't precreate: rc = %d\n",
- d->opd_obd->obd_name, rc);
+ CERROR("%s: can't precreate: rc = %d\n", d->opd_obd->obd_name,
+ rc);
GOTO(out_req, rc);
}
LASSERT(req->rq_transno == 0);
if (body == NULL)
GOTO(out_req, rc = -EPROTO);
- CDEBUG(D_HA, "%s: new last_created "LPU64"\n", d->opd_obd->obd_name,
- body->oa.o_id);
- LASSERT(body->oa.o_id > d->opd_pre_used_id);
+ fid_ostid_unpack(fid, &body->oa.o_oi, d->opd_index);
+ LASSERTF(lu_fid_diff(fid, &d->opd_pre_used_fid) > 0,
+ "reply fid "DFID" pre used fid "DFID"\n", PFID(fid),
+ PFID(&d->opd_pre_used_fid));
+
+ CDEBUG(D_HA, "%s: new last_created "DFID"\n", d->opd_obd->obd_name,
+ PFID(fid));
- diff = body->oa.o_id - d->opd_pre_last_created;
+ diff = lu_fid_diff(fid, &d->opd_pre_last_created_fid);
spin_lock(&d->opd_pre_lock);
if (diff < grow) {
* next time if needed */
d->opd_pre_grow_slow = 0;
}
- d->opd_pre_last_created = body->oa.o_id;
+
+ d->opd_pre_last_created_fid = *fid;
spin_unlock(&d->opd_pre_lock);
- CDEBUG(D_OTHER, "current precreated pool: %llu-%llu\n",
- d->opd_pre_used_id, d->opd_pre_last_created);
+ CDEBUG(D_OTHER, "current precreated pool: "DFID"-"DFID"\n",
+ PFID(&d->opd_pre_used_fid), PFID(&d->opd_pre_last_created_fid));
out_req:
/* now we can wakeup all users awaiting for objects */
osp_pre_update_status(d, rc);
RETURN(rc);
}
-
-static int osp_get_lastid_from_ost(struct osp_device *d)
+static int osp_get_lastfid_from_ost(struct osp_device *d)
{
- struct ptlrpc_request *req;
+ struct ptlrpc_request *req = NULL;
struct obd_import *imp;
- obd_id *reply;
+ struct lu_fid *last_fid = &d->opd_last_used_fid;
char *tmp;
- int rc;
+ int rc;
+ ENTRY;
imp = d->opd_obd->u.cli.cl_import;
LASSERT(imp);
- req = ptlrpc_request_alloc(imp, &RQF_OST_GET_INFO_LAST_ID);
+ req = ptlrpc_request_alloc(imp, &RQF_OST_GET_INFO_LAST_FID);
if (req == NULL)
RETURN(-ENOMEM);
req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
- RCL_CLIENT, sizeof(KEY_LAST_ID));
+ RCL_CLIENT, sizeof(KEY_LAST_FID));
+
+ req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
+ RCL_CLIENT, sizeof(*last_fid));
+
rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
if (rc) {
ptlrpc_request_free(req);
}
tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
- memcpy(tmp, KEY_LAST_ID, sizeof(KEY_LAST_ID));
+ memcpy(tmp, KEY_LAST_FID, sizeof(KEY_LAST_FID));
req->rq_no_delay = req->rq_no_resend = 1;
+ fid_cpu_to_le(last_fid, last_fid);
+ tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
+ memcpy(tmp, last_fid, sizeof(*last_fid));
ptlrpc_request_set_replen(req);
+
rc = ptlrpc_queue_wait(req);
if (rc) {
/* bad-bad OST.. let sysadm sort this out */
+ if (rc == -ENOTSUPP) {
+ CERROR("%s: server does not support FID: rc = %d\n",
+ d->opd_obd->obd_name, -ENOTSUPP);
+ }
ptlrpc_set_import_active(imp, 0);
GOTO(out, rc);
}
- reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
- if (reply == NULL)
+ last_fid = req_capsule_server_get(&req->rq_pill, &RMF_FID);
+ if (last_fid == NULL || !fid_is_sane(last_fid)) {
+ CERROR("%s: Got insane last_fid "DFID"\n",
+ d->opd_obd->obd_name, PFID(last_fid));
GOTO(out, rc = -EPROTO);
+ }
+
+ /* Only update the last used fid, if the OST has objects for
+ * this sequence, i.e. fid_oid > 0 */
+ if (fid_oid(last_fid) > 0)
+ d->opd_last_used_fid = *last_fid;
- d->opd_last_used_id = *reply;
- CDEBUG(D_HA, "%s: got last_id "LPU64" from OST\n",
- d->opd_obd->obd_name, d->opd_last_used_id);
+ CDEBUG(D_HA, "%s: Got insane last_fid "DFID"\n", d->opd_obd->obd_name,
+ PFID(last_fid));
out:
ptlrpc_req_finished(req);
RETURN(rc);
-
}
/**
* asks OST to clean precreate orphans
* and gets next id for new objects
*/
-static int osp_precreate_cleanup_orphans(struct osp_device *d)
+static int osp_precreate_cleanup_orphans(struct lu_env *env,
+ struct osp_device *d)
{
+ struct osp_thread_info *osi = osp_env_info(env);
+ struct lu_fid *last_fid = &osi->osi_fid;
struct ptlrpc_request *req = NULL;
struct obd_import *imp;
struct ost_body *body;
struct l_wait_info lwi = { 0 };
int update_status = 0;
int rc;
+ int diff;
ENTRY;
if (!osp_precreate_running(d) || d->opd_got_disconnected)
GOTO(out, rc = -EAGAIN);
- CDEBUG(D_HA, "%s: going to cleanup orphans since "LPU64"\n",
- d->opd_obd->obd_name, d->opd_last_used_id);
+ CDEBUG(D_HA, "%s: going to cleanup orphans since "DFID"\n",
+ d->opd_obd->obd_name, PFID(&d->opd_last_used_fid));
- if (d->opd_last_used_id < 2) {
- /* lastid looks strange... ask OST */
- rc = osp_get_lastid_from_ost(d);
+ *last_fid = d->opd_last_used_fid;
+ /* The OSP should already get the valid seq now */
+ LASSERT(!fid_is_zero(last_fid));
+ if (fid_oid(&d->opd_last_used_fid) < 2) {
+ /* lastfid looks strange... ask OST */
+ rc = osp_get_lastfid_from_ost(d);
if (rc)
GOTO(out, rc);
}
rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
if (rc) {
ptlrpc_request_free(req);
+ req = NULL;
GOTO(out, rc);
}
body->oa.o_flags = OBD_FL_DELORPHAN;
body->oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
- body->oa.o_seq = FID_SEQ_OST_MDT0;
-
- body->oa.o_id = d->opd_last_used_id;
+ if (osp_is_fid_client(d))
+ body->oa.o_seq = fid_seq(&d->opd_last_used_fid);
+ else
+ body->oa.o_seq = 0;
+ /* remove from NEXT after used one */
+ body->oa.o_id = fid_oid(&d->opd_last_used_fid);
ptlrpc_request_set_replen(req);
/*
* OST provides us with id new pool starts from in body->oa.o_id
*/
+ fid_ostid_unpack(last_fid, &body->oa.o_oi, d->opd_index);
+ CDEBUG(D_INFO, "%s: last_fid "DFID" server last fid "DFID"\n",
+ d->opd_obd->obd_name, PFID(&d->opd_last_used_fid),
+ PFID(last_fid));
+
spin_lock(&d->opd_pre_lock);
- if (le64_to_cpu(d->opd_last_used_id) > body->oa.o_id) {
- d->opd_pre_grow_count = OST_MIN_PRECREATE +
- le64_to_cpu(d->opd_last_used_id) -
- body->oa.o_id;
- d->opd_pre_last_created = le64_to_cpu(d->opd_last_used_id);
+ diff = lu_fid_diff(&d->opd_last_used_fid, last_fid);
+ if (diff > 0) {
+ d->opd_pre_grow_count = OST_MIN_PRECREATE + diff;
+ d->opd_pre_last_created_fid = d->opd_last_used_fid;
} else {
d->opd_pre_grow_count = OST_MIN_PRECREATE;
- d->opd_pre_last_created = body->oa.o_id;
+ d->opd_pre_last_created_fid = *last_fid;
}
/*
* This empties the pre-creation pool and effectively blocks any new
* reservations.
*/
- d->opd_pre_used_id = d->opd_pre_last_created;
+ LASSERT(fid_oid(&d->opd_pre_last_created_fid) <=
+ LUSTRE_DATA_SEQ_MAX_WIDTH);
+ d->opd_pre_used_fid = d->opd_pre_last_created_fid;
d->opd_pre_grow_slow = 0;
spin_unlock(&d->opd_pre_lock);
- CDEBUG(D_HA, "%s: Got last_id "LPU64" from OST, last_used is "LPU64
- ", pre_used "LPU64"\n", d->opd_obd->obd_name, body->oa.o_id,
- le64_to_cpu(d->opd_last_used_id), d->opd_pre_used_id);
-
+ CDEBUG(D_HA, "%s: Got last_id "DFID" from OST, last_created "DFID
+ "last_used is "DFID"\n", d->opd_obd->obd_name, PFID(last_fid),
+ PFID(&d->opd_pre_last_created_fid), PFID(&d->opd_last_used_fid));
out:
if (req)
ptlrpc_req_finished(req);
cfs_waitq_signal(&d->opd_pre_user_waitq);
}
+static int osp_init_pre_fid(struct osp_device *osp)
+{
+ struct lu_env env;
+ struct osp_thread_info *osi;
+ struct lu_client_seq *cli_seq;
+ struct lu_fid *last_fid;
+ int rc;
+ ENTRY;
+
+ /* Return if last_used fid has been initialized */
+ if (!fid_is_zero(&osp->opd_last_used_fid))
+ RETURN(0);
+
+ rc = lu_env_init(&env, osp->opd_dt_dev.dd_lu_dev.ld_type->ldt_ctx_tags);
+ if (rc) {
+ CERROR("%s: init env error: rc = %d\n",
+ osp->opd_obd->obd_name, rc);
+ RETURN(rc);
+ }
+
+ osi = osp_env_info(&env);
+ last_fid = &osi->osi_fid;
+ fid_zero(last_fid);
+ /* For a freshed fs, it will allocate a new sequence first */
+ if (osp_is_fid_client(osp)) {
+ cli_seq = osp->opd_obd->u.cli.cl_seq;
+ rc = seq_client_get_seq(&env, cli_seq, &last_fid->f_seq);
+ if (rc != 0) {
+ CERROR("%s: alloc fid error: rc = %d\n",
+ osp->opd_obd->obd_name, rc);
+ GOTO(out, rc);
+ }
+ } else {
+ last_fid->f_seq = fid_idif_seq(1, osp->opd_index);
+ }
+ last_fid->f_oid = 1;
+ last_fid->f_ver = 0;
+
+ spin_lock(&osp->opd_pre_lock);
+ osp->opd_last_used_fid = *last_fid;
+ osp->opd_pre_used_fid = *last_fid;
+ osp->opd_pre_last_created_fid = *last_fid;
+ spin_unlock(&osp->opd_pre_lock);
+ rc = osp_write_last_oid_seq_files(&env, osp, last_fid, 1);
+ if (rc != 0) {
+ CERROR("%s: write fid error: rc = %d\n",
+ osp->opd_obd->obd_name, rc);
+ GOTO(out, rc);
+ }
+out:
+ lu_env_fini(&env);
+ RETURN(rc);
+}
+
static int osp_precreate_thread(void *_arg)
{
struct osp_device *d = _arg;
struct ptlrpc_thread *thread = &d->opd_pre_thread;
struct l_wait_info lwi = { 0 };
char pname[16];
+ struct lu_env env;
int rc;
ENTRY;
- sprintf(pname, "osp-pre-%u\n", d->opd_index);
+ sprintf(pname, "osp-pre-%u", d->opd_index);
cfs_daemonize(pname);
+ rc = lu_env_init(&env, d->opd_dt_dev.dd_lu_dev.ld_type->ldt_ctx_tags);
+ if (rc) {
+ CERROR("%s: init env error: rc = %d\n", d->opd_obd->obd_name,
+ rc);
+ RETURN(rc);
+ }
+
spin_lock(&d->opd_pre_lock);
thread->t_flags = SVC_RUNNING;
spin_unlock(&d->opd_pre_lock);
while (osp_precreate_running(d)) {
l_wait_event(d->opd_pre_waitq,
!osp_precreate_running(d) ||
- d->opd_new_connection, &lwi);
+ d->opd_new_connection,
+ &lwi);
if (!osp_precreate_running(d))
break;
if (!d->opd_new_connection)
continue;
- /* got connected */
d->opd_new_connection = 0;
d->opd_got_disconnected = 0;
break;
}
+ if (d->opd_obd->u.cli.cl_seq->lcs_exp == NULL) {
+ /* Get new sequence for client first */
+ LASSERT(d->opd_exp != NULL);
+ d->opd_obd->u.cli.cl_seq->lcs_exp =
+ class_export_get(d->opd_exp);
+ rc = osp_init_pre_fid(d);
+ if (rc != 0) {
+ class_export_put(d->opd_exp);
+ d->opd_obd->u.cli.cl_seq->lcs_exp = NULL;
+ CERROR("%s: init pre fid error: rc = %d\n",
+ d->opd_obd->obd_name, rc);
+ continue;
+ }
+ }
+
osp_statfs_update(d);
/*
* Clean up orphans or recreate missing objects.
*/
- rc = osp_precreate_cleanup_orphans(d);
+ rc = osp_precreate_cleanup_orphans(&env, d);
if (rc != 0)
continue;
-
/*
* connected, can handle precreates now
*/
while (osp_precreate_running(d)) {
l_wait_event(d->opd_pre_waitq,
!osp_precreate_running(d) ||
- osp_precreate_near_empty(d) ||
+ osp_precreate_near_empty(&env, d) ||
osp_statfs_need_update(d) ||
d->opd_got_disconnected, &lwi);
if (osp_statfs_need_update(d))
osp_statfs_update(d);
- if (osp_precreate_near_empty(d)) {
- rc = osp_precreate_send(d);
+ /* To avoid handling different seq in precreate/orphan
+ * cleanup, it will hold precreate until current seq is
+ * used up. */
+ if (unlikely(osp_precreate_end_seq(&env, d) &&
+ !osp_create_end_seq(&env, d)))
+ continue;
+
+ if (unlikely(osp_precreate_end_seq(&env, d) &&
+ osp_create_end_seq(&env, d))) {
+ LCONSOLE_INFO("%s:"LPX64" is used up."
+ " Update to new seq\n",
+ d->opd_obd->obd_name,
+ fid_seq(&d->opd_pre_last_created_fid));
+ rc = osp_precreate_rollover_new_seq(&env, d);
+ if (rc)
+ continue;
+ }
+
+ if (osp_precreate_near_empty(&env, d)) {
+ rc = osp_precreate_send(&env, d);
/* osp_precreate_send() sets opd_pre_status
* in case of error, that prevent the using of
* failed device. */
- if (rc != 0 && rc != -ENOSPC &&
+ if (rc < 0 && rc != -ENOSPC &&
rc != -ETIMEDOUT && rc != -ENOTCONN)
CERROR("%s: cannot precreate objects:"
" rc = %d\n",
}
thread->t_flags = SVC_STOPPED;
+ lu_env_fini(&env);
cfs_waitq_signal(&thread->t_ctl_waitq);
RETURN(0);
}
-static int osp_precreate_ready_condition(struct osp_device *d)
+static int osp_precreate_ready_condition(const struct lu_env *env,
+ struct osp_device *d)
{
- __u64 next;
-
if (d->opd_pre_recovering)
return 0;
/* ready if got enough precreated objects */
/* we need to wait for others (opd_pre_reserved) and our object (+1) */
- next = d->opd_pre_used_id + d->opd_pre_reserved + 1;
- if (next <= d->opd_pre_last_created)
+ if (d->opd_pre_reserved + 1 < osp_objs_precreated(env, d))
return 1;
/* ready if OST reported no space and no destoys in progress */
{
struct osp_device *d = data;
- LCONSOLE_WARN("%s: slow creates, last="LPU64", next="LPU64", "
+ LCONSOLE_WARN("%s: slow creates, last="DFID", next="DFID", "
"reserved="LPU64", syn_changes=%lu, "
"syn_rpc_in_progress=%d, status=%d\n",
- d->opd_obd->obd_name, d->opd_pre_last_created,
- d->opd_pre_used_id, d->opd_pre_reserved,
+ d->opd_obd->obd_name, PFID(&d->opd_pre_last_created_fid),
+ PFID(&d->opd_pre_used_fid), d->opd_pre_reserved,
d->opd_syn_changes, d->opd_syn_rpc_in_progress,
d->opd_pre_status);
ENTRY;
- LASSERT(d->opd_pre_last_created >= d->opd_pre_used_id);
+ LASSERTF(osp_objs_precreated(env, d) >= 0, "Last created FID "DFID
+ "Next FID "DFID"\n", PFID(&d->opd_pre_last_created_fid),
+ PFID(&d->opd_pre_used_fid));
/*
* wait till:
* - preallocation is done
* - no free space expected soon
* - can't connect to OST for too long (obd_timeout)
+ * - OST can allocate fid sequence.
*/
while ((rc = d->opd_pre_status) == 0 || rc == -ENOSPC ||
- rc == -ENODEV) {
+ rc == -ENODEV || rc == -EAGAIN) {
#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 3, 90, 0)
/*
/*
* increase number of precreations
*/
+ precreated = osp_objs_precreated(env, d);
if (d->opd_pre_grow_count < d->opd_pre_max_grow_count &&
d->opd_pre_grow_slow == 0 &&
- (d->opd_pre_last_created - d->opd_pre_used_id <=
- d->opd_pre_grow_count / 4 + 1)) {
+ precreated <= (d->opd_pre_grow_count / 4 + 1)) {
spin_lock(&d->opd_pre_lock);
d->opd_pre_grow_slow = 1;
d->opd_pre_grow_count *= 2;
}
spin_lock(&d->opd_pre_lock);
- precreated = d->opd_pre_last_created - d->opd_pre_used_id;
+ precreated = osp_objs_precreated(env, d);
if (precreated > d->opd_pre_reserved &&
!d->opd_pre_recovering) {
d->opd_pre_reserved++;
rc = 0;
/* XXX: don't wake up if precreation is in progress */
- if (osp_precreate_near_empty_nolock(d))
+ if (osp_precreate_near_empty_nolock(env, d) &&
+ !osp_precreate_end_seq_nolock(env, d))
cfs_waitq_signal(&d->opd_pre_waitq);
break;
break;
l_wait_event(d->opd_pre_user_waitq,
- osp_precreate_ready_condition(d), &lwi);
+ osp_precreate_ready_condition(env, d), &lwi);
}
RETURN(rc);
/*
* this function relies on reservation made before
*/
-__u64 osp_precreate_get_id(struct osp_device *d)
+int osp_precreate_get_fid(const struct lu_env *env, struct osp_device *d,
+ struct lu_fid *fid)
{
- obd_id objid;
-
/* grab next id from the pool */
spin_lock(&d->opd_pre_lock);
- LASSERT(d->opd_pre_used_id < d->opd_pre_last_created);
- objid = ++d->opd_pre_used_id;
+
+ LASSERTF(lu_fid_diff(&d->opd_pre_used_fid,
+ &d->opd_pre_last_created_fid) < 0,
+ "next fid "DFID" last created fid "DFID"\n",
+ PFID(&d->opd_pre_used_fid),
+ PFID(&d->opd_pre_last_created_fid));
+
+ d->opd_pre_used_fid.f_oid++;
+ memcpy(fid, &d->opd_pre_used_fid, sizeof(*fid));
d->opd_pre_reserved--;
/*
* last_used_id must be changed along with getting new id otherwise
* we might miscalculate gap causing object loss or leak
*/
- osp_update_last_id(d, objid);
+ osp_update_last_fid(d, fid);
spin_unlock(&d->opd_pre_lock);
/*
if (unlikely(d->opd_pre_reserved == 0 && d->opd_pre_status))
cfs_waitq_signal(&d->opd_pre_waitq);
- return objid;
+ return 0;
}
/*
/* initially precreation isn't ready */
d->opd_pre_status = -EAGAIN;
- d->opd_pre_used_id = 0;
- d->opd_pre_last_created = 0;
+ fid_zero(&d->opd_pre_used_fid);
+ d->opd_pre_used_fid.f_oid = 1;
+ fid_zero(&d->opd_pre_last_created_fid);
+ d->opd_pre_last_created_fid.f_oid = 1;
d->opd_pre_reserved = 0;
d->opd_got_disconnected = 1;
d->opd_pre_grow_slow = 0;