4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Whamcloud, Inc.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
44 #include <liblustre.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
/* List of registered obd types (defined elsewhere) and the spinlock this
 * file takes around walks and updates of that list. */
50 extern cfs_list_t obd_types;
51 cfs_spinlock_t obd_types_lock;
/* Memory caches created by obd_init_caches() and destroyed by
 * obd_cleanup_caches(). */
53 cfs_mem_cache_t *obd_device_cachep;
54 cfs_mem_cache_t *obdo_cachep;
55 EXPORT_SYMBOL(obdo_cachep);
56 cfs_mem_cache_t *import_cachep;
/* Zombie import/export bookkeeping.
 * NOTE(review): presumably obd_zombie_impexp_lock guards both lists; their
 * users are not visible in this view - confirm. */
58 cfs_list_t obd_zombie_imports;
59 cfs_list_t obd_zombie_exports;
60 cfs_spinlock_t obd_zombie_impexp_lock;
/* Forward declarations of helpers defined outside this view. */
61 static void obd_zombie_impexp_notify(void);
62 static void obd_zombie_export_add(struct obd_export *exp);
63 static void obd_zombie_import_add(struct obd_import *imp);
64 static void print_export_data(struct obd_export *exp,
65 const char *status, int locks);
/* Function pointer used by the export/import destroy paths below to drop a
 * ptlrpc_connection; it is assigned outside this view. */
67 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
70 * support functions: we could use inter-module communication, but this
71 * is more portable to other OS's
/*
 * Allocate an obd_device from obd_device_cachep (CFS_ALLOC_IO) and stamp it
 * with OBD_DEVICE_MAGIC.
 * NOTE(review): neither a NULL check on the allocation nor the return
 * statement is visible in this view; lines appear to be missing - confirm
 * against the full source.
 */
73 static struct obd_device *obd_device_alloc(void)
75 struct obd_device *obd;
77 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
79 obd->obd_magic = OBD_DEVICE_MAGIC;
/*
 * Return an obd_device to obd_device_cachep.
 * Asserts that the magic is intact, reports a namespace that is still
 * attached, finalizes the obd_reference lu_ref and frees the slab object.
 */
84 static void obd_device_free(struct obd_device *obd)
87 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
88 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
/* a namespace still set at free time is logged as a cleanup failure */
89 if (obd->obd_namespace != NULL) {
90 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
91 obd, obd->obd_namespace, obd->obd_force);
94 lu_ref_fini(&obd->obd_reference);
95 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
/*
 * Look up a registered obd type by name.
 * Walks obd_types under obd_types_lock and compares each typ_name with
 * strcmp(); the lock is released both on a match and after an unsuccessful
 * walk.
 * NOTE(review): the return statements are not visible in this view.
 */
