struct chlg_reader_state {
/* Shortcut to the corresponding OBD device */
struct obd_device *crs_obd;
+ /* Producer thread (if any) */
+ struct task_struct *crs_prod_task;
/* An error occurred that prevents from reading further */
bool crs_err;
/* EOF, no more records available */
bool crs_eof;
- /* Userland reader closed connection */
- bool crs_closed;
/* Desired start position */
__u64 crs_start_offset;
/* Wait queue for the catalog processing thread */
l_wait_event(crs->crs_waitq_prod,
(crs->crs_rec_count < CDEV_CHLG_MAX_PREFETCH ||
- crs->crs_closed), &lwi);
+ kthread_should_stop()), &lwi);
- if (crs->crs_closed)
+ if (kthread_should_stop())
RETURN(LLOG_PROC_BREAK);
len = changelog_rec_size(&rec->cr) + rec->cr.cr_namelen;
}
/**
- * Release resources associated to a changelog_reader_state instance.
- *
- * @param crs CRS instance to release.
- */
-static void crs_free(struct chlg_reader_state *crs)
-{
- struct chlg_rec_entry *rec;
- struct chlg_rec_entry *tmp;
-
- list_for_each_entry_safe(rec, tmp, &crs->crs_rec_queue, enq_linkage)
- enq_record_delete(rec);
-
- OBD_FREE_PTR(crs);
-}
-
-/**
* Record prefetch thread entry point. Opens the changelog catalog and starts
* reading records.
*
GOTO(err_out, rc);
}
+ crs->crs_eof = true;
+
err_out:
- crs->crs_err = true;
+ if (rc < 0)
+ crs->crs_err = true;
+
wake_up_all(&crs->crs_waitq_cons);
if (llh != NULL)
if (ctx != NULL)
llog_ctxt_put(ctx);
- l_wait_event(crs->crs_waitq_prod, crs->crs_closed, &lwi);
- crs_free(crs);
+ l_wait_event(crs->crs_waitq_prod, kthread_should_stop(), &lwi);
+
RETURN(rc);
}
crs->crs_obd = obd;
crs->crs_err = false;
crs->crs_eof = false;
- crs->crs_closed = false;
mutex_init(&crs->crs_lock);
INIT_LIST_HEAD(&crs->crs_rec_queue);
obd->obd_name, rc);
GOTO(err_crs, rc);
}
+ crs->crs_prod_task = task;
}
file->private_data = crs;
static int chlg_release(struct inode *inode, struct file *file)
{
struct chlg_reader_state *crs = file->private_data;
+ struct chlg_rec_entry *rec;
+ struct chlg_rec_entry *tmp;
- if (file->f_mode & FMODE_READ) {
- crs->crs_closed = true;
- wake_up_all(&crs->crs_waitq_prod);
- } else {
- /* No producer thread, release resource ourselves */
- crs_free(crs);
- }
+ if (crs->crs_prod_task)
+ kthread_stop(crs->crs_prod_task);
+
+ list_for_each_entry_safe(rec, tmp, &crs->crs_rec_queue, enq_linkage)
+ enq_record_delete(rec);
+
+ OBD_FREE_PTR(crs);
return 0;
}
*/
static unsigned int chlg_poll(struct file *file, poll_table *wait)
{
- struct chlg_reader_state *crs = file->private_data;
+ struct chlg_reader_state *crs = file->private_data;
unsigned int mask = 0;
mutex_lock(&crs->crs_lock);