struct lov_stripe_md **ea, struct obd_trans_info *oti);
void oscc_init(struct obd_export *exp);
-int lproc_osc_attach_seqstat(struct obd_device *dev);
extern atomic_t osc_max_rpcs_in_flight;
extern atomic_t osc_max_pages_per_rpc;
int osc_rpcd_addref(void);
int osc_rpcd_decref(void);
+void osc_rpcd_add_req(struct ptlrpc_request *req);
+
+#ifdef __KERNEL__
+
+int lproc_osc_attach_seqstat(struct obd_device *dev);
void lproc_osc_hist(struct osc_histogram *oh, unsigned int value);
void lproc_osc_hist_pow2(struct osc_histogram *oh, unsigned int value);
-int lproc_osc_attach_seqstat(struct obd_device *dev);
-void osc_rpcd_add_req(struct ptlrpc_request *req);
+
+#else /* !__KERNEL__ */
+
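+/* userspace build: no procfs, so the stats hooks compile away to no-ops */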
+#define lproc_osc_attach_seqstat(dev) (0)
+#define lproc_osc_hist(o, v) do { } while (0)
+#define lproc_osc_hist_pow2(o, v) do { } while (0)
+
+#endif /* __KERNEL__ */
#endif /* OSC_INTERNAL_H */
static DECLARE_MUTEX(osc_rpcd_sem);
static int osc_rpcd_users = 0;
+#ifdef __KERNEL__
void osc_rpcd_add_req(struct ptlrpc_request *req)
{
struct osc_rpcd_ctl *orc = &osc_orc;
RETURN(rc);
}
+#else /* !__KERNEL__ */
+
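+/* Fold requests queued on set_new_requests into the set proper, give the
+ * set one poll, and reap anything that has completed.  Returns non-zero
+ * once requests have completed, so osc_rpcd_wait_io() can use it as its
+ * wait condition. */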
+static int osc_rpcd_check(struct osc_rpcd_ctl *orc)
+{
+ struct list_head *tmp, *pos;
+ struct ptlrpc_request *req;
+ unsigned long flags;
+ int rc = 0;
+ ENTRY;
+
+ spin_lock_irqsave(&orc->orc_set->set_new_req_lock, flags);
+ list_for_each_safe(pos, tmp, &orc->orc_set->set_new_requests) {
+ req = list_entry(pos, struct ptlrpc_request, rq_set_chain);
+ list_del_init(&req->rq_set_chain);
+ ptlrpc_set_add_req(orc->orc_set, req);
+ }
+ spin_unlock_irqrestore(&orc->orc_set->set_new_req_lock, flags);
+
+ if (orc->orc_set->set_remaining) {
+ rc = ptlrpc_check_set(orc->orc_set);
+
+ /* XXX our set never completes, so we prune the completed
+ * reqs after each iteration. boy could this be smarter. */
+ list_for_each_safe(pos, tmp, &orc->orc_set->set_requests) {
+ req = list_entry(pos, struct ptlrpc_request,
+ rq_set_chain);
+ if (req->rq_phase != RQ_PHASE_COMPLETE)
+ continue;
+
+ list_del_init(&req->rq_set_chain);
+ req->rq_set = NULL;
+ ptlrpc_req_finished(req);
+
+ /* if any request finished, report success */
+ if (rc >= 0)
+ rc = 1;
+ }
+ }
+
+ RETURN(rc);
+}
+
+extern int ptlrpc_set_send_new(struct ptlrpc_request_set *set);
+
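+/* With no rpcd thread to drive the set in userspace, queue the request
+ * and start it inline from the caller's context via ptlrpc_set_send_new(). */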
+void osc_rpcd_add_req(struct ptlrpc_request *req)
+{
+ struct osc_rpcd_ctl *orc = &osc_orc;
+ unsigned long flags;
+
+ spin_lock_irqsave(&orc->orc_set->set_new_req_lock, flags);
+ ptlrpc_set_add_req(orc->orc_set, req);
+ spin_unlock_irqrestore(&orc->orc_set->set_new_req_lock, flags);
+
+ if (ptlrpc_set_send_new(orc->orc_set)) {
+ /* XXX handle this error properly */
+ CERROR("ptlrpc_set_send_new() failed\n");
+ }
+}
+
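+/* Block the caller until osc_rpcd_check() reports progress or the set's
+ * next RPC deadline expires. */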
+void osc_rpcd_wait_io(void)
+{
+ struct osc_rpcd_ctl *orc = &osc_orc;
+ struct l_wait_info lwi;
+ int timeout;
+
+ timeout = ptlrpc_set_next_timeout(orc->orc_set) * HZ;
+ lwi = LWI_TIMEOUT(timeout, ptlrpc_expired_set, orc->orc_set);
+ l_wait_event(orc->orc_waitq, osc_rpcd_check(orc), &lwi);
+}
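+
+/*
+ * Usage sketch (not part of this patch): a liblustre caller might drive
+ * async I/O through the rpcd set roughly like this.  build_req() and
+ * io_outstanding() are hypothetical stand-ins for the caller's own code.
+ *
+ *     struct ptlrpc_request *req = build_req();
+ *     osc_rpcd_add_req(req);            queued and sent inline
+ *     while (io_outstanding())
+ *             osc_rpcd_wait_io();       wait for progress or timeout
+ */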
+
+#endif /* __KERNEL__ */
+
+#ifdef __KERNEL__
/* ptlrpc's code paths like to execute in process context, so we have this
 * thread, which spins on a set containing the I/O RPCs.  llite specifies
 * osc_rpcd's set when it pushes pages down into the OSCs */
complete(&orc->orc_finishing);
return 0;
}
+#endif /* __KERNEL__ */
int osc_rpcd_addref(void)
{
if (orc->orc_set == NULL)
GOTO(out, rc = -ENOMEM);
+#ifdef __KERNEL__
if (kernel_thread(osc_rpcd, orc, 0) < 0) {
ptlrpc_set_destroy(orc->orc_set);
GOTO(out, rc = -ECHILD);
}
wait_for_completion(&orc->orc_starting);
+#endif
out:
up(&osc_rpcd_sem);
RETURN(rc);
if (--osc_rpcd_users == 0) {
set_bit(LIOD_STOP, &orc->orc_flags);
wake_up(&orc->orc_waitq);
+#ifdef __KERNEL__
wait_for_completion(&orc->orc_finishing);
+#endif
ptlrpc_set_destroy(orc->orc_set);
}
up(&osc_rpcd_sem);