X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fosp%2Fosp_sync.c;h=4f777e861dbc0c061ce0aa8cca0fe4ce19a73999;hp=ffe2db1bf38e625eaba4a21643d91c22ec786fe0;hb=12a130c71b88351bc361d1f741cad0a4c1fbb257;hpb=d10200a80770f0029d1d665af954187b9ad883df diff --git a/lustre/osp/osp_sync.c b/lustre/osp/osp_sync.c index ffe2db1..4f777e8 100644 --- a/lustre/osp/osp_sync.c +++ b/lustre/osp/osp_sync.c @@ -96,6 +96,7 @@ struct osp_job_req_args { struct ptlrpc_replay_async_args jra_raa; struct list_head jra_committed_link; struct list_head jra_inflight_link; + struct llog_cookie jra_lcookie; __u32 jra_magic; }; @@ -610,9 +611,13 @@ static int osp_sync_interpret(const struct lu_env *env, * This is just a tiny helper function to put the request on the sending list * * \param[in] d OSP device + * \param[in] llh llog handle where the record is stored + * \param[in] h llog record * \param[in] req request */ static void osp_sync_send_new_rpc(struct osp_device *d, + struct llog_handle *llh, + struct llog_rec_hdr *h, struct ptlrpc_request *req) { struct osp_job_req_args *jra; @@ -622,6 +627,9 @@ static void osp_sync_send_new_rpc(struct osp_device *d, jra = ptlrpc_req_async_args(req); jra->jra_magic = OSP_JOB_MAGIC; + jra->jra_lcookie.lgc_lgl = llh->lgh_id; + jra->jra_lcookie.lgc_subsys = LLOG_MDS_OST_ORIG_CTXT; + jra->jra_lcookie.lgc_index = h->lrh_index; INIT_LIST_HEAD(&jra->jra_committed_link); spin_lock(&d->opd_syn_lock); list_add_tail(&jra->jra_inflight_link, &d->opd_syn_inflight_list); @@ -640,8 +648,6 @@ static void osp_sync_send_new_rpc(struct osp_device *d, * are initialized. * * \param[in] d OSP device - * \param[in] llh llog handle where the record is stored - * \param[in] h llog record * \param[in] op type of the change * \param[in] format request format to be used * @@ -649,13 +655,10 @@ static void osp_sync_send_new_rpc(struct osp_device *d, * \retval ERR_PTR(errno) on error */ static struct ptlrpc_request *osp_sync_new_job(struct osp_device *d, - struct llog_handle *llh, - struct llog_rec_hdr *h, ost_cmd_t op, const struct req_format *format) { struct ptlrpc_request *req; - struct ost_body *body; struct obd_import *imp; int rc; @@ -676,18 +679,6 @@ static struct ptlrpc_request *osp_sync_new_job(struct osp_device *d, return ERR_PTR(rc); } - /* - * this is a trick: to save on memory allocations we put cookie - * into the request, but don't set corresponded flag in o_valid - * so that OST doesn't interpret this cookie. once the request - * is committed on OST we take cookie from the request and cancel - */ - body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); - LASSERT(body); - body->oa.o_lcookie.lgc_lgl = llh->lgh_id; - body->oa.o_lcookie.lgc_subsys = LLOG_MDS_OST_ORIG_CTXT; - body->oa.o_lcookie.lgc_index = h->lrh_index; - req->rq_interpret_reply = osp_sync_interpret; req->rq_commit_cb = osp_sync_request_commit_cb; req->rq_cb_data = d; @@ -733,7 +724,7 @@ static int osp_sync_new_setattr_job(struct osp_device *d, RETURN(1); } - req = osp_sync_new_job(d, llh, h, OST_SETATTR, &RQF_OST_SETATTR); + req = osp_sync_new_job(d, OST_SETATTR, &RQF_OST_SETATTR); if (IS_ERR(req)) RETURN(PTR_ERR(req)); @@ -750,7 +741,7 @@ static int osp_sync_new_setattr_job(struct osp_device *d, else body->oa.o_valid |= rec->lsr_valid; - osp_sync_send_new_rpc(d, req); + osp_sync_send_new_rpc(d, llh, h, req); RETURN(0); } @@ -780,7 +771,7 @@ static int osp_sync_new_unlink_job(struct osp_device *d, ENTRY; LASSERT(h->lrh_type == MDS_UNLINK_REC); - req = osp_sync_new_job(d, llh, h, OST_DESTROY, &RQF_OST_DESTROY); + req = osp_sync_new_job(d, OST_DESTROY, &RQF_OST_DESTROY); if (IS_ERR(req)) RETURN(PTR_ERR(req)); @@ -793,7 +784,7 @@ static int osp_sync_new_unlink_job(struct osp_device *d, if (rec->lur_count) body->oa.o_valid |= OBD_MD_FLOBJCOUNT; - osp_sync_send_new_rpc(d, req); + osp_sync_send_new_rpc(d, llh, h, req); RETURN(0); } @@ -826,8 +817,7 @@ static int osp_sync_new_unlink64_job(struct osp_device *d, ENTRY; LASSERT(h->lrh_type == MDS_UNLINK64_REC); - req = osp_sync_new_job(d, llh, h, OST_DESTROY, - &RQF_OST_DESTROY); + req = osp_sync_new_job(d, OST_DESTROY, &RQF_OST_DESTROY); if (IS_ERR(req)) RETURN(PTR_ERR(req)); @@ -840,7 +830,7 @@ static int osp_sync_new_unlink64_job(struct osp_device *d, body->oa.o_misc = rec->lur_count; body->oa.o_valid = OBD_MD_FLGROUP | OBD_MD_FLID | OBD_MD_FLOBJCOUNT; - osp_sync_send_new_rpc(d, req); + osp_sync_send_new_rpc(d, llh, h, req); RETURN(0); } @@ -1040,7 +1030,7 @@ static void osp_sync_process_committed(const struct lu_env *env, * called we can check committness directly */ if (req->rq_import_generation == imp->imp_generation) { rc = llog_cat_cancel_records(env, llh, 1, - &body->oa.o_lcookie); + &jra->jra_lcookie); if (rc) CERROR("%s: can't cancel record: %d\n", obd->obd_name, rc); @@ -1169,6 +1159,12 @@ static int osp_sync_thread(void *_arg) if (rc) { CERROR("%s: can't initialize env: rc = %d\n", obd->obd_name, rc); + + spin_lock(&d->opd_syn_lock); + thread->t_flags = SVC_STOPPED; + spin_unlock(&d->opd_syn_lock); + wake_up(&thread->t_ctl_waitq); + RETURN(rc); } @@ -1383,9 +1379,10 @@ static void osp_sync_llog_fini(const struct lu_env *env, struct osp_device *d) struct llog_ctxt *ctxt; ctxt = llog_get_context(d->opd_obd, LLOG_MDS_OST_ORIG_CTXT); - if (ctxt != NULL) + if (ctxt) { llog_cat_close(env, ctxt->loc_handle); - llog_cleanup(env, ctxt); + llog_cleanup(env, ctxt); + } } /** @@ -1408,6 +1405,19 @@ int osp_sync_init(const struct lu_env *env, struct osp_device *d) ENTRY; + d->opd_syn_max_rpc_in_flight = OSP_MAX_IN_FLIGHT; + d->opd_syn_max_rpc_in_progress = OSP_MAX_IN_PROGRESS; + spin_lock_init(&d->opd_syn_lock); + init_waitqueue_head(&d->opd_syn_waitq); + init_waitqueue_head(&d->opd_syn_barrier_waitq); + thread_set_flags(&d->opd_syn_thread, SVC_INIT); + init_waitqueue_head(&d->opd_syn_thread.t_ctl_waitq); + INIT_LIST_HEAD(&d->opd_syn_inflight_list); + INIT_LIST_HEAD(&d->opd_syn_committed_there); + + if (d->opd_storage->dd_rdonly) + RETURN(0); + rc = osp_sync_id_traction_init(d); if (rc) RETURN(rc); @@ -1425,15 +1435,6 @@ int osp_sync_init(const struct lu_env *env, struct osp_device *d) /* * Start synchronization thread */ - d->opd_syn_max_rpc_in_flight = OSP_MAX_IN_FLIGHT; - d->opd_syn_max_rpc_in_progress = OSP_MAX_IN_PROGRESS; - spin_lock_init(&d->opd_syn_lock); - init_waitqueue_head(&d->opd_syn_waitq); - init_waitqueue_head(&d->opd_syn_barrier_waitq); - init_waitqueue_head(&d->opd_syn_thread.t_ctl_waitq); - INIT_LIST_HEAD(&d->opd_syn_inflight_list); - INIT_LIST_HEAD(&d->opd_syn_committed_there); - task = kthread_run(osp_sync_thread, d, "osp-syn-%u-%u", d->opd_index, d->opd_group); if (IS_ERR(task)) { @@ -1469,9 +1470,11 @@ int osp_sync_fini(struct osp_device *d) ENTRY; - thread->t_flags = SVC_STOPPING; - wake_up(&d->opd_syn_waitq); - wait_event(thread->t_ctl_waitq, thread->t_flags & SVC_STOPPED); + if (!thread_is_init(thread) && !thread_is_stopped(thread)) { + thread->t_flags = SVC_STOPPING; + wake_up(&d->opd_syn_waitq); + wait_event(thread->t_ctl_waitq, thread_is_stopped(thread)); + } /* * unregister transaction callbacks only when sync thread