static int llog_process_thread(void *arg)
{
- struct llog_process_info *lpi = (struct llog_process_info *)arg;
- struct llog_handle *loghandle = lpi->lpi_loghandle;
- struct llog_log_hdr *llh = loghandle->lgh_hdr;
- struct llog_process_cat_data *cd = lpi->lpi_catdata;
- char *buf;
- __u64 cur_offset = LLOG_CHUNK_SIZE;
- __u64 last_offset;
- int rc = 0, index = 1, last_index;
- int saved_index = 0, last_called_index = 0;
+ struct llog_process_info *lpi = arg;
+ struct llog_handle *loghandle = lpi->lpi_loghandle;
+ struct llog_log_hdr *llh = loghandle->lgh_hdr;
+ struct llog_process_cat_data *cd = lpi->lpi_catdata;
+ char *buf;
+ __u64 cur_offset = LLOG_CHUNK_SIZE;
+ __u64 last_offset;
+ int rc = 0, index = 1, last_index;
+ int saved_index = 0;
+ int last_called_index = 0;
+
+ ENTRY;
LASSERT(llh);
OBD_ALLOC(buf, LLOG_CHUNK_SIZE);
if (!buf) {
lpi->lpi_rc = -ENOMEM;
-#ifdef __KERNEL__
- cfs_complete(&lpi->lpi_completion);
-#endif
- return 0;
+ RETURN(0);
}
- if (!(lpi->lpi_flags & LLOG_FLAG_NODEAMON))
- cfs_daemonize_ctxt("llog_process_thread");
-
if (cd != NULL) {
last_called_index = cd->lpcd_first_idx;
index = cd->lpcd_first_idx + 1;
LASSERT(index <= last_index + 1);
if (index == last_index + 1)
break;
-
+repeat:
CDEBUG(D_OTHER, "index: %d last_index %d\n",
index, last_index);
rec, rec->lrh_type);
if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
- lustre_swab_llog_rec(rec, NULL);
+ lustre_swab_llog_rec(rec);
CDEBUG(D_OTHER, "after swabbing, type=%#x idx=%d\n",
rec->lrh_type, rec->lrh_index);
- if (rec->lrh_index == 0)
- GOTO(out, 0); /* no more records */
-
- if (rec->lrh_len == 0 || rec->lrh_len >LLOG_CHUNK_SIZE){
+ if (rec->lrh_index == 0) {
+ /* probably another rec just got added? */
+ if (index <= loghandle->lgh_last_idx)
+ GOTO(repeat, rc = 0);
+ GOTO(out, rc = 0); /* no more records */
+ }
+ if (rec->lrh_len == 0 ||
+ rec->lrh_len > LLOG_CHUNK_SIZE) {
CWARN("invalid length %d in llog record for "
"index %d/%d\n", rec->lrh_len,
rec->lrh_index, index);
}
}
- out:
+out:
if (cd != NULL)
cd->lpcd_last_idx = last_called_index;
- if (buf)
- OBD_FREE(buf, LLOG_CHUNK_SIZE);
+
+ OBD_FREE(buf, LLOG_CHUNK_SIZE);
lpi->lpi_rc = rc;
-#ifdef __KERNEL__
- cfs_complete(&lpi->lpi_completion);
-#endif
return 0;
}
-int llog_process_flags(struct llog_handle *loghandle, llog_cb_t cb,
- void *data, void *catdata, int flags)
+#ifdef __KERNEL__
+static int llog_process_thread_daemonize(void *arg)
+{
+ struct llog_process_info *lpi = arg;
+ struct lu_env env;
+ int rc;
+
+ cfs_daemonize_ctxt("llog_process_thread");
+
+ /* client env has no keys, tags is just 0 */
+ rc = lu_env_init(&env, LCT_LOCAL);
+ if (rc)
+ goto out;
+ lpi->lpi_env = &env;
+
+ rc = llog_process_thread(arg);
+
+ lu_env_fini(&env);
+out:
+ cfs_complete(&lpi->lpi_completion);
+ return rc;
+}
+#endif
+
+int llog_process_or_fork(const struct lu_env *env,
+ struct llog_handle *loghandle,
+ llog_cb_t cb, void *data, void *catdata, bool fork)
{
struct llog_process_info *lpi;
int rc;
+
ENTRY;
OBD_ALLOC_PTR(lpi);
lpi->lpi_cb = cb;
lpi->lpi_cbdata = data;
lpi->lpi_catdata = catdata;
- lpi->lpi_flags = flags;
#ifdef __KERNEL__
- cfs_init_completion(&lpi->lpi_completion);
- rc = cfs_create_thread(llog_process_thread, lpi, CFS_DAEMON_FLAGS);
- if (rc < 0) {
- CERROR("cannot start thread: %d\n", rc);
- OBD_FREE_PTR(lpi);
- RETURN(rc);
- }
- cfs_wait_for_completion(&lpi->lpi_completion);
+ if (fork) {
+ /* The new thread can't use parent env,
+ * init the new one in llog_process_thread_daemonize. */
+ lpi->lpi_env = NULL;
+ cfs_init_completion(&lpi->lpi_completion);
+ rc = cfs_create_thread(llog_process_thread_daemonize, lpi,
+ CFS_DAEMON_FLAGS);
+ if (rc < 0) {
+ CERROR("%s: cannot start thread: rc = %d\n",
+ loghandle->lgh_ctxt->loc_obd->obd_name, rc);
+ OBD_FREE_PTR(lpi);
+ RETURN(rc);
+ }
+ cfs_wait_for_completion(&lpi->lpi_completion);
+ } else {
+ lpi->lpi_env = env;
+ llog_process_thread(lpi);
+ }
#else
- llog_process_thread(lpi);
+ lpi->lpi_env = env;
+ llog_process_thread(lpi);
#endif
rc = lpi->lpi_rc;
OBD_FREE_PTR(lpi);
RETURN(rc);
}
-EXPORT_SYMBOL(llog_process_flags);
-int llog_process(struct llog_handle *loghandle, llog_cb_t cb,
- void *data, void *catdata)
+int llog_process(const struct lu_env *env, struct llog_handle *loghandle,
+ llog_cb_t cb, void *data, void *catdata)
{
- return llog_process_flags(loghandle, cb, data, catdata, 0);
+ return llog_process_or_fork(env, loghandle, cb, data, catdata, false);
}
EXPORT_SYMBOL(llog_process);
}
EXPORT_SYMBOL(llog_get_size);
-int llog_reverse_process(struct llog_handle *loghandle, llog_cb_t cb,
- void *data, void *catdata)
+int llog_reverse_process(const struct lu_env *env,
+ struct llog_handle *loghandle, llog_cb_t cb,
+ void *data, void *catdata)
{
struct llog_log_hdr *llh = loghandle->lgh_hdr;
struct llog_process_cat_data *cd = catdata;
GOTO(out, rc);
rec = buf;
- idx = le32_to_cpu(rec->lrh_index);
- if (idx < index)
- CDEBUG(D_RPCTRACE, "index %u : idx %u\n", index, idx);
+ idx = rec->lrh_index;
+ CDEBUG(D_RPCTRACE, "index %u : idx %u\n", index, idx);
while (idx < index) {
- rec = ((void *)rec + le32_to_cpu(rec->lrh_len));
+ rec = (void *)rec + rec->lrh_len;
+ if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
+ lustre_swab_llog_rec(rec);
idx ++;
}
- tail = (void *)rec + le32_to_cpu(rec->lrh_len) - sizeof(*tail);
+ LASSERT(idx == index);
+ tail = (void *)rec + rec->lrh_len - sizeof(*tail);
/* process records in buffer, starting where we found one */
while ((void *)tail > buf) {
- rec = (void *)tail - le32_to_cpu(tail->lrt_len) +
- sizeof(*tail);
-
- if (rec->lrh_index == 0)
- GOTO(out, 0); /* no more records */
+ if (tail->lrt_index == 0)
+ GOTO(out, rc = 0); /* no more records */
/* if set, process the callback on this record */
if (ext2_test_bit(index, llh->llh_bitmap)) {
+ rec = (void *)tail - tail->lrt_len +
+ sizeof(*tail);
+
rc = cb(loghandle, rec, data);
if (rc == LLOG_PROC_BREAK) {
GOTO(out, rc);
- }
+ } else if (rc == LLOG_DEL_RECORD) {
+ llog_cancel_rec(loghandle,
+ tail->lrt_index);
+ rc = 0;
+ }
if (rc)
GOTO(out, rc);
}
--index;
if (index < first_index)
GOTO(out, rc = 0);
- tail = (void *)rec - sizeof(*tail);
+ tail = (void *)tail - tail->lrt_len;
}
}