4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Whamcloud, Inc.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
44 #include <liblustre.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
/* List of registered obd types; defined outside this file. */
50 extern cfs_list_t obd_types;
/* Held around every traversal or modification of obd_types visible in this file. */
51 cfs_spinlock_t obd_types_lock;
/* Slab caches created by obd_init_caches() and destroyed by obd_cleanup_caches(). */
53 cfs_mem_cache_t *obd_device_cachep;
54 cfs_mem_cache_t *obdo_cachep;
55 EXPORT_SYMBOL(obdo_cachep);
56 cfs_mem_cache_t *import_cachep;
/*
 * Lists and lock for "zombie" imports/exports.  class_import_put() and
 * class_export_put() hand an object to obd_zombie_import_add() /
 * obd_zombie_export_add() on the final put.
 * NOTE(review): how those helpers use these lists and the lock is not
 * visible in this part of the file -- confirm before relying on it.
 */
58 cfs_list_t obd_zombie_imports;
59 cfs_list_t obd_zombie_exports;
60 cfs_spinlock_t obd_zombie_impexp_lock;
/* Forward declarations of helpers defined later in the file. */
61 static void obd_zombie_impexp_notify(void);
62 static void obd_zombie_export_add(struct obd_export *exp);
63 static void obd_zombie_import_add(struct obd_import *imp);
64 static void print_export_data(struct obd_export *exp,
65 const char *status, int locks);
/*
 * Function pointer used below to drop ptlrpc connections from
 * class_export_destroy() and class_import_destroy().  It is only declared
 * and exported here; the assignment happens elsewhere.
 */
67 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
68 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
71 * support functions: we could use inter-module communication, but this
72 * is more portable to other OS's
/*
 * Allocate an obd_device from obd_device_cachep (CFS_ALLOC_IO) and stamp it
 * with OBD_DEVICE_MAGIC.
 * NOTE(review): the allocation-failure guard and the return statement are not
 * visible in this listing; class_newdev() treats a NULL result as -ENOMEM.
 */
74 static struct obd_device *obd_device_alloc(void)
76 struct obd_device *obd;
78 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
80 obd->obd_magic = OBD_DEVICE_MAGIC;
/*
 * Return an obd_device to obd_device_cachep.
 * Asserts that the magic is intact, reports (CERROR) a namespace that is
 * still attached, finalizes the obd_reference lu_ref and frees the object.
 */
85 static void obd_device_free(struct obd_device *obd)
88 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
/* A namespace still present here was not cleaned up; whatever follows the
 * error message is not visible in this listing. */
90 if (obd->obd_namespace != NULL) {
91 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92 obd, obd->obd_namespace, obd->obd_force);
95 lu_ref_fini(&obd->obd_reference);
96 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
/*
 * Look up a registered obd type by exact name.
 * Walks obd_types under obd_types_lock and compares typ_name with strcmp();
 * the lock is released on a match and again after the loop.
 * NOTE(review): the return statements are not visible here.  Callers in this
 * file test the result for NULL/non-NULL (class_register_type,
 * class_unregister_type).  No reference is taken on the type by the visible
 * code.
 */
