#define MGR_WORKING 32
#define MGR_SIGNAL 64
-struct lustre_ha_mgr {
- __u32 mgr_flags;
- struct task_struct *mgr_thread;
- wait_queue_head_t mgr_waitq;
- wait_queue_head_t mgr_ctl_waitq;
- spinlock_t mgr_lock;
- time_t mgr_waketime;
- struct list_head mgr_connections_lh; /* connections managed by the mgr */
- struct list_head mgr_troubled_lh; /* connections in trouble */
+#define LUSTRE_HA_NAME "ptlrpc"
+
+#define CONNMGR_CONNECT 1
+
+extern struct connmgr_obd *ptlrpc_connmgr;
+
+struct connmgr_thread {
+ struct connmgr_obd *mgr;
+ char *name;
};
-struct lustre_ha_thread {
- char *name;
- struct lustre_ha_mgr *mgr;
- struct obd_device *dev;
-};
-int llite_ha_cleanup(struct lustre_ha_mgr *mgr);
-struct lustre_ha_mgr *llite_ha_setup(void);
-void llite_ha_conn_fail(struct ptlrpc_client *cli);
-void llite_ha_conn_manage(struct lustre_ha_mgr *mgr, struct ptlrpc_client *cli);
+struct connmgr_body {
+ __u32 generation;
+};
+int connmgr_connect(struct connmgr_obd *mgr, struct ptlrpc_connection *cn);
+void connmgr_cli_fail(struct ptlrpc_client *cli);
+void connmgr_cli_manage(struct connmgr_obd *mgr, struct ptlrpc_client *cli);
#endif
#define MDS_CLOSE 3
#define MDS_REINT 4
#define MDS_READPAGE 5
+#define MDS_CONNECT 6
#define REINT_SETATTR 1
#define REINT_CREATE 2
#include <linux/lustre_ha.h>
#include <linux/obdo.h>
-#define LUSTRE_LITE_NAME "llite"
-
extern kmem_cache_t *ll_file_data_slab;
struct ll_file_data {
__u64 fd_mdshandle;
};
#define LL_SUPER_MAGIC 0x0BD00BD0;
+
+#define LL_COMMITCBD_STOPPING 0x1
+#define LL_COMMITCBD_STOPPED 0x2
+#define LL_COMMITCBD_RUNNING 0x4
+
struct ll_sb_info {
- struct list_head ll_list; /* list of supers */
- struct obd_conn ll_conn;
- struct super_block *ll_super;
- ino_t ll_rootino; /* number of root inode */
- int ll_minor; /* minor of /dev/obdX */
- struct list_head ll_inodes; /* list of dirty inodes */
- unsigned long ll_cache_count;
- struct semaphore ll_list_mutex;
- struct ptlrpc_client ll_mds_client;
+ struct obd_conn ll_conn;
+ ino_t ll_rootino; /* number of root inode */
+ struct ptlrpc_client ll_mds_client;
struct ptlrpc_connection *ll_mds_conn;
- struct ptlrpc_client ll_ost_client;
- struct lustre_ha_mgr *ll_ha_mgr;
+ struct ptlrpc_client ll_ost_client;
struct ptlrpc_connection *ll_ost_conn;
+
+ struct list_head ll_commitcbd_not_committed;
+ wait_queue_head_t ll_commitcbd_waitq;
+ wait_queue_head_t ll_commitcbd_ctl_waitq;
+ int ll_commitcbd_flags;
+ struct task_struct *ll_commitcbd_thread;
+ time_t ll_commitcbd_waketime;
+ time_t ll_commitcbd_timeout;
+ spinlock_t ll_commitcbd_lock;
};
void ll_sysctl_init(void);
void ll_sysctl_clean(void);
-
-
-static inline struct list_head *ll_slist(struct inode *inode)
-{
- struct ll_sb_info *sbi = ll_i2sbi(inode);
-
- return &sbi->ll_inodes;
-}
-
#endif
* FOO_BULK_PORTAL is for incoming bulk on the FOO
*/
-#define OSC_REQUEST_PORTAL 1
-#define OSC_REPLY_PORTAL 2
-#define OSC_BULK_PORTAL 3
-
-#define OST_REQUEST_PORTAL 4
-#define OST_REPLY_PORTAL 5
-#define OST_BULK_PORTAL 6
-
-#define MDC_REQUEST_PORTAL 7
-#define MDC_REPLY_PORTAL 8
-#define MDC_BULK_PORTAL 9
-
-#define MDS_REQUEST_PORTAL 10
-#define MDS_REPLY_PORTAL 11
-#define MDS_BULK_PORTAL 12
-
-#define LDLM_REQUEST_PORTAL 13
-#define LDLM_REPLY_PORTAL 14
+#define CONNMGR_REQUEST_PORTAL 1
+#define CONNMGR_REPLY_PORTAL 2
+#define OSC_REQUEST_PORTAL 3
+#define OSC_REPLY_PORTAL 4
+#define OSC_BULK_PORTAL 5
+#define OST_REQUEST_PORTAL 6
+#define OST_REPLY_PORTAL 7
+#define OST_BULK_PORTAL 8
+#define MDC_REQUEST_PORTAL 9
+#define MDC_REPLY_PORTAL 10
+#define MDC_BULK_PORTAL 11
+#define MDS_REQUEST_PORTAL 12
+#define MDS_REPLY_PORTAL 13
+#define MDS_BULK_PORTAL 14
+#define LDLM_REQUEST_PORTAL 15
+#define LDLM_REPLY_PORTAL 16
+#define LDLM_CLI_REQUEST_PORTAL 17
+#define LDLM_CLI_REPLY_PORTAL 18
/* default rpc ring length */
#define RPC_RING_LENGTH 2
#define SVC_RUNNING 2
#define SVC_STOPPED 4
#define SVC_KILLED 8
-#define SVC_EVENT 16
-#define SVC_LIST 32
-#define SVC_SIGNAL 64
+#define SVC_EVENT 16
+#define SVC_HA_EVENT 32
+#define SVC_SIGNAL 64
struct ptlrpc_connection {
struct list_head c_link;
atomic_t c_refcount;
__u64 c_token;
-
__u64 c_remote_conn;
__u64 c_remote_token;
};
struct ptlrpc_client {
struct obd_device *cli_obd;
- struct list_head cli_sending_head;
- struct list_head cli_sent_head;
__u32 cli_request_portal;
__u32 cli_reply_portal;
struct semaphore cli_rpc_sem;
+ struct list_head cli_sending_head;
+ struct list_head cli_sent_head;
struct list_head cli_ha_item;
- struct lustre_ha_mgr *cli_ha_mgr;
+ struct connmgr_obd *cli_ha_mgr;
};
/* These do double-duty in rq_type and rq_flags */
int rq_bulklen;
time_t rq_time;
- void * rq_reply_handle;
+ // void * rq_reply_handle;
wait_queue_head_t rq_wait_for_rep;
/* incoming reply */
};
struct ptlrpc_service {
+ time_t srv_time;
+ time_t srv_timeout;
+
/* incoming request buffers */
/* FIXME: perhaps a list of EQs, if multiple NIs are used? */
char *srv_buf[RPC_RING_LENGTH];
void ptlrpc_link_svc_me(struct ptlrpc_service *service, int i);
/* rpc/client.c */
-void ptlrpc_init_client(struct lustre_ha_mgr *, int req_portal, int rep_portal,
+void ptlrpc_init_client(struct connmgr_obd *, int req_portal, int rep_portal,
struct ptlrpc_client *);
-struct ptlrpc_connection *ptlrpc_connect_client(char *uuid);
+struct ptlrpc_connection *ptlrpc_uuid_to_connection(char *uuid);
int ptlrpc_queue_wait(struct ptlrpc_request *req);
struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl,
struct ptlrpc_connection *u, int opcode,
struct address_space_operations *eo_aops;
};
+struct connmgr_obd {
+ time_t mgr_waketime;
+ time_t mgr_timeout;
+ struct ptlrpc_service *mgr_service;
+ struct ptlrpc_client *mgr_client;
+ __u32 mgr_flags;
+ spinlock_t mgr_lock;
+ struct list_head mgr_connections_lh; /* connections managed by the mgr */
+ struct list_head mgr_troubled_lh; /* connections in trouble */
+ wait_queue_head_t mgr_ctl_waitq;
+ wait_queue_head_t mgr_waitq;
+ struct task_struct *mgr_thread;
+};
+
struct trace_obd {
struct obdtrace_opstats *stats;
};
struct osc_obd osc;
struct ldlm_obd ldlm;
struct echo_obd echo;
+ struct connmgr_obd mgr;
struct trace_obd trace;
#if 0
struct raid1_obd raid1;
ptlrpc_init_client(NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
ldlm->ldlm_client);
- ldlm->ldlm_server_conn = ptlrpc_connect_client("ldlm");
+ ldlm->ldlm_server_conn = ptlrpc_uuid_to_connection("ldlm");
if (!ldlm->ldlm_server_conn) {
CERROR("cannot create client\n");
LBUG();
LINX=page.c
-llite_SOURCES = llite_ha.c page.c super.c rw.c file.c dir.c sysctl.c namei.c symlink.c
+llite_SOURCES = commit_callback.c page.c super.c rw.c file.c dir.c sysctl.c namei.c symlink.c
dist-hook:
list='$(LINX)'; for f in $$list; do rm -f $(distdir)/$$f; done
--- /dev/null
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * The daemon that causes completed but not committed transactions
+ * on the MDS to be flushed periodically when they are committed.
+ * A gratuitous getattr RPC is made to the MDS to discover the
+ * last committed record.
+ *
+ * Lustre High Availability Daemon
+ *
+ * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
+ *
+ * This code is issued under the GNU General Public License.
+ * See the file COPYING in this distribution
+ *
+ * by Peter Braam <braam@clusterfs.com>
+ *
+ */
+
+#define EXPORT_SYMTAB
+
+#include <linux/version.h>
+#include <linux/module.h>
+#include <linux/fs.h>
+#include <linux/stat.h>
+#include <linux/locks.h>
+#include <linux/kmod.h>
+#include <linux/quotaops.h>
+#include <asm/unistd.h>
+#include <asm/uaccess.h>
+
+#define DEBUG_SUBSYSTEM S_LLITE
+
+#include <linux/lustre_lite.h>
+#include <linux/lustre_lib.h>
+
+static int ll_commitcbd_check_event(struct ll_sb_info *sbi)
+{
+ int rc = 0;
+ ENTRY;
+
+ spin_lock(&sbi->ll_commitcbd_lock);
+ if (sbi->ll_commitcbd_flags & LL_COMMITCBD_STOPPING) {
+ GOTO(out, rc = 1);
+ }
+
+ if (!list_empty(&sbi->ll_commitcbd_not_committed))
+ GOTO(out, rc = 1);
+
+ out:
+ spin_unlock(&mgr->mgr_lock);
+ RETURN(rc);
+}
+
+static int ll_commitcbd_main(void *arg)
+{
+ struct ll_sb_info *sbi = (struct ll_sb_info *)arg;
+
+ ENTRY;
+
+ lock_kernel();
+ daemonize();
+ spin_lock_irq(¤t->sigmask_lock);
+ sigfillset(¤t->blocked);
+ recalc_sigpending(current);
+ spin_unlock_irq(¤t->sigmask_lock);
+
+ sprintf(current->comm, "lustre_commitcbd");
+
+ /* Record that the thread is running */
+ sbi->ll_commitcbd_waketime = CURRENT_TIME;
+ sbi->ll_commitcbd_timeout = 10 * HZ;
+ sbi->ll_commitcbd_thread = current;
+ sbi->ll_commitcbd_flags = LL_COMMITCBD_RUNNING;
+ wake_up(&sbi->ll_commitcbd_ctl_waitq);
+
+ /* And now, loop forever on requests */
+ while (1) {
+ wait_event_interruptible
+ (sbi->ll_commitcbd_waitq,
+ ll_commitcbd_check_event(sbi));
+
+ spin_lock(&sbi->ll_commitcbd_lock);
+ if (sbi->ll_commitcbd_flags & LL_COMMITCBD_STOPPING) {
+ spin_unlock(&sbi->ll_commitcbd_lock);
+ CERROR("lustre_hamgr quitting\n");
+ EXIT;
+ break;
+ }
+
+ schedule_timeout(sbi->ll_commitcbd_timeout);
+ CERROR("commit callback daemon woken up - FIXME\n");
+ spin_unlock(&mgr->mgr_lock);
+ }
+
+ sbi->ll_commitcbd_thread = NULL;
+ sbi->ll_commitcbd_flags = LL_COMMITCBD_STOPPED;
+ wake_up(&sbi->ll_commitcbd_ctl_waitq);
+ CDEBUG(D_NET, "commit callback daemon exiting %d\n", current->pid);
+ RETURN(0);
+}
+
+int ll_commitcbd_setup(struct ll_sb_info *sbi)
+{
+ int rc;
+ ENTRY;
+
+ rc = kernel_thread(ll_commitcbd_main, (void *) sbi,
+ CLONE_VM | CLONE_FS | CLONE_FILES);
+ if (rc < 0) {
+ CERROR("cannot start thread\n");
+ RETURN(rc);
+ }
+ wait_event(sbi->ll_commitcbd_ctl_waitq,
+ sbi->ll_commitcbd_flags & LL_COMMITCBD_RUNNING);
+ RETURN(0);
+}
+
+
+int ll_commitcbd_cleanup(struct ll_sb_info *sbi)
+{
+ sbi->ll_commitcbd_flags = LL_COMMITCBD_STOPPING;
+
+ wake_up(&sbi->ll_commitcbd_waitq);
+ wait_event_interruptible
+ (sbi->ll_commitcbd_ctl_waitq,
+ sbi->ll_commitcbd_flags & LL_COMMITCBD_STOPPED);
+ RETURN(0);
+}
+++ /dev/null
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
- * linux/mds/handler.c
- *
- * Lustre High Availability Daemon
- *
- * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
- *
- * This code is issued under the GNU General Public License.
- * See the file COPYING in this distribution
- *
- * by Peter Braam <braam@clusterfs.com>
- *
- */
-
-#define EXPORT_SYMTAB
-
-#include <linux/version.h>
-#include <linux/module.h>
-#include <linux/fs.h>
-#include <linux/stat.h>
-#include <linux/locks.h>
-#include <linux/kmod.h>
-#include <linux/quotaops.h>
-#include <asm/unistd.h>
-#include <asm/uaccess.h>
-
-#define DEBUG_SUBSYSTEM S_LLITE
-
-#include <linux/lustre_lite.h>
-#include <linux/lustre_ha.h>
-#include <linux/lustre_lib.h>
-#include <linux/lustre_net.h>
-
-static int lustre_ha_check_event(struct lustre_ha_mgr *mgr)
-{
- int rc = 0;
- ENTRY;
-
- spin_lock(&mgr->mgr_lock);
- if (!(mgr->mgr_flags & MGR_WORKING) &&
- !list_empty(&mgr->mgr_troubled_lh) ) {
- mgr->mgr_flags |= MGR_WORKING;
- mgr->mgr_waketime = CURRENT_TIME;
- schedule_timeout(4*HZ);
- CERROR("connection in trouble\n");
- rc = 1;
- }
-
- if (!mgr->mgr_flags & MGR_WORKING &&
- CURRENT_TIME >= mgr->mgr_waketime ) {
- CERROR("woken up once more\n");
- mgr->mgr_waketime = CURRENT_TIME;
- schedule_timeout(4*HZ);
- rc = 1;
- }
-
- if (mgr->mgr_flags & MGR_STOPPING) {
- CERROR("ha mgr stopping\n");
- rc = 1;
- }
-
- spin_unlock(&mgr->mgr_lock);
- RETURN(rc);
-}
-
-
-static int llite_ha_upcall(void)
-{
- char *argv[2];
- char *envp[3];
-
- argv[0] = "/usr/src/obd/utils/ha_assist.sh";
- argv[1] = NULL;
-
- envp [0] = "HOME=/";
- envp [1] = "PATH=/sbin:/bin:/usr/sbin:/usr/bin";
- envp [2] = NULL;
-
- return call_usermodehelper(argv[0], argv, envp);
-}
-
-static int llite_ha_main(void *arg)
-{
- struct lustre_ha_thread *data = (struct lustre_ha_thread *)arg;
- struct lustre_ha_mgr *mgr = data->mgr;
-
- ENTRY;
-
- lock_kernel();
- daemonize();
- spin_lock_irq(¤t->sigmask_lock);
- sigfillset(¤t->blocked);
- recalc_sigpending(current);
- spin_unlock_irq(¤t->sigmask_lock);
-
- sprintf(current->comm, data->name);
-
- /* Record that the thread is running */
- mgr->mgr_thread = current;
- mgr->mgr_flags = MGR_RUNNING;
- wake_up(&mgr->mgr_ctl_waitq);
-
- /* And now, loop forever on requests */
- while (1) {
- wait_event_interruptible(mgr->mgr_waitq,
- lustre_ha_check_event(mgr));
-
- if (mgr->mgr_flags & MGR_STOPPING) {
- spin_unlock(&mgr->mgr_lock);
- CERROR("lustre_hamgr quitting\n");
- EXIT;
- break;
- }
-
- spin_lock(&mgr->mgr_lock);
- CERROR("lustre_hamgr woken up\n");
- llite_ha_upcall();
- schedule_timeout(5 * HZ);
- spin_unlock(&mgr->mgr_lock);
- }
-
- mgr->mgr_thread = NULL;
- mgr->mgr_flags = MGR_STOPPED;
- wake_up(&mgr->mgr_ctl_waitq);
- CDEBUG(D_NET, "mgr exiting process %d\n", current->pid);
- RETURN(0);
-}
-
-struct lustre_ha_mgr *llite_ha_setup(void)
-{
- struct lustre_ha_thread d;
- struct lustre_ha_mgr *mgr;
- int rc;
- ENTRY;
-
- PORTAL_ALLOC(mgr, sizeof(*mgr));
- if (!mgr) {
- CERROR("out of memory\n");
- LBUG();
- RETURN(NULL);
- }
- INIT_LIST_HEAD(&mgr->mgr_connections_lh);
- INIT_LIST_HEAD(&mgr->mgr_troubled_lh);
- spin_lock_init(&mgr->mgr_lock);
-
- d.mgr = mgr;
- d.name = "lustre_hamgr";
-
- init_waitqueue_head(&mgr->mgr_waitq);
- init_waitqueue_head(&mgr->mgr_ctl_waitq);
-
- rc = kernel_thread(llite_ha_main, (void *) &d,
- CLONE_VM | CLONE_FS | CLONE_FILES);
- if (rc < 0) {
- CERROR("cannot start thread\n");
- RETURN(NULL);
- }
- wait_event(mgr->mgr_ctl_waitq, mgr->mgr_flags & MGR_RUNNING);
-
- RETURN(mgr);
-}
-
-
-int llite_ha_cleanup(struct lustre_ha_mgr *mgr)
-{
- mgr->mgr_flags = MGR_STOPPING;
-
- wake_up(&mgr->mgr_waitq);
- wait_event_interruptible(mgr->mgr_ctl_waitq,
- (mgr->mgr_flags & MGR_STOPPED));
- PORTAL_FREE(mgr, sizeof(*mgr));
- RETURN(0);
-}
extern struct address_space_operations ll_aops;
extern struct address_space_operations ll_dir_aops;
struct super_operations ll_super_operations;
-
-static struct lustre_ha_mgr *llite_ha_mgr;
+extern int ll_commitcbd_setup(struct ll_sb_info *);
+extern int ll_commitcbd_cleanup(struct ll_sb_info *);
static char *ll_read_opt(const char *opt, char *data)
{
GOTO(out_free, sb = NULL);
}
- /* the first parameter should become an mds device no */
- ptlrpc_init_client(llite_ha_mgr, MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL,
+ ptlrpc_init_client(ptlrpc_connmgr,
+ MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL,
&sbi->ll_mds_client);
- sbi->ll_mds_conn = ptlrpc_connect_client("mds");
- if (err) {
+
+ sbi->ll_mds_conn = ptlrpc_uuid_to_connection("mds");
+ if (!sbi->ll_mds_conn) {
CERROR("cannot find MDS\n");
GOTO(out_disc, sb = NULL);
}
- sbi->ll_super = sb;
+ err = connmgr_connect(ptlrpc_connmgr, sbi->ll_mds_conn);
+ if (err) {
+ CERROR("cannot connect to MDS\n");
+ GOTO(out_disc, sb = NULL);
+ }
+
sbi->ll_rootino = 2;
- sbi->ll_ha_mgr = llite_ha_mgr;
sb->s_maxbytes = 1LL << 36;
sb->s_blocksize = PAGE_SIZE;
GOTO(out_req, sb = NULL);
}
+ /* initialize committed transaction callback daemon */
+ INIT_LIST_HEAD(&sbi->ll_commitcbd_not_committed);
+ spin_lock_init(&sbi->ll_commitcbd_lock);
+ init_waitqueue_head(&sbi->ll_commitcbd_waitq);
+ init_waitqueue_head(&sbi->ll_commitcbd_ctl_waitq);
+ sbi->ll_commitcbd_flags = 0;
+ err = ll_commitcbd_setup(sbi);
+ if (err) {
+ CERROR("failed to start commit callback daemon\n");
+ GOTO(out_req, sb = NULL);
+ }
+
root = iget4(sb, sbi->ll_rootino, NULL,
lustre_msg_buf(request->rq_repmsg, 0));
if (root) {
{
struct ll_sb_info *sbi = sb->u.generic_sbp;
ENTRY;
+ ll_commitcbd_cleanup(sbi);
obd_disconnect(&sbi->ll_conn);
ptlrpc_put_connection(sbi->ll_mds_conn);
OBD_FREE(sb->u.generic_sbp, sizeof(*sbi));
"lustre_lite", 0, ll_read_super, NULL
};
-static int llite_setup(struct obd_device *dev, obd_count len, void *buf)
-{
- MOD_INC_USE_COUNT;
- return 0;
-}
-
-static int llite_cleanup(struct obd_device *dev)
-{
- MOD_DEC_USE_COUNT;
- return 0;
-}
-
-/* use obd ops to offer management infrastructure */
-static struct obd_ops llite_obd_ops = {
- o_setup: llite_setup,
- o_cleanup: llite_cleanup,
-};
-
static int __init init_lustre_lite(void)
{
printk(KERN_INFO "Lustre Lite 0.0.1, braam@clusterfs.com\n");
- obd_register_type(&llite_obd_ops, LUSTRE_LITE_NAME);
ll_file_data_slab = kmem_cache_create("ll_file_data",
sizeof(struct ll_file_data), 0,
SLAB_HWCACHE_ALIGN, NULL, NULL);
if (ll_file_data_slab == NULL)
return -ENOMEM;
-
- llite_ha_mgr = llite_ha_setup();
return register_filesystem(&lustre_lite_fs_type);
}
static void __exit exit_lustre_lite(void)
{
unregister_filesystem(&lustre_lite_fs_type);
- llite_ha_cleanup(llite_ha_mgr);
kmem_cache_destroy(ll_file_data_slab);
- obd_unregister_type(LUSTRE_LITE_NAME);
}
MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
int rc, size = sizeof(*body);
ENTRY;
- req = ptlrpc_prep_req(cl, conn, MDS_GETATTR, 1, &size, NULL);
+ req = ptlrpc_prep_req(cl, conn, MDS_CONNECT, 1, &size, NULL);
if (!req)
GOTO(out, rc = -ENOMEM);
}
ptlrpc_init_client(NULL, MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL, &cl);
- conn = ptlrpc_connect_client("mds");
+ conn = ptlrpc_uuid_to_connection("mds");
if (err) {
CERROR("cannot create client\n");
RETURN(-EINVAL);
ptlrpc_init_client(NULL, OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
osc->osc_client);
- osc->osc_conn = ptlrpc_connect_client("ost");
+ osc->osc_conn = ptlrpc_uuid_to_connection("ost");
if (!osc->osc_conn)
RETURN(-EINVAL);
modulefs_DATA = ptlrpc.o
EXTRA_PROGRAMS = ptlrpc
-ptlrpc_SOURCES = client.c connection.c events.c niobuf.c pack_generic.c rpc.c \
-service.c
+ptlrpc_SOURCES = recovd.c connection.c rpc.c events.c service.c client.c niobuf.c pack_generic.c
include $(top_srcdir)/Rules
#include <linux/lustre_net.h>
-void llite_ha_conn_manage(struct lustre_ha_mgr *mgr, struct ptlrpc_client *cli)
-{
- ENTRY;
- cli->cli_ha_mgr = mgr;
- spin_lock(&mgr->mgr_lock);
- list_add(&cli->cli_ha_item, &mgr->mgr_connections_lh);
- spin_unlock(&mgr->mgr_lock);
- EXIT;
-}
-
-void llite_ha_conn_fail(struct ptlrpc_client *cli)
-{
- ENTRY;
- spin_lock(&cli->cli_ha_mgr->mgr_lock);
- list_del(&cli->cli_ha_item);
- list_add(&cli->cli_ha_item, &cli->cli_ha_mgr->mgr_troubled_lh);
- spin_unlock(&cli->cli_ha_mgr->mgr_lock);
- wake_up(&cli->cli_ha_mgr->mgr_waitq);
- EXIT;
-}
-void ptlrpc_init_client(struct lustre_ha_mgr *mgr, int req_portal,
+void ptlrpc_init_client(struct connmgr_obd *mgr, int req_portal,
int rep_portal, struct ptlrpc_client *cl)
{
memset(cl, 0, sizeof(*cl));
cl->cli_ha_mgr = mgr;
if (mgr)
- llite_ha_conn_manage(mgr, cl);
+ connmgr_cli_manage(mgr, cl);
cl->cli_obd = NULL;
cl->cli_request_portal = req_portal;
cl->cli_reply_portal = rep_portal;
sema_init(&cl->cli_rpc_sem, 32);
}
-struct ptlrpc_connection *ptlrpc_connect_client(char *uuid)
+struct ptlrpc_connection *ptlrpc_uuid_to_connection(char *uuid)
{
struct ptlrpc_connection *c;
struct lustre_peer peer;
if (CURRENT_TIME - req->rq_time >= 3) {
CERROR("-- REQ TIMEOUT --\n");
if (req->rq_client && req->rq_client->cli_ha_mgr)
- llite_ha_conn_fail(req->rq_client);
+ connmgr_cli_fail(req->rq_client);
return 0;
}
--- /dev/null
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * linux/mds/handler.c
+ *
+ * Lustre High Availability Daemon
+ *
+ * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
+ *
+ * This code is issued under the GNU General Public License.
+ * See the file COPYING in this distribution
+ *
+ * by Peter Braam <braam@clusterfs.com>
+ *
+ */
+
+#define EXPORT_SYMTAB
+
+#include <linux/version.h>
+#include <linux/module.h>
+#include <linux/fs.h>
+#include <linux/stat.h>
+#include <linux/locks.h>
+#include <linux/kmod.h>
+#include <linux/quotaops.h>
+#include <asm/unistd.h>
+#include <asm/uaccess.h>
+
+#define DEBUG_SUBSYSTEM S_RPC
+
+#include <linux/lustre_lite.h>
+#include <linux/lustre_ha.h>
+#include <linux/lustre_lib.h>
+#include <linux/lustre_net.h>
+
+struct connmgr_obd *ptlrpc_connmgr;
+
+void connmgr_cli_manage(struct connmgr_obd *mgr, struct ptlrpc_client *cli)
+{
+ ENTRY;
+ cli->cli_ha_mgr = mgr;
+ spin_lock(&mgr->mgr_lock);
+ list_add(&cli->cli_ha_item, &mgr->mgr_connections_lh);
+ spin_unlock(&mgr->mgr_lock);
+ EXIT;
+}
+
+
+void connmgr_cli_fail(struct ptlrpc_client *cli)
+{
+ ENTRY;
+ spin_lock(&cli->cli_ha_mgr->mgr_lock);
+ cli->cli_ha_mgr->mgr_flags |= SVC_HA_EVENT;
+ list_del(&cli->cli_ha_item);
+ list_add(&cli->cli_ha_item, &cli->cli_ha_mgr->mgr_troubled_lh);
+ spin_unlock(&cli->cli_ha_mgr->mgr_lock);
+ wake_up(&cli->cli_ha_mgr->mgr_waitq);
+ EXIT;
+}
+
+int connmgr_upcall(void)
+{
+ char *argv[2];
+ char *envp[3];
+
+ argv[0] = "/usr/src/obd/utils/ha_assist.sh";
+ argv[1] = NULL;
+
+ envp [0] = "HOME=/";
+ envp [1] = "PATH=/sbin:/bin:/usr/sbin:/usr/bin";
+ envp [2] = NULL;
+
+ return call_usermodehelper(argv[0], argv, envp);
+}
+
+static void connmgr_unpack_body(struct ptlrpc_request *req)
+{
+ struct connmgr_body *b = lustre_msg_buf(req->rq_repmsg, 0);
+ if (b == NULL)
+ LBUG();
+
+ b->generation = NTOH__u32(b->generation);
+}
+
+int connmgr_connect(struct connmgr_obd *mgr,
+ struct ptlrpc_connection *conn)
+{
+ struct ptlrpc_request *req;
+ struct ptlrpc_client *cl;
+ struct connmgr_body *body;
+ int rc, size = sizeof(*body);
+ ENTRY;
+
+ if (!mgr) {
+ CERROR("no manager\n");
+ LBUG();
+ }
+ cl = mgr->mgr_client;
+
+ req = ptlrpc_prep_req(cl, conn, CONNMGR_CONNECT, 1, &size, NULL);
+ if (!req)
+ GOTO(out, rc = -ENOMEM);
+
+ body = lustre_msg_buf(req->rq_reqmsg, 0);
+ body->generation = HTON__u32(conn->c_generation);
+
+ req->rq_replen = lustre_msg_size(1, &size);
+
+ rc = ptlrpc_queue_wait(req);
+ rc = ptlrpc_check_status(req, rc);
+
+ if (!rc) {
+ connmgr_unpack_body(req);
+ body = lustre_msg_buf(req->rq_repmsg, 0);
+ CDEBUG(D_NET, "mode: %o\n", body->generation);
+ }
+
+ EXIT;
+ out:
+ return rc;
+}
+
+
+int connmgr_handle_connect(struct ptlrpc_request *req)
+{
+ struct connmgr_body *body;
+ int rc, size = sizeof(*body);
+ ENTRY;
+
+ rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
+ if (rc) {
+ CERROR("connmgr: out of memory\n");
+ req->rq_status = -ENOMEM;
+ RETURN(0);
+ }
+
+ body = lustre_msg_buf(req->rq_reqmsg, 0);
+ connmgr_unpack_body(req);
+
+ printk("incoming generation %d\n", body->generation);
+ body = lustre_msg_buf(req->rq_repmsg, 0);
+ body->generation = 4711;
+ RETURN(0);
+}
+
+int connmgr_handle(struct obd_device *dev,
+ struct ptlrpc_service *svc,
+ struct ptlrpc_request *req)
+{
+ int rc;
+ ENTRY;
+
+ rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
+ if (rc) {
+ CERROR("lustre_mds: Invalid request\n");
+ GOTO(out, rc);
+ }
+
+ if (req->rq_reqmsg->type != PTL_RPC_REQUEST) {
+ CERROR("lustre_mds: wrong packet type sent %d\n",
+ req->rq_reqmsg->type);
+ GOTO(out, rc = -EINVAL);
+ }
+
+ switch (req->rq_reqmsg->opc) {
+ case CONNMGR_CONNECT:
+ CDEBUG(D_INODE, "getattr\n");
+ OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NET, 0);
+ rc = connmgr_handle_connect(req);
+ break;
+
+ default:
+ rc = ptlrpc_error(svc, req);
+ RETURN(rc);
+ }
+
+ EXIT;
+out:
+ if (rc) {
+ ptlrpc_error(svc, req);
+ } else {
+ CDEBUG(D_NET, "sending reply\n");
+ ptlrpc_reply(svc, req);
+ }
+
+ return 0;
+}
+
+
+static int recovd_check_event(struct connmgr_obd *mgr)
+{
+ int rc = 0;
+ ENTRY;
+
+ spin_lock(&mgr->mgr_lock);
+
+ if (!(mgr->mgr_flags & MGR_WORKING) &&
+ !list_empty(&mgr->mgr_troubled_lh) ) {
+
+ CERROR("connection in trouble - state: WORKING, upcall\n");
+ mgr->mgr_flags = MGR_WORKING;
+
+ mgr->mgr_waketime = CURRENT_TIME;
+ mgr->mgr_timeout = 5 * HZ;
+ schedule_timeout(mgr->mgr_timeout);
+
+ }
+
+ if (mgr->mgr_flags & MGR_WORKING &&
+ CURRENT_TIME <= mgr->mgr_waketime + mgr->mgr_timeout ) {
+ CERROR("WORKING: new event\n");
+
+ mgr->mgr_waketime = CURRENT_TIME;
+ schedule_timeout(mgr->mgr_timeout);
+ }
+
+ if (mgr->mgr_flags & MGR_STOPPING) {
+ CERROR("ha mgr stopping\n");
+ rc = 1;
+ }
+
+ spin_unlock(&mgr->mgr_lock);
+ RETURN(rc);
+}
+
+int recovd_handle_event(struct connmgr_obd *mgr)
+{
+
+ spin_lock(&mgr->mgr_lock);
+
+ if (!(mgr->mgr_flags & MGR_WORKING) &&
+ !list_empty(&mgr->mgr_troubled_lh) ) {
+
+ CERROR("connection in trouble - state: WORKING, upcall\n");
+ mgr->mgr_flags = MGR_WORKING;
+
+
+ connmgr_upcall();
+ mgr->mgr_waketime = CURRENT_TIME;
+ mgr->mgr_timeout = 5 * HZ;
+ schedule_timeout(mgr->mgr_timeout);
+
+ }
+
+ if (mgr->mgr_flags & MGR_WORKING &&
+ CURRENT_TIME <= mgr->mgr_waketime + mgr->mgr_timeout ) {
+ CERROR("WORKING: new event\n");
+
+ mgr->mgr_waketime = CURRENT_TIME;
+ schedule_timeout(mgr->mgr_timeout);
+ }
+
+ spin_unlock(&mgr->mgr_lock);
+ return 0;
+}
+
+static int recovd_main(void *arg)
+{
+ struct connmgr_thread *data = (struct connmgr_thread *)arg;
+ struct connmgr_obd *mgr = data->mgr;
+
+ ENTRY;
+
+ lock_kernel();
+ daemonize();
+ spin_lock_irq(¤t->sigmask_lock);
+ sigfillset(¤t->blocked);
+ recalc_sigpending(current);
+ spin_unlock_irq(¤t->sigmask_lock);
+
+ sprintf(current->comm, data->name);
+
+ /* Record that the thread is running */
+ mgr->mgr_thread = current;
+ mgr->mgr_flags = MGR_RUNNING;
+ wake_up(&mgr->mgr_ctl_waitq);
+
+ /* And now, loop forever on requests */
+ while (1) {
+ wait_event_interruptible(mgr->mgr_waitq,
+ recovd_check_event(mgr));
+
+ spin_lock(&mgr->mgr_lock);
+ if (mgr->mgr_flags & MGR_STOPPING) {
+ spin_unlock(&mgr->mgr_lock);
+ CERROR("lustre_hamgr quitting\n");
+ EXIT;
+ break;
+ }
+
+ recovd_handle_event(mgr);
+ spin_unlock(&mgr->mgr_lock);
+ }
+
+ mgr->mgr_thread = NULL;
+ mgr->mgr_flags = MGR_STOPPED;
+ wake_up(&mgr->mgr_ctl_waitq);
+ CDEBUG(D_NET, "mgr exiting process %d\n", current->pid);
+ RETURN(0);
+}
+
+int recovd_setup(struct connmgr_obd *mgr)
+{
+ struct connmgr_thread d;
+ int rc;
+ ENTRY;
+
+ INIT_LIST_HEAD(&mgr->mgr_connections_lh);
+ INIT_LIST_HEAD(&mgr->mgr_troubled_lh);
+ spin_lock_init(&mgr->mgr_lock);
+
+ d.mgr = mgr;
+ d.name = "lustre_recovd";
+
+ init_waitqueue_head(&mgr->mgr_waitq);
+ init_waitqueue_head(&mgr->mgr_ctl_waitq);
+
+ rc = kernel_thread(recovd_main, (void *) &d,
+ CLONE_VM | CLONE_FS | CLONE_FILES);
+ if (rc < 0) {
+ CERROR("cannot start thread\n");
+ RETURN(-EINVAL);
+ }
+ wait_event(mgr->mgr_ctl_waitq, mgr->mgr_flags & MGR_RUNNING);
+
+ RETURN(0);
+}
+
+
+int recovd_cleanup(struct connmgr_obd *mgr)
+{
+ mgr->mgr_flags = MGR_STOPPING;
+
+ wake_up(&mgr->mgr_waitq);
+ wait_event_interruptible(mgr->mgr_ctl_waitq,
+ (mgr->mgr_flags & MGR_STOPPED));
+ RETURN(0);
+}
#include <linux/lustre_net.h>
+extern int connmgr_handle(struct obd_device *dev,
+ struct ptlrpc_service *svc,
+ struct ptlrpc_request *req);
+
extern int ptlrpc_init_portals(void);
extern void ptlrpc_exit_portals(void);
+extern int recovd_setup(struct connmgr_obd *mgr);
+extern int recovd_cleanup(struct connmgr_obd *mgr);
+
+
+int connmgr_setup(struct obd_device *obddev, obd_count len, void *buf)
+{
+ struct connmgr_obd *mgr = &obddev->u.mgr;
+ int err;
+ ENTRY;
+
+ memset(mgr, 0, sizeof(*mgr));
+
+ OBD_ALLOC(mgr->mgr_client, sizeof(*mgr->mgr_client));
+ if (!mgr)
+ RETURN(-ENOMEM);
+
+ err = recovd_setup(mgr);
+ if (err)
+ GOTO(err_free, err);
+
+ mgr->mgr_service = ptlrpc_init_svc
+ (128 * 1024,CONNMGR_REQUEST_PORTAL, CONNMGR_REPLY_PORTAL,
+ "self", connmgr_handle);
+ if (!mgr->mgr_service) {
+ CERROR("failed to start service\n");
+ GOTO(err_recovd, err = -EINVAL);
+ }
+
+ ptlrpc_init_client(NULL, CONNMGR_REQUEST_PORTAL,
+ CONNMGR_REPLY_PORTAL, mgr->mgr_client);
+
+ err = ptlrpc_start_thread(obddev, mgr->mgr_service, "lustre_connmgr");
+ if (err) {
+ CERROR("cannot start thread\n");
+ GOTO(err_svc, err);
+ }
+
+ MOD_INC_USE_COUNT;
+ ptlrpc_connmgr = mgr;
+ RETURN(0);
+
+ err_svc:
+ rpc_unregister_service(mgr->mgr_service);
+ err_recovd:
+ recovd_cleanup(mgr);
+ err_free:
+ if (mgr->mgr_client)
+ OBD_FREE(mgr->mgr_client, sizeof(*mgr->mgr_client));
+ RETURN(err);
+}
+
+
+int connmgr_cleanup(struct obd_device *dev)
+{
+ struct connmgr_obd *mgr = &dev->u.mgr;
+ int err;
+
+ err = recovd_cleanup(mgr);
+ if (err)
+ LBUG();
+
+ ptlrpc_stop_thread(mgr->mgr_service);
+ rpc_unregister_service(mgr->mgr_service);
+ if (!list_empty(&mgr->mgr_service->srv_reqs)) {
+ // XXX reply with errors and clean up
+ CERROR("Request list not empty!\n");
+ }
+
+ OBD_FREE(mgr->mgr_service, sizeof(*mgr->mgr_service));
+ mgr->mgr_flags = MGR_STOPPING;
+
+ PORTAL_FREE(mgr->mgr_client, sizeof(*mgr->mgr_client));
+ MOD_DEC_USE_COUNT;
+ RETURN(0);
+}
+
+/* use obd ops to offer management infrastructure */
+static struct obd_ops connmgr_obd_ops = {
+ o_setup: connmgr_setup,
+ o_cleanup: connmgr_cleanup,
+};
static int __init ptlrpc_init(void)
{
+ int rc;
+ rc = ptlrpc_init_portals();
+ if (rc)
+ RETURN(rc);
ptlrpc_init_connection();
- return ptlrpc_init_portals();
+ obd_register_type(&connmgr_obd_ops, LUSTRE_HA_NAME);
+ return 0;
}
static void __exit ptlrpc_exit(void)
{
+ obd_unregister_type(LUSTRE_HA_NAME);
ptlrpc_exit_portals();
ptlrpc_cleanup_connection();
}
extern int request_in_callback(ptl_event_t *ev, void *data);
extern int ptl_handled_rpc(struct ptlrpc_service *service, void *start);
+extern int connmgr_handle_event(struct obd_device *dev, struct ptlrpc_service *);
static int ptlrpc_check_event(struct ptlrpc_service *svc)
{
if (svc->srv_flags & SVC_STOPPING)
GOTO(out, rc = 1);
- if (svc->srv_flags & SVC_EVENT)
- LBUG();
-
if (ptl_is_valid_handle(&svc->srv_eq_h)) {
int err;
err = PtlEQGet(svc->srv_eq_h, &svc->srv_ev);
GOTO(out, rc = 0);
}
- if (!list_empty(&svc->srv_reqs)) {
- svc->srv_flags |= SVC_LIST;
- GOTO(out, rc = 1);
- }
-
EXIT;
out:
spin_unlock(&svc->srv_lock);
spin_lock(&svc->srv_lock);
if (svc->srv_flags & SVC_SIGNAL) {
+ svc->srv_flags &= ~SVC_SIGNAL;
spin_unlock(&svc->srv_lock);
EXIT;
break;
}
if (svc->srv_flags & SVC_STOPPING) {
+ svc->srv_flags &= ~SVC_STOPPING;
spin_unlock(&svc->srv_lock);
EXIT;
break;
}
-
- if (svc->srv_flags & SVC_EVENT) {
- svc->srv_flags = SVC_RUNNING;
+
+ if (svc->srv_flags & SVC_EVENT) {
+ svc->srv_flags &= ~SVC_EVENT;
rc = handle_incoming_request(obddev, svc);
continue;
}
- if (svc->srv_flags & SVC_LIST) {
- struct ptlrpc_request *request;
- svc->srv_flags = SVC_RUNNING;
-
- request = list_entry(svc->srv_reqs.next,
- struct ptlrpc_request,
- rq_list);
- list_del(&request->rq_list);
- spin_unlock(&svc->srv_lock);
- rc = svc->srv_handler(obddev, svc, request);
- continue;
- }
CERROR("unknown break in service");
spin_unlock(&svc->srv_lock);
EXIT;
DEFS:=
CPPFLAGS := -g -I. -I$(PORTALS)/include -I$(top_srcdir)/include -Wall -D_LARGEFILE64_SOURCE
+# LDADD = -lldap
# LDADD := -lreadline -ltermcap # -lefence
-bin_PROGRAMS = openunlink testreq truncate
+bin_PROGRAMS = ldaptest openunlink testreq truncate
+# ldaptest_SOURCES = ldaptest.c
testreq_SOURCES = testreq.c
truncate_SOURCES = truncate.c
openunlink_SOURCES = openunlink.c
device 3
attach osc
setup -1
+device 4
+attach ptlrpc
+setup
quit
EOF
rmmod mdc
$OBDCTL <<EOF
+device 4
+cleanup
+detach
device 3
cleanup
detach