1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
30 * Use is subject to license terms.
32 * Copyright (c) 2011, 2012, Whamcloud, Inc.
35 * This file is part of Lustre, http://www.lustre.org/
36 * Lustre is a trademark of Sun Microsystems, Inc.
38 * lustre/obdclass/genops.c
40 * These are the only exported functions, they provide some generic
41 * infrastructure for managing object devices
44 #define DEBUG_SUBSYSTEM S_CLASS
46 #include <liblustre.h>
49 #include <obd_class.h>
50 #include <lprocfs_status.h>
/*
 * File-scope state and forward declarations for this file:
 *  - obd_types (defined elsewhere) and obd_types_lock, which is taken
 *    around every walk/update of that list in this file;
 *  - the memory caches for obd_device, obdo and obd_import objects;
 *  - the zombie import/export lists and their lock;
 *  - prototypes of static helpers that are used before their definition;
 *  - ptlrpc_put_connection_superhack, a function pointer through which
 *    class_export_destroy() and class_import_destroy() release their
 *    ptlrpc_connection.
 */
52 extern cfs_list_t obd_types;
53 cfs_spinlock_t obd_types_lock;
55 cfs_mem_cache_t *obd_device_cachep;
56 cfs_mem_cache_t *obdo_cachep;
57 EXPORT_SYMBOL(obdo_cachep);
58 cfs_mem_cache_t *import_cachep;
60 cfs_list_t obd_zombie_imports;
61 cfs_list_t obd_zombie_exports;
62 cfs_spinlock_t obd_zombie_impexp_lock;
63 static void obd_zombie_impexp_notify(void);
64 static void obd_zombie_export_add(struct obd_export *exp);
65 static void obd_zombie_import_add(struct obd_import *imp);
66 static void print_export_data(struct obd_export *exp,
67 const char *status, int locks);
69 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
72 * support functions: we could use inter-module communication, but this
73 * is more portable to other OS's
/*
 * Allocate an obd_device from obd_device_cachep (CFS_ALLOC_IO) and stamp
 * it with OBD_DEVICE_MAGIC.
 * NOTE(review): this listing omits short lines (braces, any NULL check,
 * the return statement), so the allocation-failure path is not visible
 * here - confirm against the full file.
 */
75 static struct obd_device *obd_device_alloc(void)
77 struct obd_device *obd;
79 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
81 obd->obd_magic = OBD_DEVICE_MAGIC;
/*
 * Return an obd_device to obd_device_cachep.
 * Asserts that the magic is intact, reports (CERROR) a namespace that is
 * still attached, and finalizes obd_reference before the slab free.
 * NOTE(review): lines inside the obd_namespace branch are missing from
 * this listing; what happens after the CERROR is not visible here.
 */
86 static void obd_device_free(struct obd_device *obd)
89 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
90 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
91 if (obd->obd_namespace != NULL) {
92 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
93 obd, obd->obd_namespace, obd->obd_force);
96 lu_ref_fini(&obd->obd_reference);
97 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
/*
 * Walk obd_types under obd_types_lock looking for an entry whose typ_name
 * equals \a name (exact strcmp match). The lock is released both on a
 * match and after an unsuccessful scan.
 * NOTE(review): the return statements are missing from this listing;
 * presumably the match, or NULL when nothing is found - confirm.
 */
