4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Whamcloud, Inc.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
44 #include <liblustre.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
50 extern cfs_list_t obd_types;
51 cfs_spinlock_t obd_types_lock;
53 cfs_mem_cache_t *obd_device_cachep;
54 cfs_mem_cache_t *obdo_cachep;
55 EXPORT_SYMBOL(obdo_cachep);
56 cfs_mem_cache_t *import_cachep;
58 cfs_list_t obd_zombie_imports;
59 cfs_list_t obd_zombie_exports;
60 cfs_spinlock_t obd_zombie_impexp_lock;
61 static void obd_zombie_impexp_notify(void);
62 static void obd_zombie_export_add(struct obd_export *exp);
63 static void obd_zombie_import_add(struct obd_import *imp);
64 static void print_export_data(struct obd_export *exp,
65 const char *status, int locks);
67 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
70 * support functions: we could use inter-module communication, but this
71 * is more portable to other OS's
73 static struct obd_device *obd_device_alloc(void)
75 struct obd_device *obd;
77 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
79 obd->obd_magic = OBD_DEVICE_MAGIC;
84 static void obd_device_free(struct obd_device *obd)
87 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
88 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
89 if (obd->obd_namespace != NULL) {
90 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
91 obd, obd->obd_namespace, obd->obd_force);
94 lu_ref_fini(&obd->obd_reference);
95 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
98 struct obd_type *class_search_type(const char *name)
101 struct obd_type *type;
103 cfs_spin_lock(&obd_types_lock);
104 cfs_list_for_each(tmp, &obd_types) {
105 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
106 if (strcmp(type->typ_name, name) == 0) {
107 cfs_spin_unlock(&obd_types_lock);
111 cfs_spin_unlock(&obd_types_lock);
115 struct obd_type *class_get_type(const char *name)
117 struct obd_type *type = class_search_type(name);
119 #ifdef HAVE_MODULE_LOADING_SUPPORT
121 const char *modname = name;
122 if (!cfs_request_module("%s", modname)) {
123 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
124 type = class_search_type(name);
126 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
132 cfs_spin_lock(&type->obd_type_lock);
134 cfs_try_module_get(type->typ_dt_ops->o_owner);
135 cfs_spin_unlock(&type->obd_type_lock);
139 EXPORT_SYMBOL(class_get_type);
141 void class_put_type(struct obd_type *type)
144 cfs_spin_lock(&type->obd_type_lock);
146 cfs_module_put(type->typ_dt_ops->o_owner);
147 cfs_spin_unlock(&type->obd_type_lock);
149 EXPORT_SYMBOL(class_put_type);
151 #define CLASS_MAX_NAME 1024
153 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
154 struct lprocfs_vars *vars, const char *name,
155 struct lu_device_type *ldt)
157 struct obd_type *type;
162 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
164 if (class_search_type(name)) {
165 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
170 OBD_ALLOC(type, sizeof(*type));
174 OBD_ALLOC_PTR(type->typ_dt_ops);
175 OBD_ALLOC_PTR(type->typ_md_ops);
176 OBD_ALLOC(type->typ_name, strlen(name) + 1);
178 if (type->typ_dt_ops == NULL ||
179 type->typ_md_ops == NULL ||
180 type->typ_name == NULL)
183 *(type->typ_dt_ops) = *dt_ops;
184 /* md_ops is optional */
186 *(type->typ_md_ops) = *md_ops;
187 strcpy(type->typ_name, name);
188 cfs_spin_lock_init(&type->obd_type_lock);
191 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
193 if (IS_ERR(type->typ_procroot)) {
194 rc = PTR_ERR(type->typ_procroot);
195 type->typ_procroot = NULL;
201 rc = lu_device_type_init(ldt);
206 cfs_spin_lock(&obd_types_lock);
207 cfs_list_add(&type->typ_chain, &obd_types);
208 cfs_spin_unlock(&obd_types_lock);
213 if (type->typ_name != NULL)
214 OBD_FREE(type->typ_name, strlen(name) + 1);
215 if (type->typ_md_ops != NULL)
216 OBD_FREE_PTR(type->typ_md_ops);
217 if (type->typ_dt_ops != NULL)
218 OBD_FREE_PTR(type->typ_dt_ops);
219 OBD_FREE(type, sizeof(*type));
222 EXPORT_SYMBOL(class_register_type);
224 int class_unregister_type(const char *name)
226 struct obd_type *type = class_search_type(name);
230 CERROR("unknown obd type\n");
234 if (type->typ_refcnt) {
235 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
236 /* This is a bad situation, let's make the best of it */
237 /* Remove ops, but leave the name for debugging */
238 OBD_FREE_PTR(type->typ_dt_ops);
239 OBD_FREE_PTR(type->typ_md_ops);
243 if (type->typ_procroot) {
244 lprocfs_remove(&type->typ_procroot);
248 lu_device_type_fini(type->typ_lu);
250 cfs_spin_lock(&obd_types_lock);
251 cfs_list_del(&type->typ_chain);
252 cfs_spin_unlock(&obd_types_lock);
253 OBD_FREE(type->typ_name, strlen(name) + 1);
254 if (type->typ_dt_ops != NULL)
255 OBD_FREE_PTR(type->typ_dt_ops);
256 if (type->typ_md_ops != NULL)
257 OBD_FREE_PTR(type->typ_md_ops);
258 OBD_FREE(type, sizeof(*type));
260 } /* class_unregister_type */
261 EXPORT_SYMBOL(class_unregister_type);
264 * Create a new obd device.
266 * Find an empty slot in ::obd_devs[], create a new obd device in it.
268 * \param[in] type_name obd device type string.
269 * \param[in] name obd device name.
271 * \retval NULL if create fails, otherwise return the obd device
274 struct obd_device *class_newdev(const char *type_name, const char *name)
276 struct obd_device *result = NULL;
277 struct obd_device *newdev;
278 struct obd_type *type = NULL;
280 int new_obd_minor = 0;
283 if (strlen(name) >= MAX_OBD_NAME) {
284 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
285 RETURN(ERR_PTR(-EINVAL));
288 type = class_get_type(type_name);
290 CERROR("OBD: unknown type: %s\n", type_name);
291 RETURN(ERR_PTR(-ENODEV));
294 newdev = obd_device_alloc();
295 if (newdev == NULL) {
296 class_put_type(type);
297 RETURN(ERR_PTR(-ENOMEM));
299 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
301 cfs_write_lock(&obd_dev_lock);
302 for (i = 0; i < class_devno_max(); i++) {
303 struct obd_device *obd = class_num2obd(i);
305 if (obd && obd->obd_name &&
306 (strcmp(name, obd->obd_name) == 0)) {
307 CERROR("Device %s already exists at %d, won't add\n",
310 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
311 "%p obd_magic %08x != %08x\n", result,
312 result->obd_magic, OBD_DEVICE_MAGIC);
313 LASSERTF(result->obd_minor == new_obd_minor,
314 "%p obd_minor %d != %d\n", result,
315 result->obd_minor, new_obd_minor);
317 obd_devs[result->obd_minor] = NULL;
318 result->obd_name[0]='\0';
320 result = ERR_PTR(-EEXIST);
323 if (!result && !obd) {
325 result->obd_minor = i;
327 result->obd_type = type;
328 strncpy(result->obd_name, name,
329 sizeof(result->obd_name) - 1);
330 obd_devs[i] = result;
333 cfs_write_unlock(&obd_dev_lock);
335 if (result == NULL && i >= class_devno_max()) {
336 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
338 RETURN(ERR_PTR(-EOVERFLOW));
341 if (IS_ERR(result)) {
342 obd_device_free(newdev);
343 class_put_type(type);
345 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
346 result->obd_name, result);
351 void class_release_dev(struct obd_device *obd)
353 struct obd_type *obd_type = obd->obd_type;
355 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
356 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
357 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
358 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
359 LASSERT(obd_type != NULL);
361 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
362 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
364 cfs_write_lock(&obd_dev_lock);
365 obd_devs[obd->obd_minor] = NULL;
366 cfs_write_unlock(&obd_dev_lock);
367 obd_device_free(obd);
369 class_put_type(obd_type);
372 int class_name2dev(const char *name)
379 cfs_read_lock(&obd_dev_lock);
380 for (i = 0; i < class_devno_max(); i++) {
381 struct obd_device *obd = class_num2obd(i);
383 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
384 /* Make sure we finished attaching before we give
385 out any references */
386 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
387 if (obd->obd_attached) {
388 cfs_read_unlock(&obd_dev_lock);
394 cfs_read_unlock(&obd_dev_lock);
398 EXPORT_SYMBOL(class_name2dev);
400 struct obd_device *class_name2obd(const char *name)
402 int dev = class_name2dev(name);
404 if (dev < 0 || dev > class_devno_max())
406 return class_num2obd(dev);
408 EXPORT_SYMBOL(class_name2obd);
410 int class_uuid2dev(struct obd_uuid *uuid)
414 cfs_read_lock(&obd_dev_lock);
415 for (i = 0; i < class_devno_max(); i++) {
416 struct obd_device *obd = class_num2obd(i);
418 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
419 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
420 cfs_read_unlock(&obd_dev_lock);
424 cfs_read_unlock(&obd_dev_lock);
428 EXPORT_SYMBOL(class_uuid2dev);
430 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
432 int dev = class_uuid2dev(uuid);
435 return class_num2obd(dev);
437 EXPORT_SYMBOL(class_uuid2obd);
440 * Get obd device from ::obd_devs[]
442 * \param num [in] array index
444 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
445 * otherwise return the obd device there.
447 struct obd_device *class_num2obd(int num)
449 struct obd_device *obd = NULL;
451 if (num < class_devno_max()) {
456 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
457 "%p obd_magic %08x != %08x\n",
458 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
459 LASSERTF(obd->obd_minor == num,
460 "%p obd_minor %0d != %0d\n",
461 obd, obd->obd_minor, num);
466 EXPORT_SYMBOL(class_num2obd);
468 void class_obd_list(void)
473 cfs_read_lock(&obd_dev_lock);
474 for (i = 0; i < class_devno_max(); i++) {
475 struct obd_device *obd = class_num2obd(i);
479 if (obd->obd_stopping)
481 else if (obd->obd_set_up)
483 else if (obd->obd_attached)
487 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
488 i, status, obd->obd_type->typ_name,
489 obd->obd_name, obd->obd_uuid.uuid,
490 cfs_atomic_read(&obd->obd_refcount));
492 cfs_read_unlock(&obd_dev_lock);
496 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
497 specified, then only the client with that uuid is returned,
498 otherwise any client connected to the tgt is returned. */
499 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
500 const char * typ_name,
501 struct obd_uuid *grp_uuid)
505 cfs_read_lock(&obd_dev_lock);
506 for (i = 0; i < class_devno_max(); i++) {
507 struct obd_device *obd = class_num2obd(i);
511 if ((strncmp(obd->obd_type->typ_name, typ_name,
512 strlen(typ_name)) == 0)) {
513 if (obd_uuid_equals(tgt_uuid,
514 &obd->u.cli.cl_target_uuid) &&
515 ((grp_uuid)? obd_uuid_equals(grp_uuid,
516 &obd->obd_uuid) : 1)) {
517 cfs_read_unlock(&obd_dev_lock);
522 cfs_read_unlock(&obd_dev_lock);
526 EXPORT_SYMBOL(class_find_client_obd);
528 /* Iterate the obd_device list looking devices have grp_uuid. Start
529 searching at *next, and if a device is found, the next index to look
530 at is saved in *next. If next is NULL, then the first matching device
531 will always be returned. */
532 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
538 else if (*next >= 0 && *next < class_devno_max())
543 cfs_read_lock(&obd_dev_lock);
544 for (; i < class_devno_max(); i++) {
545 struct obd_device *obd = class_num2obd(i);
549 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
552 cfs_read_unlock(&obd_dev_lock);
556 cfs_read_unlock(&obd_dev_lock);
560 EXPORT_SYMBOL(class_devices_in_group);
563 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
564 * adjust sptlrpc settings accordingly.
566 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
568 struct obd_device *obd;
572 LASSERT(namelen > 0);
574 cfs_read_lock(&obd_dev_lock);
575 for (i = 0; i < class_devno_max(); i++) {
576 obd = class_num2obd(i);
578 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
581 /* only notify mdc, osc, mdt, ost */
582 type = obd->obd_type->typ_name;
583 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
584 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
585 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
586 strcmp(type, LUSTRE_OST_NAME) != 0)
589 if (strncmp(obd->obd_name, fsname, namelen))
592 class_incref(obd, __FUNCTION__, obd);
593 cfs_read_unlock(&obd_dev_lock);
594 rc2 = obd_set_info_async(obd->obd_self_export,
595 sizeof(KEY_SPTLRPC_CONF),
596 KEY_SPTLRPC_CONF, 0, NULL, NULL);
598 class_decref(obd, __FUNCTION__, obd);
599 cfs_read_lock(&obd_dev_lock);
601 cfs_read_unlock(&obd_dev_lock);
604 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
606 void obd_cleanup_caches(void)
611 if (obd_device_cachep) {
612 rc = cfs_mem_cache_destroy(obd_device_cachep);
613 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
614 obd_device_cachep = NULL;
617 rc = cfs_mem_cache_destroy(obdo_cachep);
618 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
622 rc = cfs_mem_cache_destroy(import_cachep);
623 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
624 import_cachep = NULL;
627 rc = cfs_mem_cache_destroy(capa_cachep);
628 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
634 int obd_init_caches(void)
638 LASSERT(obd_device_cachep == NULL);
639 obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
640 sizeof(struct obd_device),
642 if (!obd_device_cachep)
645 LASSERT(obdo_cachep == NULL);
646 obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
651 LASSERT(import_cachep == NULL);
652 import_cachep = cfs_mem_cache_create("ll_import_cache",
653 sizeof(struct obd_import),
658 LASSERT(capa_cachep == NULL);
659 capa_cachep = cfs_mem_cache_create("capa_cache",
660 sizeof(struct obd_capa), 0, 0);
666 obd_cleanup_caches();
671 /* map connection to client */
672 struct obd_export *class_conn2export(struct lustre_handle *conn)
674 struct obd_export *export;
678 CDEBUG(D_CACHE, "looking for null handle\n");
682 if (conn->cookie == -1) { /* this means assign a new connection */
683 CDEBUG(D_CACHE, "want a new connection\n");
687 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
688 export = class_handle2object(conn->cookie);
691 EXPORT_SYMBOL(class_conn2export);
693 struct obd_device *class_exp2obd(struct obd_export *exp)
699 EXPORT_SYMBOL(class_exp2obd);
701 struct obd_device *class_conn2obd(struct lustre_handle *conn)
703 struct obd_export *export;
704 export = class_conn2export(conn);
706 struct obd_device *obd = export->exp_obd;
707 class_export_put(export);
712 EXPORT_SYMBOL(class_conn2obd);
714 struct obd_import *class_exp2cliimp(struct obd_export *exp)
716 struct obd_device *obd = exp->exp_obd;
719 return obd->u.cli.cl_import;
721 EXPORT_SYMBOL(class_exp2cliimp);
723 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
725 struct obd_device *obd = class_conn2obd(conn);
728 return obd->u.cli.cl_import;
730 EXPORT_SYMBOL(class_conn2cliimp);
732 /* Export management functions */
734 /* if export is involved in recovery then clean up related things */
735 void class_export_recovery_cleanup(struct obd_export *exp)
737 struct obd_device *obd = exp->exp_obd;
739 cfs_spin_lock(&obd->obd_recovery_task_lock);
740 if (exp->exp_delayed)
741 obd->obd_delayed_clients--;
742 if (obd->obd_recovering && exp->exp_in_recovery) {
743 cfs_spin_lock(&exp->exp_lock);
744 exp->exp_in_recovery = 0;
745 cfs_spin_unlock(&exp->exp_lock);
746 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
747 cfs_atomic_dec(&obd->obd_connected_clients);
749 cfs_spin_unlock(&obd->obd_recovery_task_lock);
750 /** Cleanup req replay fields */
751 if (exp->exp_req_replay_needed) {
752 cfs_spin_lock(&exp->exp_lock);
753 exp->exp_req_replay_needed = 0;
754 cfs_spin_unlock(&exp->exp_lock);
755 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
756 cfs_atomic_dec(&obd->obd_req_replay_clients);
758 /** Cleanup lock replay data */
759 if (exp->exp_lock_replay_needed) {
760 cfs_spin_lock(&exp->exp_lock);
761 exp->exp_lock_replay_needed = 0;
762 cfs_spin_unlock(&exp->exp_lock);
763 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
764 cfs_atomic_dec(&obd->obd_lock_replay_clients);
768 static void class_export_destroy(struct obd_export *exp)
770 struct obd_device *obd = exp->exp_obd;
773 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
774 LASSERT(obd != NULL);
776 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
777 exp->exp_client_uuid.uuid, obd->obd_name);
780 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
781 if (exp->exp_connection)
782 ptlrpc_put_connection_superhack(exp->exp_connection);
784 LASSERT(cfs_list_empty(&exp->exp_flock_wait_list));
785 LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
786 LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
787 LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
788 LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
789 obd_destroy_export(exp);
790 class_decref(obd, "export", exp);
792 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
796 static void export_handle_addref(void *export)
798 class_export_get(export);
801 struct obd_export *class_export_get(struct obd_export *exp)
803 cfs_atomic_inc(&exp->exp_refcount);
804 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
805 cfs_atomic_read(&exp->exp_refcount));
808 EXPORT_SYMBOL(class_export_get);
810 void class_export_put(struct obd_export *exp)
812 LASSERT(exp != NULL);
813 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
814 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
815 cfs_atomic_read(&exp->exp_refcount) - 1);
817 if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
818 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
819 CDEBUG(D_IOCTL, "final put %p/%s\n",
820 exp, exp->exp_client_uuid.uuid);
822 /* release nid stat refererence */
823 lprocfs_exp_cleanup(exp);
824 class_export_recovery_cleanup(exp);
826 obd_zombie_export_add(exp);
829 EXPORT_SYMBOL(class_export_put);
831 /* Creates a new export, adds it to the hash table, and returns a
832 * pointer to it. The refcount is 2: one for the hash reference, and
833 * one for the pointer returned by this function. */
834 struct obd_export *class_new_export(struct obd_device *obd,
835 struct obd_uuid *cluuid)
837 struct obd_export *export;
838 cfs_hash_t *hash = NULL;
842 OBD_ALLOC_PTR(export);
844 return ERR_PTR(-ENOMEM);
846 export->exp_conn_cnt = 0;
847 export->exp_lock_hash = NULL;
848 cfs_atomic_set(&export->exp_refcount, 2);
849 cfs_atomic_set(&export->exp_rpc_count, 0);
850 cfs_atomic_set(&export->exp_cb_count, 0);
851 cfs_atomic_set(&export->exp_locks_count, 0);
852 #if LUSTRE_TRACKS_LOCK_EXP_REFS
853 CFS_INIT_LIST_HEAD(&export->exp_locks_list);
854 cfs_spin_lock_init(&export->exp_locks_list_guard);
856 cfs_atomic_set(&export->exp_replay_count, 0);
857 export->exp_obd = obd;
858 CFS_INIT_LIST_HEAD(&export->exp_flock_wait_list);
859 cfs_rwlock_init(&export->exp_flock_wait_lock);
860 CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
861 cfs_spin_lock_init(&export->exp_uncommitted_replies_lock);
862 CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
863 CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
864 CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
865 CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
866 class_handle_hash(&export->exp_handle, export_handle_addref);
867 export->exp_last_request_time = cfs_time_current_sec();
868 cfs_spin_lock_init(&export->exp_lock);
869 cfs_spin_lock_init(&export->exp_rpc_lock);
870 CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
871 CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
872 cfs_spin_lock_init(&export->exp_bl_list_lock);
873 CFS_INIT_LIST_HEAD(&export->exp_bl_list);
875 export->exp_sp_peer = LUSTRE_SP_ANY;
876 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
877 export->exp_client_uuid = *cluuid;
878 obd_init_export(export);
880 cfs_spin_lock(&obd->obd_dev_lock);
881 /* shouldn't happen, but might race */
882 if (obd->obd_stopping)
883 GOTO(exit_unlock, rc = -ENODEV);
885 hash = cfs_hash_getref(obd->obd_uuid_hash);
887 GOTO(exit_unlock, rc = -ENODEV);
888 cfs_spin_unlock(&obd->obd_dev_lock);
890 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
891 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
893 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
894 obd->obd_name, cluuid->uuid, rc);
895 GOTO(exit_err, rc = -EALREADY);
899 cfs_spin_lock(&obd->obd_dev_lock);
900 if (obd->obd_stopping) {
901 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
902 GOTO(exit_unlock, rc = -ENODEV);
905 class_incref(obd, "export", export);
906 cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
907 cfs_list_add_tail(&export->exp_obd_chain_timed,
908 &export->exp_obd->obd_exports_timed);
909 export->exp_obd->obd_num_exports++;
910 cfs_spin_unlock(&obd->obd_dev_lock);
911 cfs_hash_putref(hash);
915 cfs_spin_unlock(&obd->obd_dev_lock);
918 cfs_hash_putref(hash);
919 class_handle_unhash(&export->exp_handle);
920 LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
921 obd_destroy_export(export);
922 OBD_FREE_PTR(export);
925 EXPORT_SYMBOL(class_new_export);
927 void class_unlink_export(struct obd_export *exp)
929 class_handle_unhash(&exp->exp_handle);
931 cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
932 /* delete an uuid-export hashitem from hashtables */
933 if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
934 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
935 &exp->exp_client_uuid,
936 &exp->exp_uuid_hash);
938 cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
939 cfs_list_del_init(&exp->exp_obd_chain_timed);
940 exp->exp_obd->obd_num_exports--;
941 cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
942 class_export_put(exp);
944 EXPORT_SYMBOL(class_unlink_export);
946 /* Import management functions */
947 void class_import_destroy(struct obd_import *imp)
951 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
952 imp->imp_obd->obd_name);
954 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
956 ptlrpc_put_connection_superhack(imp->imp_connection);
958 while (!cfs_list_empty(&imp->imp_conn_list)) {
959 struct obd_import_conn *imp_conn;
961 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
962 struct obd_import_conn, oic_item);
963 cfs_list_del_init(&imp_conn->oic_item);
964 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
965 OBD_FREE(imp_conn, sizeof(*imp_conn));
968 LASSERT(imp->imp_sec == NULL);
969 class_decref(imp->imp_obd, "import", imp);
970 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
974 static void import_handle_addref(void *import)
976 class_import_get(import);
979 struct obd_import *class_import_get(struct obd_import *import)
981 cfs_atomic_inc(&import->imp_refcount);
982 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
983 cfs_atomic_read(&import->imp_refcount),
984 import->imp_obd->obd_name);
987 EXPORT_SYMBOL(class_import_get);
989 void class_import_put(struct obd_import *imp)
993 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
994 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
996 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
997 cfs_atomic_read(&imp->imp_refcount) - 1,
998 imp->imp_obd->obd_name);
1000 if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
1001 CDEBUG(D_INFO, "final put import %p\n", imp);
1002 obd_zombie_import_add(imp);
1007 EXPORT_SYMBOL(class_import_put);
1009 static void init_imp_at(struct imp_at *at) {
1011 at_init(&at->iat_net_latency, 0, 0);
1012 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1013 /* max service estimates are tracked on the server side, so
1014 don't use the AT history here, just use the last reported
1015 val. (But keep hist for proc histogram, worst_ever) */
1016 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1021 struct obd_import *class_new_import(struct obd_device *obd)
1023 struct obd_import *imp;
1025 OBD_ALLOC(imp, sizeof(*imp));
1029 CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
1030 CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
1031 CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
1032 CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
1033 cfs_spin_lock_init(&imp->imp_lock);
1034 imp->imp_last_success_conn = 0;
1035 imp->imp_state = LUSTRE_IMP_NEW;
1036 imp->imp_obd = class_incref(obd, "import", imp);
1037 cfs_mutex_init(&imp->imp_sec_mutex);
1038 cfs_waitq_init(&imp->imp_recovery_waitq);
1040 cfs_atomic_set(&imp->imp_refcount, 2);
1041 cfs_atomic_set(&imp->imp_unregistering, 0);
1042 cfs_atomic_set(&imp->imp_inflight, 0);
1043 cfs_atomic_set(&imp->imp_replay_inflight, 0);
1044 cfs_atomic_set(&imp->imp_inval_count, 0);
1045 CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1046 CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1047 class_handle_hash(&imp->imp_handle, import_handle_addref);
1048 init_imp_at(&imp->imp_at);
1050 /* the default magic is V2, will be used in connect RPC, and
1051 * then adjusted according to the flags in request/reply. */
1052 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1056 EXPORT_SYMBOL(class_new_import);
1058 void class_destroy_import(struct obd_import *import)
1060 LASSERT(import != NULL);
1061 LASSERT(import != LP_POISON);
1063 class_handle_unhash(&import->imp_handle);
1065 cfs_spin_lock(&import->imp_lock);
1066 import->imp_generation++;
1067 cfs_spin_unlock(&import->imp_lock);
1068 class_import_put(import);
1070 EXPORT_SYMBOL(class_destroy_import);
1072 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1074 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1076 cfs_spin_lock(&exp->exp_locks_list_guard);
1078 LASSERT(lock->l_exp_refs_nr >= 0);
1080 if (lock->l_exp_refs_target != NULL &&
1081 lock->l_exp_refs_target != exp) {
1082 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1083 exp, lock, lock->l_exp_refs_target);
1085 if ((lock->l_exp_refs_nr ++) == 0) {
1086 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1087 lock->l_exp_refs_target = exp;
1089 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1090 lock, exp, lock->l_exp_refs_nr);
1091 cfs_spin_unlock(&exp->exp_locks_list_guard);
1093 EXPORT_SYMBOL(__class_export_add_lock_ref);
1095 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1097 cfs_spin_lock(&exp->exp_locks_list_guard);
1098 LASSERT(lock->l_exp_refs_nr > 0);
1099 if (lock->l_exp_refs_target != exp) {
1100 LCONSOLE_WARN("lock %p, "
1101 "mismatching export pointers: %p, %p\n",
1102 lock, lock->l_exp_refs_target, exp);
1104 if (-- lock->l_exp_refs_nr == 0) {
1105 cfs_list_del_init(&lock->l_exp_refs_link);
1106 lock->l_exp_refs_target = NULL;
1108 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1109 lock, exp, lock->l_exp_refs_nr);
1110 cfs_spin_unlock(&exp->exp_locks_list_guard);
1112 EXPORT_SYMBOL(__class_export_del_lock_ref);
1115 /* A connection defines an export context in which preallocation can
1116 be managed. This releases the export pointer reference, and returns
1117 the export handle, so the export refcount is 1 when this function
1119 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1120 struct obd_uuid *cluuid)
1122 struct obd_export *export;
1123 LASSERT(conn != NULL);
1124 LASSERT(obd != NULL);
1125 LASSERT(cluuid != NULL);
1128 export = class_new_export(obd, cluuid);
1130 RETURN(PTR_ERR(export));
1132 conn->cookie = export->exp_handle.h_cookie;
1133 class_export_put(export);
1135 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1136 cluuid->uuid, conn->cookie);
1139 EXPORT_SYMBOL(class_connect);
1142 /* This function removes 1-3 references from the export:
1143 * 1 - for export pointer passed
1144 * and if disconnect really need
1145 * 2 - removing from hash
1146 * 3 - in client_unlink_export
1147 * The export pointer passed to this function can destroyed */
1148 int class_disconnect(struct obd_export *export)
1150 int already_disconnected;
1153 if (export == NULL) {
1154 CWARN("attempting to free NULL export %p\n", export);
1158 cfs_spin_lock(&export->exp_lock);
1159 already_disconnected = export->exp_disconnected;
1160 export->exp_disconnected = 1;
1161 cfs_spin_unlock(&export->exp_lock);
1163 /* class_cleanup(), abort_recovery(), and class_fail_export()
1164 * all end up in here, and if any of them race we shouldn't
1165 * call extra class_export_puts(). */
1166 if (already_disconnected) {
1167 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1168 GOTO(no_disconn, already_disconnected);
1171 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1172 export->exp_handle.h_cookie);
1174 if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1175 cfs_hash_del(export->exp_obd->obd_nid_hash,
1176 &export->exp_connection->c_peer.nid,
1177 &export->exp_nid_hash);
1179 class_unlink_export(export);
1181 class_export_put(export);
1184 EXPORT_SYMBOL(class_disconnect);
1186 /* Return non-zero for a fully connected export */
1187 int class_connected_export(struct obd_export *exp)
1191 cfs_spin_lock(&exp->exp_lock);
1192 connected = (exp->exp_conn_cnt > 0);
1193 cfs_spin_unlock(&exp->exp_lock);
1198 EXPORT_SYMBOL(class_connected_export);
1200 static void class_disconnect_export_list(cfs_list_t *list,
1201 enum obd_option flags)
1204 struct obd_export *exp;
1207 /* It's possible that an export may disconnect itself, but
1208 * nothing else will be added to this list. */
1209 while (!cfs_list_empty(list)) {
1210 exp = cfs_list_entry(list->next, struct obd_export,
1212 /* need for safe call CDEBUG after obd_disconnect */
1213 class_export_get(exp);
1215 cfs_spin_lock(&exp->exp_lock);
1216 exp->exp_flags = flags;
1217 cfs_spin_unlock(&exp->exp_lock);
1219 if (obd_uuid_equals(&exp->exp_client_uuid,
1220 &exp->exp_obd->obd_uuid)) {
1222 "exp %p export uuid == obd uuid, don't discon\n",
1224 /* Need to delete this now so we don't end up pointing
1225 * to work_list later when this export is cleaned up. */
1226 cfs_list_del_init(&exp->exp_obd_chain);
1227 class_export_put(exp);
1231 class_export_get(exp);
1232 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1233 "last request at "CFS_TIME_T"\n",
1234 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1235 exp, exp->exp_last_request_time);
1236 /* release one export reference anyway */
1237 rc = obd_disconnect(exp);
1239 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1240 obd_export_nid2str(exp), exp, rc);
1241 class_export_put(exp);
1246 void class_disconnect_exports(struct obd_device *obd)
1248 cfs_list_t work_list;
1251 /* Move all of the exports from obd_exports to a work list, en masse. */
1252 CFS_INIT_LIST_HEAD(&work_list);
1253 cfs_spin_lock(&obd->obd_dev_lock);
1254 cfs_list_splice_init(&obd->obd_exports, &work_list);
1255 cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1256 cfs_spin_unlock(&obd->obd_dev_lock);
1258 if (!cfs_list_empty(&work_list)) {
1259 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1260 "disconnecting them\n", obd->obd_minor, obd);
1261 class_disconnect_export_list(&work_list,
1262 exp_flags_from_obd(obd));
1264 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1265 obd->obd_minor, obd);
1268 EXPORT_SYMBOL(class_disconnect_exports);
1270 /* Remove exports that have not completed recovery.
1272 void class_disconnect_stale_exports(struct obd_device *obd,
1273 int (*test_export)(struct obd_export *))
1275 cfs_list_t work_list;
1276 cfs_list_t *pos, *n;
1277 struct obd_export *exp;
1281 CFS_INIT_LIST_HEAD(&work_list);
1282 cfs_spin_lock(&obd->obd_dev_lock);
1283 cfs_list_for_each_safe(pos, n, &obd->obd_exports) {
1286 exp = cfs_list_entry(pos, struct obd_export, exp_obd_chain);
1288 /* don't count self-export as client */
1289 if (obd_uuid_equals(&exp->exp_client_uuid,
1290 &exp->exp_obd->obd_uuid))
1293 if (test_export(exp))
1296 cfs_spin_lock(&exp->exp_lock);
1297 failed = exp->exp_failed;
1298 exp->exp_failed = 1;
1299 cfs_spin_unlock(&exp->exp_lock);
1303 cfs_list_move(&exp->exp_obd_chain, &work_list);
1305 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1306 obd->obd_name, exp->exp_client_uuid.uuid,
1307 exp->exp_connection == NULL ? "<unknown>" :
1308 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1309 print_export_data(exp, "EVICTING", 0);
1311 cfs_spin_unlock(&obd->obd_dev_lock);
1314 CDEBUG(D_HA, "%s: disconnecting %d stale clients\n",
1315 obd->obd_name, evicted);
1316 obd->obd_stale_clients += evicted;
1318 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1319 OBD_OPT_ABORT_RECOV);
1322 EXPORT_SYMBOL(class_disconnect_stale_exports);
1324 void class_fail_export(struct obd_export *exp)
1326 int rc, already_failed;
1328 cfs_spin_lock(&exp->exp_lock);
1329 already_failed = exp->exp_failed;
1330 exp->exp_failed = 1;
1331 cfs_spin_unlock(&exp->exp_lock);
1333 if (already_failed) {
1334 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1335 exp, exp->exp_client_uuid.uuid);
1339 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1340 exp, exp->exp_client_uuid.uuid);
1342 if (obd_dump_on_timeout)
1343 libcfs_debug_dumplog();
1345 /* need for safe call CDEBUG after obd_disconnect */
1346 class_export_get(exp);
1348 /* Most callers into obd_disconnect are removing their own reference
1349 * (request, for example) in addition to the one from the hash table.
1350 * We don't have such a reference here, so make one. */
1351 class_export_get(exp);
1352 rc = obd_disconnect(exp);
1354 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1356 CDEBUG(D_HA, "disconnected export %p/%s\n",
1357 exp, exp->exp_client_uuid.uuid);
1358 class_export_put(exp);
1360 EXPORT_SYMBOL(class_fail_export);
1362 char *obd_export_nid2str(struct obd_export *exp)
1364 if (exp->exp_connection != NULL)
1365 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1369 EXPORT_SYMBOL(obd_export_nid2str);
1371 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1373 struct obd_export *doomed_exp = NULL;
1374 int exports_evicted = 0;
1376 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1379 doomed_exp = cfs_hash_lookup(obd->obd_nid_hash, &nid_key);
1380 if (doomed_exp == NULL)
1383 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1384 "nid %s found, wanted nid %s, requested nid %s\n",
1385 obd_export_nid2str(doomed_exp),
1386 libcfs_nid2str(nid_key), nid);
1387 LASSERTF(doomed_exp != obd->obd_self_export,
1388 "self-export is hashed by NID?\n");
1390 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1391 obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1393 class_fail_export(doomed_exp);
1394 class_export_put(doomed_exp);
1397 if (!exports_evicted)
1398 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1399 obd->obd_name, nid);
1400 return exports_evicted;
1402 EXPORT_SYMBOL(obd_export_evict_by_nid);
1404 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1406 struct obd_export *doomed_exp = NULL;
1407 struct obd_uuid doomed_uuid;
1408 int exports_evicted = 0;
1410 obd_str2uuid(&doomed_uuid, uuid);
1411 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1412 CERROR("%s: can't evict myself\n", obd->obd_name);
1413 return exports_evicted;
1416 doomed_exp = cfs_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1418 if (doomed_exp == NULL) {
1419 CERROR("%s: can't disconnect %s: no exports found\n",
1420 obd->obd_name, uuid);
1422 CWARN("%s: evicting %s at adminstrative request\n",
1423 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1424 class_fail_export(doomed_exp);
1425 class_export_put(doomed_exp);
1429 return exports_evicted;
1431 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1433 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1434 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1435 EXPORT_SYMBOL(class_export_dump_hook);
1438 static void print_export_data(struct obd_export *exp, const char *status,
1441 struct ptlrpc_reply_state *rs;
1442 struct ptlrpc_reply_state *first_reply = NULL;
1445 cfs_spin_lock(&exp->exp_lock);
1446 cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1452 cfs_spin_unlock(&exp->exp_lock);
1454 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1455 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1456 obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1457 cfs_atomic_read(&exp->exp_rpc_count),
1458 cfs_atomic_read(&exp->exp_cb_count),
1459 cfs_atomic_read(&exp->exp_locks_count),
1460 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1461 nreplies, first_reply, nreplies > 3 ? "..." : "",
1462 exp->exp_last_committed);
1463 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1464 if (locks && class_export_dump_hook != NULL)
1465 class_export_dump_hook(exp);
1469 void dump_exports(struct obd_device *obd, int locks)
1471 struct obd_export *exp;
1473 cfs_spin_lock(&obd->obd_dev_lock);
1474 cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1475 print_export_data(exp, "ACTIVE", locks);
1476 cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1477 print_export_data(exp, "UNLINKED", locks);
1478 cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1479 print_export_data(exp, "DELAYED", locks);
1480 cfs_spin_unlock(&obd->obd_dev_lock);
1481 cfs_spin_lock(&obd_zombie_impexp_lock);
1482 cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1483 print_export_data(exp, "ZOMBIE", locks);
1484 cfs_spin_unlock(&obd_zombie_impexp_lock);
1486 EXPORT_SYMBOL(dump_exports);
1488 void obd_exports_barrier(struct obd_device *obd)
1491 LASSERT(cfs_list_empty(&obd->obd_exports));
1492 cfs_spin_lock(&obd->obd_dev_lock);
1493 while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1494 cfs_spin_unlock(&obd->obd_dev_lock);
1495 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
1496 cfs_time_seconds(waited));
1497 if (waited > 5 && IS_PO2(waited)) {
1498 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1499 "more than %d seconds. "
1500 "The obd refcount = %d. Is it stuck?\n",
1501 obd->obd_name, waited,
1502 cfs_atomic_read(&obd->obd_refcount));
1503 dump_exports(obd, 1);
1506 cfs_spin_lock(&obd->obd_dev_lock);
1508 cfs_spin_unlock(&obd->obd_dev_lock);
1510 EXPORT_SYMBOL(obd_exports_barrier);
1512 /* Total amount of zombies to be destroyed */
1513 static int zombies_count = 0;
1516 * kill zombie imports and exports
1518 void obd_zombie_impexp_cull(void)
1520 struct obd_import *import;
1521 struct obd_export *export;
1525 cfs_spin_lock(&obd_zombie_impexp_lock);
1528 if (!cfs_list_empty(&obd_zombie_imports)) {
1529 import = cfs_list_entry(obd_zombie_imports.next,
1532 cfs_list_del_init(&import->imp_zombie_chain);
1536 if (!cfs_list_empty(&obd_zombie_exports)) {
1537 export = cfs_list_entry(obd_zombie_exports.next,
1540 cfs_list_del_init(&export->exp_obd_chain);
1543 cfs_spin_unlock(&obd_zombie_impexp_lock);
1545 if (import != NULL) {
1546 class_import_destroy(import);
1547 cfs_spin_lock(&obd_zombie_impexp_lock);
1549 cfs_spin_unlock(&obd_zombie_impexp_lock);
1552 if (export != NULL) {
1553 class_export_destroy(export);
1554 cfs_spin_lock(&obd_zombie_impexp_lock);
1556 cfs_spin_unlock(&obd_zombie_impexp_lock);
1560 } while (import != NULL || export != NULL);
1564 static cfs_completion_t obd_zombie_start;
1565 static cfs_completion_t obd_zombie_stop;
1566 static unsigned long obd_zombie_flags;
1567 static cfs_waitq_t obd_zombie_waitq;
1568 static pid_t obd_zombie_pid;
1571 OBD_ZOMBIE_STOP = 1 << 1
1575 * check for work for kill zombie import/export thread.
1577 static int obd_zombie_impexp_check(void *arg)
1581 cfs_spin_lock(&obd_zombie_impexp_lock);
1582 rc = (zombies_count == 0) &&
1583 !cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1584 cfs_spin_unlock(&obd_zombie_impexp_lock);
1590 * Add export to the obd_zombe thread and notify it.
1592 static void obd_zombie_export_add(struct obd_export *exp) {
1593 cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
1594 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1595 cfs_list_del_init(&exp->exp_obd_chain);
1596 cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
1597 cfs_spin_lock(&obd_zombie_impexp_lock);
1599 cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1600 cfs_spin_unlock(&obd_zombie_impexp_lock);
1602 obd_zombie_impexp_notify();
1606 * Add import to the obd_zombe thread and notify it.
1608 static void obd_zombie_import_add(struct obd_import *imp) {
1609 LASSERT(imp->imp_sec == NULL);
1610 LASSERT(imp->imp_rq_pool == NULL);
1611 cfs_spin_lock(&obd_zombie_impexp_lock);
1612 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1614 cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1615 cfs_spin_unlock(&obd_zombie_impexp_lock);
1617 obd_zombie_impexp_notify();
1621 * notify import/export destroy thread about new zombie.
1623 static void obd_zombie_impexp_notify(void)
1626 * Make sure obd_zomebie_impexp_thread get this notification.
1627 * It is possible this signal only get by obd_zombie_barrier, and
1628 * barrier gulps this notification and sleeps away and hangs ensues
1630 cfs_waitq_broadcast(&obd_zombie_waitq);
1634 * check whether obd_zombie is idle
1636 static int obd_zombie_is_idle(void)
1640 LASSERT(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1641 cfs_spin_lock(&obd_zombie_impexp_lock);
1642 rc = (zombies_count == 0);
1643 cfs_spin_unlock(&obd_zombie_impexp_lock);
1648 * wait when obd_zombie import/export queues become empty
1650 void obd_zombie_barrier(void)
1652 struct l_wait_info lwi = { 0 };
1654 if (obd_zombie_pid == cfs_curproc_pid())
1655 /* don't wait for myself */
1657 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1659 EXPORT_SYMBOL(obd_zombie_barrier);
1664 * destroy zombie export/import thread.
1666 static int obd_zombie_impexp_thread(void *unused)
1670 if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1671 cfs_complete(&obd_zombie_start);
1675 cfs_complete(&obd_zombie_start);
1677 obd_zombie_pid = cfs_curproc_pid();
1679 while(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1680 struct l_wait_info lwi = { 0 };
1682 l_wait_event(obd_zombie_waitq,
1683 !obd_zombie_impexp_check(NULL), &lwi);
1684 obd_zombie_impexp_cull();
1687 * Notify obd_zombie_barrier callers that queues
1690 cfs_waitq_signal(&obd_zombie_waitq);
1693 cfs_complete(&obd_zombie_stop);
1698 #else /* ! KERNEL */
1700 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
1701 static void *obd_zombie_impexp_work_cb;
1702 static void *obd_zombie_impexp_idle_cb;
1704 int obd_zombie_impexp_kill(void *arg)
1708 if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1709 obd_zombie_impexp_cull();
1712 cfs_atomic_dec(&zombie_recur);
1719 * start destroy zombie import/export thread
1721 int obd_zombie_impexp_init(void)
1725 CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1726 CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1727 cfs_spin_lock_init(&obd_zombie_impexp_lock);
1728 cfs_init_completion(&obd_zombie_start);
1729 cfs_init_completion(&obd_zombie_stop);
1730 cfs_waitq_init(&obd_zombie_waitq);
1734 rc = cfs_create_thread(obd_zombie_impexp_thread, NULL, 0);
1738 cfs_wait_for_completion(&obd_zombie_start);
1741 obd_zombie_impexp_work_cb =
1742 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1743 &obd_zombie_impexp_kill, NULL);
1745 obd_zombie_impexp_idle_cb =
1746 liblustre_register_idle_callback("obd_zombi_impexp_check",
1747 &obd_zombie_impexp_check, NULL);
1753 * stop destroy zombie import/export thread
1755 void obd_zombie_impexp_stop(void)
1757 cfs_set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1758 obd_zombie_impexp_notify();
1760 cfs_wait_for_completion(&obd_zombie_stop);
1762 liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1763 liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1767 /***** Kernel-userspace comm helpers *******/
1769 /* Get length of entire message, including header */
1770 int kuc_len(int payload_len)
1772 return sizeof(struct kuc_hdr) + payload_len;
1774 EXPORT_SYMBOL(kuc_len);
1776 /* Get a pointer to kuc header, given a ptr to the payload
1777 * @param p Pointer to payload area
1778 * @returns Pointer to kuc header
1780 struct kuc_hdr * kuc_ptr(void *p)
1782 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1783 LASSERT(lh->kuc_magic == KUC_MAGIC);
1786 EXPORT_SYMBOL(kuc_ptr);
1788 /* Test if payload is part of kuc message
1789 * @param p Pointer to payload area
1792 int kuc_ispayload(void *p)
1794 struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1796 if (kh->kuc_magic == KUC_MAGIC)
1801 EXPORT_SYMBOL(kuc_ispayload);
1803 /* Alloc space for a message, and fill in header
1804 * @return Pointer to payload area
1806 void *kuc_alloc(int payload_len, int transport, int type)
1809 int len = kuc_len(payload_len);
1813 return ERR_PTR(-ENOMEM);
1815 lh->kuc_magic = KUC_MAGIC;
1816 lh->kuc_transport = transport;
1817 lh->kuc_msgtype = type;
1818 lh->kuc_msglen = len;
1820 return (void *)(lh + 1);
1822 EXPORT_SYMBOL(kuc_alloc);
1824 /* Takes pointer to payload area */
1825 inline void kuc_free(void *p, int payload_len)
1827 struct kuc_hdr *lh = kuc_ptr(p);
1828 OBD_FREE(lh, kuc_len(payload_len));
1830 EXPORT_SYMBOL(kuc_free);