#define DEBUG_SUBSYSTEM S_MDS
#include <linux/kthread.h>
+#include <linux/delay.h>
#include <lustre_log.h>
#include <lustre_update.h>
#include "osp_internal.h"
}
ctxt = llog_get_context(d->opd_obd, LLOG_MDS_OST_ORIG_CTXT);
- LASSERT(ctxt);
+ if (!ctxt) {
+ /* for some reason OSP wasn't able to open the llog,
+ * just skip logging this operation and hope
+ * LFSCK will fix it eventually */
+ CERROR("logging isn't available, run LFSCK\n");
+ RETURN(0);
+ }
rc = llog_declare_add(env, ctxt->loc_handle, &osi->osi_hdr,
storage_th);
spin_unlock(&d->opd_sync_lock);
ctxt = llog_get_context(d->opd_obd, LLOG_MDS_OST_ORIG_CTXT);
- if (ctxt == NULL)
- RETURN(-ENOMEM);
+ if (ctxt == NULL) {
+ /* see comment in osp_sync_declare_add() */
+ RETURN(0);
+ }
rc = llog_add(env, ctxt->loc_handle, &osi->osi_hdr, &osi->osi_cookie,
storage_th);
* it's done in osp_sync_interpret sooner or later */
LASSERT(d);
- jra = ptlrpc_req_async_args(req);
+ jra = ptlrpc_req_async_args(jra, req);
LASSERT(jra->jra_magic == OSP_JOB_MAGIC);
LASSERT(list_empty(&jra->jra_committed_link));
struct osp_device *d = req->rq_cb_data;
if (jra->jra_magic != OSP_JOB_MAGIC) {
- DEBUG_REQ(D_ERROR, req, "bad magic %u\n", jra->jra_magic);
+ DEBUG_REQ(D_ERROR, req, "bad magic %u", jra->jra_magic);
LBUG();
}
LASSERT(d);
/*
* error happened, we'll try to repeat on next boot ?
*/
- LASSERTF(req->rq_transno == 0 || rc == -EIO ||
+ LASSERTF(req->rq_transno == 0 || rc == -EIO || rc == -EROFS ||
req->rq_import_generation < imp->imp_generation,
"transno %llu, rc %d, gen: req %d, imp %d\n",
req->rq_transno, rc, req->rq_import_generation,
LASSERT(atomic_read(&d->opd_sync_rpcs_in_flight) <=
d->opd_sync_max_rpcs_in_flight);
- jra = ptlrpc_req_async_args(req);
+ jra = ptlrpc_req_async_args(jra, req);
jra->jra_magic = OSP_JOB_MAGIC;
jra->jra_lcookie.lgc_lgl = llh->lgh_id;
jra->jra_lcookie.lgc_subsys = LLOG_MDS_OST_ORIG_CTXT;
struct ptlrpc_request *req;
struct llog_ctxt *ctxt;
struct llog_handle *llh;
- struct list_head list;
- int rc, done = 0;
+ int *arr;
+ LIST_HEAD(list);
+ struct list_head *le;
+ struct llog_logid lgid;
+ int rc, i, count = 0, done = 0;
ENTRY;
llh = ctxt->loc_handle;
LASSERT(llh);
- INIT_LIST_HEAD(&list);
spin_lock(&d->opd_sync_lock);
list_splice(&d->opd_sync_committed_there, &list);
INIT_LIST_HEAD(&d->opd_sync_committed_there);
spin_unlock(&d->opd_sync_lock);
+ list_for_each(le, &list)
+ count++;
+ if (count > 2)
+ OBD_ALLOC_WAIT(arr, sizeof(int) * count);
+ else
+ arr = NULL;
+ i = 0;
while (!list_empty(&list)) {
struct osp_job_req_args *jra;
/* import can be closing, thus all commit cb's are
* called we can check committness directly */
if (req->rq_import_generation == imp->imp_generation) {
- rc = llog_cat_cancel_records(env, llh, 1,
- &jra->jra_lcookie);
- if (rc)
- CERROR("%s: can't cancel record: %d\n",
- obd->obd_name, rc);
+ if (arr && (!i ||
+ !memcmp(&jra->jra_lcookie.lgc_lgl, &lgid,
+ sizeof(lgid)))) {
+ if (unlikely(!i))
+ lgid = jra->jra_lcookie.lgc_lgl;
+
+ arr[i++] = jra->jra_lcookie.lgc_index;
+ } else {
+ rc = llog_cat_cancel_records(env, llh, 1,
+ &jra->jra_lcookie);
+ if (rc)
+ CERROR("%s: can't cancel record: %d\n",
+ obd->obd_name, rc);
+ }
} else {
DEBUG_REQ(D_OTHER, req, "imp_committed = %llu",
imp->imp_peer_committed_transno);
ptlrpc_req_finished(req);
done++;
}
+ if (arr && i > 0) {
+ rc = llog_cat_cancel_arr_rec(env, llh, &lgid, i, arr);
+
+ if (rc)
+ CERROR("%s: can't cancel %d records rc: %d\n",
+ obd->obd_name, i, rc);
+ else
+ CDEBUG(D_OTHER, "%s: massive records cancel id "DFID\
+ " num %d\n", obd->obd_name,
+ PFID(&lgid.lgl_oi.oi_fid), i);
+ }
+ if (arr)
+ OBD_FREE(arr, sizeof(int) * count);
llog_ctxt_put(ctxt);
LASSERT(atomic_read(&d->opd_sync_rpcs_in_progress) >= done);
atomic_sub(done, &d->opd_sync_rpcs_in_progress);
- CDEBUG(D_OTHER, "%s: %d in flight, %d in progress\n",
+ CDEBUG(D_OTHER, "%s: %d in flight, %d in progress, done %d\n",
d->opd_obd->obd_name, atomic_read(&d->opd_sync_rpcs_in_flight),
- atomic_read(&d->opd_sync_rpcs_in_progress));
+ atomic_read(&d->opd_sync_rpcs_in_progress), done);
osp_sync_check_for_work(d);
struct osp_device *d = data;
do {
- struct l_wait_info lwi = { 0 };
-
if (!osp_sync_running(d)) {
CDEBUG(D_HA, "stop llog processing\n");
return LLOG_PROC_BREAK;
llh = NULL;
rec = NULL;
}
-
- l_wait_event(d->opd_sync_waitq,
- !osp_sync_running(d) ||
- osp_sync_can_process_new(d, rec) ||
- !list_empty(&d->opd_sync_committed_there),
- &lwi);
+ if (OBD_FAIL_PRECHECK(OBD_FAIL_CATALOG_FULL_CHECK) &&
+ cfs_fail_val != 1)
+ msleep(1 * MSEC_PER_SEC);
+
+ wait_event_idle(d->opd_sync_waitq,
+ !osp_sync_running(d) ||
+ osp_sync_can_process_new(d, rec) ||
+ !list_empty(&d->opd_sync_committed_there));
} while (1);
}
{
struct osp_device *d = _arg;
struct ptlrpc_thread *thread = &d->opd_sync_thread;
- struct l_wait_info lwi = { 0 };
struct llog_ctxt *ctxt;
struct obd_device *obd = d->opd_obd;
struct llog_handle *llh;
spin_unlock(&d->opd_sync_lock);
wake_up(&thread->t_ctl_waitq);
+again:
ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
if (ctxt == NULL) {
CERROR("can't get appropriate context\n");
wrapped = (llh->lgh_hdr->llh_cat_idx >= llh->lgh_last_idx &&
llh->lgh_hdr->llh_count > 1);
+ if (OBD_FAIL_CHECK(OBD_FAIL_OSP_CANT_PROCESS_LLOG)) {
+ rc = -EINPROGRESS;
+ goto next;
+ }
rc = llog_cat_process(&env, llh, osp_sync_process_queues, d,
d->opd_sync_last_catalog_idx, 0);
+next:
size = OBD_FAIL_PRECHECK(OBD_FAIL_CAT_RECORDS) ?
cfs_fail_val : (LLOG_HDR_BITMAP_SIZE(llh->lgh_hdr) - 1);
/* processing reaches catalog bottom */
if (d->opd_sync_last_catalog_idx == size)
d->opd_sync_last_catalog_idx = LLOG_CAT_FIRST;
- else if (wrapped)
- /* If catalog is wrapped we can`t predict last index of
- * processing because lgh_last_idx could be changed.
- * Starting form the next one */
- d->opd_sync_last_catalog_idx++;
-
+ /* If the catalog is wrapped we can't predict the last index of
+ * processing because lgh_last_idx could be changed.
+ * Starting from the next one. Index would be increased
+ * at llog_process_thread
+ */
} while (rc == 0 && (wrapped ||
d->opd_sync_last_catalog_idx == LLOG_CAT_FIRST));
if (rc < 0) {
+ if (rc == -EINPROGRESS) {
+ /* can't access the llog now - OI scrub is trying to fix the
+ * underlying issue. Let's wait and try again */
+ llog_cat_close(&env, llh);
+ rc = llog_cleanup(&env, ctxt);
+ if (rc)
+ GOTO(out, rc);
+ schedule_timeout_interruptible(cfs_time_seconds(5));
+ goto again;
+ }
+
CERROR("%s: llog process with osp_sync_process_queues "
"failed: %d\n", d->opd_obd->obd_name, rc);
GOTO(close, rc);
while (atomic_read(&d->opd_sync_rpcs_in_progress) > 0) {
osp_sync_process_committed(&env, d);
- lwi = LWI_TIMEOUT(cfs_time_seconds(5), NULL, NULL);
- rc = l_wait_event(d->opd_sync_waitq,
- atomic_read(&d->opd_sync_rpcs_in_progress) == 0,
- &lwi);
- if (rc == -ETIMEDOUT)
+ rc = wait_event_idle_timeout(
+ d->opd_sync_waitq,
+ atomic_read(&d->opd_sync_rpcs_in_progress) == 0,
+ cfs_time_seconds(5));
+ if (rc == 0)
count++;
LASSERTF(count < 10, "%s: %d %d %sempty\n",
d->opd_obd->obd_name,
LASSERT(ctxt);
if (likely(logid_id(&osi->osi_cid.lci_logid) != 0)) {
- rc = llog_open(env, ctxt, &lgh, &osi->osi_cid.lci_logid, NULL,
- LLOG_OPEN_EXISTS);
- /* re-create llog if it is missing */
- if (rc == -ENOENT)
+ struct lu_fid fid_temp;
+
+ if (CFS_FAIL_CHECK(OBD_FAIL_OSP_INVALID_LOGID)) {
+ memset(&osi->osi_cid, 0, sizeof(osi->osi_cid));
+ logid_set_id(&osi->osi_cid.lci_logid, cfs_fail_val);
+ }
+
+ logid_to_fid(&osi->osi_cid.lci_logid, &fid_temp);
+ if (fid_is_sane(&fid_temp)) {
+ rc = llog_open(env, ctxt, &lgh, &osi->osi_cid.lci_logid,
+ NULL, LLOG_OPEN_EXISTS);
+
+ /* re-create llog if it is missing */
+ if (rc == -ENOENT)
+ logid_set_id(&osi->osi_cid.lci_logid, 0);
+ else if (rc < 0)
+ GOTO(out_cleanup, rc);
+ } else {
+ CERROR("%s: the catid "DFID" for init llog %d is bad\n",
+ obd->obd_name, PFID(&fid_temp), d->opd_index);
+
+ /* it will be recreated later */
logid_set_id(&osi->osi_cid.lci_logid, 0);
- else if (rc < 0)
- GOTO(out_cleanup, rc);
+ }
}
if (unlikely(logid_id(&osi->osi_cid.lci_logid) == 0)) {
*/
int osp_sync_init(const struct lu_env *env, struct osp_device *d)
{
- struct l_wait_info lwi = { 0 };
struct task_struct *task;
int rc;
GOTO(err_llog, rc);
}
- l_wait_event(d->opd_sync_thread.t_ctl_waitq,
- osp_sync_running(d) || osp_sync_stopped(d), &lwi);
+ wait_event_idle(d->opd_sync_thread.t_ctl_waitq,
+ osp_sync_running(d) || osp_sync_stopped(d));
RETURN(0);
err_llog: