* Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2011, 2012, Intel, Inc.
+ * Copyright (c) 2012, 2013, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
osi->osi_unlink.lur_count = count;
break;
case MDS_SETATTR64_REC:
- rc = fid_ostid_pack(fid, &osi->osi_oi);
+ rc = fid_to_ostid(fid, &osi->osi_oi);
LASSERT(rc == 0);
osi->osi_hdr.lrh_len = sizeof(osi->osi_setattr);
osi->osi_hdr.lrh_type = MDS_SETATTR64_REC;
- osi->osi_setattr.lsr_oid = osi->osi_oi.oi_id;
- osi->osi_setattr.lsr_oseq = osi->osi_oi.oi_seq;
+ osi->osi_setattr.lsr_oi = osi->osi_oi;
LASSERT(attr);
osi->osi_setattr.lsr_uid = attr->la_uid;
osi->osi_setattr.lsr_gid = attr->la_gid;
NULL, th);
llog_ctxt_put(ctxt);
- CDEBUG(D_OTHER, "%s: new record %lu:%lu:%lu/%lu: %d\n",
- d->opd_obd->obd_name,
- (unsigned long) osi->osi_cookie.lgc_lgl.lgl_oid,
- (unsigned long) osi->osi_cookie.lgc_lgl.lgl_oseq,
+ CDEBUG(D_OTHER, "%s: new record "DOSTID":%lu/%lu: %d\n",
+ d->opd_obd->obd_name, POSTID(&osi->osi_cookie.lgc_lgl.lgl_oi),
(unsigned long) osi->osi_cookie.lgc_lgl.lgl_ogen,
(unsigned long) osi->osi_cookie.lgc_index, rc);
rc = 0;
if (likely(rc == 0)) {
- cfs_spin_lock(&d->opd_syn_lock);
+ spin_lock(&d->opd_syn_lock);
d->opd_syn_changes++;
- cfs_spin_unlock(&d->opd_syn_lock);
+ spin_unlock(&d->opd_syn_lock);
}
RETURN(rc);
if (unlikely(req->rq_transno == 0))
return;
- /* XXX: what if request isn't committed for very long? */
+ /* do not do any opd_dyn_rpc_* accounting here
+ * it's done in osp_sync_interpret sooner or later */
+
LASSERT(d);
LASSERT(req->rq_svc_thread == (void *) OSP_JOB_MAGIC);
LASSERT(cfs_list_empty(&req->rq_exp_list));
ptlrpc_request_addref(req);
- cfs_spin_lock(&d->opd_syn_lock);
+ spin_lock(&d->opd_syn_lock);
cfs_list_add(&req->rq_exp_list, &d->opd_syn_committed_there);
- cfs_spin_unlock(&d->opd_syn_lock);
+ spin_unlock(&d->opd_syn_lock);
/* XXX: some batching wouldn't hurt */
cfs_waitq_signal(&d->opd_syn_waitq);
{
struct osp_device *d = req->rq_cb_data;
- /* XXX: error handling here */
if (req->rq_svc_thread != (void *) OSP_JOB_MAGIC)
DEBUG_REQ(D_ERROR, req, "bad magic %p\n", req->rq_svc_thread);
LASSERT(req->rq_svc_thread == (void *) OSP_JOB_MAGIC);
ptlrpc_request_addref(req);
- cfs_spin_lock(&d->opd_syn_lock);
+ spin_lock(&d->opd_syn_lock);
cfs_list_add(&req->rq_exp_list, &d->opd_syn_committed_there);
- cfs_spin_unlock(&d->opd_syn_lock);
+ spin_unlock(&d->opd_syn_lock);
cfs_waitq_signal(&d->opd_syn_waitq);
} else if (rc) {
"transno "LPU64", rc %d, gen: req %d, imp %d\n",
req->rq_transno, rc, req->rq_import_generation,
imp->imp_generation);
- LASSERT(d->opd_syn_rpc_in_progress > 0);
if (req->rq_transno == 0) {
/* this is the last time we see the request
* if transno is not zero, then commit cb
* will be called at some point */
- cfs_spin_lock(&d->opd_syn_lock);
+ LASSERT(d->opd_syn_rpc_in_progress > 0);
+ spin_lock(&d->opd_syn_lock);
d->opd_syn_rpc_in_progress--;
- cfs_spin_unlock(&d->opd_syn_lock);
+ spin_unlock(&d->opd_syn_lock);
}
cfs_waitq_signal(&d->opd_syn_waitq);
}
LASSERT(d->opd_syn_rpc_in_flight > 0);
- cfs_spin_lock(&d->opd_syn_lock);
+ spin_lock(&d->opd_syn_lock);
d->opd_syn_rpc_in_flight--;
- cfs_spin_unlock(&d->opd_syn_lock);
+ spin_unlock(&d->opd_syn_lock);
CDEBUG(D_OTHER, "%s: %d in flight, %d in progress\n",
d->opd_obd->obd_name, d->opd_syn_rpc_in_flight,
d->opd_syn_rpc_in_progress);
body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
LASSERT(body);
- body->oa.o_id = rec->lsr_oid;
- body->oa.o_seq = rec->lsr_oseq;
+ body->oa.o_oi = rec->lsr_oi;
body->oa.o_uid = rec->lsr_uid;
body->oa.o_gid = rec->lsr_gid;
body->oa.o_valid = OBD_MD_FLGROUP | OBD_MD_FLID |
body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
LASSERT(body);
- body->oa.o_id = rec->lur_oid;
- body->oa.o_seq = rec->lur_oseq;
+ ostid_set_seq(&body->oa.o_oi, rec->lur_oseq);
+ ostid_set_id(&body->oa.o_oi, rec->lur_oid);
body->oa.o_misc = rec->lur_count;
body->oa.o_valid = OBD_MD_FLGROUP | OBD_MD_FLID;
if (rec->lur_count)
body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
if (body == NULL)
RETURN(-EFAULT);
- rc = fid_ostid_pack(&rec->lur_fid, &body->oa.o_oi);
+ rc = fid_to_ostid(&rec->lur_fid, &body->oa.o_oi);
if (rc < 0)
RETURN(rc);
body->oa.o_misc = rec->lur_count;
/* notice we increment counters before sending RPC, to be consistent
* in RPC interpret callback which may happen very quickly */
- cfs_spin_lock(&d->opd_syn_lock);
+ spin_lock(&d->opd_syn_lock);
d->opd_syn_rpc_in_flight++;
d->opd_syn_rpc_in_progress++;
- cfs_spin_unlock(&d->opd_syn_lock);
+ spin_unlock(&d->opd_syn_lock);
switch (rec->lrh_type) {
/* case MDS_UNLINK_REC is kept for compatibility */
}
if (likely(rc == 0)) {
- cfs_spin_lock(&d->opd_syn_lock);
+ spin_lock(&d->opd_syn_lock);
if (d->opd_syn_prev_done) {
LASSERT(d->opd_syn_changes > 0);
LASSERT(rec->lrh_id <= d->opd_syn_last_committed_id);
CDEBUG(D_OTHER, "%s: %d in flight, %d in progress\n",
d->opd_obd->obd_name, d->opd_syn_rpc_in_flight,
d->opd_syn_rpc_in_progress);
- cfs_spin_unlock(&d->opd_syn_lock);
+ spin_unlock(&d->opd_syn_lock);
} else {
- cfs_spin_lock(&d->opd_syn_lock);
+ spin_lock(&d->opd_syn_lock);
d->opd_syn_rpc_in_flight--;
d->opd_syn_rpc_in_progress--;
- cfs_spin_unlock(&d->opd_syn_lock);
+ spin_unlock(&d->opd_syn_lock);
}
CDEBUG(D_HA, "found record %x, %d, idx %u, id %u: %d\n",
LASSERT(llh);
CFS_INIT_LIST_HEAD(&list);
- cfs_spin_lock(&d->opd_syn_lock);
+ spin_lock(&d->opd_syn_lock);
cfs_list_splice(&d->opd_syn_committed_there, &list);
CFS_INIT_LIST_HEAD(&d->opd_syn_committed_there);
- cfs_spin_unlock(&d->opd_syn_lock);
+ spin_unlock(&d->opd_syn_lock);
cfs_list_for_each_entry_safe(req, tmp, &list, rq_exp_list) {
LASSERT(req->rq_svc_thread == (void *) OSP_JOB_MAGIC);
llog_ctxt_put(ctxt);
LASSERT(d->opd_syn_rpc_in_progress >= done);
- cfs_spin_lock(&d->opd_syn_lock);
+ spin_lock(&d->opd_syn_lock);
d->opd_syn_rpc_in_progress -= done;
- cfs_spin_unlock(&d->opd_syn_lock);
+ spin_unlock(&d->opd_syn_lock);
CDEBUG(D_OTHER, "%s: %d in flight, %d in progress\n",
d->opd_obd->obd_name, d->opd_syn_rpc_in_flight,
d->opd_syn_rpc_in_progress);
osp_sync_check_for_work(d);
+ /* wake up the thread if requested to stop:
+ * it might be waiting for in-progress to complete */
+ if (unlikely(osp_sync_running(d) == 0))
+ cfs_waitq_signal(&d->opd_syn_waitq);
+
EXIT;
}
struct obd_device *obd = d->opd_obd;
struct llog_handle *llh;
struct lu_env env;
- int rc;
+ int rc, count;
char pname[16];
ENTRY;
RETURN(rc);
}
- sprintf(pname, "osp-syn-%u\n", d->opd_index);
+ sprintf(pname, "osp-syn-%u", d->opd_index);
cfs_daemonize(pname);
- cfs_spin_lock(&d->opd_syn_lock);
+ spin_lock(&d->opd_syn_lock);
thread->t_flags = SVC_RUNNING;
- cfs_spin_unlock(&d->opd_syn_lock);
+ spin_unlock(&d->opd_syn_lock);
cfs_waitq_signal(&thread->t_ctl_waitq);
ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
d->opd_syn_changes, d->opd_syn_rpc_in_progress,
d->opd_syn_rpc_in_flight);
- osp_sync_process_committed(&env, d);
+ /* wait till all the requests are completed */
+ count = 0;
+ while (d->opd_syn_rpc_in_progress > 0) {
+ osp_sync_process_committed(&env, d);
+
+ lwi = LWI_TIMEOUT(cfs_time_seconds(5), NULL, NULL);
+ rc = l_wait_event(d->opd_syn_waitq,
+ d->opd_syn_rpc_in_progress == 0,
+ &lwi);
+ if (rc == -ETIMEDOUT)
+ count++;
+ LASSERTF(count < 10, "%s: %d %d %sempty\n",
+ d->opd_obd->obd_name, d->opd_syn_rpc_in_progress,
+ d->opd_syn_rpc_in_flight,
+ cfs_list_empty(&d->opd_syn_committed_there) ? "" :"!");
+
+ }
llog_cat_close(&env, llh);
rc = llog_cleanup(&env, ctxt);
out:
thread->t_flags = SVC_STOPPED;
- /*
- * there might be a race between osp sync thread sending RPCs and
- * import invalidation. this can result in RPCs being in ptlrpcd
- * till this point. for safete reason let's wait till they are done
- */
- l_wait_event(d->opd_syn_waitq, d->opd_syn_rpc_in_flight == 0, &lwi);
-
cfs_waitq_signal(&thread->t_ctl_waitq);
LASSERTF(d->opd_syn_rpc_in_progress == 0,
"%s: %d %d %sempty\n",
RETURN(0);
}
-static struct llog_operations osp_mds_ost_orig_logops;
-
static int osp_sync_llog_init(const struct lu_env *env, struct osp_device *d)
{
struct osp_thread_info *osi = osp_env_info(env);
- struct llog_handle *lgh;
+ struct llog_handle *lgh = NULL;
struct obd_device *obd = d->opd_obd;
struct llog_ctxt *ctxt;
int rc;
RETURN(rc);
}
- CDEBUG(D_INFO, "%s: Init llog for %d - catid "LPX64"/"LPX64":%x\n",
- obd->obd_name, d->opd_index, osi->osi_cid.lci_logid.lgl_oid,
- osi->osi_cid.lci_logid.lgl_oseq,
+ CDEBUG(D_INFO, "%s: Init llog for %d - catid "DOSTID":%x\n",
+ obd->obd_name, d->opd_index,
+ POSTID(&osi->osi_cid.lci_logid.lgl_oi),
osi->osi_cid.lci_logid.lgl_ogen);
- osp_mds_ost_orig_logops = llog_osd_ops;
rc = llog_setup(env, obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, obd,
&osp_mds_ost_orig_logops);
if (rc)
ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
LASSERT(ctxt);
- if (likely(osi->osi_cid.lci_logid.lgl_oid != 0)) {
+ if (likely(logid_id(&osi->osi_cid.lci_logid) != 0)) {
rc = llog_open(env, ctxt, &lgh, &osi->osi_cid.lci_logid, NULL,
LLOG_OPEN_EXISTS);
/* re-create llog if it is missing */
if (rc == -ENOENT)
- osi->osi_cid.lci_logid.lgl_oid = 0;
+ logid_set_id(&osi->osi_cid.lci_logid, 0);
else if (rc < 0)
GOTO(out_cleanup, rc);
}
- if (unlikely(osi->osi_cid.lci_logid.lgl_oid == 0)) {
+ if (unlikely(logid_id(&osi->osi_cid.lci_logid) == 0)) {
rc = llog_open_create(env, ctxt, &lgh, NULL, NULL);
if (rc < 0)
GOTO(out_cleanup, rc);
osi->osi_cid.lci_logid = lgh->lgh_id;
}
+ LASSERT(lgh != NULL);
ctxt->loc_handle = lgh;
- lgh->lgh_logops->lop_add = llog_cat_add_rec;
- lgh->lgh_logops->lop_declare_add = llog_cat_declare_add_rec;
rc = llog_cat_init_and_process(env, lgh);
if (rc)
*/
d->opd_syn_max_rpc_in_flight = OSP_MAX_IN_FLIGHT;
d->opd_syn_max_rpc_in_progress = OSP_MAX_IN_PROGRESS;
- cfs_spin_lock_init(&d->opd_syn_lock);
+ spin_lock_init(&d->opd_syn_lock);
cfs_waitq_init(&d->opd_syn_waitq);
cfs_waitq_init(&d->opd_syn_thread.t_ctl_waitq);
CFS_INIT_LIST_HEAD(&d->opd_syn_committed_there);
RETURN(0);
}
-static CFS_DEFINE_MUTEX(osp_id_tracker_sem);
+static DEFINE_MUTEX(osp_id_tracker_sem);
static CFS_LIST_HEAD(osp_id_tracker_list);
static void osp_sync_tracker_commit_cb(struct thandle *th, void *cookie)
if (txn == NULL || txn->oti_current_id < tr->otr_committed_id)
return;
- cfs_spin_lock(&tr->otr_lock);
+ spin_lock(&tr->otr_lock);
if (likely(txn->oti_current_id > tr->otr_committed_id)) {
CDEBUG(D_OTHER, "committed: %u -> %u\n",
tr->otr_committed_id, txn->oti_current_id);
cfs_waitq_signal(&d->opd_syn_waitq);
}
}
- cfs_spin_unlock(&tr->otr_lock);
+ spin_unlock(&tr->otr_lock);
}
static int osp_sync_id_traction_init(struct osp_device *d)
LASSERT(d->opd_syn_tracker == NULL);
CFS_INIT_LIST_HEAD(&d->opd_syn_ontrack);
- cfs_mutex_lock(&osp_id_tracker_sem);
+ mutex_lock(&osp_id_tracker_sem);
cfs_list_for_each_entry(tr, &osp_id_tracker_list, otr_list) {
if (tr->otr_dev == d->opd_storage) {
LASSERT(cfs_atomic_read(&tr->otr_refcount));
OBD_ALLOC_PTR(tr);
if (tr) {
d->opd_syn_tracker = tr;
- cfs_spin_lock_init(&tr->otr_lock);
+ spin_lock_init(&tr->otr_lock);
tr->otr_dev = d->opd_storage;
tr->otr_next_id = 1;
tr->otr_committed_id = 0;
rc = 0;
}
}
- cfs_mutex_unlock(&osp_id_tracker_sem);
+ mutex_unlock(&osp_id_tracker_sem);
return rc;
}
osp_sync_remove_from_tracker(d);
- cfs_mutex_lock(&osp_id_tracker_sem);
+ mutex_lock(&osp_id_tracker_sem);
if (cfs_atomic_dec_and_test(&tr->otr_refcount)) {
dt_txn_callback_del(d->opd_storage, &tr->otr_tx_cb);
LASSERT(cfs_list_empty(&tr->otr_wakeup_list));
OBD_FREE_PTR(tr);
d->opd_syn_tracker = NULL;
}
- cfs_mutex_unlock(&osp_id_tracker_sem);
+ mutex_unlock(&osp_id_tracker_sem);
EXIT;
}
LASSERT(tr);
/* XXX: we can improve this introducing per-cpu preallocated ids? */
- cfs_spin_lock(&tr->otr_lock);
+ spin_lock(&tr->otr_lock);
if (unlikely(tr->otr_next_id <= d->opd_syn_last_used_id)) {
- cfs_spin_unlock(&tr->otr_lock);
+ spin_unlock(&tr->otr_lock);
CERROR("%s: next %u, last synced %lu\n",
d->opd_obd->obd_name, tr->otr_next_id,
d->opd_syn_last_used_id);
d->opd_syn_last_used_id = id;
if (cfs_list_empty(&d->opd_syn_ontrack))
cfs_list_add(&d->opd_syn_ontrack, &tr->otr_wakeup_list);
- cfs_spin_unlock(&tr->otr_lock);
+ spin_unlock(&tr->otr_lock);
CDEBUG(D_OTHER, "new id %u\n", (unsigned) id);
return id;
if (cfs_list_empty(&d->opd_syn_ontrack))
return;
- cfs_spin_lock(&tr->otr_lock);
+ spin_lock(&tr->otr_lock);
cfs_list_del_init(&d->opd_syn_ontrack);
- cfs_spin_unlock(&tr->otr_lock);
+ spin_unlock(&tr->otr_lock);
}