99 struct obd_type *class_search_type(const char *name)
102 struct obd_type *type;
104 cfs_spin_lock(&obd_types_lock);
105 cfs_list_for_each(tmp, &obd_types) {
106 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
107 if (strcmp(type->typ_name, name) == 0) {
108 cfs_spin_unlock(&obd_types_lock);
112 cfs_spin_unlock(&obd_types_lock);
/*
 * Find the obd type called name and take a module reference on it.
 * When HAVE_MODULE_LOADING_SUPPORT is defined, cfs_request_module() is tried
 * with the type name; a zero return is treated as success and the search is
 * repeated, otherwise the failure is reported on the console.
 * NOTE(review): the conditions guarding the module load and the locked
 * section, and any typ_refcnt update, are not visible in this listing.
 */
116 struct obd_type *class_get_type(const char *name)
118 struct obd_type *type = class_search_type(name);
120 #ifdef HAVE_MODULE_LOADING_SUPPORT
122 const char *modname = name;
123 if (!cfs_request_module("%s", modname)) {
124 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
125 type = class_search_type(name);
127 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
/* Module reference on the owner of the type's dt ops, taken under the
 * per-type lock. */
133 cfs_spin_lock(&type->obd_type_lock);
135 cfs_try_module_get(type->typ_dt_ops->o_owner);
136 cfs_spin_unlock(&type->obd_type_lock);
140 EXPORT_SYMBOL(class_get_type);
/*
 * Counterpart of class_get_type(): under type->obd_type_lock, drop the module
 * reference on the owner of the type's dt ops.
 * NOTE(review): any typ_refcnt update is not visible in this listing.
 */
142 void class_put_type(struct obd_type *type)
145 cfs_spin_lock(&type->obd_type_lock);
147 cfs_module_put(type->typ_dt_ops->o_owner);
148 cfs_spin_unlock(&type->obd_type_lock);
150 EXPORT_SYMBOL(class_put_type);
/* Upper bound (exclusive) on the length of a type name, asserted below. */
152 #define CLASS_MAX_NAME 1024
/*
 * Register a new obd type.
 *
 * dt_ops  device operations; copied into a private allocation
 * md_ops  metadata operations; optional, copied into a private allocation
 * vars    lprocfs variables -- presumably passed to lprocfs_register(); the
 *         argument line is not visible in this listing
 * name    type name, must be shorter than CLASS_MAX_NAME; copied
 * ldt     lu device type, initialized with lu_device_type_init()
 *
 * A name that is already registered is rejected.  On failure everything
 * allocated so far is freed.
 * NOTE(review): the return statements, the guards around the md_ops copy and
 * lu_device_type_init(), and the cleanup label are not visible here.
 */
154 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
155 struct lprocfs_vars *vars, const char *name,
156 struct lu_device_type *ldt)
158 struct obd_type *type;
/* The name must be terminated within CLASS_MAX_NAME bytes. */
163 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
/* Refuse duplicate registrations. */
165 if (class_search_type(name)) {
166 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
171 OBD_ALLOC(type, sizeof(*type));
/* Allocate private copies of both ops tables and of the name; the three
 * results are checked together below. */
175 OBD_ALLOC_PTR(type->typ_dt_ops);
176 OBD_ALLOC_PTR(type->typ_md_ops);
177 OBD_ALLOC(type->typ_name, strlen(name) + 1);
179 if (type->typ_dt_ops == NULL ||
180 type->typ_md_ops == NULL ||
181 type->typ_name == NULL)
184 *(type->typ_dt_ops) = *dt_ops;
185 /* md_ops is optional */
187 *(type->typ_md_ops) = *md_ops;
188 strcpy(type->typ_name, name);
189 cfs_spin_lock_init(&type->obd_type_lock);
/* Create the proc entry for the type; an error pointer is converted to rc
 * and the field reset to NULL. */
192 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
194 if (IS_ERR(type->typ_procroot)) {
195 rc = PTR_ERR(type->typ_procroot);
196 type->typ_procroot = NULL;
202 rc = lu_device_type_init(ldt);
/* Publish the fully initialized type on the global list. */
207 cfs_spin_lock(&obd_types_lock);
208 cfs_list_add(&type->typ_chain, &obd_types);
209 cfs_spin_unlock(&obd_types_lock);
/* Failure cleanup: free whichever pieces were allocated, then the type. */
214 if (type->typ_name != NULL)
215 OBD_FREE(type->typ_name, strlen(name) + 1);
216 if (type->typ_md_ops != NULL)
217 OBD_FREE_PTR(type->typ_md_ops);
218 if (type->typ_dt_ops != NULL)
219 OBD_FREE_PTR(type->typ_dt_ops);
220 OBD_FREE(type, sizeof(*type));
223 EXPORT_SYMBOL(class_register_type);
/*
 * Unregister the obd type called name.
 * An unknown name is reported with CERROR.  If the type still has references
 * (typ_refcnt != 0) the ops tables are freed but the name is kept for
 * debugging, as the comments below explain.  The remaining statements remove
 * the proc root, finalize typ_lu, unlink the type from obd_types under
 * obd_types_lock and free the name, the ops tables and the type itself.
 * NOTE(review): the return statements (including any early return on the
 * busy path) and some guards are not visible in this listing.
 */
225 int class_unregister_type(const char *name)
227 struct obd_type *type = class_search_type(name);
231 CERROR("unknown obd type\n");
235 if (type->typ_refcnt) {
236 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
237 /* This is a bad situation, let's make the best of it */
238 /* Remove ops, but leave the name for debugging */
239 OBD_FREE_PTR(type->typ_dt_ops);
240 OBD_FREE_PTR(type->typ_md_ops);
244 if (type->typ_procroot) {
245 lprocfs_remove(&type->typ_procroot);
249 lu_device_type_fini(type->typ_lu);
251 cfs_spin_lock(&obd_types_lock);
252 cfs_list_del(&type->typ_chain);
253 cfs_spin_unlock(&obd_types_lock);
/* The size mirrors the allocation in class_register_type(); name equals
 * typ_name because the type was found by exact name match. */
254 OBD_FREE(type->typ_name, strlen(name) + 1);
255 if (type->typ_dt_ops != NULL)
256 OBD_FREE_PTR(type->typ_dt_ops);
257 if (type->typ_md_ops != NULL)
258 OBD_FREE_PTR(type->typ_md_ops);
259 OBD_FREE(type, sizeof(*type));
261 } /* class_unregister_type */
262 EXPORT_SYMBOL(class_unregister_type);
265 * Create a new obd device.
267 * Find an empty slot in ::obd_devs[], create a new obd device in it.
269 * \param[in] type_name obd device type string.
270 * \param[in] name obd device name.
272 * \retval ERR_PTR(errno) if create fails, otherwise return the obd device
/*
 * Create an obd device called name of type type_name and place it in
 * obd_devs[].
 * Steps visible here: validate the name length, take a type reference,
 * allocate the device, then scan obd_devs[] under the obd_dev_lock write
 * lock for a duplicate name and for a free slot.
 * Failure is reported with ERR_PTR() values (-EINVAL, -ENODEV, -ENOMEM,
 * -EEXIST, -EOVERFLOW) on every visible error path.
 * NOTE(review): several guards and the final return are not visible in this
 * listing.
 */
275 struct obd_device *class_newdev(const char *type_name, const char *name)
277 struct obd_device *result = NULL;
278 struct obd_device *newdev;
279 struct obd_type *type = NULL;
281 int new_obd_minor = 0;
284 if (strlen(name) >= MAX_OBD_NAME) {
285 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
286 RETURN(ERR_PTR(-EINVAL));
289 type = class_get_type(type_name);
291 CERROR("OBD: unknown type: %s\n", type_name);
292 RETURN(ERR_PTR(-ENODEV));
295 newdev = obd_device_alloc();
/* Allocation failed: give back the type reference taken above. */
296 if (newdev == NULL) {
297 class_put_type(type);
298 RETURN(ERR_PTR(-ENOMEM));
300 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
/* obd_devs[] is scanned and updated under the write lock. */
302 cfs_write_lock(&obd_dev_lock);
303 for (i = 0; i < class_devno_max(); i++) {
304 struct obd_device *obd = class_num2obd(i);
/* Duplicate name: release a slot already claimed during this scan (the
 * guard is not visible here) and fail with -EEXIST. */
306 if (obd && obd->obd_name &&
307 (strcmp(name, obd->obd_name) == 0)) {
308 CERROR("Device %s already exists at %d, won't add\n",
311 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
312 "%p obd_magic %08x != %08x\n", result,
313 result->obd_magic, OBD_DEVICE_MAGIC);
314 LASSERTF(result->obd_minor == new_obd_minor,
315 "%p obd_minor %d != %d\n", result,
316 result->obd_minor, new_obd_minor);
318 obd_devs[result->obd_minor] = NULL;
319 result->obd_name[0]='\0';
321 result = ERR_PTR(-EEXIST);
/* First empty slot seen while nothing has been claimed yet: take it. */
324 if (!result && !obd) {
326 result->obd_minor = i;
328 result->obd_type = type;
329 strncpy(result->obd_name, name,
330 sizeof(result->obd_name) - 1);
331 obd_devs[i] = result;
334 cfs_write_unlock(&obd_dev_lock);
/* Nothing claimed and the whole table was walked: no free slot. */
336 if (result == NULL && i >= class_devno_max()) {
337 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
339 RETURN(ERR_PTR(-EOVERFLOW));
/* On failure, drop the unused device and the type reference. */
342 if (IS_ERR(result)) {
343 obd_device_free(newdev);
344 class_put_type(type);
346 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
347 result->obd_name, result);
/*
 * Remove obd from obd_devs[] and free it.
 * Asserts that the magic is intact, that the device occupies the slot given
 * by its own obd_minor, and that it has a type.  The slot is cleared under
 * the obd_dev_lock write lock, the device is freed, and finally the type
 * reference is dropped.
 */
352 void class_release_dev(struct obd_device *obd)
/* Cached up front: obd is freed before the type reference is released. */
354 struct obd_type *obd_type = obd->obd_type;
356 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
357 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
358 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
359 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
360 LASSERT(obd_type != NULL);
362 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
363 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
365 cfs_write_lock(&obd_dev_lock);
366 obd_devs[obd->obd_minor] = NULL;
367 cfs_write_unlock(&obd_dev_lock);
368 obd_device_free(obd);
370 class_put_type(obd_type);
/*
 * Map a device name to its index in obd_devs[].
 * Scans under the obd_dev_lock read lock; a name match is only honoured when
 * obd_attached is set (see the comment inside the loop).
 * NOTE(review): the returned values are not visible in this listing;
 * class_name2obd() treats a negative result as "not found".
 */
373 int class_name2dev(const char *name)
380 cfs_read_lock(&obd_dev_lock);
381 for (i = 0; i < class_devno_max(); i++) {
382 struct obd_device *obd = class_num2obd(i);
384 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
385 /* Make sure we finished attaching before we give
386 out any references */
387 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
388 if (obd->obd_attached) {
389 cfs_read_unlock(&obd_dev_lock);
395 cfs_read_unlock(&obd_dev_lock);
399 EXPORT_SYMBOL(class_name2dev);
/*
 * Look up an obd device by name: class_name2dev() gives the index,
 * class_num2obd() the device.
 * NOTE(review): the range test below uses '>' against class_devno_max(),
 * while every loop in this file treats class_devno_max() as an exclusive
 * bound.  class_num2obd() re-checks with '<', so dev == class_devno_max()
 * is still filtered there.
 */
401 struct obd_device *class_name2obd(const char *name)
403 int dev = class_name2dev(name);
405 if (dev < 0 || dev > class_devno_max())
407 return class_num2obd(dev);
409 EXPORT_SYMBOL(class_name2obd);
/*
 * Map a device uuid to its index in obd_devs[].
 * Scans under the obd_dev_lock read lock and compares with
 * obd_uuid_equals(); the lock is released on a match and after the loop.
 * NOTE(review): the returned values are not visible in this listing.
 */
411 int class_uuid2dev(struct obd_uuid *uuid)
415 cfs_read_lock(&obd_dev_lock);
416 for (i = 0; i < class_devno_max(); i++) {
417 struct obd_device *obd = class_num2obd(i);
419 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
420 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
421 cfs_read_unlock(&obd_dev_lock);
425 cfs_read_unlock(&obd_dev_lock);
429 EXPORT_SYMBOL(class_uuid2dev);
/*
 * Look up an obd device by uuid: class_uuid2dev() gives the index,
 * class_num2obd() the device.
 * NOTE(review): the handling of a failed lookup is not visible in this
 * listing.
 */
431 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
433 int dev = class_uuid2dev(uuid);
436 return class_num2obd(dev);
438 EXPORT_SYMBOL(class_uuid2obd);
441 * Get obd device from ::obd_devs[]
443 * \param num [in] array index
445 * \retval NULL if ::obd_devs[\a num] does not contain an obd device
446 * otherwise return the obd device there.
/*
 * Return the device stored at index num of obd_devs[].
 * Visible checks: num is below class_devno_max(), the device magic is intact
 * and the device's obd_minor equals num.
 * NOTE(review): no lower-bound check on num is visible in this listing, and
 * the array read and the return are not shown -- confirm that negative
 * indices cannot reach this function.
 */
448 struct obd_device *class_num2obd(int num)
450 struct obd_device *obd = NULL;
452 if (num < class_devno_max()) {
457 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
458 "%p obd_magic %08x != %08x\n",
459 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
460 LASSERTF(obd->obd_minor == num,
461 "%p obd_minor %0d != %0d\n",
462 obd, obd->obd_minor, num);
467 EXPORT_SYMBOL(class_num2obd);
/*
 * Print one console line per device in obd_devs[]: index, status, type name,
 * device name, uuid and reference count.
 * The status is chosen by testing obd_stopping, then obd_set_up, then
 * obd_attached (the status strings are not visible in this listing).
 * The whole walk runs under the obd_dev_lock read lock.
 */
469 void class_obd_list(void)
474 cfs_read_lock(&obd_dev_lock);
475 for (i = 0; i < class_devno_max(); i++) {
476 struct obd_device *obd = class_num2obd(i);
480 if (obd->obd_stopping)
482 else if (obd->obd_set_up)
484 else if (obd->obd_attached)
488 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
489 i, status, obd->obd_type->typ_name,
490 obd->obd_name, obd->obd_uuid.uuid,
491 cfs_atomic_read(&obd->obd_refcount));
493 cfs_read_unlock(&obd_dev_lock);
497 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
498 specified, then only the client with that uuid is returned,
499 otherwise any client connected to the tgt is returned. */
/*
 * Scan obd_devs[] under the obd_dev_lock read lock for a client device of
 * type typ_name whose cl_target_uuid equals tgt_uuid and, when grp_uuid is
 * non-NULL, whose own uuid equals grp_uuid.
 * NOTE(review): the type test is a prefix comparison (strncmp limited to
 * strlen(typ_name)), so a device whose type name merely starts with typ_name
 * also passes it.  The return statements are not visible in this listing.
 */
500 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
501 const char * typ_name,
502 struct obd_uuid *grp_uuid)
506 cfs_read_lock(&obd_dev_lock);
507 for (i = 0; i < class_devno_max(); i++) {
508 struct obd_device *obd = class_num2obd(i);
512 if ((strncmp(obd->obd_type->typ_name, typ_name,
513 strlen(typ_name)) == 0)) {
514 if (obd_uuid_equals(tgt_uuid,
515 &obd->u.cli.cl_target_uuid) &&
516 ((grp_uuid)? obd_uuid_equals(grp_uuid,
517 &obd->obd_uuid) : 1)) {
518 cfs_read_unlock(&obd_dev_lock);
523 cfs_read_unlock(&obd_dev_lock);
527 EXPORT_SYMBOL(class_find_client_obd);
529 /* Iterate the obd_device list looking for devices that have grp_uuid. Start
530 searching at *next, and if a device is found, the next index to look
531 at is saved in *next. If next is NULL, then the first matching device
532 will always be returned. */
/*
 * Scan obd_devs[] under the obd_dev_lock read lock, comparing each device's
 * uuid with grp_uuid via obd_uuid_equals().
 * A *next value is only accepted as a start index when it lies in
 * [0, class_devno_max()).
 * NOTE(review): the start-index assignments, the update of *next and the
 * return statements are not visible in this listing.
 */
533 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
539 else if (*next >= 0 && *next < class_devno_max())
544 cfs_read_lock(&obd_dev_lock);
545 for (; i < class_devno_max(); i++) {
546 struct obd_device *obd = class_num2obd(i);
550 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
553 cfs_read_unlock(&obd_dev_lock);
557 cfs_read_unlock(&obd_dev_lock);
561 EXPORT_SYMBOL(class_devices_in_group);
564 * Notify that the sptlrpc log for \a fsname has changed, so that every
565 * relevant OBD adjusts its sptlrpc settings accordingly.
/*
 * Send KEY_SPTLRPC_CONF through obd_set_info_async() to the devices that
 * pass the filters below.
 *
 * fsname   file system name, compared against the start of each device name
 * namelen  number of bytes of fsname to compare; asserted > 0
 *
 * NOTE(review): the "continue" statements after each filter, the handling of
 * rc2 and the return are not visible in this listing.
 */
567 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
569 struct obd_device *obd;
573 LASSERT(namelen > 0);
575 cfs_read_lock(&obd_dev_lock);
576 for (i = 0; i < class_devno_max(); i++) {
577 obd = class_num2obd(i);
/* Filter: empty slots and devices that are not set up or are stopping. */
579 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
582 /* only notify mdc, osc, mdt, ost */
583 type = obd->obd_type->typ_name;
584 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
585 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
586 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
587 strcmp(type, LUSTRE_OST_NAME) != 0)
/* Filter: device names whose first namelen bytes differ from fsname. */
590 if (strncmp(obd->obd_name, fsname, namelen))
/* Hold a device reference while the read lock is dropped around the call,
 * then re-take the lock to continue the scan. */
593 class_incref(obd, __FUNCTION__, obd);
594 cfs_read_unlock(&obd_dev_lock);
595 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
596 sizeof(KEY_SPTLRPC_CONF),
597 KEY_SPTLRPC_CONF, 0, NULL, NULL);
599 class_decref(obd, __FUNCTION__, obd);
600 cfs_read_lock(&obd_dev_lock);
602 cfs_read_unlock(&obd_dev_lock);
605 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
/*
 * Destroy the slab caches used by this file (obd_device, obdo, import, capa).
 * Every cfs_mem_cache_destroy() is asserted to return 0.
 * NOTE(review): only the guard for obd_device_cachep is visible in this
 * listing; the others, and some pointer resets, are not shown.  The
 * assertion messages misspell "destroy"; they are runtime strings and are
 * left untouched.
 */
607 void obd_cleanup_caches(void)
612 if (obd_device_cachep) {
613 rc = cfs_mem_cache_destroy(obd_device_cachep);
614 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
615 obd_device_cachep = NULL;
618 rc = cfs_mem_cache_destroy(obdo_cachep);
619 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
623 rc = cfs_mem_cache_destroy(import_cachep);
624 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
625 import_cachep = NULL;
628 rc = cfs_mem_cache_destroy(capa_cachep);
629 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
/*
 * Create the slab caches used by this file: ll_obd_dev_cache, ll_obdo_cache,
 * ll_import_cache and capa_cache.  Each cache pointer is asserted to be NULL
 * before it is created.
 * NOTE(review): only the failure test for obd_device_cachep is visible in
 * this listing.  The obd_cleanup_caches() call at the end is presumably the
 * shared failure path -- its label and the return values are not shown.
 */
635 int obd_init_caches(void)
639 LASSERT(obd_device_cachep == NULL);
640 obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
641 sizeof(struct obd_device),
643 if (!obd_device_cachep)
646 LASSERT(obdo_cachep == NULL);
647 obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
652 LASSERT(import_cachep == NULL);
653 import_cachep = cfs_mem_cache_create("ll_import_cache",
654 sizeof(struct obd_import),
659 LASSERT(capa_cachep == NULL);
660 capa_cachep = cfs_mem_cache_create("capa_cache",
661 sizeof(struct obd_capa), 0, 0);
667 obd_cleanup_caches();
672 /* map connection to client */
/*
 * Resolve a connection handle to its export with
 * class_handle2object(conn->cookie).
 * Two special cases are logged before the lookup: a null handle, and a
 * cookie of -1, which the comment below describes as a request for a new
 * connection.
 * NOTE(review): the returns for those cases and the final return are not
 * visible in this listing.  class_conn2obd() calls class_export_put() on the
 * result, which suggests the lookup hands back a referenced export -- confirm.
 */
673 struct obd_export *class_conn2export(struct lustre_handle *conn)
675 struct obd_export *export;
679 CDEBUG(D_CACHE, "looking for null handle\n");
683 if (conn->cookie == -1) { /* this means assign a new connection */
684 CDEBUG(D_CACHE, "want a new connection\n");
688 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
689 export = class_handle2object(conn->cookie);
692 EXPORT_SYMBOL(class_conn2export);
/*
 * Map an export to an obd device.
 * NOTE(review): the body is not visible in this listing; presumably it
 * returns exp->exp_obd -- confirm against the full source.
 */
694 struct obd_device *class_exp2obd(struct obd_export *exp)
700 EXPORT_SYMBOL(class_exp2obd);
/*
 * Resolve a connection handle to the obd device of its export.
 * The export obtained from class_conn2export() is released with
 * class_export_put() after exp_obd has been read, so the returned device
 * pointer is not protected by that export reference.
 * NOTE(review): the NULL-export branch and the returns are not visible in
 * this listing.
 */
702 struct obd_device *class_conn2obd(struct lustre_handle *conn)
704 struct obd_export *export;
705 export = class_conn2export(conn);
707 struct obd_device *obd = export->exp_obd;
708 class_export_put(export);
713 EXPORT_SYMBOL(class_conn2obd);
/*
 * Return the client import (u.cli.cl_import) of the device behind exp.
 * NOTE(review): any NULL check on the device is not visible in this listing.
 */
715 struct obd_import *class_exp2cliimp(struct obd_export *exp)
717 struct obd_device *obd = exp->exp_obd;
720 return obd->u.cli.cl_import;
722 EXPORT_SYMBOL(class_exp2cliimp);
/*
 * Return the client import (u.cli.cl_import) of the device that the
 * connection handle resolves to through class_conn2obd().
 * NOTE(review): any NULL check on the device is not visible in this listing.
 */
724 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
726 struct obd_device *obd = class_conn2obd(conn);
729 return obd->u.cli.cl_import;
731 EXPORT_SYMBOL(class_conn2cliimp);
733 /* Export management functions */
/*
 * Final teardown of an export whose reference count is zero.
 * Releases the export's connection (if any), asserts that its reply, replay
 * and high-priority RPC lists are empty, calls obd_destroy_export(), drops
 * the device reference tagged "export" (class_new_export() takes one with
 * the same tag) and frees the export with OBD_FREE_RCU.
 */
734 static void class_export_destroy(struct obd_export *exp)
736 struct obd_device *obd = exp->exp_obd;
739 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
740 LASSERT(obd != NULL);
742 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
743 exp->exp_client_uuid.uuid, obd->obd_name);
745 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
746 if (exp->exp_connection)
747 ptlrpc_put_connection_superhack(exp->exp_connection);
749 LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
750 LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
751 LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
752 LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
753 obd_destroy_export(exp);
754 class_decref(obd, "export", exp);
756 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
/* Handle-table callback (see export_handle_ops): takes a reference on the
 * export passed as an untyped pointer. */
760 static void export_handle_addref(void *export)
762 class_export_get(export);
/*
 * Handle operations registered for exports by class_new_export() through
 * class_handle_hash().
 * NOTE(review): any further initializers are not visible in this listing.
 */
765 static struct portals_handle_ops export_handle_ops = {
766 .hop_addref = export_handle_addref,
/*
 * Take a reference on exp: atomically increments exp_refcount and logs the
 * new value.
 * NOTE(review): the return statement is not visible in this listing.
 */
770 struct obd_export *class_export_get(struct obd_export *exp)
772 cfs_atomic_inc(&exp->exp_refcount);
773 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
774 cfs_atomic_read(&exp->exp_refcount));
777 EXPORT_SYMBOL(class_export_get);
/*
 * Drop a reference on exp.
 * Asserts exp is non-NULL and that its refcount lies strictly between 0 and
 * LI_POISON.  When the count reaches zero, the export's proc state is
 * cleaned up and the export is handed to obd_zombie_export_add().
 */
779 void class_export_put(struct obd_export *exp)
781 LASSERT(exp != NULL);
782 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
/* The value logged is the count after the decrement performed below. */
783 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
784 cfs_atomic_read(&exp->exp_refcount) - 1);
786 if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
787 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
788 CDEBUG(D_IOCTL, "final put %p/%s\n",
789 exp, exp->exp_client_uuid.uuid);
791 /* release nid stat reference */
792 lprocfs_exp_cleanup(exp);
794 obd_zombie_export_add(exp);
797 EXPORT_SYMBOL(class_export_put);
799 /* Creates a new export, adds it to the hash table, and returns a
800 * pointer to it. The refcount is 2: one for the hash reference, and
801 * one for the pointer returned by this function. */
/*
 * Allocate and initialize an export of obd for the client cluuid, register
 * its handle, add it to the device's uuid hash (unless cluuid is the
 * device's own uuid) and link it onto the device's export lists.
 * Allocation failure returns ERR_PTR(-ENOMEM); the other visible failure
 * paths set rc to -ENODEV (device stopping or no uuid hash) or -EALREADY
 * (duplicate uuid) and unwind through the cleanup code at the end.
 * NOTE(review): the cleanup labels, some guards and the final returns are
 * not visible in this listing.
 */
802 struct obd_export *class_new_export(struct obd_device *obd,
803 struct obd_uuid *cluuid)
805 struct obd_export *export;
806 cfs_hash_t *hash = NULL;
810 OBD_ALLOC_PTR(export);
812 return ERR_PTR(-ENOMEM);
814 export->exp_conn_cnt = 0;
815 export->exp_lock_hash = NULL;
816 export->exp_flock_hash = NULL;
/* The export starts out with two references. */
817 cfs_atomic_set(&export->exp_refcount, 2);
818 cfs_atomic_set(&export->exp_rpc_count, 0);
819 cfs_atomic_set(&export->exp_cb_count, 0);
820 cfs_atomic_set(&export->exp_locks_count, 0);
821 #if LUSTRE_TRACKS_LOCK_EXP_REFS
822 CFS_INIT_LIST_HEAD(&export->exp_locks_list);
823 cfs_spin_lock_init(&export->exp_locks_list_guard);
825 cfs_atomic_set(&export->exp_replay_count, 0);
826 export->exp_obd = obd;
827 CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
828 cfs_spin_lock_init(&export->exp_uncommitted_replies_lock);
829 CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
830 CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
831 CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
832 CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
/* Register the handle; export_handle_ops supplies the addref callback. */
833 class_handle_hash(&export->exp_handle, &export_handle_ops);
834 export->exp_last_request_time = cfs_time_current_sec();
835 cfs_spin_lock_init(&export->exp_lock);
836 cfs_spin_lock_init(&export->exp_rpc_lock);
837 CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
838 CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
839 cfs_spin_lock_init(&export->exp_bl_list_lock);
840 CFS_INIT_LIST_HEAD(&export->exp_bl_list);
842 export->exp_sp_peer = LUSTRE_SP_ANY;
843 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
844 export->exp_client_uuid = *cluuid;
845 obd_init_export(export);
/* First check under obd_dev_lock: refuse a stopping device and take a
 * reference on its uuid hash. */
847 cfs_spin_lock(&obd->obd_dev_lock);
848 /* shouldn't happen, but might race */
849 if (obd->obd_stopping)
850 GOTO(exit_unlock, rc = -ENODEV);
852 hash = cfs_hash_getref(obd->obd_uuid_hash);
854 GOTO(exit_unlock, rc = -ENODEV);
855 cfs_spin_unlock(&obd->obd_dev_lock);
/* An export whose client uuid equals the device's own uuid is not added to
 * the uuid hash. */
857 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
858 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
860 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
861 obd->obd_name, cluuid->uuid, rc);
862 GOTO(exit_err, rc = -EALREADY);
/* Re-check obd_stopping after the unlocked hash insert and undo the insert
 * if the device started stopping in between. */
866 cfs_spin_lock(&obd->obd_dev_lock);
867 if (obd->obd_stopping) {
868 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
869 GOTO(exit_unlock, rc = -ENODEV);
/* Publish: take a device reference tagged "export", link the export onto
 * obd_exports and obd_exports_timed, and count it. */
872 class_incref(obd, "export", export);
873 cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
874 cfs_list_add_tail(&export->exp_obd_chain_timed,
875 &export->exp_obd->obd_exports_timed);
876 export->exp_obd->obd_num_exports++;
877 cfs_spin_unlock(&obd->obd_dev_lock);
878 cfs_hash_putref(hash);
/* Error unwinding (the labels are not visible in this listing): unlock,
 * drop the hash reference, unhash the handle and free the export. */
882 cfs_spin_unlock(&obd->obd_dev_lock);
885 cfs_hash_putref(hash);
886 class_handle_unhash(&export->exp_handle);
887 LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
888 obd_destroy_export(export);
889 OBD_FREE_PTR(export);
892 EXPORT_SYMBOL(class_new_export);
/*
 * Detach an export from its device.
 * Unhashes the export's handle, then under the device's obd_dev_lock removes
 * it from the uuid hash (if hashed), moves it to obd_unlinked_exports, takes
 * it off the timed list and decrements obd_num_exports.  Finally drops one
 * export reference.
 */
894 void class_unlink_export(struct obd_export *exp)
896 class_handle_unhash(&exp->exp_handle);
898 cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
899 /* delete an uuid-export hashitem from hashtables */
900 if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
901 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
902 &exp->exp_client_uuid,
903 &exp->exp_uuid_hash);
905 cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
906 cfs_list_del_init(&exp->exp_obd_chain_timed);
907 exp->exp_obd->obd_num_exports--;
908 cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
909 class_export_put(exp);
911 EXPORT_SYMBOL(class_unlink_export);
913 /* Import management functions */
/*
 * Final teardown of an import whose reference count is zero.
 * Releases the import's current connection, then every entry on
 * imp_conn_list (connection released, entry freed), asserts that no security
 * context remains (imp_sec == NULL), drops the device reference tagged
 * "import" (class_new_import() takes one with the same tag) and frees the
 * import with OBD_FREE_RCU.
 */
914 void class_import_destroy(struct obd_import *imp)
918 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
919 imp->imp_obd->obd_name);
921 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
923 ptlrpc_put_connection_superhack(imp->imp_connection);
/* Drain the connection list from the head. */
925 while (!cfs_list_empty(&imp->imp_conn_list)) {
926 struct obd_import_conn *imp_conn;
928 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
929 struct obd_import_conn, oic_item);
930 cfs_list_del_init(&imp_conn->oic_item);
931 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
932 OBD_FREE(imp_conn, sizeof(*imp_conn));
935 LASSERT(imp->imp_sec == NULL);
936 class_decref(imp->imp_obd, "import", imp);
937 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
/* Handle-table callback (see import_handle_ops): takes a reference on the
 * import passed as an untyped pointer. */
941 static void import_handle_addref(void *import)
943 class_import_get(import);
/*
 * Handle operations registered for imports by class_new_import() through
 * class_handle_hash().
 * NOTE(review): any further initializers are not visible in this listing.
 */
946 static struct portals_handle_ops import_handle_ops = {
947 .hop_addref = import_handle_addref,
/*
 * Take a reference on import: atomically increments imp_refcount and logs
 * the new value together with the owning device name.
 * NOTE(review): the return statement is not visible in this listing.
 */
951 struct obd_import *class_import_get(struct obd_import *import)
953 cfs_atomic_inc(&import->imp_refcount);
954 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
955 cfs_atomic_read(&import->imp_refcount),
956 import->imp_obd->obd_name);
959 EXPORT_SYMBOL(class_import_get);
/*
 * Drop a reference on imp.
 * Asserts that the import is not already on a zombie chain and that its
 * refcount lies strictly between 0 and LI_POISON.  When the count reaches
 * zero the import is handed to obd_zombie_import_add().
 */
961 void class_import_put(struct obd_import *imp)
965 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
966 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
/* The value logged is the count after the decrement performed below. */
968 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
969 cfs_atomic_read(&imp->imp_refcount) - 1,
970 imp->imp_obd->obd_name);
972 if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
973 CDEBUG(D_INFO, "final put import %p\n", imp);
974 obd_zombie_import_add(imp);
977 /* catch possible import put race */
978 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
981 EXPORT_SYMBOL(class_import_put);
/*
 * Initialize the adaptive-timeout state of an import: the network latency
 * estimate with (0, 0), and one service estimate per portal
 * (IMP_AT_MAX_PORTALS entries) starting from INITIAL_CONNECT_TIMEOUT.
 * NOTE(review): the last argument of the per-portal at_init() call is not
 * visible in this listing.
 */
983 static void init_imp_at(struct imp_at *at) {
985 at_init(&at->iat_net_latency, 0, 0);
986 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
987 /* max service estimates are tracked on the server side, so
988 don't use the AT history here, just use the last reported
989 val. (But keep hist for proc histogram, worst_ever) */
990 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
/*
 * Allocate and initialize an import for obd.
 * The import takes a device reference tagged "import", starts in state
 * LUSTRE_IMP_NEW with a reference count of 2, has its handle registered with
 * import_handle_ops and its adaptive timeouts initialized.
 * NOTE(review): the allocation-failure check and the return statement are
 * not visible in this listing.
 */
995 struct obd_import *class_new_import(struct obd_device *obd)
997 struct obd_import *imp;
999 OBD_ALLOC(imp, sizeof(*imp));
1003 CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
1004 CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
1005 CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
1006 CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
1007 cfs_spin_lock_init(&imp->imp_lock);
1008 imp->imp_last_success_conn = 0;
1009 imp->imp_state = LUSTRE_IMP_NEW;
/* Released again in class_import_destroy() with the same "import" tag. */
1010 imp->imp_obd = class_incref(obd, "import", imp);
1011 cfs_mutex_init(&imp->imp_sec_mutex);
1012 cfs_waitq_init(&imp->imp_recovery_waitq);
1014 cfs_atomic_set(&imp->imp_refcount, 2);
1015 cfs_atomic_set(&imp->imp_unregistering, 0);
1016 cfs_atomic_set(&imp->imp_inflight, 0);
1017 cfs_atomic_set(&imp->imp_replay_inflight, 0);
1018 cfs_atomic_set(&imp->imp_inval_count, 0);
1019 CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1020 CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1021 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1022 init_imp_at(&imp->imp_at);
1024 /* the default magic is V2, will be used in connect RPC, and
1025 * then adjusted according to the flags in request/reply. */
1026 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1030 EXPORT_SYMBOL(class_new_import);
/*
 * Start tearing down an import: asserts the pointer is neither NULL nor
 * LP_POISON, unhashes its handle, bumps imp_generation under imp_lock and
 * drops one reference with class_import_put().
 */
1032 void class_destroy_import(struct obd_import *import)
1034 LASSERT(import != NULL);
1035 LASSERT(import != LP_POISON);
1037 class_handle_unhash(&import->imp_handle);
1039 cfs_spin_lock(&import->imp_lock);
1040 import->imp_generation++;
1041 cfs_spin_unlock(&import->imp_lock);
1042 class_import_put(import);
1044 EXPORT_SYMBOL(class_destroy_import);
1046 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/*
 * Debug tracking (LUSTRE_TRACKS_LOCK_EXP_REFS): record that lock holds a
 * reference on exp.
 * Under exp_locks_list_guard: warns if the lock is already tracked against a
 * different export, and on the first reference links the lock onto
 * exp_locks_list and remembers exp as its target.
 */
1048 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1050 cfs_spin_lock(&exp->exp_locks_list_guard);
1052 LASSERT(lock->l_exp_refs_nr >= 0);
1054 if (lock->l_exp_refs_target != NULL &&
1055 lock->l_exp_refs_target != exp) {
1056 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1057 exp, lock, lock->l_exp_refs_target);
/* Post-increment: the branch is taken only for the first reference. */
1059 if ((lock->l_exp_refs_nr ++) == 0) {
1060 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1061 lock->l_exp_refs_target = exp;
1063 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1064 lock, exp, lock->l_exp_refs_nr);
1065 cfs_spin_unlock(&exp->exp_locks_list_guard);
1067 EXPORT_SYMBOL(__class_export_add_lock_ref);
/*
 * Debug tracking (LUSTRE_TRACKS_LOCK_EXP_REFS): drop one tracked reference
 * of lock on exp.
 * Under exp_locks_list_guard: asserts that at least one reference is
 * tracked, warns if the lock is tracked against a different export, and on
 * the last reference unlinks the lock and clears its target.
 */
1069 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1071 cfs_spin_lock(&exp->exp_locks_list_guard);
1072 LASSERT(lock->l_exp_refs_nr > 0);
1073 if (lock->l_exp_refs_target != exp) {
1074 LCONSOLE_WARN("lock %p, "
1075 "mismatching export pointers: %p, %p\n",
1076 lock, lock->l_exp_refs_target, exp);
/* Pre-decrement: the branch is taken when the count drops to zero. */
1078 if (-- lock->l_exp_refs_nr == 0) {
1079 cfs_list_del_init(&lock->l_exp_refs_link);
1080 lock->l_exp_refs_target = NULL;
1082 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1083 lock, exp, lock->l_exp_refs_nr);
1084 cfs_spin_unlock(&exp->exp_locks_list_guard);
1086 EXPORT_SYMBOL(__class_export_del_lock_ref);
1089 /* A connection defines an export context in which preallocation can
1090 be managed. This releases the export pointer reference, and returns
1091 the export handle, so the export refcount is 1 when this function returns. */
/*
 * Create an export of obd for the client cluuid and return its handle
 * cookie in conn.
 * All three arguments are asserted non-NULL.  A failure of
 * class_new_export() is returned as PTR_ERR(export).  On success the pointer
 * reference returned by class_new_export() is dropped, leaving the caller
 * with the cookie only.
 * NOTE(review): the error test and the success return are not visible in
 * this listing.
 */
1093 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1094 struct obd_uuid *cluuid)
1096 struct obd_export *export;
1097 LASSERT(conn != NULL);
1098 LASSERT(obd != NULL);
1099 LASSERT(cluuid != NULL);
1102 export = class_new_export(obd, cluuid);
1104 RETURN(PTR_ERR(export));
1106 conn->cookie = export->exp_handle.h_cookie;
1107 class_export_put(export);
1109 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1110 cluuid->uuid, conn->cookie);
1113 EXPORT_SYMBOL(class_connect);
1115 /* if export is involved in recovery then clean up related things */
/*
 * Undo the recovery accounting an export contributed to its device.
 * Under obd_recovery_task_lock: a delayed export decrements
 * obd_delayed_clients, and an export that is in recovery while the device is
 * recovering clears exp_in_recovery and decrements obd_connected_clients.
 * Afterwards the request-replay and lock-replay flags are cleared (each
 * under exp_lock) and the matching per-device counters decremented.
 */
1116 void class_export_recovery_cleanup(struct obd_export *exp)
1118 struct obd_device *obd = exp->exp_obd;
1120 cfs_spin_lock(&obd->obd_recovery_task_lock);
1121 if (exp->exp_delayed)
1122 obd->obd_delayed_clients--;
1123 if (obd->obd_recovering && exp->exp_in_recovery) {
1124 cfs_spin_lock(&exp->exp_lock);
1125 exp->exp_in_recovery = 0;
1126 cfs_spin_unlock(&exp->exp_lock);
1127 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1128 cfs_atomic_dec(&obd->obd_connected_clients);
1130 cfs_spin_unlock(&obd->obd_recovery_task_lock);
1131 /** Cleanup req replay fields */
1132 if (exp->exp_req_replay_needed) {
1133 cfs_spin_lock(&exp->exp_lock);
1134 exp->exp_req_replay_needed = 0;
1135 cfs_spin_unlock(&exp->exp_lock);
1136 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
1137 cfs_atomic_dec(&obd->obd_req_replay_clients);
1139 /** Cleanup lock replay data */
1140 if (exp->exp_lock_replay_needed) {
1141 cfs_spin_lock(&exp->exp_lock);
1142 exp->exp_lock_replay_needed = 0;
1143 cfs_spin_unlock(&exp->exp_lock);
1144 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
1145 cfs_atomic_dec(&obd->obd_lock_replay_clients);
1149 /* This function removes 1-3 references from the export:
1150 * 1 - for the export pointer passed in
1151 * and, if a disconnect is really needed,
1152 * 2 - for removing it from the hash
1153 * 3 - in class_unlink_export
1154 * The export pointer passed to this function may be destroyed */
/*
 * Disconnect an export.
 * A NULL export is only warned about.  Otherwise the export is marked
 * disconnected under exp_lock.  If it already was, the function jumps to the
 * no_disconn label; the comment below says this avoids extra puts when
 * several callers race.  A first-time disconnect removes the export from the
 * nid hash (if hashed), cleans up its recovery accounting and unlinks it
 * from the device.  The function ends by dropping one export reference.
 * NOTE(review): the label position and the return values are not visible in
 * this listing.
 */
1155 int class_disconnect(struct obd_export *export)
1157 int already_disconnected;
1160 if (export == NULL) {
1161 CWARN("attempting to free NULL export %p\n", export);
/* Test-and-set of exp_disconnected under exp_lock. */
1165 cfs_spin_lock(&export->exp_lock);
1166 already_disconnected = export->exp_disconnected;
1167 export->exp_disconnected = 1;
1168 cfs_spin_unlock(&export->exp_lock);
1170 /* class_cleanup(), abort_recovery(), and class_fail_export()
1171 * all end up in here, and if any of them race we shouldn't
1172 * call extra class_export_puts(). */
1173 if (already_disconnected) {
1174 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1175 GOTO(no_disconn, already_disconnected);
1178 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1179 export->exp_handle.h_cookie);
1181 if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1182 cfs_hash_del(export->exp_obd->obd_nid_hash,
1183 &export->exp_connection->c_peer.nid,
1184 &export->exp_nid_hash);
1186 class_export_recovery_cleanup(export);
1187 class_unlink_export(export);
1189 class_export_put(export);
1192 EXPORT_SYMBOL(class_disconnect);
1194 /* Return non-zero for a fully connected export */
/*
 * Report whether exp is connected: reads exp_conn_cnt > 0 under exp_lock.
 * NOTE(review): any NULL guard on exp and the return statements are not
 * visible in this listing.
 */
1195 int class_connected_export(struct obd_export *exp)
1199 cfs_spin_lock(&exp->exp_lock);
1200 connected = (exp->exp_conn_cnt > 0);
1201 cfs_spin_unlock(&exp->exp_lock);
1206 EXPORT_SYMBOL(class_connected_export);
/*
 * Disconnect every export on list, always working on the head entry until
 * the list is empty.
 * Each export first gets its exp_flags set to flags under exp_lock.  An
 * export whose client uuid equals its device's uuid is not disconnected: it
 * is only taken off the list and its temporary reference dropped.  All
 * others are passed to obd_disconnect().
 * NOTE(review): the "continue" after the self-export branch is not visible
 * in this listing.
 */
1208 static void class_disconnect_export_list(cfs_list_t *list,
1209 enum obd_option flags)
1212 struct obd_export *exp;
1215 /* It's possible that an export may disconnect itself, but
1216 * nothing else will be added to this list. */
1217 while (!cfs_list_empty(list)) {
1218 exp = cfs_list_entry(list->next, struct obd_export,
1220 /* need for safe call CDEBUG after obd_disconnect */
1221 class_export_get(exp);
1223 cfs_spin_lock(&exp->exp_lock);
1224 exp->exp_flags = flags;
1225 cfs_spin_unlock(&exp->exp_lock);
1227 if (obd_uuid_equals(&exp->exp_client_uuid,
1228 &exp->exp_obd->obd_uuid)) {
1230 "exp %p export uuid == obd uuid, don't discon\n",
1232 /* Need to delete this now so we don't end up pointing
1233 * to work_list later when this export is cleaned up. */
1234 cfs_list_del_init(&exp->exp_obd_chain);
1235 class_export_put(exp);
/* Second reference: obd_disconnect() consumes one (see below), the first is
 * released after the final CDEBUG. */
1239 class_export_get(exp);
1240 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1241 "last request at "CFS_TIME_T"\n",
1242 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1243 exp, exp->exp_last_request_time);
1244 /* release one export reference anyway */
1245 rc = obd_disconnect(exp);
1247 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1248 obd_export_nid2str(exp), exp, rc);
1249 class_export_put(exp);
/*
 * Disconnect all exports of obd.
 * Under obd_dev_lock the contents of obd_exports and obd_delayed_exports are
 * spliced onto a private work list.  The list is then processed outside the
 * lock by class_disconnect_export_list() with flags from
 * exp_flags_from_obd(obd).
 */
1254 void class_disconnect_exports(struct obd_device *obd)
1256 cfs_list_t work_list;
1259 /* Move all of the exports from obd_exports to a work list, en masse. */
1260 CFS_INIT_LIST_HEAD(&work_list);
1261 cfs_spin_lock(&obd->obd_dev_lock);
1262 cfs_list_splice_init(&obd->obd_exports, &work_list);
1263 cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1264 cfs_spin_unlock(&obd->obd_dev_lock);
1266 if (!cfs_list_empty(&work_list)) {
1267 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1268 "disconnecting them\n", obd->obd_minor, obd);
1269 class_disconnect_export_list(&work_list,
1270 exp_flags_from_obd(obd));
1272 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1273 obd->obd_minor, obd);
1276 EXPORT_SYMBOL(class_disconnect_exports);
1278 /* Remove exports that have not completed recovery.
/*
 * @obd:         device whose obd_exports list is scanned.
 * @test_export: predicate invoked for each export with both
 *               obd->obd_dev_lock and exp->exp_lock held.
 *
 * Exports that are not kept are marked exp_failed, moved onto a local work
 * list and finally passed to class_disconnect_export_list() with
 * exp_flags_from_obd(obd) | OBD_OPT_ABORT_RECOV.  The number of evicted
 * clients is added to obd->obd_stale_clients.
 */
1280 void class_disconnect_stale_exports(struct obd_device *obd,
1281 int (*test_export)(struct obd_export *))
1283 cfs_list_t work_list;
1284 struct obd_export *exp, *n;
1288 CFS_INIT_LIST_HEAD(&work_list);
1289 cfs_spin_lock(&obd->obd_dev_lock);
/* _safe iteration: entries are moved off obd_exports inside the loop */
1290 cfs_list_for_each_entry_safe(exp, n, &obd->obd_exports,
1292 /* don't count self-export as client */
1293 if (obd_uuid_equals(&exp->exp_client_uuid,
1294 &exp->exp_obd->obd_uuid))
1297 cfs_spin_lock(&exp->exp_lock);
/* presumably a non-zero result means "keep this export" -- the lock is
 * released and exp_failed is not set on this path; TODO confirm */
1298 if (test_export(exp)) {
1299 cfs_spin_unlock(&exp->exp_lock);
1302 exp->exp_failed = 1;
1303 cfs_spin_unlock(&exp->exp_lock);
/* queue the export for disconnection once obd_dev_lock is dropped */
1305 cfs_list_move(&exp->exp_obd_chain, &work_list);
1307 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1308 obd->obd_name, exp->exp_client_uuid.uuid,
1309 exp->exp_connection == NULL ? "<unknown>" :
1310 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1311 print_export_data(exp, "EVICTING", 0);
1313 cfs_spin_unlock(&obd->obd_dev_lock);
1316 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1317 obd->obd_name, evicted);
1318 obd->obd_stale_clients += evicted;
/* the actual disconnects run without obd_dev_lock held */
1320 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1321 OBD_OPT_ABORT_RECOV);
1324 EXPORT_SYMBOL(class_disconnect_stale_exports);
/*
 * Mark @exp as failed and disconnect it.
 *
 * exp_failed is set under exp->exp_lock and its previous value is captured
 * in already_failed, so that an export that was already failed is only
 * logged ("skipping") instead of being disconnected a second time.
 * A failing obd_disconnect() is reported with CERROR; the function itself
 * returns nothing.
 */
1326 void class_fail_export(struct obd_export *exp)
1328 int rc, already_failed;
/* test-and-set of exp_failed under the export lock */
1330 cfs_spin_lock(&exp->exp_lock);
1331 already_failed = exp->exp_failed;
1332 exp->exp_failed = 1;
1333 cfs_spin_unlock(&exp->exp_lock);
1335 if (already_failed) {
1336 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1337 exp, exp->exp_client_uuid.uuid);
1341 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1342 exp, exp->exp_client_uuid.uuid);
/* optionally dump the debug log, controlled by obd_dump_on_timeout */
1344 if (obd_dump_on_timeout)
1345 libcfs_debug_dumplog();
1347 /* extra reference keeps exp valid for the CDEBUG after obd_disconnect */
1348 class_export_get(exp);
1350 /* Most callers into obd_disconnect are removing their own reference
1351 * (request, for example) in addition to the one from the hash table.
1352 * We don't have such a reference here, so make one. */
1353 class_export_get(exp);
1354 rc = obd_disconnect(exp);
1356 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1358 CDEBUG(D_HA, "disconnected export %p/%s\n",
1359 exp, exp->exp_client_uuid.uuid);
/* drop the reference that protected the logging above */
1360 class_export_put(exp);
1362 EXPORT_SYMBOL(class_fail_export);
/*
 * Return a printable NID for @exp.
 *
 * When exp->exp_connection is set, the result is
 * libcfs_nid2str(exp->exp_connection->c_peer.nid).
 * NOTE(review): the value returned for an export without a connection is
 * not present in this copy -- confirm before relying on it.
 */
1364 char *obd_export_nid2str(struct obd_export *exp)
1366 if (exp->exp_connection != NULL)
1367 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1371 EXPORT_SYMBOL(obd_export_nid2str);
/*
 * Evict from @obd the export(s) hashed under the NID given as text in @nid.
 *
 * @obd: device whose obd_nid_hash is searched.
 * @nid: NID in string form; parsed with libcfs_str2nid().
 *
 * Returns exports_evicted (initialised to 0).  When it is still zero at the
 * end, a D_HA debug message is emitted.
 *
 * NOTE(review): "adminstrative" in the CWARN text is a typo for
 * "administrative".
 */
1373 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1375 struct obd_export *doomed_exp = NULL;
1376 int exports_evicted = 0;
/* NOTE(review): the cast drops const because libcfs_str2nid() is called
 * with a char * here; presumably it does not modify the string -- confirm */
1378 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1381 doomed_exp = cfs_hash_lookup(obd->obd_nid_hash, &nid_key);
1382 if (doomed_exp == NULL)
/* sanity checks: the hash must return an export for exactly this NID, and
 * it must never be the device's self-export; note that exp_connection is
 * dereferenced without a NULL check */
1385 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1386 "nid %s found, wanted nid %s, requested nid %s\n",
1387 obd_export_nid2str(doomed_exp),
1388 libcfs_nid2str(nid_key), nid);
1389 LASSERTF(doomed_exp != obd->obd_self_export,
1390 "self-export is hashed by NID?\n");
1392 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1393 obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1395 class_fail_export(doomed_exp);
/* presumably releases the reference obtained from cfs_hash_lookup() --
 * TODO confirm */
1396 class_export_put(doomed_exp);
1399 if (!exports_evicted)
1400 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1401 obd->obd_name, nid);
1402 return exports_evicted;
1404 EXPORT_SYMBOL(obd_export_evict_by_nid);
/*
 * Evict from @obd the export whose client uuid is @uuid.
 *
 * @obd:  device whose obd_uuid_hash is searched.
 * @uuid: client uuid in string form; converted with obd_str2uuid().
 *
 * Evicting the device's own uuid is refused with a CERROR.  A uuid that is
 * not found in the hash is also reported with CERROR.  Returns
 * exports_evicted (initialised to 0).
 *
 * NOTE(review): "adminstrative" in the CWARN text is a typo for
 * "administrative".
 */
1406 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1408 struct obd_export *doomed_exp = NULL;
1409 struct obd_uuid doomed_uuid;
1410 int exports_evicted = 0;
1412 obd_str2uuid(&doomed_uuid, uuid);
/* never evict the device's own (self) uuid */
1413 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1414 CERROR("%s: can't evict myself\n", obd->obd_name);
1415 return exports_evicted;
1418 doomed_exp = cfs_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1420 if (doomed_exp == NULL) {
1421 CERROR("%s: can't disconnect %s: no exports found\n",
1422 obd->obd_name, uuid);
1424 CWARN("%s: evicting %s at adminstrative request\n",
1425 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1426 class_fail_export(doomed_exp);
/* presumably releases the reference obtained from cfs_hash_lookup() --
 * TODO confirm */
1427 class_export_put(doomed_exp);
1431 return exports_evicted;
1433 EXPORT_SYMBOL(obd_export_evict_by_uuid);
/*
 * Optional per-export dump callback, NULL by default and only defined when
 * LUSTRE_TRACKS_LOCK_EXP_REFS is enabled.  print_export_data() invokes it
 * for an export when its "locks" argument is non-zero and the hook is set.
 */
1435 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1436 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1437 EXPORT_SYMBOL(class_export_dump_hook);
/*
 * Emit one D_HA debug line describing @exp.
 *
 * @exp:    export to describe.
 * @status: free-form label printed after the obd name (callers in this file
 *          pass "EVICTING", "ACTIVE", "UNLINKED", "DELAYED" and "ZOMBIE").
 *
 * Field order of the message:
 *   obd name, status, export pointer, client uuid, NID string, exp_refcount,
 *   (exp_rpc_count, exp_cb_count, exp_locks_count),
 *   exp_disconnected, exp_delayed, exp_failed, nreplies,
 *   first_reply pointer, "..." when nreplies > 3, exp_last_committed.
 *
 * When LUSTRE_TRACKS_LOCK_EXP_REFS is enabled, "locks" is non-zero and
 * class_export_dump_hook is set, the hook is called for @exp as well.
 */
1440 static void print_export_data(struct obd_export *exp, const char *status,
1443 struct ptlrpc_reply_state *rs;
1444 struct ptlrpc_reply_state *first_reply = NULL;
/* exp_outstanding_replies is walked under the export lock */
1447 cfs_spin_lock(&exp->exp_lock);
1448 cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1454 cfs_spin_unlock(&exp->exp_lock);
/* the counters below are read without exp_lock, so the line is only a
 * snapshot */
1456 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1457 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1458 obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1459 cfs_atomic_read(&exp->exp_rpc_count),
1460 cfs_atomic_read(&exp->exp_cb_count),
1461 cfs_atomic_read(&exp->exp_locks_count),
1462 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1463 nreplies, first_reply, nreplies > 3 ? "..." : "",
1464 exp->exp_last_committed);
1465 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1466 if (locks && class_export_dump_hook != NULL)
1467 class_export_dump_hook(exp);
/*
 * Log every export known for @obd through print_export_data().
 *
 * @obd:   device whose export lists are dumped.
 * @locks: passed through unchanged to print_export_data().
 *
 * obd_exports ("ACTIVE"), obd_unlinked_exports ("UNLINKED") and
 * obd_delayed_exports ("DELAYED") are walked under obd->obd_dev_lock.
 * The "ZOMBIE" pass walks the global obd_zombie_exports list under
 * obd_zombie_impexp_lock; no filtering by @obd is applied there, so zombie
 * exports of other devices are printed too.
 */
1471 void dump_exports(struct obd_device *obd, int locks)
1473 struct obd_export *exp;
1475 cfs_spin_lock(&obd->obd_dev_lock);
1476 cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1477 print_export_data(exp, "ACTIVE", locks);
1478 cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1479 print_export_data(exp, "UNLINKED", locks);
1480 cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1481 print_export_data(exp, "DELAYED", locks);
1482 cfs_spin_unlock(&obd->obd_dev_lock);
/* zombie exports live on a global list protected by its own lock */
1483 cfs_spin_lock(&obd_zombie_impexp_lock);
1484 cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1485 print_export_data(exp, "ZOMBIE", locks);
1486 cfs_spin_unlock(&obd_zombie_impexp_lock);
1488 EXPORT_SYMBOL(dump_exports);
/*
 * Block until @obd has no unlinked exports left.
 *
 * The caller must already have emptied obd->obd_exports (asserted).  The
 * emptiness test of obd->obd_unlinked_exports is made under
 * obd->obd_dev_lock; the lock is dropped around each uninterruptible sleep
 * of cfs_time_seconds(waited).  Once "waited" exceeds 5 and is a power of
 * two, a console warning is printed and all exports are dumped.
 */
1490 void obd_exports_barrier(struct obd_device *obd)
1493 LASSERT(cfs_list_empty(&obd->obd_exports));
1494 cfs_spin_lock(&obd->obd_dev_lock);
1495 while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
/* never sleep with the spinlock held */
1496 cfs_spin_unlock(&obd->obd_dev_lock);
1497 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
1498 cfs_time_seconds(waited));
/* rate-limit the warning to power-of-two values of waited above 5 */
1499 if (waited > 5 && IS_PO2(waited)) {
1500 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1501 "more than %d seconds. "
1502 "The obd refcount = %d. Is it stuck?\n",
1503 obd->obd_name, waited,
1504 cfs_atomic_read(&obd->obd_refcount));
1505 dump_exports(obd, 1);
/* retake the lock before re-testing the list in the loop condition */
1508 cfs_spin_lock(&obd->obd_dev_lock);
1510 cfs_spin_unlock(&obd->obd_dev_lock);
1512 EXPORT_SYMBOL(obd_exports_barrier);
1514 /* Total number of zombies still waiting to be destroyed; read under
 * obd_zombie_impexp_lock by obd_zombie_impexp_check() and
 * obd_zombie_is_idle(). */
1515 static int zombies_count = 0;
1518 * kill zombie imports and exports
/*
 * Drain the zombie lists.
 *
 * Each pass detaches at most one import (from obd_zombie_imports) and one
 * export (from obd_zombie_exports) while holding obd_zombie_impexp_lock,
 * then destroys them with the lock released.  The passes repeat for as long
 * as either an import or an export was found.
 */
1520 void obd_zombie_impexp_cull(void)
1522 struct obd_import *import;
1523 struct obd_export *export;
1527 cfs_spin_lock(&obd_zombie_impexp_lock);
/* detach the head of the import list, if any */
1530 if (!cfs_list_empty(&obd_zombie_imports)) {
1531 import = cfs_list_entry(obd_zombie_imports.next,
1534 cfs_list_del_init(&import->imp_zombie_chain);
/* detach the head of the export list, if any */
1538 if (!cfs_list_empty(&obd_zombie_exports)) {
1539 export = cfs_list_entry(obd_zombie_exports.next,
1542 cfs_list_del_init(&export->exp_obd_chain);
1545 cfs_spin_unlock(&obd_zombie_impexp_lock);
/* destruction happens outside the spinlock; the lock is retaken briefly
 * afterwards (presumably to update the zombie accounting -- TODO confirm) */
1547 if (import != NULL) {
1548 class_import_destroy(import);
1549 cfs_spin_lock(&obd_zombie_impexp_lock);
1551 cfs_spin_unlock(&obd_zombie_impexp_lock);
1554 if (export != NULL) {
1555 class_export_destroy(export);
1556 cfs_spin_lock(&obd_zombie_impexp_lock);
1558 cfs_spin_unlock(&obd_zombie_impexp_lock);
1562 } while (import != NULL || export != NULL);
/* State shared between the zombie thread and its start/stop/barrier API. */
/* completed by the zombie thread during its start-up */
1566 static cfs_completion_t obd_zombie_start;
/* completed by the zombie thread when it leaves its main loop; waited on by
 * obd_zombie_impexp_stop() */
1567 static cfs_completion_t obd_zombie_stop;
/* bit flags, accessed with cfs_set_bit()/cfs_test_bit() */
1568 static unsigned long obd_zombie_flags;
/* wait queue shared by the zombie thread and obd_zombie_barrier() */
1569 static cfs_waitq_t obd_zombie_waitq;
/* pid recorded by the zombie thread; lets obd_zombie_barrier() detect that
 * it is being called from that thread */
1570 static pid_t obd_zombie_pid;
/* NOTE(review): this value (2) is passed as the bit argument of
 * cfs_set_bit()/cfs_test_bit(); if those follow set_bit()/test_bit()
 * semantics it selects bit number 2 rather than acting as a mask.  It is
 * used consistently, so behavior is correct either way -- confirm intent. */
1573 OBD_ZOMBIE_STOP = 1 << 1
1577 * check whether there is work for the zombie import/export kill thread.
/*
 * Compute, under obd_zombie_impexp_lock, whether the zombie thread has
 * nothing to do: rc is non-zero only when zombies_count is zero AND
 * OBD_ZOMBIE_STOP has not been set.  The zombie thread sleeps until the
 * negation of this check becomes true.
 *
 * @arg: not referenced by the visible code.
 */
1579 static int obd_zombie_impexp_check(void *arg)
1583 cfs_spin_lock(&obd_zombie_impexp_lock);
1584 rc = (zombies_count == 0) &&
1585 !cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1586 cfs_spin_unlock(&obd_zombie_impexp_lock);
1592 * Add export to the obd_zombie thread and notify it.
/*
 * Hand @exp over to the zombie machinery: unlink it from the list it is
 * currently on (under its obd's obd_dev_lock; it must be on one), link it
 * onto obd_zombie_exports under obd_zombie_impexp_lock, then wake the
 * waiters through obd_zombie_impexp_notify().
 */
1594 static void obd_zombie_export_add(struct obd_export *exp) {
1595 cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
1596 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1597 cfs_list_del_init(&exp->exp_obd_chain);
1598 cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
/* the two locks are taken one after the other, never nested */
1599 cfs_spin_lock(&obd_zombie_impexp_lock);
1601 cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1602 cfs_spin_unlock(&obd_zombie_impexp_lock);
1604 obd_zombie_impexp_notify();
1608 * Add import to the obd_zombie thread and notify it.
/*
 * Hand @imp over to the zombie machinery.  The import must already have
 * imp_sec and imp_rq_pool cleared and must not be on a zombie list yet
 * (all asserted).  It is linked onto obd_zombie_imports under
 * obd_zombie_impexp_lock, then the waiters are woken through
 * obd_zombie_impexp_notify().
 */
1610 static void obd_zombie_import_add(struct obd_import *imp) {
1611 LASSERT(imp->imp_sec == NULL);
1612 LASSERT(imp->imp_rq_pool == NULL);
1613 cfs_spin_lock(&obd_zombie_impexp_lock);
1614 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1616 cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1617 cfs_spin_unlock(&obd_zombie_impexp_lock);
1619 obd_zombie_impexp_notify();
1623 * notify import/export destroy thread about new zombie.
/*
 * Wake every waiter on obd_zombie_waitq.  Both the zombie thread and
 * obd_zombie_barrier() sleep on this queue, which is why a broadcast is
 * used instead of waking a single waiter.
 */
1625 static void obd_zombie_impexp_notify(void)
1628 * Make sure obd_zomebie_impexp_thread get this notification.
1629 * It is possible this signal only get by obd_zombie_barrier, and
1630 * barrier gulps this notification and sleeps away and hangs ensues
1632 cfs_waitq_broadcast(&obd_zombie_waitq);
1636 * check whether obd_zombie is idle
/*
 * Compute, under obd_zombie_impexp_lock, whether zombies_count is zero.
 * Must not be called once OBD_ZOMBIE_STOP has been set (asserted).  Used as
 * the wake-up condition of obd_zombie_barrier().
 */
1638 static int obd_zombie_is_idle(void)
1642 LASSERT(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1643 cfs_spin_lock(&obd_zombie_impexp_lock);
1644 rc = (zombies_count == 0);
1645 cfs_spin_unlock(&obd_zombie_impexp_lock);
1650 * wait until the obd_zombie import/export queues become empty
/*
 * Sleep on obd_zombie_waitq until obd_zombie_is_idle() reports that no
 * zombies are pending.  The zombie thread itself is recognised by comparing
 * obd_zombie_pid with the current pid, so that it does not wait for itself.
 */
1652 void obd_zombie_barrier(void)
/* zero-initialised wait info */
1654 struct l_wait_info lwi = { 0 };
1656 if (obd_zombie_pid == cfs_curproc_pid())
1657 /* don't wait for myself */
1659 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1661 EXPORT_SYMBOL(obd_zombie_barrier);
1666 * thread that destroys zombie exports/imports.
/*
 * Body of the zombie reaper thread.
 *
 * After daemonizing as "obd_zombid" it completes obd_zombie_start, records
 * its pid in obd_zombie_pid and then loops until OBD_ZOMBIE_STOP is set:
 * sleep until obd_zombie_impexp_check() returns zero, cull the zombie lists,
 * signal obd_zombie_waitq.  On exit from the loop it completes
 * obd_zombie_stop.
 *
 * @unused: not referenced by the visible code.
 */
1668 static int obd_zombie_impexp_thread(void *unused)
/* obd_zombie_start is completed on the failure path as well, so the
 * starter is released either way */
1672 if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1673 cfs_complete(&obd_zombie_start);
1677 cfs_complete(&obd_zombie_start);
/* NOTE(review): the pid is recorded after obd_zombie_start is completed,
 * so the starter may resume before obd_zombie_pid is valid */
1679 obd_zombie_pid = cfs_curproc_pid();
1681 while(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1682 struct l_wait_info lwi = { 0 };
/* wake up when there are zombies to destroy or a stop was requested */
1684 l_wait_event(obd_zombie_waitq,
1685 !obd_zombie_impexp_check(NULL), &lwi);
1686 obd_zombie_impexp_cull();
1689 * Notify obd_zombie_barrier callers that queues
1692 cfs_waitq_signal(&obd_zombie_waitq);
/* lets obd_zombie_impexp_stop() return */
1695 cfs_complete(&obd_zombie_stop);
1700 #else /* ! KERNEL */
/* Entry counter used by obd_zombie_impexp_kill() to avoid nested culling. */
1702 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
/* Handles returned by liblustre_register_wait_callback() and
 * liblustre_register_idle_callback(); kept for deregistration in
 * obd_zombie_impexp_stop(). */
1703 static void *obd_zombie_impexp_work_cb;
1704 static void *obd_zombie_impexp_idle_cb;
/*
 * Wait-callback used when no zombie thread exists: culls the zombie lists,
 * but only for the caller that raises zombie_recur from 0 to 1, so that a
 * nested invocation does not start a second cull.  The counter is
 * decremented again before returning.
 *
 * @arg: not referenced by the visible code.
 */
1706 int obd_zombie_impexp_kill(void *arg)
1710 if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1711 obd_zombie_impexp_cull();
1714 cfs_atomic_dec(&zombie_recur);
1721 * start the thread that destroys zombie imports/exports
/*
 * Initialise the zombie import/export machinery: the two zombie lists,
 * their spinlock, the start/stop completions and the wait queue.
 *
 * Two start-up variants follow (presumably selected by a kernel/userspace
 * conditional -- TODO confirm): creating obd_zombie_impexp_thread and
 * waiting for it to signal obd_zombie_start, or registering
 * obd_zombie_impexp_kill / obd_zombie_impexp_check as liblustre wait and
 * idle callbacks.
 */
1723 int obd_zombie_impexp_init(void)
1727 CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1728 CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1729 cfs_spin_lock_init(&obd_zombie_impexp_lock);
1730 cfs_init_completion(&obd_zombie_start);
1731 cfs_init_completion(&obd_zombie_stop);
1732 cfs_waitq_init(&obd_zombie_waitq);
/* threaded variant: start the reaper and wait until it has come up */
1736 rc = cfs_create_thread(obd_zombie_impexp_thread, NULL, 0);
1740 cfs_wait_for_completion(&obd_zombie_start);
/* callback variant: keep the handles for later deregistration */
1743 obd_zombie_impexp_work_cb =
1744 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1745 &obd_zombie_impexp_kill, NULL);
1747 obd_zombie_impexp_idle_cb =
1748 liblustre_register_idle_callback("obd_zombi_impexp_check",
1749 &obd_zombie_impexp_check, NULL);
1755 * stop the thread that destroys zombie imports/exports
/*
 * Shut the zombie machinery down: set OBD_ZOMBIE_STOP, wake the waiters so
 * the flag is noticed, and wait for obd_zombie_stop to be completed; the
 * liblustre wait/idle callbacks are deregistered through their saved
 * handles.
 */
1757 void obd_zombie_impexp_stop(void)
1759 cfs_set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1760 obd_zombie_impexp_notify();
/* obd_zombie_stop is completed by the zombie thread on its way out */
1762 cfs_wait_for_completion(&obd_zombie_stop);
1764 liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1765 liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1769 /***** Kernel-userspace comm helpers *******/
1771 /* Get length of entire message, including header:
 * sizeof(struct kuc_hdr) + @payload_len.
 * No range check is applied to @payload_len, and the sum is converted back
 * to the int return type. */
1772 int kuc_len(int payload_len)
1774 return sizeof(struct kuc_hdr) + payload_len;
1776 EXPORT_SYMBOL(kuc_len);
1778 /* Get a pointer to kuc header, given a ptr to the payload
1779 * @param p Pointer to payload area
1780 * @returns Pointer to kuc header
/*
 * Step back from payload pointer @p to the struct kuc_hdr placed directly
 * in front of it, and assert that the header carries KUC_MAGIC.
 * @p must therefore point just past a valid header.
 */
1782 struct kuc_hdr * kuc_ptr(void *p)
/* the header immediately precedes the payload in memory */
1784 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1785 LASSERT(lh->kuc_magic == KUC_MAGIC);
1788 EXPORT_SYMBOL(kuc_ptr);
1790 /* Test if payload is part of kuc message
1791 * @param p Pointer to payload area
/*
 * Non-asserting variant of the kuc_ptr() check: looks at the
 * sizeof(struct kuc_hdr) bytes in front of @p and compares the magic field
 * with KUC_MAGIC.
 * NOTE(review): the bytes in front of @p are read unconditionally, so @p
 * must be preceded by at least sizeof(struct kuc_hdr) readable bytes even
 * when it is not a kuc payload.
 */
1794 int kuc_ispayload(void *p)
1796 struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1798 if (kh->kuc_magic == KUC_MAGIC)
1803 EXPORT_SYMBOL(kuc_ispayload);
1805 /* Alloc space for a message, and fill in header
1806 * @return Pointer to payload area
/*
 * @payload_len: payload size; the message length is kuc_len(payload_len).
 * @transport:   stored in the header's kuc_transport field.
 * @type:        stored in the header's kuc_msgtype field.
 *
 * On success the header is filled in (magic, transport, msgtype and total
 * msglen) and a pointer to the byte right after the header is returned.
 * Failure is reported as ERR_PTR(-ENOMEM), not NULL, so callers have to
 * test the result with IS_ERR().
 */
1808 void *kuc_alloc(int payload_len, int transport, int type)
/* total length: header plus payload */
1811 int len = kuc_len(payload_len);
1815 return ERR_PTR(-ENOMEM);
1817 lh->kuc_magic = KUC_MAGIC;
1818 lh->kuc_transport = transport;
1819 lh->kuc_msgtype = type;
1820 lh->kuc_msglen = len;
/* hand out the payload area that starts right after the header */
1822 return (void *)(lh + 1);
1824 EXPORT_SYMBOL(kuc_alloc);
1826 /* Takes pointer to payload area.
 * The header is located with kuc_ptr(), which asserts its magic, and the
 * whole message is freed with size kuc_len(@payload_len); @payload_len
 * presumably has to match the value given to kuc_alloc() -- TODO confirm. */
1827 inline void kuc_free(void *p, int payload_len)
1829 struct kuc_hdr *lh = kuc_ptr(p);
1830 OBD_FREE(lh, kuc_len(payload_len));
1832 EXPORT_SYMBOL(kuc_free);