* Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2012, 2016, Intel Corporation.
+ * Copyright (c) 2012, 2017, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
__u32 jra_magic;
};
+static int osp_sync_add_commit_cb(const struct lu_env *env,
+ struct osp_device *d, struct thandle *th);
+
static inline int osp_sync_running(struct osp_device *d)
{
return !!(d->opd_sync_thread.t_flags & SVC_RUNNING);
* \retval negative negated errno on error
*/
int osp_sync_declare_add(const struct lu_env *env, struct osp_object *o,
- llog_op_type type, struct thandle *th)
+ enum llog_op_type type, struct thandle *th)
{
struct osp_thread_info *osi = osp_env_info(env);
struct osp_device *d = lu2osp_dev(o->opo_obj.do_lu.lo_dev);
RETURN(rc);
}
-/* add the commit callback every second */
-int osp_sync_add_commit_cb_1s(const struct lu_env *env, struct osp_device *d,
- struct thandle *th)
-{
- int add = 0;
-
- /* fast path */
- if (cfs_time_before(cfs_time_current(), d->opd_sync_next_commit_cb))
- return 0;
-
- spin_lock(&d->opd_sync_lock);
- if (cfs_time_aftereq(cfs_time_current(), d->opd_sync_next_commit_cb))
- add = 1;
- d->opd_sync_next_commit_cb = cfs_time_shift(1);
- spin_unlock(&d->opd_sync_lock);
-
- if (add == 0)
- return 0;
- return osp_sync_add_commit_cb(env, d, th);
-}
-
-
/**
* Generate a llog record for a given change.
*
* \retval negative negated errno on error
*/
static int osp_sync_add_rec(const struct lu_env *env, struct osp_device *d,
- const struct lu_fid *fid, llog_op_type type,
+ const struct lu_fid *fid, enum llog_op_type type,
int count, struct thandle *th,
const struct lu_attr *attr)
{
struct osp_thread_info *osi = osp_env_info(env);
struct llog_ctxt *ctxt;
struct thandle *storage_th;
+ bool immediate_commit_cb = false;
int rc;
ENTRY;
LASSERT(attr);
osi->osi_setattr.lsr_uid = attr->la_uid;
osi->osi_setattr.lsr_gid = attr->la_gid;
+ osi->osi_setattr.lsr_layout_version = attr->la_layout_version;
osi->osi_setattr.lsr_projid = attr->la_projid;
osi->osi_setattr.lsr_valid =
((attr->la_valid & LA_UID) ? OBD_MD_FLUID : 0) |
((attr->la_valid & LA_GID) ? OBD_MD_FLGID : 0) |
((attr->la_valid & LA_PROJID) ? OBD_MD_FLPROJID : 0);
+ if (attr->la_valid & LA_LAYOUT_VERSION) {
+ osi->osi_setattr.lsr_valid |= OBD_MD_LAYOUT_VERSION;
+
+ /* FLR: the layout version has to be transferred to
+ * OST objects ASAP, otherwise clients will have to
+ * experience delay to be able to write OST objects. */
+ immediate_commit_cb = true;
+ }
break;
default:
LBUG();
atomic_inc(&d->opd_sync_changes);
}
- rc = osp_sync_add_commit_cb_1s(env, d, th);
+ if (immediate_commit_cb)
+ rc = osp_sync_add_commit_cb(env, d, th);
+ else
+ rc = osp_sync_add_commit_cb_1s(env, d, th);
/* return 0 always here, error case just cause no llog record */
RETURN(0);
}
int osp_sync_add(const struct lu_env *env, struct osp_object *o,
- llog_op_type type, struct thandle *th,
+ enum llog_op_type type, struct thandle *th,
const struct lu_attr *attr)
{
return osp_sync_add_rec(env, lu2osp_dev(o->opo_obj.do_lu.lo_dev),
* \retval 0 always
*/
static int osp_sync_interpret(const struct lu_env *env,
- struct ptlrpc_request *req, void *aa, int rc)
+ struct ptlrpc_request *req, void *args, int rc)
{
+ struct osp_job_req_args *jra = args;
struct osp_device *d = req->rq_cb_data;
- struct osp_job_req_args *jra = aa;
if (jra->jra_magic != OSP_JOB_MAGIC) {
DEBUG_REQ(D_ERROR, req, "bad magic %u\n", jra->jra_magic);
CDEBUG(D_HA, "reply req %p/%d, rc %d, transno %u\n", req,
atomic_read(&req->rq_refcount),
rc, (unsigned) req->rq_transno);
- LASSERT(rc || req->rq_transno);
if (rc == -ENOENT) {
/*
/*
* error happened, we'll try to repeat on next boot ?
*/
- LASSERTF(req->rq_transno == 0 ||
+ LASSERTF(req->rq_transno == 0 || rc == -EIO ||
req->rq_import_generation < imp->imp_generation,
"transno %llu, rc %d, gen: req %d, imp %d\n",
req->rq_transno, rc, req->rq_import_generation,
* \retval ERR_PTR(errno) on error
*/
static struct ptlrpc_request *osp_sync_new_job(struct osp_device *d,
- ost_cmd_t op,
+ enum ost_cmd op,
const struct req_format *format)
{
struct ptlrpc_request *req;
-/* lsr_valid can only be 0 or HAVE OBD_MD_{FLUID, FLGID, FLPROJID} set,
- * so no bits other than these should be set. */
+/* lsr_valid can only be 0 or have OBD_MD_{FLUID, FLGID, FLPROJID,
+ * LAYOUT_VERSION} set, so no bits other than these should be set. */
if ((rec->lsr_valid & ~(OBD_MD_FLUID | OBD_MD_FLGID |
- OBD_MD_FLPROJID)) != 0) {
+ OBD_MD_FLPROJID | OBD_MD_LAYOUT_VERSION)) != 0) {
CERROR("%s: invalid setattr record, lsr_valid:%llu\n",
d->opd_obd->obd_name, rec->lsr_valid);
/* return 1 on invalid record */
body->oa.o_uid = rec->lsr_uid;
body->oa.o_gid = rec->lsr_gid;
body->oa.o_valid = OBD_MD_FLGROUP | OBD_MD_FLID;
- if (h->lrh_len > sizeof(struct llog_setattr64_rec))
- body->oa.o_projid = ((struct llog_setattr64_rec_v2 *)
- rec)->lsr_projid;
+ if (h->lrh_len > sizeof(struct llog_setattr64_rec)) {
+ struct llog_setattr64_rec_v2 *rec_v2 = (typeof(rec_v2))rec;
+ body->oa.o_projid = rec_v2->lsr_projid;
+ body->oa.o_layout_version = rec_v2->lsr_layout_version;
+ }
/* old setattr record (prior 2.6.0) doesn't have 'valid' stored,
* we assume that both UID and GID are valid in that case. */
else
body->oa.o_valid |= rec->lsr_valid;
+ if (body->oa.o_valid & OBD_MD_LAYOUT_VERSION) {
+ OBD_FAIL_TIMEOUT(OBD_FAIL_FLR_LV_DELAY, cfs_fail_val);
+ if (unlikely(OBD_FAIL_CHECK(OBD_FAIL_FLR_LV_INC)))
+ ++body->oa.o_layout_version;
+ }
+
osp_sync_send_new_rpc(d, llh, h, req);
RETURN(0);
}
/* cancel any generation record */
rc = llog_cat_cancel_records(env, cathandle, 1, &cookie);
+ /* flush all pending records ASAP */
+ osp_sync_force(env, d);
+
RETURN_EXIT;
}
struct ptlrpc_request *req;
struct llog_ctxt *ctxt;
struct llog_handle *llh;
- struct list_head list;
- int rc, done = 0;
+ int *arr;
+ struct list_head list, *le;
+ struct llog_logid lgid;
+ int rc, i, count = 0, done = 0;
ENTRY;
INIT_LIST_HEAD(&d->opd_sync_committed_there);
spin_unlock(&d->opd_sync_lock);
+ list_for_each(le, &list)
+ count++;
+ if (count > 2)
+ OBD_ALLOC_WAIT(arr, sizeof(int) * count);
+ else
+ arr = NULL;
+ i = 0;
while (!list_empty(&list)) {
struct osp_job_req_args *jra;
/* import can be closing, thus all commit cb's are
* called we can check committness directly */
if (req->rq_import_generation == imp->imp_generation) {
- rc = llog_cat_cancel_records(env, llh, 1,
- &jra->jra_lcookie);
- if (rc)
- CERROR("%s: can't cancel record: %d\n",
- obd->obd_name, rc);
+ if (arr && (!i ||
+ !memcmp(&jra->jra_lcookie.lgc_lgl, &lgid,
+ sizeof(lgid)))) {
+ if (unlikely(!i))
+ lgid = jra->jra_lcookie.lgc_lgl;
+
+ arr[i++] = jra->jra_lcookie.lgc_index;
+ } else {
+ rc = llog_cat_cancel_records(env, llh, 1,
+ &jra->jra_lcookie);
+ if (rc)
+ CERROR("%s: can't cancel record: %d\n",
+ obd->obd_name, rc);
+ }
} else {
DEBUG_REQ(D_OTHER, req, "imp_committed = %llu",
imp->imp_peer_committed_transno);
ptlrpc_req_finished(req);
done++;
}
+ if (arr && i > 0) {
+ rc = llog_cat_cancel_arr_rec(env, llh, &lgid, i, arr);
+
+ if (rc)
+ CERROR("%s: can't cancel %d records rc: %d\n",
+ obd->obd_name, i, rc);
+ else
+ CDEBUG(D_OTHER, "%s: massive records cancel id "DFID
+ " num %d\n", obd->obd_name,
+ PFID(&lgid.lgl_oi.oi_fid), i);
+ }
+ if (arr)
+ OBD_FREE(arr, sizeof(int) * count);
llog_ctxt_put(ctxt);
LASSERT(atomic_read(&d->opd_sync_rpcs_in_progress) >= done);
atomic_sub(done, &d->opd_sync_rpcs_in_progress);
- CDEBUG(D_OTHER, "%s: %d in flight, %d in progress\n",
+ CDEBUG(D_OTHER, "%s: %d in flight, %d in progress, done %d\n",
d->opd_obd->obd_name, atomic_read(&d->opd_sync_rpcs_in_flight),
- atomic_read(&d->opd_sync_rpcs_in_progress));
+ atomic_read(&d->opd_sync_rpcs_in_progress), done);
osp_sync_check_for_work(d);
cfs_fail_val : (LLOG_HDR_BITMAP_SIZE(llh->lgh_hdr) - 1);
/* processing reaches catalog bottom */
if (d->opd_sync_last_catalog_idx == size)
- d->opd_sync_last_catalog_idx = 0;
+ d->opd_sync_last_catalog_idx = LLOG_CAT_FIRST;
else if (wrapped)
/* If catalog is wrapped we can`t predict last index of
* processing because lgh_last_idx could be changed.
* Starting form the next one */
d->opd_sync_last_catalog_idx++;
- } while (rc == 0 && (wrapped || d->opd_sync_last_catalog_idx == 0));
+ } while (rc == 0 && (wrapped ||
+ d->opd_sync_last_catalog_idx == LLOG_CAT_FIRST));
if (rc < 0) {
CERROR("%s: llog process with osp_sync_process_queues "
rc = llog_setup(env, obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT,
d->opd_storage->dd_lu_dev.ld_obd,
- &osp_mds_ost_orig_logops);
+ &llog_common_cat_ops);
if (rc)
RETURN(rc);
* put a mark in the llog till which we'll be processing
* old records restless
*/
- d->opd_sync_generation.mnt_cnt = cfs_time_current();
- d->opd_sync_generation.conn_cnt = cfs_time_current();
+ d->opd_sync_generation.mnt_cnt = ktime_get_ns();
+ d->opd_sync_generation.conn_cnt = ktime_get_ns();
osi->osi_hdr.lrh_type = LLOG_GEN_REC;
osi->osi_hdr.lrh_len = sizeof(osi->osi_gen);
OBD_FREE_PTR(cb);
}
-int osp_sync_add_commit_cb(const struct lu_env *env, struct osp_device *d,
- struct thandle *th)
+static int osp_sync_add_commit_cb(const struct lu_env *env,
+ struct osp_device *d, struct thandle *th)
{
struct osp_last_committed_cb *cb;
struct dt_txn_commit_cb *dcb;
spin_unlock(&d->opd_sync_lock);
rc = dt_trans_cb_add(th, dcb);
- CDEBUG(D_HA, "%s: add commit cb at %llu, next at %llu, rc = %d\n",
- d->opd_obd->obd_name, (unsigned long long) cfs_time_current(),
- (unsigned long long) d->opd_sync_next_commit_cb, rc);
+ CDEBUG(D_HA, "%s: add commit cb at %lluns, next at %lluns, rc = %d\n",
+ d->opd_obd->obd_name, ktime_get_ns(),
+ ktime_to_ns(d->opd_sync_next_commit_cb), rc);
if (likely(rc == 0)) {
lu_device_get(osp2lu_dev(d));
return rc;
}
+/* add the commit callback every second */
+int osp_sync_add_commit_cb_1s(const struct lu_env *env, struct osp_device *d,
+ struct thandle *th)
+{
+ ktime_t now = ktime_get();
+ bool add = false;
+
+ /* fast path */
+ if (ktime_before(now, d->opd_sync_next_commit_cb))
+ return 0;
+
+ spin_lock(&d->opd_sync_lock);
+ if (ktime_before(d->opd_sync_next_commit_cb, now)) {
+ add = true;
+ d->opd_sync_next_commit_cb = ktime_add_ns(now, NSEC_PER_SEC);
+ }
+ spin_unlock(&d->opd_sync_lock);
+
+ if (!add)
+ return 0;
+
+ return osp_sync_add_commit_cb(env, d, th);
+}
+
/*
* generate an empty transaction and hook the commit callback in
* then force transaction commit