static inline int osp_sync_in_flight_conflict(struct osp_device *d,
struct llog_rec_hdr *h)
{
- struct osp_job_req_args *jra;
- struct ost_id ostid;
- int conflict = 0;
+ struct osp_job_req_args *jra;
+ struct ost_id ostid;
+ struct osp_job_args *ja;
+ int conflict = 0;
if (h == NULL || h->lrh_type == LLOG_GEN_REC ||
list_empty(&d->opd_sync_in_flight_list))
LASSERT(body);
if (memcmp(&ostid, &body->oa.o_oi, sizeof(ostid)) == 0) {
+ spin_unlock(&d->opd_sync_lock);
+ return 1;
+ }
+ }
+
+ list_for_each_entry(ja, &d->opd_sync_error_list, ja_error_link) {
+ if (memcmp(&ostid, &ja->ja_body.oa.o_oi, sizeof(ostid)) == 0) {
conflict = 1;
break;
}
return correct_id;
}
+static inline bool osp_sync_can_send_delayed(struct osp_device *d)
+{
+ struct osp_job_args *ja;
+ ktime_t now = ktime_get();
+
+ ENTRY;
+
+ if (likely(list_empty(&d->opd_sync_error_list)))
+ RETURN(false);
+
+ ja = list_first_entry(&d->opd_sync_error_list, typeof(*ja),
+ ja_error_link);
+
+ if (ktime_before(ja->ja_time, now)) {
+ CDEBUG(D_INFO, "can send delayed osp job %px "DOSTID"\n", ja,
+ POSTID(&ja->ja_body.oa.o_oi));
+ RETURN(true);
+ }
+ RETURN(false);
+}
+
/**
* Check and return ready-for-new status.
*
return 0;
if (d->opd_sync_prev_done == 0)
return 1;
+ if (osp_sync_can_send_delayed(d))
+ return 1;
if (atomic_read(&d->opd_sync_changes) == 0)
return 0;
if (rec == NULL)
wake_up(&d->opd_sync_waitq);
} else if (rc) {
struct obd_import *imp = req->rq_import;
+ struct osp_job_args *ja = NULL;
/*
* error happened, we'll try to repeat on next boot ?
*/
LASSERT(atomic_read(&d->opd_sync_rpcs_in_progress) > 0);
atomic_dec(&d->opd_sync_rpcs_in_progress);
}
-
+ /* limit error list by 1K objects */
+ if (atomic_read(&d->opd_sync_error_count) < 1000)
+ OBD_ALLOC_PTR(ja);
+ if (likely(ja)) {
+ struct ost_body *body;
+
+ body = req_capsule_client_get(&req->rq_pill,
+ &RMF_OST_BODY);
+ memcpy(&ja->ja_body, &body->oa, sizeof(ja->ja_body));
+
+ ja->ja_lcookie = jra->jra_lcookie;
+ ja->ja_op = lustre_msg_get_opc(req->rq_reqmsg);
+ INIT_LIST_HEAD(&ja->ja_error_link);
+ /* repeat an operation after OBD_TIMEOUT / 10 */
+ ja->ja_time = ktime_add_ms(ktime_get(),
+ obd_timeout / 10 * MSEC_PER_SEC);
+ spin_lock(&d->opd_sync_lock);
+ list_add_tail(&ja->ja_error_link,
+ &d->opd_sync_error_list);
+ spin_unlock(&d->opd_sync_lock);
+ atomic_inc(&d->opd_sync_error_count);
+ CDEBUG(D_INFO, "Added %px object "DOSTID"\n", ja,
+ POSTID(&ja->ja_body.oa.o_oi));
+ }
wake_up(&d->opd_sync_waitq);
} else if (d->opd_pre != NULL &&
unlikely(d->opd_pre_status == -ENOSPC)) {
* \retval ERR_PTR(errno) on error
*/
static struct ptlrpc_request *osp_sync_new_job(struct osp_device *d,
- enum ost_cmd op,
- const struct req_format *format)
+ enum ost_cmd op)
{
- struct ptlrpc_request *req;
- struct obd_import *imp;
- int rc;
+ struct ptlrpc_request *req;
+ struct obd_import *imp;
+ struct req_format *format;
+ int rc;
+
+ ENTRY;
/* Prepare the request */
imp = d->opd_obd->u.cli.cl_import;
if (CFS_FAIL_CHECK(OBD_FAIL_OSP_CHECK_ENOMEM))
RETURN(ERR_PTR(-ENOMEM));
+ switch (op) {
+ case OST_DESTROY:
+ format = &RQF_OST_DESTROY;
+ break;
+ case OST_SETATTR:
+ format = &RQF_OST_SETATTR;
+ break;
+ default:
+ RETURN(ERR_PTR(-EOPNOTSUPP));
+ };
+
req = ptlrpc_request_alloc(imp, format);
if (req == NULL)
RETURN(ERR_PTR(-ENOMEM));
rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, op);
if (rc) {
ptlrpc_req_put(req);
- return ERR_PTR(rc);
+ RETURN(ERR_PTR(rc));
}
req->rq_interpret_reply = osp_sync_interpret;
ptlrpc_request_set_replen(req);
- return req;
+ RETURN(req);
}
/**
RETURN(1);
}
- req = osp_sync_new_job(d, OST_SETATTR, &RQF_OST_SETATTR);
+ req = osp_sync_new_job(d, OST_SETATTR);
if (IS_ERR(req))
RETURN(PTR_ERR(req));
ENTRY;
LASSERT(h->lrh_type == MDS_UNLINK_REC);
- req = osp_sync_new_job(d, OST_DESTROY, &RQF_OST_DESTROY);
+ req = osp_sync_new_job(d, OST_DESTROY);
if (IS_ERR(req))
RETURN(PTR_ERR(req));
ENTRY;
LASSERT(h->lrh_type == MDS_UNLINK64_REC);
- req = osp_sync_new_job(d, OST_DESTROY, &RQF_OST_DESTROY);
+ req = osp_sync_new_job(d, OST_DESTROY);
if (IS_ERR(req))
RETURN(PTR_ERR(req));
RETURN(0);
}
+static int osp_sync_new_err_job(struct osp_device *d,
+ struct osp_job_args *ja)
+{
+ struct ptlrpc_request *req = NULL;
+ struct ost_body *body;
+ struct osp_job_req_args *jra;
+
+ ENTRY;
+
+ req = osp_sync_new_job(d, ja->ja_op);
+ if (IS_ERR(req))
+ RETURN(PTR_ERR(req));
+
+ body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
+ if (body == NULL) {
+ ptlrpc_req_put(req);
+ RETURN(-EFAULT);
+ }
+ memcpy(body, &ja->ja_body, sizeof(*body));
+
+ LASSERT(atomic_read(&d->opd_sync_rpcs_in_flight) <=
+ d->opd_sync_max_rpcs_in_flight);
+
+ jra = ptlrpc_req_async_args(jra, req);
+ jra->jra_magic = OSP_JOB_MAGIC;
+ memcpy(&jra->jra_lcookie, &ja->ja_lcookie, sizeof(jra->jra_lcookie));
+ INIT_LIST_HEAD(&jra->jra_committed_link);
+ spin_lock(&d->opd_sync_lock);
+ list_add_tail(&jra->jra_in_flight_link, &d->opd_sync_in_flight_list);
+ spin_unlock(&d->opd_sync_lock);
+
+ CDEBUG(D_HA, "%s repeating %d operation object "DOSTID"\n",
+ d->opd_obd->obd_name, ja->ja_op, POSTID(&ja->ja_body.oa.o_oi));
+
+ atomic_dec(&d->opd_sync_error_count);
+ OBD_FREE_PTR(ja);
+ ptlrpcd_add_req(req);
+
+ RETURN(0);
+
+}
/**
* Process llog records.
*
RETURN_EXIT;
}
-
/*
* now we prepare and fill requests to OST, put them on the queue
* and fire after next commit callback
EXIT;
}
+static bool osp_sync_process_error_list(const struct lu_env *env,
+ struct osp_device *d)
+{
+ struct osp_job_args *ja;
+ ktime_t now = ktime_get();
+
+ if (likely(list_empty(&d->opd_sync_error_list)))
+ return false;
+
+ spin_lock(&d->opd_sync_lock);
+ ja = list_first_entry(&d->opd_sync_error_list, typeof(*ja),
+ ja_error_link);
+ if (ktime_before(ja->ja_time, now))
+ list_del_init(&ja->ja_error_link);
+ else
+ ja = NULL;
+ spin_unlock(&d->opd_sync_lock);
+
+ if (ja) {
+ /* same counters as osp_sync_process_record */
+ atomic_inc(&d->opd_sync_rpcs_in_flight);
+ atomic_inc(&d->opd_sync_rpcs_in_progress);
+ if (osp_sync_new_err_job(d, ja)) {
+ atomic_dec(&d->opd_sync_rpcs_in_flight);
+ atomic_dec(&d->opd_sync_rpcs_in_progress);
+ /* insert a ja back to list in case of error */
+ spin_lock(&d->opd_sync_lock);
+ list_add(&ja->ja_error_link, &d->opd_sync_error_list);
+ spin_unlock(&d->opd_sync_lock);
+ }
+ return true;
+ }
+ return false;
+}
+
/**
* The core of the syncing mechanism.
*
struct llog_rec_hdr *rec,
void *data)
{
- struct osp_device *d = data;
+ struct osp_device *d = data;
+ long timeout = 1;
do {
if (!d->opd_sync_task) {
/* if we there are changes to be processed and we have
* resources for this ... do now */
if (osp_sync_can_process_new(d, rec)) {
- if (llh == NULL) {
- /* ask llog for another record */
- return 0;
+ if (osp_sync_process_error_list(env, d))
+ continue;
+
+ if (timeout) {
+ if (llh == NULL) {
+ /* ask llog for another record */
+ return 0;
+ }
+ osp_sync_process_record(env, d, llh, rec);
+ llh = NULL;
+ rec = NULL;
}
- osp_sync_process_record(env, d, llh, rec);
- llh = NULL;
- rec = NULL;
}
if (CFS_FAIL_PRECHECK(OBD_FAIL_CATALOG_FULL_CHECK) &&
cfs_fail_val != 1)
msleep(1 * MSEC_PER_SEC);
- wait_event_idle(d->opd_sync_waitq,
+ if (list_empty(&d->opd_sync_error_list)) {
+ wait_event_idle(d->opd_sync_waitq,
!d->opd_sync_task ||
osp_sync_can_process_new(d, rec) ||
!list_empty(&d->opd_sync_committed_there));
+ } else {
+ timeout = wait_event_idle_timeout(d->opd_sync_waitq,
+ !d->opd_sync_task ||
+ osp_sync_can_process_new(d, rec) ||
+ !list_empty(&d->opd_sync_committed_there),
+ cfs_time_seconds(1));
+ }
} while (1);
}
*/
static int osp_sync_thread(void *_args)
{
- struct osp_sync_args *args = _args;
- struct osp_device *d = args->osa_dev;
- struct llog_ctxt *ctxt;
- struct obd_device *obd = d->opd_obd;
- struct llog_handle *llh;
- struct lu_env *env = &args->osa_env;
- int rc, count;
- bool wrapped;
+ struct osp_sync_args *args = _args;
+ struct osp_device *d = args->osa_dev;
+ struct llog_ctxt *ctxt;
+ struct obd_device *obd = d->opd_obd;
+ struct llog_handle *llh;
+ struct lu_env *env = &args->osa_env;
+ struct osp_job_args *ja, *tmp;
+ int rc, count;
+ bool wrapped;
ENTRY;
rc = llog_cleanup(env, ctxt);
if (rc)
CERROR("can't cleanup llog: %d\n", rc);
+ list_for_each_entry_safe(ja, tmp, &d->opd_sync_error_list,
+ ja_error_link) {
+ CDEBUG(D_HA, "%s: failed to sync object "DOSTID"\n",
+ d->opd_obd->obd_name, POSTID(&ja->ja_body.oa.o_oi));
+ list_del(&ja->ja_error_link);
+ OBD_FREE_PTR(ja);
+ }
out:
LASSERTF(atomic_read(&d->opd_sync_rpcs_in_progress) == 0,
"%s: %d %d %sempty\n", d->opd_obd->obd_name,
init_waitqueue_head(&d->opd_sync_barrier_waitq);
INIT_LIST_HEAD(&d->opd_sync_in_flight_list);
INIT_LIST_HEAD(&d->opd_sync_committed_there);
+ INIT_LIST_HEAD(&d->opd_sync_error_list);
+ atomic_set(&d->opd_sync_error_count, 0);
if (d->opd_storage->dd_rdonly)
RETURN(0);