1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
30 * Use is subject to license terms.
32 * Copyright (c) 2011, 2012, Whamcloud, Inc.
35 * This file is part of Lustre, http://www.lustre.org/
36 * Lustre is a trademark of Sun Microsystems, Inc.
38 * lustre/obdclass/genops.c
40 * These are the only exported functions, they provide some generic
41 * infrastructure for managing object devices
44 #define DEBUG_SUBSYSTEM S_CLASS
46 #include <liblustre.h>
49 #include <obd_class.h>
50 #include <lprocfs_status.h>
/* List of registered obd types (defined elsewhere) and the spinlock held
 * while that list is walked or modified in this file. */
52 extern cfs_list_t obd_types;
53 cfs_spinlock_t obd_types_lock;
/* Slab caches for obd_device, obdo and obd_import objects; they are
 * created in obd_init_caches() and destroyed in obd_cleanup_caches(). */
55 cfs_mem_cache_t *obd_device_cachep;
56 cfs_mem_cache_t *obdo_cachep;
57 EXPORT_SYMBOL(obdo_cachep);
58 cfs_mem_cache_t *import_cachep;
/* Zombie import/export lists and their lock.
 * NOTE(review): the code that manipulates them is not in this excerpt;
 * presumably used by obd_zombie_{import,export}_add() -- confirm. */
60 cfs_list_t obd_zombie_imports;
61 cfs_list_t obd_zombie_exports;
62 cfs_spinlock_t obd_zombie_impexp_lock;
/* Forward declarations of file-local helpers defined later in the file. */
63 static void obd_zombie_impexp_notify(void);
64 static void obd_zombie_export_add(struct obd_export *exp);
65 static void obd_zombie_import_add(struct obd_import *imp);
66 static void print_export_data(struct obd_export *exp,
67 const char *status, int locks);
/* Hook called by the export/import destroy paths below to drop a
 * ptlrpc_connection reference; it is only declared here, not assigned. */
69 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
72 /* support functions: we could use inter-module communication, but this
73 * is more portable to other OSes */
/**
 * Allocate an obd_device from obd_device_cachep (CFS_ALLOC_IO flags) and
 * stamp it with OBD_DEVICE_MAGIC.
 * NOTE(review): the NULL check and return statement are not visible in this
 * excerpt; class_newdev() treats a NULL return as allocation failure.
 */
75 static struct obd_device *obd_device_alloc(void)
77 struct obd_device *obd;
79 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
81 obd->obd_magic = OBD_DEVICE_MAGIC;
/**
 * Return an obd_device to obd_device_cachep.
 *
 * Asserts that the magic is intact, complains if a namespace pointer is
 * still set, finalises the lu_ref tracker and frees the slab object.
 */
86 static void obd_device_free(struct obd_device *obd)
89 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
90 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
/* A leftover namespace means cleanup did not complete; it is reported. */
91 if (obd->obd_namespace != NULL) {
92 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
93 obd, obd->obd_namespace, obd->obd_force);
96 lu_ref_fini(&obd->obd_reference);
97 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
/**
 * Look up a registered obd type by name.
 *
 * Walks obd_types under obd_types_lock and compares each typ_name with
 * \a name using strcmp(); the lock is dropped both on a match and after an
 * unsuccessful walk.  No reference is taken on the type here.
 */
