* \retval negative negated errno on error
*/
int osp_sync_declare_add(const struct lu_env *env, struct osp_object *o,
- llog_op_type type, struct thandle *th)
+ enum llog_op_type type, struct thandle *th)
{
struct osp_thread_info *osi = osp_env_info(env);
struct osp_device *d = lu2osp_dev(o->opo_obj.do_lu.lo_dev);
}
ctxt = llog_get_context(d->opd_obd, LLOG_MDS_OST_ORIG_CTXT);
- LASSERT(ctxt);
+ if (!ctxt) {
+ /* for a reason OSP wasn't able to open llog,
+ * just skip logging this operation and hope
+ * LFSCK will fix it eventually */
+ CERROR("logging isn't available, run LFSCK\n");
+ RETURN(0);
+ }
rc = llog_declare_add(env, ctxt->loc_handle, &osi->osi_hdr,
storage_th);
* \retval negative negated errno on error
*/
static int osp_sync_add_rec(const struct lu_env *env, struct osp_device *d,
- const struct lu_fid *fid, llog_op_type type,
+ const struct lu_fid *fid, enum llog_op_type type,
int count, struct thandle *th,
const struct lu_attr *attr)
{
spin_unlock(&d->opd_sync_lock);
ctxt = llog_get_context(d->opd_obd, LLOG_MDS_OST_ORIG_CTXT);
- if (ctxt == NULL)
- RETURN(-ENOMEM);
+ if (ctxt == NULL) {
+ /* see comment in osp_sync_declare_add() */
+ RETURN(0);
+ }
rc = llog_add(env, ctxt->loc_handle, &osi->osi_hdr, &osi->osi_cookie,
storage_th);
}
int osp_sync_add(const struct lu_env *env, struct osp_object *o,
- llog_op_type type, struct thandle *th,
+ enum llog_op_type type, struct thandle *th,
const struct lu_attr *attr)
{
return osp_sync_add_rec(env, lu2osp_dev(o->opo_obj.do_lu.lo_dev),
* it's done in osp_sync_interpret sooner or later */
LASSERT(d);
- jra = ptlrpc_req_async_args(req);
+ jra = ptlrpc_req_async_args(jra, req);
LASSERT(jra->jra_magic == OSP_JOB_MAGIC);
LASSERT(list_empty(&jra->jra_committed_link));
* \retval 0 always
*/
static int osp_sync_interpret(const struct lu_env *env,
- struct ptlrpc_request *req, void *aa, int rc)
+ struct ptlrpc_request *req, void *args, int rc)
{
+ struct osp_job_req_args *jra = args;
struct osp_device *d = req->rq_cb_data;
- struct osp_job_req_args *jra = aa;
if (jra->jra_magic != OSP_JOB_MAGIC) {
DEBUG_REQ(D_ERROR, req, "bad magic %u\n", jra->jra_magic);
LASSERT(atomic_read(&d->opd_sync_rpcs_in_flight) <=
d->opd_sync_max_rpcs_in_flight);
- jra = ptlrpc_req_async_args(req);
+ jra = ptlrpc_req_async_args(jra, req);
jra->jra_magic = OSP_JOB_MAGIC;
jra->jra_lcookie.lgc_lgl = llh->lgh_id;
jra->jra_lcookie.lgc_subsys = LLOG_MDS_OST_ORIG_CTXT;
* \retval ERR_PTR(errno) on error
*/
static struct ptlrpc_request *osp_sync_new_job(struct osp_device *d,
- ost_cmd_t op,
+ enum ost_cmd op,
const struct req_format *format)
{
struct ptlrpc_request *req;
/* cancel any generation record */
rc = llog_cat_cancel_records(env, cathandle, 1, &cookie);
+ /* flush all pending records ASAP */
+ osp_sync_force(env, d);
+
RETURN_EXIT;
}
struct ptlrpc_request *req;
struct llog_ctxt *ctxt;
struct llog_handle *llh;
- struct list_head list;
- int rc, done = 0;
+ int *arr;
+ struct list_head list, *le;
+ struct llog_logid lgid;
+ int rc, i, count = 0, done = 0;
ENTRY;
INIT_LIST_HEAD(&d->opd_sync_committed_there);
spin_unlock(&d->opd_sync_lock);
+ list_for_each(le, &list)
+ count++;
+ if (count > 2)
+ OBD_ALLOC_WAIT(arr, sizeof(int) * count);
+ else
+ arr = NULL;
+ i = 0;
while (!list_empty(&list)) {
struct osp_job_req_args *jra;
/* import can be closing, thus all commit cb's are
* called we can check committness directly */
if (req->rq_import_generation == imp->imp_generation) {
- rc = llog_cat_cancel_records(env, llh, 1,
- &jra->jra_lcookie);
- if (rc)
- CERROR("%s: can't cancel record: %d\n",
- obd->obd_name, rc);
+ if (arr && (!i ||
+ !memcmp(&jra->jra_lcookie.lgc_lgl, &lgid,
+ sizeof(lgid)))) {
+ if (unlikely(!i))
+ lgid = jra->jra_lcookie.lgc_lgl;
+
+ arr[i++] = jra->jra_lcookie.lgc_index;
+ } else {
+ rc = llog_cat_cancel_records(env, llh, 1,
+ &jra->jra_lcookie);
+ if (rc)
+ CERROR("%s: can't cancel record: %d\n",
+ obd->obd_name, rc);
+ }
} else {
DEBUG_REQ(D_OTHER, req, "imp_committed = %llu",
imp->imp_peer_committed_transno);
ptlrpc_req_finished(req);
done++;
}
+ if (arr && i > 0) {
+ rc = llog_cat_cancel_arr_rec(env, llh, &lgid, i, arr);
+
+ if (rc)
+ CERROR("%s: can't cancel %d records rc: %d\n",
+ obd->obd_name, i, rc);
+ else
+ CDEBUG(D_OTHER, "%s: massive records cancel id "DFID\
+ " num %d\n", obd->obd_name,
+ PFID(&lgid.lgl_oi.oi_fid), i);
+ }
+ if (arr)
+ OBD_FREE(arr, sizeof(int) * count);
llog_ctxt_put(ctxt);
LASSERT(atomic_read(&d->opd_sync_rpcs_in_progress) >= done);
atomic_sub(done, &d->opd_sync_rpcs_in_progress);
- CDEBUG(D_OTHER, "%s: %d in flight, %d in progress\n",
+ CDEBUG(D_OTHER, "%s: %d in flight, %d in progress, done %d\n",
d->opd_obd->obd_name, atomic_read(&d->opd_sync_rpcs_in_flight),
- atomic_read(&d->opd_sync_rpcs_in_progress));
+ atomic_read(&d->opd_sync_rpcs_in_progress), done);
osp_sync_check_for_work(d);
spin_unlock(&d->opd_sync_lock);
wake_up(&thread->t_ctl_waitq);
+again:
ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
if (ctxt == NULL) {
CERROR("can't get appropriate context\n");
wrapped = (llh->lgh_hdr->llh_cat_idx >= llh->lgh_last_idx &&
llh->lgh_hdr->llh_count > 1);
+ if (OBD_FAIL_CHECK(OBD_FAIL_OSP_CANT_PROCESS_LLOG)) {
+ rc = -EINPROGRESS;
+ goto next;
+ }
rc = llog_cat_process(&env, llh, osp_sync_process_queues, d,
d->opd_sync_last_catalog_idx, 0);
+next:
size = OBD_FAIL_PRECHECK(OBD_FAIL_CAT_RECORDS) ?
cfs_fail_val : (LLOG_HDR_BITMAP_SIZE(llh->lgh_hdr) - 1);
/* processing reaches catalog bottom */
if (d->opd_sync_last_catalog_idx == size)
- d->opd_sync_last_catalog_idx = 0;
+ d->opd_sync_last_catalog_idx = LLOG_CAT_FIRST;
else if (wrapped)
/* If catalog is wrapped we can`t predict last index of
* processing because lgh_last_idx could be changed.
* Starting form the next one */
d->opd_sync_last_catalog_idx++;
- } while (rc == 0 && (wrapped || d->opd_sync_last_catalog_idx == 0));
+ } while (rc == 0 && (wrapped ||
+ d->opd_sync_last_catalog_idx == LLOG_CAT_FIRST));
if (rc < 0) {
+ if (rc == -EINPROGRESS) {
+ /* can't access the llog now - OI scrub is trying to fix
+ * underlying issue. let's wait and try again */
+ llog_cat_close(&env, llh);
+ rc = llog_cleanup(&env, ctxt);
+ if (rc)
+ GOTO(out, rc);
+ schedule_timeout_interruptible(HZ * 5);
+ goto again;
+ }
+
CERROR("%s: llog process with osp_sync_process_queues "
"failed: %d\n", d->opd_obd->obd_name, rc);
GOTO(close, rc);
rc = llog_setup(env, obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT,
d->opd_storage->dd_lu_dev.ld_obd,
- &osp_mds_ost_orig_logops);
+ &llog_common_cat_ops);
if (rc)
RETURN(rc);