/* Per-export ldlm state. */
struct ldlm_export_data {
- struct list_head led_held_locks;
+ struct list_head led_held_locks;
+ struct ptlrpc_client led_client; /* cached client for locks */
};
static inline struct ldlm_extent *ldlm_res2extent(struct ldlm_resource *res)
#ifndef _LUSTRE_HA_H
#define _LUSTRE_HA_H
-#include <linux/lustre_net.h>
-
#define LUSTRE_HA_NAME "ptlrpc"
-extern struct recovd_obd *ptlrpc_connmgr;
-
-struct connmgr_thread {
- struct recovd_obd *mgr;
- char *name;
+struct recovd_data {
+ struct list_head rd_managed_chain;
+ int (*rd_recover)(struct recovd_data *);
};
-int connmgr_connect(struct recovd_obd *mgr, struct ptlrpc_connection *conn);
-int connmgr_handle(struct ptlrpc_request *req);
-void recovd_cli_fail(struct ptlrpc_client *cli);
-void recovd_cli_manage(struct recovd_obd *mgr, struct ptlrpc_client *cli);
-void recovd_cli_fixed(struct ptlrpc_client *cli);
+struct recovd_obd;
+struct ptlrpc_connection;
+
+void recovd_conn_fail(struct ptlrpc_connection *conn);
+void recovd_conn_manage(struct recovd_obd *mgr, struct ptlrpc_connection *conn);
+void recovd_conn_fixed(struct ptlrpc_connection *conn);
int recovd_setup(struct recovd_obd *mgr);
int recovd_cleanup(struct recovd_obd *mgr);
int mdc_create_client(char *uuid, struct ptlrpc_client *cl);
extern int mds_client_add(struct mds_export_data *med, int cl_off);
-extern int mds_client_free(struct mds_export_data *med);
+extern int mds_client_free(struct obd_export *exp);
/* mds/mds_fs.c */
struct mds_fs_operations {
#include <linux/obd.h>
#include <portals/p30.h>
#include <linux/lustre_idl.h>
+#include <linux/lustre_ha.h>
/* default rpc ring length */
#define RPC_RING_LENGTH 10
struct ptlrpc_connection {
- struct list_head c_link;
- struct lustre_peer c_peer;
- __u8 c_local_uuid[37]; /* XXX do we need this? */
- __u8 c_remote_uuid[37];
-
- int c_level;
- __u32 c_generation; /* changes upon new connection */
- __u32 c_epoch; /* changes when peer changes */
- __u32 c_bootcount; /* peer's boot count */
-
- spinlock_t c_lock;
- __u32 c_xid_in;
- __u32 c_xid_out;
-
- atomic_t c_refcount;
- __u64 c_token;
- __u64 c_remote_conn;
- __u64 c_remote_token;
+ struct list_head c_link;
+ struct lustre_peer c_peer;
+ __u8 c_local_uuid[37]; /* XXX do we need this? */
+ __u8 c_remote_uuid[37];
+
+ int c_level;
+ __u32 c_generation; /* changes upon new connection */
+ __u32 c_epoch; /* changes when peer changes */
+ __u32 c_bootcount; /* peer's boot count */
+
+ spinlock_t c_lock;
+ __u32 c_xid_in;
+ __u32 c_xid_out;
+
+ atomic_t c_refcount;
+ __u64 c_token;
+ __u64 c_remote_conn;
+ __u64 c_remote_token;
+
+ __u64 c_last_xid;
+ __u64 c_last_committed;
+ struct list_head c_delayed_head; /* delayed until post-recovery */
+ struct list_head c_sending_head;
+ struct list_head c_dying_head;
+ struct recovd_data c_recovd_data;
+
+ struct list_head c_clients; /* XXXshaver will be c_imports */
+ struct list_head c_exports;
+
+ /* should this be in recovd_data? */
+ struct recovd_obd *c_recovd;
};
struct ptlrpc_client {
- struct obd_device *cli_obd;
- __u32 cli_request_portal;
- __u32 cli_reply_portal;
-
- __u64 cli_last_xid;
- __u64 cli_last_committed;
- __u32 cli_target_devno;
-
- void *cli_data;
- struct semaphore cli_rpc_sem; /* limits outstanding requests */
-
- spinlock_t cli_lock; /* protects lists */
- struct list_head cli_delayed_head; /* delayed until after recovery */
- struct list_head cli_sending_head;
- struct list_head cli_dying_head;
- struct list_head cli_ha_item;
- int (*cli_recover)(struct ptlrpc_client *);
-
- struct recovd_obd *cli_recovd;
- char *cli_name;
+ struct obd_device *cli_obd;
+ __u32 cli_request_portal;
+ __u32 cli_reply_portal;
+
+ __u32 cli_target_devno;
+
+ struct ptlrpc_connection *cli_connection;
+
+ void *cli_data;
+ struct semaphore cli_rpc_sem; /* limits outstanding requests */
+
+ struct list_head cli_client_chain;
+ char *cli_name;
};
/* state flags of requests */
void ptlrpc_link_svc_me(struct ptlrpc_service *service, int i);
/* rpc/client.c */
-void ptlrpc_init_client(struct recovd_obd *,
- int (*recover)(struct ptlrpc_client *),
- int req_portal, int rep_portal,
- struct ptlrpc_client *);
+void ptlrpc_init_client(int req_portal, int rep_portal, struct ptlrpc_client *,
+ struct ptlrpc_connection *);
void ptlrpc_cleanup_client(struct ptlrpc_client *cli);
__u8 *ptlrpc_req_to_uuid(struct ptlrpc_request *req);
struct ptlrpc_connection *ptlrpc_uuid_to_connection(char *uuid);
int ptlrpc_replay_req(struct ptlrpc_request *req);
void ptlrpc_restart_req(struct ptlrpc_request *req);
-struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl,
- struct ptlrpc_connection *u, int opcode,
+struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl, int opcode,
int count, int *lengths, char **bufs);
void ptlrpc_free_req(struct ptlrpc_request *request);
void ptlrpc_req_finished(struct ptlrpc_request *request);
struct recovd_obd {
time_t recovd_waketime;
time_t recovd_timeout;
- struct ptlrpc_service *recovd_service;
- struct ptlrpc_client *recovd_client;
- __u32 recovd_flags;
- __u32 recovd_wakeup_flag;
- spinlock_t recovd_lock;
- struct list_head recovd_clients_lh; /* clients managed */
- struct list_head recovd_troubled_lh; /* clients in trouble */
+ __u32 recovd_flags;
+ __u32 recovd_wakeup_flag;
+ spinlock_t recovd_lock;
+ struct list_head recovd_managed_items; /* items managed */
+ struct list_head recovd_troubled_items; /* items in trouble */
wait_queue_head_t recovd_recovery_waitq;
wait_queue_head_t recovd_ctl_waitq;
wait_queue_head_t recovd_waitq;
*/
#ifdef __KERNEL__
-extern struct obd_export *class_conn2export(struct lustre_handle *conn);
-extern struct obd_device *class_conn2obd(struct lustre_handle *conn);
-extern int class_rconn2export(struct lustre_handle *conn,
- struct lustre_handle *rconn);
-
struct obd_export {
__u64 exp_cookie;
struct lustre_handle exp_rconnh; /* remote connection handle */
uuid_t exp_uuid;
};
+extern struct obd_export *class_conn2export(struct lustre_handle *conn);
+extern struct obd_device *class_conn2obd(struct lustre_handle *conn);
+extern int class_rconn2export(struct lustre_handle *conn,
+ struct lustre_handle *rconn);
+
struct obd_import {
__u64 imp_cookie;
struct lustre_handle imp_expconnh;
int class_uuid2dev(char *name);
struct obd_device *class_uuid2obd(char *name);
struct obd_export *class_new_export(struct obd_device *obddev);
+void class_destroy_export(struct obd_export *exp);
int class_connect(struct lustre_handle *conn, struct obd_device *obd,
char *cluuid);
int class_disconnect(struct lustre_handle *conn);
int class_multi_setup(struct obd_device *obddev, uint32_t len, void *data);
int class_multi_cleanup(struct obd_device *obddev);
-extern void (*class_signal_client_failure)(struct ptlrpc_client *);
+extern void (*class_signal_connection_failure)(struct ptlrpc_connection *);
+
+/* == mds_client_free if MDS running here */
+extern int (*mds_destroy_export)(struct obd_export *exp);
+/* == ldlm_client_free if(?) DLM running here */
+extern int (*ldlm_destroy_export)(struct obd_export *exp);
#endif
{
struct ldlm_request *body;
struct ptlrpc_request *req;
- struct ptlrpc_client *cl;
int rc = 0, size = sizeof(*body);
ENTRY;
- cl = &lock->l_resource->lr_namespace->ns_rpc_client;
- req = ptlrpc_prep_req(cl, lock->l_export->exp_connection,
+ req = ptlrpc_prep_req(&lock->l_export->exp_ldlm_data.led_client,
LDLM_BL_CALLBACK, 1, &size, NULL);
if (!req)
RETURN(-ENOMEM);
{
struct ldlm_request *body;
struct ptlrpc_request *req;
- struct ptlrpc_client *cl;
int rc = 0, size = sizeof(*body);
ENTRY;
RETURN(-EINVAL);
}
- cl = &lock->l_resource->lr_namespace->ns_rpc_client;
- req = ptlrpc_prep_req(cl, lock->l_export->exp_connection,
+ req = ptlrpc_prep_req(&lock->l_export->exp_ldlm_data.led_client,
LDLM_CP_CALLBACK, 1, &size, NULL);
if (!req)
RETURN(-ENOMEM);
OBD_ALLOC(obddev->u.ldlm.ldlm_client,
sizeof(*obddev->u.ldlm.ldlm_client));
- ptlrpc_init_client(NULL, NULL,
- LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
- obddev->u.ldlm.ldlm_client);
connection = ptlrpc_uuid_to_connection("ldlm");
if (!connection)
CERROR("No LDLM UUID found: assuming ldlm is local.\n");
+ ptlrpc_init_client(LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
+ obddev->u.ldlm.ldlm_client, connection);
+
switch (cmd) {
case IOC_LDLM_TEST:
err = ldlm_test(obddev, conn);
waiting_locks_timer.function = waiting_locks_callback;
waiting_locks_timer.data = 0;
init_timer(&waiting_locks_timer);
-
+
RETURN(0);
out_thread:
#define DEBUG_SUBSYSTEM S_LDLM
#include <linux/lustre_dlm.h>
+#include <linux/obd_class.h>
kmem_cache_t *ldlm_resource_slab, *ldlm_lock_slab;
}
strcpy(ns->ns_name, name);
- ptlrpc_init_client(NULL, NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
- &ns->ns_rpc_client);
-
INIT_LIST_HEAD(&ns->ns_root_list);
l_lock_init(&ns->ns_lock);
ns->ns_refcount = 0;
vfree(ns->ns_hash /* , sizeof(*ns->ns_hash) * RES_HASH_SIZE */);
obd_memory -= sizeof(*ns->ns_hash) * RES_HASH_SIZE;
- ptlrpc_cleanup_client(&ns->ns_rpc_client);
OBD_FREE(ns->ns_name, strlen(ns->ns_name) + 1);
OBD_FREE(ns, sizeof(*ns));
return ELDLM_OK;
}
+int ldlm_client_free(struct obd_export *exp)
+{
+ struct ldlm_export_data *led = &exp->exp_ldlm_data;
+ ptlrpc_cleanup_client(&led->led_client);
+ RETURN(0);
+}
+
static __u32 ldlm_hash_fn(struct ldlm_resource *parent, __u64 *name)
{
__u32 hash = 0;
LBUG();
return &export->exp_obd->u.cli;
}
-extern struct recovd_obd *ptlrpc_connmgr;
int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf)
{
if (mdc->cl_ldlm_client == NULL)
GOTO(out_client, rc = -ENOMEM);
- /* XXX get recovery hooked in here again */
- //ptlrpc_init_client(ptlrpc_connmgr, ll_recover,...
-
- ptlrpc_init_client(ptlrpc_connmgr, NULL, rq_portal, rp_portal,
- mdc->cl_client);
- /* XXXshaver Should the LDLM have its own recover function? Probably. */
- ptlrpc_init_client(ptlrpc_connmgr, NULL, LDLM_REQUEST_PORTAL,
- LDLM_REPLY_PORTAL, mdc->cl_ldlm_client);
+ ptlrpc_init_client(rq_portal, rp_portal, mdc->cl_client, mdc->cl_conn);
+ ptlrpc_init_client(LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
+ mdc->cl_ldlm_client, mdc->cl_conn);
mdc->cl_client->cli_name = "mdc";
mdc->cl_ldlm_client->cli_name = "ldlm";
mdc->cl_max_mdsize = sizeof(struct lov_mds_md);
if (obd->obd_namespace == NULL)
GOTO(out_disco, rc = -ENOMEM);
- request = ptlrpc_prep_req(cli->cl_client, cli->cl_conn, rq_opc, 2, size,
- tmp);
+ request = ptlrpc_prep_req(cli->cl_client, rq_opc, 2, size, tmp);
if (!request)
GOTO(out_ldlm, rc = -ENOMEM);
req->rq_export = export;
export->exp_connection = req->rq_connection;
+ ptlrpc_init_client(LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
+ &export->exp_ldlm_data.led_client,
+ export->exp_connection);
+
#warning Peter: is this the right place to upgrade the server connection level?
req->rq_connection->c_level = LUSTRE_CONN_FULL;
out:
ENTRY;
desc->b_connection->c_level = LUSTRE_CONN_RECOVD;
desc->b_flags |= PTL_RPC_FL_TIMEOUT;
- if (desc->b_client && desc->b_client->cli_recovd &&
- class_signal_client_failure) {
+ if (desc->b_connection && desc->b_connection->c_recovd &&
+ class_signal_connection_failure) {
/* XXXshaver Do we need a resend strategy, or do we just
* XXXshaver return -ERESTARTSYS and punt it?
*/
- CERROR("signalling failure of client %p\n", desc->b_client);
- class_signal_client_failure(desc->b_client);
+ CERROR("signalling failure of conn %p\n", desc->b_connection);
+ class_signal_connection_failure(desc->b_connection);
/* We go back to sleep, until we're resumed or interrupted. */
RETURN(0);
#include <linux/lustre_lite.h>
#include <linux/lustre_ha.h>
-
static int ll_reconnect(struct ll_sb_info *sbi)
{
struct ll_fid rootfid;
ptlrpc_readdress_connection(sbi2mdc(sbi)->cl_conn, "mds");
- err = connmgr_connect(ptlrpc_connmgr, sbi2mdc(sbi)->cl_conn);
- if (err) {
- CERROR("cannot connect to MDS: rc = %d\n", err);
- ptlrpc_put_connection(sbi2mdc(sbi)->cl_conn);
- GOTO(out_disc, err = -ENOTCONN);
- }
sbi2mdc(sbi)->cl_conn->c_level = LUSTRE_CONN_CON;
/* XXX: need to store the last_* values somewhere */
- err = mdc_getstatus(&sbi->ll_mdc_conn,
- &rootfid, &last_committed,
- &last_xid,
- &request);
+ err = mdc_getstatus(&sbi->ll_mdc_conn, &rootfid, &last_committed,
+ &last_xid, &request);
if (err) {
CERROR("cannot mds_connect: rc = %d\n", err);
GOTO(out_disc, err = -ENOTCONN);
}
- sbi2mdc(sbi)->cl_client->cli_last_xid = last_xid;
+ sbi2mdc(sbi)->cl_conn->c_last_xid = last_xid;
sbi2mdc(sbi)->cl_conn->c_level = LUSTRE_CONN_RECOVD;
out_disc:
int ll_recover(struct ptlrpc_client *cli)
{
+ RETURN(-ENOSYS);
+#if 0
+ /* XXXshaver this code needs to know about connection-driven recovery! */
+
struct ptlrpc_request *req;
struct list_head *tmp, *pos;
struct ll_sb_info *sbi = cli->cli_data;
+ struct ptlrpc_connection *conn = cli->cli_connection;
int rc = 0;
ENTRY;
ll_reconnect(sbi);
/* 2. walk the request list */
- spin_lock(&cli->cli_lock);
- list_for_each_safe(tmp, pos, &cli->cli_sending_head) {
+ spin_lock(&conn->c_lock);
+ list_for_each_safe(tmp, pos, &conn->c_sending_head) {
req = list_entry(tmp, struct ptlrpc_request, rq_list);
/* replay what needs to be replayed */
if (req->rq_flags & PTL_RPC_FL_REPLAY) {
- CDEBUG(D_INODE, "req %Ld needs replay [last rcvd %Ld]\n",
- req->rq_xid, cli->cli_last_xid);
+ CDEBUG(D_INODE, "req %Ld needs replay [last rcvd %Ld]\n",
+ req->rq_xid, conn->c_last_xid);
rc = ptlrpc_replay_req(req);
if (rc) {
- CERROR("recovery replay error %d for request %Ld\n",
+ CERROR("recovery replay error %d for req %Ld\n",
rc, req->rq_xid);
GOTO(out, rc);
}
/* server has seen req, we have reply: skip */
if ((req->rq_flags & PTL_RPC_FL_REPLIED) &&
- req->rq_xid <= cli->cli_last_xid) {
- CDEBUG(D_INODE, "req %Ld was complete: skip [last rcvd %Ld]\n",
- req->rq_xid, cli->cli_last_xid);
+ req->rq_xid <= conn->c_last_xid) {
+ CDEBUG(D_INODE,
+ "req %Ld was complete: skip [last rcvd %Ld]\n",
+ req->rq_xid, conn->c_last_xid);
continue;
}
/* server has lost req, we have reply: resend, ign reply */
if ((req->rq_flags & PTL_RPC_FL_REPLIED) &&
- req->rq_xid > cli->cli_last_xid) {
- CDEBUG(D_INODE, "lost req %Ld have rep: replay [last rcvd %Ld]\n",
- req->rq_xid, cli->cli_last_xid);
+ req->rq_xid > conn->c_last_xid) {
+ CDEBUG(D_INODE, "lost req %Ld have rep: replay [last "
+ "rcvd %Ld]\n", req->rq_xid, conn->c_last_xid);
rc = ptlrpc_replay_req(req);
if (rc) {
- CERROR("request resend error %d for request %Ld\n",
+ CERROR("request resend error %d for req %Ld\n",
rc, req->rq_xid);
GOTO(out, rc);
}
/* server has seen req, we have lost reply: -ERESTARTSYS */
if ( !(req->rq_flags & PTL_RPC_FL_REPLIED) &&
- req->rq_xid <= cli->cli_last_xid) {
- CDEBUG(D_INODE, "lost rep %Ld srv did req: restart [last rcvd %Ld]\n",
- req->rq_xid, cli->cli_last_xid);
+ req->rq_xid <= conn->c_last_xid) {
+ CDEBUG(D_INODE, "lost rep %Ld srv did req: restart "
+ "[last rcvd %Ld]\n",
+ req->rq_xid, conn->c_last_xid);
ptlrpc_restart_req(req);
}
/* service has not seen req, no reply: resend */
if ( !(req->rq_flags & PTL_RPC_FL_REPLIED) &&
- req->rq_xid > cli->cli_last_xid) {
- CDEBUG(D_INODE, "lost rep/req %Ld: resend [last rcvd %Ld]\n",
- req->rq_xid, cli->cli_last_xid);
+ req->rq_xid > conn->c_last_xid) {
+ CDEBUG(D_INODE,
+ "lost rep/req %Ld: resend [last rcvd %Ld]\n",
+ req->rq_xid, conn->c_last_xid);
ptlrpc_resend_req(req);
}
}
sbi2mdc(sbi)->cl_conn->c_level = LUSTRE_CONN_FULL;
- recovd_cli_fixed(cli);
+ recovd_conn_fixed(conn);
/* Finally, continue what we delayed since recovery started */
- list_for_each_safe(tmp, pos, &cli->cli_delayed_head) {
+ list_for_each_safe(tmp, pos, &conn->c_delayed_head) {
req = list_entry(tmp, struct ptlrpc_request, rq_list);
ptlrpc_continue_req(req);
}
EXIT;
out:
- spin_unlock(&cli->cli_lock);
+ spin_unlock(&conn->c_lock);
return rc;
+#endif
}
GOTO(out_free, sb = NULL);
}
-#if 0
- err = connmgr_connect(ptlrpc_connmgr, sbi->ll_mdc_conn);
- if (err) {
- CERROR("cannot connect to MDC: rc = %d\n", err);
- GOTO(out_rpc, sb = NULL);
- }
-#endif
-
err = obd_connect(&sbi->ll_mdc_conn, obd, sbi->ll_sb_uuid);
if (err) {
CERROR("cannot connect to %s: rc = %d\n", mdc, err);
if (!exp)
RETURN(-EINVAL);
- rc = mds_client_free(&exp->exp_mds_data);
- if (rc)
- CERROR("error freeing client data: rc = %d\n", rc);
-
rc = class_disconnect(conn);
if (!rc)
MOD_DEC_USE_COUNT;
rc = mds_recover(obddev);
if (rc)
GOTO(err_thread, rc);
+
+ mds_destroy_export = mds_client_free;
RETURN(0);
return 0;
}
-int mds_client_free(struct mds_export_data *med)
+int mds_client_free(struct obd_export *exp)
{
+ struct mds_export_data *med = &exp->exp_mds_data;
unsigned long *word;
int bit;
+ if (!med->med_mcd)
+ RETURN(0);
+
CDEBUG(D_INFO, "freeing client at offset %d with UUID '%s'\n",
med->med_off, med->med_mcd->mcd_uuid);
&obd_psdev_fops
};
-void (*class_signal_client_failure)(struct ptlrpc_client *);
+void (*class_signal_connection_failure)(struct ptlrpc_connection *);
+int (*mds_destroy_export)(struct obd_export *exp);
+int (*ldlm_destroy_export)(struct obd_export *exp);
EXPORT_SYMBOL(obd_dev);
EXPORT_SYMBOL(obdo_cachep);
EXPORT_SYMBOL(class_uuid2dev);
EXPORT_SYMBOL(class_uuid2obd);
EXPORT_SYMBOL(class_new_export);
+EXPORT_SYMBOL(class_destroy_export);
EXPORT_SYMBOL(class_connect);
EXPORT_SYMBOL(class_conn2export);
EXPORT_SYMBOL(class_rconn2export);
//EXPORT_SYMBOL(class_multi_setup);
//EXPORT_SYMBOL(class_multi_cleanup);
-EXPORT_SYMBOL(class_signal_client_failure);
+EXPORT_SYMBOL(class_signal_connection_failure);
+EXPORT_SYMBOL(mds_destroy_export);
+EXPORT_SYMBOL(ldlm_destroy_export);
static int __init init_obdclass(void)
{
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
* vim:expandtab:shiftwidth=8:tabstop=8:
-*
- * linux/fs/ext2_obd/sim_obd.c
- * Copyright (C) 2001 Cluster File Systems, Inc.
+ *
+ * lustre/obdclass/genops.c
+ * Copyright (C) 2001-2002 Cluster File Systems, Inc.
*
* This code is issued under the GNU General Public License.
* See the file COPYING in this distribution
return export;
}
+void class_destroy_export(struct obd_export *exp)
+{
+ int rc;
+ ENTRY;
+
+ spin_lock(&exp->exp_connection->c_lock);
+ list_del(&exp->exp_chain);
+ spin_unlock(&exp->exp_connection->c_lock);
+
+ /* XXXshaver these bits want to be hung off the export, instead of
+ * XXXshaver hard-coded here.
+ */
+ if (mds_destroy_export) {
+ rc = mds_destroy_export(exp);
+ if (rc)
+ CERROR("error freeing mds client data: rc = %d\n", rc);
+ }
+ if (ldlm_destroy_export) {
+ rc = ldlm_destroy_export(exp);
+ if (rc)
+ CERROR("error freeing dlm client data: rc = %d\n", rc);
+ }
+ kmem_cache_free(export_cachep, exp);
+
+ EXIT;
+}
+
/* a connection defines an export context in which preallocation can
be managed. */
int class_connect (struct lustre_handle *conn, struct obd_device *obd,
} else
CDEBUG(D_IOCTL, "disconnect: addr %Lx cookie %Lx\n",
(long long)conn->addr, (long long)conn->cookie);
- list_del(&export->exp_chain);
- kmem_cache_free(export_cachep, export);
+
+ class_destroy_export(export);
RETURN(0);
}
modulefs_DATA = ptlrpc.o
EXTRA_PROGRAMS = ptlrpc
-ptlrpc_SOURCES = connmgr.c recovd.c connection.c rpc.c events.c service.c client.c niobuf.c pack_generic.c
+ptlrpc_SOURCES = recovd.c connection.c rpc.c events.c service.c client.c niobuf.c pack_generic.c
include $(top_srcdir)/Rules
#include <linux/lustre_lib.h>
#include <linux/lustre_ha.h>
-void ptlrpc_init_client(struct recovd_obd *recovd,
- int (*recover)(struct ptlrpc_client *recover),
- int req_portal,
- int rep_portal, struct ptlrpc_client *cl)
+void ptlrpc_init_client(int req_portal, int rep_portal, struct ptlrpc_client *cl,
+ struct ptlrpc_connection *conn)
{
memset(cl, 0, sizeof(*cl));
- cl->cli_recover = recover;
- if (recovd)
- recovd_cli_manage(recovd, cl);
+ /* Some things, like the LDLM, can call us without a connection.
+ * I don't like it one bit.
+ */
+ if (conn) {
+ cl->cli_connection = conn;
+ list_add(&cl->cli_client_chain, &conn->c_clients);
+ }
cl->cli_obd = NULL;
cl->cli_request_portal = req_portal;
cl->cli_reply_portal = rep_portal;
- INIT_LIST_HEAD(&cl->cli_delayed_head);
- INIT_LIST_HEAD(&cl->cli_sending_head);
- INIT_LIST_HEAD(&cl->cli_dying_head);
- spin_lock_init(&cl->cli_lock);
sema_init(&cl->cli_rpc_sem, 32);
}
}
struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl,
- struct ptlrpc_connection *conn,
int opcode, int count, int *lengths,
char **bufs)
{
struct ptlrpc_request *request;
+ struct ptlrpc_connection *conn = cl->cli_connection;
int rc;
ENTRY;
}
clobd = &export->exp_obd->u.cli;
- req = ptlrpc_prep_req(clobd->cl_client, clobd->cl_conn,
- opcode, count, lengths, bufs);
+ req = ptlrpc_prep_req(clobd->cl_client, opcode, count, lengths, bufs);
ptlrpc_hdl2req(req, &clobd->cl_exporth);
return req;
}
if (request->rq_reqmsg != NULL)
OBD_FREE(request->rq_reqmsg, request->rq_reqlen);
- if (request->rq_client) {
- spin_lock(&request->rq_client->cli_lock);
+ if (request->rq_connection) {
+ spin_lock(&request->rq_connection->c_lock);
list_del_init(&request->rq_list);
- spin_unlock(&request->rq_client->cli_lock);
+ spin_unlock(&request->rq_connection->c_lock);
}
ptlrpc_put_connection(request->rq_connection);
}
/* caller must lock cli */
-void ptlrpc_free_committed(struct ptlrpc_client *cli)
+void ptlrpc_free_committed(struct ptlrpc_connection *conn)
{
struct list_head *tmp, *saved;
struct ptlrpc_request *req;
- list_for_each_safe(tmp, saved, &cli->cli_sending_head) {
+ list_for_each_safe(tmp, saved, &conn->c_sending_head) {
req = list_entry(tmp, struct ptlrpc_request, rq_list);
if ( (req->rq_flags & PTL_RPC_FL_REPLAY) ) {
}
/* not yet committed */
- if (req->rq_transno > cli->cli_last_committed)
+ if (req->rq_transno > conn->c_last_committed)
break;
CDEBUG(D_INFO, "Marking request xid %Ld as committed ("
"transno=%Lu, last_committed=%Lu\n",
(long long)req->rq_xid, (long long)req->rq_transno,
- (long long)cli->cli_last_committed);
+ (long long)conn->c_last_committed);
if (atomic_dec_and_test(&req->rq_refcount)) {
/* we do this to prevent free_req deadlock */
list_del_init(&req->rq_list);
ptlrpc_free_req(req);
} else {
list_del_init(&req->rq_list);
- list_add(&req->rq_list, &cli->cli_dying_head);
+ list_add(&req->rq_list, &conn->c_dying_head);
}
}
{
struct list_head *tmp, *saved;
struct ptlrpc_request *req;
+ struct ptlrpc_connection *conn = cli->cli_connection;
ENTRY;
- spin_lock(&cli->cli_lock);
- list_for_each_safe(tmp, saved, &cli->cli_sending_head) {
+ if (!conn) {
+ EXIT;
+ return;
+ }
+
+ spin_lock(&conn->c_lock);
+ list_for_each_safe(tmp, saved, &conn->c_sending_head) {
req = list_entry(tmp, struct ptlrpc_request, rq_list);
+ if (req->rq_client != cli)
+ continue;
CDEBUG(D_INFO, "Cleaning req %p from sending list.\n", req);
list_del_init(&req->rq_list);
req->rq_client = NULL;
ptlrpc_free_req(req);
}
- list_for_each_safe(tmp, saved, &cli->cli_dying_head) {
+ list_for_each_safe(tmp, saved, &conn->c_dying_head) {
req = list_entry(tmp, struct ptlrpc_request, rq_list);
+ if (req->rq_client != cli)
+ continue;
CERROR("Request %p is on the dying list at cleanup!\n", req);
list_del_init(&req->rq_list);
req->rq_client = NULL;
ptlrpc_free_req(req);
}
- spin_unlock(&cli->cli_lock);
+ spin_unlock(&conn->c_lock);
EXIT;
return;
req->rq_connection->c_level = LUSTRE_CONN_RECOVD;
req->rq_flags |= PTL_RPC_FL_TIMEOUT;
/* Activate the recovd for this client, if there is one. */
- if (req->rq_client && req->rq_client->cli_recovd)
- recovd_cli_fail(req->rq_client);
+ if (req->rq_client && req->rq_client->cli_connection &&
+ req->rq_client->cli_connection->c_recovd)
+ recovd_conn_fail(req->rq_client->cli_connection);
/* If this request is for recovery or other primordial tasks,
* don't go back to sleep.
int rc = 0;
struct l_wait_info lwi;
struct ptlrpc_client *cli = req->rq_client;
+ struct ptlrpc_connection *conn = cli->cli_connection;
ENTRY;
init_waitqueue_head(&req->rq_wait_for_rep);
CERROR("process %d waiting for recovery (%d > %d)\n",
current->pid, req->rq_level, req->rq_connection->c_level);
- spin_lock(&cli->cli_lock);
+ spin_lock(&conn->c_lock);
list_del_init(&req->rq_list);
- list_add_tail(&req->rq_list, &cli->cli_delayed_head);
- spin_unlock(&cli->cli_lock);
+ list_add_tail(&req->rq_list, &conn->c_delayed_head);
+ spin_unlock(&conn->c_lock);
lwi = LWI_INTR(NULL, NULL);
rc = l_wait_event(req->rq_wait_for_rep,
req->rq_level <= req->rq_connection->c_level,
&lwi);
- spin_lock(&cli->cli_lock);
+ spin_lock(&conn->c_lock);
list_del_init(&req->rq_list);
- spin_unlock(&cli->cli_lock);
+ spin_unlock(&conn->c_lock);
if (rc)
RETURN(rc);
RETURN(-rc);
}
- spin_lock(&cli->cli_lock);
+ spin_lock(&conn->c_lock);
list_del_init(&req->rq_list);
- list_add_tail(&req->rq_list, &cli->cli_sending_head);
- spin_unlock(&cli->cli_lock);
+ list_add_tail(&req->rq_list, &conn->c_sending_head);
+ spin_unlock(&conn->c_lock);
CDEBUG(D_OTHER, "-- sleeping\n");
lwi = LWI_TIMEOUT_INTR(req->rq_timeout * HZ, expired_request,
CDEBUG(D_NET, "--> buf %p len %d status %d\n", req->rq_repmsg,
req->rq_replen, req->rq_repmsg->status);
- spin_lock(&cli->cli_lock);
- cli->cli_last_xid = req->rq_repmsg->last_xid;
- cli->cli_last_committed = req->rq_repmsg->last_committed;
- ptlrpc_free_committed(cli);
- spin_unlock(&cli->cli_lock);
+ spin_lock(&conn->c_lock);
+ conn->c_last_xid = req->rq_repmsg->last_xid;
+ conn->c_last_committed = req->rq_repmsg->last_committed;
+ ptlrpc_free_committed(conn);
+ spin_unlock(&conn->c_lock);
EXIT;
out:
c->c_generation = 1;
c->c_epoch = 1;
c->c_bootcount = 0;
+ INIT_LIST_HEAD(&c->c_delayed_head);
+ INIT_LIST_HEAD(&c->c_sending_head);
+ INIT_LIST_HEAD(&c->c_dying_head);
+ INIT_LIST_HEAD(&c->c_clients);
+ INIT_LIST_HEAD(&c->c_exports);
atomic_set(&c->c_refcount, 0);
ptlrpc_connection_addref(c);
spin_lock_init(&c->c_lock);
+++ /dev/null
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
- * obd/rpc/recovd.c
- *
- * Lustre High Availability Daemon
- *
- * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
- *
- * This code is issued under the GNU General Public License.
- * See the file COPYING in this distribution
- *
- * by Peter Braam <braam@clusterfs.com>
- *
- */
-
-#define DEBUG_SUBSYSTEM S_RPC
-
-#include <linux/kmod.h>
-#include <linux/lustre_lite.h>
-#include <linux/lustre_ha.h>
-
-static int connmgr_unpack_body(struct ptlrpc_request *req)
-{
- struct connmgr_body *b = lustre_msg_buf(req->rq_repmsg, 0);
- if (b == NULL) {
- LBUG();
- RETURN(-EINVAL);
- }
-
- b->generation = NTOH__u32(b->generation);
-
- return 0;
-}
-
-int connmgr_connect(struct recovd_obd *recovd, struct ptlrpc_connection *conn)
-{
- struct ptlrpc_request *req;
- struct ptlrpc_client *cl;
- struct connmgr_body *body;
- int rc, size = sizeof(*body);
- ENTRY;
-
- if (!recovd) {
- CERROR("no manager\n");
- LBUG();
- }
- cl = recovd->recovd_client;
-
- req = ptlrpc_prep_req(cl, conn, CONNMGR_CONNECT, 1, &size, NULL);
- if (!req)
- GOTO(out, rc = -ENOMEM);
-
- body = lustre_msg_buf(req->rq_reqmsg, 0);
- body->generation = HTON__u32(conn->c_generation);
- body->conn = (__u64)(unsigned long)conn;
- body->conn_token = conn->c_token;
- strncpy(body->conn_uuid, conn->c_local_uuid, sizeof(body->conn_uuid));
-
- req->rq_replen = lustre_msg_size(1, &size);
- req->rq_level = LUSTRE_CONN_NEW;
-
- rc = ptlrpc_queue_wait(req);
- rc = ptlrpc_check_status(req, rc);
- if (!rc) {
- rc = connmgr_unpack_body(req);
- if (rc)
- GOTO(out_free, rc);
- body = lustre_msg_buf(req->rq_repmsg, 0);
- CDEBUG(D_NET, "remote generation: %o\n", body->generation);
- conn->c_level = LUSTRE_CONN_CON;
- conn->c_remote_conn = body->conn;
- conn->c_remote_token = body->conn_token;
- strncpy(conn->c_remote_uuid, body->conn_uuid,
- sizeof(conn->c_remote_uuid));
- }
-
- EXIT;
- out_free:
- ptlrpc_free_req(req);
- out:
- return rc;
-}
-
-static int connmgr_handle_connect(struct ptlrpc_request *req)
-{
- struct connmgr_body *body;
- int rc, size = sizeof(*body);
- ENTRY;
-
- rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
- if (rc) {
- CERROR("connmgr: out of memory\n");
- req->rq_status = -ENOMEM;
- RETURN(0);
- }
-
- body = lustre_msg_buf(req->rq_reqmsg, 0);
- connmgr_unpack_body(req);
-
- req->rq_connection->c_remote_conn = body->conn;
- req->rq_connection->c_remote_token = body->conn_token;
- strncpy(req->rq_connection->c_remote_uuid, body->conn_uuid,
- sizeof(req->rq_connection->c_remote_uuid));
-
- CERROR("incoming generation %d\n", body->generation);
- body = lustre_msg_buf(req->rq_repmsg, 0);
- body->generation = 4711;
- body->conn = (__u64)(unsigned long)req->rq_connection;
- body->conn_token = req->rq_connection->c_token;
-
- req->rq_connection->c_level = LUSTRE_CONN_CON;
- RETURN(0);
-}
-
-int connmgr_handle(struct ptlrpc_request *req)
-{
- int rc;
- ENTRY;
-
- rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
- if (rc) {
- CERROR("Invalid request\n");
- GOTO(out, rc);
- }
-
- if (req->rq_reqmsg->type != NTOH__u32(PTL_RPC_MSG_REQUEST)) {
- CERROR("wrong packet type sent %d\n",
- req->rq_reqmsg->type);
- GOTO(out, rc = -EINVAL);
- }
-
- switch (req->rq_reqmsg->opc) {
- case CONNMGR_CONNECT:
- CDEBUG(D_INODE, "connmgr connect\n");
- rc = connmgr_handle_connect(req);
- break;
-
- default:
- rc = ptlrpc_error(req->rq_svc, req);
- RETURN(rc);
- }
-
- EXIT;
-out:
- if (rc) {
- ptlrpc_error(req->rq_svc, req);
- } else {
- CDEBUG(D_NET, "sending reply\n");
- ptlrpc_reply(req->rq_svc, req);
- }
-
- return 0;
-}
#include <linux/lustre_ha.h>
#include <linux/obd_support.h>
-struct recovd_obd *ptlrpc_connmgr;
-
-void recovd_cli_manage(struct recovd_obd *recovd, struct ptlrpc_client *cli)
+void recovd_conn_manage(struct recovd_obd *recovd,
+ struct ptlrpc_connection *conn)
{
ENTRY;
- cli->cli_recovd = recovd;
+ conn->c_recovd = recovd;
spin_lock(&recovd->recovd_lock);
- list_add(&cli->cli_ha_item, &recovd->recovd_clients_lh);
+ list_add(&conn->c_recovd_data.rd_managed_chain,
+ &recovd->recovd_managed_items);
spin_unlock(&recovd->recovd_lock);
EXIT;
}
-void recovd_cli_fail(struct ptlrpc_client *cli)
+void recovd_conn_fail(struct ptlrpc_connection *conn)
{
ENTRY;
- spin_lock(&cli->cli_recovd->recovd_lock);
- cli->cli_recovd->recovd_flags |= RECOVD_FAIL;
- cli->cli_recovd->recovd_wakeup_flag = 1;
- list_del(&cli->cli_ha_item);
- list_add(&cli->cli_ha_item, &cli->cli_recovd->recovd_troubled_lh);
- spin_unlock(&cli->cli_recovd->recovd_lock);
- wake_up(&cli->cli_recovd->recovd_waitq);
+ spin_lock(&conn->c_recovd->recovd_lock);
+ conn->c_recovd->recovd_flags |= RECOVD_FAIL;
+ conn->c_recovd->recovd_wakeup_flag = 1;
+ list_del(&conn->c_recovd_data.rd_managed_chain);
+ list_add(&conn->c_recovd_data.rd_managed_chain,
+ &conn->c_recovd->recovd_troubled_items);
+ spin_unlock(&conn->c_recovd->recovd_lock);
+ wake_up(&conn->c_recovd->recovd_waitq);
EXIT;
}
-/* this function must be called with cli->cli_lock held */
-void recovd_cli_fixed(struct ptlrpc_client *cli)
+/* this function must be called with conn->c_lock held */
+void recovd_conn_fixed(struct ptlrpc_connection *conn)
{
ENTRY;
- list_del(&cli->cli_ha_item);
- list_add(&cli->cli_ha_item, &cli->cli_recovd->recovd_clients_lh);
+ list_del(&conn->c_recovd_data.rd_managed_chain);
+ list_add(&conn->c_recovd_data.rd_managed_chain,
+ &conn->c_recovd->recovd_managed_items);
EXIT;
}
if (recovd->recovd_flags & RECOVD_UPCALL_ANSWER) {
CERROR("UPCALL_WAITING: upcall answer\n");
- while (!list_empty(&recovd->recovd_troubled_lh)) {
- struct ptlrpc_client *cli =
- list_entry(recovd->recovd_troubled_lh.next,
- struct ptlrpc_client, cli_ha_item);
+ while (!list_empty(&recovd->recovd_troubled_items)) {
+ struct recovd_data *rd =
+ list_entry(recovd->recovd_troubled_items.next,
+ struct recovd_data, rd_managed_chain);
- list_del(&cli->cli_ha_item);
- if (cli->cli_recover) {
+ list_del(&rd->rd_managed_chain);
+ if (rd->rd_recover) {
spin_unlock(&recovd->recovd_lock);
- cli->cli_recover(cli);
+ rd->rd_recover(rd);
spin_lock(&recovd->recovd_lock);
}
}
int recovd_setup(struct recovd_obd *recovd)
{
int rc;
- extern void (*class_signal_client_failure)(struct ptlrpc_client *);
+ extern void (*class_signal_connection_failure)
+ (struct ptlrpc_connection *);
ENTRY;
- INIT_LIST_HEAD(&recovd->recovd_clients_lh);
- INIT_LIST_HEAD(&recovd->recovd_troubled_lh);
+ INIT_LIST_HEAD(&recovd->recovd_managed_items);
+ INIT_LIST_HEAD(&recovd->recovd_troubled_items);
spin_lock_init(&recovd->recovd_lock);
init_waitqueue_head(&recovd->recovd_waitq);
wait_event(recovd->recovd_ctl_waitq, recovd->recovd_flags & RECOVD_IDLE);
/* exported and called by obdclass timeout handlers */
- class_signal_client_failure = recovd_cli_fail;
+ class_signal_connection_failure = recovd_conn_fail;
RETURN(0);
}
MOD_INC_USE_COUNT;
memset(recovd, 0, sizeof(*recovd));
- OBD_ALLOC(recovd->recovd_client, sizeof(*recovd->recovd_client));
- if (!recovd)
- GOTO(err_dec, err = -ENOMEM);
-
err = recovd_setup(recovd);
- if (err)
- GOTO(err_free, err);
-
- recovd->recovd_service = ptlrpc_init_svc(16* 1024,
- CONNMGR_REQUEST_PORTAL,
- CONNMGR_REPLY_PORTAL,
- "self", connmgr_handle,
- "connmgr");
- if (!recovd->recovd_service) {
- CERROR("failed to start service\n");
- GOTO(err_recovd, err = -ENOMEM);
- }
-
- ptlrpc_init_client(NULL, NULL, CONNMGR_REQUEST_PORTAL,
- CONNMGR_REPLY_PORTAL, recovd->recovd_client);
- recovd->recovd_client->cli_name = "connmgr";
-
- err = ptlrpc_start_thread(obddev, recovd->recovd_service,
- "lustre_connmgr");
if (err) {
- CERROR("cannot start thread\n");
- GOTO(err_svc, err);
+ MOD_DEC_USE_COUNT;
+ RETURN(err);
}
- ptlrpc_connmgr = recovd;
RETURN(0);
-
-err_svc:
- ptlrpc_unregister_service(recovd->recovd_service);
-err_recovd:
- recovd_cleanup(recovd);
-err_free:
- OBD_FREE(recovd->recovd_client, sizeof(*recovd->recovd_client));
-err_dec:
- MOD_DEC_USE_COUNT;
- RETURN(err);
}
int connmgr_cleanup(struct obd_device *dev)
if (err)
LBUG();
- ptlrpc_stop_all_threads(recovd->recovd_service);
- ptlrpc_unregister_service(recovd->recovd_service);
- ptlrpc_cleanup_client(recovd->recovd_client);
- OBD_FREE(recovd->recovd_client, sizeof(*recovd->recovd_client));
MOD_DEC_USE_COUNT;
RETURN(0);
}
static struct obd_ops recovd_obd_ops = {
o_setup: connmgr_setup,
o_cleanup: connmgr_cleanup,
- o_iocontrol: connmgr_iocontrol,
+ o_iocontrol: connmgr_iocontrol,
};
static int __init ptlrpc_init(void)
}
/* connmgr.c */
-EXPORT_SYMBOL(ptlrpc_connmgr);
-EXPORT_SYMBOL(connmgr_connect);
-EXPORT_SYMBOL(connmgr_handle);
-EXPORT_SYMBOL(recovd_cli_fail);
-EXPORT_SYMBOL(recovd_cli_manage);
-EXPORT_SYMBOL(recovd_cli_fixed);
+EXPORT_SYMBOL(recovd_conn_fail);
+EXPORT_SYMBOL(recovd_conn_manage);
+EXPORT_SYMBOL(recovd_conn_fixed);
EXPORT_SYMBOL(recovd_setup);
EXPORT_SYMBOL(recovd_cleanup);