* Big doc comment for l_wait_event.
* Only fire the timeout once from l_wait_event.
* Made timeout and the recovery-upcall path configurable via sysctl.
* Added OBD_FAIL_OSC codes for simulating simple client failure.
* Tentative rewiring of recovd into client connections, needs more thought
and then more typing. We do fire the upcall, at least.
* Use the provided cluuid instead of NULL wherever it's handy already.
* Protect (feebly) against waiting for recovery that will never happen,
in sync_io_timeout.
* Add timeouts to bulk operations in MDS and OST -- a recovery stub is now
triggered, but nothing else.
* Document the unpleasant business in osc_brw_{read,write} as pertains to
errors in the callbacks and cleanup of descriptors.
* Remove now-unused ptlrpc_check_bulk_{sent,received}.
#define OBD_IOC_DEC_FS_USE_COUNT _IO ('f', 133 )
+/*
+ * l_wait_event is a flexible sleeping function, permitting simple caller
+ * configuration of interrupt and timeout sensitivity along with actions to
+ * be performed in the event of either exception.
+ *
+ * Common usage looks like this:
+ *
+ * struct l_wait_info lwi = LWI_TIMEOUT_INTR(timeout, timeout_handler,
+ * intr_handler, callback_data);
+ * rc = l_wait_event(waitq, condition, &lwi);
+ *
+ * (LWI_TIMEOUT and LWI_INTR macros are available for timeout- and
+ * interrupt-only variants, respectively.)
+ *
+ * If a timeout is specified, the timeout_handler will be invoked in the event
+ * that the timeout expires before the process is awakened. (Note that any
+ * waking of the process will restart the timeout, even if the condition is
+ * not satisfied and the process immediately returns to sleep. This might be
+ * considered a bug.) If the timeout_handler returns non-zero, l_wait_event
+ * will return -ETIMEDOUT and the caller will continue. If the handler returns
+ * zero instead, the process will go back to sleep until it is awakened by the
+ * waitq or some similar mechanism, or an interrupt occurs (if the caller has
+ * asked for interrupts to be detected). The timeout will only fire once, so
+ * callers should take care that a timeout_handler which returns zero will take
+ * future steps to awaken the process. N.B. that these steps must include making
+ * the provided condition become true.
+ *
+ * If the interrupt flag (lwi_signals) is non-zero, then the process will be
+ * interruptible, and will be awakened by any "killable" signal (SIGTERM,
+ * SIGKILL or SIGINT). If a timeout is also specified, then the process will
+ * only become interruptible _after_ the timeout has expired, though it can be
+ * awakened by a signal that was delivered before the timeout and is still
+ * pending when the timeout expires. If a timeout is not specified, the process
+ * will be interruptible at all times during l_wait_event.
+ */
+
struct l_wait_info {
long lwi_timeout;
int (*lwi_on_timeout)(void *);
lwi_cb_data: data \
})
-#define LWI_INTR(signals, cb, data) \
+#define LWI_INTR(cb, data) \
((struct l_wait_info) { \
- lwi_signals: signals, \
+ lwi_signals: 1, \
lwi_on_signal: cb, \
lwi_cb_data: data \
})
-#define LWI_TIMEOUT_INTR(time, time_cb, signals, sig_cb, data) \
+#define LWI_TIMEOUT_INTR(time, time_cb, sig_cb, data) \
((struct l_wait_info) { \
lwi_timeout: time, \
lwi_on_timeout: time_cb, \
- lwi_signals: signals, \
+ lwi_signals: 1, \
lwi_on_signal: sig_cb, \
lwi_cb_data: data \
})
do { \
wait_queue_t __wait; \
long __state; \
+ int __timed_out = 0; \
init_waitqueue_entry(&__wait, current); \
\
add_wait_queue(&wq, &__wait); \
- __state = TASK_UNINTERRUPTIBLE; \
+ if (info->lwi_signals && !info->lwi_timeout) \
+ __state = TASK_INTERRUPTIBLE; \
+ else \
+ __state = TASK_UNINTERRUPTIBLE; \
for (;;) { \
set_current_state(__state); \
if (condition) \
break; \
- /* We only become INTERRUPTIBLE if a timeout has fired, and \
- * the caller has given us some signals to care about. \
- * \
- * XXXshaver we should check against info->wli_signals here, \
- * XXXshaver instead of just using l_killable_pending, perhaps. \
- */ \
- if (__state == TASK_INTERRUPTIBLE && \
- l_killable_pending(current)) { \
- CERROR("lwe: interrupt for %d\n", current->pid); \
- if (info->lwi_on_signal) \
- info->lwi_on_signal(info->lwi_cb_data); \
- ret = -EINTR; \
- break; \
+ if (__state == TASK_INTERRUPTIBLE && l_killable_pending(current)) { \
+ CERROR("lwe: interrupt\n"); \
+ if (info->lwi_on_signal) \
+ info->lwi_on_signal(info->lwi_cb_data); \
+ ret = -EINTR; \
+ break; \
} \
- if (info->lwi_timeout) { \
+ if (info->lwi_timeout && !__timed_out) { \
if (schedule_timeout(info->lwi_timeout) == 0) { \
- CERROR("lwe: timeout for %d\n", current->pid); \
+ CERROR("lwe: timeout\n"); \
+ __timed_out = 1; \
if (!info->lwi_on_timeout || \
info->lwi_on_timeout(info->lwi_cb_data)) { \
ret = -ETIMEDOUT; \
break; \
} \
- /* We'll take signals only after a timeout. */ \
+ /* We'll take signals after a timeout. */ \
if (info->lwi_signals) { \
__state = TASK_INTERRUPTIBLE; \
/* Check for a pending interrupt. */ \
- if (info->lwi_signals && \
- l_killable_pending(current)) { \
- CERROR("lwe: pending interrupt for %d\n", \
- current->pid); \
- if (info->lwi_on_signal) \
- info->lwi_on_signal(info->lwi_cb_data); \
- ret = -EINTR; \
- break; \
+ if (info->lwi_signals && l_killable_pending(current)) { \
+ CERROR("lwe: pending interrupt\n"); \
+ if (info->lwi_on_signal) \
+ info->lwi_on_signal(info->lwi_cb_data); \
+ ret = -EINTR; \
+ break; \
} \
} \
} \
/* file data for open files on MDS */
struct mds_file_data {
- struct list_head mfd_list;
- struct file * mfd_file;
+ struct list_head mfd_list;
+ struct file *mfd_file;
__u64 mfd_clientfd;
__u32 mfd_clientcookie;
};
/* global variables */
extern unsigned long obd_memory;
extern unsigned long obd_fail_loc;
+extern unsigned long obd_timeout;
+extern char obd_recovery_upcall[128];
#define OBD_FAIL_MDS 0x100
#define OBD_FAIL_MDS_HANDLE_UNPACK 0x101
#define OBD_FAIL_OST_BRW_WRITE_BULK 0x20e
#define OBD_FAIL_OST_BRW_READ_BULK 0x20f
-#define OBB_FAIL_LDLM 0x300
+#define OBD_FAIL_LDLM 0x300
#define OBD_FAIL_LDLM_NAMESPACE_NEW 0x301
#define OBD_FAIL_LDLM_ENQUEUE 0x302
#define OBD_FAIL_LDLM_CONVERT 0x303
#define OBD_FAIL_LDLM_BL_CALLBACK 0x305
#define OBD_FAIL_LDLM_CP_CALLBACK 0x306
+#define OBD_FAIL_OSC 0x400
+#define OBD_FAIL_OSC_BRW_READ_BULK 0x401
+#define OBD_FAIL_OSC_BRW_WRITE_BULK 0x402
+
/* preparation for a more advanced failure testbed (not functional yet) */
#define OBD_FAIL_MASK_SYS 0x0000FF00
#define OBD_FAIL_MASK_LOC (0x000000FF | OBD_FAIL_MASK_SYS)
/* XXX get recovery hooked in here again */
//ptlrpc_init_client(ptlrpc_connmgr, ll_recover,...
- ptlrpc_init_client(NULL, NULL, rq_portal, rp_portal, mdc->cl_client);
- ptlrpc_init_client(NULL, NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
- mdc->cl_ldlm_client);
+ ptlrpc_init_client(ptlrpc_connmgr, NULL, rq_portal, rp_portal,
+ mdc->cl_client);
+ /* XXXshaver Should the LDLM have its own recover function? Probably. */
+ ptlrpc_init_client(ptlrpc_connmgr, NULL, LDLM_REQUEST_PORTAL,
+ LDLM_REPLY_PORTAL, mdc->cl_ldlm_client);
mdc->cl_client->cli_name = "mdc";
mdc->cl_ldlm_client->cli_name = "ldlm";
mdc->cl_max_mdsize = sizeof(struct lov_stripe_md);
ENTRY;
down(&cli->cl_sem);
MOD_INC_USE_COUNT;
-#warning shaver: we might need a real cluuid here
- rc = class_connect(conn, obd, NULL);
+ rc = class_connect(conn, obd, cluuid);
if (rc) {
MOD_DEC_USE_COUNT;
GOTO(out_sem, rc);
ENTRY;
desc->b_connection->c_level = LUSTRE_CONN_RECOVD;
desc->b_flags |= PTL_RPC_FL_TIMEOUT;
- if (desc->b_client && desc->b_client->cli_recovd) {
+ if (desc->b_client && desc->b_client->cli_recovd &&
+ class_signal_client_failure) {
/* XXXshaver Do we need a resend strategy, or do we just
* XXXshaver return -ERESTARTSYS and punt it?
*/
CERROR("signalling failure of client %p\n", desc->b_client);
class_signal_client_failure(desc->b_client);
- }
- /* We go back to sleep, until we're resumed or interrupted. */
- RETURN(0);
+ /* We go back to sleep, until we're resumed or interrupted. */
+ RETURN(0);
+ }
+
+ /* If we can't be recovered, just abort the syscall with -ETIMEDOUT. */
+ RETURN(1);
}
static int sync_io_intr(void *data)
ENTRY;
if (phase == CB_PHASE_START) {
-#warning shaver hardcoded timeout (/proc/sys/lustre/timeout)
struct l_wait_info lwi;
- lwi = LWI_TIMEOUT_INTR(100 * HZ, sync_io_timeout,
- SIGTERM | SIGKILL | SIGINT, sync_io_intr,
- data);
+ lwi = LWI_TIMEOUT_INTR(obd_timeout * HZ, sync_io_timeout,
+ sync_io_intr, data);
ret = l_wait_event(data->waitq, data->complete, &lwi);
if (atomic_dec_and_test(&data->refcount))
OBD_FREE(data, sizeof(*data));
return err;
}
-
int ll_recover(struct ptlrpc_client *cli)
{
struct ptlrpc_request *req;
return &req->rq_export->exp_obd->u.mds;
}
+/*
+ * Timeout handler for MDS bulk I/O, installed via LWI_TIMEOUT and invoked
+ * by l_wait_event when a bulk transfer does not complete within
+ * obd_timeout seconds.
+ *
+ * data is the struct ptlrpc_bulk_desc passed as lwi_cb_data.
+ *
+ * Recovery is not implemented yet (hence "(not yet)" in the message);
+ * returning non-zero tells l_wait_event to give up, so the caller sees
+ * -ETIMEDOUT rather than sleeping forever.
+ */
+static int mds_bulk_timeout(void *data)
+{
+ struct ptlrpc_bulk_desc *desc = data;
+
+ ENTRY;
+ CERROR("(not yet) starting recovery of client %p\n", desc->b_client);
+ RETURN(1);
+}
/* Assumes caller has already pushed into the kernel filesystem context */
static int mds_sendpage(struct ptlrpc_request *req, struct file *file,
struct mds_obd *mds = mds_req2mds(req);
struct ptlrpc_bulk_desc *desc;
struct ptlrpc_bulk_page *bulk;
+ struct l_wait_info lwi;
char *buf;
ENTRY;
GOTO(cleanup_buf, rc);
}
- wait_event(desc->b_waitq, ptlrpc_check_bulk_sent(desc));
- if (desc->b_flags & PTL_RPC_FL_INTR)
- GOTO(cleanup_buf, rc = -EINTR);
+ lwi = LWI_TIMEOUT(obd_timeout * HZ, mds_bulk_timeout, desc);
+ rc = l_wait_event(desc->b_waitq, desc->b_flags & PTL_BULK_FL_SENT, &lwi);
+ if (rc) {
+ if (rc != -ETIMEDOUT)
+ LBUG();
+ GOTO(cleanup_buf, rc);
+ }
EXIT;
cleanup_buf:
RETURN(0);
}
}
-
-#warning shaver: we might need a real cluuid here
- rc = class_connect(conn, obd, NULL);
+ rc = class_connect(conn, obd, cluuid);
if (rc)
GOTO(out_dec, rc);
exp = class_conn2export(conn);
struct obd_device obd_dev[MAX_OBD_DEVICES];
struct list_head obd_types;
unsigned long obd_memory = 0;
+
+/* The following are visible and mutable through /proc/sys/lustre/. */
unsigned long obd_fail_loc = 0;
+unsigned long obd_timeout = 100;
+char obd_recovery_upcall[128] = "/usr/lib/lustre/ha_assist";
extern struct obd_type *class_nm_to_type(char *nm);
EXPORT_SYMBOL(obdo_cachep);
EXPORT_SYMBOL(obd_memory);
EXPORT_SYMBOL(obd_fail_loc);
+EXPORT_SYMBOL(obd_timeout);
+EXPORT_SYMBOL(obd_recovery_upcall);
EXPORT_SYMBOL(class_register_type);
EXPORT_SYMBOL(class_unregister_type);
#define OBD_FAIL_LOC 1 /* control test failures instrumentation */
#define OBD_ENTRY 2 /* control enter/leave pattern */
-#define OBD_TIMEOUT 3 /* timeout on upcalls to become intrble */
-#define OBD_HARD 4 /* mount type "hard" or "soft" */
-#define OBD_VARS 5
-#define OBD_INDEX 6
-#define OBD_RESET 7
+#define OBD_VARS 3
+#define OBD_INDEX 4
+#define OBD_RESET 5
+#define OBD_TIMEOUT 6 /* RPC timeout before recovery/intr */
+/* XXX move to /proc/sys/lustre/recovery? */
+#define OBD_UPCALL 7 /* path to recovery upcall */
#define OBD_VARS_SLOT 2
{OBD_VARS, "vars", &vars[0], sizeof(int), 0644, NULL, &proc_dointvec},
{OBD_INDEX, "index", &index, sizeof(int), 0644, NULL, &obd_sctl_vars},
{OBD_RESET, "reset", NULL, 0, 0644, NULL, &obd_sctl_reset},
+ {OBD_TIMEOUT, "timeout", &obd_timeout, sizeof(int), 0644, NULL, &proc_dointvec},
+ /* XXX need to lock so we avoid update races with the recovery upcall! */
+ {OBD_UPCALL, "recovery_upcall", obd_recovery_upcall, 128, 0644, NULL,
+ &proc_dostring, &sysctl_string },
{ 0 }
};
#include <linux/obd_lov.h>
#include <linux/init.h>
#include <linux/lustre_ha.h>
+#include <linux/obd_support.h> /* for OBD_FAIL_CHECK */
static int osc_getattr(struct lustre_handle *conn, struct obdo *oa,
struct lov_stripe_md *md)
*
* On error, we never do the brw_finish, so we handle all decrefs.
*/
- rc = ptlrpc_register_bulk(desc);
- if (rc)
- GOTO(out_unmap, rc);
+ if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_READ_BULK)) {
+ CERROR("obd_fail_loc=%x, skipping register_bulk\n",
+ OBD_FAIL_OSC_BRW_READ_BULK);
+ } else {
+ rc = ptlrpc_register_bulk(desc);
+ if (rc)
+ GOTO(out_unmap, rc);
+ }
request->rq_replen = lustre_msg_size(1, size);
rc = ptlrpc_queue_wait(request);
rc = ptlrpc_check_status(request, rc);
- /* XXX: Mike, this is the only place I'm not sure of. If we have
- * an error here, will we have always called brw_finish? If no,
- * then out_req will not clean up and we should go to out_desc.
- * If maybe, then we are screwed, and we need to set things up
- * so that bulk_sink_callback is called for each bulk page,
- * even on error so brw_finish is always called. It would need
- * to be passed an error code as a parameter to know what to do.
- *
- * That would also help with the partial completion case, so
- * we could say in brw_finish "these pages are done, don't
- * restart them" and osc_brw callers can know this.
+ /*
+ * XXX: If there is an error during the processing of the callback,
+ * such as a timeout in a sleep that it performs, brw_finish
+ * will never get called, and we'll leak the desc, fail to kunmap
+ * things, cats will live with dogs. One solution would be to
+ * export brw_finish as osc_brw_finish, so that the timeout case and
+ * its kin could call it for proper cleanup. An alternative would
+ * be for an error return from the callback to cause us to clean up,
+ * but that doesn't help the truly async cases (like LOV), which
+ * will immediately return from their PHASE_START callback, before
+ * any such cleanup-requiring error condition can be detected.
*/
if (rc)
GOTO(out_req, rc);
if (desc->b_page_count != page_count)
LBUG();
+ if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_WRITE_BULK))
+ GOTO(out_unmap, rc = 0);
+
/* Our reference is released when brw_finish is complete. */
rc = ptlrpc_send_bulk(desc);
RETURN(0);
}
+/*
+ * Timeout handler for OST bulk I/O (used by both ost_brw_read and the
+ * write-commit path), installed via LWI_TIMEOUT and invoked by
+ * l_wait_event when the bulk transfer stalls past obd_timeout seconds.
+ *
+ * data is the struct ptlrpc_bulk_desc passed as lwi_cb_data.
+ *
+ * A recovery stub only: returning non-zero makes l_wait_event abort the
+ * wait with -ETIMEDOUT instead of blocking indefinitely.
+ */
+static int ost_bulk_timeout(void *data)
+{
+ struct ptlrpc_bulk_desc *desc = data;
+
+ ENTRY;
+ CERROR("(not yet) starting recovery of client %p\n", desc->b_client);
+ RETURN(1);
+}
+
static int ost_brw_read(struct ptlrpc_request *req)
{
struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
struct niobuf_local *local_nb = NULL;
struct obd_ioobj *ioo;
struct ost_body *body;
+ struct l_wait_info lwi;
int rc, cmd, i, j, objcount, niocount, size = sizeof(*body);
ENTRY;
niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
cmd = OBD_BRW_READ;
+ if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK))
+ GOTO(out, rc = 0);
+
for (i = 0; i < objcount; i++) {
ost_unpack_ioo(&tmp1, &ioo);
if (tmp2 + ioo->ioo_bufcnt > end2) {
ost_unpack_niobuf(&tmp2, &remote_nb);
}
- rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
- if (rc)
- RETURN(rc);
OBD_ALLOC(local_nb, sizeof(*local_nb) * niocount);
if (local_nb == NULL)
- RETURN(-ENOMEM);
+ GOTO(out, rc = -ENOMEM);
/* The unpackers move tmp1 and tmp2, so reset them before using */
tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
tmp1, niocount, tmp2, local_nb, NULL);
if (req->rq_status)
- GOTO(out_local, 0);
+ GOTO(out, 0);
desc = ptlrpc_prep_bulk(req->rq_connection);
if (desc == NULL)
if (rc)
GOTO(out_bulk, rc);
-#warning OST must time out here.
- wait_event(desc->b_waitq, ptlrpc_check_bulk_sent(desc));
- if (desc->b_flags & PTL_RPC_FL_INTR)
- rc = -EINTR;
+ lwi = LWI_TIMEOUT(obd_timeout * HZ, ost_bulk_timeout, desc);
+ rc = l_wait_event(desc->b_waitq, desc->b_flags & PTL_BULK_FL_SENT, &lwi);
+ if (rc) {
+ LASSERT(rc == -ETIMEDOUT);
+ GOTO(out_bulk, rc);
+ }
/* The unpackers move tmp1 and tmp2, so reset them before using */
tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
req->rq_status = obd_commitrw(cmd, conn, objcount,
tmp1, niocount, local_nb, NULL);
+ rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
+
out_bulk:
ptlrpc_free_bulk(desc);
out_local:
void *desc_priv = NULL;
int reply_sent = 0;
struct ptlrpc_service *srv;
+ struct l_wait_info lwi;
__u32 xid;
ENTRY;
reply_sent = 1;
ptlrpc_reply(req->rq_svc, req);
-#warning OST must time out here.
- wait_event(desc->b_waitq, desc->b_flags & PTL_BULK_FL_RCVD);
+ lwi = LWI_TIMEOUT(obd_timeout * HZ, ost_bulk_timeout, desc);
+ rc = l_wait_event(desc->b_waitq, desc->b_flags & PTL_BULK_FL_RCVD, &lwi);
+ if (rc) {
+ if (rc != -ETIMEDOUT)
+ LBUG();
+ GOTO(fail_bulk, rc);
+ }
rc = obd_commitrw(cmd, conn, objcount, tmp1, niocount, local_nb,
desc->b_desc_private);
int rep_portal, struct ptlrpc_client *cl)
{
memset(cl, 0, sizeof(*cl));
- cl->cli_recovd = recovd;
cl->cli_recover = recover;
if (recovd)
recovd_cli_manage(recovd, cl);
list_add_tail(&req->rq_list, &cli->cli_delayed_head);
spin_unlock(&cli->cli_lock);
-#warning shaver: what happens when we get interrupted during this wait?
- lwi = LWI_INTR(SIGTERM | SIGKILL | SIGINT, NULL, NULL);
- l_wait_event(req->rq_wait_for_rep,
- req->rq_level <= req->rq_connection->c_level,
- &lwi);
+ lwi = LWI_INTR(NULL, NULL);
+ rc = l_wait_event(req->rq_wait_for_rep,
+ req->rq_level <= req->rq_connection->c_level,
+ &lwi);
spin_lock(&cli->cli_lock);
list_del_init(&req->rq_list);
spin_unlock(&cli->cli_lock);
+
+ if (rc)
+ RETURN(rc);
CERROR("process %d resumed\n", current->pid);
}
resend:
req->rq_time = CURRENT_TIME;
- req->rq_timeout = 100;
+ req->rq_timeout = obd_timeout;
rc = ptl_send_rpc(req);
if (rc) {
CERROR("error %d, opcode %d\n", rc, req->rq_reqmsg->opc);
CDEBUG(D_OTHER, "-- sleeping\n");
lwi = LWI_TIMEOUT_INTR(req->rq_timeout * HZ, expired_request,
- SIGKILL | SIGTERM | SIGINT, interrupted_request,
- req);
+ interrupted_request,req);
l_wait_event(req->rq_wait_for_rep, ptlrpc_check_reply(req), &lwi);
CDEBUG(D_OTHER, "-- done\n");
{
int rc = 0;
struct ptlrpc_client *cli = req->rq_client;
- struct l_wait_info lwi = LWI_INTR(SIGKILL|SIGTERM|SIGINT, NULL, NULL);
+ struct l_wait_info lwi;
ENTRY;
init_waitqueue_head(&req->rq_wait_for_rep);
req->rq_connection->c_level);
req->rq_time = CURRENT_TIME;
- req->rq_timeout = 100;
+ req->rq_timeout = obd_timeout;
rc = ptl_send_rpc(req);
if (rc) {
CERROR("error %d, opcode %d\n", rc, req->rq_reqmsg->opc);
}
CDEBUG(D_OTHER, "-- sleeping\n");
+ lwi = LWI_INTR(NULL, NULL);
l_wait_event(req->rq_wait_for_rep, ptlrpc_check_reply(req), &lwi);
CDEBUG(D_OTHER, "-- done\n");
bulk_source_eq, bulk_sink_eq;
static ptl_process_id_t local_id = {PTL_NID_ANY, PTL_PID_ANY};
-int ptlrpc_check_bulk_sent(struct ptlrpc_bulk_desc *desc)
-{
- ENTRY;
-
- if (desc->b_flags & PTL_BULK_FL_SENT)
- RETURN(1);
-
- if (l_killable_pending(current)) {
- desc->b_flags |= PTL_RPC_FL_INTR;
- RETURN(1);
- }
-
- CDEBUG(D_NET, "no event yet\n");
- RETURN(0);
-}
-
-int ptlrpc_check_bulk_received(struct ptlrpc_bulk_desc *desc)
-{
- ENTRY;
-
- if (desc->b_flags & PTL_BULK_FL_RCVD)
- RETURN(1);
-
- if (l_killable_pending(current)) {
- desc->b_flags |= PTL_RPC_FL_INTR;
- RETURN(1);
- }
-
- CDEBUG(D_NET, "no event yet\n");
- RETURN(0);
-}
-
static int ptl_send_buf(struct ptlrpc_request *request,
struct ptlrpc_connection *conn, int portal)
{
#include <linux/kmod.h>
#include <linux/lustre_lite.h>
#include <linux/lustre_ha.h>
+#include <linux/obd_support.h>
struct recovd_obd *ptlrpc_connmgr;
char *argv[2];
char *envp[3];
- argv[0] = "/usr/src/obd/utils/ha_assist.sh";
+ argv[0] = obd_recovery_upcall;
argv[1] = NULL;
envp [0] = "HOME=/";
ptlrpc_cleanup_connection();
}
-/* events.c */
-EXPORT_SYMBOL(ptlrpc_check_bulk_sent);
-EXPORT_SYMBOL(ptlrpc_check_bulk_received);
-
/* connmgr.c */
EXPORT_SYMBOL(ptlrpc_connmgr);
EXPORT_SYMBOL(connmgr_connect);