1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
44 #include <liblustre.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
/* List of registered obd types; class_search_type() walks it under
 * obd_types_lock. */
50 extern cfs_list_t obd_types;
51 cfs_spinlock_t obd_types_lock;
/* Slab caches: created in obd_init_caches(), destroyed in
 * obd_cleanup_caches(). */
53 cfs_mem_cache_t *obd_device_cachep;
54 cfs_mem_cache_t *obdo_cachep;
55 EXPORT_SYMBOL(obdo_cachep);
56 cfs_mem_cache_t *import_cachep;
/* NOTE(review): presumably the queues of imports/exports awaiting final
 * destruction and the lock guarding them -- their users are outside this
 * view; confirm. */
58 cfs_list_t obd_zombie_imports;
59 cfs_list_t obd_zombie_exports;
60 cfs_spinlock_t obd_zombie_impexp_lock;
/* Forward declarations of helpers defined outside this view. */
61 static void obd_zombie_impexp_notify(void);
62 static void obd_zombie_export_add(struct obd_export *exp);
63 static void obd_zombie_import_add(struct obd_import *imp);
64 static void print_export_data(struct obd_export *exp,
65 const char *status, int locks);
/* Hook called by class_export_destroy() and class_import_destroy() to drop
 * a ptlrpc connection; it is not assigned in this part of the file. */
67 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
/*
 * support functions: we could use inter-module communication, but this
 * is more portable to other OS's
 */
73 static struct obd_device *obd_device_alloc(void)
75 struct obd_device *obd;
77 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
79 obd->obd_magic = OBD_DEVICE_MAGIC;
/*
 * Return an obd_device to obd_device_cachep.
 *
 * Asserts that the device carries OBD_DEVICE_MAGIC, complains if a
 * namespace is still attached, and finalizes obd_reference before the slab
 * free.
 * NOTE(review): some lines of this function (braces, and a line inside the
 * namespace branch) are not present in this view.
 */