98 struct obd_type *class_search_type(const char *name)
101 struct obd_type *type;
103 cfs_spin_lock(&obd_types_lock);
104 cfs_list_for_each(tmp, &obd_types) {
105 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
106 if (strcmp(type->typ_name, name) == 0) {
/* match: drop the lock before leaving the loop */
107 cfs_spin_unlock(&obd_types_lock);
111 cfs_spin_unlock(&obd_types_lock);
/*
 * Find the obd type called "name" and take a module reference on the owner
 * of its dt_ops.
 * With HAVE_MODULE_LOADING_SUPPORT a module of the same name is requested
 * and the lookup is repeated after a successful load.
 * NOTE(review): the conditions guarding the module load and the reference
 * taking, and the return statement, are not visible in this view.
 */
115 struct obd_type *class_get_type(const char *name)
117 struct obd_type *type = class_search_type(name);
119 #ifdef HAVE_MODULE_LOADING_SUPPORT
121 const char *modname = name;
/* cfs_request_module() returning 0 is treated as success */
122 if (!cfs_request_module("%s", modname)) {
123 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
124 type = class_search_type(name);
126 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
/* the module reference is taken while holding the per-type lock */
132 cfs_spin_lock(&type->obd_type_lock);
134 cfs_try_module_get(type->typ_dt_ops->o_owner);
135 cfs_spin_unlock(&type->obd_type_lock);
139 EXPORT_SYMBOL(class_get_type);
/*
 * Drop the module reference on the owner of the type's dt_ops, under the
 * per-type lock (counterpart of class_get_type()).
 * NOTE(review): any refcount update on the type itself is not visible in
 * this view.
 */
141 void class_put_type(struct obd_type *type)
144 cfs_spin_lock(&type->obd_type_lock);
146 cfs_module_put(type->typ_dt_ops->o_owner);
147 cfs_spin_unlock(&type->obd_type_lock);
149 EXPORT_SYMBOL(class_put_type);
/* Upper bound (exclusive) on the length of a type name accepted by
 * class_register_type(). */
151 #define CLASS_MAX_NAME 1024
/*
 * Register a new obd type called "name".
 *
 * Copies dt_ops (and md_ops, which is optional) into freshly allocated
 * storage, duplicates the name, registers a proc entry for the type under
 * proc_lustre_root, initializes ldt and finally links the type onto
 * obd_types under obd_types_lock.  The tail of the function frees whatever
 * was allocated.
 * NOTE(review): the labels/returns separating the success path from the
 * failure path are not visible in this view.
 */
153 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
154 struct lprocfs_vars *vars, const char *name,
155 struct lu_device_type *ldt)
157 struct obd_type *type;
162 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
/* a type of the same name must not already be registered */
164 if (class_search_type(name)) {
165 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
170 OBD_ALLOC(type, sizeof(*type));
174 OBD_ALLOC_PTR(type->typ_dt_ops);
175 OBD_ALLOC_PTR(type->typ_md_ops);
176 OBD_ALLOC(type->typ_name, strlen(name) + 1);
/* all three allocations are checked together */
178 if (type->typ_dt_ops == NULL ||
179 type->typ_md_ops == NULL ||
180 type->typ_name == NULL)
183 *(type->typ_dt_ops) = *dt_ops;
184 /* md_ops is optional */
186 *(type->typ_md_ops) = *md_ops;
/* typ_name was sized as strlen(name) + 1 above, so strcpy() fits */
187 strcpy(type->typ_name, name);
188 cfs_spin_lock_init(&type->obd_type_lock);
191 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
193 if (IS_ERR(type->typ_procroot)) {
194 rc = PTR_ERR(type->typ_procroot);
/* reset so the error pointer is not mistaken for a proc entry later */
195 type->typ_procroot = NULL;
201 rc = lu_device_type_init(ldt);
206 cfs_spin_lock(&obd_types_lock);
207 cfs_list_add(&type->typ_chain, &obd_types);
208 cfs_spin_unlock(&obd_types_lock);
/* cleanup: release everything allocated for the type */
213 if (type->typ_name != NULL)
214 OBD_FREE(type->typ_name, strlen(name) + 1);
215 if (type->typ_md_ops != NULL)
216 OBD_FREE_PTR(type->typ_md_ops);
217 if (type->typ_dt_ops != NULL)
218 OBD_FREE_PTR(type->typ_dt_ops);
219 OBD_FREE(type, sizeof(*type));
222 EXPORT_SYMBOL(class_register_type);
/*
 * Unregister the obd type called "name".
 *
 * If the type still has a non-zero typ_refcnt an error is logged and only
 * the ops tables are freed.  Otherwise the proc entry is removed, the
 * lu_device_type is finalized, the type is unlinked from obd_types under
 * obd_types_lock and all of its memory is released.
 * NOTE(review): return values and the conditions around the "unknown obd
 * type" message and lu_device_type_fini() are not visible in this view.
 */
224 int class_unregister_type(const char *name)
226 struct obd_type *type = class_search_type(name);
230 CERROR("unknown obd type\n");
234 if (type->typ_refcnt) {
235 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
236 /* This is a bad situation, let's make the best of it */
237 /* Remove ops, but leave the name for debugging */
/* NOTE(review): the freed ops pointers are not visibly reset to NULL here;
 * check that nothing dereferences them afterwards. */
238 OBD_FREE_PTR(type->typ_dt_ops);
239 OBD_FREE_PTR(type->typ_md_ops);
243 if (type->typ_procroot) {
244 lprocfs_remove(&type->typ_procroot);
248 lu_device_type_fini(type->typ_lu);
250 cfs_spin_lock(&obd_types_lock);
251 cfs_list_del(&type->typ_chain);
252 cfs_spin_unlock(&obd_types_lock);
/* "name" compared equal to typ_name in class_search_type(), so its length
 * matches the size used when typ_name was allocated */
253 OBD_FREE(type->typ_name, strlen(name) + 1);
254 if (type->typ_dt_ops != NULL)
255 OBD_FREE_PTR(type->typ_dt_ops);
256 if (type->typ_md_ops != NULL)
257 OBD_FREE_PTR(type->typ_md_ops);
258 OBD_FREE(type, sizeof(*type));
260 } /* class_unregister_type */
261 EXPORT_SYMBOL(class_unregister_type);
264 * Create a new obd device.
266 * Find an empty slot in ::obd_devs[], create a new obd device in it.
268 * \param[in] type_name obd device type string.
269 * \param[in] name obd device name.
271 * \retval ERR_PTR(-errno) if create fails, otherwise return the obd device
/*
 * Allocate a new obd device of type "type_name" named "name" and place it
 * in the first free slot of obd_devs[].
 *
 * Failures visible here are reported as ERR_PTR() values: -EINVAL (name too
 * long), -ENODEV (unknown type), -ENOMEM (allocation failed), -EEXIST (a
 * device of that name already exists) and -EOVERFLOW (no free slot).
 * On failure the new device is freed and the type reference is dropped.
 * NOTE(review): several lines (loop-local assignments, breaks, the final
 * return) are not visible in this view.
 */
274 struct obd_device *class_newdev(const char *type_name, const char *name)
276 struct obd_device *result = NULL;
277 struct obd_device *newdev;
278 struct obd_type *type = NULL;
280 int new_obd_minor = 0;
283 if (strlen(name) >= MAX_OBD_NAME) {
284 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
285 RETURN(ERR_PTR(-EINVAL));
288 type = class_get_type(type_name);
290 CERROR("OBD: unknown type: %s\n", type_name);
291 RETURN(ERR_PTR(-ENODEV));
294 newdev = obd_device_alloc();
295 if (newdev == NULL) {
/* give back the type reference taken by class_get_type() */
296 class_put_type(type);
297 RETURN(ERR_PTR(-ENOMEM));
299 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
/* slot search and name-collision check happen under the device write lock */
301 cfs_write_lock(&obd_dev_lock);
302 for (i = 0; i < class_devno_max(); i++) {
303 struct obd_device *obd = class_num2obd(i);
305 if (obd && obd->obd_name &&
306 (strcmp(name, obd->obd_name) == 0)) {
307 CERROR("Device %s already exists at %d, won't add\n",
310 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
311 "%p obd_magic %08x != %08x\n", result,
312 result->obd_magic, OBD_DEVICE_MAGIC);
313 LASSERTF(result->obd_minor == new_obd_minor,
314 "%p obd_minor %d != %d\n", result,
315 result->obd_minor, new_obd_minor);
/* undo a slot that was already claimed earlier in this walk */
317 obd_devs[result->obd_minor] = NULL;
318 result->obd_name[0]='\0';
320 result = ERR_PTR(-EEXIST);
/* first empty slot seen while nothing has been claimed yet */
323 if (!result && !obd) {
325 result->obd_minor = i;
327 result->obd_type = type;
328 strncpy(result->obd_name, name,
329 sizeof(result->obd_name) - 1);
330 obd_devs[i] = result;
333 cfs_write_unlock(&obd_dev_lock);
335 if (result == NULL && i >= class_devno_max()) {
336 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
338 RETURN(ERR_PTR(-EOVERFLOW));
341 if (IS_ERR(result)) {
342 obd_device_free(newdev);
343 class_put_type(type);
345 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
346 result->obd_name, result);
/*
 * Remove "obd" from obd_devs[] and free it.
 * The slot is cleared under the obd_dev_lock write lock; the device is then
 * freed and the reference on its type is dropped.
 */
351 void class_release_dev(struct obd_device *obd)
/* saved up front because obd is freed before class_put_type() is called */
353 struct obd_type *obd_type = obd->obd_type;
355 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
356 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
357 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
358 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
359 LASSERT(obd_type != NULL);
361 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
362 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
364 cfs_write_lock(&obd_dev_lock);
365 obd_devs[obd->obd_minor] = NULL;
366 cfs_write_unlock(&obd_dev_lock);
367 obd_device_free(obd);
369 class_put_type(obd_type);
/*
 * Search obd_devs[] for a device whose obd_name equals "name".
 * The walk is done under the obd_dev_lock read lock; a name match is only
 * acted upon when the device has obd_attached set.
 * NOTE(review): the return statements (found index / not-found value) are
 * not visible in this view.
 */
372 int class_name2dev(const char *name)
379 cfs_read_lock(&obd_dev_lock);
380 for (i = 0; i < class_devno_max(); i++) {
381 struct obd_device *obd = class_num2obd(i);
383 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
384 /* Make sure we finished attaching before we give
385 out any references */
386 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
387 if (obd->obd_attached) {
388 cfs_read_unlock(&obd_dev_lock);
394 cfs_read_unlock(&obd_dev_lock);
398 EXPORT_SYMBOL(class_name2dev);
400 struct obd_device *class_name2obd(const char *name)
402 int dev = class_name2dev(name);
404 if (dev < 0 || dev > class_devno_max())
406 return class_num2obd(dev);
408 EXPORT_SYMBOL(class_name2obd);
/*
 * Search obd_devs[] for a device whose obd_uuid equals "uuid".
 * The walk is done under the obd_dev_lock read lock, which is released on a
 * match as well as after an unsuccessful walk.
 * NOTE(review): the return statements are not visible in this view.
 */
410 int class_uuid2dev(struct obd_uuid *uuid)
414 cfs_read_lock(&obd_dev_lock);
415 for (i = 0; i < class_devno_max(); i++) {
416 struct obd_device *obd = class_num2obd(i);
418 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
419 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
420 cfs_read_unlock(&obd_dev_lock);
424 cfs_read_unlock(&obd_dev_lock);
428 EXPORT_SYMBOL(class_uuid2dev);
/*
 * Look up an obd device by uuid: resolve the index with class_uuid2dev()
 * and map it to a device with class_num2obd().
 * NOTE(review): the handling of a failed lookup is not visible in this view.
 */
430 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
432 int dev = class_uuid2dev(uuid);
435 return class_num2obd(dev);
437 EXPORT_SYMBOL(class_uuid2obd);
440 * Get obd device from ::obd_devs[]
442 * \param num [in] array index
444 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
445 * otherwise return the obd device there.
/*
 * Map an obd_devs[] index to its device, asserting that the device's magic
 * and obd_minor are consistent with the slot it was found in.
 * NOTE(review): only the upper bound (num < class_devno_max()) is checked in
 * the visible code; the array read and the handling of an empty slot or a
 * negative index are not visible in this view - confirm.
 */
447 struct obd_device *class_num2obd(int num)
449 struct obd_device *obd = NULL;
451 if (num < class_devno_max()) {
456 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
457 "%p obd_magic %08x != %08x\n",
458 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
459 LASSERTF(obd->obd_minor == num,
460 "%p obd_minor %0d != %0d\n",
461 obd, obd->obd_minor, num);
466 EXPORT_SYMBOL(class_num2obd);
/*
 * Print one console line per device in obd_devs[]: index, status, type
 * name, device name, uuid and current refcount.
 * The status is chosen from obd_stopping, obd_set_up and obd_attached, in
 * that order of precedence.  The walk holds the obd_dev_lock read lock.
 * NOTE(review): the status strings and the skip of empty slots are not
 * visible in this view.
 */
468 void class_obd_list(void)
473 cfs_read_lock(&obd_dev_lock);
474 for (i = 0; i < class_devno_max(); i++) {
475 struct obd_device *obd = class_num2obd(i);
479 if (obd->obd_stopping)
481 else if (obd->obd_set_up)
483 else if (obd->obd_attached)
487 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
488 i, status, obd->obd_type->typ_name,
489 obd->obd_name, obd->obd_uuid.uuid,
490 cfs_atomic_read(&obd->obd_refcount));
492 cfs_read_unlock(&obd_dev_lock);
496 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
497 specified, then only the client with that uuid is returned,
498 otherwise any client connected to the tgt is returned. */
/* The walk holds the obd_dev_lock read lock, which is released on a match
 * and after an unsuccessful walk.
 * NOTE(review): the return statements and the skip of empty slots are not
 * visible in this view. */
499 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
500 const char * typ_name,
501 struct obd_uuid *grp_uuid)
505 cfs_read_lock(&obd_dev_lock);
506 for (i = 0; i < class_devno_max(); i++) {
507 struct obd_device *obd = class_num2obd(i);
/* strncmp() bounded by strlen(typ_name): the device type only has to
 * start with typ_name */
511 if ((strncmp(obd->obd_type->typ_name, typ_name,
512 strlen(typ_name)) == 0)) {
513 if (obd_uuid_equals(tgt_uuid,
514 &obd->u.cli.cl_target_uuid) &&
515 ((grp_uuid)? obd_uuid_equals(grp_uuid,
516 &obd->obd_uuid) : 1)) {
517 cfs_read_unlock(&obd_dev_lock);
522 cfs_read_unlock(&obd_dev_lock);
526 EXPORT_SYMBOL(class_find_client_obd);
528 /* Iterate the obd_device list looking for devices that have grp_uuid. Start
529 searching at *next, and if a device is found, the next index to look
530 at is saved in *next. If next is NULL, then the first matching device
531 will always be returned. */
/* NOTE(review): the start-index setup, the update of *next and the return
 * statements are not visible in this view. */
532 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
/* *next is only used as a start index when it lies inside obd_devs[] */
538 else if (*next >= 0 && *next < class_devno_max())
543 cfs_read_lock(&obd_dev_lock);
544 for (; i < class_devno_max(); i++) {
545 struct obd_device *obd = class_num2obd(i);
549 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
552 cfs_read_unlock(&obd_dev_lock);
556 cfs_read_unlock(&obd_dev_lock);
560 EXPORT_SYMBOL(class_devices_in_group);
563 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
564 * adjust sptlrpc settings accordingly.
/*
 * Send KEY_SPTLRPC_CONF via obd_set_info_async() to every device that is
 * set up, not stopping, of type mdc/osc/mdt/ost, and whose obd_name starts
 * with the first "namelen" bytes of "fsname".
 * NOTE(review): how the per-device results (rc2) are combined into the
 * return value is not visible in this view.
 */
566 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
568 struct obd_device *obd;
572 LASSERT(namelen > 0);
574 cfs_read_lock(&obd_dev_lock);
575 for (i = 0; i < class_devno_max(); i++) {
576 obd = class_num2obd(i);
578 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
581 /* only notify mdc, osc, mdt, ost */
582 type = obd->obd_type->typ_name;
583 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
584 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
585 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
586 strcmp(type, LUSTRE_OST_NAME) != 0)
589 if (strncmp(obd->obd_name, fsname, namelen))
/* hold a device reference while the read lock is dropped around the
 * obd_set_info_async() call, then re-take the lock to continue the walk */
592 class_incref(obd, __FUNCTION__, obd);
593 cfs_read_unlock(&obd_dev_lock);
594 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
595 sizeof(KEY_SPTLRPC_CONF),
596 KEY_SPTLRPC_CONF, 0, NULL, NULL);
598 class_decref(obd, __FUNCTION__, obd);
599 cfs_read_lock(&obd_dev_lock);
601 cfs_read_unlock(&obd_dev_lock);
604 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
/*
 * Destroy the obd_device, obdo, import and capa memory caches, asserting
 * that each cfs_mem_cache_destroy() call returns 0.
 * NOTE(review): only the obd_device_cachep guard is visible in this view;
 * the guards and pointer resets of the other caches appear to be partly
 * missing.  "destropy"/"destory" in the assertion messages look like typos
 * for "destroy".
 */
606 void obd_cleanup_caches(void)
611 if (obd_device_cachep) {
612 rc = cfs_mem_cache_destroy(obd_device_cachep);
613 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
614 obd_device_cachep = NULL;
617 rc = cfs_mem_cache_destroy(obdo_cachep);
618 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
622 rc = cfs_mem_cache_destroy(import_cachep);
623 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
624 import_cachep = NULL;
627 rc = cfs_mem_cache_destroy(capa_cachep);
628 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
/*
 * Create the obd_device, obdo, import and capa memory caches.  Each cache
 * pointer is asserted to be NULL before it is created, and
 * obd_cleanup_caches() is called from the tail of the function.
 * NOTE(review): most failure checks, the label in front of the cleanup
 * call and the return values are not visible in this view.
 */
634 int obd_init_caches(void)
638 LASSERT(obd_device_cachep == NULL);
639 obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
640 sizeof(struct obd_device),
642 if (!obd_device_cachep)
645 LASSERT(obdo_cachep == NULL);
646 obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
651 LASSERT(import_cachep == NULL);
652 import_cachep = cfs_mem_cache_create("ll_import_cache",
653 sizeof(struct obd_import),
658 LASSERT(capa_cachep == NULL);
659 capa_cachep = cfs_mem_cache_create("capa_cache",
660 sizeof(struct obd_capa), 0, 0);
666 obd_cleanup_caches();
671 /* map connection to client */
/* Resolves conn->cookie to an export with class_handle2object().  A null
 * handle and a cookie of -1 ("assign a new connection") are only logged in
 * the visible code.
 * NOTE(review): the early returns for those two cases and the final return
 * are not visible in this view. */
672 struct obd_export *class_conn2export(struct lustre_handle *conn)
674 struct obd_export *export;
678 CDEBUG(D_CACHE, "looking for null handle\n");
682 if (conn->cookie == -1) { /* this means assign a new connection */
683 CDEBUG(D_CACHE, "want a new connection\n");
687 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
688 export = class_handle2object(conn->cookie);
691 EXPORT_SYMBOL(class_conn2export);
/* Map an export to an obd device.
 * NOTE(review): the function body is not visible in this view. */
693 struct obd_device *class_exp2obd(struct obd_export *exp)
699 EXPORT_SYMBOL(class_exp2obd);
/*
 * Map a connection handle to the obd device of its export.
 * The export found by class_conn2export() is put again with
 * class_export_put() once exp_obd has been read.
 * NOTE(review): the NULL-export check and the return statements are not
 * visible in this view.
 */
701 struct obd_device *class_conn2obd(struct lustre_handle *conn)
703 struct obd_export *export;
704 export = class_conn2export(conn);
706 struct obd_device *obd = export->exp_obd;
707 class_export_put(export);
712 EXPORT_SYMBOL(class_conn2obd);
/* Return the client import (u.cli.cl_import) of the export's obd device.
 * NOTE(review): any NULL check on obd is not visible in this view. */
714 struct obd_import *class_exp2cliimp(struct obd_export *exp)
716 struct obd_device *obd = exp->exp_obd;
719 return obd->u.cli.cl_import;
721 EXPORT_SYMBOL(class_exp2cliimp);
/* Return the client import (u.cli.cl_import) of the obd device that the
 * connection handle resolves to via class_conn2obd().
 * NOTE(review): any NULL check on obd is not visible in this view. */
723 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
725 struct obd_device *obd = class_conn2obd(conn);
728 return obd->u.cli.cl_import;
730 EXPORT_SYMBOL(class_conn2cliimp);
732 /* Export management functions */
734 /* if export is involved in recovery then clean up related things */
/* Clears the export's recovery-related flags (each under exp_lock) and
 * decrements the matching per-device counters:
 * obd_delayed_clients, obd_connected_clients, obd_req_replay_clients and
 * obd_lock_replay_clients. */
735 void class_export_recovery_cleanup(struct obd_export *exp)
737 struct obd_device *obd = exp->exp_obd;
/* the delayed/connected client accounting is done under
 * obd_recovery_task_lock */
739 cfs_spin_lock(&obd->obd_recovery_task_lock);
740 if (exp->exp_delayed)
741 obd->obd_delayed_clients--;
742 if (obd->obd_recovering && exp->exp_in_recovery) {
743 cfs_spin_lock(&exp->exp_lock);
744 exp->exp_in_recovery = 0;
745 cfs_spin_unlock(&exp->exp_lock);
746 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
747 cfs_atomic_dec(&obd->obd_connected_clients);
749 cfs_spin_unlock(&obd->obd_recovery_task_lock);
750 /** Cleanup req replay fields */
751 if (exp->exp_req_replay_needed) {
752 cfs_spin_lock(&exp->exp_lock);
753 exp->exp_req_replay_needed = 0;
754 cfs_spin_unlock(&exp->exp_lock);
755 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
756 cfs_atomic_dec(&obd->obd_req_replay_clients);
758 /** Cleanup lock replay data */
759 if (exp->exp_lock_replay_needed) {
760 cfs_spin_lock(&exp->exp_lock);
761 exp->exp_lock_replay_needed = 0;
762 cfs_spin_unlock(&exp->exp_lock);
763 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
764 cfs_atomic_dec(&obd->obd_lock_replay_clients);
/*
 * Final teardown of an export whose refcount has reached zero.
 * Drops the export's connection (if any), asserts that all of its request
 * and reply lists are empty, calls obd_destroy_export(), releases the
 * "export" reference on the obd device and frees the export through
 * OBD_FREE_RCU().
 */
768 static void class_export_destroy(struct obd_export *exp)
770 struct obd_device *obd = exp->exp_obd;
773 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
774 LASSERT(obd != NULL);
776 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
777 exp->exp_client_uuid.uuid, obd->obd_name);
780 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
781 if (exp->exp_connection)
782 ptlrpc_put_connection_superhack(exp->exp_connection);
784 LASSERT(cfs_list_empty(&exp->exp_flock_wait_list));
785 LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
786 LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
787 LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
788 LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
789 obd_destroy_export(exp);
/* pairs with the class_incref(obd, "export", export) in class_new_export() */
790 class_decref(obd, "export", exp);
792 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
/* hop_addref callback for export handles: takes a reference on the export
 * via class_export_get(). */
796 static void export_handle_addref(void *export)
798 class_export_get(export);
/* Handle operations registered for exports by class_handle_hash() in
 * class_new_export().
 * NOTE(review): only the hop_addref member is visible in this view. */
801 static struct portals_handle_ops export_handle_ops = {
802 .hop_addref = export_handle_addref,
/* Take a reference on an export: increments exp_refcount and logs the new
 * count.
 * NOTE(review): the return statement is not visible in this view. */
806 struct obd_export *class_export_get(struct obd_export *exp)
808 cfs_atomic_inc(&exp->exp_refcount);
809 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
810 cfs_atomic_read(&exp->exp_refcount));
813 EXPORT_SYMBOL(class_export_get);
/*
 * Drop a reference on an export.  When the last reference goes away the
 * export's proc and recovery state are cleaned up and the export is handed
 * to obd_zombie_export_add().
 */
815 void class_export_put(struct obd_export *exp)
817 LASSERT(exp != NULL);
818 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
/* the count is logged before the decrement, hence the "- 1" */
819 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
820 cfs_atomic_read(&exp->exp_refcount) - 1);
822 if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
823 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
824 CDEBUG(D_IOCTL, "final put %p/%s\n",
825 exp, exp->exp_client_uuid.uuid);
827 /* release nid stat reference */
828 lprocfs_exp_cleanup(exp);
829 class_export_recovery_cleanup(exp);
831 obd_zombie_export_add(exp);
834 EXPORT_SYMBOL(class_export_put);
836 /* Creates a new export, adds it to the hash table, and returns a
837 * pointer to it. The refcount is 2: one for the hash reference, and
838 * one for the pointer returned by this function. */
/* Failures visible here: ERR_PTR(-ENOMEM) when the allocation fails, and rc
 * set to -ENODEV (device stopping / no uuid hash) or -EALREADY (duplicate
 * client uuid) before the shared cleanup at the end of the function.
 * NOTE(review): the labels, some conditions and the return statements are
 * not visible in this view. */
839 struct obd_export *class_new_export(struct obd_device *obd,
840 struct obd_uuid *cluuid)
842 struct obd_export *export;
843 cfs_hash_t *hash = NULL;
847 OBD_ALLOC_PTR(export);
849 return ERR_PTR(-ENOMEM);
851 export->exp_conn_cnt = 0;
852 export->exp_lock_hash = NULL;
853 cfs_atomic_set(&export->exp_refcount, 2);
854 cfs_atomic_set(&export->exp_rpc_count, 0);
855 cfs_atomic_set(&export->exp_cb_count, 0);
856 cfs_atomic_set(&export->exp_locks_count, 0);
857 #if LUSTRE_TRACKS_LOCK_EXP_REFS
858 CFS_INIT_LIST_HEAD(&export->exp_locks_list);
859 cfs_spin_lock_init(&export->exp_locks_list_guard);
861 cfs_atomic_set(&export->exp_replay_count, 0);
862 export->exp_obd = obd;
863 CFS_INIT_LIST_HEAD(&export->exp_flock_wait_list);
864 cfs_rwlock_init(&export->exp_flock_wait_lock);
865 CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
866 cfs_spin_lock_init(&export->exp_uncommitted_replies_lock);
867 CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
868 CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
869 CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
870 CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
/* register the export's handle with export_handle_ops */
871 class_handle_hash(&export->exp_handle, &export_handle_ops);
872 export->exp_last_request_time = cfs_time_current_sec();
873 cfs_spin_lock_init(&export->exp_lock);
874 cfs_spin_lock_init(&export->exp_rpc_lock);
875 CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
876 CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
877 cfs_spin_lock_init(&export->exp_bl_list_lock);
878 CFS_INIT_LIST_HEAD(&export->exp_bl_list);
880 export->exp_sp_peer = LUSTRE_SP_ANY;
881 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
882 export->exp_client_uuid = *cluuid;
883 obd_init_export(export);
885 cfs_spin_lock(&obd->obd_dev_lock);
886 /* shouldn't happen, but might race */
887 if (obd->obd_stopping)
888 GOTO(exit_unlock, rc = -ENODEV);
/* pin the uuid hash so it can be used after obd_dev_lock is dropped */
890 hash = cfs_hash_getref(obd->obd_uuid_hash);
892 GOTO(exit_unlock, rc = -ENODEV);
893 cfs_spin_unlock(&obd->obd_dev_lock);
/* an export whose client uuid equals the device's own uuid is not added
 * to the uuid hash */
895 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
896 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
898 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
899 obd->obd_name, cluuid->uuid, rc);
900 GOTO(exit_err, rc = -EALREADY);
/* re-check obd_stopping after re-taking the lock; undo the hash insert if
 * the device started stopping in between */
904 cfs_spin_lock(&obd->obd_dev_lock);
905 if (obd->obd_stopping) {
906 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
907 GOTO(exit_unlock, rc = -ENODEV);
/* success: pin the device and link the export onto its lists */
910 class_incref(obd, "export", export);
911 cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
912 cfs_list_add_tail(&export->exp_obd_chain_timed,
913 &export->exp_obd->obd_exports_timed);
914 export->exp_obd->obd_num_exports++;
915 cfs_spin_unlock(&obd->obd_dev_lock);
916 cfs_hash_putref(hash);
/* failure cleanup: unlock, drop the hash reference, unhash the handle and
 * free the export */
920 cfs_spin_unlock(&obd->obd_dev_lock);
923 cfs_hash_putref(hash);
924 class_handle_unhash(&export->exp_handle);
925 LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
926 obd_destroy_export(export);
927 OBD_FREE_PTR(export);
930 EXPORT_SYMBOL(class_new_export);
/*
 * Detach an export from its device: unhash its handle, remove it from the
 * uuid hash (if hashed), move it to obd_unlinked_exports, take it off the
 * timed list, decrement obd_num_exports, and finally drop one export
 * reference.
 */
932 void class_unlink_export(struct obd_export *exp)
934 class_handle_unhash(&exp->exp_handle);
936 cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
937 /* delete an uuid-export hashitem from hashtables */
938 if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
939 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
940 &exp->exp_client_uuid,
941 &exp->exp_uuid_hash);
943 cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
944 cfs_list_del_init(&exp->exp_obd_chain_timed);
945 exp->exp_obd->obd_num_exports--;
946 cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
947 class_export_put(exp);
949 EXPORT_SYMBOL(class_unlink_export);
951 /* Import management functions */
/*
 * Final teardown of an import whose refcount has reached zero.
 * Drops the import's connection and every connection on imp_conn_list
 * (freeing the list entries), releases the "import" reference on the obd
 * device and frees the import through OBD_FREE_RCU().
 */
952 void class_import_destroy(struct obd_import *imp)
956 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
957 imp->imp_obd->obd_name);
959 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
961 ptlrpc_put_connection_superhack(imp->imp_connection);
/* drain the list from the head until it is empty */
963 while (!cfs_list_empty(&imp->imp_conn_list)) {
964 struct obd_import_conn *imp_conn;
966 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
967 struct obd_import_conn, oic_item);
968 cfs_list_del_init(&imp_conn->oic_item);
969 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
970 OBD_FREE(imp_conn, sizeof(*imp_conn));
973 LASSERT(imp->imp_sec == NULL);
/* pairs with the class_incref(obd, "import", imp) in class_new_import() */
974 class_decref(imp->imp_obd, "import", imp);
975 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
/* hop_addref callback for import handles: takes a reference on the import
 * via class_import_get(). */
979 static void import_handle_addref(void *import)
981 class_import_get(import);
/* Handle operations registered for imports by class_handle_hash() in
 * class_new_import().
 * NOTE(review): only the hop_addref member is visible in this view. */
984 static struct portals_handle_ops import_handle_ops = {
985 .hop_addref = import_handle_addref,
/* Take a reference on an import: increments imp_refcount and logs the new
 * count together with the owning device name.
 * NOTE(review): the return statement is not visible in this view. */
989 struct obd_import *class_import_get(struct obd_import *import)
991 cfs_atomic_inc(&import->imp_refcount);
992 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
993 cfs_atomic_read(&import->imp_refcount),
994 import->imp_obd->obd_name);
997 EXPORT_SYMBOL(class_import_get);
/*
 * Drop a reference on an import.  The import must not already be on a
 * zombie list; when the last reference goes away it is handed to
 * obd_zombie_import_add().
 */
999 void class_import_put(struct obd_import *imp)
1003 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1004 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
/* the count is logged before the decrement, hence the "- 1" */
1006 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1007 cfs_atomic_read(&imp->imp_refcount) - 1,
1008 imp->imp_obd->obd_name);
1010 if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
1011 CDEBUG(D_INFO, "final put import %p\n", imp);
1012 obd_zombie_import_add(imp);
1017 EXPORT_SYMBOL(class_import_put);
/* Initialize the adaptive-timeout state of an import: the network latency
 * estimate and one service estimate per portal (IMP_AT_MAX_PORTALS), the
 * latter seeded with INITIAL_CONNECT_TIMEOUT.
 * NOTE(review): the last argument of the per-portal at_init() call is not
 * visible in this view. */
1019 static void init_imp_at(struct imp_at *at) {
1021 at_init(&at->iat_net_latency, 0, 0);
1022 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1023 /* max service estimates are tracked on the server side, so
1024 don't use the AT history here, just use the last reported
1025 val. (But keep hist for proc histogram, worst_ever) */
1026 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
/*
 * Allocate and initialize a new import for "obd".
 * The import starts in state LUSTRE_IMP_NEW with a refcount of 2, holds an
 * "import" reference on the device, and has its handle registered with
 * import_handle_ops.
 * NOTE(review): the allocation-failure check and the return statement are
 * not visible in this view.
 */
1031 struct obd_import *class_new_import(struct obd_device *obd)
1033 struct obd_import *imp;
1035 OBD_ALLOC(imp, sizeof(*imp));
1039 CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
1040 CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
1041 CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
1042 CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
1043 cfs_spin_lock_init(&imp->imp_lock);
1044 imp->imp_last_success_conn = 0;
1045 imp->imp_state = LUSTRE_IMP_NEW;
/* released again by class_decref(..., "import", imp) in
 * class_import_destroy() */
1046 imp->imp_obd = class_incref(obd, "import", imp);
1047 cfs_mutex_init(&imp->imp_sec_mutex);
1048 cfs_waitq_init(&imp->imp_recovery_waitq);
1050 cfs_atomic_set(&imp->imp_refcount, 2);
1051 cfs_atomic_set(&imp->imp_unregistering, 0);
1052 cfs_atomic_set(&imp->imp_inflight, 0);
1053 cfs_atomic_set(&imp->imp_replay_inflight, 0);
1054 cfs_atomic_set(&imp->imp_inval_count, 0);
1055 CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1056 CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1057 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1058 init_imp_at(&imp->imp_at);
1060 /* the default magic is V2, will be used in connect RPC, and
1061 * then adjusted according to the flags in request/reply. */
1062 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1066 EXPORT_SYMBOL(class_new_import);
/*
 * Start tearing down an import: unhash its handle, bump imp_generation
 * under imp_lock and drop one import reference.
 */
1068 void class_destroy_import(struct obd_import *import)
1070 LASSERT(import != NULL);
1071 LASSERT(import != LP_POISON);
1073 class_handle_unhash(&import->imp_handle);
1075 cfs_spin_lock(&import->imp_lock);
1076 import->imp_generation++;
1077 cfs_spin_unlock(&import->imp_lock);
1078 class_import_put(import);
1080 EXPORT_SYMBOL(class_destroy_import);
1082 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/*
 * Debug tracking of lock -> export references (only built when
 * LUSTRE_TRACKS_LOCK_EXP_REFS is set).  Bumps lock->l_exp_refs_nr under
 * exp_locks_list_guard; on the first reference the lock is linked onto
 * exp->exp_locks_list and exp is recorded as its target.  A lock that
 * already points at a different export only triggers a console warning.
 */
1084 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1086 cfs_spin_lock(&exp->exp_locks_list_guard);
1088 LASSERT(lock->l_exp_refs_nr >= 0);
1090 if (lock->l_exp_refs_target != NULL &&
1091 lock->l_exp_refs_target != exp) {
1092 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1093 exp, lock, lock->l_exp_refs_target);
/* post-increment: the test sees the count before this reference */
1095 if ((lock->l_exp_refs_nr ++) == 0) {
1096 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1097 lock->l_exp_refs_target = exp;
1099 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1100 lock, exp, lock->l_exp_refs_nr);
1101 cfs_spin_unlock(&exp->exp_locks_list_guard);
1103 EXPORT_SYMBOL(__class_export_add_lock_ref);
/*
 * Counterpart of __class_export_add_lock_ref(): drops one tracked
 * reference under exp_locks_list_guard.  When the count reaches zero the
 * lock is unlinked from the export's list and its target is cleared.  A
 * target that differs from exp only triggers a console warning.
 */
1105 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1107 cfs_spin_lock(&exp->exp_locks_list_guard);
1108 LASSERT(lock->l_exp_refs_nr > 0);
1109 if (lock->l_exp_refs_target != exp) {
1110 LCONSOLE_WARN("lock %p, "
1111 "mismatching export pointers: %p, %p\n",
1112 lock, lock->l_exp_refs_target, exp);
/* pre-decrement: the test sees the count after this reference is gone */
1114 if (-- lock->l_exp_refs_nr == 0) {
1115 cfs_list_del_init(&lock->l_exp_refs_link);
1116 lock->l_exp_refs_target = NULL;
1118 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1119 lock, exp, lock->l_exp_refs_nr);
1120 cfs_spin_unlock(&exp->exp_locks_list_guard);
1122 EXPORT_SYMBOL(__class_export_del_lock_ref);
1125 /* A connection defines an export context in which preallocation can
1126 be managed. This releases the export pointer reference, and returns
1127 the export handle, so the export refcount is 1 when this function returns. */
/*
 * Create a new export on "obd" for client "cluuid" and hand its handle
 * cookie back through "conn".
 * The pointer reference returned by class_new_export() is dropped again
 * before returning; a class_new_export() failure is passed on through
 * PTR_ERR().
 * NOTE(review): the IS_ERR() check and the success return are not visible
 * in this view.
 */
1129 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1130 struct obd_uuid *cluuid)
1132 struct obd_export *export;
1133 LASSERT(conn != NULL);
1134 LASSERT(obd != NULL);
1135 LASSERT(cluuid != NULL);
1138 export = class_new_export(obd, cluuid);
1140 RETURN(PTR_ERR(export));
1142 conn->cookie = export->exp_handle.h_cookie;
1143 class_export_put(export);
1145 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1146 cluuid->uuid, conn->cookie);
1149 EXPORT_SYMBOL(class_connect);
1152 /* This function removes 1-3 references from the export:
1153 * 1 - for the export pointer passed
1154 * and, if a disconnect is really needed,
1155 * 2 - removing from hash
1156 * 3 - in class_unlink_export
1157 * The export pointer passed to this function can be destroyed */
/* NOTE(review): the label targeted by GOTO(no_disconn, ...) and the return
 * statements are not visible in this view. */
1158 int class_disconnect(struct obd_export *export)
1160 int already_disconnected;
1163 if (export == NULL) {
1164 CWARN("attempting to free NULL export %p\n", export);
/* test-and-set exp_disconnected under exp_lock so that only one caller
 * performs the actual disconnect */
1168 cfs_spin_lock(&export->exp_lock);
1169 already_disconnected = export->exp_disconnected;
1170 export->exp_disconnected = 1;
1171 cfs_spin_unlock(&export->exp_lock);
1173 /* class_cleanup(), abort_recovery(), and class_fail_export()
1174 * all end up in here, and if any of them race we shouldn't
1175 * call extra class_export_puts(). */
1176 if (already_disconnected) {
1177 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1178 GOTO(no_disconn, already_disconnected);
1181 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1182 export->exp_handle.h_cookie);
/* remove the export from the nid hash if it is hashed there */
1184 if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1185 cfs_hash_del(export->exp_obd->obd_nid_hash,
1186 &export->exp_connection->c_peer.nid,
1187 &export->exp_nid_hash);
1189 class_unlink_export(export);
1191 class_export_put(export);
1194 EXPORT_SYMBOL(class_disconnect);
1196 /* Return non-zero for a fully connected export */
/* "Connected" is derived from exp_conn_cnt > 0, sampled under exp_lock.
 * NOTE(review): any NULL check on exp and the return statement are not
 * visible in this view. */
1197 int class_connected_export(struct obd_export *exp)
1201 cfs_spin_lock(&exp->exp_lock);
1202 connected = (exp->exp_conn_cnt > 0);
1203 cfs_spin_unlock(&exp->exp_lock);
1208 EXPORT_SYMBOL(class_connected_export);
/*
 * Disconnect every export on "list", storing "flags" in each export's
 * exp_flags first.  An export whose client uuid equals its device's uuid is
 * not disconnected; it is only unlinked from the list.
 * NOTE(review): some lines (the list member name, the continue after the
 * self-export case) are not visible in this view.
 */
1210 static void class_disconnect_export_list(cfs_list_t *list,
1211 enum obd_option flags)
1214 struct obd_export *exp;
1217 /* It's possible that an export may disconnect itself, but
1218 * nothing else will be added to this list. */
1219 while (!cfs_list_empty(list)) {
1220 exp = cfs_list_entry(list->next, struct obd_export,
1222 /* need for safe call CDEBUG after obd_disconnect */
1223 class_export_get(exp);
1225 cfs_spin_lock(&exp->exp_lock);
1226 exp->exp_flags = flags;
1227 cfs_spin_unlock(&exp->exp_lock);
1229 if (obd_uuid_equals(&exp->exp_client_uuid,
1230 &exp->exp_obd->obd_uuid)) {
1232 "exp %p export uuid == obd uuid, don't discon\n",
1234 /* Need to delete this now so we don't end up pointing
1235 * to work_list later when this export is cleaned up. */
1236 cfs_list_del_init(&exp->exp_obd_chain);
1237 class_export_put(exp);
/* second reference: obd_disconnect() below releases one */
1241 class_export_get(exp);
1242 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1243 "last request at "CFS_TIME_T"\n",
1244 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1245 exp, exp->exp_last_request_time);
1246 /* release one export reference anyway */
1247 rc = obd_disconnect(exp);
1249 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1250 obd_export_nid2str(exp), exp, rc);
1251 class_export_put(exp);
/*
 * Disconnect all exports of "obd".  Both obd_exports and
 * obd_delayed_exports are spliced onto a private work list under
 * obd_dev_lock; the disconnects themselves run after the lock is released.
 */
1256 void class_disconnect_exports(struct obd_device *obd)
1258 cfs_list_t work_list;
1261 /* Move all of the exports from obd_exports to a work list, en masse. */
1262 CFS_INIT_LIST_HEAD(&work_list);
1263 cfs_spin_lock(&obd->obd_dev_lock);
1264 cfs_list_splice_init(&obd->obd_exports, &work_list);
1265 cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1266 cfs_spin_unlock(&obd->obd_dev_lock);
1268 if (!cfs_list_empty(&work_list)) {
1269 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1270 "disconnecting them\n", obd->obd_minor, obd);
1271 class_disconnect_export_list(&work_list,
1272 exp_flags_from_obd(obd));
1274 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1275 obd->obd_minor, obd);
1278 EXPORT_SYMBOL(class_disconnect_exports);
1280 /* Remove exports that have not completed recovery. */
/* Collect exports from obd->obd_exports onto a private list and disconnect
 * them with OBD_OPT_ABORT_RECOV set.
 *
 * @param obd          device whose obd_exports list is scanned
 * @param test_export  predicate invoked on each candidate export while
 *                     obd_dev_lock (a spinlock) is held
 *
 * NOTE(review): the statements following the two `if`s below, the
 * declarations of `failed`/`evicted`, and the update of `evicted` are not
 * present in this listing; presumably each `if` skips the export. */
1282 void class_disconnect_stale_exports(struct obd_device *obd,
1283 int (*test_export)(struct obd_export *))
1285 cfs_list_t work_list;
1286 cfs_list_t *pos, *n;
1287 struct obd_export *exp;
1291 CFS_INIT_LIST_HEAD(&work_list);
1292 cfs_spin_lock(&obd->obd_dev_lock);
/* The _safe iterator is needed because entries are moved off the list
 * inside the loop. */
1293 cfs_list_for_each_safe(pos, n, &obd->obd_exports) {
1296 exp = cfs_list_entry(pos, struct obd_export, exp_obd_chain);
1298 /* don't count self-export as client */
1299 if (obd_uuid_equals(&exp->exp_client_uuid,
1300 &exp->exp_obd->obd_uuid))
1303 if (test_export(exp))
/* Mark the export failed under exp_lock, remembering in `failed` whether
 * it had already been marked. */
1306 cfs_spin_lock(&exp->exp_lock);
1307 failed = exp->exp_failed;
1308 exp->exp_failed = 1;
1309 cfs_spin_unlock(&exp->exp_lock);
/* Take it off obd_exports and queue it for disconnection. */
1313 cfs_list_move(&exp->exp_obd_chain, &work_list);
1315 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1316 obd->obd_name, exp->exp_client_uuid.uuid,
1317 exp->exp_connection == NULL ? "<unknown>" :
1318 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1319 print_export_data(exp, "EVICTING", 0);
1321 cfs_spin_unlock(&obd->obd_dev_lock);
/* Report the number of evicted clients and add it to the device's
 * running obd_stale_clients total. */
1324 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1325 obd->obd_name, evicted);
1326 obd->obd_stale_clients += evicted;
/* Disconnect outside obd_dev_lock, adding OBD_OPT_ABORT_RECOV to the
 * device-derived flags. */
1328 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1329 OBD_OPT_ABORT_RECOV);
1332 EXPORT_SYMBOL(class_disconnect_stale_exports);
/* Mark `exp` as failed and disconnect it.
 * exp_failed is set under exp_lock; if it was already set, the export is
 * only logged as dead (the message says "skipping").
 * NOTE(review): the early return after that message and the `if (rc)` /
 * `else` around the final messages are not in this listing -- confirm. */
1334 void class_fail_export(struct obd_export *exp)
1336 int rc, already_failed;
1338 cfs_spin_lock(&exp->exp_lock);
1339 already_failed = exp->exp_failed;
1340 exp->exp_failed = 1;
1341 cfs_spin_unlock(&exp->exp_lock);
1343 if (already_failed) {
1344 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1345 exp, exp->exp_client_uuid.uuid);
1349 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1350 exp, exp->exp_client_uuid.uuid);
/* Optionally dump the debug log before tearing the export down. */
1352 if (obd_dump_on_timeout)
1353 libcfs_debug_dumplog();
1355 /* need for safe call CDEBUG after obd_disconnect */
1356 class_export_get(exp);
1358 /* Most callers into obd_disconnect are removing their own reference
1359 * (request, for example) in addition to the one from the hash table.
1360 * We don't have such a reference here, so make one. */
1361 class_export_get(exp);
1362 rc = obd_disconnect(exp);
1364 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1366 CDEBUG(D_HA, "disconnected export %p/%s\n",
1367 exp, exp->exp_client_uuid.uuid);
/* Release the first of the two references taken above (the one kept
 * for logging). */
1368 class_export_put(exp);
1370 EXPORT_SYMBOL(class_fail_export);
/* Return a printable NID for the peer of `exp`.
 * When the export has a connection, this is libcfs_nid2str() of the
 * connection's peer NID.
 * NOTE(review): the fallback return for exp_connection == NULL is not in
 * this listing -- confirm what string is returned in that case. */
1372 char *obd_export_nid2str(struct obd_export *exp)
1374 if (exp->exp_connection != NULL)
1375 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1379 EXPORT_SYMBOL(obd_export_nid2str);
/* Evict exports of `obd` whose peer NID matches the string `nid`.
 *
 * @param obd  device whose obd_nid_hash is searched
 * @param nid  textual NID, converted with libcfs_str2nid()
 * @return     exports_evicted (number of exports evicted, judging by the
 *             name; its increment is not visible in this listing)
 *
 * NOTE(review): the loop construct, the remaining CWARN argument and the
 * counter increment are missing here; presumably the lookup is repeated
 * until no export is found. */
1381 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1383 struct obd_export *doomed_exp = NULL;
1384 int exports_evicted = 0;
1386 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1389 doomed_exp = cfs_hash_lookup(obd->obd_nid_hash, &nid_key);
1390 if (doomed_exp == NULL)
/* Sanity checks: the hash must have returned an export for exactly this
 * NID, and the device's self-export must never be found by NID. */
1393 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1394 "nid %s found, wanted nid %s, requested nid %s\n",
1395 obd_export_nid2str(doomed_exp),
1396 libcfs_nid2str(nid_key), nid);
1397 LASSERTF(doomed_exp != obd->obd_self_export,
1398 "self-export is hashed by NID?\n");
1400 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1401 obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1403 class_fail_export(doomed_exp);
/* Presumably drops the reference obtained from cfs_hash_lookup(). */
1404 class_export_put(doomed_exp);
1407 if (!exports_evicted)
1408 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1409 obd->obd_name, nid);
1410 return exports_evicted;
1412 EXPORT_SYMBOL(obd_export_evict_by_nid);
/* Evict the export of `obd` whose client uuid equals `uuid`.
 *
 * @param obd   device whose obd_uuid_hash is searched
 * @param uuid  textual client uuid
 * @return      exports_evicted; 0 when `uuid` is the device's own uuid
 *              (its increment on success is not visible in this listing)
 *
 * NOTE(review): the `} else {` between the "no exports found" error and
 * the eviction path is missing from this listing. */
1414 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1416 struct obd_export *doomed_exp = NULL;
1417 struct obd_uuid doomed_uuid;
1418 int exports_evicted = 0;
1420 obd_str2uuid(&doomed_uuid, uuid);
/* Refuse to evict the device itself. */
1421 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1422 CERROR("%s: can't evict myself\n", obd->obd_name);
1423 return exports_evicted;
1426 doomed_exp = cfs_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1428 if (doomed_exp == NULL) {
1429 CERROR("%s: can't disconnect %s: no exports found\n",
1430 obd->obd_name, uuid);
1432 CWARN("%s: evicting %s at adminstrative request\n",
1433 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1434 class_fail_export(doomed_exp);
/* Presumably drops the reference obtained from cfs_hash_lookup(). */
1435 class_export_put(doomed_exp);
1439 return exports_evicted;
1441 EXPORT_SYMBOL(obd_export_evict_by_uuid);
/* Optional callback, compiled only when LUSTRE_TRACKS_LOCK_EXP_REFS is
 * set. It starts out NULL and is exported so another module can install
 * it; print_export_data() calls it, when set, if asked to dump locks.
 * NOTE(review): the matching #endif is not in this listing. */
1443 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1444 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1445 EXPORT_SYMBOL(class_export_dump_hook);
/* Emit one D_HA debug line describing `exp`, tagged with `status`.
 * The line carries the obd name, export pointer, client uuid, NID, the
 * refcount / rpc / callback / lock counters, the disconnected / delayed /
 * failed bits, reply information and exp_last_committed.
 *
 * NOTE(review): the last parameter (`locks`, used below), the declaration
 * of `nreplies` and the body of the reply-walking loop are missing from
 * this listing. */
1448 static void print_export_data(struct obd_export *exp, const char *status,
1451 struct ptlrpc_reply_state *rs;
1452 struct ptlrpc_reply_state *first_reply = NULL;
/* exp_outstanding_replies is walked under exp_lock; presumably this
 * counts the replies and records the first one. */
1455 cfs_spin_lock(&exp->exp_lock);
1456 cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1462 cfs_spin_unlock(&exp->exp_lock);
1464 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1465 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1466 obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1467 cfs_atomic_read(&exp->exp_rpc_count),
1468 cfs_atomic_read(&exp->exp_cb_count),
1469 cfs_atomic_read(&exp->exp_locks_count),
1470 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1471 nreplies, first_reply, nreplies > 3 ? "..." : "",
1472 exp->exp_last_committed);
/* With lock-reference tracking compiled in, hand the export to the
 * installed dump hook when the caller asked for locks. */
1473 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1474 if (locks && class_export_dump_hook != NULL)
1475 class_export_dump_hook(exp);
/* Print every export associated with `obd` via print_export_data().
 * The device's active, unlinked and delayed lists are walked under
 * obd_dev_lock; the global zombie export list is then walked under
 * obd_zombie_impexp_lock (that list is global, so it is not filtered by
 * `obd` here).
 *
 * @param obd    device whose export lists are dumped
 * @param locks  passed through to print_export_data() */
1479 void dump_exports(struct obd_device *obd, int locks)
1481 struct obd_export *exp;
1483 cfs_spin_lock(&obd->obd_dev_lock);
1484 cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1485 print_export_data(exp, "ACTIVE", locks);
1486 cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1487 print_export_data(exp, "UNLINKED", locks);
1488 cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1489 print_export_data(exp, "DELAYED", locks);
1490 cfs_spin_unlock(&obd->obd_dev_lock);
1491 cfs_spin_lock(&obd_zombie_impexp_lock);
1492 cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1493 print_export_data(exp, "ZOMBIE", locks);
1494 cfs_spin_unlock(&obd_zombie_impexp_lock);
1496 EXPORT_SYMBOL(dump_exports);
/* Block until obd->obd_unlinked_exports is empty.
 * The caller must already have emptied obd->obd_exports (asserted below).
 * The list is tested under obd_dev_lock, which is dropped for each sleep
 * and retaken before the next test.
 *
 * NOTE(review): the declaration of `waited` and the statement that
 * advances it are missing from this listing. */
1498 void obd_exports_barrier(struct obd_device *obd)
1501 LASSERT(cfs_list_empty(&obd->obd_exports));
1502 cfs_spin_lock(&obd->obd_dev_lock);
1503 while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1504 cfs_spin_unlock(&obd->obd_dev_lock);
/* Uninterruptible sleep of `waited` seconds. */
1505 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
1506 cfs_time_seconds(waited));
/* Warn and dump the exports only when `waited` exceeds 5 and IS_PO2()
 * holds for it, which limits how often the console message appears. */
1507 if (waited > 5 && IS_PO2(waited)) {
1508 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1509 "more than %d seconds. "
1510 "The obd refcount = %d. Is it stuck?\n",
1511 obd->obd_name, waited,
1512 cfs_atomic_read(&obd->obd_refcount));
1513 dump_exports(obd, 1);
1516 cfs_spin_lock(&obd->obd_dev_lock);
1518 cfs_spin_unlock(&obd->obd_dev_lock);
1520 EXPORT_SYMBOL(obd_exports_barrier);
1522 /* Total amount of zombies to be destroyed */
/* Read under obd_zombie_impexp_lock by obd_zombie_impexp_check() and
 * obd_zombie_is_idle(); both treat zero as "nothing queued". */
1523 static int zombies_count = 0;
1526 /* Kill zombie imports and exports. */
/* Drain the zombie import and export lists.
 * Each pass detaches at most one import and one export from the heads of
 * the global lists under obd_zombie_impexp_lock, then destroys them with
 * the lock released. The loop ends when a pass finds neither.
 *
 * NOTE(review): the `do {`, the per-pass resets of `import`/`export` to
 * NULL, and whatever is done while the lock is retaken after each destroy
 * (presumably the zombies_count bookkeeping) are not in this listing. */
1528 void obd_zombie_impexp_cull(void)
1530 struct obd_import *import;
1531 struct obd_export *export;
1535 cfs_spin_lock(&obd_zombie_impexp_lock);
1538 if (!cfs_list_empty(&obd_zombie_imports)) {
1539 import = cfs_list_entry(obd_zombie_imports.next,
1542 cfs_list_del_init(&import->imp_zombie_chain);
1546 if (!cfs_list_empty(&obd_zombie_exports)) {
1547 export = cfs_list_entry(obd_zombie_exports.next,
1550 cfs_list_del_init(&export->exp_obd_chain);
1553 cfs_spin_unlock(&obd_zombie_impexp_lock);
/* Destruction happens outside the spinlock. */
1555 if (import != NULL) {
1556 class_import_destroy(import);
1557 cfs_spin_lock(&obd_zombie_impexp_lock);
1559 cfs_spin_unlock(&obd_zombie_impexp_lock);
1562 if (export != NULL) {
1563 class_export_destroy(export);
1564 cfs_spin_lock(&obd_zombie_impexp_lock);
1566 cfs_spin_unlock(&obd_zombie_impexp_lock);
1570 } while (import != NULL || export != NULL);
/* State shared between the zombie destroy thread and its controllers:
 *  - obd_zombie_start / obd_zombie_stop: completions signalled by the
 *    thread when it has started and when it exits;
 *  - obd_zombie_flags: bit field tested and set with OBD_ZOMBIE_STOP;
 *  - obd_zombie_waitq: wait queue used by both the thread and
 *    obd_zombie_barrier();
 *  - obd_zombie_pid: pid recorded by the thread itself. */
1574 static cfs_completion_t obd_zombie_start;
1575 static cfs_completion_t obd_zombie_stop;
1576 static unsigned long obd_zombie_flags;
1577 static cfs_waitq_t obd_zombie_waitq;
1578 static pid_t obd_zombie_pid;
/* NOTE(review): this value is passed as the first argument of
 * cfs_test_bit()/cfs_set_bit(); if those take a bit index rather than a
 * mask, the bit actually used is number 2 -- harmless while every user
 * goes through this constant, but confirm the intent. The enclosing
 * `enum {` line is not in this listing. */
1581 OBD_ZOMBIE_STOP = 1 << 1
1585 /* Check whether the zombie import/export destroy thread has work to do. */
/* Compute, under obd_zombie_impexp_lock, whether the destroy thread may
 * keep sleeping: rc is non-zero when zombies_count is 0 AND the stop bit
 * is not set. The thread waits on the negation of this value.
 *
 * @param arg  not referenced in the visible lines
 * NOTE(review): the declaration and return of `rc` are not in this
 * listing. */
1587 static int obd_zombie_impexp_check(void *arg)
1591 cfs_spin_lock(&obd_zombie_impexp_lock);
1592 rc = (zombies_count == 0) &&
1593 !cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1594 cfs_spin_unlock(&obd_zombie_impexp_lock);
1600 /* Add export to the obd_zombie thread and notify it. */
/* Move `exp` from its device list onto the global zombie export list and
 * wake the destroy thread.
 * The export must currently be linked through exp_obd_chain (asserted).
 * It is unlinked under the device's obd_dev_lock and re-linked under
 * obd_zombie_impexp_lock; the two locks are never held together here.
 * NOTE(review): one line between taking obd_zombie_impexp_lock and the
 * list add is missing from this listing (presumably the zombies_count
 * update). */
1602 static void obd_zombie_export_add(struct obd_export *exp) {
1603 cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
1604 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1605 cfs_list_del_init(&exp->exp_obd_chain);
1606 cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
1607 cfs_spin_lock(&obd_zombie_impexp_lock);
1609 cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1610 cfs_spin_unlock(&obd_zombie_impexp_lock);
1612 obd_zombie_impexp_notify();
1616 /* Add import to the obd_zombie thread and notify it. */
/* Queue `imp` on the global zombie import list and wake the destroy
 * thread.
 * Preconditions (asserted): imp_sec and imp_rq_pool are already NULL, and
 * the import is not yet on any zombie list.
 * NOTE(review): one line between the LASSERT and the list add is missing
 * from this listing (presumably the zombies_count update). */
1618 static void obd_zombie_import_add(struct obd_import *imp) {
1619 LASSERT(imp->imp_sec == NULL);
1620 LASSERT(imp->imp_rq_pool == NULL);
1621 cfs_spin_lock(&obd_zombie_impexp_lock);
1622 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1624 cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1625 cfs_spin_unlock(&obd_zombie_impexp_lock);
1627 obd_zombie_impexp_notify();
1631 /* Notify the import/export destroy thread about a new zombie. */
/* Wake every waiter on obd_zombie_waitq.
 * A broadcast is used instead of a single wake-up because
 * obd_zombie_barrier() sleeps on the same queue as the destroy thread:
 * as the note inside the function explains, a single wake-up could be
 * consumed by a barrier caller and never reach the thread. */
1633 static void obd_zombie_impexp_notify(void)
1636 * Make sure obd_zomebie_impexp_thread get this notification.
1637 * It is possible this signal only get by obd_zombie_barrier, and
1638 * barrier gulps this notification and sleeps away and hangs ensues
1640 cfs_waitq_broadcast(&obd_zombie_waitq);
1644 /* Check whether obd_zombie is idle. */
/* Compute, under obd_zombie_impexp_lock, whether zombies_count is 0.
 * Asserts that the stop bit is not set, so it must not be called once
 * obd_zombie_impexp_stop() has set it.
 * NOTE(review): the declaration and return of `rc` are not in this
 * listing. */
1646 static int obd_zombie_is_idle(void)
1650 LASSERT(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1651 cfs_spin_lock(&obd_zombie_impexp_lock);
1652 rc = (zombies_count == 0);
1653 cfs_spin_unlock(&obd_zombie_impexp_lock);
1658 /* Wait until the obd_zombie import/export queues become empty. */
/* Sleep on obd_zombie_waitq until obd_zombie_is_idle() is true.
 * When called from the destroy thread itself (pid match) the wait is
 * skipped, since the thread would otherwise wait for its own work.
 * NOTE(review): the statement under the pid test (presumably `return;`)
 * is not in this listing. The zero-initialised l_wait_info presumably
 * means "no timeout" -- confirm against l_wait_event(). */
1660 void obd_zombie_barrier(void)
1662 struct l_wait_info lwi = { 0 };
1664 if (obd_zombie_pid == cfs_curproc_pid())
1665 /* don't wait for myself */
1667 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1669 EXPORT_SYMBOL(obd_zombie_barrier);
1674 /* Zombie export/import destroy thread. */
/* Main function of the zombie destroy thread.
 * It completes obd_zombie_start once daemonizing has been attempted
 * (on failure as well as on success), then loops until the stop bit is
 * set: sleep until there is work or a stop request, cull the zombie
 * lists, and signal the wait queue. On exit it completes obd_zombie_stop.
 *
 * @param unused  not referenced in the visible lines
 * NOTE(review): the declaration of `rc` and the return statements are
 * not in this listing. */
1676 static int obd_zombie_impexp_thread(void *unused)
1680 if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1681 cfs_complete(&obd_zombie_start);
1685 cfs_complete(&obd_zombie_start);
/* Recorded so obd_zombie_barrier() can recognise calls made from this
 * thread. */
1687 obd_zombie_pid = cfs_curproc_pid();
1689 while(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1690 struct l_wait_info lwi = { 0 };
/* obd_zombie_impexp_check() is non-zero while there is nothing to do and
 * no stop request, so the thread wakes when either changes. */
1692 l_wait_event(obd_zombie_waitq,
1693 !obd_zombie_impexp_check(NULL), &lwi);
1694 obd_zombie_impexp_cull();
1697 * Notify obd_zombie_barrier callers that queues
1700 cfs_waitq_signal(&obd_zombie_waitq);
1703 cfs_complete(&obd_zombie_stop);
/* Non-kernel build: no destroy thread. Culling is driven by the two
 * liblustre callbacks whose handles are kept below; zombie_recur is the
 * counter obd_zombie_impexp_kill() uses to run only one cull at a time.
 * NOTE(review): the matching #ifdef / #endif lines are not in this
 * listing. */
1708 #else /* ! KERNEL */
1710 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
1711 static void *obd_zombie_impexp_work_cb;
1712 static void *obd_zombie_impexp_idle_cb;
/* Callback (registered in obd_zombie_impexp_init) that culls the zombie
 * lists. zombie_recur is incremented on entry and decremented on exit;
 * the cull runs only when the increment yields 1, so a nested or
 * concurrent invocation does not cull again.
 *
 * @param arg  not referenced in the visible lines
 * NOTE(review): the declaration of the return value and the return
 * statement are not in this listing. */
1714 int obd_zombie_impexp_kill(void *arg)
1718 if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1719 obd_zombie_impexp_cull();
1722 cfs_atomic_dec(&zombie_recur);
1729 /* Start the zombie import/export destroy thread. */
/* Initialise the zombie lists, their lock, the start/stop completions and
 * the wait queue, then start the culling mechanism.
 * Two start-up variants are visible: creating obd_zombie_impexp_thread
 * and waiting for obd_zombie_start, and registering the liblustre wait
 * and idle callbacks. NOTE(review): the preprocessor lines selecting
 * between them, the declaration of `rc`, the error check after
 * cfs_create_thread() and the return are not in this listing. */
1731 int obd_zombie_impexp_init(void)
1735 CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1736 CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1737 cfs_spin_lock_init(&obd_zombie_impexp_lock);
1738 cfs_init_completion(&obd_zombie_start);
1739 cfs_init_completion(&obd_zombie_stop);
1740 cfs_waitq_init(&obd_zombie_waitq);
1744 rc = cfs_create_thread(obd_zombie_impexp_thread, NULL, 0);
/* Wait until the new thread has completed obd_zombie_start. */
1748 cfs_wait_for_completion(&obd_zombie_start);
1751 obd_zombie_impexp_work_cb =
1752 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1753 &obd_zombie_impexp_kill, NULL);
1755 obd_zombie_impexp_idle_cb =
1756 liblustre_register_idle_callback("obd_zombi_impexp_check",
1757 &obd_zombie_impexp_check, NULL);
1763 /* Stop the zombie import/export destroy thread. */
/* Shut the culling mechanism down.
 * Thread variant: set the stop bit, wake the thread, and wait for it to
 * complete obd_zombie_stop. Callback variant: deregister the two
 * liblustre callbacks.
 * NOTE(review): the preprocessor lines selecting between the two variants
 * are not in this listing. */
1765 void obd_zombie_impexp_stop(void)
1767 cfs_set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1768 obd_zombie_impexp_notify();
1770 cfs_wait_for_completion(&obd_zombie_stop);
1772 liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1773 liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1777 /***** Kernel-userspace comm helpers *******/
1779 /* Get length of entire message, including header */
1780 int kuc_len(int payload_len)
1782 return sizeof(struct kuc_hdr) + payload_len;
1784 EXPORT_SYMBOL(kuc_len);
1786 /* Get a pointer to kuc header, given a ptr to the payload
1787 * @param p Pointer to payload area
1788 * @returns Pointer to kuc header */
/* The header is taken to sit immediately before the payload, so it is
 * located by stepping back one struct kuc_hdr from `p`. The magic value
 * is asserted, so passing a pointer that is not a KUC payload trips the
 * LASSERT.
 * NOTE(review): the return statement is not in this listing. */
1790 struct kuc_hdr * kuc_ptr(void *p)
1792 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1793 LASSERT(lh->kuc_magic == KUC_MAGIC);
1796 EXPORT_SYMBOL(kuc_ptr);
1798 /* Test if payload is part of kuc message
1799 * @param p Pointer to payload area */
/* Non-asserting counterpart of kuc_ptr(): compares the magic field of the
 * struct kuc_hdr-sized region immediately before `p` with KUC_MAGIC.
 * That region is read unconditionally, so `p` must point at memory that
 * is preceded by at least sizeof(struct kuc_hdr) readable bytes.
 * NOTE(review): the return statements are not in this listing; presumably
 * non-zero is returned on a magic match and 0 otherwise. */
1802 int kuc_ispayload(void *p)
1804 struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1806 if (kh->kuc_magic == KUC_MAGIC)
1811 EXPORT_SYMBOL(kuc_ispayload);
1813 /* Alloc space for a message, and fill in header
1814 * @return Pointer to payload area */
/* @param payload_len  payload size; total size is kuc_len(payload_len)
 * @param transport    stored in the header's kuc_transport
 * @param type         stored in the header's kuc_msgtype
 * @return             pointer just past the header (the payload area), or
 *                     ERR_PTR(-ENOMEM)
 *
 * kuc_msglen is set to the total length, header included.
 * NOTE(review): the declaration of `lh`, the allocation call and the
 * failure test guarding the ERR_PTR return are not in this listing.
 * payload_len is not range-checked in the visible lines. */
1816 void *kuc_alloc(int payload_len, int transport, int type)
1819 int len = kuc_len(payload_len);
1823 return ERR_PTR(-ENOMEM);
1825 lh->kuc_magic = KUC_MAGIC;
1826 lh->kuc_transport = transport;
1827 lh->kuc_msgtype = type;
1828 lh->kuc_msglen = len;
1830 return (void *)(lh + 1);
1832 EXPORT_SYMBOL(kuc_alloc);
/* Free a KUC message, given a pointer to its payload area.
 *
 * @param p            pointer to the payload area (not the header); the
 *                     header is located with kuc_ptr(), which asserts
 *                     that it carries KUC_MAGIC
 * @param payload_len  payload length; kuc_len(payload_len) bytes are
 *                     released, so this presumably must equal the value
 *                     given to kuc_alloc() -- verify against callers
 *
 * Deliberately not declared `inline`: the function is exported, and under
 * C99/C11 semantics a plain `inline` definition does not provide the
 * external definition that other translation units link against. */
void kuc_free(void *p, int payload_len)
{
        struct kuc_hdr *lh = kuc_ptr(p);

        OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);