struct ptlrpc_replay_async_args jra_raa;
struct list_head jra_committed_link;
struct list_head jra_inflight_link;
+ struct llog_cookie jra_lcookie;
__u32 jra_magic;
};
* This is just a tiny helper function to put the request on the sending list
*
* \param[in] d OSP device
+ * \param[in] llh llog handle where the record is stored
+ * \param[in] h llog record
* \param[in] req request
*/
static void osp_sync_send_new_rpc(struct osp_device *d,
+ struct llog_handle *llh,
+ struct llog_rec_hdr *h,
struct ptlrpc_request *req)
{
struct osp_job_req_args *jra;
jra = ptlrpc_req_async_args(req);
jra->jra_magic = OSP_JOB_MAGIC;
+ jra->jra_lcookie.lgc_lgl = llh->lgh_id;
+ jra->jra_lcookie.lgc_subsys = LLOG_MDS_OST_ORIG_CTXT;
+ jra->jra_lcookie.lgc_index = h->lrh_index;
INIT_LIST_HEAD(&jra->jra_committed_link);
spin_lock(&d->opd_syn_lock);
list_add_tail(&jra->jra_inflight_link, &d->opd_syn_inflight_list);
* are initialized.
*
* \param[in] d OSP device
- * \param[in] llh llog handle where the record is stored
- * \param[in] h llog record
* \param[in] op type of the change
* \param[in] format request format to be used
*
* \retval ERR_PTR(errno) on error
*/
static struct ptlrpc_request *osp_sync_new_job(struct osp_device *d,
- struct llog_handle *llh,
- struct llog_rec_hdr *h,
ost_cmd_t op,
const struct req_format *format)
{
struct ptlrpc_request *req;
- struct ost_body *body;
struct obd_import *imp;
int rc;
return ERR_PTR(rc);
}
- /*
- * this is a trick: to save on memory allocations we put cookie
- * into the request, but don't set corresponded flag in o_valid
- * so that OST doesn't interpret this cookie. once the request
- * is committed on OST we take cookie from the request and cancel
- */
- body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
- LASSERT(body);
- body->oa.o_lcookie.lgc_lgl = llh->lgh_id;
- body->oa.o_lcookie.lgc_subsys = LLOG_MDS_OST_ORIG_CTXT;
- body->oa.o_lcookie.lgc_index = h->lrh_index;
-
req->rq_interpret_reply = osp_sync_interpret;
req->rq_commit_cb = osp_sync_request_commit_cb;
req->rq_cb_data = d;
RETURN(1);
}
- req = osp_sync_new_job(d, llh, h, OST_SETATTR, &RQF_OST_SETATTR);
+ req = osp_sync_new_job(d, OST_SETATTR, &RQF_OST_SETATTR);
if (IS_ERR(req))
RETURN(PTR_ERR(req));
else
body->oa.o_valid |= rec->lsr_valid;
- osp_sync_send_new_rpc(d, req);
+ osp_sync_send_new_rpc(d, llh, h, req);
RETURN(0);
}
ENTRY;
LASSERT(h->lrh_type == MDS_UNLINK_REC);
- req = osp_sync_new_job(d, llh, h, OST_DESTROY, &RQF_OST_DESTROY);
+ req = osp_sync_new_job(d, OST_DESTROY, &RQF_OST_DESTROY);
if (IS_ERR(req))
RETURN(PTR_ERR(req));
if (rec->lur_count)
body->oa.o_valid |= OBD_MD_FLOBJCOUNT;
- osp_sync_send_new_rpc(d, req);
+ osp_sync_send_new_rpc(d, llh, h, req);
RETURN(0);
}
ENTRY;
LASSERT(h->lrh_type == MDS_UNLINK64_REC);
- req = osp_sync_new_job(d, llh, h, OST_DESTROY,
- &RQF_OST_DESTROY);
+ req = osp_sync_new_job(d, OST_DESTROY, &RQF_OST_DESTROY);
if (IS_ERR(req))
RETURN(PTR_ERR(req));
body->oa.o_misc = rec->lur_count;
body->oa.o_valid = OBD_MD_FLGROUP | OBD_MD_FLID |
OBD_MD_FLOBJCOUNT;
- osp_sync_send_new_rpc(d, req);
+ osp_sync_send_new_rpc(d, llh, h, req);
RETURN(0);
}
* called we can check committness directly */
if (req->rq_import_generation == imp->imp_generation) {
rc = llog_cat_cancel_records(env, llh, 1,
- &body->oa.o_lcookie);
+ &jra->jra_lcookie);
if (rc)
CERROR("%s: can't cancel record: %d\n",
obd->obd_name, rc);
if (rc) {
CERROR("%s: can't initialize env: rc = %d\n",
obd->obd_name, rc);
+
+ spin_lock(&d->opd_syn_lock);
+ thread->t_flags = SVC_STOPPED;
+ spin_unlock(&d->opd_syn_lock);
+ wake_up(&thread->t_ctl_waitq);
+
RETURN(rc);
}
struct llog_ctxt *ctxt;
ctxt = llog_get_context(d->opd_obd, LLOG_MDS_OST_ORIG_CTXT);
- if (ctxt != NULL)
+ if (ctxt) {
llog_cat_close(env, ctxt->loc_handle);
- llog_cleanup(env, ctxt);
+ llog_cleanup(env, ctxt);
+ }
}
/**
ENTRY;
+ d->opd_syn_max_rpc_in_flight = OSP_MAX_IN_FLIGHT;
+ d->opd_syn_max_rpc_in_progress = OSP_MAX_IN_PROGRESS;
+ spin_lock_init(&d->opd_syn_lock);
+ init_waitqueue_head(&d->opd_syn_waitq);
+ init_waitqueue_head(&d->opd_syn_barrier_waitq);
+ thread_set_flags(&d->opd_syn_thread, SVC_INIT);
+ init_waitqueue_head(&d->opd_syn_thread.t_ctl_waitq);
+ INIT_LIST_HEAD(&d->opd_syn_inflight_list);
+ INIT_LIST_HEAD(&d->opd_syn_committed_there);
+
+ if (d->opd_storage->dd_rdonly)
+ RETURN(0);
+
rc = osp_sync_id_traction_init(d);
if (rc)
RETURN(rc);
/*
* Start synchronization thread
*/
- d->opd_syn_max_rpc_in_flight = OSP_MAX_IN_FLIGHT;
- d->opd_syn_max_rpc_in_progress = OSP_MAX_IN_PROGRESS;
- spin_lock_init(&d->opd_syn_lock);
- init_waitqueue_head(&d->opd_syn_waitq);
- init_waitqueue_head(&d->opd_syn_barrier_waitq);
- init_waitqueue_head(&d->opd_syn_thread.t_ctl_waitq);
- INIT_LIST_HEAD(&d->opd_syn_inflight_list);
- INIT_LIST_HEAD(&d->opd_syn_committed_there);
-
task = kthread_run(osp_sync_thread, d, "osp-syn-%u-%u",
d->opd_index, d->opd_group);
if (IS_ERR(task)) {
ENTRY;
- thread->t_flags = SVC_STOPPING;
- wake_up(&d->opd_syn_waitq);
- wait_event(thread->t_ctl_waitq, thread->t_flags & SVC_STOPPED);
+ if (!thread_is_init(thread) && !thread_is_stopped(thread)) {
+ thread->t_flags = SVC_STOPPING;
+ wake_up(&d->opd_syn_waitq);
+ wait_event(thread->t_ctl_waitq, thread_is_stopped(thread));
+ }
/*
* unregister transaction callbacks only when sync thread