84 static void obd_device_free(struct obd_device *obd)
87 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
88 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
/* a namespace still attached at free time means cleanup was incomplete */
89 if (obd->obd_namespace != NULL) {
90 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
91 obd, obd->obd_namespace, obd->obd_force);
94 lu_ref_fini(&obd->obd_reference);
95 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
98 struct obd_type *class_search_type(const char *name)
101 struct obd_type *type;
103 cfs_spin_lock(&obd_types_lock);
104 cfs_list_for_each(tmp, &obd_types) {
105 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
106 if (strcmp(type->typ_name, name) == 0) {
107 cfs_spin_unlock(&obd_types_lock);
111 cfs_spin_unlock(&obd_types_lock);
/*
 * Look up the obd type called \a name via class_search_type().
 *
 * When HAVE_MODULE_LOADING_SUPPORT is defined the visible code requests a
 * module of the same name and searches again on success, or prints a
 * console error on failure.  The type's obd_type_lock is held around
 * cfs_try_module_get() on the owner of its dt_ops.
 * NOTE(review): several lines (conditions, braces, the return statement)
 * are not present in this view.
 */
115 struct obd_type *class_get_type(const char *name)
117 struct obd_type *type = class_search_type(name);
119 #ifdef HAVE_MODULE_LOADING_SUPPORT
121 const char *modname = name;
/* cfs_request_module() returning 0 is treated as success */
122 if (!cfs_request_module("%s", modname)) {
123 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
124 type = class_search_type(name);
126 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
132 cfs_spin_lock(&type->obd_type_lock);
134 cfs_try_module_get(type->typ_dt_ops->o_owner);
135 cfs_spin_unlock(&type->obd_type_lock);
/*
 * Counterpart of class_get_type(): under type->obd_type_lock, drops the
 * module reference on the owner of the type's dt_ops.
 * NOTE(review): some lines of the body are not present in this view.
 */
140 void class_put_type(struct obd_type *type)
143 cfs_spin_lock(&type->obd_type_lock);
145 cfs_module_put(type->typ_dt_ops->o_owner);
146 cfs_spin_unlock(&type->obd_type_lock);
149 #define CLASS_MAX_NAME 1024
/*
 * Register a new obd type called \a name.
 *
 * The visible code refuses names of CLASS_MAX_NAME bytes or longer, checks
 * that the name is not registered yet, copies *dt_ops, *md_ops and the name
 * into newly allocated storage, registers a proc entry named after the
 * type, runs lu_device_type_init(ldt) and finally links the type onto
 * obd_types under obd_types_lock.  The tail of the function frees whatever
 * was allocated.
 * NOTE(review): error returns, GOTOs, labels and braces are not present in
 * this view; confirm the exact error paths against the full file.
 */
151 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
152 struct lprocfs_vars *vars, const char *name,
153 struct lu_device_type *ldt)
155 struct obd_type *type;
160 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
162 if (class_search_type(name)) {
163 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
168 OBD_ALLOC(type, sizeof(*type));
/* all three allocations are attempted first, then checked together */
172 OBD_ALLOC_PTR(type->typ_dt_ops);
173 OBD_ALLOC_PTR(type->typ_md_ops);
174 OBD_ALLOC(type->typ_name, strlen(name) + 1);
176 if (type->typ_dt_ops == NULL ||
177 type->typ_md_ops == NULL ||
178 type->typ_name == NULL)
181 *(type->typ_dt_ops) = *dt_ops;
182 /* md_ops is optional */
184 *(type->typ_md_ops) = *md_ops;
/* typ_name was sized as strlen(name) + 1 above, so the copy fits */
185 strcpy(type->typ_name, name);
186 cfs_spin_lock_init(&type->obd_type_lock);
189 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
191 if (IS_ERR(type->typ_procroot)) {
192 rc = PTR_ERR(type->typ_procroot);
193 type->typ_procroot = NULL;
199 rc = lu_device_type_init(ldt);
/* publish the fully initialized type */
204 cfs_spin_lock(&obd_types_lock);
205 cfs_list_add(&type->typ_chain, &obd_types);
206 cfs_spin_unlock(&obd_types_lock);
/* cleanup: release whatever was allocated, then the type itself */
211 if (type->typ_name != NULL)
212 OBD_FREE(type->typ_name, strlen(name) + 1);
213 if (type->typ_md_ops != NULL)
214 OBD_FREE_PTR(type->typ_md_ops);
215 if (type->typ_dt_ops != NULL)
216 OBD_FREE_PTR(type->typ_dt_ops);
217 OBD_FREE(type, sizeof(*type));
/*
 * Unregister the obd type called \a name.
 *
 * A type that still has references gets its ops tables freed but keeps its
 * name (see the comments below).  Otherwise the proc entry is removed,
 * lu_device_type_fini() is called on typ_lu, the type is unlinked from
 * obd_types under obd_types_lock and all of its storage is freed.
 * NOTE(review): returns, conditions and braces are not present in this
 * view.
 */
221 int class_unregister_type(const char *name)
223 struct obd_type *type = class_search_type(name);
227 CERROR("unknown obd type\n");
231 if (type->typ_refcnt) {
232 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
233 /* This is a bad situation, let's make the best of it */
234 /* Remove ops, but leave the name for debugging */
235 OBD_FREE_PTR(type->typ_dt_ops);
236 OBD_FREE_PTR(type->typ_md_ops);
240 if (type->typ_procroot) {
241 lprocfs_remove(&type->typ_procroot);
245 lu_device_type_fini(type->typ_lu);
247 cfs_spin_lock(&obd_types_lock);
248 cfs_list_del(&type->typ_chain);
249 cfs_spin_unlock(&obd_types_lock);
/* name matched typ_name in class_search_type(), so the lengths agree */
250 OBD_FREE(type->typ_name, strlen(name) + 1);
251 if (type->typ_dt_ops != NULL)
252 OBD_FREE_PTR(type->typ_dt_ops);
253 if (type->typ_md_ops != NULL)
254 OBD_FREE_PTR(type->typ_md_ops);
255 OBD_FREE(type, sizeof(*type));
257 } /* class_unregister_type */
/**
 * Create a new obd device.
 *
 * Find an empty slot in ::obd_devs[], create a new obd device in it.
 *
 * \param[in] type_name obd device type string.
 * \param[in] name      obd device name.
 *
 * \retval ERR_PTR(-errno) if create fails, otherwise return the obd device
 */
/*
 * Allocate a device of type \a type_name, named \a name, and install it in
 * a free slot of obd_devs[].
 *
 * Failures visible here are reported as ERR_PTR(): -EINVAL for a name of
 * MAX_OBD_NAME bytes or more, -ENODEV for an unknown type, -ENOMEM when the
 * allocation fails, -EEXIST when a device with the same name exists and
 * -EOVERFLOW when no slot is free.  On any error the new device is freed
 * and the type reference is dropped.
 * NOTE(review): some lines (declarations, conditions, assignments, braces)
 * are not present in this view.
 */
270 struct obd_device *class_newdev(const char *type_name, const char *name)
272 struct obd_device *result = NULL;
273 struct obd_device *newdev;
274 struct obd_type *type = NULL;
276 int new_obd_minor = 0;
278 if (strlen(name) >= MAX_OBD_NAME) {
279 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
280 RETURN(ERR_PTR(-EINVAL));
283 type = class_get_type(type_name);
285 CERROR("OBD: unknown type: %s\n", type_name);
286 RETURN(ERR_PTR(-ENODEV));
289 newdev = obd_device_alloc();
290 if (newdev == NULL) {
291 class_put_type(type);
292 RETURN(ERR_PTR(-ENOMEM));
294 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
/* one pass over every slot under obd_dev_lock: detect a duplicate name and
 * claim a free slot */
296 cfs_spin_lock(&obd_dev_lock);
297 for (i = 0; i < class_devno_max(); i++) {
298 struct obd_device *obd = class_num2obd(i);
299 if (obd && obd->obd_name &&
300 (strcmp(name, obd->obd_name) == 0)) {
301 CERROR("Device %s already exists, won't add\n", name);
303 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
304 "%p obd_magic %08x != %08x\n", result,
305 result->obd_magic, OBD_DEVICE_MAGIC);
306 LASSERTF(result->obd_minor == new_obd_minor,
307 "%p obd_minor %d != %d\n", result,
308 result->obd_minor, new_obd_minor);
/* undo a slot that was claimed earlier in this same pass */
310 obd_devs[result->obd_minor] = NULL;
311 result->obd_name[0]='\0';
313 result = ERR_PTR(-EEXIST);
/* first empty slot seen while nothing has been claimed yet */
316 if (!result && !obd) {
318 result->obd_minor = i;
320 result->obd_type = type;
/* NOTE(review): copies at most sizeof(obd_name) - 1 bytes; termination
 * relies on the buffer already being zeroed -- confirm */
321 strncpy(result->obd_name, name,
322 sizeof(result->obd_name) - 1);
323 obd_devs[i] = result;
326 cfs_spin_unlock(&obd_dev_lock);
328 if (result == NULL && i >= class_devno_max()) {
329 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
331 result = ERR_PTR(-EOVERFLOW);
334 if (IS_ERR(result)) {
335 obd_device_free(newdev);
336 class_put_type(type);
338 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
339 result->obd_name, result);
344 void class_release_dev(struct obd_device *obd)
346 struct obd_type *obd_type = obd->obd_type;
348 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
349 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
350 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
351 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
352 LASSERT(obd_type != NULL);
354 CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
355 obd->obd_name,obd->obd_type->typ_name);
357 cfs_spin_lock(&obd_dev_lock);
358 obd_devs[obd->obd_minor] = NULL;
359 cfs_spin_unlock(&obd_dev_lock);
360 obd_device_free(obd);
362 class_put_type(obd_type);
/*
 * Search ::obd_devs[] under obd_dev_lock for a device named \a name.
 *
 * A match is only acted on when the device has obd_attached set (see the
 * comment below).  class_name2obd() treats a negative result as "not
 * found".
 * NOTE(review): declarations, return statements and braces are not present
 * in this view.
 */
365 int class_name2dev(const char *name)
372 cfs_spin_lock(&obd_dev_lock);
373 for (i = 0; i < class_devno_max(); i++) {
374 struct obd_device *obd = class_num2obd(i);
375 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
376 /* Make sure we finished attaching before we give
377 out any references */
378 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
379 if (obd->obd_attached) {
380 cfs_spin_unlock(&obd_dev_lock);
386 cfs_spin_unlock(&obd_dev_lock);
391 struct obd_device *class_name2obd(const char *name)
393 int dev = class_name2dev(name);
395 if (dev < 0 || dev > class_devno_max())
397 return class_num2obd(dev);
400 int class_uuid2dev(struct obd_uuid *uuid)
404 cfs_spin_lock(&obd_dev_lock);
405 for (i = 0; i < class_devno_max(); i++) {
406 struct obd_device *obd = class_num2obd(i);
407 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
408 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
409 cfs_spin_unlock(&obd_dev_lock);
413 cfs_spin_unlock(&obd_dev_lock);
418 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
420 int dev = class_uuid2dev(uuid);
423 return class_num2obd(dev);
427 * Get obd device from ::obd_devs[]
429 * \param num [in] array index
431 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
432 * otherwise return the obd device there.
434 struct obd_device *class_num2obd(int num)
436 struct obd_device *obd = NULL;
438 if (num < class_devno_max()) {
443 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
444 "%p obd_magic %08x != %08x\n",
445 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
446 LASSERTF(obd->obd_minor == num,
447 "%p obd_minor %0d != %0d\n",
448 obd, obd->obd_minor, num);
/*
 * Print one console line per device in ::obd_devs[], holding obd_dev_lock
 * for the whole walk: slot index, a status string, type name, device name,
 * uuid and current reference count.  The status is chosen from the
 * obd_stopping / obd_set_up / obd_attached flags, tested in that order.
 * NOTE(review): the status strings themselves, the empty-slot handling and
 * the braces are not present in this view.
 */
454 void class_obd_list(void)
459 cfs_spin_lock(&obd_dev_lock);
460 for (i = 0; i < class_devno_max(); i++) {
461 struct obd_device *obd = class_num2obd(i);
464 if (obd->obd_stopping)
466 else if (obd->obd_set_up)
468 else if (obd->obd_attached)
472 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
473 i, status, obd->obd_type->typ_name,
474 obd->obd_name, obd->obd_uuid.uuid,
475 cfs_atomic_read(&obd->obd_refcount));
477 cfs_spin_unlock(&obd_dev_lock);
481 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
482 specified, then only the client with that uuid is returned,
483 otherwise any client connected to the tgt is returned. */
/*
 * Walks ::obd_devs[] under obd_dev_lock.
 * NOTE(review): the type comparison is strncmp() over strlen(typ_name)
 * bytes, i.e. a prefix match on the device's type name -- confirm that
 * this is intended.  Return statements, the empty-slot check and braces
 * are not present in this view.
 */
484 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
485 const char * typ_name,
486 struct obd_uuid *grp_uuid)
490 cfs_spin_lock(&obd_dev_lock);
491 for (i = 0; i < class_devno_max(); i++) {
492 struct obd_device *obd = class_num2obd(i);
495 if ((strncmp(obd->obd_type->typ_name, typ_name,
496 strlen(typ_name)) == 0)) {
/* the target uuid must match; the group uuid is only compared when given */
497 if (obd_uuid_equals(tgt_uuid,
498 &obd->u.cli.cl_target_uuid) &&
499 ((grp_uuid)? obd_uuid_equals(grp_uuid,
500 &obd->obd_uuid) : 1)) {
501 cfs_spin_unlock(&obd_dev_lock);
506 cfs_spin_unlock(&obd_dev_lock);
511 /* Iterate the obd_device list looking devices have grp_uuid. Start
512 searching at *next, and if a device is found, the next index to look
513 at is saved in *next. If next is NULL, then the first matching device
514 will always be returned. */
/*
 * Walks ::obd_devs[] under obd_dev_lock from a starting index up to
 * class_devno_max() and compares each device's obd_uuid with \a grp_uuid.
 * A caller-supplied *next is only accepted when it lies within
 * 0 .. class_devno_max() - 1.
 * NOTE(review): most of the body (start-index selection, the update of
 * *next, return statements, braces) is not present in this view.
 */
515 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
521 else if (*next >= 0 && *next < class_devno_max())
526 cfs_spin_lock(&obd_dev_lock);
527 for (; i < class_devno_max(); i++) {
528 struct obd_device *obd = class_num2obd(i);
531 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
534 cfs_spin_unlock(&obd_dev_lock);
538 cfs_spin_unlock(&obd_dev_lock);
/**
 * Notify that the sptlrpc log for \a fsname has changed, so that every
 * relevant OBD adjusts its sptlrpc settings accordingly.
 */
/*
 * For every device that is set up and not stopping, whose type is one of
 * mdc/osc/mdt/ost and whose name starts with the first \a namelen bytes of
 * \a fsname, send KEY_SPTLRPC_CONF through obd_set_info_async() on the
 * device's self export.
 * NOTE(review): declarations, the "continue" lines, the handling of rc2
 * and the return statement are not present in this view.
 */
547 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
549 struct obd_device *obd;
553 LASSERT(namelen > 0);
555 cfs_spin_lock(&obd_dev_lock);
556 for (i = 0; i < class_devno_max(); i++) {
557 obd = class_num2obd(i);
559 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
562 /* only notify mdc, osc, mdt, ost */
563 type = obd->obd_type->typ_name;
564 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
565 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
566 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
567 strcmp(type, LUSTRE_OST_NAME) != 0)
570 if (strncmp(obd->obd_name, fsname, namelen))
/* take a device reference, then drop obd_dev_lock around the call below;
 * the lock is retaken before the walk continues */
573 class_incref(obd, __FUNCTION__, obd);
574 cfs_spin_unlock(&obd_dev_lock);
575 rc2 = obd_set_info_async(obd->obd_self_export,
576 sizeof(KEY_SPTLRPC_CONF),
577 KEY_SPTLRPC_CONF, 0, NULL, NULL);
579 class_decref(obd, __FUNCTION__, obd);
580 cfs_spin_lock(&obd_dev_lock);
582 cfs_spin_unlock(&obd_dev_lock);
585 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
/*
 * Destroy the slab caches used by this file (obd_device, obdo, import and
 * capa), asserting that each destroy succeeds.
 * NOTE(review): the guards around the last three caches, some pointer
 * resets and the braces are not present in this view.  The assertion
 * messages contain typos ("destropy", "destory"); they are runtime strings
 * and are left untouched here.
 */
587 void obd_cleanup_caches(void)
592 if (obd_device_cachep) {
593 rc = cfs_mem_cache_destroy(obd_device_cachep);
594 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
595 obd_device_cachep = NULL;
598 rc = cfs_mem_cache_destroy(obdo_cachep);
599 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
603 rc = cfs_mem_cache_destroy(import_cachep);
604 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
605 import_cachep = NULL;
608 rc = cfs_mem_cache_destroy(capa_cachep);
609 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
/*
 * Create the slab caches used by this file: "ll_obd_dev_cache",
 * "ll_obdo_cache", "ll_import_cache" and "capa_cache".  Each cache pointer
 * is asserted to be NULL before it is created.  The tail of the function
 * calls obd_cleanup_caches().
 * NOTE(review): most failure checks, GOTOs, the label and the return
 * statements are not present in this view.
 */
615 int obd_init_caches(void)
619 LASSERT(obd_device_cachep == NULL);
620 obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
621 sizeof(struct obd_device),
623 if (!obd_device_cachep)
626 LASSERT(obdo_cachep == NULL);
627 obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
632 LASSERT(import_cachep == NULL);
633 import_cachep = cfs_mem_cache_create("ll_import_cache",
634 sizeof(struct obd_import),
639 LASSERT(capa_cachep == NULL);
640 capa_cachep = cfs_mem_cache_create("capa_cache",
641 sizeof(struct obd_capa), 0, 0);
647 obd_cleanup_caches();
652 /* map connection to client */
653 struct obd_export *class_conn2export(struct lustre_handle *conn)
655 struct obd_export *export;
659 CDEBUG(D_CACHE, "looking for null handle\n");
663 if (conn->cookie == -1) { /* this means assign a new connection */
664 CDEBUG(D_CACHE, "want a new connection\n");
668 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
669 export = class_handle2object(conn->cookie);
/*
 * Map an export to an obd device.
 * NOTE(review): only the signature is present in this view; presumably
 * returns exp->exp_obd -- confirm against the full file.
 */
673 struct obd_device *class_exp2obd(struct obd_export *exp)
680 struct obd_device *class_conn2obd(struct lustre_handle *conn)
682 struct obd_export *export;
683 export = class_conn2export(conn);
685 struct obd_device *obd = export->exp_obd;
686 class_export_put(export);
692 struct obd_import *class_exp2cliimp(struct obd_export *exp)
694 struct obd_device *obd = exp->exp_obd;
697 return obd->u.cli.cl_import;
700 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
702 struct obd_device *obd = class_conn2obd(conn);
705 return obd->u.cli.cl_import;
708 /* Export management functions */
709 static void class_export_destroy(struct obd_export *exp)
711 struct obd_device *obd = exp->exp_obd;
714 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
716 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
717 exp->exp_client_uuid.uuid, obd->obd_name);
719 LASSERT(obd != NULL);
721 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
722 if (exp->exp_connection)
723 ptlrpc_put_connection_superhack(exp->exp_connection);
725 LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
726 LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
727 LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
728 LASSERT(cfs_list_empty(&exp->exp_queued_rpc));
729 obd_destroy_export(exp);
730 class_decref(obd, "export", exp);
732 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
/* Addref callback registered with class_handle_hash() for exports: takes
 * one reference on the export behind the opaque pointer. */
static void export_handle_addref(void *export)
{
        struct obd_export *exp = export;

        class_export_get(exp);
}
741 struct obd_export *class_export_get(struct obd_export *exp)
743 cfs_atomic_inc(&exp->exp_refcount);
744 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
745 cfs_atomic_read(&exp->exp_refcount));
748 EXPORT_SYMBOL(class_export_get);
750 void class_export_put(struct obd_export *exp)
752 LASSERT(exp != NULL);
753 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, 0x5a5a5a);
754 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
755 cfs_atomic_read(&exp->exp_refcount) - 1);
757 if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
758 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
759 CDEBUG(D_IOCTL, "final put %p/%s\n",
760 exp, exp->exp_client_uuid.uuid);
762 /* release nid stat refererence */
763 lprocfs_exp_cleanup(exp);
765 obd_zombie_export_add(exp);
768 EXPORT_SYMBOL(class_export_put);
770 /* Creates a new export, adds it to the hash table, and returns a
771 * pointer to it. The refcount is 2: one for the hash reference, and
772 * one for the pointer returned by this function. */
/*
 * Allocate and initialize an export of \a obd for client \a cluuid.
 *
 * The export starts with a refcount of 2, is entered into the handle table
 * and, unless \a cluuid equals the device's own uuid, into the device's
 * uuid hash.  It is then linked onto obd_exports and obd_exports_timed
 * under obd->obd_dev_lock.  The tail of the function undoes the handle
 * hashing and frees the export.
 * NOTE(review): labels, some conditions, return statements and braces are
 * not present in this view.
 */
773 struct obd_export *class_new_export(struct obd_device *obd,
774 struct obd_uuid *cluuid)
776 struct obd_export *export;
777 cfs_hash_t *hash = NULL;
781 OBD_ALLOC_PTR(export);
783 return ERR_PTR(-ENOMEM);
785 export->exp_conn_cnt = 0;
786 export->exp_lock_hash = NULL;
787 cfs_atomic_set(&export->exp_refcount, 2);
788 cfs_atomic_set(&export->exp_rpc_count, 0);
789 cfs_atomic_set(&export->exp_cb_count, 0);
790 cfs_atomic_set(&export->exp_locks_count, 0);
791 #if LUSTRE_TRACKS_LOCK_EXP_REFS
792 CFS_INIT_LIST_HEAD(&export->exp_locks_list);
793 cfs_spin_lock_init(&export->exp_locks_list_guard);
795 cfs_atomic_set(&export->exp_replay_count, 0);
796 export->exp_obd = obd;
797 CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
798 cfs_spin_lock_init(&export->exp_uncommitted_replies_lock);
799 CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
800 CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
801 CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
802 CFS_INIT_LIST_HEAD(&export->exp_queued_rpc);
803 class_handle_hash(&export->exp_handle, export_handle_addref);
804 export->exp_last_request_time = cfs_time_current_sec();
805 cfs_spin_lock_init(&export->exp_lock);
806 cfs_spin_lock_init(&export->exp_rpc_lock);
807 CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
808 CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
810 export->exp_sp_peer = LUSTRE_SP_ANY;
811 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
812 export->exp_client_uuid = *cluuid;
813 obd_init_export(export);
/* first check under the device lock: refuse if the device is stopping,
 * otherwise take a reference on its uuid hash */
815 cfs_spin_lock(&obd->obd_dev_lock);
816 /* shouldn't happen, but might race */
817 if (obd->obd_stopping)
818 GOTO(exit_unlock, rc = -ENODEV);
820 hash = cfs_hash_getref(obd->obd_uuid_hash);
822 GOTO(exit_unlock, rc = -ENODEV);
823 cfs_spin_unlock(&obd->obd_dev_lock);
/* the device's self export (uuid equal to the obd uuid) is not hashed */
825 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
826 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
828 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
829 obd->obd_name, cluuid->uuid, rc);
830 GOTO(exit_err, rc = -EALREADY);
/* the lock was dropped above, so obd_stopping is checked a second time */
834 cfs_spin_lock(&obd->obd_dev_lock);
835 if (obd->obd_stopping) {
836 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
837 GOTO(exit_unlock, rc = -ENODEV);
840 class_incref(obd, "export", export);
841 cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
842 cfs_list_add_tail(&export->exp_obd_chain_timed,
843 &export->exp_obd->obd_exports_timed);
844 export->exp_obd->obd_num_exports++;
845 cfs_spin_unlock(&obd->obd_dev_lock);
846 cfs_hash_putref(hash);
/* error paths */
850 cfs_spin_unlock(&obd->obd_dev_lock);
853 cfs_hash_putref(hash);
854 class_handle_unhash(&export->exp_handle);
855 LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
856 obd_destroy_export(export);
857 OBD_FREE_PTR(export);
860 EXPORT_SYMBOL(class_new_export);
862 void class_unlink_export(struct obd_export *exp)
864 class_handle_unhash(&exp->exp_handle);
866 cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
867 /* delete an uuid-export hashitem from hashtables */
868 if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
869 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
870 &exp->exp_client_uuid,
871 &exp->exp_uuid_hash);
873 cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
874 cfs_list_del_init(&exp->exp_obd_chain_timed);
875 exp->exp_obd->obd_num_exports--;
876 cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
877 class_export_put(exp);
879 EXPORT_SYMBOL(class_unlink_export);
881 /* Import management functions */
882 void class_import_destroy(struct obd_import *imp)
886 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
887 imp->imp_obd->obd_name);
889 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
891 ptlrpc_put_connection_superhack(imp->imp_connection);
893 while (!cfs_list_empty(&imp->imp_conn_list)) {
894 struct obd_import_conn *imp_conn;
896 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
897 struct obd_import_conn, oic_item);
898 cfs_list_del_init(&imp_conn->oic_item);
899 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
900 OBD_FREE(imp_conn, sizeof(*imp_conn));
903 LASSERT(imp->imp_sec == NULL);
904 class_decref(imp->imp_obd, "import", imp);
905 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
/* Addref callback registered with class_handle_hash() for imports: takes
 * one reference on the import behind the opaque pointer. */
static void import_handle_addref(void *import)
{
        struct obd_import *imp = import;

        class_import_get(imp);
}
914 struct obd_import *class_import_get(struct obd_import *import)
916 cfs_atomic_inc(&import->imp_refcount);
917 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
918 cfs_atomic_read(&import->imp_refcount),
919 import->imp_obd->obd_name);
922 EXPORT_SYMBOL(class_import_get);
924 void class_import_put(struct obd_import *imp)
928 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
929 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, 0x5a5a5a);
931 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
932 cfs_atomic_read(&imp->imp_refcount) - 1,
933 imp->imp_obd->obd_name);
935 if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
936 CDEBUG(D_INFO, "final put import %p\n", imp);
937 obd_zombie_import_add(imp);
942 EXPORT_SYMBOL(class_import_put);
/*
 * Initialize the adaptive-timeout state of an import: the network latency
 * estimate starts at 0, and each of the IMP_AT_MAX_PORTALS service
 * estimates starts at INITIAL_CONNECT_TIMEOUT.
 * NOTE(review): the last argument of the second at_init() call and the
 * closing braces are not present in this view.
 */
944 static void init_imp_at(struct imp_at *at) {
946 at_init(&at->iat_net_latency, 0, 0);
947 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
948 /* max service estimates are tracked on the server side, so
949 don't use the AT history here, just use the last reported
950 val. (But keep hist for proc histogram, worst_ever) */
951 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
956 struct obd_import *class_new_import(struct obd_device *obd)
958 struct obd_import *imp;
960 OBD_ALLOC(imp, sizeof(*imp));
964 CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
965 CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
966 CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
967 CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
968 cfs_spin_lock_init(&imp->imp_lock);
969 imp->imp_last_success_conn = 0;
970 imp->imp_state = LUSTRE_IMP_NEW;
971 imp->imp_obd = class_incref(obd, "import", imp);
972 cfs_sema_init(&imp->imp_sec_mutex, 1);
973 cfs_waitq_init(&imp->imp_recovery_waitq);
975 cfs_atomic_set(&imp->imp_refcount, 2);
976 cfs_atomic_set(&imp->imp_unregistering, 0);
977 cfs_atomic_set(&imp->imp_inflight, 0);
978 cfs_atomic_set(&imp->imp_replay_inflight, 0);
979 cfs_atomic_set(&imp->imp_inval_count, 0);
980 CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
981 CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
982 class_handle_hash(&imp->imp_handle, import_handle_addref);
983 init_imp_at(&imp->imp_at);
985 /* the default magic is V2, will be used in connect RPC, and
986 * then adjusted according to the flags in request/reply. */
987 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
991 EXPORT_SYMBOL(class_new_import);
993 void class_destroy_import(struct obd_import *import)
995 LASSERT(import != NULL);
996 LASSERT(import != LP_POISON);
998 class_handle_unhash(&import->imp_handle);
1000 cfs_spin_lock(&import->imp_lock);
1001 import->imp_generation++;
1002 cfs_spin_unlock(&import->imp_lock);
1003 class_import_put(import);
1005 EXPORT_SYMBOL(class_destroy_import);
1007 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1009 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1011 cfs_spin_lock(&exp->exp_locks_list_guard);
1013 LASSERT(lock->l_exp_refs_nr >= 0);
1015 if (lock->l_exp_refs_target != NULL &&
1016 lock->l_exp_refs_target != exp) {
1017 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1018 exp, lock, lock->l_exp_refs_target);
1020 if ((lock->l_exp_refs_nr ++) == 0) {
1021 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1022 lock->l_exp_refs_target = exp;
1024 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1025 lock, exp, lock->l_exp_refs_nr);
1026 cfs_spin_unlock(&exp->exp_locks_list_guard);
1028 EXPORT_SYMBOL(__class_export_add_lock_ref);
1030 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1032 cfs_spin_lock(&exp->exp_locks_list_guard);
1033 LASSERT(lock->l_exp_refs_nr > 0);
1034 if (lock->l_exp_refs_target != exp) {
1035 LCONSOLE_WARN("lock %p, "
1036 "mismatching export pointers: %p, %p\n",
1037 lock, lock->l_exp_refs_target, exp);
1039 if (-- lock->l_exp_refs_nr == 0) {
1040 cfs_list_del_init(&lock->l_exp_refs_link);
1041 lock->l_exp_refs_target = NULL;
1043 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1044 lock, exp, lock->l_exp_refs_nr);
1045 cfs_spin_unlock(&exp->exp_locks_list_guard);
1047 EXPORT_SYMBOL(__class_export_del_lock_ref);
1050 /* A connection defines an export context in which preallocation can
1051 be managed. This releases the export pointer reference, and returns
1052 the export handle, so the export refcount is 1 when this function
1054 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1055 struct obd_uuid *cluuid)
1057 struct obd_export *export;
1058 LASSERT(conn != NULL);
1059 LASSERT(obd != NULL);
1060 LASSERT(cluuid != NULL);
1063 export = class_new_export(obd, cluuid);
1065 RETURN(PTR_ERR(export));
1067 conn->cookie = export->exp_handle.h_cookie;
1068 class_export_put(export);
1070 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1071 cluuid->uuid, conn->cookie);
1074 EXPORT_SYMBOL(class_connect);
1076 /* if export is involved in recovery then clean up related things */
1077 void class_export_recovery_cleanup(struct obd_export *exp)
1079 struct obd_device *obd = exp->exp_obd;
1081 cfs_spin_lock(&obd->obd_recovery_task_lock);
1082 if (exp->exp_delayed)
1083 obd->obd_delayed_clients--;
1084 if (obd->obd_recovering && exp->exp_in_recovery) {
1085 cfs_spin_lock(&exp->exp_lock);
1086 exp->exp_in_recovery = 0;
1087 cfs_spin_unlock(&exp->exp_lock);
1088 LASSERT(obd->obd_connected_clients);
1089 obd->obd_connected_clients--;
1091 cfs_spin_unlock(&obd->obd_recovery_task_lock);
1092 /** Cleanup req replay fields */
1093 if (exp->exp_req_replay_needed) {
1094 cfs_spin_lock(&exp->exp_lock);
1095 exp->exp_req_replay_needed = 0;
1096 cfs_spin_unlock(&exp->exp_lock);
1097 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
1098 cfs_atomic_dec(&obd->obd_req_replay_clients);
1100 /** Cleanup lock replay data */
1101 if (exp->exp_lock_replay_needed) {
1102 cfs_spin_lock(&exp->exp_lock);
1103 exp->exp_lock_replay_needed = 0;
1104 cfs_spin_unlock(&exp->exp_lock);
1105 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
1106 cfs_atomic_dec(&obd->obd_lock_replay_clients);
1110 /* This function removes 1-3 references from the export:
1111 * 1 - for export pointer passed
1112 * and if disconnect really need
1113 * 2 - removing from hash
1114 * 3 - in client_unlink_export
1115 * The export pointer passed to this function can destroyed */
/*
 * Disconnect \a export.
 *
 * Sets exp_disconnected under exp_lock and remembers the previous value so
 * that racing callers do the real work only once.  The first caller
 * removes the export from the device's nid hash (if hashed), runs the
 * recovery cleanup and unlinks the export.  The visible code ends by
 * dropping one export reference.
 * NOTE(review): the label, return statements and braces are not present
 * in this view.
 */
1116 int class_disconnect(struct obd_export *export)
1118 int already_disconnected;
1121 if (export == NULL) {
1123 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
/* test-and-set of exp_disconnected under the export lock */
1127 cfs_spin_lock(&export->exp_lock);
1128 already_disconnected = export->exp_disconnected;
1129 export->exp_disconnected = 1;
1130 cfs_spin_unlock(&export->exp_lock);
1132 /* class_cleanup(), abort_recovery(), and class_fail_export()
1133 * all end up in here, and if any of them race we shouldn't
1134 * call extra class_export_puts(). */
1135 if (already_disconnected) {
1136 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1137 GOTO(no_disconn, already_disconnected);
1140 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1141 export->exp_handle.h_cookie);
1143 if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1144 cfs_hash_del(export->exp_obd->obd_nid_hash,
1145 &export->exp_connection->c_peer.nid,
1146 &export->exp_nid_hash);
1148 class_export_recovery_cleanup(export);
1149 class_unlink_export(export);
1151 class_export_put(export);
1155 /* Return non-zero for a fully connected export */
1156 int class_connected_export(struct obd_export *exp)
1160 cfs_spin_lock(&exp->exp_lock);
1161 connected = (exp->exp_conn_cnt > 0);
1162 cfs_spin_unlock(&exp->exp_lock);
1167 EXPORT_SYMBOL(class_connected_export);
/*
 * Disconnect every export on \a list, stamping each with \a flags first.
 *
 * An export whose client uuid equals its device's own uuid is only taken
 * off the list, not disconnected.  Every other export is passed to
 * obd_disconnect().
 * NOTE(review): declarations, a list_entry argument, the "continue" line
 * and the braces are not present in this view.
 */
1169 static void class_disconnect_export_list(cfs_list_t *list,
1170 enum obd_option flags)
1173 struct obd_export *exp;
1176 /* It's possible that an export may disconnect itself, but
1177 * nothing else will be added to this list. */
1178 while (!cfs_list_empty(list)) {
1179 exp = cfs_list_entry(list->next, struct obd_export,
1181 /* need for safe call CDEBUG after obd_disconnect */
1182 class_export_get(exp);
1184 cfs_spin_lock(&exp->exp_lock);
1185 exp->exp_flags = flags;
1186 cfs_spin_unlock(&exp->exp_lock);
1188 if (obd_uuid_equals(&exp->exp_client_uuid,
1189 &exp->exp_obd->obd_uuid)) {
1191 "exp %p export uuid == obd uuid, don't discon\n",
1193 /* Need to delete this now so we don't end up pointing
1194 * to work_list later when this export is cleaned up. */
1195 cfs_list_del_init(&exp->exp_obd_chain);
1196 class_export_put(exp);
/* second reference: this is the one obd_disconnect() releases */
1200 class_export_get(exp);
1201 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1202 "last request at "CFS_TIME_T"\n",
1203 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1204 exp, exp->exp_last_request_time);
1205 /* release one export reference anyway */
1206 rc = obd_disconnect(exp);
1208 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1209 obd_export_nid2str(exp), exp, rc);
/* drop the reference taken at the top of the loop body */
1210 class_export_put(exp);
1215 void class_disconnect_exports(struct obd_device *obd)
1217 cfs_list_t work_list;
1220 /* Move all of the exports from obd_exports to a work list, en masse. */
1221 CFS_INIT_LIST_HEAD(&work_list);
1222 cfs_spin_lock(&obd->obd_dev_lock);
1223 cfs_list_splice_init(&obd->obd_exports, &work_list);
1224 cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1225 cfs_spin_unlock(&obd->obd_dev_lock);
1227 if (!cfs_list_empty(&work_list)) {
1228 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1229 "disconnecting them\n", obd->obd_minor, obd);
1230 class_disconnect_export_list(&work_list,
1231 exp_flags_from_obd(obd));
1233 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1234 obd->obd_minor, obd);
1237 EXPORT_SYMBOL(class_disconnect_exports);
/* Remove exports that have not completed recovery. */
/*
 * Evict stale clients of \a obd.
 *
 * Walks obd_exports under obd_dev_lock, calls \a test_export on each
 * export, moves the selected ones to a private work list and logs them as
 * "EVICTING".  The work list is then disconnected with OBD_OPT_ABORT_RECOV
 * added to the device flags, and obd_stale_clients is increased by the
 * number evicted.
 * NOTE(review): the "continue" lines, the eviction counter updates and the
 * braces are not present in this view, so which test_export() result
 * selects an export cannot be confirmed from here.
 */
1241 void class_disconnect_stale_exports(struct obd_device *obd,
1242 int (*test_export)(struct obd_export *))
1244 cfs_list_t work_list;
1245 cfs_list_t *pos, *n;
1246 struct obd_export *exp;
1250 CFS_INIT_LIST_HEAD(&work_list);
1251 cfs_spin_lock(&obd->obd_dev_lock);
1252 cfs_list_for_each_safe(pos, n, &obd->obd_exports) {
1253 exp = cfs_list_entry(pos, struct obd_export, exp_obd_chain);
1254 if (test_export(exp))
1257 /* don't count self-export as client */
1258 if (obd_uuid_equals(&exp->exp_client_uuid,
1259 &exp->exp_obd->obd_uuid))
1262 cfs_list_move(&exp->exp_obd_chain, &work_list);
1264 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1265 obd->obd_name, exp->exp_client_uuid.uuid,
1266 exp->exp_connection == NULL ? "<unknown>" :
1267 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1268 print_export_data(exp, "EVICTING", 0);
1270 cfs_spin_unlock(&obd->obd_dev_lock);
1273 CDEBUG(D_HA, "%s: disconnecting %d stale clients\n",
1274 obd->obd_name, evicted);
1275 obd->obd_stale_clients += evicted;
1277 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1278 OBD_OPT_ABORT_RECOV);
1281 EXPORT_SYMBOL(class_disconnect_stale_exports);
/*
 * Mark @exp as failed and disconnect it.
 *
 * exp->exp_failed is read and then set to 1 under exp->exp_lock, so the
 * saved previous value tells whether the export had already been failed.
 * For an already-failed export a skipping message is logged. Otherwise the
 * debug log is dumped when obd_dump_on_timeout is set, an extra export
 * reference is taken, obd_disconnect() is called and its outcome is logged
 * (CERROR with the return code, or a D_HA success message).
 *
 * @param exp  export to fail
 *
 * NOTE(review): the statement ending the already-failed branch (presumably
 * a return) and the test on rc that chooses between the two final messages
 * are not visible in this listing -- confirm against the complete file.
 */
1283 void class_fail_export(struct obd_export *exp)
1285 int rc, already_failed;
/* Test-and-set of exp_failed, atomic with respect to exp_lock holders. */
1287 cfs_spin_lock(&exp->exp_lock);
1288 already_failed = exp->exp_failed;
1289 exp->exp_failed = 1;
1290 cfs_spin_unlock(&exp->exp_lock);
1292 if (already_failed) {
1293 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1294 exp, exp->exp_client_uuid.uuid);
1298 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1299 exp, exp->exp_client_uuid.uuid);
1301 if (obd_dump_on_timeout)
1302 libcfs_debug_dumplog();
1304 /* Most callers into obd_disconnect are removing their own reference
1305 * (request, for example) in addition to the one from the hash table.
1306 * We don't have such a reference here, so make one. */
1307 class_export_get(exp);
1308 rc = obd_disconnect(exp);
1310 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1312 CDEBUG(D_HA, "disconnected export %p/%s\n",
1313 exp, exp->exp_client_uuid.uuid);
1315 EXPORT_SYMBOL(class_fail_export);
/*
 * Return the peer NID of @exp in string form.
 *
 * When exp->exp_connection is set, the result is libcfs_nid2str() applied to
 * the connection peer NID.
 *
 * @param exp  export to describe
 *
 * NOTE(review): the return for the no-connection case is not visible in
 * this listing -- confirm what callers get when exp_connection is NULL.
 */
1317 char *obd_export_nid2str(struct obd_export *exp)
1319 if (exp->exp_connection != NULL)
1320 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1324 EXPORT_SYMBOL(obd_export_nid2str);
/*
 * Evict from @obd the export(s) hashed under the NID given as a string.
 *
 * @param obd  device whose obd_nid_hash is searched
 * @param nid  NID in string form; converted with libcfs_str2nid()
 * @return     exports_evicted, the eviction counter
 *
 * A hit from cfs_hash_lookup() is asserted to carry the requested NID and to
 * differ from obd->obd_self_export. A warning is logged, the export is
 * failed with class_fail_export(), and class_export_put() is called on it.
 * A D_HA message is logged when nothing was evicted.
 *
 * NOTE(review): the loop construct, the handling of a failed lookup, the
 * last CWARN argument and the update of exports_evicted are not visible in
 * this listing -- confirm against the complete file.
 */
1326 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1328 struct obd_export *doomed_exp = NULL;
1329 int exports_evicted = 0;
1331 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1334 doomed_exp = cfs_hash_lookup(obd->obd_nid_hash, &nid_key);
1335 if (doomed_exp == NULL)
/* Sanity checks on the hash hit: right NID, and never the self-export. */
1338 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1339 "nid %s found, wanted nid %s, requested nid %s\n",
1340 obd_export_nid2str(doomed_exp),
1341 libcfs_nid2str(nid_key), nid);
1342 LASSERTF(doomed_exp != obd->obd_self_export,
1343 "self-export is hashed by NID?\n");
1345 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1346 obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1348 class_fail_export(doomed_exp);
1349 class_export_put(doomed_exp);
1352 if (!exports_evicted)
1353 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1354 obd->obd_name, nid);
1355 return exports_evicted;
1357 EXPORT_SYMBOL(obd_export_evict_by_nid);
/*
 * Evict from @obd the export whose client UUID matches @uuid.
 *
 * @param obd   device whose obd_uuid_hash is searched
 * @param uuid  client UUID in string form; converted with obd_str2uuid()
 * @return      exports_evicted, the eviction counter
 *
 * A request naming the UUID of the device itself is refused with an error
 * message and an early return. Otherwise the export is looked up in
 * obd->obd_uuid_hash. A miss is reported with CERROR. A hit is logged,
 * failed with class_fail_export(), and class_export_put() is called on it.
 *
 * NOTE(review): the else line between the miss and hit branches and the
 * update of exports_evicted are not visible in this listing -- confirm
 * against the complete file.
 */
1359 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1361 struct obd_export *doomed_exp = NULL;
1362 struct obd_uuid doomed_uuid;
1363 int exports_evicted = 0;
1365 obd_str2uuid(&doomed_uuid, uuid);
/* Refuse to evict the device itself. */
1366 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1367 CERROR("%s: can't evict myself\n", obd->obd_name);
1368 return exports_evicted;
1371 doomed_exp = cfs_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1373 if (doomed_exp == NULL) {
1374 CERROR("%s: can't disconnect %s: no exports found\n",
1375 obd->obd_name, uuid);
1377 CWARN("%s: evicting %s at adminstrative request\n",
1378 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1379 class_fail_export(doomed_exp);
1380 class_export_put(doomed_exp);
1384 return exports_evicted;
1386 EXPORT_SYMBOL(obd_export_evict_by_uuid);
/*
 * Optional per-export dump hook, only present when
 * LUSTRE_TRACKS_LOCK_EXP_REFS is enabled. It starts out NULL and is exported.
 * print_export_data() calls it when lock dumping is requested and the hook
 * is set.
 *
 * NOTE(review): the matching #endif is not visible in this listing.
 */
1388 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1389 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1390 EXPORT_SYMBOL(class_export_dump_hook);
/*
 * Log one D_HA line describing @exp, tagged with @status.
 *
 * The line carries, in order: device name, status, export pointer, client
 * UUID, peer NID string, export refcount, the rpc / callback / lock
 * counters, the disconnected / delayed / failed flags, nreplies,
 * first_reply, a "..." marker when nreplies exceeds 3, and
 * exp_last_committed.
 * exp->exp_outstanding_replies is walked under exp->exp_lock beforehand.
 * With LUSTRE_TRACKS_LOCK_EXP_REFS, class_export_dump_hook is also called
 * when locks is non-zero and the hook is set.
 *
 * @param exp     export to describe
 * @param status  free-form tag printed in the line
 *
 * NOTE(review): the last parameter (used as locks), the declaration of
 * nreplies, the body of the reply loop and the closing #endif are not
 * visible in this listing. Presumably the loop counts replies and
 * remembers the first one -- confirm against the complete file.
 */
1393 static void print_export_data(struct obd_export *exp, const char *status,
1396 struct ptlrpc_reply_state *rs;
1397 struct ptlrpc_reply_state *first_reply = NULL;
1400 cfs_spin_lock(&exp->exp_lock);
1401 cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1407 cfs_spin_unlock(&exp->exp_lock);
1409 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1410 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1411 obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1412 cfs_atomic_read(&exp->exp_rpc_count),
1413 cfs_atomic_read(&exp->exp_cb_count),
1414 cfs_atomic_read(&exp->exp_locks_count),
1415 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1416 nreplies, first_reply, nreplies > 3 ? "..." : "",
1417 exp->exp_last_committed);
1418 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1419 if (locks && class_export_dump_hook != NULL)
1420 class_export_dump_hook(exp);
/*
 * Dump the exports of @obd through print_export_data().
 *
 * Under obd->obd_dev_lock the obd_exports, obd_unlinked_exports and
 * obd_delayed_exports lists are printed with the tags ACTIVE, UNLINKED and
 * DELAYED. Then, under obd_zombie_impexp_lock, every entry of the global
 * obd_zombie_exports list is printed as ZOMBIE; that last loop does not
 * filter by @obd.
 *
 * @param obd    device whose export lists are dumped
 * @param locks  passed through unchanged to print_export_data()
 */
1424 void dump_exports(struct obd_device *obd, int locks)
1426 struct obd_export *exp;
/* Per-device lists, protected by the device lock. */
1428 cfs_spin_lock(&obd->obd_dev_lock);
1429 cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1430 print_export_data(exp, "ACTIVE", locks);
1431 cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1432 print_export_data(exp, "UNLINKED", locks);
1433 cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1434 print_export_data(exp, "DELAYED", locks);
1435 cfs_spin_unlock(&obd->obd_dev_lock);
/* Global zombie list, protected by its own lock. */
1436 cfs_spin_lock(&obd_zombie_impexp_lock);
1437 cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1438 print_export_data(exp, "ZOMBIE", locks);
1439 cfs_spin_unlock(&obd_zombie_impexp_lock);
1441 EXPORT_SYMBOL(dump_exports);
/*
 * Wait until obd->obd_unlinked_exports of @obd is empty.
 *
 * obd->obd_exports must already be empty (asserted). The unlinked list is
 * checked under obd->obd_dev_lock. While it is non-empty the lock is dropped
 * and the caller sleeps in CFS_TASK_UNINT state for cfs_time_seconds(waited).
 * Whenever waited exceeds 5 and IS_PO2(waited) holds, a console warning with
 * the obd refcount is printed and dump_exports(obd, 1) is called.
 *
 * @param obd  device to wait on
 *
 * NOTE(review): the declaration, initial value and update of waited are not
 * visible in this listing -- confirm the back-off schedule against the
 * complete file.
 */
1443 void obd_exports_barrier(struct obd_device *obd)
1446 LASSERT(cfs_list_empty(&obd->obd_exports));
1447 cfs_spin_lock(&obd->obd_dev_lock);
1448 while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
/* Never sleep with the spinlock held: drop it, sleep, retake it below. */
1449 cfs_spin_unlock(&obd->obd_dev_lock);
1450 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
1451 cfs_time_seconds(waited));
1452 if (waited > 5 && IS_PO2(waited)) {
1453 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1454 "more than %d seconds. "
1455 "The obd refcount = %d. Is it stuck?\n",
1456 obd->obd_name, waited,
1457 cfs_atomic_read(&obd->obd_refcount));
1458 dump_exports(obd, 1);
1461 cfs_spin_lock(&obd->obd_dev_lock);
1463 cfs_spin_unlock(&obd->obd_dev_lock);
1465 EXPORT_SYMBOL(obd_exports_barrier);
1467 /* Total amount of zombies to be destroyed */
/*
 * Read under obd_zombie_impexp_lock by obd_zombie_impexp_check() and
 * obd_zombie_is_idle().
 * NOTE(review): the statements that update this counter are not visible in
 * this listing.
 */
1468 static int zombies_count = 0;
1471 * kill zombie imports and exports
/*
 * Destroy queued zombie imports and exports.
 *
 * Each pass takes obd_zombie_impexp_lock, detaches the first entry of
 * obd_zombie_imports and of obd_zombie_exports (when those lists are
 * non-empty) and drops the lock. A detached import is passed to
 * class_import_destroy() and a detached export to class_export_destroy(),
 * both outside the lock. The lock is retaken briefly after each destroy.
 * Passes repeat while either an import or an export was found.
 *
 * NOTE(review): the loop opener, the resets of import and export at the top
 * of each pass, the tails of both cfs_list_entry() calls and the statement
 * executed inside each short re-lock (presumably the zombies_count update)
 * are not visible in this listing -- confirm against the complete file.
 */
1473 void obd_zombie_impexp_cull(void)
1475 struct obd_import *import;
1476 struct obd_export *export;
1480 cfs_spin_lock(&obd_zombie_impexp_lock);
1483 if (!cfs_list_empty(&obd_zombie_imports)) {
1484 import = cfs_list_entry(obd_zombie_imports.next,
1487 cfs_list_del_init(&import->imp_zombie_chain);
1491 if (!cfs_list_empty(&obd_zombie_exports)) {
1492 export = cfs_list_entry(obd_zombie_exports.next,
1495 cfs_list_del_init(&export->exp_obd_chain);
1498 cfs_spin_unlock(&obd_zombie_impexp_lock);
/* The destroy calls below run with the zombie lock released. */
1500 if (import != NULL) {
1501 class_import_destroy(import);
1502 cfs_spin_lock(&obd_zombie_impexp_lock);
1504 cfs_spin_unlock(&obd_zombie_impexp_lock);
1507 if (export != NULL) {
1508 class_export_destroy(export);
1509 cfs_spin_lock(&obd_zombie_impexp_lock);
1511 cfs_spin_unlock(&obd_zombie_impexp_lock);
1515 } while (import != NULL || export != NULL);
/* Completed by obd_zombie_impexp_thread() at start-up (also on the
 * daemonize error path); obd_zombie_impexp_init() waits on it. */
1519 static cfs_completion_t obd_zombie_start;
/* Completed by obd_zombie_impexp_thread() once its loop has ended;
 * obd_zombie_impexp_stop() waits on it. */
1520 static cfs_completion_t obd_zombie_stop;
/* Flag word holding OBD_ZOMBIE_STOP. */
1521 static unsigned long obd_zombie_flags;
/* Wait queue shared by the zombie thread and obd_zombie_barrier() callers;
 * signalled by obd_zombie_impexp_notify() and by the thread after a cull. */
1522 static cfs_waitq_t obd_zombie_waitq;
/* Pid recorded by the zombie thread; obd_zombie_barrier() compares it with
 * the current pid to avoid waiting for itself. */
1523 static pid_t obd_zombie_pid;
/*
 * Stop request for the zombie thread.
 * NOTE(review): this constant is passed as the first argument of
 * cfs_set_bit() and cfs_test_bit(). If those take a bit index, as the Linux
 * set_bit/test_bit they resemble do, the flag lives in bit 2 rather than
 * bit 1. That is harmless while setter and testers use the same constant.
 * The enclosing enum lines are not visible in this listing.
 */
1526 OBD_ZOMBIE_STOP = 1 << 1
1530 * check whether there is work for the zombie import/export destroy thread.
/*
 * Tell whether the zombie destroy thread has nothing to do.
 *
 * Under obd_zombie_impexp_lock, rc becomes non-zero only when zombies_count
 * is 0 and OBD_ZOMBIE_STOP is not set in obd_zombie_flags. The thread loop
 * sleeps on the negation of this check, so it wakes up when zombies are
 * queued or a stop has been requested.
 *
 * @param arg  not referenced in the visible lines
 *
 * NOTE(review): the declaration of rc and the return statement are not
 * visible in this listing; presumably rc is returned -- confirm.
 */
1532 static int obd_zombie_impexp_check(void *arg)
1536 cfs_spin_lock(&obd_zombie_impexp_lock);
1537 rc = (zombies_count == 0) &&
1538 !cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1539 cfs_spin_unlock(&obd_zombie_impexp_lock);
1545 * Add export to the obd_zombie thread and notify it.
/*
 * Queue @exp for destruction by the zombie thread.
 *
 * The export is asserted to be linked through exp_obd_chain and is unlinked
 * under exp->exp_obd->obd_dev_lock. It is then added to obd_zombie_exports
 * under obd_zombie_impexp_lock, and the zombie thread is notified.
 *
 * @param exp  export to hand over
 *
 * NOTE(review): this listing skips a line between taking
 * obd_zombie_impexp_lock and the list add (presumably the zombies_count
 * update) -- confirm. Also, obd_zombie_impexp_notify is a function defined
 * in this file, so the != NULL comparison below looks always true.
 */
1547 static void obd_zombie_export_add(struct obd_export *exp) {
1548 cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
1549 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1550 cfs_list_del_init(&exp->exp_obd_chain);
1551 cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
/* exp_obd_chain is reused as the link on the global zombie list. */
1552 cfs_spin_lock(&obd_zombie_impexp_lock);
1554 cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1555 cfs_spin_unlock(&obd_zombie_impexp_lock);
1557 if (obd_zombie_impexp_notify != NULL)
1558 obd_zombie_impexp_notify();
1562 * Add import to the obd_zombie thread and notify it.
/*
 * Queue @imp for destruction by the zombie thread.
 *
 * imp->imp_sec must be NULL (asserted). Under obd_zombie_impexp_lock the
 * import is asserted not to be linked through imp_zombie_chain yet and is
 * added to obd_zombie_imports. The zombie thread is then notified.
 *
 * @param imp  import to hand over
 *
 * NOTE(review): this listing skips a line between the assertion and the
 * list add (presumably the zombies_count update) -- confirm. Also,
 * obd_zombie_impexp_notify is a function defined in this file, so the
 * != NULL comparison below looks always true.
 */
1564 static void obd_zombie_import_add(struct obd_import *imp) {
1565 LASSERT(imp->imp_sec == NULL);
1566 cfs_spin_lock(&obd_zombie_impexp_lock);
1567 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1569 cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1570 cfs_spin_unlock(&obd_zombie_impexp_lock);
1572 if (obd_zombie_impexp_notify != NULL)
1573 obd_zombie_impexp_notify();
1577 * notify import/export destroy thread about new zombie.
/*
 * Signal obd_zombie_waitq. Both the zombie thread loop and
 * obd_zombie_barrier() callers sleep on that queue.
 */
1579 static void obd_zombie_impexp_notify(void)
1581 cfs_waitq_signal(&obd_zombie_waitq);
1585 * check whether obd_zombie is idle
/*
 * Wait condition used by obd_zombie_barrier(): true when zombies_count is 0.
 *
 * OBD_ZOMBIE_STOP must not be set when this is called (asserted).
 * zombies_count is sampled under obd_zombie_impexp_lock.
 *
 * NOTE(review): the declaration of rc and the return statement are not
 * visible in this listing; presumably rc is returned -- confirm.
 */
1587 static int obd_zombie_is_idle(void)
1591 LASSERT(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1592 cfs_spin_lock(&obd_zombie_impexp_lock);
1593 rc = (zombies_count == 0);
1594 cfs_spin_unlock(&obd_zombie_impexp_lock);
1599 * wait until the obd_zombie import/export queues become empty
/*
 * Block the caller on obd_zombie_waitq until obd_zombie_is_idle() holds.
 *
 * The current pid is first compared with obd_zombie_pid, the pid recorded
 * by the zombie thread, so that the thread does not wait for itself.
 *
 * NOTE(review): the statement guarded by the pid comparison (presumably a
 * return) is not visible in this listing -- confirm.
 */
1601 void obd_zombie_barrier(void)
1603 struct l_wait_info lwi = { 0 };
1605 if (obd_zombie_pid == cfs_curproc_pid())
1606 /* don't wait for myself */
1608 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1610 EXPORT_SYMBOL(obd_zombie_barrier);
1615 * destroy zombie export/import thread.
/*
 * Body of the zombie import/export destroy thread.
 *
 * The thread daemonizes as obd_zombid. obd_zombie_start is completed both on
 * the daemonize error path and on the normal path, which releases the
 * waiter in obd_zombie_impexp_init(). The thread then records its pid in
 * obd_zombie_pid and, until OBD_ZOMBIE_STOP is set, repeats these steps:
 * sleep on obd_zombie_waitq until obd_zombie_impexp_check() reports work
 * (or a stop), run obd_zombie_impexp_cull(), and signal obd_zombie_waitq
 * for obd_zombie_barrier() callers. On exit obd_zombie_stop is completed.
 *
 * @param unused  not referenced in the visible lines
 *
 * NOTE(review): obd_zombie_pid is assigned after obd_zombie_start has been
 * completed, so the starter may run before the pid is recorded. The
 * declaration of rc and the return statements are not visible in this
 * listing.
 */
1617 static int obd_zombie_impexp_thread(void *unused)
1621 if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1622 cfs_complete(&obd_zombie_start);
1626 cfs_complete(&obd_zombie_start);
1628 obd_zombie_pid = cfs_curproc_pid();
1630 while(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1631 struct l_wait_info lwi = { 0 };
1633 l_wait_event(obd_zombie_waitq,
1634 !obd_zombie_impexp_check(NULL), &lwi);
1635 obd_zombie_impexp_cull();
1638 * Notify obd_zombie_barrier callers that queues
1641 cfs_waitq_signal(&obd_zombie_waitq);
1644 cfs_complete(&obd_zombie_stop);
1649 #else /* ! KERNEL */
/* Entry counter used by obd_zombie_impexp_kill() so that only the first
 * concurrent or nested entry runs the cull. */
1651 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
/* Handles returned by liblustre_register_wait_callback() and
 * liblustre_register_idle_callback() in obd_zombie_impexp_init(); passed
 * back to the matching deregister calls in obd_zombie_impexp_stop(). */
1652 static void *obd_zombie_impexp_work_cb;
1653 static void *obd_zombie_impexp_idle_cb;
/*
 * Callback that culls zombies; it sits after the non-kernel #else.
 *
 * zombie_recur is incremented on entry and obd_zombie_impexp_cull() runs
 * only when the increment returns 1, that is, when no other entry is
 * active. The counter is decremented afterwards.
 *
 * @param arg  not referenced in the visible lines
 *
 * NOTE(review): the return value is not visible in this listing.
 */
1655 int obd_zombie_impexp_kill(void *arg)
1659 if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1660 obd_zombie_impexp_cull();
1663 cfs_atomic_dec(&zombie_recur);
1670 * start the zombie import/export destroy thread
/*
 * Set up the zombie import/export machinery.
 *
 * Initializes the two zombie lists, obd_zombie_impexp_lock, both
 * completions and obd_zombie_waitq. The visible lines then show two start-up
 * variants: starting obd_zombie_impexp_thread() with cfs_kernel_thread() and
 * waiting for obd_zombie_start, and registering obd_zombie_impexp_kill() and
 * obd_zombie_impexp_check() as liblustre wait and idle callbacks.
 *
 * NOTE(review): the preprocessor lines choosing between the two variants
 * (presumably kernel versus liblustre builds), the check of rc and the
 * return statement are not visible in this listing -- confirm.
 */
1672 int obd_zombie_impexp_init(void)
1676 CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1677 CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1678 cfs_spin_lock_init(&obd_zombie_impexp_lock);
1679 cfs_init_completion(&obd_zombie_start);
1680 cfs_init_completion(&obd_zombie_stop);
1681 cfs_waitq_init(&obd_zombie_waitq);
1685 rc = cfs_kernel_thread(obd_zombie_impexp_thread, NULL, 0);
/* Wait for the thread to complete obd_zombie_start. */
1689 cfs_wait_for_completion(&obd_zombie_start);
1692 obd_zombie_impexp_work_cb =
1693 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1694 &obd_zombie_impexp_kill, NULL);
1696 obd_zombie_impexp_idle_cb =
1697 liblustre_register_idle_callback("obd_zombi_impexp_check",
1698 &obd_zombie_impexp_check, NULL);
1704 * stop the zombie import/export destroy thread
/*
 * Shut down the zombie import/export machinery.
 *
 * The visible lines show two variants. One sets OBD_ZOMBIE_STOP in
 * obd_zombie_flags, notifies the thread and waits for obd_zombie_stop. The
 * other deregisters the liblustre wait and idle callbacks that were
 * registered at init.
 *
 * NOTE(review): the preprocessor lines choosing between the two variants
 * (presumably kernel versus liblustre builds) are not visible in this
 * listing -- confirm.
 */
1706 void obd_zombie_impexp_stop(void)
1708 cfs_set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1709 obd_zombie_impexp_notify();
1711 cfs_wait_for_completion(&obd_zombie_stop);
1713 liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1714 liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1718 /***** Kernel-userspace comm helpers *******/
1720 /* Get length of entire message, including header.
 * @param payload_len  length of the payload area
 * @returns sizeof(struct kuc_hdr) + payload_len; the sum is not checked for
 *          overflow or for a negative payload_len
 */
1721 int kuc_len(int payload_len)
1723 return sizeof(struct kuc_hdr) + payload_len;
1725 EXPORT_SYMBOL(kuc_len);
1727 /* Get a pointer to kuc header, given a ptr to the payload
1728 * @param p Pointer to payload area
1729 * @returns Pointer to kuc header
/*
 * Map a payload pointer back to its kuc header.
 *
 * The header is taken to sit immediately before the payload, at
 * ((struct kuc_hdr *)p) - 1, and its kuc_magic is asserted to equal
 * KUC_MAGIC.
 *
 * @param p  pointer to the payload area
 *
 * NOTE(review): the return statement is not visible in this listing;
 * presumably lh is returned -- confirm.
 */
1731 struct kuc_hdr * kuc_ptr(void *p)
1733 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1734 LASSERT(lh->kuc_magic == KUC_MAGIC);
1737 EXPORT_SYMBOL(kuc_ptr);
1739 /* Test if payload is part of kuc message
1740 * @param p Pointer to payload area
/*
 * Test whether @p looks like the payload of a kuc message.
 *
 * The struct kuc_hdr immediately before @p is read and its kuc_magic is
 * compared with KUC_MAGIC. The read is unconditional, so @p must point at
 * least sizeof(struct kuc_hdr) bytes into readable memory.
 *
 * @param p  pointer to a candidate payload area
 *
 * NOTE(review): the return statements for the match and no-match cases are
 * not visible in this listing -- confirm the returned values.
 */
1743 int kuc_ispayload(void *p)
1745 struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1747 if (kh->kuc_magic == KUC_MAGIC)
1752 EXPORT_SYMBOL(kuc_ispayload);
1754 /* Alloc space for a message, and fill in header
1755 * @return Pointer to payload area
/*
 * Set up a kuc message for a payload of @payload_len bytes.
 *
 * The header fields kuc_magic, kuc_transport, kuc_msgtype and kuc_msglen
 * are filled in; kuc_msglen is the full length from kuc_len().
 *
 * @param payload_len  length of the payload area
 * @param transport    stored in kuc_transport
 * @param type         stored in kuc_msgtype
 * @return pointer to the payload area (just past the header). The failure
 *         path returns ERR_PTR(-ENOMEM) rather than NULL, so callers
 *         presumably need an IS_ERR-style check.
 *
 * NOTE(review): the declaration of lh, the allocation call and the test
 * guarding the error return are not visible in this listing -- confirm.
 */
1757 void *kuc_alloc(int payload_len, int transport, int type)
1760 int len = kuc_len(payload_len);
1764 return ERR_PTR(-ENOMEM);
1766 lh->kuc_magic = KUC_MAGIC;
1767 lh->kuc_transport = transport;
1768 lh->kuc_msgtype = type;
1769 lh->kuc_msglen = len;
/* Hand out the address just past the header. */
1771 return (void *)(lh + 1);
1773 EXPORT_SYMBOL(kuc_alloc);
1775 /* Takes pointer to payload area */
/*
 * Free a kuc message, given its payload pointer.
 *
 * kuc_ptr() locates the header and asserts its magic. The header pointer is
 * then released with OBD_FREE() using kuc_len(payload_len) as the size, so
 * @payload_len must be the payload length the message was set up with.
 *
 * @param p            pointer to the payload area
 * @param payload_len  payload length of the message
 *
 * NOTE(review): the function is declared inline without static or extern
 * and is also exported. Whether an out-of-line definition is emitted
 * depends on the inline semantics of the compiler mode in use -- confirm.
 */
1776 inline void kuc_free(void *p, int payload_len)
1778 struct kuc_hdr *lh = kuc_ptr(p);
1779 OBD_FREE(lh, kuc_len(payload_len));
1781 EXPORT_SYMBOL(kuc_free);