#include <linux/lustre_net.h>
-#define MGR_STOPPING 1
-#define MGR_RUNNING 2
-#define MGR_STOPPED 4
-#define MGR_KILLED 8
-#define MGR_EVENT 16
-#define MGR_WORKING 32
-#define MGR_SIGNAL 64
-
#define LUSTRE_HA_NAME "ptlrpc"
#define CONNMGR_CONNECT 1
int connmgr_connect(struct recovd_obd *mgr, struct ptlrpc_connection *conn);
int connmgr_handle(struct obd_device *dev, struct ptlrpc_service *svc,
struct ptlrpc_request *req);
-void connmgr_cli_fail(struct ptlrpc_client *cli);
-void connmgr_cli_manage(struct recovd_obd *mgr, struct ptlrpc_client *cli);
+void recovd_cli_fail(struct ptlrpc_client *cli);
+void recovd_cli_manage(struct recovd_obd *mgr, struct ptlrpc_client *cli);
+void recovd_cli_fixed(struct ptlrpc_client *cli);
int recovd_setup(struct recovd_obd *mgr);
int recovd_cleanup(struct recovd_obd *mgr);
__u32 opc;
__u32 xid;
+ __u64 last_rcvd;
+ __u64 last_committed;
+ __u64 transno;
__u32 status;
__u32 type;
__u32 connid;
__u32 nlink;
__u32 generation;
__u32 last_xid;
- __u64 last_committed;
- __u64 last_rcvd;
};
/* MDS update records */
#define OBD_IOC_NAME2DEV _IOWR('f', 29, long)
#define OBD_IOC_NEWDEV _IOWR('f', 30, long)
+#define OBD_RECOVD_NEWCONN _IOWR('f', 31, long)
+
#define OBD_IOC_DEC_FS_USE_COUNT _IO ('f', 32 )
#endif
extern kmem_cache_t *ll_file_data_slab;
struct ll_file_data {
__u64 fd_mdshandle;
+ struct ptlrpc_request *fd_req;
};
#define LL_INLINESZ 60
struct ptlrpc_client ll_ost_client;
struct ptlrpc_connection *ll_ost_conn;
- struct list_head ll_commitcbd_not_committed;
wait_queue_head_t ll_commitcbd_waitq;
wait_queue_head_t ll_commitcbd_ctl_waitq;
int ll_commitcbd_flags;
/* default rpc ring length */
#define RPC_RING_LENGTH 2
-#define SVC_STOPPING 1
-#define SVC_RUNNING 2
-#define SVC_STOPPED 4
-#define SVC_KILLED 8
-#define SVC_EVENT 16
-#define SVC_HA_EVENT 32
-#define SVC_SIGNAL 64
-
+#define SVC_KILLED 1
+#define SVC_EVENT 2
+#define SVC_SIGNAL 4
+#define SVC_RUNNING 8
+#define SVC_STOPPING 16
+#define SVC_STOPPED 32
+
+#define RECOVD_STOPPING 1 /* how cleanup tells recovd to quit */
+#define RECOVD_IDLE 2 /* normal state */
+#define RECOVD_STOPPED 4 /* after recovd has stopped */
+#define RECOVD_FAIL 8 /* RPC timeout: wakeup recovd, sets flag */
+#define RECOVD_TIMEOUT 16 /* set when recovd detects a timeout */
+#define RECOVD_UPCALL_WAIT 32 /* an upcall has been placed */
+#define RECOVD_UPCALL_ANSWER 64 /* an upcall has been answered */
#define LUSTRE_CONN_NEW 1
#define LUSTRE_CONN_CON 2
struct obd_device *cli_obd;
__u32 cli_request_portal;
__u32 cli_reply_portal;
+ __u64 cli_last_rcvd;
+ __u64 cli_last_committed;
struct semaphore cli_rpc_sem; /* limits outstanding requests */
spinlock_t cli_lock; /* protects lists */
struct list_head cli_sending_head;
struct list_head cli_sent_head;
+ struct list_head cli_replied_head;
+ struct list_head cli_replay_head;
struct list_head cli_ha_item;
+ void (*cli_recover)(struct ptlrpc_client *);
struct recovd_obd *cli_recovd;
};
#define PTL_RPC_TYPE_REPLY 3
/* state flags of requests */
-#define PTL_RPC_FL_INTR 1
-#define PTL_RPC_FL_REPLY 2
-#define PTL_RPC_FL_SENT 4
-#define PTL_BULK_FL_SENT 8
-#define PTL_BULK_FL_RCVD 16
-#define PTL_RPC_FL_ERR 32
-#define PTL_RPC_FL_TIMEOUT 64
+#define PTL_RPC_FL_INTR (1 << 0)
+#define PTL_RPC_FL_REPLY (1 << 1)
+#define PTL_RPC_FL_SENT (1 << 2)
+#define PTL_BULK_FL_SENT (1 << 3)
+#define PTL_BULK_FL_RCVD (1 << 4)
+#define PTL_RPC_FL_ERR (1 << 5)
+#define PTL_RPC_FL_TIMEOUT (1 << 6)
+#define PTL_RPC_FL_RESEND (1 << 7)
+#define PTL_RPC_FL_COMMITTED (1 << 8)
+#define PTL_RPC_FL_FINISHED (1 << 9)
+#define PTL_RPC_FL_RETAIN (1 << 10)
struct ptlrpc_request {
int rq_type; /* one of PTL_RPC_REQUEST, PTL_RPC_REPLY, PTL_RPC_BULK */
int rq_status;
int rq_flags;
__u32 rq_connid;
+ atomic_t rq_refcount;
int rq_reqlen;
struct lustre_msg *rq_reqmsg;
int rq_replen;
struct lustre_msg *rq_repmsg;
+ __u64 rq_transno;
char *rq_bulkbuf;
int rq_bulklen;
time_t rq_time;
+ time_t rq_timeout;
// void * rq_reply_handle;
wait_queue_head_t rq_wait_for_rep;
int ptlrpc_abort_bulk(struct ptlrpc_bulk_desc *bulk);
int ptlrpc_reply(struct ptlrpc_service *svc, struct ptlrpc_request *req);
int ptlrpc_error(struct ptlrpc_service *svc, struct ptlrpc_request *req);
+void ptlrpc_resend_req(struct ptlrpc_request *request);
int ptl_send_rpc(struct ptlrpc_request *request);
void ptlrpc_link_svc_me(struct ptlrpc_service *service, int i);
/* rpc/client.c */
-void ptlrpc_init_client(struct recovd_obd *, int req_portal, int rep_portal,
+void ptlrpc_init_client(struct recovd_obd *,
+ void (*recover)(struct ptlrpc_client *),
+ int req_portal, int rep_portal,
struct ptlrpc_client *);
+void ptlrpc_cleanup_client(struct ptlrpc_client *cli);
__u8 *ptlrpc_req_to_uuid(struct ptlrpc_request *req);
struct ptlrpc_connection *ptlrpc_uuid_to_connection(char *uuid);
int ptlrpc_queue_wait(struct ptlrpc_request *req);
int count, int *lengths, char **bufs);
void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *bulk);
void ptlrpc_free_req(struct ptlrpc_request *request);
+void ptlrpc_req_finished(struct ptlrpc_request *request);
struct ptlrpc_bulk_desc *ptlrpc_prep_bulk(struct ptlrpc_connection *);
int ptlrpc_check_status(struct ptlrpc_request *req, int err);
struct ptlrpc_service *recovd_service;
struct ptlrpc_client *recovd_client;
__u32 recovd_flags;
+ __u32 recovd_wakeup_flag;
spinlock_t recovd_lock;
- struct list_head recovd_connections_lh; /* connections managed by the mgr */
- struct list_head recovd_troubled_lh; /* connections in trouble */
+ struct list_head recovd_clients_lh; /* clients managed */
+ struct list_head recovd_troubled_lh; /* clients in trouble */
wait_queue_head_t recovd_recovery_waitq;
wait_queue_head_t recovd_ctl_waitq;
wait_queue_head_t recovd_waitq;
if (OBD_FAIL_CHECK(id)) { \
CERROR("obd_fail_loc=%x, fail operation rc=%d\n", id, ret); \
obd_fail_loc |= OBD_FAILED; \
+ if ((id) & OBD_FAIL_ONCE) \
+ obd_fail_loc |= OBD_FAIL_ONCE; \
RETURN(ret); \
} \
} while(0)
RETURN(-EINVAL);
}
- ptlrpc_init_client(NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
+ ptlrpc_init_client(NULL, NULL,
+ LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
obddev->u.ldlm.ldlm_client);
connection = ptlrpc_uuid_to_connection("ldlm");
if (!connection)
OBD_ALLOC(ldlm->ldlm_client, sizeof(*ldlm->ldlm_client));
if (ldlm->ldlm_client == NULL)
LBUG();
- ptlrpc_init_client(NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
+ ptlrpc_init_client(NULL, NULL,
+ LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
ldlm->ldlm_client);
MOD_INC_USE_COUNT;
LINX=page.c
-llite_SOURCES = commit_callback.c page.c super.c rw.c file.c dir.c sysctl.c namei.c symlink.c
+llite_SOURCES = recover.c commit_callback.c page.c super.c rw.c file.c dir.c sysctl.c namei.c symlink.c
dist-hook:
list='$(LINX)'; for f in $$list; do rm -f $(distdir)/$$f; done
GOTO(out, rc = 1);
}
- if (!list_empty(&sbi->ll_commitcbd_not_committed))
- GOTO(out, rc = 1);
-
out:
spin_unlock(&sbi->ll_commitcbd_lock);
RETURN(rc);
spin_lock(&sbi->ll_commitcbd_lock);
if (sbi->ll_commitcbd_flags & LL_COMMITCBD_STOPPING) {
spin_unlock(&sbi->ll_commitcbd_lock);
- CERROR("lustre_hamgr quitting\n");
+ CERROR("lustre_commitd quitting\n");
EXIT;
break;
}
+ if (!list_empty(&sbi->ll_mds_client.cli_replied_head))
+ CERROR("** clean up committed reqs here **\n");
schedule_timeout(sbi->ll_commitcbd_timeout);
CERROR("commit callback daemon woken up - FIXME\n");
RETURN(0);
}
+
+
int ll_commitcbd_setup(struct ll_sb_info *sbi)
{
int rc;
rc = mdc_open(&sbi->ll_mds_client, sbi->ll_mds_conn, inode->i_ino,
S_IFREG, file->f_flags, &fd->fd_mdshandle, &req);
- if (!fd->fd_mdshandle)
- CERROR("mdc_open didn't assign fd_mdshandle\n");
-
- ptlrpc_free_req(req);
+ fd->fd_req = req;
+ ptlrpc_req_finished(req);
if (rc) {
if (rc > 0)
rc = -rc;
GOTO(out, rc);
}
+ if (!fd->fd_mdshandle)
+ CERROR("mdc_open didn't assign fd_mdshandle\n");
+
oa = ll_oa_from_inode(inode, (OBD_MD_FLMODE | OBD_MD_FLID));
if (oa == NULL)
rc = mdc_close(&sbi->ll_mds_client, sbi->ll_mds_conn, inode->i_ino,
S_IFREG, fd->fd_mdshandle, &req);
- ptlrpc_free_req(req);
+ ptlrpc_req_finished(req);
if (rc) {
if (rc > 0)
rc = -rc;
GOTO(out, rc);
}
+ ptlrpc_free_req(fd->fd_req);
+
EXIT;
out:
extern struct address_space_operations ll_aops;
extern struct address_space_operations ll_dir_aops;
struct super_operations ll_super_operations;
+
+extern void ll_recover(struct ptlrpc_client *);
extern int ll_commitcbd_setup(struct ll_sb_info *);
extern int ll_commitcbd_cleanup(struct ll_sb_info *);
GOTO(out_free, sb = NULL);
}
- ptlrpc_init_client(ptlrpc_connmgr, MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL,
+ ptlrpc_init_client(ptlrpc_connmgr, ll_recover,
+ MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL,
&sbi->ll_mds_client);
sbi->ll_mds_conn = ptlrpc_uuid_to_connection("mds");
}
/* initialize committed transaction callback daemon */
- INIT_LIST_HEAD(&sbi->ll_commitcbd_not_committed);
spin_lock_init(&sbi->ll_commitcbd_lock);
init_waitqueue_head(&sbi->ll_commitcbd_waitq);
init_waitqueue_head(&sbi->ll_commitcbd_ctl_waitq);
ll_commitcbd_cleanup(sbi);
obd_disconnect(&sbi->ll_conn);
ptlrpc_put_connection(sbi->ll_mds_conn);
+ ptlrpc_cleanup_client(&sbi->ll_mds_client);
OBD_FREE(sb->u.generic_sbp, sizeof(*sbi));
MOD_DEC_USE_COUNT;
EXIT;
if (err)
CERROR("mdc_setattr fails (%d)\n", err);
- ptlrpc_free_req(request);
+ ptlrpc_req_finished(request);
RETURN(err);
}
mds_unpack_rep_body(req);
body = lustre_msg_buf(req->rq_repmsg, 0);
memcpy(rootfid, &body->fid1, sizeof(*rootfid));
- *last_committed = body->last_committed;
- *last_rcvd = body->last_rcvd;
+ *last_committed = req->rq_repmsg->last_committed;
+ *last_rcvd = req->rq_repmsg->last_rcvd;
*last_xid = body->last_xid;
- CDEBUG(D_NET, "root ino=%ld, last_committed=%ld, last_rcvd=%ld,"
+ CDEBUG(D_NET, "root ino=%ld, last_committed=%Lu, last_rcvd=%Lu,"
" last_xid=%d\n",
(unsigned long)rootfid->id,
- (unsigned long)body->last_committed,
- (unsigned long)body->last_rcvd,
+ (unsigned long long)*last_committed,
+ (unsigned long long)*last_rcvd,
body->last_xid);
}
if (!req)
GOTO(out, rc = -ENOMEM);
+ req->rq_flags |= PTL_RPC_FL_RETAIN;
body = lustre_msg_buf(req->rq_reqmsg, 0);
ll_ino2fid(&body->fid1, ino, 0, type);
body->flags = HTON__u32(flags);
ll_ino2fid(&body->fid1, ino, 0, type);
body->objid = fh;
- req->rq_replen = lustre_msg_size(1, &size);
+ req->rq_replen = lustre_msg_size(0, NULL);
rc = ptlrpc_queue_wait(req);
rc = ptlrpc_check_status(req, rc);
RETURN(-EINVAL);
}
- ptlrpc_init_client(NULL, MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL, &cl);
+ ptlrpc_init_client(NULL, NULL,
+ MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL, &cl);
conn = ptlrpc_uuid_to_connection("mds");
if (!conn) {
CERROR("cannot create client\n");
out:
ptlrpc_free_req(request);
ptlrpc_put_connection(conn);
+ ptlrpc_cleanup_client(&cl);
RETURN(err);
}
CDEBUG(D_INFO, "found existing data for UUID '%s' at #%d\n",
mcd->mcd_uuid, mci->mci_off);
}
- /* Still not 100% sure whether we should reply with the server
- * last_rcvd or that of this client. I'm not sure it even makes
- * a difference on a per-client basis, because last_rcvd is global
- * and we are not supposed to allow transactions while in recovery.
- */
- body->last_xid = le32_to_cpu(mcd->mcd_last_xid);
- body->last_rcvd = le64_to_cpu(mcd->mcd_last_rcvd);
- //body->last_rcvd = mds->mds_last_rcvd;
- body->last_committed = mds->mds_last_committed;
- CDEBUG(D_INFO, "last_rcvd %ld, last_committed %ld, last_xid %d\n",
- (unsigned long)body->last_rcvd,
- (unsigned long)body->last_committed, body->last_xid);
+ body->last_xid = HTON__u32(mcd->mcd_last_xid);
mds_pack_rep_body(req);
RETURN(0);
}
body->mode = inode->i_mode;
body->nlink = inode->i_nlink;
body->valid = ~0;
- body->last_committed = mds->mds_last_committed;
mds_fs_get_objid(mds, inode, &body->objid);
l_dput(de);
RETURN(0);
body = lustre_msg_buf(req->rq_repmsg, 0);
body->objid = (__u64) (unsigned long)file;
- body->last_committed = mds->mds_last_committed;
RETURN(0);
}
file = (struct file *)(unsigned long)body->objid;
req->rq_status = filp_close(file, 0);
+
l_dput(de);
mntput(mnt);
int mds_handle(struct obd_device *dev, struct ptlrpc_service *svc,
struct ptlrpc_request *req)
{
+ struct mds_obd *mds = &req->rq_obd->u.mds;
int rc;
ENTRY;
EXIT;
out:
+ /* Still not 100% sure whether we should reply with the server
+ * last_rcvd or that of this client. I'm not sure it even makes
+ * a difference on a per-client basis, because last_rcvd is global
+ * and we are not supposed to allow transactions while in recovery.
+ */
+ req->rq_repmsg->last_rcvd = HTON__u64(mds->mds_last_rcvd);
+ req->rq_repmsg->last_committed = HTON__u64(mds->mds_last_committed);
+ CDEBUG(D_INFO, "last_rcvd %Lu, last_committed %Lu\n",
+ (unsigned long long)mds->mds_last_rcvd,
+ (unsigned long long)mds->mds_last_committed);
if (rc) {
ptlrpc_error(svc, req);
} else {
*/
last_rcvd = le64_to_cpu(msd->msd_last_rcvd);
mds->mds_last_rcvd = last_rcvd;
- CDEBUG(D_INODE, "got %Ld for server last_rcvd value\n",
+ CDEBUG(D_INODE, "got %Lu for server last_rcvd value\n",
(unsigned long long)last_rcvd);
last_mount = le64_to_cpu(msd->msd_mount_count);
mds->mds_mount_count = last_mount;
- CDEBUG(D_INODE, "got %Ld for server last_mount value\n",
+ CDEBUG(D_INODE, "got %Lu for server last_mount value\n",
(unsigned long long)last_mount);
for (off = MDS_LR_CLIENT, cl_off = 0, rc = sizeof(*mcd);
if (last_rcvd > mds->mds_last_rcvd) {
CDEBUG(D_OTHER,
- "client at offset %d has last_rcvd = %Ld\n",
+ "client at offset %d has last_rcvd = %Lu\n",
cl_off, (unsigned long long)last_rcvd);
mds->mds_last_rcvd = last_rcvd;
}
}
- CDEBUG(D_INODE, "got %Ld for highest last_rcvd value, %d clients\n",
+ CDEBUG(D_INODE, "got %Lu for highest last_rcvd value, %d clients\n",
(unsigned long long)mds->mds_last_rcvd, mds->mds_client_count);
/* After recovery, there can be no local uncommitted transactions */
msd->msd_last_rcvd = cpu_to_le64(mds->mds_last_rcvd);
msd->msd_mount_count = cpu_to_le64(mds->mds_mount_count);
- CDEBUG(D_SUPER, "MDS mount_count is %Ld, last_rcvd is %Ld\n",
+ CDEBUG(D_SUPER, "MDS mount_count is %Lu, last_rcvd is %Lu\n",
(unsigned long long)mds->mds_mount_count,
(unsigned long long)mds->mds_last_rcvd);
push_ctxt(&saved, &mds->mds_ctxt);
off = MDS_LR_CLIENT + mci->mci_off * MDS_LR_SIZE;
++mds->mds_last_rcvd; /* lock this, or make it an LDLM function? */
+ req->rq_repmsg->transno = HTON__u64(mds->mds_last_rcvd);
mci->mci_mcd->mcd_last_rcvd = cpu_to_le64(mds->mds_last_rcvd);
mci->mci_mcd->mcd_mount_count = cpu_to_le64(mds->mds_mount_count);
mci->mci_mcd->mcd_last_xid = cpu_to_le32(req->rq_reqmsg->xid);
body = lustre_msg_buf(req->rq_repmsg, 0);
body->ino = inode->i_ino;
body->generation = inode->i_generation;
- body->last_rcvd = mds->mds_last_rcvd;
- body->last_committed = mds->mds_last_committed;
}
out_create_commit:
}
rc = reinters[rec->ur_opcode](rec, req);
+
return rc;
}
if (bulk[pages] == NULL)
continue;
kunmap(buf[pages]);
- OBD_FREE(bulk[pages], sizeof(**bulk));
+ ptlrpc_free_bulk(bulk[pages]);
}
}
if (osc->osc_ldlm_client == NULL)
GOTO(out_client, rc = -ENOMEM);
- ptlrpc_init_client(NULL, OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
+ ptlrpc_init_client(NULL, NULL, OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
osc->osc_client);
- ptlrpc_init_client(NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
+ ptlrpc_init_client(NULL, NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
osc->osc_ldlm_client);
MOD_INC_USE_COUNT;
{
struct osc_obd *osc = &obddev->u.osc;
+ ptlrpc_cleanup_client(osc->osc_client);
OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
+ ptlrpc_cleanup_client(osc->osc_ldlm_client);
OBD_FREE(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
ptlrpc_put_connection(osc->osc_conn);
modulefs_DATA = ptlrpc.o
EXTRA_PROGRAMS = ptlrpc
-ptlrpc_SOURCES = recovd.c connection.c rpc.c events.c service.c client.c niobuf.c pack_generic.c
+ptlrpc_SOURCES = connmgr.c recovd.c connection.c rpc.c events.c service.c client.c niobuf.c pack_generic.c
include $(top_srcdir)/Rules
#include <linux/lustre_ha.h>
-void ptlrpc_init_client(struct recovd_obd *recovd, int req_portal,
+void ptlrpc_init_client(struct recovd_obd *recovd,
+ void (*recover)(struct ptlrpc_client *recover),
+ int req_portal,
int rep_portal, struct ptlrpc_client *cl)
{
memset(cl, 0, sizeof(*cl));
cl->cli_recovd = recovd;
+ cl->cli_recover = recover;
if (recovd)
- connmgr_cli_manage(recovd, cl);
+ recovd_cli_manage(recovd, cl);
cl->cli_obd = NULL;
cl->cli_request_portal = req_portal;
cl->cli_reply_portal = rep_portal;
INIT_LIST_HEAD(&cl->cli_sending_head);
INIT_LIST_HEAD(&cl->cli_sent_head);
+ INIT_LIST_HEAD(&cl->cli_replied_head);
+ INIT_LIST_HEAD(&cl->cli_replay_head);
spin_lock_init(&cl->cli_lock);
sema_init(&cl->cli_rpc_sem, 32);
}
void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *bulk)
{
- if (bulk == NULL)
+ ENTRY;
+ if (bulk == NULL) {
+ EXIT;
return;
+ }
ptlrpc_put_connection(bulk->b_connection);
OBD_FREE(bulk, sizeof(*bulk));
+ EXIT;
}
struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl,
RETURN(NULL);
}
- request->rq_time = CURRENT_TIME;
request->rq_type = PTL_RPC_TYPE_REQUEST;
request->rq_connection = ptlrpc_connection_addref(conn);
request->rq_reqmsg->type = HTON__u32(PTL_RPC_MSG_REQUEST);
INIT_LIST_HEAD(&request->rq_list);
+ /* this will be dec()d once in req_finished, once in free_committed */
+ atomic_set(&request->rq_refcount, 2);
+
spin_lock(&conn->c_lock);
request->rq_reqmsg->xid = HTON__u32(++conn->c_xid_out);
spin_unlock(&conn->c_lock);
RETURN(request);
}
+void ptlrpc_req_finished(struct ptlrpc_request *request)
+{
+ if (request == NULL)
+ return;
+
+ if (request->rq_repmsg != NULL) {
+ OBD_FREE(request->rq_repmsg, request->rq_replen);
+ request->rq_repmsg = NULL;
+ }
+
+ if (atomic_dec_and_test(&request->rq_refcount))
+ ptlrpc_free_req(request);
+}
+
void ptlrpc_free_req(struct ptlrpc_request *request)
{
if (request == NULL)
if (request->rq_repmsg != NULL)
OBD_FREE(request->rq_repmsg, request->rq_replen);
+ if (request->rq_reqmsg != NULL)
+ OBD_FREE(request->rq_reqmsg, request->rq_reqlen);
if (request->rq_client) {
spin_lock(&request->rq_client->cli_lock);
{
int rc = 0;
- schedule_timeout(3 * HZ); /* 3 second timeout */
if (req->rq_repmsg != NULL) {
+ req->rq_transno = NTOH__u64(req->rq_repmsg->transno);
req->rq_flags |= PTL_RPC_FL_REPLY;
GOTO(out, rc = 1);
}
- if (CURRENT_TIME - req->rq_time >= 3) {
+ if (req->rq_flags & PTL_RPC_FL_RESEND) {
+ CERROR("-- RESEND --\n");
+ req->rq_status = -EAGAIN;
+ GOTO(out, rc = 1);
+ }
+
+ if (CURRENT_TIME - req->rq_time >= req->rq_timeout) {
CERROR("-- REQ TIMEOUT --\n");
+ /* clear the timeout */
+ req->rq_timeout = 0;
req->rq_flags |= PTL_RPC_FL_TIMEOUT;
if (req->rq_client && req->rq_client->cli_recovd)
- connmgr_cli_fail(req->rq_client);
- return 0;
+ recovd_cli_fail(req->rq_client);
+ GOTO(out, rc = 0);
+ }
+
+ if (req->rq_timeout) {
+ schedule_timeout(req->rq_timeout * HZ);
}
if (sigismember(&(current->pending.signal), SIGKILL) ||
return 0;
}
+/* caller must lock cli */
+void ptlrpc_free_committed(struct ptlrpc_client *cli)
+{
+ struct list_head *tmp, *saved;
+ struct ptlrpc_request *req;
+
+ list_for_each_safe(tmp, saved, &cli->cli_replied_head) {
+ req = list_entry(tmp, struct ptlrpc_request, rq_list);
+
+ /* not yet committed */
+ if (req->rq_transno > cli->cli_last_committed)
+ break;
+
+ /* retain for replay if flagged */
+ if (req->rq_flags & PTL_RPC_FL_RETAIN) {
+ list_del(&req->rq_list);
+ list_add(&req->rq_list, &cli->cli_replay_head);
+ } else {
+ CDEBUG(D_INFO, "Marking request %p as committed ("
+ "transno=%Lu, last_committed=%Lu\n", req,
+ req->rq_transno, cli->cli_last_committed);
+ if (atomic_dec_and_test(&req->rq_refcount))
+ ptlrpc_free_req(req);
+ }
+ }
+
+ EXIT;
+ return;
+}
+
+void ptlrpc_cleanup_client(struct ptlrpc_client *cli)
+{
+ struct list_head *tmp, *saved;
+ struct ptlrpc_request *req;
+ ENTRY;
+
+ spin_lock(&cli->cli_lock);
+ list_for_each_safe(tmp, saved, &cli->cli_replied_head) {
+ req = list_entry(tmp, struct ptlrpc_request, rq_list);
+ /* We do this to prevent ptlrpc_free_req from taking cli_lock */
+ CDEBUG(D_INFO, "Cleaning req %p from replied head.\n", req);
+ list_del(&req->rq_list);
+ req->rq_client = NULL;
+ ptlrpc_free_req(req);
+ }
+ list_for_each_safe(tmp, saved, &cli->cli_sent_head) {
+ req = list_entry(tmp, struct ptlrpc_request, rq_list);
+ CDEBUG(D_INFO, "Cleaning req %p from sent head.\n", req);
+ list_del(&req->rq_list);
+ req->rq_client = NULL;
+ ptlrpc_free_req(req);
+ }
+ list_for_each_safe(tmp, saved, &cli->cli_replay_head) {
+ req = list_entry(tmp, struct ptlrpc_request, rq_list);
+ CERROR("Request %p is on the replay head at cleanup!\n", req);
+ list_del(&req->rq_list);
+ req->rq_client = NULL;
+ ptlrpc_free_req(req);
+ }
+ list_for_each_safe(tmp, saved, &cli->cli_sending_head) {
+ req = list_entry(tmp, struct ptlrpc_request, rq_list);
+ CDEBUG(D_INFO, "Cleaning req %p from sending head.\n", req);
+ list_del(&req->rq_list);
+ req->rq_client = NULL;
+ ptlrpc_free_req(req);
+ }
+ spin_unlock(&cli->cli_lock);
+ EXIT;
+ return;
+}
+
int ptlrpc_queue_wait(struct ptlrpc_request *req)
{
int rc = 0;
ENTRY;
init_waitqueue_head(&req->rq_wait_for_rep);
-
+ resend:
+ req->rq_time = CURRENT_TIME;
+ req->rq_timeout = 3;
rc = ptl_send_rpc(req);
if (rc) {
CERROR("error %d, opcode %d\n", rc, req->rq_reqmsg->opc);
CDEBUG(D_OTHER, "-- sleeping\n");
wait_event_interruptible(req->rq_wait_for_rep, ptlrpc_check_reply(req));
CDEBUG(D_OTHER, "-- done\n");
- ptlrpc_cleanup_request_buf(req);
+
+ if (req->rq_flags & PTL_RPC_FL_RESEND) {
+ req->rq_flags &= ~PTL_RPC_FL_RESEND;
+ goto resend;
+ }
+
+ //ptlrpc_cleanup_request_buf(req);
up(&req->rq_client->cli_rpc_sem);
if (req->rq_flags & PTL_RPC_FL_INTR) {
/* Clean up the dangling reply buffers */
GOTO(out, rc);
}
CDEBUG(D_NET, "got rep %d\n", req->rq_repmsg->xid);
-
if (req->rq_repmsg->status == 0)
CDEBUG(D_NET, "--> buf %p len %d status %d\n", req->rq_repmsg,
req->rq_replen, req->rq_repmsg->status);
+ spin_lock(&req->rq_client->cli_lock);
+ /* add to the tail of the replied head */
+ list_del(&req->rq_list);
+ list_add(&req->rq_list, req->rq_client->cli_replied_head.prev);
+
+ req->rq_client->cli_last_rcvd = req->rq_repmsg->last_rcvd;
+ req->rq_client->cli_last_committed = req->rq_repmsg->last_committed;
+ ptlrpc_free_committed(req->rq_client);
+ spin_unlock(&req->rq_client->cli_lock);
+
EXIT;
out:
return rc;
RETURN(rc);
}
+void ptlrpc_resend_req(struct ptlrpc_request *req)
+{
+ ENTRY;
+ req->rq_flags |= PTL_RPC_FL_RESEND;
+ req->rq_flags &= ~PTL_RPC_FL_TIMEOUT;
+ wake_up_interruptible(&req->rq_wait_for_rep);
+ EXIT;
+ return;
+}
+
int ptl_send_rpc(struct ptlrpc_request *request)
{
ptl_process_id_t local_id;
CDEBUG(D_NET, "Setup reply buffer: %u bytes, xid %u, portal %u\n",
request->rq_replen, request->rq_reqmsg->xid,
- request->rq_client->cli_request_portal);
+ request->rq_client->cli_reply_portal);
spin_lock(&request->rq_client->cli_lock);
list_add(&request->rq_list, &request->rq_client->cli_sending_head);
struct recovd_obd *ptlrpc_connmgr;
-void connmgr_cli_manage(struct recovd_obd *recovd, struct ptlrpc_client *cli)
+void recovd_cli_manage(struct recovd_obd *recovd, struct ptlrpc_client *cli)
{
ENTRY;
cli->cli_recovd = recovd;
spin_lock(&recovd->recovd_lock);
- list_add(&cli->cli_ha_item, &recovd->recovd_connections_lh);
+ list_add(&cli->cli_ha_item, &recovd->recovd_clients_lh);
spin_unlock(&recovd->recovd_lock);
EXIT;
}
-void connmgr_cli_fail(struct ptlrpc_client *cli)
+void recovd_cli_fail(struct ptlrpc_client *cli)
{
ENTRY;
spin_lock(&cli->cli_recovd->recovd_lock);
- cli->cli_recovd->recovd_flags |= SVC_HA_EVENT;
+ cli->cli_recovd->recovd_flags |= RECOVD_FAIL;
+ cli->cli_recovd->recovd_wakeup_flag = 1;
list_del(&cli->cli_ha_item);
list_add(&cli->cli_ha_item, &cli->cli_recovd->recovd_troubled_lh);
spin_unlock(&cli->cli_recovd->recovd_lock);
EXIT;
}
-static int connmgr_upcall(void)
+void recovd_cli_fixed(struct ptlrpc_client *cli)
+{
+ ENTRY;
+ list_del(&cli->cli_ha_item);
+ list_add(&cli->cli_ha_item, &cli->cli_recovd->recovd_clients_lh);
+ EXIT;
+}
+
+
+static int recovd_upcall(void)
{
char *argv[2];
char *envp[3];
return call_usermodehelper(argv[0], argv, envp);
}
-static int connmgr_unpack_body(struct ptlrpc_request *req)
-{
- struct connmgr_body *b = lustre_msg_buf(req->rq_repmsg, 0);
- if (b == NULL) {
- LBUG();
- RETURN(-EINVAL);
- }
-
- b->generation = NTOH__u32(b->generation);
-
- return 0;
-}
-
-int connmgr_connect(struct recovd_obd *recovd, struct ptlrpc_connection *conn)
-{
- struct ptlrpc_request *req;
- struct ptlrpc_client *cl;
- struct connmgr_body *body;
- int rc, size = sizeof(*body);
- ENTRY;
-
- if (!recovd) {
- CERROR("no manager\n");
- LBUG();
- GOTO(out, rc = -EINVAL);
- }
- cl = recovd->recovd_client;
-
- req = ptlrpc_prep_req(cl, conn, CONNMGR_CONNECT, 1, &size, NULL);
- if (!req)
- GOTO(out, rc = -ENOMEM);
-
- body = lustre_msg_buf(req->rq_reqmsg, 0);
- body->generation = HTON__u32(conn->c_generation);
- body->conn = (__u64)(unsigned long)conn;
- body->conn_token = conn->c_token;
- strncpy(body->conn_uuid, conn->c_local_uuid, sizeof(body->conn_uuid));
-
- req->rq_replen = lustre_msg_size(1, &size);
-
- rc = ptlrpc_queue_wait(req);
- rc = ptlrpc_check_status(req, rc);
- if (!rc) {
- rc = connmgr_unpack_body(req);
- if (rc)
- GOTO(out_free, rc);
- body = lustre_msg_buf(req->rq_repmsg, 0);
- CDEBUG(D_NET, "remote generation: %o\n", body->generation);
- conn->c_level = LUSTRE_CONN_CON;
- conn->c_remote_conn = body->conn;
- conn->c_remote_token = body->conn_token;
- strncpy(conn->c_remote_uuid, body->conn_uuid,
- sizeof(conn->c_remote_uuid));
- }
-
-out_free:
- ptlrpc_free_req(req);
-out:
- RETURN(rc);
-}
-
-static int connmgr_handle_connect(struct ptlrpc_request *req)
-{
- struct connmgr_body *body;
- int rc, size = sizeof(*body);
- ENTRY;
-
- rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
- if (rc) {
- CERROR("connmgr: out of memory\n");
- req->rq_status = -ENOMEM;
- RETURN(0);
- }
-
- body = lustre_msg_buf(req->rq_reqmsg, 0);
- rc = connmgr_unpack_body(req);
- if (rc) {
- req->rq_status = rc;
- RETURN(0);
- }
-
- req->rq_connection->c_remote_conn = body->conn;
- req->rq_connection->c_remote_token = body->conn_token;
- strncpy(req->rq_connection->c_remote_uuid, body->conn_uuid,
- sizeof(req->rq_connection->c_remote_uuid));
-
- CERROR("incoming generation %d\n", body->generation);
- body = lustre_msg_buf(req->rq_repmsg, 0);
- body->generation = 4711;
- body->conn = (__u64)(unsigned long)req->rq_connection;
- body->conn_token = req->rq_connection->c_token;
- strncpy(body->conn_uuid, req->rq_connection->c_local_uuid,
- sizeof(body->conn_uuid));
-
- req->rq_connection->c_level = LUSTRE_CONN_CON;
- RETURN(0);
-}
-
-int connmgr_handle(struct obd_device *dev, struct ptlrpc_service *svc,
- struct ptlrpc_request *req)
-{
- int rc;
- ENTRY;
-
- rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
- if (rc) {
- CERROR("Invalid request\n");
- GOTO(out, rc);
- }
-
- if (req->rq_reqmsg->type != NTOH__u32(PTL_RPC_MSG_REQUEST)) {
- CERROR("wrong packet type sent %d\n",
- req->rq_reqmsg->type);
- GOTO(out, rc = -EINVAL);
- }
-
- switch (req->rq_reqmsg->opc) {
- case CONNMGR_CONNECT:
- CDEBUG(D_INODE, "connmgr connect\n");
- rc = connmgr_handle_connect(req);
- break;
-
- default:
- rc = ptlrpc_error(svc, req);
- RETURN(rc);
- }
-
- EXIT;
-out:
- if (rc) {
- ptlrpc_error(svc, req);
- } else {
- CDEBUG(D_NET, "sending reply\n");
- ptlrpc_reply(svc, req);
- }
-
- return 0;
-}
-
static int recovd_check_event(struct recovd_obd *recovd)
{
int rc = 0;
spin_lock(&recovd->recovd_lock);
- if (!(recovd->recovd_flags & MGR_WORKING) &&
- !list_empty(&recovd->recovd_troubled_lh)) {
-
- CERROR("connection in trouble - state: WORKING, upcall\n");
- recovd->recovd_flags = MGR_WORKING;
-
- recovd->recovd_waketime = CURRENT_TIME;
- recovd->recovd_timeout = 5 * HZ;
+ recovd->recovd_waketime = CURRENT_TIME;
+ if (recovd->recovd_timeout)
schedule_timeout(recovd->recovd_timeout);
- }
- if (recovd->recovd_flags & MGR_WORKING &&
- CURRENT_TIME <= recovd->recovd_waketime + recovd->recovd_timeout) {
- CERROR("WORKING: new event\n");
+ if (recovd->recovd_wakeup_flag) {
+ CERROR("service woken\n");
+ GOTO(out, rc = 1);
+ }
- recovd->recovd_waketime = CURRENT_TIME;
- schedule_timeout(recovd->recovd_timeout);
+ if (recovd->recovd_timeout &&
+ CURRENT_TIME > recovd->recovd_waketime + recovd->recovd_timeout) {
+ recovd->recovd_flags |= RECOVD_TIMEOUT;
+ CERROR("timeout\n");
+ GOTO(out, rc = 1);
}
- if (recovd->recovd_flags & MGR_STOPPING) {
- CERROR("ha mgr stopping\n");
+ if (recovd->recovd_flags & RECOVD_STOPPING) {
+ CERROR("recovd stopping\n");
rc = 1;
}
+ out:
+ recovd->recovd_wakeup_flag = 0;
spin_unlock(&recovd->recovd_lock);
RETURN(rc);
}
static int recovd_handle_event(struct recovd_obd *recovd)
{
+ ENTRY;
spin_lock(&recovd->recovd_lock);
- if (!(recovd->recovd_flags & MGR_WORKING) &&
- !list_empty(&recovd->recovd_troubled_lh)) {
-
- CERROR("connection in trouble - state: WORKING, upcall\n");
- recovd->recovd_flags = MGR_WORKING;
+ if (!(recovd->recovd_flags & RECOVD_UPCALL_WAIT) &&
+ recovd->recovd_flags & RECOVD_FAIL) {
+ CERROR("client in trouble: flags -> UPCALL_WAITING\n");
+ recovd->recovd_flags |= RECOVD_UPCALL_WAIT;
- connmgr_upcall();
+ recovd_upcall();
recovd->recovd_waketime = CURRENT_TIME;
- recovd->recovd_timeout = 5 * HZ;
+ recovd->recovd_timeout = 10 * HZ;
schedule_timeout(recovd->recovd_timeout);
}
- if (recovd->recovd_flags & MGR_WORKING &&
- CURRENT_TIME <= recovd->recovd_waketime + recovd->recovd_timeout) {
- CERROR("WORKING: new event\n");
+ if (recovd->recovd_flags & RECOVD_TIMEOUT) {
+ CERROR("timeout - no news from upcall?\n");
+ recovd->recovd_flags &= ~RECOVD_TIMEOUT;
+ }
- recovd->recovd_waketime = CURRENT_TIME;
- schedule_timeout(recovd->recovd_timeout);
+ if (recovd->recovd_flags & RECOVD_UPCALL_ANSWER) {
+ struct list_head *tmp, *pos;
+ CERROR("UPCALL_WAITING: upcall answer\n");
+ CERROR("** fill me in with recovery\n");
+
+ list_for_each_safe(tmp, pos, &recovd->recovd_troubled_lh) {
+ struct ptlrpc_client *cli = list_entry
+ (tmp, struct ptlrpc_client, cli_ha_item);
+
+ list_del(&cli->cli_ha_item);
+ spin_unlock(&recovd->recovd_lock);
+ if (cli->cli_recover)
+ cli->cli_recover(cli);
+ spin_lock(&recovd->recovd_lock);
+ }
+
+ recovd->recovd_timeout = 0;
+ recovd->recovd_flags = RECOVD_IDLE;
}
spin_unlock(&recovd->recovd_lock);
- return 0;
+ RETURN(0);
}
static int recovd_main(void *arg)
/* Record that the thread is running */
recovd->recovd_thread = current;
- recovd->recovd_flags = MGR_RUNNING;
+ recovd->recovd_flags = RECOVD_IDLE;
wake_up(&recovd->recovd_ctl_waitq);
/* And now, loop forever on requests */
recovd_check_event(recovd));
spin_lock(&recovd->recovd_lock);
- if (recovd->recovd_flags & MGR_STOPPING) {
+ if (recovd->recovd_flags & RECOVD_STOPPING) {
spin_unlock(&recovd->recovd_lock);
- CERROR("lustre_hamgr quitting\n");
+ CERROR("lustre_recovd stopping\n");
EXIT;
break;
}
}
recovd->recovd_thread = NULL;
- recovd->recovd_flags = MGR_STOPPED;
+ recovd->recovd_flags = RECOVD_STOPPED;
wake_up(&recovd->recovd_ctl_waitq);
CDEBUG(D_NET, "mgr exiting process %d\n", current->pid);
RETURN(0);
int rc;
ENTRY;
- INIT_LIST_HEAD(&recovd->recovd_connections_lh);
+ INIT_LIST_HEAD(&recovd->recovd_clients_lh);
INIT_LIST_HEAD(&recovd->recovd_troubled_lh);
spin_lock_init(&recovd->recovd_lock);
CERROR("cannot start thread\n");
RETURN(-EINVAL);
}
- wait_event(recovd->recovd_ctl_waitq, recovd->recovd_flags & MGR_RUNNING);
+ wait_event(recovd->recovd_ctl_waitq, recovd->recovd_flags & RECOVD_IDLE);
RETURN(0);
}
int recovd_cleanup(struct recovd_obd *recovd)
{
- recovd->recovd_flags = MGR_STOPPING;
-
+ spin_lock(&recovd->recovd_lock);
+ recovd->recovd_flags = RECOVD_STOPPING;
wake_up(&recovd->recovd_waitq);
+ spin_unlock(&recovd->recovd_lock);
+
wait_event_interruptible(recovd->recovd_ctl_waitq,
- (recovd->recovd_flags & MGR_STOPPED));
+ (recovd->recovd_flags & RECOVD_STOPPED));
RETURN(0);
}
GOTO(err_recovd, err = -EINVAL);
}
- ptlrpc_init_client(NULL, CONNMGR_REQUEST_PORTAL,
+ ptlrpc_init_client(NULL, NULL, CONNMGR_REQUEST_PORTAL,
CONNMGR_REPLY_PORTAL, recovd->recovd_client);
err = ptlrpc_start_thread(obddev, recovd->recovd_service, "lustre_connmgr");
}
OBD_FREE(recovd->recovd_service, sizeof(*recovd->recovd_service));
- recovd->recovd_flags = MGR_STOPPING;
-
+ ptlrpc_cleanup_client(recovd->recovd_client);
OBD_FREE(recovd->recovd_client, sizeof(*recovd->recovd_client));
MOD_DEC_USE_COUNT;
RETURN(0);
}
+
+int connmgr_iocontrol(int cmd, struct obd_conn *conn, int len, void *karg,
+ void *uarg)
+{
+ struct recovd_obd *recovd = &conn->oc_dev->u.recovd;
+
+ ENTRY;
+ if (cmd == OBD_RECOVD_NEWCONN) {
+ spin_lock(&recovd->recovd_lock);
+ recovd->recovd_flags |= RECOVD_UPCALL_ANSWER;
+ recovd->recovd_wakeup_flag = 1;
+ wake_up(&recovd->recovd_waitq);
+ spin_unlock(&recovd->recovd_lock);
+ EXIT;
+ }
+ return 0;
+}
+
+
/* use obd ops to offer management infrastructure */
static struct obd_ops recovd_obd_ops = {
o_setup: connmgr_setup,
o_cleanup: connmgr_cleanup,
+ o_iocontrol: connmgr_iocontrol,
};
static int __init ptlrpc_init(void)
new_fs ext2 /tmp/ost 10000
OST=$LOOPDEV
-MDSFS=ext2
+MDSFS=ext3
new_fs ${MDSFS} /tmp/mds 10000
MDS=$LOOPDEV
echo 0xffffffff > /proc/sys/portals/debug
$OBDCTL <<EOF
-device 0
+newdev
attach mds MDSDEV
setup ${MDS} ${MDSFS}
-device 1
+newdev
attach obdext2 OBDDEV
setup ${OST}
-device 2
+newdev
attach ost OSTDEV
setup \$OBDDEV
-device 3
-attach ptlrpc RPCDEV
-setup
-device 4
+newdev
attach ldlm LDLMDEV
setup
-device 5
+newdev
attach osc OSCDEV
setup -1
quit
int fd, rc;
if (argc != 2) {
- fprintf(stderr, "usage: %s filename\n", argv[1]);
+ fprintf(stderr, "usage: %s filename\n", argv[0]);
exit(1);
} else {
fprintf(stderr, "congratulations - program starting\n");
echo 0xffffffff > /proc/sys/portals/debug
$OBDCTL <<EOF
-device 0
+newdev
attach mds MDSDEV
setup ${MDS} ${MDSFS}
-device 1
-attach obdfilter OBDDEV
+newdev
+attach obdext2 OBDDEV
setup ${OST}
-device 2
+newdev
attach ost OSTDEV
-setup 1
-device 3
-attach ptlrpc RPCDEV
-setup
-device 4
+setup \$OBDDEV
+newdev
attach ldlm LDLMDEV
setup
-device 5
+newdev
attach osc OSCDEV
setup -1
quit
EOF
-MNT='mount -t lustre_lite -o device=`$OBDCTL name2dev OSCDEV` none /mnt/lustre'
+MNT="mount -t lustre_lite -o device=`$OBDCTL name2dev OSCDEV` none /mnt/lustre"
$MNT
test_fail() {
echo "Cleaning up and restarting MDS"
umount /mnt/lustre || fail "unable to unmount"
$OBDCTL <<- EOF
- device 0
+ name2dev MDSDEV
cleanup
detach
quit
echo 0 > /proc/sys/lustre/fail_loc
$OBDCTL <<- EOF
- device 0
- attach mds
+ newdev
+ attach mds MDSDEV
setup ${MDS} ${MDSFS}
quit
EOF
echo -n `date` >> /tmp/halog
echo "- please supply a new mds" >> /tmp/halog
+
+echo "- suppose we have a new one" >> /tmp/halog
+sleep 1
+
+/usr/src/obd/utils/obdctl <<EOF
+name2dev RPCDEV
+newconn
+EOF
+
return rc;
}
+static int jt_newconn(int argc, char **argv)
+{
+ struct obd_ioctl_data data;
+ int rc;
+
+ IOCINIT(data);
+ if (argc != 1) {
+ fprintf(stderr, "usage: %s\n", cmdname(argv[0]));
+ return -1;
+ }
+
+ rc = ioctl(fd, OBD_RECOVD_NEWCONN , &data);
+ if (rc < 0)
+ fprintf(stderr, "error: %s: %s\n", cmdname(argv[0]),
+ strerror(rc = errno));
+
+ return rc;
+}
+
command_t cmdlist[] = {
/* Metacommands */
{"--device", jt__device, 0, "--device <devno> <command [args ...]>"},
{"destroy", jt_destroy, 0, "destroy <id>"},
{"getattr", jt_getattr, 0, "getattr <id>"},
{"setattr", jt_setattr, 0, "setattr <id> <mode>"},
+ {"newconn", jt_newconn, 0, "newconn [newuuid]"},
{"test_getattr", jt_test_getattr, 0, "test_getattr <count> [verbose]"},
{"test_brw", jt_test_brw, 0, "test_brw <count> [write [verbose]]"},
{"test_ldlm", jt_test_ldlm, 0, "test lock manager (no args)"},