1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (c) 2001-2003 Cluster File Systems, Inc.
6 * This file is part of the Lustre file system, http://www.lustre.org
7 * Lustre is a trademark of Cluster File Systems, Inc.
9 * You may have signed or agreed to another license before downloading
10 * this software. If so, you are bound by the terms and conditions
11 * of that agreement, and the following does not apply to you. See the
12 * LICENSE file included with this distribution for more information.
14 * If you did not agree to a different license, then this copy of Lustre
15 * is open source software; you can redistribute it and/or modify it
16 * under the terms of version 2 of the GNU General Public License as
17 * published by the Free Software Foundation.
19 * In either case, Lustre is distributed in the hope that it will be
20 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
21 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
22 * license text for more details.
24 * These are the only exported functions; they provide some generic
25 * infrastructure for managing object devices.
 */
28 #define DEBUG_SUBSYSTEM S_CLASS
30 #include <liblustre.h>
33 #include <obd_class.h>
34 #include <lprocfs_status.h>
35 #include <class_hash.h>
/* List of registered obd types (defined elsewhere); it is walked and
 * modified in this file only while holding obd_types_lock. */
37 extern struct list_head obd_types;
38 spinlock_t obd_types_lock;
/* Slab caches; created by obd_init_caches() and destroyed by
 * obd_cleanup_caches(). */
40 cfs_mem_cache_t *obd_device_cachep;
41 cfs_mem_cache_t *obdo_cachep;
42 EXPORT_SYMBOL(obdo_cachep);
43 cfs_mem_cache_t *import_cachep;
/* Imports/exports whose last reference was dropped are queued on these
 * lists (under obd_zombie_impexp_lock) and later destroyed by
 * obd_zombie_impexp_cull(). */
45 struct list_head obd_zombie_imports;
46 struct list_head obd_zombie_exports;
47 spinlock_t obd_zombie_impexp_lock;
/* Forward declaration: wakes whoever waits for new zombies. */
48 static void obd_zombie_impexp_notify(void);
/* Function pointer used by the export/import destroy paths to drop a
 * ptlrpc connection; where it gets assigned is not visible in this file
 * section. */
50 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
/*
53 * Support functions: we could use inter-module communication, but this
54 * is more portable to other OSes.
 */
/*
 * Allocate an obd_device from the obd_device_cachep slab and stamp it with
 * OBD_DEVICE_MAGIC so that later LASSERTFs can validate the pointer.
 * NOTE(review): no NULL check or return statement is visible in this
 * listing -- confirm against the complete source.
 */
56 static struct obd_device *obd_device_alloc(void)
58 struct obd_device *obd;
60 OBD_SLAB_ALLOC_PTR(obd, obd_device_cachep);
62 obd->obd_magic = OBD_DEVICE_MAGIC;
/* NOTE(review): the function is declared static yet exported -- verify
 * that this is intended. */
66 EXPORT_SYMBOL(obd_device_alloc);
/*
 * Return an obd_device to the obd_device_cachep slab.
 * Asserts that the magic is intact and complains (CERROR) if the device
 * still has a namespace attached at the time it is freed.
 */
68 static void obd_device_free(struct obd_device *obd)
71 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
72 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
/* A leftover namespace is reported, not released here (as far as the
 * visible lines show). */
73 if (obd->obd_namespace != NULL) {
74 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
75 obd, obd->obd_namespace, obd->obd_force);
78 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
80 EXPORT_SYMBOL(obd_device_free);
/*
 * Look up a registered obd type by name.
 * Walks obd_types under obd_types_lock and compares each typ_name with
 * @name using strcmp(); the lock is released both on a match and after an
 * unsuccessful walk.  The return statements are not visible in this
 * listing.
 */
