there. Actual usage will be in CLIO.
b=16450
Description: Kill obd_set_fail_loc().
Details : Remove unused code.
+Severity : normal
+Bugzilla : 16450
+Description: Add special type for ptlrpc_request interpret functions.
+Details : Add lu_env parameter to ->rq_interpreter call-back. NULL is passed
+ there. Actual usage will be in CLIO.
+
--------------------------------------------------------------------------------
2007-08-10 Cluster File Systems, Inc. <info@clusterfs.com>
RQ_PHASE_COMPLETE = 0xebc0de04,
};
+/** Type of request interpreter call-back */
+typedef int (*ptlrpc_interpterer_t)(const struct lu_env *env,
+ struct ptlrpc_request *req,
+ void *arg, int rc);
+
struct ptlrpc_request_pool {
spinlock_t prp_lock;
struct list_head prp_req_list; /* list of ptlrpc_request structs */
* Thread name used in cfs_daemonize()
*/
char pc_name[16];
+ /**
+ * Environment for request interpreters to run in.
+ */
+ struct lu_env pc_env;
#ifndef __KERNEL__
/**
* Async rpcs flag to make sure that ptlrpcd_check() is called only
int ptlrpc_set_add_cb(struct ptlrpc_request_set *set,
set_interpreter_func fn, void *data);
int ptlrpc_set_next_timeout(struct ptlrpc_request_set *);
-int ptlrpc_check_set(struct ptlrpc_request_set *set);
+int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set);
int ptlrpc_set_wait(struct ptlrpc_request_set *);
int ptlrpc_expired_set(void *data);
void ptlrpc_interrupted_set(void *data);
return rc;
}
-static int ldlm_cb_interpret(struct ptlrpc_request *req, void *data, int rc)
+static int ldlm_cb_interpret(const struct lu_env *env,
+ struct ptlrpc_request *req, void *data, int rc)
{
struct ldlm_cb_set_arg *arg;
struct ldlm_lock *lock;
return LDLM_ITER_CONTINUE;
}
-static int replay_lock_interpret(struct ptlrpc_request *req,
+static int replay_lock_interpret(const struct lu_env *env,
+ struct ptlrpc_request *req,
struct ldlm_async_args *aa, int rc)
{
struct lustre_handle old_hash_key;
CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
aa = ptlrpc_req_async_args(req);
aa->lock_handle = body->lock_handle[0];
- req->rq_interpret_reply = replay_lock_interpret;
+ req->rq_interpret_reply = (ptlrpc_interpterer_t)replay_lock_interpret;
ptlrpcd_add_req(req);
RETURN(0);
RETURN(rc);
}
-static int mdc_intent_getattr_async_interpret(struct ptlrpc_request *req,
+static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
+ struct ptlrpc_request *req,
void *unused, int rc)
{
struct obd_export *exp = req->rq_async_args.pointer_arg[0];
if (set) {
ptlrpc_set_add_req(set, req);
- ptlrpc_check_set(set);
+ ptlrpc_check_set(NULL, set);
} else {
rc = ptlrpc_queue_wait(req);
ptlrpc_req_finished(req);
RETURN(rc);
}
-static int mdc_interpret_renew_capa(struct ptlrpc_request *req, void *unused,
+static int mdc_interpret_renew_capa(const struct lu_env *env,
+ struct ptlrpc_request *req, void *unused,
int status)
{
struct obd_capa *oc = req->rq_async_args.pointer_arg[0];
CERROR("Failed to evict nid %s from OSTs: rc %d\n",
tmpbuf + 4, rc);
- ptlrpc_check_set(set);
+ ptlrpc_check_set(NULL, set);
}
/* See the comments in function lprocfs_wr_evict_client()
#include <obd_class.h>
#include "osc_internal.h"
-static int osc_interpret_create(struct ptlrpc_request *req, void *data, int rc)
+static int osc_interpret_create(const struct lu_env *env,
+ struct ptlrpc_request *req, void *data, int rc)
{
struct osc_creator *oscc;
struct ost_body *body = NULL;
case -EAGAIN:
/* valid race delorphan vs create, or somthing after resend */
spin_unlock(&oscc->oscc_lock);
- DEBUG_REQ(D_INODE, req, "Got EGAIN - resend \n");
+ DEBUG_REQ(D_INODE, req, "Got EAGAIN - resend \n");
break;
case -ENOSPC:
case -EROFS:
extern quota_interface_t osc_quota_interface;
static void osc_release_ppga(struct brw_page **ppga, obd_count count);
-static int brw_interpret(struct ptlrpc_request *request, void *data, int rc);
+static int brw_interpret(const struct lu_env *env,
+ struct ptlrpc_request *req, void *data, int rc);
int osc_cleanup(struct obd_device *obd);
/* Pack OSC object metadata for disk storage (LE byte order). */
;
}
-static int osc_getattr_interpret(struct ptlrpc_request *req,
+static int osc_getattr_interpret(const struct lu_env *env,
+ struct ptlrpc_request *req,
struct osc_async_args *aa, int rc)
{
struct ost_body *body;
osc_pack_req_body(req, oinfo);
ptlrpc_request_set_replen(req);
- req->rq_interpret_reply = osc_getattr_interpret;
+ req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
aa = ptlrpc_req_async_args(req);
RETURN(rc);
}
-static int osc_setattr_interpret(struct ptlrpc_request *req,
+static int osc_setattr_interpret(const struct lu_env *env,
+ struct ptlrpc_request *req,
struct osc_async_args *aa, int rc)
{
struct ost_body *body;
oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
}
- /* do mds to ost setattr asynchronouly */
+ /* do mds to ost setattr asynchronously */
if (!rqset) {
/* Do not wait for response. */
ptlrpcd_add_req(req);
} else {
- req->rq_interpret_reply = osc_setattr_interpret;
+ req->rq_interpret_reply =
+ (ptlrpc_interpterer_t)osc_setattr_interpret;
CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
aa = ptlrpc_req_async_args(req);
RETURN(rc);
}
-static int osc_punch_interpret(struct ptlrpc_request *req,
+static int osc_punch_interpret(const struct lu_env *env,
+ struct ptlrpc_request *req,
struct osc_async_args *aa, int rc)
{
struct ost_body *body;
ptlrpc_request_set_replen(req);
- req->rq_interpret_reply = osc_punch_interpret;
+ req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_punch_interpret;
CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
aa = ptlrpc_req_async_args(req);
aa->aa_oi = oinfo;
RETURN(count);
}
-static int osc_destroy_interpret(struct ptlrpc_request *req, void *data,
+static int osc_destroy_interpret(const struct lu_env *env,
+ struct ptlrpc_request *req, void *data,
int rc)
{
struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
EXIT;
}
-static int brw_interpret(struct ptlrpc_request *req, void *data, int rc)
+static int brw_interpret(const struct lu_env *env,
+ struct ptlrpc_request *req, void *data, int rc)
{
struct osc_brw_async_args *aa = data;
struct client_obd *cli;
RETURN(rc);
}
-static int osc_enqueue_interpret(struct ptlrpc_request *req,
+static int osc_enqueue_interpret(const struct lu_env *env,
+ struct ptlrpc_request *req,
struct osc_enqueue_args *aa, int rc)
{
int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
aa->oa_ei = einfo;
aa->oa_exp = exp;
- req->rq_interpret_reply = osc_enqueue_interpret;
+ req->rq_interpret_reply =
+ (ptlrpc_interpterer_t)osc_enqueue_interpret;
ptlrpc_set_add_req(rqset, req);
} else if (intent) {
ptlrpc_req_finished(req);
req->rq_no_delay = 1;
}
- req->rq_interpret_reply = osc_statfs_interpret;
+ req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
aa = ptlrpc_req_async_args(req);
aa->aa_oi = oinfo;
RETURN(-EINVAL);
}
-static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
+static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
+ struct ptlrpc_request *req,
void *aa, int rc)
{
struct llog_ctxt *ctxt;
ptlrpc_request_set_replen(req);
ptlrpc_set_add_req(set, req);
- ptlrpc_check_set(set);
+ ptlrpc_check_set(NULL, set);
RETURN(0);
}
if (req->rq_phase == RQ_PHASE_NEW) {
if (req->rq_interpret_reply != NULL) {
- int (*interpreter)(struct ptlrpc_request *,
- void *, int) =
+ ptlrpc_interpterer_t interpreter =
req->rq_interpret_reply;
/* higher level (i.e. LOV) failed;
* let the sub reqs clean up */
req->rq_status = -EBADR;
- interpreter(req, &req->rq_async_args,
+ interpreter(NULL, req, &req->rq_async_args,
req->rq_status);
}
set->set_remaining--;
}
/* this sends any unsent RPCs in @set and returns TRUE if all are sent */
-int ptlrpc_check_set(struct ptlrpc_request_set *set)
+int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
{
struct list_head *tmp;
int force_timer_recalc = 0;
ptlrpc_unregister_bulk (req);
if (req->rq_interpret_reply != NULL) {
- int (*interpreter)(struct ptlrpc_request *,void *,int) =
+ ptlrpc_interpterer_t interpreter =
req->rq_interpret_reply;
- req->rq_status = interpreter(req, &req->rq_async_args,
+ req->rq_status = interpreter(NULL, req,
+ &req->rq_async_args,
req->rq_status);
}
req->rq_phase = RQ_PHASE_COMPLETE;
lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(timeout ? timeout : 1),
ptlrpc_expired_set,
ptlrpc_interrupted_set, set);
- rc = l_wait_event(set->set_waitq, ptlrpc_check_set(set), &lwi);
+ rc = l_wait_event(set->set_waitq,
+ ptlrpc_check_set(NULL, set), &lwi);
LASSERT(rc == 0 || rc == -EINTR || rc == -ETIMEDOUT);
int praa_old_status;
};
-static int ptlrpc_replay_interpret(struct ptlrpc_request *req,
+static int ptlrpc_replay_interpret(const struct lu_env *env,
+ struct ptlrpc_request *req,
void * data, int rc)
{
struct ptlrpc_replay_async_args *aa = data;
} while(0)
-static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
+static int ptlrpc_connect_interpret(const struct lu_env *env,
+ struct ptlrpc_request *request,
void * data, int rc);
int ptlrpc_import_recovery_state_machine(struct obd_import *imp);
EXIT;
}
-static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
+static int ptlrpc_connect_interpret(const struct lu_env *env,
+ struct ptlrpc_request *request,
void * data, int rc)
{
struct ptlrpc_connect_async_args *aa = data;
RETURN(rc);
}
-static int completed_replay_interpret(struct ptlrpc_request *req,
+static int completed_replay_interpret(const struct lu_env *env,
+ struct ptlrpc_request *req,
void * data, int rc)
{
ENTRY;
}
do_check_set:
- rc = ptlrpc_check_set(set);
+ rc = ptlrpc_check_set(NULL, set);
/* not finished, and we are not expired, simply return */
if (!rc && cfs_time_before(curtime, cfs_time_add(pd->pd_this_ping,
rc = ptlrpc_set_add_new_req(pc, req);
if (rc) {
- int (*interpreter)(struct ptlrpc_request *,
- void *, int);
+ ptlrpc_interpterer_t interpreter;
interpreter = req->rq_interpret_reply;
* resources.
*/
req->rq_status = -EBADR;
- interpreter(req, &req->rq_async_args,
+ interpreter(NULL, req, &req->rq_async_args,
req->rq_status);
req->rq_set = NULL;
ptlrpc_req_finished(req);
}
}
-static int ptlrpcd_check(struct ptlrpcd_ctl *pc)
+static int ptlrpcd_check(const struct lu_env *env, struct ptlrpcd_ctl *pc)
{
struct list_head *tmp, *pos;
struct ptlrpc_request *req;
spin_unlock(&pc->pc_set->set_new_req_lock);
if (pc->pc_set->set_remaining) {
- rc = rc | ptlrpc_check_set(pc->pc_set);
+ rc = rc | ptlrpc_check_set(env, pc->pc_set);
/*
* XXX: our set never completes, so we prune the completed
timeout = cfs_time_seconds(ptlrpc_set_next_timeout(pc->pc_set));
lwi = LWI_TIMEOUT(timeout, ptlrpc_expired_set, pc->pc_set);
- l_wait_event(pc->pc_set->set_waitq, ptlrpcd_check(pc), &lwi);
+ lu_context_enter(&pc->pc_env.le_ctx);
+ l_wait_event(pc->pc_set->set_waitq,
+ ptlrpcd_check(&pc->pc_env, pc), &lwi);
+ lu_context_exit(&pc->pc_env.le_ctx);
/*
* Abort inflight rpcs for forced stop case.
return 0;
}
-#else
+#else /* !__KERNEL__ */
int ptlrpcd_check_async_rpcs(void *arg)
{
pc->pc_recurred++;
if (pc->pc_recurred == 1) {
- rc = ptlrpcd_check(pc);
+ lu_context_enter(&pc->pc_env.le_ctx);
+ rc = ptlrpcd_check(&pc->pc_env, pc);
+ lu_context_exit(&pc->pc_env.le_ctx);
if (!rc)
ptlrpc_expired_set(pc->pc_set);
/*
* XXX: send replay requests.
*/
if (pc == &ptlrpcd_recovery_pc)
- rc = ptlrpcd_check(pc);
+ rc = ptlrpcd_check(&pc->pc_env, pc);
}
pc->pc_recurred--;
int ptlrpcd_start(char *name, struct ptlrpcd_ctl *pc)
{
- int rc = 0;
+ int rc;
ENTRY;
/*
init_completion(&pc->pc_finishing);
spin_lock_init(&pc->pc_lock);
snprintf (pc->pc_name, sizeof (pc->pc_name), name);
-
pc->pc_set = ptlrpc_prep_set();
if (pc->pc_set == NULL)
GOTO(out, rc = -ENOMEM);
+ /*
+ * So far only "client" ptlrpcd uses an environment. In the future,
+ * ptlrpcd thread (or a thread-set) has to be given an argument,
+ * describing its "scope".
+ */
+ rc = lu_context_init(&pc->pc_env.le_ctx, LCT_CL_THREAD|LCT_REMEMBER);
+ if (rc != 0) {
+ ptlrpc_set_destroy(pc->pc_set);
+ GOTO(out, rc);
+ }
#ifdef __KERNEL__
rc = cfs_kernel_thread(ptlrpcd, pc, 0);
if (rc < 0) {
+ lu_context_fini(&pc->pc_env.le_ctx);
ptlrpc_set_destroy(pc->pc_set);
GOTO(out, rc);
}
liblustre_deregister_wait_callback(pc->pc_wait_callback);
liblustre_deregister_idle_callback(pc->pc_idle_callback);
#endif
+ lu_context_fini(&pc->pc_env.le_ctx);
ptlrpc_set_destroy(pc->pc_set);
}
* in cleanup time when all inflight rpcs aborted.
*/
static int
-llcd_interpret(struct ptlrpc_request *req, void *noused, int rc)
+llcd_interpret(const struct lu_env *env,
+ struct ptlrpc_request *req, void *noused, int rc)
{
struct llog_canceld_ctxt *llcd = req->rq_async_args.pointer_arg[0];
CDEBUG(D_RPCTRACE, "Sent llcd %p (%d)\n", llcd, rc);
/* bug 5515 */
req->rq_request_portal = LDLM_CANCEL_REQUEST_PORTAL;
req->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL;
- req->rq_interpret_reply = llcd_interpret;
+ req->rq_interpret_reply = (ptlrpc_interpterer_t)llcd_interpret;
req->rq_async_args.pointer_arg[0] = llcd;
rc = ptlrpc_set_add_new_req(&lcm->lcm_pc, req);
if (rc) {
struct lustre_qunit *aa_qunit;
};
-static int dqacq_interpret(struct ptlrpc_request *req, void *data, int rc)
+static int dqacq_interpret(const struct lu_env *env,
+ struct ptlrpc_request *req, void *data, int rc)
{
struct dqacq_async_args *aa = (struct dqacq_async_args *)data;
struct lustre_quota_ctxt *qctxt = aa->aa_ctxt;