* Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2012, 2015, Intel Corporation.
+ * Copyright (c) 2012, 2016, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
* the first queue is llog itself, once read a change is stored in 2nd queue
* in form of RPC (but RPC isn't fired yet).
*
- * the second queue (opd_syn_waiting_for_commit) holds changes awaiting local
+ * the second queue (opd_sync_waiting_for_commit) holds changes awaiting local
* commit. once change is committed locally it migrates onto 3rd queue.
*
- * the third queue (opd_syn_committed_here) holds changes committed locally,
+ * the third queue (opd_sync_committed_here) holds changes committed locally,
* but not sent to OST (as the pipe can be full). once pipe becomes non-full
* we take a change from the queue and fire corresponded RPC.
*
* once RPC is reported committed by OST (using regular last_committed mech.)
- * the change jumps into 4th queue (opd_syn_committed_there), now we can
+ * the change jumps into 4th queue (opd_sync_committed_there), now we can
* cancel corresponded llog record and release RPC
*
- * opd_syn_changes is a number of unread llog records (to be processed).
+ * opd_sync_changes is a number of unread llog records (to be processed).
* notice this number doesn't include llog records from previous boots.
- * with OSP_SYN_THRESHOLD we try to batch processing a bit (TO BE IMPLEMENTED)
+ * with OSP_SYNC_THRESHOLD we try to batch processing a bit (TO BE IMPLEMENTED)
*
- * opd_syn_rpc_in_progress is a number of requests in 2-4 queues.
- * we control this with OSP_MAX_IN_PROGRESS so that OSP don't consume
+ * opd_sync_rpcs_in_progress is the total number of requests in the above 2-4 queues.
+ * we control this with OSP_MAX_RPCS_IN_PROGRESS so that OSP don't consume
* too much memory -- how to deal with 1000th OSTs ? batching could help?
*
- * opd_syn_rpc_in_flight is a number of RPC in flight.
- * we control this with OSP_MAX_IN_FLIGHT
+ * opd_sync_rpcs_in_flight is the number of RPCs in flight.
+ * we control this with OSP_MAX_RPCS_IN_FLIGHT
*/
/* XXX: do math to learn reasonable threshold
* should it be ~ number of changes fitting bulk? */
-#define OSP_SYN_THRESHOLD 10
-#define OSP_MAX_IN_FLIGHT 8
-#define OSP_MAX_IN_PROGRESS 4096
+#define OSP_SYNC_THRESHOLD 10
+#define OSP_MAX_RPCS_IN_FLIGHT 8
+#define OSP_MAX_RPCS_IN_PROGRESS 4096
#define OSP_JOB_MAGIC 0x26112005
/** bytes reserved for ptlrpc_replay_req() */
struct ptlrpc_replay_async_args jra_raa;
struct list_head jra_committed_link;
- struct list_head jra_inflight_link;
+ struct list_head jra_in_flight_link;
+ struct llog_cookie jra_lcookie;
__u32 jra_magic;
};
static inline int osp_sync_running(struct osp_device *d)
{
- return !!(d->opd_syn_thread.t_flags & SVC_RUNNING);
+ return !!(d->opd_sync_thread.t_flags & SVC_RUNNING);
}
/**
*/
static inline int osp_sync_stopped(struct osp_device *d)
{
- return !!(d->opd_syn_thread.t_flags & SVC_STOPPED);
+ return !!(d->opd_sync_thread.t_flags & SVC_STOPPED);
}
/*
*/
static inline int osp_sync_has_new_job(struct osp_device *d)
{
- return ((d->opd_syn_last_processed_id < d->opd_syn_last_used_id) &&
- (d->opd_syn_last_processed_id < d->opd_syn_last_committed_id))
- || (d->opd_syn_prev_done == 0);
+ return ((d->opd_sync_last_processed_id < d->opd_sync_last_used_id) &&
+ (d->opd_sync_last_processed_id < d->opd_sync_last_committed_id))
+ || (d->opd_sync_prev_done == 0);
}
-static inline int osp_sync_inflight_conflict(struct osp_device *d,
+static inline int osp_sync_in_flight_conflict(struct osp_device *d,
struct llog_rec_hdr *h)
{
struct osp_job_req_args *jra;
int conflict = 0;
if (h == NULL || h->lrh_type == LLOG_GEN_REC ||
- list_empty(&d->opd_syn_inflight_list))
+ list_empty(&d->opd_sync_in_flight_list))
return conflict;
memset(&ostid, 0, sizeof(ostid));
switch (h->lrh_type) {
- case MDS_UNLINK_REC:
- ostid_set_seq(&ostid, ((struct llog_unlink_rec *)h)->lur_oseq);
- ostid_set_id(&ostid, ((struct llog_unlink_rec *)h)->lur_oid);
+ case MDS_UNLINK_REC: {
+ struct llog_unlink_rec *unlink = (struct llog_unlink_rec *)h;
+
+ ostid_set_seq(&ostid, unlink->lur_oseq);
+ if (ostid_set_id(&ostid, unlink->lur_oid)) {
+ CERROR("Bad %llu to set " DOSTID "\n",
+ (unsigned long long)(unlink->lur_oid),
+ POSTID(&ostid));
+ return 1;
+ }
+ }
break;
case MDS_UNLINK64_REC:
fid_to_ostid(&((struct llog_unlink64_rec *)h)->lur_fid, &ostid);
LBUG();
}
- spin_lock(&d->opd_syn_lock);
- list_for_each_entry(jra, &d->opd_syn_inflight_list, jra_inflight_link) {
+ spin_lock(&d->opd_sync_lock);
+ list_for_each_entry(jra, &d->opd_sync_in_flight_list,
+ jra_in_flight_link) {
struct ptlrpc_request *req;
struct ost_body *body;
break;
}
}
- spin_unlock(&d->opd_syn_lock);
+ spin_unlock(&d->opd_sync_lock);
return conflict;
}
-static inline int osp_sync_low_in_progress(struct osp_device *d)
+static inline int osp_sync_rpcs_in_progress_low(struct osp_device *d)
{
- return atomic_read(&d->opd_syn_rpc_in_progress) <
- d->opd_syn_max_rpc_in_progress;
+ return atomic_read(&d->opd_sync_rpcs_in_progress) <
+ d->opd_sync_max_rpcs_in_progress;
}
/**
* \retval 1 there is room
* \retval 0 no room, the pipe is full
*/
-static inline int osp_sync_low_in_flight(struct osp_device *d)
+static inline int osp_sync_rpcs_in_flight_low(struct osp_device *d)
{
- return atomic_read(&d->opd_syn_rpc_in_flight) <
- d->opd_syn_max_rpc_in_flight;
+ return atomic_read(&d->opd_sync_rpcs_in_flight) <
+ d->opd_sync_max_rpcs_in_flight;
}
/**
* \retval 1 time to wake up
* \retval 0 no need to wake up
*/
-static inline int osp_sync_has_work(struct osp_device *d)
+static inline int osp_sync_has_work(struct osp_device *osp)
{
/* has new/old changes and low in-progress? */
- if (osp_sync_has_new_job(d) && osp_sync_low_in_progress(d) &&
- osp_sync_low_in_flight(d) && d->opd_imp_connected)
+ if (osp_sync_has_new_job(osp) && osp_sync_rpcs_in_progress_low(osp) &&
+ osp_sync_rpcs_in_flight_low(osp) && osp->opd_imp_connected)
return 1;
/* has remotely committed? */
- if (!list_empty(&d->opd_syn_committed_there))
+ if (!list_empty(&osp->opd_sync_committed_there))
return 1;
return 0;
}
-#define osp_sync_check_for_work(d) \
-{ \
- if (osp_sync_has_work(d)) { \
- wake_up(&d->opd_syn_waitq); \
- } \
-}
-
-void __osp_sync_check_for_work(struct osp_device *d)
+void osp_sync_check_for_work(struct osp_device *osp)
{
- osp_sync_check_for_work(d);
+ if (osp_sync_has_work(osp))
+ wake_up(&osp->opd_sync_waitq);
}
static inline __u64 osp_sync_correct_id(struct osp_device *d,
* last_processed and last_committed is less than
* 64745 ^ 2 and less than 2^32 - 1
*/
- __u64 correct_id = d->opd_syn_last_committed_id;
+ __u64 correct_id = d->opd_sync_last_committed_id;
if ((correct_id & 0xffffffffULL) < rec->lrh_id)
correct_id -= 0x100000000ULL;
{
LASSERT(d);
- if (unlikely(atomic_read(&d->opd_syn_barrier) > 0))
+ if (unlikely(atomic_read(&d->opd_sync_barrier) > 0))
return 0;
- if (unlikely(osp_sync_inflight_conflict(d, rec)))
+ if (unlikely(osp_sync_in_flight_conflict(d, rec)))
return 0;
- if (!osp_sync_low_in_progress(d))
+ if (!osp_sync_rpcs_in_progress_low(d))
return 0;
- if (!osp_sync_low_in_flight(d))
+ if (!osp_sync_rpcs_in_flight_low(d))
return 0;
if (!d->opd_imp_connected)
return 0;
- if (d->opd_syn_prev_done == 0)
+ if (d->opd_sync_prev_done == 0)
return 1;
- if (atomic_read(&d->opd_syn_changes) == 0)
+ if (atomic_read(&d->opd_sync_changes) == 0)
return 0;
if (rec == NULL ||
- osp_sync_correct_id(d, rec) <= d->opd_syn_last_committed_id)
+ osp_sync_correct_id(d, rec) <= d->opd_sync_last_committed_id)
return 1;
return 0;
}
osi->osi_hdr.lrh_len = sizeof(struct llog_unlink64_rec);
break;
case MDS_SETATTR64_REC:
- osi->osi_hdr.lrh_len = sizeof(struct llog_setattr64_rec);
+ osi->osi_hdr.lrh_len = sizeof(struct llog_setattr64_rec_v2);
break;
default:
LBUG();
LASSERT(attr);
osi->osi_setattr.lsr_uid = attr->la_uid;
osi->osi_setattr.lsr_gid = attr->la_gid;
+ osi->osi_setattr.lsr_projid = attr->la_projid;
osi->osi_setattr.lsr_valid =
((attr->la_valid & LA_UID) ? OBD_MD_FLUID : 0) |
- ((attr->la_valid & LA_GID) ? OBD_MD_FLGID : 0);
+ ((attr->la_valid & LA_GID) ? OBD_MD_FLGID : 0) |
+ ((attr->la_valid & LA_PROJID) ? OBD_MD_FLPROJID : 0);
break;
default:
LBUG();
llog_ctxt_put(ctxt);
if (likely(rc >= 0)) {
- CDEBUG(D_OTHER, "%s: new record "DOSTID":%lu/%lu: %d\n",
+ CDEBUG(D_OTHER, "%s: new record "DFID":%x.%u: rc = %d\n",
d->opd_obd->obd_name,
- POSTID(&osi->osi_cookie.lgc_lgl.lgl_oi),
- (unsigned long)osi->osi_cookie.lgc_lgl.lgl_ogen,
- (unsigned long)osi->osi_cookie.lgc_index, rc);
- atomic_inc(&d->opd_syn_changes);
+ PFID(&osi->osi_cookie.lgc_lgl.lgl_oi.oi_fid),
+ osi->osi_cookie.lgc_lgl.lgl_ogen,
+ osi->osi_cookie.lgc_index, rc);
+ atomic_inc(&d->opd_sync_changes);
}
/* return 0 always here, error case just cause no llog record */
RETURN(0);
if (unlikely(req->rq_transno == 0))
return;
- /* do not do any opd_dyn_rpc_* accounting here
+ /* do not do any opd_sync_rpcs_* accounting here
* it's done in osp_sync_interpret sooner or later */
LASSERT(d);
ptlrpc_request_addref(req);
- spin_lock(&d->opd_syn_lock);
- list_add(&jra->jra_committed_link, &d->opd_syn_committed_there);
- spin_unlock(&d->opd_syn_lock);
+ spin_lock(&d->opd_sync_lock);
+ list_add(&jra->jra_committed_link, &d->opd_sync_committed_there);
+ spin_unlock(&d->opd_sync_lock);
/* XXX: some batching wouldn't hurt */
- wake_up(&d->opd_syn_waitq);
+ wake_up(&d->opd_sync_waitq);
}
/**
ptlrpc_request_addref(req);
- spin_lock(&d->opd_syn_lock);
- list_add(&jra->jra_committed_link, &d->opd_syn_committed_there);
- spin_unlock(&d->opd_syn_lock);
+ spin_lock(&d->opd_sync_lock);
+ list_add(&jra->jra_committed_link,
+ &d->opd_sync_committed_there);
+ spin_unlock(&d->opd_sync_lock);
- wake_up(&d->opd_syn_waitq);
+ wake_up(&d->opd_sync_waitq);
} else if (rc) {
struct obd_import *imp = req->rq_import;
/*
/* this is the last time we see the request
* if transno is not zero, then commit cb
* will be called at some point */
- LASSERT(atomic_read(&d->opd_syn_rpc_in_progress) > 0);
- atomic_dec(&d->opd_syn_rpc_in_progress);
+ LASSERT(atomic_read(&d->opd_sync_rpcs_in_progress) > 0);
+ atomic_dec(&d->opd_sync_rpcs_in_progress);
}
- wake_up(&d->opd_syn_waitq);
+ wake_up(&d->opd_sync_waitq);
} else if (d->opd_pre != NULL &&
unlikely(d->opd_pre_status == -ENOSPC)) {
/*
osp_statfs_need_now(d);
}
- spin_lock(&d->opd_syn_lock);
- list_del_init(&jra->jra_inflight_link);
- spin_unlock(&d->opd_syn_lock);
- LASSERT(atomic_read(&d->opd_syn_rpc_in_flight) > 0);
- atomic_dec(&d->opd_syn_rpc_in_flight);
- if (unlikely(atomic_read(&d->opd_syn_barrier) > 0))
- wake_up(&d->opd_syn_barrier_waitq);
+ spin_lock(&d->opd_sync_lock);
+ list_del_init(&jra->jra_in_flight_link);
+ spin_unlock(&d->opd_sync_lock);
+ LASSERT(atomic_read(&d->opd_sync_rpcs_in_flight) > 0);
+ atomic_dec(&d->opd_sync_rpcs_in_flight);
+ if (unlikely(atomic_read(&d->opd_sync_barrier) > 0))
+ wake_up(&d->opd_sync_barrier_waitq);
CDEBUG(D_OTHER, "%s: %d in flight, %d in progress\n",
- d->opd_obd->obd_name, atomic_read(&d->opd_syn_rpc_in_flight),
- atomic_read(&d->opd_syn_rpc_in_progress));
+ d->opd_obd->obd_name, atomic_read(&d->opd_sync_rpcs_in_flight),
+ atomic_read(&d->opd_sync_rpcs_in_progress));
osp_sync_check_for_work(d);
* This is just a tiny helper function to put the request on the sending list
*
* \param[in] d OSP device
+ * \param[in] llh llog handle where the record is stored
+ * \param[in] h llog record
* \param[in] req request
*/
static void osp_sync_send_new_rpc(struct osp_device *d,
+ struct llog_handle *llh,
+ struct llog_rec_hdr *h,
struct ptlrpc_request *req)
{
struct osp_job_req_args *jra;
- LASSERT(atomic_read(&d->opd_syn_rpc_in_flight) <=
- d->opd_syn_max_rpc_in_flight);
+ LASSERT(atomic_read(&d->opd_sync_rpcs_in_flight) <=
+ d->opd_sync_max_rpcs_in_flight);
jra = ptlrpc_req_async_args(req);
jra->jra_magic = OSP_JOB_MAGIC;
+ jra->jra_lcookie.lgc_lgl = llh->lgh_id;
+ jra->jra_lcookie.lgc_subsys = LLOG_MDS_OST_ORIG_CTXT;
+ jra->jra_lcookie.lgc_index = h->lrh_index;
INIT_LIST_HEAD(&jra->jra_committed_link);
- spin_lock(&d->opd_syn_lock);
- list_add_tail(&jra->jra_inflight_link, &d->opd_syn_inflight_list);
- spin_unlock(&d->opd_syn_lock);
+ spin_lock(&d->opd_sync_lock);
+ list_add_tail(&jra->jra_in_flight_link, &d->opd_sync_in_flight_list);
+ spin_unlock(&d->opd_sync_lock);
ptlrpcd_add_req(req);
}
* are initialized.
*
* \param[in] d OSP device
- * \param[in] llh llog handle where the record is stored
- * \param[in] h llog record
* \param[in] op type of the change
* \param[in] format request format to be used
*
* \retval ERR_PTR(errno) on error
*/
static struct ptlrpc_request *osp_sync_new_job(struct osp_device *d,
- struct llog_handle *llh,
- struct llog_rec_hdr *h,
ost_cmd_t op,
const struct req_format *format)
{
struct ptlrpc_request *req;
- struct ost_body *body;
struct obd_import *imp;
int rc;
return ERR_PTR(rc);
}
- /*
- * this is a trick: to save on memory allocations we put cookie
- * into the request, but don't set corresponded flag in o_valid
- * so that OST doesn't interpret this cookie. once the request
- * is committed on OST we take cookie from the request and cancel
- */
- body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
- LASSERT(body);
- body->oa.o_lcookie.lgc_lgl = llh->lgh_id;
- body->oa.o_lcookie.lgc_subsys = LLOG_MDS_OST_ORIG_CTXT;
- body->oa.o_lcookie.lgc_index = h->lrh_index;
-
req->rq_interpret_reply = osp_sync_interpret;
req->rq_commit_cb = osp_sync_request_commit_cb;
req->rq_cb_data = d;
if (OBD_FAIL_CHECK(OBD_FAIL_OSP_CHECK_INVALID_REC))
RETURN(1);
- /* lsr_valid can only be 0 or have OBD_MD_{FLUID,FLGID} set,
+
+ /* lsr_valid can only be 0 or have OBD_MD_{FLUID, FLGID, FLPROJID} set,
* so no bits other than these should be set. */
- if ((rec->lsr_valid & ~(OBD_MD_FLUID | OBD_MD_FLGID)) != 0) {
+ if ((rec->lsr_valid & ~(OBD_MD_FLUID | OBD_MD_FLGID |
+ OBD_MD_FLPROJID)) != 0) {
CERROR("%s: invalid setattr record, lsr_valid:%llu\n",
- d->opd_obd->obd_name, rec->lsr_valid);
+ d->opd_obd->obd_name, rec->lsr_valid);
/* return 1 on invalid record */
RETURN(1);
}
- req = osp_sync_new_job(d, llh, h, OST_SETATTR, &RQF_OST_SETATTR);
+ req = osp_sync_new_job(d, OST_SETATTR, &RQF_OST_SETATTR);
if (IS_ERR(req))
RETURN(PTR_ERR(req));
body->oa.o_uid = rec->lsr_uid;
body->oa.o_gid = rec->lsr_gid;
body->oa.o_valid = OBD_MD_FLGROUP | OBD_MD_FLID;
+ if (h->lrh_len > sizeof(struct llog_setattr64_rec))
+ body->oa.o_projid = ((struct llog_setattr64_rec_v2 *)
+ rec)->lsr_projid;
+
/* old setattr record (prior 2.6.0) doesn't have 'valid' stored,
* we assume that both UID and GID are valid in that case. */
if (rec->lsr_valid == 0)
else
body->oa.o_valid |= rec->lsr_valid;
- osp_sync_send_new_rpc(d, req);
+ osp_sync_send_new_rpc(d, llh, h, req);
RETURN(0);
}
struct llog_unlink_rec *rec = (struct llog_unlink_rec *)h;
struct ptlrpc_request *req;
struct ost_body *body;
+ int rc;
ENTRY;
LASSERT(h->lrh_type == MDS_UNLINK_REC);
- req = osp_sync_new_job(d, llh, h, OST_DESTROY, &RQF_OST_DESTROY);
+ req = osp_sync_new_job(d, OST_DESTROY, &RQF_OST_DESTROY);
if (IS_ERR(req))
RETURN(PTR_ERR(req));
body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
LASSERT(body);
ostid_set_seq(&body->oa.o_oi, rec->lur_oseq);
- ostid_set_id(&body->oa.o_oi, rec->lur_oid);
+ rc = ostid_set_id(&body->oa.o_oi, rec->lur_oid);
+ if (rc)
+ return rc;
body->oa.o_misc = rec->lur_count;
body->oa.o_valid = OBD_MD_FLGROUP | OBD_MD_FLID;
if (rec->lur_count)
body->oa.o_valid |= OBD_MD_FLOBJCOUNT;
- osp_sync_send_new_rpc(d, req);
+ osp_sync_send_new_rpc(d, llh, h, req);
RETURN(0);
}
ENTRY;
LASSERT(h->lrh_type == MDS_UNLINK64_REC);
- req = osp_sync_new_job(d, llh, h, OST_DESTROY,
- &RQF_OST_DESTROY);
+ req = osp_sync_new_job(d, OST_DESTROY, &RQF_OST_DESTROY);
if (IS_ERR(req))
RETURN(PTR_ERR(req));
body->oa.o_misc = rec->lur_count;
body->oa.o_valid = OBD_MD_FLGROUP | OBD_MD_FLID |
OBD_MD_FLOBJCOUNT;
- osp_sync_send_new_rpc(d, req);
+ osp_sync_send_new_rpc(d, llh, h, req);
RETURN(0);
}
struct llog_gen_rec *gen = (struct llog_gen_rec *)rec;
/* we're waiting for the record generated by this instance */
- LASSERT(d->opd_syn_prev_done == 0);
- if (!memcmp(&d->opd_syn_generation, &gen->lgr_gen,
+ LASSERT(d->opd_sync_prev_done == 0);
+ if (!memcmp(&d->opd_sync_generation, &gen->lgr_gen,
sizeof(gen->lgr_gen))) {
CDEBUG(D_HA, "processed all old entries\n");
- d->opd_syn_prev_done = 1;
+ d->opd_sync_prev_done = 1;
}
/* cancel any generation record */
/* notice we increment counters before sending RPC, to be consistent
* in RPC interpret callback which may happen very quickly */
- atomic_inc(&d->opd_syn_rpc_in_flight);
- atomic_inc(&d->opd_syn_rpc_in_progress);
+ atomic_inc(&d->opd_sync_rpcs_in_flight);
+ atomic_inc(&d->opd_sync_rpcs_in_progress);
switch (rec->lrh_type) {
/* case MDS_UNLINK_REC is kept for compatibility */
/* For all kinds of records, not matter successful or not,
* we should decrease changes and bump last_processed_id.
*/
- if (d->opd_syn_prev_done) {
+ if (d->opd_sync_prev_done) {
__u64 correct_id = osp_sync_correct_id(d, rec);
- LASSERT(atomic_read(&d->opd_syn_changes) > 0);
- LASSERT(correct_id <= d->opd_syn_last_committed_id);
+ LASSERT(atomic_read(&d->opd_sync_changes) > 0);
+ LASSERT(correct_id <= d->opd_sync_last_committed_id);
/* NOTE: it's possible to meet same id if
* OST stores few stripes of same file
*/
while (1) {
/* another thread may be trying to set new value */
rmb();
- if (correct_id > d->opd_syn_last_processed_id) {
- d->opd_syn_last_processed_id = correct_id;
- wake_up(&d->opd_syn_barrier_waitq);
+ if (correct_id > d->opd_sync_last_processed_id) {
+ d->opd_sync_last_processed_id = correct_id;
+ wake_up(&d->opd_sync_barrier_waitq);
} else
break;
}
- atomic_dec(&d->opd_syn_changes);
+ atomic_dec(&d->opd_sync_changes);
}
if (rc != 0) {
- atomic_dec(&d->opd_syn_rpc_in_flight);
- atomic_dec(&d->opd_syn_rpc_in_progress);
+ atomic_dec(&d->opd_sync_rpcs_in_flight);
+ atomic_dec(&d->opd_sync_rpcs_in_progress);
}
CDEBUG(D_OTHER, "%s: %d in flight, %d in progress\n",
- d->opd_obd->obd_name, atomic_read(&d->opd_syn_rpc_in_flight),
- atomic_read(&d->opd_syn_rpc_in_progress));
+ d->opd_obd->obd_name, atomic_read(&d->opd_sync_rpcs_in_flight),
+ atomic_read(&d->opd_sync_rpcs_in_progress));
/* Delete the invalid record */
if (rc == 1) {
ENTRY;
- if (list_empty(&d->opd_syn_committed_there))
+ if (list_empty(&d->opd_sync_committed_there))
return;
/*
LASSERT(llh);
INIT_LIST_HEAD(&list);
- spin_lock(&d->opd_syn_lock);
- list_splice(&d->opd_syn_committed_there, &list);
- INIT_LIST_HEAD(&d->opd_syn_committed_there);
- spin_unlock(&d->opd_syn_lock);
+ spin_lock(&d->opd_sync_lock);
+ list_splice(&d->opd_sync_committed_there, &list);
+ INIT_LIST_HEAD(&d->opd_sync_committed_there);
+ spin_unlock(&d->opd_sync_lock);
while (!list_empty(&list)) {
struct osp_job_req_args *jra;
* called we can check committness directly */
if (req->rq_import_generation == imp->imp_generation) {
rc = llog_cat_cancel_records(env, llh, 1,
- &body->oa.o_lcookie);
+ &jra->jra_lcookie);
if (rc)
CERROR("%s: can't cancel record: %d\n",
obd->obd_name, rc);
llog_ctxt_put(ctxt);
- LASSERT(atomic_read(&d->opd_syn_rpc_in_progress) >= done);
- atomic_sub(done, &d->opd_syn_rpc_in_progress);
+ LASSERT(atomic_read(&d->opd_sync_rpcs_in_progress) >= done);
+ atomic_sub(done, &d->opd_sync_rpcs_in_progress);
CDEBUG(D_OTHER, "%s: %d in flight, %d in progress\n",
- d->opd_obd->obd_name, atomic_read(&d->opd_syn_rpc_in_flight),
- atomic_read(&d->opd_syn_rpc_in_progress));
+ d->opd_obd->obd_name, atomic_read(&d->opd_sync_rpcs_in_flight),
+ atomic_read(&d->opd_sync_rpcs_in_progress));
osp_sync_check_for_work(d);
/* wake up the thread if requested to stop:
* it might be waiting for in-progress to complete */
if (unlikely(osp_sync_running(d) == 0))
- wake_up(&d->opd_syn_waitq);
+ wake_up(&d->opd_sync_waitq);
EXIT;
}
if (llh == NULL) {
/* ask llog for another record */
CDEBUG(D_HA, "%u changes, %u in progress,"
- " %u in flight\n",
- atomic_read(&d->opd_syn_changes),
- atomic_read(&d->opd_syn_rpc_in_progress),
- atomic_read(&d->opd_syn_rpc_in_flight));
+ " %u in flight\n",
+ atomic_read(&d->opd_sync_changes),
+ atomic_read(&d->opd_sync_rpcs_in_progress),
+ atomic_read(&d->opd_sync_rpcs_in_flight));
return 0;
}
osp_sync_process_record(env, d, llh, rec);
rec = NULL;
}
- if (d->opd_syn_last_processed_id == d->opd_syn_last_used_id)
+ if (d->opd_sync_last_processed_id == d->opd_sync_last_used_id)
osp_sync_remove_from_tracker(d);
- l_wait_event(d->opd_syn_waitq,
+ l_wait_event(d->opd_sync_waitq,
!osp_sync_running(d) ||
osp_sync_can_process_new(d, rec) ||
- !list_empty(&d->opd_syn_committed_there),
+ !list_empty(&d->opd_sync_committed_there),
&lwi);
} while (1);
}
static int osp_sync_thread(void *_arg)
{
struct osp_device *d = _arg;
- struct ptlrpc_thread *thread = &d->opd_syn_thread;
+ struct ptlrpc_thread *thread = &d->opd_sync_thread;
struct l_wait_info lwi = { 0 };
struct llog_ctxt *ctxt;
struct obd_device *obd = d->opd_obd;
if (rc) {
CERROR("%s: can't initialize env: rc = %d\n",
obd->obd_name, rc);
+
+ spin_lock(&d->opd_sync_lock);
+ thread->t_flags = SVC_STOPPED;
+ spin_unlock(&d->opd_sync_lock);
+ wake_up(&thread->t_ctl_waitq);
+
RETURN(rc);
}
- spin_lock(&d->opd_syn_lock);
+ spin_lock(&d->opd_sync_lock);
thread->t_flags = SVC_RUNNING;
- spin_unlock(&d->opd_syn_lock);
+ spin_unlock(&d->opd_sync_lock);
wake_up(&thread->t_ctl_waitq);
ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
}
LASSERTF(rc == 0 || rc == LLOG_PROC_BREAK,
"%u changes, %u in progress, %u in flight: %d\n",
- atomic_read(&d->opd_syn_changes),
- atomic_read(&d->opd_syn_rpc_in_progress),
- atomic_read(&d->opd_syn_rpc_in_flight), rc);
+ atomic_read(&d->opd_sync_changes),
+ atomic_read(&d->opd_sync_rpcs_in_progress),
+ atomic_read(&d->opd_sync_rpcs_in_flight), rc);
/* we don't expect llog_process_thread() to exit till umount */
LASSERTF(thread->t_flags != SVC_RUNNING,
"%u changes, %u in progress, %u in flight\n",
- atomic_read(&d->opd_syn_changes),
- atomic_read(&d->opd_syn_rpc_in_progress),
- atomic_read(&d->opd_syn_rpc_in_flight));
+ atomic_read(&d->opd_sync_changes),
+ atomic_read(&d->opd_sync_rpcs_in_progress),
+ atomic_read(&d->opd_sync_rpcs_in_flight));
/* wait till all the requests are completed */
count = 0;
- while (atomic_read(&d->opd_syn_rpc_in_progress) > 0) {
+ while (atomic_read(&d->opd_sync_rpcs_in_progress) > 0) {
osp_sync_process_committed(&env, d);
lwi = LWI_TIMEOUT(cfs_time_seconds(5), NULL, NULL);
- rc = l_wait_event(d->opd_syn_waitq,
- atomic_read(&d->opd_syn_rpc_in_progress) == 0,
+ rc = l_wait_event(d->opd_sync_waitq,
+ atomic_read(&d->opd_sync_rpcs_in_progress) == 0,
&lwi);
if (rc == -ETIMEDOUT)
count++;
LASSERTF(count < 10, "%s: %d %d %sempty\n",
d->opd_obd->obd_name,
- atomic_read(&d->opd_syn_rpc_in_progress),
- atomic_read(&d->opd_syn_rpc_in_flight),
- list_empty(&d->opd_syn_committed_there) ? "" : "!");
+ atomic_read(&d->opd_sync_rpcs_in_progress),
+ atomic_read(&d->opd_sync_rpcs_in_flight),
+ list_empty(&d->opd_sync_committed_there) ? "" : "!");
}
if (rc)
CERROR("can't cleanup llog: %d\n", rc);
out:
- LASSERTF(atomic_read(&d->opd_syn_rpc_in_progress) == 0,
- "%s: %d %d %sempty\n",
- d->opd_obd->obd_name, atomic_read(&d->opd_syn_rpc_in_progress),
- atomic_read(&d->opd_syn_rpc_in_flight),
- list_empty(&d->opd_syn_committed_there) ? "" : "!");
+ LASSERTF(atomic_read(&d->opd_sync_rpcs_in_progress) == 0,
+ "%s: %d %d %sempty\n", d->opd_obd->obd_name,
+ atomic_read(&d->opd_sync_rpcs_in_progress),
+ atomic_read(&d->opd_sync_rpcs_in_flight),
+ list_empty(&d->opd_sync_committed_there) ? "" : "!");
thread->t_flags = SVC_STOPPED;
rc = 0;
}
- CDEBUG(D_INFO, "%s: Init llog for %d - catid "DOSTID":%x\n",
+ CDEBUG(D_INFO, "%s: Init llog for %d - catid "DFID":%x\n",
obd->obd_name, d->opd_index,
- POSTID(&osi->osi_cid.lci_logid.lgl_oi),
+ PFID(&osi->osi_cid.lci_logid.lgl_oi.oi_fid),
osi->osi_cid.lci_logid.lgl_ogen);
rc = llog_setup(env, obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT,
* put a mark in the llog till which we'll be processing
* old records restless
*/
- d->opd_syn_generation.mnt_cnt = cfs_time_current();
- d->opd_syn_generation.conn_cnt = cfs_time_current();
+ d->opd_sync_generation.mnt_cnt = cfs_time_current();
+ d->opd_sync_generation.conn_cnt = cfs_time_current();
osi->osi_hdr.lrh_type = LLOG_GEN_REC;
osi->osi_hdr.lrh_len = sizeof(osi->osi_gen);
- memcpy(&osi->osi_gen.lgr_gen, &d->opd_syn_generation,
+ memcpy(&osi->osi_gen.lgr_gen, &d->opd_sync_generation,
sizeof(osi->osi_gen.lgr_gen));
rc = llog_cat_add(env, lgh, &osi->osi_gen.lgr_hdr, &osi->osi_cookie);
struct llog_ctxt *ctxt;
ctxt = llog_get_context(d->opd_obd, LLOG_MDS_OST_ORIG_CTXT);
- if (ctxt != NULL)
+ if (ctxt) {
llog_cat_close(env, ctxt->loc_handle);
- llog_cleanup(env, ctxt);
+ llog_cleanup(env, ctxt);
+ }
}
/**
ENTRY;
+ d->opd_sync_max_rpcs_in_flight = OSP_MAX_RPCS_IN_FLIGHT;
+ d->opd_sync_max_rpcs_in_progress = OSP_MAX_RPCS_IN_PROGRESS;
+ spin_lock_init(&d->opd_sync_lock);
+ init_waitqueue_head(&d->opd_sync_waitq);
+ init_waitqueue_head(&d->opd_sync_barrier_waitq);
+ thread_set_flags(&d->opd_sync_thread, SVC_INIT);
+ init_waitqueue_head(&d->opd_sync_thread.t_ctl_waitq);
+ INIT_LIST_HEAD(&d->opd_sync_in_flight_list);
+ INIT_LIST_HEAD(&d->opd_sync_committed_there);
+
+ if (d->opd_storage->dd_rdonly)
+ RETURN(0);
+
rc = osp_sync_id_traction_init(d);
if (rc)
RETURN(rc);
/*
* Start synchronization thread
*/
- d->opd_syn_max_rpc_in_flight = OSP_MAX_IN_FLIGHT;
- d->opd_syn_max_rpc_in_progress = OSP_MAX_IN_PROGRESS;
- spin_lock_init(&d->opd_syn_lock);
- init_waitqueue_head(&d->opd_syn_waitq);
- init_waitqueue_head(&d->opd_syn_barrier_waitq);
- init_waitqueue_head(&d->opd_syn_thread.t_ctl_waitq);
- INIT_LIST_HEAD(&d->opd_syn_inflight_list);
- INIT_LIST_HEAD(&d->opd_syn_committed_there);
-
task = kthread_run(osp_sync_thread, d, "osp-syn-%u-%u",
d->opd_index, d->opd_group);
if (IS_ERR(task)) {
GOTO(err_llog, rc);
}
- l_wait_event(d->opd_syn_thread.t_ctl_waitq,
+ l_wait_event(d->opd_sync_thread.t_ctl_waitq,
osp_sync_running(d) || osp_sync_stopped(d), &lwi);
RETURN(0);
*/
int osp_sync_fini(struct osp_device *d)
{
- struct ptlrpc_thread *thread = &d->opd_syn_thread;
+ struct ptlrpc_thread *thread = &d->opd_sync_thread;
ENTRY;
- thread->t_flags = SVC_STOPPING;
- wake_up(&d->opd_syn_waitq);
- wait_event(thread->t_ctl_waitq, thread->t_flags & SVC_STOPPED);
+ if (!thread_is_init(thread) && !thread_is_stopped(thread)) {
+ thread->t_flags = SVC_STOPPING;
+ wake_up(&d->opd_sync_waitq);
+ wait_event(thread->t_ctl_waitq, thread_is_stopped(thread));
+ }
/*
* unregister transaction callbacks only when sync thread
tr->otr_committed_id = txn->oti_current_id;
list_for_each_entry(d, &tr->otr_wakeup_list,
- opd_syn_ontrack) {
- d->opd_syn_last_committed_id = tr->otr_committed_id;
- wake_up(&d->opd_syn_waitq);
+ opd_sync_ontrack) {
+ d->opd_sync_last_committed_id = tr->otr_committed_id;
+ wake_up(&d->opd_sync_waitq);
}
}
spin_unlock(&tr->otr_lock);
LASSERT(d);
LASSERT(d->opd_storage);
- LASSERT(d->opd_syn_tracker == NULL);
- INIT_LIST_HEAD(&d->opd_syn_ontrack);
+ LASSERT(d->opd_sync_tracker == NULL);
+ INIT_LIST_HEAD(&d->opd_sync_ontrack);
mutex_lock(&osp_id_tracker_sem);
list_for_each_entry(tr, &osp_id_tracker_list, otr_list) {
if (tr->otr_dev == d->opd_storage) {
LASSERT(atomic_read(&tr->otr_refcount));
atomic_inc(&tr->otr_refcount);
- d->opd_syn_tracker = tr;
+ d->opd_sync_tracker = tr;
found = tr;
break;
}
rc = -ENOMEM;
OBD_ALLOC_PTR(tr);
if (tr) {
- d->opd_syn_tracker = tr;
+ d->opd_sync_tracker = tr;
spin_lock_init(&tr->otr_lock);
tr->otr_dev = d->opd_storage;
tr->otr_next_id = 1;
ENTRY;
LASSERT(d);
- tr = d->opd_syn_tracker;
+ tr = d->opd_sync_tracker;
if (tr == NULL) {
EXIT;
return;
LASSERT(list_empty(&tr->otr_wakeup_list));
list_del(&tr->otr_list);
OBD_FREE_PTR(tr);
- d->opd_syn_tracker = NULL;
+ d->opd_sync_tracker = NULL;
}
mutex_unlock(&osp_id_tracker_sem);
{
struct osp_id_tracker *tr;
- tr = d->opd_syn_tracker;
+ tr = d->opd_sync_tracker;
LASSERT(tr);
/* XXX: we can improve this introducing per-cpu preallocated ids? */
if (OBD_FAIL_CHECK(OBD_FAIL_MDS_TRACK_OVERFLOW))
tr->otr_next_id = 0xfffffff0;
- if (unlikely(tr->otr_next_id <= d->opd_syn_last_used_id)) {
+ if (unlikely(tr->otr_next_id <= d->opd_sync_last_used_id)) {
spin_unlock(&tr->otr_lock);
CERROR("%s: next %llu, last synced %llu\n",
d->opd_obd->obd_name, tr->otr_next_id,
- d->opd_syn_last_used_id);
+ d->opd_sync_last_used_id);
LBUG();
}
if (id == 0)
id = tr->otr_next_id++;
- if (id > d->opd_syn_last_used_id)
- d->opd_syn_last_used_id = id;
- if (list_empty(&d->opd_syn_ontrack))
- list_add(&d->opd_syn_ontrack, &tr->otr_wakeup_list);
+ if (id > d->opd_sync_last_used_id)
+ d->opd_sync_last_used_id = id;
+ if (list_empty(&d->opd_sync_ontrack))
+ list_add(&d->opd_sync_ontrack, &tr->otr_wakeup_list);
spin_unlock(&tr->otr_lock);
CDEBUG(D_OTHER, "new id %llu\n", id);
{
struct osp_id_tracker *tr;
- tr = d->opd_syn_tracker;
+ tr = d->opd_sync_tracker;
LASSERT(tr);
- if (list_empty(&d->opd_syn_ontrack))
+ if (list_empty(&d->opd_sync_ontrack))
return;
spin_lock(&tr->otr_lock);
- list_del_init(&d->opd_syn_ontrack);
+ list_del_init(&d->opd_sync_ontrack);
spin_unlock(&tr->otr_lock);
}