1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (c) 2001-2003 Cluster File Systems, Inc.
6 * This file is part of the Lustre file system, http://www.lustre.org
7 * Lustre is a trademark of Cluster File Systems, Inc.
9 * You may have signed or agreed to another license before downloading
10 * this software. If so, you are bound by the terms and conditions
11 * of that agreement, and the following does not apply to you. See the
12 * LICENSE file included with this distribution for more information.
14 * If you did not agree to a different license, then this copy of Lustre
15 * is open source software; you can redistribute it and/or modify it
16 * under the terms of version 2 of the GNU General Public License as
17 * published by the Free Software Foundation.
19 * In either case, Lustre is distributed in the hope that it will be
20 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
21 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
22 * license text for more details.
24 * These are the only exported functions, they provide some generic
25 * infrastructure for managing object devices
28 #define DEBUG_SUBSYSTEM S_CLASS
30 #include <liblustre.h>
33 #include <obd_class.h>
34 #include <lprocfs_status.h>
35 #include <class_hash.h>
37 extern struct list_head obd_types;
38 spinlock_t obd_types_lock;
40 cfs_mem_cache_t *obd_device_cachep;
41 cfs_mem_cache_t *obdo_cachep;
42 EXPORT_SYMBOL(obdo_cachep);
43 cfs_mem_cache_t *import_cachep;
45 struct list_head obd_zombie_imports;
46 struct list_head obd_zombie_exports;
47 spinlock_t obd_zombie_impexp_lock;
48 static void obd_zombie_impexp_notify(void);
50 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
53 * support functions: we could use inter-module communication, but this
54 * is more portable to other OS's
56 static struct obd_device *obd_device_alloc(void)
58 struct obd_device *obd;
60 OBD_SLAB_ALLOC_PTR(obd, obd_device_cachep);
62 obd->obd_magic = OBD_DEVICE_MAGIC;
66 EXPORT_SYMBOL(obd_device_alloc);
68 static void obd_device_free(struct obd_device *obd)
71 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
72 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
73 if (obd->obd_namespace != NULL) {
74 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
75 obd, obd->obd_namespace, obd->obd_force);
78 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
80 EXPORT_SYMBOL(obd_device_free);
82 struct obd_type *class_search_type(const char *name)
84 struct list_head *tmp;
85 struct obd_type *type;
87 spin_lock(&obd_types_lock);
88 list_for_each(tmp, &obd_types) {
89 type = list_entry(tmp, struct obd_type, typ_chain);
90 if (strcmp(type->typ_name, name) == 0) {
91 spin_unlock(&obd_types_lock);
95 spin_unlock(&obd_types_lock);
99 struct obd_type *class_get_type(const char *name)
101 struct obd_type *type = class_search_type(name);
105 const char *modname = name;
106 if (!request_module(modname)) {
107 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
108 type = class_search_type(name);
110 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
116 spin_lock(&type->obd_type_lock);
118 try_module_get(type->typ_dt_ops->o_owner);
119 spin_unlock(&type->obd_type_lock);
124 void class_put_type(struct obd_type *type)
127 spin_lock(&type->obd_type_lock);
129 module_put(type->typ_dt_ops->o_owner);
130 spin_unlock(&type->obd_type_lock);
133 #define CLASS_MAX_NAME 1024
135 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
136 struct lprocfs_vars *vars, const char *name,
137 struct lu_device_type *ldt)
139 struct obd_type *type;
144 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
146 if (class_search_type(name)) {
147 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
152 OBD_ALLOC(type, sizeof(*type));
156 OBD_ALLOC_PTR(type->typ_dt_ops);
157 OBD_ALLOC_PTR(type->typ_md_ops);
158 OBD_ALLOC(type->typ_name, strlen(name) + 1);
160 if (type->typ_dt_ops == NULL ||
161 type->typ_md_ops == NULL ||
162 type->typ_name == NULL)
165 *(type->typ_dt_ops) = *dt_ops;
166 /* md_ops is optional */
168 *(type->typ_md_ops) = *md_ops;
169 strcpy(type->typ_name, name);
170 spin_lock_init(&type->obd_type_lock);
173 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
175 if (IS_ERR(type->typ_procroot)) {
176 rc = PTR_ERR(type->typ_procroot);
177 type->typ_procroot = NULL;
183 rc = ldt->ldt_ops->ldto_init(ldt);
188 spin_lock(&obd_types_lock);
189 list_add(&type->typ_chain, &obd_types);
190 spin_unlock(&obd_types_lock);
195 if (type->typ_name != NULL)
196 OBD_FREE(type->typ_name, strlen(name) + 1);
197 if (type->typ_md_ops != NULL)
198 OBD_FREE_PTR(type->typ_md_ops);
199 if (type->typ_dt_ops != NULL)
200 OBD_FREE_PTR(type->typ_dt_ops);
201 OBD_FREE(type, sizeof(*type));
205 int class_unregister_type(const char *name)
207 struct obd_type *type = class_search_type(name);
211 CERROR("unknown obd type\n");
215 if (type->typ_refcnt) {
216 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
217 /* This is a bad situation, let's make the best of it */
218 /* Remove ops, but leave the name for debugging */
219 OBD_FREE_PTR(type->typ_dt_ops);
220 OBD_FREE_PTR(type->typ_md_ops);
224 if (type->typ_procroot) {
225 lprocfs_remove(&type->typ_procroot);
229 type->typ_lu->ldt_ops->ldto_fini(type->typ_lu);
231 spin_lock(&obd_types_lock);
232 list_del(&type->typ_chain);
233 spin_unlock(&obd_types_lock);
234 OBD_FREE(type->typ_name, strlen(name) + 1);
235 if (type->typ_dt_ops != NULL)
236 OBD_FREE_PTR(type->typ_dt_ops);
237 if (type->typ_md_ops != NULL)
238 OBD_FREE_PTR(type->typ_md_ops);
239 OBD_FREE(type, sizeof(*type));
241 } /* class_unregister_type */
243 struct obd_device *class_newdev(const char *type_name, const char *name)
245 struct obd_device *result = NULL;
246 struct obd_device *newdev;
247 struct obd_type *type = NULL;
249 int new_obd_minor = 0;
251 if (strlen(name) > MAX_OBD_NAME) {
252 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
253 RETURN(ERR_PTR(-EINVAL));
256 type = class_get_type(type_name);
258 CERROR("OBD: unknown type: %s\n", type_name);
259 RETURN(ERR_PTR(-ENODEV));
262 newdev = obd_device_alloc();
263 if (newdev == NULL) {
264 class_put_type(type);
265 RETURN(ERR_PTR(-ENOMEM));
267 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
269 spin_lock(&obd_dev_lock);
270 for (i = 0; i < class_devno_max(); i++) {
271 struct obd_device *obd = class_num2obd(i);
272 if (obd && obd->obd_name &&
273 (strcmp(name, obd->obd_name) == 0)) {
274 CERROR("Device %s already exists, won't add\n", name);
276 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
277 "%p obd_magic %08x != %08x\n", result,
278 result->obd_magic, OBD_DEVICE_MAGIC);
279 LASSERTF(result->obd_minor == new_obd_minor,
280 "%p obd_minor %d != %d\n", result,
281 result->obd_minor, new_obd_minor);
283 obd_devs[result->obd_minor] = NULL;
284 result->obd_name[0]='\0';
286 result = ERR_PTR(-EEXIST);
289 if (!result && !obd) {
291 result->obd_minor = i;
293 result->obd_type = type;
294 memcpy(result->obd_name, name, strlen(name));
295 obd_devs[i] = result;
298 spin_unlock(&obd_dev_lock);
300 if (result == NULL && i >= class_devno_max()) {
301 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
303 result = ERR_PTR(-EOVERFLOW);
306 if (IS_ERR(result)) {
307 obd_device_free(newdev);
308 class_put_type(type);
310 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
311 result->obd_name, result);
316 void class_release_dev(struct obd_device *obd)
318 struct obd_type *obd_type = obd->obd_type;
320 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
321 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
322 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
323 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
324 LASSERT(obd_type != NULL);
326 CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
327 obd->obd_name,obd->obd_type->typ_name);
329 spin_lock(&obd_dev_lock);
330 obd_devs[obd->obd_minor] = NULL;
331 spin_unlock(&obd_dev_lock);
332 obd_device_free(obd);
334 class_put_type(obd_type);
337 int class_name2dev(const char *name)
344 spin_lock(&obd_dev_lock);
345 for (i = 0; i < class_devno_max(); i++) {
346 struct obd_device *obd = class_num2obd(i);
347 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
348 /* Make sure we finished attaching before we give
349 out any references */
350 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
351 if (obd->obd_attached) {
352 spin_unlock(&obd_dev_lock);
358 spin_unlock(&obd_dev_lock);
363 struct obd_device *class_name2obd(const char *name)
365 int dev = class_name2dev(name);
367 if (dev < 0 || dev > class_devno_max())
369 return class_num2obd(dev);
372 int class_uuid2dev(struct obd_uuid *uuid)
376 spin_lock(&obd_dev_lock);
377 for (i = 0; i < class_devno_max(); i++) {
378 struct obd_device *obd = class_num2obd(i);
379 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
380 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
381 spin_unlock(&obd_dev_lock);
385 spin_unlock(&obd_dev_lock);
390 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
392 int dev = class_uuid2dev(uuid);
395 return class_num2obd(dev);
398 struct obd_device *class_num2obd(int num)
400 struct obd_device *obd = NULL;
402 if (num < class_devno_max()) {
408 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
409 "%p obd_magic %08x != %08x\n",
410 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
411 LASSERTF(obd->obd_minor == num,
412 "%p obd_minor %0d != %0d\n",
413 obd, obd->obd_minor, num);
419 void class_obd_list(void)
424 spin_lock(&obd_dev_lock);
425 for (i = 0; i < class_devno_max(); i++) {
426 struct obd_device *obd = class_num2obd(i);
429 if (obd->obd_stopping)
431 else if (obd->obd_set_up)
433 else if (obd->obd_attached)
437 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
438 i, status, obd->obd_type->typ_name,
439 obd->obd_name, obd->obd_uuid.uuid,
440 atomic_read(&obd->obd_refcount));
442 spin_unlock(&obd_dev_lock);
446 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
447 specified, then only the client with that uuid is returned,
448 otherwise any client connected to the tgt is returned. */
449 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
450 const char * typ_name,
451 struct obd_uuid *grp_uuid)
455 spin_lock(&obd_dev_lock);
456 for (i = 0; i < class_devno_max(); i++) {
457 struct obd_device *obd = class_num2obd(i);
460 if ((strncmp(obd->obd_type->typ_name, typ_name,
461 strlen(typ_name)) == 0)) {
462 if (obd_uuid_equals(tgt_uuid,
463 &obd->u.cli.cl_target_uuid) &&
464 ((grp_uuid)? obd_uuid_equals(grp_uuid,
465 &obd->obd_uuid) : 1)) {
466 spin_unlock(&obd_dev_lock);
471 spin_unlock(&obd_dev_lock);
476 struct obd_device *class_find_client_notype(struct obd_uuid *tgt_uuid,
477 struct obd_uuid *grp_uuid)
479 struct obd_device *obd;
481 obd = class_find_client_obd(tgt_uuid, LUSTRE_MDC_NAME, NULL);
483 obd = class_find_client_obd(tgt_uuid, LUSTRE_OSC_NAME,
488 /* Iterate the obd_device list looking devices have grp_uuid. Start
489 searching at *next, and if a device is found, the next index to look
490 at is saved in *next. If next is NULL, then the first matching device
491 will always be returned. */
492 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
498 else if (*next >= 0 && *next < class_devno_max())
503 spin_lock(&obd_dev_lock);
504 for (; i < class_devno_max(); i++) {
505 struct obd_device *obd = class_num2obd(i);
508 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
511 spin_unlock(&obd_dev_lock);
515 spin_unlock(&obd_dev_lock);
521 void obd_cleanup_caches(void)
526 if (obd_device_cachep) {
527 rc = cfs_mem_cache_destroy(obd_device_cachep);
528 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
529 obd_device_cachep = NULL;
532 rc = cfs_mem_cache_destroy(obdo_cachep);
533 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
537 rc = cfs_mem_cache_destroy(import_cachep);
538 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
539 import_cachep = NULL;
542 rc = cfs_mem_cache_destroy(capa_cachep);
543 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
549 int obd_init_caches(void)
553 LASSERT(obd_device_cachep == NULL);
554 obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
555 sizeof(struct obd_device),
557 if (!obd_device_cachep)
560 LASSERT(obdo_cachep == NULL);
561 obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
566 LASSERT(import_cachep == NULL);
567 import_cachep = cfs_mem_cache_create("ll_import_cache",
568 sizeof(struct obd_import),
573 LASSERT(capa_cachep == NULL);
574 capa_cachep = cfs_mem_cache_create("capa_cache",
575 sizeof(struct obd_capa), 0, 0);
581 obd_cleanup_caches();
586 /* map connection to client */
587 struct obd_export *class_conn2export(struct lustre_handle *conn)
589 struct obd_export *export;
593 CDEBUG(D_CACHE, "looking for null handle\n");
597 if (conn->cookie == -1) { /* this means assign a new connection */
598 CDEBUG(D_CACHE, "want a new connection\n");
602 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
603 export = class_handle2object(conn->cookie);
607 struct obd_device *class_exp2obd(struct obd_export *exp)
614 struct obd_device *class_conn2obd(struct lustre_handle *conn)
616 struct obd_export *export;
617 export = class_conn2export(conn);
619 struct obd_device *obd = export->exp_obd;
620 class_export_put(export);
626 struct obd_import *class_exp2cliimp(struct obd_export *exp)
628 struct obd_device *obd = exp->exp_obd;
631 return obd->u.cli.cl_import;
634 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
636 struct obd_device *obd = class_conn2obd(conn);
639 return obd->u.cli.cl_import;
642 /* Export management functions */
643 static void export_handle_addref(void *export)
645 class_export_get(export);
648 void __class_export_put(struct obd_export *exp)
650 if (atomic_dec_and_test(&exp->exp_refcount)) {
651 LASSERT (list_empty(&exp->exp_obd_chain));
653 CDEBUG(D_IOCTL, "final put %p/%s\n",
654 exp, exp->exp_client_uuid.uuid);
656 spin_lock(&obd_zombie_impexp_lock);
657 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
658 spin_unlock(&obd_zombie_impexp_lock);
660 if (obd_zombie_impexp_notify != NULL)
661 obd_zombie_impexp_notify();
664 EXPORT_SYMBOL(__class_export_put);
666 void class_export_destroy(struct obd_export *exp)
668 struct obd_device *obd = exp->exp_obd;
671 LASSERT (atomic_read(&exp->exp_refcount) == 0);
673 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
674 exp->exp_client_uuid.uuid, obd->obd_name);
676 LASSERT(obd != NULL);
678 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
679 if (exp->exp_connection)
680 ptlrpc_put_connection_superhack(exp->exp_connection);
682 LASSERT(list_empty(&exp->exp_outstanding_replies));
683 LASSERT(list_empty(&exp->exp_req_replay_queue));
684 obd_destroy_export(exp);
686 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
691 /* Creates a new export, adds it to the hash table, and returns a
692 * pointer to it. The refcount is 2: one for the hash reference, and
693 * one for the pointer returned by this function. */
694 struct obd_export *class_new_export(struct obd_device *obd,
695 struct obd_uuid *cluuid)
697 struct obd_export *export;
700 OBD_ALLOC_PTR(export);
702 return ERR_PTR(-ENOMEM);
704 export->exp_conn_cnt = 0;
705 atomic_set(&export->exp_refcount, 2);
706 atomic_set(&export->exp_rpc_count, 0);
707 export->exp_obd = obd;
708 CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
709 CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
710 /* XXX this should be in LDLM init */
711 CFS_INIT_LIST_HEAD(&export->exp_ldlm_data.led_held_locks);
712 spin_lock_init(&export->exp_ldlm_data.led_lock);
714 CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
715 class_handle_hash(&export->exp_handle, export_handle_addref);
716 export->exp_last_request_time = cfs_time_current_sec();
717 spin_lock_init(&export->exp_lock);
718 INIT_HLIST_NODE(&export->exp_uuid_hash);
719 INIT_HLIST_NODE(&export->exp_nid_hash);
721 export->exp_sp_peer = LUSTRE_SP_ANY;
722 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
723 export->exp_client_uuid = *cluuid;
724 obd_init_export(export);
726 spin_lock(&obd->obd_dev_lock);
727 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
728 rc = lustre_hash_additem_unique(obd->obd_uuid_hash_body, cluuid,
729 &export->exp_uuid_hash);
731 CWARN("%s: denying duplicate export for %s\n",
732 obd->obd_name, cluuid->uuid);
733 spin_unlock(&obd->obd_dev_lock);
734 class_handle_unhash(&export->exp_handle);
735 OBD_FREE_PTR(export);
736 return ERR_PTR(-EALREADY);
740 LASSERT(!obd->obd_stopping); /* shouldn't happen, but might race */
742 list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
743 list_add_tail(&export->exp_obd_chain_timed,
744 &export->exp_obd->obd_exports_timed);
745 export->exp_obd->obd_num_exports++;
746 spin_unlock(&obd->obd_dev_lock);
750 EXPORT_SYMBOL(class_new_export);
752 void class_unlink_export(struct obd_export *exp)
754 class_handle_unhash(&exp->exp_handle);
756 spin_lock(&exp->exp_obd->obd_dev_lock);
757 /* delete an uuid-export hashitem from hashtables */
758 if (!hlist_unhashed(&exp->exp_uuid_hash)) {
759 lustre_hash_delitem(exp->exp_obd->obd_uuid_hash_body,
760 &exp->exp_client_uuid, &exp->exp_uuid_hash);
762 list_del_init(&exp->exp_obd_chain);
763 list_del_init(&exp->exp_obd_chain_timed);
764 exp->exp_obd->obd_num_exports--;
765 spin_unlock(&exp->exp_obd->obd_dev_lock);
767 class_export_put(exp);
769 EXPORT_SYMBOL(class_unlink_export);
771 /* Import management functions */
772 static void import_handle_addref(void *import)
774 class_import_get(import);
777 struct obd_import *class_import_get(struct obd_import *import)
779 LASSERT(atomic_read(&import->imp_refcount) >= 0);
780 LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
781 atomic_inc(&import->imp_refcount);
782 CDEBUG(D_INFO, "import %p refcount=%d\n", import,
783 atomic_read(&import->imp_refcount));
786 EXPORT_SYMBOL(class_import_get);
788 void class_import_put(struct obd_import *import)
792 CDEBUG(D_INFO, "import %p refcount=%d\n", import,
793 atomic_read(&import->imp_refcount) - 1);
795 LASSERT(atomic_read(&import->imp_refcount) > 0);
796 LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
797 LASSERT(list_empty(&import->imp_zombie_chain));
799 if (atomic_dec_and_test(&import->imp_refcount)) {
801 CDEBUG(D_INFO, "final put import %p\n", import);
803 spin_lock(&obd_zombie_impexp_lock);
804 list_add(&import->imp_zombie_chain, &obd_zombie_imports);
805 spin_unlock(&obd_zombie_impexp_lock);
807 if (obd_zombie_impexp_notify != NULL)
808 obd_zombie_impexp_notify();
813 EXPORT_SYMBOL(class_import_put);
815 void class_import_destroy(struct obd_import *import)
819 CDEBUG(D_IOCTL, "destroying import %p for %s\n", import,
820 import->imp_obd->obd_name);
822 LASSERT(atomic_read(&import->imp_refcount) == 0);
824 ptlrpc_put_connection_superhack(import->imp_connection);
826 while (!list_empty(&import->imp_conn_list)) {
827 struct obd_import_conn *imp_conn;
829 imp_conn = list_entry(import->imp_conn_list.next,
830 struct obd_import_conn, oic_item);
831 list_del(&imp_conn->oic_item);
832 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
833 OBD_FREE(imp_conn, sizeof(*imp_conn));
836 LASSERT(import->imp_sec == NULL);
837 class_decref(import->imp_obd);
838 OBD_FREE_RCU(import, sizeof(*import), &import->imp_handle);
842 static void init_imp_at(struct imp_at *at) {
844 at_init(&at->iat_net_latency, 0, 0);
845 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
846 /* max service estimates are tracked on the server side, so
847 don't use the AT history here, just use the last reported
848 val. (But keep hist for proc histogram, worst_ever) */
849 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
854 struct obd_import *class_new_import(struct obd_device *obd)
856 struct obd_import *imp;
858 OBD_ALLOC(imp, sizeof(*imp));
862 CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
863 CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
864 CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
865 CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
866 spin_lock_init(&imp->imp_lock);
867 imp->imp_last_success_conn = 0;
868 imp->imp_state = LUSTRE_IMP_NEW;
869 imp->imp_obd = class_incref(obd);
870 sema_init(&imp->imp_sec_mutex, 1);
871 cfs_waitq_init(&imp->imp_recovery_waitq);
873 atomic_set(&imp->imp_refcount, 2);
874 atomic_set(&imp->imp_inflight, 0);
875 atomic_set(&imp->imp_replay_inflight, 0);
876 atomic_set(&imp->imp_inval_count, 0);
877 CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
878 CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
879 class_handle_hash(&imp->imp_handle, import_handle_addref);
880 init_imp_at(&imp->imp_at);
882 /* the default magic is V2, will be used in connect RPC, and
883 * then adjusted according to the flags in request/reply. */
884 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
888 EXPORT_SYMBOL(class_new_import);
890 void class_destroy_import(struct obd_import *import)
892 LASSERT(import != NULL);
893 LASSERT(import != LP_POISON);
895 class_handle_unhash(&import->imp_handle);
897 spin_lock(&import->imp_lock);
898 import->imp_generation++;
899 spin_unlock(&import->imp_lock);
900 class_import_put(import);
902 EXPORT_SYMBOL(class_destroy_import);
904 /* A connection defines an export context in which preallocation can
905 be managed. This releases the export pointer reference, and returns
906 the export handle, so the export refcount is 1 when this function
908 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
909 struct obd_uuid *cluuid)
911 struct obd_export *export;
912 LASSERT(conn != NULL);
913 LASSERT(obd != NULL);
914 LASSERT(cluuid != NULL);
917 export = class_new_export(obd, cluuid);
919 RETURN(PTR_ERR(export));
921 conn->cookie = export->exp_handle.h_cookie;
922 class_export_put(export);
924 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
925 cluuid->uuid, conn->cookie);
928 EXPORT_SYMBOL(class_connect);
930 /* if export is involved in recovery then clean up related things */
931 void class_export_recovery_cleanup(struct obd_export *exp)
933 struct obd_device *obd = exp->exp_obd;
935 spin_lock_bh(&obd->obd_processing_task_lock);
936 if (obd->obd_recovering && exp->exp_in_recovery) {
937 spin_lock(&exp->exp_lock);
938 exp->exp_in_recovery = 0;
939 spin_unlock(&exp->exp_lock);
940 obd->obd_connected_clients--;
941 /* each connected client is counted as recoverable */
942 obd->obd_recoverable_clients--;
943 if (exp->exp_req_replay_needed) {
944 spin_lock(&exp->exp_lock);
945 exp->exp_req_replay_needed = 0;
946 spin_unlock(&exp->exp_lock);
947 LASSERT(atomic_read(&obd->obd_req_replay_clients));
948 atomic_dec(&obd->obd_req_replay_clients);
950 if (exp->exp_lock_replay_needed) {
951 spin_lock(&exp->exp_lock);
952 exp->exp_lock_replay_needed = 0;
953 spin_unlock(&exp->exp_lock);
954 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
955 atomic_dec(&obd->obd_lock_replay_clients);
958 spin_unlock_bh(&obd->obd_processing_task_lock);
961 /* This function removes two references from the export: one for the
962 * hash entry and one for the export pointer passed in. The export
963 * pointer passed to this function is destroyed should not be used
965 int class_disconnect(struct obd_export *export)
967 int already_disconnected;
970 if (export == NULL) {
972 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
976 spin_lock(&export->exp_lock);
977 already_disconnected = export->exp_disconnected;
978 export->exp_disconnected = 1;
980 if (!hlist_unhashed(&export->exp_nid_hash)) {
981 lustre_hash_delitem(export->exp_obd->obd_nid_hash_body,
982 &export->exp_connection->c_peer.nid, &export->exp_nid_hash);
984 spin_unlock(&export->exp_lock);
986 /* class_cleanup(), abort_recovery(), and class_fail_export()
987 * all end up in here, and if any of them race we shouldn't
988 * call extra class_export_puts(). */
989 if (already_disconnected)
992 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
993 export->exp_handle.h_cookie);
995 class_export_recovery_cleanup(export);
996 class_unlink_export(export);
997 class_export_put(export);
1001 static void class_disconnect_export_list(struct list_head *list, int flags)
1004 struct lustre_handle fake_conn;
1005 struct obd_export *fake_exp, *exp;
1008 /* It's possible that an export may disconnect itself, but
1009 * nothing else will be added to this list. */
1010 while (!list_empty(list)) {
1011 exp = list_entry(list->next, struct obd_export, exp_obd_chain);
1012 class_export_get(exp);
1014 spin_lock(&exp->exp_lock);
1015 exp->exp_flags = flags;
1016 spin_unlock(&exp->exp_lock);
1018 if (obd_uuid_equals(&exp->exp_client_uuid,
1019 &exp->exp_obd->obd_uuid)) {
1021 "exp %p export uuid == obd uuid, don't discon\n",
1023 /* Need to delete this now so we don't end up pointing
1024 * to work_list later when this export is cleaned up. */
1025 list_del_init(&exp->exp_obd_chain);
1026 class_export_put(exp);
1030 fake_conn.cookie = exp->exp_handle.h_cookie;
1031 fake_exp = class_conn2export(&fake_conn);
1033 class_export_put(exp);
1037 spin_lock(&fake_exp->exp_lock);
1038 fake_exp->exp_flags = flags;
1039 spin_unlock(&fake_exp->exp_lock);
1041 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1042 "last request at %ld\n",
1043 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1044 exp, exp->exp_last_request_time);
1045 rc = obd_disconnect(fake_exp);
1046 class_export_put(exp);
1051 static inline int get_exp_flags_from_obd(struct obd_device *obd)
1053 return ((obd->obd_fail ? OBD_OPT_FAILOVER : 0) |
1054 (obd->obd_force ? OBD_OPT_FORCE : 0));
1057 void class_disconnect_exports(struct obd_device *obd)
1059 struct list_head work_list;
1062 /* Move all of the exports from obd_exports to a work list, en masse. */
1063 spin_lock(&obd->obd_dev_lock);
1064 list_add(&work_list, &obd->obd_exports);
1065 list_del_init(&obd->obd_exports);
1066 spin_unlock(&obd->obd_dev_lock);
1068 if (!list_empty(&work_list)) {
1069 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1070 "disconnecting them\n", obd->obd_minor, obd);
1071 class_disconnect_export_list(&work_list,
1072 get_exp_flags_from_obd(obd));
1074 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1075 obd->obd_minor, obd);
1078 EXPORT_SYMBOL(class_disconnect_exports);
1080 /* Remove exports that have not completed recovery.
1082 int class_disconnect_stale_exports(struct obd_device *obd,
1083 int (*test_export)(struct obd_export *))
1085 struct list_head work_list;
1086 struct list_head *pos, *n;
1087 struct obd_export *exp;
1091 CFS_INIT_LIST_HEAD(&work_list);
1092 spin_lock(&obd->obd_dev_lock);
1093 list_for_each_safe(pos, n, &obd->obd_exports) {
1094 exp = list_entry(pos, struct obd_export, exp_obd_chain);
1095 if (test_export(exp))
1098 list_del(&exp->exp_obd_chain);
1099 list_add(&exp->exp_obd_chain, &work_list);
1100 /* don't count self-export as client */
1101 if (obd_uuid_equals(&exp->exp_client_uuid,
1102 &exp->exp_obd->obd_uuid))
1106 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1107 obd->obd_name, exp->exp_client_uuid.uuid,
1108 exp->exp_connection == NULL ? "<unknown>" :
1109 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1111 spin_unlock(&obd->obd_dev_lock);
1113 CDEBUG(D_ERROR, "%s: disconnecting %d stale clients\n",
1114 obd->obd_name, cnt);
1115 class_disconnect_export_list(&work_list, get_exp_flags_from_obd(obd));
1118 EXPORT_SYMBOL(class_disconnect_stale_exports);
1120 int oig_init(struct obd_io_group **oig_out)
1122 struct obd_io_group *oig;
1125 OBD_ALLOC(oig, sizeof(*oig));
1129 spin_lock_init(&oig->oig_lock);
1131 oig->oig_pending = 0;
1132 atomic_set(&oig->oig_refcount, 1);
1133 cfs_waitq_init(&oig->oig_waitq);
1134 CFS_INIT_LIST_HEAD(&oig->oig_occ_list);
1139 EXPORT_SYMBOL(oig_init);
1141 static inline void oig_grab(struct obd_io_group *oig)
1143 atomic_inc(&oig->oig_refcount);
1146 void oig_release(struct obd_io_group *oig)
1148 if (atomic_dec_and_test(&oig->oig_refcount))
1149 OBD_FREE(oig, sizeof(*oig));
1151 EXPORT_SYMBOL(oig_release);
1153 int oig_add_one(struct obd_io_group *oig, struct oig_callback_context *occ)
1156 CDEBUG(D_CACHE, "oig %p ready to roll\n", oig);
1157 spin_lock(&oig->oig_lock);
1163 list_add_tail(&occ->occ_oig_item, &oig->oig_occ_list);
1165 spin_unlock(&oig->oig_lock);
1170 EXPORT_SYMBOL(oig_add_one);
1172 void oig_complete_one(struct obd_io_group *oig,
1173 struct oig_callback_context *occ, int rc)
1175 cfs_waitq_t *wake = NULL;
1178 spin_lock(&oig->oig_lock);
1181 list_del_init(&occ->occ_oig_item);
1183 old_rc = oig->oig_rc;
1184 if (oig->oig_rc == 0 && rc != 0)
1187 if (--oig->oig_pending <= 0)
1188 wake = &oig->oig_waitq;
1190 spin_unlock(&oig->oig_lock);
1192 CDEBUG(D_CACHE, "oig %p completed, rc %d -> %d via %d, %d now "
1193 "pending (racey)\n", oig, old_rc, oig->oig_rc, rc,
1196 cfs_waitq_signal(wake);
1199 EXPORT_SYMBOL(oig_complete_one);
1201 static int oig_done(struct obd_io_group *oig)
1204 spin_lock(&oig->oig_lock);
1205 if (oig->oig_pending <= 0)
1207 spin_unlock(&oig->oig_lock);
1211 static void interrupted_oig(void *data)
1213 struct obd_io_group *oig = data;
1214 struct oig_callback_context *occ;
1216 spin_lock(&oig->oig_lock);
1217 /* We need to restart the processing each time we drop the lock, as
1218 * it is possible other threads called oig_complete_one() to remove
1219 * an entry elsewhere in the list while we dropped lock. We need to
1220 * drop the lock because osc_ap_completion() calls oig_complete_one()
1221 * which re-gets this lock ;-) as well as a lock ordering issue. */
1223 list_for_each_entry(occ, &oig->oig_occ_list, occ_oig_item) {
1224 if (occ->interrupted)
1226 occ->interrupted = 1;
1227 spin_unlock(&oig->oig_lock);
1228 occ->occ_interrupted(occ);
1229 spin_lock(&oig->oig_lock);
1232 spin_unlock(&oig->oig_lock);
1235 int oig_wait(struct obd_io_group *oig)
1237 struct l_wait_info lwi = LWI_INTR(interrupted_oig, oig);
1240 CDEBUG(D_CACHE, "waiting for oig %p\n", oig);
1243 rc = l_wait_event(oig->oig_waitq, oig_done(oig), &lwi);
1244 LASSERTF(rc == 0 || rc == -EINTR, "rc: %d\n", rc);
1245 /* we can't continue until the oig has emptied and stopped
1246 * referencing state that the caller will free upon return */
1248 lwi = (struct l_wait_info){ 0, };
1249 } while (rc == -EINTR);
1251 LASSERTF(oig->oig_pending == 0,
1252 "exiting oig_wait(oig = %p) with %d pending\n", oig,
1255 CDEBUG(D_CACHE, "done waiting on oig %p rc %d\n", oig, oig->oig_rc);
1258 EXPORT_SYMBOL(oig_wait);
1260 void class_fail_export(struct obd_export *exp)
1262 int rc, already_failed;
1264 spin_lock(&exp->exp_lock);
1265 already_failed = exp->exp_failed;
1266 exp->exp_failed = 1;
1267 spin_unlock(&exp->exp_lock);
1269 if (already_failed) {
1270 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1271 exp, exp->exp_client_uuid.uuid);
1275 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1276 exp, exp->exp_client_uuid.uuid);
1278 if (obd_dump_on_timeout)
1279 libcfs_debug_dumplog();
1281 /* Most callers into obd_disconnect are removing their own reference
1282 * (request, for example) in addition to the one from the hash table.
1283 * We don't have such a reference here, so make one. */
1284 class_export_get(exp);
1285 rc = obd_disconnect(exp);
1287 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1289 CDEBUG(D_HA, "disconnected export %p/%s\n",
1290 exp, exp->exp_client_uuid.uuid);
1292 EXPORT_SYMBOL(class_fail_export);
1294 char *obd_export_nid2str(struct obd_export *exp)
1296 if (exp->exp_connection != NULL)
1297 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1301 EXPORT_SYMBOL(obd_export_nid2str);
1303 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1305 struct obd_export *doomed_exp = NULL;
1306 int exports_evicted = 0;
1308 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1311 doomed_exp = lustre_hash_get_object_by_key(obd->obd_nid_hash_body,
1313 if (doomed_exp == NULL)
1316 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1317 "nid %s found, wanted nid %s, requested nid %s\n",
1318 obd_export_nid2str(doomed_exp),
1319 libcfs_nid2str(nid_key), nid);
1320 LASSERTF(doomed_exp != obd->obd_self_export,
1321 "self-export is hashed by NID?\n");
1323 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1324 obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1326 class_fail_export(doomed_exp);
1327 class_export_put(doomed_exp);
1330 if (!exports_evicted)
1331 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1332 obd->obd_name, nid);
1333 return exports_evicted;
1335 EXPORT_SYMBOL(obd_export_evict_by_nid);
1337 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1339 struct obd_export *doomed_exp = NULL;
1340 struct obd_uuid doomed;
1341 int exports_evicted = 0;
1343 obd_str2uuid(&doomed, uuid);
1344 if (obd_uuid_equals(&doomed, &obd->obd_uuid)) {
1345 CERROR("%s: can't evict myself\n", obd->obd_name);
1346 return exports_evicted;
1349 doomed_exp = lustre_hash_get_object_by_key(obd->obd_uuid_hash_body,
1352 if (doomed_exp == NULL) {
1353 CERROR("%s: can't disconnect %s: no exports found\n",
1354 obd->obd_name, uuid);
1356 CWARN("%s: evicting %s at adminstrative request\n",
1357 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1358 class_fail_export(doomed_exp);
1359 class_export_put(doomed_exp);
1363 return exports_evicted;
1365 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1368 * kill zombie imports and exports
1370 void obd_zombie_impexp_cull(void)
1372 struct obd_import *import;
1373 struct obd_export *export;
1377 spin_lock (&obd_zombie_impexp_lock);
1380 if (!list_empty(&obd_zombie_imports)) {
1381 import = list_entry(obd_zombie_imports.next,
1384 list_del(&import->imp_zombie_chain);
1388 if (!list_empty(&obd_zombie_exports)) {
1389 export = list_entry(obd_zombie_exports.next,
1392 list_del_init(&export->exp_obd_chain);
1395 spin_unlock(&obd_zombie_impexp_lock);
1398 class_import_destroy(import);
1401 class_export_destroy(export);
1403 } while (import != NULL || export != NULL);
1407 static struct completion obd_zombie_start;
1408 static struct completion obd_zombie_stop;
1409 static unsigned long obd_zombie_flags;
1410 static cfs_waitq_t obd_zombie_waitq;
1417 * check for work for kill zombie import/export thread.
1419 int obd_zombie_impexp_check(void *arg)
1423 spin_lock(&obd_zombie_impexp_lock);
1424 rc = list_empty(&obd_zombie_imports) &&
1425 list_empty(&obd_zombie_exports) &&
1426 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1428 spin_unlock(&obd_zombie_impexp_lock);
1434 * notify import/export destroy thread about new zombie.
1436 static void obd_zombie_impexp_notify(void)
1438 cfs_waitq_signal(&obd_zombie_waitq);
1444 * destroy zombie export/import thread.
1446 static int obd_zombie_impexp_thread(void *unused)
1450 if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1451 complete(&obd_zombie_start);
1455 complete(&obd_zombie_start);
1457 while(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1458 struct l_wait_info lwi = { 0 };
1460 l_wait_event(obd_zombie_waitq, !obd_zombie_impexp_check(NULL), &lwi);
1462 obd_zombie_impexp_cull();
1465 complete(&obd_zombie_stop);
1470 #else /* ! KERNEL */
1472 static atomic_t zombie_recur = ATOMIC_INIT(0);
1473 static void *obd_zombie_impexp_work_cb;
1474 static void *obd_zombie_impexp_idle_cb;
1476 int obd_zombie_impexp_kill(void *arg)
1480 if (atomic_inc_return(&zombie_recur) == 1) {
1481 obd_zombie_impexp_cull();
1484 atomic_dec(&zombie_recur);
1491 * start destroy zombie import/export thread
1493 int obd_zombie_impexp_init(void)
1497 CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1498 CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1499 spin_lock_init(&obd_zombie_impexp_lock);
1500 init_completion(&obd_zombie_start);
1501 init_completion(&obd_zombie_stop);
1502 cfs_waitq_init(&obd_zombie_waitq);
1505 rc = cfs_kernel_thread(obd_zombie_impexp_thread, NULL, 0);
1509 wait_for_completion(&obd_zombie_start);
1512 obd_zombie_impexp_work_cb =
1513 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1514 &obd_zombie_impexp_kill, NULL);
1516 obd_zombie_impexp_idle_cb =
1517 liblustre_register_idle_callback("obd_zombi_impexp_check",
1518 &obd_zombie_impexp_check, NULL);
1525 * stop destroy zombie import/export thread
1527 void obd_zombie_impexp_stop(void)
1529 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1530 obd_zombie_impexp_notify();
1532 wait_for_completion(&obd_zombie_stop);
1534 liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1535 liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);