1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
44 #include <liblustre.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49 #include <class_hash.h>
/* List of registered obd types (linked through typ_chain); defined elsewhere. */
51 extern struct list_head obd_types;
/* Taken around every walk or modification of obd_types in this file. */
52 spinlock_t obd_types_lock;
/* Slab caches: created in obd_init_caches(), destroyed in obd_cleanup_caches(). */
54 cfs_mem_cache_t *obd_device_cachep;
55 cfs_mem_cache_t *obdo_cachep;
56 EXPORT_SYMBOL(obdo_cachep);
57 cfs_mem_cache_t *import_cachep;
/* Imports and exports whose last reference was dropped are queued on these
 * lists under obd_zombie_impexp_lock until obd_zombie_impexp_cull() destroys
 * them. */
59 struct list_head obd_zombie_imports;
60 struct list_head obd_zombie_exports;
61 spinlock_t obd_zombie_impexp_lock;
62 static void obd_zombie_impexp_notify(void);
/* Hook used to drop a ptlrpc_connection reference. Nothing in this file
 * assigns it, so it is presumably installed by another module - TODO confirm. */
64 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
67 * support functions: we could use inter-module communication, but this
68 * is more portable to other OS's
/*
 * Allocate an obd_device from obd_device_cachep and stamp it with
 * OBD_DEVICE_MAGIC.  class_newdev() treats a NULL result as out-of-memory.
 * NOTE(review): the function is declared static yet passed to
 * EXPORT_SYMBOL() below - confirm which linkage is intended.
 */
70 static struct obd_device *obd_device_alloc(void)
72 struct obd_device *obd;
74 OBD_SLAB_ALLOC_PTR(obd, obd_device_cachep);
76 obd->obd_magic = OBD_DEVICE_MAGIC;
80 EXPORT_SYMBOL(obd_device_alloc);
/*
 * Return an obd_device to obd_device_cachep.
 * Asserts that the magic is intact, logs an error when obd_namespace is
 * still set, then finalises obd_reference before freeing the slab object.
 */
82 static void obd_device_free(struct obd_device *obd)
85 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
86 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
87 if (obd->obd_namespace != NULL) {
88 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
89 obd, obd->obd_namespace, obd->obd_force);
92 lu_ref_fini(&obd->obd_reference);
93 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
/*
 * Look up a registered obd type by exact name (strcmp) while holding
 * obd_types_lock.  The lock is released both on a match and after an
 * unsuccessful walk.  The return statements are not visible in this
 * extract; callers test the result for "no such type" - presumably
 * NULL, TODO confirm.
 */
