1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see [sun.com URL with a
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
44 #include <liblustre.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49 #include <class_hash.h>
/* List of registered obd types; defined elsewhere (extern). */
51 extern struct list_head obd_types;
/* Held around every walk or modification of obd_types in this file. */
52 spinlock_t obd_types_lock;
/* Slab caches backing struct obd_device, struct obdo and struct obd_import
 * (created in obd_init_caches(), destroyed in obd_cleanup_caches()). */
54 cfs_mem_cache_t *obd_device_cachep;
55 cfs_mem_cache_t *obdo_cachep;
56 EXPORT_SYMBOL(obdo_cachep);
57 cfs_mem_cache_t *import_cachep;
/* Imports/exports whose last reference was dropped are queued on these
 * lists (under obd_zombie_impexp_lock) until obd_zombie_impexp_cull()
 * removes them. */
59 struct list_head obd_zombie_imports;
60 struct list_head obd_zombie_exports;
61 spinlock_t obd_zombie_impexp_lock;
/* Called after something has been queued on a zombie list. */
62 static void obd_zombie_impexp_notify(void);
/* Hook used by this file to drop a ptlrpc_connection reference; it is not
 * assigned anywhere in the visible part of this file. */
64 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
67 * support functions: we could use inter-module communication, but this
68 * is more portable to other OS's
/*
 * Allocate an obd_device from obd_device_cachep and stamp it with
 * OBD_DEVICE_MAGIC.
 *
 * NOTE(review): the NULL check around the magic assignment and the return
 * statement are not visible in this listing; class_newdev() treats a NULL
 * result as -ENOMEM.
 * NOTE(review): the function is declared static yet handed to EXPORT_SYMBOL
 * below - confirm which linkage is intended.
 */
70 static struct obd_device *obd_device_alloc(void)
72 struct obd_device *obd;
74 OBD_SLAB_ALLOC_PTR(obd, obd_device_cachep);
76 obd->obd_magic = OBD_DEVICE_MAGIC;
80 EXPORT_SYMBOL(obd_device_alloc);
/*
 * Return an obd_device to obd_device_cachep.
 *
 * Asserts that the magic is intact, and reports an error if the device
 * still has a namespace attached when it is freed.
 * NOTE(review): static function passed to EXPORT_SYMBOL below - confirm.
 */
82 static void obd_device_free(struct obd_device *obd)
85 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
86 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
/* A leftover namespace means cleanup was incomplete; make it loud. */
87 if (obd->obd_namespace != NULL) {
88 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
89 obd, obd->obd_namespace, obd->obd_force);
92 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
94 EXPORT_SYMBOL(obd_device_free);
/*
 * Find a registered obd type by exact name.
 *
 * Walks obd_types under obd_types_lock comparing typ_name with strcmp().
 * The lock is dropped both on a match and after an unsuccessful walk.
 * NOTE(review): the return statements are not visible in this listing;
 * callers test the result against NULL.
 */
96 struct obd_type *class_search_type(const char *name)
98 struct list_head *tmp;
99 struct obd_type *type;
101 spin_lock(&obd_types_lock);
102 list_for_each(tmp, &obd_types) {
103 type = list_entry(tmp, struct obd_type, typ_chain);
104 if (strcmp(type->typ_name, name) == 0) {
/* match: release the lock before leaving the loop */
105 spin_unlock(&obd_types_lock);
109 spin_unlock(&obd_types_lock);
/*
 * Look up an obd type by name and pin its owning module.
 *
 * The type is searched first; the visible code can also ask for a module
 * named after the type via request_module() and search again, logging a
 * console error when the module cannot be loaded.  For a found type,
 * try_module_get() is called on the dt_ops owner under obd_type_lock.
 * NOTE(review): the conditions guarding the module-load path, any refcount
 * update and the return are not visible in this listing.
 */
113 struct obd_type *class_get_type(const char *name)
115 struct obd_type *type = class_search_type(name);
119 const char *modname = name;
/* request_module() returns 0 on success */
120 if (!request_module(modname)) {
121 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
122 type = class_search_type(name);
124 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
130 spin_lock(&type->obd_type_lock);
132 try_module_get(type->typ_dt_ops->o_owner);
133 spin_unlock(&type->obd_type_lock);
/*
 * Release a type obtained with class_get_type(): drops the module
 * reference on the dt_ops owner while holding obd_type_lock.
 * NOTE(review): any refcount decrement is not visible in this listing.
 */
138 void class_put_type(struct obd_type *type)
141 spin_lock(&type->obd_type_lock);
143 module_put(type->typ_dt_ops->o_owner);
144 spin_unlock(&type->obd_type_lock);
/* Upper bound (exclusive) on the length of a type name. */
147 #define CLASS_MAX_NAME 1024
/*
 * Register a new obd type called `name`.
 *
 * Allocates a struct obd_type together with private copies of the dt/md
 * operation tables and of the name, registers a procfs directory named
 * after the type, runs the lu_device_type init hook, and finally links the
 * type onto obd_types.  The tail of the function is the failure path, which
 * frees whatever was allocated.
 *
 * dt_ops - data operations, copied into the type
 * md_ops - metadata operations, optional (see comment below)
 * vars   - NOTE(review): its use is not visible in this listing; presumably
 *          passed to lprocfs_register() - confirm
 * name   - type name, must be shorter than CLASS_MAX_NAME
 * ldt    - lu device type whose ldto_init() is invoked
 *
 * NOTE(review): guards, gotos and return statements are missing from this
 * listing, so exact error codes cannot be stated here.
 */
149 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
150 struct lprocfs_vars *vars, const char *name,
151 struct lu_device_type *ldt)
153 struct obd_type *type;
158 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
/* refuse to register the same name twice */
160 if (class_search_type(name)) {
161 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
166 OBD_ALLOC(type, sizeof(*type));
170 OBD_ALLOC_PTR(type->typ_dt_ops);
171 OBD_ALLOC_PTR(type->typ_md_ops);
172 OBD_ALLOC(type->typ_name, strlen(name) + 1);
/* all three allocations are checked together */
174 if (type->typ_dt_ops == NULL ||
175 type->typ_md_ops == NULL ||
176 type->typ_name == NULL)
179 *(type->typ_dt_ops) = *dt_ops;
180 /* md_ops is optional */
182 *(type->typ_md_ops) = *md_ops;
183 strcpy(type->typ_name, name);
184 spin_lock_init(&type->obd_type_lock);
187 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
189 if (IS_ERR(type->typ_procroot)) {
190 rc = PTR_ERR(type->typ_procroot);
/* clear the ERR_PTR so later code does not treat it as a valid entry */
191 type->typ_procroot = NULL;
197 rc = ldt->ldt_ops->ldto_init(ldt);
/* publish the fully initialised type */
202 spin_lock(&obd_types_lock);
203 list_add(&type->typ_chain, &obd_types);
204 spin_unlock(&obd_types_lock);
/* failure path: release everything allocated above */
209 if (type->typ_name != NULL)
210 OBD_FREE(type->typ_name, strlen(name) + 1);
211 if (type->typ_md_ops != NULL)
212 OBD_FREE_PTR(type->typ_md_ops);
213 if (type->typ_dt_ops != NULL)
214 OBD_FREE_PTR(type->typ_dt_ops);
215 OBD_FREE(type, sizeof(*type));
/*
 * Unregister the obd type called `name`.
 *
 * If the type still has references, only the operation tables are freed
 * and the name is kept for debugging (see the comments below).  Otherwise
 * the procfs directory is removed, the lu_device_type fini hook is run, the
 * type is unlinked from obd_types and all of its memory is released.
 *
 * NOTE(review): return statements and some guards are missing from this
 * listing, so the exact return codes cannot be stated here.
 */
219 int class_unregister_type(const char *name)
221 struct obd_type *type = class_search_type(name);
225 CERROR("unknown obd type\n");
229 if (type->typ_refcnt) {
230 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
231 /* This is a bad situation, let's make the best of it */
232 /* Remove ops, but leave the name for debugging */
233 OBD_FREE_PTR(type->typ_dt_ops);
234 OBD_FREE_PTR(type->typ_md_ops);
238 if (type->typ_procroot) {
239 lprocfs_remove(&type->typ_procroot);
243 type->typ_lu->ldt_ops->ldto_fini(type->typ_lu);
245 spin_lock(&obd_types_lock);
246 list_del(&type->typ_chain);
247 spin_unlock(&obd_types_lock);
/* `name` matched typ_name via strcmp(), so its length is the right size */
248 OBD_FREE(type->typ_name, strlen(name) + 1);
249 if (type->typ_dt_ops != NULL)
250 OBD_FREE_PTR(type->typ_dt_ops);
251 if (type->typ_md_ops != NULL)
252 OBD_FREE_PTR(type->typ_md_ops);
253 OBD_FREE(type, sizeof(*type));
255 } /* class_unregister_type */
257 struct obd_device *class_newdev(const char *type_name, const char *name)
259 struct obd_device *result = NULL;
260 struct obd_device *newdev;
261 struct obd_type *type = NULL;
263 int new_obd_minor = 0;
265 if (strlen(name) > MAX_OBD_NAME) {
266 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
267 RETURN(ERR_PTR(-EINVAL));
270 type = class_get_type(type_name);
272 CERROR("OBD: unknown type: %s\n", type_name);
273 RETURN(ERR_PTR(-ENODEV));
276 newdev = obd_device_alloc();
277 if (newdev == NULL) {
278 class_put_type(type);
279 RETURN(ERR_PTR(-ENOMEM));
281 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
283 spin_lock(&obd_dev_lock);
284 for (i = 0; i < class_devno_max(); i++) {
285 struct obd_device *obd = class_num2obd(i);
286 if (obd && obd->obd_name &&
287 (strcmp(name, obd->obd_name) == 0)) {
288 CERROR("Device %s already exists, won't add\n", name);
290 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
291 "%p obd_magic %08x != %08x\n", result,
292 result->obd_magic, OBD_DEVICE_MAGIC);
293 LASSERTF(result->obd_minor == new_obd_minor,
294 "%p obd_minor %d != %d\n", result,
295 result->obd_minor, new_obd_minor);
297 obd_devs[result->obd_minor] = NULL;
298 result->obd_name[0]='\0';
300 result = ERR_PTR(-EEXIST);
303 if (!result && !obd) {
305 result->obd_minor = i;
307 result->obd_type = type;
308 memcpy(result->obd_name, name, strlen(name));
309 obd_devs[i] = result;
312 spin_unlock(&obd_dev_lock);
314 if (result == NULL && i >= class_devno_max()) {
315 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
317 result = ERR_PTR(-EOVERFLOW);
320 if (IS_ERR(result)) {
321 obd_device_free(newdev);
322 class_put_type(type);
324 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
325 result->obd_name, result);
/*
 * Unregister and free an obd device.
 *
 * Verifies the magic and that obd really occupies its obd_devs[] slot,
 * clears that slot under obd_dev_lock, frees the device and finally drops
 * the reference on its type.
 */
330 void class_release_dev(struct obd_device *obd)
/* saved up front: obd is freed before the type reference is dropped */
332 struct obd_type *obd_type = obd->obd_type;
334 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
335 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
336 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
337 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
338 LASSERT(obd_type != NULL);
340 CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
341 obd->obd_name,obd->obd_type->typ_name);
343 spin_lock(&obd_dev_lock);
344 obd_devs[obd->obd_minor] = NULL;
345 spin_unlock(&obd_dev_lock);
346 obd_device_free(obd);
348 class_put_type(obd_type);
/*
 * Translate a device name into its index in obd_devs[].
 *
 * Scans all slots under obd_dev_lock for a device whose obd_name equals
 * `name`; a match is only honoured once the device is attached.
 * NOTE(review): the return statements are not visible in this listing;
 * class_name2obd() treats a negative result as "not found".
 */
351 int class_name2dev(const char *name)
358 spin_lock(&obd_dev_lock);
359 for (i = 0; i < class_devno_max(); i++) {
360 struct obd_device *obd = class_num2obd(i);
361 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
362 /* Make sure we finished attaching before we give
363 out any references */
364 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
365 if (obd->obd_attached) {
366 spin_unlock(&obd_dev_lock);
372 spin_unlock(&obd_dev_lock);
377 struct obd_device *class_name2obd(const char *name)
379 int dev = class_name2dev(name);
381 if (dev < 0 || dev > class_devno_max())
383 return class_num2obd(dev);
/*
 * Translate a device uuid into its index in obd_devs[].
 *
 * Scans all slots under obd_dev_lock and compares each device's obd_uuid
 * with `uuid` using obd_uuid_equals().
 * NOTE(review): the return statements are not visible in this listing.
 */
386 int class_uuid2dev(struct obd_uuid *uuid)
390 spin_lock(&obd_dev_lock);
391 for (i = 0; i < class_devno_max(); i++) {
392 struct obd_device *obd = class_num2obd(i);
393 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
394 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
395 spin_unlock(&obd_dev_lock);
399 spin_unlock(&obd_dev_lock);
/*
 * Look up an obd device by uuid: resolves the uuid to an index with
 * class_uuid2dev() and then fetches the device with class_num2obd().
 * NOTE(review): any check of `dev` before the lookup is not visible in
 * this listing.
 */
404 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
406 int dev = class_uuid2dev(uuid);
409 return class_num2obd(dev);
412 struct obd_device *class_num2obd(int num)
414 struct obd_device *obd = NULL;
416 if (num < class_devno_max()) {
422 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
423 "%p obd_magic %08x != %08x\n",
424 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
425 LASSERTF(obd->obd_minor == num,
426 "%p obd_minor %0d != %0d\n",
427 obd, obd->obd_minor, num);
/*
 * Print one console line per registered obd device: index, status, type
 * name, device name, uuid and reference count.
 *
 * The status is chosen from the device state flags in priority order
 * stopping > set up > attached.
 * NOTE(review): the status strings themselves and the skip of empty slots
 * are not visible in this listing.
 */
433 void class_obd_list(void)
438 spin_lock(&obd_dev_lock);
439 for (i = 0; i < class_devno_max(); i++) {
440 struct obd_device *obd = class_num2obd(i);
443 if (obd->obd_stopping)
445 else if (obd->obd_set_up)
447 else if (obd->obd_attached)
451 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
452 i, status, obd->obd_type->typ_name,
453 obd->obd_name, obd->obd_uuid.uuid,
454 atomic_read(&obd->obd_refcount));
456 spin_unlock(&obd_dev_lock);
460 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
461 specified, then only the client with that uuid is returned,
462 otherwise any client connected to the tgt is returned.
 The type is matched with strncmp() over strlen(typ_name) bytes, i.e.
 typ_name only needs to be a prefix of the device's type name.
 NOTE(review): the return statements are not visible in this listing. */
463 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
464 const char * typ_name,
465 struct obd_uuid *grp_uuid)
469 spin_lock(&obd_dev_lock);
470 for (i = 0; i < class_devno_max(); i++) {
471 struct obd_device *obd = class_num2obd(i);
474 if ((strncmp(obd->obd_type->typ_name, typ_name,
475 strlen(typ_name)) == 0)) {
/* target must match; group uuid is only compared when supplied */
476 if (obd_uuid_equals(tgt_uuid,
477 &obd->u.cli.cl_target_uuid) &&
478 ((grp_uuid)? obd_uuid_equals(grp_uuid,
479 &obd->obd_uuid) : 1)) {
480 spin_unlock(&obd_dev_lock);
485 spin_unlock(&obd_dev_lock);
/*
 * Find a client OBD connected to tgt_uuid without knowing its type: an MDC
 * client is looked for first (with no group uuid filter), then an OSC
 * client.
 * NOTE(review): the condition guarding the second lookup, its last
 * argument and the return are not visible in this listing.
 */
490 struct obd_device *class_find_client_notype(struct obd_uuid *tgt_uuid,
491 struct obd_uuid *grp_uuid)
493 struct obd_device *obd;
495 obd = class_find_client_obd(tgt_uuid, LUSTRE_MDC_NAME, NULL);
497 obd = class_find_client_obd(tgt_uuid, LUSTRE_OSC_NAME,
502 /* Iterate the obd_device list looking for devices that have grp_uuid. Start
503 searching at *next, and if a device is found, the next index to look
504 at is saved in *next. If next is NULL, then the first matching device
505 will always be returned.
 NOTE(review): the handling of an out-of-range *next and the return
 statements are not visible in this listing. */
506 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
/* a caller-supplied start index is only used when it is within range */
512 else if (*next >= 0 && *next < class_devno_max())
517 spin_lock(&obd_dev_lock);
518 for (; i < class_devno_max(); i++) {
519 struct obd_device *obd = class_num2obd(i);
522 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
525 spin_unlock(&obd_dev_lock);
529 spin_unlock(&obd_dev_lock);
/*
 * Destroy the slab caches created by obd_init_caches() (obd_device, obdo,
 * import and capa).  Each destroy is asserted to succeed.
 * NOTE(review): only the obd_device_cachep guard and the NULL resets of
 * obd_device_cachep and import_cachep are visible in this listing; the
 * others are presumably symmetrical - confirm.
 */
535 void obd_cleanup_caches(void)
540 if (obd_device_cachep) {
541 rc = cfs_mem_cache_destroy(obd_device_cachep);
542 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
/* reset so obd_init_caches() can assert the pointer is NULL again */
543 obd_device_cachep = NULL;
546 rc = cfs_mem_cache_destroy(obdo_cachep);
547 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
551 rc = cfs_mem_cache_destroy(import_cachep);
552 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
553 import_cachep = NULL;
556 rc = cfs_mem_cache_destroy(capa_cachep);
557 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
/*
 * Create the slab caches used by this file: "ll_obd_dev_cache",
 * "ll_obdo_cache", "ll_import_cache" and "capa_cache".  Each cache pointer
 * is asserted to be NULL before it is created.  The failure path at the
 * end tears down whatever was created via obd_cleanup_caches().
 * NOTE(review): most failure checks and the return statements are not
 * visible in this listing.
 */
563 int obd_init_caches(void)
567 LASSERT(obd_device_cachep == NULL);
568 obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
569 sizeof(struct obd_device),
571 if (!obd_device_cachep)
574 LASSERT(obdo_cachep == NULL);
575 obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
580 LASSERT(import_cachep == NULL);
581 import_cachep = cfs_mem_cache_create("ll_import_cache",
582 sizeof(struct obd_import),
587 LASSERT(capa_cachep == NULL);
588 capa_cachep = cfs_mem_cache_create("capa_cache",
589 sizeof(struct obd_capa), 0, 0);
/* failure path: undo any caches created so far */
595 obd_cleanup_caches();
600 /* map connection to client */
/*
 * Resolve a connection handle to its export by looking the cookie up with
 * class_handle2object().  A cookie of -1 is a request for a new connection
 * rather than a lookup.
 * NOTE(review): the NULL-handle test and the return statements are not
 * visible in this listing.  class_conn2obd() below drops a reference on
 * the result, so the lookup presumably returns a referenced export.
 */
601 struct obd_export *class_conn2export(struct lustre_handle *conn)
603 struct obd_export *export;
607 CDEBUG(D_CACHE, "looking for null handle\n");
611 if (conn->cookie == -1) { /* this means assign a new connection */
612 CDEBUG(D_CACHE, "want a new connection\n");
616 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
617 export = class_handle2object(conn->cookie);
/*
 * Map an export to an obd device.
 * NOTE(review): the body is not visible in this listing; class_conn2obd()
 * reads export->exp_obd for the equivalent mapping - confirm this function
 * does the same.
 */
621 struct obd_device *class_exp2obd(struct obd_export *exp)
/*
 * Resolve a connection handle to the obd device behind its export.
 * The export obtained from class_conn2export() is only needed to read
 * exp_obd; its reference is dropped again before returning.
 * NOTE(review): the NULL test on the export and the return statements are
 * not visible in this listing.
 */
628 struct obd_device *class_conn2obd(struct lustre_handle *conn)
630 struct obd_export *export;
631 export = class_conn2export(conn);
633 struct obd_device *obd = export->exp_obd;
634 class_export_put(export);
/*
 * Return the client import (u.cli.cl_import) of the obd device an export
 * belongs to.
 * NOTE(review): any NULL check on obd is not visible in this listing.
 */
640 struct obd_import *class_exp2cliimp(struct obd_export *exp)
642 struct obd_device *obd = exp->exp_obd;
645 return obd->u.cli.cl_import;
/*
 * Return the client import (u.cli.cl_import) of the obd device that a
 * connection handle resolves to.
 * NOTE(review): any NULL check on obd is not visible in this listing.
 */
648 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
650 struct obd_device *obd = class_conn2obd(conn);
653 return obd->u.cli.cl_import;
656 /* Export management functions */
/* Callback handed to class_handle_hash() by class_new_export(); takes a
 * reference on the export it is given. */
657 static void export_handle_addref(void *export)
659 class_export_get(export);
/*
 * Drop one reference on an export.  When the last reference goes away the
 * export must already be off its obd's export list; it is then queued on
 * obd_zombie_exports (reusing exp_obd_chain as the link) and the zombie
 * handler is notified.
 * NOTE(review): obd_zombie_impexp_notify is declared in this file as a
 * function, not a function pointer, so the "!= NULL" test below cannot be
 * false - confirm whether it is intentional.
 */
662 void __class_export_put(struct obd_export *exp)
664 if (atomic_dec_and_test(&exp->exp_refcount)) {
665 LASSERT (list_empty(&exp->exp_obd_chain));
667 CDEBUG(D_IOCTL, "final put %p/%s\n",
668 exp, exp->exp_client_uuid.uuid);
670 spin_lock(&obd_zombie_impexp_lock);
671 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
672 spin_unlock(&obd_zombie_impexp_lock);
674 if (obd_zombie_impexp_notify != NULL)
675 obd_zombie_impexp_notify();
678 EXPORT_SYMBOL(__class_export_put);
/*
 * Final teardown of an export whose reference count has reached zero:
 * releases its connection (if any), asserts that no replies or replay
 * requests are still queued, lets the obd destroy its per-export state and
 * frees the export via OBD_FREE_RCU.
 * NOTE(review): obd->obd_name is dereferenced in the CDEBUG below before
 * LASSERT(obd != NULL) runs, so the assertion cannot catch a NULL obd.
 */
680 void class_export_destroy(struct obd_export *exp)
682 struct obd_device *obd = exp->exp_obd;
685 LASSERT (atomic_read(&exp->exp_refcount) == 0);
687 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
688 exp->exp_client_uuid.uuid, obd->obd_name);
690 LASSERT(obd != NULL);
692 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
693 if (exp->exp_connection)
694 ptlrpc_put_connection_superhack(exp->exp_connection);
696 LASSERT(list_empty(&exp->exp_outstanding_replies));
697 LASSERT(list_empty(&exp->exp_req_replay_queue));
698 obd_destroy_export(exp);
700 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
705 /* Creates a new export, adds it to the hash table, and returns a
706 * pointer to it. The refcount is 2: one for the hash reference, and
707 * one for the pointer returned by this function.
 *
 * obd    - device the export belongs to
 * cluuid - client uuid; unless it equals the obd's own uuid it must be
 *          unique in obd_uuid_hash_body
 *
 * Returns ERR_PTR(-ENOMEM) when allocation fails and ERR_PTR(-EALREADY)
 * when an export for cluuid already exists.
 * NOTE(review): some guards and the success return are not visible in
 * this listing. */
708 struct obd_export *class_new_export(struct obd_device *obd,
709 struct obd_uuid *cluuid)
711 struct obd_export *export;
714 OBD_ALLOC_PTR(export);
716 return ERR_PTR(-ENOMEM);
718 export->exp_conn_cnt = 0;
719 atomic_set(&export->exp_refcount, 2);
720 atomic_set(&export->exp_rpc_count, 0);
721 export->exp_obd = obd;
722 CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
723 CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
724 /* XXX this should be in LDLM init */
725 CFS_INIT_LIST_HEAD(&export->exp_ldlm_data.led_held_locks);
726 spin_lock_init(&export->exp_ldlm_data.led_lock);
728 CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
729 class_handle_hash(&export->exp_handle, export_handle_addref);
730 export->exp_last_request_time = cfs_time_current_sec();
731 spin_lock_init(&export->exp_lock);
732 INIT_HLIST_NODE(&export->exp_uuid_hash);
733 INIT_HLIST_NODE(&export->exp_nid_hash);
735 export->exp_sp_peer = LUSTRE_SP_ANY;
736 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
737 export->exp_client_uuid = *cluuid;
738 obd_init_export(export);
740 spin_lock(&obd->obd_dev_lock);
/* the self-export (uuid equal to the obd's own) is not put in the uuid hash */
741 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
742 rc = lustre_hash_additem_unique(obd->obd_uuid_hash_body, cluuid,
743 &export->exp_uuid_hash);
745 CWARN("%s: denying duplicate export for %s\n",
746 obd->obd_name, cluuid->uuid);
/* duplicate: undo the handle registration and free the new export */
747 spin_unlock(&obd->obd_dev_lock);
748 class_handle_unhash(&export->exp_handle);
749 OBD_FREE_PTR(export);
750 return ERR_PTR(-EALREADY);
754 LASSERT(!obd->obd_stopping); /* shouldn't happen, but might race */
756 list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
757 list_add_tail(&export->exp_obd_chain_timed,
758 &export->exp_obd->obd_exports_timed);
759 export->exp_obd->obd_num_exports++;
760 spin_unlock(&obd->obd_dev_lock);
764 EXPORT_SYMBOL(class_new_export);
/*
 * Detach an export from everything that can find it: the handle hash, the
 * obd's uuid hash (if it was hashed there), and the obd's export lists.
 * obd_num_exports is decremented and one reference on the export is
 * dropped at the end.
 */
766 void class_unlink_export(struct obd_export *exp)
768 class_handle_unhash(&exp->exp_handle);
770 spin_lock(&exp->exp_obd->obd_dev_lock);
771 /* delete an uuid-export hashitem from hashtables */
772 if (!hlist_unhashed(&exp->exp_uuid_hash)) {
773 lustre_hash_delitem(exp->exp_obd->obd_uuid_hash_body,
774 &exp->exp_client_uuid, &exp->exp_uuid_hash);
/* list_del_init leaves the links empty, as __class_export_put() asserts */
776 list_del_init(&exp->exp_obd_chain);
777 list_del_init(&exp->exp_obd_chain_timed);
778 exp->exp_obd->obd_num_exports--;
779 spin_unlock(&exp->exp_obd->obd_dev_lock);
781 class_export_put(exp);
783 EXPORT_SYMBOL(class_unlink_export);
785 /* Import management functions */
/* Callback handed to class_handle_hash() by class_new_import(); takes a
 * reference on the import it is given. */
786 static void import_handle_addref(void *import)
788 class_import_get(import);
/*
 * Take a reference on an import.  The current count is sanity-checked
 * first: it must be non-negative and below 0x5a5a5a.
 * NOTE(review): 0x5a5a5a looks like a poison-pattern guard against
 * use-after-free - confirm.  The return statement is not visible in this
 * listing.
 */
791 struct obd_import *class_import_get(struct obd_import *import)
793 LASSERT(atomic_read(&import->imp_refcount) >= 0);
794 LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
795 atomic_inc(&import->imp_refcount);
796 CDEBUG(D_INFO, "import %p refcount=%d\n", import,
797 atomic_read(&import->imp_refcount));
800 EXPORT_SYMBOL(class_import_get);
/*
 * Drop one reference on an import.  The count must be positive and below
 * 0x5a5a5a, and the import must not already be on the zombie list.  When
 * the last reference goes away the import is queued on obd_zombie_imports
 * and the zombie handler is notified.
 * NOTE(review): obd_zombie_impexp_notify is declared in this file as a
 * function, so the "!= NULL" test below cannot be false - confirm.
 */
802 void class_import_put(struct obd_import *import)
/* logs the value the count will have after this put */
806 CDEBUG(D_INFO, "import %p refcount=%d\n", import,
807 atomic_read(&import->imp_refcount) - 1);
809 LASSERT(atomic_read(&import->imp_refcount) > 0);
810 LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
811 LASSERT(list_empty(&import->imp_zombie_chain));
813 if (atomic_dec_and_test(&import->imp_refcount)) {
815 CDEBUG(D_INFO, "final put import %p\n", import);
817 spin_lock(&obd_zombie_impexp_lock);
818 list_add(&import->imp_zombie_chain, &obd_zombie_imports);
819 spin_unlock(&obd_zombie_impexp_lock);
821 if (obd_zombie_impexp_notify != NULL)
822 obd_zombie_impexp_notify();
827 EXPORT_SYMBOL(class_import_put);
/*
 * Final teardown of an import whose reference count has reached zero:
 * releases the main connection, then every entry on imp_conn_list
 * (dropping its connection and freeing the entry), asserts that no
 * security context is left, drops the obd reference taken in
 * class_new_import() and frees the import via OBD_FREE_RCU.
 */
829 void class_import_destroy(struct obd_import *import)
833 CDEBUG(D_IOCTL, "destroying import %p for %s\n", import,
834 import->imp_obd->obd_name);
836 LASSERT(atomic_read(&import->imp_refcount) == 0);
838 ptlrpc_put_connection_superhack(import->imp_connection);
/* pop entries off the head until the list is empty */
840 while (!list_empty(&import->imp_conn_list)) {
841 struct obd_import_conn *imp_conn;
843 imp_conn = list_entry(import->imp_conn_list.next,
844 struct obd_import_conn, oic_item);
845 list_del(&imp_conn->oic_item);
846 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
847 OBD_FREE(imp_conn, sizeof(*imp_conn));
850 LASSERT(import->imp_sec == NULL);
851 class_decref(import->imp_obd);
852 OBD_FREE_RCU(import, sizeof(*import), &import->imp_handle);
/*
 * Initialise the adaptive-timeout state of an import: the network latency
 * estimate starts at 0 and each of the IMP_AT_MAX_PORTALS service
 * estimates starts at INITIAL_CONNECT_TIMEOUT.
 * NOTE(review): the final argument of the per-portal at_init() call is not
 * visible in this listing.
 */
856 static void init_imp_at(struct imp_at *at) {
858 at_init(&at->iat_net_latency, 0, 0);
859 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
860 /* max service estimates are tracked on the server side, so
861 don't use the AT history here, just use the last reported
862 val. (But keep hist for proc histogram, worst_ever) */
863 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
/*
 * Allocate and initialise a new import for `obd`.
 *
 * The import starts in state LUSTRE_IMP_NEW with a reference count of 2,
 * holds a reference on obd (class_incref) and is registered in the handle
 * hash with import_handle_addref as its addref callback.
 * NOTE(review): the allocation-failure check and the return statement are
 * not visible in this listing.
 */
868 struct obd_import *class_new_import(struct obd_device *obd)
870 struct obd_import *imp;
872 OBD_ALLOC(imp, sizeof(*imp));
876 CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
877 CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
878 CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
879 CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
880 spin_lock_init(&imp->imp_lock);
881 imp->imp_last_success_conn = 0;
882 imp->imp_state = LUSTRE_IMP_NEW;
/* released again in class_import_destroy() */
883 imp->imp_obd = class_incref(obd);
884 sema_init(&imp->imp_sec_mutex, 1);
885 cfs_waitq_init(&imp->imp_recovery_waitq);
887 atomic_set(&imp->imp_refcount, 2);
888 atomic_set(&imp->imp_inflight, 0);
889 atomic_set(&imp->imp_replay_inflight, 0);
890 atomic_set(&imp->imp_inval_count, 0);
891 CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
892 CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
893 class_handle_hash(&imp->imp_handle, import_handle_addref);
894 init_imp_at(&imp->imp_at);
896 /* the default magic is V2, will be used in connect RPC, and
897 * then adjusted according to the flags in request/reply. */
898 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
902 EXPORT_SYMBOL(class_new_import);
/*
 * Begin destruction of an import: remove it from the handle hash, bump its
 * generation under imp_lock and drop one reference.  The import itself is
 * only torn down once the last reference is put (see class_import_put()).
 */
904 void class_destroy_import(struct obd_import *import)
906 LASSERT(import != NULL);
907 LASSERT(import != LP_POISON);
909 class_handle_unhash(&import->imp_handle);
911 spin_lock(&import->imp_lock);
912 import->imp_generation++;
913 spin_unlock(&import->imp_lock);
914 class_import_put(import);
916 EXPORT_SYMBOL(class_destroy_import);
918 /* A connection defines an export context in which preallocation can
919 be managed. This releases the export pointer reference, and returns
920 the export handle, so the export refcount is 1 when this function
 returns (class_new_export() creates it with 2, one is put here).

 conn   - out: receives the cookie of the new export's handle
 obd    - device to connect to
 cluuid - uuid of the connecting client

 On failure the error encoded by class_new_export() is returned.
 NOTE(review): the error test and the success return are not visible in
 this listing. */
922 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
923 struct obd_uuid *cluuid)
925 struct obd_export *export;
926 LASSERT(conn != NULL);
927 LASSERT(obd != NULL);
928 LASSERT(cluuid != NULL);
931 export = class_new_export(obd, cluuid);
933 RETURN(PTR_ERR(export));
935 conn->cookie = export->exp_handle.h_cookie;
936 class_export_put(export);
938 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
939 cluuid->uuid, conn->cookie);
942 EXPORT_SYMBOL(class_connect);
944 /* if export is involved in recovery then clean up related things */
/*
 * Runs under obd_processing_task_lock (bottom halves disabled).  If the
 * obd is recovering and this export takes part in recovery, the export's
 * recovery flags are cleared (each under exp_lock) and the obd's
 * connected/recoverable client counters and the request-replay and
 * lock-replay client counters are decremented to match.
 */
945 void class_export_recovery_cleanup(struct obd_export *exp)
947 struct obd_device *obd = exp->exp_obd;
949 spin_lock_bh(&obd->obd_processing_task_lock);
950 if (obd->obd_recovering && exp->exp_in_recovery) {
951 spin_lock(&exp->exp_lock);
952 exp->exp_in_recovery = 0;
953 spin_unlock(&exp->exp_lock);
954 obd->obd_connected_clients--;
955 /* each connected client is counted as recoverable */
956 obd->obd_recoverable_clients--;
957 if (exp->exp_req_replay_needed) {
958 spin_lock(&exp->exp_lock);
959 exp->exp_req_replay_needed = 0;
960 spin_unlock(&exp->exp_lock);
/* counter must still be positive before it is decremented */
961 LASSERT(atomic_read(&obd->obd_req_replay_clients));
962 atomic_dec(&obd->obd_req_replay_clients);
964 if (exp->exp_lock_replay_needed) {
965 spin_lock(&exp->exp_lock);
966 exp->exp_lock_replay_needed = 0;
967 spin_unlock(&exp->exp_lock);
968 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
969 atomic_dec(&obd->obd_lock_replay_clients);
972 spin_unlock_bh(&obd->obd_processing_task_lock);
975 /* This function removes two references from the export: one for the
976 * hash entry and one for the export pointer passed in. The export
977 * pointer passed to this function is destroyed and should not be used
 * afterwards.
 *
 * The first caller marks the export disconnected and removes it from the
 * obd's nid hash (both under exp_lock); later callers find
 * exp_disconnected already set and do not repeat the teardown.
 * NOTE(review): the return statements are not visible in this listing. */
979 int class_disconnect(struct obd_export *export)
981 int already_disconnected;
984 if (export == NULL) {
986 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
990 spin_lock(&export->exp_lock);
991 already_disconnected = export->exp_disconnected;
992 export->exp_disconnected = 1;
/* only exports that were added to the nid hash are removed from it */
994 if (!hlist_unhashed(&export->exp_nid_hash)) {
995 lustre_hash_delitem(export->exp_obd->obd_nid_hash_body,
996 &export->exp_connection->c_peer.nid, &export->exp_nid_hash);
998 spin_unlock(&export->exp_lock);
1000 /* class_cleanup(), abort_recovery(), and class_fail_export()
1001 * all end up in here, and if any of them race we shouldn't
1002 * call extra class_export_puts(). */
1003 if (already_disconnected)
1006 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1007 export->exp_handle.h_cookie);
1009 class_export_recovery_cleanup(export);
/* drops the hash reference ... */
1010 class_unlink_export(export);
/* ... and this drops the caller's reference */
1011 class_export_put(export);
/*
 * Disconnect every export on `list`, stamping each with `flags` first.
 *
 * Each export is pinned with an extra reference while it is processed.
 * The obd's self-export (client uuid equal to the obd uuid) is not
 * disconnected, only unlinked from the list.  Other exports are looked up
 * again through their handle cookie and the export returned by that lookup
 * is passed to obd_disconnect().
 * NOTE(review): several lines (NULL test on fake_exp, continue statements,
 * error reporting) are not visible in this listing.
 */
1015 static void class_disconnect_export_list(struct list_head *list, int flags)
1018 struct lustre_handle fake_conn;
1019 struct obd_export *fake_exp, *exp;
1022 /* It's possible that an export may disconnect itself, but
1023 * nothing else will be added to this list. */
1024 while (!list_empty(list)) {
1025 exp = list_entry(list->next, struct obd_export, exp_obd_chain);
1026 class_export_get(exp);
1028 spin_lock(&exp->exp_lock);
1029 exp->exp_flags = flags;
1030 spin_unlock(&exp->exp_lock);
1032 if (obd_uuid_equals(&exp->exp_client_uuid,
1033 &exp->exp_obd->obd_uuid)) {
1035 "exp %p export uuid == obd uuid, don't discon\n",
1037 /* Need to delete this now so we don't end up pointing
1038 * to work_list later when this export is cleaned up. */
1039 list_del_init(&exp->exp_obd_chain);
1040 class_export_put(exp);
/* look the export up again through its handle cookie */
1044 fake_conn.cookie = exp->exp_handle.h_cookie;
1045 fake_exp = class_conn2export(&fake_conn);
1047 class_export_put(exp);
1051 spin_lock(&fake_exp->exp_lock);
1052 fake_exp->exp_flags = flags;
1053 spin_unlock(&fake_exp->exp_lock);
1055 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1056 "last request at %ld\n",
1057 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1058 exp, exp->exp_last_request_time);
1059 rc = obd_disconnect(fake_exp);
/* drop the pin taken at the top of the loop */
1060 class_export_put(exp);
/* Build the export flag mask for a device: OBD_OPT_FAILOVER when obd_fail
 * is set, OBD_OPT_FORCE when obd_force is set, OR-ed together. */
1065 static inline int get_exp_flags_from_obd(struct obd_device *obd)
1067 return ((obd->obd_fail ? OBD_OPT_FAILOVER : 0) |
1068 (obd->obd_force ? OBD_OPT_FORCE : 0));
/*
 * Disconnect every export of `obd`.  All exports are moved onto a private
 * work list under obd_dev_lock and then disconnected outside the lock with
 * flags derived from the device's fail/force state.
 */
1071 void class_disconnect_exports(struct obd_device *obd)
1073 struct list_head work_list;
1076 /* Move all of the exports from obd_exports to a work list, en masse. */
1077 spin_lock(&obd->obd_dev_lock);
/* Insert work_list as a second head next to obd_exports, then unlink the
 * old head: every entry now hangs off work_list and obd_exports is left
 * empty.  This is why work_list needs no prior initialisation. */
1078 list_add(&work_list, &obd->obd_exports);
1079 list_del_init(&obd->obd_exports);
1080 spin_unlock(&obd->obd_dev_lock);
1082 if (!list_empty(&work_list)) {
1083 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1084 "disconnecting them\n", obd->obd_minor, obd);
1085 class_disconnect_export_list(&work_list,
1086 get_exp_flags_from_obd(obd));
1088 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1089 obd->obd_minor, obd);
1092 EXPORT_SYMBOL(class_disconnect_exports);
1094 /* Remove exports that have not completed recovery.
 *
 * obd         - device whose exports are examined
 * test_export - predicate called for each export under obd_dev_lock
 *
 * Selected exports are moved from obd_exports to a private work list while
 * the lock is held and disconnected afterwards.  The self-export is moved
 * as well but handled separately from real clients (see comment below).
 * NOTE(review): the statement following the test_export() check, the
 * counting of `cnt` and the return value are not visible in this listing;
 * presumably exports that pass the test are kept - confirm. */
1096 int class_disconnect_stale_exports(struct obd_device *obd,
1097 int (*test_export)(struct obd_export *))
1099 struct list_head work_list;
1100 struct list_head *pos, *n;
1101 struct obd_export *exp;
1105 CFS_INIT_LIST_HEAD(&work_list);
1106 spin_lock(&obd->obd_dev_lock);
/* _safe variant: entries are unlinked while iterating */
1107 list_for_each_safe(pos, n, &obd->obd_exports) {
1108 exp = list_entry(pos, struct obd_export, exp_obd_chain);
1109 if (test_export(exp))
1112 list_del(&exp->exp_obd_chain);
1113 list_add(&exp->exp_obd_chain, &work_list);
1114 /* don't count self-export as client */
1115 if (obd_uuid_equals(&exp->exp_client_uuid,
1116 &exp->exp_obd->obd_uuid))
1120 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1121 obd->obd_name, exp->exp_client_uuid.uuid,
1122 exp->exp_connection == NULL ? "<unknown>" :
1123 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1125 spin_unlock(&obd->obd_dev_lock);
1127 CDEBUG(D_ERROR, "%s: disconnecting %d stale clients\n",
1128 obd->obd_name, cnt);
1129 class_disconnect_export_list(&work_list, get_exp_flags_from_obd(obd));
1132 EXPORT_SYMBOL(class_disconnect_stale_exports);
/*
 * Allocate and initialise an obd_io_group: nothing pending, one reference,
 * an initialised wait queue and an empty callback-context list.
 * NOTE(review): the allocation-failure check, the store through oig_out
 * and the return value are not visible in this listing.
 */
1134 int oig_init(struct obd_io_group **oig_out)
1136 struct obd_io_group *oig;
1139 OBD_ALLOC(oig, sizeof(*oig));
1143 spin_lock_init(&oig->oig_lock);
1145 oig->oig_pending = 0;
1146 atomic_set(&oig->oig_refcount, 1);
1147 cfs_waitq_init(&oig->oig_waitq);
1148 CFS_INIT_LIST_HEAD(&oig->oig_occ_list);
1153 EXPORT_SYMBOL(oig_init);
/* Take an additional reference on an io group. */
1155 static inline void oig_grab(struct obd_io_group *oig)
1157 atomic_inc(&oig->oig_refcount);
/* Drop a reference on an io group; the group is freed when the last
 * reference goes away. */
1160 void oig_release(struct obd_io_group *oig)
1162 if (atomic_dec_and_test(&oig->oig_refcount))
1163 OBD_FREE(oig, sizeof(*oig));
1165 EXPORT_SYMBOL(oig_release);
/*
 * Add a callback context to an io group: under oig_lock the context is
 * appended to the tail of oig_occ_list.
 * NOTE(review): the bookkeeping around the list insertion (pending count,
 * reference, back-pointer) and the return value are not visible in this
 * listing.
 */
1167 int oig_add_one(struct obd_io_group *oig, struct oig_callback_context *occ)
1170 CDEBUG(D_CACHE, "oig %p ready to roll\n", oig);
1171 spin_lock(&oig->oig_lock);
1177 list_add_tail(&occ->occ_oig_item, &oig->oig_occ_list);
1179 spin_unlock(&oig->oig_lock);
1184 EXPORT_SYMBOL(oig_add_one);
/*
 * Report completion of one callback context with result `rc`.
 *
 * Under oig_lock the context is unlinked from the group and the pending
 * count is decremented; when it reaches zero (or below) the group's wait
 * queue is selected for wake-up, which is signalled after the lock has
 * been dropped.
 * NOTE(review): the statement executed when this is the first non-zero rc
 * is not visible in this listing (presumably it records rc in oig_rc);
 * likewise any guard around the final signal.
 */
1186 void oig_complete_one(struct obd_io_group *oig,
1187 struct oig_callback_context *occ, int rc)
1189 cfs_waitq_t *wake = NULL;
1192 spin_lock(&oig->oig_lock);
1195 list_del_init(&occ->occ_oig_item);
1197 old_rc = oig->oig_rc;
1198 if (oig->oig_rc == 0 && rc != 0)
1201 if (--oig->oig_pending <= 0)
1202 wake = &oig->oig_waitq;
1204 spin_unlock(&oig->oig_lock);
/* values are read after the unlock, hence "(racey)" in the message */
1206 CDEBUG(D_CACHE, "oig %p completed, rc %d -> %d via %d, %d now "
1207 "pending (racey)\n", oig, old_rc, oig->oig_rc, rc,
1210 cfs_waitq_signal(wake);
1213 EXPORT_SYMBOL(oig_complete_one);
/*
 * Wait condition for oig_wait(): checks under oig_lock whether the group
 * has no pending entries left (oig_pending <= 0).
 * NOTE(review): the result variable and return are not visible in this
 * listing.
 */
1215 static int oig_done(struct obd_io_group *oig)
1218 spin_lock(&oig->oig_lock);
1219 if (oig->oig_pending <= 0)
1221 spin_unlock(&oig->oig_lock);
/*
 * Interrupt handler installed by oig_wait() through LWI_INTR: calls the
 * occ_interrupted() hook of every callback context in the group that has
 * not been interrupted yet.  `data` is the obd_io_group.
 * NOTE(review): the restart label/goto described in the comment below and
 * the continue for already-interrupted entries are not visible in this
 * listing.
 */
1225 static void interrupted_oig(void *data)
1227 struct obd_io_group *oig = data;
1228 struct oig_callback_context *occ;
1230 spin_lock(&oig->oig_lock);
1231 /* We need to restart the processing each time we drop the lock, as
1232 * it is possible other threads called oig_complete_one() to remove
1233 * an entry elsewhere in the list while we dropped lock. We need to
1234 * drop the lock because osc_ap_completion() calls oig_complete_one()
1235 * which re-gets this lock ;-) as well as a lock ordering issue. */
1237 list_for_each_entry(occ, &oig->oig_occ_list, occ_oig_item) {
1238 if (occ->interrupted)
/* mark before dropping the lock so the entry is not picked up twice */
1240 occ->interrupted = 1;
1241 spin_unlock(&oig->oig_lock);
1242 occ->occ_interrupted(occ);
1243 spin_lock(&oig->oig_lock);
1246 spin_unlock(&oig->oig_lock);
/*
 * Wait until every entry of an io group has completed.
 *
 * The first wait uses an l_wait_info built with LWI_INTR, so an interrupt
 * runs interrupted_oig().  If the wait returns -EINTR the wait info is
 * replaced by a zeroed one and the wait is repeated until it no longer
 * returns -EINTR.  On exit oig_pending is asserted to be zero.
 * NOTE(review): the return value is not visible in this listing; the final
 * CDEBUG prints oig->oig_rc.
 */
1249 int oig_wait(struct obd_io_group *oig)
1251 struct l_wait_info lwi = LWI_INTR(interrupted_oig, oig);
1254 CDEBUG(D_CACHE, "waiting for oig %p\n", oig);
1257 rc = l_wait_event(oig->oig_waitq, oig_done(oig), &lwi);
1258 LASSERTF(rc == 0 || rc == -EINTR, "rc: %d\n", rc);
1259 /* we can't continue until the oig has emptied and stopped
1260 * referencing state that the caller will free upon return */
1262 lwi = (struct l_wait_info){ 0, };
1263 } while (rc == -EINTR);
1265 LASSERTF(oig->oig_pending == 0,
1266 "exiting oig_wait(oig = %p) with %d pending\n", oig,
1269 CDEBUG(D_CACHE, "done waiting on oig %p rc %d\n", oig, oig->oig_rc);
1272 EXPORT_SYMBOL(oig_wait);
/*
 * Forcibly disconnect an export (eviction path).
 *
 * exp_failed is set under exp_lock; if it was already set, another caller
 * got here first and this call only logs the fact.  Otherwise the debug
 * log is dumped when obd_dump_on_timeout is set, an extra reference is
 * taken and the export is passed to obd_disconnect().
 */
1274 void class_fail_export(struct obd_export *exp)
1276 int rc, already_failed;
1278 spin_lock(&exp->exp_lock);
1279 already_failed = exp->exp_failed;
1280 exp->exp_failed = 1;
1281 spin_unlock(&exp->exp_lock);
1283 if (already_failed) {
1284 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1285 exp, exp->exp_client_uuid.uuid);
1289 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1290 exp, exp->exp_client_uuid.uuid);
1292 if (obd_dump_on_timeout)
1293 libcfs_debug_dumplog();
1295 /* Most callers into obd_disconnect are removing their own reference
1296 * (request, for example) in addition to the one from the hash table.
1297 * We don't have such a reference here, so make one. */
1298 class_export_get(exp);
1299 rc = obd_disconnect(exp);
1301 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1303 CDEBUG(D_HA, "disconnected export %p/%s\n",
1304 exp, exp->exp_client_uuid.uuid);
1306 EXPORT_SYMBOL(class_fail_export);
/*
 * Return a printable form of the peer NID of an export that has a
 * connection.
 * NOTE(review): the value returned for exports without a connection is not
 * visible in this listing.
 */
1308 char *obd_export_nid2str(struct obd_export *exp)
1310 if (exp->exp_connection != NULL)
1311 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1315 EXPORT_SYMBOL(obd_export_nid2str);
/*
 * Evict exports of `obd` whose peer NID matches the string `nid`.
 *
 * The NID string is parsed once; the matching export is fetched from the
 * obd's nid hash, checked against the requested NID and against being the
 * self-export, failed with class_fail_export() and then released.
 * Returns the number of exports evicted.
 * NOTE(review): the loop construct, the lookup key argument and the
 * counter increment are not visible in this listing.
 */
1317 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1319 struct obd_export *doomed_exp = NULL;
1320 int exports_evicted = 0;
1322 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1325 doomed_exp = lustre_hash_get_object_by_key(obd->obd_nid_hash_body,
1327 if (doomed_exp == NULL)
1330 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1331 "nid %s found, wanted nid %s, requested nid %s\n",
1332 obd_export_nid2str(doomed_exp),
1333 libcfs_nid2str(nid_key), nid);
1334 LASSERTF(doomed_exp != obd->obd_self_export,
1335 "self-export is hashed by NID?\n");
1337 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1338 obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1340 class_fail_export(doomed_exp);
/* release the export returned by the hash lookup */
1341 class_export_put(doomed_exp);
1344 if (!exports_evicted)
1345 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1346 obd->obd_name, nid);
1347 return exports_evicted;
1349 EXPORT_SYMBOL(obd_export_evict_by_nid);
/*
 * Evict the export of device 'obd' whose client UUID is 'uuid'.
 *
 * The string is converted with obd_str2uuid().  A request naming the
 * device's own UUID is refused with an error message and the function
 * returns the still-zero exports_evicted.  Otherwise the export is looked
 * up in obd->obd_uuid_hash_body.  A miss is logged as an error; a hit is
 * failed with class_fail_export() and released with class_export_put().
 *
 * Returns exports_evicted; presumably 1 after a successful eviction and 0
 * otherwise -- TODO confirm where the counter is incremented.
 */
 1351 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
 1353 struct obd_export *doomed_exp = NULL;
 1354 struct obd_uuid doomed;
 1355 int exports_evicted = 0;
 1357 obd_str2uuid(&doomed, uuid);
/* Never evict the device itself. */
 1358 if (obd_uuid_equals(&doomed, &obd->obd_uuid)) {
 1359 CERROR("%s: can't evict myself\n", obd->obd_name);
 1360 return exports_evicted;
 1363 doomed_exp = lustre_hash_get_object_by_key(obd->obd_uuid_hash_body,
 1366 if (doomed_exp == NULL) {
 1367 CERROR("%s: can't disconnect %s: no exports found\n",
 1368 obd->obd_name, uuid);
 1370 CWARN("%s: evicting %s at adminstrative request\n",
 1371 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
/* Fail the export, then drop a reference; presumably the one obtained by
 * the hash lookup above -- confirm. */
 1372 class_fail_export(doomed_exp);
 1373 class_export_put(doomed_exp);
 1377 return exports_evicted;
 1379 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1382 * kill zombie imports and exports
/*
 * Drain the zombie import and export lists.
 *
 * Each pass takes obd_zombie_impexp_lock, unlinks the first entry of
 * obd_zombie_imports and of obd_zombie_exports (when the respective list is
 * not empty), and releases the lock.  The unlinked objects are destroyed
 * only after the lock has been dropped.  Passes repeat for as long as one
 * of them produced an import or an export.
 * Presumably 'import' and 'export' are reset to NULL at the start of every
 * pass so the loop condition terminates -- confirm.
 */
 1384 void obd_zombie_impexp_cull(void)
 1386 struct obd_import *import;
 1387 struct obd_export *export;
 1391 spin_lock (&obd_zombie_impexp_lock);
/* Imports are linked on the zombie list through imp_zombie_chain. */
 1394 if (!list_empty(&obd_zombie_imports)) {
 1395 import = list_entry(obd_zombie_imports.next,
 1398 list_del(&import->imp_zombie_chain);
/* Exports are linked through exp_obd_chain; list_del_init() leaves the
 * unlinked node re-initialised. */
 1402 if (!list_empty(&obd_zombie_exports)) {
 1403 export = list_entry(obd_zombie_exports.next,
 1406 list_del_init(&export->exp_obd_chain);
 1409 spin_unlock(&obd_zombie_impexp_lock);
/* Destruction runs without the list lock held. */
 1412 class_import_destroy(import);
 1415 class_export_destroy(export);
 1417 } while (import != NULL || export != NULL);
/* Completed by obd_zombie_impexp_thread() once it has tried to daemonize;
 * obd_zombie_impexp_init() waits on it. */
 1421 static struct completion obd_zombie_start;
/* Completed by obd_zombie_impexp_thread() after its main loop ends;
 * obd_zombie_impexp_stop() waits on it. */
 1422 static struct completion obd_zombie_stop;
/* Bit field; the OBD_ZOMBIE_STOP bit asks the thread to leave its loop. */
 1423 static unsigned long obd_zombie_flags;
/* Wait queue the thread sleeps on; signalled by
 * obd_zombie_impexp_notify(). */
 1424 static cfs_waitq_t obd_zombie_waitq;
 1431 * check whether the zombie import/export destroy thread has work to do.
/*
 * Evaluate, under obd_zombie_impexp_lock, whether the destroy thread may
 * stay idle.
 *
 * rc is non-zero only when both zombie lists are empty AND the
 * OBD_ZOMBIE_STOP bit is clear.  A zero rc therefore means there is
 * something to destroy or a stop was requested.  'arg' is not referenced
 * by the statements shown.  Presumably rc is the return value -- confirm.
 */
 1433 int obd_zombie_impexp_check(void *arg)
 1437 spin_lock(&obd_zombie_impexp_lock);
 1438 rc = list_empty(&obd_zombie_imports) &&
 1439 list_empty(&obd_zombie_exports) &&
 1440 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
 1442 spin_unlock(&obd_zombie_impexp_lock);
1448 * notify import/export destroy thread about new zombie.
/* Signal obd_zombie_waitq, the queue obd_zombie_impexp_thread() waits on. */
 1450 static void obd_zombie_impexp_notify(void)
 1452 cfs_waitq_signal(&obd_zombie_waitq);
 1458 * thread that destroys zombie exports/imports.
/*
 * Body of the zombie destroy thread.
 *
 * Daemonizes under the name "obd_zombid" and completes obd_zombie_start in
 * both the failure and the success case.  Until the OBD_ZOMBIE_STOP bit is
 * set it sleeps on obd_zombie_waitq while obd_zombie_impexp_check() reports
 * nothing to do, then calls obd_zombie_impexp_cull().  obd_zombie_stop is
 * completed once the loop has ended.  'unused' is not referenced by the
 * statements shown.
 *
 * NOTE(review): if the daemonize failure path returns right after
 * completing obd_zombie_start, obd_zombie_stop is never completed and a
 * later obd_zombie_impexp_stop() would block -- confirm.
 */
 1460 static int obd_zombie_impexp_thread(void *unused)
 1464 if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
/* Release the waiter in obd_zombie_impexp_init() even on failure. */
 1465 complete(&obd_zombie_start);
 1469 complete(&obd_zombie_start);
 1471 while(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
 1472 struct l_wait_info lwi = { 0 };
/* Wake up when the check returns 0: a zombie is queued or stop is set. */
 1474 l_wait_event(obd_zombie_waitq, !obd_zombie_impexp_check(NULL), &lwi);
 1476 obd_zombie_impexp_cull();
 1479 complete(&obd_zombie_stop);
1484 #else /* ! KERNEL */
/* Entry counter for obd_zombie_impexp_kill(); starts at 0. */
 1486 static atomic_t zombie_recur = ATOMIC_INIT(0);
/* Handle returned by liblustre_register_wait_callback(); passed to
 * liblustre_deregister_wait_callback() on stop. */
 1487 static void *obd_zombie_impexp_work_cb;
/* Handle returned by liblustre_register_idle_callback(); passed to
 * liblustre_deregister_idle_callback() on stop. */
 1488 static void *obd_zombie_impexp_idle_cb;
/*
 * Callback that culls zombie imports/exports; registered as the liblustre
 * wait callback by obd_zombie_impexp_init().
 *
 * zombie_recur is incremented on entry and decremented on exit.  The cull
 * runs only when the increment yields 1, so a call made while another one
 * is still in progress does not cull again.  'arg' is not referenced by the
 * statements shown.
 * NOTE(review): the return value should be confirmed against the return
 * statement.
 */
 1490 int obd_zombie_impexp_kill(void *arg)
 1494 if (atomic_inc_return(&zombie_recur) == 1) {
 1495 obd_zombie_impexp_cull();
 1498 atomic_dec(&zombie_recur);
 1505 * start the thread that destroys zombie imports/exports
/*
 * Set up zombie import/export handling.
 *
 * Initialises both zombie lists, their spinlock, the start/stop
 * completions and the wait queue.  Two start-up paths follow; presumably
 * they are selected by the kernel / non-kernel preprocessor split used
 * elsewhere in this file -- confirm:
 *  - start obd_zombie_impexp_thread() with cfs_kernel_thread() and wait
 *    for obd_zombie_start;
 *  - register obd_zombie_impexp_kill() as a liblustre wait callback and
 *    obd_zombie_impexp_check() as a liblustre idle callback.
 * NOTE(review): how 'rc' from cfs_kernel_thread() is handled and what the
 * function returns should be confirmed.
 */
 1507 int obd_zombie_impexp_init(void)
 1511 CFS_INIT_LIST_HEAD(&obd_zombie_imports);
 1512 CFS_INIT_LIST_HEAD(&obd_zombie_exports);
 1513 spin_lock_init(&obd_zombie_impexp_lock);
 1514 init_completion(&obd_zombie_start);
 1515 init_completion(&obd_zombie_stop);
 1516 cfs_waitq_init(&obd_zombie_waitq);
/* Thread path: do not continue before the thread has signalled start. */
 1519 rc = cfs_kernel_thread(obd_zombie_impexp_thread, NULL, 0);
 1523 wait_for_completion(&obd_zombie_start);
/* Callback path: keep both handles so they can be deregistered on stop. */
 1526 obd_zombie_impexp_work_cb =
 1527 liblustre_register_wait_callback("obd_zombi_impexp_kill",
 1528 &obd_zombie_impexp_kill, NULL);
 1530 obd_zombie_impexp_idle_cb =
 1531 liblustre_register_idle_callback("obd_zombi_impexp_check",
 1532 &obd_zombie_impexp_check, NULL);
 1539 * stop the thread that destroys zombie imports/exports
/*
 * Shut down zombie import/export handling.
 *
 * Sets the OBD_ZOMBIE_STOP bit and signals the wait queue so a sleeping
 * destroy thread re-evaluates its condition.  The remaining statements are
 * presumably split between kernel and non-kernel builds -- confirm:
 * waiting for obd_zombie_stop (thread path) and deregistering the liblustre
 * wait and idle callbacks (callback path).
 */
 1541 void obd_zombie_impexp_stop(void)
 1543 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
 1544 obd_zombie_impexp_notify();
 1546 wait_for_completion(&obd_zombie_stop);
 1548 liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
 1549 liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);