1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
44 #include <liblustre.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49 #include <class_hash.h>
51 extern struct list_head obd_types;
52 spinlock_t obd_types_lock;
54 cfs_mem_cache_t *obd_device_cachep;
55 cfs_mem_cache_t *obdo_cachep;
56 EXPORT_SYMBOL(obdo_cachep);
57 cfs_mem_cache_t *import_cachep;
59 struct list_head obd_zombie_imports;
60 struct list_head obd_zombie_exports;
61 spinlock_t obd_zombie_impexp_lock;
62 static void obd_zombie_impexp_notify(void);
63 static void obd_zombie_export_add(struct obd_export *exp);
64 static void obd_zombie_import_add(struct obd_import *imp);
66 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
69 * support functions: we could use inter-module communication, but this
70 * is more portable to other OS's
72 static struct obd_device *obd_device_alloc(void)
74 struct obd_device *obd;
76 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
78 obd->obd_magic = OBD_DEVICE_MAGIC;
82 EXPORT_SYMBOL(obd_device_alloc);
84 static void obd_device_free(struct obd_device *obd)
87 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
88 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
89 if (obd->obd_namespace != NULL) {
90 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
91 obd, obd->obd_namespace, obd->obd_force);
94 lu_ref_fini(&obd->obd_reference);
95 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
98 struct obd_type *class_search_type(const char *name)
100 struct list_head *tmp;
101 struct obd_type *type;
103 spin_lock(&obd_types_lock);
104 list_for_each(tmp, &obd_types) {
105 type = list_entry(tmp, struct obd_type, typ_chain);
106 if (strcmp(type->typ_name, name) == 0) {
107 spin_unlock(&obd_types_lock);
111 spin_unlock(&obd_types_lock);
115 struct obd_type *class_get_type(const char *name)
117 struct obd_type *type = class_search_type(name);
121 const char *modname = name;
122 if (!request_module(modname)) {
123 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
124 type = class_search_type(name);
126 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
132 spin_lock(&type->obd_type_lock);
134 try_module_get(type->typ_dt_ops->o_owner);
135 spin_unlock(&type->obd_type_lock);
140 void class_put_type(struct obd_type *type)
143 spin_lock(&type->obd_type_lock);
145 module_put(type->typ_dt_ops->o_owner);
146 spin_unlock(&type->obd_type_lock);
149 #define CLASS_MAX_NAME 1024
151 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
152 struct lprocfs_vars *vars, const char *name,
153 struct lu_device_type *ldt)
155 struct obd_type *type;
160 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
162 if (class_search_type(name)) {
163 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
168 OBD_ALLOC(type, sizeof(*type));
172 OBD_ALLOC_PTR(type->typ_dt_ops);
173 OBD_ALLOC_PTR(type->typ_md_ops);
174 OBD_ALLOC(type->typ_name, strlen(name) + 1);
176 if (type->typ_dt_ops == NULL ||
177 type->typ_md_ops == NULL ||
178 type->typ_name == NULL)
181 *(type->typ_dt_ops) = *dt_ops;
182 /* md_ops is optional */
184 *(type->typ_md_ops) = *md_ops;
185 strcpy(type->typ_name, name);
186 spin_lock_init(&type->obd_type_lock);
189 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
191 if (IS_ERR(type->typ_procroot)) {
192 rc = PTR_ERR(type->typ_procroot);
193 type->typ_procroot = NULL;
199 rc = lu_device_type_init(ldt);
204 spin_lock(&obd_types_lock);
205 list_add(&type->typ_chain, &obd_types);
206 spin_unlock(&obd_types_lock);
211 if (type->typ_name != NULL)
212 OBD_FREE(type->typ_name, strlen(name) + 1);
213 if (type->typ_md_ops != NULL)
214 OBD_FREE_PTR(type->typ_md_ops);
215 if (type->typ_dt_ops != NULL)
216 OBD_FREE_PTR(type->typ_dt_ops);
217 OBD_FREE(type, sizeof(*type));
221 int class_unregister_type(const char *name)
223 struct obd_type *type = class_search_type(name);
227 CERROR("unknown obd type\n");
231 if (type->typ_refcnt) {
232 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
233 /* This is a bad situation, let's make the best of it */
234 /* Remove ops, but leave the name for debugging */
235 OBD_FREE_PTR(type->typ_dt_ops);
236 OBD_FREE_PTR(type->typ_md_ops);
240 if (type->typ_procroot) {
241 lprocfs_remove(&type->typ_procroot);
245 lu_device_type_fini(type->typ_lu);
247 spin_lock(&obd_types_lock);
248 list_del(&type->typ_chain);
249 spin_unlock(&obd_types_lock);
250 OBD_FREE(type->typ_name, strlen(name) + 1);
251 if (type->typ_dt_ops != NULL)
252 OBD_FREE_PTR(type->typ_dt_ops);
253 if (type->typ_md_ops != NULL)
254 OBD_FREE_PTR(type->typ_md_ops);
255 OBD_FREE(type, sizeof(*type));
257 } /* class_unregister_type */
260 * Create a new obd device.
262 * Find an empty slot in ::obd_devs[], create a new obd device in it.
264 * \param typename [in] obd device type string.
265 * \param name [in] obd device name.
267 * \retval NULL if create fails, otherwise return the obd device
270 struct obd_device *class_newdev(const char *type_name, const char *name)
272 struct obd_device *result = NULL;
273 struct obd_device *newdev;
274 struct obd_type *type = NULL;
276 int new_obd_minor = 0;
278 if (strlen(name) >= MAX_OBD_NAME) {
279 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
280 RETURN(ERR_PTR(-EINVAL));
283 type = class_get_type(type_name);
285 CERROR("OBD: unknown type: %s\n", type_name);
286 RETURN(ERR_PTR(-ENODEV));
289 newdev = obd_device_alloc();
290 if (newdev == NULL) {
291 class_put_type(type);
292 RETURN(ERR_PTR(-ENOMEM));
294 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
296 spin_lock(&obd_dev_lock);
297 for (i = 0; i < class_devno_max(); i++) {
298 struct obd_device *obd = class_num2obd(i);
299 if (obd && obd->obd_name &&
300 (strcmp(name, obd->obd_name) == 0)) {
301 CERROR("Device %s already exists, won't add\n", name);
303 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
304 "%p obd_magic %08x != %08x\n", result,
305 result->obd_magic, OBD_DEVICE_MAGIC);
306 LASSERTF(result->obd_minor == new_obd_minor,
307 "%p obd_minor %d != %d\n", result,
308 result->obd_minor, new_obd_minor);
310 obd_devs[result->obd_minor] = NULL;
311 result->obd_name[0]='\0';
313 result = ERR_PTR(-EEXIST);
316 if (!result && !obd) {
318 result->obd_minor = i;
320 result->obd_type = type;
321 strncpy(result->obd_name, name,
322 sizeof(result->obd_name) - 1);
323 obd_devs[i] = result;
326 spin_unlock(&obd_dev_lock);
328 if (result == NULL && i >= class_devno_max()) {
329 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
331 result = ERR_PTR(-EOVERFLOW);
334 if (IS_ERR(result)) {
335 obd_device_free(newdev);
336 class_put_type(type);
338 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
339 result->obd_name, result);
344 void class_release_dev(struct obd_device *obd)
346 struct obd_type *obd_type = obd->obd_type;
348 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
349 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
350 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
351 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
352 LASSERT(obd_type != NULL);
354 CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
355 obd->obd_name,obd->obd_type->typ_name);
357 spin_lock(&obd_dev_lock);
358 obd_devs[obd->obd_minor] = NULL;
359 spin_unlock(&obd_dev_lock);
360 obd_device_free(obd);
362 class_put_type(obd_type);
365 int class_name2dev(const char *name)
372 spin_lock(&obd_dev_lock);
373 for (i = 0; i < class_devno_max(); i++) {
374 struct obd_device *obd = class_num2obd(i);
375 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
376 /* Make sure we finished attaching before we give
377 out any references */
378 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
379 if (obd->obd_attached) {
380 spin_unlock(&obd_dev_lock);
386 spin_unlock(&obd_dev_lock);
391 struct obd_device *class_name2obd(const char *name)
393 int dev = class_name2dev(name);
395 if (dev < 0 || dev > class_devno_max())
397 return class_num2obd(dev);
400 int class_uuid2dev(struct obd_uuid *uuid)
404 spin_lock(&obd_dev_lock);
405 for (i = 0; i < class_devno_max(); i++) {
406 struct obd_device *obd = class_num2obd(i);
407 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
408 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
409 spin_unlock(&obd_dev_lock);
413 spin_unlock(&obd_dev_lock);
418 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
420 int dev = class_uuid2dev(uuid);
423 return class_num2obd(dev);
427 * Get obd device from ::obd_devs[]
429 * \param num [in] array index
431 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
432 * otherwise return the obd device there.
434 struct obd_device *class_num2obd(int num)
436 struct obd_device *obd = NULL;
438 if (num < class_devno_max()) {
443 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
444 "%p obd_magic %08x != %08x\n",
445 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
446 LASSERTF(obd->obd_minor == num,
447 "%p obd_minor %0d != %0d\n",
448 obd, obd->obd_minor, num);
454 void class_obd_list(void)
459 spin_lock(&obd_dev_lock);
460 for (i = 0; i < class_devno_max(); i++) {
461 struct obd_device *obd = class_num2obd(i);
464 if (obd->obd_stopping)
466 else if (obd->obd_set_up)
468 else if (obd->obd_attached)
472 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
473 i, status, obd->obd_type->typ_name,
474 obd->obd_name, obd->obd_uuid.uuid,
475 atomic_read(&obd->obd_refcount));
477 spin_unlock(&obd_dev_lock);
481 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
482 specified, then only the client with that uuid is returned,
483 otherwise any client connected to the tgt is returned. */
484 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
485 const char * typ_name,
486 struct obd_uuid *grp_uuid)
490 spin_lock(&obd_dev_lock);
491 for (i = 0; i < class_devno_max(); i++) {
492 struct obd_device *obd = class_num2obd(i);
495 if ((strncmp(obd->obd_type->typ_name, typ_name,
496 strlen(typ_name)) == 0)) {
497 if (obd_uuid_equals(tgt_uuid,
498 &obd->u.cli.cl_target_uuid) &&
499 ((grp_uuid)? obd_uuid_equals(grp_uuid,
500 &obd->obd_uuid) : 1)) {
501 spin_unlock(&obd_dev_lock);
506 spin_unlock(&obd_dev_lock);
511 /* Iterate the obd_device list looking devices have grp_uuid. Start
512 searching at *next, and if a device is found, the next index to look
513 at is saved in *next. If next is NULL, then the first matching device
514 will always be returned. */
515 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
521 else if (*next >= 0 && *next < class_devno_max())
526 spin_lock(&obd_dev_lock);
527 for (; i < class_devno_max(); i++) {
528 struct obd_device *obd = class_num2obd(i);
531 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
534 spin_unlock(&obd_dev_lock);
538 spin_unlock(&obd_dev_lock);
544 * to notify sptlrpc log for @fsname has changed, let every relevant OBD
545 * adjust sptlrpc settings accordingly.
547 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
549 struct obd_device *obd;
553 LASSERT(namelen > 0);
555 spin_lock(&obd_dev_lock);
556 for (i = 0; i < class_devno_max(); i++) {
557 obd = class_num2obd(i);
559 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
562 /* only notify mdc, osc, mdt, ost */
563 type = obd->obd_type->typ_name;
564 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
565 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
566 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
567 strcmp(type, LUSTRE_OST_NAME) != 0)
570 if (strncmp(obd->obd_name, fsname, namelen))
573 class_incref(obd, __FUNCTION__, obd);
574 spin_unlock(&obd_dev_lock);
575 rc2 = obd_set_info_async(obd->obd_self_export,
576 sizeof(KEY_SPTLRPC_CONF),
577 KEY_SPTLRPC_CONF, 0, NULL, NULL);
579 class_decref(obd, __FUNCTION__, obd);
580 spin_lock(&obd_dev_lock);
582 spin_unlock(&obd_dev_lock);
585 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
587 void obd_cleanup_caches(void)
592 if (obd_device_cachep) {
593 rc = cfs_mem_cache_destroy(obd_device_cachep);
594 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
595 obd_device_cachep = NULL;
598 rc = cfs_mem_cache_destroy(obdo_cachep);
599 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
603 rc = cfs_mem_cache_destroy(import_cachep);
604 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
605 import_cachep = NULL;
608 rc = cfs_mem_cache_destroy(capa_cachep);
609 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
615 int obd_init_caches(void)
619 LASSERT(obd_device_cachep == NULL);
620 obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
621 sizeof(struct obd_device),
623 if (!obd_device_cachep)
626 LASSERT(obdo_cachep == NULL);
627 obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
632 LASSERT(import_cachep == NULL);
633 import_cachep = cfs_mem_cache_create("ll_import_cache",
634 sizeof(struct obd_import),
639 LASSERT(capa_cachep == NULL);
640 capa_cachep = cfs_mem_cache_create("capa_cache",
641 sizeof(struct obd_capa), 0, 0);
647 obd_cleanup_caches();
652 /* map connection to client */
653 struct obd_export *class_conn2export(struct lustre_handle *conn)
655 struct obd_export *export;
659 CDEBUG(D_CACHE, "looking for null handle\n");
663 if (conn->cookie == -1) { /* this means assign a new connection */
664 CDEBUG(D_CACHE, "want a new connection\n");
668 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
669 export = class_handle2object(conn->cookie);
673 struct obd_device *class_exp2obd(struct obd_export *exp)
680 struct obd_device *class_conn2obd(struct lustre_handle *conn)
682 struct obd_export *export;
683 export = class_conn2export(conn);
685 struct obd_device *obd = export->exp_obd;
686 class_export_put(export);
692 struct obd_import *class_exp2cliimp(struct obd_export *exp)
694 struct obd_device *obd = exp->exp_obd;
697 return obd->u.cli.cl_import;
700 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
702 struct obd_device *obd = class_conn2obd(conn);
705 return obd->u.cli.cl_import;
708 /* Export management functions */
709 static void class_export_destroy(struct obd_export *exp)
711 struct obd_device *obd = exp->exp_obd;
714 LASSERT (atomic_read(&exp->exp_refcount) == 0);
716 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
717 exp->exp_client_uuid.uuid, obd->obd_name);
719 LASSERT(obd != NULL);
721 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
722 if (exp->exp_connection)
723 ptlrpc_put_connection_superhack(exp->exp_connection);
725 LASSERT(list_empty(&exp->exp_outstanding_replies));
726 LASSERT(list_empty(&exp->exp_req_replay_queue));
727 LASSERT(list_empty(&exp->exp_queued_rpc));
728 obd_destroy_export(exp);
729 class_decref(obd, "export", exp);
731 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
735 static void export_handle_addref(void *export)
737 class_export_get(export);
740 struct obd_export *class_export_get(struct obd_export *exp)
742 atomic_inc(&exp->exp_refcount);
743 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
744 atomic_read(&exp->exp_refcount));
747 EXPORT_SYMBOL(class_export_get);
749 void class_export_put(struct obd_export *exp)
751 LASSERT(exp != NULL);
752 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
753 atomic_read(&exp->exp_refcount) - 1);
754 LASSERT(atomic_read(&exp->exp_refcount) > 0);
755 LASSERT(atomic_read(&exp->exp_refcount) < 0x5a5a5a);
757 if (atomic_dec_and_test(&exp->exp_refcount)) {
758 CDEBUG(D_IOCTL, "final put %p/%s\n",
759 exp, exp->exp_client_uuid.uuid);
760 obd_zombie_export_add(exp);
763 EXPORT_SYMBOL(class_export_put);
765 /* Creates a new export, adds it to the hash table, and returns a
766 * pointer to it. The refcount is 2: one for the hash reference, and
767 * one for the pointer returned by this function. */
768 struct obd_export *class_new_export(struct obd_device *obd,
769 struct obd_uuid *cluuid)
771 struct obd_export *export;
774 OBD_ALLOC_PTR(export);
776 return ERR_PTR(-ENOMEM);
778 export->exp_conn_cnt = 0;
779 export->exp_lock_hash = NULL;
780 atomic_set(&export->exp_refcount, 2);
781 atomic_set(&export->exp_rpc_count, 0);
782 export->exp_obd = obd;
783 CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
784 CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
785 CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
786 CFS_INIT_LIST_HEAD(&export->exp_queued_rpc);
787 class_handle_hash(&export->exp_handle, export_handle_addref);
788 export->exp_last_request_time = cfs_time_current_sec();
789 spin_lock_init(&export->exp_lock);
790 INIT_HLIST_NODE(&export->exp_uuid_hash);
791 INIT_HLIST_NODE(&export->exp_nid_hash);
793 export->exp_sp_peer = LUSTRE_SP_ANY;
794 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
795 export->exp_client_uuid = *cluuid;
796 obd_init_export(export);
798 spin_lock(&obd->obd_dev_lock);
799 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
800 rc = lustre_hash_add_unique(obd->obd_uuid_hash, cluuid,
801 &export->exp_uuid_hash);
803 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
804 obd->obd_name, cluuid->uuid, rc);
805 spin_unlock(&obd->obd_dev_lock);
806 class_handle_unhash(&export->exp_handle);
807 OBD_FREE_PTR(export);
808 return ERR_PTR(-EALREADY);
812 LASSERT(!obd->obd_stopping); /* shouldn't happen, but might race */
813 class_incref(obd, "export", export);
814 list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
815 list_add_tail(&export->exp_obd_chain_timed,
816 &export->exp_obd->obd_exports_timed);
817 export->exp_obd->obd_num_exports++;
818 spin_unlock(&obd->obd_dev_lock);
822 EXPORT_SYMBOL(class_new_export);
824 void class_unlink_export(struct obd_export *exp)
826 class_handle_unhash(&exp->exp_handle);
828 spin_lock(&exp->exp_obd->obd_dev_lock);
829 /* delete an uuid-export hashitem from hashtables */
830 if (!hlist_unhashed(&exp->exp_uuid_hash))
831 lustre_hash_del(exp->exp_obd->obd_uuid_hash,
832 &exp->exp_client_uuid,
833 &exp->exp_uuid_hash);
835 list_del_init(&exp->exp_obd_chain);
836 list_del_init(&exp->exp_obd_chain_timed);
837 exp->exp_obd->obd_num_exports--;
838 spin_unlock(&exp->exp_obd->obd_dev_lock);
840 class_export_put(exp);
842 EXPORT_SYMBOL(class_unlink_export);
844 /* Import management functions */
845 void class_import_destroy(struct obd_import *imp)
849 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
850 imp->imp_obd->obd_name);
852 LASSERT(atomic_read(&imp->imp_refcount) == 0);
854 ptlrpc_put_connection_superhack(imp->imp_connection);
856 while (!list_empty(&imp->imp_conn_list)) {
857 struct obd_import_conn *imp_conn;
859 imp_conn = list_entry(imp->imp_conn_list.next,
860 struct obd_import_conn, oic_item);
861 list_del_init(&imp_conn->oic_item);
862 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
863 OBD_FREE(imp_conn, sizeof(*imp_conn));
866 LASSERT(imp->imp_sec == NULL);
867 class_decref(imp->imp_obd, "import", imp);
868 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
872 static void import_handle_addref(void *import)
874 class_import_get(import);
877 struct obd_import *class_import_get(struct obd_import *import)
879 LASSERT(atomic_read(&import->imp_refcount) >= 0);
880 LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
881 atomic_inc(&import->imp_refcount);
882 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
883 atomic_read(&import->imp_refcount),
884 import->imp_obd->obd_name);
887 EXPORT_SYMBOL(class_import_get);
889 void class_import_put(struct obd_import *imp)
893 LASSERT(atomic_read(&imp->imp_refcount) > 0);
894 LASSERT(atomic_read(&imp->imp_refcount) < 0x5a5a5a);
895 LASSERT(list_empty(&imp->imp_zombie_chain));
897 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
898 atomic_read(&imp->imp_refcount) - 1,
899 imp->imp_obd->obd_name);
901 if (atomic_dec_and_test(&imp->imp_refcount)) {
902 CDEBUG(D_INFO, "final put import %p\n", imp);
903 obd_zombie_import_add(imp);
908 EXPORT_SYMBOL(class_import_put);
910 static void init_imp_at(struct imp_at *at) {
912 at_init(&at->iat_net_latency, 0, 0);
913 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
914 /* max service estimates are tracked on the server side, so
915 don't use the AT history here, just use the last reported
916 val. (But keep hist for proc histogram, worst_ever) */
917 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
922 struct obd_import *class_new_import(struct obd_device *obd)
924 struct obd_import *imp;
926 OBD_ALLOC(imp, sizeof(*imp));
930 CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
931 CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
932 CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
933 CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
934 spin_lock_init(&imp->imp_lock);
935 imp->imp_last_success_conn = 0;
936 imp->imp_state = LUSTRE_IMP_NEW;
937 imp->imp_obd = class_incref(obd, "import", imp);
938 sema_init(&imp->imp_sec_mutex, 1);
939 cfs_waitq_init(&imp->imp_recovery_waitq);
941 atomic_set(&imp->imp_refcount, 2);
942 atomic_set(&imp->imp_unregistering, 0);
943 atomic_set(&imp->imp_inflight, 0);
944 atomic_set(&imp->imp_replay_inflight, 0);
945 atomic_set(&imp->imp_inval_count, 0);
946 CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
947 CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
948 class_handle_hash(&imp->imp_handle, import_handle_addref);
949 init_imp_at(&imp->imp_at);
951 /* the default magic is V2, will be used in connect RPC, and
952 * then adjusted according to the flags in request/reply. */
953 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
957 EXPORT_SYMBOL(class_new_import);
959 void class_destroy_import(struct obd_import *import)
961 LASSERT(import != NULL);
962 LASSERT(import != LP_POISON);
964 class_handle_unhash(&import->imp_handle);
966 spin_lock(&import->imp_lock);
967 import->imp_generation++;
968 spin_unlock(&import->imp_lock);
969 class_import_put(import);
971 EXPORT_SYMBOL(class_destroy_import);
973 /* A connection defines an export context in which preallocation can
974 be managed. This releases the export pointer reference, and returns
975 the export handle, so the export refcount is 1 when this function
977 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
978 struct obd_uuid *cluuid)
980 struct obd_export *export;
981 LASSERT(conn != NULL);
982 LASSERT(obd != NULL);
983 LASSERT(cluuid != NULL);
986 export = class_new_export(obd, cluuid);
988 RETURN(PTR_ERR(export));
990 conn->cookie = export->exp_handle.h_cookie;
991 class_export_put(export);
993 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
994 cluuid->uuid, conn->cookie);
997 EXPORT_SYMBOL(class_connect);
999 /* if export is involved in recovery then clean up related things */
1000 void class_export_recovery_cleanup(struct obd_export *exp)
1002 struct obd_device *obd = exp->exp_obd;
1004 spin_lock_bh(&obd->obd_processing_task_lock);
1005 if (obd->obd_recovering && exp->exp_in_recovery) {
1006 spin_lock(&exp->exp_lock);
1007 exp->exp_in_recovery = 0;
1008 spin_unlock(&exp->exp_lock);
1009 obd->obd_connected_clients--;
1010 /* each connected client is counted as recoverable */
1011 obd->obd_recoverable_clients--;
1012 if (exp->exp_req_replay_needed) {
1013 spin_lock(&exp->exp_lock);
1014 exp->exp_req_replay_needed = 0;
1015 spin_unlock(&exp->exp_lock);
1016 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1017 atomic_dec(&obd->obd_req_replay_clients);
1019 if (exp->exp_lock_replay_needed) {
1020 spin_lock(&exp->exp_lock);
1021 exp->exp_lock_replay_needed = 0;
1022 spin_unlock(&exp->exp_lock);
1023 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1024 atomic_dec(&obd->obd_lock_replay_clients);
1027 spin_unlock_bh(&obd->obd_processing_task_lock);
1030 /* This function removes two references from the export: one for the
1031 * hash entry and one for the export pointer passed in. The export
1032 * pointer passed to this function is destroyed should not be used
1034 int class_disconnect(struct obd_export *export)
1036 int already_disconnected;
1039 if (export == NULL) {
1041 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
1045 spin_lock(&export->exp_lock);
1046 already_disconnected = export->exp_disconnected;
1047 export->exp_disconnected = 1;
1049 if (!hlist_unhashed(&export->exp_nid_hash))
1050 lustre_hash_del(export->exp_obd->obd_nid_hash,
1051 &export->exp_connection->c_peer.nid,
1052 &export->exp_nid_hash);
1054 spin_unlock(&export->exp_lock);
1056 /* class_cleanup(), abort_recovery(), and class_fail_export()
1057 * all end up in here, and if any of them race we shouldn't
1058 * call extra class_export_puts(). */
1059 if (already_disconnected)
1062 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1063 export->exp_handle.h_cookie);
1065 class_export_recovery_cleanup(export);
1066 class_unlink_export(export);
1067 class_export_put(export);
1071 static void class_disconnect_export_list(struct list_head *list,
1072 enum obd_option flags)
1075 struct lustre_handle fake_conn;
1076 struct obd_export *fake_exp, *exp;
1079 /* It's possible that an export may disconnect itself, but
1080 * nothing else will be added to this list. */
1081 while (!list_empty(list)) {
1082 exp = list_entry(list->next, struct obd_export, exp_obd_chain);
1083 class_export_get(exp);
1085 spin_lock(&exp->exp_lock);
1086 exp->exp_flags = flags;
1087 spin_unlock(&exp->exp_lock);
1089 if (obd_uuid_equals(&exp->exp_client_uuid,
1090 &exp->exp_obd->obd_uuid)) {
1092 "exp %p export uuid == obd uuid, don't discon\n",
1094 /* Need to delete this now so we don't end up pointing
1095 * to work_list later when this export is cleaned up. */
1096 list_del_init(&exp->exp_obd_chain);
1097 class_export_put(exp);
1101 fake_conn.cookie = exp->exp_handle.h_cookie;
1102 fake_exp = class_conn2export(&fake_conn);
1104 class_export_put(exp);
1108 spin_lock(&fake_exp->exp_lock);
1109 fake_exp->exp_flags = flags;
1110 spin_unlock(&fake_exp->exp_lock);
1112 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1113 "last request at "CFS_TIME_T"\n",
1114 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1115 exp, exp->exp_last_request_time);
1116 rc = obd_disconnect(fake_exp);
1117 class_export_put(exp);
1122 void class_disconnect_exports(struct obd_device *obd)
1124 struct list_head work_list;
1127 /* Move all of the exports from obd_exports to a work list, en masse. */
1128 spin_lock(&obd->obd_dev_lock);
1129 list_add(&work_list, &obd->obd_exports);
1130 list_del_init(&obd->obd_exports);
1131 spin_unlock(&obd->obd_dev_lock);
1133 if (!list_empty(&work_list)) {
1134 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1135 "disconnecting them\n", obd->obd_minor, obd);
1136 class_disconnect_export_list(&work_list,
1137 exp_flags_from_obd(obd));
1139 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1140 obd->obd_minor, obd);
1143 EXPORT_SYMBOL(class_disconnect_exports);
1145 /* Remove exports that have not completed recovery.
1147 int class_disconnect_stale_exports(struct obd_device *obd,
1148 int (*test_export)(struct obd_export *),
1149 enum obd_option flags)
1151 struct list_head work_list;
1152 struct list_head *pos, *n;
1153 struct obd_export *exp;
1157 CFS_INIT_LIST_HEAD(&work_list);
1158 spin_lock(&obd->obd_dev_lock);
1159 list_for_each_safe(pos, n, &obd->obd_exports) {
1160 exp = list_entry(pos, struct obd_export, exp_obd_chain);
1161 if (test_export(exp))
1164 list_del(&exp->exp_obd_chain);
1165 list_add(&exp->exp_obd_chain, &work_list);
1166 /* don't count self-export as client */
1167 if (obd_uuid_equals(&exp->exp_client_uuid,
1168 &exp->exp_obd->obd_uuid))
1172 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1173 obd->obd_name, exp->exp_client_uuid.uuid,
1174 exp->exp_connection == NULL ? "<unknown>" :
1175 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1177 spin_unlock(&obd->obd_dev_lock);
1179 CDEBUG(D_ERROR, "%s: disconnecting %d stale clients\n",
1180 obd->obd_name, cnt);
1181 class_disconnect_export_list(&work_list, flags);
1184 EXPORT_SYMBOL(class_disconnect_stale_exports);
1186 void class_fail_export(struct obd_export *exp)
1188 int rc, already_failed;
1190 spin_lock(&exp->exp_lock);
1191 already_failed = exp->exp_failed;
1192 exp->exp_failed = 1;
1193 spin_unlock(&exp->exp_lock);
1195 if (already_failed) {
1196 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1197 exp, exp->exp_client_uuid.uuid);
1201 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1202 exp, exp->exp_client_uuid.uuid);
1204 if (obd_dump_on_timeout)
1205 libcfs_debug_dumplog();
1207 /* Most callers into obd_disconnect are removing their own reference
1208 * (request, for example) in addition to the one from the hash table.
1209 * We don't have such a reference here, so make one. */
1210 class_export_get(exp);
1211 rc = obd_disconnect(exp);
1213 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1215 CDEBUG(D_HA, "disconnected export %p/%s\n",
1216 exp, exp->exp_client_uuid.uuid);
1218 EXPORT_SYMBOL(class_fail_export);
1220 char *obd_export_nid2str(struct obd_export *exp)
1222 if (exp->exp_connection != NULL)
1223 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1227 EXPORT_SYMBOL(obd_export_nid2str);
1229 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1231 struct obd_export *doomed_exp = NULL;
1232 int exports_evicted = 0;
1234 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1237 doomed_exp = lustre_hash_lookup(obd->obd_nid_hash, &nid_key);
1238 if (doomed_exp == NULL)
1241 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1242 "nid %s found, wanted nid %s, requested nid %s\n",
1243 obd_export_nid2str(doomed_exp),
1244 libcfs_nid2str(nid_key), nid);
1245 LASSERTF(doomed_exp != obd->obd_self_export,
1246 "self-export is hashed by NID?\n");
1248 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1249 obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1251 class_fail_export(doomed_exp);
1252 class_export_put(doomed_exp);
1255 if (!exports_evicted)
1256 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1257 obd->obd_name, nid);
1258 return exports_evicted;
1260 EXPORT_SYMBOL(obd_export_evict_by_nid);
1262 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1264 struct obd_export *doomed_exp = NULL;
1265 struct obd_uuid doomed_uuid;
1266 int exports_evicted = 0;
1268 obd_str2uuid(&doomed_uuid, uuid);
1269 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1270 CERROR("%s: can't evict myself\n", obd->obd_name);
1271 return exports_evicted;
1274 doomed_exp = lustre_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1276 if (doomed_exp == NULL) {
1277 CERROR("%s: can't disconnect %s: no exports found\n",
1278 obd->obd_name, uuid);
1280 CWARN("%s: evicting %s at adminstrative request\n",
1281 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1282 class_fail_export(doomed_exp);
1283 class_export_put(doomed_exp);
1287 return exports_evicted;
1289 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1292 * kill zombie imports and exports
1294 void obd_zombie_impexp_cull(void)
1296 struct obd_import *import;
1297 struct obd_export *export;
1301 spin_lock(&obd_zombie_impexp_lock);
1304 if (!list_empty(&obd_zombie_imports)) {
1305 import = list_entry(obd_zombie_imports.next,
1308 list_del_init(&import->imp_zombie_chain);
1312 if (!list_empty(&obd_zombie_exports)) {
1313 export = list_entry(obd_zombie_exports.next,
1316 list_del_init(&export->exp_obd_chain);
1319 spin_unlock(&obd_zombie_impexp_lock);
1322 class_import_destroy(import);
1325 class_export_destroy(export);
1327 } while (import != NULL || export != NULL);
1331 static struct completion obd_zombie_start;
1332 static struct completion obd_zombie_stop;
1333 static unsigned long obd_zombie_flags;
1334 static cfs_waitq_t obd_zombie_waitq;
1337 OBD_ZOMBIE_STOP = 1 << 1
1341 * check for work for kill zombie import/export thread.
1343 static int obd_zombie_impexp_check(void *arg)
1347 spin_lock(&obd_zombie_impexp_lock);
1348 rc = list_empty(&obd_zombie_imports) &&
1349 list_empty(&obd_zombie_exports) &&
1350 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1352 spin_unlock(&obd_zombie_impexp_lock);
1358 * Add export to the obd_zombe thread and notify it.
1360 static void obd_zombie_export_add(struct obd_export *exp) {
1361 spin_lock(&obd_zombie_impexp_lock);
1362 LASSERT(list_empty(&exp->exp_obd_chain));
1363 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1364 spin_unlock(&obd_zombie_impexp_lock);
1366 if (obd_zombie_impexp_notify != NULL)
1367 obd_zombie_impexp_notify();
1371 * Add import to the obd_zombe thread and notify it.
1373 static void obd_zombie_import_add(struct obd_import *imp) {
1374 spin_lock(&obd_zombie_impexp_lock);
1375 LASSERT(list_empty(&imp->imp_zombie_chain));
1376 list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1377 spin_unlock(&obd_zombie_impexp_lock);
1379 if (obd_zombie_impexp_notify != NULL)
1380 obd_zombie_impexp_notify();
1384 * notify import/export destroy thread about new zombie.
1386 static void obd_zombie_impexp_notify(void)
1388 cfs_waitq_signal(&obd_zombie_waitq);
1392 * check whether obd_zombie is idle
1394 static int obd_zombie_is_idle(void)
1398 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1399 spin_lock(&obd_zombie_impexp_lock);
1400 rc = list_empty(&obd_zombie_imports) &&
1401 list_empty(&obd_zombie_exports);
1402 spin_unlock(&obd_zombie_impexp_lock);
1407 * wait when obd_zombie import/export queues become empty
1409 void obd_zombie_barrier(void)
1411 struct l_wait_info lwi = { 0 };
1412 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1414 EXPORT_SYMBOL(obd_zombie_barrier);
1419 * destroy zombie export/import thread.
1421 static int obd_zombie_impexp_thread(void *unused)
1425 if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1426 complete(&obd_zombie_start);
1430 complete(&obd_zombie_start);
1432 while(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1433 struct l_wait_info lwi = { 0 };
1435 l_wait_event(obd_zombie_waitq,
1436 !obd_zombie_impexp_check(NULL), &lwi);
1437 obd_zombie_impexp_cull();
1440 * Notify obd_zombie_barrier callers that queues
1443 cfs_waitq_signal(&obd_zombie_waitq);
1446 complete(&obd_zombie_stop);
1451 #else /* ! KERNEL */
1453 static atomic_t zombie_recur = ATOMIC_INIT(0);
1454 static void *obd_zombie_impexp_work_cb;
1455 static void *obd_zombie_impexp_idle_cb;
1457 int obd_zombie_impexp_kill(void *arg)
1461 if (atomic_inc_return(&zombie_recur) == 1) {
1462 obd_zombie_impexp_cull();
1465 atomic_dec(&zombie_recur);
1472 * start destroy zombie import/export thread
1474 int obd_zombie_impexp_init(void)
1478 CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1479 CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1480 spin_lock_init(&obd_zombie_impexp_lock);
1481 init_completion(&obd_zombie_start);
1482 init_completion(&obd_zombie_stop);
1483 cfs_waitq_init(&obd_zombie_waitq);
1486 rc = cfs_kernel_thread(obd_zombie_impexp_thread, NULL, 0);
1490 wait_for_completion(&obd_zombie_start);
1493 obd_zombie_impexp_work_cb =
1494 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1495 &obd_zombie_impexp_kill, NULL);
1497 obd_zombie_impexp_idle_cb =
1498 liblustre_register_idle_callback("obd_zombi_impexp_check",
1499 &obd_zombie_impexp_check, NULL);
1505 * stop destroy zombie import/export thread
1507 void obd_zombie_impexp_stop(void)
1509 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1510 obd_zombie_impexp_notify();
1512 wait_for_completion(&obd_zombie_stop);
1514 liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1515 liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);