96 struct obd_type *class_search_type(const char *name)
98 struct list_head *tmp;
99 struct obd_type *type;
101 spin_lock(&obd_types_lock);
102 list_for_each(tmp, &obd_types) {
103 type = list_entry(tmp, struct obd_type, typ_chain);
104 if (strcmp(type->typ_name, name) == 0) {
105 spin_unlock(&obd_types_lock);
109 spin_unlock(&obd_types_lock);
/*
 * Find the obd type called name and take a reference on the module that
 * owns its typ_dt_ops.  The visible code also tries request_module(name)
 * and repeats the search when that succeeds, logging a console error when
 * the module cannot be loaded.
 * The condition guarding the module load and any refcount update are not
 * visible in this extract.
 */
113 struct obd_type *class_get_type(const char *name)
115 struct obd_type *type = class_search_type(name);
119 const char *modname = name;
120 if (!request_module(modname)) {
121 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
122 type = class_search_type(name);
124 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
130 spin_lock(&type->obd_type_lock);
/* NOTE(review): the result of try_module_get() is ignored here. */
132 try_module_get(type->typ_dt_ops->o_owner);
133 spin_unlock(&type->obd_type_lock);
/*
 * Counterpart of class_get_type(): under obd_type_lock, release the module
 * reference held on typ_dt_ops->o_owner.  (Any refcount update on the type
 * itself is not visible in this extract.)
 */
138 void class_put_type(struct obd_type *type)
141 spin_lock(&type->obd_type_lock);
143 module_put(type->typ_dt_ops->o_owner);
144 spin_unlock(&type->obd_type_lock);
/* Upper bound (exclusive) on the length of a type name, asserted below. */
147 #define CLASS_MAX_NAME 1024
/*
 * Register a new obd type called name.
 *
 * Visible steps: assert the name is shorter than CLASS_MAX_NAME; bail out
 * with a debug message if the name is already registered; allocate the
 * obd_type plus private copies of the ops tables and the name; register an
 * lprocfs entry under proc_lustre_root; call lu_device_type_init() on ldt;
 * finally link the type onto obd_types under obd_types_lock.
 *
 * The frees at the bottom appear to be the failure path (its label and the
 * returned error codes are not visible in this extract).
 */
149 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
150 struct lprocfs_vars *vars, const char *name,
151 struct lu_device_type *ldt)
153 struct obd_type *type;
158 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
160 if (class_search_type(name)) {
161 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
166 OBD_ALLOC(type, sizeof(*type));
170 OBD_ALLOC_PTR(type->typ_dt_ops);
171 OBD_ALLOC_PTR(type->typ_md_ops);
172 OBD_ALLOC(type->typ_name, strlen(name) + 1);
/* All three allocations are checked together. */
174 if (type->typ_dt_ops == NULL ||
175 type->typ_md_ops == NULL ||
176 type->typ_name == NULL)
/* The type keeps its own copies of the caller's ops tables. */
179 *(type->typ_dt_ops) = *dt_ops;
180 /* md_ops is optional */
182 *(type->typ_md_ops) = *md_ops;
/* typ_name was sized strlen(name) + 1 above, so strcpy() fits. */
183 strcpy(type->typ_name, name);
184 spin_lock_init(&type->obd_type_lock);
187 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
189 if (IS_ERR(type->typ_procroot)) {
190 rc = PTR_ERR(type->typ_procroot);
/* Clear the error pointer so later code does not treat it as a valid entry. */
191 type->typ_procroot = NULL;
197 rc = lu_device_type_init(ldt);
202 spin_lock(&obd_types_lock);
203 list_add(&type->typ_chain, &obd_types);
204 spin_unlock(&obd_types_lock);
/* Release whatever was allocated; the size passed for typ_name matches
 * the allocation above. */
209 if (type->typ_name != NULL)
210 OBD_FREE(type->typ_name, strlen(name) + 1);
211 if (type->typ_md_ops != NULL)
212 OBD_FREE_PTR(type->typ_md_ops);
213 if (type->typ_dt_ops != NULL)
214 OBD_FREE_PTR(type->typ_dt_ops);
215 OBD_FREE(type, sizeof(*type));
/*
 * Unregister the obd type called name.
 *
 * Logs "unknown obd type" when the lookup fails.  When the type still has
 * a non-zero typ_refcnt, only the ops tables are freed and the name is
 * left in place for debugging.  Otherwise the procfs root (if any) is
 * removed, typ_lu is finalised, the type is unlinked from obd_types under
 * obd_types_lock, and the name, ops tables and type are freed.
 * Return values are not visible in this extract.
 */
219 int class_unregister_type(const char *name)
221 struct obd_type *type = class_search_type(name);
225 CERROR("unknown obd type\n");
229 if (type->typ_refcnt) {
230 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
231 /* This is a bad situation, let's make the best of it */
232 /* Remove ops, but leave the name for debugging */
/* NOTE(review): the pointers are not visibly reset after these frees, yet
 * the type stays on obd_types - confirm that nothing can dereference
 * typ_dt_ops / typ_md_ops afterwards. */
233 OBD_FREE_PTR(type->typ_dt_ops);
234 OBD_FREE_PTR(type->typ_md_ops);
238 if (type->typ_procroot) {
239 lprocfs_remove(&type->typ_procroot);
243 lu_device_type_fini(type->typ_lu);
245 spin_lock(&obd_types_lock);
246 list_del(&type->typ_chain);
247 spin_unlock(&obd_types_lock);
/* Size mirrors the strlen(name) + 1 allocation in class_register_type(). */
248 OBD_FREE(type->typ_name, strlen(name) + 1);
249 if (type->typ_dt_ops != NULL)
250 OBD_FREE_PTR(type->typ_dt_ops);
251 if (type->typ_md_ops != NULL)
252 OBD_FREE_PTR(type->typ_md_ops);
253 OBD_FREE(type, sizeof(*type));
255 } /* class_unregister_type */
258 * Create a new obd device.
260 * Find an empty slot in ::obd_devs[], create a new obd device in it.
262 * \param type_name [in] obd device type string.
263 * \param name [in] obd device name.
265 * \retval NULL if create fails, otherwise return the obd device
/*
 * Allocate an obd_device of type type_name called name and install it in a
 * free obd_devs[] slot.
 *
 * Error results visible below: ERR_PTR(-EINVAL) when name is not shorter
 * than MAX_OBD_NAME, ERR_PTR(-ENODEV) for an unknown type,
 * ERR_PTR(-ENOMEM) when the device cannot be allocated, ERR_PTR(-EEXIST)
 * when a device with the same name already exists, and
 * ERR_PTR(-EOVERFLOW) when every slot is in use.  On any error after the
 * allocation, the new device is freed and the type reference dropped.
 */
268 struct obd_device *class_newdev(const char *type_name, const char *name)
270 struct obd_device *result = NULL;
271 struct obd_device *newdev;
272 struct obd_type *type = NULL;
274 int new_obd_minor = 0;
276 if (strlen(name) >= MAX_OBD_NAME) {
277 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
278 RETURN(ERR_PTR(-EINVAL));
281 type = class_get_type(type_name);
283 CERROR("OBD: unknown type: %s\n", type_name);
284 RETURN(ERR_PTR(-ENODEV));
287 newdev = obd_device_alloc();
288 if (newdev == NULL) {
289 class_put_type(type);
290 RETURN(ERR_PTR(-ENOMEM));
292 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
/* The whole table is scanned under obd_dev_lock: the scan both looks for
 * a name clash and claims a free slot. */
294 spin_lock(&obd_dev_lock);
295 for (i = 0; i < class_devno_max(); i++) {
296 struct obd_device *obd = class_num2obd(i);
297 if (obd && obd->obd_name &&
298 (strcmp(name, obd->obd_name) == 0)) {
299 CERROR("Device %s already exists, won't add\n", name);
/* Undo a slot claimed earlier in this scan (the guarding condition is
 * not visible in this extract). */
301 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
302 "%p obd_magic %08x != %08x\n", result,
303 result->obd_magic, OBD_DEVICE_MAGIC);
304 LASSERTF(result->obd_minor == new_obd_minor,
305 "%p obd_minor %d != %d\n", result,
306 result->obd_minor, new_obd_minor);
308 obd_devs[result->obd_minor] = NULL;
309 result->obd_name[0]='\0';
311 result = ERR_PTR(-EEXIST);
/* Empty slot and nothing claimed yet: install the new device here. */
314 if (!result && !obd) {
316 result->obd_minor = i;
318 result->obd_type = type;
/* The length check at the top already guarantees name fits. */
319 strncpy(result->obd_name, name,
320 sizeof(result->obd_name) - 1);
321 obd_devs[i] = result;
324 spin_unlock(&obd_dev_lock);
326 if (result == NULL && i >= class_devno_max()) {
327 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
329 result = ERR_PTR(-EOVERFLOW);
332 if (IS_ERR(result)) {
333 obd_device_free(newdev);
334 class_put_type(type);
336 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
337 result->obd_name, result);
/*
 * Remove obd from obd_devs[] (under obd_dev_lock), free it and drop the
 * reference on its type.  The type pointer is cached up front because obd
 * itself is freed before class_put_type() runs.
 */
342 void class_release_dev(struct obd_device *obd)
344 struct obd_type *obd_type = obd->obd_type;
346 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
347 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
348 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
349 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
350 LASSERT(obd_type != NULL);
352 CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
353 obd->obd_name,obd->obd_type->typ_name);
355 spin_lock(&obd_dev_lock);
356 obd_devs[obd->obd_minor] = NULL;
357 spin_unlock(&obd_dev_lock);
358 obd_device_free(obd);
360 class_put_type(obd_type);
/*
 * Scan obd_devs[] under obd_dev_lock for a device whose obd_name equals
 * name.  A match is only honoured once the device is attached.
 * The return statements are not visible in this extract; class_name2obd()
 * treats a negative result as "not found", so presumably the slot index is
 * returned on success - TODO confirm.
 */
363 int class_name2dev(const char *name)
370 spin_lock(&obd_dev_lock);
371 for (i = 0; i < class_devno_max(); i++) {
372 struct obd_device *obd = class_num2obd(i);
373 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
374 /* Make sure we finished attaching before we give
375 out any references */
376 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
377 if (obd->obd_attached) {
378 spin_unlock(&obd_dev_lock);
384 spin_unlock(&obd_dev_lock);
389 struct obd_device *class_name2obd(const char *name)
391 int dev = class_name2dev(name);
393 if (dev < 0 || dev > class_devno_max())
395 return class_num2obd(dev);
/*
 * Scan obd_devs[] under obd_dev_lock for a device whose obd_uuid equals
 * uuid.  The lock is released on a match and after an unsuccessful scan.
 * Return statements are not visible in this extract; presumably the slot
 * index on success and a negative value otherwise - TODO confirm.
 */
398 int class_uuid2dev(struct obd_uuid *uuid)
402 spin_lock(&obd_dev_lock);
403 for (i = 0; i < class_devno_max(); i++) {
404 struct obd_device *obd = class_num2obd(i);
405 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
406 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
407 spin_unlock(&obd_dev_lock);
411 spin_unlock(&obd_dev_lock);
/*
 * Convenience wrapper: translate uuid into a slot number with
 * class_uuid2dev() and then into the device with class_num2obd().
 * (Handling of a failed lookup is not visible in this extract.)
 */
416 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
418 int dev = class_uuid2dev(uuid);
421 return class_num2obd(dev);
425 * Get obd device from ::obd_devs[]
427 * \param num [in] array index
429 * \retval NULL if ::obd_devs[\a num] does not contain an obd device
430 * otherwise return the obd device there.
432 struct obd_device *class_num2obd(int num)
434 struct obd_device *obd = NULL;
436 if (num < class_devno_max()) {
441 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
442 "%p obd_magic %08x != %08x\n",
443 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
444 LASSERTF(obd->obd_minor == num,
445 "%p obd_minor %0d != %0d\n",
446 obd, obd->obd_minor, num);
/*
 * Print one console line per device while holding obd_dev_lock: slot
 * number, status string, type name, device name, uuid and refcount.
 * The status is picked from the obd_stopping / obd_set_up / obd_attached
 * flags, tested in that order (the strings themselves are not visible in
 * this extract).
 */
452 void class_obd_list(void)
457 spin_lock(&obd_dev_lock);
458 for (i = 0; i < class_devno_max(); i++) {
459 struct obd_device *obd = class_num2obd(i);
462 if (obd->obd_stopping)
464 else if (obd->obd_set_up)
466 else if (obd->obd_attached)
470 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
471 i, status, obd->obd_type->typ_name,
472 obd->obd_name, obd->obd_uuid.uuid,
473 atomic_read(&obd->obd_refcount));
475 spin_unlock(&obd_dev_lock);
479 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
480 specified, then only the client with that uuid is returned,
481 otherwise any client connected to the tgt is returned. */
482 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
483 const char * typ_name,
484 struct obd_uuid *grp_uuid)
/* The table is walked under obd_dev_lock; the lock is dropped before a
 * match is handed back. */
488 spin_lock(&obd_dev_lock);
489 for (i = 0; i < class_devno_max(); i++) {
490 struct obd_device *obd = class_num2obd(i);
/* Prefix match: only the first strlen(typ_name) bytes of the device's
 * type name are compared. */
493 if ((strncmp(obd->obd_type->typ_name, typ_name,
494 strlen(typ_name)) == 0)) {
/* A NULL grp_uuid means "any group". */
495 if (obd_uuid_equals(tgt_uuid,
496 &obd->u.cli.cl_target_uuid) &&
497 ((grp_uuid)? obd_uuid_equals(grp_uuid,
498 &obd->obd_uuid) : 1)) {
499 spin_unlock(&obd_dev_lock);
504 spin_unlock(&obd_dev_lock);
509 /* Iterate the obd_device list looking for devices that have grp_uuid. Start
510 searching at *next, and if a device is found, the next index to look
511 at is saved in *next. If next is NULL, then the first matching device
512 will always be returned. */
513 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
/* *next is only honoured when it lies inside 0 .. class_devno_max() - 1. */
519 else if (*next >= 0 && *next < class_devno_max())
/* Scan from the chosen start index under obd_dev_lock. */
524 spin_lock(&obd_dev_lock);
525 for (; i < class_devno_max(); i++) {
526 struct obd_device *obd = class_num2obd(i);
529 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
532 spin_unlock(&obd_dev_lock);
536 spin_unlock(&obd_dev_lock);
/*
 * Destroy the slab caches set up by obd_init_caches().  Every destroy call
 * is asserted to succeed, and the cache pointers that are visibly reset
 * below are set back to NULL so that obd_init_caches() can run again.
 */
542 void obd_cleanup_caches(void)
547 if (obd_device_cachep) {
548 rc = cfs_mem_cache_destroy(obd_device_cachep);
549 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
550 obd_device_cachep = NULL;
553 rc = cfs_mem_cache_destroy(obdo_cachep);
554 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
558 rc = cfs_mem_cache_destroy(import_cachep);
559 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
560 import_cachep = NULL;
563 rc = cfs_mem_cache_destroy(capa_cachep);
564 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
/*
 * Create the obd_device, obdo, import and capa slab caches.  Each cache
 * pointer is asserted to be NULL before it is created.  The trailing
 * obd_cleanup_caches() call appears to be the failure path (labels and
 * return values are not visible in this extract).
 */
570 int obd_init_caches(void)
574 LASSERT(obd_device_cachep == NULL);
575 obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
576 sizeof(struct obd_device),
578 if (!obd_device_cachep)
581 LASSERT(obdo_cachep == NULL);
582 obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
587 LASSERT(import_cachep == NULL);
588 import_cachep = cfs_mem_cache_create("ll_import_cache",
589 sizeof(struct obd_import),
594 LASSERT(capa_cachep == NULL);
595 capa_cachep = cfs_mem_cache_create("capa_cache",
596 sizeof(struct obd_capa), 0, 0);
602 obd_cleanup_caches();
607 /* map connection to client */
/*
 * Translate a connection handle into its export via class_handle2object().
 * Two special cases only produce debug messages in the visible code: a
 * null handle, and a cookie of -1 (meaning "assign a new connection").
 * What is returned in those cases is not visible in this extract.
 */
608 struct obd_export *class_conn2export(struct lustre_handle *conn)
610 struct obd_export *export;
614 CDEBUG(D_CACHE, "looking for null handle\n");
618 if (conn->cookie == -1) { /* this means assign a new connection */
619 CDEBUG(D_CACHE, "want a new connection\n");
623 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
624 export = class_handle2object(conn->cookie);
/* Body not visible in this extract; presumably returns the obd_device
 * behind exp (exp->exp_obd) - TODO confirm against the full file. */
628 struct obd_device *class_exp2obd(struct obd_export *exp)
/*
 * Resolve conn to its export, read the owning device from exp_obd and drop
 * the export reference again (presumably the one taken by the handle
 * lookup in class_conn2export() - TODO confirm).
 */
635 struct obd_device *class_conn2obd(struct lustre_handle *conn)
637 struct obd_export *export;
638 export = class_conn2export(conn);
640 struct obd_device *obd = export->exp_obd;
641 class_export_put(export);
/*
 * Return the client import (u.cli.cl_import) of the device that owns exp.
 * Any guard between the two visible statements is missing from this extract.
 */
647 struct obd_import *class_exp2cliimp(struct obd_export *exp)
649 struct obd_device *obd = exp->exp_obd;
652 return obd->u.cli.cl_import;
/*
 * Return the client import (u.cli.cl_import) of the device that conn
 * resolves to through class_conn2obd().  Any guard between the two
 * visible statements is missing from this extract.
 */
655 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
657 struct obd_device *obd = class_conn2obd(conn);
660 return obd->u.cli.cl_import;
663 /* Export management functions */
/* Callback handed to class_handle_hash() in class_new_export(); takes a
 * reference on the export passed as an untyped pointer. */
664 static void export_handle_addref(void *export)
666 class_export_get(export);
/* Take a reference on exp: bump exp_refcount and log the new value.
 * (The return statement is not visible in this extract.) */
669 struct obd_export *class_export_get(struct obd_export *exp)
671 atomic_inc(&exp->exp_refcount);
672 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
673 atomic_read(&exp->exp_refcount));
676 EXPORT_SYMBOL(class_export_get);
/*
 * Drop one reference on exp.
 * The export is not destroyed here: on the final put it is queued on
 * obd_zombie_exports and the zombie handler is notified, so the actual
 * teardown happens later in obd_zombie_impexp_cull().
 */
678 void class_export_put(struct obd_export *exp)
680 LASSERT(exp != NULL);
681 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
682 atomic_read(&exp->exp_refcount) - 1);
/* Sanity bounds on the counter: still positive and not an implausibly
 * large value. */
683 LASSERT(atomic_read(&exp->exp_refcount) > 0);
684 LASSERT(atomic_read(&exp->exp_refcount) < 0x5a5a5a);
686 if (atomic_dec_and_test(&exp->exp_refcount)) {
/* exp_obd_chain is reused as the zombie-list link, so the export must
 * already be off its device's export list. */
687 LASSERT (list_empty(&exp->exp_obd_chain));
689 CDEBUG(D_IOCTL, "final put %p/%s\n",
690 exp, exp->exp_client_uuid.uuid);
692 spin_lock(&obd_zombie_impexp_lock);
693 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
694 spin_unlock(&obd_zombie_impexp_lock);
/* NOTE(review): obd_zombie_impexp_notify is a function declared in this
 * file, so this comparison is always true. */
696 if (obd_zombie_impexp_notify != NULL)
697 obd_zombie_impexp_notify();
700 EXPORT_SYMBOL(class_export_put);
702 static void class_export_destroy(struct obd_export *exp)
704 struct obd_device *obd = exp->exp_obd;
707 LASSERT (atomic_read(&exp->exp_refcount) == 0);
709 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
710 exp->exp_client_uuid.uuid, obd->obd_name);
712 LASSERT(obd != NULL);
714 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
715 if (exp->exp_connection)
716 ptlrpc_put_connection_superhack(exp->exp_connection);
718 LASSERT(list_empty(&exp->exp_outstanding_replies));
719 LASSERT(list_empty(&exp->exp_req_replay_queue));
720 obd_destroy_export(exp);
721 class_decref(obd, "export", exp);
723 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
727 /* Creates a new export, adds it to the hash table, and returns a
728 * pointer to it. The refcount is 2: one for the hash reference, and
729 * one for the pointer returned by this function. */
730 struct obd_export *class_new_export(struct obd_device *obd,
731 struct obd_uuid *cluuid)
733 struct obd_export *export;
736 OBD_ALLOC_PTR(export);
738 return ERR_PTR(-ENOMEM);
/* Initial state: two references (see the comment above), no RPCs in
 * flight, empty reply/replay lists. */
740 export->exp_conn_cnt = 0;
741 export->exp_lock_hash = NULL;
742 atomic_set(&export->exp_refcount, 2);
743 atomic_set(&export->exp_rpc_count, 0);
744 export->exp_obd = obd;
745 CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
746 CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
747 CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
/* Publish the handle; export_handle_addref is the callback the handle
 * table uses to take references on this export. */
748 class_handle_hash(&export->exp_handle, export_handle_addref);
749 export->exp_last_request_time = cfs_time_current_sec();
750 spin_lock_init(&export->exp_lock);
751 INIT_HLIST_NODE(&export->exp_uuid_hash);
752 INIT_HLIST_NODE(&export->exp_nid_hash);
754 export->exp_sp_peer = LUSTRE_SP_ANY;
755 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
756 export->exp_client_uuid = *cluuid;
757 obd_init_export(export);
759 spin_lock(&obd->obd_dev_lock);
/* An export whose client uuid equals the device's own uuid is not entered
 * into the uuid hash; all others must be unique there. */
760 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
761 rc = lustre_hash_add_unique(obd->obd_uuid_hash, cluuid,
762 &export->exp_uuid_hash);
/* Duplicate uuid: undo the handle registration and free the export. */
764 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
765 obd->obd_name, cluuid->uuid, rc);
766 spin_unlock(&obd->obd_dev_lock);
767 class_handle_unhash(&export->exp_handle);
768 OBD_FREE_PTR(export);
769 return ERR_PTR(-EALREADY);
773 LASSERT(!obd->obd_stopping); /* shouldn't happen, but might race */
/* The export pins its device and joins both per-device export lists. */
774 class_incref(obd, "export", export);
775 list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
776 list_add_tail(&export->exp_obd_chain_timed,
777 &export->exp_obd->obd_exports_timed);
778 export->exp_obd->obd_num_exports++;
779 spin_unlock(&obd->obd_dev_lock);
783 EXPORT_SYMBOL(class_new_export);
/*
 * Detach exp from everything that can find it: the handle table, the
 * device's uuid hash (if hashed) and both per-device export lists, the
 * list work being done under obd_dev_lock.  Finishes by dropping one
 * export reference.
 */
785 void class_unlink_export(struct obd_export *exp)
787 class_handle_unhash(&exp->exp_handle);
789 spin_lock(&exp->exp_obd->obd_dev_lock);
790 /* delete an uuid-export hashitem from hashtables */
791 if (!hlist_unhashed(&exp->exp_uuid_hash))
792 lustre_hash_del(exp->exp_obd->obd_uuid_hash,
793 &exp->exp_client_uuid,
794 &exp->exp_uuid_hash);
/* list_del_init() leaves exp_obd_chain empty, which class_export_put()
 * asserts on the final put. */
796 list_del_init(&exp->exp_obd_chain);
797 list_del_init(&exp->exp_obd_chain_timed);
798 exp->exp_obd->obd_num_exports--;
799 spin_unlock(&exp->exp_obd->obd_dev_lock);
801 class_export_put(exp);
803 EXPORT_SYMBOL(class_unlink_export);
805 /* Import management functions */
/* Callback handed to class_handle_hash() in class_new_import(); takes a
 * reference on the import passed as an untyped pointer. */
806 static void import_handle_addref(void *import)
808 class_import_get(import);
/* Take a reference on import: sanity-check the current count, bump
 * imp_refcount and log the new value.  (The return statement is not
 * visible in this extract.) */
811 struct obd_import *class_import_get(struct obd_import *import)
813 LASSERT(atomic_read(&import->imp_refcount) >= 0);
814 LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
815 atomic_inc(&import->imp_refcount);
816 CDEBUG(D_INFO, "import %p refcount=%d\n", import,
817 atomic_read(&import->imp_refcount));
820 EXPORT_SYMBOL(class_import_get);
/*
 * Drop one reference on import.  On the final put the import is queued on
 * obd_zombie_imports and the zombie handler is notified; the actual
 * teardown happens later in obd_zombie_impexp_cull().
 */
822 void class_import_put(struct obd_import *import)
826 LASSERT(atomic_read(&import->imp_refcount) > 0);
827 LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
/* The import must not already be sitting on the zombie list. */
828 LASSERT(list_empty(&import->imp_zombie_chain));
830 CDEBUG(D_INFO, "import %p refcount=%d\n", import,
831 atomic_read(&import->imp_refcount) - 1);
833 if (atomic_dec_and_test(&import->imp_refcount)) {
835 CDEBUG(D_INFO, "final put import %p\n", import);
837 spin_lock(&obd_zombie_impexp_lock);
838 list_add(&import->imp_zombie_chain, &obd_zombie_imports);
839 spin_unlock(&obd_zombie_impexp_lock);
/* NOTE(review): obd_zombie_impexp_notify is a function declared in this
 * file, so this comparison is always true. */
841 if (obd_zombie_impexp_notify != NULL)
842 obd_zombie_impexp_notify();
847 EXPORT_SYMBOL(class_import_put);
/*
 * Final teardown of an import whose refcount has reached zero; called from
 * obd_zombie_impexp_cull().  Releases the active connection and every
 * entry on imp_conn_list, drops the "import" reference on the device and
 * frees the import through OBD_FREE_RCU().
 */
849 void class_import_destroy(struct obd_import *import)
853 CDEBUG(D_IOCTL, "destroying import %p for %s\n", import,
854 import->imp_obd->obd_name);
856 LASSERT(atomic_read(&import->imp_refcount) == 0);
858 ptlrpc_put_connection_superhack(import->imp_connection);
/* Pop entries off the head until the connection list is empty. */
860 while (!list_empty(&import->imp_conn_list)) {
861 struct obd_import_conn *imp_conn;
863 imp_conn = list_entry(import->imp_conn_list.next,
864 struct obd_import_conn, oic_item);
865 list_del(&imp_conn->oic_item);
866 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
867 OBD_FREE(imp_conn, sizeof(*imp_conn));
870 LASSERT(import->imp_sec == NULL);
871 class_decref(import->imp_obd, "import", import);
872 OBD_FREE_RCU(import, sizeof(*import), &import->imp_handle);
/* Initialise the adaptive-timeout state of an import: the network latency
 * estimate starts at 0 and each of the IMP_AT_MAX_PORTALS service
 * estimates starts at INITIAL_CONNECT_TIMEOUT. */
876 static void init_imp_at(struct imp_at *at) {
878 at_init(&at->iat_net_latency, 0, 0);
879 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
880 /* max service estimates are tracked on the server side, so
881 don't use the AT history here, just use the last reported
882 val. (But keep hist for proc histogram, worst_ever) */
883 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
/*
 * Allocate and initialise a new import for obd.
 * The import starts in state LUSTRE_IMP_NEW with a refcount of 2, holds an
 * "import" reference on obd and is registered in the handle table.
 * (The allocation-failure check and the return statement are not visible
 * in this extract.)
 */
888 struct obd_import *class_new_import(struct obd_device *obd)
890 struct obd_import *imp;
892 OBD_ALLOC(imp, sizeof(*imp));
896 CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
897 CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
898 CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
899 CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
900 spin_lock_init(&imp->imp_lock);
901 imp->imp_last_success_conn = 0;
902 imp->imp_state = LUSTRE_IMP_NEW;
/* The import pins its device; class_import_destroy() drops this again. */
903 imp->imp_obd = class_incref(obd, "import", imp);
904 sema_init(&imp->imp_sec_mutex, 1);
905 cfs_waitq_init(&imp->imp_recovery_waitq);
/* NOTE(review): presumably one reference for the handle table and one for
 * the caller, mirroring class_new_export() - confirm. */
907 atomic_set(&imp->imp_refcount, 2);
908 atomic_set(&imp->imp_unregistering, 0);
909 atomic_set(&imp->imp_inflight, 0);
910 atomic_set(&imp->imp_replay_inflight, 0);
911 atomic_set(&imp->imp_inval_count, 0);
912 CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
913 CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
914 class_handle_hash(&imp->imp_handle, import_handle_addref);
915 init_imp_at(&imp->imp_at);
917 /* the default magic is V2, will be used in connect RPC, and
918 * then adjusted according to the flags in request/reply. */
919 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
923 EXPORT_SYMBOL(class_new_import);
/*
 * Start tearing down an import: remove it from the handle table, bump
 * imp_generation under imp_lock and drop one reference.  The memory is
 * only released once the last reference goes away (see class_import_put()).
 */
925 void class_destroy_import(struct obd_import *import)
927 LASSERT(import != NULL);
928 LASSERT(import != LP_POISON);
930 class_handle_unhash(&import->imp_handle);
932 spin_lock(&import->imp_lock);
933 import->imp_generation++;
934 spin_unlock(&import->imp_lock);
935 class_import_put(import);
937 EXPORT_SYMBOL(class_destroy_import);
939 /* A connection defines an export context in which preallocation can
940 be managed. This releases the export pointer reference, and returns
941 the export handle, so the export refcount is 1 when this function
/*
 * Create a new export on obd for the client cluuid and hand its handle
 * cookie back through conn.  The reference class_new_export() returned to
 * this function is dropped again before returning, so the caller keeps
 * only the cookie.  A failed class_new_export() is passed on as
 * PTR_ERR(export).
 */
943 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
944 struct obd_uuid *cluuid)
946 struct obd_export *export;
947 LASSERT(conn != NULL);
948 LASSERT(obd != NULL);
949 LASSERT(cluuid != NULL);
952 export = class_new_export(obd, cluuid);
954 RETURN(PTR_ERR(export));
956 conn->cookie = export->exp_handle.h_cookie;
957 class_export_put(export);
959 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
960 cluuid->uuid, conn->cookie);
963 EXPORT_SYMBOL(class_connect);
965 /* if export is involved in recovery then clean up related things */
/*
 * Runs under obd_processing_task_lock (bottom halves disabled).  When the
 * device is recovering and the export takes part, the export's recovery
 * flags are cleared (each under exp_lock) and the device-wide client
 * counters are decremented to match.
 */
966 void class_export_recovery_cleanup(struct obd_export *exp)
968 struct obd_device *obd = exp->exp_obd;
970 spin_lock_bh(&obd->obd_processing_task_lock);
971 if (obd->obd_recovering && exp->exp_in_recovery) {
972 spin_lock(&exp->exp_lock);
973 exp->exp_in_recovery = 0;
974 spin_unlock(&exp->exp_lock);
975 obd->obd_connected_clients--;
976 /* each connected client is counted as recoverable */
977 obd->obd_recoverable_clients--;
978 if (exp->exp_req_replay_needed) {
979 spin_lock(&exp->exp_lock);
980 exp->exp_req_replay_needed = 0;
981 spin_unlock(&exp->exp_lock);
/* The counter must be non-zero before it is decremented. */
982 LASSERT(atomic_read(&obd->obd_req_replay_clients));
983 atomic_dec(&obd->obd_req_replay_clients);
985 if (exp->exp_lock_replay_needed) {
986 spin_lock(&exp->exp_lock);
987 exp->exp_lock_replay_needed = 0;
988 spin_unlock(&exp->exp_lock);
989 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
990 atomic_dec(&obd->obd_lock_replay_clients);
993 spin_unlock_bh(&obd->obd_processing_task_lock);
996 /* This function removes two references from the export: one for the
997 * hash entry and one for the export pointer passed in. The export
998 * pointer passed to this function is destroyed and should not be used
/*
 * Disconnect an export.
 * Marks the export disconnected under exp_lock (remembering whether it
 * already was) and removes it from the device's NID hash if hashed.  Only
 * the first caller goes on to clean up recovery state, unlink the export
 * and drop the caller's reference.
 * Return values are not visible in this extract.
 */
1000 int class_disconnect(struct obd_export *export)
1002 int already_disconnected;
1005 if (export == NULL) {
1007 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
1011 spin_lock(&export->exp_lock);
1012 already_disconnected = export->exp_disconnected;
1013 export->exp_disconnected = 1;
1015 if (!hlist_unhashed(&export->exp_nid_hash))
1016 lustre_hash_del(export->exp_obd->obd_nid_hash,
1017 &export->exp_connection->c_peer.nid,
1018 &export->exp_nid_hash);
1020 spin_unlock(&export->exp_lock);
1022 /* class_cleanup(), abort_recovery(), and class_fail_export()
1023 * all end up in here, and if any of them race we shouldn't
1024 * call extra class_export_puts(). */
1025 if (already_disconnected)
1028 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1029 export->exp_handle.h_cookie);
1031 class_export_recovery_cleanup(export);
/* class_unlink_export() drops one reference itself; the put below
 * releases the reference the caller passed in. */
1032 class_unlink_export(export);
1033 class_export_put(export);
/*
 * Disconnect every export on list, stamping each with flags first.
 * The loop always works on the list head and ends once the list is empty.
 * An export whose client uuid equals its device's uuid is only unlinked
 * from the list, not disconnected.  Every other export is looked up again
 * through its handle cookie and passed to obd_disconnect().
 */
1037 static void class_disconnect_export_list(struct list_head *list, int flags)
1040 struct lustre_handle fake_conn;
1041 struct obd_export *fake_exp, *exp;
1044 /* It's possible that an export may disconnect itself, but
1045 * nothing else will be added to this list. */
1046 while (!list_empty(list)) {
1047 exp = list_entry(list->next, struct obd_export, exp_obd_chain);
/* Hold our own reference while working on this entry. */
1048 class_export_get(exp);
1050 spin_lock(&exp->exp_lock);
1051 exp->exp_flags = flags;
1052 spin_unlock(&exp->exp_lock);
1054 if (obd_uuid_equals(&exp->exp_client_uuid,
1055 &exp->exp_obd->obd_uuid)) {
1057 "exp %p export uuid == obd uuid, don't discon\n",
1059 /* Need to delete this now so we don't end up pointing
1060 * to work_list later when this export is cleaned up. */
1061 list_del_init(&exp->exp_obd_chain);
1062 class_export_put(exp);
/* Look the export up again via its cookie (the condition guarding the
 * put below is not visible in this extract). */
1066 fake_conn.cookie = exp->exp_handle.h_cookie;
1067 fake_exp = class_conn2export(&fake_conn);
1069 class_export_put(exp);
1073 spin_lock(&fake_exp->exp_lock);
1074 fake_exp->exp_flags = flags;
1075 spin_unlock(&fake_exp->exp_lock);
1077 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1078 "last request at "CFS_TIME_T"\n",
1079 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1080 exp, exp->exp_last_request_time);
1081 rc = obd_disconnect(fake_exp);
/* Release the reference taken at the top of the loop. */
1082 class_export_put(exp);
/* Build the export flag mask for a device: OBD_OPT_FAILOVER when obd_fail
 * is set, OBD_OPT_FORCE when obd_force is set. */
1087 static inline int get_exp_flags_from_obd(struct obd_device *obd)
1089 return ((obd->obd_fail ? OBD_OPT_FAILOVER : 0) |
1090 (obd->obd_force ? OBD_OPT_FORCE : 0));
/*
 * Disconnect every export of obd.  The export list is detached from the
 * device under obd_dev_lock and then processed without the lock by
 * class_disconnect_export_list().
 */
1093 void class_disconnect_exports(struct obd_device *obd)
1095 struct list_head work_list;
1098 /* Move all of the exports from obd_exports to a work list, en masse. */
1099 spin_lock(&obd->obd_dev_lock);
/* Inserting the new head right after the old one and then removing the
 * old head hands the whole chain to work_list and leaves obd_exports
 * empty; list_add() also initialises work_list's links. */
1100 list_add(&work_list, &obd->obd_exports);
1101 list_del_init(&obd->obd_exports);
1102 spin_unlock(&obd->obd_dev_lock);
1104 if (!list_empty(&work_list)) {
1105 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1106 "disconnecting them\n", obd->obd_minor, obd);
1107 class_disconnect_export_list(&work_list,
1108 get_exp_flags_from_obd(obd));
1110 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1111 obd->obd_minor, obd);
1114 EXPORT_SYMBOL(class_disconnect_exports);
1116 /* Remove exports that have not completed recovery.
/*
 * Walk obd_exports under obd_dev_lock, calling test_export() on each
 * entry.  Exports selected as stale are moved to a private work list and
 * disconnected once the lock is dropped.  The action taken when
 * test_export() returns non-zero is not visible in this extract
 * (presumably the export is kept), nor are the counter update and the
 * return value.
 */
1118 int class_disconnect_stale_exports(struct obd_device *obd,
1119 int (*test_export)(struct obd_export *))
1121 struct list_head work_list;
1122 struct list_head *pos, *n;
1123 struct obd_export *exp;
1127 CFS_INIT_LIST_HEAD(&work_list);
1128 spin_lock(&obd->obd_dev_lock);
/* The _safe iterator is needed because entries are unlinked mid-walk. */
1129 list_for_each_safe(pos, n, &obd->obd_exports) {
1130 exp = list_entry(pos, struct obd_export, exp_obd_chain);
1131 if (test_export(exp))
1134 list_del(&exp->exp_obd_chain);
1135 list_add(&exp->exp_obd_chain, &work_list);
1136 /* don't count self-export as client */
1137 if (obd_uuid_equals(&exp->exp_client_uuid,
1138 &exp->exp_obd->obd_uuid))
1142 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1143 obd->obd_name, exp->exp_client_uuid.uuid,
1144 exp->exp_connection == NULL ? "<unknown>" :
1145 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1147 spin_unlock(&obd->obd_dev_lock);
1149 CDEBUG(D_ERROR, "%s: disconnecting %d stale clients\n",
1150 obd->obd_name, cnt);
1151 class_disconnect_export_list(&work_list, get_exp_flags_from_obd(obd));
1154 EXPORT_SYMBOL(class_disconnect_stale_exports);
/*
 * Forcibly disconnect an export.  exp_failed is set under exp_lock and its
 * previous value decides whether this call does the work: an export that
 * was already failed is skipped with a debug message.  Otherwise the debug
 * log is dumped if obd_dump_on_timeout is set and the export is handed to
 * obd_disconnect().
 */
1156 void class_fail_export(struct obd_export *exp)
1158 int rc, already_failed;
1160 spin_lock(&exp->exp_lock);
1161 already_failed = exp->exp_failed;
1162 exp->exp_failed = 1;
1163 spin_unlock(&exp->exp_lock);
1165 if (already_failed) {
1166 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1167 exp, exp->exp_client_uuid.uuid);
1171 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1172 exp, exp->exp_client_uuid.uuid);
1174 if (obd_dump_on_timeout)
1175 libcfs_debug_dumplog();
1177 /* Most callers into obd_disconnect are removing their own reference
1178 * (request, for example) in addition to the one from the hash table.
1179 * We don't have such a reference here, so make one. */
1180 class_export_get(exp);
1181 rc = obd_disconnect(exp);
1183 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1185 CDEBUG(D_HA, "disconnected export %p/%s\n",
1186 exp, exp->exp_client_uuid.uuid);
1188 EXPORT_SYMBOL(class_fail_export);
/* Return the peer NID of exp as a string when the export has a
 * connection.  The fallback for connection-less exports is not visible in
 * this extract. */
1190 char *obd_export_nid2str(struct obd_export *exp)
1192 if (exp->exp_connection != NULL)
1193 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1197 EXPORT_SYMBOL(obd_export_evict_by_nid);
/*
 * Evict the exports of obd that are connected from the given NID string.
 * Each export found in obd_nid_hash is failed with class_fail_export() and
 * the lookup reference is dropped afterwards.  The surrounding loop and
 * the counter update are not visible in this extract.
 * Returns exports_evicted, presumably the number of evicted exports -
 * TODO confirm.
 */
1199 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1201 struct obd_export *doomed_exp = NULL;
1202 int exports_evicted = 0;
1204 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1207 doomed_exp = lustre_hash_lookup(obd->obd_nid_hash, &nid_key);
1208 if (doomed_exp == NULL)
/* Whatever the hash returns must really belong to this NID, and the
 * device's self export must never be hashed by NID. */
1211 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1212 "nid %s found, wanted nid %s, requested nid %s\n",
1213 obd_export_nid2str(doomed_exp),
1214 libcfs_nid2str(nid_key), nid);
1215 LASSERTF(doomed_exp != obd->obd_self_export,
1216 "self-export is hashed by NID?\n");
1218 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1219 obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1221 class_fail_export(doomed_exp);
1222 class_export_put(doomed_exp);
1225 if (!exports_evicted)
1226 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1227 obd->obd_name, nid);
1228 return exports_evicted;
1230 EXPORT_SYMBOL(obd_export_evict_by_nid);
/*
 * Evict the export of obd whose client uuid equals the given string.
 * Refuses to evict the device's own uuid.  A matching export from
 * obd_uuid_hash is failed with class_fail_export() and the lookup
 * reference is dropped.
 * Returns exports_evicted (its update is not visible in this extract).
 */
1232 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1234 struct obd_export *doomed_exp = NULL;
1235 struct obd_uuid doomed_uuid;
1236 int exports_evicted = 0;
1238 obd_str2uuid(&doomed_uuid, uuid);
1239 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1240 CERROR("%s: can't evict myself\n", obd->obd_name);
1241 return exports_evicted;
1244 doomed_exp = lustre_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1246 if (doomed_exp == NULL) {
1247 CERROR("%s: can't disconnect %s: no exports found\n",
1248 obd->obd_name, uuid);
1250 CWARN("%s: evicting %s at adminstrative request\n",
1251 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1252 class_fail_export(doomed_exp);
1253 class_export_put(doomed_exp);
1257 return exports_evicted;
1259 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1262 * kill zombie imports and exports
/*
 * Drain the zombie lists.  Each pass takes at most one import and one
 * export off the heads of their lists under obd_zombie_impexp_lock, then
 * destroys them after the lock has been dropped.  The loop ends once a
 * pass finds neither.
 */
1264 void obd_zombie_impexp_cull(void)
1266 struct obd_import *import;
1267 struct obd_export *export;
1271 spin_lock (&obd_zombie_impexp_lock);
1274 if (!list_empty(&obd_zombie_imports)) {
1275 import = list_entry(obd_zombie_imports.next,
1278 list_del(&import->imp_zombie_chain);
1282 if (!list_empty(&obd_zombie_exports)) {
1283 export = list_entry(obd_zombie_exports.next,
1286 list_del_init(&export->exp_obd_chain);
1289 spin_unlock(&obd_zombie_impexp_lock);
/* The destroy calls run with the zombie lock released. */
1292 class_import_destroy(import);
1295 class_export_destroy(export);
1297 } while (import != NULL || export != NULL);
/* Completed by obd_zombie_impexp_thread() once it has started (or failed
 * to daemonize); obd_zombie_impexp_init() waits on it. */
1301 static struct completion obd_zombie_start;
/* Completed by the thread when it leaves its loop; obd_zombie_impexp_stop()
 * waits on it. */
1302 static struct completion obd_zombie_stop;
/* Bit field; the OBD_ZOMBIE_STOP bit asks the thread to exit. */
1303 static unsigned long obd_zombie_flags;
/* Wait queue shared by the zombie thread and obd_zombie_barrier() callers. */
1304 static cfs_waitq_t obd_zombie_waitq;
1311 * check for work for kill zombie import/export thread.
/*
 * Computes, under obd_zombie_impexp_lock, a value that is non-zero when
 * there is NOTHING to do: both zombie lists are empty and no stop has been
 * requested.  The zombie thread therefore waits on the negation of this
 * function.  arg is not used by the visible code.
 */
1313 static int obd_zombie_impexp_check(void *arg)
1317 spin_lock(&obd_zombie_impexp_lock);
1318 rc = list_empty(&obd_zombie_imports) &&
1319 list_empty(&obd_zombie_exports) &&
1320 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1322 spin_unlock(&obd_zombie_impexp_lock);
1328 * notify import/export destroy thread about new zombie.
/* Wake a waiter on obd_zombie_waitq; called after a zombie has been queued
 * and when a stop is requested. */
1330 static void obd_zombie_impexp_notify(void)
1332 cfs_waitq_signal(&obd_zombie_waitq);
1336 * check whether obd_zombie is idle
/* Computes, under obd_zombie_impexp_lock, whether both zombie lists are
 * empty.  Must not be called once a stop has been requested (asserted). */
1338 static int obd_zombie_is_idle(void)
1342 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1343 spin_lock(&obd_zombie_impexp_lock);
1344 rc = list_empty(&obd_zombie_imports) &&
1345 list_empty(&obd_zombie_exports);
1346 spin_unlock(&obd_zombie_impexp_lock);
1351 * wait when obd_zombie import/export queues become empty
/* Block the caller on obd_zombie_waitq until obd_zombie_is_idle() reports
 * that both zombie lists are empty.  The zero-initialised l_wait_info
 * requests no special wait parameters. */
1353 void obd_zombie_barrier(void)
1355 struct l_wait_info lwi = { 0 };
1357 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1359 EXPORT_SYMBOL(obd_zombie_barrier);
1364 * destroy zombie export/import thread.
/*
 * Body of the zombie-destroy thread.  Signals obd_zombie_start once it has
 * daemonized (or failed to), then loops until OBD_ZOMBIE_STOP is set:
 * sleep until there is work or a stop request, cull the zombie lists, and
 * wake obd_zombie_barrier() waiters.  Signals obd_zombie_stop on the way
 * out.
 */
1366 static int obd_zombie_impexp_thread(void *unused)
1370 if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1371 complete(&obd_zombie_start);
1375 complete(&obd_zombie_start);
1377 while(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1378 struct l_wait_info lwi = { 0 };
/* obd_zombie_impexp_check() is non-zero when idle, hence the negation. */
1380 l_wait_event(obd_zombie_waitq, !obd_zombie_impexp_check(NULL), &lwi);
1382 obd_zombie_impexp_cull();
1383 /* Notify obd_zombie_barrier callers that queues may be empty */
1384 cfs_waitq_signal(&obd_zombie_waitq);
1387 complete(&obd_zombie_stop);
1392 #else /* ! KERNEL */
/* Non-kernel build: counter used by obd_zombie_impexp_kill() to avoid
 * nested culls. */
1394 static atomic_t zombie_recur = ATOMIC_INIT(0);
/* Handles returned by the liblustre callback registrations in
 * obd_zombie_impexp_init(); used to deregister in obd_zombie_impexp_stop(). */
1395 static void *obd_zombie_impexp_work_cb;
1396 static void *obd_zombie_impexp_idle_cb;
/*
 * Wait callback for the non-kernel build: culls the zombie lists, but only
 * when this is the outermost invocation (zombie_recur goes from 0 to 1).
 * The counter is decremented again on the way out.  arg is not used by the
 * visible code.
 */
1398 int obd_zombie_impexp_kill(void *arg)
1402 if (atomic_inc_return(&zombie_recur) == 1) {
1403 obd_zombie_impexp_cull();
1406 atomic_dec(&zombie_recur);
1413 * start destroy zombie import/export thread
/*
 * Initialise the zombie lists, their lock, the start/stop completions and
 * the wait queue, then start zombie processing.  Two variants are visible:
 * spawning obd_zombie_impexp_thread and waiting for it to start, or
 * registering liblustre wait/idle callbacks.  They are presumably selected
 * by a kernel/non-kernel preprocessor conditional whose directives are
 * missing from this extract - TODO confirm.
 */
1415 int obd_zombie_impexp_init(void)
1419 CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1420 CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1421 spin_lock_init(&obd_zombie_impexp_lock);
1422 init_completion(&obd_zombie_start);
1423 init_completion(&obd_zombie_stop);
1424 cfs_waitq_init(&obd_zombie_waitq);
1427 rc = cfs_kernel_thread(obd_zombie_impexp_thread, NULL, 0);
/* Do not return before the thread has signalled that it is running. */
1431 wait_for_completion(&obd_zombie_start);
1434 obd_zombie_impexp_work_cb =
1435 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1436 &obd_zombie_impexp_kill, NULL);
1438 obd_zombie_impexp_idle_cb =
1439 liblustre_register_idle_callback("obd_zombi_impexp_check",
1440 &obd_zombie_impexp_check, NULL);
1447 * stop destroy zombie import/export thread
/*
 * Stop zombie processing: set OBD_ZOMBIE_STOP and wake the waiter.  Two
 * variants are visible: waiting for the thread to signal obd_zombie_stop,
 * or deregistering the liblustre callbacks.  They are presumably selected
 * by a preprocessor conditional that is missing from this extract.
 */
1449 void obd_zombie_impexp_stop(void)
1451 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1452 obd_zombie_impexp_notify();
1454 wait_for_completion(&obd_zombie_stop);
1456 liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1457 liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);