100 struct obd_type *class_search_type(const char *name)
103 struct obd_type *type;
105 cfs_spin_lock(&obd_types_lock);
106 cfs_list_for_each(tmp, &obd_types) {
107 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
108 if (strcmp(type->typ_name, name) == 0) {
109 cfs_spin_unlock(&obd_types_lock);
113 cfs_spin_unlock(&obd_types_lock);
/**
 * Find an obd type by name and pin its owning module.
 *
 * The type is first searched with class_search_type().  When module loading
 * is supported, a module named after the type is requested and the search is
 * repeated.  Under the type's obd_type_lock the owning module of typ_dt_ops
 * is pinned with cfs_try_module_get().
 * NOTE(review): the result of cfs_try_module_get() is not checked in the
 * visible lines.
 */
117 struct obd_type *class_get_type(const char *name)
119 struct obd_type *type = class_search_type(name);
121 #ifdef HAVE_MODULE_LOADING_SUPPORT
123 const char *modname = name;
/* cfs_request_module() returning 0 is treated as "module loaded". */
124 if (!cfs_request_module("%s", modname)) {
125 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
126 type = class_search_type(name);
128 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
134 cfs_spin_lock(&type->obd_type_lock);
136 cfs_try_module_get(type->typ_dt_ops->o_owner);
137 cfs_spin_unlock(&type->obd_type_lock);
141 EXPORT_SYMBOL(class_get_type);
/**
 * Counterpart of class_get_type(): under the type's obd_type_lock, release
 * the module reference held on the owner of typ_dt_ops.
 */
143 void class_put_type(struct obd_type *type)
146 cfs_spin_lock(&type->obd_type_lock);
148 cfs_module_put(type->typ_dt_ops->o_owner);
149 cfs_spin_unlock(&type->obd_type_lock);
151 EXPORT_SYMBOL(class_put_type);
/* Upper bound (exclusive) on the length of an obd type name. */
153 #define CLASS_MAX_NAME 1024
/**
 * Register a new obd type.
 *
 * \param dt_ops  data operations table, copied into the new type
 * \param md_ops  metadata operations table, copied when provided (optional)
 * \param vars    lprocfs variables (use is not visible in this excerpt)
 * \param name    type name; must be shorter than CLASS_MAX_NAME
 * \param ldt     lu device type, initialised with lu_device_type_init()
 *
 * The type structure, both ops tables and the name are allocated, filled in,
 * a proc root is registered and the type is linked onto obd_types under
 * obd_types_lock.  On failure the name, ops tables and type are freed.
 * NOTE(review): the failure path shown frees only memory; confirm that a
 * proc root registered earlier is removed when a later step fails.
 */
155 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
156 struct lprocfs_vars *vars, const char *name,
157 struct lu_device_type *ldt)
159 struct obd_type *type;
164 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
/* Refuse to register the same type name twice. */
166 if (class_search_type(name)) {
167 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
172 OBD_ALLOC(type, sizeof(*type));
176 OBD_ALLOC_PTR(type->typ_dt_ops);
177 OBD_ALLOC_PTR(type->typ_md_ops);
178 OBD_ALLOC(type->typ_name, strlen(name) + 1);
/* The three allocations above are checked together. */
180 if (type->typ_dt_ops == NULL ||
181 type->typ_md_ops == NULL ||
182 type->typ_name == NULL)
185 *(type->typ_dt_ops) = *dt_ops;
186 /* md_ops is optional */
188 *(type->typ_md_ops) = *md_ops;
/* typ_name was allocated with strlen(name) + 1 bytes, so this fits. */
189 strcpy(type->typ_name, name);
190 cfs_spin_lock_init(&type->obd_type_lock);
193 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
195 if (IS_ERR(type->typ_procroot)) {
196 rc = PTR_ERR(type->typ_procroot);
197 type->typ_procroot = NULL;
203 rc = lu_device_type_init(ldt);
/* Publish the fully initialised type. */
208 cfs_spin_lock(&obd_types_lock);
209 cfs_list_add(&type->typ_chain, &obd_types);
210 cfs_spin_unlock(&obd_types_lock);
/* Failure cleanup: release whatever was allocated. */
215 if (type->typ_name != NULL)
216 OBD_FREE(type->typ_name, strlen(name) + 1);
217 if (type->typ_md_ops != NULL)
218 OBD_FREE_PTR(type->typ_md_ops);
219 if (type->typ_dt_ops != NULL)
220 OBD_FREE_PTR(type->typ_dt_ops);
221 OBD_FREE(type, sizeof(*type));
224 EXPORT_SYMBOL(class_register_type);
/**
 * Unregister an obd type by name.
 *
 * Looks the type up, removes its proc root, finalises its lu device type,
 * unlinks it from obd_types under obd_types_lock and frees the name, ops
 * tables and the type itself.
 * NOTE(review): when typ_refcnt is non-zero the ops tables are freed while
 * the type is still referenced; the rest of that branch is not visible in
 * this excerpt -- confirm the pointers are not left dangling.
 */
226 int class_unregister_type(const char *name)
228 struct obd_type *type = class_search_type(name);
232 CERROR("unknown obd type\n");
236 if (type->typ_refcnt) {
237 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
238 /* This is a bad situation, let's make the best of it */
239 /* Remove ops, but leave the name for debugging */
240 OBD_FREE_PTR(type->typ_dt_ops);
241 OBD_FREE_PTR(type->typ_md_ops);
245 if (type->typ_procroot) {
246 lprocfs_remove(&type->typ_procroot);
250 lu_device_type_fini(type->typ_lu);
252 cfs_spin_lock(&obd_types_lock);
253 cfs_list_del(&type->typ_chain);
254 cfs_spin_unlock(&obd_types_lock);
/* The size is derived from the caller's name, which matched typ_name. */
255 OBD_FREE(type->typ_name, strlen(name) + 1);
256 if (type->typ_dt_ops != NULL)
257 OBD_FREE_PTR(type->typ_dt_ops);
258 if (type->typ_md_ops != NULL)
259 OBD_FREE_PTR(type->typ_md_ops);
260 OBD_FREE(type, sizeof(*type));
262 } /* class_unregister_type */
263 EXPORT_SYMBOL(class_unregister_type);
266 * Create a new obd device.
268 * Find an empty slot in ::obd_devs[], create a new obd device in it.
270 * \param[in] type_name obd device type string.
271 * \param[in] name obd device name.
273 * \retval NULL if create fails, otherwise return the obd device
276 struct obd_device *class_newdev(const char *type_name, const char *name)
278 struct obd_device *result = NULL;
279 struct obd_device *newdev;
280 struct obd_type *type = NULL;
282 int new_obd_minor = 0;
285 if (strlen(name) >= MAX_OBD_NAME) {
286 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
287 RETURN(ERR_PTR(-EINVAL));
290 type = class_get_type(type_name);
292 CERROR("OBD: unknown type: %s\n", type_name);
293 RETURN(ERR_PTR(-ENODEV));
296 newdev = obd_device_alloc();
297 if (newdev == NULL) {
298 class_put_type(type);
299 RETURN(ERR_PTR(-ENOMEM));
301 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
303 cfs_write_lock(&obd_dev_lock);
304 for (i = 0; i < class_devno_max(); i++) {
305 struct obd_device *obd = class_num2obd(i);
307 if (obd && obd->obd_name &&
308 (strcmp(name, obd->obd_name) == 0)) {
309 CERROR("Device %s already exists at %d, won't add\n",
312 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
313 "%p obd_magic %08x != %08x\n", result,
314 result->obd_magic, OBD_DEVICE_MAGIC);
315 LASSERTF(result->obd_minor == new_obd_minor,
316 "%p obd_minor %d != %d\n", result,
317 result->obd_minor, new_obd_minor);
319 obd_devs[result->obd_minor] = NULL;
320 result->obd_name[0]='\0';
322 result = ERR_PTR(-EEXIST);
325 if (!result && !obd) {
327 result->obd_minor = i;
329 result->obd_type = type;
330 strncpy(result->obd_name, name,
331 sizeof(result->obd_name) - 1);
332 obd_devs[i] = result;
335 cfs_write_unlock(&obd_dev_lock);
337 if (result == NULL && i >= class_devno_max()) {
338 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
340 RETURN(ERR_PTR(-EOVERFLOW));
343 if (IS_ERR(result)) {
344 obd_device_free(newdev);
345 class_put_type(type);
347 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
348 result->obd_name, result);
/**
 * Release an obd device created by class_newdev().
 *
 * Verifies the magic and that \a obd occupies its own slot in obd_devs[],
 * clears that slot under obd_dev_lock, frees the device and finally drops
 * the reference on its obd type.
 */
353 void class_release_dev(struct obd_device *obd)
/* Saved up front because obd is freed before the type is put. */
355 struct obd_type *obd_type = obd->obd_type;
357 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
358 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
359 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
360 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
361 LASSERT(obd_type != NULL);
363 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
364 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
366 cfs_write_lock(&obd_dev_lock);
367 obd_devs[obd->obd_minor] = NULL;
368 cfs_write_unlock(&obd_dev_lock);
369 obd_device_free(obd);
371 class_put_type(obd_type);
/**
 * Find the slot of the device called \a name.
 *
 * Scans all slots under the obd_dev_lock read lock, comparing names with
 * strcmp().  A match is only acted upon when the device has obd_attached
 * set; the lock is released before leaving the loop in that case.
 * NOTE(review): the returned values are not visible in this excerpt;
 * class_name2obd() treats a negative result as "not found".
 */
374 int class_name2dev(const char *name)
381 cfs_read_lock(&obd_dev_lock);
382 for (i = 0; i < class_devno_max(); i++) {
383 struct obd_device *obd = class_num2obd(i);
385 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
386 /* Make sure we finished attaching before we give
387 out any references */
388 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
389 if (obd->obd_attached) {
390 cfs_read_unlock(&obd_dev_lock);
396 cfs_read_unlock(&obd_dev_lock);
400 EXPORT_SYMBOL(class_name2dev);
402 struct obd_device *class_name2obd(const char *name)
404 int dev = class_name2dev(name);
406 if (dev < 0 || dev > class_devno_max())
408 return class_num2obd(dev);
410 EXPORT_SYMBOL(class_name2obd);
/**
 * Find the slot of the device whose obd_uuid equals \a uuid.
 *
 * Scans all slots under the obd_dev_lock read lock and compares with
 * obd_uuid_equals(); the lock is released on a match and after the scan.
 * NOTE(review): the returned values are not visible in this excerpt.
 */
412 int class_uuid2dev(struct obd_uuid *uuid)
416 cfs_read_lock(&obd_dev_lock);
417 for (i = 0; i < class_devno_max(); i++) {
418 struct obd_device *obd = class_num2obd(i);
420 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
421 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
422 cfs_read_unlock(&obd_dev_lock);
426 cfs_read_unlock(&obd_dev_lock);
430 EXPORT_SYMBOL(class_uuid2dev);
/**
 * Resolve a uuid to its obd device: the index comes from class_uuid2dev()
 * and is passed to class_num2obd().
 * NOTE(review): any validation of the index between the two calls is not
 * visible in this excerpt.
 */
432 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
434 int dev = class_uuid2dev(uuid);
437 return class_num2obd(dev);
439 EXPORT_SYMBOL(class_uuid2obd);
442 /* Get obd device from ::obd_devs[]
444 * \param num [in] array index
446 * \retval NULL if ::obd_devs[\a num] does not contain an obd device
447 * otherwise return the obd device there. */
/**
 * Return the obd device stored at index \a num.
 *
 * The index is compared against class_devno_max(); a device that is found
 * has its magic and its obd_minor (which must equal \a num) asserted.
 * NOTE(review): only the upper bound on \a num is visible here; a check for
 * negative values is not shown -- confirm callers never pass one.
 */
449 struct obd_device *class_num2obd(int num)
451 struct obd_device *obd = NULL;
453 if (num < class_devno_max()) {
458 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
459 "%p obd_magic %08x != %08x\n",
460 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
461 LASSERTF(obd->obd_minor == num,
462 "%p obd_minor %0d != %0d\n",
463 obd, obd->obd_minor, num);
468 EXPORT_SYMBOL(class_num2obd);
/**
 * Print one console line per obd device: index, status, type name, device
 * name, uuid and current reference count.
 *
 * The status is chosen by testing obd_stopping, then obd_set_up, then
 * obd_attached, in that order.  The whole walk runs under the obd_dev_lock
 * read lock.
 */
470 void class_obd_list(void)
475 cfs_read_lock(&obd_dev_lock);
476 for (i = 0; i < class_devno_max(); i++) {
477 struct obd_device *obd = class_num2obd(i);
481 if (obd->obd_stopping)
483 else if (obd->obd_set_up)
485 else if (obd->obd_attached)
489 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
490 i, status, obd->obd_type->typ_name,
491 obd->obd_name, obd->obd_uuid.uuid,
492 cfs_atomic_read(&obd->obd_refcount));
494 cfs_read_unlock(&obd_dev_lock);
498 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
499 specified, then only the client with that uuid is returned,
500 otherwise any client connected to the tgt is returned. */
/**
 * Scan the device table, under the obd_dev_lock read lock, for a client
 * device of type \a typ_name whose cl_target_uuid equals \a tgt_uuid and,
 * when \a grp_uuid is non-NULL, whose own obd_uuid equals \a grp_uuid.
 *
 * The type comparison is strncmp() limited to strlen(typ_name), so it
 * matches any type name that starts with \a typ_name.
 */
501 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
502 const char * typ_name,
503 struct obd_uuid *grp_uuid)
507 cfs_read_lock(&obd_dev_lock);
508 for (i = 0; i < class_devno_max(); i++) {
509 struct obd_device *obd = class_num2obd(i);
513 if ((strncmp(obd->obd_type->typ_name, typ_name,
514 strlen(typ_name)) == 0)) {
515 if (obd_uuid_equals(tgt_uuid,
516 &obd->u.cli.cl_target_uuid) &&
517 ((grp_uuid)? obd_uuid_equals(grp_uuid,
518 &obd->obd_uuid) : 1)) {
519 cfs_read_unlock(&obd_dev_lock);
524 cfs_read_unlock(&obd_dev_lock);
528 EXPORT_SYMBOL(class_find_client_obd);
530 /* Iterate over the obd_device list looking for devices that have grp_uuid.
531 Start searching at *next, and if a device is found, the next index to look
532 at is saved in *next. If next is NULL, then the first matching device
533 will always be returned. */
/**
 * Scan the device table from a starting index for a device whose obd_uuid
 * equals \a grp_uuid.
 *
 * A non-NULL \a next is accepted as a starting point only when *next lies
 * in [0, class_devno_max()).  The scan runs under the obd_dev_lock read
 * lock, which is dropped on a match and at the end of the scan.
 */
534 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
540 else if (*next >= 0 && *next < class_devno_max())
545 cfs_read_lock(&obd_dev_lock);
546 for (; i < class_devno_max(); i++) {
547 struct obd_device *obd = class_num2obd(i);
551 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
554 cfs_read_unlock(&obd_dev_lock);
558 cfs_read_unlock(&obd_dev_lock);
562 EXPORT_SYMBOL(class_devices_in_group);
565 /* Notify that the sptlrpc log for \a fsname has changed, so that every
566 * relevant OBD adjusts its sptlrpc settings accordingly. */
/**
 * Send KEY_SPTLRPC_CONF via obd_set_info_async() to every set-up,
 * non-stopping mdc/osc/mdt/ost device whose name starts with the first
 * \a namelen bytes of \a fsname.
 *
 * \param fsname   filesystem name prefix to match against obd_name
 * \param namelen  number of bytes of \a fsname to compare; must be > 0
 *
 * The table is walked under the obd_dev_lock read lock.  For each matching
 * device a reference is taken and the lock is dropped around the
 * obd_set_info_async() call, then re-taken to continue the walk.
 */
568 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
570 struct obd_device *obd;
574 LASSERT(namelen > 0);
576 cfs_read_lock(&obd_dev_lock);
577 for (i = 0; i < class_devno_max(); i++) {
578 obd = class_num2obd(i);
580 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
583 /* only notify mdc, osc, mdt, ost */
584 type = obd->obd_type->typ_name;
585 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
586 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
587 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
588 strcmp(type, LUSTRE_OST_NAME) != 0)
591 if (strncmp(obd->obd_name, fsname, namelen))
/* Hold a reference on obd while the table lock is released. */
594 class_incref(obd, __FUNCTION__, obd);
595 cfs_read_unlock(&obd_dev_lock);
596 rc2 = obd_set_info_async(obd->obd_self_export,
597 sizeof(KEY_SPTLRPC_CONF),
598 KEY_SPTLRPC_CONF, 0, NULL, NULL);
600 class_decref(obd, __FUNCTION__, obd);
601 cfs_read_lock(&obd_dev_lock);
603 cfs_read_unlock(&obd_dev_lock);
606 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
/**
 * Destroy the obd_device, obdo, import and capa slab caches.
 *
 * Each cfs_mem_cache_destroy() result is asserted to be 0.
 * NOTE(review): the guards around the last three caches and some of the
 * pointer resets are not visible in this excerpt.
 */
608 void obd_cleanup_caches(void)
613 if (obd_device_cachep) {
614 rc = cfs_mem_cache_destroy(obd_device_cachep);
615 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
616 obd_device_cachep = NULL;
619 rc = cfs_mem_cache_destroy(obdo_cachep);
620 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
624 rc = cfs_mem_cache_destroy(import_cachep);
625 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
626 import_cachep = NULL;
629 rc = cfs_mem_cache_destroy(capa_cachep);
630 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
/**
 * Create the slab caches used by this file: "ll_obd_dev_cache",
 * "ll_obdo_cache", "ll_import_cache" and "capa_cache".
 *
 * Each cache pointer is asserted to be NULL before creation.  The failure
 * path calls obd_cleanup_caches() to undo whatever was created.
 * NOTE(review): the return values are not visible in this excerpt.
 */
636 int obd_init_caches(void)
640 LASSERT(obd_device_cachep == NULL);
641 obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
642 sizeof(struct obd_device),
644 if (!obd_device_cachep)
647 LASSERT(obdo_cachep == NULL);
648 obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
653 LASSERT(import_cachep == NULL);
654 import_cachep = cfs_mem_cache_create("ll_import_cache",
655 sizeof(struct obd_import),
660 LASSERT(capa_cachep == NULL);
661 capa_cachep = cfs_mem_cache_create("capa_cache",
662 sizeof(struct obd_capa), 0, 0);
668 obd_cleanup_caches();
673 /* map connection to client */
/**
 * Translate a connection handle into its export by passing conn->cookie to
 * class_handle2object().
 *
 * Two special cases are logged before the lookup: a null handle, and a
 * cookie of -1, which means "assign a new connection".
 * NOTE(review): what is returned in those two cases is not visible here.
 * class_conn2obd() drops a reference on the result with class_export_put(),
 * so the lookup presumably returns a referenced export -- confirm.
 */
674 struct obd_export *class_conn2export(struct lustre_handle *conn)
676 struct obd_export *export;
680 CDEBUG(D_CACHE, "looking for null handle\n");
684 if (conn->cookie == -1) { /* this means assign a new connection */
685 CDEBUG(D_CACHE, "want a new connection\n");
689 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
690 export = class_handle2object(conn->cookie);
693 EXPORT_SYMBOL(class_conn2export);
/**
 * Map an export to an obd device.
 * NOTE(review): the function body is not visible in this excerpt; confirm
 * its NULL handling against the full file.
 */
695 struct obd_device *class_exp2obd(struct obd_export *exp)
701 EXPORT_SYMBOL(class_exp2obd);
/**
 * Translate a connection handle into the obd device of its export.
 *
 * The export is obtained with class_conn2export(); its exp_obd is read and
 * the export reference is dropped again with class_export_put().  No
 * reference on the obd device itself is taken in the visible lines.
 */
703 struct obd_device *class_conn2obd(struct lustre_handle *conn)
705 struct obd_export *export;
706 export = class_conn2export(conn);
708 struct obd_device *obd = export->exp_obd;
709 class_export_put(export);
714 EXPORT_SYMBOL(class_conn2obd);
/**
 * Return the client import (u.cli.cl_import) of the obd device behind
 * \a exp.
 * NOTE(review): any NULL check on obd is not visible in this excerpt.
 */
716 struct obd_import *class_exp2cliimp(struct obd_export *exp)
718 struct obd_device *obd = exp->exp_obd;
721 return obd->u.cli.cl_import;
723 EXPORT_SYMBOL(class_exp2cliimp);
/**
 * Return the client import (u.cli.cl_import) of the obd device that
 * class_conn2obd() resolves \a conn to.
 * NOTE(review): any NULL check on obd is not visible in this excerpt.
 */
725 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
727 struct obd_device *obd = class_conn2obd(conn);
730 return obd->u.cli.cl_import;
732 EXPORT_SYMBOL(class_conn2cliimp);
734 /* Export management functions */
736 /* if export is involved in recovery then clean up related things */
/**
 * Undo an export's contribution to its obd's recovery counters.
 *
 * Under obd_recovery_task_lock: decrements obd_delayed_clients for a
 * delayed export and, when the obd is recovering and the export is in
 * recovery, clears exp_in_recovery and decrements obd_connected_clients.
 * After that lock is dropped, the request-replay and lock-replay flags are
 * each cleared (under exp_lock) and their per-obd client counters
 * decremented.
 */
737 void class_export_recovery_cleanup(struct obd_export *exp)
739 struct obd_device *obd = exp->exp_obd;
741 cfs_spin_lock(&obd->obd_recovery_task_lock);
742 if (exp->exp_delayed)
743 obd->obd_delayed_clients--;
744 if (obd->obd_recovering && exp->exp_in_recovery) {
745 cfs_spin_lock(&exp->exp_lock);
746 exp->exp_in_recovery = 0;
747 cfs_spin_unlock(&exp->exp_lock);
748 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
749 cfs_atomic_dec(&obd->obd_connected_clients);
751 cfs_spin_unlock(&obd->obd_recovery_task_lock);
752 /** Cleanup req replay fields */
753 if (exp->exp_req_replay_needed) {
754 cfs_spin_lock(&exp->exp_lock);
755 exp->exp_req_replay_needed = 0;
756 cfs_spin_unlock(&exp->exp_lock);
757 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
758 cfs_atomic_dec(&obd->obd_req_replay_clients);
760 /** Cleanup lock replay data */
761 if (exp->exp_lock_replay_needed) {
762 cfs_spin_lock(&exp->exp_lock);
763 exp->exp_lock_replay_needed = 0;
764 cfs_spin_unlock(&exp->exp_lock);
765 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
766 cfs_atomic_dec(&obd->obd_lock_replay_clients);
/**
 * Final teardown of an export whose refcount has reached zero.
 *
 * Drops the connection reference (if any), asserts that the reply, replay
 * and high-priority RPC lists are empty, calls obd_destroy_export(),
 * releases the "export" reference on the obd and frees the export through
 * OBD_FREE_RCU().
 */
770 static void class_export_destroy(struct obd_export *exp)
772 struct obd_device *obd = exp->exp_obd;
775 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
776 LASSERT(obd != NULL);
778 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
779 exp->exp_client_uuid.uuid, obd->obd_name);
782 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
783 if (exp->exp_connection)
784 ptlrpc_put_connection_superhack(exp->exp_connection);
786 LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
787 LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
788 LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
789 LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
790 obd_destroy_export(exp);
/* Pairs with class_incref(obd, "export", ...) in class_new_export(). */
791 class_decref(obd, "export", exp);
793 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
/**
 * Handle-table addref callback for exports; registered through
 * class_handle_hash() in class_new_export().  Takes an export reference.
 */
797 static void export_handle_addref(void *export)
799 class_export_get(export);
/**
 * Take a reference on \a exp: increments exp_refcount and logs the new
 * value.
 * NOTE(review): the return statement is not visible in this excerpt.
 */
802 struct obd_export *class_export_get(struct obd_export *exp)
804 cfs_atomic_inc(&exp->exp_refcount);
805 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
806 cfs_atomic_read(&exp->exp_refcount));
809 EXPORT_SYMBOL(class_export_get);
/**
 * Drop a reference on \a exp.
 *
 * When the last reference goes away the export's nid stats and recovery
 * state are cleaned up and the export is handed to
 * obd_zombie_export_add() rather than being destroyed inline.
 */
811 void class_export_put(struct obd_export *exp)
813 LASSERT(exp != NULL);
814 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
/* Logged value is "current - 1" because the decrement happens below. */
815 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
816 cfs_atomic_read(&exp->exp_refcount) - 1);
818 if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
819 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
820 CDEBUG(D_IOCTL, "final put %p/%s\n",
821 exp, exp->exp_client_uuid.uuid);
823 /* release nid stat reference */
824 lprocfs_exp_cleanup(exp);
825 class_export_recovery_cleanup(exp);
827 obd_zombie_export_add(exp);
830 EXPORT_SYMBOL(class_export_put);
832 /* Creates a new export, adds it to the hash table, and returns a
833 * pointer to it. The refcount is 2: one for the hash reference, and
834 * one for the pointer returned by this function. */
/**
 * Allocate and initialise an export of \a obd for client \a cluuid.
 *
 * \param obd     device the export belongs to
 * \param cluuid  client uuid, copied into the export
 *
 * \retval ERR_PTR(-ENOMEM) when the export cannot be allocated; other
 *         failures are recorded in rc as -ENODEV (obd stopping or no uuid
 *         hash) or -EALREADY (duplicate client uuid).
 *
 * The export starts with a refcount of 2, is entered into the handle table
 * and -- unless \a cluuid equals the obd's own uuid -- into the obd's uuid
 * hash.  It is then linked onto obd_exports and obd_exports_timed under
 * obd_dev_lock.  obd_stopping is checked both before and after the hash
 * insertion because the lock is dropped in between.
 */
835 struct obd_export *class_new_export(struct obd_device *obd,
836 struct obd_uuid *cluuid)
838 struct obd_export *export;
839 cfs_hash_t *hash = NULL;
843 OBD_ALLOC_PTR(export);
845 return ERR_PTR(-ENOMEM);
847 export->exp_conn_cnt = 0;
848 export->exp_lock_hash = NULL;
849 cfs_atomic_set(&export->exp_refcount, 2);
850 cfs_atomic_set(&export->exp_rpc_count, 0);
851 cfs_atomic_set(&export->exp_cb_count, 0);
852 cfs_atomic_set(&export->exp_locks_count, 0);
853 #if LUSTRE_TRACKS_LOCK_EXP_REFS
854 CFS_INIT_LIST_HEAD(&export->exp_locks_list);
855 cfs_spin_lock_init(&export->exp_locks_list_guard);
857 cfs_atomic_set(&export->exp_replay_count, 0);
858 export->exp_obd = obd;
859 CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
860 cfs_spin_lock_init(&export->exp_uncommitted_replies_lock);
861 CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
862 CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
863 CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
864 CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
865 class_handle_hash(&export->exp_handle, export_handle_addref);
866 export->exp_last_request_time = cfs_time_current_sec();
867 cfs_spin_lock_init(&export->exp_lock);
868 cfs_spin_lock_init(&export->exp_rpc_lock);
869 CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
870 CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
871 cfs_spin_lock_init(&export->exp_bl_list_lock);
872 CFS_INIT_LIST_HEAD(&export->exp_bl_list);
874 export->exp_sp_peer = LUSTRE_SP_ANY;
875 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
876 export->exp_client_uuid = *cluuid;
877 obd_init_export(export);
879 cfs_spin_lock(&obd->obd_dev_lock);
880 /* shouldn't happen, but might race */
881 if (obd->obd_stopping)
882 GOTO(exit_unlock, rc = -ENODEV);
/* Pin the uuid hash so it stays usable after the lock is dropped. */
884 hash = cfs_hash_getref(obd->obd_uuid_hash);
886 GOTO(exit_unlock, rc = -ENODEV);
887 cfs_spin_unlock(&obd->obd_dev_lock);
/* The obd's self export is not entered into the uuid hash. */
889 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
890 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
892 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
893 obd->obd_name, cluuid->uuid, rc);
894 GOTO(exit_err, rc = -EALREADY);
898 cfs_spin_lock(&obd->obd_dev_lock);
/* Re-check: the device may have started stopping while unlocked. */
899 if (obd->obd_stopping) {
900 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
901 GOTO(exit_unlock, rc = -ENODEV);
904 class_incref(obd, "export", export);
905 cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
906 cfs_list_add_tail(&export->exp_obd_chain_timed,
907 &export->exp_obd->obd_exports_timed);
908 export->exp_obd->obd_num_exports++;
909 cfs_spin_unlock(&obd->obd_dev_lock);
910 cfs_hash_putref(hash);
/* Error exits: unlock, drop the hash reference, unhash and free. */
914 cfs_spin_unlock(&obd->obd_dev_lock);
917 cfs_hash_putref(hash);
918 class_handle_unhash(&export->exp_handle);
919 LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
920 obd_destroy_export(export);
921 OBD_FREE_PTR(export);
924 EXPORT_SYMBOL(class_new_export);
/**
 * Detach an export from its obd.
 *
 * Removes the export from the handle table, then under obd_dev_lock removes
 * it from the uuid hash (if hashed), moves it to obd_unlinked_exports,
 * takes it off the timed list and decrements obd_num_exports.  Finally one
 * export reference is dropped.
 */
926 void class_unlink_export(struct obd_export *exp)
928 class_handle_unhash(&exp->exp_handle);
930 cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
931 /* delete an uuid-export hashitem from hashtables */
932 if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
933 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
934 &exp->exp_client_uuid,
935 &exp->exp_uuid_hash);
937 cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
938 cfs_list_del_init(&exp->exp_obd_chain_timed);
939 exp->exp_obd->obd_num_exports--;
940 cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
941 class_export_put(exp);
943 EXPORT_SYMBOL(class_unlink_export);
945 /* Import management functions */
/**
 * Final teardown of an import whose refcount has reached zero.
 *
 * Drops the main connection reference, then drains imp_conn_list, dropping
 * each entry's connection reference and freeing the entry.  Asserts that no
 * security context remains, releases the "import" reference on the obd and
 * frees the import through OBD_FREE_RCU().
 */
946 void class_import_destroy(struct obd_import *imp)
950 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
951 imp->imp_obd->obd_name);
953 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
955 ptlrpc_put_connection_superhack(imp->imp_connection);
957 while (!cfs_list_empty(&imp->imp_conn_list)) {
958 struct obd_import_conn *imp_conn;
960 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
961 struct obd_import_conn, oic_item);
962 cfs_list_del_init(&imp_conn->oic_item);
963 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
964 OBD_FREE(imp_conn, sizeof(*imp_conn));
967 LASSERT(imp->imp_sec == NULL);
/* Pairs with class_incref(obd, "import", ...) in class_new_import(). */
968 class_decref(imp->imp_obd, "import", imp);
969 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
/**
 * Handle-table addref callback for imports; registered through
 * class_handle_hash() in class_new_import().  Takes an import reference.
 */
973 static void import_handle_addref(void *import)
975 class_import_get(import);
/**
 * Take a reference on \a import: increments imp_refcount and logs the new
 * value together with the owning obd's name.
 * NOTE(review): the return statement is not visible in this excerpt.
 */
978 struct obd_import *class_import_get(struct obd_import *import)
980 cfs_atomic_inc(&import->imp_refcount);
981 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
982 cfs_atomic_read(&import->imp_refcount),
983 import->imp_obd->obd_name);
986 EXPORT_SYMBOL(class_import_get);
/**
 * Drop a reference on \a imp.
 *
 * The import must not already be on a zombie chain.  When the last
 * reference goes away the import is handed to obd_zombie_import_add()
 * rather than being destroyed inline.
 */
988 void class_import_put(struct obd_import *imp)
992 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
993 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
/* Logged value is "current - 1" because the decrement happens below. */
995 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
996 cfs_atomic_read(&imp->imp_refcount) - 1,
997 imp->imp_obd->obd_name);
999 if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
1000 CDEBUG(D_INFO, "final put import %p\n", imp);
1001 obd_zombie_import_add(imp);
1006 EXPORT_SYMBOL(class_import_put);
/**
 * Initialise the adaptive-timeout state of an import: the network latency
 * estimate starts at 0 and each of the IMP_AT_MAX_PORTALS service estimates
 * starts at INITIAL_CONNECT_TIMEOUT.
 */
1008 static void init_imp_at(struct imp_at *at) {
1010 at_init(&at->iat_net_latency, 0, 0);
1011 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1012 /* max service estimates are tracked on the server side, so
1013 don't use the AT history here, just use the last reported
1014 val. (But keep hist for proc histogram, worst_ever) */
1015 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
/**
 * Allocate and initialise an import for \a obd.
 *
 * The import holds an "import" reference on the obd, starts in state
 * LUSTRE_IMP_NEW with a refcount of 2, is entered into the handle table and
 * defaults to message magic LUSTRE_MSG_MAGIC_V2.
 * NOTE(review): the allocation-failure check and the return statement are
 * not visible in this excerpt.
 */
1020 struct obd_import *class_new_import(struct obd_device *obd)
1022 struct obd_import *imp;
1024 OBD_ALLOC(imp, sizeof(*imp));
1028 CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
1029 CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
1030 CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
1031 CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
1032 cfs_spin_lock_init(&imp->imp_lock);
1033 imp->imp_last_success_conn = 0;
1034 imp->imp_state = LUSTRE_IMP_NEW;
/* Released again by class_decref() in class_import_destroy(). */
1035 imp->imp_obd = class_incref(obd, "import", imp);
1036 cfs_mutex_init(&imp->imp_sec_mutex);
1037 cfs_waitq_init(&imp->imp_recovery_waitq);
1039 cfs_atomic_set(&imp->imp_refcount, 2);
1040 cfs_atomic_set(&imp->imp_unregistering, 0);
1041 cfs_atomic_set(&imp->imp_inflight, 0);
1042 cfs_atomic_set(&imp->imp_replay_inflight, 0);
1043 cfs_atomic_set(&imp->imp_inval_count, 0);
1044 CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1045 CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1046 class_handle_hash(&imp->imp_handle, import_handle_addref);
1047 init_imp_at(&imp->imp_at);
1049 /* the default magic is V2, will be used in connect RPC, and
1050 * then adjusted according to the flags in request/reply. */
1051 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1055 EXPORT_SYMBOL(class_new_import);
/**
 * Begin destruction of an import: remove it from the handle table, bump
 * imp_generation under imp_lock and drop one reference.
 */
1057 void class_destroy_import(struct obd_import *import)
1059 LASSERT(import != NULL);
1060 LASSERT(import != LP_POISON);
1062 class_handle_unhash(&import->imp_handle);
1064 cfs_spin_lock(&import->imp_lock);
1065 import->imp_generation++;
1066 cfs_spin_unlock(&import->imp_lock);
1067 class_import_put(import);
1069 EXPORT_SYMBOL(class_destroy_import);
1071 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/**
 * Debug tracking: record that \a lock holds a reference on \a exp.
 *
 * Under exp_locks_list_guard, warns when the lock already targets a
 * different export, increments l_exp_refs_nr and, on the 0 -> 1
 * transition, links the lock onto exp_locks_list and records \a exp as its
 * target.
 */
1073 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1075 cfs_spin_lock(&exp->exp_locks_list_guard);
1077 LASSERT(lock->l_exp_refs_nr >= 0);
1079 if (lock->l_exp_refs_target != NULL &&
1080 lock->l_exp_refs_target != exp) {
1081 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1082 exp, lock, lock->l_exp_refs_target);
1084 if ((lock->l_exp_refs_nr ++) == 0) {
1085 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1086 lock->l_exp_refs_target = exp;
1088 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1089 lock, exp, lock->l_exp_refs_nr);
1090 cfs_spin_unlock(&exp->exp_locks_list_guard);
1092 EXPORT_SYMBOL(__class_export_add_lock_ref);
/**
 * Debug tracking: drop one reference of \a lock on \a exp.
 *
 * Under exp_locks_list_guard, warns when the lock's recorded target is not
 * \a exp, decrements l_exp_refs_nr and, when it reaches 0, unlinks the lock
 * from the export's list and clears the recorded target.
 */
1094 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1096 cfs_spin_lock(&exp->exp_locks_list_guard);
1097 LASSERT(lock->l_exp_refs_nr > 0);
1098 if (lock->l_exp_refs_target != exp) {
1099 LCONSOLE_WARN("lock %p, "
1100 "mismatching export pointers: %p, %p\n",
1101 lock, lock->l_exp_refs_target, exp);
1103 if (-- lock->l_exp_refs_nr == 0) {
1104 cfs_list_del_init(&lock->l_exp_refs_link);
1105 lock->l_exp_refs_target = NULL;
1107 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1108 lock, exp, lock->l_exp_refs_nr);
1109 cfs_spin_unlock(&exp->exp_locks_list_guard);
1111 EXPORT_SYMBOL(__class_export_del_lock_ref);
1114 /* A connection defines an export context in which preallocation can
1115 be managed. This releases the export pointer reference, and returns
1116 the export handle, so the export refcount is 1 when this function
1117 returns. */
/**
 * Create a new export of \a obd for client \a cluuid and return its handle.
 *
 * \param conn    out: conn->cookie receives the export's handle cookie
 * \param obd     device to connect to
 * \param cluuid  client uuid
 *
 * An error from class_new_export() is returned as PTR_ERR().  The pointer
 * reference obtained from class_new_export() is dropped before returning,
 * so the caller refers to the export only through \a conn.
 */
1118 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1119 struct obd_uuid *cluuid)
1121 struct obd_export *export;
1122 LASSERT(conn != NULL);
1123 LASSERT(obd != NULL);
1124 LASSERT(cluuid != NULL);
1127 export = class_new_export(obd, cluuid);
1129 RETURN(PTR_ERR(export));
1131 conn->cookie = export->exp_handle.h_cookie;
1132 class_export_put(export);
1134 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1135 cluuid->uuid, conn->cookie);
1138 EXPORT_SYMBOL(class_connect);
1141 /* This function removes 1-3 references from the export:
1142 * 1 - for the export pointer passed in
1143 * and, if a disconnect is really needed,
1144 * 2 - for removal from the hash
1145 * 3 - in class_unlink_export
1146 * The export pointer passed to this function may be destroyed. */
/**
 * Disconnect an export.
 *
 * exp_disconnected is tested and set under exp_lock so that only the first
 * caller performs the teardown: removal from the nid hash (if hashed) and
 * class_unlink_export().  Every non-NULL call ends by dropping the caller's
 * export reference, so \a export must not be used afterwards.
 */
1147 int class_disconnect(struct obd_export *export)
1149 int already_disconnected;
1152 if (export == NULL) {
1153 CWARN("attempting to free NULL export %p\n", export);
1157 cfs_spin_lock(&export->exp_lock);
1158 already_disconnected = export->exp_disconnected;
1159 export->exp_disconnected = 1;
1160 cfs_spin_unlock(&export->exp_lock);
1162 /* class_cleanup(), abort_recovery(), and class_fail_export()
1163 * all end up in here, and if any of them race we shouldn't
1164 * call extra class_export_puts(). */
1165 if (already_disconnected) {
1166 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1167 GOTO(no_disconn, already_disconnected);
1170 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1171 export->exp_handle.h_cookie);
1173 if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1174 cfs_hash_del(export->exp_obd->obd_nid_hash,
1175 &export->exp_connection->c_peer.nid,
1176 &export->exp_nid_hash);
1178 class_unlink_export(export);
1180 class_export_put(export);
1183 EXPORT_SYMBOL(class_disconnect);
1185 /* Return non-zero for a fully connected export */
/**
 * Report whether \a exp is connected: samples exp_conn_cnt > 0 under
 * exp_lock.
 * NOTE(review): NULL handling and the return statement are not visible in
 * this excerpt.
 */
1186 int class_connected_export(struct obd_export *exp)
1190 cfs_spin_lock(&exp->exp_lock);
1191 connected = (exp->exp_conn_cnt > 0);
1192 cfs_spin_unlock(&exp->exp_lock);
1197 EXPORT_SYMBOL(class_connected_export);
/**
 * Disconnect every export on \a list, stamping each with \a flags first.
 *
 * The loop always works on the list head and runs until the list is empty.
 * An export whose client uuid equals its obd's uuid (the self export) is
 * only unlinked from the list, not disconnected.  All other exports go
 * through obd_disconnect().
 */
1199 static void class_disconnect_export_list(cfs_list_t *list,
1200 enum obd_option flags)
1203 struct obd_export *exp;
1206 /* It's possible that an export may disconnect itself, but
1207 * nothing else will be added to this list. */
1208 while (!cfs_list_empty(list)) {
1209 exp = cfs_list_entry(list->next, struct obd_export,
1211 /* need for safe call CDEBUG after obd_disconnect */
1212 class_export_get(exp);
1214 cfs_spin_lock(&exp->exp_lock);
1215 exp->exp_flags = flags;
1216 cfs_spin_unlock(&exp->exp_lock);
1218 if (obd_uuid_equals(&exp->exp_client_uuid,
1219 &exp->exp_obd->obd_uuid)) {
1221 "exp %p export uuid == obd uuid, don't discon\n",
1223 /* Need to delete this now so we don't end up pointing
1224 * to work_list later when this export is cleaned up. */
1225 cfs_list_del_init(&exp->exp_obd_chain);
1226 class_export_put(exp);
/* Second reference: this is the one obd_disconnect() releases. */
1230 class_export_get(exp);
1231 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1232 "last request at "CFS_TIME_T"\n",
1233 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1234 exp, exp->exp_last_request_time);
1235 /* release one export reference anyway */
1236 rc = obd_disconnect(exp);
1238 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1239 obd_export_nid2str(exp), exp, rc);
1240 class_export_put(exp);
/**
 * Disconnect all exports of \a obd.
 *
 * Both obd_exports and obd_delayed_exports are spliced onto a private work
 * list under obd_dev_lock, and the work list is then processed without the
 * lock by class_disconnect_export_list() using the obd's export flags.
 */
1245 void class_disconnect_exports(struct obd_device *obd)
1247 cfs_list_t work_list;
1250 /* Move all of the exports from obd_exports to a work list, en masse. */
1251 CFS_INIT_LIST_HEAD(&work_list);
1252 cfs_spin_lock(&obd->obd_dev_lock);
1253 cfs_list_splice_init(&obd->obd_exports, &work_list);
1254 cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1255 cfs_spin_unlock(&obd->obd_dev_lock);
1257 if (!cfs_list_empty(&work_list)) {
1258 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1259 "disconnecting them\n", obd->obd_minor, obd);
1260 class_disconnect_export_list(&work_list,
1261 exp_flags_from_obd(obd));
1263 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1264 obd->obd_minor, obd);
1267 EXPORT_SYMBOL(class_disconnect_exports);
1269 /* Remove exports that have not completed recovery. */
/*
 * Evict exports of @obd selected with the @test_export callback.
 *
 * @obd:         device whose obd_exports list is scanned under obd_dev_lock.
 * @test_export: predicate called on each export that is not the self-export.
 *               NOTE(review): presumably a non-zero result means the export
 *               is kept and a zero result means it is stale - confirm.
 *
 * A selected export has exp_failed set (its previous value is saved in
 * "failed"), is moved to a private work list and is logged. After the lock is
 * dropped the work list is disconnected with OBD_OPT_ABORT_RECOV added to the
 * flags from exp_flags_from_obd(obd), and obd_stale_clients is increased by
 * the number of evicted exports.
 */
1271 void class_disconnect_stale_exports(struct obd_device *obd,
1272 int (*test_export)(struct obd_export *))
1274 cfs_list_t work_list;
1275 cfs_list_t *pos, *n;
1276 struct obd_export *exp;
1280 CFS_INIT_LIST_HEAD(&work_list);
1281 cfs_spin_lock(&obd->obd_dev_lock);
1282 cfs_list_for_each_safe(pos, n, &obd->obd_exports) {
1285 exp = cfs_list_entry(pos, struct obd_export, exp_obd_chain);
1287 /* don't count self-export as client */
1288 if (obd_uuid_equals(&exp->exp_client_uuid,
1289 &exp->exp_obd->obd_uuid))
1292 if (test_export(exp))
1295 cfs_spin_lock(&exp->exp_lock);
1296 failed = exp->exp_failed;
1297 exp->exp_failed = 1;
1298 cfs_spin_unlock(&exp->exp_lock);
/* still under obd_dev_lock: take the export off obd_exports and queue it
 * on the private work list */
1302 cfs_list_move(&exp->exp_obd_chain, &work_list);
1304 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1305 obd->obd_name, exp->exp_client_uuid.uuid,
1306 exp->exp_connection == NULL ? "<unknown>" :
1307 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1308 print_export_data(exp, "EVICTING", 0);
1310 cfs_spin_unlock(&obd->obd_dev_lock);
1313 CDEBUG(D_HA, "%s: disconnecting %d stale clients\n",
1314 obd->obd_name, evicted);
1315 obd->obd_stale_clients += evicted;
/* stale exports are disconnected with recovery aborted */
1317 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1318 OBD_OPT_ABORT_RECOV);
1321 EXPORT_SYMBOL(class_disconnect_stale_exports);
/*
 * Mark @exp as failed and disconnect it.
 *
 * exp_failed is set under exp_lock and its previous value is kept in
 * already_failed. If the export had already failed, only a D_HA message is
 * logged (NOTE(review): the message says "skipping", so the function
 * presumably returns at that point - confirm).
 *
 * Otherwise the debug log is dumped when obd_dump_on_timeout is set and
 * obd_disconnect() is called with two extra references held; a failure of
 * obd_disconnect() is reported with CERROR, success with a D_HA message.
 */
1323 void class_fail_export(struct obd_export *exp)
1325 int rc, already_failed;
1327 cfs_spin_lock(&exp->exp_lock);
1328 already_failed = exp->exp_failed;
1329 exp->exp_failed = 1;
1330 cfs_spin_unlock(&exp->exp_lock);
1332 if (already_failed) {
1333 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1334 exp, exp->exp_client_uuid.uuid);
1338 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1339 exp, exp->exp_client_uuid.uuid);
1341 if (obd_dump_on_timeout)
1342 libcfs_debug_dumplog();
1344 /* need for safe call CDEBUG after obd_disconnect */
1345 class_export_get(exp);
1347 /* Most callers into obd_disconnect are removing their own reference
1348 * (request, for example) in addition to the one from the hash table.
1349 * We don't have such a reference here, so make one. */
1350 class_export_get(exp);
1351 rc = obd_disconnect(exp);
1353 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1355 CDEBUG(D_HA, "disconnected export %p/%s\n",
1356 exp, exp->exp_client_uuid.uuid);
/* drops the reference taken above for the post-disconnect logging */
1357 class_export_put(exp);
1359 EXPORT_SYMBOL(class_fail_export);
/*
 * Return the peer NID of @exp in string form.
 *
 * When exp_connection is set the result is libcfs_nid2str() applied to the
 * peer NID of that connection.
 * NOTE(review): confirm which string is returned when exp_connection is NULL.
 */
1361 char *obd_export_nid2str(struct obd_export *exp)
1363 if (exp->exp_connection != NULL)
1364 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1368 EXPORT_SYMBOL(obd_export_nid2str);
/*
 * Evict the export(s) of @obd whose peer NID is given by the string @nid.
 *
 * @nid is converted with libcfs_str2nid() and used as the key for a lookup in
 * obd_nid_hash. A found export is asserted to carry that NID on its
 * connection and to be different from obd_self_export; it is then failed with
 * class_fail_export() and released with class_export_put() (presumably the
 * reference obtained by the hash lookup - confirm).
 *
 * Returns exports_evicted. When that value is zero a D_HA message reports
 * that no exports were found.
 */
1370 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1372 struct obd_export *doomed_exp = NULL;
1373 int exports_evicted = 0;
1375 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1378 doomed_exp = cfs_hash_lookup(obd->obd_nid_hash, &nid_key);
1379 if (doomed_exp == NULL)
1382 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1383 "nid %s found, wanted nid %s, requested nid %s\n",
1384 obd_export_nid2str(doomed_exp),
1385 libcfs_nid2str(nid_key), nid);
1386 LASSERTF(doomed_exp != obd->obd_self_export,
1387 "self-export is hashed by NID?\n");
1389 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1390 obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1392 class_fail_export(doomed_exp);
1393 class_export_put(doomed_exp);
1396 if (!exports_evicted)
1397 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1398 obd->obd_name, nid);
1399 return exports_evicted;
1401 EXPORT_SYMBOL(obd_export_evict_by_nid);
/*
 * Evict the export of @obd whose client uuid is the string @uuid.
 *
 * @uuid is converted with obd_str2uuid(). If it equals the uuid of @obd
 * itself the request is refused with a CERROR and exports_evicted (still 0)
 * is returned. Otherwise the uuid is looked up in obd_uuid_hash: a miss is
 * reported with CERROR, a hit is failed with class_fail_export() and released
 * with class_export_put().
 *
 * Returns exports_evicted.
 */
1403 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1405 struct obd_export *doomed_exp = NULL;
1406 struct obd_uuid doomed_uuid;
1407 int exports_evicted = 0;
1409 obd_str2uuid(&doomed_uuid, uuid);
1410 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1411 CERROR("%s: can't evict myself\n", obd->obd_name);
1412 return exports_evicted;
1415 doomed_exp = cfs_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1417 if (doomed_exp == NULL) {
1418 CERROR("%s: can't disconnect %s: no exports found\n",
1419 obd->obd_name, uuid);
1421 CWARN("%s: evicting %s at adminstrative request\n",
1422 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1423 class_fail_export(doomed_exp);
1424 class_export_put(doomed_exp);
1428 return exports_evicted;
1430 EXPORT_SYMBOL(obd_export_evict_by_uuid);
/*
 * Optional per-export dump callback, compiled only when
 * LUSTRE_TRACKS_LOCK_EXP_REFS is true. It starts out NULL and is exported so
 * that other code can install a handler. print_export_data() calls it when it
 * is non-NULL and its "locks" argument is non-zero.
 */
1432 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1433 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1434 EXPORT_SYMBOL(class_export_dump_hook);
/*
 * Log a one-line D_HA summary of @exp, tagged with @status.
 *
 * The line contains the obd name, the export pointer, the client uuid, the
 * peer NID, the reference, rpc, callback and lock counters, the
 * exp_disconnected, exp_delayed and exp_failed flags, the reply information
 * gathered from exp_outstanding_replies (nreplies and first_reply) and
 * exp_last_committed. The reply list is walked under exp_lock.
 *
 * When LUSTRE_TRACKS_LOCK_EXP_REFS is true, class_export_dump_hook is called
 * as well, provided "locks" is non-zero and the hook is set.
 */
1437 static void print_export_data(struct obd_export *exp, const char *status,
1440 struct ptlrpc_reply_state *rs;
1441 struct ptlrpc_reply_state *first_reply = NULL;
1444 cfs_spin_lock(&exp->exp_lock);
1445 cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1451 cfs_spin_unlock(&exp->exp_lock);
1453 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1454 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1455 obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1456 cfs_atomic_read(&exp->exp_rpc_count),
1457 cfs_atomic_read(&exp->exp_cb_count),
1458 cfs_atomic_read(&exp->exp_locks_count),
1459 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1460 nreplies, first_reply, nreplies > 3 ? "..." : "",
1461 exp->exp_last_committed);
1462 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1463 if (locks && class_export_dump_hook != NULL)
1464 class_export_dump_hook(exp);
/*
 * Print all exports related to @obd through print_export_data().
 *
 * @obd:   device whose export lists are dumped.
 * @locks: passed unchanged to print_export_data().
 *
 * Under obd_dev_lock the entries of obd_exports, obd_unlinked_exports and
 * obd_delayed_exports are printed with the tags ACTIVE, UNLINKED and DELAYED.
 * Afterwards, under obd_zombie_impexp_lock, every entry of the global
 * obd_zombie_exports list is printed with the tag ZOMBIE; that list is not
 * filtered by @obd.
 */
1468 void dump_exports(struct obd_device *obd, int locks)
1470 struct obd_export *exp;
1472 cfs_spin_lock(&obd->obd_dev_lock);
1473 cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1474 print_export_data(exp, "ACTIVE", locks);
1475 cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1476 print_export_data(exp, "UNLINKED", locks);
1477 cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1478 print_export_data(exp, "DELAYED", locks);
1479 cfs_spin_unlock(&obd->obd_dev_lock);
1480 cfs_spin_lock(&obd_zombie_impexp_lock);
1481 cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1482 print_export_data(exp, "ZOMBIE", locks);
1483 cfs_spin_unlock(&obd_zombie_impexp_lock);
1485 EXPORT_SYMBOL(dump_exports);
/*
 * Wait until the obd_unlinked_exports list of @obd is empty.
 *
 * The obd_exports list is asserted to be empty on entry. The unlinked list is
 * checked under obd_dev_lock; while it is not empty the lock is dropped and
 * the caller sleeps in CFS_TASK_UNINT state for "waited" seconds before the
 * lock is taken again for the next check.
 *
 * When "waited" is greater than 5 and IS_PO2(waited) holds (presumably a
 * power-of-two test - confirm), a console warning with the obd refcount is
 * printed and the exports are dumped with dump_exports(obd, 1).
 */
1487 void obd_exports_barrier(struct obd_device *obd)
1490 LASSERT(cfs_list_empty(&obd->obd_exports));
1491 cfs_spin_lock(&obd->obd_dev_lock);
1492 while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
/* never sleep with the spinlock held */
1493 cfs_spin_unlock(&obd->obd_dev_lock);
1494 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
1495 cfs_time_seconds(waited));
1496 if (waited > 5 && IS_PO2(waited)) {
1497 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1498 "more than %d seconds. "
1499 "The obd refcount = %d. Is it stuck?\n",
1500 obd->obd_name, waited,
1501 cfs_atomic_read(&obd->obd_refcount));
1502 dump_exports(obd, 1);
1505 cfs_spin_lock(&obd->obd_dev_lock);
1507 cfs_spin_unlock(&obd->obd_dev_lock);
1509 EXPORT_SYMBOL(obd_exports_barrier);
1511 /* Total amount of zombies to be destroyed.
 * Read under obd_zombie_impexp_lock by obd_zombie_impexp_check() and
 * obd_zombie_is_idle(). */
1512 static int zombies_count = 0;
1515 /* Kill zombie imports and exports. */
/*
 * Destroy the queued zombie imports and exports.
 *
 * Each pass, under obd_zombie_impexp_lock, unlinks at most the first entry of
 * obd_zombie_imports (via imp_zombie_chain) and at most the first entry of
 * obd_zombie_exports (via exp_obd_chain). The unlinked objects are destroyed
 * with class_import_destroy() and class_export_destroy() after the lock has
 * been released. Passes repeat as long as one of them found an import or an
 * export.
 *
 * NOTE(review): the lock is taken again after each destroy call, presumably
 * to update zombies_count - confirm.
 */
1517 void obd_zombie_impexp_cull(void)
1519 struct obd_import *import;
1520 struct obd_export *export;
1524 cfs_spin_lock(&obd_zombie_impexp_lock);
1527 if (!cfs_list_empty(&obd_zombie_imports)) {
1528 import = cfs_list_entry(obd_zombie_imports.next,
1531 cfs_list_del_init(&import->imp_zombie_chain);
1535 if (!cfs_list_empty(&obd_zombie_exports)) {
1536 export = cfs_list_entry(obd_zombie_exports.next,
1539 cfs_list_del_init(&export->exp_obd_chain);
/* the destroy calls below run without the zombie lock held */
1542 cfs_spin_unlock(&obd_zombie_impexp_lock);
1544 if (import != NULL) {
1545 class_import_destroy(import);
1546 cfs_spin_lock(&obd_zombie_impexp_lock);
1548 cfs_spin_unlock(&obd_zombie_impexp_lock);
1551 if (export != NULL) {
1552 class_export_destroy(export);
1553 cfs_spin_lock(&obd_zombie_impexp_lock);
1555 cfs_spin_unlock(&obd_zombie_impexp_lock);
1559 } while (import != NULL || export != NULL);
/* State shared between the zombie destroy thread and its controllers. */
/* completed by obd_zombie_impexp_thread() once it has started or has failed
 * to daemonize; obd_zombie_impexp_init() waits on it */
1563 static cfs_completion_t obd_zombie_start;
/* completed by the thread after its main loop; obd_zombie_impexp_stop()
 * waits on it */
1564 static cfs_completion_t obd_zombie_stop;
/* flag word in which OBD_ZOMBIE_STOP is set to ask the thread to stop */
1565 static unsigned long obd_zombie_flags;
/* woken by obd_zombie_impexp_notify() and by the thread after each cull */
1566 static cfs_waitq_t obd_zombie_waitq;
/* pid stored by the thread so that obd_zombie_barrier() does not wait for
 * itself */
1567 static pid_t obd_zombie_pid;
/* NOTE(review): this value is handed to cfs_set_bit() and cfs_test_bit() as
 * their first argument, i.e. it is used as a bit number and not as a mask */
1570 OBD_ZOMBIE_STOP = 1 << 1
1574 /* Check for work for the zombie import/export kill thread. */
/*
 * Compute, under obd_zombie_impexp_lock, whether the destroy thread has
 * nothing to do: rc is non-zero when zombies_count is zero and
 * OBD_ZOMBIE_STOP is not set in obd_zombie_flags.
 *
 * @arg is not used by the statements shown here. The destroy thread waits on
 * the negation of this check, so it wakes up when there is work or a stop
 * request. NOTE(review): rc is presumably the return value - confirm.
 */
1576 static int obd_zombie_impexp_check(void *arg)
1580 cfs_spin_lock(&obd_zombie_impexp_lock);
1581 rc = (zombies_count == 0) &&
1582 !cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1583 cfs_spin_unlock(&obd_zombie_impexp_lock);
1589 /* Add export to the obd_zombie thread and notify it. */
/*
 * Queue @exp for destruction.
 *
 * The export is first unlinked from the list it is on, under the
 * obd_dev_lock of its obd; being linked is asserted. It is then added to the
 * global obd_zombie_exports list under obd_zombie_impexp_lock and waiters are
 * woken with obd_zombie_impexp_notify(). The two locks are never held at the
 * same time.
 */
1591 static void obd_zombie_export_add(struct obd_export *exp) {
1592 cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
1593 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1594 cfs_list_del_init(&exp->exp_obd_chain);
1595 cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
1596 cfs_spin_lock(&obd_zombie_impexp_lock);
1598 cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1599 cfs_spin_unlock(&obd_zombie_impexp_lock);
1601 obd_zombie_impexp_notify();
1605 /* Add import to the obd_zombie thread and notify it. */
/*
 * Queue @imp for destruction.
 *
 * imp_sec must already be NULL and imp_zombie_chain must not be linked
 * anywhere (both asserted). The import is added to the global
 * obd_zombie_imports list under obd_zombie_impexp_lock and waiters are woken
 * with obd_zombie_impexp_notify().
 */
1607 static void obd_zombie_import_add(struct obd_import *imp) {
1608 LASSERT(imp->imp_sec == NULL);
1609 cfs_spin_lock(&obd_zombie_impexp_lock);
1610 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1612 cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1613 cfs_spin_unlock(&obd_zombie_impexp_lock);
1615 obd_zombie_impexp_notify();
1619 /* Notify the import/export destroy thread about a new zombie. */
/*
 * Wake every waiter on obd_zombie_waitq.
 *
 * A broadcast is used so that the destroy thread is woken even when
 * obd_zombie_barrier() callers sleep on the same wait queue.
 */
1621 static void obd_zombie_impexp_notify(void)
1624 * Make sure obd_zomebie_impexp_thread get this notification.
1625 * It is possible this signal only get by obd_zombie_barrier, and
1626 * barrier gulps this notification and sleeps away and hangs ensues
1628 cfs_waitq_broadcast(&obd_zombie_waitq);
1632 /* Check whether obd_zombie is idle. */
/*
 * Compute, under obd_zombie_impexp_lock, whether zombies_count is zero.
 *
 * Asserts that OBD_ZOMBIE_STOP is not set, i.e. this must not be used once a
 * stop of the destroy thread has been requested.
 * NOTE(review): rc is presumably the return value - confirm.
 */
1634 static int obd_zombie_is_idle(void)
1638 LASSERT(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1639 cfs_spin_lock(&obd_zombie_impexp_lock);
1640 rc = (zombies_count == 0);
1641 cfs_spin_unlock(&obd_zombie_impexp_lock);
1646 /* Wait until the obd_zombie import/export queues become empty. */
/*
 * Sleep on obd_zombie_waitq until obd_zombie_is_idle() reports that no
 * zombies are left.
 *
 * The pid of the caller is compared with obd_zombie_pid first, so that the
 * destroy thread does not wait for itself.
 */
1648 void obd_zombie_barrier(void)
1650 struct l_wait_info lwi = { 0 };
1652 if (obd_zombie_pid == cfs_curproc_pid())
1653 /* don't wait for myself */
1655 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1657 EXPORT_SYMBOL(obd_zombie_barrier);
1662 /* Thread that destroys zombie exports/imports. */
/*
 * Main function of the zombie destroy thread.
 *
 * The thread daemonizes with cfs_daemonize_ctxt(). obd_zombie_start is
 * completed both when that fails and when it succeeds, so the creator is
 * released in either case. The pid of the thread is then stored in
 * obd_zombie_pid.
 *
 * Until OBD_ZOMBIE_STOP is set, the thread sleeps on obd_zombie_waitq until
 * obd_zombie_impexp_check() returns zero, runs obd_zombie_impexp_cull() and
 * signals the wait queue for obd_zombie_barrier() callers. obd_zombie_stop is
 * completed before the thread ends.
 *
 * @unused is not referenced by the statements shown here.
 */
1664 static int obd_zombie_impexp_thread(void *unused)
1668 if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1669 cfs_complete(&obd_zombie_start);
1673 cfs_complete(&obd_zombie_start);
1675 obd_zombie_pid = cfs_curproc_pid();
1677 while(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1678 struct l_wait_info lwi = { 0 };
1680 l_wait_event(obd_zombie_waitq,
1681 !obd_zombie_impexp_check(NULL), &lwi);
1682 obd_zombie_impexp_cull();
1685 * Notify obd_zombie_barrier callers that queues
1688 cfs_waitq_signal(&obd_zombie_waitq);
1691 cfs_complete(&obd_zombie_stop);
1696 #else /* ! KERNEL */
/* Non-kernel build: state used instead of a dedicated thread. */
/* entry counter used by obd_zombie_impexp_kill() so that only the outermost
 * call runs the cull */
1698 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
/* handle returned by liblustre_register_wait_callback() in
 * obd_zombie_impexp_init() */
1699 static void *obd_zombie_impexp_work_cb;
/* handle returned by liblustre_register_idle_callback() in
 * obd_zombie_impexp_init() */
1700 static void *obd_zombie_impexp_idle_cb;
/*
 * Wait callback that culls zombies (registered by obd_zombie_impexp_init()).
 *
 * zombie_recur is incremented on entry and decremented on exit.
 * obd_zombie_impexp_cull() runs only in the call that raises the counter to
 * 1, so a nested or concurrent call skips the cull.
 * @arg is not used by the statements shown here.
 */
1702 int obd_zombie_impexp_kill(void *arg)
1706 if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1707 obd_zombie_impexp_cull();
1710 cfs_atomic_dec(&zombie_recur);
1717 /* Start the zombie import/export destroy thread. */
/*
 * Set up the zombie import/export machinery.
 *
 * Initialises the two zombie lists, obd_zombie_impexp_lock, both completions
 * and the wait queue. The function then contains two start-up variants:
 *  - create obd_zombie_impexp_thread() and wait for obd_zombie_start;
 *  - register obd_zombie_impexp_kill() as a liblustre wait callback and
 *    obd_zombie_impexp_check() as a liblustre idle callback.
 * NOTE(review): the two variants are presumably selected by the kernel versus
 * user-space build conditional - confirm. The callback labels are spelled
 * obd_zombi_..., unlike the function names.
 */
1719 int obd_zombie_impexp_init(void)
1723 CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1724 CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1725 cfs_spin_lock_init(&obd_zombie_impexp_lock);
1726 cfs_init_completion(&obd_zombie_start);
1727 cfs_init_completion(&obd_zombie_stop);
1728 cfs_waitq_init(&obd_zombie_waitq);
1732 rc = cfs_create_thread(obd_zombie_impexp_thread, NULL, 0);
1736 cfs_wait_for_completion(&obd_zombie_start);
1739 obd_zombie_impexp_work_cb =
1740 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1741 &obd_zombie_impexp_kill, NULL);
1743 obd_zombie_impexp_idle_cb =
1744 liblustre_register_idle_callback("obd_zombi_impexp_check",
1745 &obd_zombie_impexp_check, NULL);
1751 /* Stop the zombie import/export destroy thread. */
/*
 * Shut the zombie import/export machinery down.
 *
 * OBD_ZOMBIE_STOP is set in obd_zombie_flags and the wait queue is woken so
 * that the destroy thread notices the request. The function also waits for
 * obd_zombie_stop and deregisters the liblustre wait and idle callbacks.
 * NOTE(review): the wait and the deregistration presumably belong to the
 * kernel and user-space variants respectively - confirm.
 */
1753 void obd_zombie_impexp_stop(void)
1755 cfs_set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1756 obd_zombie_impexp_notify();
1758 cfs_wait_for_completion(&obd_zombie_stop);
1760 liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1761 liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1765 /***** Kernel-userspace comm helpers *******/
1767 /* Get length of entire message, including header.
 * @param payload_len Length of the payload area
 * @returns sizeof(struct kuc_hdr) + payload_len, converted to int */
1768 int kuc_len(int payload_len)
1770 return sizeof(struct kuc_hdr) + payload_len;
1772 EXPORT_SYMBOL(kuc_len);
1774 /* Get a pointer to kuc header, given a ptr to the payload
1775 * @param p Pointer to payload area
1776 * @returns Pointer to kuc header */
1778 struct kuc_hdr * kuc_ptr(void *p)
/* the header is located directly in front of the payload, so step back by
 * one struct kuc_hdr */
1780 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
/* a payload pointer without a valid header in front of it is a caller bug */
1781 LASSERT(lh->kuc_magic == KUC_MAGIC);
1784 EXPORT_SYMBOL(kuc_ptr);
1786 /* Test if payload is part of kuc message
1787 * @param p Pointer to payload area */
1790 int kuc_ispayload(void *p)
/* look at the memory directly in front of p, where kuc_alloc() places the
 * header; unlike kuc_ptr() a magic mismatch is not asserted here */
1792 struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1794 if (kh->kuc_magic == KUC_MAGIC)
1799 EXPORT_SYMBOL(kuc_ispayload);
1801 /* Alloc space for a message, and fill in header
1802 * @return Pointer to payload area */
1804 void *kuc_alloc(int payload_len, int transport, int type)
/* total size of the message: header plus payload */
1807 int len = kuc_len(payload_len);
/* NOTE(review): failure is reported as ERR_PTR(-ENOMEM) rather than NULL, so
 * callers presumably have to test the result with IS_ERR() - confirm */
1811 return ERR_PTR(-ENOMEM);
/* fill in the header; kuc_msglen records the full length including the
 * header itself */
1813 lh->kuc_magic = KUC_MAGIC;
1814 lh->kuc_transport = transport;
1815 lh->kuc_msgtype = type;
1816 lh->kuc_msglen = len;
/* hand out the payload area, which starts right behind the header */
1818 return (void *)(lh + 1);
1820 EXPORT_SYMBOL(kuc_alloc);
1822 /* Takes pointer to payload area.
 * @param p           Payload pointer; kuc_ptr() converts it to the header
 *                    and asserts the magic
 * @param payload_len Payload length; the size handed to OBD_FREE is
 *                    recomputed from it with kuc_len(), so it should be the
 *                    value that was used for kuc_alloc() */
1823 inline void kuc_free(void *p, int payload_len)
1825 struct kuc_hdr *lh = kuc_ptr(p);
1826 OBD_FREE(lh, kuc_len(payload_len));
1828 EXPORT_SYMBOL(kuc_free);