82 struct obd_type *class_search_type(const char *name)
84 struct list_head *tmp;
85 struct obd_type *type;
87 spin_lock(&obd_types_lock);
88 list_for_each(tmp, &obd_types) {
89 type = list_entry(tmp, struct obd_type, typ_chain);
90 if (strcmp(type->typ_name, name) == 0) {
/* match: drop the lock before leaving the loop */
91 spin_unlock(&obd_types_lock);
95 spin_unlock(&obd_types_lock);
/*
 * Find an obd type by name and pin its owning module.
 * If the first search fails (condition line not visible here) the code
 * tries request_module() with the type name and searches again.  With the
 * type in hand it takes obd_type_lock and calls try_module_get() on the
 * owner of the type's dt ops.
 */
99 struct obd_type *class_get_type(const char *name)
101 struct obd_type *type = class_search_type(name);
105 const char *modname = name;
/* request_module() returning 0 is treated as success */
106 if (!request_module(modname)) {
107 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
108 type = class_search_type(name);
110 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
116 spin_lock(&type->obd_type_lock);
/* NOTE(review): the result of try_module_get() is not used in the visible
 * code -- confirm whether a failed get should be handled. */
118 try_module_get(type->typ_dt_ops->o_owner);
119 spin_unlock(&type->obd_type_lock);
/*
 * Counterpart of class_get_type(): under obd_type_lock, drop the module
 * reference on the owner of the type's dt ops.  Any reference-count
 * bookkeeping on the type itself is not visible in this listing.
 */
124 void class_put_type(struct obd_type *type)
127 spin_lock(&type->obd_type_lock);
129 module_put(type->typ_dt_ops->o_owner);
130 spin_unlock(&type->obd_type_lock);
/* Upper bound (exclusive) on the length of a type name accepted below. */
133 #define CLASS_MAX_NAME 1024
/*
 * Register a new obd type.
 * Copies @dt_ops (and @md_ops, which the code marks as optional) into
 * freshly allocated storage, duplicates @name, registers a procfs entry,
 * runs the lu_device_type init hook and finally links the type into
 * obd_types.  The tail of the function is the failure path that frees
 * whatever was allocated.
 */
135 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
136 struct lprocfs_vars *vars, const char *name,
137 struct lu_device_type *ldt)
139 struct obd_type *type;
/* the name must be NUL-terminated within CLASS_MAX_NAME bytes */
144 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
/* refuse duplicate registrations */
146 if (class_search_type(name)) {
147 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
152 OBD_ALLOC(type, sizeof(*type));
156 OBD_ALLOC_PTR(type->typ_dt_ops);
157 OBD_ALLOC_PTR(type->typ_md_ops);
158 OBD_ALLOC(type->typ_name, strlen(name) + 1);
/* any of the three allocations failing aborts the registration */
160 if (type->typ_dt_ops == NULL ||
161 type->typ_md_ops == NULL ||
162 type->typ_name == NULL)
165 *(type->typ_dt_ops) = *dt_ops;
166 /* md_ops is optional */
168 *(type->typ_md_ops) = *md_ops;
/* safe: typ_name was sized as strlen(name) + 1 above */
169 strcpy(type->typ_name, name);
170 spin_lock_init(&type->obd_type_lock);
173 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
/* on procfs failure keep the error code and clear the pointer so that it
 * is not mistaken for a valid entry later */
175 if (IS_ERR(type->typ_procroot)) {
176 rc = PTR_ERR(type->typ_procroot);
177 type->typ_procroot = NULL;
183 rc = ldt->ldt_ops->ldto_init(ldt);
/* publish the fully initialised type */
188 spin_lock(&obd_types_lock);
189 list_add(&type->typ_chain, &obd_types);
190 spin_unlock(&obd_types_lock);
/* failure path: release everything allocated above */
195 if (type->typ_name != NULL)
196 OBD_FREE(type->typ_name, strlen(name) + 1);
197 if (type->typ_md_ops != NULL)
198 OBD_FREE_PTR(type->typ_md_ops);
199 if (type->typ_dt_ops != NULL)
200 OBD_FREE_PTR(type->typ_dt_ops);
201 OBD_FREE(type, sizeof(*type));
/*
 * Unregister an obd type by name.
 * A type that still has references is not removed: its ops tables are
 * freed but the name is kept for debugging.  Otherwise the procfs entry is
 * removed, the lu_device_type fini hook is run, the type is unlinked from
 * obd_types and all of its memory is released.
 */
205 int class_unregister_type(const char *name)
207 struct obd_type *type = class_search_type(name);
211 CERROR("unknown obd type\n");
215 if (type->typ_refcnt) {
216 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
217 /* This is a bad situation, let's make the best of it */
218 /* Remove ops, but leave the name for debugging */
219 OBD_FREE_PTR(type->typ_dt_ops);
220 OBD_FREE_PTR(type->typ_md_ops);
224 if (type->typ_procroot) {
225 lprocfs_remove(&type->typ_procroot);
229 type->typ_lu->ldt_ops->ldto_fini(type->typ_lu);
231 spin_lock(&obd_types_lock);
232 list_del(&type->typ_chain);
233 spin_unlock(&obd_types_lock);
/* the size must match the strlen(name) + 1 used at registration */
234 OBD_FREE(type->typ_name, strlen(name) + 1);
235 if (type->typ_dt_ops != NULL)
236 OBD_FREE_PTR(type->typ_dt_ops);
237 if (type->typ_md_ops != NULL)
238 OBD_FREE_PTR(type->typ_md_ops);
239 OBD_FREE(type, sizeof(*type));
241 } /* class_unregister_type */
/*
 * Create a new obd device of type @type_name called @name and install it
 * in the first free obd_devs[] slot.
 * Returns an ERR_PTR on failure: -EINVAL (name too long), -ENODEV (unknown
 * type), -ENOMEM (allocation), -EEXIST (name already in use) or -EOVERFLOW
 * (no free slot).
 */
243 struct obd_device *class_newdev(const char *type_name, const char *name)
245 struct obd_device *result = NULL;
246 struct obd_device *newdev;
247 struct obd_type *type = NULL;
249 int new_obd_minor = 0;
/* NOTE(review): the test uses '>' while the message says '<'; a name of
 * exactly MAX_OBD_NAME bytes passes -- confirm the size of obd_name. */
251 if (strlen(name) > MAX_OBD_NAME) {
252 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
253 RETURN(ERR_PTR(-EINVAL));
256 type = class_get_type(type_name);
258 CERROR("OBD: unknown type: %s\n", type_name);
259 RETURN(ERR_PTR(-ENODEV));
262 newdev = obd_device_alloc();
263 if (newdev == NULL) {
/* give back the type reference taken above */
264 class_put_type(type);
265 RETURN(ERR_PTR(-ENOMEM));
267 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
/* Scan every slot: a free slot may be claimed before a later slot turns
 * out to hold a device with the same name, in which case the claim is
 * undone below. */
269 spin_lock(&obd_dev_lock);
270 for (i = 0; i < class_devno_max(); i++) {
271 struct obd_device *obd = class_num2obd(i);
272 if (obd && obd->obd_name &&
273 (strcmp(name, obd->obd_name) == 0)) {
274 CERROR("Device %s already exists, won't add\n", name);
/* roll back a slot claimed earlier in this loop (the guarding condition
 * is not visible in this listing) */
276 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
277 "%p obd_magic %08x != %08x\n", result,
278 result->obd_magic, OBD_DEVICE_MAGIC);
279 LASSERTF(result->obd_minor == new_obd_minor,
280 "%p obd_minor %d != %d\n", result,
281 result->obd_minor, new_obd_minor);
283 obd_devs[result->obd_minor] = NULL;
284 result->obd_name[0]='\0';
286 result = ERR_PTR(-EEXIST);
/* first empty slot while nothing has been claimed yet */
289 if (!result && !obd) {
291 result->obd_minor = i;
293 result->obd_type = type;
/* NOTE(review): copies strlen(name) bytes without a terminator; this
 * presumably relies on obd_name being zero-filled -- confirm. */
294 memcpy(result->obd_name, name, strlen(name));
295 obd_devs[i] = result;
298 spin_unlock(&obd_dev_lock);
/* loop ran to completion without finding a free slot */
300 if (result == NULL && i >= class_devno_max()) {
301 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
303 result = ERR_PTR(-EOVERFLOW);
/* on any error release the unused device and the type reference */
306 if (IS_ERR(result)) {
307 obd_device_free(newdev);
308 class_put_type(type);
310 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
311 result->obd_name, result);
/*
 * Remove @obd from obd_devs[], free it and drop the reference it held on
 * its obd type.
 */
316 void class_release_dev(struct obd_device *obd)
/* saved up front because obd is freed before the type is put */
318 struct obd_type *obd_type = obd->obd_type;
320 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
321 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
322 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
323 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
324 LASSERT(obd_type != NULL);
326 CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
327 obd->obd_name,obd->obd_type->typ_name);
/* clear the slot under the device-table lock */
329 spin_lock(&obd_dev_lock);
330 obd_devs[obd->obd_minor] = NULL;
331 spin_unlock(&obd_dev_lock);
332 obd_device_free(obd);
334 class_put_type(obd_type);
/*
 * Translate a device name into its index in the device table.
 * Scans all slots under obd_dev_lock; a name match is only honoured once
 * the device is marked obd_attached.  The returned values are not visible
 * in this listing (class_name2obd() treats a negative result as "not
 * found").
 */
337 int class_name2dev(const char *name)
344 spin_lock(&obd_dev_lock);
345 for (i = 0; i < class_devno_max(); i++) {
346 struct obd_device *obd = class_num2obd(i);
347 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
348 /* Make sure we finished attaching before we give
349 out any references */
350 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
351 if (obd->obd_attached) {
352 spin_unlock(&obd_dev_lock);
358 spin_unlock(&obd_dev_lock);
/*
 * Look up a device by name: resolve the name to an index with
 * class_name2dev() and fetch the device with class_num2obd().
 */
363 struct obd_device *class_name2obd(const char *name)
365 int dev = class_name2dev(name);
/* NOTE(review): '>' lets dev == class_devno_max() through; class_num2obd()
 * rejects that value itself, but '>=' looks like the intended bound. */
367 if (dev < 0 || dev > class_devno_max())
369 return class_num2obd(dev);
/*
 * Translate a device UUID into its index in the device table.
 * Scans all slots under obd_dev_lock and compares with obd_uuid_equals();
 * the lock is dropped on a match and after an unsuccessful scan.  The
 * returned values are not visible in this listing.
 */
372 int class_uuid2dev(struct obd_uuid *uuid)
376 spin_lock(&obd_dev_lock);
377 for (i = 0; i < class_devno_max(); i++) {
378 struct obd_device *obd = class_num2obd(i);
379 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
380 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
381 spin_unlock(&obd_dev_lock);
385 spin_unlock(&obd_dev_lock);
/*
 * Look up a device by UUID: resolve the UUID to an index with
 * class_uuid2dev() and fetch the device with class_num2obd().
 * NOTE(review): any check of a negative index is not visible in this
 * listing -- confirm.
 */
390 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
392 int dev = class_uuid2dev(uuid);
395 return class_num2obd(dev);
/*
 * Fetch the device stored at index @num of the device table.
 * Only indices below class_devno_max() are considered; a device that is
 * found must carry the expected magic and have obd_minor == @num.  The
 * table lookup itself is not visible in this listing (presumably
 * obd_devs[num] -- confirm).
 */
398 struct obd_device *class_num2obd(int num)
400 struct obd_device *obd = NULL;
/* NOTE(review): only the upper bound is checked in the visible code */
402 if (num < class_devno_max()) {
408 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
409 "%p obd_magic %08x != %08x\n",
410 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
411 LASSERTF(obd->obd_minor == num,
412 "%p obd_minor %0d != %0d\n",
413 obd, obd->obd_minor, num);
/*
 * Print one console line per device: index, status, type name, device
 * name, UUID and reference count.  The status string is chosen from the
 * obd_stopping / obd_set_up / obd_attached flags, tested in that order
 * (the assigned strings are not visible in this listing).
 */
419 void class_obd_list(void)
424 spin_lock(&obd_dev_lock);
425 for (i = 0; i < class_devno_max(); i++) {
426 struct obd_device *obd = class_num2obd(i);
429 if (obd->obd_stopping)
431 else if (obd->obd_set_up)
433 else if (obd->obd_attached)
437 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
438 i, status, obd->obd_type->typ_name,
439 obd->obd_name, obd->obd_uuid.uuid,
440 atomic_read(&obd->obd_refcount));
442 spin_unlock(&obd_dev_lock);
446 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
447 specified, then only the client with that uuid is returned,
448 otherwise any client connected to the tgt is returned. */
/*
 * Scan the device table (under obd_dev_lock) for a device whose type name
 * starts with @typ_name and whose client target UUID equals @tgt_uuid.
 * When @grp_uuid is non-NULL the device's own UUID must match it as well.
 */
449 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
450 const char * typ_name,
451 struct obd_uuid *grp_uuid)
455 spin_lock(&obd_dev_lock);
456 for (i = 0; i < class_devno_max(); i++) {
457 struct obd_device *obd = class_num2obd(i);
/* strncmp() over strlen(typ_name) bytes makes this a prefix match */
460 if ((strncmp(obd->obd_type->typ_name, typ_name,
461 strlen(typ_name)) == 0)) {
462 if (obd_uuid_equals(tgt_uuid,
463 &obd->u.cli.cl_target_uuid) &&
464 ((grp_uuid)? obd_uuid_equals(grp_uuid,
465 &obd->obd_uuid) : 1)) {
466 spin_unlock(&obd_dev_lock);
471 spin_unlock(&obd_dev_lock);
/*
 * Find a client device connected to @tgt_uuid without knowing its type:
 * an MDC is searched for first, then an OSC.
 */
476 struct obd_device *class_find_client_notype(struct obd_uuid *tgt_uuid,
477 struct obd_uuid *grp_uuid)
479 struct obd_device *obd;
/* NOTE(review): the MDC search passes NULL rather than grp_uuid --
 * confirm that ignoring the group here is intended. */
481 obd = class_find_client_obd(tgt_uuid, LUSTRE_MDC_NAME, NULL);
483 obd = class_find_client_obd(tgt_uuid, LUSTRE_OSC_NAME,
488 /* Iterate the obd_device list looking for devices that have grp_uuid. Start
489 searching at *next, and if a device is found, the next index to look
490 at is saved in *next. If next is NULL, then the first matching device
491 will always be returned. */
/*
 * Resumable search for devices whose UUID equals @grp_uuid.
 * The starting index is taken from *next when it lies inside
 * [0, class_devno_max()); the scan itself runs under obd_dev_lock.
 */
492 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
498 else if (*next >= 0 && *next < class_devno_max())
503 spin_lock(&obd_dev_lock);
504 for (; i < class_devno_max(); i++) {
505 struct obd_device *obd = class_num2obd(i);
508 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
511 spin_unlock(&obd_dev_lock);
515 spin_unlock(&obd_dev_lock);
/*
 * Destroy the slab caches created by obd_init_caches().
 * Each destroy must succeed (LASSERTF on rc).  The guards around the
 * later caches and some of the pointer resets are not visible in this
 * listing.
 */
521 void obd_cleanup_caches(void)
526 if (obd_device_cachep) {
527 rc = cfs_mem_cache_destroy(obd_device_cachep);
528 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
/* reset so that obd_init_caches() can be called again */
529 obd_device_cachep = NULL;
532 rc = cfs_mem_cache_destroy(obdo_cachep);
533 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
537 rc = cfs_mem_cache_destroy(import_cachep);
538 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
539 import_cachep = NULL;
542 rc = cfs_mem_cache_destroy(capa_cachep);
543 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
/*
 * Create the slab caches for obd_device, obdo, obd_import and obd_capa
 * objects.  Each cache pointer must be NULL on entry (LASSERT).  The last
 * visible statement calls obd_cleanup_caches(), presumably as the failure
 * path -- confirm against the complete source.
 */
549 int obd_init_caches(void)
553 LASSERT(obd_device_cachep == NULL);
554 obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
555 sizeof(struct obd_device),
557 if (!obd_device_cachep)
560 LASSERT(obdo_cachep == NULL);
561 obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
566 LASSERT(import_cachep == NULL);
567 import_cachep = cfs_mem_cache_create("ll_import_cache",
568 sizeof(struct obd_import),
573 LASSERT(capa_cachep == NULL);
574 capa_cachep = cfs_mem_cache_create("capa_cache",
575 sizeof(struct obd_capa), 0, 0);
/* undo any caches that were already created */
581 obd_cleanup_caches();
586 /* map connection to client */
/*
 * Resolve a connection handle to its export through the handle table
 * (class_handle2object on the cookie).  Two special cases are logged
 * first: a null handle, and a cookie of -1, which means "assign a new
 * connection".  Their return values are not visible in this listing.
 */
587 struct obd_export *class_conn2export(struct lustre_handle *conn)
589 struct obd_export *export;
593 CDEBUG(D_CACHE, "looking for null handle\n");
597 if (conn->cookie == -1) { /* this means assign a new connection */
598 CDEBUG(D_CACHE, "want a new connection\n");
602 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
603 export = class_handle2object(conn->cookie);
/*
 * Map an export to an obd device.  The body is not visible in this
 * listing; presumably it returns exp->exp_obd -- TODO confirm.
 */
607 struct obd_device *class_exp2obd(struct obd_export *exp)
/*
 * Resolve a connection handle to the obd device behind its export.
 * The export obtained from class_conn2export() is released again with
 * class_export_put() once exp_obd has been read.
 */
614 struct obd_device *class_conn2obd(struct lustre_handle *conn)
616 struct obd_export *export;
617 export = class_conn2export(conn);
619 struct obd_device *obd = export->exp_obd;
620 class_export_put(export);
/*
 * Return the client import (u.cli.cl_import) of the device that owns
 * @exp.  Any NULL check on the device is not visible in this listing.
 */
626 struct obd_import *class_exp2cliimp(struct obd_export *exp)
628 struct obd_device *obd = exp->exp_obd;
631 return obd->u.cli.cl_import;
/*
 * Return the client import (u.cli.cl_import) of the device reached through
 * connection handle @conn.  Any NULL check on the device is not visible in
 * this listing.
 */
634 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
636 struct obd_device *obd = class_conn2obd(conn);
639 return obd->u.cli.cl_import;
642 /* Export management functions */
/*
 * Addref callback registered with class_handle_hash() for exports: takes
 * one export reference on the object passed as a void pointer.
 */
643 static void export_handle_addref(void *export)
645 class_export_get(export);
/*
 * Drop one reference on @exp.  When the count reaches zero the export is
 * not destroyed here: it is queued on obd_zombie_exports and the zombie
 * handler is notified.
 */
648 void __class_export_put(struct obd_export *exp)
650 if (atomic_dec_and_test(&exp->exp_refcount)) {
/* the export must already be unlinked from its device, because
 * exp_obd_chain is reused below to link it into the zombie list */
651 LASSERT (list_empty(&exp->exp_obd_chain));
653 CDEBUG(D_IOCTL, "final put %p/%s\n",
654 exp, exp->exp_client_uuid.uuid);
656 spin_lock(&obd_zombie_impexp_lock);
657 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
658 spin_unlock(&obd_zombie_impexp_lock);
/* NOTE(review): this compares the address of a function, not a pointer
 * variable -- the test looks always true; confirm. */
660 if (obd_zombie_impexp_notify != NULL)
661 obd_zombie_impexp_notify();
664 EXPORT_SYMBOL(__class_export_put);
/*
 * Final teardown of an export whose reference count has dropped to zero:
 * release its connection (if any), let the obd destroy its per-export
 * state and free the memory through OBD_FREE_RCU.
 */
666 void class_export_destroy(struct obd_export *exp)
668 struct obd_device *obd = exp->exp_obd;
671 LASSERT (atomic_read(&exp->exp_refcount) == 0);
/* NOTE(review): obd->obd_name is dereferenced here before the
 * LASSERT(obd != NULL) below -- the assert comes too late to help. */
673 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
674 exp->exp_client_uuid.uuid, obd->obd_name);
676 LASSERT(obd != NULL);
678 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
679 if (exp->exp_connection)
680 ptlrpc_put_connection_superhack(exp->exp_connection);
682 LASSERT(list_empty(&exp->exp_outstanding_replies));
683 obd_destroy_export(exp);
685 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
690 /* Creates a new export, adds it to the hash table, and returns a
691 * pointer to it. The refcount is 2: one for the hash reference, and
692 * one for the pointer returned by this function. */
/*
 * Allocate and initialise an export of @obd for client @cluuid, hash its
 * handle, and link it into the device's export lists.
 * Returns ERR_PTR(-ENOMEM) when allocation fails and ERR_PTR(-EALREADY)
 * when an export for the same client UUID is already hashed.
 */
693 struct obd_export *class_new_export(struct obd_device *obd,
694 struct obd_uuid *cluuid)
696 struct obd_export *export;
699 OBD_ALLOC_PTR(export);
701 return ERR_PTR(-ENOMEM);
703 export->exp_conn_cnt = 0;
/* two references: one for the handle hash, one for the caller */
704 atomic_set(&export->exp_refcount, 2);
705 atomic_set(&export->exp_rpc_count, 0);
706 export->exp_obd = obd;
707 CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
708 /* XXX this should be in LDLM init */
709 CFS_INIT_LIST_HEAD(&export->exp_ldlm_data.led_held_locks);
710 spin_lock_init(&export->exp_ldlm_data.led_lock);
712 CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
713 class_handle_hash(&export->exp_handle, export_handle_addref);
714 export->exp_last_request_time = cfs_time_current_sec();
715 spin_lock_init(&export->exp_lock);
716 INIT_HLIST_NODE(&export->exp_uuid_hash);
717 INIT_HLIST_NODE(&export->exp_nid_hash);
719 export->exp_sp_peer = LUSTRE_SP_ANY;
720 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
721 export->exp_client_uuid = *cluuid;
722 obd_init_export(export);
724 spin_lock(&obd->obd_dev_lock);
/* the device's self-export (client uuid == obd uuid) is not entered in
 * the uuid hash */
725 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
726 rc = lustre_hash_additem_unique(obd->obd_uuid_hash_body, cluuid,
727 &export->exp_uuid_hash);
/* duplicate client: undo the handle hashing and free the export (the
 * test on rc is not visible in this listing) */
729 CWARN("%s: denying duplicate export for %s\n",
730 obd->obd_name, cluuid->uuid);
731 spin_unlock(&obd->obd_dev_lock);
732 class_handle_unhash(&export->exp_handle);
733 OBD_FREE_PTR(export);
734 return ERR_PTR(-EALREADY);
738 LASSERT(!obd->obd_stopping); /* shouldn't happen, but might race */
740 list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
741 list_add_tail(&export->exp_obd_chain_timed,
742 &export->exp_obd->obd_exports_timed);
743 export->exp_obd->obd_num_exports++;
744 spin_unlock(&obd->obd_dev_lock);
748 EXPORT_SYMBOL(class_new_export);
/*
 * Detach an export from everything that can find it: the handle hash, the
 * device's uuid hash and the device's export lists.  Finishes by dropping
 * one export reference.
 */
750 void class_unlink_export(struct obd_export *exp)
752 class_handle_unhash(&exp->exp_handle);
754 spin_lock(&exp->exp_obd->obd_dev_lock);
755 /* delete an uuid-export hashitem from hashtables */
756 if (!hlist_unhashed(&exp->exp_uuid_hash)) {
757 lustre_hash_delitem(exp->exp_obd->obd_uuid_hash_body,
758 &exp->exp_client_uuid, &exp->exp_uuid_hash);
/* the _init variants leave the nodes empty, which __class_export_put()
 * asserts for exp_obd_chain */
760 list_del_init(&exp->exp_obd_chain);
761 list_del_init(&exp->exp_obd_chain_timed);
762 exp->exp_obd->obd_num_exports--;
763 spin_unlock(&exp->exp_obd->obd_dev_lock);
765 class_export_put(exp);
767 EXPORT_SYMBOL(class_unlink_export);
769 /* Import management functions */
/*
 * Addref callback registered with class_handle_hash() for imports: takes
 * one import reference on the object passed as a void pointer.
 */
770 static void import_handle_addref(void *import)
772 class_import_get(import);
/*
 * Take one reference on @import and log the new count.  The return
 * statement is not visible in this listing.
 */
775 struct obd_import *class_import_get(struct obd_import *import)
/* sanity: the count must be non-negative and below 0x5a5a5a, presumably a
 * memory-poison pattern -- confirm */
777 LASSERT(atomic_read(&import->imp_refcount) >= 0);
778 LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
779 atomic_inc(&import->imp_refcount);
780 CDEBUG(D_INFO, "import %p refcount=%d\n", import,
781 atomic_read(&import->imp_refcount));
784 EXPORT_SYMBOL(class_import_get);
/*
 * Drop one reference on @import.  When the count reaches zero the import
 * is queued on obd_zombie_imports and the zombie handler is notified; the
 * actual destruction happens in class_import_destroy().
 */
786 void class_import_put(struct obd_import *import)
/* logs the count as it will be after the decrement below */
790 CDEBUG(D_INFO, "import %p refcount=%d\n", import,
791 atomic_read(&import->imp_refcount) - 1);
793 LASSERT(atomic_read(&import->imp_refcount) > 0);
794 LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
/* an import must not already be on the zombie list */
795 LASSERT(list_empty(&import->imp_zombie_chain));
797 if (atomic_dec_and_test(&import->imp_refcount)) {
799 CDEBUG(D_INFO, "final put import %p\n", import);
801 spin_lock(&obd_zombie_impexp_lock);
802 list_add(&import->imp_zombie_chain, &obd_zombie_imports);
803 spin_unlock(&obd_zombie_impexp_lock);
/* NOTE(review): this compares the address of a function, not a pointer
 * variable -- the test looks always true; confirm. */
805 if (obd_zombie_impexp_notify != NULL)
806 obd_zombie_impexp_notify();
/*
 * Final teardown of an import whose reference count has dropped to zero:
 * release its connections, drop the device reference taken in
 * class_new_import() and free the memory through OBD_FREE_RCU.
 */
812 void class_import_destroy(struct obd_import *import)
816 CDEBUG(D_IOCTL, "destroying import %p for %s\n", import,
817 import->imp_obd->obd_name);
819 LASSERT(atomic_read(&import->imp_refcount) == 0);
821 ptlrpc_put_connection_superhack(import->imp_connection);
/* drain the list of alternate connections, releasing each entry */
823 while (!list_empty(&import->imp_conn_list)) {
824 struct obd_import_conn *imp_conn;
826 imp_conn = list_entry(import->imp_conn_list.next,
827 struct obd_import_conn, oic_item);
828 list_del(&imp_conn->oic_item);
829 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
830 OBD_FREE(imp_conn, sizeof(*imp_conn));
833 LASSERT(import->imp_sec == NULL);
834 class_decref(import->imp_obd);
835 OBD_FREE_RCU(import, sizeof(*import), &import->imp_handle);
/* export of class_import_put(), placed after this function in the file */
838 EXPORT_SYMBOL(class_import_put);
/*
 * Allocate and initialise an import for @obd: empty request lists, state
 * LUSTRE_IMP_NEW, a reference on the device, and a hashed handle.  The
 * allocation check and the return statement are not visible in this
 * listing.
 */
840 struct obd_import *class_new_import(struct obd_device *obd)
842 struct obd_import *imp;
844 OBD_ALLOC(imp, sizeof(*imp));
848 CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
849 CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
850 CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
851 CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
852 spin_lock_init(&imp->imp_lock);
853 imp->imp_last_success_conn = 0;
854 imp->imp_state = LUSTRE_IMP_NEW;
/* pins the device; released by class_import_destroy() via class_decref() */
855 imp->imp_obd = class_incref(obd);
856 sema_init(&imp->imp_sec_mutex, 1);
857 cfs_waitq_init(&imp->imp_recovery_waitq);
/* initial count of two, like exports; presumably one for the handle hash
 * and one for the caller -- confirm */
859 atomic_set(&imp->imp_refcount, 2);
860 atomic_set(&imp->imp_inflight, 0);
861 atomic_set(&imp->imp_replay_inflight, 0);
862 atomic_set(&imp->imp_inval_count, 0);
863 CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
864 CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
865 class_handle_hash(&imp->imp_handle, import_handle_addref);
867 /* the default magic is V2, will be used in connect RPC, and
868 * then adjusted according to the flags in request/reply. */
869 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
873 EXPORT_SYMBOL(class_new_import);
/*
 * Begin tearing down an import: remove it from the handle hash, bump its
 * generation under imp_lock and drop one reference.  Freeing happens later
 * once the last reference is gone (see class_import_put()).
 */
875 void class_destroy_import(struct obd_import *import)
877 LASSERT(import != NULL);
/* catch callers passing a poisoned (already freed) pointer */
878 LASSERT(import != LP_POISON);
880 class_handle_unhash(&import->imp_handle);
882 spin_lock(&import->imp_lock);
883 import->imp_generation++;
884 spin_unlock(&import->imp_lock);
885 class_import_put(import);
887 EXPORT_SYMBOL(class_destroy_import);
889 /* A connection defines an export context in which preallocation can
890 be managed. This releases the export pointer reference, and returns
891 the export handle, so the export refcount is 1 when this function
returns. */
/*
 * Create an export of @obd for client @cluuid and hand its handle cookie
 * back through @conn.  All three arguments must be non-NULL.  A failure
 * of class_new_export() is returned as its PTR_ERR value.
 */
893 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
894 struct obd_uuid *cluuid)
896 struct obd_export *export;
897 LASSERT(conn != NULL);
898 LASSERT(obd != NULL);
899 LASSERT(cluuid != NULL);
902 export = class_new_export(obd, cluuid);
904 RETURN(PTR_ERR(export));
906 conn->cookie = export->exp_handle.h_cookie;
/* drop the pointer reference; the handle-hash reference remains */
907 class_export_put(export);
909 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
910 cluuid->uuid, conn->cookie);
913 EXPORT_SYMBOL(class_connect);
915 /* if export is involved in recovery then clean up related things */
/*
 * Undo the recovery accounting of an export that leaves while its device
 * is still recovering: clear the export's recovery flags and decrement the
 * device's connected / recoverable / replay client counters.  Runs under
 * obd_processing_task_lock (bottom halves disabled); the export flags are
 * changed under exp_lock.
 */
916 void class_export_recovery_cleanup(struct obd_export *exp)
918 struct obd_device *obd = exp->exp_obd;
920 spin_lock_bh(&obd->obd_processing_task_lock);
921 if (obd->obd_recovering && exp->exp_in_recovery) {
922 spin_lock(&exp->exp_lock);
923 exp->exp_in_recovery = 0;
924 spin_unlock(&exp->exp_lock);
925 obd->obd_connected_clients--;
926 /* each connected client is counted as recoverable */
927 obd->obd_recoverable_clients--;
928 if (exp->exp_req_replay_needed) {
929 spin_lock(&exp->exp_lock);
930 exp->exp_req_replay_needed = 0;
931 spin_unlock(&exp->exp_lock);
/* the counter must be non-zero before it is decremented */
932 LASSERT(atomic_read(&obd->obd_req_replay_clients));
933 atomic_dec(&obd->obd_req_replay_clients);
935 if (exp->exp_lock_replay_needed) {
936 spin_lock(&exp->exp_lock);
937 exp->exp_lock_replay_needed = 0;
938 spin_unlock(&exp->exp_lock);
939 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
940 atomic_dec(&obd->obd_lock_replay_clients);
943 spin_unlock_bh(&obd->obd_processing_task_lock);
946 /* This function removes two references from the export: one for the
947 * hash entry and one for the export pointer passed in. The export
948 * pointer passed to this function is destroyed and should not be used
 * again. */
/*
 * Disconnect an export: mark it disconnected, remove it from the nid hash,
 * clean up its recovery state, unlink it and drop the caller's reference.
 * The exp_disconnected flag, read and set under exp_lock, makes repeated
 * or racing calls harmless.
 */
950 int class_disconnect(struct obd_export *export)
952 int already_disconnected;
955 if (export == NULL) {
957 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
961 spin_lock(&export->exp_lock);
/* remember the previous state and set the flag in one locked step */
962 already_disconnected = export->exp_disconnected;
963 export->exp_disconnected = 1;
965 if (!hlist_unhashed(&export->exp_nid_hash)) {
966 lustre_hash_delitem(export->exp_obd->obd_nid_hash_body,
967 &export->exp_connection->c_peer.nid, &export->exp_nid_hash);
969 spin_unlock(&export->exp_lock);
971 /* class_cleanup(), abort_recovery(), and class_fail_export()
972 * all end up in here, and if any of them race we shouldn't
973 * call extra class_export_puts(). */
974 if (already_disconnected)
977 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
978 export->exp_handle.h_cookie);
980 class_export_recovery_cleanup(export);
/* class_unlink_export() drops one reference, the put below the other */
981 class_unlink_export(export);
982 class_export_put(export);
/*
 * Disconnect every export on @list, stamping each with @flags first.
 * An export whose client UUID equals its device's UUID (the self-export)
 * is only removed from the list, not disconnected.
 */
986 static void class_disconnect_export_list(struct list_head *list, int flags)
989 struct lustre_handle fake_conn;
990 struct obd_export *fake_exp, *exp;
993 /* It's possible that an export may disconnect itself, but
994 * nothing else will be added to this list. */
995 while (!list_empty(list)) {
996 exp = list_entry(list->next, struct obd_export, exp_obd_chain);
/* hold the export across the disconnect below */
997 class_export_get(exp);
999 spin_lock(&exp->exp_lock);
1000 exp->exp_flags = flags;
1001 spin_unlock(&exp->exp_lock);
1003 if (obd_uuid_equals(&exp->exp_client_uuid,
1004 &exp->exp_obd->obd_uuid)) {
1006 "exp %p export uuid == obd uuid, don't discon\n",
1008 /* Need to delete this now so we don't end up pointing
1009 * to work_list later when this export is cleaned up. */
1010 list_del_init(&exp->exp_obd_chain);
1011 class_export_put(exp);
/* look the export up again through its handle; this yields the reference
 * that obd_disconnect() is expected to consume */
1015 fake_conn.cookie = exp->exp_handle.h_cookie;
1016 fake_exp = class_conn2export(&fake_conn);
1018 class_export_put(exp);
1022 spin_lock(&fake_exp->exp_lock);
1023 fake_exp->exp_flags = flags;
1024 spin_unlock(&fake_exp->exp_lock);
1026 rc = obd_disconnect(fake_exp);
1027 class_export_put(exp);
/* NOTE(review): exp is dereferenced here after the reference taken at the
 * top of the loop was dropped -- possible use-after-free; confirm. */
1028 CDEBUG(D_HA, "disconnecting export %s (%p): rc %d\n",
1029 exp->exp_client_uuid.uuid, exp, rc);
/*
 * Translate the device's obd_fail / obd_force bits into the matching
 * OBD_OPT_FAILOVER / OBD_OPT_FORCE export flags.
 */
1034 static inline int get_exp_flags_from_obd(struct obd_device *obd)
1036 return ((obd->obd_fail ? OBD_OPT_FAILOVER : 0) |
1037 (obd->obd_force ? OBD_OPT_FORCE : 0));
/*
 * Disconnect all exports of @obd.  The export list is first moved onto a
 * private work list under obd_dev_lock, and the disconnects are then done
 * without holding that lock.
 */
1040 void class_disconnect_exports(struct obd_device *obd)
1042 struct list_head work_list;
1045 /* Move all of the exports from obd_exports to a work list, en masse. */
1046 spin_lock(&obd->obd_dev_lock);
/* splice trick: insert work_list as a new head into the ring, then take
 * the old head out (re-initialised to empty); list_add() writes both
 * pointers of work_list, so it needs no prior initialisation */
1047 list_add(&work_list, &obd->obd_exports);
1048 list_del_init(&obd->obd_exports);
1049 spin_unlock(&obd->obd_dev_lock);
1051 if (!list_empty(&work_list)) {
1052 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1053 "disconnecting them\n", obd->obd_minor, obd);
1054 class_disconnect_export_list(&work_list,
1055 get_exp_flags_from_obd(obd));
1057 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1058 obd->obd_minor, obd);
1061 EXPORT_SYMBOL(class_disconnect_exports);
1063 /* Remove exports that have not completed recovery.
 */
/*
 * Disconnect the exports of @obd selected with the help of @test_export.
 * Under obd_dev_lock, exports are moved from obd_exports to a private work
 * list; the work list is then disconnected outside the lock.  What a true
 * result of @test_export leads to (presumably "keep this export") and the
 * counting/return statements are not visible in this listing -- confirm.
 */
1065 int class_disconnect_stale_exports(struct obd_device *obd,
1066 int (*test_export)(struct obd_export *))
1068 struct list_head work_list;
1069 struct list_head *pos, *n;
1070 struct obd_export *exp;
1074 CFS_INIT_LIST_HEAD(&work_list);
1075 spin_lock(&obd->obd_dev_lock);
/* _safe iteration because entries are unlinked while walking */
1076 list_for_each_safe(pos, n, &obd->obd_exports) {
1077 exp = list_entry(pos, struct obd_export, exp_obd_chain);
1078 if (test_export(exp))
1081 list_del(&exp->exp_obd_chain);
1082 list_add(&exp->exp_obd_chain, &work_list);
1083 /* don't count self-export as client */
1084 if (obd_uuid_equals(&exp->exp_client_uuid,
1085 &exp->exp_obd->obd_uuid))
1089 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1090 obd->obd_name, exp->exp_client_uuid.uuid,
1091 exp->exp_connection == NULL ? "<unknown>" :
1092 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1094 spin_unlock(&obd->obd_dev_lock);
1096 CDEBUG(D_ERROR, "%s: disconnecting %d stale clients\n",
1097 obd->obd_name, cnt);
1098 class_disconnect_export_list(&work_list, get_exp_flags_from_obd(obd));
1101 EXPORT_SYMBOL(class_disconnect_stale_exports);
/*
 * Allocate and initialise an obd_io_group: nothing pending, reference
 * count 1, empty callback list.  Storing the group into *@oig_out and the
 * return value are not visible in this listing.
 */
1103 int oig_init(struct obd_io_group **oig_out)
1105 struct obd_io_group *oig;
1108 OBD_ALLOC(oig, sizeof(*oig));
1112 spin_lock_init(&oig->oig_lock);
1114 oig->oig_pending = 0;
/* the creator holds the initial reference; see oig_release() */
1115 atomic_set(&oig->oig_refcount, 1);
1116 cfs_waitq_init(&oig->oig_waitq);
1117 CFS_INIT_LIST_HEAD(&oig->oig_occ_list);
1122 EXPORT_SYMBOL(oig_init);
/* Take one reference on an io group. */
1124 static inline void oig_grab(struct obd_io_group *oig)
1126 atomic_inc(&oig->oig_refcount);
/* Drop one reference on an io group and free it when that was the last
 * one. */
1129 void oig_release(struct obd_io_group *oig)
1131 if (atomic_dec_and_test(&oig->oig_refcount))
1132 OBD_FREE(oig, sizeof(*oig));
1134 EXPORT_SYMBOL(oig_release);
/*
 * Add callback context @occ to the group's list, under oig_lock.  The
 * pending-count update and the return value are not visible in this
 * listing.
 */
1136 int oig_add_one(struct obd_io_group *oig, struct oig_callback_context *occ)
1139 CDEBUG(D_CACHE, "oig %p ready to roll\n", oig);
1140 spin_lock(&oig->oig_lock);
1146 list_add_tail(&occ->occ_oig_item, &oig->oig_occ_list);
1148 spin_unlock(&oig->oig_lock);
1153 EXPORT_SYMBOL(oig_add_one);
/*
 * Report completion of one member of the group with result @rc.
 * Removes @occ from the list, decrements the pending count and, when
 * nothing is left pending, wakes the waiter.  The group result only
 * changes when it is still 0 and @rc is non-zero (the assignment itself
 * is not visible in this listing).
 */
1155 void oig_complete_one(struct obd_io_group *oig,
1156 struct oig_callback_context *occ, int rc)
1158 cfs_waitq_t *wake = NULL;
1161 spin_lock(&oig->oig_lock);
1164 list_del_init(&occ->occ_oig_item);
1166 old_rc = oig->oig_rc;
1167 if (oig->oig_rc == 0 && rc != 0)
/* decide under the lock whether to wake; the signal is sent after unlock */
1170 if (--oig->oig_pending <= 0)
1171 wake = &oig->oig_waitq;
1173 spin_unlock(&oig->oig_lock);
/* reads oig fields without the lock; the message itself admits the race */
1175 CDEBUG(D_CACHE, "oig %p completed, rc %d -> %d via %d, %d now "
1176 "pending (racey)\n", oig, old_rc, oig->oig_rc, rc,
1179 cfs_waitq_signal(wake);
1182 EXPORT_SYMBOL(oig_complete_one);
/*
 * Wait condition for oig_wait(): tests, under oig_lock, whether the
 * pending count has dropped to zero or below.  The result variable and
 * return are not visible in this listing.
 */
1184 static int oig_done(struct obd_io_group *oig)
1187 spin_lock(&oig->oig_lock);
1188 if (oig->oig_pending <= 0)
1190 spin_unlock(&oig->oig_lock);
/*
 * Interrupt handler passed to the wait in oig_wait(): calls
 * occ_interrupted() on the group's members.  Each member is flagged
 * before its callback runs, and the callback is invoked with oig_lock
 * dropped.
 */
1194 static void interrupted_oig(void *data)
1196 struct obd_io_group *oig = data;
1197 struct oig_callback_context *occ;
1199 spin_lock(&oig->oig_lock);
1200 /* We need to restart the processing each time we drop the lock, as
1201 * it is possible other threads called oig_complete_one() to remove
1202 * an entry elsewhere in the list while we dropped lock. We need to
1203 * drop the lock because osc_ap_completion() calls oig_complete_one()
1204 * which re-gets this lock ;-) as well as a lock ordering issue. */
1206 list_for_each_entry(occ, &oig->oig_occ_list, occ_oig_item) {
/* test on the per-entry flag set just below */
1207 if (occ->interrupted)
1209 occ->interrupted = 1;
1210 spin_unlock(&oig->oig_lock);
1211 occ->occ_interrupted(occ);
1212 spin_lock(&oig->oig_lock);
1215 spin_unlock(&oig->oig_lock);
/*
 * Block until every member of the io group has completed.
 * The first wait is interruptible and runs interrupted_oig() on a signal;
 * after an interruption the wait info is reset to all zeroes and the wait
 * is repeated until it no longer returns -EINTR.
 */
1218 int oig_wait(struct obd_io_group *oig)
1220 struct l_wait_info lwi = LWI_INTR(interrupted_oig, oig);
1223 CDEBUG(D_CACHE, "waiting for oig %p\n", oig);
1226 rc = l_wait_event(oig->oig_waitq, oig_done(oig), &lwi);
1227 LASSERTF(rc == 0 || rc == -EINTR, "rc: %d\n", rc);
1228 /* we can't continue until the oig has emptied and stopped
1229 * referencing state that the caller will free upon return */
1231 lwi = (struct l_wait_info){ 0, };
1232 } while (rc == -EINTR);
1234 LASSERTF(oig->oig_pending == 0,
1235 "exiting oig_wait(oig = %p) with %d pending\n", oig,
1238 CDEBUG(D_CACHE, "done waiting on oig %p rc %d\n", oig, oig->oig_rc);
1241 EXPORT_SYMBOL(oig_wait);
/*
 * Forcibly disconnect an export (eviction path).
 * The exp_failed flag, read and set under exp_lock, is used to detect an
 * export that was already failed.  Optionally dumps the debug log, then
 * takes an extra reference and calls obd_disconnect().
 */
1243 void class_fail_export(struct obd_export *exp)
1245 int rc, already_failed;
1247 spin_lock(&exp->exp_lock);
1248 already_failed = exp->exp_failed;
1249 exp->exp_failed = 1;
1250 spin_unlock(&exp->exp_lock);
1252 if (already_failed) {
1253 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1254 exp, exp->exp_client_uuid.uuid);
1258 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1259 exp, exp->exp_client_uuid.uuid);
1261 if (obd_dump_on_timeout)
1262 libcfs_debug_dumplog();
1264 /* Most callers into obd_disconnect are removing their own reference
1265 * (request, for example) in addition to the one from the hash table.
1266 * We don't have such a reference here, so make one. */
1267 class_export_get(exp);
1268 rc = obd_disconnect(exp);
1270 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1272 CDEBUG(D_HA, "disconnected export %p/%s\n",
1273 exp, exp->exp_client_uuid.uuid);
1275 EXPORT_SYMBOL(class_fail_export);
/*
 * Return the peer NID of an export as a string when the export has a
 * connection.  The value returned without a connection is not visible in
 * this listing.
 */
1277 char *obd_export_nid2str(struct obd_export *exp)
1279 if (exp->exp_connection != NULL)
1280 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1284 EXPORT_SYMBOL(obd_export_nid2str);
/*
 * Evict exports of @obd whose peer NID equals @nid (given as a string).
 * Exports are looked up in the device's nid hash, failed with
 * class_fail_export() and released.  Returns the number of exports
 * evicted.  The looping construct is not visible in this listing.
 */
1286 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1288 struct obd_export *doomed_exp = NULL;
1289 int exports_evicted = 0;
/* the cast only drops const for the libcfs_str2nid() prototype */
1291 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1294 doomed_exp = lustre_hash_get_object_by_key(obd->obd_nid_hash_body,
1296 if (doomed_exp == NULL)
1299 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1300 "nid %s found, wanted nid %s, requested nid %s\n",
1301 obd_export_nid2str(doomed_exp),
1302 libcfs_nid2str(nid_key), nid);
/* the device's own export must never be in the nid hash */
1303 LASSERTF(doomed_exp != obd->obd_self_export,
1304 "self-export is hashed by NID?\n");
1306 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1307 obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1309 class_fail_export(doomed_exp);
/* release the export obtained from the hash lookup above */
1310 class_export_put(doomed_exp);
1313 if (!exports_evicted)
1314 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1315 obd->obd_name, nid);
1316 return exports_evicted;
1318 EXPORT_SYMBOL(obd_export_evict_by_nid);
/*
 * Evict the export of @obd whose client UUID equals @uuid.
 * Refuses to evict the device's own UUID.  Returns the number of exports
 * evicted (the increment is not visible in this listing).
 */
1320 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1322 struct obd_export *doomed_exp = NULL;
1323 struct obd_uuid doomed;
1324 int exports_evicted = 0;
1326 obd_str2uuid(&doomed, uuid);
1327 if (obd_uuid_equals(&doomed, &obd->obd_uuid)) {
1328 CERROR("%s: can't evict myself\n", obd->obd_name);
1329 return exports_evicted;
1332 doomed_exp = lustre_hash_get_object_by_key(obd->obd_uuid_hash_body,
1335 if (doomed_exp == NULL) {
1336 CERROR("%s: can't disconnect %s: no exports found\n",
1337 obd->obd_name, uuid);
1339 CWARN("%s: evicting %s at adminstrative request\n",
1340 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1341 class_fail_export(doomed_exp);
/* release the export obtained from the hash lookup above */
1342 class_export_put(doomed_exp);
1346 return exports_evicted;
1348 EXPORT_SYMBOL(obd_export_evict_by_uuid);
/*
1351 * Kill zombie imports and exports.
 */
/*
 * Drain the zombie lists.  Each pass takes at most one import and one
 * export off the lists under obd_zombie_impexp_lock and destroys them
 * after the lock has been dropped; the loop ends when a pass finds
 * neither.
 */
1353 void obd_zombie_impexp_cull(void)
1355 struct obd_import *import;
1356 struct obd_export *export;
1360 spin_lock (&obd_zombie_impexp_lock);
1363 if (!list_empty(&obd_zombie_imports)) {
1364 import = list_entry(obd_zombie_imports.next,
1367 list_del(&import->imp_zombie_chain);
1371 if (!list_empty(&obd_zombie_exports)) {
1372 export = list_entry(obd_zombie_exports.next,
1375 list_del_init(&export->exp_obd_chain);
1378 spin_unlock(&obd_zombie_impexp_lock);
/* destruction happens outside the spinlock */
1381 class_import_destroy(import);
1384 class_export_destroy(export);
1386 } while (import != NULL || export != NULL);
/* Completed by the zombie thread once it has started (or failed to
 * daemonize); obd_zombie_impexp_init() waits on it. */
1390 static struct completion obd_zombie_start;
/* Completed by the zombie thread just before it exits;
 * obd_zombie_impexp_stop() waits on it. */
1391 static struct completion obd_zombie_stop;
/* Bit flags for the zombie thread; OBD_ZOMBIE_STOP asks it to exit. */
1392 static unsigned long obd_zombie_flags;
/* Wait queue the zombie thread sleeps on; signalled by
 * obd_zombie_impexp_notify(). */
1393 static cfs_waitq_t obd_zombie_waitq;
/*
1400 * Check whether the zombie import/export kill thread has work to do.
 */
/*
 * Compute, under obd_zombie_impexp_lock, whether the zombie handler is
 * idle: rc is non-zero when both zombie lists are empty and the STOP bit
 * is not set.  The zombie thread waits for the negation of this value.
 * @arg is not used in the visible code.
 */
1402 int obd_zombie_impexp_check(void *arg)
1406 spin_lock(&obd_zombie_impexp_lock);
1407 rc = list_empty(&obd_zombie_imports) &&
1408 list_empty(&obd_zombie_exports) &&
1409 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1411 spin_unlock(&obd_zombie_impexp_lock);
/*
1417 * Notify the import/export destroy thread about a new zombie.
 */
/* Wake whoever sleeps on obd_zombie_waitq (the zombie thread). */
1419 static void obd_zombie_impexp_notify(void)
1421 cfs_waitq_signal(&obd_zombie_waitq);
/*
1427 * Thread that destroys zombie exports/imports.
 */
/*
 * Body of the zombie thread.  After daemonizing it signals
 * obd_zombie_start, then loops until the STOP bit is set: sleep until
 * obd_zombie_impexp_check() reports work (or a stop request), then cull
 * the zombie lists.  Signals obd_zombie_stop on the way out.
 */
1429 static int obd_zombie_impexp_thread(void *unused)
1433 if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
/* the starter is released even when daemonizing failed */
1434 complete(&obd_zombie_start);
1438 complete(&obd_zombie_start);
1440 while(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1441 struct l_wait_info lwi = { 0 };
1443 l_wait_event(obd_zombie_waitq, !obd_zombie_impexp_check(NULL), &lwi);
1445 obd_zombie_impexp_cull();
1448 complete(&obd_zombie_stop);
1453 #else /* ! KERNEL */
/* Counter used by obd_zombie_impexp_kill() so that only the outermost
 * caller culls. */
1455 static atomic_t zombie_recur = ATOMIC_INIT(0);
/* Handles returned by the liblustre callback registration in
 * obd_zombie_impexp_init(); needed again to deregister. */
1456 static void *obd_zombie_impexp_work_cb;
1457 static void *obd_zombie_impexp_idle_cb;
/*
 * Non-kernel counterpart of the zombie thread, registered as a liblustre
 * wait callback: culls the zombie lists, but only when the increment of
 * zombie_recur yields 1, i.e. when no other call is already in progress.
 * The return value is not visible in this listing.
 */
1459 int obd_zombie_impexp_kill(void *arg)
1463 if (atomic_inc_return(&zombie_recur) == 1) {
1464 obd_zombie_impexp_cull();
1467 atomic_dec(&zombie_recur);
/*
1474 * Start the thread that destroys zombie imports/exports.
 */
/*
 * Initialise the zombie lists, lock, completions and wait queue, then
 * start the zombie handling.  Two variants are visible: starting the
 * thread and waiting for obd_zombie_start, and registering
 * obd_zombie_impexp_kill / obd_zombie_impexp_check as liblustre
 * callbacks.  The preprocessor lines choosing between them are not
 * visible in this listing (presumably kernel vs. liblustre builds).
 */
1476 int obd_zombie_impexp_init(void)
1480 CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1481 CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1482 spin_lock_init(&obd_zombie_impexp_lock);
1483 init_completion(&obd_zombie_start);
1484 init_completion(&obd_zombie_stop);
1485 cfs_waitq_init(&obd_zombie_waitq);
1488 rc = cfs_kernel_thread(obd_zombie_impexp_thread, NULL, 0);
/* do not return before the thread has reported in */
1492 wait_for_completion(&obd_zombie_start);
1495 obd_zombie_impexp_work_cb =
1496 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1497 &obd_zombie_impexp_kill, NULL);
1499 obd_zombie_impexp_idle_cb =
1500 liblustre_register_idle_callback("obd_zombi_impexp_check",
1501 &obd_zombie_impexp_check, NULL);
/*
1508 * Stop the thread that destroys zombie imports/exports.
 */
/*
 * Shut the zombie handling down: set the STOP bit and wake the handler.
 * Two variants follow: waiting for the thread to signal obd_zombie_stop,
 * and deregistering the liblustre callbacks.  The preprocessor lines
 * choosing between them are not visible in this listing.
 */
1510 void obd_zombie_impexp_stop(void)
1512 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1513 obd_zombie_impexp_notify();
1515 wait_for_completion(&obd_zombie_stop);
1517 liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1518 liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);