100 struct obd_type *class_search_type(const char *name)
103 struct obd_type *type;
105 cfs_spin_lock(&obd_types_lock);
106 cfs_list_for_each(tmp, &obd_types) {
107 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
108 if (strcmp(type->typ_name, name) == 0) {
109 cfs_spin_unlock(&obd_types_lock);
113 cfs_spin_unlock(&obd_types_lock);
/*
 * Look up an obd type by name and take a module reference on it.
 *
 * With HAVE_MODULE_LOADING_SUPPORT the code can call cfs_request_module()
 * for \a name and search again; a console error is printed when the module
 * cannot be loaded. Under type->obd_type_lock a reference on
 * typ_dt_ops->o_owner is taken with cfs_try_module_get(); on the visible
 * lines its result is not checked.
 * NOTE(review): the conditions guarding the module load and the locked
 * section, and the return statement, are missing from this listing.
 */
117 struct obd_type *class_get_type(const char *name)
119 struct obd_type *type = class_search_type(name);
121 #ifdef HAVE_MODULE_LOADING_SUPPORT
123 const char *modname = name;
124 if (!cfs_request_module("%s", modname)) {
125 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
126 type = class_search_type(name);
128 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
134 cfs_spin_lock(&type->obd_type_lock);
136 cfs_try_module_get(type->typ_dt_ops->o_owner);
137 cfs_spin_unlock(&type->obd_type_lock);
141 EXPORT_SYMBOL(class_get_type);
/*
 * Under type->obd_type_lock, release the module reference on
 * typ_dt_ops->o_owner; this is the counterpart of the
 * cfs_try_module_get() done in class_get_type().
 * NOTE(review): one or more lines inside the locked section are missing
 * from this listing (presumably the typ_refcnt update - confirm).
 */
143 void class_put_type(struct obd_type *type)
146 cfs_spin_lock(&type->obd_type_lock);
148 cfs_module_put(type->typ_dt_ops->o_owner);
149 cfs_spin_unlock(&type->obd_type_lock);
151 EXPORT_SYMBOL(class_put_type);
/* Upper bound (exclusive) on the length of a type name accepted by
 * class_register_type(), enforced there with LASSERT. */
153 #define CLASS_MAX_NAME 1024
/*
 * Register a new obd type.
 *
 * Steps visible in this listing:
 *  - assert the name is shorter than CLASS_MAX_NAME and refuse a name that
 *    class_search_type() already knows;
 *  - allocate the obd_type plus private copies of dt_ops, md_ops and the
 *    name (md_ops is optional, see the inline comment);
 *  - register a proc entry named after the type under proc_lustre_root;
 *  - call lu_device_type_init() on \a ldt;
 *  - link the type on obd_types under obd_types_lock.
 * The trailing OBD_FREE sequence releases the name, both ops copies and
 * the type itself.
 * NOTE(review): labels, error checks and return statements are missing
 * from this listing; presumably the OBD_FREE tail is the failure path and
 * the function returns 0 or a negative errno - confirm.
 */
155 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
156 struct lprocfs_vars *vars, const char *name,
157 struct lu_device_type *ldt)
159 struct obd_type *type;
164 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
166 if (class_search_type(name)) {
167 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
172 OBD_ALLOC(type, sizeof(*type));
176 OBD_ALLOC_PTR(type->typ_dt_ops);
177 OBD_ALLOC_PTR(type->typ_md_ops);
178 OBD_ALLOC(type->typ_name, strlen(name) + 1);
180 if (type->typ_dt_ops == NULL ||
181 type->typ_md_ops == NULL ||
182 type->typ_name == NULL)
185 *(type->typ_dt_ops) = *dt_ops;
186 /* md_ops is optional */
188 *(type->typ_md_ops) = *md_ops;
189 strcpy(type->typ_name, name);
190 cfs_spin_lock_init(&type->obd_type_lock);
193 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
195 if (IS_ERR(type->typ_procroot)) {
196 rc = PTR_ERR(type->typ_procroot);
197 type->typ_procroot = NULL;
203 rc = lu_device_type_init(ldt);
208 cfs_spin_lock(&obd_types_lock);
209 cfs_list_add(&type->typ_chain, &obd_types);
210 cfs_spin_unlock(&obd_types_lock);
215 if (type->typ_name != NULL)
216 OBD_FREE(type->typ_name, strlen(name) + 1);
217 if (type->typ_md_ops != NULL)
218 OBD_FREE_PTR(type->typ_md_ops);
219 if (type->typ_dt_ops != NULL)
220 OBD_FREE_PTR(type->typ_dt_ops);
221 OBD_FREE(type, sizeof(*type));
224 EXPORT_SYMBOL(class_register_type);
/*
 * Unregister the obd type called \a name.
 *
 * An unknown name is reported with CERROR. If the type still has a
 * non-zero typ_refcnt, only the two ops tables are freed and the name is
 * kept for debugging (see the inline comments). Otherwise the proc entry
 * is removed, lu_device_type_fini() is called on typ_lu, the type is
 * unlinked from obd_types under obd_types_lock, and the name, ops tables
 * and the type itself are freed.
 * NOTE(review): conditions and return statements are missing from this
 * listing, so the exact return codes and whether the busy branch returns
 * early are not visible here - confirm.
 */
226 int class_unregister_type(const char *name)
228 struct obd_type *type = class_search_type(name);
232 CERROR("unknown obd type\n");
236 if (type->typ_refcnt) {
237 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
238 /* This is a bad situation, let's make the best of it */
239 /* Remove ops, but leave the name for debugging */
240 OBD_FREE_PTR(type->typ_dt_ops);
241 OBD_FREE_PTR(type->typ_md_ops);
245 if (type->typ_procroot) {
246 lprocfs_remove(&type->typ_procroot);
250 lu_device_type_fini(type->typ_lu);
252 cfs_spin_lock(&obd_types_lock);
253 cfs_list_del(&type->typ_chain);
254 cfs_spin_unlock(&obd_types_lock);
255 OBD_FREE(type->typ_name, strlen(name) + 1);
256 if (type->typ_dt_ops != NULL)
257 OBD_FREE_PTR(type->typ_dt_ops);
258 if (type->typ_md_ops != NULL)
259 OBD_FREE_PTR(type->typ_md_ops);
260 OBD_FREE(type, sizeof(*type));
262 } /* class_unregister_type */
263 EXPORT_SYMBOL(class_unregister_type);
266 * Create a new obd device.
268 * Find an empty slot in ::obd_devs[], create a new obd device in it.
270 * \param[in] type_name obd device type string.
271 * \param[in] name obd device name.
273 * \retval NULL if create fails, otherwise return the obd device
276 struct obd_device *class_newdev(const char *type_name, const char *name)
278 struct obd_device *result = NULL;
279 struct obd_device *newdev;
280 struct obd_type *type = NULL;
282 int new_obd_minor = 0;
285 if (strlen(name) >= MAX_OBD_NAME) {
286 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
287 RETURN(ERR_PTR(-EINVAL));
290 type = class_get_type(type_name);
292 CERROR("OBD: unknown type: %s\n", type_name);
293 RETURN(ERR_PTR(-ENODEV));
296 newdev = obd_device_alloc();
297 if (newdev == NULL) {
298 class_put_type(type);
299 RETURN(ERR_PTR(-ENOMEM));
301 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
303 cfs_write_lock(&obd_dev_lock);
304 for (i = 0; i < class_devno_max(); i++) {
305 struct obd_device *obd = class_num2obd(i);
307 if (obd && obd->obd_name &&
308 (strcmp(name, obd->obd_name) == 0)) {
309 CERROR("Device %s already exists at %d, won't add\n",
312 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
313 "%p obd_magic %08x != %08x\n", result,
314 result->obd_magic, OBD_DEVICE_MAGIC);
315 LASSERTF(result->obd_minor == new_obd_minor,
316 "%p obd_minor %d != %d\n", result,
317 result->obd_minor, new_obd_minor);
319 obd_devs[result->obd_minor] = NULL;
320 result->obd_name[0]='\0';
322 result = ERR_PTR(-EEXIST);
325 if (!result && !obd) {
327 result->obd_minor = i;
329 result->obd_type = type;
330 strncpy(result->obd_name, name,
331 sizeof(result->obd_name) - 1);
332 obd_devs[i] = result;
335 cfs_write_unlock(&obd_dev_lock);
337 if (result == NULL && i >= class_devno_max()) {
338 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
340 RETURN(ERR_PTR(-EOVERFLOW));
343 if (IS_ERR(result)) {
344 obd_device_free(newdev);
345 class_put_type(type);
347 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
348 result->obd_name, result);
/*
 * Remove \a obd from ::obd_devs[] and free it.
 *
 * Asserts that the device magic is intact, that the device really sits in
 * its own obd_minor slot and that it has a type. The slot is cleared under
 * obd_dev_lock, the device is freed, and finally the type reference is
 * dropped. obd_type is read into a local up front because \a obd has
 * already been freed by the time class_put_type() is called.
 */
353 void class_release_dev(struct obd_device *obd)
355 struct obd_type *obd_type = obd->obd_type;
357 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
358 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
359 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
360 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
361 LASSERT(obd_type != NULL);
363 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
364 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
366 cfs_write_lock(&obd_dev_lock);
367 obd_devs[obd->obd_minor] = NULL;
368 cfs_write_unlock(&obd_dev_lock);
369 obd_device_free(obd);
371 class_put_type(obd_type);
/*
 * Scan ::obd_devs[] under the obd_dev_lock read lock for a device whose
 * obd_name equals \a name. A name match only counts once the device has
 * obd_attached set (see the inline comment); in that case the lock is
 * dropped inside the loop.
 * NOTE(review): the return statements are missing from this listing;
 * presumably the slot index on success and a negative value otherwise
 * (class_name2obd() treats dev < 0 as "not found") - confirm.
 */
374 int class_name2dev(const char *name)
381 cfs_read_lock(&obd_dev_lock);
382 for (i = 0; i < class_devno_max(); i++) {
383 struct obd_device *obd = class_num2obd(i);
385 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
386 /* Make sure we finished attaching before we give
387 out any references */
388 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
389 if (obd->obd_attached) {
390 cfs_read_unlock(&obd_dev_lock);
396 cfs_read_unlock(&obd_dev_lock);
400 EXPORT_SYMBOL(class_name2dev);
402 struct obd_device *class_name2obd(const char *name)
404 int dev = class_name2dev(name);
406 if (dev < 0 || dev > class_devno_max())
408 return class_num2obd(dev);
410 EXPORT_SYMBOL(class_name2obd);
/*
 * Scan ::obd_devs[] under the obd_dev_lock read lock for a device whose
 * obd_uuid equals \a uuid; the lock is dropped inside the loop on a match
 * and after an unsuccessful scan.
 * NOTE(review): the return statements are missing from this listing;
 * presumably the slot index on a match and a negative value otherwise -
 * confirm.
 */
412 int class_uuid2dev(struct obd_uuid *uuid)
416 cfs_read_lock(&obd_dev_lock);
417 for (i = 0; i < class_devno_max(); i++) {
418 struct obd_device *obd = class_num2obd(i);
420 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
421 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
422 cfs_read_unlock(&obd_dev_lock);
426 cfs_read_unlock(&obd_dev_lock);
430 EXPORT_SYMBOL(class_uuid2dev);
/*
 * Map a uuid to its obd device: resolve the slot index with
 * class_uuid2dev() and hand it to class_num2obd().
 * NOTE(review): lines between the lookup and the return are missing from
 * this listing (presumably a "dev < 0" check returning NULL) - confirm.
 */
432 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
434 int dev = class_uuid2dev(uuid);
437 return class_num2obd(dev);
439 EXPORT_SYMBOL(class_uuid2obd);
442 * Get obd device from ::obd_devs[]
444 * \param num [in] array index
446 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
447 * otherwise return the obd device there.
449 struct obd_device *class_num2obd(int num)
451 struct obd_device *obd = NULL;
453 if (num < class_devno_max()) {
458 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
459 "%p obd_magic %08x != %08x\n",
460 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
461 LASSERTF(obd->obd_minor == num,
462 "%p obd_minor %0d != %0d\n",
463 obd, obd->obd_minor, num);
468 EXPORT_SYMBOL(class_num2obd);
/*
 * Print one console line (LCONSOLE, D_CONFIG) per device in ::obd_devs[]
 * while holding the obd_dev_lock read lock: slot index, status, type name,
 * device name, uuid and current obd_refcount. The status string is picked
 * from the obd_stopping / obd_set_up / obd_attached flags, tested in that
 * order.
 * NOTE(review): the status assignments and the handling of empty slots are
 * missing from this listing.
 */
470 void class_obd_list(void)
475 cfs_read_lock(&obd_dev_lock);
476 for (i = 0; i < class_devno_max(); i++) {
477 struct obd_device *obd = class_num2obd(i);
481 if (obd->obd_stopping)
483 else if (obd->obd_set_up)
485 else if (obd->obd_attached)
489 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
490 i, status, obd->obd_type->typ_name,
491 obd->obd_name, obd->obd_uuid.uuid,
492 cfs_atomic_read(&obd->obd_refcount));
494 cfs_read_unlock(&obd_dev_lock);
/*
 * Implementation notes:
 *  - the type test is strncmp() over strlen(typ_name) bytes, i.e. it
 *    accepts any device whose type name starts with \a typ_name;
 *  - the target uuid is compared against obd->u.cli.cl_target_uuid;
 *  - the obd_dev_lock read lock is dropped inside the loop on a match and
 *    again after an unsuccessful scan.
 * NOTE(review): return statements and the empty-slot check are missing
 * from this listing.
 */
498 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
499 specified, then only the client with that uuid is returned,
500 otherwise any client connected to the tgt is returned. */
501 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
502 const char * typ_name,
503 struct obd_uuid *grp_uuid)
507 cfs_read_lock(&obd_dev_lock);
508 for (i = 0; i < class_devno_max(); i++) {
509 struct obd_device *obd = class_num2obd(i);
513 if ((strncmp(obd->obd_type->typ_name, typ_name,
514 strlen(typ_name)) == 0)) {
515 if (obd_uuid_equals(tgt_uuid,
516 &obd->u.cli.cl_target_uuid) &&
517 ((grp_uuid)? obd_uuid_equals(grp_uuid,
518 &obd->obd_uuid) : 1)) {
519 cfs_read_unlock(&obd_dev_lock);
524 cfs_read_unlock(&obd_dev_lock);
528 EXPORT_SYMBOL(class_find_client_obd);
530 /* Iterate the obd_device list looking for devices that have grp_uuid.
531 Searching starts at *next, and if a device is found, the next index to
532 look at is saved in *next. If next is NULL, then the first matching device
533 will always be returned.
   The scan runs under the obd_dev_lock read lock; a *next value is only
   used as the start index when it lies in 0 .. class_devno_max() - 1.
   NOTE(review): the start-index assignments and the return statements are
   missing from this listing. */
534 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
540 else if (*next >= 0 && *next < class_devno_max())
545 cfs_read_lock(&obd_dev_lock);
546 for (; i < class_devno_max(); i++) {
547 struct obd_device *obd = class_num2obd(i);
551 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
554 cfs_read_unlock(&obd_dev_lock);
558 cfs_read_unlock(&obd_dev_lock);
/*
 * Implementation notes for class_notify_sptlrpc_conf():
 *  - only devices that are set up and not stopping are considered;
 *  - only the MDC, OSC, MDT and OST types are notified;
 *  - a device belongs to \a fsname when the first \a namelen bytes of its
 *    obd_name match;
 *  - the notification is obd_set_info_async(KEY_SPTLRPC_CONF) on the
 *    device's self export. The obd_dev_lock read lock is dropped around
 *    that call and re-taken afterwards; class_incref()/class_decref()
 *    bracket the unlocked window.
 * NOTE(review): declarations, the "continue" statements and the handling
 * of rc2 / the return value are missing from this listing.
 */
565 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
566 * adjust sptlrpc settings accordingly.
568 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
570 struct obd_device *obd;
574 LASSERT(namelen > 0);
576 cfs_read_lock(&obd_dev_lock);
577 for (i = 0; i < class_devno_max(); i++) {
578 obd = class_num2obd(i);
580 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
583 /* only notify mdc, osc, mdt, ost */
584 type = obd->obd_type->typ_name;
585 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
586 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
587 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
588 strcmp(type, LUSTRE_OST_NAME) != 0)
591 if (strncmp(obd->obd_name, fsname, namelen))
594 class_incref(obd, __FUNCTION__, obd);
595 cfs_read_unlock(&obd_dev_lock);
596 rc2 = obd_set_info_async(obd->obd_self_export,
597 sizeof(KEY_SPTLRPC_CONF),
598 KEY_SPTLRPC_CONF, 0, NULL, NULL);
600 class_decref(obd, __FUNCTION__, obd);
601 cfs_read_lock(&obd_dev_lock);
603 cfs_read_unlock(&obd_dev_lock);
606 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
/*
 * Destroy the obd_device, obdo, import and capa memory caches. Every
 * cfs_mem_cache_destroy() must return 0 (LASSERTF); the cache pointers
 * visible here are reset to NULL afterwards.
 * NOTE(review): the NULL guards around three of the four caches and two of
 * the pointer resets are missing from this listing. The misspellings in
 * the assertion messages are runtime strings and are left untouched.
 */
608 void obd_cleanup_caches(void)
613 if (obd_device_cachep) {
614 rc = cfs_mem_cache_destroy(obd_device_cachep);
615 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
616 obd_device_cachep = NULL;
619 rc = cfs_mem_cache_destroy(obdo_cachep);
620 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
624 rc = cfs_mem_cache_destroy(import_cachep);
625 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
626 import_cachep = NULL;
629 rc = cfs_mem_cache_destroy(capa_cachep);
630 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
/*
 * Create the four memory caches used by this file ("ll_obd_dev_cache",
 * "ll_obdo_cache", "ll_import_cache", "capa_cache"). Each cache pointer
 * must still be NULL when its cache is created (LASSERT).
 * NOTE(review): most failure checks, the label and the return statements
 * are missing from this listing; presumably obd_cleanup_caches() at the
 * end is the failure path - confirm.
 */
636 int obd_init_caches(void)
640 LASSERT(obd_device_cachep == NULL);
641 obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
642 sizeof(struct obd_device),
644 if (!obd_device_cachep)
647 LASSERT(obdo_cachep == NULL);
648 obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
653 LASSERT(import_cachep == NULL);
654 import_cachep = cfs_mem_cache_create("ll_import_cache",
655 sizeof(struct obd_import),
660 LASSERT(capa_cachep == NULL);
661 capa_cachep = cfs_mem_cache_create("capa_cache",
662 sizeof(struct obd_capa), 0, 0);
668 obd_cleanup_caches();
/*
 * Resolve a lustre_handle to its export via class_handle2object() on
 * conn->cookie. Two special cases are logged first: a "null handle" and a
 * cookie of -1, which means "assign a new connection".
 * NOTE(review): the condition for the null-handle case and all return
 * statements are missing from this listing. class_conn2obd() calls
 * class_export_put() on the result, which suggests the export is returned
 * with a reference held - confirm.
 */
673 /* map connection to client */
674 struct obd_export *class_conn2export(struct lustre_handle *conn)
676 struct obd_export *export;
680 CDEBUG(D_CACHE, "looking for null handle\n");
684 if (conn->cookie == -1) { /* this means assign a new connection */
685 CDEBUG(D_CACHE, "want a new connection\n");
689 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
690 export = class_handle2object(conn->cookie);
693 EXPORT_SYMBOL(class_conn2export);
/*
 * Map an export to its obd device.
 * NOTE(review): the whole body is missing from this listing; presumably it
 * returns exp->exp_obd (possibly NULL for a NULL export) - confirm.
 */
695 struct obd_device *class_exp2obd(struct obd_export *exp)
701 EXPORT_SYMBOL(class_exp2obd);
/*
 * Map a connection handle to its obd device: look up the export, read its
 * exp_obd and drop the export reference again before returning, so this
 * call keeps no export reference on behalf of the caller.
 * NOTE(review): the NULL check on the export and the return statements are
 * missing from this listing.
 */
703 struct obd_device *class_conn2obd(struct lustre_handle *conn)
705 struct obd_export *export;
706 export = class_conn2export(conn);
708 struct obd_device *obd = export->exp_obd;
709 class_export_put(export);
714 EXPORT_SYMBOL(class_conn2obd);
/*
 * Return the client import (u.cli.cl_import) of the device behind \a exp.
 * NOTE(review): lines between the exp_obd read and the return are missing
 * from this listing (presumably a NULL check on obd) - confirm.
 */
716 struct obd_import *class_exp2cliimp(struct obd_export *exp)
718 struct obd_device *obd = exp->exp_obd;
721 return obd->u.cli.cl_import;
723 EXPORT_SYMBOL(class_exp2cliimp);
/*
 * Return the client import (u.cli.cl_import) of the device that the
 * connection handle \a conn resolves to through class_conn2obd().
 * NOTE(review): lines between the lookup and the return are missing from
 * this listing (presumably a NULL check on obd) - confirm.
 */
725 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
727 struct obd_device *obd = class_conn2obd(conn);
730 return obd->u.cli.cl_import;
732 EXPORT_SYMBOL(class_conn2cliimp);
734 /* Export management functions */
736 /* if export is involved in recovery then clean up related things */
/*
 * Three independent pieces of per-export recovery state are undone, each
 * flag being cleared under exp_lock:
 *  - under obd_recovery_task_lock: obd_delayed_clients is decremented for
 *    a delayed export, and for an export that is in recovery while the
 *    device is recovering, exp_in_recovery is cleared and
 *    obd_connected_clients decremented;
 *  - exp_req_replay_needed  -> obd_req_replay_clients is decremented;
 *  - exp_lock_replay_needed -> obd_lock_replay_clients is decremented.
 * Each counter is asserted to be non-zero before it is decremented.
 */
737 void class_export_recovery_cleanup(struct obd_export *exp)
739 struct obd_device *obd = exp->exp_obd;
741 cfs_spin_lock(&obd->obd_recovery_task_lock);
742 if (exp->exp_delayed)
743 obd->obd_delayed_clients--;
744 if (obd->obd_recovering && exp->exp_in_recovery) {
745 cfs_spin_lock(&exp->exp_lock);
746 exp->exp_in_recovery = 0;
747 cfs_spin_unlock(&exp->exp_lock);
748 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
749 cfs_atomic_dec(&obd->obd_connected_clients);
751 cfs_spin_unlock(&obd->obd_recovery_task_lock);
752 /** Cleanup req replay fields */
753 if (exp->exp_req_replay_needed) {
754 cfs_spin_lock(&exp->exp_lock);
755 exp->exp_req_replay_needed = 0;
756 cfs_spin_unlock(&exp->exp_lock);
757 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
758 cfs_atomic_dec(&obd->obd_req_replay_clients);
760 /** Cleanup lock replay data */
761 if (exp->exp_lock_replay_needed) {
762 cfs_spin_lock(&exp->exp_lock);
763 exp->exp_lock_replay_needed = 0;
764 cfs_spin_unlock(&exp->exp_lock);
765 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
766 cfs_atomic_dec(&obd->obd_lock_replay_clients);
/*
 * Final teardown of an export whose refcount has reached zero (asserted).
 * Releases the ptlrpc connection if there is one, asserts that all of the
 * export's request/reply lists are empty, calls obd_destroy_export(),
 * drops the "export" reference on the device (the one taken with
 * class_incref() in class_new_export()) and frees the export with
 * OBD_FREE_RCU.
 */
770 static void class_export_destroy(struct obd_export *exp)
772 struct obd_device *obd = exp->exp_obd;
775 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
776 LASSERT(obd != NULL);
778 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
779 exp->exp_client_uuid.uuid, obd->obd_name);
782 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
783 if (exp->exp_connection)
784 ptlrpc_put_connection_superhack(exp->exp_connection);
786 LASSERT(cfs_list_empty(&exp->exp_flock_wait_list));
787 LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
788 LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
789 LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
790 LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
791 obd_destroy_export(exp);
792 class_decref(obd, "export", exp);
794 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
/*
 * Addref callback registered with class_handle_hash() in
 * class_new_export(); takes a reference on the export via
 * class_export_get().
 */
798 static void export_handle_addref(void *export)
800 class_export_get(export);
/*
 * Take one reference on \a exp (atomic increment of exp_refcount) and log
 * the new count.
 * NOTE(review): the return statement is missing from this listing;
 * presumably \a exp itself is returned - confirm.
 */
803 struct obd_export *class_export_get(struct obd_export *exp)
805 cfs_atomic_inc(&exp->exp_refcount);
806 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
807 cfs_atomic_read(&exp->exp_refcount));
810 EXPORT_SYMBOL(class_export_get);
/*
 * Drop one reference on \a exp. When the last reference goes away the
 * export must still be linked on an obd chain (asserted); its nid stats
 * and recovery state are cleaned up and the export is handed to
 * obd_zombie_export_add() rather than being destroyed inline.
 */
812 void class_export_put(struct obd_export *exp)
814 LASSERT(exp != NULL);
815 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
816 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
817 cfs_atomic_read(&exp->exp_refcount) - 1);
819 if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
820 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
821 CDEBUG(D_IOCTL, "final put %p/%s\n",
822 exp, exp->exp_client_uuid.uuid);
824 /* release nid stat reference */
825 lprocfs_exp_cleanup(exp);
826 class_export_recovery_cleanup(exp);
828 obd_zombie_export_add(exp);
831 EXPORT_SYMBOL(class_export_put);
/*
 * Implementation notes for class_new_export():
 *  - allocation failure returns ERR_PTR(-ENOMEM);
 *  - the export handle is hashed with export_handle_addref as its addref
 *    callback before the export is linked to the device;
 *  - obd_stopping is checked twice under obd->obd_dev_lock: once before a
 *    reference on obd_uuid_hash is taken, and again after the uuid-hash
 *    insert, in which case the hash entry is removed again;
 *  - an export whose uuid equals the device's own uuid is not inserted in
 *    the uuid hash; a duplicate client uuid is refused with -EALREADY;
 *  - on success the device gains an "export" reference and the export is
 *    linked on obd_exports and obd_exports_timed;
 *  - the tail of the function drops the hash reference, unhashes the
 *    handle, and destroys and frees the export.
 * NOTE(review): labels, some conditions and the return statements are
 * missing from this listing; presumably that tail is the failure path -
 * confirm.
 */
833 /* Creates a new export, adds it to the hash table, and returns a
834 * pointer to it. The refcount is 2: one for the hash reference, and
835 * one for the pointer returned by this function. */
836 struct obd_export *class_new_export(struct obd_device *obd,
837 struct obd_uuid *cluuid)
839 struct obd_export *export;
840 cfs_hash_t *hash = NULL;
844 OBD_ALLOC_PTR(export);
846 return ERR_PTR(-ENOMEM);
848 export->exp_conn_cnt = 0;
849 export->exp_lock_hash = NULL;
850 cfs_atomic_set(&export->exp_refcount, 2);
851 cfs_atomic_set(&export->exp_rpc_count, 0);
852 cfs_atomic_set(&export->exp_cb_count, 0);
853 cfs_atomic_set(&export->exp_locks_count, 0);
854 #if LUSTRE_TRACKS_LOCK_EXP_REFS
855 CFS_INIT_LIST_HEAD(&export->exp_locks_list);
856 cfs_spin_lock_init(&export->exp_locks_list_guard);
858 cfs_atomic_set(&export->exp_replay_count, 0);
859 export->exp_obd = obd;
860 CFS_INIT_LIST_HEAD(&export->exp_flock_wait_list);
861 cfs_rwlock_init(&export->exp_flock_wait_lock);
862 CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
863 cfs_spin_lock_init(&export->exp_uncommitted_replies_lock);
864 CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
865 CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
866 CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
867 CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
868 class_handle_hash(&export->exp_handle, export_handle_addref);
869 export->exp_last_request_time = cfs_time_current_sec();
870 cfs_spin_lock_init(&export->exp_lock);
871 cfs_spin_lock_init(&export->exp_rpc_lock);
872 CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
873 CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
874 cfs_spin_lock_init(&export->exp_bl_list_lock);
875 CFS_INIT_LIST_HEAD(&export->exp_bl_list);
877 export->exp_sp_peer = LUSTRE_SP_ANY;
878 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
879 export->exp_client_uuid = *cluuid;
880 obd_init_export(export);
882 cfs_spin_lock(&obd->obd_dev_lock);
883 /* shouldn't happen, but might race */
884 if (obd->obd_stopping)
885 GOTO(exit_unlock, rc = -ENODEV);
887 hash = cfs_hash_getref(obd->obd_uuid_hash);
889 GOTO(exit_unlock, rc = -ENODEV);
890 cfs_spin_unlock(&obd->obd_dev_lock);
892 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
893 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
895 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
896 obd->obd_name, cluuid->uuid, rc);
897 GOTO(exit_err, rc = -EALREADY);
901 cfs_spin_lock(&obd->obd_dev_lock);
902 if (obd->obd_stopping) {
903 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
904 GOTO(exit_unlock, rc = -ENODEV);
907 class_incref(obd, "export", export);
908 cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
909 cfs_list_add_tail(&export->exp_obd_chain_timed,
910 &export->exp_obd->obd_exports_timed);
911 export->exp_obd->obd_num_exports++;
912 cfs_spin_unlock(&obd->obd_dev_lock);
913 cfs_hash_putref(hash);
917 cfs_spin_unlock(&obd->obd_dev_lock);
920 cfs_hash_putref(hash);
921 class_handle_unhash(&export->exp_handle);
922 LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
923 obd_destroy_export(export);
924 OBD_FREE_PTR(export);
927 EXPORT_SYMBOL(class_new_export);
/*
 * Detach an export from its device: unhash its handle, then under
 * obd_dev_lock remove it from the uuid hash (if hashed), move it from its
 * current chain to obd_unlinked_exports, take it off the timed list and
 * decrement obd_num_exports. Finally one export reference is dropped.
 */
929 void class_unlink_export(struct obd_export *exp)
931 class_handle_unhash(&exp->exp_handle);
933 cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
934 /* delete an uuid-export hashitem from hashtables */
935 if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
936 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
937 &exp->exp_client_uuid,
938 &exp->exp_uuid_hash);
940 cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
941 cfs_list_del_init(&exp->exp_obd_chain_timed);
942 exp->exp_obd->obd_num_exports--;
943 cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
944 class_export_put(exp);
946 EXPORT_SYMBOL(class_unlink_export);
948 /* Import management functions */
/*
 * Final teardown of an import whose refcount has reached zero (asserted).
 * Releases the import's own connection, then walks imp_conn_list, and for
 * each entry unlinks it, releases its connection and frees it. imp_sec
 * must already be NULL. Finally the "import" reference on the device is
 * dropped and the import is freed with OBD_FREE_RCU.
 */
949 void class_import_destroy(struct obd_import *imp)
953 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
954 imp->imp_obd->obd_name);
956 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
958 ptlrpc_put_connection_superhack(imp->imp_connection);
960 while (!cfs_list_empty(&imp->imp_conn_list)) {
961 struct obd_import_conn *imp_conn;
963 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
964 struct obd_import_conn, oic_item);
965 cfs_list_del_init(&imp_conn->oic_item);
966 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
967 OBD_FREE(imp_conn, sizeof(*imp_conn));
970 LASSERT(imp->imp_sec == NULL);
971 class_decref(imp->imp_obd, "import", imp);
972 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
/*
 * Addref callback registered with class_handle_hash() in
 * class_new_import(); takes a reference on the import via
 * class_import_get().
 */
976 static void import_handle_addref(void *import)
978 class_import_get(import);
/*
 * Take one reference on \a import (atomic increment of imp_refcount) and
 * log the new count together with the owning device name.
 * NOTE(review): the return statement is missing from this listing;
 * presumably \a import itself is returned - confirm.
 */
981 struct obd_import *class_import_get(struct obd_import *import)
983 cfs_atomic_inc(&import->imp_refcount);
984 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
985 cfs_atomic_read(&import->imp_refcount),
986 import->imp_obd->obd_name);
989 EXPORT_SYMBOL(class_import_get);
/*
 * Drop one reference on \a imp. The import must not already be queued on a
 * zombie chain (asserted). When the last reference goes away the import is
 * handed to obd_zombie_import_add() rather than being destroyed inline.
 */
991 void class_import_put(struct obd_import *imp)
995 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
996 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
998 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
999 cfs_atomic_read(&imp->imp_refcount) - 1,
1000 imp->imp_obd->obd_name);
1002 if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
1003 CDEBUG(D_INFO, "final put import %p\n", imp);
1004 obd_zombie_import_add(imp);
1009 EXPORT_SYMBOL(class_import_put);
/*
 * Initialize the adaptive-timeout state of an import: the network latency
 * estimate starts at 0, and each of the IMP_AT_MAX_PORTALS service
 * estimates starts at INITIAL_CONNECT_TIMEOUT.
 * NOTE(review): the last argument of the per-portal at_init() call is
 * missing from this listing.
 */
1011 static void init_imp_at(struct imp_at *at) {
1013 at_init(&at->iat_net_latency, 0, 0);
1014 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1015 /* max service estimates are tracked on the server side, so
1016 don't use the AT history here, just use the last reported
1017 val. (But keep hist for proc histogram, worst_ever) */
1018 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
/*
 * Allocate and initialize a new import for \a obd.
 *
 * The import starts in state LUSTRE_IMP_NEW with a refcount of 2, holds an
 * "import" reference on the device (class_incref), has its handle hashed
 * with import_handle_addref as addref callback, and uses
 * LUSTRE_MSG_MAGIC_V2 as its initial message magic.
 * NOTE(review): the allocation-failure check and the return statement are
 * missing from this listing. The two initial references presumably mirror
 * class_new_export() (one for the handle hash, one for the caller) -
 * confirm.
 */
1023 struct obd_import *class_new_import(struct obd_device *obd)
1025 struct obd_import *imp;
1027 OBD_ALLOC(imp, sizeof(*imp));
1031 CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
1032 CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
1033 CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
1034 CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
1035 cfs_spin_lock_init(&imp->imp_lock);
1036 imp->imp_last_success_conn = 0;
1037 imp->imp_state = LUSTRE_IMP_NEW;
1038 imp->imp_obd = class_incref(obd, "import", imp);
1039 cfs_mutex_init(&imp->imp_sec_mutex);
1040 cfs_waitq_init(&imp->imp_recovery_waitq);
1042 cfs_atomic_set(&imp->imp_refcount, 2);
1043 cfs_atomic_set(&imp->imp_unregistering, 0);
1044 cfs_atomic_set(&imp->imp_inflight, 0);
1045 cfs_atomic_set(&imp->imp_replay_inflight, 0);
1046 cfs_atomic_set(&imp->imp_inval_count, 0);
1047 CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1048 CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1049 class_handle_hash(&imp->imp_handle, import_handle_addref);
1050 init_imp_at(&imp->imp_at);
1052 /* the default magic is V2, will be used in connect RPC, and
1053 * then adjusted according to the flags in request/reply. */
1054 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1058 EXPORT_SYMBOL(class_new_import);
/*
 * Begin tearing down an import: unhash its handle, bump imp_generation
 * under imp_lock, and drop one reference with class_import_put(). The
 * import pointer must be neither NULL nor LP_POISON (asserted).
 */
1060 void class_destroy_import(struct obd_import *import)
1062 LASSERT(import != NULL);
1063 LASSERT(import != LP_POISON);
1065 class_handle_unhash(&import->imp_handle);
1067 cfs_spin_lock(&import->imp_lock);
1068 import->imp_generation++;
1069 cfs_spin_unlock(&import->imp_lock);
1070 class_import_put(import);
1072 EXPORT_SYMBOL(class_destroy_import);
/*
 * Debug-only tracking (LUSTRE_TRACKS_LOCK_EXP_REFS) of which export a lock
 * holds references on. Under exp_locks_list_guard: warn if the lock is
 * already attributed to a different export; on the first reference
 * (l_exp_refs_nr going from 0 to 1) link the lock on exp_locks_list and
 * record \a exp as its target.
 */
1074 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1076 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1078 cfs_spin_lock(&exp->exp_locks_list_guard);
1080 LASSERT(lock->l_exp_refs_nr >= 0);
1082 if (lock->l_exp_refs_target != NULL &&
1083 lock->l_exp_refs_target != exp) {
1084 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1085 exp, lock, lock->l_exp_refs_target);
1087 if ((lock->l_exp_refs_nr ++) == 0) {
1088 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1089 lock->l_exp_refs_target = exp;
1091 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1092 lock, exp, lock->l_exp_refs_nr);
1093 cfs_spin_unlock(&exp->exp_locks_list_guard);
1095 EXPORT_SYMBOL(__class_export_add_lock_ref);
/*
 * Counterpart of __class_export_add_lock_ref(). Under
 * exp_locks_list_guard: the lock must hold at least one tracked reference
 * (asserted); warn if it is attributed to a different export; when the
 * last reference goes away unlink the lock from the export's list and
 * clear its target.
 */
1097 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1099 cfs_spin_lock(&exp->exp_locks_list_guard);
1100 LASSERT(lock->l_exp_refs_nr > 0);
1101 if (lock->l_exp_refs_target != exp) {
1102 LCONSOLE_WARN("lock %p, "
1103 "mismatching export pointers: %p, %p\n",
1104 lock, lock->l_exp_refs_target, exp);
1106 if (-- lock->l_exp_refs_nr == 0) {
1107 cfs_list_del_init(&lock->l_exp_refs_link);
1108 lock->l_exp_refs_target = NULL;
1110 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1111 lock, exp, lock->l_exp_refs_nr);
1112 cfs_spin_unlock(&exp->exp_locks_list_guard);
1114 EXPORT_SYMBOL(__class_export_del_lock_ref);
/*
 * class_connect(): create a new export for \a cluuid on \a obd with
 * class_new_export(), store the export handle cookie in \a conn, and drop
 * the pointer reference again with class_export_put(). A failure of
 * class_new_export() is returned as PTR_ERR().
 * NOTE(review): the IS_ERR check and the success return are missing from
 * this listing, and the original comment below has lost its last line.
 */
1117 /* A connection defines an export context in which preallocation can
1118 be managed. This releases the export pointer reference, and returns
1119 the export handle, so the export refcount is 1 when this function
1121 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1122 struct obd_uuid *cluuid)
1124 struct obd_export *export;
1125 LASSERT(conn != NULL);
1126 LASSERT(obd != NULL);
1127 LASSERT(cluuid != NULL);
1130 export = class_new_export(obd, cluuid);
1132 RETURN(PTR_ERR(export));
1134 conn->cookie = export->exp_handle.h_cookie;
1135 class_export_put(export);
1137 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1138 cluuid->uuid, conn->cookie);
1141 EXPORT_SYMBOL(class_connect);
1144 /* This function removes 1-3 references from the export:
1145 * 1 - for export pointer passed
1146 * and if disconnect really need
1147 * 2 - removing from hash
1148 * 3 - in client_unlink_export
1149 * The export pointer passed to this function can be destroyed */
/*
 * exp_disconnected is tested and set under exp_lock so that racing callers
 * perform the unlink only once; a caller that finds the flag already set
 * jumps to the no_disconn label. On the first disconnect the export is
 * removed from the nid hash (if hashed) and unlinked from the device.
 * NOTE(review): the no_disconn label and the return statements are missing
 * from this listing; presumably the final class_export_put() sits at that
 * label and is shared by both paths - confirm.
 */
1150 int class_disconnect(struct obd_export *export)
1152 int already_disconnected;
1155 if (export == NULL) {
1156 CWARN("attempting to free NULL export %p\n", export);
1160 cfs_spin_lock(&export->exp_lock);
1161 already_disconnected = export->exp_disconnected;
1162 export->exp_disconnected = 1;
1163 cfs_spin_unlock(&export->exp_lock);
1165 /* class_cleanup(), abort_recovery(), and class_fail_export()
1166 * all end up in here, and if any of them race we shouldn't
1167 * call extra class_export_puts(). */
1168 if (already_disconnected) {
1169 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1170 GOTO(no_disconn, already_disconnected);
1173 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1174 export->exp_handle.h_cookie);
1176 if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1177 cfs_hash_del(export->exp_obd->obd_nid_hash,
1178 &export->exp_connection->c_peer.nid,
1179 &export->exp_nid_hash);
1181 class_unlink_export(export);
1183 class_export_put(export);
1186 EXPORT_SYMBOL(class_disconnect);
1188 /* Return non-zero for a fully connected export */
/*
 * "Fully connected" is evaluated as exp_conn_cnt > 0, read under exp_lock.
 * NOTE(review): the guard around the locked section and the return
 * statements are missing from this listing.
 */
1189 int class_connected_export(struct obd_export *exp)
1193 cfs_spin_lock(&exp->exp_lock);
1194 connected = (exp->exp_conn_cnt > 0);
1195 cfs_spin_unlock(&exp->exp_lock);
1200 EXPORT_SYMBOL(class_connected_export);
/*
 * Disconnect every export on \a list, always working on the first entry
 * until the list is empty.
 *
 * For each export: take a reference so that it can still be logged after
 * obd_disconnect(), and store \a flags in exp_flags under exp_lock. An
 * export whose client uuid equals its device's own uuid is not
 * disconnected; it is only unlinked from the list and the reference is
 * dropped. Every other export gets a second reference before
 * obd_disconnect() (which releases one reference, per the inline comment),
 * and the first reference is dropped after the result has been logged.
 */
1202 static void class_disconnect_export_list(cfs_list_t *list,
1203 enum obd_option flags)
1206 struct obd_export *exp;
1209 /* It's possible that an export may disconnect itself, but
1210 * nothing else will be added to this list. */
1211 while (!cfs_list_empty(list)) {
1212 exp = cfs_list_entry(list->next, struct obd_export,
1214 /* need for safe call CDEBUG after obd_disconnect */
1215 class_export_get(exp);
1217 cfs_spin_lock(&exp->exp_lock);
1218 exp->exp_flags = flags;
1219 cfs_spin_unlock(&exp->exp_lock);
1221 if (obd_uuid_equals(&exp->exp_client_uuid,
1222 &exp->exp_obd->obd_uuid)) {
1224 "exp %p export uuid == obd uuid, don't discon\n",
1226 /* Need to delete this now so we don't end up pointing
1227 * to work_list later when this export is cleaned up. */
1228 cfs_list_del_init(&exp->exp_obd_chain);
1229 class_export_put(exp);
1233 class_export_get(exp);
1234 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1235 "last request at "CFS_TIME_T"\n",
1236 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1237 exp, exp->exp_last_request_time);
1238 /* release one export reference anyway */
1239 rc = obd_disconnect(exp);
1241 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1242 obd_export_nid2str(exp), exp, rc);
1243 class_export_put(exp);
/*
 * Disconnect all exports of \a obd. Both obd_exports and
 * obd_delayed_exports are spliced onto a private work list under
 * obd_dev_lock; the list is then processed by
 * class_disconnect_export_list() with the flags from exp_flags_from_obd().
 * The absence of any exports is only logged.
 */
1248 void class_disconnect_exports(struct obd_device *obd)
1250 cfs_list_t work_list;
1253 /* Move all of the exports from obd_exports to a work list, en masse. */
1254 CFS_INIT_LIST_HEAD(&work_list);
1255 cfs_spin_lock(&obd->obd_dev_lock);
1256 cfs_list_splice_init(&obd->obd_exports, &work_list);
1257 cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1258 cfs_spin_unlock(&obd->obd_dev_lock);
1260 if (!cfs_list_empty(&work_list)) {
1261 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1262 "disconnecting them\n", obd->obd_minor, obd);
1263 class_disconnect_export_list(&work_list,
1264 exp_flags_from_obd(obd));
1266 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1267 obd->obd_minor, obd);
1270 EXPORT_SYMBOL(class_disconnect_exports);
1272 /* Remove exports that have not completed recovery. */
/*
 * \param obd          device whose obd_exports list is scanned
 * \param test_export  predicate called, with obd_dev_lock held, for every
 *                     export other than the self-export
 *
 * Exports that reach cfs_list_move() are collected on a private list and
 * disconnected after obd_dev_lock is released, with OBD_OPT_ABORT_RECOV
 * or'ed into the flags from exp_flags_from_obd().
 *
 * NOTE(review): the statements following the self-export check, the
 * test_export() check and the exp_failed update, as well as the update of
 * "evicted", are on lines not visible in this extract; presumably each
 * check skips the export -- confirm against the full source.
 */
1274 void class_disconnect_stale_exports(struct obd_device *obd,
1275 int (*test_export)(struct obd_export *))
1277 cfs_list_t work_list;
1278 cfs_list_t *pos, *n;
1279 struct obd_export *exp;
1283 CFS_INIT_LIST_HEAD(&work_list);
1284 cfs_spin_lock(&obd->obd_dev_lock);
1285 cfs_list_for_each_safe(pos, n, &obd->obd_exports) {
1288 exp = cfs_list_entry(pos, struct obd_export, exp_obd_chain);
1290 /* don't count self-export as client */
1291 if (obd_uuid_equals(&exp->exp_client_uuid,
1292 &exp->exp_obd->obd_uuid))
1295 if (test_export(exp))
/* Mark the export failed, remembering whether it already was. */
1298 cfs_spin_lock(&exp->exp_lock);
1299 failed = exp->exp_failed;
1300 exp->exp_failed = 1;
1301 cfs_spin_unlock(&exp->exp_lock);
1305 cfs_list_move(&exp->exp_obd_chain, &work_list);
1307 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1308 obd->obd_name, exp->exp_client_uuid.uuid,
1309 exp->exp_connection == NULL ? "<unknown>" :
1310 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1311 print_export_data(exp, "EVICTING", 0);
1313 cfs_spin_unlock(&obd->obd_dev_lock);
1316 CDEBUG(D_HA, "%s: disconnecting %d stale clients\n",
1317 obd->obd_name, evicted);
/* obd_stale_clients accumulates the evictions across calls. */
1318 obd->obd_stale_clients += evicted;
1320 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1321 OBD_OPT_ABORT_RECOV);
1324 EXPORT_SYMBOL(class_disconnect_stale_exports);
/*
 * Fail \a exp: mark it failed and disconnect it.
 *
 * exp_failed is read and set in a single exp_lock critical section, so the
 * previous value tells whether the export had already been failed.  In that
 * case a "skipping" message is logged (the early return itself is on a line
 * not visible in this extract -- TODO confirm).
 */
1326 void class_fail_export(struct obd_export *exp)
1328 int rc, already_failed;
1330 cfs_spin_lock(&exp->exp_lock);
1331 already_failed = exp->exp_failed;
1332 exp->exp_failed = 1;
1333 cfs_spin_unlock(&exp->exp_lock);
1335 if (already_failed) {
1336 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1337 exp, exp->exp_client_uuid.uuid);
1341 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1342 exp, exp->exp_client_uuid.uuid);
/* Optionally dump the debug log before the export goes away. */
1344 if (obd_dump_on_timeout)
1345 libcfs_debug_dumplog();
1347 /* need for safe call CDEBUG after obd_disconnect */
1348 class_export_get(exp);
1350 /* Most callers into obd_disconnect are removing their own reference
1351 * (request, for example) in addition to the one from the hash table.
1352 * We don't have such a reference here, so make one. */
1353 class_export_get(exp);
1354 rc = obd_disconnect(exp);
1356 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1358 CDEBUG(D_HA, "disconnected export %p/%s\n",
1359 exp, exp->exp_client_uuid.uuid);
/* Drops the reference taken above for the post-disconnect logging. */
1360 class_export_put(exp);
1362 EXPORT_SYMBOL(class_fail_export);
/*
 * Return the peer NID of \a exp, formatted by libcfs_nid2str(), when the
 * export has a connection.
 *
 * NOTE(review): the value returned for an export without a connection is on
 * a line not visible in this extract.
 */
1364 char *obd_export_nid2str(struct obd_export *exp)
1366 if (exp->exp_connection != NULL)
1367 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1371 EXPORT_SYMBOL(obd_export_nid2str);
/*
 * Evict the export(s) found under \a nid in obd->obd_nid_hash.
 *
 * \param obd  device whose NID hash is searched
 * \param nid  NID in string form; converted with libcfs_str2nid()
 *
 * Returns exports_evicted; when it is zero a "no exports found" debug
 * message is logged first.
 *
 * NOTE(review): the loop construct around the lookup and the place where
 * exports_evicted is incremented are on lines not visible in this extract.
 */
1373 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1375 struct obd_export *doomed_exp = NULL;
1376 int exports_evicted = 0;
1378 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1381 doomed_exp = cfs_hash_lookup(obd->obd_nid_hash, &nid_key);
1382 if (doomed_exp == NULL)
/* Sanity checks: the hash must return an export for the requested NID, and
 * the device's self-export must never be found through the NID hash. */
1385 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1386 "nid %s found, wanted nid %s, requested nid %s\n",
1387 obd_export_nid2str(doomed_exp),
1388 libcfs_nid2str(nid_key), nid);
1389 LASSERTF(doomed_exp != obd->obd_self_export,
1390 "self-export is hashed by NID?\n");
/* NOTE(review): "adminstrative" in the message is a typo for
 * "administrative"; left untouched because it is runtime text. */
1392 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1393 obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1395 class_fail_export(doomed_exp);
/* presumably drops the reference returned by cfs_hash_lookup() -- TODO
 * confirm */
1396 class_export_put(doomed_exp);
1399 if (!exports_evicted)
1400 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1401 obd->obd_name, nid);
1402 return exports_evicted;
1404 EXPORT_SYMBOL(obd_export_evict_by_nid);
/*
 * Evict the export found under \a uuid in obd->obd_uuid_hash.
 *
 * \param obd   device whose UUID hash is searched
 * \param uuid  client UUID in string form; converted with obd_str2uuid()
 *
 * A request naming the device's own UUID is refused with an error message.
 * Returns exports_evicted.
 *
 * NOTE(review): the place where exports_evicted is updated is on a line not
 * visible in this extract.
 */
1406 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1408 struct obd_export *doomed_exp = NULL;
1409 struct obd_uuid doomed_uuid;
1410 int exports_evicted = 0;
1412 obd_str2uuid(&doomed_uuid, uuid);
1413 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1414 CERROR("%s: can't evict myself\n", obd->obd_name);
1415 return exports_evicted;
1418 doomed_exp = cfs_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1420 if (doomed_exp == NULL) {
1421 CERROR("%s: can't disconnect %s: no exports found\n",
1422 obd->obd_name, uuid);
/* NOTE(review): "adminstrative" in the message is a typo for
 * "administrative"; left untouched because it is runtime text. */
1424 CWARN("%s: evicting %s at adminstrative request\n",
1425 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1426 class_fail_export(doomed_exp);
/* presumably drops the reference returned by cfs_hash_lookup() -- TODO
 * confirm */
1427 class_export_put(doomed_exp);
1431 return exports_evicted;
1433 EXPORT_SYMBOL(obd_export_evict_by_uuid);
/* Optional hook, only built when LUSTRE_TRACKS_LOCK_EXP_REFS is set.
 * print_export_data() calls it for an export when lock dumping was requested
 * and the pointer is not NULL. */
1435 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1436 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1437 EXPORT_SYMBOL(class_export_dump_hook);
/*
 * Log one D_HA debug line describing \a exp.
 *
 * \param exp     export to describe
 * \param status  label printed after the device name (callers in this file
 *                pass "ACTIVE", "UNLINKED", "DELAYED", "ZOMBIE", "EVICTING")
 *
 * The line carries the client uuid, the peer NID, the reference, RPC,
 * callback and lock counters, the disconnected/delayed/failed flags, reply
 * information gathered from exp_outstanding_replies under exp_lock, and
 * exp_last_committed.  When LUSTRE_TRACKS_LOCK_EXP_REFS is set and "locks"
 * is non-zero, class_export_dump_hook is called as well if it is installed.
 *
 * NOTE(review): the body of the reply-counting loop is on lines not visible
 * in this extract.
 */
1440 static void print_export_data(struct obd_export *exp, const char *status,
1443 struct ptlrpc_reply_state *rs;
1444 struct ptlrpc_reply_state *first_reply = NULL;
1447 cfs_spin_lock(&exp->exp_lock);
1448 cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1454 cfs_spin_unlock(&exp->exp_lock);
1456 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1457 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1458 obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1459 cfs_atomic_read(&exp->exp_rpc_count),
1460 cfs_atomic_read(&exp->exp_cb_count),
1461 cfs_atomic_read(&exp->exp_locks_count),
1462 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1463 nreplies, first_reply, nreplies > 3 ? "..." : "",
1464 exp->exp_last_committed);
1465 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1466 if (locks && class_export_dump_hook != NULL)
1467 class_export_dump_hook(exp);
1471 void dump_exports(struct obd_device *obd, int locks)
1473 struct obd_export *exp;
1475 cfs_spin_lock(&obd->obd_dev_lock);
1476 cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1477 print_export_data(exp, "ACTIVE", locks);
1478 cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1479 print_export_data(exp, "UNLINKED", locks);
1480 cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1481 print_export_data(exp, "DELAYED", locks);
1482 cfs_spin_unlock(&obd->obd_dev_lock);
1483 cfs_spin_lock(&obd_zombie_impexp_lock);
1484 cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1485 print_export_data(exp, "ZOMBIE", locks);
1486 cfs_spin_unlock(&obd_zombie_impexp_lock);
1488 EXPORT_SYMBOL(dump_exports);
/*
 * Wait until the obd_unlinked_exports list of \a obd is empty.
 *
 * obd_exports must already be empty (asserted).  The list is polled:
 * obd_dev_lock is dropped around a sleep of cfs_time_seconds(waited) in
 * state CFS_TASK_UNINT and re-taken before the next emptiness check.  When
 * "waited" is above 5 and a power of two, a console warning is printed and
 * all exports are dumped.
 *
 * NOTE(review): the initial value of "waited" and the way it changes between
 * iterations are on lines not visible in this extract.
 */
1490 void obd_exports_barrier(struct obd_device *obd)
1493 LASSERT(cfs_list_empty(&obd->obd_exports));
1494 cfs_spin_lock(&obd->obd_dev_lock);
1495 while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1496 cfs_spin_unlock(&obd->obd_dev_lock);
1497 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
1498 cfs_time_seconds(waited));
1499 if (waited > 5 && IS_PO2(waited)) {
1500 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1501 "more than %d seconds. "
1502 "The obd refcount = %d. Is it stuck?\n",
1503 obd->obd_name, waited,
1504 cfs_atomic_read(&obd->obd_refcount));
1505 dump_exports(obd, 1);
1508 cfs_spin_lock(&obd->obd_dev_lock);
1510 cfs_spin_unlock(&obd->obd_dev_lock);
1512 EXPORT_SYMBOL(obd_exports_barrier);
/* Number of zombies still waiting to be destroyed; read under
 * obd_zombie_impexp_lock.  Starts at zero (static storage). */
static int zombies_count;
1518 /* Kill zombie imports and exports. */
/*
 * Each pass detaches at most one import and at most one export from the
 * zombie lists while holding obd_zombie_impexp_lock, then destroys them with
 * class_import_destroy() / class_export_destroy() after the lock has been
 * released.  The loop repeats while a pass produced an import or an export.
 *
 * NOTE(review): the start of the loop, the per-pass reset of both pointers
 * and the statement executed when the lock is re-taken after each destroy
 * (presumably the zombies_count bookkeeping) are on lines not visible in
 * this extract -- confirm against the full source.
 */
1520 void obd_zombie_impexp_cull(void)
1522 struct obd_import *import;
1523 struct obd_export *export;
1527 cfs_spin_lock(&obd_zombie_impexp_lock);
1530 if (!cfs_list_empty(&obd_zombie_imports)) {
1531 import = cfs_list_entry(obd_zombie_imports.next,
1534 cfs_list_del_init(&import->imp_zombie_chain);
1538 if (!cfs_list_empty(&obd_zombie_exports)) {
1539 export = cfs_list_entry(obd_zombie_exports.next,
1542 cfs_list_del_init(&export->exp_obd_chain);
1545 cfs_spin_unlock(&obd_zombie_impexp_lock);
/* Destruction happens without obd_zombie_impexp_lock held. */
1547 if (import != NULL) {
1548 class_import_destroy(import);
1549 cfs_spin_lock(&obd_zombie_impexp_lock);
1551 cfs_spin_unlock(&obd_zombie_impexp_lock);
1554 if (export != NULL) {
1555 class_export_destroy(export);
1556 cfs_spin_lock(&obd_zombie_impexp_lock);
1558 cfs_spin_unlock(&obd_zombie_impexp_lock);
1562 } while (import != NULL || export != NULL);
/* Completed by obd_zombie_impexp_thread() once it has started (or failed to
 * daemonize); obd_zombie_impexp_init() waits on it. */
1566 static cfs_completion_t obd_zombie_start;
/* Completed by the thread when its main loop ends; waited on by
 * obd_zombie_impexp_stop(). */
1567 static cfs_completion_t obd_zombie_stop;
/* Flag word; OBD_ZOMBIE_STOP is set in it to request thread exit. */
1568 static unsigned long obd_zombie_flags;
/* Wait queue shared by the thread and by obd_zombie_barrier() callers. */
1569 static cfs_waitq_t obd_zombie_waitq;
/* Pid of the zombie thread; lets obd_zombie_barrier() avoid waiting for
 * itself. */
1570 static pid_t obd_zombie_pid;
/* NOTE(review): this value (2) is handed to cfs_set_bit()/cfs_test_bit() as
 * their first argument.  Assuming those take a bit number like the Linux
 * set_bit()/test_bit(), the "1 << 1" spelling is misleading (it selects
 * bit 2, not mask 0x2), though all users in this file are consistent. */
1573 OBD_ZOMBIE_STOP = 1 << 1
1577 /* Check whether the zombie import/export thread has work to do. */
/*
 * Computes rc under obd_zombie_impexp_lock: non-zero when no zombies are
 * queued AND OBD_ZOMBIE_STOP is not set, i.e. when there is nothing to do.
 * The zombie thread sleeps until this function yields 0.
 *
 * \param arg  not used by any line visible here
 *
 * NOTE(review): the return statement is on a line not visible in this
 * extract; rc is presumably the return value.
 */
1579 static int obd_zombie_impexp_check(void *arg)
1583 cfs_spin_lock(&obd_zombie_impexp_lock);
1584 rc = (zombies_count == 0) &&
1585 !cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1586 cfs_spin_unlock(&obd_zombie_impexp_lock);
1592 /* Add an export to the obd_zombie thread's queue and notify the thread. */
/*
 * \a exp must currently be linked through exp_obd_chain (asserted).  It is
 * unlinked under its obd's obd_dev_lock, queued on obd_zombie_exports under
 * obd_zombie_impexp_lock, and then the waiters on obd_zombie_waitq are woken.
 *
 * NOTE(review): one statement inside the obd_zombie_impexp_lock section
 * (presumably the zombies_count increment) is on a line not visible in this
 * extract.
 */
1594 static void obd_zombie_export_add(struct obd_export *exp) {
1595 cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
1596 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1597 cfs_list_del_init(&exp->exp_obd_chain);
1598 cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
1599 cfs_spin_lock(&obd_zombie_impexp_lock);
1601 cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1602 cfs_spin_unlock(&obd_zombie_impexp_lock);
1604 obd_zombie_impexp_notify();
1608 /* Add an import to the obd_zombie thread's queue and notify the thread. */
/*
 * Preconditions (asserted): imp_sec and imp_rq_pool are NULL, and the import
 * is not already linked through imp_zombie_chain.  The import is queued on
 * obd_zombie_imports under obd_zombie_impexp_lock, then the waiters on
 * obd_zombie_waitq are woken.
 *
 * NOTE(review): one statement inside the locked section (presumably the
 * zombies_count increment) is on a line not visible in this extract.
 */
1610 static void obd_zombie_import_add(struct obd_import *imp) {
1611 LASSERT(imp->imp_sec == NULL);
1612 LASSERT(imp->imp_rq_pool == NULL);
1613 cfs_spin_lock(&obd_zombie_impexp_lock);
1614 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1616 cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1617 cfs_spin_unlock(&obd_zombie_impexp_lock);
1619 obd_zombie_impexp_notify();
1623 * notify import/export destroy thread about new zombie.
1625 static void obd_zombie_impexp_notify(void)
1628 * Make sure obd_zomebie_impexp_thread get this notification.
1629 * It is possible this signal only get by obd_zombie_barrier, and
1630 * barrier gulps this notification and sleeps away and hangs ensues
1632 cfs_waitq_broadcast(&obd_zombie_waitq);
1636 /* Check whether the obd_zombie thread is idle. */
/*
 * Idle means zombies_count == 0, sampled under obd_zombie_impexp_lock.
 * Must not be called once OBD_ZOMBIE_STOP has been set (asserted).
 *
 * NOTE(review): the return statement is on a line not visible in this
 * extract; rc is presumably the return value.
 */
1638 static int obd_zombie_is_idle(void)
1642 LASSERT(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1643 cfs_spin_lock(&obd_zombie_impexp_lock);
1644 rc = (zombies_count == 0);
1645 cfs_spin_unlock(&obd_zombie_impexp_lock);
1650 /* Wait until the obd_zombie import/export queues become empty. */
/*
 * Sleeps on obd_zombie_waitq until obd_zombie_is_idle() holds.  The zombie
 * thread itself is recognised by its pid and does not wait (the statement
 * taken in that case is on a line not visible in this extract).
 */
1652 void obd_zombie_barrier(void)
1654 struct l_wait_info lwi = { 0 };
1656 if (obd_zombie_pid == cfs_curproc_pid())
1657 /* don't wait for myself */
1659 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1661 EXPORT_SYMBOL(obd_zombie_barrier);
1666 /* Thread that destroys zombie exports and imports. */
/*
 * Main function of the zombie thread.
 *
 * obd_zombie_start is completed both when daemonizing fails and when it
 * succeeds, so the starter is released either way.  The main loop runs until
 * OBD_ZOMBIE_STOP is set: sleep until obd_zombie_impexp_check() reports work
 * (or a stop request), cull the zombie lists, then signal obd_zombie_waitq.
 * obd_zombie_stop is completed after the loop.
 *
 * \param unused  not referenced by any line visible here
 *
 * NOTE(review): obd_zombie_pid is stored after obd_zombie_start has been
 * completed, so code released by that completion can briefly observe the
 * old pid value.
 */
1668 static int obd_zombie_impexp_thread(void *unused)
1672 if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1673 cfs_complete(&obd_zombie_start);
1677 cfs_complete(&obd_zombie_start);
1679 obd_zombie_pid = cfs_curproc_pid();
1681 while(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1682 struct l_wait_info lwi = { 0 };
1684 l_wait_event(obd_zombie_waitq,
1685 !obd_zombie_impexp_check(NULL), &lwi);
1686 obd_zombie_impexp_cull();
1689 * Notify obd_zombie_barrier callers that queues
1692 cfs_waitq_signal(&obd_zombie_waitq);
1695 cfs_complete(&obd_zombie_stop);
1700 #else /* ! KERNEL */
/* Non-kernel build (see the #else above): there is no zombie thread here. */
/* Re-entry counter for obd_zombie_impexp_kill(). */
1702 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
/* Handles returned by the liblustre wait/idle callback registration; kept so
 * that obd_zombie_impexp_stop() can deregister them. */
1703 static void *obd_zombie_impexp_work_cb;
1704 static void *obd_zombie_impexp_idle_cb;
/*
 * Callback that culls the zombie lists.  zombie_recur guards against
 * re-entry: only the caller that raises the counter from 0 to 1 runs
 * obd_zombie_impexp_cull(); the counter is decremented again afterwards.
 *
 * NOTE(review): the return value is computed on lines not visible in this
 * extract.
 */
1706 int obd_zombie_impexp_kill(void *arg)
1710 if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1711 obd_zombie_impexp_cull();
1714 cfs_atomic_dec(&zombie_recur);
1721 /* Start the thread that destroys zombie imports/exports. */
/*
 * Initialise the zombie lists, their lock, the start/stop completions and
 * the wait queue, then set up the reaper.
 *
 * Two set-up paths are visible: creating obd_zombie_impexp_thread() and
 * waiting for obd_zombie_start, and registering obd_zombie_impexp_kill() /
 * obd_zombie_impexp_check() as liblustre wait and idle callbacks.
 *
 * NOTE(review): the preprocessor conditionals selecting between the two
 * paths, the handling of rc and the return statement are on lines not
 * visible in this extract.
 */
1723 int obd_zombie_impexp_init(void)
1727 CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1728 CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1729 cfs_spin_lock_init(&obd_zombie_impexp_lock);
1730 cfs_init_completion(&obd_zombie_start);
1731 cfs_init_completion(&obd_zombie_stop);
1732 cfs_waitq_init(&obd_zombie_waitq);
1736 rc = cfs_create_thread(obd_zombie_impexp_thread, NULL, 0);
1740 cfs_wait_for_completion(&obd_zombie_start);
1743 obd_zombie_impexp_work_cb =
1744 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1745 &obd_zombie_impexp_kill, NULL);
1747 obd_zombie_impexp_idle_cb =
1748 liblustre_register_idle_callback("obd_zombi_impexp_check",
1749 &obd_zombie_impexp_check, NULL);
1755 /* Stop the thread that destroys zombie imports/exports. */
/*
 * Request shutdown: set OBD_ZOMBIE_STOP and wake the waiters on
 * obd_zombie_waitq.
 *
 * Two tear-down paths are visible: waiting for obd_zombie_stop (completed by
 * the thread when its loop ends) and deregistering the liblustre callbacks.
 *
 * NOTE(review): the preprocessor conditionals selecting between the two
 * paths are on lines not visible in this extract.
 */
1757 void obd_zombie_impexp_stop(void)
1759 cfs_set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1760 obd_zombie_impexp_notify();
1762 cfs_wait_for_completion(&obd_zombie_stop);
1764 liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1765 liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1769 /***** Kernel-userspace comm helpers *******/
1771 /* Get length of entire message, including header */
1772 int kuc_len(int payload_len)
1774 return sizeof(struct kuc_hdr) + payload_len;
1776 EXPORT_SYMBOL(kuc_len);
1778 /* Get a pointer to kuc header, given a ptr to the payload
1779 * @param p Pointer to payload area
1780 * @returns Pointer to kuc header
1782 struct kuc_hdr * kuc_ptr(void *p)
1784 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1785 LASSERT(lh->kuc_magic == KUC_MAGIC);
1788 EXPORT_SYMBOL(kuc_ptr);
1790 /* Test if payload is part of kuc message
1791 * @param p Pointer to payload area
1794 int kuc_ispayload(void *p)
1796 struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1798 if (kh->kuc_magic == KUC_MAGIC)
1803 EXPORT_SYMBOL(kuc_ispayload);
1805 /* Alloc space for a message, and fill in header
1806 * @return Pointer to payload area
1808 void *kuc_alloc(int payload_len, int transport, int type)
1811 int len = kuc_len(payload_len);
1815 return ERR_PTR(-ENOMEM);
1817 lh->kuc_magic = KUC_MAGIC;
1818 lh->kuc_transport = transport;
1819 lh->kuc_msgtype = type;
1820 lh->kuc_msglen = len;
1822 return (void *)(lh + 1);
1824 EXPORT_SYMBOL(kuc_alloc);
/* Free a kuc message, header included.
 * @param p Pointer to payload area (not to the header)
 * @param payload_len Payload size; the freed length is kuc_len(payload_len)
 *
 * Deliberately not declared inline: the symbol is exported, and under
 * C99/C11 inline rules a plain "inline" definition does not by itself
 * provide the external definition that the export needs.
 */
void kuc_free(void *p, int payload_len)
{
        struct kuc_hdr *lh = kuc_ptr(p);

        OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);