*
* You should have received a copy of the GNU General Public License
* version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
+ * http://www.gnu.org/licenses/gpl-2.0.html
*
* GPL HEADER END
*/
* Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2012, 2015, Intel Corporation.
+ * Copyright (c) 2012, 2016, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
struct ptlrpc_replay_async_args jra_raa;
struct list_head jra_committed_link;
struct list_head jra_inflight_link;
+ struct llog_cookie jra_lcookie;
__u32 jra_magic;
};
static inline int osp_sync_low_in_progress(struct osp_device *d)
{
- return d->opd_syn_rpc_in_progress < d->opd_syn_max_rpc_in_progress;
+ return atomic_read(&d->opd_syn_rpc_in_progress) <
+ d->opd_syn_max_rpc_in_progress;
}
/**
*/
static inline int osp_sync_low_in_flight(struct osp_device *d)
{
- return d->opd_syn_rpc_in_flight < d->opd_syn_max_rpc_in_flight;
+ return atomic_read(&d->opd_syn_rpc_in_flight) <
+ d->opd_syn_max_rpc_in_flight;
}
/**
return 0;
if (d->opd_syn_prev_done == 0)
return 1;
- if (d->opd_syn_changes == 0)
+ if (atomic_read(&d->opd_syn_changes) == 0)
return 0;
if (rec == NULL ||
osp_sync_correct_id(d, rec) <= d->opd_syn_last_committed_id)
POSTID(&osi->osi_cookie.lgc_lgl.lgl_oi),
(unsigned long)osi->osi_cookie.lgc_lgl.lgl_ogen,
(unsigned long)osi->osi_cookie.lgc_index, rc);
- spin_lock(&d->opd_syn_lock);
- d->opd_syn_changes++;
- spin_unlock(&d->opd_syn_lock);
+ atomic_inc(&d->opd_syn_changes);
}
/* return 0 always here, error case just cause no llog record */
RETURN(0);
struct osp_device *d = req->rq_cb_data;
struct osp_job_req_args *jra;
- CDEBUG(D_HA, "commit req %p, transno "LPU64"\n", req, req->rq_transno);
+ CDEBUG(D_HA, "commit req %p, transno %llu\n", req, req->rq_transno);
if (unlikely(req->rq_transno == 0))
return;
*/
LASSERTF(req->rq_transno == 0 ||
req->rq_import_generation < imp->imp_generation,
- "transno "LPU64", rc %d, gen: req %d, imp %d\n",
+ "transno %llu, rc %d, gen: req %d, imp %d\n",
req->rq_transno, rc, req->rq_import_generation,
imp->imp_generation);
if (req->rq_transno == 0) {
/* this is the last time we see the request
* if transno is not zero, then commit cb
* will be called at some point */
- LASSERT(d->opd_syn_rpc_in_progress > 0);
- spin_lock(&d->opd_syn_lock);
- d->opd_syn_rpc_in_progress--;
- spin_unlock(&d->opd_syn_lock);
+ LASSERT(atomic_read(&d->opd_syn_rpc_in_progress) > 0);
+ atomic_dec(&d->opd_syn_rpc_in_progress);
}
wake_up(&d->opd_syn_waitq);
osp_statfs_need_now(d);
}
- LASSERT(d->opd_syn_rpc_in_flight > 0);
spin_lock(&d->opd_syn_lock);
- d->opd_syn_rpc_in_flight--;
list_del_init(&jra->jra_inflight_link);
spin_unlock(&d->opd_syn_lock);
+ LASSERT(atomic_read(&d->opd_syn_rpc_in_flight) > 0);
+ atomic_dec(&d->opd_syn_rpc_in_flight);
if (unlikely(atomic_read(&d->opd_syn_barrier) > 0))
wake_up(&d->opd_syn_barrier_waitq);
CDEBUG(D_OTHER, "%s: %d in flight, %d in progress\n",
- d->opd_obd->obd_name, d->opd_syn_rpc_in_flight,
- d->opd_syn_rpc_in_progress);
+ d->opd_obd->obd_name, atomic_read(&d->opd_syn_rpc_in_flight),
+ atomic_read(&d->opd_syn_rpc_in_progress));
osp_sync_check_for_work(d);
* This is just a tiny helper function to put the request on the sending list
*
* \param[in] d OSP device
+ * \param[in] llh llog handle where the record is stored
+ * \param[in] h llog record
* \param[in] req request
*/
static void osp_sync_send_new_rpc(struct osp_device *d,
+ struct llog_handle *llh,
+ struct llog_rec_hdr *h,
struct ptlrpc_request *req)
{
struct osp_job_req_args *jra;
- LASSERT(d->opd_syn_rpc_in_flight <= d->opd_syn_max_rpc_in_flight);
+ LASSERT(atomic_read(&d->opd_syn_rpc_in_flight) <=
+ d->opd_syn_max_rpc_in_flight);
jra = ptlrpc_req_async_args(req);
jra->jra_magic = OSP_JOB_MAGIC;
+ jra->jra_lcookie.lgc_lgl = llh->lgh_id;
+ jra->jra_lcookie.lgc_subsys = LLOG_MDS_OST_ORIG_CTXT;
+ jra->jra_lcookie.lgc_index = h->lrh_index;
INIT_LIST_HEAD(&jra->jra_committed_link);
spin_lock(&d->opd_syn_lock);
list_add_tail(&jra->jra_inflight_link, &d->opd_syn_inflight_list);
* are initialized.
*
* \param[in] d OSP device
- * \param[in] llh llog handle where the record is stored
- * \param[in] h llog record
* \param[in] op type of the change
* \param[in] format request format to be used
*
* \retval ERR_PTR(errno) on error
*/
static struct ptlrpc_request *osp_sync_new_job(struct osp_device *d,
- struct llog_handle *llh,
- struct llog_rec_hdr *h,
ost_cmd_t op,
const struct req_format *format)
{
struct ptlrpc_request *req;
- struct ost_body *body;
struct obd_import *imp;
int rc;
return ERR_PTR(rc);
}
- /*
- * this is a trick: to save on memory allocations we put cookie
- * into the request, but don't set corresponded flag in o_valid
- * so that OST doesn't interpret this cookie. once the request
- * is committed on OST we take cookie from the request and cancel
- */
- body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
- LASSERT(body);
- body->oa.o_lcookie.lgc_lgl = llh->lgh_id;
- body->oa.o_lcookie.lgc_subsys = LLOG_MDS_OST_ORIG_CTXT;
- body->oa.o_lcookie.lgc_index = h->lrh_index;
-
req->rq_interpret_reply = osp_sync_interpret;
req->rq_commit_cb = osp_sync_request_commit_cb;
req->rq_cb_data = d;
/* lsr_valid can only be 0 or have OBD_MD_{FLUID,FLGID} set,
* so no bits other than these should be set. */
if ((rec->lsr_valid & ~(OBD_MD_FLUID | OBD_MD_FLGID)) != 0) {
- CERROR("%s: invalid setattr record, lsr_valid:"LPU64"\n",
+ CERROR("%s: invalid setattr record, lsr_valid:%llu\n",
d->opd_obd->obd_name, rec->lsr_valid);
/* return 1 on invalid record */
RETURN(1);
}
- req = osp_sync_new_job(d, llh, h, OST_SETATTR, &RQF_OST_SETATTR);
+ req = osp_sync_new_job(d, OST_SETATTR, &RQF_OST_SETATTR);
if (IS_ERR(req))
RETURN(PTR_ERR(req));
else
body->oa.o_valid |= rec->lsr_valid;
- osp_sync_send_new_rpc(d, req);
+ osp_sync_send_new_rpc(d, llh, h, req);
RETURN(0);
}
ENTRY;
LASSERT(h->lrh_type == MDS_UNLINK_REC);
- req = osp_sync_new_job(d, llh, h, OST_DESTROY, &RQF_OST_DESTROY);
+ req = osp_sync_new_job(d, OST_DESTROY, &RQF_OST_DESTROY);
if (IS_ERR(req))
RETURN(PTR_ERR(req));
if (rec->lur_count)
body->oa.o_valid |= OBD_MD_FLOBJCOUNT;
- osp_sync_send_new_rpc(d, req);
+ osp_sync_send_new_rpc(d, llh, h, req);
RETURN(0);
}
ENTRY;
LASSERT(h->lrh_type == MDS_UNLINK64_REC);
- req = osp_sync_new_job(d, llh, h, OST_DESTROY,
- &RQF_OST_DESTROY);
+ req = osp_sync_new_job(d, OST_DESTROY, &RQF_OST_DESTROY);
if (IS_ERR(req))
RETURN(PTR_ERR(req));
body->oa.o_misc = rec->lur_count;
body->oa.o_valid = OBD_MD_FLGROUP | OBD_MD_FLID |
OBD_MD_FLOBJCOUNT;
- osp_sync_send_new_rpc(d, req);
+ osp_sync_send_new_rpc(d, llh, h, req);
RETURN(0);
}
/* notice we increment counters before sending RPC, to be consistent
* in RPC interpret callback which may happen very quickly */
- spin_lock(&d->opd_syn_lock);
- d->opd_syn_rpc_in_flight++;
- d->opd_syn_rpc_in_progress++;
- spin_unlock(&d->opd_syn_lock);
+ atomic_inc(&d->opd_syn_rpc_in_flight);
+ atomic_inc(&d->opd_syn_rpc_in_progress);
switch (rec->lrh_type) {
/* case MDS_UNLINK_REC is kept for compatibility */
break;
}
- spin_lock(&d->opd_syn_lock);
-
	/* For all kinds of records, no matter successful or not,
* we should decrease changes and bump last_processed_id.
*/
if (d->opd_syn_prev_done) {
__u64 correct_id = osp_sync_correct_id(d, rec);
- LASSERT(d->opd_syn_changes > 0);
+ LASSERT(atomic_read(&d->opd_syn_changes) > 0);
LASSERT(correct_id <= d->opd_syn_last_committed_id);
/* NOTE: it's possible to meet same id if
* OST stores few stripes of same file
*/
- if (correct_id > d->opd_syn_last_processed_id) {
- d->opd_syn_last_processed_id = correct_id;
- wake_up(&d->opd_syn_barrier_waitq);
+ while (1) {
+ /* another thread may be trying to set new value */
+ rmb();
+ if (correct_id > d->opd_syn_last_processed_id) {
+ d->opd_syn_last_processed_id = correct_id;
+ wake_up(&d->opd_syn_barrier_waitq);
+ } else
+ break;
}
- d->opd_syn_changes--;
+ atomic_dec(&d->opd_syn_changes);
}
if (rc != 0) {
- d->opd_syn_rpc_in_flight--;
- d->opd_syn_rpc_in_progress--;
+ atomic_dec(&d->opd_syn_rpc_in_flight);
+ atomic_dec(&d->opd_syn_rpc_in_progress);
}
- CDEBUG(D_OTHER, "%s: %d in flight, %d in progress\n",
- d->opd_obd->obd_name, d->opd_syn_rpc_in_flight,
- d->opd_syn_rpc_in_progress);
- spin_unlock(&d->opd_syn_lock);
+ CDEBUG(D_OTHER, "%s: %d in flight, %d in progress\n",
+ d->opd_obd->obd_name, atomic_read(&d->opd_syn_rpc_in_flight),
+ atomic_read(&d->opd_syn_rpc_in_progress));
/* Delete the invalid record */
if (rc == 1) {
* called we can check committness directly */
if (req->rq_import_generation == imp->imp_generation) {
rc = llog_cat_cancel_records(env, llh, 1,
- &body->oa.o_lcookie);
+ &jra->jra_lcookie);
if (rc)
CERROR("%s: can't cancel record: %d\n",
obd->obd_name, rc);
} else {
- DEBUG_REQ(D_OTHER, req, "imp_committed = "LPU64,
+ DEBUG_REQ(D_OTHER, req, "imp_committed = %llu",
imp->imp_peer_committed_transno);
}
ptlrpc_req_finished(req);
llog_ctxt_put(ctxt);
- LASSERT(d->opd_syn_rpc_in_progress >= done);
- spin_lock(&d->opd_syn_lock);
- d->opd_syn_rpc_in_progress -= done;
- spin_unlock(&d->opd_syn_lock);
+ LASSERT(atomic_read(&d->opd_syn_rpc_in_progress) >= done);
+ atomic_sub(done, &d->opd_syn_rpc_in_progress);
CDEBUG(D_OTHER, "%s: %d in flight, %d in progress\n",
- d->opd_obd->obd_name, d->opd_syn_rpc_in_flight,
- d->opd_syn_rpc_in_progress);
+ d->opd_obd->obd_name, atomic_read(&d->opd_syn_rpc_in_flight),
+ atomic_read(&d->opd_syn_rpc_in_progress));
osp_sync_check_for_work(d);
if (osp_sync_can_process_new(d, rec)) {
if (llh == NULL) {
/* ask llog for another record */
- CDEBUG(D_HA, "%lu changes, %u in progress,"
+ CDEBUG(D_HA, "%u changes, %u in progress,"
" %u in flight\n",
- d->opd_syn_changes,
- d->opd_syn_rpc_in_progress,
- d->opd_syn_rpc_in_flight);
+ atomic_read(&d->opd_syn_changes),
+ atomic_read(&d->opd_syn_rpc_in_progress),
+ atomic_read(&d->opd_syn_rpc_in_flight));
return 0;
}
osp_sync_process_record(env, d, llh, rec);
if (rc) {
CERROR("%s: can't initialize env: rc = %d\n",
obd->obd_name, rc);
+
+ spin_lock(&d->opd_syn_lock);
+ thread->t_flags = SVC_STOPPED;
+ spin_unlock(&d->opd_syn_lock);
+ wake_up(&thread->t_ctl_waitq);
+
RETURN(rc);
}
}
rc = llog_cat_process(&env, llh, osp_sync_process_queues, d, 0, 0);
+ if (rc < 0) {
+ CERROR("%s: llog process with osp_sync_process_queues "
+ "failed: %d\n", d->opd_obd->obd_name, rc);
+ GOTO(close, rc);
+ }
LASSERTF(rc == 0 || rc == LLOG_PROC_BREAK,
- "%lu changes, %u in progress, %u in flight: %d\n",
- d->opd_syn_changes, d->opd_syn_rpc_in_progress,
- d->opd_syn_rpc_in_flight, rc);
+ "%u changes, %u in progress, %u in flight: %d\n",
+ atomic_read(&d->opd_syn_changes),
+ atomic_read(&d->opd_syn_rpc_in_progress),
+ atomic_read(&d->opd_syn_rpc_in_flight), rc);
/* we don't expect llog_process_thread() to exit till umount */
LASSERTF(thread->t_flags != SVC_RUNNING,
- "%lu changes, %u in progress, %u in flight\n",
- d->opd_syn_changes, d->opd_syn_rpc_in_progress,
- d->opd_syn_rpc_in_flight);
+ "%u changes, %u in progress, %u in flight\n",
+ atomic_read(&d->opd_syn_changes),
+ atomic_read(&d->opd_syn_rpc_in_progress),
+ atomic_read(&d->opd_syn_rpc_in_flight));
/* wait till all the requests are completed */
count = 0;
- while (d->opd_syn_rpc_in_progress > 0) {
+ while (atomic_read(&d->opd_syn_rpc_in_progress) > 0) {
osp_sync_process_committed(&env, d);
lwi = LWI_TIMEOUT(cfs_time_seconds(5), NULL, NULL);
rc = l_wait_event(d->opd_syn_waitq,
- d->opd_syn_rpc_in_progress == 0,
+ atomic_read(&d->opd_syn_rpc_in_progress) == 0,
&lwi);
if (rc == -ETIMEDOUT)
count++;
LASSERTF(count < 10, "%s: %d %d %sempty\n",
- d->opd_obd->obd_name, d->opd_syn_rpc_in_progress,
- d->opd_syn_rpc_in_flight,
+ d->opd_obd->obd_name,
+ atomic_read(&d->opd_syn_rpc_in_progress),
+ atomic_read(&d->opd_syn_rpc_in_flight),
list_empty(&d->opd_syn_committed_there) ? "" : "!");
}
+close:
llog_cat_close(&env, llh);
rc = llog_cleanup(&env, ctxt);
if (rc)
CERROR("can't cleanup llog: %d\n", rc);
out:
- LASSERTF(d->opd_syn_rpc_in_progress == 0,
+ LASSERTF(atomic_read(&d->opd_syn_rpc_in_progress) == 0,
"%s: %d %d %sempty\n",
- d->opd_obd->obd_name, d->opd_syn_rpc_in_progress,
- d->opd_syn_rpc_in_flight,
+ d->opd_obd->obd_name, atomic_read(&d->opd_syn_rpc_in_progress),
+ atomic_read(&d->opd_syn_rpc_in_flight),
list_empty(&d->opd_syn_committed_there) ? "" : "!");
thread->t_flags = SVC_STOPPED;
struct llog_ctxt *ctxt;
ctxt = llog_get_context(d->opd_obd, LLOG_MDS_OST_ORIG_CTXT);
- if (ctxt != NULL)
+ if (ctxt) {
llog_cat_close(env, ctxt->loc_handle);
- llog_cleanup(env, ctxt);
+ llog_cleanup(env, ctxt);
+ }
}
/**
ENTRY;
+ d->opd_syn_max_rpc_in_flight = OSP_MAX_IN_FLIGHT;
+ d->opd_syn_max_rpc_in_progress = OSP_MAX_IN_PROGRESS;
+ spin_lock_init(&d->opd_syn_lock);
+ init_waitqueue_head(&d->opd_syn_waitq);
+ init_waitqueue_head(&d->opd_syn_barrier_waitq);
+ thread_set_flags(&d->opd_syn_thread, SVC_INIT);
+ init_waitqueue_head(&d->opd_syn_thread.t_ctl_waitq);
+ INIT_LIST_HEAD(&d->opd_syn_inflight_list);
+ INIT_LIST_HEAD(&d->opd_syn_committed_there);
+
+ if (d->opd_storage->dd_rdonly)
+ RETURN(0);
+
rc = osp_sync_id_traction_init(d);
if (rc)
RETURN(rc);
/*
* Start synchronization thread
*/
- d->opd_syn_max_rpc_in_flight = OSP_MAX_IN_FLIGHT;
- d->opd_syn_max_rpc_in_progress = OSP_MAX_IN_PROGRESS;
- spin_lock_init(&d->opd_syn_lock);
- init_waitqueue_head(&d->opd_syn_waitq);
- init_waitqueue_head(&d->opd_syn_barrier_waitq);
- init_waitqueue_head(&d->opd_syn_thread.t_ctl_waitq);
- INIT_LIST_HEAD(&d->opd_syn_inflight_list);
- INIT_LIST_HEAD(&d->opd_syn_committed_there);
-
task = kthread_run(osp_sync_thread, d, "osp-syn-%u-%u",
d->opd_index, d->opd_group);
if (IS_ERR(task)) {
ENTRY;
- thread->t_flags = SVC_STOPPING;
- wake_up(&d->opd_syn_waitq);
- wait_event(thread->t_ctl_waitq, thread->t_flags & SVC_STOPPED);
+ if (!thread_is_init(thread) && !thread_is_stopped(thread)) {
+ thread->t_flags = SVC_STOPPING;
+ wake_up(&d->opd_syn_waitq);
+ wait_event(thread->t_ctl_waitq, thread_is_stopped(thread));
+ }
/*
* unregister transaction callbacks only when sync thread
spin_lock(&tr->otr_lock);
if (likely(txn->oti_current_id > tr->otr_committed_id)) {
- CDEBUG(D_OTHER, "committed: "LPU64" -> "LPU64"\n",
+ CDEBUG(D_OTHER, "committed: %llu -> %llu\n",
tr->otr_committed_id, txn->oti_current_id);
tr->otr_committed_id = txn->oti_current_id;
if (unlikely(tr->otr_next_id <= d->opd_syn_last_used_id)) {
spin_unlock(&tr->otr_lock);
- CERROR("%s: next "LPU64", last synced "LPU64"\n",
+ CERROR("%s: next %llu, last synced %llu\n",
d->opd_obd->obd_name, tr->otr_next_id,
d->opd_syn_last_used_id);
LBUG();
if (list_empty(&d->opd_syn_ontrack))
list_add(&d->opd_syn_ontrack, &tr->otr_wakeup_list);
spin_unlock(&tr->otr_lock);
- CDEBUG(D_OTHER, "new id "LPU64"\n", id);
+ CDEBUG(D_OTHER, "new id %llu\n", id);
return id;
}