* Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2012, 2016, Intel Corporation.
+ * Copyright (c) 2012, 2017, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
#define DEBUG_SUBSYSTEM S_MDS
#include <linux/kthread.h>
+#include <linux/delay.h>
#include <lustre_log.h>
#include <lustre_update.h>
#include "osp_internal.h"
-static int osp_sync_id_traction_init(struct osp_device *d);
-static void osp_sync_id_traction_fini(struct osp_device *d);
-static __u64 osp_sync_id_get(struct osp_device *d, __u64 id);
-static void osp_sync_remove_from_tracker(struct osp_device *d);
-
/*
* this is a component of OSP implementing synchronization between MDS and OST
* it llogs all interesting changes (currently it's uid/gid change and object
__u32 jra_magic;
};
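+
+/* register a commit callback on the local transaction so the OSP can
+ * track which of its llog records have been committed to disk */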
+static int osp_sync_add_commit_cb(const struct lu_env *env,
+ struct osp_device *d, struct thandle *th);
+
static inline int osp_sync_running(struct osp_device *d)
{
return !!(d->opd_sync_thread.t_flags & SVC_RUNNING);
*/
static inline int osp_sync_has_new_job(struct osp_device *d)
{
- return ((d->opd_sync_last_processed_id < d->opd_sync_last_used_id) &&
- (d->opd_sync_last_processed_id < d->opd_sync_last_committed_id))
- || (d->opd_sync_prev_done == 0);
+ return atomic_read(&d->opd_sync_changes) > 0 ||
+ d->opd_sync_prev_done == 0;
}
static inline int osp_sync_in_flight_conflict(struct osp_device *d,
return correct_id;
}
+
/**
* Check and return ready-for-new status.
*
return 1;
if (atomic_read(&d->opd_sync_changes) == 0)
return 0;
- if (rec == NULL ||
- osp_sync_correct_id(d, rec) <= d->opd_sync_last_committed_id)
+ if (rec == NULL)
+ return 1;
+ /* notice "<" not "<=" */
+ if (osp_sync_correct_id(d, rec) < d->opd_sync_last_committed_id)
return 1;
return 0;
}
* \retval negative negated errno on error
*/
int osp_sync_declare_add(const struct lu_env *env, struct osp_object *o,
- llog_op_type type, struct thandle *th)
+ enum llog_op_type type, struct thandle *th)
{
struct osp_thread_info *osi = osp_env_info(env);
struct osp_device *d = lu2osp_dev(o->opo_obj.do_lu.lo_dev);
LBUG();
}
- /* we want ->dt_trans_start() to allocate per-thandle structure */
- storage_th->th_tags |= LCT_OSP_THREAD;
-
ctxt = llog_get_context(d->opd_obd, LLOG_MDS_OST_ORIG_CTXT);
- LASSERT(ctxt);
+ if (!ctxt) {
+		/* for some reason OSP wasn't able to open llog,
+ * just skip logging this operation and hope
+ * LFSCK will fix it eventually */
+ CERROR("logging isn't available, run LFSCK\n");
+ RETURN(0);
+ }
rc = llog_declare_add(env, ctxt->loc_handle, &osi->osi_hdr,
storage_th);
* \retval negative negated errno on error
*/
static int osp_sync_add_rec(const struct lu_env *env, struct osp_device *d,
- const struct lu_fid *fid, llog_op_type type,
+ const struct lu_fid *fid, enum llog_op_type type,
int count, struct thandle *th,
const struct lu_attr *attr)
{
struct osp_thread_info *osi = osp_env_info(env);
struct llog_ctxt *ctxt;
- struct osp_txn_info *txn;
struct thandle *storage_th;
+ bool immediate_commit_cb = false;
int rc;
ENTRY;
LASSERT(attr);
osi->osi_setattr.lsr_uid = attr->la_uid;
osi->osi_setattr.lsr_gid = attr->la_gid;
+ osi->osi_setattr.lsr_layout_version = attr->la_layout_version;
osi->osi_setattr.lsr_projid = attr->la_projid;
osi->osi_setattr.lsr_valid =
((attr->la_valid & LA_UID) ? OBD_MD_FLUID : 0) |
((attr->la_valid & LA_GID) ? OBD_MD_FLGID : 0) |
((attr->la_valid & LA_PROJID) ? OBD_MD_FLPROJID : 0);
+ if (attr->la_valid & LA_LAYOUT_VERSION) {
+ osi->osi_setattr.lsr_valid |= OBD_MD_LAYOUT_VERSION;
+
+ /* FLR: the layout version has to be transferred to
+	 * OST objects ASAP, otherwise clients will experience
+	 * delays writing to those OST objects. */
+ immediate_commit_cb = true;
+ }
break;
default:
LBUG();
}
- txn = osp_txn_info(&storage_th->th_ctx);
- LASSERT(txn);
+ /* we keep the same id, but increment it when the callback
+	 * is registered, so that all records up to the one taken
+ * by the callback are subject to processing */
+ spin_lock(&d->opd_sync_lock);
+ osi->osi_hdr.lrh_id = d->opd_sync_last_used_id;
+ spin_unlock(&d->opd_sync_lock);
- txn->oti_current_id = osp_sync_id_get(d, txn->oti_current_id);
- osi->osi_hdr.lrh_id = (txn->oti_current_id & 0xffffffffULL);
ctxt = llog_get_context(d->opd_obd, LLOG_MDS_OST_ORIG_CTXT);
- if (ctxt == NULL)
- RETURN(-ENOMEM);
+ if (ctxt == NULL) {
+ /* see comment in osp_sync_declare_add() */
+ RETURN(0);
+ }
rc = llog_add(env, ctxt->loc_handle, &osi->osi_hdr, &osi->osi_cookie,
storage_th);
osi->osi_cookie.lgc_index, rc);
atomic_inc(&d->opd_sync_changes);
}
+
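+	/* FLR layout version updates must reach the OST quickly, so
+	 * register the commit callback right away; otherwise batch the
+	 * callbacks to roughly one per second */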
+ if (immediate_commit_cb)
+ rc = osp_sync_add_commit_cb(env, d, th);
+ else
+ rc = osp_sync_add_commit_cb_1s(env, d, th);
+
/* return 0 always here, error case just causes no llog record */
RETURN(0);
}
int osp_sync_add(const struct lu_env *env, struct osp_object *o,
- llog_op_type type, struct thandle *th,
+ enum llog_op_type type, struct thandle *th,
const struct lu_attr *attr)
{
return osp_sync_add_rec(env, lu2osp_dev(o->opo_obj.do_lu.lo_dev),
* it's done in osp_sync_interpret sooner or later */
LASSERT(d);
- jra = ptlrpc_req_async_args(req);
+ jra = ptlrpc_req_async_args(jra, req);
LASSERT(jra->jra_magic == OSP_JOB_MAGIC);
LASSERT(list_empty(&jra->jra_committed_link));
* \retval 0 always
*/
static int osp_sync_interpret(const struct lu_env *env,
- struct ptlrpc_request *req, void *aa, int rc)
+ struct ptlrpc_request *req, void *args, int rc)
{
+ struct osp_job_req_args *jra = args;
struct osp_device *d = req->rq_cb_data;
- struct osp_job_req_args *jra = aa;
if (jra->jra_magic != OSP_JOB_MAGIC) {
- DEBUG_REQ(D_ERROR, req, "bad magic %u\n", jra->jra_magic);
+ DEBUG_REQ(D_ERROR, req, "bad magic %u", jra->jra_magic);
LBUG();
}
LASSERT(d);
CDEBUG(D_HA, "reply req %p/%d, rc %d, transno %u\n", req,
atomic_read(&req->rq_refcount),
rc, (unsigned) req->rq_transno);
- LASSERT(rc || req->rq_transno);
if (rc == -ENOENT) {
/*
/*
* an error happened, we'll try to repeat on next boot?
*/
- LASSERTF(req->rq_transno == 0 ||
+ LASSERTF(req->rq_transno == 0 || rc == -EIO ||
req->rq_import_generation < imp->imp_generation,
"transno %llu, rc %d, gen: req %d, imp %d\n",
req->rq_transno, rc, req->rq_import_generation,
LASSERT(atomic_read(&d->opd_sync_rpcs_in_flight) <=
d->opd_sync_max_rpcs_in_flight);
- jra = ptlrpc_req_async_args(req);
+ jra = ptlrpc_req_async_args(jra, req);
jra->jra_magic = OSP_JOB_MAGIC;
jra->jra_lcookie.lgc_lgl = llh->lgh_id;
jra->jra_lcookie.lgc_subsys = LLOG_MDS_OST_ORIG_CTXT;
* \retval ERR_PTR(errno) on error
*/
static struct ptlrpc_request *osp_sync_new_job(struct osp_device *d,
- ost_cmd_t op,
+ enum ost_cmd op,
const struct req_format *format)
{
struct ptlrpc_request *req;
-/* lsr_valid can only be 0 or HAVE OBD_MD_{FLUID, FLGID, FLPROJID} set,
- * so no bits other than these should be set. */
+/* lsr_valid can only be 0 or have OBD_MD_{FLUID, FLGID, FLPROJID,
+ * LAYOUT_VERSION} set, so no bits other than these should be set. */
if ((rec->lsr_valid & ~(OBD_MD_FLUID | OBD_MD_FLGID |
- OBD_MD_FLPROJID)) != 0) {
+ OBD_MD_FLPROJID | OBD_MD_LAYOUT_VERSION)) != 0) {
CERROR("%s: invalid setattr record, lsr_valid:%llu\n",
d->opd_obd->obd_name, rec->lsr_valid);
/* return 1 on invalid record */
body->oa.o_uid = rec->lsr_uid;
body->oa.o_gid = rec->lsr_gid;
body->oa.o_valid = OBD_MD_FLGROUP | OBD_MD_FLID;
- if (h->lrh_len > sizeof(struct llog_setattr64_rec))
- body->oa.o_projid = ((struct llog_setattr64_rec_v2 *)
- rec)->lsr_projid;
+ if (h->lrh_len > sizeof(struct llog_setattr64_rec)) {
+ struct llog_setattr64_rec_v2 *rec_v2 = (typeof(rec_v2))rec;
+ body->oa.o_projid = rec_v2->lsr_projid;
+ body->oa.o_layout_version = rec_v2->lsr_layout_version;
+ }
/* old setattr record (prior to 2.6.0) doesn't have 'valid' stored,
* we assume that both UID and GID are valid in that case. */
else
body->oa.o_valid |= rec->lsr_valid;
+ if (body->oa.o_valid & OBD_MD_LAYOUT_VERSION) {
+ OBD_FAIL_TIMEOUT(OBD_FAIL_FLR_LV_DELAY, cfs_fail_val);
+ if (unlikely(OBD_FAIL_CHECK(OBD_FAIL_FLR_LV_INC)))
+ ++body->oa.o_layout_version;
+ }
+
osp_sync_send_new_rpc(d, llh, h, req);
RETURN(0);
}
/* cancel any generation record */
rc = llog_cat_cancel_records(env, cathandle, 1, &cookie);
+ /* flush all pending records ASAP */
+ osp_sync_force(env, d);
+
RETURN_EXIT;
}
* we should decrease changes and bump last_processed_id.
*/
if (d->opd_sync_prev_done) {
- __u64 correct_id = osp_sync_correct_id(d, rec);
LASSERT(atomic_read(&d->opd_sync_changes) > 0);
- LASSERT(correct_id <= d->opd_sync_last_committed_id);
- /* NOTE: it's possible to meet same id if
- * OST stores few stripes of same file
- */
- while (1) {
- /* another thread may be trying to set new value */
- rmb();
- if (correct_id > d->opd_sync_last_processed_id) {
- d->opd_sync_last_processed_id = correct_id;
- wake_up(&d->opd_sync_barrier_waitq);
- } else
- break;
- }
atomic_dec(&d->opd_sync_changes);
+ wake_up(&d->opd_sync_barrier_waitq);
}
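+	/* count of llog records processed so far */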
+ atomic64_inc(&d->opd_sync_processed_recs);
if (rc != 0) {
atomic_dec(&d->opd_sync_rpcs_in_flight);
atomic_dec(&d->opd_sync_rpcs_in_progress);
struct ptlrpc_request *req;
struct llog_ctxt *ctxt;
struct llog_handle *llh;
- struct list_head list;
- int rc, done = 0;
+ int *arr;
+ LIST_HEAD(list);
+ struct list_head *le;
+ struct llog_logid lgid;
+ int rc, i, count = 0, done = 0;
ENTRY;
llh = ctxt->loc_handle;
LASSERT(llh);
- INIT_LIST_HEAD(&list);
spin_lock(&d->opd_sync_lock);
list_splice(&d->opd_sync_committed_there, &list);
INIT_LIST_HEAD(&d->opd_sync_committed_there);
spin_unlock(&d->opd_sync_lock);
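+	/* count the committed requests: for more than a couple it is
+	 * cheaper to cancel their llog records in one batch */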
+ list_for_each(le, &list)
+ count++;
+ if (count > 2)
+ OBD_ALLOC_WAIT(arr, sizeof(int) * count);
+ else
+ arr = NULL;
+ i = 0;
while (!list_empty(&list)) {
struct osp_job_req_args *jra;
		/* the import may be closing, thus all commit cb's have
		 * been called and we can check committedness directly */
if (req->rq_import_generation == imp->imp_generation) {
- rc = llog_cat_cancel_records(env, llh, 1,
- &jra->jra_lcookie);
- if (rc)
- CERROR("%s: can't cancel record: %d\n",
- obd->obd_name, rc);
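+			/* batch records residing in the same llog as the
+			 * first one; others are cancelled one by one */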
+ if (arr && (!i ||
+ !memcmp(&jra->jra_lcookie.lgc_lgl, &lgid,
+ sizeof(lgid)))) {
+ if (unlikely(!i))
+ lgid = jra->jra_lcookie.lgc_lgl;
+
+ arr[i++] = jra->jra_lcookie.lgc_index;
+ } else {
+ rc = llog_cat_cancel_records(env, llh, 1,
+ &jra->jra_lcookie);
+ if (rc)
+ CERROR("%s: can't cancel record: %d\n",
+ obd->obd_name, rc);
+ }
} else {
DEBUG_REQ(D_OTHER, req, "imp_committed = %llu",
imp->imp_peer_committed_transno);
ptlrpc_req_finished(req);
done++;
}
+ if (arr && i > 0) {
+ rc = llog_cat_cancel_arr_rec(env, llh, &lgid, i, arr);
+
+ if (rc)
+ CERROR("%s: can't cancel %d records rc: %d\n",
+ obd->obd_name, i, rc);
+ else
+			CDEBUG(D_OTHER, "%s: massive records cancel id "DFID
+ " num %d\n", obd->obd_name,
+ PFID(&lgid.lgl_oi.oi_fid), i);
+ }
+ if (arr)
+ OBD_FREE(arr, sizeof(int) * count);
llog_ctxt_put(ctxt);
LASSERT(atomic_read(&d->opd_sync_rpcs_in_progress) >= done);
atomic_sub(done, &d->opd_sync_rpcs_in_progress);
- CDEBUG(D_OTHER, "%s: %d in flight, %d in progress\n",
+ CDEBUG(D_OTHER, "%s: %d in flight, %d in progress, done %d\n",
d->opd_obd->obd_name, atomic_read(&d->opd_sync_rpcs_in_flight),
- atomic_read(&d->opd_sync_rpcs_in_progress));
+ atomic_read(&d->opd_sync_rpcs_in_progress), done);
osp_sync_check_for_work(d);
struct osp_device *d = data;
do {
- struct l_wait_info lwi = { 0 };
-
if (!osp_sync_running(d)) {
CDEBUG(D_HA, "stop llog processing\n");
return LLOG_PROC_BREAK;
llh = NULL;
rec = NULL;
}
-
- if (d->opd_sync_last_processed_id == d->opd_sync_last_used_id)
- osp_sync_remove_from_tracker(d);
-
- l_wait_event(d->opd_sync_waitq,
- !osp_sync_running(d) ||
- osp_sync_can_process_new(d, rec) ||
- !list_empty(&d->opd_sync_committed_there),
- &lwi);
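+		/* fail injection: stall the processing loop for a second */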
+ if (OBD_FAIL_PRECHECK(OBD_FAIL_CATALOG_FULL_CHECK) &&
+ cfs_fail_val != 1)
+ msleep(1 * MSEC_PER_SEC);
+
+ wait_event_idle(d->opd_sync_waitq,
+ !osp_sync_running(d) ||
+ osp_sync_can_process_new(d, rec) ||
+ !list_empty(&d->opd_sync_committed_there));
} while (1);
}
{
struct osp_device *d = _arg;
struct ptlrpc_thread *thread = &d->opd_sync_thread;
- struct l_wait_info lwi = { 0 };
struct llog_ctxt *ctxt;
struct obd_device *obd = d->opd_obd;
struct llog_handle *llh;
spin_unlock(&d->opd_sync_lock);
wake_up(&thread->t_ctl_waitq);
+again:
ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
if (ctxt == NULL) {
CERROR("can't get appropriate context\n");
wrapped = (llh->lgh_hdr->llh_cat_idx >= llh->lgh_last_idx &&
llh->lgh_hdr->llh_count > 1);
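+		/* fail injection: report the llog as unprocessable
+		 * (-EINPROGRESS) to exercise the retry path below */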
+ if (OBD_FAIL_CHECK(OBD_FAIL_OSP_CANT_PROCESS_LLOG)) {
+ rc = -EINPROGRESS;
+ goto next;
+ }
rc = llog_cat_process(&env, llh, osp_sync_process_queues, d,
d->opd_sync_last_catalog_idx, 0);
+next:
size = OBD_FAIL_PRECHECK(OBD_FAIL_CAT_RECORDS) ?
cfs_fail_val : (LLOG_HDR_BITMAP_SIZE(llh->lgh_hdr) - 1);
/* processing reaches catalog bottom */
if (d->opd_sync_last_catalog_idx == size)
- d->opd_sync_last_catalog_idx = 0;
- else if (wrapped)
- /* If catalog is wrapped we can`t predict last index of
- * processing because lgh_last_idx could be changed.
- * Starting form the next one */
- d->opd_sync_last_catalog_idx++;
-
- } while (rc == 0 && (wrapped || d->opd_sync_last_catalog_idx == 0));
+ d->opd_sync_last_catalog_idx = LLOG_CAT_FIRST;
+	/* If the catalog has wrapped we can't predict the last index of
+	 * processing because lgh_last_idx could change.
+	 * Start from the next one; the index will be increased
+	 * in llog_process_thread
+	 */
+ } while (rc == 0 && (wrapped ||
+ d->opd_sync_last_catalog_idx == LLOG_CAT_FIRST));
if (rc < 0) {
+ if (rc == -EINPROGRESS) {
+ /* can't access the llog now - OI scrub is trying to fix
+ * underlying issue. let's wait and try again */
+ llog_cat_close(&env, llh);
+ rc = llog_cleanup(&env, ctxt);
+ if (rc)
+ GOTO(out, rc);
+ schedule_timeout_interruptible(HZ * 5);
+ goto again;
+ }
+
CERROR("%s: llog process with osp_sync_process_queues "
"failed: %d\n", d->opd_obd->obd_name, rc);
GOTO(close, rc);
while (atomic_read(&d->opd_sync_rpcs_in_progress) > 0) {
osp_sync_process_committed(&env, d);
- lwi = LWI_TIMEOUT(cfs_time_seconds(5), NULL, NULL);
- rc = l_wait_event(d->opd_sync_waitq,
- atomic_read(&d->opd_sync_rpcs_in_progress) == 0,
- &lwi);
- if (rc == -ETIMEDOUT)
+ rc = wait_event_idle_timeout(
+ d->opd_sync_waitq,
+ atomic_read(&d->opd_sync_rpcs_in_progress) == 0,
+ cfs_time_seconds(5));
+ if (rc == 0)
count++;
LASSERTF(count < 10, "%s: %d %d %sempty\n",
d->opd_obd->obd_name,
rc = llog_setup(env, obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT,
d->opd_storage->dd_lu_dev.ld_obd,
- &osp_mds_ost_orig_logops);
+ &llog_common_cat_ops);
if (rc)
RETURN(rc);
LASSERT(ctxt);
if (likely(logid_id(&osi->osi_cid.lci_logid) != 0)) {
- rc = llog_open(env, ctxt, &lgh, &osi->osi_cid.lci_logid, NULL,
- LLOG_OPEN_EXISTS);
- /* re-create llog if it is missing */
- if (rc == -ENOENT)
+ struct lu_fid fid_temp;
+
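+		/* fail injection: replace the stored catid with a bogus
+		 * one to exercise the sanity check below */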
+ if (CFS_FAIL_CHECK(OBD_FAIL_OSP_INVALID_LOGID)) {
+ memset(&osi->osi_cid, 0, sizeof(osi->osi_cid));
+ logid_set_id(&osi->osi_cid.lci_logid, cfs_fail_val);
+ }
+
+ logid_to_fid(&osi->osi_cid.lci_logid, &fid_temp);
+ if (fid_is_sane(&fid_temp)) {
+ rc = llog_open(env, ctxt, &lgh, &osi->osi_cid.lci_logid,
+ NULL, LLOG_OPEN_EXISTS);
+
+ /* re-create llog if it is missing */
+ if (rc == -ENOENT)
+ logid_set_id(&osi->osi_cid.lci_logid, 0);
+ else if (rc < 0)
+ GOTO(out_cleanup, rc);
+ } else {
+ CERROR("%s: the catid "DFID" for init llog %d is bad\n",
+ obd->obd_name, PFID(&fid_temp), d->opd_index);
+
+ /* it will be recreated later */
logid_set_id(&osi->osi_cid.lci_logid, 0);
- else if (rc < 0)
- GOTO(out_cleanup, rc);
+ }
}
if (unlikely(logid_id(&osi->osi_cid.lci_logid) == 0)) {
* put a mark in the llog up to which we'll be processing
* old records unconditionally
*/
- d->opd_sync_generation.mnt_cnt = cfs_time_current();
- d->opd_sync_generation.conn_cnt = cfs_time_current();
+ d->opd_sync_generation.mnt_cnt = ktime_get_ns();
+ d->opd_sync_generation.conn_cnt = ktime_get_ns();
osi->osi_hdr.lrh_type = LLOG_GEN_REC;
osi->osi_hdr.lrh_len = sizeof(osi->osi_gen);
*/
int osp_sync_init(const struct lu_env *env, struct osp_device *d)
{
- struct l_wait_info lwi = { 0 };
struct task_struct *task;
int rc;
if (d->opd_storage->dd_rdonly)
RETURN(0);
- rc = osp_sync_id_traction_init(d);
- if (rc)
- RETURN(rc);
-
/*
* initialize llog storing changes
*/
GOTO(err_llog, rc);
}
- l_wait_event(d->opd_sync_thread.t_ctl_waitq,
- osp_sync_running(d) || osp_sync_stopped(d), &lwi);
+ wait_event_idle(d->opd_sync_thread.t_ctl_waitq,
+ osp_sync_running(d) || osp_sync_stopped(d));
RETURN(0);
err_llog:
osp_sync_llog_fini(env, d);
err_id:
- osp_sync_id_traction_fini(d);
return rc;
}
wait_event(thread->t_ctl_waitq, thread_is_stopped(thread));
}
- /*
- * unregister transaction callbacks only when sync thread
- * has finished operations with llog
- */
- osp_sync_id_traction_fini(d);
-
RETURN(0);
}
-static DEFINE_MUTEX(osp_id_tracker_sem);
-static struct list_head osp_id_tracker_list =
- LIST_HEAD_INIT(osp_id_tracker_list);
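+/* commit callback state: the OSP device and the sync id assigned to
+ * the llog records covered by this callback */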
+struct osp_last_committed_cb {
+ struct dt_txn_commit_cb ospc_cb;
+ struct osp_device *ospc_dev;
+ __u64 ospc_transno;
+};
-/**
- * OSD commit callback.
- *
- * The function is used as a local OSD commit callback to track the highest
- * committed llog record id. see osp_sync_id_traction_init() for the details.
- *
- * \param[in] th local transaction handle committed
- * \param[in] cookie commit callback data (our private structure)
- */
-static void osp_sync_tracker_commit_cb(struct thandle *th, void *cookie)
+void osp_sync_local_commit_cb(struct lu_env *env, struct thandle *th,
+ struct dt_txn_commit_cb *dcb, int err)
{
- struct osp_id_tracker *tr = cookie;
- struct osp_device *d;
- struct osp_txn_info *txn;
-
- LASSERT(tr);
-
- txn = osp_txn_info(&th->th_ctx);
- if (txn == NULL || txn->oti_current_id < tr->otr_committed_id)
- return;
-
- spin_lock(&tr->otr_lock);
- if (likely(txn->oti_current_id > tr->otr_committed_id)) {
- CDEBUG(D_OTHER, "committed: %llu -> %llu\n",
- tr->otr_committed_id, txn->oti_current_id);
- tr->otr_committed_id = txn->oti_current_id;
+ struct osp_last_committed_cb *cb;
+ struct osp_device *d;
- list_for_each_entry(d, &tr->otr_wakeup_list,
- opd_sync_ontrack) {
- d->opd_sync_last_committed_id = tr->otr_committed_id;
- wake_up(&d->opd_sync_waitq);
- }
- }
- spin_unlock(&tr->otr_lock);
-}
+ cb = container_of0(dcb, struct osp_last_committed_cb, ospc_cb);
+ d = cb->ospc_dev;
-/**
- * Initialize commit tracking mechanism.
- *
- * Some setups may have thousands of OSTs and each will be represented by OSP.
- * Meaning order of magnitute many more changes to apply every second. In order
- * to keep the number of commit callbacks low this mechanism was introduced.
- * The mechanism is very similar to transno used by MDT service: it's an single
- * ID stream which can be assigned by any OSP to its llog records. The tricky
- * part is that ID is stored in per-transaction data and re-used by all the OSPs
- * involved in that transaction. Then all these OSPs are woken up utilizing a single OSD commit callback.
- *
- * The function initializes the data used by the tracker described above.
- * A singler tracker per OSD device is created.
- *
- * \param[in] d OSP device
- *
- * \retval 0 on success
- * \retval negative negated errno on error
- */
-static int osp_sync_id_traction_init(struct osp_device *d)
-{
- struct osp_id_tracker *tr, *found = NULL;
- int rc = 0;
+ CDEBUG(D_HA, "%s: %llu committed\n", d->opd_obd->obd_name,
+ cb->ospc_transno);
- LASSERT(d);
- LASSERT(d->opd_storage);
- LASSERT(d->opd_sync_tracker == NULL);
- INIT_LIST_HEAD(&d->opd_sync_ontrack);
-
- mutex_lock(&osp_id_tracker_sem);
- list_for_each_entry(tr, &osp_id_tracker_list, otr_list) {
- if (tr->otr_dev == d->opd_storage) {
- LASSERT(atomic_read(&tr->otr_refcount));
- atomic_inc(&tr->otr_refcount);
- d->opd_sync_tracker = tr;
- found = tr;
- break;
- }
- }
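+	/* advance the last committed id: records with smaller ids are
+	 * safely on disk now and can be sent to the OST */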
+ spin_lock(&d->opd_sync_lock);
+ if (cb->ospc_transno > d->opd_sync_last_committed_id)
+ d->opd_sync_last_committed_id = cb->ospc_transno;
+ spin_unlock(&d->opd_sync_lock);
- if (found == NULL) {
- rc = -ENOMEM;
- OBD_ALLOC_PTR(tr);
- if (tr) {
- d->opd_sync_tracker = tr;
- spin_lock_init(&tr->otr_lock);
- tr->otr_dev = d->opd_storage;
- tr->otr_next_id = 1;
- tr->otr_committed_id = 0;
- atomic_set(&tr->otr_refcount, 1);
- INIT_LIST_HEAD(&tr->otr_wakeup_list);
- list_add(&tr->otr_list, &osp_id_tracker_list);
- tr->otr_tx_cb.dtc_txn_commit =
- osp_sync_tracker_commit_cb;
- tr->otr_tx_cb.dtc_cookie = tr;
- tr->otr_tx_cb.dtc_tag = LCT_MD_THREAD;
- dt_txn_callback_add(d->opd_storage, &tr->otr_tx_cb);
- rc = 0;
- }
- }
- mutex_unlock(&osp_id_tracker_sem);
+ osp_sync_check_for_work(d);
+ lu_device_put(osp2lu_dev(d));
+ if (atomic_dec_and_test(&d->opd_commits_registered))
+ wake_up(&d->opd_sync_waitq);
- return rc;
+ OBD_FREE_PTR(cb);
}
-/**
- * Release commit tracker.
- *
- * Decrease a refcounter on the tracker used by the given OSP device \a d.
- * If no more users left, then the tracker is released.
- *
- * \param[in] d OSP device
- */
-static void osp_sync_id_traction_fini(struct osp_device *d)
+static int osp_sync_add_commit_cb(const struct lu_env *env,
+ struct osp_device *d, struct thandle *th)
{
- struct osp_id_tracker *tr;
-
- ENTRY;
-
- LASSERT(d);
- tr = d->opd_sync_tracker;
- if (tr == NULL) {
- EXIT;
- return;
- }
+ struct osp_last_committed_cb *cb;
+ struct dt_txn_commit_cb *dcb;
+ int rc = 0;
+
+ OBD_ALLOC_PTR(cb);
+ if (cb == NULL)
+ return -ENOMEM;
+ cb->ospc_dev = d;
+ dcb = &cb->ospc_cb;
+ dcb->dcb_func = osp_sync_local_commit_cb;
+ spin_lock(&d->opd_sync_lock);
+ cb->ospc_transno = ++d->opd_sync_last_used_id;
+ spin_unlock(&d->opd_sync_lock);
- osp_sync_remove_from_tracker(d);
+ rc = dt_trans_cb_add(th, dcb);
+ CDEBUG(D_HA, "%s: add commit cb at %lluns, next at %lluns, rc = %d\n",
+ d->opd_obd->obd_name, ktime_get_ns(),
+ ktime_to_ns(d->opd_sync_next_commit_cb), rc);
- mutex_lock(&osp_id_tracker_sem);
- if (atomic_dec_and_test(&tr->otr_refcount)) {
- dt_txn_callback_del(d->opd_storage, &tr->otr_tx_cb);
- LASSERT(list_empty(&tr->otr_wakeup_list));
- list_del(&tr->otr_list);
- OBD_FREE_PTR(tr);
- d->opd_sync_tracker = NULL;
- }
- mutex_unlock(&osp_id_tracker_sem);
+ if (likely(rc == 0)) {
+ lu_device_get(osp2lu_dev(d));
+ atomic_inc(&d->opd_commits_registered);
+ } else
+ OBD_FREE_PTR(cb);
- EXIT;
+ return rc;
}
-/**
- * Generate a new ID on a tracker.
- *
- * Generates a new ID using the tracker associated with the given OSP device
- * \a d, if the given ID \a id is non-zero. Unconditially adds OSP device to
- * the wakeup list, so OSP won't miss when a transaction using the ID is
- * committed.
- *
- * \param[in] d OSP device
- * \param[in] id 0 or ID generated previously
- *
- * \retval ID the caller should use
- */
-static __u64 osp_sync_id_get(struct osp_device *d, __u64 id)
+/* add the commit callback at most once a second */
+int osp_sync_add_commit_cb_1s(const struct lu_env *env, struct osp_device *d,
+ struct thandle *th)
{
- struct osp_id_tracker *tr;
-
- tr = d->opd_sync_tracker;
- LASSERT(tr);
+ ktime_t now = ktime_get();
+ bool add = false;
- /* XXX: we can improve this introducing per-cpu preallocated ids? */
- spin_lock(&tr->otr_lock);
- if (OBD_FAIL_CHECK(OBD_FAIL_MDS_TRACK_OVERFLOW))
- tr->otr_next_id = 0xfffffff0;
+ /* fast path */
+ if (ktime_before(now, d->opd_sync_next_commit_cb))
+ return 0;
- if (unlikely(tr->otr_next_id <= d->opd_sync_last_used_id)) {
- spin_unlock(&tr->otr_lock);
- CERROR("%s: next %llu, last synced %llu\n",
- d->opd_obd->obd_name, tr->otr_next_id,
- d->opd_sync_last_used_id);
- LBUG();
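+	/* slow path: recheck the deadline under the lock and move it
+	 * one second ahead if this thread won the race */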
+ spin_lock(&d->opd_sync_lock);
+ if (ktime_before(d->opd_sync_next_commit_cb, now)) {
+ add = true;
+ d->opd_sync_next_commit_cb = ktime_add_ns(now, NSEC_PER_SEC);
}
+ spin_unlock(&d->opd_sync_lock);
- if (id == 0)
- id = tr->otr_next_id++;
- if (id > d->opd_sync_last_used_id)
- d->opd_sync_last_used_id = id;
- if (list_empty(&d->opd_sync_ontrack))
- list_add(&d->opd_sync_ontrack, &tr->otr_wakeup_list);
- spin_unlock(&tr->otr_lock);
- CDEBUG(D_OTHER, "new id %llu\n", id);
+ if (!add)
+ return 0;
- return id;
+ return osp_sync_add_commit_cb(env, d, th);
}
-/**
- * Stop to propagate commit status to OSP.
- *
- * If the OSP does not have any llog records she's waiting to commit, then
- * it is possible to unsubscribe from wakeups from the tracking using this
- * method.
- *
- * \param[in] d OSP device not willing to wakeup
+/*
+ * generate an empty transaction, hook the commit callback in,
+ * then force the transaction to commit
*/
-static void osp_sync_remove_from_tracker(struct osp_device *d)
+void osp_sync_force(const struct lu_env *env, struct osp_device *d)
{
- struct osp_id_tracker *tr;
-
- tr = d->opd_sync_tracker;
- LASSERT(tr);
+ struct thandle *th;
+ int rc;
- if (list_empty(&d->opd_sync_ontrack))
+ th = dt_trans_create(env, d->opd_storage);
+ if (IS_ERR(th)) {
+ CERROR("%s: can't sync\n", d->opd_obd->obd_name);
return;
+ }
+ rc = dt_trans_start_local(env, d->opd_storage, th);
+ if (rc == 0) {
+ CDEBUG(D_OTHER, "%s: sync forced, %d changes\n",
+ d->opd_obd->obd_name,
+ atomic_read(&d->opd_sync_changes));
+ rc = osp_sync_add_commit_cb(env, d, th);
+ dt_trans_stop(env, d->opd_storage, th);
+ }
- spin_lock(&tr->otr_lock);
- list_del_init(&d->opd_sync_ontrack);
- spin_unlock(&tr->otr_lock);
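+	/* request an async commit so the callback fires soon, whether
+	 * or not the callback registration above succeeded */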
+ dt_commit_async(env, d->opd_storage);
}
-