The osp_sync_process_committed() cancels llog records one by one.
For each cancel it do open,transaction,mutex,write, etc. But most
of all cancels belongs to a single llog file. So they could
be combined.
The patch adds functions for cancelling array of indexes for a
llog file. And adds behavior and calls at
osp_sync_process_committed().
Signed-off-by: Alexander Boyko <c17825@cray.com>
Cray-bug-id: LUS-6836
Change-Id: I4f461687021b3f76595d403cdd0bb6aba8d93b53
Reviewed-on: https://review.whamcloud.com/34179
Tested-by: Jenkins
Reviewed-by: Sergey Cheremencev <c17829@cray.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Andriy Skulysh <c17819@cray.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
void *data, void *catdata);
int llog_cancel_rec(const struct lu_env *env, struct llog_handle *loghandle,
int index);
+int llog_cancel_arr_rec(const struct lu_env *env, struct llog_handle *loghandle,
+ int num, int *index);
int llog_open(const struct lu_env *env, struct llog_ctxt *ctxt,
struct llog_handle **lgh, struct llog_logid *logid,
char *name, enum llog_open_param open_param);
struct llog_rec_hdr *rec, struct thandle *th);
int llog_cat_add(const struct lu_env *env, struct llog_handle *cathandle,
struct llog_rec_hdr *rec, struct llog_cookie *reccookie);
+int llog_cat_cancel_arr_rec(const struct lu_env *env,
+ struct llog_handle *cathandle,
+ struct llog_logid *lgl, int count, int *index);
int llog_cat_cancel_records(const struct lu_env *env,
struct llog_handle *cathandle, int count,
struct llog_cookie *cookies);
EXPORT_SYMBOL(llog_destroy);
/* returns negative on error; 0 if success; 1 if success & log destroyed */
-int llog_cancel_rec(const struct lu_env *env, struct llog_handle *loghandle,
- int index)
+int llog_cancel_arr_rec(const struct lu_env *env, struct llog_handle *loghandle,
+ int num, int *index)
{
struct llog_thread_info *lgi = llog_info(env);
struct dt_device *dt;
struct llog_log_hdr *llh;
struct thandle *th;
__u32 tmp_lgc_index;
- int rc;
+ int rc, i = 0;
int rc1;
bool subtract_count = false;
llh = loghandle->lgh_hdr;
- CDEBUG(D_RPCTRACE, "Canceling %d in log "DFID"\n", index,
- PFID(&loghandle->lgh_id.lgl_oi.oi_fid));
-
- if (index == 0) {
- CERROR("Can't cancel index 0 which is header\n");
- RETURN(-EINVAL);
- }
+ CDEBUG(D_RPCTRACE, "Canceling %d records, first %d in log "DFID"\n",
+ num, index[0], PFID(&loghandle->lgh_id.lgl_oi.oi_fid));
dt = lu2dt_dev(loghandle->lgh_obj->do_lu.lo_dev);
if (IS_ERR(th))
RETURN(PTR_ERR(th));
- rc = llog_declare_write_rec(env, loghandle, &llh->llh_hdr, index, th);
+ rc = llog_declare_write_rec(env, loghandle, &llh->llh_hdr, 0, th);
if (rc < 0)
GOTO(out_trans, rc);
down_write(&loghandle->lgh_lock);
/* clear bitmap */
mutex_lock(&loghandle->lgh_hdr_mutex);
- if (!ext2_clear_bit(index, LLOG_HDR_BITMAP(llh))) {
- CDEBUG(D_RPCTRACE, "Catalog index %u already clear?\n", index);
- GOTO(out_unlock, rc);
+ for (i = 0; i < num; ++i) {
+ if (index[i] == 0) {
+ CERROR("Can't cancel index 0 which is header\n");
+ GOTO(out_unlock, rc = -EINVAL);
+ }
+ if (!ext2_clear_bit(index[i], LLOG_HDR_BITMAP(llh))) {
+ CDEBUG(D_RPCTRACE, "Catalog index %u already clear?\n",
+ index[i]);
+ GOTO(out_unlock, rc = -ENOENT);
+ }
}
-
- loghandle->lgh_hdr->llh_count--;
+ loghandle->lgh_hdr->llh_count -= num;
subtract_count = true;
/* Since llog_process_thread use lgi_cookie, it`s better to save them
tmp_lgc_index = lgi->lgi_cookie.lgc_index;
/* Pass this index to llog_osd_write_rec(), which will use the index
* to only update the necesary bitmap. */
- lgi->lgi_cookie.lgc_index = index;
+ lgi->lgi_cookie.lgc_index = index[0];
/* update header */
- rc = llog_write_rec(env, loghandle, &llh->llh_hdr, &lgi->lgi_cookie,
- LLOG_HEADER_IDX, th);
+ rc = llog_write_rec(env, loghandle, &llh->llh_hdr, (num != 1 ? NULL :
+ &lgi->lgi_cookie), LLOG_HEADER_IDX, th);
lgi->lgi_cookie.lgc_index = tmp_lgc_index;
if (rc != 0)
rc1 = dt_trans_stop(env, dt, th);
if (rc == 0)
rc = rc1;
- if (rc < 0 && subtract_count) {
+ if (rc < 0) {
mutex_lock(&loghandle->lgh_hdr_mutex);
- loghandle->lgh_hdr->llh_count++;
- ext2_set_bit(index, LLOG_HDR_BITMAP(llh));
+ if (subtract_count)
+ loghandle->lgh_hdr->llh_count += num;
+ for (i = i - 1; i >= 0; i--)
+ ext2_set_bit(index[i], LLOG_HDR_BITMAP(llh));
mutex_unlock(&loghandle->lgh_hdr_mutex);
}
RETURN(rc);
}
+int llog_cancel_rec(const struct lu_env *env, struct llog_handle *loghandle,
+ int index)
+{
+ return llog_cancel_arr_rec(env, loghandle, 1, &index);
+}
+
int llog_read_header(const struct lu_env *env, struct llog_handle *handle,
const struct obd_uuid *uuid)
{
}
EXPORT_SYMBOL(llog_cat_add);
+int llog_cat_cancel_arr_rec(const struct lu_env *env,
+ struct llog_handle *cathandle,
+ struct llog_logid *lgl, int count, int *index)
+{
+ struct llog_handle *loghandle;
+ int rc;
+
+ ENTRY;
+ rc = llog_cat_id2handle(env, cathandle, &loghandle, lgl);
+ if (rc) {
+ CDEBUG(D_HA, "%s: cannot find llog for handle "DFID":%x"
+ ": rc = %d\n",
+ cathandle->lgh_ctxt->loc_obd->obd_name,
+ PFID(&lgl->lgl_oi.oi_fid), lgl->lgl_ogen, rc);
+ RETURN(rc);
+ }
+
+ if ((cathandle->lgh_ctxt->loc_flags &
+ LLOG_CTXT_FLAG_NORMAL_FID) && !llog_exist(loghandle)) {
+ /* For update log, some of loghandles of cathandle
+ * might not exist because remote llog creation might
+ * be failed, so let's skip the record cancellation
+ * for these non-exist llogs.
+ */
+ rc = -ENOENT;
+ CDEBUG(D_HA, "%s: llog "DFID":%x does not exist"
+ ": rc = %d\n",
+ cathandle->lgh_ctxt->loc_obd->obd_name,
+ PFID(&lgl->lgl_oi.oi_fid), lgl->lgl_ogen, rc);
+
+ llog_handle_put(loghandle);
+ RETURN(rc);
+ }
+
+ rc = llog_cancel_arr_rec(env, loghandle, count, index);
+ if (rc == LLOG_DEL_PLAIN) { /* log has been destroyed */
+ int cat_index;
+
+ cat_index = loghandle->u.phd.phd_cookie.lgc_index;
+ rc = llog_cat_cleanup(env, cathandle, loghandle, cat_index);
+ if (rc)
+ CERROR("%s: fail to cancel catalog record: rc = %d\n",
+ cathandle->lgh_ctxt->loc_obd->obd_name, rc);
+ rc = 0;
+
+ }
+ llog_handle_put(loghandle);
+
+ if (rc)
+ CERROR("%s: fail to cancel %d llog-records: rc = %d\n",
+ cathandle->lgh_ctxt->loc_obd->obd_name, count,
+ rc);
+
+ RETURN(rc);
+}
+EXPORT_SYMBOL(llog_cat_cancel_arr_rec);
+
/* For each cookie in the cookie array, we clear the log in-use bit and either:
* - the log is empty, so mark it free in the catalog header and delete it
* - the log is not empty, just write out the log header
struct llog_handle *cathandle, int count,
struct llog_cookie *cookies)
{
- int i, index, rc = 0, failed = 0;
+ int i, rc = 0, failed = 0;
ENTRY;
for (i = 0; i < count; i++, cookies++) {
- struct llog_handle *loghandle;
- struct llog_logid *lgl = &cookies->lgc_lgl;
- int lrc;
-
- rc = llog_cat_id2handle(env, cathandle, &loghandle, lgl);
- if (rc) {
- CDEBUG(D_HA, "%s: cannot find llog for handle "DFID":%x"
- ": rc = %d\n",
- cathandle->lgh_ctxt->loc_obd->obd_name,
- PFID(&lgl->lgl_oi.oi_fid), lgl->lgl_ogen, rc);
- failed++;
- continue;
- }
-
- if ((cathandle->lgh_ctxt->loc_flags &
- LLOG_CTXT_FLAG_NORMAL_FID) && !llog_exist(loghandle)) {
- /* For update log, some of loghandles of cathandle
- * might not exist because remote llog creation might
- * be failed, so let's skip the record cancellation
- * for these non-exist llogs.
- */
- lrc = -ENOENT;
- CDEBUG(D_HA, "%s: llog "DFID":%x does not exist"
- ": rc = %d\n",
- cathandle->lgh_ctxt->loc_obd->obd_name,
- PFID(&lgl->lgl_oi.oi_fid), lgl->lgl_ogen, lrc);
- failed++;
- if (rc == 0)
- rc = lrc;
- continue;
- }
+ int lrc;
- lrc = llog_cancel_rec(env, loghandle, cookies->lgc_index);
- if (lrc == LLOG_DEL_PLAIN) { /* log has been destroyed */
- index = loghandle->u.phd.phd_cookie.lgc_index;
- lrc = llog_cat_cleanup(env, cathandle, loghandle,
- index);
- if (rc == 0)
- rc = lrc;
- } else if (lrc == -ENOENT) {
- if (rc == 0) /* ENOENT shouldn't rewrite any error */
- rc = lrc;
- } else if (lrc < 0) {
+ lrc = llog_cat_cancel_arr_rec(env, cathandle, &cookies->lgc_lgl,
+ 1, &cookies->lgc_index);
+ if (lrc) {
failed++;
- if (rc == 0)
+ if (!rc)
rc = lrc;
}
- llog_handle_put(loghandle);
}
- if (rc)
+ if (failed)
CERROR("%s: fail to cancel %d of %d llog-records: rc = %d\n",
cathandle->lgh_ctxt->loc_obd->obd_name, failed, count,
rc);
-
RETURN(rc);
}
EXPORT_SYMBOL(llog_cat_cancel_records);
struct ptlrpc_request *req;
struct llog_ctxt *ctxt;
struct llog_handle *llh;
- struct list_head list;
- int rc, done = 0;
+ int *arr;
+ struct list_head list, *le;
+ struct llog_logid lgid;
+ int rc, i, count = 0, done = 0;
ENTRY;
INIT_LIST_HEAD(&d->opd_sync_committed_there);
spin_unlock(&d->opd_sync_lock);
+ list_for_each(le, &list)
+ count++;
+ if (count > 2)
+ OBD_ALLOC_WAIT(arr, sizeof(int) * count);
+ else
+ arr = NULL;
+ i = 0;
while (!list_empty(&list)) {
struct osp_job_req_args *jra;
/* import can be closing, thus all commit cb's are
* called we can check committness directly */
if (req->rq_import_generation == imp->imp_generation) {
- rc = llog_cat_cancel_records(env, llh, 1,
- &jra->jra_lcookie);
- if (rc)
- CERROR("%s: can't cancel record: %d\n",
- obd->obd_name, rc);
+ if (arr && (!i ||
+ !memcmp(&jra->jra_lcookie.lgc_lgl, &lgid,
+ sizeof(lgid)))) {
+ if (unlikely(!i))
+ lgid = jra->jra_lcookie.lgc_lgl;
+
+ arr[i++] = jra->jra_lcookie.lgc_index;
+ } else {
+ rc = llog_cat_cancel_records(env, llh, 1,
+ &jra->jra_lcookie);
+ if (rc)
+ CERROR("%s: can't cancel record: %d\n",
+ obd->obd_name, rc);
+ }
} else {
DEBUG_REQ(D_OTHER, req, "imp_committed = %llu",
imp->imp_peer_committed_transno);
ptlrpc_req_finished(req);
done++;
}
+ if (arr && i > 0) {
+ rc = llog_cat_cancel_arr_rec(env, llh, &lgid, i, arr);
+
+ if (rc)
+ CERROR("%s: can't cancel %d records rc: %d\n",
+ obd->obd_name, i, rc);
+ else
+ CDEBUG(D_OTHER, "%s: massive records cancel id "DFID\
+ " num %d\n", obd->obd_name,
+ PFID(&lgid.lgl_oi.oi_fid), i);
+ }
+ if (arr)
+ OBD_FREE(arr, sizeof(int) * count);
llog_ctxt_put(ctxt);
LASSERT(atomic_read(&d->opd_sync_rpcs_in_progress) >= done);
atomic_sub(done, &d->opd_sync_rpcs_in_progress);
- CDEBUG(D_OTHER, "%s: %d in flight, %d in progress\n",
+ CDEBUG(D_OTHER, "%s: %d in flight, %d in progress, done %d\n",
d->opd_obd->obd_name, atomic_read(&d->opd_sync_rpcs_in_flight),
- atomic_read(&d->opd_sync_rpcs_in_progress));
+ atomic_read(&d->opd_sync_rpcs_in_progress), done);
osp_sync_check_for_work(d);