X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fobdclass%2Fgenops.c;h=3bac4a35cebd3ad7713b16c053592a8bc27e2947;hp=926991a93c74cad742df5579191cacb31a42dda4;hb=6aec038b3693221aea8f1c666bf56e794be939c0;hpb=7312616768bfed768ecc00ba20322c37568138d0 diff --git a/lustre/obdclass/genops.c b/lustre/obdclass/genops.c index 926991a..3bac4a3 100644 --- a/lustre/obdclass/genops.c +++ b/lustre/obdclass/genops.c @@ -1,7 +1,7 @@ /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- * vim:expandtab:shiftwidth=8:tabstop=8: * - * Copyright (c) 2001, 2002 Cluster File Systems, Inc. + * Copyright (c) 2001-2003 Cluster File Systems, Inc. * * This file is part of Lustre, http://www.lustre.org. * @@ -23,93 +23,138 @@ */ #define DEBUG_SUBSYSTEM S_CLASS +#ifdef __KERNEL__ #include /* for request_module() */ #include #include #include #include +#include +#else +#include +#include +#include +#endif #include extern struct list_head obd_types; +static spinlock_t obd_types_lock = SPIN_LOCK_UNLOCKED; kmem_cache_t *obdo_cachep = NULL; kmem_cache_t *import_cachep = NULL; -kmem_cache_t *export_cachep = NULL; int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c); +void (*ptlrpc_abort_inflight_superhack)(struct obd_import *imp); /* * support functions: we could use inter-module communication, but this * is more portable to other OS's */ -static struct obd_type *class_search_type(char *nm) +static struct obd_type *class_search_type(char *name) { struct list_head *tmp; struct obd_type *type; - CDEBUG(D_INFO, "SEARCH %s\n", nm); - tmp = &obd_types; + spin_lock(&obd_types_lock); list_for_each(tmp, &obd_types) { type = list_entry(tmp, struct obd_type, typ_chain); - CDEBUG(D_INFO, "TYP %s\n", type->typ_name); - if (strlen(type->typ_name) == strlen(nm) && - strcmp(type->typ_name, nm) == 0 ) { + if (strlen(type->typ_name) == strlen(name) && + strcmp(type->typ_name, name) == 0) { + spin_unlock(&obd_types_lock); return type; } } + spin_unlock(&obd_types_lock); return NULL; } -struct obd_type *class_nm_to_type(char *nm) +struct obd_type *class_get_type(char *name) { - struct obd_type *type = class_search_type(nm); + struct obd_type *type = class_search_type(name); #ifdef CONFIG_KMOD - if ( !type ) { - if ( !request_module(nm) ) { - CDEBUG(D_INFO, "Loaded module '%s'\n", nm); - type = class_search_type(nm); - } else { - CDEBUG(D_INFO, "Can't load module '%s'\n", nm); - } + if (!type) { + if (!request_module(name)) { + CDEBUG(D_INFO, "Loaded module '%s'\n", name); + type = class_search_type(name); + } else + CDEBUG(D_INFO, "Can't load module '%s'\n", name); } #endif + if (type) + try_module_get(type->typ_ops->o_owner); return type; } -int class_register_type(struct obd_ops *ops, struct lprocfs_vars *vars, - char *nm) +void class_put_type(struct obd_type *type) { - struct obd_type *type; - int rc; + LASSERT(type); + module_put(type->typ_ops->o_owner); +} +int class_register_type(struct obd_ops *ops, struct md_ops *md_ops, + struct lprocfs_vars *vars, char *name) +{ + struct obd_type *type; + int rc = 0; ENTRY; - if (class_search_type(nm)) { - CDEBUG(D_IOCTL, "Type %s already registered\n", nm); + LASSERT(strnlen(name, 1024) < 1024); /* sanity check */ + + if (class_search_type(name)) { + CDEBUG(D_IOCTL, "Type %s already registered\n", name); RETURN(-EEXIST); } + rc = -ENOMEM; OBD_ALLOC(type, sizeof(*type)); + if (type == NULL) + RETURN(rc); + OBD_ALLOC(type->typ_ops, sizeof(*type->typ_ops)); - OBD_ALLOC(type->typ_name, strlen(nm) + 1); - if (!type) - RETURN(-ENOMEM); - INIT_LIST_HEAD(&type->typ_chain); - CDEBUG(D_INFO, "MOD_INC_USE for register_type: count = %d\n", - atomic_read(&(THIS_MODULE)->uc.usecount)); - MOD_INC_USE_COUNT; + OBD_ALLOC(type->typ_name, strlen(name) + 1); + if (md_ops) + OBD_ALLOC(type->typ_md_ops, sizeof(*type->typ_md_ops)); + if (type->typ_ops == NULL || type->typ_name == NULL || + (md_ops && type->typ_md_ops == NULL)) + GOTO (failed, rc); + + *(type->typ_ops) = *ops; + if (md_ops) + *(type->typ_md_ops) = *md_ops; + else + type->typ_md_ops = NULL; + strcpy(type->typ_name, name); + +#ifdef LPROCFS + type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root, + vars, type); +#endif + if (IS_ERR(type->typ_procroot)) { + rc = PTR_ERR(type->typ_procroot); + type->typ_procroot = NULL; + GOTO (failed, rc); + } + + spin_lock(&obd_types_lock); list_add(&type->typ_chain, &obd_types); - memcpy(type->typ_ops, ops, sizeof(*type->typ_ops)); - strcpy(type->typ_name, nm); - rc = lprocfs_reg_class(type, vars, type); + spin_unlock(&obd_types_lock); + + RETURN (0); + failed: + if (type->typ_name != NULL) + OBD_FREE(type->typ_name, strlen(name) + 1); + if (type->typ_ops != NULL) + OBD_FREE (type->typ_ops, sizeof (*type->typ_ops)); + if (type->typ_md_ops != NULL) + OBD_FREE (type->typ_md_ops, sizeof (*type->typ_md_ops)); + OBD_FREE(type, sizeof(*type)); RETURN(rc); } -int class_unregister_type(char *nm) +int class_unregister_type(char *name) { - struct obd_type *type = class_nm_to_type(nm); - + struct obd_type *type = class_search_type(name); ENTRY; if (!type) { @@ -118,75 +163,156 @@ int class_unregister_type(char *nm) } if (type->typ_refcnt) { - CERROR("type %s has refcount (%d)\n", nm, type->typ_refcnt); + CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt); /* This is a bad situation, let's make the best of it */ /* Remove ops, but leave the name for debugging */ OBD_FREE(type->typ_ops, sizeof(*type->typ_ops)); RETURN(-EBUSY); } - if(type->typ_procroot) - lprocfs_dereg_class(type); + if (type->typ_procroot) { + lprocfs_remove(type->typ_procroot); + type->typ_procroot = NULL; + } + + spin_lock(&obd_types_lock); list_del(&type->typ_chain); - OBD_FREE(type->typ_name, strlen(nm) + 1); + spin_unlock(&obd_types_lock); + OBD_FREE(type->typ_name, strlen(name) + 1); if (type->typ_ops != NULL) OBD_FREE(type->typ_ops, sizeof(*type->typ_ops)); + if (type->typ_md_ops != NULL) + OBD_FREE (type->typ_md_ops, sizeof (*type->typ_md_ops)); OBD_FREE(type, sizeof(*type)); - CDEBUG(D_INFO, "MOD_DEC_USE for register_type: count = %d\n", - atomic_read(&(THIS_MODULE)->uc.usecount) - 1); - MOD_DEC_USE_COUNT; RETURN(0); } /* class_unregister_type */ +struct obd_device *class_newdev(int *dev) +{ + struct obd_device *result = NULL; + int i; + + for (i = 0 ; i < MAX_OBD_DEVICES ; i++) { + struct obd_device *obd = &obd_dev[i]; + if (!obd->obd_type) { + result = obd; + if (dev) + *dev = i; + break; + } + } + return result; +} + int class_name2dev(char *name) { - int res = -1; int i; if (!name) return -1; - for (i=0; i < MAX_OBD_DEVICES; i++) { + for (i = 0; i < MAX_OBD_DEVICES; i++) { struct obd_device *obd = &obd_dev[i]; - if (obd->obd_name && strcmp(name, obd->obd_name) == 0) { - res = i; - return res; - } + if (obd->obd_name && strcmp(name, obd->obd_name) == 0) + return i; } - return res; + return -1; } -int class_uuid2dev(char *uuid) +struct obd_device *class_name2obd(char *name) +{ + int dev = class_name2dev(name); + if (dev < 0) + return NULL; + return &obd_dev[dev]; +} + +int class_uuid2dev(struct obd_uuid *uuid) { - int res = -1; int i; - for (i=0; i < MAX_OBD_DEVICES; i++) { + for (i = 0; i < MAX_OBD_DEVICES; i++) { struct obd_device *obd = &obd_dev[i]; - if (strncmp(uuid, obd->obd_uuid, sizeof(obd->obd_uuid)) == 0) { - res = i; - return res; - } + if (obd_uuid_equals(uuid, &obd->obd_uuid)) + return i; } - return res; + return -1; } +struct obd_device *class_uuid2obd(struct obd_uuid *uuid) +{ + int dev = class_uuid2dev(uuid); + if (dev < 0) + return NULL; + return &obd_dev[dev]; +} -struct obd_device *class_uuid2obd(char *uuid) +/* Search for a client OBD connected to tgt_uuid. If grp_uuid is + specified, then only the client with that uuid is returned, + otherwise any client connected to the tgt is returned. + If tgt_uuid is NULL, the lov with grp_uuid is returned. */ +struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid, + char * typ_name, + struct obd_uuid *grp_uuid) { int i; - for (i=0; i < MAX_OBD_DEVICES; i++) { + for (i = 0; i < MAX_OBD_DEVICES; i++) { struct obd_device *obd = &obd_dev[i]; - if (strncmp(uuid, obd->obd_uuid, sizeof(obd->obd_uuid)) == 0) + if (obd->obd_type == NULL) + continue; + if ((strncmp(obd->obd_type->typ_name, typ_name, + strlen(typ_name)) == 0)) { + struct client_obd *cli = &obd->u.cli; + struct obd_import *imp = cli->cl_import; + if (tgt_uuid == NULL) { + LASSERT(grp_uuid); + if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) + return obd; + continue; + } + if (obd_uuid_equals(tgt_uuid, &imp->imp_target_uuid) && + ((grp_uuid)? obd_uuid_equals(grp_uuid, + &obd->obd_uuid) : 1)) { + return obd; + } + } + } + + return NULL; +} + +/* Iterate the obd_device list looking devices have grp_uuid. Start + searching at *next, and if a device is found, the next index to look + it is saved in *next. If next is NULL, then the first matching device + will always be returned. */ +struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next) +{ + int i; + if (next == NULL) + i = 0; + else if (*next >= 0 && *next < MAX_OBD_DEVICES) + i = *next; + else + return NULL; + + for (; i < MAX_OBD_DEVICES; i++) { + struct obd_device *obd = &obd_dev[i]; + if (obd->obd_type == NULL) + continue; + if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) { + if (next != NULL) + *next = i+1; return obd; + } } return NULL; } + void obd_cleanup_caches(void) { int rc; @@ -203,12 +329,6 @@ void obd_cleanup_caches(void) CERROR("Cannot destory ll_import_cache\n"); import_cachep = NULL; } - if (export_cachep) { - rc = kmem_cache_destroy(export_cachep); - if (rc) - CERROR("Cannot destory ll_export_cache\n"); - export_cachep = NULL; - } EXIT; } @@ -221,13 +341,6 @@ int obd_init_caches(void) if (!obdo_cachep) GOTO(out, -ENOMEM); - LASSERT(export_cachep == NULL); - export_cachep = kmem_cache_create("ll_export_cache", - sizeof(struct obd_export), - 0, 0, NULL, NULL); - if (!export_cachep) - GOTO(out, -ENOMEM); - LASSERT(import_cachep == NULL); import_cachep = kmem_cache_create("ll_import_cache", sizeof(struct obd_import), @@ -253,251 +366,443 @@ struct obd_export *class_conn2export(struct lustre_handle *conn) RETURN(NULL); } - if (conn->addr == -1) { /* this means assign a new connection */ + if (conn->cookie == -1) { /* this means assign a new connection */ CDEBUG(D_CACHE, "want a new connection\n"); RETURN(NULL); } - if (!conn->addr) { - CDEBUG(D_CACHE, "looking for null addr\n"); - fixme(); - RETURN(NULL); - } - - CDEBUG(D_IOCTL, "looking for export addr "LPX64" cookie "LPX64"\n", - conn->addr, conn->cookie); - export = (struct obd_export *) (unsigned long)conn->addr; - if (!kmem_cache_validate(export_cachep, (void *)export)) - RETURN(NULL); - - if (export->exp_cookie != conn->cookie) - RETURN(NULL); + CDEBUG(D_IOCTL, "looking for export cookie "LPX64"\n", conn->cookie); + export = class_handle2object(conn->cookie); RETURN(export); -} /* class_conn2export */ +} + +struct obd_device *class_exp2obd(struct obd_export *exp) +{ + if (exp) + return exp->exp_obd; + return NULL; +} struct obd_device *class_conn2obd(struct lustre_handle *conn) { struct obd_export *export; export = class_conn2export(conn); - if (export) - return export->exp_obd; - fixme(); + if (export) { + struct obd_device *obd = export->exp_obd; + class_export_put(export); + return obd; + } return NULL; } +struct obd_import *class_exp2cliimp(struct obd_export *exp) +{ + struct obd_device *obd = exp->exp_obd; + if (obd == NULL) + return NULL; + return obd->u.cli.cl_import; +} + struct obd_import *class_conn2cliimp(struct lustre_handle *conn) { - return &class_conn2obd(conn)->u.cli.cl_import; + struct obd_device *obd = class_conn2obd(conn); + if (obd == NULL) + return NULL; + return obd->u.cli.cl_import; } -struct obd_import *class_conn2ldlmimp(struct lustre_handle *conn) +/* Export management functions */ +static void export_handle_addref(void *export) { - return &class_conn2export(conn)->exp_ldlm_data.led_import; + class_export_get(export); +} + +void __class_export_put(struct obd_export *exp) +{ + if (atomic_dec_and_test(&exp->exp_refcount)) { + struct obd_device *obd = exp->exp_obd; + CDEBUG(D_IOCTL, "destroying export %p/%s\n", exp, + exp->exp_client_uuid.uuid); + + LASSERT(obd != NULL); + + /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */ + if (exp->exp_connection) + ptlrpc_put_connection_superhack(exp->exp_connection); + + LASSERT(list_empty(&exp->exp_outstanding_replies)); + LASSERT(list_empty(&exp->exp_handle.h_link)); + obd_destroy_export(exp); + + OBD_FREE(exp, sizeof(*exp)); + if (obd->obd_set_up) { + atomic_dec(&obd->obd_refcount); + wake_up(&obd->obd_refcount_waitq); + } else { + CERROR("removing export %p from obd %s (%p) -- OBD " + "not set up (refcount = %d)\n", exp, + obd->obd_name, obd, + atomic_read(&obd->obd_refcount)); + } + } } -struct obd_export *class_new_export(struct obd_device *obddev) +/* Creates a new export, adds it to the hash table, and returns a + * pointer to it. The refcount is 2: one for the hash reference, and + * one for the pointer returned by this function. */ +struct obd_export *class_new_export(struct obd_device *obd) { - struct obd_export * export; + struct obd_export *export; - export = kmem_cache_alloc(export_cachep, GFP_KERNEL); + OBD_ALLOC(export, sizeof(*export)); if (!export) { - CERROR("no memory! (minor %d)\n", obddev->obd_minor); + CERROR("no memory! (minor %d)\n", obd->obd_minor); return NULL; } - memset(export, 0, sizeof(*export)); - get_random_bytes(&export->exp_cookie, sizeof(export->exp_cookie)); - export->exp_obd = obddev; + export->exp_conn_cnt = 0; + atomic_set(&export->exp_refcount, 2); + atomic_set(&export->exp_rpc_count, 0); + export->exp_obd = obd; + INIT_LIST_HEAD(&export->exp_outstanding_replies); /* XXX this should be in LDLM init */ INIT_LIST_HEAD(&export->exp_ldlm_data.led_held_locks); - INIT_LIST_HEAD(&export->exp_conn_chain); - spin_lock(&obddev->obd_dev_lock); + + INIT_LIST_HEAD(&export->exp_handle.h_link); + class_handle_hash(&export->exp_handle, export_handle_addref); + spin_lock_init(&export->exp_lock); + + spin_lock(&obd->obd_dev_lock); + LASSERT(!obd->obd_stopping); /* shouldn't happen, but might race */ + atomic_inc(&obd->obd_refcount); list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports); - spin_unlock(&obddev->obd_dev_lock); + export->exp_obd->obd_num_exports++; + spin_unlock(&obd->obd_dev_lock); + obd_init_export(export); return export; } -void class_destroy_export(struct obd_export *exp) +void class_unlink_export(struct obd_export *exp) { - ENTRY; - - LASSERT(exp->exp_cookie != DEAD_HANDLE_MAGIC); + class_handle_unhash(&exp->exp_handle); spin_lock(&exp->exp_obd->obd_dev_lock); - list_del(&exp->exp_obd_chain); + list_del_init(&exp->exp_obd_chain); + exp->exp_obd->obd_num_exports--; spin_unlock(&exp->exp_obd->obd_dev_lock); - /* XXXshaver no connection here... */ - if (exp->exp_connection) - spin_lock(&exp->exp_connection->c_lock); - list_del(&exp->exp_conn_chain); - if (exp->exp_connection) { - spin_unlock(&exp->exp_connection->c_lock); - ptlrpc_put_connection_superhack(exp->exp_connection); + class_export_put(exp); +} + +/* Import management functions */ +static void import_handle_addref(void *import) +{ + class_import_get(import); +} + +struct obd_import *class_import_get(struct obd_import *import) +{ + atomic_inc(&import->imp_refcount); + CDEBUG(D_IOCTL, "import %p refcount=%d\n", import, + atomic_read(&import->imp_refcount)); + return import; +} + +void class_import_put(struct obd_import *import) +{ + ENTRY; + + CDEBUG(D_IOCTL, "import %p refcount=%d\n", import, + atomic_read(&import->imp_refcount) - 1); + + LASSERT(atomic_read(&import->imp_refcount) > 0); + LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a); + if (!atomic_dec_and_test(&import->imp_refcount)) { + EXIT; + return; } - exp->exp_cookie = DEAD_HANDLE_MAGIC; - kmem_cache_free(export_cachep, exp); + CDEBUG(D_IOCTL, "destroying import %p\n", import); + + ptlrpc_put_connection_superhack(import->imp_connection); + LASSERT(list_empty(&import->imp_handle.h_link)); + OBD_FREE(import, sizeof(*import)); EXIT; } -/* a connection defines an export context in which preallocation can - be managed. */ -int class_connect(struct lustre_handle *conn, struct obd_device *obd, - obd_uuid_t cluuid) +struct obd_import *class_new_import(void) { - struct obd_export * export; - if (conn == NULL) { - LBUG(); - return -EINVAL; - } + struct obd_import *imp; - if (obd == NULL) { - LBUG(); - return -EINVAL; - } + OBD_ALLOC(imp, sizeof(*imp)); + if (imp == NULL) + return NULL; - export = class_new_export(obd); - if (!export) - return -ENOMEM; + INIT_LIST_HEAD(&imp->imp_replay_list); + INIT_LIST_HEAD(&imp->imp_sending_list); + INIT_LIST_HEAD(&imp->imp_delayed_list); + spin_lock_init(&imp->imp_lock); + imp->imp_conn_cnt = 0; + imp->imp_max_transno = 0; + imp->imp_peer_committed_transno = 0; + imp->imp_state = LUSTRE_IMP_NEW; + init_waitqueue_head(&imp->imp_recovery_waitq); + + atomic_set(&imp->imp_refcount, 2); + atomic_set(&imp->imp_inflight, 0); + atomic_set(&imp->imp_replay_inflight, 0); + INIT_LIST_HEAD(&imp->imp_handle.h_link); + class_handle_hash(&imp->imp_handle, import_handle_addref); + + return imp; +} + +void class_destroy_import(struct obd_import *import) +{ + LASSERT(import != NULL); + LASSERT(import != LP_POISON); + + class_handle_unhash(&import->imp_handle); - conn->addr = (__u64) (unsigned long)export; - conn->cookie = export->exp_cookie; + /* Abort any inflight DLM requests and NULL out their (about to be + * freed) import. */ + /* Invalidate all requests on import, would be better to call + ptlrpc_set_import_active(imp, 0); */ + import->imp_generation++; + ptlrpc_abort_inflight_superhack(import); - CDEBUG(D_IOCTL, "connect: addr %Lx cookie %Lx\n", - (long long)conn->addr, (long long)conn->cookie); - return 0; + class_import_put(import); } -int class_disconnect(struct lustre_handle *conn) +/* A connection defines an export context in which preallocation can + be managed. This releases the export pointer reference, and returns + the export handle, so the export refcount is 1 when this function + returns. */ +int class_connect(struct lustre_handle *conn, struct obd_device *obd, + struct obd_uuid *cluuid) { struct obd_export *export; + LASSERT(conn != NULL); + LASSERT(obd != NULL); + LASSERT(cluuid != NULL); ENTRY; - if (!(export = class_conn2export(conn))) { + export = class_new_export(obd); + if (export == NULL) + RETURN(-ENOMEM); + + conn->cookie = export->exp_handle.h_cookie; + memcpy(&export->exp_client_uuid, cluuid, + sizeof(export->exp_client_uuid)); + class_export_put(export); + + CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n", + cluuid->uuid, conn->cookie); + RETURN(0); +} + +/* This function removes two references from the export: one for the + * hash entry and one for the export pointer passed in. The export + * pointer passed to this function is destroyed should not be used + * again. */ +int class_disconnect(struct obd_export *export, int flags) +{ + ENTRY; + + if (export == NULL) { fixme(); - CDEBUG(D_IOCTL, "disconnect: attempting to free " - "nonexistent client "LPX64"\n", conn->addr); + CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export); RETURN(-EINVAL); } - CDEBUG(D_IOCTL, "disconnect: addr %Lx cookie %Lx\n", - (long long)conn->addr, (long long)conn->cookie); + /* XXX this shouldn't have to be here, but double-disconnect will crash + * otherwise, and sometimes double-disconnect happens. abort_recovery, + * for example. */ + if (list_empty(&export->exp_handle.h_link)) + RETURN(0); - class_destroy_export(export); + CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n", + export->exp_handle.h_cookie); + if (export->exp_handle.h_cookie == 0x5a5a5a5a5a5a5a5a) { + CERROR("disconnecting freed export %p, ignoring\n", export); + } else { + class_unlink_export(export); + class_export_put(export); + } RETURN(0); } -void class_disconnect_all(struct obd_device *obddev) -{ - int again = 1; - - while (again) { - spin_lock(&obddev->obd_dev_lock); - if (!list_empty(&obddev->obd_exports)) { - struct obd_export *export; - struct lustre_handle conn; - int rc; - - export = list_entry(obddev->obd_exports.next, - struct obd_export, - exp_obd_chain); - conn.addr = (__u64)(unsigned long)export; - conn.cookie = export->exp_cookie; - spin_unlock(&obddev->obd_dev_lock); - CERROR("force disconnecting %s:%s export %p\n", - export->exp_obd->obd_type->typ_name, - export->exp_connection->c_remote_uuid, export); - rc = obd_disconnect(&conn); - if (rc < 0) { - /* AED: not so sure about this... We can't - * loop here forever, yet we shouldn't leak - * exports on a struct we will soon destroy. - */ - CERROR("destroy export %p with err: rc = %d\n", - export, rc); - class_destroy_export(export); - } +void class_disconnect_exports(struct obd_device *obd, int flags) +{ + int rc; + struct list_head *tmp, *n, work_list; + struct lustre_handle fake_conn; + struct obd_export *fake_exp, *exp; + ENTRY; + + /* Move all of the exports from obd_exports to a work list, en masse. */ + spin_lock(&obd->obd_dev_lock); + list_add(&work_list, &obd->obd_exports); + list_del_init(&obd->obd_exports); + spin_unlock(&obd->obd_dev_lock); + + CDEBUG(D_HA, "OBD device %d (%p) has exports, " + "disconnecting them\n", obd->obd_minor, obd); + list_for_each_safe(tmp, n, &work_list) { + exp = list_entry(tmp, struct obd_export, exp_obd_chain); + class_export_get(exp); + + if (obd_uuid_equals(&exp->exp_client_uuid, + &exp->exp_obd->obd_uuid)) { + CDEBUG(D_HA, + "exp %p export uuid == obd uuid, don't discon\n", + exp); + /* Need to delete this now so we don't end up pointing + * to work_list later when this export is cleaned up. */ + list_del_init(&exp->exp_obd_chain); + class_export_put(exp); + continue; + } + + fake_conn.cookie = exp->exp_handle.h_cookie; + fake_exp = class_conn2export(&fake_conn); + if (!fake_exp) { + class_export_put(exp); + continue; + } + rc = obd_disconnect(fake_exp, flags); + class_export_put(exp); + if (rc) { + CDEBUG(D_HA, "disconnecting export %p failed: %d\n", + exp, rc); } else { - spin_unlock(&obddev->obd_dev_lock); - again = 0; + CDEBUG(D_HA, "export %p disconnected\n", exp); } } + EXIT; } -#if 0 - -/* FIXME: Data is a space- or comma-separated list of device IDs. This will - * have to change. */ -int class_multi_setup(struct obd_device *obddev, uint32_t len, void *data) +int oig_init(struct obd_io_group **oig_out) { - int count, rc; - char *p; + struct obd_io_group *oig; ENTRY; - for (p = data, count = 0; p < (char *)data + len; count++) { - char *end; - int tmp = simple_strtoul(p, &end, 0); + OBD_ALLOC(oig, sizeof(*oig)); + if (oig == NULL) + RETURN(-ENOMEM); - if (p == end) { - CERROR("invalid device ID starting at: %s\n", p); - GOTO(err_disconnect, rc = -EINVAL); - } + spin_lock_init(&oig->oig_lock); + oig->oig_rc = 0; + oig->oig_pending = 0; + atomic_set(&oig->oig_refcount, 1); + init_waitqueue_head(&oig->oig_waitq); + INIT_LIST_HEAD(&oig->oig_occ_list); - if (tmp < 0 || tmp >= MAX_OBD_DEVICES) { - CERROR("Trying to sub dev %d - dev no too large\n", - tmp); - GOTO(err_disconnect, rc = -EINVAL); - } + *oig_out = oig; + RETURN(0); +}; - rc = obd_connect(&obddev->obd_multi_conn[count], &obd_dev[tmp]); - if (rc) { - CERROR("cannot connect to device %d: rc = %d\n", tmp, - rc); - GOTO(err_disconnect, rc); - } +static inline void oig_grab(struct obd_io_group *oig) +{ + atomic_inc(&oig->oig_refcount); +} +void oig_release(struct obd_io_group *oig) +{ + if (atomic_dec_and_test(&oig->oig_refcount)) + OBD_FREE(oig, sizeof(*oig)); +} - CDEBUG(D_INFO, "target OBD %d is of type %s\n", count, - obd_dev[tmp].obd_type->typ_name); +void oig_add_one(struct obd_io_group *oig, + struct oig_callback_context *occ) +{ + unsigned long flags; + CDEBUG(D_CACHE, "oig %p ready to roll\n", oig); + spin_lock_irqsave(&oig->oig_lock, flags); + oig->oig_pending++; + if (occ != NULL) + list_add_tail(&occ->occ_oig_item, &oig->oig_occ_list); + spin_unlock_irqrestore(&oig->oig_lock, flags); + oig_grab(oig); +} - p = end + 1; - } +void oig_complete_one(struct obd_io_group *oig, + struct oig_callback_context *occ, int rc) +{ + unsigned long flags; + wait_queue_head_t *wake = NULL; + int old_rc; - obddev->obd_multi_count = count; + spin_lock_irqsave(&oig->oig_lock, flags); - RETURN(0); + if (occ != NULL) + list_del_init(&occ->occ_oig_item); + + old_rc = oig->oig_rc; + if (oig->oig_rc == 0 && rc != 0) + oig->oig_rc = rc; - err_disconnect: - for (count--; count >= 0; count--) - obd_disconnect(&obddev->obd_multi_conn[count]); + if (--oig->oig_pending <= 0) + wake = &oig->oig_waitq; + + spin_unlock_irqrestore(&oig->oig_lock, flags); + + CDEBUG(D_CACHE, "oig %p completed, rc %d -> %d via %d, %d now " + "pending (racey)\n", oig, old_rc, oig->oig_rc, rc, + oig->oig_pending); + if (wake) + wake_up(wake); + oig_release(oig); +} + +static int oig_done(struct obd_io_group *oig) +{ + unsigned long flags; + int rc = 0; + spin_lock_irqsave(&oig->oig_lock, flags); + if (oig->oig_pending <= 0) + rc = 1; + spin_unlock_irqrestore(&oig->oig_lock, flags); return rc; } -/* - * remove all connections to this device - * close all connections to lower devices - * needed for forced unloads of OBD client drivers - */ -int class_multi_cleanup(struct obd_device *obddev) +static void interrupted_oig(void *data) { - int i; + struct obd_io_group *oig = data; + struct list_head *pos; + struct oig_callback_context *occ; + unsigned long flags; + + spin_lock_irqsave(&oig->oig_lock, flags); + list_for_each(pos, &oig->oig_occ_list) { + occ = list_entry(pos, struct oig_callback_context, + occ_oig_item); + occ->occ_interrupted(occ); + } + spin_unlock_irqrestore(&oig->oig_lock, flags); +} - for (i = 0; i < obddev->obd_multi_count; i++) { - int rc; - struct obd_device *obd = - class_conn2obd(&obddev->obd_multi_conn[i]); +int oig_wait(struct obd_io_group *oig) +{ + struct l_wait_info lwi = LWI_INTR(interrupted_oig, oig); + int rc; - if (!obd) { - CERROR("no such device [i %d]\n", i); - RETURN(-EINVAL); - } + CDEBUG(D_CACHE, "waiting for oig %p\n", oig); - rc = obd_disconnect(&obddev->obd_multi_conn[i]); - if (rc) - CERROR("disconnect failure %d\n", obd->obd_minor); - } - return 0; + do { + rc = l_wait_event(oig->oig_waitq, oig_done(oig), &lwi); + LASSERTF(rc == 0 || rc == -EINTR, "rc: %d\n", rc); + /* we can't continue until the oig has emptied and stopped + * referencing state that the caller will free upon return */ + if (rc == -EINTR) + lwi = (struct l_wait_info){ 0, }; + } while (rc == -EINTR); + + LASSERTF(oig->oig_pending == 0, + "exiting oig_wait(oig = %p) with %d pending\n", oig, + oig->oig_pending); + + CDEBUG(D_CACHE, "done waiting on oig %p rc %d\n", oig, oig->oig_rc); + return oig->oig_rc; } -#endif